Append the file extension to the generated upload URL if available. This ensures the URL is more accurate and directly points to the file with its correct type, both at initial generation and after file processing completes.
332 lines
15 KiB
Python
332 lines
15 KiB
Python
import discord
|
|
from discord.ext import commands
|
|
from discord import app_commands
|
|
import aiohttp
|
|
import io
|
|
import os
|
|
import json
|
|
import asyncio
|
|
import base64
|
|
from typing import Optional, Dict, Any, Union
|
|
import discord.ui
|
|
|
|
class CaptchaModal(discord.ui.Modal, title="Solve Image Captcha"):
    """Modal that asks the user to type the text shown in a captcha image.

    After submission the typed answer is available as ``self.solution.value``
    and the submitting interaction as ``self.interaction``.
    """

    def __init__(self, captcha_id: str):
        # 600 s (10 minutes) keeps the modal's lifetime in step with the
        # view that opens it.
        super().__init__(timeout=600)

        # Submission state: filled in / signalled by on_submit().
        self.interaction = None
        self.is_submitted = asyncio.Event()

        self.captcha_id = captcha_id

        answer_field = discord.ui.TextInput(
            label="Captcha Solution",
            placeholder="Enter the text from the image...",
            required=True,
            min_length=1,
            max_length=100,
        )
        self.solution = answer_field
        self.add_item(answer_field)

    async def on_submit(self, interaction: discord.Interaction):
        """Record the submitting interaction and wake up anyone in ``wait()``."""
        self.interaction = interaction
        # Acknowledge right away so the interaction does not time out while
        # the caller carries on with the upload.
        await interaction.response.defer(ephemeral=True)
        self.is_submitted.set()

    async def wait(self) -> bool:
        """Block until the modal is submitted or ``self.timeout`` elapses.

        Returns:
            bool: True if the wait timed out, False if the modal was submitted.
        """
        try:
            await asyncio.wait_for(self.is_submitted.wait(), timeout=self.timeout)
        except asyncio.TimeoutError:
            return True  # nobody submitted in time
        return False  # on_submit() fired
|
|
|
|
|
|
class CaptchaView(discord.ui.View):
    """View with a single button that opens the captcha modal.

    Only the user whose id was given as ``original_interactor_id`` may open
    the modal; anyone else gets an ephemeral refusal.
    """

    def __init__(self, modal: CaptchaModal, original_interactor_id: int):
        # The view stays active for 10 minutes.
        super().__init__(timeout=600)
        self.original_interactor_id = original_interactor_id
        self.modal = modal

    @discord.ui.button(label="Solve Captcha", style=discord.ButtonStyle.primary)
    async def solve_button(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Open the modal for the command's invoker; refuse everyone else."""
        clicked_by_invoker = interaction.user.id == self.original_interactor_id
        if clicked_by_invoker:
            await interaction.response.send_modal(self.modal)
            return

        await interaction.response.send_message("Only the user who initiated this command can solve the captcha.", ephemeral=True)
|
|
|
|
|
|
class UploadCog(commands.Cog, name="Upload"):
    """Cog for interacting with the upload API"""

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        # Host only: every endpoint path used below already starts with "/upload".
        self.api_base_url = "https://slipstreamm.dev"
        self.session = None
        self.captcha_cache = {}  # Store captcha IDs temporarily
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36"
        }
        print(f"UploadCog initialized with API base URL: {self.api_base_url}")

        # Create command group
        self.upload_group = app_commands.Group(
            name="upload",
            description="Commands for interacting with the upload API",
            guild_only=False
        )

        # Register commands
        self.register_commands()

        # Add command group to the bot's tree
        self.bot.tree.add_command(self.upload_group)

    async def cog_load(self):
        """Called when the cog is loaded."""
        self.session = aiohttp.ClientSession(headers=self.headers)
        print("UploadCog session created")

    async def cog_unload(self):
        """Called when the cog is unloaded."""
        # The group was added to the tree by hand in __init__, so it has to be
        # removed by hand too; otherwise reloading the extension fails because
        # a command named "upload" is already registered.
        self.bot.tree.remove_command(self.upload_group.name)

        if self.session:
            await self.session.close()
            print("UploadCog session closed")

    def register_commands(self):
        """Register all commands for this cog"""

        # --- Upload File Command (Interactive) ---
        upload_file_command = app_commands.Command(
            name="file",
            description="Upload a file, interactively solving an image captcha",
            callback=self.upload_file_interactive_callback,
            parent=self.upload_group
        )
        app_commands.describe(
            file="The file to upload",
            expires_after="Time in seconds until the file expires (default: 86400 - 24 hours)"
        )(upload_file_command)
        self.upload_group.add_command(upload_file_command)

    def _build_file_url(self, file_id: str, file_extension: Optional[str]) -> str:
        """Return the public URL for *file_id*, with the extension appended if there is one."""
        url = f"{self.api_base_url}/uploads/{file_id}"
        if file_extension:
            # NOTE(review): the API presumably returns the extension without a
            # leading dot; strip one anyway so the URL never ends up with "..".
            url += f".{file_extension.lstrip('.')}"
        return url

    @staticmethod
    def _build_upload_form(file_bytes: bytes, file: discord.Attachment, captcha_id: str,
                           captcha_solution: str, expires_after: Optional[int]) -> aiohttp.FormData:
        """Return a fresh multipart form for one upload attempt."""
        form_data = aiohttp.FormData()
        form_data.add_field('file', file_bytes, filename=file.filename, content_type=file.content_type)
        form_data.add_field('captcha_id', captcha_id)
        form_data.add_field('captcha_solution', captcha_solution)
        form_data.add_field('expires_after', str(expires_after))
        return form_data

    async def _make_api_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make a request to the API and return the decoded JSON body.

        Args:
            method: "GET" or "POST" (case-insensitive).
            endpoint: Path appended to ``self.api_base_url``; a leading "/" is
                added when missing.
            **kwargs: Passed straight through to the aiohttp request call.

        Returns:
            The JSON-decoded response body.

        Raises:
            ValueError: If *method* is neither GET nor POST.
            Exception: If the response status is not 200 (GET) or 200/201 (POST).
        """
        print(f"Making {method} request to {endpoint} with params: {kwargs}")
        # Create the session lazily in case the cog is used before cog_load ran.
        if not self.session:
            self.session = aiohttp.ClientSession(headers=self.headers)

        # Ensure the endpoint starts with a slash
        if not endpoint.startswith("/"):
            endpoint = f"/{endpoint}"

        url = f"{self.api_base_url}{endpoint}"

        # Debug the request details
        print(f"Full URL: {url}")
        print(f"Method: {method.upper()}")
        print(f"Headers: {self.headers}")

        # Only the cog's own headers are sent; Content-Type is left to aiohttp
        # so multipart bodies get their boundary set correctly.
        request_headers = self.headers.copy()

        try:
            if method.upper() == "GET":
                async with self.session.get(url, **kwargs) as response:
                    if response.status == 200:
                        return await response.json()
                    error_text = await response.text()
                    raise Exception(f"API request failed: {response.status} - {error_text}")
            elif method.upper() == "POST":
                print(f"Sending POST request to {url}")
                if 'data' in kwargs and isinstance(kwargs['data'], aiohttp.FormData):
                    print("Sending multipart/form-data request")

                async with self.session.post(url, headers=request_headers, **kwargs) as response:
                    print(f"Response status: {response.status}")
                    print(f"Response headers: {response.headers}")

                    if response.status in (200, 201):
                        return await response.json()
                    error_text = await response.text()
                    print(f"Error response body: {error_text}")
                    raise Exception(f"API request failed: {response.status} - {error_text}")
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")
        except Exception as e:
            # Log and re-raise so the caller decides how to report the failure.
            print(f"Error making API request to {url}: {e}")
            raise

    async def upload_file_interactive_callback(self, interaction: discord.Interaction,
                                               file: discord.Attachment,
                                               expires_after: Optional[int] = 86400):
        """Upload a file, interactively solving an image captcha"""
        # Defer ephemerally: the first follow-up after a defer inherits the
        # defer's visibility, so a public defer would show the captcha prompt
        # to the whole channel despite ephemeral=True on the send below.
        # Later follow-ups (the status/result message) choose their own visibility.
        await interaction.response.defer(ephemeral=True)

        try:
            # 1. Generate Image Captcha
            captcha_data = await self._make_api_request("GET", "/upload/api/captcha/image")
            captcha_id = captcha_data.get("captcha_id")
            image_data = captcha_data.get("image", "")

            if not captcha_id or not image_data:
                raise Exception("Failed to retrieve captcha ID or image data.")

            # The image may arrive as a data URI; keep only the base64 payload.
            data_uri_prefix = "data:image/png;base64,"
            if image_data.startswith(data_uri_prefix):
                image_data = image_data[len(data_uri_prefix):]

            image_bytes = io.BytesIO(base64.b64decode(image_data))
            image_bytes.seek(0)
            captcha_image_file = discord.File(fp=image_bytes, filename="captcha.png")

            # 2. Send Captcha and Prompt for Solution via Modal
            embed = discord.Embed(
                title="Image Captcha Challenge",
                description="Please solve the captcha challenge to upload your file.",
                color=discord.Color.blue()
            )
            embed.add_field(name="Captcha ID", value=f"`{captcha_id}`", inline=False)
            embed.set_image(url="attachment://captcha.png")
            embed.set_footer(text="This captcha is valid for 10 minutes. Enter the text from the image.")

            modal = CaptchaModal(captcha_id=captcha_id)
            # The button on this view opens the modal, for the invoking user only.
            view = CaptchaView(modal=modal, original_interactor_id=interaction.user.id)

            await interaction.followup.send(embed=embed, file=captcha_image_file, view=view, ephemeral=True)

            # Wait for the modal submission
            timed_out = await modal.wait()
            if timed_out:
                await interaction.followup.send("Captcha solution timed out. Please try again.", ephemeral=True)
                return

            captcha_solution = modal.solution.value

            # 3. Proceed with File Upload
            if not file:
                await interaction.followup.send("Please provide a file to upload", ephemeral=True)
                return

            if not captcha_solution:
                await interaction.followup.send("Captcha solution was not provided.", ephemeral=True)
                return

            # Download the file
            file_bytes = await file.read()

            # Debug form data fields
            print(f"Form data fields: file, captcha_id={captcha_id}, captcha_solution={captcha_solution}, expires_after={expires_after}")

            # Make API request to upload file
            try:
                print("Attempting to upload file to third-party endpoint...")
                upload_data = await self._make_api_request(
                    "POST",
                    "/upload/api/upload/third-party",
                    data=self._build_upload_form(file_bytes, file, captcha_id, captcha_solution, expires_after),
                )
                print(f"Upload successful, received data: {upload_data}")
            except Exception as e:
                print(f"Upload failed with error: {e}")
                # Try a direct approach as a fallback
                print("Trying direct aiohttp request as fallback...")
                url = f"{self.api_base_url}/upload/api/upload/third-party"
                # A multipart FormData can only be serialised once; reusing the
                # one from the first attempt would raise RuntimeError and hide
                # the real API error, so the retry gets a fresh form.
                retry_form = self._build_upload_form(file_bytes, file, captcha_id, captcha_solution, expires_after)
                async with self.session.post(url, data=retry_form) as response:
                    if response.status in (200, 201):
                        upload_data = await response.json()
                        print(f"Direct upload successful: {upload_data}")
                    else:
                        error_text = await response.text()
                        print(f"Direct upload failed: {response.status} - {error_text}")
                        raise Exception(f"API request failed: {response.status} - {error_text}")

            file_id = upload_data.get("id", "unknown")
            # Append file extension to the URL if available
            file_url = self._build_file_url(file_id, upload_data.get("file_extension", ""))

            # Send initial message that we're waiting for the file to be processed
            status_message = await interaction.followup.send("File uploaded successfully. Waiting for file processing to complete...")

            # Poll for access_ready status, once per second for up to 30 attempts.
            # If the file never becomes ready, the data from the upload response
            # is reported as-is.
            max_attempts = 30
            for attempt in range(max_attempts):
                try:
                    file_status = await self._make_api_request("GET", f"/upload/api/file/{file_id}/status")
                    print(f"File status poll attempt {attempt+1}: {file_status}")

                    if file_status.get("access_ready", False):
                        print(f"File is ready after {attempt+1} attempts")
                        # Prefer the post-processing metadata, including the
                        # final file extension, for the result embed.
                        upload_data = file_status
                        file_url = self._build_file_url(file_id, file_status.get("file_extension", ""))
                        break
                except Exception as e:
                    # Continue polling despite errors
                    print(f"Error polling file status: {e}")

                await asyncio.sleep(1)

            # Format file size nicely (a missing or null size is shown as 0 bytes)
            file_size_bytes = upload_data.get('size', 0) or 0
            if file_size_bytes < 1024:
                file_size_str = f"{file_size_bytes} bytes"
            elif file_size_bytes < 1024 * 1024:
                file_size_str = f"{file_size_bytes / 1024:.2f} KB"
            else:
                file_size_str = f"{file_size_bytes / (1024 * 1024):.2f} MB"

            # Create embed with upload information, including a clickable link
            embed = discord.Embed(
                title="File Uploaded Successfully",
                description=(
                    f"Your file has been uploaded and will expire in {expires_after} seconds"
                    f"\n\n[Click here to download]({file_url})"
                ),
                color=discord.Color.green()
            )
            embed.add_field(name="File ID", value=file_id, inline=True)
            embed.add_field(name="Original Name", value=upload_data.get("file_name", "unknown"), inline=True)
            embed.add_field(name="File Size", value=file_size_str, inline=True)
            embed.add_field(name="Content Type", value=upload_data.get("content_type", "unknown"), inline=True)
            embed.add_field(name="Scan Status", value=upload_data.get("scan_status", "unknown"), inline=True)
            embed.add_field(name="File URL", value=file_url, inline=False)

            # Edit the status message with the final embed
            await status_message.edit(content=None, embed=embed)

        except Exception as e:
            # If an error occurs during captcha generation or upload, send an ephemeral error message
            await interaction.followup.send(f"Error during file upload process: {e}", ephemeral=True)
|
|
|
|
async def setup(bot: commands.Bot):
    """Extension entry point: create an UploadCog for *bot* and register it."""
    upload_cog = UploadCog(bot)
    await bot.add_cog(upload_cog)
    print("UploadCog setup complete.")
|