diff --git a/pyproject.toml b/pyproject.toml index c8fa96a8d..e52ad5ead 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,13 @@ opentelemetry = ["opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2"] pydantic = ["pydantic>=2.0.0,<3"] openai-agents = ["openai-agents>=0.3,<0.7", "mcp>=1.9.4, <2"] google-adk = ["google-adk>=1.27.0,<2"] +lambda-worker-otel = [ + "opentelemetry-api>=1.11.1,<2", + "opentelemetry-sdk>=1.11.1,<2", + "opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2", + "opentelemetry-semantic-conventions>=0.40b0,<1", + "opentelemetry-sdk-extension-aws>=2.0.0,<3", +] aioboto3 = [ "aioboto3>=10.4.0", "types-aioboto3[s3]>=10.4.0", @@ -69,6 +76,9 @@ dev = [ "googleapis-common-protos==1.70.0", "pytest-rerunfailures>=16.1", "moto[s3,server]>=5", + "opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2", + "opentelemetry-semantic-conventions>=0.40b0,<1", + "opentelemetry-sdk-extension-aws>=2.0.0,<3", ] [tool.poe.tasks] diff --git a/temporalio/client.py b/temporalio/client.py index cc2750ec6..9e5e3494e 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -2802,6 +2802,29 @@ async def get_worker_task_reachability( ) +class ClientConnectConfig(TypedDict, total=False): + """TypedDict of keyword arguments for :py:meth:`Client.connect`.""" + + target_host: str + namespace: str + api_key: str | None + data_converter: temporalio.converter.DataConverter + plugins: Sequence[Plugin] + interceptors: Sequence[Interceptor] + default_workflow_query_reject_condition: ( + temporalio.common.QueryRejectCondition | None + ) + tls: bool | TLSConfig | None + retry_config: RetryConfig | None + keep_alive_config: KeepAliveConfig | None + rpc_metadata: Mapping[str, str | bytes] + identity: str | None + lazy: bool + runtime: temporalio.runtime.Runtime | None + http_connect_proxy_config: HttpConnectProxyConfig | None + header_codec_behavior: HeaderCodecBehavior + + class ClientConfig(TypedDict, total=False): """TypedDict of config originally passed to 
:py:meth:`Client`.""" diff --git a/temporalio/contrib/aws/__init__.py b/temporalio/contrib/aws/__init__.py new file mode 100644 index 000000000..a8b8c648f --- /dev/null +++ b/temporalio/contrib/aws/__init__.py @@ -0,0 +1 @@ +"""AWS integrations for Temporal SDK.""" diff --git a/temporalio/contrib/aws/lambda_worker/README.md b/temporalio/contrib/aws/lambda_worker/README.md new file mode 100644 index 000000000..f9166b13d --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/README.md @@ -0,0 +1,104 @@ +# lambda_worker + +A wrapper for running [Temporal](https://temporal.io) workers inside AWS Lambda. A single +`run_worker` call handles the full per-invocation lifecycle: connecting to the Temporal server, +creating a worker with Lambda-tuned defaults, polling for tasks, and gracefully shutting down before +the invocation deadline. + +## Quick start + +```python +# handler.py +from temporalio.common import WorkerDeploymentVersion +from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker + +from my_workflows import MyWorkflow +from my_activities import my_activity + + +def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "my-task-queue" + config.worker_config["workflows"] = [MyWorkflow] + config.worker_config["activities"] = [my_activity] + + +lambda_handler = run_worker( + WorkerDeploymentVersion( + deployment_name="my-service", + build_id="v1.0", + ), + configure, +) +``` + +## Configuration + +Client connection settings (address, namespace, TLS, API key) are loaded +automatically from a TOML config file and/or environment variables via +`temporalio.envconfig`. The config file is resolved in order: + +1. `TEMPORAL_CONFIG_FILE` env var, if set. +2. `temporal.toml` in `$LAMBDA_TASK_ROOT` (typically `/var/task`). +3. `temporal.toml` in the current working directory. + +The file is optional -- if absent, only environment variables are used. 
+ +The configure callback receives a `LambdaWorkerConfig` dataclass with fields +pre-populated with Lambda-appropriate defaults. Override any field directly in +the callback. The `task_queue` key in `worker_config` is pre-populated from the +`TEMPORAL_TASK_QUEUE` environment variable if set. + +## Lambda-tuned worker defaults + +The package applies conservative concurrency limits suited to Lambda's resource +constraints: + +| Setting | Default | +| --- | --- | +| `max_concurrent_activities` | 2 | +| `max_concurrent_workflow_tasks` | 10 | +| `max_concurrent_local_activities` | 2 | +| `max_concurrent_nexus_tasks` | 5 | +| `workflow_task_poller_behavior` | `SimpleMaximum(2)` | +| `activity_task_poller_behavior` | `SimpleMaximum(1)` | +| `nexus_task_poller_behavior` | `SimpleMaximum(1)` | +| `graceful_shutdown_timeout` | 5 seconds | +| `max_cached_workflows` | 30 | +| `disable_eager_activity_execution` | Always `True` | + +Worker Deployment Versioning is always enabled. + +## Observability + +Metrics and tracing are opt-in. The `otel` module provides convenience helpers +for AWS Distro for OpenTelemetry (ADOT): + +```python +from temporalio.common import WorkerDeploymentVersion +from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker +from temporalio.contrib.aws.lambda_worker.otel import apply_defaults, OtelOptions + + +def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "my-task-queue" + config.worker_config["workflows"] = [MyWorkflow] + config.worker_config["activities"] = [my_activity] + apply_defaults(config, OtelOptions()) + + +lambda_handler = run_worker( + WorkerDeploymentVersion( + deployment_name="my-service", + build_id="v1.0", + ), + configure, +) +``` + +You can also use `apply_tracing` or `build_metrics_telemetry_config` individually. 
+ +If you use OTEL, you can use +[ADOT](https://aws-otel.github.io/docs/getting-started/lambda/lambda-python) +(the AWS Distro For OpenTelemetry) to automatically integrate with AWS +observability functionality. Namely, you will want to add the Lambda layer in +the aforementioned link. We'll handle setting up the SDK for you. diff --git a/temporalio/contrib/aws/lambda_worker/__init__.py b/temporalio/contrib/aws/lambda_worker/__init__.py new file mode 100644 index 000000000..11f748c9b --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/__init__.py @@ -0,0 +1,49 @@ +"""A wrapper for running Temporal workers inside AWS Lambda. + +A single :py:func:`run_worker` call handles the full per-invocation lifecycle: connecting to the +Temporal server, creating a worker with Lambda-tuned defaults, polling for tasks, and gracefully +shutting down before the invocation deadline. + +Quick start:: + + from temporalio.common import WorkerDeploymentVersion + from temporalio.contrib.aws.lambda_worker import LambdaWorkerConfig, run_worker + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "my-task-queue" + config.worker_config["workflows"] = [MyWorkflow] + config.worker_config["activities"] = [my_activity] + + lambda_handler = run_worker( + WorkerDeploymentVersion( + deployment_name="my-service", + build_id="v1.0", + ), + configure, + ) + +Configuration +------------- +Client connection settings (address, namespace, TLS, API key) are loaded automatically from a TOML +config file and/or environment variables via :py:mod:`temporalio.envconfig`. The config file is +resolved in order: + +1. ``TEMPORAL_CONFIG_FILE`` env var, if set. +2. ``temporal.toml`` in ``$LAMBDA_TASK_ROOT`` (typically ``/var/task``). +3. ``temporal.toml`` in the current working directory. + +The file is optional -- if absent, only environment variables are used. 
+ +The configure callback receives a :py:class:`LambdaWorkerConfig` dataclass with fields pre-populated +with Lambda-appropriate defaults. Override any field directly in the callback. The ``task_queue`` +key in ``worker_config`` is pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if +set. +""" + +from temporalio.contrib.aws.lambda_worker._configure import LambdaWorkerConfig +from temporalio.contrib.aws.lambda_worker._run_worker import run_worker + +__all__ = [ + "LambdaWorkerConfig", + "run_worker", +] diff --git a/temporalio/contrib/aws/lambda_worker/_configure.py b/temporalio/contrib/aws/lambda_worker/_configure.py new file mode 100644 index 000000000..dd1657f6d --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/_configure.py @@ -0,0 +1,72 @@ +"""Configuration for the Lambda worker.""" + +from __future__ import annotations + +import asyncio +import logging +from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field +from datetime import timedelta + +from temporalio.client import ClientConnectConfig +from temporalio.worker import WorkerConfig + +logger = logging.getLogger(__name__) + + +@dataclass +class LambdaWorkerConfig: + """Passed to the configure callback of :py:func:`run_worker`. + + Fields are pre-populated with Lambda-appropriate defaults before the configure callback is + invoked; the callback may read and override any of them. + + Use ``worker_config`` to set task queue, register workflows/activities, and tune worker options. + The ``task_queue`` key is pre-populated from the ``TEMPORAL_TASK_QUEUE`` environment variable if + set. + + Attributes: + client_connect_config: Keyword arguments that will be passed to + :py:meth:`temporalio.client.Client.connect`. Pre-populated from the + config file / environment variables via envconfig, with Lambda + defaults applied. 
+ worker_config: Keyword arguments that will be passed to the + :py:class:`temporalio.worker.Worker` constructor (the ``client`` + key is managed internally). Pre-populated with Lambda-appropriate + defaults (low concurrency, eager activities disabled) and + ``task_queue`` from ``TEMPORAL_TASK_QUEUE`` if set. + shutdown_deadline_buffer: How long before the Lambda invocation + deadline the worker begins its shutdown sequence (worker drain + + shutdown hooks). Pre-populated to + ``graceful_shutdown_timeout + 2s``. If you change + ``graceful_shutdown_timeout`` in ``worker_config``, adjust this + accordingly. + shutdown_hooks: Functions called at the end of each Lambda invocation, + after the worker has stopped. Run in list order. Each may be sync + or async. Use this to flush telemetry providers or release other + per-process resources. + """ + + client_connect_config: ClientConnectConfig = field( + default_factory=ClientConnectConfig + ) + worker_config: WorkerConfig = field(default_factory=WorkerConfig) + shutdown_deadline_buffer: timedelta = field( + default_factory=lambda: timedelta(seconds=7) + ) + shutdown_hooks: list[Callable[[], Awaitable[None] | None]] = field( + default_factory=list + ) + + +async def _run_shutdown_hooks( # type:ignore[reportUnusedFunction] + config: LambdaWorkerConfig, +) -> None: + """Run all registered shutdown hooks in order, logging errors.""" + for fn in config.shutdown_hooks: + try: + result = fn() + if asyncio.iscoroutine(result): + await result + except Exception as e: + logger.error(f"shutdown hook error: {e}") diff --git a/temporalio/contrib/aws/lambda_worker/_defaults.py b/temporalio/contrib/aws/lambda_worker/_defaults.py new file mode 100644 index 000000000..1b93e3407 --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/_defaults.py @@ -0,0 +1,84 @@ +"""Lambda-tuned defaults for Temporal worker and client configuration.""" + +from __future__ import annotations + +import os +from collections.abc import Callable +from datetime 
import timedelta +from pathlib import Path + +from temporalio.worker import PollerBehaviorSimpleMaximum, WorkerConfig + +# ---- Lambda-tuned worker defaults ---- +# Conservative concurrency limits suited to Lambda's resource constraints. + +DEFAULT_MAX_CONCURRENT_ACTIVITIES: int = 2 +DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS: int = 10 +DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES: int = 2 +DEFAULT_MAX_CONCURRENT_NEXUS_TASKS: int = 5 +DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: timedelta = timedelta(seconds=5) +DEFAULT_SHUTDOWN_HOOK_BUFFER: timedelta = timedelta(seconds=2) +DEFAULT_MAX_CACHED_WORKFLOWS: int = 30 + +DEFAULT_WORKFLOW_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=2) +DEFAULT_ACTIVITY_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=1) +DEFAULT_NEXUS_TASK_POLLER_BEHAVIOR = PollerBehaviorSimpleMaximum(maximum=1) + +# ---- Environment variable names ---- +ENV_TASK_QUEUE = "TEMPORAL_TASK_QUEUE" +ENV_LAMBDA_TASK_ROOT = "LAMBDA_TASK_ROOT" +ENV_CONFIG_FILE = "TEMPORAL_CONFIG_FILE" +DEFAULT_CONFIG_FILE = "temporal.toml" + + +def apply_lambda_worker_defaults(config: WorkerConfig) -> None: + """Apply Lambda-appropriate defaults to worker config. + + Only sets values that have not already been set (i.e. are absent from *config*). + ``disable_eager_activity_execution`` is always set to ``True``. 
+ """ + config.setdefault("max_concurrent_activities", DEFAULT_MAX_CONCURRENT_ACTIVITIES) + config.setdefault( + "max_concurrent_workflow_tasks", DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS + ) + config.setdefault( + "max_concurrent_local_activities", DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES + ) + config.setdefault("max_concurrent_nexus_tasks", DEFAULT_MAX_CONCURRENT_NEXUS_TASKS) + config.setdefault("graceful_shutdown_timeout", DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT) + config.setdefault("max_cached_workflows", DEFAULT_MAX_CACHED_WORKFLOWS) + config.setdefault( + "workflow_task_poller_behavior", DEFAULT_WORKFLOW_TASK_POLLER_BEHAVIOR + ) + config.setdefault( + "activity_task_poller_behavior", DEFAULT_ACTIVITY_TASK_POLLER_BEHAVIOR + ) + config.setdefault("nexus_task_poller_behavior", DEFAULT_NEXUS_TASK_POLLER_BEHAVIOR) + # Always disable eager activities in Lambda. + config["disable_eager_activity_execution"] = True + + +def build_lambda_identity(request_id: str, function_arn: str) -> str: + """Build a worker identity string from the Lambda invocation context. + + Format: ``@``. + """ + return f"{request_id or 'unknown'}@{function_arn or 'unknown'}" + + +def lambda_default_config_file_path( + getenv: Callable[[str], str] = os.environ.get, # type: ignore[assignment] +) -> Path: + """Return the config file path for a Lambda environment. + + Resolution order: + + 1. ``TEMPORAL_CONFIG_FILE`` env var, if set. + 2. ``temporal.toml`` in ``$LAMBDA_TASK_ROOT`` (typically ``/var/task``). + 3. ``temporal.toml`` in the current working directory. + """ + config_file = getenv(ENV_CONFIG_FILE) + if config_file: + return Path(config_file) + root = getenv(ENV_LAMBDA_TASK_ROOT) or "." 
+ return Path(root) / DEFAULT_CONFIG_FILE diff --git a/temporalio/contrib/aws/lambda_worker/_run_worker.py b/temporalio/contrib/aws/lambda_worker/_run_worker.py new file mode 100644 index 000000000..6a2cc75a3 --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/_run_worker.py @@ -0,0 +1,259 @@ +from __future__ import annotations + +import asyncio +import logging +import os +import sys +from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field +from datetime import timedelta +from typing import Any + +import temporalio.client +import temporalio.worker +from temporalio.client import ClientConnectConfig +from temporalio.common import WorkerDeploymentVersion +from temporalio.contrib.aws.lambda_worker._configure import ( + LambdaWorkerConfig, + _run_shutdown_hooks, +) +from temporalio.contrib.aws.lambda_worker._defaults import ( + DEFAULT_SHUTDOWN_HOOK_BUFFER, + apply_lambda_worker_defaults, + build_lambda_identity, + lambda_default_config_file_path, +) +from temporalio.envconfig import ClientConfigProfile +from temporalio.worker import WorkerConfig, WorkerDeploymentConfig + +logger = logging.getLogger(__name__) + + +@dataclass +class _WorkerDeps: + """External dependencies injected for testability.""" + + connect: Callable[..., Awaitable[temporalio.client.Client]] = field( + default_factory=lambda: temporalio.client.Client.connect + ) + create_worker: Callable[..., temporalio.worker.Worker] = field( + default_factory=lambda: temporalio.worker.Worker + ) + load_config: Callable[[], ClientConfigProfile] | None = None + getenv: Callable[[str], str | None] = field(default_factory=lambda: os.environ.get) + extract_lambda_ctx: Callable[[Any], tuple[str, str] | None] | None = None + + +def _default_load_config(getenv: Callable[[str], str | None]) -> ClientConfigProfile: + config_path = lambda_default_config_file_path(getenv) # type: ignore[arg-type] + return ClientConfigProfile.load(config_source=config_path) + + +def 
_default_extract_lambda_ctx( + lambda_context: Any, +) -> tuple[str, str] | None: + """Extract (request_id, function_arn) from a Lambda context object.""" + if lambda_context is None: + return None + request_id = getattr(lambda_context, "aws_request_id", None) + function_arn = getattr(lambda_context, "invoked_function_arn", None) + if request_id is not None and function_arn is not None: + return (request_id, function_arn) + return None + + +def run_worker( + version: WorkerDeploymentVersion, + configure: Callable[[LambdaWorkerConfig], None], +) -> Callable[[Any, Any], None]: + """Create a Temporal worker Lambda handler. + + Calls the *configure* callback to collect workflow/activity registrations and option overrides, + then returns a Lambda handler function. On each invocation the handler connects to the Temporal + server, starts a worker with Lambda-tuned defaults, polls for tasks until the invocation + deadline approaches, and then gracefully shuts down. + + The *version* parameter identifies this worker's deployment version. ``run_worker`` always + enables Worker Deployment Versioning (``use_worker_versioning=True``). To provide a default + versioning behavior for workflows that do not specify one at registration time, set + ``deployment_config`` in ``worker_config`` in the configure callback. + + The returned handler has the signature ``handler(event, context)`` and should be set as your + Lambda function's handler entry point. + + Args: + version: The worker deployment version. Required. + configure: A callback that receives a :py:class:`LambdaWorkerConfig` + (pre-populated with Lambda defaults) and configures workflows, + activities, and options on it. + + Returns: + A Lambda handler function. 
+ + Example:: + + from temporalio.common import WorkerDeploymentVersion + from temporalio.contrib.aws.lambda_worker import ( + LambdaWorkerConfig, + run_worker, + ) + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "my-task-queue" + config.worker_config["workflows"] = [MyWorkflow] + config.worker_config["activities"] = [my_activity] + + lambda_handler = run_worker( + WorkerDeploymentVersion( + deployment_name="my-service", + build_id="v1.0", + ), + configure, + ) + """ + deps = _WorkerDeps() + try: + return _run_worker_internal(version, configure, deps) + except Exception as e: + logger.error(f"fatal error running lambda worker: {e}") + sys.exit(1) + + +def _run_worker_internal( + version: WorkerDeploymentVersion, + configure: Callable[[LambdaWorkerConfig], None], + deps: _WorkerDeps, +) -> Callable[[Any, Any], None]: + """Core logic with injected dependencies for testability.""" + if not version.deployment_name or not version.build_id: + raise ValueError( + "version is required (deployment_name and build_id must be set)" + ) + + # Load client config from envconfig / TOML. + load_config = deps.load_config or (lambda: _default_load_config(deps.getenv)) + profile = load_config() + connect_config: ClientConnectConfig = {**profile.to_client_connect_config()} + + # Build worker config with Lambda defaults. + worker_config: WorkerConfig = {} + apply_lambda_worker_defaults(worker_config) + + # Always enable deployment versioning. + worker_config["deployment_config"] = WorkerDeploymentConfig( + version=version, + use_worker_versioning=True, + ) + + # Calculate default shutdown buffer. + graceful_timeout = worker_config.get( + "graceful_shutdown_timeout", timedelta(seconds=5) + ) + shutdown_buffer = graceful_timeout + DEFAULT_SHUTDOWN_HOOK_BUFFER + + # Pre-populate config with defaults. 
+ config = LambdaWorkerConfig( + client_connect_config=connect_config, + worker_config=worker_config, + shutdown_deadline_buffer=shutdown_buffer, + ) + + # Pre-populate task queue from environment if available. + env_tq = deps.getenv("TEMPORAL_TASK_QUEUE") + if env_tq: + config.worker_config["task_queue"] = env_tq + + # Call user configure callback with pre-populated config. + configure(config) + + # Validate task queue. + if not config.worker_config.get("task_queue"): + raise ValueError( + "task queue not configured: set " + 'worker_config["task_queue"] or the ' + "TEMPORAL_TASK_QUEUE environment variable" + ) + + extract_lambda_ctx = deps.extract_lambda_ctx or _default_extract_lambda_ctx + + def _handler(_event: Any, lambda_context: Any) -> None: + asyncio.run( + _invocation_handler( + lambda_context=lambda_context, + config=config, + deps=deps, + extract_lambda_ctx=extract_lambda_ctx, + ) + ) + + return _handler + + +async def _invocation_handler( + *, + lambda_context: Any, + config: LambdaWorkerConfig, + deps: _WorkerDeps, + extract_lambda_ctx: Callable[[Any], tuple[str, str] | None], +) -> None: + """Handle a single Lambda invocation.""" + shutdown_buffer = config.shutdown_deadline_buffer + + # Check deadline feasibility. + remaining_ms_fn = getattr(lambda_context, "get_remaining_time_in_millis", None) + deadline_available = remaining_ms_fn is not None + if deadline_available: + assert remaining_ms_fn is not None + remaining = timedelta(milliseconds=remaining_ms_fn()) + work_time = remaining - shutdown_buffer + if work_time <= timedelta(seconds=1): + raise RuntimeError( + f"Lambda timeout is too short: {remaining.total_seconds():.1f}s " + f"remaining but {shutdown_buffer.total_seconds():.1f}s is " + f"reserved for shutdown, leaving no time for work. 
" + f"Increase the function timeout or decrease the shutdown " + f"deadline buffer" + ) + elif work_time < timedelta(seconds=5): + logger.warning( + "Lambda timeout leaves less than 5s for work after " + "shutdown buffer; consider increasing the function " + "timeout or decreasing the shutdown deadline buffer " + "(work_time=%s, shutdown_buffer=%s)", + work_time, + shutdown_buffer, + ) + + # Build per-invocation connect kwargs with identity from Lambda context. + invocation_connect_kwargs: ClientConnectConfig = {**config.client_connect_config} + if "identity" not in invocation_connect_kwargs: + ctx_info = extract_lambda_ctx(lambda_context) + if ctx_info is not None: + request_id, function_arn = ctx_info + invocation_connect_kwargs["identity"] = build_lambda_identity( + request_id, function_arn + ) + + # Connect to Temporal. + client = await deps.connect(**invocation_connect_kwargs) + + # Create the worker. + worker = deps.create_worker(client, **config.worker_config) + + # Run the worker until the deadline approaches or context is done. + if deadline_available: + assert remaining_ms_fn is not None + work_time_secs = ( + timedelta(milliseconds=remaining_ms_fn()) - shutdown_buffer + ).total_seconds() + if work_time_secs > 0: + try: + await asyncio.wait_for(worker.run(), timeout=work_time_secs) + except asyncio.TimeoutError: + pass + else: + # No deadline - run until cancelled. + await worker.run() + + # Run shutdown hooks after worker has stopped. + await _run_shutdown_hooks(config) diff --git a/temporalio/contrib/aws/lambda_worker/otel.py b/temporalio/contrib/aws/lambda_worker/otel.py new file mode 100644 index 000000000..216e80f48 --- /dev/null +++ b/temporalio/contrib/aws/lambda_worker/otel.py @@ -0,0 +1,241 @@ +"""OpenTelemetry helpers for Temporal workers running inside AWS Lambda. 
+ +Use :py:func:`apply_defaults` inside a :py:func:`run_worker` configure callback for a +batteries-included setup that creates an OTel collector exporter and tracing plugin, suitable +for use with the AWS Distro for OpenTelemetry (ADOT) Lambda layer. + +Use :py:func:`apply_tracing` or :py:func:`build_metrics_telemetry_config` individually if you only +need one. +""" + +from __future__ import annotations + +import logging +import os +from dataclasses import dataclass, field +from datetime import timedelta + +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.semconv.attributes.service_attributes import SERVICE_NAME +from opentelemetry.trace import get_tracer_provider, set_tracer_provider + +from temporalio.contrib.aws.lambda_worker._configure import LambdaWorkerConfig +from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, create_tracer_provider +from temporalio.runtime import OpenTelemetryConfig, Runtime, TelemetryConfig + +logger = logging.getLogger(__name__) + + +@dataclass +class OtelOptions: + """Options for :py:func:`apply_defaults`. + + Attributes: + metric_periodicity: How often the Core SDK exports metrics to the + collector. Defaults to 10 seconds. Set this shorter than your + Lambda timeout to ensure at least one export per invocation. + service_name: OTel service name resource attribute. If empty, + falls back to ``OTEL_SERVICE_NAME``, then + ``AWS_LAMBDA_FUNCTION_NAME``, then + ``"temporal-lambda-worker"``. + collector_endpoint: OTLP collector endpoint (e.g. + ``"http://localhost:4317"``). If empty, falls back to + ``OTEL_EXPORTER_OTLP_ENDPOINT``, then + ``"http://localhost:4317"``. 
+ """ + + metric_periodicity: timedelta = field(default_factory=lambda: timedelta(seconds=10)) + service_name: str = "" + collector_endpoint: str = "" + + +def _resolve_service_name(options: OtelOptions) -> str: + service_name = options.service_name + if not service_name: + service_name = os.environ.get("OTEL_SERVICE_NAME", "") + if not service_name: + service_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME", "") + if not service_name: + service_name = "temporal-lambda-worker" + return service_name + + +def _resolve_endpoint(options: OtelOptions) -> str: + endpoint = options.collector_endpoint + if not endpoint: + endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "") + if not endpoint: + endpoint = "http://localhost:4317" + return endpoint + + +def apply_defaults( + config: LambdaWorkerConfig, + options: OtelOptions | None = None, +) -> None: + """Configure OTel metrics and tracing with AWS Lambda defaults. + + Sets up Core SDK metrics export via a :py:class:`temporalio.runtime.Runtime` with an + :py:class:`temporalio.runtime.OpenTelemetryConfig` pointing at the OTLP collector, and adds the + :py:class:`temporalio.contrib.opentelemetry.OpenTelemetryPlugin` for distributed tracing with + workflow sandbox passthrough. + + Creates a replay-safe ``TracerProvider`` (with X-Ray ID generator and OTLP gRPC exporter if + available) and sets it as the global OpenTelemetry tracer provider. The + :py:class:`temporalio.contrib.opentelemetry.OpenTelemetryPlugin` uses the global provider, so + it must be set before the worker starts. + + The collector endpoint defaults to ``http://localhost:4317``, which is the endpoint expected by + the ADOT collector Lambda layer. + + Registers a per-invocation ``ForceFlush`` shutdown hook for the global ``TracerProvider`` so + pending traces are exported before each Lambda invocation completes. + + Metrics are exported on the ``metric_periodicity`` interval by the runtime's internal thread. 
+ There is no explicit flush API for these metrics; set ``metric_periodicity`` short enough to + ensure at least one export per invocation. + + Args: + config: The :py:class:`LambdaWorkerConfig` to configure. + options: Optional overrides for service name, endpoint, etc. + """ + if options is None: + options = OtelOptions() + + endpoint = _resolve_endpoint(options) + service_name = _resolve_service_name(options) + + telemetry_config = build_metrics_telemetry_config( + endpoint=endpoint, + service_name=service_name, + metric_periodicity=options.metric_periodicity, + ) + runtime = Runtime(telemetry=telemetry_config) + config.client_connect_config["runtime"] = runtime + + resource = Resource.create({SERVICE_NAME: service_name}) + + # Try to use X-Ray ID generator if available. + try: + from opentelemetry.sdk.extension.aws.trace import ( # type: ignore[reportMissingTypeStubs] + AwsXRayIdGenerator, + ) + + tracer_provider = create_tracer_provider( + resource=resource, id_generator=AwsXRayIdGenerator() + ) + except ImportError: + logger.warning( + "opentelemetry-sdk-extension-aws is not installed; " + "X-Ray trace ID generation is disabled. " + "Install the 'lambda-worker-otel' extra for full ADOT support." + ) + tracer_provider = create_tracer_provider(resource=resource) + + # Use OTLP gRPC exporter if available, otherwise skip trace export. + try: + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, + ) + + tracer_provider.add_span_processor( + BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint, insecure=True)) + ) + except ImportError: + logger.warning( + "opentelemetry-exporter-otlp-proto-grpc is not installed; " + "traces will not be exported to the OTLP collector. " + "Install the 'lambda-worker-otel' extra for full ADOT support." + ) + + # Set as global so the OpenTelemetryPlugin picks it up. 
+ set_tracer_provider(tracer_provider) + + apply_tracing(config) + + +def build_metrics_telemetry_config( + *, + endpoint: str = "", + service_name: str = "", + metric_periodicity: timedelta | None = None, +) -> TelemetryConfig: + """Build a :py:class:`temporalio.runtime.TelemetryConfig` for OTel metrics. + + Returns a ``TelemetryConfig`` with :py:class:`temporalio.runtime.OpenTelemetryConfig` metrics + pointed at the given OTLP collector endpoint. Use this when you need to compose metrics config + with other telemetry settings (e.g. custom logging) into your own + :py:class:`temporalio.runtime.Runtime`. + + Core SDK metrics are exported on the ``metric_periodicity`` interval by the runtime's internal + thread. There is no explicit flush API; set ``metric_periodicity`` short enough to ensure at + least one export per Lambda invocation. + + Example:: + + telemetry = build_metrics_telemetry_config( + endpoint="http://localhost:4317", + service_name="my-service", + ) + # Customize further: + telemetry_config = dataclasses.replace( + telemetry, logging=my_logging_config + ) + runtime = Runtime(telemetry=telemetry_config) + config.client_connect_config["runtime"] = runtime + + Args: + endpoint: OTLP collector endpoint. Defaults to + ``http://localhost:4317``. + service_name: OTel service name. Used as a global tag. + metric_periodicity: How often metrics are exported. + + Returns: + A ``TelemetryConfig`` ready to pass to + :py:class:`temporalio.runtime.Runtime`. + """ + if not endpoint: + endpoint = "http://localhost:4317" + + otel_config = OpenTelemetryConfig( + url=endpoint, + metric_periodicity=metric_periodicity, + ) + + global_tags: dict[str, str] = {} + if service_name: + global_tags["service_name"] = service_name + + return TelemetryConfig( + metrics=otel_config, + global_tags=global_tags, + ) + + +def apply_tracing(config: LambdaWorkerConfig) -> None: + """Configure only OTel tracing (no metrics) on the Lambda worker config. 
+ + Adds an :py:class:`temporalio.contrib.opentelemetry.OpenTelemetryPlugin` to + ``config.worker_config["plugins"]``. The plugin uses the global + ``TracerProvider`` set via ``opentelemetry.trace.set_tracer_provider``. + Ensure your provider is set globally before the worker starts. + + Also registers a ``ForceFlush`` shutdown hook that flushes the global + ``TracerProvider`` (if it supports ``force_flush``). + + Args: + config: The :py:class:`LambdaWorkerConfig` to configure. + """ + plugin = OpenTelemetryPlugin() + plugins = list(config.worker_config.get("plugins", [])) + plugins.append(plugin) + config.worker_config["plugins"] = plugins + + async def _flush() -> None: + provider = get_tracer_provider() + flush = getattr(provider, "force_flush", None) + if flush is not None: + flush() + + config.shutdown_hooks.append(_flush) diff --git a/temporalio/contrib/aws/s3driver/_driver.py b/temporalio/contrib/aws/s3driver/_driver.py index 481e3a9d4..3b9858813 100644 --- a/temporalio/contrib/aws/s3driver/_driver.py +++ b/temporalio/contrib/aws/s3driver/_driver.py @@ -64,7 +64,7 @@ def __init__( Args: client: An :class:`S3StorageDriverClient` implementation. Use - :func:`~temporalio.contrib.aws.s3driver.aioboto3.new_aioboto3_client` to + :func:`temporalio.contrib.aws.s3driver.aioboto3.new_aioboto3_client` to wrap an aioboto3 S3 client. bucket: S3 bucket name, access point ARN, or a callable that accepts ``(StorageDriverStoreContext, Payload)`` and returns @@ -73,7 +73,7 @@ def __init__( driver_name: Name of this driver instance. Defaults to ``"aws.s3driver"``. Override when registering multiple S3StorageDriver instances with distinct configurations - under the same :attr:`~temporalio.extstore.Options.drivers` list. + under the same ``temporalio.extstore.Options.drivers`` list. max_payload_size: Maximum serialized payload size in bytes that the driver will accept. Defaults to 52428800 (50 MiB). 
Raise this value if your workload requires larger payloads; lower it to @@ -105,7 +105,7 @@ async def store( context: StorageDriverStoreContext, payloads: Sequence[Payload], ) -> list[StorageDriverClaim]: - """Stores payloads in S3 and returns a :class:`~temporalio.extstore.DriverClaim` for each one. + """Stores payloads in S3 and returns a ``temporalio.extstore.DriverClaim`` for each one. Payloads are keyed by their SHA-256 hash, so identical serialized bytes share the same S3 object. Deduplication is best-effort because the same @@ -190,7 +190,7 @@ async def retrieve( context: StorageDriverRetrieveContext, # noqa: ARG002 claims: Sequence[StorageDriverClaim], ) -> list[Payload]: - """Retrieves payloads from S3 for the given :class:`~temporalio.extstore.DriverClaim` list.""" + """Retrieves payloads from S3 for the given ``temporalio.extstore.DriverClaim`` list.""" async def _download(claim: StorageDriverClaim) -> Payload: bucket = claim.claim_data["bucket"] diff --git a/tests/conftest.py b/tests/conftest.py index c813f91f9..e2ab2149e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,8 +4,10 @@ import sys from collections.abc import AsyncGenerator, Iterator +import opentelemetry.trace import pytest import pytest_asyncio +from opentelemetry.util._once import Once from temporalio.client import Client from temporalio.testing import WorkflowEnvironment @@ -196,3 +198,13 @@ def pytest_cmdline_main(config): # type: ignore[reportMissingParameterType, rep @pytest.fixture def continue_as_new_suggest_history_count() -> int: return CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT + + +@pytest.fixture +def reset_otel_tracer_provider(): + """Reset global OpenTelemetry tracer provider state around tests.""" + opentelemetry.trace._TRACER_PROVIDER_SET_ONCE = Once() + opentelemetry.trace._TRACER_PROVIDER = None + yield + opentelemetry.trace._TRACER_PROVIDER_SET_ONCE = Once() + opentelemetry.trace._TRACER_PROVIDER = None diff --git a/tests/contrib/aws/lambda_worker/__init__.py 
b/tests/contrib/aws/lambda_worker/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/contrib/aws/lambda_worker/test_lambda_worker.py b/tests/contrib/aws/lambda_worker/test_lambda_worker.py new file mode 100644 index 000000000..178e078ac --- /dev/null +++ b/tests/contrib/aws/lambda_worker/test_lambda_worker.py @@ -0,0 +1,522 @@ +"""Tests for temporalio.contrib.aws.lambda_worker.""" + +from __future__ import annotations + +from datetime import timedelta +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from temporalio.common import WorkerDeploymentVersion +from temporalio.contrib.aws.lambda_worker._configure import ( + LambdaWorkerConfig, + _run_shutdown_hooks, +) +from temporalio.contrib.aws.lambda_worker._defaults import ( + DEFAULT_MAX_CACHED_WORKFLOWS, + DEFAULT_MAX_CONCURRENT_ACTIVITIES, + DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES, + DEFAULT_MAX_CONCURRENT_NEXUS_TASKS, + DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS, + apply_lambda_worker_defaults, + build_lambda_identity, + lambda_default_config_file_path, +) +from temporalio.contrib.aws.lambda_worker._run_worker import ( + _run_worker_internal, + _WorkerDeps, +) +from temporalio.envconfig import ClientConfigProfile +from temporalio.worker import WorkerConfig + +TEST_VERSION = WorkerDeploymentVersion( + deployment_name="test-deployment", + build_id="test-build", +) + + +# ---- LambdaWorkerConfig tests ---- + + +class TestLambdaWorkerConfig: + def test_worker_config_task_queue(self) -> None: + config = LambdaWorkerConfig() + assert config.worker_config.get("task_queue") is None + config.worker_config["task_queue"] = "my-queue" + assert config.worker_config["task_queue"] == "my-queue" + + def test_worker_config_workflows(self) -> None: + config = LambdaWorkerConfig() + + class FakeWorkflow: + pass + + config.worker_config["workflows"] = [FakeWorkflow] + assert FakeWorkflow in config.worker_config["workflows"] + + def 
test_worker_config_activities(self) -> None: + config = LambdaWorkerConfig() + + def fake_activity() -> None: + pass + + config.worker_config["activities"] = [fake_activity] + assert fake_activity in config.worker_config["activities"] + + def test_client_connect_config_directly_modifiable(self) -> None: + config = LambdaWorkerConfig() + config.client_connect_config["namespace"] = "custom-ns" + assert config.client_connect_config["namespace"] == "custom-ns" + + def test_worker_config_directly_modifiable(self) -> None: + config = LambdaWorkerConfig() + config.worker_config["max_concurrent_activities"] = 42 + assert config.worker_config["max_concurrent_activities"] == 42 + + def test_shutdown_deadline_buffer(self) -> None: + config = LambdaWorkerConfig() + config.shutdown_deadline_buffer = timedelta(seconds=5) + assert config.shutdown_deadline_buffer == timedelta(seconds=5) + + def test_shutdown_hooks_list(self) -> None: + config = LambdaWorkerConfig() + fn = MagicMock() + config.shutdown_hooks.append(fn) + assert fn in config.shutdown_hooks + + @pytest.mark.asyncio + async def test_run_shutdown_hooks_in_order(self) -> None: + config = LambdaWorkerConfig() + order: list[str] = [] + config.shutdown_hooks.append(lambda: order.append("first")) + config.shutdown_hooks.append(lambda: order.append("second")) + await _run_shutdown_hooks(config) + assert order == ["first", "second"] + + @pytest.mark.asyncio + async def test_run_shutdown_hooks_async(self) -> None: + config = LambdaWorkerConfig() + called = False + + async def async_hook() -> None: + nonlocal called + called = True + + config.shutdown_hooks.append(async_hook) + await _run_shutdown_hooks(config) + assert called + + @pytest.mark.asyncio + async def test_run_shutdown_hooks_error_continues(self) -> None: + config = LambdaWorkerConfig() + second_called = False + + def failing_hook() -> None: + raise RuntimeError("flush failed") + + def second_hook() -> None: + nonlocal second_called + second_called = True + + 
config.shutdown_hooks.append(failing_hook) + config.shutdown_hooks.append(second_hook) + await _run_shutdown_hooks(config) + assert second_called + + def test_is_dataclass(self) -> None: + import dataclasses + + assert dataclasses.is_dataclass(LambdaWorkerConfig) + + def test_default_field_independence(self) -> None: + """Each instance gets its own mutable containers.""" + a = LambdaWorkerConfig() + b = LambdaWorkerConfig() + a.worker_config["max_concurrent_activities"] = 99 + assert "max_concurrent_activities" not in b.worker_config + + +# ---- Defaults tests ---- + + +class TestDefaults: + def test_apply_lambda_worker_defaults(self) -> None: + config: WorkerConfig = {} + apply_lambda_worker_defaults(config) + assert ( + config.get("max_concurrent_activities") == DEFAULT_MAX_CONCURRENT_ACTIVITIES + ) + assert ( + config.get("max_concurrent_workflow_tasks") + == DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS + ) + assert ( + config.get("max_concurrent_local_activities") + == DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITIES + ) + assert ( + config.get("max_concurrent_nexus_tasks") + == DEFAULT_MAX_CONCURRENT_NEXUS_TASKS + ) + assert config.get("max_cached_workflows") == DEFAULT_MAX_CACHED_WORKFLOWS + assert config.get("disable_eager_activity_execution") is True + + def test_apply_lambda_worker_defaults_preserves_existing(self) -> None: + config: WorkerConfig = { + "max_concurrent_activities": 50, + "graceful_shutdown_timeout": timedelta(seconds=10), + } + apply_lambda_worker_defaults(config) + assert config.get("max_concurrent_activities") == 50 + assert config.get("graceful_shutdown_timeout") == timedelta(seconds=10) + assert config.get("disable_eager_activity_execution") is True + + def test_build_lambda_identity(self) -> None: + assert ( + build_lambda_identity("req-123", "arn:aws:lambda:us-east-1:123:function:f") + == "req-123@arn:aws:lambda:us-east-1:123:function:f" + ) + + def test_build_lambda_identity_empty(self) -> None: + assert build_lambda_identity("", "") == 
"unknown@unknown" + + def test_lambda_default_config_file_path_env_var(self) -> None: + env = {"TEMPORAL_CONFIG_FILE": "/custom/path.toml"} + assert ( + lambda_default_config_file_path(env.get) # type: ignore[arg-type] + == Path("/custom/path.toml") + ) + + def test_lambda_default_config_file_path_lambda_root(self) -> None: + env = {"LAMBDA_TASK_ROOT": "/var/task"} + assert ( + lambda_default_config_file_path(env.get) # type: ignore[arg-type] + == Path("/var/task/temporal.toml") + ) + + def test_lambda_default_config_file_path_cwd(self) -> None: + env: dict[str, str] = {} + assert ( + lambda_default_config_file_path(env.get) # type: ignore[arg-type] + == Path("temporal.toml") + ) + + +# ---- RunWorker tests ---- + + +def _make_lambda_context( + *, + remaining_ms: int = 3_600_000, + request_id: str = "req-123", + function_arn: str = "arn:aws:lambda:us-east-1:123:function:my-func", +) -> Any: + """Create a mock Lambda context object.""" + ctx = MagicMock() + ctx.get_remaining_time_in_millis.return_value = remaining_ms + ctx.aws_request_id = request_id + ctx.invoked_function_arn = function_arn + return ctx + + +def _make_test_deps( + *, + connect_kwargs_capture: list[dict[str, Any]] | None = None, + worker_kwargs_capture: list[dict[str, Any]] | None = None, +) -> _WorkerDeps: + """Create test deps with mocked connect and worker.""" + mock_client = MagicMock() + mock_worker = MagicMock() + mock_worker.run = AsyncMock() + + async def fake_connect(**kwargs: Any) -> Any: + if connect_kwargs_capture is not None: + connect_kwargs_capture.append(kwargs) + return mock_client + + def fake_create_worker(_client: Any, **kwargs: Any) -> Any: + if worker_kwargs_capture is not None: + worker_kwargs_capture.append(kwargs) + return mock_worker + + return _WorkerDeps( + connect=fake_connect, + create_worker=fake_create_worker, + load_config=lambda: ClientConfigProfile(), + getenv={"TEMPORAL_TASK_QUEUE": "test-queue"}.get, # type: ignore[arg-type] + extract_lambda_ctx=lambda ctx: ( + 
ctx.aws_request_id, + ctx.invoked_function_arn, + ) + if hasattr(ctx, "aws_request_id") + else None, + ) + + +class TestRunWorkerInternal: + def test_returns_handler(self) -> None: + deps = _make_test_deps() + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + assert callable(handler) + + def test_success(self) -> None: + deps = _make_test_deps() + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["workflows"] = [type("FakeWf", (), {})] + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + + def test_configure_callback_error(self) -> None: + deps = _make_test_deps() + + def bad_configure(_config: LambdaWorkerConfig) -> None: + raise RuntimeError("bad config") + + with pytest.raises(RuntimeError, match="bad config"): + _run_worker_internal(TEST_VERSION, bad_configure, deps) + + def test_missing_task_queue(self) -> None: + deps = _make_test_deps() + deps.getenv = lambda _: None # type: ignore[assignment] + with pytest.raises(ValueError, match="task queue not configured"): + _run_worker_internal(TEST_VERSION, lambda config: None, deps) + + def test_missing_version(self) -> None: + deps = _make_test_deps() + with pytest.raises(ValueError, match="version is required"): + _run_worker_internal( + WorkerDeploymentVersion(deployment_name="", build_id=""), + lambda config: None, + deps, + ) + + def test_user_overrides_applied(self) -> None: + connect_capture: list[dict[str, Any]] = [] + worker_capture: list[dict[str, Any]] = [] + deps = _make_test_deps( + connect_kwargs_capture=connect_capture, + worker_kwargs_capture=worker_capture, + ) + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "user-queue" + config.client_connect_config["namespace"] = "custom-ns" + config.worker_config["max_concurrent_activities"] = 99 + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + + assert 
connect_capture[0]["namespace"] == "custom-ns" + assert worker_capture[0]["max_concurrent_activities"] == 99 + + def test_lambda_defaults_applied(self) -> None: + worker_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(worker_kwargs_capture=worker_capture) + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + handler({}, _make_lambda_context()) + + kwargs = worker_capture[0] + assert kwargs["max_concurrent_activities"] == DEFAULT_MAX_CONCURRENT_ACTIVITIES + assert ( + kwargs["max_concurrent_workflow_tasks"] + == DEFAULT_MAX_CONCURRENT_WORKFLOW_TASKS + ) + assert kwargs["disable_eager_activity_execution"] is True + dc = kwargs["deployment_config"] + assert dc.use_worker_versioning is True + assert dc.version == TEST_VERSION + + def test_identity_from_lambda_context(self) -> None: + connect_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(connect_kwargs_capture=connect_capture) + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + handler( + {}, + _make_lambda_context( + request_id="req-abc-123", + function_arn="arn:aws:lambda:us-east-1:123456:function:my-func", + ), + ) + + assert ( + connect_capture[0]["identity"] + == "req-abc-123@arn:aws:lambda:us-east-1:123456:function:my-func" + ) + + def test_identity_user_override_wins(self) -> None: + connect_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(connect_kwargs_capture=connect_capture) + + def configure(config: LambdaWorkerConfig) -> None: + config.client_connect_config["identity"] = "my-custom-identity" + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + assert connect_capture[0]["identity"] == "my-custom-identity" + + def test_identity_no_lambda_context(self) -> None: + connect_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(connect_kwargs_capture=connect_capture) + deps.extract_lambda_ctx = lambda ctx: None + handler = _run_worker_internal(TEST_VERSION, lambda config: 
None, deps) + handler({}, MagicMock(spec=[])) + assert "identity" not in connect_capture[0] + + def test_shutdown_hooks_called(self) -> None: + deps = _make_test_deps() + shutdown_called = False + + def configure(config: LambdaWorkerConfig) -> None: + def hook() -> None: + nonlocal shutdown_called + shutdown_called = True + + config.shutdown_hooks.append(hook) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + assert shutdown_called + + def test_shutdown_hooks_called_per_invocation(self) -> None: + deps = _make_test_deps() + shutdown_count = 0 + + def configure(config: LambdaWorkerConfig) -> None: + def hook() -> None: + nonlocal shutdown_count + shutdown_count += 1 + + config.shutdown_hooks.append(hook) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) + assert shutdown_count == 3 + + def test_shutdown_hooks_multiple_funcs_order(self) -> None: + deps = _make_test_deps() + order: list[str] = [] + + def configure(config: LambdaWorkerConfig) -> None: + config.shutdown_hooks.append(lambda: order.append("first")) + config.shutdown_hooks.append(lambda: order.append("second")) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + assert order == ["first", "second"] + + def test_shutdown_hooks_error_continues(self) -> None: + deps = _make_test_deps() + second_called = False + + def configure(config: LambdaWorkerConfig) -> None: + def failing() -> None: + raise RuntimeError("flush failed") + + def second() -> None: + nonlocal second_called + second_called = True + + config.shutdown_hooks.append(failing) + config.shutdown_hooks.append(second) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + assert second_called + + def test_tight_deadline_raises_error(self) -> None: + deps = 
_make_test_deps() + + def configure(config: LambdaWorkerConfig) -> None: + config.shutdown_deadline_buffer = timedelta(milliseconds=1500) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + with pytest.raises(RuntimeError, match="Lambda timeout is too short"): + handler({}, _make_lambda_context(remaining_ms=2000)) + + def test_tight_deadline_logs_warning(self) -> None: + deps = _make_test_deps() + + def configure(config: LambdaWorkerConfig) -> None: + config.shutdown_deadline_buffer = timedelta(milliseconds=500) + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + with patch( + "temporalio.contrib.aws.lambda_worker._run_worker.logger" + ) as mock_logger: + handler({}, _make_lambda_context(remaining_ms=2000)) + mock_logger.warning.assert_called_once() + assert "less than 5s" in mock_logger.warning.call_args[0][0] + + def test_per_invocation_lifecycle(self) -> None: + """Each invocation creates its own client and worker.""" + connect_count = 0 + deps = _make_test_deps() + original_connect = deps.connect + + async def counting_connect(**kwargs: Any) -> Any: + nonlocal connect_count + connect_count += 1 + return await original_connect(**kwargs) + + deps.connect = counting_connect + + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) + handler({}, _make_lambda_context()) + assert connect_count == 3 + + def test_task_queue_from_config(self) -> None: + worker_capture: list[dict[str, Any]] = [] + deps = _make_test_deps(worker_kwargs_capture=worker_capture) + deps.getenv = lambda _: None # type: ignore[assignment] + + def configure(config: LambdaWorkerConfig) -> None: + config.worker_config["task_queue"] = "explicit-queue" + + handler = _run_worker_internal(TEST_VERSION, configure, deps) + handler({}, _make_lambda_context()) + assert worker_capture[0]["task_queue"] == "explicit-queue" + + def test_task_queue_pre_populated_from_env(self) -> None: + 
"""Task queue is pre-populated from TEMPORAL_TASK_QUEUE env var.""" + deps = _make_test_deps() + task_queues: list[str | None] = [] + + def configure(config: LambdaWorkerConfig) -> None: + task_queues.append(config.worker_config.get("task_queue")) + + _run_worker_internal(TEST_VERSION, configure, deps) + assert task_queues[0] == "test-queue" + + def test_config_pre_populated_with_defaults(self) -> None: + """Configure callback receives pre-populated LambdaWorkerConfig.""" + deps = _make_test_deps() + captured: list[LambdaWorkerConfig] = [] + + def configure(config: LambdaWorkerConfig) -> None: + captured.append(config) + + _run_worker_internal(TEST_VERSION, configure, deps) + wc = captured[0].worker_config + assert wc.get("max_concurrent_activities") == DEFAULT_MAX_CONCURRENT_ACTIVITIES + assert wc.get("disable_eager_activity_execution") is True + dc = wc.get("deployment_config") + assert dc is not None + assert dc.use_worker_versioning is True + assert dc.version == TEST_VERSION + + def test_no_deadline_runs_until_complete(self) -> None: + """When no deadline is available, worker runs until it completes.""" + deps = _make_test_deps() + handler = _run_worker_internal(TEST_VERSION, lambda config: None, deps) + ctx = MagicMock(spec=["aws_request_id", "invoked_function_arn"]) + ctx.aws_request_id = "req-123" + ctx.invoked_function_arn = "arn:aws:lambda:us-east-1:123:function:f" + handler({}, ctx) diff --git a/tests/contrib/aws/lambda_worker/test_otel.py b/tests/contrib/aws/lambda_worker/test_otel.py new file mode 100644 index 000000000..97f3d9647 --- /dev/null +++ b/tests/contrib/aws/lambda_worker/test_otel.py @@ -0,0 +1,167 @@ +"""Tests for temporalio.contrib.aws.lambda_worker.otel.""" + +from __future__ import annotations + +from datetime import timedelta +from unittest.mock import patch + +import pytest + +from temporalio.contrib.aws.lambda_worker._configure import ( + LambdaWorkerConfig, + _run_shutdown_hooks, +) +from temporalio.contrib.aws.lambda_worker.otel 
import ( + OtelOptions, + apply_defaults, + apply_tracing, + build_metrics_telemetry_config, +) +from temporalio.contrib.opentelemetry import OpenTelemetryPlugin +from temporalio.runtime import OpenTelemetryConfig, TelemetryConfig + + +class TestApplyTracing: + def test_adds_plugin(self) -> None: + config = LambdaWorkerConfig() + apply_tracing(config) + plugins = config.worker_config.get("plugins", []) + assert len(plugins) == 1 + assert isinstance(plugins[0], OpenTelemetryPlugin) + + def test_appends_to_existing_plugins(self) -> None: + config = LambdaWorkerConfig() + existing = OpenTelemetryPlugin() + config.worker_config["plugins"] = [existing] + apply_tracing(config) + plugins = config.worker_config["plugins"] + assert len(plugins) == 2 + assert plugins[0] is existing + + def test_registers_flush_shutdown_hook(self) -> None: + config = LambdaWorkerConfig() + apply_tracing(config) + assert len(config.shutdown_hooks) == 1 + + @pytest.mark.asyncio + async def test_shutdown_hook_flushes(self) -> None: + config = LambdaWorkerConfig() + apply_tracing(config) + # Should not raise even with the default noop global provider. 
+ await _run_shutdown_hooks(config) + + +class TestBuildMetricsTelemetryConfig: + def test_returns_telemetry_config(self) -> None: + tc = build_metrics_telemetry_config(endpoint="http://localhost:4317") + assert isinstance(tc, TelemetryConfig) + assert isinstance(tc.metrics, OpenTelemetryConfig) + assert tc.metrics.url == "http://localhost:4317" + + def test_default_endpoint(self) -> None: + tc = build_metrics_telemetry_config() + assert isinstance(tc.metrics, OpenTelemetryConfig) + assert tc.metrics.url == "http://localhost:4317" + + def test_service_name_as_global_tag(self) -> None: + tc = build_metrics_telemetry_config(service_name="my-svc") + assert tc.global_tags.get("service_name") == "my-svc" + + def test_no_service_name_no_tag(self) -> None: + tc = build_metrics_telemetry_config() + assert "service_name" not in tc.global_tags + + def test_metric_periodicity(self) -> None: + tc = build_metrics_telemetry_config(metric_periodicity=timedelta(seconds=30)) + assert isinstance(tc.metrics, OpenTelemetryConfig) + assert tc.metrics.metric_periodicity == timedelta(seconds=30) + + def test_composable_with_custom_runtime(self) -> None: + """User can compose the returned config into a custom Runtime.""" + import dataclasses + + tc = build_metrics_telemetry_config(endpoint="http://localhost:4317") + custom_tc = dataclasses.replace(tc, logging=None) + assert custom_tc.logging is None + assert isinstance(custom_tc.metrics, OpenTelemetryConfig) + + +class TestApplyDefaults: + def test_configures_metrics_and_tracing(self) -> None: + config = LambdaWorkerConfig() + apply_defaults(config, OtelOptions(collector_endpoint="http://localhost:4317")) + + # Metrics: runtime should be set. + assert "runtime" in config.client_connect_config + # Tracing: plugin should be added. + plugins = config.worker_config.get("plugins", []) + assert len(plugins) == 1 + assert isinstance(plugins[0], OpenTelemetryPlugin) + # Shutdown hook for tracer flush. 
+ assert len(config.shutdown_hooks) == 1 + + def test_sets_global_tracer_provider(self) -> None: + from opentelemetry.trace import get_tracer_provider + + from temporalio.contrib.opentelemetry._tracer_provider import ( + ReplaySafeTracerProvider, + ) + + config = LambdaWorkerConfig() + apply_defaults(config) + provider = get_tracer_provider() + assert isinstance(provider, ReplaySafeTracerProvider) + + def test_service_name_from_options(self) -> None: + config = LambdaWorkerConfig() + apply_defaults(config, OtelOptions(service_name="my-service")) + assert "runtime" in config.client_connect_config + + def test_service_name_from_env(self) -> None: + config = LambdaWorkerConfig() + with patch.dict("os.environ", {"OTEL_SERVICE_NAME": "env-service"}): + apply_defaults(config) + assert "runtime" in config.client_connect_config + + def test_service_name_from_lambda_function_name(self) -> None: + config = LambdaWorkerConfig() + with patch.dict( + "os.environ", + {"AWS_LAMBDA_FUNCTION_NAME": "my-lambda"}, + clear=True, + ): + apply_defaults(config) + assert "runtime" in config.client_connect_config + + def test_endpoint_from_env(self) -> None: + config = LambdaWorkerConfig() + with patch.dict( + "os.environ", + {"OTEL_EXPORTER_OTLP_ENDPOINT": "http://custom:4317"}, + ): + apply_defaults(config) + assert "runtime" in config.client_connect_config + + def test_default_options_used_when_none(self) -> None: + config = LambdaWorkerConfig() + apply_defaults(config) + assert "runtime" in config.client_connect_config + assert len(config.shutdown_hooks) == 1 + + +class TestOtelOptions: + def test_defaults(self) -> None: + opts = OtelOptions() + assert opts.service_name == "" + assert opts.collector_endpoint == "" + assert opts.metric_periodicity == timedelta(seconds=10) + + def test_custom_values(self) -> None: + opts = OtelOptions( + service_name="svc", + collector_endpoint="http://host:4317", + metric_periodicity=timedelta(seconds=30), + ) + assert opts.service_name == "svc" + 
assert opts.collector_endpoint == "http://host:4317" + assert opts.metric_periodicity == timedelta(seconds=30) diff --git a/tests/contrib/google_adk_agents/test_google_adk_agents.py b/tests/contrib/google_adk_agents/test_google_adk_agents.py index 4d41b6a82..22e6be4d8 100644 --- a/tests/contrib/google_adk_agents/test_google_adk_agents.py +++ b/tests/contrib/google_adk_agents/test_google_adk_agents.py @@ -501,7 +501,10 @@ async def test_mcp_agent(client: Client, use_local_model: bool): @pytest.mark.asyncio -async def test_single_agent_telemetry(client: Client): +async def test_single_agent_telemetry( + client: Client, + reset_otel_tracer_provider, # type: ignore[reportUnusedParameter] +): exporter = InMemorySpanExporter() provider = create_tracer_provider() provider.add_span_processor(SimpleSpanProcessor(exporter)) diff --git a/tests/contrib/openai_agents/test_openai_tracing.py b/tests/contrib/openai_agents/test_openai_tracing.py index a13911ac2..5414f7916 100644 --- a/tests/contrib/openai_agents/test_openai_tracing.py +++ b/tests/contrib/openai_agents/test_openai_tracing.py @@ -265,19 +265,16 @@ def print_otel_spans(spans: tuple[ReadableSpan, ...]): def set_test_tracer_provider() -> InMemorySpanExporter: exporter = InMemorySpanExporter() - # Reset global so tests don't conflict - from opentelemetry.util._once import Once - - opentelemetry.trace._TRACER_PROVIDER_SET_ONCE = Once() - opentelemetry.trace._TRACER_PROVIDER = None - provider = create_tracer_provider() provider.add_span_processor(SimpleSpanProcessor(exporter)) opentelemetry.trace.set_tracer_provider(provider) return exporter -async def test_external_trace_to_workflow_spans(client: Client): +async def test_external_trace_to_workflow_spans( + client: Client, + reset_otel_tracer_provider: Any, # type: ignore[reportUnusedParameter] +): """Test: External trace -> workflow spans (with worker restart).""" exporter = set_test_tracer_provider() workflow_id = None @@ -364,7 +361,10 @@ async def ready() -> bool: ), 
f"All spans should have unique IDs, got: {span_ids}" -async def test_external_trace_and_span_to_workflow_spans(client: Client): +async def test_external_trace_and_span_to_workflow_spans( + client: Client, + reset_otel_tracer_provider: Any, # type: ignore[reportUnusedParameter] +): """Test: External trace + span -> workflow spans (with worker restart).""" exporter = set_test_tracer_provider() workflow_id = None @@ -461,7 +461,10 @@ async def ready() -> bool: ), f"All spans should have unique IDs, got: {span_ids}" -async def test_workflow_only_trace_to_spans(client: Client): +async def test_workflow_only_trace_to_spans( + client: Client, + reset_otel_tracer_provider: Any, # type: ignore[reportUnusedParameter] +): """Test: Workflow-only trace -> spans (with worker restart).""" exporter = set_test_tracer_provider() workflow_id = None @@ -551,7 +554,10 @@ async def run(self) -> str: return "done" -async def test_custom_span_without_trace_context(client: Client): +async def test_custom_span_without_trace_context( + client: Client, + reset_otel_tracer_provider: Any, # type: ignore[reportUnusedParameter] +): """Test that custom_span() without a trace context emits no spans. 
This validates our hypothesis about why the main test fails: @@ -591,7 +597,10 @@ async def test_custom_span_without_trace_context(client: Client): ), f"Expected no spans without trace context, but found: {[s.name for s in spans]}" -async def test_otel_tracing_in_runner(client: Client): +async def test_otel_tracing_in_runner( + client: Client, + reset_otel_tracer_provider: Any, # type: ignore[reportUnusedParameter] +): """Test the tracing when executing an actual OpenAI Runner.""" exporter = set_test_tracer_provider() @@ -750,7 +759,10 @@ def proceed(self) -> None: self._proceed = True -async def test_sdk_trace_to_otel_span_parenting(client: Client): +async def test_sdk_trace_to_otel_span_parenting( + client: Client, + reset_otel_tracer_provider: Any, # type: ignore[reportUnusedParameter] +): """Test that OTEL spans started in workflow are properly parented to client SDK trace.""" exporter = set_test_tracer_provider() workflow_id = None diff --git a/tests/contrib/opentelemetry/test_opentelemetry_plugin.py b/tests/contrib/opentelemetry/test_opentelemetry_plugin.py index b2ff2f913..dd1b20024 100644 --- a/tests/contrib/opentelemetry/test_opentelemetry_plugin.py +++ b/tests/contrib/opentelemetry/test_opentelemetry_plugin.py @@ -13,7 +13,6 @@ from opentelemetry.trace import ( get_tracer, ) -from opentelemetry.util._once import Once import temporalio.contrib.opentelemetry.workflow from temporalio import activity, nexus, workflow @@ -30,16 +29,6 @@ logger = logging.getLogger(__name__) -@pytest.fixture -def reset_otel_tracer_provider(): - """Reset OpenTelemetry tracer provider state to allow multiple test runs.""" - opentelemetry.trace._TRACER_PROVIDER_SET_ONCE = Once() - opentelemetry.trace._TRACER_PROVIDER = None - yield - opentelemetry.trace._TRACER_PROVIDER_SET_ONCE = Once() - opentelemetry.trace._TRACER_PROVIDER = None - - @activity.defn async def simple_no_context_activity() -> str: with get_tracer(__name__).start_as_current_span("Activity"): diff --git 
a/tests/test_client.py b/tests/test_client.py index b8bebdaf7..530c166f0 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1552,6 +1552,21 @@ def test_fork_create_client( self.run(mp_fork_ctx) +def test_client_connect_config_matches_connect_params(): + """ClientConnectConfig TypedDict keys must match Client.connect kwargs.""" + import inspect + + from temporalio.client import Client, ClientConnectConfig + + connect_params = set(inspect.signature(Client.connect).parameters.keys()) - {"cls"} + config_keys = set(ClientConnectConfig.__annotations__.keys()) + assert config_keys == connect_params, ( + f"ClientConnectConfig is out of sync with Client.connect. " + f"Missing from config: {connect_params - config_keys}. " + f"Extra in config: {config_keys - connect_params}." + ) + + class TestForkUseClient(_TestFork): async def coro(self): await self._client.start_workflow( diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index 4c76a7ba9..4aa366735 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -1543,3 +1543,18 @@ async def test_continue_as_new_with_version_upgrade( # Expect workflow to return "v2.0", indicating that it continued-as-new and completed on v2 result = await handle.result() assert result == "v2.0" + + +def test_worker_config_matches_init_params(): + """WorkerConfig TypedDict keys must match Worker.__init__ kwargs.""" + import inspect + + from temporalio.worker import Worker, WorkerConfig + + init_params = set(inspect.signature(Worker.__init__).parameters.keys()) - {"self"} + config_keys = set(WorkerConfig.__annotations__.keys()) + assert config_keys == init_params, ( + f"WorkerConfig is out of sync with Worker.__init__. " + f"Missing from config: {init_params - config_keys}. " + f"Extra in config: {config_keys - init_params}." 
+ ) diff --git a/uv.lock b/uv.lock index c63faefad..c0fe6ec17 100644 --- a/uv.lock +++ b/uv.lock @@ -3379,6 +3379,24 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/08/13/b4ef09837409a777f3c0af2a5b4ba9b7af34872bc43609dda0c209e4060d/opentelemetry_exporter_otlp_proto_common-1.37.0-py3-none-any.whl", hash = "sha256:53038428449c559b0c564b8d718df3314da387109c4d36bd1b94c9a641b0292e", size = 18359, upload-time = "2025-09-11T10:28:44.939Z" }, ] +[[package]] +name = "opentelemetry-exporter-otlp-proto-grpc" +version = "1.37.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "googleapis-common-protos" }, + { name = "grpcio" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp-proto-common" }, + { name = "opentelemetry-proto" }, + { name = "opentelemetry-sdk" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/d1/11/4ad0979d0bb13ae5a845214e97c8d42da43980034c30d6f72d8e0ebe580e/opentelemetry_exporter_otlp_proto_grpc-1.37.0.tar.gz", hash = "sha256:f55bcb9fc848ce05ad3dd954058bc7b126624d22c4d9e958da24d8537763bec5", size = 24465, upload-time = "2025-09-11T10:29:04.172Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/39/17/46630b74751031a658706bef23ac99cdc2953cd3b2d28ec90590a0766b3e/opentelemetry_exporter_otlp_proto_grpc-1.37.0-py3-none-any.whl", hash = "sha256:aee5104835bf7993b7ddaaf380b6467472abaedb1f1dbfcc54a52a7d781a3890", size = 19305, upload-time = "2025-09-11T10:28:45.776Z" }, +] + [[package]] name = "opentelemetry-exporter-otlp-proto-http" version = "1.37.0" @@ -3453,6 +3471,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9f/62/9f4ad6a54126fb00f7ed4bb5034964c6e4f00fcd5a905e115bd22707e20d/opentelemetry_sdk-1.37.0-py3-none-any.whl", hash = "sha256:8f3c3c22063e52475c5dbced7209495c2c16723d016d39287dfc215d1771257c", size = 131941, upload-time = "2025-09-11T10:28:57.83Z" }, ] +[[package]] +name = "opentelemetry-sdk-extension-aws" 
+version = "2.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-sdk" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f5/b3/825c93fe4c238845f1356297abea33d03b2adaafb5ae98fc257b394de124/opentelemetry_sdk_extension_aws-2.1.0.tar.gz", hash = "sha256:ff68ddecc1910f62c019d22ec0f7461713ead7f662d6a2304d4089c1a0b20416", size = 16334, upload-time = "2024-12-24T15:01:57.387Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/02/61/47a6a43b7935d54b5734fbf3fb0357dd5a7d0dfaa9677b7318518fe8d507/opentelemetry_sdk_extension_aws-2.1.0-py3-none-any.whl", hash = "sha256:c7cf6efc275d2c24108a468d954287ce5aab9733bac816a080cfb3117374e63a", size = 18776, upload-time = "2024-12-24T15:01:56.053Z" }, +] + [[package]] name = "opentelemetry-semantic-conventions" version = "0.58b0" @@ -4801,6 +4831,13 @@ google-adk = [ grpc = [ { name = "grpcio" }, ] +lambda-worker-otel = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp-proto-grpc" }, + { name = "opentelemetry-sdk" }, + { name = "opentelemetry-sdk-extension-aws" }, + { name = "opentelemetry-semantic-conventions" }, +] openai-agents = [ { name = "mcp" }, { name = "openai-agents" }, @@ -4828,6 +4865,9 @@ dev = [ { name = "openai-agents", extra = ["litellm"], marker = "python_full_version < '3.14'" }, { name = "openinference-instrumentation-google-adk" }, { name = "openinference-instrumentation-openai-agents" }, + { name = "opentelemetry-exporter-otlp-proto-grpc" }, + { name = "opentelemetry-sdk-extension-aws" }, + { name = "opentelemetry-semantic-conventions" }, { name = "psutil" }, { name = "pydocstyle" }, { name = "pydoctor" }, @@ -4851,16 +4891,21 @@ requires-dist = [ { name = "mcp", marker = "extra == 'openai-agents'", specifier = ">=1.9.4,<2" }, { name = "nexus-rpc", specifier = "==1.4.0" }, { name = "openai-agents", marker = "extra == 'openai-agents'", specifier = ">=0.3,<0.7" }, + { name = "opentelemetry-api", marker = "extra == 
'lambda-worker-otel'", specifier = ">=1.11.1,<2" }, { name = "opentelemetry-api", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, + { name = "opentelemetry-exporter-otlp-proto-grpc", marker = "extra == 'lambda-worker-otel'", specifier = ">=1.11.1,<2" }, + { name = "opentelemetry-sdk", marker = "extra == 'lambda-worker-otel'", specifier = ">=1.11.1,<2" }, { name = "opentelemetry-sdk", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, + { name = "opentelemetry-sdk-extension-aws", marker = "extra == 'lambda-worker-otel'", specifier = ">=2.0.0,<3" }, + { name = "opentelemetry-semantic-conventions", marker = "extra == 'lambda-worker-otel'", specifier = ">=0.40b0,<1" }, { name = "protobuf", specifier = ">=3.20,<7.0.0" }, { name = "pydantic", marker = "extra == 'pydantic'", specifier = ">=2.0.0,<3" }, { name = "python-dateutil", marker = "python_full_version < '3.11'", specifier = ">=2.8.2,<3" }, { name = "types-aioboto3", extras = ["s3"], marker = "extra == 'aioboto3'", specifier = ">=10.4.0" }, - { name = "types-protobuf", specifier = ">=3.20" }, + { name = "types-protobuf", specifier = ">=3.20,<7.0.0" }, { name = "typing-extensions", specifier = ">=4.2.0,<5" }, ] -provides-extras = ["grpc", "opentelemetry", "pydantic", "openai-agents", "google-adk", "aioboto3"] +provides-extras = ["grpc", "opentelemetry", "pydantic", "openai-agents", "google-adk", "lambda-worker-otel", "aioboto3"] [package.metadata.requires-dev] dev = [ @@ -4877,6 +4922,9 @@ dev = [ { name = "openai-agents", extras = ["litellm"], marker = "python_full_version < '3.14'", specifier = ">=0.3,<0.7" }, { name = "openinference-instrumentation-google-adk", specifier = ">=0.1.8" }, { name = "openinference-instrumentation-openai-agents", specifier = ">=0.1.0" }, + { name = "opentelemetry-exporter-otlp-proto-grpc", specifier = ">=1.11.1,<2" }, + { name = "opentelemetry-sdk-extension-aws", specifier = ">=2.0.0,<3" }, + { name = "opentelemetry-semantic-conventions", specifier = 
">=0.40b0,<1" }, { name = "psutil", specifier = ">=5.9.3,<6" }, { name = "pydocstyle", specifier = ">=6.3.0,<7" }, { name = "pydoctor", specifier = ">=25.10.1,<26" },