Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions temporalio/contrib/opentelemetry/_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

from __future__ import annotations

import contextvars
import dataclasses
import logging
from collections.abc import Callable, Iterator, Mapping, Sequence
from contextlib import contextmanager
from dataclasses import dataclass
Expand Down Expand Up @@ -58,6 +60,33 @@
_ContextT = TypeVar("_ContextT", bound=nexusrpc.handler.OperationContext)


_otel_context_logger = logging.getLogger("opentelemetry.context")


class _SuppressDetachFailureFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
return not record.getMessage().startswith("Failed to detach context")


def _safe_detach(token: contextvars.Token[Context]) -> None:
"""Detach an OTel context token, suppressing OTel's spurious failure log.

A detach can run inside a different ``contextvars.Context`` than the one the
token was created on -- the workflow event loop runs portions of the workflow
inside ``contextvars.copy_context().run(...)``. A copy preserves the OTel
context value but invalidates the token, so OTel's ``detach`` logs "Failed to
detach context". We still call ``opentelemetry.context.detach`` (rather than
e.g. restoring via ``attach``) so attach/detach calls stay balanced -- a leak
invariant the interceptor tests enforce -- and only drop that one log record.
"""
detach_filter = _SuppressDetachFailureFilter()
_otel_context_logger.addFilter(detach_filter)
try:
opentelemetry.context.detach(token)
finally:
_otel_context_logger.removeFilter(detach_filter)


class TracingInterceptor(temporalio.client.Interceptor, temporalio.worker.Interceptor):
"""Interceptor that supports client and worker OpenTelemetry span creation
and propagation.
Expand Down Expand Up @@ -568,7 +597,7 @@ async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any:
# on. As such we do a best effort detach to avoid using a mismatched
# token.
if context is opentelemetry.context.get_current():
opentelemetry.context.detach(token)
_safe_detach(token)

def handle_update_validator(
self, input: temporalio.worker.HandleUpdateInput
Expand Down Expand Up @@ -663,7 +692,7 @@ def _top_level_workflow_context(
# on. As such we do a best effort detach to avoid using a mismatched
# token.
if context is opentelemetry.context.get_current():
opentelemetry.context.detach(token)
_safe_detach(token)

def _context_to_headers(
self, headers: Mapping[str, temporalio.api.common.v1.Payload]
Expand Down
Loading