This commit is contained in:
Slipstream 2025-05-03 16:41:44 -06:00
parent 6c97d123a5
commit 1500e9d768
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD
4 changed files with 1780 additions and 41 deletions

View File

@ -1,6 +1,7 @@
import discord
from discord import ui # Added for views/buttons
from discord.ext import commands, tasks
from discord import app_commands
import asyncio
import yt_dlp as youtube_dl
import logging
@ -28,7 +29,7 @@ YDL_OPTS_BASE = {
'default_search': 'ytsearch', # Default to YouTube search
'source_address': '0.0.0.0', # Bind to all IPs for better connectivity
'cookiefile': 'cookies.txt'
}
FFMPEG_OPTIONS = {
@ -59,13 +60,42 @@ class Song:
else:
return f"{minutes:02d}:{seconds:02d}"
class AudioCog(commands.Cog):
class AudioCog(commands.Cog, name="Audio"):
"""Cog for audio playback commands"""
def __init__(self, bot):
self.bot = bot
self.queues = {} # Dictionary to hold queues per guild {guild_id: deque()}
self.current_song = {} # Dictionary for current song per guild {guild_id: Song}
self.voice_clients = {} # Dictionary for voice clients per guild {guild_id: discord.VoiceClient}
self.play_next_song.start() # Start the background task
# Create the main command group for this cog
self.audio_group = app_commands.Group(
name="audio",
description="Audio playback commands"
)
# Create subgroups
self.playback_group = app_commands.Group(
name="playback",
description="Playback control commands",
parent=self.audio_group
)
self.queue_group = app_commands.Group(
name="queue",
description="Queue management commands",
parent=self.audio_group
)
# Register commands
self.register_commands()
# Add command groups to the bot's tree
self.bot.tree.add_command(self.audio_group)
# Start the background task
self.play_next_song.start()
def get_queue(self, guild_id):
"""Gets the queue for a guild, creating it if it doesn't exist."""
@ -88,6 +118,102 @@ class AudioCog(commands.Cog):
asyncio.create_task(vc.disconnect(force=True))
log.info(f"Cleaned up resources for guild {guild_id}")
def register_commands(self):
"""Register all commands for this cog"""
# --- Playback Group Commands ---
# Play command
play_command = app_commands.Command(
name="play",
description="Play a song or add it to the queue",
callback=self.audio_play_callback,
parent=self.playback_group
)
self.playback_group.add_command(play_command)
# Pause command
pause_command = app_commands.Command(
name="pause",
description="Pause the current playback",
callback=self.audio_pause_callback,
parent=self.playback_group
)
self.playback_group.add_command(pause_command)
# Resume command
resume_command = app_commands.Command(
name="resume",
description="Resume paused playback",
callback=self.audio_resume_callback,
parent=self.playback_group
)
self.playback_group.add_command(resume_command)
# Skip command
skip_command = app_commands.Command(
name="skip",
description="Skip the current song",
callback=self.audio_skip_callback,
parent=self.playback_group
)
self.playback_group.add_command(skip_command)
# Stop command
stop_command = app_commands.Command(
name="stop",
description="Stop playback and clear the queue",
callback=self.audio_stop_callback,
parent=self.playback_group
)
self.playback_group.add_command(stop_command)
# --- Queue Group Commands ---
# List command
list_command = app_commands.Command(
name="list",
description="Display the current song queue",
callback=self.audio_queue_list_callback,
parent=self.queue_group
)
self.queue_group.add_command(list_command)
# Clear command
clear_command = app_commands.Command(
name="clear",
description="Clear the song queue",
callback=self.audio_queue_clear_callback,
parent=self.queue_group
)
self.queue_group.add_command(clear_command)
# --- Main Audio Group Commands ---
# Join command
join_command = app_commands.Command(
name="join",
description="Join your voice channel",
callback=self.audio_join_callback,
parent=self.audio_group
)
self.audio_group.add_command(join_command)
# Leave command
leave_command = app_commands.Command(
name="leave",
description="Leave the voice channel",
callback=self.audio_leave_callback,
parent=self.audio_group
)
self.audio_group.add_command(leave_command)
# Search command
search_command = app_commands.Command(
name="search",
description="Search for songs on YouTube",
callback=self.audio_search_callback,
parent=self.audio_group
)
self.audio_group.add_command(search_command)
async def cog_unload(self):
"""Cog unload cleanup."""
self.play_next_song.cancel()
@ -225,7 +351,7 @@ class AudioCog(commands.Cog):
async def _ensure_voice_connection(self, ctx_or_interaction):
"""Ensures the bot is connected to the user's voice channel. Accepts Context or Interaction."""
is_interaction = isinstance(ctx_or_interaction, discord.Interaction)
if is_interaction:
guild = ctx_or_interaction.guild
author = ctx_or_interaction.user
@ -264,7 +390,325 @@ class AudioCog(commands.Cog):
return vc
# --- Commands ---
# --- Command Callbacks ---
# Audio group callbacks
async def audio_join_callback(self, interaction: discord.Interaction):
"""Callback for /audio join command"""
try:
await self._ensure_voice_connection(interaction)
await interaction.response.send_message(f"Connected to **{interaction.user.voice.channel.name}**.")
except commands.CommandError as e:
await interaction.response.send_message(str(e), ephemeral=True)
except Exception as e:
log.error(f"Error in join command: {e}")
await interaction.response.send_message("An unexpected error occurred while trying to join.", ephemeral=True)
async def audio_leave_callback(self, interaction: discord.Interaction):
"""Callback for /audio leave command"""
vc = self.voice_clients.get(interaction.guild.id)
if not vc or not vc.is_connected():
await interaction.response.send_message("I am not connected to a voice channel.", ephemeral=True)
return
log.info(f"Disconnecting from voice channel in guild {interaction.guild.id}")
await interaction.response.send_message(f"Disconnecting from **{vc.channel.name}**.")
self.cleanup(interaction.guild.id) # This handles the disconnect and queue clearing
async def audio_search_callback(self, interaction: discord.Interaction, query: str, max_results: int = 10):
"""Callback for /audio search command"""
# Defer the response since search might take time
await interaction.response.defer(ephemeral=True)
try:
# Perform the search
results = await self._search_youtube(query, max_results)
if not results:
await interaction.followup.send("No results found for your search query.", ephemeral=True)
return
# Create a formatted list of results
result_list = []
for i, result in enumerate(results):
title = result.get('title', 'Unknown Title')
duration = result.get('duration')
duration_str = "N/A"
if duration:
minutes, seconds = divmod(duration, 60)
hours, minutes = divmod(minutes, 60)
if hours > 0:
duration_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
else:
duration_str = f"{minutes:02d}:{seconds:02d}"
result_list.append(f"**{i+1}.** {title} ({duration_str})")
# Create an embed with the results
embed = discord.Embed(
title=f"Search Results for '{query}'",
description="\n".join(result_list),
color=discord.Color.blue()
)
# Create a view with buttons to select a result
view = SearchResultsView(self, interaction.user, results)
# Send the results
await interaction.followup.send(embed=embed, view=view, ephemeral=True)
except Exception as e:
log.error(f"Error in search command: {e}")
await interaction.followup.send(f"An error occurred while searching: {e}", ephemeral=True)
# Playback group callbacks
async def audio_play_callback(self, interaction: discord.Interaction, query: str):
"""Callback for /audio playback play command"""
# Defer the response since this might take time
await interaction.response.defer()
try:
vc = await self._ensure_voice_connection(interaction)
except commands.CommandError as e:
await interaction.followup.send(str(e), ephemeral=True)
return
except Exception as e:
log.error(f"Error ensuring voice connection in play command: {e}")
await interaction.followup.send("An unexpected error occurred before playing.", ephemeral=True)
return
queue = self.get_queue(interaction.guild.id)
songs_added = 0
playlist_title = None
song_to_announce = None # Store the single song if added
try:
# info now contains full data for playlist or single video
info, is_playlist, is_search = await self._extract_info(query)
if not info:
await interaction.followup.send("Could not find anything matching your query.", ephemeral=True)
return
if is_playlist:
playlist_title = info.get('title', 'Unnamed Playlist')
log.info(f"Adding playlist '{playlist_title}' to queue for guild {interaction.guild.id}")
entries = info.get('entries', []) # Should contain full entry info now
if not entries:
await interaction.followup.send(f"Playlist '{playlist_title}' seems to be empty or could not be loaded.", ephemeral=True)
return
for entry in entries:
if not entry: continue
# Extract stream URL directly from the entry info
stream_url = entry.get('url') # yt-dlp often provides the best stream URL here
if not stream_url: # Fallback to formats if needed
formats = entry.get('formats', [])
for f in formats:
# Prioritize opus or known good audio codecs
if f.get('url') and f.get('acodec') != 'none' and (f.get('vcodec') == 'none' or f.get('acodec') == 'opus'):
stream_url = f['url']
break
# Last resort fallback if still no URL
if not stream_url and formats:
for f in formats:
if f.get('url') and f.get('acodec') != 'none':
stream_url = f['url']
break
if not stream_url:
log.warning(f"Could not find playable stream URL for playlist entry: {entry.get('title', entry.get('id'))}")
await interaction.followup.send(f"⚠️ Could not get audio for '{entry.get('title', 'an item')}' from playlist.", ephemeral=True)
continue
try:
song = Song(
source_url=stream_url,
title=entry.get('title', 'Unknown Title'),
webpage_url=entry.get('webpage_url', entry.get('original_url')), # Use original_url as fallback
duration=entry.get('duration'),
requested_by=interaction.user
)
queue.append(song)
songs_added += 1
except Exception as song_e:
log.error(f"Error creating Song object for entry {entry.get('title', entry.get('id'))}: {song_e}")
await interaction.followup.send(f"⚠️ Error processing metadata for '{entry.get('title', 'an item')}' from playlist.", ephemeral=True)
else: # Single video or search result
# 'info' should be the dictionary for the single video here
stream_url = info.get('url')
if not stream_url: # Fallback if 'url' isn't top-level
formats = info.get('formats', [])
for f in formats:
# Prioritize opus or known good audio codecs
if f.get('url') and f.get('acodec') != 'none' and (f.get('vcodec') == 'none' or f.get('acodec') == 'opus'):
stream_url = f['url']
break
# Last resort fallback if still no URL
if not stream_url and formats:
for f in formats:
if f.get('url') and f.get('acodec') != 'none':
stream_url = f['url']
break
if not stream_url:
await interaction.followup.send("Could not extract a playable audio stream for the video.", ephemeral=True)
return
song = Song(
source_url=stream_url,
title=info.get('title', 'Unknown Title'),
webpage_url=info.get('webpage_url'),
duration=info.get('duration'),
requested_by=interaction.user
)
queue.append(song)
songs_added = 1
song_to_announce = song # Store for announcement
log.info(f"Added song '{song.title}' to queue for guild {interaction.guild.id}")
except commands.CommandError as e:
await interaction.followup.send(str(e), ephemeral=True)
return
except Exception as e:
log.exception(f"Error during song processing in play command: {e}") # Log full traceback
await interaction.followup.send("An unexpected error occurred while processing your request.", ephemeral=True)
return
# --- Send confirmation message ---
if songs_added > 0:
if is_playlist:
await interaction.followup.send(f"✅ Added **{songs_added}** songs from playlist **'{playlist_title}'** to the queue.")
elif song_to_announce: # Check if a single song was added
# For single adds, show position if queue was not empty before adding
queue_pos = len(queue) # Position is the current length (after adding)
if vc.is_playing() or vc.is_paused() or queue_pos > 1: # If something playing or queue had items before this add
await interaction.followup.send(f"✅ Added **{song_to_announce.title}** to the queue (position #{queue_pos}).")
else:
# If nothing was playing and queue was empty, this song will play next
# The loop will handle the "Now Playing" implicitly, so just confirm add
await interaction.followup.send(f"✅ Added **{song_to_announce.title}** to the queue.")
# No need to explicitly start playback here, the loop handles it.
else:
# This case might happen if playlist extraction failed for all entries or search failed
if not is_playlist and is_search:
# If it was a search and nothing was added, the earlier message handles it
pass # Already sent "Could not find anything..."
else:
await interaction.followup.send("Could not add any songs from the provided source.", ephemeral=True)
async def audio_pause_callback(self, interaction: discord.Interaction):
"""Callback for /audio playback pause command"""
vc = self.voice_clients.get(interaction.guild.id)
if not vc or not vc.is_playing():
await interaction.response.send_message("I am not playing anything right now.", ephemeral=True)
return
if vc.is_paused():
await interaction.response.send_message("Playback is already paused.", ephemeral=True)
return
vc.pause()
await interaction.response.send_message("⏸️ Playback paused.")
log.info(f"Playback paused in guild {interaction.guild.id}")
async def audio_resume_callback(self, interaction: discord.Interaction):
"""Callback for /audio playback resume command"""
vc = self.voice_clients.get(interaction.guild.id)
if not vc or not vc.is_connected():
await interaction.response.send_message("I am not connected to a voice channel.", ephemeral=True)
return
if not vc.is_paused():
await interaction.response.send_message("Playback is not paused.", ephemeral=True)
return
vc.resume()
await interaction.response.send_message("▶️ Playback resumed.")
log.info(f"Playback resumed in guild {interaction.guild.id}")
async def audio_skip_callback(self, interaction: discord.Interaction):
"""Callback for /audio playback skip command"""
vc = self.voice_clients.get(interaction.guild.id)
if not vc or not vc.is_playing():
await interaction.response.send_message("I am not playing anything to skip.", ephemeral=True)
return
current = self.get_current_song(interaction.guild.id)
await interaction.response.send_message(f"⏭️ Skipping **{current.title if current else 'the current song'}**...")
vc.stop() # Triggers the 'after' callback, which lets the loop play the next song
log.info(f"Song skipped in guild {interaction.guild.id} by {interaction.user}")
# The loop will handle playing the next song
async def audio_stop_callback(self, interaction: discord.Interaction):
"""Callback for /audio playback stop command"""
vc = self.voice_clients.get(interaction.guild.id)
if not vc or not vc.is_connected():
await interaction.response.send_message("I am not connected to a voice channel.", ephemeral=True)
return
queue = self.get_queue(interaction.guild.id)
queue.clear()
self.current_song[interaction.guild.id] = None # Clear current song immediately
if vc.is_playing() or vc.is_paused():
vc.stop() # Stop playback
await interaction.response.send_message("⏹️ Playback stopped and queue cleared.")
log.info(f"Playback stopped and queue cleared in guild {interaction.guild.id} by {interaction.user}")
else:
await interaction.response.send_message("⏹️ Queue cleared.") # If nothing was playing, just confirm queue clear
log.info(f"Queue cleared in guild {interaction.guild.id} by {interaction.user} (nothing was playing).")
# Queue group callbacks
async def audio_queue_list_callback(self, interaction: discord.Interaction):
"""Callback for /audio queue list command"""
queue = self.get_queue(interaction.guild.id)
current = self.get_current_song(interaction.guild.id)
if not queue and not current:
await interaction.response.send_message("The queue is empty and nothing is playing.", ephemeral=True)
return
# Create an embed for the queue
embed = discord.Embed(
title="Music Queue",
color=discord.Color.blue()
)
# Add the current song
if current:
embed.add_field(
name="Now Playing",
value=f"{current} - Requested by {current.requested_by.mention}",
inline=False
)
# Add the queue
if queue:
queue_text = ""
for i, song in enumerate(queue):
queue_text += f"**{i+1}.** {song} - Requested by {song.requested_by.mention}\n"
embed.add_field(
name="Queue",
value=queue_text if queue_text else "The queue is empty.",
inline=False
)
await interaction.response.send_message(embed=embed)
async def audio_queue_clear_callback(self, interaction: discord.Interaction):
"""Callback for /audio queue clear command"""
queue = self.get_queue(interaction.guild.id)
if not queue:
await interaction.response.send_message("The queue is already empty.", ephemeral=True)
return
queue.clear()
await interaction.response.send_message("✅ Queue cleared.")
log.info(f"Queue cleared in guild {interaction.guild.id} by {interaction.user}")
# --- Legacy Commands ---
@commands.command(name="join", aliases=['connect'])
async def join(self, ctx: commands.Context):
@ -616,7 +1060,12 @@ class AudioCog(commands.Cog):
# pass # Let global handler take care of it unless specific handling is needed
async def setup(bot):
await bot.add_cog(AudioCog(bot))
"""Set up the AudioCog with the bot."""
print("Setting up AudioCog...")
cog = AudioCog(bot)
await bot.add_cog(cog)
print(f"AudioCog setup complete with command groups: {[cmd.name for cmd in bot.tree.get_commands() if cmd.name == 'audio']}")
print(f"Available subgroups: {[group.name for group in cog.audio_group.walk_commands() if isinstance(group, app_commands.Group)]}")
log.info("AudioCog loaded successfully.")
# --- Paginated Search Result View ---
@ -694,7 +1143,7 @@ class PaginatedSearchResultView(ui.View):
value=f"[{uploader}]({url}) | `{duration_fmt}`",
inline=False
)
embed.set_footer(text=f"Showing results {start_index + 1}-{end_index} of {len(self.results)}")
return embed

View File

@ -1,16 +1,53 @@
import discord
from discord.ext import commands
from discord import app_commands
import logging
import asyncio
import random
import datetime
from typing import Optional
# Import command classes and db functions from submodules
from .economy.database import init_db, close_db # Import close_db
from .economy.database import init_db, close_db, get_balance, update_balance, set_cooldown, check_cooldown
from .economy.database import get_user_job, set_user_job, remove_user_job, get_available_jobs, get_leaderboard
from .economy.earning import EarningCommands
from .economy.gambling import GamblingCommands
from .economy.utility import UtilityCommands
from .economy.risky import RiskyCommands
from .economy.jobs import JobsCommands # Import the new JobsCommands
# Create a database object for function calls
class DatabaseWrapper:
async def get_balance(self, user_id):
return await get_balance(user_id)
async def update_balance(self, user_id, amount):
return await update_balance(user_id, amount)
async def set_cooldown(self, user_id, command_name):
return await set_cooldown(user_id, command_name)
async def check_cooldown(self, user_id, command_name):
return await check_cooldown(user_id, command_name)
async def get_user_job(self, user_id):
return await get_user_job(user_id)
async def set_user_job(self, user_id, job_name):
return await set_user_job(user_id, job_name)
async def remove_user_job(self, user_id):
return await remove_user_job(user_id)
async def get_available_jobs(self):
return await get_available_jobs()
async def get_leaderboard(self, limit=10):
return await get_leaderboard(limit)
# Create an instance of the wrapper
database = DatabaseWrapper()
log = logging.getLogger(__name__)
# --- Main Cog Implementation ---
@ -32,7 +69,176 @@ class EconomyCog(
# If other parent cogs had complex __init__, we might need to call them explicitly,
# but in this case, they only store the bot instance, which super() handles.
self.bot = bot
log.info("EconomyCog initialized (combined).")
# Create the main command group for this cog
self.economy_group = app_commands.Group(
name="economy",
description="Economy system commands"
)
# Create subgroups
self.earning_group = app_commands.Group(
name="earning",
description="Commands for earning currency",
parent=self.economy_group
)
self.gambling_group = app_commands.Group(
name="gambling",
description="Gambling and games of chance",
parent=self.economy_group
)
self.utility_group = app_commands.Group(
name="utility",
description="Utility commands for the economy system",
parent=self.economy_group
)
self.risky_group = app_commands.Group(
name="risky",
description="High-risk, high-reward commands",
parent=self.economy_group
)
self.jobs_group = app_commands.Group(
name="jobs",
description="Job-related commands",
parent=self.economy_group
)
# Register commands
self.register_commands()
# Add command groups to the bot's tree
self.bot.tree.add_command(self.economy_group)
log.info("EconomyCog initialized with command groups.")
def register_commands(self):
"""Register all commands for this cog"""
# --- Earning Group Commands ---
# Daily command
daily_command = app_commands.Command(
name="daily",
description="Claim your daily reward",
callback=self.economy_daily_callback,
parent=self.earning_group
)
self.earning_group.add_command(daily_command)
# Beg command
beg_command = app_commands.Command(
name="beg",
description="Beg for some spare change",
callback=self.economy_beg_callback,
parent=self.earning_group
)
self.earning_group.add_command(beg_command)
# Work command
work_command = app_commands.Command(
name="work",
description="Do some work for a guaranteed reward",
callback=self.economy_work_callback,
parent=self.earning_group
)
self.earning_group.add_command(work_command)
# Scavenge command
scavenge_command = app_commands.Command(
name="scavenge",
description="Scavenge around for some spare change",
callback=self.economy_scavenge_callback,
parent=self.earning_group
)
self.earning_group.add_command(scavenge_command)
# --- Gambling Group Commands ---
# Coinflip command
coinflip_command = app_commands.Command(
name="coinflip",
description="Bet on a coin flip",
callback=self.economy_coinflip_callback,
parent=self.gambling_group
)
self.gambling_group.add_command(coinflip_command)
# Slots command
slots_command = app_commands.Command(
name="slots",
description="Play the slot machine",
callback=self.economy_slots_callback,
parent=self.gambling_group
)
self.gambling_group.add_command(slots_command)
# --- Utility Group Commands ---
# Balance command
balance_command = app_commands.Command(
name="balance",
description="Check your balance",
callback=self.economy_balance_callback,
parent=self.utility_group
)
self.utility_group.add_command(balance_command)
# Transfer command
transfer_command = app_commands.Command(
name="transfer",
description="Transfer money to another user",
callback=self.economy_transfer_callback,
parent=self.utility_group
)
self.utility_group.add_command(transfer_command)
# Leaderboard command
leaderboard_command = app_commands.Command(
name="leaderboard",
description="View the economy leaderboard",
callback=self.economy_leaderboard_callback,
parent=self.utility_group
)
self.utility_group.add_command(leaderboard_command)
# --- Risky Group Commands ---
# Rob command
rob_command = app_commands.Command(
name="rob",
description="Attempt to rob another user",
callback=self.economy_rob_callback,
parent=self.risky_group
)
self.risky_group.add_command(rob_command)
# --- Jobs Group Commands ---
# Apply command
apply_command = app_commands.Command(
name="apply",
description="Apply for a job",
callback=self.economy_apply_callback,
parent=self.jobs_group
)
self.jobs_group.add_command(apply_command)
# Quit command
quit_command = app_commands.Command(
name="quit",
description="Quit your current job",
callback=self.economy_quit_callback,
parent=self.jobs_group
)
self.jobs_group.add_command(quit_command)
# List command
list_command = app_commands.Command(
name="list",
description="List available jobs",
callback=self.economy_joblist_callback,
parent=self.jobs_group
)
self.jobs_group.add_command(list_command)
async def cog_load(self):
"""Called when the cog is loaded, ensures DB is initialized."""
@ -45,6 +251,582 @@ class EconomyCog(
# Prevent the cog from loading if DB init fails
raise commands.ExtensionFailed(self.qualified_name, e) from e
# --- Command Callbacks ---
# Earning group callbacks
async def economy_daily_callback(self, interaction: discord.Interaction):
"""Callback for /economy earning daily command"""
user_id = interaction.user.id
command_name = "daily"
cooldown_duration = datetime.timedelta(hours=24)
reward_amount = 100 # Example daily reward
last_used = await database.check_cooldown(user_id, command_name)
if last_used:
now_utc = datetime.datetime.now(datetime.timezone.utc)
# Ensure last_used is timezone-aware for comparison
if last_used.tzinfo is None:
last_used = last_used.replace(tzinfo=datetime.timezone.utc)
time_since_last_used = now_utc - last_used
if time_since_last_used < cooldown_duration:
time_left = cooldown_duration - time_since_last_used
hours, remainder = divmod(int(time_left.total_seconds()), 3600)
minutes, seconds = divmod(remainder, 60)
embed = discord.Embed(description=f"🕒 You've already claimed your daily reward. Try again in **{hours}h {minutes}m {seconds}s**.", color=discord.Color.orange())
await interaction.response.send_message(embed=embed, ephemeral=True)
return
# Not on cooldown or cooldown expired
await database.update_balance(user_id, reward_amount)
await database.set_cooldown(user_id, command_name)
current_balance = await database.get_balance(user_id)
embed = discord.Embed(
title="Daily Reward Claimed!",
description=f"🎉 You claimed your daily reward of **${reward_amount:,}**!",
color=discord.Color.green()
)
embed.add_field(name="New Balance", value=f"${current_balance:,}", inline=False)
await interaction.response.send_message(embed=embed)
async def economy_beg_callback(self, interaction: discord.Interaction):
"""Callback for /economy earning beg command"""
user_id = interaction.user.id
command_name = "beg"
cooldown_duration = datetime.timedelta(minutes=5) # 5-minute cooldown
success_chance = 0.4 # 40% chance of success
min_reward = 1
max_reward = 20
last_used = await database.check_cooldown(user_id, command_name)
if last_used:
now_utc = datetime.datetime.now(datetime.timezone.utc)
if last_used.tzinfo is None:
last_used = last_used.replace(tzinfo=datetime.timezone.utc)
time_since_last_used = now_utc - last_used
if time_since_last_used < cooldown_duration:
time_left = cooldown_duration - time_since_last_used
minutes, seconds = divmod(int(time_left.total_seconds()), 60)
embed = discord.Embed(description=f"🕒 You can't beg again so soon. Try again in **{minutes}m {seconds}s**.", color=discord.Color.orange())
await interaction.response.send_message(embed=embed, ephemeral=True)
return
# Set cooldown regardless of success/failure
await database.set_cooldown(user_id, command_name)
# Determine success
if random.random() < success_chance:
reward_amount = random.randint(min_reward, max_reward)
await database.update_balance(user_id, reward_amount)
current_balance = await database.get_balance(user_id)
embed = discord.Embed(
title="Begging Successful!",
description=f"🙏 Someone took pity on you! You received **${reward_amount:,}**.",
color=discord.Color.green()
)
embed.add_field(name="New Balance", value=f"${current_balance:,}", inline=False)
await interaction.response.send_message(embed=embed)
else:
embed = discord.Embed(
title="Begging Failed",
description="🤷 Nobody gave you anything. Better luck next time!",
color=discord.Color.red()
)
await interaction.response.send_message(embed=embed)
async def economy_work_callback(self, interaction: discord.Interaction):
"""Callback for /economy earning work command"""
user_id = interaction.user.id
command_name = "work"
cooldown_duration = datetime.timedelta(hours=1) # 1-hour cooldown
reward_amount = random.randint(15, 35) # Small reward range - This is now fallback if no job
# --- Check if user has a job ---
job_info = await database.get_user_job(user_id)
if job_info and job_info.get("name"):
job_key = job_info["name"]
command_to_use = f"`/economy jobs {job_key}`" # Updated command path
embed = discord.Embed(description=f"💼 You have a job! Use {command_to_use} instead of the generic work command.", color=discord.Color.blue())
await interaction.response.send_message(embed=embed, ephemeral=True)
return
# --- End Job Check ---
# Proceed with generic work only if no job
last_used = await database.check_cooldown(user_id, command_name)
if last_used:
now_utc = datetime.datetime.now(datetime.timezone.utc)
if last_used.tzinfo is None:
last_used = last_used.replace(tzinfo=datetime.timezone.utc)
time_since_last_used = now_utc - last_used
if time_since_last_used < cooldown_duration:
time_left = cooldown_duration - time_since_last_used
hours, remainder = divmod(int(time_left.total_seconds()), 3600)
minutes, seconds = divmod(remainder, 60)
embed = discord.Embed(description=f"🕒 You need to rest after working. Try again in **{hours}h {minutes}m {seconds}s**.", color=discord.Color.orange())
await interaction.response.send_message(embed=embed, ephemeral=True)
return
# Set cooldown and give reward
await database.set_cooldown(user_id, command_name)
await database.update_balance(user_id, reward_amount)
# Add some flavor text
work_messages = [
f"You worked hard and earned **${reward_amount:,}**!",
f"After a solid hour of work, you got **${reward_amount:,}**.",
f"Your efforts paid off! You received **${reward_amount:,}**.",
]
current_balance = await database.get_balance(user_id)
embed = discord.Embed(
title="Work Complete!",
description=random.choice(work_messages),
color=discord.Color.green()
)
embed.add_field(name="New Balance", value=f"${current_balance:,}", inline=False)
await interaction.response.send_message(embed=embed)
async def economy_scavenge_callback(self, interaction: discord.Interaction):
"""Callback for /economy earning scavenge command"""
user_id = interaction.user.id
command_name = "scavenge"
cooldown_duration = datetime.timedelta(minutes=30) # 30-minute cooldown
success_chance = 0.25 # 25% chance to find something
min_reward = 1
max_reward = 10
last_used = await database.check_cooldown(user_id, command_name)
if last_used:
now_utc = datetime.datetime.now(datetime.timezone.utc)
if last_used.tzinfo is None:
last_used = last_used.replace(tzinfo=datetime.timezone.utc)
time_since_last_used = now_utc - last_used
if time_since_last_used < cooldown_duration:
time_left = cooldown_duration - time_since_last_used
minutes, seconds = divmod(int(time_left.total_seconds()), 60)
embed = discord.Embed(description=f"🕒 You've searched recently. Try again in **{minutes}m {seconds}s**.", color=discord.Color.orange())
await interaction.response.send_message(embed=embed, ephemeral=True)
return
# Set cooldown regardless of success
await database.set_cooldown(user_id, command_name)
# Flavor text for scavenging
scavenge_locations = [
"under the sofa cushions", "in an old coat pocket", "behind the dumpster",
"in a dusty corner", "on the sidewalk", "in a forgotten drawer"
]
location = random.choice(scavenge_locations)
if random.random() < success_chance:
reward_amount = random.randint(min_reward, max_reward)
await database.update_balance(user_id, reward_amount)
current_balance = await database.get_balance(user_id)
embed = discord.Embed(
title="Scavenging Successful!",
description=f"🔍 You scavenged {location} and found **${reward_amount:,}**!",
color=discord.Color.green()
)
embed.add_field(name="New Balance", value=f"${current_balance:,}", inline=False)
await interaction.response.send_message(embed=embed)
else:
embed = discord.Embed(
title="Scavenging Failed",
description=f"🔍 You scavenged {location} but found nothing but lint.",
color=discord.Color.red()
)
await interaction.response.send_message(embed=embed)
# Gambling group callbacks
async def economy_coinflip_callback(self, interaction: discord.Interaction, bet: int, choice: app_commands.Choice[str]):
"""Callback for /economy gambling coinflip command"""
user_id = interaction.user.id
# Validate bet amount
if bet <= 0:
await interaction.response.send_message("❌ Your bet must be greater than 0.", ephemeral=True)
return
# Check if user has enough money
balance = await database.get_balance(user_id)
if bet > balance:
await interaction.response.send_message(f"❌ You don't have enough money. Your balance: ${balance:,}", ephemeral=True)
return
# Process the bet
result = "Heads" if random.random() < 0.5 else "Tails"
user_choice = choice.value
# Determine outcome
if result == user_choice:
# Win - double the bet
winnings = bet
await database.update_balance(user_id, winnings)
new_balance = await database.get_balance(user_id)
embed = discord.Embed(
title="Coinflip Win!",
description=f"The coin landed on **{result}**! You won **${winnings:,}**!",
color=discord.Color.green()
)
embed.add_field(name="New Balance", value=f"${new_balance:,}", inline=False)
else:
# Lose - subtract the bet
await database.update_balance(user_id, -bet)
new_balance = await database.get_balance(user_id)
embed = discord.Embed(
title="Coinflip Loss",
description=f"The coin landed on **{result}**. You lost **${bet:,}**.",
color=discord.Color.red()
)
embed.add_field(name="New Balance", value=f"${new_balance:,}", inline=False)
await interaction.response.send_message(embed=embed)
async def economy_slots_callback(self, interaction: discord.Interaction, bet: int):
"""Callback for /economy gambling slots command"""
user_id = interaction.user.id
# Validate bet amount
if bet <= 0:
await interaction.response.send_message("❌ Your bet must be greater than 0.", ephemeral=True)
return
# Check if user has enough money
balance = await database.get_balance(user_id)
if bet > balance:
await interaction.response.send_message(f"❌ You don't have enough money. Your balance: ${balance:,}", ephemeral=True)
return
# Define slot symbols and their payouts
symbols = ["🍒", "🍊", "🍋", "🍇", "🍉", "💎", "7"]
payouts = {
"🍒🍒🍒": 2, # 2x bet
"🍊🍊🍊": 3, # 3x bet
"🍋🍋🍋": 4, # 4x bet
"🍇🍇🍇": 5, # 5x bet
"🍉🍉🍉": 8, # 8x bet
"💎💎💎": 10, # 10x bet
"7⃣7⃣7": 20, # 20x bet
}
# Spin the slots
result = [random.choice(symbols) for _ in range(3)]
result_str = "".join(result)
# Check for win
win_multiplier = payouts.get(result_str, 0)
if win_multiplier > 0:
# Win
winnings = bet * win_multiplier
await database.update_balance(user_id, winnings - bet) # Subtract bet, add winnings
new_balance = await database.get_balance(user_id)
embed = discord.Embed(
title="🎰 Slots Win!",
description=f"[ {result[0]} | {result[1]} | {result[2]} ]\n\nYou won **${winnings:,}**! ({win_multiplier}x)",
color=discord.Color.green()
)
embed.add_field(name="New Balance", value=f"${new_balance:,}", inline=False)
else:
# Lose
await database.update_balance(user_id, -bet)
new_balance = await database.get_balance(user_id)
embed = discord.Embed(
title="🎰 Slots Loss",
description=f"[ {result[0]} | {result[1]} | {result[2]} ]\n\nYou lost **${bet:,}**.",
color=discord.Color.red()
)
embed.add_field(name="New Balance", value=f"${new_balance:,}", inline=False)
await interaction.response.send_message(embed=embed)
# Utility group callbacks
async def economy_balance_callback(self, interaction: discord.Interaction, user: discord.Member = None):
"""Callback for /economy utility balance command"""
target_user = user or interaction.user
user_id = target_user.id
balance = await database.get_balance(user_id)
if target_user == interaction.user:
embed = discord.Embed(
title="Your Balance",
description=f"💰 You have **${balance:,}**",
color=discord.Color.blue()
)
else:
embed = discord.Embed(
title=f"{target_user.display_name}'s Balance",
description=f"💰 {target_user.mention} has **${balance:,}**",
color=discord.Color.blue()
)
await interaction.response.send_message(embed=embed)
async def economy_transfer_callback(self, interaction: discord.Interaction, user: discord.Member, amount: int):
"""Callback for /economy utility transfer command"""
sender_id = interaction.user.id
receiver_id = user.id
# Validate transfer
if sender_id == receiver_id:
await interaction.response.send_message("❌ You can't transfer money to yourself.", ephemeral=True)
return
if amount <= 0:
await interaction.response.send_message("❌ Transfer amount must be greater than 0.", ephemeral=True)
return
# Check if sender has enough money
sender_balance = await database.get_balance(sender_id)
if amount > sender_balance:
await interaction.response.send_message(f"❌ You don't have enough money. Your balance: ${sender_balance:,}", ephemeral=True)
return
# Process transfer
await database.update_balance(sender_id, -amount)
await database.update_balance(receiver_id, amount)
# Get updated balances
new_sender_balance = await database.get_balance(sender_id)
embed = discord.Embed(
title="Transfer Complete",
description=f"💸 You sent **${amount:,}** to {user.mention}",
color=discord.Color.green()
)
embed.add_field(name="Your New Balance", value=f"${new_sender_balance:,}", inline=False)
await interaction.response.send_message(embed=embed)
async def economy_leaderboard_callback(self, interaction: discord.Interaction):
"""Callback for /economy utility leaderboard command"""
# Get top 10 users by balance
leaderboard_data = await database.get_leaderboard(limit=10)
if not leaderboard_data:
await interaction.response.send_message("No users found in the economy system yet.", ephemeral=True)
return
embed = discord.Embed(
title="Economy Leaderboard",
description="Top 10 richest users",
color=discord.Color.gold()
)
for i, (user_id, balance) in enumerate(leaderboard_data):
try:
user = await self.bot.fetch_user(user_id)
username = user.display_name
except:
username = f"User {user_id}"
medal = "🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else f"{i+1}."
embed.add_field(
name=f"{medal} {username}",
value=f"${balance:,}",
inline=False
)
await interaction.response.send_message(embed=embed)
# Risky group callbacks
async def economy_rob_callback(self, interaction: discord.Interaction, user: discord.Member):
"""Callback for /economy risky rob command"""
robber_id = interaction.user.id
victim_id = user.id
# Validate rob attempt
if robber_id == victim_id:
await interaction.response.send_message("❌ You can't rob yourself.", ephemeral=True)
return
# Check cooldown
command_name = "rob"
cooldown_duration = datetime.timedelta(hours=1)
last_used = await database.check_cooldown(robber_id, command_name)
if last_used:
now_utc = datetime.datetime.now(datetime.timezone.utc)
if last_used.tzinfo is None:
last_used = last_used.replace(tzinfo=datetime.timezone.utc)
time_since_last_used = now_utc - last_used
if time_since_last_used < cooldown_duration:
time_left = cooldown_duration - time_since_last_used
hours, remainder = divmod(int(time_left.total_seconds()), 3600)
minutes, seconds = divmod(remainder, 60)
embed = discord.Embed(description=f"🕒 You can't rob again so soon. Try again in **{hours}h {minutes}m {seconds}s**.", color=discord.Color.orange())
await interaction.response.send_message(embed=embed, ephemeral=True)
return
# Set cooldown regardless of outcome
await database.set_cooldown(robber_id, command_name)
# Get balances
robber_balance = await database.get_balance(robber_id)
victim_balance = await database.get_balance(victim_id)
# Minimum balance requirements
min_robber_balance = 50
min_victim_balance = 100
if robber_balance < min_robber_balance:
embed = discord.Embed(
title="Rob Failed",
description=f"❌ You need at least **${min_robber_balance}** to attempt a robbery.",
color=discord.Color.red()
)
await interaction.response.send_message(embed=embed)
return
if victim_balance < min_victim_balance:
embed = discord.Embed(
title="Rob Failed",
description=f"{user.mention} doesn't have enough money to be worth robbing.",
color=discord.Color.red()
)
await interaction.response.send_message(embed=embed)
return
# Determine success (30% chance)
success_chance = 0.3
if random.random() < success_chance:
# Success - steal 10-30% of victim's balance
steal_percent = random.uniform(0.1, 0.3)
steal_amount = int(victim_balance * steal_percent)
# Update balances
await database.update_balance(robber_id, steal_amount)
await database.update_balance(victim_id, -steal_amount)
# Get updated balance
new_robber_balance = await database.get_balance(robber_id)
embed = discord.Embed(
title="Rob Successful!",
description=f"💰 You successfully robbed {user.mention} and got away with **${steal_amount:,}**!",
color=discord.Color.green()
)
embed.add_field(name="Your New Balance", value=f"${new_robber_balance:,}", inline=False)
else:
# Failure - lose 10-20% of your balance as a fine
fine_percent = random.uniform(0.1, 0.2)
fine_amount = int(robber_balance * fine_percent)
# Update balance
await database.update_balance(robber_id, -fine_amount)
# Get updated balance
new_robber_balance = await database.get_balance(robber_id)
embed = discord.Embed(
title="Rob Failed",
description=f"🚔 You were caught trying to rob {user.mention} and had to pay a fine of **${fine_amount:,}**!",
color=discord.Color.red()
)
embed.add_field(name="Your New Balance", value=f"${new_robber_balance:,}", inline=False)
await interaction.response.send_message(embed=embed)
# Jobs group callbacks
async def economy_apply_callback(self, interaction: discord.Interaction, job: app_commands.Choice[str]):
"""Callback for /economy jobs apply command"""
user_id = interaction.user.id
job_name = job.value
# Check if user already has a job
current_job = await database.get_user_job(user_id)
if current_job and current_job.get("name"):
embed = discord.Embed(
description=f"❌ You already have a job as a {current_job['name']}. You must quit first before applying for a new job.",
color=discord.Color.red()
)
await interaction.response.send_message(embed=embed, ephemeral=True)
return
# Apply for the job
success = await database.set_user_job(user_id, job_name)
if success:
embed = discord.Embed(
title="Job Application Successful!",
description=f"🎉 Congratulations! You are now employed as a **{job_name}**.",
color=discord.Color.green()
)
embed.add_field(name="Next Steps", value=f"Use `/economy jobs {job_name}` to work at your new job!", inline=False)
else:
embed = discord.Embed(
title="Job Application Failed",
description="❌ There was an error processing your job application. Please try again later.",
color=discord.Color.red()
)
await interaction.response.send_message(embed=embed)
async def economy_quit_callback(self, interaction: discord.Interaction):
"""Callback for /economy jobs quit command"""
user_id = interaction.user.id
# Check if user has a job
current_job = await database.get_user_job(user_id)
if not current_job or not current_job.get("name"):
embed = discord.Embed(
description="❌ You don't currently have a job to quit.",
color=discord.Color.red()
)
await interaction.response.send_message(embed=embed, ephemeral=True)
return
job_name = current_job.get("name")
# Quit the job
success = await database.remove_user_job(user_id)
if success:
embed = discord.Embed(
title="Job Resignation",
description=f"✅ You have successfully quit your job as a **{job_name}**.",
color=discord.Color.blue()
)
else:
embed = discord.Embed(
title="Error",
description="❌ There was an error processing your resignation. Please try again later.",
color=discord.Color.red()
)
await interaction.response.send_message(embed=embed)
async def economy_joblist_callback(self, interaction: discord.Interaction):
"""Callback for /economy jobs list command"""
# Get list of available jobs
jobs = await database.get_available_jobs()
if not jobs:
await interaction.response.send_message("No jobs are currently available.", ephemeral=True)
return
embed = discord.Embed(
title="Available Jobs",
description="Here are the jobs you can apply for:",
color=discord.Color.blue()
)
for job in jobs:
embed.add_field(
name=f"{job['name']} - ${job['base_pay']} per shift",
value=job['description'],
inline=False
)
embed.set_footer(text="Apply for a job with /economy jobs apply")
await interaction.response.send_message(embed=embed)
async def cog_unload(self):
"""Called when the cog is unloaded, closes DB connections."""
log.info("Unloading EconomyCog (combined)...")
@ -58,5 +840,9 @@ class EconomyCog(
async def setup(bot: commands.Bot):
"""Sets up the combined EconomyCog."""
await bot.add_cog(EconomyCog(bot))
log.info("Combined EconomyCog added to bot.")
print("Setting up EconomyCog...")
cog = EconomyCog(bot)
await bot.add_cog(cog)
log.info("Combined EconomyCog added to bot with command groups.")
print(f"EconomyCog setup complete with command groups: {[cmd.name for cmd in bot.tree.get_commands() if cmd.name == 'economy']}")
print(f"Available subgroups: {[group.name for group in cog.economy_group.walk_commands() if isinstance(group, app_commands.Group)]}")

View File

@ -14,7 +14,7 @@ import ast
# Import game implementations from separate files
from .games.chess_game import (
generate_board_image, MoveInputModal, ChessView, ChessBotView,
generate_board_image, MoveInputModal, ChessView, ChessBotView,
get_stockfish_path
)
from .games.coinflip_game import CoinFlipView
@ -22,13 +22,46 @@ from .games.tictactoe_game import TicTacToeView, BotTicTacToeView
from .games.rps_game import RockPaperScissorsView
from .games.basic_games import roll_dice, flip_coin, magic8ball_response, play_hangman
class GamesCog(commands.Cog):
class GamesCog(commands.Cog, name="Games"):
"""Cog for game-related commands"""
def __init__(self, bot: commands.Bot):
self.bot = bot
# Store active bot game views to manage engine resources
self.active_chess_bot_views = {} # Store by message ID
self.ttt_games = {} # Store TicTacToe game instances by user ID
# Create the main command group for this cog
self.games_group = app_commands.Group(
name="games",
description="Play various games with the bot or other users"
)
# Create subgroups
self.chess_group = app_commands.Group(
name="chess",
description="Chess-related commands",
parent=self.games_group
)
self.tictactoe_group = app_commands.Group(
name="tictactoe",
description="Tic-tac-toe game commands",
parent=self.games_group
)
self.dice_group = app_commands.Group(
name="dice",
description="Dice and coin games",
parent=self.games_group
)
# Register commands
self.register_commands()
# Add command groups to the bot's tree
self.bot.tree.add_command(self.games_group)
def _array_to_fen(self, board_array: List[List[str]], turn: chess.Color) -> str:
"""Converts an 8x8 array representation to a basic FEN string."""
fen_rows = []
@ -55,6 +88,121 @@ class GamesCog(commands.Cog):
fen = f"{piece_placement} {turn_char} - - 0 1"
return fen
def register_commands(self):
"""Register all commands for this cog"""
# --- Dice Group Commands ---
# Coinflip command
coinflip_command = app_commands.Command(
name="coinflip",
description="Flip a coin and get Heads or Tails",
callback=self.games_coinflip_callback,
parent=self.dice_group
)
self.dice_group.add_command(coinflip_command)
# Roll command
roll_command = app_commands.Command(
name="roll",
description="Roll a dice and get a number between 1 and 6",
callback=self.games_roll_callback,
parent=self.dice_group
)
self.dice_group.add_command(roll_command)
# Magic 8-ball command
magic8ball_command = app_commands.Command(
name="magic8ball",
description="Ask the magic 8 ball a question",
callback=self.games_magic8ball_callback,
parent=self.dice_group
)
self.dice_group.add_command(magic8ball_command)
# --- Main Games Group Commands ---
# RPS command
rps_command = app_commands.Command(
name="rps",
description="Play Rock-Paper-Scissors against the bot",
callback=self.games_rps_callback,
parent=self.games_group
)
self.games_group.add_command(rps_command)
# RPS Challenge command
rpschallenge_command = app_commands.Command(
name="rpschallenge",
description="Challenge another user to a game of Rock-Paper-Scissors",
callback=self.games_rpschallenge_callback,
parent=self.games_group
)
self.games_group.add_command(rpschallenge_command)
# Guess command
guess_command = app_commands.Command(
name="guess",
description="Guess the number I'm thinking of (1-100)",
callback=self.games_guess_callback,
parent=self.games_group
)
self.games_group.add_command(guess_command)
# Hangman command
hangman_command = app_commands.Command(
name="hangman",
description="Play a game of Hangman",
callback=self.games_hangman_callback,
parent=self.games_group
)
self.games_group.add_command(hangman_command)
# --- TicTacToe Group Commands ---
# TicTacToe command
tictactoe_command = app_commands.Command(
name="play",
description="Challenge another user to a game of Tic-Tac-Toe",
callback=self.games_tictactoe_callback,
parent=self.tictactoe_group
)
self.tictactoe_group.add_command(tictactoe_command)
# TicTacToe Bot command
tictactoebot_command = app_commands.Command(
name="bot",
description="Play a game of Tic-Tac-Toe against the bot",
callback=self.games_tictactoebot_callback,
parent=self.tictactoe_group
)
self.tictactoe_group.add_command(tictactoebot_command)
# --- Chess Group Commands ---
# Chess command
chess_command = app_commands.Command(
name="play",
description="Challenge another user to a game of chess",
callback=self.games_chess_callback,
parent=self.chess_group
)
self.chess_group.add_command(chess_command)
# Chess Bot command
chessbot_command = app_commands.Command(
name="bot",
description="Play chess against the bot",
callback=self.games_chessbot_callback,
parent=self.chess_group
)
self.chess_group.add_command(chessbot_command)
# Load Chess command
loadchess_command = app_commands.Command(
name="load",
description="Load a chess game from FEN, PGN, or array representation",
callback=self.games_loadchess_callback,
parent=self.chess_group
)
self.chess_group.add_command(loadchess_command)
async def cog_unload(self):
"""Clean up resources when the cog is unloaded."""
print("Unloading GamesCog, closing active chess engines...")
@ -66,6 +214,354 @@ class GamesCog(commands.Cog):
self.active_chess_bot_views.clear()
print("GamesCog unloaded.")
# --- Command Callbacks ---
# Dice group callbacks
async def games_coinflip_callback(self, interaction: discord.Interaction):
"""Callback for /games dice coinflip command"""
result = flip_coin()
await interaction.response.send_message(f"The coin landed on **{result}**! 🪙")
async def games_roll_callback(self, interaction: discord.Interaction):
"""Callback for /games dice roll command"""
result = roll_dice()
await interaction.response.send_message(f"You rolled a **{result}**! 🎲")
async def games_magic8ball_callback(self, interaction: discord.Interaction, question: str = None):
"""Callback for /games dice magic8ball command"""
response = magic8ball_response()
await interaction.response.send_message(f"🎱 {response}")
# Games group callbacks
async def games_rps_callback(self, interaction: discord.Interaction, choice: app_commands.Choice[str]):
"""Callback for /games rps command"""
choices = ["Rock", "Paper", "Scissors"]
bot_choice = random.choice(choices)
user_choice = choice.value # Get value from choice
if user_choice == bot_choice:
result = "It's a tie!"
elif (user_choice == "Rock" and bot_choice == "Scissors") or \
(user_choice == "Paper" and bot_choice == "Rock") or \
(user_choice == "Scissors" and bot_choice == "Paper"):
result = "You win! 🎉"
else:
result = "You lose! 😢"
emojis = {
"Rock": "🪨",
"Paper": "📄",
"Scissors": "✂️"
}
await interaction.response.send_message(
f"You chose **{user_choice}** {emojis[user_choice]}\n"
f"I chose **{bot_choice}** {emojis[bot_choice]}\n\n"
f"{result}"
)
async def games_rpschallenge_callback(self, interaction: discord.Interaction, opponent: discord.Member):
"""Callback for /games rpschallenge command"""
initiator = interaction.user
if opponent == initiator:
await interaction.response.send_message("You cannot challenge yourself!", ephemeral=True)
return
if opponent.bot:
await interaction.response.send_message("You cannot challenge a bot!", ephemeral=True)
return
view = RockPaperScissorsView(initiator, opponent)
initial_message = f"Rock Paper Scissors: {initiator.mention} vs {opponent.mention}\n\nChoose your move!"
await interaction.response.send_message(initial_message, view=view)
message = await interaction.original_response()
view.message = message
async def games_guess_callback(self, interaction: discord.Interaction, guess: int):
"""Callback for /games guess command"""
# Simple implementation: generate number per guess (no state needed)
number_to_guess = random.randint(1, 100)
if guess < 1 or guess > 100:
await interaction.response.send_message("Please guess a number between 1 and 100.", ephemeral=True)
return
if guess == number_to_guess:
await interaction.response.send_message(f"🎉 Correct! The number was **{number_to_guess}**.")
elif guess < number_to_guess:
await interaction.response.send_message(f"Too low! The number was {number_to_guess}.")
else:
await interaction.response.send_message(f"Too high! The number was {number_to_guess}.")
async def games_hangman_callback(self, interaction: discord.Interaction):
"""Callback for /games hangman command"""
await play_hangman(self.bot, interaction.channel, interaction.user)
# TicTacToe group callbacks
async def games_tictactoe_callback(self, interaction: discord.Interaction, opponent: discord.Member):
"""Callback for /games tictactoe play command"""
initiator = interaction.user
if opponent == initiator:
await interaction.response.send_message("You cannot challenge yourself!", ephemeral=True)
return
if opponent.bot:
await interaction.response.send_message("You cannot challenge a bot! Use `/games tictactoe bot` instead.", ephemeral=True)
return
view = TicTacToeView(initiator, opponent)
initial_message = f"Tic Tac Toe: {initiator.mention} (X) vs {opponent.mention} (O)\n\nTurn: **{initiator.mention} (X)**"
await interaction.response.send_message(initial_message, view=view)
message = await interaction.original_response()
view.message = message # Store message for timeout handling
async def games_tictactoebot_callback(self, interaction: discord.Interaction, difficulty: app_commands.Choice[str] = None):
"""Callback for /games tictactoe bot command"""
# Use default if no choice is made (discord.py handles default value assignment)
difficulty_value = difficulty.value if difficulty else "minimax"
# Ensure tictactoe module is importable
try:
import sys
import os
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if parent_dir not in sys.path:
sys.path.append(parent_dir)
from tictactoe import TicTacToe # Assuming tictactoe.py is in the parent directory
except ImportError:
await interaction.response.send_message("Error: TicTacToe game engine module not found.", ephemeral=True)
return
except Exception as e:
await interaction.response.send_message(f"Error importing TicTacToe module: {e}", ephemeral=True)
return
# Create a new game instance
try:
game = TicTacToe(ai_player='O', ai_difficulty=difficulty_value)
except Exception as e:
await interaction.response.send_message(f"Error initializing TicTacToe game: {e}", ephemeral=True)
return
# Create a view for the user interface
view = BotTicTacToeView(game, interaction.user)
await interaction.response.send_message(
f"Tic Tac Toe: {interaction.user.mention} (X) vs Bot (O) - Difficulty: {difficulty_value.capitalize()}\n\nYour turn!",
view=view
)
view.message = await interaction.original_response()
# Chess group callbacks
async def games_chess_callback(self, interaction: discord.Interaction, opponent: discord.Member):
"""Callback for /games chess play command"""
initiator = interaction.user
if opponent == initiator:
await interaction.response.send_message("You cannot challenge yourself!", ephemeral=True)
return
if opponent.bot:
await interaction.response.send_message("You cannot challenge a bot! Use `/games chess bot` instead.", ephemeral=True)
return
# Initiator is white, opponent is black
view = ChessView(initiator, opponent)
initial_status = f"Turn: **{initiator.mention}** (White)"
initial_message = f"Chess: {initiator.mention} (White) vs {opponent.mention} (Black)\n\n{initial_status}"
board_image = generate_board_image(view.board) # Generate initial board image
await interaction.response.send_message(initial_message, file=board_image, view=view)
message = await interaction.original_response()
view.message = message
# Send initial DMs
asyncio.create_task(view._send_or_update_dm(view.white_player))
asyncio.create_task(view._send_or_update_dm(view.black_player))
async def games_chessbot_callback(self, interaction: discord.Interaction, color: app_commands.Choice[str] = None, variant: app_commands.Choice[str] = None, skill_level: int = 10, think_time: float = 1.0):
"""Callback for /games chess bot command"""
player = interaction.user
player_color_str = color.value if color else "white"
variant_str = variant.value if variant else "standard"
player_color = chess.WHITE if player_color_str == "white" else chess.BLACK
# Validate inputs
skill_level = max(0, min(20, skill_level))
think_time = max(0.1, min(5.0, think_time))
# Check if variant is supported (currently standard and chess960)
supported_variants = ["standard", "chess960"]
if variant_str not in supported_variants:
await interaction.response.send_message(f"Sorry, the variant '{variant_str}' is not currently supported. Choose from: {', '.join(supported_variants)}", ephemeral=True)
return
# Defer response as engine start might take a moment
await interaction.response.defer()
view = ChessBotView(player, player_color, variant_str, skill_level, think_time)
# Start the engine asynchronously
# Store interaction temporarily for potential error reporting during init
view._interaction = interaction
await view.start_engine()
del view._interaction # Remove temporary attribute
if view.engine is None or view.is_finished(): # Check if engine failed or view stopped during init
# Error message should have been sent by start_engine or view stopped itself
# Ensure we don't try to send another response if already handled
# No need to send another message here, start_engine handles it.
print("ChessBotView: Engine failed to start, stopping command execution.")
return # Stop if engine failed
# Determine initial message based on who moves first
initial_status_prefix = "Your turn." if player_color == chess.WHITE else "Bot is thinking..."
initial_message_content = view.get_board_message(initial_status_prefix)
board_image = generate_board_image(view.board, perspective_white=(player_color == chess.WHITE))
# Send the initial game state using followup
message = await interaction.followup.send(initial_message_content, file=board_image, view=view, wait=True)
view.message = message
self.active_chess_bot_views[message.id] = view # Track the view
# Send initial DM to player
asyncio.create_task(view._send_or_update_dm())
# If bot moves first (player chose black), trigger its move
if player_color == chess.BLACK:
# Don't await this, let it run in the background
asyncio.create_task(view.make_bot_move())
async def games_loadchess_callback(self, interaction: discord.Interaction, state: str, turn: Optional[app_commands.Choice[str]] = None, opponent: Optional[discord.Member] = None, color: Optional[app_commands.Choice[str]] = None, skill_level: int = 10, think_time: float = 1.0):
"""Callback for /games chess load command"""
await interaction.response.defer()
initiator = interaction.user
board = None
load_error = None
loaded_pgn_game = None # To store the loaded PGN game object if parsed
# --- Input Validation ---
if not opponent and not color:
await interaction.followup.send("The 'color' parameter is required when playing against the bot.", ephemeral=True)
return
# --- Parsing Logic ---
state_trimmed = state.strip()
# 1. Try parsing as PGN
if state_trimmed.startswith("[Event") or ('.' in state_trimmed and ('O-O' in state_trimmed or 'x' in state_trimmed or state_trimmed[0].isdigit())):
try:
pgn_io = io.StringIO(state_trimmed)
loaded_pgn_game = chess.pgn.read_game(pgn_io)
if loaded_pgn_game is None:
raise ValueError("Could not parse PGN data.")
# Get the board state from the end of the main line
board = loaded_pgn_game.end().board()
print("[Debug] Parsed as PGN.")
except Exception as e:
load_error = f"Could not parse as PGN: {e}. Trying other formats."
print(f"[Debug] PGN parsing failed: {e}")
loaded_pgn_game = None # Reset if PGN parsing failed
# 2. Try parsing as FEN (if not already parsed as PGN)
if board is None and '/' in state_trimmed and (' w ' in state_trimmed or ' b ' in state_trimmed):
try:
board = chess.Board(fen=state_trimmed)
print(f"[Debug] Parsed as FEN: {state_trimmed}")
except ValueError as e:
load_error = f"Invalid FEN string: {e}. Trying array format."
print(f"[Error] FEN parsing failed: {e}")
except Exception as e:
load_error = f"Unexpected FEN parsing error: {e}. Trying array format."
print(f"[Error] Unexpected FEN parsing error: {e}")
# 3. Try parsing as Array (if not parsed as PGN or FEN)
if board is None:
try:
# Check if it looks like a list before eval
if not state_trimmed.startswith('[') or not state_trimmed.endswith(']'):
raise ValueError("Input does not look like a list array.")
board_array = ast.literal_eval(state_trimmed)
print("[Debug] Attempting to parse as array...")
if not isinstance(board_array, list) or len(board_array) != 8 or \
not all(isinstance(row, list) and len(row) == 8 for row in board_array):
raise ValueError("Invalid array structure. Must be 8x8 list.")
if not turn:
load_error = "The 'turn' parameter is required when providing a board array."
else:
turn_color = chess.WHITE if turn.value == "white" else chess.BLACK
fen = self._array_to_fen(board_array, turn_color)
print(f"[Debug] Converted array to FEN: {fen}")
board = chess.Board(fen=fen)
except (ValueError, SyntaxError, TypeError) as e:
# If PGN/FEN failed, this is the final error message
load_error = f"Invalid state format. Could not parse as PGN, FEN, or Python list array. Error: {e}"
print(f"[Error] Array parsing failed: {e}")
except Exception as e:
load_error = f"Error parsing array state: {e}"
print(f"[Error] Unexpected array parsing error: {e}")
# --- Final Check and Error Handling ---
if board is None:
final_error = load_error or "Failed to load board state from the provided input."
await interaction.followup.send(final_error, ephemeral=True)
return
# --- Game Setup ---
if opponent:
# Player vs Player
if opponent == initiator:
await interaction.followup.send("You cannot challenge yourself!", ephemeral=True)
return
if opponent.bot:
await interaction.followup.send("You cannot challenge a bot! Use `/games chess bot` or load without opponent.", ephemeral=True)
return
white_player = initiator if board.turn == chess.WHITE else opponent
black_player = opponent if board.turn == chess.WHITE else initiator
view = ChessView(white_player, black_player, board=board) # Pass loaded board
# If loaded from PGN, set the game object in the view
if loaded_pgn_game:
view.game_pgn = loaded_pgn_game
view.pgn_node = loaded_pgn_game.end() # Start from the end node
current_player_mention = white_player.mention if board.turn == chess.WHITE else black_player.mention
turn_color_name = "White" if board.turn == chess.WHITE else "Black"
initial_status = f"Turn: **{current_player_mention}** ({turn_color_name})"
if board.is_check(): initial_status += " **Check!**"
initial_message = f"Loaded Chess Game: {white_player.mention} (White) vs {black_player.mention} (Black)\n\n{initial_status}"
perspective_white = (board.turn == chess.WHITE)
board_image = generate_board_image(view.board, perspective_white=perspective_white)
message = await interaction.followup.send(initial_message, file=board_image, view=view, wait=True)
view.message = message
# Send initial DMs
asyncio.create_task(view._send_or_update_dm(view.white_player))
asyncio.create_task(view._send_or_update_dm(view.black_player))
else:
# Player vs Bot
player = initiator
# Color is now required, checked at the start
player_color = chess.WHITE if color.value == "white" else chess.BLACK
skill_level = max(0, min(20, skill_level))
think_time = max(0.1, min(5.0, think_time))
variant_str = "chess960" if board.chess960 else "standard"
view = ChessBotView(player, player_color, variant_str, skill_level, think_time, board=board) # Pass loaded board
# If loaded from PGN, set the game object in the view
if loaded_pgn_game:
view.game_pgn = loaded_pgn_game
view.pgn_node = loaded_pgn_game.end() # Start from the end node
view._interaction = interaction # For error reporting during start
await view.start_engine()
if hasattr(view, '_interaction'): del view._interaction
# --- Legacy Commands (kept for backward compatibility) ---
@app_commands.command(name="coinflipbet", description="Challenge another user to a coin flip game.")
@app_commands.describe(
opponent="The user you want to challenge."
@ -89,27 +585,6 @@ class GamesCog(commands.Cog):
message = await interaction.original_response()
view.message = message
@app_commands.command(name="coinflip", description="Flip a coin and get Heads or Tails.")
async def coinflip(self, interaction: discord.Interaction):
"""Flips a coin and returns Heads or Tails."""
result = flip_coin()
await interaction.response.send_message(f"The coin landed on **{result}**! 🪙")
@app_commands.command(name="roll", description="Roll a dice and get a number between 1 and 6.")
async def roll(self, interaction: discord.Interaction):
"""Rolls a dice and returns a number between 1 and 6."""
result = roll_dice()
await interaction.response.send_message(f"You rolled a **{result}**! 🎲")
@app_commands.command(name="magic8ball", description="Ask the magic 8 ball a question.")
@app_commands.describe(
question="The question you want to ask the magic 8 ball."
)
async def magic8ball(self, interaction: discord.Interaction, question: str):
"""Provides a random response to a yes/no question."""
response = magic8ball_response()
await interaction.response.send_message(f"🎱 {response}")
@app_commands.command(name="rps", description="Play Rock-Paper-Scissors against the bot.")
@app_commands.describe(choice="Your choice: Rock, Paper, or Scissors.")
@app_commands.choices(choice=[
@ -685,4 +1160,9 @@ class GamesCog(commands.Cog):
await ctx.send(f"Too high! The number was {number_to_guess}.")
async def setup(bot: commands.Bot):
await bot.add_cog(GamesCog(bot))
"""Set up the GamesCog with the bot."""
print("Setting up GamesCog...")
cog = GamesCog(bot)
await bot.add_cog(cog)
print(f"GamesCog setup complete with command groups: {[cmd.name for cmd in bot.tree.get_commands() if cmd.name == 'games']}")
print(f"Available subgroups: {[group.name for group in cog.games_group.walk_commands() if isinstance(group, app_commands.Group)]}")

View File

@ -2,26 +2,50 @@ import discord
from discord.ext import commands
from discord import app_commands
class PingCog(commands.Cog):
class PingCog(commands.Cog, name="Ping"):
"""Cog for ping-related commands"""
def __init__(self, bot):
self.bot = bot
# Create the main command group for this cog
self.ping_group = app_commands.Group(
name="ping",
description="Check the bot's response time"
)
# Register commands
self.register_commands()
# Add command group to the bot's tree
self.bot.tree.add_command(self.ping_group)
def register_commands(self):
"""Register all commands for this cog"""
# Check command
check_command = app_commands.Command(
name="check",
description="Check the bot's response time",
callback=self.ping_check_callback,
parent=self.ping_group
)
self.ping_group.add_command(check_command)
async def _ping_logic(self):
"""Core logic for the ping command."""
latency = round(self.bot.latency * 1000)
return f'Pong! Response time: {latency}ms'
# --- Prefix Command ---
# --- Prefix Command (for backward compatibility) ---
@commands.command(name="ping")
async def ping(self, ctx: commands.Context):
"""Check the bot's response time."""
response = await self._ping_logic()
await ctx.reply(response)
# --- Slash Command ---
@app_commands.command(name="ping", description="Check the bot's response time")
async def ping_slash(self, interaction: discord.Interaction):
"""Slash command version of ping."""
# --- Slash Command Callbacks ---
async def ping_check_callback(self, interaction: discord.Interaction):
"""Callback for /ping check command"""
response = await self._ping_logic()
await interaction.response.send_message(response)