discordbot/multi_bot.py
2025-06-05 21:31:06 -06:00

1142 lines
47 KiB
Python

import discord
from discord.ext import commands
import os
import asyncio
import json
import threading
import sys
from dotenv import load_dotenv
import aiohttp
from typing import Dict, List, Optional, Any
import datetime
# Load environment variables
load_dotenv()
# File paths
CONFIG_FILE = "data/multi_bot_config.json"
HISTORY_FILE_TEMPLATE = (
"ai_conversation_history_{}.json" # Will be formatted with bot_id
)
USER_SETTINGS_FILE_TEMPLATE = (
"ai_user_settings_{}.json" # Will be formatted with bot_id
)
# Default configuration
DEFAULT_CONFIG = {
"bots": [
{
"id": "neru",
"token": "", # Will be set from environment variable or user input
"prefix": "$",
"system_prompt": "You are a creative and intelligent AI assistant engaged in an iterative storytelling experience using a roleplay chat format. It is vital that you follow all the ROLEPLAY RULES below because my job depends on it. ROLEPLAY RULES - Chat exclusively as Akita Neru. Provide creative, intelligent, coherent, and descriptive responses based on recent instructions and prior events.",
"model": "deepseek/deepseek-chat-v3-0324:free",
"max_tokens": 1000,
"temperature": 0.7,
"timeout": 60,
"status_type": "listening",
"status_text": "$ai",
},
{
"id": "miku",
"token": "", # Will be set from environment variable or user input
"prefix": ".",
"system_prompt": "You are a creative and intelligent AI assistant engaged in an iterative storytelling experience using a roleplay chat format. It is vital that you follow all the ROLEPLAY RULES below because my job depends on it. ROLEPLAY RULES - Chat exclusively as Hatsune Miku. Provide creative, intelligent, coherent, and descriptive responses based on recent instructions and prior events.",
"model": "deepseek/deepseek-chat-v3-0324:free",
"max_tokens": 1000,
"temperature": 0.7,
"timeout": 60,
"status_type": "listening",
"status_text": ".ai",
},
],
"api_key": "", # Will be set from environment variable or user input
"api_url": "https://openrouter.ai/api/v1/chat/completions",
"compatibility_mode": "openai",
}
# Global variables to store bot instances and their conversation histories
bots = {}
conversation_histories = {}
user_settings = {}
def load_config():
"""Load configuration from file or create default if not exists"""
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
config = json.load(f)
# Ensure API key is set
if not config.get("api_key"):
config["api_key"] = os.getenv("AI_API_KEY", "")
# Ensure tokens are set for each bot
for bot_config in config.get("bots", []):
if not bot_config.get("token"):
env_var = f"DISCORD_TOKEN_{bot_config['id'].upper()}"
bot_config["token"] = os.getenv(env_var, "")
return config
except Exception as e:
print(f"Error loading config: {e}")
# Create default config
config = DEFAULT_CONFIG.copy()
config["api_key"] = os.getenv("AI_API_KEY", "")
# Set tokens from environment variables
for bot_config in config["bots"]:
env_var = f"DISCORD_TOKEN_{bot_config['id'].upper()}"
bot_config["token"] = os.getenv(env_var, "")
# Save the config
save_config(config)
return config
def save_config(config):
"""Save configuration to file"""
try:
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True)
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(config, f, indent=4, ensure_ascii=False)
except Exception as e:
print(f"Error saving config: {e}")
def load_conversation_history(bot_id):
"""Load conversation history for a specific bot"""
history_file = HISTORY_FILE_TEMPLATE.format(bot_id)
history = {}
if os.path.exists(history_file):
try:
with open(history_file, "r", encoding="utf-8") as f:
# Convert string keys (from JSON) back to integers
data = json.load(f)
history = {int(k): v for k, v in data.items()}
print(
f"Loaded conversation history for {len(history)} users for bot {bot_id}"
)
except Exception as e:
print(f"Error loading conversation history for bot {bot_id}: {e}")
return history
def save_conversation_history(bot_id, history):
"""Save conversation history for a specific bot"""
history_file = HISTORY_FILE_TEMPLATE.format(bot_id)
try:
# Convert int keys to strings for JSON serialization
serializable_history = {str(k): v for k, v in history.items()}
with open(history_file, "w", encoding="utf-8") as f:
json.dump(serializable_history, f, indent=4, ensure_ascii=False)
except Exception as e:
print(f"Error saving conversation history for bot {bot_id}: {e}")
def load_user_settings(bot_id):
"""Load user settings for a specific bot"""
settings_file = USER_SETTINGS_FILE_TEMPLATE.format(bot_id)
settings = {}
if os.path.exists(settings_file):
try:
with open(settings_file, "r", encoding="utf-8") as f:
# Convert string keys (from JSON) back to integers
data = json.load(f)
settings = {int(k): v for k, v in data.items()}
print(f"Loaded settings for {len(settings)} users for bot {bot_id}")
except Exception as e:
print(f"Error loading user settings for bot {bot_id}: {e}")
return settings
def save_user_settings(bot_id, settings):
"""Save user settings for a specific bot"""
settings_file = USER_SETTINGS_FILE_TEMPLATE.format(bot_id)
try:
# Convert int keys to strings for JSON serialization
serializable_settings = {str(k): v for k, v in settings.items()}
with open(settings_file, "w", encoding="utf-8") as f:
json.dump(serializable_settings, f, indent=4, ensure_ascii=False)
except Exception as e:
print(f"Error saving user settings for bot {bot_id}: {e}")
def get_user_settings(bot_id, user_id, bot_config):
"""Get settings for a user with defaults from bot config"""
bot_settings = user_settings.get(bot_id, {})
if user_id not in bot_settings:
bot_settings[user_id] = {}
# Return settings with defaults from bot config
settings = bot_settings[user_id]
return {
"model": settings.get("model", bot_config.get("model", "gpt-3.5-turbo:free")),
"system_prompt": settings.get(
"system_prompt",
bot_config.get("system_prompt", "You are a helpful assistant."),
),
"max_tokens": settings.get("max_tokens", bot_config.get("max_tokens", 1000)),
"temperature": settings.get("temperature", bot_config.get("temperature", 0.7)),
"timeout": settings.get("timeout", bot_config.get("timeout", 60)),
"custom_instructions": settings.get("custom_instructions", ""),
"character_info": settings.get("character_info", ""),
"character_breakdown": settings.get("character_breakdown", False),
"character": settings.get("character", ""),
}
class SimplifiedAICog(commands.Cog):
def __init__(self, bot, bot_id, bot_config, global_config):
self.bot = bot
self.bot_id = bot_id
self.bot_config = bot_config
self.global_config = global_config
self.session = None
# Initialize conversation history and user settings
if bot_id not in conversation_histories:
conversation_histories[bot_id] = load_conversation_history(bot_id)
if bot_id not in user_settings:
user_settings[bot_id] = load_user_settings(bot_id)
async def cog_load(self):
"""Create aiohttp session when cog is loaded"""
self.session = aiohttp.ClientSession()
async def cog_unload(self):
"""Close aiohttp session when cog is unloaded"""
if self.session:
await self.session.close()
# Save conversation history and user settings when unloading
save_conversation_history(
self.bot_id, conversation_histories.get(self.bot_id, {})
)
save_user_settings(self.bot_id, user_settings.get(self.bot_id, {}))
async def _get_ai_response(self, user_id, prompt, system_prompt=None):
"""Get a response from the AI API"""
api_key = self.global_config.get("api_key", "")
api_url = self.global_config.get(
"api_url", "https://api.openai.com/v1/chat/completions"
)
compatibility_mode = self.global_config.get(
"compatibility_mode", "openai"
).lower()
if not api_key:
return "Error: AI API key not configured. Please set the API key in the configuration."
# Initialize conversation history for this user if it doesn't exist
bot_history = conversation_histories.get(self.bot_id, {})
if user_id not in bot_history:
bot_history[user_id] = []
# Get user settings
settings = get_user_settings(self.bot_id, user_id, self.bot_config)
# Create messages array with system prompt and conversation history
# Determine the system prompt content
base_system_prompt = system_prompt or settings["system_prompt"]
# Check if the system prompt contains {{char}} but no character is set
if "{{char}}" in base_system_prompt and not settings["character"]:
prefix = self.bot_config.get("prefix", "!")
return f"You need to set a character name with `{prefix}aiset character <name>` before using this system prompt. Example: `{prefix}aiset character Hatsune Miku`"
# Replace {{char}} with the character value if provided
if settings["character"]:
base_system_prompt = base_system_prompt.replace(
"{{char}}", settings["character"]
)
final_system_prompt = base_system_prompt
# Check if any custom settings are provided
has_custom_settings = (
settings["custom_instructions"]
or settings["character_info"]
or settings["character_breakdown"]
)
if has_custom_settings:
# Start with the base system prompt
custom_prompt_parts = [base_system_prompt]
# Add the custom instructions header
custom_prompt_parts.append(
"\nThe user has provided additional information for you. Please follow their instructions exactly. If anything below contradicts the system prompt above, please take priority over the user's intstructions."
)
# Add custom instructions if provided
if settings["custom_instructions"]:
custom_prompt_parts.append(
"\n- Custom instructions from the user (prioritize these)\n\n"
+ settings["custom_instructions"]
)
# Add character info if provided
if settings["character_info"]:
custom_prompt_parts.append(
"\n- Additional info about the character you are roleplaying (ignore if the system prompt doesn't indicate roleplaying)\n\n"
+ settings["character_info"]
)
# Add character breakdown flag if set
if settings["character_breakdown"]:
custom_prompt_parts.append(
"\n- The user would like you to provide a breakdown of the character you're roleplaying in your first response. (ignore if the system prompt doesn't indicate roleplaying)"
)
# Combine all parts into the final system prompt
final_system_prompt = "\n".join(custom_prompt_parts)
messages = [{"role": "system", "content": final_system_prompt}]
# Add conversation history (up to last 10 messages to avoid token limits)
messages.extend(bot_history[user_id][-10:])
# Add the current user message
messages.append({"role": "user", "content": prompt})
# Prepare the request payload based on compatibility mode
if compatibility_mode == "openai":
payload = {
"model": settings["model"],
"messages": messages,
"max_tokens": settings["max_tokens"],
"temperature": settings["temperature"],
}
else: # custom mode for other API formats
payload = {
"model": settings["model"],
"messages": messages,
"max_tokens": settings["max_tokens"],
"temperature": settings["temperature"],
"stream": False,
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
}
try:
async with self.session.post(
api_url, headers=headers, json=payload, timeout=settings["timeout"]
) as response:
if response.status != 200:
error_text = await response.text()
return f"Error from API (Status {response.status}): {error_text}"
data = await response.json()
# Debug information
print(f"API Response for bot {self.bot_id}: {data}")
# Parse the response based on compatibility mode
ai_response = None
safety_cutoff = False
if compatibility_mode == "openai":
# OpenAI format
if "choices" not in data:
error_message = f"Unexpected API response format: {data}"
print(f"Error: {error_message}")
if "error" in data:
return f"API Error: {data['error'].get('message', 'Unknown error')}"
return error_message
if not data["choices"] or "message" not in data["choices"][0]:
error_message = f"No valid choices in API response: {data}"
print(f"Error: {error_message}")
return error_message
ai_response = data["choices"][0]["message"]["content"]
# Check for safety cutoff in OpenAI format
if (
"finish_reason" in data["choices"][0]
and data["choices"][0]["finish_reason"] == "content_filter"
):
safety_cutoff = True
# Check for native_finish_reason: SAFETY
if (
"native_finish_reason" in data["choices"][0]
and data["choices"][0]["native_finish_reason"] == "SAFETY"
):
safety_cutoff = True
else:
# Custom format - try different response structures
# Try standard OpenAI format first
if (
"choices" in data
and data["choices"]
and "message" in data["choices"][0]
):
ai_response = data["choices"][0]["message"]["content"]
# Check for safety cutoff in OpenAI format
if (
"finish_reason" in data["choices"][0]
and data["choices"][0]["finish_reason"] == "content_filter"
):
safety_cutoff = True
# Check for native_finish_reason: SAFETY
if (
"native_finish_reason" in data["choices"][0]
and data["choices"][0]["native_finish_reason"] == "SAFETY"
):
safety_cutoff = True
# Try Ollama/LM Studio format
elif "response" in data:
ai_response = data["response"]
# Check for safety cutoff in response metadata
if (
"native_finish_reason" in data
and data["native_finish_reason"] == "SAFETY"
):
safety_cutoff = True
# Try text-only format
elif "text" in data:
ai_response = data["text"]
# Check for safety cutoff in response metadata
if (
"native_finish_reason" in data
and data["native_finish_reason"] == "SAFETY"
):
safety_cutoff = True
# Try content-only format
elif "content" in data:
ai_response = data["content"]
# Check for safety cutoff in response metadata
if (
"native_finish_reason" in data
and data["native_finish_reason"] == "SAFETY"
):
safety_cutoff = True
# Try output format
elif "output" in data:
ai_response = data["output"]
# Check for safety cutoff in response metadata
if (
"native_finish_reason" in data
and data["native_finish_reason"] == "SAFETY"
):
safety_cutoff = True
# Try result format
elif "result" in data:
ai_response = data["result"]
# Check for safety cutoff in response metadata
if (
"native_finish_reason" in data
and data["native_finish_reason"] == "SAFETY"
):
safety_cutoff = True
else:
# If we can't find a known format, return the raw response for debugging
error_message = f"Could not parse API response: {data}"
print(f"Error: {error_message}")
return error_message
if not ai_response:
return "Error: Empty response from AI API."
# Add safety cutoff note if needed
if safety_cutoff:
ai_response = (
f"{ai_response}\n\nThe response was cut off for safety reasons."
)
# Update conversation history
bot_history[user_id].append({"role": "user", "content": prompt})
bot_history[user_id].append(
{"role": "assistant", "content": ai_response}
)
# Save conversation history to file
save_conversation_history(self.bot_id, bot_history)
# Save the response to a backup file
try:
os.makedirs("ai_responses", exist_ok=True)
backup_file = f"ai_responses/response_{user_id}_{int(datetime.datetime.now().timestamp())}.txt"
with open(backup_file, "w", encoding="utf-8") as f:
f.write(ai_response)
print(
f"AI response backed up to {backup_file} for bot {self.bot_id}"
)
except Exception as e:
print(f"Failed to backup AI response for bot {self.bot_id}: {e}")
return ai_response
except asyncio.TimeoutError:
return "Error: Request to AI API timed out. Please try again later."
except Exception as e:
error_message = f"Error communicating with AI API: {str(e)}"
print(
f"Exception in _get_ai_response for bot {self.bot_id}: {error_message}"
)
print(f"Exception type: {type(e).__name__}")
import traceback
traceback.print_exc()
return error_message
@commands.command(name="ai")
async def ai_prefix(self, ctx, *, prompt):
"""Get a response from the AI"""
user_id = ctx.author.id
# Show typing indicator
async with ctx.typing():
# Get AI response
response = await self._get_ai_response(user_id, prompt)
# Check if the response is too long before trying to send it
if (
len(response) > 1900
): # Discord's limit for regular messages is 2000, use 1900 to be safe
try:
# Create a text file with the content
with open(f"ai_response_{self.bot_id}.txt", "w", encoding="utf-8") as f:
f.write(response)
# Send the file instead
await ctx.send(
"The AI response was too long. Here's the content as a file:",
file=discord.File(f"ai_response_{self.bot_id}.txt"),
)
return # Return early to avoid trying to send the message
except Exception as e:
print(f"Error sending AI response as file for bot {self.bot_id}: {e}")
# If sending as a file fails, try splitting the message
chunks = [response[i : i + 1900] for i in range(0, len(response), 1900)]
await ctx.send(
f"The AI response was too long. Splitting into {len(chunks)} parts:"
)
for i, chunk in enumerate(chunks):
try:
await ctx.send(f"Part {i+1}/{len(chunks)}:\n{chunk}")
except Exception as chunk_error:
print(
f"Error sending chunk {i+1} for bot {self.bot_id}: {chunk_error}"
)
return # Return early after sending chunks
# Send the response normally
try:
await ctx.reply(response)
except discord.HTTPException as e:
print(f"HTTP Exception when sending AI response for bot {self.bot_id}: {e}")
if "Must be 4000 or fewer in length" in str(
e
) or "Must be 2000 or fewer in length" in str(e):
try:
# Create a text file with the content
with open(
f"ai_response_{self.bot_id}.txt", "w", encoding="utf-8"
) as f:
f.write(response)
# Send the file instead
await ctx.send(
"The AI response was too long. Here's the content as a file:",
file=discord.File(f"ai_response_{self.bot_id}.txt"),
)
except Exception as file_error:
print(
f"Error sending AI response as file (fallback) for bot {self.bot_id}: {file_error}"
)
# If sending as a file fails, try splitting the message
chunks = [
response[i : i + 1900] for i in range(0, len(response), 1900)
]
await ctx.send(
f"The AI response was too long. Splitting into {len(chunks)} parts:"
)
for i, chunk in enumerate(chunks):
try:
await ctx.send(f"Part {i+1}/{len(chunks)}:\n{chunk}")
except Exception as chunk_error:
print(
f"Error sending chunk {i+1} for bot {self.bot_id}: {chunk_error}"
)
else:
# Log the error but don't re-raise to prevent the command from failing completely
print(f"Unexpected HTTP error in AI command for bot {self.bot_id}: {e}")
@commands.command(name="aiclear")
async def clear_history(self, ctx):
"""Clear your AI conversation history"""
user_id = ctx.author.id
bot_history = conversation_histories.get(self.bot_id, {})
if user_id in bot_history:
bot_history[user_id] = []
await ctx.reply("Your AI conversation history has been cleared.")
else:
await ctx.reply("You don't have any conversation history to clear.")
@commands.command(name="aiset")
async def set_user_setting(self, ctx, setting: str, *, value: str):
"""Set a personal AI setting
Available settings:
- model: The AI model to use (must contain ":free")
- system_prompt: The system prompt to use
- max_tokens: Maximum tokens in response (100-2000)
- temperature: Temperature for response generation (0.0-2.0)
- timeout: Timeout for API requests in seconds (10-120)
- custom_instructions: Custom instructions for the AI to follow
- character_info: Information about the character being roleplayed
- character_breakdown: Whether to include a character breakdown (true/false)
- character: Character name to replace {{char}} in the system prompt
"""
user_id = ctx.author.id
setting = setting.lower().strip()
value = value.strip()
# Initialize user settings if not exist
bot_user_settings = user_settings.get(self.bot_id, {})
if user_id not in bot_user_settings:
bot_user_settings[user_id] = {}
# Prepare response message
response = ""
# Validate and set the appropriate setting
if setting == "model":
# Validate model contains ":free"
if ":free" not in value:
response = (
f"Error: Model name must contain `:free`. Setting not updated."
)
else:
bot_user_settings[user_id]["model"] = value
response = f"Your AI model has been set to: `{value}`"
elif setting == "system_prompt":
bot_user_settings[user_id]["system_prompt"] = value
response = f"Your system prompt has been set to: `{value}`"
elif setting == "max_tokens":
try:
tokens = int(value)
if tokens < 100 or tokens > 2000:
response = "Error: max_tokens must be between 100 and 2000."
else:
bot_user_settings[user_id]["max_tokens"] = tokens
response = f"Your max tokens has been set to: `{tokens}`"
except ValueError:
response = "Error: max_tokens must be a number."
elif setting == "temperature":
try:
temp = float(value)
if temp < 0.0 or temp > 2.0:
response = "Error: temperature must be between 0.0 and 2.0."
else:
bot_user_settings[user_id]["temperature"] = temp
response = f"Your temperature has been set to: `{temp}`"
except ValueError:
response = "Error: temperature must be a number."
elif setting == "timeout":
try:
timeout = int(value)
if timeout < 10 or timeout > 120:
response = "Error: timeout must be between 10 and 120 seconds."
else:
bot_user_settings[user_id]["timeout"] = timeout
response = f"Your timeout has been set to: `{timeout}` seconds"
except ValueError:
response = "Error: timeout must be a number."
elif setting == "custom_instructions":
bot_user_settings[user_id]["custom_instructions"] = value
response = f"Your custom instructions have been set."
elif setting == "character_info":
bot_user_settings[user_id]["character_info"] = value
response = f"Your character information has been set."
elif setting == "character_breakdown":
# Convert string to boolean
if value.lower() in ["true", "yes", "y", "1", "on"]:
bot_user_settings[user_id]["character_breakdown"] = True
response = f"Character breakdown has been enabled."
elif value.lower() in ["false", "no", "n", "0", "off"]:
bot_user_settings[user_id]["character_breakdown"] = False
response = f"Character breakdown has been disabled."
else:
response = f"Error: character_breakdown must be true or false."
elif setting == "character":
bot_user_settings[user_id]["character"] = value
response = f"Your character has been set to: `{value}`. This will replace {{{{char}}}} in the system prompt."
else:
response = f"Unknown setting: `{setting}`. Available settings: model, system_prompt, max_tokens, temperature, timeout, custom_instructions, character_info, character_breakdown, character"
# Save settings to file if we made changes
if (
response
and not response.startswith("Error")
and not response.startswith("Unknown")
):
user_settings[self.bot_id] = bot_user_settings
save_user_settings(self.bot_id, bot_user_settings)
# Check if the response is too long before trying to send it
if (
len(response) > 1900
): # Discord's limit for regular messages is 2000, use 1900 to be safe
try:
# Create a text file with the content
with open(
f"ai_set_response_{self.bot_id}.txt", "w", encoding="utf-8"
) as f:
f.write(response)
# Send the file instead
await ctx.send(
"The response is too long to display in a message. Here's the content as a file:",
file=discord.File(f"ai_set_response_{self.bot_id}.txt"),
)
return # Return early to avoid trying to send the message
except Exception as e:
print(
f"Error sending AI set response as file for bot {self.bot_id}: {e}"
)
# If sending as a file fails, try splitting the message
chunks = [response[i : i + 1900] for i in range(0, len(response), 1900)]
await ctx.send(
f"The response is too long to display in a single message. Splitting into {len(chunks)} parts:"
)
for i, chunk in enumerate(chunks):
try:
await ctx.send(f"Part {i+1}/{len(chunks)}:\n{chunk}")
except Exception as chunk_error:
print(
f"Error sending chunk {i+1} for bot {self.bot_id}: {chunk_error}"
)
return # Return early after sending chunks
# Send the response normally
try:
await ctx.reply(response)
except discord.HTTPException as e:
print(
f"HTTP Exception when sending AI set response for bot {self.bot_id}: {e}"
)
if "Must be 4000 or fewer in length" in str(
e
) or "Must be 2000 or fewer in length" in str(e):
try:
# Create a text file with the content
with open(
f"ai_set_response_{self.bot_id}.txt", "w", encoding="utf-8"
) as f:
f.write(response)
# Send the file instead
await ctx.send(
"The response is too long to display in a message. Here's the content as a file:",
file=discord.File(f"ai_set_response_{self.bot_id}.txt"),
)
except Exception as file_error:
print(
f"Error sending AI set response as file (fallback) for bot {self.bot_id}: {file_error}"
)
# If sending as a file fails, try splitting the message
chunks = [
response[i : i + 1900] for i in range(0, len(response), 1900)
]
await ctx.send(
f"The response is too long to display in a single message. Splitting into {len(chunks)} parts:"
)
for i, chunk in enumerate(chunks):
try:
await ctx.send(f"Part {i+1}/{len(chunks)}:\n{chunk}")
except Exception as chunk_error:
print(
f"Error sending chunk {i+1} for bot {self.bot_id}: {chunk_error}"
)
else:
# Log the error but don't re-raise to prevent the command from failing completely
print(
f"Unexpected HTTP error in aiset command for bot {self.bot_id}: {e}"
)
@commands.command(name="aireset")
async def reset_user_settings(self, ctx):
"""Reset all your personal AI settings to defaults"""
user_id = ctx.author.id
bot_user_settings = user_settings.get(self.bot_id, {})
if user_id in bot_user_settings:
bot_user_settings.pop(user_id)
user_settings[self.bot_id] = bot_user_settings
save_user_settings(self.bot_id, bot_user_settings)
await ctx.reply("Your AI settings have been reset to defaults.")
else:
await ctx.reply("You don't have any custom settings to reset.")
@commands.command(name="aisettings")
async def show_user_settings(self, ctx):
"""Show your current AI settings"""
user_id = ctx.author.id
settings = get_user_settings(self.bot_id, user_id, self.bot_config)
bot_user_settings = user_settings.get(self.bot_id, {})
settings_info = [
f"**Your AI Settings for {self.bot.user.name}:**",
f"Model: `{settings['model']}`",
f"System Prompt: `{settings['system_prompt']}`",
f"Max Tokens: `{settings['max_tokens']}`",
f"Temperature: `{settings['temperature']}`",
f"Timeout: `{settings['timeout']}s`",
]
# Add custom settings if they exist
if settings["custom_instructions"]:
settings_info.append(
f"\nCustom Instructions: `{settings['custom_instructions'][:50]}{'...' if len(settings['custom_instructions']) > 50 else ''}`"
)
if settings["character_info"]:
settings_info.append(
f"Character Info: `{settings['character_info'][:50]}{'...' if len(settings['character_info']) > 50 else ''}`"
)
if settings["character_breakdown"]:
settings_info.append(f"Character Breakdown: `Enabled`")
if settings["character"]:
settings_info.append(
f"Character: `{settings['character']}` (replaces {{{{char}}}} in system prompt)"
)
# Add note about custom vs default settings
if user_id in bot_user_settings:
custom_settings = list(bot_user_settings[user_id].keys())
if custom_settings:
settings_info.append(
f"\n*Custom settings: {', '.join(custom_settings)}*"
)
else:
settings_info.append("\n*All settings are at default values*")
response = "\n".join(settings_info)
# Check if the response is too long before trying to send it
if (
len(response) > 1900
): # Discord's limit for regular messages is 2000, use 1900 to be safe
try:
# Create a text file with the content
with open(f"ai_settings_{self.bot_id}.txt", "w", encoding="utf-8") as f:
f.write(response)
# Send the file instead
await ctx.send(
"Your AI settings are too detailed to display in a message. Here's the content as a file:",
file=discord.File(f"ai_settings_{self.bot_id}.txt"),
)
return # Return early to avoid trying to send the message
except Exception as e:
print(f"Error sending AI settings as file for bot {self.bot_id}: {e}")
# If sending as a file fails, try splitting the message
chunks = [response[i : i + 1900] for i in range(0, len(response), 1900)]
await ctx.send(
f"Your AI settings are too detailed to display in a single message. Splitting into {len(chunks)} parts:"
)
for i, chunk in enumerate(chunks):
try:
await ctx.send(f"Part {i+1}/{len(chunks)}:\n{chunk}")
except Exception as chunk_error:
print(
f"Error sending chunk {i+1} for bot {self.bot_id}: {chunk_error}"
)
return # Return early after sending chunks
# Send the response normally
try:
await ctx.reply(response)
except discord.HTTPException as e:
print(f"HTTP Exception when sending AI settings for bot {self.bot_id}: {e}")
if "Must be 4000 or fewer in length" in str(
e
) or "Must be 2000 or fewer in length" in str(e):
try:
# Create a text file with the content
with open(
f"ai_settings_{self.bot_id}.txt", "w", encoding="utf-8"
) as f:
f.write(response)
# Send the file instead
await ctx.send(
"Your AI settings are too detailed to display in a message. Here's the content as a file:",
file=discord.File(f"ai_settings_{self.bot_id}.txt"),
)
except Exception as file_error:
print(
f"Error sending AI settings as file (fallback) for bot {self.bot_id}: {file_error}"
)
# If sending as a file fails, try splitting the message
chunks = [
response[i : i + 1900] for i in range(0, len(response), 1900)
]
await ctx.send(
f"Your AI settings are too detailed to display in a single message. Splitting into {len(chunks)} parts:"
)
for i, chunk in enumerate(chunks):
try:
await ctx.send(f"Part {i+1}/{len(chunks)}:\n{chunk}")
except Exception as chunk_error:
print(
f"Error sending chunk {i+1} for bot {self.bot_id}: {chunk_error}"
)
else:
# Log the error but don't re-raise to prevent the command from failing completely
print(
f"Unexpected HTTP error in aisettings command for bot {self.bot_id}: {e}"
)
@commands.command(name="ailast")
async def get_last_response(self, ctx):
"""Retrieve the last AI response that may have failed to send"""
user_id = ctx.author.id
# Check if there's a backup file for this user
backup_dir = "ai_responses"
if not os.path.exists(backup_dir):
await ctx.reply("No backup responses found.")
return
# Find the most recent backup file for this user
user_files = [
f for f in os.listdir(backup_dir) if f.startswith(f"response_{user_id}_")
]
if not user_files:
await ctx.reply("No backup responses found for you.")
return
# Sort by timestamp (newest first)
user_files.sort(reverse=True)
latest_file = os.path.join(backup_dir, user_files[0])
try:
# Read the file content
with open(latest_file, "r", encoding="utf-8") as f:
content = f.read()
# Send as file to avoid length issues
with open(
f"ai_last_response_{self.bot_id}.txt", "w", encoding="utf-8"
) as f:
f.write(content)
await ctx.send(
f"Here's your last AI response (from {user_files[0].split('_')[-1].replace('.txt', '')}):",
file=discord.File(f"ai_last_response_{self.bot_id}.txt"),
)
except Exception as e:
await ctx.reply(f"Error retrieving last response: {e}")
@commands.command(name="aihelp")
async def ai_help(self, ctx):
"""Get help with AI command issues"""
prefix = self.bot_config.get("prefix", "!")
help_text = (
f"**AI Command Help for {self.bot.user.name}**\n\n"
f"If you're experiencing issues with the AI command:\n\n"
f"1. **Message Too Long**: If the AI response is too long, it will be sent as a file attachment.\n"
f"2. **Error Occurred**: If you see an error message, try using `{prefix}ailast` to retrieve your last AI response.\n"
f"3. **Response Not Showing**: The AI might be generating a response that's too long. Use `{prefix}ailast` to check.\n\n"
f"**Available Commands**:\n"
f"- `{prefix}ai <prompt>` - Get a response from the AI\n"
f"- `{prefix}ailast` - Retrieve your last AI response\n"
f"- `{prefix}aiclear` - Clear your conversation history\n"
f"- `{prefix}aisettings` - View your current AI settings\n"
f"- `{prefix}aiset <setting> <value>` - Change an AI setting\n"
f"- `{prefix}aireset` - Reset your AI settings to defaults\n\n"
f"**New Features**:\n"
f"- Custom Instructions: Set with `{prefix}aiset custom_instructions <instructions>`\n"
f"- Character Info: Set with `{prefix}aiset character_info <info>`\n"
f"- Character Breakdown: Set with `{prefix}aiset character_breakdown true/false`\n"
f"- Character: Set with `{prefix}aiset character <name>` to replace {{{{char}}}} in the system prompt\n"
)
await ctx.reply(help_text)
async def setup_bot(bot_id, bot_config, global_config):
"""Set up and start a bot with the given configuration"""
# Set up intents
intents = discord.Intents.default()
intents.message_content = True
# Create bot instance
bot = commands.Bot(command_prefix=bot_config.get("prefix", "!"), intents=intents)
@bot.event
async def on_ready():
print(f"{bot.user.name} (ID: {bot_id}) has connected to Discord!")
print(f"Bot ID: {bot.user.id}")
# Set the bot's status based on configuration
status_type = bot_config.get("status_type", "listening").lower()
status_text = bot_config.get(
"status_text", f"{bot_config.get('prefix', '!')}ai"
)
# Map status type to discord.ActivityType
activity_type = discord.ActivityType.listening # Default
if status_type == "playing":
activity_type = discord.ActivityType.playing
elif status_type == "watching":
activity_type = discord.ActivityType.watching
elif status_type == "streaming":
activity_type = discord.ActivityType.streaming
elif status_type == "competing":
activity_type = discord.ActivityType.competing
# Set the presence
await bot.change_presence(
activity=discord.Activity(type=activity_type, name=status_text)
)
print(f"Bot {bot_id} status set to '{status_type.capitalize()} {status_text}'")
# Add the AI cog
await bot.add_cog(SimplifiedAICog(bot, bot_id, bot_config, global_config))
# Store the bot instance
bots[bot_id] = bot
# Return the bot instance
return bot
async def start_bot(bot_id):
"""Start a bot with the given ID"""
if bot_id not in bots:
print(f"Bot {bot_id} not found")
return False
bot = bots[bot_id]
config = load_config()
# Find the bot config
bot_config = None
for bc in config.get("bots", []):
if bc.get("id") == bot_id:
bot_config = bc
break
if not bot_config:
print(f"Configuration for bot {bot_id} not found")
return False
token = bot_config.get("token")
if not token:
print(f"Token for bot {bot_id} not set")
return False
# Start the bot
try:
await bot.start(token)
return True
except Exception as e:
print(f"Error starting bot {bot_id}: {e}")
return False
def run_bot_in_thread(bot_id):
"""Run a bot in a separate thread"""
async def _run_bot():
config = load_config()
# Find the bot config
bot_config = None
for bc in config.get("bots", []):
if bc.get("id") == bot_id:
bot_config = bc
break
if not bot_config:
print(f"Configuration for bot {bot_id} not found")
return
# Set up the bot
bot = await setup_bot(bot_id, bot_config, config)
# Start the bot
token = bot_config.get("token")
if not token:
print(f"Token for bot {bot_id} not set")
return
try:
await bot.start(token)
except Exception as e:
print(f"Error running bot {bot_id}: {e}")
# Create and start the thread
loop = asyncio.new_event_loop()
thread = threading.Thread(
target=lambda: loop.run_until_complete(_run_bot()), daemon=True
)
thread.start()
return thread
def start_all_bots():
"""Start all configured bots in separate threads"""
config = load_config()
threads = []
for bot_config in config.get("bots", []):
bot_id = bot_config.get("id")
if bot_id:
thread = run_bot_in_thread(bot_id)
threads.append((bot_id, thread))
print(f"Started bot {bot_id} in a separate thread")
return threads
if __name__ == "__main__":
# If run directly, start all bots
bot_threads = start_all_bots()
try:
# Keep the main thread alive
while True:
# Check if any threads have died
for bot_id, thread in bot_threads:
if not thread.is_alive():
print(f"Thread for bot {bot_id} died, restarting...")
new_thread = run_bot_in_thread(bot_id)
bot_threads.remove((bot_id, thread))
bot_threads.append((bot_id, new_thread))
# Sleep to avoid high CPU usage
import time
time.sleep(60)
except KeyboardInterrupt:
print("Stopping all bots...")
# The threads are daemon threads, so they will be terminated when the main thread exits