|
| 1 | +"""Context propagation helpers for OpenTelemetry tracing.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +import contextlib |
| 6 | +from typing import TYPE_CHECKING, Any |
| 7 | + |
| 8 | +if TYPE_CHECKING: |
| 9 | + from collections.abc import Generator |
| 10 | + |
| 11 | + from opentelemetry.trace import Span |
| 12 | + |
| 13 | + |
| 14 | +def inject_trace_context(carrier: dict[str, Any]) -> dict[str, Any]: |
| 15 | + """Inject current trace context into a carrier dict. |
| 16 | +
|
| 17 | + Use this to propagate trace context through SQS messages. |
| 18 | +
|
| 19 | + Parameters |
| 20 | + ---------- |
| 21 | + carrier : dict |
| 22 | + Dictionary to inject trace context into. |
| 23 | +
|
| 24 | + Returns |
| 25 | + ------- |
| 26 | + dict |
| 27 | + Carrier with trace context injected. |
| 28 | +
|
| 29 | + Example |
| 30 | + ------- |
| 31 | + message = {"data": "payload"} |
| 32 | + message = inject_trace_context(message) |
| 33 | + sqs.send_message(QueueUrl=url, MessageBody=json.dumps(message)) |
| 34 | + """ |
| 35 | + from opentelemetry.propagate import inject |
| 36 | + |
| 37 | + inject(carrier) |
| 38 | + return carrier |
| 39 | + |
| 40 | + |
| 41 | +@contextlib.contextmanager |
| 42 | +def create_span_from_context( |
| 43 | + name: str, |
| 44 | + carrier: dict[str, Any], |
| 45 | + **kwargs, |
| 46 | +) -> Generator[Span, None, None]: |
| 47 | + """Create a span with parent context extracted from carrier. |
| 48 | +
|
| 49 | + Use this to continue a trace from an SQS message. |
| 50 | +
|
| 51 | + Parameters |
| 52 | + ---------- |
| 53 | + name : str |
| 54 | + Span name. |
| 55 | + carrier : dict |
| 56 | + Dictionary containing trace context (e.g., SQS message body). |
| 57 | + **kwargs |
| 58 | + Additional arguments passed to start_as_current_span. |
| 59 | +
|
| 60 | + Example |
| 61 | + ------- |
| 62 | + message = json.loads(record["body"]) |
| 63 | + with create_span_from_context("process_message", message) as span: |
| 64 | + process(message["data"]) |
| 65 | + """ |
| 66 | + from opentelemetry import trace |
| 67 | + from opentelemetry.propagate import extract |
| 68 | + |
| 69 | + ctx = extract(carrier) |
| 70 | + tracer = trace.get_tracer("aws_lambda_powertools") |
| 71 | + |
| 72 | + kwargs.setdefault("record_exception", True) |
| 73 | + kwargs.setdefault("set_status_on_exception", True) |
| 74 | + |
| 75 | + with tracer.start_as_current_span(name=name, context=ctx, **kwargs) as span: |
| 76 | + yield span |
0 commit comments