discordbot/cogs/audio_cog.py
2025-04-25 14:03:49 -06:00

841 lines
40 KiB
Python

import discord
from discord import ui # Added for views/buttons
from discord.ext import commands, tasks
import asyncio
import yt_dlp as youtube_dl
import logging
from collections import deque
import math # For pagination calculation
# Configure logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
# Suppress noisy yt-dlp logs unless debugging
youtube_dl.utils.bug_reports_message = lambda: ''
# --- yt-dlp Options ---
YDL_OPTS_BASE = {
'format': 'bestaudio/best',
'outtmpl': '%(extractor)s-%(id)s-%(title)s.%(ext)s',
'restrictfilenames': True,
'noplaylist': False, # Allow playlists by default, override per call if needed
'nocheckcertificate': True,
'ignoreerrors': False,
'logtostderr': False,
'quiet': True,
'no_warnings': True,
'default_search': 'ytsearch', # Default to YouTube search
'source_address': '0.0.0.0', # Bind to all IPs for better connectivity
'cookiefile': 'cookies.txt'
}
FFMPEG_OPTIONS = {
'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
'options': '-vn' # No video
}
class Song:
"""Represents a song to be played."""
def __init__(self, source_url, title, webpage_url, duration, requested_by):
self.source_url = source_url
self.title = title
self.webpage_url = webpage_url
self.duration = duration
self.requested_by = requested_by # User who requested the song
def __str__(self):
return f"**{self.title}** ({self.format_duration()})"
def format_duration(self):
"""Formats duration in seconds to MM:SS or HH:MM:SS."""
if not self.duration:
return "N/A"
minutes, seconds = divmod(self.duration, 60)
hours, minutes = divmod(minutes, 60)
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
else:
return f"{minutes:02d}:{seconds:02d}"
class AudioCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.queues = {} # Dictionary to hold queues per guild {guild_id: deque()}
self.current_song = {} # Dictionary for current song per guild {guild_id: Song}
self.voice_clients = {} # Dictionary for voice clients per guild {guild_id: discord.VoiceClient}
self.play_next_song.start() # Start the background task
def get_queue(self, guild_id):
"""Gets the queue for a guild, creating it if it doesn't exist."""
return self.queues.setdefault(guild_id, deque())
def get_current_song(self, guild_id):
"""Gets the current song for a guild."""
return self.current_song.get(guild_id)
def cleanup(self, guild_id):
"""Cleans up resources for a guild."""
if guild_id in self.queues:
del self.queues[guild_id]
if guild_id in self.current_song:
del self.current_song[guild_id]
if guild_id in self.voice_clients:
vc = self.voice_clients.pop(guild_id)
if vc and vc.is_connected():
# Use asyncio.create_task for fire-and-forget disconnect
asyncio.create_task(vc.disconnect(force=True))
log.info(f"Cleaned up resources for guild {guild_id}")
async def cog_unload(self):
"""Cog unload cleanup."""
self.play_next_song.cancel()
for guild_id in list(self.voice_clients.keys()): # Iterate over keys copy
self.cleanup(guild_id)
@tasks.loop(seconds=1.0)
async def play_next_song(self):
"""Background task to play the next song in the queue for each guild."""
for guild_id, vc in list(self.voice_clients.items()): # Iterate over copy
if not vc or not vc.is_connected():
# If VC disconnected unexpectedly, clean up
log.warning(f"VC for guild {guild_id} disconnected unexpectedly. Cleaning up.")
self.cleanup(guild_id)
continue
queue = self.get_queue(guild_id)
if not vc.is_playing() and not vc.is_paused() and queue:
next_song = queue.popleft()
self.current_song[guild_id] = next_song
try:
log.info(f"Playing next song in guild {guild_id}: {next_song.title}")
source = discord.FFmpegPCMAudio(next_song.source_url, **FFMPEG_OPTIONS)
vc.play(source, after=lambda e: self.handle_after_play(e, guild_id))
# Optionally send a "Now Playing" message to the channel
# This requires storing the context or channel ID somewhere
except Exception as e:
log.error(f"Error playing song {next_song.title} in guild {guild_id}: {e}")
self.current_song[guild_id] = None # Clear current song on error
# Try to play the next one if available
if queue:
log.info(f"Trying next song in queue for guild {guild_id}")
# Let the loop handle the next iteration naturally
else:
log.info(f"Queue empty for guild {guild_id} after error.")
# Consider leaving VC after inactivity?
elif not vc.is_playing() and not vc.is_paused() and not queue:
# If nothing is playing and queue is empty, clear current song
if self.current_song.get(guild_id):
self.current_song[guild_id] = None
log.info(f"Queue empty and playback finished for guild {guild_id}. Current song cleared.")
# Add inactivity disconnect logic here if desired
def handle_after_play(self, error, guild_id):
"""Callback function after a song finishes playing."""
if error:
log.error(f'Player error in guild {guild_id}: {error}')
else:
log.info(f"Song finished playing in guild {guild_id}.")
# The loop will handle playing the next song
@play_next_song.before_loop
async def before_play_next_song(self):
await self.bot.wait_until_ready()
log.info("AudioCog background task started.")
async def _extract_info(self, query):
"""Extracts info using yt-dlp. Handles URLs and search queries."""
ydl_opts = YDL_OPTS_BASE.copy()
is_search = not (query.startswith('http://') or query.startswith('https://'))
if is_search:
# For search, limit to 1 result and treat as single item
ydl_opts['default_search'] = 'ytsearch1'
ydl_opts['noplaylist'] = True # Explicitly search for single video
log.info(f"Performing YouTube search for: {query}")
else:
# For URLs, let yt-dlp determine if it's a playlist or single video
# Do not use extract_flat, get full info
ydl_opts['noplaylist'] = False # Allow playlists
log.info(f"Processing URL: {query}")
try:
# Extract full information
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(query, download=False)
# Determine if it's a playlist *after* extraction
is_playlist = info.get('_type') == 'playlist'
if is_search and 'entries' in info and info['entries']:
# If search returned results, take the first one
info = info['entries'][0]
is_playlist = False # Search result is treated as single item
elif is_search and ('entries' not in info or not info['entries']):
# Handle case where search yields no results directly
return None, False, True # Indicate no info found
return info, is_playlist, is_search
except youtube_dl.utils.DownloadError as e:
# Handle specific errors if possible (e.g., video unavailable)
error_msg = str(e)
if 'video unavailable' in error_msg.lower():
raise commands.CommandError(f"The video '{query}' is unavailable.")
elif 'playlist does not exist' in error_msg.lower():
raise commands.CommandError(f"The playlist '{query}' does not exist or is private.")
log.error(f"yt-dlp download error for '{query}': {error_msg}")
raise commands.CommandError(f"Could not process '{query}'. Is it a valid URL or search term?")
except Exception as e:
log.error(f"Unexpected yt-dlp error for '{query}': {e}")
raise commands.CommandError("An unexpected error occurred while fetching video info.")
async def _search_youtube(self, query: str, max_results: int = 15): # Increased max_results for pagination
"""Performs a YouTube search and returns multiple results."""
# Clamp max_results to avoid excessively long searches if abused
max_results = min(max(1, max_results), 25) # Limit between 1 and 25
ydl_opts = YDL_OPTS_BASE.copy()
# Use ytsearchN: query to get N results
ydl_opts['default_search'] = f'ytsearch{max_results}'
ydl_opts['noplaylist'] = True # Ensure only videos are searched
log.info(f"Performing YouTube search for '{query}' (max {max_results} results)")
try:
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
# Extract info without downloading
info = ydl.extract_info(query, download=False)
# Check if 'entries' exist and contain results
if 'entries' in info and info['entries']:
return info['entries'] # Return the list of video dictionaries
else:
log.info(f"No search results found for '{query}'")
return [] # Return empty list if no results
except youtube_dl.utils.DownloadError as e:
log.error(f"yt-dlp search download error for '{query}': {e}")
# Don't raise here, let the command handle empty results
return []
except Exception as e:
log.error(f"Unexpected yt-dlp error during search for '{query}': {e}")
# Don't raise here, let the command handle empty results
return []
async def _ensure_voice_connection(self, ctx_or_interaction):
"""Ensures the bot is connected to the user's voice channel. Accepts Context or Interaction."""
is_interaction = isinstance(ctx_or_interaction, discord.Interaction)
if is_interaction:
guild = ctx_or_interaction.guild
author = ctx_or_interaction.user
if not guild: raise commands.CommandError("Interaction must be in a guild.")
else: # Is Context
guild = ctx_or_interaction.guild
author = ctx_or_interaction.author
if not guild: raise commands.CommandError("Command must be used in a guild.")
if not isinstance(author, discord.Member) or not author.voice or not author.voice.channel:
raise commands.CommandError("You are not connected to a voice channel.")
vc = self.voice_clients.get(guild.id)
target_channel = author.voice.channel
if not vc or not vc.is_connected():
try:
log.info(f"Connecting to voice channel {target_channel.name} in guild {guild.id}")
vc = await target_channel.connect()
self.voice_clients[guild.id] = vc
except asyncio.TimeoutError:
raise commands.CommandError(f"Connecting to {target_channel.name} timed out.")
except discord.errors.ClientException as e:
raise commands.CommandError(f"Already connected to a voice channel? Error: {e}")
except Exception as e:
log.error(f"Failed to connect to {target_channel.name}: {e}")
raise commands.CommandError(f"Failed to connect to the voice channel: {e}")
elif vc.channel != target_channel:
try:
log.info(f"Moving to voice channel {target_channel.name} in guild {guild.id}")
await vc.move_to(target_channel)
self.voice_clients[guild.id] = vc # Ensure the instance is updated if move_to returns a new one
except Exception as e:
log.error(f"Failed to move to {target_channel.name}: {e}")
raise commands.CommandError(f"Failed to move to your voice channel: {e}")
return vc
# --- Commands ---
@commands.command(name="join", aliases=['connect'])
async def join(self, ctx: commands.Context):
"""Connects the bot to your current voice channel."""
try:
await self._ensure_voice_connection(ctx)
await ctx.reply(f"Connected to **{ctx.author.voice.channel.name}**.")
except commands.CommandError as e:
await ctx.reply(str(e))
except Exception as e:
log.error(f"Error in join command: {e}")
await ctx.reply("An unexpected error occurred while trying to join.")
@commands.command(name="leave", aliases=['disconnect', 'dc'])
async def leave(self, ctx: commands.Context):
"""Disconnects the bot from the voice channel."""
vc = self.voice_clients.get(ctx.guild.id)
if not vc or not vc.is_connected():
await ctx.reply("I am not connected to a voice channel.")
return
log.info(f"Disconnecting from voice channel in guild {ctx.guild.id}")
await ctx.reply(f"Disconnecting from **{vc.channel.name}**.")
self.cleanup(ctx.guild.id) # This handles the disconnect and queue clearing
@commands.command(name="play", aliases=['p'])
async def play(self, ctx: commands.Context, *, query: str):
"""Plays a song or adds it/playlist to the queue. Accepts URL or search query."""
try:
vc = await self._ensure_voice_connection(ctx)
except commands.CommandError as e:
await ctx.reply(str(e))
return
except Exception as e:
log.error(f"Error ensuring voice connection in play command: {e}")
await ctx.reply("An unexpected error occurred before playing.")
return
queue = self.get_queue(ctx.guild.id)
songs_added = 0
playlist_title = None
song_to_announce = None # Store the single song if added
async with ctx.typing(): # Indicate processing
try:
# info now contains full data for playlist or single video
info, is_playlist, is_search = await self._extract_info(query)
if not info:
await ctx.reply("Could not find anything matching your query.")
return
if is_playlist:
playlist_title = info.get('title', 'Unnamed Playlist')
log.info(f"Adding playlist '{playlist_title}' to queue for guild {ctx.guild.id}")
entries = info.get('entries', []) # Should contain full entry info now
if not entries:
await ctx.reply(f"Playlist '{playlist_title}' seems to be empty or could not be loaded.")
return
for entry in entries:
if not entry: continue
# Extract stream URL directly from the entry info
stream_url = entry.get('url') # yt-dlp often provides the best stream URL here
if not stream_url: # Fallback to formats if needed
formats = entry.get('formats', [])
for f in formats:
# Prioritize opus or known good audio codecs
if f.get('url') and f.get('acodec') != 'none' and (f.get('vcodec') == 'none' or f.get('acodec') == 'opus'):
stream_url = f['url']
break
# Last resort fallback if still no URL
if not stream_url and formats:
for f in formats:
if f.get('url') and f.get('acodec') != 'none':
stream_url = f['url']
break
if not stream_url:
log.warning(f"Could not find playable stream URL for playlist entry: {entry.get('title', entry.get('id'))}")
await ctx.send(f"⚠️ Could not get audio for '{entry.get('title', 'an item')}' from playlist.", delete_after=15)
continue
try:
song = Song(
source_url=stream_url,
title=entry.get('title', 'Unknown Title'),
webpage_url=entry.get('webpage_url', entry.get('original_url')), # Use original_url as fallback
duration=entry.get('duration'),
requested_by=ctx.author
)
queue.append(song)
songs_added += 1
except Exception as song_e:
log.error(f"Error creating Song object for entry {entry.get('title', entry.get('id'))}: {song_e}")
await ctx.send(f"⚠️ Error processing metadata for '{entry.get('title', 'an item')}' from playlist.", delete_after=15)
else: # Single video or search result
# 'info' should be the dictionary for the single video here
stream_url = info.get('url')
if not stream_url: # Fallback if 'url' isn't top-level
formats = info.get('formats', [])
for f in formats:
# Prioritize opus or known good audio codecs
if f.get('url') and f.get('acodec') != 'none' and (f.get('vcodec') == 'none' or f.get('acodec') == 'opus'):
stream_url = f['url']
break
# Last resort fallback if still no URL
if not stream_url and formats:
for f in formats:
if f.get('url') and f.get('acodec') != 'none':
stream_url = f['url']
break
if not stream_url:
await ctx.reply("Could not extract a playable audio stream for the video.")
return
song = Song(
source_url=stream_url,
title=info.get('title', 'Unknown Title'),
webpage_url=info.get('webpage_url'),
duration=info.get('duration'),
requested_by=ctx.author
)
queue.append(song)
songs_added = 1
song_to_announce = song # Store for announcement
log.info(f"Added song '{song.title}' to queue for guild {ctx.guild.id}")
except commands.CommandError as e:
await ctx.reply(str(e))
return
except Exception as e:
log.exception(f"Error during song processing in play command: {e}") # Log full traceback
await ctx.reply("An unexpected error occurred while processing your request.")
return
# --- Send confirmation message ---
if songs_added > 0:
if is_playlist:
await ctx.reply(f"✅ Added **{songs_added}** songs from playlist **'{playlist_title}'** to the queue.")
elif song_to_announce: # Check if a single song was added
# For single adds, show position if queue was not empty before adding
queue_pos = len(queue) # Position is the current length (after adding)
if vc.is_playing() or vc.is_paused() or queue_pos > 1: # If something playing or queue had items before this add
await ctx.reply(f"✅ Added **{song_to_announce.title}** to the queue (position #{queue_pos}).")
else:
# If nothing was playing and queue was empty, this song will play next
# The loop will handle the "Now Playing" implicitly, so just confirm add
await ctx.reply(f"✅ Added **{song_to_announce.title}** to the queue.")
# No need to explicitly start playback here, the loop handles it.
else:
# This case might happen if playlist extraction failed for all entries or search failed
if not is_playlist and is_search:
# If it was a search and nothing was added, the earlier message handles it
pass # Already sent "Could not find anything..."
else:
await ctx.reply("Could not add any songs from the provided source.")
@commands.command(name="pause")
async def pause(self, ctx: commands.Context):
"""Pauses the current playback."""
vc = self.voice_clients.get(ctx.guild.id)
if not vc or not vc.is_playing():
await ctx.reply("I am not playing anything right now.")
return
if vc.is_paused():
await ctx.reply("Playback is already paused.")
return
vc.pause()
await ctx.reply("⏸️ Playback paused.")
log.info(f"Playback paused in guild {ctx.guild.id}")
@commands.command(name="resume")
async def resume(self, ctx: commands.Context):
"""Resumes paused playback."""
vc = self.voice_clients.get(ctx.guild.id)
if not vc or not vc.is_connected():
await ctx.reply("I am not connected to a voice channel.")
return
if not vc.is_paused():
await ctx.reply("Playback is not paused.")
return
vc.resume()
await ctx.reply("▶️ Playback resumed.")
log.info(f"Playback resumed in guild {ctx.guild.id}")
@commands.command(name="skip", aliases=['s'])
async def skip(self, ctx: commands.Context):
"""Skips the current song."""
vc = self.voice_clients.get(ctx.guild.id)
if not vc or not vc.is_playing():
await ctx.reply("I am not playing anything to skip.")
return
current = self.get_current_song(ctx.guild.id)
await ctx.reply(f"⏭️ Skipping **{current.title if current else 'the current song'}**...")
vc.stop() # Triggers the 'after' callback, which lets the loop play the next song
log.info(f"Song skipped in guild {ctx.guild.id} by {ctx.author}")
# The loop will handle playing the next song
@commands.command(name="stop")
async def stop(self, ctx: commands.Context):
"""Stops playback and clears the queue."""
vc = self.voice_clients.get(ctx.guild.id)
if not vc or not vc.is_connected():
await ctx.reply("I am not connected to a voice channel.")
return
queue = self.get_queue(ctx.guild.id)
queue.clear()
self.current_song[ctx.guild.id] = None # Clear current song immediately
if vc.is_playing() or vc.is_paused():
vc.stop() # Stop playback
await ctx.reply("⏹️ Playback stopped and queue cleared.")
log.info(f"Playback stopped and queue cleared in guild {ctx.guild.id} by {ctx.author}")
else:
await ctx.reply("⏹️ Queue cleared.") # If nothing was playing, just confirm queue clear
log.info(f"Queue cleared in guild {ctx.guild.id} by {ctx.author} (nothing was playing).")
@commands.command(name="queue", aliases=['q'])
async def queue(self, ctx: commands.Context):
"""Displays the current song queue."""
queue = self.get_queue(ctx.guild.id)
current = self.get_current_song(ctx.guild.id)
if not queue and not current:
await ctx.reply("The queue is empty and nothing is playing.")
return
embed = discord.Embed(title="Music Queue", color=discord.Color.blue())
if current:
embed.add_field(name="Now Playing", value=f"[{current.title}]({current.webpage_url}) | `{current.format_duration()}` | Requested by {current.requested_by.mention}", inline=False)
else:
embed.add_field(name="Now Playing", value="Nothing currently playing.", inline=False)
if queue:
queue_list = []
max_display = 10 # Limit display to avoid huge embeds
for i, song in enumerate(list(queue)[:max_display]):
queue_list.append(f"`{i+1}.` [{song.title}]({song.webpage_url}) | `{song.format_duration()}` | Req by {song.requested_by.mention}")
if queue_list:
embed.add_field(name="Up Next", value="\n".join(queue_list), inline=False)
if len(queue) > max_display:
embed.set_footer(text=f"... and {len(queue) - max_display} more songs.")
else:
embed.add_field(name="Up Next", value="The queue is empty.", inline=False)
await ctx.reply(embed=embed)
@commands.command(name="nowplaying", aliases=['np', 'current'])
async def nowplaying(self, ctx: commands.Context):
"""Shows the currently playing song."""
current = self.get_current_song(ctx.guild.id)
vc = self.voice_clients.get(ctx.guild.id)
if not vc or not vc.is_connected():
await ctx.reply("I'm not connected to a voice channel.")
return
if not current or not (vc.is_playing() or vc.is_paused()):
await ctx.reply("Nothing is currently playing.")
return
embed = discord.Embed(title="Now Playing", description=f"[{current.title}]({current.webpage_url})", color=discord.Color.green())
embed.add_field(name="Duration", value=f"`{current.format_duration()}`", inline=True)
embed.add_field(name="Requested by", value=current.requested_by.mention, inline=True)
# Add progress bar if possible (requires tracking start time)
# progress = ...
# embed.add_field(name="Progress", value=progress, inline=False)
if hasattr(current, 'thumbnail') and current.thumbnail: # Check if thumbnail exists
embed.set_thumbnail(url=current.thumbnail)
await ctx.reply(embed=embed)
# --- Search Command and View ---
@commands.command(name="search")
async def search(self, ctx: commands.Context, *, query: str):
"""Searches YouTube and displays results with selection buttons."""
if not ctx.guild:
await ctx.reply("This command can only be used in a server.")
return
# Store the initial message to edit later
message = await ctx.reply(f"Searching for '{query}'...")
async with ctx.typing(): # Keep typing indicator while searching
try:
# Fetch more results for pagination
results = await self._search_youtube(query, max_results=15)
except Exception as e:
log.error(f"Error during YouTube search: {e}")
await message.edit(content="An error occurred while searching.", view=None)
return
if not results:
await message.edit(content=f"No results found for '{query}'.", view=None)
return
# Prepare data for the view
search_results_data = []
for entry in results:
# Store necessary info for adding to queue later
search_results_data.append({
'title': entry.get('title', 'Unknown Title'),
'webpage_url': entry.get('webpage_url'),
'duration': entry.get('duration'),
'id': entry.get('id'), # Need ID to re-fetch stream URL later
'uploader': entry.get('uploader', 'Unknown Uploader')
})
# Create the view with pagination
view = PaginatedSearchResultView(ctx.author, search_results_data, self, query) # Pass query for title
view.interaction_message = message # Store message reference in the view
# Initial update of the message with the first page
await view.update_message(interaction=None) # Use interaction=None for initial send/edit
@staticmethod
def format_duration_static(duration):
"""Static version of format_duration for use outside Song objects."""
if not duration:
return "N/A"
minutes, seconds = divmod(duration, 60)
hours, minutes = divmod(minutes, 60)
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
else:
return f"{minutes:02d}:{seconds:02d}"
# Error Handling for Audio Cog specifically
@commands.Cog.listener()
async def on_command_error(self, ctx, error):
# Handle errors specific to this cog, prevents double messages if global handler exists
if isinstance(error, commands.CommandError) and ctx.cog == self:
# Check if the error is specific to commands in this cog
# Avoid handling errors already handled locally in commands if possible
# This basic check just prevents duplicate generic messages
log.warning(f"Command error in AudioCog: {error}")
# await ctx.reply(f"An error occurred: {error}") # Optional: Send specific cog errors
# Return True or pass to prevent global handler if needed
# pass # Let global handler take care of it unless specific handling is needed
async def setup(bot):
await bot.add_cog(AudioCog(bot))
log.info("AudioCog loaded successfully.")
# --- Paginated Search Result View ---
class PaginatedSearchResultView(ui.View):
RESULTS_PER_PAGE = 5
def __init__(self, author: discord.Member, results: list, cog: AudioCog, query: str, timeout=180.0):
super().__init__(timeout=timeout)
self.author = author
self.results = results
self.cog = cog
self.query = query # Store original query for embed title
self.current_page = 0
self.total_pages = math.ceil(len(self.results) / self.RESULTS_PER_PAGE)
self.interaction_message: discord.Message = None # To disable view later
self.update_buttons() # Initial button setup
def update_buttons(self):
"""Clears and adds buttons based on the current page and total results."""
self.clear_items()
start_index = self.current_page * self.RESULTS_PER_PAGE
end_index = min(start_index + self.RESULTS_PER_PAGE, len(self.results))
# Add result selection buttons for the current page
for i in range(start_index, end_index):
result = self.results[i]
button = ui.Button(
label=f"{i+1}", # Overall result number
style=discord.ButtonStyle.secondary,
custom_id=f"search_select_{i}",
row= (i - start_index) // 5 # Arrange buttons neatly if more than 5 per page (though we limit to 5)
)
# Use lambda to capture the correct index 'i'
button.callback = lambda interaction, index=i: self.select_button_callback(interaction, index)
self.add_item(button)
# Add navigation buttons (Previous/Next) - ensure they are on the last row
nav_row = math.ceil(self.RESULTS_PER_PAGE / 5) # Calculate row for nav buttons
if self.total_pages > 1:
prev_button = ui.Button(label="◀ Previous", style=discord.ButtonStyle.primary, custom_id="search_prev", disabled=self.current_page == 0, row=nav_row)
prev_button.callback = self.prev_button_callback
self.add_item(prev_button)
next_button = ui.Button(label="Next ▶", style=discord.ButtonStyle.primary, custom_id="search_next", disabled=self.current_page == self.total_pages - 1, row=nav_row)
next_button.callback = self.next_button_callback
self.add_item(next_button)
def create_embed(self) -> discord.Embed:
"""Creates the embed for the current page."""
embed = discord.Embed(
title=f"Search Results for '{self.query}' (Page {self.current_page + 1}/{self.total_pages})",
description="Click a button below to add the song to the queue.",
color=discord.Color.purple()
)
start_index = self.current_page * self.RESULTS_PER_PAGE
end_index = min(start_index + self.RESULTS_PER_PAGE, len(self.results))
if start_index >= len(self.results): # Should not happen with proper page clamping
embed.description = "No results on this page."
return embed
for i in range(start_index, end_index):
entry = self.results[i]
title = entry.get('title', 'Unknown Title')
url = entry.get('webpage_url')
duration_sec = entry.get('duration')
duration_fmt = self.cog.format_duration_static(duration_sec) if duration_sec else "N/A" # Use cog's static method
uploader = entry.get('uploader', 'Unknown Uploader')
embed.add_field(
name=f"{i+1}. {title}", # Use overall index + 1 for label
value=f"[{uploader}]({url}) | `{duration_fmt}`",
inline=False
)
embed.set_footer(text=f"Showing results {start_index + 1}-{end_index} of {len(self.results)}")
return embed
async def update_message(self, interaction: discord.Interaction = None):
"""Updates the message with the current page's embed and buttons."""
self.update_buttons()
embed = self.create_embed()
if interaction:
await interaction.response.edit_message(embed=embed, view=self)
elif self.interaction_message: # For initial send/edit
await self.interaction_message.edit(content=None, embed=embed, view=self) # Remove "Searching..." text
async def interaction_check(self, interaction: discord.Interaction) -> bool:
# Only allow the original command author to interact
if interaction.user != self.author:
await interaction.response.send_message("Only the person who started the search can interact with this.", ephemeral=True)
return False
return True
async def select_button_callback(self, interaction: discord.Interaction, index: int):
"""Callback when a result selection button is pressed."""
if not interaction.guild: return
selected_result = self.results[index]
log.info(f"Search result {index+1} ('{selected_result['title']}') selected by {interaction.user} in guild {interaction.guild.id}")
# Defer the interaction
await interaction.response.defer()
# Disable all buttons in the view after selection
for item in self.children:
if isinstance(item, ui.Button):
item.disabled = True
# Update the original message to show disabled buttons and confirmation
final_embed = self.create_embed() # Get current embed state
final_embed.description = f"Selected: **{selected_result['title']}**. Adding to queue..."
final_embed.color = discord.Color.green()
await interaction.edit_original_response(embed=final_embed, view=self)
self.stop() # Stop the view from listening further
# --- Add the selected song to the queue ---
try:
# Ensure bot is connected to voice (use interaction here)
vc = await self.cog._ensure_voice_connection(interaction)
if not vc:
log.error("Failed to ensure voice connection in search callback.")
await interaction.followup.send("Could not connect to voice channel.", ephemeral=True)
return
queue = self.cog.get_queue(interaction.guild.id)
# Re-fetch the stream URL
try:
query_for_stream = selected_result.get('webpage_url') or selected_result.get('id')
if not query_for_stream:
raise commands.CommandError("Missing video identifier for selected result.")
info, _, _ = await self.cog._extract_info(query_for_stream)
if not info:
raise commands.CommandError("Could not retrieve details for the selected video.")
stream_url = info.get('url')
if not stream_url:
formats = info.get('formats', [])
for f in formats:
if f.get('url') and f.get('acodec') != 'none' and (f.get('vcodec') == 'none' or f.get('acodec') == 'opus'):
stream_url = f['url']
break
if not stream_url and formats:
for f in formats:
if f.get('url') and f.get('acodec') != 'none':
stream_url = f['url']
break
if not stream_url:
raise commands.CommandError("Could not extract a playable audio stream.")
song = Song(
source_url=stream_url,
title=info.get('title', selected_result.get('title', 'Unknown Title')),
webpage_url=info.get('webpage_url', selected_result.get('webpage_url')),
duration=info.get('duration', selected_result.get('duration')),
requested_by=interaction.user
)
queue.append(song)
log.info(f"Added search result '{song.title}' to queue for guild {interaction.guild.id}")
# Send confirmation followup
queue_pos = len(queue)
if vc.is_playing() or vc.is_paused() or queue_pos > 1:
await interaction.followup.send(f"✅ Added **{song.title}** to the queue (position #{queue_pos}).")
else:
await interaction.followup.send(f"✅ Added **{song.title}** to the queue.")
except commands.CommandError as e:
log.error(f"Error adding search result to queue: {e}")
await interaction.followup.send(f"Error adding song: {e}", ephemeral=True)
except Exception as e:
log.exception(f"Unexpected error adding search result to queue: {e}")
await interaction.followup.send("An unexpected error occurred while adding the song.", ephemeral=True)
except commands.CommandError as e:
await interaction.followup.send(str(e), ephemeral=True)
except Exception as e:
log.exception(f"Unexpected error in search select callback: {e}")
await interaction.followup.send("An unexpected error occurred.", ephemeral=True)
async def prev_button_callback(self, interaction: discord.Interaction):
"""Callback for the previous page button."""
if self.current_page > 0:
self.current_page -= 1
await self.update_message(interaction)
async def next_button_callback(self, interaction: discord.Interaction):
"""Callback for the next page button."""
if self.current_page < self.total_pages - 1:
self.current_page += 1
await self.update_message(interaction)
async def on_timeout(self):
# Disable buttons on timeout
log.info(f"Paginated search view timed out for user {self.author.id}")
for item in self.children:
if isinstance(item, ui.Button):
item.disabled = True
# Try to edit the original message
if self.interaction_message:
try:
# Keep the last viewed embed but indicate timeout
timeout_embed = self.create_embed()
timeout_embed.description = "Search selection timed out."
timeout_embed.color = discord.Color.default() # Reset color
await self.interaction_message.edit(embed=timeout_embed, view=self)
except discord.NotFound:
log.warning("Original search message not found on timeout.")
except discord.Forbidden:
log.warning("Missing permissions to edit search message on timeout.")
except Exception as e:
log.error(f"Error editing search message on timeout: {e}")
# Override on_error if specific error handling for the view is needed
# async def on_error(self, interaction: discord.Interaction, error: Exception, item: ui.Item) -> None:
# log.error(f"Error in PaginatedSearchResultView interaction: {error}")
# await interaction.response.send_message("An error occurred with this interaction.", ephemeral=True)