Skip to content

Commit 2e44b69

Browse files
FullyTypedAstraea Quinn S
authored andcommitted
parity: Suspend when invoking a pending function
When invoking a function that is currently pending, we should treat it is as if it has just started, and suspend execution.
1 parent 50bc1d4 commit 2e44b69

File tree

2 files changed

+13
-9
lines changed

2 files changed

+13
-9
lines changed

src/aws_durable_execution_sdk_python/operation/invoke.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def invoke_handler(
6969
# Operation failed, throw the exact same error on replay as the checkpointed failure
7070
checkpointed_result.raise_callable_error()
7171

72-
if checkpointed_result.is_started():
72+
if checkpointed_result.is_started() or checkpointed_result.is_pending():
7373
# Operation is still running, suspend until completion
7474
logger.debug(
7575
"⏳ Invoke %s still in progress, suspending",

tests/operation/invoke_test.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,15 +164,16 @@ def test_invoke_handler_already_timed_out():
164164
)
165165

166166

167-
def test_invoke_handler_already_started():
167+
@pytest.mark.parametrize("status", [OperationStatus.STARTED, OperationStatus.PENDING])
168+
def test_invoke_handler_already_started(status):
168169
"""Test invoke_handler when operation is already started."""
169170
mock_state = Mock(spec=ExecutionState)
170171
mock_state.durable_execution_arn = "test_arn"
171172

172173
operation = Operation(
173174
operation_id="invoke6",
174175
operation_type=OperationType.CHAINED_INVOKE,
175-
status=OperationStatus.STARTED,
176+
status=status,
176177
chained_invoke_details=ChainedInvokeDetails(),
177178
)
178179
mock_result = CheckpointedResult.create_from_operation(operation)
@@ -188,15 +189,16 @@ def test_invoke_handler_already_started():
188189
)
189190

190191

191-
def test_invoke_handler_already_started_with_timeout():
192+
@pytest.mark.parametrize("status", [OperationStatus.STARTED, OperationStatus.PENDING])
193+
def test_invoke_handler_already_started_with_timeout(status):
192194
"""Test invoke_handler when operation is already started with timeout config."""
193195
mock_state = Mock(spec=ExecutionState)
194196
mock_state.durable_execution_arn = "test_arn"
195197

196198
operation = Operation(
197199
operation_id="invoke7",
198200
operation_type=OperationType.CHAINED_INVOKE,
199-
status=OperationStatus.STARTED,
201+
status=status,
200202
chained_invoke_details=ChainedInvokeDetails(),
201203
)
202204
mock_result = CheckpointedResult.create_from_operation(operation)
@@ -402,15 +404,16 @@ def test_suspend_with_optional_timeout_negative_timeout():
402404
assert "test message" in str(exc_info.value)
403405

404406

405-
def test_invoke_handler_with_operation_name():
407+
@pytest.mark.parametrize("status", [OperationStatus.STARTED, OperationStatus.PENDING])
408+
def test_invoke_handler_with_operation_name(status: OperationStatus):
406409
"""Test invoke_handler uses operation name in logs when available."""
407410
mock_state = Mock(spec=ExecutionState)
408411
mock_state.durable_execution_arn = "test_arn"
409412

410413
operation = Operation(
411414
operation_id="invoke14",
412415
operation_type=OperationType.CHAINED_INVOKE,
413-
status=OperationStatus.STARTED,
416+
status=status,
414417
chained_invoke_details=ChainedInvokeDetails(),
415418
)
416419
mock_result = CheckpointedResult.create_from_operation(operation)
@@ -426,15 +429,16 @@ def test_invoke_handler_with_operation_name():
426429
)
427430

428431

429-
def test_invoke_handler_without_operation_name():
432+
@pytest.mark.parametrize("status", [OperationStatus.STARTED, OperationStatus.PENDING])
433+
def test_invoke_handler_without_operation_name(status: OperationStatus):
430434
"""Test invoke_handler uses function name in logs when no operation name."""
431435
mock_state = Mock(spec=ExecutionState)
432436
mock_state.durable_execution_arn = "test_arn"
433437

434438
operation = Operation(
435439
operation_id="invoke15",
436440
operation_type=OperationType.CHAINED_INVOKE,
437-
status=OperationStatus.STARTED,
441+
status=status,
438442
chained_invoke_details=ChainedInvokeDetails(),
439443
)
440444
mock_result = CheckpointedResult.create_from_operation(operation)

0 commit comments

Comments
 (0)