From 66d86caf1208e8458170464908fb010e6e814001 Mon Sep 17 00:00:00 2001 From: Evan Reynolds Date: Wed, 10 Jun 2026 15:38:22 -0700 Subject: [PATCH 1/2] Signal work --- temporalio/client/_impl.py | 73 ++- temporalio/nexus/_operation_context.py | 25 + tests/conftest.py | 2 + tests/nexus/test_signal_link_propagation.py | 475 ++++++++++++++++++ .../nexus/test_signal_link_propagation_e2e.py | 341 +++++++++++++ 5 files changed, 915 insertions(+), 1 deletion(-) create mode 100644 tests/nexus/test_signal_link_propagation.py create mode 100644 tests/nexus/test_signal_link_propagation_e2e.py diff --git a/temporalio/client/_impl.py b/temporalio/client/_impl.py index 1481c8327..a2d74e5da 100644 --- a/temporalio/client/_impl.py +++ b/temporalio/client/_impl.py @@ -200,6 +200,42 @@ async def start_workflow( start_workflow_response=resp, ) setattr(handle, "__temporal_eagerly_started", eagerly_started) + # If this start / signal-with-start is issued from inside a Nexus operation handler (but + # not as the nexus-backing workflow, whose links are handled separately by + # WorkflowRunOperationContext.start_workflow), capture the backlink the server returned so + # the caller workflow's Nexus history event links to the callee event. + nexus_ctx = self._try_nexus_start_operation_context() + if ( + nexus_ctx is not None + and not temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context() + ): + if isinstance( + resp, + temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse, + ): + # Server >= 1.31 with EnableCHASMSignalBacklinks returns signal_link pointing at + # the WorkflowExecutionSignaled event; older servers leave it unset. + if resp.HasField("signal_link"): + nexus_ctx._add_backlink(resp.signal_link) + else: + if resp.HasField("link"): + nexus_ctx._add_backlink(resp.link) + else: + # Older servers (pre-1.31) don't return a link on the start response. 
+ # Fabricate one pointing at the started workflow's WorkflowExecutionStarted + # event so the caller still gets a backlink. + nexus_ctx._add_backlink( + temporalio.api.common.v1.Link( + workflow_event=temporalio.api.common.v1.Link.WorkflowEvent( + namespace=self._client.namespace, + workflow_id=req.workflow_id, + run_id=resp.run_id, + event_ref=temporalio.api.common.v1.Link.WorkflowEvent.EventReference( + event_type=temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + ), + ) + ) + ) return handle async def _build_start_workflow_execution_request( @@ -241,6 +277,14 @@ async def _build_start_workflow_execution_request( req.on_conflict_options.attach_request_id = True req.on_conflict_options.attach_completion_callbacks = True req.on_conflict_options.attach_links = True + else: + # If this is a plain start_workflow issued from inside a Nexus operation handler + # (not the nexus-backing workflow, which already carries inbound links via + # input.links), forward the inbound Nexus task links so the started callee's + # WorkflowExecutionStarted event links back to the caller. + nexus_ctx = self._try_nexus_start_operation_context() + if nexus_ctx is not None: + req.links.extend(nexus_ctx._get_outgoing_request_links()) return req @@ -267,6 +311,13 @@ async def _build_signal_with_start_workflow_execution_request( await data_converter.encode(input.start_signal_args) ) await self._populate_start_workflow_execution_request(req, input) + # If this signal-with-start is issued from inside a Nexus operation handler (but not the + # nexus-backing workflow), forward the inbound Nexus task links so both the callee's + # WorkflowExecutionStarted and WorkflowExecutionSignaled events link back to the caller. 
+ if not temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context(): + nexus_ctx = self._try_nexus_start_operation_context() + if nexus_ctx is not None: + req.links.extend(nexus_ctx._get_outgoing_request_links()) return req async def _build_update_with_start_start_workflow_execution_request( @@ -500,9 +551,18 @@ async def signal_workflow(self, input: SignalWorkflowInput) -> None: req.input.payloads.extend(await data_converter.encode(input.args)) if input.headers is not None: # type:ignore[reportUnnecessaryComparison] await self._apply_headers(input.headers, req.header.fields) - await self._client.workflow_service.signal_workflow_execution( + # If this signal is issued from inside a Nexus operation handler, forward the inbound + # Nexus task links so the WorkflowExecutionSignaled event links back to the caller. + nexus_ctx = self._try_nexus_start_operation_context() + if nexus_ctx is not None: + req.links.extend(nexus_ctx._get_outgoing_request_links()) + resp = await self._client.workflow_service.signal_workflow_execution( req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout ) + # Server >= 1.31 with EnableCHASMSignalBacklinks returns a backlink pointing at the + # signal event; older servers leave it unset. Propagate when present. 
+ if nexus_ctx is not None and resp.HasField("link"): + nexus_ctx._add_backlink(resp.link) async def terminate_workflow(self, input: TerminateWorkflowInput) -> None: data_converter = self._client.data_converter._with_contexts( @@ -1636,6 +1696,17 @@ async def count_nexus_operations( ) ) + @staticmethod + def _try_nexus_start_operation_context() -> ( + temporalio.nexus._operation_context._TemporalStartOperationContext | None + ): + """The Nexus start-operation context if a handler is currently running, else None.""" + return ( + temporalio.nexus._operation_context._temporal_start_operation_context.get( + None + ) + ) + async def _apply_headers( self, source: Mapping[str, temporalio.api.common.v1.Payload] | None, diff --git a/temporalio/nexus/_operation_context.py b/temporalio/nexus/_operation_context.py index 0d9d11449..b323f54be 100644 --- a/temporalio/nexus/_operation_context.py +++ b/temporalio/nexus/_operation_context.py @@ -278,6 +278,31 @@ def _add_outbound_links( ) return workflow_handle + def _get_outgoing_request_links(self) -> list[temporalio.api.common.v1.Link]: + """Inbound Nexus task links to attach to RPCs the operation handler issues. + + When the operation handler signals, signal-with-starts, or starts a workflow, these + links are added to the request's ``links`` field so the callee's history event links + back to the caller workflow that scheduled this Nexus operation. + """ + return self._get_links() + + def _add_backlink(self, link: temporalio.api.common.v1.Link | None) -> None: + """Append a backlink returned by an RPC the operation handler issued. + + ``link`` is the ``common.v1.Link`` returned on a signal, signal-with-start, or start + response (or ``None`` against a server that did not return one). When present and of the + ``workflow_event`` variant, it is converted to a Nexus link and added to the operation's + outbound links so the caller workflow's Nexus history event links to the callee event. 
+ + This is only safe to call from the single thread/task that runs the operation handler. + """ + if link is None or not link.HasField("workflow_event"): + return + self.nexus_context.outbound_links.append( + workflow_event_to_nexus_link(link.workflow_event) + ) + class WorkflowRunOperationContext(StartOperationContext): """Context received by a workflow run operation.""" diff --git a/tests/conftest.py b/tests/conftest.py index 1e1db3730..c5c213e9a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -133,6 +133,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]: "--dynamic-config-value", "history.enableChasmCallbacks=true", "--dynamic-config-value", + "history.enableCHASMSignalBacklinks=true", + "--dynamic-config-value", "nexusoperation.enableStandalone=true", "--dynamic-config-value", 'system.system.refreshNexusEndpointsMinWait="0s"', diff --git a/tests/nexus/test_signal_link_propagation.py b/tests/nexus/test_signal_link_propagation.py new file mode 100644 index 000000000..dbdb17cb1 --- /dev/null +++ b/tests/nexus/test_signal_link_propagation.py @@ -0,0 +1,475 @@ +"""Unit tests for Nexus signal-backlink propagation. + +These mirror the Java SDK's RootWorkflowClientInvokerLinkPropagationTest. They exercise the +in/out link propagation that happens when a Nexus operation handler issues a signal, +signal-with-start, or start-workflow RPC, against a mocked workflow service. The corresponding +end-to-end behavior requires a real server with EnableCHASMSignalBacklinks=true and is therefore +not covered here. 
+""" + +from __future__ import annotations + +from typing import Any +from unittest import mock + +import nexusrpc +import nexusrpc.handler +import pytest +from nexusrpc.handler import ( + OperationHandler, + StartOperationContext, + StartOperationResultAsync, + service_handler, + sync_operation, +) +from nexusrpc.handler._decorators import operation_handler + +import temporalio.api.common.v1 +import temporalio.api.enums.v1 +import temporalio.api.nexus.v1 +import temporalio.api.workflowservice.v1 +import temporalio.common +import temporalio.converter +import temporalio.nexus._link_conversion +import temporalio.nexus._operation_context +from temporalio.client._impl import _ClientImpl +from temporalio.client._interceptor import ( + SignalWorkflowInput, + StartWorkflowInput, +) +from temporalio.nexus._link_conversion import _LinkType +from temporalio.worker._nexus import _NexusTaskCancellation, _NexusWorker + +NAMESPACE = "test-namespace" +WORKFLOW_ID = "wf-target" + + +def _workflow_event_link( + workflow_id: str, + run_id: str, + event_type: temporalio.api.enums.v1.EventType.ValueType, +) -> temporalio.api.common.v1.Link: + return temporalio.api.common.v1.Link( + workflow_event=temporalio.api.common.v1.Link.WorkflowEvent( + namespace=NAMESPACE, + workflow_id=workflow_id, + run_id=run_id, + event_ref=temporalio.api.common.v1.Link.WorkflowEvent.EventReference( + event_type=event_type, + ), + ) + ) + + +def _inbound_nexus_link() -> temporalio.api.common.v1.Link: + return _workflow_event_link( + "caller-wf", + "caller-run", + temporalio.api.enums.v1.EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + ) + + +@pytest.fixture +def nexus_ctx() -> Any: + """Install a Nexus start-operation context with a single inbound link. + + The inbound link is provided in nexusrpc.Link form, exactly as the worker populates it from + the inbound Nexus task. Yields the temporal context so tests can inspect outbound_links. 
+ """ + inbound = temporalio.nexus._link_conversion.workflow_event_to_nexus_link( + _inbound_nexus_link().workflow_event + ) + nexus_context = nexusrpc.handler.StartOperationContext( + service="svc", + operation="op", + headers={}, + request_id="req-id", + callback_url=None, + inbound_links=[inbound], + callback_headers={}, + task_cancellation=_NexusTaskCancellation(), + ) + ctx = temporalio.nexus._operation_context._TemporalStartOperationContext( + nexus_context=nexus_context, + client=mock.MagicMock(namespace=NAMESPACE), + info=lambda: temporalio.nexus.Info( + endpoint="endpoint", namespace=NAMESPACE, task_queue="tq" + ), + _runtime_metric_meter=mock.MagicMock(), + _worker_shutdown_event=mock.MagicMock(), + ) + token = temporalio.nexus._operation_context._temporal_start_operation_context.set( + ctx + ) + try: + yield ctx + finally: + temporalio.nexus._operation_context._temporal_start_operation_context.reset( + token + ) + + +def _make_client_impl(workflow_service: Any) -> _ClientImpl: + client = mock.MagicMock() + client.namespace = NAMESPACE + client.identity = "test-identity" + client.workflow_service = workflow_service + client.data_converter = temporalio.converter.DataConverter.default + return _ClientImpl(client) + + +def _signal_input() -> SignalWorkflowInput: + return SignalWorkflowInput( + id=WORKFLOW_ID, + run_id=None, + signal="test-signal", + args=[], + headers={}, + rpc_metadata={}, + rpc_timeout=None, + ) + + +def _start_input(start_signal: str | None = None) -> StartWorkflowInput: + return StartWorkflowInput( + workflow="TestWorkflow", + args=[], + id=WORKFLOW_ID, + task_queue="tq", + execution_timeout=None, + run_timeout=None, + task_timeout=None, + id_reuse_policy=temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, + id_conflict_policy=temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED, + retry_policy=None, + cron_schedule="", + memo=None, + search_attributes=None, + start_delay=None, + headers={}, + start_signal=start_signal, + 
start_signal_args=[], + static_summary=None, + static_details=None, + ret_type=None, + rpc_metadata={}, + rpc_timeout=None, + request_eager_start=False, + priority=temporalio.common.Priority.default, + callbacks=[], + links=[], + request_id=None, + versioning_override=None, + ) + + +def _outbound_link_urls(ctx: Any) -> list[str]: + return [link.url for link in ctx.nexus_context.outbound_links] + + +# ── signal ──────────────────────────────────────────────────────────────────────────────── + + +async def test_signal_forwards_inbound_links_and_captures_response_backlink( + nexus_ctx: Any, +) -> None: + response_link = _workflow_event_link( + WORKFLOW_ID, + "target-run", + temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, + ) + workflow_service = mock.MagicMock() + workflow_service.signal_workflow_execution = mock.AsyncMock( + return_value=temporalio.api.workflowservice.v1.SignalWorkflowExecutionResponse( + link=response_link + ) + ) + impl = _make_client_impl(workflow_service) + + await impl.signal_workflow(_signal_input()) + + # Forward: the request carries the single inbound link. + sent = workflow_service.signal_workflow_execution.call_args.args[0] + assert len(sent.links) == 1 + assert sent.links[0] == _inbound_nexus_link() + + # Backward: the response link is added to the operation's outbound links (as a Nexus link). + assert len(nexus_ctx.nexus_context.outbound_links) == 1 + assert "wf-target" in _outbound_link_urls(nexus_ctx)[0] + + +async def test_signal_against_older_server_captures_no_backlink( + nexus_ctx: Any, +) -> None: + workflow_service = mock.MagicMock() + workflow_service.signal_workflow_execution = mock.AsyncMock( + return_value=temporalio.api.workflowservice.v1.SignalWorkflowExecutionResponse() + ) + impl = _make_client_impl(workflow_service) + + await impl.signal_workflow(_signal_input()) + + # Forward direction still works regardless of server version. 
+ sent = workflow_service.signal_workflow_execution.call_args.args[0] + assert len(sent.links) == 1 + + # Backward: no backlink because the server returned no link. + assert nexus_ctx.nexus_context.outbound_links == [] + + +async def test_multiple_signals_accumulate_all_backlinks(nexus_ctx: Any) -> None: + first = _workflow_event_link( + "callee-a", + "run-a", + temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, + ) + second = _workflow_event_link( + "callee-b", + "run-b", + temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, + ) + workflow_service = mock.MagicMock() + workflow_service.signal_workflow_execution = mock.AsyncMock( + side_effect=[ + temporalio.api.workflowservice.v1.SignalWorkflowExecutionResponse( + link=first + ), + temporalio.api.workflowservice.v1.SignalWorkflowExecutionResponse( + link=second + ), + ] + ) + impl = _make_client_impl(workflow_service) + + await impl.signal_workflow(_signal_input()) + await impl.signal_workflow(_signal_input()) + + urls = _outbound_link_urls(nexus_ctx) + assert len(urls) == 2 + assert "callee-a" in urls[0] + assert "callee-b" in urls[1] + + +async def test_signal_outside_nexus_context_does_not_touch_links() -> None: + workflow_service = mock.MagicMock() + workflow_service.signal_workflow_execution = mock.AsyncMock( + return_value=temporalio.api.workflowservice.v1.SignalWorkflowExecutionResponse() + ) + impl = _make_client_impl(workflow_service) + + await impl.signal_workflow(_signal_input()) + + sent = workflow_service.signal_workflow_execution.call_args.args[0] + assert len(sent.links) == 0 + + +# ── signal-with-start ─────────────────────────────────────────────────────────────────────── + + +async def test_signal_with_start_forwards_inbound_links_and_captures_backlink( + nexus_ctx: Any, +) -> None: + response_link = _workflow_event_link( + WORKFLOW_ID, + "target-run", + temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, + ) + workflow_service = 
mock.MagicMock() + workflow_service.signal_with_start_workflow_execution = mock.AsyncMock( + return_value=temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse( + run_id="target-run", + signal_link=response_link, + ) + ) + impl = _make_client_impl(workflow_service) + + await impl.start_workflow(_start_input(start_signal="test-signal")) + + # Forward: the SignalWithStart request carries the inbound link. + sent = workflow_service.signal_with_start_workflow_execution.call_args.args[0] + assert len(sent.links) == 1 + assert sent.links[0] == _inbound_nexus_link() + + # Backward: response.signal_link is captured as an outbound Nexus link. + assert len(nexus_ctx.nexus_context.outbound_links) == 1 + assert "wf-target" in _outbound_link_urls(nexus_ctx)[0] + + +async def test_signal_with_start_against_older_server_captures_no_backlink( + nexus_ctx: Any, +) -> None: + workflow_service = mock.MagicMock() + workflow_service.signal_with_start_workflow_execution = mock.AsyncMock( + return_value=temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse( + run_id="target-run", + ) + ) + impl = _make_client_impl(workflow_service) + + await impl.start_workflow(_start_input(start_signal="test-signal")) + + sent = workflow_service.signal_with_start_workflow_execution.call_args.args[0] + assert len(sent.links) == 1 + assert nexus_ctx.nexus_context.outbound_links == [] + + +# ── start ───────────────────────────────────────────────────────────────────────────────── + + +async def test_start_uses_server_link_when_present(nexus_ctx: Any) -> None: + server_link = _workflow_event_link( + WORKFLOW_ID, + "target-run", + temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + ) + workflow_service = mock.MagicMock() + workflow_service.start_workflow_execution = mock.AsyncMock( + return_value=temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse( + run_id="target-run", + link=server_link, + ) + ) + impl = 
_make_client_impl(workflow_service) + + await impl.start_workflow(_start_input()) + + assert len(nexus_ctx.nexus_context.outbound_links) == 1 + [link] = nexus_ctx.nexus_context.outbound_links + assert link.type == _LinkType.WORKFLOW.value + assert "wf-target" in link.url + # WorkflowExecutionStarted event type round-trips through the URL. + assert "WorkflowExecutionStarted" in link.url + + +async def test_start_fabricates_link_when_server_omits_it(nexus_ctx: Any) -> None: + workflow_service = mock.MagicMock() + workflow_service.start_workflow_execution = mock.AsyncMock( + return_value=temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse( + run_id="target-run", + ) + ) + impl = _make_client_impl(workflow_service) + + await impl.start_workflow(_start_input()) + + assert len(nexus_ctx.nexus_context.outbound_links) == 1 + [link] = nexus_ctx.nexus_context.outbound_links + # Fabricated link points at the started workflow's WorkflowExecutionStarted event. + assert "wf-target" in link.url + assert "target-run" in link.url + assert "WorkflowExecutionStarted" in link.url + + +async def test_start_outside_nexus_context_does_not_touch_links() -> None: + workflow_service = mock.MagicMock() + workflow_service.start_workflow_execution = mock.AsyncMock( + return_value=temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse( + run_id="target-run", + ) + ) + impl = _make_client_impl(workflow_service) + + # Should not raise even though there is no Nexus context. + handle = await impl.start_workflow(_start_input()) + assert handle.result_run_id == "target-run" + sent = workflow_service.start_workflow_execution.call_args.args[0] + assert len(sent.links) == 0 + + +# ── handler-level: backlinks land on the StartOperationResponse ────────────────────────────── + +# A backlink that a handler stashes on ctx.outbound_links, mimicking what a signal RPC inside the +# handler would do via _add_backlink. 
+_BACKLINK = temporalio.nexus._link_conversion.workflow_event_to_nexus_link( + temporalio.api.common.v1.Link.WorkflowEvent( + namespace=NAMESPACE, + workflow_id="callee-wf", + run_id="callee-run-id", + event_ref=temporalio.api.common.v1.Link.WorkflowEvent.EventReference( + event_type=temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, + ), + ) +) + + +class _AsyncBacklinkOperation(OperationHandler): + """Stashes a backlink then returns an async result, simulating a signaling handler.""" + + async def start( + self, ctx: StartOperationContext, input: str + ) -> StartOperationResultAsync: + ctx.outbound_links.append(_BACKLINK) + return StartOperationResultAsync(token=input) + + async def cancel(self, ctx: Any, token: str) -> None: ... + + +@service_handler +class _BacklinkStashingService: + @sync_operation + async def sync_op(self, ctx: StartOperationContext, _input: str) -> str: + # Stash a backlink and return a sync result. + ctx.outbound_links.append(_BACKLINK) + return "result" + + @operation_handler + def async_op(self) -> OperationHandler[str, str]: + return _AsyncBacklinkOperation() + + +def _make_nexus_worker() -> _NexusWorker: + return _NexusWorker( + bridge_worker=lambda: mock.MagicMock(), + client=mock.MagicMock(namespace=NAMESPACE), + namespace=NAMESPACE, + task_queue="tq", + service_handlers=[_BacklinkStashingService()], + data_converter=temporalio.converter.DataConverter.default, + interceptors=[], + metric_meter=mock.MagicMock(), + executor=None, + ) + + +def _start_request( + operation: str, input: str +) -> temporalio.api.nexus.v1.StartOperationRequest: + [payload] = ( + temporalio.converter.DataConverter.default.payload_converter.to_payloads( + [input] + ) + ) + return temporalio.api.nexus.v1.StartOperationRequest( + service="_BacklinkStashingService", + operation=operation, + payload=payload, + ) + + +async def test_sync_response_includes_signal_backlinks() -> None: + worker = _make_nexus_worker() + response = await 
worker._start_operation( + _start_request("sync_op", "input"), + headers={}, + cancellation=_NexusTaskCancellation(), + request_deadline=None, + endpoint="endpoint", + ) + assert response.HasField("sync_success") + assert len(response.sync_success.links) == 1 + assert "callee-wf" in response.sync_success.links[0].url + + +async def test_async_response_includes_signal_backlinks() -> None: + worker = _make_nexus_worker() + response = await worker._start_operation( + _start_request("async_op", "op-token"), + headers={}, + cancellation=_NexusTaskCancellation(), + request_deadline=None, + endpoint="endpoint", + ) + assert response.HasField("async_success") + assert response.async_success.operation_token == "op-token" + assert len(response.async_success.links) == 1 + assert "callee-wf" in response.async_success.links[0].url diff --git a/tests/nexus/test_signal_link_propagation_e2e.py b/tests/nexus/test_signal_link_propagation_e2e.py new file mode 100644 index 000000000..039a46016 --- /dev/null +++ b/tests/nexus/test_signal_link_propagation_e2e.py @@ -0,0 +1,341 @@ +"""End-to-end (server-based) tests for Nexus signal-backlink propagation. + +These mirror the Java SDK's ``SignalOperationLinkingTest`` and the Go SDK's +``TestNexusSignalOperationLinks``. They exercise, against a real server, the bidirectional +link propagation that occurs when a Nexus operation handler signals (or signal-with-starts) a +workflow: + +- Forward: the caller's ``NexusOperationScheduled`` event is referenced by the callee's + ``WorkflowExecutionSignaled`` event (attached to the signal RPC the handler issues). +- Backward: a backlink pointing at the callee's ``WorkflowExecutionSignaled`` event lands on + the caller's ``NexusOperationCompleted`` event (sync handler) or ``NexusOperationStarted`` + event (async handler). 
+ +The backward direction is produced server-side (temporalio/temporal#9897) and is gated by +``history.enableCHASMSignalBacklinks=true`` (added to the local dev-server args in +``tests/conftest.py``). The server populates the backlink's reference via ``RequestIdReference`` +rather than ``EventReference``, so backlink assertions tolerate both oneof variants of +``common.v1.Link.WorkflowEvent.reference`` (see ``_backlink_event_type``). When run against a +server that does not emit the backlink, the backward assertions are skipped. +""" + +from __future__ import annotations + +import uuid + +import pytest +from nexusrpc import Operation, service +from nexusrpc.handler import ( + OperationHandler, + StartOperationContext, + StartOperationResultAsync, + service_handler, + sync_operation, +) +from nexusrpc.handler._decorators import operation_handler + +import temporalio.api.common.v1 +import temporalio.api.enums.v1 +import temporalio.api.history.v1 +from temporalio import nexus, workflow +from temporalio.client import Client, WorkflowHistory +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker +from tests.helpers.nexus import make_nexus_endpoint_name + +EventType = temporalio.api.enums.v1.EventType + + +# ── Service definition ────────────────────────────────────────────────────────────────────── + + +@service +class SignalingService: + # input encodes "<mode>:<callee_id>". 
+ op: Operation[str, str] + + +# ── Callee workflow ─────────────────────────────────────────────────────────────────────── + + +@workflow.defn +class SignalCalleeWorkflow: + def __init__(self) -> None: + self._received: list[str] = [] + self._expected = 1 + + @workflow.run + async def run(self, expected_signals: int) -> str: + self._expected = expected_signals + await workflow.wait_condition(lambda: len(self._received) >= self._expected) + return ",".join(self._received) + + @workflow.signal + def ping(self, msg: str) -> None: + self._received.append(msg) + + +# ── Caller workflow ─────────────────────────────────────────────────────────────────────── + + +@workflow.defn +class SignalCallerWorkflow: + @workflow.run + async def run(self, mode: str, callee_id: str, task_queue: str) -> str: + client = workflow.create_nexus_client( + service=SignalingService, + endpoint=make_nexus_endpoint_name(task_queue), + ) + return await client.execute_operation( + SignalingService.op, f"{mode}:{callee_id}" + ) + + +# ── Nexus service handler ───────────────────────────────────────────────────────────────── + +MODE_SYNC = "sync" +MODE_ASYNC = "async" + + +class _AsyncSignalingOperation(OperationHandler[str, str]): + """Signal-with-starts the callee then returns an async result. + + The backlink stashed on ``ctx.outbound_links`` by the signal-with-start RPC is carried on + the start-operation response, landing on the caller's ``NexusOperationStarted`` event. 
+ """ + + async def start( + self, ctx: StartOperationContext, input: str + ) -> StartOperationResultAsync: + _, callee_id = input.split(":", 1) + await _signal_with_start(callee_id, "async-signal") + return StartOperationResultAsync(token=f"async-op-{uuid.uuid4()}") + + async def cancel(self, ctx, token: str) -> None: # type: ignore[no-untyped-def] + raise NotImplementedError + + async def fetch_info(self, ctx, token: str): # type: ignore[no-untyped-def] + raise NotImplementedError + + async def fetch_result(self, ctx, token: str): # type: ignore[no-untyped-def] + raise NotImplementedError + + +@service_handler(service=SignalingService) +class SignalingServiceHandler: + @sync_operation + async def op(self, _ctx: StartOperationContext, input: str) -> str: + # Synchronous path: signal-with-start the callee (first signal) then plain-signal it + # (second signal). Both backlinks are carried on the sync start-operation response and + # land on the caller's NexusOperationCompleted event. + _, callee_id = input.split(":", 1) + await _signal_with_start(callee_id, "first") + await ( + nexus.client() + .get_workflow_handle(callee_id) + .signal(SignalCalleeWorkflow.ping, "second") + ) + return "ok:sync" + + +# A separate service exposing only the async operation, so the caller can address it by name. +@service +class AsyncSignalingService: + op: Operation[str, str] + + +@service_handler(service=AsyncSignalingService) +class AsyncSignalingServiceHandler: + @operation_handler + def op(self) -> OperationHandler[str, str]: + return _AsyncSignalingOperation() + + +async def _signal_with_start(callee_id: str, payload: str) -> None: + # signal-with-start exercises the SignalWithStartWorkflowExecutionResponse.signal_link + # backlink path in temporalio.client._impl. 
+ await nexus.client().start_workflow( + SignalCalleeWorkflow.run, + 2 if payload == "first" else 1, + id=callee_id, + task_queue=nexus.info().task_queue, + start_signal="ping", + start_signal_args=[payload], + ) + + +@workflow.defn +class AsyncSignalCallerWorkflow: + @workflow.run + async def run(self, callee_id: str, task_queue: str) -> str: + client = workflow.create_nexus_client( + service=AsyncSignalingService, + endpoint=make_nexus_endpoint_name(task_queue), + ) + handle = await client.start_operation( + AsyncSignalingService.op, f"{MODE_ASYNC}:{callee_id}" + ) + # Do not await the result: the async op never completes (no completion is delivered). + # Returning the token confirms the operation reached the Started state, whose history + # event carries the backlink. + return handle.operation_token or "async-started" + + +# ── Assertion helpers ─────────────────────────────────────────────────────────────────────── + + +def _events_of_type( + history: WorkflowHistory, + event_type: temporalio.api.enums.v1.EventType.ValueType, +) -> list[temporalio.api.history.v1.HistoryEvent]: + return [e for e in history.events if e.event_type == event_type] + + +def _backlink_event_type( + we: temporalio.api.common.v1.Link.WorkflowEvent, +) -> temporalio.api.enums.v1.EventType.ValueType: + # Server PR #9897 keys backlinks via RequestIdReference rather than EventReference; accept + # either oneof variant (matches Java SignalOperationLinkingTest.assertBacklink). 
+ if we.HasField("request_id_ref"): + return we.request_id_ref.event_type + return we.event_ref.event_type + + +def _assert_forward_link( + callee_history: WorkflowHistory, + caller_id: str, + expected_count: int, +) -> None: + signaled = _events_of_type( + callee_history, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED + ) + assert len(signaled) == expected_count, ( + f"expected {expected_count} WorkflowExecutionSignaled events, got {len(signaled)}" + ) + for event in signaled: + assert len(event.links) >= 1, ( + "expected at least one link on each WorkflowExecutionSignaled event" + ) + we = event.links[0].workflow_event + assert we.workflow_id == caller_id, ( + "forward link should reference the caller workflow" + ) + assert we.event_ref.event_type == EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED + + +def _assert_backlink( + event: temporalio.api.history.v1.HistoryEvent, callee_id: str +) -> bool: + """Assert the event carries a signal-event backlink to the callee. + + Returns False (and asserts nothing) if no backlink is present, so the test soft-passes + against a server that does not emit backlinks. 
+ """ + if len(event.links) < 1: + return False + we = event.links[0].workflow_event + assert we.workflow_id == callee_id, "backlink should reference the callee workflow" + assert _backlink_event_type(we) == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED + return True + + +# ── Tests ───────────────────────────────────────────────────────────────────────────────── + + +async def test_sync_signal_operation_links( + client: Client, + env: WorkflowEnvironment, +) -> None: + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work with time-skipping server") + + task_queue = str(uuid.uuid4()) + await env.create_nexus_endpoint(make_nexus_endpoint_name(task_queue), task_queue) + callee_id = f"sync-callee-{uuid.uuid4()}" + caller_id = f"sync-caller-{uuid.uuid4()}" + + async with Worker( + client, + task_queue=task_queue, + nexus_service_handlers=[SignalingServiceHandler()], + workflows=[SignalCallerWorkflow, SignalCalleeWorkflow], + ): + caller_handle = await client.start_workflow( + SignalCallerWorkflow.run, + args=[MODE_SYNC, callee_id, task_queue], + id=caller_id, + task_queue=task_queue, + ) + assert await caller_handle.result() == "ok:sync" + + callee_result = await client.get_workflow_handle(callee_id).result() + assert callee_result == "first,second" + + caller_history = await caller_handle.fetch_history() + callee_history = await client.get_workflow_handle(callee_id).fetch_history() + + # Forward: both signal events on the callee reference the caller's scheduled event. + _assert_forward_link(callee_history, caller_id, expected_count=2) + + # Backward: the single NexusOperationCompleted carries backlinks to the callee. 
+ completed = _events_of_type( + caller_history, EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED + ) + assert len(completed) == 1, ( + f"expected exactly one NexusOperationCompleted event, got {len(completed)}" + ) + if not _assert_backlink(completed[0], callee_id): + pytest.skip( + "server did not emit a signal backlink " + "(history.enableCHASMSignalBacklinks not enabled)" + ) + + +async def test_async_signal_operation_links( + client: Client, + env: WorkflowEnvironment, +) -> None: + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work with time-skipping server") + + task_queue = str(uuid.uuid4()) + await env.create_nexus_endpoint(make_nexus_endpoint_name(task_queue), task_queue) + callee_id = f"async-callee-{uuid.uuid4()}" + caller_id = f"async-caller-{uuid.uuid4()}" + + async with Worker( + client, + task_queue=task_queue, + nexus_service_handlers=[AsyncSignalingServiceHandler()], + workflows=[AsyncSignalCallerWorkflow, SignalCalleeWorkflow], + ): + caller_handle = await client.start_workflow( + AsyncSignalCallerWorkflow.run, + args=[callee_id, task_queue], + id=caller_id, + task_queue=task_queue, + ) + # Caller returns once the async operation reaches Started; result is the op token. + assert await caller_handle.result() + + callee_result = await client.get_workflow_handle(callee_id).result() + assert callee_result == "async-signal" + + caller_history = await caller_handle.fetch_history() + callee_history = await client.get_workflow_handle(callee_id).fetch_history() + + # Forward: the single signal event on the callee references the caller's scheduled event. + _assert_forward_link(callee_history, caller_id, expected_count=1) + + # Backward: the backlink lands on NexusOperationStarted for the async response path. 
+ started = _events_of_type( + caller_history, EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED + ) + assert len(started) == 1, ( + f"expected exactly one NexusOperationStarted event, got {len(started)}" + ) + if not _assert_backlink(started[0], callee_id): + pytest.skip( + "server did not emit a signal backlink " + "(history.enableCHASMSignalBacklinks not enabled)" + ) From a580faeaf9e85942a1ebc4689d3b842f99306efa Mon Sep 17 00:00:00 2001 From: Evan Reynolds Date: Thu, 11 Jun 2026 16:13:54 -0700 Subject: [PATCH 2/2] Reviewing --- temporalio/client/_impl.py | 43 ++++++------------- tests/nexus/test_signal_link_propagation.py | 35 ++++++++------- .../nexus/test_signal_link_propagation_e2e.py | 18 ++++---- 3 files changed, 42 insertions(+), 54 deletions(-) diff --git a/temporalio/client/_impl.py b/temporalio/client/_impl.py index a2d74e5da..dc0f99412 100644 --- a/temporalio/client/_impl.py +++ b/temporalio/client/_impl.py @@ -200,42 +200,25 @@ async def start_workflow( start_workflow_response=resp, ) setattr(handle, "__temporal_eagerly_started", eagerly_started) - # If this start / signal-with-start is issued from inside a Nexus operation handler (but - # not as the nexus-backing workflow, whose links are handled separately by - # WorkflowRunOperationContext.start_workflow), capture the backlink the server returned so - # the caller workflow's Nexus history event links to the callee event. + # If this signal-with-start is issued from inside a Nexus operation handler (but not as the + # nexus-backing workflow, whose links are handled separately by + # WorkflowRunOperationContext.start_workflow), capture the signal backlink the server + # returned so the caller workflow's Nexus history event links to the signaled event. A + # plain start does not capture a backlink: it only forwards the inbound links onto the + # start request. 
nexus_ctx = self._try_nexus_start_operation_context() if ( nexus_ctx is not None and not temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context() - ): - if isinstance( + and isinstance( resp, temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse, - ): - # Server >= 1.31 with EnableCHASMSignalBacklinks returns signal_link pointing at - # the WorkflowExecutionSignaled event; older servers leave it unset. - if resp.HasField("signal_link"): - nexus_ctx._add_backlink(resp.signal_link) - else: - if resp.HasField("link"): - nexus_ctx._add_backlink(resp.link) - else: - # Older servers (pre-1.31) don't return a link on the start response. - # Fabricate one pointing at the started workflow's WorkflowExecutionStarted - # event so the caller still gets a backlink. - nexus_ctx._add_backlink( - temporalio.api.common.v1.Link( - workflow_event=temporalio.api.common.v1.Link.WorkflowEvent( - namespace=self._client.namespace, - workflow_id=req.workflow_id, - run_id=resp.run_id, - event_ref=temporalio.api.common.v1.Link.WorkflowEvent.EventReference( - event_type=temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, - ), - ) - ) - ) + ) + ): + # Server >= 1.31 with EnableCHASMSignalBacklinks returns signal_link pointing at + # the WorkflowExecutionSignaled event; older servers leave it unset. 
+ if resp.HasField("signal_link"): + nexus_ctx._add_backlink(resp.signal_link) return handle async def _build_start_workflow_execution_request( diff --git a/tests/nexus/test_signal_link_propagation.py b/tests/nexus/test_signal_link_propagation.py index dbdb17cb1..0305477ea 100644 --- a/tests/nexus/test_signal_link_propagation.py +++ b/tests/nexus/test_signal_link_propagation.py @@ -37,7 +37,6 @@ SignalWorkflowInput, StartWorkflowInput, ) -from temporalio.nexus._link_conversion import _LinkType from temporalio.worker._nexus import _NexusTaskCancellation, _NexusWorker NAMESPACE = "test-namespace" @@ -316,7 +315,9 @@ async def test_signal_with_start_against_older_server_captures_no_backlink( # ── start ───────────────────────────────────────────────────────────────────────────────── -async def test_start_uses_server_link_when_present(nexus_ctx: Any) -> None: +async def test_start_forwards_inbound_links_and_captures_no_backlink( + nexus_ctx: Any, +) -> None: server_link = _workflow_event_link( WORKFLOW_ID, "target-run", @@ -333,15 +334,18 @@ async def test_start_uses_server_link_when_present(nexus_ctx: Any) -> None: await impl.start_workflow(_start_input()) - assert len(nexus_ctx.nexus_context.outbound_links) == 1 - [link] = nexus_ctx.nexus_context.outbound_links - assert link.type == _LinkType.WORKFLOW.value - assert "wf-target" in link.url - # WorkflowExecutionStarted event type round-trips through the URL. - assert "WorkflowExecutionStarted" in link.url + # Forward: the start request carries the single inbound link. + sent = workflow_service.start_workflow_execution.call_args.args[0] + assert len(sent.links) == 1 + assert sent.links[0] == _inbound_nexus_link() + + # Backward: a plain start does not capture a backlink, even when the server returns one. 
+ assert nexus_ctx.nexus_context.outbound_links == [] -async def test_start_fabricates_link_when_server_omits_it(nexus_ctx: Any) -> None: +async def test_start_against_older_server_captures_no_backlink( + nexus_ctx: Any, +) -> None: workflow_service = mock.MagicMock() workflow_service.start_workflow_execution = mock.AsyncMock( return_value=temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse( @@ -352,12 +356,13 @@ async def test_start_fabricates_link_when_server_omits_it(nexus_ctx: Any) -> Non await impl.start_workflow(_start_input()) - assert len(nexus_ctx.nexus_context.outbound_links) == 1 - [link] = nexus_ctx.nexus_context.outbound_links - # Fabricated link points at the started workflow's WorkflowExecutionStarted event. - assert "wf-target" in link.url - assert "target-run" in link.url - assert "WorkflowExecutionStarted" in link.url + # Forward direction still works regardless of server version. + sent = workflow_service.start_workflow_execution.call_args.args[0] + assert len(sent.links) == 1 + assert sent.links[0] == _inbound_nexus_link() + + # Backward: a plain start never fabricates a backlink. 
+ assert nexus_ctx.nexus_context.outbound_links == [] async def test_start_outside_nexus_context_does_not_touch_links() -> None: diff --git a/tests/nexus/test_signal_link_propagation_e2e.py b/tests/nexus/test_signal_link_propagation_e2e.py index 039a46016..3ab2cb744 100644 --- a/tests/nexus/test_signal_link_propagation_e2e.py +++ b/tests/nexus/test_signal_link_propagation_e2e.py @@ -59,7 +59,7 @@ class SignalingService: @workflow.defn -class SignalCalleeWorkflow: +class CalleeWorkflow: def __init__(self) -> None: self._received: list[str] = [] self._expected = 1 @@ -79,7 +79,7 @@ def ping(self, msg: str) -> None: @workflow.defn -class SignalCallerWorkflow: +class CallerWorkflow: @workflow.run async def run(self, mode: str, callee_id: str, task_queue: str) -> str: client = workflow.create_nexus_client( @@ -133,7 +133,7 @@ async def op(self, _ctx: StartOperationContext, input: str) -> str: await ( nexus.client() .get_workflow_handle(callee_id) - .signal(SignalCalleeWorkflow.ping, "second") + .signal(CalleeWorkflow.ping, "second") ) return "ok:sync" @@ -155,7 +155,7 @@ async def _signal_with_start(callee_id: str, payload: str) -> None: # signal-with-start exercises the SignalWithStartWorkflowExecutionResponse.signal_link # backlink path in temporalio.client._impl. 
await nexus.client().start_workflow( - SignalCalleeWorkflow.run, + CalleeWorkflow.run, 2 if payload == "first" else 1, id=callee_id, task_queue=nexus.info().task_queue, @@ -251,17 +251,17 @@ async def test_sync_signal_operation_links( task_queue = str(uuid.uuid4()) await env.create_nexus_endpoint(make_nexus_endpoint_name(task_queue), task_queue) - callee_id = f"sync-callee-{uuid.uuid4()}" - caller_id = f"sync-caller-{uuid.uuid4()}" + callee_id = f"callee-{uuid.uuid4()}" + caller_id = f"caller-{uuid.uuid4()}" async with Worker( client, task_queue=task_queue, nexus_service_handlers=[SignalingServiceHandler()], - workflows=[SignalCallerWorkflow, SignalCalleeWorkflow], + workflows=[CallerWorkflow, CalleeWorkflow], ): caller_handle = await client.start_workflow( - SignalCallerWorkflow.run, + CallerWorkflow.run, args=[MODE_SYNC, callee_id, task_queue], id=caller_id, task_queue=task_queue, @@ -307,7 +307,7 @@ async def test_async_signal_operation_links( client, task_queue=task_queue, nexus_service_handlers=[AsyncSignalingServiceHandler()], - workflows=[AsyncSignalCallerWorkflow, SignalCalleeWorkflow], + workflows=[AsyncSignalCallerWorkflow, CalleeWorkflow], ): caller_handle = await client.start_workflow( AsyncSignalCallerWorkflow.run,