Skip to content

Commit f1938b7

Browse files
committed
Allow passing input/output serdes
Changes: - We allow input and output serdes on the decorator. This is particularly useful for chained invoke to ensure callees automatically have the correct serialization
1 parent 00b195d commit f1938b7

File tree

3 files changed

+186
-15
lines changed

3 files changed

+186
-15
lines changed

src/aws_durable_execution_sdk_python/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,24 @@
1818
# Core decorator - used in every durable function
1919
from aws_durable_execution_sdk_python.execution import durable_execution
2020

21+
# Serialization - for custom input/output serialization
22+
from aws_durable_execution_sdk_python.serdes import (
23+
ExtendedTypeSerDes,
24+
JsonSerDes,
25+
SerDes,
26+
)
27+
2128
# Essential context types - passed to user functions
2229
from aws_durable_execution_sdk_python.types import BatchResult, StepContext
2330

2431
__all__ = [
2532
"BatchResult",
2633
"DurableContext",
2734
"DurableExecutionsError",
35+
"ExtendedTypeSerDes",
2836
"InvocationError",
37+
"JsonSerDes",
38+
"SerDes",
2939
"StepContext",
3040
"ValidationError",
3141
"durable_execution",

src/aws_durable_execution_sdk_python/execution.py

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@
2727
OperationType,
2828
OperationUpdate,
2929
)
30+
from aws_durable_execution_sdk_python.serdes import (
31+
JsonSerDes,
32+
SerDes,
33+
deserialize,
34+
serialize,
35+
)
3036

3137
if TYPE_CHECKING:
3238
from collections.abc import Callable, MutableMapping
@@ -206,11 +212,18 @@ def durable_execution(
206212
func: Callable[[Any, DurableContext], Any] | None = None,
207213
*,
208214
boto3_client: boto3.client | None = None,
215+
input_serdes: SerDes = JsonSerDes(),
216+
output_serdes: SerDes = JsonSerDes(),
209217
) -> Callable[[Any, LambdaContext], Any]:
210218
# Decorator called with parameters
211219
if func is None:
212220
logger.debug("Decorator called with parameters")
213-
return functools.partial(durable_execution, boto3_client=boto3_client)
221+
return functools.partial(
222+
durable_execution,
223+
boto3_client=boto3_client,
224+
input_serdes=input_serdes,
225+
output_serdes=output_serdes,
226+
)
214227

215228
logger.debug("Starting durable execution handler...")
216229

@@ -250,18 +263,14 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
250263
invocation_input.initial_execution_state.get_input_payload()
251264
)
252265

253-
# Python RIC LambdaMarshaller just uses standard json deserialization for event
254-
# https://github.com/aws/aws-lambda-python-runtime-interface-client/blob/main/awslambdaric/lambda_runtime_marshaller.py#L46
255-
input_event: MutableMapping[str, Any] = {}
266+
input_event: MutableMapping[str, Any] = None
256267
if raw_input_payload and raw_input_payload.strip():
257-
try:
258-
input_event = json.loads(raw_input_payload)
259-
except json.JSONDecodeError:
260-
logger.exception(
261-
"Failed to parse input payload as JSON: payload: %r",
262-
raw_input_payload,
263-
)
264-
raise
268+
input_event = deserialize(
269+
serdes=input_serdes,
270+
data=raw_input_payload,
271+
operation_id="EXECUTION",
272+
durable_execution_arn=invocation_input.durable_execution_arn,
273+
)
265274

266275
execution_state: ExecutionState = ExecutionState(
267276
durable_execution_arn=invocation_input.durable_execution_arn,
@@ -310,7 +319,15 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
310319
"%s exiting user-space...",
311320
invocation_input.durable_execution_arn,
312321
)
313-
serialized_result = json.dumps(result)
322+
323+
# Serialize result using output_serdes if provided
324+
serialized_result = serialize(
325+
serdes=output_serdes,
326+
value=result,
327+
operation_id="EXECUTION",
328+
durable_execution_arn=invocation_input.durable_execution_arn,
329+
)
330+
314331
# large response handling here. Remember if checkpointing to complete, NOT to include
315332
# payload in response
316333
if (

tests/execution_test.py

Lines changed: 146 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,32 @@
33
import datetime
44
import json
55
import time
6+
from decimal import Decimal
67
from typing import Any
78
from unittest.mock import Mock, patch
89

10+
11+
from aws_durable_execution_sdk_python import (
12+
DurableContext,
13+
ExtendedTypeSerDes,
14+
JsonSerDes,
15+
durable_execution,
16+
)
17+
from aws_durable_execution_sdk_python.execution import (
18+
DurableExecutionInvocationInputWithClient,
19+
InitialExecutionState,
20+
)
21+
from aws_durable_execution_sdk_python.lambda_service import (
22+
CheckpointOutput,
23+
CheckpointUpdatedExecutionState,
24+
DurableServiceClient,
25+
ExecutionDetails,
26+
Operation,
27+
OperationStatus,
28+
OperationType,
29+
)
30+
31+
932
import pytest
1033

1134
from aws_durable_execution_sdk_python.config import StepConfig, StepSemantics
@@ -946,7 +969,7 @@ def test_handler(event: Any, context: DurableContext) -> dict:
946969
lambda_context.invoked_function_arn = None
947970
lambda_context.tenant_id = None
948971

949-
with pytest.raises(json.JSONDecodeError):
972+
with pytest.raises(ExecutionError):
950973
test_handler(invocation_input, lambda_context)
951974

952975

@@ -2047,7 +2070,6 @@ def test_handler(event: Any, context: DurableContext) -> dict:
20472070

20482071
def test_durable_execution_with_non_dict_event_raises_error():
20492072
"""Test that invoking a durable function with a non-dict event raises a helpful error."""
2050-
20512073
# GIVEN a durable function
20522074
@durable_execution
20532075
def test_handler(event: Any, context: DurableContext) -> dict:
@@ -2071,3 +2093,125 @@ def test_handler(event: Any, context: DurableContext) -> dict:
20712093
match="The payload is not the correct Durable Function input",
20722094
):
20732095
test_handler(non_dict_event, lambda_context)
2096+
2097+
2098+
# region SERDES
2099+
2100+
@pytest.mark.parametrize(
2101+
"input_serdes,output_serdes,input_data,expected_output",
2102+
[
2103+
# Both ExtendedTypeSerDes
2104+
(
2105+
ExtendedTypeSerDes(),
2106+
ExtendedTypeSerDes(),
2107+
{
2108+
"amount": Decimal("123.45"),
2109+
"timestamp": datetime.datetime(2025, 11, 20, 18, 0, 0),
2110+
},
2111+
{
2112+
"result_amount": Decimal("678.90"),
2113+
"processed_at": datetime.datetime(2025, 11, 20, 19, 0, 0),
2114+
},
2115+
),
2116+
# Both JsonSerDes
2117+
(
2118+
JsonSerDes(),
2119+
JsonSerDes(),
2120+
{"name": "test", "value": 42},
2121+
{"result": "success", "count": 100},
2122+
),
2123+
# Input ExtendedTypeSerDes, Output JsonSerDes
2124+
(
2125+
ExtendedTypeSerDes(),
2126+
JsonSerDes(),
2127+
{"amount": Decimal("123.45")},
2128+
{"result": "success"},
2129+
),
2130+
# Input JsonSerDes, Output ExtendedTypeSerDes
2131+
(
2132+
JsonSerDes(),
2133+
ExtendedTypeSerDes(),
2134+
{"name": "test"},
2135+
{"timestamp": datetime.datetime(2025, 11, 20, 19, 0, 0)},
2136+
),
2137+
],
2138+
)
2139+
def test_durable_execution_with_serdes(input_serdes, output_serdes, input_data, expected_output):
2140+
"""Test that input_serdes and output_serdes are invoked correctly."""
2141+
serialized_input = input_serdes.serialize(input_data, None)
2142+
2143+
mock_client = Mock(spec=DurableServiceClient)
2144+
mock_client.checkpoint.return_value = CheckpointOutput(
2145+
checkpoint_token="new-token",
2146+
new_execution_state=CheckpointUpdatedExecutionState(operations=[], next_marker=None),
2147+
)
2148+
2149+
execution_op = Operation(
2150+
operation_id="EXECUTION",
2151+
operation_type=OperationType.EXECUTION,
2152+
status=OperationStatus.STARTED,
2153+
execution_details=ExecutionDetails(input_payload=serialized_input),
2154+
)
2155+
2156+
invocation_input = DurableExecutionInvocationInputWithClient(
2157+
durable_execution_arn="arn:aws:lambda:us-east-1:123456789012:function:test",
2158+
checkpoint_token="initial-token",
2159+
initial_execution_state=InitialExecutionState(
2160+
operations=[execution_op],
2161+
next_marker="",
2162+
),
2163+
is_local_runner=False,
2164+
service_client=mock_client,
2165+
)
2166+
2167+
with patch.object(input_serdes, "deserialize", wraps=input_serdes.deserialize) as mock_input_deser, patch.object(
2168+
output_serdes, "serialize", wraps=output_serdes.serialize
2169+
) as mock_output_ser:
2170+
2171+
@durable_execution(input_serdes=input_serdes, output_serdes=output_serdes)
2172+
def handler(event, context: DurableContext):
2173+
return expected_output
2174+
2175+
result = handler(invocation_input, Mock())
2176+
2177+
mock_input_deser.assert_called_once()
2178+
mock_output_ser.assert_called_once_with(expected_output, mock_output_ser.call_args[0][1])
2179+
2180+
assert result["Status"] == "SUCCEEDED"
2181+
result_data = output_serdes.deserialize(result["Result"], None)
2182+
assert result_data == expected_output
2183+
2184+
2185+
def test_durable_execution_with_none_input():
2186+
"""Test that None input is handled correctly with serdes."""
2187+
mock_client = Mock(spec=DurableServiceClient)
2188+
mock_client.checkpoint.return_value = CheckpointOutput(
2189+
checkpoint_token="new-token",
2190+
new_execution_state=CheckpointUpdatedExecutionState(operations=[], next_marker=None),
2191+
)
2192+
2193+
execution_op = Operation(
2194+
operation_id="EXECUTION",
2195+
operation_type=OperationType.EXECUTION,
2196+
status=OperationStatus.STARTED,
2197+
execution_details=ExecutionDetails(input_payload=None),
2198+
)
2199+
2200+
invocation_input = DurableExecutionInvocationInputWithClient(
2201+
durable_execution_arn="arn:aws:lambda:us-east-1:123456789012:function:test",
2202+
checkpoint_token="initial-token",
2203+
initial_execution_state=InitialExecutionState(
2204+
operations=[execution_op],
2205+
next_marker="",
2206+
),
2207+
is_local_runner=False,
2208+
service_client=mock_client,
2209+
)
2210+
2211+
@durable_execution(input_serdes=ExtendedTypeSerDes())
2212+
def handler(event, context: DurableContext):
2213+
assert event is None
2214+
return {"result": "success"}
2215+
2216+
result = handler(invocation_input, Mock())
2217+
assert result["Status"] == "SUCCEEDED"

0 commit comments

Comments
 (0)