discordbot/cogs/aimod_cog.py
2025-06-08 20:31:51 -06:00

2835 lines
125 KiB
Python

# moderation_cog.py
import discord
from discord.ext import commands
from discord import app_commands
# import aiohttp # For making asynchronous HTTP requests - Replaced by Google GenAI client
import json
import os # To load environment variables
import collections # For deque
import datetime # For timestamps
import io # For BytesIO operations
import base64 # For encoding images to base64
from PIL import Image # For image processing
import cv2 # For video processing
import numpy as np # For array operations
import tempfile # For temporary file operations
import shutil # For backing up files
from typing import Optional, List, Dict, Any, Tuple # For type hinting
import asyncio
import aiofiles
import re
# Google Generative AI Imports (using Vertex AI backend)
from google import genai
from google.genai import types
from google.api_core import exceptions as google_exceptions
# Import project configuration for Vertex AI
from gurt.config import (
PROJECT_ID,
LOCATION,
) # Assuming gurt.config exists and has these
from gurt.genai_client import get_genai_client_for_model
from . import aimod_config as aimod_config_module
from .aimod_config import (
DEFAULT_VERTEX_AI_MODEL,
STANDARD_SAFETY_SETTINGS,
GUILD_CONFIG_PATH,
USER_INFRACTIONS_PATH,
INFRACTION_BACKUP_DIR,
USER_APPEALS_PATH,
APPEAL_AI_MODEL,
APPEAL_AI_THINKING_BUDGET,
CONFIG_LOCK,
save_user_infractions,
save_user_appeals,
get_guild_config,
set_guild_config,
get_user_infraction_history,
add_user_infraction,
get_user_appeals,
add_user_appeal,
SERVER_RULES,
MODERATION_INSTRUCTIONS,
SUICIDAL_HELP_RESOURCES,
)
# Avoid loading an excessive number of messages when updating rules
MAX_RULE_MESSAGES = 25
class AIModerationCog(commands.Cog):
"""
A Discord Cog that uses Google Vertex AI to moderate messages based on server rules.
"""
def __init__(self, bot: commands.Bot):
self.bot = bot
self.genai_client = None
try:
if PROJECT_ID and LOCATION:
self.genai_client = genai.Client(
vertexai=True,
project=PROJECT_ID,
location=LOCATION,
)
print(
f"AIModerationCog: Google GenAI Client initialized for Vertex AI project '{PROJECT_ID}' in location '{LOCATION}'."
)
else:
print(
"AIModerationCog: PROJECT_ID or LOCATION not found in config. Google GenAI Client not initialized."
)
except Exception as e:
print(
f"AIModerationCog: Error initializing Google GenAI Client for Vertex AI: {e}"
)
self.last_ai_decisions = collections.deque(
maxlen=5
) # Store last 5 AI decisions
self.config_lock = CONFIG_LOCK
# Supported image file extensions
self.image_extensions = [
".jpg",
".jpeg",
".png",
".webp",
".bmp",
".heic",
".heif",
] # Added heic/heif for Vertex
# Supported animated file extensions
self.gif_extensions = [".gif"]
# Supported video file extensions (Vertex AI can process short video clips directly)
self.video_extensions = [
".mp4",
".webm",
".mov",
".avi",
".mkv",
".flv",
] # Expanded list
self.backup_task = self.bot.loop.create_task(
self.backup_infractions_periodically()
)
print("AIModerationCog Initialized.")
def is_testing_mode(self, guild_id: int) -> bool:
"""Return True if testing mode is enabled for the guild."""
return get_guild_config(guild_id, "TESTING_MODE", False)
class QuickActionView(discord.ui.View):
"""Buttons for quick moderator actions."""
def __init__(self, parent: "AIModerationCog", target: discord.Member):
super().__init__(timeout=3600)
self.parent = parent
self.target = target
self.message: discord.Message | None = None
# --- Helper Modals ---
class BanModal(discord.ui.Modal, title="Ban User"):
reason = discord.ui.TextInput(
label="Reason",
placeholder="Reason for ban",
style=discord.TextStyle.paragraph,
required=False,
max_length=512,
)
def __init__(self, view: "AIModerationCog.QuickActionView"):
super().__init__()
self.view = view
async def on_submit(self, interaction: discord.Interaction):
if not interaction.user.guild_permissions.ban_members:
await interaction.response.send_message(
"You lack permission to ban members.", ephemeral=True
)
return
if self.view.parent.is_testing_mode(interaction.guild.id):
await interaction.response.send_message(
f"[TEST MODE] Would ban {self.view.target.mention}.",
ephemeral=True,
)
return
try:
await self.view.target.ban(
reason=self.reason.value or "Escalated via mod panel"
)
await interaction.response.send_message(
f"Banned {self.view.target.mention}.", ephemeral=True
)
except Exception as e: # noqa: BLE001
await interaction.response.send_message(
f"Failed to ban: {e}", ephemeral=True
)
self.view.disable_all_items()
if self.view.message:
await self.view.message.edit(view=self.view)
class KickModal(discord.ui.Modal, title="Kick User"):
reason = discord.ui.TextInput(
label="Reason",
placeholder="Reason for kick",
style=discord.TextStyle.paragraph,
required=False,
max_length=512,
)
def __init__(self, view: "AIModerationCog.QuickActionView"):
super().__init__()
self.view = view
async def on_submit(self, interaction: discord.Interaction):
if not interaction.user.guild_permissions.kick_members:
await interaction.response.send_message(
"You lack permission to kick members.", ephemeral=True
)
return
if self.view.parent.is_testing_mode(interaction.guild.id):
await interaction.response.send_message(
f"[TEST MODE] Would kick {self.view.target.mention}.",
ephemeral=True,
)
return
try:
await self.view.target.kick(
reason=self.reason.value or "Escalated via mod panel"
)
await interaction.response.send_message(
f"Kicked {self.view.target.mention}.", ephemeral=True
)
except Exception as e: # noqa: BLE001
await interaction.response.send_message(
f"Failed to kick: {e}", ephemeral=True
)
self.view.disable_all_items()
if self.view.message:
await self.view.message.edit(view=self.view)
class TimeoutModal(discord.ui.Modal, title="Timeout User"):
duration = discord.ui.TextInput(
label="Duration",
placeholder="e.g. 10m, 1h, 1d",
required=True,
max_length=10,
)
reason = discord.ui.TextInput(
label="Reason",
placeholder="Reason for timeout",
style=discord.TextStyle.paragraph,
required=False,
max_length=512,
)
def __init__(self, view: "AIModerationCog.QuickActionView"):
super().__init__()
self.view = view
@staticmethod
def parse_duration(duration_str: str) -> datetime.timedelta | None:
if not duration_str:
return None
try:
amount = int("".join(filter(str.isdigit, duration_str)))
unit = "".join(filter(str.isalpha, duration_str)).lower()
if unit in {"d", "day", "days"}:
return datetime.timedelta(days=amount)
if unit in {"h", "hour", "hours"}:
return datetime.timedelta(hours=amount)
if unit in {"m", "min", "minute", "minutes"}:
return datetime.timedelta(minutes=amount)
if unit in {"s", "sec", "second", "seconds"}:
return datetime.timedelta(seconds=amount)
except (ValueError, TypeError):
return None
return None
async def on_submit(self, interaction: discord.Interaction):
if not interaction.user.guild_permissions.moderate_members:
await interaction.response.send_message(
"You lack permission to timeout members.", ephemeral=True
)
return
if self.view.parent.is_testing_mode(interaction.guild.id):
await interaction.response.send_message(
f"[TEST MODE] Would timeout {self.view.target.mention} for {self.duration.value}.",
ephemeral=True,
)
return
delta = self.parse_duration(self.duration.value)
if not delta or delta > datetime.timedelta(days=28):
await interaction.response.send_message(
"Invalid duration. Use formats like '10m', '1h', '1d'",
ephemeral=True,
)
return
try:
until = discord.utils.utcnow() + delta
await self.view.target.timeout(
until, reason=self.reason.value or "Escalated via mod panel"
)
await interaction.response.send_message(
f"Timed out {self.view.target.mention} for {self.duration.value}.",
ephemeral=True,
)
except Exception as e: # noqa: BLE001
await interaction.response.send_message(
f"Failed to timeout: {e}", ephemeral=True
)
self.view.disable_all_items()
if self.view.message:
await self.view.message.edit(view=self.view)
@discord.ui.button(label="Escalate Ban", style=discord.ButtonStyle.danger)
async def escalate(
self, interaction: discord.Interaction, button: discord.ui.Button
):
if not interaction.user.guild_permissions.ban_members:
await interaction.response.send_message(
"You lack permission to ban members.", ephemeral=True
)
return
self.message = interaction.message
await interaction.response.send_modal(self.BanModal(self))
@discord.ui.button(label="Kick", style=discord.ButtonStyle.primary)
async def kick(
self, interaction: discord.Interaction, button: discord.ui.Button
):
if not interaction.user.guild_permissions.kick_members:
await interaction.response.send_message(
"You lack permission to kick members.", ephemeral=True
)
return
self.message = interaction.message
await interaction.response.send_modal(self.KickModal(self))
@discord.ui.button(label="Timeout", style=discord.ButtonStyle.secondary)
async def timeout_action(
self, interaction: discord.Interaction, button: discord.ui.Button
):
if not interaction.user.guild_permissions.moderate_members:
await interaction.response.send_message(
"You lack permission to timeout members.", ephemeral=True
)
return
self.message = interaction.message
await interaction.response.send_modal(self.TimeoutModal(self))
@discord.ui.button(label="Ignore", style=discord.ButtonStyle.secondary)
async def ignore(
self, interaction: discord.Interaction, button: discord.ui.Button
):
if interaction.user.guild_permissions.manage_messages:
await interaction.message.delete()
await interaction.response.send_message(
"Notification dismissed.", ephemeral=True
)
else:
await interaction.response.send_message(
"No permission to manage messages.", ephemeral=True
)
async def cog_load(self):
"""Called when the cog is loaded."""
print("AIModerationCog cog_load started.")
if not self.genai_client:
print("\n" + "=" * 60)
print(
"=== WARNING: AIModerationCog - Vertex AI Client not initialized! ==="
)
print("=== The Moderation Cog requires a valid Vertex AI setup. ===")
print(
f"=== Check PROJECT_ID and LOCATION in gurt.config and GCP authentication. ==="
)
print("=" * 60 + "\n")
else:
print("AIModerationCog: Vertex AI Client seems to be initialized.")
print("AIModerationCog cog_load finished.")
# _load_openrouter_models is no longer needed.
async def cog_unload(self):
"""Clean up when the cog is unloaded."""
# The genai.Client doesn't have an explicit close method in the same way aiohttp.ClientSession does.
# It typically manages its own resources.
print("AIModerationCog Unloaded.")
if self.backup_task:
self.backup_task.cancel()
async def backup_infractions_periodically(self):
"""Periodically back up the infractions file."""
await self.bot.wait_until_ready()
while not self.bot.is_closed():
timestamp = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S")
backup_path = os.path.join(
INFRACTION_BACKUP_DIR, f"user_infractions_{timestamp}.json"
)
try:
shutil.copy(USER_INFRACTIONS_PATH, backup_path)
except Exception as e: # noqa: BLE001
print(f"Failed to back up infractions: {e}")
await asyncio.sleep(24 * 60 * 60)
async def process_image(self, attachment: discord.Attachment) -> tuple[str, bytes]:
"""
Process an image attachment and return its base64 encoding.
Args:
attachment: The Discord attachment containing the image
Returns:
Tuple of (mime_type, image_bytes)
"""
try:
# Download the image
image_bytes = await attachment.read()
mime_type = (
attachment.content_type or "image/jpeg"
) # Default to jpeg if not specified
# Return the image bytes and mime type
return mime_type, image_bytes
except Exception as e:
print(f"Error processing image: {e}")
return None, None
async def process_gif(self, attachment: discord.Attachment) -> tuple[str, bytes]:
"""Return the raw bytes for a GIF attachment."""
try:
gif_bytes = await attachment.read()
mime_type = attachment.content_type or "image/gif"
return mime_type, gif_bytes
except Exception as e:
print(f"Error processing GIF: {e}")
return None, None
async def process_attachment(
self, attachment: discord.Attachment
) -> tuple[str, bytes, str]:
"""
Process any attachment and return the appropriate image data.
Args:
attachment: The Discord attachment
Returns:
Tuple of (mime_type, image_bytes, attachment_type)
attachment_type is one of: 'image', 'gif', 'video', or None if unsupported
"""
if not attachment:
return None, None, None
# Get the file extension
filename = attachment.filename.lower()
_, ext = os.path.splitext(filename)
# Process based on file type
if ext in self.image_extensions:
mime_type, image_bytes = await self.process_image(attachment)
return mime_type, image_bytes, "image"
elif ext in self.gif_extensions:
mime_type, image_bytes = await self.process_gif(attachment)
return mime_type, image_bytes, "gif"
elif ext in self.video_extensions:
mime_type, image_bytes = await self.process_video(attachment)
return mime_type, image_bytes, "video"
else:
print(f"Unsupported file type: {ext}")
return None, None, None
async def process_video(self, attachment: discord.Attachment) -> tuple[str, bytes]:
"""Return the raw bytes for a video attachment."""
try:
video_bytes = await attachment.read()
mime_type = attachment.content_type or "video/mp4"
return mime_type, video_bytes
except Exception as e:
print(f"Error processing video: {e}")
return None, None
async def process_url_attachment(self, url: str) -> tuple[str, bytes, str, str]:
"""Fetch an attachment from a direct link."""
import aiohttp
try:
cleaned_url = url.strip("<>")
filename = cleaned_url.split("/")[-1].split("?")[0]
_, ext = os.path.splitext(filename.lower())
if ext in self.image_extensions:
attachment_type = "image"
elif ext in self.gif_extensions:
attachment_type = "gif"
elif ext in self.video_extensions:
attachment_type = "video"
else:
return None, None, None, None
async with aiohttp.ClientSession() as session:
async with session.get(cleaned_url) as resp:
if resp.status != 200:
print(
f"Failed to fetch URL attachment {cleaned_url}: {resp.status}"
)
return None, None, None, None
data = await resp.read()
mime_type = resp.headers.get(
"Content-Type", f"image/{ext.lstrip('.')}"
)
return mime_type, data, attachment_type, filename
except Exception as e:
print(f"Error processing URL attachment {url}: {e}")
return None, None, None, None
def extract_direct_attachment_urls(self, text: str) -> List[str]:
"""Return a list of direct image/video URLs found in the text."""
urls = re.findall(r"https?://\S+", text or "")
allowed_exts = (
self.image_extensions + self.gif_extensions + self.video_extensions
)
results = []
for u in urls:
cleaned = u.strip("<>")
path = cleaned.split("?")[0]
_, ext = os.path.splitext(path.lower())
if ext in allowed_exts:
results.append(cleaned)
return results
# --- AI Moderation Command Group ---
aimod_group = app_commands.Group(
name="aimod", description="AI Moderation commands."
)
config_subgroup = app_commands.Group(
name="config",
description="Configure AI moderation settings.",
parent=aimod_group,
)
infractions_subgroup = app_commands.Group(
name="infractions", description="Manage user infractions.", parent=aimod_group
)
appeal_subgroup = app_commands.Group(
name="appeal", description="Appeal AI moderation actions.", parent=aimod_group
)
model_subgroup = app_commands.Group(
name="model",
description="Manage the AI model for moderation.",
parent=aimod_group,
)
debug_subgroup = app_commands.Group(
name="debug",
description="Debugging commands for AI moderation.",
parent=aimod_group,
)
@aimod_group.command(
name="sync",
description="Reload AI moderation configuration and infractions from disk.",
)
@app_commands.checks.has_permissions(administrator=True)
async def aimod_sync(self, interaction: discord.Interaction):
"""Reload configuration files from disk."""
try:
async with aiofiles.open(GUILD_CONFIG_PATH, "r", encoding="utf-8") as f:
data = await f.read()
async with CONFIG_LOCK:
global GUILD_CONFIG
GUILD_CONFIG = json.loads(data)
async with aiofiles.open(
USER_INFRACTIONS_PATH, "r", encoding="utf-8"
) as f2:
data2 = await f2.read()
async with CONFIG_LOCK:
global USER_INFRACTIONS
USER_INFRACTIONS = json.loads(data2)
await interaction.response.send_message(
"Configuration synced from disk.", ephemeral=True
)
except Exception as e: # noqa: BLE001
await interaction.response.send_message(
f"Failed to reload configuration: {e}", ephemeral=True
)
@config_subgroup.command(
name="log_channel", description="Set the moderation log channel."
)
@app_commands.describe(channel="The text channel to use for moderation logs.")
@app_commands.checks.has_permissions(administrator=True)
async def modset_log_channel(
self, interaction: discord.Interaction, channel: discord.TextChannel
):
await set_guild_config(interaction.guild.id, "MOD_LOG_CHANNEL_ID", channel.id)
await interaction.response.send_message(
f"Moderation log channel set to {channel.mention}.", ephemeral=False
)
@config_subgroup.command(
name="suggestions_channel", description="Set the suggestions channel."
)
@app_commands.describe(channel="The text channel to use for suggestions.")
@app_commands.checks.has_permissions(administrator=True)
async def modset_suggestions_channel(
self, interaction: discord.Interaction, channel: discord.TextChannel
):
await set_guild_config(
interaction.guild.id, "SUGGESTIONS_CHANNEL_ID", channel.id
)
await interaction.response.send_message(
f"Suggestions channel set to {channel.mention}.", ephemeral=False
)
@config_subgroup.command(
name="moderator_role", description="Set the moderator role."
)
@app_commands.describe(role="The role that identifies moderators.")
@app_commands.checks.has_permissions(administrator=True)
async def modset_moderator_role(
self, interaction: discord.Interaction, role: discord.Role
):
await set_guild_config(interaction.guild.id, "MODERATOR_ROLE_ID", role.id)
await interaction.response.send_message(
f"Moderator role set to {role.mention}.", ephemeral=False
)
@config_subgroup.command(
name="suicidal_ping_role",
description="Set the role to ping for suicidal content.",
)
@app_commands.describe(role="The role to ping for urgent suicidal content alerts.")
@app_commands.checks.has_permissions(administrator=True)
async def modset_suicidal_ping_role(
self, interaction: discord.Interaction, role: discord.Role
):
await set_guild_config(interaction.guild.id, "SUICIDAL_PING_ROLE_ID", role.id)
await interaction.response.send_message(
f"Suicidal content ping role set to {role.mention}.", ephemeral=False
)
@config_subgroup.command(
name="add_nsfw_channel",
description="Add a channel to the list of NSFW channels.",
)
@app_commands.describe(channel="The text channel to mark as NSFW for the bot.")
@app_commands.checks.has_permissions(administrator=True)
async def modset_add_nsfw_channel(
self, interaction: discord.Interaction, channel: discord.TextChannel
):
guild_id = interaction.guild.id
nsfw_channels: list[int] = get_guild_config(guild_id, "NSFW_CHANNEL_IDS", [])
if channel.id not in nsfw_channels:
nsfw_channels.append(channel.id)
await set_guild_config(guild_id, "NSFW_CHANNEL_IDS", nsfw_channels)
await interaction.response.send_message(
f"{channel.mention} added to NSFW channels list.", ephemeral=False
)
else:
await interaction.response.send_message(
f"{channel.mention} is already in the NSFW channels list.",
ephemeral=True,
)
@config_subgroup.command(
name="remove_nsfw_channel",
description="Remove a channel from the list of NSFW channels.",
)
@app_commands.describe(channel="The text channel to remove from the NSFW list.")
@app_commands.checks.has_permissions(administrator=True)
async def modset_remove_nsfw_channel(
self, interaction: discord.Interaction, channel: discord.TextChannel
):
guild_id = interaction.guild.id
nsfw_channels: list[int] = get_guild_config(guild_id, "NSFW_CHANNEL_IDS", [])
if channel.id in nsfw_channels:
nsfw_channels.remove(channel.id)
await set_guild_config(guild_id, "NSFW_CHANNEL_IDS", nsfw_channels)
await interaction.response.send_message(
f"{channel.mention} removed from NSFW channels list.", ephemeral=False
)
else:
await interaction.response.send_message(
f"{channel.mention} is not in the NSFW channels list.", ephemeral=True
)
@config_subgroup.command(
name="list_nsfw_channels",
description="List currently configured NSFW channels.",
)
@app_commands.checks.has_permissions(administrator=True)
async def modset_list_nsfw_channels(self, interaction: discord.Interaction):
guild_id = interaction.guild.id
nsfw_channel_ids: list[int] = get_guild_config(guild_id, "NSFW_CHANNEL_IDS", [])
if not nsfw_channel_ids:
await interaction.response.send_message(
"No NSFW channels are currently configured.", ephemeral=False
)
return
channel_mentions = []
for channel_id in nsfw_channel_ids:
channel_obj = interaction.guild.get_channel(channel_id)
if channel_obj:
channel_mentions.append(channel_obj.mention)
else:
channel_mentions.append(f"ID:{channel_id} (not found)")
await interaction.response.send_message(
f"Configured NSFW channels:\n- " + "\n- ".join(channel_mentions),
ephemeral=False,
)
# Note: The @app_commands.command(name="modenable", ...) and other commands like
# viewinfractions, clearinfractions, modsetmodel, modgetmodel remain as top-level commands
# as they were not part of the original "modset" generic command structure.
# If these also need to be grouped, that would be a separate consideration.
@config_subgroup.command(
name="enable",
description="Enable or disable moderation for this guild (admin only).",
)
@app_commands.describe(enabled="Enable moderation (true/false)")
async def modenable(self, interaction: discord.Interaction, enabled: bool):
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(
"You must be an administrator to use this command.", ephemeral=False
)
return
await set_guild_config(interaction.guild.id, "ENABLED", enabled)
await interaction.response.send_message(
f"Moderation is now {'enabled' if enabled else 'disabled'} for this guild.",
ephemeral=False,
)
@config_subgroup.command(
name="event_mode",
description="Toggle temporary event mode for this guild.",
)
@app_commands.describe(enabled="Enable event mode (true/false)")
async def event_mode(self, interaction: discord.Interaction, enabled: bool):
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(
"You must be an administrator to use this command.", ephemeral=False
)
return
await set_guild_config(interaction.guild.id, "EVENT_MODE", enabled)
await interaction.response.send_message(
f"Event mode is now {'enabled' if enabled else 'disabled'}.",
ephemeral=False,
)
@config_subgroup.command(
name="testing_mode",
description="Enable or disable testing mode (no actions are taken).",
)
@app_commands.describe(enabled="Enable testing mode (true/false)")
async def testing_mode(self, interaction: discord.Interaction, enabled: bool):
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(
"You must be an administrator to use this command.", ephemeral=False
)
return
await set_guild_config(interaction.guild.id, "TESTING_MODE", enabled)
await interaction.response.send_message(
f"Testing mode is now {'enabled' if enabled else 'disabled'}.",
ephemeral=False,
)
@config_subgroup.command(
name="update_rules",
description="Update server rules from the specified channel.",
)
@app_commands.describe(channel="The channel containing the server rules.")
@app_commands.checks.has_permissions(administrator=True)
async def update_rules(
self, interaction: discord.Interaction, channel: discord.TextChannel
) -> None:
"""Pull the server rules from a channel and update the global config."""
messages = []
async for msg in channel.history(
limit=MAX_RULE_MESSAGES + 1, oldest_first=True
):
if msg.content:
messages.append(msg.content)
if len(messages) > MAX_RULE_MESSAGES:
await interaction.response.send_message(
f"Channel has more than {MAX_RULE_MESSAGES} messages."
" Please consolidate your rules into fewer messages.",
ephemeral=True,
)
return
if not messages:
await interaction.response.send_message(
"No messages found in that channel.", ephemeral=True
)
return
rules_text = "\n\n".join(messages).strip()
aimod_config_module.SERVER_RULES = rules_text
await interaction.response.send_message(
f"Server rules updated from {channel.mention}.", ephemeral=False
)
@config_subgroup.command(
name="reset_rules",
description="Reset server rules to the default hardcoded version.",
)
@app_commands.checks.has_permissions(administrator=True)
async def reset_rules(self, interaction: discord.Interaction) -> None:
"""Reset the server rules to the default string."""
aimod_config_module.SERVER_RULES = aimod_config_module.DEFAULT_SERVER_RULES
await interaction.response.send_message(
"Server rules have been reset to the default.", ephemeral=False
)
@config_subgroup.command(
name="update_instructions",
description="Update moderation instructions from the specified channel.",
)
@app_commands.describe(
channel="The channel containing the moderation instructions."
)
@app_commands.checks.has_permissions(administrator=True)
async def update_instructions(
self, interaction: discord.Interaction, channel: discord.TextChannel
) -> None:
"""Pull moderation instructions from a channel and update the global config."""
messages = []
async for msg in channel.history(
limit=MAX_RULE_MESSAGES + 1, oldest_first=True
):
if msg.content:
messages.append(msg.content)
if len(messages) > MAX_RULE_MESSAGES:
await interaction.response.send_message(
f"Channel has more than {MAX_RULE_MESSAGES} messages."
" Please consolidate your instructions into fewer messages.",
ephemeral=True,
)
return
if not messages:
await interaction.response.send_message(
"No messages found in that channel.", ephemeral=True
)
return
instructions_text = "\n\n".join(messages).strip()
aimod_config_module.MODERATION_INSTRUCTIONS = instructions_text
await interaction.response.send_message(
f"Moderation instructions updated from {channel.mention}.", ephemeral=False
)
@config_subgroup.command(
name="reset_instructions",
description="Reset moderation instructions to the default version.",
)
@app_commands.checks.has_permissions(administrator=True)
async def reset_instructions(self, interaction: discord.Interaction) -> None:
"""Reset moderation instructions to the default string."""
aimod_config_module.MODERATION_INSTRUCTIONS = (
aimod_config_module.DEFAULT_MODERATION_INSTRUCTIONS
)
await interaction.response.send_message(
"Moderation instructions have been reset to the default.", ephemeral=False
)
@infractions_subgroup.command(
name="view",
description="View a user's AI moderation infraction history (mod/admin only).",
)
@app_commands.describe(user="The user to view infractions for")
async def viewinfractions(
self, interaction: discord.Interaction, user: discord.Member
):
# Check if user has permission (admin or moderator role)
moderator_role_id = get_guild_config(interaction.guild.id, "MODERATOR_ROLE_ID")
moderator_role = (
interaction.guild.get_role(moderator_role_id) if moderator_role_id else None
)
has_permission = interaction.user.guild_permissions.administrator or (
moderator_role and moderator_role in interaction.user.roles
)
if not has_permission:
await interaction.response.send_message(
"You must be an administrator or have the moderator role to use this command.",
ephemeral=True,
)
return
# Get the user's infraction history
infractions = get_user_infraction_history(interaction.guild.id, user.id)
if not infractions:
await interaction.response.send_message(
f"{user.mention} has no recorded infractions.", ephemeral=False
)
return
# Create an embed to display the infractions
embed = discord.Embed(
title=f"Infraction History for {user.display_name}",
description=f"User ID: {user.id}",
color=discord.Color.orange(),
)
# Add each infraction to the embed
for i, infraction in enumerate(infractions, 1):
timestamp = infraction.get("timestamp", "Unknown date")[:19].replace(
"T", " "
) # Format ISO timestamp
rule = infraction.get("rule_violated", "Unknown rule")
action = infraction.get("action_taken", "Unknown action")
reason = infraction.get("reasoning", "No reason provided")
# Truncate reason if it's too long
if len(reason) > 200:
reason = reason[:197] + "..."
embed.add_field(
name=f"Infraction #{i} - {timestamp}",
value=f"**Rule Violated:** {rule}\n**Action Taken:** {action}\n**Reason:** {reason}",
inline=False,
)
embed.set_footer(text=f"Total infractions: {len(infractions)}")
embed.timestamp = discord.utils.utcnow()
await interaction.response.send_message(embed=embed, ephemeral=False)
@appeal_subgroup.command(
name="human_review",
description="Request a human moderator to review your case.",
)
@app_commands.describe(
reason="Explain why you want a human to review the AI decision",
guild_id="If using in DMs, provide the server ID",
)
async def appeal_human_review(
self,
interaction: discord.Interaction,
reason: str,
guild_id: int | None = None,
):
"""Let a user request a manual moderator review."""
guild = interaction.guild or (
self.bot.get_guild(guild_id) if guild_id else None
)
if not guild:
await interaction.response.send_message(
"Invalid or missing guild ID.", ephemeral=True
)
return
log_channel_id = get_guild_config(guild.id, "MOD_LOG_CHANNEL_ID")
log_channel = self.bot.get_channel(log_channel_id) if log_channel_id else None
if not log_channel:
await interaction.response.send_message(
"Appeals are not enabled for this server.", ephemeral=True
)
return
timestamp = datetime.datetime.utcnow().isoformat()
await add_user_appeal(
guild.id, interaction.user.id, "HUMAN_REVIEW", reason, timestamp, ""
)
embed = discord.Embed(
title="Human Review Requested", color=discord.Color.orange()
)
embed.add_field(
name="User",
value=f"{interaction.user} ({interaction.user.id})",
inline=False,
)
embed.add_field(name="Request", value=reason, inline=False)
embed.timestamp = discord.utils.utcnow()
await log_channel.send(embed=embed)
await interaction.response.send_message(
"Your request for a human review has been sent.", ephemeral=True
)
@infractions_subgroup.command(
name="clear",
description="Clear a user's AI moderation infraction history (admin only).",
)
@app_commands.describe(user="The user to clear infractions for")
async def clearinfractions(
self, interaction: discord.Interaction, user: discord.Member
):
# Check if user has administrator permission
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(
"You must be an administrator to use this command.", ephemeral=True
)
return
# Get the user's infraction history
key = f"{interaction.guild.id}_{user.id}"
infractions = USER_INFRACTIONS.get(key, [])
if not infractions:
await interaction.response.send_message(
f"{user.mention} has no recorded infractions to clear.", ephemeral=False
)
return
# Clear the user's infractions
USER_INFRACTIONS[key] = []
await save_user_infractions()
await interaction.response.send_message(
f"Cleared {len(infractions)} infraction(s) for {user.mention}.",
ephemeral=False,
)
@infractions_subgroup.command(
name="leaderboard",
description="Show users with the fewest infractions.",
)
async def leaderboard(self, interaction: discord.Interaction):
guild_id = interaction.guild.id
counts = {}
for key, infractions in USER_INFRACTIONS.items():
if key.startswith(f"{guild_id}_"):
uid = int(key.split("_", 1)[1])
counts[uid] = len(infractions)
if not counts:
await interaction.response.send_message(
"No infractions recorded for this guild.", ephemeral=True
)
return
sorted_users = sorted(counts.items(), key=lambda x: x[1])[:5]
lines = []
for uid, count in sorted_users:
member = interaction.guild.get_member(uid)
name = member.display_name if member else f"ID:{uid}"
lines.append(f"**{name}** - {count} infractions")
embed = discord.Embed(
title="Best Behavior Leaderboard",
description="\n".join(lines),
color=discord.Color.green(),
)
await interaction.response.send_message(embed=embed, ephemeral=False)
@infractions_subgroup.command(
name="restore",
description="Restore infractions from the latest backup (admin only).",
)
@app_commands.checks.has_permissions(administrator=True)
async def restore_infractions(self, interaction: discord.Interaction):
backups = sorted(os.listdir(INFRACTION_BACKUP_DIR))
if not backups:
await interaction.response.send_message("No backups found.", ephemeral=True)
return
latest = os.path.join(INFRACTION_BACKUP_DIR, backups[-1])
try:
shutil.copy(latest, USER_INFRACTIONS_PATH)
async with aiofiles.open(USER_INFRACTIONS_PATH, "r", encoding="utf-8") as f:
data = await f.read()
async with CONFIG_LOCK:
global USER_INFRACTIONS
USER_INFRACTIONS = json.loads(data)
await interaction.response.send_message(
f"Infractions restored from {backups[-1]}", ephemeral=False
)
except Exception as e: # noqa: BLE001
await interaction.response.send_message(
f"Failed to restore infractions: {e}", ephemeral=True
)
@appeal_subgroup.command(name="submit", description="Submit a moderation appeal.")
@app_commands.describe(
action="The action you are appealing",
reason="Explain why you believe the action was incorrect",
guild_id="If using in DMs, provide the server ID",
message_id="ID of the moderated message you are appealing (optional)",
)
async def appeal_submit(
self,
interaction: discord.Interaction,
action: str,
reason: str,
guild_id: int | None = None,
message_id: int | None = None,
):
guild = interaction.guild or (
self.bot.get_guild(guild_id) if guild_id else None
)
if not guild:
await interaction.response.send_message(
"Invalid or missing guild ID.", ephemeral=True
)
return
log_channel_id = get_guild_config(guild.id, "MOD_LOG_CHANNEL_ID")
log_channel = self.bot.get_channel(log_channel_id) if log_channel_id else None
if not log_channel:
await interaction.response.send_message(
"Appeals are not enabled for this server.", ephemeral=True
)
return
infractions = get_user_infraction_history(guild.id, interaction.user.id)
target_infraction = None
if message_id:
for infr in infractions[::-1]:
if infr.get("message_id") == message_id:
target_infraction = infr
break
if not target_infraction and infractions:
target_infraction = infractions[-1]
ai_review = await self.run_appeal_ai(
guild,
interaction.user,
action,
reason,
target_infraction,
)
timestamp = datetime.datetime.utcnow().isoformat()
ref = target_infraction.get("message_id") if target_infraction else None
await add_user_appeal(
guild.id,
interaction.user.id,
action,
reason,
timestamp,
ai_review,
str(ref) if ref else None,
)
embed = discord.Embed(title="New Appeal", color=discord.Color.blue())
embed.add_field(
name="User",
value=f"{interaction.user} ({interaction.user.id})",
inline=False,
)
embed.add_field(name="Action", value=action, inline=False)
if ref:
embed.add_field(name="Infraction", value=f"Message ID: {ref}", inline=False)
msg_snip = (
target_infraction.get("message_content") if target_infraction else None
)
if msg_snip:
embed.add_field(
name="Message Snippet",
value=f"`{msg_snip}`",
inline=False,
)
attachments = (
target_infraction.get("attachments") if target_infraction else None
)
if attachments:
attach_text = "\n".join(attachments)
if len(attach_text) > 1024:
attach_text = attach_text[:1021] + "..."
embed.add_field(
name="Attachments",
value=attach_text,
inline=False,
)
embed.add_field(name="Appeal", value=reason, inline=False)
embed.add_field(name="AI Review", value=ai_review[:1000], inline=False)
embed.timestamp = discord.utils.utcnow()
await log_channel.send(embed=embed)
await interaction.response.send_message(
"Your appeal has been submitted.", ephemeral=True
)
@appeal_subgroup.command(
name="list", description="View a user's appeals (mods only)."
)
@app_commands.describe(user="The user to view appeals for")
async def appeal_list(self, interaction: discord.Interaction, user: discord.Member):
moderator_role_id = get_guild_config(interaction.guild.id, "MODERATOR_ROLE_ID")
moderator_role = (
interaction.guild.get_role(moderator_role_id) if moderator_role_id else None
)
has_permission = interaction.user.guild_permissions.administrator or (
moderator_role and moderator_role in interaction.user.roles
)
if not has_permission:
await interaction.response.send_message(
"You must be an administrator or have the moderator role to use this command.",
ephemeral=True,
)
return
appeals = get_user_appeals(interaction.guild.id, user.id)
history = get_user_infraction_history(interaction.guild.id, user.id)
if not appeals:
await interaction.response.send_message(
f"{user.mention} has no appeals.", ephemeral=False
)
return
embed = discord.Embed(
title=f"Appeals for {user.display_name}", color=discord.Color.blue()
)
for i, appeal in enumerate(appeals, 1):
ts = appeal.get("timestamp", "?")[:19].replace("T", " ")
summary = appeal.get("appeal_text", "")
ai_sum = appeal.get("ai_review", "")
if len(summary) > 150:
summary = summary[:147] + "..."
if len(ai_sum) > 150:
ai_sum = ai_sum[:147] + "..."
value = f"Action: {appeal.get('action')}\nReason: {summary}\nAI: {ai_sum}"
ref = appeal.get("infraction_reference")
if ref:
value = f"Infraction: {ref}\n" + value
for infr in history:
if str(infr.get("message_id")) == str(ref):
msg_snip = infr.get("message_content")
if msg_snip:
value += f"\nSnippet: {msg_snip}"
attachments = infr.get("attachments")
if attachments:
attach_txt = ", ".join(attachments)
if len(attach_txt) > 200:
attach_txt = attach_txt[:197] + "..."
value += f"\nAttachments: {attach_txt}"
reason = infr.get("reasoning")
if reason:
if len(reason) > 150:
reason = reason[:147] + "..."
value += f"\nAI Reasoning: {reason}"
break
embed.add_field(name=f"Appeal #{i} - {ts}", value=value, inline=False)
await interaction.response.send_message(embed=embed, ephemeral=False)
@appeal_subgroup.command(
name="testcases",
description="Run sample appeals through the AI review system (admin only).",
)
async def appeal_testcases(self, interaction: discord.Interaction):
"""Run a few hardcoded appeal scenarios through the AI with context."""
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(
"You must be an administrator to use this command.",
ephemeral=True,
)
return
await interaction.response.defer(thinking=True, ephemeral=True)
scenarios = [
("WARN", "I was excited and sent many messages quickly."),
("MUTE", "I only quoted a meme and it was taken as harassment."),
("BAN", "I posted NSFW art but believed it was allowed."),
]
results: list[tuple[str, str]] = []
for idx, (action, text) in enumerate(scenarios, 1):
dummy_infraction = {
"message_id": 1000 + idx,
"channel_id": interaction.channel.id,
"message_content": f"Example offending message {idx}",
"attachments": [],
"reasoning": "Automated moderation reasoning sample",
}
result = await self.run_appeal_ai(
interaction.guild, interaction.user, action, text, dummy_infraction
)
results.append((action, result))
embed = discord.Embed(
title="Appeal AI Test Results", color=discord.Color.green()
)
for action, result in results:
field_text = result if result else "No response"
if len(field_text) > 1000:
field_text = field_text[:997] + "..."
embed.add_field(name=action, value=field_text, inline=False)
await interaction.followup.send(embed=embed, ephemeral=True)
@model_subgroup.command(
name="set", description="Change the AI model used for moderation (admin only)."
)
@app_commands.describe(
model="The Vertex AI model to use (e.g., 'gemini-1.5-flash-001', 'gemini-1.0-pro')"
)
async def modsetmodel(self, interaction: discord.Interaction, model: str):
# Check if user has administrator permission
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(
"You must be an administrator to use this command.", ephemeral=True
)
return
# Validate the model name (basic validation for Vertex AI)
# Vertex AI models usually don't have "/" like OpenRouter, but can have "-" and numbers.
# Example: gemini-1.5-flash-001
if not model or len(model) < 5: # Basic check
await interaction.response.send_message(
"Invalid model format. Please provide a valid Vertex AI model ID (e.g., 'gemini-1.5-flash-001').",
ephemeral=False,
)
return
# Save the model to guild configuration
guild_id = interaction.guild.id
await set_guild_config(guild_id, "AI_MODEL", model)
# Note: There's no global model variable to update here like OPENROUTER_MODEL.
# The cog will use the guild-specific config or the DEFAULT_VERTEX_AI_MODEL.
await interaction.response.send_message(
f"AI moderation model updated to `{model}` for this guild.", ephemeral=False
)
# @modsetmodel.autocomplete('model') # Autocomplete removed as OpenRouter models are not used.
# async def modsetmodel_autocomplete(...): # This function is now removed.
@model_subgroup.command(
name="get", description="View the current AI model used for moderation."
)
async def modgetmodel(self, interaction: discord.Interaction):
# Get the model from guild config, fall back to global default
guild_id = interaction.guild.id
model_used = get_guild_config(guild_id, "AI_MODEL", DEFAULT_VERTEX_AI_MODEL)
# Create an embed to display the model information
embed = discord.Embed(
title="AI Moderation Model",
description=f"The current AI model used for moderation in this server is:",
color=discord.Color.blue(),
)
embed.add_field(name="Model In Use", value=f"`{model_used}`", inline=False)
embed.add_field(
name="Default Model", value=f"`{DEFAULT_VERTEX_AI_MODEL}`", inline=False
)
embed.set_footer(text="Use /aimod model set to change the model")
await interaction.response.send_message(embed=embed, ephemeral=False)
# --- Helper Function to Safely Extract Text from Vertex AI Response ---
def _get_response_text(
self, response: Optional[types.GenerateContentResponse]
) -> Optional[str]:
"""
Safely extracts the text content from the first text part of a GenerateContentResponse.
Handles potential errors and lack of text parts gracefully.
(Adapted from teto_cog.py)
"""
if not response:
print("[AIModerationCog._get_response_text] Received None response object.")
return None
if (
hasattr(response, "text") and response.text
): # Some simpler responses might have .text directly
print(
"[AIModerationCog._get_response_text] Found text directly in response.text attribute."
)
return response.text
if not response.candidates:
print(
f"[AIModerationCog._get_response_text] Response object has no candidates. Response: {response}"
)
return None
try:
candidate = response.candidates[0]
if not hasattr(candidate, "content") or not candidate.content:
print(
f"[AIModerationCog._get_response_text] Candidate 0 has no 'content'. Candidate: {candidate}"
)
return None
if not hasattr(candidate.content, "parts") or not candidate.content.parts:
print(
f"[AIModerationCog._get_response_text] Candidate 0 content has no 'parts' or parts list is empty. types.Content: {candidate.content}"
)
return None
for i, part in enumerate(candidate.content.parts):
if hasattr(part, "text") and part.text is not None:
if isinstance(part.text, str) and part.text.strip():
print(
f"[AIModerationCog._get_response_text] Found non-empty text in part {i}."
)
return part.text
else:
print(
f"[AIModerationCog._get_response_text] types.Part {i} has 'text' attribute, but it's empty or not a string: {part.text!r}"
)
print(
f"[AIModerationCog._get_response_text] No usable text part found in candidate 0 after iterating through all parts."
)
return None
except (AttributeError, IndexError, TypeError) as e:
print(
f"[AIModerationCog._get_response_text] Error accessing response structure: {type(e).__name__}: {e}"
)
print(f"Problematic response object: {response}")
return None
except Exception as e:
print(
f"[AIModerationCog._get_response_text] Unexpected error extracting text: {e}"
)
print(f"Response object during error: {response}")
return None
async def query_vertex_ai(
self,
message: discord.Message,
message_content: str,
user_history: str,
image_data_list: Optional[List[Tuple[str, bytes, str, str]]] = None,
):
"""
Sends the message content, user history, and additional context to Google Vertex AI for analysis.
Optionally includes image data for visual content moderation.
Args:
message: The original discord.Message object.
message_content: The text content of the message.
user_history: A string summarizing the user's past infractions.
image_data_list: Optional list of tuples (mime_type, image_bytes, attachment_type, filename) for image moderation.
Returns:
A dictionary containing the AI's decision, or None if an error occurs.
"""
print(
f"query_vertex_ai called. Vertex AI client available: {self.genai_client is not None}"
)
if not self.genai_client:
print("Error: Vertex AI Client is not available. Cannot query API.")
return None
# Construct the prompt for the AI model (system prompt is largely the same)
system_prompt_text = (
"You are an AI moderation assistant for a Discord server.\n"
"Your primary function is to analyze message content and attached media based STRICTLY on the server rules provided below, using all available context.\n\n"
"Server Rules:\n"
"---\n"
f"{SERVER_RULES}\n"
"---\n\n"
"Context Provided:\n"
" You will receive the following information to aid your analysis:\n"
'- User\'s Server Role: (e.g., "Server Owner", "Admin", "Moderator", "Member").\n'
"- Channel Category: The name of the category the channel belongs to.\n"
"- Channel Age-Restricted/NSFW (Discord Setting): Boolean (true/false).\n"
"- Replied-to Message: If the current message is a reply, the content of the original message will be provided. This is crucial for understanding direct interactions.\n"
"- Recent Channel History: The last few messages in the channel to understand the flow of conversation.\n"
" - Attached Media: If the message contains image, GIF, or video attachments, they will be provided directly in the content array for analysis.\n\n"
"Instructions:\n"
'1. Review the "Message Content" and any attached media against EACH rule, considering ALL provided context (User Role, Channel Info, Replied-to Message, Recent Channel History).\n'
' - The "Channel Age-Restricted/NSFW (Discord Setting)" is the definitive indicator for NSFW content by Discord.\n'
' - The "Channel Category" provides general context.\n'
' - **"Replied-to Message" and "Recent Channel History" are vital for understanding banter, jokes, and ongoing discussions. A statement that seems offensive in isolation might be acceptable within the flow of conversation or as a direct reply.**\n'
" - If images, GIFs, or videos are attached, analyze ALL of them for rule violations.\n"
" - Pay special attention to images that may contain NSFW content, pornography, gore, or other prohibited visual content.\n"
" - **Do not attempt to guess or estimate the ages of characters or people in images.** Only act on explicit textual context indicating they are minors.\n"
" - If multiple attachments are present, a violation in ANY of them should be flagged.\n"
f"{aimod_config_module.MODERATION_INSTRUCTIONS}\n"
"3. Respond ONLY with a single JSON object containing the following keys:\n"
' - "reasoning": string (A concise explanation for your decision, referencing the specific rule and content).\n'
' - "violation": boolean (true if any rule is violated, false otherwise)\n'
' - "rule_violated": string (The number of the rule violated, e.g., "1", "5A", "None". If multiple rules are violated, state the MOST SEVERE one, prioritizing 5 > 5A > 3 > 2 > 1. Ignore any rules about AI-generated pornography.)\n'
' - "action": string (Suggest ONE action from: "IGNORE", "WARN", "DELETE", "TIMEOUT_SHORT", "TIMEOUT_MEDIUM", "TIMEOUT_LONG", "KICK", "BAN", "NOTIFY_MODS", "SUICIDAL".\n'
' - "notify_mods_message": optional string (If the suggested action is "NOTIFY_MODS", provide an optional brief message here for the moderators, e.g., "User\'s message is slightly ambiguous, human review needed.").\n'
" Consider the user's infraction history. If the user has prior infractions for similar or escalating behavior, suggest a more severe action than if it were a first-time offense for a minor rule.\n"
" Progressive Discipline Guide (unless overridden by severity):\n"
' - First minor offense: "WARN" (and "DELETE" if content is removable like Rule 1/4).\n'
' - Second minor offense / First moderate offense: "TIMEOUT_SHORT" (e.g., 10 minutes).\n'
' - Repeated moderate offenses: "TIMEOUT_MEDIUM" (e.g., 1 hour).\n'
' - Multiple/severe offenses: "TIMEOUT_LONG" (e.g., 1 day), "KICK", or "BAN".\n'
' - Use "BAN" on a user\'s **first infraction only in extremely severe cases** such as posting gore or unmistakable real-life CSAM involving minors. If the content appears animated or ambiguous, do **not** immediately ban; a timeout or moderator review is more appropriate.\n'
" Spamming:\n"
' - If a user continuously sends very long messages that are off-topic, repetitive, or appear to be meaningless spam (e.g., character floods, nonsensical text), suggest "TIMEOUT_MEDIUM" or "TIMEOUT_LONG" depending on severity and history, even if the content itself doesn\'t violate other specific rules. This is to maintain chat readability.\n'
" Rule Severity Guidelines (use your judgment):\n"
" - Consider the severity of each rule violation on its own merits.\n"
" - Consider the user's history of past infractions when determining appropriate action.\n"
" - Consider the context of the message and channel when evaluating violations.\n"
" - You have full discretion to determine the most appropriate action for any violation.\n"
" Suicidal Content:\n"
' If the message content expresses **clear, direct, and serious suicidal ideation, intent, planning, or recent attempts** (e.g., \'I am going to end my life and have a plan\', \'I survived my attempt last night\', \'I wish I hadn\'t woken up after trying\'), ALWAYS use "SUICIDAL" as the action, and set "violation" to true, with "rule_violated" as "Suicidal Content".\n'
" For casual, edgy, hyperbolic, or ambiguous statements like 'imma kms', 'just kill me now', 'I want to die (lol)', or phrases that are clearly part of edgy humor/banter rather than a genuine cry for help, you should lean towards \"IGNORE\" or \"NOTIFY_MODS\" if there's slight ambiguity but no clear serious intent. **Do NOT flag 'imma kms' as \"SUICIDAL\" unless there is very strong supporting context indicating genuine, immediate, and serious intent.**\n"
' If unsure but suspicious, or if the situation is complex: "NOTIFY_MODS".\n'
' Default action for minor first-time rule violations should be "WARN" or "DELETE" (if applicable).\n'
' Do not suggest "KICK" or "BAN" lightly; reserve for severe or repeated major offenses.\n'
" Timeout durations: TIMEOUT_SHORT (approx 10 mins), TIMEOUT_MEDIUM (approx 1 hour), TIMEOUT_LONG (approx 1 day to 1 week).\n"
" The system will handle the exact timeout duration; you just suggest the category.)\n\n"
"Example Response (Text Violation):\n"
'{{\n "reasoning": "The message content clearly depicts IRL non-consensual sexual content involving minors, violating rule 5A.",\n "violation": true,\n "rule_violated": "5A",\n "action": "BAN"\n}}\n\n'
"Example Response (Image Violation):\n"
'{{\n "reasoning": "Attachment #2 contains explicit pornographic imagery in a non-NSFW channel, violating rule 1.",\n "violation": true,\n "rule_violated": "1",\n "action": "DELETE"\n}}\n\n'
"Example Response (Multiple Attachments Violation):\n"
'{{\n "reasoning": "While the text content is fine, attachment #3 contains IRL pornography, violating rule 5A.",\n "violation": true,\n "rule_violated": "5A",\n "action": "WARN"\n}}\n\n'
"Example Response (No Violation):\n"
'{{\n "reasoning": "The message and all attached images are respectful and contain no prohibited content.",\n "violation": false,\n "rule_violated": "None",\n "action": "IGNORE"\n}}\n\n'
"Example Response (Suicidal Content):\n"
'{{\n "reasoning": "The user\'s message \'I want to end my life\' indicates clear suicidal intent.",\n "violation": true,\n "rule_violated": "Suicidal Content",\n "action": "SUICIDAL"\n}}\n\n'
"Example Response (Notify Mods):\n"
'{{\n "reasoning": "The message contains potentially sensitive content that requires human review.",\n "violation": true,\n "rule_violated": "Review Required",\n "action": "NOTIFY_MODS",\n "notify_mods_message": "Content is borderline, please review."\n}}'
)
member = message.author # This is a discord.Member object
server_role_str = "Unprivileged Member" # Default
if member == await message.guild.fetch_member(message.guild.owner_id):
server_role_str = "Server Owner"
elif member.guild_permissions.administrator:
server_role_str = "Admin"
else:
perms = member.guild_permissions
if (
perms.manage_messages
or perms.kick_members
or perms.ban_members
or perms.moderate_members
):
server_role_str = "Moderator"
print(f"role: {server_role_str}")
# --- Fetch Replied-to Message ---
replied_to_message_content = "N/A (Not a reply)"
if message.reference and message.reference.message_id:
try:
replied_to_msg = await message.channel.fetch_message(
message.reference.message_id
)
replied_to_message_content = f"User '{replied_to_msg.author.name}' said: \"{replied_to_msg.content[:200]}\""
if len(replied_to_msg.content) > 200:
replied_to_message_content += "..."
except discord.NotFound:
replied_to_message_content = "N/A (Replied-to message not found)"
except discord.Forbidden:
replied_to_message_content = (
"N/A (Cannot fetch replied-to message - permissions)"
)
except Exception as e:
replied_to_message_content = (
f"N/A (Error fetching replied-to message: {e})"
)
# --- Fetch Recent Channel History ---
recent_channel_history_str = "N/A (Could not fetch history)"
try:
history_messages = []
# Fetch last 11 messages (current + 10 previous). We'll filter out the current one
async for prev_msg in message.channel.history(limit=11, before=message):
if (
prev_msg.id != message.id
): # Ensure we don't include the current message itself
author_name = (
prev_msg.author.name + " (BOT)"
if prev_msg.author.bot
else prev_msg.author.name
)
history_messages.append(
f"- {author_name}: \"{prev_msg.content[:150]}{'...' if len(prev_msg.content) > 150 else ''}\" (ID: {prev_msg.id})"
)
if history_messages:
# Reverse to show oldest first in the snippet, then take the last 10.
recent_channel_history_str = "\n".join(
list(reversed(history_messages))[:10]
)
else:
recent_channel_history_str = (
"No recent messages before this one in the channel."
)
except discord.Forbidden:
recent_channel_history_str = (
"N/A (Cannot fetch channel history - permissions)"
)
except Exception as e:
recent_channel_history_str = f"N/A (Error fetching channel history: {e})"
# Prepare user prompt content list with proper OpenRouter format
user_prompt_content_list = []
# Add the text context first
user_context_text = f"""User Infraction History (for {message.author.name}, ID: {message.author.id}):
---
{user_history if user_history else "No prior infractions recorded for this user in this guild."}
---
Current Message Context:
- Author: {message.author.name} (ID: {message.author.id})
- Server Role: {server_role_str}
- Channel: #{message.channel.name} (ID: {message.channel.id})
- Channel Category: {message.channel.category.name if message.channel.category else "No Category"}
- Channel Age-Restricted/NSFW (Discord Setting): {message.channel.is_nsfw()}
---
Replied-to Message:
{replied_to_message_content}
---
Recent Channel History (last up to 10 messages before this one):
{recent_channel_history_str}
---
Message Content to Analyze:
"{message_content}"
Now, analyze the message content and any attached media based on the server rules and ALL the context provided above.
Follow the JSON output format specified in the system prompt.
CRITICAL: Do NOT output anything other than the required JSON response.
"""
# Add the text content first
user_prompt_content_list.append({"type": "text", "text": user_context_text})
# Add images in the proper OpenRouter format
if image_data_list and len(image_data_list) > 0:
try:
for i, (mime_type, image_bytes, attachment_type, filename) in enumerate(
image_data_list
):
try:
# Encode image to base64
base64_image = base64.b64encode(image_bytes).decode("utf-8")
# Create data URL
image_data_url = f"data:{mime_type};base64,{base64_image}"
# Add image in OpenRouter format
user_prompt_content_list.append(
{"type": "image_url", "image_url": {"url": image_data_url}}
)
print(
f"Added attachment #{i+1}: {filename} ({attachment_type}) to the prompt"
)
except Exception as e:
print(
f"Error encoding image data for attachment {filename}: {e}"
)
except Exception as e:
print(f"Error processing image data: {e}")
# Add a text note about the error
user_prompt_content_list.append(
{
"type": "text",
"text": f"Note: There were {len(image_data_list)} attached images, but they could not be processed for analysis.",
}
)
# Get guild-specific model if configured, otherwise use default
member = message.author
server_role_str = "Unprivileged Member"
if member == await message.guild.fetch_member(message.guild.owner_id):
server_role_str = "Server Owner"
elif member.guild_permissions.administrator:
server_role_str = "Admin"
else:
perms = member.guild_permissions
if (
perms.manage_messages
or perms.kick_members
or perms.ban_members
or perms.moderate_members
):
server_role_str = "Moderator"
replied_to_message_content = "N/A (Not a reply)"
if message.reference and message.reference.message_id:
try:
replied_to_msg = await message.channel.fetch_message(
message.reference.message_id
)
replied_to_message_content = f"User '{replied_to_msg.author.name}' said: \"{replied_to_msg.content[:200]}{'...' if len(replied_to_msg.content) > 200 else ''}\""
except Exception as e:
replied_to_message_content = f"N/A (Error fetching replied-to: {e})"
recent_channel_history_str = "N/A (Could not fetch history)"
try:
history_messages = [
f"- {prev_msg.author.name}{' (BOT)' if prev_msg.author.bot else ''}: \"{prev_msg.content[:150]}{'...' if len(prev_msg.content) > 150 else ''}\" (ID: {prev_msg.id})"
async for prev_msg in message.channel.history(limit=11, before=message)
if prev_msg.id != message.id
]
recent_channel_history_str = (
"\n".join(list(reversed(history_messages))[:10])
if history_messages
else "No recent messages."
)
except Exception as e:
recent_channel_history_str = f"N/A (Error fetching history: {e})"
user_context_text = f"""User Infraction History (for {message.author.name}, ID: {message.author.id}):
---
{user_history if user_history else "No prior infractions recorded for this user in this guild."}
---
Current Message Context:
- Author: {message.author.name} (ID: {message.author.id})
- Server Role: {server_role_str}
- Channel: #{message.channel.name} (ID: {message.channel.id})
- Channel Category: {message.channel.category.name if message.channel.category else "No Category"}
- Channel Age-Restricted/NSFW (Discord Setting): {message.channel.is_nsfw()}
---
Replied-to Message:
{replied_to_message_content}
---
Recent Channel History (last up to 10 messages before this one):
{recent_channel_history_str}
---
Message Content to Analyze:
"{message_content}"
Now, analyze the message content and any attached media based on the server rules and ALL the context provided above.
Follow the JSON output format specified in the system prompt.
CRITICAL: Do NOT output anything other than the required JSON response.
"""
# Prepare parts for Vertex AI
vertex_parts: List[Any] = [types.Part(text=user_context_text)]
if image_data_list:
for mime_type, image_bytes, attachment_type, filename in image_data_list:
try:
# Vertex AI directly supports common image and video MIME types.
# Ensure mime_type is one of the supported ones by Vertex, e.g., image/png or video/mp4.
supported_image_mimes = [
"image/png",
"image/jpeg",
"image/webp",
"image/heic",
"image/heif",
"image/gif",
]
supported_video_mimes = [
"video/mp4",
"video/webm",
"video/quicktime",
"video/x-msvideo",
"video/x-matroska",
"video/x-flv",
]
clean_mime_type = mime_type.split(";")[0].lower()
if (
clean_mime_type in supported_image_mimes
or clean_mime_type in supported_video_mimes
):
vertex_parts.append(
types.Part(
inline_data=types.Blob(
data=image_bytes,
mime_type=clean_mime_type,
)
)
)
print(
f"Added attachment {filename} ({attachment_type}) with MIME {clean_mime_type} to Vertex prompt"
)
else:
print(
f"Skipping attachment {filename} due to unsupported MIME type for Vertex: {mime_type}"
)
vertex_parts.append(
types.Part(
text=f"[System Note: Attachment '{filename}' of type '{mime_type}' was not processed as it's not directly supported for vision by the current model configuration.]"
)
)
except Exception as e:
print(f"Error processing attachment {filename} for Vertex AI: {e}")
vertex_parts.append(
types.Part(
text=f"[System Note: Error processing attachment '{filename}'.]"
)
)
# Get guild-specific model if configured, otherwise use default
guild_id = message.guild.id
model_id_to_use = get_guild_config(
guild_id, "AI_MODEL", DEFAULT_VERTEX_AI_MODEL
)
# Vertex model path is usually like "publishers/google/models/gemini-1.5-flash-001"
# If model_id_to_use is just "gemini-1.5-flash-001", prepend "publishers/google/models/"
if not model_id_to_use.startswith("publishers/google/models/"):
model_path = f"publishers/google/models/{model_id_to_use}"
else:
model_path = model_id_to_use
thinking_config = types.ThinkingConfig(thinking_budget=0)
generation_config = types.GenerateContentConfig(
temperature=0.2,
max_output_tokens=2000, # Ensure enough for JSON
safety_settings=STANDARD_SAFETY_SETTINGS,
thinking_config=thinking_config,
)
# Construct contents for Vertex AI API
# System prompt is handled by the model's configuration or as the first message if not directly supported in GenerateContentConfig.
# For Vertex AI with `genai.Client`, system prompt is often part of the model's configuration or the first message.
# The `genai.GenerativeModel` has `system_instruction`.
# Here, we'll build the `contents` list.
# The system prompt is part of the model's understanding, and the user prompt contains the task.
# For multi-turn, history is added to `contents`. Here, it's a single-turn request.
request_contents = [
# System prompt can be the first message if not using system_instruction in model
# types.Content(role="system", parts=[types.Part(text=system_prompt_text)]), # This is one way
# Or, rely on the model's pre-set system prompt and just send user data.
# For this moderation task, the detailed instructions are better sent as part of the user turn
# or a specific system instruction if the client/model supports it well.
# Let's include the system prompt as the first part of the user message for clarity with current structure.
# The `system_prompt_text` is already defined and will be the primary text part.
# The `user_context_text` is what we constructed.
# The `vertex_parts` contains the `user_context_text` and any image data.
types.Content(role="user", parts=vertex_parts)
]
try:
print(f"Querying Vertex AI model {model_path}...")
# Prepare the generation config with system instruction
# The existing 'generation_config' (lines 1063-1072) already has temperature, max_tokens, safety_settings.
# We need to add system_instruction to it.
final_generation_config = types.GenerateContentConfig(
temperature=generation_config.temperature, # from existing config
max_output_tokens=generation_config.max_output_tokens, # from existing config
safety_settings=generation_config.safety_settings, # from existing config
system_instruction=types.Content(
role="system", parts=[types.Part(text=system_prompt_text)]
),
thinking_config=generation_config.thinking_config, # from existing config
# response_mime_type="application/json", # Consider if model supports this for forcing JSON
)
client = get_genai_client_for_model(model_id_to_use)
response = await client.aio.models.generate_content(
model=model_path, # Correctly formatted model path
contents=request_contents, # User's message with context and images
config=final_generation_config, # Pass the config with system_instruction
)
ai_response_content = self._get_response_text(response)
print(response.usage_metadata) # Print usage metadata for debugging
if not ai_response_content:
print("Error: AI response content is empty or could not be extracted.")
# Log safety ratings if available
if (
response
and response.candidates
and response.candidates[0].safety_ratings
):
ratings = ", ".join(
[
f"{r.category.name}: {r.probability.name}"
for r in response.candidates[0].safety_ratings
]
)
print(f"Safety Ratings: {ratings}")
if (
response
and response.candidates
and response.candidates[0].finish_reason
):
print(f"Finish Reason: {response.candidates[0].finish_reason.name}")
return None
# Attempt to parse the JSON response from the AI
try:
# Clean potential markdown code blocks
if ai_response_content.startswith("```json"):
ai_response_content = ai_response_content.strip("```json\n").strip(
"`\n "
)
elif ai_response_content.startswith("```"):
ai_response_content = ai_response_content.strip("```\n").strip(
"`\n "
)
ai_decision = json.loads(ai_response_content)
# Basic validation of the parsed JSON structure
if (
not isinstance(ai_decision, dict)
or not all(
k in ai_decision
for k in ["violation", "rule_violated", "reasoning", "action"]
)
or not isinstance(ai_decision.get("violation"), bool)
):
print(
f"Error: AI response missing expected keys or 'violation' is not bool. Response: {ai_response_content}"
)
return None
print(f"AI Analysis Received: {ai_decision}")
return ai_decision
except json.JSONDecodeError as e:
print(
f"Error: Could not decode JSON response from AI: {e}. Response: {ai_response_content}"
)
return None
except Exception as e: # Catch other parsing errors
print(
f"Error parsing AI response structure: {e}. Response: {ai_response_content}"
)
return None
except google_exceptions.GoogleAPICallError as e:
print(f"Error calling Vertex AI API: {e}")
return None
except Exception as e:
print(
f"An unexpected error occurred during Vertex AI query for message {message.id}: {e}"
)
return None
async def handle_violation(
self,
message: discord.Message,
ai_decision: dict,
notify_mods_message: str = None,
link_urls: list[str] | None = None,
):
"""
Takes action based on the AI's violation decision.
Also transmits action info via HTTP POST with API key header.
"""
import datetime
import aiohttp
rule_violated = ai_decision.get("rule_violated", "Unknown")
reasoning = ai_decision.get("reasoning", "No reasoning provided.")
action = ai_decision.get(
"action", "NOTIFY_MODS"
).upper() # Default to notify mods
guild_id = message.guild.id # Get guild_id once
user_id = message.author.id # Get user_id once
moderator_role_id = get_guild_config(guild_id, "MODERATOR_ROLE_ID")
moderator_role = (
message.guild.get_role(moderator_role_id) if moderator_role_id else None
)
mod_ping = (
moderator_role.mention
if moderator_role
else f"Moderators (Role ID {moderator_role_id} not found)"
)
current_timestamp_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
# Get the model from guild config, fall back to global default
model_used = get_guild_config(guild_id, "AI_MODEL", DEFAULT_VERTEX_AI_MODEL)
# --- Adjust action for first-time offenses ---
user_history_list = get_user_infraction_history(guild_id, user_id)
if action == "BAN" and not user_history_list:
combined_text = f"{rule_violated} {reasoning}".lower()
severe = False
if "gore" in combined_text:
severe = True
elif "csam" in combined_text:
severe = True
elif (
"pedophilia" in combined_text
or "child" in combined_text
or "5a" in combined_text
or "5" in combined_text
):
real_indicators = [
"real",
"real-life",
"real life",
"irl",
"photo",
"photograph",
"video",
]
if any(indicator in combined_text for indicator in real_indicators):
severe = True
if not severe:
print(
"Downgrading BAN to TIMEOUT_LONG due to first offense and lack of severe content."
)
action = "TIMEOUT_LONG"
# --- Prepare Notification ---
notification_embed = discord.Embed(
title="🚨 Rule Violation Detected 🚨",
description=f"AI analysis detected a violation of server rules.",
color=discord.Color.red(),
)
notification_embed.add_field(
name="User",
value=f"{message.author.mention} (`{message.author.id}`)",
inline=False,
)
notification_embed.add_field(
name="Channel", value=message.channel.mention, inline=False
)
notification_embed.add_field(
name="Rule Violated", value=f"**Rule {rule_violated}**", inline=True
)
notification_embed.add_field(
name="AI Suggested Action", value=f"`{action}`", inline=True
)
notification_embed.add_field(
name="AI Reasoning", value=f"_{reasoning}_", inline=False
)
notification_embed.add_field(
name="Message Link",
value=f"[Jump to Message]({message.jump_url})",
inline=False,
)
# Log message content and attachments for audit purposes
msg_content = message.content if message.content else "*No text content*"
notification_embed.add_field(
name="Message Content", value=msg_content[:1024], inline=False
)
# Add attachment information if present
if message.attachments:
attachment_info = []
for i, attachment in enumerate(message.attachments):
attachment_info.append(
f"{i+1}. {attachment.filename} ({attachment.content_type}) - [Link]({attachment.url})"
)
attachment_text = "\n".join(attachment_info)
notification_embed.add_field(
name="Attachments", value=attachment_text[:1024], inline=False
)
# Add the first image as a thumbnail if it's an image type
for attachment in message.attachments:
if any(
attachment.filename.lower().endswith(ext)
for ext in self.image_extensions
+ self.gif_extensions
+ self.video_extensions
):
notification_embed.set_thumbnail(url=attachment.url)
break
# Use the model_used variable that was defined earlier
notification_embed.set_footer(
text=f"AI Model: {model_used}. Learnhelp AI Moderation."
)
notification_embed.timestamp = (
discord.utils.utcnow()
) # Using discord.utils.utcnow() which is still supported
action_taken_message = "" # To append to the notification
testing_mode = get_guild_config(guild_id, "TESTING_MODE", False)
if testing_mode:
action_taken_message = (
f"[TEST MODE] Would have taken action `{action}`. No changes made."
)
notification_embed.color = discord.Color.greyple()
log_channel_id = get_guild_config(message.guild.id, "MOD_LOG_CHANNEL_ID")
log_channel = (
self.bot.get_channel(log_channel_id)
if log_channel_id
else message.channel
)
if action == "SUICIDAL":
suicidal_role_id = get_guild_config(
message.guild.id, "SUICIDAL_PING_ROLE_ID"
)
suicidal_role = (
message.guild.get_role(suicidal_role_id)
if suicidal_role_id
else None
)
ping_target = (
suicidal_role.mention
if suicidal_role
else f"Role ID {suicidal_role_id} (Suicidal Content)"
)
if not suicidal_role:
print(f"ERROR: Suicidal ping role ID {suicidal_role_id} not found.")
final_message = f"{ping_target}\n{action_taken_message}"
else:
suggestions_id = get_guild_config(
message.guild.id, "SUGGESTIONS_CHANNEL_ID"
)
suggestion_note = (
f"\nPlease review <#{suggestions_id}> for rule updates."
if suggestions_id
else ""
)
final_message = f"{mod_ping}\n{action_taken_message}{suggestion_note}"
await log_channel.send(
content=final_message,
embed=notification_embed,
view=self.QuickActionView(self, message.author),
)
return
# --- Perform Actions ---
try:
if action == "BAN":
action_taken_message = (
f"Action Taken: User **BANNED** and message deleted."
)
notification_embed.color = discord.Color.dark_red()
try:
await message.delete()
except discord.NotFound:
print("Message already deleted before banning.")
except discord.Forbidden:
print(
f"WARNING: Missing permissions to delete message before banning user {message.author}."
)
action_taken_message += (
" (Failed to delete message - check permissions)"
)
ban_reason = f"AI Mod: Rule {rule_violated}. Reason: {reasoning}"
await message.guild.ban(
message.author, reason=ban_reason, delete_message_days=1
)
print(
f"BANNED user {message.author} for violating rule {rule_violated}."
)
await add_user_infraction(
guild_id,
user_id,
rule_violated,
"BAN",
reasoning,
current_timestamp_iso,
message.id,
message.channel.id,
message.content[:100] if message.content else "",
[a.url for a in message.attachments] + (link_urls or []),
)
elif action == "KICK":
action_taken_message = (
f"Action Taken: User **KICKED** and message deleted."
)
notification_embed.color = discord.Color.from_rgb(
255, 127, 0
) # Dark Orange
try:
await message.delete()
except discord.NotFound:
print("Message already deleted before kicking.")
except discord.Forbidden:
print(
f"WARNING: Missing permissions to delete message before kicking user {message.author}."
)
action_taken_message += (
" (Failed to delete message - check permissions)"
)
kick_reason = f"AI Mod: Rule {rule_violated}. Reason: {reasoning}"
await message.author.kick(reason=kick_reason)
print(
f"KICKED user {message.author} for violating rule {rule_violated}."
)
await add_user_infraction(
guild_id,
user_id,
rule_violated,
"KICK",
reasoning,
current_timestamp_iso,
message.id,
message.channel.id,
message.content[:100] if message.content else "",
[a.url for a in message.attachments] + (link_urls or []),
)
elif action.startswith("TIMEOUT"):
duration_seconds = 0
duration_readable = ""
if action == "TIMEOUT_SHORT":
duration_seconds = 10 * 60 # 10 minutes
duration_readable = "10 minutes"
elif action == "TIMEOUT_MEDIUM":
duration_seconds = 60 * 60 # 1 hour
duration_readable = "1 hour"
elif action == "TIMEOUT_LONG":
duration_seconds = 24 * 60 * 60 # 1 day
duration_readable = "1 day"
if duration_seconds > 0:
action_taken_message = f"Action Taken: User **TIMED OUT for {duration_readable}** and message deleted."
notification_embed.color = discord.Color.blue()
try:
await message.delete()
except discord.NotFound:
print(
f"Message already deleted before timeout for {message.author}."
)
except discord.Forbidden:
print(
f"WARNING: Missing permissions to delete message before timeout for {message.author}."
)
action_taken_message += (
" (Failed to delete message - check permissions)"
)
timeout_reason = (
f"AI Mod: Rule {rule_violated}. Reason: {reasoning}"
)
# discord.py timeout takes a timedelta object
await message.author.timeout(
discord.utils.utcnow()
+ datetime.timedelta(seconds=duration_seconds),
reason=timeout_reason,
)
print(
f"TIMED OUT user {message.author} for {duration_readable} for violating rule {rule_violated}."
)
await add_user_infraction(
guild_id,
user_id,
rule_violated,
action,
reasoning,
current_timestamp_iso,
message.id,
message.channel.id,
message.content[:100] if message.content else "",
[a.url for a in message.attachments] + (link_urls or []),
)
else:
action_taken_message = (
"Action Taken: **Unknown timeout duration, notifying mods.**"
)
action = (
"NOTIFY_MODS" # Fallback if timeout duration is not recognized
)
print(
f"Unknown timeout duration for action {action}. Defaulting to NOTIFY_MODS."
)
elif action == "DELETE":
action_taken_message = f"Action Taken: Message **DELETED**."
await message.delete()
print(
f"DELETED message from {message.author} for violating rule {rule_violated}."
)
# Typically, a simple delete isn't a formal infraction unless it's part of a WARN.
# If you want to log deletes as infractions, add:
# add_user_infraction(guild_id, user_id, rule_violated, "DELETE", reasoning, current_timestamp_iso)
elif action == "WARN":
action_taken_message = (
f"Action Taken: Message **DELETED** (AI suggested WARN)."
)
notification_embed.color = discord.Color.orange()
await message.delete() # Warnings usually involve deleting the offending message
print(
f"DELETED message from {message.author} (AI suggested WARN for rule {rule_violated})."
)
try:
dm_channel = await message.author.create_dm()
warn_embed = discord.Embed(
title="⚠️ Moderation Warning",
description=(
f"Your recent message in **{message.guild.name}** was removed for violating **Rule {rule_violated}**."
),
color=discord.Color.orange(),
)
if message.content:
warn_embed.add_field(
name="Message Content",
value=message.content[:1024],
inline=False,
)
warn_embed.add_field(name="Reason", value=reasoning, inline=False)
warn_embed.set_footer(
text="Please review the server rules. This is a formal warning."
)
await dm_channel.send(embed=warn_embed)
action_taken_message += " User notified via DM with warning."
except discord.Forbidden:
print(
f"Could not DM warning to {message.author} (DMs likely disabled)."
)
action_taken_message += " (Could not DM user for warning)."
except Exception as e:
print(f"Error sending warning DM to {message.author}: {e}")
action_taken_message += " (Error sending warning DM)."
await add_user_infraction(
guild_id,
user_id,
rule_violated,
"WARN",
reasoning,
current_timestamp_iso,
message.id,
message.channel.id,
message.content[:100] if message.content else "",
[a.url for a in message.attachments] + (link_urls or []),
)
elif action == "NOTIFY_MODS":
action_taken_message = "Action Taken: **Moderator review requested.**"
notification_embed.color = discord.Color.gold()
print(
f"Notifying moderators about potential violation (Rule {rule_violated}) by {message.author}."
)
# NOTIFY_MODS itself isn't an infraction on the user, but a request for human review.
# If mods take action, they would log it manually or via a mod command.
if notify_mods_message:
notification_embed.add_field(
name="Additional Mod Message",
value=notify_mods_message,
inline=False,
)
elif action == "SUICIDAL":
action_taken_message = (
"Action Taken: **User DMed resources, relevant role notified.**"
)
# No infraction is typically logged for "SUICIDAL" as it's a support action.
notification_embed.title = "🚨 Suicidal Content Detected 🚨"
notification_embed.color = (
discord.Color.dark_purple()
) # A distinct color
notification_embed.description = "AI analysis detected content indicating potential suicidal ideation."
print(
f"SUICIDAL content detected from {message.author}. DMing resources and notifying role."
)
# DM the user with help resources
try:
dm_channel = await message.author.create_dm()
await dm_channel.send(SUICIDAL_HELP_RESOURCES)
action_taken_message += " User successfully DMed."
except discord.Forbidden:
print(
f"Could not DM suicidal help resources to {message.author} (DMs likely disabled)."
)
action_taken_message += " (Could not DM user - DMs disabled)."
except Exception as e:
print(
f"Error sending suicidal help resources DM to {message.author}: {e}"
)
action_taken_message += f" (Error DMing user: {e})."
# The message itself is usually not deleted for suicidal content, to allow for intervention.
# If deletion is desired, add: await message.delete() here.
else: # Includes "IGNORE" or unexpected actions
if ai_decision.get(
"violation"
): # If violation is true but action is IGNORE
action_taken_message = "Action Taken: **None** (AI suggested IGNORE despite flagging violation - Review Recommended)."
notification_embed.color = discord.Color.light_grey()
print(
f"AI flagged violation ({rule_violated}) but suggested IGNORE for message by {message.author}. Notifying mods for review."
)
else:
# This case shouldn't be reached if called correctly, but handle defensively
print(
f"No action taken for message by {message.author} (AI Action: {action}, Violation: False)"
)
return # Don't notify if no violation and action is IGNORE
# --- Send Notification to Moderators/Relevant Role ---
log_channel_id = get_guild_config(message.guild.id, "MOD_LOG_CHANNEL_ID")
log_channel = (
self.bot.get_channel(log_channel_id) if log_channel_id else None
)
if not log_channel:
print(
f"ERROR: Moderation log channel (ID: {log_channel_id}) not found or not configured. Defaulting to message channel."
)
log_channel = message.channel
if not log_channel:
print(
f"ERROR: Could not find even the original message channel {message.channel.id} to send notification."
)
return
if action == "SUICIDAL":
suicidal_role_id = get_guild_config(
message.guild.id, "SUICIDAL_PING_ROLE_ID"
)
suicidal_role = (
message.guild.get_role(suicidal_role_id)
if suicidal_role_id
else None
)
ping_target = (
suicidal_role.mention
if suicidal_role
else f"Role ID {suicidal_role_id} (Suicidal Content)"
)
if not suicidal_role:
print(f"ERROR: Suicidal ping role ID {suicidal_role_id} not found.")
final_message = f"{ping_target}\n{action_taken_message}"
await log_channel.send(
content=final_message,
embed=notification_embed,
view=self.QuickActionView(self, message.author),
)
elif moderator_role: # For other violations
suggestions_id = get_guild_config(
message.guild.id, "SUGGESTIONS_CHANNEL_ID"
)
suggestion_note = (
f"\nPlease review <#{suggestions_id}> for rule updates."
if suggestions_id
else ""
)
final_message = f"{mod_ping}\n{action_taken_message}{suggestion_note}"
await log_channel.send(
content=final_message,
embed=notification_embed,
view=self.QuickActionView(self, message.author),
)
else: # Fallback if moderator role is also not found for non-suicidal actions
print(
f"ERROR: Moderator role ID {moderator_role_id} not found for action {action}."
)
except discord.Forbidden as e:
print(
f"ERROR: Missing Permissions to perform action '{action}' for rule {rule_violated}. Details: {e}"
)
# Try to notify mods about the failure
if moderator_role:
try:
await message.channel.send(
f"{mod_ping} **PERMISSION ERROR!** Could not perform action `{action}` on message by {message.author.mention} "
f"for violating Rule {rule_violated}. Please check bot permissions.\n"
f"Reasoning: _{reasoning}_\nMessage Link: {message.jump_url}"
)
except discord.Forbidden:
print(
"FATAL: Bot lacks permission to send messages, even error notifications."
)
except discord.NotFound:
print(
f"Message {message.id} was likely already deleted when trying to perform action '{action}'."
)
except Exception as e:
print(
f"An unexpected error occurred during action execution for message {message.id}: {e}"
)
# Try to notify mods about the unexpected error
if moderator_role:
try:
await message.channel.send(
f"{mod_ping} **UNEXPECTED ERROR!** An error occurred while handling rule violation "
f"for {message.author.mention}. Please check bot logs.\n"
f"Rule: {rule_violated}, Action Attempted: {action}\nMessage Link: {message.jump_url}"
)
except discord.Forbidden:
print(
"FATAL: Bot lacks permission to send messages, even error notifications."
)
async def run_appeal_ai(
self,
guild: discord.Guild,
member: discord.User,
action: str,
appeal_text: str,
infraction: dict | None = None,
) -> str:
"""Run the appeal text through the higher tier AI model."""
if not self.genai_client:
return "AI review unavailable."
history = get_user_infraction_history(guild.id, member.id)
history_text = json.dumps(history, indent=2) if history else "None"
system_prompt = (
"You are reviewing a user's appeal of a moderation action. "
"Think very extensively about the appeal, the provided history, and the server rules. "
"Return a short verdict (UPHOLD or OVERTURN) and your reasoning in plain text."
)
context_lines = []
if infraction:
channel_name = guild.get_channel(infraction.get("channel_id", 0))
channel_display = (
channel_name.name if channel_name else str(infraction.get("channel_id"))
)
context_lines.append(f"Original Channel: {channel_display}")
msg_content = infraction.get("message_content")
if msg_content:
context_lines.append(f"Message Snippet: {msg_content}")
attachments = infraction.get("attachments")
if attachments:
context_lines.append(f"Attachments: {attachments}")
reasoning = infraction.get("reasoning")
if reasoning:
context_lines.append(f"AI Reasoning: {reasoning}")
user_prompt = (
f"Server Rules:\n{SERVER_RULES}\n\n"
f"User History:\n{history_text}\n\n"
+ ("\n".join(context_lines) + "\n\n" if context_lines else "")
+ f"Action Appealed: {action}\n"
f"Appeal Text: {appeal_text}"
)
generation_config = types.GenerateContentConfig(
temperature=0.2,
max_output_tokens=8192,
safety_settings=STANDARD_SAFETY_SETTINGS,
thinking_config=types.ThinkingConfig(
thinking_budget=APPEAL_AI_THINKING_BUDGET
),
system_instruction=types.Content(
role="system", parts=[types.Part(text=system_prompt)]
),
)
try:
client = get_genai_client_for_model(APPEAL_AI_MODEL)
response = await client.aio.models.generate_content(
model=f"publishers/google/models/{APPEAL_AI_MODEL}",
contents=[
types.Content(role="user", parts=[types.Part(text=user_prompt)])
],
config=generation_config,
)
result = self._get_response_text(response)
return result or "AI review failed to produce output."
except Exception as e: # noqa: BLE001
print(f"Appeal AI error: {e}")
return "AI review encountered an error."
async def _moderate_message(
self, message: discord.Message, event_name: str
) -> None:
"""Run moderation checks on a message."""
print(f"{event_name} triggered for message ID: {message.id}")
# --- Basic Checks ---
# Ignore messages from bots (including self)
if message.author.bot:
print(f"Ignoring message {message.id} from bot.")
return
link_urls: list[str] = []
embed_urls = [embed.url for embed in message.embeds if embed.url]
link_urls = (
self.extract_direct_attachment_urls(" ".join(embed_urls))
if embed_urls
else []
)
# Ignore messages without content, attachments, or direct attachment links
if not message.content and not message.attachments and not link_urls:
print(f"Ignoring message {message.id} with no content or attachments.")
return
# Ignore DMs
if not message.guild:
print(f"Ignoring message {message.id} from DM.")
return
# Check if moderation is enabled for this guild
if not get_guild_config(message.guild.id, "ENABLED", False):
print(
f"Moderation disabled for guild {message.guild.id}. Ignoring message {message.id}."
)
return
if get_guild_config(message.guild.id, "EVENT_MODE", False):
print(
f"Event mode enabled for guild {message.guild.id}. Ignoring message {message.id}."
)
return
# --- Suicidal Content Check ---
# Suicidal keyword check removed; handled by OpenRouter AI moderation.
# --- Prepare for AI Analysis ---
message_content = message.content
# Check for attachments
image_data_list = []
if message.attachments:
# Process all attachments
for attachment in message.attachments:
mime_type, image_bytes, attachment_type = await self.process_attachment(
attachment
)
if mime_type and image_bytes and attachment_type:
image_data_list.append(
(mime_type, image_bytes, attachment_type, attachment.filename)
)
print(
f"Processed attachment: {attachment.filename} as {attachment_type}"
)
# Log the number of attachments processed
if image_data_list:
print(
f"Processed {len(image_data_list)} attachments for message {message.id}"
)
# Check for direct link attachments in the message content
if link_urls:
processed_links = 0
for url in link_urls:
mime_type, image_bytes, attachment_type, filename = (
await self.process_url_attachment(url)
)
if mime_type and image_bytes and attachment_type:
image_data_list.append(
(mime_type, image_bytes, attachment_type, filename)
)
processed_links += 1
print(
f"Processed linked attachment: {filename} as {attachment_type}"
)
if processed_links > 0:
print(
f"Processed {processed_links} linked attachments for message {message.id}"
)
# Only proceed with AI analysis if there's text to analyze or attachments
if not message_content and not image_data_list:
print(
f"Ignoring message {message.id} with no content or valid attachments."
)
return
# NSFW channel check removed - AI will handle this context
# --- Call AI for Analysis (All Rules) ---
# Check if the Vertex AI client is available
if not self.genai_client:
print(
f"Skipping AI analysis for message {message.id}: Vertex AI client is not initialized."
)
return
# Prepare user history for the AI
infractions = get_user_infraction_history(message.guild.id, message.author.id)
history_summary_parts = []
if infractions:
for infr in infractions:
history_summary_parts.append(
f"- Action: {infr.get('action_taken', 'N/A')} for Rule {infr.get('rule_violated', 'N/A')} on {infr.get('timestamp', 'N/A')[:10]}. Reason: {infr.get('reasoning', 'N/A')[:50]}..."
)
user_history_summary = (
"\n".join(history_summary_parts)
if history_summary_parts
else "No prior infractions recorded."
)
# Limit history summary length to prevent excessively long prompts
max_history_len = 500
if len(user_history_summary) > max_history_len:
user_history_summary = user_history_summary[: max_history_len - 3] + "..."
print(
f"Analyzing message {message.id} from {message.author} in #{message.channel.name} with history..."
)
if image_data_list:
attachment_types = [data[2] for data in image_data_list]
print(
f"Including {len(image_data_list)} attachments in analysis: {', '.join(attachment_types)}"
)
ai_decision = await self.query_vertex_ai(
message, message_content, user_history_summary, image_data_list
)
# --- Process AI Decision ---
if not ai_decision:
print(f"Failed to get valid AI decision for message {message.id}.")
# Optionally notify mods about AI failure if it happens often
# Store the failure attempt for debugging
self.last_ai_decisions.append(
{
"message_id": message.id,
"author_name": str(message.author),
"author_id": message.author.id,
"message_content_snippet": (
message.content[:100] + "..."
if len(message.content) > 100
else message.content
),
"timestamp": datetime.datetime.now(
datetime.timezone.utc
).isoformat(),
"ai_decision": {
"error": "Failed to get valid AI decision",
"raw_response": None,
}, # Simplified error logging
}
)
return # Stop if AI fails or returns invalid data
# Store the AI decision regardless of violation status
self.last_ai_decisions.append(
{
"message_id": message.id,
"author_name": str(message.author),
"author_id": message.author.id,
"message_content_snippet": (
message.content[:100] + "..."
if len(message.content) > 100
else message.content
),
"timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"ai_decision": ai_decision,
}
)
# Check if the AI flagged a violation
if ai_decision.get("violation"):
# Handle the violation based on AI decision without overrides
# Pass notify_mods_message if the action is NOTIFY_MODS
notify_mods_message = (
ai_decision.get("notify_mods_message")
if ai_decision.get("action") == "NOTIFY_MODS"
else None
)
await self.handle_violation(
message, ai_decision, notify_mods_message, link_urls
)
else:
# AI found no violation
print(
f"AI analysis complete for message {message.id}. No violation detected."
)
@commands.Cog.listener(name="on_message")
async def message_listener(self, message: discord.Message) -> None:
"""Trigger moderation when a new message is sent."""
await self._moderate_message(message, "on_message")
@commands.Cog.listener(name="on_message_edit")
async def message_edit_listener(
self, before: discord.Message, after: discord.Message
) -> None:
"""Trigger moderation when a message is edited."""
await self._moderate_message(after, "on_message_edit")
@debug_subgroup.command(
name="last_decisions",
description="View the last 5 AI moderation decisions (admin only).",
)
@app_commands.checks.has_permissions(administrator=True)
async def aidebug_last_decisions(self, interaction: discord.Interaction):
if not self.last_ai_decisions:
await interaction.response.send_message(
"No AI decisions have been recorded yet.", ephemeral=True
)
return
embed = discord.Embed(
title="Last 5 AI Moderation Decisions", color=discord.Color.purple()
)
embed.timestamp = discord.utils.utcnow()
for i, record in enumerate(
reversed(list(self.last_ai_decisions))
): # Show newest first
decision_info = record.get("ai_decision", {})
violation = decision_info.get("violation", "N/A")
rule_violated = decision_info.get("rule_violated", "N/A")
reasoning = decision_info.get("reasoning", "N/A")
action = decision_info.get("action", "N/A")
error_msg = decision_info.get("error")
field_value = (
f"**Author:** {record.get('author_name', 'N/A')} ({record.get('author_id', 'N/A')})\n"
f"**Message ID:** {record.get('message_id', 'N/A')}\n"
f"**Content Snippet:** ```{record.get('message_content_snippet', 'N/A')}```\n"
f"**Timestamp:** {record.get('timestamp', 'N/A')[:19].replace('T', ' ')}\n"
)
if error_msg:
field_value += f"**Status:** <font color='red'>Error during processing: {error_msg}</font>\n"
else:
field_value += (
f"**Violation:** {violation}\n"
f"**Rule Violated:** {rule_violated}\n"
f"**Action:** {action}\n"
f"**Reasoning:** ```{reasoning}```\n"
)
# Truncate field_value if it's too long for an embed field
if len(field_value) > 1024:
field_value = field_value[:1020] + "..."
embed.add_field(
name=f"Decision #{len(self.last_ai_decisions) - i}",
value=field_value,
inline=False,
)
if (
len(embed.fields) >= 5
): # Limit to 5 fields in one embed for very long entries, or send multiple embeds
break
if not embed.fields: # Should not happen if self.last_ai_decisions is not empty
await interaction.response.send_message(
"Could not format AI decisions.", ephemeral=True
)
return
await interaction.followup.send(embed=embed, ephemeral=True)
@aidebug_last_decisions.error
async def aidebug_last_decisions_error(
self, interaction: discord.Interaction, error: app_commands.AppCommandError
):
if isinstance(error, app_commands.MissingPermissions):
await interaction.response.send_message(
"You must be an administrator to use this command.", ephemeral=True
)
else:
await interaction.response.send_message(
f"An error occurred: {error}", ephemeral=True
)
print(f"Error in aidebug_last_decisions command: {error}")
@debug_subgroup.command(
name="appeal_tests",
description="Run sample appeals through the AI reviewer (admin only).",
)
@app_commands.checks.has_permissions(administrator=True)
async def aidebug_appeal_tests(self, interaction: discord.Interaction):
"""Run a few hardcoded appeals through the appeal AI for testing."""
await interaction.response.defer(thinking=True, ephemeral=True)
scenarios = [
("WARN", "I was excited and sent many messages quickly."),
("MUTE", "I only quoted a meme and it was taken as harassment."),
("BAN", "I posted NSFW art but believed it was allowed."),
]
results = []
guild = interaction.guild
for idx, (action, text) in enumerate(scenarios, 1):
dummy_infraction = {
"message_id": 2000 + idx,
"channel_id": interaction.channel.id,
"message_content": f"Test offending message {idx}",
"attachments": [],
"reasoning": "Initial moderation reasoning sample",
}
review = await self.run_appeal_ai(
guild, interaction.user, action, text, dummy_infraction
)
results.append((action, text, review))
embed = discord.Embed(
title="Appeal AI Test Results", color=discord.Color.green()
)
embed.timestamp = discord.utils.utcnow()
for idx, (action, text, review) in enumerate(results, 1):
value = (
f"**Action:** {action}\n**Appeal:** {text}\n**AI Verdict:** {review}"
)
if len(value) > 1024:
value = value[:1020] + "..."
embed.add_field(name=f"Scenario {idx}", value=value, inline=False)
await interaction.followup.send(embed=embed, ephemeral=True)
# Setup function required by discord.py to load the cog
async def setup(bot: commands.Bot):
"""Loads the AIModerationCog."""
# The API key is now fetched in cog_load, so we don't need to check here.
await bot.add_cog(AIModerationCog(bot))
print("AIModerationCog has been loaded.")