feat: Add support for custom emojis and stickers in message formatting

This commit is contained in:
Slipstream 2025-05-27 22:51:43 -06:00
parent e8eaff4b9a
commit 6881dd4737
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD
2 changed files with 135 additions and 2 deletions

View File

@ -828,6 +828,16 @@ async def get_ai_response(cog: 'GurtCog', message: discord.Message, model_name:
if attach_desc_list:
attach_desc_str = "\n".join(attach_desc_list)
text_parts.append(f"\n[Attachments]:\n{attach_desc_str}")
# Add custom emoji and sticker descriptions from cache for historical messages
cached_emojis = msg.get("custom_emojis", [])
for emoji_info in cached_emojis:
text_parts.append(f"[Emoji: {emoji_info.get('name', 'unknown')}]")
cached_stickers = msg.get("stickers", [])
for sticker_info in cached_stickers:
sticker_format_text = f" (Format: {sticker_info.get('format')})" if sticker_info.get('format') else ""
text_parts.append(f"[Sticker: {sticker_info.get('name', 'unknown')}{sticker_format_text}]")
full_text = "\n".join(text_parts).strip()
if full_text: # Only add if there's some text content
@ -952,9 +962,91 @@ async def get_ai_response(cog: 'GurtCog', message: discord.Message, model_name:
elif not message.attachments:
print("No attachments found for the current message.")
elif current_user_content_index == -1:
print("Warning: Could not find current user message in contents to add attachments to.")
print("Warning: Could not find current user message in contents to add attachments to (for attachments).")
# --- End attachment processing ---
# --- Add current message custom emojis and stickers ---
if current_user_content_index != -1:
emoji_sticker_parts_to_add = []
# Process custom emojis from formatted_current_message
custom_emojis_current = formatted_current_message.get("custom_emojis", [])
for emoji_info in custom_emojis_current:
emoji_name = emoji_info.get("name")
emoji_url = emoji_info.get("url")
if emoji_name and emoji_url:
emoji_sticker_parts_to_add.append(types.Part(text=f"[Emoji: {emoji_name}]"))
print(f"Added text description for current message emoji: {emoji_name}")
try:
if not hasattr(cog, 'session') or not cog.session:
raise ValueError("aiohttp session not found in cog for emoji download.")
async with cog.session.get(emoji_url, timeout=10) as response:
if response.status == 200:
image_bytes = await response.read()
mime_type = response.content_type or "image/png" # Default for emojis
# Gemini supports: image/png, image/jpeg, image/webp, image/heic, image/heif. GIFs are common for emojis.
supported_emoji_mimes = ["image/png", "image/jpeg", "image/webp", "image/gif"]
clean_mime_type = mime_type.split(';')[0].lower()
if clean_mime_type in supported_emoji_mimes:
image_part = types.Part(data=image_bytes, mime_type=clean_mime_type)
emoji_sticker_parts_to_add.append(image_part)
print(f"Added image part for emoji: {emoji_name} (MIME: {clean_mime_type})")
else:
print(f"Unsupported MIME type for emoji {emoji_name}: {clean_mime_type}. URL: {emoji_url}")
else:
print(f"Failed to download emoji {emoji_name} from {emoji_url}: Status {response.status}")
except Exception as e:
print(f"Error processing emoji image {emoji_name} from {emoji_url}: {e}")
# Process stickers from formatted_current_message
stickers_current = formatted_current_message.get("stickers", [])
for sticker_info in stickers_current:
sticker_name = sticker_info.get("name")
sticker_url = sticker_info.get("url")
# Assuming format_message stores format as string like "StickerFormatType.png"
sticker_format_str = sticker_info.get("format")
if sticker_name and sticker_url:
emoji_sticker_parts_to_add.append(types.Part(text=f"[Sticker: {sticker_name}]"))
print(f"Added text description for current message sticker: {sticker_name}")
# Check if sticker format is PNG or APNG (which is also PNG compatible)
is_downloadable_sticker = sticker_format_str in ["StickerFormatType.png", "StickerFormatType.apng"]
if is_downloadable_sticker:
try:
if not hasattr(cog, 'session') or not cog.session:
raise ValueError("aiohttp session not found in cog for sticker download.")
async with cog.session.get(sticker_url, timeout=10) as response:
if response.status == 200:
image_bytes = await response.read()
# Stickers are typically image/png or image/webp (APNG often served as image/png)
mime_type = response.content_type or "image/png"
supported_sticker_mimes = ["image/png", "image/webp"]
clean_mime_type = mime_type.split(';')[0].lower()
if clean_mime_type in supported_sticker_mimes:
image_part = types.Part(data=image_bytes, mime_type=clean_mime_type)
emoji_sticker_parts_to_add.append(image_part)
print(f"Added image part for sticker: {sticker_name} (MIME: {clean_mime_type})")
else:
print(f"Unsupported MIME type for sticker {sticker_name}: {clean_mime_type}. URL: {sticker_url}")
else:
print(f"Failed to download sticker {sticker_name} from {sticker_url}: Status {response.status}")
except Exception as e:
print(f"Error processing sticker image {sticker_name} from {sticker_url}: {e}")
elif sticker_format_str == "StickerFormatType.lottie":
lottie_info_part = types.Part(text=f"[Sticker is a Lottie animation: {sticker_name}, URL: {sticker_url}]")
emoji_sticker_parts_to_add.append(lottie_info_part)
print(f"Sticker {sticker_name} is Lottie (JSON animation), added text info.")
else:
print(f"Sticker {sticker_name} has format {sticker_format_str}, not attempting image download. URL: {sticker_url}")
if emoji_sticker_parts_to_add:
contents[current_user_content_index].parts.extend(emoji_sticker_parts_to_add)
print(f"Extended user message at index {current_user_content_index} with {len(emoji_sticker_parts_to_add)} emoji/sticker parts.")
elif current_user_content_index == -1 : # Only print if it's specifically for emojis/stickers
print("Warning: Could not find current user message in contents to add emojis/stickers to.")
# --- End emoji and sticker processing for current message ---
# --- Prepare Tools ---
# Preprocess tool parameter schemas before creating the Tool object
preprocessed_declarations = []

View File

@ -85,9 +85,50 @@ def format_message(cog: 'GurtCog', message: discord.Message) -> Dict[str, Any]:
"replied_to_author_id": None,
"replied_to_author_name": None,
"replied_to_content_snippet": None, # Changed field name for clarity
"is_reply": False
"is_reply": False,
"custom_emojis": [], # Initialize custom_emojis list
"stickers": [] # Initialize stickers list
}
# --- Custom Emoji Processing ---
# Regex to find custom emojis: <:name:id> or <a:name:id>
emoji_pattern = re.compile(r'<(a)?:([a-zA-Z0-9_]+):([0-9]+)>')
for match in emoji_pattern.finditer(message.content):
animated_flag, emoji_name, emoji_id_str = match.groups()
emoji_id = int(emoji_id_str)
animated = bool(animated_flag)
emoji_obj = cog.bot.get_emoji(emoji_id)
if emoji_obj:
formatted_msg["custom_emojis"].append({
"name": emoji_obj.name,
"url": str(emoji_obj.url),
"id": str(emoji_obj.id),
"animated": emoji_obj.animated
})
else:
# Fallback if emoji is not directly accessible by the bot
# Construct a potential URL (Discord's CDN format)
extension = "gif" if animated else "png"
fallback_url = f"https://cdn.discordapp.com/emojis/{emoji_id}.{extension}"
formatted_msg["custom_emojis"].append({
"name": emoji_name, # Name from regex
"url": fallback_url,
"id": emoji_id_str,
"animated": animated
})
# --- Sticker Processing ---
if message.stickers:
for sticker_item in message.stickers:
# discord.StickerItem has name, id, format, and url
formatted_msg["stickers"].append({
"name": sticker_item.name,
"url": str(sticker_item.url), # sticker_item.url is already the asset URL
"id": str(sticker_item.id),
"format": str(sticker_item.format) # e.g., "StickerFormatType.png"
})
# --- Reply Processing ---
if message.reference and message.reference.message_id:
formatted_msg["replied_to_message_id"] = str(message.reference.message_id)