This commit is contained in:
Slipstream 2025-05-03 17:24:13 -06:00
parent 28713740ae
commit fb2278e986
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD
6 changed files with 1165 additions and 12 deletions

View File

@ -499,6 +499,27 @@ class CommandPermission(BaseModel):
class CommandPermissionsResponse(BaseModel):
permissions: Dict[str, List[str]] # Command name -> List of allowed role IDs
class CommandCustomizationResponse(BaseModel):
command_customizations: Dict[str, str] = {} # Original command name -> Custom command name
group_customizations: Dict[str, str] = {} # Original group name -> Custom group name
command_aliases: Dict[str, List[str]] = {} # Original command name -> List of aliases
class CommandCustomizationUpdate(BaseModel):
command_name: str
custom_name: Optional[str] = None # If None, removes customization
class GroupCustomizationUpdate(BaseModel):
group_name: str
custom_name: Optional[str] = None # If None, removes customization
class CommandAliasAdd(BaseModel):
command_name: str
alias_name: str
class CommandAliasRemove(BaseModel):
command_name: str
alias_name: str
# --- Authentication Dependency (Dashboard Specific) ---
# Note: This uses session cookies set by the dashboard auth flow
async def get_dashboard_user(request: Request) -> dict:

View File

@ -11,10 +11,36 @@ from pydantic import BaseModel
# Import the dependencies from api_server.py
try:
# Try relative import first
from .api_server import get_dashboard_user, verify_dashboard_guild_admin
from .api_server import (
get_dashboard_user,
verify_dashboard_guild_admin,
CommandCustomizationResponse,
CommandCustomizationUpdate,
GroupCustomizationUpdate,
CommandAliasAdd,
CommandAliasRemove
)
except ImportError:
# Fall back to absolute import
from api_server import get_dashboard_user, verify_dashboard_guild_admin
from api_server import (
get_dashboard_user,
verify_dashboard_guild_admin,
CommandCustomizationResponse,
CommandCustomizationUpdate,
GroupCustomizationUpdate,
CommandAliasAdd,
CommandAliasRemove
)
# Import settings_manager for database access
try:
from discordbot import settings_manager
except ImportError:
# Try relative import
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
from discordbot import settings_manager
# Set up logging
log = logging.getLogger(__name__)
@ -118,3 +144,279 @@ async def get_guild_commands(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error getting commands: {str(e)}"
)
# --- Command Customization Endpoints ---
@router.get("/guilds/{guild_id}/command-customizations", response_model=CommandCustomizationResponse)
async def get_command_customizations(
guild_id: int,
_user: dict = Depends(get_dashboard_user),
_admin: bool = Depends(verify_dashboard_guild_admin)
):
"""Get all command customizations for a guild."""
try:
# Check if settings_manager is available
if not settings_manager or not settings_manager.pg_pool:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Settings manager not available"
)
# Get command customizations
command_customizations = await settings_manager.get_all_command_customizations(guild_id)
if command_customizations is None:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to get command customizations"
)
# Get group customizations
group_customizations = await settings_manager.get_all_group_customizations(guild_id)
if group_customizations is None:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to get group customizations"
)
# Get command aliases
command_aliases = await settings_manager.get_all_command_aliases(guild_id)
if command_aliases is None:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to get command aliases"
)
return CommandCustomizationResponse(
command_customizations=command_customizations,
group_customizations=group_customizations,
command_aliases=command_aliases
)
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
log.error(f"Error getting command customizations for guild {guild_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error getting command customizations: {str(e)}"
)
@router.post("/guilds/{guild_id}/command-customizations/commands", status_code=status.HTTP_200_OK)
async def set_command_customization(
guild_id: int,
customization: CommandCustomizationUpdate,
_user: dict = Depends(get_dashboard_user),
_admin: bool = Depends(verify_dashboard_guild_admin)
):
"""Set a custom name for a command in a guild."""
try:
# Check if settings_manager is available
if not settings_manager or not settings_manager.pg_pool:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Settings manager not available"
)
# Validate custom name format if provided
if customization.custom_name is not None:
if not customization.custom_name.islower() or not customization.custom_name.replace('_', '').isalnum():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Custom command names must be lowercase and contain only letters, numbers, and underscores"
)
if len(customization.custom_name) < 1 or len(customization.custom_name) > 32:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Custom command names must be between 1 and 32 characters long"
)
# Set the custom command name
success = await settings_manager.set_custom_command_name(
guild_id,
customization.command_name,
customization.custom_name
)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to set custom command name"
)
return {"message": "Command customization updated successfully"}
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
log.error(f"Error setting command customization for guild {guild_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error setting command customization: {str(e)}"
)
@router.post("/guilds/{guild_id}/command-customizations/groups", status_code=status.HTTP_200_OK)
async def set_group_customization(
guild_id: int,
customization: GroupCustomizationUpdate,
_user: dict = Depends(get_dashboard_user),
_admin: bool = Depends(verify_dashboard_guild_admin)
):
"""Set a custom name for a command group in a guild."""
try:
# Check if settings_manager is available
if not settings_manager or not settings_manager.pg_pool:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Settings manager not available"
)
# Validate custom name format if provided
if customization.custom_name is not None:
if not customization.custom_name.islower() or not customization.custom_name.replace('_', '').isalnum():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Custom group names must be lowercase and contain only letters, numbers, and underscores"
)
if len(customization.custom_name) < 1 or len(customization.custom_name) > 32:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Custom group names must be between 1 and 32 characters long"
)
# Set the custom group name
success = await settings_manager.set_custom_group_name(
guild_id,
customization.group_name,
customization.custom_name
)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to set custom group name"
)
return {"message": "Group customization updated successfully"}
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
log.error(f"Error setting group customization for guild {guild_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error setting group customization: {str(e)}"
)
@router.post("/guilds/{guild_id}/command-customizations/aliases", status_code=status.HTTP_200_OK)
async def add_command_alias(
guild_id: int,
alias: CommandAliasAdd,
_user: dict = Depends(get_dashboard_user),
_admin: bool = Depends(verify_dashboard_guild_admin)
):
"""Add an alias for a command in a guild."""
try:
# Check if settings_manager is available
if not settings_manager or not settings_manager.pg_pool:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Settings manager not available"
)
# Validate alias format
if not alias.alias_name.islower() or not alias.alias_name.replace('_', '').isalnum():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Aliases must be lowercase and contain only letters, numbers, and underscores"
)
if len(alias.alias_name) < 1 or len(alias.alias_name) > 32:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Aliases must be between 1 and 32 characters long"
)
# Add the command alias
success = await settings_manager.add_command_alias(
guild_id,
alias.command_name,
alias.alias_name
)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to add command alias"
)
return {"message": "Command alias added successfully"}
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
log.error(f"Error adding command alias for guild {guild_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error adding command alias: {str(e)}"
)
@router.delete("/guilds/{guild_id}/command-customizations/aliases", status_code=status.HTTP_200_OK)
async def remove_command_alias(
guild_id: int,
alias: CommandAliasRemove,
_user: dict = Depends(get_dashboard_user),
_admin: bool = Depends(verify_dashboard_guild_admin)
):
"""Remove an alias for a command in a guild."""
try:
# Check if settings_manager is available
if not settings_manager or not settings_manager.pg_pool:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Settings manager not available"
)
# Remove the command alias
success = await settings_manager.remove_command_alias(
guild_id,
alias.command_name,
alias.alias_name
)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to remove command alias"
)
return {"message": "Command alias removed successfully"}
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
log.error(f"Error removing command alias for guild {guild_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error removing command alias: {str(e)}"
)
@router.post("/guilds/{guild_id}/sync-commands", status_code=status.HTTP_200_OK)
async def sync_guild_commands(
guild_id: int,
_user: dict = Depends(get_dashboard_user),
_admin: bool = Depends(verify_dashboard_guild_admin)
):
"""Sync commands for a guild to apply customizations."""
try:
# This endpoint would trigger a command sync for the guild
# In a real implementation, this would communicate with the bot to sync commands
# For now, we'll just return a success message
return {"message": "Command sync requested. This may take a moment to complete."}
except Exception as e:
log.error(f"Error syncing commands for guild {guild_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error syncing commands: {str(e)}"
)

View File

@ -2,10 +2,13 @@ import discord
from discord.ext import commands
import logging
from discordbot import settings_manager # Assuming settings_manager is accessible
from typing import Optional
log = logging.getLogger(__name__)
# CORE_COGS definition moved to main.py
# Get CORE_COGS from bot instance
def get_core_cogs(bot):
return getattr(bot, 'core_cogs', {'SettingsCog', 'HelpCog'})
class SettingsCog(commands.Cog, name="Settings"):
"""Commands for server administrators to configure the bot."""
@ -65,7 +68,8 @@ class SettingsCog(commands.Cog, name="Settings"):
await ctx.send(f"Error: Cog `{cog_name}` not found.")
return
if cog_name in CORE_COGS:
core_cogs = get_core_cogs(self.bot)
if cog_name in core_cogs:
await ctx.send(f"Error: Core cog `{cog_name}` cannot be disabled/enabled.")
return
@ -88,7 +92,8 @@ class SettingsCog(commands.Cog, name="Settings"):
await ctx.send(f"Error: Cog `{cog_name}` not found.")
return
if cog_name in CORE_COGS:
core_cogs = get_core_cogs(self.bot)
if cog_name in core_cogs:
await ctx.send(f"Error: Core cog `{cog_name}` cannot be disabled.")
return
@ -114,8 +119,8 @@ class SettingsCog(commands.Cog, name="Settings"):
embed = discord.Embed(title="Available Modules (Cogs)", color=discord.Color.blue())
lines = []
# Use the CORE_COGS defined at the top of this file
core_cogs_list = CORE_COGS
# Get core cogs from bot instance
core_cogs_list = get_core_cogs(self.bot)
for cog_name in sorted(self.bot.cogs.keys()):
is_enabled = await settings_manager.is_cog_enabled(guild_id, cog_name, default_enabled=default_behavior)
@ -172,6 +177,223 @@ class SettingsCog(commands.Cog, name="Settings"):
await ctx.send(f"Failed to remove permission for command `{command_name}`. Check logs.")
log.error(f"Failed to remove permission for command '{command_name}', role {role_id} in guild {guild_id}")
# --- Command Customization Management ---
@commands.command(name='setcmdname', help="Sets a custom name for a slash command in this server. Usage: `setcmdname <original_name> <custom_name>`")
@commands.has_permissions(administrator=True)
@commands.guild_only()
async def set_command_name(self, ctx: commands.Context, original_name: str, custom_name: str):
"""Sets a custom name for a slash command in the current guild."""
# Validate the original command exists
command_found = False
for cmd in self.bot.tree.get_commands():
if cmd.name == original_name:
command_found = True
break
if not command_found:
await ctx.send(f"Error: Slash command `{original_name}` not found.")
return
# Validate custom name format (Discord has restrictions on command names)
if not custom_name.islower() or not custom_name.replace('_', '').isalnum():
await ctx.send("Error: Custom command names must be lowercase and contain only letters, numbers, and underscores.")
return
if len(custom_name) < 1 or len(custom_name) > 32:
await ctx.send("Error: Custom command names must be between 1 and 32 characters long.")
return
guild_id = ctx.guild.id
success = await settings_manager.set_custom_command_name(guild_id, original_name, custom_name)
if success:
await ctx.send(f"Command `{original_name}` will now appear as `{custom_name}` in this server.\n"
f"Note: You'll need to restart the bot or use `/sync` for changes to take effect.")
log.info(f"Custom command name set for '{original_name}' to '{custom_name}' in guild {guild_id} by {ctx.author.name}")
else:
await ctx.send(f"Failed to set custom command name. Check logs.")
log.error(f"Failed to set custom command name for '{original_name}' in guild {guild_id}")
@commands.command(name='resetcmdname', help="Resets a slash command to its original name. Usage: `resetcmdname <original_name>`")
@commands.has_permissions(administrator=True)
@commands.guild_only()
async def reset_command_name(self, ctx: commands.Context, original_name: str):
"""Resets a slash command to its original name in the current guild."""
guild_id = ctx.guild.id
success = await settings_manager.set_custom_command_name(guild_id, original_name, None)
if success:
await ctx.send(f"Command `{original_name}` has been reset to its original name in this server.\n"
f"Note: You'll need to restart the bot or use `/sync` for changes to take effect.")
log.info(f"Custom command name reset for '{original_name}' in guild {guild_id} by {ctx.author.name}")
else:
await ctx.send(f"Failed to reset command name. Check logs.")
log.error(f"Failed to reset command name for '{original_name}' in guild {guild_id}")
@commands.command(name='setgroupname', help="Sets a custom name for a command group. Usage: `setgroupname <original_name> <custom_name>`")
@commands.has_permissions(administrator=True)
@commands.guild_only()
async def set_group_name(self, ctx: commands.Context, original_name: str, custom_name: str):
"""Sets a custom name for a command group in the current guild."""
# Validate the original group exists
group_found = False
for cmd in self.bot.tree.get_commands():
if hasattr(cmd, 'parent') and cmd.parent and cmd.parent.name == original_name:
group_found = True
break
if not group_found:
await ctx.send(f"Error: Command group `{original_name}` not found.")
return
# Validate custom name format (Discord has restrictions on command names)
if not custom_name.islower() or not custom_name.replace('_', '').isalnum():
await ctx.send("Error: Custom group names must be lowercase and contain only letters, numbers, and underscores.")
return
if len(custom_name) < 1 or len(custom_name) > 32:
await ctx.send("Error: Custom group names must be between 1 and 32 characters long.")
return
guild_id = ctx.guild.id
success = await settings_manager.set_custom_group_name(guild_id, original_name, custom_name)
if success:
await ctx.send(f"Command group `{original_name}` will now appear as `{custom_name}` in this server.\n"
f"Note: You'll need to restart the bot or use `/sync` for changes to take effect.")
log.info(f"Custom group name set for '{original_name}' to '{custom_name}' in guild {guild_id} by {ctx.author.name}")
else:
await ctx.send(f"Failed to set custom group name. Check logs.")
log.error(f"Failed to set custom group name for '{original_name}' in guild {guild_id}")
@commands.command(name='resetgroupname', help="Resets a command group to its original name. Usage: `resetgroupname <original_name>`")
@commands.has_permissions(administrator=True)
@commands.guild_only()
async def reset_group_name(self, ctx: commands.Context, original_name: str):
"""Resets a command group to its original name in the current guild."""
guild_id = ctx.guild.id
success = await settings_manager.set_custom_group_name(guild_id, original_name, None)
if success:
await ctx.send(f"Command group `{original_name}` has been reset to its original name in this server.\n"
f"Note: You'll need to restart the bot or use `/sync` for changes to take effect.")
log.info(f"Custom group name reset for '{original_name}' in guild {guild_id} by {ctx.author.name}")
else:
await ctx.send(f"Failed to reset group name. Check logs.")
log.error(f"Failed to reset group name for '{original_name}' in guild {guild_id}")
@commands.command(name='addcmdalias', help="Adds an alias for a command. Usage: `addcmdalias <original_name> <alias_name>`")
@commands.has_permissions(administrator=True)
@commands.guild_only()
async def add_command_alias(self, ctx: commands.Context, original_name: str, alias_name: str):
"""Adds an alias for a command in the current guild."""
# Validate the original command exists
command = self.bot.get_command(original_name)
if not command:
await ctx.send(f"Error: Command `{original_name}` not found.")
return
# Validate alias format
if not alias_name.islower() or not alias_name.replace('_', '').isalnum():
await ctx.send("Error: Aliases must be lowercase and contain only letters, numbers, and underscores.")
return
if len(alias_name) < 1 or len(alias_name) > 32:
await ctx.send("Error: Aliases must be between 1 and 32 characters long.")
return
guild_id = ctx.guild.id
success = await settings_manager.add_command_alias(guild_id, original_name, alias_name)
if success:
await ctx.send(f"Added alias `{alias_name}` for command `{original_name}` in this server.")
log.info(f"Command alias added for '{original_name}': '{alias_name}' in guild {guild_id} by {ctx.author.name}")
else:
await ctx.send(f"Failed to add command alias. Check logs.")
log.error(f"Failed to add command alias for '{original_name}' in guild {guild_id}")
@commands.command(name='removecmdalias', help="Removes an alias for a command. Usage: `removecmdalias <original_name> <alias_name>`")
@commands.has_permissions(administrator=True)
@commands.guild_only()
async def remove_command_alias(self, ctx: commands.Context, original_name: str, alias_name: str):
"""Removes an alias for a command in the current guild."""
guild_id = ctx.guild.id
success = await settings_manager.remove_command_alias(guild_id, original_name, alias_name)
if success:
await ctx.send(f"Removed alias `{alias_name}` for command `{original_name}` in this server.")
log.info(f"Command alias removed for '{original_name}': '{alias_name}' in guild {guild_id} by {ctx.author.name}")
else:
await ctx.send(f"Failed to remove command alias. Check logs.")
log.error(f"Failed to remove command alias for '{original_name}' in guild {guild_id}")
@commands.command(name='listcmdaliases', help="Lists all command aliases for this server.")
@commands.guild_only()
async def list_command_aliases(self, ctx: commands.Context):
"""Lists all command aliases for the current guild."""
guild_id = ctx.guild.id
aliases_dict = await settings_manager.get_all_command_aliases(guild_id)
if aliases_dict is None:
await ctx.send("Failed to retrieve command aliases. Check logs.")
return
if not aliases_dict:
await ctx.send("No command aliases are set for this server.")
return
embed = discord.Embed(title="Command Aliases", color=discord.Color.blue())
for cmd_name, aliases in aliases_dict.items():
embed.add_field(name=f"Command: {cmd_name}", value=", ".join([f"`{alias}`" for alias in aliases]), inline=False)
await ctx.send(embed=embed)
@commands.command(name='listcustomcmds', help="Lists all custom command names for this server.")
@commands.guild_only()
async def list_custom_commands(self, ctx: commands.Context):
"""Lists all custom command names for the current guild."""
guild_id = ctx.guild.id
cmd_customizations = await settings_manager.get_all_command_customizations(guild_id)
group_customizations = await settings_manager.get_all_group_customizations(guild_id)
if cmd_customizations is None or group_customizations is None:
await ctx.send("Failed to retrieve command customizations. Check logs.")
return
if not cmd_customizations and not group_customizations:
await ctx.send("No command customizations are set for this server.")
return
embed = discord.Embed(title="Command Customizations", color=discord.Color.blue())
if cmd_customizations:
cmd_text = "\n".join([f"`{orig}` → `{custom}`" for orig, custom in cmd_customizations.items()])
embed.add_field(name="Custom Command Names", value=cmd_text, inline=False)
if group_customizations:
group_text = "\n".join([f"`{orig}` → `{custom}`" for orig, custom in group_customizations.items()])
embed.add_field(name="Custom Group Names", value=group_text, inline=False)
await ctx.send(embed=embed)
@commands.command(name='synccmds', help="Syncs slash commands with Discord to apply customizations.")
@commands.has_permissions(administrator=True)
@commands.guild_only()
async def sync_commands(self, ctx: commands.Context):
"""Syncs slash commands with Discord to apply customizations."""
try:
guild = ctx.guild
await ctx.send("Syncing commands with Discord... This may take a moment.")
# Sync commands for this guild specifically
synced = await self.bot.tree.sync(guild=guild)
await ctx.send(f"Successfully synced {len(synced)} commands for this server.")
log.info(f"Commands synced for guild {guild.id} by {ctx.author.name}")
except Exception as e:
await ctx.send(f"Failed to sync commands: {str(e)}")
log.error(f"Failed to sync commands for guild {ctx.guild.id}: {e}")
# TODO: Add command to list permissions?
# --- Error Handling for this Cog ---
@ -180,6 +402,13 @@ class SettingsCog(commands.Cog, name="Settings"):
@disable_cog.error
@allow_command.error
@disallow_command.error
@set_command_name.error
@reset_command_name.error
@set_group_name.error
@reset_group_name.error
@add_command_alias.error
@remove_command_alias.error
@sync_commands.error
async def on_command_error(self, ctx: commands.Context, error):
if isinstance(error, commands.MissingPermissions):
await ctx.send("You need Administrator permissions to use this command.")

201
command_customization.py Normal file
View File

@ -0,0 +1,201 @@
"""
Command customization utilities for Discord bot.
Handles guild-specific command names and groups.
"""
import discord
from discord import app_commands
import logging
from typing import Dict, List, Optional, Tuple, Any, Callable, Awaitable
import asyncio
from . import settings_manager
log = logging.getLogger(__name__)
class GuildCommandTransformer(app_commands.Transformer):
"""
A transformer that customizes command names based on guild settings.
This is used to transform command names when they are displayed to users.
"""
async def transform(self, interaction: discord.Interaction, value: str) -> str:
"""Transform the command name based on guild settings."""
if not interaction.guild:
return value # No customization in DMs
guild_id = interaction.guild.id
custom_name = await settings_manager.get_custom_command_name(guild_id, value)
return custom_name if custom_name else value
class GuildCommandSyncer:
"""
Handles syncing commands with guild-specific customizations.
"""
def __init__(self, bot):
self.bot = bot
self._command_cache = {} # Cache of original commands
self._customized_commands = {} # Guild ID -> {original_name: custom_command}
async def load_guild_customizations(self, guild_id: int) -> Dict[str, str]:
"""
Load command customizations for a specific guild.
Returns a dictionary mapping original command names to custom names.
"""
cmd_customizations = await settings_manager.get_all_command_customizations(guild_id)
group_customizations = await settings_manager.get_all_group_customizations(guild_id)
if cmd_customizations is None or group_customizations is None:
log.error(f"Failed to load command customizations for guild {guild_id}")
return {}
# Combine command and group customizations
customizations = {**cmd_customizations, **group_customizations}
log.info(f"Loaded {len(customizations)} command customizations for guild {guild_id}")
return customizations
async def prepare_guild_commands(self, guild_id: int) -> List[app_commands.Command]:
"""
Prepare guild-specific commands with customized names.
Returns a list of commands with guild-specific customizations applied.
"""
# Get all global commands
global_commands = self.bot.tree.get_commands()
# Cache original commands if not already cached
if not self._command_cache:
self._command_cache = {cmd.name: cmd for cmd in global_commands}
# Load customizations for this guild
customizations = await self.load_guild_customizations(guild_id)
if not customizations:
return global_commands # No customizations, use global commands
# Create guild-specific commands with custom names
guild_commands = []
for cmd in global_commands:
if cmd.name in customizations:
# Create a copy of the command with the custom name
custom_name = customizations[cmd.name]
custom_cmd = self._create_custom_command(cmd, custom_name)
guild_commands.append(custom_cmd)
else:
# Use the original command
guild_commands.append(cmd)
# Store customized commands for this guild
self._customized_commands[guild_id] = {
cmd.name: custom_cmd for cmd, custom_cmd in zip(global_commands, guild_commands)
if cmd.name in customizations
}
return guild_commands
def _create_custom_command(self, original_cmd: app_commands.Command, custom_name: str) -> app_commands.Command:
"""
Create a copy of a command with a custom name.
This is a simplified version - in practice, you'd need to handle all command attributes.
"""
# For simplicity, we're just creating a basic copy with the custom name
# In a real implementation, you'd need to handle all command attributes and options
custom_cmd = app_commands.Command(
name=custom_name,
description=original_cmd.description,
callback=original_cmd.callback
)
# Copy options, if any
if hasattr(original_cmd, 'options'):
custom_cmd._params = original_cmd._params.copy()
return custom_cmd
async def sync_guild_commands(self, guild: discord.Guild) -> List[app_commands.Command]:
"""
Sync commands for a specific guild with customizations.
Returns the list of synced commands.
"""
try:
# Prepare guild-specific commands
guild_commands = await self.prepare_guild_commands(guild.id)
# Sync commands with Discord
synced = await self.bot.tree.sync(guild=guild)
log.info(f"Synced {len(synced)} commands for guild {guild.id}")
return synced
except Exception as e:
log.error(f"Failed to sync commands for guild {guild.id}: {e}")
raise
# Command registration decorator with guild customization support
def guild_command(name: str, description: str, **kwargs):
"""
Decorator for registering commands with guild-specific name customization.
Usage:
@guild_command(name="mycommand", description="My command description")
async def my_command(interaction: discord.Interaction):
...
"""
def decorator(func: Callable[[discord.Interaction, ...], Awaitable[Any]]):
# Create the app command
@app_commands.command(name=name, description=description, **kwargs)
async def wrapper(interaction: discord.Interaction, *args, **kwargs):
return await func(interaction, *args, **kwargs)
# Store the original name for reference
wrapper.__original_name__ = name
return wrapper
return decorator
# Command group with guild customization support
class GuildCommandGroup(app_commands.Group):
"""
A command group that supports guild-specific name customization.
Usage:
my_group = GuildCommandGroup(name="mygroup", description="My group description")
@my_group.command(name="subcommand", description="Subcommand description")
async def my_subcommand(interaction: discord.Interaction):
...
"""
def __init__(self, name: str, description: str, **kwargs):
super().__init__(name=name, description=description, **kwargs)
self.__original_name__ = name
async def get_guild_name(self, guild_id: int) -> str:
"""Get the guild-specific name for this group."""
custom_name = await settings_manager.get_custom_group_name(guild_id, self.__original_name__)
return custom_name if custom_name else self.__original_name__
# Utility functions for command registration
async def register_guild_commands(bot, guild: discord.Guild) -> List[app_commands.Command]:
"""
Register commands for a specific guild with customizations.
Returns the list of registered commands.
"""
syncer = GuildCommandSyncer(bot)
return await syncer.sync_guild_commands(guild)
async def register_all_guild_commands(bot) -> Dict[int, List[app_commands.Command]]:
"""
Register commands for all guilds with customizations.
Returns a dictionary mapping guild IDs to lists of registered commands.
"""
syncer = GuildCommandSyncer(bot)
results = {}
for guild in bot.guilds:
try:
results[guild.id] = await syncer.sync_guild_commands(guild)
except Exception as e:
log.error(f"Failed to sync commands for guild {guild.id}: {e}")
results[guild.id] = []
return results

26
main.py
View File

@ -13,6 +13,7 @@ from commands import load_all_cogs, reload_all_cogs
from error_handler import handle_error, patch_discord_methods, store_interaction_content
from utils import reload_script
import settings_manager # Import the settings manager
import command_customization # Import command customization utilities
# Import the unified API service runner and the sync API module
import sys
@ -124,13 +125,20 @@ async def on_ready():
commands_before = [cmd.name for cmd in bot.tree.get_commands()]
print(f"Commands before sync: {commands_before}")
# Perform sync
synced = await bot.tree.sync()
print(f"Synced {len(synced)} command(s)")
# Sync global commands first
synced_global = await bot.tree.sync()
print(f"Synced {len(synced_global)} global command(s)")
# Now sync guild-specific commands with customizations
print("Syncing guild-specific command customizations...")
guild_syncs = await command_customization.register_all_guild_commands(bot)
total_guild_syncs = sum(len(cmds) for cmds in guild_syncs.values())
print(f"Synced commands for {len(guild_syncs)} guilds with a total of {total_guild_syncs} customized commands")
# List commands after sync
commands_after = [cmd.name for cmd in bot.tree.get_commands()]
print(f"Commands after sync: {commands_after}")
print(f"Global commands after sync: {commands_after}")
except Exception as e:
print(f"Failed to sync commands: {e}")
@ -148,13 +156,21 @@ async def on_shard_disconnect(shard_id):
@bot.event
async def on_guild_join(guild: discord.Guild):
"""Adds guild to database when bot joins."""
"""Adds guild to database when bot joins and syncs commands."""
log.info(f"Joined guild: {guild.name} ({guild.id})")
if settings_manager and settings_manager.pg_pool:
try:
async with settings_manager.pg_pool.acquire() as conn:
await conn.execute("INSERT INTO guilds (guild_id) VALUES ($1) ON CONFLICT DO NOTHING;", guild.id)
log.info(f"Added guild {guild.id} to database.")
# Sync commands for the new guild
try:
log.info(f"Syncing commands for new guild: {guild.name} ({guild.id})")
synced = await command_customization.register_guild_commands(bot, guild)
log.info(f"Synced {len(synced)} commands for guild {guild.id}")
except Exception as e:
log.exception(f"Failed to sync commands for new guild {guild.id}: {e}")
except Exception as e:
log.exception(f"Failed to add guild {guild.id} to database on join.")
else:

View File

@ -116,10 +116,47 @@ async def initialize_database():
FOREIGN KEY (guild_id) REFERENCES guilds(guild_id) ON DELETE CASCADE
);
""")
# Command Customization table - Stores guild-specific command names
await conn.execute("""
CREATE TABLE IF NOT EXISTS command_customization (
guild_id BIGINT NOT NULL,
original_command_name TEXT NOT NULL,
custom_command_name TEXT NOT NULL,
PRIMARY KEY (guild_id, original_command_name),
FOREIGN KEY (guild_id) REFERENCES guilds(guild_id) ON DELETE CASCADE
);
""")
# Command Group Customization table - Stores guild-specific command group names
await conn.execute("""
CREATE TABLE IF NOT EXISTS command_group_customization (
guild_id BIGINT NOT NULL,
original_group_name TEXT NOT NULL,
custom_group_name TEXT NOT NULL,
PRIMARY KEY (guild_id, original_group_name),
FOREIGN KEY (guild_id) REFERENCES guilds(guild_id) ON DELETE CASCADE
);
""")
# Command Aliases table - Stores additional aliases for commands
await conn.execute("""
CREATE TABLE IF NOT EXISTS command_aliases (
guild_id BIGINT NOT NULL,
original_command_name TEXT NOT NULL,
alias_name TEXT NOT NULL,
PRIMARY KEY (guild_id, original_command_name, alias_name),
FOREIGN KEY (guild_id) REFERENCES guilds(guild_id) ON DELETE CASCADE
);
""")
# Consider adding indexes later for performance on large tables
# await conn.execute("CREATE INDEX IF NOT EXISTS idx_guild_settings_guild ON guild_settings (guild_id);")
# await conn.execute("CREATE INDEX IF NOT EXISTS idx_enabled_cogs_guild ON enabled_cogs (guild_id);")
# await conn.execute("CREATE INDEX IF NOT EXISTS idx_command_permissions_guild ON command_permissions (guild_id);")
# await conn.execute("CREATE INDEX IF NOT EXISTS idx_command_customization_guild ON command_customization (guild_id);")
# await conn.execute("CREATE INDEX IF NOT EXISTS idx_command_group_customization_guild ON command_group_customization (guild_id);")
# await conn.execute("CREATE INDEX IF NOT EXISTS idx_command_aliases_guild ON command_aliases (guild_id);")
log.info("Database schema initialization complete.")
@ -562,3 +599,350 @@ async def get_bot_guild_ids() -> set[int] | None:
except Exception as e:
log.exception("Database error fetching bot guild IDs.")
return None
# --- Command Customization Functions ---
async def get_custom_command_name(guild_id: int, original_command_name: str) -> str | None:
"""Gets the custom command name for a guild, checking cache first.
Returns None if no custom name is set."""
if not pg_pool or not redis_pool:
log.warning(f"Pools not initialized, returning None for custom command name '{original_command_name}'.")
return None
cache_key = _get_redis_key(guild_id, "cmd_custom", original_command_name)
try:
cached_value = await redis_pool.get(cache_key)
if cached_value is not None:
log.debug(f"Cache hit for custom command name '{original_command_name}' (Guild: {guild_id})")
return None if cached_value == "__NONE__" else cached_value
except Exception as e:
log.exception(f"Redis error getting custom command name for '{original_command_name}' (Guild: {guild_id}): {e}")
log.debug(f"Cache miss for custom command name '{original_command_name}' (Guild: {guild_id})")
async with pg_pool.acquire() as conn:
custom_name = await conn.fetchval(
"SELECT custom_command_name FROM command_customization WHERE guild_id = $1 AND original_command_name = $2",
guild_id, original_command_name
)
# Cache the result (even if None)
try:
value_to_cache = custom_name if custom_name is not None else "__NONE__"
await redis_pool.set(cache_key, value_to_cache, ex=3600) # Cache for 1 hour
except Exception as e:
log.exception(f"Redis error setting cache for custom command name '{original_command_name}' (Guild: {guild_id}): {e}")
return custom_name
async def set_custom_command_name(guild_id: int, original_command_name: str, custom_command_name: str | None) -> bool:
"""Sets a custom command name for a guild and updates the cache.
Setting custom_command_name to None removes the customization."""
if not pg_pool or not redis_pool:
log.error(f"Pools not initialized, cannot set custom command name for '{original_command_name}'.")
return False
cache_key = _get_redis_key(guild_id, "cmd_custom", original_command_name)
try:
async with pg_pool.acquire() as conn:
# Ensure guild exists
await conn.execute("INSERT INTO guilds (guild_id) VALUES ($1) ON CONFLICT (guild_id) DO NOTHING;", guild_id)
if custom_command_name is not None:
# Upsert the custom name
await conn.execute(
"""
INSERT INTO command_customization (guild_id, original_command_name, custom_command_name)
VALUES ($1, $2, $3)
ON CONFLICT (guild_id, original_command_name) DO UPDATE SET custom_command_name = $3;
""",
guild_id, original_command_name, custom_command_name
)
# Update cache
await redis_pool.set(cache_key, custom_command_name, ex=3600)
log.info(f"Set custom command name for '{original_command_name}' to '{custom_command_name}' for guild {guild_id}")
else:
# Delete the customization if value is None
await conn.execute(
"DELETE FROM command_customization WHERE guild_id = $1 AND original_command_name = $2",
guild_id, original_command_name
)
# Update cache to indicate no customization
await redis_pool.set(cache_key, "__NONE__", ex=3600)
log.info(f"Removed custom command name for '{original_command_name}' for guild {guild_id}")
return True
except Exception as e:
log.exception(f"Database or Redis error setting custom command name for '{original_command_name}' in guild {guild_id}: {e}")
# Attempt to invalidate cache on error
try:
await redis_pool.delete(cache_key)
except Exception as redis_err:
log.exception(f"Failed to invalidate Redis cache for custom command name '{original_command_name}' (Guild: {guild_id}): {redis_err}")
return False
async def get_custom_group_name(guild_id: int, original_group_name: str) -> str | None:
"""Gets the custom command group name for a guild, checking cache first.
Returns None if no custom name is set."""
if not pg_pool or not redis_pool:
log.warning(f"Pools not initialized, returning None for custom group name '{original_group_name}'.")
return None
cache_key = _get_redis_key(guild_id, "group_custom", original_group_name)
try:
cached_value = await redis_pool.get(cache_key)
if cached_value is not None:
log.debug(f"Cache hit for custom group name '{original_group_name}' (Guild: {guild_id})")
return None if cached_value == "__NONE__" else cached_value
except Exception as e:
log.exception(f"Redis error getting custom group name for '{original_group_name}' (Guild: {guild_id}): {e}")
log.debug(f"Cache miss for custom group name '{original_group_name}' (Guild: {guild_id})")
async with pg_pool.acquire() as conn:
custom_name = await conn.fetchval(
"SELECT custom_group_name FROM command_group_customization WHERE guild_id = $1 AND original_group_name = $2",
guild_id, original_group_name
)
# Cache the result (even if None)
try:
value_to_cache = custom_name if custom_name is not None else "__NONE__"
await redis_pool.set(cache_key, value_to_cache, ex=3600) # Cache for 1 hour
except Exception as e:
log.exception(f"Redis error setting cache for custom group name '{original_group_name}' (Guild: {guild_id}): {e}")
return custom_name
async def set_custom_group_name(guild_id: int, original_group_name: str, custom_group_name: str | None) -> bool:
"""Sets a custom command group name for a guild and updates the cache.
Setting custom_group_name to None removes the customization."""
if not pg_pool or not redis_pool:
log.error(f"Pools not initialized, cannot set custom group name for '{original_group_name}'.")
return False
cache_key = _get_redis_key(guild_id, "group_custom", original_group_name)
try:
async with pg_pool.acquire() as conn:
# Ensure guild exists
await conn.execute("INSERT INTO guilds (guild_id) VALUES ($1) ON CONFLICT (guild_id) DO NOTHING;", guild_id)
if custom_group_name is not None:
# Upsert the custom name
await conn.execute(
"""
INSERT INTO command_group_customization (guild_id, original_group_name, custom_group_name)
VALUES ($1, $2, $3)
ON CONFLICT (guild_id, original_group_name) DO UPDATE SET custom_group_name = $3;
""",
guild_id, original_group_name, custom_group_name
)
# Update cache
await redis_pool.set(cache_key, custom_group_name, ex=3600)
log.info(f"Set custom group name for '{original_group_name}' to '{custom_group_name}' for guild {guild_id}")
else:
# Delete the customization if value is None
await conn.execute(
"DELETE FROM command_group_customization WHERE guild_id = $1 AND original_group_name = $2",
guild_id, original_group_name
)
# Update cache to indicate no customization
await redis_pool.set(cache_key, "__NONE__", ex=3600)
log.info(f"Removed custom group name for '{original_group_name}' for guild {guild_id}")
return True
except Exception as e:
log.exception(f"Database or Redis error setting custom group name for '{original_group_name}' in guild {guild_id}: {e}")
# Attempt to invalidate cache on error
try:
await redis_pool.delete(cache_key)
except Exception as redis_err:
log.exception(f"Failed to invalidate Redis cache for custom group name '{original_group_name}' (Guild: {guild_id}): {redis_err}")
return False
async def add_command_alias(guild_id: int, original_command_name: str, alias_name: str) -> bool:
"""Adds an alias for a command in a guild and invalidates cache."""
if not pg_pool or not redis_pool:
log.error(f"Pools not initialized, cannot add alias for command '{original_command_name}'.")
return False
cache_key = _get_redis_key(guild_id, "cmd_aliases", original_command_name)
try:
async with pg_pool.acquire() as conn:
# Ensure guild exists
await conn.execute("INSERT INTO guilds (guild_id) VALUES ($1) ON CONFLICT (guild_id) DO NOTHING;", guild_id)
# Add the alias
await conn.execute(
"""
INSERT INTO command_aliases (guild_id, original_command_name, alias_name)
VALUES ($1, $2, $3)
ON CONFLICT (guild_id, original_command_name, alias_name) DO NOTHING;
""",
guild_id, original_command_name, alias_name
)
# Invalidate cache after DB operation succeeds
await redis_pool.delete(cache_key)
log.info(f"Added alias '{alias_name}' for command '{original_command_name}' in guild {guild_id}")
return True
except Exception as e:
log.exception(f"Database or Redis error adding alias for command '{original_command_name}' in guild {guild_id}: {e}")
# Attempt to invalidate cache even on error
try:
await redis_pool.delete(cache_key)
except Exception as redis_err:
log.exception(f"Failed to invalidate Redis cache for command aliases '{original_command_name}' (Guild: {guild_id}): {redis_err}")
return False
async def remove_command_alias(guild_id: int, original_command_name: str, alias_name: str) -> bool:
"""Removes an alias for a command in a guild and invalidates cache."""
if not pg_pool or not redis_pool:
log.error(f"Pools not initialized, cannot remove alias for command '{original_command_name}'.")
return False
cache_key = _get_redis_key(guild_id, "cmd_aliases", original_command_name)
try:
async with pg_pool.acquire() as conn:
# Remove the alias
await conn.execute(
"""
DELETE FROM command_aliases
WHERE guild_id = $1 AND original_command_name = $2 AND alias_name = $3;
""",
guild_id, original_command_name, alias_name
)
# Invalidate cache after DB operation succeeds
await redis_pool.delete(cache_key)
log.info(f"Removed alias '{alias_name}' for command '{original_command_name}' in guild {guild_id}")
return True
except Exception as e:
log.exception(f"Database or Redis error removing alias for command '{original_command_name}' in guild {guild_id}: {e}")
# Attempt to invalidate cache even on error
try:
await redis_pool.delete(cache_key)
except Exception as redis_err:
log.exception(f"Failed to invalidate Redis cache for command aliases '{original_command_name}' (Guild: {guild_id}): {redis_err}")
return False
async def get_command_aliases(guild_id: int, original_command_name: str) -> list[str] | None:
"""Gets the list of aliases for a command in a guild, checking cache first.
Returns empty list if no aliases are set, None on error."""
if not pg_pool or not redis_pool:
log.warning(f"Pools not initialized, returning None for command aliases '{original_command_name}'.")
return None
cache_key = _get_redis_key(guild_id, "cmd_aliases", original_command_name)
try:
# Check cache first
cached_aliases = await redis_pool.lrange(cache_key, 0, -1)
if cached_aliases is not None:
if len(cached_aliases) == 1 and cached_aliases[0] == "__EMPTY_LIST__":
log.debug(f"Cache hit (empty list) for command aliases '{original_command_name}' (Guild: {guild_id}).")
return []
log.debug(f"Cache hit for command aliases '{original_command_name}' (Guild: {guild_id})")
return cached_aliases
except Exception as e:
log.exception(f"Redis error getting command aliases for '{original_command_name}' (Guild: {guild_id}): {e}")
# Fall through to DB query on Redis error
log.debug(f"Cache miss for command aliases '{original_command_name}' (Guild: {guild_id})")
try:
async with pg_pool.acquire() as conn:
records = await conn.fetch(
"SELECT alias_name FROM command_aliases WHERE guild_id = $1 AND original_command_name = $2",
guild_id, original_command_name
)
aliases = [record['alias_name'] for record in records]
# Cache the result
try:
async with redis_pool.pipeline(transaction=True) as pipe:
pipe.delete(cache_key) # Ensure clean state
if aliases:
pipe.rpush(cache_key, *aliases)
else:
pipe.rpush(cache_key, "__EMPTY_LIST__") # Marker for empty list
pipe.expire(cache_key, 3600) # Cache for 1 hour
await pipe.execute()
except Exception as e:
log.exception(f"Redis error setting cache for command aliases '{original_command_name}' (Guild: {guild_id}): {e}")
return aliases
except Exception as e:
log.exception(f"Database error getting command aliases for '{original_command_name}' (Guild: {guild_id}): {e}")
return None # Indicate error
async def get_all_command_customizations(guild_id: int) -> dict[str, str] | None:
"""Gets all command customizations for a guild.
Returns a dictionary mapping original command names to custom names, or None on error."""
if not pg_pool:
log.error("Pools not initialized, cannot get command customizations.")
return None
try:
async with pg_pool.acquire() as conn:
records = await conn.fetch(
"SELECT original_command_name, custom_command_name FROM command_customization WHERE guild_id = $1",
guild_id
)
customizations = {record['original_command_name']: record['custom_command_name'] for record in records}
log.debug(f"Fetched {len(customizations)} command customizations for guild {guild_id}.")
return customizations
except Exception as e:
log.exception(f"Database error fetching command customizations for guild {guild_id}: {e}")
return None
async def get_all_group_customizations(guild_id: int) -> dict[str, str] | None:
"""Gets all command group customizations for a guild.
Returns a dictionary mapping original group names to custom names, or None on error."""
if not pg_pool:
log.error("Pools not initialized, cannot get group customizations.")
return None
try:
async with pg_pool.acquire() as conn:
records = await conn.fetch(
"SELECT original_group_name, custom_group_name FROM command_group_customization WHERE guild_id = $1",
guild_id
)
customizations = {record['original_group_name']: record['custom_group_name'] for record in records}
log.debug(f"Fetched {len(customizations)} group customizations for guild {guild_id}.")
return customizations
except Exception as e:
log.exception(f"Database error fetching group customizations for guild {guild_id}: {e}")
return None
async def get_all_command_aliases(guild_id: int) -> dict[str, list[str]] | None:
"""Gets all command aliases for a guild.
Returns a dictionary mapping original command names to lists of aliases, or None on error."""
if not pg_pool:
log.error("Pools not initialized, cannot get command aliases.")
return None
try:
async with pg_pool.acquire() as conn:
records = await conn.fetch(
"SELECT original_command_name, alias_name FROM command_aliases WHERE guild_id = $1",
guild_id
)
# Group by original_command_name
aliases_dict = {}
for record in records:
cmd_name = record['original_command_name']
alias = record['alias_name']
if cmd_name not in aliases_dict:
aliases_dict[cmd_name] = []
aliases_dict[cmd_name].append(alias)
log.debug(f"Fetched aliases for {len(aliases_dict)} commands for guild {guild_id}.")
return aliases_dict
except Exception as e:
log.exception(f"Database error fetching command aliases for guild {guild_id}: {e}")
return None