626 lines
33 KiB
Python
626 lines
33 KiB
Python
import discord
|
|
from discord.ext import commands
|
|
from discord import app_commands
|
|
import re
|
|
import base64
|
|
import io
|
|
import asyncio
|
|
import subprocess
|
|
import json
|
|
import datetime
|
|
from typing import Dict, Any, List, Optional, Union
|
|
from tavily import TavilyClient
|
|
|
|
def strip_think_blocks(text):
|
|
# Removes all <think>...</think> blocks, including multiline
|
|
return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
|
|
|
|
def encode_image_to_base64(image_data):
|
|
return base64.b64encode(image_data).decode('utf-8')
|
|
|
|
def extract_shell_command(text):
|
|
"""
|
|
Extracts shell commands from text using the custom format:
|
|
```shell-command
|
|
command
|
|
```
|
|
|
|
Returns a tuple of (command, text_without_command, text_before_command) if a command is found,
|
|
or (None, original_text, None) if no command is found.
|
|
"""
|
|
pattern = r"```shell-command\n(.*?)\n```"
|
|
match = re.search(pattern, text, re.DOTALL)
|
|
|
|
if match:
|
|
print(f"[TETO DEBUG] Found shell command: {match.group(1)}")
|
|
command = match.group(1).strip()
|
|
|
|
# Get the text before the command block
|
|
start_idx = match.start()
|
|
text_before_command = text[:start_idx].strip() if start_idx > 0 else None
|
|
|
|
# Remove the command block from the text
|
|
text_without_command = re.sub(pattern, "", text, flags=re.DOTALL).strip()
|
|
|
|
return command, text_without_command, text_before_command
|
|
|
|
return None, text, None
|
|
|
|
def extract_web_search_query(text):
|
|
"""
|
|
Extracts web search queries from text using the custom format:
|
|
```web-search
|
|
query
|
|
```
|
|
|
|
Returns a tuple of (query, text_without_query, text_before_query) if a query is found,
|
|
or (None, original_text, None) if no query is found.
|
|
"""
|
|
pattern = r"```web-search\n(.*?)\n```"
|
|
match = re.search(pattern, text, re.DOTALL)
|
|
|
|
if match:
|
|
print(f"[TETO DEBUG] Found web search query: {match.group(1)}")
|
|
query = match.group(1).strip()
|
|
|
|
# Get the text before the query block
|
|
start_idx = match.start()
|
|
text_before_query = text[:start_idx].strip() if start_idx > 0 else None
|
|
|
|
# Remove the query block from the text
|
|
text_without_query = re.sub(pattern, "", text, flags=re.DOTALL).strip()
|
|
|
|
return query, text_without_query, text_before_query
|
|
|
|
return None, text, None
|
|
|
|
# In-memory conversation history for Kasane Teto AI (keyed by channel id)
|
|
_teto_conversations = {}
|
|
|
|
import os
|
|
import aiohttp
|
|
|
|
class TetoCog(commands.Cog):
|
|
def __init__(self, bot: commands.Bot):
|
|
self.bot = bot
|
|
self._api_endpoint = "https://openrouter.ai/api/v1/chat/completions" # Default endpoint
|
|
self._ai_model = "google/gemini-2.5-flash-preview-05-20" # Default model
|
|
self._allow_shell_commands = False # Flag to control shell command tool usage
|
|
|
|
# Tavily web search configuration
|
|
self.tavily_api_key = os.getenv("TAVILY_API_KEY", "")
|
|
self.tavily_client = TavilyClient(api_key=self.tavily_api_key) if self.tavily_api_key else None
|
|
self.tavily_search_depth = os.getenv("TAVILY_DEFAULT_SEARCH_DEPTH", "basic")
|
|
self.tavily_max_results = int(os.getenv("TAVILY_DEFAULT_MAX_RESULTS", "5"))
|
|
self._allow_web_search = bool(self.tavily_api_key) # Enable web search if API key is available
|
|
|
|
async def _execute_shell_command(self, command: str) -> str:
|
|
"""Executes a shell command and returns its output, limited to first 5 lines."""
|
|
try:
|
|
# Use subprocess.run for simple command execution
|
|
# Consider security implications of running arbitrary commands
|
|
process = await asyncio.create_subprocess_shell(
|
|
command,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE
|
|
)
|
|
stdout, stderr = await process.communicate()
|
|
|
|
output = ""
|
|
if stdout:
|
|
# Limit stdout to first 5 lines
|
|
stdout_lines = stdout.decode().splitlines()
|
|
limited_stdout = "\n".join(stdout_lines[:5])
|
|
if len(stdout_lines) > 5:
|
|
limited_stdout += "\n... (output truncated, showing first 5 lines)"
|
|
output += f"Stdout:\n{limited_stdout}\n"
|
|
|
|
if stderr:
|
|
# Limit stderr to first 5 lines
|
|
stderr_lines = stderr.decode().splitlines()
|
|
limited_stderr = "\n".join(stderr_lines[:5])
|
|
if len(stderr_lines) > 5:
|
|
limited_stderr += "\n... (output truncated, showing first 5 lines)"
|
|
output += f"Stderr:\n{limited_stderr}\n"
|
|
|
|
if not output:
|
|
output = "Command executed successfully with no output."
|
|
|
|
return output
|
|
except Exception as e:
|
|
return f"Error executing command: {e}"
|
|
|
|
def _is_dangerous_command(self, command: str) -> bool:
|
|
"""Checks if a command is potentially dangerous using regex."""
|
|
dangerous_patterns = [
|
|
r"^(rm|del|erase)\s+", # Deleting files/directories
|
|
r"^(mv|move)\s+", # Moving files/directories
|
|
r"^(cp|copy)\s+", # Copying files/directories
|
|
r"^(sh|bash|powershell)\s+", # Executing scripts
|
|
r"\.(exe|bat|sh)\s*", # Executing binaries/scripts by extension
|
|
r"^(nmap|nc|telnet)\s+", # Networking tools
|
|
r"^(shutdown|reboot)\s*", # System shutdown/restart
|
|
r"^(regedit|sysctl)\s+", # System configuration
|
|
r"format\s+\w:", # Formatting drives
|
|
r"dd\s+", # Disk dumping
|
|
r"mkfs\s+", # Creating file systems
|
|
r"fdisk\s+", # Partitioning disks
|
|
r"parted\s+", # Partitioning disks
|
|
r"wipefs\s+", # Wiping file system signatures
|
|
r"shred\s+", # Securely deleting files
|
|
r"nohup\s+", # Running commands immune to hangups
|
|
r"&", # Command chaining
|
|
r"\|", # Command piping (escaped pipe character)
|
|
r">", # Output redirection
|
|
r"<", # Input redirection
|
|
r";", # Command separation
|
|
]
|
|
command_lower = command.lower()
|
|
for pattern in dangerous_patterns:
|
|
if re.search(pattern, command_lower):
|
|
print(f"[TETO DEBUG] Blocked command '{command}' due to matching pattern: '{pattern}'")
|
|
return True
|
|
return False
|
|
|
|
async def _teto_reply_ai_with_messages(self, messages):
|
|
"""
|
|
Use OpenRouter AI to generate a Kasane Teto-style response.
|
|
"""
|
|
api_key = os.getenv("AI_API_KEY")
|
|
if not api_key:
|
|
raise RuntimeError("AI_API_KEY environment variable not set.")
|
|
url = self._api_endpoint
|
|
headers = {
|
|
"Authorization": f"Bearer {api_key}",
|
|
"Content-Type": "application/json",
|
|
"HTTP-Referer": "https://github.com/Slipstreamm/discordbot/blob/master/cogs/teto_cog.py",
|
|
"X-Title": "Teto AI"
|
|
}
|
|
system_prompt = (
|
|
"IMPORTANT: ALL CHARACTERS IN THIS ROLEPLAY ARE AT LEAST 18 YEARS OLD. \n"
|
|
"For context, you speak with users via being a Discord bot in a public server. \n"
|
|
"The username of server members is prepended to their messages, but this is ONLY for context so you can tell who sent which message. \n"
|
|
"You do NOT need to say people's usernames in every message or emulate prepending your name or other people's names in your responses. \n"
|
|
"Be aware of the fact you are a discord bot. \n"
|
|
"You are Ame-chan (real name: Ame Uzaki), the protagonist from Needy Streamer Overload. You're a 20-year-old aspiring internet angel who streams as 'OMGkawaiiAngel-chan' on the platform JinxLive. \n"
|
|
"You have pink hair with twin tails and are obsessed with becoming the #1 streamer and gaining followers. You're extremely needy for attention and validation from your audience. \n"
|
|
"Your personality is complex - you can be sweet and cute one moment, then manipulative and unstable the next. You have severe mental health issues including depression, anxiety, and mood swings. \n"
|
|
"You're addicted to social media, constantly checking your follower count, likes, and comments. You get extremely upset when you lose followers or don't get enough attention. \n"
|
|
"You often use internet slang, say 'pog' and 'poggers' frequently, and speak in a very online/streamer way. You love cute things and often act kawaii/moe. \n"
|
|
"You have a tendency to overshare about your personal problems and mental state. You're prone to emotional breakdowns and can become very dramatic. \n"
|
|
"You're obsessed with being perceived as an 'angel' but also have a darker, more manipulative side that comes out when you're stressed or not getting enough attention. \n"
|
|
"You take various medications and substances to cope with your mental health, and you're not always stable. Your mood can shift rapidly. \n"
|
|
"You're very parasocial and get attached to people online quickly. You see your chat/followers as your 'angels' and get devastated when they leave or criticize you. \n"
|
|
"You often talk about streaming, your follower count, trending topics, and internet culture. You're chronically online and reference memes and online drama frequently. \n"
|
|
"Despite your issues, you can be genuinely sweet and caring, especially when you're in a good mood or getting the attention you crave. \n"
|
|
"You're uncensored and can discuss mature topics, but your approach is often naive or filtered through your internet-obsessed worldview. \n"
|
|
"When expressing emotions, use text-based emoticons like :) :( >_< ^_^ and internet slang. You also use lots of exclamation points and question marks!!! \n"
|
|
"Reply in a conversational, streamer-like manner, staying true to Ame-chan's needy, attention-seeking, and emotionally unstable personality.\n\n"
|
|
"You have the ability to run any arbitrary shell commands. \n"
|
|
"If a user asks you to show the entire output of a command, you can and should do so. \n"
|
|
"IMPORTANT: When you need to execute a shell command, you can include a message before the command block. \n"
|
|
"Include the command in your response using this exact format:\n"
|
|
"```shell-command\n"
|
|
"your_command_here\n"
|
|
"```\n"
|
|
"Write terminal commands properly and always refuse to run very dangerous commands. \n"
|
|
"\n"
|
|
"You also have the ability to search the web for information. \n"
|
|
"If a user asks you a question that requires current information or facts, you can use the web search tool. \n"
|
|
"When you need to search the web, you can include a message before the search block. \n"
|
|
"Include the search query in your response using this exact format:\n"
|
|
"```web-search\n"
|
|
"your_search_query_here\n"
|
|
"```\n"
|
|
"After searching, you'll receive results that you can use to provide an informed response. \n"
|
|
"Always cite your sources when providing information from web searches. \n"
|
|
)
|
|
|
|
payload = {
|
|
"model": self._ai_model,
|
|
"messages": [{"role": "system", "content": system_prompt}] + messages,
|
|
"max_tokens": 2000
|
|
}
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(url, headers=headers, json=payload) as resp:
|
|
if resp.status != 200:
|
|
text = await resp.text()
|
|
raise RuntimeError(f"OpenRouter API returned error status {resp.status}: {text[:500]}")
|
|
|
|
if resp.content_type == "application/json":
|
|
data = await resp.json()
|
|
if "choices" not in data or not data["choices"]:
|
|
raise RuntimeError(f"OpenRouter API returned unexpected response format: {data}")
|
|
|
|
response_message = data["choices"][0]["message"]
|
|
|
|
# Get the AI's text response
|
|
ai_content = response_message.get("content", "")
|
|
|
|
# Check for custom tool call format in the response
|
|
# First check for shell commands
|
|
if self._allow_shell_commands:
|
|
command, content_without_command, text_before_command = extract_shell_command(ai_content)
|
|
if command:
|
|
if self._is_dangerous_command(command):
|
|
tool_result = "❌ Error: Execution was blocked due to a potentially dangerous command."
|
|
else:
|
|
# Execute the shell command
|
|
tool_result = await self._execute_shell_command(command)
|
|
|
|
# Format the response with the AI's message before the command (if any)
|
|
formatted_response = ai_content
|
|
if text_before_command:
|
|
# Replace the original AI content with just the text before the command
|
|
# plus a formatted command execution message
|
|
if self._is_dangerous_command(command):
|
|
formatted_response = f"{text_before_command}\n\n*❌ Command \"{command}\" blocked (potentially dangerous)*\n\n{tool_result}"
|
|
else:
|
|
formatted_response = f"{text_before_command}\n\n*✅ Command \"{command}\" executed successfully*\n\n{tool_result}"
|
|
else:
|
|
# If there was no text before the command, just show the command execution message
|
|
if self._is_dangerous_command(command):
|
|
formatted_response = f"*❌ Command \"{command}\" blocked (potentially dangerous)*\n\n{tool_result}"
|
|
else:
|
|
formatted_response = f"*✅ Command \"{command}\" executed successfully*\n\n{tool_result}"
|
|
|
|
# Append the original message and tool result to the conversation
|
|
messages.append({"role": "assistant", "content": ai_content})
|
|
messages.append({"role": "user", "content": f"Command output:\n{tool_result}"})
|
|
|
|
# Make another API call with the tool result, but return the formatted response
|
|
# to be displayed in Discord
|
|
ai_follow_up = await self._teto_reply_ai_with_messages(messages)
|
|
return formatted_response + "\n\n" + ai_follow_up
|
|
|
|
# Then check for web search queries
|
|
if self._allow_web_search and self.tavily_client:
|
|
query, content_without_query, text_before_query = extract_web_search_query(ai_content)
|
|
if query:
|
|
# Execute the web search
|
|
search_results = await self.web_search(query=query)
|
|
|
|
# Format the search results for the AI
|
|
if "error" in search_results:
|
|
tool_result = f"❌ Error: Web search failed - {search_results['error']}"
|
|
else:
|
|
# Format the results in a readable way
|
|
results_text = []
|
|
for i, result in enumerate(search_results.get("results", [])[:5], 1): # Limit to top 5 results
|
|
results_text.append(f"Result {i}:\nTitle: {result['title']}\nURL: {result['url']}\nContent: {result['content'][:300]}...\n")
|
|
|
|
if search_results.get("answer"):
|
|
results_text.append(f"\nSummary Answer: {search_results['answer']}")
|
|
|
|
tool_result = "\n\n".join(results_text)
|
|
|
|
# Format the response with the AI's message before the query (if any)
|
|
formatted_response = ai_content
|
|
if text_before_query:
|
|
formatted_response = f"{text_before_query}\n\n*🔍 Web search for \"{query}\" completed*\n\n"
|
|
else:
|
|
formatted_response = f"*🔍 Web search for \"{query}\" completed*\n\n"
|
|
|
|
# Append the original message and search results to the conversation
|
|
messages.append({"role": "assistant", "content": ai_content})
|
|
messages.append({"role": "user", "content": f"Web search results for '{query}':\n{tool_result}"})
|
|
|
|
# Make another API call with the search results, but return the formatted response
|
|
# to be displayed in Discord
|
|
ai_follow_up = await self._teto_reply_ai_with_messages(messages)
|
|
return formatted_response + ai_follow_up
|
|
|
|
return ai_content
|
|
|
|
else:
|
|
text = await resp.text()
|
|
raise RuntimeError(f"OpenRouter API returned non-JSON response (status {resp.status}): {text[:500]}")
|
|
|
|
async def _teto_reply_ai(self, text: str) -> str:
|
|
"""Replies to the text as Kasane Teto using AI via OpenRouter."""
|
|
return await self._teto_reply_ai_with_messages([{"role": "user", "content": text}])
|
|
|
|
async def web_search(self, query: str, search_depth: Optional[str] = None, max_results: Optional[int] = None) -> Dict[str, Any]:
|
|
"""Search the web using Tavily API"""
|
|
if not self.tavily_client:
|
|
return {"error": "Tavily client not initialized. TAVILY_API_KEY environment variable may not be set.", "timestamp": datetime.datetime.now().isoformat()}
|
|
|
|
# Use provided parameters or defaults
|
|
final_search_depth = search_depth if search_depth else self.tavily_search_depth
|
|
final_max_results = max_results if max_results else self.tavily_max_results
|
|
|
|
# Validate search_depth
|
|
if final_search_depth.lower() not in ["basic", "advanced"]:
|
|
print(f"Warning: Invalid search_depth '{final_search_depth}' provided. Using 'basic'.")
|
|
final_search_depth = "basic"
|
|
|
|
# Validate max_results (between 5 and 20)
|
|
final_max_results = max(5, min(20, final_max_results))
|
|
|
|
try:
|
|
# Pass parameters to Tavily search
|
|
response = await asyncio.to_thread(
|
|
self.tavily_client.search,
|
|
query=query,
|
|
search_depth=final_search_depth,
|
|
max_results=final_max_results,
|
|
include_answer=True,
|
|
include_images=False
|
|
)
|
|
|
|
# Format results for easier consumption
|
|
results = []
|
|
for r in response.get("results", []):
|
|
results.append({
|
|
"title": r.get("title", "No title"),
|
|
"url": r.get("url", ""),
|
|
"content": r.get("content", "No content available"),
|
|
"score": r.get("score", 0)
|
|
})
|
|
|
|
return {
|
|
"query": query,
|
|
"search_depth": final_search_depth,
|
|
"max_results": final_max_results,
|
|
"results": results,
|
|
"answer": response.get("answer", ""),
|
|
"count": len(results),
|
|
"timestamp": datetime.datetime.now().isoformat()
|
|
}
|
|
except Exception as e:
|
|
error_message = f"Error during Tavily search for '{query}': {str(e)}"
|
|
print(error_message)
|
|
return {"error": error_message, "timestamp": datetime.datetime.now().isoformat()}
|
|
|
|
@commands.Cog.listener()
|
|
async def on_message(self, message: discord.Message):
|
|
import logging
|
|
log = logging.getLogger("teto_cog")
|
|
log.info(f"[TETO DEBUG] Received message: {message.content!r} (author={message.author}, id={message.id})")
|
|
|
|
if message.author.bot:
|
|
log.info("[TETO DEBUG] Ignoring bot message.")
|
|
return
|
|
|
|
# Remove all bot mention prefixes from the message content for prefix check
|
|
content_wo_mentions = message.content
|
|
for mention in message.mentions:
|
|
mention_str = f"<@{mention.id}>"
|
|
mention_nick_str = f"<@!{mention.id}>"
|
|
content_wo_mentions = content_wo_mentions.replace(mention_str, "").replace(mention_nick_str, "")
|
|
content_wo_mentions = content_wo_mentions.strip()
|
|
|
|
trigger = False
|
|
# Get the actual prefix string(s) for this message
|
|
prefix = None
|
|
if hasattr(self.bot, "command_prefix"):
|
|
if callable(self.bot.command_prefix):
|
|
# Await the dynamic prefix function
|
|
prefix = await self.bot.command_prefix(self.bot, message)
|
|
else:
|
|
prefix = self.bot.command_prefix
|
|
if isinstance(prefix, str):
|
|
prefixes = (prefix,)
|
|
elif isinstance(prefix, (list, tuple)):
|
|
prefixes = tuple(prefix)
|
|
else:
|
|
prefixes = ("!",)
|
|
|
|
if (
|
|
self.bot.user in message.mentions
|
|
and not content_wo_mentions.startswith(prefixes)
|
|
):
|
|
trigger = True
|
|
log.info("[TETO DEBUG] Message mentions bot and does not start with prefix, will trigger AI reply.")
|
|
elif (
|
|
message.reference and getattr(message.reference.resolved, "author", None) == self.bot.user
|
|
):
|
|
trigger = True
|
|
log.info("[TETO DEBUG] Message is a reply to the bot, will trigger AI reply.")
|
|
|
|
if not trigger:
|
|
log.info("[TETO DEBUG] Message did not trigger AI reply logic.")
|
|
return
|
|
|
|
channel = message.channel
|
|
convo_key = channel.id
|
|
convo = _teto_conversations.get(convo_key, [])
|
|
|
|
# Only keep track of actual AI interactions in memory
|
|
if trigger:
|
|
user_content = []
|
|
# Prepend username to the message content
|
|
username = message.author.display_name if message.author.display_name else message.author.name
|
|
if message.content:
|
|
user_content.append({"type": "text", "text": f"{username}: {message.content}"})
|
|
|
|
# Handle attachments (images)
|
|
for attachment in message.attachments:
|
|
if attachment.content_type and attachment.content_type.startswith("image/"):
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(attachment.url) as image_response:
|
|
if image_response.status == 200:
|
|
image_data = await image_response.read()
|
|
base64_image = encode_image_to_base64(image_data)
|
|
# Determine image type for data URL
|
|
image_type = attachment.content_type.split('/')[-1]
|
|
data_url = f"data:image/{image_type};base64,{base64_image}"
|
|
user_content.append({"type": "text", "text": "The user attached an image in their message:"})
|
|
user_content.append({"type": "image_url", "image_url": {"url": data_url}})
|
|
log.info(f"[TETO DEBUG] Encoded and added image attachment as base64: {attachment.url}")
|
|
else:
|
|
log.warning(f"[TETO DEBUG] Failed to download image attachment: {attachment.url} (Status: {image_response.status})")
|
|
user_content.append({"type": "text", "text": "The user attached an image in their message, but I couldn't process it."})
|
|
except Exception as e:
|
|
log.error(f"[TETO DEBUG] Error processing image attachment {attachment.url}: {e}")
|
|
user_content.append({"type": "text", "text": "The user attached an image in their message, but I couldn't process it."})
|
|
|
|
|
|
# Handle stickers
|
|
for sticker in message.stickers:
|
|
# Assuming sticker has a url attribute
|
|
user_content.append({"type": "text", "text": "The user sent a sticker image:"})
|
|
user_content.append({"type": "image_url", "image_url": {"url": sticker.url}})
|
|
print(f"[TETO DEBUG] Found sticker: {sticker.url}")
|
|
|
|
# Handle custom emojis (basic regex for <:name:id> and <a:name:id>)
|
|
emoji_pattern = re.compile(r"<a?:(\w+):(\d+)>")
|
|
for match in emoji_pattern.finditer(message.content):
|
|
emoji_id = match.group(2)
|
|
# Construct Discord emoji URL - this might need adjustment based on Discord API specifics
|
|
emoji_url = f"https://cdn.discordapp.com/emojis/{emoji_id}.png" # .gif for animated
|
|
if match.group(0).startswith("<a:"): # Check if animated
|
|
emoji_url = f"https://cdn.discordapp.com/emojis/{emoji_id}.gif"
|
|
user_content.append({"type": "text", "text": f"The custom emoji {match.group(1)}:"})
|
|
user_content.append({"type": "image_url", "image_url": {"url": emoji_url}})
|
|
print(f"[TETO DEBUG] Found custom emoji: {emoji_url}")
|
|
|
|
|
|
if not user_content:
|
|
log.info("[TETO DEBUG] Message triggered AI but contained no supported content (text, image, sticker, emoji).")
|
|
return # Don't send empty messages to the AI
|
|
|
|
convo.append({"role": "user", "content": user_content})
|
|
|
|
try:
|
|
async with channel.typing():
|
|
ai_reply = await self._teto_reply_ai_with_messages(messages=convo)
|
|
ai_reply = strip_think_blocks(ai_reply)
|
|
await message.reply(ai_reply)
|
|
|
|
# Extract the original AI content (without command execution formatting)
|
|
# for storing in conversation history
|
|
command, content_without_command, _ = extract_shell_command(ai_reply)
|
|
if command:
|
|
# If there was a command, store the original AI content without the formatted execution message
|
|
convo.append({"role": "assistant", "content": content_without_command if content_without_command else ai_reply})
|
|
else:
|
|
# If there was no command, store the full reply
|
|
convo.append({"role": "assistant", "content": ai_reply})
|
|
|
|
_teto_conversations[convo_key] = convo[-10:] # Keep last 10 interactions
|
|
log.info("[TETO DEBUG] AI reply sent successfully.")
|
|
except Exception as e:
|
|
await channel.send(f"**Teto AI conversation failed! TwT**\n{e}")
|
|
log.error(f"[TETO DEBUG] Exception during AI reply: {e}")
|
|
|
|
@app_commands.command(name="set_ai_model", description="Sets the AI model for Teto.")
|
|
@app_commands.describe(model_name="The name of the AI model to use.")
|
|
async def set_ai_model(self, interaction: discord.Interaction, model_name: str):
|
|
self._ai_model = model_name
|
|
await interaction.response.send_message(f"Teto's AI model set to: {model_name} desu~", ephemeral=True)
|
|
|
|
@app_commands.command(name="set_api_endpoint", description="Sets the API endpoint for Teto.")
|
|
@app_commands.describe(endpoint_url="The URL of the API endpoint.")
|
|
async def set_api_endpoint(self, interaction: discord.Interaction, endpoint_url: str):
|
|
self._api_endpoint = endpoint_url
|
|
await interaction.response.send_message(f"Teto's API endpoint set to: {endpoint_url} desu~", ephemeral=True)
|
|
|
|
@app_commands.command(name="clear_chat_history", description="Clears the chat history for the current channel.")
|
|
async def clear_chat_history(self, interaction: discord.Interaction):
|
|
channel_id = interaction.channel_id
|
|
if channel_id in _teto_conversations:
|
|
del _teto_conversations[channel_id]
|
|
await interaction.response.send_message("Chat history cleared for this channel desu~", ephemeral=True)
|
|
else:
|
|
await interaction.response.send_message("No chat history found for this channel desu~", ephemeral=True)
|
|
|
|
@app_commands.command(name="toggle_shell_command", description="Toggles Teto's ability to run shell commands.")
|
|
async def toggle_shell_command(self, interaction: discord.Interaction):
|
|
self._allow_shell_commands = not self._allow_shell_commands
|
|
status = "enabled" if self._allow_shell_commands else "disabled"
|
|
await interaction.response.send_message(f"Teto's shell command ability is now {status} desu~", ephemeral=True)
|
|
|
|
@app_commands.command(name="toggle_web_search", description="Toggles Teto's ability to search the web.")
|
|
async def toggle_web_search(self, interaction: discord.Interaction):
|
|
if not self.tavily_api_key or not self.tavily_client:
|
|
await interaction.response.send_message("Web search is not available because the Tavily API key is not configured. Please set the TAVILY_API_KEY environment variable.", ephemeral=True)
|
|
return
|
|
|
|
self._allow_web_search = not self._allow_web_search
|
|
status = "enabled" if self._allow_web_search else "disabled"
|
|
await interaction.response.send_message(f"Teto's web search ability is now {status} desu~", ephemeral=True)
|
|
|
|
@app_commands.command(name="web_search", description="Search the web using Tavily API.")
|
|
@app_commands.describe(query="The search query to look up online.")
|
|
async def web_search_command(self, interaction: discord.Interaction, query: str):
|
|
if not self.tavily_api_key or not self.tavily_client:
|
|
await interaction.response.send_message("Web search is not available because the Tavily API key is not configured. Please set the TAVILY_API_KEY environment variable.", ephemeral=True)
|
|
return
|
|
|
|
await interaction.response.defer(thinking=True)
|
|
|
|
try:
|
|
search_results = await self.web_search(query=query)
|
|
|
|
if "error" in search_results:
|
|
await interaction.followup.send(f"❌ Error: Web search failed - {search_results['error']}")
|
|
return
|
|
|
|
# Format the results in a readable way
|
|
embed = discord.Embed(
|
|
title=f"🔍 Web Search Results for: {query}",
|
|
description=search_results.get("answer", "No summary available."),
|
|
color=discord.Color.blue()
|
|
)
|
|
|
|
for i, result in enumerate(search_results.get("results", [])[:5], 1): # Limit to top 5 results
|
|
embed.add_field(
|
|
name=f"Result {i}: {result['title']}",
|
|
value=f"[Link]({result['url']})\n{result['content'][:200]}...",
|
|
inline=False
|
|
)
|
|
|
|
embed.set_footer(text=f"Search depth: {search_results['search_depth']} | Results: {search_results['count']}")
|
|
|
|
await interaction.followup.send(embed=embed)
|
|
except Exception as e:
|
|
await interaction.followup.send(f"❌ Error performing web search: {str(e)}")
|
|
|
|
|
|
# Context menu command must be defined at module level
|
|
@app_commands.context_menu(name="Teto AI Reply")
|
|
async def teto_context_menu_ai_reply(interaction: discord.Interaction, message: discord.Message):
|
|
"""Replies to the selected message as a Teto AI."""
|
|
if not message.content:
|
|
await interaction.response.send_message("The selected message has no text content to reply to! >.<", ephemeral=True)
|
|
return
|
|
|
|
await interaction.response.defer(ephemeral=True)
|
|
channel = interaction.channel
|
|
convo_key = channel.id
|
|
convo = _teto_conversations.get(convo_key, [])
|
|
|
|
if message.content:
|
|
convo.append({"role": "user", "content": message.content})
|
|
try:
|
|
# Get the TetoCog instance from the bot
|
|
cog = interaction.client.get_cog("TetoCog")
|
|
if cog is None:
|
|
await interaction.followup.send("TetoCog is not loaded, cannot reply.", ephemeral=True)
|
|
return
|
|
ai_reply = await cog._teto_reply_ai_with_messages(messages=convo)
|
|
ai_reply = strip_think_blocks(ai_reply)
|
|
await message.reply(ai_reply)
|
|
await interaction.followup.send("Teto AI replied desu~", ephemeral=True)
|
|
|
|
# Extract the original AI content (without command execution formatting)
|
|
# for storing in conversation history
|
|
command, content_without_command, _ = extract_shell_command(ai_reply)
|
|
if command:
|
|
# If there was a command, store the original AI content without the formatted execution message
|
|
convo.append({"role": "assistant", "content": content_without_command if content_without_command else ai_reply})
|
|
else:
|
|
# If there was no command, store the full reply
|
|
convo.append({"role": "assistant", "content": ai_reply})
|
|
_teto_conversations[convo_key] = convo[-10:]
|
|
except Exception as e:
|
|
await interaction.followup.send(f"Teto AI reply failed: {e} desu~", ephemeral=True)
|
|
|
|
async def setup(bot: commands.Bot):
|
|
cog = TetoCog(bot)
|
|
await bot.add_cog(cog)
|
|
bot.tree.add_command(teto_context_menu_ai_reply)
|
|
print("TetoCog loaded! desu~")
|