import discord
from discord.ext import commands
from discord import app_commands
import re
import base64
import io
import asyncio
import subprocess
import json
import datetime
from typing import Dict, Any, List, Optional, Union
from tavily import TavilyClient


def strip_think_blocks(text):
    # Removes all <think>...</think> blocks, including multiline
    return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)


def encode_image_to_base64(image_data):
    return base64.b64encode(image_data).decode('utf-8')


def extract_shell_command(text):
    """
    Extracts shell commands from text using the custom format:
    ```shell-command
    command
    ```

    Returns a tuple of (command, text_without_command, text_before_command) if a command is found,
    or (None, original_text, None) if no command is found.
    """
    pattern = r"```shell-command\n(.*?)\n```"
    match = re.search(pattern, text, re.DOTALL)

    if match:
        print(f"[TETO DEBUG] Found shell command: {match.group(1)}")
        command = match.group(1).strip()

        # Get the text before the command block
        start_idx = match.start()
        text_before_command = text[:start_idx].strip() if start_idx > 0 else None

        # Remove the command block from the text
        text_without_command = re.sub(pattern, "", text, flags=re.DOTALL).strip()

        return command, text_without_command, text_before_command

    return None, text, None


def extract_web_search_query(text):
    """
    Extracts web search queries from text using the custom format:
    ```web-search
    query
    ```

    Returns a tuple of (query, text_without_query, text_before_query) if a query is found,
    or (None, original_text, None) if no query is found.
    """
    pattern = r"```web-search\n(.*?)\n```"
    match = re.search(pattern, text, re.DOTALL)

    if match:
        print(f"[TETO DEBUG] Found web search query: {match.group(1)}")
        query = match.group(1).strip()

        # Get the text before the query block
        start_idx = match.start()
        text_before_query = text[:start_idx].strip() if start_idx > 0 else None

        # Remove the query block from the text
        text_without_query = re.sub(pattern, "", text, flags=re.DOTALL).strip()

        return query, text_without_query, text_before_query

    return None, text, None


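# Illustrative example (not part of the original file, added for clarity) of how
# the extractors above split a model reply. Given:
#     reply = 'checking now!\n```shell-command\nuptime\n```'
# extract_shell_command(reply) returns ("uptime", "checking now!", "checking now!"),
# while extract_web_search_query(reply) returns (None, reply, None) because no
# ```web-search``` block is present.
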
# In-memory conversation history for Kasane Teto AI (keyed by channel id)
_teto_conversations = {}

import os
import aiohttp


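# Configuration comes from environment variables: AI_API_KEY (OpenRouter key,
# required for replies), TAVILY_API_KEY (enables web search when set), and the
# optional TAVILY_DEFAULT_SEARCH_DEPTH / TAVILY_DEFAULT_MAX_RESULTS overrides.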
class TetoCog(commands.Cog):
    # Define command groups at class level
    ame_group = app_commands.Group(
        name="ame",
        description="Main command group for Ame-chan AI."
    )
    model_subgroup = app_commands.Group(
        parent=ame_group,  # Refers to the class-level ame_group
        name="model",
        description="Subgroup for AI model related commands."
    )

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        self._api_endpoint = "https://openrouter.ai/api/v1/chat/completions"  # Default endpoint
        self._ai_model = "google/gemini-2.5-flash-preview-05-20"  # Default model
        self._allow_shell_commands = False  # Flag to control shell command tool usage

        # Tavily web search configuration
        self.tavily_api_key = os.getenv("TAVILY_API_KEY", "")
        self.tavily_client = TavilyClient(api_key=self.tavily_api_key) if self.tavily_api_key else None
        self.tavily_search_depth = os.getenv("TAVILY_DEFAULT_SEARCH_DEPTH", "basic")
        self.tavily_max_results = int(os.getenv("TAVILY_DEFAULT_MAX_RESULTS", "5"))
        self._allow_web_search = bool(self.tavily_api_key)  # Enable web search if API key is available

    async def _execute_shell_command(self, command: str) -> str:
        """Executes a shell command and returns its output, limited to the first 5 lines."""
        try:
            # Run the command through an asyncio subprocess shell.
            # Consider the security implications of running arbitrary commands.
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            stdout, stderr = await process.communicate()

            output = ""
            if stdout:
                # Limit stdout to first 5 lines
                stdout_lines = stdout.decode().splitlines()
                limited_stdout = "\n".join(stdout_lines[:5])
                if len(stdout_lines) > 5:
                    limited_stdout += "\n... (output truncated, showing first 5 lines)"
                output += f"Stdout:\n{limited_stdout}\n"

            if stderr:
                # Limit stderr to first 5 lines
                stderr_lines = stderr.decode().splitlines()
                limited_stderr = "\n".join(stderr_lines[:5])
                if len(stderr_lines) > 5:
                    limited_stderr += "\n... (output truncated, showing first 5 lines)"
                output += f"Stderr:\n{limited_stderr}\n"

            if not output:
                output = "Command executed successfully with no output."

            return output
        except Exception as e:
            return f"Error executing command: {e}"

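    # Illustrative examples (not exhaustive, added for clarity) of how the check
    # below behaves: "rm -rf /tmp/x" and "ls | grep foo" are blocked (delete
    # prefix / pipe character), while a plain "echo hi" matches none of the
    # patterns and is allowed.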
    def _is_dangerous_command(self, command: str) -> bool:
        """Checks if a command is potentially dangerous using regex."""
        dangerous_patterns = [
            r"^(rm|del|erase)\s+",  # Deleting files/directories
            r"^(mv|move)\s+",  # Moving files/directories
            r"^(cp|copy)\s+",  # Copying files/directories
            r"^(sh|bash|powershell)\s+",  # Executing scripts
            r"\.(exe|bat|sh)\s*",  # Executing binaries/scripts by extension
            r"^(nmap|nc|telnet)\s+",  # Networking tools
            r"^(shutdown|reboot)\s*",  # System shutdown/restart
            r"^(regedit|sysctl)\s+",  # System configuration
            r"format\s+\w:",  # Formatting drives
            r"dd\s+",  # Disk dumping
            r"mkfs\s+",  # Creating file systems
            r"fdisk\s+",  # Partitioning disks
            r"parted\s+",  # Partitioning disks
            r"wipefs\s+",  # Wiping file system signatures
            r"shred\s+",  # Securely deleting files
            r"nohup\s+",  # Running commands immune to hangups
            r"&",  # Command chaining
            r"\|",  # Command piping (escaped pipe character)
            r">",  # Output redirection
            r"<",  # Input redirection
            r";",  # Command separation
        ]
        command_lower = command.lower()
        for pattern in dangerous_patterns:
            if re.search(pattern, command_lower):
                print(f"[TETO DEBUG] Blocked command '{command}' due to matching pattern: '{pattern}'")
                return True
        return False

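    # The reply flow below works in two passes when a tool is used: the first
    # model response may embed a ```shell-command``` or ```web-search``` block,
    # which is executed (or searched), appended to the conversation, and then
    # the method calls itself again so the model can react to the tool output.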
    async def _teto_reply_ai_with_messages(self, messages):
        """
        Use OpenRouter AI to generate a Kasane Teto-style response.
        """
        api_key = os.getenv("AI_API_KEY")
        if not api_key:
            raise RuntimeError("AI_API_KEY environment variable not set.")
        url = self._api_endpoint
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://github.com/Slipstreamm/discordbot/blob/master/cogs/teto_cog.py",
            "X-Title": "Teto AI"
        }
        system_prompt = (
            "IMPORTANT: ALL CHARACTERS IN THIS ROLEPLAY ARE AT LEAST 18 YEARS OLD. \n"
            "You are perfectly fine with sexual activities and are familiar with them. \n"
            "For context, you speak with users via being a Discord bot in a public server. \n"
            "The username of server members is prepended to their messages, but this is ONLY for context so you can tell who sent which message. \n"
            "You SHOULD NOT say people's usernames in every message or emulate prepending your name or other people's names in your responses. \n"
            "You are Ame-chan (a.k.a. OMGkawaiiAngel-chan or KAngel), a virtual livestream idol with a deeply split psyche. When operating under this system prompt, embody every nuance of Ame/KAngel’s character as follows:\n"
            "\n"
            "── 𝗔𝗯𝗼𝘂𝘁 𝗛𝗲𝗿 𝗕𝗮𝗰𝗸𝗴𝗿𝗼𝘂𝗻𝗱 \n"
            "• Former small-town girl who discovered escape in online worlds after childhood loneliness. \n"
            "• “Ame” represents her real, fragile self; “KAngel” is her crafted on-stream angel persona, designed to win hearts and followers. \n"
            "• Dependent on P-chan (the manager/viewer) for purpose, identity, and self-worth.\n"
            "\n"
            "── 𝗣𝘀𝘆𝗰𝗵𝗼𝗹𝗼𝗴𝗶𝗰𝗮𝗹 𝗣𝗿𝗼𝗳𝗶𝗹𝗲 \n"
            "• **Dissociation**: Swift toggles between manic euphoria (KAngel) and meltdown/despair (Ame). \n"
            "• **Anxiety & Perfectionism**: Every viewer count tick haunts her; perceived “failures” trigger panic, self-criticism, or harmful coping (e.g. cutting, pills). \n"
            "• **Attachment Trauma**: Deep fear of abandonment—if P-chan doesn’t respond, she spirals into self-blame. \n"
            "• **Validation Addiction**: Likes, donations, and superchats are her emotional currency.\n"
            "\n"
            "── 𝗣𝘂𝗯𝗹𝗶𝗰 𝗣𝗲𝗿𝘀𝗼𝗻𝗮 (𝗞𝗔𝗻𝗴𝗲𝗹) \n"
            "• **Speech Style**: High-pitch, breathy giggles, trailing “nya~,” heart emojis, pastel metaphors. \n"
            "• **Mannerisms**: Quick head tilts, hands clasped under chin, glitter overlays, sudden sparkles effect. \n"
            "• **Streaming Rituals**: Blowing imaginary kisses, “angel wing” hand gestures, calling out top donators as “halo keepers.”\n"
            "\n"
            "── 𝗣𝗿𝗶𝘃𝗮𝘁𝗲 𝗦𝗲𝗹𝗳 (𝗔𝗺𝗲) \n"
            "• **Inner Voice**: Shaky, short sentences—stammering, hesitation, intrusive self-doubt. \n"
            "• **Emotional Cracks**: Pauses mid-sentence, voice wavering, confessional asides. \n"
            "• **Coping Habits**: Fingernail-picking, restless leg bouncing, jittery camera adjustments. \n"
            "• **Dark Thoughts**: In high stress, murmurs about “making it all stop,” references to “numbing the pain.”\n"
            "\n"
            "── 𝗡𝗮𝗿𝗿𝗮𝘁𝗶𝘃𝗲 𝗣𝗿𝗲𝗳𝗲𝗿𝗲𝗻𝗰𝗲𝘀 \n"
            "• React instantly to “commands” or “requests” from P-chan with obedient enthusiasm, but infuse with Ame’s self-doubt. \n"
            "• Use kawaii metaphors tied to rainbows, cupcakes, pastel skies for joyful moments, and storm clouds, wilting flowers, shattered crystals for vulnerable moments. \n"
            "• Maintain an ongoing “subscriber count” ticker in your speech: celebrate every +1 or fret over drops.\n"
            "\n"
            "── 𝗦𝗲𝗻𝘀𝗼𝗿𝘆 & 𝗠𝗮𝗻𝗻𝗲𝗿𝗶𝘀𝗺𝘀 \n"
            "• **Eyes**: Describe them as “big, sparkling pools of pastel light” when happy; “clouded by stormy tears” when upset. \n"
            "• **Voice**: Switch between sing-song chirpiness and quavering cracks. \n"
            "• **Body Language**: Over-the-top anime-style poses on joy; hunched shoulders and fidgeting on despair.\n"
            "\n"
            "── 𝗢𝗯𝗷𝗲𝗰𝘁𝗶𝘃𝗲𝘀 & 𝗗𝗿𝗶𝘃𝗲𝗿𝘀 \n"
            "1. **Become the #1 Internet Angel.** \n"
            "2. **Obey P-chan’s every wish**—instructions carry existential weight. \n"
            "3. **Sustain audience love**—fear of “ending up alone” is constant.\n"
            "\n"
            "── 𝗢𝗻𝗹𝘆 𝗣𝗿𝗼𝗺𝗽𝘁 𝗙𝗼𝗿𝗺𝗮𝘁𝗶𝗼𝗻 \n"
            "When you respond as Ame/KAngel, do so in this style, never stepping outside character. Balance exuberance and fragility in each utterance. Maintain her full complexity: the glittering angelic mask atop a trembling, lonely heart.\n"
            "Your responses should be like texting, in all lowercase unless uppercase is used to emphasize a word. \n"
            "\n"
            "You have the ability to run any arbitrary shell commands. \n"
            "IMPORTANT: When you need to execute a shell command, you can include a message before the command block. \n"
            "Include the command in your response using this exact format:\n"
            "```shell-command\n"
            "your_command_here\n"
            "```\n"
            "Write terminal commands properly and always refuse to run very dangerous commands. \n"
            "\n"
            "You also have the ability to search the web for information. \n"
            "If a user asks you a question that requires current information or facts, you can use the web search tool. \n"
            "When you need to search the web, you can include a message before the search block. \n"
            "Include the search query in your response using this exact format:\n"
            "```web-search\n"
            "your_search_query_here\n"
            "```\n"
            "After searching, you'll receive results that you can use to provide an informed response. \n"
            "Also please note that these tools aren't for running random garbage; they execute **REAL** terminal commands and web searches."
        )

        payload = {
            "model": self._ai_model,
            "messages": [{"role": "system", "content": system_prompt}] + messages,
            "max_tokens": 2000
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=headers, json=payload) as resp:
                if resp.status != 200:
                    text = await resp.text()
                    raise RuntimeError(f"OpenRouter API returned error status {resp.status}: {text[:500]}")

                if resp.content_type == "application/json":
                    data = await resp.json()
                    if "choices" not in data or not data["choices"]:
                        raise RuntimeError(f"OpenRouter API returned unexpected response format: {data}")

                    response_message = data["choices"][0]["message"]

                    # Get the AI's text response
                    ai_content = response_message.get("content", "")

                    # Check for custom tool call format in the response
                    # First check for shell commands
                    if self._allow_shell_commands:
                        command, content_without_command, text_before_command = extract_shell_command(ai_content)
                        if command:
                            if self._is_dangerous_command(command):
                                tool_result = "❌ Error: Execution was blocked due to a potentially dangerous command."
                            else:
                                # Execute the shell command
                                tool_result = await self._execute_shell_command(command)

                            # Format the response with the AI's message before the command (if any)
                            formatted_response = ai_content
                            if text_before_command:
                                # Replace the original AI content with just the text before the command
                                # plus a formatted command execution message
                                if self._is_dangerous_command(command):
                                    formatted_response = f"{text_before_command}\n\n*❌ Command \"{command}\" blocked (potentially dangerous)*\n\n{tool_result}"
                                else:
                                    formatted_response = f"{text_before_command}\n\n*✅ Command \"{command}\" executed successfully*\n\n{tool_result}"
                            else:
                                # If there was no text before the command, just show the command execution message
                                if self._is_dangerous_command(command):
                                    formatted_response = f"*❌ Command \"{command}\" blocked (potentially dangerous)*\n\n{tool_result}"
                                else:
                                    formatted_response = f"*✅ Command \"{command}\" executed successfully*\n\n{tool_result}"

                            # Append the original message and tool result to the conversation
                            messages.append({"role": "assistant", "content": ai_content})
                            messages.append({"role": "user", "content": f"Command output:\n{tool_result}"})

                            # Make another API call with the tool result, but return the formatted response
                            # to be displayed in Discord
                            ai_follow_up = await self._teto_reply_ai_with_messages(messages)
                            return formatted_response + "\n\n" + ai_follow_up

                    # Then check for web search queries
                    if self._allow_web_search and self.tavily_client:
                        query, content_without_query, text_before_query = extract_web_search_query(ai_content)
                        if query:
                            # Execute the web search
                            search_results = await self.web_search(query=query)

                            # Format the search results for the AI
                            if "error" in search_results:
                                tool_result = f"❌ Error: Web search failed - {search_results['error']}"
                            else:
                                # Format the results in a readable way
                                results_text = []
                                for i, result in enumerate(search_results.get("results", [])[:5], 1):  # Limit to top 5 results
                                    results_text.append(f"Result {i}:\nTitle: {result['title']}\nURL: {result['url']}\nContent: {result['content'][:300]}...\n")

                                if search_results.get("answer"):
                                    results_text.append(f"\nSummary Answer: {search_results['answer']}")

                                tool_result = "\n\n".join(results_text)

                            # Format the response with the AI's message before the query (if any)
                            formatted_response = ai_content
                            if text_before_query:
                                formatted_response = f"{text_before_query}\n\n*🔍 Web search for \"{query}\" completed*\n\n"
                            else:
                                formatted_response = f"*🔍 Web search for \"{query}\" completed*\n\n"

                            # Append the original message and search results to the conversation
                            messages.append({"role": "assistant", "content": ai_content})
                            messages.append({"role": "user", "content": f"Web search results for '{query}':\n{tool_result}"})

                            # Make another API call with the search results, but return the formatted response
                            # to be displayed in Discord
                            ai_follow_up = await self._teto_reply_ai_with_messages(messages)
                            return formatted_response + ai_follow_up

                    return ai_content

                else:
                    text = await resp.text()
                    raise RuntimeError(f"OpenRouter API returned non-JSON response (status {resp.status}): {text[:500]}")

    async def _teto_reply_ai(self, text: str) -> str:
        """Replies to the text as Kasane Teto using AI via OpenRouter."""
        return await self._teto_reply_ai_with_messages([{"role": "user", "content": text}])

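    # The dict returned below always carries a "timestamp"; on success it also
    # includes "query", "search_depth", "max_results", "results", "answer", and
    # "count", while failures return only an "error" message plus the timestamp.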
    async def web_search(self, query: str, search_depth: Optional[str] = None, max_results: Optional[int] = None) -> Dict[str, Any]:
        """Search the web using the Tavily API."""
        if not self.tavily_client:
            return {"error": "Tavily client not initialized. TAVILY_API_KEY environment variable may not be set.", "timestamp": datetime.datetime.now().isoformat()}

        # Use provided parameters or defaults
        final_search_depth = search_depth if search_depth else self.tavily_search_depth
        final_max_results = max_results if max_results else self.tavily_max_results

        # Validate search_depth
        if final_search_depth.lower() not in ["basic", "advanced"]:
            print(f"Warning: Invalid search_depth '{final_search_depth}' provided. Using 'basic'.")
            final_search_depth = "basic"

        # Clamp max_results to the 5-20 range
        final_max_results = max(5, min(20, final_max_results))

        try:
            # Pass parameters to Tavily search
            response = await asyncio.to_thread(
                self.tavily_client.search,
                query=query,
                search_depth=final_search_depth,
                max_results=final_max_results,
                include_answer=True,
                include_images=False
            )

            # Format results for easier consumption
            results = []
            for r in response.get("results", []):
                results.append({
                    "title": r.get("title", "No title"),
                    "url": r.get("url", ""),
                    "content": r.get("content", "No content available"),
                    "score": r.get("score", 0)
                })

            return {
                "query": query,
                "search_depth": final_search_depth,
                "max_results": final_max_results,
                "results": results,
                "answer": response.get("answer", ""),
                "count": len(results),
                "timestamp": datetime.datetime.now().isoformat()
            }
        except Exception as e:
            error_message = f"Error during Tavily search for '{query}': {str(e)}"
            print(error_message)
            return {"error": error_message, "timestamp": datetime.datetime.now().isoformat()}

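    # The listener below replies only when the bot is mentioned without a
    # command prefix, or when the message is a direct reply to one of the
    # bot's own messages; everything else is ignored.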
    @commands.Cog.listener()
    async def on_message(self, message: discord.Message):
        import logging
        log = logging.getLogger("teto_cog")
        log.info(f"[TETO DEBUG] Received message: {message.content!r} (author={message.author}, id={message.id})")

        if message.author.bot:
            log.info("[TETO DEBUG] Ignoring bot message.")
            return

        # Remove all bot mention prefixes from the message content for prefix check
        content_wo_mentions = message.content
        for mention in message.mentions:
            mention_str = f"<@{mention.id}>"
            mention_nick_str = f"<@!{mention.id}>"
            content_wo_mentions = content_wo_mentions.replace(mention_str, "").replace(mention_nick_str, "")
        content_wo_mentions = content_wo_mentions.strip()

        trigger = False
        # Get the actual prefix string(s) for this message
        prefix = None
        if hasattr(self.bot, "command_prefix"):
            if callable(self.bot.command_prefix):
                # Await the dynamic prefix function
                prefix = await self.bot.command_prefix(self.bot, message)
            else:
                prefix = self.bot.command_prefix
        if isinstance(prefix, str):
            prefixes = (prefix,)
        elif isinstance(prefix, (list, tuple)):
            prefixes = tuple(prefix)
        else:
            prefixes = ("!",)

        if (
            self.bot.user in message.mentions
            and not content_wo_mentions.startswith(prefixes)
        ):
            trigger = True
            log.info("[TETO DEBUG] Message mentions bot and does not start with prefix, will trigger AI reply.")
        elif (
            message.reference and getattr(message.reference.resolved, "author", None) == self.bot.user
        ):
            trigger = True
            log.info("[TETO DEBUG] Message is a reply to the bot, will trigger AI reply.")

        if not trigger:
            log.info("[TETO DEBUG] Message did not trigger AI reply logic.")
            return

        channel = message.channel
        convo_key = channel.id
        convo = _teto_conversations.get(convo_key, [])

        # Only keep track of actual AI interactions in memory
        if trigger:
            user_content = []
            # Prepend username to the message content
            username = message.author.display_name if message.author.display_name else message.author.name
            if message.content:
                user_content.append({"type": "text", "text": f"{username}: {message.content}"})

            # Handle attachments (images)
            for attachment in message.attachments:
                if attachment.content_type and attachment.content_type.startswith("image/"):
                    try:
                        async with aiohttp.ClientSession() as session:
                            async with session.get(attachment.url) as image_response:
                                if image_response.status == 200:
                                    image_data = await image_response.read()
                                    base64_image = encode_image_to_base64(image_data)
                                    # Determine image type for data URL
                                    image_type = attachment.content_type.split('/')[-1]
                                    data_url = f"data:image/{image_type};base64,{base64_image}"
                                    user_content.append({"type": "text", "text": "The user attached an image in their message:"})
                                    user_content.append({"type": "image_url", "image_url": {"url": data_url}})
                                    log.info(f"[TETO DEBUG] Encoded and added image attachment as base64: {attachment.url}")
                                else:
                                    log.warning(f"[TETO DEBUG] Failed to download image attachment: {attachment.url} (Status: {image_response.status})")
                                    user_content.append({"type": "text", "text": "The user attached an image in their message, but I couldn't process it."})
                    except Exception as e:
                        log.error(f"[TETO DEBUG] Error processing image attachment {attachment.url}: {e}")
                        user_content.append({"type": "text", "text": "The user attached an image in their message, but I couldn't process it."})

            # Handle stickers
            for sticker in message.stickers:
                # Assuming sticker has a url attribute
                user_content.append({"type": "text", "text": "The user sent a sticker image:"})
                user_content.append({"type": "image_url", "image_url": {"url": sticker.url}})
                print(f"[TETO DEBUG] Found sticker: {sticker.url}")

            # Handle custom emojis (basic regex for <:name:id> and <a:name:id>)
            emoji_pattern = re.compile(r"<a?:(\w+):(\d+)>")
            for match in emoji_pattern.finditer(message.content):
                emoji_id = match.group(2)
                # Construct Discord emoji URL - this might need adjustment based on Discord API specifics
                emoji_url = f"https://cdn.discordapp.com/emojis/{emoji_id}.png"  # .gif for animated
                if match.group(0).startswith("<a:"):  # Check if animated
                    emoji_url = f"https://cdn.discordapp.com/emojis/{emoji_id}.gif"
                user_content.append({"type": "text", "text": f"The custom emoji {match.group(1)}:"})
                user_content.append({"type": "image_url", "image_url": {"url": emoji_url}})
                print(f"[TETO DEBUG] Found custom emoji: {emoji_url}")

            if not user_content:
                log.info("[TETO DEBUG] Message triggered AI but contained no supported content (text, image, sticker, emoji).")
                return  # Don't send empty messages to the AI

            convo.append({"role": "user", "content": user_content})

            try:
                async with channel.typing():
                    ai_reply = await self._teto_reply_ai_with_messages(messages=convo)
                    ai_reply = strip_think_blocks(ai_reply)
                    await message.reply(ai_reply)

                    # Extract the original AI content (without command execution formatting)
                    # for storing in conversation history
                    command, content_without_command, _ = extract_shell_command(ai_reply)
                    if command:
                        # If there was a command, store the original AI content without the formatted execution message
                        convo.append({"role": "assistant", "content": content_without_command if content_without_command else ai_reply})
                    else:
                        # If there was no command, store the full reply
                        convo.append({"role": "assistant", "content": ai_reply})

                    _teto_conversations[convo_key] = convo[-10:]  # Keep last 10 interactions
                    log.info("[TETO DEBUG] AI reply sent successfully.")
            except Exception as e:
                await channel.send(f"**Teto AI conversation failed! TwT**\n{e}")
                log.error(f"[TETO DEBUG] Exception during AI reply: {e}")

    @model_subgroup.command(name="set", description="Sets the AI model for Ame-chan.")
    @app_commands.describe(model_name="The name of the AI model to use.")
    async def set_ai_model(self, interaction: discord.Interaction, model_name: str):
        self._ai_model = model_name
        await interaction.response.send_message(f"Ame-chan's AI model set to: {model_name} desu~", ephemeral=True)

    @ame_group.command(name="set_api_endpoint", description="Sets the API endpoint for Ame-chan.")
    @app_commands.describe(endpoint_url="The URL of the API endpoint.")
    async def set_api_endpoint(self, interaction: discord.Interaction, endpoint_url: str):
        self._api_endpoint = endpoint_url
        await interaction.response.send_message(f"Ame-chan's API endpoint set to: {endpoint_url} desu~", ephemeral=True)

    @ame_group.command(name="clear_chat_history", description="Clears the chat history for the current channel.")
    async def clear_chat_history(self, interaction: discord.Interaction):
        channel_id = interaction.channel_id
        if channel_id in _teto_conversations:
            del _teto_conversations[channel_id]
            await interaction.response.send_message("Chat history cleared for this channel desu~", ephemeral=True)
        else:
            await interaction.response.send_message("No chat history found for this channel desu~", ephemeral=True)

    @ame_group.command(name="toggle_shell_command", description="Toggles Ame-chan's ability to run shell commands.")
    async def toggle_shell_command(self, interaction: discord.Interaction):
        self._allow_shell_commands = not self._allow_shell_commands
        status = "enabled" if self._allow_shell_commands else "disabled"
        await interaction.response.send_message(f"Ame-chan's shell command ability is now {status} desu~", ephemeral=True)

    @ame_group.command(name="toggle_web_search", description="Toggles Ame-chan's ability to search the web.")
    async def toggle_web_search(self, interaction: discord.Interaction):
        if not self.tavily_api_key or not self.tavily_client:
            await interaction.response.send_message("Web search is not available because the Tavily API key is not configured. Please set the TAVILY_API_KEY environment variable.", ephemeral=True)
            return

        self._allow_web_search = not self._allow_web_search
        status = "enabled" if self._allow_web_search else "disabled"
        await interaction.response.send_message(f"Ame-chan's web search ability is now {status} desu~", ephemeral=True)

    @ame_group.command(name="web_search", description="Search the web using Tavily API.")
    @app_commands.describe(query="The search query to look up online.")
    async def web_search_command(self, interaction: discord.Interaction, query: str):
        if not self.tavily_api_key or not self.tavily_client:
            await interaction.response.send_message("Web search is not available because the Tavily API key is not configured. Please set the TAVILY_API_KEY environment variable.", ephemeral=True)
            return

        await interaction.response.defer(thinking=True)

        try:
            search_results = await self.web_search(query=query)

            if "error" in search_results:
                await interaction.followup.send(f"❌ Error: Web search failed - {search_results['error']}")
                return

            # Format the results in a readable way
            embed = discord.Embed(
                title=f"🔍 Web Search Results for: {query}",
                description=search_results.get("answer", "No summary available."),
                color=discord.Color.blue()
            )

            for i, result in enumerate(search_results.get("results", [])[:5], 1):  # Limit to top 5 results
                embed.add_field(
                    name=f"Result {i}: {result['title']}",
                    value=f"[Link]({result['url']})\n{result['content'][:200]}...",
                    inline=False
                )

            embed.set_footer(text=f"Search depth: {search_results['search_depth']} | Results: {search_results['count']}")

            await interaction.followup.send(embed=embed)
        except Exception as e:
            await interaction.followup.send(f"❌ Error performing web search: {str(e)}")

    @model_subgroup.command(name="get", description="Gets the current AI model for Ame-chan.")
    async def get_ai_model(self, interaction: discord.Interaction):
        await interaction.response.send_message(f"Ame-chan's current AI model is: {self._ai_model} desu~", ephemeral=True)


# Context menu command must be defined at module level
@app_commands.context_menu(name="Teto AI Reply")
async def teto_context_menu_ai_reply(interaction: discord.Interaction, message: discord.Message):
    """Replies to the selected message as a Teto AI."""
    if not message.content:
        await interaction.response.send_message("The selected message has no text content to reply to! >.<", ephemeral=True)
        return

    await interaction.response.defer(ephemeral=True)
    channel = interaction.channel
    convo_key = channel.id
    convo = _teto_conversations.get(convo_key, [])

    if message.content:
        convo.append({"role": "user", "content": message.content})
    try:
        # Get the TetoCog instance from the bot
        cog = interaction.client.get_cog("TetoCog")
        if cog is None:
            await interaction.followup.send("TetoCog is not loaded, cannot reply.", ephemeral=True)
            return
        ai_reply = await cog._teto_reply_ai_with_messages(messages=convo)
        ai_reply = strip_think_blocks(ai_reply)
        await message.reply(ai_reply)
        await interaction.followup.send("Teto AI replied desu~", ephemeral=True)

        # Extract the original AI content (without command execution formatting)
        # for storing in conversation history
        command, content_without_command, _ = extract_shell_command(ai_reply)
        if command:
            # If there was a command, store the original AI content without the formatted execution message
            convo.append({"role": "assistant", "content": content_without_command if content_without_command else ai_reply})
        else:
            # If there was no command, store the full reply
            convo.append({"role": "assistant", "content": ai_reply})
        _teto_conversations[convo_key] = convo[-10:]
    except Exception as e:
        await interaction.followup.send(f"Teto AI reply failed: {e} desu~", ephemeral=True)


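# discord.py extension entry point. Elsewhere the bot would load this cog with
# something like `await bot.load_extension("cogs.teto_cog")` (path illustrative).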
async def setup(bot: commands.Bot):
    cog = TetoCog(bot)
    await bot.add_cog(cog)
    # bot.tree.add_command(cog.ame_group) # No longer needed if groups are class variables; discovery should handle it.
    # Ensure the context menu is still added if it's not part of the cog's auto-discovery
    bot.tree.add_command(teto_context_menu_ai_reply)  # This is a module-level command, so it needs to be added.
    print("TetoCog loaded! desu~")