Skip to content

Commit 3a37521

Browse files
committed
parity(Termination Paths): Match exit paths with reference
- Introduce `ExecutionError` and `InvocationError` for non-retriable and retriable errors respectfully. - align callback waiting behaviour according to spec: - terminated without resuklt will raise an error - terminated and successful will return appropriate value - anything else will suspend execution
1 parent bf4b002 commit 3a37521

File tree

19 files changed

+264
-84
lines changed

19 files changed

+264
-84
lines changed

src/aws_durable_execution_sdk_python/context.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
WaitForConditionConfig,
1717
)
1818
from aws_durable_execution_sdk_python.exceptions import (
19-
FatalError,
19+
CallbackError,
2020
SuspendExecution,
2121
ValidationError,
2222
)
@@ -125,11 +125,17 @@ def result(self) -> T | None:
125125
checkpointed_result: CheckpointedResult = self.state.get_checkpoint_result(
126126
self.operation_id
127127
)
128-
if checkpointed_result.is_started():
129-
msg: str = "Calback result not received yet. Suspending execution while waiting for result."
130-
raise SuspendExecution(msg)
131128

132-
if checkpointed_result.is_failed() or checkpointed_result.is_timed_out():
129+
if not checkpointed_result.is_existent():
130+
msg = "Callback operation must exist"
131+
raise CallbackError(msg)
132+
133+
if (
134+
checkpointed_result.is_failed()
135+
or checkpointed_result.is_cancelled()
136+
or checkpointed_result.is_timed_out()
137+
or checkpointed_result.is_stopped()
138+
):
133139
checkpointed_result.raise_callable_error()
134140

135141
if checkpointed_result.is_succeeded():
@@ -143,8 +149,10 @@ def result(self) -> T | None:
143149
durable_execution_arn=self.state.durable_execution_arn,
144150
)
145151

146-
msg = "Callback must be started before you can await the result."
147-
raise FatalError(msg)
152+
# operation exists; it has not terminated (successfully or otherwise)
153+
# therefore we should wait
154+
msg = "Callback result not received yet. Suspending execution while waiting for result."
155+
raise SuspendExecution(msg)
148156

149157

150158
class DurableContext(DurableContextProtocol):

src/aws_durable_execution_sdk_python/exceptions.py

Lines changed: 75 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,90 @@
77

88
import time
99
from dataclasses import dataclass
10+
from enum import Enum
1011
from typing import TYPE_CHECKING
1112

1213
if TYPE_CHECKING:
1314
import datetime
1415

1516

17+
class TerminationReason(Enum):
18+
"""Reasons why a durable execution terminated."""
19+
20+
UNHANDLED_ERROR = "UNHANDLED_ERROR"
21+
INVOCATION_ERROR = "INVOCATION_ERROR"
22+
EXECUTION_ERROR = "EXECUTION_ERROR"
23+
CHECKPOINT_FAILED = "CHECKPOINT_FAILED"
24+
NON_DETERMINISTIC_EXECUTION = "NON_DETERMINISTIC_EXECUTION"
25+
STEP_INTERRUPTED = "STEP_INTERRUPTED"
26+
CALLBACK_ERROR = "CALLBACK_ERROR"
27+
SERIALIZATION_ERROR = "SERIALIZATION_ERROR"
28+
29+
1630
class DurableExecutionsError(Exception):
1731
"""Base class for Durable Executions exceptions"""
1832

1933

20-
class FatalError(DurableExecutionsError):
21-
"""Unrecoverable error. Will not retry."""
34+
class UnrecoverableError(DurableExecutionsError):
35+
"""Base class for errors that terminate execution."""
36+
37+
def __init__(self, message: str, termination_reason: TerminationReason):
38+
super().__init__(message)
39+
self.termination_reason = termination_reason
40+
41+
42+
class ExecutionError(UnrecoverableError):
43+
"""Error that returns FAILED status without retry."""
44+
45+
def __init__(
46+
self,
47+
message: str,
48+
termination_reason: TerminationReason = TerminationReason.EXECUTION_ERROR,
49+
):
50+
super().__init__(message, termination_reason)
51+
52+
53+
class InvocationError(UnrecoverableError):
54+
"""Error that should cause Lambda retry by throwing from handler."""
55+
56+
def __init__(
57+
self,
58+
message: str,
59+
termination_reason: TerminationReason = TerminationReason.INVOCATION_ERROR,
60+
):
61+
super().__init__(message, termination_reason)
62+
63+
64+
class CallbackError(ExecutionError):
65+
"""Error in callback handling."""
66+
67+
def __init__(self, message: str, callback_id: str | None = None):
68+
super().__init__(message, TerminationReason.CALLBACK_ERROR)
69+
self.callback_id = callback_id
70+
71+
72+
class CheckpointFailedError(InvocationError):
73+
"""Error when checkpoint operation fails."""
74+
75+
def __init__(self, message: str, step_id: str | None = None):
76+
super().__init__(message, TerminationReason.CHECKPOINT_FAILED)
77+
self.step_id = step_id
2278

2379

24-
class CheckpointError(FatalError):
80+
class NonDeterministicExecutionError(ExecutionError):
81+
"""Error when execution is non-deterministic."""
82+
83+
def __init__(self, message: str, step_id: str | None = None):
84+
super().__init__(message, TerminationReason.NON_DETERMINISTIC_EXECUTION)
85+
self.step_id = step_id
86+
87+
88+
class CheckpointError(CheckpointFailedError):
2589
"""Failure to checkpoint. Will terminate the lambda."""
2690

91+
def __init__(self, message: str):
92+
super().__init__(message)
93+
2794

2895
class ValidationError(DurableExecutionsError):
2996
"""Incorrect arguments to a Durable Function operation."""
@@ -54,9 +121,13 @@ def __init__(
54121
self.stack_trace = stack_trace
55122

56123

57-
class StepInterruptedError(UserlandError):
124+
class StepInterruptedError(InvocationError):
58125
"""Raised when a step is interrupted before it checkpointed at the end."""
59126

127+
def __init__(self, message: str, step_id: str | None = None):
128+
super().__init__(message, TerminationReason.STEP_INTERRUPTED)
129+
self.step_id = step_id
130+
60131

61132
class SuspendExecution(BaseException):
62133
"""Raise this exception to suspend the current execution by returning PENDING to DAR.

src/aws_durable_execution_sdk_python/execution.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
from aws_durable_execution_sdk_python.exceptions import (
1111
CheckpointError,
1212
DurableExecutionsError,
13-
FatalError,
13+
ExecutionError,
14+
InvocationError,
1415
SuspendExecution,
1516
)
1617
from aws_durable_execution_sdk_python.lambda_service import (
@@ -289,10 +290,16 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
289290
logger.exception("Failed to checkpoint")
290291
# Throw the error to terminate the lambda
291292
raise
292-
except FatalError as e:
293-
logger.exception("Fatal error")
293+
294+
except InvocationError:
295+
logger.exception("Invocation error. Must terminate.")
296+
# Throw the error to trigger Lambda retry
297+
raise
298+
except ExecutionError as e:
299+
logger.exception("Execution error. Must terminate without retry.")
294300
return DurableExecutionInvocationOutput(
295-
status=InvocationStatus.PENDING, error=ErrorObject.from_exception(e)
301+
status=InvocationStatus.FAILED,
302+
error=ErrorObject.from_exception(e),
296303
).to_dict()
297304
except Exception as e:
298305
# all user-space errors go here

src/aws_durable_execution_sdk_python/operation/callback.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from typing import TYPE_CHECKING, Any
66

77
from aws_durable_execution_sdk_python.config import StepConfig
8-
from aws_durable_execution_sdk_python.exceptions import FatalError
8+
from aws_durable_execution_sdk_python.exceptions import CallbackError
99
from aws_durable_execution_sdk_python.lambda_service import (
1010
CallbackOptions,
1111
OperationUpdate,
@@ -58,8 +58,8 @@ def create_callback_handler(
5858
not checkpointed_result.operation
5959
or not checkpointed_result.operation.callback_details
6060
):
61-
msg = "Missing callback details"
62-
raise FatalError(msg)
61+
msg = f"Missing callback details for operation: {operation_identifier.operation_id}"
62+
raise CallbackError(msg)
6363

6464
return checkpointed_result.operation.callback_details.callback_id
6565

@@ -74,8 +74,8 @@ def create_callback_handler(
7474
)
7575

7676
if not result.operation or not result.operation.callback_details:
77-
msg = "Missing callback details"
78-
raise FatalError(msg)
77+
msg = f"Missing callback details for operation: {operation_identifier.operation_id}"
78+
raise CallbackError(msg)
7979

8080
return result.operation.callback_details.callback_id
8181

src/aws_durable_execution_sdk_python/operation/child.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66
from typing import TYPE_CHECKING, TypeVar
77

88
from aws_durable_execution_sdk_python.config import ChildConfig
9-
from aws_durable_execution_sdk_python.exceptions import FatalError, SuspendExecution
9+
from aws_durable_execution_sdk_python.exceptions import (
10+
InvocationError,
11+
SuspendExecution,
12+
)
1013
from aws_durable_execution_sdk_python.lambda_service import (
1114
ContextOptions,
1215
ErrorObject,
@@ -138,7 +141,11 @@ def child_handler(
138141
)
139142
state.create_checkpoint(operation_update=fail_operation)
140143

141-
# TODO: rethink FatalError
142-
if isinstance(e, FatalError):
143-
raise
144+
# InvocationError and its derivatives can be retried
145+
# When we encounter an invocation error (in all of its forms), we bubble that
146+
# error upwards (with the checkpoint in place) such that we reach the
147+
# execution handler at the very top, which will then induce a retry from the
148+
# dataplane.
149+
if isinstance(e, InvocationError):
150+
raise e
144151
raise error_object.to_callable_runtime_error() from e

src/aws_durable_execution_sdk_python/operation/invoke.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66
from typing import TYPE_CHECKING, TypeVar
77

88
from aws_durable_execution_sdk_python.config import InvokeConfig
9-
from aws_durable_execution_sdk_python.exceptions import (
10-
FatalError,
11-
)
9+
from aws_durable_execution_sdk_python.exceptions import ExecutionError
1210
from aws_durable_execution_sdk_python.lambda_service import (
1311
ChainedInvokeOptions,
1412
OperationUpdate,
@@ -107,5 +105,6 @@ def invoke_handler(
107105
)
108106
suspend_with_optional_timeout(msg, config.timeout_seconds)
109107
# This line should never be reached since suspend_with_optional_timeout always raises
108+
# if it is ever reached, we will crash in a non-retryable manner via ExecutionError
110109
msg = "suspend_with_optional_timeout should have raised an exception, but did not."
111-
raise FatalError(msg) from None
110+
raise ExecutionError(msg) from None

src/aws_durable_execution_sdk_python/operation/step.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
StepSemantics,
1212
)
1313
from aws_durable_execution_sdk_python.exceptions import (
14-
FatalError,
14+
ExecutionError,
1515
StepInterruptedError,
1616
)
1717
from aws_durable_execution_sdk_python.lambda_service import (
@@ -151,14 +151,14 @@ def step_handler(
151151
)
152152
return raw_result # noqa: TRY300
153153
except Exception as e:
154-
if isinstance(e, FatalError):
154+
if isinstance(e, ExecutionError):
155155
# no retry on fatal - e.g checkpoint exception
156156
logger.debug(
157157
"💥 Fatal error for id: %s, name: %s",
158158
operation_identifier.operation_id,
159159
operation_identifier.name,
160160
)
161-
# this bubbles up to execution.durable_handler, where it will exit with PENDING. TODO: confirm if still correct
161+
# this bubbles up to execution.durable_handler, where it will exit with FAILED
162162
raise
163163

164164
logger.exception(
@@ -168,8 +168,10 @@ def step_handler(
168168
)
169169

170170
retry_handler(e, state, operation_identifier, config, checkpointed_result)
171+
# if we've failed to raise an exception from the retry_handler, then we are in a
172+
# weird state, and should crash terminate the execution
171173
msg = "retry handler should have raised an exception, but did not."
172-
raise FatalError(msg) from None
174+
raise ExecutionError(msg) from None
173175

174176

175177
# TODO: I don't much like this func, needs refactor. Messy grab-bag of args, refine.

src/aws_durable_execution_sdk_python/operation/wait_for_condition.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import TYPE_CHECKING, TypeVar
77

88
from aws_durable_execution_sdk_python.exceptions import (
9-
FatalError,
9+
ExecutionError,
1010
)
1111
from aws_durable_execution_sdk_python.lambda_service import (
1212
ErrorObject,
@@ -203,4 +203,4 @@ def wait_for_condition_handler(
203203
raise
204204

205205
msg: str = "wait_for_condition should never reach this point"
206-
raise FatalError(msg)
206+
raise ExecutionError(msg)

src/aws_durable_execution_sdk_python/serdes.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from dataclasses import dataclass
77
from typing import Generic, TypeVar
88

9-
from aws_durable_execution_sdk_python.exceptions import FatalError
9+
from aws_durable_execution_sdk_python.exceptions import ExecutionError
1010

1111
logger = logging.getLogger(__name__)
1212

@@ -54,7 +54,7 @@ def serialize(
5454
operation_id,
5555
)
5656
msg = f"Serialization failed for id: {operation_id}, error: {e}."
57-
raise FatalError(msg) from e
57+
raise ExecutionError(msg) from e
5858

5959

6060
def deserialize(
@@ -71,4 +71,4 @@ def deserialize(
7171
operation_id,
7272
)
7373
msg = f"Deserialization failed for id: {operation_id}"
74-
raise FatalError(msg) from e
74+
raise ExecutionError(msg) from e

src/aws_durable_execution_sdk_python/state.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ def is_succeeded(self) -> bool:
9292

9393
return op.status is OperationStatus.SUCCEEDED
9494

95+
def is_cancelled(self) -> bool:
96+
if op := self.operation:
97+
return op.status is OperationStatus.CANCELLED
98+
return False
99+
95100
def is_failed(self) -> bool:
96101
"""Return True if the checkpointed operation is FAILED."""
97102
op = self.operation

0 commit comments

Comments
 (0)