Merge remote-tracking branch 'origin/master'
This commit is contained in:
commit
8452205132
50
tests/test_custom_bot_manager.py
Normal file
50
tests/test_custom_bot_manager.py
Normal file
@ -0,0 +1,50 @@
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import asyncio
|
||||
from unittest.mock import patch
|
||||
|
||||
# Ensure the project root is on sys.path so we can import modules
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
import custom_bot_manager as cbm
|
||||
|
||||
|
||||
async def fake_start(self, token):
|
||||
cbm.custom_bot_status[self.user_id] = cbm.STATUS_RUNNING
|
||||
while not getattr(self, "_stop_flag", False):
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
|
||||
async def fake_close(self):
|
||||
self._stop_flag = True
|
||||
self._closed = True
|
||||
|
||||
|
||||
def test_custom_bot_lifecycle():
|
||||
user_id = "test_user"
|
||||
token = "fake_token"
|
||||
|
||||
with patch("custom_bot_manager.commands.Bot.start", new=fake_start), patch(
|
||||
"custom_bot_manager.CustomBot.close", new=fake_close
|
||||
):
|
||||
success, _ = cbm.create_custom_bot(user_id, token)
|
||||
assert success
|
||||
assert cbm.custom_bot_status[user_id] == cbm.STATUS_STOPPED
|
||||
|
||||
success, _ = cbm.run_custom_bot_in_thread(user_id, token)
|
||||
assert success
|
||||
assert user_id in cbm.custom_bot_threads
|
||||
|
||||
# Allow the start coroutine to run
|
||||
time.sleep(0.05)
|
||||
assert cbm.custom_bot_status[user_id] == cbm.STATUS_RUNNING
|
||||
|
||||
thread = cbm.custom_bot_threads[user_id]
|
||||
success, _ = cbm.stop_custom_bot(user_id)
|
||||
assert success
|
||||
|
||||
# Wait for the bot thread to exit
|
||||
thread.join(timeout=1.0)
|
||||
assert cbm.custom_bot_status[user_id] == cbm.STATUS_STOPPED
|
||||
assert user_id not in cbm.custom_bot_threads
|
77
tests/test_settings_manager.py
Normal file
77
tests/test_settings_manager.py
Normal file
@ -0,0 +1,77 @@
|
||||
import os
|
||||
import sys
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
import pytest
|
||||
|
||||
# Ensure the project root is on sys.path so we can import modules
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from settings_manager import get_guild_prefix, _get_redis_key
|
||||
|
||||
|
||||
class DummyAcquire:
|
||||
def __init__(self, conn, pool):
|
||||
self._conn = conn
|
||||
self._pool = pool
|
||||
|
||||
async def __aenter__(self):
|
||||
self._pool.acquire_enter_called += 1
|
||||
return self._conn
|
||||
|
||||
async def __aexit__(self, exc_type, exc, tb):
|
||||
pass
|
||||
|
||||
|
||||
class DummyPool:
|
||||
def __init__(self, conn):
|
||||
self._conn = conn
|
||||
self.acquire_enter_called = 0
|
||||
|
||||
def acquire(self):
|
||||
return DummyAcquire(self._conn, self)
|
||||
|
||||
|
||||
class DummyBot:
|
||||
def __init__(self, pg_pool, redis):
|
||||
self.pg_pool = pg_pool
|
||||
self.redis = redis
|
||||
|
||||
|
||||
def run_async(coro):
|
||||
return asyncio.get_event_loop().run_until_complete(coro)
|
||||
|
||||
|
||||
def test_get_guild_prefix_caching(monkeypatch):
|
||||
guild_id = 123
|
||||
prefix = "!"
|
||||
|
||||
# Mock Postgres connection and pool
|
||||
mock_conn = MagicMock()
|
||||
mock_conn.fetchval = AsyncMock(return_value=prefix)
|
||||
pg_pool = DummyPool(mock_conn)
|
||||
|
||||
# Mock redis client
|
||||
redis_mock = MagicMock()
|
||||
redis_mock.get = AsyncMock(side_effect=[None, prefix])
|
||||
redis_mock.set = AsyncMock()
|
||||
|
||||
bot = DummyBot(pg_pool, redis_mock)
|
||||
monkeypatch.setattr("settings_manager.get_bot_instance", lambda: bot)
|
||||
|
||||
cache_key = _get_redis_key(guild_id, "prefix")
|
||||
|
||||
# First call should hit Postgres and set Redis
|
||||
result1 = run_async(get_guild_prefix(guild_id, "?"))
|
||||
assert result1 == prefix
|
||||
assert pg_pool.acquire_enter_called == 1
|
||||
assert mock_conn.fetchval.call_count == 1
|
||||
redis_mock.set.assert_called_once_with(cache_key, prefix, ex=3600)
|
||||
|
||||
# Second call should use Redis and not hit Postgres again
|
||||
result2 = run_async(get_guild_prefix(guild_id, "?"))
|
||||
assert result2 == prefix
|
||||
assert pg_pool.acquire_enter_called == 1
|
||||
assert mock_conn.fetchval.call_count == 1
|
||||
assert redis_mock.get.call_count == 2
|
||||
assert redis_mock.set.call_count == 1
|
Loading…
x
Reference in New Issue
Block a user