feat: Improve custom emoji and sticker handling in AI responses

Refactor AI response processing to correctly handle custom emojis and stickers.
Custom emojis are now converted to Discord's `<:name:id>` format for proper display.
Custom stickers are identified and their IDs are extracted to be sent as separate attachments, removing them from the main content.
This ensures Gurt can properly utilize learned custom emojis and stickers in its responses.
This commit is contained in:
Slipstream 2025-05-28 14:48:58 -06:00
parent 0c4df8d94d
commit f0de735d13
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD
3 changed files with 146 additions and 131 deletions

View File

@ -769,7 +769,7 @@ def find_function_call_in_parts(parts: Optional[List[types.Part]]) -> Optional[t
# --- Main AI Response Function ---
async def get_ai_response(cog: 'GurtCog', message: discord.Message, model_name: Optional[str] = None) -> Dict[str, Any]:
async def get_ai_response(cog: 'GurtCog', message: discord.Message, model_name: Optional[str] = None) -> Tuple[Dict[str, Any], List[str]]:
"""
Gets responses from the Vertex AI Gemini API, handling potential tool usage and returning
the final parsed response.
@ -1479,57 +1479,61 @@ async def get_ai_response(cog: 'GurtCog', message: discord.Message, model_name:
fallback_response = {"should_respond": True, "content": "...", "react_with_emoji": ""}
sticker_ids_to_send: List[str] = []
# --- Handle Custom Emoji/Sticker Replacement in Content ---
if final_parsed_data and final_parsed_data.get("content"):
content_to_process = final_parsed_data["content"]
# Find all potential custom emoji/sticker names like :name:
potential_custom_items = re.findall(r':([\w\d_]+):', content_to_process)
# Use a non-greedy match for the name to avoid matching across multiple colons
potential_custom_items = re.findall(r':([\w\d_]+?):', content_to_process)
modified_content = content_to_process
for item_name_key in potential_custom_items:
full_item_name = f":{item_name_key}:"
full_item_name_with_colons = f":{item_name_key}:"
# Check if it's a known custom emoji
emoji_url = await cog.emoji_manager.get_emoji(full_item_name)
if emoji_url:
# For now, we'll just ensure the name is there.
# Actual replacement to Discord's <name:id> format would require knowing the ID,
# which we don't store directly with this simple name/URL mapping.
# The AI is instructed to use the name, and the listener learns the ID.
# If the AI uses a name it learned, it should be fine.
# If we want Gurt to *send* an emoji it learned by URL, that's different.
# For now, this confirms the AI *can* reference it.
print(f"Found known custom emoji '{full_item_name}' in response content.")
# No replacement needed if AI uses the name correctly.
# If we wanted to send the image URL instead:
# modified_content = modified_content.replace(full_item_name, f"{full_item_name} ({emoji_url})")
emoji_data = await cog.emoji_manager.get_emoji(full_item_name_with_colons)
if emoji_data:
emoji_id = emoji_data.get("id")
is_animated = emoji_data.get("animated", False)
if emoji_id:
discord_emoji_syntax = f"<{'a' if is_animated else ''}:{item_name_key}:{emoji_id}>"
modified_content = modified_content.replace(full_item_name_with_colons, discord_emoji_syntax)
print(f"Replaced custom emoji '{full_item_name_with_colons}' with Discord syntax: {discord_emoji_syntax}")
else:
print(f"Found custom emoji '{full_item_name_with_colons}' but no ID stored.")
# Check if it's a known custom sticker
sticker_url = await cog.emoji_manager.get_sticker(full_item_name)
if sticker_url:
print(f"Found known custom sticker '{full_item_name}' in response content.")
# Stickers are sent as separate attachments by users.
# If Gurt is to send a sticker, it would typically send the URL.
# We can append the URL if the AI mentions a known sticker.
# This assumes the AI mentions it like text, and we append the URL.
if sticker_url not in modified_content: # Avoid duplicate appends
modified_content = f"{modified_content}\n{sticker_url}".strip()
print(f"Appended sticker URL for '{full_item_name}': {sticker_url}")
if modified_content != final_parsed_data["content"]:
final_parsed_data["content"] = modified_content
print("Content modified with custom emoji/sticker information.")
sticker_data = await cog.emoji_manager.get_sticker(full_item_name_with_colons)
if sticker_data:
sticker_id = sticker_data.get("id")
if sticker_id:
# Remove the sticker text from the content
modified_content = modified_content.replace(full_item_name_with_colons, "").strip()
sticker_ids_to_send.append(sticker_id)
print(f"Found custom sticker '{full_item_name_with_colons}', removed from content, added ID '{sticker_id}' to send list.")
else:
print(f"Found custom sticker '{full_item_name_with_colons}' but no ID stored.")
# Clean up any double spaces or leading/trailing whitespace after replacements
modified_content = re.sub(r'\s+', ' ', modified_content).strip()
final_parsed_data["content"] = modified_content
print("Content processed for custom emoji/sticker information.")
# Return dictionary structure remains the same, but initial_response is removed
return {
"final_response": final_parsed_data, # Parsed final data (or None)
"error": error_message, # Error message (or None)
"fallback_initial": fallback_response # Fallback for critical failures
}
return (
{
"final_response": final_parsed_data, # Parsed final data (or None)
"error": error_message, # Error message (or None)
"fallback_initial": fallback_response # Fallback for critical failures
},
sticker_ids_to_send # Return the list of sticker IDs
)
# --- Proactive AI Response Function ---
async def get_proactive_ai_response(cog: 'GurtCog', message: discord.Message, trigger_reason: str) -> Dict[str, Any]:
async def get_proactive_ai_response(cog: 'GurtCog', message: discord.Message, trigger_reason: str) -> Tuple[Dict[str, Any], List[str]]:
"""Generates a proactive response based on a specific trigger using Vertex AI."""
if not PROJECT_ID or not LOCATION:
return {"should_respond": False, "content": None, "react_with_emoji": None, "error": "Google Cloud Project ID or Location not configured"}
@ -1740,7 +1744,7 @@ async def get_proactive_ai_response(cog: 'GurtCog', message: discord.Message, tr
final_parsed_data["content"] = modified_content
print("Proactive content modified with custom emoji/sticker information.")
return final_parsed_data
return final_parsed_data, [] # Return empty list for stickers
# --- Internal AI Call for Specific Tasks ---

View File

@ -1,6 +1,6 @@
import json
import os
from typing import Dict, Optional, Tuple
from typing import Dict, Optional, Tuple, Union
DATA_FILE_PATH = "data/custom_emojis_stickers.json"
@ -48,11 +48,11 @@ class EmojiManager:
print(f"Error saving emoji/sticker data: {e}")
return False
async def add_emoji(self, name: str, url: str) -> bool:
async def add_emoji(self, name: str, emoji_id: str, is_animated: bool) -> bool:
"""Adds a custom emoji."""
if name in self.data["emojis"]:
return False # Emoji already exists
self.data["emojis"][name] = url
self.data["emojis"][name] = {"id": emoji_id, "animated": is_animated}
return self._save_data()
async def remove_emoji(self, name: str) -> bool:
@ -62,19 +62,19 @@ class EmojiManager:
del self.data["emojis"][name]
return self._save_data()
async def list_emojis(self) -> Dict[str, str]:
async def list_emojis(self) -> Dict[str, Dict[str, Union[str, bool]]]:
"""Lists all custom emojis."""
return self.data["emojis"]
async def get_emoji(self, name: str) -> Optional[str]:
async def get_emoji(self, name: str) -> Optional[Dict[str, Union[str, bool]]]:
"""Gets a specific custom emoji by name."""
return self.data["emojis"].get(name)
async def add_sticker(self, name: str, url: str) -> bool:
async def add_sticker(self, name: str, sticker_id: str) -> bool:
"""Adds a custom sticker."""
if name in self.data["stickers"]:
return False # Sticker already exists
self.data["stickers"][name] = url
self.data["stickers"][name] = {"id": sticker_id}
return self._save_data()
async def remove_sticker(self, name: str) -> bool:
@ -84,10 +84,10 @@ class EmojiManager:
del self.data["stickers"][name]
return self._save_data()
async def list_stickers(self) -> Dict[str, str]:
async def list_stickers(self) -> Dict[str, Dict[str, str]]:
"""Lists all custom stickers."""
return self.data["stickers"]
async def get_sticker(self, name: str) -> Optional[str]:
async def get_sticker(self, name: str) -> Optional[Dict[str, str]]:
"""Gets a specific custom sticker by name."""
return self.data["stickers"].get(name)

View File

@ -5,7 +5,7 @@ import asyncio
import time
import re
import os # Added for file handling in error case
from typing import TYPE_CHECKING, Union, Dict, Any, Optional
from typing import TYPE_CHECKING, List, Union, Dict, Any, Optional
# Relative imports
from .utils import format_message # Import format_message
@ -77,24 +77,26 @@ async def on_message_listener(cog: 'GurtCog', message: discord.Message):
# Custom Emojis in message content
if message.content:
custom_emojis = re.findall(r'<(a)?:(\w+):(\d+)>', message.content)
for animated, name, emoji_id in custom_emojis:
emoji_url = f"https://cdn.discordapp.com/emojis/{emoji_id}.{'gif' if animated else 'png'}"
# Use a consistent naming convention, perhaps just the name from Discord
for animated, name, emoji_id_str in custom_emojis: # Renamed emoji_id to emoji_id_str
# emoji_url = f"https://cdn.discordapp.com/emojis/{emoji_id_str}.{'gif' if animated else 'png'}"
emoji_name_key = f":{name}:"
existing_emoji = await cog.emoji_manager.get_emoji(emoji_name_key)
if not existing_emoji or existing_emoji != emoji_url: # Add if new or URL changed
print(f"Learning custom emoji: {emoji_name_key} ({emoji_url})")
await cog.emoji_manager.add_emoji(emoji_name_key, emoji_url)
# Check if already learned, if not, add it
existing_emoji_data = await cog.emoji_manager.get_emoji(emoji_name_key)
if not existing_emoji_data or existing_emoji_data.get("id") != emoji_id_str:
print(f"Learning custom emoji: {emoji_name_key} (ID: {emoji_id_str}, Animated: {bool(animated)})")
await cog.emoji_manager.add_emoji(emoji_name_key, emoji_id_str, bool(animated))
# Stickers in message
if message.stickers:
for sticker_item in message.stickers:
sticker_name_key = f":{sticker_item.name}:" # Use sticker name as key
sticker_url = sticker_item.url
existing_sticker = await cog.emoji_manager.get_sticker(sticker_name_key)
if not existing_sticker or existing_sticker != sticker_url: # Add if new or URL changed
print(f"Learning sticker: {sticker_name_key} ({sticker_url})")
await cog.emoji_manager.add_sticker(sticker_name_key, sticker_url)
sticker_id_str = str(sticker_item.id) # Ensure ID is string
# Check if already learned, if not, add it
existing_sticker_data = await cog.emoji_manager.get_sticker(sticker_name_key)
if not existing_sticker_data or existing_sticker_data.get("id") != sticker_id_str:
print(f"Learning sticker: {sticker_name_key} (ID: {sticker_id_str})")
await cog.emoji_manager.add_sticker(sticker_name_key, sticker_id_str)
# --- End Emoji/Sticker Learning ---
thread_id = message.channel.id if isinstance(message.channel, discord.Thread) else None
@ -358,31 +360,34 @@ async def on_message_listener(cog: 'GurtCog', message: discord.Message):
# --- Call AI and Handle Response ---
cog.current_channel = message.channel # Ensure current channel is set for API calls/tools
try:
response_bundle = None
try: # This is the outer try block
response_dict: Dict[str, Any]
sticker_ids_to_send: List[str] = [] # Initialize sticker_ids_to_send
if proactive_trigger_met:
print(f"Calling get_proactive_ai_response for message {message.id} due to: {consideration_reason}")
response_bundle = await get_proactive_ai_response(cog, message, consideration_reason)
response_dict, sticker_ids_to_send = await get_proactive_ai_response(cog, message, consideration_reason)
else:
print(f"Calling get_ai_response for message {message.id}")
response_bundle = await get_ai_response(cog, message)
response_dict, sticker_ids_to_send = await get_ai_response(cog, message)
# --- Handle AI Response Bundle ---
initial_response = response_bundle.get("initial_response")
final_response = response_bundle.get("final_response")
error_msg = response_bundle.get("error")
fallback_initial = response_bundle.get("fallback_initial")
# The 'initial_response' key is no longer used as get_ai_response/get_proactive_ai_response
# now directly return the final processed response data in the first element of the tuple.
# We'll use 'final_response_data' to hold this.
final_response_data = response_dict.get("final_response")
error_msg = response_dict.get("error")
fallback_initial = response_dict.get("fallback_initial") # This might still be relevant for critical errors
if error_msg:
print(f"Critical Error from AI response function: {error_msg}")
# NEW LOGIC: Always send a notification if an error occurred here
error_notification = f"Oops! Something went wrong while processing that. (`{error_msg[:100]}`)" # Include part of the error
error_notification = f"Oops! Something went wrong while processing that. (`{error_msg[:100]}`)"
try:
# await message.channel.send(error_notification) # Error notification disabled
print('disabled error notification')
#await message.channel.send(error_notification)
except Exception as send_err:
print(f"Failed to send error notification to channel: {send_err}")
return # Still exit after handling the error
return
# --- Process and Send Responses ---
sent_any_message = False
@ -390,27 +395,29 @@ async def on_message_listener(cog: 'GurtCog', message: discord.Message):
# Helper function to handle sending a single response text and caching
async def send_response_content(
response_data: Optional[Dict[str, Any]],
response_data_param: Optional[Dict[str, Any]], # Renamed to avoid conflict
response_label: str,
original_message: discord.Message # Add original message for context
original_message: discord.Message,
current_sticker_ids: List[str] # Pass the specific sticker IDs for this response
) -> bool:
nonlocal sent_any_message # Allow modification of the outer scope variable
if not response_data or not isinstance(response_data, dict) or \
not response_data.get("should_respond") or not response_data.get("content"):
return False # Nothing to send
nonlocal sent_any_message
if not response_data_param or not isinstance(response_data_param, dict) or \
not response_data_param.get("should_respond") or not response_data_param.get("content"):
# If content is None but stickers are present, we might still want to send
if not (response_data_param and response_data_param.get("should_respond") and current_sticker_ids):
return False
response_text = response_data["content"]
reply_to_id = response_data.get("reply_to_message_id")
response_text = response_data_param.get("content", "") # Default to empty string if content is None
reply_to_id = response_data_param.get("reply_to_message_id")
message_reference = None
print(f"Preparing to send {response_label} content...")
# --- Handle Reply ---
if reply_to_id and isinstance(reply_to_id, str) and reply_to_id.isdigit(): # Check if it's a valid ID string
if reply_to_id and isinstance(reply_to_id, str) and reply_to_id.isdigit():
try:
original_reply_msg = await original_message.channel.fetch_message(int(reply_to_id)) # Now safe to convert
original_reply_msg = await original_message.channel.fetch_message(int(reply_to_id))
if original_reply_msg:
message_reference = original_reply_msg.to_reference(fail_if_not_exists=False) # Don't error if deleted
message_reference = original_reply_msg.to_reference(fail_if_not_exists=False)
print(f"Will reply to message ID: {reply_to_id}")
else:
print(f"Warning: Could not fetch message {reply_to_id} to reply to.")
@ -421,12 +428,9 @@ async def on_message_listener(cog: 'GurtCog', message: discord.Message):
elif reply_to_id:
print(f"Warning: Invalid reply_to_id format received: {reply_to_id}")
# --- Handle Pings ---
ping_matches = re.findall(r'\[PING:\s*([^\]]+)\s*\]', response_text)
if ping_matches:
print(f"Found ping placeholders: {ping_matches}")
# Import get_user_id tool function dynamically or ensure it's accessible
from .tools import get_user_id
for user_name_to_ping in ping_matches:
user_id_result = await get_user_id(cog, user_name_to_ping.strip())
@ -437,20 +441,33 @@ async def on_message_listener(cog: 'GurtCog', message: discord.Message):
print(f"Replaced ping placeholder for '{user_name_to_ping}' with <@{user_id_to_ping}>")
else:
print(f"Warning: get_user_id succeeded for '{user_name_to_ping}' but returned no ID.")
response_text = response_text.replace(f'[PING: {user_name_to_ping}]', user_name_to_ping, 1) # Replace with name as fallback
response_text = response_text.replace(f'[PING: {user_name_to_ping}]', user_name_to_ping, 1)
else:
print(f"Warning: Could not find user ID for ping placeholder '{user_name_to_ping}'. Error: {user_id_result.get('error')}")
response_text = response_text.replace(f'[PING: {user_name_to_ping}]', user_name_to_ping, 1) # Replace with name as fallback
response_text = response_text.replace(f'[PING: {user_name_to_ping}]', user_name_to_ping, 1)
# --- Send Message ---
if len(response_text) > 1900:
discord_stickers_to_send = [discord.Object(id=int(s_id)) for s_id in current_sticker_ids if s_id.isdigit()] if current_sticker_ids else []
# Only proceed if there's text or stickers to send
if not response_text and not discord_stickers_to_send:
if response_data_param and response_data_param.get("should_respond"): # Log if it was supposed to respond but had nothing
print(f"Warning: {response_label} response marked 'should_respond' but has no content or stickers.")
return False
if len(response_text) > 1900: # Discord character limit is 2000, 1900 gives buffer
filepath = f'gurt_{response_label}_{original_message.id}.txt'
try:
with open(filepath, 'w', encoding='utf-8') as f: f.write(response_text)
# Send file with reference if applicable
await original_message.channel.send(f"{response_label.capitalize()} response too long:", file=discord.File(filepath), reference=message_reference, mention_author=True) # Also mention when sending as file
await original_message.channel.send(
f"{response_label.capitalize()} response too long:",
file=discord.File(filepath),
reference=message_reference,
mention_author=True,
stickers=discord_stickers_to_send
)
sent_any_message = True
print(f"Sent {response_label} content as file (Reply: {bool(message_reference)}).")
print(f"Sent {response_label} content as file (Reply: {bool(message_reference)}, Stickers: {len(discord_stickers_to_send)}).")
return True
except Exception as file_e: print(f"Error writing/sending long {response_label} response file: {file_e}")
finally:
@ -458,49 +475,44 @@ async def on_message_listener(cog: 'GurtCog', message: discord.Message):
except OSError as os_e: print(f"Error removing temp file {filepath}: {os_e}")
else:
try:
async with original_message.channel.typing():
await simulate_human_typing(cog, original_message.channel, response_text) # Use simulation
# Send message with reference if applicable
sent_msg = await original_message.channel.send(response_text, reference=message_reference, mention_author=True) # mention_author=True to ping the user being replied to
# Only enter typing context if there's text to send
if response_text:
async with original_message.channel.typing():
await simulate_human_typing(cog, original_message.channel, response_text)
sent_msg = await original_message.channel.send(
response_text if response_text else None, # Send None if only stickers
reference=message_reference,
mention_author=True,
stickers=discord_stickers_to_send
)
sent_any_message = True
# Cache this bot response
bot_response_cache_entry = format_message(cog, sent_msg) # Pass cog
bot_response_cache_entry = format_message(cog, sent_msg)
cog.message_cache['by_channel'][channel_id].append(bot_response_cache_entry)
cog.message_cache['global_recent'].append(bot_response_cache_entry)
cog.bot_last_spoke[channel_id] = time.time()
# Track participation topic
identified_topics = identify_conversation_topics(cog, [bot_response_cache_entry]) # Pass cog
identified_topics = identify_conversation_topics(cog, [bot_response_cache_entry])
if identified_topics:
topic = identified_topics[0]['topic'].lower().strip()
cog.gurt_participation_topics[topic] += 1
print(f"Tracked Gurt participation ({response_label}) in topic: '{topic}'")
print(f"Sent {response_label} content (Reply: {bool(message_reference)}).")
print(f"Sent {response_label} content (Reply: {bool(message_reference)}, Stickers: {len(discord_stickers_to_send)}).")
return True
except Exception as send_e:
print(f"Error sending {response_label} content: {send_e}")
return False
# Send initial response content if valid
# Pass the original message object 'message' here
sent_initial_message = await send_response_content(initial_response, "initial", message)
# Send the main response content (which is now in final_response_data)
# sticker_ids_to_send is already defined from the AI response unpacking
sent_main_message = await send_response_content(final_response_data, "final", message, sticker_ids_to_send)
# Send final response content if valid (and different from initial, if initial was sent)
sent_final_message = False
initial_content = initial_response.get("content") if initial_response else None
if final_response and (not sent_initial_message or initial_content != final_response.get("content")):
# Pass the original message object 'message' here too
sent_final_message = await send_response_content(final_response, "final", message)
# Handle Reaction (prefer final response for reaction if it exists)
reaction_source = final_response if final_response else initial_response
if reaction_source and isinstance(reaction_source, dict):
emoji_to_react = reaction_source.get("react_with_emoji")
# Handle Reaction (using final_response_data)
if final_response_data and isinstance(final_response_data, dict):
emoji_to_react = final_response_data.get("react_with_emoji")
if emoji_to_react and isinstance(emoji_to_react, str):
try:
# Basic validation for standard emoji
if 1 <= len(emoji_to_react) <= 4 and not re.match(r'<a?:.+?:\d+>', emoji_to_react):
# Only react if we haven't sent any message content (avoid double interaction)
if not sent_any_message:
if not sent_any_message: # Only react if no message was sent
await message.add_reaction(emoji_to_react)
reacted = True
print(f"Bot reacted to message {message.id} with {emoji_to_react}")
@ -510,22 +522,21 @@ async def on_message_listener(cog: 'GurtCog', message: discord.Message):
except Exception as e: print(f"Error adding reaction '{emoji_to_react}': {e}")
# Log if response was intended but nothing was sent/reacted
# Check if initial response intended action but nothing happened
initial_intended_action = initial_response and initial_response.get("should_respond")
initial_action_taken = sent_initial_message or (reacted and reaction_source == initial_response)
# Check if final response intended action but nothing happened
final_intended_action = final_response and final_response.get("should_respond")
final_action_taken = sent_final_message or (reacted and reaction_source == final_response)
if (initial_intended_action and not initial_action_taken) or \
(final_intended_action and not final_action_taken):
print(f"Warning: AI response intended action but nothing sent/reacted. Initial: {initial_response}, Final: {final_response}")
intended_action = final_response_data and final_response_data.get("should_respond")
action_taken = sent_main_message or reacted
if intended_action and not action_taken:
print(f"Warning: AI response intended action but nothing sent/reacted. Response data: {final_response_data}")
# Handle fallback if no other action was taken and fallback_initial is present
if not action_taken and fallback_initial and fallback_initial.get("should_respond"):
print("Attempting to send fallback_initial response...")
await send_response_content(fallback_initial, "fallback", message, []) # No stickers for fallback
except Exception as e:
print(f"Exception in on_message listener main block: {str(e)}")
import traceback
traceback.print_exc()
if bot_mentioned or replied_to_bot: # Check again in case error happened before response handling
if bot_mentioned or replied_to_bot:
await message.channel.send(random.choice(["...", "*confused gurting*", "brain broke sorry"]))