Skip to content

Commit 696fd88

Browse files
committed
Review feedback
1 parent efcffca commit 696fd88

File tree

4 files changed

+87
-64
lines changed

4 files changed

+87
-64
lines changed

temporalio/contrib/aws/lambda_worker/_configure.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@
33
from __future__ import annotations
44

55
import asyncio
6-
import sys
6+
import logging
77
from collections.abc import Awaitable, Callable
88
from dataclasses import dataclass, field
99
from datetime import timedelta
1010

1111
from temporalio.client import ClientConnectConfig
1212
from temporalio.worker import WorkerConfig
1313

14+
logger = logging.getLogger(__name__)
15+
1416

1517
@dataclass
1618
class LambdaWorkerConfig:
@@ -67,7 +69,4 @@ async def _run_shutdown_hooks( # type:ignore[reportUnusedFunction]
6769
if asyncio.iscoroutine(result):
6870
await result
6971
except Exception as e:
70-
print(
71-
f"lambda_worker: shutdown hook error: {e}",
72-
file=sys.stderr,
73-
)
72+
logger.error(f"shutdown hook error: {e}")

temporalio/contrib/aws/lambda_worker/_run_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def configure(config: LambdaWorkerConfig) -> None:
115115
try:
116116
return _run_worker_internal(version, configure, deps)
117117
except Exception as e:
118-
print(f"lambda_worker: fatal: {e}", file=sys.stderr)
118+
logger.error(f"fatal error running lambda worker: {e}")
119119
sys.exit(1)
120120

121121

temporalio/contrib/aws/lambda_worker/otel.py

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""OpenTelemetry helpers for Temporal workers running inside AWS Lambda.
22
33
Use :py:func:`apply_defaults` inside a :py:func:`run_worker` configure callback for a
4-
batteries-included setup that creates an OTel collector exporter and tracing interceptor, suitable
4+
batteries-included setup that creates an OTel collector exporter and tracing plugin, suitable
55
for use with the AWS Distro for OpenTelemetry (ADOT) Lambda layer.
66
77
Use :py:func:`apply_tracing` or :py:func:`build_metrics_telemetry_config` individually if you only
@@ -10,18 +10,22 @@
1010

1111
from __future__ import annotations
1212

13+
import logging
1314
import os
1415
from dataclasses import dataclass, field
1516
from datetime import timedelta
1617

1718
from opentelemetry.sdk.resources import Resource
18-
from opentelemetry.sdk.trace import TracerProvider
1919
from opentelemetry.sdk.trace.export import BatchSpanProcessor
2020
from opentelemetry.semconv.attributes.service_attributes import SERVICE_NAME
21+
from opentelemetry.trace import get_tracer_provider, set_tracer_provider
2122

2223
from temporalio.contrib.aws.lambda_worker._configure import LambdaWorkerConfig
24+
from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, create_tracer_provider
2325
from temporalio.runtime import OpenTelemetryConfig, Runtime, TelemetryConfig
2426

27+
logger = logging.getLogger(__name__)
28+
2529

2630
@dataclass
2731
class OtelOptions:
@@ -73,14 +77,20 @@ def apply_defaults(
7377
"""Configure OTel metrics and tracing with AWS Lambda defaults.
7478
7579
Sets up Core SDK metrics export via a :py:class:`temporalio.runtime.Runtime` with an
76-
:py:class:`temporalio.runtime.OpenTelemetryConfig` pointing at the OTLP collector, and adds a
77-
:py:class:`temporalio.contrib.opentelemetry.TracingInterceptor` for distributed tracing.
80+
:py:class:`temporalio.runtime.OpenTelemetryConfig` pointing at the OTLP collector, and adds the
81+
:py:class:`temporalio.contrib.opentelemetry.OpenTelemetryPlugin` for distributed tracing with
82+
workflow sandbox passthrough.
83+
84+
Creates a replay-safe ``TracerProvider`` (with X-Ray ID generator and OTLP gRPC exporter if
85+
available) and sets it as the global OpenTelemetry tracer provider. The
86+
:py:class:`temporalio.contrib.opentelemetry.OpenTelemetryPlugin` uses the global provider, so
87+
it must be set before the worker starts.
7888
7989
The collector endpoint defaults to ``http://localhost:4317``, which is the endpoint expected by
8090
the ADOT collector Lambda layer.
8191
82-
Registers a per-invocation ``ForceFlush`` shutdown hook for the ``TracerProvider`` so pending
83-
traces are exported before each Lambda invocation completes.
92+
Registers a per-invocation ``ForceFlush`` shutdown hook for the global ``TracerProvider`` so
93+
pending traces are exported before each Lambda invocation completes.
8494
8595
Metrics are exported on the ``metric_periodicity`` interval by the runtime's internal thread.
8696
There is no explicit flush API for these metrics; set ``metric_periodicity`` short enough to
@@ -112,11 +122,16 @@ def apply_defaults(
112122
AwsXRayIdGenerator,
113123
)
114124

115-
tracer_provider = TracerProvider(
125+
tracer_provider = create_tracer_provider(
116126
resource=resource, id_generator=AwsXRayIdGenerator()
117127
)
118128
except ImportError:
119-
tracer_provider = TracerProvider(resource=resource)
129+
logger.warning(
130+
"opentelemetry-sdk-extension-aws is not installed; "
131+
"X-Ray trace ID generation is disabled. "
132+
"Install the 'lambda-worker-otel' extra for full ADOT support."
133+
)
134+
tracer_provider = create_tracer_provider(resource=resource)
120135

121136
# Use OTLP gRPC exporter if available, otherwise skip trace export.
122137
try:
@@ -128,9 +143,16 @@ def apply_defaults(
128143
BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint, insecure=True))
129144
)
130145
except ImportError:
131-
pass
146+
logger.warning(
147+
"opentelemetry-exporter-otlp-proto-grpc is not installed; "
148+
"traces will not be exported to the OTLP collector. "
149+
"Install the 'lambda-worker-otel' extra for full ADOT support."
150+
)
151+
152+
# Set as global so the OpenTelemetryPlugin picks it up.
153+
set_tracer_provider(tracer_provider)
132154

133-
apply_tracing(config, tracer_provider)
155+
apply_tracing(config)
134156

135157

136158
def build_metrics_telemetry_config(
@@ -191,28 +213,29 @@ def build_metrics_telemetry_config(
191213
)
192214

193215

194-
def apply_tracing(
195-
config: LambdaWorkerConfig,
196-
tracer_provider: TracerProvider,
197-
) -> None:
216+
def apply_tracing(config: LambdaWorkerConfig) -> None:
198217
"""Configure only OTel tracing (no metrics) on the Lambda worker config.
199218
200-
Adds a :py:class:`temporalio.contrib.opentelemetry.TracingInterceptor` to
201-
``config.client_connect_config["interceptors"]`` and registers a ``ForceFlush`` shutdown hook
202-
for the provider.
219+
Adds an :py:class:`temporalio.contrib.opentelemetry.OpenTelemetryPlugin` to
220+
``config.worker_config["plugins"]``. The plugin uses the global
221+
``TracerProvider`` set via ``opentelemetry.trace.set_tracer_provider``.
222+
Ensure your provider is set globally before the worker starts.
223+
224+
Also registers a ``ForceFlush`` shutdown hook that flushes the global
225+
``TracerProvider`` (if it supports ``force_flush``).
203226
204227
Args:
205228
config: The :py:class:`LambdaWorkerConfig` to configure.
206-
tracer_provider: The ``TracerProvider`` to use for tracing.
207229
"""
208-
from temporalio.contrib.opentelemetry import TracingInterceptor
209-
210-
interceptor = TracingInterceptor(tracer=tracer_provider.get_tracer("temporal-sdk"))
211-
interceptors = list(config.client_connect_config.get("interceptors", []))
212-
interceptors.append(interceptor)
213-
config.client_connect_config["interceptors"] = interceptors
230+
plugin = OpenTelemetryPlugin()
231+
plugins = list(config.worker_config.get("plugins", []))
232+
plugins.append(plugin)
233+
config.worker_config["plugins"] = plugins
214234

215235
async def _flush() -> None:
216-
tracer_provider.force_flush()
236+
provider = get_tracer_provider()
237+
flush = getattr(provider, "force_flush", None)
238+
if flush is not None:
239+
flush()
217240

218241
config.shutdown_hooks.append(_flush)

tests/contrib/aws/lambda_worker/test_otel.py

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@
66
from unittest.mock import patch
77

88
import pytest
9-
from opentelemetry.sdk.trace import TracerProvider
10-
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
11-
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
129

1310
from temporalio.contrib.aws.lambda_worker._configure import (
1411
LambdaWorkerConfig,
@@ -20,48 +17,37 @@
2017
apply_tracing,
2118
build_metrics_telemetry_config,
2219
)
23-
from temporalio.contrib.opentelemetry import TracingInterceptor
20+
from temporalio.contrib.opentelemetry import OpenTelemetryPlugin
2421
from temporalio.runtime import OpenTelemetryConfig, TelemetryConfig
2522

2623

27-
def _make_tracer_provider() -> tuple[TracerProvider, InMemorySpanExporter]:
28-
span_exporter = InMemorySpanExporter()
29-
tracer_provider = TracerProvider()
30-
tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter))
31-
return tracer_provider, span_exporter
32-
33-
3424
class TestApplyTracing:
35-
def test_adds_interceptor(self) -> None:
25+
def test_adds_plugin(self) -> None:
3626
config = LambdaWorkerConfig()
37-
tracer_provider, _ = _make_tracer_provider()
38-
apply_tracing(config, tracer_provider)
39-
interceptors = config.client_connect_config.get("interceptors", [])
40-
assert len(interceptors) == 1
27+
apply_tracing(config)
28+
plugins = config.worker_config.get("plugins", [])
29+
assert len(plugins) == 1
30+
assert isinstance(plugins[0], OpenTelemetryPlugin)
4131

42-
def test_appends_to_existing_interceptors(self) -> None:
32+
def test_appends_to_existing_plugins(self) -> None:
4333
config = LambdaWorkerConfig()
44-
# Use a TracingInterceptor as the existing interceptor.
45-
existing_provider, _ = _make_tracer_provider()
46-
existing = TracingInterceptor(tracer=existing_provider.get_tracer("existing"))
47-
config.client_connect_config["interceptors"] = [existing]
48-
tracer_provider, _ = _make_tracer_provider()
49-
apply_tracing(config, tracer_provider)
50-
interceptors = config.client_connect_config["interceptors"]
51-
assert len(interceptors) == 2
52-
assert interceptors[0] is existing
34+
existing = OpenTelemetryPlugin()
35+
config.worker_config["plugins"] = [existing]
36+
apply_tracing(config)
37+
plugins = config.worker_config["plugins"]
38+
assert len(plugins) == 2
39+
assert plugins[0] is existing
5340

5441
def test_registers_flush_shutdown_hook(self) -> None:
5542
config = LambdaWorkerConfig()
56-
tracer_provider, _ = _make_tracer_provider()
57-
apply_tracing(config, tracer_provider)
43+
apply_tracing(config)
5844
assert len(config.shutdown_hooks) == 1
5945

6046
@pytest.mark.asyncio
6147
async def test_shutdown_hook_flushes(self) -> None:
6248
config = LambdaWorkerConfig()
63-
tracer_provider, _ = _make_tracer_provider()
64-
apply_tracing(config, tracer_provider)
49+
apply_tracing(config)
50+
# Should not raise even with the default noop global provider.
6551
await _run_shutdown_hooks(config)
6652

6753

@@ -95,7 +81,6 @@ def test_composable_with_custom_runtime(self) -> None:
9581
import dataclasses
9682

9783
tc = build_metrics_telemetry_config(endpoint="http://localhost:4317")
98-
# Replace logging config to demonstrate composability.
9984
custom_tc = dataclasses.replace(tc, logging=None)
10085
assert custom_tc.logging is None
10186
assert isinstance(custom_tc.metrics, OpenTelemetryConfig)
@@ -106,11 +91,27 @@ def test_configures_metrics_and_tracing(self) -> None:
10691
config = LambdaWorkerConfig()
10792
apply_defaults(config, OtelOptions(collector_endpoint="http://localhost:4317"))
10893

94+
# Metrics: runtime should be set.
10995
assert "runtime" in config.client_connect_config
110-
interceptors = config.client_connect_config.get("interceptors", [])
111-
assert len(interceptors) == 1
96+
# Tracing: plugin should be added.
97+
plugins = config.worker_config.get("plugins", [])
98+
assert len(plugins) == 1
99+
assert isinstance(plugins[0], OpenTelemetryPlugin)
100+
# Shutdown hook for tracer flush.
112101
assert len(config.shutdown_hooks) == 1
113102

103+
def test_sets_global_tracer_provider(self) -> None:
104+
from opentelemetry.trace import get_tracer_provider
105+
106+
from temporalio.contrib.opentelemetry._tracer_provider import (
107+
ReplaySafeTracerProvider,
108+
)
109+
110+
config = LambdaWorkerConfig()
111+
apply_defaults(config)
112+
provider = get_tracer_provider()
113+
assert isinstance(provider, ReplaySafeTracerProvider)
114+
114115
def test_service_name_from_options(self) -> None:
115116
config = LambdaWorkerConfig()
116117
apply_defaults(config, OtelOptions(service_name="my-service"))

0 commit comments

Comments
 (0)