diff --git a/tests/test_custom_bot_manager.py b/tests/test_custom_bot_manager.py new file mode 100644 index 0000000..67a6ad5 --- /dev/null +++ b/tests/test_custom_bot_manager.py @@ -0,0 +1,50 @@ +import os +import sys +import time +import asyncio +from unittest.mock import patch + +# Ensure the project root is on sys.path so we can import modules +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import custom_bot_manager as cbm + + +async def fake_start(self, token): + cbm.custom_bot_status[self.user_id] = cbm.STATUS_RUNNING + while not getattr(self, "_stop_flag", False): + await asyncio.sleep(0.01) + + +async def fake_close(self): + self._stop_flag = True + self._closed = True + + +def test_custom_bot_lifecycle(): + user_id = "test_user" + token = "fake_token" + + with patch("custom_bot_manager.commands.Bot.start", new=fake_start), patch( + "custom_bot_manager.CustomBot.close", new=fake_close + ): + success, _ = cbm.create_custom_bot(user_id, token) + assert success + assert cbm.custom_bot_status[user_id] == cbm.STATUS_STOPPED + + success, _ = cbm.run_custom_bot_in_thread(user_id, token) + assert success + assert user_id in cbm.custom_bot_threads + + # Allow the start coroutine to run + time.sleep(0.05) + assert cbm.custom_bot_status[user_id] == cbm.STATUS_RUNNING + + thread = cbm.custom_bot_threads[user_id] + success, _ = cbm.stop_custom_bot(user_id) + assert success + + # Wait for the bot thread to exit + thread.join(timeout=1.0) + assert cbm.custom_bot_status[user_id] == cbm.STATUS_STOPPED + assert user_id not in cbm.custom_bot_threads diff --git a/tests/test_settings_manager.py b/tests/test_settings_manager.py new file mode 100644 index 0000000..adaeb06 --- /dev/null +++ b/tests/test_settings_manager.py @@ -0,0 +1,77 @@ +import os +import sys +import asyncio +from unittest.mock import AsyncMock, MagicMock +import pytest + +# Ensure the project root is on sys.path so we can import modules +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from settings_manager import get_guild_prefix, _get_redis_key + + +class DummyAcquire: + def __init__(self, conn, pool): + self._conn = conn + self._pool = pool + + async def __aenter__(self): + self._pool.acquire_enter_called += 1 + return self._conn + + async def __aexit__(self, exc_type, exc, tb): + pass + + +class DummyPool: + def __init__(self, conn): + self._conn = conn + self.acquire_enter_called = 0 + + def acquire(self): + return DummyAcquire(self._conn, self) + + +class DummyBot: + def __init__(self, pg_pool, redis): + self.pg_pool = pg_pool + self.redis = redis + + +def run_async(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +def test_get_guild_prefix_caching(monkeypatch): + guild_id = 123 + prefix = "!" + + # Mock Postgres connection and pool + mock_conn = MagicMock() + mock_conn.fetchval = AsyncMock(return_value=prefix) + pg_pool = DummyPool(mock_conn) + + # Mock redis client + redis_mock = MagicMock() + redis_mock.get = AsyncMock(side_effect=[None, prefix]) + redis_mock.set = AsyncMock() + + bot = DummyBot(pg_pool, redis_mock) + monkeypatch.setattr("settings_manager.get_bot_instance", lambda: bot) + + cache_key = _get_redis_key(guild_id, "prefix") + + # First call should hit Postgres and set Redis + result1 = run_async(get_guild_prefix(guild_id, "?")) + assert result1 == prefix + assert pg_pool.acquire_enter_called == 1 + assert mock_conn.fetchval.call_count == 1 + redis_mock.set.assert_called_once_with(cache_key, prefix, ex=3600) + + # Second call should use Redis and not hit Postgres again + result2 = run_async(get_guild_prefix(guild_id, "?")) + assert result2 == prefix + assert pg_pool.acquire_enter_called == 1 + assert mock_conn.fetchval.call_count == 1 + assert redis_mock.get.call_count == 2 + assert redis_mock.set.call_count == 1