Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions py/plugins/aws-bedrock/src/genkit/py.typed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

170 changes: 134 additions & 36 deletions py/plugins/aws/src/genkit/plugins/aws/telemetry/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,9 @@
import os
import uuid
from collections.abc import Callable, Mapping, MutableMapping, Sequence
from typing import Any
from typing import Any, cast

import requests
import structlog
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
Expand All @@ -238,6 +239,8 @@
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.sdk.trace.sampling import Sampler
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from genkit.core.environment import is_dev_environment
from genkit.core.trace.adjusting_exporter import AdjustingTraceExporter, RedactedSpan
Expand Down Expand Up @@ -446,12 +449,125 @@ def _inject_trace_context(self, event_dict: MutableMapping[str, Any]) -> Mutable
return event_dict


class SigV4SigningAdapter(HTTPAdapter):
"""HTTP adapter that signs requests with AWS SigV4 authentication.

This adapter intercepts all HTTP requests and signs them with AWS SigV4
before sending. This enables the OTLP exporter to authenticate with
AWS services like X-Ray that require SigV4.

Example:
```python
session = requests.Session()
adapter = SigV4SigningAdapter(
credentials=botocore_session.get_credentials(),
region='us-west-2',
service='xray',
)
session.mount('https://', adapter)
```
"""

def __init__(
self,
credentials: Any, # noqa: ANN401 - botocore credentials type
region: str,
service: str = 'xray',
**kwargs: Any, # noqa: ANN401
) -> None:
"""Initialize the SigV4 signing adapter.

Args:
credentials: Botocore credentials object.
region: AWS region for signing.
service: AWS service name for signing (default: 'xray').
**kwargs: Additional arguments passed to HTTPAdapter.
"""
super().__init__(**kwargs)
self._credentials = credentials
self._region = region
self._service = service

def send( # type: ignore[override]
self,
request: requests.PreparedRequest,
**kwargs: Any, # noqa: ANN401
) -> requests.Response:
"""Send the request after signing it with SigV4.

Args:
request: The prepared request to send.
**kwargs: Additional arguments passed to the parent send method.

Returns:
The response from the server.
"""
if self._credentials is not None:
# Sign the request
aws_request = AWSRequest(
method=request.method or 'POST',
url=request.url or '',
headers=dict(request.headers) if request.headers else {},
data=request.body or b'',
)
SigV4Auth(self._credentials, self._service, self._region).add_auth(aws_request)

# Update the original request with signed headers
if request.headers is None:
request.headers = cast(Any, {})
request.headers.update(dict(aws_request.headers))

return super().send(request, **kwargs)


def _create_sigv4_session(
credentials: Any, # noqa: ANN401 - botocore credentials type
region: str,
service: str = 'xray',
) -> requests.Session:
"""Create a requests Session that signs all requests with SigV4.

Args:
credentials: Botocore credentials object.
region: AWS region for signing.
service: AWS service name for signing.

Returns:
A configured requests Session with SigV4 signing.
"""
session = requests.Session()

# Configure retry logic
retry = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[500, 502, 503, 504],
)

# Create signing adapter with retry
adapter = SigV4SigningAdapter(
credentials=credentials,
region=region,
service=service,
max_retries=retry,
)

# Mount the signing adapter for HTTPS requests
session.mount('https://', adapter)

return session


class AwsXRayOtlpExporter(SpanExporter):
"""OTLP/HTTP exporter with AWS SigV4 authentication for X-Ray.

This exporter sends spans via OTLP/HTTP to the AWS X-Ray endpoint,
signing each request with AWS SigV4 authentication using botocore.

The SigV4 signing is implemented via a custom requests Session adapter
that intercepts all outgoing HTTP requests and adds the required
Authorization, X-Amz-Date, and X-Amz-Security-Token headers.

Args:
region: AWS region for the X-Ray endpoint.
error_handler: Optional callback for export errors.
Expand Down Expand Up @@ -485,47 +601,29 @@ def __init__(
self._botocore_session = BotocoreSession()
self._credentials = self._botocore_session.get_credentials()

# Create the underlying OTLP exporter
self._otlp_exporter = OTLPSpanExporter(
endpoint=self._endpoint,
)

def _sign_request(self, headers: dict[str, str], body: bytes) -> dict[str, str]:
"""Sign the request with AWS SigV4.

Args:
headers: Request headers.
body: Request body bytes.

Returns:
Updated headers with SigV4 signature.
"""
if self._credentials is None:
logger.warning('No AWS credentials found for SigV4 signing')
return headers

# Create an AWS request for signing
aws_request = AWSRequest(
method='POST',
url=self._endpoint,
headers=headers,
data=body,
)
logger.warning(
'No AWS credentials found for SigV4 signing. X-Ray export will fail without valid credentials.'
)

# Sign the request
SigV4Auth(self._credentials, 'xray', self._region).add_auth(aws_request)
# Create a session with SigV4 signing adapter
signing_session = _create_sigv4_session(
credentials=self._credentials,
region=region,
service='xray',
)

# Return the signed headers
return dict(aws_request.headers)
# Create the underlying OTLP exporter with signing session
self._otlp_exporter = OTLPSpanExporter(
endpoint=self._endpoint,
session=signing_session,
)

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
"""Export spans to AWS X-Ray via OTLP/HTTP.
"""Export spans to AWS X-Ray via OTLP/HTTP with SigV4 authentication.

Note:
The current implementation delegates to the underlying OTLP exporter
which may not include SigV4 headers. For production use with
collector-less export, consider using ADOT auto-instrumentation
or the ADOT collector.
All requests are automatically signed with AWS SigV4 using the
credentials from the botocore session.

Args:
spans: A sequence of OpenTelemetry ReadableSpan objects to export.
Expand Down
127 changes: 127 additions & 0 deletions py/plugins/aws/tests/aws_telemetry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@
from unittest import mock

import pytest
import requests

from genkit.plugins.aws.telemetry.tracing import (
XRAY_OTLP_ENDPOINT_PATTERN,
AwsAdjustingTraceExporter,
AwsTelemetry,
AwsXRayOtlpExporter,
SigV4SigningAdapter,
TimeAdjustedSpan,
_create_sigv4_session,
_resolve_region,
add_aws_telemetry,
)
Expand Down Expand Up @@ -116,6 +119,130 @@ def test_exporter_with_error_handler(self) -> None:
)
assert exporter._error_handler is not None

def test_exporter_uses_sigv4_session(self) -> None:
"""Exporter should use a session with SigV4 signing adapter mounted."""
exporter = AwsXRayOtlpExporter(region='us-west-2')
# The OTLP exporter should have a session configured
assert exporter._otlp_exporter._session is not None


class TestSigV4SigningAdapter:
"""Tests for the SigV4SigningAdapter class."""

def test_adapter_initialization(self) -> None:
"""Adapter should initialize with credentials and region."""
mock_credentials = mock.MagicMock()
adapter = SigV4SigningAdapter(
credentials=mock_credentials,
region='us-west-2',
service='xray',
)
assert adapter._credentials == mock_credentials
assert adapter._region == 'us-west-2'
assert adapter._service == 'xray'

def test_adapter_default_service(self) -> None:
"""Adapter should default to xray service."""
mock_credentials = mock.MagicMock()
adapter = SigV4SigningAdapter(
credentials=mock_credentials,
region='eu-west-1',
)
assert adapter._service == 'xray'

def test_adapter_signs_request(self) -> None:
"""Adapter should add SigV4 headers to request."""
# Create mock credentials
mock_credentials = mock.MagicMock()
mock_credentials.access_key = 'AKIAIOSFODNN7EXAMPLE'
mock_credentials.secret_key = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
mock_credentials.token = None

adapter = SigV4SigningAdapter(
credentials=mock_credentials,
region='us-west-2',
service='xray',
)

# Create a mock request
request = requests.PreparedRequest()
request.prepare(
method='POST',
url='https://xray.us-west-2.amazonaws.com/v1/traces',
headers={'Content-Type': 'application/x-protobuf'},
data=b'test-payload',
)

# Mock the parent send method
with mock.patch.object(
adapter.__class__.__bases__[0],
'send',
return_value=mock.MagicMock(status_code=200),
):
adapter.send(request)

# Verify that the request now has Authorization header
assert request.headers is not None
assert 'Authorization' in request.headers
assert 'AWS4-HMAC-SHA256' in request.headers['Authorization']

def test_adapter_handles_none_credentials(self) -> None:
"""Adapter should handle None credentials gracefully."""
adapter = SigV4SigningAdapter(
credentials=None,
region='us-west-2',
)

request = requests.PreparedRequest()
request.prepare(
method='POST',
url='https://xray.us-west-2.amazonaws.com/v1/traces',
headers={},
data=b'test',
)

# Should not raise, just skip signing
with mock.patch.object(
adapter.__class__.__bases__[0],
'send',
return_value=mock.MagicMock(status_code=200),
):
adapter.send(request)

# No Authorization header should be set
assert request.headers is not None
assert 'Authorization' not in request.headers


class TestCreateSigV4Session:
"""Tests for the _create_sigv4_session function."""

def test_session_has_adapter_mounted(self) -> None:
"""Session should have SigV4 adapter mounted for HTTPS."""
mock_credentials = mock.MagicMock()
session = _create_sigv4_session(
credentials=mock_credentials,
region='us-west-2',
)

# Get the adapter for an HTTPS URL
adapter = session.get_adapter('https://xray.us-west-2.amazonaws.com')
assert isinstance(adapter, SigV4SigningAdapter)

def test_session_adapter_has_correct_region(self) -> None:
"""Session adapter should be configured with correct region."""
mock_credentials = mock.MagicMock()
session = _create_sigv4_session(
credentials=mock_credentials,
region='eu-west-1',
service='xray',
)

adapter = session.get_adapter('https://xray.eu-west-1.amazonaws.com')
assert isinstance(adapter, SigV4SigningAdapter)
assert adapter._region == 'eu-west-1'
assert adapter._service == 'xray'


class TestAwsAdjustingTraceExporter:
"""Tests for the AwsAdjustingTraceExporter class."""
Expand Down
32 changes: 32 additions & 0 deletions py/samples/_common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,38 @@ check_env_var() {
local var_name="$1"
local get_url="$2"

local current_val="${!var_name:-}"

# Prompt if running interactively
# We check -t 0 (stdin is TTY) and also explicit check for /dev/tty availability
if [[ -t 0 ]] && [ -c /dev/tty ]; then
local display_val="${current_val}"

# Simple masking for keys
if [[ "$var_name" == *"API_KEY"* || "$var_name" == *"SECRET"* ]]; then
if [[ -n "$current_val" ]]; then
display_val="******"
fi
fi

echo -en "${BLUE}Enter ${var_name}${NC}"
if [[ -n "$display_val" ]]; then
echo -en " [${YELLOW}${display_val}${NC}]: "
else
echo -n ": "
fi

local input_val
# Safely read from TTY
if read -r input_val < /dev/tty; then
if [[ -n "$input_val" ]]; then
export "$var_name"="$input_val"
fi
fi
# Only print newline if we actually prompted
echo ""
fi

if [[ -z "${!var_name:-}" ]]; then
echo -e "${YELLOW}Warning: ${var_name} not set${NC}"
if [[ -n "$get_url" ]]; then
Expand Down
Loading
Loading