diff --git a/tests/test_custom_bot_manager.py b/tests/test_custom_bot_manager.py new file mode 100644 index 0000000..67a6ad5 --- /dev/null +++ b/tests/test_custom_bot_manager.py @@ -0,0 +1,50 @@ +import os +import sys +import time +import asyncio +from unittest.mock import patch + +# Ensure the project root is on sys.path so we can import modules +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import custom_bot_manager as cbm + + +async def fake_start(self, token): + cbm.custom_bot_status[self.user_id] = cbm.STATUS_RUNNING + while not getattr(self, "_stop_flag", False): + await asyncio.sleep(0.01) + + +async def fake_close(self): + self._stop_flag = True + self._closed = True + + +def test_custom_bot_lifecycle(): + user_id = "test_user" + token = "fake_token" + + with patch("custom_bot_manager.commands.Bot.start", new=fake_start), patch( + "custom_bot_manager.CustomBot.close", new=fake_close + ): + success, _ = cbm.create_custom_bot(user_id, token) + assert success + assert cbm.custom_bot_status[user_id] == cbm.STATUS_STOPPED + + success, _ = cbm.run_custom_bot_in_thread(user_id, token) + assert success + assert user_id in cbm.custom_bot_threads + + # Allow the start coroutine to run + time.sleep(0.05) + assert cbm.custom_bot_status[user_id] == cbm.STATUS_RUNNING + + thread = cbm.custom_bot_threads[user_id] + success, _ = cbm.stop_custom_bot(user_id) + assert success + + # Wait for the bot thread to exit + thread.join(timeout=1.0) + assert cbm.custom_bot_status[user_id] == cbm.STATUS_STOPPED + assert user_id not in cbm.custom_bot_threads diff --git a/tests/test_git_monitor.py b/tests/test_git_monitor.py new file mode 100644 index 0000000..8a286c2 --- /dev/null +++ b/tests/test_git_monitor.py @@ -0,0 +1,52 @@ +import os +import sys +import pytest + +# Ensure the project root is on sys.path so we can import modules +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from cogs.git_monitor_cog import parse_repo_url + + +@pytest.mark.parametrize( + "url,expected", + [ + ("https://github.com/user/repo", ("github", "user/repo")), + ("http://github.com/user/repo", ("github", "user/repo")), + ("github.com/user/repo", ("github", "user/repo")), + ("https://www.github.com/user/repo/", ("github", "user/repo")), + ("https://github.com/user/repo.git", ("github", "user/repo")), + ("https://github.com/user-name/re.po", ("github", "user-name/re.po")), + ("https://gitlab.com/group/project", ("gitlab", "group/project")), + ( + "https://gitlab.com/group/subgroup/project", + ("gitlab", "group/subgroup/project"), + ), + ("gitlab.com/group/subgroup/project.git", ("gitlab", "group/subgroup/project")), + ( + "http://www.gitlab.com/group/subgroup/project/", + ("gitlab", "group/subgroup/project"), + ), + ], +) +def test_parse_repo_url_valid(url, expected): + assert parse_repo_url(url) == expected + + +@pytest.mark.parametrize( + "url", + [ + "https://github.com/", + "https://github.com/user", + "https://gitlab.com/", + "https://gitlab.com/group", + "ftp://github.com/user/repo", + "http:/github.com/user/repo", + "not a url", + "https://gitlabx.com/group/project", + "gitlab.com/group//project", + "github.com/user/repo/extra", + ], +) +def test_parse_repo_url_invalid(url): + assert parse_repo_url(url) == (None, None) diff --git a/tests/test_settings_manager.py b/tests/test_settings_manager.py new file mode 100644 index 0000000..adaeb06 --- /dev/null +++ b/tests/test_settings_manager.py @@ -0,0 +1,77 @@ +import os +import sys +import asyncio +from unittest.mock import AsyncMock, MagicMock +import pytest + +# Ensure the project root is on sys.path so we can import modules +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from settings_manager import get_guild_prefix, _get_redis_key + + +class DummyAcquire: + def __init__(self, conn, pool): + self._conn = conn + self._pool = pool + + async def __aenter__(self): + self._pool.acquire_enter_called += 1 + return self._conn + + async def __aexit__(self, exc_type, exc, tb): + pass + + +class DummyPool: + def __init__(self, conn): + self._conn = conn + self.acquire_enter_called = 0 + + def acquire(self): + return DummyAcquire(self._conn, self) + + +class DummyBot: + def __init__(self, pg_pool, redis): + self.pg_pool = pg_pool + self.redis = redis + + +def run_async(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +def test_get_guild_prefix_caching(monkeypatch): + guild_id = 123 + prefix = "!" + + # Mock Postgres connection and pool + mock_conn = MagicMock() + mock_conn.fetchval = AsyncMock(return_value=prefix) + pg_pool = DummyPool(mock_conn) + + # Mock redis client + redis_mock = MagicMock() + redis_mock.get = AsyncMock(side_effect=[None, prefix]) + redis_mock.set = AsyncMock() + + bot = DummyBot(pg_pool, redis_mock) + monkeypatch.setattr("settings_manager.get_bot_instance", lambda: bot) + + cache_key = _get_redis_key(guild_id, "prefix") + + # First call should hit Postgres and set Redis + result1 = run_async(get_guild_prefix(guild_id, "?")) + assert result1 == prefix + assert pg_pool.acquire_enter_called == 1 + assert mock_conn.fetchval.call_count == 1 + redis_mock.set.assert_called_once_with(cache_key, prefix, ex=3600) + + # Second call should use Redis and not hit Postgres again + result2 = run_async(get_guild_prefix(guild_id, "?")) + assert result2 == prefix + assert pg_pool.acquire_enter_called == 1 + assert mock_conn.fetchval.call_count == 1 + assert redis_mock.get.call_count == 2 + assert redis_mock.set.call_count == 1 diff --git a/tests/test_starboard_db.py b/tests/test_starboard_db.py new file mode 100644 index 0000000..502f726 --- /dev/null +++ b/tests/test_starboard_db.py @@ -0,0 +1,95 @@ +"""Tests for starboard database helper functions.""" + +# pylint: disable=wrong-import-position + +import os +import sys + +# Ensure project root is on sys.path +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from unittest.mock import AsyncMock, patch + +import pytest # pylint: disable=import-error + +import settings_manager # pylint: disable=import-error + + +class DummyBot: + """Simple container for a pg_pool mock.""" + + def __init__(self, pg_pool): + self.pg_pool = pg_pool + + +@pytest.mark.asyncio +async def test_create_starboard_entry(): + """Verify create_starboard_entry executes expected queries.""" + + conn = AsyncMock() + acquire_cm = AsyncMock() + acquire_cm.__aenter__.return_value = conn + acquire_cm.__aexit__.return_value = None + + pg_pool = AsyncMock() + pg_pool.acquire.return_value = acquire_cm + + bot = DummyBot(pg_pool) + with patch.object(settings_manager, "get_bot_instance", return_value=bot): + result = await settings_manager.create_starboard_entry( + guild_id=1, + original_message_id=2, + original_channel_id=3, + starboard_message_id=4, + author_id=5, + star_count=6, + ) + + assert result is True + pg_pool.acquire.assert_called_once() + assert conn.execute.await_count == 2 + + +@pytest.mark.asyncio +async def test_update_starboard_entry(): + """Verify update_starboard_entry updates star count.""" + + conn = AsyncMock() + pg_pool = AsyncMock() + pg_pool.acquire = AsyncMock(return_value=conn) + pg_pool.release = AsyncMock() + + bot = DummyBot(pg_pool) + with patch.object(settings_manager, "get_bot_instance", return_value=bot): + result = await settings_manager.update_starboard_entry( + guild_id=1, original_message_id=2, star_count=3 + ) + + assert result is True + pg_pool.acquire.assert_called_once() + conn.execute.assert_awaited_once() + pg_pool.release.assert_called_once_with(conn) + + +@pytest.mark.asyncio +async def test_get_starboard_entry(): + """Verify get_starboard_entry fetches the row and returns a dict.""" + + entry_data = {"guild_id": 1, "original_message_id": 2} + conn = AsyncMock() + conn.fetchrow = AsyncMock(return_value=entry_data) + + acquire_cm = AsyncMock() + acquire_cm.__aenter__.return_value = conn + acquire_cm.__aexit__.return_value = None + + pg_pool = AsyncMock() + pg_pool.acquire.return_value = acquire_cm + + bot = DummyBot(pg_pool) + with patch.object(settings_manager, "get_bot_instance", return_value=bot): + result = await settings_manager.get_starboard_entry(1, 2) + + assert result == entry_data + pg_pool.acquire.assert_called_once() + conn.fetchrow.assert_awaited_once()