From d4145ec1b50b13cab8d939543041514bde4ef830 Mon Sep 17 00:00:00 2001 From: Andrei Petraru Date: Tue, 9 Jun 2026 15:53:12 +0300 Subject: [PATCH 1/3] feat(guardrails): add HITL escalation (EscalateAction) for guardrail middlewares [AL-289] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add EscalateAction, a GuardrailAction that routes a guardrail violation to a human reviewer via the documented HITL primitive interrupt(CreateEscalation(...)): the run suspends with a task in a UiPath Action App, and on resume Approve (optionally editing the reviewed input/output) continues while Reject terminates. - Runtime context: a GuardrailActionContext ContextVar (scope / stage / component / description / original input) is published around each action call so EscalateAction derives the app payload's Component, ExecutionStage, GuardrailDescription and the Inputs/Outputs split automatically — no hardcoding. - Stage-aware payload: PRE fills Inputs; POST fills Outputs and carries the original input in Inputs so the reviewer sees both. Legacy ToolInputs/ToolOutputs are still sent for older apps (comments/docs use Inputs/Outputs). Typed recipient supported. - Shared, stage-gated _build_message_hooks across all message middlewares (PII, harmful content, IP, prompt injection, user prompt attacks) so stage=PRE/POST registers the right before_*/after_* hook once (escalate once per run) and the wiring can't drift. - Re-raise GraphBubbleUp in the middleware so interrupt() isn't swallowed. - Tests (action-context publishing, hook-wiring stage gating, escalate-action suspend/approve/reject, output-stage escalation) and docs. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/guardrails.md | 44 +- samples/joke-agent/README.md | 69 ++- samples/joke-agent/graph.py | 43 +- samples/joke-agent/pyproject.toml | 2 +- src/uipath_langchain/guardrails/__init__.py | 2 + .../guardrails/_action_context.py | 56 ++ src/uipath_langchain/guardrails/actions.py | 4 +- .../guardrails/escalate_action.py | 323 +++++++++++ .../guardrails/middlewares/_base.py | 243 ++++++++- .../guardrails/middlewares/harmful_content.py | 76 +-- .../middlewares/intellectual_property.py | 55 +- .../guardrails/middlewares/pii_detection.py | 81 +-- .../middlewares/prompt_injection.py | 23 +- .../middlewares/user_prompt_attacks.py | 23 +- tests/cli/test_guardrails_in_langgraph.py | 167 +++++- .../test_action_context_publishing.py | 238 ++++++++ .../middlewares/test_hook_wiring.py | 130 ++++- tests/guardrails/test_action_context.py | 44 ++ tests/guardrails/test_escalate_action.py | 512 ++++++++++++++++++ 19 files changed, 1915 insertions(+), 220 deletions(-) create mode 100644 src/uipath_langchain/guardrails/_action_context.py create mode 100644 src/uipath_langchain/guardrails/escalate_action.py create mode 100644 tests/guardrails/middlewares/test_action_context_publishing.py create mode 100644 tests/guardrails/test_action_context.py create mode 100644 tests/guardrails/test_escalate_action.py diff --git a/docs/guardrails.md b/docs/guardrails.md index 21d45cff8..ef78f9f4d 100644 --- a/docs/guardrails.md +++ b/docs/guardrails.md @@ -39,6 +39,7 @@ agent = create_agent( ```python from uipath_langchain.guardrails import ( BlockAction, + EscalateAction, LogAction, LoggingSeverityLevel, UiPathDeterministicGuardrailMiddleware, @@ -73,7 +74,7 @@ TOOL scope for `UiPathPIIDetectionMiddleware` and `UiPathHarmfulContentMiddlewar All classes share these common parameters: - **`name`** (`str`) — display name for this guardrail instance. -- **`action`** — what to do on violation: `LogAction(...)` or `BlockAction(...)`. 
+- **`action`** — what to do on violation: `LogAction(...)`, `BlockAction(...)`, or `EscalateAction(...)` (escalate to a human — see [Escalation action](#escalation-action-human-in-the-loop)). - **`scopes`** (`list[GuardrailScope]`) — restrict which hooks are registered. Defaults shown in the table above. Use `GuardrailScope.AGENT`, `GuardrailScope.LLM`, `GuardrailScope.TOOL`. - **`enabled_for_evals`** (`bool`, default `True`) — set `False` to skip this guardrail when the agent runs in evaluation mode. @@ -238,6 +239,45 @@ agent = create_agent( ) ``` +### Escalation action (human-in-the-loop) + +`EscalateAction` routes a violation to a **human reviewer** instead of logging or blocking it. On a violation it builds the review payload and calls the documented HITL primitive [`interrupt(CreateEscalation(...))`](https://uipath.github.io/uipath-python/langchain/human_in_the_loop/#3-createescalation) — creating a task in a UiPath **Action App** and **suspending the run** until the reviewer responds. On resume: + +- **Approve** — if the reviewer edited the content, the edited value is substituted back into the flagged message / tool args / output; otherwise the original is kept. The edit is read from `ReviewedInputs` for a PRE (input) escalation and `ReviewedOutputs` for a POST (output) one. +- **Reject** — raises `GuardrailBlockException`, terminating the run. 
+ +```python +from uipath_langchain.guardrails import EscalateAction +from uipath.platform.action_center.tasks import TaskRecipient, TaskRecipientType + +*UiPathPIIDetectionMiddleware( + name="PII escalation", + scopes=[GuardrailScope.AGENT], + stage=GuardrailExecutionStage.PRE, # validate once → escalate once per run + action=EscalateAction( + app_name="Guardrail Escalation Action App", + app_folder_path="Shared", + # route the review task to a specific recipient (user / group / email) + recipient=TaskRecipient( + type=TaskRecipientType.EMAIL, value="reviewer@example.com" + ), + ), + entities=[PIIDetectionEntity(PIIDetectionEntityType.EMAIL, threshold=0.5)], +), +``` + +Parameters: + +- **`app_name`** (`str`, required) — the published Action App that renders the review task. +- **`app_folder_path`** (`str`) — folder where the app is deployed. +- **`assignee`** (`str`) — the simple username/email to assign the task to. +- **`recipient`** (`TaskRecipient`) — a typed escalation target (shown above); takes precedence over `assignee`. Supports the four `TaskRecipientType` members — `USER_ID`, `GROUP_ID`, `EMAIL` (user email), and `GROUP_NAME`, e.g. `TaskRecipient(type=TaskRecipientType.GROUP_NAME, value="Reviewers")`. +- **`title`** (`str`) — task title; defaults to a message derived from the guardrail name. + +> 💡 **Escalate once per run.** On AGENT/LLM scope a guardrail validates both *before* and *after* by default, which can escalate twice. Set `stage=GuardrailExecutionStage.PRE` (or `POST`) so only a single checkpoint is registered. + +> ⚠️ **Requires a published Action App.** The target app must exist in the configured folder for the task to be created. Resume is durable — the run suspends on `interrupt()` and resumes when the task is completed. See [Human In The Loop](https://uipath.github.io/uipath-python/langchain/human_in_the_loop/) for the underlying primitive. 
+ ### Custom actions Both the built-in middleware and `UiPathDeterministicGuardrailMiddleware` accept any `GuardrailAction` subclass as the `action` parameter. This lets you implement content sanitisation, redaction, or any other custom response to a violation: @@ -413,7 +453,7 @@ Imported from `uipath_langchain.guardrails`. |---|---| | `PRE` | Before the call (inspect / block inputs) | | `POST` | After the call (inspect / transform outputs) | -| `PRE_AND_POST` | Both — used only by `UiPathDeterministicGuardrailMiddleware` | +| `PRE_AND_POST` | Both checkpoints (the default) | ### LoggingSeverityLevel diff --git a/samples/joke-agent/README.md b/samples/joke-agent/README.md index e8f328b5e..cf81a249e 100644 --- a/samples/joke-agent/README.md +++ b/samples/joke-agent/README.md @@ -1,6 +1,6 @@ # Joke Agent -A LangGraph agent that generates family-friendly jokes based on a given topic using UiPath's LLM. The agent includes comprehensive guardrails for PII detection, prompt injection prevention, and content validation. +A LangGraph agent that generates family-friendly jokes based on a given topic using UiPath's LLM. The agent includes comprehensive guardrails for PII detection, prompt injection prevention, and content validation — plus a **human-in-the-loop (HITL) escalation** to the *Guardrail Escalation Action App* when PII is detected in the agent's input. ## Requirements @@ -52,12 +52,79 @@ The `topic` field should be a string representing the subject for the joke. The - Custom logging middleware that logs input and output - Simple, clean architecture following UiPath agent patterns +## Guardrail Escalation (Human-in-the-Loop) + +The PII detection guardrail uses **`EscalateAction`** — a first-class guardrail +middleware action, alongside `LogAction` and `BlockAction`. 
**When PII is detected, the run +escalates to a human reviewer** through the **Guardrail Escalation Action App** and suspends +until the task is completed, reusing the documented UiPath LangChain HITL primitive +`interrupt(CreateEscalation(...))` — the same mechanism the +[`ticket-classification`](../ticket-classification) sample uses. + +```python +UiPathPIIDetectionMiddleware( + name="PII escalation guardrail", + scopes=[GuardrailScope.AGENT], + stage=GuardrailExecutionStage.PRE, # validate input once → escalate once per run + action=EscalateAction( + app_name="Guardrail.Escalation.Action.App.2", + app_folder_path="Shared", + ), + entities=[PIIDetectionEntity(PIIDetectionEntityType.EMAIL, 0.5)], +) +``` + +On a violation the run **suspends** with a review task in the action app. The reviewer can edit the +flagged input (or, at a POST check, the output) and **Approve** to resume with the edited value, or +**Reject** to terminate the run with a guardrail-violation error. + +### Configuration + +The target app is configured via environment variables (defaults shown): + +| Variable | Default | Purpose | +|---|---|---| +| `GUARDRAIL_ESCALATION_APP_NAME` | `Guardrail.Escalation.Action.App.2` | Published action app (process) name | +| `GUARDRAIL_ESCALATION_APP_FOLDER` | `Shared` | Folder where the app is deployed | + +The **Guardrail Escalation Action App (2)** must be published to the configured folder on +your tenant for the escalation task to be created. The defaults match its deployed +name/folder (find yours with `uip or processes list --all-folders --name Guardrail`). + +**Escalation target.** `assignee` is the simple username/email shortcut. 
For typed targets, +pass `recipient=TaskRecipient(...)` instead — the HITL `CreateEscalation` primitive supports the +four `TaskRecipientType` members: `USER_ID`, `GROUP_ID`, `EMAIL` (user email), and `GROUP_NAME`, e.g.: + +```python +from uipath.platform.action_center.tasks import TaskRecipient, TaskRecipientType + +EscalateAction( + app_name="Guardrail.Escalation.Action.App.2", + app_folder_path="Shared", + recipient=TaskRecipient(type=TaskRecipientType.GROUP_NAME, value="Reviewers"), +) +``` + +### Triggering an escalation + +A `"banana"` topic contains no PII, so it completes without escalating. To exercise the +HITL path, use a topic that contains PII, e.g.: + +```bash +uv run uipath run agent '{"topic": "a joke that mentions the email john.doe@example.com"}' +``` + +The run will suspend after creating the review task. Complete the task in Action Center +(choosing **Approve** or **Reject**), then resume the run with `uv run uipath run agent --resume ...`. + ## Agent Architecture The agent is built using LangGraph's `StateGraph` with custom input/output schemas: - **Input Schema**: `Input` with a `topic` field - **Output Schema**: `Output` with a `joke` field +- **`joke` node**: runs the guarded `create_agent` and extracts the joke. The guardrail + middleware (including the PII `EscalateAction`) runs inside the agent. 
- **LLM**: UiPathChat with model `gpt-4o-2024-08-06` and temperature `0.7` ### Tools diff --git a/samples/joke-agent/graph.py b/samples/joke-agent/graph.py index bb1d85686..1f1043b6f 100644 --- a/samples/joke-agent/graph.py +++ b/samples/joke-agent/graph.py @@ -1,5 +1,7 @@ """Joke generating agent that creates family-friendly jokes based on a topic.""" +import os + from langchain.agents import create_agent from langchain_core.messages import HumanMessage from langchain_core.tools import tool @@ -12,6 +14,7 @@ from uipath_langchain.chat import UiPathChat from uipath_langchain.guardrails import ( BlockAction, + EscalateAction, GuardrailExecutionStage, HarmfulContentEntity, LogAction, @@ -43,6 +46,15 @@ class Output(BaseModel): joke: str +# Escalation Action App configuration (override via env vars to match your +# tenant deployment). Defaults point at the "Guardrail Escalation Action App (2)" +# published as the process "Guardrail.Escalation.Action.App.2" in the "Shared" +# folder (its deployed name/folder in the tenant — verified via `uip or processes list`). +ESCALATION_APP_NAME = os.getenv( + "GUARDRAIL_ESCALATION_APP_NAME", "Guardrail.Escalation.Action.App.2" +) +ESCALATION_APP_FOLDER = os.getenv("GUARDRAIL_ESCALATION_APP_FOLDER", "Shared") + # Initialize UiPathChat LLM llm = UiPathChat(model="gpt-4o-2024-08-06", temperature=0.7) @@ -93,13 +105,25 @@ def analyze_joke_syntax(joke: str) -> str: system_prompt=SYSTEM_PROMPT, middleware=[ *LoggingMiddleware, + # PII detection on the agent scope. On a violation it escalates to the + # Guardrail Escalation Action App for human review via the documented + # HITL interrupt(CreateEscalation(...)) — the run suspends until a human + # approves (optionally editing the content) or rejects. 
*UiPathPIIDetectionMiddleware( - name="My personal PII detector", - scopes=[GuardrailScope.AGENT, GuardrailScope.LLM], - action=LogAction(severity_level=LoggingSeverityLevel.WARNING), + name="PII escalation guardrail", + scopes=[GuardrailScope.AGENT], + # PRE only → validate the input once, so the escalation triggers a + # single time per run (AGENT scope would otherwise check both + # before_agent and after_agent). + stage=GuardrailExecutionStage.PRE, + action=EscalateAction( + app_name=ESCALATION_APP_NAME, + app_folder_path=ESCALATION_APP_FOLDER, + ), entities=[ PIIDetectionEntity(PIIDetectionEntityType.EMAIL, 0.5), PIIDetectionEntity(PIIDetectionEntityType.CREDIT_CARD_NUMBER, 0.5), + PIIDetectionEntity(PIIDetectionEntityType.PHONE_NUMBER, 0.5), ], ), *UiPathPIIDetectionMiddleware( @@ -182,26 +206,23 @@ def analyze_joke_syntax(joke: str) -> str: ) -# Wrapper node to convert topic input to messages and call the agent +# Wrapper node to convert topic input to messages and call the agent. The +# guardrail middleware runs inside the agent; when the PII escalation guardrail +# fires, interrupt(CreateEscalation(...)) suspends the run for human review. async def joke_node(state: Input) -> Output: """Convert topic to messages, call agent, and extract joke.""" - # Convert topic to messages format messages = [ HumanMessage( content=f"Generate a family-friendly joke based on the topic: {state.topic}" ) ] - - # Call the agent with messages result = await agent.ainvoke({"messages": messages}) - - # Extract the joke from the agent's response joke = result["messages"][-1].content - return Output(joke=joke) -# Build wrapper graph with custom input/output schemas +# Build wrapper graph with custom input/output schemas. The runtime recompiles +# this with a durable checkpointer, so interrupt()/resume works under `uipath run`. 
builder = StateGraph(Input, input_schema=Input, output_schema=Output) builder.add_node("joke", joke_node) builder.add_edge(START, "joke") diff --git a/samples/joke-agent/pyproject.toml b/samples/joke-agent/pyproject.toml index 98a7aa6c8..b13711398 100644 --- a/samples/joke-agent/pyproject.toml +++ b/samples/joke-agent/pyproject.toml @@ -5,7 +5,7 @@ description = "Joke generating agent that creates family-friendly jokes based on authors = [{ name = "John Doe", email = "john.doe@myemail.com" }] requires-python = ">=3.11" dependencies = [ - "uipath-langchain>=0.11.3, <0.12.0", + "uipath-langchain>=0.11.13, <0.12.0", "uipath>2.7.0", ] diff --git a/src/uipath_langchain/guardrails/__init__.py b/src/uipath_langchain/guardrails/__init__.py index fd997b8a9..cb9444a71 100644 --- a/src/uipath_langchain/guardrails/__init__.py +++ b/src/uipath_langchain/guardrails/__init__.py @@ -31,6 +31,7 @@ ) from ._langchain_adapter import LangChainGuardrailAdapter +from .escalate_action import EscalateAction from .middlewares import ( UiPathDeterministicGuardrailMiddleware, UiPathHarmfulContentMiddleware, @@ -67,6 +68,7 @@ # Actions "LogAction", "BlockAction", + "EscalateAction", "LoggingSeverityLevel", # Exception "GuardrailBlockException", diff --git a/src/uipath_langchain/guardrails/_action_context.py b/src/uipath_langchain/guardrails/_action_context.py new file mode 100644 index 000000000..96b94d2e6 --- /dev/null +++ b/src/uipath_langchain/guardrails/_action_context.py @@ -0,0 +1,56 @@ +"""Runtime context for guardrail actions (scope / stage / component). + +A :class:`GuardrailAction`'s ``handle_validation_result(result, data, +guardrail_name)`` signature does not carry the guardrail's scope, execution +stage, or guarded-component label — but the middleware that invokes the action +knows all three. The middleware publishes them here (via a ``ContextVar``) for +the duration of the action call, so actions that need them — e.g. 
+``EscalateAction``, which maps them onto the escalation app's ``Component`` / +``ExecutionStage`` fields — can read them at runtime instead of requiring the +developer to hardcode them. + +The context is set synchronously around each action invocation and reset +afterwards, so it is correct across LangGraph's interrupt/replay too (it is +re-published on every replay). +""" + +from __future__ import annotations + +from contextvars import ContextVar +from dataclasses import dataclass + +from .enums import GuardrailExecutionStage, GuardrailScope + + +@dataclass(frozen=True) +class GuardrailActionContext: + """The guardrail context active while an action handles a violation.""" + + scope: GuardrailScope | None = None + execution_stage: GuardrailExecutionStage | None = None + component: str | None = None + description: str | None = None + input_payload: str | None = None + + +_action_context: ContextVar[GuardrailActionContext | None] = ContextVar( + "uipath_guardrail_action_context", default=None +) + + +def current_action_context() -> GuardrailActionContext | None: + """Return the guardrail context for the in-flight action call, if any.""" + return _action_context.get() + + +def component_label(scope: GuardrailScope | None) -> str | None: + """Map a guardrail scope to the app's component label (matches the SDK). + + TOOL has no static label here — the tool name is supplied separately by the + caller — so this returns ``None`` for TOOL scope. 
+ """ + if scope == GuardrailScope.AGENT: + return "Agent" + if scope == GuardrailScope.LLM: + return "LLM call" + return None diff --git a/src/uipath_langchain/guardrails/actions.py b/src/uipath_langchain/guardrails/actions.py index 49aa5993f..cf21c6559 100644 --- a/src/uipath_langchain/guardrails/actions.py +++ b/src/uipath_langchain/guardrails/actions.py @@ -6,4 +6,6 @@ LoggingSeverityLevel, ) -__all__ = ["LoggingSeverityLevel", "LogAction", "BlockAction"] +from .escalate_action import EscalateAction + +__all__ = ["LoggingSeverityLevel", "LogAction", "BlockAction", "EscalateAction"] diff --git a/src/uipath_langchain/guardrails/escalate_action.py b/src/uipath_langchain/guardrails/escalate_action.py new file mode 100644 index 000000000..a187fb5de --- /dev/null +++ b/src/uipath_langchain/guardrails/escalate_action.py @@ -0,0 +1,323 @@ +"""Human-in-the-loop escalation action for LangChain guardrail middlewares. + +``EscalateAction`` is a :class:`GuardrailAction` that, on a guardrail violation, +escalates the flagged content to a human reviewer through a UiPath **Action +App** (e.g. the *Guardrail Escalation Action App*) using the documented HITL +primitive ``interrupt(CreateEscalation(...))`` — the same mechanism coded +agents use for human-in-the-loop tasks. + +It is the escalation counterpart to :class:`LogAction` and :class:`BlockAction` +for the middleware path:: + + UiPathPIIDetectionMiddleware( + scopes=[GuardrailScope.AGENT], + action=EscalateAction( + app_name="Guardrail.Escalation.Action.App.2", + app_folder_path="Shared", + ), + entities=[PIIDetectionEntity(PIIDetectionEntityType.EMAIL, 0.5)], + ) + +Lifecycle: + +1. On ``VALIDATION_FAILED``, the flagged content is mapped onto the action + app's input schema and ``interrupt(CreateEscalation(...))`` is raised. The + platform creates the task, suspends the run durably, and resumes it once a + human acts. ``interrupt()`` is memoized, so replay-on-resume never creates a + duplicate task. +2. 
On resume, the completed task's outcome drives the result: + - ``Approve`` → return the reviewer-edited content so the middleware + substitutes it, or keep the original when the reviewed value is + absent/empty (matching the factory-path ``EscalateAction``). The reviewed + value is read from ``ReviewedInputs`` for a PRE (input) escalation and from + ``ReviewedOutputs`` for a POST (output) one. + - ``Reject`` → raise :class:`GuardrailBlockException`, terminating the run + (mirroring the SDK's factory-path ``EscalateAction``). + +The flagged content is mapped onto the action app's schema using the guardrail +context the middleware publishes (scope / stage / component / description): a PRE +violation fills ``Inputs``; a POST violation fills ``Outputs`` and carries the +original input in ``Inputs``, so the reviewer sees both. +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass +from typing import Any + +from langgraph.types import interrupt +from uipath.core.guardrails import ( + GuardrailValidationResult, + GuardrailValidationResultType, +) +from uipath.platform.action_center.tasks import TaskRecipient +from uipath.platform.common import CreateEscalation, UiPathConfig +from uipath.platform.guardrails.decorators import ( + GuardrailAction, + GuardrailBlockException, +) + +from ._action_context import GuardrailActionContext, current_action_context +from .enums import GuardrailExecutionStage + +logger = logging.getLogger(__name__) + + +@dataclass +class EscalateAction(GuardrailAction): + """Escalate guardrail violations to a human via a UiPath Action App. + + The escalation-app payload fields — ``Component``, ``ExecutionStage``, + ``GuardrailDescription``, and the ``Inputs`` / ``Outputs`` input-vs-output + split — are derived at runtime from the guardrail context the middleware + publishes (scope → component, hook → stage, guardrail → description), so they + are not configured here. 
+ + Args: + app_name: Name of the published escalation Action App. + app_folder_path: Folder where the app is deployed. + assignee: Optional task assignee — the simple username/email shortcut. + recipient: Optional typed escalation target (``TaskRecipient``). Use this + to target a ``UserId`` / ``GroupId`` / ``UserEmail`` / ``GroupName`` + (the recipient types the HITL ``CreateEscalation`` primitive + supports). Takes precedence over ``assignee`` when set. + title: Optional task title. Defaults to a message derived from the + guardrail name. + """ + + app_name: str + app_folder_path: str | None = None + assignee: str | None = None + recipient: TaskRecipient | None = None + title: str | None = None + + def handle_validation_result( + self, + result: GuardrailValidationResult, + data: str | dict[str, Any], + guardrail_name: str, + ) -> str | dict[str, Any] | None: + """Escalate to the action app and apply the reviewer's decision.""" + if result.result != GuardrailValidationResultType.VALIDATION_FAILED: + return None + + ctx = current_action_context() + data_is_dict = isinstance(data, dict) + # JSON-encode the flagged payload. The action app parses this field as + # JSON (the helix backend sends it via ToJsonString and the factory-path + # EscalateAction via json.dumps), so a raw string leaves the app's + # input box empty. 
+ content = json.dumps(data) + + logger.warning( + "[GUARDRAIL] [%s] violation detected — escalating to app '%s'.", + guardrail_name, + self.app_name, + ) + + raw = interrupt( + CreateEscalation( + app_name=self.app_name, + app_folder_path=self.app_folder_path, + title=self.title or f"Guardrail '{guardrail_name}': review required", + data=self._build_app_inputs(guardrail_name, result, content, ctx), + assignee=self.assignee, + recipient=self.recipient, + ) + ) + + outcome, response = _normalize_escalation_result(raw) + logger.info( + "[GUARDRAIL] [%s] escalation resolved with outcome '%s'.", + guardrail_name, + outcome, + ) + + if outcome.lower() == "approve": + reviewed = response.get(_reviewed_field_name(ctx)) + if not reviewed: + return None + return _coerce_reviewed(reviewed, data_is_dict) + + reason = response.get("Reason") or result.reason or "No reason provided." + raise GuardrailBlockException( + title=f"Guardrail [{guardrail_name}] escalation rejected", + detail=reason, + ) + + def _build_app_inputs( + self, + guardrail_name: str, + result: GuardrailValidationResult, + content: str, + ctx: GuardrailActionContext | None, + ) -> dict[str, Any]: + """Map the guardrail context onto the action app's input schema. + + Keys mirror the deployed ``Guardrail Escalation Action App`` schema. The + flagged content uses ``Inputs`` / ``Outputs``, split by the published + execution stage: + + - PRE → flagged content fills ``Inputs`` (``Outputs`` empty). + - POST → flagged content fills ``Outputs``; the original input + (``ctx.input_payload``) fills ``Inputs`` so the reviewer sees both. 
+ """ + is_post = bool(ctx and ctx.execution_stage == GuardrailExecutionStage.POST) + data: dict[str, Any] = { + "GuardrailName": guardrail_name, + "GuardrailDescription": (ctx.description if ctx else None) or "", + "GuardrailResult": result.reason or "", + } + if is_post: + data["Inputs"] = (ctx.input_payload if ctx else None) or "" + data["Outputs"] = content + else: + data["Inputs"] = content + data["Outputs"] = "" + # Legacy aliases kept for older apps that read the Tool-prefixed fields. + data["ToolInputs"] = data["Inputs"] + data["ToolOutputs"] = data["Outputs"] + if ctx and ctx.component: + data["Component"] = ctx.component + data["Tool"] = ctx.component + execution_stage = _execution_stage_label(ctx.execution_stage if ctx else None) + if execution_stage: + data["ExecutionStage"] = execution_stage + tenant_name = _resolve_tenant_name() + if tenant_name: + data["TenantName"] = tenant_name + trace_url = _agent_trace_url() + if trace_url: + data["AgentTrace"] = trace_url + return data + + +def _normalize_escalation_result(raw: Any) -> tuple[str, dict[str, Any]]: + """Normalize the interrupt resume value into ``(outcome, data)``. + + ``CreateEscalation`` resumes with the completed task. Depending on how the + platform delivers it, ``raw`` may be a task-like object (``.action`` / + ``.data``) or a plain dict, so both are handled. 
+ """ + if raw is None: + return "Approve", {} + + if isinstance(raw, dict): + outcome = raw.get("action") or raw.get("Action") or raw.get("outcome") + data = raw.get("data") + if not isinstance(data, dict): + data = raw + return (str(outcome) if outcome else "Approve"), data + + outcome = getattr(raw, "action", None) + data = getattr(raw, "data", None) + if not isinstance(data, dict): + data = {} + return (str(outcome) if outcome else "Approve"), data + + +def _coerce_reviewed(reviewed: Any, want_dict: bool) -> str | dict[str, Any]: + """Coerce the reviewed value back to the original data's shape.""" + if want_dict and isinstance(reviewed, str): + try: + parsed = json.loads(reviewed) + if isinstance(parsed, dict): + return parsed + except (json.JSONDecodeError, TypeError): + pass + return reviewed + + +def _execution_stage_label(stage: GuardrailExecutionStage | None) -> str | None: + """Map a guardrail execution stage to the action app's stage label.""" + if stage == GuardrailExecutionStage.PRE: + return "PreExecution" + if stage == GuardrailExecutionStage.POST: + return "PostExecution" + return None + + +def _reviewed_field_name(ctx: GuardrailActionContext | None) -> str: + """Return the resume field the reviewer edits, keyed by execution stage. + + A PRE (input) escalation comes back as ``ReviewedInputs``; a POST (output) + one as ``ReviewedOutputs`` — matching the action app's output schema. With no + stage context we default to ``ReviewedInputs``. + """ + stage = ctx.execution_stage if ctx else None + if stage == GuardrailExecutionStage.POST: + return "ReviewedOutputs" + return "ReviewedInputs" + + +def _safe_config_attr(attr: str) -> Any: + """Read an attribute off ``UiPathConfig``, tolerating missing context.""" + try: + return getattr(UiPathConfig, attr, None) + except Exception: # pragma: no cover - config not always populated locally + return None + + +def _resolve_tenant_name() -> str | None: + """Resolve the tenant name for the escalation payload. 
+ + Prefers ``UiPathConfig.tenant_name`` (the ``UIPATH_TENANT_NAME`` env var, + injected by Orchestrator in deployed runs). Falls back to parsing the + tenant segment from the base URL — which ``uipath auth`` writes as + ``UIPATH_URL`` (e.g. ``https://.../<org>/<tenant>``) — so the logged-in + tenant is populated for local ``uipath run`` too. + """ + name = _safe_config_attr("tenant_name") + if name: + return name + base_url = _safe_config_attr("base_url") + if not base_url: + return None + try: + from uipath._utils import UiPathUrl + + return UiPathUrl(base_url).tenant_name or None + except Exception: # pragma: no cover - defensive + return None + + +def _agent_trace_url() -> str | None: + """Build the agent execution viewer URL from ``UiPathConfig``. + + Mirrors the factory-path ``EscalateAction`` so the action app's "Agent + trace" field links to the run. Returns ``None`` when the runtime context + (base URL / trace identifiers) isn't populated — e.g. local runs without a + deployment context — so we never emit a URL containing ``None`` segments. 
+ """ + base_url = _safe_config_attr("base_url") + organization_id = _safe_config_attr("organization_id") + if not base_url or not organization_id: + return None + try: + from uipath._utils import UiPathUrl + + normalized = UiPathUrl(base_url).base_url + if _safe_config_attr("is_studio_project"): + project_id = _safe_config_attr("project_id") + solution_id = _safe_config_attr("studio_solution_id") + if not project_id: + return None + return ( + f"{normalized}/{organization_id}/studio_/designer/" + f"{project_id}?solutionId={solution_id}" + ) + folder_key = _safe_config_attr("folder_key") + process_uuid = _safe_config_attr("process_uuid") + trace_id = _safe_config_attr("trace_id") + project_key = _safe_config_attr("project_key") + package_version = _safe_config_attr("process_version") + if not (folder_key and process_uuid and trace_id): + return None + return ( + f"{normalized}/{organization_id}/agents_/deployed/{folder_key}/" + f"{process_uuid}/{project_key}/{package_version}/traces/{trace_id}" + ) + except Exception: # pragma: no cover - defensive: never break escalation + return None diff --git a/src/uipath_langchain/guardrails/middlewares/_base.py b/src/uipath_langchain/guardrails/middlewares/_base.py index 1fe48c04b..d024cbd5f 100644 --- a/src/uipath_langchain/guardrails/middlewares/_base.py +++ b/src/uipath_langchain/guardrails/middlewares/_base.py @@ -4,13 +4,24 @@ import asyncio import json import logging -from typing import Any +from typing import Any, Sequence -from langchain.agents.middleware import AgentMiddleware, wrap_tool_call +from langchain.agents.middleware import ( + AgentMiddleware, + AgentState, + after_agent, + after_model, + before_agent, + before_model, + wrap_tool_call, +) from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, ToolMessage +from langgraph.errors import GraphBubbleUp from langgraph.prebuilt.tool_node import ToolCallRequest +from langgraph.runtime import Runtime from langgraph.types import Command from 
uipath.core.guardrails import ( + GuardrailScope, GuardrailValidationResult, GuardrailValidationResultType, ) @@ -21,6 +32,11 @@ from uipath_langchain.agent.exceptions import AgentRuntimeError +from .._action_context import ( + GuardrailActionContext, + _action_context, + component_label, +) from ..models import GuardrailAction from ._utils import ( convert_block_exception, @@ -59,6 +75,27 @@ def _parse_str_content(content: str) -> dict[str, Any]: return {"output": content} +def _apply_text_modification( + messages: list[BaseMessage], + original: str, + modified: str | dict[str, Any] | None, +) -> None: + """Substitute ``original`` with ``modified`` in the first matching message. + + No-op unless ``modified`` is a string that differs from ``original``. + """ + if not isinstance(modified, str) or modified == original: + return + for msg in messages: + if ( + isinstance(msg, (HumanMessage, AIMessage)) + and isinstance(msg.content, str) + and original in msg.content + ): + msg.content = msg.content.replace(original, modified, 1) + return + + class BuiltInGuardrailMiddlewareMixin: """Mixin providing shared evaluation logic for built-in guardrail middlewares. @@ -91,12 +128,38 @@ def _evaluate_guardrail( return uipath.guardrails.evaluate_guardrail(input_data, self._guardrail) def _handle_validation_result( - self, result: GuardrailValidationResult, input_data: str | dict[str, Any] + self, + result: GuardrailValidationResult, + input_data: str | dict[str, Any], + *, + scope: GuardrailScope | None = None, + stage: GuardrailExecutionStage | None = None, + component: str | None = None, + input_payload: str | None = None, ) -> str | dict[str, Any] | None: - """Delegate to the action when a violation is detected.""" - if result.result == GuardrailValidationResultType.VALIDATION_FAILED: + """Delegate to the action when a violation is detected. 
+ + Publishes the guardrail context (scope / stage / component / the + guardrail's description, plus the original ``input_payload`` on a POST + output check) for the duration of the action call so context-aware + actions (e.g. ``EscalateAction``) can read it instead of requiring it to + be hardcoded. + """ + if result.result != GuardrailValidationResultType.VALIDATION_FAILED: + return None + token = _action_context.set( + GuardrailActionContext( + scope=scope, + execution_stage=stage, + component=component, + description=getattr(self._guardrail, "description", None), + input_payload=input_payload, + ) + ) + try: return self.action.handle_validation_result(result, input_data, self._name) - return None + finally: + _action_context.reset(token) def _extract_tool_output_data( self, result: ToolMessage | Command[Any] @@ -139,13 +202,23 @@ async def _run_tool_guardrail( input_data = self._extract_tool_input_data(request) try: result = await asyncio.to_thread(self._evaluate_guardrail, input_data) - modified_input = self._handle_validation_result(result, input_data) + modified_input = self._handle_validation_result( + result, + input_data, + scope=GuardrailScope.TOOL, + stage=GuardrailExecutionStage.PRE, + component=tool_name, + ) if modified_input is not None: request = create_modified_tool_request(request, modified_input) except GuardrailBlockException as exc: raise convert_block_exception(exc) from exc except AgentRuntimeError: raise + except GraphBubbleUp: + # LangGraph control-flow signals (e.g. interrupt() from an + # escalation action). Must bubble up so the run can suspend. 
+ raise except Exception: logger.exception( f"Error evaluating '{self._name}' guardrail (PRE)" @@ -165,7 +238,14 @@ async def _run_tool_guardrail( self._evaluate_guardrail, output_data ) modified_output = self._handle_validation_result( - result, output_data + result, + output_data, + scope=GuardrailScope.TOOL, + stage=GuardrailExecutionStage.POST, + component=tool_name, + input_payload=json.dumps( + self._extract_tool_input_data(request) + ), ) if modified_output is not None: tool_result = create_modified_tool_result( @@ -175,6 +255,10 @@ async def _run_tool_guardrail( raise convert_block_exception(exc) from exc except AgentRuntimeError: raise + except GraphBubbleUp: + # LangGraph control-flow signals (e.g. interrupt() from an + # escalation action). Must bubble up so the run can suspend. + raise except Exception: logger.exception( f"Error evaluating '{self._name}' guardrail (POST)" @@ -196,8 +280,123 @@ async def _wrap_tool_call_func( _wrap_tool_call_func.__name__ = f"{guardrail_name}_wrap_tool_call" return wrap_tool_call(_wrap_tool_call_func) # type: ignore[call-overload] - def _check_messages(self, messages: list[BaseMessage]) -> None: - """Evaluate guardrail against message text; apply action on violation.""" + def _build_message_hooks( + self, + scope: GuardrailScope, + stage: GuardrailExecutionStage, + guardrail_name: str, + ) -> list[AgentMiddleware]: + """Build stage-gated before/after message hooks for an AGENT or LLM scope. + + ``PRE`` registers only the ``before_*`` hook, ``POST`` only the + ``after_*`` hook, and ``PRE_AND_POST`` both — so a guardrail validates + (and acts, e.g. escalates) at a single checkpoint instead of twice per + run. Shared by the message-based middlewares (PII / harmful content / + intellectual property) so their hook wiring can't drift apart. 
+ """ + include_pre = stage in ( + GuardrailExecutionStage.PRE, + GuardrailExecutionStage.PRE_AND_POST, + ) + include_post = stage in ( + GuardrailExecutionStage.POST, + GuardrailExecutionStage.PRE_AND_POST, + ) + mw = self + hooks: list[AgentMiddleware] = [] + + if scope == GuardrailScope.AGENT: + if include_pre: + + async def _before_agent_func( + state: AgentState[Any], runtime: Runtime + ) -> None: + messages = state.get("messages", []) + mw._check_messages( + list(messages), + scope=GuardrailScope.AGENT, + stage=GuardrailExecutionStage.PRE, + ) + + _before_agent_func.__name__ = f"{guardrail_name}_before_agent" + hooks.append(before_agent(_before_agent_func)) + + if include_post: + + async def _after_agent_func( + state: AgentState[Any], runtime: Runtime + ) -> None: + messages = state.get("messages", []) + mw._check_messages( + list(messages), + scope=GuardrailScope.AGENT, + stage=GuardrailExecutionStage.POST, + input_text=mw._last_input_text(list(messages)), + ) + + _after_agent_func.__name__ = f"{guardrail_name}_after_agent" + hooks.append(after_agent(_after_agent_func)) + + elif scope == GuardrailScope.LLM: + if include_pre: + + async def _before_model_func( + state: AgentState[Any], runtime: Runtime + ) -> None: + messages = state.get("messages", []) + mw._check_messages( + list(messages), + scope=GuardrailScope.LLM, + stage=GuardrailExecutionStage.PRE, + ) + + _before_model_func.__name__ = f"{guardrail_name}_before_model" + hooks.append(before_model(_before_model_func)) + + if include_post: + + async def _after_model_func( + state: AgentState[Any], runtime: Runtime + ) -> None: + messages = state.get("messages", []) + ai_messages = [m for m in messages if isinstance(m, AIMessage)] + if ai_messages: + mw._check_messages( + [ai_messages[-1]], + scope=GuardrailScope.LLM, + stage=GuardrailExecutionStage.POST, + input_text=mw._last_input_text(messages), + ) + + _after_model_func.__name__ = f"{guardrail_name}_after_model" + 
hooks.append(after_model(_after_model_func)) + + return hooks + + def _last_input_text(self, messages: Sequence[BaseMessage]) -> str | None: + """Return the last HumanMessage text — the input for a POST check. + + Used by ``after_*`` hooks to supply the original input alongside the + flagged output when escalating an output (POST) violation. + """ + for msg in reversed(messages): + if isinstance(msg, HumanMessage): + return extract_text_from_messages([msg]) or None + return None + + def _check_messages( + self, + messages: list[BaseMessage], + scope: GuardrailScope | None = None, + stage: GuardrailExecutionStage | None = None, + input_text: str | None = None, + ) -> None: + """Evaluate guardrail against message text; apply action on violation. + + ``input_text`` is the original input for a POST (output) check — the + message that produced the flagged output — so an escalation can show it + as ``Inputs`` alongside the flagged ``Outputs``. + """ if not messages: return @@ -207,20 +406,22 @@ def _check_messages(self, messages: list[BaseMessage]) -> None: try: result = self._evaluate_guardrail(text) - modified_text = self._handle_validation_result(result, text) - if ( - modified_text is not None - and isinstance(modified_text, str) - and modified_text != text - ): - for msg in messages: - if isinstance(msg, (HumanMessage, AIMessage)): - if isinstance(msg.content, str) and text in msg.content: - msg.content = msg.content.replace(text, modified_text, 1) - break + modified_text = self._handle_validation_result( + result, + text, + scope=scope, + stage=stage, + component=component_label(scope), + input_payload=json.dumps(input_text) if input_text else None, + ) + _apply_text_modification(messages, text, modified_text) except GuardrailBlockException as exc: raise convert_block_exception(exc) from exc except AgentRuntimeError: raise + except GraphBubbleUp: + # LangGraph control-flow signals (e.g. interrupt() from an + # escalation action). 
Must bubble up so the run can suspend. + raise except Exception: logger.exception(f"Error evaluating guardrail '{self._name}'") diff --git a/src/uipath_langchain/guardrails/middlewares/harmful_content.py b/src/uipath_langchain/guardrails/middlewares/harmful_content.py index 77ceaf984..dfe1cb224 100644 --- a/src/uipath_langchain/guardrails/middlewares/harmful_content.py +++ b/src/uipath_langchain/guardrails/middlewares/harmful_content.py @@ -4,17 +4,8 @@ from typing import Any, Sequence from uuid import uuid4 -from langchain.agents.middleware import ( - AgentMiddleware, - AgentState, - after_agent, - after_model, - before_agent, - before_model, -) -from langchain_core.messages import AIMessage +from langchain.agents.middleware import AgentMiddleware from langchain_core.tools import BaseTool -from langgraph.runtime import Runtime from uipath.core.guardrails import GuardrailSelector from uipath.platform.guardrails import ( BuiltInValidatorGuardrail, @@ -105,57 +96,28 @@ def __init__( self._middleware_instances = self._create_middleware_instances() def _create_middleware_instances(self) -> list[AgentMiddleware]: - """Create middleware instances from decorated functions.""" - instances = [] - middleware_instance = self + """Create middleware instances from decorated functions. + + AGENT/LLM hooks are built by the shared, stage-gated + ``_build_message_hooks`` helper (``PRE`` → ``before_*`` only, ``POST`` → + ``after_*`` only, ``PRE_AND_POST`` → both), so a guardrail validates (and + acts, e.g. escalates) at a single checkpoint instead of twice per run. 
+ """ + instances: list[AgentMiddleware] = [] guardrail_name = self._name.replace(" ", "_") if GuardrailScope.AGENT in self.scopes: - - async def _before_agent_func( - state: AgentState[Any], runtime: Runtime - ) -> None: - messages = state.get("messages", []) - middleware_instance._check_messages(list(messages)) - - _before_agent_func.__name__ = f"{guardrail_name}_before_agent" - _before_agent = before_agent(_before_agent_func) - instances.append(_before_agent) - - async def _after_agent_func( - state: AgentState[Any], runtime: Runtime - ) -> None: - messages = state.get("messages", []) - middleware_instance._check_messages(list(messages)) - - _after_agent_func.__name__ = f"{guardrail_name}_after_agent" - _after_agent = after_agent(_after_agent_func) - instances.append(_after_agent) - + instances.extend( + self._build_message_hooks( + GuardrailScope.AGENT, self._tool_stage, guardrail_name + ) + ) if GuardrailScope.LLM in self.scopes: - - async def _before_model_func( - state: AgentState[Any], runtime: Runtime - ) -> None: - messages = state.get("messages", []) - middleware_instance._check_messages(list(messages)) - - _before_model_func.__name__ = f"{guardrail_name}_before_model" - _before_model = before_model(_before_model_func) - instances.append(_before_model) - - async def _after_model_func( - state: AgentState[Any], runtime: Runtime - ) -> None: - messages = state.get("messages", []) - ai_messages = [msg for msg in messages if isinstance(msg, AIMessage)] - if ai_messages: - middleware_instance._check_messages([ai_messages[-1]]) - - _after_model_func.__name__ = f"{guardrail_name}_after_model" - _after_model = after_model(_after_model_func) - instances.append(_after_model) - + instances.extend( + self._build_message_hooks( + GuardrailScope.LLM, self._tool_stage, guardrail_name + ) + ) if GuardrailScope.TOOL in self.scopes: instances.append(self._create_tool_wrap_hook(guardrail_name)) diff --git 
a/src/uipath_langchain/guardrails/middlewares/intellectual_property.py b/src/uipath_langchain/guardrails/middlewares/intellectual_property.py index 89a137553..636e464e0 100644 --- a/src/uipath_langchain/guardrails/middlewares/intellectual_property.py +++ b/src/uipath_langchain/guardrails/middlewares/intellectual_property.py @@ -1,17 +1,10 @@ """Intellectual property detection guardrail middleware.""" import logging -from typing import Any, Sequence +from typing import Sequence from uuid import uuid4 -from langchain.agents.middleware import ( - AgentMiddleware, - AgentState, - after_agent, - after_model, -) -from langchain_core.messages import AIMessage -from langgraph.runtime import Runtime +from langchain.agents.middleware import AgentMiddleware from uipath.core.guardrails import GuardrailSelector from uipath.platform.guardrails import ( BuiltInValidatorGuardrail, @@ -19,6 +12,7 @@ GuardrailScope, ) +from ..enums import GuardrailExecutionStage from ..models import GuardrailAction from ._base import BuiltInGuardrailMiddlewareMixin @@ -80,36 +74,27 @@ def __init__( self._middleware_instances = self._create_middleware_instances() def _create_middleware_instances(self) -> list[AgentMiddleware]: - """Create middleware instances — POST only (after_agent, after_model).""" - instances = [] - middleware_instance = self + """Create middleware instances — POST only (after_agent, after_model). + + Built via the shared ``_build_message_hooks`` helper forced to ``POST``, + so IP only ever registers the ``after_*`` hooks (it validates generated + output, never input). 
+ """ + instances: list[AgentMiddleware] = [] guardrail_name = self._name.replace(" ", "_") if GuardrailScope.AGENT in self.scopes: - - async def _after_agent_func( - state: AgentState[Any], runtime: Runtime - ) -> None: - messages = state.get("messages", []) - middleware_instance._check_messages(list(messages)) - - _after_agent_func.__name__ = f"{guardrail_name}_after_agent" - _after_agent = after_agent(_after_agent_func) - instances.append(_after_agent) - + instances.extend( + self._build_message_hooks( + GuardrailScope.AGENT, GuardrailExecutionStage.POST, guardrail_name + ) + ) if GuardrailScope.LLM in self.scopes: - - async def _after_model_func( - state: AgentState[Any], runtime: Runtime - ) -> None: - messages = state.get("messages", []) - ai_messages = [msg for msg in messages if isinstance(msg, AIMessage)] - if ai_messages: - middleware_instance._check_messages([ai_messages[-1]]) - - _after_model_func.__name__ = f"{guardrail_name}_after_model" - _after_model = after_model(_after_model_func) - instances.append(_after_model) + instances.extend( + self._build_message_hooks( + GuardrailScope.LLM, GuardrailExecutionStage.POST, guardrail_name + ) + ) return instances diff --git a/src/uipath_langchain/guardrails/middlewares/pii_detection.py b/src/uipath_langchain/guardrails/middlewares/pii_detection.py index 960da83ac..c52074c7c 100644 --- a/src/uipath_langchain/guardrails/middlewares/pii_detection.py +++ b/src/uipath_langchain/guardrails/middlewares/pii_detection.py @@ -4,17 +4,8 @@ from typing import Any, Sequence from uuid import uuid4 -from langchain.agents.middleware import ( - AgentMiddleware, - AgentState, - after_agent, - after_model, - before_agent, - before_model, -) -from langchain_core.messages import AIMessage +from langchain.agents.middleware import AgentMiddleware from langchain_core.tools import BaseTool -from langgraph.runtime import Runtime from uipath.core.guardrails import GuardrailSelector from uipath.platform.guardrails import ( 
BuiltInValidatorGuardrail, @@ -87,6 +78,11 @@ def analyze_joke_syntax(joke: str) -> str: to apply guardrail to. Must contain at least one tool. Can be a mix of strings (tool names) or BaseTool objects. If TOOL scope is not specified, this parameter is ignored. + stage: Optional execution stage controlling when the guardrail runs. + ``PRE`` evaluates before the target executes (registers only the + ``before_*`` hook), ``POST`` evaluates after (only the ``after_*`` + hook), and ``PRE_AND_POST`` evaluates both. Applies to all scopes + (Agent, LLM, Tool). Defaults to ``GuardrailExecutionStage.PRE_AND_POST``. name: Optional name for the guardrail (defaults to "PII Detection") description: Optional description for the guardrail enabled_for_evals: Whether this guardrail is enabled for evaluation scenarios. @@ -153,57 +149,28 @@ def __init__( self._middleware_instances = self._create_middleware_instances() def _create_middleware_instances(self) -> list[AgentMiddleware]: - """Create middleware instances from decorated functions.""" - instances = [] - middleware_instance = self + """Create middleware instances from decorated functions. + + AGENT/LLM hooks are built by the shared, stage-gated + ``_build_message_hooks`` helper (``PRE`` → ``before_*`` only, ``POST`` → + ``after_*`` only, ``PRE_AND_POST`` → both), so a guardrail validates (and + acts, e.g. escalates) at a single checkpoint instead of twice per run. 
+ """ + instances: list[AgentMiddleware] = [] guardrail_name = self._name.replace(" ", "_") if GuardrailScope.AGENT in self.scopes: - - async def _before_agent_func( - state: AgentState[Any], runtime: Runtime - ) -> None: - messages = state.get("messages", []) - middleware_instance._check_messages(list(messages)) - - _before_agent_func.__name__ = f"{guardrail_name}_before_agent" - _before_agent = before_agent(_before_agent_func) - instances.append(_before_agent) - - async def _after_agent_func( - state: AgentState[Any], runtime: Runtime - ) -> None: - messages = state.get("messages", []) - middleware_instance._check_messages(list(messages)) - - _after_agent_func.__name__ = f"{guardrail_name}_after_agent" - _after_agent = after_agent(_after_agent_func) - instances.append(_after_agent) - + instances.extend( + self._build_message_hooks( + GuardrailScope.AGENT, self._tool_stage, guardrail_name + ) + ) if GuardrailScope.LLM in self.scopes: - - async def _before_model_func( - state: AgentState[Any], runtime: Runtime - ) -> None: - messages = state.get("messages", []) - middleware_instance._check_messages(list(messages)) - - _before_model_func.__name__ = f"{guardrail_name}_before_model" - _before_model = before_model(_before_model_func) - instances.append(_before_model) - - async def _after_model_func( - state: AgentState[Any], runtime: Runtime - ) -> None: - messages = state.get("messages", []) - ai_messages = [msg for msg in messages if isinstance(msg, AIMessage)] - if ai_messages: - middleware_instance._check_messages([ai_messages[-1]]) - - _after_model_func.__name__ = f"{guardrail_name}_after_model" - _after_model = after_model(_after_model_func) - instances.append(_after_model) - + instances.extend( + self._build_message_hooks( + GuardrailScope.LLM, self._tool_stage, guardrail_name + ) + ) if GuardrailScope.TOOL in self.scopes: instances.append(self._create_tool_wrap_hook(guardrail_name)) diff --git a/src/uipath_langchain/guardrails/middlewares/prompt_injection.py 
b/src/uipath_langchain/guardrails/middlewares/prompt_injection.py index 843e7fba4..440ebece9 100644 --- a/src/uipath_langchain/guardrails/middlewares/prompt_injection.py +++ b/src/uipath_langchain/guardrails/middlewares/prompt_injection.py @@ -1,15 +1,15 @@ """Prompt injection detection guardrail middleware.""" import logging -from typing import Any, Sequence +from typing import Sequence from uuid import uuid4 -from langchain.agents.middleware import AgentMiddleware, AgentState, before_model -from langgraph.runtime import Runtime +from langchain.agents.middleware import AgentMiddleware from uipath.core.guardrails import GuardrailSelector from uipath.platform.guardrails import BuiltInValidatorGuardrail, GuardrailScope from uipath.platform.guardrails.guardrails import NumberParameterValue +from ..enums import GuardrailExecutionStage from ..models import GuardrailAction from ._base import BuiltInGuardrailMiddlewareMixin @@ -85,20 +85,11 @@ def __init__( self._middleware_instances = self._create_middleware_instances() def _create_middleware_instances(self) -> list[AgentMiddleware]: - """Create middleware instances from decorated functions.""" - instances = [] - middleware_instance = self + """Create middleware instances — LLM scope, PRE only (before_model).""" guardrail_name = self._name.replace(" ", "_") - - async def _before_model_func(state: AgentState[Any], runtime: Runtime) -> None: - messages = state.get("messages", []) - middleware_instance._check_messages(list(messages)) - - _before_model_func.__name__ = f"{guardrail_name}_before_model" - _before_model = before_model(_before_model_func) - instances.append(_before_model) - - return instances + return self._build_message_hooks( + GuardrailScope.LLM, GuardrailExecutionStage.PRE, guardrail_name + ) def __iter__(self): """Make the class iterable to return middleware instances.""" diff --git a/src/uipath_langchain/guardrails/middlewares/user_prompt_attacks.py 
b/src/uipath_langchain/guardrails/middlewares/user_prompt_attacks.py index 2fccf4539..ee32e0f8e 100644 --- a/src/uipath_langchain/guardrails/middlewares/user_prompt_attacks.py +++ b/src/uipath_langchain/guardrails/middlewares/user_prompt_attacks.py @@ -1,14 +1,14 @@ """User prompt attacks detection guardrail middleware.""" import logging -from typing import Any, Sequence +from typing import Sequence from uuid import uuid4 -from langchain.agents.middleware import AgentMiddleware, AgentState, before_model -from langgraph.runtime import Runtime +from langchain.agents.middleware import AgentMiddleware from uipath.core.guardrails import GuardrailSelector from uipath.platform.guardrails import BuiltInValidatorGuardrail, GuardrailScope +from ..enums import GuardrailExecutionStage from ..models import GuardrailAction from ._base import BuiltInGuardrailMiddlewareMixin @@ -62,20 +62,11 @@ def __init__( self._middleware_instances = self._create_middleware_instances() def _create_middleware_instances(self) -> list[AgentMiddleware]: - """Create middleware instances — PRE only (before_model).""" - instances = [] - middleware_instance = self + """Create middleware instances — LLM scope, PRE only (before_model).""" guardrail_name = self._name.replace(" ", "_") - - async def _before_model_func(state: AgentState[Any], runtime: Runtime) -> None: - messages = state.get("messages", []) - middleware_instance._check_messages(list(messages)) - - _before_model_func.__name__ = f"{guardrail_name}_before_model" - _before_model = before_model(_before_model_func) - instances.append(_before_model) - - return instances + return self._build_message_hooks( + GuardrailScope.LLM, GuardrailExecutionStage.PRE, guardrail_name + ) def __iter__(self): """Make the class iterable to return middleware instances.""" diff --git a/tests/cli/test_guardrails_in_langgraph.py b/tests/cli/test_guardrails_in_langgraph.py index acbc798b1..48980c25b 100644 --- a/tests/cli/test_guardrails_in_langgraph.py +++ 
b/tests/cli/test_guardrails_in_langgraph.py @@ -31,11 +31,16 @@ from unittest.mock import patch import pytest -from langchain_core.messages import AIMessage +from langchain.agents import create_agent +from langchain_core.messages import AIMessage, HumanMessage +from langgraph.checkpoint.memory import MemorySaver +from langgraph.types import Command from uipath.core.guardrails import ( + GuardrailScope, GuardrailValidationResult, GuardrailValidationResultType, ) +from uipath.platform.common import CreateEscalation from uipath.runtime import ( UiPathExecuteOptions, UiPathRuntimeContext, @@ -44,6 +49,14 @@ from uipath.runtime.errors import UiPathErrorCategory from uipath_langchain.agent.exceptions import AgentRuntimeError +from uipath_langchain.chat.openai import UiPathChatOpenAI +from uipath_langchain.guardrails import ( + EscalateAction, + GuardrailExecutionStage, + PIIDetectionEntity, + PIIDetectionEntityType, + UiPathPIIDetectionMiddleware, +) from uipath_langchain.runtime import register_runtime_factory @@ -545,3 +558,155 @@ def mock_evaluate(text, guardrail): assert output["joke"], f"[{flavor}] joke is empty" await runtime.dispose() await factory.dispose() + + +# --------------------------------------------------------------------------- +# Middleware escalation (HITL) — interrupt → resume +# --------------------------------------------------------------------------- + + +_FAIL_REASON = "PII detected: Email" + + +def _fail_on_email(text, guardrail): + """Fail the escalation PII guardrail when the input contains an email.""" + if guardrail.name == "PII escalation guardrail" and "@" in str(text): + return GuardrailValidationResult( + result=GuardrailValidationResultType.VALIDATION_FAILED, reason=_FAIL_REASON + ) + return _GUARDRAIL_PASSED + + +async def _final_llm(messages, *args, **kwargs): + """Mock LLM that returns a final answer (reached only after Approve resume).""" + return AIMessage(content="final answer") + + +def _interrupt_value(result: Any, agent: Any, 
config: dict[str, Any]) -> Any: + """Extract the value passed to interrupt() from an invoke result/state.""" + interrupts = result.get("__interrupt__") if isinstance(result, dict) else None + if interrupts: + return interrupts[0].value + state = agent.get_state(config) + if state.interrupts: + return state.interrupts[0].value + return None + + +class TestMiddlewareEscalation: + """The middleware EscalateAction suspends via interrupt() and resumes correctly. + + Middleware flavor only. The decorator/@guardrail flavor is a follow-up: it does + not yet publish the guardrail action context, so Component/ExecutionStage parity + needs decorator-side work before a 2-flavor escalation parity test is meaningful. + """ + + @pytest.fixture(autouse=True) + def _setup_env(self, mock_env_vars: dict[str, str]): + os.environ.clear() + os.environ.update(mock_env_vars) + + def _build_agent(self) -> Any: + llm = UiPathChatOpenAI(model="gpt-4o-2024-11-20") # type: ignore[call-arg] + return create_agent( + model=llm, + tools=[], + middleware=[ + *UiPathPIIDetectionMiddleware( + name="PII escalation guardrail", + scopes=[GuardrailScope.AGENT], + stage=GuardrailExecutionStage.PRE, + action=EscalateAction(app_name="EscApp", app_folder_path="Shared"), + entities=[PIIDetectionEntity(PIIDetectionEntityType.EMAIL, 0.5)], + ), + ], + checkpointer=MemorySaver(), + ) + + @pytest.mark.asyncio + async def test_escalation_suspends_with_context_derived_payload(self) -> None: + agent = self._build_agent() + config = {"configurable": {"thread_id": "esc-suspend"}} + with ( + patch( + "uipath_langchain.chat.openai.UiPathChatOpenAI.ainvoke", + side_effect=_final_llm, + ), + patch( + "uipath.platform.guardrails.GuardrailsService.evaluate_guardrail", + side_effect=_fail_on_email, + ), + ): + result = await agent.ainvoke( + {"messages": [HumanMessage(content="joke about a@b.com")]}, config + ) + + cre = _interrupt_value(result, agent, config) + assert isinstance(cre, CreateEscalation) + assert cre.app_name 
== "EscApp" + assert cre.app_folder_path == "Shared" + assert cre.data is not None + # Component + ExecutionStage derived from the runtime guardrail context + assert cre.data["Component"] == "Agent" + assert cre.data["ExecutionStage"] == "PreExecution" + # Flagged payload is JSON-encoded (so the action app can parse it) + assert cre.data["Inputs"] == json.dumps("joke about a@b.com") + assert cre.data["GuardrailName"] == "PII escalation guardrail" + + @pytest.mark.asyncio + async def test_escalation_approve_applies_reviewed_input(self) -> None: + agent = self._build_agent() + config = {"configurable": {"thread_id": "esc-approve"}} + with ( + patch( + "uipath_langchain.chat.openai.UiPathChatOpenAI.ainvoke", + side_effect=_final_llm, + ), + patch( + "uipath.platform.guardrails.GuardrailsService.evaluate_guardrail", + side_effect=_fail_on_email, + ), + ): + await agent.ainvoke( + {"messages": [HumanMessage(content="joke about a@b.com")]}, config + ) + final = await agent.ainvoke( + Command( + resume={ + "action": "Approve", + "data": {"ReviewedInputs": "clean topic"}, + } + ), + config, + ) + + # Run completed (no second escalation — stage=PRE) and the reviewed input + # was substituted into the message the agent ran on. 
+ assert "__interrupt__" not in final + assert final["messages"][0].content == "clean topic" + + @pytest.mark.asyncio + async def test_escalation_reject_terminates_run(self) -> None: + agent = self._build_agent() + config = {"configurable": {"thread_id": "esc-reject"}} + with ( + patch( + "uipath_langchain.chat.openai.UiPathChatOpenAI.ainvoke", + side_effect=_final_llm, + ), + patch( + "uipath.platform.guardrails.GuardrailsService.evaluate_guardrail", + side_effect=_fail_on_email, + ), + ): + await agent.ainvoke( + {"messages": [HumanMessage(content="joke about a@b.com")]}, config + ) + with pytest.raises(AgentRuntimeError) as exc_info: + await agent.ainvoke( + Command( + resume={"action": "Reject", "data": {"Reason": "contains PII"}} + ), + config, + ) + assert "contains PII" in str(exc_info.value) diff --git a/tests/guardrails/middlewares/test_action_context_publishing.py b/tests/guardrails/middlewares/test_action_context_publishing.py new file mode 100644 index 000000000..2573f54ee --- /dev/null +++ b/tests/guardrails/middlewares/test_action_context_publishing.py @@ -0,0 +1,238 @@ +"""Tests that the middleware mixin publishes guardrail context to the action +and re-raises LangGraph control-flow signals (so interrupt() is not swallowed). 
+""" + +import json +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from langchain_core.messages import AIMessage, HumanMessage, ToolMessage +from langgraph.errors import GraphInterrupt +from langgraph.prebuilt.tool_node import ToolCallRequest +from uipath.core.guardrails import ( + GuardrailScope, + GuardrailValidationResult, + GuardrailValidationResultType, +) + +from uipath_langchain.guardrails import GuardrailAction +from uipath_langchain.guardrails._action_context import ( + GuardrailActionContext, + current_action_context, +) +from uipath_langchain.guardrails.enums import ( + GuardrailExecutionStage, + PIIDetectionEntityType, +) +from uipath_langchain.guardrails.middlewares import UiPathPIIDetectionMiddleware +from uipath_langchain.guardrails.models import PIIDetectionEntity + +_FAILED = GuardrailValidationResult( + result=GuardrailValidationResultType.VALIDATION_FAILED, reason="violation" +) +_PASSED = GuardrailValidationResult( + result=GuardrailValidationResultType.PASSED, reason="" +) + + +class _RecordingAction(GuardrailAction): + """Captures the published context when invoked; optionally raises.""" + + def __init__(self, raise_exc: BaseException | None = None) -> None: + self.seen: GuardrailActionContext | None = None + self.called = False + self._raise = raise_exc + + def handle_validation_result( + self, result: Any, data: Any, guardrail_name: str + ) -> Any: + self.called = True + self.seen = current_action_context() + if self._raise is not None: + raise self._raise + return None + + +def _message_mw( + action: GuardrailAction, scope: GuardrailScope = GuardrailScope.AGENT +) -> UiPathPIIDetectionMiddleware: + return UiPathPIIDetectionMiddleware( + scopes=[scope], + action=action, + entities=[PIIDetectionEntity(PIIDetectionEntityType.EMAIL)], + ) + + +def _tool_mw( + action: GuardrailAction, + stage: GuardrailExecutionStage = GuardrailExecutionStage.PRE, +) -> UiPathPIIDetectionMiddleware: + return 
UiPathPIIDetectionMiddleware( + scopes=[GuardrailScope.TOOL], + action=action, + entities=[PIIDetectionEntity(PIIDetectionEntityType.EMAIL)], + tools=["my_tool"], + stage=stage, + ) + + +def _request() -> ToolCallRequest: + tool_call: Any = {"id": "tc1", "name": "my_tool", "args": {"text": "a@b.com"}} + return ToolCallRequest( + tool_call=tool_call, tool=MagicMock(), state={}, runtime=MagicMock() + ) + + +# --------------------------------------------------------------------------- +# Context publishing — message scopes (_check_messages) +# --------------------------------------------------------------------------- + + +class TestMessageContextPublishing: + def test_agent_scope_publishes_agent_pre_context(self) -> None: + action = _RecordingAction() + mw = _message_mw(action, scope=GuardrailScope.AGENT) + with patch.object(mw, "_evaluate_guardrail", return_value=_FAILED): + mw._check_messages( + [HumanMessage(content="hi a@b.com")], + scope=GuardrailScope.AGENT, + stage=GuardrailExecutionStage.PRE, + ) + assert action.seen is not None + assert action.seen.scope == GuardrailScope.AGENT + assert action.seen.execution_stage == GuardrailExecutionStage.PRE + assert action.seen.component == "Agent" + + def test_llm_scope_publishes_llm_call_component(self) -> None: + action = _RecordingAction() + mw = _message_mw(action, scope=GuardrailScope.LLM) + with patch.object(mw, "_evaluate_guardrail", return_value=_FAILED): + mw._check_messages( + [HumanMessage(content="hi a@b.com")], + scope=GuardrailScope.LLM, + stage=GuardrailExecutionStage.POST, + ) + assert action.seen is not None + assert action.seen.component == "LLM call" + assert action.seen.execution_stage == GuardrailExecutionStage.POST + + def test_context_reset_after_call(self) -> None: + action = _RecordingAction() + mw = _message_mw(action) + with patch.object(mw, "_evaluate_guardrail", return_value=_FAILED): + mw._check_messages( + [HumanMessage(content="hi a@b.com")], + scope=GuardrailScope.AGENT, + 
stage=GuardrailExecutionStage.PRE, + ) + assert current_action_context() is None + + def test_passed_result_does_not_invoke_action(self) -> None: + action = _RecordingAction() + mw = _message_mw(action) + with patch.object(mw, "_evaluate_guardrail", return_value=_PASSED): + mw._check_messages( + [HumanMessage(content="clean")], + scope=GuardrailScope.AGENT, + stage=GuardrailExecutionStage.PRE, + ) + assert action.called is False + + def test_guardrail_description_is_published(self) -> None: + action = _RecordingAction() + mw = _message_mw(action, scope=GuardrailScope.AGENT) + with patch.object(mw, "_evaluate_guardrail", return_value=_FAILED): + mw._check_messages( + [HumanMessage(content="hi a@b.com")], + scope=GuardrailScope.AGENT, + stage=GuardrailExecutionStage.PRE, + ) + assert action.seen is not None + assert action.seen.description == mw._guardrail.description + + def test_post_publishes_input_text_as_input_payload(self) -> None: + action = _RecordingAction() + mw = _message_mw(action, scope=GuardrailScope.AGENT) + with patch.object(mw, "_evaluate_guardrail", return_value=_FAILED): + mw._check_messages( + [AIMessage(content="output a@b.com")], + scope=GuardrailScope.AGENT, + stage=GuardrailExecutionStage.POST, + input_text="the original input", + ) + assert action.seen is not None + assert action.seen.input_payload == json.dumps("the original input") + + +# --------------------------------------------------------------------------- +# Context publishing — tool scope (_run_tool_guardrail) +# --------------------------------------------------------------------------- + + +class TestToolContextPublishing: + @pytest.mark.asyncio + async def test_pre_publishes_tool_pre_context_with_tool_name(self) -> None: + action = _RecordingAction() + mw = _tool_mw(action, stage=GuardrailExecutionStage.PRE) + handler = AsyncMock(return_value=ToolMessage(content="{}", tool_call_id="tc1")) + with patch.object(mw, "_evaluate_guardrail", return_value=_FAILED): + await 
mw._run_tool_guardrail(_request(), handler) + assert action.seen is not None + assert action.seen.scope == GuardrailScope.TOOL + assert action.seen.execution_stage == GuardrailExecutionStage.PRE + assert action.seen.component == "my_tool" + assert action.seen.input_payload is None # no separate input at PRE + + @pytest.mark.asyncio + async def test_post_publishes_tool_post_context(self) -> None: + action = _RecordingAction() + mw = _tool_mw(action, stage=GuardrailExecutionStage.POST) + handler = AsyncMock( + return_value=ToolMessage(content='{"x": 1}', tool_call_id="tc1") + ) + with patch.object(mw, "_evaluate_guardrail", return_value=_FAILED): + await mw._run_tool_guardrail(_request(), handler) + assert action.seen is not None + assert action.seen.scope == GuardrailScope.TOOL + assert action.seen.execution_stage == GuardrailExecutionStage.POST + assert action.seen.component == "my_tool" + assert action.seen.input_payload == json.dumps({"text": "a@b.com"}) + + +# --------------------------------------------------------------------------- +# Bubble-up: interrupt() control-flow signal must NOT be swallowed +# --------------------------------------------------------------------------- + + +class TestGraphBubbleUpReraised: + def test_check_messages_reraises_graph_interrupt(self) -> None: + action = _RecordingAction(raise_exc=GraphInterrupt(())) + mw = _message_mw(action) + with patch.object(mw, "_evaluate_guardrail", return_value=_FAILED): + with pytest.raises(GraphInterrupt): + mw._check_messages( + [HumanMessage(content="a@b.com")], + scope=GuardrailScope.AGENT, + stage=GuardrailExecutionStage.PRE, + ) + + def test_check_messages_still_swallows_generic_exception(self) -> None: + action = _RecordingAction(raise_exc=RuntimeError("boom")) + mw = _message_mw(action) + with patch.object(mw, "_evaluate_guardrail", return_value=_FAILED): + # Generic errors stay swallowed/logged (no raise) — regression guard. 
+ mw._check_messages( + [HumanMessage(content="a@b.com")], + scope=GuardrailScope.AGENT, + stage=GuardrailExecutionStage.PRE, + ) + + @pytest.mark.asyncio + async def test_run_tool_guardrail_reraises_graph_interrupt(self) -> None: + action = _RecordingAction(raise_exc=GraphInterrupt(())) + mw = _tool_mw(action, stage=GuardrailExecutionStage.PRE) + handler = AsyncMock(return_value=ToolMessage(content="{}", tool_call_id="tc1")) + with patch.object(mw, "_evaluate_guardrail", return_value=_FAILED): + with pytest.raises(GraphInterrupt): + await mw._run_tool_guardrail(_request(), handler) diff --git a/tests/guardrails/middlewares/test_hook_wiring.py b/tests/guardrails/middlewares/test_hook_wiring.py index 6214dfba4..84142362e 100644 --- a/tests/guardrails/middlewares/test_hook_wiring.py +++ b/tests/guardrails/middlewares/test_hook_wiring.py @@ -23,11 +23,15 @@ LogAction, ) -from uipath_langchain.guardrails.enums import PIIDetectionEntityType +from uipath_langchain.guardrails.enums import ( + GuardrailExecutionStage, + PIIDetectionEntityType, +) from uipath_langchain.guardrails.middlewares import ( UiPathHarmfulContentMiddleware, UiPathIntellectualPropertyMiddleware, UiPathPIIDetectionMiddleware, + UiPathPromptInjectionMiddleware, UiPathUserPromptAttacksMiddleware, ) from uipath_langchain.guardrails.models import PIIDetectionEntity @@ -113,6 +117,25 @@ def test_llm_scope_registers_only_before_model(self) -> None: ) +class TestPromptInjectionHookWiring: + """UiPathPromptInjectionMiddleware registers only PRE (before_*) hooks.""" + + def test_llm_scope_registers_only_before_model(self) -> None: + """LLM scope produces a single before_model hook, no after_* hooks.""" + middleware = UiPathPromptInjectionMiddleware( + scopes=[GuardrailScope.LLM], + action=_BLOCK, + ) + names = _hook_names(middleware) + assert len(names) == 1 + assert all("before" in n for n in names), ( + f"Expected only before_* hooks, got: {names}" + ) + assert not any("after" in n for n in names), ( + f"No 
after_* hooks expected, got: {names}" + ) + + class TestPIIDetectionHookWiringToolScope: """UiPathPIIDetectionMiddleware TOOL scope registers exactly one wrap_tool_call hook.""" @@ -142,6 +165,59 @@ def test_tool_scope_no_before_or_after_hooks(self) -> None: ) +class TestPIIDetectionHookWiringStageGating: + """PII middleware honors ``stage`` for AGENT/LLM scopes (not just TOOL). + + PRE registers only ``before_*``, POST only ``after_*``, PRE_AND_POST both. + """ + + def _pii(self, scope: GuardrailScope, stage: GuardrailExecutionStage): + return UiPathPIIDetectionMiddleware( + scopes=[scope], + action=_LOG, + entities=[PIIDetectionEntity(PIIDetectionEntityType.EMAIL)], + stage=stage, + ) + + def test_agent_pre_registers_only_before_agent(self) -> None: + names = _hook_names( + self._pii(GuardrailScope.AGENT, GuardrailExecutionStage.PRE) + ) + assert names == ["PII_Detection_before_agent"] + + def test_agent_post_registers_only_after_agent(self) -> None: + names = _hook_names( + self._pii(GuardrailScope.AGENT, GuardrailExecutionStage.POST) + ) + assert names == ["PII_Detection_after_agent"] + + def test_agent_pre_and_post_registers_both(self) -> None: + names = _hook_names( + self._pii(GuardrailScope.AGENT, GuardrailExecutionStage.PRE_AND_POST) + ) + assert sorted(names) == [ + "PII_Detection_after_agent", + "PII_Detection_before_agent", + ] + + def test_llm_pre_registers_only_before_model(self) -> None: + names = _hook_names(self._pii(GuardrailScope.LLM, GuardrailExecutionStage.PRE)) + assert names == ["PII_Detection_before_model"] + + def test_llm_post_registers_only_after_model(self) -> None: + names = _hook_names(self._pii(GuardrailScope.LLM, GuardrailExecutionStage.POST)) + assert names == ["PII_Detection_after_model"] + + def test_llm_pre_and_post_registers_both(self) -> None: + names = _hook_names( + self._pii(GuardrailScope.LLM, GuardrailExecutionStage.PRE_AND_POST) + ) + assert sorted(names) == [ + "PII_Detection_after_model", + "PII_Detection_before_model", 
+ ] + + class TestHarmfulContentHookWiringToolScope: """UiPathHarmfulContentMiddleware TOOL scope registers exactly one wrap_tool_call hook.""" @@ -222,3 +298,55 @@ def test_all_scopes_register_four_hooks( assert sum(1 for n in names if "after" in n) == 2, ( f"Expected 2 after_* hooks: {names}" ) + + +class TestHarmfulContentHookWiringStageGating: + """Harmful content honors ``stage`` for AGENT/LLM scopes (mirrors PII). + + PRE registers only ``before_*``, POST only ``after_*``, PRE_AND_POST both — + so ``stage=PRE`` no longer also wires ``after_*`` (which would escalate twice). + """ + + def _hc(self, scope: GuardrailScope, stage: GuardrailExecutionStage): + return UiPathHarmfulContentMiddleware( + scopes=[scope], + action=_LOG, + entities=[HarmfulContentEntity("Hate")], + stage=stage, + ) + + def test_agent_pre_registers_only_before_agent(self) -> None: + names = _hook_names(self._hc(GuardrailScope.AGENT, GuardrailExecutionStage.PRE)) + assert names == ["Harmful_Content_Detection_before_agent"] + + def test_agent_post_registers_only_after_agent(self) -> None: + names = _hook_names( + self._hc(GuardrailScope.AGENT, GuardrailExecutionStage.POST) + ) + assert names == ["Harmful_Content_Detection_after_agent"] + + def test_agent_pre_and_post_registers_both(self) -> None: + names = _hook_names( + self._hc(GuardrailScope.AGENT, GuardrailExecutionStage.PRE_AND_POST) + ) + assert sorted(names) == [ + "Harmful_Content_Detection_after_agent", + "Harmful_Content_Detection_before_agent", + ] + + def test_llm_pre_registers_only_before_model(self) -> None: + names = _hook_names(self._hc(GuardrailScope.LLM, GuardrailExecutionStage.PRE)) + assert names == ["Harmful_Content_Detection_before_model"] + + def test_llm_post_registers_only_after_model(self) -> None: + names = _hook_names(self._hc(GuardrailScope.LLM, GuardrailExecutionStage.POST)) + assert names == ["Harmful_Content_Detection_after_model"] + + def test_llm_pre_and_post_registers_both(self) -> None: + names = 
_hook_names( + self._hc(GuardrailScope.LLM, GuardrailExecutionStage.PRE_AND_POST) + ) + assert sorted(names) == [ + "Harmful_Content_Detection_after_model", + "Harmful_Content_Detection_before_model", + ] diff --git a/tests/guardrails/test_action_context.py b/tests/guardrails/test_action_context.py new file mode 100644 index 000000000..fa198cb6c --- /dev/null +++ b/tests/guardrails/test_action_context.py @@ -0,0 +1,44 @@ +"""Unit tests for the guardrail action runtime context.""" + +from uipath.core.guardrails import GuardrailScope + +from uipath_langchain.guardrails._action_context import ( + GuardrailActionContext, + _action_context, + component_label, + current_action_context, +) +from uipath_langchain.guardrails.enums import GuardrailExecutionStage + + +class TestComponentLabel: + def test_agent(self) -> None: + assert component_label(GuardrailScope.AGENT) == "Agent" + + def test_llm(self) -> None: + assert component_label(GuardrailScope.LLM) == "LLM call" + + def test_tool_returns_none(self) -> None: + # TOOL has no static label — the tool name is supplied separately. + assert component_label(GuardrailScope.TOOL) is None + + def test_none(self) -> None: + assert component_label(None) is None + + +class TestContextVar: + def test_default_is_none(self) -> None: + assert current_action_context() is None + + def test_set_get_reset_round_trip(self) -> None: + ctx = GuardrailActionContext( + scope=GuardrailScope.AGENT, + execution_stage=GuardrailExecutionStage.PRE, + component="Agent", + ) + token = _action_context.set(ctx) + try: + assert current_action_context() is ctx + finally: + _action_context.reset(token) + assert current_action_context() is None diff --git a/tests/guardrails/test_escalate_action.py b/tests/guardrails/test_escalate_action.py new file mode 100644 index 000000000..260b8744a --- /dev/null +++ b/tests/guardrails/test_escalate_action.py @@ -0,0 +1,512 @@ +"""Unit tests for the middleware ``EscalateAction``. 
+ +Drives ``EscalateAction.handle_validation_result(result, data, name)`` directly +(the guardrail-middleware action contract), patching: +- ``escalate_action.interrupt`` to simulate the resume value (Approve/Reject), and +- ``escalate_action.UiPathConfig`` to control TenantName/AgentTrace derivation. + +The guardrail runtime context (scope/stage/component) that the middleware would +publish is set explicitly via ``_action_context`` where a case needs it. +""" + +import json +from contextlib import contextmanager +from types import SimpleNamespace +from typing import Any, Iterator +from unittest.mock import patch + +import pytest +from uipath.core.guardrails import ( + GuardrailScope, + GuardrailValidationResult, + GuardrailValidationResultType, +) +from uipath.platform.action_center.tasks import TaskRecipient, TaskRecipientType +from uipath.platform.common import CreateEscalation +from uipath.platform.guardrails.decorators._exceptions import GuardrailBlockException + +import uipath_langchain.guardrails.escalate_action as escalate_module +from uipath_langchain.guardrails import EscalateAction, GuardrailExecutionStage +from uipath_langchain.guardrails._action_context import ( + GuardrailActionContext, + _action_context, +) +from uipath_langchain.guardrails.escalate_action import ( + _coerce_reviewed, + _execution_stage_label, + _normalize_escalation_result, + _resolve_tenant_name, +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _failed(reason: str = "PII detected: Email") -> GuardrailValidationResult: + return GuardrailValidationResult( + result=GuardrailValidationResultType.VALIDATION_FAILED, reason=reason + ) + + +def _passed() -> GuardrailValidationResult: + return GuardrailValidationResult( + result=GuardrailValidationResultType.PASSED, reason="" + ) + + +def _action(**kwargs: Any) -> EscalateAction: + 
kwargs.setdefault("app_name", "Guardrail.Escalation.Action.App.2") + kwargs.setdefault("app_folder_path", "Shared") + return EscalateAction(**kwargs) + + +@contextmanager +def _published_context( + *, + scope: GuardrailScope | None = None, + stage: GuardrailExecutionStage | None = None, + component: str | None = None, + description: str | None = None, + input_payload: str | None = None, +) -> Iterator[None]: + """Publish a guardrail action context for the duration of the block.""" + token = _action_context.set( + GuardrailActionContext( + scope=scope, + execution_stage=stage, + component=component, + description=description, + input_payload=input_payload, + ) + ) + try: + yield + finally: + _action_context.reset(token) + + +@contextmanager +def _patched(resume: Any, config: SimpleNamespace | None = None) -> Iterator[Any]: + """Patch interrupt (to return ``resume``) and UiPathConfig; yields the interrupt mock.""" + cfg = ( + config + if config is not None + else SimpleNamespace(tenant_name=None, base_url=None) + ) + with ( + patch.object( + escalate_module, "interrupt", return_value=resume + ) as mock_interrupt, + patch.object(escalate_module, "UiPathConfig", cfg), + ): + yield mock_interrupt + + +# --------------------------------------------------------------------------- +# Triggering: PASSED vs VALIDATION_FAILED +# --------------------------------------------------------------------------- + + +class TestTriggering: + def test_passed_returns_none_without_interrupt(self) -> None: + with _patched(resume={"action": "Approve", "data": {}}) as mock_interrupt: + result = _action().handle_validation_result(_passed(), "data", "g") + assert result is None + mock_interrupt.assert_not_called() + + def test_failed_calls_interrupt_with_create_escalation(self) -> None: + with _patched(resume={"action": "Approve", "data": {}}) as mock_interrupt: + _action(assignee="user@x.com").handle_validation_result( + _failed("PII detected: Email"), "the topic", "PII guard" + ) + 
mock_interrupt.assert_called_once() + payload = mock_interrupt.call_args[0][0] + assert isinstance(payload, CreateEscalation) + assert payload.app_name == "Guardrail.Escalation.Action.App.2" + assert payload.app_folder_path == "Shared" + assert payload.assignee == "user@x.com" + assert payload.title == "Guardrail 'PII guard': review required" + assert payload.data is not None + assert payload.data["GuardrailName"] == "PII guard" + assert payload.data["GuardrailDescription"] == "" + assert payload.data["GuardrailResult"] == "PII detected: Email" + + +# --------------------------------------------------------------------------- +# Payload: JSON-encoding of the flagged content +# --------------------------------------------------------------------------- + + +class TestPayloadEncoding: + def test_string_payload_is_json_encoded(self) -> None: + with _patched(resume={"action": "Approve", "data": {}}) as mock_interrupt: + _action().handle_validation_result(_failed(), "topic with email", "g") + data = mock_interrupt.call_args[0][0].data + assert data["Inputs"] == json.dumps("topic with email") + assert data["Outputs"] == "" + assert data["ToolInputs"] == data["Inputs"] + assert data["ToolOutputs"] == data["Outputs"] + + def test_dict_payload_is_json_object(self) -> None: + with _patched(resume={"action": "Approve", "data": {}}) as mock_interrupt: + _action().handle_validation_result(_failed(), {"email": "a@b.com"}, "g") + data = mock_interrupt.call_args[0][0].data + assert json.loads(data["Inputs"]) == {"email": "a@b.com"} + assert data["ToolInputs"] == data["Inputs"] + + +# --------------------------------------------------------------------------- +# Payload: Component / ExecutionStage from runtime context +# --------------------------------------------------------------------------- + + +class TestContextDerivedFields: + def test_agent_pre_context(self) -> None: + with _patched(resume={"action": "Approve", "data": {}}) as mock_interrupt: + with _published_context( + 
scope=GuardrailScope.AGENT, + stage=GuardrailExecutionStage.PRE, + component="Agent", + ): + _action().handle_validation_result(_failed(), "x", "g") + data = mock_interrupt.call_args[0][0].data + assert data["Component"] == "Agent" + assert data["Tool"] == "Agent" + assert data["ExecutionStage"] == "PreExecution" + + def test_tool_post_context(self) -> None: + with _patched(resume={"action": "Approve", "data": {}}) as mock_interrupt: + with _published_context( + scope=GuardrailScope.TOOL, + stage=GuardrailExecutionStage.POST, + component="my_tool", + ): + _action().handle_validation_result(_failed(), {"a": 1}, "g") + data = mock_interrupt.call_args[0][0].data + assert data["Component"] == "my_tool" + assert data["Tool"] == "my_tool" + assert data["ExecutionStage"] == "PostExecution" + + def test_no_context_omits_component_and_stage(self) -> None: + with _patched(resume={"action": "Approve", "data": {}}) as mock_interrupt: + _action().handle_validation_result(_failed(), "x", "g") + data = mock_interrupt.call_args[0][0].data + assert "Component" not in data + assert "Tool" not in data + assert "ExecutionStage" not in data + + +# --------------------------------------------------------------------------- +# Payload: stage-aware Inputs / Outputs (input vs output escalation) +# --------------------------------------------------------------------------- + + +class TestStageAwarePayload: + def test_pre_maps_content_to_inputs_only(self) -> None: + with _patched(resume={"action": "Approve", "data": {}}) as mi: + with _published_context( + scope=GuardrailScope.TOOL, + stage=GuardrailExecutionStage.PRE, + component="my_tool", + ): + _action().handle_validation_result(_failed(), {"a": 1}, "g") + data = mi.call_args[0][0].data + assert json.loads(data["Inputs"]) == {"a": 1} + assert data["Outputs"] == "" # no output at PRE + assert data["ExecutionStage"] == "PreExecution" + assert data["ToolInputs"] == data["Inputs"] + assert data["ToolOutputs"] == data["Outputs"] + + def 
test_post_maps_output_to_outputs_and_input_to_inputs(self) -> None: + with _patched(resume={"action": "Approve", "data": {}}) as mi: + with _published_context( + scope=GuardrailScope.TOOL, + stage=GuardrailExecutionStage.POST, + component="my_tool", + input_payload=json.dumps({"in": 1}), + ): + _action().handle_validation_result(_failed(), {"out": 2}, "g") + data = mi.call_args[0][0].data + assert json.loads(data["Outputs"]) == {"out": 2} + assert json.loads(data["Inputs"]) == {"in": 1} + assert data["ExecutionStage"] == "PostExecution" + assert data["Tool"] == "my_tool" + assert data["Component"] == "my_tool" + assert data["ToolInputs"] == data["Inputs"] + assert data["ToolOutputs"] == data["Outputs"] + + def test_post_without_input_payload_leaves_inputs_empty(self) -> None: + with _patched(resume={"action": "Approve", "data": {}}) as mi: + with _published_context( + scope=GuardrailScope.TOOL, + stage=GuardrailExecutionStage.POST, + component="my_tool", + ): + _action().handle_validation_result(_failed(), {"out": 2}, "g") + data = mi.call_args[0][0].data + assert json.loads(data["Outputs"]) == {"out": 2} + assert data["Inputs"] == "" + assert data["ToolInputs"] == data["Inputs"] + assert data["ToolOutputs"] == data["Outputs"] + + +# --------------------------------------------------------------------------- +# Payload: GuardrailDescription and GuardrailResult +# --------------------------------------------------------------------------- + + +class TestGuardrailDescription: + def test_description_comes_from_context(self) -> None: + with _patched(resume={"action": "Approve", "data": {}}) as mi: + with _published_context(description="Detects PII emails"): + _action().handle_validation_result(_failed("PII detected"), "x", "g") + data = mi.call_args[0][0].data + assert data["GuardrailDescription"] == "Detects PII emails" + assert data["GuardrailResult"] == "PII detected" + + def test_description_empty_when_absent(self) -> None: + with _patched(resume={"action": 
"Approve", "data": {}}) as mi: + _action().handle_validation_result(_failed("PII detected"), "x", "g") + data = mi.call_args[0][0].data + assert data["GuardrailDescription"] == "" + assert data["GuardrailResult"] == "PII detected" + + +# --------------------------------------------------------------------------- +# Payload: TenantName / AgentTrace from UiPathConfig +# --------------------------------------------------------------------------- + + +class TestConfigDerivedFields: + def test_tenant_name_from_config(self) -> None: + cfg = SimpleNamespace(tenant_name="MyTenant", base_url=None) + with _patched(resume={"action": "Approve", "data": {}}, config=cfg) as mi: + _action().handle_validation_result(_failed(), "x", "g") + assert mi.call_args[0][0].data["TenantName"] == "MyTenant" + + def test_tenant_name_falls_back_to_base_url(self) -> None: + cfg = SimpleNamespace( + tenant_name=None, base_url="https://alpha.uipath.com/Org/MyTenant" + ) + with _patched(resume={"action": "Approve", "data": {}}, config=cfg) as mi: + _action().handle_validation_result(_failed(), "x", "g") + assert mi.call_args[0][0].data["TenantName"] == "MyTenant" + + def test_tenant_name_omitted_when_unresolvable(self) -> None: + cfg = SimpleNamespace(tenant_name=None, base_url=None) + with _patched(resume={"action": "Approve", "data": {}}, config=cfg) as mi: + _action().handle_validation_result(_failed(), "x", "g") + assert "TenantName" not in mi.call_args[0][0].data + + def test_agent_trace_deployed_url(self) -> None: + cfg = SimpleNamespace( + tenant_name="T", + base_url="https://alpha.uipath.com/Org/T", + organization_id="org-123", + is_studio_project=False, + folder_key="folder-1", + process_uuid="proc-1", + trace_id="trace-1", + project_key="project-1", + process_version="1.0.0", + ) + with _patched(resume={"action": "Approve", "data": {}}, config=cfg) as mi: + _action().handle_validation_result(_failed(), "x", "g") + trace = mi.call_args[0][0].data["AgentTrace"] + assert 
trace.startswith("https://alpha.uipath.com/org-123/agents_/deployed/") + assert "/traces/trace-1" in trace + + def test_agent_trace_omitted_when_ids_missing(self) -> None: + cfg = SimpleNamespace( + tenant_name="T", + base_url="https://alpha.uipath.com/Org/T", + organization_id="org-123", + ) + with _patched(resume={"action": "Approve", "data": {}}, config=cfg) as mi: + _action().handle_validation_result(_failed(), "x", "g") + assert "AgentTrace" not in mi.call_args[0][0].data + + +# --------------------------------------------------------------------------- +# Response handling: Approve / Reject / modify +# --------------------------------------------------------------------------- + + +class TestResponseHandling: + def test_approve_with_reviewed_inputs_string(self) -> None: + resume = {"action": "Approve", "data": {"ReviewedInputs": "clean topic"}} + with _patched(resume=resume): + result = _action().handle_validation_result(_failed(), "dirty topic", "g") + assert result == "clean topic" + + def test_approve_with_reviewed_inputs_dict_payload(self) -> None: + reviewed = {"email": "[redacted]"} + resume = {"action": "Approve", "data": {"ReviewedInputs": json.dumps(reviewed)}} + with _patched(resume=resume): + result = _action().handle_validation_result( + _failed(), {"email": "a@b.com"}, "g" + ) + assert result == reviewed + + def test_approve_without_reviewed_inputs_returns_none(self) -> None: + with _patched(resume={"action": "Approve", "data": {}}): + result = _action().handle_validation_result(_failed(), "topic", "g") + assert result is None + + def test_reject_raises_block_exception_with_reason(self) -> None: + resume = {"action": "Reject", "data": {"Reason": "contains PII"}} + with _patched(resume=resume): + with pytest.raises(GuardrailBlockException) as exc_info: + _action().handle_validation_result(_failed(), "topic", "PII guard") + assert "contains PII" in exc_info.value.detail + assert "PII guard" in exc_info.value.title + + def 
test_reject_falls_back_to_result_reason(self) -> None: + resume = {"action": "Reject", "data": {}} + with _patched(resume=resume): + with pytest.raises(GuardrailBlockException) as exc_info: + _action().handle_validation_result(_failed("the reason"), "topic", "g") + assert "the reason" in exc_info.value.detail + + def test_post_approve_reads_reviewed_outputs(self) -> None: + resume = {"action": "Approve", "data": {"ReviewedOutputs": "clean output"}} + with _patched(resume=resume): + with _published_context( + scope=GuardrailScope.TOOL, + stage=GuardrailExecutionStage.POST, + component="t", + ): + result = _action().handle_validation_result( + _failed(), "dirty output", "g" + ) + assert result == "clean output" + + def test_post_approve_ignores_reviewed_inputs(self) -> None: + resume = {"action": "Approve", "data": {"ReviewedInputs": "ignored"}} + with _patched(resume=resume): + with _published_context(stage=GuardrailExecutionStage.POST): + result = _action().handle_validation_result(_failed(), "orig", "g") + assert result is None + + +# --------------------------------------------------------------------------- +# Pure helpers +# --------------------------------------------------------------------------- + + +class TestNormalizeEscalationResult: + def test_none_defaults_to_approve(self) -> None: + assert _normalize_escalation_result(None) == ("Approve", {}) + + def test_dict_with_action_key(self) -> None: + outcome, data = _normalize_escalation_result( + {"action": "Reject", "data": {"Reason": "x"}} + ) + assert outcome == "Reject" + assert data == {"Reason": "x"} + + def test_dict_with_capitalized_action_key(self) -> None: + outcome, _ = _normalize_escalation_result({"Action": "Approve"}) + assert outcome == "Approve" + + def test_dict_without_nested_data_uses_self(self) -> None: + outcome, data = _normalize_escalation_result( + {"action": "Approve", "ReviewedInputs": "v"} + ) + assert outcome == "Approve" + assert data == {"action": "Approve", "ReviewedInputs": 
"v"} + + def test_object_with_action_and_data(self) -> None: + raw = SimpleNamespace(action="Reject", data={"Reason": "no"}) + assert _normalize_escalation_result(raw) == ("Reject", {"Reason": "no"}) + + def test_object_without_action_defaults_to_approve(self) -> None: + raw = SimpleNamespace(action=None, data=None) + assert _normalize_escalation_result(raw) == ("Approve", {}) + + +class TestCoerceReviewed: + def test_want_dict_parses_json_object(self) -> None: + assert _coerce_reviewed(json.dumps({"a": 1}), want_dict=True) == {"a": 1} + + def test_want_dict_non_json_returns_as_is(self) -> None: + assert _coerce_reviewed("not json", want_dict=True) == "not json" + + def test_want_str_returns_as_is(self) -> None: + assert _coerce_reviewed("clean", want_dict=False) == "clean" + + +class TestExecutionStageLabel: + def test_pre(self) -> None: + assert _execution_stage_label(GuardrailExecutionStage.PRE) == "PreExecution" + + def test_post(self) -> None: + assert _execution_stage_label(GuardrailExecutionStage.POST) == "PostExecution" + + def test_pre_and_post_returns_none(self) -> None: + assert _execution_stage_label(GuardrailExecutionStage.PRE_AND_POST) is None + + def test_none(self) -> None: + assert _execution_stage_label(None) is None + + +class TestResolveTenantName: + def test_prefers_config_tenant_name(self) -> None: + cfg = SimpleNamespace(tenant_name="FromEnv", base_url="https://x/o/FromUrl") + with patch.object(escalate_module, "UiPathConfig", cfg): + assert _resolve_tenant_name() == "FromEnv" + + def test_falls_back_to_url(self) -> None: + cfg = SimpleNamespace(tenant_name=None, base_url="https://x/org/tenant") + with patch.object(escalate_module, "UiPathConfig", cfg): + assert _resolve_tenant_name() == "tenant" + + def test_returns_none_when_unresolvable(self) -> None: + cfg = SimpleNamespace(tenant_name=None, base_url=None) + with patch.object(escalate_module, "UiPathConfig", cfg): + assert _resolve_tenant_name() is None + + +# 
--------------------------------------------------------------------------- +# Escalation target: recipient (TaskRecipient) — the HITL-native typed target +# --------------------------------------------------------------------------- + + +class TestRecipient: + @pytest.mark.parametrize( + "recipient_type,value", + [ + (TaskRecipientType.USER_ID, "user-guid-1"), + (TaskRecipientType.GROUP_ID, "group-guid-1"), + (TaskRecipientType.EMAIL, "reviewer@x.com"), + (TaskRecipientType.GROUP_NAME, "Reviewers"), + ], + ) + def test_recipient_passed_through( + self, recipient_type: TaskRecipientType, value: str + ) -> None: + recipient = TaskRecipient(type=recipient_type, value=value) + with _patched(resume={"action": "Approve", "data": {}}) as mock_interrupt: + _action(recipient=recipient).handle_validation_result(_failed(), "x", "g") + payload = mock_interrupt.call_args[0][0] + assert payload.recipient == recipient + assert payload.recipient.type == recipient_type + assert payload.recipient.value == value + + def test_recipient_defaults_to_none(self) -> None: + with _patched(resume={"action": "Approve", "data": {}}) as mock_interrupt: + _action(assignee="user@x.com").handle_validation_result(_failed(), "x", "g") + payload = mock_interrupt.call_args[0][0] + assert payload.recipient is None + assert payload.assignee == "user@x.com" + + def test_assignee_and_recipient_coexist(self) -> None: + recipient = TaskRecipient(type=TaskRecipientType.GROUP_NAME, value="Reviewers") + with _patched(resume={"action": "Approve", "data": {}}) as mock_interrupt: + _action( + assignee="user@x.com", recipient=recipient + ).handle_validation_result(_failed(), "x", "g") + payload = mock_interrupt.call_args[0][0] + assert payload.assignee == "user@x.com" + assert payload.recipient == recipient From b0278b81fa3c10ed3fbdbf437b0e4e73f959809a Mon Sep 17 00:00:00 2001 From: Andrei Petraru Date: Wed, 10 Jun 2026 16:20:59 +0300 Subject: [PATCH 2/3] fix(guardrails): validate final AI message at AGENT POST 
so ReviewedOutputs applies [AL-289] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The after_agent (AGENT-scope POST) hook validated the whole joined conversation, so the flagged Outputs included the input echo and an escalation's ReviewedOutputs edit couldn't be mapped back to a single message — it was silently dropped. Validate only the agent's final AI message (like the LLM-scope after_model hook): Outputs is the agent output, the original input is carried as Inputs, and an approve-with-edit is applied on resume. Add TestMiddlewareEscalation coverage for AGENT-POST and LLM-POST escalations (payload Component/ExecutionStage/Inputs/Outputs + ReviewedOutputs on approve). Co-Authored-By: Claude Opus 4.8 (1M context) --- .../guardrails/middlewares/_base.py | 19 ++- tests/cli/test_guardrails_in_langgraph.py | 159 +++++++++++++++++- 2 files changed, 169 insertions(+), 9 deletions(-) diff --git a/src/uipath_langchain/guardrails/middlewares/_base.py b/src/uipath_langchain/guardrails/middlewares/_base.py index d024cbd5f..ee21ac0b6 100644 --- a/src/uipath_langchain/guardrails/middlewares/_base.py +++ b/src/uipath_langchain/guardrails/middlewares/_base.py @@ -326,13 +326,20 @@ async def _before_agent_func( async def _after_agent_func( state: AgentState[Any], runtime: Runtime ) -> None: + # POST validates the agent's OUTPUT — the final AI message — + # not the whole conversation, so the flagged content maps back + # to a single message (an escalation's ReviewedOutputs edit can + # be applied) and the original input is carried separately as + # input_text. Mirrors the LLM-scope after_model behavior. 
messages = state.get("messages", []) - mw._check_messages( - list(messages), - scope=GuardrailScope.AGENT, - stage=GuardrailExecutionStage.POST, - input_text=mw._last_input_text(list(messages)), - ) + ai_messages = [m for m in messages if isinstance(m, AIMessage)] + if ai_messages: + mw._check_messages( + [ai_messages[-1]], + scope=GuardrailScope.AGENT, + stage=GuardrailExecutionStage.POST, + input_text=mw._last_input_text(messages), + ) _after_agent_func.__name__ = f"{guardrail_name}_after_agent" hooks.append(after_agent(_after_agent_func)) diff --git a/tests/cli/test_guardrails_in_langgraph.py b/tests/cli/test_guardrails_in_langgraph.py index 48980c25b..902ac1223 100644 --- a/tests/cli/test_guardrails_in_langgraph.py +++ b/tests/cli/test_guardrails_in_langgraph.py @@ -582,6 +582,11 @@ async def _final_llm(messages, *args, **kwargs): return AIMessage(content="final answer") +async def _final_llm_with_email(messages, *args, **kwargs): + """Mock LLM whose OUTPUT contains an email, so a POST guardrail fires on it.""" + return AIMessage(content="here is your joke — reach me at a@b.com") + + def _interrupt_value(result: Any, agent: Any, config: dict[str, Any]) -> Any: """Extract the value passed to interrupt() from an invoke result/state.""" interrupts = result.get("__interrupt__") if isinstance(result, dict) else None @@ -596,9 +601,10 @@ def _interrupt_value(result: Any, agent: Any, config: dict[str, Any]) -> Any: class TestMiddlewareEscalation: """The middleware EscalateAction suspends via interrupt() and resumes correctly. - Middleware flavor only. The decorator/@guardrail flavor is a follow-up: it does - not yet publish the guardrail action context, so Component/ExecutionStage parity - needs decorator-side work before a 2-flavor escalation parity test is meaningful. 
+ Covers AGENT scope at PRE (escalate on input; Approve substitutes ReviewedInputs) + and AGENT/LLM scope at POST (escalate on the output; Approve substitutes + ReviewedOutputs), asserting the context-derived Component/ExecutionStage and the + stage-aware Inputs/Outputs payload. """ @pytest.fixture(autouse=True) @@ -623,6 +629,28 @@ def _build_agent(self) -> Any: checkpointer=MemorySaver(), ) + def _build_post_agent(self, scope: GuardrailScope) -> Any: + """Agent with a single PII escalation guardrail at POST for the given scope. + + POST validates the *output* (the agent's final message for AGENT scope, + the model response for LLM scope), so the escalation fires on the output. + """ + llm = UiPathChatOpenAI(model="gpt-4o-2024-11-20") # type: ignore[call-arg] + return create_agent( + model=llm, + tools=[], + middleware=[ + *UiPathPIIDetectionMiddleware( + name="PII escalation guardrail", + scopes=[scope], + stage=GuardrailExecutionStage.POST, + action=EscalateAction(app_name="EscApp", app_folder_path="Shared"), + entities=[PIIDetectionEntity(PIIDetectionEntityType.EMAIL, 0.5)], + ), + ], + checkpointer=MemorySaver(), + ) + @pytest.mark.asyncio async def test_escalation_suspends_with_context_derived_payload(self) -> None: agent = self._build_agent() @@ -710,3 +738,128 @@ async def test_escalation_reject_terminates_run(self) -> None: config, ) assert "contains PII" in str(exc_info.value) + + # -- POST stage: escalate on the OUTPUT (input is clean, only output flagged) -- + + @pytest.mark.asyncio + async def test_agent_post_escalation_suspends_with_output_payload(self) -> None: + agent = self._build_post_agent(GuardrailScope.AGENT) + config = {"configurable": {"thread_id": "esc-agent-post"}} + with ( + patch( + "uipath_langchain.chat.openai.UiPathChatOpenAI.ainvoke", + side_effect=_final_llm_with_email, + ), + patch( + "uipath.platform.guardrails.GuardrailsService.evaluate_guardrail", + side_effect=_fail_on_email, + ), + ): + result = await agent.ainvoke( + 
{"messages": [HumanMessage(content="tell a joke")]}, config + ) + + cre = _interrupt_value(result, agent, config) + assert isinstance(cre, CreateEscalation) + assert cre.data is not None + # AGENT scope at POST: the agent OUTPUT is flagged; the original input is + # carried alongside it so the reviewer sees both. + assert cre.data["Component"] == "Agent" + assert cre.data["ExecutionStage"] == "PostExecution" + assert cre.data["Outputs"] == json.dumps( + "here is your joke — reach me at a@b.com" + ) + assert cre.data["Inputs"] == json.dumps("tell a joke") + assert cre.data["GuardrailName"] == "PII escalation guardrail" + + @pytest.mark.asyncio + async def test_agent_post_approve_applies_reviewed_output(self) -> None: + agent = self._build_post_agent(GuardrailScope.AGENT) + config = {"configurable": {"thread_id": "esc-agent-post-approve"}} + with ( + patch( + "uipath_langchain.chat.openai.UiPathChatOpenAI.ainvoke", + side_effect=_final_llm_with_email, + ), + patch( + "uipath.platform.guardrails.GuardrailsService.evaluate_guardrail", + side_effect=_fail_on_email, + ), + ): + await agent.ainvoke( + {"messages": [HumanMessage(content="tell a joke")]}, config + ) + final = await agent.ainvoke( + Command( + resume={ + "action": "Approve", + "data": {"ReviewedOutputs": "clean output"}, + } + ), + config, + ) + + # Run completed and the reviewer's edit was written back to the agent output. 
+ assert "__interrupt__" not in final + assert final["messages"][-1].content == "clean output" + + @pytest.mark.asyncio + async def test_llm_post_escalation_suspends_with_output_payload(self) -> None: + agent = self._build_post_agent(GuardrailScope.LLM) + config = {"configurable": {"thread_id": "esc-llm-post"}} + with ( + patch( + "uipath_langchain.chat.openai.UiPathChatOpenAI.ainvoke", + side_effect=_final_llm_with_email, + ), + patch( + "uipath.platform.guardrails.GuardrailsService.evaluate_guardrail", + side_effect=_fail_on_email, + ), + ): + result = await agent.ainvoke( + {"messages": [HumanMessage(content="tell a joke")]}, config + ) + + cre = _interrupt_value(result, agent, config) + assert isinstance(cre, CreateEscalation) + assert cre.data is not None + # LLM scope at POST fires through the after_model hook → Component "LLM call". + assert cre.data["Component"] == "LLM call" + assert cre.data["ExecutionStage"] == "PostExecution" + assert cre.data["Outputs"] == json.dumps( + "here is your joke — reach me at a@b.com" + ) + assert cre.data["Inputs"] == json.dumps("tell a joke") + assert cre.data["GuardrailName"] == "PII escalation guardrail" + + @pytest.mark.asyncio + async def test_llm_post_approve_applies_reviewed_output(self) -> None: + agent = self._build_post_agent(GuardrailScope.LLM) + config = {"configurable": {"thread_id": "esc-llm-post-approve"}} + with ( + patch( + "uipath_langchain.chat.openai.UiPathChatOpenAI.ainvoke", + side_effect=_final_llm_with_email, + ), + patch( + "uipath.platform.guardrails.GuardrailsService.evaluate_guardrail", + side_effect=_fail_on_email, + ), + ): + await agent.ainvoke( + {"messages": [HumanMessage(content="tell a joke")]}, config + ) + final = await agent.ainvoke( + Command( + resume={ + "action": "Approve", + "data": {"ReviewedOutputs": "clean output"}, + } + ), + config, + ) + + # The reviewer's edit was written back to the LLM output via after_model. 
+ assert "__interrupt__" not in final + assert final["messages"][-1].content == "clean output" From 4fef2b5821a26a360a68493423eeae7eee7b42ea Mon Sep 17 00:00:00 2001 From: Andrei Petraru Date: Wed, 10 Jun 2026 17:07:40 +0300 Subject: [PATCH 3/3] chore(samples): declare joke-agent escalation app in bindings.json, drop env vars [AL-289] Address the review comment on graph.py: configure the Guardrail Escalation Action App as a declarative bindings.json "app" resource (mirroring ticket-classification) instead of GUARDRAIL_ESCALATION_APP_* env vars. graph.py passes the literal app name/folder to EscalateAction; the binding is the deploy-time contract that Studio/deploy resolves and can override (locally the literals are used). Co-Authored-By: Claude Opus 4.8 (1M context) --- samples/joke-agent/bindings.json | 26 ++++++++++++++++++++++++++ samples/joke-agent/graph.py | 18 +++++------------- 2 files changed, 31 insertions(+), 13 deletions(-) create mode 100644 samples/joke-agent/bindings.json diff --git a/samples/joke-agent/bindings.json b/samples/joke-agent/bindings.json new file mode 100644 index 000000000..cc9f641f2 --- /dev/null +++ b/samples/joke-agent/bindings.json @@ -0,0 +1,26 @@ +{ + "version": "2.0", + "resources": [ + { + "resource": "app", + "key": "Guardrail.Escalation.Action.App.2.Shared", + "value": { + "name": { + "defaultValue": "Guardrail.Escalation.Action.App.2", + "isExpression": false, + "displayName": "App Name" + }, + "folderPath": { + "defaultValue": "Shared", + "isExpression": false, + "displayName": "App Folder Path" + } + }, + "metadata": { + "ActivityName": "create_async", + "BindingsVersion": "2.2", + "DisplayLabel": "app_name" + } + } + ] +} diff --git a/samples/joke-agent/graph.py b/samples/joke-agent/graph.py index 1f1043b6f..b940f8f3d 100644 --- a/samples/joke-agent/graph.py +++ b/samples/joke-agent/graph.py @@ -1,7 +1,5 @@ """Joke generating agent that creates family-friendly jokes based on a topic.""" -import os - from langchain.agents 
import create_agent from langchain_core.messages import HumanMessage from langchain_core.tools import tool @@ -46,15 +44,6 @@ class Output(BaseModel): joke: str -# Escalation Action App configuration (override via env vars to match your -# tenant deployment). Defaults point at the "Guardrail Escalation Action App (2)" -# published as the process "Guardrail.Escalation.Action.App.2" in the "Shared" -# folder (its deployed name/folder in the tenant — verified via `uip or processes list`). -ESCALATION_APP_NAME = os.getenv( - "GUARDRAIL_ESCALATION_APP_NAME", "Guardrail.Escalation.Action.App.2" -) -ESCALATION_APP_FOLDER = os.getenv("GUARDRAIL_ESCALATION_APP_FOLDER", "Shared") - # Initialize UiPathChat LLM llm = UiPathChat(model="gpt-4o-2024-08-06", temperature=0.7) @@ -117,8 +106,11 @@ def analyze_joke_syntax(joke: str) -> str: # before_agent and after_agent). stage=GuardrailExecutionStage.PRE, action=EscalateAction( - app_name=ESCALATION_APP_NAME, - app_folder_path=ESCALATION_APP_FOLDER, + # Escalation Action App — declared as a binding in bindings.json + # (resource "app"). Studio/deploy resolves and can override it; + # locally these literal values are used. + app_name="Guardrail.Escalation.Action.App.2", + app_folder_path="Shared", ), entities=[ PIIDetectionEntity(PIIDetectionEntityType.EMAIL, 0.5),