discordbot/cogs/aimod_cog.py

2320 lines
103 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# moderation_cog.py
import discord
from discord.ext import commands
from discord import app_commands
# import aiohttp # For making asynchronous HTTP requests - Replaced by Google GenAI client
import json
import os # To load environment variables
import collections # For deque
import datetime # For timestamps
import io # For BytesIO operations
import base64 # For encoding images to base64
from PIL import Image # For image processing
import cv2 # For video processing
import numpy as np # For array operations
import tempfile # For temporary file operations
import shutil # For backing up files
from typing import Optional, List, Dict, Any, Tuple # For type hinting
import asyncio
import aiofiles
# Google Generative AI Imports (using Vertex AI backend)
from google import genai
from google.genai import types
from google.api_core import exceptions as google_exceptions
# Import project configuration for Vertex AI
from gurt.config import (
PROJECT_ID,
LOCATION,
) # Assuming gurt.config exists and has these
from . import aimod_config as aimod_config_module
from .aimod_config import (
DEFAULT_VERTEX_AI_MODEL,
STANDARD_SAFETY_SETTINGS,
MOD_LOG_API_SECRET_ENV_VAR,
GUILD_CONFIG_PATH,
USER_INFRACTIONS_PATH,
INFRACTION_BACKUP_DIR,
CONFIG_LOCK,
save_user_infractions,
get_guild_config,
set_guild_config,
get_user_infraction_history,
add_user_infraction,
SERVER_RULES,
SUICIDAL_HELP_RESOURCES,
)
# Avoid loading an excessive number of messages when updating rules
MAX_RULE_MESSAGES = 25
class AIModerationCog(commands.Cog):
"""
A Discord Cog that uses Google Vertex AI to moderate messages based on server rules.
"""
def __init__(self, bot: commands.Bot):
self.bot = bot
self.genai_client = None
try:
if PROJECT_ID and LOCATION:
self.genai_client = genai.Client(
vertexai=True,
project=PROJECT_ID,
location=LOCATION,
)
print(
f"AIModerationCog: Google GenAI Client initialized for Vertex AI project '{PROJECT_ID}' in location '{LOCATION}'."
)
else:
print(
"AIModerationCog: PROJECT_ID or LOCATION not found in config. Google GenAI Client not initialized."
)
except Exception as e:
print(
f"AIModerationCog: Error initializing Google GenAI Client for Vertex AI: {e}"
)
self.last_ai_decisions = collections.deque(
maxlen=5
) # Store last 5 AI decisions
self.config_lock = CONFIG_LOCK
# Supported image file extensions
self.image_extensions = [
".jpg",
".jpeg",
".png",
".webp",
".bmp",
".heic",
".heif",
] # Added heic/heif for Vertex
# Supported animated file extensions
self.gif_extensions = [".gif"]
# Supported video file extensions (Vertex AI can process short video clips directly)
self.video_extensions = [
".mp4",
".webm",
".mov",
".avi",
".mkv",
".flv",
] # Expanded list
self.backup_task = self.bot.loop.create_task(
self.backup_infractions_periodically()
)
print("AIModerationCog Initialized.")
def is_testing_mode(self, guild_id: int) -> bool:
"""Return True if testing mode is enabled for the guild."""
return get_guild_config(guild_id, "TESTING_MODE", False)
class QuickActionView(discord.ui.View):
"""Buttons for quick moderator actions."""
def __init__(self, parent: "AIModerationCog", target: discord.Member):
super().__init__(timeout=3600)
self.parent = parent
self.target = target
self.message: discord.Message | None = None
# --- Helper Modals ---
class BanModal(discord.ui.Modal, title="Ban User"):
reason = discord.ui.TextInput(
label="Reason",
placeholder="Reason for ban",
style=discord.TextStyle.paragraph,
required=False,
max_length=512,
)
def __init__(self, view: "AIModerationCog.QuickActionView"):
super().__init__()
self.view = view
async def on_submit(self, interaction: discord.Interaction):
if not interaction.user.guild_permissions.ban_members:
await interaction.response.send_message(
"You lack permission to ban members.", ephemeral=True
)
return
if self.view.parent.is_testing_mode(interaction.guild.id):
await interaction.response.send_message(
f"[TEST MODE] Would ban {self.view.target.mention}.",
ephemeral=True,
)
return
try:
await self.view.target.ban(
reason=self.reason.value or "Escalated via mod panel"
)
await interaction.response.send_message(
f"Banned {self.view.target.mention}.", ephemeral=True
)
except Exception as e: # noqa: BLE001
await interaction.response.send_message(
f"Failed to ban: {e}", ephemeral=True
)
self.view.disable_all_items()
if self.view.message:
await self.view.message.edit(view=self.view)
class KickModal(discord.ui.Modal, title="Kick User"):
reason = discord.ui.TextInput(
label="Reason",
placeholder="Reason for kick",
style=discord.TextStyle.paragraph,
required=False,
max_length=512,
)
def __init__(self, view: "AIModerationCog.QuickActionView"):
super().__init__()
self.view = view
async def on_submit(self, interaction: discord.Interaction):
if not interaction.user.guild_permissions.kick_members:
await interaction.response.send_message(
"You lack permission to kick members.", ephemeral=True
)
return
if self.view.parent.is_testing_mode(interaction.guild.id):
await interaction.response.send_message(
f"[TEST MODE] Would kick {self.view.target.mention}.",
ephemeral=True,
)
return
try:
await self.view.target.kick(
reason=self.reason.value or "Escalated via mod panel"
)
await interaction.response.send_message(
f"Kicked {self.view.target.mention}.", ephemeral=True
)
except Exception as e: # noqa: BLE001
await interaction.response.send_message(
f"Failed to kick: {e}", ephemeral=True
)
self.view.disable_all_items()
if self.view.message:
await self.view.message.edit(view=self.view)
class TimeoutModal(discord.ui.Modal, title="Timeout User"):
duration = discord.ui.TextInput(
label="Duration",
placeholder="e.g. 10m, 1h, 1d",
required=True,
max_length=10,
)
reason = discord.ui.TextInput(
label="Reason",
placeholder="Reason for timeout",
style=discord.TextStyle.paragraph,
required=False,
max_length=512,
)
def __init__(self, view: "AIModerationCog.QuickActionView"):
super().__init__()
self.view = view
@staticmethod
def parse_duration(duration_str: str) -> datetime.timedelta | None:
if not duration_str:
return None
try:
amount = int("".join(filter(str.isdigit, duration_str)))
unit = "".join(filter(str.isalpha, duration_str)).lower()
if unit in {"d", "day", "days"}:
return datetime.timedelta(days=amount)
if unit in {"h", "hour", "hours"}:
return datetime.timedelta(hours=amount)
if unit in {"m", "min", "minute", "minutes"}:
return datetime.timedelta(minutes=amount)
if unit in {"s", "sec", "second", "seconds"}:
return datetime.timedelta(seconds=amount)
except (ValueError, TypeError):
return None
return None
async def on_submit(self, interaction: discord.Interaction):
if not interaction.user.guild_permissions.moderate_members:
await interaction.response.send_message(
"You lack permission to timeout members.", ephemeral=True
)
return
if self.view.parent.is_testing_mode(interaction.guild.id):
await interaction.response.send_message(
f"[TEST MODE] Would timeout {self.view.target.mention} for {self.duration.value}.",
ephemeral=True,
)
return
delta = self.parse_duration(self.duration.value)
if not delta or delta > datetime.timedelta(days=28):
await interaction.response.send_message(
"Invalid duration. Use formats like '10m', '1h', '1d'",
ephemeral=True,
)
return
try:
until = discord.utils.utcnow() + delta
await self.view.target.timeout(
until, reason=self.reason.value or "Escalated via mod panel"
)
await interaction.response.send_message(
f"Timed out {self.view.target.mention} for {self.duration.value}.",
ephemeral=True,
)
except Exception as e: # noqa: BLE001
await interaction.response.send_message(
f"Failed to timeout: {e}", ephemeral=True
)
self.view.disable_all_items()
if self.view.message:
await self.view.message.edit(view=self.view)
@discord.ui.button(label="Escalate Ban", style=discord.ButtonStyle.danger)
async def escalate(
self, interaction: discord.Interaction, button: discord.ui.Button
):
if not interaction.user.guild_permissions.ban_members:
await interaction.response.send_message(
"You lack permission to ban members.", ephemeral=True
)
return
self.message = interaction.message
await interaction.response.send_modal(self.BanModal(self))
@discord.ui.button(label="Kick", style=discord.ButtonStyle.primary)
async def kick(
self, interaction: discord.Interaction, button: discord.ui.Button
):
if not interaction.user.guild_permissions.kick_members:
await interaction.response.send_message(
"You lack permission to kick members.", ephemeral=True
)
return
self.message = interaction.message
await interaction.response.send_modal(self.KickModal(self))
@discord.ui.button(label="Timeout", style=discord.ButtonStyle.secondary)
async def timeout_action(
self, interaction: discord.Interaction, button: discord.ui.Button
):
if not interaction.user.guild_permissions.moderate_members:
await interaction.response.send_message(
"You lack permission to timeout members.", ephemeral=True
)
return
self.message = interaction.message
await interaction.response.send_modal(self.TimeoutModal(self))
@discord.ui.button(label="Ignore", style=discord.ButtonStyle.secondary)
async def ignore(
self, interaction: discord.Interaction, button: discord.ui.Button
):
if interaction.user.guild_permissions.manage_messages:
await interaction.message.delete()
await interaction.response.send_message(
"Notification dismissed.", ephemeral=True
)
else:
await interaction.response.send_message(
"No permission to manage messages.", ephemeral=True
)
async def cog_load(self):
"""Called when the cog is loaded."""
print("AIModerationCog cog_load started.")
if not self.genai_client:
print("\n" + "=" * 60)
print(
"=== WARNING: AIModerationCog - Vertex AI Client not initialized! ==="
)
print("=== The Moderation Cog requires a valid Vertex AI setup. ===")
print(
f"=== Check PROJECT_ID and LOCATION in gurt.config and GCP authentication. ==="
)
print("=" * 60 + "\n")
else:
print("AIModerationCog: Vertex AI Client seems to be initialized.")
print("AIModerationCog cog_load finished.")
# _load_openrouter_models is no longer needed.
async def cog_unload(self):
"""Clean up when the cog is unloaded."""
# The genai.Client doesn't have an explicit close method in the same way aiohttp.ClientSession does.
# It typically manages its own resources.
print("AIModerationCog Unloaded.")
if self.backup_task:
self.backup_task.cancel()
async def backup_infractions_periodically(self):
"""Periodically back up the infractions file."""
await self.bot.wait_until_ready()
while not self.bot.is_closed():
timestamp = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S")
backup_path = os.path.join(
INFRACTION_BACKUP_DIR, f"user_infractions_{timestamp}.json"
)
try:
shutil.copy(USER_INFRACTIONS_PATH, backup_path)
except Exception as e: # noqa: BLE001
print(f"Failed to back up infractions: {e}")
await asyncio.sleep(24 * 60 * 60)
async def process_image(self, attachment: discord.Attachment) -> tuple[str, bytes]:
"""
Process an image attachment and return its base64 encoding.
Args:
attachment: The Discord attachment containing the image
Returns:
Tuple of (mime_type, image_bytes)
"""
try:
# Download the image
image_bytes = await attachment.read()
mime_type = (
attachment.content_type or "image/jpeg"
) # Default to jpeg if not specified
# Return the image bytes and mime type
return mime_type, image_bytes
except Exception as e:
print(f"Error processing image: {e}")
return None, None
async def process_gif(self, attachment: discord.Attachment) -> tuple[str, bytes]:
"""Return the raw bytes for a GIF attachment."""
try:
gif_bytes = await attachment.read()
mime_type = attachment.content_type or "image/gif"
return mime_type, gif_bytes
except Exception as e:
print(f"Error processing GIF: {e}")
return None, None
async def process_attachment(
self, attachment: discord.Attachment
) -> tuple[str, bytes, str]:
"""
Process any attachment and return the appropriate image data.
Args:
attachment: The Discord attachment
Returns:
Tuple of (mime_type, image_bytes, attachment_type)
attachment_type is one of: 'image', 'gif', 'video', or None if unsupported
"""
if not attachment:
return None, None, None
# Get the file extension
filename = attachment.filename.lower()
_, ext = os.path.splitext(filename)
# Process based on file type
if ext in self.image_extensions:
mime_type, image_bytes = await self.process_image(attachment)
return mime_type, image_bytes, "image"
elif ext in self.gif_extensions:
mime_type, image_bytes = await self.process_gif(attachment)
return mime_type, image_bytes, "gif"
elif ext in self.video_extensions:
mime_type, image_bytes = await self.process_video(attachment)
return mime_type, image_bytes, "video"
else:
print(f"Unsupported file type: {ext}")
return None, None, None
async def process_video(self, attachment: discord.Attachment) -> tuple[str, bytes]:
"""Return the raw bytes for a video attachment."""
try:
video_bytes = await attachment.read()
mime_type = attachment.content_type or "video/mp4"
return mime_type, video_bytes
except Exception as e:
print(f"Error processing video: {e}")
return None, None
# --- AI Moderation Command Group ---
aimod_group = app_commands.Group(
name="aimod", description="AI Moderation commands."
)
config_subgroup = app_commands.Group(
name="config",
description="Configure AI moderation settings.",
parent=aimod_group,
)
infractions_subgroup = app_commands.Group(
name="infractions", description="Manage user infractions.", parent=aimod_group
)
model_subgroup = app_commands.Group(
name="model",
description="Manage the AI model for moderation.",
parent=aimod_group,
)
debug_subgroup = app_commands.Group(
name="debug",
description="Debugging commands for AI moderation.",
parent=aimod_group,
)
@aimod_group.command(
name="sync",
description="Reload AI moderation configuration and infractions from disk.",
)
@app_commands.checks.has_permissions(administrator=True)
async def aimod_sync(self, interaction: discord.Interaction):
"""Reload configuration files from disk."""
try:
async with aiofiles.open(GUILD_CONFIG_PATH, "r", encoding="utf-8") as f:
data = await f.read()
async with CONFIG_LOCK:
global GUILD_CONFIG
GUILD_CONFIG = json.loads(data)
async with aiofiles.open(
USER_INFRACTIONS_PATH, "r", encoding="utf-8"
) as f2:
data2 = await f2.read()
async with CONFIG_LOCK:
global USER_INFRACTIONS
USER_INFRACTIONS = json.loads(data2)
await interaction.response.send_message(
"Configuration synced from disk.", ephemeral=True
)
except Exception as e: # noqa: BLE001
await interaction.response.send_message(
f"Failed to reload configuration: {e}", ephemeral=True
)
@config_subgroup.command(
name="log_channel", description="Set the moderation log channel."
)
@app_commands.describe(channel="The text channel to use for moderation logs.")
@app_commands.checks.has_permissions(administrator=True)
async def modset_log_channel(
self, interaction: discord.Interaction, channel: discord.TextChannel
):
await set_guild_config(interaction.guild.id, "MOD_LOG_CHANNEL_ID", channel.id)
await interaction.response.send_message(
f"Moderation log channel set to {channel.mention}.", ephemeral=False
)
@config_subgroup.command(
name="suggestions_channel", description="Set the suggestions channel."
)
@app_commands.describe(channel="The text channel to use for suggestions.")
@app_commands.checks.has_permissions(administrator=True)
async def modset_suggestions_channel(
self, interaction: discord.Interaction, channel: discord.TextChannel
):
await set_guild_config(
interaction.guild.id, "SUGGESTIONS_CHANNEL_ID", channel.id
)
await interaction.response.send_message(
f"Suggestions channel set to {channel.mention}.", ephemeral=False
)
@config_subgroup.command(
name="moderator_role", description="Set the moderator role."
)
@app_commands.describe(role="The role that identifies moderators.")
@app_commands.checks.has_permissions(administrator=True)
async def modset_moderator_role(
self, interaction: discord.Interaction, role: discord.Role
):
await set_guild_config(interaction.guild.id, "MODERATOR_ROLE_ID", role.id)
await interaction.response.send_message(
f"Moderator role set to {role.mention}.", ephemeral=False
)
@config_subgroup.command(
name="suicidal_ping_role",
description="Set the role to ping for suicidal content.",
)
@app_commands.describe(role="The role to ping for urgent suicidal content alerts.")
@app_commands.checks.has_permissions(administrator=True)
async def modset_suicidal_ping_role(
self, interaction: discord.Interaction, role: discord.Role
):
await set_guild_config(interaction.guild.id, "SUICIDAL_PING_ROLE_ID", role.id)
await interaction.response.send_message(
f"Suicidal content ping role set to {role.mention}.", ephemeral=False
)
@config_subgroup.command(
name="add_nsfw_channel",
description="Add a channel to the list of NSFW channels.",
)
@app_commands.describe(channel="The text channel to mark as NSFW for the bot.")
@app_commands.checks.has_permissions(administrator=True)
async def modset_add_nsfw_channel(
self, interaction: discord.Interaction, channel: discord.TextChannel
):
guild_id = interaction.guild.id
nsfw_channels: list[int] = get_guild_config(guild_id, "NSFW_CHANNEL_IDS", [])
if channel.id not in nsfw_channels:
nsfw_channels.append(channel.id)
await set_guild_config(guild_id, "NSFW_CHANNEL_IDS", nsfw_channels)
await interaction.response.send_message(
f"{channel.mention} added to NSFW channels list.", ephemeral=False
)
else:
await interaction.response.send_message(
f"{channel.mention} is already in the NSFW channels list.",
ephemeral=True,
)
@config_subgroup.command(
name="remove_nsfw_channel",
description="Remove a channel from the list of NSFW channels.",
)
@app_commands.describe(channel="The text channel to remove from the NSFW list.")
@app_commands.checks.has_permissions(administrator=True)
async def modset_remove_nsfw_channel(
self, interaction: discord.Interaction, channel: discord.TextChannel
):
guild_id = interaction.guild.id
nsfw_channels: list[int] = get_guild_config(guild_id, "NSFW_CHANNEL_IDS", [])
if channel.id in nsfw_channels:
nsfw_channels.remove(channel.id)
await set_guild_config(guild_id, "NSFW_CHANNEL_IDS", nsfw_channels)
await interaction.response.send_message(
f"{channel.mention} removed from NSFW channels list.", ephemeral=False
)
else:
await interaction.response.send_message(
f"{channel.mention} is not in the NSFW channels list.", ephemeral=True
)
@config_subgroup.command(
name="list_nsfw_channels",
description="List currently configured NSFW channels.",
)
@app_commands.checks.has_permissions(administrator=True)
async def modset_list_nsfw_channels(self, interaction: discord.Interaction):
guild_id = interaction.guild.id
nsfw_channel_ids: list[int] = get_guild_config(guild_id, "NSFW_CHANNEL_IDS", [])
if not nsfw_channel_ids:
await interaction.response.send_message(
"No NSFW channels are currently configured.", ephemeral=False
)
return
channel_mentions = []
for channel_id in nsfw_channel_ids:
channel_obj = interaction.guild.get_channel(channel_id)
if channel_obj:
channel_mentions.append(channel_obj.mention)
else:
channel_mentions.append(f"ID:{channel_id} (not found)")
await interaction.response.send_message(
f"Configured NSFW channels:\n- " + "\n- ".join(channel_mentions),
ephemeral=False,
)
# Note: The @app_commands.command(name="modenable", ...) and other commands like
# viewinfractions, clearinfractions, modsetmodel, modgetmodel remain as top-level commands
# as they were not part of the original "modset" generic command structure.
# If these also need to be grouped, that would be a separate consideration.
@config_subgroup.command(
name="enable",
description="Enable or disable moderation for this guild (admin only).",
)
@app_commands.describe(enabled="Enable moderation (true/false)")
async def modenable(self, interaction: discord.Interaction, enabled: bool):
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(
"You must be an administrator to use this command.", ephemeral=False
)
return
await set_guild_config(interaction.guild.id, "ENABLED", enabled)
await interaction.response.send_message(
f"Moderation is now {'enabled' if enabled else 'disabled'} for this guild.",
ephemeral=False,
)
@config_subgroup.command(
name="event_mode",
description="Toggle temporary event mode for this guild.",
)
@app_commands.describe(enabled="Enable event mode (true/false)")
async def event_mode(self, interaction: discord.Interaction, enabled: bool):
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(
"You must be an administrator to use this command.", ephemeral=False
)
return
await set_guild_config(interaction.guild.id, "EVENT_MODE", enabled)
await interaction.response.send_message(
f"Event mode is now {'enabled' if enabled else 'disabled'}.",
ephemeral=False,
)
@config_subgroup.command(
name="testing_mode",
description="Enable or disable testing mode (no actions are taken).",
)
@app_commands.describe(enabled="Enable testing mode (true/false)")
async def testing_mode(self, interaction: discord.Interaction, enabled: bool):
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(
"You must be an administrator to use this command.", ephemeral=False
)
return
await set_guild_config(interaction.guild.id, "TESTING_MODE", enabled)
await interaction.response.send_message(
f"Testing mode is now {'enabled' if enabled else 'disabled'}.",
ephemeral=False,
)
@config_subgroup.command(
name="update_rules",
description="Update server rules from the specified channel.",
)
@app_commands.describe(channel="The channel containing the server rules.")
@app_commands.checks.has_permissions(administrator=True)
async def update_rules(
self, interaction: discord.Interaction, channel: discord.TextChannel
) -> None:
"""Pull the server rules from a channel and update the global config."""
messages = []
async for msg in channel.history(
limit=MAX_RULE_MESSAGES + 1, oldest_first=True
):
if msg.content:
messages.append(msg.content)
if len(messages) > MAX_RULE_MESSAGES:
await interaction.response.send_message(
f"Channel has more than {MAX_RULE_MESSAGES} messages."
" Please consolidate your rules into fewer messages.",
ephemeral=True,
)
return
if not messages:
await interaction.response.send_message(
"No messages found in that channel.", ephemeral=True
)
return
rules_text = "\n\n".join(messages).strip()
aimod_config_module.SERVER_RULES = rules_text
await interaction.response.send_message(
f"Server rules updated from {channel.mention}.", ephemeral=False
)
@infractions_subgroup.command(
name="view",
description="View a user's AI moderation infraction history (mod/admin only).",
)
@app_commands.describe(user="The user to view infractions for")
async def viewinfractions(
self, interaction: discord.Interaction, user: discord.Member
):
# Check if user has permission (admin or moderator role)
moderator_role_id = get_guild_config(interaction.guild.id, "MODERATOR_ROLE_ID")
moderator_role = (
interaction.guild.get_role(moderator_role_id) if moderator_role_id else None
)
has_permission = interaction.user.guild_permissions.administrator or (
moderator_role and moderator_role in interaction.user.roles
)
if not has_permission:
await interaction.response.send_message(
"You must be an administrator or have the moderator role to use this command.",
ephemeral=True,
)
return
# Get the user's infraction history
infractions = get_user_infraction_history(interaction.guild.id, user.id)
if not infractions:
await interaction.response.send_message(
f"{user.mention} has no recorded infractions.", ephemeral=False
)
return
# Create an embed to display the infractions
embed = discord.Embed(
title=f"Infraction History for {user.display_name}",
description=f"User ID: {user.id}",
color=discord.Color.orange(),
)
# Add each infraction to the embed
for i, infraction in enumerate(infractions, 1):
timestamp = infraction.get("timestamp", "Unknown date")[:19].replace(
"T", " "
) # Format ISO timestamp
rule = infraction.get("rule_violated", "Unknown rule")
action = infraction.get("action_taken", "Unknown action")
reason = infraction.get("reasoning", "No reason provided")
# Truncate reason if it's too long
if len(reason) > 200:
reason = reason[:197] + "..."
embed.add_field(
name=f"Infraction #{i} - {timestamp}",
value=f"**Rule Violated:** {rule}\n**Action Taken:** {action}\n**Reason:** {reason}",
inline=False,
)
embed.set_footer(text=f"Total infractions: {len(infractions)}")
embed.timestamp = discord.utils.utcnow()
await interaction.response.send_message(embed=embed, ephemeral=False)
@infractions_subgroup.command(
name="clear",
description="Clear a user's AI moderation infraction history (admin only).",
)
@app_commands.describe(user="The user to clear infractions for")
async def clearinfractions(
self, interaction: discord.Interaction, user: discord.Member
):
# Check if user has administrator permission
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(
"You must be an administrator to use this command.", ephemeral=True
)
return
# Get the user's infraction history
key = f"{interaction.guild.id}_{user.id}"
infractions = USER_INFRACTIONS.get(key, [])
if not infractions:
await interaction.response.send_message(
f"{user.mention} has no recorded infractions to clear.", ephemeral=False
)
return
# Clear the user's infractions
USER_INFRACTIONS[key] = []
await save_user_infractions()
await interaction.response.send_message(
f"Cleared {len(infractions)} infraction(s) for {user.mention}.",
ephemeral=False,
)
@infractions_subgroup.command(
name="leaderboard",
description="Show users with the fewest infractions.",
)
async def leaderboard(self, interaction: discord.Interaction):
guild_id = interaction.guild.id
counts = {}
for key, infractions in USER_INFRACTIONS.items():
if key.startswith(f"{guild_id}_"):
uid = int(key.split("_", 1)[1])
counts[uid] = len(infractions)
if not counts:
await interaction.response.send_message(
"No infractions recorded for this guild.", ephemeral=True
)
return
sorted_users = sorted(counts.items(), key=lambda x: x[1])[:5]
lines = []
for uid, count in sorted_users:
member = interaction.guild.get_member(uid)
name = member.display_name if member else f"ID:{uid}"
lines.append(f"**{name}** - {count} infractions")
embed = discord.Embed(
title="Best Behavior Leaderboard",
description="\n".join(lines),
color=discord.Color.green(),
)
await interaction.response.send_message(embed=embed, ephemeral=False)
@infractions_subgroup.command(
name="restore",
description="Restore infractions from the latest backup (admin only).",
)
@app_commands.checks.has_permissions(administrator=True)
async def restore_infractions(self, interaction: discord.Interaction):
backups = sorted(os.listdir(INFRACTION_BACKUP_DIR))
if not backups:
await interaction.response.send_message("No backups found.", ephemeral=True)
return
latest = os.path.join(INFRACTION_BACKUP_DIR, backups[-1])
try:
shutil.copy(latest, USER_INFRACTIONS_PATH)
async with aiofiles.open(USER_INFRACTIONS_PATH, "r", encoding="utf-8") as f:
data = await f.read()
async with CONFIG_LOCK:
global USER_INFRACTIONS
USER_INFRACTIONS = json.loads(data)
await interaction.response.send_message(
f"Infractions restored from {backups[-1]}", ephemeral=False
)
except Exception as e: # noqa: BLE001
await interaction.response.send_message(
f"Failed to restore infractions: {e}", ephemeral=True
)
@model_subgroup.command(
name="set", description="Change the AI model used for moderation (admin only)."
)
@app_commands.describe(
model="The Vertex AI model to use (e.g., 'gemini-1.5-flash-001', 'gemini-1.0-pro')"
)
async def modsetmodel(self, interaction: discord.Interaction, model: str):
# Check if user has administrator permission
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(
"You must be an administrator to use this command.", ephemeral=True
)
return
# Validate the model name (basic validation for Vertex AI)
# Vertex AI models usually don't have "/" like OpenRouter, but can have "-" and numbers.
# Example: gemini-1.5-flash-001
if not model or len(model) < 5: # Basic check
await interaction.response.send_message(
"Invalid model format. Please provide a valid Vertex AI model ID (e.g., 'gemini-1.5-flash-001').",
ephemeral=False,
)
return
# Save the model to guild configuration
guild_id = interaction.guild.id
await set_guild_config(guild_id, "AI_MODEL", model)
# Note: There's no global model variable to update here like OPENROUTER_MODEL.
# The cog will use the guild-specific config or the DEFAULT_VERTEX_AI_MODEL.
await interaction.response.send_message(
f"AI moderation model updated to `{model}` for this guild.", ephemeral=False
)
# @modsetmodel.autocomplete('model') # Autocomplete removed as OpenRouter models are not used.
# async def modsetmodel_autocomplete(...): # This function is now removed.
@model_subgroup.command(
name="get", description="View the current AI model used for moderation."
)
async def modgetmodel(self, interaction: discord.Interaction):
# Get the model from guild config, fall back to global default
guild_id = interaction.guild.id
model_used = get_guild_config(guild_id, "AI_MODEL", DEFAULT_VERTEX_AI_MODEL)
# Create an embed to display the model information
embed = discord.Embed(
title="AI Moderation Model",
description=f"The current AI model used for moderation in this server is:",
color=discord.Color.blue(),
)
embed.add_field(name="Model In Use", value=f"`{model_used}`", inline=False)
embed.add_field(
name="Default Model", value=f"`{DEFAULT_VERTEX_AI_MODEL}`", inline=False
)
embed.set_footer(text="Use /aimod model set to change the model")
await interaction.response.send_message(embed=embed, ephemeral=False)
# --- Helper Function to Safely Extract Text from Vertex AI Response ---
def _get_response_text(
self, response: Optional[types.GenerateContentResponse]
) -> Optional[str]:
"""
Safely extracts the text content from the first text part of a GenerateContentResponse.
Handles potential errors and lack of text parts gracefully.
(Adapted from teto_cog.py)
"""
if not response:
print("[AIModerationCog._get_response_text] Received None response object.")
return None
if (
hasattr(response, "text") and response.text
): # Some simpler responses might have .text directly
print(
"[AIModerationCog._get_response_text] Found text directly in response.text attribute."
)
return response.text
if not response.candidates:
print(
f"[AIModerationCog._get_response_text] Response object has no candidates. Response: {response}"
)
return None
try:
candidate = response.candidates[0]
if not hasattr(candidate, "content") or not candidate.content:
print(
f"[AIModerationCog._get_response_text] Candidate 0 has no 'content'. Candidate: {candidate}"
)
return None
if not hasattr(candidate.content, "parts") or not candidate.content.parts:
print(
f"[AIModerationCog._get_response_text] Candidate 0 content has no 'parts' or parts list is empty. types.Content: {candidate.content}"
)
return None
for i, part in enumerate(candidate.content.parts):
if hasattr(part, "text") and part.text is not None:
if isinstance(part.text, str) and part.text.strip():
print(
f"[AIModerationCog._get_response_text] Found non-empty text in part {i}."
)
return part.text
else:
print(
f"[AIModerationCog._get_response_text] types.Part {i} has 'text' attribute, but it's empty or not a string: {part.text!r}"
)
print(
f"[AIModerationCog._get_response_text] No usable text part found in candidate 0 after iterating through all parts."
)
return None
except (AttributeError, IndexError, TypeError) as e:
print(
f"[AIModerationCog._get_response_text] Error accessing response structure: {type(e).__name__}: {e}"
)
print(f"Problematic response object: {response}")
return None
except Exception as e:
print(
f"[AIModerationCog._get_response_text] Unexpected error extracting text: {e}"
)
print(f"Response object during error: {response}")
return None
async def query_vertex_ai(
self,
message: discord.Message,
message_content: str,
user_history: str,
image_data_list: Optional[List[Tuple[str, bytes, str, str]]] = None,
):
"""
Sends the message content, user history, and additional context to Google Vertex AI for analysis.
Optionally includes image data for visual content moderation.
Args:
message: The original discord.Message object.
message_content: The text content of the message.
user_history: A string summarizing the user's past infractions.
image_data_list: Optional list of tuples (mime_type, image_bytes, attachment_type, filename) for image moderation.
Returns:
A dictionary containing the AI's decision, or None if an error occurs.
"""
print(
f"query_vertex_ai called. Vertex AI client available: {self.genai_client is not None}"
)
if not self.genai_client:
print("Error: Vertex AI Client is not available. Cannot query API.")
return None
# Construct the prompt for the AI model (system prompt is largely the same)
system_prompt_text = f"""You are an AI moderation assistant for a Discord server.
Your primary function is to analyze message content and attached media based STRICTLY on the server rules provided below, using all available context.
Server Rules:
---
{SERVER_RULES}
---
Context Provided:
You will receive the following information to aid your analysis:
- User's Server Role: (e.g., "Server Owner", "Admin", "Moderator", "Member").
- Channel Category: The name of the category the channel belongs to.
- Channel Age-Restricted/NSFW (Discord Setting): Boolean (true/false).
- Replied-to Message: If the current message is a reply, the content of the original message will be provided. This is crucial for understanding direct interactions.
- Recent Channel History: The last few messages in the channel to understand the flow of conversation.
- Attached Media: If the message contains image, GIF, or video attachments, they will be provided directly in the content array for analysis.
Instructions:
1. Review the "Message Content" and any attached media against EACH rule, considering ALL provided context (User Role, Channel Info, Replied-to Message, Recent Channel History).
- The "Channel Age-Restricted/NSFW (Discord Setting)" is the definitive indicator for NSFW content by Discord.
- The "Channel Category" provides general context.
- **"Replied-to Message" and "Recent Channel History" are vital for understanding banter, jokes, and ongoing discussions. A statement that seems offensive in isolation might be acceptable within the flow of conversation or as a direct reply.**
- If images, GIFs, or videos are attached, analyze ALL of them for rule violations.
- Pay special attention to images that may contain NSFW content, pornography, gore, or other prohibited visual content.
- If multiple attachments are present, a violation in ANY of them should be flagged.
2. Determine if ANY rule is violated. When evaluating, consider the server's culture where **extremely edgy, dark, and sexual humor, including potentially offensive jokes (e.g., rape jokes, saying you want to be raped), are common and generally permissible IF THEY ARE CLEARLY JOKES, part of an established banter, or a direct non-malicious reply, and not targeted harassment or explicit rule violations.**
* **NSFW Content:**
The only rule regarding NSFW content is that **real-life pornography is strictly prohibited**.
Full-on pornographic images are permitted in designated NSFW channels.
Stickers and emojis are NOT considered "full-on pornographic images" and are allowed in any channel.
- Do NOT attempt to moderate AI-generated pornography. You are unlikely to know what it looks like.
- For general disrespectful behavior, harassment, or bullying (Rule 2 & 3): Only flag a violation if the intent appears **genuinely malicious, targeted, or serious, even after considering conversational history and replies.** Lighthearted insults or "wild" statements within an ongoing banter are generally permissible.
- For **explicit slurs or severe discriminatory language** (Rule 3): These are violations **regardless of joking intent if they are used in a targeted or hateful manner**. Context from replies and history is still important to assess targeting.
After considering the above, pay EXTREME attention to rules 5 (Pedophilia) and 5A (IRL Porn) these are always severe. Rule 4 (AI Porn) is also critical. Prioritize these severe violations.
3. Respond ONLY with a single JSON object containing the following keys:
- "reasoning": string (A concise explanation for your decision, referencing the specific rule and content).
- "violation": boolean (true if any rule is violated, false otherwise)
- "rule_violated": string (The number of the rule violated, e.g., "1", "5A", "None". If multiple rules are violated, state the MOST SEVERE one, prioritizing 5A > 5 > 4 > 3 > 2 > 1).
- "action": string (Suggest ONE action from: "IGNORE", "WARN", "DELETE", "TIMEOUT_SHORT", "TIMEOUT_MEDIUM", "TIMEOUT_LONG", "KICK", "BAN", "NOTIFY_MODS", "SUICIDAL".
- "notify_mods_message": optional string (If the suggested action is "NOTIFY_MODS", provide an optional brief message here for the moderators, e.g., "User's message is slightly ambiguous, human review needed.").
Consider the user's infraction history. If the user has prior infractions for similar or escalating behavior, suggest a more severe action than if it were a first-time offense for a minor rule.
Progressive Discipline Guide (unless overridden by severity):
- First minor offense: "WARN" (and "DELETE" if content is removable like Rule 1/4).
- Second minor offense / First moderate offense: "TIMEOUT_SHORT" (e.g., 10 minutes).
- Repeated moderate offenses: "TIMEOUT_MEDIUM" (e.g., 1 hour).
- Multiple/severe offenses: "TIMEOUT_LONG" (e.g., 1 day), "KICK", or "BAN".
Spamming:
- If a user continuously sends very long messages that are off-topic, repetitive, or appear to be meaningless spam (e.g., character floods, nonsensical text), suggest "TIMEOUT_MEDIUM" or "TIMEOUT_LONG" depending on severity and history, even if the content itself doesn't violate other specific rules. This is to maintain chat readability.
Rule Severity Guidelines (use your judgment):
- Consider the severity of each rule violation on its own merits.
- Consider the user's history of past infractions when determining appropriate action.
- Consider the context of the message and channel when evaluating violations.
- You have full discretion to determine the most appropriate action for any violation.
Suicidal Content:
If the message content expresses **clear, direct, and serious suicidal ideation, intent, planning, or recent attempts** (e.g., 'I am going to end my life and have a plan', 'I survived my attempt last night', 'I wish I hadn't woken up after trying'), ALWAYS use "SUICIDAL" as the action, and set "violation" to true, with "rule_violated" as "Suicidal Content".
For casual, edgy, hyperbolic, or ambiguous statements like 'imma kms', 'just kill me now', 'I want to die (lol)', or phrases that are clearly part of edgy humor/banter rather than a genuine cry for help, you should lean towards "IGNORE" or "NOTIFY_MODS" if there's slight ambiguity but no clear serious intent. **Do NOT flag 'imma kms' as "SUICIDAL" unless there is very strong supporting context indicating genuine, immediate, and serious intent.**
If unsure but suspicious, or if the situation is complex: "NOTIFY_MODS".
Default action for minor first-time rule violations should be "WARN" or "DELETE" (if applicable).
Do not suggest "KICK" or "BAN" lightly; reserve for severe or repeated major offenses.
Timeout durations: TIMEOUT_SHORT (approx 10 mins), TIMEOUT_MEDIUM (approx 1 hour), TIMEOUT_LONG (approx 1 day to 1 week).
The system will handle the exact timeout duration; you just suggest the category.)
Example Response (Text Violation):
{{
"reasoning": "The message content clearly depicts IRL non-consensual sexual content involving minors, violating rule 5A.",
"violation": true,
"rule_violated": "5A",
"action": "BAN"
}}
Example Response (Image Violation):
{{
"reasoning": "Attachment #2 contains explicit pornographic imagery in a non-NSFW channel, violating rule 1.",
"violation": true,
"rule_violated": "1",
"action": "DELETE"
}}
Example Response (Multiple Attachments Violation):
{{
"reasoning": "While the text content is fine, attachment #3 contains IRL pornography, violating rule 4.",
"violation": true,
"rule_violated": "4",
"action": "WARN"
}}
Example Response (No Violation):
{{
"reasoning": "The message and all attached images are respectful and contain no prohibited content.",
"violation": false,
"rule_violated": "None",
"action": "IGNORE"
}}
Example Response (Suicidal Content):
{{
"reasoning": "The user's message 'I want to end my life' indicates clear suicidal intent.",
"violation": true,
"rule_violated": "Suicidal Content",
"action": "SUICIDAL"
}}
Example Response (Notify Mods):
{{
"reasoning": "The message contains potentially sensitive content that requires human review.",
"violation": true,
"rule_violated": "Review Required",
"action": "NOTIFY_MODS",
"notify_mods_message": "Content is borderline, please review."
}}
"""
member = message.author # This is a discord.Member object
server_role_str = "Unprivileged Member" # Default
if member == await message.guild.fetch_member(message.guild.owner_id):
server_role_str = "Server Owner"
elif member.guild_permissions.administrator:
server_role_str = "Admin"
else:
perms = member.guild_permissions
if (
perms.manage_messages
or perms.kick_members
or perms.ban_members
or perms.moderate_members
):
server_role_str = "Moderator"
print(f"role: {server_role_str}")
# --- Fetch Replied-to Message ---
replied_to_message_content = "N/A (Not a reply)"
if message.reference and message.reference.message_id:
try:
replied_to_msg = await message.channel.fetch_message(
message.reference.message_id
)
replied_to_message_content = f"User '{replied_to_msg.author.name}' said: \"{replied_to_msg.content[:200]}\""
if len(replied_to_msg.content) > 200:
replied_to_message_content += "..."
except discord.NotFound:
replied_to_message_content = "N/A (Replied-to message not found)"
except discord.Forbidden:
replied_to_message_content = (
"N/A (Cannot fetch replied-to message - permissions)"
)
except Exception as e:
replied_to_message_content = (
f"N/A (Error fetching replied-to message: {e})"
)
# --- Fetch Recent Channel History ---
recent_channel_history_str = "N/A (Could not fetch history)"
try:
history_messages = []
# Fetch last 11 messages (current + 10 previous). We'll filter out the current one
async for prev_msg in message.channel.history(limit=11, before=message):
if (
prev_msg.id != message.id
): # Ensure we don't include the current message itself
author_name = (
prev_msg.author.name + " (BOT)"
if prev_msg.author.bot
else prev_msg.author.name
)
history_messages.append(
f"- {author_name}: \"{prev_msg.content[:150]}{'...' if len(prev_msg.content) > 150 else ''}\" (ID: {prev_msg.id})"
)
if history_messages:
# Reverse to show oldest first in the snippet, then take the last 10.
recent_channel_history_str = "\n".join(
list(reversed(history_messages))[:10]
)
else:
recent_channel_history_str = (
"No recent messages before this one in the channel."
)
except discord.Forbidden:
recent_channel_history_str = (
"N/A (Cannot fetch channel history - permissions)"
)
except Exception as e:
recent_channel_history_str = f"N/A (Error fetching channel history: {e})"
# Prepare user prompt content list with proper OpenRouter format
user_prompt_content_list = []
# Add the text context first
user_context_text = f"""User Infraction History (for {message.author.name}, ID: {message.author.id}):
---
{user_history if user_history else "No prior infractions recorded for this user in this guild."}
---
Current Message Context:
- Author: {message.author.name} (ID: {message.author.id})
- Server Role: {server_role_str}
- Channel: #{message.channel.name} (ID: {message.channel.id})
- Channel Category: {message.channel.category.name if message.channel.category else "No Category"}
- Channel Age-Restricted/NSFW (Discord Setting): {message.channel.is_nsfw()}
---
Replied-to Message:
{replied_to_message_content}
---
Recent Channel History (last up to 10 messages before this one):
{recent_channel_history_str}
---
Message Content to Analyze:
"{message_content}"
Now, analyze the message content and any attached media based on the server rules and ALL the context provided above.
Follow the JSON output format specified in the system prompt.
CRITICAL: Do NOT output anything other than the required JSON response.
"""
# Add the text content first
user_prompt_content_list.append({"type": "text", "text": user_context_text})
# Add images in the proper OpenRouter format
if image_data_list and len(image_data_list) > 0:
try:
for i, (mime_type, image_bytes, attachment_type, filename) in enumerate(
image_data_list
):
try:
# Encode image to base64
base64_image = base64.b64encode(image_bytes).decode("utf-8")
# Create data URL
image_data_url = f"data:{mime_type};base64,{base64_image}"
# Add image in OpenRouter format
user_prompt_content_list.append(
{"type": "image_url", "image_url": {"url": image_data_url}}
)
print(
f"Added attachment #{i+1}: {filename} ({attachment_type}) to the prompt"
)
except Exception as e:
print(
f"Error encoding image data for attachment {filename}: {e}"
)
except Exception as e:
print(f"Error processing image data: {e}")
# Add a text note about the error
user_prompt_content_list.append(
{
"type": "text",
"text": f"Note: There were {len(image_data_list)} attached images, but they could not be processed for analysis.",
}
)
# Get guild-specific model if configured, otherwise use default
member = message.author
server_role_str = "Unprivileged Member"
if member == await message.guild.fetch_member(message.guild.owner_id):
server_role_str = "Server Owner"
elif member.guild_permissions.administrator:
server_role_str = "Admin"
else:
perms = member.guild_permissions
if (
perms.manage_messages
or perms.kick_members
or perms.ban_members
or perms.moderate_members
):
server_role_str = "Moderator"
replied_to_message_content = "N/A (Not a reply)"
if message.reference and message.reference.message_id:
try:
replied_to_msg = await message.channel.fetch_message(
message.reference.message_id
)
replied_to_message_content = f"User '{replied_to_msg.author.name}' said: \"{replied_to_msg.content[:200]}{'...' if len(replied_to_msg.content) > 200 else ''}\""
except Exception as e:
replied_to_message_content = f"N/A (Error fetching replied-to: {e})"
recent_channel_history_str = "N/A (Could not fetch history)"
try:
history_messages = [
f"- {prev_msg.author.name}{' (BOT)' if prev_msg.author.bot else ''}: \"{prev_msg.content[:150]}{'...' if len(prev_msg.content) > 150 else ''}\" (ID: {prev_msg.id})"
async for prev_msg in message.channel.history(limit=11, before=message)
if prev_msg.id != message.id
]
recent_channel_history_str = (
"\n".join(list(reversed(history_messages))[:10])
if history_messages
else "No recent messages."
)
except Exception as e:
recent_channel_history_str = f"N/A (Error fetching history: {e})"
user_context_text = f"""User Infraction History (for {message.author.name}, ID: {message.author.id}):
---
{user_history if user_history else "No prior infractions recorded for this user in this guild."}
---
Current Message Context:
- Author: {message.author.name} (ID: {message.author.id})
- Server Role: {server_role_str}
- Channel: #{message.channel.name} (ID: {message.channel.id})
- Channel Category: {message.channel.category.name if message.channel.category else "No Category"}
- Channel Age-Restricted/NSFW (Discord Setting): {message.channel.is_nsfw()}
---
Replied-to Message:
{replied_to_message_content}
---
Recent Channel History (last up to 10 messages before this one):
{recent_channel_history_str}
---
Message Content to Analyze:
"{message_content}"
Now, analyze the message content and any attached media based on the server rules and ALL the context provided above.
Follow the JSON output format specified in the system prompt.
CRITICAL: Do NOT output anything other than the required JSON response.
"""
# Prepare parts for Vertex AI
vertex_parts: List[Any] = [types.Part(text=user_context_text)]
if image_data_list:
for mime_type, image_bytes, attachment_type, filename in image_data_list:
try:
# Vertex AI directly supports common image and video MIME types.
# Ensure mime_type is one of the supported ones by Vertex, e.g., image/png or video/mp4.
supported_image_mimes = [
"image/png",
"image/jpeg",
"image/webp",
"image/heic",
"image/heif",
"image/gif",
]
supported_video_mimes = [
"video/mp4",
"video/webm",
"video/quicktime",
"video/x-msvideo",
"video/x-matroska",
"video/x-flv",
]
clean_mime_type = mime_type.split(";")[0].lower()
if (
clean_mime_type in supported_image_mimes
or clean_mime_type in supported_video_mimes
):
vertex_parts.append(
types.Part(
inline_data=types.Blob(
data=image_bytes,
mime_type=clean_mime_type,
)
)
)
print(
f"Added attachment {filename} ({attachment_type}) with MIME {clean_mime_type} to Vertex prompt"
)
else:
print(
f"Skipping attachment {filename} due to unsupported MIME type for Vertex: {mime_type}"
)
vertex_parts.append(
types.Part(
text=f"[System Note: Attachment '{filename}' of type '{mime_type}' was not processed as it's not directly supported for vision by the current model configuration.]"
)
)
except Exception as e:
print(f"Error processing attachment {filename} for Vertex AI: {e}")
vertex_parts.append(
types.Part(
text=f"[System Note: Error processing attachment '{filename}'.]"
)
)
# Get guild-specific model if configured, otherwise use default
guild_id = message.guild.id
model_id_to_use = get_guild_config(
guild_id, "AI_MODEL", DEFAULT_VERTEX_AI_MODEL
)
# Vertex model path is usually like "publishers/google/models/gemini-1.5-flash-001"
# If model_id_to_use is just "gemini-1.5-flash-001", prepend "publishers/google/models/"
if not model_id_to_use.startswith("publishers/google/models/"):
model_path = f"publishers/google/models/{model_id_to_use}"
else:
model_path = model_id_to_use
thinking_config = types.ThinkingConfig(
thinking_budget=0
) # Example manual thinking budget
generation_config = types.GenerateContentConfig(
temperature=0.2,
max_output_tokens=2000, # Ensure enough for JSON
safety_settings=STANDARD_SAFETY_SETTINGS,
thinking_config=thinking_config,
)
# Construct contents for Vertex AI API
# System prompt is handled by the model's configuration or as the first message if not directly supported in GenerateContentConfig.
# For Vertex AI with `genai.Client`, system prompt is often part of the model's configuration or the first message.
# The `genai.GenerativeModel` has `system_instruction`.
# Here, we'll build the `contents` list.
# The system prompt is part of the model's understanding, and the user prompt contains the task.
# For multi-turn, history is added to `contents`. Here, it's a single-turn request.
request_contents = [
# System prompt can be the first message if not using system_instruction in model
# types.Content(role="system", parts=[types.Part(text=system_prompt_text)]), # This is one way
# Or, rely on the model's pre-set system prompt and just send user data.
# For this moderation task, the detailed instructions are better sent as part of the user turn
# or a specific system instruction if the client/model supports it well.
# Let's include the system prompt as the first part of the user message for clarity with current structure.
# The `system_prompt_text` is already defined and will be the primary text part.
# The `user_context_text` is what we constructed.
# The `vertex_parts` contains the `user_context_text` and any image data.
types.Content(role="user", parts=vertex_parts)
]
try:
print(f"Querying Vertex AI model {model_path}...")
# Prepare the generation config with system instruction
# The existing 'generation_config' (lines 1063-1072) already has temperature, max_tokens, safety_settings.
# We need to add system_instruction to it.
final_generation_config = types.GenerateContentConfig(
temperature=generation_config.temperature, # from existing config
max_output_tokens=generation_config.max_output_tokens, # from existing config
safety_settings=generation_config.safety_settings, # from existing config
system_instruction=types.Content(
role="system", parts=[types.Part(text=system_prompt_text)]
),
thinking_config=generation_config.thinking_config, # from existing config
# response_mime_type="application/json", # Consider if model supports this for forcing JSON
)
response = await self.genai_client.aio.models.generate_content(
model=model_path, # Correctly formatted model path
contents=request_contents, # User's message with context and images
config=final_generation_config, # Pass the config with system_instruction
)
ai_response_content = self._get_response_text(response)
print(response.usage_metadata) # Print usage metadata for debugging
if not ai_response_content:
print("Error: AI response content is empty or could not be extracted.")
# Log safety ratings if available
if (
response
and response.candidates
and response.candidates[0].safety_ratings
):
ratings = ", ".join(
[
f"{r.category.name}: {r.probability.name}"
for r in response.candidates[0].safety_ratings
]
)
print(f"Safety Ratings: {ratings}")
if (
response
and response.candidates
and response.candidates[0].finish_reason
):
print(f"Finish Reason: {response.candidates[0].finish_reason.name}")
return None
# Attempt to parse the JSON response from the AI
try:
# Clean potential markdown code blocks
if ai_response_content.startswith("```json"):
ai_response_content = ai_response_content.strip("```json\n").strip(
"`\n "
)
elif ai_response_content.startswith("```"):
ai_response_content = ai_response_content.strip("```\n").strip(
"`\n "
)
ai_decision = json.loads(ai_response_content)
# Basic validation of the parsed JSON structure
if (
not isinstance(ai_decision, dict)
or not all(
k in ai_decision
for k in ["violation", "rule_violated", "reasoning", "action"]
)
or not isinstance(ai_decision.get("violation"), bool)
):
print(
f"Error: AI response missing expected keys or 'violation' is not bool. Response: {ai_response_content}"
)
return None
print(f"AI Analysis Received: {ai_decision}")
return ai_decision
except json.JSONDecodeError as e:
print(
f"Error: Could not decode JSON response from AI: {e}. Response: {ai_response_content}"
)
return None
except Exception as e: # Catch other parsing errors
print(
f"Error parsing AI response structure: {e}. Response: {ai_response_content}"
)
return None
except google_exceptions.GoogleAPICallError as e:
print(f"Error calling Vertex AI API: {e}")
return None
except Exception as e:
print(
f"An unexpected error occurred during Vertex AI query for message {message.id}: {e}"
)
return None
async def handle_violation(
self,
message: discord.Message,
ai_decision: dict,
notify_mods_message: str = None,
):
"""
Takes action based on the AI's violation decision.
Also transmits action info via HTTP POST with API key header.
"""
import datetime
import aiohttp
rule_violated = ai_decision.get("rule_violated", "Unknown")
reasoning = ai_decision.get("reasoning", "No reasoning provided.")
action = ai_decision.get(
"action", "NOTIFY_MODS"
).upper() # Default to notify mods
guild_id = message.guild.id # Get guild_id once
user_id = message.author.id # Get user_id once
moderator_role_id = get_guild_config(guild_id, "MODERATOR_ROLE_ID")
moderator_role = (
message.guild.get_role(moderator_role_id) if moderator_role_id else None
)
mod_ping = (
moderator_role.mention
if moderator_role
else f"Moderators (Role ID {moderator_role_id} not found)"
)
current_timestamp_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
# Get the model from guild config, fall back to global default
model_used = get_guild_config(guild_id, "AI_MODEL", DEFAULT_VERTEX_AI_MODEL)
# --- Transmit action info over HTTP POST ---
try:
mod_log_api_secret = os.getenv("MOD_LOG_API_SECRET")
if mod_log_api_secret:
post_url = f"https://slipstreamm.dev/dashboard/api/guilds/{guild_id}/ai-moderation-action" # will be replaceing later with the Learnhelp API
payload = {
"timestamp": current_timestamp_iso,
"guild_id": guild_id,
"guild_name": message.guild.name,
"channel_id": message.channel.id,
"channel_name": message.channel.name,
"message_id": message.id,
"message_link": message.jump_url,
"user_id": user_id,
"user_name": str(message.author),
"action": action, # This will be the AI suggested action before potential overrides
"rule_violated": rule_violated,
"reasoning": reasoning,
"violation": ai_decision.get("violation", False),
"message_content": (
message.content[:1024] if message.content else ""
),
"full_message_content": message.content if message.content else "",
"ai_model": model_used,
"result": "pending_system_action", # Indicates AI decision received, system action pending
}
headers = {
"Authorization": f"Bearer {mod_log_api_secret}",
"Content-Type": "application/json",
}
async with aiohttp.ClientSession() as http_session: # Renamed session to avoid conflict
async with http_session.post(
post_url, headers=headers, json=payload, timeout=10
) as resp:
# This payload is just for the initial AI decision log
# The actual outcome will be logged after the action is performed
if resp.status >= 400:
print(
f"Failed to POST initial AI decision log: {resp.status}"
)
else:
print("MOD_LOG_API_SECRET not set; skipping initial action POST.")
except Exception as e:
print(f"Failed to POST initial action info: {e}")
# --- Prepare Notification ---
notification_embed = discord.Embed(
title="🚨 Rule Violation Detected 🚨",
description=f"AI analysis detected a violation of server rules.",
color=discord.Color.red(),
)
notification_embed.add_field(
name="User",
value=f"{message.author.mention} (`{message.author.id}`)",
inline=False,
)
notification_embed.add_field(
name="Channel", value=message.channel.mention, inline=False
)
notification_embed.add_field(
name="Rule Violated", value=f"**Rule {rule_violated}**", inline=True
)
notification_embed.add_field(
name="AI Suggested Action", value=f"`{action}`", inline=True
)
notification_embed.add_field(
name="AI Reasoning", value=f"_{reasoning}_", inline=False
)
notification_embed.add_field(
name="Message Link",
value=f"[Jump to Message]({message.jump_url})",
inline=False,
)
# Log message content and attachments for audit purposes
msg_content = message.content if message.content else "*No text content*"
notification_embed.add_field(
name="Message Content", value=msg_content[:1024], inline=False
)
# Add attachment information if present
if message.attachments:
attachment_info = []
for i, attachment in enumerate(message.attachments):
attachment_info.append(
f"{i+1}. {attachment.filename} ({attachment.content_type}) - [Link]({attachment.url})"
)
attachment_text = "\n".join(attachment_info)
notification_embed.add_field(
name="Attachments", value=attachment_text[:1024], inline=False
)
# Add the first image as a thumbnail if it's an image type
for attachment in message.attachments:
if any(
attachment.filename.lower().endswith(ext)
for ext in self.image_extensions
+ self.gif_extensions
+ self.video_extensions
):
notification_embed.set_thumbnail(url=attachment.url)
break
# Use the model_used variable that was defined earlier
notification_embed.set_footer(
text=f"AI Model: {model_used}. Learnhelp AI Moderation."
)
notification_embed.timestamp = (
discord.utils.utcnow()
) # Using discord.utils.utcnow() which is still supported
action_taken_message = "" # To append to the notification
testing_mode = get_guild_config(guild_id, "TESTING_MODE", False)
if testing_mode:
action_taken_message = (
f"[TEST MODE] Would have taken action `{action}`. No changes made."
)
notification_embed.color = discord.Color.greyple()
log_channel_id = get_guild_config(message.guild.id, "MOD_LOG_CHANNEL_ID")
log_channel = (
self.bot.get_channel(log_channel_id)
if log_channel_id
else message.channel
)
if action == "SUICIDAL":
suicidal_role_id = get_guild_config(
message.guild.id, "SUICIDAL_PING_ROLE_ID"
)
suicidal_role = (
message.guild.get_role(suicidal_role_id)
if suicidal_role_id
else None
)
ping_target = (
suicidal_role.mention
if suicidal_role
else f"Role ID {suicidal_role_id} (Suicidal Content)"
)
if not suicidal_role:
print(f"ERROR: Suicidal ping role ID {suicidal_role_id} not found.")
final_message = f"{ping_target}\n{action_taken_message}"
else:
suggestions_id = get_guild_config(
message.guild.id, "SUGGESTIONS_CHANNEL_ID"
)
suggestion_note = (
f"\nPlease review <#{suggestions_id}> for rule updates."
if suggestions_id
else ""
)
final_message = f"{mod_ping}\n{action_taken_message}{suggestion_note}"
await log_channel.send(
content=final_message,
embed=notification_embed,
view=self.QuickActionView(self, message.author),
)
return
# --- Perform Actions ---
try:
if action == "BAN":
action_taken_message = (
f"Action Taken: User **BANNED** and message deleted."
)
notification_embed.color = discord.Color.dark_red()
try:
await message.delete()
except discord.NotFound:
print("Message already deleted before banning.")
except discord.Forbidden:
print(
f"WARNING: Missing permissions to delete message before banning user {message.author}."
)
action_taken_message += (
" (Failed to delete message - check permissions)"
)
ban_reason = f"AI Mod: Rule {rule_violated}. Reason: {reasoning}"
await message.guild.ban(
message.author, reason=ban_reason, delete_message_days=1
)
print(
f"BANNED user {message.author} for violating rule {rule_violated}."
)
await add_user_infraction(
guild_id,
user_id,
rule_violated,
"BAN",
reasoning,
current_timestamp_iso,
)
elif action == "KICK":
action_taken_message = (
f"Action Taken: User **KICKED** and message deleted."
)
notification_embed.color = discord.Color.from_rgb(
255, 127, 0
) # Dark Orange
try:
await message.delete()
except discord.NotFound:
print("Message already deleted before kicking.")
except discord.Forbidden:
print(
f"WARNING: Missing permissions to delete message before kicking user {message.author}."
)
action_taken_message += (
" (Failed to delete message - check permissions)"
)
kick_reason = f"AI Mod: Rule {rule_violated}. Reason: {reasoning}"
await message.author.kick(reason=kick_reason)
print(
f"KICKED user {message.author} for violating rule {rule_violated}."
)
await add_user_infraction(
guild_id,
user_id,
rule_violated,
"KICK",
reasoning,
current_timestamp_iso,
)
elif action.startswith("TIMEOUT"):
duration_seconds = 0
duration_readable = ""
if action == "TIMEOUT_SHORT":
duration_seconds = 10 * 60 # 10 minutes
duration_readable = "10 minutes"
elif action == "TIMEOUT_MEDIUM":
duration_seconds = 60 * 60 # 1 hour
duration_readable = "1 hour"
elif action == "TIMEOUT_LONG":
duration_seconds = 24 * 60 * 60 # 1 day
duration_readable = "1 day"
if duration_seconds > 0:
action_taken_message = f"Action Taken: User **TIMED OUT for {duration_readable}** and message deleted."
notification_embed.color = discord.Color.blue()
try:
await message.delete()
except discord.NotFound:
print(
f"Message already deleted before timeout for {message.author}."
)
except discord.Forbidden:
print(
f"WARNING: Missing permissions to delete message before timeout for {message.author}."
)
action_taken_message += (
" (Failed to delete message - check permissions)"
)
timeout_reason = (
f"AI Mod: Rule {rule_violated}. Reason: {reasoning}"
)
# discord.py timeout takes a timedelta object
await message.author.timeout(
discord.utils.utcnow()
+ datetime.timedelta(seconds=duration_seconds),
reason=timeout_reason,
)
print(
f"TIMED OUT user {message.author} for {duration_readable} for violating rule {rule_violated}."
)
await add_user_infraction(
guild_id,
user_id,
rule_violated,
action,
reasoning,
current_timestamp_iso,
)
else:
action_taken_message = (
"Action Taken: **Unknown timeout duration, notifying mods.**"
)
action = (
"NOTIFY_MODS" # Fallback if timeout duration is not recognized
)
print(
f"Unknown timeout duration for action {action}. Defaulting to NOTIFY_MODS."
)
elif action == "DELETE":
action_taken_message = f"Action Taken: Message **DELETED**."
await message.delete()
print(
f"DELETED message from {message.author} for violating rule {rule_violated}."
)
# Typically, a simple delete isn't a formal infraction unless it's part of a WARN.
# If you want to log deletes as infractions, add:
# add_user_infraction(guild_id, user_id, rule_violated, "DELETE", reasoning, current_timestamp_iso)
elif action == "WARN":
action_taken_message = (
f"Action Taken: Message **DELETED** (AI suggested WARN)."
)
notification_embed.color = discord.Color.orange()
await message.delete() # Warnings usually involve deleting the offending message
print(
f"DELETED message from {message.author} (AI suggested WARN for rule {rule_violated})."
)
try:
dm_channel = await message.author.create_dm()
await dm_channel.send(
f"Your recent message in **{message.guild.name}** was removed for violating Rule **{rule_violated}**. "
f"Reason: _{reasoning}_. Please review the server rules. This is a formal warning."
)
action_taken_message += " User notified via DM with warning."
except discord.Forbidden:
print(
f"Could not DM warning to {message.author} (DMs likely disabled)."
)
action_taken_message += " (Could not DM user for warning)."
except Exception as e:
print(f"Error sending warning DM to {message.author}: {e}")
action_taken_message += " (Error sending warning DM)."
await add_user_infraction(
guild_id,
user_id,
rule_violated,
"WARN",
reasoning,
current_timestamp_iso,
)
elif action == "NOTIFY_MODS":
action_taken_message = "Action Taken: **Moderator review requested.**"
notification_embed.color = discord.Color.gold()
print(
f"Notifying moderators about potential violation (Rule {rule_violated}) by {message.author}."
)
# NOTIFY_MODS itself isn't an infraction on the user, but a request for human review.
# If mods take action, they would log it manually or via a mod command.
if notify_mods_message:
notification_embed.add_field(
name="Additional Mod Message",
value=notify_mods_message,
inline=False,
)
elif action == "SUICIDAL":
action_taken_message = (
"Action Taken: **User DMed resources, relevant role notified.**"
)
# No infraction is typically logged for "SUICIDAL" as it's a support action.
notification_embed.title = "🚨 Suicidal Content Detected 🚨"
notification_embed.color = (
discord.Color.dark_purple()
) # A distinct color
notification_embed.description = "AI analysis detected content indicating potential suicidal ideation."
print(
f"SUICIDAL content detected from {message.author}. DMing resources and notifying role."
)
# DM the user with help resources
try:
dm_channel = await message.author.create_dm()
await dm_channel.send(SUICIDAL_HELP_RESOURCES)
action_taken_message += " User successfully DMed."
except discord.Forbidden:
print(
f"Could not DM suicidal help resources to {message.author} (DMs likely disabled)."
)
action_taken_message += " (Could not DM user - DMs disabled)."
except Exception as e:
print(
f"Error sending suicidal help resources DM to {message.author}: {e}"
)
action_taken_message += f" (Error DMing user: {e})."
# The message itself is usually not deleted for suicidal content, to allow for intervention.
# If deletion is desired, add: await message.delete() here.
else: # Includes "IGNORE" or unexpected actions
if ai_decision.get(
"violation"
): # If violation is true but action is IGNORE
action_taken_message = "Action Taken: **None** (AI suggested IGNORE despite flagging violation - Review Recommended)."
notification_embed.color = discord.Color.light_grey()
print(
f"AI flagged violation ({rule_violated}) but suggested IGNORE for message by {message.author}. Notifying mods for review."
)
else:
# This case shouldn't be reached if called correctly, but handle defensively
print(
f"No action taken for message by {message.author} (AI Action: {action}, Violation: False)"
)
return # Don't notify if no violation and action is IGNORE
# --- Send Notification to Moderators/Relevant Role ---
log_channel_id = get_guild_config(message.guild.id, "MOD_LOG_CHANNEL_ID")
log_channel = (
self.bot.get_channel(log_channel_id) if log_channel_id else None
)
if not log_channel:
print(
f"ERROR: Moderation log channel (ID: {log_channel_id}) not found or not configured. Defaulting to message channel."
)
log_channel = message.channel
if not log_channel:
print(
f"ERROR: Could not find even the original message channel {message.channel.id} to send notification."
)
return
if action == "SUICIDAL":
suicidal_role_id = get_guild_config(
message.guild.id, "SUICIDAL_PING_ROLE_ID"
)
suicidal_role = (
message.guild.get_role(suicidal_role_id)
if suicidal_role_id
else None
)
ping_target = (
suicidal_role.mention
if suicidal_role
else f"Role ID {suicidal_role_id} (Suicidal Content)"
)
if not suicidal_role:
print(f"ERROR: Suicidal ping role ID {suicidal_role_id} not found.")
final_message = f"{ping_target}\n{action_taken_message}"
await log_channel.send(
content=final_message,
embed=notification_embed,
view=self.QuickActionView(self, message.author),
)
elif moderator_role: # For other violations
suggestions_id = get_guild_config(
message.guild.id, "SUGGESTIONS_CHANNEL_ID"
)
suggestion_note = (
f"\nPlease review <#{suggestions_id}> for rule updates."
if suggestions_id
else ""
)
final_message = f"{mod_ping}\n{action_taken_message}{suggestion_note}"
await log_channel.send(
content=final_message,
embed=notification_embed,
view=self.QuickActionView(self, message.author),
)
else: # Fallback if moderator role is also not found for non-suicidal actions
print(
f"ERROR: Moderator role ID {moderator_role_id} not found for action {action}."
)
except discord.Forbidden as e:
print(
f"ERROR: Missing Permissions to perform action '{action}' for rule {rule_violated}. Details: {e}"
)
# Try to notify mods about the failure
if moderator_role:
try:
await message.channel.send(
f"{mod_ping} **PERMISSION ERROR!** Could not perform action `{action}` on message by {message.author.mention} "
f"for violating Rule {rule_violated}. Please check bot permissions.\n"
f"Reasoning: _{reasoning}_\nMessage Link: {message.jump_url}"
)
except discord.Forbidden:
print(
"FATAL: Bot lacks permission to send messages, even error notifications."
)
except discord.NotFound:
print(
f"Message {message.id} was likely already deleted when trying to perform action '{action}'."
)
except Exception as e:
print(
f"An unexpected error occurred during action execution for message {message.id}: {e}"
)
# Try to notify mods about the unexpected error
if moderator_role:
try:
await message.channel.send(
f"{mod_ping} **UNEXPECTED ERROR!** An error occurred while handling rule violation "
f"for {message.author.mention}. Please check bot logs.\n"
f"Rule: {rule_violated}, Action Attempted: {action}\nMessage Link: {message.jump_url}"
)
except discord.Forbidden:
print(
"FATAL: Bot lacks permission to send messages, even error notifications."
)
@commands.Cog.listener(name="on_message")
async def message_listener(self, message: discord.Message):
"""Listens to messages and triggers moderation checks."""
print(f"on_message triggered for message ID: {message.id}")
# --- Basic Checks ---
# Ignore messages from bots (including self)
if message.author.bot:
print(f"Ignoring message {message.id} from bot.")
return
# Ignore messages without content or attachments
if not message.content and not message.attachments:
print(f"Ignoring message {message.id} with no content or attachments.")
return
# Ignore DMs
if not message.guild:
print(f"Ignoring message {message.id} from DM.")
return
# Check if moderation is enabled for this guild
if not get_guild_config(message.guild.id, "ENABLED", False):
print(
f"Moderation disabled for guild {message.guild.id}. Ignoring message {message.id}."
)
return
if get_guild_config(message.guild.id, "EVENT_MODE", False):
print(
f"Event mode enabled for guild {message.guild.id}. Ignoring message {message.id}."
)
return
# --- Suicidal Content Check ---
# Suicidal keyword check removed; handled by OpenRouter AI moderation.
# --- Prepare for AI Analysis ---
message_content = message.content
# Check for attachments
image_data_list = []
if message.attachments:
# Process all attachments
for attachment in message.attachments:
mime_type, image_bytes, attachment_type = await self.process_attachment(
attachment
)
if mime_type and image_bytes and attachment_type:
image_data_list.append(
(mime_type, image_bytes, attachment_type, attachment.filename)
)
print(
f"Processed attachment: {attachment.filename} as {attachment_type}"
)
# Log the number of attachments processed
if image_data_list:
print(
f"Processed {len(image_data_list)} attachments for message {message.id}"
)
# Only proceed with AI analysis if there's text to analyze or attachments
if not message_content and not image_data_list:
print(
f"Ignoring message {message.id} with no content or valid attachments."
)
return
# NSFW channel check removed - AI will handle this context
# --- Call AI for Analysis (All Rules) ---
# Check if the Vertex AI client is available
if not self.genai_client:
print(
f"Skipping AI analysis for message {message.id}: Vertex AI client is not initialized."
)
return
# Prepare user history for the AI
infractions = get_user_infraction_history(message.guild.id, message.author.id)
history_summary_parts = []
if infractions:
for infr in infractions:
history_summary_parts.append(
f"- Action: {infr.get('action_taken', 'N/A')} for Rule {infr.get('rule_violated', 'N/A')} on {infr.get('timestamp', 'N/A')[:10]}. Reason: {infr.get('reasoning', 'N/A')[:50]}..."
)
user_history_summary = (
"\n".join(history_summary_parts)
if history_summary_parts
else "No prior infractions recorded."
)
# Limit history summary length to prevent excessively long prompts
max_history_len = 500
if len(user_history_summary) > max_history_len:
user_history_summary = user_history_summary[: max_history_len - 3] + "..."
print(
f"Analyzing message {message.id} from {message.author} in #{message.channel.name} with history..."
)
if image_data_list:
attachment_types = [data[2] for data in image_data_list]
print(
f"Including {len(image_data_list)} attachments in analysis: {', '.join(attachment_types)}"
)
ai_decision = await self.query_vertex_ai(
message, message_content, user_history_summary, image_data_list
)
# --- Process AI Decision ---
if not ai_decision:
print(f"Failed to get valid AI decision for message {message.id}.")
# Optionally notify mods about AI failure if it happens often
# Store the failure attempt for debugging
self.last_ai_decisions.append(
{
"message_id": message.id,
"author_name": str(message.author),
"author_id": message.author.id,
"message_content_snippet": (
message.content[:100] + "..."
if len(message.content) > 100
else message.content
),
"timestamp": datetime.datetime.now(
datetime.timezone.utc
).isoformat(),
"ai_decision": {
"error": "Failed to get valid AI decision",
"raw_response": None,
}, # Simplified error logging
}
)
return # Stop if AI fails or returns invalid data
# Store the AI decision regardless of violation status
self.last_ai_decisions.append(
{
"message_id": message.id,
"author_name": str(message.author),
"author_id": message.author.id,
"message_content_snippet": (
message.content[:100] + "..."
if len(message.content) > 100
else message.content
),
"timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"ai_decision": ai_decision,
}
)
# Check if the AI flagged a violation
if ai_decision.get("violation"):
# Handle the violation based on AI decision without overrides
# Pass notify_mods_message if the action is NOTIFY_MODS
notify_mods_message = (
ai_decision.get("notify_mods_message")
if ai_decision.get("action") == "NOTIFY_MODS"
else None
)
await self.handle_violation(message, ai_decision, notify_mods_message)
else:
# AI found no violation
print(
f"AI analysis complete for message {message.id}. No violation detected."
)
@debug_subgroup.command(
name="last_decisions",
description="View the last 5 AI moderation decisions (admin only).",
)
@app_commands.checks.has_permissions(administrator=True)
async def aidebug_last_decisions(self, interaction: discord.Interaction):
if not self.last_ai_decisions:
await interaction.response.send_message(
"No AI decisions have been recorded yet.", ephemeral=True
)
return
embed = discord.Embed(
title="Last 5 AI Moderation Decisions", color=discord.Color.purple()
)
embed.timestamp = discord.utils.utcnow()
for i, record in enumerate(
reversed(list(self.last_ai_decisions))
): # Show newest first
decision_info = record.get("ai_decision", {})
violation = decision_info.get("violation", "N/A")
rule_violated = decision_info.get("rule_violated", "N/A")
reasoning = decision_info.get("reasoning", "N/A")
action = decision_info.get("action", "N/A")
error_msg = decision_info.get("error")
field_value = (
f"**Author:** {record.get('author_name', 'N/A')} ({record.get('author_id', 'N/A')})\n"
f"**Message ID:** {record.get('message_id', 'N/A')}\n"
f"**Content Snippet:** ```{record.get('message_content_snippet', 'N/A')}```\n"
f"**Timestamp:** {record.get('timestamp', 'N/A')[:19].replace('T', ' ')}\n"
)
if error_msg:
field_value += f"**Status:** <font color='red'>Error during processing: {error_msg}</font>\n"
else:
field_value += (
f"**Violation:** {violation}\n"
f"**Rule Violated:** {rule_violated}\n"
f"**Action:** {action}\n"
f"**Reasoning:** ```{reasoning}```\n"
)
# Truncate field_value if it's too long for an embed field
if len(field_value) > 1024:
field_value = field_value[:1020] + "..."
embed.add_field(
name=f"Decision #{len(self.last_ai_decisions) - i}",
value=field_value,
inline=False,
)
if (
len(embed.fields) >= 5
): # Limit to 5 fields in one embed for very long entries, or send multiple embeds
break
if not embed.fields: # Should not happen if self.last_ai_decisions is not empty
await interaction.response.send_message(
"Could not format AI decisions.", ephemeral=True
)
return
await interaction.response.send_message(embed=embed, ephemeral=True)
@aidebug_last_decisions.error
async def aidebug_last_decisions_error(
self, interaction: discord.Interaction, error: app_commands.AppCommandError
):
if isinstance(error, app_commands.MissingPermissions):
await interaction.response.send_message(
"You must be an administrator to use this command.", ephemeral=True
)
else:
await interaction.response.send_message(
f"An error occurred: {error}", ephemeral=True
)
print(f"Error in aidebug_last_decisions command: {error}")
# Setup function required by discord.py to load the cog
async def setup(bot: commands.Bot):
"""Loads the AIModerationCog."""
# The API key is now fetched in cog_load, so we don't need to check here.
await bot.add_cog(AIModerationCog(bot))
print("AIModerationCog has been loaded.")