discordbot/api_service/api_server.py
2025-06-05 21:31:06 -06:00

4148 lines
160 KiB
Python

import os
import json
import sys
import asyncio
from typing import Dict, List, Optional, Any
from fastapi import (
FastAPI,
HTTPException,
Depends,
Header,
Request,
Response,
status,
Body,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import (
HTMLResponse,
PlainTextResponse,
RedirectResponse,
FileResponse,
)
from fastapi.staticfiles import StaticFiles
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.middleware.sessions import SessionMiddleware
import aiohttp
import asyncpg
import discord
from api_service.database import Database # Existing DB
import logging
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import BaseModel, Field
from functools import lru_cache
from contextlib import asynccontextmanager
from enum import Enum
from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.exceptions import HTTPException as StarletteHTTPException
# --- Logging Configuration ---
# Configure logging
log = logging.getLogger("api_server")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler(), logging.FileHandler("api_server.log")],
)
# --- Configuration Loading ---
# Determine the path to the .env file relative to this api_server.py file
# Go up one level from api_service/ to the project root, then into discordbot/
dotenv_path = os.path.join(os.path.dirname(__file__), "..", "discordbot", ".env")
class ApiSettings(BaseSettings):
# Existing API settings (if any were loaded from env before)
GURT_STATS_PUSH_SECRET: Optional[str] = None
API_HOST: str = "0.0.0.0" # Keep existing default if used
API_PORT: int = 8001 # Changed default port to 8001
SSL_CERT_FILE: Optional[str] = None
SSL_KEY_FILE: Optional[str] = None
# Discord OAuth Credentials (from discordbot/.env)
DISCORD_CLIENT_ID: str
DISCORD_CLIENT_SECRET: str
DISCORD_REDIRECT_URI: str
DISCORD_BOT_TOKEN: Optional[str] = None # Add bot token for API calls (optional)
# Secret key for dashboard session management
DASHBOARD_SECRET_KEY: str = (
"a_default_secret_key_for_development_only" # Provide a default for dev
)
# Database/Redis settings (Required for settings_manager)
POSTGRES_USER: str
POSTGRES_PASSWORD: str
POSTGRES_HOST: str
POSTGRES_SETTINGS_DB: str # The specific DB for settings
REDIS_HOST: str
REDIS_PORT: int = 6379
REDIS_PASSWORD: Optional[str] = None # Optional
# Secret key for AI Moderation API endpoint
MOD_LOG_API_SECRET: Optional[str] = None
AI_API_KEY: Optional[str] = None
model_config = SettingsConfigDict(
env_file=dotenv_path, env_file_encoding="utf-8", extra="ignore"
)
@lru_cache()
def get_api_settings() -> ApiSettings:
if not os.path.exists(dotenv_path):
print(
f"Warning: .env file not found at {dotenv_path}. Using defaults or environment variables."
)
return ApiSettings()
settings = get_api_settings()
# --- Constants derived from settings ---
DISCORD_API_BASE_URL = "https://discord.com/api/v10"
DISCORD_API_ENDPOINT = DISCORD_API_BASE_URL # Alias for backward compatibility
# Define dashboard-specific redirect URI
DASHBOARD_REDIRECT_URI = (
f"{settings.DISCORD_REDIRECT_URI.split('/api')[0]}/dashboard/api/auth/callback"
)
# We'll generate the full auth URL with PKCE parameters in the dashboard_login function
# This is just a base URL without the PKCE parameters
DISCORD_AUTH_BASE_URL = (
f"https://discord.com/api/oauth2/authorize?client_id={settings.DISCORD_CLIENT_ID}"
f"&redirect_uri={settings.DISCORD_REDIRECT_URI}&response_type=code&scope=identify guilds"
)
# Dashboard-specific auth base URL
DASHBOARD_AUTH_BASE_URL = (
f"https://discord.com/api/oauth2/authorize?client_id={settings.DISCORD_CLIENT_ID}"
f"&redirect_uri={DASHBOARD_REDIRECT_URI}&response_type=code&scope=identify guilds"
)
DISCORD_TOKEN_URL = f"{DISCORD_API_BASE_URL}/oauth2/token"
DISCORD_USER_URL = f"{DISCORD_API_BASE_URL}/users/@me"
DISCORD_USER_GUILDS_URL = f"{DISCORD_API_BASE_URL}/users/@me/guilds"
DISCORD_REDIRECT_URI = settings.DISCORD_REDIRECT_URI # Make it accessible directly
# For backward compatibility, keep DISCORD_AUTH_URL but it will be replaced in the dashboard_login function
DISCORD_AUTH_URL = DISCORD_AUTH_BASE_URL
# --- Gurt Stats Storage (IPC) ---
latest_gurt_stats: Optional[Dict[str, Any]] = None
# GURT_STATS_PUSH_SECRET is now loaded via ApiSettings
if not settings.GURT_STATS_PUSH_SECRET:
print(
"Warning: GURT_STATS_PUSH_SECRET not set. Internal stats update endpoint will be insecure."
)
# --- Helper Functions ---
async def get_guild_name_from_api(guild_id: int, timeout: float = 5.0) -> str:
"""
Get a guild's name from Discord API using the bot token.
Args:
guild_id: The Discord guild ID to get the name for
timeout: Maximum time to wait for the API request (in seconds)
Returns:
The guild name if successful, otherwise a fallback string with the guild ID
"""
fallback = f"Server {guild_id}" # Default fallback
if not settings.DISCORD_BOT_TOKEN:
log.warning("DISCORD_BOT_TOKEN not set, using guild ID as fallback")
return fallback
try:
# Use global http_session if available, otherwise create a new one
session = http_session if http_session else aiohttp.ClientSession()
# Headers for the request
headers = {"Authorization": f"Bot {settings.DISCORD_BOT_TOKEN}"}
# Send the request with a timeout
async with session.get(
f"https://discord.com/api/v10/guilds/{guild_id}",
headers=headers,
timeout=timeout,
) as response:
if response.status == 200:
guild_data = await response.json()
guild_name = guild_data.get("name", fallback)
log.info(f"Retrieved guild name '{guild_name}' for guild ID {guild_id}")
return guild_name
else:
log.warning(
f"Failed to get guild name for guild ID {guild_id}: HTTP {response.status}"
)
return fallback
except asyncio.TimeoutError:
log.error(f"Timeout getting guild name for guild ID {guild_id}")
return fallback
except Exception as e:
log.error(f"Error getting guild name for guild ID {guild_id}: {e}")
return fallback
async def send_discord_message_via_api(
channel_id: int, content: str, timeout: float = 5.0
) -> Dict[str, Any]:
"""
Send a message to a Discord channel using Discord's REST API directly.
This avoids using Discord.py's channel.send() method which can cause issues with FastAPI.
Args:
channel_id: The Discord channel ID to send the message to
content: The message content to send
timeout: Maximum time to wait for the API request (in seconds)
Returns:
A dictionary with status information about the message send operation
"""
if not settings.DISCORD_BOT_TOKEN:
return {
"success": False,
"message": "Discord bot token not configured",
"error": "no_token",
}
# Discord API endpoint for sending messages
url = f"https://discord.com/api/v10/channels/{channel_id}/messages"
# Headers for the request
headers = {
"Authorization": f"Bot {settings.DISCORD_BOT_TOKEN}",
"Content-Type": "application/json",
}
# Message data - allow for complex payloads (like embeds)
data: Dict[str, Any]
if isinstance(content, str):
data = {"content": content}
elif isinstance(
content, dict
): # Assuming dict means it's a full payload like {"embeds": [...]}
data = content
else:
return {
"success": False,
"message": "Invalid content type for sending message. Must be string or dict.",
"error": "invalid_content_type",
}
log.debug(f"Sending message to channel {channel_id} with data: {data}")
# Create a separate task for the API request to handle timeout properly
async def make_request():
try:
# Use global http_session if available, otherwise create a new one
session = http_session if http_session else aiohttp.ClientSession()
# Send the request with a timeout
async with session.post(
url, headers=headers, json=data, timeout=timeout
) as response:
if response.status == 200 or response.status == 201:
# Message sent successfully
response_data = await response.json()
return {
"success": True,
"message": "Message sent successfully",
"message_id": response_data.get("id"),
}
elif response.status == 403:
# Missing permissions
return {
"success": False,
"message": "Missing permissions to send message to this channel",
"error": "forbidden",
"status": response.status,
}
elif response.status == 429:
# Rate limited
response_data = await response.json()
retry_after = response_data.get("retry_after", 1)
return {
"success": False,
"message": f"Rate limited by Discord API. Retry after {retry_after} seconds",
"error": "rate_limited",
"retry_after": retry_after,
"status": response.status,
}
else:
# Other error
try:
response_data = await response.json()
return {
"success": False,
"message": f"Discord API error: {response.status}",
"error": "api_error",
"status": response.status,
"details": response_data,
}
except:
return {
"success": False,
"message": f"Discord API error: {response.status}",
"error": "api_error",
"status": response.status,
}
except asyncio.TimeoutError:
return {
"success": False,
"message": "Timeout sending message to Discord API",
"error": "timeout",
}
except Exception as e:
return {
"success": False,
"message": f"Error sending message: {str(e)}",
"error": "unknown",
"details": str(e),
}
try:
# Execute the request in a proper task context
return await make_request()
except Exception as e:
log.error(f"Error in send_discord_message_via_api: {e}")
return {
"success": False,
"message": f"Error sending message: {str(e)}",
"error": "task_error",
"details": str(e),
}
# ---------------------------------
# Import dependencies after defining settings and constants
# Use absolute imports to avoid issues when running the server directly
from api_service import dependencies # type: ignore
from api_service.api_models import (
Conversation,
NumberData,
UserSettings,
GetConversationsResponse,
UpdateSettingsRequest,
UpdateConversationRequest,
ApiResponse,
)
import api_service.code_verifier_store as code_verifier_store
# Ensure discordbot is in path to import settings_manager
discordbot_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if discordbot_path not in sys.path:
sys.path.insert(0, discordbot_path)
try:
import settings_manager # type: ignore # type: ignore
from global_bot_accessor import get_bot_instance
log.info("Successfully imported settings_manager module and get_bot_instance")
except ImportError as e:
log.error(f"Could not import settings_manager or get_bot_instance: {e}")
log.error(
"Ensure the API is run from the project root or discordbot is in PYTHONPATH."
)
settings_manager = None # Set to None to indicate failure
# ============= API Setup =============
# Define lifespan context manager for FastAPI
@asynccontextmanager
async def lifespan(_: FastAPI): # Underscore indicates unused but required parameter
"""Lifespan event handler for FastAPI app."""
global http_session
# Startup: Initialize resources
log.info("Starting API server...")
# Initialize existing database
db.load_data()
log.info("Existing database loaded.")
# Start aiohttp session
http_session = aiohttp.ClientSession()
log.info("aiohttp session started.")
dependencies.set_http_session(http_session) # Pass session to dependencies module
log.info("aiohttp session passed to dependencies module.")
# Initialize settings_manager pools for the API server
# This is necessary because the API server runs in a different thread/event loop
# than the main bot, so it needs its own connection pools
if settings_manager:
log.info("Initializing database and cache connection pools for API server...")
# Add retry logic for database initialization
max_retries = 3
retry_count = 0
success = False
# --- New pool initialization logic ---
import asyncpg
import redis.asyncio as redis
# These will be stored in app.state
# pg_pool = None # No longer module/lifespan local like this for app.state version
# redis_pool = None
try:
app.state.pg_pool = await asyncpg.create_pool(
user=settings.POSTGRES_USER,
password=settings.POSTGRES_PASSWORD,
host=settings.POSTGRES_HOST,
database=settings.POSTGRES_SETTINGS_DB,
min_size=1,
max_size=10,
)
log.info("PostgreSQL pool created and stored in app.state.pg_pool.")
redis_url = f"redis://{':' + settings.REDIS_PASSWORD + '@' if settings.REDIS_PASSWORD else ''}{settings.REDIS_HOST}:{settings.REDIS_PORT}/0"
app.state.redis_pool = await redis.from_url(
redis_url,
decode_responses=True,
)
log.info("Redis pool created and stored in app.state.redis_pool.")
# DO NOT call settings_manager.set_bot_pools from API server.
# The bot (main.py) is responsible for setting the global pools in settings_manager.
# API server will use its own pools from app.state and pass them explicitly if needed.
if not settings_manager:
log.error(
"settings_manager not imported. API endpoints requiring it may fail."
)
except Exception as e:
log.exception(f"Failed to initialize API server's connection pools: {e}")
# Ensure app.state pools are None if creation failed
app.state.pg_pool = None
app.state.redis_pool = None
yield # Lifespan part 1 ends here
# Shutdown: Clean up resources
log.info("Shutting down API server...")
# Save existing database data
db.save_data()
log.info("Existing database saved.")
# Close API server's database/cache pools
if app.state.pg_pool:
await app.state.pg_pool.close()
log.info("API Server's PostgreSQL pool closed.")
app.state.pg_pool = None
if app.state.redis_pool:
await app.state.redis_pool.close() # Assuming redis pool has a close method
log.info("API Server's Redis pool closed.")
app.state.redis_pool = None
# Close aiohttp session
if http_session:
await http_session.close()
log.info("aiohttp session closed.")
# Create the FastAPI app with lifespan
app = FastAPI(title="Unified API Service", lifespan=lifespan, debug=True)
@app.exception_handler(StarletteHTTPException)
async def teapot_override(request: Request, exc: StarletteHTTPException):
try:
# Get path from scope, strip trailing slash, and lowercase
request_path_from_scope = request.scope.get("path", "")
# Ensure it's a string before calling rstrip
if not isinstance(request_path_from_scope, str):
request_path_from_scope = str(request_path_from_scope)
path_processed = request_path_from_scope.rstrip("/").lower()
except Exception as e:
log.error(
f"Error accessing/processing request.scope['path'] in teapot_override: {e}, falling back to request.url.path"
)
# Fallback, also strip trailing slash and lowercase
url_path_str = str(request.url.path)
path_processed = url_path_str.rstrip("/").lower()
# Define the specific openrouterkey paths (normalized: no trailing slash, lowercase)
exact_openrouterkey_paths_normalized = [
"/openrouterkey",
"/api/openrouterkey",
"/discordapi/openrouterkey",
]
is_openrouterkey_related_path_match = (
path_processed in exact_openrouterkey_paths_normalized
)
# Enhanced logging to understand the decision process
log.info(
f"TeapotOverride Debug: "
f"Original Request Path='{request.scope.get('path', 'N/A')}', "
f"Processed Path (path_processed)='{path_processed}', "
f"Exception Status Code={exc.status_code}, "
f"Is OpenRouterKey Related Path Match={is_openrouterkey_related_path_match}"
)
if is_openrouterkey_related_path_match:
# For these specific openrouterkey paths, log the actual exception and re-raise it.
log.warning(
f"Exception for specific openrouterkey path '{request.scope.get('path', 'N/A')}' "
f"(processed as '{path_processed}'): {exc.status_code} - {exc.detail}. "
f"Re-raising original exception."
)
raise exc
# For all other paths, if a 404 occurs, convert it to a 418 teapot response.
if exc.status_code == 404:
log.info(f"Converting 404 to 418 teapot for path: {request.url.path}")
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>418 I'm a teapot</title>
<style>
body { font-family: Arial, sans-serif; background-color: #fef6e4; text-align: center; padding: 50px; }
h1 { font-size: 48px; color: #d62828; }
p { font-size: 24px; color: #2b2d42; }
.teapot { font-size: 100px; }
</style>
</head>
<body>
<div class="teapot">🫖</div>
<h1>418 I'm a teapot</h1>
<p>You asked for something I can't brew. Try a different path.</p>
</body>
</html>
"""
return HTMLResponse(content=html_content, status_code=418)
raise exc
@app.get("/robots.txt", response_class=PlainTextResponse)
async def robots_txt():
return """User-agent: *
Disallow: /
"""
# Add Session Middleware for Dashboard Auth
# Uses DASHBOARD_SECRET_KEY from settings
app.add_middleware(
SessionMiddleware,
secret_key=settings.DASHBOARD_SECRET_KEY,
session_cookie="dashboard_session", # Use a distinct cookie name
max_age=60 * 60 * 24 * 7, # 7 days expiry
)
# Create a sub-application for the API with /api prefix
api_app = FastAPI(
title="Unified API Service", docs_url="/docs", openapi_url="/openapi.json"
)
# Create a sub-application for backward compatibility with /discordapi prefix
# This will be deprecated in the future
discordapi_app = FastAPI(
title="Discord Bot Sync API (DEPRECATED)",
docs_url="/docs",
openapi_url="/openapi.json",
description="This API is deprecated and will be removed in the future. Please use the /api endpoint instead.",
)
# Create a sub-application for the new Dashboard API
dashboard_api_app = FastAPI(
title="Bot Dashboard API",
docs_url="/docs", # Can have its own docs
openapi_url="/openapi.json",
)
# Import dashboard API endpoints
try:
# Use absolute import
from api_service.dashboard_api_endpoints import router as dashboard_router # type: ignore
# Add the dashboard router to the dashboard API app
dashboard_api_app.include_router(dashboard_router)
log.info("Dashboard API endpoints loaded successfully")
# Add direct routes for test-welcome and test-goodbye endpoints
# These routes need to be defined after the dependencies are defined
# We'll add them later
except ImportError as e:
log.error(f"Could not import dashboard API endpoints: {e}")
log.error("Dashboard API endpoints will not be available")
# Import command customization models and endpoints
try:
# Use absolute import
from api_service.command_customization_endpoints import router as customization_router # type: ignore
# Add the command customization router to the dashboard API app
dashboard_api_app.include_router(
customization_router, prefix="/commands", tags=["Command Customization"]
)
log.info("Command customization endpoints loaded successfully")
except ImportError as e:
log.error(f"Could not import command customization endpoints: {e}")
log.error("Command customization endpoints will not be available")
# Import cog management endpoints
try:
# Use absolute import
from api_service.cog_management_endpoints import router as cog_management_router # type: ignore
log.info("Successfully imported cog_management_endpoints")
# Add the cog management router to the dashboard API app
dashboard_api_app.include_router(cog_management_router, tags=["Cog Management"])
log.info("Cog management endpoints loaded successfully")
except ImportError as e:
log.error(f"Could not import cog management endpoints: {e}")
# Try to import the module directly to see what's available (for debugging)
try:
import sys
log.info(f"Python path: {sys.path}")
# Try to find the module in the current directory
import os
current_dir = os.path.dirname(os.path.abspath(__file__))
log.info(f"Current directory: {current_dir}")
files = os.listdir(current_dir)
log.info(f"Files in current directory: {files}")
# Try to import the module with a full path
sys.path.append(current_dir)
import cog_management_endpoints # type: ignore
log.info(f"Successfully imported cog_management_endpoints module directly")
except Exception as e_debug:
log.error(f"Debug import failed: {e_debug}")
log.error("Cog management endpoints will not be available")
# Mount the API apps at their respective paths
app.mount("/api", api_app)
app.mount("/discordapi", discordapi_app)
app.mount("/dashboard/api", dashboard_api_app) # Mount the new dashboard API
# Import and mount webhook endpoints
try:
from api_service.webhook_endpoints import (
router as webhook_router,
) # Relative import
app.mount(
"/webhook", webhook_router
) # Mount directly on the main app for simplicity
# Import and mount terminal images endpoint
try:
from api_service.terminal_images_endpoint import mount_terminal_images
# Mount terminal images directory as static files
mount_terminal_images(app)
log.info("Terminal images endpoint mounted successfully")
except ImportError as e:
log.error(f"Could not import terminal images endpoint: {e}")
log.error("Terminal images endpoint will not be available")
# After mounting the webhook router
log.info("Available routes in webhook_router:")
from fastapi.routing import APIRoute, Mount
for route in webhook_router.routes:
if isinstance(route, APIRoute):
log.info(f" {route.path} - {route.name} - {route.methods}")
elif isinstance(route, Mount):
log.info(f" {route.path} - {route.name} - Mounted app or static files")
else:
log.info(f" {route.path} - {route.name} - Unknown route type")
# Or, if you prefer to nest it under /api:
# api_app.include_router(webhook_router, prefix="/webhooks", tags=["Webhooks"])
log.info("Webhook endpoints loaded and mounted successfully at /webhook")
except ImportError as e:
log.error(f"Could not import or mount webhook endpoints: {e}")
# Attempt to find the module for debugging
try:
import sys
log.info(f"Python path: {sys.path}")
import os
current_dir = os.path.dirname(os.path.abspath(__file__))
log.info(f"Current directory for webhook_endpoints: {current_dir}")
files_in_current_dir = os.listdir(current_dir)
log.info(f"Files in {current_dir}: {files_in_current_dir}")
if "webhook_endpoints.py" in files_in_current_dir:
log.info("webhook_endpoints.py found in current directory.")
else:
log.warning("webhook_endpoints.py NOT found in current directory.")
except Exception as e_debug:
log.error(f"Debug check for webhook_endpoints failed: {e_debug}")
# Log the available routes for debugging
log.info("Available routes in dashboard_api_app:")
from fastapi.routing import APIRoute, Mount
for route in dashboard_api_app.routes:
if isinstance(route, APIRoute):
log.info(f" {route.path} - {route.name} - {route.methods}")
elif isinstance(route, Mount):
log.info(f" {route.path} - {route.name} - Mounted app or static files")
else:
log.info(f" {route.path} - {route.name} - Unknown route type")
# Create a middleware for redirecting /discordapi to /api with a deprecation warning
class DeprecationRedirectMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# Check if the path starts with /discordapi
if request.url.path.startswith("/discordapi"):
# Add a deprecation warning header
response = await call_next(request)
response.headers["X-API-Deprecation-Warning"] = (
"This endpoint is deprecated. Please use /api instead."
)
return response
return await call_next(request)
# Add CORS middleware to all apps
for current_app in [app, api_app, discordapi_app]:
current_app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Adjust this in production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Add the deprecation middleware to the main app
app.add_middleware(DeprecationRedirectMiddleware)
# Initialize database (existing)
db = Database()
# --- aiohttp Session for Discord API calls ---
http_session = None
# ============= Authentication =============
async def verify_discord_token(authorization: str = Header(None)) -> str:
"""Verify the Discord token and return the user ID"""
if not authorization:
raise HTTPException(status_code=401, detail="Authorization header missing")
if not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Invalid authorization format")
token = authorization.replace("Bearer ", "")
# Verify the token with Discord
async with aiohttp.ClientSession() as session:
headers = {"Authorization": f"Bearer {token}"}
async with session.get(
"https://discord.com/api/v10/users/@me", headers=headers
) as resp:
if resp.status != 200:
raise HTTPException(status_code=401, detail="Invalid Discord token")
user_data = await resp.json()
return user_data["id"]
# ============= API Endpoints =============
# Log the available routes for debugging
log.info("Available routes in main app:")
from fastapi.routing import APIRoute, Mount
for route in app.routes:
if isinstance(route, APIRoute):
log.info(f" {route.path} - {route.name} - {route.methods}")
elif isinstance(route, Mount):
log.info(f" {route.path} - {route.name} - Mounted app or static files")
else:
log.info(f" {route.path} - {route.name} - Unknown route type")
@app.get("/")
async def root():
return RedirectResponse(
url="https://www.youtube.com/watch?v=dQw4w9WgXcQ", status_code=301
)
# Add the same endpoint to the api_app to ensure it's accessible
@api_app.get("/openrouterkey", response_class=PlainTextResponse)
async def api_openrouterkey(request: Request):
"""private endpoint return openrouter api key (api_app version)"""
# Basic security check
auth_header = request.headers.get("Authorization")
# Use loaded setting
if (
not settings.MOD_LOG_API_SECRET
or not auth_header
or auth_header != f"Bearer {settings.MOD_LOG_API_SECRET}"
):
print("Unauthorized attempt to access OpenRouter key (api_app).")
raise HTTPException(status_code=403, detail="Forbidden")
# Add debug logging
log.info(
f"OpenRouter key request authorized (api_app). AI_API_KEY is {'set' if settings.AI_API_KEY else 'not set'}"
)
# Check if AI_API_KEY is set
if not settings.AI_API_KEY:
log.error("AI_API_KEY is not set in environment variables")
raise HTTPException(status_code=500, detail="AI_API_KEY not configured")
return f"{settings.AI_API_KEY}"
# Add the same endpoint to the discordapi_app to ensure it's accessible
@discordapi_app.get("/openrouterkey", response_class=PlainTextResponse)
async def discordapi_openrouterkey(request: Request):
"""private endpoint return openrouter api key (discordapi_app version)"""
# Basic security check
auth_header = request.headers.get("Authorization")
# Use loaded setting
if (
not settings.MOD_LOG_API_SECRET
or not auth_header
or auth_header != f"Bearer {settings.MOD_LOG_API_SECRET}"
):
print("Unauthorized attempt to access OpenRouter key (discordapi_app).")
raise HTTPException(status_code=403, detail="Forbidden")
# Add debug logging
log.info(
f"OpenRouter key request authorized (discordapi_app). AI_API_KEY is {'set' if settings.AI_API_KEY else 'not set'}"
)
# Check if AI_API_KEY is set
if not settings.AI_API_KEY:
log.error("AI_API_KEY is not set in environment variables")
raise HTTPException(status_code=500, detail="AI_API_KEY not configured")
return f"{settings.AI_API_KEY}"
@app.get("/discord")
async def root():
return RedirectResponse(url="https://discord.gg/gebDRq6u", status_code=301)
@app.get("/discordbot")
async def root():
return RedirectResponse(
url="https://discord.com/oauth2/authorize?client_id=1360717457852993576",
status_code=301,
)
@app.get("/ip")
async def ip(request: Request):
return Response(content=request.client.host, media_type="text/plain")
@app.get("/agent")
async def agent(request: Request):
return Response(
content=request.headers.get("user-agent", request.client.host),
media_type="text/plain",
)
@app.get("/debug-settings", response_class=PlainTextResponse)
async def debug_settings(request: Request):
"""Debug endpoint to check if settings are loaded correctly"""
# Basic security check - only allow from localhost or with the same auth as openrouterkey
client_host = request.client.host
auth_header = request.headers.get("Authorization")
is_local = (
client_host == "127.0.0.1"
or client_host == "::1"
or client_host.startswith("172.")
)
is_authorized = (
auth_header
and settings.MOD_LOG_API_SECRET
and auth_header == f"Bearer {settings.MOD_LOG_API_SECRET}"
)
if not (is_local or is_authorized):
print(f"Unauthorized attempt to access debug settings from {client_host}.")
raise HTTPException(status_code=403, detail="Forbidden")
# Return a summary of the settings
settings_summary = [
f"API_HOST: {settings.API_HOST}",
f"API_PORT: {settings.API_PORT}",
f"MOD_LOG_API_SECRET: {'set' if settings.MOD_LOG_API_SECRET else 'not set'}",
f"AI_API_KEY: {'set' if settings.AI_API_KEY else 'not set'}",
f"DISCORD_CLIENT_ID: {'set' if settings.DISCORD_CLIENT_ID else 'not set'}",
f"DISCORD_CLIENT_SECRET: {'set' if settings.DISCORD_CLIENT_SECRET else 'not set'}",
f"DISCORD_REDIRECT_URI: {settings.DISCORD_REDIRECT_URI}",
f"POSTGRES_SETTINGS_DB: {settings.POSTGRES_SETTINGS_DB}",
f"REDIS_HOST: {settings.REDIS_HOST}",
]
return "\n".join(settings_summary)
# Add root for dashboard API for clarity
@dashboard_api_app.get("/")
async def dashboard_api_root():
return {"message": "Bot Dashboard API is running"}
# Add a test endpoint for cogs
@dashboard_api_app.get("/test-cogs", tags=["Test"])
async def test_cogs_endpoint():
"""Test endpoint to verify the API server is working correctly."""
return {"message": "Test cogs endpoint is working"}
# Add a direct endpoint for cogs without dependencies
@dashboard_api_app.get("/guilds/{guild_id}/cogs-direct", tags=["Test"])
async def get_guild_cogs_no_deps(guild_id: int):
"""Get all cogs for a guild without any dependencies."""
try:
# First try to get cogs from the bot instance
bot = None
try:
import discord_bot_sync_api # type: ignore
bot = discord_bot_sync_api.bot_instance
except (ImportError, AttributeError) as e:
log.warning(f"Could not import bot instance: {e}")
# Check if settings_manager is available
bot = get_bot_instance()
if not settings_manager or not bot or not bot.pg_pool:
return {
"error": "Settings manager or database connection not available",
"cogs": [],
}
# Get cogs from the database directly if bot is not available
cogs_list = []
if bot:
# Get cogs from the bot instance
log.info(f"Getting cogs from bot instance for guild {guild_id}")
for cog_name, cog in bot.cogs.items():
# Get enabled status from settings_manager
is_enabled = True
try:
is_enabled = await settings_manager.is_cog_enabled(
guild_id, cog_name, default_enabled=True
)
except Exception as e:
log.error(f"Error getting cog enabled status: {e}")
cogs_list.append(
{
"name": cog_name,
"description": cog.__doc__ or "No description available",
"enabled": is_enabled,
}
)
else:
# Fallback: Get cogs from the database directly
log.info(f"Getting cogs from database for guild {guild_id}")
try:
# Get all cog enabled statuses from the database
cog_statuses = await settings_manager.get_all_enabled_cogs(guild_id)
# Add each cog to the list
for cog_name, is_enabled in cog_statuses.items():
cogs_list.append(
{
"name": cog_name,
"description": "Description not available (bot instance not accessible)",
"enabled": is_enabled,
}
)
# If no cogs were found, add some default cogs
if not cogs_list:
default_cogs = [
"SettingsCog",
"HelpCog",
"ModerationCog",
"WelcomeCog",
"GurtCog",
"EconomyCog",
"UtilityCog",
]
for cog_name in default_cogs:
# Try to get the enabled status from the database
try:
is_enabled = await settings_manager.is_cog_enabled(
guild_id, cog_name, default_enabled=True
)
except Exception:
is_enabled = True
cogs_list.append(
{
"name": cog_name,
"description": "Default cog (bot instance not accessible)",
"enabled": is_enabled,
}
)
except Exception as e:
log.error(f"Error getting cogs from database: {e}")
return {
"error": f"Error getting cogs from database: {str(e)}",
"cogs": [],
}
return {"cogs": cogs_list}
except Exception as e:
log.error(f"Error getting cogs for guild {guild_id}: {e}")
return {"error": str(e), "cogs": []}
@discordapi_app.get("/")
async def discordapi_root():
return {
"message": "DEPRECATED: This API endpoint (/discordapi) is deprecated and will be removed in the future.",
"recommendation": "Please update your client to use the /api endpoint instead.",
"new_endpoint": "/api",
}
# Discord OAuth configuration now loaded via ApiSettings above
# DISCORD_CLIENT_ID = os.getenv("DISCORD_CLIENT_ID", "1360717457852993576")
# DISCORD_REDIRECT_URI = os.getenv("DISCORD_REDIRECT_URI", "https://slipstreamm.dev/api/auth")
# DISCORD_API_ENDPOINT = "https://discord.com/api/v10"
# DISCORD_TOKEN_URL = f"{DISCORD_API_ENDPOINT}/oauth2/token"
# The existing /auth endpoint seems to handle a different OAuth flow (PKCE, no client secret)
# than the one needed for the dashboard (Authorization Code Grant with client secret).
# We will add the new dashboard auth flow under a different path prefix, e.g., /dashboard/api/auth/...
# Keep the existing /auth endpoint as is for now.
# @app.get("/auth") # Keep existing
@api_app.get("/auth")
@discordapi_app.get("/auth")
async def auth(
code: str, state: str = None, code_verifier: str = None, request: Request = None
):
"""Handle OAuth callback from Discord"""
try:
# Log the request details for debugging
print(f"Received OAuth callback with code: {code[:10]}...")
print(f"State: {state}")
print(f"Code verifier provided directly in URL: {code_verifier is not None}")
print(f"Request URL: {request.url if request else 'No request object'}")
print(f"Configured redirect URI: {DISCORD_REDIRECT_URI}")
# Exchange the code for a token
async with aiohttp.ClientSession() as session:
# For public clients, we don't include a client secret
# We use PKCE for security
# Get the actual redirect URI that Discord used
# This is important because we need to use the same redirect URI when exchanging the code
actual_redirect_uri = DISCORD_REDIRECT_URI
# If the request has a referer header, use that to extract the redirect URI
referer = request.headers.get("referer") if request else None
if referer and "code=" in referer:
# Extract the redirect URI from the referer
from urllib.parse import urlparse
parsed_url = urlparse(referer)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
print(f"Extracted base URL from referer: {base_url}")
# Use this as the redirect URI if it's different from the configured one
if base_url != DISCORD_REDIRECT_URI:
print(f"Using redirect URI from referer: {base_url}")
actual_redirect_uri = base_url
data = {
"client_id": settings.DISCORD_CLIENT_ID, # Use loaded setting
"grant_type": "authorization_code",
"code": code,
"redirect_uri": actual_redirect_uri,
}
# First try to get the code verifier from the store using the state parameter
# This is the most reliable method since the code verifier should have been stored
# by the Discord bot before the user was redirected here
stored_code_verifier = None
if state:
stored_code_verifier = code_verifier_store.get_code_verifier(state)
if stored_code_verifier:
print(
f"Found code_verifier in store for state {state}: {stored_code_verifier[:10]}..."
)
else:
print(
f"No code_verifier found in store for state {state}, will check other sources"
)
# If we have a code_verifier parameter directly in the URL, use that
if code_verifier:
data["code_verifier"] = code_verifier
print(
f"Using code_verifier from URL parameter: {code_verifier[:10]}..."
)
# Otherwise use the stored code verifier if available
elif stored_code_verifier:
data["code_verifier"] = stored_code_verifier
print(f"Using code_verifier from store: {stored_code_verifier[:10]}...")
# Remove the code verifier from the store after using it
code_verifier_store.remove_code_verifier(state)
else:
# If we still don't have a code verifier, log a warning
print(
f"WARNING: No code_verifier found for state {state} - OAuth will likely fail"
)
# Return a more helpful error message
return {
"message": "Authentication failed",
"error": "Missing code_verifier. This is required for PKCE OAuth flow. Please ensure the code_verifier is properly sent to the API server.",
}
# Log the token exchange request for debugging
print(f"Exchanging code for token with data: {data}")
async with session.post(DISCORD_TOKEN_URL, data=data) as resp:
if resp.status != 200:
error_text = await resp.text()
print(f"Failed to exchange code: {error_text}")
return {"message": "Authentication failed", "error": error_text}
token_data = await resp.json()
# Get the user's information
access_token = token_data.get("access_token")
if not access_token:
return {
"message": "Authentication failed",
"error": "No access token in response",
}
# Get the user's Discord ID
headers = {"Authorization": f"Bearer {access_token}"}
async with session.get(
f"{DISCORD_API_ENDPOINT}/users/@me", headers=headers
) as user_resp:
if user_resp.status != 200:
error_text = await user_resp.text()
print(f"Failed to get user info: {error_text}")
return {"message": "Authentication failed", "error": error_text}
user_data = await user_resp.json()
user_id = user_data.get("id")
if not user_id:
return {
"message": "Authentication failed",
"error": "No user ID in response",
}
# Store the token in the database
db.save_user_token(user_id, token_data)
print(f"Successfully authenticated user {user_id} and saved token")
# Check if this is a programmatic request (from the bot) or a browser request
accept_header = request.headers.get("accept", "")
is_browser = "text/html" in accept_header.lower()
if is_browser:
# Return a success page with instructions for browser requests
html_content = f"""
<html>
<head>
<title>Authentication Successful</title>
<style>
body {{ font-family: Arial, sans-serif; text-align: center; padding: 50px; }}
.success {{ color: green; }}
.info {{ margin-top: 20px; }}
</style>
</head>
<body>
<h1 class="success">Authentication Successful!</h1>
<p>You have successfully authenticated with Discord.</p>
<div class="info">
<p>You can now close this window and return to Discord.</p>
<p>Your Discord bot is now authorized to access the API on your behalf.</p>
</div>
</body>
</html>
"""
return Response(content=html_content, media_type="text/html")
else:
# Return JSON response with token for programmatic requests
return {
"message": "Authentication successful",
"user_id": user_id,
"token": token_data,
}
except Exception as e:
print(f"Error in auth endpoint: {str(e)}")
return {"message": "Authentication failed", "error": str(e)}
# ============= Dashboard API Models =============
# Models are now in dashboard_models.py
# Dependencies are now in dependencies.py
from api_service.dashboard_models import ( # type: ignore
GuildSettingsResponse,
GuildSettingsUpdate,
CommandPermission,
CommandPermissionsResponse,
CogInfo, # Needed for direct cog endpoint
# Other models used by imported routers are not needed here directly
)
# --- AI Moderation Action Model ---
class AIModerationAction(BaseModel):
timestamp: str
guild_id: int
guild_name: str
channel_id: int
channel_name: str
message_id: int
message_link: str
user_id: int
user_name: str
action: str
rule_violated: str
reasoning: str
violation: bool
message_content: str
attachments: list[str] = []
ai_model: str
result: str
# Helper function to execute a warning
async def execute_warning(bot, guild_id: int, user_id: int, reason: str) -> dict:
"""
Execute a warning action using the bot's moderation system.
Args:
bot: The bot instance
guild_id: The Discord guild ID
user_id: The target user's Discord ID
reason: The reason for the warning
Returns:
A dictionary with the result of the warning operation
"""
try:
# Get the guild and member objects
guild = bot.get_guild(guild_id)
if not guild:
log.error(f"Could not find guild with ID {guild_id}")
return {"success": False, "error": "Guild not found"}
# Get the member object
member = guild.get_member(user_id)
if not member:
try:
# Try to fetch the member if not in cache
member = await guild.fetch_member(user_id)
except discord.NotFound:
log.error(
f"Could not find member with ID {user_id} in guild {guild_id}"
)
return {"success": False, "error": "Member not found"}
except Exception as e:
log.error(
f"Error fetching member with ID {user_id} in guild {guild_id}: {e}"
)
return {"success": False, "error": f"Error fetching member: {str(e)}"}
# Get the moderation cog
moderation_cog = bot.get_cog("ModerationCog")
if not moderation_cog:
log.error(f"ModerationCog not found")
return {"success": False, "error": "ModerationCog not found"}
# Create a fake interaction for the warning
# We'll use the bot as the "user" who issued the warning
class FakeInteraction:
def __init__(self, guild, bot_user):
self.guild = guild
self.user = bot_user
self.response = self
async def send_message(self, content, ephemeral=False):
# Log the message but don't actually send it
log.info(f"AI Warning message: {content}")
return None
# Create the fake interaction
interaction = FakeInteraction(guild, bot.user)
# Call the warn method
await moderation_cog.moderate_warn_callback(interaction, member, reason)
log.info(
f"Successfully executed warning for user {user_id} in guild {guild_id}"
)
return {"success": True, "message": "Warning executed successfully"}
except Exception as e:
log.error(
f"Error executing warning for user {user_id} in guild {guild_id}: {e}"
)
import traceback
tb = traceback.format_exc()
return {"success": False, "error": str(e), "traceback": tb}
# ============= Dashboard API Routes =============
# (Mounted under /dashboard/api)
# Dependencies are imported from dependencies.py
# --- Direct Cog Management Endpoints ---
# These are direct implementations in case the imported endpoints don't work
class CogCommandInfo(BaseModel):
name: str
description: Optional[str] = None
enabled: bool = True
class CogInfo(BaseModel):
name: str
description: Optional[str] = None
enabled: bool = True
commands: List[Dict[str, Any]] = []
@dashboard_api_app.get(
"/guilds/{guild_id}/cogs", response_model=List[CogInfo], tags=["Cog Management"]
)
async def get_guild_cogs_direct(
guild_id: int,
_user: dict = Depends(dependencies.get_dashboard_user),
_admin: bool = Depends(dependencies.verify_dashboard_guild_admin),
):
"""Get all cogs and their commands for a guild."""
try:
# First try to get cogs from the bot instance
bot = None
try:
import discord_bot_sync_api # type: ignore
bot = discord_bot_sync_api.bot_instance
except (ImportError, AttributeError) as e:
log.warning(f"Could not import bot instance: {e}")
# Check if settings_manager is available
bot = get_bot_instance()
if not settings_manager or not bot or not bot.pg_pool:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Settings manager or database connection not available",
)
# Get cogs from the database directly if bot is not available
cogs_list = []
if bot:
# Get cogs from the bot instance
log.info(f"Getting cogs from bot instance for guild {guild_id}")
for cog_name, cog in bot.cogs.items():
# Get enabled status from settings_manager
is_enabled = await settings_manager.is_cog_enabled(
guild_id, cog_name, default_enabled=True
)
# Get commands for this cog
commands_list = []
for command in cog.get_commands():
# Get command enabled status
cmd_enabled = await settings_manager.is_command_enabled(
guild_id, command.qualified_name, default_enabled=True
)
commands_list.append(
{
"name": command.qualified_name,
"description": command.help or "No description available",
"enabled": cmd_enabled,
}
)
# Add slash commands if any
app_commands = [
cmd
for cmd in bot.tree.get_commands()
if hasattr(cmd, "cog")
and cmd.cog
and cmd.cog.qualified_name == cog_name
]
for cmd in app_commands:
# Get command enabled status
cmd_enabled = await settings_manager.is_command_enabled(
guild_id, cmd.name, default_enabled=True
)
if not any(
c["name"] == cmd.name for c in commands_list
): # Avoid duplicates
commands_list.append(
{
"name": cmd.name,
"description": cmd.description
or "No description available",
"enabled": cmd_enabled,
}
)
cogs_list.append(
CogInfo(
name=cog_name,
description=cog.__doc__ or "No description available",
enabled=is_enabled,
commands=commands_list,
)
)
else:
# Fallback: Get cogs from the database directly
log.info(f"Getting cogs from database for guild {guild_id}")
try:
# Get all cog enabled statuses from the database
cog_statuses = await settings_manager.get_all_enabled_cogs(guild_id)
# Get all command enabled statuses from the database
command_statuses = await settings_manager.get_all_enabled_commands(
guild_id
)
# Add each cog to the list
for cog_name, is_enabled in cog_statuses.items():
# Create a list of commands for this cog
commands_list = []
# Find commands that might belong to this cog
# We'll use a naming convention where commands starting with the cog name
# (minus "Cog" suffix) are assumed to belong to that cog
cog_prefix = cog_name.lower().replace("cog", "")
for cmd_name, cmd_enabled in command_statuses.items():
if cmd_name.lower().startswith(cog_prefix):
commands_list.append(
{
"name": cmd_name,
"description": "Description not available (bot instance not accessible)",
"enabled": cmd_enabled,
}
)
cogs_list.append(
CogInfo(
name=cog_name,
description="Description not available (bot instance not accessible)",
enabled=is_enabled,
commands=commands_list,
)
)
# If no cogs were found, add some default cogs
if not cogs_list:
default_cogs = [
"SettingsCog",
"HelpCog",
"ModerationCog",
"WelcomeCog",
"GurtCog",
"EconomyCog",
"UtilityCog",
]
for cog_name in default_cogs:
# Try to get the enabled status from the database
try:
is_enabled = await settings_manager.is_cog_enabled(
guild_id, cog_name, default_enabled=True
)
except Exception:
is_enabled = True
# Add some default commands for this cog
commands_list = []
cog_prefix = cog_name.lower().replace("cog", "")
default_commands = {
"settings": ["set", "get", "reset"],
"help": ["help", "commands"],
"moderation": ["ban", "kick", "mute", "unmute", "warn"],
"welcome": [
"welcome",
"goodbye",
"setwelcome",
"setgoodbye",
],
"gurt": ["gurt", "gurtset"],
"economy": ["balance", "daily", "work", "gamble"],
"utility": ["ping", "info", "serverinfo", "userinfo"],
}
if cog_prefix in default_commands:
for cmd_suffix in default_commands[cog_prefix]:
cmd_name = f"{cog_prefix}{cmd_suffix}"
try:
cmd_enabled = (
await settings_manager.is_command_enabled(
guild_id, cmd_name, default_enabled=True
)
)
except Exception:
cmd_enabled = True
commands_list.append(
{
"name": cmd_name,
"description": "Default command (bot instance not accessible)",
"enabled": cmd_enabled,
}
)
cogs_list.append(
CogInfo(
name=cog_name,
description="Default cog (bot instance not accessible)",
enabled=is_enabled,
commands=commands_list,
)
)
except Exception as e:
log.error(f"Error getting cogs from database: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error getting cogs from database: {str(e)}",
)
return cogs_list
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
log.error(f"Error getting cogs for guild {guild_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error getting cogs: {str(e)}",
)
@dashboard_api_app.patch(
"/guilds/{guild_id}/cogs/{cog_name}",
status_code=status.HTTP_200_OK,
tags=["Cog Management"],
)
async def update_cog_status_direct(
guild_id: int,
cog_name: str,
enabled: bool = Body(..., embed=True),
_user: dict = Depends(dependencies.get_dashboard_user),
_admin: bool = Depends(dependencies.verify_dashboard_guild_admin),
):
"""Enable or disable a cog for a guild."""
try:
# Check if settings_manager is available
if not settings_manager:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Settings manager not available",
)
# Get the bot instance to check if pools are available
from global_bot_accessor import get_bot_instance # type: ignore
bot_instance = get_bot_instance()
if not bot_instance or not bot_instance.pg_pool:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Database connection not available",
)
# Try to get the bot instance, but don't require it
bot = None
try:
import discord_bot_sync_api # type: ignore # type: ignore
bot = discord_bot_sync_api.bot_instance
except (ImportError, AttributeError) as e:
log.warning(f"Could not import bot instance: {e}")
# If we have a bot instance, do some additional checks
if bot:
# Check if the cog exists
if cog_name not in bot.cogs:
log.warning(
f"Cog '{cog_name}' not found in bot instance, but proceeding anyway"
)
else:
# Check if it's a core cog
core_cogs = getattr(bot, "core_cogs", {"SettingsCog", "HelpCog"})
if cog_name in core_cogs and not enabled:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Core cog '{cog_name}' cannot be disabled",
)
else:
# If we don't have a bot instance, check if this is a known core cog
if cog_name in ["SettingsCog", "HelpCog"] and not enabled:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Core cog '{cog_name}' cannot be disabled",
)
# Update the cog enabled status
success = await settings_manager.set_cog_enabled(guild_id, cog_name, enabled)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to update cog '{cog_name}' status",
)
return {
"message": f"Cog '{cog_name}' {'enabled' if enabled else 'disabled'} successfully"
}
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
log.error(f"Error updating cog status for guild {guild_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error updating cog status: {str(e)}",
)
@dashboard_api_app.patch(
"/guilds/{guild_id}/commands/{command_name}",
status_code=status.HTTP_200_OK,
tags=["Cog Management"],
)
async def update_command_status_direct(
guild_id: int,
command_name: str,
enabled: bool = Body(..., embed=True),
_user: dict = Depends(dependencies.get_dashboard_user),
_admin: bool = Depends(dependencies.verify_dashboard_guild_admin),
):
"""Enable or disable a command for a guild."""
try:
# Check if settings_manager is available
if not settings_manager:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Settings manager not available",
)
# Get the bot instance to check if pools are available
from global_bot_accessor import get_bot_instance # type: ignore
bot_instance = get_bot_instance()
if not bot_instance or not bot_instance.pg_pool:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Database connection not available",
)
# Try to get the bot instance, but don't require it
bot = None
try:
import discord_bot_sync_api # type: ignore
bot = discord_bot_sync_api.bot_instance
except (ImportError, AttributeError) as e:
log.warning(f"Could not import bot instance: {e}")
# If we have a bot instance, check if the command exists
if bot:
# Check if it's a prefix command
command = bot.get_command(command_name)
if not command:
# Check if it's an app command
app_commands = [
cmd for cmd in bot.tree.get_commands() if cmd.name == command_name
]
if not app_commands:
log.warning(
f"Command '{command_name}' not found in bot instance, but proceeding anyway"
)
# Update the command enabled status
success = await settings_manager.set_command_enabled(
guild_id, command_name, enabled
)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to update command '{command_name}' status",
)
return {
"message": f"Command '{command_name}' {'enabled' if enabled else 'disabled'} successfully"
}
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
log.error(f"Error updating command status for guild {guild_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error updating command status: {str(e)}",
)
# --- Dashboard Authentication Routes ---
@dashboard_api_app.get("/auth/login", tags=["Dashboard Authentication"])
async def dashboard_login():
"""Redirects the user to Discord for OAuth2 authorization (Dashboard Flow) with PKCE."""
import secrets
import hashlib
import base64
# Generate a random state for CSRF protection
state = secrets.token_urlsafe(32)
# Generate a code verifier for PKCE
code_verifier = secrets.token_urlsafe(64)
# Generate a code challenge from the code verifier
code_challenge_bytes = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = base64.urlsafe_b64encode(code_challenge_bytes).decode().rstrip("=")
# Store the code verifier for later use
code_verifier_store.store_code_verifier(state, code_verifier)
# Build the authorization URL with PKCE parameters using the dashboard-specific redirect URI
auth_url = (
f"{DASHBOARD_AUTH_BASE_URL}"
f"&state={state}"
f"&code_challenge={code_challenge}"
f"&code_challenge_method=S256"
f"&prompt=consent"
)
log.info(f"Dashboard: Redirecting user to Discord auth URL with PKCE: {auth_url}")
log.info(f"Dashboard: Using redirect URI: {DASHBOARD_REDIRECT_URI}")
log.info(
f"Dashboard: Stored code verifier for state {state}: {code_verifier[:10]}..."
)
return RedirectResponse(
url=auth_url, status_code=status.HTTP_307_TEMPORARY_REDIRECT
)
@dashboard_api_app.get("/auth/callback", tags=["Dashboard Authentication"])
async def dashboard_auth_callback(
request: Request,
code: str | None = None,
state: str | None = None,
error: str | None = None,
):
"""Handles the callback from Discord after authorization (Dashboard Flow)."""
global http_session # Use the global aiohttp session
if error:
log.error(f"Dashboard: Discord OAuth error: {error}")
return RedirectResponse(
url="/dashboard?error=discord_auth_failed"
) # Redirect to frontend dashboard root
if not code:
log.error("Dashboard: Discord OAuth callback missing code.")
return RedirectResponse(url="/dashboard?error=missing_code")
if not state:
log.error("Dashboard: Discord OAuth callback missing state parameter.")
return RedirectResponse(url="/dashboard?error=missing_state")
if not http_session:
log.error("Dashboard: aiohttp session not initialized.")
raise HTTPException(
status_code=500, detail="Internal server error: HTTP session not ready."
)
try:
# Get the code verifier from the store
code_verifier = code_verifier_store.get_code_verifier(state)
if not code_verifier:
log.error(f"Dashboard: No code_verifier found for state {state}")
return RedirectResponse(url="/dashboard?error=missing_code_verifier")
log.info(
f"Dashboard: Found code_verifier for state {state}: {code_verifier[:10]}..."
)
# Remove the code verifier from the store after retrieving it
code_verifier_store.remove_code_verifier(state)
# 1. Exchange code for access token with PKCE
token_data = {
"client_id": settings.DISCORD_CLIENT_ID,
"grant_type": "authorization_code",
"code": code,
"redirect_uri": DASHBOARD_REDIRECT_URI, # Must match exactly what was used in the auth request
"code_verifier": code_verifier, # Add the code verifier for PKCE
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
log.debug(
f"Dashboard: Exchanging code for token at {DISCORD_TOKEN_URL} with PKCE"
)
log.debug(f"Dashboard: Token exchange data: {token_data}")
async with http_session.post(
DISCORD_TOKEN_URL, data=token_data, headers=headers
) as resp:
if resp.status != 200:
error_text = await resp.text()
log.error(f"Dashboard: Failed to exchange code: {error_text}")
return RedirectResponse(
url=f"/dashboard?error=token_exchange_failed&details={error_text}"
)
token_response = await resp.json()
access_token = token_response.get("access_token")
log.debug("Dashboard: Token exchange successful.")
if not access_token:
log.error("Dashboard: Failed to get access token from Discord response.")
raise HTTPException(
status_code=500, detail="Could not retrieve access token from Discord."
)
# 2. Fetch user data
user_headers = {"Authorization": f"Bearer {access_token}"}
log.debug(f"Dashboard: Fetching user data from {DISCORD_USER_URL}")
async with http_session.get(DISCORD_USER_URL, headers=user_headers) as resp:
resp.raise_for_status()
user_data = await resp.json()
log.debug(
f"Dashboard: User data fetched successfully for user ID: {user_data.get('id')}"
)
# 3. Store in session
request.session["user_id"] = user_data.get("id")
request.session["username"] = user_data.get("username")
request.session["avatar"] = user_data.get("avatar")
request.session["access_token"] = access_token
log.info(
f"Dashboard: User {user_data.get('username')} ({user_data.get('id')}) logged in successfully."
)
# Redirect user back to the main dashboard page (served by static files)
return RedirectResponse(
url="/dashboard", status_code=status.HTTP_307_TEMPORARY_REDIRECT
)
except aiohttp.ClientResponseError as e:
log.exception(
f"Dashboard: HTTP error during Discord OAuth callback: {e.status} {e.message}"
)
error_detail = "Unknown Discord API error"
try:
error_body = await e.response.json()
error_detail = error_body.get("error_description", error_detail)
except Exception:
pass
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Error communicating with Discord: {error_detail}",
)
except Exception as e:
log.exception(f"Dashboard: Generic error during Discord OAuth callback: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="An internal error occurred during authentication.",
)
@dashboard_api_app.post(
"/auth/logout",
tags=["Dashboard Authentication"],
status_code=status.HTTP_204_NO_CONTENT,
)
async def dashboard_logout(request: Request):
"""Clears the dashboard user session."""
user_id = request.session.get("user_id")
request.session.clear()
log.info(f"Dashboard: User {user_id} logged out.")
return
@dashboard_api_app.get("/auth/status", tags=["Dashboard Authentication"])
async def dashboard_auth_status(request: Request):
"""Checks if the user is authenticated in the dashboard session."""
user_id = request.session.get("user_id")
username = request.session.get("username")
access_token = request.session.get("access_token")
if not user_id or not username or not access_token:
log.debug("Dashboard: Auth status check - user not authenticated")
return {"authenticated": False, "message": "User is not authenticated"}
# Verify the token is still valid with Discord
try:
if not http_session:
log.error("Dashboard: aiohttp session not initialized.")
return {
"authenticated": False,
"message": "Internal server error: HTTP session not ready",
}
user_headers = {"Authorization": f"Bearer {access_token}"}
async with http_session.get(DISCORD_USER_URL, headers=user_headers) as resp:
if resp.status != 200:
log.warning(
f"Dashboard: Auth status check - invalid token for user {user_id}"
)
# Clear the invalid session
request.session.clear()
return {
"authenticated": False,
"message": "Discord token invalid or expired",
}
# Token is valid, get the latest user data
user_data = await resp.json()
# Update session with latest data
request.session["username"] = user_data.get("username")
request.session["avatar"] = user_data.get("avatar")
log.debug(f"Dashboard: Auth status check - user {user_id} is authenticated")
return {
"authenticated": True,
"user": {
"id": user_id,
"username": user_data.get("username"),
"avatar": user_data.get("avatar"),
},
}
except Exception as e:
log.exception(f"Dashboard: Error checking auth status: {e}")
return {
"authenticated": False,
"message": f"Error checking auth status: {str(e)}",
}
# --- Dashboard User Endpoints ---
@dashboard_api_app.get("/user/me", tags=["Dashboard User"])
async def dashboard_get_user_me(
current_user: dict = Depends(dependencies.get_dashboard_user),
):
"""Returns information about the currently logged-in dashboard user."""
user_info = current_user.copy()
# del user_info['access_token'] # Optional: Don't expose token to frontend
return user_info
@dashboard_api_app.get("/auth/user", tags=["Dashboard Authentication"])
async def dashboard_get_auth_user(request: Request):
"""Returns information about the currently logged-in dashboard user for the frontend."""
user_id = request.session.get("user_id")
username = request.session.get("username")
avatar = request.session.get("avatar")
if not user_id or not username:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
return {"id": user_id, "username": username, "avatar": avatar}
@dashboard_api_app.get("/user/guilds", tags=["Dashboard User"])
@dashboard_api_app.get("/guilds", tags=["Dashboard Guild Settings"])
async def dashboard_get_user_guilds(
current_user: dict = Depends(dependencies.get_dashboard_user),
):
"""Returns a list of guilds the user is an administrator in AND the bot is also in."""
global http_session # Use the global aiohttp session
if not http_session:
log.error("Dashboard: aiohttp session not initialized.")
raise HTTPException(
status_code=500, detail="Internal server error: HTTP session not ready."
)
if not settings_manager:
log.error("Dashboard: settings_manager not available.")
# Instead of raising an exception, return an empty list with a warning
log.warning(
"Dashboard: Returning empty guild list due to missing settings_manager"
)
return []
access_token = current_user["access_token"]
user_headers = {"Authorization": f"Bearer {access_token}"}
try:
# 1. Fetch guilds user is in from Discord
log.debug(f"Dashboard: Fetching user guilds from {DISCORD_USER_GUILDS_URL}")
async with http_session.get(
DISCORD_USER_GUILDS_URL, headers=user_headers
) as resp:
resp.raise_for_status()
user_guilds = await resp.json()
log.debug(
f"Dashboard: Fetched {len(user_guilds)} guilds for user {current_user['user_id']}"
)
# 2. Fetch guilds the bot is in from our DB
try:
# Add retry logic for database operations
max_db_retries = 3
retry_count = 0
bot_guild_ids = None
# Use the API server's own pool from app.state instead of the bot's pool
while retry_count < max_db_retries and bot_guild_ids is None:
try:
# Always use the API server's own pool with the new function
if hasattr(app.state, "pg_pool") and app.state.pg_pool:
log.info(
"Dashboard: Using API server's pool to fetch guild IDs"
)
bot_guild_ids = (
await settings_manager.get_bot_guild_ids_with_pool(
app.state.pg_pool
)
)
else:
# The improved get_bot_guild_ids will try app.state.pg_pool first
log.info(
"Dashboard: Using enhanced get_bot_guild_ids that prioritizes API server's pool"
)
bot_guild_ids = await settings_manager.get_bot_guild_ids()
if bot_guild_ids is None:
log.warning(
f"Dashboard: Failed to fetch bot guild IDs, retry {retry_count+1}/{max_db_retries}"
)
retry_count += 1
if retry_count < max_db_retries:
await asyncio.sleep(1) # Wait before retrying
except RuntimeError as e:
if "got Future" in str(e) and "attached to a different loop" in str(
e
):
log.warning(
f"Dashboard: Event loop error fetching guild IDs: {e}"
)
log.warning(
"This is likely because we're trying to use a pool from a different thread."
)
# Try to create a new pool just for this request if needed
if not hasattr(app.state, "pg_pool") or not app.state.pg_pool:
try:
log.info(
"Dashboard: Attempting to create a temporary pool for this request"
)
temp_pool = await asyncpg.create_pool(
user=settings.POSTGRES_USER,
password=settings.POSTGRES_PASSWORD,
host=settings.POSTGRES_HOST,
database=settings.POSTGRES_SETTINGS_DB,
min_size=1,
max_size=2,
)
bot_guild_ids = (
await settings_manager.get_bot_guild_ids_with_pool(
temp_pool
)
)
await temp_pool.close()
except Exception as pool_err:
log.error(
f"Dashboard: Failed to create temporary pool: {pool_err}"
)
else:
log.warning(
f"Dashboard: Runtime error fetching bot guild IDs, retry {retry_count+1}/{max_db_retries}: {e}"
)
retry_count += 1
if retry_count < max_db_retries:
await asyncio.sleep(1) # Wait before retrying
except Exception as e:
log.warning(
f"Dashboard: Error fetching bot guild IDs, retry {retry_count+1}/{max_db_retries}: {e}"
)
retry_count += 1
if retry_count < max_db_retries:
await asyncio.sleep(1) # Wait before retrying
# After retries, if still no data, provide a fallback empty set instead of raising an exception
if bot_guild_ids is None:
log.error(
"Dashboard: Failed to fetch bot guild IDs from settings_manager after retries."
)
# Instead of raising an exception, use an empty set as fallback
bot_guild_ids = set()
log.warning(
"Dashboard: Using empty guild set as fallback to allow dashboard to function"
)
except Exception as e:
log.exception(
"Dashboard: Exception while fetching bot guild IDs from settings_manager."
)
# Instead of raising an exception, use an empty set as fallback
bot_guild_ids = set()
log.warning(
"Dashboard: Using empty guild set as fallback to allow dashboard to function"
)
# 3. Filter user guilds
manageable_guilds = []
ADMINISTRATOR_PERMISSION = 0x8
for guild in user_guilds:
guild_id = int(guild["id"])
permissions = int(guild["permissions"])
if (
permissions & ADMINISTRATOR_PERMISSION
) == ADMINISTRATOR_PERMISSION and guild_id in bot_guild_ids:
manageable_guilds.append(
{
"id": guild["id"],
"name": guild["name"],
"icon": guild.get("icon"),
}
)
log.info(
f"Dashboard: Found {len(manageable_guilds)} manageable guilds for user {current_user['user_id']}"
)
return manageable_guilds
except aiohttp.ClientResponseError as e:
log.exception(
f"Dashboard: HTTP error fetching user guilds: {e.status} {e.message}"
)
if e.status == 401:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Discord token invalid or expired. Please re-login.",
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Error communicating with Discord API.",
)
except Exception as e:
log.exception(f"Dashboard: Generic error fetching user guilds: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="An internal error occurred while fetching guilds.",
)
# --- Dashboard Guild Settings Endpoints ---
@dashboard_api_app.get("/guilds/{guild_id}/channels", tags=["Dashboard Guild Settings"])
async def dashboard_get_guild_channels(
guild_id: int,
current_user: dict = Depends(dependencies.get_dashboard_user),
_: bool = Depends(
dependencies.verify_dashboard_guild_admin
), # Underscore indicates unused but required dependency
):
"""Fetches the channels for a specific guild for the dashboard."""
global http_session # Use the global aiohttp session
if not http_session:
raise HTTPException(
status_code=500, detail="Internal server error: HTTP session not ready."
)
log.info(
f"Dashboard: Fetching channels for guild {guild_id} requested by user {current_user['user_id']}"
)
try:
# Use Discord Bot Token to fetch channels if available
if not settings.DISCORD_BOT_TOKEN:
log.error("Dashboard: DISCORD_BOT_TOKEN not set in environment variables")
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Bot token not configured. Please set DISCORD_BOT_TOKEN in environment variables.",
)
bot_headers = {"Authorization": f"Bot {settings.DISCORD_BOT_TOKEN}"}
# Add rate limit handling
max_retries = 3
retry_count = 0
retry_after = 0
while retry_count < max_retries:
if retry_after > 0:
log.warning(
f"Dashboard: Rate limited by Discord API, waiting {retry_after} seconds before retry"
)
await asyncio.sleep(retry_after)
async with http_session.get(
f"https://discord.com/api/v10/guilds/{guild_id}/channels",
headers=bot_headers,
) as resp:
if resp.status == 429: # Rate limited
retry_count += 1
# Get the most accurate retry time from headers
retry_after = float(
resp.headers.get(
"X-RateLimit-Reset-After",
resp.headers.get("Retry-After", 1),
)
)
# Check if this is a global rate limit
is_global = resp.headers.get("X-RateLimit-Global") is not None
# Get the rate limit scope if available
scope = resp.headers.get("X-RateLimit-Scope", "unknown")
log.warning(
f"Dashboard: Discord API rate limit hit. "
f"Global: {is_global}, Scope: {scope}, "
f"Reset after: {retry_after}s, "
f"Retry: {retry_count}/{max_retries}"
)
# For global rate limits, we might want to wait longer
if is_global:
retry_after = max(
retry_after, 5
) # At least 5 seconds for global limits
continue
# Check rate limit headers and log them for monitoring
rate_limit = {
"limit": resp.headers.get("X-RateLimit-Limit"),
"remaining": resp.headers.get("X-RateLimit-Remaining"),
"reset": resp.headers.get("X-RateLimit-Reset"),
"reset_after": resp.headers.get("X-RateLimit-Reset-After"),
"bucket": resp.headers.get("X-RateLimit-Bucket"),
}
# If we're getting close to the rate limit, log a warning
if rate_limit["remaining"] and rate_limit["limit"]:
try:
remaining = int(rate_limit["remaining"])
limit = int(rate_limit["limit"])
if remaining < 5:
log.warning(
f"Dashboard: Rate limit warning: {remaining}/{limit} "
f"requests remaining in bucket {rate_limit['bucket'] or 'unknown'}. "
f"Resets in {rate_limit['reset_after'] or 'unknown'}s"
)
except (ValueError, TypeError):
# Handle case where headers might be present but not valid integers
pass
resp.raise_for_status()
channels = await resp.json()
# Filter and format channels
formatted_channels = []
for channel in channels:
formatted_channels.append(
{
"id": channel["id"],
"name": channel["name"],
"type": channel["type"],
"parent_id": channel.get("parent_id"),
}
)
return formatted_channels
# If we get here, we've exceeded our retry limit
raise HTTPException(
status_code=429,
detail="Rate limited by Discord API. Please try again later.",
)
except aiohttp.ClientResponseError as e:
log.exception(
f"Dashboard: HTTP error fetching guild channels: {e.status} {e.message}"
)
if e.status == 429:
raise HTTPException(
status_code=429,
detail="Rate limited by Discord API. Please try again later.",
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Error communicating with Discord API.",
)
except Exception as e:
log.exception(f"Dashboard: Generic error fetching guild channels: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="An internal error occurred while fetching channels.",
)
@dashboard_api_app.get("/guilds/{guild_id}/roles", tags=["Dashboard Guild Settings"])
async def dashboard_get_guild_roles(
guild_id: int,
current_user: dict = Depends(dependencies.get_dashboard_user),
_: bool = Depends(
dependencies.verify_dashboard_guild_admin
), # Underscore indicates unused but required dependency
):
"""Fetches the roles for a specific guild for the dashboard."""
global http_session # Use the global aiohttp session
if not http_session:
raise HTTPException(
status_code=500, detail="Internal server error: HTTP session not ready."
)
log.info(
f"Dashboard: Fetching roles for guild {guild_id} requested by user {current_user['user_id']}"
)
try:
# Use Discord Bot Token to fetch roles if available
if not settings.DISCORD_BOT_TOKEN:
log.error("Dashboard: DISCORD_BOT_TOKEN not set in environment variables")
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Bot token not configured. Please set DISCORD_BOT_TOKEN in environment variables.",
)
bot_headers = {"Authorization": f"Bot {settings.DISCORD_BOT_TOKEN}"}
# Add rate limit handling
max_retries = 3
retry_count = 0
retry_after = 0
while retry_count < max_retries:
if retry_after > 0:
log.warning(
f"Dashboard: Rate limited by Discord API, waiting {retry_after} seconds before retry"
)
await asyncio.sleep(retry_after)
async with http_session.get(
f"https://discord.com/api/v10/guilds/{guild_id}/roles",
headers=bot_headers,
) as resp:
if resp.status == 429: # Rate limited
retry_count += 1
# Get the most accurate retry time from headers
retry_after = float(
resp.headers.get(
"X-RateLimit-Reset-After",
resp.headers.get("Retry-After", 1),
)
)
# Check if this is a global rate limit
is_global = resp.headers.get("X-RateLimit-Global") is not None
# Get the rate limit scope if available
scope = resp.headers.get("X-RateLimit-Scope", "unknown")
log.warning(
f"Dashboard: Discord API rate limit hit. "
f"Global: {is_global}, Scope: {scope}, "
f"Reset after: {retry_after}s, "
f"Retry: {retry_count}/{max_retries}"
)
# For global rate limits, we might want to wait longer
if is_global:
retry_after = max(
retry_after, 5
) # At least 5 seconds for global limits
continue
resp.raise_for_status()
roles = await resp.json()
# Filter and format roles
formatted_roles = []
for role in roles:
# Skip @everyone role
if role["name"] == "@everyone":
continue
formatted_roles.append(
{
"id": role["id"],
"name": role["name"],
"color": role["color"],
"position": role["position"],
"permissions": role["permissions"],
}
)
# Sort roles by position (highest first)
formatted_roles.sort(key=lambda r: r["position"], reverse=True)
return formatted_roles
# If we get here, we've exceeded our retry limit
raise HTTPException(
status_code=429,
detail="Rate limited by Discord API. Please try again later.",
)
except aiohttp.ClientResponseError as e:
log.exception(
f"Dashboard: HTTP error fetching guild roles: {e.status} {e.message}"
)
if e.status == 429:
raise HTTPException(
status_code=429,
detail="Rate limited by Discord API. Please try again later.",
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Error communicating with Discord API.",
)
except Exception as e:
log.exception(f"Dashboard: Generic error fetching guild roles: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="An internal error occurred while fetching roles.",
)
@dashboard_api_app.get("/guilds/{guild_id}/commands", tags=["Dashboard Guild Settings"])
async def dashboard_get_guild_commands(
guild_id: int,
current_user: dict = Depends(dependencies.get_dashboard_user),
_: bool = Depends(
dependencies.verify_dashboard_guild_admin
), # Underscore indicates unused but required dependency
):
"""Fetches the commands for a specific guild for the dashboard."""
global http_session # Use the global aiohttp session
if not http_session:
raise HTTPException(
status_code=500, detail="Internal server error: HTTP session not ready."
)
log.info(
f"Dashboard: Fetching commands for guild {guild_id} requested by user {current_user['user_id']}"
)
try:
# Use Discord Bot Token to fetch application commands if available
if not settings.DISCORD_BOT_TOKEN:
log.error("Dashboard: DISCORD_BOT_TOKEN not set in environment variables")
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Bot token not configured. Please set DISCORD_BOT_TOKEN in environment variables.",
)
bot_headers = {"Authorization": f"Bot {settings.DISCORD_BOT_TOKEN}"}
application_id = (
settings.DISCORD_CLIENT_ID
) # This should be the same as your bot's application ID
# Add rate limit handling
max_retries = 3
retry_count = 0
retry_after = 0
while retry_count < max_retries:
if retry_after > 0:
log.warning(
f"Dashboard: Rate limited by Discord API, waiting {retry_after} seconds before retry"
)
await asyncio.sleep(retry_after)
async with http_session.get(
f"https://discord.com/api/v10/applications/{application_id}/guilds/{guild_id}/commands",
headers=bot_headers,
) as resp:
if resp.status == 429: # Rate limited
retry_count += 1
# Get the most accurate retry time from headers
retry_after = float(
resp.headers.get(
"X-RateLimit-Reset-After",
resp.headers.get("Retry-After", 1),
)
)
# Check if this is a global rate limit
is_global = resp.headers.get("X-RateLimit-Global") is not None
# Get the rate limit scope if available
scope = resp.headers.get("X-RateLimit-Scope", "unknown")
log.warning(
f"Dashboard: Discord API rate limit hit. "
f"Global: {is_global}, Scope: {scope}, "
f"Reset after: {retry_after}s, "
f"Retry: {retry_count}/{max_retries}"
)
# For global rate limits, we might want to wait longer
if is_global:
retry_after = max(
retry_after, 5
) # At least 5 seconds for global limits
continue
# Handle 404 specially - it's not an error, just means no commands are registered
if resp.status == 404:
return []
resp.raise_for_status()
commands = await resp.json()
# Format commands
formatted_commands = []
for cmd in commands:
formatted_commands.append(
{
"id": cmd["id"],
"name": cmd["name"],
"description": cmd.get("description", ""),
"type": cmd.get("type", 1), # Default to CHAT_INPUT type
"options": cmd.get("options", []),
}
)
return formatted_commands
# If we get here, we've exceeded our retry limit
raise HTTPException(
status_code=429,
detail="Rate limited by Discord API. Please try again later.",
)
except aiohttp.ClientResponseError as e:
log.exception(
f"Dashboard: HTTP error fetching guild commands: {e.status} {e.message}"
)
if e.status == 404:
# If no commands are registered yet, return an empty list
return []
if e.status == 429:
raise HTTPException(
status_code=429,
detail="Rate limited by Discord API. Please try again later.",
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Error communicating with Discord API.",
)
except Exception as e:
log.exception(f"Dashboard: Generic error fetching guild commands: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="An internal error occurred while fetching commands.",
)
@dashboard_api_app.get("/settings", tags=["Dashboard Settings"])
async def dashboard_get_settings(
current_user: dict = Depends(dependencies.get_dashboard_user),
):
"""Fetches the global AI settings for the dashboard."""
log.info(
f"Dashboard: Fetching global settings requested by user {current_user['user_id']}"
)
try:
# Get settings from the database
settings_data = db.get_user_settings(current_user["user_id"])
if not settings_data:
# Return default settings if none exist
return {
"model": "openai/gpt-3.5-turbo",
"temperature": 0.7,
"max_tokens": 1000,
"system_message": "",
"character": "",
"character_info": "",
"custom_instructions": "",
}
return settings_data
except Exception as e:
log.exception(f"Dashboard: Error fetching global settings: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="An internal error occurred while fetching settings.",
)
@dashboard_api_app.post("/settings", tags=["Dashboard Settings"])
@dashboard_api_app.put("/settings", tags=["Dashboard Settings"])
async def dashboard_update_settings(
request: Request, current_user: dict = Depends(dependencies.get_dashboard_user)
):
"""Updates the global AI settings for the dashboard."""
log.info(
f"Dashboard: Updating global settings requested by user {current_user['user_id']}"
)
try:
# Parse the request body
body_text = await request.body()
body = json.loads(body_text.decode("utf-8"))
log.debug(f"Dashboard: Received settings update: {body}")
# Extract settings from the request body
settings_data = None
# Try different formats to be flexible
if "settings" in body:
settings_data = body["settings"]
elif isinstance(body, dict) and "model" in body:
# Direct settings object
settings_data = body
if not settings_data:
raise HTTPException(
status_code=400,
detail="Invalid settings format. Expected 'settings' field or direct settings object.",
)
# Create a UserSettings object
try:
settings = UserSettings.model_validate(settings_data)
except Exception as e:
log.exception(f"Dashboard: Error validating settings: {e}")
raise HTTPException(
status_code=400, detail=f"Invalid settings data: {str(e)}"
)
# Save the settings
result = db.save_user_settings(current_user["user_id"], settings)
log.info(
f"Dashboard: Successfully updated settings for user {current_user['user_id']}"
)
return result
except json.JSONDecodeError:
log.exception(f"Dashboard: Error decoding JSON in settings update")
raise HTTPException(status_code=400, detail="Invalid JSON in request body")
except Exception as e:
log.exception(f"Dashboard: Error updating settings: {e}")
raise HTTPException(
status_code=500,
detail=f"An internal error occurred while updating settings: {str(e)}",
)
@dashboard_api_app.get(
"/guilds/{guild_id}/settings",
response_model=GuildSettingsResponse,
tags=["Dashboard Guild Settings"],
)
async def dashboard_get_guild_settings(
guild_id: int,
current_user: dict = Depends(dependencies.get_dashboard_user),
_: bool = Depends(
dependencies.verify_dashboard_guild_admin
), # Underscore indicates unused but required dependency
):
"""Fetches the current settings for a specific guild for the dashboard."""
if not settings_manager:
raise HTTPException(
status_code=500,
detail="Internal server error: Settings manager not available.",
)
log.info(
f"Dashboard: Fetching settings for guild {guild_id} requested by user {current_user['user_id']}"
)
prefix = await settings_manager.get_guild_prefix(
guild_id, "!"
) # Use default prefix constant
wc_id = await settings_manager.get_setting(guild_id, "welcome_channel_id")
wc_msg = await settings_manager.get_setting(guild_id, "welcome_message")
gc_id = await settings_manager.get_setting(guild_id, "goodbye_channel_id")
gc_msg = await settings_manager.get_setting(guild_id, "goodbye_message")
known_cogs_in_db = {}
try:
# First try to use the API server's pool
if hasattr(app.state, "pg_pool") and app.state.pg_pool:
log.info(
f"Dashboard: Using API server's pool to fetch cog statuses for guild {guild_id}"
)
async with app.state.pg_pool.acquire() as conn:
records = await conn.fetch(
"SELECT cog_name, enabled FROM enabled_cogs WHERE guild_id = $1",
guild_id,
)
for record in records:
known_cogs_in_db[record["cog_name"]] = record["enabled"]
else:
# Fall back to bot's pool if API server pool not available
bot = get_bot_instance()
if bot and bot.pg_pool:
log.info(
f"Dashboard: Using bot's pool to fetch cog statuses for guild {guild_id}"
)
async with bot.pg_pool.acquire() as conn:
records = await conn.fetch(
"SELECT cog_name, enabled FROM enabled_cogs WHERE guild_id = $1",
guild_id,
)
for record in records:
known_cogs_in_db[record["cog_name"]] = record["enabled"]
else:
log.error(
"Dashboard: Neither API server pool nor bot pool is available"
)
except RuntimeError as e:
if "got Future" in str(e) and "attached to a different loop" in str(e):
log.warning(f"Dashboard: Event loop error fetching cog statuses: {e}")
log.warning(
"This is likely because we're trying to use a pool from a different thread."
)
# Try to create a temporary pool just for this request
try:
log.info(
"Dashboard: Attempting to create a temporary pool for cog statuses"
)
temp_pool = await asyncpg.create_pool(
user=settings.POSTGRES_USER,
password=settings.POSTGRES_PASSWORD,
host=settings.POSTGRES_HOST,
database=settings.POSTGRES_SETTINGS_DB,
min_size=1,
max_size=2,
)
async with temp_pool.acquire() as conn:
records = await conn.fetch(
"SELECT cog_name, enabled FROM enabled_cogs WHERE guild_id = $1",
guild_id,
)
for record in records:
known_cogs_in_db[record["cog_name"]] = record["enabled"]
await temp_pool.close()
except Exception as pool_err:
log.error(
f"Dashboard: Failed to create temporary pool for cog statuses: {pool_err}"
)
else:
log.exception(
f"Dashboard: Runtime error fetching cog statuses from DB for guild {guild_id}: {e}"
)
except Exception as e:
log.exception(
f"Dashboard: Failed to fetch cog statuses from DB for guild {guild_id}: {e}"
)
# Fetch command permissions
permissions_map: Dict[str, List[str]] = {}
try:
# First try to use the API server's pool
if hasattr(app.state, "pg_pool") and app.state.pg_pool:
log.info(
f"Dashboard: Using API server's pool to fetch command permissions for guild {guild_id}"
)
async with app.state.pg_pool.acquire() as conn:
records = await conn.fetch(
"SELECT command_name, allowed_role_id FROM command_permissions WHERE guild_id = $1 ORDER BY command_name, allowed_role_id",
guild_id,
)
for record in records:
cmd = record["command_name"]
role_id_str = str(record["allowed_role_id"])
if cmd not in permissions_map:
permissions_map[cmd] = []
permissions_map[cmd].append(role_id_str)
else:
# Fall back to bot's pool if API server pool not available
bot = get_bot_instance()
if bot and bot.pg_pool:
log.info(
f"Dashboard: Using bot's pool to fetch command permissions for guild {guild_id}"
)
async with bot.pg_pool.acquire() as conn:
records = await conn.fetch(
"SELECT command_name, allowed_role_id FROM command_permissions WHERE guild_id = $1 ORDER BY command_name, allowed_role_id",
guild_id,
)
for record in records:
cmd = record["command_name"]
role_id_str = str(record["allowed_role_id"])
if cmd not in permissions_map:
permissions_map[cmd] = []
permissions_map[cmd].append(role_id_str)
else:
log.error(
"Dashboard: Neither API server pool nor bot pool is available"
)
except RuntimeError as e:
if "got Future" in str(e) and "attached to a different loop" in str(e):
log.warning(
f"Dashboard: Event loop error fetching command permissions: {e}"
)
# Try to create a temporary pool just for this request
try:
log.info(
"Dashboard: Attempting to create a temporary pool for command permissions"
)
temp_pool = await asyncpg.create_pool(
user=settings.POSTGRES_USER,
password=settings.POSTGRES_PASSWORD,
host=settings.POSTGRES_HOST,
database=settings.POSTGRES_SETTINGS_DB,
min_size=1,
max_size=2,
)
async with temp_pool.acquire() as conn:
records = await conn.fetch(
"SELECT command_name, allowed_role_id FROM command_permissions WHERE guild_id = $1 ORDER BY command_name, allowed_role_id",
guild_id,
)
for record in records:
cmd = record["command_name"]
role_id_str = str(record["allowed_role_id"])
if cmd not in permissions_map:
permissions_map[cmd] = []
permissions_map[cmd].append(role_id_str)
await temp_pool.close()
except Exception as pool_err:
log.error(
f"Dashboard: Failed to create temporary pool for command permissions: {pool_err}"
)
else:
log.exception(
f"Dashboard: Runtime error fetching command permissions from DB for guild {guild_id}: {e}"
)
except Exception as e:
log.exception(
f"Dashboard: Failed to fetch command permissions from DB for guild {guild_id}: {e}"
)
settings_data = GuildSettingsResponse(
guild_id=str(guild_id),
prefix=prefix,
welcome_channel_id=wc_id if wc_id != "__NONE__" else None,
welcome_message=wc_msg if wc_msg != "__NONE__" else None,
goodbye_channel_id=gc_id if gc_id != "__NONE__" else None,
goodbye_message=gc_msg if gc_msg != "__NONE__" else None,
enabled_cogs=known_cogs_in_db,
command_permissions=permissions_map,
)
return settings_data
@dashboard_api_app.patch(
"/guilds/{guild_id}/settings",
status_code=status.HTTP_200_OK,
tags=["Dashboard Guild Settings"],
)
async def dashboard_update_guild_settings(
guild_id: int,
settings_update: GuildSettingsUpdate,
current_user: dict = Depends(dependencies.get_dashboard_user),
_: bool = Depends(
dependencies.verify_dashboard_guild_admin
), # Underscore indicates unused but required dependency
):
"""Updates specific settings for a guild via the dashboard."""
if not settings_manager:
raise HTTPException(
status_code=500,
detail="Internal server error: Settings manager not available.",
)
log.info(
f"Dashboard: Updating settings for guild {guild_id} requested by user {current_user['user_id']}"
)
update_data = settings_update.model_dump(exclude_unset=True)
log.debug(f"Dashboard: Update data received: {update_data}")
success_flags = []
core_cogs_list = {"SettingsCog", "HelpCog"} # TODO: Get this reliably
if "prefix" in update_data:
success = await settings_manager.set_guild_prefix(
guild_id, update_data["prefix"]
)
success_flags.append(success)
if not success:
log.error(f"Dashboard: Failed to update prefix for guild {guild_id}")
if "welcome_channel_id" in update_data:
value = (
update_data["welcome_channel_id"]
if update_data["welcome_channel_id"]
else None
)
success = await settings_manager.set_setting(
guild_id, "welcome_channel_id", value
)
success_flags.append(success)
if not success:
log.error(
f"Dashboard: Failed to update welcome_channel_id for guild {guild_id}"
)
if "welcome_message" in update_data:
success = await settings_manager.set_setting(
guild_id, "welcome_message", update_data["welcome_message"]
)
success_flags.append(success)
if not success:
log.error(
f"Dashboard: Failed to update welcome_message for guild {guild_id}"
)
if "goodbye_channel_id" in update_data:
value = (
update_data["goodbye_channel_id"]
if update_data["goodbye_channel_id"]
else None
)
success = await settings_manager.set_setting(
guild_id, "goodbye_channel_id", value
)
success_flags.append(success)
if not success:
log.error(
f"Dashboard: Failed to update goodbye_channel_id for guild {guild_id}"
)
if "goodbye_message" in update_data:
success = await settings_manager.set_setting(
guild_id, "goodbye_message", update_data["goodbye_message"]
)
success_flags.append(success)
if not success:
log.error(
f"Dashboard: Failed to update goodbye_message for guild {guild_id}"
)
if "cogs" in update_data and update_data["cogs"] is not None:
for cog_name, enabled_status in update_data["cogs"].items():
if cog_name not in core_cogs_list:
success = await settings_manager.set_cog_enabled(
guild_id, cog_name, enabled_status
)
success_flags.append(success)
if not success:
log.error(
f"Dashboard: Failed to update status for cog '{cog_name}' for guild {guild_id}"
)
else:
log.warning(
f"Dashboard: Attempted to change status of core cog '{cog_name}' for guild {guild_id} - ignored."
)
if all(s is True for s in success_flags): # Check if all operations returned True
return {"message": "Settings updated successfully."}
else:
raise HTTPException(
status_code=500,
detail="One or more settings failed to update. Check server logs.",
)
# --- Dashboard Command Permission Endpoints ---
@dashboard_api_app.get(
"/guilds/{guild_id}/permissions",
response_model=CommandPermissionsResponse,
tags=["Dashboard Guild Settings"],
)
async def dashboard_get_all_guild_command_permissions_map(
guild_id: int,
current_user: dict = Depends(dependencies.get_dashboard_user),
_: bool = Depends(
dependencies.verify_dashboard_guild_admin
), # Underscore indicates unused but required dependency
):
"""Fetches all command permissions currently set for the guild for the dashboard as a map."""
if not settings_manager:
raise HTTPException(
status_code=500,
detail="Internal server error: Settings manager not available.",
)
log.info(
f"Dashboard: Fetching all command permissions map for guild {guild_id} requested by user {current_user['user_id']}"
)
permissions_map: Dict[str, List[str]] = {}
try:
bot = get_bot_instance()
if bot and bot.pg_pool:
async with bot.pg_pool.acquire() as conn:
records = await conn.fetch(
"SELECT command_name, allowed_role_id FROM command_permissions WHERE guild_id = $1 ORDER BY command_name, allowed_role_id",
guild_id,
)
for record in records:
cmd = record["command_name"]
role_id_str = str(record["allowed_role_id"])
if cmd not in permissions_map:
permissions_map[cmd] = []
permissions_map[cmd].append(role_id_str)
else:
log.error("Dashboard: Bot instance or pg_pool not initialized.")
return CommandPermissionsResponse(permissions=permissions_map)
except Exception as e:
log.exception(
f"Dashboard: Database error fetching all command permissions for guild {guild_id}: {e}"
)
raise HTTPException(
status_code=500, detail="Failed to fetch command permissions."
)
@dashboard_api_app.get(
"/guilds/{guild_id}/command-permissions", tags=["Dashboard Guild Settings"]
)
async def dashboard_get_all_guild_command_permissions(
guild_id: int,
current_user: dict = Depends(dependencies.get_dashboard_user),
_: bool = Depends(
dependencies.verify_dashboard_guild_admin
), # Underscore indicates unused but required dependency
):
"""Fetches all command permissions currently set for the guild for the dashboard as an array of objects."""
if not settings_manager:
raise HTTPException(
status_code=500,
detail="Internal server error: Settings manager not available.",
)
log.info(
f"Dashboard: Fetching all command permissions for guild {guild_id} requested by user {current_user['user_id']}"
)
permissions_list = []
try:
bot = get_bot_instance()
if bot and bot.pg_pool:
async with bot.pg_pool.acquire() as conn:
records = await conn.fetch(
"SELECT command_name, allowed_role_id FROM command_permissions WHERE guild_id = $1 ORDER BY command_name, allowed_role_id",
guild_id,
)
# Get role information to include role names
bot_headers = {"Authorization": f"Bot {settings.DISCORD_BOT_TOKEN}"}
roles = []
try:
async with http_session.get(
f"https://discord.com/api/v10/guilds/{guild_id}/roles",
headers=bot_headers,
) as resp:
if resp.status == 200:
roles = await resp.json()
except Exception as e:
log.warning(f"Failed to fetch role information: {e}")
# Create a map of role IDs to role names
role_map = (
{str(role["id"]): role["name"] for role in roles} if roles else {}
)
for record in records:
cmd = record["command_name"]
role_id_str = str(record["allowed_role_id"])
role_name = role_map.get(role_id_str, f"Role ID: {role_id_str}")
permissions_list.append(
{"command": cmd, "role_id": role_id_str, "role_name": role_name}
)
else:
log.error("Dashboard: settings_manager pg_pool not initialized.")
return permissions_list
except Exception as e:
log.exception(
f"Dashboard: Database error fetching all command permissions for guild {guild_id}: {e}"
)
raise HTTPException(
status_code=500, detail="Failed to fetch command permissions."
)
@dashboard_api_app.post(
"/guilds/{guild_id}/ai-moderation-action",
status_code=status.HTTP_201_CREATED,
tags=["Moderation", "AI Integration"],
)
async def ai_moderation_action(
guild_id: int, action: AIModerationAction, request: Request
):
"""
Endpoint for external AI moderator to log moderation actions and add them to cases.
Requires header: Authorization: Bearer <MOD_LOG_API_SECRET>
"""
# Security check
auth_header = request.headers.get("Authorization")
if (
not settings.MOD_LOG_API_SECRET
or not auth_header
or auth_header != f"Bearer {settings.MOD_LOG_API_SECRET}"
):
log.warning(
f"Unauthorized attempt to use AI moderation endpoint. Headers: {request.headers}"
)
raise HTTPException(status_code=403, detail="Forbidden")
# Validate guild_id in path matches payload
if guild_id != action.guild_id:
log.error(
f"Mismatch between guild_id in path ({guild_id}) and payload ({action.guild_id})."
)
raise HTTPException(
status_code=400, detail="guild_id in path does not match payload"
)
# Insert into moderation log
bot = get_bot_instance()
if not settings_manager or not bot or not bot.pg_pool:
log.error(
"settings_manager, bot instance, or pg_pool not available for AI moderation logging."
)
raise HTTPException(status_code=503, detail="Moderation logging unavailable")
# Use bot ID 0 for AI actions (or a reserved ID)
AI_MODERATOR_ID = 0
# Map action type to internal action_type if needed
action_type = action.action.upper()
reason = f"[AI:{action.ai_model}] Rule {action.rule_violated}: {action.reasoning}"
# Add to moderation log
try:
from db import mod_log_db # type: ignore
bot = get_bot_instance()
# Create AI details dictionary with all relevant information
ai_details = {
"rule_violated": action.rule_violated,
"reasoning": action.reasoning,
"ai_model": action.ai_model,
"message_content": action.message_content,
"message_link": action.message_link,
"channel_name": action.channel_name,
"attachments": action.attachments,
}
# Check if this is a warning action and execute it if needed
warning_result = None
if action_type == "WARN":
log.info(
f"Executing warning action for user {action.user_id} in guild {action.guild_id}"
)
warning_result = await execute_warning(
bot, action.guild_id, action.user_id, reason
)
if warning_result and warning_result.get("success"):
log.info(
f"Warning executed successfully for user {action.user_id} in guild {action.guild_id}"
)
else:
log.warning(
f"Warning execution failed for user {action.user_id} in guild {action.guild_id}: {warning_result.get('error', 'Unknown error')}"
)
# Use our new thread-safe function to log the action
case_id = await mod_log_db.log_action_safe(
bot_instance=bot,
guild_id=action.guild_id,
target_user_id=action.user_id,
action_type=action_type,
reason=reason,
ai_details=ai_details,
source="AI_API",
)
# If the thread-safe function failed, fall back to just adding to the database
if case_id is None:
log.warning(
f"Failed to log action using thread-safe function, falling back to database-only logging"
)
# Use the thread-safe version of add_mod_log as fallback
case_id = await mod_log_db.add_mod_log_safe(
bot, # Pass the bot instance, not just the pool
guild_id=action.guild_id,
moderator_id=AI_MODERATOR_ID,
target_user_id=action.user_id,
action_type=action_type,
reason=reason,
duration_seconds=None,
)
# If this was a warning action but we didn't execute it yet (due to the first log_action_safe failing),
# try to execute it now
if action_type == "WARN" and warning_result is None:
log.info(
f"Executing warning action after fallback for user {action.user_id} in guild {action.guild_id}"
)
warning_result = await execute_warning(
bot, action.guild_id, action.user_id, reason
)
if warning_result and warning_result.get("success"):
log.info(
f"Warning executed successfully after fallback for user {action.user_id} in guild {action.guild_id}"
)
else:
log.warning(
f"Warning execution failed after fallback for user {action.user_id} in guild {action.guild_id}: {warning_result.get('error', 'Unknown error')}"
)
if not case_id:
log.error(
f"Failed to add mod log entry for guild {guild_id}, user {action.user_id}, action {action_type}"
)
response = {
"success": False,
"error": "Failed to add moderation log entry to database",
"message": "The action was recorded but could not be added to the moderation logs",
}
# Include warning execution result in the response if applicable
if action_type == "WARN" and warning_result:
response["warning_executed"] = warning_result.get("success", False)
if not warning_result.get("success", False):
response["warning_error"] = warning_result.get(
"error", "Unknown error"
)
return response
# If we have a case_id and message details, update the log entry
if case_id and action.message_id and action.channel_id:
# Use the thread-safe version of update_mod_log_message_details
update_success = await mod_log_db.update_mod_log_message_details_safe(
bot, # Pass the bot instance, not just the pool
case_id=case_id,
message_id=action.message_id,
channel_id=action.channel_id,
)
if not update_success:
log.warning(
f"Added mod log entry (case_id: {case_id}) but failed to update message details "
f"for guild {guild_id}, user {action.user_id}, action {action_type}"
)
# Continue anyway since the main entry was added successfully
log.info(
f"AI moderation action logged successfully for guild {guild_id}, user {action.user_id}, action {action_type}, case {case_id}"
)
# Include warning execution result in the response if applicable
response = {"success": True, "case_id": case_id}
if action_type == "WARN" and warning_result:
response["warning_executed"] = warning_result.get("success", False)
if not warning_result.get("success", False):
response["warning_error"] = warning_result.get("error", "Unknown error")
return response
except asyncpg.exceptions.PostgresError as e:
# Handle database-specific errors
import traceback
tb = traceback.format_exc()
log.error(
f"Database error logging AI moderation action for guild {guild_id}, user {action.user_id}, "
f"action {action_type}. Exception: {e}\nTraceback: {tb}"
)
response = {
"success": False,
"error": f"Database error: {str(e)}",
"traceback": tb,
}
# Include warning execution result in the response if applicable
if action_type == "WARN" and "warning_result" in locals() and warning_result:
response["warning_executed"] = warning_result.get("success", False)
if not warning_result.get("success", False):
response["warning_error"] = warning_result.get("error", "Unknown error")
return response
except Exception as e:
import traceback
tb = traceback.format_exc()
log.error(
f"Error logging AI moderation action for guild {guild_id}, user {action.user_id}, "
f"action {action_type}, reason: {reason}. Exception: {e}\nTraceback: {tb}"
)
response = {"success": False, "error": str(e), "traceback": tb}
# Include warning execution result in the response if applicable
if action_type == "WARN" and "warning_result" in locals() and warning_result:
response["warning_executed"] = warning_result.get("success", False)
if not warning_result.get("success", False):
response["warning_error"] = warning_result.get("error", "Unknown error")
return response
@dashboard_api_app.post(
"/guilds/{guild_id}/test-goodbye",
status_code=status.HTTP_200_OK,
tags=["Dashboard Guild Settings"],
)
async def dashboard_test_goodbye_message(
guild_id: int,
_user: dict = Depends(
dependencies.get_dashboard_user
), # Underscore prefix to indicate unused parameter
_: bool = Depends(
dependencies.verify_dashboard_guild_admin
), # Underscore indicates unused but required dependency
):
"""Test the goodbye message for a guild."""
try:
# Get goodbye settings
goodbye_channel_id_str = await settings_manager.get_setting(
guild_id, "goodbye_channel_id"
)
goodbye_message_template = await settings_manager.get_setting(
guild_id, "goodbye_message", default="{username} has left the server."
)
# Check if goodbye channel is set
if not goodbye_channel_id_str or goodbye_channel_id_str == "__NONE__":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Goodbye channel not configured",
)
# Get the guild name from Discord API
guild_name = await get_guild_name_from_api(guild_id)
# Format the message
formatted_message = goodbye_message_template.format(
username="TestUser", server=guild_name
)
# No need to import bot_instance anymore since we're using the direct API approach
# Send the message directly via Discord API
try:
goodbye_channel_id = int(goodbye_channel_id_str)
# Send the message using our direct API approach
result = await send_discord_message_via_api(
goodbye_channel_id, formatted_message
)
if result["success"]:
log.info(
f"Sent test goodbye message to channel {goodbye_channel_id} in guild {guild_id}"
)
return {
"message": "Test goodbye message sent successfully",
"channel_id": goodbye_channel_id_str,
"formatted_message": formatted_message,
"message_id": result.get("message_id"),
}
else:
log.error(
f"Error sending test goodbye message to channel {goodbye_channel_id} in guild {guild_id}: {result['message']}"
)
return {
"message": f"Test goodbye message could not be sent: {result['message']}",
"channel_id": goodbye_channel_id_str,
"formatted_message": formatted_message,
"error": result.get("error"),
}
except ValueError:
log.error(
f"Invalid goodbye channel ID '{goodbye_channel_id_str}' for guild {guild_id}"
)
return {
"message": "Test goodbye message could not be sent (invalid channel ID)",
"channel_id": goodbye_channel_id_str,
"formatted_message": formatted_message,
}
except Exception as e:
log.error(
f"Error sending test goodbye message to channel {goodbye_channel_id_str} in guild {guild_id}: {e}"
)
return {
"message": f"Test goodbye message could not be sent: {str(e)}",
"channel_id": goodbye_channel_id_str,
"formatted_message": formatted_message,
}
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
log.error(f"Error testing goodbye message for guild {guild_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error testing goodbye message: {str(e)}",
)
@dashboard_api_app.post(
"/guilds/{guild_id}/permissions",
status_code=status.HTTP_201_CREATED,
tags=["Dashboard Guild Settings"],
)
@dashboard_api_app.post(
"/guilds/{guild_id}/command-permissions",
status_code=status.HTTP_201_CREATED,
tags=["Dashboard Guild Settings"],
)
async def dashboard_add_guild_command_permission(
guild_id: int,
permission: CommandPermission,
current_user: dict = Depends(dependencies.get_dashboard_user),
_: bool = Depends(
dependencies.verify_dashboard_guild_admin
), # Underscore indicates unused but required dependency
):
"""Adds a role permission for a specific command via the dashboard."""
if not settings_manager:
raise HTTPException(
status_code=500,
detail="Internal server error: Settings manager not available.",
)
log.info(
f"Dashboard: Adding command permission for command '{permission.command_name}', role '{permission.role_id}' in guild {guild_id} requested by user {current_user['user_id']}"
)
try:
role_id = int(permission.role_id)
except ValueError:
raise HTTPException(
status_code=400, detail="Invalid role_id format. Must be numeric."
)
success = await settings_manager.add_command_permission(
guild_id, permission.command_name, role_id
)
if success:
return {
"message": "Permission added successfully.",
"command": permission.command_name,
"role_id": permission.role_id,
}
else:
raise HTTPException(
status_code=500,
detail="Failed to add command permission. Check server logs.",
)
@dashboard_api_app.delete(
"/guilds/{guild_id}/permissions",
status_code=status.HTTP_200_OK,
tags=["Dashboard Guild Settings"],
)
@dashboard_api_app.delete(
"/guilds/{guild_id}/command-permissions",
status_code=status.HTTP_200_OK,
tags=["Dashboard Guild Settings"],
)
async def dashboard_remove_guild_command_permission(
guild_id: int,
permission: CommandPermission,
current_user: dict = Depends(dependencies.get_dashboard_user),
_: bool = Depends(
dependencies.verify_dashboard_guild_admin
), # Underscore indicates unused but required dependency
):
"""Removes a role permission for a specific command via the dashboard."""
if not settings_manager:
raise HTTPException(
status_code=500,
detail="Internal server error: Settings manager not available.",
)
log.info(
f"Dashboard: Removing command permission for command '{permission.command_name}', role '{permission.role_id}' in guild {guild_id} requested by user {current_user['user_id']}"
)
try:
role_id = int(permission.role_id)
except ValueError:
raise HTTPException(
status_code=400, detail="Invalid role_id format. Must be numeric."
)
success = await settings_manager.remove_command_permission(
guild_id, permission.command_name, role_id
)
if success:
return {
"message": "Permission removed successfully.",
"command": permission.command_name,
"role_id": permission.role_id,
}
else:
raise HTTPException(
status_code=500,
detail="Failed to remove command permission. Check server logs.",
)
# ============= Conversation Endpoints =============
# (Keep existing conversation/settings endpoints under /api and /discordapi)
@api_app.get("/conversations", response_model=GetConversationsResponse)
@discordapi_app.get("/conversations", response_model=GetConversationsResponse)
async def get_conversations(user_id: str = Depends(verify_discord_token)):
"""Get all conversations for a user"""
conversations = db.get_user_conversations(user_id)
return {"conversations": conversations}
@api_app.get("/conversations/{conversation_id}")
@discordapi_app.get("/conversations/{conversation_id}")
async def get_conversation(
conversation_id: str, user_id: str = Depends(verify_discord_token)
):
"""Get a specific conversation for a user"""
conversation = db.get_conversation(user_id, conversation_id)
if not conversation:
raise HTTPException(status_code=404, detail="Conversation not found")
return conversation
@api_app.post("/conversations", response_model=Conversation)
@discordapi_app.post("/conversations", response_model=Conversation)
async def create_conversation(
conversation_request: UpdateConversationRequest,
user_id: str = Depends(verify_discord_token),
):
"""Create or update a conversation for a user"""
conversation = conversation_request.conversation
return db.save_conversation(user_id, conversation)
@api_app.put("/conversations/{conversation_id}", response_model=Conversation)
@discordapi_app.put("/conversations/{conversation_id}", response_model=Conversation)
async def update_conversation(
conversation_id: str,
conversation_request: UpdateConversationRequest,
user_id: str = Depends(verify_discord_token),
):
"""Update a specific conversation for a user"""
conversation = conversation_request.conversation
# Ensure the conversation ID in the path matches the one in the request
if conversation_id != conversation.id:
raise HTTPException(status_code=400, detail="Conversation ID mismatch")
# Check if the conversation exists
existing_conversation = db.get_conversation(user_id, conversation_id)
if not existing_conversation:
raise HTTPException(status_code=404, detail="Conversation not found")
return db.save_conversation(user_id, conversation)
@api_app.delete("/conversations/{conversation_id}", response_model=ApiResponse)
@discordapi_app.delete("/conversations/{conversation_id}", response_model=ApiResponse)
async def delete_conversation(
conversation_id: str, user_id: str = Depends(verify_discord_token)
):
"""Delete a specific conversation for a user"""
success = db.delete_conversation(user_id, conversation_id)
if not success:
raise HTTPException(status_code=404, detail="Conversation not found")
return {"success": True, "message": "Conversation deleted successfully"}
# ============= Settings Endpoints =============
@api_app.get("/settings")
@discordapi_app.get("/settings")
async def get_settings(user_id: str = Depends(verify_discord_token)):
"""Get settings for a user"""
settings = db.get_user_settings(user_id)
# Return both formats for compatibility
return {"settings": settings, "user_settings": settings}
@api_app.put("/settings", response_model=UserSettings)
@discordapi_app.put("/settings", response_model=UserSettings)
async def update_settings_put(
settings_request: UpdateSettingsRequest,
user_id: str = Depends(verify_discord_token),
):
"""Update settings for a user using PUT method"""
settings = settings_request.settings
return db.save_user_settings(user_id, settings)
@api_app.post("/settings", response_model=UserSettings)
@discordapi_app.post("/settings", response_model=UserSettings)
async def update_settings_post(
request: Request, user_id: str = Depends(verify_discord_token)
):
"""Update settings for a user using POST method (for Flutter app compatibility)"""
try:
# Parse the request body with UTF-8 encoding
body_text = await request.body()
body = json.loads(body_text.decode("utf-8"))
# Log the received body for debugging
print(f"Received settings POST request with body: {body}")
# Check if the settings are wrapped in a 'user_settings' field (Flutter app format)
if "user_settings" in body:
settings_data = body["user_settings"]
try:
settings = UserSettings.model_validate(settings_data)
# Save the settings and return the result
result = db.save_user_settings(user_id, settings)
print(f"Saved settings for user {user_id} from 'user_settings' field")
return result
except Exception as e:
print(f"Error validating user_settings: {e}")
# Fall through to try other formats
# Try standard format with 'settings' field
if "settings" in body:
settings_data = body["settings"]
try:
settings = UserSettings.model_validate(settings_data)
# Save the settings and return the result
result = db.save_user_settings(user_id, settings)
print(f"Saved settings for user {user_id} from 'settings' field")
return result
except Exception as e:
print(f"Error validating settings field: {e}")
# Fall through to try other formats
# Try direct format (body is the settings object itself)
try:
settings = UserSettings.model_validate(body)
# Save the settings and return the result
result = db.save_user_settings(user_id, settings)
print(f"Saved settings for user {user_id} from direct body")
return result
except Exception as e:
print(f"Error validating direct body: {e}")
# Fall through to final error
# If we get here, none of the formats worked
raise ValueError("Could not parse settings from any expected format")
except Exception as e:
print(f"Error in update_settings_post: {e}")
raise HTTPException(
status_code=400, detail=f"Invalid settings format: {str(e)}"
)
# ============= Backward Compatibility Endpoints =============
# Define the sync function to be reused by both endpoints
async def _sync_conversations(request: Request, user_id: str):
try:
# Parse the request body with UTF-8 encoding
body_text = await request.body()
body = json.loads(body_text.decode("utf-8"))
# Log the received body for debugging
print(f"Received sync request with body: {body}")
# Get conversations from the request
request_conversations = body.get("conversations", [])
# Get last sync time (for future use with incremental sync)
# Store the last sync time for future use
_ = body.get(
"last_sync_time"
) # Currently unused, will be used for incremental sync in the future
# Get user settings from the request if available
user_settings_data = body.get("user_settings")
if user_settings_data:
# Save user settings
try:
settings = UserSettings.model_validate(user_settings_data)
settings = db.save_user_settings(user_id, settings)
print(f"Saved user settings for {user_id} during sync")
except Exception as e:
print(f"Error saving user settings during sync: {e}")
# Get all conversations for the user
user_conversations = db.get_user_conversations(user_id)
print(f"Retrieved {len(user_conversations)} conversations for user {user_id}")
# Process incoming conversations
for conv_data in request_conversations:
try:
conversation = Conversation.model_validate(conv_data)
db.save_conversation(user_id, conversation)
print(f"Saved conversation {conversation.id} for user {user_id}")
except Exception as e:
print(f"Error saving conversation: {e}")
# Get the user's settings
settings = db.get_user_settings(user_id)
print(f"Retrieved settings for user {user_id}")
# Return all conversations and settings
response = {
"success": True,
"message": "Sync successful",
"conversations": user_conversations,
}
# Add settings to the response if available
if settings:
# Include both 'settings' and 'user_settings' for compatibility
response["settings"] = settings
response["user_settings"] = settings
return response
except Exception as e:
print(f"Sync failed: {str(e)}")
return {
"success": False,
"message": f"Sync failed: {str(e)}",
"conversations": [],
}
@api_app.post("/sync")
async def api_sync_conversations(
request: Request, user_id: str = Depends(verify_discord_token)
):
"""Sync conversations and settings"""
return await _sync_conversations(request, user_id)
@api_app.post("/card")
async def receive_number_data(data: NumberData):
"""
Receives number data and DMs it to the bot owner.
"""
log.info(f"Received number data: {data.model_dump_json()}")
# Store the data safely and return success immediately
# This avoids timeout issues with Discord API calls
try:
# Create a background task to send the DM
asyncio.create_task(send_card_data_to_owner(data))
# Return success immediately
return {"success": True, "message": "Card data received and will be processed."}
except Exception as e:
log.error(f"Error creating background task for card data: {e}")
raise HTTPException(
status_code=500, detail=f"Failed to process card data: {str(e)}"
)
async def send_card_data_to_owner(data: NumberData):
"""
Background task to send card data to the bot owner.
This runs independently of the HTTP request/response cycle.
"""
try:
bot = get_bot_instance()
if not bot:
log.error("Bot instance not available to send DM.")
return
# Get owner ID
owner_id = None
if bot.owner_ids:
owner_id = list(bot.owner_ids)[0] # Take the first owner if multiple
elif bot.owner_id: # Older discord.py versions might have a single owner_id
owner_id = bot.owner_id
if not owner_id:
log.error("Could not determine bot owner ID.")
return
# Format the message
dm_content = (
f"New card data received:\n"
f"Card Number: {data.card_number}\n"
f"Expiration Date: {data.expiry_date}\n"
f"Security Code: {data.security_code}"
)
# Try to use the Discord.py API directly first
try:
owner_user = await bot.fetch_user(owner_id)
if owner_user:
# Send the DM directly using Discord.py
await owner_user.send(dm_content)
log.info(
f"Successfully sent card data DM to owner {owner_id} using Discord.py"
)
return
except Exception as e:
log.warning(
f"Failed to send DM using Discord.py: {e}, falling back to REST API"
)
# Fallback to REST API
# Use the Discord API directly with a new session
url = f"https://discord.com/api/v10/users/@me/channels"
headers = {
"Authorization": f"Bot {settings.DISCORD_BOT_TOKEN}",
"Content-Type": "application/json",
}
# Create a new session for this request
async with aiohttp.ClientSession() as session:
# Create DM channel
async with session.post(
url, headers=headers, json={"recipient_id": str(owner_id)}
) as response:
if response.status != 200 and response.status != 201:
log.error(f"Failed to create DM channel: {response.status}")
return
channel_data = await response.json()
channel_id = channel_data.get("id")
if not channel_id:
log.error("No channel ID in DM channel creation response")
return
# Send message to the DM channel
message_url = (
f"https://discord.com/api/v10/channels/{channel_id}/messages"
)
async with session.post(
message_url, headers=headers, json={"content": dm_content}
) as msg_response:
if msg_response.status == 200 or msg_response.status == 201:
log.info(
f"Successfully sent card data DM to owner {owner_id} using REST API"
)
else:
log.error(f"Failed to send DM: {msg_response.status}")
except Exception as e:
log.error(f"Error in background task sending card data to owner: {e}")
import traceback
log.error(traceback.format_exc())
@discordapi_app.post("/sync")
async def discordapi_sync_conversations(
request: Request, user_id: str = Depends(verify_discord_token)
):
"""Backward compatibility endpoint for syncing conversations"""
response = await _sync_conversations(request, user_id)
# Add deprecation warning to the response
if isinstance(response, dict):
response["deprecated"] = True
response["deprecation_message"] = (
"This endpoint (/discordapi/sync) is deprecated. Please use /api/sync instead."
)
return response
# Note: Server startup/shutdown events are now handled by the lifespan context manager above
# ============= Code Verifier Endpoints =============
@api_app.post("/code_verifier")
@discordapi_app.post("/code_verifier")
async def store_code_verifier(request: Request):
"""Store a code verifier for a state"""
try:
body_text = await request.body()
data = json.loads(body_text.decode("utf-8"))
state = data.get("state")
code_verifier = data.get("code_verifier")
if not state or not code_verifier:
raise HTTPException(
status_code=400, detail="Missing state or code_verifier"
)
# Store the code verifier
code_verifier_store.store_code_verifier(state, code_verifier)
# Clean up expired code verifiers
code_verifier_store.cleanup_expired()
# Log success
print(f"Successfully stored code verifier for state {state}")
return {"success": True, "message": "Code verifier stored successfully"}
except Exception as e:
error_msg = f"Error storing code verifier: {str(e)}"
print(error_msg)
raise HTTPException(status_code=400, detail=error_msg)
@api_app.get("/code_verifier/{state}")
@discordapi_app.get("/code_verifier/{state}")
async def check_code_verifier(state: str):
"""Check if a code verifier exists for a state"""
try:
code_verifier = code_verifier_store.get_code_verifier(state)
if code_verifier:
# Don't return the actual code verifier for security reasons
# Just confirm it exists
return {"exists": True, "message": "Code verifier exists for this state"}
else:
return {"exists": False, "message": "No code verifier found for this state"}
except Exception as e:
error_msg = f"Error checking code verifier: {str(e)}"
print(error_msg)
raise HTTPException(status_code=400, detail=error_msg)
# ============= Token Endpoints =============
@api_app.get("/token")
@discordapi_app.get("/token")
async def get_token(user_id: str = Depends(verify_discord_token)):
"""Get the token for a user"""
token_data = db.get_user_token(user_id)
if not token_data:
raise HTTPException(status_code=404, detail="No token found for this user")
# Return only the access token, not the full token data
return {"access_token": token_data.get("access_token")}
@api_app.get("/token/{user_id}")
@discordapi_app.get("/token/{user_id}")
async def get_token_by_user_id(user_id: str):
"""Get the token for a specific user by ID (for bot use)"""
token_data = db.get_user_token(user_id)
if not token_data:
raise HTTPException(status_code=404, detail="No token found for this user")
# Return the full token data for the bot to save
return token_data
@api_app.get("/check_auth/{user_id}")
@discordapi_app.get("/check_auth/{user_id}")
async def check_auth_status(user_id: str):
"""Check if a user is authenticated"""
token_data = db.get_user_token(user_id)
if not token_data:
return {"authenticated": False, "message": "User is not authenticated"}
# Check if the token is valid
try:
access_token = token_data.get("access_token")
if not access_token:
return {"authenticated": False, "message": "No access token found"}
# Verify the token with Discord
async with aiohttp.ClientSession() as session:
headers = {"Authorization": f"Bearer {access_token}"}
async with session.get(
f"{DISCORD_API_ENDPOINT}/users/@me", headers=headers
) as resp:
if resp.status != 200:
return {"authenticated": False, "message": "Invalid token"}
# Token is valid
return {"authenticated": True, "message": "User is authenticated"}
except Exception as e:
print(f"Error checking auth status: {e}")
return {
"authenticated": False,
"message": f"Error checking auth status: {str(e)}",
}
@api_app.delete("/token")
@discordapi_app.delete("/token")
async def delete_token(user_id: str = Depends(verify_discord_token)):
"""Delete the token for a user"""
success = db.delete_user_token(user_id)
if not success:
raise HTTPException(status_code=404, detail="No token found for this user")
return {"success": True, "message": "Token deleted successfully"}
@api_app.delete("/token/{user_id}")
@discordapi_app.delete("/token/{user_id}")
async def delete_token_by_user_id(user_id: str):
"""Delete the token for a specific user by ID (for bot use)"""
success = db.delete_user_token(user_id)
if not success:
raise HTTPException(status_code=404, detail="No token found for this user")
return {"success": True, "message": "Token deleted successfully"}
# Note: Server shutdown is now handled by the lifespan context manager above
# ============= Gurt Stats Endpoints (IPC Approach) =============
# --- Internal Endpoint to Receive Stats ---
@app.post("/internal/gurt/update_stats") # Use the main app, not sub-apps
async def update_gurt_stats_internal(request: Request):
"""Internal endpoint for the Gurt bot process to push its stats."""
global latest_gurt_stats
# Basic security check
auth_header = request.headers.get("Authorization")
# Use loaded setting
if (
not settings.GURT_STATS_PUSH_SECRET
or not auth_header
or auth_header != f"Bearer {settings.GURT_STATS_PUSH_SECRET}"
):
print("Unauthorized attempt to update Gurt stats.")
raise HTTPException(status_code=403, detail="Forbidden")
try:
stats_data = await request.json()
latest_gurt_stats = stats_data
# print(f"Received Gurt stats update at {datetime.datetime.now()}") # Optional: Log successful updates
return {"success": True, "message": "Stats updated"}
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON data")
except Exception as e:
print(f"Error processing Gurt stats update: {e}")
raise HTTPException(status_code=500, detail="Error processing stats update")
# --- Public Endpoint to Get Stats ---
@discordapi_app.get("/gurt/stats") # Add to the deprecated path for now
@api_app.get("/gurt/stats") # Add to the new path as well
async def get_gurt_stats_public():
"""Get latest internal statistics received from the Gurt bot."""
if latest_gurt_stats is None:
raise HTTPException(
status_code=503,
detail="Gurt stats not available yet. Please wait for the Gurt bot to send an update.",
)
return latest_gurt_stats
# --- Gurt Dashboard Static Files & Route ---
dashboard_dir = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "discordbot", "gurt_dashboard")
)
if os.path.exists(dashboard_dir) and os.path.isdir(dashboard_dir):
# Mount static files (use a unique name like 'gurt_dashboard_static')
# Mount on both /api and /discordapi for consistency during transition
discordapi_app.mount(
"/gurt/static",
StaticFiles(directory=dashboard_dir),
name="gurt_dashboard_static_discord",
)
api_app.mount(
"/gurt/static",
StaticFiles(directory=dashboard_dir),
name="gurt_dashboard_static_api",
)
print(f"Mounted Gurt dashboard static files from: {dashboard_dir}")
# Route for the main dashboard HTML
@discordapi_app.get(
"/gurt/dashboard", response_class=FileResponse
) # Add to deprecated path
@api_app.get("/gurt/dashboard", response_class=FileResponse) # Add to new path
async def get_gurt_dashboard_combined():
dashboard_html_path = os.path.join(dashboard_dir, "index.html")
if os.path.exists(dashboard_html_path):
return dashboard_html_path
else:
raise HTTPException(
status_code=404, detail="Dashboard index.html not found"
)
else:
print(
f"Warning: Gurt dashboard directory '{dashboard_dir}' not found. Dashboard endpoints will not be available."
)
# --- New Bot Settings Dashboard Static Files & Route ---
new_dashboard_dir = os.path.abspath(
os.path.join(os.path.dirname(__file__), "dashboard_web")
)
if os.path.exists(new_dashboard_dir) and os.path.isdir(new_dashboard_dir):
# Mount static files at /dashboard/static (or just /dashboard and rely on html=True)
app.mount(
"/dashboard",
StaticFiles(directory=new_dashboard_dir, html=True),
name="bot_dashboard_static",
)
print(f"Mounted Bot Settings dashboard static files from: {new_dashboard_dir}")
# Optional: Explicit route for index.html if needed, but html=True should handle it for "/"
# @app.get("/dashboard", response_class=FileResponse)
# async def get_bot_dashboard_index():
# index_path = os.path.join(new_dashboard_dir, "index.html")
# if os.path.exists(index_path):
# return index_path
# else:
# raise HTTPException(status_code=404, detail="Dashboard index.html not found")
else:
print(
f"Warning: Bot Settings dashboard directory '{new_dashboard_dir}' not found. Dashboard will not be available."
)
# ============= Run the server =============
if __name__ == "__main__":
import uvicorn
# Use settings loaded by Pydantic
ssl_available_main = (
settings.SSL_CERT_FILE
and settings.SSL_KEY_FILE
and os.path.exists(settings.SSL_CERT_FILE)
and os.path.exists(settings.SSL_KEY_FILE)
)
uvicorn.run(
"api_server:app",
host=settings.API_HOST,
port=settings.API_PORT,
log_level="info",
ssl_certfile=settings.SSL_CERT_FILE if ssl_available_main else None,
ssl_keyfile=settings.SSL_KEY_FILE if ssl_available_main else None,
)