Slipstream 2025-05-01 09:29:06 -06:00
parent 6097a1a85d
commit ff4b399794
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD


@@ -582,16 +582,10 @@ def parse_and_validate_json_response(
# --- Tool Processing ---
# Updated to use google.generativeai types
async def process_requested_tools(cog: 'GurtCog', function_call: types.FunctionCall) -> types.Part:
async def process_requested_tools(cog: 'GurtCog', function_call: types.FunctionCall) -> List[types.Part]: # Return type is List
"""
Process a tool request specified by the AI's FunctionCall response.
Args:
cog: The GurtCog instance.
function_call: The FunctionCall object from the GenerateContentResponse.
Returns:
A types.Part object containing the tool result or error, formatted for the follow-up API call.
Returns a list of types.Part objects (usually one, but potentially more if an image URL is detected in the result).
"""
function_name = function_call.name
# function_call.args is already a dict-like object in google.generativeai
@@ -605,9 +599,7 @@ async def process_requested_tools(cog: 'GurtCog', function_call: types.FunctionC
try:
tool_func = TOOL_MAPPING[function_name]
# Execute the mapped function
# Ensure the function signature matches the expected arguments
# Pass cog if the tool implementation requires it
result = await tool_func(cog, **function_args)
result_dict = await tool_func(cog, **function_args)
# --- Tool Success Logging ---
tool_elapsed_time = time.monotonic() - tool_start_time
@@ -618,20 +610,19 @@ async def process_requested_tools(cog: 'GurtCog', function_call: types.FunctionC
cog.tool_stats[function_name]['count'] += 1
print(f"Tool '{function_name}' executed successfully in {tool_elapsed_time:.2f}s.")
# Prepare result for API - must be JSON serializable, typically a dict
if not isinstance(result, dict):
# Attempt to convert common types or wrap in a dict
if isinstance(result, (str, int, float, bool, list)) or result is None:
result = {"result": result}
# Ensure result is a dict, converting if necessary
if not isinstance(result_dict, dict):
if isinstance(result_dict, (str, int, float, bool, list)) or result_dict is None:
result_dict = {"result": result_dict}
else:
print(f"Warning: Tool '{function_name}' returned non-standard type {type(result)}. Attempting str conversion.")
result = {"result": str(result)}
print(f"Warning: Tool '{function_name}' returned non-standard type {type(result_dict)}. Attempting str conversion.")
result_dict = {"result": str(result_dict)}
tool_result_content = result
tool_result_content = result_dict # Now guaranteed to be a dict
except Exception as e:
# --- Tool Failure Logging ---
tool_elapsed_time = time.monotonic() - tool_start_time
tool_elapsed_time = time.monotonic() - tool_start_time # Recalculate time even on failure
if function_name not in cog.tool_stats:
cog.tool_stats[function_name] = {'success': 0, 'failure': 0, 'total_time': 0.0, 'count': 0}
cog.tool_stats[function_name]['failure'] += 1
@@ -641,22 +632,70 @@ async def process_requested_tools(cog: 'GurtCog', function_call: types.FunctionC
print(f"{error_message} (Took {tool_elapsed_time:.2f}s)")
import traceback
traceback.print_exc()
tool_result_content = {"error": error_message}
else:
tool_result_content = {"error": error_message} # Ensure it's a dict even on error
else: # This 'else' corresponds to 'if function_name in TOOL_MAPPING:'
# --- Tool Not Found Logging ---
tool_elapsed_time = time.monotonic() - tool_start_time
# Log attempt even if tool not found
tool_elapsed_time = time.monotonic() - tool_start_time # Time for the failed lookup
if function_name not in cog.tool_stats:
cog.tool_stats[function_name] = {'success': 0, 'failure': 0, 'total_time': 0.0, 'count': 0}
cog.tool_stats[function_name]['failure'] += 1
cog.tool_stats[function_name]['failure'] += 1 # Count as failure
cog.tool_stats[function_name]['total_time'] += tool_elapsed_time
cog.tool_stats[function_name]['count'] += 1
error_message = f"Tool '{function_name}' not found or implemented."
print(f"{error_message} (Took {tool_elapsed_time:.2f}s)")
tool_result_content = {"error": error_message}
tool_result_content = {"error": error_message} # Ensure it's a dict
# Return the result formatted as a types.Part for the API using the correct type
return types.Part(function_response=types.FunctionResponse(name=function_name, response=tool_result_content))
# --- Process result for potential image URLs ---
parts_to_return: List[types.Part] = []
original_image_url: Optional[str] = None # Store the original URL if found
modified_result_content = copy.deepcopy(tool_result_content) # Work on a copy
# --- Image URL Detection & Modification ---
# Check specific tools and keys known to contain image URLs
if function_name == "get_user_avatar_url" and isinstance(modified_result_content, dict):
avatar_url_value = modified_result_content.get("avatar_url")
if avatar_url_value and isinstance(avatar_url_value, str):
original_image_url = avatar_url_value # Store original
modified_result_content["avatar_url"] = "[Image Content Attached]" # Replace URL with placeholder
elif function_name == "get_user_profile_info" and isinstance(modified_result_content, dict):
profile_dict = modified_result_content.get("profile")
if isinstance(profile_dict, dict):
avatar_url_value = profile_dict.get("avatar_url")
if avatar_url_value and isinstance(avatar_url_value, str):
original_image_url = avatar_url_value # Store original
profile_dict["avatar_url"] = "[Image Content Attached]" # Replace URL in nested dict
# Add checks for other tools/keys that might return image URLs if necessary
# --- Create Parts ---
# Always add the function response part (using the potentially modified content)
function_response_part = types.Part(function_response=types.FunctionResponse(name=function_name, response=modified_result_content))
parts_to_return.append(function_response_part)
# Add image part if an original URL was found and seems valid
if original_image_url and isinstance(original_image_url, str) and original_image_url.startswith('http'):
try:
# More robust MIME type detection attempt (basic fallback)
# You might use libraries like 'mimetypes' or check headers if needed
image_ext = original_image_url.split('.')[-1].split('?')[0].lower()
mime_type = f"image/{image_ext}" if image_ext in ['png', 'jpg', 'jpeg', 'webp', 'gif', 'heic', 'heif'] else "image/jpeg" # Default guess
# Validate against known supported image types for Gemini
supported_image_mimes = ["image/png", "image/jpeg", "image/webp", "image/heic", "image/heif"]
if mime_type not in supported_image_mimes:
print(f"Warning: Detected MIME type '{mime_type}' for {original_image_url} might not be supported by Gemini. Attempting anyway.")
# You could fallback to 'image/jpeg' or omit the part if strictness is required
image_part = types.Part.from_uri(original_image_url, mime_type=mime_type)
parts_to_return.append(image_part)
print(f"Added image part for URL: {original_image_url} (MIME: {mime_type}) from tool '{function_name}' result.")
except Exception as e:
print(f"Error creating image part from URI {original_image_url}: {e}")
# Optionally add an error text part to inform the LLM
error_text_part = types.Part(text=f"[System Note: Failed to process image URI {original_image_url} from tool '{function_name}': {e}]")
parts_to_return.append(error_text_part)
return parts_to_return # Return the list of parts (will contain 1 or 2+ parts)
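As the inline comment above notes, the extension-splitting MIME guess is only a basic fallback. A minimal sketch of the 'mimetypes'-based alternative it alludes to; the helper name guess_image_mime_type and the jpeg default are illustrative assumptions, not part of this commit:

import mimetypes
from typing import Optional

def guess_image_mime_type(url: str, default: str = "image/jpeg") -> str:
    """Illustrative helper: guess an image MIME type from a URL using the stdlib."""
    # Gemini's documented image types; anything else falls back to the default guess.
    supported = {"image/png", "image/jpeg", "image/webp", "image/heic", "image/heif"}
    path = url.split("?", 1)[0]  # drop any query string, e.g. ".../avatar.png?size=1024"
    guessed: Optional[str] = mimetypes.guess_type(path)[0]
    return guessed if guessed in supported else default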
# --- Helper to find function call in parts ---
@ -819,49 +858,77 @@ async def get_ai_response(cog: 'GurtCog', message: discord.Message, model_name:
# current_message_parts.append(types.Part(text=text_content))
# --- End text content construction ---
# The attachment processing for the current message is now handled within gather_conversation_context
# if message.attachments:
# print(f"Processing {len(message.attachments)} attachments for message {message.id}")
# for attachment in message.attachments:
# mime_type = attachment.content_type
# file_url = attachment.url
# filename = attachment.filename
# --- Add current message attachments (uncommented and potentially modified) ---
# Find the last 'user' message in contents (which should be the current message)
current_user_content_index = -1
for i in range(len(contents) - 1, -1, -1):
if contents[i].role == "user":
current_user_content_index = i
break
# # Check if MIME type is supported for URI input by Gemini
# supported_mime_prefixes = ["image/", "video/", "audio/", "text/plain", "application/pdf"]
# is_supported = False
# detected_mime_type = mime_type if mime_type else "application/octet-stream"
# for prefix in supported_mime_prefixes:
# if detected_mime_type.startswith(prefix):
# is_supported = True
# break
if message.attachments and current_user_content_index != -1:
print(f"Processing {len(message.attachments)} attachments for current message {message.id}")
attachment_parts_to_add = [] # Collect parts to add to the current user message
# # Always add a text instruction for every attachment
# instruction_text = f"User attached a file: '{filename}' (Type: {detected_mime_type}). Analyze this file from the following URI and incorporate your understanding into your response."
# current_message_parts.append(types.Part(text=instruction_text))
# print(f"Added text instruction for attachment: {filename}")
# Fetch the formatted message data to get the descriptions generated earlier
# This avoids duplicating the description generation logic here
formatted_current_message = format_message(cog, message) # Pass cog
attachment_descriptions = formatted_current_message.get("attachment_descriptions", [])
desc_map = {desc.get("filename"): desc for desc in attachment_descriptions}
# if is_supported and file_url:
# try:
# # Add the URI part with a cleaned/fallback MIME type using from_uri
# clean_mime_type = detected_mime_type.split(';')[0] if detected_mime_type else "application/octet-stream"
# current_message_parts.append(types.Part.from_uri(file_url, clean_mime_type))
# print(f"Added URI part for attachment: {filename} ({clean_mime_type}) using URL: {file_url}")
# except Exception as e:
# print(f"Error creating types.Part for attachment {filename} ({detected_mime_type}): {e}")
# current_message_parts.append(types.Part(text=f"(System Note: Failed to process attachment '{filename}' - {e})"))
# else:
# print(f"Skipping unsupported or invalid attachment: {filename} (Type: {detected_mime_type}, URL: {file_url})")
# current_message_parts.append(types.Part(text=f"(System Note: User attached an unsupported file '{filename}' of type '{detected_mime_type}' which cannot be processed.)"))
for attachment in message.attachments:
mime_type = attachment.content_type
file_url = attachment.url
filename = attachment.filename
# Check if MIME type is supported for URI input by Gemini
# Expanded list based on Gemini 1.5 Pro docs (April 2024)
supported_mime_prefixes = [
"image/", # image/png, image/jpeg, image/heic, image/heif, image/webp
"video/", # video/mov, video/mpeg, video/mp4, video/mpg, video/avi, video/wmv, video/mpegps, video/flv
"audio/", # audio/mpeg, audio/mp3, audio/wav, audio/ogg, audio/flac, audio/opus, audio/amr, audio/midi
"text/", # text/plain, text/html, text/css, text/javascript, text/json, text/csv, text/rtf, text/markdown
"application/pdf",
"application/rtf", # Explicitly add RTF if needed
# Add more as supported/needed
]
is_supported = False
detected_mime_type = mime_type if mime_type else "application/octet-stream" # Default if missing
for prefix in supported_mime_prefixes:
if detected_mime_type.startswith(prefix):
is_supported = True
break
# Ensure there's always *some* content part, even if only text or errors
# This section is also no longer needed as the current message is included in conversation_context_messages
# if current_message_parts:
# contents.append(types.Content(role="user", parts=current_message_parts))
# else:
# print("Warning: No content parts generated for user message.")
# contents.append(types.Content(role="user", parts=[types.Part(text="")])) # Ensure content list isn't empty
# Get pre-formatted description string (already includes size, type etc.)
preformatted_desc = desc_map.get(filename, {}).get("description", f"[File: {filename} (unknown type)]")
# Add the descriptive text part using the pre-formatted description
instruction_text = f"{preformatted_desc}" # Simplified: Just use the description
attachment_parts_to_add.append(types.Part(text=instruction_text))
print(f"Added text description for attachment: {filename}")
if is_supported and file_url:
try:
# Add the URI part with a cleaned MIME type
clean_mime_type = detected_mime_type.split(';')[0] if detected_mime_type else "application/octet-stream"
attachment_parts_to_add.append(types.Part.from_uri(file_url, clean_mime_type))
print(f"Added URI part for supported attachment: {filename} ({clean_mime_type}) using URL: {file_url}")
except Exception as e:
print(f"Error creating URI types.Part for attachment {filename} ({detected_mime_type}): {e}")
attachment_parts_to_add.append(types.Part(text=f"(System Note: Failed to process attachment '{filename}' - {e})"))
else:
print(f"Skipping URI part for unsupported attachment: {filename} (Type: {detected_mime_type}, URL: {file_url})")
# Text description was already added above
# Add the collected attachment parts to the existing user message parts
if attachment_parts_to_add:
contents[current_user_content_index].parts.extend(attachment_parts_to_add)
print(f"Extended user message at index {current_user_content_index} with {len(attachment_parts_to_add)} attachment parts.")
elif not message.attachments:
print("No attachments found for the current message.")
elif current_user_content_index == -1:
print("Warning: Could not find current user message in contents to add attachments to.")
# --- End attachment processing ---
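The attachment branch above repeats the same pattern for each file: take Discord's reported content_type, fall back to application/octet-stream when it is missing, strip any ';charset=...' suffix, and gate the URI part on the supported prefixes. A compact sketch of that logic as a standalone helper, with a filename-based guess added for the missing-content_type case; resolve_attachment_mime and SUPPORTED_MIME_PREFIXES are illustrative names, not part of this commit:

import mimetypes
from typing import Optional, Tuple

SUPPORTED_MIME_PREFIXES = (
    "image/", "video/", "audio/", "text/", "application/pdf", "application/rtf",
)

def resolve_attachment_mime(filename: str, content_type: Optional[str]) -> Tuple[str, bool]:
    """Illustrative helper: pick a clean MIME type for an attachment and report
    whether it looks usable as a URI part."""
    mime = content_type or mimetypes.guess_type(filename)[0] or "application/octet-stream"
    clean = mime.split(";")[0].strip()  # e.g. "text/plain; charset=utf-8" -> "text/plain"
    return clean, any(clean.startswith(prefix) for prefix in SUPPORTED_MIME_PREFIXES)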
# --- Prepare Tools ---
# Preprocess tool parameter schemas before creating the Tool object
@ -995,29 +1062,34 @@ async def get_ai_response(cog: 'GurtCog', message: discord.Message, model_name:
print(f"Error caching model's tool request turn: {cache_err}")
# --- Execute all requested tools and gather response parts ---
function_response_parts = []
# --- Execute all requested tools and gather response parts ---
# function_response_parts = [] # <-- REMOVE THIS INITIALIZATION
all_function_response_parts: List[types.Part] = [] # New list to collect all parts
function_results_for_cache = [] # Store results for caching
for func_call in function_calls_found:
# Execute the tool using the updated helper
response_part = await process_requested_tools(cog, func_call)
function_response_parts.append(response_part)
# Extract result for caching (assuming response_part.function_response exists)
if hasattr(response_part, 'function_response') and response_part.function_response:
# Execute the tool using the updated helper, now returns a LIST of parts
returned_parts = await process_requested_tools(cog, func_call) # returns List[types.Part]
all_function_response_parts.extend(returned_parts) # <-- EXTEND the list
# --- Update caching logic ---
# Find the function_response part within returned_parts to get the result for cache
func_resp_part = next((p for p in returned_parts if hasattr(p, 'function_response')), None)
if func_resp_part and func_resp_part.function_response:
function_results_for_cache.append({
"name": response_part.function_response.name,
"response": response_part.function_response.response # This is already the dict result
"name": func_resp_part.function_response.name,
"response": func_resp_part.function_response.response # This is the modified dict result
})
# --- End update caching logic ---
# Append a single function role turn containing ALL response parts to the API contents
if function_response_parts:
function_response_content = types.Content(role="function", parts=function_response_parts)
if all_function_response_parts: # Check the new list
function_response_content = types.Content(role="function", parts=all_function_response_parts) # <-- Use the combined list
contents.append(function_response_content)
# <<< ADDED INSTRUCTION >>>
# <<< ADDED INSTRUCTION >>> remains the same
# Add a specific instruction after the tool results are appended
instruction_text = "System Note: You have just received the result(s) of the tool(s) you requested. Generate your final response according to the required JSON schema. CRITICAL: DO NOT attempt to call any more tools (including 'no_operation') in this response. Focus *solely* on creating the JSON output."
instruction_text = "System Note: You have just received the result(s) of the tool(s) you requested (potentially including image data). Generate your final response according to the required JSON schema. CRITICAL: DO NOT attempt to call any more tools (including 'no_operation') in this response. Focus *solely* on creating the JSON output." # Minor text update
instruction_part = types.Part(text=instruction_text)
# Using 'model' role seems appropriate for a system-like reminder integrated into the turn flow
instruction_content = types.Content(role="model", parts=[instruction_part])
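Because process_requested_tools can now return an image part alongside the function_response part, the single 'function' turn built above may interleave both kinds of parts across several tool calls. If the raw dict results are needed again (as they are for the cache entries), they can be recovered by filtering on function_response; an illustrative snippet, not part of the commit:

# Assumes all_function_response_parts from the tool-execution loop above.
tool_result_dicts = [
    p.function_response.response
    for p in all_function_response_parts
    if getattr(p, "function_response", None) is not None
]
extra_parts = [  # image or system-note parts added by process_requested_tools
    p for p in all_function_response_parts
    if getattr(p, "function_response", None) is None
]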