diff --git a/temporalio/contrib/opentelemetry/_interceptor.py b/temporalio/contrib/opentelemetry/_interceptor.py index eb22f8be6..433f81452 100644 --- a/temporalio/contrib/opentelemetry/_interceptor.py +++ b/temporalio/contrib/opentelemetry/_interceptor.py @@ -2,7 +2,9 @@ from __future__ import annotations +import contextvars import dataclasses +import logging from collections.abc import Callable, Iterator, Mapping, Sequence from contextlib import contextmanager from dataclasses import dataclass @@ -58,6 +60,33 @@ _ContextT = TypeVar("_ContextT", bound=nexusrpc.handler.OperationContext) +_otel_context_logger = logging.getLogger("opentelemetry.context") + + +class _SuppressDetachFailureFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + return not record.getMessage().startswith("Failed to detach context") + + +def _safe_detach(token: contextvars.Token[Context]) -> None: + """Detach an OTel context token, suppressing OTel's spurious failure log. + + A detach can run inside a different ``contextvars.Context`` than the one the + token was created on -- the workflow event loop runs portions of the workflow + inside ``contextvars.copy_context().run(...)``. A copy preserves the OTel + context value but invalidates the token, so OTel's ``detach`` logs "Failed to + detach context". We still call ``opentelemetry.context.detach`` (rather than + e.g. restoring via ``attach``) so attach/detach calls stay balanced -- a leak + invariant the interceptor tests enforce -- and only drop that one log record. + """ + detach_filter = _SuppressDetachFailureFilter() + _otel_context_logger.addFilter(detach_filter) + try: + opentelemetry.context.detach(token) + finally: + _otel_context_logger.removeFilter(detach_filter) + + class TracingInterceptor(temporalio.client.Interceptor, temporalio.worker.Interceptor): """Interceptor that supports client and worker OpenTelemetry span creation and propagation. @@ -568,7 +597,7 @@ async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any: # on. As such we do a best effort detach to avoid using a mismatched # token. if context is opentelemetry.context.get_current(): - opentelemetry.context.detach(token) + _safe_detach(token) def handle_update_validator( self, input: temporalio.worker.HandleUpdateInput @@ -663,7 +692,7 @@ def _top_level_workflow_context( # on. As such we do a best effort detach to avoid using a mismatched # token. if context is opentelemetry.context.get_current(): - opentelemetry.context.detach(token) + _safe_detach(token) def _context_to_headers( self, headers: Mapping[str, temporalio.api.common.v1.Payload]