discordbot/cogs/terminal_cog.py
Slipstream c9d6cc40a6
feat: Add terminal image endpoint and save images
Introduces a new API endpoint `/terminal_images` to serve generated terminal output images.

- Creates a new file `api_service/terminal_images_endpoint.py` to handle the static file serving.
- Modifies `api_service/api_server.py` to mount the new endpoint.
- Updates `cogs/terminal_cog.py` to save generated terminal images to a local directory (`terminal_images`) with unique filenames.
- Adds a base URL constant (`API_BASE_URL`) to `cogs/terminal_cog.py` for potential future use in generating image URLs.
2025-05-17 19:29:05 -06:00

559 lines
27 KiB
Python

import io
import os
import queue
import shlex  # For safer command parsing if not using shell=True for everything
import subprocess
import threading
import time
import uuid
from collections import deque

import discord
from discord import app_commands, ui, File, Interaction
from discord.ext import commands, tasks
from PIL import Image, ImageDraw, ImageFont
# --- Configuration ---

# Font used to draw terminal text. IMPORTANT: the file must exist at this path
# (relative to the directory the bot runs from) or be given as an absolute
# path. Any monospaced TrueType font works, e.g. DejaVuSansMono.ttf.
FONT_PATH = "FONT/DejaVuSansMono.ttf"
FONT_SIZE = 15

# Canvas geometry, in pixels.
IMG_WIDTH = 800
IMG_HEIGHT = 600
PADDING = 10
LINE_SPACING = 4  # Extra gap between consecutive text lines

# RGB palette.
BACKGROUND_COLOR = (30, 30, 30)  # Dark grey
TEXT_COLOR = (220, 220, 220)  # Light grey
PROMPT_COLOR = (70, 170, 240)  # Blueish
ERROR_COLOR = (255, 100, 100)  # Reddish

# Scrollback and refresh behaviour.
MAX_HISTORY_LINES = 500  # Upper bound on lines kept in the scrollback
AUTO_UPDATE_INTERVAL_SECONDS = 3
# Number of text rows that fit between the top and bottom padding.
MAX_OUTPUT_LINES_PER_IMAGE = (IMG_HEIGHT - PADDING * 2) // (LINE_SPACING + FONT_SIZE)

# Discord user ID allowed to use the terminal.
OWNER_ID = 452666956353503252

# Directory where rendered terminal images are written.
TERMINAL_IMAGES_DIR = "terminal_images"
# Base URL of the API that serves the images; use your actual domain or IP.
API_BASE_URL = "https://slipstreamm.dev"
# --- Helper: Owner Check ---
async def is_owner_check(interaction: discord.Interaction) -> bool:
    """Return True only when the interaction was triggered by the user whose ID is OWNER_ID."""
    invoking_user_id = interaction.user.id
    return OWNER_ID == invoking_user_id
class TerminalCog(commands.Cog, name="Terminal"):
    """Owner-only cog for a terminal interface in Discord.

    Commands are run with ``subprocess.Popen`` (no shell). Their output is kept
    in a bounded scrollback (``output_history``), rendered to a PNG that is
    written to ``TERMINAL_IMAGES_DIR``, and linked from a single Discord
    message that is edited in place.
    """

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        self.owner_id: int = 0  # Will be set in cog_load
        self.terminal_active: bool = False
        self.current_cwd: str = os.getcwd()
        self.output_history: deque[str] = deque(maxlen=MAX_HISTORY_LINES)
        self.scroll_offset: int = 0
        self.terminal_message: discord.Message | None = None
        self.active_process: subprocess.Popen | None = None
        self.terminal_view: TerminalView | None = None
        self.last_command: str | None = None  # Most recent command handed to subprocess
        # Lines produced by the running process; filled by reader threads and
        # drained by refresh_terminal_output so the event loop never blocks on a pipe.
        self._output_queue: queue.Queue = queue.Queue()
        self._reader_threads: list[threading.Thread] = []
        try:
            self.font = ImageFont.truetype(FONT_PATH, FONT_SIZE)
        except IOError:
            print(f"Error: Font file '{FONT_PATH}' not found. Using default PIL font. Terminal image quality may be affected.")
            self.font = ImageFont.load_default()
        self.auto_update_task = tasks.loop(seconds=AUTO_UPDATE_INTERVAL_SECONDS)(self.refresh_terminal_output)

    async def cog_load(self):
        """Async setup for the cog: resolve ``owner_id`` from the application info."""
        app_info = await self.bot.application_info()
        resolved_owner_id = 0
        if app_info.team and app_info.team.owner_id:
            # Team-owned application: the team object carries the owner's user ID.
            resolved_owner_id = app_info.team.owner_id
        elif app_info.owner:
            resolved_owner_id = app_info.owner.id
        if resolved_owner_id:
            self.owner_id = resolved_owner_id
        else:
            print("Warning: Bot owner ID could not be determined. Terminal cog owner checks might fail.")

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------
    def _scroll_to_bottom(self) -> None:
        """Move the viewport so the newest history lines are visible."""
        self.scroll_offset = max(0, len(self.output_history) - MAX_OUTPUT_LINES_PER_IMAGE)

    def _append_prompt(self) -> None:
        """Append a fresh ``<cwd>> `` prompt line to the history."""
        self.output_history.append(f"{self.current_cwd}> ")

    @staticmethod
    async def _send_ephemeral(interaction: Interaction, text: str) -> None:
        """Send *text* ephemerally, using a followup if the interaction was already answered."""
        if interaction.response.is_done():
            await interaction.followup.send(text, ephemeral=True)
        else:
            await interaction.response.send_message(text, ephemeral=True)

    async def _report_and_prompt(self, message: str, interaction: Interaction | None) -> None:
        """Append *message* and a new prompt to the history, then redraw the terminal."""
        self.output_history.append(message)
        self._append_prompt()
        self._scroll_to_bottom()
        await self._update_terminal_message(interaction)

    @staticmethod
    def _pump_stream(stream, sink: queue.Queue, prefix: str) -> None:
        """Forward every line of *stream* to *sink* until EOF. Runs in a worker thread."""
        try:
            for raw_line in stream:
                sink.put(f"{prefix}{raw_line.rstrip(chr(13) + chr(10))}")
        except (OSError, ValueError):
            # The pipe was closed underneath us (process killed or session stopped).
            pass
        finally:
            try:
                stream.close()
            except OSError:
                pass

    def _start_output_readers(self, process: subprocess.Popen) -> None:
        """Start one daemon thread per output pipe of *process*.

        A fresh queue is created per process so stragglers from an earlier
        command cannot leak into the output of the next one.
        """
        self._output_queue = queue.Queue()
        self._reader_threads = []
        for stream, prefix in ((process.stdout, ""), (process.stderr, "STDERR: ")):
            reader = threading.Thread(
                target=self._pump_stream,
                args=(stream, self._output_queue, prefix),
                daemon=True,
            )
            reader.start()
            self._reader_threads.append(reader)

    def _drain_output_queue(self) -> bool:
        """Move all queued process output into the history. Returns True if anything was moved."""
        drained = False
        while True:
            try:
                line = self._output_queue.get_nowait()
            except queue.Empty:
                return drained
            self.output_history.append(line)
            drained = True

    def _terminate_active_process(self) -> None:
        """Terminate (then kill) the active process, reap it, and release its stdin pipe."""
        process = self.active_process
        self.active_process = None
        if process is None:
            return
        try:
            if process.poll() is None:
                process.terminate()  # Try to terminate gracefully
                try:
                    process.wait(timeout=1.0)  # Wait a bit
                except subprocess.TimeoutExpired:
                    process.kill()  # Force kill if terminate fails
                    process.wait(timeout=1.0)  # Reap so no zombie is left behind
        except Exception as e:
            print(f"Error terminating process: {e}")
        finally:
            if process.stdin:
                try:
                    process.stdin.close()
                except OSError:
                    pass  # Pipe already broken; nothing left to release
        # Reader threads close stdout/stderr themselves when they hit EOF.
        self._reader_threads = []
        self._output_queue = queue.Queue()

    def _change_directory(self, target_dir_str: str) -> None:
        """Apply a ``cd`` to ``current_cwd``; an empty target means the home directory."""
        try:
            if not target_dir_str:
                # Go to home directory (platform dependent)
                new_cwd = os.path.expanduser("~")
            else:
                # Replace ~ with home directory path
                expanded = os.path.expanduser(target_dir_str)
                if os.path.isabs(expanded):
                    new_cwd = expanded
                else:
                    new_cwd = os.path.abspath(os.path.join(self.current_cwd, expanded))
            if os.path.isdir(new_cwd):
                self.current_cwd = new_cwd
            else:
                self.output_history.append(f"Error: Directory not found: {new_cwd}")
        except (OSError, ValueError) as e:
            self.output_history.append(f"Error changing directory: {e}")

    # ------------------------------------------------------------------
    # Rendering and message updates
    # ------------------------------------------------------------------
    def _generate_terminal_image(self) -> tuple[io.BytesIO, str]:
        """
        Generates an image of the current terminal output.

        The PNG is written to TERMINAL_IMAGES_DIR under a unique filename.
        Returns a tuple of (BytesIO object positioned at 0, filename).

        NOTE: a new file is written on every call and nothing in this class
        removes old ones, so the directory grows until cleaned up externally.
        """
        image = Image.new('RGB', (IMG_WIDTH, IMG_HEIGHT), BACKGROUND_COLOR)
        draw = ImageDraw.Draw(image)
        y_pos = PADDING
        # Determine visible lines based on scroll offset
        start_index = self.scroll_offset
        end_index = min(len(self.output_history), start_index + MAX_OUTPUT_LINES_PER_IMAGE)
        visible_lines = list(self.output_history)[start_index:end_index]
        for line in visible_lines:
            # Basic coloring for prompt or errors
            lowered = line.lower()
            if line.strip().endswith(">") and self.current_cwd in line:  # Basic prompt detection
                display_color = PROMPT_COLOR
            elif "error" in lowered or "failed" in lowered:  # Basic error detection
                display_color = ERROR_COLOR
            else:
                display_color = TEXT_COLOR
            # Over-long lines are not wrapped; Pillow clips them at the image edge.
            draw.text((PADDING, y_pos), line, font=self.font, fill=display_color)
            y_pos += FONT_SIZE + LINE_SPACING
            if y_pos > IMG_HEIGHT - PADDING - FONT_SIZE:
                break  # Stop if no more space
        # Encode once, then reuse the same bytes for the file and the return value.
        img_byte_arr = io.BytesIO()
        image.save(img_byte_arr, format='PNG')
        # Create a unique filename with timestamp
        filename = f"terminal_{uuid.uuid4().hex[:8]}_{int(time.time())}.png"
        os.makedirs(TERMINAL_IMAGES_DIR, exist_ok=True)
        with open(os.path.join(TERMINAL_IMAGES_DIR, filename), "wb") as image_file:
            image_file.write(img_byte_arr.getvalue())
        img_byte_arr.seek(0)
        return img_byte_arr, filename

    async def _update_terminal_message(self, interaction: Interaction | None = None, new_content: str | None = None):
        """Updates the terminal message with a link to a freshly rendered image and the view.

        If *interaction* has not been answered yet, the edit is sent as its
        response; otherwise the stored terminal message is edited directly.
        *new_content*, when given, replaces the default "Terminal Output" text.
        """
        if not self.terminal_message:
            if not interaction:
                # Auto-update path with nothing to edit.
                if self.terminal_active:
                    print("Error: Terminal message not found for update.")
            # With an interaction but no message there is nothing to edit either;
            # the initial message is created by the /terminal command.
            return
        if not self.terminal_active:
            await self.terminal_message.edit(content="Terminal session ended.", view=None, attachments=[])
            return
        # Generate the image and save it to the terminal_images directory
        _, filename = self._generate_terminal_image()
        image_url = f"{API_BASE_URL}/terminal_images/{filename}"
        if self.terminal_view:
            self.terminal_view.update_button_states(self)  # Update button enable/disable
        content = new_content if new_content else f"Terminal Output: [View Image]({image_url})"
        edit_kwargs = {"content": content, "view": self.terminal_view, "attachments": []}
        try:
            if interaction and not interaction.response.is_done():
                await interaction.response.edit_message(**edit_kwargs)
            else:
                await self.terminal_message.edit(**edit_kwargs)
        except discord.NotFound:
            print("Error: Terminal message not found (deleted?). Ending session.")
            await self.stop_terminal_session()
        except discord.HTTPException as e:
            print(f"Error updating terminal message: {e}")
            if e.status == 429:  # Rate limited
                print("Rate limited. Auto-update might be too fast or manual refresh too frequent.")

    async def stop_terminal_session(self, interaction: Interaction | None = None):
        """Stops the terminal session and cleans up."""
        self.terminal_active = False
        if self.auto_update_task.is_running():
            self.auto_update_task.cancel()
        self._terminate_active_process()
        # Reset all state before awaiting anything: when this runs inside the
        # auto-update loop, cancel() above may interrupt us at the next await.
        message = self.terminal_message
        self.terminal_message = None
        self.output_history.clear()
        self.scroll_offset = 0
        final_message = "Terminal session ended."
        if message:
            try:
                await message.edit(content=final_message, view=None, attachments=[])
            except discord.HTTPException:
                pass  # Best effort: the message might already be gone
        elif interaction:  # If no persistent message, respond to interaction
            await self._send_ephemeral(interaction, final_message)

    # ------------------------------------------------------------------
    # Slash command
    # ------------------------------------------------------------------
    @app_commands.command(name="terminal", description="Starts an owner-only terminal session.")
    @app_commands.check(is_owner_check)
    async def terminal_command(self, interaction: Interaction):
        """Starts the terminal interface."""
        if self.terminal_active and self.terminal_message:
            await interaction.response.send_message(f"A terminal session is already active. View it here: {self.terminal_message.jump_url}", ephemeral=True)
            return
        await interaction.response.defer(ephemeral=False)  # Ephemeral False to allow message editing
        self.terminal_active = True
        self.current_cwd = os.getcwd()
        self.output_history.clear()
        self.output_history.append("Discord Terminal Initialized.")
        self.output_history.append(f"Owner: {interaction.user.name} ({interaction.user.id})")
        self.output_history.append(f"Current CWD: {self.current_cwd}")
        self._append_prompt()
        self._scroll_to_bottom()
        self.terminal_view = TerminalView(cog=self, owner_id=self.owner_id)
        # Generate the image and save it to the terminal_images directory
        _, filename = self._generate_terminal_image()
        image_url = f"{API_BASE_URL}/terminal_images/{filename}"
        # Use followup since we deferred; keep the message so later updates can edit it.
        self.terminal_message = await interaction.followup.send(
            content=f"Terminal Output: [View Image]({image_url})",
            view=self.terminal_view
        )
        self.terminal_view.message = self.terminal_message  # Give view a reference to the message

    @terminal_command.error
    async def terminal_command_error(self, interaction: Interaction, error: app_commands.AppCommandError):
        """Report a failure of /terminal to the invoking user."""
        if isinstance(error, app_commands.CheckFailure):
            await self._send_ephemeral(interaction, "You do not have permission to use this command.")
        else:
            # The command defers early, so the response may already be used up here.
            await self._send_ephemeral(interaction, f"An error occurred: {error}")
            print(f"Terminal command error: {error}")

    # ------------------------------------------------------------------
    # Command execution
    # ------------------------------------------------------------------
    async def execute_shell_command(self, command: str, interaction: Interaction):
        """Executes a shell command and updates the terminal.

        ``clear``/``cls``, ``cd`` and ``exit``/``quit`` are handled internally;
        everything else is split with shlex and started via subprocess (no shell).
        """
        if not self.terminal_active:
            await self._send_ephemeral(interaction, "Terminal session is not active. Use `/terminal` to start.")
            return
        stripped = command.strip()
        lowered = stripped.lower()
        # Handle 'clear' command separately
        if lowered in ("clear", "cls"):
            self.output_history.clear()
            self._append_prompt()
            self.scroll_offset = 0  # Reset scroll
            await self._update_terminal_message(interaction)
            return
        # Handle 'cd' command separately; a bare "cd" goes to the home directory.
        if lowered == "cd" or lowered.startswith("cd "):
            self._change_directory(stripped[2:].strip())
            self._append_prompt()
            self._scroll_to_bottom()
            await self._update_terminal_message(interaction)
            return
        # Handle 'exit' or 'quit'
        if lowered in ("exit", "quit"):
            self.output_history.append("Exiting terminal session...")
            await self._update_terminal_message(interaction)  # Show exit message
            await self.stop_terminal_session(interaction)
            return
        self.last_command = command
        # Only one subprocess at a time.
        if self.active_process and self.active_process.poll() is None:
            self.output_history.append("A command is already running. Please wait or refresh.")
            await self._update_terminal_message(interaction)
            return
        try:
            # Use shlex.split for safer command parsing
            command_parts = shlex.split(command)
        except ValueError as e:  # e.g. unbalanced quotes
            await self._report_and_prompt(f"Error executing command: {e}", interaction)
            return
        if not command_parts:
            await self._report_and_prompt("No command provided.", interaction)
            return
        try:
            process = subprocess.Popen(
                command_parts,
                stdin=subprocess.PIPE,  # Enable interactive input
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,  # Use text mode for easier handling of output
                errors="replace",  # Undecodable bytes must not kill the reader threads
                cwd=self.current_cwd,
            )
        except FileNotFoundError:
            await self._report_and_prompt(f"Error: Command not found: {command_parts[0]}", interaction)
            return
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            await self._report_and_prompt(f"Error executing command: {e}", interaction)
            return
        self.active_process = process
        self._start_output_readers(process)
        if not self.auto_update_task.is_running():
            self.auto_update_task.start()
        # Initial update to show command is running
        self.output_history.append(f"{self.current_cwd}> {command}")
        self._scroll_to_bottom()
        await self._update_terminal_message(interaction)

    async def refresh_terminal_output(self, interaction: Interaction | None = None):
        """Called by task loop or refresh button to update output from active process.

        Output is taken from the queue filled by the reader threads, so this
        never performs a blocking read on the process pipes.
        """
        if not self.terminal_active:
            if self.auto_update_task.is_running():
                self.auto_update_task.cancel()
            return
        updated = False
        process = self.active_process
        if process is not None:
            finished = process.poll() is not None
            if finished:
                # Give the readers a brief moment to reach EOF so trailing output is not lost.
                for reader in self._reader_threads:
                    reader.join(timeout=0.2)
            if self._drain_output_queue():
                updated = True
            if finished:
                self.output_history.append(f"Process finished with exit code {process.returncode}.")
                self._append_prompt()
                if process.stdin:
                    try:
                        process.stdin.close()
                    except OSError:
                        pass  # Pipe already broken; nothing left to release
                self.active_process = None
                self._reader_threads = []
                if self.auto_update_task.is_running():  # Stop loop if it was running for this process
                    self.auto_update_task.stop()
                updated = True
        if updated or interaction:  # if interaction, means it's a manual refresh, so always update
            self._scroll_to_bottom()
            await self._update_terminal_message(interaction)
class TerminalInputModal(ui.Modal, title="Send Command to Terminal"):
    """Modal that collects one piece of text for the terminal.

    If a process is currently running, the text is written to its stdin;
    otherwise it is executed as a new command.
    """

    command_input = ui.TextInput(
        label="Command",
        placeholder="Enter command (e.g., ls -l, python script.py)",
        style=discord.TextStyle.long,  # For multi-line, though usually single.
        max_length=400
    )

    def __init__(self, cog: TerminalCog):
        super().__init__(timeout=300)  # 5 minutes timeout for modal
        self.cog = cog

    async def on_submit(self, interaction: Interaction):
        """Route the submitted text to the running process or run it as a command."""
        user_input = self.command_input.value
        if not user_input:
            await interaction.response.send_message("No input entered.", ephemeral=True)
            return
        # Defer the interaction as we will update the message later
        await interaction.response.defer()
        process = self.cog.active_process
        if process is None or process.poll() is not None:
            # No active process, execute as a new command
            await self.cog.execute_shell_command(user_input, interaction)
            return
        # There is an active process, assume the input is for it
        try:
            process.stdin.write(user_input + '\n')
            process.stdin.flush()
        except (OSError, ValueError) as e:  # Broken or already-closed pipe
            history_line = f"Error sending input to process: {e}"
        else:
            history_line = user_input  # Echo the input in the terminal
        self.cog.output_history.append(history_line)
        self.cog.scroll_offset = max(0, len(self.cog.output_history) - MAX_OUTPUT_LINES_PER_IMAGE)
        await self.cog._update_terminal_message(interaction)

    async def on_error(self, interaction: Interaction, error: Exception):
        """Log the error and tell the user about it."""
        print(f"TerminalInputModal error: {error}")
        message = f"Modal error: {error}"
        # on_submit defers early, so the initial response is usually already used.
        if interaction.response.is_done():
            await interaction.followup.send(message, ephemeral=True)
        else:
            await interaction.response.send_message(message, ephemeral=True)
class TerminalView(ui.View):
    """Button row attached to the terminal message: scroll, send input, refresh, close."""

    def __init__(self, cog: TerminalCog, owner_id: int):
        super().__init__(timeout=None)  # Persistent view
        self.cog = cog
        self.owner_id = owner_id
        # Message this view is attached to; assigned by the cog after sending.
        self.message: discord.Message | None = None
        self._add_buttons()

    def _make_button(self, label: str, emoji: str, style: discord.ButtonStyle, row: int, callback) -> ui.Button:
        """Create a button, wire its callback, and add it to the view."""
        button = ui.Button(label=label, emoji=emoji, style=style, row=row)
        button.callback = callback
        self.add_item(button)
        return button

    def _add_buttons(self):
        """(Re)build all buttons and sync their enabled state with the cog."""
        self.clear_items()  # Drop anything left from a previous build
        self.scroll_up_button = self._make_button(
            "Scroll Up", "⬆️", discord.ButtonStyle.secondary, 0, self.scroll_up_callback
        )
        self.scroll_down_button = self._make_button(
            "Scroll Down", "⬇️", discord.ButtonStyle.secondary, 0, self.scroll_down_callback
        )
        self.send_input_button = self._make_button(
            "Send Input", "⌨️", discord.ButtonStyle.primary, 1, self.send_input_callback
        )
        self.refresh_button = self._make_button(
            "Refresh", "🔄", discord.ButtonStyle.success, 1, self.refresh_callback
        )
        # NOTE(review): the emoji here is an empty string in the source as received;
        # confirm whether a character was lost and whether Discord accepts it.
        self.close_button = self._make_button(
            "Close Terminal", "", discord.ButtonStyle.danger, 1, self.close_callback
        )
        self.update_button_states(self.cog)

    async def interaction_check(self, interaction: Interaction) -> bool:
        """Allow only the user whose ID equals the module-level OWNER_ID."""
        if interaction.user.id == OWNER_ID:
            return True
        await interaction.response.send_message("You are not authorized to use these buttons.", ephemeral=True)
        return False

    def update_button_states(self, cog_state: TerminalCog):
        """Enable/disable buttons based on terminal state."""
        history_len = len(cog_state.output_history)
        offset = cog_state.scroll_offset
        inactive = not cog_state.terminal_active
        # Nothing above the viewport -> cannot scroll up.
        self.scroll_up_button.disabled = offset <= 0
        # At (or past) the last page, or everything fits on one page -> cannot scroll down.
        self.scroll_down_button.disabled = (
            offset >= history_len - MAX_OUTPUT_LINES_PER_IMAGE
            or history_len <= MAX_OUTPUT_LINES_PER_IMAGE
        )
        # Input is blocked while a command is still running.
        self.send_input_button.disabled = inactive or (
            cog_state.active_process is not None and cog_state.active_process.poll() is None
        )
        self.refresh_button.disabled = inactive
        self.close_button.disabled = inactive

    async def scroll_up_callback(self, interaction: Interaction):
        """Scroll half a page towards the start of the history."""
        half_page = MAX_OUTPUT_LINES_PER_IMAGE // 2
        self.cog.scroll_offset = max(0, self.cog.scroll_offset - half_page)
        await self.cog._update_terminal_message(interaction)

    async def scroll_down_callback(self, interaction: Interaction):
        """Scroll half a page towards the end of the history, clamped to the valid range."""
        half_page = MAX_OUTPUT_LINES_PER_IMAGE // 2
        last_page_start = len(self.cog.output_history) - MAX_OUTPUT_LINES_PER_IMAGE
        self.cog.scroll_offset = max(0, min(last_page_start, self.cog.scroll_offset + half_page))
        await self.cog._update_terminal_message(interaction)

    async def send_input_callback(self, interaction: Interaction):
        """Open the input modal; its on_submit handles execution and the redraw."""
        await interaction.response.send_modal(TerminalInputModal(self.cog))

    async def refresh_callback(self, interaction: Interaction):
        """Acknowledge the click, then have the cog refresh and redraw."""
        await interaction.response.defer()
        await self.cog.refresh_terminal_output(interaction)

    async def close_callback(self, interaction: Interaction):
        """Acknowledge the click, then end the session (the cog edits the message)."""
        await interaction.response.defer()
        await self.cog.stop_terminal_session(interaction)
async def setup(bot: commands.Bot):
    """Extension entry point: create the terminal cog and register it on *bot*."""
    # cog_load is normally invoked by discord.py when the cog is added. If
    # owner_id is never set, check that your loading mechanism really calls it.
    await bot.add_cog(TerminalCog(bot))