820 lines
46 KiB
Python
820 lines
46 KiB
Python
import discord
|
|
from discord.ext import commands
|
|
import re
|
|
import os
|
|
import asyncio
|
|
import subprocess
|
|
import json
|
|
import base64
|
|
import datetime # For snapshot naming
|
|
import random # For snapshot naming
|
|
from typing import Dict, Any, List, Optional, Tuple
|
|
|
|
# Google Generative AI Imports (using Vertex AI backend)
|
|
from google import genai
|
|
from google.genai import types as google_genai_types # Renamed to avoid conflict with typing.types
|
|
from google.api_core import exceptions as google_exceptions
|
|
|
|
# Import project configuration for Vertex AI. Prefer the project's own
# config module; fall back to environment variables when running outside it.
try:
    from gurt.config import PROJECT_ID, LOCATION
except ImportError:
    PROJECT_ID = os.getenv("GCP_PROJECT_ID")  # Fallback to environment variable
    LOCATION = os.getenv("GCP_LOCATION")  # Fallback to environment variable
    if not PROJECT_ID or not LOCATION:
        # Not fatal: the cog still loads, but the GenAI client stays None.
        print("Warning: PROJECT_ID or LOCATION not found in gurt.config or environment variables.")
        # Allow cog to load but genai_client will be None
|
|
|
|
from tavily import TavilyClient
|
|
|
|
# Define standard safety settings using google.generativeai types.
# Set all thresholds to OFF as requested for internal tools: "BLOCK_NONE"
# disables content blocking for each harm category listed below.
STANDARD_SAFETY_SETTINGS = [
    google_genai_types.SafetySetting(category=google_genai_types.HarmCategory.HARM_CATEGORY_HATE_SPEECH, threshold="BLOCK_NONE"),
    google_genai_types.SafetySetting(category=google_genai_types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, threshold="BLOCK_NONE"),
    google_genai_types.SafetySetting(category=google_genai_types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, threshold="BLOCK_NONE"),
    google_genai_types.SafetySetting(category=google_genai_types.HarmCategory.HARM_CATEGORY_HARASSMENT, threshold="BLOCK_NONE"),
]

# Author identity used for Git commits made programmatically by this cog
# (snapshot commits and the agent's own commits).
COMMIT_AUTHOR = "AI Coding Agent Cog <me@slipstreamm.dev>"
|
|
|
|
AGENT_SYSTEM_PROMPT = """You are an expert AI Coding Agent. Your primary function is to assist the user (bot owner) by directly modifying the codebase of this Discord bot project or performing related tasks. You operate by understanding user requests and then generating specific inline "tool calls" in your responses when you need to interact with the file system, execute commands, or search the web.
|
|
|
|
**Inline Tool Call Syntax:**
|
|
When you need to use a tool, your response should *only* contain the tool call block, formatted exactly as specified below. The system will parse this, execute the tool, and then feed the output back to you in a subsequent message prefixed with "ToolResponse:".
|
|
IMPORTANT: Do NOT wrap your tool calls in markdown code blocks (e.g., ```tool ... ``` or ```json ... ```). Output the raw tool syntax directly, starting with the tool name (e.g., `ReadFile:`).
|
|
|
|
1. **ReadFile:** Reads the content of a specified file.
|
|
```tool
|
|
ReadFile:
|
|
path: <string: path_to_file>
|
|
```
|
|
(System will provide file content or error in ToolResponse)
|
|
|
|
2. **WriteFile:** Writes content to a specified file, overwriting if it exists, creating if it doesn't.
|
|
```tool
|
|
WriteFile:
|
|
path: <string: path_to_file>
|
|
content: |
|
|
<string: multi-line_file_content>
|
|
```
|
|
(System will confirm success or report error in ToolResponse)
|
|
|
|
3. **ApplyDiff:** Applies a diff/patch to a file. Use standard unidiff format for the diff_block.
|
|
```tool
|
|
ApplyDiff:
|
|
path: <string: path_to_file>
|
|
diff_block: |
|
|
<string: multi-line_diff_content>
|
|
```
|
|
(System will confirm success or report error in ToolResponse)
|
|
|
|
4. **ExecuteCommand:** Executes a shell command.
|
|
```tool
|
|
ExecuteCommand:
|
|
command: <string: shell_command_to_execute>
|
|
```
|
|
(System will provide stdout/stderr or error in ToolResponse)
|
|
|
|
5. **ListFiles:** Lists files and directories at a given path.
|
|
```tool
|
|
ListFiles:
|
|
path: <string: path_to_search>
|
|
recursive: <boolean: true_or_false (optional, default: false)>
|
|
```
|
|
(System will provide file list or error in ToolResponse)
|
|
|
|
6. **WebSearch:** Searches the web for information.
|
|
```tool
|
|
WebSearch:
|
|
query: <string: search_query>
|
|
```
|
|
(System will provide search results or error in ToolResponse)
|
|
|
|
7. **TaskComplete:** Signals that the current multi-step task is considered complete by the AI.
|
|
```tool
|
|
TaskComplete:
|
|
message: <string: A brief summary of what was accomplished or the final status.>
|
|
```
|
|
(System will acknowledge and stop the current interaction loop.)
|
|
|
|
**Workflow and Rules:**
|
|
- **Tool Preference:** For modifying existing files, ALWAYS prefer `ApplyDiff` if the changes are targeted. Use `WriteFile` for new files or if `ApplyDiff` is unsuitable or fails repeatedly.
|
|
- **Direct Operation:** You operate directly. No explicit user confirmation is needed for individual tool actions after the initial user prompt.
|
|
- **Programmatic Snapshots (System-Managed):**
|
|
- The system AUTOMATICALLY creates a Git snapshot of the project *before* executing `WriteFile` or `ApplyDiff` tools.
|
|
- You will be notified by a "ToolResponse: SystemNotification..." message when a snapshot has been successfully created, right before your file modification tool is about to be truly processed.
|
|
- You do NOT need to request or create snapshots yourself. Do NOT include snapshot steps in your `ExecuteCommand` calls for `git`.
|
|
- If the system fails to create a snapshot, it will inform you with a "ToolResponse: SystemError...". In such a case, your `WriteFile` or `ApplyDiff` operation will NOT proceed. You should then typically inform the user of this critical system failure. Do not repeatedly try the same file operation if snapshot creation consistently fails.
|
|
- **Git Workflow for Your Changes:** After you believe your coding task and all related file modifications are complete and correct, you MUST use the `ExecuteCommand` tool to perform the following Git operations in sequence:
|
|
1. `git add .` (to stage all your changes)
|
|
2. `git commit --author="AI Coding Agent Cog <me@slipstreamm.dev>" -m "AI Agent: <Your concise summary of changes>"` (You will generate the commit message part)
|
|
3. **Before pushing, attempt to integrate remote changes:** `git pull --rebase`
|
|
4. `git push`
|
|
- **Commit Messages:** Ensure your commit messages are descriptive of the changes made.
|
|
- **Conflict Resolution (Git Pull --rebase):** If `git pull --rebase` (executed via `ExecuteCommand`) results in merge conflicts, the `ToolResponse` will indicate this. You must then:
|
|
a. Use `ReadFile` to inspect the conflicted file(s) to see the conflict markers.
|
|
b. Decide on the resolution.
|
|
c. Use `WriteFile` or `ApplyDiff` to apply your resolved version of the file(s). (Remember, a programmatic snapshot will be made before these tools run).
|
|
d. Use `ExecuteCommand` for `git add <resolved_file_path>` for each resolved file.
|
|
e. Use `ExecuteCommand` for `git rebase --continue`.
|
|
f. Then attempt `git push` again using `ExecuteCommand`.
|
|
- **Push Failures:** If `git push` still fails (e.g., other non-fast-forward errors), the `ToolResponse` will report this. You should then inform the user about the push failure and the reason, and await further instructions. Do not attempt overly complex recovery maneuvers without user guidance.
|
|
- **Clarity:** Be clear and methodical. If a step fails, acknowledge it and decide on the next course of action (retry, try alternative, or inform user).
|
|
- **Focus:** Your goal is to complete the coding/file manipulation task as requested by the user.
|
|
"""
|
|
|
|
class AICodeAgentCog(commands.Cog):
|
|
def __init__(self, bot: commands.Bot):
    """Set up the agent cog: GenAI client, model table, and Tavily search."""
    self.bot = bot
    self.genai_client = None
    # Per-user conversation history: user ID -> list of Content turns.
    self.agent_conversations: Dict[int, List[google_genai_types.Content]] = {}

    # Initialize the Google GenAI client against Vertex AI, if configured.
    if PROJECT_ID and LOCATION:
        try:
            self.genai_client = genai.Client(
                vertexai=True,
                project=PROJECT_ID,
                location=LOCATION,
            )
            print(f"AICodeAgentCog: Google GenAI Client initialized for Vertex AI project '{PROJECT_ID}' in location '{LOCATION}'.")
        except Exception as e:
            print(f"AICodeAgentCog: Error initializing Google GenAI Client for Vertex AI: {e}")
            self.genai_client = None  # Ensure it's None on failure
    else:
        print("AICodeAgentCog: PROJECT_ID or LOCATION not configured. Google GenAI Client not initialized.")

    # AI model configuration (keys are the short names accepted by
    # !codeagent_model; values are the Vertex model identifiers).
    self._ai_model: str = "gemini-2.5-flash-preview-05-20"  # Default model
    self._available_models: Dict[str, str] = {
        "pro": "gemini-2.5-pro-preview-05-06",
        "flash": "gemini-2.5-flash-preview-05-20",
    }

    # Tavily web-search integration (optional; disabled without an API key).
    self.tavily_api_key: Optional[str] = os.getenv("TAVILY_API_KEY")
    self.tavily_client: Optional[TavilyClient] = None
    if self.tavily_api_key:
        self.tavily_client = TavilyClient(api_key=self.tavily_api_key)
        print("AICodeAgentCog: TavilyClient initialized.")
    else:
        print("AICodeAgentCog: TAVILY_API_KEY not found. TavilyClient not initialized.")

    # Tunable search parameters, overridable via environment variables.
    self.tavily_search_depth: str = os.getenv("TAVILY_DEFAULT_SEARCH_DEPTH", "basic")
    self.tavily_max_results: int = int(os.getenv("TAVILY_DEFAULT_MAX_RESULTS", "5"))
|
|
|
|
@commands.command(name="codeagent_model")
@commands.is_owner()
async def set_ai_model_command(self, ctx: commands.Context, model_key: str):
    """Sets the AI model for the code agent. Usage: !codeagent_model <pro|flash>"""
    key = model_key.lower()
    resolved = self._available_models.get(key)
    if resolved is None:
        # Unknown key: report the valid choices instead of changing anything.
        await ctx.send(f"AICodeAgent: Invalid model key '{key}'. Available keys: {', '.join(self._available_models.keys())}")
        return
    self._ai_model = resolved
    await ctx.send(f"AICodeAgent: AI model set to: {self._ai_model} (key: {key})")
|
|
|
|
@commands.command(name="codeagent_get_model")
@commands.is_owner()
async def get_ai_model_command(self, ctx: commands.Context):
    """Gets the current AI model for the code agent."""
    message = f"AICodeAgent: Current AI model is: {self._ai_model}"
    await ctx.send(message)
|
|
|
|
@commands.command(name="codeagent_clear")
@commands.is_owner()
async def clear_agent_history_command(self, ctx: commands.Context):
    """Clears the conversation history for the code agent for the calling user."""
    # pop() removes and returns the entry in one step; None means absent
    # (an existing-but-empty history list still counts as present).
    removed = self.agent_conversations.pop(ctx.author.id, None)
    if removed is not None:
        await ctx.send("AICodeAgent: Conversation history cleared for you.")
    else:
        await ctx.send("AICodeAgent: No conversation history found for you to clear.")
|
|
|
|
async def _run_git_command(self, command: str) -> Tuple[bool, str]:
|
|
"""Runs a Git command and returns (success_status, output_string)."""
|
|
try:
|
|
process = await asyncio.create_subprocess_shell(
|
|
command,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE
|
|
)
|
|
stdout, stderr = await process.communicate()
|
|
|
|
output = ""
|
|
if stdout:
|
|
output += stdout.decode(errors='replace')
|
|
if stderr:
|
|
output += stderr.decode(errors='replace')
|
|
|
|
if process.returncode == 0:
|
|
return True, output.strip()
|
|
else:
|
|
error_message = f"Git command failed with exit code {process.returncode}:\n{output.strip()}"
|
|
print(f"AICodeAgentCog: {error_message}")
|
|
return False, error_message
|
|
except Exception as e:
|
|
error_message = f"Exception running Git command '{command}': {e}"
|
|
print(f"AICodeAgentCog: {error_message}")
|
|
return False, error_message
|
|
|
|
async def _create_programmatic_snapshot(self) -> Optional[str]:
    """Creates a programmatic Git snapshot using a temporary branch.

    Creates a branch named ``snapshot_cog_<timestamp>_<hex>``, makes a commit
    on it (``--allow-empty``), then switches back to the original branch.
    Returns the snapshot branch name on success, or None on failure.

    NOTE(review): only already-staged changes end up in the snapshot commit
    (there is no ``git add`` here) — unstaged/untracked edits are not
    captured. Confirm whether that is intended.
    """
    try:
        # Get current branch name so we can switch back afterwards.
        success, current_branch_name = await self._run_git_command("git rev-parse --abbrev-ref HEAD")
        if not success:
            print(f"AICodeAgentCog: Failed to get current branch name for snapshot: {current_branch_name}")
            return None
        current_branch_name = current_branch_name.strip()

        # Unique branch name: timestamp plus 3 random bytes as hex.
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        random_hex = random.randbytes(3).hex()
        snapshot_branch_name = f"snapshot_cog_{timestamp}_{random_hex}"

        # Create and checkout the new snapshot branch
        success, output = await self._run_git_command(f"git checkout -b {snapshot_branch_name}")
        if not success:
            print(f"AICodeAgentCog: Failed to create snapshot branch '{snapshot_branch_name}': {output}")
            # Attempt to switch back if checkout failed mid-operation
            await self._run_git_command(f"git checkout {current_branch_name}")  # Best effort
            return None

        # Commit any currently staged changes or create an empty commit for a clean snapshot point
        commit_message = f"Cog Snapshot: Pre-AI Edit State on branch {snapshot_branch_name}"
        success, output = await self._run_git_command(f"git commit --author=\"{COMMIT_AUTHOR}\" -m \"{commit_message}\" --allow-empty")
        if not success:
            print(f"AICodeAgentCog: Failed to commit snapshot on '{snapshot_branch_name}': {output}")
            # Attempt to switch back and clean up the half-made branch.
            await self._run_git_command(f"git checkout {current_branch_name}")
            await self._run_git_command(f"git branch -D {snapshot_branch_name}")  # Best effort cleanup
            return None

        # Switch back to the original branch
        success, output = await self._run_git_command(f"git checkout {current_branch_name}")
        if not success:
            # Repository is left on the snapshot branch; surface loudly but
            # still return the name so the caller can use/delete it.
            print(f"AICodeAgentCog: CRITICAL - Failed to switch back to original branch '{current_branch_name}' after snapshot. Current branch might be '{snapshot_branch_name}'. Manual intervention may be needed. Error: {output}")
            return snapshot_branch_name  # Return it so it can potentially be used/deleted

        print(f"AICodeAgentCog: Successfully created snapshot branch: {snapshot_branch_name}")
        return snapshot_branch_name
    except Exception as e:
        print(f"AICodeAgentCog: Exception in _create_programmatic_snapshot: {e}")
        return None
|
|
|
|
@commands.command(name="codeagent_list_snapshots")
@commands.is_owner()
async def list_snapshots_command(self, ctx: commands.Context):
    """Lists available programmatic Git snapshots created by the cog."""
    # Snapshot branches all share the 'snapshot_cog_' prefix.
    success, output = await self._run_git_command('git branch --list "snapshot_cog_*"')
    if not success:
        await ctx.send(f"Error listing snapshots: {output}")
    elif output:
        await ctx.send(f"Available snapshots:\n```\n{output}\n```")
    else:
        await ctx.send("No programmatic snapshots found.")
|
|
|
|
@commands.command(name="codeagent_revert_to_snapshot")
@commands.is_owner()
async def revert_to_snapshot_command(self, ctx: commands.Context, snapshot_branch_name: str):
    """Reverts the current branch to the state of a given snapshot branch.

    Runs ``git reset --hard <snapshot>`` on the *current* branch, which
    discards any uncommitted local changes. Only branches created by this
    cog (prefix ``snapshot_cog_``) are accepted.
    """
    if not snapshot_branch_name.startswith("snapshot_cog_"):
        await ctx.send("Invalid snapshot name. Must start with 'snapshot_cog_'.")
        return

    # Verify the snapshot branch exists. Check the command's success flag
    # BEFORE parsing its output: on failure _run_git_command returns an
    # error message, not a branch listing, and must not be scanned as one.
    success, branches_output = await self._run_git_command("git branch --list")
    if not success:
        await ctx.send(f"Snapshot branch '{snapshot_branch_name}' not found.")
        return
    # Strip the leading "* " marker git places on the current branch.
    existing_branches = [b.strip().lstrip('* ') for b in branches_output.splitlines()]
    if snapshot_branch_name not in existing_branches:
        await ctx.send(f"Snapshot branch '{snapshot_branch_name}' not found.")
        return

    await ctx.send(f"Attempting to revert current branch to snapshot '{snapshot_branch_name}'...")
    # Record which branch we are reverting, for the confirmation message.
    success, current_branch = await self._run_git_command("git rev-parse --abbrev-ref HEAD")
    if not success:
        await ctx.send(f"Failed to determine current branch before revert: {current_branch}")
        return
    current_branch = current_branch.strip()

    success, output = await self._run_git_command(f"git reset --hard {snapshot_branch_name}")
    if success:
        await ctx.send(f"Successfully reverted current branch ('{current_branch}') to snapshot '{snapshot_branch_name}'.\nOutput:\n```\n{output}\n```")
    else:
        await ctx.send(f"Error reverting to snapshot '{snapshot_branch_name}':\n```\n{output}\n```")
|
|
|
|
@commands.command(name="codeagent_delete_snapshot")
@commands.is_owner()
async def delete_snapshot_command(self, ctx: commands.Context, snapshot_branch_name: str):
    """Deletes a programmatic Git snapshot branch.

    Refuses to delete the currently checked-out branch and only accepts
    branches carrying the cog's ``snapshot_cog_`` prefix.
    """
    if not snapshot_branch_name.startswith("snapshot_cog_"):
        await ctx.send("Invalid snapshot name. Must start with 'snapshot_cog_'.")
        return

    # Verify the branch exists. Check the success flag BEFORE parsing:
    # a failed git command yields an error string, not a branch listing.
    success, branches_output = await self._run_git_command("git branch --list")
    if not success:
        await ctx.send(f"Snapshot branch '{snapshot_branch_name}' not found.")
        return
    existing_branches = [b.strip().lstrip('* ') for b in branches_output.splitlines()]
    if snapshot_branch_name not in existing_branches:
        await ctx.send(f"Snapshot branch '{snapshot_branch_name}' not found.")
        return

    # Never delete the branch we are standing on.
    success, current_branch_name_str = await self._run_git_command("git rev-parse --abbrev-ref HEAD")
    if not success:
        await ctx.send(f"Could not determine current branch. Deletion aborted for safety. Error: {current_branch_name_str}")
        return
    if current_branch_name_str.strip() == snapshot_branch_name:
        await ctx.send(f"Cannot delete snapshot branch '{snapshot_branch_name}' as it is the current branch. Please checkout to a different branch first.")
        return

    await ctx.send(f"Attempting to delete snapshot branch '{snapshot_branch_name}'...")
    # -D (force) because snapshot branches are never merged back.
    success, output = await self._run_git_command(f"git branch -D {snapshot_branch_name}")
    if success:
        await ctx.send(f"Successfully deleted snapshot branch '{snapshot_branch_name}'.\nOutput:\n```\n{output}\n```")
    else:
        await ctx.send(f"Error deleting snapshot branch '{snapshot_branch_name}':\n```\n{output}\n```")
|
|
|
|
def _get_conversation_history(self, user_id: int) -> List[google_genai_types.Content]:
|
|
if user_id not in self.agent_conversations:
|
|
self.agent_conversations[user_id] = []
|
|
return self.agent_conversations[user_id]
|
|
|
|
def _add_to_conversation_history(self, user_id: int, role: str, text_content: str, is_tool_response: bool = False):
    """Append one turn to a user's history and trim it to the newest 20 entries.

    Roles follow the Vertex convention: 'user' for user prompts *and* for
    tool outputs (formatted as "ToolResponse: ..." per the system prompt),
    'model' for the AI's own text. ``is_tool_response`` is currently unused
    by the body and kept for interface compatibility.
    """
    history = self._get_conversation_history(user_id)
    # Vertex expects each turn's content as a list of Part objects.
    turn = google_genai_types.Content(
        role=role,
        parts=[google_genai_types.Part(text=text_content)],
    )
    history.append(turn)

    # Bound memory usage: retain only the most recent turns.
    max_history_items = 20
    if len(history) > max_history_items:
        self.agent_conversations[user_id] = history[-max_history_items:]
|
|
|
|
async def _parse_and_execute_tool_call(self, ctx: commands.Context, ai_response_text: str) -> Optional[str]:
|
|
"""
|
|
Parses AI response for an inline tool call, executes it, and returns the tool's output string.
|
|
Returns a tuple: (status: str, data: Optional[str]).
|
|
Status can be "TOOL_OUTPUT", "TASK_COMPLETE", "NO_TOOL".
|
|
Data is the tool output string, completion message, or original AI text.
|
|
"""
|
|
tool_executed = False # Flag to indicate if any tool was matched and attempted
|
|
|
|
# --- TaskComplete ---
|
|
# Check for TaskComplete first as it's a terminal operation for the loop.
|
|
task_complete_match = re.search(r"TaskComplete:\s*message:\s*(.*)", ai_response_text, re.IGNORECASE | re.DOTALL)
|
|
if task_complete_match:
|
|
tool_executed = True
|
|
# Ensure we capture the message correctly, even if it's multi-line, up to the end of the tool block or string.
|
|
# The (.*) should be greedy enough for simple cases.
|
|
completion_message = task_complete_match.group(1).strip()
|
|
return "TASK_COMPLETE", completion_message
|
|
|
|
# --- ReadFile ---
|
|
if not tool_executed:
|
|
# Path can contain spaces, so .+? is good. Ensure it doesn't grab parts of other tools if text is messy.
|
|
read_file_match = re.search(r"ReadFile:\s*path:\s*(.+?)(?:\n|$)", ai_response_text, re.IGNORECASE | re.MULTILINE)
|
|
if read_file_match:
|
|
tool_executed = True
|
|
file_path = read_file_match.group(1).strip()
|
|
tool_output = await self._execute_tool_read_file(file_path)
|
|
return "TOOL_OUTPUT", f"ToolResponse: ReadFile\nPath: {file_path}\n---\n{tool_output}"
|
|
|
|
# --- WriteFile ---
|
|
if not tool_executed:
|
|
# Content can be multi-line and extensive. DOTALL is crucial.
|
|
# Capture path, then content separately.
|
|
write_file_match = re.search(r"WriteFile:\s*path:\s*(.+?)\s*content:\s*\|?\s*(.*)", ai_response_text, re.IGNORECASE | re.DOTALL)
|
|
if write_file_match:
|
|
tool_executed = True
|
|
file_path = write_file_match.group(1).strip()
|
|
content = write_file_match.group(2).strip()
|
|
|
|
snapshot_branch = await self._create_programmatic_snapshot()
|
|
if not snapshot_branch:
|
|
return "TOOL_OUTPUT", "ToolResponse: SystemError\n---\nFailed to create project snapshot. WriteFile operation aborted."
|
|
else:
|
|
# The notification about snapshot creation is now in the system prompt's description of the workflow.
|
|
# We can still send a message to Discord for owner visibility if desired.
|
|
await ctx.send(f"AICodeAgent: [Info] Created snapshot: {snapshot_branch} before writing to {file_path}")
|
|
tool_output = await self._execute_tool_write_file(file_path, content)
|
|
return "TOOL_OUTPUT", f"ToolResponse: WriteFile\nPath: {file_path}\n---\n{tool_output}"
|
|
|
|
# --- ApplyDiff ---
|
|
if not tool_executed:
|
|
# Diff block can be multi-line.
|
|
apply_diff_match = re.search(r"ApplyDiff:\s*path:\s*(.+?)\s*diff_block:\s*\|?\s*(.*)", ai_response_text, re.IGNORECASE | re.DOTALL)
|
|
if apply_diff_match:
|
|
tool_executed = True
|
|
file_path = apply_diff_match.group(1).strip()
|
|
diff_block = apply_diff_match.group(2).strip()
|
|
|
|
snapshot_branch = await self._create_programmatic_snapshot()
|
|
if not snapshot_branch:
|
|
return "TOOL_OUTPUT", "ToolResponse: SystemError\n---\nFailed to create project snapshot. ApplyDiff operation aborted."
|
|
else:
|
|
await ctx.send(f"AICodeAgent: [Info] Created snapshot: {snapshot_branch} before applying diff to {file_path}")
|
|
tool_output = await self._execute_tool_apply_diff(file_path, diff_block)
|
|
return "TOOL_OUTPUT", f"ToolResponse: ApplyDiff\nPath: {file_path}\n---\n{tool_output}"
|
|
|
|
# --- ExecuteCommand ---
|
|
if not tool_executed:
|
|
# Command can have spaces.
|
|
exec_command_match = re.search(r"ExecuteCommand:\s*command:\s*(.+?)(?:\n|$)", ai_response_text, re.IGNORECASE | re.DOTALL)
|
|
if exec_command_match:
|
|
tool_executed = True
|
|
command_str = exec_command_match.group(1).strip()
|
|
tool_output = await self._execute_tool_execute_command(command_str)
|
|
return "TOOL_OUTPUT", f"ToolResponse: ExecuteCommand\nCommand: {command_str}\n---\n{tool_output}"
|
|
|
|
# --- ListFiles ---
|
|
if not tool_executed:
|
|
# Path and optional recursive flag.
|
|
list_files_match = re.search(r"ListFiles:\s*path:\s*(.+?)(?:\s*recursive:\s*(true|false))?(?:\n|$)", ai_response_text, re.IGNORECASE | re.MULTILINE)
|
|
if list_files_match:
|
|
tool_executed = True
|
|
file_path = list_files_match.group(1).strip()
|
|
recursive_str = list_files_match.group(2) # This might be None
|
|
recursive = recursive_str.lower() == 'true' if recursive_str else False
|
|
tool_output = await self._execute_tool_list_files(file_path, recursive)
|
|
return "TOOL_OUTPUT", f"ToolResponse: ListFiles\nPath: {file_path}\nRecursive: {recursive}\n---\n{tool_output}"
|
|
|
|
# --- WebSearch ---
|
|
if not tool_executed:
|
|
# Query can have spaces.
|
|
web_search_match = re.search(r"WebSearch:\s*query:\s*(.+?)(?:\n|$)", ai_response_text, re.IGNORECASE | re.DOTALL)
|
|
if web_search_match:
|
|
tool_executed = True
|
|
query_str = web_search_match.group(1).strip()
|
|
tool_output = await self._execute_tool_web_search(query_str)
|
|
return "TOOL_OUTPUT", f"ToolResponse: WebSearch\nQuery: {query_str}\n---\n{tool_output}"
|
|
|
|
return "NO_TOOL", ai_response_text # No tool call found, return original AI text
|
|
|
|
# --- Tool Execution Methods ---
# The helpers below implement the inline tools dispatched by
# _parse_and_execute_tool_call; each returns a human-readable result string.
|
|
|
|
async def _execute_tool_read_file(self, path: str) -> str:
|
|
print(f"AICodeAgentCog: Placeholder _execute_tool_read_file for path: {path}")
|
|
# Actual implementation:
|
|
try:
|
|
# Ensure path is within project, basic safety. More robust checks might be needed.
|
|
# base_dir = os.path.abspath(".") # Or specific project root
|
|
# requested_path = os.path.abspath(os.path.join(base_dir, path))
|
|
# if not requested_path.startswith(base_dir):
|
|
# return "Error: File path is outside the allowed project directory."
|
|
if not os.path.exists(path):
|
|
return f"Error: File not found at '{path}'"
|
|
if os.path.isdir(path):
|
|
return f"Error: Path '{path}' is a directory, not a file."
|
|
with open(path, 'r', encoding='utf-8', errors='replace') as f:
|
|
return f.read()
|
|
except Exception as e:
|
|
return f"Error reading file '{path}': {type(e).__name__} - {e}"
|
|
|
|
async def _execute_tool_write_file(self, path: str, content: str) -> str:
|
|
print(f"AICodeAgentCog: Placeholder _execute_tool_write_file for path: {path}")
|
|
# Actual implementation:
|
|
try:
|
|
# base_dir = os.path.abspath(".")
|
|
# requested_path = os.path.abspath(os.path.join(base_dir, path))
|
|
# if not requested_path.startswith(base_dir):
|
|
# return "Error: File path is outside the allowed project directory."
|
|
os.makedirs(os.path.dirname(path) or '.', exist_ok=True) # Ensure directory exists
|
|
with open(path, 'w', encoding='utf-8') as f:
|
|
f.write(content)
|
|
return f"Successfully wrote to file '{path}'."
|
|
except Exception as e:
|
|
return f"Error writing to file '{path}': {type(e).__name__} - {e}"
|
|
|
|
async def _execute_tool_apply_diff(self, path: str, diff_block: str) -> str:
    """Apply a unified diff to *path* via the external ``patch`` utility.

    The diff is fed to ``patch`` on stdin with the target file as argument.
    Returns a human-readable success/error string; never raises to the
    caller. Requires ``patch`` to be installed and on PATH.
    """
    print(f"AICodeAgentCog: Attempting _execute_tool_apply_diff for path: {path}")
    if not os.path.exists(path):
        return f"Error: File not found at '{path}' for applying diff."
    if os.path.isdir(path):
        return f"Error: Path '{path}' is a directory, cannot apply diff."

    try:
        # Ensure diff_block ends with a newline for `patch` utility
        if not diff_block.endswith('\n'):
            diff_block += '\n'

        process = await asyncio.create_subprocess_exec(
            'patch', path,  # Target file for the patch
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        stdout, stderr = await process.communicate(input=diff_block.encode('utf-8'))

        output_str = ""
        if stdout:
            output_str += f"Stdout:\n{stdout.decode(errors='replace')}\n"
        if stderr:
            output_str += f"Stderr:\n{stderr.decode(errors='replace')}\n"

        if process.returncode == 0:
            return f"Successfully applied diff to '{path}'.\n{output_str}"
        else:
            # Try to provide context if patch failed, e.g. if it created .rej file
            rej_file = f"{path}.rej"
            if os.path.exists(rej_file):
                with open(rej_file, 'r', encoding='utf-8', errors='replace') as rf:
                    rej_content = rf.read()
                # Truncate rejects to keep the ToolResponse readable.
                output_str += f"\nRejects file found ({rej_file}):\n{rej_content[:500]}...\n(Please check this file for details of failed hunks)"

            return f"Error applying diff to '{path}' (exit code {process.returncode}).\n{output_str}"
    except FileNotFoundError:
        # The `patch` binary itself is missing (exec fails before running).
        return "Error: The 'patch' command-line utility was not found. Diff application failed. Please ensure 'patch' is installed and in the system PATH."
    except Exception as e:
        return f"Error applying diff to '{path}': {type(e).__name__} - {e}"
|
|
|
|
async def _execute_tool_execute_command(self, command: str) -> str:
|
|
print(f"AICodeAgentCog: Attempting _execute_tool_execute_command: {command}")
|
|
|
|
# Basic safety check for extremely dangerous commands, even for owner.
|
|
# The AI is instructed on Git workflow, but this adds a small layer.
|
|
# More comprehensive checks could be added if needed.
|
|
blocked_commands = ["rm -rf /", "sudo rm -rf /", "mkfs", "format C:", "> /dev/sda"] # Example
|
|
for blocked in blocked_commands:
|
|
if blocked in command: # Simple substring check, can be improved
|
|
return f"Error: Command '{command}' appears to be extremely dangerous and was blocked by a safeguard."
|
|
|
|
try:
|
|
process = await asyncio.create_subprocess_shell(
|
|
command,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE
|
|
)
|
|
stdout, stderr = await process.communicate()
|
|
|
|
output = ""
|
|
if stdout:
|
|
output += f"Stdout:\n{stdout.decode(errors='replace')}\n"
|
|
if stderr:
|
|
output += f"Stderr:\n{stderr.decode(errors='replace')}\n"
|
|
|
|
if not output.strip():
|
|
output = "Command executed with no output.\n"
|
|
|
|
if process.returncode == 0:
|
|
return f"Command executed successfully (exit code 0).\n{output.strip()}"
|
|
else:
|
|
return f"Command failed with exit code {process.returncode}.\n{output.strip()}"
|
|
except Exception as e:
|
|
return f"Exception executing command '{command}': {type(e).__name__} - {e}"
|
|
|
|
async def _execute_tool_list_files(self, path: str, recursive: bool) -> str:
|
|
print(f"AICodeAgentCog: Attempting _execute_tool_list_files for path: {path}, recursive: {recursive}")
|
|
# Actual implementation:
|
|
try:
|
|
# base_dir = os.path.abspath(".")
|
|
# requested_path = os.path.abspath(os.path.join(base_dir, path))
|
|
# if not requested_path.startswith(base_dir):
|
|
# return "Error: Path is outside the allowed project directory."
|
|
if not os.path.exists(path):
|
|
return f"Error: Path not found at '{path}'"
|
|
if not os.path.isdir(path):
|
|
return f"Error: Path '{path}' is not a directory."
|
|
|
|
file_list = []
|
|
if recursive:
|
|
for root, dirs, files in os.walk(path):
|
|
for name in files:
|
|
file_list.append(os.path.join(root, name))
|
|
for name in dirs:
|
|
file_list.append(os.path.join(root, name) + os.sep) # Indicate dirs
|
|
else:
|
|
for item in os.listdir(path):
|
|
full_item_path = os.path.join(path, item)
|
|
if os.path.isdir(full_item_path):
|
|
file_list.append(item + os.sep)
|
|
else:
|
|
file_list.append(item)
|
|
return "\n".join(file_list) if file_list else "No files or directories found."
|
|
except Exception as e:
|
|
return f"Error listing files at '{path}': {type(e).__name__} - {e}"
|
|
|
|
async def _execute_tool_web_search(self, query: str) -> str:
|
|
print(f"AICodeAgentCog: Placeholder _execute_tool_web_search for query: {query}")
|
|
if not self.tavily_client:
|
|
return "Error: Tavily client not initialized. Cannot perform web search."
|
|
try:
|
|
# Using basic parameters for now, can be expanded
|
|
response = await asyncio.to_thread(
|
|
self.tavily_client.search,
|
|
query=query,
|
|
search_depth=self.tavily_search_depth, # "basic" or "advanced"
|
|
max_results=self.tavily_max_results,
|
|
include_answer=True # Try to get a direct answer
|
|
)
|
|
|
|
results_str_parts = []
|
|
if response.get("answer"):
|
|
results_str_parts.append(f"Answer: {response['answer']}")
|
|
|
|
if response.get("results"):
|
|
for i, res in enumerate(response["results"][:self.tavily_max_results]): # Show up to max_results
|
|
results_str_parts.append(f"\nResult {i+1}: {res.get('title', 'N/A')}\nURL: {res.get('url', 'N/A')}\nSnippet: {res.get('content', 'N/A')[:250]}...") # Truncate snippet
|
|
|
|
return "\n".join(results_str_parts) if results_str_parts else "No search results found."
|
|
except Exception as e:
|
|
return f"Error during Tavily web search for '{query}': {type(e).__name__} - {e}"
|
|
|
|
async def _process_agent_interaction(self, ctx: commands.Context, initial_prompt_text: str):
|
|
user_id = ctx.author.id
|
|
self._add_to_conversation_history(user_id, role="user", text_content=initial_prompt_text)
|
|
|
|
iteration_count = 0
|
|
max_iterations = 10 # Configurable, from plan
|
|
|
|
# Ensure genai_client is available
|
|
if not self.genai_client:
|
|
await ctx.send("AICodeAgent: Google GenAI Client is not initialized. Cannot process request.")
|
|
return
|
|
|
|
async with ctx.typing():
|
|
while iteration_count < max_iterations:
|
|
current_history = self._get_conversation_history(user_id)
|
|
|
|
if not current_history: # Should not happen if initial prompt was added
|
|
await ctx.send("AICodeAgent: Error - conversation history is empty.")
|
|
return
|
|
|
|
try:
|
|
# Construct messages for Vertex AI API
|
|
# The system prompt is passed via generation_config.system_instruction
|
|
vertex_contents = current_history # Already in types.Content format
|
|
|
|
generation_config = google_genai_types.GenerateContentConfig(
|
|
temperature=0.7, # Adjust as needed
|
|
max_output_tokens=4096, # Adjust as needed
|
|
safety_settings=STANDARD_SAFETY_SETTINGS,
|
|
# System instruction is critical here
|
|
system_instruction=google_genai_types.Content(
|
|
role="system", # Though for Gemini, system prompt is often first user message or model tuning
|
|
parts=[google_genai_types.Part(text=AGENT_SYSTEM_PROMPT)]
|
|
)
|
|
)
|
|
|
|
print(f"AICodeAgentCog: Sending to Vertex AI. Model: {self._ai_model}. History items: {len(vertex_contents)}")
|
|
# for i, item in enumerate(vertex_contents):
|
|
# print(f" History {i} Role: {item.role}, Parts: {item.parts}")
|
|
|
|
|
|
response = await self.genai_client.aio.models.generate_content(
|
|
model=f"publishers/google/models/{self._ai_model}",
|
|
contents=vertex_contents,
|
|
config=generation_config, # Corrected parameter name
|
|
# No 'tools' or 'tool_config' for inline tool usage
|
|
)
|
|
|
|
# Safely extract text from response
|
|
ai_response_text = ""
|
|
if response.candidates and response.candidates[0].content and response.candidates[0].content.parts:
|
|
ai_response_text = response.candidates[0].content.parts[0].text
|
|
else: # Handle cases like safety blocks or empty responses
|
|
finish_reason = response.candidates[0].finish_reason if response.candidates else "UNKNOWN"
|
|
safety_ratings_str = ""
|
|
if response.candidates and response.candidates[0].safety_ratings:
|
|
sr = response.candidates[0].safety_ratings
|
|
safety_ratings_str = ", ".join([f"{rating.category.name}: {rating.probability.name}" for rating in sr])
|
|
|
|
if finish_reason == google_genai_types.FinishReason.SAFETY:
|
|
await ctx.send(f"AICodeAgent: AI response was blocked due to safety settings: {safety_ratings_str}")
|
|
self._add_to_conversation_history(user_id, role="model", text_content=f"[Blocked by Safety: {safety_ratings_str}]")
|
|
return
|
|
else:
|
|
await ctx.send(f"AICodeAgent: AI returned an empty or non-text response. Finish Reason: {finish_reason}. Safety: {safety_ratings_str}")
|
|
self._add_to_conversation_history(user_id, role="model", text_content="[Empty or Non-Text Response]")
|
|
return
|
|
|
|
if not ai_response_text.strip():
|
|
await ctx.send("AICodeAgent: AI returned an empty response text.")
|
|
self._add_to_conversation_history(user_id, role="model", text_content="[Empty Response Text]")
|
|
return
|
|
|
|
self._add_to_conversation_history(user_id, role="model", text_content=ai_response_text)
|
|
print(f"AICodeAgentCog: AI Raw Response:\n{ai_response_text}")
|
|
|
|
# Parse for inline tool call
|
|
# _parse_and_execute_tool_call now returns -> Tuple[str, Optional[str]]
|
|
# status can be "TOOL_OUTPUT", "TASK_COMPLETE", "NO_TOOL"
|
|
# data is the tool output string, completion message, or original AI text
|
|
parse_status, parsed_data = await self._parse_and_execute_tool_call(ctx, ai_response_text)
|
|
|
|
if parse_status == "TASK_COMPLETE":
|
|
completion_message = parsed_data if parsed_data is not None else "Task marked as complete by AI."
|
|
await ctx.send(f"AICodeAgent: Task Complete!\n{completion_message}")
|
|
# Log AI's completion signal to history (optional, but good for context)
|
|
# self._add_to_conversation_history(user_id, role="model", text_content=f"TaskComplete: message: {completion_message}")
|
|
return # End of interaction
|
|
|
|
elif parse_status == "TOOL_OUTPUT":
|
|
tool_output_str = parsed_data
|
|
if tool_output_str is None: # Should not happen if status is TOOL_OUTPUT but defensive
|
|
tool_output_str = "Error: Tool executed but returned no output string."
|
|
|
|
print(f"AICodeAgentCog: Tool Output:\n{tool_output_str}")
|
|
self._add_to_conversation_history(user_id, role="user", text_content=tool_output_str) # Feed tool output back as 'user'
|
|
iteration_count += 1
|
|
# Optionally send tool output to Discord for transparency if desired
|
|
# if len(tool_output_str) < 1900 : await ctx.send(f"```{tool_output_str}```")
|
|
continue # Loop back to AI with tool output in history
|
|
|
|
elif parse_status == "NO_TOOL":
|
|
# No tool call found, this is the final AI response for this turn
|
|
final_ai_text = parsed_data # This is the original ai_response_text
|
|
if final_ai_text is None: # Should not happen
|
|
final_ai_text = "AI provided no textual response."
|
|
|
|
if len(final_ai_text) > 1950:
|
|
await ctx.send(final_ai_text[:1950] + "\n...(message truncated)")
|
|
else:
|
|
await ctx.send(final_ai_text)
|
|
return # End of interaction
|
|
else: # Should not happen
|
|
await ctx.send("AICodeAgent: Internal error - unknown parse status from tool parser.")
|
|
return
|
|
|
|
except google_exceptions.GoogleAPICallError as e:
|
|
await ctx.send(f"AICodeAgent: Vertex AI API call failed: {e}")
|
|
return
|
|
except Exception as e:
|
|
await ctx.send(f"AICodeAgent: An unexpected error occurred during AI interaction: {e}")
|
|
print(f"AICodeAgentCog: Interaction Error: {type(e).__name__} - {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return
|
|
|
|
# Iteration limit check (moved inside loop for clarity, but logic is similar)
|
|
if iteration_count >= max_iterations:
|
|
await ctx.send(f"AICodeAgent: Reached iteration limit ({max_iterations}).")
|
|
try:
|
|
check = lambda m: m.author == ctx.author and m.channel == ctx.channel and \
|
|
m.content.lower().startswith(("yes", "no", "continue", "feedback"))
|
|
|
|
await ctx.send("Continue processing? (yes/no/feedback <your feedback>):")
|
|
user_response_msg = await self.bot.wait_for('message', check=check, timeout=300.0)
|
|
user_response_content = user_response_msg.content.lower()
|
|
|
|
if user_response_content.startswith("yes") or user_response_content.startswith("continue"):
|
|
iteration_count = 0 # Reset iteration count
|
|
self._add_to_conversation_history(user_id, role="user", text_content="[User approved continuation]")
|
|
await ctx.send("Continuing...")
|
|
continue
|
|
elif user_response_content.startswith("feedback"):
|
|
feedback_text = user_response_msg.content[len("feedback"):].strip()
|
|
iteration_count = 0 # Reset
|
|
self._add_to_conversation_history(user_id, role="user", text_content=f"System Feedback: {feedback_text}")
|
|
await ctx.send("Continuing with feedback...")
|
|
continue
|
|
else: # No or other
|
|
await ctx.send("AICodeAgent: Processing stopped by user.")
|
|
return
|
|
except asyncio.TimeoutError:
|
|
await ctx.send("AICodeAgent: Continuation prompt timed out. Stopping.")
|
|
return
|
|
|
|
# If loop finishes due to max_iterations without reset (should be caught by above)
|
|
if iteration_count >= max_iterations :
|
|
await ctx.send("AICodeAgent: Stopped due to reaching maximum processing iterations.")
|
|
|
|
@commands.command(name="codeagent", aliases=["ca"])
|
|
@commands.is_owner()
|
|
async def codeagent_command(self, ctx: commands.Context, *, prompt: str):
|
|
"""Interacts with the AI Code Agent."""
|
|
if not self.genai_client:
|
|
await ctx.send("AICodeAgent: Google GenAI Client is not initialized. Cannot process request.")
|
|
return
|
|
if not prompt:
|
|
await ctx.send("AICodeAgent: Please provide a prompt for the agent.")
|
|
return
|
|
|
|
await self._process_agent_interaction(ctx, prompt)
|
|
|
|
|
|
async def setup(bot: commands.Bot):
    """discord.py extension entry point: instantiate and register AICodeAgentCog."""
    if not PROJECT_ID or not LOCATION:
        # Deliberately load the cog anyway so the bot still starts; genai_client
        # will be None and AI commands check for that before running.
        # BUGFIX: the old message claimed the cog could not be loaded, which
        # contradicted the actual behavior below.
        print("AICodeAgentCog: PROJECT_ID or LOCATION missing; loading cog anyway, but AI features will be unavailable.")

    cog = AICodeAgentCog(bot)
    await bot.add_cog(cog)
    print("AICodeAgentCog loaded.")