397 lines
13 KiB
Python
397 lines
13 KiB
Python
"""
|
|
Custom Bot Manager for handling user-specific bot instances.
|
|
This module provides functionality to create, start, stop, and manage custom bot instances
|
|
based on user-provided tokens.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import asyncio
|
|
import threading
|
|
import logging
|
|
import discord
|
|
from discord.ext import commands
|
|
import traceback
|
|
from typing import Dict, Optional, Tuple, List
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO, format="%(asctime)s:%(levelname)s:%(name)s: %(message)s"
|
|
)
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Global storage for custom bot instances and their threads
|
|
custom_bots: Dict[str, commands.Bot] = {} # user_id -> bot instance
|
|
custom_bot_threads: Dict[str, threading.Thread] = {} # user_id -> thread
|
|
custom_bot_status: Dict[str, str] = {} # user_id -> status (running, stopped, error)
|
|
custom_bot_errors: Dict[str, str] = {} # user_id -> error message
|
|
|
|
# Status constants
|
|
STATUS_RUNNING = "running"
|
|
STATUS_STOPPED = "stopped"
|
|
STATUS_ERROR = "error"
|
|
|
|
# Default cogs to load for custom bots
|
|
DEFAULT_COGS = [
|
|
"cogs.help_cog",
|
|
"cogs.settings_cog",
|
|
"cogs.utility_cog",
|
|
"cogs.fun_cog",
|
|
"cogs.moderation_cog",
|
|
]
|
|
|
|
|
|
class CustomBot(commands.Bot):
|
|
"""Custom bot class with additional functionality for user-specific bots."""
|
|
|
|
def __init__(self, user_id: str, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.user_id = user_id
|
|
self.owner_id = int(os.getenv("OWNER_USER_ID", "0"))
|
|
self._cleanup_tasks = [] # Track cleanup tasks
|
|
|
|
async def setup_hook(self):
|
|
"""Called when the bot is first connected to Discord."""
|
|
log.info(f"Custom bot for user {self.user_id} is setting up...")
|
|
|
|
# Load default cogs
|
|
for cog in DEFAULT_COGS:
|
|
try:
|
|
await self.load_extension(cog)
|
|
log.info(f"Loaded extension {cog} for custom bot {self.user_id}")
|
|
except Exception as e:
|
|
log.error(
|
|
f"Failed to load extension {cog} for custom bot {self.user_id}: {e}"
|
|
)
|
|
traceback.print_exc()
|
|
|
|
async def close(self):
|
|
"""Override close to ensure proper cleanup of all resources."""
|
|
log.info(f"Closing custom bot for user {self.user_id}...")
|
|
|
|
# Close all cogs that have aiohttp sessions
|
|
for cog_name, cog in self.cogs.items():
|
|
try:
|
|
if hasattr(cog, "session") and cog.session and not cog.session.closed:
|
|
await cog.session.close()
|
|
log.info(
|
|
f"Closed aiohttp session for cog {cog_name} in custom bot {self.user_id}"
|
|
)
|
|
except Exception as e:
|
|
log.error(
|
|
f"Error closing session for cog {cog_name} in custom bot {self.user_id}: {e}"
|
|
)
|
|
|
|
# Wait a bit for sessions to close properly
|
|
await asyncio.sleep(0.1)
|
|
|
|
# Call parent close
|
|
await super().close()
|
|
log.info(f"Custom bot for user {self.user_id} closed successfully")
|
|
|
|
|
|
async def create_custom_bot(
|
|
user_id: str,
|
|
token: str,
|
|
prefix: str = "!",
|
|
status_type: str = "listening",
|
|
status_text: str = "!help",
|
|
) -> Tuple[bool, str]:
|
|
"""
|
|
Create a new custom bot instance for a user.
|
|
|
|
Args:
|
|
user_id: The Discord user ID who owns this bot
|
|
token: The Discord bot token
|
|
prefix: Command prefix for the bot
|
|
status_type: Activity type (playing, listening, watching, competing)
|
|
status_text: Status text to display
|
|
|
|
Returns:
|
|
Tuple of (success, message)
|
|
"""
|
|
# Check if a bot already exists for this user
|
|
if user_id in custom_bots and custom_bot_status.get(user_id) == STATUS_RUNNING:
|
|
return False, f"A bot is already running for user {user_id}. Stop it first."
|
|
|
|
try:
|
|
# Set up intents
|
|
intents = discord.Intents.default()
|
|
intents.message_content = True
|
|
intents.members = True
|
|
|
|
# Create bot instance
|
|
bot = CustomBot(user_id=user_id, command_prefix=prefix, intents=intents)
|
|
|
|
# Set up events
|
|
@bot.event
|
|
async def on_ready():
|
|
log.info(
|
|
f"Custom bot {bot.user.name} (ID: {bot.user.id}) for user {user_id} is ready!"
|
|
)
|
|
|
|
# Set the bot's status
|
|
activity_type = getattr(
|
|
discord.ActivityType, status_type, discord.ActivityType.listening
|
|
)
|
|
await bot.change_presence(
|
|
activity=discord.Activity(type=activity_type, name=status_text)
|
|
)
|
|
|
|
# Update status
|
|
custom_bot_status[user_id] = STATUS_RUNNING
|
|
if user_id in custom_bot_errors:
|
|
del custom_bot_errors[user_id]
|
|
|
|
@bot.event
|
|
async def on_error(event, *args, **kwargs):
|
|
log.error(
|
|
f"Error in custom bot for user {user_id} in event {event}: {sys.exc_info()[1]}"
|
|
)
|
|
custom_bot_errors[user_id] = str(sys.exc_info()[1])
|
|
|
|
# Store the bot instance
|
|
custom_bots[user_id] = bot
|
|
custom_bot_status[user_id] = STATUS_STOPPED
|
|
|
|
return True, f"Custom bot created for user {user_id}"
|
|
|
|
except Exception as e:
|
|
log.error(f"Error creating custom bot for user {user_id}: {e}")
|
|
custom_bot_status[user_id] = STATUS_ERROR
|
|
custom_bot_errors[user_id] = str(e)
|
|
return False, f"Error creating custom bot: {e}"
|
|
|
|
|
|
def run_custom_bot_in_thread(user_id: str, token: str) -> Tuple[bool, str]:
|
|
"""
|
|
Run a custom bot in a separate thread.
|
|
|
|
Args:
|
|
user_id: The Discord user ID who owns this bot
|
|
token: The Discord bot token
|
|
|
|
Returns:
|
|
Tuple of (success, message)
|
|
"""
|
|
if user_id not in custom_bots:
|
|
return False, f"No bot instance found for user {user_id}"
|
|
|
|
if user_id in custom_bot_threads and custom_bot_threads[user_id].is_alive():
|
|
return False, f"Bot is already running for user {user_id}"
|
|
|
|
bot = custom_bots[user_id]
|
|
|
|
def _run_bot_thread():
|
|
"""Run the bot in a new event loop within this thread."""
|
|
try:
|
|
# Create a new event loop for this thread
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
|
|
async def _run_bot():
|
|
try:
|
|
await bot.start(token)
|
|
except discord.errors.LoginFailure:
|
|
log.error(f"Invalid token for custom bot (user {user_id})")
|
|
custom_bot_status[user_id] = STATUS_ERROR
|
|
custom_bot_errors[user_id] = (
|
|
"Invalid Discord bot token. Please check your token and try again."
|
|
)
|
|
except Exception as e:
|
|
log.error(f"Error running custom bot for user {user_id}: {e}")
|
|
custom_bot_status[user_id] = STATUS_ERROR
|
|
custom_bot_errors[user_id] = str(e)
|
|
finally:
|
|
# Ensure proper cleanup
|
|
if not bot.is_closed():
|
|
try:
|
|
await bot.close()
|
|
except Exception as e:
|
|
log.error(
|
|
f"Error closing bot during cleanup for user {user_id}: {e}"
|
|
)
|
|
|
|
# Run the bot
|
|
loop.run_until_complete(_run_bot())
|
|
|
|
except Exception as e:
|
|
log.error(f"Error in bot thread for user {user_id}: {e}")
|
|
custom_bot_status[user_id] = STATUS_ERROR
|
|
custom_bot_errors[user_id] = str(e)
|
|
finally:
|
|
# Clean up the event loop
|
|
try:
|
|
loop.close()
|
|
except Exception as e:
|
|
log.error(f"Error closing event loop for user {user_id}: {e}")
|
|
|
|
# Create and start the thread
|
|
thread = threading.Thread(
|
|
target=_run_bot_thread, daemon=True, name=f"custom-bot-{user_id}"
|
|
)
|
|
thread.start()
|
|
|
|
# Store the thread
|
|
custom_bot_threads[user_id] = thread
|
|
|
|
return True, f"Started custom bot for user {user_id}"
|
|
|
|
|
|
def stop_custom_bot(user_id: str) -> Tuple[bool, str]:
|
|
"""
|
|
Stop a running custom bot.
|
|
|
|
Args:
|
|
user_id: The Discord user ID who owns this bot
|
|
|
|
Returns:
|
|
Tuple of (success, message)
|
|
"""
|
|
if user_id not in custom_bots:
|
|
return False, f"No bot instance found for user {user_id}"
|
|
|
|
if user_id not in custom_bot_threads or not custom_bot_threads[user_id].is_alive():
|
|
custom_bot_status[user_id] = STATUS_STOPPED
|
|
return True, f"Bot was not running for user {user_id}"
|
|
|
|
# Get the bot instance
|
|
bot = custom_bots[user_id]
|
|
|
|
def _close_bot_thread():
|
|
"""Close the bot in a proper event loop context."""
|
|
try:
|
|
# Create a new event loop for this thread
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
|
|
async def _close_bot():
|
|
try:
|
|
await bot.close()
|
|
custom_bot_status[user_id] = STATUS_STOPPED
|
|
log.info(f"Successfully closed custom bot for user {user_id}")
|
|
except Exception as e:
|
|
log.error(f"Error closing custom bot for user {user_id}: {e}")
|
|
custom_bot_status[user_id] = STATUS_ERROR
|
|
custom_bot_errors[user_id] = str(e)
|
|
|
|
# Run the close operation
|
|
loop.run_until_complete(_close_bot())
|
|
|
|
except Exception as e:
|
|
log.error(f"Error in close thread for user {user_id}: {e}")
|
|
custom_bot_status[user_id] = STATUS_ERROR
|
|
custom_bot_errors[user_id] = str(e)
|
|
finally:
|
|
# Clean up the event loop
|
|
try:
|
|
loop.close()
|
|
except Exception as e:
|
|
log.error(
|
|
f"Error closing event loop in close thread for user {user_id}: {e}"
|
|
)
|
|
|
|
# Run the close operation in a new thread
|
|
close_thread = threading.Thread(
|
|
target=_close_bot_thread, daemon=True, name=f"close-bot-{user_id}"
|
|
)
|
|
close_thread.start()
|
|
|
|
# Wait for the close thread to finish (with timeout)
|
|
close_thread.join(timeout=10.0) # Increased timeout for proper cleanup
|
|
|
|
# Clean up the thread reference
|
|
if user_id in custom_bot_threads:
|
|
del custom_bot_threads[user_id]
|
|
|
|
return True, f"Stopped custom bot for user {user_id}"
|
|
|
|
|
|
def get_custom_bot_status(user_id: str) -> Dict:
|
|
"""
|
|
Get the status of a custom bot.
|
|
|
|
Args:
|
|
user_id: The Discord user ID who owns this bot
|
|
|
|
Returns:
|
|
Dict with status information
|
|
"""
|
|
if user_id not in custom_bots:
|
|
return {
|
|
"exists": False,
|
|
"status": "not_created",
|
|
"error": None,
|
|
"is_running": False,
|
|
}
|
|
|
|
status = custom_bot_status.get(user_id, STATUS_STOPPED)
|
|
error = custom_bot_errors.get(user_id)
|
|
is_running = (
|
|
user_id in custom_bot_threads
|
|
and custom_bot_threads[user_id].is_alive()
|
|
and status == STATUS_RUNNING
|
|
)
|
|
|
|
return {"exists": True, "status": status, "error": error, "is_running": is_running}
|
|
|
|
|
|
def get_all_custom_bot_statuses() -> Dict[str, Dict]:
|
|
"""
|
|
Get the status of all custom bots.
|
|
|
|
Returns:
|
|
Dict mapping user_id to status information
|
|
"""
|
|
result = {}
|
|
for user_id in custom_bots:
|
|
result[user_id] = get_custom_bot_status(user_id)
|
|
return result
|
|
|
|
|
|
def list_custom_bots() -> List[Dict]:
|
|
"""
|
|
List all custom bot instances and their status.
|
|
|
|
Returns:
|
|
List of dictionaries containing bot information
|
|
"""
|
|
bots = []
|
|
for user_id in custom_bots.keys():
|
|
bot_info = get_custom_bot_status(user_id)
|
|
bots.append(bot_info)
|
|
return bots
|
|
|
|
|
|
def cleanup_all_custom_bots() -> None:
|
|
"""
|
|
Clean up all custom bot instances. Should be called when the main bot shuts down.
|
|
"""
|
|
log.info("Cleaning up all custom bots...")
|
|
|
|
# Stop all running bots
|
|
for user_id in list(custom_bots.keys()):
|
|
try:
|
|
if custom_bot_status.get(user_id) == STATUS_RUNNING:
|
|
log.info(f"Stopping custom bot for user {user_id}")
|
|
stop_custom_bot(user_id)
|
|
except Exception as e:
|
|
log.error(f"Error stopping custom bot for user {user_id}: {e}")
|
|
|
|
# Wait a bit for all bots to stop
|
|
import time
|
|
|
|
time.sleep(2)
|
|
|
|
# Force cleanup any remaining threads
|
|
for user_id, thread in list(custom_bot_threads.items()):
|
|
if thread.is_alive():
|
|
log.warning(f"Force terminating thread for custom bot {user_id}")
|
|
# Note: We can't force kill threads in Python, but we can clean up references
|
|
try:
|
|
del custom_bot_threads[user_id]
|
|
except KeyError:
|
|
pass
|
|
|
|
log.info("Custom bot cleanup completed")
|