Add TerminalCog for owner-only terminal interface with command execution

- Implemented a terminal interface in Discord using a cog.
- Added command handling for shell commands with output displayed as images.
- Included scrolling functionality for terminal output.
- Integrated a modal for command input and buttons for user interaction.
- Ensured only the bot owner can access the terminal features.
- Added error handling for command execution and process management.
- Configured terminal appearance with customizable font and colors.
This commit is contained in:
Slipstream 2025-05-17 18:51:39 -06:00
parent eccc670823
commit a857e4103d
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD
2 changed files with 522 additions and 0 deletions

BIN
FONT/DejaVuSansMono.ttf Normal file

Binary file not shown.

522
cogs/terminal_cog.py Normal file
View File

@ -0,0 +1,522 @@
import discord
from discord.ext import commands, tasks
from discord import app_commands, ui, File, Interaction
from PIL import Image, ImageDraw, ImageFont
import subprocess
import os
import io
from collections import deque
import shlex # For safer command parsing if not using shell=True for everything
# --- Configuration ---
FONT_PATH = "FONT/DejaVuSansMono.ttf" # IMPORTANT: Make sure this font file (e.g., Courier New) is in the same directory as your bot, or provide an absolute path.
# You can download common monospaced fonts like DejaVuSansMono.ttf
FONT_SIZE = 15
IMG_WIDTH = 800
IMG_HEIGHT = 600
PADDING = 10
LINE_SPACING = 4 # Extra pixels between lines
BACKGROUND_COLOR = (30, 30, 30) # Dark grey
TEXT_COLOR = (220, 220, 220) # Light grey
PROMPT_COLOR = (70, 170, 240) # Blueish
ERROR_COLOR = (255, 100, 100) # Reddish
MAX_HISTORY_LINES = 500 # Max lines to keep in history
AUTO_UPDATE_INTERVAL_SECONDS = 3
MAX_OUTPUT_LINES_PER_IMAGE = (IMG_HEIGHT - 2 * PADDING) // (FONT_SIZE + LINE_SPACING)
# --- Helper: Owner Check ---
async def is_owner_check(interaction: discord.Interaction) -> bool:
"""Checks if the interacting user is the bot owner."""
if interaction.client.application is None: # Should not happen in slash commands
await interaction.client.application_info() # Fetch if not cached
# Using application.owner for robust owner checking
# For single owner bots, bot.owner_id can also be used if set.
# For team bots, application.owner will be the team. You might need to check team members.
# For simplicity, assuming a single owner or the first team member if it's a team.
app_info = await interaction.client.application_info()
if app_info.team:
# If it's a team, you might want to check if interaction.user.id is in team members
# For this example, we'll consider the team owner (usually the creator)
# This logic might need adjustment based on how discord.py handles team owners.
# A simpler approach for a single-person owned bot: return interaction.user.id == app_info.owner.id
# However, app_info.owner can be a User or a Team.
if isinstance(app_info.owner, discord.User):
return interaction.user.id == app_info.owner.id
elif isinstance(app_info.owner, discord.Team):
# Check if the user is a member of the team, or the team owner
# For simplicity, let's check if the user is the team owner (first member often is)
# or any member of the team.
if app_info.owner.owner_id == interaction.user.id:
return True
for member in app_info.owner.members:
if member.id == interaction.user.id:
return True # Or apply specific roles
return False # Or restrict to only team owner
return False # Fallback
elif app_info.owner: # Single owner
return interaction.user.id == app_info.owner.id
return False # Should not be reached if bot is properly configured
class TerminalCog(commands.Cog, name="Terminal"):
"""Owner-only cog for a terminal interface in Discord."""
def __init__(self, bot: commands.Bot):
self.bot = bot
self.owner_id: int = 0 # Will be set in cog_load
self.terminal_active: bool = False
self.current_cwd: str = os.getcwd()
self.output_history: deque[str] = deque(maxlen=MAX_HISTORY_LINES)
self.scroll_offset: int = 0
self.terminal_message: discord.Message | None = None
self.active_process: subprocess.Popen | None = None
self.terminal_view: TerminalView | None = None
try:
self.font = ImageFont.truetype(FONT_PATH, FONT_SIZE)
except IOError:
print(f"Error: Font file '{FONT_PATH}' not found. Using default PIL font. Terminal image quality may be affected.")
self.font = ImageFont.load_default()
self.auto_update_task = tasks.loop(seconds=AUTO_UPDATE_INTERVAL_SECONDS)(self.refresh_terminal_output)
# Ensure cog_load is defined to set owner_id properly
# self.bot.loop.create_task(self._async_init()) # Alternative for async setup
async def cog_load(self):
"""Async setup for the cog, like fetching owner_id."""
app_info = await self.bot.application_info()
if app_info.team:
# For teams, owner_id might be the team owner's ID or you might need a list of allowed admins.
# This example will use the team owner if available, otherwise the first listed owner.
self.owner_id = app_info.owner.owner_id if app_info.owner and hasattr(app_info.owner, 'owner_id') else (app_info.owner.id if app_info.owner else 0)
elif app_info.owner:
self.owner_id = app_info.owner.id
else:
print("Warning: Bot owner ID could not be determined. Terminal cog owner checks might fail.")
# Fallback or raise error
# self.owner_id = YOUR_FALLBACK_OWNER_ID # if you have one
def _generate_terminal_image(self) -> io.BytesIO:
"""Generates an image of the current terminal output."""
image = Image.new('RGB', (IMG_WIDTH, IMG_HEIGHT), BACKGROUND_COLOR)
draw = ImageDraw.Draw(image)
char_width, _ = self.font.getbbox("M")[2:] # Get width of a character
if char_width == 0: char_width = FONT_SIZE // 2 # Estimate if getbbox fails for default font
y_pos = PADDING
# Determine visible lines based on scroll offset
start_index = self.scroll_offset
end_index = min(len(self.output_history), self.scroll_offset + MAX_OUTPUT_LINES_PER_IMAGE)
visible_lines = list(self.output_history)[start_index:end_index]
for line in visible_lines:
# Basic coloring for prompt or errors
display_color = TEXT_COLOR
if line.strip().endswith(">") and self.current_cwd in line : # Basic prompt detection
display_color = PROMPT_COLOR
elif "error" in line.lower() or "failed" in line.lower(): # Basic error detection
display_color = ERROR_COLOR
# Handle lines longer than image width (simple truncation)
# A more advanced version could wrap text or allow horizontal scrolling.
max_chars_on_line = (IMG_WIDTH - 2 * PADDING) // char_width if char_width > 0 else 80
# Truncate if needed
# Pillow's draw.text handles clipping, but explicit truncation can be clearer
# For simplicity, we'll let Pillow clip.
# if len(line) > max_chars_on_line:
# line = line[:max_chars_on_line-3] + "..."
draw.text((PADDING, y_pos), line, font=self.font, fill=display_color)
y_pos += FONT_SIZE + LINE_SPACING
if y_pos > IMG_HEIGHT - PADDING - FONT_SIZE:
break # Stop if no more space
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='PNG')
img_byte_arr.seek(0)
return img_byte_arr
async def _update_terminal_message(self, interaction: Interaction | None = None, new_content: str | None = None):
"""Updates the terminal message with a new image and view."""
if not self.terminal_message and interaction:
# This case should ideally be handled by sending a new message
# For now, we assume terminal_message is set after initial command
return
if not self.terminal_active:
if self.terminal_message:
await self.terminal_message.edit(content="Terminal session ended.", view=None, attachments=[])
return
image_bytes = self._generate_terminal_image()
discord_file = File(fp=image_bytes, filename="terminal_output.png")
if self.terminal_view:
self.terminal_view.update_button_states(self) # Update button enable/disable
target_message = self.terminal_message
edit_kwargs = {"attachments": [discord_file], "view": self.terminal_view}
if new_content:
edit_kwargs["content"] = new_content
else: # Clear old content if any
edit_kwargs["content"] = None
try:
if interaction and not interaction.response.is_done():
await interaction.response.edit_message(**edit_kwargs)
if not self.terminal_message: # If interaction was the first one
self.terminal_message = await interaction.original_response()
elif self.terminal_message:
await self.terminal_message.edit(**edit_kwargs)
else:
# This should not happen if terminal is active
print("Error: Terminal message not found for update.")
# If interaction is None, we can't send a new message easily here
# This path is usually for auto-updates.
except discord.NotFound:
print("Error: Terminal message not found (deleted?). Ending session.")
await self.stop_terminal_session()
except discord.HTTPException as e:
print(f"Error updating terminal message: {e}")
if e.status == 429: # Rate limited
print("Rate limited. Auto-update might be too fast or manual refresh too frequent.")
async def stop_terminal_session(self, interaction: Interaction | None = None):
"""Stops the terminal session and cleans up."""
self.terminal_active = False
if self.auto_update_task.is_running():
self.auto_update_task.cancel()
if self.active_process:
try:
self.active_process.terminate() # Try to terminate gracefully
self.active_process.wait(timeout=1.0) # Wait a bit
except subprocess.TimeoutExpired:
self.active_process.kill() # Force kill if terminate fails
except Exception as e:
print(f"Error terminating process: {e}")
self.active_process = None
final_message = "Terminal session ended."
if self.terminal_message:
try:
await self.terminal_message.edit(content=final_message, view=None, attachments=[])
except discord.HTTPException:
pass # Message might already be gone
elif interaction: # If no persistent message, respond to interaction
if not interaction.response.is_done():
await interaction.response.send_message(final_message, ephemeral=True)
else:
await interaction.followup.send(final_message, ephemeral=True)
self.terminal_message = None
self.output_history.clear()
self.scroll_offset = 0
@app_commands.command(name="terminal", description="Starts an owner-only terminal session.")
@app_commands.check(is_owner_check)
async def terminal_command(self, interaction: Interaction):
"""Starts the terminal interface."""
if self.terminal_active and self.terminal_message:
await interaction.response.send_message(f"A terminal session is already active. View it here: {self.terminal_message.jump_url}", ephemeral=True)
return
await interaction.response.defer(ephemeral=False) # Ephemeral False to allow message editing
self.terminal_active = True
self.current_cwd = os.getcwd()
self.output_history.clear()
self.output_history.append(f"Discord Terminal Initialized.")
self.output_history.append(f"Owner: {interaction.user.name} ({interaction.user.id})")
self.output_history.append(f"Current CWD: {self.current_cwd}")
self.output_history.append(f"{self.current_cwd}> ")
self.scroll_offset = max(0, len(self.output_history) - MAX_OUTPUT_LINES_PER_IMAGE) # Scroll to bottom
self.terminal_view = TerminalView(cog=self, owner_id=self.owner_id)
image_bytes = self._generate_terminal_image()
discord_file = File(fp=image_bytes, filename="terminal_output.png")
# Send initial message and store it
# Use followup since we deferred
self.terminal_message = await interaction.followup.send(
file=discord_file,
view=self.terminal_view
)
self.terminal_view.message = self.terminal_message # Give view a reference to the message
@terminal_command.error
async def terminal_command_error(self, interaction: Interaction, error: app_commands.AppCommandError):
if isinstance(error, app_commands.CheckFailure):
await interaction.response.send_message("You do not have permission to use this command.", ephemeral=True)
else:
await interaction.response.send_message(f"An error occurred: {error}", ephemeral=True)
print(f"Terminal command error: {error}")
async def execute_shell_command(self, command: str, interaction: Interaction):
"""Executes a shell command and updates the terminal."""
if not self.terminal_active:
await interaction.response.send_message("Terminal session is not active. Use `/terminal` to start.", ephemeral=True)
return
# Add command to history (before execution)
self.output_history.append(f"{self.current_cwd}> {command}")
# Handle 'cd' command separately
if command.strip().lower().startswith("cd "):
try:
target_dir_str = command.strip()[3:].strip()
if not target_dir_str: # "cd" or "cd "
# Go to home directory (platform dependent)
new_cwd = os.path.expanduser("~")
else:
# Replace ~ with home directory path
target_dir_str = os.path.expanduser(target_dir_str)
if os.path.isabs(target_dir_str):
new_cwd = target_dir_str
else:
new_cwd = os.path.abspath(os.path.join(self.current_cwd, target_dir_str))
if os.path.isdir(new_cwd):
self.current_cwd = new_cwd
# self.output_history.append(f"Changed directory to: {self.current_cwd}") # Optional: output for cd
else:
self.output_history.append(f"Error: Directory not found: {new_cwd}")
except Exception as e:
self.output_history.append(f"Error changing directory: {e}")
self.output_history.append(f"{self.current_cwd}> ") # New prompt
self.scroll_offset = max(0, len(self.output_history) - MAX_OUTPUT_LINES_PER_IMAGE)
await self._update_terminal_message(interaction)
return
# Handle 'exit' or 'quit'
if command.strip().lower() in ["exit", "quit"]:
self.output_history.append("Exiting terminal session...")
await self._update_terminal_message(interaction) # Show exit message
await self.stop_terminal_session(interaction)
return
# For other commands, use subprocess
if self.active_process and self.active_process.poll() is None:
self.output_history.append("A command is already running. Please wait or refresh.")
await self._update_terminal_message(interaction)
return
try:
self.active_process = subprocess.Popen(
command,
shell=True, # Security risk: Be absolutely sure this is owner-only.
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=self.current_cwd,
bufsize=1, # Line-buffered
universal_newlines=True # For text mode
)
if not self.auto_update_task.is_running():
self.auto_update_task.start()
# Initial update to show command is running
self.scroll_offset = max(0, len(self.output_history) - MAX_OUTPUT_LINES_PER_IMAGE)
await self._update_terminal_message(interaction)
except FileNotFoundError:
self.output_history.append(f"Error: Command not found: {command.split()[0]}")
self.output_history.append(f"{self.current_cwd}> ")
self.scroll_offset = max(0, len(self.output_history) - MAX_OUTPUT_LINES_PER_IMAGE)
await self._update_terminal_message(interaction)
except Exception as e:
self.output_history.append(f"Error executing command: {e}")
self.output_history.append(f"{self.current_cwd}> ")
self.scroll_offset = max(0, len(self.output_history) - MAX_OUTPUT_LINES_PER_IMAGE)
await self._update_terminal_message(interaction)
async def refresh_terminal_output(self, interaction: Interaction | None = None):
"""Called by task loop or refresh button to update output from active process."""
if not self.terminal_active:
if self.auto_update_task.is_running():
self.auto_update_task.cancel()
return
updated = False
if self.active_process:
new_output_lines = []
try:
# Read non-blockingly (or as much as possible without full block)
# stdout
while True: # Read all available lines from stdout
line = self.active_process.stdout.readline()
if not line: # No more output currently, or EOF
break
new_output_lines.append(line.strip()) # Strip newlines
updated = True
# stderr
while True: # Read all available lines from stderr
line = self.active_process.stderr.readline()
if not line:
break
new_output_lines.append(f"STDERR: {line.strip()}")
updated = True
except Exception as e:
new_output_lines.append(f"Error reading process output: {e}")
updated = True
if new_output_lines:
self.output_history.extend(new_output_lines)
if self.active_process.poll() is not None: # Process has finished
return_code = self.active_process.returncode
# Read any final output
final_stdout, final_stderr = self.active_process.communicate()
if final_stdout: self.output_history.extend(final_stdout.strip().splitlines())
if final_stderr: self.output_history.extend([f"STDERR: {l}" for l in final_stderr.strip().splitlines()])
self.output_history.append(f"Process finished with exit code {return_code}.")
self.output_history.append(f"{self.current_cwd}> ") # New prompt
self.active_process = None
if self.auto_update_task.is_running(): # Stop loop if it was running for this process
self.auto_update_task.stop()
updated = True
if updated or interaction: # if interaction, means it's a manual refresh, so always update
self.scroll_offset = max(0, len(self.output_history) - MAX_OUTPUT_LINES_PER_IMAGE)
await self._update_terminal_message(interaction)
class TerminalInputModal(ui.Modal, title="Send Command to Terminal"):
command_input = ui.TextInput(
label="Command",
placeholder="Enter command (e.g., ls -l, python script.py)",
style=discord.TextStyle.long, # For multi-line, though usually single.
max_length=400
)
def __init__(self, cog: TerminalCog):
super().__init__(timeout=300) # 5 minutes timeout for modal
self.cog = cog
async def on_submit(self, interaction: Interaction):
command = self.command_input.value
if command:
# Defer here as execute_shell_command can take time and will edit later
await interaction.response.defer()
await self.cog.execute_shell_command(command, interaction)
else:
await interaction.response.send_message("No command entered.", ephemeral=True)
async def on_error(self, interaction: Interaction, error: Exception):
await interaction.response.send_message(f"Modal error: {error}", ephemeral=True)
print(f"TerminalInputModal error: {error}")
class TerminalView(ui.View):
def __init__(self, cog: TerminalCog, owner_id: int):
super().__init__(timeout=None) # Persistent view
self.cog = cog
self.owner_id = owner_id
self.message: discord.Message | None = None # To store the message this view is attached to
# Add buttons after initialization
self._add_buttons()
def _add_buttons(self):
self.clear_items() # Clear existing items if any (e.g., on re-creation)
# Scroll Up
self.scroll_up_button = ui.Button(label="Scroll Up", emoji="⬆️", style=discord.ButtonStyle.secondary, row=0)
self.scroll_up_button.callback = self.scroll_up_callback
self.add_item(self.scroll_up_button)
# Scroll Down
self.scroll_down_button = ui.Button(label="Scroll Down", emoji="⬇️", style=discord.ButtonStyle.secondary, row=0)
self.scroll_down_button.callback = self.scroll_down_callback
self.add_item(self.scroll_down_button)
# Send Input
self.send_input_button = ui.Button(label="Send Input", emoji="⌨️", style=discord.ButtonStyle.primary, row=1)
self.send_input_button.callback = self.send_input_callback
self.add_item(self.send_input_button)
# Refresh
self.refresh_button = ui.Button(label="Refresh", emoji="🔄", style=discord.ButtonStyle.success, row=1)
self.refresh_button.callback = self.refresh_callback
self.add_item(self.refresh_button)
# Close/Exit Button
self.close_button = ui.Button(label="Close Terminal", emoji="", style=discord.ButtonStyle.danger, row=1)
self.close_button.callback = self.close_callback
self.add_item(self.close_button)
self.update_button_states(self.cog)
async def interaction_check(self, interaction: Interaction) -> bool:
"""Ensure only the bot owner can interact."""
# Use the cog's owner_id which should be set correctly
is_allowed = interaction.user.id == self.cog.owner_id
if not is_allowed:
await interaction.response.send_message("You are not authorized to use these buttons.", ephemeral=True)
return is_allowed
def update_button_states(self, cog_state: TerminalCog):
"""Enable/disable buttons based on terminal state."""
# Scroll Up
self.scroll_up_button.disabled = cog_state.scroll_offset <= 0
# Scroll Down
max_scroll = len(cog_state.output_history) - MAX_OUTPUT_LINES_PER_IMAGE
self.scroll_down_button.disabled = cog_state.scroll_offset >= max_scroll or len(cog_state.output_history) <= MAX_OUTPUT_LINES_PER_IMAGE
# Send Input & Refresh should generally be enabled if terminal is active
self.send_input_button.disabled = not cog_state.terminal_active or (cog_state.active_process is not None and cog_state.active_process.poll() is None) # Disable if command running
self.refresh_button.disabled = not cog_state.terminal_active
self.close_button.disabled = not cog_state.terminal_active
async def scroll_up_callback(self, interaction: Interaction):
self.cog.scroll_offset = max(0, self.cog.scroll_offset - (MAX_OUTPUT_LINES_PER_IMAGE // 2)) # Scroll half page
await self.cog._update_terminal_message(interaction)
async def scroll_down_callback(self, interaction: Interaction):
max_scroll = len(self.cog.output_history) - MAX_OUTPUT_LINES_PER_IMAGE
self.cog.scroll_offset = min(max_scroll, self.cog.scroll_offset + (MAX_OUTPUT_LINES_PER_IMAGE // 2))
self.cog.scroll_offset = max(0, self.cog.scroll_offset) # Ensure not negative
await self.cog._update_terminal_message(interaction)
async def send_input_callback(self, interaction: Interaction):
modal = TerminalInputModal(self.cog)
await interaction.response.send_modal(modal)
# The modal's on_submit will handle command execution and update
async def refresh_callback(self, interaction: Interaction):
# Defer because refresh_terminal_output might take a moment and edit
await interaction.response.defer()
await self.cog.refresh_terminal_output(interaction) # Pass interaction to update message
async def close_callback(self, interaction: Interaction):
await interaction.response.defer() # Defer before stopping session
await self.cog.stop_terminal_session(interaction)
# The stop_terminal_session should edit the message to indicate closure.
# No further update needed here for the view itself as it will be removed.
async def setup(bot: commands.Bot):
terminal_cog = TerminalCog(bot)
await bot.add_cog(terminal_cog)
# Call cog_load if it's not automatically handled by your bot's loading mechanism
# or if it needs to be explicitly called after add_cog for some reason.
# In most modern discord.py setups, cog_load is called automatically.
# If self.owner_id is not being set, ensure cog_load is being called.
# You might need to manually call it if your bot structure doesn't.
# Example: if hasattr(terminal_cog, 'cog_load'):
# await terminal_cog.cog_load()