feat: Enhance ApplyDiff and DryRunApplyDiff to support SEARCH/REPLACE operations with detailed error handling

Slipstream 2025-06-02 16:32:20 -06:00
parent e9fac08645
commit 0a8ff280e7
Signed by: slipstream
GPG Key ID: 13E498CE010AC6FD


@@ -104,18 +104,28 @@ IMPORTANT: Do NOT wrap your XML tool calls in markdown code blocks (e.g., ```xml
```
(System will confirm success or report error in ToolResponse. A snapshot is made before writing.)
3. **ApplyDiff:** Applies a diff/patch to a file. Use standard unidiff format for the diff_block.
3. **ApplyDiff:** Applies targeted modifications to an existing file by searching for specific sections of content and replacing them. The `diff_block` uses a custom SEARCH/REPLACE format. You can perform multiple distinct search and replace operations within a single call by providing multiple SEARCH/REPLACE blocks.
```xml
<ApplyDiff>
<path>path/to/file.ext</path>
<diff_block><![CDATA[
--- a/original_file.py
+++ b/modified_file.py
@@ -1,3 +1,4 @@
line 1
-line 2 old
+line 2 new
+line 3 added
<<<<<<< SEARCH
:start_line:10
-------
exact content to find
including whitespace and newlines
=======
new content to replace with
>>>>>>> REPLACE
<<<<<<< SEARCH
:start_line:25
-------
another exact content block
to find and replace
=======
another new content block
>>>>>>> REPLACE
]]></diff_block>
</ApplyDiff>
```
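For example, a hypothetical call (the path and content below are invented purely for illustration) that replaces a single line 42 of `src/example.py`; the search text must match the file exactly, including indentation:
```xml
<ApplyDiff>
<path>src/example.py</path>
<diff_block><![CDATA[
<<<<<<< SEARCH
:start_line:42
-------
    return None
=======
    return result
>>>>>>> REPLACE
]]></diff_block>
</ApplyDiff>
```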
@@ -227,18 +237,28 @@ IMPORTANT: Do NOT wrap your XML tool calls in markdown code blocks (e.g., ```xml
```
(System will provide diff output or error in ToolResponse)
15. **DryRunApplyDiff:** Checks the outcome of applying a diff (e.g., via `patch --dry-run`) without actually modifying the file.
15. **DryRunApplyDiff:** Checks the outcome of applying a SEARCH/REPLACE diff (as defined for ApplyDiff) without actually modifying the file.
```xml
<DryRunApplyDiff>
<path>path/to/file.ext</path>
<diff_block><![CDATA[
--- a/original_file.py
+++ b/modified_file.py
@@ -1,3 +1,4 @@
line 1
-line 2 old
+line 2 new
+line 3 added
<<<<<<< SEARCH
:start_line:10
-------
exact content to find
including whitespace and newlines
=======
new content to replace with
>>>>>>> REPLACE
<<<<<<< SEARCH
:start_line:25
-------
another exact content block
to find and replace
=======
another new content block
>>>>>>> REPLACE
]]></diff_block>
</DryRunApplyDiff>
```
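(On success the ToolResponse reports how many operations would apply cleanly; on a mismatch it reports the operation number, the expected search text, and what was actually found at that line, without modifying the file.)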
@@ -938,46 +958,135 @@ class AICodeAgentCog(commands.Cog):
return f"Error writing to file '{path}': {type(e).__name__} - {e}"
async def _execute_tool_apply_diff(self, path: str, diff_block: str) -> str:
print(f"AICodeAgentCog: Attempting _execute_tool_apply_diff for path: {path}")
print(f"AICodeAgentCog: Attempting _execute_tool_apply_diff (Search/Replace) for path: {path}")
if not os.path.exists(path):
return f"Error: File not found at '{path}' for applying diff."
if os.path.isdir(path):
return f"Error: Path '{path}' is a directory, cannot apply diff."
try:
# Ensure diff_block ends with a newline for `patch` utility
if not diff_block.endswith('\n'):
diff_block += '\n'
process = await asyncio.create_subprocess_exec(
'patch', path, # Target file for the patch
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
stdout, stderr = await process.communicate(input=diff_block.encode('utf-8'))
output_str = ""
if stdout:
output_str += f"Stdout:\n{stdout.decode(errors='replace')}\n"
if stderr:
output_str += f"Stderr:\n{stderr.decode(errors='replace')}\n"
if process.returncode == 0:
return f"Successfully applied diff to '{path}'.\n{output_str}"
else:
# Try to provide context if patch failed, e.g. if it created .rej file
rej_file = f"{path}.rej"
if os.path.exists(rej_file):
with open(rej_file, 'r', encoding='utf-8', errors='replace') as rf:
rej_content = rf.read()
output_str += f"\nRejects file found ({rej_file}):\n{rej_content[:500]}...\n(Please check this file for details of failed hunks)"
return f"Error applying diff to '{path}' (exit code {process.returncode}).\n{output_str}"
except FileNotFoundError:
return "Error: The 'patch' command-line utility was not found. Diff application failed. Please ensure 'patch' is installed and in the system PATH."
with open(path, 'r', encoding='utf-8') as f:
file_lines = f.readlines() # Read all lines, keeping newlines
except Exception as e:
return f"Error applying diff to '{path}': {type(e).__name__} - {e}"
return f"Error reading file '{path}': {type(e).__name__} - {e}"
try:
# Pattern for a single search/replace operation block
# Ensure :start_line: is captured, and search/replace content is captured non-greedily.
# Content between ------- and ======= is search_content.
# Content between ======= and >>>>>>> REPLACE is replace_content.
operation_pattern = re.compile(
r"^\s*<<<<<<< SEARCH\s*\n"
r":start_line:(\d+)\s*\n"
r"-------\s*\n"
r"(.*?)" # search_content (group 2)
r"=======\s*\n"
r"(.*?)" # replace_content (group 3)
r">>>>>>> REPLACE\s*$",
re.DOTALL | re.MULTILINE
)
operations = []
last_match_end = 0
for match in operation_pattern.finditer(diff_block):
if match.start() < last_match_end: # Should not happen with finditer
continue
# Check for content between operations that is not part of a valid operation
if diff_block[last_match_end:match.start()].strip():
return f"Error: Malformed diff_block. Unexpected content between operations near character {last_match_end}."
start_line = int(match.group(1))
# Split search/replace content carefully to respect original newlines
# The (.*?) captures content up to the next \n======= or \n>>>>>>>
# We need to remove the structural newlines from the capture groups if they are included by DOTALL
# To get precise content: find the end of "-------\n" and start of "\n======="
search_content_start_offset = match.group(0).find("-------\n") + len("-------\n")
search_content_end_offset = match.group(0).find("\n=======")
search_c = match.group(0)[search_content_start_offset:search_content_end_offset]
replace_content_start_offset = match.group(0).find("=======\n") + len("=======\n")
replace_content_end_offset = match.group(0).find("\n>>>>>>> REPLACE")
replace_c = match.group(0)[replace_content_start_offset:replace_content_end_offset]
operations.append({
"start_line": start_line,
"search": search_c,
"replace": replace_c,
"original_block_for_error": match.group(0)[:200] # For error reporting
})
last_match_end = match.end()
if not operations:
if diff_block.strip(): # If diff_block had content but no operations parsed
return "Error: diff_block provided but no valid SEARCH/REPLACE operations found."
else: # Empty diff_block
return "Error: Empty diff_block provided."
# Check if there's any trailing malformed content after the last valid operation
if diff_block[last_match_end:].strip():
return f"Error: Malformed diff_block. Unexpected trailing content after last operation, near character {last_match_end}."
# Sort operations by start_line in descending order to apply changes from bottom-up
# This helps manage line number shifts correctly if replacements change the number of lines.
operations.sort(key=lambda op: op['start_line'], reverse=True)
applied_count = 0
for op in operations:
start_line_0_indexed = op['start_line'] - 1
# Ensure search content exactly matches, including its own newlines
search_content_lines = op['search'].splitlines(True) # Keep newlines for comparison
if not search_content_lines and op['search']: # Search content is not empty but has no newlines (e.g. "foo")
search_content_lines = [op['search']]
elif not op['search']: # Empty search string
search_content_lines = [""]
num_search_lines = len(search_content_lines)
if start_line_0_indexed < 0 or (start_line_0_indexed + num_search_lines > len(file_lines) and num_search_lines > 0) or (start_line_0_indexed > len(file_lines) and num_search_lines == 0 and op['search'] == ""): # check for empty search at end of file
# Special case: if search is empty and start_line is one past the end, it's an append
if not (op['search'] == "" and start_line_0_indexed == len(file_lines)):
return f"Error: Operation for line {op['start_line']} (0-indexed {start_line_0_indexed}) with {num_search_lines} search lines is out of file bounds (total lines: {len(file_lines)}). Block: {op['original_block_for_error']}..."
actual_file_segment_lines = file_lines[start_line_0_indexed : start_line_0_indexed + num_search_lines]
actual_file_segment_content = "".join(actual_file_segment_lines)
# Exact match, including newlines.
if actual_file_segment_content == op['search']:
replace_content_lines = op['replace'].splitlines(True)
if not replace_content_lines and op['replace']: # Replace content is not empty but has no newlines
replace_content_lines = [op['replace']]
file_lines[start_line_0_indexed : start_line_0_indexed + num_search_lines] = replace_content_lines
applied_count += 1
else:
# For better error reporting:
expected_repr = repr(op['search'])
found_repr = repr(actual_file_segment_content)
max_len = 100
if len(expected_repr) > max_len: expected_repr = expected_repr[:max_len] + "..."
if len(found_repr) > max_len: found_repr = found_repr[:max_len] + "..."
return (f"Error: Search content mismatch at line {op['start_line']}.\n"
f"Expected: {expected_repr}\n"
f"Found : {found_repr}\n"
f"Original Block Hint: {op['original_block_for_error']}...")
if applied_count == len(operations):
try:
with open(path, 'w', encoding='utf-8') as f:
f.writelines(file_lines)
return f"Successfully applied {applied_count} SEARCH/REPLACE operation(s) to '{path}'."
except Exception as e:
return f"Error writing changes to file '{path}': {type(e).__name__} - {e}"
else:
# This case should ideally be caught by the mismatch error above.
return f"Error: Not all operations could be applied. Applied {applied_count} out of {len(operations)}."
except Exception as e:
# General error during parsing or application logic
return f"Error applying SEARCH/REPLACE diff to '{path}': {type(e).__name__} - {e}"
async def _execute_tool_execute_command(self, command: str, user_id: int) -> str:
session = self.agent_shell_sessions[user_id]
@@ -1504,35 +1613,112 @@ class AICodeAgentCog(commands.Cog):
return f"Error comparing snapshots '{base_ref}' and '{compare_ref}': {output}"
async def _execute_tool_dry_run_apply_diff(self, path: str, diff_block: str) -> str:
if not os.path.exists(path): # Check if target file for patch exists
print(f"AICodeAgentCog: Attempting _execute_tool_dry_run_apply_diff (Search/Replace) for path: {path}")
if not os.path.exists(path):
return f"Error: File not found at '{path}' for dry-run applying diff."
if os.path.isdir(path):
return f"Error: Path '{path}' is a directory, cannot dry-run apply diff."
try:
if not diff_block.endswith('\n'):
diff_block += '\n'
process = await asyncio.create_subprocess_exec(
'patch', '--dry-run', path,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
stdout, stderr = await process.communicate(input=diff_block.encode('utf-8'))
output_str = ""
if stdout: output_str += f"Stdout:\n{stdout.decode(errors='replace')}\n"
if stderr: output_str += f"Stderr:\n{stderr.decode(errors='replace')}\n"
if process.returncode == 0:
return f"Dry run: Diff would apply cleanly to '{path}'.\n{output_str}"
else:
return f"Dry run: Error applying diff to '{path}' (exit code {process.returncode}).\n{output_str}"
except FileNotFoundError:
return "Error: The 'patch' command-line utility was not found. DryRunApplyDiff failed."
with open(path, 'r', encoding='utf-8') as f:
file_lines = f.readlines() # Read all lines, keeping newlines
except Exception as e:
return f"Error during DryRunApplyDiff for '{path}': {type(e).__name__} - {e}"
return f"Error reading file '{path}' for dry-run: {type(e).__name__} - {e}"
try:
# Pattern for a single search/replace operation block (same as apply_diff)
operation_pattern = re.compile(
r"^\s*<<<<<<< SEARCH\s*\n"
r":start_line:(\d+)\s*\n"
r"-------\s*\n"
r"(.*?)" # search_content
r"=======\s*\n"
r"(.*?)" # replace_content
r">>>>>>> REPLACE\s*$",
re.DOTALL | re.MULTILINE
)
operations = []
last_match_end = 0
for match in operation_pattern.finditer(diff_block):
if match.start() < last_match_end:
continue
if diff_block[last_match_end:match.start()].strip():
return f"Dry run Error: Malformed diff_block. Unexpected content between operations near character {last_match_end}."
start_line = int(match.group(1))
search_content_start_offset = match.group(0).find("-------\n") + len("-------\n")
search_content_end_offset = match.group(0).find("\n=======")
search_c = match.group(0)[search_content_start_offset:search_content_end_offset]
replace_content_start_offset = match.group(0).find("=======\n") + len("=======\n")
replace_content_end_offset = match.group(0).find("\n>>>>>>> REPLACE")
replace_c = match.group(0)[replace_content_start_offset:replace_content_end_offset]
operations.append({
"start_line": start_line,
"search": search_c,
"replace": replace_c,
"original_block_for_error": match.group(0)[:200]
})
last_match_end = match.end()
if not operations:
if diff_block.strip():
return "Dry run Error: diff_block provided but no valid SEARCH/REPLACE operations found."
else:
return "Dry run Error: Empty diff_block provided."
if diff_block[last_match_end:].strip():
return f"Dry run Error: Malformed diff_block. Unexpected trailing content after last operation, near character {last_match_end}."
# No sorting needed for dry run as we are not modifying the list in place during iteration.
# We just check each operation independently against the original file content.
checked_ops_count = 0
for op_idx, op in enumerate(operations):
start_line_0_indexed = op['start_line'] - 1
search_content_lines = op['search'].splitlines(True)
if not search_content_lines and op['search']:
search_content_lines = [op['search']]
elif not op['search']:
search_content_lines = [""]
num_search_lines = len(search_content_lines)
if start_line_0_indexed < 0 or \
(start_line_0_indexed + num_search_lines > len(file_lines) and num_search_lines > 0) or \
(start_line_0_indexed > len(file_lines) and num_search_lines == 0 and op['search'] == ""):
if not (op['search'] == "" and start_line_0_indexed == len(file_lines)): # append case
return (f"Dry run Error: Operation {op_idx+1} for line {op['start_line']} "
f"(0-indexed {start_line_0_indexed}) with {num_search_lines} search lines "
f"is out of file bounds (total lines: {len(file_lines)}). "
f"Block: {op['original_block_for_error']}...")
actual_file_segment_lines = file_lines[start_line_0_indexed : start_line_0_indexed + num_search_lines]
actual_file_segment_content = "".join(actual_file_segment_lines)
if actual_file_segment_content == op['search']:
checked_ops_count += 1
else:
expected_repr = repr(op['search'])
found_repr = repr(actual_file_segment_content)
max_len = 100
if len(expected_repr) > max_len: expected_repr = expected_repr[:max_len] + "..."
if len(found_repr) > max_len: found_repr = found_repr[:max_len] + "..."
return (f"Dry run Error: Search content mismatch for operation {op_idx+1} at line {op['start_line']}.\n"
f"Expected: {expected_repr}\n"
f"Found : {found_repr}\n"
f"Original Block Hint: {op['original_block_for_error']}...")
if checked_ops_count == len(operations):
return f"Dry run: All {len(operations)} SEARCH/REPLACE operation(s) would apply cleanly to '{path}'."
else:
# This state should ideally not be reached if mismatches return early.
return f"Dry run Error: Not all operations would apply cleanly. Checked {checked_ops_count} of {len(operations)}."
except Exception as e:
return f"Error during DryRunApplyDiff (Search/Replace) for '{path}': {type(e).__name__} - {e}"
async def _execute_tool_dry_run_write_file(self, path: str) -> str:
@@ -1642,8 +1828,8 @@ class AICodeAgentCog(commands.Cog):
vertex_contents = current_history # Already in types.Content format
generation_config = google_genai_types.GenerateContentConfig(
temperature=0.7, # Adjust as needed
max_output_tokens=4096, # Adjust as needed
temperature=0.3, # Adjust as needed
max_output_tokens=65535, # Adjust as needed
safety_settings=STANDARD_SAFETY_SETTINGS,
# System instruction is critical here
system_instruction=google_genai_types.Content(