Implement new moderation features

This commit is contained in:
Codex 2025-06-06 16:24:40 +00:00 committed by Slipstream
parent 15ca534042
commit 996ef0bd21
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD

View File

@ -14,6 +14,7 @@ from PIL import Image # For image processing
import cv2 # For video processing
import numpy as np # For array operations
import tempfile # For temporary file operations
import shutil # For backing up files
from typing import Optional, List, Dict, Any, Tuple # For type hinting
import asyncio
import aiofiles
@ -58,6 +59,9 @@ MOD_LOG_API_SECRET_ENV_VAR = "MOD_LOG_API_SECRET"
GUILD_CONFIG_DIR = "data/" # Using the existing directory for all json data
GUILD_CONFIG_PATH = os.path.join(GUILD_CONFIG_DIR, "guild_config.json")
USER_INFRACTIONS_PATH = os.path.join(GUILD_CONFIG_DIR, "user_infractions.json")
INFRACTION_BACKUP_DIR = os.path.join(GUILD_CONFIG_DIR, "infraction_backups")
os.makedirs(INFRACTION_BACKUP_DIR, exist_ok=True)
os.makedirs(GUILD_CONFIG_DIR, exist_ok=True)
@ -261,8 +265,54 @@ class AIModerationCog(commands.Cog):
".mkv",
".flv",
] # Expanded list
self.backup_task = self.bot.loop.create_task(
self.backup_infractions_periodically()
)
print("AIModerationCog Initialized.")
class QuickActionView(discord.ui.View):
"""Buttons for quick moderator actions."""
def __init__(self, parent: "AIModerationCog", target: discord.Member):
super().__init__(timeout=3600)
self.parent = parent
self.target = target
@discord.ui.button(label="Escalate Ban", style=discord.ButtonStyle.danger)
async def escalate(
self, interaction: discord.Interaction, button: discord.ui.Button
):
if not interaction.user.guild_permissions.ban_members:
await interaction.response.send_message(
"You lack permission to ban members.", ephemeral=True
)
return
try:
await self.target.ban(reason="Escalated via mod panel")
await interaction.response.send_message(
f"Banned {self.target.mention}.", ephemeral=True
)
except Exception as e: # noqa: BLE001
await interaction.response.send_message(
f"Failed to ban: {e}", ephemeral=True
)
self.disable_all_items()
await interaction.message.edit(view=self)
@discord.ui.button(label="Ignore", style=discord.ButtonStyle.secondary)
async def ignore(
self, interaction: discord.Interaction, button: discord.ui.Button
):
if interaction.user.guild_permissions.manage_messages:
await interaction.message.delete()
await interaction.response.send_message(
"Notification dismissed.", ephemeral=True
)
else:
await interaction.response.send_message(
"No permission to manage messages.", ephemeral=True
)
async def cog_load(self):
"""Called when the cog is loaded."""
print("AIModerationCog cog_load started.")
@ -287,6 +337,22 @@ class AIModerationCog(commands.Cog):
# The genai.Client doesn't have an explicit close method in the same way aiohttp.ClientSession does.
# It typically manages its own resources.
print("AIModerationCog Unloaded.")
if self.backup_task:
self.backup_task.cancel()
async def backup_infractions_periodically(self):
"""Periodically back up the infractions file."""
await self.bot.wait_until_ready()
while not self.bot.is_closed():
timestamp = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S")
backup_path = os.path.join(
INFRACTION_BACKUP_DIR, f"user_infractions_{timestamp}.json"
)
try:
shutil.copy(USER_INFRACTIONS_PATH, backup_path)
except Exception as e: # noqa: BLE001
print(f"Failed to back up infractions: {e}")
await asyncio.sleep(24 * 60 * 60)
async def process_image(self, attachment: discord.Attachment) -> tuple[str, bytes]:
"""
@ -453,6 +519,34 @@ class AIModerationCog(commands.Cog):
parent=aimod_group,
)
@aimod_group.command(
name="sync",
description="Reload AI moderation configuration and infractions from disk.",
)
@app_commands.checks.has_permissions(administrator=True)
async def aimod_sync(self, interaction: discord.Interaction):
"""Reload configuration files from disk."""
try:
async with aiofiles.open(GUILD_CONFIG_PATH, "r", encoding="utf-8") as f:
data = await f.read()
async with CONFIG_LOCK:
global GUILD_CONFIG
GUILD_CONFIG = json.loads(data)
async with aiofiles.open(
USER_INFRACTIONS_PATH, "r", encoding="utf-8"
) as f2:
data2 = await f2.read()
async with CONFIG_LOCK:
global USER_INFRACTIONS
USER_INFRACTIONS = json.loads(data2)
await interaction.response.send_message(
"Configuration synced from disk.", ephemeral=True
)
except Exception as e: # noqa: BLE001
await interaction.response.send_message(
f"Failed to reload configuration: {e}", ephemeral=True
)
@config_subgroup.command(
name="log_channel", description="Set the moderation log channel."
)
@ -602,6 +696,23 @@ class AIModerationCog(commands.Cog):
ephemeral=False,
)
@config_subgroup.command(
name="event_mode",
description="Toggle temporary event mode for this guild.",
)
@app_commands.describe(enabled="Enable event mode (true/false)")
async def event_mode(self, interaction: discord.Interaction, enabled: bool):
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(
"You must be an administrator to use this command.", ephemeral=False
)
return
await set_guild_config(interaction.guild.id, "EVENT_MODE", enabled)
await interaction.response.send_message(
f"Event mode is now {'enabled' if enabled else 'disabled'}.",
ephemeral=False,
)
@infractions_subgroup.command(
name="view",
description="View a user's AI moderation infraction history (mod/admin only).",
@ -701,6 +812,61 @@ class AIModerationCog(commands.Cog):
ephemeral=False,
)
@infractions_subgroup.command(
name="leaderboard",
description="Show users with the fewest infractions.",
)
async def leaderboard(self, interaction: discord.Interaction):
guild_id = interaction.guild.id
counts = {}
for key, infractions in USER_INFRACTIONS.items():
if key.startswith(f"{guild_id}_"):
uid = int(key.split("_", 1)[1])
counts[uid] = len(infractions)
if not counts:
await interaction.response.send_message(
"No infractions recorded for this guild.", ephemeral=True
)
return
sorted_users = sorted(counts.items(), key=lambda x: x[1])[:5]
lines = []
for uid, count in sorted_users:
member = interaction.guild.get_member(uid)
name = member.display_name if member else f"ID:{uid}"
lines.append(f"**{name}** - {count} infractions")
embed = discord.Embed(
title="Best Behavior Leaderboard",
description="\n".join(lines),
color=discord.Color.green(),
)
await interaction.response.send_message(embed=embed, ephemeral=False)
@infractions_subgroup.command(
name="restore",
description="Restore infractions from the latest backup (admin only).",
)
@app_commands.checks.has_permissions(administrator=True)
async def restore_infractions(self, interaction: discord.Interaction):
backups = sorted(os.listdir(INFRACTION_BACKUP_DIR))
if not backups:
await interaction.response.send_message("No backups found.", ephemeral=True)
return
latest = os.path.join(INFRACTION_BACKUP_DIR, backups[-1])
try:
shutil.copy(latest, USER_INFRACTIONS_PATH)
async with aiofiles.open(USER_INFRACTIONS_PATH, "r", encoding="utf-8") as f:
data = await f.read()
async with CONFIG_LOCK:
global USER_INFRACTIONS
USER_INFRACTIONS = json.loads(data)
await interaction.response.send_message(
f"Infractions restored from {backups[-1]}", ephemeral=False
)
except Exception as e: # noqa: BLE001
await interaction.response.send_message(
f"Failed to restore infractions: {e}", ephemeral=True
)
@model_subgroup.command(
name="set", description="Change the AI model used for moderation (admin only)."
)
@ -1789,10 +1955,26 @@ CRITICAL: Do NOT output anything other than the required JSON response.
if not suicidal_role:
print(f"ERROR: Suicidal ping role ID {suicidal_role_id} not found.")
final_message = f"{ping_target}\n{action_taken_message}"
await log_channel.send(content=final_message, embed=notification_embed)
await log_channel.send(
content=final_message,
embed=notification_embed,
view=self.QuickActionView(self, message.author),
)
elif moderator_role: # For other violations
final_message = f"{mod_ping}\n{action_taken_message}"
await log_channel.send(content=final_message, embed=notification_embed)
suggestions_id = get_guild_config(
message.guild.id, "SUGGESTIONS_CHANNEL_ID"
)
suggestion_note = (
f"\nPlease review <#{suggestions_id}> for rule updates."
if suggestions_id
else ""
)
final_message = f"{mod_ping}\n{action_taken_message}{suggestion_note}"
await log_channel.send(
content=final_message,
embed=notification_embed,
view=self.QuickActionView(self, message.author),
)
else: # Fallback if moderator role is also not found for non-suicidal actions
print(
f"ERROR: Moderator role ID {moderator_role_id} not found for action {action}."
@ -1858,6 +2040,11 @@ CRITICAL: Do NOT output anything other than the required JSON response.
f"Moderation disabled for guild {message.guild.id}. Ignoring message {message.id}."
)
return
if get_guild_config(message.guild.id, "EVENT_MODE", False):
print(
f"Event mode enabled for guild {message.guild.id}. Ignoring message {message.id}."
)
return
# --- Suicidal Content Check ---
# Suicidal keyword check removed; handled by OpenRouter AI moderation.