Compare commits

..

1 Commits

Author SHA1 Message Date
27c69065ae
Refactor system check to components 2025-06-14 16:43:37 +00:00
5 changed files with 307 additions and 619 deletions

View File

@ -1183,6 +1183,7 @@ async def auth(
<p>You have successfully authenticated with Discord.</p>
<div class="info">
<p>You can now close this window and return to Discord.</p>
<p>Your Discord bot is now authorized to access the API on your behalf.</p>
</div>
</body>
</html>

View File

@ -82,7 +82,7 @@ class OAuthCog(commands.Cog):
if token:
# Token is available locally, send a success message
await channel.send(
f"<@{user_id}> ✅ Authentication successful!"
f"<@{user_id}> ✅ Authentication successful! You can now use the API."
)
return
@ -116,7 +116,7 @@ class OAuthCog(commands.Cog):
user_id, token_data
)
await channel.send(
f"<@{user_id}> ✅ Authentication successful!"
f"<@{user_id}> ✅ Authentication successful! You can now use the API."
)
return
except Exception as e:
@ -140,7 +140,8 @@ class OAuthCog(commands.Cog):
# Send a DM to the user
try:
await discord_user.send(
f"✅ Authentication successful! You are now logged in as {user_info.get('username')}#{user_info.get('discriminator')}."
f"✅ Authentication successful! You are now logged in as {user_info.get('username')}#{user_info.get('discriminator')}.\n"
f"Your Discord bot is now authorized to access the API on your behalf."
)
except discord.errors.Forbidden:
# If we can't send a DM, try to find the channel where the auth command was used
@ -153,17 +154,8 @@ class OAuthCog(commands.Cog):
# Remove the pending auth entry
self.pending_auth.pop(user_id, None)
@commands.hybrid_group(name="auth", description="Manage Discord authentication.")
async def auth(self, ctx: commands.Context):
"""Manage Discord authentication."""
if ctx.invoked_subcommand is None:
await ctx.send_help(ctx.command)
@auth.command(
name="login",
description="Authenticate with Discord to allow the bot to access the API on your behalf.",
)
async def login(self, ctx: commands.Context):
@commands.command(name="auth")
async def auth_command(self, ctx):
"""Authenticate with Discord to allow the bot to access the API on your behalf."""
user_id = str(ctx.author.id)
@ -174,7 +166,7 @@ class OAuthCog(commands.Cog):
is_valid, _ = await discord_oauth.validate_token(token)
if is_valid:
await ctx.send(
f"You are already authenticated. Use `{ctx.prefix}auth logout` to revoke access or `{ctx.prefix}auth status` to check your status."
f"You are already authenticated. Use `!deauth` to revoke access or `!authstatus` to check your status."
)
return
@ -250,10 +242,8 @@ class OAuthCog(commands.Cog):
f"This link will expire in 10 minutes."
)
@auth.command(
name="logout", description="Revoke the bot's access to your Discord account."
)
async def logout(self, ctx: commands.Context):
@commands.command(name="deauth")
async def deauth_command(self, ctx):
"""Revoke the bot's access to your Discord account."""
user_id = str(ctx.author.id)
@ -286,8 +276,8 @@ class OAuthCog(commands.Cog):
else:
await ctx.send("❌ You are not currently authenticated.")
@auth.command(name="status", description="Check your authentication status.")
async def status(self, ctx: commands.Context):
@commands.command(name="authstatus")
async def auth_status_command(self, ctx):
"""Check your authentication status."""
user_id = str(ctx.author.id)
@ -306,7 +296,7 @@ class OAuthCog(commands.Cog):
await ctx.send(
f"✅ You are authenticated as {username}#{discriminator}.\n"
f"The bot can access any scopes granted by this token."
f"The bot can access the API on your behalf."
)
return
except discord_oauth.OAuthError:
@ -360,7 +350,7 @@ class OAuthCog(commands.Cog):
await ctx.send(
f"✅ You are authenticated as {username}#{discriminator}.\n"
f"The bot can access any scopes you allowed when authenticating.\n"
f"The bot can access the API on your behalf.\n"
f"(Token retrieved from API service)"
)
return
@ -378,9 +368,38 @@ class OAuthCog(commands.Cog):
# If we get here, the user is not authenticated anywhere
await ctx.send(
f"❌ You are not currently authenticated. Use `{ctx.prefix}auth login` to authenticate."
"❌ You are not currently authenticated. Use `!auth` to authenticate."
)
@commands.command(name="authhelp")
async def auth_help_command(self, ctx):
"""Get help with authentication commands."""
embed = discord.Embed(
title="Authentication Help",
description="Commands for managing Discord authentication",
color=discord.Color.blue(),
)
async def setup(bot: commands.Bot):
embed.add_field(
name="!auth",
value="Authenticate with Discord to allow the bot to access the API on your behalf",
inline=False,
)
embed.add_field(
name="!deauth",
value="Revoke the bot's access to your Discord account",
inline=False,
)
embed.add_field(
name="!authstatus", value="Check your authentication status", inline=False
)
embed.add_field(name="!authhelp", value="Show this help message", inline=False)
await ctx.send(embed=embed)
async def setup(bot):
await bot.add_cog(OAuthCog(bot))

View File

@ -5,263 +5,229 @@ import time
import psutil
import platform
import GPUtil
import distro
import distro # Ensure this is installed
# Import wmi for Windows motherboard info
try:
import wmi
WMI_AVAILABLE = True
except ImportError:
WMI_AVAILABLE = False
def create_progress_bar(value: float, total: float, length: int = 10) -> str:
"""Creates a text-based progress bar."""
if total == 0:
percentage = 0
else:
percentage = value / total
filled_length = int(length * percentage)
bar = '' * filled_length + '' * (length - filled_length)
return f"[{bar}] {percentage:.1%}"
class SystemStatusView(ui.LayoutView):
"""
A view that displays system and bot statistics in a visually appealing way.
It uses components v2 for a modern look.
"""
def __init__(
self,
bot_user: discord.User,
guild_count: int,
user_count: int,
os_info: str,
distro_info: str,
hostname: str,
uptime: str,
motherboard_info: str,
cpu_name: str,
cpu_usage: float,
ram_used: int,
ram_total: int,
gpu_info: str,
requester: discord.User,
) -> None:
super().__init__(timeout=None)
# --- Store all data for the view ---
self.bot_user = bot_user
self.guild_count = guild_count
self.user_count = user_count
self.os_info = os_info
self.distro_info = distro_info
self.hostname = hostname
self.uptime = uptime
self.motherboard_info = motherboard_info
self.cpu_name = cpu_name
self.cpu_usage = cpu_usage
self.ram_used = ram_used
self.ram_total = ram_total
self.gpu_info = gpu_info
self.requester = requester
# --- Build the UI ---
self._build_ui()
def _create_aligned_block(self, data: dict) -> str:
"""Creates a neatly aligned text block inside a markdown code block."""
max_key_len = max(len(k) for k in data.keys())
lines = []
for key, value in data.items():
lines.append(f"{key.ljust(max_key_len)} : {value}")
return "```\n" + "\n".join(lines) + "\n```"
def _build_ui(self):
"""Constructs the UI elements of the view."""
# Main container with Discord's "Blurple" color
container = ui.Container(accent_colour=None)
# --- Bot & System Info ---
self._add_bot_system_info(container)
# --- Hardware Info ---
self._add_hardware_info(container)
# --- Footer ---
self._add_footer(container)
self.add_item(container)
def _add_bot_system_info(self, container: ui.Container):
"""Adds bot and system information fields."""
bot_data = {
"Servers": self.guild_count,
"Users": self.user_count
}
container.add_item(ui.TextDisplay("**🤖 Bot Info**" + self._create_aligned_block(bot_data)))
system_data = {
"OS": f"{self.os_info}{self.distro_info}",
"Hostname": self.hostname,
"Uptime": self.uptime
}
container.add_item(ui.TextDisplay("**🖥️ System Info**" + self._create_aligned_block(system_data)))
def _add_hardware_info(self, container: ui.Container):
"""Adds hardware information with progress bars."""
container.add_item(ui.Separator(spacing=discord.SeparatorSpacing.small))
ram_usage_text = f"{self.ram_used // (1024**2):,}MB / {self.ram_total // (1024**2):,}MB"
cpu_bar = create_progress_bar(self.cpu_usage, 100.0)
ram_bar = create_progress_bar(self.ram_used, self.ram_total)
hardware_data = {
"CPU Usage": cpu_bar,
"RAM Usage": f"{ram_bar}\n{''.ljust(len('RAM Usage'))}{ram_usage_text}",
"Board": self.motherboard_info,
"CPU": self.cpu_name,
"GPU": self.gpu_info
}
container.add_item(ui.TextDisplay("**⚙️ Hardware Info**" + self._create_aligned_block(hardware_data)))
def _add_footer(self, container: ui.Container):
"""Adds the footer with timestamp and requester info."""
container.add_item(ui.Separator(spacing=discord.SeparatorSpacing.small))
timestamp = discord.utils.format_dt(discord.utils.utcnow(), style="R")
footer_text = f"Updated: {timestamp} | Requested by: {self.requester.display_name}"
container.add_item(ui.TextDisplay(f"_{footer_text}_"))
class SystemCheckCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
async def _build_system_check_view(self, context_or_interaction) -> SystemStatusView:
"""Gathers all system data and returns the constructed view."""
async def _system_check_logic(self, context_or_interaction):
"""Return detailed bot and system information as a LayoutView."""
# Bot information
bot_user = self.bot.user
guild_count = len(self.bot.guilds)
# A more efficient way to get unique users, avoiding large member lists in memory
user_ids = {member.id for guild in self.bot.guilds for member in guild.members if not member.bot}
# More efficient member counting - use cached members when available
# This avoids API calls that can cause timeouts
user_ids = set()
for guild in self.bot.guilds:
try:
# Use members that are already cached
for member in guild.members:
if not member.bot:
user_ids.add(member.id)
except Exception as e:
print(f"Error counting members in guild {guild.name}: {e}")
user_count = len(user_ids)
# System information
system = platform.system()
os_info = f"{system} {platform.release()}"
hostname = platform.node()
distro_info_str = ""
distro_info_str = "" # Renamed variable
if system == "Linux":
try:
# Use distro library for better Linux distribution detection
distro_name = distro.name(pretty=True)
distro_info_str = f" ({distro_name})"
except Exception:
distro_info_str = "" # Fail silently
distro_info_str = f"\n**Distro:** {distro_name}"
except ImportError:
distro_info_str = "\n**Distro:** (Install 'distro' package for details)"
except Exception as e:
distro_info_str = f"\n**Distro:** (Error getting info: {e})"
elif system == "Windows":
# Add Windows version details if possible
try:
# Use a more reliable way to get Windows version
win_ver = platform.win32_ver()
os_info = f"Windows {win_ver[0]} {win_ver[2]}"
except Exception:
pass # Fail silently
uptime_seconds = time.time() - psutil.boot_time()
days, rem = divmod(uptime_seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, _ = divmod(rem, 60)
uptime_str = f"{int(days)}d {int(hours)}h {int(minutes)}m"
win_ver = platform.version() # e.g., '10.0.19041'
win_build = platform.win32_ver()[1] # e.g., '19041'
os_info = f"Windows {win_ver} (Build {win_build})"
except Exception as e:
print(f"Could not get detailed Windows version: {e}")
# Keep the basic os_info
# Hardware information
uptime_seconds = time.time() - psutil.boot_time()
days, remainder = divmod(uptime_seconds, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, seconds = divmod(remainder, 60)
uptime_str = ""
if days > 0:
uptime_str += f"{int(days)}d "
uptime_str += f"{int(hours):02}:{int(minutes):02}:{int(seconds):02}"
uptime = uptime_str.strip()
# Hardware information - use a shorter interval for CPU usage
cpu_usage = psutil.cpu_percent(interval=0.1)
# Get CPU info with a timeout to prevent hanging
try:
cpu_name_base = "N/A"
if system == "Linux":
with open("/proc/cpuinfo") as f:
for line in f:
if line.startswith("model name"):
cpu_name_base = line.split(":")[1].strip()
break
else: # Windows or fallback
cpu_name_base = platform.processor() or "N/A"
# Use a simpler approach for CPU name to avoid potential slowdowns
if platform.system() == "Windows":
cpu_name_base = platform.processor()
elif platform.system() == "Linux":
try:
with open("/proc/cpuinfo", "r") as f:
for line in f:
if line.startswith("model name"):
cpu_name_base = line.split(":")[1].strip()
break
else:
cpu_name_base = "Unknown CPU"
except:
cpu_name_base = platform.processor() or "Unknown CPU"
else:
cpu_name_base = platform.processor() or "Unknown CPU"
physical_cores = psutil.cpu_count(logical=False)
total_threads = psutil.cpu_count(logical=True)
cpu_name = f"{cpu_name_base} ({physical_cores}C/{total_threads}T)"
except Exception:
except Exception as e:
print(f"Error getting CPU info: {e}")
cpu_name = "N/A"
# Get motherboard information
motherboard_info = self._get_motherboard_info()
memory = psutil.virtual_memory()
# GPU Information
ram_usage = f"{memory.used // (1024 ** 2)} MB / {memory.total // (1024 ** 2)} MB ({memory.percent}%)"
# GPU Information (using GPUtil for cross-platform consistency if available)
gpu_info_lines = []
try:
gpus = GPUtil.getGPUs()
if gpus:
# Format multi-GPU info on new lines for readability
gpu_info_lines = [f"{gpu.name} ({gpu.load*100:.1f}% Load)" for gpu in gpus]
for gpu in gpus:
gpu_info_lines.append(
f"{gpu.name} ({gpu.load*100:.1f}% Load, {gpu.memoryUsed:.0f}/{gpu.memoryTotal:.0f} MB VRAM)"
)
gpu_info = "\n".join(gpu_info_lines)
else:
gpu_info = "No dedicated GPU detected"
except Exception:
gpu_info = "N/A"
gpu_info = "No dedicated GPU detected by GPUtil."
except ImportError:
gpu_info = "GPUtil library not installed. Cannot get detailed GPU info."
except Exception as e:
print(f"Error getting GPU info via GPUtil: {e}")
gpu_info = f"Error retrieving GPU info: {e}"
# Determine user based on context type
user = context_or_interaction.author if isinstance(context_or_interaction, commands.Context) else context_or_interaction.user
# Determine user and avatar URL based on context type
if isinstance(context_or_interaction, commands.Context):
user = context_or_interaction.author
avatar_url = user.display_avatar.url
elif isinstance(context_or_interaction, discord.Interaction):
user = context_or_interaction.user
avatar_url = user.display_avatar.url
else:
# Fallback or handle error if needed
user = self.bot.user # Or some default
avatar_url = self.bot.user.display_avatar.url if self.bot.user else None
return SystemStatusView(
bot_user=self.bot.user,
guild_count=guild_count,
user_count=user_count,
os_info=os_info,
distro_info=distro_info_str,
hostname=hostname,
uptime=uptime_str,
motherboard_info=motherboard_info,
cpu_name=cpu_name,
cpu_usage=cpu_usage,
ram_used=memory.used,
ram_total=memory.total,
gpu_info=gpu_info,
requester=user,
)
view = ui.LayoutView(timeout=None)
container = ui.Container(accent_colour=discord.Color.blue())
view.add_item(container)
def _get_motherboard_info(self) -> str:
if bot_user:
header = ui.Section(
accessory=ui.Thumbnail(media=bot_user.display_avatar.url)
)
header.add_item(ui.TextDisplay("**📊 System Status**"))
container.add_item(header)
else:
container.add_item(ui.TextDisplay("**📊 System Status**"))
container.add_item(ui.Separator(spacing=discord.SeparatorSpacing.small))
if bot_user:
container.add_item(ui.TextDisplay(f"**Bot Name:** {bot_user.name}"))
container.add_item(ui.TextDisplay(f"**Bot ID:** {bot_user.id}"))
else:
container.add_item(ui.TextDisplay("Bot user information not available."))
container.add_item(ui.TextDisplay(f"**Servers:** {guild_count}"))
container.add_item(ui.TextDisplay(f"**Unique Users:** {user_count}"))
container.add_item(ui.Separator(spacing=discord.SeparatorSpacing.small))
container.add_item(ui.TextDisplay(f"**OS:** {os_info}{distro_info_str}"))
container.add_item(ui.TextDisplay(f"**Hostname:** {hostname}"))
container.add_item(ui.TextDisplay(f"**Uptime:** {uptime}"))
container.add_item(ui.Separator(spacing=discord.SeparatorSpacing.small))
container.add_item(ui.TextDisplay(f"**Device Model:** {motherboard_info}"))
container.add_item(ui.TextDisplay(f"**CPU:** {cpu_name}"))
container.add_item(ui.TextDisplay(f"**CPU Usage:** {cpu_usage}%"))
container.add_item(ui.TextDisplay(f"**RAM Usage:** {ram_usage}"))
container.add_item(ui.TextDisplay(f"**GPU Info:** {gpu_info}"))
container.add_item(ui.Separator(spacing=discord.SeparatorSpacing.small))
footer = discord.utils.format_dt(discord.utils.utcnow(), style="f")
if user:
footer += f" | Requested by: {user.display_name}"
container.add_item(ui.TextDisplay(footer))
return view
@commands.hybrid_command(
name="systemcheck", description="Check the bot and system status"
)
async def system_check(self, ctx: commands.Context):
"""Hybrid command for checking the bot and system status."""
if ctx.interaction:
await ctx.interaction.response.defer(thinking=True)
try:
view = await self._system_check_logic(ctx)
await ctx.reply(view=view)
except Exception as e:
print(f"Error in system_check command: {e}")
await ctx.reply(
f"An error occurred while checking system status: {e}",
ephemeral=bool(ctx.interaction),
)
def _get_motherboard_info(self):
"""Get motherboard information based on the operating system."""
system = platform.system()
try:
if system == "Windows" and WMI_AVAILABLE:
w = wmi.WMI()
board = w.Win32_BaseBoard()[0]
return f"{board.Manufacturer} {board.Product}"
if system == "Windows":
if WMI_AVAILABLE:
w = wmi.WMI()
for board in w.Win32_BaseBoard():
return f"{board.Manufacturer} {board.Product}"
return "WMI module not available"
elif system == "Linux":
# Check for product_name first, then fallback to board_name
# Read motherboard product name from sysfs
try:
with open("/sys/devices/virtual/dmi/id/product_name", "r") as f:
return f.read().strip()
product_name = f.read().strip()
return product_name if product_name else "Unknown motherboard"
except FileNotFoundError:
with open("/sys/devices/virtual/dmi/id/board_name", "r") as f:
return f.read().strip()
return "N/A"
except Exception:
return "N/A"
@commands.command(name="systemcheck", aliases=["status"])
async def system_check(self, ctx: commands.Context):
"""Check the bot and system status."""
view = await self._build_system_check_view(ctx)
await ctx.reply(view=view, mention_author=False)
@app_commands.command(name="systemcheck", description="Check the bot and system status")
async def system_check_slash(self, interaction: discord.Interaction):
"""Slash command version of system check."""
await interaction.response.defer(thinking=True, ephemeral=False)
view = await self._build_system_check_view(interaction)
await interaction.followup.send(view=view)
return "/sys/devices/virtual/dmi/id/product_name not found"
except Exception as e:
return f"Error reading motherboard info: {e}"
except Exception as e:
return f"Error: {str(e)}"
else:
return f"Unsupported OS: {system}"
except Exception as e:
print(f"Error getting motherboard info: {e}")
return "Error retrieving motherboard info"
async def setup(bot):

View File

@ -1,152 +1,68 @@
import discord
from discord.ext import commands
from discord import ui
import logging
import sys
import os
from datetime import datetime, timedelta, timezone
from typing import Literal, Optional
# Add the parent directory to sys.path to ensure settings_manager is accessible
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import settings_manager
from global_bot_accessor import get_bot_instance
log = logging.getLogger(__name__)
class WelcomeMessageView(ui.LayoutView):
def __init__(self, member: discord.Member, message: str, member_count: int):
super().__init__(timeout=None)
accent_color = member.color if member.color != discord.Color.default() else discord.Color.green()
container = ui.Container(accent_colour=accent_color)
account_age = datetime.now(timezone.utc) - member.created_at
if account_age.days >= 365:
years = account_age.days // 365
months = (account_age.days % 365) // 30
age_str = f"{years} year{'s' if years != 1 else ''}, {months} month{'s' if months != 1 else ''} ago"
elif account_age.days >= 30:
months = account_age.days // 30
age_str = f"{months} month{'s' if months != 1 else ''} ago"
else:
age_str = f"{account_age.days} days ago"
header_section = ui.Section(
accessory=ui.Thumbnail(
media=member.display_avatar.url,
description="New Member Avatar",
)
)
text_block = "\n".join([
"🎉 **Welcome to the Server!** 🎉",
message,
"",
f"**Member:** {member.display_name}",
f"**Account Created:** {member.created_at.strftime('%B %d, %Y')}",
f"**Account Age:** {age_str}",
"",
"📊 **Server Statistics**",
f"**Total Members:** {member_count:,}",
f"**You are member #{member_count:,}**",
"",
"💬 Feel free to introduce yourself and have fun!"
])
header_section.add_item(ui.TextDisplay(text_block))
container.add_item(header_section)
self.add_item(container)
class GoodbyeMessageView(ui.LayoutView):
def __init__(self, member: discord.Member, message: str, member_count: int, reason: str = "left"):
super().__init__(timeout=None)
if reason == "banned":
accent_color = discord.Color.red()
emoji = "🔨"
title = "Member Banned"
elif reason == "kicked":
accent_color = discord.Color.orange()
emoji = "👢"
title = "Member Kicked"
else:
accent_color = discord.Color.dark_grey()
emoji = "👋"
title = "Member Left"
container = ui.Container(accent_colour=accent_color)
header_section = ui.Section(
accessory=ui.Thumbnail(
media=member.display_avatar.url,
description="Former Member Avatar",
)
)
lines = [
f"{emoji} **{title}** {emoji}",
message,
"",
f"**Member:** {member.display_name}",
f"**Username:** {member.name}"
]
if hasattr(member, 'joined_at') and member.joined_at:
join_date = member.joined_at.strftime('%B %d, %Y')
time_in_server = datetime.now(timezone.utc) - member.joined_at
if time_in_server.days >= 365:
years = time_in_server.days // 365
months = (time_in_server.days % 365) // 30
duration_str = f"{years} year{'s' if years != 1 else ''}, {months} month{'s' if months != 1 else ''}"
elif time_in_server.days >= 30:
months = time_in_server.days // 30
duration_str = f"{months} month{'s' if months != 1 else ''}"
else:
duration_str = f"{time_in_server.days} day{'s' if time_in_server.days != 1 else ''}"
lines += [
f"**Joined:** {join_date}",
f"**Time in Server:** {duration_str}"
]
lines += [
"",
"📊 **Server Statistics**",
f"**Current Members:** {member_count:,}"
]
header_section.add_item(ui.TextDisplay("\n".join(lines)))
container.add_item(header_section)
self.add_item(container)
class WelcomeCog(commands.Cog):
"""Handles welcome and goodbye messages for guilds."""
def __init__(self, bot: commands.Bot):
self.bot = bot
print("WelcomeCog: Initializing and registering event listeners")
# Check existing event listeners
print(
f"WelcomeCog: Bot event listeners before registration: {self.bot.extra_events}"
)
# Register event listeners
self.bot.add_listener(self.on_member_join, "on_member_join")
self.bot.add_listener(self.on_member_remove, "on_member_remove")
# Check if event listeners were registered
print(
f"WelcomeCog: Bot event listeners after registration: {self.bot.extra_events}"
)
print("WelcomeCog: Event listeners registered")
async def on_member_join(self, member: discord.Member):
"""Sends a welcome message when a new member joins."""
print(f"WelcomeCog: on_member_join event triggered for {member.name}")
guild = member.guild
if not guild:
print(f"WelcomeCog: Guild not found for member {member.name}")
return
log.debug(f"Member {member.name} joined guild {guild.name} ({guild.id})")
print(
f"WelcomeCog: Member {member.name} joined guild {guild.name} ({guild.id})"
)
# --- Fetch settings ---
print(f"WelcomeCog: Fetching welcome settings for guild {guild.id}")
welcome_channel_id_str = await settings_manager.get_setting(
guild.id, "welcome_channel_id"
)
welcome_message_template = await settings_manager.get_setting(
guild.id, "welcome_message", default="Welcome {user} to {server}!"
)
print(
f"WelcomeCog: Retrieved settings - channel_id: {welcome_channel_id_str}, message: {welcome_message_template}"
)
# Handle the "__NONE__" marker for potentially unset values
if not welcome_channel_id_str or welcome_channel_id_str == "__NONE__":
log.debug(f"Welcome channel not configured for guild {guild.id}")
print(f"WelcomeCog: Welcome channel not configured for guild {guild.id}")
return
try:
@ -156,23 +72,21 @@ class WelcomeCog(commands.Cog):
log.warning(
f"Welcome channel ID {welcome_channel_id} not found or not text channel in guild {guild.id}"
)
# Maybe remove the setting here if the channel is invalid?
return
# --- Format and send message ---
# Basic formatting, can be expanded
formatted_message = welcome_message_template.format(
user=member.mention, username=member.name, server=guild.name
)
# Get current member count
member_count = guild.member_count or len(guild.members)
view = WelcomeMessageView(member, formatted_message, member_count)
await channel.send(view=view)
await channel.send(formatted_message)
log.info(f"Sent welcome message for {member.name} in guild {guild.id}")
except ValueError as e:
except ValueError:
log.error(
f"ValueError in WelcomeCog for guild {guild.id}: {e}"
f"Invalid welcome_channel_id '{welcome_channel_id_str}' configured for guild {guild.id}"
)
except discord.Forbidden:
log.error(
@ -182,23 +96,32 @@ class WelcomeCog(commands.Cog):
log.exception(f"Error sending welcome message for guild {guild.id}: {e}")
async def on_member_remove(self, member: discord.Member):
"""Sends a goodbye message when a member leaves, is kicked, or is banned."""
"""Sends a goodbye message when a member leaves."""
print(f"WelcomeCog: on_member_remove event triggered for {member.name}")
guild = member.guild
if not guild:
print(f"WelcomeCog: Guild not found for member {member.name}")
return
log.debug(f"Member {member.name} left guild {guild.name} ({guild.id})")
print(f"WelcomeCog: Member {member.name} left guild {guild.name} ({guild.id})")
# --- Fetch settings ---
print(f"WelcomeCog: Fetching goodbye settings for guild {guild.id}")
goodbye_channel_id_str = await settings_manager.get_setting(
guild.id, "goodbye_channel_id"
)
goodbye_message_template = await settings_manager.get_setting(
guild.id, "goodbye_message", default="{username} has left the server."
)
print(
f"WelcomeCog: Retrieved settings - channel_id: {goodbye_channel_id_str}, message: {goodbye_message_template}"
)
# Handle the "__NONE__" marker
if not goodbye_channel_id_str or goodbye_channel_id_str == "__NONE__":
log.debug(f"Goodbye channel not configured for guild {guild.id}")
print(f"WelcomeCog: Goodbye channel not configured for guild {guild.id}")
return
try:
@ -210,70 +133,16 @@ class WelcomeCog(commands.Cog):
)
return
# --- Determine reason for leaving ---
reason = "left"
entry_user = None
# Check audit log for kick or ban. We check last 2 minutes just in case of delays.
try:
# Check for ban first
async for entry in guild.audit_logs(
limit=1,
action=discord.AuditLogAction.ban,
after=datetime.now(timezone.utc) - timedelta(minutes=2),
):
if entry.target and entry.target.id == member.id:
reason = "banned"
entry_user = entry.user
break
# If not banned, check for kick
if reason == "left":
async for entry in guild.audit_logs(
limit=1,
action=discord.AuditLogAction.kick,
after=datetime.now(timezone.utc) - timedelta(minutes=2),
):
if entry.target and entry.target.id == member.id:
reason = "kicked"
entry_user = entry.user
break
except discord.Forbidden:
log.warning(
f"Missing 'View Audit Log' permissions in guild {guild.id} to determine member remove reason."
)
except Exception as e:
log.error(
f"Error checking audit log for {member.name} in {guild.id}: {e}"
)
# --- Format and send message ---
if reason == "left":
formatted_message = goodbye_message_template.format(
user=member.mention, # Might not be mentionable after leaving
username=member.name,
server=guild.name,
)
elif reason == "kicked":
formatted_message = f"**{member.name}** was kicked from the server"
if entry_user and entry_user != self.bot.user:
formatted_message += f" by **{entry_user.name}**"
formatted_message += "."
else: # banned
formatted_message = f"**{member.name}** was banned from the server"
if entry_user and entry_user != self.bot.user:
formatted_message += f" by **{entry_user.name}**"
formatted_message += "."
# Get current member count
member_count = guild.member_count or len(guild.members)
view = GoodbyeMessageView(member, formatted_message, member_count, reason)
await channel.send(view=view)
log.info(
f"Sent goodbye message for {member.name} in guild {guild.id} (Reason: {reason})"
formatted_message = goodbye_message_template.format(
user=member.mention, # Might not be mentionable after leaving
username=member.name,
server=guild.name,
)
await channel.send(formatted_message)
log.info(f"Sent goodbye message for {member.name} in guild {guild.id}")
except ValueError:
log.error(
f"Invalid goodbye_channel_id '{goodbye_channel_id_str}' configured for guild {guild.id}"
@ -312,27 +181,14 @@ class WelcomeCog(commands.Cog):
)
if success_channel and success_message: # Both need to succeed
embed = discord.Embed(
title="✅ Welcome Messages Configured",
description=f"Welcome messages will now be sent to {channel.mention}",
color=discord.Color.green()
await ctx.send(
f"Welcome messages will now be sent to {channel.mention} with the template:\n```\n{message_template}\n```"
)
embed.add_field(
name="Message Template",
value=f"```\n{message_template}\n```",
inline=False
)
embed.add_field(
name="Available Variables",
value="`{user}` - Mentions the user\n`{username}` - User's name\n`{server}` - Server name",
inline=False
)
await ctx.send(embed=embed)
log.info(
f"Welcome settings updated for guild {guild_id} by {ctx.author.name}"
)
else:
await ctx.send("Failed to save welcome settings. Check logs.")
await ctx.send("Failed to save welcome settings. Check logs.")
log.error(f"Failed to save welcome settings for guild {guild_id}")
@commands.command(
@ -347,21 +203,20 @@ class WelcomeCog(commands.Cog):
key_message = "welcome_message" # Also clear the message template
# Use set_setting with None to delete the settings
success_channel = await settings_manager.set_setting(guild_id, key_channel, None)
success_message = await settings_manager.set_setting(guild_id, key_message, None)
success_channel = await settings_manager.set_setting(
guild_id, key_channel, None
)
success_message = await settings_manager.set_setting(
guild_id, key_message, None
)
if success_channel and success_message: # Both need to succeed
embed = discord.Embed(
title="✅ Welcome Messages Disabled",
description="Welcome messages have been disabled for this server.",
color=discord.Color.orange()
)
await ctx.send(embed=embed)
await ctx.send("Welcome messages have been disabled.")
log.info(
f"Welcome messages disabled for guild {guild_id} by {ctx.author.name}"
)
else:
await ctx.send("Failed to disable welcome messages. Check logs.")
await ctx.send("Failed to disable welcome messages. Check logs.")
log.error(f"Failed to disable welcome settings for guild {guild_id}")
@commands.command(
@ -391,32 +246,14 @@ class WelcomeCog(commands.Cog):
)
if success_channel and success_message: # Both need to succeed
embed = discord.Embed(
title="✅ Goodbye Messages Configured",
description=f"Goodbye messages will now be sent to {channel.mention}",
color=discord.Color.green()
await ctx.send(
f"Goodbye messages will now be sent to {channel.mention} with the template:\n```\n{message_template}\n```"
)
embed.add_field(
name="Message Template",
value=f"```\n{message_template}\n```",
inline=False
)
embed.add_field(
name="Available Variables",
value="`{user}` - Mentions the user (may not work after leaving)\n`{username}` - User's name\n`{server}` - Server name",
inline=False
)
embed.add_field(
name="Note",
value="Kick and ban messages will override the template with automatic formatting.",
inline=False
)
await ctx.send(embed=embed)
log.info(
f"Goodbye settings updated for guild {guild_id} by {ctx.author.name}"
)
else:
await ctx.send("Failed to save goodbye settings. Check logs.")
await ctx.send("Failed to save goodbye settings. Check logs.")
log.error(f"Failed to save goodbye settings for guild {guild_id}")
@commands.command(
@ -431,190 +268,50 @@ class WelcomeCog(commands.Cog):
key_message = "goodbye_message"
# Use set_setting with None to delete the settings
success_channel = await settings_manager.set_setting(guild_id, key_channel, None)
success_message = await settings_manager.set_setting(guild_id, key_message, None)
success_channel = await settings_manager.set_setting(
guild_id, key_channel, None
)
success_message = await settings_manager.set_setting(
guild_id, key_message, None
)
if success_channel and success_message: # Both need to succeed
embed = discord.Embed(
title="✅ Goodbye Messages Disabled",
description="Goodbye messages have been disabled for this server.",
color=discord.Color.orange()
)
await ctx.send(embed=embed)
await ctx.send("Goodbye messages have been disabled.")
log.info(
f"Goodbye messages disabled for guild {guild_id} by {ctx.author.name}"
)
else:
await ctx.send("Failed to disable goodbye messages. Check logs.")
await ctx.send("Failed to disable goodbye messages. Check logs.")
log.error(f"Failed to disable goodbye settings for guild {guild_id}")
# --- Test Commands ---
@commands.group(
name="testmessage",
help="Test the welcome or goodbye messages.",
invoke_without_command=True,
)
@commands.has_permissions(administrator=True)
@commands.guild_only()
async def testmessage(self, ctx: commands.Context):
"""Shows help for the testmessage command group."""
await ctx.send_help(ctx.command)
@testmessage.command(name="welcome")
async def test_welcome(
self, ctx: commands.Context, member: Optional[discord.Member] = None
):
"""Simulates a member joining to test the welcome message."""
target_member = member or ctx.author
await self.on_member_join(target_member)
embed = discord.Embed(
title="🧪 Test Message Sent",
description=f"Simulated welcome message for {target_member.mention}. Check the configured welcome channel.",
color=discord.Color.blue()
)
await ctx.send(embed=embed, ephemeral=True)
@testmessage.command(name="goodbye")
async def test_goodbye(
self,
ctx: commands.Context,
reason: Literal["left", "kicked", "banned"] = "left",
member: Optional[discord.Member] = None,
):
"""Simulates a member leaving to test the goodbye message."""
target_member = member or ctx.author
guild = ctx.guild
# --- Fetch settings ---
goodbye_channel_id_str = await settings_manager.get_setting(
guild.id, "goodbye_channel_id"
)
goodbye_message_template = await settings_manager.get_setting(
guild.id, "goodbye_message", default="{username} has left the server."
)
if not goodbye_channel_id_str or goodbye_channel_id_str == "__NONE__":
embed = discord.Embed(
title="❌ Configuration Error",
description="Goodbye message channel is not configured.",
color=discord.Color.red()
)
await ctx.send(embed=embed, ephemeral=True)
return
try:
goodbye_channel_id = int(goodbye_channel_id_str)
channel = guild.get_channel(goodbye_channel_id)
if not channel or not isinstance(channel, discord.TextChannel):
embed = discord.Embed(
title="❌ Configuration Error",
description="Configured goodbye channel not found or is not a text channel.",
color=discord.Color.red()
)
await ctx.send(embed=embed, ephemeral=True)
return
# --- Format and send message based on simulated reason ---
if reason == "left":
formatted_message = goodbye_message_template.format(
user=target_member.mention,
username=target_member.name,
server=guild.name,
)
elif reason == "kicked":
formatted_message = (
f"**{target_member.name}** was kicked from the server by **{ctx.author.name}**."
)
else: # banned
formatted_message = (
f"**{target_member.name}** was banned from the server by **{ctx.author.name}**."
)
# Get current member count
member_count = guild.member_count or len(guild.members)
view = GoodbyeMessageView(target_member, formatted_message, member_count, reason)
await channel.send(view=view)
embed = discord.Embed(
title="🧪 Test Message Sent",
description=f"Simulated goodbye message for {target_member.mention} (Reason: {reason}). Check the configured goodbye channel.",
color=discord.Color.blue()
)
await ctx.send(embed=embed, ephemeral=True)
except ValueError:
embed = discord.Embed(
title="❌ Configuration Error",
description="Invalid goodbye channel ID configured.",
color=discord.Color.red()
)
await ctx.send(embed=embed, ephemeral=True)
except discord.Forbidden:
embed = discord.Embed(
title="❌ Permission Error",
description="I don't have permissions to send messages in the configured goodbye channel.",
color=discord.Color.red()
)
await ctx.send(embed=embed, ephemeral=True)
except Exception as e:
log.exception(
f"Error sending test goodbye message for guild {guild.id}: {e}"
)
embed = discord.Embed(
title="❌ Unexpected Error",
description="An unexpected error occurred. Check the logs for details.",
color=discord.Color.red()
)
await ctx.send(embed=embed, ephemeral=True)
# Error Handling for this Cog
async def cog_command_error(self, ctx: commands.Context, error: commands.CommandError):
"""Handles errors for all commands in this cog."""
@set_welcome.error
@disable_welcome.error
@set_goodbye.error
@disable_goodbye.error
async def on_command_error(self, ctx: commands.Context, error):
if isinstance(error, commands.MissingPermissions):
embed = discord.Embed(
title="❌ Missing Permissions",
description="You need Administrator permissions to use this command.",
color=discord.Color.red()
)
await ctx.send(embed=embed, ephemeral=True)
await ctx.send("You need Administrator permissions to use this command.")
elif isinstance(error, commands.BadArgument):
embed = discord.Embed(
title="❌ Invalid Argument",
description=f"Invalid argument provided. Check the command help: `{ctx.prefix}help {ctx.command.name}`",
color=discord.Color.red()
await ctx.send(
f"Invalid argument provided. Check the command help: `{ctx.prefix}help {ctx.command.name}`"
)
await ctx.send(embed=embed, ephemeral=True)
elif isinstance(error, commands.MissingRequiredArgument):
embed = discord.Embed(
title="❌ Missing Argument",
description=f"Missing required argument. Check the command help: `{ctx.prefix}help {ctx.command.name}`",
color=discord.Color.red()
await ctx.send(
f"Missing required argument. Check the command help: `{ctx.prefix}help {ctx.command.name}`"
)
await ctx.send(embed=embed, ephemeral=True)
elif isinstance(error, commands.NoPrivateMessage):
embed = discord.Embed(
title="❌ Server Only",
description="This command cannot be used in private messages.",
color=discord.Color.red()
)
await ctx.send(embed=embed, ephemeral=True)
await ctx.send("This command cannot be used in private messages.")
else:
original_error = getattr(error, 'original', error)
log.error(
f"Unhandled error in WelcomeCog command '{ctx.command.name}': {original_error}"
f"Unhandled error in WelcomeCog command '{ctx.command.name}': {error}"
)
embed = discord.Embed(
title="❌ Unexpected Error",
description="An unexpected error occurred. Please check the logs for details.",
color=discord.Color.red()
)
await ctx.send(embed=embed, ephemeral=True)
await ctx.send("An unexpected error occurred. Please check the logs.")
async def setup(bot: commands.Bot):
# Ensure bot has pools initialized before adding the cog
print("WelcomeCog setup function called!")
if (
not hasattr(bot, "pg_pool")
or not hasattr(bot, "redis")
@ -624,8 +321,12 @@ async def setup(bot: commands.Bot):
log.warning(
"Bot pools not initialized before loading WelcomeCog. Cog will not load."
)
print("WelcomeCog: Bot pools not initialized. Cannot load cog.")
return # Prevent loading if pools are missing
welcome_cog = WelcomeCog(bot)
await bot.add_cog(welcome_cog)
log.info("WelcomeCog loaded.")
print(
f"WelcomeCog loaded! Event listeners registered: on_member_join, on_member_remove"
)
log.info("WelcomeCog loaded.")

View File

@ -76,6 +76,7 @@ async def handle_oauth_callback(request: web.Request) -> web.Response:
<p>You have successfully authenticated with Discord.</p>
<div class="info">
<p>You can now close this window and return to Discord.</p>
<p>Your Discord bot is now authorized to access the API on your behalf.</p>
</div>
</body>
</html>