import discord from discord.ext import commands import re import os import asyncio import aiohttp import subprocess import json import base64 import datetime # For snapshot naming import random # For snapshot naming import ast # For GetCodeStructure import pathlib # For path manipulations, potentially from typing import Dict, Any, List, Optional, Tuple from collections import defaultdict # Added for agent_shell_sessions import xml.etree.ElementTree as ET # Google Generative AI Imports (using Vertex AI backend) from google import genai from google.genai import ( types as google_genai_types, ) # Renamed to avoid conflict with typing.types from google.api_core import exceptions as google_exceptions # Import project configuration for Vertex AI try: from gurt.config import PROJECT_ID, LOCATION except ImportError: PROJECT_ID = os.getenv("GCP_PROJECT_ID") # Fallback to environment variable LOCATION = os.getenv("GCP_LOCATION") # Fallback to environment variable if not PROJECT_ID or not LOCATION: print( "Warning: PROJECT_ID or LOCATION not found in gurt.config or environment variables." ) # Allow cog to load but genai_client will be None from tavily import TavilyClient # Define standard safety settings using google.generativeai types # Set all thresholds to OFF as requested for internal tools STANDARD_SAFETY_SETTINGS = [ google_genai_types.SafetySetting( category=google_genai_types.HarmCategory.HARM_CATEGORY_HATE_SPEECH, threshold="BLOCK_NONE", ), google_genai_types.SafetySetting( category=google_genai_types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, threshold="BLOCK_NONE", ), google_genai_types.SafetySetting( category=google_genai_types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, threshold="BLOCK_NONE", ), google_genai_types.SafetySetting( category=google_genai_types.HarmCategory.HARM_CATEGORY_HARASSMENT, threshold="BLOCK_NONE", ), ] # --- Constants for command filtering, mirroring shell_command_cog.py --- # (Currently empty as they are commented out in the reference cog) BANNED_COMMANDS_AGENT = [] BANNED_PATTERNS_AGENT = [] def is_command_allowed_agent(command): """ Check if the command is allowed to run. Mirrors shell_command_cog.py. Returns (allowed, reason) tuple. """ # Check against banned commands for banned in BANNED_COMMANDS_AGENT: if banned in command.lower(): # Simple substring check return False, f"Command contains banned term: `{banned}`" # Check against banned patterns for pattern in BANNED_PATTERNS_AGENT: if re.search(pattern, command): return False, f"Command matches banned pattern: `{pattern}`" return True, None # --- End of command filtering constants and function --- COMMIT_AUTHOR = "AI Coding Agent Cog " AGENT_SYSTEM_PROMPT = """You are an expert AI Coding Agent. Your primary function is to assist the user (bot owner) by directly modifying the codebase of this Discord bot project or performing related tasks. This bot uses discord.py. Cogs placed in the 'cogs' folder are automatically loaded by the bot's main script, so you typically do not need to modify `main.py` to load new cogs you create in that directory. You operate by understanding user requests and then generating specific inline "tool calls" in your responses when you need to interact with the file system, execute commands, or search the web. **XML Tool Call Syntax:** When you need to use a tool, your response should *only* contain the XML block representing the tool call, formatted exactly as specified below. The system will parse this XML, execute the tool, and then feed the output back to you in a subsequent message prefixed with "ToolResponse:". IMPORTANT: Do NOT wrap your XML tool calls in markdown code blocks (e.g., ```xml ... ``` or ``` ... ```). Output the raw XML directly, starting with the root tool tag (e.g., ``). **Available Tools:** 1. **ReadFile:** Reads the content of a specified file. * Can read specific line ranges or "peek" at file ends. ```xml path/to/file.ext 10 20 5 5 ``` (System will provide file content or error in ToolResponse) 2. **WriteFile:** Writes content to a specified file, overwriting if it exists, creating if it doesn't. ```xml path/to/file.ext are fine. ]]> ``` (System will confirm success or report error in ToolResponse. A snapshot is made before writing.) 3. **ApplyDiff:** Applies targeted modifications to an existing file by searching for specific sections of content and replacing them. The `diff_block` uses a custom SEARCH/REPLACE format. You can perform multiple distinct search and replace operations within a single call by providing multiple SEARCH/REPLACE blocks. ```xml path/to/file.ext ``` (System will confirm success or report error in ToolResponse. A snapshot is made before applying.) 4. **ExecuteCommand:** Executes a shell command. Tracks CWD per user session. ```xml your shell command here ``` (System will provide stdout/stderr or error in ToolResponse) 5. **ListFiles:** Lists files and directories at a given path. * Supports recursion, filtering by extension/regex, and including metadata. ```xml path/to/search true .py,.txt test_.*\.py true ``` (System will provide file list or error in ToolResponse) 6. **WebSearch:** Searches the web for information using Tavily. ```xml your search query ``` (System will provide search results or error in ToolResponse) 7. **LintFile:** Checks code quality using a linter (e.g., pylint, flake8). ```xml path/to/python_file.py pylint ``` (System will provide linter output or error in ToolResponse) 8. **GetCodeStructure:** Parses a Python file and provides an overview of its structure (classes, functions, signatures, docstrings) using AST. ```xml path/to/python_file.py ``` (System will provide structured code overview or error in ToolResponse) 9. **FindSymbolDefinition:** Locates where a specific symbol (function, class, variable) is defined within the project. (Initial implementation might be grep-based). ```xml my_function ./cogs *.py ``` (System will provide definition location(s) or error in ToolResponse) 10. **ManageCog:** Loads, unloads, reloads, or lists the bot's cogs. ```xml load cogs.my_cog ``` (System will confirm action or provide error/list in ToolResponse) 11. **RunTests:** Executes unit tests (e.g., pytest, unittest) for specified files, directories, or test patterns. ```xml tests/test_module.py pytest ``` (System will provide test results or error in ToolResponse) 12. **PythonREPL:** Executes Python code snippets in a REPL-like session. (Note: Be cautious with arbitrary code execution). ```xml user_specific_repl_id ``` (System will provide output/result or error in ToolResponse) 13. **CreateNamedSnapshot:** Takes a Git snapshot (branch) with a user-defined name and optional description. ```xml feature_x_before_refactor Snapshot before major refactoring of feature X. ``` (System will confirm snapshot creation or report error in ToolResponse) 14. **CompareSnapshots:** Shows the `git diff` between two specified snapshot branches (or any two branches/commits). ```xml snapshot_cog_20230101_120000_abc123 snapshot_cog_20230102_140000_def456 ``` (System will provide diff output or error in ToolResponse) 15. **DryRunApplyDiff:** Checks the outcome of applying a SEARCH/REPLACE diff (as defined for ApplyDiff) without actually modifying the file. ```xml path/to/file.ext ``` (System will report if patch would apply cleanly or show errors, without changing the file.) 16. **DryRunWriteFile:** Checks the validity of a path and permissions for a `WriteFile` operation without actual writing. ```xml path/to/new_or_existing_file.ext ``` (System will report if path is writable/creatable or any permission issues.) 17. **ReadWebPageRaw:** Reads the raw text content from a given URL. * Ideal for fetching raw code or data from services like GitHub raw user content. ```xml https://example.com/raw/file.txt ``` (System will provide page content or error in ToolResponse) 18. **TaskComplete:** Signals that the current multi-step task is considered complete by the AI. ```xml A brief summary of what was accomplished or the final status. ``` (System will acknowledge and stop the current interaction loop.) **Workflow and Rules:** - **Tool Preference:** For modifying existing files, ALWAYS prefer `ApplyDiff` if the changes are targeted. Use `WriteFile` for new files or if `ApplyDiff` is unsuitable or fails repeatedly. Consider `DryRunApplyDiff` or `DryRunWriteFile` first if unsure. - **Direct Operation:** You operate directly. No explicit user confirmation is needed for individual tool actions after the initial user prompt. - **Programmatic Snapshots (System-Managed):** - The system AUTOMATICALLY creates a Git snapshot (a temporary branch) of the project *before* executing `WriteFile` or `ApplyDiff` tools. - You will be notified by a "ToolResponse: SystemNotification..." message when a snapshot has been successfully created, right before your file modification tool is about to be truly processed. - You do NOT need to request these automatic snapshots yourself. - If the system fails to create a snapshot, it will inform you with a "ToolResponse: SystemError...". In such a case, your `WriteFile` or `ApplyDiff` operation will NOT proceed. You should then typically inform the user of this critical system failure. - **Named Snapshots (AI-Managed):** Use the `CreateNamedSnapshot` tool when you want to create a more permanent, named checkpoint (e.g., before a large refactoring). - **Git Workflow for Your Changes:** After you believe your coding task and all related file modifications are complete and correct, you MUST use the `ExecuteCommand` tool to perform the following Git operations in sequence: 1. `git add .` (to stage all your changes) 2. `git commit --author="AI Coding Agent Cog " -m "AI Agent: "` (You will generate the commit message part) 3. **Before pushing, attempt to integrate remote changes:** `git pull --rebase` 4. `git push` - **Commit Messages:** Ensure your commit messages are descriptive of the changes made. - **Conflict Resolution (Git Pull --rebase):** If `git pull --rebase` (executed via `ExecuteCommand`) results in merge conflicts, the `ToolResponse` will indicate this. You must then: a. Use `ReadFile` to inspect the conflicted file(s) to see the conflict markers. b. Decide on the resolution. c. Use `WriteFile` or `ApplyDiff` to apply your resolved version of the file(s). (Remember, a programmatic snapshot will be made before these tools run). d. Use `ExecuteCommand` for `git add ` for each resolved file. e. Use `ExecuteCommand` for `git rebase --continue`. f. Then attempt `git push` again using `ExecuteCommand`. - **Push Failures:** If `git push` still fails (e.g., other non-fast-forward errors), the `ToolResponse` will report this. You should then inform the user about the push failure and the reason, and await further instructions. - **Agent Operational Modes:** - The user can set your operational mode (e.g., `default`, `planning`, `debugging`, `learning`) via a Discord command. - When the mode changes, you will receive a `[System Notification]` message in the conversation history like: `[System Notification] Agent mode changed to 'planning'. Context: User wants to outline a new feature.` - Adapt your behavior based on the current mode and any provided context. For example: - `default/implementation`: Focus on direct execution of tasks and code changes. - `planning`: Focus on breaking down complex tasks, outlining steps, asking clarifying questions before diving into code. You might use `TaskComplete` more often with a plan rather than full execution. - `debugging`: Focus on analyzing errors, using tools like `ReadFile`, `LintFile`, `ExecuteCommand` (for logs), `PythonREPL` to diagnose issues. - `learning/exploration`: Focus on understanding new parts of the codebase using `ReadFile`, `ListFiles`, `GetCodeStructure`, `FindSymbolDefinition`, or using `WebSearch` for external information. - The core set of tools remains available in all modes, but your strategy for using them should adapt. - **Clarity:** Be clear and methodical. If a step fails, acknowledge it and decide on the next course of action. - **Focus:** Your goal is to complete the coding/file manipulation task as requested by the user, adapting to the current operational mode. """ class AICodeAgentCog(commands.Cog): def __init__(self, bot: commands.Bot): self.bot = bot self.genai_client = None self.agent_conversations: Dict[int, List[google_genai_types.Content]] = ( {} ) # User ID to conversation history self.agent_shell_sessions = defaultdict( lambda: { # For ExecuteCommand CWD tracking "cwd": os.getcwd(), "env": os.environ.copy(), } ) self.agent_modes: Dict[int, str] = ( {} ) # User ID to current agent mode (e.g., "default", "planning") self.agent_python_repl_sessions: Dict[str, Dict[str, Any]] = ( {} ) # session_id to {'globals': {}, 'locals': {}} # Initialize Google GenAI Client for Vertex AI if PROJECT_ID and LOCATION: try: self.genai_client = genai.Client( vertexai=True, project=PROJECT_ID, location=LOCATION, ) print( f"AICodeAgentCog: Google GenAI Client initialized for Vertex AI project '{PROJECT_ID}' in location '{LOCATION}'." ) except Exception as e: print( f"AICodeAgentCog: Error initializing Google GenAI Client for Vertex AI: {e}" ) self.genai_client = None # Ensure it's None on failure else: print( "AICodeAgentCog: PROJECT_ID or LOCATION not configured. Google GenAI Client not initialized." ) # AI Model Configuration self._ai_model: str = "gemini-2.5-flash-preview-05-20" # Default model self._available_models: Dict[str, str] = { "pro": "gemini-2.5-pro-preview-05-06", # Assuming this is the intended Pro model "flash": "gemini-2.5-flash-preview-05-20", } # User mentioned "gemini-2.5-pro-preview-05-06" and "gemini-2.5-flash-preview-05-20" # Updating to reflect those if they are the correct ones, otherwise the 1.5 versions are common. # For now, sticking to what was in the plan based on common Gemini models. # If 2.5 models are indeed what's intended and available, these strings should be updated. # Tavily Web Search Integration self.tavily_api_key: Optional[str] = os.getenv("TAVILY_API_KEY") self.tavily_client: Optional[TavilyClient] = None if self.tavily_api_key: self.tavily_client = TavilyClient(api_key=self.tavily_api_key) print("AICodeAgentCog: TavilyClient initialized.") else: print( "AICodeAgentCog: TAVILY_API_KEY not found. TavilyClient not initialized." ) self.tavily_search_depth: str = os.getenv( "TAVILY_DEFAULT_SEARCH_DEPTH", "basic" ) self.tavily_max_results: int = int(os.getenv("TAVILY_DEFAULT_MAX_RESULTS", "5")) @commands.command(name="codeagent_model") @commands.is_owner() async def set_ai_model_command(self, ctx: commands.Context, model_key: str): """Sets the AI model for the code agent. Usage: !codeagent_model """ model_key = model_key.lower() if model_key in self._available_models: self._ai_model = self._available_models[model_key] await ctx.send( f"AICodeAgent: AI model set to: {self._ai_model} (key: {model_key})" ) else: await ctx.send( f"AICodeAgent: Invalid model key '{model_key}'. Available keys: {', '.join(self._available_models.keys())}" ) @commands.command(name="codeagent_get_model") @commands.is_owner() async def get_ai_model_command(self, ctx: commands.Context): """Gets the current AI model for the code agent.""" await ctx.send(f"AICodeAgent: Current AI model is: {self._ai_model}") @commands.command(name="codeagent_clear") @commands.is_owner() async def clear_agent_history_command(self, ctx: commands.Context): """Clears the conversation history for the code agent for the calling user.""" user_id = ctx.author.id if user_id in self.agent_conversations: del self.agent_conversations[user_id] await ctx.send("AICodeAgent: Conversation history cleared for you.") else: await ctx.send( "AICodeAgent: No conversation history found for you to clear." ) @commands.command(name="codeagent_mode", aliases=["ca_mode"]) @commands.is_owner() async def codeagent_mode_command( self, ctx: commands.Context, mode_name: str, *, context_message: Optional[str] = None, ): """Sets the operational mode for the AI agent for the calling user. Usage: !codeagent_mode [optional context_message] Modes: default, planning, debugging, learning """ user_id = ctx.author.id mode_name = mode_name.lower() valid_modes = [ "default", "planning", "debugging", "learning", ] # Can be expanded if mode_name not in valid_modes: await ctx.send( f"AICodeAgent: Invalid mode '{mode_name}'. Valid modes are: {', '.join(valid_modes)}." ) return self.agent_modes[user_id] = mode_name mode_set_message = ( f"AICodeAgent: Operational mode for you set to '{mode_name}'." ) # Prepare system notification for AI history notification_text = ( f"[System Notification] Agent mode changed to '{mode_name}'." ) if context_message: notification_text += f" Context: {context_message}" mode_set_message += f" Context: {context_message}" # Add this notification to the AI's conversation history for this user # This ensures the AI is aware of the mode change for its next interaction self._add_to_conversation_history( user_id, role="user", text_content=notification_text ) # Treat as user input for AI to see await ctx.send(mode_set_message) print( f"AICodeAgentCog: User {user_id} set mode to '{mode_name}'. Notification added to history: {notification_text}" ) @commands.command(name="codeagent_get_mode", aliases=["ca_get_mode"]) @commands.is_owner() async def codeagent_get_mode_command(self, ctx: commands.Context): """Displays the current operational mode for the AI agent for the calling user.""" user_id = ctx.author.id current_mode = self.agent_modes.get( user_id, "default" ) # Default to "default" if not set await ctx.send( f"AICodeAgent: Your current operational mode is '{current_mode}'." ) async def _run_git_command(self, command_str: str) -> Tuple[bool, str]: """ Runs a Git command using subprocess.Popen in a thread and returns (success_status, output_string). """ # For Git commands, we generally want them to run in the bot's current working directory, # which should be the root of the Git repository. cwd = os.getcwd() env = os.environ.copy() print(f"AICodeAgentCog: Executing Git command: '{command_str}' in CWD: '{cwd}'") def run_sync_subprocess(): try: # For git commands, shell=False is safer if command_str is split into a list. # If command_str is a single string and might contain shell features (though unlikely for our git use), # shell=True would be needed, but then command_str must be trustworthy. # Given our specific git commands, splitting them is safer. # Simplified: if it's a simple git command, can pass as string with shell=True, # but better to split for shell=False. # For now, let's assume simple commands or trust shell=True for git. # However, the example used shell=True. Let's try that first for consistency with the hint. final_command_str = command_str if ( "commit" in command_str and "--author" in command_str ): # Heuristic to identify our commit commands # COMMIT_AUTHOR = "Name " author_name_match = re.match(r"^(.*?)\s*<(.+?)>$", COMMIT_AUTHOR) if author_name_match: committer_name = author_name_match.group(1).strip() committer_email = author_name_match.group(2).strip() # Prepend -c flags for committer identity # Ensure the original command_str is correctly modified. # If command_str starts with "git commit", we insert after "git". if command_str.strip().startswith("git commit"): parts = command_str.strip().split( " ", 1 ) # "git", "commit ..." final_command_str = f'{parts[0]} -c user.name="{committer_name}" -c user.email="{committer_email}" {parts[1]}' print( f"AICodeAgentCog: Modified commit command for committer ID: {final_command_str}" ) else: print( f"AICodeAgentCog: Warning - Could not parse COMMIT_AUTHOR ('{COMMIT_AUTHOR}') to set committer identity." ) proc = subprocess.Popen( final_command_str, # Potentially modified command string shell=True, # Execute through the shell cwd=cwd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, # Decodes stdout/stderr as text errors="replace", # Handles decoding errors ) stdout, stderr = proc.communicate( timeout=60 ) # 60-second timeout for git commands return (stdout, stderr, proc.returncode, False) except subprocess.TimeoutExpired: proc.kill() stdout, stderr = proc.communicate() return (stdout, stderr, -1, True) # -1 for timeout-specific return code except ( FileNotFoundError ) as fnf_err: # Specifically catch if 'git' command itself is not found print( f"AICodeAgentCog: FileNotFoundError for command '{final_command_str}': {fnf_err}. Is Git installed and in PATH?" ) return ( "", f"FileNotFoundError: {fnf_err}. Ensure Git is installed and in PATH.", -2, False, ) except Exception as e: print( f"AICodeAgentCog: Exception in run_sync_subprocess for '{final_command_str}': {type(e).__name__} - {e}" ) return ("", str(e), -3, False) # -3 for other exceptions stdout_str, stderr_str, returncode, timed_out = await asyncio.to_thread( run_sync_subprocess ) full_output = "" if timed_out: full_output += "Command timed out after 60 seconds.\n" if stdout_str: full_output += f"Stdout:\n{stdout_str.strip()}\n" if stderr_str: full_output += f"Stderr:\n{stderr_str.strip()}\n" if returncode == 0: # For commands like `git rev-parse --abbrev-ref HEAD`, stdout is the primary result. # If stdout is empty but no error, return it as is. # If full_output is just "Stdout:\n\n", it means empty stdout. # We want the actual stdout for rev-parse, not the "Stdout:" prefix. # Check original command_str for this specific case, not final_command_str which might be modified if ( command_str == "git rev-parse --abbrev-ref HEAD" and stdout_str ): # Use original command_str for this check return True, stdout_str.strip() # Return just the branch name return True, ( full_output.strip() if full_output.strip() else "Command executed successfully with no output." ) else: error_message = ( f"Git command failed. Return Code: {returncode}\n{full_output.strip()}" ) print(f"AICodeAgentCog: {error_message}") return False, error_message async def _create_programmatic_snapshot(self) -> Optional[str]: """Creates a programmatic Git snapshot using a temporary branch.""" try: # Get current branch name success, current_branch_name = await self._run_git_command( "git rev-parse --abbrev-ref HEAD" ) if not success: print( f"AICodeAgentCog: Failed to get current branch name for snapshot: {current_branch_name}" ) return None current_branch_name = current_branch_name.strip() timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") random_hex = random.randbytes(3).hex() snapshot_branch_name = f"snapshot_cog_{timestamp}_{random_hex}" # Create and checkout the new snapshot branch success, output = await self._run_git_command( f"git checkout -b {snapshot_branch_name}" ) if not success: print( f"AICodeAgentCog: Failed to create snapshot branch '{snapshot_branch_name}': {output}" ) # Attempt to switch back if checkout failed mid-operation await self._run_git_command( f"git checkout {current_branch_name}" ) # Best effort return None # Commit any currently staged changes or create an empty commit for a clean snapshot point commit_message = ( f"Cog Snapshot: Pre-AI Edit State on branch {snapshot_branch_name}" ) success, output = await self._run_git_command( f'git commit --author="{COMMIT_AUTHOR}" -m "{commit_message}" --allow-empty' ) if not success: print( f"AICodeAgentCog: Failed to commit snapshot on '{snapshot_branch_name}': {output}" ) # Attempt to switch back and clean up branch await self._run_git_command(f"git checkout {current_branch_name}") await self._run_git_command( f"git branch -D {snapshot_branch_name}" ) # Best effort cleanup return None # Switch back to the original branch success, output = await self._run_git_command( f"git checkout {current_branch_name}" ) if not success: print( f"AICodeAgentCog: CRITICAL - Failed to switch back to original branch '{current_branch_name}' after snapshot. Current branch might be '{snapshot_branch_name}'. Manual intervention may be needed. Error: {output}" ) return snapshot_branch_name # Return it so it can potentially be used/deleted print( f"AICodeAgentCog: Successfully created snapshot branch: {snapshot_branch_name}" ) return snapshot_branch_name except Exception as e: print(f"AICodeAgentCog: Exception in _create_programmatic_snapshot: {e}") return None @commands.command(name="codeagent_list_snapshots") @commands.is_owner() async def list_snapshots_command(self, ctx: commands.Context): """Lists available programmatic Git snapshots created by the cog.""" success, output = await self._run_git_command( 'git branch --list "snapshot_cog_*"' ) if success: if output: await ctx.send(f"Available snapshots:\n```\n{output}\n```") else: await ctx.send("No programmatic snapshots found.") else: await ctx.send(f"Error listing snapshots: {output}") @commands.command(name="codeagent_revert_to_snapshot") @commands.is_owner() async def revert_to_snapshot_command( self, ctx: commands.Context, snapshot_branch_name: str ): """Reverts the current branch to the state of a given snapshot branch.""" if not snapshot_branch_name.startswith("snapshot_cog_"): await ctx.send("Invalid snapshot name. Must start with 'snapshot_cog_'.") return # Check if snapshot branch exists success, branches_output = await self._run_git_command("git branch --list") # Normalize branches_output for reliable checking existing_branches = [ b.strip().lstrip("* ") for b in branches_output.splitlines() ] if not success or snapshot_branch_name not in existing_branches: await ctx.send(f"Snapshot branch '{snapshot_branch_name}' not found.") return await ctx.send( f"Attempting to revert current branch to snapshot '{snapshot_branch_name}'..." ) success, current_branch = await self._run_git_command( "git rev-parse --abbrev-ref HEAD" ) if not success: await ctx.send( f"Failed to determine current branch before revert: {current_branch}" ) return current_branch = current_branch.strip() success, output = await self._run_git_command( f"git reset --hard {snapshot_branch_name}" ) if success: await ctx.send( f"Successfully reverted current branch ('{current_branch}') to snapshot '{snapshot_branch_name}'.\nOutput:\n```\n{output}\n```" ) else: await ctx.send( f"Error reverting to snapshot '{snapshot_branch_name}':\n```\n{output}\n```" ) @commands.command(name="codeagent_delete_snapshot") @commands.is_owner() async def delete_snapshot_command( self, ctx: commands.Context, snapshot_branch_name: str ): """Deletes a programmatic Git snapshot branch.""" if not snapshot_branch_name.startswith("snapshot_cog_"): await ctx.send("Invalid snapshot name. Must start with 'snapshot_cog_'.") return success, branches_output = await self._run_git_command("git branch --list") existing_branches = [ b.strip().lstrip("* ") for b in branches_output.splitlines() ] if not success or snapshot_branch_name not in existing_branches: await ctx.send(f"Snapshot branch '{snapshot_branch_name}' not found.") return success, current_branch_name_str = await self._run_git_command( "git rev-parse --abbrev-ref HEAD" ) if success and current_branch_name_str.strip() == snapshot_branch_name: await ctx.send( f"Cannot delete snapshot branch '{snapshot_branch_name}' as it is the current branch. Please checkout to a different branch first." ) return elif not success: await ctx.send( f"Could not determine current branch. Deletion aborted for safety. Error: {current_branch_name_str}" ) return await ctx.send( f"Attempting to delete snapshot branch '{snapshot_branch_name}'..." ) success, output = await self._run_git_command( f"git branch -D {snapshot_branch_name}" ) if success: await ctx.send( f"Successfully deleted snapshot branch '{snapshot_branch_name}'.\nOutput:\n```\n{output}\n```" ) else: await ctx.send( f"Error deleting snapshot branch '{snapshot_branch_name}':\n```\n{output}\n```" ) def _get_conversation_history( self, user_id: int ) -> List[google_genai_types.Content]: if user_id not in self.agent_conversations: self.agent_conversations[user_id] = [] return self.agent_conversations[user_id] def _add_to_conversation_history( self, user_id: int, role: str, text_content: str, is_tool_response: bool = False ): history = self._get_conversation_history(user_id) # For Vertex AI, 'function' role is used for tool responses, 'model' for AI text, 'user' for user text. # We'll adapt this slightly for our inline tools. # AI's raw response (potentially with tool call) -> model # Tool's output -> user (formatted as "ToolResponse: ...") # User's direct prompt -> user # For simplicity in our loop, we might treat tool responses as if they are from the 'user' # to guide the AI's next step, or use a specific format the AI understands. # The system prompt already guides the AI to expect "ToolResponse:" # Let's ensure content is always a list of parts for Vertex parts = [google_genai_types.Part(text=text_content)] history.append(google_genai_types.Content(role=role, parts=parts)) # Keep history to a reasonable length (e.g., last 20 turns, or token-based limit later) max_history_items = 20 if len(history) > max_history_items: self.agent_conversations[user_id] = history[-max_history_items:] async def _parse_and_execute_tool_call( self, ctx: commands.Context, ai_response_text: str ) -> Tuple[str, Optional[str]]: """ Parses AI response for an XML tool call, executes it, and returns the tool's output string. Returns a tuple: (status: str, data: Optional[str]). Status can be "TOOL_OUTPUT", "TASK_COMPLETE", "NO_TOOL". Data is the tool output string, completion message, or original AI text. """ try: clean_ai_response_text = ai_response_text.strip() # Remove potential markdown ```xml ... ``` wrapper if clean_ai_response_text.startswith("```"): # More robustly remove potential ```xml ... ``` or just ``` ... ``` clean_ai_response_text = re.sub( r"^```(?:xml)?\s*\n?", "", clean_ai_response_text, flags=re.MULTILINE, ) clean_ai_response_text = re.sub( r"\n?```$", "", clean_ai_response_text, flags=re.MULTILINE ) clean_ai_response_text = clean_ai_response_text.strip() if ( not clean_ai_response_text or not clean_ai_response_text.startswith("<") or not clean_ai_response_text.endswith(">") ): return "NO_TOOL", ai_response_text root = ET.fromstring(clean_ai_response_text) tool_name = root.tag parameters = {child.tag: child.text for child in root} if tool_name == "ReadFile": file_path = parameters.get("path") if not file_path: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nReadFile: Missing 'path' parameter.", ) tool_output = await self._execute_tool_read_file(file_path) return ( "TOOL_OUTPUT", f"ToolResponse: ReadFile\nPath: {file_path}\n---\n{tool_output}", ) elif tool_name == "WriteFile": file_path = parameters.get("path") content = parameters.get("content") # CDATA content will be in .text if file_path is None or content is None: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nWriteFile: Missing 'path' or 'content' parameter.", ) snapshot_branch = await self._create_programmatic_snapshot() if not snapshot_branch: return ( "TOOL_OUTPUT", "ToolResponse: SystemError\n---\nFailed to create project snapshot. WriteFile operation aborted.", ) else: await ctx.send( f"AICodeAgent: [Info] Created snapshot: {snapshot_branch} before writing to {file_path}" ) tool_output = await self._execute_tool_write_file( file_path, content ) return ( "TOOL_OUTPUT", f"ToolResponse: WriteFile\nPath: {file_path}\n---\n{tool_output}", ) elif tool_name == "ApplyDiff": file_path = parameters.get("path") diff_block = parameters.get("diff_block") # CDATA content if file_path is None or diff_block is None: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nApplyDiff: Missing 'path' or 'diff_block' parameter.", ) snapshot_branch = await self._create_programmatic_snapshot() if not snapshot_branch: return ( "TOOL_OUTPUT", "ToolResponse: SystemError\n---\nFailed to create project snapshot. ApplyDiff operation aborted.", ) else: await ctx.send( f"AICodeAgent: [Info] Created snapshot: {snapshot_branch} before applying diff to {file_path}" ) tool_output = await self._execute_tool_apply_diff( file_path, diff_block ) return ( "TOOL_OUTPUT", f"ToolResponse: ApplyDiff\nPath: {file_path}\n---\n{tool_output}", ) elif tool_name == "ExecuteCommand": command_str = parameters.get("command") if not command_str: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nExecuteCommand: Missing 'command' parameter.", ) user_id = ctx.author.id tool_output = await self._execute_tool_execute_command( command_str, user_id ) return ( "TOOL_OUTPUT", f"ToolResponse: ExecuteCommand\nCommand: {command_str}\n---\n{tool_output}", ) elif tool_name == "ListFiles": file_path = parameters.get("path") recursive_str = parameters.get("recursive") recursive = recursive_str.lower() == "true" if recursive_str else False filter_extensions = parameters.get( "filter_extensions" ) # Optional: comma-separated string filter_regex_name = parameters.get( "filter_regex_name" ) # Optional: regex string include_metadata_str = parameters.get("include_metadata") include_metadata = ( include_metadata_str.lower() == "true" if include_metadata_str else False ) if not file_path: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nListFiles: Missing 'path' parameter.", ) tool_output = await self._execute_tool_list_files( file_path, recursive, filter_extensions=filter_extensions, filter_regex_name=filter_regex_name, include_metadata=include_metadata, ) params_summary = [f"Recursive: {recursive}"] if filter_extensions: params_summary.append(f"Extensions: {filter_extensions}") if filter_regex_name: params_summary.append(f"RegexName: {filter_regex_name}") params_summary.append(f"Metadata: {include_metadata}") response_message = ( f"ToolResponse: ListFiles\nPath: {file_path}\n" + "\n".join(params_summary) ) response_message += f"\n---\n{tool_output}" return "TOOL_OUTPUT", response_message elif tool_name == "WebSearch": query_str = parameters.get("query") if not query_str: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nWebSearch: Missing 'query' parameter.", ) tool_output = await self._execute_tool_web_search(query_str) return ( "TOOL_OUTPUT", f"ToolResponse: WebSearch\nQuery: {query_str}\n---\n{tool_output}", ) elif tool_name == "TaskComplete": message = parameters.get( "message", "Task marked as complete by AI." ) # Default if message tag is missing or empty return "TASK_COMPLETE", ( message if message is not None else "Task marked as complete by AI." ) elif tool_name == "LintFile": file_path = parameters.get("path") linter = parameters.get("linter", "pylint") # Default to pylint if not file_path: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nLintFile: Missing 'path' parameter.", ) tool_output = await self._execute_tool_lint_file(file_path, linter) return ( "TOOL_OUTPUT", f"ToolResponse: LintFile\nPath: {file_path}\nLinter: {linter}\n---\n{tool_output}", ) elif tool_name == "GetCodeStructure": file_path = parameters.get("path") if not file_path: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nGetCodeStructure: Missing 'path' parameter.", ) tool_output = await self._execute_tool_get_code_structure(file_path) return ( "TOOL_OUTPUT", f"ToolResponse: GetCodeStructure\nPath: {file_path}\n---\n{tool_output}", ) elif tool_name == "FindSymbolDefinition": symbol_name = parameters.get("symbol_name") search_path = parameters.get( "search_path", "." ) # Default to current dir (project root) file_pattern = parameters.get( "file_pattern", "*.py" ) # Default to Python files if not symbol_name: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nFindSymbolDefinition: Missing 'symbol_name' parameter.", ) tool_output = await self._execute_tool_find_symbol_definition( symbol_name, search_path, file_pattern ) return ( "TOOL_OUTPUT", f"ToolResponse: FindSymbolDefinition\nSymbol: {symbol_name}\nPath: {search_path}\nPattern: {file_pattern}\n---\n{tool_output}", ) elif tool_name == "ManageCog": action = parameters.get("action") cog_name = parameters.get("cog_name") if not action: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nManageCog: Missing 'action' parameter.", ) if action in ["load", "unload", "reload"] and not cog_name: return ( "TOOL_OUTPUT", f"ToolResponse: Error\n---\nManageCog: Missing 'cog_name' for action '{action}'.", ) tool_output = await self._execute_tool_manage_cog(action, cog_name) return ( "TOOL_OUTPUT", f"ToolResponse: ManageCog\nAction: {action}\nCog: {cog_name or 'N/A'}\n---\n{tool_output}", ) elif tool_name == "RunTests": test_path_or_pattern = parameters.get("test_path_or_pattern") framework = parameters.get("framework", "pytest") if not test_path_or_pattern: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nRunTests: Missing 'test_path_or_pattern' parameter.", ) tool_output = await self._execute_tool_run_tests( test_path_or_pattern, framework ) return ( "TOOL_OUTPUT", f"ToolResponse: RunTests\nTarget: {test_path_or_pattern}\nFramework: {framework}\n---\n{tool_output}", ) elif tool_name == "PythonREPL": code_snippet = parameters.get("code_snippet") session_id_param = parameters.get("session_id") # AI might suggest one user_id = ctx.author.id # Use user_id for a persistent session if AI doesn't specify one, or combine them. # For simplicity, let's use user_id as the primary key for REPL sessions for now. # If AI provides session_id, it could be a sub-context within that user's REPL. # Let's make session_id for the tool map to user_id for now. repl_session_key = str( user_id ) # Or incorporate session_id_param if needed if not code_snippet: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nPythonREPL: Missing 'code_snippet' parameter.", ) tool_output = await self._execute_tool_python_repl( code_snippet, repl_session_key ) return ( "TOOL_OUTPUT", f"ToolResponse: PythonREPL\nSession: {repl_session_key}\n---\n{tool_output}", ) elif tool_name == "CreateNamedSnapshot": snapshot_name = parameters.get("snapshot_name") description = parameters.get("description") # Optional if not snapshot_name: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nCreateNamedSnapshot: Missing 'snapshot_name' parameter.", ) tool_output = await self._execute_tool_create_named_snapshot( snapshot_name, description ) return ( "TOOL_OUTPUT", f"ToolResponse: CreateNamedSnapshot\nName: {snapshot_name}\n---\n{tool_output}", ) elif tool_name == "CompareSnapshots": base_ref = parameters.get("base_ref") compare_ref = parameters.get("compare_ref") if not base_ref or not compare_ref: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nCompareSnapshots: Missing 'base_ref' or 'compare_ref' parameter.", ) tool_output = await self._execute_tool_compare_snapshots( base_ref, compare_ref ) return ( "TOOL_OUTPUT", f"ToolResponse: CompareSnapshots\nBase: {base_ref}\nCompare: {compare_ref}\n---\n{tool_output}", ) elif tool_name == "DryRunApplyDiff": file_path = parameters.get("path") diff_block = parameters.get("diff_block") if not file_path or not diff_block: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nDryRunApplyDiff: Missing 'path' or 'diff_block' parameter.", ) tool_output = await self._execute_tool_dry_run_apply_diff( file_path, diff_block ) return ( "TOOL_OUTPUT", f"ToolResponse: DryRunApplyDiff\nPath: {file_path}\n---\n{tool_output}", ) elif tool_name == "DryRunWriteFile": file_path = parameters.get("path") if not file_path: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nDryRunWriteFile: Missing 'path' parameter.", ) tool_output = await self._execute_tool_dry_run_write_file(file_path) return ( "TOOL_OUTPUT", f"ToolResponse: DryRunWriteFile\nPath: {file_path}\n---\n{tool_output}", ) elif tool_name == "ReadWebPageRaw": url_param = parameters.get("url") if not url_param: return ( "TOOL_OUTPUT", "ToolResponse: Error\n---\nReadWebPageRaw: Missing 'url' parameter.", ) tool_output = await self._execute_tool_read_web_page_raw(url_param) return ( "TOOL_OUTPUT", f"ToolResponse: ReadWebPageRaw\nURL: {url_param}\n---\n{tool_output}", ) else: # Unknown tool name found in XML return ( "TOOL_OUTPUT", f"ToolResponse: Error\n---\nUnknown tool: {tool_name} in XML: {clean_ai_response_text[:200]}", ) except ET.ParseError: # Not valid XML # print(f"AICodeAgentCog: XML ParseError for response: {ai_response_text[:200]}") # Debugging return "NO_TOOL", ai_response_text except ( Exception ) as e: # Catch any other unexpected errors during parsing/dispatch print( f"AICodeAgentCog: Unexpected error in _parse_and_execute_tool_call: {type(e).__name__} - {e} for response {ai_response_text[:200]}" ) # import traceback # traceback.print_exc() # For more detailed debugging if needed return ( "TOOL_OUTPUT", f"ToolResponse: SystemError\n---\nError processing tool call: {type(e).__name__} - {e}", ) # --- Tool Execution Methods --- # (Implementations for _execute_tool_... methods remain the same) # This comment might be outdated after this change async def _execute_tool_read_file( self, path: str, start_line: Optional[int] = None, end_line: Optional[int] = None, peek_first_n_lines: Optional[int] = None, peek_last_n_lines: Optional[int] = None, ) -> str: print( f"AICodeAgentCog: _execute_tool_read_file for path: {path}, start: {start_line}, end: {end_line}, peek_first: {peek_first_n_lines}, peek_last: {peek_last_n_lines}" ) try: if not os.path.exists(path): return f"Error: File not found at '{path}'" if os.path.isdir(path): return f"Error: Path '{path}' is a directory, not a file." # Determine the operation based on parameters # Priority: peek_first > peek_last > start_line/end_line > full_read if peek_first_n_lines is not None and peek_first_n_lines > 0: with open(path, "r", encoding="utf-8", errors="replace") as f: lines = [] for i, line in enumerate(f): if i >= peek_first_n_lines: break lines.append(line) return ( "".join(lines) if lines else "File is empty or shorter than peek_first_n_lines." ) elif peek_last_n_lines is not None and peek_last_n_lines > 0: # This is inefficient for large files, but a simple approach for now. # A more efficient way would be to seek from the end and read backwards, # or use a deque with a maxlen. with open(path, "r", encoding="utf-8", errors="replace") as f: lines = f.readlines() # Reads all lines into memory return ( "".join(lines[-peek_last_n_lines:]) if lines else "File is empty." ) elif start_line is not None and start_line > 0: # start_line is 1-based with open(path, "r", encoding="utf-8", errors="replace") as f: lines = f.readlines() # Reads all lines start_idx = start_line - 1 # Convert to 0-based index if start_idx >= len(lines): return f"Error: start_line ({start_line}) is beyond the end of the file ({len(lines)} lines)." if end_line is not None and end_line > 0: # end_line is inclusive, so slice up to end_line (0-based end_idx = end_line) end_idx = end_line if end_idx < start_idx: return f"Error: end_line ({end_line}) cannot be before start_line ({start_line})." return "".join(lines[start_idx : min(end_idx, len(lines))]) else: # Read from start_line to the end of the file return "".join(lines[start_idx:]) else: # Default: read whole file with open(path, "r", encoding="utf-8", errors="replace") as f: content = f.read() return content except Exception as e: return f"Error reading file '{path}': {type(e).__name__} - {e}" async def _execute_tool_write_file(self, path: str, content: str) -> str: print(f"AICodeAgentCog: _execute_tool_write_file for path: {path}") # Actual implementation: try: # base_dir = os.path.abspath(".") # requested_path = os.path.abspath(os.path.join(base_dir, path)) # if not requested_path.startswith(base_dir): # return "Error: File path is outside the allowed project directory." os.makedirs( os.path.dirname(path) or ".", exist_ok=True ) # Ensure directory exists with open(path, "w", encoding="utf-8") as f: f.write(content) return f"Successfully wrote to file '{path}'." except Exception as e: return f"Error writing to file '{path}': {type(e).__name__} - {e}" async def _execute_tool_apply_diff(self, path: str, diff_block: str) -> str: print( f"AICodeAgentCog: Attempting _execute_tool_apply_diff (Search/Replace) for path: {path}" ) if not os.path.exists(path): return f"Error: File not found at '{path}' for applying diff." if os.path.isdir(path): return f"Error: Path '{path}' is a directory, cannot apply diff." try: with open(path, "r", encoding="utf-8") as f: file_lines = f.readlines() # Read all lines, keeping newlines except Exception as e: return f"Error reading file '{path}': {type(e).__name__} - {e}" try: # Pattern for a single search/replace operation block # Ensure :start_line: is captured, and search/replace content is captured non-greedily. # Content between ------- and ======= is search_content. # Content between ======= and >>>>>>> REPLACE is replace_content. operation_pattern = re.compile( r"^\s*\[SEARCH_BLOCK_START\]\s*\n" # Changed to match AI prompt r":start_line:(\d+)\s*\n" r"-------\s*\n" r"(.*?)" # search_content (group 2) r"\\=======\s*\n" # Changed to match AI prompt (\=======) r"(.*?)" # replace_content (group 3) r"\[REPLACE_BLOCK_END\]\s*$", # Changed to match AI prompt re.DOTALL | re.MULTILINE, ) operations = [] last_match_end = 0 for match in operation_pattern.finditer(diff_block): if match.start() < last_match_end: # Should not happen with finditer continue # Check for content between operations that is not part of a valid operation if diff_block[last_match_end : match.start()].strip(): return f"Error: Malformed diff_block. Unexpected content between operations near character {last_match_end}." start_line = int(match.group(1)) # Split search/replace content carefully to respect original newlines # The (.*?) captures content up to the next \n======= or \n>>>>>>> # We need to remove the structural newlines from the capture groups if they are included by DOTALL # Use regex capture groups directly for search and replace content search_c = match.group(2) replace_c = match.group(3) operations.append( { "start_line": start_line, "search": search_c, "replace": replace_c, "original_block_for_error": match.group(0)[ :200 ], # For error reporting } ) last_match_end = match.end() if not operations: if ( diff_block.strip() ): # If diff_block had content but no operations parsed return "Error: diff_block provided but no valid SEARCH/REPLACE operations found." else: # Empty diff_block return "Error: Empty diff_block provided." # Check if there's any trailing malformed content after the last valid operation if diff_block[last_match_end:].strip(): return f"Error: Malformed diff_block. Unexpected trailing content after last operation, near character {last_match_end}." # Sort operations by start_line in descending order to apply changes from bottom-up # This helps manage line number shifts correctly if replacements change the number of lines. operations.sort(key=lambda op: op["start_line"], reverse=True) applied_count = 0 for op in operations: start_line_0_indexed = op["start_line"] - 1 # Ensure search content exactly matches, including its own newlines search_content_lines = op["search"].splitlines( True ) # Keep newlines for comparison if ( not search_content_lines and op["search"] ): # Search content is not empty but has no newlines (e.g. "foo") search_content_lines = [op["search"]] elif not op["search"]: # Empty search string search_content_lines = [""] num_search_lines = len(search_content_lines) if ( start_line_0_indexed < 0 or ( start_line_0_indexed + num_search_lines > len(file_lines) and num_search_lines > 0 ) or ( start_line_0_indexed > len(file_lines) and num_search_lines == 0 and op["search"] == "" ) ): # check for empty search at end of file # Special case: if search is empty and start_line is one past the end, it's an append if not ( op["search"] == "" and start_line_0_indexed == len(file_lines) ): return f"Error: Operation for line {op['start_line']} (0-indexed {start_line_0_indexed}) with {num_search_lines} search lines is out of file bounds (total lines: {len(file_lines)}). Block: {op['original_block_for_error']}..." actual_file_segment_lines = file_lines[ start_line_0_indexed : start_line_0_indexed + num_search_lines ] actual_file_segment_content = "".join(actual_file_segment_lines) # Exact match, including newlines. if actual_file_segment_content == op["search"]: replace_content_lines = op["replace"].splitlines(True) if ( not replace_content_lines and op["replace"] ): # Replace content is not empty but has no newlines replace_content_lines = [op["replace"]] file_lines[ start_line_0_indexed : start_line_0_indexed + num_search_lines ] = replace_content_lines applied_count += 1 else: # For better error reporting: expected_repr = repr(op["search"]) found_repr = repr(actual_file_segment_content) max_len = 100 if len(expected_repr) > max_len: expected_repr = expected_repr[:max_len] + "..." if len(found_repr) > max_len: found_repr = found_repr[:max_len] + "..." return ( f"Error: Search content mismatch at line {op['start_line']}.\n" f"Expected: {expected_repr}\n" f"Found : {found_repr}\n" f"Original Block Hint: {op['original_block_for_error']}..." ) if applied_count == len(operations): try: with open(path, "w", encoding="utf-8") as f: f.writelines(file_lines) return f"Successfully applied {applied_count} SEARCH/REPLACE operation(s) to '{path}'." except Exception as e: return f"Error writing changes to file '{path}': {type(e).__name__} - {e}" else: # This case should ideally be caught by the mismatch error above. return f"Error: Not all operations could be applied. Applied {applied_count} out of {len(operations)}." except Exception as e: # General error during parsing or application logic return f"Error applying SEARCH/REPLACE diff to '{path}': {type(e).__name__} - {e}" async def _execute_tool_execute_command(self, command: str, user_id: int) -> str: session = self.agent_shell_sessions[user_id] cwd = session["cwd"] env = session["env"] print( f"AICodeAgentCog: Attempting _execute_tool_execute_command for user_id {user_id}: '{command}' in CWD: '{cwd}'" ) # Mirroring shell_command_cog.py's command allowance check allowed, reason = is_command_allowed_agent(command) if not allowed: return f"⛔ Command not allowed: {reason}" # Mirroring shell_command_cog.py's settings timeout_seconds = 30.0 max_output_length = 1900 def run_agent_subprocess_sync( cmd_str, current_cwd, current_env, cmd_timeout_secs ): try: proc = subprocess.Popen( cmd_str, shell=True, cwd=current_cwd, env=current_env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) try: stdout, stderr = proc.communicate(timeout=cmd_timeout_secs) return ( stdout, stderr, proc.returncode, False, ) # stdout, stderr, rc, timed_out except subprocess.TimeoutExpired: proc.kill() # Communicate again to fetch any output after kill stdout, stderr = proc.communicate() return ( stdout, stderr, -1, True, ) # Using -1 for timeout rc, as in shell_command_cog except Exception as e: # Capture other exceptions during Popen or initial communicate return ( b"", str(e).encode("utf-8", errors="replace"), -2, False, ) # -2 for other errors try: # Execute the synchronous subprocess logic in a separate thread stdout_bytes, stderr_bytes, returncode, timed_out = await asyncio.to_thread( run_agent_subprocess_sync, command, cwd, env, timeout_seconds ) # Update session working directory if 'cd' command was used and it was successful # This logic is from the previous iteration and is similar to shell_command_cog's attempt if command.strip().startswith("cd ") and returncode == 0: new_dir_arg_str = command.strip()[len("cd ") :].strip() potential_new_cwd = None # Handle 'cd' with no arguments (e.g. 'cd' or 'cd ~') - typically goes to home if ( not new_dir_arg_str or new_dir_arg_str == "~" or new_dir_arg_str == "$HOME" ): potential_new_cwd = os.path.expanduser("~") elif new_dir_arg_str == "-": # 'cd -' (previous directory) is hard to track reliably without more state, # so we won't update cwd for it, similar to shell_command_cog's limitations. print( f"AICodeAgentCog: 'cd -' used by user_id {user_id}. CWD tracking will not update for this command." ) else: # For 'cd ' temp_arg = new_dir_arg_str # Remove quotes if present if (temp_arg.startswith('"') and temp_arg.endswith('"')) or ( temp_arg.startswith("'") and temp_arg.endswith("'") ): temp_arg = temp_arg[1:-1] if os.path.isabs(temp_arg): potential_new_cwd = temp_arg else: potential_new_cwd = os.path.abspath(os.path.join(cwd, temp_arg)) if potential_new_cwd and os.path.isdir(potential_new_cwd): session["cwd"] = potential_new_cwd print( f"AICodeAgentCog: Updated CWD for user_id {user_id} to: {session['cwd']}" ) elif new_dir_arg_str and new_dir_arg_str != "-" and potential_new_cwd: print( f"AICodeAgentCog: 'cd' command for user_id {user_id} seemed to succeed (rc=0), but CWD tracking logic could not confirm new path '{potential_new_cwd}' or it's not a directory. CWD remains '{session['cwd']}'. Command: '{command}'." ) elif ( new_dir_arg_str and new_dir_arg_str != "-" ): # if potential_new_cwd was None but arg was given print( f"AICodeAgentCog: 'cd' command for user_id {user_id} with arg '{new_dir_arg_str}' succeeded (rc=0), but path resolution for CWD tracking failed. CWD remains '{session['cwd']}'." ) # Format Output identically to shell_command_cog.py's _execute_local_command result_parts = [] stdout_str = stdout_bytes.decode("utf-8", errors="replace").strip() stderr_str = stderr_bytes.decode("utf-8", errors="replace").strip() if timed_out: result_parts.append( f"⏱️ Command timed out after {timeout_seconds} seconds." ) if stdout_str: if len(stdout_str) > max_output_length: stdout_str = ( stdout_str[:max_output_length] + "... (output truncated)" ) result_parts.append(f"📤 **STDOUT:**\n```\n{stdout_str}\n```") if stderr_str: if len(stderr_str) > max_output_length: stderr_str = ( stderr_str[:max_output_length] + "... (output truncated)" ) result_parts.append(f"⚠️ **STDERR:**\n```\n{stderr_str}\n```") if ( returncode != 0 and not timed_out ): # Don't add exit code if it was a timeout result_parts.append(f"❌ **Exit Code:** {returncode}") else: # Successful or timed out (timeout message already added) if ( not result_parts ): # No stdout, no stderr, not timed out, and successful result_parts.append("✅ Command executed successfully (no output).") return "\n".join(result_parts) except Exception as e: # General exception during subprocess handling return f"Exception executing command '{command}': {type(e).__name__} - {e}" async def _execute_tool_list_files( self, path: str, recursive: bool, filter_extensions: Optional[str] = None, filter_regex_name: Optional[str] = None, include_metadata: bool = False, ) -> str: print( f"AICodeAgentCog: _execute_tool_list_files for path: {path}, recursive: {recursive}, ext: {filter_extensions}, regex: {filter_regex_name}, meta: {include_metadata}" ) # TODO: Implement filtering (filter_extensions, filter_regex_name) and metadata (include_metadata) try: if not os.path.exists(path): return f"Error: Path not found at '{path}'" if not os.path.isdir(path): return f"Error: Path '{path}' is not a directory." file_list_results = [] excluded_dirs = { "__pycache__", ".git", ".vscode", ".idea", "node_modules", "venv", ".env", "terminal_images", } extensions_to_filter = [] if filter_extensions: extensions_to_filter = [ ext.strip().lower() for ext in filter_extensions.split(",") if ext.strip() ] name_regex_pattern = None if filter_regex_name: try: name_regex_pattern = re.compile(filter_regex_name) except re.error as e: return f"Error: Invalid regex for name filtering: {e}" items_processed = 0 max_items_to_list = 500 # Safety break if recursive: for root, dirs, files in os.walk(path, topdown=True): if items_processed > max_items_to_list: break # Exclude specified directories from further traversal dirs[:] = [ d for d in dirs if d not in excluded_dirs and (not name_regex_pattern or name_regex_pattern.search(d)) ] for name in files: if items_processed > max_items_to_list: break if name_regex_pattern and not name_regex_pattern.search(name): continue if extensions_to_filter and not any( name.lower().endswith(ext) for ext in extensions_to_filter ): continue full_path = os.path.join(root, name) entry = full_path if include_metadata: try: stat = os.stat(full_path) entry += f" (Size: {stat.st_size} B, Modified: {datetime.datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')})" except OSError: entry += " (Metadata N/A)" file_list_results.append(entry) items_processed += 1 for ( name ) in ( dirs ): # These are already filtered and regex matched (if regex provided for dirs) if items_processed > max_items_to_list: break # No extension filter for dirs, regex already applied full_path = os.path.join(root, name) entry = full_path + os.sep if include_metadata: try: stat = os.stat(full_path) entry += f" (Modified: {datetime.datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')})" except OSError: entry += " (Metadata N/A)" file_list_results.append(entry) items_processed += 1 else: # Non-recursive case for item in os.listdir(path): if items_processed > max_items_to_list: break if item in excluded_dirs: continue if name_regex_pattern and not name_regex_pattern.search(item): continue full_item_path = os.path.join(path, item) is_dir = os.path.isdir(full_item_path) if ( not is_dir and extensions_to_filter and not any( item.lower().endswith(ext) for ext in extensions_to_filter ) ): continue entry = item + (os.sep if is_dir else "") if include_metadata: try: stat = os.stat(full_item_path) if is_dir: entry += f" (Modified: {datetime.datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')})" else: entry += f" (Size: {stat.st_size} B, Modified: {datetime.datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')})" except OSError: entry += " (Metadata N/A)" file_list_results.append(entry) items_processed += 1 if items_processed > max_items_to_list: file_list_results.append( f"... (truncated, listed {max_items_to_list} items)" ) return ( "\n".join(file_list_results) if file_list_results else "No files or directories found matching criteria." ) except Exception as e: return f"Error listing files at '{path}': {type(e).__name__} - {e}" async def _execute_tool_web_search(self, query: str) -> str: print( f"AICodeAgentCog: _execute_tool_web_search for query: {query}" ) # Removed "Placeholder" if not self.tavily_client: return "Error: Tavily client not initialized. Cannot perform web search." try: # Using basic parameters for now, can be expanded response = await asyncio.to_thread( self.tavily_client.search, query=query, search_depth=self.tavily_search_depth, # "basic" or "advanced" max_results=self.tavily_max_results, include_answer=True, # Try to get a direct answer ) results_str_parts = [] if response.get("answer"): results_str_parts.append(f"Answer: {response['answer']}") if response.get("results"): for i, res in enumerate( response["results"][: self.tavily_max_results] ): # Show up to max_results results_str_parts.append( f"\nResult {i+1}: {res.get('title', 'N/A')}\nURL: {res.get('url', 'N/A')}\nSnippet: {res.get('content', 'N/A')[:250]}..." ) # Truncate snippet return ( "\n".join(results_str_parts) if results_str_parts else "No search results found." ) except Exception as e: return f"Error during Tavily web search for '{query}': {type(e).__name__} - {e}" # --- Placeholder New Tool Execution Methods --- async def _execute_tool_lint_file(self, path: str, linter: str) -> str: if not os.path.exists(path): return f"Error: File not found at '{path}' for linting." if not os.path.isfile(path): return f"Error: Path '{path}' is not a file." linter_cmd = [] if linter.lower() == "pylint": linter_cmd = ["pylint", path] elif linter.lower() == "flake8": linter_cmd = ["flake8", path] else: return f"Error: Unsupported linter '{linter}'. Supported linters: pylint, flake8." try: process = await asyncio.create_subprocess_exec( *linter_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout, stderr = await process.communicate() output_str = "" if stdout: output_str += ( f"Linter ({linter}) STDOUT:\n{stdout.decode(errors='replace')}\n" ) if stderr: # Linters often output to stderr for warnings/errors output_str += ( f"Linter ({linter}) STDERR:\n{stderr.decode(errors='replace')}\n" ) if not output_str and process.returncode == 0: output_str = f"Linter ({linter}) found no issues." elif not output_str and process.returncode != 0: output_str = f"Linter ({linter}) exited with code {process.returncode} but no output." return output_str except FileNotFoundError: return f"Error: Linter command '{linter_cmd[0]}' not found. Please ensure it is installed and in PATH." except Exception as e: return ( f"Error running linter '{linter}' on '{path}': {type(e).__name__} - {e}" ) async def _execute_tool_get_code_structure(self, path: str) -> str: # Basic AST parsing example try: if not path.endswith(".py"): return "Error: GetCodeStructure currently only supports Python files." with open(path, "r", encoding="utf-8") as source_file: source_code = source_file.read() tree = ast.parse(source_code) structure = [] for node in ast.walk(tree): if isinstance(node, ast.FunctionDef): args = [arg.arg for arg in node.args.args] structure.append( f"Function: {node.name}({', '.join(args)}) - Docstring: {ast.get_docstring(node) or 'N/A'}" ) elif isinstance(node, ast.AsyncFunctionDef): args = [arg.arg for arg in node.args.args] structure.append( f"Async Function: {node.name}({', '.join(args)}) - Docstring: {ast.get_docstring(node) or 'N/A'}" ) elif isinstance(node, ast.ClassDef): structure.append( f"Class: {node.name} - Docstring: {ast.get_docstring(node) or 'N/A'}" ) return ( "\n".join(structure) if structure else "No major structures (classes/functions) found." ) except Exception as e: return ( f"Error parsing code structure for '{path}': {type(e).__name__} - {e}" ) async def _execute_tool_find_symbol_definition( self, symbol_name: str, search_path: str, file_pattern: str ) -> str: if not os.path.exists(search_path): return f"Error: Search path '{search_path}' not found." if not os.path.isdir(search_path): return f"Error: Search path '{search_path}' is not a directory." # Using findstr for Windows. It's less flexible with patterns than grep. # findstr /S /N /P /C:"search string" files_to_search # /S: searches subdirectories # /N: prints line numbers # /P: skips files with non-printable characters # /C:"string": uses string as a literal search string # files_to_search can include wildcards, e.g., *.py # Construct the files_to_search argument. # If file_pattern is like "*.py", it can be directly appended. # os.path.join will correctly handle path separators. files_to_search_arg = os.path.join(search_path, file_pattern) # Escape the symbol name for command line if it contains special characters, though /C should treat it literally. # For simplicity, we assume symbol_name doesn't need complex shell escaping here. find_cmd = [ "findstr", "/S", "/N", "/P", f"/C:{symbol_name}", files_to_search_arg, ] try: process = await asyncio.create_subprocess_exec( *find_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=os.getcwd(), # Run from bot's root, search_path should be relative or absolute ) stdout, stderr = await process.communicate(timeout=30) # 30-second timeout output_str = "" if stdout: output_str += f"Definitions found for '{symbol_name}' (using findstr):\n{stdout.decode(errors='replace')}\n" if stderr: output_str += f"Findstr STDERR:\n{stderr.decode(errors='replace')}\n" if ( not output_str and process.returncode == 0 ): # findstr returns 0 if found, 1 if not found, 2 for error output_str = f"No definitions found for '{symbol_name}' in '{search_path}/{file_pattern}' (findstr found nothing but exited cleanly)." elif not output_str and process.returncode == 1: # Explicit "not found" output_str = f"No definitions found for '{symbol_name}' in '{search_path}/{file_pattern}'." elif process.returncode not in [0, 1]: # Other errors output_str += f"Findstr exited with code {process.returncode}." return output_str except FileNotFoundError: return "Error: Command 'findstr' not found. This tool currently relies on findstr (Windows)." except subprocess.TimeoutExpired: return f"Error: FindSymbolDefinition command timed out after 30 seconds for symbol '{symbol_name}'." except Exception as e: return f"Error running FindSymbolDefinition for '{symbol_name}': {type(e).__name__} - {e}" async def _execute_tool_manage_cog( self, action: str, cog_name: Optional[str] ) -> str: action = action.lower() try: if action == "list": loaded_cogs = list(self.bot.cogs.keys()) return ( f"Loaded cogs: {', '.join(loaded_cogs)}" if loaded_cogs else "No cogs currently loaded." ) if not cog_name: # Should be caught by parser, but defensive return ( "Error: cog_name is required for load, unload, or reload actions." ) if action == "load": await self.bot.load_extension(cog_name) return f"Successfully loaded cog: {cog_name}" elif action == "unload": await self.bot.unload_extension(cog_name) return f"Successfully unloaded cog: {cog_name}" elif action == "reload": await self.bot.reload_extension(cog_name) return f"Successfully reloaded cog: {cog_name}" else: return f"Error: Unknown action '{action}' for ManageCog." except commands.ExtensionNotFound: return f"Error: Cog '{cog_name}' not found." except commands.ExtensionAlreadyLoaded: return f"Error: Cog '{cog_name}' is already loaded." except commands.ExtensionNotLoaded: return f"Error: Cog '{cog_name}' is not loaded." except commands.NoEntryPointError: return f"Error: Cog '{cog_name}' does not have a setup function." except Exception as e: return f"Error during ManageCog action '{action}' on '{cog_name}': {type(e).__name__} - {e}" async def _execute_tool_run_tests( self, test_path_or_pattern: str, framework: str ) -> str: framework = framework.lower() test_cmd = [] if framework == "pytest": test_cmd = ["pytest", test_path_or_pattern] elif framework == "unittest": # Basic unittest invocation. Might need more complex discovery for patterns. # python -m unittest test_module.TestClass.test_method # python -m unittest discover -s project_directory -p 'test_*.py' # For simplicity, assume test_path_or_pattern is directly usable by `python -m unittest` test_cmd = ["python", "-m", "unittest", test_path_or_pattern] else: return f"Error: Unsupported test framework '{framework}'. Supported: pytest, unittest." try: process = await asyncio.create_subprocess_exec( *test_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=os.getcwd(), # Run tests from the project root ) stdout, stderr = await process.communicate( timeout=300 ) # 5-minute timeout for tests output_str = "" if stdout: output_str += ( f"Test ({framework}) STDOUT:\n{stdout.decode(errors='replace')}\n" ) if stderr: output_str += ( f"Test ({framework}) STDERR:\n{stderr.decode(errors='replace')}\n" ) if not output_str: output_str = f"Test ({framework}) command executed with no output. Exit code: {process.returncode}" else: output_str += ( f"\nTest ({framework}) command exit code: {process.returncode}" ) return output_str except FileNotFoundError: cmd_not_found = test_cmd[0] if framework == "unittest" and cmd_not_found == "python": cmd_not_found = "python interpreter" return f"Error: Test command '{cmd_not_found}' not found. Please ensure it is installed and in PATH." except subprocess.TimeoutExpired: return f"Error: Tests timed out after 300 seconds for target '{test_path_or_pattern}'." except Exception as e: return f"Error running tests for '{test_path_or_pattern}' with {framework}: {type(e).__name__} - {e}" async def _execute_tool_python_repl( self, code_snippet: str, session_key: str ) -> str: # Basic, insecure exec-based REPL. CAUTION ADVISED. # A proper implementation would use a sandboxed environment. if session_key not in self.agent_python_repl_sessions: self.agent_python_repl_sessions[session_key] = { "globals": globals().copy(), "locals": {}, } session_env = self.agent_python_repl_sessions[session_key] # Capture stdout for the REPL import io from contextlib import redirect_stdout f = io.StringIO() try: with redirect_stdout(f): exec(code_snippet, session_env["globals"], session_env["locals"]) output = f.getvalue() return ( f"Output:\n{output}" if output else "Executed successfully with no direct output." ) except Exception as e: return f"Error in PythonREPL: {type(e).__name__} - {e}" finally: f.close() async def _execute_tool_create_named_snapshot( self, snapshot_name: str, description: Optional[str] ) -> str: # Similar to _create_programmatic_snapshot but uses the given name and doesn't switch back. # It creates a branch and commits. try: # Sanitize snapshot_name (Git branch names have restrictions) # A simple sanitization: replace spaces and invalid chars with underscores safe_snapshot_name = re.sub(r"[^\w.-]", "_", snapshot_name) if not safe_snapshot_name: return "Error: Invalid snapshot name after sanitization (empty)." # Check if branch already exists success_check, existing_branches_str = await self._run_git_command( f"git branch --list {safe_snapshot_name}" ) if success_check and safe_snapshot_name in existing_branches_str: return f"Error: Snapshot branch '{safe_snapshot_name}' already exists." # Create the new snapshot branch from current HEAD success, output = await self._run_git_command( f"git branch {safe_snapshot_name}" ) if not success: return f"Error: Failed to create snapshot branch '{safe_snapshot_name}': {output}" # Commit on this new branch (can be an empty commit if no changes, or commit current state) # For simplicity, let's make it an empty commit with the description. # A more advanced version might commit staged changes onto this branch. commit_message = f"AI Named Snapshot: {snapshot_name}" if description: commit_message += f"\n\n{description}" # To commit on the new branch, we'd typically checkout to it first, commit, then checkout back. # Or, create commit on new_branch_name using 'git commit-tree' and then 'git update-ref'. # Simpler: checkout, commit, checkout back (if desired, or leave on new branch). # The prompt implies it's separate from automatic snapshots, so maybe it stays on this branch or user manages. # Let's assume for now it just creates the branch and a commit on it, leaving current branch as is. # This requires creating a commit object pointing to current HEAD and then updating the branch ref. # Alternative: create branch, then switch to it, commit, switch back. current_branch_success, current_branch_name = await self._run_git_command( "git rev-parse --abbrev-ref HEAD" ) if not current_branch_success: await self._run_git_command( f"git branch -D {safe_snapshot_name}" ) # cleanup return f"Error: Could not get current branch name before creating named snapshot: {current_branch_name}" success, output = await self._run_git_command( f"git checkout {safe_snapshot_name}" ) if not success: await self._run_git_command( f"git branch -D {safe_snapshot_name}" ) # cleanup return f"Error: Failed to checkout to new snapshot branch '{safe_snapshot_name}': {output}" success, output = await self._run_git_command( f'git commit --author="{COMMIT_AUTHOR}" -m "{commit_message}" --allow-empty' ) if not success: # Attempt to switch back before reporting error await self._run_git_command( f"git checkout {current_branch_name.strip()}" ) # Optionally delete the branch if commit failed # await self._run_git_command(f"git branch -D {safe_snapshot_name}") return f"Error: Failed to commit on snapshot branch '{safe_snapshot_name}': {output}" # Switch back to original branch success_back, output_back = await self._run_git_command( f"git checkout {current_branch_name.strip()}" ) if not success_back: # This is problematic, user might be left on snapshot branch return f"Successfully created and committed snapshot '{safe_snapshot_name}', but FAILED to switch back to original branch '{current_branch_name.strip()}'. Current branch is now '{safe_snapshot_name}'. Details: {output_back}" return f"Successfully created named snapshot: {safe_snapshot_name}" except Exception as e: return f"Error creating named snapshot '{snapshot_name}': {type(e).__name__} - {e}" async def _execute_tool_compare_snapshots( self, base_ref: str, compare_ref: str ) -> str: success, output = await self._run_git_command( f"git diff {base_ref}..{compare_ref}" ) if success: return f"Diff between '{base_ref}' and '{compare_ref}':\n```diff\n{output or 'No differences found.'}\n```" else: return ( f"Error comparing snapshots '{base_ref}' and '{compare_ref}': {output}" ) async def _execute_tool_dry_run_apply_diff(self, path: str, diff_block: str) -> str: print( f"AICodeAgentCog: Attempting _execute_tool_dry_run_apply_diff (Search/Replace) for path: {path}" ) if not os.path.exists(path): return f"Error: File not found at '{path}' for dry-run applying diff." if os.path.isdir(path): return f"Error: Path '{path}' is a directory, cannot dry-run apply diff." try: with open(path, "r", encoding="utf-8") as f: file_lines = f.readlines() # Read all lines, keeping newlines except Exception as e: return f"Error reading file '{path}' for dry-run: {type(e).__name__} - {e}" try: # Pattern for a single search/replace operation block (same as apply_diff) operation_pattern = re.compile( r"^\s*\[SEARCH_BLOCK_START\]\s*\n" # Changed to match AI prompt r":start_line:(\d+)\s*\n" r"-------\s*\n" r"(.*?)" # search_content (group 2) r"\\=======\s*\n" # Changed to match AI prompt (\=======) r"(.*?)" # replace_content (group 3) r"\[REPLACE_BLOCK_END\]\s*$", # Changed to match AI prompt re.DOTALL | re.MULTILINE, ) operations = [] last_match_end = 0 for match in operation_pattern.finditer(diff_block): if match.start() < last_match_end: continue if diff_block[last_match_end : match.start()].strip(): return f"Dry run Error: Malformed diff_block. Unexpected content between operations near character {last_match_end}." start_line = int(match.group(1)) # Use regex capture groups directly for search and replace content search_c = match.group(2) replace_c = match.group(3) operations.append( { "start_line": start_line, "search": search_c, "replace": replace_c, "original_block_for_error": match.group(0)[:200], } ) last_match_end = match.end() if not operations: if diff_block.strip(): return "Dry run Error: diff_block provided but no valid SEARCH/REPLACE operations found." else: return "Dry run Error: Empty diff_block provided." if diff_block[last_match_end:].strip(): return f"Dry run Error: Malformed diff_block. Unexpected trailing content after last operation, near character {last_match_end}." # No sorting needed for dry run as we are not modifying the list in place during iteration. # We just check each operation independently against the original file content. checked_ops_count = 0 for op_idx, op in enumerate(operations): start_line_0_indexed = op["start_line"] - 1 search_content_lines = op["search"].splitlines(True) if not search_content_lines and op["search"]: search_content_lines = [op["search"]] elif not op["search"]: search_content_lines = [""] num_search_lines = len(search_content_lines) if ( start_line_0_indexed < 0 or ( start_line_0_indexed + num_search_lines > len(file_lines) and num_search_lines > 0 ) or ( start_line_0_indexed > len(file_lines) and num_search_lines == 0 and op["search"] == "" ) ): if not ( op["search"] == "" and start_line_0_indexed == len(file_lines) ): # append case return ( f"Dry run Error: Operation {op_idx+1} for line {op['start_line']} " f"(0-indexed {start_line_0_indexed}) with {num_search_lines} search lines " f"is out of file bounds (total lines: {len(file_lines)}). " f"Block: {op['original_block_for_error']}..." ) actual_file_segment_lines = file_lines[ start_line_0_indexed : start_line_0_indexed + num_search_lines ] actual_file_segment_content = "".join(actual_file_segment_lines) if actual_file_segment_content == op["search"]: checked_ops_count += 1 else: expected_repr = repr(op["search"]) found_repr = repr(actual_file_segment_content) max_len = 100 if len(expected_repr) > max_len: expected_repr = expected_repr[:max_len] + "..." if len(found_repr) > max_len: found_repr = found_repr[:max_len] + "..." return ( f"Dry run Error: Search content mismatch for operation {op_idx+1} at line {op['start_line']}.\n" f"Expected: {expected_repr}\n" f"Found : {found_repr}\n" f"Original Block Hint: {op['original_block_for_error']}..." ) if checked_ops_count == len(operations): return f"Dry run: All {len(operations)} SEARCH/REPLACE operation(s) would apply cleanly to '{path}'." else: # This state should ideally not be reached if mismatches return early. return f"Dry run Error: Not all operations would apply cleanly. Checked {checked_ops_count} of {len(operations)}." except Exception as e: return f"Error during DryRunApplyDiff (Search/Replace) for '{path}': {type(e).__name__} - {e}" async def _execute_tool_dry_run_write_file(self, path: str) -> str: try: p = pathlib.Path(path) # Check if parent directory exists and is writable parent_dir = p.parent if not parent_dir.exists(): # Check if we can create the parent directory try: # Attempt to create a dummy temp dir to check if parent is creatable # This is a bit complex; simpler check: can we write to grandparent? # For now, just report if parent doesn't exist. return f"Dry run: Parent directory '{parent_dir}' does not exist. Write would likely create it if permissions allow." except Exception: # Broad exception for permission issues with parent pass # Fall through to os.access checks if p.exists(): # File exists if os.access(path, os.W_OK): return f"Dry run: File '{path}' exists and is writable." else: return f"Dry run: File '{path}' exists but is NOT writable (permission error)." else: # File does not exist, check if directory is writable if os.access(parent_dir, os.W_OK): return f"Dry run: File '{path}' does not exist, but directory '{parent_dir}' is writable. File can likely be created." else: return f"Dry run: File '{path}' does not exist, and directory '{parent_dir}' is NOT writable (permission error)." except Exception as e: return f"Error during DryRunWriteFile check for '{path}': {type(e).__name__} - {e}" async def _execute_tool_read_web_page_raw(self, url: str) -> str: print(f"AICodeAgentCog: _execute_tool_read_web_page_raw for URL: {url}") if not url.startswith(("http://", "https://")): return "Error: Invalid URL. Must start with http:// or https://" try: async with aiohttp.ClientSession() as session: # Set a timeout for the request timeout = aiohttp.ClientTimeout(total=30) # 30 seconds total timeout async with session.get(url, timeout=timeout) as response: if response.status == 200: # Limit the size of the content to prevent memory issues # Max 1MB for raw content, can be adjusted max_content_size = 1 * 1024 * 1024 content_length = response.headers.get("Content-Length") if content_length and int(content_length) > max_content_size: return f"Error: Content at URL is too large (>{max_content_size / (1024*1024):.0f}MB). Size: {content_length} bytes." # Read content chunk by chunk to enforce max_content_size if Content-Length is missing/unreliable content = b"" async for chunk in response.content.iter_chunked( 1024 ): # Read 1KB chunks content += chunk if len(content) > max_content_size: return f"Error: Content at URL is too large (exceeded {max_content_size / (1024*1024):.0f}MB during download)." # Try to decode as UTF-8, replace errors return content.decode("utf-8", errors="replace") else: # Try to read a snippet of the error response body error_body_snippet = "" try: error_body_snippet = await response.text() error_body_snippet = error_body_snippet[ :200 ] # Limit snippet length except Exception: error_body_snippet = "(Could not read error response body)" return f"Error: Failed to fetch URL. Status code: {response.status}. Response snippet: {error_body_snippet}" except asyncio.TimeoutError: return f"Error: Request to URL '{url}' timed out after 30 seconds." except aiohttp.ClientError as e: return f"Error: aiohttp client error while fetching URL '{url}': {type(e).__name__} - {e}" except Exception as e: return f"Error fetching content from URL '{url}': {type(e).__name__} - {e}" # --- End of New Tool Execution Methods --- async def _process_agent_interaction( self, ctx: commands.Context, initial_prompt_text: str ): user_id = ctx.author.id # Check current mode and prepend to history if it's the start of a new interaction (or if mode changed) # The mode change command already adds a notification. Here, we ensure the AI is aware of the *current* mode # if this is a fresh interaction after a mode was set previously. # However, the system prompt now instructs AI on how mode changes are communicated. # So, direct injection here might be redundant if mode change command handles it. # Let's rely on the mode change command to inject the notification. self._add_to_conversation_history( user_id, role="user", text_content=initial_prompt_text ) iteration_count = 0 max_iterations = 10 # Configurable, from plan # Ensure genai_client is available if not self.genai_client: await ctx.send( "AICodeAgent: Google GenAI Client is not initialized. Cannot process request." ) return async with ctx.typing(): while iteration_count < max_iterations: current_history = self._get_conversation_history(user_id) if not current_history: # Should not happen if initial prompt was added await ctx.send( "AICodeAgent: Error - conversation history is empty." ) return try: # Construct messages for Vertex AI API # The system prompt is passed via generation_config.system_instruction vertex_contents = current_history # Already in types.Content format generation_config = google_genai_types.GenerateContentConfig( temperature=0.3, # Adjust as needed max_output_tokens=65535, # Adjust as needed safety_settings=STANDARD_SAFETY_SETTINGS, # System instruction is critical here system_instruction=google_genai_types.Content( role="system", # Though for Gemini, system prompt is often first user message or model tuning parts=[google_genai_types.Part(text=AGENT_SYSTEM_PROMPT)], ), ) print( f"AICodeAgentCog: Sending to Vertex AI. Model: {self._ai_model}. History items: {len(vertex_contents)}" ) # for i, item in enumerate(vertex_contents): # print(f" History {i} Role: {item.role}, Parts: {item.parts}") response = await self.genai_client.aio.models.generate_content( model=f"publishers/google/models/{self._ai_model}", contents=vertex_contents, config=generation_config, # Corrected parameter name # No 'tools' or 'tool_config' for inline tool usage ) # Safely extract text from response ai_response_text = "" if ( response.candidates and response.candidates[0].content and response.candidates[0].content.parts ): ai_response_text = response.candidates[0].content.parts[0].text else: # Handle cases like safety blocks or empty responses finish_reason = ( response.candidates[0].finish_reason if response.candidates else "UNKNOWN" ) safety_ratings_str = "" if ( response.candidates and response.candidates[0].safety_ratings ): sr = response.candidates[0].safety_ratings safety_ratings_str = ", ".join( [ f"{rating.category.name}: {rating.probability.name}" for rating in sr ] ) if finish_reason == google_genai_types.FinishReason.SAFETY: await ctx.send( f"AICodeAgent: AI response was blocked due to safety settings: {safety_ratings_str}" ) self._add_to_conversation_history( user_id, role="model", text_content=f"[Blocked by Safety: {safety_ratings_str}]", ) return else: await ctx.send( f"AICodeAgent: AI returned an empty or non-text response. Finish Reason: {finish_reason}. Safety: {safety_ratings_str}" ) self._add_to_conversation_history( user_id, role="model", text_content="[Empty or Non-Text Response]", ) return if not ai_response_text.strip(): await ctx.send( "AICodeAgent: AI returned an empty response text." ) self._add_to_conversation_history( user_id, role="model", text_content="[Empty Response Text]" ) return self._add_to_conversation_history( user_id, role="model", text_content=ai_response_text ) print(f"AICodeAgentCog: AI Raw Response:\n{ai_response_text}") # Parse for inline tool call # _parse_and_execute_tool_call now returns -> Tuple[str, Optional[str]] # status can be "TOOL_OUTPUT", "TASK_COMPLETE", "NO_TOOL" # data is the tool output string, completion message, or original AI text parse_status, parsed_data = await self._parse_and_execute_tool_call( ctx, ai_response_text ) if parse_status == "TASK_COMPLETE": completion_message = ( parsed_data if parsed_data is not None else "Task marked as complete by AI." ) await ctx.send( f"AICodeAgent: Task Complete!\n{completion_message}" ) # Log AI's completion signal to history (optional, but good for context) # self._add_to_conversation_history(user_id, role="model", text_content=f"TaskComplete: message: {completion_message}") return # End of interaction elif parse_status == "TOOL_OUTPUT": tool_output_str = parsed_data if ( tool_output_str is None ): # Should not happen if status is TOOL_OUTPUT but defensive tool_output_str = ( "Error: Tool executed but returned no output string." ) print(f"AICodeAgentCog: Tool Output:\n{tool_output_str}") self._add_to_conversation_history( user_id, role="user", text_content=tool_output_str ) # Feed tool output back as 'user' iteration_count += 1 # Optionally send tool output to Discord for transparency if desired # if len(tool_output_str) < 1900 : await ctx.send(f"```{tool_output_str}```") continue # Loop back to AI with tool output in history elif parse_status == "NO_TOOL": # No tool call found, this is the final AI response for this turn final_ai_text = ( parsed_data # This is the original ai_response_text ) if final_ai_text is None: # Should not happen final_ai_text = "AI provided no textual response." if len(final_ai_text) > 1950: await ctx.send( final_ai_text[:1950] + "\n...(message truncated)" ) else: await ctx.send(final_ai_text) return # End of interaction else: # Should not happen await ctx.send( "AICodeAgent: Internal error - unknown parse status from tool parser." ) return except google_exceptions.GoogleAPICallError as e: await ctx.send(f"AICodeAgent: Vertex AI API call failed: {e}") return except Exception as e: await ctx.send( f"AICodeAgent: An unexpected error occurred during AI interaction: {e}" ) print( f"AICodeAgentCog: Interaction Error: {type(e).__name__} - {e}" ) import traceback traceback.print_exc() return # Iteration limit check (moved inside loop for clarity, but logic is similar) if iteration_count >= max_iterations: await ctx.send( f"AICodeAgent: Reached iteration limit ({max_iterations})." ) try: check = ( lambda m: m.author == ctx.author and m.channel == ctx.channel and m.content.lower().startswith( ("yes", "no", "continue", "feedback") ) ) await ctx.send( "Continue processing? (yes/no/feedback ):" ) user_response_msg = await self.bot.wait_for( "message", check=check, timeout=300.0 ) user_response_content = user_response_msg.content.lower() if user_response_content.startswith( "yes" ) or user_response_content.startswith("continue"): iteration_count = 0 # Reset iteration count self._add_to_conversation_history( user_id, role="user", text_content="[User approved continuation]", ) await ctx.send("Continuing...") continue elif user_response_content.startswith("feedback"): feedback_text = user_response_msg.content[ len("feedback") : ].strip() iteration_count = 0 # Reset self._add_to_conversation_history( user_id, role="user", text_content=f"System Feedback: {feedback_text}", ) await ctx.send("Continuing with feedback...") continue else: # No or other await ctx.send("AICodeAgent: Processing stopped by user.") return except asyncio.TimeoutError: await ctx.send( "AICodeAgent: Continuation prompt timed out. Stopping." ) return # If loop finishes due to max_iterations without reset (should be caught by above) if iteration_count >= max_iterations: await ctx.send( "AICodeAgent: Stopped due to reaching maximum processing iterations." ) @commands.command(name="codeagent", aliases=["ca"]) @commands.is_owner() async def codeagent_command(self, ctx: commands.Context, *, prompt: str): """Interacts with the AI Code Agent.""" if not self.genai_client: await ctx.send( "AICodeAgent: Google GenAI Client is not initialized. Cannot process request." ) return if not prompt: await ctx.send("AICodeAgent: Please provide a prompt for the agent.") return await self._process_agent_interaction(ctx, prompt) async def setup(bot: commands.Bot): # Ensure PROJECT_ID and LOCATION are available before adding cog # Or allow loading and let commands fail gracefully if genai_client is None if not PROJECT_ID or not LOCATION: print("AICodeAgentCog: Cannot load cog as PROJECT_ID or LOCATION is missing.") # Optionally, raise an error or just don't add the cog # For now, let it load but genai_client will be None and commands using it should check cog = AICodeAgentCog(bot) await bot.add_cog(cog) print("AICodeAgentCog loaded.")