Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 54 additions & 22 deletions temporalio/lib/temporalio/contrib/open_telemetry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def init(outbound)

# @!visibility private
def execute(input)
@root._attach_context(Temporalio::Workflow.info.headers)
_attach_context(Temporalio::Workflow.info.headers)
Workflow.with_completed_span("RunWorkflow:#{Temporalio::Workflow.info.workflow_type}", kind: :server) do
super
ensure
Expand All @@ -245,7 +245,7 @@ def execute(input)

# @!visibility private
def handle_signal(input)
@root._attach_context(Temporalio::Workflow.info.headers)
_attach_context(Temporalio::Workflow.info.headers)
Workflow.with_completed_span(
"HandleSignal:#{input.signal}",
links: _links_from_headers(input.headers),
Expand All @@ -260,7 +260,7 @@ def handle_signal(input)

# @!visibility private
def handle_query(input)
@root._attach_context(Temporalio::Workflow.info.headers)
_attach_context(Temporalio::Workflow.info.headers)
Workflow.with_completed_span(
"HandleQuery:#{input.query}",
links: _links_from_headers(input.headers),
Expand All @@ -281,7 +281,7 @@ def handle_query(input)

# @!visibility private
def validate_update(input)
@root._attach_context(Temporalio::Workflow.info.headers)
_attach_context(Temporalio::Workflow.info.headers)
Workflow.with_completed_span(
"ValidateUpdate:#{input.update}",
attributes: { 'temporalUpdateID' => input.id },
Expand All @@ -304,7 +304,7 @@ def validate_update(input)

# @!visibility private
def handle_update(input)
@root._attach_context(Temporalio::Workflow.info.headers)
_attach_context(Temporalio::Workflow.info.headers)
Workflow.with_completed_span(
"HandleUpdate:#{input.update}",
attributes: { 'temporalUpdateID' => input.id },
Expand All @@ -323,14 +323,30 @@ def handle_update(input)
end
end

# @!visibility private
def _attach_context(headers)
# We have to disable the durable scheduler _even_ for something simple like attach context. For most OTel
# implementations, such a procedure is completely deterministic, but unfortunately some implementations like
# DataDog monkey patch OpenTelemetry (see
# https://github.com/DataDog/dd-trace-rb/blob/f88393d0571806b9980bb2cf5066eba60cfea177/lib/datadog/opentelemetry/api/context.rb#L184)
# to make even OpenTelemetry::Context.current non-deterministic because it uses mutexes. And a simple text
# map propagation extraction accesses Context.current.
Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
@root._attach_context(headers)
end
end

# @!visibility private
def _links_from_headers(headers)
context = @root._context_from_headers(headers)
span = ::OpenTelemetry::Trace.current_span(context) if context
if span && span != ::OpenTelemetry::Trace::Span::INVALID
[::OpenTelemetry::Trace::Link.new(span.context)]
else
[]
# See _attach_context above for why we have to disable scheduler even for these simple operations
Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
context = @root._context_from_headers(headers)
span = ::OpenTelemetry::Trace.current_span(context) if context
if span && span != ::OpenTelemetry::Trace::Span::INVALID
[::OpenTelemetry::Trace::Link.new(span.context)]
else
[]
end
end
end
end
Expand Down Expand Up @@ -359,7 +375,9 @@ def execute_local_activity(input)
# @!visibility private
def initialize_continue_as_new_error(input)
# Just apply the current context to headers
@root._apply_context_to_headers(input.error.headers)
Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
@root._apply_context_to_headers(input.error.headers)
end
super
end

Expand All @@ -386,7 +404,11 @@ def start_child_workflow(input)

# @!visibility private
def _apply_span_to_headers(headers, span)
@root._apply_context_to_headers(headers, context: ::OpenTelemetry::Trace.context_with_span(span)) if span
# See WorkflowInbound#_attach_context comments for why we have to disable scheduler even for these simple
# operations
Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
@root._apply_context_to_headers(headers, context: ::OpenTelemetry::Trace.context_with_span(span)) if span
end
end
end

Expand Down Expand Up @@ -419,9 +441,19 @@ def self.with_completed_span(
)
span = completed_span(name, attributes:, links:, kind:, exception:, even_during_replay:)
if span
::OpenTelemetry::Trace.with_span(span) do # rubocop:disable Style/ExplicitBlockArgument
# We cannot use ::OpenTelemetry::Trace.with_span here unfortunately. We need to disable the durable
# scheduler for just the span attach/detach but leave it enabled for the user code (see
# WorkflowInbound#_attach_current for why we have to disable scheduler even for these simple operations).
token = Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
::OpenTelemetry::Context.attach(::OpenTelemetry::Trace.context_with_span(span))
end
begin
# Yield with no parameters
yield
ensure
Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
::OpenTelemetry::Context.detach(token)
end
end
else
yield
Expand Down Expand Up @@ -455,22 +487,22 @@ def self.completed_span(
# Do nothing if replaying and not wanted during replay
return nil if !even_during_replay && Temporalio::Workflow::Unsafe.replaying?

# If there is no span on the context and the user hasn't opted in to always creating, do not create. This
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Temporalio::Workflow::Unsafe.durable_scheduler_disabled hefty enough that we want to avoid calling it twice or was the block move just cleaner code?

Copy link
Member Author

@cretz cretz Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not too bad I don't think, it just unsets the fiber scheduler temporarily and turns off illegal call validation (just a bool setting).

I put in some thought here, but decided I'd rather keep the block-based approach as the only way to disable scheduler currently (instead of also supporting a split disable and re-enable), so I figured splitting the OTel part was better.

(note, my comment refers to "We cannot use ::OpenTelemetry::Trace.with_span here unfortunately", this line of code linked is just code we moved to inside the scheduler disabling)

# prevents orphans if there was no span originally created from the client start-workflow call.
if ::OpenTelemetry::Trace.current_span == ::OpenTelemetry::Trace::Span::INVALID &&
!root._always_create_workflow_spans
return nil
end

# Create attributes, adding user-defined ones
attributes = { 'temporalWorkflowID' => Temporalio::Workflow.info.workflow_id,
'temporalRunID' => Temporalio::Workflow.info.run_id }.merge(attributes)

time = Temporalio::Workflow.now.dup
# Disable durable scheduler because 1) synchronous/non-batch span processors in OTel use network (though could
# have just used Unafe.io_enabled for this if not for the next point) and 2) OTel uses Ruby Timeout which we
# have just used Unsafe.io_enabled for this if not for the next point) and 2) OTel uses Ruby Timeout which we
# don't want to use durable timers.
Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
# If there is no span on the context and the user hasn't opted in to always creating, do not create. This
# prevents orphans if there was no span originally created from the client start-workflow call.
if ::OpenTelemetry::Trace.current_span == ::OpenTelemetry::Trace::Span::INVALID &&
!root._always_create_workflow_spans
return nil
end

span = root.tracer.start_span(name, attributes:, links:, start_timestamp: time, kind:) # steep:ignore
# Record exception and set status if present
if exception
Expand Down
1 change: 1 addition & 0 deletions temporalio/sig/temporalio/contrib/open_telemetry.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ module Temporalio
class WorkflowInbound < Worker::Interceptor::Workflow::Inbound
def initialize: (TracingInterceptor root, Worker::Interceptor::Workflow::Inbound next_interceptor) -> void

def _attach_context: (Hash[String, untyped] headers) -> void
def _links_from_headers: (Hash[String, untyped] headers) -> Array[untyped]
end

Expand Down
28 changes: 28 additions & 0 deletions temporalio/test/contrib/open_telemetry_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,34 @@ def test_benign_activity_exception
assert_equal exp_root.to_s_indented, act_root.to_s_indented
end

module ContextCurrentPatch
def current(*args, **kwargs, &)
Mutex.new if ContextCurrentPatch.do_illegal_thing
super
end

class << self
attr_accessor :do_illegal_thing
end
end

::OpenTelemetry::Context.singleton_class.prepend(ContextCurrentPatch) # rubocop:disable Layout/ClassStructure

def test_illegal_calls_on_context
# Some libraries (DataDog) sadly patch OTel's Context.current, so we need to make anything referencing OTel
# disable durable scheduler. This test used to fail before we adjusted the interceptor.

# Make the "current" call for a context do something bad
ContextCurrentPatch.do_illegal_thing = true

act_root = trace_workflow(:complete, &:result)
assert_equal 'StartWorkflow:TestWorkflow', act_root.children.first.name
assert_equal 'RunWorkflow:TestWorkflow', act_root.children.first.children.first.name
assert_equal 'CompleteWorkflow:TestWorkflow', act_root.children.first.children.first.children.first.name
ensure
ContextCurrentPatch.do_illegal_thing = false
end

ExpectedSpan = Data.define( # rubocop:disable Layout/ClassStructure
:name,
:children,
Expand Down
Loading