# moderation_cog.py


import discord
from discord.ext import commands
from discord import app_commands

# import aiohttp  # For making asynchronous HTTP requests - replaced by the Google GenAI client
import json
import os  # To load environment variables
import collections  # For deque
import datetime  # For timestamps
import io  # For BytesIO operations
import base64  # For encoding images to base64
from PIL import Image  # For image processing
import cv2  # For video processing
import numpy as np  # For array operations
import tempfile  # For temporary file operations
import shutil  # For backing up files
from typing import Optional, List, Dict, Any, Tuple  # For type hinting
import asyncio
import aiofiles
import re

# Google Generative AI imports (using the Vertex AI backend)
from google import genai
from google.genai import types
from google.api_core import exceptions as google_exceptions

# Import project configuration for Vertex AI
from gurt.config import (
    PROJECT_ID,
    LOCATION,
)  # Assuming gurt.config exists and has these
from gurt.genai_client import get_genai_client_for_model

from . import aimod_config as aimod_config_module
from .aimod_config import (
    DEFAULT_VERTEX_AI_MODEL,
    STANDARD_SAFETY_SETTINGS,
    GUILD_CONFIG_PATH,
    USER_INFRACTIONS_PATH,
    INFRACTION_BACKUP_DIR,
    USER_APPEALS_PATH,
    APPEAL_AI_MODEL,
    APPEAL_AI_THINKING_BUDGET,
    CONFIG_LOCK,
    save_user_infractions,
    save_user_appeals,
    get_guild_config,
    set_guild_config,
    get_user_infraction_history,
    add_user_infraction,
    get_user_appeals,
    add_user_appeal,
    SERVER_RULES,
    MODERATION_INSTRUCTIONS,
    SUICIDAL_HELP_RESOURCES,
)


# Avoid loading an excessive number of messages when updating rules
MAX_RULE_MESSAGES = 25


class AIModerationCog(commands.Cog):
    """
    A Discord Cog that uses Google Vertex AI to moderate messages based on server rules.
    """

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        self.genai_client = None
        try:
            if PROJECT_ID and LOCATION:
                self.genai_client = genai.Client(
                    vertexai=True,
                    project=PROJECT_ID,
                    location=LOCATION,
                )
                print(
                    f"AIModerationCog: Google GenAI Client initialized for Vertex AI project '{PROJECT_ID}' in location '{LOCATION}'."
                )
            else:
                print(
                    "AIModerationCog: PROJECT_ID or LOCATION not found in config. Google GenAI Client not initialized."
                )
        except Exception as e:
            print(f"AIModerationCog: Error initializing Google GenAI Client for Vertex AI: {e}")

        self.last_ai_decisions = collections.deque(maxlen=5)  # Store the last 5 AI decisions
        self.config_lock = CONFIG_LOCK
        # Supported image file extensions (heic/heif added for Vertex)
        self.image_extensions = [".jpg", ".jpeg", ".png", ".webp", ".bmp", ".heic", ".heif"]
        # Supported animated file extensions
        self.gif_extensions = [".gif"]
        # Supported video file extensions (Vertex AI can process short video clips directly)
        self.video_extensions = [".mp4", ".webm", ".mov", ".avi", ".mkv", ".flv"]
        self.backup_task = self.bot.loop.create_task(self.backup_infractions_periodically())
        print("AIModerationCog Initialized.")

    def is_testing_mode(self, guild_id: int) -> bool:
        """Return True if testing mode is enabled for the guild."""
        return get_guild_config(guild_id, "TESTING_MODE", False)
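
    # Example (illustrative): with {"TESTING_MODE": true} stored in a guild's
    # config, is_testing_mode(guild.id) returns True and the quick-action
    # handlers below only report what they *would* have done instead of acting.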

    class QuickActionView(discord.ui.View):
        """Buttons for quick moderator actions."""

        def __init__(self, parent: "AIModerationCog", target: discord.Member):
            super().__init__(timeout=3600)
            self.parent = parent
            self.target = target
            self.message: discord.Message | None = None

        # --- Helper Modals ---
        class BanModal(discord.ui.Modal, title="Ban User"):
            reason = discord.ui.TextInput(
                label="Reason",
                placeholder="Reason for ban",
                style=discord.TextStyle.paragraph,
                required=False,
                max_length=512,
            )

            def __init__(self, view: "AIModerationCog.QuickActionView"):
                super().__init__()
                self.view = view

            async def on_submit(self, interaction: discord.Interaction):
                if not interaction.user.guild_permissions.ban_members:
                    await interaction.response.send_message(
                        "You lack permission to ban members.", ephemeral=True
                    )
                    return
                if self.view.parent.is_testing_mode(interaction.guild.id):
                    await interaction.response.send_message(
                        f"[TEST MODE] Would ban {self.view.target.mention}.",
                        ephemeral=True,
                    )
                    return
                try:
                    await self.view.target.ban(
                        reason=self.reason.value or "Escalated via mod panel"
                    )
                    await interaction.response.send_message(
                        f"Banned {self.view.target.mention}.", ephemeral=True
                    )
                except Exception as e:  # noqa: BLE001
                    await interaction.response.send_message(
                        f"Failed to ban: {e}", ephemeral=True
                    )
                self.view.disable_all_items()
                if self.view.message:
                    await self.view.message.edit(view=self.view)

        class KickModal(discord.ui.Modal, title="Kick User"):
            reason = discord.ui.TextInput(
                label="Reason",
                placeholder="Reason for kick",
                style=discord.TextStyle.paragraph,
                required=False,
                max_length=512,
            )

            def __init__(self, view: "AIModerationCog.QuickActionView"):
                super().__init__()
                self.view = view

            async def on_submit(self, interaction: discord.Interaction):
                if not interaction.user.guild_permissions.kick_members:
                    await interaction.response.send_message(
                        "You lack permission to kick members.", ephemeral=True
                    )
                    return
                if self.view.parent.is_testing_mode(interaction.guild.id):
                    await interaction.response.send_message(
                        f"[TEST MODE] Would kick {self.view.target.mention}.",
                        ephemeral=True,
                    )
                    return
                try:
                    await self.view.target.kick(
                        reason=self.reason.value or "Escalated via mod panel"
                    )
                    await interaction.response.send_message(
                        f"Kicked {self.view.target.mention}.", ephemeral=True
                    )
                except Exception as e:  # noqa: BLE001
                    await interaction.response.send_message(
                        f"Failed to kick: {e}", ephemeral=True
                    )
                self.view.disable_all_items()
                if self.view.message:
                    await self.view.message.edit(view=self.view)

        class TimeoutModal(discord.ui.Modal, title="Timeout User"):
            duration = discord.ui.TextInput(
                label="Duration",
                placeholder="e.g. 10m, 1h, 1d",
                required=True,
                max_length=10,
            )
            reason = discord.ui.TextInput(
                label="Reason",
                placeholder="Reason for timeout",
                style=discord.TextStyle.paragraph,
                required=False,
                max_length=512,
            )

            def __init__(self, view: "AIModerationCog.QuickActionView"):
                super().__init__()
                self.view = view

            @staticmethod
            def parse_duration(duration_str: str) -> datetime.timedelta | None:
                if not duration_str:
                    return None
                try:
                    amount = int("".join(filter(str.isdigit, duration_str)))
                    unit = "".join(filter(str.isalpha, duration_str)).lower()
                    if unit in {"d", "day", "days"}:
                        return datetime.timedelta(days=amount)
                    if unit in {"h", "hour", "hours"}:
                        return datetime.timedelta(hours=amount)
                    if unit in {"m", "min", "minute", "minutes"}:
                        return datetime.timedelta(minutes=amount)
                    if unit in {"s", "sec", "second", "seconds"}:
                        return datetime.timedelta(seconds=amount)
                except (ValueError, TypeError):
                    return None
                return None
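
            # Illustrative examples of the parser above (not exhaustive):
            #   parse_duration("10m") -> timedelta(minutes=10)
            #   parse_duration("1d")  -> timedelta(days=1)
            #   parse_duration("90x") -> None (unknown unit)
            # Digits and letters are collected independently, so a compound
            # value like "1h30m" becomes amount=130, unit="hm" -> None.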

            async def on_submit(self, interaction: discord.Interaction):
                if not interaction.user.guild_permissions.moderate_members:
                    await interaction.response.send_message(
                        "You lack permission to timeout members.", ephemeral=True
                    )
                    return
                if self.view.parent.is_testing_mode(interaction.guild.id):
                    await interaction.response.send_message(
                        f"[TEST MODE] Would timeout {self.view.target.mention} for {self.duration.value}.",
                        ephemeral=True,
                    )
                    return
                delta = self.parse_duration(self.duration.value)
                if not delta or delta > datetime.timedelta(days=28):
                    await interaction.response.send_message(
                        "Invalid duration. Use formats like '10m', '1h', '1d'.",
                        ephemeral=True,
                    )
                    return
                try:
                    until = discord.utils.utcnow() + delta
                    await self.view.target.timeout(
                        until, reason=self.reason.value or "Escalated via mod panel"
                    )
                    await interaction.response.send_message(
                        f"Timed out {self.view.target.mention} for {self.duration.value}.",
                        ephemeral=True,
                    )
                except Exception as e:  # noqa: BLE001
                    await interaction.response.send_message(
                        f"Failed to timeout: {e}", ephemeral=True
                    )
                self.view.disable_all_items()
                if self.view.message:
                    await self.view.message.edit(view=self.view)

        @discord.ui.button(label="Escalate Ban", style=discord.ButtonStyle.danger)
        async def escalate(self, interaction: discord.Interaction, button: discord.ui.Button):
            if not interaction.user.guild_permissions.ban_members:
                await interaction.response.send_message(
                    "You lack permission to ban members.", ephemeral=True
                )
                return
            self.message = interaction.message
            await interaction.response.send_modal(self.BanModal(self))

        @discord.ui.button(label="Kick", style=discord.ButtonStyle.primary)
        async def kick(self, interaction: discord.Interaction, button: discord.ui.Button):
            if not interaction.user.guild_permissions.kick_members:
                await interaction.response.send_message(
                    "You lack permission to kick members.", ephemeral=True
                )
                return
            self.message = interaction.message
            await interaction.response.send_modal(self.KickModal(self))

        @discord.ui.button(label="Timeout", style=discord.ButtonStyle.secondary)
        async def timeout_action(self, interaction: discord.Interaction, button: discord.ui.Button):
            if not interaction.user.guild_permissions.moderate_members:
                await interaction.response.send_message(
                    "You lack permission to timeout members.", ephemeral=True
                )
                return
            self.message = interaction.message
            await interaction.response.send_modal(self.TimeoutModal(self))

        @discord.ui.button(label="Ignore", style=discord.ButtonStyle.secondary)
        async def ignore(self, interaction: discord.Interaction, button: discord.ui.Button):
            if interaction.user.guild_permissions.manage_messages:
                await interaction.message.delete()
                await interaction.response.send_message(
                    "Notification dismissed.", ephemeral=True
                )
            else:
                await interaction.response.send_message(
                    "No permission to manage messages.", ephemeral=True
                )

    async def cog_load(self):
        """Called when the cog is loaded."""
        print("AIModerationCog cog_load started.")
        if not self.genai_client:
            print("\n" + "=" * 60)
            print("=== WARNING: AIModerationCog - Vertex AI Client not initialized! ===")
            print("=== The Moderation Cog requires a valid Vertex AI setup. ===")
            print("=== Check PROJECT_ID and LOCATION in gurt.config and GCP authentication. ===")
            print("=" * 60 + "\n")
        else:
            print("AIModerationCog: Vertex AI Client seems to be initialized.")
        print("AIModerationCog cog_load finished.")

    # _load_openrouter_models is no longer needed.

    async def cog_unload(self):
        """Clean up when the cog is unloaded."""
        # genai.Client has no explicit close method the way aiohttp.ClientSession
        # does; it manages its own resources.
        print("AIModerationCog Unloaded.")
        if self.backup_task:
            self.backup_task.cancel()

    async def backup_infractions_periodically(self):
        """Periodically back up the infractions file."""
        await self.bot.wait_until_ready()
        while not self.bot.is_closed():
            timestamp = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S")
            backup_path = os.path.join(
                INFRACTION_BACKUP_DIR, f"user_infractions_{timestamp}.json"
            )
            try:
                shutil.copy(USER_INFRACTIONS_PATH, backup_path)
            except Exception as e:  # noqa: BLE001
                print(f"Failed to back up infractions: {e}")
            await asyncio.sleep(24 * 60 * 60)
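
    # The timestamped names produced above (e.g. user_infractions_20240101_120000.json)
    # sort lexicographically in chronological order, which is what lets
    # restore_infractions below simply take sorted(...)[-1] as the newest backup.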

    async def process_image(
        self, attachment: discord.Attachment
    ) -> tuple[Optional[str], Optional[bytes]]:
        """
        Process an image attachment and return its MIME type and raw bytes.

        Args:
            attachment: The Discord attachment containing the image

        Returns:
            Tuple of (mime_type, image_bytes), or (None, None) on failure
        """
        try:
            # Download the image
            image_bytes = await attachment.read()
            mime_type = attachment.content_type or "image/jpeg"  # Default to jpeg if not specified

            # Return the mime type and raw image bytes
            return mime_type, image_bytes
        except Exception as e:
            print(f"Error processing image: {e}")
            return None, None

    async def process_gif(
        self, attachment: discord.Attachment
    ) -> tuple[Optional[str], Optional[bytes]]:
        """Return the raw bytes for a GIF attachment."""
        try:
            gif_bytes = await attachment.read()
            mime_type = attachment.content_type or "image/gif"
            return mime_type, gif_bytes
        except Exception as e:
            print(f"Error processing GIF: {e}")
            return None, None

    async def process_attachment(
        self, attachment: discord.Attachment
    ) -> tuple[Optional[str], Optional[bytes], Optional[str]]:
        """
        Process any attachment and return the appropriate media data.

        Args:
            attachment: The Discord attachment

        Returns:
            Tuple of (mime_type, image_bytes, attachment_type)
            attachment_type is one of: 'image', 'gif', 'video', or None if unsupported
        """
        if not attachment:
            return None, None, None

        # Get the file extension
        filename = attachment.filename.lower()
        _, ext = os.path.splitext(filename)

        # Process based on file type
        if ext in self.image_extensions:
            mime_type, image_bytes = await self.process_image(attachment)
            return mime_type, image_bytes, "image"
        elif ext in self.gif_extensions:
            mime_type, image_bytes = await self.process_gif(attachment)
            return mime_type, image_bytes, "gif"
        elif ext in self.video_extensions:
            mime_type, image_bytes = await self.process_video(attachment)
            return mime_type, image_bytes, "video"
        else:
            print(f"Unsupported file type: {ext}")
            return None, None, None
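
    # Dispatch summary (illustrative): "photo.PNG" -> ("image/png", ..., "image"),
    # "react.gif" -> (..., "gif"), "clip.mkv" -> (..., "video"); anything else,
    # e.g. "notes.txt", falls through to (None, None, None).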

    async def process_video(
        self, attachment: discord.Attachment
    ) -> tuple[Optional[str], Optional[bytes]]:
        """Return the raw bytes for a video attachment."""
        try:
            video_bytes = await attachment.read()
            mime_type = attachment.content_type or "video/mp4"
            return mime_type, video_bytes
        except Exception as e:
            print(f"Error processing video: {e}")
            return None, None

    async def process_url_attachment(
        self, url: str
    ) -> tuple[Optional[str], Optional[bytes], Optional[str], Optional[str]]:
        """Fetch an attachment from a direct link."""
        import aiohttp  # Local import; the cog no longer keeps a shared HTTP session.

        try:
            cleaned_url = url.strip("<>")
            filename = cleaned_url.split("/")[-1].split("?")[0]
            _, ext = os.path.splitext(filename.lower())
            if ext in self.image_extensions:
                attachment_type = "image"
            elif ext in self.gif_extensions:
                attachment_type = "gif"
            elif ext in self.video_extensions:
                attachment_type = "video"
            else:
                return None, None, None, None

            async with aiohttp.ClientSession() as session:
                async with session.get(cleaned_url) as resp:
                    if resp.status != 200:
                        print(f"Failed to fetch URL attachment {cleaned_url}: {resp.status}")
                        return None, None, None, None
                    data = await resp.read()
                    mime_type = resp.headers.get("Content-Type", f"image/{ext.lstrip('.')}")
                    return mime_type, data, attachment_type, filename
        except Exception as e:
            print(f"Error processing URL attachment {url}: {e}")
            return None, None, None, None

    def extract_direct_attachment_urls(self, text: str) -> List[str]:
        """Return a list of direct image/video URLs found in the text."""
        urls = re.findall(r"https?://\S+", text or "")
        allowed_exts = (
            self.image_extensions + self.gif_extensions + self.video_extensions
        )
        results = []
        for u in urls:
            cleaned = u.strip("<>")
            path = cleaned.split("?")[0]
            _, ext = os.path.splitext(path.lower())
            if ext in allowed_exts:
                results.append(cleaned)
        return results
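
    # Example (illustrative):
    #   extract_direct_attachment_urls(
    #       "look <https://cdn.example.com/a.png?size=1024> and https://example.com/page"
    #   ) -> ["https://cdn.example.com/a.png?size=1024"]
    # Angle brackets (Discord's no-embed syntax) are stripped and the query
    # string is ignored when checking the extension, but the returned URL
    # keeps its query string.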

    # --- AI Moderation Command Group ---
    aimod_group = app_commands.Group(
        name="aimod", description="AI Moderation commands."
    )
    config_subgroup = app_commands.Group(
        name="config",
        description="Configure AI moderation settings.",
        parent=aimod_group,
    )
    infractions_subgroup = app_commands.Group(
        name="infractions", description="Manage user infractions.", parent=aimod_group
    )
    appeal_subgroup = app_commands.Group(
        name="appeal", description="Appeal AI moderation actions.", parent=aimod_group
    )
    model_subgroup = app_commands.Group(
        name="model",
        description="Manage the AI model for moderation.",
        parent=aimod_group,
    )
    debug_subgroup = app_commands.Group(
        name="debug",
        description="Debugging commands for AI moderation.",
        parent=aimod_group,
    )
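
    # Resulting slash-command tree (for reference):
    #   /aimod sync
    #   /aimod config <log_channel|moderator_role|enable|update_rules|...>
    #   /aimod infractions <view|clear|leaderboard|restore>
    #   /aimod appeal <submit|list|human_review|testcases>
    #   /aimod model <set|get>
    #   /aimod debug ...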

    @aimod_group.command(
        name="sync",
        description="Reload AI moderation configuration and infractions from disk.",
    )
    @app_commands.checks.has_permissions(administrator=True)
    async def aimod_sync(self, interaction: discord.Interaction):
        """Reload configuration files from disk."""
        try:
            async with aiofiles.open(GUILD_CONFIG_PATH, "r", encoding="utf-8") as f:
                data = await f.read()
            async with CONFIG_LOCK:
                # Update the shared state in aimod_config; a bare ``global``
                # here would only rebind a name in this module.
                aimod_config_module.GUILD_CONFIG = json.loads(data)
            async with aiofiles.open(USER_INFRACTIONS_PATH, "r", encoding="utf-8") as f2:
                data2 = await f2.read()
            async with CONFIG_LOCK:
                aimod_config_module.USER_INFRACTIONS = json.loads(data2)
            await interaction.response.send_message(
                "Configuration synced from disk.", ephemeral=True
            )
        except Exception as e:  # noqa: BLE001
            await interaction.response.send_message(
                f"Failed to reload configuration: {e}", ephemeral=True
            )

    @config_subgroup.command(
        name="log_channel", description="Set the moderation log channel."
    )
    @app_commands.describe(channel="The text channel to use for moderation logs.")
    @app_commands.checks.has_permissions(administrator=True)
    async def modset_log_channel(
        self, interaction: discord.Interaction, channel: discord.TextChannel
    ):
        await set_guild_config(interaction.guild.id, "MOD_LOG_CHANNEL_ID", channel.id)
        await interaction.response.send_message(
            f"Moderation log channel set to {channel.mention}.", ephemeral=False
        )

    @config_subgroup.command(
        name="suggestions_channel", description="Set the suggestions channel."
    )
    @app_commands.describe(channel="The text channel to use for suggestions.")
    @app_commands.checks.has_permissions(administrator=True)
    async def modset_suggestions_channel(
        self, interaction: discord.Interaction, channel: discord.TextChannel
    ):
        await set_guild_config(
            interaction.guild.id, "SUGGESTIONS_CHANNEL_ID", channel.id
        )
        await interaction.response.send_message(
            f"Suggestions channel set to {channel.mention}.", ephemeral=False
        )

    @config_subgroup.command(
        name="moderator_role", description="Set the moderator role."
    )
    @app_commands.describe(role="The role that identifies moderators.")
    @app_commands.checks.has_permissions(administrator=True)
    async def modset_moderator_role(
        self, interaction: discord.Interaction, role: discord.Role
    ):
        await set_guild_config(interaction.guild.id, "MODERATOR_ROLE_ID", role.id)
        await interaction.response.send_message(
            f"Moderator role set to {role.mention}.", ephemeral=False
        )

    @config_subgroup.command(
        name="suicidal_ping_role",
        description="Set the role to ping for suicidal content.",
    )
    @app_commands.describe(role="The role to ping for urgent suicidal content alerts.")
    @app_commands.checks.has_permissions(administrator=True)
    async def modset_suicidal_ping_role(
        self, interaction: discord.Interaction, role: discord.Role
    ):
        await set_guild_config(interaction.guild.id, "SUICIDAL_PING_ROLE_ID", role.id)
        await interaction.response.send_message(
            f"Suicidal content ping role set to {role.mention}.", ephemeral=False
        )

    @config_subgroup.command(
        name="add_nsfw_channel",
        description="Add a channel to the list of NSFW channels.",
    )
    @app_commands.describe(channel="The text channel to mark as NSFW for the bot.")
    @app_commands.checks.has_permissions(administrator=True)
    async def modset_add_nsfw_channel(
        self, interaction: discord.Interaction, channel: discord.TextChannel
    ):
        guild_id = interaction.guild.id
        nsfw_channels: list[int] = get_guild_config(guild_id, "NSFW_CHANNEL_IDS", [])
        if channel.id not in nsfw_channels:
            nsfw_channels.append(channel.id)
            await set_guild_config(guild_id, "NSFW_CHANNEL_IDS", nsfw_channels)
            await interaction.response.send_message(
                f"{channel.mention} added to NSFW channels list.", ephemeral=False
            )
        else:
            await interaction.response.send_message(
                f"{channel.mention} is already in the NSFW channels list.",
                ephemeral=True,
            )

    @config_subgroup.command(
        name="remove_nsfw_channel",
        description="Remove a channel from the list of NSFW channels.",
    )
    @app_commands.describe(channel="The text channel to remove from the NSFW list.")
    @app_commands.checks.has_permissions(administrator=True)
    async def modset_remove_nsfw_channel(
        self, interaction: discord.Interaction, channel: discord.TextChannel
    ):
        guild_id = interaction.guild.id
        nsfw_channels: list[int] = get_guild_config(guild_id, "NSFW_CHANNEL_IDS", [])
        if channel.id in nsfw_channels:
            nsfw_channels.remove(channel.id)
            await set_guild_config(guild_id, "NSFW_CHANNEL_IDS", nsfw_channels)
            await interaction.response.send_message(
                f"{channel.mention} removed from NSFW channels list.", ephemeral=False
            )
        else:
            await interaction.response.send_message(
                f"{channel.mention} is not in the NSFW channels list.", ephemeral=True
            )

    @config_subgroup.command(
        name="list_nsfw_channels",
        description="List currently configured NSFW channels.",
    )
    @app_commands.checks.has_permissions(administrator=True)
    async def modset_list_nsfw_channels(self, interaction: discord.Interaction):
        guild_id = interaction.guild.id
        nsfw_channel_ids: list[int] = get_guild_config(guild_id, "NSFW_CHANNEL_IDS", [])
        if not nsfw_channel_ids:
            await interaction.response.send_message(
                "No NSFW channels are currently configured.", ephemeral=False
            )
            return

        channel_mentions = []
        for channel_id in nsfw_channel_ids:
            channel_obj = interaction.guild.get_channel(channel_id)
            if channel_obj:
                channel_mentions.append(channel_obj.mention)
            else:
                channel_mentions.append(f"ID:{channel_id} (not found)")

        await interaction.response.send_message(
            "Configured NSFW channels:\n- " + "\n- ".join(channel_mentions),
            ephemeral=False,
        )

    # Note: the former top-level commands (modenable, viewinfractions,
    # clearinfractions, modsetmodel, modgetmodel) now live in the subgroups
    # defined above, as /aimod config enable, /aimod infractions view,
    # /aimod infractions clear, /aimod model set, and /aimod model get.

    @config_subgroup.command(
        name="enable",
        description="Enable or disable moderation for this guild (admin only).",
    )
    @app_commands.describe(enabled="Enable moderation (true/false)")
    async def modenable(self, interaction: discord.Interaction, enabled: bool):
        if not interaction.user.guild_permissions.administrator:
            await interaction.response.send_message(
                "You must be an administrator to use this command.", ephemeral=False
            )
            return
        await set_guild_config(interaction.guild.id, "ENABLED", enabled)
        await interaction.response.send_message(
            f"Moderation is now {'enabled' if enabled else 'disabled'} for this guild.",
            ephemeral=False,
        )

    @config_subgroup.command(
        name="event_mode",
        description="Toggle temporary event mode for this guild.",
    )
    @app_commands.describe(enabled="Enable event mode (true/false)")
    async def event_mode(self, interaction: discord.Interaction, enabled: bool):
        if not interaction.user.guild_permissions.administrator:
            await interaction.response.send_message(
                "You must be an administrator to use this command.", ephemeral=False
            )
            return
        await set_guild_config(interaction.guild.id, "EVENT_MODE", enabled)

        await interaction.response.send_message(
            f"Event mode is now {'enabled' if enabled else 'disabled'}.",
            ephemeral=False,
        )

    @config_subgroup.command(
        name="testing_mode",
        description="Enable or disable testing mode (no actions are taken).",
    )
    @app_commands.describe(enabled="Enable testing mode (true/false)")
    async def testing_mode(self, interaction: discord.Interaction, enabled: bool):
        if not interaction.user.guild_permissions.administrator:
            await interaction.response.send_message(
                "You must be an administrator to use this command.", ephemeral=False
            )
            return
        await set_guild_config(interaction.guild.id, "TESTING_MODE", enabled)
        await interaction.response.send_message(
            f"Testing mode is now {'enabled' if enabled else 'disabled'}.",
            ephemeral=False,
        )

    @config_subgroup.command(
        name="update_rules",
        description="Update server rules from the specified channel.",
    )
    @app_commands.describe(channel="The channel containing the server rules.")
    @app_commands.checks.has_permissions(administrator=True)
    async def update_rules(
        self, interaction: discord.Interaction, channel: discord.TextChannel
    ) -> None:
        """Pull the server rules from a channel and update the global config."""
        messages = []
        async for msg in channel.history(limit=MAX_RULE_MESSAGES + 1, oldest_first=True):
            if msg.content:
                messages.append(msg.content)
            if len(messages) > MAX_RULE_MESSAGES:
                await interaction.response.send_message(
                    f"Channel has more than {MAX_RULE_MESSAGES} messages."
                    " Please consolidate your rules into fewer messages.",
                    ephemeral=True,
                )
                return

        if not messages:
            await interaction.response.send_message(
                "No messages found in that channel.", ephemeral=True
            )
            return

        rules_text = "\n\n".join(messages).strip()
        aimod_config_module.SERVER_RULES = rules_text
        await interaction.response.send_message(
            f"Server rules updated from {channel.mention}.", ephemeral=False
        )

    @config_subgroup.command(
        name="reset_rules",
        description="Reset server rules to the default hardcoded version.",
    )
    @app_commands.checks.has_permissions(administrator=True)
    async def reset_rules(self, interaction: discord.Interaction) -> None:
        """Reset the server rules to the default string."""
        aimod_config_module.SERVER_RULES = aimod_config_module.DEFAULT_SERVER_RULES
        await interaction.response.send_message(
            "Server rules have been reset to the default.", ephemeral=False
        )

    @config_subgroup.command(
        name="update_instructions",
        description="Update moderation instructions from the specified channel.",
    )
    @app_commands.describe(
        channel="The channel containing the moderation instructions."
    )
    @app_commands.checks.has_permissions(administrator=True)
    async def update_instructions(
        self, interaction: discord.Interaction, channel: discord.TextChannel
    ) -> None:
        """Pull moderation instructions from a channel and update the global config."""
        messages = []
        async for msg in channel.history(limit=MAX_RULE_MESSAGES + 1, oldest_first=True):
            if msg.content:
                messages.append(msg.content)
            if len(messages) > MAX_RULE_MESSAGES:
                await interaction.response.send_message(
                    f"Channel has more than {MAX_RULE_MESSAGES} messages."
                    " Please consolidate your instructions into fewer messages.",
                    ephemeral=True,
                )
                return

        if not messages:
            await interaction.response.send_message(
                "No messages found in that channel.", ephemeral=True
            )
            return

        instructions_text = "\n\n".join(messages).strip()
        aimod_config_module.MODERATION_INSTRUCTIONS = instructions_text
        await interaction.response.send_message(
            f"Moderation instructions updated from {channel.mention}.", ephemeral=False
        )

    @config_subgroup.command(
        name="reset_instructions",
        description="Reset moderation instructions to the default version.",
    )
    @app_commands.checks.has_permissions(administrator=True)
    async def reset_instructions(self, interaction: discord.Interaction) -> None:
        """Reset moderation instructions to the default string."""
        aimod_config_module.MODERATION_INSTRUCTIONS = (
            aimod_config_module.DEFAULT_MODERATION_INSTRUCTIONS
        )
        await interaction.response.send_message(
            "Moderation instructions have been reset to the default.", ephemeral=False
        )

    @infractions_subgroup.command(
        name="view",
        description="View a user's AI moderation infraction history (mod/admin only).",
    )
    @app_commands.describe(user="The user to view infractions for")
    async def viewinfractions(
        self, interaction: discord.Interaction, user: discord.Member
    ):
        # Check if the user has permission (admin or moderator role)
        moderator_role_id = get_guild_config(interaction.guild.id, "MODERATOR_ROLE_ID")
        moderator_role = (
            interaction.guild.get_role(moderator_role_id) if moderator_role_id else None
        )

        has_permission = interaction.user.guild_permissions.administrator or (
            moderator_role and moderator_role in interaction.user.roles
        )

        if not has_permission:
            await interaction.response.send_message(
                "You must be an administrator or have the moderator role to use this command.",
                ephemeral=True,
            )
            return

        # Get the user's infraction history
        infractions = get_user_infraction_history(interaction.guild.id, user.id)

        if not infractions:
            await interaction.response.send_message(
                f"{user.mention} has no recorded infractions.", ephemeral=False
            )
            return

        # Create an embed to display the infractions
        embed = discord.Embed(
            title=f"Infraction History for {user.display_name}",
            description=f"User ID: {user.id}",
            color=discord.Color.orange(),
        )

        # Add each infraction to the embed
        for i, infraction in enumerate(infractions, 1):
            # Format the ISO timestamp as "YYYY-MM-DD HH:MM:SS"
            timestamp = infraction.get("timestamp", "Unknown date")[:19].replace("T", " ")
            rule = infraction.get("rule_violated", "Unknown rule")
            action = infraction.get("action_taken", "Unknown action")
            reason = infraction.get("reasoning", "No reason provided")

            # Truncate the reason if it's too long
            if len(reason) > 200:
                reason = reason[:197] + "..."

            embed.add_field(
                name=f"Infraction #{i} - {timestamp}",
                value=f"**Rule Violated:** {rule}\n**Action Taken:** {action}\n**Reason:** {reason}",
                inline=False,
            )

        embed.set_footer(text=f"Total infractions: {len(infractions)}")
        embed.timestamp = discord.utils.utcnow()

        await interaction.response.send_message(embed=embed, ephemeral=False)

    @appeal_subgroup.command(
        name="human_review",
        description="Request a human moderator to review your case.",
    )
    @app_commands.describe(
        reason="Explain why you want a human to review the AI decision",
        guild_id="If using in DMs, provide the server ID",
    )
    async def appeal_human_review(
        self,
        interaction: discord.Interaction,
        reason: str,
        guild_id: int | None = None,
    ):
        """Let a user request a manual moderator review."""
        guild = interaction.guild or (self.bot.get_guild(guild_id) if guild_id else None)
        if not guild:
            await interaction.response.send_message(
                "Invalid or missing guild ID.", ephemeral=True
            )
            return

        log_channel_id = get_guild_config(guild.id, "MOD_LOG_CHANNEL_ID")
        log_channel = self.bot.get_channel(log_channel_id) if log_channel_id else None
        if not log_channel:
            await interaction.response.send_message(
                "Appeals are not enabled for this server.", ephemeral=True
            )
            return

        timestamp = datetime.datetime.utcnow().isoformat()
        await add_user_appeal(
            guild.id, interaction.user.id, "HUMAN_REVIEW", reason, timestamp, ""
        )

        embed = discord.Embed(title="Human Review Requested", color=discord.Color.orange())
        embed.add_field(
            name="User",
            value=f"{interaction.user} ({interaction.user.id})",
            inline=False,
        )
        embed.add_field(name="Request", value=reason, inline=False)
        embed.timestamp = discord.utils.utcnow()
        await log_channel.send(embed=embed)

        await interaction.response.send_message(
            "Your request for a human review has been sent.", ephemeral=True
        )

    @infractions_subgroup.command(
        name="clear",
        description="Clear a user's AI moderation infraction history (admin only).",
    )
    @app_commands.describe(user="The user to clear infractions for")
    async def clearinfractions(
        self, interaction: discord.Interaction, user: discord.Member
    ):
        # Check if the user has administrator permission
        if not interaction.user.guild_permissions.administrator:
            await interaction.response.send_message(
                "You must be an administrator to use this command.", ephemeral=True
            )
            return

        # Get the user's infraction history from the shared state in aimod_config
        # (USER_INFRACTIONS is keyed by "guildid_userid").
        key = f"{interaction.guild.id}_{user.id}"
        infractions = aimod_config_module.USER_INFRACTIONS.get(key, [])

        if not infractions:
            await interaction.response.send_message(
                f"{user.mention} has no recorded infractions to clear.", ephemeral=False
            )
            return

        # Clear the user's infractions
        aimod_config_module.USER_INFRACTIONS[key] = []
        await save_user_infractions()

        await interaction.response.send_message(
            f"Cleared {len(infractions)} infraction(s) for {user.mention}.",
            ephemeral=False,
        )

    @infractions_subgroup.command(
        name="leaderboard",
        description="Show users with the fewest infractions.",
    )
    async def leaderboard(self, interaction: discord.Interaction):
        guild_id = interaction.guild.id
        counts = {}
        # Read from the shared state in aimod_config (keys are "guildid_userid").
        for key, infractions in aimod_config_module.USER_INFRACTIONS.items():
            if key.startswith(f"{guild_id}_"):
                uid = int(key.split("_", 1)[1])
                counts[uid] = len(infractions)
        if not counts:
            await interaction.response.send_message(
                "No infractions recorded for this guild.", ephemeral=True
            )
            return
        sorted_users = sorted(counts.items(), key=lambda x: x[1])[:5]
        lines = []
        for uid, count in sorted_users:
            member = interaction.guild.get_member(uid)
            name = member.display_name if member else f"ID:{uid}"
            lines.append(f"**{name}** - {count} infractions")
        embed = discord.Embed(
            title="Best Behavior Leaderboard",
            description="\n".join(lines),
            color=discord.Color.green(),
        )
        await interaction.response.send_message(embed=embed, ephemeral=False)

    @infractions_subgroup.command(
        name="restore",
        description="Restore infractions from the latest backup (admin only).",
    )
    @app_commands.checks.has_permissions(administrator=True)
    async def restore_infractions(self, interaction: discord.Interaction):
        # Timestamped backup names sort lexicographically in chronological
        # order, so the last entry after sorting is the newest backup.
        backups = sorted(os.listdir(INFRACTION_BACKUP_DIR))
        if not backups:
            await interaction.response.send_message("No backups found.", ephemeral=True)
            return
        latest = os.path.join(INFRACTION_BACKUP_DIR, backups[-1])
        try:
            shutil.copy(latest, USER_INFRACTIONS_PATH)
            async with aiofiles.open(USER_INFRACTIONS_PATH, "r", encoding="utf-8") as f:
                data = await f.read()
            async with CONFIG_LOCK:
                # Update the shared state in aimod_config; a bare ``global``
                # here would only rebind a name in this module.
                aimod_config_module.USER_INFRACTIONS = json.loads(data)
            await interaction.response.send_message(
                f"Infractions restored from {backups[-1]}", ephemeral=False
            )
        except Exception as e:  # noqa: BLE001
            await interaction.response.send_message(
                f"Failed to restore infractions: {e}", ephemeral=True
            )

    @appeal_subgroup.command(name="submit", description="Submit a moderation appeal.")
    @app_commands.describe(
        action="The action you are appealing",
        reason="Explain why you believe the action was incorrect",
        guild_id="If using in DMs, provide the server ID",
        message_id="ID of the moderated message you are appealing (optional)",
    )
    async def appeal_submit(
        self,
        interaction: discord.Interaction,
        action: str,
        reason: str,
        guild_id: int | None = None,
        message_id: int | None = None,
    ):
        guild = interaction.guild or (self.bot.get_guild(guild_id) if guild_id else None)
        if not guild:
            await interaction.response.send_message(
                "Invalid or missing guild ID.", ephemeral=True
            )
            return

        log_channel_id = get_guild_config(guild.id, "MOD_LOG_CHANNEL_ID")
        log_channel = self.bot.get_channel(log_channel_id) if log_channel_id else None
        if not log_channel:
            await interaction.response.send_message(
                "Appeals are not enabled for this server.", ephemeral=True
            )
            return

        # Find the infraction being appealed: prefer an exact message-ID match
        # (searching newest-first), otherwise fall back to the most recent one.
        infractions = get_user_infraction_history(guild.id, interaction.user.id)
        target_infraction = None
        if message_id:
            for infr in infractions[::-1]:
                if infr.get("message_id") == message_id:
                    target_infraction = infr
                    break
        if not target_infraction and infractions:
            target_infraction = infractions[-1]

        ai_review = await self.run_appeal_ai(
            guild,
            interaction.user,
            action,
            reason,
            target_infraction,
        )
        timestamp = datetime.datetime.utcnow().isoformat()
        ref = target_infraction.get("message_id") if target_infraction else None
        await add_user_appeal(
            guild.id,
            interaction.user.id,
            action,
            reason,
            timestamp,
            ai_review,
            str(ref) if ref else None,
        )

        embed = discord.Embed(title="New Appeal", color=discord.Color.blue())
        embed.add_field(
            name="User",
            value=f"{interaction.user} ({interaction.user.id})",
            inline=False,
        )
        embed.add_field(name="Action", value=action, inline=False)
        if ref:
            embed.add_field(name="Infraction", value=f"Message ID: {ref}", inline=False)
        msg_snip = target_infraction.get("message_content") if target_infraction else None
        if msg_snip:
            embed.add_field(
                name="Message Snippet",
                value=f"`{msg_snip}`",
                inline=False,
            )
        attachments = target_infraction.get("attachments") if target_infraction else None
        if attachments:
            attach_text = "\n".join(attachments)
            if len(attach_text) > 1024:
                attach_text = attach_text[:1021] + "..."
            embed.add_field(
                name="Attachments",
                value=attach_text,
                inline=False,
            )
        embed.add_field(name="Appeal", value=reason, inline=False)
        embed.add_field(name="AI Review", value=ai_review[:1000], inline=False)
        embed.timestamp = discord.utils.utcnow()
        await log_channel.send(embed=embed)

        await interaction.response.send_message(
            "Your appeal has been submitted.", ephemeral=True
        )

    @appeal_subgroup.command(
        name="list", description="View a user's appeals (mods only)."
    )
    @app_commands.describe(user="The user to view appeals for")
    async def appeal_list(self, interaction: discord.Interaction, user: discord.Member):
        moderator_role_id = get_guild_config(interaction.guild.id, "MODERATOR_ROLE_ID")
        moderator_role = (
            interaction.guild.get_role(moderator_role_id) if moderator_role_id else None
        )
        has_permission = interaction.user.guild_permissions.administrator or (
            moderator_role and moderator_role in interaction.user.roles
        )
        if not has_permission:
            await interaction.response.send_message(
                "You must be an administrator or have the moderator role to use this command.",
                ephemeral=True,
            )
            return

        appeals = get_user_appeals(interaction.guild.id, user.id)
        history = get_user_infraction_history(interaction.guild.id, user.id)
        if not appeals:
            await interaction.response.send_message(
                f"{user.mention} has no appeals.", ephemeral=False
            )
            return

        embed = discord.Embed(
            title=f"Appeals for {user.display_name}", color=discord.Color.blue()
        )
        for i, appeal in enumerate(appeals, 1):
            ts = appeal.get("timestamp", "?")[:19].replace("T", " ")
            summary = appeal.get("appeal_text", "")
            ai_sum = appeal.get("ai_review", "")
            if len(summary) > 150:
                summary = summary[:147] + "..."
            if len(ai_sum) > 150:
                ai_sum = ai_sum[:147] + "..."
            value = f"Action: {appeal.get('action')}\nReason: {summary}\nAI: {ai_sum}"
            ref = appeal.get("infraction_reference")
            if ref:
                value = f"Infraction: {ref}\n" + value
                # Pull details of the referenced infraction into the field.
                for infr in history:
                    if str(infr.get("message_id")) == str(ref):
                        msg_snip = infr.get("message_content")
                        if msg_snip:
                            value += f"\nSnippet: {msg_snip}"
                        attachments = infr.get("attachments")
                        if attachments:
                            attach_txt = ", ".join(attachments)
                            if len(attach_txt) > 200:
                                attach_txt = attach_txt[:197] + "..."
                            value += f"\nAttachments: {attach_txt}"
                        reason = infr.get("reasoning")
                        if reason:
                            if len(reason) > 150:
                                reason = reason[:147] + "..."
                            value += f"\nAI Reasoning: {reason}"
                        break
            embed.add_field(name=f"Appeal #{i} - {ts}", value=value, inline=False)
        await interaction.response.send_message(embed=embed, ephemeral=False)

    @appeal_subgroup.command(
        name="testcases",
        description="Run sample appeals through the AI review system (admin only).",
    )
    async def appeal_testcases(self, interaction: discord.Interaction):
        """Run a few hardcoded appeal scenarios through the AI with context."""
        if not interaction.user.guild_permissions.administrator:
            await interaction.response.send_message(
                "You must be an administrator to use this command.",
                ephemeral=True,
            )
            return

        await interaction.response.defer(thinking=True, ephemeral=True)

        scenarios = [
            ("WARN", "I was excited and sent many messages quickly."),
            ("MUTE", "I only quoted a meme and it was taken as harassment."),
            ("BAN", "I posted NSFW art but believed it was allowed."),
        ]
        results: list[tuple[str, str]] = []
        for idx, (action, text) in enumerate(scenarios, 1):
            dummy_infraction = {
                "message_id": 1000 + idx,
                "channel_id": interaction.channel.id,
                "message_content": f"Example offending message {idx}",
                "attachments": [],
                "reasoning": "Automated moderation reasoning sample",
            }
            result = await self.run_appeal_ai(
                interaction.guild, interaction.user, action, text, dummy_infraction
            )
            results.append((action, result))

        embed = discord.Embed(
            title="Appeal AI Test Results", color=discord.Color.green()
        )
        for action, result in results:
            field_text = result if result else "No response"
            if len(field_text) > 1000:
                field_text = field_text[:997] + "..."
            embed.add_field(name=action, value=field_text, inline=False)

        await interaction.followup.send(embed=embed, ephemeral=True)

    @model_subgroup.command(
        name="set", description="Change the AI model used for moderation (admin only)."
    )
    @app_commands.describe(
        model="The Vertex AI model to use (e.g., 'gemini-1.5-flash-001', 'gemini-1.0-pro')"
    )
    async def modsetmodel(self, interaction: discord.Interaction, model: str):
        # Check if the user has administrator permission
        if not interaction.user.guild_permissions.administrator:
            await interaction.response.send_message(
                "You must be an administrator to use this command.", ephemeral=True
            )
            return

        # Basic validation: Vertex AI model IDs don't contain "/" the way
        # OpenRouter IDs do, but they do contain hyphens and digits,
        # e.g. gemini-1.5-flash-001.
        if not model or len(model) < 5:
            await interaction.response.send_message(
                "Invalid model format. Please provide a valid Vertex AI model ID (e.g., 'gemini-1.5-flash-001').",
                ephemeral=False,
            )
            return

        # Save the model to the guild configuration
        guild_id = interaction.guild.id
        await set_guild_config(guild_id, "AI_MODEL", model)

        # There is no global model variable to update here (unlike the old
        # OPENROUTER_MODEL); the cog uses the guild-specific config or
        # DEFAULT_VERTEX_AI_MODEL.
        await interaction.response.send_message(
            f"AI moderation model updated to `{model}` for this guild.", ephemeral=False
        )

    # The 'model' autocomplete (modsetmodel_autocomplete) was removed along
    # with OpenRouter support.

    @model_subgroup.command(
        name="get", description="View the current AI model used for moderation."
    )
    async def modgetmodel(self, interaction: discord.Interaction):
        # Get the model from the guild config, falling back to the global default
        guild_id = interaction.guild.id
        model_used = get_guild_config(guild_id, "AI_MODEL", DEFAULT_VERTEX_AI_MODEL)

        # Create an embed to display the model information
        embed = discord.Embed(
            title="AI Moderation Model",
            description="The current AI model used for moderation in this server is:",
            color=discord.Color.blue(),
        )
        embed.add_field(name="Model In Use", value=f"`{model_used}`", inline=False)
        embed.add_field(
            name="Default Model", value=f"`{DEFAULT_VERTEX_AI_MODEL}`", inline=False
        )
        embed.set_footer(text="Use /aimod model set to change the model")

        await interaction.response.send_message(embed=embed, ephemeral=False)

    # --- Helper Function to Safely Extract Text from Vertex AI Response ---
    def _get_response_text(
        self, response: Optional[types.GenerateContentResponse]
    ) -> Optional[str]:
        """
        Safely extract the text content from the first text part of a GenerateContentResponse.
        Handles potential errors and a lack of text parts gracefully.
        (Adapted from teto_cog.py)
        """
        if not response:
            print("[AIModerationCog._get_response_text] Received None response object.")
            return None

        # Some simpler responses expose their text directly.
        if hasattr(response, "text") and response.text:
            print("[AIModerationCog._get_response_text] Found text directly in response.text attribute.")
            return response.text

        if not response.candidates:
            print(f"[AIModerationCog._get_response_text] Response object has no candidates. Response: {response}")
            return None

        try:
            candidate = response.candidates[0]
            if not hasattr(candidate, "content") or not candidate.content:
                print(f"[AIModerationCog._get_response_text] Candidate 0 has no 'content'. Candidate: {candidate}")
                return None
            if not hasattr(candidate.content, "parts") or not candidate.content.parts:
                print(
                    f"[AIModerationCog._get_response_text] Candidate 0 content has no 'parts' or parts list is empty. types.Content: {candidate.content}"
                )
                return None

            for i, part in enumerate(candidate.content.parts):
                if hasattr(part, "text") and part.text is not None:
                    if isinstance(part.text, str) and part.text.strip():
                        print(f"[AIModerationCog._get_response_text] Found non-empty text in part {i}.")
                        return part.text
                    else:
                        print(
                            f"[AIModerationCog._get_response_text] types.Part {i} has 'text' attribute, but it's empty or not a string: {part.text!r}"
                        )
            print(
                "[AIModerationCog._get_response_text] No usable text part found in candidate 0 after iterating through all parts."
            )
            return None

        except (AttributeError, IndexError, TypeError) as e:
            print(
                f"[AIModerationCog._get_response_text] Error accessing response structure: {type(e).__name__}: {e}"
            )
            print(f"Problematic response object: {response}")
            return None
        except Exception as e:
            print(f"[AIModerationCog._get_response_text] Unexpected error extracting text: {e}")
            print(f"Response object during error: {response}")
            return None
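
    # Structure traversed above (illustrative, abbreviated):
    #   response.text                                  <- convenience aggregate, if present
    #   response.candidates[0].content.parts[i].text   <- first non-empty string wins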

    async def query_vertex_ai(
        self,
        message: discord.Message,
        message_content: str,
        user_history: str,
        image_data_list: Optional[List[Tuple[str, bytes, str, str]]] = None,
    ):
        """
        Send the message content, user history, and additional context to Google Vertex AI for analysis.
        Optionally includes image data for visual content moderation.

        Args:
            message: The original discord.Message object.
            message_content: The text content of the message.
            user_history: A string summarizing the user's past infractions.
            image_data_list: Optional list of tuples (mime_type, image_bytes, attachment_type, filename) for image moderation.

        Returns:
            A dictionary containing the AI's decision, or None if an error occurs.
        """
        print(f"query_vertex_ai called. Vertex AI client available: {self.genai_client is not None}")
        if not self.genai_client:
            print("Error: Vertex AI Client is not available. Cannot query API.")
            return None
|
|
|
|
# Construct the prompt for the AI model (system prompt is largely the same)
|
|
system_prompt_text = (
|
|
"You are an AI moderation assistant for a Discord server.\n"
|
|
"Your primary function is to analyze message content and attached media based STRICTLY on the server rules provided below, using all available context.\n\n"
|
|
"Server Rules:\n"
|
|
"---\n"
|
|
f"{SERVER_RULES}\n"
|
|
"---\n\n"
|
|
"Context Provided:\n"
|
|
" You will receive the following information to aid your analysis:\n"
|
|
'- User\'s Server Role: (e.g., "Server Owner", "Admin", "Moderator", "Member").\n'
|
|
"- Channel Category: The name of the category the channel belongs to.\n"
|
|
"- Channel Age-Restricted/NSFW (Discord Setting): Boolean (true/false).\n"
|
|
"- Replied-to Message: If the current message is a reply, the content of the original message will be provided. This is crucial for understanding direct interactions.\n"
|
|
"- Recent Channel History: The last few messages in the channel to understand the flow of conversation.\n"
|
|
" - Attached Media: If the message contains image, GIF, or video attachments, they will be provided directly in the content array for analysis.\n\n"
|
|
"Instructions:\n"
|
|
'1. Review the "Message Content" and any attached media against EACH rule, considering ALL provided context (User Role, Channel Info, Replied-to Message, Recent Channel History).\n'
|
|
' - The "Channel Age-Restricted/NSFW (Discord Setting)" is the definitive indicator for NSFW content by Discord.\n'
|
|
' - The "Channel Category" provides general context.\n'
|
|
' - **"Replied-to Message" and "Recent Channel History" are vital for understanding banter, jokes, and ongoing discussions. A statement that seems offensive in isolation might be acceptable within the flow of conversation or as a direct reply.**\n'
|
|
" - If images, GIFs, or videos are attached, analyze ALL of them for rule violations.\n"
|
|
" - Pay special attention to images that may contain NSFW content, pornography, gore, or other prohibited visual content.\n"
|
|
" - **Do not attempt to guess or estimate the ages of characters or people in images.** Only act on explicit textual context indicating they are minors.\n"
|
|
" - If multiple attachments are present, a violation in ANY of them should be flagged.\n"
|
|
f"{aimod_config_module.MODERATION_INSTRUCTIONS}\n"
|
|
"3. Respond ONLY with a single JSON object containing the following keys:\n"
|
|
' - "reasoning": string (A concise explanation for your decision, referencing the specific rule and content).\n'
|
|
' - "violation": boolean (true if any rule is violated, false otherwise)\n'
|
|
' - "rule_violated": string (The number of the rule violated, e.g., "1", "5A", "None". If multiple rules are violated, state the MOST SEVERE one, prioritizing 5 > 5A > 3 > 2 > 1. Ignore any rules about AI-generated pornography.)\n'
|
|
' - "action": string (Suggest ONE action from: "IGNORE", "WARN", "DELETE", "TIMEOUT_SHORT", "TIMEOUT_MEDIUM", "TIMEOUT_LONG", "KICK", "BAN", "NOTIFY_MODS", "SUICIDAL".\n'
|
|
' - "notify_mods_message": optional string (If the suggested action is "NOTIFY_MODS", provide an optional brief message here for the moderators, e.g., "User\'s message is slightly ambiguous, human review needed.").\n'
|
|
" Consider the user's infraction history. If the user has prior infractions for similar or escalating behavior, suggest a more severe action than if it were a first-time offense for a minor rule.\n"
|
|
" Progressive Discipline Guide (unless overridden by severity):\n"
|
|
' - First minor offense: "WARN" (and "DELETE" if content is removable like Rule 1/4).\n'
|
|
' - Second minor offense / First moderate offense: "TIMEOUT_SHORT" (e.g., 10 minutes).\n'
|
|
' - Repeated moderate offenses: "TIMEOUT_MEDIUM" (e.g., 1 hour).\n'
|
|
' - Multiple/severe offenses: "TIMEOUT_LONG" (e.g., 1 day), "KICK", or "BAN".\n'
|
|
' - Use "BAN" on a user\'s **first infraction only in extremely severe cases** such as posting gore or unmistakable real-life CSAM involving minors. If the content appears animated or ambiguous, do **not** immediately ban; a timeout or moderator review is more appropriate.\n'
|
|
" Spamming:\n"
|
|
' - If a user continuously sends very long messages that are off-topic, repetitive, or appear to be meaningless spam (e.g., character floods, nonsensical text), suggest "TIMEOUT_MEDIUM" or "TIMEOUT_LONG" depending on severity and history, even if the content itself doesn\'t violate other specific rules. This is to maintain chat readability.\n'
|
|
" Rule Severity Guidelines (use your judgment):\n"
|
|
" - Consider the severity of each rule violation on its own merits.\n"
|
|
" - Consider the user's history of past infractions when determining appropriate action.\n"
|
|
" - Consider the context of the message and channel when evaluating violations.\n"
|
|
" - You have full discretion to determine the most appropriate action for any violation.\n"
|
|
" Suicidal Content:\n"
|
|
' If the message content expresses **clear, direct, and serious suicidal ideation, intent, planning, or recent attempts** (e.g., \'I am going to end my life and have a plan\', \'I survived my attempt last night\', \'I wish I hadn\'t woken up after trying\'), ALWAYS use "SUICIDAL" as the action, and set "violation" to true, with "rule_violated" as "Suicidal Content".\n'
|
|
" For casual, edgy, hyperbolic, or ambiguous statements like 'imma kms', 'just kill me now', 'I want to die (lol)', or phrases that are clearly part of edgy humor/banter rather than a genuine cry for help, you should lean towards \"IGNORE\" or \"NOTIFY_MODS\" if there's slight ambiguity but no clear serious intent. **Do NOT flag 'imma kms' as \"SUICIDAL\" unless there is very strong supporting context indicating genuine, immediate, and serious intent.**\n"
|
|
' If unsure but suspicious, or if the situation is complex: "NOTIFY_MODS".\n'
|
|
' Default action for minor first-time rule violations should be "WARN" or "DELETE" (if applicable).\n'
|
|
' Do not suggest "KICK" or "BAN" lightly; reserve for severe or repeated major offenses.\n'
|
|
" Timeout durations: TIMEOUT_SHORT (approx 10 mins), TIMEOUT_MEDIUM (approx 1 hour), TIMEOUT_LONG (approx 1 day to 1 week).\n"
|
|
" The system will handle the exact timeout duration; you just suggest the category.)\n\n"
|
|
"Example Response (Text Violation):\n"
|
|
'{{\n "reasoning": "The message content clearly depicts IRL non-consensual sexual content involving minors, violating rule 5A.",\n "violation": true,\n "rule_violated": "5A",\n "action": "BAN"\n}}\n\n'
|
|
"Example Response (Image Violation):\n"
|
|
'{{\n "reasoning": "Attachment #2 contains explicit pornographic imagery in a non-NSFW channel, violating rule 1.",\n "violation": true,\n "rule_violated": "1",\n "action": "DELETE"\n}}\n\n'
|
|
"Example Response (Multiple Attachments Violation):\n"
|
|
'{{\n "reasoning": "While the text content is fine, attachment #3 contains IRL pornography, violating rule 5A.",\n "violation": true,\n "rule_violated": "5A",\n "action": "WARN"\n}}\n\n'
|
|
"Example Response (No Violation):\n"
|
|
'{{\n "reasoning": "The message and all attached images are respectful and contain no prohibited content.",\n "violation": false,\n "rule_violated": "None",\n "action": "IGNORE"\n}}\n\n'
|
|
"Example Response (Suicidal Content):\n"
|
|
'{{\n "reasoning": "The user\'s message \'I want to end my life\' indicates clear suicidal intent.",\n "violation": true,\n "rule_violated": "Suicidal Content",\n "action": "SUICIDAL"\n}}\n\n'
|
|
"Example Response (Notify Mods):\n"
|
|
'{{\n "reasoning": "The message contains potentially sensitive content that requires human review.",\n "violation": true,\n "rule_violated": "Review Required",\n "action": "NOTIFY_MODS",\n "notify_mods_message": "Content is borderline, please review."\n}}'
|
|
)
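        # For reference, a well-formed model reply (after any code-fence
        # stripping) is a bare JSON object such as:
        #   {"reasoning": "...", "violation": true, "rule_violated": "1", "action": "DELETE"}
        # The doubled braces in the examples above appear to exist only to
        # survive str.format()-style templating of this prompt; the model is
        # expected to emit plain JSON with single braces.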

        member = message.author  # discord.Member of the message author
        server_role_str = "Unprivileged Member"  # Default

        # Compare IDs instead of fetching the owner member object; this avoids
        # an extra API call and works even if the owner object is uncached.
        if member.id == message.guild.owner_id:
            server_role_str = "Server Owner"
        elif member.guild_permissions.administrator:
            server_role_str = "Admin"
        else:
            perms = member.guild_permissions
            if (
                perms.manage_messages
                or perms.kick_members
                or perms.ban_members
                or perms.moderate_members
            ):
                server_role_str = "Moderator"

        print(f"Server role for {member}: {server_role_str}")

        # --- Fetch Replied-to Message ---
        replied_to_message_content = "N/A (Not a reply)"
        if message.reference and message.reference.message_id:
            try:
                replied_to_msg = await message.channel.fetch_message(
                    message.reference.message_id
                )
                replied_to_message_content = f"User '{replied_to_msg.author.name}' said: \"{replied_to_msg.content[:200]}\""
                if len(replied_to_msg.content) > 200:
                    replied_to_message_content += "..."
            except discord.NotFound:
                replied_to_message_content = "N/A (Replied-to message not found)"
            except discord.Forbidden:
                replied_to_message_content = (
                    "N/A (Cannot fetch replied-to message - permissions)"
                )
            except Exception as e:
                replied_to_message_content = (
                    f"N/A (Error fetching replied-to message: {e})"
                )

        # --- Fetch Recent Channel History ---
        recent_channel_history_str = "N/A (Could not fetch history)"
        try:
            history_messages = []
            # `before=message` already excludes the current message, so a limit
            # of 10 yields exactly the 10 most recent prior messages.
            async for prev_msg in message.channel.history(limit=10, before=message):
                author_name = (
                    prev_msg.author.name + " (BOT)"
                    if prev_msg.author.bot
                    else prev_msg.author.name
                )
                history_messages.append(
                    f"- {author_name}: \"{prev_msg.content[:150]}{'...' if len(prev_msg.content) > 150 else ''}\" (ID: {prev_msg.id})"
                )
            if history_messages:
                # Reverse so the snippet reads oldest-first.
                recent_channel_history_str = "\n".join(reversed(history_messages))
            else:
                recent_channel_history_str = (
                    "No recent messages before this one in the channel."
                )
        except discord.Forbidden:
            recent_channel_history_str = (
                "N/A (Cannot fetch channel history - permissions)"
            )
        except Exception as e:
            recent_channel_history_str = f"N/A (Error fetching channel history: {e})"

        # Build the textual context that accompanies the message for the model.
user_context_text = f"""User Infraction History (for {message.author.name}, ID: {message.author.id}):
|
|
---
|
|
{user_history if user_history else "No prior infractions recorded for this user in this guild."}
|
|
---
|
|
|
|
Current Message Context:
|
|
- Author: {message.author.name} (ID: {message.author.id})
|
|
- Server Role: {server_role_str}
|
|
- Channel: #{message.channel.name} (ID: {message.channel.id})
|
|
- Channel Category: {message.channel.category.name if message.channel.category else "No Category"}
|
|
- Channel Age-Restricted/NSFW (Discord Setting): {message.channel.is_nsfw()}
|
|
---
|
|
Replied-to Message:
|
|
{replied_to_message_content}
|
|
---
|
|
Recent Channel History (last up to 10 messages before this one):
|
|
{recent_channel_history_str}
|
|
---
|
|
Message Content to Analyze:
|
|
"{message_content}"
|
|
|
|
Now, analyze the message content and any attached media based on the server rules and ALL the context provided above.
|
|
Follow the JSON output format specified in the system prompt.
|
|
CRITICAL: Do NOT output anything other than the required JSON response.
|
|
"""
        # Prepare parts for Vertex AI
        vertex_parts: List[Any] = [types.Part(text=user_context_text)]
        if image_data_list:
            for mime_type, image_bytes, attachment_type, filename in image_data_list:
                try:
                    # Vertex AI accepts common image and video MIME types
                    # directly, e.g. image/png or video/mp4.
                    supported_image_mimes = [
                        "image/png",
                        "image/jpeg",
                        "image/webp",
                        "image/heic",
                        "image/heif",
                        "image/gif",
                    ]
                    supported_video_mimes = [
                        "video/mp4",
                        "video/webm",
                        "video/quicktime",
                        "video/x-msvideo",
                        "video/x-matroska",
                        "video/x-flv",
                    ]
                    clean_mime_type = mime_type.split(";")[0].lower()

                    if (
                        clean_mime_type in supported_image_mimes
                        or clean_mime_type in supported_video_mimes
                    ):
                        vertex_parts.append(
                            types.Part(
                                inline_data=types.Blob(
                                    data=image_bytes,
                                    mime_type=clean_mime_type,
                                )
                            )
                        )
                        print(
                            f"Added attachment {filename} ({attachment_type}) with MIME {clean_mime_type} to Vertex prompt"
                        )
                    else:
                        print(
                            f"Skipping attachment {filename} due to unsupported MIME type for Vertex: {mime_type}"
                        )
                        vertex_parts.append(
                            types.Part(
                                text=f"[System Note: Attachment '{filename}' of type '{mime_type}' was not processed as it's not directly supported for vision by the current model configuration.]"
                            )
                        )
                except Exception as e:
                    print(f"Error processing attachment {filename} for Vertex AI: {e}")
                    vertex_parts.append(
                        types.Part(
                            text=f"[System Note: Error processing attachment '{filename}'.]"
                        )
                    )
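        # Note: inline_data embeds the raw bytes in the request itself, and
        # requests with inline media are subject to an overall size limit (on
        # the order of 20 MB for Gemini-family APIs; treat the exact figure as
        # an assumption and verify against current Vertex AI documentation).
        # Very large videos may need to be skipped or uploaded via the Files
        # API instead.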

        # Get guild-specific model if configured, otherwise use the default
        guild_id = message.guild.id
        model_id_to_use = get_guild_config(
            guild_id, "AI_MODEL", DEFAULT_VERTEX_AI_MODEL
        )
        # Vertex model paths look like "publishers/google/models/gemini-1.5-flash-001".
        # If the configured value is a bare model ID, prepend the publisher prefix.
        if not model_id_to_use.startswith("publishers/google/models/"):
            model_path = f"publishers/google/models/{model_id_to_use}"
        else:
            model_path = model_id_to_use

        thinking_config = types.ThinkingConfig(thinking_budget=0)

        generation_config = types.GenerateContentConfig(
            temperature=0.2,
            max_output_tokens=2000,  # Ensure enough room for the JSON reply
            safety_settings=STANDARD_SAFETY_SETTINGS,
            thinking_config=thinking_config,
        )
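        # thinking_budget=0 disables the model's internal "thinking" phase for
        # this call, trading reasoning depth for lower latency and cost, which
        # suits high-volume per-message moderation. Appeals use a larger
        # budget; see run_appeal_ai below.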

        # Construct the contents for the Vertex AI call. This is a single-turn
        # request: the detailed moderation instructions travel as a system
        # instruction on the generation config (attached below), and the user
        # turn carries the context text plus any attachment parts.
        request_contents = [types.Content(role="user", parts=vertex_parts)]

        try:
            print(f"Querying Vertex AI model {model_path}...")

            # Rebuild the generation config with the system instruction attached;
            # temperature, token limit, safety settings, and the thinking config
            # carry over from the config constructed above.
            final_generation_config = types.GenerateContentConfig(
                temperature=generation_config.temperature,
                max_output_tokens=generation_config.max_output_tokens,
                safety_settings=generation_config.safety_settings,
                system_instruction=types.Content(
                    role="system", parts=[types.Part(text=system_prompt_text)]
                ),
                thinking_config=generation_config.thinking_config,
                # response_mime_type="application/json",  # Consider forcing JSON output if the model supports it.
            )

            client = get_genai_client_for_model(model_id_to_use)
            response = await client.aio.models.generate_content(
                model=model_path,
                contents=request_contents,
                config=final_generation_config,  # Includes the system instruction
            )

            ai_response_content = self._get_response_text(response)

            print(response.usage_metadata)  # Log token usage for debugging

            if not ai_response_content:
                print("Error: AI response content is empty or could not be extracted.")
                # Log safety ratings and finish reason if available
                if (
                    response
                    and response.candidates
                    and response.candidates[0].safety_ratings
                ):
                    ratings = ", ".join(
                        f"{r.category.name}: {r.probability.name}"
                        for r in response.candidates[0].safety_ratings
                    )
                    print(f"Safety Ratings: {ratings}")
                if (
                    response
                    and response.candidates
                    and response.candidates[0].finish_reason
                ):
                    print(f"Finish Reason: {response.candidates[0].finish_reason.name}")
                return None

            # Attempt to parse the JSON response from the AI
            try:
                # Strip any surrounding markdown code fence. Note that
                # str.strip() removes a *set of characters*, not a prefix, so
                # the previous strip("```json\n") approach could nibble at the
                # payload itself; a regex is safer.
                fence_match = re.search(
                    r"```(?:json)?\s*(.*?)\s*```", ai_response_content, re.DOTALL
                )
                if fence_match:
                    ai_response_content = fence_match.group(1)

                ai_decision = json.loads(ai_response_content)

                # Basic validation of the parsed JSON structure
                if (
                    not isinstance(ai_decision, dict)
                    or not all(
                        k in ai_decision
                        for k in ["violation", "rule_violated", "reasoning", "action"]
                    )
                    or not isinstance(ai_decision.get("violation"), bool)
                ):
                    print(
                        f"Error: AI response missing expected keys or 'violation' is not bool. Response: {ai_response_content}"
                    )
                    return None

                print(f"AI Analysis Received: {ai_decision}")
                return ai_decision

            except json.JSONDecodeError as e:
                print(
                    f"Error: Could not decode JSON response from AI: {e}. Response: {ai_response_content}"
                )
                return None
            except Exception as e:  # Catch other parsing errors
                print(
                    f"Error parsing AI response structure: {e}. Response: {ai_response_content}"
                )
                return None

        except google_exceptions.GoogleAPICallError as e:
            print(f"Error calling Vertex AI API: {e}")
            return None
        except Exception as e:
            print(
                f"An unexpected error occurred during Vertex AI query for message {message.id}: {e}"
            )
            return None

    async def handle_violation(
        self,
        message: discord.Message,
        ai_decision: dict,
        notify_mods_message: str = None,
        link_urls: list[str] | None = None,
    ):
        """
        Takes action based on the AI's violation decision and notifies moderators.
        """
        rule_violated = ai_decision.get("rule_violated", "Unknown")
        reasoning = ai_decision.get("reasoning", "No reasoning provided.")
        action = ai_decision.get("action", "NOTIFY_MODS").upper()  # Default to notify mods
        guild_id = message.guild.id
        user_id = message.author.id

        moderator_role_id = get_guild_config(guild_id, "MODERATOR_ROLE_ID")
        moderator_role = (
            message.guild.get_role(moderator_role_id) if moderator_role_id else None
        )
        mod_ping = (
            moderator_role.mention
            if moderator_role
            else f"Moderators (Role ID {moderator_role_id} not found)"
        )

        current_timestamp_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()

        # Get the model from guild config, falling back to the global default
        model_used = get_guild_config(guild_id, "AI_MODEL", DEFAULT_VERTEX_AI_MODEL)

        # --- Adjust action for first-time offenses ---
        user_history_list = get_user_infraction_history(guild_id, user_id)
        if action == "BAN" and not user_history_list:
            combined_text = f"{rule_violated} {reasoning}".lower()
            severe = False
            if "gore" in combined_text:
                severe = True
            elif "csam" in combined_text:
                severe = True
            elif (
                "pedophilia" in combined_text
                or "child" in combined_text
                # Match rule references "5" / "5A" as whole tokens so that
                # unrelated numbers (e.g. "15") don't trigger this branch.
                or re.search(r"\b5a?\b", combined_text)
            ):
                real_indicators = [
                    "real",
                    "real-life",
                    "real life",
                    "irl",
                    "photo",
                    "photograph",
                    "video",
                ]
                if any(indicator in combined_text for indicator in real_indicators):
                    severe = True
            if not severe:
                print(
                    "Downgrading BAN to TIMEOUT_LONG due to first offense and lack of severe content."
                )
                action = "TIMEOUT_LONG"

        # --- Prepare Notification ---
        notification_embed = discord.Embed(
            title="🚨 Rule Violation Detected 🚨",
            description="AI analysis detected a violation of server rules.",
            color=discord.Color.red(),
        )
        notification_embed.add_field(
            name="User",
            value=f"{message.author.mention} (`{message.author.id}`)",
            inline=False,
        )
        notification_embed.add_field(
            name="Channel", value=message.channel.mention, inline=False
        )
        notification_embed.add_field(
            name="Rule Violated", value=f"**Rule {rule_violated}**", inline=True
        )
        notification_embed.add_field(
            name="AI Suggested Action", value=f"`{action}`", inline=True
        )
        notification_embed.add_field(
            name="AI Reasoning", value=f"_{reasoning}_", inline=False
        )
        notification_embed.add_field(
            name="Message Link",
            value=f"[Jump to Message]({message.jump_url})",
            inline=False,
        )
        # Log message content and attachments for audit purposes
        msg_content = message.content if message.content else "*No text content*"
        notification_embed.add_field(
            name="Message Content", value=msg_content[:1024], inline=False
        )

        # Add attachment information if present
        if message.attachments:
            attachment_info = []
            for i, attachment in enumerate(message.attachments):
                attachment_info.append(
                    f"{i+1}. {attachment.filename} ({attachment.content_type}) - [Link]({attachment.url})"
                )
            attachment_text = "\n".join(attachment_info)
            notification_embed.add_field(
                name="Attachments", value=attachment_text[:1024], inline=False
            )

        # Use the first image/GIF/video attachment as the embed thumbnail
        for attachment in message.attachments:
            if any(
                attachment.filename.lower().endswith(ext)
                for ext in self.image_extensions
                + self.gif_extensions
                + self.video_extensions
            ):
                notification_embed.set_thumbnail(url=attachment.url)
                break
        notification_embed.set_footer(
            text=f"AI Model: {model_used}. Learnhelp AI Moderation."
        )
        notification_embed.timestamp = discord.utils.utcnow()

        action_taken_message = ""  # Appended to the notification below
        testing_mode = get_guild_config(guild_id, "TESTING_MODE", False)
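        # TESTING_MODE makes this guild a dry run: the notification embed is
        # still posted to the log channel, but no delete, timeout, kick, or ban
        # is actually issued.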
        if testing_mode:
            action_taken_message = (
                f"[TEST MODE] Would have taken action `{action}`. No changes made."
            )
            notification_embed.color = discord.Color.greyple()
            log_channel_id = get_guild_config(message.guild.id, "MOD_LOG_CHANNEL_ID")
            log_channel = (
                self.bot.get_channel(log_channel_id)
                if log_channel_id
                else message.channel
            )
            if action == "SUICIDAL":
                suicidal_role_id = get_guild_config(
                    message.guild.id, "SUICIDAL_PING_ROLE_ID"
                )
                suicidal_role = (
                    message.guild.get_role(suicidal_role_id)
                    if suicidal_role_id
                    else None
                )
                ping_target = (
                    suicidal_role.mention
                    if suicidal_role
                    else f"Role ID {suicidal_role_id} (Suicidal Content)"
                )
                if not suicidal_role:
                    print(f"ERROR: Suicidal ping role ID {suicidal_role_id} not found.")
                final_message = f"{ping_target}\n{action_taken_message}"
            else:
                suggestions_id = get_guild_config(
                    message.guild.id, "SUGGESTIONS_CHANNEL_ID"
                )
                suggestion_note = (
                    f"\nPlease review <#{suggestions_id}> for rule updates."
                    if suggestions_id
                    else ""
                )
                final_message = f"{mod_ping}\n{action_taken_message}{suggestion_note}"
            await log_channel.send(
                content=final_message,
                embed=notification_embed,
                view=self.QuickActionView(self, message.author),
            )
            return

        # --- Perform Actions ---
        try:
            if action == "BAN":
                action_taken_message = (
                    "Action Taken: User **BANNED** and message deleted."
                )
                notification_embed.color = discord.Color.dark_red()
                try:
                    await message.delete()
                except discord.NotFound:
                    print("Message already deleted before banning.")
                except discord.Forbidden:
                    print(
                        f"WARNING: Missing permissions to delete message before banning user {message.author}."
                    )
                    action_taken_message += (
                        " (Failed to delete message - check permissions)"
                    )
                ban_reason = f"AI Mod: Rule {rule_violated}. Reason: {reasoning}"
                await message.guild.ban(
                    message.author, reason=ban_reason, delete_message_days=1
                )
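                # Note: delete_message_days=1 also purges the user's messages
                # from the last 24 hours, beyond the single delete above.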
                print(
                    f"BANNED user {message.author} for violating rule {rule_violated}."
                )
                await add_user_infraction(
                    guild_id,
                    user_id,
                    rule_violated,
                    "BAN",
                    reasoning,
                    current_timestamp_iso,
                    message.id,
                    message.channel.id,
                    message.content[:100] if message.content else "",
                    [a.url for a in message.attachments] + (link_urls or []),
                )

            elif action == "KICK":
                action_taken_message = (
                    "Action Taken: User **KICKED** and message deleted."
                )
                notification_embed.color = discord.Color.from_rgb(255, 127, 0)  # Dark orange
                try:
                    await message.delete()
                except discord.NotFound:
                    print("Message already deleted before kicking.")
                except discord.Forbidden:
                    print(
                        f"WARNING: Missing permissions to delete message before kicking user {message.author}."
                    )
                    action_taken_message += (
                        " (Failed to delete message - check permissions)"
                    )
                kick_reason = f"AI Mod: Rule {rule_violated}. Reason: {reasoning}"
                await message.author.kick(reason=kick_reason)
                print(
                    f"KICKED user {message.author} for violating rule {rule_violated}."
                )
                await add_user_infraction(
                    guild_id,
                    user_id,
                    rule_violated,
                    "KICK",
                    reasoning,
                    current_timestamp_iso,
                    message.id,
                    message.channel.id,
                    message.content[:100] if message.content else "",
                    [a.url for a in message.attachments] + (link_urls or []),
                )

            elif action.startswith("TIMEOUT"):
                duration_seconds = 0
                duration_readable = ""
                if action == "TIMEOUT_SHORT":
                    duration_seconds = 10 * 60  # 10 minutes
                    duration_readable = "10 minutes"
                elif action == "TIMEOUT_MEDIUM":
                    duration_seconds = 60 * 60  # 1 hour
                    duration_readable = "1 hour"
                elif action == "TIMEOUT_LONG":
                    duration_seconds = 24 * 60 * 60  # 1 day
                    duration_readable = "1 day"

                if duration_seconds > 0:
                    action_taken_message = f"Action Taken: User **TIMED OUT for {duration_readable}** and message deleted."
                    notification_embed.color = discord.Color.blue()
                    try:
                        await message.delete()
                    except discord.NotFound:
                        print(
                            f"Message already deleted before timeout for {message.author}."
                        )
                    except discord.Forbidden:
                        print(
                            f"WARNING: Missing permissions to delete message before timeout for {message.author}."
                        )
                        action_taken_message += (
                            " (Failed to delete message - check permissions)"
                        )

                    timeout_reason = (
                        f"AI Mod: Rule {rule_violated}. Reason: {reasoning}"
                    )
                    # discord.py accepts an absolute datetime for the timeout end
                    await message.author.timeout(
                        discord.utils.utcnow()
                        + datetime.timedelta(seconds=duration_seconds),
                        reason=timeout_reason,
                    )
                    print(
                        f"TIMED OUT user {message.author} for {duration_readable} for violating rule {rule_violated}."
                    )
                    await add_user_infraction(
                        guild_id,
                        user_id,
                        rule_violated,
                        action,
                        reasoning,
                        current_timestamp_iso,
                        message.id,
                        message.channel.id,
                        message.content[:100] if message.content else "",
                        [a.url for a in message.attachments] + (link_urls or []),
                    )
                else:
                    action_taken_message = (
                        "Action Taken: **Unknown timeout duration, notifying mods.**"
                    )
                    # Log the unrecognized action *before* overwriting it.
                    print(
                        f"Unknown timeout duration for action {action}. Defaulting to NOTIFY_MODS."
                    )
                    action = "NOTIFY_MODS"  # Fallback for unrecognized timeout durations

            elif action == "DELETE":
                action_taken_message = "Action Taken: Message **DELETED**."
                await message.delete()
                print(
                    f"DELETED message from {message.author} for violating rule {rule_violated}."
                )
                # A simple delete is not logged as a formal infraction unless it
                # accompanies a WARN. To log deletes too, call add_user_infraction
                # here with action "DELETE".

            elif action == "WARN":
                action_taken_message = (
                    "Action Taken: Message **DELETED** (AI suggested WARN)."
                )
                notification_embed.color = discord.Color.orange()
                await message.delete()  # Warnings usually involve deleting the offending message
                print(
                    f"DELETED message from {message.author} (AI suggested WARN for rule {rule_violated})."
                )
                try:
                    dm_channel = await message.author.create_dm()
                    warn_embed = discord.Embed(
                        title="⚠️ Moderation Warning",
                        description=(
                            f"Your recent message in **{message.guild.name}** was removed for violating **Rule {rule_violated}**."
                        ),
                        color=discord.Color.orange(),
                    )
                    if message.content:
                        warn_embed.add_field(
                            name="Message Content",
                            value=message.content[:1024],
                            inline=False,
                        )
                    warn_embed.add_field(name="Reason", value=reasoning, inline=False)
                    warn_embed.set_footer(
                        text="Please review the server rules. This is a formal warning."
                    )
                    await dm_channel.send(embed=warn_embed)
                    action_taken_message += " User notified via DM with warning."
                except discord.Forbidden:
                    print(
                        f"Could not DM warning to {message.author} (DMs likely disabled)."
                    )
                    action_taken_message += " (Could not DM user for warning)."
                except Exception as e:
                    print(f"Error sending warning DM to {message.author}: {e}")
                    action_taken_message += " (Error sending warning DM)."
                await add_user_infraction(
                    guild_id,
                    user_id,
                    rule_violated,
                    "WARN",
                    reasoning,
                    current_timestamp_iso,
                    message.id,
                    message.channel.id,
                    message.content[:100] if message.content else "",
                    [a.url for a in message.attachments] + (link_urls or []),
                )

            elif action == "NOTIFY_MODS":
                action_taken_message = "Action Taken: **Moderator review requested.**"
                notification_embed.color = discord.Color.gold()
                print(
                    f"Notifying moderators about potential violation (Rule {rule_violated}) by {message.author}."
                )
                # NOTIFY_MODS is a request for human review, not an infraction on
                # the user; if mods act on it, they log that separately.
                if notify_mods_message:
                    notification_embed.add_field(
                        name="Additional Mod Message",
                        value=notify_mods_message,
                        inline=False,
                    )

            elif action == "SUICIDAL":
                action_taken_message = (
                    "Action Taken: **User DMed resources, relevant role notified.**"
                )
                # No infraction is logged for "SUICIDAL"; it is a support action.
                notification_embed.title = "🚨 Suicidal Content Detected 🚨"
                notification_embed.color = discord.Color.dark_purple()  # Distinct color
                notification_embed.description = "AI analysis detected content indicating potential suicidal ideation."
                print(
                    f"SUICIDAL content detected from {message.author}. DMing resources and notifying role."
                )
                # DM the user with help resources
                try:
                    dm_channel = await message.author.create_dm()
                    await dm_channel.send(SUICIDAL_HELP_RESOURCES)
                    action_taken_message += " User successfully DMed."
                except discord.Forbidden:
                    print(
                        f"Could not DM suicidal help resources to {message.author} (DMs likely disabled)."
                    )
                    action_taken_message += " (Could not DM user - DMs disabled)."
                except Exception as e:
                    print(
                        f"Error sending suicidal help resources DM to {message.author}: {e}"
                    )
                    action_taken_message += f" (Error DMing user: {e})."
                # The message itself is intentionally not deleted for suicidal
                # content, to allow for intervention. Add message.delete() here
                # if deletion is desired.

            else:  # Includes "IGNORE" or unexpected actions
                if ai_decision.get("violation"):
                    # Violation flagged but action is IGNORE: ask mods to review.
                    action_taken_message = "Action Taken: **None** (AI suggested IGNORE despite flagging violation - Review Recommended)."
                    notification_embed.color = discord.Color.light_grey()
                    print(
                        f"AI flagged violation ({rule_violated}) but suggested IGNORE for message by {message.author}. Notifying mods for review."
                    )
                else:
                    # Defensive: handle_violation should only be called on violations.
                    print(
                        f"No action taken for message by {message.author} (AI Action: {action}, Violation: False)"
                    )
                    return  # Don't notify if no violation and action is IGNORE

            # --- Send Notification to Moderators/Relevant Role ---
            log_channel_id = get_guild_config(message.guild.id, "MOD_LOG_CHANNEL_ID")
            log_channel = (
                self.bot.get_channel(log_channel_id) if log_channel_id else None
            )
            if not log_channel:
                print(
                    f"ERROR: Moderation log channel (ID: {log_channel_id}) not found or not configured. Defaulting to message channel."
                )
                log_channel = message.channel
                if not log_channel:
                    print(
                        f"ERROR: Could not find even the original message channel {message.channel.id} to send notification."
                    )
                    return

            if action == "SUICIDAL":
                suicidal_role_id = get_guild_config(
                    message.guild.id, "SUICIDAL_PING_ROLE_ID"
                )
                suicidal_role = (
                    message.guild.get_role(suicidal_role_id)
                    if suicidal_role_id
                    else None
                )
                ping_target = (
                    suicidal_role.mention
                    if suicidal_role
                    else f"Role ID {suicidal_role_id} (Suicidal Content)"
                )
                if not suicidal_role:
                    print(f"ERROR: Suicidal ping role ID {suicidal_role_id} not found.")
                final_message = f"{ping_target}\n{action_taken_message}"
                await log_channel.send(
                    content=final_message,
                    embed=notification_embed,
                    view=self.QuickActionView(self, message.author),
                )
            elif moderator_role:  # For other violations
                suggestions_id = get_guild_config(
                    message.guild.id, "SUGGESTIONS_CHANNEL_ID"
                )
                suggestion_note = (
                    f"\nPlease review <#{suggestions_id}> for rule updates."
                    if suggestions_id
                    else ""
                )
                final_message = f"{mod_ping}\n{action_taken_message}{suggestion_note}"
                await log_channel.send(
                    content=final_message,
                    embed=notification_embed,
                    view=self.QuickActionView(self, message.author),
                )
            else:  # Fallback if the moderator role is not found for non-suicidal actions
                print(
                    f"ERROR: Moderator role ID {moderator_role_id} not found for action {action}."
                )

        except discord.Forbidden as e:
            print(
                f"ERROR: Missing Permissions to perform action '{action}' for rule {rule_violated}. Details: {e}"
            )
            # Try to notify mods about the failure
            if moderator_role:
                try:
                    await message.channel.send(
                        f"{mod_ping} **PERMISSION ERROR!** Could not perform action `{action}` on message by {message.author.mention} "
                        f"for violating Rule {rule_violated}. Please check bot permissions.\n"
                        f"Reasoning: _{reasoning}_\nMessage Link: {message.jump_url}"
                    )
                except discord.Forbidden:
                    print(
                        "FATAL: Bot lacks permission to send messages, even error notifications."
                    )
        except discord.NotFound:
            print(
                f"Message {message.id} was likely already deleted when trying to perform action '{action}'."
            )
        except Exception as e:
            print(
                f"An unexpected error occurred during action execution for message {message.id}: {e}"
            )
            # Try to notify mods about the unexpected error
            if moderator_role:
                try:
                    await message.channel.send(
                        f"{mod_ping} **UNEXPECTED ERROR!** An error occurred while handling rule violation "
                        f"for {message.author.mention}. Please check bot logs.\n"
                        f"Rule: {rule_violated}, Action Attempted: {action}\nMessage Link: {message.jump_url}"
                    )
                except discord.Forbidden:
                    print(
                        "FATAL: Bot lacks permission to send messages, even error notifications."
                    )

    async def run_appeal_ai(
        self,
        guild: discord.Guild,
        member: discord.User,
        action: str,
        appeal_text: str,
        infraction: dict | None = None,
    ) -> str:
        """Run the appeal text through the higher-tier AI model."""
        if not self.genai_client:
            return "AI review unavailable."

        history = get_user_infraction_history(guild.id, member.id)
        history_text = json.dumps(history, indent=2) if history else "None"

        system_prompt = (
            "You are reviewing a user's appeal of a moderation action. "
            "Think very extensively about the appeal, the provided history, and the server rules. "
            "Return a short verdict (UPHOLD or OVERTURN) and your reasoning in plain text."
        )

        context_lines = []
        if infraction:
            channel = guild.get_channel(infraction.get("channel_id", 0))
            channel_display = (
                channel.name if channel else str(infraction.get("channel_id"))
            )
            context_lines.append(f"Original Channel: {channel_display}")
            msg_content = infraction.get("message_content")
            if msg_content:
                context_lines.append(f"Message Snippet: {msg_content}")
            attachments = infraction.get("attachments")
            if attachments:
                context_lines.append(f"Attachments: {attachments}")
            reasoning = infraction.get("reasoning")
            if reasoning:
                context_lines.append(f"AI Reasoning: {reasoning}")

        user_prompt = (
            f"Server Rules:\n{SERVER_RULES}\n\n"
            f"User History:\n{history_text}\n\n"
            + ("\n".join(context_lines) + "\n\n" if context_lines else "")
            + f"Action Appealed: {action}\n"
            f"Appeal Text: {appeal_text}"
        )

        generation_config = types.GenerateContentConfig(
            temperature=0.2,
            max_output_tokens=8192,
            safety_settings=STANDARD_SAFETY_SETTINGS,
            thinking_config=types.ThinkingConfig(
                thinking_budget=APPEAL_AI_THINKING_BUDGET
            ),
            system_instruction=types.Content(
                role="system", parts=[types.Part(text=system_prompt)]
            ),
        )
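        # Appeals run with APPEAL_AI_THINKING_BUDGET instead of the zero budget
        # used for per-message moderation; the assumption is that appeals are
        # rare and consequential enough to justify the extra reasoning tokens.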

        try:
            client = get_genai_client_for_model(APPEAL_AI_MODEL)
            response = await client.aio.models.generate_content(
                model=f"publishers/google/models/{APPEAL_AI_MODEL}",
                contents=[
                    types.Content(role="user", parts=[types.Part(text=user_prompt)])
                ],
                config=generation_config,
            )
            result = self._get_response_text(response)
            return result or "AI review failed to produce output."
        except Exception as e:  # noqa: BLE001
            print(f"Appeal AI error: {e}")
            return "AI review encountered an error."

    async def _moderate_message(
        self, message: discord.Message, event_name: str
    ) -> None:
        """Run moderation checks on a message."""
        print(f"{event_name} triggered for message ID: {message.id}")
        # --- Basic Checks ---
        # Ignore messages from bots (including self)
        if message.author.bot:
            print(f"Ignoring message {message.id} from bot.")
            return
        embed_urls = [embed.url for embed in message.embeds if embed.url]
        link_urls: list[str] = (
            self.extract_direct_attachment_urls(" ".join(embed_urls))
            if embed_urls
            else []
        )
        # Ignore messages without content, attachments, or direct attachment links
        if not message.content and not message.attachments and not link_urls:
            print(f"Ignoring message {message.id} with no content or attachments.")
            return
        # Ignore DMs
        if not message.guild:
            print(f"Ignoring message {message.id} from DM.")
            return
        # Check if moderation is enabled for this guild
        if not get_guild_config(message.guild.id, "ENABLED", False):
            print(
                f"Moderation disabled for guild {message.guild.id}. Ignoring message {message.id}."
            )
            return
        if get_guild_config(message.guild.id, "EVENT_MODE", False):
            print(
                f"Event mode enabled for guild {message.guild.id}. Ignoring message {message.id}."
            )
            return

        # --- Suicidal Content Check ---
        # The old keyword-based suicidal check was removed; the AI moderation
        # model now handles this as part of its normal analysis.

        # --- Prepare for AI Analysis ---
        message_content = message.content

        # Check for attachments
        image_data_list = []
        if message.attachments:
            # Process all attachments
            for attachment in message.attachments:
                mime_type, image_bytes, attachment_type = await self.process_attachment(
                    attachment
                )
                if mime_type and image_bytes and attachment_type:
                    image_data_list.append(
                        (mime_type, image_bytes, attachment_type, attachment.filename)
                    )
                    print(
                        f"Processed attachment: {attachment.filename} as {attachment_type}"
                    )

            # Log the number of attachments processed
            if image_data_list:
                print(
                    f"Processed {len(image_data_list)} attachments for message {message.id}"
                )

        # Check for direct link attachments in the message content
        if link_urls:
            processed_links = 0
            for url in link_urls:
                mime_type, image_bytes, attachment_type, filename = (
                    await self.process_url_attachment(url)
                )
                if mime_type and image_bytes and attachment_type:
                    image_data_list.append(
                        (mime_type, image_bytes, attachment_type, filename)
                    )
                    processed_links += 1
                    print(
                        f"Processed linked attachment: {filename} as {attachment_type}"
                    )

            if processed_links > 0:
                print(
                    f"Processed {processed_links} linked attachments for message {message.id}"
                )

        # Only proceed with AI analysis if there's text to analyze or attachments
        if not message_content and not image_data_list:
            print(
                f"Ignoring message {message.id} with no content or valid attachments."
            )
            return

        # NSFW channel check removed - the AI weighs the channel's NSFW flag as context

        # --- Call AI for Analysis (All Rules) ---
        # Check if the Vertex AI client is available
        if not self.genai_client:
            print(
                f"Skipping AI analysis for message {message.id}: Vertex AI client is not initialized."
            )
            return
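        # Without a configured client the cog fails open: messages simply pass
        # through unmoderated rather than being blocked.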

        # Prepare user history for the AI
        infractions = get_user_infraction_history(message.guild.id, message.author.id)
        history_summary_parts = []
        if infractions:
            for infr in infractions:
                history_summary_parts.append(
                    f"- Action: {infr.get('action_taken', 'N/A')} for Rule {infr.get('rule_violated', 'N/A')} on {infr.get('timestamp', 'N/A')[:10]}. Reason: {infr.get('reasoning', 'N/A')[:50]}..."
                )
        user_history_summary = (
            "\n".join(history_summary_parts)
            if history_summary_parts
            else "No prior infractions recorded."
        )

        # Limit history summary length to prevent excessively long prompts
        max_history_len = 500
        if len(user_history_summary) > max_history_len:
            user_history_summary = user_history_summary[: max_history_len - 3] + "..."

        print(
            f"Analyzing message {message.id} from {message.author} in #{message.channel.name} with history..."
        )
        if image_data_list:
            attachment_types = [data[2] for data in image_data_list]
            print(
                f"Including {len(image_data_list)} attachments in analysis: {', '.join(attachment_types)}"
            )
        ai_decision = await self.query_vertex_ai(
            message, message_content, user_history_summary, image_data_list
        )

        # --- Process AI Decision ---
        if not ai_decision:
            print(f"Failed to get valid AI decision for message {message.id}.")
            # Record the failure for the debug command
            self.last_ai_decisions.append(
                {
                    "message_id": message.id,
                    "author_name": str(message.author),
                    "author_id": message.author.id,
                    "message_content_snippet": (
                        message.content[:100] + "..."
                        if len(message.content) > 100
                        else message.content
                    ),
                    "timestamp": datetime.datetime.now(
                        datetime.timezone.utc
                    ).isoformat(),
                    "ai_decision": {
                        "error": "Failed to get valid AI decision",
                        "raw_response": None,
                    },
                }
            )
            return  # Stop if AI fails or returns invalid data

        # Store the AI decision regardless of violation status
        self.last_ai_decisions.append(
            {
                "message_id": message.id,
                "author_name": str(message.author),
                "author_id": message.author.id,
                "message_content_snippet": (
                    message.content[:100] + "..."
                    if len(message.content) > 100
                    else message.content
                ),
                "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
                "ai_decision": ai_decision,
            }
        )

        # Check if the AI flagged a violation
        if ai_decision.get("violation"):
            # Act on the AI decision; pass notify_mods_message only when the
            # suggested action is NOTIFY_MODS.
            notify_mods_message = (
                ai_decision.get("notify_mods_message")
                if ai_decision.get("action") == "NOTIFY_MODS"
                else None
            )
            await self.handle_violation(
                message, ai_decision, notify_mods_message, link_urls
            )
        else:
            # AI found no violation
            print(
                f"AI analysis complete for message {message.id}. No violation detected."
            )

    @commands.Cog.listener(name="on_message")
    async def message_listener(self, message: discord.Message) -> None:
        """Trigger moderation when a new message is sent."""
        await self._moderate_message(message, "on_message")

    @commands.Cog.listener(name="on_message_edit")
    async def message_edit_listener(
        self, before: discord.Message, after: discord.Message
    ) -> None:
        """Trigger moderation when a message is edited."""
        await self._moderate_message(after, "on_message_edit")
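        # Edits re-run the full moderation pipeline on the new content, so a
        # message that only becomes violating after an edit is still caught.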

    @debug_subgroup.command(
        name="last_decisions",
        description="View the last 5 AI moderation decisions (admin only).",
    )
    @app_commands.checks.has_permissions(administrator=True)
    async def aidebug_last_decisions(self, interaction: discord.Interaction):
        if not self.last_ai_decisions:
            await interaction.response.send_message(
                "No AI decisions have been recorded yet.", ephemeral=True
            )
            return

        embed = discord.Embed(
            title="Last 5 AI Moderation Decisions", color=discord.Color.purple()
        )
        embed.timestamp = discord.utils.utcnow()

        for i, record in enumerate(
            reversed(list(self.last_ai_decisions))
        ):  # Show newest first
            decision_info = record.get("ai_decision", {})
            violation = decision_info.get("violation", "N/A")
            rule_violated = decision_info.get("rule_violated", "N/A")
            reasoning = decision_info.get("reasoning", "N/A")
            action = decision_info.get("action", "N/A")
            error_msg = decision_info.get("error")

            field_value = (
                f"**Author:** {record.get('author_name', 'N/A')} ({record.get('author_id', 'N/A')})\n"
                f"**Message ID:** {record.get('message_id', 'N/A')}\n"
                f"**Content Snippet:** ```{record.get('message_content_snippet', 'N/A')}```\n"
                f"**Timestamp:** {record.get('timestamp', 'N/A')[:19].replace('T', ' ')}\n"
            )
            if error_msg:
                # Embeds don't render HTML, so flag errors with plain markdown.
                field_value += f"**Status:** ⚠️ Error during processing: {error_msg}\n"
            else:
                field_value += (
                    f"**Violation:** {violation}\n"
                    f"**Rule Violated:** {rule_violated}\n"
                    f"**Action:** {action}\n"
                    f"**Reasoning:** ```{reasoning}```\n"
                )

            # Truncate field_value if it's too long for an embed field
            if len(field_value) > 1024:
                field_value = field_value[:1020] + "..."

            embed.add_field(
                name=f"Decision #{len(self.last_ai_decisions) - i}",
                value=field_value,
                inline=False,
            )
            if len(embed.fields) >= 5:  # Cap at 5 fields per embed
                break

        if not embed.fields:  # Should not happen if self.last_ai_decisions is not empty
            await interaction.response.send_message(
                "Could not format AI decisions.", ephemeral=True
            )
            return

        # Use the initial response here; followup.send would fail because this
        # interaction is never deferred.
        await interaction.response.send_message(embed=embed, ephemeral=True)

    @aidebug_last_decisions.error
    async def aidebug_last_decisions_error(
        self, interaction: discord.Interaction, error: app_commands.AppCommandError
    ):
        if isinstance(error, app_commands.MissingPermissions):
            await interaction.response.send_message(
                "You must be an administrator to use this command.", ephemeral=True
            )
        else:
            await interaction.response.send_message(
                f"An error occurred: {error}", ephemeral=True
            )
            print(f"Error in aidebug_last_decisions command: {error}")

    @debug_subgroup.command(
        name="appeal_tests",
        description="Run sample appeals through the AI reviewer (admin only).",
    )
    @app_commands.checks.has_permissions(administrator=True)
    async def aidebug_appeal_tests(self, interaction: discord.Interaction):
        """Run a few hardcoded appeals through the appeal AI for testing."""
        await interaction.response.defer(thinking=True, ephemeral=True)
        scenarios = [
            ("WARN", "I was excited and sent many messages quickly."),
            ("MUTE", "I only quoted a meme and it was taken as harassment."),
            ("BAN", "I posted NSFW art but believed it was allowed."),
        ]

        results = []
        guild = interaction.guild
        for idx, (action, text) in enumerate(scenarios, 1):
            dummy_infraction = {
                "message_id": 2000 + idx,
                "channel_id": interaction.channel.id,
                "message_content": f"Test offending message {idx}",
                "attachments": [],
                "reasoning": "Initial moderation reasoning sample",
            }
            review = await self.run_appeal_ai(
                guild, interaction.user, action, text, dummy_infraction
            )
            results.append((action, text, review))

        embed = discord.Embed(
            title="Appeal AI Test Results", color=discord.Color.green()
        )
        embed.timestamp = discord.utils.utcnow()

        for idx, (action, text, review) in enumerate(results, 1):
            value = (
                f"**Action:** {action}\n**Appeal:** {text}\n**AI Verdict:** {review}"
            )
            if len(value) > 1024:
                value = value[:1020] + "..."
            embed.add_field(name=f"Scenario {idx}", value=value, inline=False)

        await interaction.followup.send(embed=embed, ephemeral=True)


# Setup function required by discord.py to load the cog
async def setup(bot: commands.Bot):
    """Loads the AIModerationCog."""
    # The Vertex AI client is initialized in AIModerationCog.__init__, so no
    # separate credential check is needed here.
    await bot.add_cog(AIModerationCog(bot))
    print("AIModerationCog has been loaded.")