Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion temporalio/client/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,25 @@ async def start_workflow(
start_workflow_response=resp,
)
setattr(handle, "__temporal_eagerly_started", eagerly_started)
# If this signal-with-start is issued from inside a Nexus operation handler (but not as the
# nexus-backing workflow, whose links are handled separately by
# WorkflowRunOperationContext.start_workflow), capture the signal backlink the server
# returned so the caller workflow's Nexus history event links to the signaled event. A
# plain start does not capture a backlink: it only forwards the inbound links onto the
# start request.
nexus_ctx = self._try_nexus_start_operation_context()
if (
nexus_ctx is not None
and not temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context()
and isinstance(
resp,
temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse,
)
):
# Server >= 1.31 with EnableCHASMSignalBacklinks returns signal_link pointing at
# the WorkflowExecutionSignaled event; older servers leave it unset.
if resp.HasField("signal_link"):
nexus_ctx._add_backlink(resp.signal_link)
return handle

async def _build_start_workflow_execution_request(
Expand Down Expand Up @@ -241,6 +260,14 @@ async def _build_start_workflow_execution_request(
req.on_conflict_options.attach_request_id = True
req.on_conflict_options.attach_completion_callbacks = True
req.on_conflict_options.attach_links = True
else:
# If this is a plain start_workflow issued from inside a Nexus operation handler
# (not the nexus-backing workflow, which already carries inbound links via
# input.links), forward the inbound Nexus task links so the started callee's
# WorkflowExecutionStarted event links back to the caller.
nexus_ctx = self._try_nexus_start_operation_context()
if nexus_ctx is not None:
req.links.extend(nexus_ctx._get_outgoing_request_links())

return req

Expand All @@ -267,6 +294,13 @@ async def _build_signal_with_start_workflow_execution_request(
await data_converter.encode(input.start_signal_args)
)
await self._populate_start_workflow_execution_request(req, input)
# If this signal-with-start is issued from inside a Nexus operation handler (but not the
# nexus-backing workflow), forward the inbound Nexus task links so both the callee's
# WorkflowExecutionStarted and WorkflowExecutionSignaled events link back to the caller.
if not temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context():
nexus_ctx = self._try_nexus_start_operation_context()
if nexus_ctx is not None:
req.links.extend(nexus_ctx._get_outgoing_request_links())
return req

async def _build_update_with_start_start_workflow_execution_request(
Expand Down Expand Up @@ -500,9 +534,18 @@ async def signal_workflow(self, input: SignalWorkflowInput) -> None:
req.input.payloads.extend(await data_converter.encode(input.args))
if input.headers is not None: # type:ignore[reportUnnecessaryComparison]
await self._apply_headers(input.headers, req.header.fields)
await self._client.workflow_service.signal_workflow_execution(
# If this signal is issued from inside a Nexus operation handler, forward the inbound
# Nexus task links so the WorkflowExecutionSignaled event links back to the caller.
nexus_ctx = self._try_nexus_start_operation_context()
if nexus_ctx is not None:
req.links.extend(nexus_ctx._get_outgoing_request_links())
resp = await self._client.workflow_service.signal_workflow_execution(
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout
)
# Server >= 1.31 with EnableCHASMSignalBacklinks returns a backlink pointing at the
# signal event; older servers leave it unset. Propagate when present.
if nexus_ctx is not None and resp.HasField("link"):
nexus_ctx._add_backlink(resp.link)

async def terminate_workflow(self, input: TerminateWorkflowInput) -> None:
data_converter = self._client.data_converter._with_contexts(
Expand Down Expand Up @@ -1636,6 +1679,17 @@ async def count_nexus_operations(
)
)

@staticmethod
def _try_nexus_start_operation_context() -> (
temporalio.nexus._operation_context._TemporalStartOperationContext | None
):
"""The Nexus start-operation context if a handler is currently running, else None."""
return (
temporalio.nexus._operation_context._temporal_start_operation_context.get(
None
)
)

async def _apply_headers(
self,
source: Mapping[str, temporalio.api.common.v1.Payload] | None,
Expand Down
25 changes: 25 additions & 0 deletions temporalio/nexus/_operation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,31 @@ def _add_outbound_links(
)
return workflow_handle

def _get_outgoing_request_links(self) -> list[temporalio.api.common.v1.Link]:
"""Inbound Nexus task links to attach to RPCs the operation handler issues.

When the operation handler signals, signal-with-starts, or starts a workflow, these
links are added to the request's ``links`` field so the callee's history event links
back to the caller workflow that scheduled this Nexus operation.
"""
return self._get_links()

def _add_backlink(self, link: temporalio.api.common.v1.Link | None) -> None:
"""Append a backlink returned by an RPC the operation handler issued.

``link`` is the ``common.v1.Link`` returned on a signal, signal-with-start, or start
response (or ``None`` against a server that did not return one). When present and of the
``workflow_event`` variant, it is converted to a Nexus link and added to the operation's
outbound links so the caller workflow's Nexus history event links to the callee event.

This is only safe to call from the single thread/task that runs the operation handler.
"""
if link is None or not link.HasField("workflow_event"):
return
self.nexus_context.outbound_links.append(
workflow_event_to_nexus_link(link.workflow_event)
)


class WorkflowRunOperationContext(StartOperationContext):
"""Context received by a workflow run operation."""
Expand Down
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
"--dynamic-config-value",
"history.enableChasmCallbacks=true",
"--dynamic-config-value",
"history.enableCHASMSignalBacklinks=true",
"--dynamic-config-value",
"nexusoperation.enableStandalone=true",
"--dynamic-config-value",
'system.system.refreshNexusEndpointsMinWait="0s"',
Expand Down
Loading
Loading