Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4141](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4141))
- `opentelemetry-instrumentation-pyramid`: pass request attributes at span creation
([#4139](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4139))
- `opentelemetry-instrumentation-aws-lambda`: add ability to configure flush behavior
([#4158](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4158))

### Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,34 @@ def lambda_handler(event, context):

AwsLambdaInstrumentor().instrument()

Configuration
-------------

Force Flush
***********
By default, the instrumentation force flushes both traces and metrics after each Lambda invocation.
This behavior can be controlled with the following environment variables:

* ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH``: Enables or disables force flushing for all signal types. Defaults to ``true``.
Copy link
Contributor

@xrmx xrmx Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these are Python specific they should be OTEL_PYTHON_ prefixed, e.g. OTEL_PYTHON_AWS_LAMBDA_FORCE_FLUSH

* ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES``: Enables or disables force flushing for traces. Defaults to the value of ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH``.
* ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS``: Enables or disables force flushing for metrics. Defaults to the value of ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH``.
* ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT``: The timeout in milliseconds for force flushing. Defaults to ``30000``.

The default value for signal-specific options inherits from ``OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH``.

API
---

The `instrument` method accepts the following keyword args:

tracer_provider (TracerProvider) - an optional tracer provider
meter_provider (MeterProvider) - an optional meter provider
event_context_extractor (Callable) - a function that returns an OTel Trace
Context given the Lambda Event the AWS Lambda was invoked with
this function signature is: def event_context_extractor(lambda_event: Any) -> Context
for example:
* ``tracer_provider`` (TracerProvider) - an optional tracer provider
* ``meter_provider`` (MeterProvider) - an optional meter provider
* ``event_context_extractor`` (Callable) - a function that returns an OTel Trace Context given the Lambda Event the AWS Lambda was invoked with this function signature is: def event_context_extractor(lambda_event: Any) -> Context
* ``force_flush`` (bool) - enables or disables force flushing for all signal types. Defaults to True.
* ``flush_traces`` (bool) - enables or disables force flushing for traces. Defaults to the value of ``force_flush``.
* ``flush_metrics`` (bool) - enables or disables force flushing for metrics. Defaults to the value of ``force_flush``.

Example usage:

.. code:: python

Expand All @@ -63,10 +80,10 @@ def custom_event_context_extractor(lambda_event):
return get_global_textmap().extract(lambda_event["foo"]["headers"])

AwsLambdaInstrumentor().instrument(
event_context_extractor=custom_event_context_extractor
event_context_extractor=custom_event_context_extractor,
force_flush=False,
flush_metrics=True,
)

---
"""

import logging
Expand Down Expand Up @@ -122,6 +139,27 @@ def custom_event_context_extractor(lambda_event):
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT = (
"OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT"
)
OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH = (
"OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH"
)
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES = (
"OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES"
)
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS = (
"OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS"
)


def _get_env_bool(env_var: str, default: bool) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised there isn't something like this already in contrib nor the SDK repo -- I couldn't find one either.

value = os.environ.get(env_var)
if value is None:
return default
value = value.strip().lower()
if value in ("y", "yes", "t", "true", "on", "1"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue I see with that is that we cannot allow explicitly setting this option to false. Should I make this a number (i.e. 0/1) instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xrmx thoughts? (on recently stale PR 🙂)

return True
if value in ("n", "no", "f", "false", "off", "0"):
return False
return default


def _default_event_context_extractor(lambda_event: Any) -> Context:
Expand Down Expand Up @@ -272,6 +310,8 @@ def _instrument(
event_context_extractor: Callable[[Any], Context],
tracer_provider: TracerProvider = None,
meter_provider: MeterProvider = None,
flush_traces: bool = True,
flush_metrics: bool = True,
):
# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
Expand Down Expand Up @@ -389,27 +429,27 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches

now = time.time()
_tracer_provider = tracer_provider or get_tracer_provider()
if hasattr(_tracer_provider, "force_flush"):
if flush_traces and hasattr(_tracer_provider, "force_flush"):
try:
# NOTE: `force_flush` before function quit in case of Lambda freeze.
_tracer_provider.force_flush(flush_timeout)
except Exception: # pylint: disable=broad-except
logger.exception("TracerProvider failed to flush traces")
else:
elif flush_traces and not hasattr(_tracer_provider, "force_flush"):
logger.warning(
"TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation."
)

_meter_provider = meter_provider or get_meter_provider()
if hasattr(_meter_provider, "force_flush"):
if flush_metrics and hasattr(_meter_provider, "force_flush"):
rem = flush_timeout - (time.time() - now) * 1000
if rem > 0:
try:
# NOTE: `force_flush` before function quit in case of Lambda freeze.
_meter_provider.force_flush(rem)
except Exception: # pylint: disable=broad-except
logger.exception("MeterProvider failed to flush metrics")
else:
elif flush_metrics and not hasattr(_meter_provider, "force_flush"):
logger.warning(
"MeterProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation."
)
Expand Down Expand Up @@ -444,6 +484,12 @@ def _instrument(self, **kwargs):
Event as input and extracts an OTel Context from it. By default,
the context is extracted from the HTTP headers of an API Gateway
request.
``force_flush``: if True, force flush all signal types after each
invocation. Defaults to True.
``flush_traces``: if True, force flush traces after each
invocation. Defaults to False.
``flush_metrics``: if True, force flush metrics after each
invocation. Defaults to False.
"""

# Don't try if we are not running on AWS Lambda
Expand Down Expand Up @@ -480,6 +526,11 @@ def _instrument(self, **kwargs):
flush_timeout_env,
)

force_flush = kwargs.get(
"force_flush",
_get_env_bool(OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH, True),
)

_instrument(
self._wrapped_module_name,
self._wrapped_function_name,
Expand All @@ -489,6 +540,18 @@ def _instrument(self, **kwargs):
),
tracer_provider=kwargs.get("tracer_provider"),
meter_provider=kwargs.get("meter_provider"),
flush_traces=kwargs.get(
"flush_traces",
_get_env_bool(
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES, force_flush
),
),
flush_metrics=kwargs.get(
"flush_metrics",
_get_env_bool(
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS, force_flush
),
),
)

def _uninstrument(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=no-self-use

import logging
import os
Expand All @@ -25,7 +26,10 @@
from opentelemetry.instrumentation.aws_lambda import (
_HANDLER,
_X_AMZN_TRACE_ID,
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS,
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT,
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES,
OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH,
AwsLambdaInstrumentor,
)
from opentelemetry.propagate import get_global_textmap
Expand Down Expand Up @@ -561,6 +565,157 @@ def test_load_entry_point(self):
AwsLambdaInstrumentor,
)

@mock.patch("opentelemetry.instrumentation.aws_lambda.get_meter_provider")
@mock.patch("opentelemetry.instrumentation.aws_lambda.get_tracer_provider")
def test_force_flush_default(
self, mock_get_tracer_provider, mock_get_meter_provider
):
mock_tracer_provider = mock.Mock()
mock_meter_provider = mock.Mock()
mock_get_tracer_provider.return_value = mock_tracer_provider
mock_get_meter_provider.return_value = mock_meter_provider

AwsLambdaInstrumentor().instrument()
mock_execute_lambda()

mock_tracer_provider.force_flush.assert_called_once()
mock_meter_provider.force_flush.assert_called_once()

@mock.patch("opentelemetry.instrumentation.aws_lambda.get_meter_provider")
@mock.patch("opentelemetry.instrumentation.aws_lambda.get_tracer_provider")
@mock.patch.dict(
"os.environ",
{
OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH: "false",
},
)
def test_force_flush_disabled_globally(
self, mock_get_tracer_provider, mock_get_meter_provider
):
mock_tracer_provider = mock.Mock()
mock_meter_provider = mock.Mock()
mock_get_tracer_provider.return_value = mock_tracer_provider
mock_get_meter_provider.return_value = mock_meter_provider

AwsLambdaInstrumentor().instrument()
mock_execute_lambda()

mock_tracer_provider.force_flush.assert_not_called()
mock_meter_provider.force_flush.assert_not_called()

@mock.patch("opentelemetry.instrumentation.aws_lambda.get_meter_provider")
@mock.patch("opentelemetry.instrumentation.aws_lambda.get_tracer_provider")
@mock.patch.dict(
"os.environ",
{
OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH: "false",
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES: "true",
},
)
def test_force_flush_selective_traces(
self, mock_get_tracer_provider, mock_get_meter_provider
):
mock_tracer_provider = mock.Mock()
mock_meter_provider = mock.Mock()
mock_get_tracer_provider.return_value = mock_tracer_provider
mock_get_meter_provider.return_value = mock_meter_provider

AwsLambdaInstrumentor().instrument()
mock_execute_lambda()

mock_tracer_provider.force_flush.assert_called_once()
mock_meter_provider.force_flush.assert_not_called()

@mock.patch("opentelemetry.instrumentation.aws_lambda.get_meter_provider")
@mock.patch("opentelemetry.instrumentation.aws_lambda.get_tracer_provider")
@mock.patch.dict(
"os.environ",
{
OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH: "false",
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS: "true",
},
)
def test_force_flush_selective_metrics(
self, mock_get_tracer_provider, mock_get_meter_provider
):
mock_tracer_provider = mock.Mock()
mock_meter_provider = mock.Mock()
mock_get_tracer_provider.return_value = mock_tracer_provider
mock_get_meter_provider.return_value = mock_meter_provider

AwsLambdaInstrumentor().instrument()
mock_execute_lambda()

mock_tracer_provider.force_flush.assert_not_called()
mock_meter_provider.force_flush.assert_called_once()

@mock.patch("opentelemetry.instrumentation.aws_lambda.get_meter_provider")
@mock.patch("opentelemetry.instrumentation.aws_lambda.get_tracer_provider")
@mock.patch.dict(
"os.environ",
{
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TRACES: "false",
},
)
def test_force_flush_disabled_selective_traces(
self, mock_get_tracer_provider, mock_get_meter_provider
):
mock_tracer_provider = mock.Mock()
mock_meter_provider = mock.Mock()
mock_get_tracer_provider.return_value = mock_tracer_provider
mock_get_meter_provider.return_value = mock_meter_provider

AwsLambdaInstrumentor().instrument()
mock_execute_lambda()

mock_tracer_provider.force_flush.assert_not_called()
mock_meter_provider.force_flush.assert_called_once()

@mock.patch("opentelemetry.instrumentation.aws_lambda.get_meter_provider")
@mock.patch("opentelemetry.instrumentation.aws_lambda.get_tracer_provider")
@mock.patch.dict(
"os.environ",
{
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_METRICS: "false",
},
)
def test_force_flush_disabled_selective_metrics(
self, mock_get_tracer_provider, mock_get_meter_provider
):
mock_tracer_provider = mock.Mock()
mock_meter_provider = mock.Mock()
mock_get_tracer_provider.return_value = mock_tracer_provider
mock_get_meter_provider.return_value = mock_meter_provider

AwsLambdaInstrumentor().instrument()
mock_execute_lambda()

mock_tracer_provider.force_flush.assert_called_once()
mock_meter_provider.force_flush.assert_not_called()

@mock.patch("opentelemetry.instrumentation.aws_lambda.get_meter_provider")
@mock.patch("opentelemetry.instrumentation.aws_lambda.get_tracer_provider")
@mock.patch.dict(
"os.environ", {OTEL_INSTRUMENTATION_AWS_LAMBDA_FORCE_FLUSH: "true"}
)
def test_force_flush_instrumentation_options_override(
self, mock_get_tracer_provider, mock_get_meter_provider
):
mock_tracer_provider = mock.Mock()
mock_meter_provider = mock.Mock()
mock_get_tracer_provider.return_value = mock_tracer_provider
mock_get_meter_provider.return_value = mock_meter_provider

# Pass options that override the environment variable
AwsLambdaInstrumentor().instrument(
force_flush=False,
flush_metrics=True,
)
mock_execute_lambda()

mock_tracer_provider.force_flush.assert_not_called()
mock_meter_provider.force_flush.assert_called_once()


class TestAwsLambdaInstrumentorMocks(TestAwsLambdaInstrumentorBase):
def test_api_gateway_proxy_event_sets_attributes(self):
Expand Down