This commit is contained in:
Slipstream 2025-05-06 19:52:46 -06:00
parent 96cf1d8bf0
commit 641c37437d
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD
2 changed files with 121 additions and 7 deletions

View File

@ -2082,9 +2082,9 @@ async def ai_moderation_action(
from discordbot.db import mod_log_db
bot = get_bot_instance()
# First attempt to add the mod log entry
case_id = await mod_log_db.add_mod_log(
bot.pg_pool,
# Use the thread-safe version of add_mod_log
case_id = await mod_log_db.add_mod_log_safe(
bot, # Pass the bot instance, not just the pool
guild_id=action.guild_id,
moderator_id=AI_MODERATOR_ID,
target_user_id=action.user_id,
@ -2103,8 +2103,9 @@ async def ai_moderation_action(
# If we have a case_id and message details, update the log entry
if case_id and action.message_id and action.channel_id:
update_success = await mod_log_db.update_mod_log_message_details(
bot.pg_pool,
# Use the thread-safe version of update_mod_log_message_details
update_success = await mod_log_db.update_mod_log_message_details_safe(
bot, # Pass the bot instance, not just the pool
case_id=case_id,
message_id=action.message_id,
channel_id=action.channel_id

View File

@ -1,7 +1,9 @@
import asyncpg
import logging
import asyncio
from typing import Optional, List, Tuple
import functools
import concurrent.futures
from typing import Optional, List, Tuple, Any, Callable
log = logging.getLogger(__name__)
@ -37,7 +39,104 @@ async def create_connection_with_retry(pool: asyncpg.Pool, max_retries: int = 3)
log.exception(f"Unexpected error acquiring connection: {e}")
return None, False
return None, False
# --- Cross-thread database operations ---
def run_in_bot_loop(bot_instance, coro_func):
"""
Runs a coroutine in the bot's event loop, even if called from a different thread.
Args:
bot_instance: The bot instance with access to the main event loop
coro_func: A coroutine function to run in the bot's event loop
Returns:
The result of the coroutine function
"""
if bot_instance is None:
log.error("Cannot run in bot loop: bot_instance is None")
return None
# Get the bot's event loop
loop = bot_instance.loop
if loop is None or loop.is_closed():
log.error("Bot event loop is None or closed")
return None
# If we're already in the right event loop, just run the coroutine
if asyncio.get_event_loop() == loop:
return asyncio.run_coroutine_threadsafe(coro_func(), loop).result()
# Otherwise, use run_coroutine_threadsafe to run in the bot's loop
future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
try:
# Wait for the result with a timeout
return future.result(timeout=10)
except concurrent.futures.TimeoutError:
log.error("Timeout waiting for database operation in bot loop")
return None
except Exception as e:
log.exception(f"Error running database operation in bot loop: {e}")
return None
async def add_mod_log_safe(bot_instance, guild_id: int, moderator_id: int, target_user_id: int,
action_type: str, reason: Optional[str], duration_seconds: Optional[int] = None) -> Optional[int]:
"""
Thread-safe version of add_mod_log that ensures the operation runs in the bot's event loop.
This should be used when calling from API routes or other threads.
"""
# Import here to avoid circular imports
from global_bot_accessor import get_bot_instance
# If bot_instance is not provided, try to get it from the global accessor
if bot_instance is None:
bot_instance = get_bot_instance()
if bot_instance is None:
log.error("Cannot add mod log safely: bot_instance is None and global accessor returned None")
return None
# Define the coroutine to run in the bot's event loop
async def _add_mod_log_coro():
if not hasattr(bot_instance, 'pg_pool') or bot_instance.pg_pool is None:
log.error("Bot pg_pool is None, cannot add mod log")
return None
return await add_mod_log(bot_instance.pg_pool, guild_id, moderator_id, target_user_id,
action_type, reason, duration_seconds)
# If we're already in the bot's event loop, just run the coroutine directly
if asyncio.get_running_loop() == bot_instance.loop:
return await _add_mod_log_coro()
# Otherwise, use the helper function to run in the bot's loop
return run_in_bot_loop(bot_instance, _add_mod_log_coro)
async def update_mod_log_message_details_safe(bot_instance, case_id: int, message_id: int, channel_id: int) -> bool:
"""
Thread-safe version of update_mod_log_message_details that ensures the operation runs in the bot's event loop.
This should be used when calling from API routes or other threads.
"""
# Import here to avoid circular imports
from global_bot_accessor import get_bot_instance
# If bot_instance is not provided, try to get it from the global accessor
if bot_instance is None:
bot_instance = get_bot_instance()
if bot_instance is None:
log.error("Cannot update mod log message details safely: bot_instance is None and global accessor returned None")
return False
# Define the coroutine to run in the bot's event loop
async def _update_details_coro():
if not hasattr(bot_instance, 'pg_pool') or bot_instance.pg_pool is None:
log.error("Bot pg_pool is None, cannot update mod log message details")
return False
return await update_mod_log_message_details(bot_instance.pg_pool, case_id, message_id, channel_id)
# If we're already in the bot's event loop, just run the coroutine directly
if asyncio.get_running_loop() == bot_instance.loop:
return await _update_details_coro()
# Otherwise, use the helper function to run in the bot's loop
return run_in_bot_loop(bot_instance, _update_details_coro)
async def setup_moderation_log_table(pool: asyncpg.Pool):
"""
@ -113,6 +212,15 @@ async def add_mod_log(pool: asyncpg.Pool, guild_id: int, moderator_id: int, targ
else:
log.error(f"Failed to add mod log entry for guild {guild_id}, action {action_type} - No case_id returned.")
return None
except RuntimeError as e:
if "got Future" in str(e) and "attached to a different loop" in str(e):
log.error(f"Event loop error adding mod log entry for guild {guild_id}: {e}")
# This is likely happening because the function is being called from a different thread/event loop
# We'll need to handle this case differently in the future
return None
else:
log.exception(f"Error adding mod log entry: {e}")
return None
except Exception as e:
log.exception(f"Error adding mod log entry: {e}")
return None
@ -120,6 +228,11 @@ async def add_mod_log(pool: asyncpg.Pool, guild_id: int, moderator_id: int, targ
# Always release the connection back to the pool
try:
await pool.release(connection)
except RuntimeError as e:
if "got Future" in str(e) and "attached to a different loop" in str(e):
log.warning(f"Event loop error releasing connection: {e}")
else:
log.warning(f"Error releasing connection back to pool: {e}")
except Exception as e:
log.warning(f"Error releasing connection back to pool: {e}")
# Continue execution even if we can't release the connection