This commit is contained in:
Slipstream 2025-06-05 22:25:44 -06:00
commit 9521c12e56
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD
18 changed files with 476 additions and 407 deletions

View File

@ -1,48 +0,0 @@
import platform
import subprocess
import wmi
# # Windows version
# def get_gpus_windows():
# w = wmi.WMI()
# gpus = w.Win32_VideoController()
# return [{'name': gpu.Name, 'driver': gpu.DriverVersion} for gpu in gpus]
# if platform.system() == 'Windows':
# print(get_gpus_windows())
# def get_glxinfo_gpu():
# try:
# output = subprocess.check_output("glxinfo | grep -i 'device\|vendor'", shell=True).decode()
# return output
# except Exception as e:
# return f"Error: {e}"
# if platform.system() == 'Linux':
# print(get_glxinfo_gpu())
# # Install pyopencl with pip if not already installed: pip install pyopencl
# import pyopencl as cl
# def get_opencl_gpus():
# platforms = cl.get_platforms()
# gpu_info = []
# for platform in platforms:
# devices = platform.get_devices(device_type=cl.device_type.GPU)
# for device in devices:
# gpu_info.append({
# 'name': device.name,
# 'vendor': device.vendor,
# 'version': device.version,
# 'global_mem_size': device.global_mem_size,
# 'max_compute_units': device.max_compute_units
# })
# return gpu_info
# print(get_opencl_gpus())
# List AMD GPUs via the ADL (AMD Display Library) bindings.
# Explicit import instead of a wildcard so the used name is visible.
from pyadl import ADLManager

devices = ADLManager.getInstance().getDevices()
for device in devices:
    # adapterIndex/adapterName are the attributes pyadl exposes per adapter.
    print(f"{device.adapterIndex}. {device.adapterName}")

View File

@ -1,52 +0,0 @@
import asyncio
import logging
import sys
import os
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
log = logging.getLogger("test_pagination")
# Add the current directory to the path so we can import the cogs
sys.path.append(os.getcwd())
from cogs.safebooru_cog import SafebooruCog
from discord.ext import commands
import discord
async def test_pagination():
    """Manually exercise SafebooruCog pagination for a single tag."""
    # Build a throwaway bot just so the cog can be constructed.
    bot = commands.Bot(command_prefix="!", intents=discord.Intents.default())
    cog = SafebooruCog(bot)
    tag = "kasane_teto"
    log.info(f"Testing pagination for tag: {tag}")
    results = await cog._fetch_posts_logic("test", tag)
    if isinstance(results, tuple):
        log.info(f"Found {len(results[1])} results")
        # Show only the first five posts.
        for idx, result in enumerate(results[1][:5], start=1):
            log.info(f"Result {idx}: {result.get('id')} - {result.get('file_url')}")
    else:
        log.error(f"Error: {results}")
    # Clean up the cog's aiohttp session if one was opened.
    session = getattr(cog, "session", None)
    if session and not session.closed:
        await session.close()
        log.info("Closed aiohttp session")


if __name__ == "__main__":
    asyncio.run(test_pagination())

View File

@ -1,20 +0,0 @@
# Smoke test for the Part constructor: import the types module and build
# one text Part and one URI Part, reporting success or the failing step.
try:
    from gurt.api import types

    print("Successfully imported types module")
    text_part = types.Part(text="test")
    print(f"Successfully created Part with text: {text_part}")
    uri_part = types.Part(uri="https://example.com", mime_type="text/plain")
    print(f"Successfully created Part with URI: {uri_part}")
    print("All tests passed!")
except Exception as e:
    print(f"Error: {type(e).__name__}: {e}")
    import traceback

    traceback.print_exc()

View File

@ -1,61 +0,0 @@
import asyncio
import discord
from discord.ext import commands
import os
from dotenv import load_dotenv
import logging
import sys
# Add the parent directory to sys.path to allow imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import the starboard cog and settings manager
from cogs.starboard_cog import StarboardCog
import settings_manager as settings_manager
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s:%(levelname)s:%(name)s: %(message)s"
)
log = logging.getLogger(__name__)
# Intents: message content and member data are needed by the starboard cog.
intents = discord.Intents.default()
intents.message_content = True
intents.members = True

# Create bot instance
bot = commands.Bot(command_prefix="!", intents=intents)


@bot.event
async def on_ready():
    """Log identity and load the starboard cog once connected."""
    log.info(f"{bot.user.name} has connected to Discord!")
    log.info(f"Bot ID: {bot.user.id}")
    try:
        await bot.add_cog(StarboardCog(bot))
    except Exception as e:
        log.error(f"Error loading StarboardCog: {e}")
    else:
        log.info("StarboardCog loaded successfully!")
async def main():
    """Start the bot using the token from the environment."""
    token = os.getenv("DISCORD_TOKEN")
    if not token:
        raise ValueError(
            "No token found. Make sure to set DISCORD_TOKEN in your .env file."
        )
    try:
        await bot.start(token)
    except Exception as e:
        # Log with traceback so startup failures are diagnosable.
        log.exception(f"Error starting bot: {e}")


if __name__ == "__main__":
    asyncio.run(main())

View File

@ -1,64 +0,0 @@
import json
import os
# Path of the JSON file that persists the random-timeout chance.
TIMEOUT_CONFIG_FILE = os.path.join("data", "timeout_config.json")


def load_timeout_config():
    """Read the timeout chance from TIMEOUT_CONFIG_FILE (default 0.005)."""
    default_chance = 0.005
    # Guard clause: no file means we fall back to the default.
    if not os.path.exists(TIMEOUT_CONFIG_FILE):
        print(f"Config file does not exist: {TIMEOUT_CONFIG_FILE}")
        return default_chance
    try:
        with open(TIMEOUT_CONFIG_FILE, "r") as f:
            data = json.load(f)
        if "timeout_chance" not in data:
            print("timeout_chance not found in config file")
            return default_chance
        chance = data["timeout_chance"]
        print(f"Loaded timeout chance: {chance}")
        return chance
    except Exception as e:
        # Best-effort loader: a corrupt file degrades to the default.
        print(f"Error loading timeout configuration: {e}")
        return default_chance
def save_timeout_config(timeout_chance):
    """Write the timeout configuration to TIMEOUT_CONFIG_FILE; True on success."""
    # target_user_id / timeout_duration are fixed alongside the variable chance.
    config_data = {
        "timeout_chance": timeout_chance,
        "target_user_id": 748405715520978965,
        "timeout_duration": 60,
    }
    try:
        # Ensure data directory exists
        os.makedirs(os.path.dirname(TIMEOUT_CONFIG_FILE), exist_ok=True)
        with open(TIMEOUT_CONFIG_FILE, "w") as f:
            json.dump(config_data, f, indent=4)
    except Exception as e:
        print(f"Error saving timeout configuration: {e}")
        return False
    print(f"Saved timeout configuration with chance: {timeout_chance}")
    return True
# Exercise the helpers when run directly: load, update, verify, restore.
if __name__ == "__main__":
    original_chance = load_timeout_config()
    print(f"Current timeout chance: {original_chance}")
    new_chance = 0.01  # 1%
    if save_timeout_config(new_chance):
        print(f"Successfully updated timeout chance to {new_chance}")
        # Re-read to verify the write actually landed on disk.
        updated_chance = load_timeout_config()
        print(f"Updated timeout chance: {updated_chance}")
    # Put the original value back so the script leaves no side effects.
    if save_timeout_config(original_chance):
        print(f"Restored timeout chance to original value: {original_chance}")

View File

@ -1,41 +0,0 @@
import re
from typing import Optional, Tuple
# Copy of the fixed parse_repo_url function
def parse_repo_url(url: str) -> Tuple[Optional[str], Optional[str]]:
    """Parse a Git repository URL into (platform, repo identifier).

    Returns ("github"|"gitlab", "owner/repo[/subgroups]") with any trailing
    ".git" or "/" stripped, or (None, None) for unrecognised URLs.
    """
    # BUG FIX: the final path segment must be lazy ("+?"); with a greedy
    # "[\w.-]+" the optional "(?:\.git)?" group always matched empty and the
    # returned repo id kept its ".git" suffix.
    github_match = re.match(
        r"^(?:https?://)?(?:www\.)?github\.com/([\w.-]+/[\w.-]+?)(?:\.git)?/?$", url
    )
    if github_match:
        return "github", github_match.group(1)
    gitlab_match = re.match(
        r"^(?:https?://)?(?:www\.)?gitlab\.com/([\w.-]+(?:/[\w.-]+)*/[\w.-]+?)(?:\.git)?/?$",
        url,
    )
    if gitlab_match:
        return "gitlab", gitlab_match.group(1)
    return None, None
# Sample URLs: both platforms, optional scheme/www, plus an invalid case.
test_urls = [
    "https://github.com/Slipstreamm/discordbot",
    "http://github.com/Slipstreamm/discordbot",
    "github.com/Slipstreamm/discordbot",
    "www.github.com/Slipstreamm/discordbot",
    "https://github.com/Slipstreamm/git",
    "https://gitlab.com/group/project",
    "https://gitlab.com/group/subgroup/project",
    "invalid-url",
]

print("Testing URL parsing with fixed regex pattern:")
print("-" * 50)
for url in test_urls:
    platform, repo_id = parse_repo_url(url)
    if platform:
        result = f"Valid: {platform}, {repo_id}"
    else:
        result = "Invalid URL"
    print(f"{url} => {result}")

View File

@ -1,121 +0,0 @@
#!/usr/bin/env python3
"""
Test script to verify usage counter functionality.
This script demonstrates how to query the usage counters table.
"""
import asyncio
import asyncpg
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
async def test_usage_counters():
    """Connect to Postgres and report on the command_usage_counters table.

    Read-only inspection: prints whether the table exists, the ten rows with
    the highest usage counts, and the total number of rows.
    """
    # Create database connection from POSTGRES_* environment variables.
    try:
        conn_string = f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT')}/{os.getenv('POSTGRES_SETTINGS_DB')}"
        conn = await asyncpg.connect(conn_string)
        print("✅ Connected to database successfully")
    except Exception as e:
        print(f"❌ Failed to connect to database: {e}")
        return
    try:
        # Check if the table exists before querying it.
        table_exists = await conn.fetchval(
            """
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'command_usage_counters'
            )
            """
        )
        if table_exists:
            print("✅ command_usage_counters table exists")
            # Get some sample data: highest-usage rows first.
            records = await conn.fetch(
                """
                SELECT user1_id, user2_id, command_name, usage_count
                FROM command_usage_counters
                ORDER BY usage_count DESC
                LIMIT 10
                """
            )
            if records:
                print("\n📊 Top 10 command usages:")
                print("User1 ID | User2 ID | Command | Count")
                print("-" * 45)
                for record in records:
                    print(
                        f"{record['user1_id']} | {record['user2_id']} | {record['command_name']} | {record['usage_count']}"
                    )
            else:
                print("📝 No usage data found yet (table is empty)")
            # Get total count of (user pair, command) rows.
            total_count = await conn.fetchval(
                "SELECT COUNT(*) FROM command_usage_counters"
            )
            print(f"\n📈 Total unique user-command combinations: {total_count}")
        else:
            print("⚠️ command_usage_counters table does not exist yet")
            print(" It will be created automatically when a command is first used")
    except Exception as e:
        print(f"❌ Error querying database: {e}")
    finally:
        # Always release the connection, even when a query failed.
        await conn.close()
        print("🔌 Database connection closed")
async def get_usage_for_users(user1_id: int, user2_id: int):
    """Print per-command usage counts for one ordered pair of users."""
    try:
        conn_string = f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT')}/{os.getenv('POSTGRES_SETTINGS_DB')}"
        conn = await asyncpg.connect(conn_string)
        records = await conn.fetch(
            """
            SELECT command_name, usage_count
            FROM command_usage_counters
            WHERE user1_id = $1 AND user2_id = $2
            ORDER BY usage_count DESC
            """,
            user1_id,
            user2_id,
        )
        if not records:
            print(f"📝 No usage data found between users {user1_id} and {user2_id}")
        else:
            print(f"\n👥 Usage between users {user1_id} and {user2_id}:")
            print("Command | Count")
            print("-" * 20)
            for record in records:
                print(f"{record['command_name']} | {record['usage_count']}")
        await conn.close()
    except Exception as e:
        print(f"❌ Error querying user data: {e}")
if __name__ == "__main__":
    print("🧪 Testing Usage Counters Functionality")
    print("=" * 40)
    # Run the read-only table inspection against the configured database.
    asyncio.run(test_usage_counters())
    # Example: Get usage for specific users (replace with actual user IDs)
    # asyncio.run(get_usage_for_users(123456789, 987654321))

View File

@ -0,0 +1,50 @@
import os
import sys
import time
import asyncio
from unittest.mock import patch
# Ensure the project root is on sys.path so we can import modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import custom_bot_manager as cbm
async def fake_start(self, token):
    """Replacement for Bot.start: mark the bot RUNNING, then idle until closed.

    Spins until something (the patched close) sets ``_stop_flag``; never
    touches the network, so the lifecycle test needs no real token.
    """
    cbm.custom_bot_status[self.user_id] = cbm.STATUS_RUNNING
    while not getattr(self, "_stop_flag", False):
        await asyncio.sleep(0.01)
async def fake_close(self):
    """Replacement for CustomBot.close: flag the fake start loop to exit."""
    for flag in ("_stop_flag", "_closed"):
        setattr(self, flag, True)
def test_custom_bot_lifecycle():
    """End-to-end lifecycle: create -> run in a thread -> stop a custom bot.

    Bot.start/CustomBot.close are patched with fakes so no Discord connection
    is made; the fake start loop spins until the fake close sets _stop_flag.
    """
    user_id = "test_user"
    token = "fake_token"
    with patch("custom_bot_manager.commands.Bot.start", new=fake_start), patch(
        "custom_bot_manager.CustomBot.close", new=fake_close
    ):
        success, _ = cbm.create_custom_bot(user_id, token)
        assert success
        # A freshly created bot reports as stopped until it is run.
        assert cbm.custom_bot_status[user_id] == cbm.STATUS_STOPPED
        success, _ = cbm.run_custom_bot_in_thread(user_id, token)
        assert success
        assert user_id in cbm.custom_bot_threads
        # Allow the start coroutine to run
        time.sleep(0.05)
        assert cbm.custom_bot_status[user_id] == cbm.STATUS_RUNNING
        # Grab the thread before stop_custom_bot removes the registry entry.
        thread = cbm.custom_bot_threads[user_id]
        success, _ = cbm.stop_custom_bot(user_id)
        assert success
        # Wait for the bot thread to exit
        thread.join(timeout=1.0)
        assert cbm.custom_bot_status[user_id] == cbm.STATUS_STOPPED
        assert user_id not in cbm.custom_bot_threads

52
tests/test_git_monitor.py Normal file
View File

@ -0,0 +1,52 @@
import os
import sys
import pytest
# Ensure the project root is on sys.path so we can import modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from cogs.git_monitor_cog import parse_repo_url
@pytest.mark.parametrize(
    "url,expected",
    [
        # GitHub: scheme, "www." and a trailing "/" or ".git" are optional.
        ("https://github.com/user/repo", ("github", "user/repo")),
        ("http://github.com/user/repo", ("github", "user/repo")),
        ("github.com/user/repo", ("github", "user/repo")),
        ("https://www.github.com/user/repo/", ("github", "user/repo")),
        ("https://github.com/user/repo.git", ("github", "user/repo")),
        ("https://github.com/user-name/re.po", ("github", "user-name/re.po")),
        # GitLab: nested subgroups are allowed in the repo identifier.
        ("https://gitlab.com/group/project", ("gitlab", "group/project")),
        (
            "https://gitlab.com/group/subgroup/project",
            ("gitlab", "group/subgroup/project"),
        ),
        ("gitlab.com/group/subgroup/project.git", ("gitlab", "group/subgroup/project")),
        (
            "http://www.gitlab.com/group/subgroup/project/",
            ("gitlab", "group/subgroup/project"),
        ),
    ],
)
def test_parse_repo_url_valid(url, expected):
    """Recognised GitHub/GitLab URLs parse to (platform, repo_id)."""
    assert parse_repo_url(url) == expected
@pytest.mark.parametrize(
    "url",
    [
        # Missing repository path components.
        "https://github.com/",
        "https://github.com/user",
        "https://gitlab.com/",
        "https://gitlab.com/group",
        # Unsupported or malformed scheme.
        "ftp://github.com/user/repo",
        "http:/github.com/user/repo",
        "not a url",
        # Wrong host, empty path segment, extra segment for GitHub.
        "https://gitlabx.com/group/project",
        "gitlab.com/group//project",
        "github.com/user/repo/extra",
    ],
)
def test_parse_repo_url_invalid(url):
    """Unrecognised or malformed URLs yield (None, None)."""
    assert parse_repo_url(url) == (None, None)

26
tests/test_gputil.py Normal file
View File

@ -0,0 +1,26 @@
import pytest
try:
import pyadl
except ImportError: # pragma: no cover - dependency optional
pyadl = None
class FakeDevice:
    """Minimal stand-in for a pyadl adapter object."""

    def __init__(self, index: int, name: str):
        # Mirror the attribute names the real pyadl devices expose.
        self.adapterName = name
        self.adapterIndex = index
class FakeManager:
    """Fake ADLManager returning two canned adapters."""

    def getDevices(self):
        names = ["GPU0", "GPU1"]
        return [FakeDevice(index, name) for index, name in enumerate(names)]
@pytest.mark.skipif(pyadl is None, reason="pyadl not installed")
def test_pyadl_devices(monkeypatch):
    """getDevices() on a patched ADLManager yields the two fake adapters."""
    monkeypatch.setattr(pyadl.ADLManager, "getInstance", lambda: FakeManager())
    found = pyadl.ADLManager.getInstance().getDevices()
    assert len(found) == 2
    assert found[0].adapterName == "GPU0"

49
tests/test_pagination.py Normal file
View File

@ -0,0 +1,49 @@
import asyncio
import pytest
import discord
from discord.ext import commands
from cogs.safebooru_cog import SafebooruCog
class MockResponse:
    """Async-context-manager stand-in for an aiohttp response."""

    def __init__(self, status: int, data):
        self.status = status
        self._payload = data

    async def json(self):
        # Return the canned payload instead of parsing a body.
        return self._payload

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Never suppress exceptions raised inside the context.
        return False
class MockSession:
    """Minimal aiohttp.ClientSession replacement returning canned JSON."""

    def __init__(self, data):
        self.closed = False
        self.data = data

    def get(self, *args, **kwargs):
        # Every GET succeeds with status 200 and the canned payload.
        return MockResponse(200, self.data)

    async def close(self):
        self.closed = True
@pytest.mark.asyncio
async def test_fetch_posts_logic(monkeypatch):
    """_fetch_posts_logic returns the parsed post list from the mocked API."""
    intents = discord.Intents.none()
    bot = commands.Bot(command_prefix="!", intents=intents)
    cog = SafebooruCog(bot)
    mock_data = [{"id": "1", "file_url": "http://example.com/image.jpg"}]
    # Swap the cog's aiohttp session for a canned-response mock so no
    # network request is made.
    monkeypatch.setattr(cog, "session", MockSession(mock_data))
    results = await cog._fetch_posts_logic(
        "test", "tag", pid_override=0, limit_override=1
    )
    # On success the cog returns the raw list of post dicts.
    assert isinstance(results, list)
    assert results[0]["id"] == "1"

18
tests/test_part.py Normal file
View File

@ -0,0 +1,18 @@
import sys
import pytest
def test_part_constructors():
    """Part accepts both text and uri/mime_type constructor forms."""
    try:
        import google.generativeai as generativeai

        # Alias the module so gurt.api can import it as google.genai.
        sys.modules.setdefault("google.genai", generativeai)
        from gurt.api import types
    except Exception as e:  # pragma: no cover - skip if dependencies missing
        pytest.skip(f"gurt.api unavailable: {e}")
    text_part = types.Part(text="test")
    assert text_part
    uri_part = types.Part(uri="https://example.com", mime_type="text/plain")
    assert getattr(uri_part, "uri", None) == "https://example.com"

View File

@ -0,0 +1,77 @@
import os
import sys
import asyncio
from unittest.mock import AsyncMock, MagicMock
import pytest
# Ensure the project root is on sys.path so we can import modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from settings_manager import get_guild_prefix, _get_redis_key
class DummyAcquire:
    """Async context manager mimicking asyncpg's pool.acquire()."""

    def __init__(self, conn, pool):
        self._conn = conn
        self._pool = pool

    async def __aenter__(self):
        # Count entries so tests can assert how often Postgres was hit.
        self._pool.acquire_enter_called += 1
        return self._conn

    async def __aexit__(self, exc_type, exc, tb):
        pass
class DummyPool:
    """Fake asyncpg pool that hands out a single canned connection."""

    def __init__(self, conn):
        self._conn = conn
        # Number of times the acquire() context manager was entered.
        self.acquire_enter_called = 0

    def acquire(self):
        return DummyAcquire(self._conn, self)
class DummyBot:
    """Container exposing the pg_pool/redis attributes settings_manager reads."""

    def __init__(self, pg_pool, redis):
        self.redis = redis
        self.pg_pool = pg_pool
def run_async(coro):
    """Run *coro* to completion on a fresh event loop and return its result.

    Uses asyncio.run instead of get_event_loop().run_until_complete: the
    latter is deprecated for this use since Python 3.10 and fails when no
    loop exists in the current thread.
    """
    return asyncio.run(coro)
def test_get_guild_prefix_caching(monkeypatch):
    """First lookup hits Postgres and warms Redis; second is served from Redis."""
    guild_id = 123
    prefix = "!"
    # Mock Postgres connection and pool
    mock_conn = MagicMock()
    mock_conn.fetchval = AsyncMock(return_value=prefix)
    pg_pool = DummyPool(mock_conn)
    # Mock redis client: miss on the first get, hit on the second.
    redis_mock = MagicMock()
    redis_mock.get = AsyncMock(side_effect=[None, prefix])
    redis_mock.set = AsyncMock()
    bot = DummyBot(pg_pool, redis_mock)
    monkeypatch.setattr("settings_manager.get_bot_instance", lambda: bot)
    cache_key = _get_redis_key(guild_id, "prefix")
    # First call should hit Postgres and set Redis
    result1 = run_async(get_guild_prefix(guild_id, "?"))
    assert result1 == prefix
    assert pg_pool.acquire_enter_called == 1
    assert mock_conn.fetchval.call_count == 1
    # The prefix is cached with a one-hour TTL.
    redis_mock.set.assert_called_once_with(cache_key, prefix, ex=3600)
    # Second call should use Redis and not hit Postgres again
    result2 = run_async(get_guild_prefix(guild_id, "?"))
    assert result2 == prefix
    assert pg_pool.acquire_enter_called == 1
    assert mock_conn.fetchval.call_count == 1
    assert redis_mock.get.call_count == 2
    assert redis_mock.set.call_count == 1

14
tests/test_starboard.py Normal file
View File

@ -0,0 +1,14 @@
import pytest
import discord
from discord.ext import commands
from cogs.starboard_cog import StarboardCog
@pytest.mark.asyncio
async def test_starboard_cog_load():
    """The StarboardCog can be constructed and registered on a bare bot."""
    bot = commands.Bot(command_prefix="!", intents=discord.Intents.none())
    cog = StarboardCog(bot)
    await bot.add_cog(cog)
    assert cog in bot.cogs.values()

View File

@ -0,0 +1,95 @@
"""Tests for starboard database helper functions."""
# pylint: disable=wrong-import-position
import os
import sys
# Ensure project root is on sys.path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from unittest.mock import AsyncMock, patch
import pytest # pylint: disable=import-error
import settings_manager # pylint: disable=import-error
class DummyBot:
    """Simple container for a pg_pool mock."""

    def __init__(self, pg_pool):
        # The only attribute settings_manager reads off the bot in these tests.
        self.pg_pool = pg_pool
@pytest.mark.asyncio
async def test_create_starboard_entry():
    """Verify create_starboard_entry executes expected queries."""
    conn = AsyncMock()
    # pool.acquire() is used as an async context manager yielding the
    # connection, so wire __aenter__/__aexit__ on a mock context manager.
    acquire_cm = AsyncMock()
    acquire_cm.__aenter__.return_value = conn
    acquire_cm.__aexit__.return_value = None
    pg_pool = AsyncMock()
    pg_pool.acquire.return_value = acquire_cm
    bot = DummyBot(pg_pool)
    with patch.object(settings_manager, "get_bot_instance", return_value=bot):
        result = await settings_manager.create_starboard_entry(
            guild_id=1,
            original_message_id=2,
            original_channel_id=3,
            starboard_message_id=4,
            author_id=5,
            star_count=6,
        )
    assert result is True
    pg_pool.acquire.assert_called_once()
    # Two statements are expected — presumably the entry insert plus a
    # follow-up write; NOTE(review): confirm against settings_manager.
    assert conn.execute.await_count == 2
@pytest.mark.asyncio
async def test_update_starboard_entry():
    """Verify update_starboard_entry updates star count."""
    conn = AsyncMock()
    # acquire()/release() are awaited directly here rather than used as a
    # context manager; NOTE(review): assumes update_starboard_entry uses that
    # acquisition style — confirm against settings_manager.
    pg_pool = AsyncMock()
    pg_pool.acquire = AsyncMock(return_value=conn)
    pg_pool.release = AsyncMock()
    bot = DummyBot(pg_pool)
    with patch.object(settings_manager, "get_bot_instance", return_value=bot):
        result = await settings_manager.update_starboard_entry(
            guild_id=1, original_message_id=2, star_count=3
        )
    assert result is True
    pg_pool.acquire.assert_called_once()
    conn.execute.assert_awaited_once()
    # The connection must be handed back to the pool exactly once.
    pg_pool.release.assert_called_once_with(conn)
@pytest.mark.asyncio
async def test_get_starboard_entry():
    """Verify get_starboard_entry fetches the row and returns a dict."""
    entry_data = {"guild_id": 1, "original_message_id": 2}
    conn = AsyncMock()
    conn.fetchrow = AsyncMock(return_value=entry_data)
    # pool.acquire() used as an async context manager yielding the connection.
    acquire_cm = AsyncMock()
    acquire_cm.__aenter__.return_value = conn
    acquire_cm.__aexit__.return_value = None
    pg_pool = AsyncMock()
    pg_pool.acquire.return_value = acquire_cm
    bot = DummyBot(pg_pool)
    with patch.object(settings_manager, "get_bot_instance", return_value=bot):
        result = await settings_manager.get_starboard_entry(1, 2)
    assert result == entry_data
    pg_pool.acquire.assert_called_once()
    conn.fetchrow.assert_awaited_once()

View File

@ -0,0 +1,29 @@
import json
from pathlib import Path
def load_timeout_config(path: Path) -> float:
    """Return the timeout chance stored at *path*, or 0.005 when absent."""
    default_chance = 0.005
    if not path.exists():
        return default_chance
    data = json.loads(path.read_text())
    # Missing key falls back to the default; malformed JSON propagates.
    return data.get("timeout_chance", default_chance)
def save_timeout_config(path: Path, timeout_chance: float) -> None:
    """Write the timeout config (chance plus fixed target/duration) to *path*."""
    # Create any missing parent directories before writing.
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "timeout_chance": timeout_chance,
        "target_user_id": 748405715520978965,
        "timeout_duration": 60,
    }
    path.write_text(json.dumps(payload, indent=4))
def test_timeout_config_roundtrip(tmp_path: Path):
    """Saving then loading returns exactly the value that was written."""
    cfg_path = tmp_path / "timeout_config.json"
    save_timeout_config(cfg_path, 0.01)
    assert cfg_path.exists()
    assert load_timeout_config(cfg_path) == 0.01

47
tests/test_url_parser.py Normal file
View File

@ -0,0 +1,47 @@
import re
from typing import Optional, Tuple
import pytest
def parse_repo_url(url: str) -> Tuple[Optional[str], Optional[str]]:
    """Parses a Git repository URL and returns platform and repo id.

    A trailing ".git" or "/" is stripped from the repo id; unrecognised
    URLs yield (None, None).
    """
    # BUG FIX: the last path segment is matched lazily ("+?") so the optional
    # "(?:\.git)?" group can consume a trailing ".git"; with a greedy class
    # the suffix was absorbed into the captured repo id.
    github_match = re.match(
        r"^(?:https?://)?(?:www\.)?github\.com/([\w.-]+/[\w.-]+?)(?:\.git)?/?$",
        url,
    )
    if github_match:
        return "github", github_match.group(1)
    gitlab_match = re.match(
        r"^(?:https?://)?(?:www\.)?gitlab\.com/([\w.-]+(?:/[\w.-]+)*/[\w.-]+?)(?:\.git)?/?$",
        url,
    )
    if gitlab_match:
        return "gitlab", gitlab_match.group(1)
    return None, None
@pytest.mark.parametrize(
    "url,expected",
    [
        # GitHub URLs: scheme and "www." prefixes are optional.
        (
            "https://github.com/Slipstreamm/discordbot",
            ("github", "Slipstreamm/discordbot"),
        ),
        (
            "http://github.com/Slipstreamm/discordbot",
            ("github", "Slipstreamm/discordbot"),
        ),
        ("github.com/Slipstreamm/discordbot", ("github", "Slipstreamm/discordbot")),
        ("www.github.com/Slipstreamm/discordbot", ("github", "Slipstreamm/discordbot")),
        ("https://github.com/Slipstreamm/git", ("github", "Slipstreamm/git")),
        # GitLab URLs: subgroups are part of the repo identifier.
        ("https://gitlab.com/group/project", ("gitlab", "group/project")),
        (
            "https://gitlab.com/group/subgroup/project",
            ("gitlab", "group/subgroup/project"),
        ),
        # Anything unrecognised parses to (None, None).
        ("invalid-url", (None, None)),
    ],
)
def test_parse_repo_url(url: str, expected: Tuple[Optional[str], Optional[str]]):
    """parse_repo_url classifies each URL and extracts the repo identifier."""
    assert parse_repo_url(url) == expected

View File

@ -0,0 +1,19 @@
import os
def build_conn_string() -> str:
    """Assemble the Postgres DSN from the POSTGRES_* environment variables."""
    user = os.getenv("POSTGRES_USER")
    password = os.getenv("POSTGRES_PASSWORD")
    host = os.getenv("POSTGRES_HOST")
    port = os.getenv("POSTGRES_PORT")
    database = os.getenv("POSTGRES_SETTINGS_DB")
    # Unset variables render as the literal "None", matching os.getenv's default.
    return f"postgresql://{user}:{password}@{host}:{port}/{database}"
def test_connection_string(monkeypatch):
    """The DSN is assembled from all five POSTGRES_* variables."""
    env = {
        "POSTGRES_USER": "user",
        "POSTGRES_PASSWORD": "pass",
        "POSTGRES_HOST": "localhost",
        "POSTGRES_PORT": "5432",
        "POSTGRES_SETTINGS_DB": "db",
    }
    for key, value in env.items():
        monkeypatch.setenv(key, value)
    assert build_conn_string() == "postgresql://user:pass@localhost:5432/db"