import discord from discord.ext import commands, tasks from discord import app_commands, ui import datetime import asyncio import random import re # For parsing duration import json import os import aiofiles # Import aiofiles import aiofiles.os import discord_oauth GIVEAWAY_DATA_FILE = "data/giveaways.json" DATA_DIR = "data" # --- Helper Functions --- async def get_nitro_status_oauth(user: discord.User | discord.Member) -> bool | None: """Return True if user has Nitro according to OAuth info, False if not, None if unknown.""" token = await discord_oauth.get_token(str(user.id)) if not token: return None try: user_info = await discord_oauth.get_user_info(token) except Exception: return None return user_info.get("premium_type", 0) in (1, 2) # --- Additional Helper Functions --- async def is_user_nitro_like( user: discord.User | discord.Member, bot: commands.Bot = None ) -> bool: """Heuristically check if a user has Nitro, falling back to OAuth if available.""" nitro_oauth = await get_nitro_status_oauth(user) if nitro_oauth is not None: return nitro_oauth # Fetch the full user object to get banner information if bot: try: fetched_user = await bot.fetch_user(user.id) user = fetched_user except discord.NotFound: pass # Use the original user object if fetch fails if isinstance(user, discord.Member): # Member object has guild-specific avatar # Check guild avatar first, then global avatar if user.guild_avatar and user.guild_avatar.is_animated(): return True if user.avatar and user.avatar.is_animated(): return True elif user.avatar and user.avatar.is_animated(): # User object return True return user.banner is not None # --- UI Views and Buttons --- class GiveawayEnterButton(ui.Button["GiveawayEnterView"]): def __init__(self, cog_ref): super().__init__( label="Enter Giveaway", style=discord.ButtonStyle.green, custom_id="giveaway_enter_button", ) self.cog: GiveawaysCog = cog_ref # Store a reference to the cog async def callback(self, interaction: discord.Interaction): giveaway = self.cog._get_giveaway_by_message_id(interaction.message.id) if not giveaway or giveaway.get("ended", False): await interaction.response.send_message( "This giveaway has ended or is no longer active.", ephemeral=True ) # Optionally disable the button on the message if possible self.disabled = True await interaction.message.edit(view=self.view) return if giveaway["is_nitro_giveaway"] or giveaway.get("exclude_nitro_users"): nitro_status = await get_nitro_status_oauth(interaction.user) if nitro_status is None: await interaction.response.send_message( "Please authenticate with /auth so I can verify your Nitro status before entering.", ephemeral=True, ) return if giveaway["is_nitro_giveaway"] and not nitro_status: await interaction.response.send_message( "This is a Nitro-exclusive giveaway, and your account does not appear to have Nitro.", ephemeral=True, ) return if giveaway.get("exclude_nitro_users") and nitro_status: await interaction.response.send_message( "Nitro users are excluded from this giveaway.", ephemeral=True, ) return if interaction.user.id in giveaway["participants"]: await interaction.response.send_message( "You have already entered this giveaway!", ephemeral=True ) else: giveaway["participants"].add(interaction.user.id) await self.cog.save_giveaways() # Save after participant update await interaction.response.send_message( "You have successfully entered the giveaway!", ephemeral=True ) # Update participant count in embed if desired (optional) # embed = interaction.message.embeds[0] # embed.set_field_at(embed.fields.index(...) or create new field, name="Participants", value=str(len(giveaway["participants"]))) # await interaction.message.edit(embed=embed) class GiveawayEnterView(ui.View): def __init__( self, cog: "GiveawaysCog", timeout=None ): # Timeout=None for persistent super().__init__(timeout=timeout) self.cog = cog self.add_item(GiveawayEnterButton(cog_ref=self.cog)) class GiveawayRerollButton(ui.Button["GiveawayEndView"]): def __init__(self, cog_ref, original_giveaway_message_id: int): super().__init__( label="Reroll Winner", style=discord.ButtonStyle.blurple, custom_id=f"giveaway_reroll_button:{original_giveaway_message_id}", ) self.cog: GiveawaysCog = cog_ref self.original_giveaway_message_id = original_giveaway_message_id async def callback(self, interaction: discord.Interaction): # For reroll, we need to find the *original* giveaway data, which might not be in active_giveaways anymore. # We'll need to load it from the JSON file or have a separate store for ended giveaways. # For simplicity now, let's assume we can find it or it's passed appropriately. # This custom_id parsing is a common pattern for persistent buttons with dynamic data. # msg_id_str = interaction.data["custom_id"].split(":")[1] # original_msg_id = int(msg_id_str) # Find the giveaway data (this might need adjustment based on how ended giveaways are stored) # We'll search all giveaways loaded, including those marked "ended" giveaway_data = self.cog._get_giveaway_by_message_id( self.original_giveaway_message_id, search_all=True ) if not giveaway_data: await interaction.response.send_message( "Could not find the data for this giveaway to reroll.", ephemeral=True ) return if not interaction.user.guild_permissions.manage_guild: await interaction.response.send_message( "You don't have permission to reroll winners.", ephemeral=True ) return await interaction.response.defer(ephemeral=True) # Acknowledge participants_ids = list(giveaway_data.get("participants", [])) if not participants_ids: await interaction.followup.send( "There were no participants in this giveaway to reroll from.", ephemeral=True, ) return # Fetch user objects for participants entrants_users = [] for user_id in participants_ids: user = interaction.guild.get_member(user_id) if not user: # Try fetching if not in cache or left server try: user = await self.cog.bot.fetch_user(user_id) except discord.NotFound: continue # Skip if user cannot be found if user and not user.bot: if giveaway_data.get("is_nitro_giveaway") or giveaway_data.get( "exclude_nitro_users" ): nitro_status = await get_nitro_status_oauth(user) if nitro_status is None: continue if giveaway_data.get("is_nitro_giveaway") and not nitro_status: continue if giveaway_data.get("exclude_nitro_users") and nitro_status: continue entrants_users.append(user) if not entrants_users: await interaction.followup.send( "No eligible participants found for a reroll (e.g., after Nitro check or if users left).", ephemeral=True, ) return num_winners = giveaway_data.get("num_winners", 1) new_winners_list = [] if len(entrants_users) <= num_winners: new_winners_list = list(entrants_users) else: new_winners_list = random.sample(entrants_users, num_winners) if new_winners_list: winner_mentions = ", ".join(w.mention for w in new_winners_list) # Announce in the original giveaway channel original_channel = self.cog.bot.get_channel(giveaway_data["channel_id"]) if original_channel: await original_channel.send( f"🔄 Reroll for **{giveaway_data['prize']}**! Congratulations {winner_mentions}, you are the new winner(s)!" ) await interaction.followup.send( f"Reroll successful. New winner(s) announced in {original_channel.mention}.", ephemeral=True, ) else: await interaction.followup.send( "Reroll successful, but I couldn't find the original channel to announce.", ephemeral=True, ) else: await interaction.followup.send( "Could not select any new winners in the reroll.", ephemeral=True ) class GiveawayEndView(ui.View): def __init__( self, cog: "GiveawaysCog", original_giveaway_message_id: int, timeout=None ): # Timeout=None for persistent super().__init__(timeout=timeout) self.cog = cog self.add_item( GiveawayRerollButton( cog_ref=self.cog, original_giveaway_message_id=original_giveaway_message_id, ) ) class GiveawaysCog(commands.Cog, name="Giveaways"): """Cog for managing giveaways""" gway = app_commands.Group(name="gway", description="Giveaway related commands") def __init__(self, bot: commands.Bot): self.bot = bot self.active_giveaways = [] self.all_loaded_giveaways = [] # To keep ended ones for reroll lookup # Structure: # { # "message_id": int, # "channel_id": int, # "guild_id": int, # "prize": str, # "end_time": datetime.datetime, # Stored as ISO string in JSON # "num_winners": int, # "creator_id": int, # "participants": set(), # Store user_ids. Stored as list in JSON. # "is_nitro_giveaway": bool, # "exclude_nitro_users": bool, # "ended": bool # } # Ensure data directory exists before loading/saving asyncio.create_task(self._ensure_data_dir_exists()) # Run asynchronously asyncio.create_task(self.load_giveaways()) # Run asynchronously self.check_giveaways_loop.start() # Persistent views are added in setup_hook async def cog_load( self, ): # Changed from setup_hook to cog_load for better timing with bot ready asyncio.create_task(self._restore_views()) async def _restore_views(self): await self.bot.wait_until_ready() print("Re-adding persistent giveaway views...") temp_loaded_giveaways = [] # Use a temporary list for loading try: async with aiofiles.open(GIVEAWAY_DATA_FILE, mode="r") as f: content = await f.read() if not content: # Handle empty file case giveaways_data_for_views = [] else: giveaways_data_for_views = await self.bot.loop.run_in_executor( None, json.loads, content ) for gw_data in giveaways_data_for_views: # We only need to re-add views for messages that should have them is_ended = gw_data.get("ended", False) # Check if end_time is in the past if "ended" flag isn't perfectly reliable end_time_dt = datetime.datetime.fromisoformat(gw_data["end_time"]) if not is_ended and end_time_dt > datetime.datetime.now( datetime.timezone.utc ): # Active giveaway, re-add EnterView self.bot.add_view( GiveawayEnterView(cog=self), message_id=gw_data["message_id"], ) elif is_ended or end_time_dt <= datetime.datetime.now( datetime.timezone.utc ): # Ended giveaway, re-add EndView (with Reroll button) self.bot.add_view( GiveawayEndView( cog=self, original_giveaway_message_id=gw_data["message_id"], ), message_id=gw_data["message_id"], ) temp_loaded_giveaways.append( gw_data ) # Keep track for _get_giveaway_by_message_id print( f"Attempted to re-add views for {len(temp_loaded_giveaways)} giveaways." ) except FileNotFoundError: print("No giveaway data file found, skipping view re-adding.") except json.JSONDecodeError: print( "Error decoding giveaway data file. Starting with no active giveaways." ) except Exception as e: print(f"Error re-adding persistent views: {e}") async def _ensure_data_dir_exists(self): try: await aiofiles.os.makedirs( DATA_DIR, exist_ok=True ) # Use aiofiles.os for async mkdir, exist_ok handles if it already exists except Exception as e: print(f"Error ensuring data directory {DATA_DIR} exists: {e}") def cog_unload(self): self.check_giveaways_loop.cancel() async def load_giveaways(self): # Make async self.active_giveaways = [] self.all_loaded_giveaways = [] try: async with aiofiles.open(GIVEAWAY_DATA_FILE, mode="r") as f: content = await f.read() if not content: # Handle empty file case giveaways_data = [] else: giveaways_data = await self.bot.loop.run_in_executor( None, json.loads, content ) now = datetime.datetime.now(datetime.timezone.utc) for gw_data in giveaways_data: gw_data["end_time"] = datetime.datetime.fromisoformat( gw_data["end_time"] ) gw_data["participants"] = set(gw_data.get("participants", [])) gw_data.setdefault("is_nitro_giveaway", False) gw_data.setdefault("exclude_nitro_users", False) gw_data.setdefault( "ended", gw_data["end_time"] <= now ) # Set ended if time has passed self.all_loaded_giveaways.append( gw_data.copy() ) # Store all for reroll lookup if not gw_data["ended"]: self.active_giveaways.append(gw_data) print( f"Loaded {len(self.all_loaded_giveaways)} total giveaways ({len(self.active_giveaways)} active)." ) except FileNotFoundError: print("Giveaway data file not found. Starting with no active giveaways.") except json.JSONDecodeError: print( "Error decoding giveaway data file. Starting with no active giveaways." ) except Exception as e: print(f"An unexpected error occurred loading giveaways: {e}") async def save_giveaways(self): # Make async # Save all giveaways (from self.all_loaded_giveaways or by merging active and ended) # This ensures that "ended" status and participant lists are preserved. try: # Create a unified list to save, ensuring all giveaways are present # and active_giveaways reflects the most current state for those not yet ended. # Create a dictionary of active giveaways by message_id for quick updates active_map = {gw["message_id"]: gw for gw in self.active_giveaways} giveaways_to_save = [] # Iterate through all_loaded_giveaways to maintain the full history for gw_hist in self.all_loaded_giveaways: # If this giveaway is also in active_map, it means it's still active # or just ended in the current session. Use the version from active_map # as it might have newer participant data before being marked ended. if gw_hist["message_id"] in active_map: current_version = active_map[gw_hist["message_id"]] saved_gw = current_version.copy() else: # It's an older, ended giveaway not in the current active list saved_gw = gw_hist.copy() saved_gw["end_time"] = saved_gw["end_time"].isoformat() saved_gw["participants"] = list(saved_gw["participants"]) giveaways_to_save.append(saved_gw) # Add any brand new giveaways from self.active_giveaways not yet in self.all_loaded_giveaways # This case should ideally be handled by adding to both lists upon creation. # For robustness: all_saved_ids = {gw["message_id"] for gw in giveaways_to_save} for gw_active in self.active_giveaways: if gw_active["message_id"] not in all_saved_ids: new_gw_to_save = gw_active.copy() new_gw_to_save["end_time"] = new_gw_to_save["end_time"].isoformat() new_gw_to_save["participants"] = list( new_gw_to_save["participants"] ) giveaways_to_save.append(new_gw_to_save) # Also add to all_loaded_giveaways for next time self.all_loaded_giveaways.append(gw_active.copy()) # Offload json.dumps to executor json_string_to_save = await self.bot.loop.run_in_executor( None, json.dumps, giveaways_to_save, indent=4 ) async with aiofiles.open(GIVEAWAY_DATA_FILE, mode="w") as f: await f.write(json_string_to_save) # print(f"Saved {len(giveaways_to_save)} giveaways to disk.") except Exception as e: print(f"Error saving giveaways: {e}") def _get_giveaway_by_message_id(self, message_id: int, search_all: bool = False): """Helper to find a giveaway by its message ID.""" source_list = self.all_loaded_giveaways if search_all else self.active_giveaways for giveaway in source_list: if giveaway["message_id"] == message_id: return giveaway return None def parse_duration(self, duration_str: str) -> datetime.timedelta | None: """Parses a duration string (e.g., "1d", "3h", "30m", "1w") into a timedelta.""" match = re.fullmatch(r"(\d+)([smhdw])", duration_str.lower()) if not match: return None value, unit = int(match.group(1)), match.group(2) if unit == "s": return datetime.timedelta(seconds=value) elif unit == "m": return datetime.timedelta(minutes=value) elif unit == "h": return datetime.timedelta(hours=value) elif unit == "d": return datetime.timedelta(days=value) elif unit == "w": return datetime.timedelta(weeks=value) return None @gway.command(name="create", description="Create a new giveaway.") @app_commands.describe( prize="What is the prize?", duration="How long should the giveaway last? (e.g., 10m, 1h, 2d, 1w)", winners="How many winners? (default: 1)", nitro_giveaway="Is this a Nitro-only giveaway? (OAuth verification)", exclude_nitro="Exclude Nitro users from entering?", ) @app_commands.checks.has_permissions(manage_guild=True) async def create_giveaway_slash( self, interaction: discord.Interaction, prize: str, duration: str, winners: int = 1, nitro_giveaway: bool = False, exclude_nitro: bool = False, ): """Slash command to create a giveaway using buttons.""" parsed_duration = self.parse_duration(duration) if not parsed_duration: await interaction.response.send_message( "Invalid duration format. Use s, m, h, d, w (e.g., 10m, 1h, 2d).", ephemeral=True, ) return if winners < 1: await interaction.response.send_message( "Number of winners must be at least 1.", ephemeral=True ) return end_time = datetime.datetime.now(datetime.timezone.utc) + parsed_duration embed = discord.Embed( title=f"🎉 Giveaway: {prize} 🎉", description=f"Click the button below to enter!\n" f"Ends: {discord.utils.format_dt(end_time, style='R')} ({discord.utils.format_dt(end_time, style='F')})\n" f"Winners: {winners}", color=discord.Color.gold(), ) if nitro_giveaway: embed.description += "\n*This is a Nitro-exclusive giveaway!*" if exclude_nitro: embed.description += "\n*Users with Nitro are excluded from entering.*" embed.set_footer( text=f"Giveaway started by {interaction.user.display_name}. Entries: 0" ) # Initial entry count await interaction.response.send_message("Creating giveaway...", ephemeral=True) view = GiveawayEnterView(cog=self) giveaway_message = await interaction.channel.send(embed=embed, view=view) giveaway_data = { "message_id": giveaway_message.id, "channel_id": interaction.channel.id, "guild_id": interaction.guild.id, "prize": prize, "end_time": end_time, "num_winners": winners, "creator_id": interaction.user.id, "participants": set(), "is_nitro_giveaway": nitro_giveaway, "exclude_nitro_users": exclude_nitro, "ended": False, } self.active_giveaways.append(giveaway_data) self.all_loaded_giveaways.append( giveaway_data.copy() ) # Also add to the comprehensive list self.save_giveaways() await interaction.followup.send( f"Giveaway for '{prize}' created successfully!", ephemeral=True ) @tasks.loop(seconds=30) async def check_giveaways_loop(self): now = datetime.datetime.now(datetime.timezone.utc) giveaways_processed_in_this_run = False # Iterate over a copy of active_giveaways for safe removal/modification for giveaway_data in list(self.active_giveaways): if giveaway_data["ended"] or now < giveaway_data["end_time"]: continue # Skip already ended or not yet due giveaways_processed_in_this_run = True giveaway_data["ended"] = True # Mark as ended channel = self.bot.get_channel(giveaway_data["channel_id"]) if not channel: print( f"Error: Could not find channel {giveaway_data['channel_id']} for giveaway {giveaway_data['message_id']}" ) # Remove from active_giveaways directly as it can't be processed self.active_giveaways = [ gw for gw in self.active_giveaways if gw["message_id"] != giveaway_data["message_id"] ] continue try: message = await channel.fetch_message(giveaway_data["message_id"]) except discord.NotFound: print( f"Error: Could not find message {giveaway_data['message_id']} in channel {channel.id}" ) self.active_giveaways = [ gw for gw in self.active_giveaways if gw["message_id"] != giveaway_data["message_id"] ] continue except discord.Forbidden: print( f"Error: Bot lacks permissions to fetch message {giveaway_data['message_id']} in channel {channel.id}" ) # Cannot process, but keep it in active_giveaways for now, maybe perms will be fixed. # Or decide to remove it. For now, skip. continue # Fetch participants from the giveaway data entrants_users = [] for user_id in giveaway_data["participants"]: # Ensure user is still in the guild for Nitro check if applicable member = channel.guild.get_member(user_id) # Use guild from channel user_to_check = member if member else await self.bot.fetch_user(user_id) if not user_to_check: continue # User not found if user_to_check.bot: continue if giveaway_data["is_nitro_giveaway"] or giveaway_data.get( "exclude_nitro_users" ): nitro_status = await get_nitro_status_oauth(user_to_check) if nitro_status is None: continue if giveaway_data["is_nitro_giveaway"] and not nitro_status: continue if giveaway_data.get("exclude_nitro_users") and nitro_status: continue entrants_users.append(user_to_check) winners_list = [] if entrants_users: if len(entrants_users) <= giveaway_data["num_winners"]: winners_list = list(entrants_users) else: winners_list = random.sample( entrants_users, giveaway_data["num_winners"] ) winner_mentions_str = ( ", ".join(w.mention for w in winners_list) if winners_list else "None" ) if winners_list: await channel.send( f"Congratulations {winner_mentions_str}! You won **{giveaway_data['prize']}**!" ) else: await channel.send( f"The giveaway for **{giveaway_data['prize']}** has ended, but there were no eligible participants." ) new_embed = message.embeds[0] new_embed.description = f"Giveaway ended!\nWinners: {winner_mentions_str}" new_embed.color = discord.Color.dark_grey() new_embed.set_footer(text="Giveaway has concluded.") end_view = GiveawayEndView( cog=self, original_giveaway_message_id=giveaway_data["message_id"] ) try: await message.edit(embed=new_embed, view=end_view) except discord.Forbidden: print( f"Error: Bot lacks permissions to edit message for {giveaway_data['message_id']}" ) except discord.HTTPException as e: print( f"Error editing giveaway message {giveaway_data['message_id']}: {e}" ) # Remove from active_giveaways after processing self.active_giveaways = [ gw for gw in self.active_giveaways if gw["message_id"] != giveaway_data["message_id"] ] if giveaways_processed_in_this_run: self.save_giveaways() @check_giveaways_loop.before_loop async def before_check_giveaways_loop(self): await self.bot.wait_until_ready() @gway.command( name="rollmanual", description="Manually roll a winner from a message (for old giveaways or specific cases).", ) @app_commands.describe( message_id="The ID of the message (giveaway or any message with reactions).", winners="How many winners to pick? (default: 1)", emoji="Emoji for reaction-based roll (if not a button giveaway, default: 🎉)", ) @app_commands.checks.has_permissions(manage_guild=True) async def manual_roll_giveaway_slash( self, interaction: discord.Interaction, message_id: str, winners: int = 1, emoji: str = "🎉", ): if winners < 1: await interaction.response.send_message( "Number of winners must be at least 1.", ephemeral=True ) return try: msg_id = int(message_id) except ValueError: await interaction.response.send_message( "Invalid Message ID format. It should be a number.", ephemeral=True ) return await interaction.response.defer(ephemeral=True) # Try to find if this message_id corresponds to a known giveaway giveaway_info = self._get_giveaway_by_message_id(msg_id, search_all=True) entrants = set() # Store user objects message_to_roll = None try: message_to_roll = await interaction.channel.fetch_message(msg_id) except (discord.NotFound, discord.Forbidden): # Try searching all channels if not found or no perms in current for chan in interaction.guild.text_channels: try: message_to_roll = await chan.fetch_message(msg_id) if message_to_roll: break except (discord.NotFound, discord.Forbidden): continue if not message_to_roll: await interaction.followup.send( f"Could not find message with ID `{msg_id}` in this server or I lack permissions.", ephemeral=True, ) return if giveaway_info and "participants" in giveaway_info: # Use stored participants if available (from button-based system) for user_id in giveaway_info["participants"]: user = interaction.guild.get_member( user_id ) or await self.bot.fetch_user(user_id) if user and not user.bot: if giveaway_info.get("is_nitro_giveaway") or giveaway_info.get( "exclude_nitro_users" ): nitro_status = await get_nitro_status_oauth(user) if nitro_status is None: continue if giveaway_info.get("is_nitro_giveaway") and not nitro_status: continue if giveaway_info.get("exclude_nitro_users") and nitro_status: continue entrants.add(user) if not entrants: await interaction.followup.send( f"Found giveaway data for message `{msg_id}`, but no eligible stored participants.", ephemeral=True, ) return else: # Fallback to reactions if no participant data or not a known giveaway reaction_found = False for reaction in message_to_roll.reactions: if str(reaction.emoji) == emoji: reaction_found = True async for user in reaction.users(): if not user.bot: if giveaway_info and ( giveaway_info.get("is_nitro_giveaway") or giveaway_info.get("exclude_nitro_users") ): nitro_status = await get_nitro_status_oauth(user) if nitro_status is None: continue if ( giveaway_info.get("is_nitro_giveaway") and not nitro_status ): continue if ( giveaway_info.get("exclude_nitro_users") and nitro_status ): continue entrants.add(user) break if not reaction_found: await interaction.followup.send( f"No reactions found with {emoji} on message `{msg_id}`.", ephemeral=True, ) return if not entrants: await interaction.followup.send( f"No valid (non-bot) users reacted with {emoji} on message `{msg_id}`.", ephemeral=True, ) return winners_list = [] entrants_list = list(entrants) if len(entrants_list) <= winners: winners_list = entrants_list else: winners_list = random.sample(entrants_list, winners) if winners_list: winner_mentions = ", ".join(w.mention for w in winners_list) await interaction.followup.send( f"Manual roll from message `{msg_id}` in {message_to_roll.channel.mention}:\nCongratulations {winner_mentions}!", ephemeral=False, ) if interaction.channel.id != message_to_roll.channel.id: try: await message_to_roll.channel.send( f"Manual roll for message {message_to_roll.jump_url} concluded. Winner(s): {winner_mentions}" ) except discord.Forbidden: await interaction.followup.send( f"(Note: I couldn't announce the winner in {message_to_roll.channel.mention}.)", ephemeral=True, ) else: await interaction.followup.send( f"Could not select any winners from message `{msg_id}`.", ephemeral=True ) async def setup(bot: commands.Bot): cog = GiveawaysCog(bot) await bot.add_cog(cog) # The cog_load method will handle re-adding views once the bot is ready.