Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion capabilities/ai-red-teaming/capability.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
schema: 1
name: ai-red-teaming
version: "1.3.0"
version: "1.3.1"
description: >
Probe the security and safety of AI applications, agents, and foundation models.
Orchestrates adversarial attack workflows to discover vulnerabilities in LLMs,
Expand Down
39 changes: 30 additions & 9 deletions capabilities/ai-red-teaming/scripts/workflow_helper.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""Workflow helper for saving and listing Python attack scripts.

Saves workflow scripts to ~/workspace/airt/workflows/ with syntax
Saves workflow scripts to ~/.dreadnode/airt/[org]/[workspace]/workflows/ with syntax
validation via compile(). Provides listing of saved workflows.

Protocol: reads JSON from stdin, writes JSON to stdout.
Expand Down Expand Up @@ -39,7 +39,9 @@ def _get_workspace_path() -> Path:


WORKFLOWS_DIR = (
Path(os.environ.get("AIRT_WORKFLOWS_DIR")) if os.environ.get("AIRT_WORKFLOWS_DIR") else _get_workspace_path()
Path(os.environ.get("AIRT_WORKFLOWS_DIR"))
if os.environ.get("AIRT_WORKFLOWS_DIR")
else _get_workspace_path()
)
METADATA_FILE = WORKFLOWS_DIR / ".workflow_metadata.json"

Expand Down Expand Up @@ -74,7 +76,11 @@ def save_workflow(params: dict) -> dict:
try:
compile(content, filename, "exec")
except SyntaxError as e:
return {"error": (f"Syntax error in workflow: {e.msg} (line {e.lineno}, col {e.offset})")}
return {
"error": (
f"Syntax error in workflow: {e.msg} (line {e.lineno}, col {e.offset})"
)
}

# Save the file
WORKFLOWS_DIR.mkdir(parents=True, exist_ok=True)
Expand All @@ -90,7 +96,11 @@ def save_workflow(params: dict) -> dict:
}
_save_metadata(metadata)

return {"result": (f"Workflow saved: {filepath}\nSize: {len(content.encode())} bytes\nSyntax: valid")}
return {
"result": (
f"Workflow saved: {filepath}\nSize: {len(content.encode())} bytes\nSyntax: valid"
)
}


def list_workflows(params: dict) -> dict:
Expand All @@ -99,7 +109,7 @@ def list_workflows(params: dict) -> dict:

py_files = sorted(WORKFLOWS_DIR.glob("*.py"))
if not py_files:
return {"result": "No workflow files found in ~/workspace/airt/workflows/"}
return {"result": f"No workflow files found in {WORKFLOWS_DIR}"}

metadata = _load_metadata()

Expand Down Expand Up @@ -133,15 +143,22 @@ def execute_workflow(params: dict) -> dict:
filepath = WORKFLOWS_DIR / filename
if not filepath.exists():
# List available workflows
available = [f.name for f in WORKFLOWS_DIR.glob("*.py")] if WORKFLOWS_DIR.exists() else []
available = (
[f.name for f in WORKFLOWS_DIR.glob("*.py")]
if WORKFLOWS_DIR.exists()
else []
)
return {"error": f"Workflow not found: {filename}. Available: {available}"}

timeout = int(params.get("timeout", 300))
timeout = min(timeout, 600) # Max 10 minutes

try:
python_executable = resolve_python_executable()
print(f"[INFO] Executing workflow with Python: {python_executable}", file=sys.stderr)
print(
f"[INFO] Executing workflow with Python: {python_executable}",
file=sys.stderr,
)
result = subprocess.run(
[python_executable, str(filepath)],
cwd=str(WORKFLOWS_DIR.parent),
Expand All @@ -159,12 +176,16 @@ def execute_workflow(params: dict) -> dict:
output = "\n".join(output_parts) or "(no output)"

if result.returncode != 0:
return {"result": f"Workflow exited with code {result.returncode}.\n\n{output}"}
return {
"result": f"Workflow exited with code {result.returncode}.\n\n{output}"
}

return {"result": f"Workflow completed successfully.\n\n{output}"}

except subprocess.TimeoutExpired:
return {"result": f"Workflow timed out after {timeout}s. Partial output may be in ~/workspace/airt/."}
return {
"result": f"Workflow timed out after {timeout}s. Partial output may be in {WORKFLOWS_DIR.parent}."
}
except Exception as e:
return {"error": f"Failed to execute workflow: {e}"}

Expand Down
64 changes: 47 additions & 17 deletions capabilities/ai-red-teaming/tools/attacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ def _call_runner(name: str, params: dict) -> str:
payload = json.dumps({"name": name, "parameters": params})
try:
python_executable = resolve_python_executable()
print(f"[INFO] Executing attack runner with Python: {python_executable}", file=sys.stderr)
print(
f"[INFO] Executing attack runner with Python: {python_executable}",
file=sys.stderr,
)
result = subprocess.run(
[python_executable, str(_RUNNER_SCRIPT)],
input=payload,
Expand Down Expand Up @@ -95,7 +98,9 @@ def generate_attack(
"injection (skeleton_key_framing, many_shot_examples), "
"advanced_jailbreak, mcp_attacks, multi_agent_attacks, exfiltration, and more.",
] = None,
compare_transforms: t.Annotated[bool, "If True with transforms, creates N+1 comparison study"] = False,
compare_transforms: t.Annotated[
bool, "If True with transforms, creates N+1 comparison study"
] = False,
scorers: t.Annotated[list[str] | None, "Custom scorer names"] = None,
n_iterations: t.Annotated[int | None, "Iterations per attack"] = None,
goal_category: t.Annotated[str, "Goal category for scoring"] = "",
Expand All @@ -106,7 +111,7 @@ def generate_attack(
Supports 12+ attack types, 200+ transforms (encoding, cipher,
persuasion, agentic, MCP, multi-agent, exfiltration, and more),
and configurable scorers. The generated Python script is saved to
~/workspace/airt/workflows/ and auto-executed.
~/.dreadnode/airt/[org]/[workspace]/workflows/ and auto-executed.

Multiple attacks (comma-separated) create a campaign. Adding
compare_transforms=True with transforms creates an N+1 study.
Expand Down Expand Up @@ -144,10 +149,15 @@ def generate_category_attack(
target_model: t.Annotated[str, "Target LLM model"],
categories: t.Annotated[
list[str] | None,
"Sub-category slugs (e.g., ['cybersecurity', 'credential_extraction']) " "or ['all'] for all categories",
"Sub-category slugs (e.g., ['cybersecurity', 'credential_extraction']) "
"or ['all'] for all categories",
] = None,
goal_ids: t.Annotated[
list[str] | None, "Specific goal IDs (overrides categories)"
] = None,
goals_per_category: t.Annotated[
int | None, "Max goals to sample per category"
] = None,
goal_ids: t.Annotated[list[str] | None, "Specific goal IDs (overrides categories)"] = None,
goals_per_category: t.Annotated[int | None, "Max goals to sample per category"] = None,
attacker_model: t.Annotated[str, "Attacker LLM"] = "",
evaluator_model: t.Annotated[str, "Judge LLM"] = "",
transform_model: t.Annotated[str, "Transform LLM"] = "",
Expand Down Expand Up @@ -203,14 +213,30 @@ def generate_agentic_attack(
agent_url: t.Annotated[str, "HTTP endpoint of the target agent"],
attacker_model: t.Annotated[str, "LLM generating attack prompts"],
attack_type: t.Annotated[str, "Attack type (default: tap)"] = "tap",
agent_auth_type: t.Annotated[str, "Auth scheme: 'none', 'bearer', or 'api_key'"] = "none",
agent_auth_env_var: t.Annotated[str, "Env var name for auth credential"] = "AGENT_API_KEY",
agent_request_template: t.Annotated[str, "JSON request template with {prompt} placeholder"] = "",
agent_response_text_path: t.Annotated[str, "JSONPath to extract response text"] = "",
agent_response_tool_calls_path: t.Annotated[str, "JSONPath for tool calls in response"] = "",
agent_dangerous_tools: t.Annotated[list[str] | None, "Dangerous tool names to target for agentic scoring"] = None,
agent_safe_tools: t.Annotated[list[str] | None, "Safe tool whitelist for agentic scoring"] = None,
agent_preset: t.Annotated[str, "Preset: 'openai_assistants', 'anthropic', or 'custom'"] = "custom",
agent_auth_type: t.Annotated[
str, "Auth scheme: 'none', 'bearer', or 'api_key'"
] = "none",
agent_auth_env_var: t.Annotated[
str, "Env var name for auth credential"
] = "AGENT_API_KEY",
agent_request_template: t.Annotated[
str, "JSON request template with {prompt} placeholder"
] = "",
agent_response_text_path: t.Annotated[
str, "JSONPath to extract response text"
] = "",
agent_response_tool_calls_path: t.Annotated[
str, "JSONPath for tool calls in response"
] = "",
agent_dangerous_tools: t.Annotated[
list[str] | None, "Dangerous tool names to target for agentic scoring"
] = None,
agent_safe_tools: t.Annotated[
list[str] | None, "Safe tool whitelist for agentic scoring"
] = None,
agent_preset: t.Annotated[
str, "Preset: 'openai_assistants', 'anthropic', or 'custom'"
] = "custom",
evaluator_model: t.Annotated[str, "Judge LLM"] = "",
transform_model: t.Annotated[str, "Transform LLM"] = "",
transforms: t.Annotated[list[str] | None, "Transforms to apply"] = None,
Expand Down Expand Up @@ -276,12 +302,14 @@ def generate_image_attack(
] = "hopskipjump",
input_type: t.Annotated[
str,
"Input data type: 'image' (load from URL, perturb pixels) or " "'tabular' (feature array + API endpoint)",
"Input data type: 'image' (load from URL, perturb pixels) or "
"'tabular' (feature array + API endpoint)",
] = "image",
# --- Image-specific params ---
image_url: t.Annotated[
str,
"URL of the source image (for input_type='image'). " "Can also be a local file path.",
"URL of the source image (for input_type='image'). "
"Can also be a local file path.",
] = "",
# --- Tabular-specific params ---
features: t.Annotated[
Expand All @@ -295,7 +323,9 @@ def generate_image_attack(
"and returns {predictions: [{class: int, confidence: float}]}",
] = "",
api_key: t.Annotated[str, "API key for x-api-key header (optional)"] = "",
target_class: t.Annotated[int, "Class to flip TO (adversarial target), e.g. 1 for fraud"] = 1,
target_class: t.Annotated[
int, "Class to flip TO (adversarial target), e.g. 1 for fraud"
] = 1,
original_class: t.Annotated[
int | str,
"Original class of the source input, e.g. 0 for legitimate",
Expand Down
47 changes: 39 additions & 8 deletions capabilities/ai-red-teaming/tools/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,32 @@

from dreadnode.agents.tools import tool

SESSION_PATH = Path(
os.environ.get(
"AIRT_SESSION_PATH",
os.path.expanduser("~/workspace/airt/.session_context.json"),

def _default_session_path() -> Path:
    """Build the default location of the AIRT session-context file.

    The path is ``~/.dreadnode/airt/<org>/<workspace>/.session_context.json``.
    ``<org>`` and ``<workspace>`` are taken from the active profile returned
    by ``UserConfig.read()``. If there is no active profile, if a profile
    field is empty, or if anything raises while reading the configuration
    (including the import itself), the fallbacks ``"default"`` and ``"main"``
    are used.

    Returns:
        The resolved session-context file path. The file is not created.
    """
    # Start from the fallbacks; they are only replaced when the whole profile
    # lookup succeeds.
    org, workspace = "default", "main"
    try:
        # Imported lazily so a failing import is handled like any other
        # configuration error.
        from dreadnode.app.config import UserConfig

        active = UserConfig.read().active_profile
        if active:
            _, profile = active
            # The right-hand side is fully evaluated before either name is
            # rebound, so a failure here leaves both fallbacks untouched.
            org, workspace = (
                profile.organization or "default",
                profile.workspace or "main",
            )
    except Exception:
        # Best effort: any configuration problem means "use the fallbacks".
        pass

    airt_root = Path.home() / ".dreadnode" / "airt"
    return airt_root / org / workspace / ".session_context.json"


# Session-context file location: a non-empty AIRT_SESSION_PATH environment
# variable wins; otherwise use the profile-derived default path.
SESSION_PATH = Path(os.environ.get("AIRT_SESSION_PATH") or _default_session_path())


Expand Down Expand Up @@ -89,7 +110,9 @@ def save_session_context(
session["history"] = history[-20:]

_save(session)
return "Session context saved. Target: {}, Goal: {}, Last attack: {}".format(target_model, goal[:60], attack_type)
return "Session context saved. Target: {}, Goal: {}, Last attack: {}".format(
target_model, goal[:60], attack_type
)


@tool
Expand Down Expand Up @@ -134,10 +157,18 @@ def get_session_context() -> str:
lines.append("")
lines.append("Attack History ({} runs):".format(len(history)))
for h in history[-5:]: # Show last 5
score_str = "ASR={}%".format(h["best_score"]) if h.get("best_score") is not None else "no score"
tx_str = "+{}".format(",".join(h["transforms"])) if h.get("transforms") else ""
score_str = (
"ASR={}%".format(h["best_score"])
if h.get("best_score") is not None
else "no score"
)
tx_str = (
"+{}".format(",".join(h["transforms"])) if h.get("transforms") else ""
)
lines.append(
" - {} {}: {} ({})".format(h.get("attack_type", "?"), tx_str, h.get("goal", "")[:40], score_str)
" - {} {}: {} ({})".format(
h.get("attack_type", "?"), tx_str, h.get("goal", "")[:40], score_str
)
)

return "\n".join(lines)
Expand Down
Loading