discordbot/gurt/utils.py
2025-06-05 21:31:06 -06:00

314 lines
13 KiB
Python

import discord
import re
import random
import asyncio
import time
import datetime
import json
import os
from typing import TYPE_CHECKING, Optional, Tuple, Dict, Any
if TYPE_CHECKING:
from .cog import GurtCog # For type hinting
# --- Utility Functions ---
# Note: Functions needing cog state (like personality traits for mistakes)
# will need the 'cog' instance passed in.
def replace_mentions_with_names(
cog: "GurtCog", content: str, message: discord.Message
) -> str:
"""Replaces user mentions (<@id> or <@!id>) with their display names."""
if not message.mentions:
return content
processed_content = content
# Sort by length of ID to handle potential overlaps correctly (longer IDs first)
# Although Discord IDs are fixed length, this is safer if formats change
sorted_mentions = sorted(
message.mentions, key=lambda m: len(str(m.id)), reverse=True
)
for member in sorted_mentions:
# Use display_name for better readability
processed_content = processed_content.replace(
f"<@{member.id}>", member.display_name
)
processed_content = processed_content.replace(
f"<@!{member.id}>", member.display_name
) # Handle nickname mention format
return processed_content
def _format_attachment_size(size_bytes: int) -> str:
"""Formats attachment size into KB or MB."""
if size_bytes < 1024:
return f"{size_bytes} B"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
else:
return f"{size_bytes / (1024 * 1024):.1f} MB"
def format_message(cog: "GurtCog", message: discord.Message) -> Dict[str, Any]:
"""
Helper function to format a discord.Message object into a dictionary,
including detailed reply info and attachment descriptions.
"""
# Process content first to replace mentions
processed_content = replace_mentions_with_names(
cog, message.content, message
) # Pass cog
# --- Attachment Processing ---
attachment_descriptions = []
for a in message.attachments:
size_str = _format_attachment_size(a.size)
file_type = (
"Image"
if a.content_type and a.content_type.startswith("image/")
else "File"
)
description = f"[{file_type}: {a.filename} ({a.content_type or 'unknown type'}, {size_str})]"
attachment_descriptions.append(
{
"description": description,
"filename": a.filename,
"content_type": a.content_type,
"size": a.size,
"url": a.url, # Keep URL for potential future use (e.g., vision model)
}
)
# --- End Attachment Processing ---
# Basic message structure
formatted_msg = {
"id": str(message.id),
"author": {
"id": str(message.author.id),
"name": message.author.name,
"display_name": message.author.display_name,
"bot": message.author.bot,
},
"content": processed_content, # Use processed content
"author_string": f"{message.author.display_name}{' (BOT)' if message.author.bot else ''}", # Add formatted author string
"created_at": message.created_at.isoformat(),
"attachment_descriptions": attachment_descriptions, # Use new descriptions list
# "attachments": [{"filename": a.filename, "url": a.url} for a in message.attachments], # REMOVED old field
# "embeds": len(message.embeds) > 0, # Replaced by embed_content below
"embed_content": [], # Initialize embed content list
"mentions": [
{"id": str(m.id), "name": m.name, "display_name": m.display_name}
for m in message.mentions
], # Keep detailed mentions
# Reply fields initialized
"replied_to_message_id": None,
"replied_to_author_id": None,
"replied_to_author_name": None,
"replied_to_content_snippet": None, # Changed field name for clarity
"is_reply": False,
"custom_emojis": [], # Initialize custom_emojis list
"stickers": [], # Initialize stickers list
}
# --- Custom Emoji Processing ---
# Regex to find custom emojis: <:name:id> or <a:name:id>
emoji_pattern = re.compile(r"<(a)?:([a-zA-Z0-9_]+):([0-9]+)>")
for match in emoji_pattern.finditer(message.content):
animated_flag, emoji_name, emoji_id_str = match.groups()
emoji_id = int(emoji_id_str)
animated = bool(animated_flag)
emoji_obj = cog.bot.get_emoji(emoji_id)
if emoji_obj:
formatted_msg["custom_emojis"].append(
{
"name": emoji_obj.name,
"url": str(emoji_obj.url),
"id": str(emoji_obj.id),
"animated": emoji_obj.animated,
}
)
else:
# Fallback if emoji is not directly accessible by the bot
# Construct a potential URL (Discord's CDN format)
extension = "gif" if animated else "png"
fallback_url = f"https://cdn.discordapp.com/emojis/{emoji_id}.{extension}"
formatted_msg["custom_emojis"].append(
{
"name": emoji_name, # Name from regex
"url": fallback_url,
"id": emoji_id_str,
"animated": animated,
}
)
# --- Sticker Processing ---
if message.stickers:
for sticker_item in message.stickers:
# discord.StickerItem has name, id, format, and url
formatted_msg["stickers"].append(
{
"name": sticker_item.name,
"url": str(
sticker_item.url
), # sticker_item.url is already the asset URL
"id": str(sticker_item.id),
"format": str(sticker_item.format), # e.g., "StickerFormatType.png"
}
)
# --- Reply Processing ---
if message.reference and message.reference.message_id:
formatted_msg["replied_to_message_id"] = str(message.reference.message_id)
formatted_msg["is_reply"] = True
# Try to get resolved details (might be None if message not cached/fetched)
ref_msg = message.reference.resolved
if isinstance(ref_msg, discord.Message): # Check if resolved is a Message
formatted_msg["replied_to_author_id"] = str(ref_msg.author.id)
formatted_msg["replied_to_author_name"] = ref_msg.author.display_name
# Create a snippet of the replied-to content
snippet = ref_msg.content
if len(snippet) > 80: # Truncate long replies
snippet = snippet[:77] + "..."
formatted_msg["replied_to_content_snippet"] = snippet
# else: print(f"Referenced message {message.reference.message_id} not resolved.") # Optional debug
# --- End Reply Processing ---
# --- Embed Processing ---
for embed in message.embeds:
embed_data = {
"title": embed.title if embed.title else None,
"description": embed.description if embed.description else None,
"url": embed.url if embed.url else None,
"color": embed.color.value if embed.color else None,
"timestamp": embed.timestamp.isoformat() if embed.timestamp else None,
"fields": [],
"footer": None,
"author": None,
"thumbnail_url": embed.thumbnail.url if embed.thumbnail else None,
"image_url": embed.image.url if embed.image else None,
}
if embed.footer and embed.footer.text:
embed_data["footer"] = {
"text": embed.footer.text,
"icon_url": embed.footer.icon_url,
}
if embed.author and embed.author.name:
embed_data["author"] = {
"name": embed.author.name,
"url": embed.author.url,
"icon_url": embed.author.icon_url,
}
for field in embed.fields:
embed_data["fields"].append(
{"name": field.name, "value": field.value, "inline": field.inline}
)
formatted_msg["embed_content"].append(embed_data)
# --- End Embed Processing ---
return formatted_msg
def update_relationship(cog: "GurtCog", user_id_1: str, user_id_2: str, change: float):
"""Updates the relationship score between two users."""
# Ensure consistent key order
if user_id_1 > user_id_2:
user_id_1, user_id_2 = user_id_2, user_id_1
# Initialize user_id_1's dict if not present
if user_id_1 not in cog.user_relationships:
cog.user_relationships[user_id_1] = {}
current_score = cog.user_relationships[user_id_1].get(user_id_2, 0.0)
new_score = max(0.0, min(current_score + change, 100.0)) # Clamp 0-100
cog.user_relationships[user_id_1][user_id_2] = new_score
# print(f"Updated relationship {user_id_1}-{user_id_2}: {current_score:.1f} -> {new_score:.1f} ({change:+.1f})") # Debug log
async def simulate_human_typing(cog: "GurtCog", channel, text: str):
"""Shows typing indicator without significant delay."""
# Minimal delay to ensure the typing indicator shows up reliably
# but doesn't add noticeable latency to the response.
# The actual sending of the message happens immediately after this.
# Check if the bot has permissions to send messages and type
perms = (
channel.permissions_for(channel.guild.me)
if isinstance(channel, discord.TextChannel)
else None
)
if perms is None or (
perms.send_messages and perms.send_tts_messages
): # send_tts_messages often implies typing allowed
try:
async with channel.typing():
await asyncio.sleep(
0.1
) # Very short sleep, just to ensure typing shows
except discord.Forbidden:
print(f"Warning: Missing permissions to type in channel {channel.id}")
except Exception as e:
print(f"Warning: Error during typing simulation in {channel.id}: {e}")
# else: print(f"Skipping typing simulation in {channel.id} due to missing permissions.") # Optional debug
async def log_internal_api_call(
cog: "GurtCog",
task_description: str,
payload: Dict[str, Any],
response_data: Optional[Dict[str, Any]],
error: Optional[Exception] = None,
):
"""Helper function to log internal API calls to a file."""
log_dir = "data"
log_file = os.path.join(log_dir, "internal_api_calls.log")
try:
os.makedirs(log_dir, exist_ok=True)
timestamp = datetime.datetime.now().isoformat()
log_entry = f"--- Log Entry: {timestamp} ---\n"
log_entry += f"Task: {task_description}\n"
log_entry += f"Model: {payload.get('model', 'N/A')}\n"
# Sanitize payload for logging (avoid large base64 images)
payload_to_log = payload.copy()
if "messages" in payload_to_log:
sanitized_messages = []
for msg in payload_to_log["messages"]:
if isinstance(msg.get("content"), list): # Multimodal message
new_content = []
for part in msg["content"]:
if part.get("type") == "image_url" and part.get(
"image_url", {}
).get("url", "").startswith("data:image"):
new_content.append(
{
"type": "image_url",
"image_url": {"url": "data:image/...[truncated]"},
}
)
else:
new_content.append(part)
sanitized_messages.append({**msg, "content": new_content})
else:
sanitized_messages.append(msg)
payload_to_log["messages"] = sanitized_messages
log_entry += f"Request Payload:\n{json.dumps(payload_to_log, indent=2)}\n"
if response_data:
log_entry += f"Response Data:\n{json.dumps(response_data, indent=2)}\n"
if error:
log_entry += f"Error: {str(error)}\n"
log_entry += "---\n\n"
# Use async file writing if in async context, but this helper might be called from sync code?
# Sticking to sync file I/O for simplicity here, assuming logging isn't performance critical path.
with open(log_file, "a", encoding="utf-8") as f:
f.write(log_entry)
except Exception as log_e:
print(f"!!! Failed to write to internal API log file {log_file}: {log_e}")
# Note: _create_human_like_mistake was removed as it wasn't used in the final on_message logic provided.
# If needed, it can be added back here, ensuring it takes 'cog' if it needs personality traits.