diff --git a/docs/guardrails.md b/docs/guardrails.md index ef78f9f4d..879110381 100644 --- a/docs/guardrails.md +++ b/docs/guardrails.md @@ -409,6 +409,49 @@ async def my_node(state: Input) -> Output: ... ``` +### Escalation action (human-in-the-loop) + +`EscalateAction` works on the decorator path exactly as it does for middleware — on a violation it suspends the run via `interrupt(CreateEscalation(...))`, creates a review task in a UiPath **Action App**, and resumes on Approve/Reject. It is the **same action class** as the middleware path; just pass it as the `action` of a `@guardrail` on the factory you want to guard. The escalation task's `Component` / `ExecutionStage` are **derived automatically** from the inferred scope of the decorated target — no extra configuration: + +```python +from langchain.agents import create_agent +from uipath_langchain.guardrails import ( + guardrail, + EscalateAction, + PIIValidator, + GuardrailExecutionStage, + PIIDetectionEntity, +) +from uipath_langchain.guardrails.enums import PIIDetectionEntityType +from uipath.platform.action_center.tasks import TaskRecipient, TaskRecipientType + +@guardrail( + validator=PIIValidator( + entities=[PIIDetectionEntity(PIIDetectionEntityType.EMAIL, threshold=0.5)], + ), + action=EscalateAction( + app_name="Guardrail Escalation Action App", + app_folder_path="Shared", + # optional: route the review task to a specific recipient + recipient=TaskRecipient( + type=TaskRecipientType.EMAIL, value="reviewer@example.com" + ), + ), + name="Agent PII escalation", + stage=GuardrailExecutionStage.PRE, # escalate once per run +) +def create_my_agent(): + return create_agent(model=llm, tools=[analyze_text], system_prompt="...") + +agent = create_my_agent() +``` + +On resume: **Approve** continues, substituting the reviewer's edit if any — read from `ReviewedInputs` for a PRE (input) escalation and `ReviewedOutputs` for a POST (output) one, otherwise keeping the original; **Reject** raises `GuardrailBlockException` and terminates the run. The `app_name` / `app_folder_path` / `assignee` / `recipient` / `title` parameters and the auto-derived payload fields behave identically to the [middleware escalation action](#escalation-action-human-in-the-loop) above — refer to it for the full parameter list. + +> 💡 **Scope inference for the payload context.** `Component` / `ExecutionStage` are derived automatically for the adapter-handled LangChain targets — `@tool`, `BaseChatModel` factories, and `create_agent()` factories. On a plain LangGraph node or plain Python function (handled by the core `@guardrail`, which doesn't publish the LangChain runtime context) the escalation still suspends, but those two fields are not populated. + +> 💡 **Escalate once per run.** As with middleware, AGENT/LLM scope validates both *before* and *after* by default. Set `stage=GuardrailExecutionStage.PRE` (or `POST`) so only a single checkpoint is registered. + --- ## Choosing between middleware and decorator diff --git a/pyproject.toml b/pyproject.toml index a0053aa9c..277cf622a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-langchain" -version = "0.11.16" +version = "0.11.17" description = "Python SDK that enables developers to build and deploy LangGraph agents to the UiPath Cloud Platform" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/samples/joke-agent-decorator/README.md b/samples/joke-agent-decorator/README.md index 409db21e5..22348ff9e 100644 --- a/samples/joke-agent-decorator/README.md +++ b/samples/joke-agent-decorator/README.md @@ -54,6 +54,7 @@ This sample uses a single unified `@guardrail` decorator with three components: | `@guardrail(validator=CustomValidator(...))` | `analyze_joke_syntax` tool | Custom (length check) | `BlockAction` — blocks jokes > 1000 chars | | `@guardrail(validator=CustomValidator(...))` | `analyze_joke_syntax` tool | Custom (always true) | `CustomFilterAction` — always-on output transform (POST) | | `@guardrail(validator=PIIValidator(...EMAIL, PHONE...))` | `analyze_joke_syntax` tool | PII (Email, Phone) | `LogAction(WARNING)` — logs email/phone | +| `@guardrail(validator=PIIValidator(...EMAIL...))` | `create_joke_agent` factory | PII (Email) | `EscalateAction` — suspends for human review (HITL) | | `@guardrail(validator=PIIValidator(...PERSON...))` | `create_joke_agent` factory | PII (Person) | `BlockAction` — blocks person names | | `@guardrail(validator=PIIValidator(...PERSON...))` | `joke_node` graph node | PII (Person) | `BlockAction` — blocks person names in node input | | `@guardrail(validator=CustomValidator(...))` | `format_joke_for_display` function | Custom (word check) | `CustomFilterAction` — replaces "donkey" in display output | @@ -123,6 +124,15 @@ def analyze_joke_syntax(joke: str) -> str: ### Agent-level guardrail ```python +@guardrail( + validator=pii_email, + action=EscalateAction( + app_name=ESCALATION_APP_NAME, + app_folder_path=ESCALATION_APP_FOLDER, + ), + name="Agent PII Escalation", + stage=GuardrailExecutionStage.PRE, +) @guardrail( validator=PIIValidator( entities=[PIIDetectionEntity(PIIDetectionEntityType.PERSON, threshold=0.5)], @@ -140,6 +150,19 @@ def create_joke_agent(): agent = create_joke_agent() ``` +### Agent-level escalation (human-in-the-loop) + +`EscalateAction` turns a guardrail violation into a human review step. On an EMAIL +PII violation in the agent input it suspends the run via the documented HITL +`interrupt(CreateEscalation(...))` primitive, creating a task in the **Guardrail +Escalation Action App**. A human approves (optionally editing the input) or +rejects; on resume the run continues with the reviewed input, or terminates on a +reject. The app name/folder default to the same deployment as the middleware +`joke-agent` and can be overridden with the `GUARDRAIL_ESCALATION_APP_NAME` / +`GUARDRAIL_ESCALATION_APP_FOLDER` environment variables. This is the +decorator-style counterpart of the middleware sample's AGENT-scope escalation — +no SDK change beyond the `@guardrail` action. + ### Graph node guardrail ```python @@ -239,6 +262,23 @@ uv run uipath run agent '{"topic": "donkey, test@example.com"}' Both the agent-scope and LLM-scope PII guardrails log a `WARNING` when the email is detected. The tool-scope PII guardrail logs when the email reaches the tool input. +**Scenario 4 — agent-scope escalation (HITL):** supply a topic containing an email address: + +```bash +uv run uipath run agent '{"topic": "a joke about a@b.com"}' +``` + +The `Agent PII Escalation` guardrail detects the email and suspends the run, +creating a review task in the Guardrail Escalation Action App. After a human +approves (or rejects) the task in Action Center, resume the run: + +```bash +uv run uipath run agent --resume +``` + +On approve the agent continues with the reviewed input; on reject the run +terminates with a guardrail-violation error. + ## Differences from the Middleware Approach (`joke-agent`) | Aspect | Middleware (`joke-agent`) | Decorator (`joke-agent-decorator`) | @@ -256,4 +296,5 @@ Both the agent-scope and LLM-scope PII guardrails log a `WARNING` when the email - `"banana"` — normal run, all guardrails pass - `"donkey"` — triggers the word filter on `analyze_joke_syntax` - `"donkey, test@example.com"` — triggers word filter + PII guardrails at all scopes +- `"a joke about a@b.com"` — triggers the agent-scope escalation (suspends for human review) - `"computer"`, `"coffee"`, `"pizza"`, `"weather"` \ No newline at end of file diff --git a/samples/joke-agent-decorator/bindings.json b/samples/joke-agent-decorator/bindings.json new file mode 100644 index 000000000..cc9f641f2 --- /dev/null +++ b/samples/joke-agent-decorator/bindings.json @@ -0,0 +1,26 @@ +{ + "version": "2.0", + "resources": [ + { + "resource": "app", + "key": "Guardrail.Escalation.Action.App.2.Shared", + "value": { + "name": { + "defaultValue": "Guardrail.Escalation.Action.App.2", + "isExpression": false, + "displayName": "App Name" + }, + "folderPath": { + "defaultValue": "Shared", + "isExpression": false, + "displayName": "App Folder Path" + } + }, + "metadata": { + "ActivityName": "create_async", + "BindingsVersion": "2.2", + "DisplayLabel": "app_name" + } + } + ] +} diff --git a/samples/joke-agent-decorator/graph.py b/samples/joke-agent-decorator/graph.py index 01ac5fb52..26c637027 100644 --- a/samples/joke-agent-decorator/graph.py +++ b/samples/joke-agent-decorator/graph.py @@ -20,6 +20,7 @@ from uipath_langchain.guardrails import ( BlockAction, CustomValidator, + EscalateAction, GuardrailAction, GuardrailExclude, GuardrailExecutionStage, @@ -258,6 +259,24 @@ def analyze_joke_syntax(joke: str) -> str: # --------------------------------------------------------------------------- +# On an EMAIL PII violation in the agent input this escalates to the Guardrail +# Escalation Action App for human review via the documented HITL +# interrupt(CreateEscalation(...)) — the run suspends until a human approves +# (optionally editing the input) or rejects. PRE only, so it triggers once per +# run. This is the decorator-style counterpart of the middleware joke-agent's +# AGENT-scope PII escalation. +@guardrail( + validator=pii_email, + action=EscalateAction( + # Escalation Action App — declared as a binding in bindings.json (resource + # "app"). Studio/deploy resolves and can override it; locally these literal + # values are used. + app_name="Guardrail.Escalation.Action.App.2", + app_folder_path="Shared", + ), + name="Agent PII Escalation", + stage=GuardrailExecutionStage.PRE, +) @guardrail( validator=HarmfulContentValidator( entities=[HarmfulContentEntity(HarmfulContentEntityType.VIOLENCE, threshold=2)], diff --git a/samples/joke-agent-decorator/pyproject.toml b/samples/joke-agent-decorator/pyproject.toml index 98293c18c..747a9ea85 100644 --- a/samples/joke-agent-decorator/pyproject.toml +++ b/samples/joke-agent-decorator/pyproject.toml @@ -5,7 +5,7 @@ description = "Joke generating agent that creates family-friendly jokes based on authors = [{ name = "John Doe", email = "john.doe@myemail.com" }] requires-python = ">=3.11" dependencies = [ - "uipath-langchain>=0.9.26, <0.11.0", + "uipath-langchain>=0.11.13, <0.12.0", "uipath>2.7.0", ] diff --git a/src/uipath_langchain/guardrails/_langchain_adapter.py b/src/uipath_langchain/guardrails/_langchain_adapter.py index 67663eb62..c6b6fd3e4 100644 --- a/src/uipath_langchain/guardrails/_langchain_adapter.py +++ b/src/uipath_langchain/guardrails/_langchain_adapter.py @@ -10,6 +10,7 @@ The adapter is auto-registered when ``uipath_langchain.guardrails`` is imported. """ +import json import logging from functools import wraps from typing import Any @@ -20,7 +21,7 @@ from langgraph.graph import StateGraph from langgraph.graph.state import CompiledStateGraph from langgraph.types import Command -from uipath.core.guardrails import GuardrailValidationResultType +from uipath.core.guardrails import GuardrailScope, GuardrailValidationResultType from uipath.platform.guardrails.decorators._core import ( _EvaluatorFn, _extract_input, @@ -33,6 +34,11 @@ from uipath.runtime.errors import UiPathErrorCategory from uipath_langchain.agent.exceptions import AgentRuntimeError, AgentRuntimeErrorCode +from uipath_langchain.guardrails._action_context import ( + GuardrailActionContext, + _action_context, + component_label, +) from uipath_langchain.guardrails.middlewares._utils import create_modified_tool_result logger = logging.getLogger(__name__) @@ -53,6 +59,51 @@ def _convert_block_exception(exc: GuardrailBlockException) -> AgentRuntimeError: ) +# --------------------------------------------------------------------------- +# Guardrail action invocation (with runtime context publishing) +# --------------------------------------------------------------------------- + + +def _run_action( + action: GuardrailAction, + result: Any, + data: Any, + name: str, + *, + scope: GuardrailScope | None, + stage: GuardrailExecutionStage | None, + component: str | None, + input_payload: str | None = None, +) -> str | dict[str, Any] | None: + """Run a guardrail action with its runtime context published. + + Publishes the guardrail context (scope / stage / component / input_payload) + on a ``ContextVar`` for the duration of the action call, so context-aware + actions (e.g. ``EscalateAction``) can read it at runtime instead of requiring + it to be hardcoded. ``input_payload`` carries the original input at POST so + the action can show both input and output. Mirrors the middleware path's + publishing in ``_base.py``. + + A :class:`GuardrailBlockException` is converted to an ``AgentRuntimeError``. + Any other exception — notably LangGraph's ``GraphInterrupt`` raised by an + escalation action — propagates unchanged so the run can suspend. + """ + token = _action_context.set( + GuardrailActionContext( + scope=scope, + execution_stage=stage, + component=component, + input_payload=input_payload, + ) + ) + try: + return action.handle_validation_result(result, data, name) + except GuardrailBlockException as exc: + raise _convert_block_exception(exc) from exc + finally: + _action_context.reset(token) + + # --------------------------------------------------------------------------- # Message helpers # --------------------------------------------------------------------------- @@ -88,6 +139,23 @@ def _extract_message_text(msg: BaseMessage) -> str: return "" +def _input_payload_from_messages( + messages: list[BaseMessage] | None, +) -> str | None: + """JSON-encode the last HumanMessage's text as the original-input payload. + + Used at POST so the escalation action can show the original input alongside + the flagged output. Returns ``None`` when there is no usable human input. + """ + if not messages: + return None + msg = _get_last_human_message(messages) + if msg is None: + return None + text = _extract_message_text(msg) + return json.dumps(text) if text else None + + def _apply_message_text_modification(msg: BaseMessage, modified: str) -> None: """Apply a modified text string back to a message in-place.""" if isinstance(msg.content, str): @@ -158,10 +226,15 @@ def _apply_pre(tool_input: Any) -> Any: return tool_input modified = None if result.result == GuardrailValidationResultType.VALIDATION_FAILED: - try: - modified = action.handle_validation_result(result, input_data, name) - except GuardrailBlockException as exc: - raise _convert_block_exception(exc) from exc + modified = _run_action( + action, + result, + input_data, + name, + scope=GuardrailScope.TOOL, + stage=GuardrailExecutionStage.PRE, + component=tool.name, + ) if modified is not None and isinstance(modified, dict): return _rewrap_input(tool_input, modified) return tool_input @@ -183,10 +256,16 @@ def _apply_post(tool_input: Any, raw_result: Any) -> Any: return raw_result modified = None if result.result == GuardrailValidationResultType.VALIDATION_FAILED: - try: - modified = action.handle_validation_result(result, output_data, name) - except GuardrailBlockException as exc: - raise _convert_block_exception(exc) from exc + modified = _run_action( + action, + result, + output_data, + name, + scope=GuardrailScope.TOOL, + stage=GuardrailExecutionStage.POST, + component=tool.name, + input_payload=json.dumps(input_data), + ) if modified is not None: if isinstance(raw_result, (ToolMessage, Command)): return create_modified_tool_result(raw_result, modified) @@ -245,10 +324,15 @@ def _apply_llm_pre( return modified = None if result.result == GuardrailValidationResultType.VALIDATION_FAILED: - try: - modified = action.handle_validation_result(result, text, name) - except GuardrailBlockException as exc: - raise _convert_block_exception(exc) from exc + modified = _run_action( + action, + result, + text, + name, + scope=GuardrailScope.LLM, + stage=GuardrailExecutionStage.PRE, + component=component_label(GuardrailScope.LLM), + ) if isinstance(modified, str) and modified != text: _apply_message_text_modification(msg, modified) @@ -258,6 +342,7 @@ def _apply_llm_post( evaluator: _EvaluatorFn, action: GuardrailAction, name: str, + input_messages: list[BaseMessage] | None = None, ) -> None: """Evaluate the AIMessage content in-place (POST stage, LLM scope).""" if not isinstance(response.content, str) or not response.content: @@ -268,10 +353,16 @@ def _apply_llm_post( return modified = None if result.result == GuardrailValidationResultType.VALIDATION_FAILED: - try: - modified = action.handle_validation_result(result, response.content, name) - except GuardrailBlockException as exc: - raise _convert_block_exception(exc) from exc + modified = _run_action( + action, + result, + response.content, + name, + scope=GuardrailScope.LLM, + stage=GuardrailExecutionStage.POST, + component=component_label(GuardrailScope.LLM), + input_payload=_input_payload_from_messages(input_messages), + ) if isinstance(modified, str) and modified != response.content: response.content = modified @@ -299,7 +390,13 @@ def invoke(self, messages: Any, config: Any = None, **kwargs: Any) -> Any: GuardrailExecutionStage.POST, GuardrailExecutionStage.PRE_AND_POST, ): - _apply_llm_post(response, evaluator, action, name) + _apply_llm_post( + response, + evaluator, + action, + name, + input_messages=messages if isinstance(messages, list) else None, + ) return response async def ainvoke( @@ -315,7 +412,13 @@ async def ainvoke( GuardrailExecutionStage.POST, GuardrailExecutionStage.PRE_AND_POST, ): - _apply_llm_post(response, evaluator, action, name) + _apply_llm_post( + response, + evaluator, + action, + name, + input_messages=messages if isinstance(messages, list) else None, + ) return response llm.__class__ = _GuardedLLM @@ -351,10 +454,15 @@ def _apply_agent_input_guardrail( return modified = None if result.result == GuardrailValidationResultType.VALIDATION_FAILED: - try: - modified = action.handle_validation_result(result, text, name) - except GuardrailBlockException as exc: - raise _convert_block_exception(exc) from exc + modified = _run_action( + action, + result, + text, + name, + scope=GuardrailScope.AGENT, + stage=GuardrailExecutionStage.PRE, + component=component_label(GuardrailScope.AGENT), + ) if isinstance(modified, str) and modified != text: _apply_message_text_modification(msg, modified) @@ -383,10 +491,16 @@ def _apply_agent_output_guardrail( return modified = None if result.result == GuardrailValidationResultType.VALIDATION_FAILED: - try: - modified = action.handle_validation_result(result, text, name) - except GuardrailBlockException as exc: - raise _convert_block_exception(exc) from exc + modified = _run_action( + action, + result, + text, + name, + scope=GuardrailScope.AGENT, + stage=GuardrailExecutionStage.POST, + component=component_label(GuardrailScope.AGENT), + input_payload=_input_payload_from_messages(messages), + ) if isinstance(modified, str) and modified != text: _apply_message_text_modification(msg, modified) diff --git a/tests/cli/test_guardrails_in_langgraph.py b/tests/cli/test_guardrails_in_langgraph.py index 902ac1223..41461ea55 100644 --- a/tests/cli/test_guardrails_in_langgraph.py +++ b/tests/cli/test_guardrails_in_langgraph.py @@ -55,7 +55,9 @@ GuardrailExecutionStage, PIIDetectionEntity, PIIDetectionEntityType, + PIIValidator, UiPathPIIDetectionMiddleware, + guardrail, ) from uipath_langchain.runtime import register_runtime_factory @@ -863,3 +865,176 @@ async def test_llm_post_approve_applies_reviewed_output(self) -> None: # The reviewer's edit was written back to the LLM output via after_model. assert "__interrupt__" not in final assert final["messages"][-1].content == "clean output" + + +# --------------------------------------------------------------------------- +# Decorator escalation (HITL) — interrupt → resume, via @guardrail + EscalateAction +# --------------------------------------------------------------------------- + + +class TestDecoratorEscalation: + """The decorator @guardrail EscalateAction suspends via interrupt() and resumes. + + The adapter fires interrupt() before the guarded agent's real invoke, so — like + the joke-agent-decorator sample — the guarded agent is embedded in an outer + graph node, which gives interrupt() the graph/checkpoint context it needs. + Payload (Component/ExecutionStage/Inputs) matches TestMiddlewareEscalation. + """ + + @pytest.fixture(autouse=True) + def _setup_env(self, mock_env_vars: dict[str, str]): + os.environ.clear() + os.environ.update(mock_env_vars) + + def _build_graph(self) -> Any: + from typing import Annotated + + from langgraph.graph import END, START, StateGraph + from langgraph.graph.message import add_messages + from typing_extensions import TypedDict + + llm = UiPathChatOpenAI(model="gpt-4o-2024-11-20") # type: ignore[call-arg] + + @guardrail( + validator=PIIValidator( + entities=[PIIDetectionEntity(PIIDetectionEntityType.EMAIL, 0.5)] + ), + action=EscalateAction(app_name="EscApp", app_folder_path="Shared"), + name="PII escalation guardrail", + stage=GuardrailExecutionStage.PRE, + ) + def create_joke_agent() -> Any: + return create_agent(model=llm, tools=[]) + + inner_agent = create_joke_agent() + + class State(TypedDict): + messages: Annotated[list[Any], add_messages] + + async def node(state: State) -> dict[str, Any]: + result = await inner_agent.ainvoke({"messages": state["messages"]}) + return {"messages": result["messages"]} + + builder = StateGraph(State) + builder.add_node("n", node) + builder.add_edge(START, "n") + builder.add_edge("n", END) + return builder.compile(checkpointer=MemorySaver()) + + @pytest.mark.asyncio + async def test_escalation_suspends_with_context_derived_payload(self) -> None: + graph = self._build_graph() + config = {"configurable": {"thread_id": "dec-esc-suspend"}} + with ( + patch( + "uipath_langchain.chat.openai.UiPathChatOpenAI.ainvoke", + side_effect=_final_llm, + ), + patch( + "uipath.platform.guardrails.GuardrailsService.evaluate_guardrail", + side_effect=_fail_on_email, + ), + ): + result = await graph.ainvoke( + {"messages": [HumanMessage(content="joke about a@b.com")]}, config + ) + + cre = _interrupt_value(result, graph, config) + assert isinstance(cre, CreateEscalation) + assert cre.app_name == "EscApp" + assert cre.app_folder_path == "Shared" + assert cre.data is not None + # Component + ExecutionStage derived from the runtime guardrail context the + # adapter publishes — identical to the middleware flavor. + assert cre.data["Component"] == "Agent" + assert cre.data["ExecutionStage"] == "PreExecution" + assert cre.data["Inputs"] == json.dumps("joke about a@b.com") + assert cre.data["GuardrailName"] == "PII escalation guardrail" + + @pytest.mark.asyncio + async def test_escalation_approve_applies_reviewed_input(self) -> None: + graph = self._build_graph() + config = {"configurable": {"thread_id": "dec-esc-approve"}} + with ( + patch( + "uipath_langchain.chat.openai.UiPathChatOpenAI.ainvoke", + side_effect=_final_llm, + ), + patch( + "uipath.platform.guardrails.GuardrailsService.evaluate_guardrail", + side_effect=_fail_on_email, + ), + ): + await graph.ainvoke( + {"messages": [HumanMessage(content="joke about a@b.com")]}, config + ) + final = await graph.ainvoke( + Command( + resume={ + "action": "Approve", + "data": {"ReviewedInputs": "clean topic"}, + } + ), + config, + ) + + # Run completed (no second escalation — stage=PRE) and the reviewed input + # was substituted into the message the agent ran on. + assert "__interrupt__" not in final + assert final["messages"][0].content == "clean topic" + + @pytest.mark.asyncio + async def test_escalation_approve_without_edit_keeps_original_input(self) -> None: + graph = self._build_graph() + config = {"configurable": {"thread_id": "dec-esc-approve-noedit"}} + with ( + patch( + "uipath_langchain.chat.openai.UiPathChatOpenAI.ainvoke", + side_effect=_final_llm, + ), + patch( + "uipath.platform.guardrails.GuardrailsService.evaluate_guardrail", + side_effect=_fail_on_email, + ), + ): + await graph.ainvoke( + {"messages": [HumanMessage(content="joke about a@b.com")]}, config + ) + final = await graph.ainvoke( + Command(resume={"action": "Approve", "data": {}}), + config, + ) + + # Approve without ReviewedInputs: the run completes on the *original* + # input (the action returns None, so the adapter leaves the message + # untouched) — no re-suspend despite the input still tripping the + # guardrail on replay. + assert "__interrupt__" not in final + assert final["messages"][0].content == "joke about a@b.com" + assert final["messages"][-1].content == "final answer" + + @pytest.mark.asyncio + async def test_escalation_reject_terminates_run(self) -> None: + graph = self._build_graph() + config = {"configurable": {"thread_id": "dec-esc-reject"}} + with ( + patch( + "uipath_langchain.chat.openai.UiPathChatOpenAI.ainvoke", + side_effect=_final_llm, + ), + patch( + "uipath.platform.guardrails.GuardrailsService.evaluate_guardrail", + side_effect=_fail_on_email, + ), + ): + await graph.ainvoke( + {"messages": [HumanMessage(content="joke about a@b.com")]}, config + ) + with pytest.raises(AgentRuntimeError) as exc_info: + await graph.ainvoke( + Command( + resume={"action": "Reject", "data": {"Reason": "contains PII"}} + ), + config, + ) + assert "contains PII" in str(exc_info.value) diff --git a/tests/guardrails/test_adapter_context_publishing.py b/tests/guardrails/test_adapter_context_publishing.py new file mode 100644 index 000000000..32f1e38da --- /dev/null +++ b/tests/guardrails/test_adapter_context_publishing.py @@ -0,0 +1,607 @@ +"""Tests that the LangChain adapter publishes guardrail context to the action. + +The decorator (``@guardrail``) path wraps LangChain/LangGraph objects via +``LangChainGuardrailAdapter``. Like the middleware path, the adapter must publish +the guardrail runtime context (scope / stage / component) around each action call +so context-aware actions (e.g. ``EscalateAction``) can derive +``Component`` / ``ExecutionStage`` instead of requiring them to be hardcoded. + +The action call sites only catch ``GuardrailBlockException`` (the broad +``except Exception`` is around the evaluator, not the action), so a LangGraph +``GraphInterrupt`` raised by an escalation action propagates unchanged — these +tests guard that too. +""" + +import json +from typing import Any + +import pytest +from langchain_core.language_models import BaseChatModel +from langchain_core.messages import AIMessage, HumanMessage +from langchain_core.outputs import ChatGeneration, ChatResult +from langchain_core.tools import tool +from langgraph.errors import GraphInterrupt +from uipath.core.guardrails import ( + GuardrailScope, + GuardrailValidationResult, + GuardrailValidationResultType, +) + +from uipath_langchain.guardrails import GuardrailAction +from uipath_langchain.guardrails._action_context import ( + GuardrailActionContext, + current_action_context, +) +from uipath_langchain.guardrails._langchain_adapter import ( + LangChainGuardrailAdapter, + _apply_agent_input_guardrail, + _apply_agent_output_guardrail, + _apply_llm_post, + _apply_llm_pre, + _input_payload_from_messages, + _wrap_compiled_graph_with_guardrail, + _wrap_llm_with_guardrail, + _wrap_stategraph_with_guardrail, + _wrap_tool_with_guardrail, +) +from uipath_langchain.guardrails.enums import GuardrailExecutionStage + +_FAILED = GuardrailValidationResult( + result=GuardrailValidationResultType.VALIDATION_FAILED, reason="violation" +) +_PASSED = GuardrailValidationResult( + result=GuardrailValidationResultType.PASSED, reason="" +) + + +class _RecordingAction(GuardrailAction): + """Captures the published context when invoked; optionally raises.""" + + def __init__(self, raise_exc: BaseException | None = None) -> None: + self.seen: GuardrailActionContext | None = None + self.called = False + self._raise = raise_exc + + def handle_validation_result( + self, result: Any, data: Any, guardrail_name: str + ) -> Any: + self.called = True + self.seen = current_action_context() + if self._raise is not None: + raise self._raise + return None + + +def _fail_eval(*_args: Any, **_kwargs: Any) -> GuardrailValidationResult: + return _FAILED + + +def _pass_eval(*_args: Any, **_kwargs: Any) -> GuardrailValidationResult: + return _PASSED + + +def _raise_eval(*_args: Any, **_kwargs: Any) -> GuardrailValidationResult: + """Evaluator that raises — exercises the ``except Exception`` log branches.""" + raise RuntimeError("evaluator boom") + + +class _ModifyingAction(GuardrailAction): + """Action that returns a modified string — exercises the apply-modification branches.""" + + def handle_validation_result( + self, result: Any, data: Any, guardrail_name: str + ) -> Any: + return "MODIFIED" + + +@tool +def my_tool(text: str) -> str: + """Echo the input text back.""" + return f"echo: {text}" + + +class _FakeChatModel(BaseChatModel): + """Minimal BaseChatModel whose response is a fixed AIMessage. + + Used to drive the ``_wrap_llm_with_guardrail`` ``invoke``/``ainvoke`` wrappers + (the wrapper swaps ``__class__``, exactly like the tool wrapper). + """ + + reply: str + + @property + def _llm_type(self) -> str: + return "fake" + + def _generate(self, messages, stop=None, run_manager=None, **kwargs): + return ChatResult( + generations=[ChatGeneration(message=AIMessage(content=self.reply))] + ) + + +class _FakeGraph: + """Minimal graph-like object exposing ``invoke``/``ainvoke`` for the graph wrappers.""" + + def __init__(self, output: dict[str, Any]) -> None: + self._output = output + + def invoke(self, input: Any, config: Any = None, **kwargs: Any) -> Any: + return self._output + + async def ainvoke(self, input: Any, config: Any = None, **kwargs: Any) -> Any: + return self._output + + +# --------------------------------------------------------------------------- +# Tool scope — context published via the wrapped tool's invoke() +# --------------------------------------------------------------------------- + + +class TestToolContextPublishing: + def test_pre_publishes_tool_pre_context_with_tool_name(self) -> None: + action = _RecordingAction() + wrapped = _wrap_tool_with_guardrail( + my_tool, _fail_eval, action, "g", GuardrailExecutionStage.PRE + ) + wrapped.invoke({"text": "a@b.com"}) + assert action.seen is not None + assert action.seen.scope == GuardrailScope.TOOL + assert action.seen.execution_stage == GuardrailExecutionStage.PRE + assert action.seen.component == "my_tool" + assert action.seen.input_payload is None # no separate input at PRE + + def test_post_publishes_tool_post_context(self) -> None: + action = _RecordingAction() + wrapped = _wrap_tool_with_guardrail( + my_tool, _fail_eval, action, "g", GuardrailExecutionStage.POST + ) + wrapped.invoke({"text": "a@b.com"}) + assert action.seen is not None + assert action.seen.scope == GuardrailScope.TOOL + assert action.seen.execution_stage == GuardrailExecutionStage.POST + assert action.seen.component == "my_tool" + # POST carries the original tool input so the action shows both sides. + assert action.seen.input_payload == json.dumps({"text": "a@b.com"}) + + def test_context_reset_after_invoke(self) -> None: + action = _RecordingAction() + wrapped = _wrap_tool_with_guardrail( + my_tool, _fail_eval, action, "g", GuardrailExecutionStage.PRE + ) + wrapped.invoke({"text": "a@b.com"}) + assert current_action_context() is None + + def test_pre_reraises_graph_interrupt(self) -> None: + action = _RecordingAction(raise_exc=GraphInterrupt(())) + wrapped = _wrap_tool_with_guardrail( + my_tool, _fail_eval, action, "g", GuardrailExecutionStage.PRE + ) + with pytest.raises(GraphInterrupt): + wrapped.invoke({"text": "a@b.com"}) + + +# --------------------------------------------------------------------------- +# LLM scope — context published via _apply_llm_pre / _apply_llm_post +# --------------------------------------------------------------------------- + + +class TestLLMContextPublishing: + def test_pre_publishes_llm_pre_context(self) -> None: + action = _RecordingAction() + _apply_llm_pre([HumanMessage(content="hi a@b.com")], _fail_eval, action, "g") + assert action.seen is not None + assert action.seen.scope == GuardrailScope.LLM + assert action.seen.execution_stage == GuardrailExecutionStage.PRE + assert action.seen.component == "LLM call" + assert action.seen.input_payload is None # no separate input at PRE + + def test_post_publishes_llm_post_context(self) -> None: + action = _RecordingAction() + _apply_llm_post( + AIMessage(content="answer a@b.com"), + _fail_eval, + action, + "g", + input_messages=[HumanMessage(content="hi a@b.com")], + ) + assert action.seen is not None + assert action.seen.scope == GuardrailScope.LLM + assert action.seen.execution_stage == GuardrailExecutionStage.POST + assert action.seen.component == "LLM call" + # POST carries the original human input so the action shows both sides. + assert action.seen.input_payload == json.dumps("hi a@b.com") + + def test_context_reset_after_call(self) -> None: + action = _RecordingAction() + _apply_llm_pre([HumanMessage(content="hi a@b.com")], _fail_eval, action, "g") + assert current_action_context() is None + + def test_passed_result_does_not_invoke_action(self) -> None: + action = _RecordingAction() + _apply_llm_pre([HumanMessage(content="clean")], _pass_eval, action, "g") + assert action.called is False + + def test_pre_reraises_graph_interrupt(self) -> None: + action = _RecordingAction(raise_exc=GraphInterrupt(())) + with pytest.raises(GraphInterrupt): + _apply_llm_pre([HumanMessage(content="a@b.com")], _fail_eval, action, "g") + + +# --------------------------------------------------------------------------- +# Agent scope — context published via _apply_agent_input/output_guardrail +# --------------------------------------------------------------------------- + + +class TestAgentContextPublishing: + def test_input_publishes_agent_pre_context(self) -> None: + action = _RecordingAction() + _apply_agent_input_guardrail( + {"messages": [HumanMessage(content="hi a@b.com")]}, + _fail_eval, + action, + "g", + ) + assert action.seen is not None + assert action.seen.scope == GuardrailScope.AGENT + assert action.seen.execution_stage == GuardrailExecutionStage.PRE + assert action.seen.component == "Agent" + assert action.seen.input_payload is None # no separate input at PRE + + def test_output_publishes_agent_post_context(self) -> None: + action = _RecordingAction() + _apply_agent_output_guardrail( + { + "messages": [ + HumanMessage(content="hi a@b.com"), + AIMessage(content="answer a@b.com"), + ] + }, + _fail_eval, + action, + "g", + ) + assert action.seen is not None + assert action.seen.scope == GuardrailScope.AGENT + assert action.seen.execution_stage == GuardrailExecutionStage.POST + assert action.seen.component == "Agent" + # POST carries the original human input (from the output messages). + assert action.seen.input_payload == json.dumps("hi a@b.com") + + def test_context_reset_after_call(self) -> None: + action = _RecordingAction() + _apply_agent_input_guardrail( + {"messages": [HumanMessage(content="hi a@b.com")]}, + _fail_eval, + action, + "g", + ) + assert current_action_context() is None + + def test_input_reraises_graph_interrupt(self) -> None: + action = _RecordingAction(raise_exc=GraphInterrupt(())) + with pytest.raises(GraphInterrupt): + _apply_agent_input_guardrail( + {"messages": [HumanMessage(content="a@b.com")]}, + _fail_eval, + action, + "g", + ) + + +# --------------------------------------------------------------------------- +# _input_payload_from_messages — the original-input-at-POST helper +# --------------------------------------------------------------------------- + + +class TestInputPayloadFromMessages: + def test_none_when_no_messages(self) -> None: + assert _input_payload_from_messages(None) is None + assert _input_payload_from_messages([]) is None + + def test_none_when_no_human_message(self) -> None: + assert _input_payload_from_messages([AIMessage(content="x")]) is None + + def test_json_encodes_last_human_text(self) -> None: + assert _input_payload_from_messages( + [HumanMessage(content="hi there")] + ) == json.dumps("hi there") + + +# --------------------------------------------------------------------------- +# LLM scope — context published via the wrapped model's invoke()/ainvoke() +# --------------------------------------------------------------------------- + + +class TestLLMWrapperContextPublishing: + def test_sync_invoke_publishes_llm_context_with_input_payload(self) -> None: + action = _RecordingAction() + llm = _FakeChatModel(reply="answer a@b.com") + wrapped = _wrap_llm_with_guardrail( + llm, _fail_eval, action, "g", GuardrailExecutionStage.PRE_AND_POST + ) + wrapped.invoke([HumanMessage(content="hi a@b.com")]) + assert action.seen is not None + assert action.seen.scope == GuardrailScope.LLM + assert action.seen.component == "LLM call" + # POST fires last; it carries the original input alongside the output. + assert action.seen.execution_stage == GuardrailExecutionStage.POST + assert action.seen.input_payload == json.dumps("hi a@b.com") + + @pytest.mark.asyncio + async def test_async_ainvoke_publishes_llm_context_with_input_payload(self) -> None: + action = _RecordingAction() + llm = _FakeChatModel(reply="answer a@b.com") + wrapped = _wrap_llm_with_guardrail( + llm, _fail_eval, action, "g", GuardrailExecutionStage.PRE_AND_POST + ) + await wrapped.ainvoke([HumanMessage(content="hi a@b.com")]) + assert action.seen is not None + assert action.seen.component == "LLM call" + assert action.seen.execution_stage == GuardrailExecutionStage.POST + assert action.seen.input_payload == json.dumps("hi a@b.com") + + +# --------------------------------------------------------------------------- +# Graph wrappers — StateGraph and CompiledStateGraph invoke()/ainvoke() +# --------------------------------------------------------------------------- + + +def _graph_output() -> dict[str, Any]: + return { + "messages": [ + HumanMessage(content="hi a@b.com"), + AIMessage(content="answer a@b.com"), + ] + } + + +class TestGraphWrapperContextPublishing: + # PRE_AND_POST exercises both the PRE (input) and POST (output) branches in one + # call; the recorded context is the last (POST) one. + + def test_stategraph_sync_invoke_publishes_agent_context(self) -> None: + action = _RecordingAction() + graph: Any = _FakeGraph(_graph_output()) + _wrap_stategraph_with_guardrail( + graph, _fail_eval, action, "g", GuardrailExecutionStage.PRE_AND_POST + ) + graph.invoke({"messages": [HumanMessage(content="hi a@b.com")]}) + assert action.seen is not None + assert action.seen.scope == GuardrailScope.AGENT + assert action.seen.component == "Agent" + assert action.seen.execution_stage == GuardrailExecutionStage.POST + assert action.seen.input_payload == json.dumps("hi a@b.com") + + @pytest.mark.asyncio + async def test_stategraph_async_ainvoke_publishes_agent_context(self) -> None: + action = _RecordingAction() + graph: Any = _FakeGraph(_graph_output()) + _wrap_stategraph_with_guardrail( + graph, _fail_eval, action, "g", GuardrailExecutionStage.PRE_AND_POST + ) + await graph.ainvoke({"messages": [HumanMessage(content="hi a@b.com")]}) + assert action.seen is not None + assert action.seen.component == "Agent" + assert action.seen.execution_stage == GuardrailExecutionStage.POST + + def test_compiled_graph_sync_invoke_publishes_agent_context(self) -> None: + action = _RecordingAction() + graph: Any = _FakeGraph(_graph_output()) + _wrap_compiled_graph_with_guardrail( + graph, _fail_eval, action, "g", GuardrailExecutionStage.PRE_AND_POST + ) + graph.invoke({"messages": [HumanMessage(content="hi a@b.com")]}) + assert action.seen is not None + assert action.seen.component == "Agent" + assert action.seen.execution_stage == GuardrailExecutionStage.POST + assert action.seen.input_payload == json.dumps("hi a@b.com") + + @pytest.mark.asyncio + async def test_compiled_graph_async_ainvoke_publishes_agent_context(self) -> None: + action = _RecordingAction() + graph: Any = _FakeGraph(_graph_output()) + _wrap_compiled_graph_with_guardrail( + graph, _fail_eval, action, "g", GuardrailExecutionStage.PRE_AND_POST + ) + await graph.ainvoke({"messages": [HumanMessage(content="hi a@b.com")]}) + assert action.seen is not None + assert action.seen.component == "Agent" + assert action.seen.execution_stage == GuardrailExecutionStage.POST + + +# --------------------------------------------------------------------------- +# Defensive branches — evaluator errors are swallowed; malformed input no-ops +# --------------------------------------------------------------------------- + + +class TestAdapterDefensiveBranches: + def test_tool_pre_swallows_evaluator_exception(self) -> None: + # Fresh tool instance: _wrap_tool_with_guardrail swaps __class__, and the + # module-level my_tool is shared, so wrapping a fresh tool avoids stacking. + @tool + def echo_pre(text: str) -> str: + """Echo the input text back.""" + return f"echo: {text}" + + action = _RecordingAction() + wrapped = _wrap_tool_with_guardrail( + echo_pre, _raise_eval, action, "g", GuardrailExecutionStage.PRE + ) + # Evaluator raises → logged & swallowed; tool still runs, action not called. + assert wrapped.invoke({"text": "x"}) == "echo: x" + assert action.called is False + + def test_tool_post_swallows_evaluator_exception(self) -> None: + @tool + def echo_post(text: str) -> str: + """Echo the input text back.""" + return f"echo: {text}" + + action = _RecordingAction() + wrapped = _wrap_tool_with_guardrail( + echo_post, _raise_eval, action, "g", GuardrailExecutionStage.POST + ) + assert wrapped.invoke({"text": "x"}) == "echo: x" + assert action.called is False + + @pytest.mark.parametrize( + "bad_input", + ["not a dict", {}, {"messages": "notalist"}, {"messages": []}], + ) + def test_agent_input_guardrail_noops_on_malformed_input( + self, bad_input: Any + ) -> None: + action = _RecordingAction() + _apply_agent_input_guardrail(bad_input, _fail_eval, action, "g") + assert action.called is False + + @pytest.mark.parametrize( + "bad_output", + [ + "not a dict", + {}, + {"messages": "notalist"}, + {"messages": [HumanMessage(content="only human")]}, + {"messages": [AIMessage(content="")]}, + ], + ) + def test_agent_output_guardrail_noops_on_malformed_output( + self, bad_output: Any + ) -> None: + action = _RecordingAction() + _apply_agent_output_guardrail(bad_output, _fail_eval, action, "g") + assert action.called is False + + +# --------------------------------------------------------------------------- +# Apply helpers — empty/raise no-ops and modification application +# --------------------------------------------------------------------------- + + +class TestApplyHelperBranches: + # --- LLM pre --- + def test_llm_pre_noops_without_human_message(self) -> None: + action = _RecordingAction() + _apply_llm_pre([AIMessage(content="x")], _fail_eval, action, "g") + assert action.called is False + + def test_llm_pre_noops_on_empty_text(self) -> None: + action = _RecordingAction() + _apply_llm_pre([HumanMessage(content="")], _fail_eval, action, "g") + assert action.called is False + + def test_llm_pre_swallows_evaluator_exception(self) -> None: + action = _RecordingAction() + _apply_llm_pre([HumanMessage(content="hi")], _raise_eval, action, "g") + assert action.called is False + + def test_llm_pre_applies_modification(self) -> None: + msg = HumanMessage(content="hi a@b.com") + _apply_llm_pre([msg], _fail_eval, _ModifyingAction(), "g") + assert msg.content == "MODIFIED" + + # --- LLM post --- + def test_llm_post_noops_on_nonstring_content(self) -> None: + action = _RecordingAction() + _apply_llm_post( + AIMessage(content=[{"type": "text", "text": "x"}]), + _fail_eval, + action, + "g", + ) + assert action.called is False + + def test_llm_post_swallows_evaluator_exception(self) -> None: + action = _RecordingAction() + _apply_llm_post(AIMessage(content="ans"), _raise_eval, action, "g") + assert action.called is False + + def test_llm_post_applies_modification(self) -> None: + resp = AIMessage(content="ans a@b.com") + _apply_llm_post(resp, _fail_eval, _ModifyingAction(), "g") + assert resp.content == "MODIFIED" + + # --- Agent input --- + def test_agent_input_noops_on_empty_text(self) -> None: + action = _RecordingAction() + _apply_agent_input_guardrail( + {"messages": [HumanMessage(content="")]}, _fail_eval, action, "g" + ) + assert action.called is False + + def test_agent_input_swallows_evaluator_exception(self) -> None: + action = _RecordingAction() + _apply_agent_input_guardrail( + {"messages": [HumanMessage(content="hi")]}, _raise_eval, action, "g" + ) + assert action.called is False + + def test_agent_input_applies_modification(self) -> None: + msg = HumanMessage(content="hi a@b.com") + _apply_agent_input_guardrail( + {"messages": [msg]}, _fail_eval, _ModifyingAction(), "g" + ) + assert msg.content == "MODIFIED" + + # --- Agent output --- + def test_agent_output_swallows_evaluator_exception(self) -> None: + action = _RecordingAction() + _apply_agent_output_guardrail( + {"messages": [AIMessage(content="ans")]}, _raise_eval, action, "g" + ) + assert action.called is False + + def test_agent_output_applies_modification(self) -> None: + msg = AIMessage(content="ans a@b.com") + _apply_agent_output_guardrail( + {"messages": [HumanMessage(content="hi"), msg]}, + _fail_eval, + _ModifyingAction(), + "g", + ) + assert msg.content == "MODIFIED" + + # --- multimodal text extraction (list content) --- + def test_input_payload_from_multimodal_human_message(self) -> None: + msg = HumanMessage(content=[{"type": "text", "text": "hi there"}]) + assert _input_payload_from_messages([msg]) == json.dumps("hi there") + + +# --------------------------------------------------------------------------- +# LangChainGuardrailAdapter.wrap dispatch +# --------------------------------------------------------------------------- + + +class TestAdapterWrapDispatch: + def test_recognize(self) -> None: + adapter = LangChainGuardrailAdapter() + assert adapter.recognize(my_tool) is True + assert adapter.recognize(object()) is False + + def test_wrap_unknown_target_returns_unchanged(self) -> None: + adapter = LangChainGuardrailAdapter() + sentinel = object() + assert ( + adapter.wrap( + sentinel, + _fail_eval, + _RecordingAction(), + "g", + GuardrailExecutionStage.PRE, + ) + is sentinel + ) + + def test_wrap_dispatches_stategraph(self) -> None: + from langgraph.graph import StateGraph + from typing_extensions import TypedDict + + class _S(TypedDict): + messages: list[Any] + + builder = StateGraph(_S) + adapter = LangChainGuardrailAdapter() + result = adapter.wrap( + builder, _fail_eval, _RecordingAction(), "g", GuardrailExecutionStage.PRE + ) + assert result is builder diff --git a/uv.lock b/uv.lock index a425af33d..3bacdc8ae 100644 --- a/uv.lock +++ b/uv.lock @@ -4388,7 +4388,7 @@ wheels = [ [[package]] name = "uipath-langchain" -version = "0.11.16" +version = "0.11.17" source = { editable = "." } dependencies = [ { name = "a2a-sdk" },