-
Notifications
You must be signed in to change notification settings - Fork 886
feat: add ability to configure flush behavior for AWS lambda instrumentation #4158
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,17 +40,34 @@ def lambda_handler(event, context): | |
|
|
||
| AwsLambdaInstrumentor().instrument() | ||
|
|
||
| Configuration | ||
| ------------- | ||
|
|
||
| Force Flush | ||
| *********** | ||
| By default, the instrumentation force flushes both traces and metrics after each Lambda invocation. | ||
| This behavior can be controlled with the following environment variables: | ||
|
|
||
| * ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH``: Enables or disables force flushing for all signal types. Defaults to ``true``. | ||
| * ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES``: Enables or disables force flushing for traces. Defaults to the value of ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH``. | ||
| * ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS``: Enables or disables force flushing for metrics. Defaults to the value of ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH``. | ||
| * ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT``: The timeout in milliseconds for force flushing. Defaults to ``30000``. | ||
|
|
||
| The default value for signal-specific options inherits from ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH``. | ||
|
|
||
| API | ||
| --- | ||
|
|
||
| The `instrument` method accepts the following keyword args: | ||
|
|
||
| tracer_provider (TracerProvider) - an optional tracer provider | ||
| meter_provider (MeterProvider) - an optional meter provider | ||
| event_context_extractor (Callable) - a function that returns an OTel Trace | ||
| Context given the Lambda Event the AWS Lambda was invoked with | ||
| this function signature is: def event_context_extractor(lambda_event: Any) -> Context | ||
| for example: | ||
| * ``tracer_provider`` (TracerProvider) - an optional tracer provider | ||
| * ``meter_provider`` (MeterProvider) - an optional meter provider | ||
| * ``event_context_extractor`` (Callable) - a function that returns an OTel Trace Context given the Lambda Event the AWS Lambda was invoked with this function signature is: def event_context_extractor(lambda_event: Any) -> Context | ||
| * ``force_flush`` (bool) - enables or disables force flushing for all signal types. Defaults to True. | ||
| * ``flush_traces`` (bool) - enables or disables force flushing for traces. Defaults to the value of ``force_flush``. | ||
| * ``flush_metrics`` (bool) - enables or disables force flushing for metrics. Defaults to the value of ``force_flush``. | ||
|
|
||
| Example usage: | ||
|
|
||
| .. code:: python | ||
|
|
||
|
|
@@ -63,10 +80,10 @@ def custom_event_context_extractor(lambda_event): | |
| return get_global_textmap().extract(lambda_event["foo"]["headers"]) | ||
|
|
||
| AwsLambdaInstrumentor().instrument( | ||
| event_context_extractor=custom_event_context_extractor | ||
| event_context_extractor=custom_event_context_extractor, | ||
| force_flush=False, | ||
| flush_metrics=True, | ||
| ) | ||
|
|
||
| --- | ||
| """ | ||
|
|
||
| import logging | ||
|
|
@@ -122,6 +139,27 @@ def custom_event_context_extractor(lambda_event): | |
| OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT = ( | ||
| "OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT" | ||
| ) | ||
| OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH = ( | ||
| "OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH" | ||
| ) | ||
| OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES = ( | ||
| "OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES" | ||
| ) | ||
| OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS = ( | ||
| "OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS" | ||
| ) | ||
|
|
||
|
|
||
| def _get_env_bool(env_var: str, default: bool) -> bool: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm surprised there isn't something like this already in contrib nor the SDK repo -- I couldn't find one either. |
||
| value = os.environ.get(env_var) | ||
| if value is None: | ||
| return default | ||
| value = value.strip().lower() | ||
| if value in ("y", "yes", "t", "true", "on", "1"): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should just care about
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The issue I see with that is that we cannot allow explicitly setting this option to
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @xrmx thoughts? (on recently stale PR 🙂) |
||
| return True | ||
| if value in ("n", "no", "f", "false", "off", "0"): | ||
| return False | ||
| return default | ||
|
|
||
|
|
||
| def _default_event_context_extractor(lambda_event: Any) -> Context: | ||
|
|
@@ -272,6 +310,8 @@ def _instrument( | |
| event_context_extractor: Callable[[Any], Context], | ||
| tracer_provider: TracerProvider = None, | ||
| meter_provider: MeterProvider = None, | ||
| flush_traces: bool = True, | ||
| flush_metrics: bool = True, | ||
| ): | ||
| # pylint: disable=too-many-locals | ||
| # pylint: disable=too-many-statements | ||
|
|
@@ -389,27 +429,27 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches | |
|
|
||
| now = time.time() | ||
| _tracer_provider = tracer_provider or get_tracer_provider() | ||
| if hasattr(_tracer_provider, "force_flush"): | ||
| if flush_traces and hasattr(_tracer_provider, "force_flush"): | ||
| try: | ||
| # NOTE: `force_flush` before function quit in case of Lambda freeze. | ||
| _tracer_provider.force_flush(flush_timeout) | ||
| except Exception: # pylint: disable=broad-except | ||
| logger.exception("TracerProvider failed to flush traces") | ||
| else: | ||
| elif flush_traces and not hasattr(_tracer_provider, "force_flush"): | ||
| logger.warning( | ||
| "TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation." | ||
| ) | ||
|
|
||
| _meter_provider = meter_provider or get_meter_provider() | ||
| if hasattr(_meter_provider, "force_flush"): | ||
| if flush_metrics and hasattr(_meter_provider, "force_flush"): | ||
| rem = flush_timeout - (time.time() - now) * 1000 | ||
| if rem > 0: | ||
| try: | ||
| # NOTE: `force_flush` before function quit in case of Lambda freeze. | ||
| _meter_provider.force_flush(rem) | ||
| except Exception: # pylint: disable=broad-except | ||
| logger.exception("MeterProvider failed to flush metrics") | ||
| else: | ||
| elif flush_metrics and not hasattr(_meter_provider, "force_flush"): | ||
| logger.warning( | ||
| "MeterProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation." | ||
| ) | ||
|
|
@@ -444,6 +484,12 @@ def _instrument(self, **kwargs): | |
| Event as input and extracts an OTel Context from it. By default, | ||
| the context is extracted from the HTTP headers of an API Gateway | ||
| request. | ||
| ``force_flush``: if True, force flush all signal types after each | ||
| invocation. Defaults to True. | ||
| ``flush_traces``: if True, force flush traces after each | ||
| invocation. Defaults to False. | ||
| ``flush_metrics``: if True, force flush metrics after each | ||
| invocation. Defaults to False. | ||
| """ | ||
|
|
||
| # Don't try if we are not running on AWS Lambda | ||
|
|
@@ -480,6 +526,11 @@ def _instrument(self, **kwargs): | |
| flush_timeout_env, | ||
| ) | ||
|
|
||
| force_flush = kwargs.get( | ||
| "force_flush", | ||
| _get_env_bool(OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH, True), | ||
| ) | ||
|
|
||
| _instrument( | ||
| self._wrapped_module_name, | ||
| self._wrapped_function_name, | ||
|
|
@@ -489,6 +540,18 @@ def _instrument(self, **kwargs): | |
| ), | ||
| tracer_provider=kwargs.get("tracer_provider"), | ||
| meter_provider=kwargs.get("meter_provider"), | ||
| flush_traces=kwargs.get( | ||
| "flush_traces", | ||
| _get_env_bool( | ||
| OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES, force_flush | ||
| ), | ||
| ), | ||
| flush_metrics=kwargs.get( | ||
| "flush_metrics", | ||
| _get_env_bool( | ||
| OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS, force_flush | ||
| ), | ||
| ), | ||
| ) | ||
|
|
||
| def _uninstrument(self, **kwargs): | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If these are Python specific they should be
OTEL_PYTHON_prefixed, e.g.OTEL_PYTHON_AWS_LAMBDA_FORCE_FLUSH