Merge remote-tracking branch 'origin/master'

This commit is contained in:
Slipstream 2025-06-06 04:22:56 +00:00
commit 97d4fc8ba8
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD
4 changed files with 274 additions and 0 deletions

View File

@ -0,0 +1,50 @@
import os
import sys
import time
import asyncio
from unittest.mock import patch
# Ensure the project root is on sys.path so we can import modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import custom_bot_manager as cbm
async def fake_start(self, token):
cbm.custom_bot_status[self.user_id] = cbm.STATUS_RUNNING
while not getattr(self, "_stop_flag", False):
await asyncio.sleep(0.01)
async def fake_close(self):
self._stop_flag = True
self._closed = True
def test_custom_bot_lifecycle():
user_id = "test_user"
token = "fake_token"
with patch("custom_bot_manager.commands.Bot.start", new=fake_start), patch(
"custom_bot_manager.CustomBot.close", new=fake_close
):
success, _ = cbm.create_custom_bot(user_id, token)
assert success
assert cbm.custom_bot_status[user_id] == cbm.STATUS_STOPPED
success, _ = cbm.run_custom_bot_in_thread(user_id, token)
assert success
assert user_id in cbm.custom_bot_threads
# Allow the start coroutine to run
time.sleep(0.05)
assert cbm.custom_bot_status[user_id] == cbm.STATUS_RUNNING
thread = cbm.custom_bot_threads[user_id]
success, _ = cbm.stop_custom_bot(user_id)
assert success
# Wait for the bot thread to exit
thread.join(timeout=1.0)
assert cbm.custom_bot_status[user_id] == cbm.STATUS_STOPPED
assert user_id not in cbm.custom_bot_threads

52
tests/test_git_monitor.py Normal file
View File

@ -0,0 +1,52 @@
import os
import sys
import pytest
# Ensure the project root is on sys.path so we can import modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cogs.git_monitor_cog import parse_repo_url
@pytest.mark.parametrize(
"url,expected",
[
("https://github.com/user/repo", ("github", "user/repo")),
("http://github.com/user/repo", ("github", "user/repo")),
("github.com/user/repo", ("github", "user/repo")),
("https://www.github.com/user/repo/", ("github", "user/repo")),
("https://github.com/user/repo.git", ("github", "user/repo")),
("https://github.com/user-name/re.po", ("github", "user-name/re.po")),
("https://gitlab.com/group/project", ("gitlab", "group/project")),
(
"https://gitlab.com/group/subgroup/project",
("gitlab", "group/subgroup/project"),
),
("gitlab.com/group/subgroup/project.git", ("gitlab", "group/subgroup/project")),
(
"http://www.gitlab.com/group/subgroup/project/",
("gitlab", "group/subgroup/project"),
),
],
)
def test_parse_repo_url_valid(url, expected):
assert parse_repo_url(url) == expected
@pytest.mark.parametrize(
"url",
[
"https://github.com/",
"https://github.com/user",
"https://gitlab.com/",
"https://gitlab.com/group",
"ftp://github.com/user/repo",
"http:/github.com/user/repo",
"not a url",
"https://gitlabx.com/group/project",
"gitlab.com/group//project",
"github.com/user/repo/extra",
],
)
def test_parse_repo_url_invalid(url):
assert parse_repo_url(url) == (None, None)

View File

@ -0,0 +1,77 @@
import os
import sys
import asyncio
from unittest.mock import AsyncMock, MagicMock
import pytest
# Ensure the project root is on sys.path so we can import modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from settings_manager import get_guild_prefix, _get_redis_key
class DummyAcquire:
def __init__(self, conn, pool):
self._conn = conn
self._pool = pool
async def __aenter__(self):
self._pool.acquire_enter_called += 1
return self._conn
async def __aexit__(self, exc_type, exc, tb):
pass
class DummyPool:
def __init__(self, conn):
self._conn = conn
self.acquire_enter_called = 0
def acquire(self):
return DummyAcquire(self._conn, self)
class DummyBot:
def __init__(self, pg_pool, redis):
self.pg_pool = pg_pool
self.redis = redis
def run_async(coro):
return asyncio.get_event_loop().run_until_complete(coro)
def test_get_guild_prefix_caching(monkeypatch):
guild_id = 123
prefix = "!"
# Mock Postgres connection and pool
mock_conn = MagicMock()
mock_conn.fetchval = AsyncMock(return_value=prefix)
pg_pool = DummyPool(mock_conn)
# Mock redis client
redis_mock = MagicMock()
redis_mock.get = AsyncMock(side_effect=[None, prefix])
redis_mock.set = AsyncMock()
bot = DummyBot(pg_pool, redis_mock)
monkeypatch.setattr("settings_manager.get_bot_instance", lambda: bot)
cache_key = _get_redis_key(guild_id, "prefix")
# First call should hit Postgres and set Redis
result1 = run_async(get_guild_prefix(guild_id, "?"))
assert result1 == prefix
assert pg_pool.acquire_enter_called == 1
assert mock_conn.fetchval.call_count == 1
redis_mock.set.assert_called_once_with(cache_key, prefix, ex=3600)
# Second call should use Redis and not hit Postgres again
result2 = run_async(get_guild_prefix(guild_id, "?"))
assert result2 == prefix
assert pg_pool.acquire_enter_called == 1
assert mock_conn.fetchval.call_count == 1
assert redis_mock.get.call_count == 2
assert redis_mock.set.call_count == 1

View File

@ -0,0 +1,95 @@
"""Tests for starboard database helper functions."""
# pylint: disable=wrong-import-position
import os
import sys
# Ensure project root is on sys.path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from unittest.mock import AsyncMock, patch
import pytest # pylint: disable=import-error
import settings_manager # pylint: disable=import-error
class DummyBot:
"""Simple container for a pg_pool mock."""
def __init__(self, pg_pool):
self.pg_pool = pg_pool
@pytest.mark.asyncio
async def test_create_starboard_entry():
"""Verify create_starboard_entry executes expected queries."""
conn = AsyncMock()
acquire_cm = AsyncMock()
acquire_cm.__aenter__.return_value = conn
acquire_cm.__aexit__.return_value = None
pg_pool = AsyncMock()
pg_pool.acquire.return_value = acquire_cm
bot = DummyBot(pg_pool)
with patch.object(settings_manager, "get_bot_instance", return_value=bot):
result = await settings_manager.create_starboard_entry(
guild_id=1,
original_message_id=2,
original_channel_id=3,
starboard_message_id=4,
author_id=5,
star_count=6,
)
assert result is True
pg_pool.acquire.assert_called_once()
assert conn.execute.await_count == 2
@pytest.mark.asyncio
async def test_update_starboard_entry():
"""Verify update_starboard_entry updates star count."""
conn = AsyncMock()
pg_pool = AsyncMock()
pg_pool.acquire = AsyncMock(return_value=conn)
pg_pool.release = AsyncMock()
bot = DummyBot(pg_pool)
with patch.object(settings_manager, "get_bot_instance", return_value=bot):
result = await settings_manager.update_starboard_entry(
guild_id=1, original_message_id=2, star_count=3
)
assert result is True
pg_pool.acquire.assert_called_once()
conn.execute.assert_awaited_once()
pg_pool.release.assert_called_once_with(conn)
@pytest.mark.asyncio
async def test_get_starboard_entry():
"""Verify get_starboard_entry fetches the row and returns a dict."""
entry_data = {"guild_id": 1, "original_message_id": 2}
conn = AsyncMock()
conn.fetchrow = AsyncMock(return_value=entry_data)
acquire_cm = AsyncMock()
acquire_cm.__aenter__.return_value = conn
acquire_cm.__aexit__.return_value = None
pg_pool = AsyncMock()
pg_pool.acquire.return_value = acquire_cm
bot = DummyBot(pg_pool)
with patch.object(settings_manager, "get_bot_instance", return_value=bot):
result = await settings_manager.get_starboard_entry(1, 2)
assert result == entry_data
pg_pool.acquire.assert_called_once()
conn.fetchrow.assert_awaited_once()