"""Tests for the Redis-backed caching behaviour of ``get_guild_prefix``."""

import asyncio
import os
import sys
from unittest.mock import AsyncMock, MagicMock

import pytest

# Put the parent of this file's directory on sys.path so that
# ``settings_manager`` can be imported when the tests are run directly.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from settings_manager import get_guild_prefix, _get_redis_key  # noqa: E402


class DummyAcquire:
    """Async context manager returned by ``DummyPool.acquire()``.

    Entering it hands back the stored connection and bumps the owning
    pool's ``acquire_enter_called`` counter, which lets a test assert how
    many times a pooled connection was actually checked out.
    """

    def __init__(self, conn, pool):
        self._conn = conn
        self._pool = pool

    async def __aenter__(self):
        self._pool.acquire_enter_called += 1
        return self._conn

    async def __aexit__(self, exc_type, exc, tb):
        # Nothing to release; returning None lets exceptions propagate.
        pass


class DummyPool:
    """Minimal stand-in for a connection pool exposing ``acquire()``."""

    def __init__(self, conn):
        self._conn = conn
        # Incremented by DummyAcquire.__aenter__ on every checkout.
        self.acquire_enter_called = 0

    def acquire(self):
        """Return an async context manager that yields the stored connection."""
        return DummyAcquire(self._conn, self)


class DummyBot:
    """Bare object carrying the ``pg_pool`` and ``redis`` attributes."""

    def __init__(self, pg_pool, redis):
        self.pg_pool = pg_pool
        self.redis = redis


def run_async(coro):
    """Run *coro* to completion on a fresh event loop and return its result.

    A dedicated loop is created explicitly rather than obtained through
    ``asyncio.get_event_loop()``: relying on that call to create a loop
    implicitly is deprecated and fails on newer Python versions when no
    current loop is set. The loop is always closed afterwards so no loop
    leaks from one test into the next.
    """
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


def test_get_guild_prefix_caching(monkeypatch):
    """A cache miss reads Postgres and fills Redis; a cache hit skips Postgres."""
    guild_id = 123
    prefix = "!"

    # Mock Postgres connection and pool
    mock_conn = MagicMock()
    mock_conn.fetchval = AsyncMock(return_value=prefix)
    pg_pool = DummyPool(mock_conn)

    # Mock redis client: first lookup misses (None), second one hits.
    redis_mock = MagicMock()
    redis_mock.get = AsyncMock(side_effect=[None, prefix])
    redis_mock.set = AsyncMock()

    bot = DummyBot(pg_pool, redis_mock)
    monkeypatch.setattr("settings_manager.get_bot_instance", lambda: bot)

    cache_key = _get_redis_key(guild_id, "prefix")

    # First call should hit Postgres and set Redis
    result1 = run_async(get_guild_prefix(guild_id, "?"))
    assert result1 == prefix
    assert pg_pool.acquire_enter_called == 1
    assert mock_conn.fetchval.call_count == 1
    redis_mock.set.assert_called_once_with(cache_key, prefix, ex=3600)

    # Second call should use Redis and not hit Postgres again
    result2 = run_async(get_guild_prefix(guild_id, "?"))
    assert result2 == prefix
    assert pg_pool.acquire_enter_called == 1
    assert mock_conn.fetchval.call_count == 1
    assert redis_mock.get.call_count == 2
    assert redis_mock.set.call_count == 1