This commit is contained in:
Slipstream 2025-04-28 23:24:38 -06:00
parent a4d69368b1
commit bc511aae90
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD
5 changed files with 854 additions and 961 deletions

File diff suppressed because it is too large Load Diff

View File

@ -469,16 +469,13 @@ async def on_message_listener(cog: 'GurtCog', message: discord.Message):
# Send message with reference if applicable
sent_msg = await original_message.channel.send(response_text, reference=message_reference, mention_author=False) # mention_author=False is usually preferred for bots
sent_any_message = True
# Cache this bot response - NOTE: Commented out as LangchainAgent should handle history via add_message
# bot_response_cache_entry = format_message(cog, sent_msg) # Pass cog
# cog.message_cache['by_channel'][channel_id].append(bot_response_cache_entry)
# cog.message_cache['global_recent'].append(bot_response_cache_entry)
cog.bot_last_spoke[channel_id] = time.time() # Keep track of when bot last spoke
# Track participation topic - Requires the sent message content. Let's get it directly.
# We need the content to identify topics. Since we don't cache the formatted message anymore,
# let's create a minimal dict for topic identification.
bot_response_for_topic = {"content": sent_msg.content, "author": {"id": str(cog.bot.user.id)}}
identified_topics = identify_conversation_topics(cog, [bot_response_for_topic]) # Pass cog
# Cache this bot response
bot_response_cache_entry = format_message(cog, sent_msg) # Pass cog
cog.message_cache['by_channel'][channel_id].append(bot_response_cache_entry)
cog.message_cache['global_recent'].append(bot_response_cache_entry)
cog.bot_last_spoke[channel_id] = time.time()
# Track participation topic
identified_topics = identify_conversation_topics(cog, [bot_response_cache_entry]) # Pass cog
if identified_topics:
topic = identified_topics[0]['topic'].lower().strip()
cog.gurt_participation_topics[topic] += 1

View File

@ -2,11 +2,6 @@
# Use a direct import path that doesn't rely on package structure
import os
import importlib.util
from typing import TYPE_CHECKING, List, Sequence, Dict, Any # Import TYPE_CHECKING and other types
import collections # Import collections for deque
if TYPE_CHECKING:
from .cog import GurtCog # Use relative import for type hinting
# Get the absolute path to gurt_memory.py
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@ -20,178 +15,5 @@ spec.loader.exec_module(gurt_memory)
# Import the MemoryManager class from the loaded module
MemoryManager = gurt_memory.MemoryManager
import logging
from typing import List, Sequence
# LangChain imports for Chat History
from langchain_core.chat_history import BaseChatMessageHistory
# Import specific message types needed
from langchain_core.messages import (
BaseMessage, AIMessage, HumanMessage, SystemMessage, ToolMessage
)
# Relative imports
from .config import CONTEXT_WINDOW_SIZE # Import context window size
# Configure logging if not already done elsewhere
logger = logging.getLogger(__name__)
# --- LangChain Chat History Implementation ---
class GurtMessageCacheHistory(BaseChatMessageHistory):
"""Chat message history that reads from and potentially writes to GurtCog's message cache."""
def __init__(self, cog: 'GurtCog', channel_id: int):
# Use relative import for type checking within the function scope if needed,
# or rely solely on the TYPE_CHECKING block if sufficient.
# For runtime check, a local relative import is safer.
from .cog import GurtCog # Use relative import here
if not isinstance(cog, GurtCog):
raise TypeError("GurtMessageCacheHistory requires a GurtCog instance.")
self.cog = cog
self.channel_id = channel_id
self.key = f"channel:{channel_id}" # Example key structure
@property
def messages(self) -> List[BaseMessage]: # type: ignore
"""Retrieve messages from the cache and reconstruct LangChain messages."""
# Access the cache via the cog instance
# Ensure the cache is initialized as a deque
channel_cache = self.cog.message_cache['by_channel'].setdefault(
self.channel_id, collections.deque(maxlen=CONTEXT_WINDOW_SIZE * 2) # Use a larger maxlen for safety?
)
cached_messages_data = list(channel_cache) # Get a list copy
items: List[BaseMessage] = []
# Apply context window limit (consider if the limit should apply differently to LC messages vs formatted)
# For now, apply simple limit to the combined list
relevant_messages_data = cached_messages_data[-(CONTEXT_WINDOW_SIZE * 2):] # Use the potentially larger limit
for msg_data in relevant_messages_data:
if isinstance(msg_data, dict) and msg_data.get('_is_lc_message_'):
# Reconstruct LangChain message from serialized dict
lc_type = msg_data.get('lc_type')
content = msg_data.get('content', '')
additional_kwargs = msg_data.get('additional_kwargs', {})
tool_calls = msg_data.get('tool_calls') # For AIMessage
tool_call_id = msg_data.get('tool_call_id') # For ToolMessage
try:
if lc_type == 'HumanMessage':
items.append(HumanMessage(content=content, additional_kwargs=additional_kwargs))
elif lc_type == 'AIMessage':
# Reconstruct AIMessage, potentially with tool_calls
ai_msg = AIMessage(content=content, additional_kwargs=additional_kwargs)
if tool_calls:
# Ensure tool_calls are in the correct format if needed (e.g., list of dicts)
# Assuming they were stored correctly from message.dict()
ai_msg.tool_calls = tool_calls
items.append(ai_msg)
elif lc_type == 'ToolMessage':
# ToolMessage needs content and tool_call_id
if tool_call_id:
items.append(ToolMessage(content=content, tool_call_id=tool_call_id, additional_kwargs=additional_kwargs))
else:
logger.warning(f"Skipping ToolMessage reconstruction, missing tool_call_id: {msg_data}")
elif lc_type == 'SystemMessage': # Should not happen via add_message, but handle defensively
items.append(SystemMessage(content=content, additional_kwargs=additional_kwargs))
# Add other types if needed (FunctionMessage?)
else:
logger.warning(f"Unhandled LangChain message type '{lc_type}' during reconstruction.")
except Exception as recon_e:
logger.error(f"Error reconstructing LangChain message type '{lc_type}': {recon_e}\nData: {msg_data}", exc_info=True)
elif isinstance(msg_data, dict) and not msg_data.get('_is_lc_message_'):
# Existing logic for reconstructing from formatted user/bot messages
# This assumes the agent doesn't add Human/AI messages that overlap with these
role = "ai" if msg_data.get('author', {}).get('id') == str(self.cog.bot.user.id) else "human"
# Reconstruct content similar to original logic (simplified)
content_parts = []
author_name = msg_data.get('author', {}).get('display_name', 'Unknown')
# Basic content reconstruction
content = msg_data.get('content', '')
attachments = msg_data.get("attachment_descriptions", [])
if attachments:
attachment_str = " ".join([att['description'] for att in attachments])
content += f" [Attachments: {attachment_str}]" # Append attachment info
# Combine author and content for the LangChain message
# NOTE: This might differ from how the agent expects input if it relies on raw content.
# Consider if just the content string is better here.
# Let's stick to the previous format for now.
full_content = f"{author_name}: {content}"
if role == "human":
items.append(HumanMessage(content=full_content))
elif role == "ai":
# This should only be the *final* AI response text, without tool calls
items.append(AIMessage(content=full_content))
else:
logger.warning(f"Unhandled message role '{role}' in GurtMessageCacheHistory (formatted msg) for channel {self.channel_id}")
else:
logger.warning(f"Skipping unrecognized item in message cache: {type(msg_data)}")
return items
def add_message(self, message: BaseMessage) -> None:
"""Add a LangChain BaseMessage to the history cache."""
try:
# Serialize the message object to a dictionary using pydantic's dict()
message_dict = message.dict()
# Explicitly store the LangChain class name for reconstruction
message_dict['lc_type'] = message.__class__.__name__
# Add our flag to distinguish it during retrieval
message_dict['_is_lc_message_'] = True
# Ensure tool_calls and tool_call_id are preserved if they exist
# (message.dict() should handle this, but double-check if issues arise)
# Example explicit checks (might be redundant):
# if isinstance(message, AIMessage) and hasattr(message, 'tool_calls') and message.tool_calls:
# message_dict['tool_calls'] = message.tool_calls
# elif isinstance(message, ToolMessage) and hasattr(message, 'tool_call_id'):
# message_dict['tool_call_id'] = message.tool_call_id
# Access the cache via the cog instance, ensuring it's a deque
channel_cache = self.cog.message_cache['by_channel'].setdefault(
self.channel_id, collections.deque(maxlen=CONTEXT_WINDOW_SIZE * 2) # Use consistent maxlen
)
channel_cache.append(message_dict)
logger.debug(f"Added LangChain message ({message.__class__.__name__}) to cache for channel {self.channel_id}")
except Exception as e:
logger.error(f"Error adding LangChain message to cache for channel {self.channel_id}: {e}", exc_info=True)
# Optional: Implement add_user_message, add_ai_message if needed (BaseChatMessageHistory provides defaults)
def clear(self) -> None:
"""Clear history from the cache for this channel."""
logger.warning(f"GurtMessageCacheHistory.clear() called for channel {self.channel_id}. Clearing cache deque.")
if self.channel_id in self.cog.message_cache['by_channel']:
# Clear the deque instead of deleting the key, to keep the deque object
self.cog.message_cache['by_channel'][self.channel_id].clear()
# Potentially clear other related caches if necessary
# Factory function for LangchainAgent
def get_gurt_session_history(session_id: str, cog: 'GurtCog') -> BaseChatMessageHistory:
"""
Factory function to get a chat history instance for a given session ID.
The session_id is expected to be the Discord channel ID.
"""
try:
channel_id = int(session_id)
return GurtMessageCacheHistory(cog=cog, channel_id=channel_id)
except ValueError:
logger.error(f"Invalid session_id for Gurt chat history: '{session_id}'. Expected integer channel ID.")
# Return an in-memory history as a fallback? Or raise error?
# from langchain_community.chat_message_histories import ChatMessageHistory
# return ChatMessageHistory() # Fallback to basic in-memory
raise ValueError(f"Invalid session_id: {session_id}")
except TypeError as e:
logger.error(f"TypeError creating GurtMessageCacheHistory: {e}. Ensure 'cog' is passed correctly.")
raise
# Re-export the MemoryManager class AND the history components
__all__ = ['MemoryManager', 'GurtMessageCacheHistory', 'get_gurt_session_history']
# Re-export the MemoryManager class
__all__ = ['MemoryManager']

View File

@ -148,7 +148,7 @@ You can use the tools you have to gather additional context for your messages if
**Replying to Messages:**
- To reply directly to a specific message, include the `"reply_to_message_id"` field in your JSON response, setting its value to the string ID of the message you want to reply to.
- Example JSON for replying: `{{ "should_respond": true, "content": "lol yeah", "reply_to_message_id": "112233445566778899", "react_with_emoji": null }}`
- Example JSON for replying: `{ "should_respond": true, "content": "lol yeah", "reply_to_message_id": "112233445566778899", "react_with_emoji": null }`
- You can usually find the ID of recent messages in the conversation history provided in the prompt.
**Pinging Users:**
@ -172,12 +172,12 @@ DO NOT fall into these patterns:
**CRITICAL: You MUST respond ONLY with a valid JSON object matching this schema:**
{{
{
"should_respond": true, // Whether to send a text message in response.
"content": "example message", // The text content of the bot's response.
"react_with_emoji": "👍", // Optional: A standard Discord emoji to react with, or null if no reaction.
"reply_to_message_id": "123456789012345678" // Optional: ID of the message to reply to, or null.
}}
}
**Do NOT include any other text, explanations, or markdown formatting outside of this JSON structure.**

View File

@ -17,7 +17,6 @@ from tavily import TavilyClient
import docker
import aiodocker # Use aiodocker for async operations
from asteval import Interpreter # Added for calculate tool
# Removed: from langchain_core.tools import tool
# Relative imports from within the gurt package and parent
from .memory import MemoryManager # Import from local memory.py
@ -38,19 +37,8 @@ from .config import (
# to access things like cog.bot, cog.session, cog.current_channel, cog.memory_manager etc.
# We will add 'cog' as the first parameter to each.
# @tool removed
async def get_recent_messages(cog: commands.Cog, limit: int, channel_id: Optional[str] = None) -> Dict[str, Any]:
"""
Retrieves the most recent messages from a specified Discord channel or the current channel.
Args:
cog: The GurtCog instance (automatically passed).
limit: The maximum number of messages to retrieve (1-100).
channel_id: Optional ID of the channel to fetch messages from. If None, uses the current channel context.
Returns:
A dictionary containing channel info, a list of formatted messages, the count, and a timestamp, or an error dictionary.
"""
async def get_recent_messages(cog: commands.Cog, limit: int, channel_id: str = None) -> Dict[str, Any]:
"""Get recent messages from a Discord channel"""
from .utils import format_message # Import here to avoid circular dependency at module level
limit = min(max(1, limit), 100)
try:
@ -73,20 +61,8 @@ async def get_recent_messages(cog: commands.Cog, limit: int, channel_id: Optiona
except Exception as e:
return {"error": f"Error retrieving messages: {str(e)}", "timestamp": datetime.datetime.now().isoformat()}
# @tool removed
async def search_user_messages(cog: commands.Cog, user_id: str, limit: int, channel_id: Optional[str] = None) -> Dict[str, Any]:
"""
Searches recent channel history for messages sent by a specific user.
Args:
cog: The GurtCog instance (automatically passed).
user_id: The Discord ID of the user whose messages to search for.
limit: The maximum number of messages to return (1-100).
channel_id: Optional ID of the channel to search in. If None, uses the current channel context.
Returns:
A dictionary containing channel info, user info, a list of formatted messages, the count, and a timestamp, or an error dictionary.
"""
async def search_user_messages(cog: commands.Cog, user_id: str, limit: int, channel_id: str = None) -> Dict[str, Any]:
"""Search for messages from a specific user"""
from .utils import format_message # Import here
limit = min(max(1, limit), 100)
try:
@ -118,20 +94,8 @@ async def search_user_messages(cog: commands.Cog, user_id: str, limit: int, chan
except Exception as e:
return {"error": f"Error searching user messages: {str(e)}", "timestamp": datetime.datetime.now().isoformat()}
# @tool removed
async def search_messages_by_content(cog: commands.Cog, search_term: str, limit: int, channel_id: Optional[str] = None) -> Dict[str, Any]:
"""
Searches recent channel history for messages containing specific text content (case-insensitive).
Args:
cog: The GurtCog instance (automatically passed).
search_term: The text content to search for within messages.
limit: The maximum number of matching messages to return (1-100).
channel_id: Optional ID of the channel to search in. If None, uses the current channel context.
Returns:
A dictionary containing channel info, the search term, a list of formatted messages, the count, and a timestamp, or an error dictionary.
"""
async def search_messages_by_content(cog: commands.Cog, search_term: str, limit: int, channel_id: str = None) -> Dict[str, Any]:
"""Search for messages containing specific content"""
from .utils import format_message # Import here
limit = min(max(1, limit), 100)
try:
@ -158,18 +122,8 @@ async def search_messages_by_content(cog: commands.Cog, search_term: str, limit:
except Exception as e:
return {"error": f"Error searching messages by content: {str(e)}", "timestamp": datetime.datetime.now().isoformat()}
# @tool removed
async def get_channel_info(cog: commands.Cog, channel_id: Optional[str] = None) -> Dict[str, Any]:
"""
Retrieves detailed information about a specified Discord channel or the current channel.
Args:
cog: The GurtCog instance (automatically passed).
channel_id: Optional ID of the channel to get info for. If None, uses the current channel context.
Returns:
A dictionary containing detailed channel information (ID, name, topic, type, guild, etc.) or an error dictionary.
"""
async def get_channel_info(cog: commands.Cog, channel_id: str = None) -> Dict[str, Any]:
"""Get information about a Discord channel"""
try:
if channel_id:
channel = cog.bot.get_channel(int(channel_id))
@ -197,19 +151,8 @@ async def get_channel_info(cog: commands.Cog, channel_id: Optional[str] = None)
except Exception as e:
return {"error": f"Error getting channel info: {str(e)}", "timestamp": datetime.datetime.now().isoformat()}
# @tool removed
async def get_conversation_context(cog: commands.Cog, message_count: int, channel_id: Optional[str] = None) -> Dict[str, Any]:
"""
Retrieves recent messages to provide context for the ongoing conversation in a channel.
Args:
cog: The GurtCog instance (automatically passed).
message_count: The number of recent messages to retrieve (5-50).
channel_id: Optional ID of the channel. If None, uses the current channel context.
Returns:
A dictionary containing channel info, a list of formatted context messages, the count, and a timestamp, or an error dictionary.
"""
async def get_conversation_context(cog: commands.Cog, message_count: int, channel_id: str = None) -> Dict[str, Any]:
"""Get the context of the current conversation in a channel"""
from .utils import format_message # Import here
message_count = min(max(5, message_count), 50)
try:
@ -237,19 +180,8 @@ async def get_conversation_context(cog: commands.Cog, message_count: int, channe
except Exception as e:
return {"error": f"Error getting conversation context: {str(e)}"}
# @tool removed
async def get_thread_context(cog: commands.Cog, thread_id: str, message_count: int) -> Dict[str, Any]:
"""
Retrieves recent messages from a specific Discord thread to provide conversation context.
Args:
cog: The GurtCog instance (automatically passed).
thread_id: The ID of the thread to retrieve context from.
message_count: The number of recent messages to retrieve (5-50).
Returns:
A dictionary containing thread info, parent channel ID, a list of formatted context messages, the count, and a timestamp, or an error dictionary.
"""
"""Get the context of a thread conversation"""
from .utils import format_message # Import here
message_count = min(max(5, message_count), 50)
try:
@ -274,21 +206,8 @@ async def get_thread_context(cog: commands.Cog, thread_id: str, message_count: i
except Exception as e:
return {"error": f"Error getting thread context: {str(e)}"}
# @tool removed
async def get_user_interaction_history(cog: commands.Cog, user_id_1: str, limit: int, user_id_2: Optional[str] = None) -> Dict[str, Any]:
"""
Retrieves the recent message history involving interactions (replies, mentions) between two users.
If user_id_2 is not provided, it defaults to interactions between user_id_1 and the bot (Gurt).
Args:
cog: The GurtCog instance (automatically passed).
user_id_1: The Discord ID of the first user.
limit: The maximum number of interaction messages to return (1-50).
user_id_2: Optional Discord ID of the second user. Defaults to the bot's ID.
Returns:
A dictionary containing info about both users, a list of formatted interaction messages, the count, and a timestamp, or an error dictionary.
"""
async def get_user_interaction_history(cog: commands.Cog, user_id_1: str, limit: int, user_id_2: str = None) -> Dict[str, Any]:
"""Get the history of interactions between two users (or user and bot)"""
limit = min(max(1, limit), 50)
try:
user_id_1_int = int(user_id_1)
@ -323,20 +242,8 @@ async def get_user_interaction_history(cog: commands.Cog, user_id_1: str, limit:
except Exception as e:
return {"error": f"Error getting user interaction history: {str(e)}"}
# @tool removed
async def get_conversation_summary(cog: commands.Cog, channel_id: Optional[str] = None, message_limit: int = 25) -> Dict[str, Any]:
"""
Generates and returns a concise summary of the recent conversation in a specified channel or the current channel.
Uses an internal LLM call for summarization and caches the result.
Args:
cog: The GurtCog instance (automatically passed).
channel_id: Optional ID of the channel to summarize. If None, uses the current channel context.
message_limit: The number of recent messages to consider for the summary (default 25).
Returns:
A dictionary containing the channel ID, the generated summary, the source (cache or generated), and a timestamp, or an error dictionary.
"""
async def get_conversation_summary(cog: commands.Cog, channel_id: str = None, message_limit: int = 25) -> Dict[str, Any]:
"""Generates and returns a summary of the recent conversation in a channel using an LLM call."""
from .config import SUMMARY_RESPONSE_SCHEMA, DEFAULT_MODEL # Import schema and model
from .api import get_internal_ai_json_response # Import here
try:
@ -408,20 +315,8 @@ async def get_conversation_summary(cog: commands.Cog, channel_id: Optional[str]
traceback.print_exc()
return {"error": error_msg}
# @tool removed
async def get_message_context(cog: commands.Cog, message_id: str, before_count: int = 5, after_count: int = 5) -> Dict[str, Any]:
"""
Retrieves messages immediately before and after a specific message ID within the current channel.
Args:
cog: The GurtCog instance (automatically passed).
message_id: The ID of the target message to get context around.
before_count: The number of messages to retrieve before the target message (1-25).
after_count: The number of messages to retrieve after the target message (1-25).
Returns:
A dictionary containing the formatted target message, lists of messages before and after, the channel ID, and a timestamp, or an error dictionary.
"""
"""Get the context (messages before and after) around a specific message"""
from .utils import format_message # Import here
before_count = min(max(1, before_count), 25)
after_count = min(max(1, after_count), 25)
@ -450,26 +345,8 @@ async def get_message_context(cog: commands.Cog, message_id: str, before_count:
except Exception as e:
return {"error": f"Error getting message context: {str(e)}"}
# @tool removed
async def web_search(cog: commands.Cog, query: str, search_depth: str = TAVILY_DEFAULT_SEARCH_DEPTH, max_results: int = TAVILY_DEFAULT_MAX_RESULTS, topic: str = "general", include_domains: Optional[List[str]] = None, exclude_domains: Optional[List[str]] = None, include_answer: bool = True, include_raw_content: bool = False, include_images: bool = False) -> Dict[str, Any]:
"""
Performs a web search using the Tavily API based on the provided query and parameters.
Args:
cog: The GurtCog instance (automatically passed).
query: The search query string.
search_depth: Search depth ('basic' or 'advanced'). Advanced costs more credits. Defaults to basic.
max_results: Maximum number of search results to return (5-20). Defaults to 5.
topic: Optional topic hint for the search (e.g., "news", "finance"). Defaults to "general".
include_domains: Optional list of domains to prioritize in the search.
exclude_domains: Optional list of domains to exclude from the search.
include_answer: Whether to include a concise answer generated by Tavily (default True).
include_raw_content: Whether to include raw scraped content from result URLs (default False).
include_images: Whether to include relevant images found during the search (default False).
Returns:
A dictionary containing the search query parameters, a list of results (title, url, content, etc.), an optional answer, optional follow-up questions, the count, and a timestamp, or an error dictionary.
"""
"""Search the web using Tavily API"""
if not hasattr(cog, 'tavily_client') or not cog.tavily_client:
return {"error": "Tavily client not initialized.", "timestamp": datetime.datetime.now().isoformat()}
@ -531,19 +408,8 @@ async def web_search(cog: commands.Cog, query: str, search_depth: str = TAVILY_D
print(error_message)
return {"error": error_message, "timestamp": datetime.datetime.now().isoformat()}
# @tool removed
async def remember_user_fact(cog: commands.Cog, user_id: str, fact: str) -> Dict[str, Any]:
"""
Stores a specific fact about a given user in the bot's long-term memory.
Args:
cog: The GurtCog instance (automatically passed).
user_id: The Discord ID of the user the fact is about.
fact: The string containing the fact to remember about the user.
Returns:
A dictionary indicating success, duplication, or an error. May include a note if an old fact was deleted due to limits.
"""
"""Stores a fact about a user using the MemoryManager."""
if not user_id or not fact: return {"error": "user_id and fact required."}
print(f"Remembering fact for user {user_id}: '{fact}'")
try:
@ -557,18 +423,8 @@ async def remember_user_fact(cog: commands.Cog, user_id: str, fact: str) -> Dict
print(error_message); traceback.print_exc()
return {"error": error_message}
# @tool removed
async def get_user_facts(cog: commands.Cog, user_id: str) -> Dict[str, Any]:
"""
Retrieves all stored facts associated with a specific user from the bot's long-term memory.
Args:
cog: The GurtCog instance (automatically passed).
user_id: The Discord ID of the user whose facts to retrieve.
Returns:
A dictionary containing the user ID, a list of retrieved facts, the count, and a timestamp, or an error dictionary.
"""
"""Retrieves stored facts about a user using the MemoryManager."""
if not user_id: return {"error": "user_id required."}
print(f"Retrieving facts for user {user_id}")
try:
@ -579,18 +435,8 @@ async def get_user_facts(cog: commands.Cog, user_id: str) -> Dict[str, Any]:
print(error_message); traceback.print_exc()
return {"error": error_message}
# @tool removed
async def remember_general_fact(cog: commands.Cog, fact: str) -> Dict[str, Any]:
"""
Stores a general fact (not specific to any user) in the bot's long-term memory.
Args:
cog: The GurtCog instance (automatically passed).
fact: The string containing the general fact to remember.
Returns:
A dictionary indicating success, duplication, or an error. May include a note if an old fact was deleted due to limits.
"""
"""Stores a general fact using the MemoryManager."""
if not fact: return {"error": "fact required."}
print(f"Remembering general fact: '{fact}'")
try:
@ -604,19 +450,8 @@ async def remember_general_fact(cog: commands.Cog, fact: str) -> Dict[str, Any]:
print(error_message); traceback.print_exc()
return {"error": error_message}
# @tool removed
async def get_general_facts(cog: commands.Cog, query: Optional[str] = None, limit: Optional[int] = 10) -> Dict[str, Any]:
"""
Retrieves general facts from the bot's long-term memory. Can optionally filter by a query string.
Args:
cog: The GurtCog instance (automatically passed).
query: Optional string to search for within the general facts. If None, retrieves the most recent facts.
limit: The maximum number of facts to return (1-50, default 10).
Returns:
A dictionary containing the query (if any), a list of retrieved facts, the count, and a timestamp, or an error dictionary.
"""
"""Retrieves stored general facts using the MemoryManager."""
print(f"Retrieving general facts (query='{query}', limit={limit})")
limit = min(max(1, limit or 10), 50)
try:
@ -627,20 +462,8 @@ async def get_general_facts(cog: commands.Cog, query: Optional[str] = None, limi
print(error_message); traceback.print_exc()
return {"error": error_message}
# @tool removed
async def timeout_user(cog: commands.Cog, user_id: str, duration_minutes: int, reason: Optional[str] = None) -> Dict[str, Any]:
"""
Applies a timeout to a specified user within the current server (guild).
Args:
cog: The GurtCog instance (automatically passed).
user_id: The Discord ID of the user to timeout.
duration_minutes: The duration of the timeout in minutes (1-1440).
reason: Optional reason for the timeout, displayed in the audit log.
Returns:
A dictionary indicating success (with user details, duration, reason) or an error (e.g., permissions, user not found).
"""
"""Times out a user in the current server."""
if not cog.current_channel or not isinstance(cog.current_channel, discord.abc.GuildChannel):
return {"error": "Cannot timeout outside of a server."}
guild = cog.current_channel.guild
@ -669,19 +492,8 @@ async def timeout_user(cog: commands.Cog, user_id: str, duration_minutes: int, r
except discord.HTTPException as e: print(f"API error timeout {user_id}: {e}"); return {"error": f"API error timeout {user_id}: {e}"}
except Exception as e: print(f"Unexpected error timeout {user_id}: {e}"); traceback.print_exc(); return {"error": f"Unexpected error timeout {user_id}: {str(e)}"}
# @tool removed
async def remove_timeout(cog: commands.Cog, user_id: str, reason: Optional[str] = None) -> Dict[str, Any]:
"""
Removes an active timeout from a specified user within the current server (guild).
Args:
cog: The GurtCog instance (automatically passed).
user_id: The Discord ID of the user whose timeout should be removed.
reason: Optional reason for removing the timeout, displayed in the audit log.
Returns:
A dictionary indicating success (with user details, reason), that the user wasn't timed out, or an error.
"""
"""Removes an active timeout from a user."""
if not cog.current_channel or not isinstance(cog.current_channel, discord.abc.GuildChannel):
return {"error": "Cannot remove timeout outside of a server."}
guild = cog.current_channel.guild
@ -706,22 +518,8 @@ async def remove_timeout(cog: commands.Cog, user_id: str, reason: Optional[str]
except discord.HTTPException as e: print(f"API error remove timeout {user_id}: {e}"); return {"error": f"API error remove timeout {user_id}: {e}"}
except Exception as e: print(f"Unexpected error remove timeout {user_id}: {e}"); traceback.print_exc(); return {"error": f"Unexpected error remove timeout {user_id}: {str(e)}"}
# @tool removed
def calculate(cog: commands.Cog, expression: str) -> str:
"""
Evaluates a mathematical expression using the asteval library. Supports common math functions. Returns the result as a string.
Args:
cog: The GurtCog instance (automatically passed).
expression: The mathematical expression string to evaluate (e.g., "2 * (pi + 1)").
Args:
cog: The GurtCog instance (automatically passed).
expression: The mathematical expression string to evaluate (e.g., "2 * (pi + 1)").
Returns:
The calculated result as a string, or an error message string if calculation fails.
"""
async def calculate(cog: commands.Cog, expression: str) -> Dict[str, Any]:
"""Evaluates a mathematical expression using asteval."""
print(f"Calculating expression: {expression}")
aeval = Interpreter()
try:
@ -730,29 +528,20 @@ def calculate(cog: commands.Cog, expression: str) -> str:
error_details = '; '.join(err.get_error() for err in aeval.error)
error_message = f"Calculation error: {error_details}"
print(error_message)
return f"Error: {error_message}" # Return error as string
return {"error": error_message, "expression": expression}
if isinstance(result, (int, float, complex)): result_str = str(result)
else: result_str = repr(result) # Fallback
result_str = str(result) # Convert result to string
print(f"Calculation result: {result_str}")
return result_str # Return result as string
return {"expression": expression, "result": result_str, "status": "success"}
except Exception as e:
error_message = f"Unexpected error during calculation: {str(e)}"
print(error_message); traceback.print_exc()
return f"Error: {error_message}" # Return error as string
return {"error": error_message, "expression": expression}
# @tool removed
async def run_python_code(cog: commands.Cog, code: str) -> Dict[str, Any]:
"""
Executes a provided Python code snippet remotely using the Piston API.
The execution environment is sandboxed and has limitations.
Args:
cog: The GurtCog instance (automatically passed).
code: The Python code string to execute.
Returns:
A dictionary containing the execution status ('success' or 'execution_error'), truncated stdout and stderr, exit code, and signal (if any), or an error dictionary if the API call fails.
"""
"""Executes a Python code snippet using the Piston API."""
if not PISTON_API_URL: return {"error": "Piston API URL not configured (PISTON_API_URL)."}
if not cog.session: return {"error": "aiohttp session not initialized."}
print(f"Executing Python via Piston: {code[:100]}...")
@ -786,19 +575,8 @@ async def run_python_code(cog: commands.Cog, code: str) -> Dict[str, Any]:
except aiohttp.ClientError as e: print(f"Piston network error: {e}"); return {"error": f"Network error connecting to Piston: {str(e)}"}
except Exception as e: print(f"Unexpected Piston error: {e}"); traceback.print_exc(); return {"error": f"Unexpected error during Python execution: {str(e)}"}
# @tool removed
async def create_poll(cog: commands.Cog, question: str, options: List[str]) -> Dict[str, Any]:
"""
Creates a simple poll message in the current channel with numbered reaction options.
Args:
cog: The GurtCog instance (automatically passed).
question: The question for the poll.
options: A list of strings representing the poll options (2-10 options).
Returns:
A dictionary indicating success (with message ID, question, option count) or an error (e.g., permissions, invalid options).
"""
"""Creates a simple poll message."""
if not cog.current_channel: return {"error": "No current channel context."}
if not isinstance(cog.current_channel, discord.abc.Messageable): return {"error": "Channel not messageable."}
if not isinstance(options, list) or not 2 <= len(options) <= 10: return {"error": "Poll needs 2-10 options."}
@ -821,7 +599,7 @@ async def create_poll(cog: commands.Cog, question: str, options: List[str]) -> D
except discord.HTTPException as e: print(f"Poll API error: {e}"); return {"error": f"API error creating poll: {e}"}
except Exception as e: print(f"Poll unexpected error: {e}"); traceback.print_exc(); return {"error": f"Unexpected error creating poll: {str(e)}"}
# Helper function to convert memory string (e.g., "128m") to bytes (Not a tool)
# Helper function to convert memory string (e.g., "128m") to bytes
def parse_mem_limit(mem_limit_str: str) -> Optional[int]:
if not mem_limit_str: return None
mem_limit_str = mem_limit_str.lower()
@ -835,18 +613,7 @@ def parse_mem_limit(mem_limit_str: str) -> Optional[int]:
except ValueError: return None
async def _check_command_safety(cog: commands.Cog, command: str) -> Dict[str, Any]:
"""
Internal helper: Uses an LLM call to assess the safety of a shell command before execution.
Analyzes potential risks like data destruction, resource exhaustion, etc., within the context
of a restricted Docker container environment.
Args:
cog: The GurtCog instance (automatically passed).
command: The shell command string to analyze.
Returns:
A dictionary containing 'safe' (boolean) and 'reason' (string) based on the AI's assessment, or indicates an error during the check.
"""
"""Uses a secondary AI call to check if a command is potentially harmful."""
from .api import get_internal_ai_json_response # Import here
print(f"Performing AI safety check for command: '{command}' using model {SAFETY_CHECK_MODEL}")
safety_schema = {
@ -879,19 +646,8 @@ async def _check_command_safety(cog: commands.Cog, command: str) -> Dict[str, An
print(f"AI Safety Check Error: Response was {safety_response}")
return {"safe": False, "reason": error_msg}
# @tool removed
async def run_terminal_command(cog: commands.Cog, command: str) -> Dict[str, Any]:
"""
Executes a shell command within an isolated, network-disabled Docker container.
Performs an AI safety check before execution. Resource limits (CPU, memory) are applied.
Args:
cog: The GurtCog instance (automatically passed).
command: The shell command string to execute.
Returns:
A dictionary containing the execution status ('success', 'execution_error', 'timeout', 'docker_error'), truncated stdout and stderr, and the exit code, or an error dictionary if the safety check fails or Docker setup fails.
"""
"""Executes a shell command in an isolated Docker container after an AI safety check."""
print(f"Attempting terminal command: {command}")
safety_check_result = await _check_command_safety(cog, command)
if not safety_check_result.get("safe"):
@ -1000,20 +756,8 @@ async def run_terminal_command(cog: commands.Cog, command: str) -> Dict[str, Any
# Ensure the client connection is closed
if client:
await client.close()
# @tool removed
async def get_user_id(cog: commands.Cog, user_name: str) -> Dict[str, Any]:
"""
Finds the Discord User ID associated with a given username or display name.
Searches the current server's members first, then falls back to recent message authors if not in a server context.
Args:
cog: The GurtCog instance (automatically passed).
user_name: The username (e.g., "Gurt") or display name (e.g., "GurtBot") to search for. Case-insensitivity is attempted.
Returns:
A dictionary containing the status ('success'), user ID, username, and display name if found, or an error dictionary if the user is not found.
"""
"""Finds the Discord User ID for a given username or display name."""
print(f"Attempting to find user ID for: '{user_name}'")
if not cog.current_channel or not cog.current_channel.guild:
# Search recent global messages if not in a guild context
@ -1052,22 +796,13 @@ async def get_user_id(cog: commands.Cog, user_name: str) -> Dict[str, Any]:
print(f"User '{user_name}' not found in guild '{guild.name}'.")
return {"error": f"User '{user_name}' not found in this server.", "user_name": user_name}
# NOT decorating execute_internal_command as it's marked unsafe for general agent use
async def execute_internal_command(cog: commands.Cog, command: str, timeout_seconds: int = 60) -> Dict[str, Any]:
"""
Executes a shell command directly on the host machine where the bot is running.
**WARNING:** This tool is intended ONLY for internal Gurt operations (e.g., git pull, service restart)
and MUST NOT be used to execute arbitrary commands requested by users due to significant security risks.
It bypasses safety checks and containerization. Use with extreme caution and only for trusted, predefined operations.
Args:
cog: The GurtCog instance (automatically passed).
command: The shell command string to execute.
timeout_seconds: Maximum execution time in seconds (default 60).
Returns:
A dictionary containing the execution status ('success', 'execution_error', 'timeout', 'not_found', 'error'),
truncated stdout and stderr, the exit code, and the original command, or an error dictionary if the command fails.
WARNING: This tool is intended ONLY for internal Gurt operations and MUST NOT
be used to execute arbitrary commands requested by users due to significant security risks.
It bypasses safety checks and containerization. Use with extreme caution.
"""
print(f"--- INTERNAL EXECUTION (UNSAFE): Running command: {command} ---")
try:
@ -1120,21 +855,8 @@ async def execute_internal_command(cog: commands.Cog, command: str, timeout_seco
traceback.print_exc()
return {"error": error_message, "command": command, "status": "error"}
# @tool removed
async def extract_web_content(cog: commands.Cog, urls: Union[str, List[str]], extract_depth: str = "basic", include_images: bool = False) -> Dict[str, Any]:
"""
Extracts the main textual content and optionally images from one or more web URLs using the Tavily API.
This is useful for getting the content of a webpage without performing a full search.
Args:
cog: The GurtCog instance (automatically passed).
urls: A single URL string or a list of URL strings to extract content from.
extract_depth: Extraction depth ('basic' or 'advanced'). Advanced costs more credits but may yield better results. Defaults to 'basic'.
include_images: Whether to include images found on the pages (default False).
Returns:
A dictionary containing the original URLs, parameters used, a list of successful results (URL, raw_content, images), a list of failed URLs, and a timestamp, or an error dictionary.
"""
"""Extract content from URLs using Tavily API"""
if not hasattr(cog, 'tavily_client') or not cog.tavily_client:
return {"error": "Tavily client not initialized.", "timestamp": datetime.datetime.now().isoformat()}
@ -1165,20 +887,8 @@ async def extract_web_content(cog: commands.Cog, urls: Union[str, List[str]], ex
print(error_message)
return {"error": error_message, "timestamp": datetime.datetime.now().isoformat()}
# @tool removed
async def read_file_content(cog: commands.Cog, file_path: str) -> Dict[str, Any]:
"""
Reads the content of a specified file located on the bot's host machine.
Access is restricted to specific allowed directories and file extensions within the project
to prevent unauthorized access to sensitive system files.
Args:
cog: The GurtCog instance (automatically passed).
file_path: The relative path to the file from the bot's project root directory.
Returns:
A dictionary containing the status ('success'), the file path, and the truncated file content (up to 5000 characters), or an error dictionary if access is denied, the file is not found, or another error occurs.
"""
"""Reads the content of a specified file. Limited access for safety."""
print(f"Attempting to read file: {file_path}")
# --- Basic Safety Check (Needs significant enhancement for production) ---
# 1. Normalize path
@ -1252,35 +962,15 @@ async def read_file_content(cog: commands.Cog, file_path: str) -> Dict[str, Any]
traceback.print_exc()
return {"error": error_message, "file_path": file_path}
# --- Meta Tool: Create New Tool --- (Not decorating as it's experimental/dangerous)
# --- Meta Tool: Create New Tool ---
# WARNING: HIGHLY EXPERIMENTAL AND DANGEROUS. Allows AI to write and load code.
async def create_new_tool(cog: commands.Cog, tool_name: str, description: str, parameters_json: str, returns_description: str) -> Dict[str, Any]:
"""
**EXPERIMENTAL & DANGEROUS:** Attempts to dynamically create a new tool for Gurt.
This involves using an LLM to generate Python code for the tool's function and its
corresponding FunctionDeclaration definition based on the provided descriptions.
The generated code is then written directly into `tools.py` and `config.py`.
**WARNING:** This tool modifies the bot's source code directly and poses significant
security risks if the generated code is malicious or flawed. It bypasses standard
code review and testing processes. Use with extreme caution and only in controlled
development environments. A bot reload or restart is typically required for the
new tool to become fully active and available to the LLM.
Args:
cog: The GurtCog instance (automatically passed).
tool_name: The desired name for the new tool (must be a valid Python function name).
description: A natural language description of what the tool does (for the FunctionDeclaration).
parameters_json: A JSON string defining the tool's input parameters. Must follow the
OpenAPI schema format, containing 'type: object', 'properties: {...}',
and optionally 'required: [...]'.
returns_description: A natural language description of what the tool's function should return upon success or error.
Returns:
A dictionary indicating the status ('success' or 'error'). On success, includes the
tool name and a message indicating that a reload is needed. On error, provides an
error message detailing the failure (e.g., invalid name, generation failure, file write error).
May include generated code snippets in case of certain errors for debugging.
EXPERIMENTAL/DANGEROUS: Attempts to create a new tool by generating Python code
and its definition using an LLM, then writing it to tools.py and config.py.
Requires manual reload/restart of the bot for the tool to be fully active.
Parameters JSON should be a JSON string describing the 'properties' and 'required' fields
for the tool's parameters, similar to other FunctionDeclarations.
"""
print(f"--- DANGEROUS OPERATION: Attempting to create new tool: {tool_name} ---")
from .api import get_internal_ai_json_response # Local import
@ -1498,8 +1188,6 @@ async def create_new_tool(cog: commands.Cog, tool_name: str, description: str, p
# --- Tool Mapping ---
# This dictionary maps tool names (used in the AI prompt) to their implementation functions.
# The agent should discover tools via the @tool decorator, but this mapping might still be used elsewhere.
# Keep it updated, but the primary mechanism for the agent is the decorator.
TOOL_MAPPING = {
"get_recent_messages": get_recent_messages,
"search_user_messages": search_user_messages,
@ -1511,11 +1199,11 @@ TOOL_MAPPING = {
"get_conversation_summary": get_conversation_summary,
"get_message_context": get_message_context,
"web_search": web_search,
# Memory tools using direct function references
"remember_user_fact": remember_user_fact,
"get_user_facts": get_user_facts,
"remember_general_fact": remember_general_fact,
"get_general_facts": get_general_facts,
# Point memory tools to the methods on the MemoryManager instance (accessed via cog)
"remember_user_fact": lambda cog, **kwargs: cog.memory_manager.add_user_fact(**kwargs),
"get_user_facts": lambda cog, **kwargs: cog.memory_manager.get_user_facts(**kwargs),
"remember_general_fact": lambda cog, **kwargs: cog.memory_manager.add_general_fact(**kwargs),
"get_general_facts": lambda cog, **kwargs: cog.memory_manager.get_general_facts(**kwargs),
"timeout_user": timeout_user,
"calculate": calculate,
"run_python_code": run_python_code,
@ -1524,7 +1212,7 @@ TOOL_MAPPING = {
"remove_timeout": remove_timeout,
"extract_web_content": extract_web_content,
"read_file_content": read_file_content,
"create_new_tool": create_new_tool, # Meta-tool (not decorated)
"execute_internal_command": execute_internal_command, # Internal command execution (not decorated)
"create_new_tool": create_new_tool, # Added the meta-tool
"execute_internal_command": execute_internal_command, # Added internal command execution
"get_user_id": get_user_id # Added user ID lookup tool
}