diff --git a/test_gputil.py b/test_gputil.py deleted file mode 100644 index 180e14d..0000000 --- a/test_gputil.py +++ /dev/null @@ -1,48 +0,0 @@ -import platform -import subprocess -import wmi - -# # Windows version -# def get_gpus_windows(): -# w = wmi.WMI() -# gpus = w.Win32_VideoController() -# return [{'name': gpu.Name, 'driver': gpu.DriverVersion} for gpu in gpus] - -# if platform.system() == 'Windows': -# print(get_gpus_windows()) - -# def get_glxinfo_gpu(): -# try: -# output = subprocess.check_output("glxinfo | grep -i 'device\|vendor'", shell=True).decode() -# return output -# except Exception as e: -# return f"Error: {e}" - -# if platform.system() == 'Linux': -# print(get_glxinfo_gpu()) - -# # Install pyopencl with pip if not already installed: pip install pyopencl -# import pyopencl as cl - -# def get_opencl_gpus(): -# platforms = cl.get_platforms() -# gpu_info = [] -# for platform in platforms: -# devices = platform.get_devices(device_type=cl.device_type.GPU) -# for device in devices: -# gpu_info.append({ -# 'name': device.name, -# 'vendor': device.vendor, -# 'version': device.version, -# 'global_mem_size': device.global_mem_size, -# 'max_compute_units': device.max_compute_units -# }) -# return gpu_info - -# print(get_opencl_gpus()) - -from pyadl import * - -devices = ADLManager.getInstance().getDevices() -for device in devices: - print("{0}. {1}".format(device.adapterIndex, device.adapterName)) diff --git a/test_pagination.py b/test_pagination.py deleted file mode 100644 index ff9f4af..0000000 --- a/test_pagination.py +++ /dev/null @@ -1,52 +0,0 @@ -import asyncio -import logging -import sys -import os - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -log = logging.getLogger("test_pagination") - -# Add the current directory to the path so we can import the cogs -sys.path.append(os.getcwd()) - -from cogs.safebooru_cog import SafebooruCog -from discord.ext import commands -import discord - - -async def test_pagination(): - # Create a mock bot with intents - intents = discord.Intents.default() - bot = commands.Bot(command_prefix="!", intents=intents) - - # Initialize the cog - cog = SafebooruCog(bot) - - # Test the pagination for a specific tag - tag = "kasane_teto" - log.info(f"Testing pagination for tag: {tag}") - - # Call the _fetch_posts_logic method - results = await cog._fetch_posts_logic("test", tag) - - # Check the results - if isinstance(results, tuple): - log.info(f"Found {len(results[1])} results") - # Print the first few results - for i, result in enumerate(results[1][:5]): - log.info(f"Result {i+1}: {result.get('id')} - {result.get('file_url')}") - else: - log.error(f"Error: {results}") - - # Clean up - if hasattr(cog, "session") and cog.session and not cog.session.closed: - await cog.session.close() - log.info("Closed aiohttp session") - - -if __name__ == "__main__": - # Run the test - asyncio.run(test_pagination()) diff --git a/test_part.py b/test_part.py deleted file mode 100644 index 50d3dbd..0000000 --- a/test_part.py +++ /dev/null @@ -1,20 +0,0 @@ -# Test script for Part constructor -try: - from gurt.api import types - - print("Successfully imported types module") - - # Test creating a Part with text - part = types.Part(text="test") - print(f"Successfully created Part with text: {part}") - - # Test creating a Part with URI - part_uri = types.Part(uri="https://example.com", mime_type="text/plain") - print(f"Successfully created Part with URI: {part_uri}") - - print("All tests passed!") -except Exception as e: - print(f"Error: {type(e).__name__}: {e}") - import traceback - - traceback.print_exc() diff --git a/test_starboard.py b/test_starboard.py deleted file mode 100644 index 29895c5..0000000 --- a/test_starboard.py +++ /dev/null @@ -1,61 +0,0 @@ -import asyncio -import discord -from discord.ext import commands -import os -from dotenv import load_dotenv -import logging -import sys - -# Add the parent directory to sys.path to allow imports -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -# Import the starboard cog and settings manager -from cogs.starboard_cog import StarboardCog -import settings_manager as settings_manager - -# Load environment variables -load_dotenv() - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s:%(levelname)s:%(name)s: %(message)s" -) -log = logging.getLogger(__name__) - -# Set up intents -intents = discord.Intents.default() -intents.message_content = True -intents.members = True - -# Create bot instance -bot = commands.Bot(command_prefix="!", intents=intents) - - -@bot.event -async def on_ready(): - log.info(f"{bot.user.name} has connected to Discord!") - log.info(f"Bot ID: {bot.user.id}") - - # Load the starboard cog - try: - await bot.add_cog(StarboardCog(bot)) - log.info("StarboardCog loaded successfully!") - except Exception as e: - log.error(f"Error loading StarboardCog: {e}") - - -async def main(): - TOKEN = os.getenv("DISCORD_TOKEN") - if not TOKEN: - raise ValueError( - "No token found. Make sure to set DISCORD_TOKEN in your .env file." - ) - - try: - await bot.start(TOKEN) - except Exception as e: - log.exception(f"Error starting bot: {e}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/test_timeout_config.py b/test_timeout_config.py deleted file mode 100644 index 1658ab2..0000000 --- a/test_timeout_config.py +++ /dev/null @@ -1,64 +0,0 @@ -import json -import os - -# Define the path for the JSON file to store timeout chance -TIMEOUT_CONFIG_FILE = os.path.join("data", "timeout_config.json") - - -def load_timeout_config(): - """Load timeout configuration from JSON file""" - timeout_chance = 0.005 # Default value - if os.path.exists(TIMEOUT_CONFIG_FILE): - try: - with open(TIMEOUT_CONFIG_FILE, "r") as f: - data = json.load(f) - if "timeout_chance" in data: - timeout_chance = data["timeout_chance"] - print(f"Loaded timeout chance: {timeout_chance}") - else: - print("timeout_chance not found in config file") - except Exception as e: - print(f"Error loading timeout configuration: {e}") - else: - print(f"Config file does not exist: {TIMEOUT_CONFIG_FILE}") - return timeout_chance - - -def save_timeout_config(timeout_chance): - """Save timeout configuration to JSON file""" - try: - # Ensure data directory exists - os.makedirs(os.path.dirname(TIMEOUT_CONFIG_FILE), exist_ok=True) - - config_data = { - "timeout_chance": timeout_chance, - "target_user_id": 748405715520978965, - "timeout_duration": 60, - } - with open(TIMEOUT_CONFIG_FILE, "w") as f: - json.dump(config_data, f, indent=4) - print(f"Saved timeout configuration with chance: {timeout_chance}") - return True - except Exception as e: - print(f"Error saving timeout configuration: {e}") - return False - - -# Test the functionality -if __name__ == "__main__": - # Load the current config - current_chance = load_timeout_config() - print(f"Current timeout chance: {current_chance}") - - # Update the timeout chance - new_chance = 0.01 # 1% - if save_timeout_config(new_chance): - print(f"Successfully updated timeout chance to {new_chance}") - - # Load the config again to verify it was saved - updated_chance = load_timeout_config() - print(f"Updated timeout chance: {updated_chance}") - - # Restore the original value - if save_timeout_config(current_chance): - print(f"Restored timeout chance to original value: {current_chance}") diff --git a/test_url_parser.py b/test_url_parser.py deleted file mode 100644 index c49f5d7..0000000 --- a/test_url_parser.py +++ /dev/null @@ -1,41 +0,0 @@ -import re -from typing import Optional, Tuple - - -# Copy of the fixed parse_repo_url function -def parse_repo_url(url: str) -> Tuple[Optional[str], Optional[str]]: - """Parses a Git repository URL to extract platform and a simplified repo identifier.""" - # Fixed regex pattern for GitHub URLs - github_match = re.match( - r"^(?:https?://)?(?:www\.)?github\.com/([\w.-]+/[\w.-]+)(?:\.git)?/?$", url - ) - if github_match: - return "github", github_match.group(1) - - gitlab_match = re.match( - r"^(?:https?://)?(?:www\.)?gitlab\.com/([\w.-]+(?:/[\w.-]+)+)(?:\.git)?/?$", url - ) - if gitlab_match: - return "gitlab", gitlab_match.group(1) - return None, None - - -# Test URLs -test_urls = [ - "https://github.com/Slipstreamm/discordbot", - "http://github.com/Slipstreamm/discordbot", - "github.com/Slipstreamm/discordbot", - "www.github.com/Slipstreamm/discordbot", - "https://github.com/Slipstreamm/git", - "https://gitlab.com/group/project", - "https://gitlab.com/group/subgroup/project", - "invalid-url", -] - -# Test each URL -print("Testing URL parsing with fixed regex pattern:") -print("-" * 50) -for url in test_urls: - platform, repo_id = parse_repo_url(url) - result = f"Valid: {platform}, {repo_id}" if platform else "Invalid URL" - print(f"{url} => {result}") diff --git a/test_usage_counters.py b/test_usage_counters.py deleted file mode 100644 index 0f40f99..0000000 --- a/test_usage_counters.py +++ /dev/null @@ -1,121 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script to verify usage counter functionality. -This script demonstrates how to query the usage counters table. -""" - -import asyncio -import asyncpg -import os -from dotenv import load_dotenv - -# Load environment variables -load_dotenv() - - -async def test_usage_counters(): - """Test the usage counters functionality.""" - - # Create database connection - try: - conn_string = f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT')}/{os.getenv('POSTGRES_SETTINGS_DB')}" - conn = await asyncpg.connect(conn_string) - print("โœ… Connected to database successfully") - except Exception as e: - print(f"โŒ Failed to connect to database: {e}") - return - - try: - # Check if the table exists - table_exists = await conn.fetchval( - """ - SELECT EXISTS ( - SELECT FROM information_schema.tables - WHERE table_name = 'command_usage_counters' - ) - """ - ) - - if table_exists: - print("โœ… command_usage_counters table exists") - - # Get some sample data - records = await conn.fetch( - """ - SELECT user1_id, user2_id, command_name, usage_count - FROM command_usage_counters - ORDER BY usage_count DESC - LIMIT 10 - """ - ) - - if records: - print("\n๐Ÿ“Š Top 10 command usages:") - print("User1 ID | User2 ID | Command | Count") - print("-" * 45) - for record in records: - print( - f"{record['user1_id']} | {record['user2_id']} | {record['command_name']} | {record['usage_count']}" - ) - else: - print("๐Ÿ“ No usage data found yet (table is empty)") - - # Get total count - total_count = await conn.fetchval( - "SELECT COUNT(*) FROM command_usage_counters" - ) - print(f"\n๐Ÿ“ˆ Total unique user-command combinations: {total_count}") - - else: - print("โš ๏ธ command_usage_counters table does not exist yet") - print(" It will be created automatically when a command is first used") - - except Exception as e: - print(f"โŒ Error querying database: {e}") - finally: - await conn.close() - print("๐Ÿ”Œ Database connection closed") - - -async def get_usage_for_users(user1_id: int, user2_id: int): - """Get usage statistics for a specific pair of users.""" - - try: - conn_string = f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT')}/{os.getenv('POSTGRES_SETTINGS_DB')}" - conn = await asyncpg.connect(conn_string) - - records = await conn.fetch( - """ - SELECT command_name, usage_count - FROM command_usage_counters - WHERE user1_id = $1 AND user2_id = $2 - ORDER BY usage_count DESC - """, - user1_id, - user2_id, - ) - - if records: - print(f"\n๐Ÿ‘ฅ Usage between users {user1_id} and {user2_id}:") - print("Command | Count") - print("-" * 20) - for record in records: - print(f"{record['command_name']} | {record['usage_count']}") - else: - print(f"๐Ÿ“ No usage data found between users {user1_id} and {user2_id}") - - await conn.close() - - except Exception as e: - print(f"โŒ Error querying user data: {e}") - - -if __name__ == "__main__": - print("๐Ÿงช Testing Usage Counters Functionality") - print("=" * 40) - - # Test basic functionality - asyncio.run(test_usage_counters()) - - # Example: Get usage for specific users (replace with actual user IDs) - # asyncio.run(get_usage_for_users(123456789, 987654321)) diff --git a/tests/test_custom_bot_manager.py b/tests/test_custom_bot_manager.py new file mode 100644 index 0000000..67a6ad5 --- /dev/null +++ b/tests/test_custom_bot_manager.py @@ -0,0 +1,50 @@ +import os +import sys +import time +import asyncio +from unittest.mock import patch + +# Ensure the project root is on sys.path so we can import modules +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import custom_bot_manager as cbm + + +async def fake_start(self, token): + cbm.custom_bot_status[self.user_id] = cbm.STATUS_RUNNING + while not getattr(self, "_stop_flag", False): + await asyncio.sleep(0.01) + + +async def fake_close(self): + self._stop_flag = True + self._closed = True + + +def test_custom_bot_lifecycle(): + user_id = "test_user" + token = "fake_token" + + with patch("custom_bot_manager.commands.Bot.start", new=fake_start), patch( + "custom_bot_manager.CustomBot.close", new=fake_close + ): + success, _ = cbm.create_custom_bot(user_id, token) + assert success + assert cbm.custom_bot_status[user_id] == cbm.STATUS_STOPPED + + success, _ = cbm.run_custom_bot_in_thread(user_id, token) + assert success + assert user_id in cbm.custom_bot_threads + + # Allow the start coroutine to run + time.sleep(0.05) + assert cbm.custom_bot_status[user_id] == cbm.STATUS_RUNNING + + thread = cbm.custom_bot_threads[user_id] + success, _ = cbm.stop_custom_bot(user_id) + assert success + + # Wait for the bot thread to exit + thread.join(timeout=1.0) + assert cbm.custom_bot_status[user_id] == cbm.STATUS_STOPPED + assert user_id not in cbm.custom_bot_threads diff --git a/tests/test_git_monitor.py b/tests/test_git_monitor.py new file mode 100644 index 0000000..8a286c2 --- /dev/null +++ b/tests/test_git_monitor.py @@ -0,0 +1,52 @@ +import os +import sys +import pytest + +# Ensure the project root is on sys.path so we can import modules +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from cogs.git_monitor_cog import parse_repo_url + + +@pytest.mark.parametrize( + "url,expected", + [ + ("https://github.com/user/repo", ("github", "user/repo")), + ("http://github.com/user/repo", ("github", "user/repo")), + ("github.com/user/repo", ("github", "user/repo")), + ("https://www.github.com/user/repo/", ("github", "user/repo")), + ("https://github.com/user/repo.git", ("github", "user/repo")), + ("https://github.com/user-name/re.po", ("github", "user-name/re.po")), + ("https://gitlab.com/group/project", ("gitlab", "group/project")), + ( + "https://gitlab.com/group/subgroup/project", + ("gitlab", "group/subgroup/project"), + ), + ("gitlab.com/group/subgroup/project.git", ("gitlab", "group/subgroup/project")), + ( + "http://www.gitlab.com/group/subgroup/project/", + ("gitlab", "group/subgroup/project"), + ), + ], +) +def test_parse_repo_url_valid(url, expected): + assert parse_repo_url(url) == expected + + +@pytest.mark.parametrize( + "url", + [ + "https://github.com/", + "https://github.com/user", + "https://gitlab.com/", + "https://gitlab.com/group", + "ftp://github.com/user/repo", + "http:/github.com/user/repo", + "not a url", + "https://gitlabx.com/group/project", + "gitlab.com/group//project", + "github.com/user/repo/extra", + ], +) +def test_parse_repo_url_invalid(url): + assert parse_repo_url(url) == (None, None) diff --git a/tests/test_gputil.py b/tests/test_gputil.py new file mode 100644 index 0000000..abd8973 --- /dev/null +++ b/tests/test_gputil.py @@ -0,0 +1,26 @@ +import pytest + +try: + import pyadl +except ImportError: # pragma: no cover - dependency optional + pyadl = None + + +class FakeDevice: + def __init__(self, index: int, name: str): + self.adapterIndex = index + self.adapterName = name + + +class FakeManager: + def getDevices(self): + return [FakeDevice(0, "GPU0"), FakeDevice(1, "GPU1")] + + +@pytest.mark.skipif(pyadl is None, reason="pyadl not installed") +def test_pyadl_devices(monkeypatch): + monkeypatch.setattr(pyadl.ADLManager, "getInstance", lambda: FakeManager()) + + devices = pyadl.ADLManager.getInstance().getDevices() + assert len(devices) == 2 + assert devices[0].adapterName == "GPU0" diff --git a/tests/test_pagination.py b/tests/test_pagination.py new file mode 100644 index 0000000..6718f76 --- /dev/null +++ b/tests/test_pagination.py @@ -0,0 +1,49 @@ +import asyncio +import pytest +import discord +from discord.ext import commands + +from cogs.safebooru_cog import SafebooruCog + + +class MockResponse: + def __init__(self, status: int, data): + self.status = status + self._data = data + + async def json(self): + return self._data + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return False + + +class MockSession: + def __init__(self, data): + self.data = data + self.closed = False + + def get(self, *args, **kwargs): + return MockResponse(200, self.data) + + async def close(self): + self.closed = True + + +@pytest.mark.asyncio +async def test_fetch_posts_logic(monkeypatch): + intents = discord.Intents.none() + bot = commands.Bot(command_prefix="!", intents=intents) + cog = SafebooruCog(bot) + mock_data = [{"id": "1", "file_url": "http://example.com/image.jpg"}] + monkeypatch.setattr(cog, "session", MockSession(mock_data)) + + results = await cog._fetch_posts_logic( + "test", "tag", pid_override=0, limit_override=1 + ) + + assert isinstance(results, list) + assert results[0]["id"] == "1" diff --git a/tests/test_part.py b/tests/test_part.py new file mode 100644 index 0000000..c5d4e15 --- /dev/null +++ b/tests/test_part.py @@ -0,0 +1,18 @@ +import sys +import pytest + + +def test_part_constructors(): + try: + import google.generativeai as generativeai + + sys.modules.setdefault("google.genai", generativeai) + from gurt.api import types + except Exception as e: # pragma: no cover - skip if dependencies missing + pytest.skip(f"gurt.api unavailable: {e}") + + part = types.Part(text="test") + assert part + + uri_part = types.Part(uri="https://example.com", mime_type="text/plain") + assert getattr(uri_part, "uri", None) == "https://example.com" diff --git a/tests/test_settings_manager.py b/tests/test_settings_manager.py new file mode 100644 index 0000000..adaeb06 --- /dev/null +++ b/tests/test_settings_manager.py @@ -0,0 +1,77 @@ +import os +import sys +import asyncio +from unittest.mock import AsyncMock, MagicMock +import pytest + +# Ensure the project root is on sys.path so we can import modules +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from settings_manager import get_guild_prefix, _get_redis_key + + +class DummyAcquire: + def __init__(self, conn, pool): + self._conn = conn + self._pool = pool + + async def __aenter__(self): + self._pool.acquire_enter_called += 1 + return self._conn + + async def __aexit__(self, exc_type, exc, tb): + pass + + +class DummyPool: + def __init__(self, conn): + self._conn = conn + self.acquire_enter_called = 0 + + def acquire(self): + return DummyAcquire(self._conn, self) + + +class DummyBot: + def __init__(self, pg_pool, redis): + self.pg_pool = pg_pool + self.redis = redis + + +def run_async(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +def test_get_guild_prefix_caching(monkeypatch): + guild_id = 123 + prefix = "!" + + # Mock Postgres connection and pool + mock_conn = MagicMock() + mock_conn.fetchval = AsyncMock(return_value=prefix) + pg_pool = DummyPool(mock_conn) + + # Mock redis client + redis_mock = MagicMock() + redis_mock.get = AsyncMock(side_effect=[None, prefix]) + redis_mock.set = AsyncMock() + + bot = DummyBot(pg_pool, redis_mock) + monkeypatch.setattr("settings_manager.get_bot_instance", lambda: bot) + + cache_key = _get_redis_key(guild_id, "prefix") + + # First call should hit Postgres and set Redis + result1 = run_async(get_guild_prefix(guild_id, "?")) + assert result1 == prefix + assert pg_pool.acquire_enter_called == 1 + assert mock_conn.fetchval.call_count == 1 + redis_mock.set.assert_called_once_with(cache_key, prefix, ex=3600) + + # Second call should use Redis and not hit Postgres again + result2 = run_async(get_guild_prefix(guild_id, "?")) + assert result2 == prefix + assert pg_pool.acquire_enter_called == 1 + assert mock_conn.fetchval.call_count == 1 + assert redis_mock.get.call_count == 2 + assert redis_mock.set.call_count == 1 diff --git a/tests/test_starboard.py b/tests/test_starboard.py new file mode 100644 index 0000000..9e33908 --- /dev/null +++ b/tests/test_starboard.py @@ -0,0 +1,14 @@ +import pytest +import discord +from discord.ext import commands + +from cogs.starboard_cog import StarboardCog + + +@pytest.mark.asyncio +async def test_starboard_cog_load(): + intents = discord.Intents.none() + bot = commands.Bot(command_prefix="!", intents=intents) + cog = StarboardCog(bot) + await bot.add_cog(cog) + assert cog in bot.cogs.values() diff --git a/tests/test_starboard_db.py b/tests/test_starboard_db.py new file mode 100644 index 0000000..502f726 --- /dev/null +++ b/tests/test_starboard_db.py @@ -0,0 +1,95 @@ +"""Tests for starboard database helper functions.""" + +# pylint: disable=wrong-import-position + +import os +import sys + +# Ensure project root is on sys.path +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from unittest.mock import AsyncMock, patch + +import pytest # pylint: disable=import-error + +import settings_manager # pylint: disable=import-error + + +class DummyBot: + """Simple container for a pg_pool mock.""" + + def __init__(self, pg_pool): + self.pg_pool = pg_pool + + +@pytest.mark.asyncio +async def test_create_starboard_entry(): + """Verify create_starboard_entry executes expected queries.""" + + conn = AsyncMock() + acquire_cm = AsyncMock() + acquire_cm.__aenter__.return_value = conn + acquire_cm.__aexit__.return_value = None + + pg_pool = AsyncMock() + pg_pool.acquire.return_value = acquire_cm + + bot = DummyBot(pg_pool) + with patch.object(settings_manager, "get_bot_instance", return_value=bot): + result = await settings_manager.create_starboard_entry( + guild_id=1, + original_message_id=2, + original_channel_id=3, + starboard_message_id=4, + author_id=5, + star_count=6, + ) + + assert result is True + pg_pool.acquire.assert_called_once() + assert conn.execute.await_count == 2 + + +@pytest.mark.asyncio +async def test_update_starboard_entry(): + """Verify update_starboard_entry updates star count.""" + + conn = AsyncMock() + pg_pool = AsyncMock() + pg_pool.acquire = AsyncMock(return_value=conn) + pg_pool.release = AsyncMock() + + bot = DummyBot(pg_pool) + with patch.object(settings_manager, "get_bot_instance", return_value=bot): + result = await settings_manager.update_starboard_entry( + guild_id=1, original_message_id=2, star_count=3 + ) + + assert result is True + pg_pool.acquire.assert_called_once() + conn.execute.assert_awaited_once() + pg_pool.release.assert_called_once_with(conn) + + +@pytest.mark.asyncio +async def test_get_starboard_entry(): + """Verify get_starboard_entry fetches the row and returns a dict.""" + + entry_data = {"guild_id": 1, "original_message_id": 2} + conn = AsyncMock() + conn.fetchrow = AsyncMock(return_value=entry_data) + + acquire_cm = AsyncMock() + acquire_cm.__aenter__.return_value = conn + acquire_cm.__aexit__.return_value = None + + pg_pool = AsyncMock() + pg_pool.acquire.return_value = acquire_cm + + bot = DummyBot(pg_pool) + with patch.object(settings_manager, "get_bot_instance", return_value=bot): + result = await settings_manager.get_starboard_entry(1, 2) + + assert result == entry_data + pg_pool.acquire.assert_called_once() + conn.fetchrow.assert_awaited_once() diff --git a/tests/test_timeout_config.py b/tests/test_timeout_config.py new file mode 100644 index 0000000..d07c962 --- /dev/null +++ b/tests/test_timeout_config.py @@ -0,0 +1,29 @@ +import json +from pathlib import Path + + +def load_timeout_config(path: Path) -> float: + timeout_chance = 0.005 + if path.exists(): + with open(path, "r") as f: + data = json.load(f) + timeout_chance = data.get("timeout_chance", timeout_chance) + return timeout_chance + + +def save_timeout_config(path: Path, timeout_chance: float) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + data = { + "timeout_chance": timeout_chance, + "target_user_id": 748405715520978965, + "timeout_duration": 60, + } + with open(path, "w") as f: + json.dump(data, f, indent=4) + + +def test_timeout_config_roundtrip(tmp_path: Path): + cfg = tmp_path / "timeout_config.json" + save_timeout_config(cfg, 0.01) + assert cfg.exists() + assert load_timeout_config(cfg) == 0.01 diff --git a/tests/test_url_parser.py b/tests/test_url_parser.py new file mode 100644 index 0000000..3e66adf --- /dev/null +++ b/tests/test_url_parser.py @@ -0,0 +1,47 @@ +import re +from typing import Optional, Tuple +import pytest + + +def parse_repo_url(url: str) -> Tuple[Optional[str], Optional[str]]: + """Parses a Git repository URL and returns platform and repo id.""" + github_match = re.match( + r"^(?:https?://)?(?:www\.)?github\.com/([\w.-]+/[\w.-]+)(?:\.git)?/?$", + url, + ) + if github_match: + return "github", github_match.group(1) + + gitlab_match = re.match( + r"^(?:https?://)?(?:www\.)?gitlab\.com/([\w.-]+(?:/[\w.-]+)+)(?:\.git)?/?$", + url, + ) + if gitlab_match: + return "gitlab", gitlab_match.group(1) + return None, None + + +@pytest.mark.parametrize( + "url,expected", + [ + ( + "https://github.com/Slipstreamm/discordbot", + ("github", "Slipstreamm/discordbot"), + ), + ( + "http://github.com/Slipstreamm/discordbot", + ("github", "Slipstreamm/discordbot"), + ), + ("github.com/Slipstreamm/discordbot", ("github", "Slipstreamm/discordbot")), + ("www.github.com/Slipstreamm/discordbot", ("github", "Slipstreamm/discordbot")), + ("https://github.com/Slipstreamm/git", ("github", "Slipstreamm/git")), + ("https://gitlab.com/group/project", ("gitlab", "group/project")), + ( + "https://gitlab.com/group/subgroup/project", + ("gitlab", "group/subgroup/project"), + ), + ("invalid-url", (None, None)), + ], +) +def test_parse_repo_url(url: str, expected: Tuple[Optional[str], Optional[str]]): + assert parse_repo_url(url) == expected diff --git a/tests/test_usage_counters.py b/tests/test_usage_counters.py new file mode 100644 index 0000000..1ac9e5a --- /dev/null +++ b/tests/test_usage_counters.py @@ -0,0 +1,19 @@ +import os + + +def build_conn_string() -> str: + return ( + f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@" + f"{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT')}/" + f"{os.getenv('POSTGRES_SETTINGS_DB')}" + ) + + +def test_connection_string(monkeypatch): + monkeypatch.setenv("POSTGRES_USER", "user") + monkeypatch.setenv("POSTGRES_PASSWORD", "pass") + monkeypatch.setenv("POSTGRES_HOST", "localhost") + monkeypatch.setenv("POSTGRES_PORT", "5432") + monkeypatch.setenv("POSTGRES_SETTINGS_DB", "db") + + assert build_conn_string() == "postgresql://user:pass@localhost:5432/db"