import aiosqlite
import asyncio
import os
import time
import datetime
import re
import hashlib  # Added for chroma_id generation
import json  # Added for personality trait serialization/deserialization
import logging
from typing import Dict, List, Any, Optional, Tuple, Union  # Added Union

import chromadb
from chromadb.utils import embedding_functions
from sentence_transformers import SentenceTransformer

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Constants
INTEREST_INITIAL_LEVEL = 0.1
INTEREST_MAX_LEVEL = 1.0
INTEREST_MIN_LEVEL = 0.0
INTEREST_DECAY_RATE = 0.02  # Default decay rate per cycle
INTEREST_DECAY_INTERVAL_HOURS = 24  # Default interval for decay check


# --- Helper Function for Keyword Scoring ---
def calculate_keyword_score(text: str, context: str) -> int:
    """Calculates a simple keyword overlap score."""
    if not context or not text:
        return 0
    context_words = set(re.findall(r"\b\w+\b", context.lower()))
    text_words = set(re.findall(r"\b\w+\b", text.lower()))
    # Ignore very common words (basic stopword list)
    stopwords = {
        "the",
        "a",
        "is",
        "in",
        "it",
        "of",
        "and",
        "to",
        "for",
        "on",
        "with",
        "that",
        "this",
        "i",
        "you",
        "me",
        "my",
        "your",
    }
    context_words -= stopwords
    text_words -= stopwords
    if not context_words:  # Avoid division by zero if context is only stopwords
        return 0
    overlap = len(context_words.intersection(text_words))
    # Normalize score slightly by context length (more overlap needed for longer context)
    # score = overlap / (len(context_words) ** 0.5)  # Example normalization
    score = overlap  # Simpler score for now
    return score
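

# Illustrative example of the scorer (made-up inputs, not from the project):
#
#   calculate_keyword_score("i love playing elden ring", "elden ring is so hard")
#
# The shared non-stopword tokens are {"elden", "ring"}, so the score is 2. Note
# there is no stemming, so "game" and "gaming" would not count as an overlap.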


class MemoryManager:
    """Handles database interactions for Gurt's memory (facts and semantic)."""

    def __init__(
        self,
        db_path: str,
        max_user_facts: int = 20,
        max_general_facts: int = 100,
        semantic_model_name: str = "all-MiniLM-L6-v2",
        chroma_path: str = "data/chroma_db",
    ):
        self.db_path = db_path
        self.max_user_facts = max_user_facts
        self.max_general_facts = max_general_facts
        self.db_lock = asyncio.Lock()  # Lock for SQLite operations

        # Ensure data directories exist (guard against a bare filename whose
        # dirname is empty, which would make os.makedirs raise)
        db_dir = os.path.dirname(self.db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        os.makedirs(chroma_path, exist_ok=True)
        logger.info(
            f"MemoryManager initialized with db_path: {self.db_path}, chroma_path: {chroma_path}"
        )

        # --- Semantic Memory Setup ---
        self.chroma_path = chroma_path
        self.semantic_model_name = semantic_model_name
        self.chroma_client = None
        self.embedding_function = None
        self.semantic_collection = None  # For messages
        self.fact_collection = None  # For facts
        self.transformer_model = None
        # Initialize semantic components synchronously for simplicity during init
        self._initialize_semantic_memory_sync()

    def _initialize_semantic_memory_sync(self):
        """Synchronously initializes the ChromaDB client, embedding model, and collections."""
        try:
            logger.info("Initializing ChromaDB client...")
            # Use PersistentClient for saving data to disk
            self.chroma_client = chromadb.PersistentClient(path=self.chroma_path)

            logger.info(
                f"Loading Sentence Transformer model: {self.semantic_model_name}..."
            )
            # Load the model directly
            self.transformer_model = SentenceTransformer(self.semantic_model_name)

            # Create a custom embedding function using the loaded model
            class CustomEmbeddingFunction(embedding_functions.EmbeddingFunction):
                def __init__(self, model):
                    self.model = model

                def __call__(self, input: chromadb.Documents) -> chromadb.Embeddings:
                    # Ensure input is a list of strings
                    if not isinstance(input, list):
                        input = [str(input)]  # Convert single item to list
                    elif not all(isinstance(item, str) for item in input):
                        input = [str(item) for item in input]  # Ensure all items are strings

                    logger.debug(f"Generating embeddings for {len(input)} documents.")
                    embeddings = self.model.encode(input, show_progress_bar=False).tolist()
                    logger.debug(f"Generated {len(embeddings)} embeddings.")
                    return embeddings

            self.embedding_function = CustomEmbeddingFunction(self.transformer_model)

            logger.info(
                "Getting/Creating ChromaDB collection 'gurt_semantic_memory'..."
            )
            # Get or create the collection with the custom embedding function
            self.semantic_collection = self.chroma_client.get_or_create_collection(
                name="gurt_semantic_memory",
                embedding_function=self.embedding_function,
                metadata={"hnsw:space": "cosine"},  # Use cosine distance for similarity
            )
            logger.info("ChromaDB message collection initialized successfully.")

            logger.info("Getting/Creating ChromaDB collection 'gurt_fact_memory'...")
            # Get or create the collection for facts
            self.fact_collection = self.chroma_client.get_or_create_collection(
                name="gurt_fact_memory",
                embedding_function=self.embedding_function,
                metadata={"hnsw:space": "cosine"},  # Use cosine distance for similarity
            )
            logger.info("ChromaDB fact collection initialized successfully.")

        except Exception as e:
            logger.error(
                f"Failed to initialize semantic memory (ChromaDB): {e}", exc_info=True
            )
            # Set components to None to indicate failure
            self.chroma_client = None
            self.transformer_model = None
            self.embedding_function = None
            self.semantic_collection = None
            self.fact_collection = None  # Also set fact_collection to None on error
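
    # Note: on any initialization failure the semantic components are left as
    # None, and every caller below guards on them (e.g. `if self.fact_collection
    # and self.embedding_function:`), so the manager degrades to SQLite-only
    # operation instead of raising.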

    async def initialize_sqlite_database(self):
        """Initializes the SQLite database and creates tables if they don't exist."""
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute("PRAGMA journal_mode=WAL;")

            # Create user_facts table if it doesn't exist
            await db.execute(
                """
                CREATE TABLE IF NOT EXISTS user_facts (
                    user_id TEXT NOT NULL,
                    fact TEXT NOT NULL,
                    chroma_id TEXT, -- Added for linking to ChromaDB
                    timestamp REAL DEFAULT (unixepoch('now')),
                    PRIMARY KEY (user_id, fact)
                );
                """
            )

            # Check if chroma_id column exists in user_facts table
            try:
                # Try to get column info
                cursor = await db.execute("PRAGMA table_info(user_facts)")
                columns = await cursor.fetchall()
                column_names = [column[1] for column in columns]

                # If chroma_id column doesn't exist, add it
                if "chroma_id" not in column_names:
                    logger.info("Adding chroma_id column to user_facts table")
                    await db.execute("ALTER TABLE user_facts ADD COLUMN chroma_id TEXT")
            except Exception as e:
                logger.error(
                    f"Error checking/adding chroma_id column to user_facts: {e}",
                    exc_info=True,
                )

            # Create indexes
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_user_facts_user ON user_facts (user_id);"
            )
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_user_facts_chroma_id ON user_facts (chroma_id);"
            )  # Index for chroma_id

            # Create general_facts table if it doesn't exist
            await db.execute(
                """
                CREATE TABLE IF NOT EXISTS general_facts (
                    fact TEXT PRIMARY KEY NOT NULL,
                    chroma_id TEXT, -- Added for linking to ChromaDB
                    timestamp REAL DEFAULT (unixepoch('now'))
                );
                """
            )

            # Check if chroma_id column exists in general_facts table
            try:
                # Try to get column info
                cursor = await db.execute("PRAGMA table_info(general_facts)")
                columns = await cursor.fetchall()
                column_names = [column[1] for column in columns]

                # If chroma_id column doesn't exist, add it
                if "chroma_id" not in column_names:
                    logger.info("Adding chroma_id column to general_facts table")
                    await db.execute(
                        "ALTER TABLE general_facts ADD COLUMN chroma_id TEXT"
                    )
            except Exception as e:
                logger.error(
                    f"Error checking/adding chroma_id column to general_facts: {e}",
                    exc_info=True,
                )

            # Create index for general_facts
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_general_facts_chroma_id ON general_facts (chroma_id);"
            )  # Index for chroma_id

            # --- Add Personality Table ---
            await db.execute(
                """
                CREATE TABLE IF NOT EXISTS gurt_personality (
                    trait_key TEXT PRIMARY KEY NOT NULL,
                    trait_value TEXT NOT NULL, -- Store value as JSON string
                    last_updated REAL DEFAULT (unixepoch('now'))
                );
                """
            )
            logger.info("Personality table created/verified.")
            # --- End Personality Table ---

            # --- Add Interests Table ---
            await db.execute(
                """
                CREATE TABLE IF NOT EXISTS gurt_interests (
                    interest_topic TEXT PRIMARY KEY NOT NULL,
                    interest_level REAL DEFAULT 0.1, -- Start with a small default level
                    last_updated REAL DEFAULT (unixepoch('now'))
                );
                """
            )
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_interest_level ON gurt_interests (interest_level);"
            )
            logger.info("Interests table created/verified.")
            # --- End Interests Table ---

            # --- Add Goals Table ---
            await db.execute(
                """
                CREATE TABLE IF NOT EXISTS gurt_goals (
                    goal_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    description TEXT NOT NULL UNIQUE, -- The goal description
                    status TEXT DEFAULT 'pending', -- e.g., pending, active, completed, failed
                    priority INTEGER DEFAULT 5, -- Lower number = higher priority
                    created_timestamp REAL DEFAULT (unixepoch('now')),
                    last_updated REAL DEFAULT (unixepoch('now')),
                    details TEXT, -- Optional JSON blob for sub-tasks, progress, etc.
                    guild_id TEXT, -- The server ID where the goal was created
                    channel_id TEXT, -- The channel ID where the goal was created
                    user_id TEXT -- The user ID who created the goal
                );
                """
            )
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_goal_status ON gurt_goals (status);"
            )
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_goal_priority ON gurt_goals (priority);"
            )
            logger.info("Goals table created/verified.")
            # --- End Goals Table ---

            # Add ALTER TABLE statements here to ensure columns exist
            try:
                # Check if guild_id column exists
                cursor = await db.execute("PRAGMA table_info(gurt_goals)")
                columns = await cursor.fetchall()
                column_names = [column[1] for column in columns]

                if "guild_id" not in column_names:
                    logger.info("Adding guild_id column to gurt_goals table")
                    await db.execute("ALTER TABLE gurt_goals ADD COLUMN guild_id TEXT")
                if "channel_id" not in column_names:
                    logger.info("Adding channel_id column to gurt_goals table")
                    await db.execute(
                        "ALTER TABLE gurt_goals ADD COLUMN channel_id TEXT"
                    )
                if "user_id" not in column_names:
                    logger.info("Adding user_id column to gurt_goals table")
                    await db.execute("ALTER TABLE gurt_goals ADD COLUMN user_id TEXT")

            except Exception as e:
                logger.error(
                    f"Error checking/adding columns to gurt_goals table: {e}",
                    exc_info=True,
                )

            # --- Add Internal Actions Log Table ---
            await db.execute(
                """
                CREATE TABLE IF NOT EXISTS internal_actions (
                    action_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp REAL DEFAULT (unixepoch('now')),
                    tool_name TEXT NOT NULL,
                    arguments_json TEXT, -- Store arguments as JSON string
                    reasoning TEXT, -- Added: Reasoning behind the action
                    result_summary TEXT -- Store a summary of the result or error message
                );
                """
            )
            # Check if reasoning column exists
            try:
                cursor = await db.execute("PRAGMA table_info(internal_actions)")
                columns = await cursor.fetchall()
                column_names = [column[1] for column in columns]
                if "reasoning" not in column_names:
                    logger.info("Adding reasoning column to internal_actions table")
                    await db.execute(
                        "ALTER TABLE internal_actions ADD COLUMN reasoning TEXT"
                    )
            except Exception as e:
                logger.error(
                    f"Error checking/adding reasoning column to internal_actions: {e}",
                    exc_info=True,
                )

            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_internal_actions_timestamp ON internal_actions (timestamp);"
            )
            await db.execute(
                "CREATE INDEX IF NOT EXISTS idx_internal_actions_tool_name ON internal_actions (tool_name);"
            )
            logger.info("Internal Actions Log table created/verified.")
            # --- End Internal Actions Log Table ---

            await db.commit()
            logger.info(f"SQLite database initialized/verified at {self.db_path}")

    # --- SQLite Helper Methods ---
    async def _db_execute(self, sql: str, params: tuple = ()):
        async with self.db_lock:
            async with aiosqlite.connect(self.db_path) as db:
                await db.execute(sql, params)
                await db.commit()

    async def _db_fetchone(self, sql: str, params: tuple = ()) -> Optional[tuple]:
        async with aiosqlite.connect(self.db_path) as db:
            async with db.execute(sql, params) as cursor:
                return await cursor.fetchone()

    async def _db_fetchall(self, sql: str, params: tuple = ()) -> List[tuple]:
        async with aiosqlite.connect(self.db_path) as db:
            async with db.execute(sql, params) as cursor:
                return await cursor.fetchall()
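
    # Note on locking: writes funnel through _db_execute, which serializes on
    # self.db_lock; the two read helpers connect without the lock, which the
    # WAL journal mode set above tolerates (readers don't block the writer).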

    # --- User Fact Memory Methods (SQLite + Relevance) ---

    async def add_user_fact(self, user_id: str, fact: str) -> Dict[str, Any]:
        """Stores a fact about a user in the SQLite database, enforcing limits."""
        if not user_id or not fact:
            return {"error": "user_id and fact are required."}
        logger.info(f"Attempting to add user fact for {user_id}: '{fact}'")
        try:
            # Check SQLite first
            existing = await self._db_fetchone(
                "SELECT chroma_id FROM user_facts WHERE user_id = ? AND fact = ?",
                (user_id, fact),
            )
            if existing:
                logger.info(f"Fact already known for user {user_id} (SQLite).")
                return {"status": "duplicate", "user_id": user_id, "fact": fact}

            count_result = await self._db_fetchone(
                "SELECT COUNT(*) FROM user_facts WHERE user_id = ?", (user_id,)
            )
            current_count = count_result[0] if count_result else 0

            status = "added"
            deleted_chroma_id = None
            if current_count >= self.max_user_facts:
                logger.warning(
                    f"User {user_id} fact limit ({self.max_user_facts}) reached. Deleting oldest."
                )
                # Fetch oldest fact and its chroma_id for deletion
                oldest_fact_row = await self._db_fetchone(
                    "SELECT fact, chroma_id FROM user_facts WHERE user_id = ? ORDER BY timestamp ASC LIMIT 1",
                    (user_id,),
                )
                if oldest_fact_row:
                    oldest_fact, deleted_chroma_id = oldest_fact_row
                    await self._db_execute(
                        "DELETE FROM user_facts WHERE user_id = ? AND fact = ?",
                        (user_id, oldest_fact),
                    )
                    logger.info(
                        f"Deleted oldest fact for user {user_id} from SQLite: '{oldest_fact}'"
                    )
                    status = "limit_reached"  # Indicate limit was hit but fact was added

            # Generate chroma_id
            fact_hash = hashlib.sha1(fact.encode()).hexdigest()[:16]  # Short hash
            chroma_id = f"user-{user_id}-{fact_hash}"

            # Insert into SQLite
            await self._db_execute(
                "INSERT INTO user_facts (user_id, fact, chroma_id) VALUES (?, ?, ?)",
                (user_id, fact, chroma_id),
            )
            logger.info(f"Fact added for user {user_id} to SQLite.")

            # Add to ChromaDB fact collection
            if self.fact_collection and self.embedding_function:
                try:
                    metadata = {
                        "user_id": user_id,
                        "type": "user",
                        "timestamp": time.time(),
                    }
                    await asyncio.to_thread(
                        self.fact_collection.add,
                        documents=[fact],
                        metadatas=[metadata],
                        ids=[chroma_id],
                    )
                    logger.info(
                        f"Fact added/updated for user {user_id} in ChromaDB (ID: {chroma_id})."
                    )

                    # Delete the oldest fact from ChromaDB if limit was reached
                    if deleted_chroma_id:
                        logger.info(
                            f"Attempting to delete oldest fact from ChromaDB (ID: {deleted_chroma_id})."
                        )
                        await asyncio.to_thread(
                            self.fact_collection.delete, ids=[deleted_chroma_id]
                        )
                        logger.info(
                            f"Successfully deleted oldest fact from ChromaDB (ID: {deleted_chroma_id})."
                        )

                except Exception as chroma_e:
                    logger.error(
                        f"ChromaDB error adding/deleting user fact for {user_id} (ID: {chroma_id}): {chroma_e}",
                        exc_info=True,
                    )
                    # Note: Fact is still in SQLite, but ChromaDB might be inconsistent. Consider rollback? For now, just log.
            else:
                logger.warning(
                    f"ChromaDB fact collection not available. Skipping embedding for user fact {user_id}."
                )

            return {"status": status, "user_id": user_id, "fact_added": fact}

        except Exception as e:
            logger.error(f"Error adding user fact for {user_id}: {e}", exc_info=True)
            return {"error": f"Database error adding user fact: {str(e)}"}

    async def get_user_facts(
        self, user_id: str, context: Optional[str] = None
    ) -> List[str]:
        """Retrieves stored facts about a user, optionally scored by relevance to context."""
        if not user_id:
            logger.warning("get_user_facts called without user_id.")
            return []
        logger.info(
            f"Retrieving facts for user {user_id} (context provided: {bool(context)})"
        )
        limit = self.max_user_facts  # Use the class attribute for limit

        try:
            if context and self.fact_collection and self.embedding_function:
                # --- Semantic Search ---
                logger.debug(
                    f"Performing semantic search for user facts (User: {user_id}, Limit: {limit})"
                )
                try:
                    # Query ChromaDB for facts relevant to the context
                    results = await asyncio.to_thread(
                        self.fact_collection.query,
                        query_texts=[context],
                        n_results=limit,
                        where={  # Use $and for multiple conditions
                            "$and": [{"user_id": user_id}, {"type": "user"}]
                        },
                        include=["documents"],  # Only need the fact text
                    )
                    logger.debug(f"ChromaDB user fact query results: {results}")

                    if results and results.get("documents") and results["documents"][0]:
                        relevant_facts = results["documents"][0]
                        logger.info(
                            f"Found {len(relevant_facts)} semantically relevant user facts for {user_id}."
                        )
                        return relevant_facts
                    else:
                        logger.info(
                            f"No semantic user facts found for {user_id} matching context."
                        )
                        return []  # Return empty list if no semantic matches

                except Exception as chroma_e:
                    logger.error(
                        f"ChromaDB error searching user facts for {user_id}: {chroma_e}",
                        exc_info=True,
                    )
                    # Fallback to SQLite retrieval on ChromaDB error
                    logger.warning(
                        f"Falling back to SQLite retrieval for user facts {user_id} due to ChromaDB error."
                    )
                    # Proceed to the SQLite block below

            # --- SQLite Fallback / No Context ---
            # If no context, or if ChromaDB failed/unavailable, get newest N facts from SQLite
            logger.debug(
                f"Retrieving user facts from SQLite (User: {user_id}, Limit: {limit})"
            )
            rows_ordered = await self._db_fetchall(
                "SELECT fact FROM user_facts WHERE user_id = ? ORDER BY timestamp DESC LIMIT ?",
                (user_id, limit),
            )
            sqlite_facts = [row[0] for row in rows_ordered]
            logger.info(
                f"Retrieved {len(sqlite_facts)} user facts from SQLite for {user_id}."
            )
            return sqlite_facts

        except Exception as e:
            logger.error(
                f"Error retrieving user facts for {user_id}: {e}", exc_info=True
            )
            return []
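
    # Illustrative retrieval (a sketch; values are made up):
    #
    #   facts = await mm.get_user_facts("1234567890", context="what editor do they use")
    #   # With ChromaDB available this returns semantically relevant facts;
    #   # without context (or on a Chroma failure) it returns the newest facts.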

    # --- General Fact Memory Methods (SQLite + Relevance) ---

    async def add_general_fact(self, fact: str) -> Dict[str, Any]:
        """Stores a general fact in the SQLite database, enforcing limits."""
        if not fact:
            return {"error": "fact is required."}
        logger.info(f"Attempting to add general fact: '{fact}'")
        try:
            # Check SQLite first
            existing = await self._db_fetchone(
                "SELECT chroma_id FROM general_facts WHERE fact = ?", (fact,)
            )
            if existing:
                logger.info(f"General fact already known (SQLite): '{fact}'")
                return {"status": "duplicate", "fact": fact}

            count_result = await self._db_fetchone(
                "SELECT COUNT(*) FROM general_facts", ()
            )
            current_count = count_result[0] if count_result else 0

            status = "added"
            deleted_chroma_id = None
            if current_count >= self.max_general_facts:
                logger.warning(
                    f"General fact limit ({self.max_general_facts}) reached. Deleting oldest."
                )
                # Fetch oldest fact and its chroma_id for deletion
                oldest_fact_row = await self._db_fetchone(
                    "SELECT fact, chroma_id FROM general_facts ORDER BY timestamp ASC LIMIT 1",
                    (),
                )
                if oldest_fact_row:
                    oldest_fact, deleted_chroma_id = oldest_fact_row
                    await self._db_execute(
                        "DELETE FROM general_facts WHERE fact = ?", (oldest_fact,)
                    )
                    logger.info(
                        f"Deleted oldest general fact from SQLite: '{oldest_fact}'"
                    )
                    status = "limit_reached"

            # Generate chroma_id
            fact_hash = hashlib.sha1(fact.encode()).hexdigest()[:16]  # Short hash
            chroma_id = f"general-{fact_hash}"

            # Insert into SQLite
            await self._db_execute(
                "INSERT INTO general_facts (fact, chroma_id) VALUES (?, ?)",
                (fact, chroma_id),
            )
            logger.info(f"General fact added to SQLite: '{fact}'")

            # Add to ChromaDB fact collection
            if self.fact_collection and self.embedding_function:
                try:
                    metadata = {"type": "general", "timestamp": time.time()}
                    await asyncio.to_thread(
                        self.fact_collection.add,
                        documents=[fact],
                        metadatas=[metadata],
                        ids=[chroma_id],
                    )
                    logger.info(
                        f"General fact added/updated in ChromaDB (ID: {chroma_id})."
                    )

                    # Delete the oldest fact from ChromaDB if limit was reached
                    if deleted_chroma_id:
                        logger.info(
                            f"Attempting to delete oldest general fact from ChromaDB (ID: {deleted_chroma_id})."
                        )
                        await asyncio.to_thread(
                            self.fact_collection.delete, ids=[deleted_chroma_id]
                        )
                        logger.info(
                            f"Successfully deleted oldest general fact from ChromaDB (ID: {deleted_chroma_id})."
                        )

                except Exception as chroma_e:
                    logger.error(
                        f"ChromaDB error adding/deleting general fact (ID: {chroma_id}): {chroma_e}",
                        exc_info=True,
                    )
                    # Note: Fact is still in SQLite.
            else:
                logger.warning(
                    "ChromaDB fact collection not available. Skipping embedding for general fact."
                )

            return {"status": status, "fact_added": fact}

        except Exception as e:
            logger.error(f"Error adding general fact: {e}", exc_info=True)
            return {"error": f"Database error adding general fact: {str(e)}"}

    async def get_general_facts(
        self,
        query: Optional[str] = None,
        limit: Optional[int] = 10,
        context: Optional[str] = None,
    ) -> List[str]:
        """Retrieves stored general facts, optionally filtering by query or scoring by context relevance."""
        logger.info(
            f"Retrieving general facts (query='{query}', limit={limit}, context provided: {bool(context)})"
        )
        limit = min(max(1, limit or 10), 50)  # Use provided limit or default 10, max 50

        try:
            if context and self.fact_collection and self.embedding_function:
                # --- Semantic Search (Prioritized if context is provided) ---
                # Note: The 'query' parameter is ignored when context is provided for semantic search.
                logger.debug(
                    f"Performing semantic search for general facts (Limit: {limit})"
                )
                try:
                    results = await asyncio.to_thread(
                        self.fact_collection.query,
                        query_texts=[context],
                        n_results=limit,
                        where={"type": "general"},  # Filter by type
                        include=["documents"],  # Only need the fact text
                    )
                    logger.debug(f"ChromaDB general fact query results: {results}")

                    if results and results.get("documents") and results["documents"][0]:
                        relevant_facts = results["documents"][0]
                        logger.info(
                            f"Found {len(relevant_facts)} semantically relevant general facts."
                        )
                        return relevant_facts
                    else:
                        logger.info("No semantic general facts found matching context.")
                        return []  # Return empty list if no semantic matches

                except Exception as chroma_e:
                    logger.error(
                        f"ChromaDB error searching general facts: {chroma_e}",
                        exc_info=True,
                    )
                    # Fallback to SQLite retrieval on ChromaDB error
                    logger.warning(
                        "Falling back to SQLite retrieval for general facts due to ChromaDB error."
                    )
                    # Proceed to the SQLite block below, respecting the original 'query' if present

            # --- SQLite Fallback / No Context / ChromaDB Error ---
            # If no context, or if ChromaDB failed/unavailable, get newest N facts from SQLite, applying query if present.
            logger.debug(
                f"Retrieving general facts from SQLite (Query: '{query}', Limit: {limit})"
            )
            sql = "SELECT fact FROM general_facts"
            params = []
            if query:
                # Apply the LIKE query only in the SQLite fallback scenario
                sql += " WHERE fact LIKE ?"
                params.append(f"%{query}%")

            sql += " ORDER BY timestamp DESC LIMIT ?"
            params.append(limit)

            rows_ordered = await self._db_fetchall(sql, tuple(params))
            sqlite_facts = [row[0] for row in rows_ordered]
            logger.info(
                f"Retrieved {len(sqlite_facts)} general facts from SQLite (Query: '{query}')."
            )
            return sqlite_facts

        except Exception as e:
            logger.error(f"Error retrieving general facts: {e}", exc_info=True)
            return []
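
    # Illustrative calls (a sketch; the fact text is made up):
    #
    #   await mm.add_general_fact("the server uses UTC timestamps")
    #   await mm.get_general_facts(context="what timezone are logs in")  # semantic path
    #   await mm.get_general_facts(query="UTC")  # SQLite LIKE '%UTC%' fallback path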

    # --- Personality Trait Methods (SQLite) ---

    async def set_personality_trait(self, key: str, value: Any):
        """Stores or updates a personality trait in the database."""
        if not key:
            logger.error("set_personality_trait called with empty key.")
            return
        try:
            # Serialize the value to a JSON string to handle different types (str, int, float, bool)
            value_json = json.dumps(value)
            await self._db_execute(
                "INSERT OR REPLACE INTO gurt_personality (trait_key, trait_value, last_updated) VALUES (?, ?, unixepoch('now'))",
                (key, value_json),
            )
            logger.info(f"Personality trait '{key}' set/updated.")
        except Exception as e:
            logger.error(f"Error setting personality trait '{key}': {e}", exc_info=True)

    async def get_personality_trait(self, key: str) -> Optional[Any]:
        """Retrieves a specific personality trait from the database."""
        if not key:
            logger.error("get_personality_trait called with empty key.")
            return None
        try:
            row = await self._db_fetchone(
                "SELECT trait_value FROM gurt_personality WHERE trait_key = ?", (key,)
            )
            if row:
                # Deserialize the JSON string back to its original type
                value = json.loads(row[0])
                logger.debug(f"Retrieved personality trait '{key}': {value}")
                return value
            else:
                logger.debug(f"Personality trait '{key}' not found.")
                return None
        except Exception as e:
            logger.error(f"Error getting personality trait '{key}': {e}", exc_info=True)
            return None

    async def get_all_personality_traits(self) -> Dict[str, Any]:
        """Retrieves all personality traits from the database."""
        traits = {}
        try:
            rows = await self._db_fetchall(
                "SELECT trait_key, trait_value FROM gurt_personality", ()
            )
            for key, value_json in rows:
                try:
                    # Deserialize each value
                    traits[key] = json.loads(value_json)
                except json.JSONDecodeError as json_e:
                    logger.error(
                        f"Error decoding JSON for trait '{key}': {json_e}. Value: {value_json}"
                    )
                    traits[key] = None  # Or handle error differently
            logger.info(f"Retrieved {len(traits)} personality traits.")
            return traits
        except Exception as e:
            logger.error(f"Error getting all personality traits: {e}", exc_info=True)
            return {}
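
    # Traits round-trip through JSON, so types survive storage (a sketch with a
    # made-up trait name):
    #
    #   await mm.set_personality_trait("chattiness", 0.7)  # stored as the string "0.7"
    #   await mm.get_personality_trait("chattiness")       # -> 0.7 (a float again)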

    async def load_baseline_personality(self, baseline_traits: Dict[str, Any]):
        """Loads baseline traits into the personality table ONLY if it's empty."""
        if not baseline_traits:
            logger.warning(
                "load_baseline_personality called with empty baseline traits."
            )
            return
        try:
            # Check if the table is empty
            count_result = await self._db_fetchone(
                "SELECT COUNT(*) FROM gurt_personality", ()
            )
            current_count = count_result[0] if count_result else 0

            if current_count == 0:
                logger.info("Personality table is empty. Loading baseline traits...")
                for key, value in baseline_traits.items():
                    await self.set_personality_trait(key, value)
                logger.info(f"Loaded {len(baseline_traits)} baseline traits.")
            else:
                logger.info(
                    f"Personality table already contains {current_count} traits. Skipping baseline load."
                )
        except Exception as e:
            logger.error(f"Error loading baseline personality: {e}", exc_info=True)

    async def load_baseline_interests(self, baseline_interests: Dict[str, float]):
        """Loads baseline interests into the interests table ONLY if it's empty."""
        if not baseline_interests:
            logger.warning(
                "load_baseline_interests called with empty baseline interests."
            )
            return
        try:
            # Check if the table is empty
            count_result = await self._db_fetchone(
                "SELECT COUNT(*) FROM gurt_interests", ()
            )
            current_count = count_result[0] if count_result else 0

            if current_count == 0:
                logger.info("Interests table is empty. Loading baseline interests...")
                async with self.db_lock:
                    async with aiosqlite.connect(self.db_path) as db:
                        for topic, level in baseline_interests.items():
                            topic_normalized = topic.lower().strip()
                            if not topic_normalized:
                                continue  # Skip empty topics
                            # Clamp initial level just in case
                            level_clamped = max(
                                INTEREST_MIN_LEVEL, min(INTEREST_MAX_LEVEL, level)
                            )
                            await db.execute(
                                """
                                INSERT INTO gurt_interests (interest_topic, interest_level, last_updated)
                                VALUES (?, ?, unixepoch('now'))
                                """,
                                (topic_normalized, level_clamped),
                            )
                        await db.commit()
                logger.info(f"Loaded {len(baseline_interests)} baseline interests.")
            else:
                logger.info(
                    f"Interests table already contains {current_count} interests. Skipping baseline load."
                )
        except Exception as e:
            logger.error(f"Error loading baseline interests: {e}", exc_info=True)

    async def reset_personality_to_baseline(self, baseline_traits: Dict[str, Any]):
        """Clears all personality traits and reloads them from the provided baseline."""
        logger.info("Resetting personality traits to baseline...")
        try:
            await self._db_execute("DELETE FROM gurt_personality")
            logger.info("Cleared all existing personality traits from the database.")
            await self.load_baseline_personality(baseline_traits)
            logger.info("Successfully reset personality traits to baseline.")
            return {
                "status": "success",
                "message": "Personality traits reset to baseline.",
            }
        except Exception as e:
            logger.error(f"Error resetting personality traits: {e}", exc_info=True)
            return {"error": f"Database error resetting personality: {str(e)}"}

    async def reset_interests_to_baseline(self, baseline_interests: Dict[str, float]):
        """Clears all interests and reloads them from the provided baseline."""
        logger.info("Resetting interests to baseline...")
        try:
            await self._db_execute("DELETE FROM gurt_interests")
            logger.info("Cleared all existing interests from the database.")
            await self.load_baseline_interests(baseline_interests)
            logger.info("Successfully reset interests to baseline.")
            return {"status": "success", "message": "Interests reset to baseline."}
        except Exception as e:
            logger.error(f"Error resetting interests: {e}", exc_info=True)
            return {"error": f"Database error resetting interests: {str(e)}"}

    # --- Interest Methods (SQLite) ---

    async def update_interest(self, topic: str, change: float):
        """
        Updates the interest level for a given topic. Creates the topic if it doesn't exist.
        Clamps the interest level between INTEREST_MIN_LEVEL and INTEREST_MAX_LEVEL.

        Args:
            topic: The interest topic (e.g., "gaming", "anime").
            change: The amount to change the interest level by (can be positive or negative).
        """
        if not topic:
            logger.error("update_interest called with empty topic.")
            return
        topic = topic.lower().strip()  # Normalize topic
        if not topic:
            logger.error("update_interest called with empty topic after normalization.")
            return

        try:
            async with self.db_lock:
                async with aiosqlite.connect(self.db_path) as db:
                    # Check if topic exists
                    cursor = await db.execute(
                        "SELECT interest_level FROM gurt_interests WHERE interest_topic = ?",
                        (topic,),
                    )
                    row = await cursor.fetchone()

                    if row:
                        current_level = row[0]
                        new_level = current_level + change
                    else:
                        # Topic doesn't exist, create it with initial level + change
                        current_level = INTEREST_INITIAL_LEVEL  # Use constant for initial level
                        new_level = current_level + change
                        logger.info(
                            f"Creating new interest: '{topic}' with initial level {current_level:.3f} + change {change:.3f}"
                        )

                    # Clamp the new level
                    new_level_clamped = max(
                        INTEREST_MIN_LEVEL, min(INTEREST_MAX_LEVEL, new_level)
                    )

                    # Insert or update the topic
                    await db.execute(
                        """
                        INSERT INTO gurt_interests (interest_topic, interest_level, last_updated)
                        VALUES (?, ?, unixepoch('now'))
                        ON CONFLICT(interest_topic) DO UPDATE SET
                            interest_level = excluded.interest_level,
                            last_updated = excluded.last_updated;
                        """,
                        (topic, new_level_clamped),
                    )
                    await db.commit()
                    logger.info(
                        f"Interest '{topic}' updated: {current_level:.3f} -> {new_level_clamped:.3f} (Change: {change:.3f})"
                    )

        except Exception as e:
            logger.error(f"Error updating interest '{topic}': {e}", exc_info=True)

    async def get_interests(
        self, limit: int = 5, min_level: float = 0.2
    ) -> List[Tuple[str, float]]:
        """
        Retrieves the top interests above a minimum level, ordered by interest level descending.

        Args:
            limit: The maximum number of interests to return.
            min_level: The minimum interest level required to be included.

        Returns:
            A list of tuples, where each tuple is (interest_topic, interest_level).
        """
        interests = []
        try:
            rows = await self._db_fetchall(
                "SELECT interest_topic, interest_level FROM gurt_interests WHERE interest_level >= ? ORDER BY interest_level DESC LIMIT ?",
                (min_level, limit),
            )
            interests = [(row[0], row[1]) for row in rows]
            logger.info(
                f"Retrieved {len(interests)} interests (Limit: {limit}, Min Level: {min_level})."
            )
            return interests
        except Exception as e:
            logger.error(f"Error getting interests: {e}", exc_info=True)
            return []

    async def decay_interests(
        self,
        decay_rate: float = INTEREST_DECAY_RATE,
        decay_interval_hours: int = INTEREST_DECAY_INTERVAL_HOURS,
    ):
        """
        Applies decay to interest levels for topics not updated recently.

        Args:
            decay_rate: The fraction to reduce the interest level by (e.g., 0.01 for 1% decay).
            decay_interval_hours: Only decay interests not updated within this many hours.
        """
        if not (0 < decay_rate < 1):
            logger.error(f"Invalid decay_rate: {decay_rate}. Must be between 0 and 1.")
            return
        if decay_interval_hours <= 0:
            logger.error(
                f"Invalid decay_interval_hours: {decay_interval_hours}. Must be positive."
            )
            return

        try:
            cutoff_timestamp = time.time() - (decay_interval_hours * 3600)
            logger.info(
                f"Applying interest decay (Rate: {decay_rate}) for interests not updated since {datetime.datetime.fromtimestamp(cutoff_timestamp).isoformat()}..."
            )

            async with self.db_lock:
                async with aiosqlite.connect(self.db_path) as db:
                    # Select topics eligible for decay
                    cursor = await db.execute(
                        "SELECT interest_topic, interest_level FROM gurt_interests WHERE last_updated < ?",
                        (cutoff_timestamp,),
                    )
                    topics_to_decay = await cursor.fetchall()

                    if not topics_to_decay:
                        logger.info("No interests found eligible for decay.")
                        return

                    updated_count = 0
                    # Apply decay and update
                    for topic, current_level in topics_to_decay:
                        # Calculate decay amount (ensure it doesn't go below min level instantly)
                        decay_amount = current_level * decay_rate
                        new_level = current_level - decay_amount
                        # Ensure level doesn't drop below the minimum threshold due to decay
                        new_level_clamped = max(INTEREST_MIN_LEVEL, new_level)

                        # Only update if the level actually changes significantly
                        if abs(new_level_clamped - current_level) > 0.001:
                            await db.execute(
                                "UPDATE gurt_interests SET interest_level = ? WHERE interest_topic = ?",
                                (new_level_clamped, topic),
                            )
                            logger.debug(
                                f"Decayed interest '{topic}': {current_level:.3f} -> {new_level_clamped:.3f}"
                            )
                            updated_count += 1

                    await db.commit()
                    logger.info(
                        f"Interest decay cycle complete. Updated {updated_count}/{len(topics_to_decay)} eligible interests."
                    )

        except Exception as e:
            logger.error(f"Error during interest decay: {e}", exc_info=True)

    # --- Semantic Memory Methods (ChromaDB) ---

    async def add_message_embedding(
        self,
        message_id: str,
        formatted_message_data: Dict[str, Any],
        metadata: Dict[str, Any],
    ) -> Dict[str, Any]:
        """
        Generates embedding and stores a message (including attachment descriptions)
        in ChromaDB.
        """
        if not self.semantic_collection:
            return {"error": "Semantic memory (ChromaDB) is not initialized."}

        # Construct the text to embed: content + attachment descriptions
        text_to_embed_parts = []
        if formatted_message_data.get("content"):
            text_to_embed_parts.append(formatted_message_data["content"])

        attachment_descs = formatted_message_data.get("attachment_descriptions", [])
        if attachment_descs:
            # Add a separator if there's content AND attachments
            if text_to_embed_parts:
                text_to_embed_parts.append("\n")  # Add newline separator
            # Append descriptions
            for att in attachment_descs:
                text_to_embed_parts.append(att.get("description", ""))

        text_to_embed = " ".join(text_to_embed_parts).strip()

        if not text_to_embed:
            # This might happen if a message ONLY contains attachments and no text content,
            # but format_message should always produce descriptions. Log if empty.
            logger.warning(
                f"Message {message_id} resulted in empty text_to_embed. Original data: {formatted_message_data}"
            )
            return {"error": "Cannot add empty derived text to semantic memory."}

        logger.info(
            f"Adding message {message_id} to semantic memory (including attachments)."
        )
        try:
            # ChromaDB expects lists for inputs
            await asyncio.to_thread(
                self.semantic_collection.add,
                documents=[text_to_embed],  # Embed the combined text
                metadatas=[metadata],
                ids=[message_id],
            )
            logger.info(f"Successfully added message {message_id} to ChromaDB.")
            return {"status": "success", "message_id": message_id}
        except Exception as e:
            logger.error(
                f"ChromaDB error adding message {message_id}: {e}", exc_info=True
            )
            return {"error": f"Semantic memory error adding message: {str(e)}"}

    async def search_semantic_memory(
        self,
        query_text: str,
        n_results: int = 5,
        filter_metadata: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """Searches ChromaDB for messages semantically similar to the query text."""
        if not self.semantic_collection:
            logger.warning(
                "Search semantic memory called, but ChromaDB is not initialized."
            )
            return []
        if not query_text:
            logger.warning("Search semantic memory called with empty query text.")
            return []

        logger.info(
            f"Searching semantic memory (n_results={n_results}, filter={filter_metadata}) for query: '{query_text[:50]}...'"
        )
        try:
            # Perform the query in a separate thread as ChromaDB operations can be blocking
            results = await asyncio.to_thread(
                self.semantic_collection.query,
                query_texts=[query_text],
                n_results=n_results,
                where=filter_metadata,  # Optional filter based on metadata
                include=[
                    "metadatas",
                    "documents",
                    "distances",
                ],  # Include distance for relevance
            )
            logger.debug(f"ChromaDB query results: {results}")

            # Process results
            processed_results = []
            if results and results.get("ids") and results["ids"][0]:
                for i, doc_id in enumerate(results["ids"][0]):
                    processed_results.append(
                        {
                            "id": doc_id,
                            "document": (
                                results["documents"][0][i]
                                if results.get("documents")
                                else None
                            ),
                            "metadata": (
                                results["metadatas"][0][i]
                                if results.get("metadatas")
                                else None
                            ),
                            "distance": (
                                results["distances"][0][i]
                                if results.get("distances")
                                else None
                            ),
                        }
                    )
            logger.info(f"Found {len(processed_results)} semantic results.")
            return processed_results

        except Exception as e:
            logger.error(
                f"ChromaDB error searching memory for query '{query_text[:50]}...': {e}",
                exc_info=True,
            )
            return []
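
    # Illustrative search (a sketch; the metadata key is an assumption about
    # what callers store via add_message_embedding's `metadata` argument):
    #
    #   hits = await mm.search_semantic_memory(
    #       "that bug with the login page",
    #       n_results=3,
    #       filter_metadata={"channel_id": "111222333"},
    #   )
    #   # Each hit: {"id": ..., "document": ..., "metadata": ..., "distance": ...};
    #   # a lower distance means more similar under the cosine space configured above.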

    async def delete_user_fact(
        self, user_id: str, fact_to_delete: str
    ) -> Dict[str, Any]:
        """Deletes a specific fact for a user from both SQLite and ChromaDB."""
        if not user_id or not fact_to_delete:
            return {"error": "user_id and fact_to_delete are required."}
        logger.info(f"Attempting to delete user fact for {user_id}: '{fact_to_delete}'")
        deleted_chroma_id = None
        try:
            # Check if fact exists and get chroma_id
            row = await self._db_fetchone(
                "SELECT chroma_id FROM user_facts WHERE user_id = ? AND fact = ?",
                (user_id, fact_to_delete),
            )
            if not row:
                logger.warning(
                    f"Fact not found in SQLite for user {user_id}: '{fact_to_delete}'"
                )
                return {
                    "status": "not_found",
                    "user_id": user_id,
                    "fact": fact_to_delete,
                }

            deleted_chroma_id = row[0]

            # Delete from SQLite
            await self._db_execute(
                "DELETE FROM user_facts WHERE user_id = ? AND fact = ?",
                (user_id, fact_to_delete),
            )
            logger.info(
                f"Deleted fact from SQLite for user {user_id}: '{fact_to_delete}'"
            )

            # Delete from ChromaDB if chroma_id exists
            if deleted_chroma_id and self.fact_collection:
                try:
                    logger.info(
                        f"Attempting to delete fact from ChromaDB (ID: {deleted_chroma_id})."
                    )
                    await asyncio.to_thread(
                        self.fact_collection.delete, ids=[deleted_chroma_id]
                    )
                    logger.info(
                        f"Successfully deleted fact from ChromaDB (ID: {deleted_chroma_id})."
                    )
                except Exception as chroma_e:
                    logger.error(
                        f"ChromaDB error deleting user fact ID {deleted_chroma_id}: {chroma_e}",
                        exc_info=True,
                    )
                    # Log error but consider SQLite deletion successful

            return {
                "status": "deleted",
                "user_id": user_id,
                "fact_deleted": fact_to_delete,
            }

        except Exception as e:
            logger.error(f"Error deleting user fact for {user_id}: {e}", exc_info=True)
            return {"error": f"Database error deleting user fact: {str(e)}"}

    async def delete_general_fact(self, fact_to_delete: str) -> Dict[str, Any]:
        """Deletes a specific general fact from both SQLite and ChromaDB."""
        if not fact_to_delete:
            return {"error": "fact_to_delete is required."}
        logger.info(f"Attempting to delete general fact: '{fact_to_delete}'")
        deleted_chroma_id = None
        try:
            # Check if fact exists and get chroma_id
            row = await self._db_fetchone(
                "SELECT chroma_id FROM general_facts WHERE fact = ?", (fact_to_delete,)
            )
            if not row:
                logger.warning(f"General fact not found in SQLite: '{fact_to_delete}'")
                return {"status": "not_found", "fact": fact_to_delete}

            deleted_chroma_id = row[0]

            # Delete from SQLite
            await self._db_execute(
                "DELETE FROM general_facts WHERE fact = ?", (fact_to_delete,)
            )
            logger.info(f"Deleted general fact from SQLite: '{fact_to_delete}'")

            # Delete from ChromaDB if chroma_id exists
            if deleted_chroma_id and self.fact_collection:
                try:
                    logger.info(
                        f"Attempting to delete general fact from ChromaDB (ID: {deleted_chroma_id})."
                    )
                    await asyncio.to_thread(
                        self.fact_collection.delete, ids=[deleted_chroma_id]
                    )
                    logger.info(
                        f"Successfully deleted general fact from ChromaDB (ID: {deleted_chroma_id})."
                    )
                except Exception as chroma_e:
                    logger.error(
                        f"ChromaDB error deleting general fact ID {deleted_chroma_id}: {chroma_e}",
                        exc_info=True,
                    )
                    # Log error but consider SQLite deletion successful

            return {"status": "deleted", "fact_deleted": fact_to_delete}

        except Exception as e:
            logger.error(f"Error deleting general fact: {e}", exc_info=True)
            return {"error": f"Database error deleting general fact: {str(e)}"}

    # --- Goal Management Methods (SQLite) ---

    async def add_goal(
        self,
        description: str,
        priority: int = 5,
        details: Optional[Dict[str, Any]] = None,
        guild_id: Optional[str] = None,
        channel_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Adds a new goal to the database, including context."""
        if not description:
            return {"error": "Goal description is required."}
        logger.info(f"Adding new goal (Priority {priority}): '{description}'")
        details_json = json.dumps(details) if details else None
        try:
            # Check if goal already exists
            existing = await self._db_fetchone(
                "SELECT goal_id FROM gurt_goals WHERE description = ?", (description,)
            )
            if existing:
                logger.warning(
                    f"Goal already exists: '{description}' (ID: {existing[0]})"
                )
                return {
                    "status": "duplicate",
                    "goal_id": existing[0],
                    "description": description,
                }

            async with self.db_lock:
                async with aiosqlite.connect(self.db_path) as db:
                    cursor = await db.execute(
                        """
                        INSERT INTO gurt_goals (description, priority, details, status, last_updated, guild_id, channel_id, user_id)
                        VALUES (?, ?, ?, 'pending', unixepoch('now'), ?, ?, ?)
                        """,
                        (
                            description,
                            priority,
                            details_json,
                            guild_id,
                            channel_id,
                            user_id,
                        ),
                    )
                    await db.commit()
                    goal_id = cursor.lastrowid
            logger.info(f"Goal added successfully (ID: {goal_id}): '{description}'")
            return {"status": "added", "goal_id": goal_id, "description": description}
        except Exception as e:
            logger.error(f"Error adding goal '{description}': {e}", exc_info=True)
            return {"error": f"Database error adding goal: {str(e)}"}

    async def get_goals(
        self, status: Optional[str] = None, limit: int = 10
    ) -> List[Dict[str, Any]]:
        """Retrieves goals, optionally filtered by status, ordered by priority."""
        logger.info(f"Retrieving goals (Status: {status or 'any'}, Limit: {limit})")
        goals = []
        try:
            sql = "SELECT goal_id, description, status, priority, created_timestamp, last_updated, details, guild_id, channel_id, user_id FROM gurt_goals"
            params = []
            if status:
                sql += " WHERE status = ?"
                params.append(status)
            sql += " ORDER BY priority ASC, created_timestamp ASC LIMIT ?"
            params.append(limit)

            rows = await self._db_fetchall(sql, tuple(params))
            for row in rows:
                details = json.loads(row[6]) if row[6] else None
                goals.append(
                    {
                        "goal_id": row[0],
                        "description": row[1],
                        "status": row[2],
                        "priority": row[3],
                        "created_timestamp": row[4],
                        "last_updated": row[5],
                        "details": details,
                        "guild_id": row[7],
                        "channel_id": row[8],
                        "user_id": row[9],
                    }
                )
            logger.info(f"Retrieved {len(goals)} goals.")
            return goals
        except Exception as e:
            logger.error(f"Error retrieving goals: {e}", exc_info=True)
            return []

    async def update_goal(
        self,
        goal_id: int,
        status: Optional[str] = None,
        priority: Optional[int] = None,
        details: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Updates the status, priority, or details of a goal."""
        logger.info(
            f"Updating goal ID {goal_id} (Status: {status}, Priority: {priority}, Details: {bool(details)})"
        )
        if not any([status, priority is not None, details is not None]):
            return {"error": "No update parameters provided."}

        updates = []
        params = []
        if status:
            updates.append("status = ?")
            params.append(status)
        if priority is not None:
            updates.append("priority = ?")
            params.append(priority)
        if details is not None:
            updates.append("details = ?")
            params.append(json.dumps(details))

        updates.append("last_updated = unixepoch('now')")
        params.append(goal_id)

        sql = f"UPDATE gurt_goals SET {', '.join(updates)} WHERE goal_id = ?"

        try:
            async with self.db_lock:
                async with aiosqlite.connect(self.db_path) as db:
                    cursor = await db.execute(sql, tuple(params))
                    await db.commit()
                    if cursor.rowcount == 0:
                        logger.warning(f"Goal ID {goal_id} not found for update.")
                        return {"status": "not_found", "goal_id": goal_id}
            logger.info(f"Goal ID {goal_id} updated successfully.")
            return {"status": "updated", "goal_id": goal_id}
        except Exception as e:
            logger.error(f"Error updating goal ID {goal_id}: {e}", exc_info=True)
            return {"error": f"Database error updating goal: {str(e)}"}

    async def delete_goal(self, goal_id: int) -> Dict[str, Any]:
        """Deletes a goal from the database."""
        logger.info(f"Attempting to delete goal ID {goal_id}")
        try:
            async with self.db_lock:
                async with aiosqlite.connect(self.db_path) as db:
                    cursor = await db.execute(
                        "DELETE FROM gurt_goals WHERE goal_id = ?", (goal_id,)
                    )
                    await db.commit()
                    if cursor.rowcount == 0:
                        logger.warning(f"Goal ID {goal_id} not found for deletion.")
                        return {"status": "not_found", "goal_id": goal_id}
            logger.info(f"Goal ID {goal_id} deleted successfully.")
            return {"status": "deleted", "goal_id": goal_id}
        except Exception as e:
            logger.error(f"Error deleting goal ID {goal_id}: {e}", exc_info=True)
            return {"error": f"Database error deleting goal: {str(e)}"}

    # --- Internal Action Log Methods ---

    async def add_internal_action_log(
        self,
        tool_name: str,
        arguments: Optional[Dict[str, Any]],
        result_summary: str,
        reasoning: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Logs the execution of an internal background action, including reasoning."""
        if not tool_name:
            return {"error": "Tool name is required for logging internal action."}
        logger.info(
            f"Logging internal action: Tool='{tool_name}', Args={arguments}, Reason='{reasoning}', Result='{result_summary[:100]}...'"
        )
        args_json = json.dumps(arguments) if arguments else None
        # Truncate result summary and reasoning if too long for DB
        max_len = 1000
        truncated_summary = result_summary[:max_len] + (
            "..." if len(result_summary) > max_len else ""
        )
        truncated_reasoning = (
            reasoning[:max_len]
            + ("..." if reasoning and len(reasoning) > max_len else "")
            if reasoning
            else None
        )

        try:
            async with self.db_lock:
                async with aiosqlite.connect(self.db_path) as db:
                    cursor = await db.execute(
                        """
                        INSERT INTO internal_actions (tool_name, arguments_json, reasoning, result_summary, timestamp)
                        VALUES (?, ?, ?, ?, unixepoch('now'))
                        """,
                        (tool_name, args_json, truncated_reasoning, truncated_summary),
                    )
                    await db.commit()
                    action_id = cursor.lastrowid
            logger.info(
                f"Internal action logged successfully (ID: {action_id}): Tool='{tool_name}'"
            )
            return {"status": "logged", "action_id": action_id}
        except Exception as e:
            logger.error(
                f"Error logging internal action '{tool_name}': {e}", exc_info=True
            )
            return {"error": f"Database error logging internal action: {str(e)}"}

    async def get_internal_action_logs(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Retrieves the most recent internal action logs."""
        logger.info(f"Retrieving last {limit} internal action logs.")
        logs = []
        try:
            rows = await self._db_fetchall(
                """
                SELECT action_id, timestamp, tool_name, arguments_json, reasoning, result_summary
                FROM internal_actions
                ORDER BY timestamp DESC
                LIMIT ?
                """,
                (limit,),
            )
            for row in rows:
                arguments = json.loads(row[3]) if row[3] else None
                logs.append(
                    {
                        "action_id": row[0],
                        "timestamp": row[1],
                        "tool_name": row[2],
                        "arguments": arguments,
                        "reasoning": row[4],
                        "result_summary": row[5],
                    }
                )
            logger.info(f"Retrieved {len(logs)} internal action logs.")
            return logs
        except Exception as e:
            logger.error(f"Error retrieving internal action logs: {e}", exc_info=True)
            return []

    async def clear_internal_action_logs(self) -> Dict[str, Any]:
        """Deletes all records from the internal_actions table."""
        logger.info("Attempting to clear all internal action logs.")
        try:
            await self._db_execute("DELETE FROM internal_actions")
            # Optionally, reset the autoincrement sequence in SQLite:
            # await self._db_execute("DELETE FROM sqlite_sequence WHERE name='internal_actions'")
            logger.info("Successfully cleared all internal action logs.")
            return {"status": "cleared"}
        except Exception as e:
            logger.error(f"Error clearing internal action logs: {e}", exc_info=True)
            return {"error": f"Database error clearing internal action logs: {str(e)}"}
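

# Minimal smoke-test sketch (not part of the original module; the temp-directory
# paths are assumptions). It exercises the SQLite-only paths; constructing
# MemoryManager will still attempt to load the embedding model, but any failure
# there is caught and the manager degrades gracefully.
async def _demo() -> None:
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        mm = MemoryManager(
            db_path=os.path.join(tmp, "gurt_memory.db"),
            chroma_path=os.path.join(tmp, "chroma_db"),
        )
        await mm.initialize_sqlite_database()
        print(await mm.add_user_fact("42", "likes python"))
        print(await mm.get_user_facts("42"))  # no context -> SQLite path
        print(await mm.add_goal("demo goal", priority=1))
        print(await mm.get_goals(limit=5))


if __name__ == "__main__":
    asyncio.run(_demo())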