Move scripts to pytest

This commit is contained in:
Codex 2025-06-06 04:22:31 +00:00 committed by Slipstream
parent 33c545de7b
commit 075461246f
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD
14 changed files with 202 additions and 407 deletions

View File

@ -1,48 +0,0 @@
import platform
import subprocess
import wmi
# # Windows version
# def get_gpus_windows():
# w = wmi.WMI()
# gpus = w.Win32_VideoController()
# return [{'name': gpu.Name, 'driver': gpu.DriverVersion} for gpu in gpus]
# if platform.system() == 'Windows':
# print(get_gpus_windows())
# def get_glxinfo_gpu():
# try:
# output = subprocess.check_output("glxinfo | grep -i 'device\|vendor'", shell=True).decode()
# return output
# except Exception as e:
# return f"Error: {e}"
# if platform.system() == 'Linux':
# print(get_glxinfo_gpu())
# # Install pyopencl with pip if not already installed: pip install pyopencl
# import pyopencl as cl
# def get_opencl_gpus():
# platforms = cl.get_platforms()
# gpu_info = []
# for platform in platforms:
# devices = platform.get_devices(device_type=cl.device_type.GPU)
# for device in devices:
# gpu_info.append({
# 'name': device.name,
# 'vendor': device.vendor,
# 'version': device.version,
# 'global_mem_size': device.global_mem_size,
# 'max_compute_units': device.max_compute_units
# })
# return gpu_info
# print(get_opencl_gpus())
from pyadl import *

# Enumerate AMD adapters via pyadl and print "index. name" per device.
manager = ADLManager.getInstance()
for adapter in manager.getDevices():
    print(f"{adapter.adapterIndex}. {adapter.adapterName}")

View File

@ -1,52 +0,0 @@
import asyncio
import logging
import sys
import os
# Configure logging so the manual run prints timestamped, named records.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
log = logging.getLogger("test_pagination")
# Add the current directory to the path so we can import the cogs package
sys.path.append(os.getcwd())
from cogs.safebooru_cog import SafebooruCog
from discord.ext import commands
import discord
async def test_pagination():
    """Fetch posts for a sample tag via SafebooruCog and log the first few."""
    # Default intents are enough here; the cog only needs a bot reference.
    bot = commands.Bot(command_prefix="!", intents=discord.Intents.default())
    cog = SafebooruCog(bot)

    tag = "kasane_teto"
    log.info(f"Testing pagination for tag: {tag}")
    results = await cog._fetch_posts_logic("test", tag)

    # A tuple result means success; anything else is an error message.
    if isinstance(results, tuple):
        posts = results[1]
        log.info(f"Found {len(posts)} results")
        for i, result in enumerate(posts[:5]):
            log.info(f"Result {i+1}: {result.get('id')} - {result.get('file_url')}")
    else:
        log.error(f"Error: {results}")

    # Close the cog's aiohttp session if one was opened.
    session = getattr(cog, "session", None)
    if session and not session.closed:
        await session.close()
        log.info("Closed aiohttp session")
if __name__ == "__main__":
    # Run the pagination check once when executed as a script.
    asyncio.run(test_pagination())

View File

@ -1,20 +0,0 @@
# Smoke test: exercise the Part constructor from gurt.api and report results.
try:
    from gurt.api import types

    print("Successfully imported types module")

    # A Part built from plain text.
    text_part = types.Part(text="test")
    print(f"Successfully created Part with text: {text_part}")

    # A Part built from a URI plus MIME type.
    uri_part = types.Part(uri="https://example.com", mime_type="text/plain")
    print(f"Successfully created Part with URI: {uri_part}")

    print("All tests passed!")
except Exception as e:
    import traceback

    print(f"Error: {type(e).__name__}: {e}")
    traceback.print_exc()

View File

@ -1,61 +0,0 @@
import asyncio
import discord
from discord.ext import commands
import os
from dotenv import load_dotenv
import logging
import sys
# Add the parent directory to sys.path to allow imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import the starboard cog and settings manager
from cogs.starboard_cog import StarboardCog
import settings_manager as settings_manager
# Load environment variables (DISCORD_TOKEN) from a local .env file
load_dotenv()
# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s:%(levelname)s:%(name)s: %(message)s"
)
log = logging.getLogger(__name__)
# Set up intents; message_content and members are privileged intents —
# presumably required by the starboard cog (verify in the Discord dev portal).
intents = discord.Intents.default()
intents.message_content = True
intents.members = True
# Create bot instance
bot = commands.Bot(command_prefix="!", intents=intents)
@bot.event
async def on_ready():
    """Log connection details, then load the starboard cog."""
    log.info(f"{bot.user.name} has connected to Discord!")
    log.info(f"Bot ID: {bot.user.id}")
    try:
        await bot.add_cog(StarboardCog(bot))
    except Exception as e:
        log.error(f"Error loading StarboardCog: {e}")
    else:
        log.info("StarboardCog loaded successfully!")
async def main():
    """Validate the Discord token and start the bot."""
    token = os.getenv("DISCORD_TOKEN")
    if not token:
        raise ValueError(
            "No token found. Make sure to set DISCORD_TOKEN in your .env file."
        )
    try:
        await bot.start(token)
    except Exception as e:
        log.exception(f"Error starting bot: {e}")


if __name__ == "__main__":
    asyncio.run(main())

View File

@ -1,64 +0,0 @@
import json
import os
# Define the path for the JSON file that stores the timeout chance.
TIMEOUT_CONFIG_FILE = os.path.join("data", "timeout_config.json")


def load_timeout_config():
    """Read the timeout chance from the config file.

    Returns the stored "timeout_chance" value, or the 0.005 default when
    the file is missing, unreadable, or lacks the key. Progress is printed
    because this module doubles as a manual diagnostic script.
    """
    timeout_chance = 0.005  # fallback used on every failure path
    if not os.path.exists(TIMEOUT_CONFIG_FILE):
        print(f"Config file does not exist: {TIMEOUT_CONFIG_FILE}")
        return timeout_chance
    try:
        with open(TIMEOUT_CONFIG_FILE, "r") as f:
            data = json.load(f)
    except Exception as e:
        print(f"Error loading timeout configuration: {e}")
        return timeout_chance
    if "timeout_chance" in data:
        timeout_chance = data["timeout_chance"]
        print(f"Loaded timeout chance: {timeout_chance}")
    else:
        print("timeout_chance not found in config file")
    return timeout_chance
def save_timeout_config(timeout_chance):
    """Write the timeout configuration JSON and report success.

    Returns True when the file was written, False on any error (which is
    printed rather than raised, since this is a diagnostic script).
    """
    try:
        # The data directory may not exist on first run.
        os.makedirs(os.path.dirname(TIMEOUT_CONFIG_FILE), exist_ok=True)
        payload = {
            "timeout_chance": timeout_chance,
            "target_user_id": 748405715520978965,
            "timeout_duration": 60,
        }
        with open(TIMEOUT_CONFIG_FILE, "w") as f:
            json.dump(payload, f, indent=4)
        print(f"Saved timeout configuration with chance: {timeout_chance}")
        return True
    except Exception as e:
        print(f"Error saving timeout configuration: {e}")
        return False
# Manual smoke test: round-trip the timeout chance through the config file.
if __name__ == "__main__":
    # Load the current config
    current_chance = load_timeout_config()
    print(f"Current timeout chance: {current_chance}")
    # Update the timeout chance
    new_chance = 0.01  # 1%
    if save_timeout_config(new_chance):
        print(f"Successfully updated timeout chance to {new_chance}")
    # Load the config again to verify it was saved
    updated_chance = load_timeout_config()
    print(f"Updated timeout chance: {updated_chance}")
    # Restore the original value
    if save_timeout_config(current_chance):
        print(f"Restored timeout chance to original value: {current_chance}")

View File

@ -1,41 +0,0 @@
import re
from typing import Optional, Tuple
# Copy of the fixed parse_repo_url function
def parse_repo_url(url: str) -> Tuple[Optional[str], Optional[str]]:
    """Parses a Git repository URL to extract platform and a simplified repo identifier."""
    # (platform, pattern) pairs tried in order; the first match wins.
    patterns = (
        (
            "github",
            r"^(?:https?://)?(?:www\.)?github\.com/([\w.-]+/[\w.-]+)(?:\.git)?/?$",
        ),
        (
            "gitlab",
            r"^(?:https?://)?(?:www\.)?gitlab\.com/([\w.-]+(?:/[\w.-]+)+)(?:\.git)?/?$",
        ),
    )
    for platform, pattern in patterns:
        match = re.match(pattern, url)
        if match:
            return platform, match.group(1)
    return None, None
# Representative URLs covering both platforms plus an invalid input.
test_urls = [
    "https://github.com/Slipstreamm/discordbot",
    "http://github.com/Slipstreamm/discordbot",
    "github.com/Slipstreamm/discordbot",
    "www.github.com/Slipstreamm/discordbot",
    "https://github.com/Slipstreamm/git",
    "https://gitlab.com/group/project",
    "https://gitlab.com/group/subgroup/project",
    "invalid-url",
]

# Print a parse report for every URL above.
print("Testing URL parsing with fixed regex pattern:")
print("-" * 50)
for url in test_urls:
    platform, repo_id = parse_repo_url(url)
    if platform:
        result = f"Valid: {platform}, {repo_id}"
    else:
        result = "Invalid URL"
    print(f"{url} => {result}")

View File

@ -1,121 +0,0 @@
#!/usr/bin/env python3
"""
Test script to verify usage counter functionality.
This script demonstrates how to query the usage counters table.
"""
import asyncio
import asyncpg
import os
from dotenv import load_dotenv
# Load environment variables (the POSTGRES_* settings read below) from .env
load_dotenv()
async def test_usage_counters():
    """Test the usage counters functionality.

    Connects to the settings database, checks that the
    command_usage_counters table exists, and prints the top 10 rows plus a
    total count. All outcomes are printed; nothing is returned or raised.
    """
    # Create database connection from POSTGRES_* environment variables.
    try:
        conn_string = f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT')}/{os.getenv('POSTGRES_SETTINGS_DB')}"
        conn = await asyncpg.connect(conn_string)
        print("✅ Connected to database successfully")
    except Exception as e:
        # Without a connection there is nothing further to test.
        print(f"❌ Failed to connect to database: {e}")
        return
    try:
        # Check if the table exists before querying it.
        table_exists = await conn.fetchval(
            """
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'command_usage_counters'
            )
            """
        )
        if table_exists:
            print("✅ command_usage_counters table exists")
            # Get some sample data: the ten most-used user/command pairs.
            records = await conn.fetch(
                """
                SELECT user1_id, user2_id, command_name, usage_count
                FROM command_usage_counters
                ORDER BY usage_count DESC
                LIMIT 10
                """
            )
            if records:
                print("\n📊 Top 10 command usages:")
                print("User1 ID | User2 ID | Command | Count")
                print("-" * 45)
                for record in records:
                    print(
                        f"{record['user1_id']} | {record['user2_id']} | {record['command_name']} | {record['usage_count']}"
                    )
            else:
                print("📝 No usage data found yet (table is empty)")
            # Get total count of distinct (user1, user2, command) rows.
            total_count = await conn.fetchval(
                "SELECT COUNT(*) FROM command_usage_counters"
            )
            print(f"\n📈 Total unique user-command combinations: {total_count}")
        else:
            print("⚠️ command_usage_counters table does not exist yet")
            print("   It will be created automatically when a command is first used")
    except Exception as e:
        print(f"❌ Error querying database: {e}")
    finally:
        # Always release the connection, even after query errors.
        await conn.close()
        print("🔌 Database connection closed")
async def get_usage_for_users(user1_id: int, user2_id: int):
    """Print per-command usage counts recorded between two specific users.

    Connects with the POSTGRES_* environment settings; all output goes to
    stdout and errors are printed rather than raised.
    """
    try:
        conn_string = f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT')}/{os.getenv('POSTGRES_SETTINGS_DB')}"
        conn = await asyncpg.connect(conn_string)
        rows = await conn.fetch(
            """
            SELECT command_name, usage_count
            FROM command_usage_counters
            WHERE user1_id = $1 AND user2_id = $2
            ORDER BY usage_count DESC
            """,
            user1_id,
            user2_id,
        )
        if not rows:
            print(f"📝 No usage data found between users {user1_id} and {user2_id}")
        else:
            print(f"\n👥 Usage between users {user1_id} and {user2_id}:")
            print("Command | Count")
            print("-" * 20)
            for row in rows:
                print(f"{row['command_name']} | {row['usage_count']}")
        await conn.close()
    except Exception as e:
        print(f"❌ Error querying user data: {e}")
if __name__ == "__main__":
    print("🧪 Testing Usage Counters Functionality")
    print("=" * 40)
    # Test basic functionality against the configured database.
    asyncio.run(test_usage_counters())
    # Example: Get usage for specific users (replace with actual user IDs)
    # asyncio.run(get_usage_for_users(123456789, 987654321))

26
tests/test_gputil.py Normal file
View File

@ -0,0 +1,26 @@
import pytest
# pyadl needs AMD drivers; fall back to None so the test below can skip.
try:
    import pyadl
except ImportError:  # pragma: no cover - dependency optional
    pyadl = None
class FakeDevice:
    """Lightweight stand-in for a pyadl adapter object."""

    def __init__(self, index: int, name: str):
        # Mirror the attribute names used on real pyadl devices.
        self.adapterIndex, self.adapterName = index, name
class FakeManager:
    """Stand-in for pyadl.ADLManager exposing two fake adapters."""

    def getDevices(self):
        # Same two devices the original returned: GPU0 and GPU1.
        return [FakeDevice(i, f"GPU{i}") for i in range(2)]
@pytest.mark.skipif(pyadl is None, reason="pyadl not installed")
def test_pyadl_devices(monkeypatch):
    """Patch ADLManager.getInstance so no real GPU/driver is required."""
    monkeypatch.setattr(pyadl.ADLManager, "getInstance", lambda: FakeManager())
    adapters = pyadl.ADLManager.getInstance().getDevices()
    assert len(adapters) == 2
    assert adapters[0].adapterName == "GPU0"

49
tests/test_pagination.py Normal file
View File

@ -0,0 +1,49 @@
import asyncio
import pytest
import discord
from discord.ext import commands
from cogs.safebooru_cog import SafebooruCog
class MockResponse:
    """Async-context stand-in for an aiohttp response with canned JSON."""

    def __init__(self, status: int, data):
        self.status, self._data = status, data

    async def json(self):
        # Return the canned payload, mirroring aiohttp's .json().
        return self._data

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Never suppress exceptions raised inside the context.
        return False
class MockSession:
    """Minimal aiohttp.ClientSession replacement serving fixed data."""

    def __init__(self, data):
        self.data = data
        self.closed = False

    def get(self, *args, **kwargs):
        # Every GET "succeeds" with the canned payload.
        return MockResponse(200, self.data)

    async def close(self):
        # Track closure so tests can assert cleanup happened.
        self.closed = True
@pytest.mark.asyncio
async def test_fetch_posts_logic(monkeypatch):
    """_fetch_posts_logic should return the parsed post list from the API."""
    bot = commands.Bot(command_prefix="!", intents=discord.Intents.none())
    cog = SafebooruCog(bot)
    # Swap the real aiohttp session for one serving a single canned post.
    payload = [{"id": "1", "file_url": "http://example.com/image.jpg"}]
    monkeypatch.setattr(cog, "session", MockSession(payload))
    results = await cog._fetch_posts_logic(
        "test", "tag", pid_override=0, limit_override=1
    )
    assert isinstance(results, list)
    assert results[0]["id"] == "1"

18
tests/test_part.py Normal file
View File

@ -0,0 +1,18 @@
import sys
import pytest
def test_part_constructors():
    """Part should be constructible from text and from a URI + MIME type."""
    try:
        import google.generativeai as generativeai

        # Alias the SDK so gurt.api's google.genai import resolves on
        # environments that only ship google.generativeai.
        sys.modules.setdefault("google.genai", generativeai)
        from gurt.api import types
    except Exception as e:  # pragma: no cover - skip if dependencies missing
        pytest.skip(f"gurt.api unavailable: {e}")

    text_part = types.Part(text="test")
    assert text_part
    uri_part = types.Part(uri="https://example.com", mime_type="text/plain")
    assert getattr(uri_part, "uri", None) == "https://example.com"

14
tests/test_starboard.py Normal file
View File

@ -0,0 +1,14 @@
import pytest
import discord
from discord.ext import commands
from cogs.starboard_cog import StarboardCog
@pytest.mark.asyncio
async def test_starboard_cog_load():
    """Adding StarboardCog to a bot should register it in bot.cogs."""
    bot = commands.Bot(command_prefix="!", intents=discord.Intents.none())
    cog = StarboardCog(bot)
    await bot.add_cog(cog)
    assert cog in bot.cogs.values()

View File

@ -0,0 +1,29 @@
import json
from pathlib import Path
def load_timeout_config(path: Path) -> float:
    """Return the stored timeout chance, or the 0.005 default when absent."""
    default = 0.005
    if not path.exists():
        return default
    with open(path, "r") as f:
        data = json.load(f)
    return data.get("timeout_chance", default)
def save_timeout_config(path: Path, timeout_chance: float) -> None:
    """Write the timeout configuration JSON, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "timeout_chance": timeout_chance,
        "target_user_id": 748405715520978965,
        "timeout_duration": 60,
    }
    path.write_text(json.dumps(payload, indent=4))
def test_timeout_config_roundtrip(tmp_path: Path):
    """Saving then loading should return the same timeout chance."""
    cfg_file = tmp_path / "timeout_config.json"
    save_timeout_config(cfg_file, 0.01)
    assert cfg_file.exists()
    assert load_timeout_config(cfg_file) == 0.01

47
tests/test_url_parser.py Normal file
View File

@ -0,0 +1,47 @@
import re
from typing import Optional, Tuple
import pytest
def parse_repo_url(url: str) -> Tuple[Optional[str], Optional[str]]:
    """Parses a Git repository URL and returns platform and repo id."""
    # Accept optional scheme, optional www., optional .git suffix and
    # trailing slash for both platforms.
    if match := re.match(
        r"^(?:https?://)?(?:www\.)?github\.com/([\w.-]+/[\w.-]+)(?:\.git)?/?$", url
    ):
        return "github", match.group(1)
    if match := re.match(
        r"^(?:https?://)?(?:www\.)?gitlab\.com/([\w.-]+(?:/[\w.-]+)+)(?:\.git)?/?$", url
    ):
        return "gitlab", match.group(1)
    return None, None
# Table-driven check: each URL maps to its expected (platform, repo_id) pair.
@pytest.mark.parametrize(
    "url,expected",
    [
        (
            "https://github.com/Slipstreamm/discordbot",
            ("github", "Slipstreamm/discordbot"),
        ),
        (
            "http://github.com/Slipstreamm/discordbot",
            ("github", "Slipstreamm/discordbot"),
        ),
        ("github.com/Slipstreamm/discordbot", ("github", "Slipstreamm/discordbot")),
        ("www.github.com/Slipstreamm/discordbot", ("github", "Slipstreamm/discordbot")),
        ("https://github.com/Slipstreamm/git", ("github", "Slipstreamm/git")),
        ("https://gitlab.com/group/project", ("gitlab", "group/project")),
        (
            "https://gitlab.com/group/subgroup/project",
            ("gitlab", "group/subgroup/project"),
        ),
        ("invalid-url", (None, None)),
    ],
)
def test_parse_repo_url(url: str, expected: Tuple[Optional[str], Optional[str]]):
    """parse_repo_url should classify every sample URL as tabulated above."""
    assert parse_repo_url(url) == expected

View File

@ -0,0 +1,19 @@
import os
def build_conn_string() -> str:
    """Assemble the Postgres DSN from the POSTGRES_* environment variables."""
    user = os.getenv("POSTGRES_USER")
    password = os.getenv("POSTGRES_PASSWORD")
    host = os.getenv("POSTGRES_HOST")
    port = os.getenv("POSTGRES_PORT")
    database = os.getenv("POSTGRES_SETTINGS_DB")
    return f"postgresql://{user}:{password}@{host}:{port}/{database}"
def test_connection_string(monkeypatch):
    """build_conn_string should interpolate all five POSTGRES_* variables."""
    env = {
        "POSTGRES_USER": "user",
        "POSTGRES_PASSWORD": "pass",
        "POSTGRES_HOST": "localhost",
        "POSTGRES_PORT": "5432",
        "POSTGRES_SETTINGS_DB": "db",
    }
    for key, value in env.items():
        monkeypatch.setenv(key, value)
    assert build_conn_string() == "postgresql://user:pass@localhost:5432/db"