diff --git a/temporalio/lib/temporalio/contrib/open_telemetry.rb b/temporalio/lib/temporalio/contrib/open_telemetry.rb index 3a3b9dda..77b03439 100644 --- a/temporalio/lib/temporalio/contrib/open_telemetry.rb +++ b/temporalio/lib/temporalio/contrib/open_telemetry.rb @@ -231,7 +231,7 @@ def init(outbound) # @!visibility private def execute(input) - @root._attach_context(Temporalio::Workflow.info.headers) + _attach_context(Temporalio::Workflow.info.headers) Workflow.with_completed_span("RunWorkflow:#{Temporalio::Workflow.info.workflow_type}", kind: :server) do super ensure @@ -245,7 +245,7 @@ def execute(input) # @!visibility private def handle_signal(input) - @root._attach_context(Temporalio::Workflow.info.headers) + _attach_context(Temporalio::Workflow.info.headers) Workflow.with_completed_span( "HandleSignal:#{input.signal}", links: _links_from_headers(input.headers), @@ -260,7 +260,7 @@ def handle_signal(input) # @!visibility private def handle_query(input) - @root._attach_context(Temporalio::Workflow.info.headers) + _attach_context(Temporalio::Workflow.info.headers) Workflow.with_completed_span( "HandleQuery:#{input.query}", links: _links_from_headers(input.headers), @@ -281,7 +281,7 @@ def handle_query(input) # @!visibility private def validate_update(input) - @root._attach_context(Temporalio::Workflow.info.headers) + _attach_context(Temporalio::Workflow.info.headers) Workflow.with_completed_span( "ValidateUpdate:#{input.update}", attributes: { 'temporalUpdateID' => input.id }, @@ -304,7 +304,7 @@ def validate_update(input) # @!visibility private def handle_update(input) - @root._attach_context(Temporalio::Workflow.info.headers) + _attach_context(Temporalio::Workflow.info.headers) Workflow.with_completed_span( "HandleUpdate:#{input.update}", attributes: { 'temporalUpdateID' => input.id }, @@ -323,14 +323,30 @@ def handle_update(input) end end + # @!visibility private + def _attach_context(headers) + # We have to disable the durable scheduler _even_ for 
something simple like attach context. For most OTel + # implementations, such a procedure is completely deterministic, but unfortunately some implementations like + # DataDog monkey patch OpenTelemetry (see + # https://github.com/DataDog/dd-trace-rb/blob/f88393d0571806b9980bb2cf5066eba60cfea177/lib/datadog/opentelemetry/api/context.rb#L184) + # to make even OpenTelemetry::Context.current non-deterministic because it uses mutexes. And a simple text + # map propagation extraction accesses Context.current. + Temporalio::Workflow::Unsafe.durable_scheduler_disabled do + @root._attach_context(headers) + end + end + # @!visibility private def _links_from_headers(headers) - context = @root._context_from_headers(headers) - span = ::OpenTelemetry::Trace.current_span(context) if context - if span && span != ::OpenTelemetry::Trace::Span::INVALID - [::OpenTelemetry::Trace::Link.new(span.context)] - else - [] + # See _attach_context above for why we have to disable scheduler even for these simple operations + Temporalio::Workflow::Unsafe.durable_scheduler_disabled do + context = @root._context_from_headers(headers) + span = ::OpenTelemetry::Trace.current_span(context) if context + if span && span != ::OpenTelemetry::Trace::Span::INVALID + [::OpenTelemetry::Trace::Link.new(span.context)] + else + [] + end end end end @@ -359,7 +375,9 @@ def execute_local_activity(input) # @!visibility private def initialize_continue_as_new_error(input) # Just apply the current context to headers - @root._apply_context_to_headers(input.error.headers) + Temporalio::Workflow::Unsafe.durable_scheduler_disabled do + @root._apply_context_to_headers(input.error.headers) + end super end @@ -386,7 +404,11 @@ def start_child_workflow(input) # @!visibility private def _apply_span_to_headers(headers, span) - @root._apply_context_to_headers(headers, context: ::OpenTelemetry::Trace.context_with_span(span)) if span + # See WorkflowInbound#_attach_context comments for why we have to disable scheduler even for 
these simple + # operations + Temporalio::Workflow::Unsafe.durable_scheduler_disabled do + @root._apply_context_to_headers(headers, context: ::OpenTelemetry::Trace.context_with_span(span)) if span + end end end @@ -419,9 +441,19 @@ def self.with_completed_span( ) span = completed_span(name, attributes:, links:, kind:, exception:, even_during_replay:) if span - ::OpenTelemetry::Trace.with_span(span) do # rubocop:disable Style/ExplicitBlockArgument + # We cannot use ::OpenTelemetry::Trace.with_span here unfortunately. We need to disable the durable + # scheduler for just the span attach/detach but leave it enabled for the user code (see + # WorkflowInbound#_attach_context for why we have to disable scheduler even for these simple operations). + token = Temporalio::Workflow::Unsafe.durable_scheduler_disabled do + ::OpenTelemetry::Context.attach(::OpenTelemetry::Trace.context_with_span(span)) + end + begin # Yield with no parameters yield + ensure + Temporalio::Workflow::Unsafe.durable_scheduler_disabled do + ::OpenTelemetry::Context.detach(token) + end end else yield @@ -455,22 +487,22 @@ def self.completed_span( # Do nothing if replaying and not wanted during replay return nil if !even_during_replay && Temporalio::Workflow::Unsafe.replaying? - # If there is no span on the context and the user hasn't opted in to always creating, do not create. This - # prevents orphans if there was no span originally created from the client start-workflow call. 
- if ::OpenTelemetry::Trace.current_span == ::OpenTelemetry::Trace::Span::INVALID && - !root._always_create_workflow_spans - return nil - end - # Create attributes, adding user-defined ones attributes = { 'temporalWorkflowID' => Temporalio::Workflow.info.workflow_id, 'temporalRunID' => Temporalio::Workflow.info.run_id }.merge(attributes) time = Temporalio::Workflow.now.dup # Disable durable scheduler because 1) synchronous/non-batch span processors in OTel use network (though could - # have just used Unafe.io_enabled for this if not for the next point) and 2) OTel uses Ruby Timeout which we + # have just used Unsafe.io_enabled for this if not for the next point) and 2) OTel uses Ruby Timeout which we # don't want to use durable timers. Temporalio::Workflow::Unsafe.durable_scheduler_disabled do + # If there is no span on the context and the user hasn't opted in to always creating, do not create. This + # prevents orphans if there was no span originally created from the client start-workflow call. 
+ if ::OpenTelemetry::Trace.current_span == ::OpenTelemetry::Trace::Span::INVALID && + !root._always_create_workflow_spans + return nil + end + span = root.tracer.start_span(name, attributes:, links:, start_timestamp: time, kind:) # steep:ignore # Record exception and set status if present if exception diff --git a/temporalio/sig/temporalio/contrib/open_telemetry.rbs b/temporalio/sig/temporalio/contrib/open_telemetry.rbs index de2158cf..84376bff 100644 --- a/temporalio/sig/temporalio/contrib/open_telemetry.rbs +++ b/temporalio/sig/temporalio/contrib/open_telemetry.rbs @@ -29,6 +29,7 @@ module Temporalio class WorkflowInbound < Worker::Interceptor::Workflow::Inbound def initialize: (TracingInterceptor root, Worker::Interceptor::Workflow::Inbound next_interceptor) -> void + def _attach_context: (Hash[String, untyped] headers) -> void def _links_from_headers: (Hash[String, untyped] headers) -> Array[untyped] end diff --git a/temporalio/test/contrib/open_telemetry_test.rb b/temporalio/test/contrib/open_telemetry_test.rb index e0a22990..7d3fb58d 100644 --- a/temporalio/test/contrib/open_telemetry_test.rb +++ b/temporalio/test/contrib/open_telemetry_test.rb @@ -580,6 +580,34 @@ def test_benign_activity_exception assert_equal exp_root.to_s_indented, act_root.to_s_indented end + module ContextCurrentPatch + def current(*args, **kwargs, &) + Mutex.new if ContextCurrentPatch.do_illegal_thing + super + end + + class << self + attr_accessor :do_illegal_thing + end + end + + ::OpenTelemetry::Context.singleton_class.prepend(ContextCurrentPatch) # rubocop:disable Layout/ClassStructure + + def test_illegal_calls_on_context + # Some libraries (DataDog) sadly patch OTel's Context.current, so we need to make anything referencing OTel + # disable durable scheduler. This test used to fail before we adjusted the interceptor. 
+ + # Make the "current" call for a context do something bad + ContextCurrentPatch.do_illegal_thing = true + + act_root = trace_workflow(:complete, &:result) + assert_equal 'StartWorkflow:TestWorkflow', act_root.children.first.name + assert_equal 'RunWorkflow:TestWorkflow', act_root.children.first.children.first.name + assert_equal 'CompleteWorkflow:TestWorkflow', act_root.children.first.children.first.children.first.name + ensure + ContextCurrentPatch.do_illegal_thing = false + end + ExpectedSpan = Data.define( # rubocop:disable Layout/ClassStructure :name, :children,