diff --git a/api_service/api_server.py b/api_service/api_server.py index 10f9237..6d07983 100644 --- a/api_service/api_server.py +++ b/api_service/api_server.py @@ -10,6 +10,7 @@ from fastapi.staticfiles import StaticFiles from starlette.middleware.base import BaseHTTPMiddleware from starlette.middleware.sessions import SessionMiddleware import aiohttp +import asyncpg from database import Database # Existing DB import logging from pydantic_settings import BaseSettings, SettingsConfigDict @@ -2080,6 +2081,8 @@ async def ai_moderation_action( try: from discordbot.db import mod_log_db bot = get_bot_instance() + + # First attempt to add the mod log entry case_id = await mod_log_db.add_mod_log( bot.pg_pool, guild_id=action.guild_id, @@ -2089,16 +2092,44 @@ async def ai_moderation_action( reason=reason, duration_seconds=None ) - # Optionally update with message/channel info + + if not case_id: + log.error(f"Failed to add mod log entry for guild {guild_id}, user {action.user_id}, action {action_type}") + return { + "success": False, + "error": "Failed to add moderation log entry to database", + "message": "The action was recorded but could not be added to the moderation logs" + } + + # If we have a case_id and message details, update the log entry if case_id and action.message_id and action.channel_id: - await mod_log_db.update_mod_log_message_details( + update_success = await mod_log_db.update_mod_log_message_details( bot.pg_pool, case_id=case_id, message_id=action.message_id, channel_id=action.channel_id ) + + if not update_success: + log.warning( + f"Added mod log entry (case_id: {case_id}) but failed to update message details " + f"for guild {guild_id}, user {action.user_id}, action {action_type}" + ) + # Continue anyway since the main entry was added successfully + log.info(f"AI moderation action logged successfully for guild {guild_id}, user {action.user_id}, action {action_type}, case {case_id}") return {"success": True, "case_id": case_id} + + except asyncpg.exceptions.PostgresError as e: + # Handle database-specific errors + import traceback + tb = traceback.format_exc() + log.error( + f"Database error logging AI moderation action for guild {guild_id}, user {action.user_id}, " + f"action {action_type}. Exception: {e}\nTraceback: {tb}" + ) + return {"success": False, "error": f"Database error: {str(e)}", "traceback": tb} + except Exception as e: import traceback tb = traceback.format_exc() diff --git a/db/mod_log_db.py b/db/mod_log_db.py index d36746b..5bb5bf9 100644 --- a/db/mod_log_db.py +++ b/db/mod_log_db.py @@ -1,15 +1,57 @@ import asyncpg import logging -from typing import Optional, List, Dict, Any +import asyncio +from typing import Optional, List, Tuple log = logging.getLogger(__name__) +async def create_connection_with_retry(pool: asyncpg.Pool, max_retries: int = 3) -> Tuple[Optional[asyncpg.Connection], bool]: + """ + Creates a database connection with retry logic. + + Args: + pool: The connection pool to acquire a connection from + max_retries: Maximum number of retry attempts + + Returns: + Tuple containing (connection, success_flag) + """ + retry_count = 0 + retry_delay = 0.5 # Start with 500ms delay + + while retry_count < max_retries: + try: + connection = await pool.acquire() + return connection, True + except (asyncpg.exceptions.ConnectionDoesNotExistError, + asyncpg.exceptions.InterfaceError) as e: + retry_count += 1 + if retry_count < max_retries: + log.warning(f"Connection error when acquiring connection (attempt {retry_count}/{max_retries}): {e}") + await asyncio.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + else: + log.exception(f"Failed to acquire connection after {max_retries} attempts: {e}") + return None, False + except Exception as e: + log.exception(f"Unexpected error acquiring connection: {e}") + return None, False + + return None, False + async def setup_moderation_log_table(pool: asyncpg.Pool): """ Ensures the moderation_logs table and its indexes exist in the database. """ - async with pool.acquire() as connection: - try: + # Get a connection with retry logic + connection, success = await create_connection_with_retry(pool) + if not success or not connection: + log.error("Failed to acquire database connection for setting up moderation_logs table") + raise RuntimeError("Failed to acquire database connection for table setup") + + try: + # Use a transaction to ensure all schema changes are atomic + async with connection.transaction(): await connection.execute(""" CREATE TABLE IF NOT EXISTS moderation_logs ( case_id SERIAL PRIMARY KEY, @@ -36,9 +78,15 @@ async def setup_moderation_log_table(pool: asyncpg.Pool): CREATE INDEX IF NOT EXISTS idx_moderation_logs_moderator_id ON moderation_logs (moderator_id); """) log.info("Successfully ensured moderation_logs table and indexes exist.") + except Exception as e: + log.exception(f"Error setting up moderation_logs table: {e}") + raise # Re-raise the exception to indicate setup failure + finally: + # Always release the connection back to the pool + try: + await pool.release(connection) except Exception as e: - log.exception(f"Error setting up moderation_logs table: {e}") - raise # Re-raise the exception to indicate setup failure + log.warning(f"Error releasing connection back to pool: {e}") # --- Placeholder functions (to be implemented next) --- @@ -49,8 +97,15 @@ async def add_mod_log(pool: asyncpg.Pool, guild_id: int, moderator_id: int, targ VALUES ($1, $2, $3, $4, $5, $6) RETURNING case_id; """ + # Get a connection with retry logic + connection, success = await create_connection_with_retry(pool) + if not success or not connection: + log.error(f"Failed to acquire database connection for adding mod log entry for guild {guild_id}") + return None + try: - async with pool.acquire() as connection: + # Use a transaction to ensure atomicity + async with connection.transaction(): result = await connection.fetchrow(query, guild_id, moderator_id, target_user_id, action_type, reason, duration_seconds) if result: log.info(f"Added mod log entry for guild {guild_id}, action {action_type}. Case ID: {result['case_id']}") @@ -61,6 +116,13 @@ async def add_mod_log(pool: asyncpg.Pool, guild_id: int, moderator_id: int, targ except Exception as e: log.exception(f"Error adding mod log entry: {e}") return None + finally: + # Always release the connection back to the pool + try: + await pool.release(connection) + except Exception as e: + log.warning(f"Error releasing connection back to pool: {e}") + # Continue execution even if we can't release the connection async def update_mod_log_reason(pool: asyncpg.Pool, case_id: int, new_reason: str): """Updates the reason for a specific moderation log entry.""" @@ -69,8 +131,15 @@ async def update_mod_log_reason(pool: asyncpg.Pool, case_id: int, new_reason: st SET reason = $1 WHERE case_id = $2; """ + # Get a connection with retry logic + connection, success = await create_connection_with_retry(pool) + if not success or not connection: + log.error(f"Failed to acquire database connection for updating reason for case_id {case_id}") + return False + try: - async with pool.acquire() as connection: + # Use a transaction to ensure atomicity + async with connection.transaction(): result = await connection.execute(query, new_reason, case_id) if result == "UPDATE 1": log.info(f"Updated reason for case_id {case_id}") @@ -81,6 +150,13 @@ async def update_mod_log_reason(pool: asyncpg.Pool, case_id: int, new_reason: st except Exception as e: log.exception(f"Error updating mod log reason for case_id {case_id}: {e}") return False + finally: + # Always release the connection back to the pool + try: + await pool.release(connection) + except Exception as e: + log.warning(f"Error releasing connection back to pool: {e}") + # Continue execution even if we can't release the connection async def update_mod_log_message_details(pool: asyncpg.Pool, case_id: int, message_id: int, channel_id: int): """Updates the log_message_id and log_channel_id for a specific case.""" @@ -89,8 +165,15 @@ async def update_mod_log_message_details(pool: asyncpg.Pool, case_id: int, messa SET log_message_id = $1, log_channel_id = $2 WHERE case_id = $3; """ + # Get a connection with retry logic + connection, success = await create_connection_with_retry(pool) + if not success or not connection: + log.error(f"Failed to acquire database connection for updating message details for case_id {case_id}") + return False + try: - async with pool.acquire() as connection: + # Use a transaction to ensure atomicity + async with connection.transaction(): result = await connection.execute(query, message_id, channel_id, case_id) if result == "UPDATE 1": log.info(f"Updated message details for case_id {case_id}") @@ -101,17 +184,36 @@ async def update_mod_log_message_details(pool: asyncpg.Pool, case_id: int, messa except Exception as e: log.exception(f"Error updating mod log message details for case_id {case_id}: {e}") return False + finally: + # Always release the connection back to the pool + try: + await pool.release(connection) + except Exception as e: + log.warning(f"Error releasing connection back to pool: {e}") + # Continue execution even if we can't release the connection async def get_mod_log(pool: asyncpg.Pool, case_id: int) -> Optional[asyncpg.Record]: """Retrieves a specific moderation log entry by case_id.""" query = "SELECT * FROM moderation_logs WHERE case_id = $1;" + + # Get a connection with retry logic + connection, success = await create_connection_with_retry(pool) + if not success or not connection: + log.error(f"Failed to acquire database connection for retrieving mod log for case_id {case_id}") + return None + try: - async with pool.acquire() as connection: - record = await connection.fetchrow(query, case_id) - return record + record = await connection.fetchrow(query, case_id) + return record except Exception as e: log.exception(f"Error retrieving mod log for case_id {case_id}: {e}") return None + finally: + # Always release the connection back to the pool + try: + await pool.release(connection) + except Exception as e: + log.warning(f"Error releasing connection back to pool: {e}") async def get_user_mod_logs(pool: asyncpg.Pool, guild_id: int, target_user_id: int, limit: int = 50) -> List[asyncpg.Record]: """Retrieves moderation logs for a specific user in a guild, ordered by timestamp descending.""" @@ -121,13 +223,24 @@ async def get_user_mod_logs(pool: asyncpg.Pool, guild_id: int, target_user_id: i ORDER BY timestamp DESC LIMIT $3; """ + # Get a connection with retry logic + connection, success = await create_connection_with_retry(pool) + if not success or not connection: + log.error(f"Failed to acquire database connection for retrieving user mod logs for user {target_user_id} in guild {guild_id}") + return [] + try: - async with pool.acquire() as connection: - records = await connection.fetch(query, guild_id, target_user_id, limit) - return records + records = await connection.fetch(query, guild_id, target_user_id, limit) + return records except Exception as e: log.exception(f"Error retrieving user mod logs for user {target_user_id} in guild {guild_id}: {e}") return [] + finally: + # Always release the connection back to the pool + try: + await pool.release(connection) + except Exception as e: + log.warning(f"Error releasing connection back to pool: {e}") async def get_guild_mod_logs(pool: asyncpg.Pool, guild_id: int, limit: int = 50) -> List[asyncpg.Record]: """Retrieves the latest moderation logs for a guild, ordered by timestamp descending.""" @@ -137,10 +250,21 @@ async def get_guild_mod_logs(pool: asyncpg.Pool, guild_id: int, limit: int = 50) ORDER BY timestamp DESC LIMIT $2; """ + # Get a connection with retry logic + connection, success = await create_connection_with_retry(pool) + if not success or not connection: + log.error(f"Failed to acquire database connection for retrieving guild mod logs for guild {guild_id}") + return [] + try: - async with pool.acquire() as connection: - records = await connection.fetch(query, guild_id, limit) - return records + records = await connection.fetch(query, guild_id, limit) + return records except Exception as e: log.exception(f"Error retrieving guild mod logs for guild {guild_id}: {e}") return [] + finally: + # Always release the connection back to the pool + try: + await pool.release(connection) + except Exception as e: + log.warning(f"Error releasing connection back to pool: {e}")