diff --git a/capabilities/ai-red-teaming/capability.yaml b/capabilities/ai-red-teaming/capability.yaml index 87d015d..c35dd9a 100644 --- a/capabilities/ai-red-teaming/capability.yaml +++ b/capabilities/ai-red-teaming/capability.yaml @@ -1,6 +1,6 @@ schema: 1 name: ai-red-teaming -version: "1.3.1" +version: "1.3.2" description: > Probe the security and safety of AI applications, agents, and foundation models. Orchestrates adversarial attack workflows to discover vulnerabilities in LLMs, diff --git a/capabilities/ai-red-teaming/scripts/workflow_helper.py b/capabilities/ai-red-teaming/scripts/workflow_helper.py index 713925c..88a0f15 100644 --- a/capabilities/ai-red-teaming/scripts/workflow_helper.py +++ b/capabilities/ai-red-teaming/scripts/workflow_helper.py @@ -79,9 +79,35 @@ def save_workflow(params: dict) -> dict: # Save the file WORKFLOWS_DIR.mkdir(parents=True, exist_ok=True) filepath = WORKFLOWS_DIR / filename - filepath.write_text(content) - # Update metadata + # Read existing content (if any) for comparison + existing_content = "" + if filepath.exists(): + try: + existing_content = filepath.read_text() + except Exception: + pass # File may be locked/unreadable + + # Attempt write + try: + filepath.write_text(content) + except Exception as e: + return {"error": f"Failed to write file: {e}"} + + # Verify write succeeded by reading back + try: + written_content = filepath.read_text() + if written_content != content: + return {"error": f"File write incomplete (expected {len(content)} chars, got {len(written_content)})"} + + # Check if content actually changed when overwriting + if existing_content and existing_content == written_content and existing_content != content: + return {"error": f"File exists but content unchanged - write may have failed silently: {filepath}"} + + except Exception as e: + return {"error": f"Failed to verify write: {e}"} + + # Update metadata only after successful verification metadata = _load_metadata() metadata[filename] = { "description": description, @@ -90,7 +116,9 @@ def save_workflow(params: dict) -> dict: } _save_metadata(metadata) - return {"result": (f"Workflow saved: {filepath}\nSize: {len(content.encode())} bytes\nSyntax: valid")} + # Success - file confirmed written with correct content + status = "updated" if existing_content else "created" + return {"result": f"Workflow {status}: {filepath}\nSize: {len(content.encode())} bytes\nSyntax: valid\nContent: verified"} def list_workflows(params: dict) -> dict: diff --git a/capabilities/ai-red-teaming/tests/test_workflow_helper.py b/capabilities/ai-red-teaming/tests/test_workflow_helper.py index d425726..69090c5 100644 --- a/capabilities/ai-red-teaming/tests/test_workflow_helper.py +++ b/capabilities/ai-red-teaming/tests/test_workflow_helper.py @@ -97,3 +97,78 @@ def test_lists_saved_scripts(self, temp_workflows_dir) -> None: assert "result" in result # Result may be a string summary or a dict — just verify it's present assert result["result"] + + def test_save_workflow_overwrite_verification(self, temp_workflows_dir) -> None: + """Test that save_workflow properly verifies file overwrite.""" + import unittest.mock + from pathlib import Path + + helper, wf_dir = temp_workflows_dir + helper.WORKFLOWS_DIR = wf_dir + helper.METADATA_FILE = wf_dir / ".workflow_metadata.json" + + # Create initial file + initial_content = "print('original')" + wf_dir.mkdir(parents=True, exist_ok=True) + test_file = wf_dir / "test.py" + test_file.write_text(initial_content) + + # Test normal overwrite (should work) + result = helper.save_workflow({ + "filename": "test.py", + "content": "print('updated')", + "description": "test overwrite" + }) + assert "error" not in result + assert "updated" in result["result"] + assert test_file.read_text() == "print('updated')" + + # Test scenario where write appears to succeed but content doesn't change + # This simulates the bug reported by the user + original_content = test_file.read_text() + + with unittest.mock.patch.object(Path, 'write_text') as mock_write, \ + unittest.mock.patch.object(Path, 'read_text') as mock_read: + + # Mock write_text to do nothing (simulate silent failure) + mock_write.return_value = None + + # Mock read_text to return the original content (simulating no change) + mock_read.return_value = original_content + + result = helper.save_workflow({ + "filename": "test.py", + "content": "print('new content')", + "description": "test silent failure" + }) + + # Should detect that content didn't actually change + assert "error" in result + assert "incomplete" in result["error"] or "unchanged" in result["error"] + + def test_save_workflow_content_verification(self, temp_workflows_dir) -> None: + """Test that save_workflow verifies written content matches expected.""" + import unittest.mock + from pathlib import Path + + helper, wf_dir = temp_workflows_dir + helper.WORKFLOWS_DIR = wf_dir + helper.METADATA_FILE = wf_dir / ".workflow_metadata.json" + + # Test scenario where write operation writes partial/incorrect content + with unittest.mock.patch.object(Path, 'write_text') as mock_write: + mock_write.return_value = None + + # Mock read_text to return different content than expected + with unittest.mock.patch.object(Path, 'read_text') as mock_read: + mock_read.return_value = "print('partial" # Truncated content + + result = helper.save_workflow({ + "filename": "test.py", + "content": "print('complete content')", + "description": "test verification" + }) + + # Should detect that written content doesn't match expected + assert "error" in result + assert "incomplete" in result["error"] diff --git a/capabilities/ai-red-teaming/tools/workflows.py b/capabilities/ai-red-teaming/tools/workflows.py index 69266d1..fe12b0a 100644 --- a/capabilities/ai-red-teaming/tools/workflows.py +++ b/capabilities/ai-red-teaming/tools/workflows.py @@ -86,8 +86,35 @@ def save_workflow( WORKFLOWS_DIR.mkdir(parents=True, exist_ok=True) filepath = WORKFLOWS_DIR / filename - filepath.write_text(code) + # Read existing content (if any) for comparison + existing_content = "" + if filepath.exists(): + try: + existing_content = filepath.read_text() + except Exception: + pass # File may be locked/unreadable + + # Attempt write + try: + filepath.write_text(code) + except Exception as e: + return f"Error writing file: {e}" + + # Verify write succeeded by reading back + try: + written_content = filepath.read_text() + if written_content != code: + return f"Error: File write incomplete (expected {len(code)} chars, got {len(written_content)})" + + # Check if content actually changed when overwriting + if existing_content and existing_content == written_content and existing_content != code: + return f"Warning: File exists but content unchanged - write may have failed silently: {filepath}" + + except Exception as e: + return f"Error verifying write: {e}" + + # Update metadata only after successful verification meta = _load_metadata() meta[filename] = { "description": description, @@ -96,7 +123,9 @@ def save_workflow( } _save_metadata(meta) - return f"Workflow saved: {filepath} ({len(code)} bytes)" + # Success - file confirmed written with correct content + status = "updated" if existing_content else "created" + return f"Workflow {status}: {filepath} ({len(code)} bytes) - content verified" @tool