"""Owner-only Discord cog that exposes a shell session rendered as images."""
import asyncio
import io
import os
import queue
import shlex  # For safer command parsing if not using shell=True for everything
import subprocess
import threading
import time
import uuid
from collections import deque

import aiohttp
import discord
from discord import app_commands, ui, File, Interaction
from discord.ext import commands, tasks
from PIL import Image, ImageDraw, ImageFont

# --- Configuration ---

# Monospaced TrueType font used to render the terminal image. The path is
# relative, so keep the file at this location or replace it with an absolute
# path. If it cannot be loaded the cog falls back to PIL's default font.
FONT_PATH = "FONT/DejaVuSansMono.ttf"
FONT_SIZE = 15

# Size of the rendered terminal image, in pixels.
IMG_WIDTH = 800
IMG_HEIGHT = 600
PADDING = 10
LINE_SPACING = 4  # Extra pixels between lines

# RGB colours used when drawing text.
BACKGROUND_COLOR = (30, 30, 30)  # Dark grey
TEXT_COLOR = (220, 220, 220)  # Light grey
PROMPT_COLOR = (70, 170, 240)  # Blueish
ERROR_COLOR = (255, 100, 100)  # Reddish

MAX_HISTORY_LINES = 500  # Max lines to keep in history
AUTO_UPDATE_INTERVAL_SECONDS = 3

# Number of text lines that fit between the top and bottom padding.
MAX_OUTPUT_LINES_PER_IMAGE = (IMG_HEIGHT - 2 * PADDING) // (FONT_SIZE + LINE_SPACING)

# Discord user ID allowed to use the terminal command and its buttons.
OWNER_ID = 452666956353503252

TERMINAL_IMAGES_DIR = "terminal_images"  # Directory to store terminal images

# Use your actual domain or IP address here. Rendered images are linked as
# f"{API_BASE_URL}/terminal_images/<filename>".
API_BASE_URL = "https://slipstreamm.dev"  # Base URL for the API


# --- Helper: Owner Check ---
async def is_owner_check(interaction: discord.Interaction) -> bool:
    """Return True only when the invoking user's ID equals the hardcoded OWNER_ID."""
    invoker_id = interaction.user.id
    return invoker_id == OWNER_ID


class TerminalCog(commands.Cog, name="Terminal"):
    """Owner-only cog for a terminal interface in Discord.

    Commands are started with ``subprocess.Popen`` in ``current_cwd``. Their
    output is collected into a bounded history, rendered to a PNG stored under
    ``TERMINAL_IMAGES_DIR`` and linked from one Discord message that is edited
    in place. Process output is read by daemon threads so that the event loop
    is never blocked waiting on a pipe.
    """

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        self.owner_id: int = 0  # Will be set in cog_load
        self.terminal_active: bool = False
        self.current_cwd: str = os.getcwd()
        self.output_history: deque[str] = deque(maxlen=MAX_HISTORY_LINES)
        self.scroll_offset: int = 0
        self.terminal_message: discord.Message | None = None
        self.active_process: subprocess.Popen | None = None
        self.terminal_view: TerminalView | None = None
        # Store the last command for display after execution
        self.last_command: str | None = None

        # Lines produced by the running process, filled by the reader threads
        # and drained on the event loop in refresh_terminal_output().
        self._pending_output: queue.Queue[str] = queue.Queue()
        self._reader_threads: list[threading.Thread] = []
        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references to tasks.
        self._background_tasks: set[asyncio.Task] = set()

        # Ensure the terminal_images directory exists with proper permissions
        os.makedirs(TERMINAL_IMAGES_DIR, exist_ok=True)
        # Try to set permissions to allow web server to read files
        try:
            # 0o755 = Owner can read/write/execute, others can read/execute
            os.chmod(TERMINAL_IMAGES_DIR, 0o755)
            print(f"Set permissions for {TERMINAL_IMAGES_DIR} to 0o755")
        except OSError as e:
            print(f"Warning: Could not set permissions for {TERMINAL_IMAGES_DIR}: {e}")

        try:
            self.font = ImageFont.truetype(FONT_PATH, FONT_SIZE)
        except OSError:
            print(
                f"Error: Font file '{FONT_PATH}' not found. Using default PIL font. Terminal image quality may be affected."
            )
            self.font = ImageFont.load_default()

        self.auto_update_task = tasks.loop(seconds=AUTO_UPDATE_INTERVAL_SECONDS)(
            self.refresh_terminal_output
        )

    async def cog_load(self):
        """Async setup for the cog: resolve ``owner_id`` from the application info."""
        app_info = await self.bot.application_info()
        team = app_info.team
        if team is not None and team.owner_id:
            # Team-owned application: use the team owner's user ID.
            self.owner_id = team.owner_id
        elif app_info.owner:
            self.owner_id = app_info.owner.id
        else:
            print(
                "Warning: Bot owner ID could not be determined. Terminal cog owner checks might fail."
            )

    def cog_unload(self):
        """Stop the auto-update loop and kill a still-running process on unload."""
        if self.auto_update_task.is_running():
            self.auto_update_task.cancel()
        if self.active_process and self.active_process.poll() is None:
            self.active_process.kill()

    # --- Small internal helpers ---

    @staticmethod
    async def _send_ephemeral(interaction: Interaction, message: str) -> None:
        """Send an ephemeral reply whether or not the interaction was already answered."""
        if interaction.response.is_done():
            await interaction.followup.send(message, ephemeral=True)
        else:
            await interaction.response.send_message(message, ephemeral=True)

    def _scroll_to_bottom(self) -> None:
        """Move the viewport so the newest history lines are visible."""
        self.scroll_offset = max(
            0, len(self.output_history) - MAX_OUTPUT_LINES_PER_IMAGE
        )

    def _warm_cache_in_background(self, image_url: str) -> None:
        """Schedule the cache-warming request without awaiting it."""
        task = asyncio.create_task(self._warm_cloudflare_cache(image_url))
        # Keep the task referenced until it completes so it cannot be
        # garbage-collected mid-flight.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    @staticmethod
    def _pump_stream(stream, prefix: str, sink: "queue.Queue[str]") -> None:
        """Thread target: copy lines from a process pipe into ``sink`` until EOF."""
        try:
            for line in iter(stream.readline, ""):
                sink.put(prefix + line.rstrip("\r\n"))
        except (OSError, ValueError):
            # The pipe was closed underneath us while shutting down.
            pass

    def _start_output_readers(self, process: subprocess.Popen) -> None:
        """Start one daemon reader thread per output pipe of ``process``."""
        # A fresh queue per process keeps late lines from an older process
        # out of the new command's output.
        sink: queue.Queue[str] = queue.Queue()
        self._pending_output = sink
        self._reader_threads = []
        for stream, prefix in ((process.stdout, ""), (process.stderr, "STDERR: ")):
            reader = threading.Thread(
                target=self._pump_stream, args=(stream, prefix, sink), daemon=True
            )
            reader.start()
            self._reader_threads.append(reader)

    def _drain_pending_output(self) -> bool:
        """Move queued process output into the history; return True if any was moved."""
        drained = False
        while True:
            try:
                line = self._pending_output.get_nowait()
            except queue.Empty:
                return drained
            self.output_history.append(line)
            drained = True

    def _change_directory(self, target: str) -> None:
        """Apply a ``cd`` to ``current_cwd``; an empty target means the home directory."""
        try:
            if not target:
                # Go to home directory (platform dependent)
                new_cwd = os.path.expanduser("~")
            else:
                # Replace ~ with home directory path
                expanded = os.path.expanduser(target)
                if os.path.isabs(expanded):
                    new_cwd = expanded
                else:
                    new_cwd = os.path.abspath(
                        os.path.join(self.current_cwd, expanded)
                    )

            if os.path.isdir(new_cwd):
                self.current_cwd = new_cwd
            else:
                self.output_history.append(f"Error: Directory not found: {new_cwd}")
        except Exception as e:
            self.output_history.append(f"Error changing directory: {e}")

    async def _warm_cloudflare_cache(self, image_url: str):
        """
        Sends a request to the image URL to warm up the Cloudflare cache.

        Best effort: failures are only logged.

        Args:
            image_url: The URL of the image to warm up
        """
        timeout = aiohttp.ClientTimeout(total=5)
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(image_url) as response:
                    if response.status == 200:
                        print(f"Successfully warmed Cloudflare cache for {image_url}")
                    else:
                        print(
                            f"Failed to warm Cloudflare cache for {image_url}: HTTP {response.status}"
                        )
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            print(f"Error warming Cloudflare cache for {image_url}: {e}")

    def _generate_terminal_image(self) -> tuple[io.BytesIO, str]:
        """
        Generates an image of the current terminal output.

        The PNG is written to ``TERMINAL_IMAGES_DIR`` under a unique name.

        Returns a tuple of (BytesIO object, filename)
        """
        image = Image.new("RGB", (IMG_WIDTH, IMG_HEIGHT), BACKGROUND_COLOR)
        draw = ImageDraw.Draw(image)

        # Determine visible lines based on scroll offset
        start_index = self.scroll_offset
        end_index = min(
            len(self.output_history), self.scroll_offset + MAX_OUTPUT_LINES_PER_IMAGE
        )
        visible_lines = list(self.output_history)[start_index:end_index]

        y_pos = PADDING
        for line in visible_lines:
            lowered = line.lower()
            # Basic coloring for prompt or errors
            if line.strip().endswith(">") and self.current_cwd in line:
                display_color = PROMPT_COLOR
            elif "error" in lowered or "failed" in lowered:
                display_color = ERROR_COLOR
            else:
                display_color = TEXT_COLOR

            # Lines wider than the image are not wrapped; Pillow clips them.
            draw.text((PADDING, y_pos), line, font=self.font, fill=display_color)
            y_pos += FONT_SIZE + LINE_SPACING
            if y_pos > IMG_HEIGHT - PADDING - FONT_SIZE:
                break  # Stop if no more space

        # Create a unique filename with timestamp
        filename = f"terminal_{uuid.uuid4().hex[:8]}_{int(time.time())}.png"

        # Encode once, then reuse the bytes for both the file and the buffer.
        img_byte_arr = io.BytesIO()
        image.save(img_byte_arr, format="PNG")

        os.makedirs(TERMINAL_IMAGES_DIR, exist_ok=True)
        file_path = os.path.join(TERMINAL_IMAGES_DIR, filename)
        with open(file_path, "wb") as image_file:
            image_file.write(img_byte_arr.getvalue())

        img_byte_arr.seek(0)
        return img_byte_arr, filename

    async def _update_terminal_message(
        self, interaction: Interaction | None = None, new_content: str | None = None
    ):
        """Updates the terminal message with a new image link and view.

        Args:
            interaction: If given and not yet answered, the edit is sent as the
                interaction response; otherwise the stored message is edited.
            new_content: Replaces the default "Terminal Output" content when set.
        """
        if not self.terminal_message and interaction:
            # This case should ideally be handled by sending a new message
            # For now, we assume terminal_message is set after initial command
            return

        if not self.terminal_active:
            if self.terminal_message:
                await self.terminal_message.edit(
                    content="Terminal session ended.", view=None, attachments=[]
                )
            return

        # Generate the image and save it to the terminal_images directory
        _, filename = self._generate_terminal_image()
        image_url = f"{API_BASE_URL}/terminal_images/{filename}"

        # Warm up the Cloudflare cache in the background without waiting for it
        self._warm_cache_in_background(image_url)

        if self.terminal_view:
            self.terminal_view.update_button_states(self)  # Update button enable/disable

        content = (
            f"Terminal Output: [View Image]({image_url})"
            if not new_content
            else new_content
        )
        edit_kwargs = {
            "content": content,
            "view": self.terminal_view,
            "attachments": [],
        }

        try:
            if interaction and not interaction.response.is_done():
                await interaction.response.edit_message(**edit_kwargs)
                if not self.terminal_message:  # If interaction was the first one
                    self.terminal_message = await interaction.original_response()
            elif self.terminal_message:
                await self.terminal_message.edit(**edit_kwargs)
            else:
                # This should not happen if terminal is active.
                # Without an interaction (auto-update path) no new message
                # can be sent from here.
                print("Error: Terminal message not found for update.")
        except discord.NotFound:
            print("Error: Terminal message not found (deleted?). Ending session.")
            await self.stop_terminal_session()
        except discord.HTTPException as e:
            print(f"Error updating terminal message: {e}")
            if e.status == 429:  # Rate limited
                print(
                    "Rate limited. Auto-update might be too fast or manual refresh too frequent."
                )

    async def stop_terminal_session(self, interaction: Interaction | None = None):
        """Stops the terminal session and cleans up.

        Cancels the auto-update loop, terminates (then kills) a running process,
        marks the terminal message as ended and resets history and scroll state.
        """
        self.terminal_active = False
        if self.auto_update_task.is_running():
            self.auto_update_task.cancel()
        if self.active_process:
            try:
                self.active_process.terminate()  # Try to terminate gracefully
                # NOTE: this wait blocks the event loop for up to one second.
                self.active_process.wait(timeout=1.0)
            except subprocess.TimeoutExpired:
                self.active_process.kill()  # Force kill if terminate fails
            except Exception as e:
                print(f"Error terminating process: {e}")
            self.active_process = None
        # Reader threads are daemons and exit on their own once the pipes close.
        self._reader_threads = []

        final_message = "Terminal session ended."
        if self.terminal_message:
            try:
                await self.terminal_message.edit(
                    content=final_message, view=None, attachments=[]
                )
            except discord.HTTPException:
                pass  # Message might already be gone
        elif interaction:  # If no persistent message, respond to interaction
            await self._send_ephemeral(interaction, final_message)

        self.terminal_message = None
        self.output_history.clear()
        self.scroll_offset = 0

    @app_commands.command(
        name="terminal", description="Starts an owner-only terminal session."
    )
    @app_commands.check(is_owner_check)
    async def terminal_command(self, interaction: Interaction):
        """Starts the terminal interface."""
        if self.terminal_active and self.terminal_message:
            await interaction.response.send_message(
                f"A terminal session is already active. View it here: {self.terminal_message.jump_url}",
                ephemeral=True,
            )
            return

        # Ephemeral False to allow message editing
        await interaction.response.defer(ephemeral=False)

        self.terminal_active = True
        self.current_cwd = os.getcwd()
        self.output_history.clear()
        self.output_history.append("Discord Terminal Initialized.")
        self.output_history.append(
            f"Owner: {interaction.user.name} ({interaction.user.id})"
        )
        self.output_history.append(f"Current CWD: {self.current_cwd}")
        self.output_history.append(f"{self.current_cwd}> ")
        self._scroll_to_bottom()

        self.terminal_view = TerminalView(cog=self, owner_id=self.owner_id)

        # Generate the image and save it to the terminal_images directory
        _, filename = self._generate_terminal_image()
        image_url = f"{API_BASE_URL}/terminal_images/{filename}"
        self._warm_cache_in_background(image_url)

        # Send initial message and store it; followup is required after defer()
        self.terminal_message = await interaction.followup.send(
            content=f"Terminal Output: [View Image]({image_url})",
            view=self.terminal_view,
        )
        # Give view a reference to the message
        self.terminal_view.message = self.terminal_message

    @terminal_command.error
    async def terminal_command_error(
        self, interaction: Interaction, error: app_commands.AppCommandError
    ):
        """Report a failure of ``/terminal`` to the user and log unexpected errors."""
        if isinstance(error, app_commands.CheckFailure):
            await self._send_ephemeral(
                interaction, "You do not have permission to use this command."
            )
        else:
            # Log first so the error is not lost if the reply itself fails.
            print(f"Terminal command error: {error}")
            # The command defers early, so the response may already be used.
            await self._send_ephemeral(interaction, f"An error occurred: {error}")

    async def execute_shell_command(self, command: str, interaction: Interaction):
        """Executes a shell command and updates the terminal.

        ``clear``/``cls``, ``cd`` and ``exit``/``quit`` are handled internally;
        anything else is split with ``shlex`` and started as a subprocess whose
        output is picked up by ``refresh_terminal_output``.
        """
        if not self.terminal_active:
            # Callers may already have deferred the interaction.
            await self._send_ephemeral(
                interaction,
                "Terminal session is not active. Use `/terminal` to start.",
            )
            return

        stripped = command.strip()
        lowered = stripped.lower()

        # Handle 'clear' command separately
        if lowered in ("clear", "cls"):
            self.output_history.clear()
            self.output_history.append(f"{self.current_cwd}> ")  # Add new prompt
            self.scroll_offset = 0  # Reset scroll
            await self._update_terminal_message(interaction)
            return

        # Handle 'cd' command separately (a bare "cd" goes to the home directory)
        if lowered == "cd" or lowered.startswith("cd "):
            self._change_directory(stripped[2:].strip())
            self.output_history.append(f"{self.current_cwd}> ")  # New prompt
            self._scroll_to_bottom()
            await self._update_terminal_message(interaction)
            return

        # Handle 'exit' or 'quit'
        if lowered in ("exit", "quit"):
            self.output_history.append("Exiting terminal session...")
            await self._update_terminal_message(interaction)  # Show exit message
            await self.stop_terminal_session(interaction)
            return

        self.last_command = command  # Store command for display after execution

        # Only one subprocess at a time
        if self.active_process and self.active_process.poll() is None:
            self.output_history.append(
                "A command is already running. Please wait or refresh."
            )
            await self._update_terminal_message(interaction)
            return

        try:
            # Use shlex.split for safer command parsing (no shell involved)
            command_parts = shlex.split(command)
            if not command_parts:
                self.output_history.append("No command provided.")
                self.output_history.append(f"{self.current_cwd}> ")
                self._scroll_to_bottom()
                await self._update_terminal_message(interaction)
                return

            self.active_process = subprocess.Popen(
                command_parts,
                stdin=subprocess.PIPE,  # Enable interactive input
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,  # Use text mode for easier handling of output
                cwd=self.current_cwd,
            )
            self._start_output_readers(self.active_process)
            if not self.auto_update_task.is_running():
                self.auto_update_task.start()

            # Initial update to show command is running
            self.output_history.append(f"{self.current_cwd}> {command}")
            self._scroll_to_bottom()
            await self._update_terminal_message(interaction)

        except FileNotFoundError:
            self.output_history.append(f"Error: Command not found: {command_parts[0]}")
            self.output_history.append(f"{self.current_cwd}> ")
            self._scroll_to_bottom()
            await self._update_terminal_message(interaction)
        except Exception as e:
            self.output_history.append(f"Error executing command: {e}")
            self.output_history.append(f"{self.current_cwd}> ")
            self._scroll_to_bottom()
            await self._update_terminal_message(interaction)

    async def refresh_terminal_output(self, interaction: Interaction | None = None):
        """Called by task loop or refresh button to update output from active process.

        Args:
            interaction: Present for a manual refresh, which always redraws the
                message; the task loop passes nothing and only redraws when new
                output arrived or the process finished.
        """
        if not self.terminal_active:
            if self.auto_update_task.is_running():
                self.auto_update_task.cancel()
            return

        updated = False
        process = self.active_process
        if process is not None:
            if process.poll() is not None:  # Process has finished
                # The readers normally have reached EOF already; the short
                # join only bounds the wait if something still holds a pipe.
                for reader in self._reader_threads:
                    reader.join(timeout=0.2)
                self._reader_threads = []
                self._drain_pending_output()

                self.output_history.append(
                    f"Process finished with exit code {process.returncode}."
                )
                self.output_history.append(f"{self.current_cwd}> ")  # New prompt
                self.active_process = None
                # Stop loop if it was running for this process
                if self.auto_update_task.is_running():
                    self.auto_update_task.stop()
                updated = True
            else:
                # Still running: take whatever the reader threads collected so
                # far. This never waits on the pipes.
                updated = self._drain_pending_output()

        # if interaction, means it's a manual refresh, so always update
        if updated or interaction:
            self._scroll_to_bottom()
            await self._update_terminal_message(interaction)

        # NOTE: detecting that a running process is waiting for input is not
        # implemented here; button state is decided in
        # TerminalView.update_button_states.


class TerminalInputModal(ui.Modal, title="Send Command to Terminal"):
    """Modal that collects one line of text for the terminal.

    The text is written to the running process's stdin when one is active;
    otherwise it is executed as a new command.
    """

    command_input = ui.TextInput(
        label="Command",
        placeholder="Enter command (e.g., ls -l, python script.py)",
        style=discord.TextStyle.long,  # For multi-line, though usually single.
        max_length=400,
    )

    def __init__(self, cog: TerminalCog):
        super().__init__(timeout=300)  # 5 minutes timeout for modal
        self.cog = cog

    async def on_submit(self, interaction: Interaction):
        """Forward the submitted text to the active process or run it as a command."""
        user_input = self.command_input.value
        if not user_input:
            await interaction.response.send_message("No input entered.", ephemeral=True)
            return

        # Defer the interaction as we will update the message later
        await interaction.response.defer()

        process = self.cog.active_process
        if process is None or process.poll() is not None:
            # No active process, execute as a new command
            await self.cog.execute_shell_command(user_input, interaction)
            return

        # There is an active process, assume the input is for it. Only the pipe
        # write is guarded, so a failing message update is not misreported as a
        # stdin failure.
        try:
            process.stdin.write(user_input + "\n")
            process.stdin.flush()
        except (OSError, ValueError) as e:
            # OSError covers a broken pipe, ValueError a closed stream.
            self.cog.output_history.append(f"Error sending input to process: {e}")
        else:
            # Add the input to history for display
            self.cog.output_history.append(user_input)

        self.cog.scroll_offset = max(
            0, len(self.cog.output_history) - MAX_OUTPUT_LINES_PER_IMAGE
        )
        await self.cog._update_terminal_message(interaction)

    async def on_error(self, interaction: Interaction, error: Exception):
        """Log a modal failure and tell the user about it."""
        # Log first so the error is not lost if the reply itself fails.
        print(f"TerminalInputModal error: {error}")
        message = f"Modal error: {error}"
        # on_submit defers the interaction, so the initial response is usually
        # already used by the time an error surfaces.
        if interaction.response.is_done():
            await interaction.followup.send(message, ephemeral=True)
        else:
            await interaction.response.send_message(message, ephemeral=True)


class TerminalView(ui.View):
    """Button row for the terminal message: scroll, send input, refresh, close."""

    def __init__(self, cog: TerminalCog, owner_id: int):
        super().__init__(timeout=None)  # Persistent view
        self.cog = cog
        self.owner_id = owner_id
        # To store the message this view is attached to
        self.message: discord.Message | None = None

        # Add buttons after initialization
        self._add_buttons()

    def _add_buttons(self):
        """(Re)create all buttons, attach their callbacks and sync their state."""
        self.clear_items()  # Clear existing items if any (e.g., on re-creation)

        def attach(label, emoji, style, row, handler):
            # Build one button, wire its callback and register it on the view.
            button = ui.Button(label=label, emoji=emoji, style=style, row=row)
            button.callback = handler
            self.add_item(button)
            return button

        self.scroll_up_button = attach(
            "Scroll Up", "⬆️", discord.ButtonStyle.secondary, 0, self.scroll_up_callback
        )
        self.scroll_down_button = attach(
            "Scroll Down", "⬇️", discord.ButtonStyle.secondary, 0, self.scroll_down_callback
        )
        self.send_input_button = attach(
            "Send Input", "⌨️", discord.ButtonStyle.primary, 1, self.send_input_callback
        )
        self.refresh_button = attach(
            "Refresh", "🔄", discord.ButtonStyle.success, 1, self.refresh_callback
        )
        self.close_button = attach(
            "Close Terminal", "❌", discord.ButtonStyle.danger, 1, self.close_callback
        )

        self.update_button_states(self.cog)

    async def interaction_check(self, interaction: Interaction) -> bool:
        """Ensure only the bot owner can interact."""
        # Compared against the module-level OWNER_ID, not self.owner_id.
        if interaction.user.id == OWNER_ID:
            return True
        await interaction.response.send_message(
            "You are not authorized to use these buttons.", ephemeral=True
        )
        return False

    def update_button_states(self, cog_state: TerminalCog):
        """Enable/disable buttons based on terminal state."""
        history_length = len(cog_state.output_history)
        max_scroll = history_length - MAX_OUTPUT_LINES_PER_IMAGE
        session_closed = not cog_state.terminal_active
        process = cog_state.active_process
        command_running = process is not None and process.poll() is None

        # Nothing above the viewport to scroll to
        self.scroll_up_button.disabled = cog_state.scroll_offset <= 0

        # Already at the bottom, or everything fits in one image
        self.scroll_down_button.disabled = (
            cog_state.scroll_offset >= max_scroll
            or history_length <= MAX_OUTPUT_LINES_PER_IMAGE
        )

        # NOTE(review): input is blocked while a command runs, although
        # TerminalInputModal can forward text to a running process's stdin —
        # confirm which behaviour is intended.
        self.send_input_button.disabled = session_closed or command_running
        self.refresh_button.disabled = session_closed
        self.close_button.disabled = session_closed

    async def scroll_up_callback(self, interaction: Interaction):
        """Scroll half a page towards older output."""
        half_page = MAX_OUTPUT_LINES_PER_IMAGE // 2
        self.cog.scroll_offset = max(0, self.cog.scroll_offset - half_page)
        await self.cog._update_terminal_message(interaction)

    async def scroll_down_callback(self, interaction: Interaction):
        """Scroll half a page towards newer output, clamped to the valid range."""
        half_page = MAX_OUTPUT_LINES_PER_IMAGE // 2
        max_scroll = len(self.cog.output_history) - MAX_OUTPUT_LINES_PER_IMAGE
        target = min(max_scroll, self.cog.scroll_offset + half_page)
        self.cog.scroll_offset = max(0, target)  # Ensure not negative
        await self.cog._update_terminal_message(interaction)

    async def send_input_callback(self, interaction: Interaction):
        """Open the input modal; its on_submit handles execution and the update."""
        await interaction.response.send_modal(TerminalInputModal(self.cog))

    async def refresh_callback(self, interaction: Interaction):
        """Manually refresh the terminal output."""
        # Defer because refresh_terminal_output might take a moment and edit
        await interaction.response.defer()
        # Pass interaction to update message
        await self.cog.refresh_terminal_output(interaction)

    async def close_callback(self, interaction: Interaction):
        """End the session; stop_terminal_session edits the message and drops the view."""
        await interaction.response.defer()  # Defer before stopping session
        await self.cog.stop_terminal_session(interaction)


async def setup(bot: commands.Bot):
    """Extension entry point: create the terminal cog and register it on the bot."""
    # Modern discord.py runs cog_load automatically as part of add_cog. If
    # owner_id is never populated, check that cog_load is actually being
    # called by your loading mechanism and invoke it manually if it is not.
    await bot.add_cog(TerminalCog(bot))