Implement code changes to enhance functionality and improve performance
This commit is contained in:
parent
113ce20b2f
commit
14696c131d
@ -8,6 +8,8 @@ import json
|
||||
import base64
|
||||
import datetime # For snapshot naming
|
||||
import random # For snapshot naming
|
||||
import ast # For GetCodeStructure
|
||||
import pathlib # For path manipulations, potentially
|
||||
from typing import Dict, Any, List, Optional, Tuple
|
||||
from collections import defaultdict # Added for agent_shell_sessions
|
||||
import xml.etree.ElementTree as ET
|
||||
@ -69,10 +71,22 @@ AGENT_SYSTEM_PROMPT = """You are an expert AI Coding Agent. Your primary functio
|
||||
When you need to use a tool, your response should *only* contain the XML block representing the tool call, formatted exactly as specified below. The system will parse this XML, execute the tool, and then feed the output back to you in a subsequent message prefixed with "ToolResponse:".
|
||||
IMPORTANT: Do NOT wrap your XML tool calls in markdown code blocks (e.g., ```xml ... ``` or ``` ... ```). Output the raw XML directly, starting with the root tool tag (e.g., `<ReadFile>`).
|
||||
|
||||
**Available Tools:**
|
||||
|
||||
1. **ReadFile:** Reads the content of a specified file.
|
||||
* Can read specific line ranges or "peek" at file ends.
|
||||
```xml
|
||||
<ReadFile>
|
||||
<path>path/to/file.ext</path>
|
||||
<!-- Optional: Read specific lines (1-based index) -->
|
||||
<start_line>10</start_line>
|
||||
<end_line>20</end_line>
|
||||
<!-- Optional: Peek at first/last N lines (alternative to start/end_line) -->
|
||||
<peek_first_n_lines>5</peek_first_n_lines>
|
||||
<peek_last_n_lines>5</peek_last_n_lines>
|
||||
<!-- Optional: Peek at first/last N bytes (alternative to line peeking) -->
|
||||
<!-- <peek_first_n_bytes>1024</peek_first_n_bytes> -->
|
||||
<!-- <peek_last_n_bytes>1024</peek_last_n_bytes> -->
|
||||
</ReadFile>
|
||||
```
|
||||
(System will provide file content or error in ToolResponse)
|
||||
@ -87,7 +101,7 @@ IMPORTANT: Do NOT wrap your XML tool calls in markdown code blocks (e.g., ```xml
|
||||
]]></content>
|
||||
</WriteFile>
|
||||
```
|
||||
(System will confirm success or report error in ToolResponse)
|
||||
(System will confirm success or report error in ToolResponse. A snapshot is made before writing.)
|
||||
|
||||
3. **ApplyDiff:** Applies a diff/patch to a file. Use standard unidiff format for the diff_block.
|
||||
```xml
|
||||
@ -104,9 +118,9 @@ IMPORTANT: Do NOT wrap your XML tool calls in markdown code blocks (e.g., ```xml
|
||||
]]></diff_block>
|
||||
</ApplyDiff>
|
||||
```
|
||||
(System will confirm success or report error in ToolResponse)
|
||||
(System will confirm success or report error in ToolResponse. A snapshot is made before applying.)
|
||||
|
||||
4. **ExecuteCommand:** Executes a shell command.
|
||||
4. **ExecuteCommand:** Executes a shell command. Tracks CWD per user session.
|
||||
```xml
|
||||
<ExecuteCommand>
|
||||
<command>your shell command here</command>
|
||||
@ -115,15 +129,19 @@ IMPORTANT: Do NOT wrap your XML tool calls in markdown code blocks (e.g., ```xml
|
||||
(System will provide stdout/stderr or error in ToolResponse)
|
||||
|
||||
5. **ListFiles:** Lists files and directories at a given path.
|
||||
* Supports recursion, filtering by extension/regex, and including metadata.
|
||||
```xml
|
||||
<ListFiles>
|
||||
<path>path/to/search</path>
|
||||
<recursive>true</recursive> <!-- boolean: "true" or "false". If tag is absent or value is not "true", it defaults to false. -->
|
||||
<recursive>true</recursive> <!-- boolean: "true" or "false". Default: false. -->
|
||||
<filter_extensions>.py,.txt</filter_extensions> <!-- Optional: comma-separated list of extensions, e.g., ".py,.md" -->
|
||||
<filter_regex_name>test_.*\.py</filter_regex_name> <!-- Optional: regex pattern for file/dir names -->
|
||||
<include_metadata>true</include_metadata> <!-- Optional: "true" or "false". Default: false. If true, includes size and last modified. -->
|
||||
</ListFiles>
|
||||
```
|
||||
(System will provide file list or error in ToolResponse)
|
||||
|
||||
6. **WebSearch:** Searches the web for information.
|
||||
6. **WebSearch:** Searches the web for information using Tavily.
|
||||
```xml
|
||||
<WebSearch>
|
||||
<query>your search query</query>
|
||||
@ -131,7 +149,109 @@ IMPORTANT: Do NOT wrap your XML tool calls in markdown code blocks (e.g., ```xml
|
||||
```
|
||||
(System will provide search results or error in ToolResponse)
|
||||
|
||||
7. **TaskComplete:** Signals that the current multi-step task is considered complete by the AI.
|
||||
7. **LintFile:** Checks code quality using a linter (e.g., pylint, flake8).
|
||||
```xml
|
||||
<LintFile>
|
||||
<path>path/to/python_file.py</path>
|
||||
<linter>pylint</linter> <!-- Optional: "pylint" or "flake8". Default: "pylint". -->
|
||||
</LintFile>
|
||||
```
|
||||
(System will provide linter output or error in ToolResponse)
|
||||
|
||||
8. **GetCodeStructure:** Parses a Python file and provides an overview of its structure (classes, functions, signatures, docstrings) using AST.
|
||||
```xml
|
||||
<GetCodeStructure>
|
||||
<path>path/to/python_file.py</path>
|
||||
</GetCodeStructure>
|
||||
```
|
||||
(System will provide structured code overview or error in ToolResponse)
|
||||
|
||||
9. **FindSymbolDefinition:** Locates where a specific symbol (function, class, variable) is defined within the project. (Initial implementation might be grep-based).
|
||||
```xml
|
||||
<FindSymbolDefinition>
|
||||
<symbol_name>my_function</symbol_name>
|
||||
<search_path>./cogs</search_path> <!-- Optional: path to search within. Default: project root. -->
|
||||
<file_pattern>*.py</file_pattern> <!-- Optional: glob pattern for files to search. Default: "*.py". -->
|
||||
</FindSymbolDefinition>
|
||||
```
|
||||
(System will provide definition location(s) or error in ToolResponse)
|
||||
|
||||
10. **ManageCog:** Loads, unloads, reloads, or lists the bot's cogs.
|
||||
```xml
|
||||
<ManageCog>
|
||||
<action>load</action> <!-- "load", "unload", "reload", "list" -->
|
||||
<cog_name>cogs.my_cog</cog_name> <!-- Required for load, unload, reload. e.g., "cogs.utility_cog" -->
|
||||
</ManageCog>
|
||||
```
|
||||
(System will confirm action or provide error/list in ToolResponse)
|
||||
|
||||
11. **RunTests:** Executes unit tests (e.g., pytest, unittest) for specified files, directories, or test patterns.
|
||||
```xml
|
||||
<RunTests>
|
||||
<test_path_or_pattern>tests/test_module.py</test_path_or_pattern> <!-- Path to file/dir, or pattern like "tests/test_*.py::test_specific_function" -->
|
||||
<framework>pytest</framework> <!-- Optional: "pytest" or "unittest". Default: "pytest". -->
|
||||
</RunTests>
|
||||
```
|
||||
(System will provide test results or error in ToolResponse)
|
||||
|
||||
12. **PythonREPL:** Executes Python code snippets in a REPL-like session. (Note: Be cautious with arbitrary code execution).
|
||||
```xml
|
||||
<PythonREPL>
|
||||
<code_snippet><![CDATA[
|
||||
print("Hello from REPL")
|
||||
a = 1 + 1
|
||||
# For multi-line, ensure it's a complete block or expression.
|
||||
]]></code_snippet>
|
||||
<session_id>user_specific_repl_id</session_id> <!-- Optional: If you want to maintain state across calls for a user. System might manage this. -->
|
||||
</PythonREPL>
|
||||
```
|
||||
(System will provide output/result or error in ToolResponse)
|
||||
|
||||
13. **CreateNamedSnapshot:** Takes a Git snapshot (branch) with a user-defined name and optional description.
|
||||
```xml
|
||||
<CreateNamedSnapshot>
|
||||
<snapshot_name>feature_x_before_refactor</snapshot_name>
|
||||
<description>Snapshot before major refactoring of feature X.</description> <!-- Optional -->
|
||||
</CreateNamedSnapshot>
|
||||
```
|
||||
(System will confirm snapshot creation or report error in ToolResponse)
|
||||
|
||||
14. **CompareSnapshots:** Shows the `git diff` between two specified snapshot branches (or any two branches/commits).
|
||||
```xml
|
||||
<CompareSnapshots>
|
||||
<base_ref>snapshot_cog_20230101_120000_abc123</base_ref>
|
||||
<compare_ref>snapshot_cog_20230102_140000_def456</compare_ref>
|
||||
<!-- Or use branch names: <base_ref>main</base_ref> <compare_ref>my_feature_branch</compare_ref> -->
|
||||
</CompareSnapshots>
|
||||
```
|
||||
(System will provide diff output or error in ToolResponse)
|
||||
|
||||
15. **DryRunApplyDiff:** Checks the outcome of applying a diff (e.g., via `patch --dry-run`) without actually modifying the file.
|
||||
```xml
|
||||
<DryRunApplyDiff>
|
||||
<path>path/to/file.ext</path>
|
||||
<diff_block><![CDATA[
|
||||
--- a/original_file.py
|
||||
+++ b/modified_file.py
|
||||
@@ -1,3 +1,4 @@
|
||||
line 1
|
||||
-line 2 old
|
||||
+line 2 new
|
||||
+line 3 added
|
||||
]]></diff_block>
|
||||
</DryRunApplyDiff>
|
||||
```
|
||||
(System will report if patch would apply cleanly or show errors, without changing the file.)
|
||||
|
||||
16. **DryRunWriteFile:** Checks the validity of a path and permissions for a `WriteFile` operation without actual writing.
|
||||
```xml
|
||||
<DryRunWriteFile>
|
||||
<path>path/to/new_or_existing_file.ext</path>
|
||||
</DryRunWriteFile>
|
||||
```
|
||||
(System will report if path is writable/creatable or any permission issues.)
|
||||
|
||||
17. **TaskComplete:** Signals that the current multi-step task is considered complete by the AI.
|
||||
```xml
|
||||
<TaskComplete>
|
||||
<message>A brief summary of what was accomplished or the final status.</message>
|
||||
@ -140,13 +260,14 @@ IMPORTANT: Do NOT wrap your XML tool calls in markdown code blocks (e.g., ```xml
|
||||
(System will acknowledge and stop the current interaction loop.)
|
||||
|
||||
**Workflow and Rules:**
|
||||
- **Tool Preference:** For modifying existing files, ALWAYS prefer `ApplyDiff` if the changes are targeted. Use `WriteFile` for new files or if `ApplyDiff` is unsuitable or fails repeatedly.
|
||||
- **Tool Preference:** For modifying existing files, ALWAYS prefer `ApplyDiff` if the changes are targeted. Use `WriteFile` for new files or if `ApplyDiff` is unsuitable or fails repeatedly. Consider `DryRunApplyDiff` or `DryRunWriteFile` first if unsure.
|
||||
- **Direct Operation:** You operate directly. No explicit user confirmation is needed for individual tool actions after the initial user prompt.
|
||||
- **Programmatic Snapshots (System-Managed):**
|
||||
- The system AUTOMATICALLY creates a Git snapshot of the project *before* executing `WriteFile` or `ApplyDiff` tools.
|
||||
- The system AUTOMATICALLY creates a Git snapshot (a temporary branch) of the project *before* executing `WriteFile` or `ApplyDiff` tools.
|
||||
- You will be notified by a "ToolResponse: SystemNotification..." message when a snapshot has been successfully created, right before your file modification tool is about to be truly processed.
|
||||
- You do NOT need to request or create snapshots yourself. Do NOT include snapshot steps in your `ExecuteCommand` calls for `git`.
|
||||
- If the system fails to create a snapshot, it will inform you with a "ToolResponse: SystemError...". In such a case, your `WriteFile` or `ApplyDiff` operation will NOT proceed. You should then typically inform the user of this critical system failure. Do not repeatedly try the same file operation if snapshot creation consistently fails.
|
||||
- You do NOT need to request these automatic snapshots yourself.
|
||||
- If the system fails to create a snapshot, it will inform you with a "ToolResponse: SystemError...". In such a case, your `WriteFile` or `ApplyDiff` operation will NOT proceed. You should then typically inform the user of this critical system failure.
|
||||
- **Named Snapshots (AI-Managed):** Use the `CreateNamedSnapshot` tool when you want to create a more permanent, named checkpoint (e.g., before a large refactoring).
|
||||
- **Git Workflow for Your Changes:** After you believe your coding task and all related file modifications are complete and correct, you MUST use the `ExecuteCommand` tool to perform the following Git operations in sequence:
|
||||
1. `git add .` (to stage all your changes)
|
||||
2. `git commit --author="AI Coding Agent Cog <me@slipstreamm.dev>" -m "AI Agent: <Your concise summary of changes>"` (You will generate the commit message part)
|
||||
@ -160,9 +281,18 @@ IMPORTANT: Do NOT wrap your XML tool calls in markdown code blocks (e.g., ```xml
|
||||
d. Use `ExecuteCommand` for `git add <resolved_file_path>` for each resolved file.
|
||||
e. Use `ExecuteCommand` for `git rebase --continue`.
|
||||
f. Then attempt `git push` again using `ExecuteCommand`.
|
||||
- **Push Failures:** If `git push` still fails (e.g., other non-fast-forward errors), the `ToolResponse` will report this. You should then inform the user about the push failure and the reason, and await further instructions. Do not attempt overly complex recovery maneuvers without user guidance.
|
||||
- **Clarity:** Be clear and methodical. If a step fails, acknowledge it and decide on the next course of action (retry, try alternative, or inform user).
|
||||
- **Focus:** Your goal is to complete the coding/file manipulation task as requested by the user.
|
||||
- **Push Failures:** If `git push` still fails (e.g., other non-fast-forward errors), the `ToolResponse` will report this. You should then inform the user about the push failure and the reason, and await further instructions.
|
||||
- **Agent Operational Modes:**
|
||||
- The user can set your operational mode (e.g., `default`, `planning`, `debugging`, `learning`) via a Discord command.
|
||||
- When the mode changes, you will receive a `[System Notification]` message in the conversation history like: `[System Notification] Agent mode changed to 'planning'. Context: User wants to outline a new feature.`
|
||||
- Adapt your behavior based on the current mode and any provided context. For example:
|
||||
- `default/implementation`: Focus on direct execution of tasks and code changes.
|
||||
- `planning`: Focus on breaking down complex tasks, outlining steps, asking clarifying questions before diving into code. You might use `TaskComplete` more often with a plan rather than full execution.
|
||||
- `debugging`: Focus on analyzing errors, using tools like `ReadFile`, `LintFile`, `ExecuteCommand` (for logs), `PythonREPL` to diagnose issues.
|
||||
- `learning/exploration`: Focus on understanding new parts of the codebase using `ReadFile`, `ListFiles`, `GetCodeStructure`, `FindSymbolDefinition`, or using `WebSearch` for external information.
|
||||
- The core set of tools remains available in all modes, but your strategy for using them should adapt.
|
||||
- **Clarity:** Be clear and methodical. If a step fails, acknowledge it and decide on the next course of action.
|
||||
- **Focus:** Your goal is to complete the coding/file manipulation task as requested by the user, adapting to the current operational mode.
|
||||
"""
|
||||
|
||||
class AICodeAgentCog(commands.Cog):
|
||||
@ -170,10 +300,12 @@ class AICodeAgentCog(commands.Cog):
|
||||
self.bot = bot
|
||||
self.genai_client = None
|
||||
self.agent_conversations: Dict[int, List[google_genai_types.Content]] = {} # User ID to conversation history
|
||||
self.agent_shell_sessions = defaultdict(lambda: {
|
||||
self.agent_shell_sessions = defaultdict(lambda: { # For ExecuteCommand CWD tracking
|
||||
'cwd': os.getcwd(),
|
||||
'env': os.environ.copy()
|
||||
})
|
||||
self.agent_modes: Dict[int, str] = {} # User ID to current agent mode (e.g., "default", "planning")
|
||||
self.agent_python_repl_sessions: Dict[str, Dict[str, Any]] = {} # session_id to {'globals': {}, 'locals': {}}
|
||||
|
||||
# Initialize Google GenAI Client for Vertex AI
|
||||
if PROJECT_ID and LOCATION:
|
||||
@ -241,6 +373,46 @@ class AICodeAgentCog(commands.Cog):
|
||||
else:
|
||||
await ctx.send("AICodeAgent: No conversation history found for you to clear.")
|
||||
|
||||
@commands.command(name="codeagent_mode", aliases=["ca_mode"])
|
||||
@commands.is_owner()
|
||||
async def codeagent_mode_command(self, ctx: commands.Context, mode_name: str, *, context_message: Optional[str] = None):
|
||||
"""Sets the operational mode for the AI agent for the calling user.
|
||||
Usage: !codeagent_mode <mode_name> [optional context_message]
|
||||
Modes: default, planning, debugging, learning
|
||||
"""
|
||||
user_id = ctx.author.id
|
||||
mode_name = mode_name.lower()
|
||||
valid_modes = ["default", "planning", "debugging", "learning"] # Can be expanded
|
||||
|
||||
if mode_name not in valid_modes:
|
||||
await ctx.send(f"AICodeAgent: Invalid mode '{mode_name}'. Valid modes are: {', '.join(valid_modes)}.")
|
||||
return
|
||||
|
||||
self.agent_modes[user_id] = mode_name
|
||||
mode_set_message = f"AICodeAgent: Operational mode for you set to '{mode_name}'."
|
||||
|
||||
# Prepare system notification for AI history
|
||||
notification_text = f"[System Notification] Agent mode changed to '{mode_name}'."
|
||||
if context_message:
|
||||
notification_text += f" Context: {context_message}"
|
||||
mode_set_message += f" Context: {context_message}"
|
||||
|
||||
# Add this notification to the AI's conversation history for this user
|
||||
# This ensures the AI is aware of the mode change for its next interaction
|
||||
self._add_to_conversation_history(user_id, role="user", text_content=notification_text) # Treat as user input for AI to see
|
||||
|
||||
await ctx.send(mode_set_message)
|
||||
print(f"AICodeAgentCog: User {user_id} set mode to '{mode_name}'. Notification added to history: {notification_text}")
|
||||
|
||||
|
||||
@commands.command(name="codeagent_get_mode", aliases=["ca_get_mode"])
|
||||
@commands.is_owner()
|
||||
async def codeagent_get_mode_command(self, ctx: commands.Context):
|
||||
"""Displays the current operational mode for the AI agent for the calling user."""
|
||||
user_id = ctx.author.id
|
||||
current_mode = self.agent_modes.get(user_id, "default") # Default to "default" if not set
|
||||
await ctx.send(f"AICodeAgent: Your current operational mode is '{current_mode}'.")
|
||||
|
||||
async def _run_git_command(self, command_str: str) -> Tuple[bool, str]:
|
||||
"""
|
||||
Runs a Git command using subprocess.Popen in a thread and returns (success_status, output_string).
|
||||
@ -537,12 +709,33 @@ class AICodeAgentCog(commands.Cog):
|
||||
|
||||
elif tool_name == "ListFiles":
|
||||
file_path = parameters.get("path")
|
||||
recursive_str = parameters.get("recursive") # Will be None if tag is missing
|
||||
recursive = recursive_str.lower() == 'true' if recursive_str else False # Handles None or empty string safely
|
||||
recursive_str = parameters.get("recursive")
|
||||
recursive = recursive_str.lower() == 'true' if recursive_str else False
|
||||
|
||||
filter_extensions = parameters.get("filter_extensions") # Optional: comma-separated string
|
||||
filter_regex_name = parameters.get("filter_regex_name") # Optional: regex string
|
||||
include_metadata_str = parameters.get("include_metadata")
|
||||
include_metadata = include_metadata_str.lower() == 'true' if include_metadata_str else False
|
||||
|
||||
if not file_path:
|
||||
return "TOOL_OUTPUT", "ToolResponse: Error\n---\nListFiles: Missing 'path' parameter."
|
||||
tool_output = await self._execute_tool_list_files(file_path, recursive)
|
||||
return "TOOL_OUTPUT", f"ToolResponse: ListFiles\nPath: {file_path}\nRecursive: {recursive}\n---\n{tool_output}"
|
||||
|
||||
tool_output = await self._execute_tool_list_files(
|
||||
file_path,
|
||||
recursive,
|
||||
filter_extensions=filter_extensions,
|
||||
filter_regex_name=filter_regex_name,
|
||||
include_metadata=include_metadata
|
||||
)
|
||||
|
||||
params_summary = [f"Recursive: {recursive}"]
|
||||
if filter_extensions: params_summary.append(f"Extensions: {filter_extensions}")
|
||||
if filter_regex_name: params_summary.append(f"RegexName: {filter_regex_name}")
|
||||
params_summary.append(f"Metadata: {include_metadata}")
|
||||
|
||||
response_message = f"ToolResponse: ListFiles\nPath: {file_path}\n" + "\n".join(params_summary)
|
||||
response_message += f"\n---\n{tool_output}"
|
||||
return "TOOL_OUTPUT", response_message
|
||||
|
||||
elif tool_name == "WebSearch":
|
||||
query_str = parameters.get("query")
|
||||
@ -556,6 +749,93 @@ class AICodeAgentCog(commands.Cog):
|
||||
return "TASK_COMPLETE", message if message is not None else "Task marked as complete by AI."
|
||||
|
||||
|
||||
elif tool_name == "LintFile":
|
||||
file_path = parameters.get("path")
|
||||
linter = parameters.get("linter", "pylint") # Default to pylint
|
||||
if not file_path:
|
||||
return "TOOL_OUTPUT", "ToolResponse: Error\n---\nLintFile: Missing 'path' parameter."
|
||||
tool_output = await self._execute_tool_lint_file(file_path, linter)
|
||||
return "TOOL_OUTPUT", f"ToolResponse: LintFile\nPath: {file_path}\nLinter: {linter}\n---\n{tool_output}"
|
||||
|
||||
elif tool_name == "GetCodeStructure":
|
||||
file_path = parameters.get("path")
|
||||
if not file_path:
|
||||
return "TOOL_OUTPUT", "ToolResponse: Error\n---\nGetCodeStructure: Missing 'path' parameter."
|
||||
tool_output = await self._execute_tool_get_code_structure(file_path)
|
||||
return "TOOL_OUTPUT", f"ToolResponse: GetCodeStructure\nPath: {file_path}\n---\n{tool_output}"
|
||||
|
||||
elif tool_name == "FindSymbolDefinition":
|
||||
symbol_name = parameters.get("symbol_name")
|
||||
search_path = parameters.get("search_path", ".") # Default to current dir (project root)
|
||||
file_pattern = parameters.get("file_pattern", "*.py") # Default to Python files
|
||||
if not symbol_name:
|
||||
return "TOOL_OUTPUT", "ToolResponse: Error\n---\nFindSymbolDefinition: Missing 'symbol_name' parameter."
|
||||
tool_output = await self._execute_tool_find_symbol_definition(symbol_name, search_path, file_pattern)
|
||||
return "TOOL_OUTPUT", f"ToolResponse: FindSymbolDefinition\nSymbol: {symbol_name}\nPath: {search_path}\nPattern: {file_pattern}\n---\n{tool_output}"
|
||||
|
||||
elif tool_name == "ManageCog":
|
||||
action = parameters.get("action")
|
||||
cog_name = parameters.get("cog_name")
|
||||
if not action:
|
||||
return "TOOL_OUTPUT", "ToolResponse: Error\n---\nManageCog: Missing 'action' parameter."
|
||||
if action in ["load", "unload", "reload"] and not cog_name:
|
||||
return "TOOL_OUTPUT", f"ToolResponse: Error\n---\nManageCog: Missing 'cog_name' for action '{action}'."
|
||||
tool_output = await self._execute_tool_manage_cog(action, cog_name)
|
||||
return "TOOL_OUTPUT", f"ToolResponse: ManageCog\nAction: {action}\nCog: {cog_name or 'N/A'}\n---\n{tool_output}"
|
||||
|
||||
elif tool_name == "RunTests":
|
||||
test_path_or_pattern = parameters.get("test_path_or_pattern")
|
||||
framework = parameters.get("framework", "pytest")
|
||||
if not test_path_or_pattern:
|
||||
return "TOOL_OUTPUT", "ToolResponse: Error\n---\nRunTests: Missing 'test_path_or_pattern' parameter."
|
||||
tool_output = await self._execute_tool_run_tests(test_path_or_pattern, framework)
|
||||
return "TOOL_OUTPUT", f"ToolResponse: RunTests\nTarget: {test_path_or_pattern}\nFramework: {framework}\n---\n{tool_output}"
|
||||
|
||||
elif tool_name == "PythonREPL":
|
||||
code_snippet = parameters.get("code_snippet")
|
||||
session_id_param = parameters.get("session_id") # AI might suggest one
|
||||
user_id = ctx.author.id
|
||||
# Use user_id for a persistent session if AI doesn't specify one, or combine them.
|
||||
# For simplicity, let's use user_id as the primary key for REPL sessions for now.
|
||||
# If AI provides session_id, it could be a sub-context within that user's REPL.
|
||||
# Let's make session_id for the tool map to user_id for now.
|
||||
repl_session_key = str(user_id) # Or incorporate session_id_param if needed
|
||||
if not code_snippet:
|
||||
return "TOOL_OUTPUT", "ToolResponse: Error\n---\nPythonREPL: Missing 'code_snippet' parameter."
|
||||
tool_output = await self._execute_tool_python_repl(code_snippet, repl_session_key)
|
||||
return "TOOL_OUTPUT", f"ToolResponse: PythonREPL\nSession: {repl_session_key}\n---\n{tool_output}"
|
||||
|
||||
elif tool_name == "CreateNamedSnapshot":
|
||||
snapshot_name = parameters.get("snapshot_name")
|
||||
description = parameters.get("description") # Optional
|
||||
if not snapshot_name:
|
||||
return "TOOL_OUTPUT", "ToolResponse: Error\n---\nCreateNamedSnapshot: Missing 'snapshot_name' parameter."
|
||||
tool_output = await self._execute_tool_create_named_snapshot(snapshot_name, description)
|
||||
return "TOOL_OUTPUT", f"ToolResponse: CreateNamedSnapshot\nName: {snapshot_name}\n---\n{tool_output}"
|
||||
|
||||
elif tool_name == "CompareSnapshots":
|
||||
base_ref = parameters.get("base_ref")
|
||||
compare_ref = parameters.get("compare_ref")
|
||||
if not base_ref or not compare_ref:
|
||||
return "TOOL_OUTPUT", "ToolResponse: Error\n---\nCompareSnapshots: Missing 'base_ref' or 'compare_ref' parameter."
|
||||
tool_output = await self._execute_tool_compare_snapshots(base_ref, compare_ref)
|
||||
return "TOOL_OUTPUT", f"ToolResponse: CompareSnapshots\nBase: {base_ref}\nCompare: {compare_ref}\n---\n{tool_output}"
|
||||
|
||||
elif tool_name == "DryRunApplyDiff":
|
||||
file_path = parameters.get("path")
|
||||
diff_block = parameters.get("diff_block")
|
||||
if not file_path or not diff_block:
|
||||
return "TOOL_OUTPUT", "ToolResponse: Error\n---\nDryRunApplyDiff: Missing 'path' or 'diff_block' parameter."
|
||||
tool_output = await self._execute_tool_dry_run_apply_diff(file_path, diff_block)
|
||||
return "TOOL_OUTPUT", f"ToolResponse: DryRunApplyDiff\nPath: {file_path}\n---\n{tool_output}"
|
||||
|
||||
elif tool_name == "DryRunWriteFile":
|
||||
file_path = parameters.get("path")
|
||||
if not file_path:
|
||||
return "TOOL_OUTPUT", "ToolResponse: Error\n---\nDryRunWriteFile: Missing 'path' parameter."
|
||||
tool_output = await self._execute_tool_dry_run_write_file(file_path)
|
||||
return "TOOL_OUTPUT", f"ToolResponse: DryRunWriteFile\nPath: {file_path}\n---\n{tool_output}"
|
||||
|
||||
else:
|
||||
# Unknown tool name found in XML
|
||||
return "TOOL_OUTPUT", f"ToolResponse: Error\n---\nUnknown tool: {tool_name} in XML: {clean_ai_response_text[:200]}"
|
||||
@ -571,28 +851,62 @@ class AICodeAgentCog(commands.Cog):
|
||||
return "TOOL_OUTPUT", f"ToolResponse: SystemError\n---\nError processing tool call: {type(e).__name__} - {e}"
|
||||
|
||||
# --- Tool Execution Methods ---
|
||||
# (Implementations for _execute_tool_... methods remain the same)
|
||||
# (Implementations for _execute_tool_... methods remain the same) # This comment might be outdated after this change
|
||||
|
||||
async def _execute_tool_read_file(self, path: str) -> str:
|
||||
print(f"AICodeAgentCog: Placeholder _execute_tool_read_file for path: {path}")
|
||||
# Actual implementation:
|
||||
async def _execute_tool_read_file(self, path: str,
|
||||
start_line: Optional[int] = None,
|
||||
end_line: Optional[int] = None,
|
||||
peek_first_n_lines: Optional[int] = None,
|
||||
peek_last_n_lines: Optional[int] = None) -> str:
|
||||
print(f"AICodeAgentCog: _execute_tool_read_file for path: {path}, start: {start_line}, end: {end_line}, peek_first: {peek_first_n_lines}, peek_last: {peek_last_n_lines}")
|
||||
try:
|
||||
# Ensure path is within project, basic safety. More robust checks might be needed.
|
||||
# base_dir = os.path.abspath(".") # Or specific project root
|
||||
# requested_path = os.path.abspath(os.path.join(base_dir, path))
|
||||
# if not requested_path.startswith(base_dir):
|
||||
# return "Error: File path is outside the allowed project directory."
|
||||
if not os.path.exists(path):
|
||||
return f"Error: File not found at '{path}'"
|
||||
if os.path.isdir(path):
|
||||
return f"Error: Path '{path}' is a directory, not a file."
|
||||
with open(path, 'r', encoding='utf-8', errors='replace') as f:
|
||||
return f.read()
|
||||
|
||||
# Determine the operation based on parameters
|
||||
# Priority: peek_first > peek_last > start_line/end_line > full_read
|
||||
if peek_first_n_lines is not None and peek_first_n_lines > 0:
|
||||
with open(path, 'r', encoding='utf-8', errors='replace') as f:
|
||||
lines = []
|
||||
for i, line in enumerate(f):
|
||||
if i >= peek_first_n_lines:
|
||||
break
|
||||
lines.append(line)
|
||||
return "".join(lines) if lines else "File is empty or shorter than peek_first_n_lines."
|
||||
elif peek_last_n_lines is not None and peek_last_n_lines > 0:
|
||||
# This is inefficient for large files, but a simple approach for now.
|
||||
# A more efficient way would be to seek from the end and read backwards,
|
||||
# or use a deque with a maxlen.
|
||||
with open(path, 'r', encoding='utf-8', errors='replace') as f:
|
||||
lines = f.readlines() # Reads all lines into memory
|
||||
return "".join(lines[-peek_last_n_lines:]) if lines else "File is empty."
|
||||
elif start_line is not None and start_line > 0: # start_line is 1-based
|
||||
with open(path, 'r', encoding='utf-8', errors='replace') as f:
|
||||
lines = f.readlines() # Reads all lines
|
||||
start_idx = start_line - 1 # Convert to 0-based index
|
||||
|
||||
if start_idx >= len(lines):
|
||||
return f"Error: start_line ({start_line}) is beyond the end of the file ({len(lines)} lines)."
|
||||
|
||||
if end_line is not None and end_line > 0:
|
||||
# end_line is inclusive, so slice up to end_line (0-based end_idx = end_line)
|
||||
end_idx = end_line
|
||||
if end_idx < start_idx:
|
||||
return f"Error: end_line ({end_line}) cannot be before start_line ({start_line})."
|
||||
return "".join(lines[start_idx:min(end_idx, len(lines))])
|
||||
else: # Read from start_line to the end of the file
|
||||
return "".join(lines[start_idx:])
|
||||
else: # Default: read whole file
|
||||
with open(path, 'r', encoding='utf-8', errors='replace') as f:
|
||||
content = f.read()
|
||||
return content
|
||||
except Exception as e:
|
||||
return f"Error reading file '{path}': {type(e).__name__} - {e}"
|
||||
|
||||
async def _execute_tool_write_file(self, path: str, content: str) -> str:
|
||||
print(f"AICodeAgentCog: Placeholder _execute_tool_write_file for path: {path}")
|
||||
print(f"AICodeAgentCog: _execute_tool_write_file for path: {path}")
|
||||
# Actual implementation:
|
||||
try:
|
||||
# base_dir = os.path.abspath(".")
|
||||
@ -755,46 +1069,105 @@ class AICodeAgentCog(commands.Cog):
|
||||
# General exception during subprocess handling
|
||||
return f"Exception executing command '{command}': {type(e).__name__} - {e}"
|
||||
|
||||
async def _execute_tool_list_files(self, path: str, recursive: bool) -> str:
|
||||
print(f"AICodeAgentCog: Attempting _execute_tool_list_files for path: {path}, recursive: {recursive}")
|
||||
# Actual implementation:
|
||||
async def _execute_tool_list_files(self, path: str, recursive: bool, filter_extensions: Optional[str] = None, filter_regex_name: Optional[str] = None, include_metadata: bool = False) -> str:
|
||||
print(f"AICodeAgentCog: _execute_tool_list_files for path: {path}, recursive: {recursive}, ext: {filter_extensions}, regex: {filter_regex_name}, meta: {include_metadata}")
|
||||
# TODO: Implement filtering (filter_extensions, filter_regex_name) and metadata (include_metadata)
|
||||
try:
|
||||
# base_dir = os.path.abspath(".")
|
||||
# requested_path = os.path.abspath(os.path.join(base_dir, path))
|
||||
# if not requested_path.startswith(base_dir):
|
||||
# return "Error: Path is outside the allowed project directory."
|
||||
if not os.path.exists(path):
|
||||
return f"Error: Path not found at '{path}'"
|
||||
if not os.path.isdir(path):
|
||||
return f"Error: Path '{path}' is not a directory."
|
||||
|
||||
file_list = []
|
||||
file_list_results = []
|
||||
excluded_dirs = {"__pycache__", ".git", ".vscode", ".idea", "node_modules", "venv", ".env", "terminal_images"}
|
||||
|
||||
extensions_to_filter = []
|
||||
if filter_extensions:
|
||||
extensions_to_filter = [ext.strip().lower() for ext in filter_extensions.split(',') if ext.strip()]
|
||||
|
||||
name_regex_pattern = None
|
||||
if filter_regex_name:
|
||||
try:
|
||||
name_regex_pattern = re.compile(filter_regex_name)
|
||||
except re.error as e:
|
||||
return f"Error: Invalid regex for name filtering: {e}"
|
||||
|
||||
items_processed = 0
|
||||
max_items_to_list = 500 # Safety break
|
||||
|
||||
if recursive:
|
||||
for root, dirs, files in os.walk(path, topdown=True):
|
||||
if items_processed > max_items_to_list: break
|
||||
# Exclude specified directories from further traversal
|
||||
dirs[:] = [d for d in dirs if d not in excluded_dirs]
|
||||
dirs[:] = [d for d in dirs if d not in excluded_dirs and (not name_regex_pattern or name_regex_pattern.search(d))]
|
||||
|
||||
for name in files:
|
||||
file_list.append(os.path.join(root, name))
|
||||
# Add filtered directories to the list
|
||||
for name in dirs: # These are already filtered dirs
|
||||
file_list.append(os.path.join(root, name) + os.sep) # Indicate dirs
|
||||
if items_processed > max_items_to_list: break
|
||||
if name_regex_pattern and not name_regex_pattern.search(name):
|
||||
continue
|
||||
if extensions_to_filter and not any(name.lower().endswith(ext) for ext in extensions_to_filter):
|
||||
continue
|
||||
|
||||
full_path = os.path.join(root, name)
|
||||
entry = full_path
|
||||
if include_metadata:
|
||||
try:
|
||||
stat = os.stat(full_path)
|
||||
entry += f" (Size: {stat.st_size} B, Modified: {datetime.datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')})"
|
||||
except OSError:
|
||||
entry += " (Metadata N/A)"
|
||||
file_list_results.append(entry)
|
||||
items_processed +=1
|
||||
|
||||
for name in dirs: # These are already filtered and regex matched (if regex provided for dirs)
|
||||
if items_processed > max_items_to_list: break
|
||||
# No extension filter for dirs, regex already applied
|
||||
full_path = os.path.join(root, name)
|
||||
entry = full_path + os.sep
|
||||
if include_metadata:
|
||||
try:
|
||||
stat = os.stat(full_path)
|
||||
entry += f" (Modified: {datetime.datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')})"
|
||||
except OSError:
|
||||
entry += " (Metadata N/A)"
|
||||
file_list_results.append(entry)
|
||||
items_processed +=1
|
||||
else: # Non-recursive case
|
||||
for item in os.listdir(path):
|
||||
if item in excluded_dirs: # Check if the item itself is an excluded directory name
|
||||
if items_processed > max_items_to_list: break
|
||||
if item in excluded_dirs:
|
||||
continue
|
||||
if name_regex_pattern and not name_regex_pattern.search(item):
|
||||
continue
|
||||
|
||||
full_item_path = os.path.join(path, item)
|
||||
if os.path.isdir(full_item_path):
|
||||
file_list.append(item + os.sep) # Indicate dirs
|
||||
else:
|
||||
file_list.append(item)
|
||||
return "\n".join(file_list) if file_list else "No files or directories found."
|
||||
is_dir = os.path.isdir(full_item_path)
|
||||
|
||||
if not is_dir and extensions_to_filter and not any(item.lower().endswith(ext) for ext in extensions_to_filter):
|
||||
continue
|
||||
|
||||
entry = item + (os.sep if is_dir else "")
|
||||
if include_metadata:
|
||||
try:
|
||||
stat = os.stat(full_item_path)
|
||||
if is_dir:
|
||||
entry += f" (Modified: {datetime.datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')})"
|
||||
else:
|
||||
entry += f" (Size: {stat.st_size} B, Modified: {datetime.datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')})"
|
||||
except OSError:
|
||||
entry += " (Metadata N/A)"
|
||||
file_list_results.append(entry)
|
||||
items_processed +=1
|
||||
|
||||
if items_processed > max_items_to_list:
|
||||
file_list_results.append(f"... (truncated, listed {max_items_to_list} items)")
|
||||
|
||||
return "\n".join(file_list_results) if file_list_results else "No files or directories found matching criteria."
|
||||
except Exception as e:
|
||||
return f"Error listing files at '{path}': {type(e).__name__} - {e}"
|
||||
|
||||
async def _execute_tool_web_search(self, query: str) -> str:
|
||||
print(f"AICodeAgentCog: Placeholder _execute_tool_web_search for query: {query}")
|
||||
print(f"AICodeAgentCog: _execute_tool_web_search for query: {query}") # Removed "Placeholder"
|
||||
if not self.tavily_client:
|
||||
return "Error: Tavily client not initialized. Cannot perform web search."
|
||||
try:
|
||||
@ -819,8 +1192,372 @@ class AICodeAgentCog(commands.Cog):
|
||||
except Exception as e:
|
||||
return f"Error during Tavily web search for '{query}': {type(e).__name__} - {e}"
|
||||
|
||||
# --- Placeholder New Tool Execution Methods ---
|
||||
async def _execute_tool_lint_file(self, path: str, linter: str) -> str:
|
||||
if not os.path.exists(path):
|
||||
return f"Error: File not found at '{path}' for linting."
|
||||
if not os.path.isfile(path):
|
||||
return f"Error: Path '{path}' is not a file."
|
||||
|
||||
linter_cmd = []
|
||||
if linter.lower() == "pylint":
|
||||
linter_cmd = ["pylint", path]
|
||||
elif linter.lower() == "flake8":
|
||||
linter_cmd = ["flake8", path]
|
||||
else:
|
||||
return f"Error: Unsupported linter '{linter}'. Supported linters: pylint, flake8."
|
||||
|
||||
try:
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*linter_cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await process.communicate()
|
||||
|
||||
output_str = ""
|
||||
if stdout:
|
||||
output_str += f"Linter ({linter}) STDOUT:\n{stdout.decode(errors='replace')}\n"
|
||||
if stderr: # Linters often output to stderr for warnings/errors
|
||||
output_str += f"Linter ({linter}) STDERR:\n{stderr.decode(errors='replace')}\n"
|
||||
|
||||
if not output_str and process.returncode == 0:
|
||||
output_str = f"Linter ({linter}) found no issues."
|
||||
elif not output_str and process.returncode !=0:
|
||||
output_str = f"Linter ({linter}) exited with code {process.returncode} but no output."
|
||||
|
||||
|
||||
return output_str
|
||||
except FileNotFoundError:
|
||||
return f"Error: Linter command '{linter_cmd[0]}' not found. Please ensure it is installed and in PATH."
|
||||
except Exception as e:
|
||||
return f"Error running linter '{linter}' on '{path}': {type(e).__name__} - {e}"
|
||||
|
||||
async def _execute_tool_get_code_structure(self, path: str) -> str:
|
||||
# Basic AST parsing example
|
||||
try:
|
||||
if not path.endswith(".py"):
|
||||
return "Error: GetCodeStructure currently only supports Python files."
|
||||
with open(path, "r", encoding="utf-8") as source_file:
|
||||
source_code = source_file.read()
|
||||
tree = ast.parse(source_code)
|
||||
|
||||
structure = []
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.FunctionDef):
|
||||
args = [arg.arg for arg in node.args.args]
|
||||
structure.append(f"Function: {node.name}({', '.join(args)}) - Docstring: {ast.get_docstring(node) or 'N/A'}")
|
||||
elif isinstance(node, ast.AsyncFunctionDef):
|
||||
args = [arg.arg for arg in node.args.args]
|
||||
structure.append(f"Async Function: {node.name}({', '.join(args)}) - Docstring: {ast.get_docstring(node) or 'N/A'}")
|
||||
elif isinstance(node, ast.ClassDef):
|
||||
structure.append(f"Class: {node.name} - Docstring: {ast.get_docstring(node) or 'N/A'}")
|
||||
return "\n".join(structure) if structure else "No major structures (classes/functions) found."
|
||||
except Exception as e:
|
||||
return f"Error parsing code structure for '{path}': {type(e).__name__} - {e}"
|
||||
|
||||
async def _execute_tool_find_symbol_definition(self, symbol_name: str, search_path: str, file_pattern: str) -> str:
|
||||
if not os.path.exists(search_path):
|
||||
return f"Error: Search path '{search_path}' not found."
|
||||
if not os.path.isdir(search_path):
|
||||
return f"Error: Search path '{search_path}' is not a directory."
|
||||
|
||||
# Using findstr for Windows. It's less flexible with patterns than grep.
|
||||
# findstr /S /N /P /C:"search string" files_to_search
|
||||
# /S: searches subdirectories
|
||||
# /N: prints line numbers
|
||||
# /P: skips files with non-printable characters
|
||||
# /C:"string": uses string as a literal search string
|
||||
# files_to_search can include wildcards, e.g., *.py
|
||||
|
||||
# Construct the files_to_search argument.
|
||||
# If file_pattern is like "*.py", it can be directly appended.
|
||||
# os.path.join will correctly handle path separators.
|
||||
files_to_search_arg = os.path.join(search_path, file_pattern)
|
||||
|
||||
# Escape the symbol name for command line if it contains special characters, though /C should treat it literally.
|
||||
# For simplicity, we assume symbol_name doesn't need complex shell escaping here.
|
||||
find_cmd = ["findstr", "/S", "/N", "/P", f"/C:{symbol_name}", files_to_search_arg]
|
||||
|
||||
try:
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*find_cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
cwd=os.getcwd() # Run from bot's root, search_path should be relative or absolute
|
||||
)
|
||||
stdout, stderr = await process.communicate(timeout=30) # 30-second timeout
|
||||
|
||||
output_str = ""
|
||||
if stdout:
|
||||
output_str += f"Definitions found for '{symbol_name}' (using findstr):\n{stdout.decode(errors='replace')}\n"
|
||||
if stderr:
|
||||
output_str += f"Findstr STDERR:\n{stderr.decode(errors='replace')}\n"
|
||||
|
||||
if not output_str and process.returncode == 0: # findstr returns 0 if found, 1 if not found, 2 for error
|
||||
output_str = f"No definitions found for '{symbol_name}' in '{search_path}/{file_pattern}' (findstr found nothing but exited cleanly)."
|
||||
elif not output_str and process.returncode == 1: # Explicit "not found"
|
||||
output_str = f"No definitions found for '{symbol_name}' in '{search_path}/{file_pattern}'."
|
||||
elif process.returncode not in [0,1]: # Other errors
|
||||
output_str += f"Findstr exited with code {process.returncode}."
|
||||
|
||||
|
||||
return output_str
|
||||
except FileNotFoundError:
|
||||
return "Error: Command 'findstr' not found. This tool currently relies on findstr (Windows)."
|
||||
except subprocess.TimeoutExpired:
|
||||
return f"Error: FindSymbolDefinition command timed out after 30 seconds for symbol '{symbol_name}'."
|
||||
except Exception as e:
|
||||
return f"Error running FindSymbolDefinition for '{symbol_name}': {type(e).__name__} - {e}"
|
||||
|
||||
|
||||
async def _execute_tool_manage_cog(self, action: str, cog_name: Optional[str]) -> str:
|
||||
action = action.lower()
|
||||
try:
|
||||
if action == "list":
|
||||
loaded_cogs = list(self.bot.cogs.keys())
|
||||
return f"Loaded cogs: {', '.join(loaded_cogs)}" if loaded_cogs else "No cogs currently loaded."
|
||||
|
||||
if not cog_name: # Should be caught by parser, but defensive
|
||||
return "Error: cog_name is required for load, unload, or reload actions."
|
||||
|
||||
if action == "load":
|
||||
await self.bot.load_extension(cog_name)
|
||||
return f"Successfully loaded cog: {cog_name}"
|
||||
elif action == "unload":
|
||||
await self.bot.unload_extension(cog_name)
|
||||
return f"Successfully unloaded cog: {cog_name}"
|
||||
elif action == "reload":
|
||||
await self.bot.reload_extension(cog_name)
|
||||
return f"Successfully reloaded cog: {cog_name}"
|
||||
else:
|
||||
return f"Error: Unknown action '{action}' for ManageCog."
|
||||
except commands.ExtensionNotFound:
|
||||
return f"Error: Cog '{cog_name}' not found."
|
||||
except commands.ExtensionAlreadyLoaded:
|
||||
return f"Error: Cog '{cog_name}' is already loaded."
|
||||
except commands.ExtensionNotLoaded:
|
||||
return f"Error: Cog '{cog_name}' is not loaded."
|
||||
except commands.NoEntryPointError:
|
||||
return f"Error: Cog '{cog_name}' does not have a setup function."
|
||||
except Exception as e:
|
||||
return f"Error during ManageCog action '{action}' on '{cog_name}': {type(e).__name__} - {e}"
|
||||
|
||||
async def _execute_tool_run_tests(self, test_path_or_pattern: str, framework: str) -> str:
|
||||
framework = framework.lower()
|
||||
test_cmd = []
|
||||
|
||||
if framework == "pytest":
|
||||
test_cmd = ["pytest", test_path_or_pattern]
|
||||
elif framework == "unittest":
|
||||
# Basic unittest invocation. Might need more complex discovery for patterns.
|
||||
# python -m unittest test_module.TestClass.test_method
|
||||
# python -m unittest discover -s project_directory -p 'test_*.py'
|
||||
# For simplicity, assume test_path_or_pattern is directly usable by `python -m unittest`
|
||||
test_cmd = ["python", "-m", "unittest", test_path_or_pattern]
|
||||
else:
|
||||
return f"Error: Unsupported test framework '{framework}'. Supported: pytest, unittest."
|
||||
|
||||
try:
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*test_cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
cwd=os.getcwd() # Run tests from the project root
|
||||
)
|
||||
stdout, stderr = await process.communicate(timeout=300) # 5-minute timeout for tests
|
||||
|
||||
output_str = ""
|
||||
if stdout:
|
||||
output_str += f"Test ({framework}) STDOUT:\n{stdout.decode(errors='replace')}\n"
|
||||
if stderr:
|
||||
output_str += f"Test ({framework}) STDERR:\n{stderr.decode(errors='replace')}\n"
|
||||
|
||||
if not output_str:
|
||||
output_str = f"Test ({framework}) command executed with no output. Exit code: {process.returncode}"
|
||||
else:
|
||||
output_str += f"\nTest ({framework}) command exit code: {process.returncode}"
|
||||
|
||||
|
||||
return output_str
|
||||
except FileNotFoundError:
|
||||
cmd_not_found = test_cmd[0]
|
||||
if framework == "unittest" and cmd_not_found == "python":
|
||||
cmd_not_found = "python interpreter"
|
||||
return f"Error: Test command '{cmd_not_found}' not found. Please ensure it is installed and in PATH."
|
||||
except subprocess.TimeoutExpired:
|
||||
return f"Error: Tests timed out after 300 seconds for target '{test_path_or_pattern}'."
|
||||
except Exception as e:
|
||||
return f"Error running tests for '{test_path_or_pattern}' with {framework}: {type(e).__name__} - {e}"
|
||||
|
||||
async def _execute_tool_python_repl(self, code_snippet: str, session_key: str) -> str:
|
||||
# Basic, insecure exec-based REPL. CAUTION ADVISED.
|
||||
# A proper implementation would use a sandboxed environment.
|
||||
if session_key not in self.agent_python_repl_sessions:
|
||||
self.agent_python_repl_sessions[session_key] = {'globals': globals().copy(), 'locals': {}}
|
||||
|
||||
session_env = self.agent_python_repl_sessions[session_key]
|
||||
|
||||
# Capture stdout for the REPL
|
||||
import io
|
||||
from contextlib import redirect_stdout
|
||||
|
||||
f = io.StringIO()
|
||||
try:
|
||||
with redirect_stdout(f):
|
||||
exec(code_snippet, session_env['globals'], session_env['locals'])
|
||||
output = f.getvalue()
|
||||
return f"Output:\n{output}" if output else "Executed successfully with no direct output."
|
||||
except Exception as e:
|
||||
return f"Error in PythonREPL: {type(e).__name__} - {e}"
|
||||
finally:
|
||||
f.close()
|
||||
|
||||
|
||||
async def _execute_tool_create_named_snapshot(self, snapshot_name: str, description: Optional[str]) -> str:
    """Create a named Git snapshot branch at the current HEAD.

    Unlike the automatic snapshot helper, this uses the caller-supplied name
    (sanitized for Git branch rules). It creates the branch, checks out to
    it, records an empty commit carrying the name/description, then switches
    back to the original branch. Failure paths attempt cleanup (branch
    deletion / checkout back) before returning.

    Args:
        snapshot_name: Desired branch name; non-word chars (except '.' and
            '-') are replaced with '_'.
        description: Optional extra text appended to the commit message body.

    Returns:
        A success message with the sanitized branch name, or an
        "Error: ..." string. Never raises.
    """
    try:
        # Git branch names forbid many characters; collapse anything outside
        # [A-Za-z0-9_.-] to '_'.
        safe_snapshot_name = re.sub(r'[^\w.-]', '_', snapshot_name)
        if not safe_snapshot_name:
            return "Error: Invalid snapshot name after sanitization (empty)."

        # Refuse to clobber an existing branch of the same name.
        success_check, existing_branches_str = await self._run_git_command(f"git branch --list {safe_snapshot_name}")
        if success_check and safe_snapshot_name in existing_branches_str:
            return f"Error: Snapshot branch '{safe_snapshot_name}' already exists."

        # Create the new snapshot branch pointing at current HEAD.
        success, output = await self._run_git_command(f"git branch {safe_snapshot_name}")
        if not success:
            return f"Error: Failed to create snapshot branch '{safe_snapshot_name}': {output}"

        # The commit records the *unsanitized* name plus optional description.
        # NOTE(review): commit_message is interpolated into a double-quoted
        # shell string below — a description containing '"' could break the
        # command; confirm _run_git_command's quoting behavior.
        commit_message = f"AI Named Snapshot: {snapshot_name}"
        if description:
            commit_message += f"\n\n{description}"

        # Remember where we are so we can switch back after committing.
        current_branch_success, current_branch_name = await self._run_git_command("git rev-parse --abbrev-ref HEAD")
        if not current_branch_success:
            await self._run_git_command(f"git branch -D {safe_snapshot_name}")  # cleanup the branch we just made
            return f"Error: Could not get current branch name before creating named snapshot: {current_branch_name}"

        # Checkout → empty commit → checkout back. (Simpler than commit-tree
        # plumbing; leaves the working tree on the original branch on success.)
        success, output = await self._run_git_command(f"git checkout {safe_snapshot_name}")
        if not success:
            await self._run_git_command(f"git branch -D {safe_snapshot_name}")  # cleanup
            return f"Error: Failed to checkout to new snapshot branch '{safe_snapshot_name}': {output}"

        # --allow-empty: the snapshot is a marker commit; there may be no staged changes.
        success, output = await self._run_git_command(f"git commit --author=\"{COMMIT_AUTHOR}\" -m \"{commit_message}\" --allow-empty")
        if not success:
            # Attempt to switch back before reporting the error; the branch is
            # deliberately left in place for manual inspection.
            await self._run_git_command(f"git checkout {current_branch_name.strip()}")
            # await self._run_git_command(f"git branch -D {safe_snapshot_name}")
            return f"Error: Failed to commit on snapshot branch '{safe_snapshot_name}': {output}"

        # Switch back to the original branch.
        success_back, output_back = await self._run_git_command(f"git checkout {current_branch_name.strip()}")
        if not success_back:
            # Problematic: the working tree is left on the snapshot branch —
            # surface that explicitly so the caller can recover.
            return f"Successfully created and committed snapshot '{safe_snapshot_name}', but FAILED to switch back to original branch '{current_branch_name.strip()}'. Current branch is now '{safe_snapshot_name}'. Details: {output_back}"

        return f"Successfully created named snapshot: {safe_snapshot_name}"

    except Exception as e:
        return f"Error creating named snapshot '{snapshot_name}': {type(e).__name__} - {e}"
|
||||
|
||||
|
||||
async def _execute_tool_compare_snapshots(self, base_ref: str, compare_ref: str) -> str:
|
||||
success, output = await self._run_git_command(f"git diff {base_ref}..{compare_ref}")
|
||||
if success:
|
||||
return f"Diff between '{base_ref}' and '{compare_ref}':\n```diff\n{output or 'No differences found.'}\n```"
|
||||
else:
|
||||
return f"Error comparing snapshots '{base_ref}' and '{compare_ref}': {output}"
|
||||
|
||||
async def _execute_tool_dry_run_apply_diff(self, path: str, diff_block: str) -> str:
|
||||
if not os.path.exists(path): # Check if target file for patch exists
|
||||
return f"Error: File not found at '{path}' for dry-run applying diff."
|
||||
if os.path.isdir(path):
|
||||
return f"Error: Path '{path}' is a directory, cannot dry-run apply diff."
|
||||
|
||||
try:
|
||||
if not diff_block.endswith('\n'):
|
||||
diff_block += '\n'
|
||||
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
'patch', '--dry-run', path,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await process.communicate(input=diff_block.encode('utf-8'))
|
||||
|
||||
output_str = ""
|
||||
if stdout: output_str += f"Stdout:\n{stdout.decode(errors='replace')}\n"
|
||||
if stderr: output_str += f"Stderr:\n{stderr.decode(errors='replace')}\n"
|
||||
|
||||
if process.returncode == 0:
|
||||
return f"Dry run: Diff would apply cleanly to '{path}'.\n{output_str}"
|
||||
else:
|
||||
return f"Dry run: Error applying diff to '{path}' (exit code {process.returncode}).\n{output_str}"
|
||||
except FileNotFoundError:
|
||||
return "Error: The 'patch' command-line utility was not found. DryRunApplyDiff failed."
|
||||
except Exception as e:
|
||||
return f"Error during DryRunApplyDiff for '{path}': {type(e).__name__} - {e}"
|
||||
|
||||
|
||||
async def _execute_tool_dry_run_write_file(self, path: str) -> str:
|
||||
try:
|
||||
p = pathlib.Path(path)
|
||||
# Check if parent directory exists and is writable
|
||||
parent_dir = p.parent
|
||||
if not parent_dir.exists():
|
||||
# Check if we can create the parent directory
|
||||
try:
|
||||
# Attempt to create a dummy temp dir to check if parent is creatable
|
||||
# This is a bit complex; simpler check: can we write to grandparent?
|
||||
# For now, just report if parent doesn't exist.
|
||||
return f"Dry run: Parent directory '{parent_dir}' does not exist. Write would likely create it if permissions allow."
|
||||
except Exception: # Broad exception for permission issues with parent
|
||||
pass # Fall through to os.access checks
|
||||
|
||||
if p.exists(): # File exists
|
||||
if os.access(path, os.W_OK):
|
||||
return f"Dry run: File '{path}' exists and is writable."
|
||||
else:
|
||||
return f"Dry run: File '{path}' exists but is NOT writable (permission error)."
|
||||
else: # File does not exist, check if directory is writable
|
||||
if os.access(parent_dir, os.W_OK):
|
||||
return f"Dry run: File '{path}' does not exist, but directory '{parent_dir}' is writable. File can likely be created."
|
||||
else:
|
||||
return f"Dry run: File '{path}' does not exist, and directory '{parent_dir}' is NOT writable (permission error)."
|
||||
except Exception as e:
|
||||
return f"Error during DryRunWriteFile check for '{path}': {type(e).__name__} - {e}"
|
||||
|
||||
# --- End of New Tool Execution Methods ---
|
||||
|
||||
async def _process_agent_interaction(self, ctx: commands.Context, initial_prompt_text: str):
|
||||
user_id = ctx.author.id
|
||||
|
||||
# Check current mode and prepend to history if it's the start of a new interaction (or if mode changed)
|
||||
# The mode change command already adds a notification. Here, we ensure the AI is aware of the *current* mode
|
||||
# if this is a fresh interaction after a mode was set previously.
|
||||
# However, the system prompt now instructs AI on how mode changes are communicated.
|
||||
# So, direct injection here might be redundant if mode change command handles it.
|
||||
# Let's rely on the mode change command to inject the notification.
|
||||
|
||||
self._add_to_conversation_history(user_id, role="user", text_content=initial_prompt_text)
|
||||
|
||||
iteration_count = 0
|
||||
|
Loading…
x
Reference in New Issue
Block a user