diff --git a/api_service/api_server.py b/api_service/api_server.py index 6d07983..cf31ec5 100644 --- a/api_service/api_server.py +++ b/api_service/api_server.py @@ -2082,9 +2082,9 @@ async def ai_moderation_action( from discordbot.db import mod_log_db bot = get_bot_instance() - # First attempt to add the mod log entry - case_id = await mod_log_db.add_mod_log( - bot.pg_pool, + # Use the thread-safe version of add_mod_log + case_id = await mod_log_db.add_mod_log_safe( + bot, # Pass the bot instance, not just the pool guild_id=action.guild_id, moderator_id=AI_MODERATOR_ID, target_user_id=action.user_id, @@ -2103,8 +2103,9 @@ async def ai_moderation_action( # If we have a case_id and message details, update the log entry if case_id and action.message_id and action.channel_id: - update_success = await mod_log_db.update_mod_log_message_details( - bot.pg_pool, + # Use the thread-safe version of update_mod_log_message_details + update_success = await mod_log_db.update_mod_log_message_details_safe( + bot, # Pass the bot instance, not just the pool case_id=case_id, message_id=action.message_id, channel_id=action.channel_id diff --git a/db/mod_log_db.py b/db/mod_log_db.py index 5bb5bf9..3306dd2 100644 --- a/db/mod_log_db.py +++ b/db/mod_log_db.py @@ -1,7 +1,9 @@ import asyncpg import logging import asyncio -from typing import Optional, List, Tuple +import functools +import concurrent.futures +from typing import Optional, List, Tuple, Any, Callable log = logging.getLogger(__name__) @@ -37,7 +39,104 @@ async def create_connection_with_retry(pool: asyncpg.Pool, max_retries: int = 3) log.exception(f"Unexpected error acquiring connection: {e}") return None, False - return None, False +# --- Cross-thread database operations --- + +def run_in_bot_loop(bot_instance, coro_func): + """ + Runs a coroutine in the bot's event loop, even if called from a different thread. + + Args: + bot_instance: The bot instance with access to the main event loop + coro_func: A coroutine function to run in the bot's event loop + + Returns: + The result of the coroutine function + """ + if bot_instance is None: + log.error("Cannot run in bot loop: bot_instance is None") + return None + + # Get the bot's event loop + loop = bot_instance.loop + if loop is None or loop.is_closed(): + log.error("Bot event loop is None or closed") + return None + + # If we're already in the right event loop, just run the coroutine + if asyncio.get_event_loop() == loop: + return asyncio.run_coroutine_threadsafe(coro_func(), loop).result() + + # Otherwise, use run_coroutine_threadsafe to run in the bot's loop + future = asyncio.run_coroutine_threadsafe(coro_func(), loop) + try: + # Wait for the result with a timeout + return future.result(timeout=10) + except concurrent.futures.TimeoutError: + log.error("Timeout waiting for database operation in bot loop") + return None + except Exception as e: + log.exception(f"Error running database operation in bot loop: {e}") + return None + +async def add_mod_log_safe(bot_instance, guild_id: int, moderator_id: int, target_user_id: int, + action_type: str, reason: Optional[str], duration_seconds: Optional[int] = None) -> Optional[int]: + """ + Thread-safe version of add_mod_log that ensures the operation runs in the bot's event loop. + This should be used when calling from API routes or other threads. + """ + # Import here to avoid circular imports + from global_bot_accessor import get_bot_instance + + # If bot_instance is not provided, try to get it from the global accessor + if bot_instance is None: + bot_instance = get_bot_instance() + if bot_instance is None: + log.error("Cannot add mod log safely: bot_instance is None and global accessor returned None") + return None + + # Define the coroutine to run in the bot's event loop + async def _add_mod_log_coro(): + if not hasattr(bot_instance, 'pg_pool') or bot_instance.pg_pool is None: + log.error("Bot pg_pool is None, cannot add mod log") + return None + return await add_mod_log(bot_instance.pg_pool, guild_id, moderator_id, target_user_id, + action_type, reason, duration_seconds) + + # If we're already in the bot's event loop, just run the coroutine directly + if asyncio.get_running_loop() == bot_instance.loop: + return await _add_mod_log_coro() + + # Otherwise, use the helper function to run in the bot's loop + return run_in_bot_loop(bot_instance, _add_mod_log_coro) + +async def update_mod_log_message_details_safe(bot_instance, case_id: int, message_id: int, channel_id: int) -> bool: + """ + Thread-safe version of update_mod_log_message_details that ensures the operation runs in the bot's event loop. + This should be used when calling from API routes or other threads. + """ + # Import here to avoid circular imports + from global_bot_accessor import get_bot_instance + + # If bot_instance is not provided, try to get it from the global accessor + if bot_instance is None: + bot_instance = get_bot_instance() + if bot_instance is None: + log.error("Cannot update mod log message details safely: bot_instance is None and global accessor returned None") + return False + + # Define the coroutine to run in the bot's event loop + async def _update_details_coro(): + if not hasattr(bot_instance, 'pg_pool') or bot_instance.pg_pool is None: + log.error("Bot pg_pool is None, cannot update mod log message details") + return False + return await update_mod_log_message_details(bot_instance.pg_pool, case_id, message_id, channel_id) + + # If we're already in the bot's event loop, just run the coroutine directly + if asyncio.get_running_loop() == bot_instance.loop: + return await _update_details_coro() + + # Otherwise, use the helper function to run in the bot's loop + return run_in_bot_loop(bot_instance, _update_details_coro) async def setup_moderation_log_table(pool: asyncpg.Pool): """ @@ -113,6 +212,15 @@ async def add_mod_log(pool: asyncpg.Pool, guild_id: int, moderator_id: int, targ else: log.error(f"Failed to add mod log entry for guild {guild_id}, action {action_type} - No case_id returned.") return None + except RuntimeError as e: + if "got Future" in str(e) and "attached to a different loop" in str(e): + log.error(f"Event loop error adding mod log entry for guild {guild_id}: {e}") + # This is likely happening because the function is being called from a different thread/event loop + # We'll need to handle this case differently in the future + return None + else: + log.exception(f"Error adding mod log entry: {e}") + return None except Exception as e: log.exception(f"Error adding mod log entry: {e}") return None @@ -120,6 +228,11 @@ async def add_mod_log(pool: asyncpg.Pool, guild_id: int, moderator_id: int, targ # Always release the connection back to the pool try: await pool.release(connection) + except RuntimeError as e: + if "got Future" in str(e) and "attached to a different loop" in str(e): + log.warning(f"Event loop error releasing connection: {e}") + else: + log.warning(f"Error releasing connection back to pool: {e}") except Exception as e: log.warning(f"Error releasing connection back to pool: {e}") # Continue execution even if we can't release the connection