feat: Enhance historical message processing with custom emoji and sticker image support

This commit is contained in:
Slipstream 2025-05-27 23:04:25 -06:00
parent 87e0fe0340
commit f02c2f865c
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD

View File

@ -829,24 +829,95 @@ async def get_ai_response(cog: 'GurtCog', message: discord.Message, model_name:
attach_desc_str = "\n".join(attach_desc_list)
text_parts.append(f"\n[Attachments]:\n{attach_desc_str}")
# Add custom emoji and sticker descriptions from cache for historical messages
# Add custom emoji and sticker descriptions and images from cache for historical messages
cached_emojis = msg.get("custom_emojis", [])
for emoji_info in cached_emojis:
text_parts.append(f"[Emoji: {emoji_info.get('name', 'unknown')}]")
emoji_name = emoji_info.get("name")
emoji_url = emoji_info.get("url")
if emoji_name:
text_parts.append(f"[Emoji: {emoji_name}]")
if emoji_url and emoji_url.startswith('http'):
try:
if not hasattr(cog, 'session') or not cog.session:
raise ValueError("aiohttp session not found in cog for emoji download.")
async with cog.session.get(emoji_url, timeout=5) as response:
if response.status == 200:
image_bytes = await response.read()
mime_type = response.content_type or "image/png"
supported_emoji_mimes = ["image/png", "image/jpeg", "image/webp", "image/gif"]
clean_mime_type = mime_type.split(';')[0].lower()
if clean_mime_type in supported_emoji_mimes:
parts.append(types.Part(data=image_bytes, mime_type=clean_mime_type))
print(f"Added image part for historical emoji: {emoji_name} (MIME: {clean_mime_type})")
else:
print(f"Unsupported MIME type for historical emoji {emoji_name}: {clean_mime_type}. URL: {emoji_url}")
else:
print(f"Failed to download historical emoji {emoji_name} from {emoji_url}: Status {response.status}")
except Exception as e:
print(f"Error processing historical emoji image {emoji_name} from {emoji_url}: {e}")
cached_stickers = msg.get("stickers", [])
for sticker_info in cached_stickers:
sticker_name = sticker_info.get("name")
sticker_url = sticker_info.get("url")
sticker_format_text = f" (Format: {sticker_info.get('format')})" if sticker_info.get('format') else ""
text_parts.append(f"[Sticker: {sticker_info.get('name', 'unknown')}{sticker_format_text}]")
if sticker_name:
text_parts.append(f"[Sticker: {sticker_name}{sticker_format_text}]")
is_downloadable_sticker = sticker_info.get("format") in ["StickerFormatType.png", "StickerFormatType.apng"]
if is_downloadable_sticker and sticker_url and sticker_url.startswith('http'):
try:
if not hasattr(cog, 'session') or not cog.session:
raise ValueError("aiohttp session not found in cog for sticker download.")
async with cog.session.get(sticker_url, timeout=5) as response:
if response.status == 200:
image_bytes = await response.read()
mime_type = response.content_type or "image/png"
supported_sticker_mimes = ["image/png", "image/webp"]
clean_mime_type = mime_type.split(';')[0].lower()
if clean_mime_type in supported_sticker_mimes:
parts.append(types.Part(data=image_bytes, mime_type=clean_mime_type))
print(f"Added image part for historical sticker: {sticker_name} (MIME: {clean_mime_type})")
else:
print(f"Unsupported MIME type for historical sticker {sticker_name}: {clean_mime_type}. URL: {sticker_url}")
else:
print(f"Failed to download historical sticker {sticker_name} from {sticker_url}: Status {response.status}")
except Exception as e:
print(f"Error processing historical sticker image {sticker_name} from {sticker_url}: {e}")
elif sticker_info.get("format") == "StickerFormatType.lottie":
parts.append(types.Part(text=f"[Sticker is a Lottie animation: {sticker_name}, URL: {sticker_url}]"))
full_text = "\n".join(text_parts).strip()
if full_text: # Only add if there's some text content
author_details = msg.get("author", {})
display_name = author_details.get("display_name", "Unknown User")
username = author_details.get("name", "unknown_username") # 'name' usually holds the username
# Construct a more informative author string
author_identifier_string = f"{display_name} (Username: {username})"
contents.append(types.Content(role=role, parts=[types.Part(text=f"{author_identifier_string}: {full_text}")]))
raw_display_name = author_details.get("display_name")
raw_name = author_details.get("name") # Discord username
author_id = author_details.get("id")
final_display_part = "Unknown User"
username_part_str = ""
if raw_display_name and str(raw_display_name).strip():
final_display_part = str(raw_display_name)
elif raw_name and str(raw_name).strip(): # Fallback display to username
final_display_part = str(raw_name)
elif author_id: # Fallback display to User ID
final_display_part = f"User ID: {author_id}"
# Construct username part if raw_name is valid and different from final_display_part
if raw_name and str(raw_name).strip() and str(raw_name).lower() != "none":
# Avoid "Username (Username: Username)" if display name fell back to raw_name
if final_display_part.lower() != str(raw_name).lower():
username_part_str = f" (Username: {str(raw_name)})"
# If username is bad/missing, but we have an ID, and ID isn't already the main display part
elif author_id and not (raw_name and str(raw_name).strip() and str(raw_name).lower() != "none"):
if not final_display_part.startswith("User ID:"):
username_part_str = f" (User ID: {author_id})"
author_identifier_string = f"{final_display_part}{username_part_str}"
# Append the text part to the existing parts list for this message
parts.append(types.Part(text=f"{author_identifier_string}: {full_text}"))
contents.append(types.Content(role=role, parts=parts)) # Add the content with all parts
# --- Prepare the current message content (potentially multimodal) ---