Set a 10-minute timeout for the `CaptchaModal` to prevent it from hanging indefinitely. Implemented a `wait()` method within the modal, allowing the calling function to asynchronously wait for submission or timeout. This enables proper flow control for the upload process. Restricted the "Solve Captcha" button in `CaptchaView` to only be clickable by the user who initiated the command, preventing unauthorized interaction.
264 lines
11 KiB
Python
264 lines
11 KiB
Python
import discord
|
|
from discord.ext import commands
|
|
from discord import app_commands
|
|
import aiohttp
|
|
import io
|
|
import os
|
|
import json
|
|
import asyncio
|
|
import base64
|
|
from typing import Optional, Dict, Any, Union
|
|
import discord.ui
|
|
|
|
class CaptchaModal(discord.ui.Modal, title="Solve Image Captcha"):
    """Modal with a single text field for the answer to an image captcha.

    The submitted text is available as ``self.solution.value``. A caller can
    await :meth:`wait` to find out whether the modal was submitted or whether
    the timeout elapsed first.
    """

    def __init__(self, captcha_id: str):
        """Build the modal for the captcha identified by ``captcha_id``."""
        # Ten minutes -- the same value CaptchaView passes as its own timeout.
        super().__init__(timeout=600)

        self.captcha_id = captcha_id
        # Interaction of the submission; remains None until on_submit runs.
        self.interaction: Optional[discord.Interaction] = None
        # Set by on_submit so that wait() can return.
        self.is_submitted = asyncio.Event()

        answer_field = discord.ui.TextInput(
            label="Captcha Solution",
            placeholder="Enter the text from the image...",
            required=True,
            min_length=1,
            max_length=100,
        )
        self.solution = answer_field
        self.add_item(answer_field)

    async def on_submit(self, interaction: discord.Interaction):
        """Record the submitting interaction, defer it, and signal wait()."""
        # Keep the interaction so the caller can use it afterwards.
        self.interaction = interaction
        # Respond to the submission with a deferral.
        await interaction.response.defer(ephemeral=True)
        # Wake up whoever is blocked in wait().
        self.is_submitted.set()

    async def wait(self) -> bool:
        """Wait for the modal to be submitted or time out.

        The wait is bounded by ``self.timeout`` and the clock starts when this
        coroutine is awaited.

        Returns:
            bool: True if the modal timed out, False if it was submitted.
        """
        submission = self.is_submitted.wait()
        try:
            await asyncio.wait_for(submission, timeout=self.timeout)
        except asyncio.TimeoutError:
            return True  # Nobody submitted in time
        return False  # on_submit fired
|
|
|
|
|
|
class CaptchaView(discord.ui.View):
    """View with one "Solve Captcha" button that opens a CaptchaModal.

    Only the user whose id was given as ``original_interactor_id`` gets the
    modal; anyone else receives an ephemeral refusal message.
    """

    def __init__(self, modal: CaptchaModal, original_interactor_id: int):
        """Store the modal to open and the id of the user allowed to open it."""
        super().__init__(timeout=600)  # ten minutes
        self.original_interactor_id = original_interactor_id
        self.modal = modal

    @discord.ui.button(label="Solve Captcha", style=discord.ButtonStyle.primary)
    async def solve_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Open the captcha modal for the command's original user only."""
        clicked_by_owner = interaction.user.id == self.original_interactor_id
        if clicked_by_owner:
            await interaction.response.send_modal(self.modal)
            return

        # Somebody else pressed the button: refuse privately.
        await interaction.response.send_message(
            "Only the user who initiated this command can solve the captcha.",
            ephemeral=True,
        )
|
|
|
|
|
|
class UploadCog(commands.Cog, name="Upload"):
    """Cog for interacting with the upload API"""

    # HTTP status codes treated as success, keyed by upper-case HTTP method.
    # Methods that are not listed here are rejected by _make_api_request.
    _SUCCESS_STATUSES = {"GET": (200,), "POST": (200, 201)}

    def __init__(self, bot: commands.Bot):
        """Store configuration, build the ``/upload`` group and add it to the tree."""
        self.bot = bot
        self.api_base_url = "https://upload.slipstreamm.dev/upload"
        self.session = None  # aiohttp session; created in cog_load or lazily on first request
        self.captcha_cache = {}  # Store captcha IDs temporarily
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36"
        }
        print("UploadCog initialized!")

        # Create command group
        self.upload_group = app_commands.Group(
            name="upload",
            description="Commands for interacting with the upload API",
            guild_only=False,
        )

        # Register commands
        self.register_commands()

        # Add command group to the bot's tree
        self.bot.tree.add_command(self.upload_group)

    async def cog_load(self):
        """Called when the cog is loaded."""
        self.session = aiohttp.ClientSession(headers=self.headers)
        print("UploadCog session created")

    async def cog_unload(self):
        """Called when the cog is unloaded."""
        # The group was added to the tree by hand in __init__, so it is removed
        # by hand here; otherwise adding it again when the cog is reloaded
        # fails because the name is still registered.
        self.bot.tree.remove_command(self.upload_group.name)

        if self.session:
            await self.session.close()
            print("UploadCog session closed")

    def register_commands(self):
        """Register all commands for this cog"""
        # --- Upload File Command (Interactive) ---
        upload_file_command = app_commands.Command(
            name="file",
            description="Upload a file, interactively solving an image captcha",
            callback=self.upload_file_interactive_callback,
            parent=self.upload_group,
        )
        app_commands.describe(
            file="The file to upload",
            expires_after="Time in seconds until the file expires (default: 86400 - 24 hours)",
        )(upload_file_command)
        self.upload_group.add_command(upload_file_command)

    async def _make_api_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make a request to the API

        Args:
            method: "GET" or "POST" (case-insensitive).
            endpoint: Path appended to ``self.api_base_url``; a leading slash
                is added when missing.
            **kwargs: Passed through to the aiohttp request call.

        Returns:
            The decoded JSON body of a successful response.

        Raises:
            ValueError: If ``method`` is neither GET nor POST.
            Exception: If the response status is not a success status for the
                method (200 for GET, 200 or 201 for POST).
        """
        print(f"Making {method} request to {endpoint} with params: {kwargs}")

        # Recreate the session when it was never created or was closed.
        if not self.session or self.session.closed:
            self.session = aiohttp.ClientSession(headers=self.headers)

        # Ensure the endpoint starts with a slash
        if not endpoint.startswith("/"):
            endpoint = f"/{endpoint}"

        url = f"{self.api_base_url}{endpoint}"

        try:
            verb = method.upper()
            accepted_statuses = self._SUCCESS_STATUSES.get(verb)
            if accepted_statuses is None:
                raise ValueError(f"Unsupported HTTP method: {method}")

            send = self.session.get if verb == "GET" else self.session.post
            async with send(url, **kwargs) as response:
                if response.status in accepted_statuses:
                    return await response.json()
                error_text = await response.text()
                raise Exception(f"API request failed: {response.status} - {error_text}")
        except Exception as e:
            # Log and let the caller decide how to report the failure.
            print(f"Error making API request to {url}: {e}")
            raise

    @staticmethod
    def _decode_captcha_image(image_data: str) -> io.BytesIO:
        """Return the captcha image bytes decoded from base64 text or a base64 data URI."""
        # Strip any "data:<mime>;base64," header, not only the PNG one.
        header, separator, payload = image_data.partition(";base64,")
        if separator and header.startswith("data:"):
            image_data = payload
        return io.BytesIO(base64.b64decode(image_data))

    @staticmethod
    def _format_file_size(size_bytes: int) -> str:
        """Format a byte count as bytes, KB or MB (1024-based, two decimals)."""
        if size_bytes < 1024:
            return f"{size_bytes} bytes"
        if size_bytes < 1024 * 1024:
            return f"{size_bytes / 1024:.2f} KB"
        return f"{size_bytes / (1024 * 1024):.2f} MB"

    def _build_captcha_embed(self, captcha_id: str) -> discord.Embed:
        """Build the embed that shows the captcha image and instructions."""
        embed = discord.Embed(
            title="Image Captcha Challenge",
            description="Please solve the captcha challenge to upload your file.",
            color=discord.Color.blue(),
        )
        embed.add_field(name="Captcha ID", value=f"`{captcha_id}`", inline=False)
        embed.set_image(url="attachment://captcha.png")
        embed.set_footer(text="This captcha is valid for 10 minutes. Enter the text from the image.")
        return embed

    def _build_success_embed(self, upload_data: Dict[str, Any], expires_after: Optional[int]) -> discord.Embed:
        """Build the embed describing a finished upload from the API response."""
        file_id = upload_data.get("id", "unknown")
        file_url = f"{self.api_base_url}/uploads/{file_id}"

        embed = discord.Embed(
            title="File Uploaded Successfully",
            description=(
                f"Your file has been uploaded and will expire in {expires_after} seconds"
                f"\n\n[Click here to download]({file_url})"
            ),
            color=discord.Color.green(),
        )

        # A missing or null size is shown as 0 bytes instead of raising.
        file_size_str = self._format_file_size(upload_data.get("file_size") or 0)

        embed.add_field(name="File ID", value=file_id, inline=True)
        embed.add_field(name="Original Name", value=upload_data.get("original_name", "unknown"), inline=True)
        embed.add_field(name="File Size", value=file_size_str, inline=True)
        embed.add_field(name="Content Type", value=upload_data.get("content_type", "unknown"), inline=True)
        embed.add_field(name="Scan Status", value=upload_data.get("scan_status", "unknown"), inline=True)
        embed.add_field(name="File URL", value=file_url, inline=False)
        return embed

    async def upload_file_interactive_callback(self, interaction: discord.Interaction,
                                               file: discord.Attachment,
                                               expires_after: Optional[int] = 86400):
        """Upload a file, interactively solving an image captcha"""
        # Flow: fetch a captcha, show it with a button that opens the answer
        # modal, wait for the answer (or the timeout), then POST the file with
        # the captcha id and answer and report the result.
        #
        # Defer ephemerally: Discord ignores the ephemeral flag on the first
        # follow-up after a deferred response and keeps the flag given here, so
        # a public defer would expose the captcha prompt (or an early error
        # message) to the whole channel. The final success message is a later
        # follow-up sent without the flag, so it is still public.
        await interaction.response.defer(ephemeral=True)

        try:
            # 1. Generate Image Captcha
            captcha_data = await self._make_api_request("GET", "/api/captcha/image")
            captcha_id = captcha_data.get("captcha_id")
            image_data = captcha_data.get("image", "")

            if not captcha_id or not image_data:
                raise Exception("Failed to retrieve captcha ID or image data.")

            captcha_image_file = discord.File(
                fp=self._decode_captcha_image(image_data),
                filename="captcha.png",
            )

            # 2. Send Captcha and Prompt for Solution via Modal
            modal = CaptchaModal(captcha_id=captcha_id)
            # The button that opens the modal only works for the invoking user.
            view = CaptchaView(modal=modal, original_interactor_id=interaction.user.id)

            await interaction.followup.send(
                embed=self._build_captcha_embed(captcha_id),
                file=captcha_image_file,
                view=view,
                ephemeral=True,
            )

            # Wait for the modal submission
            timed_out = await modal.wait()
            if timed_out:
                await interaction.followup.send("Captcha solution timed out. Please try again.", ephemeral=True)
                return

            captcha_solution = modal.solution.value

            # 3. Proceed with File Upload
            if not file:
                await interaction.followup.send("Please provide a file to upload", ephemeral=True)
                return

            if not captcha_solution:
                await interaction.followup.send("Captcha solution was not provided.", ephemeral=True)
                return

            # Download the file
            file_bytes = await file.read()

            # Prepare form data
            form_data = aiohttp.FormData()
            form_data.add_field('file', file_bytes, filename=file.filename, content_type=file.content_type)
            form_data.add_field('captcha_id', captcha_id)
            form_data.add_field('captcha_solution', captcha_solution)
            form_data.add_field('expires_after', str(expires_after))

            # Make API request to upload file
            upload_data = await self._make_api_request("POST", "/api/upload/third-party", data=form_data)

            # Send the final upload success message to the original interaction channel
            await interaction.followup.send(embed=self._build_success_embed(upload_data, expires_after))

        except Exception as e:
            # If an error occurs during captcha generation or upload, send an ephemeral error message
            await interaction.followup.send(f"Error during file upload process: {e}", ephemeral=True)
|
|
|
|
async def setup(bot: commands.Bot):
    """Create an UploadCog for ``bot`` and add it to the bot."""
    upload_cog = UploadCog(bot)
    await bot.add_cog(upload_cog)
    print("UploadCog setup complete.")
|