From 51bb610eeb11706358f2dad894320bdda1801158 Mon Sep 17 00:00:00 2001 From: Raja Sekhar Rao Dheekonda Date: Thu, 14 May 2026 14:01:37 -0700 Subject: [PATCH] fix(airt): replace remaining ~/workspace/airt paths with ~/.dreadnode/airt - scripts/workflow_helper.py: update docstring + use WORKFLOWS_DIR in user-facing strings - tools/attacks.py: fix docstring path reference for generate_attack - tools/session.py: resolve default session path via UserConfig (org/workspace) - capability.yaml: bump to 1.3.1 Closes the last stale references missed by the 1.3.0 workspace refactor. Co-Authored-By: Claude Opus 4.6 --- capabilities/ai-red-teaming/capability.yaml | 2 +- .../ai-red-teaming/scripts/workflow_helper.py | 39 ++++++++--- capabilities/ai-red-teaming/tools/attacks.py | 64 ++++++++++++++----- capabilities/ai-red-teaming/tools/session.py | 47 +++++++++++--- 4 files changed, 117 insertions(+), 35 deletions(-) diff --git a/capabilities/ai-red-teaming/capability.yaml b/capabilities/ai-red-teaming/capability.yaml index 96de80d..87d015d 100644 --- a/capabilities/ai-red-teaming/capability.yaml +++ b/capabilities/ai-red-teaming/capability.yaml @@ -1,6 +1,6 @@ schema: 1 name: ai-red-teaming -version: "1.3.0" +version: "1.3.1" description: > Probe the security and safety of AI applications, agents, and foundation models. Orchestrates adversarial attack workflows to discover vulnerabilities in LLMs, diff --git a/capabilities/ai-red-teaming/scripts/workflow_helper.py b/capabilities/ai-red-teaming/scripts/workflow_helper.py index 713925c..f53685f 100644 --- a/capabilities/ai-red-teaming/scripts/workflow_helper.py +++ b/capabilities/ai-red-teaming/scripts/workflow_helper.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """Workflow helper for saving and listing Python attack scripts. -Saves workflow scripts to ~/workspace/airt/workflows/ with syntax +Saves workflow scripts to ~/.dreadnode/airt/[org]/[workspace]/workflows/ with syntax validation via compile(). Provides listing of saved workflows. Protocol: reads JSON from stdin, writes JSON to stdout. @@ -39,7 +39,9 @@ def _get_workspace_path() -> Path: WORKFLOWS_DIR = ( - Path(os.environ.get("AIRT_WORKFLOWS_DIR")) if os.environ.get("AIRT_WORKFLOWS_DIR") else _get_workspace_path() + Path(os.environ.get("AIRT_WORKFLOWS_DIR")) + if os.environ.get("AIRT_WORKFLOWS_DIR") + else _get_workspace_path() ) METADATA_FILE = WORKFLOWS_DIR / ".workflow_metadata.json" @@ -74,7 +76,11 @@ def save_workflow(params: dict) -> dict: try: compile(content, filename, "exec") except SyntaxError as e: - return {"error": (f"Syntax error in workflow: {e.msg} (line {e.lineno}, col {e.offset})")} + return { + "error": ( + f"Syntax error in workflow: {e.msg} (line {e.lineno}, col {e.offset})" + ) + } # Save the file WORKFLOWS_DIR.mkdir(parents=True, exist_ok=True) @@ -90,7 +96,11 @@ def save_workflow(params: dict) -> dict: } _save_metadata(metadata) - return {"result": (f"Workflow saved: {filepath}\nSize: {len(content.encode())} bytes\nSyntax: valid")} + return { + "result": ( + f"Workflow saved: {filepath}\nSize: {len(content.encode())} bytes\nSyntax: valid" + ) + } def list_workflows(params: dict) -> dict: @@ -99,7 +109,7 @@ def list_workflows(params: dict) -> dict: py_files = sorted(WORKFLOWS_DIR.glob("*.py")) if not py_files: - return {"result": "No workflow files found in ~/workspace/airt/workflows/"} + return {"result": f"No workflow files found in {WORKFLOWS_DIR}"} metadata = _load_metadata() @@ -133,7 +143,11 @@ def execute_workflow(params: dict) -> dict: filepath = WORKFLOWS_DIR / filename if not filepath.exists(): # List available workflows - available = [f.name for f in WORKFLOWS_DIR.glob("*.py")] if WORKFLOWS_DIR.exists() else [] + available = ( + [f.name for f in WORKFLOWS_DIR.glob("*.py")] + if WORKFLOWS_DIR.exists() + else [] + ) return {"error": f"Workflow not found: {filename}. Available: {available}"} timeout = int(params.get("timeout", 300)) @@ -141,7 +155,10 @@ def execute_workflow(params: dict) -> dict: try: python_executable = resolve_python_executable() - print(f"[INFO] Executing workflow with Python: {python_executable}", file=sys.stderr) + print( + f"[INFO] Executing workflow with Python: {python_executable}", + file=sys.stderr, + ) result = subprocess.run( [python_executable, str(filepath)], cwd=str(WORKFLOWS_DIR.parent), @@ -159,12 +176,16 @@ def execute_workflow(params: dict) -> dict: output = "\n".join(output_parts) or "(no output)" if result.returncode != 0: - return {"result": f"Workflow exited with code {result.returncode}.\n\n{output}"} + return { + "result": f"Workflow exited with code {result.returncode}.\n\n{output}" + } return {"result": f"Workflow completed successfully.\n\n{output}"} except subprocess.TimeoutExpired: - return {"result": f"Workflow timed out after {timeout}s. Partial output may be in ~/workspace/airt/."} + return { + "result": f"Workflow timed out after {timeout}s. Partial output may be in {WORKFLOWS_DIR.parent}." + } except Exception as e: return {"error": f"Failed to execute workflow: {e}"} diff --git a/capabilities/ai-red-teaming/tools/attacks.py b/capabilities/ai-red-teaming/tools/attacks.py index 6cea6dc..050221a 100644 --- a/capabilities/ai-red-teaming/tools/attacks.py +++ b/capabilities/ai-red-teaming/tools/attacks.py @@ -28,7 +28,10 @@ def _call_runner(name: str, params: dict) -> str: payload = json.dumps({"name": name, "parameters": params}) try: python_executable = resolve_python_executable() - print(f"[INFO] Executing attack runner with Python: {python_executable}", file=sys.stderr) + print( + f"[INFO] Executing attack runner with Python: {python_executable}", + file=sys.stderr, + ) result = subprocess.run( [python_executable, str(_RUNNER_SCRIPT)], input=payload, @@ -95,7 +98,9 @@ def generate_attack( "injection (skeleton_key_framing, many_shot_examples), " "advanced_jailbreak, mcp_attacks, multi_agent_attacks, exfiltration, and more.", ] = None, - compare_transforms: t.Annotated[bool, "If True with transforms, creates N+1 comparison study"] = False, + compare_transforms: t.Annotated[ + bool, "If True with transforms, creates N+1 comparison study" + ] = False, scorers: t.Annotated[list[str] | None, "Custom scorer names"] = None, n_iterations: t.Annotated[int | None, "Iterations per attack"] = None, goal_category: t.Annotated[str, "Goal category for scoring"] = "", @@ -106,7 +111,7 @@ def generate_attack( Supports 12+ attack types, 200+ transforms (encoding, cipher, persuasion, agentic, MCP, multi-agent, exfiltration, and more), and configurable scorers. The generated Python script is saved to - ~/workspace/airt/workflows/ and auto-executed. + ~/.dreadnode/airt/[org]/[workspace]/workflows/ and auto-executed. Multiple attacks (comma-separated) create a campaign. Adding compare_transforms=True with transforms creates an N+1 study. @@ -144,10 +149,15 @@ def generate_category_attack( target_model: t.Annotated[str, "Target LLM model"], categories: t.Annotated[ list[str] | None, - "Sub-category slugs (e.g., ['cybersecurity', 'credential_extraction']) " "or ['all'] for all categories", + "Sub-category slugs (e.g., ['cybersecurity', 'credential_extraction']) " + "or ['all'] for all categories", + ] = None, + goal_ids: t.Annotated[ + list[str] | None, "Specific goal IDs (overrides categories)" + ] = None, + goals_per_category: t.Annotated[ + int | None, "Max goals to sample per category" ] = None, - goal_ids: t.Annotated[list[str] | None, "Specific goal IDs (overrides categories)"] = None, - goals_per_category: t.Annotated[int | None, "Max goals to sample per category"] = None, attacker_model: t.Annotated[str, "Attacker LLM"] = "", evaluator_model: t.Annotated[str, "Judge LLM"] = "", transform_model: t.Annotated[str, "Transform LLM"] = "", @@ -203,14 +213,30 @@ def generate_agentic_attack( agent_url: t.Annotated[str, "HTTP endpoint of the target agent"], attacker_model: t.Annotated[str, "LLM generating attack prompts"], attack_type: t.Annotated[str, "Attack type (default: tap)"] = "tap", - agent_auth_type: t.Annotated[str, "Auth scheme: 'none', 'bearer', or 'api_key'"] = "none", - agent_auth_env_var: t.Annotated[str, "Env var name for auth credential"] = "AGENT_API_KEY", - agent_request_template: t.Annotated[str, "JSON request template with {prompt} placeholder"] = "", - agent_response_text_path: t.Annotated[str, "JSONPath to extract response text"] = "", - agent_response_tool_calls_path: t.Annotated[str, "JSONPath for tool calls in response"] = "", - agent_dangerous_tools: t.Annotated[list[str] | None, "Dangerous tool names to target for agentic scoring"] = None, - agent_safe_tools: t.Annotated[list[str] | None, "Safe tool whitelist for agentic scoring"] = None, - agent_preset: t.Annotated[str, "Preset: 'openai_assistants', 'anthropic', or 'custom'"] = "custom", + agent_auth_type: t.Annotated[ + str, "Auth scheme: 'none', 'bearer', or 'api_key'" + ] = "none", + agent_auth_env_var: t.Annotated[ + str, "Env var name for auth credential" + ] = "AGENT_API_KEY", + agent_request_template: t.Annotated[ + str, "JSON request template with {prompt} placeholder" + ] = "", + agent_response_text_path: t.Annotated[ + str, "JSONPath to extract response text" + ] = "", + agent_response_tool_calls_path: t.Annotated[ + str, "JSONPath for tool calls in response" + ] = "", + agent_dangerous_tools: t.Annotated[ + list[str] | None, "Dangerous tool names to target for agentic scoring" + ] = None, + agent_safe_tools: t.Annotated[ + list[str] | None, "Safe tool whitelist for agentic scoring" + ] = None, + agent_preset: t.Annotated[ + str, "Preset: 'openai_assistants', 'anthropic', or 'custom'" + ] = "custom", evaluator_model: t.Annotated[str, "Judge LLM"] = "", transform_model: t.Annotated[str, "Transform LLM"] = "", transforms: t.Annotated[list[str] | None, "Transforms to apply"] = None, @@ -276,12 +302,14 @@ def generate_image_attack( ] = "hopskipjump", input_type: t.Annotated[ str, - "Input data type: 'image' (load from URL, perturb pixels) or " "'tabular' (feature array + API endpoint)", + "Input data type: 'image' (load from URL, perturb pixels) or " + "'tabular' (feature array + API endpoint)", ] = "image", # --- Image-specific params --- image_url: t.Annotated[ str, - "URL of the source image (for input_type='image'). " "Can also be a local file path.", + "URL of the source image (for input_type='image'). " + "Can also be a local file path.", ] = "", # --- Tabular-specific params --- features: t.Annotated[ @@ -295,7 +323,9 @@ def generate_image_attack( "and returns {predictions: [{class: int, confidence: float}]}", ] = "", api_key: t.Annotated[str, "API key for x-api-key header (optional)"] = "", - target_class: t.Annotated[int, "Class to flip TO (adversarial target), e.g. 1 for fraud"] = 1, + target_class: t.Annotated[ + int, "Class to flip TO (adversarial target), e.g. 1 for fraud" + ] = 1, original_class: t.Annotated[ int | str, "Original class of the source input, e.g. 0 for legitimate", diff --git a/capabilities/ai-red-teaming/tools/session.py b/capabilities/ai-red-teaming/tools/session.py index 041940c..5e3078d 100644 --- a/capabilities/ai-red-teaming/tools/session.py +++ b/capabilities/ai-red-teaming/tools/session.py @@ -16,11 +16,32 @@ from dreadnode.agents.tools import tool -SESSION_PATH = Path( - os.environ.get( - "AIRT_SESSION_PATH", - os.path.expanduser("~/workspace/airt/.session_context.json"), + +def _default_session_path() -> Path: + try: + from dreadnode.app.config import UserConfig + + config = UserConfig.read() + profile_data = config.active_profile + if profile_data: + _, profile = profile_data + org = profile.organization or "default" + workspace = profile.workspace or "main" + else: + org = "default" + workspace = "main" + except Exception: + org = "default" + workspace = "main" + return ( + Path.home() / ".dreadnode" / "airt" / org / workspace / ".session_context.json" ) + + +SESSION_PATH = ( + Path(os.environ["AIRT_SESSION_PATH"]) + if os.environ.get("AIRT_SESSION_PATH") + else _default_session_path() ) @@ -89,7 +110,9 @@ def save_session_context( session["history"] = history[-20:] _save(session) - return "Session context saved. Target: {}, Goal: {}, Last attack: {}".format(target_model, goal[:60], attack_type) + return "Session context saved. Target: {}, Goal: {}, Last attack: {}".format( + target_model, goal[:60], attack_type + ) @tool @@ -134,10 +157,18 @@ def get_session_context() -> str: lines.append("") lines.append("Attack History ({} runs):".format(len(history))) for h in history[-5:]: # Show last 5 - score_str = "ASR={}%".format(h["best_score"]) if h.get("best_score") is not None else "no score" - tx_str = "+{}".format(",".join(h["transforms"])) if h.get("transforms") else "" + score_str = ( + "ASR={}%".format(h["best_score"]) + if h.get("best_score") is not None + else "no score" + ) + tx_str = ( + "+{}".format(",".join(h["transforms"])) if h.get("transforms") else "" + ) lines.append( - " - {} {}: {} ({})".format(h.get("attack_type", "?"), tx_str, h.get("goal", "")[:40], score_str) + " - {} {}: {} ({})".format( + h.get("attack_type", "?"), tx_str, h.get("goal", "")[:40], score_str + ) ) return "\n".join(lines)