"""AI Code Agent cog.

Lets the bot owner drive an LLM (Gemini via the Google GenAI SDK's Vertex AI
backend) that can modify this project's files.  The model emits inline "tool
calls" (ReadFile, WriteFile, ApplyDiff, ExecuteCommand, ListFiles, WebSearch)
in plain text; this cog parses them with regexes, executes them, and feeds the
results back into the conversation loop.  Programmatic Git snapshot branches
are created before any file-modifying tool runs, for rollback.
"""
import discord
from discord.ext import commands
import re
import os
import asyncio
import subprocess
import json
import base64
import datetime  # For snapshot naming
import random  # For snapshot naming
from typing import Dict, Any, List, Optional, Tuple

# Google Generative AI Imports (using Vertex AI backend)
from google import genai
from google.genai import types as google_genai_types  # Renamed to avoid conflict with typing.types
from google.api_core import exceptions as google_exceptions

# Import project configuration for Vertex AI
try:
    from gurt.config import PROJECT_ID, LOCATION
except ImportError:
    PROJECT_ID = os.getenv("GCP_PROJECT_ID")  # Fallback to environment variable
    LOCATION = os.getenv("GCP_LOCATION")  # Fallback to environment variable
    if not PROJECT_ID or not LOCATION:
        print("Warning: PROJECT_ID or LOCATION not found in gurt.config or environment variables.")
        # Allow cog to load but genai_client will be None

from tavily import TavilyClient

# Define standard safety settings using google.generativeai types
# Set all thresholds to OFF as requested for internal tools
STANDARD_SAFETY_SETTINGS = [
    google_genai_types.SafetySetting(category=google_genai_types.HarmCategory.HARM_CATEGORY_HATE_SPEECH, threshold="BLOCK_NONE"),
    google_genai_types.SafetySetting(category=google_genai_types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, threshold="BLOCK_NONE"),
    google_genai_types.SafetySetting(category=google_genai_types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, threshold="BLOCK_NONE"),
    google_genai_types.SafetySetting(category=google_genai_types.HarmCategory.HARM_CATEGORY_HARASSMENT, threshold="BLOCK_NONE"),
]

# Author string stamped onto programmatic snapshot commits.
# NOTE(review): this looks like it should contain an email in angle brackets
# (e.g. "Name <addr>"); the address may have been lost — confirm against git history.
COMMIT_AUTHOR = "AI Coding Agent Cog "

# System prompt that teaches the model the inline tool-call syntax parsed by
# _parse_and_execute_tool_call below.  The regexes there and this text must
# stay in sync (ReadFile/WriteFile/ApplyDiff/ExecuteCommand/ListFiles/WebSearch).
AGENT_SYSTEM_PROMPT = """You are an expert AI Coding Agent. Your primary function is to assist the user (bot owner) by directly modifying the codebase of this Discord bot project or performing related tasks.
You operate by understanding user requests and then generating specific inline "tool calls" in your responses when you need to interact with the file system, execute commands, or search the web.

**Inline Tool Call Syntax:**
When you need to use a tool, your response should *only* contain the tool call block, formatted exactly as specified below. The system will parse this, execute the tool, and then feed the output back to you in a subsequent message prefixed with "ToolResponse:".

1. **ReadFile:** Reads the content of a specified file.
```tool
ReadFile:
path: 
```
(System will provide file content or error in ToolResponse)

2. **WriteFile:** Writes content to a specified file, overwriting if it exists, creating if it doesn't.
```tool
WriteFile:
path: 
content: |
```
(System will confirm success or report error in ToolResponse)

3. **ApplyDiff:** Applies a diff/patch to a file. Use standard unidiff format for the diff_block.
```tool
ApplyDiff:
path: 
diff_block: |
```
(System will confirm success or report error in ToolResponse)

4. **ExecuteCommand:** Executes a shell command.
```tool
ExecuteCommand:
command: 
```
(System will provide stdout/stderr or error in ToolResponse)

5. **ListFiles:** Lists files and directories at a given path.
```tool
ListFiles:
path: 
recursive: 
```
(System will provide file list or error in ToolResponse)

6. **WebSearch:** Searches the web for information.
```tool
WebSearch:
query: 
```
(System will provide search results or error in ToolResponse)

**Workflow and Rules:**
- **Tool Preference:** For modifying existing files, ALWAYS prefer `ApplyDiff` if the changes are targeted. Use `WriteFile` for new files or if `ApplyDiff` is unsuitable or fails repeatedly.
- **Direct Operation:** You operate directly. No explicit user confirmation is needed for individual tool actions after the initial user prompt.
- **Programmatic Snapshots:** The system will automatically create a separate, programmatic Git snapshot of the project's state *before* you are allowed to make any file modifications using `WriteFile` or `ApplyDiff`. This is for safety and rollback, managed by the cog. You will be notified when a snapshot is created.
- **Git Workflow for Your Changes:** After you believe your coding task and all related file modifications are complete and correct, you MUST use the `ExecuteCommand` tool to perform the following Git operations in sequence:
    1. `git add .` (to stage all your changes)
    2. `git commit --author="AI Coding Agent Cog " -m "AI Agent: "` (You will generate the commit message part)
    3. **Before pushing, attempt to integrate remote changes:** `git pull --rebase`
    4. `git push`
- **Commit Messages:** Ensure your commit messages are descriptive of the changes made.
- **Conflict Resolution (Git Pull --rebase):** If `git pull --rebase` (executed via `ExecuteCommand`) results in merge conflicts, the `ToolResponse` will indicate this. You must then:
    a. Use `ReadFile` to inspect the conflicted file(s) to see the conflict markers.
    b. Decide on the resolution.
    c. Use `WriteFile` or `ApplyDiff` to apply your resolved version of the file(s). (Remember, a programmatic snapshot will be made before these tools run).
    d. Use `ExecuteCommand` for `git add ` for each resolved file.
    e. Use `ExecuteCommand` for `git rebase --continue`.
    f. Then attempt `git push` again using `ExecuteCommand`.
- **Push Failures:** If `git push` still fails (e.g., other non-fast-forward errors), the `ToolResponse` will report this. You should then inform the user about the push failure and the reason, and await further instructions. Do not attempt overly complex recovery maneuvers without user guidance.
- **Clarity:** Be clear and methodical. If a step fails, acknowledge it and decide on the next course of action (retry, try alternative, or inform user).
- **Focus:** Your goal is to complete the coding/file manipulation task as requested by the user.
"""


class AICodeAgentCog(commands.Cog):
    def __init__(self, bot: commands.Bot):
        self.bot = bot
        self.genai_client = None
        # User ID -> conversation history (list of GenAI Content turns)
        self.agent_conversations: Dict[int, List[google_genai_types.Content]] = {}

        # Initialize Google GenAI Client for Vertex AI
        if PROJECT_ID and LOCATION:
            try:
                self.genai_client = genai.Client(
                    vertexai=True,
                    project=PROJECT_ID,
                    location=LOCATION,
                )
                print(f"AICodeAgentCog: Google GenAI Client initialized for Vertex AI project '{PROJECT_ID}' in location '{LOCATION}'.")
            except Exception as e:
                print(f"AICodeAgentCog: Error initializing Google GenAI Client for Vertex AI: {e}")
                self.genai_client = None  # Ensure it's None on failure
        else:
            print("AICodeAgentCog: PROJECT_ID or LOCATION not configured. Google GenAI Client not initialized.")

        # AI Model Configuration
        self._ai_model: str = "gemini-2.5-flash-preview-05-20"  # Default model
        self._available_models: Dict[str, str] = {
            "pro": "gemini-2.5-pro-preview-05-06",  # Assuming this is the intended Pro model
            "flash": "gemini-2.5-flash-preview-05-20"
        }
        # User mentioned "gemini-2.5-pro-preview-05-06" and "gemini-2.5-flash-preview-05-20"
        # Updating to reflect those if they are the correct ones, otherwise the 1.5 versions are common.
        # For now, sticking to what was in the plan based on common Gemini models.
        # If 2.5 models are indeed what's intended and available, these strings should be updated.

        # Tavily Web Search Integration
        self.tavily_api_key: Optional[str] = os.getenv("TAVILY_API_KEY")
        self.tavily_client: Optional[TavilyClient] = None
        if self.tavily_api_key:
            self.tavily_client = TavilyClient(api_key=self.tavily_api_key)
            print("AICodeAgentCog: TavilyClient initialized.")
        else:
            print("AICodeAgentCog: TAVILY_API_KEY not found. TavilyClient not initialized.")
        self.tavily_search_depth: str = os.getenv("TAVILY_DEFAULT_SEARCH_DEPTH", "basic")
        self.tavily_max_results: int = int(os.getenv("TAVILY_DEFAULT_MAX_RESULTS", "5"))

    @commands.command(name="codeagent_model")
    @commands.is_owner()
    async def set_ai_model_command(self, ctx: commands.Context, model_key: str):
        """Sets the AI model for the code agent. Usage: !codeagent_model <pro|flash>"""
        model_key = model_key.lower()
        if model_key in self._available_models:
            self._ai_model = self._available_models[model_key]
            await ctx.send(f"AICodeAgent: AI model set to: {self._ai_model} (key: {model_key})")
        else:
            await ctx.send(f"AICodeAgent: Invalid model key '{model_key}'. Available keys: {', '.join(self._available_models.keys())}")

    @commands.command(name="codeagent_get_model")
    @commands.is_owner()
    async def get_ai_model_command(self, ctx: commands.Context):
        """Gets the current AI model for the code agent."""
        await ctx.send(f"AICodeAgent: Current AI model is: {self._ai_model}")

    @commands.command(name="codeagent_clear")
    @commands.is_owner()
    async def clear_agent_history_command(self, ctx: commands.Context):
        """Clears the conversation history for the code agent for the calling user."""
        user_id = ctx.author.id
        if user_id in self.agent_conversations:
            del self.agent_conversations[user_id]
            await ctx.send("AICodeAgent: Conversation history cleared for you.")
        else:
            await ctx.send("AICodeAgent: No conversation history found for you to clear.")

    async def _run_git_command(self, command: str) -> Tuple[bool, str]:
        """Runs a Git command and returns (success_status, output_string).

        The command is run through the shell; stdout and stderr are concatenated
        into one string.  Success is defined as exit code 0.
        """
        try:
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            stdout, stderr = await process.communicate()
            output = ""
            if stdout:
                output += stdout.decode(errors='replace')
            if stderr:
                output += stderr.decode(errors='replace')

            if process.returncode == 0:
                return True, output.strip()
            else:
                error_message = f"Git command failed with exit code {process.returncode}:\n{output.strip()}"
                print(f"AICodeAgentCog: {error_message}")
                return False, error_message
        except Exception as e:
            error_message = f"Exception running Git command '{command}': {e}"
            print(f"AICodeAgentCog: {error_message}")
            return False, error_message

    async def _create_programmatic_snapshot(self) -> Optional[str]:
        """Creates a programmatic Git snapshot using a temporary branch.

        Flow: record current branch -> `checkout -b snapshot_cog_<ts>_<hex>` ->
        empty-allowed commit -> checkout back.  Returns the snapshot branch name
        on success, or None on failure (with best-effort cleanup/restore).
        """
        try:
            # Get current branch name
            success, current_branch_name = await self._run_git_command("git rev-parse --abbrev-ref HEAD")
            if not success:
                print(f"AICodeAgentCog: Failed to get current branch name for snapshot: {current_branch_name}")
                return None
            current_branch_name = current_branch_name.strip()

            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            random_hex = random.randbytes(3).hex()  # randbytes requires Python 3.9+
            snapshot_branch_name = f"snapshot_cog_{timestamp}_{random_hex}"

            # Create and checkout the new snapshot branch
            success, output = await self._run_git_command(f"git checkout -b {snapshot_branch_name}")
            if not success:
                print(f"AICodeAgentCog: Failed to create snapshot branch '{snapshot_branch_name}': {output}")
                # Attempt to switch back if checkout failed mid-operation
                await self._run_git_command(f"git checkout {current_branch_name}")  # Best effort
                return None

            # Commit any currently staged changes or create an empty commit for a clean snapshot point
            commit_message = f"Cog Snapshot: Pre-AI Edit State on branch {snapshot_branch_name}"
            success, output = await self._run_git_command(f"git commit --author=\"{COMMIT_AUTHOR}\" -m \"{commit_message}\" --allow-empty")
            if not success:
                print(f"AICodeAgentCog: Failed to commit snapshot on '{snapshot_branch_name}': {output}")
                # Attempt to switch back and clean up branch
                await self._run_git_command(f"git checkout {current_branch_name}")
                await self._run_git_command(f"git branch -D {snapshot_branch_name}")  # Best effort cleanup
                return None

            # Switch back to the original branch
            success, output = await self._run_git_command(f"git checkout {current_branch_name}")
            if not success:
                print(f"AICodeAgentCog: CRITICAL - Failed to switch back to original branch '{current_branch_name}' after snapshot. Current branch might be '{snapshot_branch_name}'. Manual intervention may be needed. Error: {output}")
                return snapshot_branch_name  # Return it so it can potentially be used/deleted

            print(f"AICodeAgentCog: Successfully created snapshot branch: {snapshot_branch_name}")
            return snapshot_branch_name
        except Exception as e:
            print(f"AICodeAgentCog: Exception in _create_programmatic_snapshot: {e}")
            return None

    @commands.command(name="codeagent_list_snapshots")
    @commands.is_owner()
    async def list_snapshots_command(self, ctx: commands.Context):
        """Lists available programmatic Git snapshots created by the cog."""
        success, output = await self._run_git_command('git branch --list "snapshot_cog_*"')
        if success:
            if output:
                await ctx.send(f"Available snapshots:\n```\n{output}\n```")
            else:
                await ctx.send("No programmatic snapshots found.")
        else:
            await ctx.send(f"Error listing snapshots: {output}")

    @commands.command(name="codeagent_revert_to_snapshot")
    @commands.is_owner()
    async def revert_to_snapshot_command(self, ctx: commands.Context, snapshot_branch_name: str):
        """Reverts the current branch to the state of a given snapshot branch.

        Performs a `git reset --hard <snapshot>` on the CURRENT branch — this
        discards uncommitted work on it.
        """
        if not snapshot_branch_name.startswith("snapshot_cog_"):
            await ctx.send("Invalid snapshot name. Must start with 'snapshot_cog_'.")
            return

        # Check if snapshot branch exists
        success, branches_output = await self._run_git_command("git branch --list")
        # Normalize branches_output for reliable checking
        existing_branches = [b.strip().lstrip('* ') for b in branches_output.splitlines()]
        if not success or snapshot_branch_name not in existing_branches:
            await ctx.send(f"Snapshot branch '{snapshot_branch_name}' not found.")
            return

        await ctx.send(f"Attempting to revert current branch to snapshot '{snapshot_branch_name}'...")
        success, current_branch = await self._run_git_command("git rev-parse --abbrev-ref HEAD")
        if not success:
            await ctx.send(f"Failed to determine current branch before revert: {current_branch}")
            return
        current_branch = current_branch.strip()

        success, output = await self._run_git_command(f"git reset --hard {snapshot_branch_name}")
        if success:
            await ctx.send(f"Successfully reverted current branch ('{current_branch}') to snapshot '{snapshot_branch_name}'.\nOutput:\n```\n{output}\n```")
        else:
            await ctx.send(f"Error reverting to snapshot '{snapshot_branch_name}':\n```\n{output}\n```")

    @commands.command(name="codeagent_delete_snapshot")
    @commands.is_owner()
    async def delete_snapshot_command(self, ctx: commands.Context, snapshot_branch_name: str):
        """Deletes a programmatic Git snapshot branch (refuses if it is checked out)."""
        if not snapshot_branch_name.startswith("snapshot_cog_"):
            await ctx.send("Invalid snapshot name. Must start with 'snapshot_cog_'.")
            return

        success, branches_output = await self._run_git_command("git branch --list")
        existing_branches = [b.strip().lstrip('* ') for b in branches_output.splitlines()]
        if not success or snapshot_branch_name not in existing_branches:
            await ctx.send(f"Snapshot branch '{snapshot_branch_name}' not found.")
            return

        success, current_branch_name_str = await self._run_git_command("git rev-parse --abbrev-ref HEAD")
        if success and current_branch_name_str.strip() == snapshot_branch_name:
            await ctx.send(f"Cannot delete snapshot branch '{snapshot_branch_name}' as it is the current branch. Please checkout to a different branch first.")
            return
        elif not success:
            await ctx.send(f"Could not determine current branch. Deletion aborted for safety. Error: {current_branch_name_str}")
            return

        await ctx.send(f"Attempting to delete snapshot branch '{snapshot_branch_name}'...")
        success, output = await self._run_git_command(f"git branch -D {snapshot_branch_name}")
        if success:
            await ctx.send(f"Successfully deleted snapshot branch '{snapshot_branch_name}'.\nOutput:\n```\n{output}\n```")
        else:
            await ctx.send(f"Error deleting snapshot branch '{snapshot_branch_name}':\n```\n{output}\n```")

    def _get_conversation_history(self, user_id: int) -> List[google_genai_types.Content]:
        """Returns (creating if needed) the per-user conversation history list."""
        if user_id not in self.agent_conversations:
            self.agent_conversations[user_id] = []
        return self.agent_conversations[user_id]

    def _add_to_conversation_history(self, user_id: int, role: str, text_content: str, is_tool_response: bool = False):
        """Appends a single text turn to the user's history, trimming to a max length.

        Note: is_tool_response is currently unused; tool outputs are appended with
        role="user" by the caller (see comments below).
        """
        history = self._get_conversation_history(user_id)
        # For Vertex AI, 'function' role is used for tool responses, 'model' for AI text, 'user' for user text.
        # We'll adapt this slightly for our inline tools.
        # AI's raw response (potentially with tool call) -> model
        # Tool's output -> user (formatted as "ToolResponse: ...")
        # User's direct prompt -> user
        # For simplicity in our loop, we might treat tool responses as if they are from the 'user'
        # to guide the AI's next step, or use a specific format the AI understands.
        # The system prompt already guides the AI to expect "ToolResponse:"

        # Let's ensure content is always a list of parts for Vertex
        parts = [google_genai_types.Part(text=text_content)]
        history.append(google_genai_types.Content(role=role, parts=parts))

        # Keep history to a reasonable length (e.g., last 20 turns, or token-based limit later)
        max_history_items = 20
        if len(history) > max_history_items:
            self.agent_conversations[user_id] = history[-max_history_items:]

    async def _parse_and_execute_tool_call(self, ctx: commands.Context, ai_response_text: str) -> Optional[str]:
        """
        Parses AI response for an inline tool call, executes it, and returns the tool's output string.
        Returns None if no tool call is found.

        At most ONE tool call is executed per AI response; tools are tried in the
        order below and the first regex match wins.  File-modifying tools
        (WriteFile/ApplyDiff) create a Git snapshot first and abort on snapshot failure.
        """
        tool_output_str = None
        tool_executed = False  # Flag to indicate if any tool was matched and attempted

        # Order of checks might matter if syntax is ambiguous, but designed to be distinct.
        # --- ReadFile ---
        read_file_match = re.search(r"^\s*ReadFile:\s*path:\s*(.+?)\s*$", ai_response_text, re.IGNORECASE | re.MULTILINE)
        if read_file_match:
            tool_executed = True
            file_path = read_file_match.group(1).strip()
            tool_output = await self._execute_tool_read_file(file_path)
            tool_output_str = f"ToolResponse: ReadFile\nPath: {file_path}\n---\n{tool_output}"

        # --- WriteFile ---
        if not tool_executed:
            write_file_match = re.search(r"^\s*WriteFile:\s*path:\s*(.+?)\s*content:\s*\|?\s*(.*)\s*$", ai_response_text, re.IGNORECASE | re.DOTALL)
            if write_file_match:
                tool_executed = True
                file_path = write_file_match.group(1).strip()
                content = write_file_match.group(2).strip()  # Ensure .strip() is appropriate for all content
                snapshot_branch = await self._create_programmatic_snapshot()
                if not snapshot_branch:
                    tool_output_str = "ToolResponse: SystemError\n---\nFailed to create project snapshot. WriteFile operation aborted."
                else:
                    await ctx.send(f"AICodeAgent: Created snapshot: {snapshot_branch} before writing to {file_path}")
                    tool_output = await self._execute_tool_write_file(file_path, content)
                    tool_output_str = f"ToolResponse: WriteFile\nPath: {file_path}\n---\n{tool_output}"

        # --- ApplyDiff ---
        if not tool_executed:
            apply_diff_match = re.search(r"^\s*ApplyDiff:\s*path:\s*(.+?)\s*diff_block:\s*\|?\s*(.*)\s*$", ai_response_text, re.IGNORECASE | re.DOTALL)
            if apply_diff_match:
                tool_executed = True
                file_path = apply_diff_match.group(1).strip()
                diff_block = apply_diff_match.group(2).strip()
                snapshot_branch = await self._create_programmatic_snapshot()
                if not snapshot_branch:
                    tool_output_str = "ToolResponse: SystemError\n---\nFailed to create project snapshot. ApplyDiff operation aborted."
                else:
                    await ctx.send(f"AICodeAgent: Created snapshot: {snapshot_branch} before applying diff to {file_path}")
                    tool_output = await self._execute_tool_apply_diff(file_path, diff_block)
                    tool_output_str = f"ToolResponse: ApplyDiff\nPath: {file_path}\n---\n{tool_output}"

        # --- ExecuteCommand ---
        if not tool_executed:
            exec_command_match = re.search(r"^\s*ExecuteCommand:\s*command:\s*(.+?)\s*$", ai_response_text, re.IGNORECASE | re.DOTALL)
            if exec_command_match:
                tool_executed = True
                command_str = exec_command_match.group(1).strip()
                tool_output = await self._execute_tool_execute_command(command_str)
                tool_output_str = f"ToolResponse: ExecuteCommand\nCommand: {command_str}\n---\n{tool_output}"

        # --- ListFiles ---
        if not tool_executed:
            list_files_match = re.search(r"^\s*ListFiles:\s*path:\s*(.+?)(?:\s*recursive:\s*(true|false))?\s*$", ai_response_text, re.IGNORECASE | re.MULTILINE)
            if list_files_match:
                tool_executed = True
                file_path = list_files_match.group(1).strip()
                recursive_str = list_files_match.group(2)
                recursive = recursive_str.lower() == 'true' if recursive_str else False
                tool_output = await self._execute_tool_list_files(file_path, recursive)
                tool_output_str = f"ToolResponse: ListFiles\nPath: {file_path}\nRecursive: {recursive}\n---\n{tool_output}"

        # --- WebSearch ---
        if not tool_executed:
            web_search_match = re.search(r"^\s*WebSearch:\s*query:\s*(.+?)\s*$", ai_response_text, re.IGNORECASE | re.DOTALL)
            if web_search_match:
                tool_executed = True
                query_str = web_search_match.group(1).strip()
                tool_output = await self._execute_tool_web_search(query_str)
                tool_output_str = f"ToolResponse: WebSearch\nQuery: {query_str}\n---\n{tool_output}"

        return tool_output_str

    # --- Placeholder Tool Execution Methods ---
    # These will be properly implemented later. For now, they return placeholder strings.
    async def _execute_tool_read_file(self, path: str) -> str:
        """Reads and returns the full text of `path`, or an error string on failure."""
        print(f"AICodeAgentCog: Placeholder _execute_tool_read_file for path: {path}")
        # Actual implementation:
        try:
            # Ensure path is within project, basic safety. More robust checks might be needed.
            # base_dir = os.path.abspath(".")  # Or specific project root
            # requested_path = os.path.abspath(os.path.join(base_dir, path))
            # if not requested_path.startswith(base_dir):
            #     return "Error: File path is outside the allowed project directory."
            if not os.path.exists(path):
                return f"Error: File not found at '{path}'"
            if os.path.isdir(path):
                return f"Error: Path '{path}' is a directory, not a file."
            with open(path, 'r', encoding='utf-8', errors='replace') as f:
                return f.read()
        except Exception as e:
            return f"Error reading file '{path}': {type(e).__name__} - {e}"

    async def _execute_tool_write_file(self, path: str, content: str) -> str:
        """Writes `content` to `path` (creating parent dirs); returns a status string."""
        print(f"AICodeAgentCog: Placeholder _execute_tool_write_file for path: {path}")
        # Actual implementation:
        try:
            # base_dir = os.path.abspath(".")
            # requested_path = os.path.abspath(os.path.join(base_dir, path))
            # if not requested_path.startswith(base_dir):
            #     return "Error: File path is outside the allowed project directory."
            os.makedirs(os.path.dirname(path) or '.', exist_ok=True)  # Ensure directory exists
            with open(path, 'w', encoding='utf-8') as f:
                f.write(content)
            return f"Successfully wrote to file '{path}'."
        except Exception as e:
            return f"Error writing to file '{path}': {type(e).__name__} - {e}"

    async def _execute_tool_apply_diff(self, path: str, diff_block: str) -> str:
        """Applies a unified diff to `path` via the external `patch` utility.

        The diff is piped to `patch <path>` on stdin; on failure any `.rej`
        file content is included in the returned error string.
        """
        print(f"AICodeAgentCog: Attempting _execute_tool_apply_diff for path: {path}")
        if not os.path.exists(path):
            return f"Error: File not found at '{path}' for applying diff."
        if os.path.isdir(path):
            return f"Error: Path '{path}' is a directory, cannot apply diff."
        try:
            # Ensure diff_block ends with a newline for `patch` utility
            if not diff_block.endswith('\n'):
                diff_block += '\n'

            process = await asyncio.create_subprocess_exec(
                'patch', path,  # Target file for the patch
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            stdout, stderr = await process.communicate(input=diff_block.encode('utf-8'))

            output_str = ""
            if stdout:
                output_str += f"Stdout:\n{stdout.decode(errors='replace')}\n"
            if stderr:
                output_str += f"Stderr:\n{stderr.decode(errors='replace')}\n"

            if process.returncode == 0:
                return f"Successfully applied diff to '{path}'.\n{output_str}"
            else:
                # Try to provide context if patch failed, e.g. if it created .rej file
                rej_file = f"{path}.rej"
                if os.path.exists(rej_file):
                    with open(rej_file, 'r', encoding='utf-8', errors='replace') as rf:
                        rej_content = rf.read()
                    output_str += f"\nRejects file found ({rej_file}):\n{rej_content[:500]}...\n(Please check this file for details of failed hunks)"
                return f"Error applying diff to '{path}' (exit code {process.returncode}).\n{output_str}"
        except FileNotFoundError:
            return "Error: The 'patch' command-line utility was not found. Diff application failed. Please ensure 'patch' is installed and in the system PATH."
        except Exception as e:
            return f"Error applying diff to '{path}': {type(e).__name__} - {e}"

    async def _execute_tool_execute_command(self, command: str) -> str:
        """Runs an arbitrary shell command and returns combined stdout/stderr with status."""
        print(f"AICodeAgentCog: Attempting _execute_tool_execute_command: {command}")
        # Basic safety check for extremely dangerous commands, even for owner.
        # The AI is instructed on Git workflow, but this adds a small layer.
        # More comprehensive checks could be added if needed.
        blocked_commands = ["rm -rf /", "sudo rm -rf /", "mkfs", "format C:", "> /dev/sda"]  # Example
        for blocked in blocked_commands:
            if blocked in command:  # Simple substring check, can be improved
                return f"Error: Command '{command}' appears to be extremely dangerous and was blocked by a safeguard."
        try:
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            stdout, stderr = await process.communicate()
            output = ""
            if stdout:
                output += f"Stdout:\n{stdout.decode(errors='replace')}\n"
            if stderr:
                output += f"Stderr:\n{stderr.decode(errors='replace')}\n"
            if not output.strip():
                output = "Command executed with no output.\n"

            if process.returncode == 0:
                return f"Command executed successfully (exit code 0).\n{output.strip()}"
            else:
                return f"Command failed with exit code {process.returncode}.\n{output.strip()}"
        except Exception as e:
            return f"Exception executing command '{command}': {type(e).__name__} - {e}"

    async def _execute_tool_list_files(self, path: str, recursive: bool) -> str:
        """Lists entries under `path` (directories suffixed with os.sep), one per line."""
        print(f"AICodeAgentCog: Attempting _execute_tool_list_files for path: {path}, recursive: {recursive}")
        # Actual implementation:
        try:
            # base_dir = os.path.abspath(".")
            # requested_path = os.path.abspath(os.path.join(base_dir, path))
            # if not requested_path.startswith(base_dir):
            #     return "Error: Path is outside the allowed project directory."
            if not os.path.exists(path):
                return f"Error: Path not found at '{path}'"
            if not os.path.isdir(path):
                return f"Error: Path '{path}' is not a directory."

            file_list = []
            if recursive:
                for root, dirs, files in os.walk(path):
                    for name in files:
                        file_list.append(os.path.join(root, name))
                    for name in dirs:
                        file_list.append(os.path.join(root, name) + os.sep)  # Indicate dirs
            else:
                for item in os.listdir(path):
                    full_item_path = os.path.join(path, item)
                    if os.path.isdir(full_item_path):
                        file_list.append(item + os.sep)
                    else:
                        file_list.append(item)
            return "\n".join(file_list) if file_list else "No files or directories found."
        except Exception as e:
            return f"Error listing files at '{path}': {type(e).__name__} - {e}"

    async def _execute_tool_web_search(self, query: str) -> str:
        """Performs a Tavily web search and formats answer + top results as text."""
        print(f"AICodeAgentCog: Placeholder _execute_tool_web_search for query: {query}")
        if not self.tavily_client:
            return "Error: Tavily client not initialized. Cannot perform web search."
        try:
            # Using basic parameters for now, can be expanded.
            # TavilyClient.search is synchronous, so run it off the event loop.
            response = await asyncio.to_thread(
                self.tavily_client.search,
                query=query,
                search_depth=self.tavily_search_depth,  # "basic" or "advanced"
                max_results=self.tavily_max_results,
                include_answer=True  # Try to get a direct answer
            )

            results_str_parts = []
            if response.get("answer"):
                results_str_parts.append(f"Answer: {response['answer']}")

            if response.get("results"):
                for i, res in enumerate(response["results"][:self.tavily_max_results]):  # Show up to max_results
                    results_str_parts.append(f"\nResult {i+1}: {res.get('title', 'N/A')}\nURL: {res.get('url', 'N/A')}\nSnippet: {res.get('content', 'N/A')[:250]}...")  # Truncate snippet

            return "\n".join(results_str_parts) if results_str_parts else "No search results found."
        except Exception as e:
            return f"Error during Tavily web search for '{query}': {type(e).__name__} - {e}"

    async def _process_agent_interaction(self, ctx: commands.Context, initial_prompt_text: str):
        """Main agent loop: send history to the model, execute any inline tool call,
        feed the tool output back, and repeat until the model answers in plain text,
        an error occurs, or the iteration limit triggers a continue/stop prompt.
        """
        user_id = ctx.author.id
        self._add_to_conversation_history(user_id, role="user", text_content=initial_prompt_text)

        iteration_count = 0
        max_iterations = 10  # Configurable, from plan

        # Ensure genai_client is available
        if not self.genai_client:
            await ctx.send("AICodeAgent: Google GenAI Client is not initialized. Cannot process request.")
            return

        async with ctx.typing():
            while iteration_count < max_iterations:
                current_history = self._get_conversation_history(user_id)
                if not current_history:  # Should not happen if initial prompt was added
                    await ctx.send("AICodeAgent: Error - conversation history is empty.")
                    return

                try:
                    # Construct messages for Vertex AI API
                    # The system prompt is passed via generation_config.system_instruction
                    vertex_contents = current_history  # Already in types.Content format

                    generation_config = google_genai_types.GenerateContentConfig(
                        temperature=0.7,  # Adjust as needed
                        max_output_tokens=4096,  # Adjust as needed
                        safety_settings=STANDARD_SAFETY_SETTINGS,
                        # System instruction is critical here
                        system_instruction=google_genai_types.Content(
                            role="system",  # Though for Gemini, system prompt is often first user message or model tuning
                            parts=[google_genai_types.Part(text=AGENT_SYSTEM_PROMPT)]
                        )
                    )

                    print(f"AICodeAgentCog: Sending to Vertex AI. Model: {self._ai_model}. History items: {len(vertex_contents)}")
                    # for i, item in enumerate(vertex_contents):
                    #     print(f"  History {i} Role: {item.role}, Parts: {item.parts}")

                    response = await self.genai_client.aio.models.generate_content(
                        model=f"publishers/google/models/{self._ai_model}",
                        contents=vertex_contents,
                        config=generation_config,  # Corrected parameter name
                        # No 'tools' or 'tool_config' for inline tool usage
                    )

                    # Safely extract text from response
                    ai_response_text = ""
                    if response.candidates and response.candidates[0].content and response.candidates[0].content.parts:
                        ai_response_text = response.candidates[0].content.parts[0].text
                    else:
                        # Handle cases like safety blocks or empty responses
                        finish_reason = response.candidates[0].finish_reason if response.candidates else "UNKNOWN"
                        safety_ratings_str = ""
                        if response.candidates and response.candidates[0].safety_ratings:
                            sr = response.candidates[0].safety_ratings
                            safety_ratings_str = ", ".join([f"{rating.category.name}: {rating.probability.name}" for rating in sr])
                        if finish_reason == google_genai_types.FinishReason.SAFETY:
                            await ctx.send(f"AICodeAgent: AI response was blocked due to safety settings: {safety_ratings_str}")
                            self._add_to_conversation_history(user_id, role="model", text_content=f"[Blocked by Safety: {safety_ratings_str}]")
                            return
                        else:
                            await ctx.send(f"AICodeAgent: AI returned an empty or non-text response. Finish Reason: {finish_reason}. Safety: {safety_ratings_str}")
                            self._add_to_conversation_history(user_id, role="model", text_content="[Empty or Non-Text Response]")
                            return

                    if not ai_response_text.strip():
                        await ctx.send("AICodeAgent: AI returned an empty response text.")
                        self._add_to_conversation_history(user_id, role="model", text_content="[Empty Response Text]")
                        return

                    self._add_to_conversation_history(user_id, role="model", text_content=ai_response_text)
                    print(f"AICodeAgentCog: AI Raw Response:\n{ai_response_text}")

                    # Parse for inline tool call
                    tool_output_str = await self._parse_and_execute_tool_call(ctx, ai_response_text)

                    if tool_output_str:
                        print(f"AICodeAgentCog: Tool Output:\n{tool_output_str}")
                        self._add_to_conversation_history(user_id, role="user", text_content=tool_output_str)  # Feed tool output back as 'user'
                        iteration_count += 1
                        # Potentially send tool output to Discord for transparency if desired
                        # await ctx.send(f"```\n{tool_output_str}\n```")
                        continue  # Loop back to AI with tool output in history
                    else:
                        # No tool call found, this is the final AI response for this turn
                        if len(ai_response_text) > 1950:
                            await ctx.send(ai_response_text[:1950] + "\n...(message truncated)")
                        else:
                            await ctx.send(ai_response_text)
                        return  # End of interaction

                except google_exceptions.GoogleAPICallError as e:
                    await ctx.send(f"AICodeAgent: Vertex AI API call failed: {e}")
                    return
                except Exception as e:
                    await ctx.send(f"AICodeAgent: An unexpected error occurred during AI interaction: {e}")
                    print(f"AICodeAgentCog: Interaction Error: {type(e).__name__} - {e}")
                    import traceback
                    traceback.print_exc()
                    return

                # Iteration limit check (moved inside loop for clarity, but logic is similar)
                if iteration_count >= max_iterations:
                    await ctx.send(f"AICodeAgent: Reached iteration limit ({max_iterations}).")
                    try:
                        check = lambda m: m.author == ctx.author and m.channel == ctx.channel and \
                                          m.content.lower().startswith(("yes", "no", "continue", "feedback"))
                        await ctx.send("Continue processing? (yes/no/feedback ):")
                        user_response_msg = await self.bot.wait_for('message', check=check, timeout=300.0)
                        user_response_content = user_response_msg.content.lower()

                        if user_response_content.startswith("yes") or user_response_content.startswith("continue"):
                            iteration_count = 0  # Reset iteration count
                            self._add_to_conversation_history(user_id, role="user", text_content="[User approved continuation]")
                            await ctx.send("Continuing...")
                            continue
                        elif user_response_content.startswith("feedback"):
                            feedback_text = user_response_msg.content[len("feedback"):].strip()
                            iteration_count = 0  # Reset
                            self._add_to_conversation_history(user_id, role="user", text_content=f"System Feedback: {feedback_text}")
                            await ctx.send("Continuing with feedback...")
                            continue
                        else:  # No or other
                            await ctx.send("AICodeAgent: Processing stopped by user.")
                            return
                    except asyncio.TimeoutError:
                        await ctx.send("AICodeAgent: Continuation prompt timed out. Stopping.")
                        return

        # If loop finishes due to max_iterations without reset (should be caught by above)
        if iteration_count >= max_iterations:
            await ctx.send("AICodeAgent: Stopped due to reaching maximum processing iterations.")

    @commands.command(name="codeagent", aliases=["ca"])
    @commands.is_owner()
    async def codeagent_command(self, ctx: commands.Context, *, prompt: str):
        """Interacts with the AI Code Agent."""
        if not self.genai_client:
            await ctx.send("AICodeAgent: Google GenAI Client is not initialized. Cannot process request.")
            return
        # NOTE(review): discord.py raises MissingRequiredArgument for an empty
        # consume-rest parameter, so this guard is likely unreachable — harmless.
        if not prompt:
            await ctx.send("AICodeAgent: Please provide a prompt for the agent.")
            return
        await self._process_agent_interaction(ctx, prompt)


async def setup(bot: commands.Bot):
    """Standard discord.py extension entry point: registers the cog."""
    # Ensure PROJECT_ID and LOCATION are available before adding cog
    # Or allow loading and let commands fail gracefully if genai_client is None
    if not PROJECT_ID or not LOCATION:
        print("AICodeAgentCog: Cannot load cog as PROJECT_ID or LOCATION is missing.")
        # Optionally, raise an error or just don't add the cog
        # For now, let it load but genai_client will be None and commands using it should check
    cog = AICodeAgentCog(bot)
    await bot.add_cog(cog)
    print("AICodeAgentCog loaded.")