Skip to content

Commit a0dfd8a

Browse files
committed
fix(localrunner): checkpoint validation parent & duplicate IDs
The SDK now uses background thread batching for checkpoints, which can send multiple updates for the same operation in a single batch (e.g., START followed by SUCCEED for fast-completing operations). Updated the checkpoint validator to allow this valid behavior. Changes: - Allow duplicate operation IDs in checkpoint batches for STEP/CONTEXT operations when first action is START and subsequent is non-START - Reject duplicate IDs for other operation types (WAIT, CALLBACK, etc.) - Add 11 new tests covering all duplicate/inconsistency scenarios - Add pragma comments to Protocol method stubs in serialization.py
1 parent 869fd0d commit a0dfd8a

File tree

3 files changed

+394
-20
lines changed

3 files changed

+394
-20
lines changed

src/aws_durable_execution_sdk_python_testing/checkpoint/validators/checkpoint.py

Lines changed: 83 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import TYPE_CHECKING
77

88
from aws_durable_execution_sdk_python.lambda_service import (
9+
OperationAction,
910
OperationType,
1011
OperationUpdate,
1112
)
@@ -83,6 +84,7 @@ def _validate_operation_update(
8384
update: OperationUpdate, execution: Execution
8485
) -> None:
8586
"""Validate a single operation update."""
87+
CheckpointValidator._validate_inconsistent_operation_metadata(update, execution)
8688
CheckpointValidator._validate_payload_sizes(update)
8789
ValidActionsByOperationTypeValidator.validate(
8890
update.operation_type, update.action
@@ -127,43 +129,112 @@ def _validate_operation_status_transition(
127129

128130
raise InvalidParameterValueException(msg)
129131

132+
@staticmethod
133+
def _validate_inconsistent_operation_metadata(
134+
update: OperationUpdate, execution: Execution
135+
) -> None:
136+
"""Validate that operation metadata is consistent with existing operation."""
137+
current_state = None
138+
for operation in execution.operations:
139+
if operation.operation_id == update.operation_id:
140+
current_state = operation
141+
break
142+
143+
if current_state is not None:
144+
if (
145+
update.operation_type is not None
146+
and update.operation_type != current_state.operation_type
147+
):
148+
raise InvalidParameterValueException("Inconsistent operation type.")
149+
150+
if (
151+
hasattr(update, "sub_type")
152+
and update.sub_type is not None
153+
and update.sub_type != getattr(current_state, "sub_type", None)
154+
):
155+
raise InvalidParameterValueException("Inconsistent operation subtype.")
156+
157+
if (
158+
hasattr(update, "name")
159+
and update.name is not None
160+
and update.name != getattr(current_state, "name", None)
161+
):
162+
raise InvalidParameterValueException("Inconsistent operation name.")
163+
164+
if update.parent_id is not None and update.parent_id != getattr(
165+
current_state, "parent_id", None
166+
):
167+
raise InvalidParameterValueException(
168+
"Inconsistent parent operation id."
169+
)
170+
130171
@staticmethod
131172
def _validate_parent_id_and_duplicate_id(
132173
updates: list[OperationUpdate], execution: Execution
133174
) -> None:
134-
"""Validate parent IDs and check for duplicate operation IDs."""
135-
operations_seen: MutableMapping[str, OperationUpdate] = {}
175+
"""Validate parent IDs and check for duplicate operation IDs.
176+
177+
Validate that any provided parentId is valid, and also validate no duplicate operation is being
178+
updated at the same time (unless it is a STEP/CONTEXT starting + performing one more non-START action).
179+
."""
180+
operations_started: MutableMapping[str, OperationUpdate] = {}
181+
last_updates_seen: MutableMapping[str, OperationUpdate] = {}
136182

137183
for update in updates:
138-
if update.operation_id in operations_seen:
139-
msg: str = "Cannot update the same operation twice in a single request."
140-
raise InvalidParameterValueException(msg)
184+
if CheckpointValidator._is_invalid_duplicate_update(
185+
update, last_updates_seen
186+
):
187+
raise InvalidParameterValueException(
188+
"Cannot checkpoint multiple operations with the same ID."
189+
)
141190

142191
if not CheckpointValidator._is_valid_parent_for_update(
143-
execution, update, operations_seen
192+
execution, update, operations_started
144193
):
145-
msg_invalid_parent: str = "Invalid parent operation id."
194+
raise InvalidParameterValueException("Invalid parent operation id.")
195+
196+
if update.action == OperationAction.START:
197+
operations_started[update.operation_id] = update
198+
199+
last_updates_seen[update.operation_id] = update
200+
201+
@staticmethod
202+
def _is_invalid_duplicate_update(
203+
update: OperationUpdate, last_updates_seen: MutableMapping[str, OperationUpdate]
204+
) -> bool:
205+
"""Check if this is an invalid duplicate update."""
206+
last_update = last_updates_seen.get(update.operation_id)
207+
if last_update is None:
208+
return False
146209

147-
raise InvalidParameterValueException(msg_invalid_parent)
210+
if last_update.operation_type in (OperationType.STEP, OperationType.CONTEXT):
211+
# Allow duplicate for STEP/CONTEXT if last was START and current is not START
212+
allow_duplicate = (
213+
last_update.action == OperationAction.START
214+
and update.action != OperationAction.START
215+
)
216+
return not allow_duplicate
148217

149-
operations_seen[update.operation_id] = update
218+
return True
150219

151220
@staticmethod
152221
def _is_valid_parent_for_update(
153222
execution: Execution,
154223
update: OperationUpdate,
155-
operations_seen: MutableMapping[str, OperationUpdate],
224+
operations_started: MutableMapping[str, OperationUpdate],
156225
) -> bool:
157226
"""Check if the parent ID is valid for the update."""
158227
parent_id = update.parent_id
159228

160229
if parent_id is None:
161230
return True
162231

163-
if parent_id in operations_seen:
164-
parent_update = operations_seen[parent_id]
232+
# Check if parent is in operations started in this batch
233+
if parent_id in operations_started:
234+
parent_update = operations_started[parent_id]
165235
return parent_update.operation_type == OperationType.CONTEXT
166236

237+
# Check if parent exists in current execution state
167238
for operation in execution.operations:
168239
if operation.operation_id == parent_id:
169240
return operation.operation_type == OperationType.CONTEXT

src/aws_durable_execution_sdk_python_testing/web/serialization.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def to_bytes(self, data: Any) -> bytes:
3636
Raises:
3737
InvalidParameterValueException: If serialization fails
3838
"""
39-
...
39+
... # pragma: no cover
4040

4141

4242
class Deserializer(Protocol):
@@ -54,7 +54,7 @@ def from_bytes(self, data: bytes) -> dict[str, Any]:
5454
Raises:
5555
InvalidParameterValueException: If deserialization fails
5656
"""
57-
...
57+
... # pragma: no cover
5858

5959

6060
class AwsRestJsonSerializer:

0 commit comments

Comments
 (0)