diff --git a/FONT/DejaVuSansMono.ttf b/FONT/DejaVuSansMono.ttf new file mode 100644 index 0000000..8b7bb2a Binary files /dev/null and b/FONT/DejaVuSansMono.ttf differ diff --git a/cogs/terminal_cog.py b/cogs/terminal_cog.py new file mode 100644 index 0000000..8a3e539 --- /dev/null +++ b/cogs/terminal_cog.py @@ -0,0 +1,522 @@ +import discord +from discord.ext import commands, tasks +from discord import app_commands, ui, File, Interaction +from PIL import Image, ImageDraw, ImageFont +import subprocess +import os +import io +from collections import deque +import shlex # For safer command parsing if not using shell=True for everything + +# --- Configuration --- +FONT_PATH = "FONT/DejaVuSansMono.ttf" # IMPORTANT: Make sure this font file (e.g., Courier New) is in the same directory as your bot, or provide an absolute path. + # You can download common monospaced fonts like DejaVuSansMono.ttf +FONT_SIZE = 15 +IMG_WIDTH = 800 +IMG_HEIGHT = 600 +PADDING = 10 +LINE_SPACING = 4 # Extra pixels between lines +BACKGROUND_COLOR = (30, 30, 30) # Dark grey +TEXT_COLOR = (220, 220, 220) # Light grey +PROMPT_COLOR = (70, 170, 240) # Blueish +ERROR_COLOR = (255, 100, 100) # Reddish +MAX_HISTORY_LINES = 500 # Max lines to keep in history +AUTO_UPDATE_INTERVAL_SECONDS = 3 +MAX_OUTPUT_LINES_PER_IMAGE = (IMG_HEIGHT - 2 * PADDING) // (FONT_SIZE + LINE_SPACING) + + +# --- Helper: Owner Check --- +async def is_owner_check(interaction: discord.Interaction) -> bool: + """Checks if the interacting user is the bot owner.""" + if interaction.client.application is None: # Should not happen in slash commands + await interaction.client.application_info() # Fetch if not cached + + # Using application.owner for robust owner checking + # For single owner bots, bot.owner_id can also be used if set. + # For team bots, application.owner will be the team. You might need to check team members. + # For simplicity, assuming a single owner or the first team member if it's a team. + app_info = await interaction.client.application_info() + if app_info.team: + # If it's a team, you might want to check if interaction.user.id is in team members + # For this example, we'll consider the team owner (usually the creator) + # This logic might need adjustment based on how discord.py handles team owners. + # A simpler approach for a single-person owned bot: return interaction.user.id == app_info.owner.id + # However, app_info.owner can be a User or a Team. + if isinstance(app_info.owner, discord.User): + return interaction.user.id == app_info.owner.id + elif isinstance(app_info.owner, discord.Team): + # Check if the user is a member of the team, or the team owner + # For simplicity, let's check if the user is the team owner (first member often is) + # or any member of the team. + if app_info.owner.owner_id == interaction.user.id: + return True + for member in app_info.owner.members: + if member.id == interaction.user.id: + return True # Or apply specific roles + return False # Or restrict to only team owner + return False # Fallback + elif app_info.owner: # Single owner + return interaction.user.id == app_info.owner.id + return False # Should not be reached if bot is properly configured + + +class TerminalCog(commands.Cog, name="Terminal"): + """Owner-only cog for a terminal interface in Discord.""" + + def __init__(self, bot: commands.Bot): + self.bot = bot + self.owner_id: int = 0 # Will be set in cog_load + self.terminal_active: bool = False + self.current_cwd: str = os.getcwd() + self.output_history: deque[str] = deque(maxlen=MAX_HISTORY_LINES) + self.scroll_offset: int = 0 + self.terminal_message: discord.Message | None = None + self.active_process: subprocess.Popen | None = None + self.terminal_view: TerminalView | None = None + try: + self.font = ImageFont.truetype(FONT_PATH, FONT_SIZE) + except IOError: + print(f"Error: Font file '{FONT_PATH}' not found. Using default PIL font. Terminal image quality may be affected.") + self.font = ImageFont.load_default() + + self.auto_update_task = tasks.loop(seconds=AUTO_UPDATE_INTERVAL_SECONDS)(self.refresh_terminal_output) + # Ensure cog_load is defined to set owner_id properly + # self.bot.loop.create_task(self._async_init()) # Alternative for async setup + + async def cog_load(self): + """Async setup for the cog, like fetching owner_id.""" + app_info = await self.bot.application_info() + if app_info.team: + # For teams, owner_id might be the team owner's ID or you might need a list of allowed admins. + # This example will use the team owner if available, otherwise the first listed owner. + self.owner_id = app_info.owner.owner_id if app_info.owner and hasattr(app_info.owner, 'owner_id') else (app_info.owner.id if app_info.owner else 0) + elif app_info.owner: + self.owner_id = app_info.owner.id + else: + print("Warning: Bot owner ID could not be determined. Terminal cog owner checks might fail.") + # Fallback or raise error + # self.owner_id = YOUR_FALLBACK_OWNER_ID # if you have one + + def _generate_terminal_image(self) -> io.BytesIO: + """Generates an image of the current terminal output.""" + image = Image.new('RGB', (IMG_WIDTH, IMG_HEIGHT), BACKGROUND_COLOR) + draw = ImageDraw.Draw(image) + + char_width, _ = self.font.getbbox("M")[2:] # Get width of a character + if char_width == 0: char_width = FONT_SIZE // 2 # Estimate if getbbox fails for default font + + y_pos = PADDING + + # Determine visible lines based on scroll offset + start_index = self.scroll_offset + end_index = min(len(self.output_history), self.scroll_offset + MAX_OUTPUT_LINES_PER_IMAGE) + + visible_lines = list(self.output_history)[start_index:end_index] + + for line in visible_lines: + # Basic coloring for prompt or errors + display_color = TEXT_COLOR + if line.strip().endswith(">") and self.current_cwd in line : # Basic prompt detection + display_color = PROMPT_COLOR + elif "error" in line.lower() or "failed" in line.lower(): # Basic error detection + display_color = ERROR_COLOR + + # Handle lines longer than image width (simple truncation) + # A more advanced version could wrap text or allow horizontal scrolling. + max_chars_on_line = (IMG_WIDTH - 2 * PADDING) // char_width if char_width > 0 else 80 + + # Truncate if needed + # Pillow's draw.text handles clipping, but explicit truncation can be clearer + # For simplicity, we'll let Pillow clip. + # if len(line) > max_chars_on_line: + # line = line[:max_chars_on_line-3] + "..." + + draw.text((PADDING, y_pos), line, font=self.font, fill=display_color) + y_pos += FONT_SIZE + LINE_SPACING + if y_pos > IMG_HEIGHT - PADDING - FONT_SIZE: + break # Stop if no more space + + img_byte_arr = io.BytesIO() + image.save(img_byte_arr, format='PNG') + img_byte_arr.seek(0) + return img_byte_arr + + async def _update_terminal_message(self, interaction: Interaction | None = None, new_content: str | None = None): + """Updates the terminal message with a new image and view.""" + if not self.terminal_message and interaction: + # This case should ideally be handled by sending a new message + # For now, we assume terminal_message is set after initial command + return + + if not self.terminal_active: + if self.terminal_message: + await self.terminal_message.edit(content="Terminal session ended.", view=None, attachments=[]) + return + + image_bytes = self._generate_terminal_image() + discord_file = File(fp=image_bytes, filename="terminal_output.png") + + if self.terminal_view: + self.terminal_view.update_button_states(self) # Update button enable/disable + + target_message = self.terminal_message + edit_kwargs = {"attachments": [discord_file], "view": self.terminal_view} + if new_content: + edit_kwargs["content"] = new_content + else: # Clear old content if any + edit_kwargs["content"] = None + + + try: + if interaction and not interaction.response.is_done(): + await interaction.response.edit_message(**edit_kwargs) + if not self.terminal_message: # If interaction was the first one + self.terminal_message = await interaction.original_response() + elif self.terminal_message: + await self.terminal_message.edit(**edit_kwargs) + else: + # This should not happen if terminal is active + print("Error: Terminal message not found for update.") + # If interaction is None, we can't send a new message easily here + # This path is usually for auto-updates. + except discord.NotFound: + print("Error: Terminal message not found (deleted?). Ending session.") + await self.stop_terminal_session() + except discord.HTTPException as e: + print(f"Error updating terminal message: {e}") + if e.status == 429: # Rate limited + print("Rate limited. Auto-update might be too fast or manual refresh too frequent.") + + + async def stop_terminal_session(self, interaction: Interaction | None = None): + """Stops the terminal session and cleans up.""" + self.terminal_active = False + if self.auto_update_task.is_running(): + self.auto_update_task.cancel() + if self.active_process: + try: + self.active_process.terminate() # Try to terminate gracefully + self.active_process.wait(timeout=1.0) # Wait a bit + except subprocess.TimeoutExpired: + self.active_process.kill() # Force kill if terminate fails + except Exception as e: + print(f"Error terminating process: {e}") + self.active_process = None + + final_message = "Terminal session ended." + if self.terminal_message: + try: + await self.terminal_message.edit(content=final_message, view=None, attachments=[]) + except discord.HTTPException: + pass # Message might already be gone + elif interaction: # If no persistent message, respond to interaction + if not interaction.response.is_done(): + await interaction.response.send_message(final_message, ephemeral=True) + else: + await interaction.followup.send(final_message, ephemeral=True) + + self.terminal_message = None + self.output_history.clear() + self.scroll_offset = 0 + + + @app_commands.command(name="terminal", description="Starts an owner-only terminal session.") + @app_commands.check(is_owner_check) + async def terminal_command(self, interaction: Interaction): + """Starts the terminal interface.""" + if self.terminal_active and self.terminal_message: + await interaction.response.send_message(f"A terminal session is already active. View it here: {self.terminal_message.jump_url}", ephemeral=True) + return + + await interaction.response.defer(ephemeral=False) # Ephemeral False to allow message editing + + self.terminal_active = True + self.current_cwd = os.getcwd() + self.output_history.clear() + self.output_history.append(f"Discord Terminal Initialized.") + self.output_history.append(f"Owner: {interaction.user.name} ({interaction.user.id})") + self.output_history.append(f"Current CWD: {self.current_cwd}") + self.output_history.append(f"{self.current_cwd}> ") + self.scroll_offset = max(0, len(self.output_history) - MAX_OUTPUT_LINES_PER_IMAGE) # Scroll to bottom + + self.terminal_view = TerminalView(cog=self, owner_id=self.owner_id) + + image_bytes = self._generate_terminal_image() + discord_file = File(fp=image_bytes, filename="terminal_output.png") + + # Send initial message and store it + # Use followup since we deferred + self.terminal_message = await interaction.followup.send( + file=discord_file, + view=self.terminal_view + ) + self.terminal_view.message = self.terminal_message # Give view a reference to the message + + @terminal_command.error + async def terminal_command_error(self, interaction: Interaction, error: app_commands.AppCommandError): + if isinstance(error, app_commands.CheckFailure): + await interaction.response.send_message("You do not have permission to use this command.", ephemeral=True) + else: + await interaction.response.send_message(f"An error occurred: {error}", ephemeral=True) + print(f"Terminal command error: {error}") + + async def execute_shell_command(self, command: str, interaction: Interaction): + """Executes a shell command and updates the terminal.""" + if not self.terminal_active: + await interaction.response.send_message("Terminal session is not active. Use `/terminal` to start.", ephemeral=True) + return + + # Add command to history (before execution) + self.output_history.append(f"{self.current_cwd}> {command}") + + # Handle 'cd' command separately + if command.strip().lower().startswith("cd "): + try: + target_dir_str = command.strip()[3:].strip() + if not target_dir_str: # "cd" or "cd " + # Go to home directory (platform dependent) + new_cwd = os.path.expanduser("~") + else: + # Replace ~ with home directory path + target_dir_str = os.path.expanduser(target_dir_str) + if os.path.isabs(target_dir_str): + new_cwd = target_dir_str + else: + new_cwd = os.path.abspath(os.path.join(self.current_cwd, target_dir_str)) + + if os.path.isdir(new_cwd): + self.current_cwd = new_cwd + # self.output_history.append(f"Changed directory to: {self.current_cwd}") # Optional: output for cd + else: + self.output_history.append(f"Error: Directory not found: {new_cwd}") + except Exception as e: + self.output_history.append(f"Error changing directory: {e}") + + self.output_history.append(f"{self.current_cwd}> ") # New prompt + self.scroll_offset = max(0, len(self.output_history) - MAX_OUTPUT_LINES_PER_IMAGE) + await self._update_terminal_message(interaction) + return + + # Handle 'exit' or 'quit' + if command.strip().lower() in ["exit", "quit"]: + self.output_history.append("Exiting terminal session...") + await self._update_terminal_message(interaction) # Show exit message + await self.stop_terminal_session(interaction) + return + + # For other commands, use subprocess + if self.active_process and self.active_process.poll() is None: + self.output_history.append("A command is already running. Please wait or refresh.") + await self._update_terminal_message(interaction) + return + + try: + self.active_process = subprocess.Popen( + command, + shell=True, # Security risk: Be absolutely sure this is owner-only. + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + cwd=self.current_cwd, + bufsize=1, # Line-buffered + universal_newlines=True # For text mode + ) + if not self.auto_update_task.is_running(): + self.auto_update_task.start() + + # Initial update to show command is running + self.scroll_offset = max(0, len(self.output_history) - MAX_OUTPUT_LINES_PER_IMAGE) + await self._update_terminal_message(interaction) + + except FileNotFoundError: + self.output_history.append(f"Error: Command not found: {command.split()[0]}") + self.output_history.append(f"{self.current_cwd}> ") + self.scroll_offset = max(0, len(self.output_history) - MAX_OUTPUT_LINES_PER_IMAGE) + await self._update_terminal_message(interaction) + except Exception as e: + self.output_history.append(f"Error executing command: {e}") + self.output_history.append(f"{self.current_cwd}> ") + self.scroll_offset = max(0, len(self.output_history) - MAX_OUTPUT_LINES_PER_IMAGE) + await self._update_terminal_message(interaction) + + + async def refresh_terminal_output(self, interaction: Interaction | None = None): + """Called by task loop or refresh button to update output from active process.""" + if not self.terminal_active: + if self.auto_update_task.is_running(): + self.auto_update_task.cancel() + return + + updated = False + if self.active_process: + new_output_lines = [] + try: + # Read non-blockingly (or as much as possible without full block) + # stdout + while True: # Read all available lines from stdout + line = self.active_process.stdout.readline() + if not line: # No more output currently, or EOF + break + new_output_lines.append(line.strip()) # Strip newlines + updated = True + + # stderr + while True: # Read all available lines from stderr + line = self.active_process.stderr.readline() + if not line: + break + new_output_lines.append(f"STDERR: {line.strip()}") + updated = True + + except Exception as e: + new_output_lines.append(f"Error reading process output: {e}") + updated = True + + if new_output_lines: + self.output_history.extend(new_output_lines) + + if self.active_process.poll() is not None: # Process has finished + return_code = self.active_process.returncode + # Read any final output + final_stdout, final_stderr = self.active_process.communicate() + if final_stdout: self.output_history.extend(final_stdout.strip().splitlines()) + if final_stderr: self.output_history.extend([f"STDERR: {l}" for l in final_stderr.strip().splitlines()]) + + self.output_history.append(f"Process finished with exit code {return_code}.") + self.output_history.append(f"{self.current_cwd}> ") # New prompt + self.active_process = None + if self.auto_update_task.is_running(): # Stop loop if it was running for this process + self.auto_update_task.stop() + updated = True + + if updated or interaction: # if interaction, means it's a manual refresh, so always update + self.scroll_offset = max(0, len(self.output_history) - MAX_OUTPUT_LINES_PER_IMAGE) + await self._update_terminal_message(interaction) + + +class TerminalInputModal(ui.Modal, title="Send Command to Terminal"): + command_input = ui.TextInput( + label="Command", + placeholder="Enter command (e.g., ls -l, python script.py)", + style=discord.TextStyle.long, # For multi-line, though usually single. + max_length=400 + ) + + def __init__(self, cog: TerminalCog): + super().__init__(timeout=300) # 5 minutes timeout for modal + self.cog = cog + + async def on_submit(self, interaction: Interaction): + command = self.command_input.value + if command: + # Defer here as execute_shell_command can take time and will edit later + await interaction.response.defer() + await self.cog.execute_shell_command(command, interaction) + else: + await interaction.response.send_message("No command entered.", ephemeral=True) + + async def on_error(self, interaction: Interaction, error: Exception): + await interaction.response.send_message(f"Modal error: {error}", ephemeral=True) + print(f"TerminalInputModal error: {error}") + + +class TerminalView(ui.View): + def __init__(self, cog: TerminalCog, owner_id: int): + super().__init__(timeout=None) # Persistent view + self.cog = cog + self.owner_id = owner_id + self.message: discord.Message | None = None # To store the message this view is attached to + + # Add buttons after initialization + self._add_buttons() + + def _add_buttons(self): + self.clear_items() # Clear existing items if any (e.g., on re-creation) + + # Scroll Up + self.scroll_up_button = ui.Button(label="Scroll Up", emoji="⬆️", style=discord.ButtonStyle.secondary, row=0) + self.scroll_up_button.callback = self.scroll_up_callback + self.add_item(self.scroll_up_button) + + # Scroll Down + self.scroll_down_button = ui.Button(label="Scroll Down", emoji="⬇️", style=discord.ButtonStyle.secondary, row=0) + self.scroll_down_button.callback = self.scroll_down_callback + self.add_item(self.scroll_down_button) + + # Send Input + self.send_input_button = ui.Button(label="Send Input", emoji="⌨️", style=discord.ButtonStyle.primary, row=1) + self.send_input_button.callback = self.send_input_callback + self.add_item(self.send_input_button) + + # Refresh + self.refresh_button = ui.Button(label="Refresh", emoji="🔄", style=discord.ButtonStyle.success, row=1) + self.refresh_button.callback = self.refresh_callback + self.add_item(self.refresh_button) + + # Close/Exit Button + self.close_button = ui.Button(label="Close Terminal", emoji="❌", style=discord.ButtonStyle.danger, row=1) + self.close_button.callback = self.close_callback + self.add_item(self.close_button) + + self.update_button_states(self.cog) + + + async def interaction_check(self, interaction: Interaction) -> bool: + """Ensure only the bot owner can interact.""" + # Use the cog's owner_id which should be set correctly + is_allowed = interaction.user.id == self.cog.owner_id + if not is_allowed: + await interaction.response.send_message("You are not authorized to use these buttons.", ephemeral=True) + return is_allowed + + def update_button_states(self, cog_state: TerminalCog): + """Enable/disable buttons based on terminal state.""" + # Scroll Up + self.scroll_up_button.disabled = cog_state.scroll_offset <= 0 + + # Scroll Down + max_scroll = len(cog_state.output_history) - MAX_OUTPUT_LINES_PER_IMAGE + self.scroll_down_button.disabled = cog_state.scroll_offset >= max_scroll or len(cog_state.output_history) <= MAX_OUTPUT_LINES_PER_IMAGE + + # Send Input & Refresh should generally be enabled if terminal is active + self.send_input_button.disabled = not cog_state.terminal_active or (cog_state.active_process is not None and cog_state.active_process.poll() is None) # Disable if command running + self.refresh_button.disabled = not cog_state.terminal_active + self.close_button.disabled = not cog_state.terminal_active + + + async def scroll_up_callback(self, interaction: Interaction): + self.cog.scroll_offset = max(0, self.cog.scroll_offset - (MAX_OUTPUT_LINES_PER_IMAGE // 2)) # Scroll half page + await self.cog._update_terminal_message(interaction) + + async def scroll_down_callback(self, interaction: Interaction): + max_scroll = len(self.cog.output_history) - MAX_OUTPUT_LINES_PER_IMAGE + self.cog.scroll_offset = min(max_scroll, self.cog.scroll_offset + (MAX_OUTPUT_LINES_PER_IMAGE // 2)) + self.cog.scroll_offset = max(0, self.cog.scroll_offset) # Ensure not negative + await self.cog._update_terminal_message(interaction) + + async def send_input_callback(self, interaction: Interaction): + modal = TerminalInputModal(self.cog) + await interaction.response.send_modal(modal) + # The modal's on_submit will handle command execution and update + + async def refresh_callback(self, interaction: Interaction): + # Defer because refresh_terminal_output might take a moment and edit + await interaction.response.defer() + await self.cog.refresh_terminal_output(interaction) # Pass interaction to update message + + async def close_callback(self, interaction: Interaction): + await interaction.response.defer() # Defer before stopping session + await self.cog.stop_terminal_session(interaction) + # The stop_terminal_session should edit the message to indicate closure. + # No further update needed here for the view itself as it will be removed. + +async def setup(bot: commands.Bot): + terminal_cog = TerminalCog(bot) + await bot.add_cog(terminal_cog) + # Call cog_load if it's not automatically handled by your bot's loading mechanism + # or if it needs to be explicitly called after add_cog for some reason. + # In most modern discord.py setups, cog_load is called automatically. + # If self.owner_id is not being set, ensure cog_load is being called. + # You might need to manually call it if your bot structure doesn't. + # Example: if hasattr(terminal_cog, 'cog_load'): + # await terminal_cog.cog_load()