diff --git a/CHANGELOG.md b/CHANGELOG.md index 1853b73414..26813203ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4141](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4141)) - `opentelemetry-instrumentation-pyramid`: pass request attributes at span creation ([#4139](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4139)) +- `opentelemetry-instrumentation-aws-lambda`: add ability to configure flush behavior + ([#4158](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4158)) ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py b/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py index c753e69f71..07b4c54e9a 100644 --- a/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py @@ -40,17 +40,34 @@ def lambda_handler(event, context): AwsLambdaInstrumentor().instrument() +Configuration +------------- + +Force Flush +*********** +By default, the instrumentation force flushes both traces and metrics after each Lambda invocation. +This behavior can be controlled with the following environment variables: + +* ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH``: Enables or disables force flushing for all signal types. Defaults to ``true``. +* ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES``: Enables or disables force flushing for traces. Defaults to the value of ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH``. +* ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS``: Enables or disables force flushing for metrics. Defaults to the value of ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH``. +* ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT``: The timeout in milliseconds for force flushing. Defaults to ``30000``. + +The default value for signal-specific options inherits from ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH``. + API --- The `instrument` method accepts the following keyword args: -tracer_provider (TracerProvider) - an optional tracer provider -meter_provider (MeterProvider) - an optional meter provider -event_context_extractor (Callable) - a function that returns an OTel Trace -Context given the Lambda Event the AWS Lambda was invoked with -this function signature is: def event_context_extractor(lambda_event: Any) -> Context -for example: +* ``tracer_provider`` (TracerProvider) - an optional tracer provider +* ``meter_provider`` (MeterProvider) - an optional meter provider +* ``event_context_extractor`` (Callable) - a function that returns an OTel Trace Context given the Lambda Event the AWS Lambda was invoked with this function signature is: def event_context_extractor(lambda_event: Any) -> Context +* ``force_flush`` (bool) - enables or disables force flushing for all signal types. Defaults to True. +* ``flush_traces`` (bool) - enables or disables force flushing for traces. Defaults to the value of ``force_flush``. +* ``flush_metrics`` (bool) - enables or disables force flushing for metrics. Defaults to the value of ``force_flush``. + +Example usage: .. code:: python @@ -63,10 +80,10 @@ def custom_event_context_extractor(lambda_event): return get_global_textmap().extract(lambda_event["foo"]["headers"]) AwsLambdaInstrumentor().instrument( - event_context_extractor=custom_event_context_extractor + event_context_extractor=custom_event_context_extractor, + force_flush=False, + flush_metrics=True, ) - ---- """ import logging @@ -122,6 +139,27 @@ def custom_event_context_extractor(lambda_event): OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT = ( "OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT" ) +OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH = ( + "OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH" +) +OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES = ( + "OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES" +) +OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS = ( + "OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS" +) + + +def _get_env_bool(env_var: str, default: bool) -> bool: + value = os.environ.get(env_var) + if value is None: + return default + value = value.strip().lower() + if value in ("y", "yes", "t", "true", "on", "1"): + return True + if value in ("n", "no", "f", "false", "off", "0"): + return False + return default def _default_event_context_extractor(lambda_event: Any) -> Context: @@ -272,6 +310,8 @@ def _instrument( event_context_extractor: Callable[[Any], Context], tracer_provider: TracerProvider = None, meter_provider: MeterProvider = None, + flush_traces: bool = True, + flush_metrics: bool = True, ): # pylint: disable=too-many-locals # pylint: disable=too-many-statements @@ -389,19 +429,19 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches now = time.time() _tracer_provider = tracer_provider or get_tracer_provider() - if hasattr(_tracer_provider, "force_flush"): + if flush_traces and hasattr(_tracer_provider, "force_flush"): try: # NOTE: `force_flush` before function quit in case of Lambda freeze. _tracer_provider.force_flush(flush_timeout) except Exception: # pylint: disable=broad-except logger.exception("TracerProvider failed to flush traces") - else: + elif flush_traces and not hasattr(_tracer_provider, "force_flush"): logger.warning( "TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation." ) _meter_provider = meter_provider or get_meter_provider() - if hasattr(_meter_provider, "force_flush"): + if flush_metrics and hasattr(_meter_provider, "force_flush"): rem = flush_timeout - (time.time() - now) * 1000 if rem > 0: try: @@ -409,7 +449,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches _meter_provider.force_flush(rem) except Exception: # pylint: disable=broad-except logger.exception("MeterProvider failed to flush metrics") - else: + elif flush_metrics and not hasattr(_meter_provider, "force_flush"): logger.warning( "MeterProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation." ) @@ -444,6 +484,12 @@ def _instrument(self, **kwargs): Event as input and extracts an OTel Context from it. By default, the context is extracted from the HTTP headers of an API Gateway request. + ``force_flush``: if True, force flush all signal types after each + invocation. Defaults to True. + ``flush_traces``: if True, force flush traces after each + invocation. Defaults to False. + ``flush_metrics``: if True, force flush metrics after each + invocation. Defaults to False. """ # Don't try if we are not running on AWS Lambda @@ -480,6 +526,11 @@ def _instrument(self, **kwargs): flush_timeout_env, ) + force_flush = kwargs.get( + "force_flush", + _get_env_bool(OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH, True), + ) + _instrument( self._wrapped_module_name, self._wrapped_function_name, @@ -489,6 +540,18 @@ def _instrument(self, **kwargs): ), tracer_provider=kwargs.get("tracer_provider"), meter_provider=kwargs.get("meter_provider"), + flush_traces=kwargs.get( + "flush_traces", + _get_env_bool( + OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES, force_flush + ), + ), + flush_metrics=kwargs.get( + "flush_metrics", + _get_env_bool( + OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS, force_flush + ), + ), ) def _uninstrument(self, **kwargs): diff --git a/instrumentation/opentelemetry-instrumentation-aws-lambda/tests/test_aws_lambda_instrumentation_manual.py b/instrumentation/opentelemetry-instrumentation-aws-lambda/tests/test_aws_lambda_instrumentation_manual.py index a468cb986a..e7aa1c2186 100644 --- a/instrumentation/opentelemetry-instrumentation-aws-lambda/tests/test_aws_lambda_instrumentation_manual.py +++ b/instrumentation/opentelemetry-instrumentation-aws-lambda/tests/test_aws_lambda_instrumentation_manual.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=no-self-use import logging import os @@ -25,7 +26,10 @@ from opentelemetry.instrumentation.aws_lambda import ( _HANDLER, _X_AMZN_TRACE_ID, + OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS, OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT, + OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES, + OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH, AwsLambdaInstrumentor, ) from opentelemetry.propagate import get_global_textmap @@ -561,6 +565,157 @@ def test_load_entry_point(self): AwsLambdaInstrumentor, ) + @mock.patch("opentelemetry.instrumentation.aws_lambda.get_meter_provider") + @mock.patch("opentelemetry.instrumentation.aws_lambda.get_tracer_provider") + def test_force_flush_default( + self, mock_get_tracer_provider, mock_get_meter_provider + ): + mock_tracer_provider = mock.Mock() + mock_meter_provider = mock.Mock() + mock_get_tracer_provider.return_value = mock_tracer_provider + mock_get_meter_provider.return_value = mock_meter_provider + + AwsLambdaInstrumentor().instrument() + mock_execute_lambda() + + mock_tracer_provider.force_flush.assert_called_once() + mock_meter_provider.force_flush.assert_called_once() + + @mock.patch("opentelemetry.instrumentation.aws_lambda.get_meter_provider") + @mock.patch("opentelemetry.instrumentation.aws_lambda.get_tracer_provider") + @mock.patch.dict( + "os.environ", + { + OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH: "false", + }, + ) + def test_force_flush_disabled_globally( + self, mock_get_tracer_provider, mock_get_meter_provider + ): + mock_tracer_provider = mock.Mock() + mock_meter_provider = mock.Mock() + mock_get_tracer_provider.return_value = mock_tracer_provider + mock_get_meter_provider.return_value = mock_meter_provider + + AwsLambdaInstrumentor().instrument() + mock_execute_lambda() + + mock_tracer_provider.force_flush.assert_not_called() + mock_meter_provider.force_flush.assert_not_called() + + @mock.patch("opentelemetry.instrumentation.aws_lambda.get_meter_provider") + @mock.patch("opentelemetry.instrumentation.aws_lambda.get_tracer_provider") + @mock.patch.dict( + "os.environ", + { + OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH: "false", + OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES: "true", + }, + ) + def test_force_flush_selective_traces( + self, mock_get_tracer_provider, mock_get_meter_provider + ): + mock_tracer_provider = mock.Mock() + mock_meter_provider = mock.Mock() + mock_get_tracer_provider.return_value = mock_tracer_provider + mock_get_meter_provider.return_value = mock_meter_provider + + AwsLambdaInstrumentor().instrument() + mock_execute_lambda() + + mock_tracer_provider.force_flush.assert_called_once() + mock_meter_provider.force_flush.assert_not_called() + + @mock.patch("opentelemetry.instrumentation.aws_lambda.get_meter_provider") + @mock.patch("opentelemetry.instrumentation.aws_lambda.get_tracer_provider") + @mock.patch.dict( + "os.environ", + { + OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH: "false", + OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS: "true", + }, + ) + def test_force_flush_selective_metrics( + self, mock_get_tracer_provider, mock_get_meter_provider + ): + mock_tracer_provider = mock.Mock() + mock_meter_provider = mock.Mock() + mock_get_tracer_provider.return_value = mock_tracer_provider + mock_get_meter_provider.return_value = mock_meter_provider + + AwsLambdaInstrumentor().instrument() + mock_execute_lambda() + + mock_tracer_provider.force_flush.assert_not_called() + mock_meter_provider.force_flush.assert_called_once() + + @mock.patch("opentelemetry.instrumentation.aws_lambda.get_meter_provider") + @mock.patch("opentelemetry.instrumentation.aws_lambda.get_tracer_provider") + @mock.patch.dict( + "os.environ", + { + OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES: "false", + }, + ) + def test_force_flush_disabled_selective_traces( + self, mock_get_tracer_provider, mock_get_meter_provider + ): + mock_tracer_provider = mock.Mock() + mock_meter_provider = mock.Mock() + mock_get_tracer_provider.return_value = mock_tracer_provider + mock_get_meter_provider.return_value = mock_meter_provider + + AwsLambdaInstrumentor().instrument() + mock_execute_lambda() + + mock_tracer_provider.force_flush.assert_not_called() + mock_meter_provider.force_flush.assert_called_once() + + @mock.patch("opentelemetry.instrumentation.aws_lambda.get_meter_provider") + @mock.patch("opentelemetry.instrumentation.aws_lambda.get_tracer_provider") + @mock.patch.dict( + "os.environ", + { + OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS: "false", + }, + ) + def test_force_flush_disabled_selective_metrics( + self, mock_get_tracer_provider, mock_get_meter_provider + ): + mock_tracer_provider = mock.Mock() + mock_meter_provider = mock.Mock() + mock_get_tracer_provider.return_value = mock_tracer_provider + mock_get_meter_provider.return_value = mock_meter_provider + + AwsLambdaInstrumentor().instrument() + mock_execute_lambda() + + mock_tracer_provider.force_flush.assert_called_once() + mock_meter_provider.force_flush.assert_not_called() + + @mock.patch("opentelemetry.instrumentation.aws_lambda.get_meter_provider") + @mock.patch("opentelemetry.instrumentation.aws_lambda.get_tracer_provider") + @mock.patch.dict( + "os.environ", {OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH: "true"} + ) + def test_force_flush_instrumentation_options_override( + self, mock_get_tracer_provider, mock_get_meter_provider + ): + mock_tracer_provider = mock.Mock() + mock_meter_provider = mock.Mock() + mock_get_tracer_provider.return_value = mock_tracer_provider + mock_get_meter_provider.return_value = mock_meter_provider + + # Pass options that override the environment variable + AwsLambdaInstrumentor().instrument( + force_flush=False, + flush_metrics=True, + ) + mock_execute_lambda() + + mock_tracer_provider.force_flush.assert_not_called() + mock_meter_provider.force_flush.assert_called_once() + class TestAwsLambdaInstrumentorMocks(TestAwsLambdaInstrumentorBase): def test_api_gateway_proxy_event_sets_attributes(self):