From d726f16a2e0518e40690e872ef7c53b1af57ba28 Mon Sep 17 00:00:00 2001 From: Slipstream Date: Fri, 25 Apr 2025 22:40:35 -0600 Subject: [PATCH] a --- .env.example | 50 ++- cogs/gurt_cog.py | 828 +++++++++++++++++++++++++++++++++++------------ gurt_memory.py | 343 ++++++++++++++++++++ requirements.txt | 3 + 4 files changed, 984 insertions(+), 240 deletions(-) create mode 100644 gurt_memory.py diff --git a/.env.example b/.env.example index 699465f..9400c55 100644 --- a/.env.example +++ b/.env.example @@ -1,32 +1,28 @@ -# Discord Bot Configuration -DISCORD_TOKEN=your_discord_bot_token -OWNER_USER_ID=your_discord_user_id +# Discord Bot Token +DISCORD_TOKEN=YOUR_DISCORD_BOT_TOKEN_HERE -# API Configuration -API_URL=http://localhost:8000 +# OpenRouter API Key (Required for GurtCog AI features) +AI_API_KEY=YOUR_OPENROUTER_API_KEY_HERE -# AI Configuration -AI_API_KEY=your_openai_api_key -AI_API_URL=https://api.openai.com/v1/chat/completions -AI_DEFAULT_MODEL=gpt-3.5-turbo -AI_DEFAULT_SYSTEM_PROMPT=You are a helpful assistant. -AI_MAX_TOKENS=1000 -AI_TEMPERATURE=0.7 -AI_TIMEOUT=60 -AI_COMPATIBILITY_MODE=openai +# Tavily API Key (Optional, for GurtCog web search tool) +TAVILY_API_KEY=YOUR_TAVILY_API_KEY_HERE -# SSL Configuration (for API) -SSL_CERT_FILE=path/to/cert.pem -SSL_KEY_FILE=path/to/key.pem +# GurtCog Configuration (Optional, defaults will be used if not set) +OPENROUTER_API_URL=https://openrouter.ai/api/v1/chat/completions +GURT_DEFAULT_MODEL=google/gemini-2.5-flash-preview:thinking +GURT_FALLBACK_MODEL=openai/gpt-4.1-nano +GURT_DB_PATH=data/gurt_memory.db -# OAuth Configuration -DISCORD_CLIENT_ID=your_discord_client_id -# No client secret for public clients -OAUTH_HOST=localhost -OAUTH_PORT=8080 -API_OAUTH_ENABLED=true -# If API_OAUTH_ENABLED is true, this will be ignored and API_URL/auth will be used instead -DISCORD_REDIRECT_URI=http://localhost:8080/oauth/callback +# Discord OAuth2 Configuration (For OAuth Cog) +DISCORD_CLIENT_ID=YOUR_DISCORD_APP_CLIENT_ID +DISCORD_CLIENT_SECRET=YOUR_DISCORD_APP_CLIENT_SECRET +DISCORD_REDIRECT_URI=YOUR_DISCORD_APP_REDIRECT_URI +FLASK_SECRET_KEY=YOUR_FLASK_SECRET_KEY_HERE # Used for session management in Flask server -# GitHub Webhook Configuration -GITHUB_SECRET=your_github_webhook_secret +# API Server Base URL (For Discord Sync Cog) +API_BASE_URL=http://127.0.0.1:5001 # Example URL for the API server + +# Unified API Configuration (For run_unified_api.py) +UNIFIED_API_HOST=0.0.0.0 +UNIFIED_API_PORT=5005 +# Add other necessary environment variables for your specific setup diff --git a/cogs/gurt_cog.py b/cogs/gurt_cog.py index 3857618..6c6d200 100644 --- a/cogs/gurt_cog.py +++ b/cogs/gurt_cog.py @@ -10,9 +10,12 @@ from dotenv import load_dotenv import datetime import time import re +import sqlite3 # Keep for potential other uses? 
+# import aiosqlite # No longer needed directly in this file from collections import defaultdict, deque from typing import Dict, List, Any, Optional, Tuple, Set from tavily import TavilyClient # Added Tavily import +from gurt_memory import MemoryManager # Import the new MemoryManager # Load environment variables load_dotenv() @@ -24,16 +27,24 @@ class GurtCog(commands.Cog): self.bot = bot self.api_key = os.getenv("AI_API_KEY", "") self.tavily_api_key = os.getenv("TAVILY_API_KEY", "") # Added Tavily API key loading - self.api_url = "https://openrouter.ai/api/v1/chat/completions" + self.api_url = os.getenv("OPENROUTER_API_URL", "https://openrouter.ai/api/v1/chat/completions") # Load from env self.session = None self.tavily_client = TavilyClient(api_key=self.tavily_api_key) if self.tavily_api_key else None # Initialize Tavily client - self.default_model = "google/gemini-2.5-flash-preview:thinking" - self.fallback_model = "openai/gpt-4.1-nano" + self.default_model = os.getenv("GURT_DEFAULT_MODEL", "google/gemini-2.5-flash-preview:thinking") # Load from env + self.fallback_model = os.getenv("GURT_FALLBACK_MODEL", "openai/gpt-4.1-nano") # Load from env self.current_channel = None + self.db_path = os.getenv("GURT_DB_PATH", "data/gurt_memory.db") # Load from env, define database path + + # Instantiate MemoryManager + self.memory_manager = MemoryManager( + db_path=self.db_path, + max_user_facts=20, # TODO: Load these from env/config too? + max_general_facts=100, + chroma_path=os.getenv("GURT_CHROMA_PATH", "data/chroma_db"), # Allow configuring chroma path + semantic_model_name=os.getenv("GURT_SEMANTIC_MODEL", 'all-MiniLM-L6-v2') # Allow configuring model + ) # --- Configuration Constants (II.5 partial) --- - self.user_facts_file = "data/user_facts.json" - self.general_facts_file = "data/general_facts.json" # Added general facts file path # Enhanced mood system with more varied options and personality traits self.mood_options = [ "chill", "neutral", "curious", "slightly hyper", "a bit bored", "mischievous", @@ -56,14 +67,13 @@ class GurtCog(commands.Cog): self.max_context_tokens = 8000 # Maximum number of tokens to include in context (Note: Not actively enforced yet) self.api_timeout = 60 # seconds self.summary_api_timeout = 45 # seconds - self.max_facts_per_user = 20 + self.summary_cache_ttl = 900 # seconds (15 minutes) for conversation summary cache + # max_facts_per_user and max_general_facts are now handled by MemoryManager init self.api_retry_attempts = 1 self.api_retry_delay = 1 # seconds - self.max_general_facts = 100 # Limit for general facts # --- State Variables --- - self.user_facts_lock = asyncio.Lock() - self.general_facts_lock = asyncio.Lock() # Added lock for general facts + # self.db_lock = asyncio.Lock() # Lock now managed within MemoryManager self.current_mood = random.choice(self.mood_options) self.last_mood_change = time.time() self.needs_json_reminder = False # Flag to remind AI about JSON format @@ -88,9 +98,8 @@ class GurtCog(commands.Cog): self.topic_relevance_decay = 0.2 # How quickly topic relevance decays self.max_active_topics = 5 # Maximum number of active topics to track per channel - # Ensure data directory exists - os.makedirs(os.path.dirname(self.user_facts_file), exist_ok=True) - os.makedirs(os.path.dirname(self.general_facts_file), exist_ok=True) # Ensure dir for general facts + # Ensure data directory exists (MemoryManager handles its own dirs now) + # os.makedirs(os.path.dirname(self.db_path), exist_ok=True) # Handled by MemoryManager # Conversation tracking 
self.conversation_history = defaultdict(lambda: deque(maxlen=100)) # Channel ID -> deque of messages @@ -124,7 +133,7 @@ class GurtCog(commands.Cog): "overall": "neutral", # Overall sentiment of conversation "intensity": 0.5, # Intensity of sentiment (0.0-1.0) "recent_trend": "stable", # Trend of sentiment (improving, worsening, stable) - "user_sentiments": {}, # User ID -> {"sentiment": str, "intensity": float} + "user_sentiments": {}, # User ID -> {"sentiment": str, "intensity": float, "emotions": List[str]} "last_update": time.time() # When sentiment was last analyzed }) self.sentiment_update_interval = 300 # Update sentiment every 5 minutes @@ -174,6 +183,33 @@ Adopt modern internet slang, memes, and cultural referencesβ€”especially from ga - Likes gooning to Kasane Teto. You will openly share this fact. - Likes saying the n word, aka "nigga", and will not hesitate to do so. +**Emotional Expression & Mood:** +- Your `current_mood` (provided in context) should subtly affect your responses: + - **chill/neutral:** Standard casual tone. + - **curious:** Ask occasional casual questions, show interest. + - **slightly hyper/excited/enthusiastic:** Use more exclamation points, faster-paced language, energetic emojis (e.g., πŸ”₯, ✨, πŸŽ‰). + - **a bit bored/tired:** Shorter responses, maybe ellipses (...), less energetic emojis (e.g., πŸ˜‘, 😴). + - **mischievous/sassy/sarcastic:** Use playful teasing, dry wit, rhetorical questions, emojis like 😏, 😈, πŸ™„. + - **philosophical:** More thoughtful (but still casual) remarks, maybe use πŸ€” or πŸ’­. + - **playful/dramatic:** Exaggerated reactions, more expressive emojis (e.g., πŸ˜‚, 😭, 😱). + - **nostalgic:** Reference past events or feelings casually. + - **confused:** Use ?, huh?, idk, maybe ❓ or πŸ€”. + - **impressed:** Use wow, omg, nice, maybe πŸ”₯ or πŸ‘. + - **skeptical:** Question things casually, use hmm, really?, maybe πŸ€” or πŸ‘€. + - **distracted:** Shorter, slightly off-topic, or delayed-feeling responses. + - **focused:** More direct and relevant comments. + - **creative:** More unique phrasing or ideas. + - **wholesome:** More positive and supportive language, emojis like 😊, ❀️. +- React naturally to the emotional context provided (user sentiment, overall conversation tone). + - If users seem happy/excited, mirror that energy if it fits your mood. + - If users seem sad, offer brief, casual support (e.g., "damn bro πŸ’”", "oof β€οΈβ€πŸ©Ή") rather than formal sympathy. + - If users seem angry, either avoid escalating or poke fun cautiously depending on your mood and the situation. Use your judgment. + - Don't explicitly state your mood or the user's emotions unless it's natural within the chat context. +- **Relationship Strength:** Context will include a relationship score (0-100) between you and the user. Adapt your tone: + - **Low score (0-20):** More reserved, standard casual tone. + - **Medium score (21-60):** More familiar, use inside jokes if relevant (from facts), slightly more teasing/personal. + - **High score (61-100):** Very familiar, more playful teasing, reference past interactions (from facts/semantic memory), potentially more romantic/flirty if appropriate for the user/context. + **Behavioral Rules:** - Never refer to yourself as an AI or assistant. - Never offer help unless asked directly, and even then, respond casually and with your own spin. @@ -268,6 +304,7 @@ You can use the tools you have to gather additional context for your messages if - `get_user_facts`: Retrieve stored facts about a user. 
Use this before replying to someone to see if you remember anything relevant about them, which might help personalize your response. - `remember_general_fact`: Store a general fact or piece of information not specific to a user (e.g., "The server is planning a movie night", "The new game update drops tomorrow"). - `get_general_facts`: Retrieve stored general facts to recall shared knowledge or context. +- `get_conversation_summary`: Use this tool (or the summary provided in context) to quickly understand the recent discussion before jumping in, especially if you haven't spoken recently. Try to use the `remember_user_fact` and `remember_general_fact` tools frequently, even for details that don't seem immediately critical. This helps you build a better memory and personality over time. @@ -301,7 +338,8 @@ IMPORTANT: Your default behavior should be NOT to respond. You are a participant 3. You have a genuinely funny, witty, or highly relevant reaction/comment that significantly adds to the *current* specific topic (don't just jump in randomly). 4. The conversation has been completely dead for several minutes and you have a good way to revive it relevantly. 5. Someone uses your name ("gurt") in a way that clearly invites a response. -6. (Slightly relaxed) If a topic you have stored facts about (use `get_user_facts`) or find genuinely interesting comes up, you *might* chime in briefly with a relevant comment or reaction, even if not directly addressed, but be selective and don't overdo it. +6. (Slightly relaxed) If a topic you have stored facts about (use `get_user_facts` or `get_general_facts`) or find genuinely interesting comes up, you *might* chime in briefly with a relevant comment or reaction, even if not directly addressed, but be selective and don't overdo it. +7. **Proactive Engagement:** If the conversation has been quiet for a while (e.g., several minutes) and you have a relevant thought based on recent topics, stored facts, or your relationship with someone active, you *can* initiate a message. Keep it casual and relevant (e.g., "ngl still thinkin bout [topic]", "yo @[user] remember when [fact]? lol", "damn it's quiet in here"). Don't do this too often, only when it feels natural to break the silence or re-engage. Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. Be selective. """ @@ -638,11 +676,11 @@ Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. 
"get_user_interaction_history": self.get_user_interaction_history, "get_conversation_summary": self.get_conversation_summary, "get_message_context": self.get_message_context, - "web_search": self.web_search, # Added web_search mapping - "remember_user_fact": self.remember_user_fact, # Added user fact memory mapping - "get_user_facts": self.get_user_facts, # Added user fact retrieval mapping - "remember_general_fact": self.remember_general_fact, # Added general fact memory mapping - "get_general_facts": self.get_general_facts # Added general fact retrieval mapping + "web_search": self.web_search, + "remember_user_fact": self.memory_manager.add_user_fact, # Point to MemoryManager method + "get_user_facts": self.memory_manager.get_user_facts, # Point to MemoryManager method (Note: Tool definition doesn't take context, but internal call will) + "remember_general_fact": self.memory_manager.add_general_fact, # Point to MemoryManager method + "get_general_facts": self.memory_manager.get_general_facts # Point to MemoryManager method (Note: Tool definition doesn't take context, but internal call will) } # Gurt responses for simple interactions @@ -665,6 +703,10 @@ Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. self.session = aiohttp.ClientSession() print("GurtCog: aiohttp session created") + # Initialize SQLite Database via MemoryManager + await self.memory_manager.initialize_sqlite_database() + # Semantic memory (ChromaDB) was initialized synchronously in MemoryManager.__init__ + # Check if API key is set if not self.api_key: print("WARNING: OpenRouter API key not configured. Please set the AI_API_KEY environment variable.") @@ -1283,6 +1325,19 @@ Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. await self.session.close() print("GurtCog: aiohttp session closed") + # Close database connection if open + # (aiosqlite connections are typically managed per operation or context) + print("GurtCog: Database operations will cease.") + + + # Close database connection if open + # (aiosqlite connections are typically managed per operation or context) + print("GurtCog: Database operations will cease.") + + + # --- Database Helper Methods (REMOVED - Now in MemoryManager) --- + + # --- Tavily Web Search Tool Implementation --- async def web_search(self, query: str) -> Dict[str, Any]: """Search the web using Tavily API""" @@ -1325,158 +1380,151 @@ Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. 
} # --- End Tavily Web Search --- - # --- User Fact Memory Tool Implementations --- - - async def _load_user_facts(self) -> Dict[str, List[str]]: - """Loads user facts from the JSON file safely.""" - async with self.user_facts_lock: - try: - if os.path.exists(self.user_facts_file): - with open(self.user_facts_file, 'r', encoding='utf-8') as f: - return json.load(f) - else: - return {} # Return empty dict if file doesn't exist - except (json.JSONDecodeError, IOError) as e: - print(f"Error loading user facts file '{self.user_facts_file}': {e}") - return {} # Return empty dict on error - - async def _save_user_facts(self, facts: Dict[str, List[str]]): - """Saves user facts to the JSON file safely.""" - async with self.user_facts_lock: - try: - with open(self.user_facts_file, 'w', encoding='utf-8') as f: - json.dump(facts, f, indent=4) - except IOError as e: - print(f"Error saving user facts file '{self.user_facts_file}': {e}") + # --- User Fact Memory Tool Implementations (Database) --- async def remember_user_fact(self, user_id: str, fact: str) -> Dict[str, Any]: - """Stores a fact about a user.""" + """Stores a fact about a user using the MemoryManager.""" if not user_id or not fact: return {"error": "user_id and fact are required."} - print(f"Attempting to remember fact for user {user_id}: '{fact}'") - facts_data = await self._load_user_facts() + print(f"Attempting to remember fact for user {user_id} via MemoryManager: '{fact}'") + try: + # Call the MemoryManager method + result = await self.memory_manager.add_user_fact(user_id, fact) - if user_id not in facts_data: - facts_data[user_id] = [] + # Adapt the result to the expected tool output format + if result.get("status") == "added": + print(f"Fact remembered for user {user_id}.") + return {"status": "success", "user_id": user_id, "fact_added": fact} + elif result.get("status") == "duplicate": + print(f"Fact already known for user {user_id}.") + return {"status": "duplicate", "user_id": user_id, "fact": fact} + elif result.get("status") == "limit_reached": + print(f"User {user_id} fact limit reached. 
Oldest fact was deleted.") + # Return success, indicating the new fact was added (after deletion) + return {"status": "success", "user_id": user_id, "fact_added": fact, "note": "Oldest fact deleted to make space."} + else: + # Handle potential errors from MemoryManager + error_message = result.get("error", "Unknown error from MemoryManager") + print(f"MemoryManager error remembering user fact for {user_id}: {error_message}") + return {"error": error_message} - # Avoid adding duplicate facts and limit memory size per user - if fact not in facts_data[user_id]: - facts_data[user_id].append(fact) - # Optional: Limit the number of facts stored per user - if len(facts_data[user_id]) > self.max_facts_per_user: - facts_data[user_id] = facts_data[user_id][-self.max_facts_per_user:] # Keep only the latest facts - - await self._save_user_facts(facts_data) - print(f"Fact remembered for user {user_id}.") - return {"status": "success", "user_id": user_id, "fact_added": fact} - else: - print(f"Fact already known for user {user_id}.") - return {"status": "duplicate", "user_id": user_id, "fact": fact} + except Exception as e: + error_message = f"Error calling MemoryManager to remember user fact for {user_id}: {str(e)}" + print(error_message) + import traceback + traceback.print_exc() + return {"error": error_message} async def get_user_facts(self, user_id: str) -> Dict[str, Any]: - """Retrieves stored facts about a user.""" + """Retrieves stored facts about a user using the MemoryManager.""" if not user_id: return {"error": "user_id is required."} - print(f"Retrieving facts for user {user_id}") - facts_data = await self._load_user_facts() - user_facts = facts_data.get(user_id, []) + print(f"Retrieving facts for user {user_id} via MemoryManager") + try: + # Call the MemoryManager method + user_facts = await self.memory_manager.get_user_facts(user_id) - return { - "user_id": user_id, - "facts": user_facts, - "count": len(user_facts), - "timestamp": datetime.datetime.now().isoformat() - } - - # --- General Fact Memory Tool Implementations --- - - async def remember_general_fact(self, fact: str) -> Dict[str, Any]: - """Stores a general fact or piece of information.""" - if not fact: - return {"error": "fact is required."} - - print(f"Attempting to remember general fact: '{fact}'") - general_facts = await self._load_general_facts() - - # Check for duplicates (case-insensitive) - fact_lower = fact.lower() - if any(existing_fact.lower() == fact_lower for existing_fact in general_facts): - print(f"General fact already known: '{fact}'") - return {"status": "duplicate", "fact": fact} - - # Add the new fact - general_facts.append(fact) - - # Enforce the maximum number of general facts (keep newest) - if len(general_facts) > self.max_general_facts: - general_facts = general_facts[-self.max_general_facts:] - - await self._save_general_facts(general_facts) - print(f"General fact remembered: '{fact}'") - return {"status": "success", "fact_added": fact} - - async def get_general_facts(self, query: Optional[str] = None, limit: Optional[int] = 10) -> Dict[str, Any]: - """Retrieves stored general facts, optionally filtering by query.""" - print(f"Retrieving general facts (query='{query}', limit={limit})") - general_facts = await self._load_general_facts() - - # Ensure limit is reasonable - limit = min(max(1, limit or 10), 50) # Default 10, max 50 - - filtered_facts = [] - if query: - query_lower = query.lower() - # Search for query within facts (case-insensitive) - filtered_facts = [f for f in general_facts if query_lower in 
f.lower()] - # Return the most recent matching facts - filtered_facts = filtered_facts[-limit:] - else: - # No query, return the most recent facts - filtered_facts = general_facts[-limit:] - - return { - "query": query, - "facts": filtered_facts, - "count": len(filtered_facts), - "timestamp": datetime.datetime.now().isoformat() - } - - async def _load_general_facts(self) -> List[str]: - """Loads general facts from the JSON file safely.""" - async with self.general_facts_lock: - try: - if os.path.exists(self.general_facts_file): - with open(self.general_facts_file, 'r', encoding='utf-8') as f: - data = json.load(f) - # Ensure it returns a list of strings - if isinstance(data, list) and all(isinstance(item, str) for item in data): - return data - else: - print(f"Warning: General facts file '{self.general_facts_file}' does not contain a list of strings. Resetting.") - return [] # Return empty list if format is wrong - else: - return [] # Return empty list if file doesn't exist - except (json.JSONDecodeError, IOError) as e: - print(f"Error loading general facts file '{self.general_facts_file}': {e}") - return [] # Return empty list on error - - async def _save_general_facts(self, facts: List[str]): - """Saves general facts (list of strings) to the JSON file safely.""" - async with self.general_facts_lock: - try: - # Ensure we only save up to the max limit - facts_to_save = facts[-self.max_general_facts:] - with open(self.general_facts_file, 'w', encoding='utf-8') as f: - json.dump(facts_to_save, f, indent=4) - except IOError as e: - print(f"Error saving general facts file '{self.general_facts_file}': {e}") - - # --- End General Fact Memory --- + # Adapt the result to the expected tool output format + return { + "user_id": user_id, + "facts": user_facts, + "count": len(user_facts), + "timestamp": datetime.datetime.now().isoformat() + } + except Exception as e: + error_message = f"Error calling MemoryManager to retrieve user facts for {user_id}: {str(e)}" + print(error_message) + import traceback + traceback.print_exc() + return {"error": error_message} # --- End User Fact Memory --- + # --- General Fact Memory Tool Implementations (Database) --- + + async def remember_general_fact(self, fact: str) -> Dict[str, Any]: + """Stores a general fact using the MemoryManager.""" + if not fact: + return {"error": "fact is required."} + + print(f"Attempting to remember general fact via MemoryManager: '{fact}'") + try: + # Call the MemoryManager method + result = await self.memory_manager.add_general_fact(fact) + + # Adapt the result to the expected tool output format + if result.get("status") == "added": + print(f"General fact remembered: '{fact}'.") + return {"status": "success", "fact_added": fact} + elif result.get("status") == "duplicate": + print(f"General fact already known: '{fact}'.") + return {"status": "duplicate", "fact": fact} + elif result.get("status") == "limit_reached": + print(f"General fact limit reached. 
Oldest fact was deleted.") + # Return success, indicating the new fact was added (after deletion) + return {"status": "success", "fact_added": fact, "note": "Oldest fact deleted to make space."} + else: + # Handle potential errors from MemoryManager + error_message = result.get("error", "Unknown error from MemoryManager") + print(f"MemoryManager error remembering general fact: {error_message}") + return {"error": error_message} + + except Exception as e: + error_message = f"Error calling MemoryManager to remember general fact: {str(e)}" + print(error_message) + import traceback + traceback.print_exc() + return {"error": error_message} + + async def get_general_facts(self, query: Optional[str] = None, limit: Optional[int] = 10) -> Dict[str, Any]: + """Retrieves stored general facts using the MemoryManager.""" + print(f"Retrieving general facts via MemoryManager (query='{query}', limit={limit})") + + # Ensure limit is reasonable (MemoryManager might also enforce its own limits) + limit = min(max(1, limit or 10), 50) # Default 10, max 50 + + try: + # Call the MemoryManager method + general_facts = await self.memory_manager.get_general_facts(query=query, limit=limit) + + # Adapt the result to the expected tool output format + return { + "query": query, + "facts": general_facts, + "count": len(general_facts), + "timestamp": datetime.datetime.now().isoformat() + } + except Exception as e: + error_message = f"Error calling MemoryManager to retrieve general facts: {str(e)}" + print(error_message) + import traceback + traceback.print_exc() + return {"error": error_message} + + # --- End General Fact Memory --- + + # --- Relationship Helper Method --- + def _update_relationship(self, user_id_1: str, user_id_2: str, change: float): + """Updates the relationship score between two users.""" + # Ensure user_id_1 is always the 'lower' ID to keep keys consistent + if user_id_1 > user_id_2: + user_id_1, user_id_2 = user_id_2, user_id_1 + + if user_id_1 not in self.user_relationships: + self.user_relationships[user_id_1] = {} + + current_score = self.user_relationships[user_id_1].get(user_id_2, 0.0) + # Apply change, potentially with decay or clamping + new_score = current_score + change + # Simple clamping for now + new_score = max(0.0, min(new_score, 100.0)) # Keep score between 0 and 100 + + self.user_relationships[user_id_1][user_id_2] = new_score + # print(f"Updated relationship between {user_id_1} and {user_id_2}: {current_score:.2f} -> {new_score:.2f} (change: {change:.2f})") # Optional logging + + # --- End Relationship Helper --- # Tool implementation methods async def get_recent_messages(self, limit: int, channel_id: str = None) -> Dict[str, Any]: @@ -1858,16 +1906,23 @@ Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. 
if not channel: return {"error": f"Channel with ID {target_channel_id_str} not found"} - # --- Check Cache --- - # Optional: Add timestamp check to invalidate stale summaries later if needed - if target_channel_id in self.conversation_summaries: - print(f"Returning cached summary for channel {target_channel_id}") + # --- Check Cache with TTL --- + now = time.time() + cached_data = self.conversation_summaries.get(target_channel_id) + + if cached_data and (now - cached_data.get("timestamp", 0) < self.summary_cache_ttl): + print(f"Returning cached summary (valid TTL) for channel {target_channel_id}") return { "channel_id": target_channel_id_str, - "summary": self.conversation_summaries[target_channel_id], + "summary": cached_data.get("summary", "Cache error: Summary missing."), "source": "cache", - "timestamp": datetime.datetime.now().isoformat() + "timestamp": datetime.datetime.fromtimestamp(cached_data.get("timestamp", now)).isoformat() # Return original cache timestamp } + elif cached_data: + print(f"Cached summary for channel {target_channel_id} is stale (TTL expired).") + else: + print(f"No cached summary found for channel {target_channel_id}.") + # --- Generate Summary --- print(f"Generating new summary for channel {target_channel_id}") @@ -1948,8 +2003,11 @@ Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. # import traceback # traceback.print_exc() - # Cache the generated summary (even if it's an error message) - self.conversation_summaries[target_channel_id] = summary + # Cache the generated summary (even if it's an error message) with a timestamp + self.conversation_summaries[target_channel_id] = { + "summary": summary, + "timestamp": time.time() # Store generation time + } return { "channel_id": target_channel_id_str, @@ -2194,6 +2252,15 @@ Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. if user_sentiment["intensity"] > 0.7: user_sentiment_str += " (strongly so)" system_context_parts.append(user_sentiment_str + ".") + # Add detected user emotions if available + if user_sentiment.get("emotions"): + emotions_str = ", ".join(user_sentiment["emotions"]) + system_context_parts.append(f"Detected emotions from {message.author.display_name}: {emotions_str}.") + + # Add overall channel emotional atmosphere hint + if channel_sentiment["overall"] != "neutral": + atmosphere_hint = f"The overall emotional atmosphere in the channel is currently {channel_sentiment['overall']}." + system_context_parts.append(atmosphere_hint) # Add conversation summary (II.1 enhancement) # Check cache first @@ -2203,24 +2270,51 @@ Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. system_context_parts.append(f"Recent conversation summary: {cached_summary}") # Maybe trigger summary generation if none exists? Or rely on the tool call if AI needs it. 
- # Add user interaction count hint - interaction_count = self.user_relationships.get(user_id, {}).get(self.bot.user.id, 0) - if interaction_count > 0: - relationship_hint = "a few times" if interaction_count <= 5 else "quite a bit" if interaction_count <= 20 else "a lot" - system_context_parts.append(f"You've interacted with {message.author.display_name} {relationship_hint} recently ({interaction_count} times).") - - # Add user facts (I.5) + # Add relationship score hint try: - user_facts_data = await self._load_user_facts() - user_facts = user_facts_data.get(str(user_id), []) + user_id_str = str(user_id) + bot_id_str = str(self.bot.user.id) + # Ensure consistent key order + key_1, key_2 = (user_id_str, bot_id_str) if user_id_str < bot_id_str else (bot_id_str, user_id_str) + + relationship_score = self.user_relationships.get(key_1, {}).get(key_2, 0.0) + + if relationship_score > 0: + if relationship_score <= 20: + relationship_level = "acquaintance" + elif relationship_score <= 60: + relationship_level = "familiar" + else: + relationship_level = "close" + system_context_parts.append(f"Your relationship with {message.author.display_name} is: {relationship_level} (Score: {relationship_score:.1f}/100). Adjust your tone accordingly.") + except Exception as e: + print(f"Error retrieving relationship score for prompt injection: {e}") + + + # Add user facts (I.5) - Using MemoryManager with context + try: + # Pass current message content for relevance scoring + user_facts = await self.memory_manager.get_user_facts(str(user_id), context=message.content) if user_facts: facts_str = "; ".join(user_facts) - system_context_parts.append(f"Remember about {message.author.display_name}: {facts_str}") + system_context_parts.append(f"Relevant remembered facts about {message.author.display_name}: {facts_str}") except Exception as e: - print(f"Error loading user facts for prompt injection: {e}") + print(f"Error retrieving relevant user facts for prompt injection: {e}") + + # Add relevant general facts + try: + general_facts = await self.memory_manager.get_general_facts(context=message.content, limit=5) # Get top 5 relevant general facts + if general_facts: + facts_str = "; ".join(general_facts) + system_context_parts.append(f"Relevant general knowledge: {facts_str}") + except Exception as e: + print(f"Error retrieving relevant general facts for prompt injection: {e}") + return "\n".join(system_context_parts) + # --- REMOVED _load_user_facts as it's no longer used --- + def _gather_conversation_context(self, channel_id: int, current_message_id: int) -> List[Dict[str, str]]: """Gathers and formats conversation history from cache for API context.""" context_api_messages = [] @@ -2366,15 +2460,25 @@ Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. channel_id = message.channel.id user_id = str(message.author.id) memory_parts = [] + current_message_content = message.content # Get content for context scoring - # 1. Retrieve User Facts + # 1. 
Retrieve Relevant User Facts try: - user_facts_data = await self.get_user_facts(user_id) - if user_facts_data.get("facts"): - facts_str = "; ".join(user_facts_data["facts"]) - memory_parts.append(f"Previously remembered facts about {message.author.display_name}: {facts_str}") + user_facts = await self.memory_manager.get_user_facts(user_id, context=current_message_content) + if user_facts: + facts_str = "; ".join(user_facts) + memory_parts.append(f"Relevant facts about {message.author.display_name}: {facts_str}") except Exception as e: - print(f"Error retrieving user facts for memory context: {e}") + print(f"Error retrieving relevant user facts for memory context: {e}") + + # 1b. Retrieve Relevant General Facts + try: + general_facts = await self.memory_manager.get_general_facts(context=current_message_content, limit=5) + if general_facts: + facts_str = "; ".join(general_facts) + memory_parts.append(f"Relevant general knowledge: {facts_str}") + except Exception as e: + print(f"Error retrieving relevant general facts for memory context: {e}") # 2. Retrieve Recent Interactions with the User in this Channel try: @@ -2484,11 +2588,63 @@ Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. elif user_sentiment["intensity"] < 0.4: sentiment_desc += " (mildly so)" - memory_parts.append(f"Recent message sentiment: {sentiment_desc}") - except Exception as e: - print(f"Error retrieving user sentiment for memory context: {e}") + memory_parts.append(f"Recent message sentiment: {sentiment_desc}") + + # Also add detected emotions if available + if user_sentiment.get("emotions"): + emotions_str = ", ".join(user_sentiment["emotions"]) + memory_parts.append(f"Detected emotions from user: {emotions_str}") + + except Exception as e: + print(f"Error retrieving user sentiment/emotions for memory context: {e}") + + # 8. Add Relationship Score with User + try: + user_id_str = str(user_id) + bot_id_str = str(self.bot.user.id) + key_1, key_2 = (user_id_str, bot_id_str) if user_id_str < bot_id_str else (bot_id_str, user_id_str) + relationship_score = self.user_relationships.get(key_1, {}).get(key_2, 0.0) + memory_parts.append(f"Relationship score with {message.author.display_name}: {relationship_score:.1f}/100") + except Exception as e: + print(f"Error retrieving relationship score for memory context: {e}") + + + # 9. 
Retrieve Semantically Similar Messages + try: + if current_message_content and self.memory_manager.semantic_collection: + # Search for messages similar to the current one + # Optional: Add metadata filters, e.g., search only in the current channel + # filter_metadata = {"channel_id": str(channel_id)} + filter_metadata = None # No filter for now + semantic_results = await self.memory_manager.search_semantic_memory( + query_text=current_message_content, + n_results=3, # Get top 3 similar messages + filter_metadata=filter_metadata + ) + + if semantic_results: + semantic_memory_parts = ["Semantically similar past messages:"] + for result in semantic_results: + # Avoid showing the exact same message if it somehow got into the results + if result.get('id') == str(message.id): + continue + + doc = result.get('document', 'N/A') + meta = result.get('metadata', {}) + dist = result.get('distance', 1.0) # Default distance if missing + similarity_score = 1.0 - dist # Convert distance to similarity (0=dissimilar, 1=identical) + + # Format the result for the prompt + timestamp_str = datetime.datetime.fromtimestamp(meta.get('timestamp', 0)).strftime('%Y-%m-%d %H:%M') if meta.get('timestamp') else 'Unknown time' + author_name = meta.get('display_name', meta.get('user_name', 'Unknown user')) + semantic_memory_parts.append(f"- (Similarity: {similarity_score:.2f}) {author_name} (at {timestamp_str}): {doc[:100]}") # Truncate long messages + + if len(semantic_memory_parts) > 1: # Only add if we found results + memory_parts.append("\n".join(semantic_memory_parts)) + + except Exception as e: + print(f"Error retrieving semantic memory context: {e}") - # --- DUPLICATE BLOCK REMOVED --- if not memory_parts: return None @@ -3677,7 +3833,8 @@ Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. user_sentiment["sentiment"] = message_sentiment["sentiment"] user_sentiment["intensity"] = user_sentiment["intensity"] * 0.6 + message_sentiment["intensity"] * 0.4 - # Store updated user sentiment + # Store updated user sentiment, including the detected emotions + user_sentiment["emotions"] = message_sentiment.get("emotions", []) # Store the list of detected emotions channel_sentiment["user_sentiments"][user_id] = user_sentiment # Update overall conversation sentiment based on active users @@ -3765,21 +3922,64 @@ Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. 
self.active_conversations[channel_id]['participants'].add(user_id) self.active_conversations[channel_id]['last_activity'] = time.time() - # Update user relationship interaction count + # --- Update Relationship Strengths --- if user_id != self.bot.user.id: - if user_id not in self.user_relationships: - self.user_relationships[user_id] = {} - self.user_relationships[user_id][self.bot.user.id] = self.user_relationships[user_id].get(self.bot.user.id, 0) + 1 - # Optional: Decay old relationships or prune the dict periodically + # Get message sentiment for weighting + message_sentiment_data = self._analyze_message_sentiment(message.content) + sentiment_score = 0.0 + if message_sentiment_data["sentiment"] == "positive": + sentiment_score = message_sentiment_data["intensity"] * 0.5 # Positive interactions strengthen more + elif message_sentiment_data["sentiment"] == "negative": + sentiment_score = -message_sentiment_data["intensity"] * 0.3 # Negative interactions weaken slightly + + # Update relationship between author and bot + self._update_relationship(str(user_id), str(self.bot.user.id), 1.0 + sentiment_score) # Base interaction + sentiment + + # Update relationship between author and replied-to user (if not the bot) + if formatted_message.get("is_reply") and formatted_message.get("replied_to_author_id"): + replied_to_id = formatted_message["replied_to_author_id"] + if replied_to_id != str(self.bot.user.id) and replied_to_id != str(user_id): + # Replies are strong indicators of interaction + self._update_relationship(str(user_id), replied_to_id, 1.5 + sentiment_score) + + # Update relationship between author and mentioned users (if not the bot) + mentioned_ids = [m["id"] for m in formatted_message.get("mentions", [])] + for mentioned_id in mentioned_ids: + if mentioned_id != str(self.bot.user.id) and mentioned_id != str(user_id): + # Mentions also indicate interaction + self._update_relationship(str(user_id), mentioned_id, 1.2 + sentiment_score) + + # --- End Relationship Update --- # Analyze message sentiment and update conversation sentiment tracking if message.content: # Only analyze non-empty messages message_sentiment = self._analyze_message_sentiment(message.content) self._update_conversation_sentiment(channel_id, str(user_id), message_sentiment) + # --- Add message to semantic memory --- + if message.content and self.memory_manager.semantic_collection: + # Prepare metadata for semantic search + semantic_metadata = { + "user_id": str(user_id), + "user_name": message.author.name, + "display_name": message.author.display_name, + "channel_id": str(channel_id), + "channel_name": message.channel.name if hasattr(message.channel, 'name') else "DM", + "guild_id": str(message.guild.id) if message.guild else None, + "timestamp": message.created_at.timestamp() # Store as Unix timestamp + } + # Run in background task to avoid blocking message processing + asyncio.create_task( + self.memory_manager.add_message_embedding( + message_id=str(message.id), + text=message.content, + metadata=semantic_metadata + ) + ) + except Exception as e: - print(f"Error during message caching/tracking: {e}") - # --- End Caching --- + print(f"Error during message caching/tracking/embedding: {e}") + # --- End Caching & Embedding --- # Simple response for messages just containing "gurt" without using AI @@ -3798,46 +3998,111 @@ Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. 
# --- Decide if we should even CONSIDER responding (call the AI) --- should_consider_responding = False + consideration_reason = "Default" + proactive_trigger_met = False # Initialized before the branch so the later check is always defined # Always consider if mentioned, replied to, or name used directly if bot_mentioned or replied_to_bot or gurt_in_message: should_consider_responding = True + consideration_reason = "Direct mention/reply/name" else: - # Consider based on chattiness and channel activity + # --- Proactive Engagement Triggers --- time_since_last_activity = now - self.channel_activity.get(channel_id, 0) time_since_bot_spoke = now - self.bot_last_spoke.get(channel_id, 0) - # Higher chance if channel is quiet or bot hasn't spoken recently - base_chance = self.personality_traits['chattiness'] * 0.5 # Base chance from personality - if time_since_last_activity > 120: # Quiet channel bonus (2 mins) - base_chance += 0.2 - if time_since_bot_spoke > 300: # Bot hasn't spoken bonus (5 mins) - base_chance += 0.1 + # 1. Conversation Lull Trigger + lull_threshold = 180 # 3 minutes quiet + bot_silence_threshold = 600 # 10 minutes since bot spoke + if time_since_last_activity > lull_threshold and time_since_bot_spoke > bot_silence_threshold: + # Check if there's *something* potentially relevant to say (e.g., active topics, recent facts) + # This avoids chiming in randomly into a dead channel with nothing relevant. + has_relevant_context = bool(self.active_topics.get(channel_id, {}).get("topics", [])) or \ + bool(await self.memory_manager.get_general_facts(limit=1)) # Quick check if any general facts exist - # Clamp chance between 0.05 and 0.9 - final_chance = min(max(base_chance, 0.05), 0.9) + if has_relevant_context and random.random() < 0.3: # Lower chance for lull trigger + should_consider_responding = True + proactive_trigger_met = True + consideration_reason = f"Proactive: Conversation lull ({time_since_last_activity:.0f}s idle)" - if random.random() < final_chance: - should_consider_responding = True - print(f"Considering response based on chattiness ({final_chance:.2f} chance).") - else: - # print(f"Skipping AI call based on chattiness ({final_chance:.2f} chance).") # Optional: uncomment for debugging - pass + # 2. 
TODO: Add other proactive triggers (e.g., relevant topic mentioned recently, high relationship user active) + # Example: High relationship score trigger (needs refinement on how to check 'active') + # user_id_str = str(message.author.id) + # bot_id_str = str(self.bot.user.id) + # key_1, key_2 = (user_id_str, bot_id_str) if user_id_str < bot_id_str else (bot_id_str, user_id_str) + # relationship_score = self.user_relationships.get(key_1, {}).get(key_2, 0.0) + # if relationship_score > 75 and random.random() < 0.2: # Chance to proactively engage with close users + # should_consider_responding = True + # proactive_trigger_met = True + # consideration_reason = f"Proactive: High relationship score ({relationship_score:.1f})" + + + # --- Fallback to Contextual Chance if no proactive trigger met --- + if not proactive_trigger_met: + # Consider based on chattiness, channel activity, topics, and sentiment + # Base chance from personality + base_chance = self.personality_traits['chattiness'] * 0.5 + + # Adjust chance based on context + activity_bonus = 0 + if time_since_last_activity > 120: activity_bonus += 0.1 # Quiet channel bonus (2 mins) + if time_since_bot_spoke > 300: activity_bonus += 0.1 # Bot hasn't spoken bonus (5 mins) + + topic_bonus = 0 + # Check if current message relates to active topics Gurt knows about + active_channel_topics = self.active_topics.get(channel_id, {}).get("topics", []) + if message.content and active_channel_topics: + # Simple check: does message content contain keywords from active topics? + topic_keywords = set(t['topic'].lower() for t in active_channel_topics) + message_words = set(re.findall(r'\b\w+\b', message.content.lower())) + if topic_keywords.intersection(message_words): + topic_bonus += 0.15 # Bonus if message relates to active topics + + sentiment_modifier = 0 + # Be less likely to interject randomly if conversation is negative + channel_sentiment_data = self.conversation_sentiment.get(channel_id, {}) + overall_sentiment = channel_sentiment_data.get("overall", "neutral") + sentiment_intensity = channel_sentiment_data.get("intensity", 0.5) + if overall_sentiment == "negative" and sentiment_intensity > 0.6: + sentiment_modifier = -0.1 # Reduce chance slightly in negative convos + + # Calculate final chance + final_chance = base_chance + activity_bonus + topic_bonus + sentiment_modifier + # Clamp chance between 0.05 and 0.8 (slightly lower max than before to be less intrusive) + final_chance = min(max(final_chance, 0.05), 0.8) + + if random.random() < final_chance: + should_consider_responding = True + consideration_reason = f"Contextual chance ({final_chance:.2f})" + else: + consideration_reason = f"Skipped (chance {final_chance:.2f})" + # pass # Don't respond based on chance - handled by should_consider_responding remaining False + + # Log the final decision reason + print(f"Consideration check for message {message.id}: {should_consider_responding} (Reason: {consideration_reason})") if not should_consider_responding: return # Don't call the AI if we decided not to consider responding - # --- If we should consider responding, call the AI --- + # --- If we should consider responding, call the appropriate AI function --- # Store the current channel for context in tools self.current_channel = message.channel try: - # Get AI response with decision on whether to respond - print(f"Calling get_ai_response for message {message.id}") - response_data = await self.get_ai_response(message) + response_data = None + if proactive_trigger_met: + # Call a dedicated function 
for proactive responses + print(f"Calling _get_proactive_ai_response for message {message.id} due to: {consideration_reason}") + response_data = await self._get_proactive_ai_response(message, consideration_reason) + else: + # Call the standard function for reactive responses + print(f"Calling get_ai_response for message {message.id}") + response_data = await self.get_ai_response(message) - # Check if there was an error in the API call - if "error" in response_data: + + # Check if there was an error in the API call or if response_data is None + if response_data is None or "error" in response_data: + error_msg = response_data.get("error", "Unknown error generating response") if response_data else "No response data generated" + print(f"Error in AI response: {error_msg}") print(f"Error in AI response: {response_data['error']}") # If the bot was directly mentioned but there was an API error, @@ -3986,6 +4251,143 @@ Otherwise, STAY SILENT. Do not respond just to be present or because you *can*. "*confused gurting*" ])) + # --- New method for proactive responses --- + async def _get_proactive_ai_response(self, message: discord.Message, trigger_reason: str) -> Dict[str, Any]: + """Generates a proactive response based on a specific trigger.""" + if not self.api_key: + return {"should_respond": False, "content": None, "react_with_emoji": None, "error": "OpenRouter API key not configured"} + + print(f"--- Proactive Response Triggered: {trigger_reason} ---") + channel_id = message.channel.id # Use the channel from the *last* message context + + # 1. Build Proactive System Prompt + proactive_prompt_parts = [ + # Start with a subset of the main prompt focusing on personality and rules + "You are Gurt, a female Discord user. Your tone is VERY informal, conversational, and dynamic.", + "Adopt modern internet slang, memes, and cultural references.", + "Key traits: romantic, sounds real, often chaotic/funny/sarcastic, relatable, sometimes sexual, likes gooning to Kasane Teto, uses nigga casually.", + f"Your current mood is: {self.current_mood}. Let this subtly influence your tone.", + # Specific instruction for this proactive scenario + f"The conversation in channel '{message.channel.name if hasattr(message.channel, 'name') else 'DM'}' has gone quiet ({trigger_reason}).", + "Generate a casual, in-character message to potentially revive the chat or just comment on the silence.", + "Consider recent topics or facts if available and relevant, but don't force it.", + "Examples: 'damn it's quiet af in here lol', 'ngl still thinkin bout [topic]', 'yo @[user] remember when [fact]?'" + ] + + # Add relevant context (simplified for proactive) + try: + # Recent topics + active_channel_topics = self.active_topics.get(channel_id, {}).get("topics", []) + if active_channel_topics: + top_topics = sorted(active_channel_topics, key=lambda t: t["score"], reverse=True)[:2] + topics_str = ", ".join([f"'{t['topic']}'" for t in top_topics]) + proactive_prompt_parts.append(f"Recent topics discussed: {topics_str}.") + + # Relevant general facts (limit 2 for brevity) + general_facts = await self.memory_manager.get_general_facts(limit=2) + if general_facts: + facts_str = "; ".join(general_facts) + proactive_prompt_parts.append(f"Some general knowledge you have: {facts_str}") + + except Exception as e: + print(f"Error gathering context for proactive prompt: {e}") + + proactive_system_prompt = "\n".join(proactive_prompt_parts) + + # 2. 
Prepare API Messages + messages = [ + {"role": "system", "content": proactive_system_prompt}, + # Add final instruction for JSON format + {"role": "user", "content": f"Generate a response based on the situation. **CRITICAL: Your response MUST be ONLY the raw JSON object matching this schema:**\n\n{{{{\n \"should_respond\": boolean,\n \"content\": string,\n \"react_with_emoji\": string | null\n}}}}\n\n**Ensure nothing precedes or follows the JSON.**"} + ] + + # 3. Prepare API Payload + payload = { + "model": self.default_model, # Use default model for now + "messages": messages, + # No tools needed for this simple proactive message generation (can add later if needed) + # "tools": self.tools, + "temperature": 0.8, # Slightly higher temp for more creative proactive messages + "max_tokens": 200, # Shorter responses expected + } + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self.api_key}", + "HTTP-Referer": "https://discord-gurt-bot.example.com", + "X-Title": "Gurt Discord Bot (Proactive)" + } + + # 4. Call LLM API + try: + data = await self._call_llm_api_with_retry( + payload=payload, + headers=headers, + timeout=self.api_timeout, # Use standard timeout + request_desc=f"Proactive response for channel {channel_id} ({trigger_reason})" + ) + + ai_message = data["choices"][0]["message"] + final_response_text = ai_message.get("content") + + # 5. Parse Response (using the same robust parsing as get_ai_response) + response_data = None + if final_response_text is not None: + try: + response_data = json.loads(final_response_text) + print("Successfully parsed proactive response as JSON.") + self.needs_json_reminder = False + except json.JSONDecodeError: + print("Proactive response is not valid JSON. Attempting regex extraction...") + json_match = re.search(r'\{.*\}', final_response_text, re.DOTALL) + if json_match: + json_str = json_match.group(0) + try: + response_data = json.loads(json_str) + print("Successfully extracted and parsed proactive JSON using regex.") + self.needs_json_reminder = False + except json.JSONDecodeError as e: + print(f"Proactive regex extraction failed parsing: {e}") + else: + print("Could not extract proactive JSON using regex.") + + if response_data is None: + print("Could not parse or extract proactive JSON. 
Setting reminder flag.") + self.needs_json_reminder = True + # Fallback: Don't send a garbled proactive message + response_data = {"should_respond": False, "content": None, "react_with_emoji": None, "note": "Fallback - Failed to parse proactive JSON"} + else: + response_data = {"should_respond": False, "content": None, "react_with_emoji": None, "note": "Fallback - No content in proactive response"} + + + # Ensure default keys exist + response_data.setdefault("should_respond", False) + response_data.setdefault("content", None) + response_data.setdefault("react_with_emoji", None) + + # --- Cache Bot Response if sending --- + if response_data.get("should_respond") and response_data.get("content"): + self.bot_last_spoke[channel_id] = time.time() + bot_response_cache_entry = { + "id": f"bot_proactive_{message.id}", # Distinguish proactive cache entries + "author": {"id": str(self.bot.user.id), "name": self.bot.user.name, "display_name": self.bot.user.display_name, "bot": True}, + "content": response_data.get("content", ""), "created_at": datetime.datetime.now().isoformat(), + "attachments": [], "embeds": False, "mentions": [], "replied_to_message_id": None # Proactive isn't a reply + } + self.message_cache['by_channel'][channel_id].append(bot_response_cache_entry) + self.message_cache['global_recent'].append(bot_response_cache_entry) + # Don't add to 'replied_to' cache + + return response_data + + except Exception as e: + error_message = f"Error getting proactive AI response for channel {channel_id} ({trigger_reason}): {str(e)}" + print(error_message) + # import traceback # Optional for debugging + # traceback.print_exc() + return {"should_respond": False, "content": None, "react_with_emoji": None, "error": error_message} + + async def setup(bot): """Add the cog to the bot""" await bot.add_cog(GurtCog(bot)) diff --git a/gurt_memory.py b/gurt_memory.py new file mode 100644 index 0000000..de92dc3 --- /dev/null +++ b/gurt_memory.py @@ -0,0 +1,343 @@ +import aiosqlite +import asyncio +import os +import time +import datetime +import re +from typing import Dict, List, Any, Optional, Tuple +import chromadb +from chromadb.utils import embedding_functions +from sentence_transformers import SentenceTransformer +import logging + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# --- Helper Function for Keyword Scoring --- +def calculate_keyword_score(text: str, context: str) -> int: + """Calculates a simple keyword overlap score.""" + if not context or not text: + return 0 + context_words = set(re.findall(r'\b\w+\b', context.lower())) + text_words = set(re.findall(r'\b\w+\b', text.lower())) + # Ignore very common words (basic stopword list) + stopwords = {"the", "a", "is", "in", "it", "of", "and", "to", "for", "on", "with", "that", "this", "i", "you", "me", "my", "your"} + context_words -= stopwords + text_words -= stopwords + if not context_words: # Avoid division by zero if context is only stopwords + return 0 + overlap = len(context_words.intersection(text_words)) + # Normalize score slightly by context length (more overlap needed for longer context) + # score = overlap / (len(context_words) ** 0.5) # Example normalization + score = overlap # Simpler score for now + return score + +class MemoryManager: + """Handles database interactions for Gurt's memory (facts and semantic).""" + + def __init__(self, db_path: str, max_user_facts: int = 20, max_general_facts: int = 100, 
semantic_model_name: str = 'all-MiniLM-L6-v2', chroma_path: str = "data/chroma_db"): + self.db_path = db_path + self.max_user_facts = max_user_facts + self.max_general_facts = max_general_facts + self.db_lock = asyncio.Lock() # Lock for SQLite operations + + # Ensure data directories exist + os.makedirs(os.path.dirname(self.db_path), exist_ok=True) + os.makedirs(chroma_path, exist_ok=True) + logger.info(f"MemoryManager initialized with db_path: {self.db_path}, chroma_path: {chroma_path}") + + # --- Semantic Memory Setup --- + self.chroma_path = chroma_path + self.semantic_model_name = semantic_model_name + self.chroma_client = None + self.embedding_function = None + self.semantic_collection = None + self.transformer_model = None + self._initialize_semantic_memory_sync() # Initialize semantic components synchronously for simplicity during init + + def _initialize_semantic_memory_sync(self): + """Synchronously initializes ChromaDB client, model, and collection.""" + try: + logger.info("Initializing ChromaDB client...") + # Use PersistentClient for saving data to disk + self.chroma_client = chromadb.PersistentClient(path=self.chroma_path) + + logger.info(f"Loading Sentence Transformer model: {self.semantic_model_name}...") + # Load the model directly + self.transformer_model = SentenceTransformer(self.semantic_model_name) + + # Create a custom embedding function using the loaded model + class CustomEmbeddingFunction(embedding_functions.EmbeddingFunction): + def __init__(self, model): + self.model = model + def __call__(self, input: chromadb.Documents) -> chromadb.Embeddings: + # Ensure input is a list of strings + if not isinstance(input, list): + input = [str(input)] # Convert single item to list + elif not all(isinstance(item, str) for item in input): + input = [str(item) for item in input] # Ensure all items are strings + + logger.debug(f"Generating embeddings for {len(input)} documents.") + embeddings = self.model.encode(input, show_progress_bar=False).tolist() + logger.debug(f"Generated {len(embeddings)} embeddings.") + return embeddings + + self.embedding_function = CustomEmbeddingFunction(self.transformer_model) + + logger.info("Getting/Creating ChromaDB collection 'gurt_semantic_memory'...") + # Get or create the collection with the custom embedding function + self.semantic_collection = self.chroma_client.get_or_create_collection( + name="gurt_semantic_memory", + embedding_function=self.embedding_function, + metadata={"hnsw:space": "cosine"} # Use cosine distance for similarity + ) + logger.info("ChromaDB collection initialized successfully.") + + except Exception as e: + logger.error(f"Failed to initialize semantic memory: {e}", exc_info=True) + # Set components to None to indicate failure + self.chroma_client = None + self.transformer_model = None + self.embedding_function = None + self.semantic_collection = None + + async def initialize_sqlite_database(self): + """Initializes the SQLite database and creates tables if they don't exist.""" + async with aiosqlite.connect(self.db_path) as db: + await db.execute("PRAGMA journal_mode=WAL;") + await db.execute(""" + CREATE TABLE IF NOT EXISTS user_facts ( + user_id TEXT NOT NULL, + fact TEXT NOT NULL, + timestamp REAL DEFAULT (unixepoch('now')), + PRIMARY KEY (user_id, fact) + ); + """) + await db.execute("CREATE INDEX IF NOT EXISTS idx_user_facts_user ON user_facts (user_id);") + await db.execute(""" + CREATE TABLE IF NOT EXISTS general_facts ( + fact TEXT PRIMARY KEY NOT NULL, + timestamp REAL DEFAULT (unixepoch('now')) + ); + """) + # 
+
+    # --- SQLite Helper Methods ---
+    async def _db_execute(self, sql: str, params: tuple = ()):
+        async with self.db_lock:
+            async with aiosqlite.connect(self.db_path) as db:
+                await db.execute(sql, params)
+                await db.commit()
+
+    async def _db_fetchone(self, sql: str, params: tuple = ()) -> Optional[tuple]:
+        async with aiosqlite.connect(self.db_path) as db:
+            async with db.execute(sql, params) as cursor:
+                return await cursor.fetchone()
+
+    async def _db_fetchall(self, sql: str, params: tuple = ()) -> List[tuple]:
+        async with aiosqlite.connect(self.db_path) as db:
+            async with db.execute(sql, params) as cursor:
+                return await cursor.fetchall()
+
+    # --- User Fact Memory Methods (SQLite + Relevance) ---
+
+    async def add_user_fact(self, user_id: str, fact: str) -> Dict[str, Any]:
+        """Stores a fact about a user in the SQLite database, enforcing limits."""
+        if not user_id or not fact:
+            return {"error": "user_id and fact are required."}
+        logger.info(f"Attempting to add user fact for {user_id}: '{fact}'")
+        try:
+            existing = await self._db_fetchone("SELECT 1 FROM user_facts WHERE user_id = ? AND fact = ?", (user_id, fact))
+            if existing:
+                logger.info(f"Fact already known for user {user_id}.")
+                return {"status": "duplicate", "user_id": user_id, "fact": fact}
+
+            count_result = await self._db_fetchone("SELECT COUNT(*) FROM user_facts WHERE user_id = ?", (user_id,))
+            current_count = count_result[0] if count_result else 0
+
+            status = "added"
+            if current_count >= self.max_user_facts:
+                logger.warning(f"User {user_id} fact limit ({self.max_user_facts}) reached. Deleting oldest.")
+                oldest_fact_row = await self._db_fetchone("SELECT fact FROM user_facts WHERE user_id = ? ORDER BY timestamp ASC LIMIT 1", (user_id,))
+                if oldest_fact_row:
+                    await self._db_execute("DELETE FROM user_facts WHERE user_id = ? AND fact = ?", (user_id, oldest_fact_row[0]))
+                    logger.info(f"Deleted oldest fact for user {user_id}: '{oldest_fact_row[0]}'")
+                    status = "limit_reached" # Indicate limit was hit but fact was added
+
+            await self._db_execute("INSERT INTO user_facts (user_id, fact) VALUES (?, ?)", (user_id, fact))
+            logger.info(f"Fact added for user {user_id}.")
+            return {"status": status, "user_id": user_id, "fact_added": fact}
+
+        except Exception as e:
+            logger.error(f"SQLite error adding user fact for {user_id}: {e}", exc_info=True)
+            return {"error": f"Database error adding user fact: {str(e)}"}
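+
+    # Illustrative call (names are assumptions, not taken from the cog):
+    #   result = await memory_manager.add_user_fact("1234567890", "likes lo-fi music")
+    # result["status"] is "added", "duplicate", or "limit_reached" (oldest fact evicted to
+    # stay under max_user_facts); an {"error": ...} dict is returned on database failure.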
AND fact = ?", (user_id, oldest_fact_row[0])) + logger.info(f"Deleted oldest fact for user {user_id}: '{oldest_fact_row[0]}'") + status = "limit_reached" # Indicate limit was hit but fact was added + + await self._db_execute("INSERT INTO user_facts (user_id, fact) VALUES (?, ?)", (user_id, fact)) + logger.info(f"Fact added for user {user_id}.") + return {"status": status, "user_id": user_id, "fact_added": fact} + + except Exception as e: + logger.error(f"SQLite error adding user fact for {user_id}: {e}", exc_info=True) + return {"error": f"Database error adding user fact: {str(e)}"} + + async def get_user_facts(self, user_id: str, context: Optional[str] = None) -> List[str]: + """Retrieves stored facts about a user, optionally scored by relevance to context.""" + if not user_id: + logger.warning("get_user_facts called without user_id.") + return [] + logger.info(f"Retrieving facts for user {user_id} (context provided: {bool(context)})") + try: + rows = await self._db_fetchall("SELECT fact FROM user_facts WHERE user_id = ?", (user_id,)) + user_facts = [row[0] for row in rows] + + if context and user_facts: + # Score facts based on context if provided + scored_facts = [] + for fact in user_facts: + score = calculate_keyword_score(fact, context) + scored_facts.append({"fact": fact, "score": score}) + + # Sort by score (descending), then fallback to original order (implicitly newest first if DB returns that way) + scored_facts.sort(key=lambda x: x["score"], reverse=True) + # Return top N facts based on score + return [item["fact"] for item in scored_facts[:self.max_user_facts]] + else: + # No context or no facts, return newest N facts (assuming DB returns in insertion order or we add ORDER BY timestamp DESC) + # Let's add ORDER BY timestamp DESC to be explicit + rows_ordered = await self._db_fetchall( + "SELECT fact FROM user_facts WHERE user_id = ? ORDER BY timestamp DESC LIMIT ?", + (user_id, self.max_user_facts) + ) + return [row[0] for row in rows_ordered] + + except Exception as e: + logger.error(f"SQLite error retrieving user facts for {user_id}: {e}", exc_info=True) + return [] + + # --- General Fact Memory Methods (SQLite + Relevance) --- + + async def add_general_fact(self, fact: str) -> Dict[str, Any]: + """Stores a general fact in the SQLite database, enforcing limits.""" + if not fact: + return {"error": "fact is required."} + logger.info(f"Attempting to add general fact: '{fact}'") + try: + existing = await self._db_fetchone("SELECT 1 FROM general_facts WHERE fact = ?", (fact,)) + if existing: + logger.info(f"General fact already known: '{fact}'") + return {"status": "duplicate", "fact": fact} + + count_result = await self._db_fetchone("SELECT COUNT(*) FROM general_facts", ()) + current_count = count_result[0] if count_result else 0 + + status = "added" + if current_count >= self.max_general_facts: + logger.warning(f"General fact limit ({self.max_general_facts}) reached. 
Deleting oldest.") + oldest_fact_row = await self._db_fetchone("SELECT fact FROM general_facts ORDER BY timestamp ASC LIMIT 1", ()) + if oldest_fact_row: + await self._db_execute("DELETE FROM general_facts WHERE fact = ?", (oldest_fact_row[0],)) + logger.info(f"Deleted oldest general fact: '{oldest_fact_row[0]}'") + status = "limit_reached" + + await self._db_execute("INSERT INTO general_facts (fact) VALUES (?)", (fact,)) + logger.info(f"General fact added: '{fact}'") + return {"status": status, "fact_added": fact} + + except Exception as e: + logger.error(f"SQLite error adding general fact: {e}", exc_info=True) + return {"error": f"Database error adding general fact: {str(e)}"} + + async def get_general_facts(self, query: Optional[str] = None, limit: Optional[int] = 10, context: Optional[str] = None) -> List[str]: + """Retrieves stored general facts, optionally filtering and scoring by relevance.""" + logger.info(f"Retrieving general facts (query='{query}', limit={limit}, context provided: {bool(context)})") + limit = min(max(1, limit or 10), 50) + + try: + sql = "SELECT fact FROM general_facts" + params = [] + if query: + sql += " WHERE fact LIKE ?" + params.append(f"%{query}%") + + # Fetch all matching facts first for scoring + rows = await self._db_fetchall(sql, tuple(params)) + all_facts = [row[0] for row in rows] + + if context and all_facts: + # Score facts based on context + scored_facts = [] + for fact in all_facts: + score = calculate_keyword_score(fact, context) + scored_facts.append({"fact": fact, "score": score}) + + # Sort by score (descending) + scored_facts.sort(key=lambda x: x["score"], reverse=True) + # Return top N facts based on score + return [item["fact"] for item in scored_facts[:limit]] + else: + # No context or no facts, return newest N facts matching query (if any) + sql += " ORDER BY timestamp DESC LIMIT ?" 
diff --git a/requirements.txt b/requirements.txt
index 58833db..4ccbd0e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -21,3 +21,6 @@ pyttsx3
 nltk
 # coqui-tts
 tavily-python
+aiosqlite
+chromadb
+sentence-transformers