discordbot/cogs/ban_system_cog.py
Slipstream 9085e85e8d
Fix: Improve error reporting and add ban system cog
- Enhance `send_error_embed_to_owner` to include more context like command name, user, server, channel, timestamp, and message content.
- Add a new cog for a ban system, including commands for banning and unbanning users.
2025-05-20 19:15:43 -06:00

305 lines
13 KiB
Python

import discord
from discord.ext import commands
from discord import app_commands
import logging
import asyncio
import datetime
from typing import Optional, List, Dict, Any, Tuple
import asyncpg
# Module-level logger named after this module (no handlers or levels are set here)
log = logging.getLogger(__name__)
class UserBannedError(commands.CheckFailure):
    """Check failure raised when a banned user tries to run a command.

    Both constructor arguments are kept on the instance so an error handler
    can identify the user and show them the configured ban message.
    """

    def __init__(self, user_id: int, message: str) -> None:
        # The base class receives the ban message as the exception text.
        super().__init__(message)
        # ID of the banned user who triggered the check.
        self.user_id = user_id
        # Message configured for this user at ban time.
        self.message = message
class BanSystemCog(commands.Cog):
    """Cog for banning specific users from using the bot.

    Bans are stored in the ``banned_users`` PostgreSQL table (through
    ``bot.pg_pool``) and mirrored in an in-memory cache that a global command
    check consults on every invocation. The owner-only ``/bansys`` group
    offers ``ban``, ``unban`` and ``list`` subcommands.
    """

    # Limits used by ``/bansys list`` so a page stays within Discord's embed
    # limits (25 fields and 6000 characters per embed).
    _LIST_FIELDS_PER_EMBED = 10
    _LIST_TEXT_LIMIT = 150

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        # user_id -> {reason, message, banned_at, banned_by}
        self.banned_users_cache: Dict[int, Dict[str, Any]] = {}
        # Create the main command group for this cog
        self.bansys_group = app_commands.Group(
            name="bansys",
            description="Bot user ban system commands (Owner only)"
        )
        # Register commands
        self.register_commands()
        # Add command group to the bot's tree
        self.bot.tree.add_command(self.bansys_group)
        log.info("BanSystemCog initialized with bansys command group.")
        # Setup database table when the cog is loaded. The task reference is
        # kept so it cannot be garbage-collected mid-run and so it can be
        # cancelled when the cog is unloaded.
        self._setup_task = self.bot.loop.create_task(self._setup_database())
        # Register the global check
        self.bot.add_check(self.check_if_user_banned)

    def cog_unload(self):
        """Undo everything ``__init__`` registered on the bot.

        Without this, unloading or reloading the extension would leave the
        global check installed and the ``bansys`` group in the command tree,
        so the next load would fail or register duplicates.
        """
        self._setup_task.cancel()
        self.bot.remove_check(self.check_if_user_banned)
        self.bot.tree.remove_command(self.bansys_group.name)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _get_pool(self):
        """Return the bot's PostgreSQL pool, or ``None`` when unavailable."""
        return getattr(self.bot, 'pg_pool', None)

    async def _describe_user(self, user_id: int) -> str:
        """Return ``"name (id)"`` for a user, or ``"User ID: id"`` if lookup fails."""
        # Prefer the local user cache; fall back to an API request.
        user = self.bot.get_user(user_id)
        if user is None:
            try:
                user = await self.bot.fetch_user(user_id)
            except discord.HTTPException:
                # Covers NotFound as well; the name is cosmetic, so degrade.
                return f"User ID: {user_id}"
        return f"{user.name} ({user_id})"

    @staticmethod
    def _truncate(text: str, limit: int) -> str:
        """Shorten ``text`` to at most ``limit`` characters, marking the cut."""
        if len(text) <= limit:
            return text
        return text[:limit - 1] + "…"

    # ------------------------------------------------------------------
    # Database / cache
    # ------------------------------------------------------------------

    async def _setup_database(self):
        """Create the banned_users table if it doesn't exist, then load the cache."""
        # Wait for the bot to be ready to ensure the database pool is available
        await self.bot.wait_until_ready()
        pool = self._get_pool()
        if pool is None:
            log.error("PostgreSQL pool not available. Ban system will not work properly.")
            return
        try:
            async with pool.acquire() as conn:
                await conn.execute("""
                    CREATE TABLE IF NOT EXISTS banned_users (
                        user_id BIGINT PRIMARY KEY,
                        reason TEXT,
                        message TEXT NOT NULL,
                        banned_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
                        banned_by BIGINT NOT NULL
                    );
                """)
            log.info("Created or verified banned_users table in PostgreSQL.")
            # Load banned users into cache
            await self._load_banned_users()
        except Exception as e:
            log.exception(f"Error setting up banned_users table: {e}")

    async def _load_banned_users(self):
        """Replace the cache contents with every row of ``banned_users``."""
        pool = self._get_pool()
        if pool is None:
            log.error("PostgreSQL pool not available. Cannot load banned users.")
            return
        try:
            async with pool.acquire() as conn:
                records = await conn.fetch("SELECT * FROM banned_users")
            loaded = {
                record['user_id']: {
                    'reason': record['reason'],
                    'message': record['message'],
                    'banned_at': record['banned_at'],
                    'banned_by': record['banned_by']
                }
                for record in records
            }
            # Swap the contents in place so existing references to the
            # cache dict stay valid.
            self.banned_users_cache.clear()
            self.banned_users_cache.update(loaded)
            log.info(f"Loaded {len(records)} banned users into cache.")
        except Exception as e:
            log.exception(f"Error loading banned users: {e}")

    # ------------------------------------------------------------------
    # Global check
    # ------------------------------------------------------------------

    async def check_if_user_banned(self, ctx):
        """Global check to prevent banned users from using commands.

        Returns ``True`` when the invoker is not in the ban cache.

        Raises:
            UserBannedError: the invoker is banned; carries the ban message.
        """
        # Let through anything that is neither a Context nor an
        # interaction-like object (identified by having a ``guild`` attribute).
        if not isinstance(ctx, commands.Context) and not hasattr(ctx, 'guild'):
            return True
        # Get the user ID
        user_id = ctx.author.id if isinstance(ctx, commands.Context) else ctx.user.id
        ban_info = self.banned_users_cache.get(user_id)
        if ban_info is not None:
            # Raise the custom exception with the ban message
            raise UserBannedError(user_id, ban_info['message'])
        # User is not banned, allow the command
        return True

    # ------------------------------------------------------------------
    # Command registration
    # ------------------------------------------------------------------

    def register_commands(self):
        """Register all commands for this cog"""
        # --- Ban User Command ---
        ban_command = app_commands.Command(
            name="ban",
            description="Ban a user from using the bot",
            callback=self.bansys_ban_callback,
            parent=self.bansys_group
        )
        app_commands.describe(
            user_id="The ID of the user to ban",
            message="The message to show when they try to use commands",
            reason="The reason for the ban (optional)",
            ephemeral="Whether the response should be ephemeral (only visible to the user)"
        )(ban_command)
        self.bansys_group.add_command(ban_command)
        # --- Unban User Command ---
        unban_command = app_commands.Command(
            name="unban",
            description="Unban a user from using the bot",
            callback=self.bansys_unban_callback,
            parent=self.bansys_group
        )
        app_commands.describe(
            user_id="The ID of the user to unban",
            ephemeral="Whether the response should be ephemeral (only visible to the user)"
        )(unban_command)
        self.bansys_group.add_command(unban_command)
        # --- List Banned Users Command ---
        list_command = app_commands.Command(
            name="list",
            description="List all users banned from using the bot",
            callback=self.bansys_list_callback,
            parent=self.bansys_group
        )
        app_commands.describe(
            ephemeral="Whether the response should be ephemeral (only visible to the user)"
        )(list_command)
        self.bansys_group.add_command(list_command)

    # ------------------------------------------------------------------
    # Command callbacks
    # ------------------------------------------------------------------

    async def bansys_ban_callback(self, interaction: discord.Interaction, user_id: str, message: str, reason: Optional[str] = None, ephemeral: bool = True):
        """Ban a user from using the bot.

        Args:
            interaction: The invoking interaction.
            user_id: ID of the user to ban, passed as a string.
            message: Text shown to the user when their commands are rejected.
            reason: Optional reason for the ban.
            ephemeral: Whether replies are visible only to the invoker.
        """
        # Acknowledge first: the owner lookup and user fetch below may take
        # longer than the 3 seconds Discord allows for an initial response.
        await interaction.response.defer(ephemeral=ephemeral)
        # bot.owner_id may be unset; is_owner resolves the owner(s) itself.
        if not await self.bot.is_owner(interaction.user):
            await interaction.followup.send("This command can only be used by the bot owner.", ephemeral=ephemeral)
            return
        try:
            user_id_int = int(user_id)
        except ValueError:
            await interaction.followup.send("Invalid user ID. Please provide a valid user ID.", ephemeral=ephemeral)
            return
        # Check if the user is already banned
        if user_id_int in self.banned_users_cache:
            await interaction.followup.send(f"User {user_id_int} is already banned.", ephemeral=ephemeral)
            return
        # Add the user to the database
        pool = self._get_pool()
        if pool is None:
            log.warning(f"PostgreSQL pool not available. Ban for user {user_id_int} is held in memory only.")
        else:
            try:
                async with pool.acquire() as conn:
                    await conn.execute("""
                        INSERT INTO banned_users (user_id, reason, message, banned_by)
                        VALUES ($1, $2, $3, $4)
                        ON CONFLICT (user_id) DO UPDATE
                        SET reason = $2, message = $3, banned_by = $4, banned_at = CURRENT_TIMESTAMP
                    """, user_id_int, reason, message, interaction.user.id)
            except Exception as e:
                log.exception(f"Error banning user {user_id}: {e}")
                await interaction.followup.send(f"An error occurred while banning the user: {e}", ephemeral=ephemeral)
                return
        # Add the user to the cache only after the database write succeeded
        self.banned_users_cache[user_id_int] = {
            'reason': reason,
            'message': message,
            'banned_at': datetime.datetime.now(datetime.timezone.utc),
            'banned_by': interaction.user.id
        }
        # Try to get the user's name for a more informative message
        user_display = await self._describe_user(user_id_int)
        await interaction.followup.send(f"✅ Banned {user_display} from using the bot.\nMessage: {message}\nReason: {reason or 'No reason provided'}", ephemeral=ephemeral)
        log.info(f"User {user_id_int} banned by {interaction.user.id}. Reason: {reason}")

    async def bansys_unban_callback(self, interaction: discord.Interaction, user_id: str, ephemeral: bool = True):
        """Unban a user from using the bot.

        Args:
            interaction: The invoking interaction.
            user_id: ID of the user to unban, passed as a string.
            ephemeral: Whether replies are visible only to the invoker.
        """
        # Acknowledge first so slow lookups cannot expire the interaction.
        await interaction.response.defer(ephemeral=ephemeral)
        if not await self.bot.is_owner(interaction.user):
            await interaction.followup.send("This command can only be used by the bot owner.", ephemeral=ephemeral)
            return
        try:
            user_id_int = int(user_id)
        except ValueError:
            await interaction.followup.send("Invalid user ID. Please provide a valid user ID.", ephemeral=ephemeral)
            return
        # Check if the user is banned
        if user_id_int not in self.banned_users_cache:
            await interaction.followup.send(f"User {user_id_int} is not banned.", ephemeral=ephemeral)
            return
        # Remove the user from the database
        pool = self._get_pool()
        if pool is None:
            log.warning(f"PostgreSQL pool not available. Unban for user {user_id_int} is applied in memory only.")
        else:
            try:
                async with pool.acquire() as conn:
                    await conn.execute("DELETE FROM banned_users WHERE user_id = $1", user_id_int)
            except Exception as e:
                log.exception(f"Error unbanning user {user_id}: {e}")
                await interaction.followup.send(f"An error occurred while unbanning the user: {e}", ephemeral=ephemeral)
                return
        # Remove the user from the cache; pop() tolerates a concurrent unban
        # that removed the entry while this one was awaiting the database.
        self.banned_users_cache.pop(user_id_int, None)
        # Try to get the user's name for a more informative message
        user_display = await self._describe_user(user_id_int)
        await interaction.followup.send(f"✅ Unbanned {user_display} from using the bot.", ephemeral=ephemeral)
        log.info(f"User {user_id_int} unbanned by {interaction.user.id}.")

    async def bansys_list_callback(self, interaction: discord.Interaction, ephemeral: bool = True):
        """List all users banned from using the bot.

        The list is sent as one embed per page of ``_LIST_FIELDS_PER_EMBED``
        bans, with reason and message text shortened to ``_LIST_TEXT_LIMIT``
        characters, so each embed stays within Discord's size limits.

        Args:
            interaction: The invoking interaction.
            ephemeral: Whether replies are visible only to the invoker.
        """
        # Acknowledge first: resolving names can need two API requests per ban.
        await interaction.response.defer(ephemeral=ephemeral)
        if not await self.bot.is_owner(interaction.user):
            await interaction.followup.send("This command can only be used by the bot owner.", ephemeral=ephemeral)
            return
        if not self.banned_users_cache:
            await interaction.followup.send("No users are currently banned.", ephemeral=ephemeral)
            return
        # Snapshot the cache: the awaits below yield to the event loop, and a
        # concurrent ban/unban would otherwise change the dict mid-iteration.
        entries = list(self.banned_users_cache.items())
        total = len(entries)
        page_size = self._LIST_FIELDS_PER_EMBED
        for start in range(0, total, page_size):
            # Create an embed to display this page of banned users
            embed = discord.Embed(
                title="Banned Users",
                description=f"Total banned users: {total}",
                color=discord.Color.red()
            )
            for user_id, ban_info in entries[start:start + page_size]:
                user_display = await self._describe_user(user_id)
                # Format the banned_at timestamp, normalised to UTC so the
                # "UTC" suffix is accurate.
                raw_banned_at = ban_info['banned_at']
                if isinstance(raw_banned_at, datetime.datetime):
                    banned_at = raw_banned_at.astimezone(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
                else:
                    banned_at = "Unknown"
                banner_display = await self._describe_user(ban_info['banned_by'])
                reason_text = self._truncate(ban_info['reason'] or 'No reason provided', self._LIST_TEXT_LIMIT)
                message_text = self._truncate(ban_info['message'], self._LIST_TEXT_LIMIT)
                # Add a field for this user
                embed.add_field(
                    name=user_display,
                    value=f"**Reason:** {reason_text}\n"
                          f"**Message:** {message_text}\n"
                          f"**Banned at:** {banned_at}\n"
                          f"**Banned by:** {banner_display}",
                    inline=False
                )
            await interaction.followup.send(embed=embed, ephemeral=ephemeral)
# Setup function for loading the cog
async def setup(bot):
    """Extension entry point: build a BanSystemCog and attach it to ``bot``."""
    cog = BanSystemCog(bot)
    await bot.add_cog(cog)
    print("BanSystemCog setup complete.")