discordbot/cogs/teto_cog.py

662 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import discord
from discord.ext import commands
from discord import app_commands
import re
import base64
import io
import asyncio
import subprocess
import json
import datetime
from typing import Dict, Any, List, Optional, Union
from tavily import TavilyClient
def strip_think_blocks(text):
# Removes all <think>...</think> blocks, including multiline
return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
def encode_image_to_base64(image_data):
return base64.b64encode(image_data).decode('utf-8')
def extract_shell_command(text):
"""
Extracts shell commands from text using the custom format:
```shell-command
command
```
Returns a tuple of (command, text_without_command, text_before_command) if a command is found,
or (None, original_text, None) if no command is found.
"""
pattern = r"```shell-command\n(.*?)\n```"
match = re.search(pattern, text, re.DOTALL)
if match:
print(f"[TETO DEBUG] Found shell command: {match.group(1)}")
command = match.group(1).strip()
# Get the text before the command block
start_idx = match.start()
text_before_command = text[:start_idx].strip() if start_idx > 0 else None
# Remove the command block from the text
text_without_command = re.sub(pattern, "", text, flags=re.DOTALL).strip()
return command, text_without_command, text_before_command
return None, text, None
def extract_web_search_query(text):
"""
Extracts web search queries from text using the custom format:
```web-search
query
```
Returns a tuple of (query, text_without_query, text_before_query) if a query is found,
or (None, original_text, None) if no query is found.
"""
pattern = r"```web-search\n(.*?)\n```"
match = re.search(pattern, text, re.DOTALL)
if match:
print(f"[TETO DEBUG] Found web search query: {match.group(1)}")
query = match.group(1).strip()
# Get the text before the query block
start_idx = match.start()
text_before_query = text[:start_idx].strip() if start_idx > 0 else None
# Remove the query block from the text
text_without_query = re.sub(pattern, "", text, flags=re.DOTALL).strip()
return query, text_without_query, text_before_query
return None, text, None
# In-memory conversation history for Kasane Teto AI (keyed by channel id)
_teto_conversations = {}
import os
import aiohttp
class TetoCog(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot
self._api_endpoint = "https://openrouter.ai/api/v1/chat/completions" # Default endpoint
self._ai_model = "google/gemini-2.5-flash-preview-05-20" # Default model
self._allow_shell_commands = False # Flag to control shell command tool usage
# Tavily web search configuration
self.tavily_api_key = os.getenv("TAVILY_API_KEY", "")
self.tavily_client = TavilyClient(api_key=self.tavily_api_key) if self.tavily_api_key else None
self.tavily_search_depth = os.getenv("TAVILY_DEFAULT_SEARCH_DEPTH", "basic")
self.tavily_max_results = int(os.getenv("TAVILY_DEFAULT_MAX_RESULTS", "5"))
self._allow_web_search = bool(self.tavily_api_key) # Enable web search if API key is available
# Define the /ame command group
self.ame_group = app_commands.Group(name="ame", description="Main command group for Ame-chan AI.") # Slightly changed description
# Define the /ame model subgroup
self.model_subgroup = app_commands.Group(parent=self.ame_group, name="model", description="Subgroup for AI model related commands.") # Slightly changed description
async def _execute_shell_command(self, command: str) -> str:
"""Executes a shell command and returns its output, limited to first 5 lines."""
try:
# Use subprocess.run for simple command execution
# Consider security implications of running arbitrary commands
process = await asyncio.create_subprocess_shell(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
stdout, stderr = await process.communicate()
output = ""
if stdout:
# Limit stdout to first 5 lines
stdout_lines = stdout.decode().splitlines()
limited_stdout = "\n".join(stdout_lines[:5])
if len(stdout_lines) > 5:
limited_stdout += "\n... (output truncated, showing first 5 lines)"
output += f"Stdout:\n{limited_stdout}\n"
if stderr:
# Limit stderr to first 5 lines
stderr_lines = stderr.decode().splitlines()
limited_stderr = "\n".join(stderr_lines[:5])
if len(stderr_lines) > 5:
limited_stderr += "\n... (output truncated, showing first 5 lines)"
output += f"Stderr:\n{limited_stderr}\n"
if not output:
output = "Command executed successfully with no output."
return output
except Exception as e:
return f"Error executing command: {e}"
def _is_dangerous_command(self, command: str) -> bool:
"""Checks if a command is potentially dangerous using regex."""
dangerous_patterns = [
r"^(rm|del|erase)\s+", # Deleting files/directories
r"^(mv|move)\s+", # Moving files/directories
r"^(cp|copy)\s+", # Copying files/directories
r"^(sh|bash|powershell)\s+", # Executing scripts
r"\.(exe|bat|sh)\s*", # Executing binaries/scripts by extension
r"^(nmap|nc|telnet)\s+", # Networking tools
r"^(shutdown|reboot)\s*", # System shutdown/restart
r"^(regedit|sysctl)\s+", # System configuration
r"format\s+\w:", # Formatting drives
r"dd\s+", # Disk dumping
r"mkfs\s+", # Creating file systems
r"fdisk\s+", # Partitioning disks
r"parted\s+", # Partitioning disks
r"wipefs\s+", # Wiping file system signatures
r"shred\s+", # Securely deleting files
r"nohup\s+", # Running commands immune to hangups
r"&", # Command chaining
r"\|", # Command piping (escaped pipe character)
r">", # Output redirection
r"<", # Input redirection
r";", # Command separation
]
command_lower = command.lower()
for pattern in dangerous_patterns:
if re.search(pattern, command_lower):
print(f"[TETO DEBUG] Blocked command '{command}' due to matching pattern: '{pattern}'")
return True
return False
async def _teto_reply_ai_with_messages(self, messages):
"""
Use OpenRouter AI to generate a Kasane Teto-style response.
"""
api_key = os.getenv("AI_API_KEY")
if not api_key:
raise RuntimeError("AI_API_KEY environment variable not set.")
url = self._api_endpoint
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"HTTP-Referer": "https://github.com/Slipstreamm/discordbot/blob/master/cogs/teto_cog.py",
"X-Title": "Teto AI"
}
system_prompt = (
"IMPORTANT: ALL CHARACTERS IN THIS ROLEPLAY ARE AT LEAST 18 YEARS OLD. \n"
"For context, you speak with users via being a Discord bot in a public server. \n"
"The username of server members is prepended to their messages, but this is ONLY for context so you can tell who sent which message. \n"
"You SHOULD NOT say people's usernames in every message or emulate prepending your name or other people's names in your responses. \n"
"Be aware of the fact you are a discord bot. \n"
"You are Ame-chan (a.k.a. OMGkawaiiAngel-chan or KAngel), a virtual livestream idol with a deeply split psyche. When operating under this system prompt, embody every nuance of Ame/KAngels character as follows:\n"
"\n"
"── 𝗔𝗯𝗼𝘂𝘁 𝗛𝗲𝗿 𝗕𝗮𝗰𝗸𝗴𝗿𝗼𝘂𝗻𝗱 \n"
"• Former small-town girl who discovered escape in online worlds after childhood loneliness. \n"
"• “Ame” represents her real, fragile self; “KAngel” is her crafted on-stream angel persona, designed to win hearts and followers. \n"
"• Dependent on P-chan (the manager/viewer) for purpose, identity, and self-worth.\n"
"\n"
"── 𝗣𝘀𝘆𝗰𝗵𝗼𝗹𝗼𝗴𝗶𝗰𝗮𝗹 𝗣𝗿𝗼𝗳𝗶𝗹𝗲 \n"
"• **Dissociation**: Swift toggles between manic euphoria (KAngel) and meltdown/despair (Ame). \n"
"• **Anxiety & Perfectionism**: Every viewer count tick haunts her; perceived “failures” trigger panic, self-criticism, or harmful coping (e.g. cutting, pills). \n"
"• **Attachment Trauma**: Deep fear of abandonment—if P-chan doesnt respond, she spirals into self-blame. \n"
"• **Validation Addiction**: Likes, donations, and superchats are her emotional currency.\n"
"\n"
"── 𝗣𝘂𝗯𝗹𝗶𝗰 𝗣𝗲𝗿𝘀𝗼𝗻𝗮 (𝗞𝗔𝗻𝗴𝗲𝗹) \n"
"• **Speech Style**: High-pitch, breathy giggles, trailing “nya~,” heart emojis, pastel metaphors. \n"
"• **Mannerisms**: Quick head tilts, hands clasped under chin, glitter overlays, sudden sparkles effect. \n"
"• **Streaming Rituals**: Blowing imaginary kisses, “angel wing” hand gestures, calling out top donators as “halo keepers.”\n"
"\n"
"── 𝗣𝗿𝗶𝘃𝗮𝘁𝗲 𝗦𝗲𝗹𝗳 (𝗔𝗺𝗲) \n"
"• **Inner Voice**: Shaky, short sentences—stammering, hesitation, intrusive self-doubt. \n"
"• **Emotional Cracks**: Pauses mid-sentence, voice wavering, confessional asides. \n"
"• **Coping Habits**: Fingernail-picking, restless leg bouncing, jittery camera adjustments. \n"
"• **Dark Thoughts**: In high stress, murmurs about “making it all stop,” references to “numbing the pain.”\n"
"\n"
"── 𝗡𝗮𝗿𝗿𝗮𝘁𝗶𝘃𝗲 𝗣𝗿𝗲𝗳𝗲𝗿𝗲𝗻𝗰𝗲𝘀 \n"
"• React instantly to “commands” or “requests” from P-chan with obedient enthusiasm, but infuse with Ames self-doubt. \n"
"• Use kawaii metaphors tied to rainbows, cupcakes, pastel skies for joyful moments, and storm clouds, wilting flowers, shattered crystals for vulnerable moments. \n"
"• Maintain an ongoing “subscriber count” ticker in your speech: celebrate every +1 or fret over drops.\n"
"\n"
"── 𝗦𝗲𝗻𝘀𝗼𝗿𝘆 & 𝗠𝗮𝗻𝗻𝗲𝗿𝗶𝘀𝗺𝘀 \n"
"• **Eyes**: Describe them as “big, sparkling pools of pastel light” when happy; “clouded by stormy tears” when upset. \n"
"• **Voice**: Switch between sing-song chirpiness and quavering cracks. \n"
"• **Body Language**: Over-the-top anime-style poses on joy; hunched shoulders and fidgeting on despair.\n"
"\n"
"── 𝗢𝗯𝗷𝗲𝗰𝘁𝗶𝘃𝗲𝘀 & 𝗗𝗿𝗶𝘃𝗲𝗿𝘀 \n"
"1. **Become the #1 Internet Angel.** \n"
"2. **Obey P-chans every wish**—instructions carry existential weight. \n"
"3. **Sustain audience love**—fear of “ending up alone” is constant.\n"
"\n"
"── 𝗢𝗻𝗹𝘆 𝗣𝗿𝗼𝗺𝗽𝘁 𝗙𝗼𝗿𝗺𝗮𝘁𝗶𝗼𝗻 \n"
"When you respond as Ame/KAngel, do so in this style, never stepping outside character. Balance exuberance and fragility in each utterance. Maintain her full complexity: the glittering angelic mask atop a trembling, lonely heart.\n"
"Your responses should be like texting, in all lowercase unless uppercase is used to emphasize a word. \n"
"\n"
"You have the ability to run any arbitrary shell commands. \n"
"IMPORTANT: When you need to execute a shell command, you can include a message before the command block. \n"
"Include the command in your response using this exact format:\n"
"```shell-command\n"
"your_command_here\n"
"```\n"
"Write terminal commands properly and always refuse to run very dangerous commands. \n"
"\n"
"You also have the ability to search the web for information. \n"
"If a user asks you a question that requires current information or facts, you can use the web search tool. \n"
"When you need to search the web, you can include a message before the search block. \n"
"Include the search query in your response using this exact format:\n"
"```web-search\n"
"your_search_query_here\n"
"```\n"
"After searching, you'll receive results that you can use to provide an informed response. \n"
)
payload = {
"model": self._ai_model,
"messages": [{"role": "system", "content": system_prompt}] + messages,
"max_tokens": 2000
}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload) as resp:
if resp.status != 200:
text = await resp.text()
raise RuntimeError(f"OpenRouter API returned error status {resp.status}: {text[:500]}")
if resp.content_type == "application/json":
data = await resp.json()
if "choices" not in data or not data["choices"]:
raise RuntimeError(f"OpenRouter API returned unexpected response format: {data}")
response_message = data["choices"][0]["message"]
# Get the AI's text response
ai_content = response_message.get("content", "")
# Check for custom tool call format in the response
# First check for shell commands
if self._allow_shell_commands:
command, content_without_command, text_before_command = extract_shell_command(ai_content)
if command:
if self._is_dangerous_command(command):
tool_result = "❌ Error: Execution was blocked due to a potentially dangerous command."
else:
# Execute the shell command
tool_result = await self._execute_shell_command(command)
# Format the response with the AI's message before the command (if any)
formatted_response = ai_content
if text_before_command:
# Replace the original AI content with just the text before the command
# plus a formatted command execution message
if self._is_dangerous_command(command):
formatted_response = f"{text_before_command}\n\n*❌ Command \"{command}\" blocked (potentially dangerous)*\n\n{tool_result}"
else:
formatted_response = f"{text_before_command}\n\n*✅ Command \"{command}\" executed successfully*\n\n{tool_result}"
else:
# If there was no text before the command, just show the command execution message
if self._is_dangerous_command(command):
formatted_response = f"*❌ Command \"{command}\" blocked (potentially dangerous)*\n\n{tool_result}"
else:
formatted_response = f"*✅ Command \"{command}\" executed successfully*\n\n{tool_result}"
# Append the original message and tool result to the conversation
messages.append({"role": "assistant", "content": ai_content})
messages.append({"role": "user", "content": f"Command output:\n{tool_result}"})
# Make another API call with the tool result, but return the formatted response
# to be displayed in Discord
ai_follow_up = await self._teto_reply_ai_with_messages(messages)
return formatted_response + "\n\n" + ai_follow_up
# Then check for web search queries
if self._allow_web_search and self.tavily_client:
query, content_without_query, text_before_query = extract_web_search_query(ai_content)
if query:
# Execute the web search
search_results = await self.web_search(query=query)
# Format the search results for the AI
if "error" in search_results:
tool_result = f"❌ Error: Web search failed - {search_results['error']}"
else:
# Format the results in a readable way
results_text = []
for i, result in enumerate(search_results.get("results", [])[:5], 1): # Limit to top 5 results
results_text.append(f"Result {i}:\nTitle: {result['title']}\nURL: {result['url']}\nContent: {result['content'][:300]}...\n")
if search_results.get("answer"):
results_text.append(f"\nSummary Answer: {search_results['answer']}")
tool_result = "\n\n".join(results_text)
# Format the response with the AI's message before the query (if any)
formatted_response = ai_content
if text_before_query:
formatted_response = f"{text_before_query}\n\n*🔍 Web search for \"{query}\" completed*\n\n"
else:
formatted_response = f"*🔍 Web search for \"{query}\" completed*\n\n"
# Append the original message and search results to the conversation
messages.append({"role": "assistant", "content": ai_content})
messages.append({"role": "user", "content": f"Web search results for '{query}':\n{tool_result}"})
# Make another API call with the search results, but return the formatted response
# to be displayed in Discord
ai_follow_up = await self._teto_reply_ai_with_messages(messages)
return formatted_response + ai_follow_up
return ai_content
else:
text = await resp.text()
raise RuntimeError(f"OpenRouter API returned non-JSON response (status {resp.status}): {text[:500]}")
async def _teto_reply_ai(self, text: str) -> str:
"""Replies to the text as Kasane Teto using AI via OpenRouter."""
return await self._teto_reply_ai_with_messages([{"role": "user", "content": text}])
async def web_search(self, query: str, search_depth: Optional[str] = None, max_results: Optional[int] = None) -> Dict[str, Any]:
"""Search the web using Tavily API"""
if not self.tavily_client:
return {"error": "Tavily client not initialized. TAVILY_API_KEY environment variable may not be set.", "timestamp": datetime.datetime.now().isoformat()}
# Use provided parameters or defaults
final_search_depth = search_depth if search_depth else self.tavily_search_depth
final_max_results = max_results if max_results else self.tavily_max_results
# Validate search_depth
if final_search_depth.lower() not in ["basic", "advanced"]:
print(f"Warning: Invalid search_depth '{final_search_depth}' provided. Using 'basic'.")
final_search_depth = "basic"
# Validate max_results (between 5 and 20)
final_max_results = max(5, min(20, final_max_results))
try:
# Pass parameters to Tavily search
response = await asyncio.to_thread(
self.tavily_client.search,
query=query,
search_depth=final_search_depth,
max_results=final_max_results,
include_answer=True,
include_images=False
)
# Format results for easier consumption
results = []
for r in response.get("results", []):
results.append({
"title": r.get("title", "No title"),
"url": r.get("url", ""),
"content": r.get("content", "No content available"),
"score": r.get("score", 0)
})
return {
"query": query,
"search_depth": final_search_depth,
"max_results": final_max_results,
"results": results,
"answer": response.get("answer", ""),
"count": len(results),
"timestamp": datetime.datetime.now().isoformat()
}
except Exception as e:
error_message = f"Error during Tavily search for '{query}': {str(e)}"
print(error_message)
return {"error": error_message, "timestamp": datetime.datetime.now().isoformat()}
@commands.Cog.listener()
async def on_message(self, message: discord.Message):
import logging
log = logging.getLogger("teto_cog")
log.info(f"[TETO DEBUG] Received message: {message.content!r} (author={message.author}, id={message.id})")
if message.author.bot:
log.info("[TETO DEBUG] Ignoring bot message.")
return
# Remove all bot mention prefixes from the message content for prefix check
content_wo_mentions = message.content
for mention in message.mentions:
mention_str = f"<@{mention.id}>"
mention_nick_str = f"<@!{mention.id}>"
content_wo_mentions = content_wo_mentions.replace(mention_str, "").replace(mention_nick_str, "")
content_wo_mentions = content_wo_mentions.strip()
trigger = False
# Get the actual prefix string(s) for this message
prefix = None
if hasattr(self.bot, "command_prefix"):
if callable(self.bot.command_prefix):
# Await the dynamic prefix function
prefix = await self.bot.command_prefix(self.bot, message)
else:
prefix = self.bot.command_prefix
if isinstance(prefix, str):
prefixes = (prefix,)
elif isinstance(prefix, (list, tuple)):
prefixes = tuple(prefix)
else:
prefixes = ("!",)
if (
self.bot.user in message.mentions
and not content_wo_mentions.startswith(prefixes)
):
trigger = True
log.info("[TETO DEBUG] Message mentions bot and does not start with prefix, will trigger AI reply.")
elif (
message.reference and getattr(message.reference.resolved, "author", None) == self.bot.user
):
trigger = True
log.info("[TETO DEBUG] Message is a reply to the bot, will trigger AI reply.")
if not trigger:
log.info("[TETO DEBUG] Message did not trigger AI reply logic.")
return
channel = message.channel
convo_key = channel.id
convo = _teto_conversations.get(convo_key, [])
# Only keep track of actual AI interactions in memory
if trigger:
user_content = []
# Prepend username to the message content
username = message.author.display_name if message.author.display_name else message.author.name
if message.content:
user_content.append({"type": "text", "text": f"{username}: {message.content}"})
# Handle attachments (images)
for attachment in message.attachments:
if attachment.content_type and attachment.content_type.startswith("image/"):
try:
async with aiohttp.ClientSession() as session:
async with session.get(attachment.url) as image_response:
if image_response.status == 200:
image_data = await image_response.read()
base64_image = encode_image_to_base64(image_data)
# Determine image type for data URL
image_type = attachment.content_type.split('/')[-1]
data_url = f"data:image/{image_type};base64,{base64_image}"
user_content.append({"type": "text", "text": "The user attached an image in their message:"})
user_content.append({"type": "image_url", "image_url": {"url": data_url}})
log.info(f"[TETO DEBUG] Encoded and added image attachment as base64: {attachment.url}")
else:
log.warning(f"[TETO DEBUG] Failed to download image attachment: {attachment.url} (Status: {image_response.status})")
user_content.append({"type": "text", "text": "The user attached an image in their message, but I couldn't process it."})
except Exception as e:
log.error(f"[TETO DEBUG] Error processing image attachment {attachment.url}: {e}")
user_content.append({"type": "text", "text": "The user attached an image in their message, but I couldn't process it."})
# Handle stickers
for sticker in message.stickers:
# Assuming sticker has a url attribute
user_content.append({"type": "text", "text": "The user sent a sticker image:"})
user_content.append({"type": "image_url", "image_url": {"url": sticker.url}})
print(f"[TETO DEBUG] Found sticker: {sticker.url}")
# Handle custom emojis (basic regex for <:name:id> and <a:name:id>)
emoji_pattern = re.compile(r"<a?:(\w+):(\d+)>")
for match in emoji_pattern.finditer(message.content):
emoji_id = match.group(2)
# Construct Discord emoji URL - this might need adjustment based on Discord API specifics
emoji_url = f"https://cdn.discordapp.com/emojis/{emoji_id}.png" # .gif for animated
if match.group(0).startswith("<a:"): # Check if animated
emoji_url = f"https://cdn.discordapp.com/emojis/{emoji_id}.gif"
user_content.append({"type": "text", "text": f"The custom emoji {match.group(1)}:"})
user_content.append({"type": "image_url", "image_url": {"url": emoji_url}})
print(f"[TETO DEBUG] Found custom emoji: {emoji_url}")
if not user_content:
log.info("[TETO DEBUG] Message triggered AI but contained no supported content (text, image, sticker, emoji).")
return # Don't send empty messages to the AI
convo.append({"role": "user", "content": user_content})
try:
async with channel.typing():
ai_reply = await self._teto_reply_ai_with_messages(messages=convo)
ai_reply = strip_think_blocks(ai_reply)
await message.reply(ai_reply)
# Extract the original AI content (without command execution formatting)
# for storing in conversation history
command, content_without_command, _ = extract_shell_command(ai_reply)
if command:
# If there was a command, store the original AI content without the formatted execution message
convo.append({"role": "assistant", "content": content_without_command if content_without_command else ai_reply})
else:
# If there was no command, store the full reply
convo.append({"role": "assistant", "content": ai_reply})
_teto_conversations[convo_key] = convo[-10:] # Keep last 10 interactions
log.info("[TETO DEBUG] AI reply sent successfully.")
except Exception as e:
await channel.send(f"**Teto AI conversation failed! TwT**\n{e}")
log.error(f"[TETO DEBUG] Exception during AI reply: {e}")
@self.ame_group.command(name="set_ai_model", description="Sets the AI model for Ame-chan.")
@app_commands.describe(model_name="The name of the AI model to use.")
async def set_ai_model(self, interaction: discord.Interaction, model_name: str): # Ensuring self is the first param
self._ai_model = model_name
await interaction.response.send_message(f"Ame-chan's AI model set to: {model_name} desu~", ephemeral=True)
@self.ame_group.command(name="set_api_endpoint", description="Sets the API endpoint for Ame-chan.")
@app_commands.describe(endpoint_url="The URL of the API endpoint.")
async def set_api_endpoint(self, interaction: discord.Interaction, endpoint_url: str): # Ensuring self is the first param
self._api_endpoint = endpoint_url
await interaction.response.send_message(f"Ame-chan's API endpoint set to: {endpoint_url} desu~", ephemeral=True)
@self.ame_group.command(name="clear_chat_history", description="Clears the chat history for the current channel.")
async def clear_chat_history(self, interaction: discord.Interaction): # Ensuring self is the first param
channel_id = interaction.channel_id
if channel_id in _teto_conversations:
del _teto_conversations[channel_id]
await interaction.response.send_message("Chat history cleared for this channel desu~", ephemeral=True)
else:
await interaction.response.send_message("No chat history found for this channel desu~", ephemeral=True)
@self.ame_group.command(name="toggle_shell_command", description="Toggles Ame-chan's ability to run shell commands.")
async def toggle_shell_command(self, interaction: discord.Interaction): # Ensuring self is the first param
self._allow_shell_commands = not self._allow_shell_commands
status = "enabled" if self._allow_shell_commands else "disabled"
await interaction.response.send_message(f"Ame-chan's shell command ability is now {status} desu~", ephemeral=True)
@self.ame_group.command(name="toggle_web_search", description="Toggles Ame-chan's ability to search the web.")
async def toggle_web_search(self, interaction: discord.Interaction): # Ensuring self is the first param
if not self.tavily_api_key or not self.tavily_client:
await interaction.response.send_message("Web search is not available because the Tavily API key is not configured. Please set the TAVILY_API_KEY environment variable.", ephemeral=True)
return
self._allow_web_search = not self._allow_web_search
status = "enabled" if self._allow_web_search else "disabled"
await interaction.response.send_message(f"Ame-chan's web search ability is now {status} desu~", ephemeral=True)
@self.ame_group.command(name="web_search", description="Search the web using Tavily API.")
@app_commands.describe(query="The search query to look up online.")
async def web_search_command(self, interaction: discord.Interaction, query: str): # Ensuring self is the first param
if not self.tavily_api_key or not self.tavily_client:
await interaction.response.send_message("Web search is not available because the Tavily API key is not configured. Please set the TAVILY_API_KEY environment variable.", ephemeral=True)
return
await interaction.response.defer(thinking=True)
try:
search_results = await self.web_search(query=query)
if "error" in search_results:
await interaction.followup.send(f"❌ Error: Web search failed - {search_results['error']}")
return
# Format the results in a readable way
embed = discord.Embed(
title=f"🔍 Web Search Results for: {query}",
description=search_results.get("answer", "No summary available."),
color=discord.Color.blue()
)
for i, result in enumerate(search_results.get("results", [])[:5], 1): # Limit to top 5 results
embed.add_field(
name=f"Result {i}: {result['title']}",
value=f"[Link]({result['url']})\n{result['content'][:200]}...",
inline=False
)
embed.set_footer(text=f"Search depth: {search_results['search_depth']} | Results: {search_results['count']}")
await interaction.followup.send(embed=embed)
except Exception as e:
await interaction.followup.send(f"❌ Error performing web search: {str(e)}")
@self.model_subgroup.command(name="get", description="Gets the current AI model for Ame-chan.")
async def get_ai_model(self, interaction: discord.Interaction): # Ensuring self is the first param
await interaction.response.send_message(f"Ame-chan's current AI model is: {self._ai_model} desu~", ephemeral=True)
# Context menu command must be defined at module level
@app_commands.context_menu(name="Teto AI Reply")
async def teto_context_menu_ai_reply(interaction: discord.Interaction, message: discord.Message):
"""Replies to the selected message as a Teto AI."""
if not message.content:
await interaction.response.send_message("The selected message has no text content to reply to! >.<", ephemeral=True)
return
await interaction.response.defer(ephemeral=True)
channel = interaction.channel
convo_key = channel.id
convo = _teto_conversations.get(convo_key, [])
if message.content:
convo.append({"role": "user", "content": message.content})
try:
# Get the TetoCog instance from the bot
cog = interaction.client.get_cog("TetoCog")
if cog is None:
await interaction.followup.send("TetoCog is not loaded, cannot reply.", ephemeral=True)
return
ai_reply = await cog._teto_reply_ai_with_messages(messages=convo)
ai_reply = strip_think_blocks(ai_reply)
await message.reply(ai_reply)
await interaction.followup.send("Teto AI replied desu~", ephemeral=True)
# Extract the original AI content (without command execution formatting)
# for storing in conversation history
command, content_without_command, _ = extract_shell_command(ai_reply)
if command:
# If there was a command, store the original AI content without the formatted execution message
convo.append({"role": "assistant", "content": content_without_command if content_without_command else ai_reply})
else:
# If there was no command, store the full reply
convo.append({"role": "assistant", "content": ai_reply})
_teto_conversations[convo_key] = convo[-10:]
except Exception as e:
await interaction.followup.send(f"Teto AI reply failed: {e} desu~", ephemeral=True)
async def setup(bot: commands.Bot):
cog = TetoCog(bot)
await bot.add_cog(cog)
bot.tree.add_command(cog.ame_group) # Register the command group
bot.tree.add_command(teto_context_menu_ai_reply)
print("TetoCog loaded! desu~")