# moderation_cog.py import discord from discord.ext import commands from discord import app_commands # import aiohttp # For making asynchronous HTTP requests - Replaced by Google GenAI client import json import os # To load environment variables import collections # For deque import datetime # For timestamps import io # For BytesIO operations import base64 # For encoding images to base64 from PIL import Image # For image processing import cv2 # For video processing import numpy as np # For array operations import tempfile # For temporary file operations import shutil # For backing up files from typing import Optional, List, Dict, Any, Tuple # For type hinting import asyncio import aiofiles import re # Google Generative AI Imports (using Vertex AI backend) from google import genai from google.genai import types from google.api_core import exceptions as google_exceptions # Import project configuration for Vertex AI from gurt.config import ( PROJECT_ID, LOCATION, ) # Assuming gurt.config exists and has these from gurt.genai_client import get_genai_client_for_model from . import aimod_config as aimod_config_module from .aimod_config import ( DEFAULT_VERTEX_AI_MODEL, STANDARD_SAFETY_SETTINGS, GUILD_CONFIG_PATH, USER_INFRACTIONS_PATH, INFRACTION_BACKUP_DIR, USER_APPEALS_PATH, APPEAL_AI_MODEL, APPEAL_AI_THINKING_BUDGET, CONFIG_LOCK, save_user_infractions, save_user_appeals, get_guild_config, set_guild_config, get_user_infraction_history, add_user_infraction, get_user_appeals, add_user_appeal, SERVER_RULES, MODERATION_INSTRUCTIONS, SUICIDAL_HELP_RESOURCES, ) # Avoid loading an excessive number of messages when updating rules MAX_RULE_MESSAGES = 25 class AIModerationCog(commands.Cog): """ A Discord Cog that uses Google Vertex AI to moderate messages based on server rules. """ def __init__(self, bot: commands.Bot): self.bot = bot self.genai_client = None try: if PROJECT_ID and LOCATION: self.genai_client = genai.Client( vertexai=True, project=PROJECT_ID, location=LOCATION, ) print( f"AIModerationCog: Google GenAI Client initialized for Vertex AI project '{PROJECT_ID}' in location '{LOCATION}'." ) else: print( "AIModerationCog: PROJECT_ID or LOCATION not found in config. Google GenAI Client not initialized." ) except Exception as e: print( f"AIModerationCog: Error initializing Google GenAI Client for Vertex AI: {e}" ) self.last_ai_decisions = collections.deque( maxlen=5 ) # Store last 5 AI decisions self.config_lock = CONFIG_LOCK # Supported image file extensions self.image_extensions = [ ".jpg", ".jpeg", ".png", ".webp", ".bmp", ".heic", ".heif", ] # Added heic/heif for Vertex # Supported animated file extensions self.gif_extensions = [".gif"] # Supported video file extensions (Vertex AI can process short video clips directly) self.video_extensions = [ ".mp4", ".webm", ".mov", ".avi", ".mkv", ".flv", ] # Expanded list self.backup_task = self.bot.loop.create_task( self.backup_infractions_periodically() ) print("AIModerationCog Initialized.") def is_testing_mode(self, guild_id: int) -> bool: """Return True if testing mode is enabled for the guild.""" return get_guild_config(guild_id, "TESTING_MODE", False) class QuickActionView(discord.ui.View): """Buttons for quick moderator actions.""" def __init__(self, parent: "AIModerationCog", target: discord.Member): super().__init__(timeout=3600) self.parent = parent self.target = target self.message: discord.Message | None = None # --- Helper Modals --- class BanModal(discord.ui.Modal, title="Ban User"): reason = discord.ui.TextInput( label="Reason", placeholder="Reason for ban", style=discord.TextStyle.paragraph, required=False, max_length=512, ) def __init__(self, view: "AIModerationCog.QuickActionView"): super().__init__() self.view = view async def on_submit(self, interaction: discord.Interaction): if not interaction.user.guild_permissions.ban_members: await interaction.response.send_message( "You lack permission to ban members.", ephemeral=True ) return if self.view.parent.is_testing_mode(interaction.guild.id): await interaction.response.send_message( f"[TEST MODE] Would ban {self.view.target.mention}.", ephemeral=True, ) return try: await self.view.target.ban( reason=self.reason.value or "Escalated via mod panel" ) await interaction.response.send_message( f"Banned {self.view.target.mention}.", ephemeral=True ) except Exception as e: # noqa: BLE001 await interaction.response.send_message( f"Failed to ban: {e}", ephemeral=True ) self.view.disable_all_items() if self.view.message: await self.view.message.edit(view=self.view) class KickModal(discord.ui.Modal, title="Kick User"): reason = discord.ui.TextInput( label="Reason", placeholder="Reason for kick", style=discord.TextStyle.paragraph, required=False, max_length=512, ) def __init__(self, view: "AIModerationCog.QuickActionView"): super().__init__() self.view = view async def on_submit(self, interaction: discord.Interaction): if not interaction.user.guild_permissions.kick_members: await interaction.response.send_message( "You lack permission to kick members.", ephemeral=True ) return if self.view.parent.is_testing_mode(interaction.guild.id): await interaction.response.send_message( f"[TEST MODE] Would kick {self.view.target.mention}.", ephemeral=True, ) return try: await self.view.target.kick( reason=self.reason.value or "Escalated via mod panel" ) await interaction.response.send_message( f"Kicked {self.view.target.mention}.", ephemeral=True ) except Exception as e: # noqa: BLE001 await interaction.response.send_message( f"Failed to kick: {e}", ephemeral=True ) self.view.disable_all_items() if self.view.message: await self.view.message.edit(view=self.view) class TimeoutModal(discord.ui.Modal, title="Timeout User"): duration = discord.ui.TextInput( label="Duration", placeholder="e.g. 10m, 1h, 1d", required=True, max_length=10, ) reason = discord.ui.TextInput( label="Reason", placeholder="Reason for timeout", style=discord.TextStyle.paragraph, required=False, max_length=512, ) def __init__(self, view: "AIModerationCog.QuickActionView"): super().__init__() self.view = view @staticmethod def parse_duration(duration_str: str) -> datetime.timedelta | None: if not duration_str: return None try: amount = int("".join(filter(str.isdigit, duration_str))) unit = "".join(filter(str.isalpha, duration_str)).lower() if unit in {"d", "day", "days"}: return datetime.timedelta(days=amount) if unit in {"h", "hour", "hours"}: return datetime.timedelta(hours=amount) if unit in {"m", "min", "minute", "minutes"}: return datetime.timedelta(minutes=amount) if unit in {"s", "sec", "second", "seconds"}: return datetime.timedelta(seconds=amount) except (ValueError, TypeError): return None return None async def on_submit(self, interaction: discord.Interaction): if not interaction.user.guild_permissions.moderate_members: await interaction.response.send_message( "You lack permission to timeout members.", ephemeral=True ) return if self.view.parent.is_testing_mode(interaction.guild.id): await interaction.response.send_message( f"[TEST MODE] Would timeout {self.view.target.mention} for {self.duration.value}.", ephemeral=True, ) return delta = self.parse_duration(self.duration.value) if not delta or delta > datetime.timedelta(days=28): await interaction.response.send_message( "Invalid duration. Use formats like '10m', '1h', '1d'", ephemeral=True, ) return try: until = discord.utils.utcnow() + delta await self.view.target.timeout( until, reason=self.reason.value or "Escalated via mod panel" ) await interaction.response.send_message( f"Timed out {self.view.target.mention} for {self.duration.value}.", ephemeral=True, ) except Exception as e: # noqa: BLE001 await interaction.response.send_message( f"Failed to timeout: {e}", ephemeral=True ) self.view.disable_all_items() if self.view.message: await self.view.message.edit(view=self.view) @discord.ui.button(label="Escalate Ban", style=discord.ButtonStyle.danger) async def escalate( self, interaction: discord.Interaction, button: discord.ui.Button ): if not interaction.user.guild_permissions.ban_members: await interaction.response.send_message( "You lack permission to ban members.", ephemeral=True ) return self.message = interaction.message await interaction.response.send_modal(self.BanModal(self)) @discord.ui.button(label="Kick", style=discord.ButtonStyle.primary) async def kick( self, interaction: discord.Interaction, button: discord.ui.Button ): if not interaction.user.guild_permissions.kick_members: await interaction.response.send_message( "You lack permission to kick members.", ephemeral=True ) return self.message = interaction.message await interaction.response.send_modal(self.KickModal(self)) @discord.ui.button(label="Timeout", style=discord.ButtonStyle.secondary) async def timeout_action( self, interaction: discord.Interaction, button: discord.ui.Button ): if not interaction.user.guild_permissions.moderate_members: await interaction.response.send_message( "You lack permission to timeout members.", ephemeral=True ) return self.message = interaction.message await interaction.response.send_modal(self.TimeoutModal(self)) @discord.ui.button(label="Ignore", style=discord.ButtonStyle.secondary) async def ignore( self, interaction: discord.Interaction, button: discord.ui.Button ): if interaction.user.guild_permissions.manage_messages: await interaction.message.delete() await interaction.response.send_message( "Notification dismissed.", ephemeral=True ) else: await interaction.response.send_message( "No permission to manage messages.", ephemeral=True ) async def cog_load(self): """Called when the cog is loaded.""" print("AIModerationCog cog_load started.") if not self.genai_client: print("\n" + "=" * 60) print( "=== WARNING: AIModerationCog - Vertex AI Client not initialized! ===" ) print("=== The Moderation Cog requires a valid Vertex AI setup. ===") print( f"=== Check PROJECT_ID and LOCATION in gurt.config and GCP authentication. ===" ) print("=" * 60 + "\n") else: print("AIModerationCog: Vertex AI Client seems to be initialized.") print("AIModerationCog cog_load finished.") # _load_openrouter_models is no longer needed. async def cog_unload(self): """Clean up when the cog is unloaded.""" # The genai.Client doesn't have an explicit close method in the same way aiohttp.ClientSession does. # It typically manages its own resources. print("AIModerationCog Unloaded.") if self.backup_task: self.backup_task.cancel() async def backup_infractions_periodically(self): """Periodically back up the infractions file.""" await self.bot.wait_until_ready() while not self.bot.is_closed(): timestamp = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S") backup_path = os.path.join( INFRACTION_BACKUP_DIR, f"user_infractions_{timestamp}.json" ) try: shutil.copy(USER_INFRACTIONS_PATH, backup_path) except Exception as e: # noqa: BLE001 print(f"Failed to back up infractions: {e}") await asyncio.sleep(24 * 60 * 60) async def process_image(self, attachment: discord.Attachment) -> tuple[str, bytes]: """ Process an image attachment and return its base64 encoding. Args: attachment: The Discord attachment containing the image Returns: Tuple of (mime_type, image_bytes) """ try: # Download the image image_bytes = await attachment.read() mime_type = ( attachment.content_type or "image/jpeg" ) # Default to jpeg if not specified # Return the image bytes and mime type return mime_type, image_bytes except Exception as e: print(f"Error processing image: {e}") return None, None async def process_gif(self, attachment: discord.Attachment) -> tuple[str, bytes]: """Return the raw bytes for a GIF attachment.""" try: gif_bytes = await attachment.read() mime_type = attachment.content_type or "image/gif" return mime_type, gif_bytes except Exception as e: print(f"Error processing GIF: {e}") return None, None async def process_attachment( self, attachment: discord.Attachment ) -> tuple[str, bytes, str]: """ Process any attachment and return the appropriate image data. Args: attachment: The Discord attachment Returns: Tuple of (mime_type, image_bytes, attachment_type) attachment_type is one of: 'image', 'gif', 'video', or None if unsupported """ if not attachment: return None, None, None # Get the file extension filename = attachment.filename.lower() _, ext = os.path.splitext(filename) # Process based on file type if ext in self.image_extensions: mime_type, image_bytes = await self.process_image(attachment) return mime_type, image_bytes, "image" elif ext in self.gif_extensions: mime_type, image_bytes = await self.process_gif(attachment) return mime_type, image_bytes, "gif" elif ext in self.video_extensions: mime_type, image_bytes = await self.process_video(attachment) return mime_type, image_bytes, "video" else: print(f"Unsupported file type: {ext}") return None, None, None async def process_video(self, attachment: discord.Attachment) -> tuple[str, bytes]: """Return the raw bytes for a video attachment.""" try: video_bytes = await attachment.read() mime_type = attachment.content_type or "video/mp4" return mime_type, video_bytes except Exception as e: print(f"Error processing video: {e}") return None, None async def process_url_attachment(self, url: str) -> tuple[str, bytes, str, str]: """Fetch an attachment from a direct link.""" import aiohttp try: cleaned_url = url.strip("<>") filename = cleaned_url.split("/")[-1].split("?")[0] _, ext = os.path.splitext(filename.lower()) if ext in self.image_extensions: attachment_type = "image" elif ext in self.gif_extensions: attachment_type = "gif" elif ext in self.video_extensions: attachment_type = "video" else: return None, None, None, None async with aiohttp.ClientSession() as session: async with session.get(cleaned_url) as resp: if resp.status != 200: print( f"Failed to fetch URL attachment {cleaned_url}: {resp.status}" ) return None, None, None, None data = await resp.read() mime_type = resp.headers.get( "Content-Type", f"image/{ext.lstrip('.')}" ) return mime_type, data, attachment_type, filename except Exception as e: print(f"Error processing URL attachment {url}: {e}") return None, None, None, None def extract_direct_attachment_urls(self, text: str) -> List[str]: """Return a list of direct image/video URLs found in the text.""" urls = re.findall(r"https?://\S+", text or "") allowed_exts = ( self.image_extensions + self.gif_extensions + self.video_extensions ) results = [] for u in urls: cleaned = u.strip("<>") path = cleaned.split("?")[0] _, ext = os.path.splitext(path.lower()) if ext in allowed_exts: results.append(cleaned) return results # --- AI Moderation Command Group --- aimod_group = app_commands.Group( name="aimod", description="AI Moderation commands." ) config_subgroup = app_commands.Group( name="config", description="Configure AI moderation settings.", parent=aimod_group, ) infractions_subgroup = app_commands.Group( name="infractions", description="Manage user infractions.", parent=aimod_group ) appeal_subgroup = app_commands.Group( name="appeal", description="Appeal AI moderation actions.", parent=aimod_group ) model_subgroup = app_commands.Group( name="model", description="Manage the AI model for moderation.", parent=aimod_group, ) debug_subgroup = app_commands.Group( name="debug", description="Debugging commands for AI moderation.", parent=aimod_group, ) @aimod_group.command( name="sync", description="Reload AI moderation configuration and infractions from disk.", ) @app_commands.checks.has_permissions(administrator=True) async def aimod_sync(self, interaction: discord.Interaction): """Reload configuration files from disk.""" try: async with aiofiles.open(GUILD_CONFIG_PATH, "r", encoding="utf-8") as f: data = await f.read() async with CONFIG_LOCK: global GUILD_CONFIG GUILD_CONFIG = json.loads(data) async with aiofiles.open( USER_INFRACTIONS_PATH, "r", encoding="utf-8" ) as f2: data2 = await f2.read() async with CONFIG_LOCK: global USER_INFRACTIONS USER_INFRACTIONS = json.loads(data2) await interaction.response.send_message( "Configuration synced from disk.", ephemeral=True ) except Exception as e: # noqa: BLE001 await interaction.response.send_message( f"Failed to reload configuration: {e}", ephemeral=True ) @config_subgroup.command( name="log_channel", description="Set the moderation log channel." ) @app_commands.describe(channel="The text channel to use for moderation logs.") @app_commands.checks.has_permissions(administrator=True) async def modset_log_channel( self, interaction: discord.Interaction, channel: discord.TextChannel ): await set_guild_config(interaction.guild.id, "MOD_LOG_CHANNEL_ID", channel.id) await interaction.response.send_message( f"Moderation log channel set to {channel.mention}.", ephemeral=False ) @config_subgroup.command( name="suggestions_channel", description="Set the suggestions channel." ) @app_commands.describe(channel="The text channel to use for suggestions.") @app_commands.checks.has_permissions(administrator=True) async def modset_suggestions_channel( self, interaction: discord.Interaction, channel: discord.TextChannel ): await set_guild_config( interaction.guild.id, "SUGGESTIONS_CHANNEL_ID", channel.id ) await interaction.response.send_message( f"Suggestions channel set to {channel.mention}.", ephemeral=False ) @config_subgroup.command( name="moderator_role", description="Set the moderator role." ) @app_commands.describe(role="The role that identifies moderators.") @app_commands.checks.has_permissions(administrator=True) async def modset_moderator_role( self, interaction: discord.Interaction, role: discord.Role ): await set_guild_config(interaction.guild.id, "MODERATOR_ROLE_ID", role.id) await interaction.response.send_message( f"Moderator role set to {role.mention}.", ephemeral=False ) @config_subgroup.command( name="suicidal_ping_role", description="Set the role to ping for suicidal content.", ) @app_commands.describe(role="The role to ping for urgent suicidal content alerts.") @app_commands.checks.has_permissions(administrator=True) async def modset_suicidal_ping_role( self, interaction: discord.Interaction, role: discord.Role ): await set_guild_config(interaction.guild.id, "SUICIDAL_PING_ROLE_ID", role.id) await interaction.response.send_message( f"Suicidal content ping role set to {role.mention}.", ephemeral=False ) @config_subgroup.command( name="add_nsfw_channel", description="Add a channel to the list of NSFW channels.", ) @app_commands.describe(channel="The text channel to mark as NSFW for the bot.") @app_commands.checks.has_permissions(administrator=True) async def modset_add_nsfw_channel( self, interaction: discord.Interaction, channel: discord.TextChannel ): guild_id = interaction.guild.id nsfw_channels: list[int] = get_guild_config(guild_id, "NSFW_CHANNEL_IDS", []) if channel.id not in nsfw_channels: nsfw_channels.append(channel.id) await set_guild_config(guild_id, "NSFW_CHANNEL_IDS", nsfw_channels) await interaction.response.send_message( f"{channel.mention} added to NSFW channels list.", ephemeral=False ) else: await interaction.response.send_message( f"{channel.mention} is already in the NSFW channels list.", ephemeral=True, ) @config_subgroup.command( name="remove_nsfw_channel", description="Remove a channel from the list of NSFW channels.", ) @app_commands.describe(channel="The text channel to remove from the NSFW list.") @app_commands.checks.has_permissions(administrator=True) async def modset_remove_nsfw_channel( self, interaction: discord.Interaction, channel: discord.TextChannel ): guild_id = interaction.guild.id nsfw_channels: list[int] = get_guild_config(guild_id, "NSFW_CHANNEL_IDS", []) if channel.id in nsfw_channels: nsfw_channels.remove(channel.id) await set_guild_config(guild_id, "NSFW_CHANNEL_IDS", nsfw_channels) await interaction.response.send_message( f"{channel.mention} removed from NSFW channels list.", ephemeral=False ) else: await interaction.response.send_message( f"{channel.mention} is not in the NSFW channels list.", ephemeral=True ) @config_subgroup.command( name="list_nsfw_channels", description="List currently configured NSFW channels.", ) @app_commands.checks.has_permissions(administrator=True) async def modset_list_nsfw_channels(self, interaction: discord.Interaction): guild_id = interaction.guild.id nsfw_channel_ids: list[int] = get_guild_config(guild_id, "NSFW_CHANNEL_IDS", []) if not nsfw_channel_ids: await interaction.response.send_message( "No NSFW channels are currently configured.", ephemeral=False ) return channel_mentions = [] for channel_id in nsfw_channel_ids: channel_obj = interaction.guild.get_channel(channel_id) if channel_obj: channel_mentions.append(channel_obj.mention) else: channel_mentions.append(f"ID:{channel_id} (not found)") await interaction.response.send_message( f"Configured NSFW channels:\n- " + "\n- ".join(channel_mentions), ephemeral=False, ) # Note: The @app_commands.command(name="modenable", ...) and other commands like # viewinfractions, clearinfractions, modsetmodel, modgetmodel remain as top-level commands # as they were not part of the original "modset" generic command structure. # If these also need to be grouped, that would be a separate consideration. @config_subgroup.command( name="enable", description="Enable or disable moderation for this guild (admin only).", ) @app_commands.describe(enabled="Enable moderation (true/false)") async def modenable(self, interaction: discord.Interaction, enabled: bool): if not interaction.user.guild_permissions.administrator: await interaction.response.send_message( "You must be an administrator to use this command.", ephemeral=False ) return await set_guild_config(interaction.guild.id, "ENABLED", enabled) await interaction.response.send_message( f"Moderation is now {'enabled' if enabled else 'disabled'} for this guild.", ephemeral=False, ) @config_subgroup.command( name="event_mode", description="Toggle temporary event mode for this guild.", ) @app_commands.describe(enabled="Enable event mode (true/false)") async def event_mode(self, interaction: discord.Interaction, enabled: bool): if not interaction.user.guild_permissions.administrator: await interaction.response.send_message( "You must be an administrator to use this command.", ephemeral=False ) return await set_guild_config(interaction.guild.id, "EVENT_MODE", enabled) await interaction.response.send_message( f"Event mode is now {'enabled' if enabled else 'disabled'}.", ephemeral=False, ) @config_subgroup.command( name="testing_mode", description="Enable or disable testing mode (no actions are taken).", ) @app_commands.describe(enabled="Enable testing mode (true/false)") async def testing_mode(self, interaction: discord.Interaction, enabled: bool): if not interaction.user.guild_permissions.administrator: await interaction.response.send_message( "You must be an administrator to use this command.", ephemeral=False ) return await set_guild_config(interaction.guild.id, "TESTING_MODE", enabled) await interaction.response.send_message( f"Testing mode is now {'enabled' if enabled else 'disabled'}.", ephemeral=False, ) @config_subgroup.command( name="update_rules", description="Update server rules from the specified channel.", ) @app_commands.describe(channel="The channel containing the server rules.") @app_commands.checks.has_permissions(administrator=True) async def update_rules( self, interaction: discord.Interaction, channel: discord.TextChannel ) -> None: """Pull the server rules from a channel and update the global config.""" messages = [] async for msg in channel.history( limit=MAX_RULE_MESSAGES + 1, oldest_first=True ): if msg.content: messages.append(msg.content) if len(messages) > MAX_RULE_MESSAGES: await interaction.response.send_message( f"Channel has more than {MAX_RULE_MESSAGES} messages." " Please consolidate your rules into fewer messages.", ephemeral=True, ) return if not messages: await interaction.response.send_message( "No messages found in that channel.", ephemeral=True ) return rules_text = "\n\n".join(messages).strip() aimod_config_module.SERVER_RULES = rules_text await interaction.response.send_message( f"Server rules updated from {channel.mention}.", ephemeral=False ) @config_subgroup.command( name="reset_rules", description="Reset server rules to the default hardcoded version.", ) @app_commands.checks.has_permissions(administrator=True) async def reset_rules(self, interaction: discord.Interaction) -> None: """Reset the server rules to the default string.""" aimod_config_module.SERVER_RULES = aimod_config_module.DEFAULT_SERVER_RULES await interaction.response.send_message( "Server rules have been reset to the default.", ephemeral=False ) @config_subgroup.command( name="update_instructions", description="Update moderation instructions from the specified channel.", ) @app_commands.describe( channel="The channel containing the moderation instructions." ) @app_commands.checks.has_permissions(administrator=True) async def update_instructions( self, interaction: discord.Interaction, channel: discord.TextChannel ) -> None: """Pull moderation instructions from a channel and update the global config.""" messages = [] async for msg in channel.history( limit=MAX_RULE_MESSAGES + 1, oldest_first=True ): if msg.content: messages.append(msg.content) if len(messages) > MAX_RULE_MESSAGES: await interaction.response.send_message( f"Channel has more than {MAX_RULE_MESSAGES} messages." " Please consolidate your instructions into fewer messages.", ephemeral=True, ) return if not messages: await interaction.response.send_message( "No messages found in that channel.", ephemeral=True ) return instructions_text = "\n\n".join(messages).strip() aimod_config_module.MODERATION_INSTRUCTIONS = instructions_text await interaction.response.send_message( f"Moderation instructions updated from {channel.mention}.", ephemeral=False ) @config_subgroup.command( name="reset_instructions", description="Reset moderation instructions to the default version.", ) @app_commands.checks.has_permissions(administrator=True) async def reset_instructions(self, interaction: discord.Interaction) -> None: """Reset moderation instructions to the default string.""" aimod_config_module.MODERATION_INSTRUCTIONS = ( aimod_config_module.DEFAULT_MODERATION_INSTRUCTIONS ) await interaction.response.send_message( "Moderation instructions have been reset to the default.", ephemeral=False ) @infractions_subgroup.command( name="view", description="View a user's AI moderation infraction history (mod/admin only).", ) @app_commands.describe(user="The user to view infractions for") async def viewinfractions( self, interaction: discord.Interaction, user: discord.Member ): # Check if user has permission (admin or moderator role) moderator_role_id = get_guild_config(interaction.guild.id, "MODERATOR_ROLE_ID") moderator_role = ( interaction.guild.get_role(moderator_role_id) if moderator_role_id else None ) has_permission = interaction.user.guild_permissions.administrator or ( moderator_role and moderator_role in interaction.user.roles ) if not has_permission: await interaction.response.send_message( "You must be an administrator or have the moderator role to use this command.", ephemeral=True, ) return # Get the user's infraction history infractions = get_user_infraction_history(interaction.guild.id, user.id) if not infractions: await interaction.response.send_message( f"{user.mention} has no recorded infractions.", ephemeral=False ) return # Create an embed to display the infractions embed = discord.Embed( title=f"Infraction History for {user.display_name}", description=f"User ID: {user.id}", color=discord.Color.orange(), ) # Add each infraction to the embed for i, infraction in enumerate(infractions, 1): timestamp = infraction.get("timestamp", "Unknown date")[:19].replace( "T", " " ) # Format ISO timestamp rule = infraction.get("rule_violated", "Unknown rule") action = infraction.get("action_taken", "Unknown action") reason = infraction.get("reasoning", "No reason provided") # Truncate reason if it's too long if len(reason) > 200: reason = reason[:197] + "..." embed.add_field( name=f"Infraction #{i} - {timestamp}", value=f"**Rule Violated:** {rule}\n**Action Taken:** {action}\n**Reason:** {reason}", inline=False, ) embed.set_footer(text=f"Total infractions: {len(infractions)}") embed.timestamp = discord.utils.utcnow() await interaction.response.send_message(embed=embed, ephemeral=False) @appeal_subgroup.command( name="human_review", description="Request a human moderator to review your case.", ) @app_commands.describe( reason="Explain why you want a human to review the AI decision", guild_id="If using in DMs, provide the server ID", ) async def appeal_human_review( self, interaction: discord.Interaction, reason: str, guild_id: int | None = None, ): """Let a user request a manual moderator review.""" guild = interaction.guild or ( self.bot.get_guild(guild_id) if guild_id else None ) if not guild: await interaction.response.send_message( "Invalid or missing guild ID.", ephemeral=True ) return log_channel_id = get_guild_config(guild.id, "MOD_LOG_CHANNEL_ID") log_channel = self.bot.get_channel(log_channel_id) if log_channel_id else None if not log_channel: await interaction.response.send_message( "Appeals are not enabled for this server.", ephemeral=True ) return timestamp = datetime.datetime.utcnow().isoformat() await add_user_appeal( guild.id, interaction.user.id, "HUMAN_REVIEW", reason, timestamp, "" ) embed = discord.Embed( title="Human Review Requested", color=discord.Color.orange() ) embed.add_field( name="User", value=f"{interaction.user} ({interaction.user.id})", inline=False, ) embed.add_field(name="Request", value=reason, inline=False) embed.timestamp = discord.utils.utcnow() await log_channel.send(embed=embed) await interaction.response.send_message( "Your request for a human review has been sent.", ephemeral=True ) @infractions_subgroup.command( name="clear", description="Clear a user's AI moderation infraction history (admin only).", ) @app_commands.describe(user="The user to clear infractions for") async def clearinfractions( self, interaction: discord.Interaction, user: discord.Member ): # Check if user has administrator permission if not interaction.user.guild_permissions.administrator: await interaction.response.send_message( "You must be an administrator to use this command.", ephemeral=True ) return # Get the user's infraction history key = f"{interaction.guild.id}_{user.id}" infractions = USER_INFRACTIONS.get(key, []) if not infractions: await interaction.response.send_message( f"{user.mention} has no recorded infractions to clear.", ephemeral=False ) return # Clear the user's infractions USER_INFRACTIONS[key] = [] await save_user_infractions() await interaction.response.send_message( f"Cleared {len(infractions)} infraction(s) for {user.mention}.", ephemeral=False, ) @infractions_subgroup.command( name="leaderboard", description="Show users with the fewest infractions.", ) async def leaderboard(self, interaction: discord.Interaction): guild_id = interaction.guild.id counts = {} for key, infractions in USER_INFRACTIONS.items(): if key.startswith(f"{guild_id}_"): uid = int(key.split("_", 1)[1]) counts[uid] = len(infractions) if not counts: await interaction.response.send_message( "No infractions recorded for this guild.", ephemeral=True ) return sorted_users = sorted(counts.items(), key=lambda x: x[1])[:5] lines = [] for uid, count in sorted_users: member = interaction.guild.get_member(uid) name = member.display_name if member else f"ID:{uid}" lines.append(f"**{name}** - {count} infractions") embed = discord.Embed( title="Best Behavior Leaderboard", description="\n".join(lines), color=discord.Color.green(), ) await interaction.response.send_message(embed=embed, ephemeral=False) @infractions_subgroup.command( name="restore", description="Restore infractions from the latest backup (admin only).", ) @app_commands.checks.has_permissions(administrator=True) async def restore_infractions(self, interaction: discord.Interaction): backups = sorted(os.listdir(INFRACTION_BACKUP_DIR)) if not backups: await interaction.response.send_message("No backups found.", ephemeral=True) return latest = os.path.join(INFRACTION_BACKUP_DIR, backups[-1]) try: shutil.copy(latest, USER_INFRACTIONS_PATH) async with aiofiles.open(USER_INFRACTIONS_PATH, "r", encoding="utf-8") as f: data = await f.read() async with CONFIG_LOCK: global USER_INFRACTIONS USER_INFRACTIONS = json.loads(data) await interaction.response.send_message( f"Infractions restored from {backups[-1]}", ephemeral=False ) except Exception as e: # noqa: BLE001 await interaction.response.send_message( f"Failed to restore infractions: {e}", ephemeral=True ) @appeal_subgroup.command(name="submit", description="Submit a moderation appeal.") @app_commands.describe( action="The action you are appealing", reason="Explain why you believe the action was incorrect", guild_id="If using in DMs, provide the server ID", message_id="ID of the moderated message you are appealing (optional)", ) async def appeal_submit( self, interaction: discord.Interaction, action: str, reason: str, guild_id: int | None = None, message_id: int | None = None, ): guild = interaction.guild or ( self.bot.get_guild(guild_id) if guild_id else None ) if not guild: await interaction.response.send_message( "Invalid or missing guild ID.", ephemeral=True ) return log_channel_id = get_guild_config(guild.id, "MOD_LOG_CHANNEL_ID") log_channel = self.bot.get_channel(log_channel_id) if log_channel_id else None if not log_channel: await interaction.response.send_message( "Appeals are not enabled for this server.", ephemeral=True ) return infractions = get_user_infraction_history(guild.id, interaction.user.id) target_infraction = None if message_id: for infr in infractions[::-1]: if infr.get("message_id") == message_id: target_infraction = infr break if not target_infraction and infractions: target_infraction = infractions[-1] ai_review = await self.run_appeal_ai( guild, interaction.user, action, reason, target_infraction, ) timestamp = datetime.datetime.utcnow().isoformat() ref = target_infraction.get("message_id") if target_infraction else None await add_user_appeal( guild.id, interaction.user.id, action, reason, timestamp, ai_review, str(ref) if ref else None, ) embed = discord.Embed(title="New Appeal", color=discord.Color.blue()) embed.add_field( name="User", value=f"{interaction.user} ({interaction.user.id})", inline=False, ) embed.add_field(name="Action", value=action, inline=False) if ref: embed.add_field(name="Infraction", value=f"Message ID: {ref}", inline=False) msg_snip = ( target_infraction.get("message_content") if target_infraction else None ) if msg_snip: embed.add_field( name="Message Snippet", value=f"`{msg_snip}`", inline=False, ) attachments = ( target_infraction.get("attachments") if target_infraction else None ) if attachments: attach_text = "\n".join(attachments) if len(attach_text) > 1024: attach_text = attach_text[:1021] + "..." embed.add_field( name="Attachments", value=attach_text, inline=False, ) embed.add_field(name="Appeal", value=reason, inline=False) embed.add_field(name="AI Review", value=ai_review[:1000], inline=False) embed.timestamp = discord.utils.utcnow() await log_channel.send(embed=embed) await interaction.response.send_message( "Your appeal has been submitted.", ephemeral=True ) @appeal_subgroup.command( name="list", description="View a user's appeals (mods only)." ) @app_commands.describe(user="The user to view appeals for") async def appeal_list(self, interaction: discord.Interaction, user: discord.Member): moderator_role_id = get_guild_config(interaction.guild.id, "MODERATOR_ROLE_ID") moderator_role = ( interaction.guild.get_role(moderator_role_id) if moderator_role_id else None ) has_permission = interaction.user.guild_permissions.administrator or ( moderator_role and moderator_role in interaction.user.roles ) if not has_permission: await interaction.response.send_message( "You must be an administrator or have the moderator role to use this command.", ephemeral=True, ) return appeals = get_user_appeals(interaction.guild.id, user.id) history = get_user_infraction_history(interaction.guild.id, user.id) if not appeals: await interaction.response.send_message( f"{user.mention} has no appeals.", ephemeral=False ) return embed = discord.Embed( title=f"Appeals for {user.display_name}", color=discord.Color.blue() ) for i, appeal in enumerate(appeals, 1): ts = appeal.get("timestamp", "?")[:19].replace("T", " ") summary = appeal.get("appeal_text", "") ai_sum = appeal.get("ai_review", "") if len(summary) > 150: summary = summary[:147] + "..." if len(ai_sum) > 150: ai_sum = ai_sum[:147] + "..." value = f"Action: {appeal.get('action')}\nReason: {summary}\nAI: {ai_sum}" ref = appeal.get("infraction_reference") if ref: value = f"Infraction: {ref}\n" + value for infr in history: if str(infr.get("message_id")) == str(ref): msg_snip = infr.get("message_content") if msg_snip: value += f"\nSnippet: {msg_snip}" attachments = infr.get("attachments") if attachments: attach_txt = ", ".join(attachments) if len(attach_txt) > 200: attach_txt = attach_txt[:197] + "..." value += f"\nAttachments: {attach_txt}" reason = infr.get("reasoning") if reason: if len(reason) > 150: reason = reason[:147] + "..." value += f"\nAI Reasoning: {reason}" break embed.add_field(name=f"Appeal #{i} - {ts}", value=value, inline=False) await interaction.response.send_message(embed=embed, ephemeral=False) @appeal_subgroup.command( name="testcases", description="Run sample appeals through the AI review system (admin only).", ) async def appeal_testcases(self, interaction: discord.Interaction): """Run a few hardcoded appeal scenarios through the AI with context.""" if not interaction.user.guild_permissions.administrator: await interaction.response.send_message( "You must be an administrator to use this command.", ephemeral=True, ) return await interaction.response.defer(thinking=True, ephemeral=True) scenarios = [ ("WARN", "I was excited and sent many messages quickly."), ("MUTE", "I only quoted a meme and it was taken as harassment."), ("BAN", "I posted NSFW art but believed it was allowed."), ] results: list[tuple[str, str]] = [] for idx, (action, text) in enumerate(scenarios, 1): dummy_infraction = { "message_id": 1000 + idx, "channel_id": interaction.channel.id, "message_content": f"Example offending message {idx}", "attachments": [], "reasoning": "Automated moderation reasoning sample", } result = await self.run_appeal_ai( interaction.guild, interaction.user, action, text, dummy_infraction ) results.append((action, result)) embed = discord.Embed( title="Appeal AI Test Results", color=discord.Color.green() ) for action, result in results: field_text = result if result else "No response" if len(field_text) > 1000: field_text = field_text[:997] + "..." embed.add_field(name=action, value=field_text, inline=False) await interaction.followup.send(embed=embed, ephemeral=True) @model_subgroup.command( name="set", description="Change the AI model used for moderation (admin only)." ) @app_commands.describe( model="The Vertex AI model to use (e.g., 'gemini-1.5-flash-001', 'gemini-1.0-pro')" ) async def modsetmodel(self, interaction: discord.Interaction, model: str): # Check if user has administrator permission if not interaction.user.guild_permissions.administrator: await interaction.response.send_message( "You must be an administrator to use this command.", ephemeral=True ) return # Validate the model name (basic validation for Vertex AI) # Vertex AI models usually don't have "/" like OpenRouter, but can have "-" and numbers. # Example: gemini-1.5-flash-001 if not model or len(model) < 5: # Basic check await interaction.response.send_message( "Invalid model format. Please provide a valid Vertex AI model ID (e.g., 'gemini-1.5-flash-001').", ephemeral=False, ) return # Save the model to guild configuration guild_id = interaction.guild.id await set_guild_config(guild_id, "AI_MODEL", model) # Note: There's no global model variable to update here like OPENROUTER_MODEL. # The cog will use the guild-specific config or the DEFAULT_VERTEX_AI_MODEL. await interaction.response.send_message( f"AI moderation model updated to `{model}` for this guild.", ephemeral=False ) # @modsetmodel.autocomplete('model') # Autocomplete removed as OpenRouter models are not used. # async def modsetmodel_autocomplete(...): # This function is now removed. @model_subgroup.command( name="get", description="View the current AI model used for moderation." ) async def modgetmodel(self, interaction: discord.Interaction): # Get the model from guild config, fall back to global default guild_id = interaction.guild.id model_used = get_guild_config(guild_id, "AI_MODEL", DEFAULT_VERTEX_AI_MODEL) # Create an embed to display the model information embed = discord.Embed( title="AI Moderation Model", description=f"The current AI model used for moderation in this server is:", color=discord.Color.blue(), ) embed.add_field(name="Model In Use", value=f"`{model_used}`", inline=False) embed.add_field( name="Default Model", value=f"`{DEFAULT_VERTEX_AI_MODEL}`", inline=False ) embed.set_footer(text="Use /aimod model set to change the model") await interaction.response.send_message(embed=embed, ephemeral=False) # --- Helper Function to Safely Extract Text from Vertex AI Response --- def _get_response_text( self, response: Optional[types.GenerateContentResponse] ) -> Optional[str]: """ Safely extracts the text content from the first text part of a GenerateContentResponse. Handles potential errors and lack of text parts gracefully. (Adapted from teto_cog.py) """ if not response: print("[AIModerationCog._get_response_text] Received None response object.") return None if ( hasattr(response, "text") and response.text ): # Some simpler responses might have .text directly print( "[AIModerationCog._get_response_text] Found text directly in response.text attribute." ) return response.text if not response.candidates: print( f"[AIModerationCog._get_response_text] Response object has no candidates. Response: {response}" ) return None try: candidate = response.candidates[0] if not hasattr(candidate, "content") or not candidate.content: print( f"[AIModerationCog._get_response_text] Candidate 0 has no 'content'. Candidate: {candidate}" ) return None if not hasattr(candidate.content, "parts") or not candidate.content.parts: print( f"[AIModerationCog._get_response_text] Candidate 0 content has no 'parts' or parts list is empty. types.Content: {candidate.content}" ) return None for i, part in enumerate(candidate.content.parts): if hasattr(part, "text") and part.text is not None: if isinstance(part.text, str) and part.text.strip(): print( f"[AIModerationCog._get_response_text] Found non-empty text in part {i}." ) return part.text else: print( f"[AIModerationCog._get_response_text] types.Part {i} has 'text' attribute, but it's empty or not a string: {part.text!r}" ) print( f"[AIModerationCog._get_response_text] No usable text part found in candidate 0 after iterating through all parts." ) return None except (AttributeError, IndexError, TypeError) as e: print( f"[AIModerationCog._get_response_text] Error accessing response structure: {type(e).__name__}: {e}" ) print(f"Problematic response object: {response}") return None except Exception as e: print( f"[AIModerationCog._get_response_text] Unexpected error extracting text: {e}" ) print(f"Response object during error: {response}") return None async def query_vertex_ai( self, message: discord.Message, message_content: str, user_history: str, image_data_list: Optional[List[Tuple[str, bytes, str, str]]] = None, ): """ Sends the message content, user history, and additional context to Google Vertex AI for analysis. Optionally includes image data for visual content moderation. Args: message: The original discord.Message object. message_content: The text content of the message. user_history: A string summarizing the user's past infractions. image_data_list: Optional list of tuples (mime_type, image_bytes, attachment_type, filename) for image moderation. Returns: A dictionary containing the AI's decision, or None if an error occurs. """ print( f"query_vertex_ai called. Vertex AI client available: {self.genai_client is not None}" ) if not self.genai_client: print("Error: Vertex AI Client is not available. Cannot query API.") return None # Construct the prompt for the AI model (system prompt is largely the same) system_prompt_text = ( "You are an AI moderation assistant for a Discord server.\n" "Your primary function is to analyze message content and attached media based STRICTLY on the server rules provided below, using all available context.\n\n" "Server Rules:\n" "---\n" f"{SERVER_RULES}\n" "---\n\n" f"{aimod_config_module.MODERATION_INSTRUCTIONS}" ) member = message.author # This is a discord.Member object server_role_str = "Unprivileged Member" # Default if member == await message.guild.fetch_member(message.guild.owner_id): server_role_str = "Server Owner" elif member.guild_permissions.administrator: server_role_str = "Admin" else: perms = member.guild_permissions if ( perms.manage_messages or perms.kick_members or perms.ban_members or perms.moderate_members ): server_role_str = "Moderator" print(f"role: {server_role_str}") # --- Fetch Replied-to Message --- replied_to_message_content = "N/A (Not a reply)" if message.reference and message.reference.message_id: try: replied_to_msg = await message.channel.fetch_message( message.reference.message_id ) replied_to_message_content = f"User '{replied_to_msg.author.name}' said: \"{replied_to_msg.content[:200]}\"" if len(replied_to_msg.content) > 200: replied_to_message_content += "..." except discord.NotFound: replied_to_message_content = "N/A (Replied-to message not found)" except discord.Forbidden: replied_to_message_content = ( "N/A (Cannot fetch replied-to message - permissions)" ) except Exception as e: replied_to_message_content = ( f"N/A (Error fetching replied-to message: {e})" ) # --- Fetch Recent Channel History --- recent_channel_history_str = "N/A (Could not fetch history)" try: history_messages = [] # Fetch last 11 messages (current + 10 previous). We'll filter out the current one async for prev_msg in message.channel.history(limit=11, before=message): if ( prev_msg.id != message.id ): # Ensure we don't include the current message itself author_name = ( prev_msg.author.name + " (BOT)" if prev_msg.author.bot else prev_msg.author.name ) history_messages.append( f"- {author_name}: \"{prev_msg.content[:150]}{'...' if len(prev_msg.content) > 150 else ''}\" (ID: {prev_msg.id})" ) if history_messages: # Reverse to show oldest first in the snippet, then take the last 10. recent_channel_history_str = "\n".join( list(reversed(history_messages))[:10] ) else: recent_channel_history_str = ( "No recent messages before this one in the channel." ) except discord.Forbidden: recent_channel_history_str = ( "N/A (Cannot fetch channel history - permissions)" ) except Exception as e: recent_channel_history_str = f"N/A (Error fetching channel history: {e})" # Prepare user prompt content list with proper OpenRouter format user_prompt_content_list = [] # Add the text context first user_context_text = f"""User Infraction History (for {message.author.name}, ID: {message.author.id}): --- {user_history if user_history else "No prior infractions recorded for this user in this guild."} --- Current Message Context: - Author: {message.author.name} (ID: {message.author.id}) - Server Role: {server_role_str} - Channel: #{message.channel.name} (ID: {message.channel.id}) - Channel Category: {message.channel.category.name if message.channel.category else "No Category"} - Channel Age-Restricted/NSFW (Discord Setting): {message.channel.is_nsfw()} --- Replied-to Message: {replied_to_message_content} --- Recent Channel History (last up to 10 messages before this one): {recent_channel_history_str} --- Message Content to Analyze: "{message_content}" Now, analyze the message content and any attached media based on the server rules and ALL the context provided above. Follow the JSON output format specified in the system prompt. CRITICAL: Do NOT output anything other than the required JSON response. """ # Add the text content first user_prompt_content_list.append({"type": "text", "text": user_context_text}) # Add images in the proper OpenRouter format if image_data_list and len(image_data_list) > 0: try: for i, (mime_type, image_bytes, attachment_type, filename) in enumerate( image_data_list ): try: # Encode image to base64 base64_image = base64.b64encode(image_bytes).decode("utf-8") # Create data URL image_data_url = f"data:{mime_type};base64,{base64_image}" # Add image in OpenRouter format user_prompt_content_list.append( {"type": "image_url", "image_url": {"url": image_data_url}} ) print( f"Added attachment #{i+1}: {filename} ({attachment_type}) to the prompt" ) except Exception as e: print( f"Error encoding image data for attachment {filename}: {e}" ) except Exception as e: print(f"Error processing image data: {e}") # Add a text note about the error user_prompt_content_list.append( { "type": "text", "text": f"Note: There were {len(image_data_list)} attached images, but they could not be processed for analysis.", } ) # Get guild-specific model if configured, otherwise use default member = message.author server_role_str = "Unprivileged Member" if member == await message.guild.fetch_member(message.guild.owner_id): server_role_str = "Server Owner" elif member.guild_permissions.administrator: server_role_str = "Admin" else: perms = member.guild_permissions if ( perms.manage_messages or perms.kick_members or perms.ban_members or perms.moderate_members ): server_role_str = "Moderator" replied_to_message_content = "N/A (Not a reply)" if message.reference and message.reference.message_id: try: replied_to_msg = await message.channel.fetch_message( message.reference.message_id ) replied_to_message_content = f"User '{replied_to_msg.author.name}' said: \"{replied_to_msg.content[:200]}{'...' if len(replied_to_msg.content) > 200 else ''}\"" except Exception as e: replied_to_message_content = f"N/A (Error fetching replied-to: {e})" recent_channel_history_str = "N/A (Could not fetch history)" try: history_messages = [ f"- {prev_msg.author.name}{' (BOT)' if prev_msg.author.bot else ''}: \"{prev_msg.content[:150]}{'...' if len(prev_msg.content) > 150 else ''}\" (ID: {prev_msg.id})" async for prev_msg in message.channel.history(limit=11, before=message) if prev_msg.id != message.id ] recent_channel_history_str = ( "\n".join(list(reversed(history_messages))[:10]) if history_messages else "No recent messages." ) except Exception as e: recent_channel_history_str = f"N/A (Error fetching history: {e})" user_context_text = f"""User Infraction History (for {message.author.name}, ID: {message.author.id}): --- {user_history if user_history else "No prior infractions recorded for this user in this guild."} --- Current Message Context: - Author: {message.author.name} (ID: {message.author.id}) - Server Role: {server_role_str} - Channel: #{message.channel.name} (ID: {message.channel.id}) - Channel Category: {message.channel.category.name if message.channel.category else "No Category"} - Channel Age-Restricted/NSFW (Discord Setting): {message.channel.is_nsfw()} --- Replied-to Message: {replied_to_message_content} --- Recent Channel History (last up to 10 messages before this one): {recent_channel_history_str} --- Message Content to Analyze: "{message_content}" Now, analyze the message content and any attached media based on the server rules and ALL the context provided above. Follow the JSON output format specified in the system prompt. CRITICAL: Do NOT output anything other than the required JSON response. """ # Prepare parts for Vertex AI vertex_parts: List[Any] = [types.Part(text=user_context_text)] if image_data_list: for mime_type, image_bytes, attachment_type, filename in image_data_list: try: # Vertex AI directly supports common image and video MIME types. # Ensure mime_type is one of the supported ones by Vertex, e.g., image/png or video/mp4. supported_image_mimes = [ "image/png", "image/jpeg", "image/webp", "image/heic", "image/heif", "image/gif", ] supported_video_mimes = [ "video/mp4", "video/webm", "video/quicktime", "video/x-msvideo", "video/x-matroska", "video/x-flv", ] clean_mime_type = mime_type.split(";")[0].lower() if ( clean_mime_type in supported_image_mimes or clean_mime_type in supported_video_mimes ): vertex_parts.append( types.Part( inline_data=types.Blob( data=image_bytes, mime_type=clean_mime_type, ) ) ) print( f"Added attachment {filename} ({attachment_type}) with MIME {clean_mime_type} to Vertex prompt" ) else: print( f"Skipping attachment {filename} due to unsupported MIME type for Vertex: {mime_type}" ) vertex_parts.append( types.Part( text=f"[System Note: Attachment '{filename}' of type '{mime_type}' was not processed as it's not directly supported for vision by the current model configuration.]" ) ) except Exception as e: print(f"Error processing attachment {filename} for Vertex AI: {e}") vertex_parts.append( types.Part( text=f"[System Note: Error processing attachment '{filename}'.]" ) ) # Get guild-specific model if configured, otherwise use default guild_id = message.guild.id model_id_to_use = get_guild_config( guild_id, "AI_MODEL", DEFAULT_VERTEX_AI_MODEL ) # Vertex model path is usually like "publishers/google/models/gemini-1.5-flash-001" # If model_id_to_use is just "gemini-1.5-flash-001", prepend "publishers/google/models/" if not model_id_to_use.startswith("publishers/google/models/"): model_path = f"publishers/google/models/{model_id_to_use}" else: model_path = model_id_to_use thinking_config = types.ThinkingConfig(thinking_budget=0) generation_config = types.GenerateContentConfig( temperature=0.2, max_output_tokens=2000, # Ensure enough for JSON safety_settings=STANDARD_SAFETY_SETTINGS, thinking_config=thinking_config, ) # Construct contents for Vertex AI API # System prompt is handled by the model's configuration or as the first message if not directly supported in GenerateContentConfig. # For Vertex AI with `genai.Client`, system prompt is often part of the model's configuration or the first message. # The `genai.GenerativeModel` has `system_instruction`. # Here, we'll build the `contents` list. # The system prompt is part of the model's understanding, and the user prompt contains the task. # For multi-turn, history is added to `contents`. Here, it's a single-turn request. request_contents = [ # System prompt can be the first message if not using system_instruction in model # types.Content(role="system", parts=[types.Part(text=system_prompt_text)]), # This is one way # Or, rely on the model's pre-set system prompt and just send user data. # For this moderation task, the detailed instructions are better sent as part of the user turn # or a specific system instruction if the client/model supports it well. # Let's include the system prompt as the first part of the user message for clarity with current structure. # The `system_prompt_text` is already defined and will be the primary text part. # The `user_context_text` is what we constructed. # The `vertex_parts` contains the `user_context_text` and any image data. types.Content(role="user", parts=vertex_parts) ] try: print(f"Querying Vertex AI model {model_path}...") # Prepare the generation config with system instruction # The existing 'generation_config' (lines 1063-1072) already has temperature, max_tokens, safety_settings. # We need to add system_instruction to it. final_generation_config = types.GenerateContentConfig( temperature=generation_config.temperature, # from existing config max_output_tokens=generation_config.max_output_tokens, # from existing config safety_settings=generation_config.safety_settings, # from existing config system_instruction=types.Content( role="system", parts=[types.Part(text=system_prompt_text)] ), thinking_config=generation_config.thinking_config, # from existing config # response_mime_type="application/json", # Consider if model supports this for forcing JSON ) client = get_genai_client_for_model(model_id_to_use) response = await client.aio.models.generate_content( model=model_path, # Correctly formatted model path contents=request_contents, # User's message with context and images config=final_generation_config, # Pass the config with system_instruction ) ai_response_content = self._get_response_text(response) print(response.usage_metadata) # Print usage metadata for debugging if not ai_response_content: print("Error: AI response content is empty or could not be extracted.") # Log safety ratings if available if ( response and response.candidates and response.candidates[0].safety_ratings ): ratings = ", ".join( [ f"{r.category.name}: {r.probability.name}" for r in response.candidates[0].safety_ratings ] ) print(f"Safety Ratings: {ratings}") if ( response and response.candidates and response.candidates[0].finish_reason ): print(f"Finish Reason: {response.candidates[0].finish_reason.name}") return None # Attempt to parse the JSON response from the AI try: # Clean potential markdown code blocks if ai_response_content.startswith("```json"): ai_response_content = ai_response_content.strip("```json\n").strip( "`\n " ) elif ai_response_content.startswith("```"): ai_response_content = ai_response_content.strip("```\n").strip( "`\n " ) ai_decision = json.loads(ai_response_content) # Basic validation of the parsed JSON structure if ( not isinstance(ai_decision, dict) or not all( k in ai_decision for k in ["violation", "rule_violated", "reasoning", "action"] ) or not isinstance(ai_decision.get("violation"), bool) ): print( f"Error: AI response missing expected keys or 'violation' is not bool. Response: {ai_response_content}" ) return None print(f"AI Analysis Received: {ai_decision}") return ai_decision except json.JSONDecodeError as e: print( f"Error: Could not decode JSON response from AI: {e}. Response: {ai_response_content}" ) return None except Exception as e: # Catch other parsing errors print( f"Error parsing AI response structure: {e}. Response: {ai_response_content}" ) return None except google_exceptions.GoogleAPICallError as e: print(f"Error calling Vertex AI API: {e}") return None except Exception as e: print( f"An unexpected error occurred during Vertex AI query for message {message.id}: {e}" ) return None async def handle_violation( self, message: discord.Message, ai_decision: dict, notify_mods_message: str = None, link_urls: list[str] | None = None, ): """ Takes action based on the AI's violation decision. Also transmits action info via HTTP POST with API key header. """ import datetime import aiohttp rule_violated = ai_decision.get("rule_violated", "Unknown") reasoning = ai_decision.get("reasoning", "No reasoning provided.") action = ai_decision.get( "action", "NOTIFY_MODS" ).upper() # Default to notify mods guild_id = message.guild.id # Get guild_id once user_id = message.author.id # Get user_id once moderator_role_id = get_guild_config(guild_id, "MODERATOR_ROLE_ID") moderator_role = ( message.guild.get_role(moderator_role_id) if moderator_role_id else None ) mod_ping = ( moderator_role.mention if moderator_role else f"Moderators (Role ID {moderator_role_id} not found)" ) current_timestamp_iso = datetime.datetime.now(datetime.timezone.utc).isoformat() # Get the model from guild config, fall back to global default model_used = get_guild_config(guild_id, "AI_MODEL", DEFAULT_VERTEX_AI_MODEL) # --- Adjust action for first-time offenses --- user_history_list = get_user_infraction_history(guild_id, user_id) if action == "BAN" and not user_history_list: combined_text = f"{rule_violated} {reasoning}".lower() severe = False if "gore" in combined_text: severe = True elif "csam" in combined_text: severe = True elif ( "pedophilia" in combined_text or "child" in combined_text or "5a" in combined_text or "5" in combined_text ): real_indicators = [ "real", "real-life", "real life", "irl", "photo", "photograph", "video", ] if any(indicator in combined_text for indicator in real_indicators): severe = True if not severe: print( "Downgrading BAN to TIMEOUT_LONG due to first offense and lack of severe content." ) action = "TIMEOUT_LONG" # --- Prepare Notification --- notification_embed = discord.Embed( title="🚨 Rule Violation Detected 🚨", description=f"AI analysis detected a violation of server rules.", color=discord.Color.red(), ) notification_embed.add_field( name="User", value=f"{message.author.mention} (`{message.author.id}`)", inline=False, ) notification_embed.add_field( name="Channel", value=message.channel.mention, inline=False ) notification_embed.add_field( name="Rule Violated", value=f"**Rule {rule_violated}**", inline=True ) notification_embed.add_field( name="AI Suggested Action", value=f"`{action}`", inline=True ) notification_embed.add_field( name="AI Reasoning", value=f"_{reasoning}_", inline=False ) notification_embed.add_field( name="Message Link", value=f"[Jump to Message]({message.jump_url})", inline=False, ) # Log message content and attachments for audit purposes msg_content = message.content if message.content else "*No text content*" notification_embed.add_field( name="Message Content", value=msg_content[:1024], inline=False ) # Add attachment information if present if message.attachments: attachment_info = [] for i, attachment in enumerate(message.attachments): attachment_info.append( f"{i+1}. {attachment.filename} ({attachment.content_type}) - [Link]({attachment.url})" ) attachment_text = "\n".join(attachment_info) notification_embed.add_field( name="Attachments", value=attachment_text[:1024], inline=False ) # Add the first image as a thumbnail if it's an image type for attachment in message.attachments: if any( attachment.filename.lower().endswith(ext) for ext in self.image_extensions + self.gif_extensions + self.video_extensions ): notification_embed.set_thumbnail(url=attachment.url) break # Use the model_used variable that was defined earlier notification_embed.set_footer( text=f"AI Model: {model_used}. Learnhelp AI Moderation." ) notification_embed.timestamp = ( discord.utils.utcnow() ) # Using discord.utils.utcnow() which is still supported action_taken_message = "" # To append to the notification testing_mode = get_guild_config(guild_id, "TESTING_MODE", False) if testing_mode: action_taken_message = ( f"[TEST MODE] Would have taken action `{action}`. No changes made." ) notification_embed.color = discord.Color.greyple() log_channel_id = get_guild_config(message.guild.id, "MOD_LOG_CHANNEL_ID") log_channel = ( self.bot.get_channel(log_channel_id) if log_channel_id else message.channel ) if action == "SUICIDAL": suicidal_role_id = get_guild_config( message.guild.id, "SUICIDAL_PING_ROLE_ID" ) suicidal_role = ( message.guild.get_role(suicidal_role_id) if suicidal_role_id else None ) ping_target = ( suicidal_role.mention if suicidal_role else f"Role ID {suicidal_role_id} (Suicidal Content)" ) if not suicidal_role: print(f"ERROR: Suicidal ping role ID {suicidal_role_id} not found.") final_message = f"{ping_target}\n{action_taken_message}" else: suggestions_id = get_guild_config( message.guild.id, "SUGGESTIONS_CHANNEL_ID" ) suggestion_note = ( f"\nPlease review <#{suggestions_id}> for rule updates." if suggestions_id else "" ) final_message = f"{mod_ping}\n{action_taken_message}{suggestion_note}" await log_channel.send( content=final_message, embed=notification_embed, view=self.QuickActionView(self, message.author), ) return # --- Perform Actions --- try: if action == "BAN": action_taken_message = ( f"Action Taken: User **BANNED** and message deleted." ) notification_embed.color = discord.Color.dark_red() try: await message.delete() except discord.NotFound: print("Message already deleted before banning.") except discord.Forbidden: print( f"WARNING: Missing permissions to delete message before banning user {message.author}." ) action_taken_message += ( " (Failed to delete message - check permissions)" ) ban_reason = f"AI Mod: Rule {rule_violated}. Reason: {reasoning}" await message.guild.ban( message.author, reason=ban_reason, delete_message_days=1 ) print( f"BANNED user {message.author} for violating rule {rule_violated}." ) await add_user_infraction( guild_id, user_id, rule_violated, "BAN", reasoning, current_timestamp_iso, message.id, message.channel.id, message.content[:100] if message.content else "", [a.url for a in message.attachments] + (link_urls or []), ) elif action == "KICK": action_taken_message = ( f"Action Taken: User **KICKED** and message deleted." ) notification_embed.color = discord.Color.from_rgb( 255, 127, 0 ) # Dark Orange try: await message.delete() except discord.NotFound: print("Message already deleted before kicking.") except discord.Forbidden: print( f"WARNING: Missing permissions to delete message before kicking user {message.author}." ) action_taken_message += ( " (Failed to delete message - check permissions)" ) kick_reason = f"AI Mod: Rule {rule_violated}. Reason: {reasoning}" await message.author.kick(reason=kick_reason) print( f"KICKED user {message.author} for violating rule {rule_violated}." ) await add_user_infraction( guild_id, user_id, rule_violated, "KICK", reasoning, current_timestamp_iso, message.id, message.channel.id, message.content[:100] if message.content else "", [a.url for a in message.attachments] + (link_urls or []), ) elif action.startswith("TIMEOUT"): duration_seconds = 0 duration_readable = "" if action == "TIMEOUT_SHORT": duration_seconds = 10 * 60 # 10 minutes duration_readable = "10 minutes" elif action == "TIMEOUT_MEDIUM": duration_seconds = 60 * 60 # 1 hour duration_readable = "1 hour" elif action == "TIMEOUT_LONG": duration_seconds = 24 * 60 * 60 # 1 day duration_readable = "1 day" if duration_seconds > 0: action_taken_message = f"Action Taken: User **TIMED OUT for {duration_readable}** and message deleted." notification_embed.color = discord.Color.blue() try: await message.delete() except discord.NotFound: print( f"Message already deleted before timeout for {message.author}." ) except discord.Forbidden: print( f"WARNING: Missing permissions to delete message before timeout for {message.author}." ) action_taken_message += ( " (Failed to delete message - check permissions)" ) timeout_reason = ( f"AI Mod: Rule {rule_violated}. Reason: {reasoning}" ) # discord.py timeout takes a timedelta object await message.author.timeout( discord.utils.utcnow() + datetime.timedelta(seconds=duration_seconds), reason=timeout_reason, ) print( f"TIMED OUT user {message.author} for {duration_readable} for violating rule {rule_violated}." ) await add_user_infraction( guild_id, user_id, rule_violated, action, reasoning, current_timestamp_iso, message.id, message.channel.id, message.content[:100] if message.content else "", [a.url for a in message.attachments] + (link_urls or []), ) else: action_taken_message = ( "Action Taken: **Unknown timeout duration, notifying mods.**" ) action = ( "NOTIFY_MODS" # Fallback if timeout duration is not recognized ) print( f"Unknown timeout duration for action {action}. Defaulting to NOTIFY_MODS." ) elif action == "DELETE": action_taken_message = f"Action Taken: Message **DELETED**." await message.delete() print( f"DELETED message from {message.author} for violating rule {rule_violated}." ) # Typically, a simple delete isn't a formal infraction unless it's part of a WARN. # If you want to log deletes as infractions, add: # add_user_infraction(guild_id, user_id, rule_violated, "DELETE", reasoning, current_timestamp_iso) elif action == "WARN": action_taken_message = ( f"Action Taken: Message **DELETED** (AI suggested WARN)." ) notification_embed.color = discord.Color.orange() await message.delete() # Warnings usually involve deleting the offending message print( f"DELETED message from {message.author} (AI suggested WARN for rule {rule_violated})." ) try: dm_channel = await message.author.create_dm() warn_embed = discord.Embed( title="⚠️ Moderation Warning", description=( f"Your recent message in **{message.guild.name}** was removed for violating **Rule {rule_violated}**." ), color=discord.Color.orange(), ) if message.content: warn_embed.add_field( name="Message Content", value=message.content[:1024], inline=False, ) warn_embed.add_field(name="Reason", value=reasoning, inline=False) warn_embed.set_footer( text="Please review the server rules. This is a formal warning." ) await dm_channel.send(embed=warn_embed) action_taken_message += " User notified via DM with warning." except discord.Forbidden: print( f"Could not DM warning to {message.author} (DMs likely disabled)." ) action_taken_message += " (Could not DM user for warning)." except Exception as e: print(f"Error sending warning DM to {message.author}: {e}") action_taken_message += " (Error sending warning DM)." await add_user_infraction( guild_id, user_id, rule_violated, "WARN", reasoning, current_timestamp_iso, message.id, message.channel.id, message.content[:100] if message.content else "", [a.url for a in message.attachments] + (link_urls or []), ) elif action == "NOTIFY_MODS": action_taken_message = "Action Taken: **Moderator review requested.**" notification_embed.color = discord.Color.gold() print( f"Notifying moderators about potential violation (Rule {rule_violated}) by {message.author}." ) # NOTIFY_MODS itself isn't an infraction on the user, but a request for human review. # If mods take action, they would log it manually or via a mod command. if notify_mods_message: notification_embed.add_field( name="Additional Mod Message", value=notify_mods_message, inline=False, ) elif action == "SUICIDAL": action_taken_message = ( "Action Taken: **User DMed resources, relevant role notified.**" ) # No infraction is typically logged for "SUICIDAL" as it's a support action. notification_embed.title = "🚨 Suicidal Content Detected 🚨" notification_embed.color = ( discord.Color.dark_purple() ) # A distinct color notification_embed.description = "AI analysis detected content indicating potential suicidal ideation." print( f"SUICIDAL content detected from {message.author}. DMing resources and notifying role." ) # DM the user with help resources try: dm_channel = await message.author.create_dm() await dm_channel.send(SUICIDAL_HELP_RESOURCES) action_taken_message += " User successfully DMed." except discord.Forbidden: print( f"Could not DM suicidal help resources to {message.author} (DMs likely disabled)." ) action_taken_message += " (Could not DM user - DMs disabled)." except Exception as e: print( f"Error sending suicidal help resources DM to {message.author}: {e}" ) action_taken_message += f" (Error DMing user: {e})." # The message itself is usually not deleted for suicidal content, to allow for intervention. # If deletion is desired, add: await message.delete() here. else: # Includes "IGNORE" or unexpected actions if ai_decision.get( "violation" ): # If violation is true but action is IGNORE action_taken_message = "Action Taken: **None** (AI suggested IGNORE despite flagging violation - Review Recommended)." notification_embed.color = discord.Color.light_grey() print( f"AI flagged violation ({rule_violated}) but suggested IGNORE for message by {message.author}. Notifying mods for review." ) else: # This case shouldn't be reached if called correctly, but handle defensively print( f"No action taken for message by {message.author} (AI Action: {action}, Violation: False)" ) return # Don't notify if no violation and action is IGNORE # --- Send Notification to Moderators/Relevant Role --- log_channel_id = get_guild_config(message.guild.id, "MOD_LOG_CHANNEL_ID") log_channel = ( self.bot.get_channel(log_channel_id) if log_channel_id else None ) if not log_channel: print( f"ERROR: Moderation log channel (ID: {log_channel_id}) not found or not configured. Defaulting to message channel." ) log_channel = message.channel if not log_channel: print( f"ERROR: Could not find even the original message channel {message.channel.id} to send notification." ) return if action == "SUICIDAL": suicidal_role_id = get_guild_config( message.guild.id, "SUICIDAL_PING_ROLE_ID" ) suicidal_role = ( message.guild.get_role(suicidal_role_id) if suicidal_role_id else None ) ping_target = ( suicidal_role.mention if suicidal_role else f"Role ID {suicidal_role_id} (Suicidal Content)" ) if not suicidal_role: print(f"ERROR: Suicidal ping role ID {suicidal_role_id} not found.") final_message = f"{ping_target}\n{action_taken_message}" await log_channel.send( content=final_message, embed=notification_embed, view=self.QuickActionView(self, message.author), ) elif moderator_role: # For other violations suggestions_id = get_guild_config( message.guild.id, "SUGGESTIONS_CHANNEL_ID" ) suggestion_note = ( f"\nPlease review <#{suggestions_id}> for rule updates." if suggestions_id else "" ) final_message = f"{mod_ping}\n{action_taken_message}{suggestion_note}" await log_channel.send( content=final_message, embed=notification_embed, view=self.QuickActionView(self, message.author), ) else: # Fallback if moderator role is also not found for non-suicidal actions print( f"ERROR: Moderator role ID {moderator_role_id} not found for action {action}." ) except discord.Forbidden as e: print( f"ERROR: Missing Permissions to perform action '{action}' for rule {rule_violated}. Details: {e}" ) # Try to notify mods about the failure if moderator_role: try: await message.channel.send( f"{mod_ping} **PERMISSION ERROR!** Could not perform action `{action}` on message by {message.author.mention} " f"for violating Rule {rule_violated}. Please check bot permissions.\n" f"Reasoning: _{reasoning}_\nMessage Link: {message.jump_url}" ) except discord.Forbidden: print( "FATAL: Bot lacks permission to send messages, even error notifications." ) except discord.NotFound: print( f"Message {message.id} was likely already deleted when trying to perform action '{action}'." ) except Exception as e: print( f"An unexpected error occurred during action execution for message {message.id}: {e}" ) # Try to notify mods about the unexpected error if moderator_role: try: await message.channel.send( f"{mod_ping} **UNEXPECTED ERROR!** An error occurred while handling rule violation " f"for {message.author.mention}. Please check bot logs.\n" f"Rule: {rule_violated}, Action Attempted: {action}\nMessage Link: {message.jump_url}" ) except discord.Forbidden: print( "FATAL: Bot lacks permission to send messages, even error notifications." ) async def run_appeal_ai( self, guild: discord.Guild, member: discord.User, action: str, appeal_text: str, infraction: dict | None = None, ) -> str: """Run the appeal text through the higher tier AI model.""" if not self.genai_client: return "AI review unavailable." history = get_user_infraction_history(guild.id, member.id) history_text = json.dumps(history, indent=2) if history else "None" system_prompt = ( "You are reviewing a user's appeal of a moderation action. " "Think very extensively about the appeal, the provided history, and the server rules. " "Return a short verdict (UPHOLD or OVERTURN) and your reasoning in plain text." ) context_lines = [] if infraction: channel_name = guild.get_channel(infraction.get("channel_id", 0)) channel_display = ( channel_name.name if channel_name else str(infraction.get("channel_id")) ) context_lines.append(f"Original Channel: {channel_display}") msg_content = infraction.get("message_content") if msg_content: context_lines.append(f"Message Snippet: {msg_content}") attachments = infraction.get("attachments") if attachments: context_lines.append(f"Attachments: {attachments}") reasoning = infraction.get("reasoning") if reasoning: context_lines.append(f"AI Reasoning: {reasoning}") user_prompt = ( f"Server Rules:\n{SERVER_RULES}\n\n" f"User History:\n{history_text}\n\n" + ("\n".join(context_lines) + "\n\n" if context_lines else "") + f"Action Appealed: {action}\n" f"Appeal Text: {appeal_text}" ) generation_config = types.GenerateContentConfig( temperature=0.2, max_output_tokens=8192, safety_settings=STANDARD_SAFETY_SETTINGS, thinking_config=types.ThinkingConfig( thinking_budget=APPEAL_AI_THINKING_BUDGET ), system_instruction=types.Content( role="system", parts=[types.Part(text=system_prompt)] ), ) try: client = get_genai_client_for_model(APPEAL_AI_MODEL) response = await client.aio.models.generate_content( model=f"publishers/google/models/{APPEAL_AI_MODEL}", contents=[ types.Content(role="user", parts=[types.Part(text=user_prompt)]) ], config=generation_config, ) result = self._get_response_text(response) return result or "AI review failed to produce output." except Exception as e: # noqa: BLE001 print(f"Appeal AI error: {e}") return "AI review encountered an error." async def _moderate_message( self, message: discord.Message, event_name: str ) -> None: """Run moderation checks on a message.""" print(f"{event_name} triggered for message ID: {message.id}") # --- Basic Checks --- # Ignore messages from bots (including self) if message.author.bot: print(f"Ignoring message {message.id} from bot.") return link_urls: list[str] = [] embed_urls = [embed.url for embed in message.embeds if embed.url] link_urls = ( self.extract_direct_attachment_urls(" ".join(embed_urls)) if embed_urls else [] ) # Ignore messages without content, attachments, or direct attachment links if not message.content and not message.attachments and not link_urls: print(f"Ignoring message {message.id} with no content or attachments.") return # Ignore DMs if not message.guild: print(f"Ignoring message {message.id} from DM.") return # Check if moderation is enabled for this guild if not get_guild_config(message.guild.id, "ENABLED", False): print( f"Moderation disabled for guild {message.guild.id}. Ignoring message {message.id}." ) return if get_guild_config(message.guild.id, "EVENT_MODE", False): print( f"Event mode enabled for guild {message.guild.id}. Ignoring message {message.id}." ) return # --- Suicidal Content Check --- # Suicidal keyword check removed; handled by OpenRouter AI moderation. # --- Prepare for AI Analysis --- message_content = message.content # Check for attachments image_data_list = [] if message.attachments: # Process all attachments for attachment in message.attachments: mime_type, image_bytes, attachment_type = await self.process_attachment( attachment ) if mime_type and image_bytes and attachment_type: image_data_list.append( (mime_type, image_bytes, attachment_type, attachment.filename) ) print( f"Processed attachment: {attachment.filename} as {attachment_type}" ) # Log the number of attachments processed if image_data_list: print( f"Processed {len(image_data_list)} attachments for message {message.id}" ) # Check for direct link attachments in the message content if link_urls: processed_links = 0 for url in link_urls: mime_type, image_bytes, attachment_type, filename = ( await self.process_url_attachment(url) ) if mime_type and image_bytes and attachment_type: image_data_list.append( (mime_type, image_bytes, attachment_type, filename) ) processed_links += 1 print( f"Processed linked attachment: {filename} as {attachment_type}" ) if processed_links > 0: print( f"Processed {processed_links} linked attachments for message {message.id}" ) # Only proceed with AI analysis if there's text to analyze or attachments if not message_content and not image_data_list: print( f"Ignoring message {message.id} with no content or valid attachments." ) return # NSFW channel check removed - AI will handle this context # --- Call AI for Analysis (All Rules) --- # Check if the Vertex AI client is available if not self.genai_client: print( f"Skipping AI analysis for message {message.id}: Vertex AI client is not initialized." ) return # Prepare user history for the AI infractions = get_user_infraction_history(message.guild.id, message.author.id) history_summary_parts = [] if infractions: for infr in infractions: history_summary_parts.append( f"- Action: {infr.get('action_taken', 'N/A')} for Rule {infr.get('rule_violated', 'N/A')} on {infr.get('timestamp', 'N/A')[:10]}. Reason: {infr.get('reasoning', 'N/A')[:50]}..." ) user_history_summary = ( "\n".join(history_summary_parts) if history_summary_parts else "No prior infractions recorded." ) # Limit history summary length to prevent excessively long prompts max_history_len = 500 if len(user_history_summary) > max_history_len: user_history_summary = user_history_summary[: max_history_len - 3] + "..." print( f"Analyzing message {message.id} from {message.author} in #{message.channel.name} with history..." ) if image_data_list: attachment_types = [data[2] for data in image_data_list] print( f"Including {len(image_data_list)} attachments in analysis: {', '.join(attachment_types)}" ) ai_decision = await self.query_vertex_ai( message, message_content, user_history_summary, image_data_list ) # --- Process AI Decision --- if not ai_decision: print(f"Failed to get valid AI decision for message {message.id}.") # Optionally notify mods about AI failure if it happens often # Store the failure attempt for debugging self.last_ai_decisions.append( { "message_id": message.id, "author_name": str(message.author), "author_id": message.author.id, "message_content_snippet": ( message.content[:100] + "..." if len(message.content) > 100 else message.content ), "timestamp": datetime.datetime.now( datetime.timezone.utc ).isoformat(), "ai_decision": { "error": "Failed to get valid AI decision", "raw_response": None, }, # Simplified error logging } ) return # Stop if AI fails or returns invalid data # Store the AI decision regardless of violation status self.last_ai_decisions.append( { "message_id": message.id, "author_name": str(message.author), "author_id": message.author.id, "message_content_snippet": ( message.content[:100] + "..." if len(message.content) > 100 else message.content ), "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(), "ai_decision": ai_decision, } ) # Check if the AI flagged a violation if ai_decision.get("violation"): # Handle the violation based on AI decision without overrides # Pass notify_mods_message if the action is NOTIFY_MODS notify_mods_message = ( ai_decision.get("notify_mods_message") if ai_decision.get("action") == "NOTIFY_MODS" else None ) await self.handle_violation( message, ai_decision, notify_mods_message, link_urls ) else: # AI found no violation print( f"AI analysis complete for message {message.id}. No violation detected." ) @commands.Cog.listener(name="on_message") async def message_listener(self, message: discord.Message) -> None: """Trigger moderation when a new message is sent.""" await self._moderate_message(message, "on_message") @commands.Cog.listener(name="on_message_edit") async def message_edit_listener( self, before: discord.Message, after: discord.Message ) -> None: """Trigger moderation when a message is edited.""" await self._moderate_message(after, "on_message_edit") @debug_subgroup.command( name="last_decisions", description="View the last 5 AI moderation decisions (admin only).", ) @app_commands.checks.has_permissions(administrator=True) async def aidebug_last_decisions(self, interaction: discord.Interaction): if not self.last_ai_decisions: await interaction.response.send_message( "No AI decisions have been recorded yet.", ephemeral=True ) return embed = discord.Embed( title="Last 5 AI Moderation Decisions", color=discord.Color.purple() ) embed.timestamp = discord.utils.utcnow() for i, record in enumerate( reversed(list(self.last_ai_decisions)) ): # Show newest first decision_info = record.get("ai_decision", {}) violation = decision_info.get("violation", "N/A") rule_violated = decision_info.get("rule_violated", "N/A") reasoning = decision_info.get("reasoning", "N/A") action = decision_info.get("action", "N/A") error_msg = decision_info.get("error") field_value = ( f"**Author:** {record.get('author_name', 'N/A')} ({record.get('author_id', 'N/A')})\n" f"**Message ID:** {record.get('message_id', 'N/A')}\n" f"**Content Snippet:** ```{record.get('message_content_snippet', 'N/A')}```\n" f"**Timestamp:** {record.get('timestamp', 'N/A')[:19].replace('T', ' ')}\n" ) if error_msg: field_value += f"**Status:** Error during processing: {error_msg}\n" else: field_value += ( f"**Violation:** {violation}\n" f"**Rule Violated:** {rule_violated}\n" f"**Action:** {action}\n" f"**Reasoning:** ```{reasoning}```\n" ) # Truncate field_value if it's too long for an embed field if len(field_value) > 1024: field_value = field_value[:1020] + "..." embed.add_field( name=f"Decision #{len(self.last_ai_decisions) - i}", value=field_value, inline=False, ) if ( len(embed.fields) >= 5 ): # Limit to 5 fields in one embed for very long entries, or send multiple embeds break if not embed.fields: # Should not happen if self.last_ai_decisions is not empty await interaction.response.send_message( "Could not format AI decisions.", ephemeral=True ) return await interaction.followup.send(embed=embed, ephemeral=True) @aidebug_last_decisions.error async def aidebug_last_decisions_error( self, interaction: discord.Interaction, error: app_commands.AppCommandError ): if isinstance(error, app_commands.MissingPermissions): await interaction.response.send_message( "You must be an administrator to use this command.", ephemeral=True ) else: await interaction.response.send_message( f"An error occurred: {error}", ephemeral=True ) print(f"Error in aidebug_last_decisions command: {error}") @debug_subgroup.command( name="appeal_tests", description="Run sample appeals through the AI reviewer (admin only).", ) @app_commands.checks.has_permissions(administrator=True) async def aidebug_appeal_tests(self, interaction: discord.Interaction): """Run a few hardcoded appeals through the appeal AI for testing.""" await interaction.response.defer(thinking=True, ephemeral=True) scenarios = [ ("WARN", "I was excited and sent many messages quickly."), ("MUTE", "I only quoted a meme and it was taken as harassment."), ("BAN", "I posted NSFW art but believed it was allowed."), ] results = [] guild = interaction.guild for idx, (action, text) in enumerate(scenarios, 1): dummy_infraction = { "message_id": 2000 + idx, "channel_id": interaction.channel.id, "message_content": f"Test offending message {idx}", "attachments": [], "reasoning": "Initial moderation reasoning sample", } review = await self.run_appeal_ai( guild, interaction.user, action, text, dummy_infraction ) results.append((action, text, review)) embed = discord.Embed( title="Appeal AI Test Results", color=discord.Color.green() ) embed.timestamp = discord.utils.utcnow() for idx, (action, text, review) in enumerate(results, 1): value = ( f"**Action:** {action}\n**Appeal:** {text}\n**AI Verdict:** {review}" ) if len(value) > 1024: value = value[:1020] + "..." embed.add_field(name=f"Scenario {idx}", value=value, inline=False) await interaction.followup.send(embed=embed, ephemeral=True) # Setup function required by discord.py to load the cog async def setup(bot: commands.Bot): """Loads the AIModerationCog.""" # The API key is now fetched in cog_load, so we don't need to check here. await bot.add_cog(AIModerationCog(bot)) print("AIModerationCog has been loaded.")