import discord
from discord.ext import commands
import re
import os
import asyncio
import subprocess
import json
import base64
import datetime  # For snapshot naming
import random  # For snapshot naming
from typing import Dict, Any, List, Optional, Tuple
from collections import defaultdict  # Added for agent_shell_sessions
import xml.etree.ElementTree as ET

# Google Generative AI imports (using the Vertex AI backend)
from google import genai
from google.genai import types as google_genai_types  # Renamed to avoid a name clash with typing
from google.api_core import exceptions as google_exceptions

# Import project configuration for Vertex AI
try:
    from gurt.config import PROJECT_ID, LOCATION
except ImportError:
    PROJECT_ID = os.getenv("GCP_PROJECT_ID")  # Fallback to environment variable
    LOCATION = os.getenv("GCP_LOCATION")  # Fallback to environment variable
    if not PROJECT_ID or not LOCATION:
        print("Warning: PROJECT_ID or LOCATION not found in gurt.config or environment variables.")
        # Allow the cog to load, but genai_client will be None

from tavily import TavilyClient

# Define standard safety settings using google.genai types.
# All thresholds are set to BLOCK_NONE as requested for internal tools.
STANDARD_SAFETY_SETTINGS = [
    google_genai_types.SafetySetting(category=google_genai_types.HarmCategory.HARM_CATEGORY_HATE_SPEECH, threshold="BLOCK_NONE"),
    google_genai_types.SafetySetting(category=google_genai_types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, threshold="BLOCK_NONE"),
    google_genai_types.SafetySetting(category=google_genai_types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, threshold="BLOCK_NONE"),
    google_genai_types.SafetySetting(category=google_genai_types.HarmCategory.HARM_CATEGORY_HARASSMENT, threshold="BLOCK_NONE"),
]

# --- Constants for command filtering, mirroring shell_command_cog.py ---
# (Currently empty as they are commented out in the reference cog)
BANNED_COMMANDS_AGENT = []
BANNED_PATTERNS_AGENT = []


def is_command_allowed_agent(command):
    """
    Check if the command is allowed to run. Mirrors shell_command_cog.py.
    Returns an (allowed, reason) tuple.
    """
    # Check against banned commands
    for banned in BANNED_COMMANDS_AGENT:
        if banned in command.lower():  # Simple substring check
            return False, f"Command contains banned term: `{banned}`"
    # Check against banned patterns
    for pattern in BANNED_PATTERNS_AGENT:
        if re.search(pattern, command):
            return False, f"Command matches banned pattern: `{pattern}`"
    return True, None
# --- End of command filtering constants and function ---
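
# Illustrative sketch (not part of the original cog): if command filtering is wanted,
# the lists above could be populated along these lines, and is_command_allowed_agent
# would then reject matching commands. The entries shown are hypothetical examples.
#
#   BANNED_COMMANDS_AGENT = ["rm -rf /", "mkfs"]   # simple substring matches
#   BANNED_PATTERNS_AGENT = [r"\bshutdown\b"]      # regular-expression matches
#
#   is_command_allowed_agent("sudo rm -rf / --no-preserve-root")
#   # -> (False, "Command contains banned term: `rm -rf /`")
#   is_command_allowed_agent("git status")
#   # -> (True, None)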

# Author string used for commits made by this cog. The committer-identity parsing in
# _run_git_command expects "Name <email>"; the email portion is not set here, so that
# parsing falls back with a warning.
COMMIT_AUTHOR = "AI Coding Agent Cog "

AGENT_SYSTEM_PROMPT = """You are an expert AI Coding Agent. Your primary function is to assist the user (bot owner) by directly modifying the codebase of this Discord bot project or performing related tasks.
This bot uses discord.py. Cogs placed in the 'cogs' folder are automatically loaded by the bot's main script, so you typically do not need to modify `main.py` to load new cogs you create in that directory.
You operate by understanding user requests and then generating specific inline "tool calls" in your responses when you need to interact with the file system, execute commands, or search the web.

**XML Tool Call Syntax:**
When you need to use a tool, your response should *only* contain the XML block representing the tool call, formatted exactly as specified below. The system will parse this XML, execute the tool, and then feed the output back to you in a subsequent message prefixed with "ToolResponse:".
IMPORTANT: Do NOT wrap your XML tool calls in markdown code blocks (e.g., ```xml ... ``` or ``` ... ```). Output the raw XML directly, starting with the root tool tag (e.g., <ReadFile>).

1.  **ReadFile:** Reads the content of a specified file.
    ```xml
    <ReadFile>
        <path>path/to/file.ext</path>
    </ReadFile>
    ```
    (System will provide file content or error in ToolResponse)

2.  **WriteFile:** Writes content to a specified file, overwriting if it exists, creating if it doesn't.
    ```xml
    <WriteFile>
        <path>path/to/file.ext</path>
        <content><![CDATA[
Your full file content here. Special characters like < and & are fine.
]]></content>
    </WriteFile>
    ```
    (System will confirm success or report error in ToolResponse)

3.  **ApplyDiff:** Applies a diff/patch to a file. Use standard unidiff format for the diff_block.
    ```xml
    <ApplyDiff>
        <path>path/to/file.ext</path>
        <diff_block><![CDATA[
your unidiff-formatted patch here
]]></diff_block>
    </ApplyDiff>
    ```
    (System will confirm success or report error in ToolResponse)

4.  **ExecuteCommand:** Executes a shell command.
    ```xml
    <ExecuteCommand>
        <command>your shell command here</command>
    </ExecuteCommand>
    ```
    (System will provide stdout/stderr or error in ToolResponse)

5.  **ListFiles:** Lists files and directories at a given path.
    ```xml
    <ListFiles>
        <path>path/to/search</path>
        <recursive>true</recursive>
    </ListFiles>
    ```
    (System will provide file list or error in ToolResponse)

6.  **WebSearch:** Searches the web for information.
    ```xml
    <WebSearch>
        <query>your search query</query>
    </WebSearch>
    ```
    (System will provide search results or error in ToolResponse)

7.  **TaskComplete:** Signals that the current multi-step task is considered complete by the AI.
    ```xml
    <TaskComplete>
        <message>A brief summary of what was accomplished or the final status.</message>
    </TaskComplete>
    ```
    (System will acknowledge and stop the current interaction loop.)

**Workflow and Rules:**
-   **Tool Preference:** For modifying existing files, ALWAYS prefer `ApplyDiff` if the changes are targeted. Use `WriteFile` for new files or if `ApplyDiff` is unsuitable or fails repeatedly.
-   **Direct Operation:** You operate directly. No explicit user confirmation is needed for individual tool actions after the initial user prompt.
-   **Programmatic Snapshots (System-Managed):**
    -   The system AUTOMATICALLY creates a Git snapshot of the project *before* executing `WriteFile` or `ApplyDiff` tools.
    -   You will be notified by a "ToolResponse: SystemNotification..." message when a snapshot has been successfully created, right before your file modification tool is about to be truly processed.
    -   You do NOT need to request or create snapshots yourself. Do NOT include snapshot steps in your `ExecuteCommand` calls for `git`.
    -   If the system fails to create a snapshot, it will inform you with a "ToolResponse: SystemError...". In such a case, your `WriteFile` or `ApplyDiff` operation will NOT proceed. You should then typically inform the user of this critical system failure. Do not repeatedly try the same file operation if snapshot creation consistently fails.
-   **Git Workflow for Your Changes:** After you believe your coding task and all related file modifications are complete and correct, you MUST use the `ExecuteCommand` tool to perform the following Git operations in sequence:
    1.  `git add .` (to stage all your changes)
    2.  `git commit --author="AI Coding Agent Cog " -m "AI Agent: <description of changes>"` (You will generate the commit message part)
    3.  **Before pushing, attempt to integrate remote changes:** `git pull --rebase`
    4.  `git push`
-   **Commit Messages:** Ensure your commit messages are descriptive of the changes made.
-   **Conflict Resolution (Git Pull --rebase):** If `git pull --rebase` (executed via `ExecuteCommand`) results in merge conflicts, the `ToolResponse` will indicate this. You must then:
    a.  Use `ReadFile` to inspect the conflicted file(s) to see the conflict markers.
    b.  Decide on the resolution.
    c.  Use `WriteFile` or `ApplyDiff` to apply your resolved version of the file(s). (Remember, a programmatic snapshot will be made before these tools run.)
    d.  Use `ExecuteCommand` for `git add <resolved_file>` for each resolved file.
    e.  Use `ExecuteCommand` for `git rebase --continue`.
    f.  Then attempt `git push` again using `ExecuteCommand`.
-   **Push Failures:** If `git push` still fails (e.g., other non-fast-forward errors), the `ToolResponse` will report this. You should then inform the user about the push failure and the reason, and await further instructions. Do not attempt overly complex recovery maneuvers without user guidance.
-   **Clarity:** Be clear and methodical. If a step fails, acknowledge it and decide on the next course of action (retry, try alternative, or inform user).
-   **Focus:** Your goal is to complete the coding/file manipulation task as requested by the user.
"""
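
# Illustrative walkthrough (not part of the original cog) of the inline tool-call
# protocol that AGENT_SYSTEM_PROMPT describes, matching what
# _parse_and_execute_tool_call in this cog actually accepts. The file path is
# hypothetical.
#
#   Model output (raw XML, no markdown fences):
#       <ReadFile>
#           <path>cogs/example_cog.py</path>
#       </ReadFile>
#
#   The cog runs the tool and appends the result to the conversation as a
#   'user' turn of the form:
#       ToolResponse: ReadFile
#       Path: cogs/example_cog.py
#       ---
#       (file contents or an error message)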

class AICodeAgentCog(commands.Cog):
    def __init__(self, bot: commands.Bot):
        self.bot = bot
        self.genai_client = None
        self.agent_conversations: Dict[int, List[google_genai_types.Content]] = {}  # User ID to conversation history
        self.agent_shell_sessions = defaultdict(lambda: {
            'cwd': os.getcwd(),
            'env': os.environ.copy()
        })

        # Initialize Google GenAI Client for Vertex AI
        if PROJECT_ID and LOCATION:
            try:
                self.genai_client = genai.Client(
                    vertexai=True,
                    project=PROJECT_ID,
                    location=LOCATION,
                )
                print(f"AICodeAgentCog: Google GenAI Client initialized for Vertex AI project '{PROJECT_ID}' in location '{LOCATION}'.")
            except Exception as e:
                print(f"AICodeAgentCog: Error initializing Google GenAI Client for Vertex AI: {e}")
                self.genai_client = None  # Ensure it's None on failure
        else:
            print("AICodeAgentCog: PROJECT_ID or LOCATION not configured. Google GenAI Client not initialized.")

        # AI Model Configuration
        self._ai_model: str = "gemini-2.5-flash-preview-05-20"  # Default model
        self._available_models: Dict[str, str] = {
            "pro": "gemini-2.5-pro-preview-05-06",  # Assuming this is the intended Pro model
            "flash": "gemini-2.5-flash-preview-05-20"
        }
        # The user mentioned "gemini-2.5-pro-preview-05-06" and "gemini-2.5-flash-preview-05-20";
        # update these strings if different model versions are intended or become available.

        # Tavily Web Search Integration
        self.tavily_api_key: Optional[str] = os.getenv("TAVILY_API_KEY")
        self.tavily_client: Optional[TavilyClient] = None
        if self.tavily_api_key:
            self.tavily_client = TavilyClient(api_key=self.tavily_api_key)
            print("AICodeAgentCog: TavilyClient initialized.")
        else:
            print("AICodeAgentCog: TAVILY_API_KEY not found. TavilyClient not initialized.")
        self.tavily_search_depth: str = os.getenv("TAVILY_DEFAULT_SEARCH_DEPTH", "basic")
        self.tavily_max_results: int = int(os.getenv("TAVILY_DEFAULT_MAX_RESULTS", "5"))

    @commands.command(name="codeagent_model")
    @commands.is_owner()
    async def set_ai_model_command(self, ctx: commands.Context, model_key: str):
        """Sets the AI model for the code agent. Usage: !codeagent_model <pro|flash>"""
        model_key = model_key.lower()
        if model_key in self._available_models:
            self._ai_model = self._available_models[model_key]
            await ctx.send(f"AICodeAgent: AI model set to: {self._ai_model} (key: {model_key})")
        else:
            await ctx.send(f"AICodeAgent: Invalid model key '{model_key}'. Available keys: {', '.join(self._available_models.keys())}")
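
    # Usage sketch (the "!" prefix follows the docstring above; the actual prefix is
    # configured elsewhere in the bot):
    #   !codeagent_model pro    -> switches to the configured Pro model
    #   !codeagent_model flash  -> switches back to the Flash model
    # The current selection can be checked with !codeagent_get_model below.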

    @commands.command(name="codeagent_get_model")
    @commands.is_owner()
    async def get_ai_model_command(self, ctx: commands.Context):
        """Gets the current AI model for the code agent."""
        await ctx.send(f"AICodeAgent: Current AI model is: {self._ai_model}")

    @commands.command(name="codeagent_clear")
    @commands.is_owner()
    async def clear_agent_history_command(self, ctx: commands.Context):
        """Clears the conversation history for the code agent for the calling user."""
        user_id = ctx.author.id
        if user_id in self.agent_conversations:
            del self.agent_conversations[user_id]
            await ctx.send("AICodeAgent: Conversation history cleared for you.")
        else:
            await ctx.send("AICodeAgent: No conversation history found for you to clear.")

    async def _run_git_command(self, command_str: str) -> Tuple[bool, str]:
        """
        Runs a Git command using subprocess.Popen in a thread and returns
        a (success_status, output_string) tuple.
        """
        # Git commands should run in the bot's current working directory,
        # which is expected to be the root of the Git repository.
        cwd = os.getcwd()
        env = os.environ.copy()
        print(f"AICodeAgentCog: Executing Git command: '{command_str}' in CWD: '{cwd}'")

        def run_sync_subprocess():
            try:
                # shell=False with a split argument list would be safer, but these commands
                # are constructed internally and are simple, so shell=True is used here for
                # consistency with the reference cog.
                final_command_str = command_str
                if "commit" in command_str and "--author" in command_str:  # Heuristic to identify our commit commands
                    # COMMIT_AUTHOR is expected in "Name <email>" form
                    author_name_match = re.match(r"^(.*?)\s*<(.+?)>$", COMMIT_AUTHOR)
                    if author_name_match:
                        committer_name = author_name_match.group(1).strip()
                        committer_email = author_name_match.group(2).strip()
                        # Prepend -c flags for the committer identity.
                        # If command_str starts with "git commit", insert them after "git".
                        if command_str.strip().startswith("git commit"):
                            parts = command_str.strip().split(" ", 1)  # "git", "commit ..."
                            final_command_str = f"{parts[0]} -c user.name=\"{committer_name}\" -c user.email=\"{committer_email}\" {parts[1]}"
                            print(f"AICodeAgentCog: Modified commit command for committer ID: {final_command_str}")
                    else:
                        print(f"AICodeAgentCog: Warning - Could not parse COMMIT_AUTHOR ('{COMMIT_AUTHOR}') to set committer identity.")

                proc = subprocess.Popen(
                    final_command_str,  # Potentially modified command string
                    shell=True,  # Execute through the shell
                    cwd=cwd,
                    env=env,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,  # Decode stdout/stderr as text
                    errors='replace'  # Handle decoding errors
                )
                stdout, stderr = proc.communicate(timeout=60)  # 60-second timeout for git commands
                return (stdout, stderr, proc.returncode, False)
            except subprocess.TimeoutExpired:
                proc.kill()
                stdout, stderr = proc.communicate()
                return (stdout, stderr, -1, True)  # -1 for the timeout-specific return code
            except FileNotFoundError as fnf_err:
                # Specifically catch the case where the 'git' executable itself is not found
                print(f"AICodeAgentCog: FileNotFoundError for command '{final_command_str}': {fnf_err}. Is Git installed and in PATH?")
                return ("", f"FileNotFoundError: {fnf_err}. Ensure Git is installed and in PATH.", -2, False)
            except Exception as e:
                print(f"AICodeAgentCog: Exception in run_sync_subprocess for '{final_command_str}': {type(e).__name__} - {e}")
                return ("", str(e), -3, False)  # -3 for other exceptions

        stdout_str, stderr_str, returncode, timed_out = await asyncio.to_thread(run_sync_subprocess)

        full_output = ""
        if timed_out:
            full_output += "Command timed out after 60 seconds.\n"
        if stdout_str:
            full_output += f"Stdout:\n{stdout_str.strip()}\n"
        if stderr_str:
            full_output += f"Stderr:\n{stderr_str.strip()}\n"

        if returncode == 0:
            # For commands like `git rev-parse --abbrev-ref HEAD`, stdout is the primary result,
            # so return the raw stdout (without the "Stdout:" prefix) in that case.
            # Check the original command_str here, not final_command_str, which may have been modified.
            if command_str == "git rev-parse --abbrev-ref HEAD" and stdout_str:
                return True, stdout_str.strip()  # Return just the branch name
            return True, full_output.strip() if full_output.strip() else "Command executed successfully with no output."
        else:
            error_message = f"Git command failed. Return Code: {returncode}\n{full_output.strip()}"
            print(f"AICodeAgentCog: {error_message}")
            return False, error_message

    async def _create_programmatic_snapshot(self) -> Optional[str]:
        """Creates a programmatic Git snapshot using a temporary branch."""
        try:
            # Get the current branch name
            success, current_branch_name = await self._run_git_command("git rev-parse --abbrev-ref HEAD")
            if not success:
                print(f"AICodeAgentCog: Failed to get current branch name for snapshot: {current_branch_name}")
                return None
            current_branch_name = current_branch_name.strip()

            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            random_hex = random.randbytes(3).hex()
            snapshot_branch_name = f"snapshot_cog_{timestamp}_{random_hex}"

            # Create and checkout the new snapshot branch
            success, output = await self._run_git_command(f"git checkout -b {snapshot_branch_name}")
            if not success:
                print(f"AICodeAgentCog: Failed to create snapshot branch '{snapshot_branch_name}': {output}")
                # Attempt to switch back if the checkout failed mid-operation
                await self._run_git_command(f"git checkout {current_branch_name}")  # Best effort
                return None

            # Commit any currently staged changes, or create an empty commit for a clean snapshot point
            commit_message = f"Cog Snapshot: Pre-AI Edit State on branch {snapshot_branch_name}"
            success, output = await self._run_git_command(f"git commit --author=\"{COMMIT_AUTHOR}\" -m \"{commit_message}\" --allow-empty")
            if not success:
                print(f"AICodeAgentCog: Failed to commit snapshot on '{snapshot_branch_name}': {output}")
                # Attempt to switch back and clean up the branch
                await self._run_git_command(f"git checkout {current_branch_name}")
                await self._run_git_command(f"git branch -D {snapshot_branch_name}")  # Best effort cleanup
                return None

            # Switch back to the original branch
            success, output = await self._run_git_command(f"git checkout {current_branch_name}")
            if not success:
                print(f"AICodeAgentCog: CRITICAL - Failed to switch back to original branch '{current_branch_name}' after snapshot. Current branch might be '{snapshot_branch_name}'. Manual intervention may be needed. Error: {output}")
                return snapshot_branch_name  # Return it so it can potentially be used/deleted

            print(f"AICodeAgentCog: Successfully created snapshot branch: {snapshot_branch_name}")
            return snapshot_branch_name
        except Exception as e:
            print(f"AICodeAgentCog: Exception in _create_programmatic_snapshot: {e}")
            return None
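
    # Illustrative lifecycle sketch (comments only, nothing here is executed): the
    # snapshot helper above and the owner commands below combine roughly as follows.
    #
    #   snapshot = await self._create_programmatic_snapshot()
    #   # ... a WriteFile/ApplyDiff tool call then modifies files on the working branch ...
    #   # If the edit needs to be undone, the owner can run the equivalent of:
    #   #   git reset --hard <snapshot branch>   (!codeagent_revert_to_snapshot)
    #   #   git branch -D <snapshot branch>      (!codeagent_delete_snapshot)
    #
    # Snapshot branch names follow the pattern snapshot_cog_<timestamp>_<hex>.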
Error: {output}") return snapshot_branch_name # Return it so it can potentially be used/deleted print(f"AICodeAgentCog: Successfully created snapshot branch: {snapshot_branch_name}") return snapshot_branch_name except Exception as e: print(f"AICodeAgentCog: Exception in _create_programmatic_snapshot: {e}") return None @commands.command(name="codeagent_list_snapshots") @commands.is_owner() async def list_snapshots_command(self, ctx: commands.Context): """Lists available programmatic Git snapshots created by the cog.""" success, output = await self._run_git_command('git branch --list "snapshot_cog_*"') if success: if output: await ctx.send(f"Available snapshots:\n```\n{output}\n```") else: await ctx.send("No programmatic snapshots found.") else: await ctx.send(f"Error listing snapshots: {output}") @commands.command(name="codeagent_revert_to_snapshot") @commands.is_owner() async def revert_to_snapshot_command(self, ctx: commands.Context, snapshot_branch_name: str): """Reverts the current branch to the state of a given snapshot branch.""" if not snapshot_branch_name.startswith("snapshot_cog_"): await ctx.send("Invalid snapshot name. Must start with 'snapshot_cog_'.") return # Check if snapshot branch exists success, branches_output = await self._run_git_command("git branch --list") # Normalize branches_output for reliable checking existing_branches = [b.strip().lstrip('* ') for b in branches_output.splitlines()] if not success or snapshot_branch_name not in existing_branches: await ctx.send(f"Snapshot branch '{snapshot_branch_name}' not found.") return await ctx.send(f"Attempting to revert current branch to snapshot '{snapshot_branch_name}'...") success, current_branch = await self._run_git_command("git rev-parse --abbrev-ref HEAD") if not success: await ctx.send(f"Failed to determine current branch before revert: {current_branch}") return current_branch = current_branch.strip() success, output = await self._run_git_command(f"git reset --hard {snapshot_branch_name}") if success: await ctx.send(f"Successfully reverted current branch ('{current_branch}') to snapshot '{snapshot_branch_name}'.\nOutput:\n```\n{output}\n```") else: await ctx.send(f"Error reverting to snapshot '{snapshot_branch_name}':\n```\n{output}\n```") @commands.command(name="codeagent_delete_snapshot") @commands.is_owner() async def delete_snapshot_command(self, ctx: commands.Context, snapshot_branch_name: str): """Deletes a programmatic Git snapshot branch.""" if not snapshot_branch_name.startswith("snapshot_cog_"): await ctx.send("Invalid snapshot name. Must start with 'snapshot_cog_'.") return success, branches_output = await self._run_git_command("git branch --list") existing_branches = [b.strip().lstrip('* ') for b in branches_output.splitlines()] if not success or snapshot_branch_name not in existing_branches: await ctx.send(f"Snapshot branch '{snapshot_branch_name}' not found.") return success, current_branch_name_str = await self._run_git_command("git rev-parse --abbrev-ref HEAD") if success and current_branch_name_str.strip() == snapshot_branch_name: await ctx.send(f"Cannot delete snapshot branch '{snapshot_branch_name}' as it is the current branch. Please checkout to a different branch first.") return elif not success: await ctx.send(f"Could not determine current branch. Deletion aborted for safety. 
            return

        await ctx.send(f"Attempting to delete snapshot branch '{snapshot_branch_name}'...")
        success, output = await self._run_git_command(f"git branch -D {snapshot_branch_name}")
        if success:
            await ctx.send(f"Successfully deleted snapshot branch '{snapshot_branch_name}'.\nOutput:\n```\n{output}\n```")
        else:
            await ctx.send(f"Error deleting snapshot branch '{snapshot_branch_name}':\n```\n{output}\n```")

    def _get_conversation_history(self, user_id: int) -> List[google_genai_types.Content]:
        if user_id not in self.agent_conversations:
            self.agent_conversations[user_id] = []
        return self.agent_conversations[user_id]

    def _add_to_conversation_history(self, user_id: int, role: str, text_content: str, is_tool_response: bool = False):
        history = self._get_conversation_history(user_id)
        # Vertex AI normally uses the 'function' role for tool responses, 'model' for AI text,
        # and 'user' for user text. For these inline tools the convention is adapted slightly:
        #   - the AI's raw response (possibly containing a tool call) is stored with role 'model',
        #   - tool output is fed back with role 'user', formatted as "ToolResponse: ...",
        #   - the user's direct prompts are stored with role 'user'.
        # The system prompt already guides the AI to expect "ToolResponse:" messages.
        # Ensure content is always a list of parts for Vertex.
        parts = [google_genai_types.Part(text=text_content)]
        history.append(google_genai_types.Content(role=role, parts=parts))

        # Keep history to a reasonable length (e.g., the last 20 turns; a token-based limit could come later)
        max_history_items = 20
        if len(history) > max_history_items:
            self.agent_conversations[user_id] = history[-max_history_items:]

    async def _parse_and_execute_tool_call(self, ctx: commands.Context, ai_response_text: str) -> Tuple[str, Optional[str]]:
        """
        Parses the AI response for an XML tool call, executes it, and returns the tool's output string.
        Returns a tuple: (status: str, data: Optional[str]).
        Status can be "TOOL_OUTPUT", "TASK_COMPLETE", or "NO_TOOL".
        Data is the tool output string, the completion message, or the original AI text.
        """
        try:
            clean_ai_response_text = ai_response_text.strip()
            # Remove a potential markdown ```xml ... ``` (or plain ``` ... ```) wrapper
            if clean_ai_response_text.startswith("```"):
                clean_ai_response_text = re.sub(r"^```(?:xml)?\s*\n?", "", clean_ai_response_text, flags=re.MULTILINE)
                clean_ai_response_text = re.sub(r"\n?```$", "", clean_ai_response_text, flags=re.MULTILINE)
                clean_ai_response_text = clean_ai_response_text.strip()

            if not clean_ai_response_text or not clean_ai_response_text.startswith("<") or not clean_ai_response_text.endswith(">"):
                return "NO_TOOL", ai_response_text

            root = ET.fromstring(clean_ai_response_text)
            tool_name = root.tag
            parameters = {child.tag: child.text for child in root}

            if tool_name == "ReadFile":
                file_path = parameters.get("path")
                if not file_path:
                    return "TOOL_OUTPUT", "ToolResponse: Error\n---\nReadFile: Missing 'path' parameter."
                tool_output = await self._execute_tool_read_file(file_path)
                return "TOOL_OUTPUT", f"ToolResponse: ReadFile\nPath: {file_path}\n---\n{tool_output}"

            elif tool_name == "WriteFile":
                file_path = parameters.get("path")
                content = parameters.get("content")  # CDATA content will be in .text
                if file_path is None or content is None:
                    return "TOOL_OUTPUT", "ToolResponse: Error\n---\nWriteFile: Missing 'path' or 'content' parameter."
                snapshot_branch = await self._create_programmatic_snapshot()
                if not snapshot_branch:
                    return "TOOL_OUTPUT", "ToolResponse: SystemError\n---\nFailed to create project snapshot. WriteFile operation aborted."
                else:
                    await ctx.send(f"AICodeAgent: [Info] Created snapshot: {snapshot_branch} before writing to {file_path}")
                tool_output = await self._execute_tool_write_file(file_path, content)
                return "TOOL_OUTPUT", f"ToolResponse: WriteFile\nPath: {file_path}\n---\n{tool_output}"

            elif tool_name == "ApplyDiff":
                file_path = parameters.get("path")
                diff_block = parameters.get("diff_block")  # CDATA content
                if file_path is None or diff_block is None:
                    return "TOOL_OUTPUT", "ToolResponse: Error\n---\nApplyDiff: Missing 'path' or 'diff_block' parameter."
                snapshot_branch = await self._create_programmatic_snapshot()
                if not snapshot_branch:
                    return "TOOL_OUTPUT", "ToolResponse: SystemError\n---\nFailed to create project snapshot. ApplyDiff operation aborted."
                else:
                    await ctx.send(f"AICodeAgent: [Info] Created snapshot: {snapshot_branch} before applying diff to {file_path}")
                tool_output = await self._execute_tool_apply_diff(file_path, diff_block)
                return "TOOL_OUTPUT", f"ToolResponse: ApplyDiff\nPath: {file_path}\n---\n{tool_output}"

            elif tool_name == "ExecuteCommand":
                command_str = parameters.get("command")
                if not command_str:
                    return "TOOL_OUTPUT", "ToolResponse: Error\n---\nExecuteCommand: Missing 'command' parameter."
                user_id = ctx.author.id
                tool_output = await self._execute_tool_execute_command(command_str, user_id)
                return "TOOL_OUTPUT", f"ToolResponse: ExecuteCommand\nCommand: {command_str}\n---\n{tool_output}"

            elif tool_name == "ListFiles":
                file_path = parameters.get("path")
                recursive_str = parameters.get("recursive")  # Will be None if tag is missing
                recursive = recursive_str.lower() == 'true' if recursive_str else False  # Handles None or empty string safely
                if not file_path:
                    return "TOOL_OUTPUT", "ToolResponse: Error\n---\nListFiles: Missing 'path' parameter."
                tool_output = await self._execute_tool_list_files(file_path, recursive)
                return "TOOL_OUTPUT", f"ToolResponse: ListFiles\nPath: {file_path}\nRecursive: {recursive}\n---\n{tool_output}"

            elif tool_name == "WebSearch":
                query_str = parameters.get("query")
                if not query_str:
                    return "TOOL_OUTPUT", "ToolResponse: Error\n---\nWebSearch: Missing 'query' parameter."
                tool_output = await self._execute_tool_web_search(query_str)
                return "TOOL_OUTPUT", f"ToolResponse: WebSearch\nQuery: {query_str}\n---\n{tool_output}"

            elif tool_name == "TaskComplete":
                message = parameters.get("message", "Task marked as complete by AI.")  # Default if message tag is missing or empty
                return "TASK_COMPLETE", message if message is not None else "Task marked as complete by AI."

            else:
                # Unknown tool name found in XML
                return "TOOL_OUTPUT", f"ToolResponse: Error\n---\nUnknown tool: {tool_name} in XML: {clean_ai_response_text[:200]}"

        except ET.ParseError:
            # Not valid XML
            # print(f"AICodeAgentCog: XML ParseError for response: {ai_response_text[:200]}")  # Debugging
            return "NO_TOOL", ai_response_text
        except Exception as e:
            # Catch any other unexpected errors during parsing/dispatch
            print(f"AICodeAgentCog: Unexpected error in _parse_and_execute_tool_call: {type(e).__name__} - {e} for response {ai_response_text[:200]}")
            # import traceback
            # traceback.print_exc()  # For more detailed debugging if needed
            return "TOOL_OUTPUT", f"ToolResponse: SystemError\n---\nError processing tool call: {type(e).__name__} - {e}"
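
    # Illustrative examples (hypothetical inputs) of the (status, data) contract
    # returned by _parse_and_execute_tool_call above:
    #
    #   "<ExecuteCommand><command>git status</command></ExecuteCommand>"
    #       -> ("TOOL_OUTPUT", "ToolResponse: ExecuteCommand\nCommand: git status\n---\n...")
    #   "<TaskComplete><message>Added the new cog.</message></TaskComplete>"
    #       -> ("TASK_COMPLETE", "Added the new cog.")
    #   "Sure, here is a summary of the change..."   (plain prose, not XML)
    #       -> ("NO_TOOL", <the original response text>)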

    # --- Tool Execution Methods ---

    async def _execute_tool_read_file(self, path: str) -> str:
        print(f"AICodeAgentCog: Executing _execute_tool_read_file for path: {path}")
        try:
            # Ensure the path is within the project for basic safety. More robust checks might be needed, e.g.:
            # base_dir = os.path.abspath(".")  # Or a specific project root
            # requested_path = os.path.abspath(os.path.join(base_dir, path))
            # if not requested_path.startswith(base_dir):
            #     return "Error: File path is outside the allowed project directory."
            if not os.path.exists(path):
                return f"Error: File not found at '{path}'"
            if os.path.isdir(path):
                return f"Error: Path '{path}' is a directory, not a file."
            with open(path, 'r', encoding='utf-8', errors='replace') as f:
                return f.read()
        except Exception as e:
            return f"Error reading file '{path}': {type(e).__name__} - {e}"

    async def _execute_tool_write_file(self, path: str, content: str) -> str:
        print(f"AICodeAgentCog: Executing _execute_tool_write_file for path: {path}")
        try:
            # base_dir = os.path.abspath(".")
            # requested_path = os.path.abspath(os.path.join(base_dir, path))
            # if not requested_path.startswith(base_dir):
            #     return "Error: File path is outside the allowed project directory."
            os.makedirs(os.path.dirname(path) or '.', exist_ok=True)  # Ensure directory exists
            with open(path, 'w', encoding='utf-8') as f:
                f.write(content)
            return f"Successfully wrote to file '{path}'."
        except Exception as e:
            return f"Error writing to file '{path}': {type(e).__name__} - {e}"

    async def _execute_tool_apply_diff(self, path: str, diff_block: str) -> str:
        print(f"AICodeAgentCog: Attempting _execute_tool_apply_diff for path: {path}")
        if not os.path.exists(path):
            return f"Error: File not found at '{path}' for applying diff."
        if os.path.isdir(path):
            return f"Error: Path '{path}' is a directory, cannot apply diff."
        try:
            # Ensure diff_block ends with a newline for the `patch` utility
            if not diff_block.endswith('\n'):
                diff_block += '\n'

            process = await asyncio.create_subprocess_exec(
                'patch', path,  # Target file for the patch
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            stdout, stderr = await process.communicate(input=diff_block.encode('utf-8'))

            output_str = ""
            if stdout:
                output_str += f"Stdout:\n{stdout.decode(errors='replace')}\n"
            if stderr:
                output_str += f"Stderr:\n{stderr.decode(errors='replace')}\n"

            if process.returncode == 0:
                return f"Successfully applied diff to '{path}'.\n{output_str}"
            else:
                # Try to provide context if patch failed, e.g. if it created a .rej file
                rej_file = f"{path}.rej"
                if os.path.exists(rej_file):
                    with open(rej_file, 'r', encoding='utf-8', errors='replace') as rf:
                        rej_content = rf.read()
                    output_str += f"\nRejects file found ({rej_file}):\n{rej_content[:500]}...\n(Please check this file for details of failed hunks)"
                return f"Error applying diff to '{path}' (exit code {process.returncode}).\n{output_str}"
        except FileNotFoundError:
            return "Error: The 'patch' command-line utility was not found. Diff application failed. Please ensure 'patch' is installed and in the system PATH."
        except Exception as e:
            return f"Error applying diff to '{path}': {type(e).__name__} - {e}"
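
    # Minimal unidiff sketch (hypothetical file and content) of the kind of diff_block
    # the method above pipes into the external `patch` utility:
    #
    #   --- cogs/example_cog.py
    #   +++ cogs/example_cog.py
    #   @@ -1,3 +1,3 @@
    #    import discord
    #   -OLD_VALUE = 1
    #   +OLD_VALUE = 2
    #    from discord.ext import commands
    #
    # `patch` is invoked as `patch <target file>` with the diff supplied on stdin, so
    # the headers mainly need to describe valid hunks for that single file.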

    async def _execute_tool_execute_command(self, command: str, user_id: int) -> str:
        session = self.agent_shell_sessions[user_id]
        cwd = session['cwd']
        env = session['env']
        print(f"AICodeAgentCog: Attempting _execute_tool_execute_command for user_id {user_id}: '{command}' in CWD: '{cwd}'")

        # Mirroring shell_command_cog.py's command allowance check
        allowed, reason = is_command_allowed_agent(command)
        if not allowed:
            return f"⛔ Command not allowed: {reason}"

        # Mirroring shell_command_cog.py's settings
        timeout_seconds = 30.0
        max_output_length = 1900

        def run_agent_subprocess_sync(cmd_str, current_cwd, current_env, cmd_timeout_secs):
            try:
                proc = subprocess.Popen(
                    cmd_str,
                    shell=True,
                    cwd=current_cwd,
                    env=current_env,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE
                )
                try:
                    stdout, stderr = proc.communicate(timeout=cmd_timeout_secs)
                    return (stdout, stderr, proc.returncode, False)  # stdout, stderr, rc, timed_out
                except subprocess.TimeoutExpired:
                    proc.kill()
                    # Communicate again to fetch any output after the kill
                    stdout, stderr = proc.communicate()
                    return (stdout, stderr, -1, True)  # Using -1 for the timeout rc, as in shell_command_cog
            except Exception as e:
                # Capture other exceptions during Popen or the initial communicate
                return (b"", str(e).encode('utf-8', errors='replace'), -2, False)  # -2 for other errors

        try:
            # Execute the synchronous subprocess logic in a separate thread
            stdout_bytes, stderr_bytes, returncode, timed_out = await asyncio.to_thread(
                run_agent_subprocess_sync, command, cwd, env, timeout_seconds
            )

            # Update the session working directory if a 'cd' command was used and succeeded.
            # This logic is similar to shell_command_cog's CWD tracking attempt.
            if command.strip().startswith('cd ') and returncode == 0:
                new_dir_arg_str = command.strip()[len("cd "):].strip()
                potential_new_cwd = None
                # Handle 'cd' with no argument (e.g. 'cd' or 'cd ~'), which typically goes to home
                if not new_dir_arg_str or new_dir_arg_str == '~' or new_dir_arg_str == '$HOME':
                    potential_new_cwd = os.path.expanduser('~')
                elif new_dir_arg_str == '-':
                    # 'cd -' (previous directory) is hard to track reliably without more state,
                    # so cwd is not updated for it, mirroring shell_command_cog's limitations.
                    print(f"AICodeAgentCog: 'cd -' used by user_id {user_id}. CWD tracking will not update for this command.")
                else:
                    # For 'cd <path>'
                    temp_arg = new_dir_arg_str
                    # Remove quotes if present
                    if (temp_arg.startswith('"') and temp_arg.endswith('"')) or \
                       (temp_arg.startswith("'") and temp_arg.endswith("'")):
                        temp_arg = temp_arg[1:-1]
                    if os.path.isabs(temp_arg):
                        potential_new_cwd = temp_arg
                    else:
                        potential_new_cwd = os.path.abspath(os.path.join(cwd, temp_arg))

                if potential_new_cwd and os.path.isdir(potential_new_cwd):
                    session['cwd'] = potential_new_cwd
                    print(f"AICodeAgentCog: Updated CWD for user_id {user_id} to: {session['cwd']}")
                elif new_dir_arg_str and new_dir_arg_str != '-' and potential_new_cwd:
                    print(f"AICodeAgentCog: 'cd' command for user_id {user_id} seemed to succeed (rc=0), but CWD tracking logic could not confirm new path '{potential_new_cwd}' or it's not a directory. CWD remains '{session['cwd']}'. Command: '{command}'.")
                elif new_dir_arg_str and new_dir_arg_str != '-':
                    # potential_new_cwd was None even though an argument was given
                    print(f"AICodeAgentCog: 'cd' command for user_id {user_id} with arg '{new_dir_arg_str}' succeeded (rc=0), but path resolution for CWD tracking failed. CWD remains '{session['cwd']}'.")
            # Format output identically to shell_command_cog.py's _execute_local_command
            result_parts = []
            stdout_str = stdout_bytes.decode('utf-8', errors='replace').strip()
            stderr_str = stderr_bytes.decode('utf-8', errors='replace').strip()

            if timed_out:
                result_parts.append(f"⏱️ Command timed out after {timeout_seconds} seconds.")
            if stdout_str:
                if len(stdout_str) > max_output_length:
                    stdout_str = stdout_str[:max_output_length] + "... (output truncated)"
                result_parts.append(f"📤 **STDOUT:**\n```\n{stdout_str}\n```")
            if stderr_str:
                if len(stderr_str) > max_output_length:
                    stderr_str = stderr_str[:max_output_length] + "... (output truncated)"
                result_parts.append(f"⚠️ **STDERR:**\n```\n{stderr_str}\n```")

            if returncode != 0 and not timed_out:  # Don't add the exit code if it was a timeout
                result_parts.append(f"❌ **Exit Code:** {returncode}")
            else:  # Successful or timed out (timeout message already added)
                if not result_parts:  # No stdout, no stderr, not timed out, and successful
                    result_parts.append("✅ Command executed successfully (no output).")

            return "\n".join(result_parts)

        except Exception as e:
            # General exception during subprocess handling
            return f"Exception executing command '{command}': {type(e).__name__} - {e}"

    async def _execute_tool_list_files(self, path: str, recursive: bool) -> str:
        print(f"AICodeAgentCog: Attempting _execute_tool_list_files for path: {path}, recursive: {recursive}")
        try:
            # base_dir = os.path.abspath(".")
            # requested_path = os.path.abspath(os.path.join(base_dir, path))
            # if not requested_path.startswith(base_dir):
            #     return "Error: Path is outside the allowed project directory."
            if not os.path.exists(path):
                return f"Error: Path not found at '{path}'"
            if not os.path.isdir(path):
                return f"Error: Path '{path}' is not a directory."

            file_list = []
            excluded_dirs = {"__pycache__", ".git", ".vscode", ".idea", "node_modules", "venv", ".env", "terminal_images"}

            if recursive:
                for root, dirs, files in os.walk(path, topdown=True):
                    # Exclude specified directories from further traversal
                    dirs[:] = [d for d in dirs if d not in excluded_dirs]
                    for name in files:
                        file_list.append(os.path.join(root, name))
                    # Add filtered directories to the list
                    for name in dirs:  # These are already filtered dirs
                        file_list.append(os.path.join(root, name) + os.sep)  # Indicate dirs
            else:  # Non-recursive case
                for item in os.listdir(path):
                    if item in excluded_dirs:  # Check if the item itself is an excluded directory name
                        continue
                    full_item_path = os.path.join(path, item)
                    if os.path.isdir(full_item_path):
                        file_list.append(item + os.sep)  # Indicate dirs
                    else:
                        file_list.append(item)

            return "\n".join(file_list) if file_list else "No files or directories found."
        except Exception as e:
            return f"Error listing files at '{path}': {type(e).__name__} - {e}"

    async def _execute_tool_web_search(self, query: str) -> str:
        print(f"AICodeAgentCog: Executing _execute_tool_web_search for query: {query}")
        if not self.tavily_client:
            return "Error: Tavily client not initialized. Cannot perform web search."
        try:
            # Using basic parameters for now; this can be expanded
            response = await asyncio.to_thread(
                self.tavily_client.search,
                query=query,
                search_depth=self.tavily_search_depth,  # "basic" or "advanced"
                max_results=self.tavily_max_results,
                include_answer=True  # Try to get a direct answer
            )
            results_str_parts = []
            if response.get("answer"):
                results_str_parts.append(f"Answer: {response['answer']}")
            if response.get("results"):
                for i, res in enumerate(response["results"][:self.tavily_max_results]):  # Show up to max_results
                    results_str_parts.append(f"\nResult {i+1}: {res.get('title', 'N/A')}\nURL: {res.get('url', 'N/A')}\nSnippet: {res.get('content', 'N/A')[:250]}...")  # Truncate snippet
            return "\n".join(results_str_parts) if results_str_parts else "No search results found."
        except Exception as e:
            return f"Error during Tavily web search for '{query}': {type(e).__name__} - {e}"
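
    # High-level sketch of the interaction loop implemented below (comments only):
    # each !codeagent prompt drives up to max_iterations rounds of
    #
    #   user prompt -> model response
    #       -> if the response is an XML tool call: execute it, append a
    #          "ToolResponse: ..." message to history as a 'user' turn, and loop again
    #       -> if it is a TaskComplete call: report the completion message and stop
    #       -> otherwise: send the model's text to the channel and stop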

    async def _process_agent_interaction(self, ctx: commands.Context, initial_prompt_text: str):
        user_id = ctx.author.id
        self._add_to_conversation_history(user_id, role="user", text_content=initial_prompt_text)

        iteration_count = 0
        max_iterations = 10  # Configurable, from the plan

        # Ensure the genai_client is available
        if not self.genai_client:
            await ctx.send("AICodeAgent: Google GenAI Client is not initialized. Cannot process request.")
            return

        async with ctx.typing():
            while iteration_count < max_iterations:
                current_history = self._get_conversation_history(user_id)
                if not current_history:  # Should not happen if the initial prompt was added
                    await ctx.send("AICodeAgent: Error - conversation history is empty.")
                    return

                try:
                    # Construct messages for the Vertex AI API.
                    # The system prompt is passed via generation_config.system_instruction.
                    vertex_contents = current_history  # Already in types.Content format

                    generation_config = google_genai_types.GenerateContentConfig(
                        temperature=0.7,  # Adjust as needed
                        max_output_tokens=4096,  # Adjust as needed
                        safety_settings=STANDARD_SAFETY_SETTINGS,
                        # The system instruction is critical here
                        system_instruction=google_genai_types.Content(
                            role="system",  # Though for Gemini, the system prompt is often the first user message or model tuning
                            parts=[google_genai_types.Part(text=AGENT_SYSTEM_PROMPT)]
                        )
                    )

                    print(f"AICodeAgentCog: Sending to Vertex AI. Model: {self._ai_model}. History items: {len(vertex_contents)}")
                    # for i, item in enumerate(vertex_contents):
                    #     print(f"  History {i} Role: {item.role}, Parts: {item.parts}")

                    response = await self.genai_client.aio.models.generate_content(
                        model=f"publishers/google/models/{self._ai_model}",
                        contents=vertex_contents,
                        config=generation_config,  # Corrected parameter name
                        # No 'tools' or 'tool_config' for inline tool usage
                    )

                    # Safely extract text from the response
                    ai_response_text = ""
                    if response.candidates and response.candidates[0].content and response.candidates[0].content.parts:
                        ai_response_text = response.candidates[0].content.parts[0].text
                    else:
                        # Handle cases like safety blocks or empty responses
                        finish_reason = response.candidates[0].finish_reason if response.candidates else "UNKNOWN"
                        safety_ratings_str = ""
                        if response.candidates and response.candidates[0].safety_ratings:
                            sr = response.candidates[0].safety_ratings
                            safety_ratings_str = ", ".join([f"{rating.category.name}: {rating.probability.name}" for rating in sr])
                        if finish_reason == google_genai_types.FinishReason.SAFETY:
                            await ctx.send(f"AICodeAgent: AI response was blocked due to safety settings: {safety_ratings_str}")
                            self._add_to_conversation_history(user_id, role="model", text_content=f"[Blocked by Safety: {safety_ratings_str}]")
                            return
                        else:
                            await ctx.send(f"AICodeAgent: AI returned an empty or non-text response. Finish Reason: {finish_reason}. Safety: {safety_ratings_str}")
                            self._add_to_conversation_history(user_id, role="model", text_content="[Empty or Non-Text Response]")
                            return

                    if not ai_response_text.strip():
                        await ctx.send("AICodeAgent: AI returned an empty response text.")
                        self._add_to_conversation_history(user_id, role="model", text_content="[Empty Response Text]")
                        return

                    self._add_to_conversation_history(user_id, role="model", text_content=ai_response_text)
                    print(f"AICodeAgentCog: AI Raw Response:\n{ai_response_text}")

                    # Parse for an inline tool call.
                    # _parse_and_execute_tool_call returns Tuple[str, Optional[str]]:
                    # status is "TOOL_OUTPUT", "TASK_COMPLETE", or "NO_TOOL";
                    # data is the tool output string, the completion message, or the original AI text.
                    parse_status, parsed_data = await self._parse_and_execute_tool_call(ctx, ai_response_text)

                    if parse_status == "TASK_COMPLETE":
                        completion_message = parsed_data if parsed_data is not None else "Task marked as complete by AI."
                        await ctx.send(f"AICodeAgent: Task Complete!\n{completion_message}")
                        # Logging the AI's completion signal to history is optional but good for context:
                        # self._add_to_conversation_history(user_id, role="model", text_content=f"TaskComplete: message: {completion_message}")
                        return  # End of interaction

                    elif parse_status == "TOOL_OUTPUT":
                        tool_output_str = parsed_data
                        if tool_output_str is None:  # Should not happen if status is TOOL_OUTPUT, but defensive
                            tool_output_str = "Error: Tool executed but returned no output string."
                        print(f"AICodeAgentCog: Tool Output:\n{tool_output_str}")
                        self._add_to_conversation_history(user_id, role="user", text_content=tool_output_str)  # Feed tool output back as 'user'
                        iteration_count += 1
                        # Optionally send the tool output to Discord for transparency:
                        # if len(tool_output_str) < 1900: await ctx.send(f"```{tool_output_str}```")
                        continue  # Loop back to the AI with the tool output in history

                    elif parse_status == "NO_TOOL":
                        # No tool call found; this is the final AI response for this turn
                        final_ai_text = parsed_data  # This is the original ai_response_text
                        if final_ai_text is None:  # Should not happen
                            final_ai_text = "AI provided no textual response."
                        if len(final_ai_text) > 1950:
                            await ctx.send(final_ai_text[:1950] + "\n...(message truncated)")
                        else:
                            await ctx.send(final_ai_text)
                        return  # End of interaction

                    else:  # Should not happen
                        await ctx.send("AICodeAgent: Internal error - unknown parse status from tool parser.")
                        return

                except google_exceptions.GoogleAPICallError as e:
                    await ctx.send(f"AICodeAgent: Vertex AI API call failed: {e}")
                    return
                except Exception as e:
                    await ctx.send(f"AICodeAgent: An unexpected error occurred during AI interaction: {e}")
                    print(f"AICodeAgentCog: Interaction Error: {type(e).__name__} - {e}")
                    import traceback
                    traceback.print_exc()
                    return

                # Iteration limit check (moved inside the loop for clarity, but the logic is similar)
                if iteration_count >= max_iterations:
                    await ctx.send(f"AICodeAgent: Reached iteration limit ({max_iterations}).")
                    try:
                        check = lambda m: m.author == ctx.author and m.channel == ctx.channel and \
                            m.content.lower().startswith(("yes", "no", "continue", "feedback"))
                        await ctx.send("Continue processing? (yes/no/feedback <your feedback>):")
                        user_response_msg = await self.bot.wait_for('message', check=check, timeout=300.0)
                        user_response_content = user_response_msg.content.lower()

                        if user_response_content.startswith("yes") or user_response_content.startswith("continue"):
                            iteration_count = 0  # Reset the iteration count
                            self._add_to_conversation_history(user_id, role="user", text_content="[User approved continuation]")
                            await ctx.send("Continuing...")
                            continue
                        elif user_response_content.startswith("feedback"):
                            feedback_text = user_response_msg.content[len("feedback"):].strip()
                            iteration_count = 0  # Reset
                            self._add_to_conversation_history(user_id, role="user", text_content=f"System Feedback: {feedback_text}")
                            await ctx.send("Continuing with feedback...")
                            continue
                        else:  # "no" or anything else
                            await ctx.send("AICodeAgent: Processing stopped by user.")
                            return
                    except asyncio.TimeoutError:
                        await ctx.send("AICodeAgent: Continuation prompt timed out. Stopping.")
                        return

        # If the loop finishes due to max_iterations without a reset (should be caught by the prompt above)
        if iteration_count >= max_iterations:
            await ctx.send("AICodeAgent: Stopped due to reaching maximum processing iterations.")

    @commands.command(name="codeagent", aliases=["ca"])
    @commands.is_owner()
    async def codeagent_command(self, ctx: commands.Context, *, prompt: str):
        """Interacts with the AI Code Agent."""
        if not self.genai_client:
            await ctx.send("AICodeAgent: Google GenAI Client is not initialized. Cannot process request.")
            return
        if not prompt:
            await ctx.send("AICodeAgent: Please provide a prompt for the agent.")
            return
        await self._process_agent_interaction(ctx, prompt)


async def setup(bot: commands.Bot):
    # Ensure PROJECT_ID and LOCATION are available before adding the cog,
    # or allow it to load and let commands fail gracefully when genai_client is None.
    if not PROJECT_ID or not LOCATION:
        print("AICodeAgentCog: Cannot load cog as PROJECT_ID or LOCATION is missing.")
        # For now, let it load; genai_client will be None and commands using it should check.
    cog = AICodeAgentCog(bot)
    await bot.add_cog(cog)
    print("AICodeAgentCog loaded.")
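
# Usage sketch (assumption: this file lives in the auto-loaded "cogs" folder, as the
# system prompt above states; the module path below is therefore hypothetical). If it
# is not auto-loaded, it can be loaded manually with discord.py's extension API:
#
#   await bot.load_extension("cogs.ai_code_agent_cog")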