From 84f3b2eb9c0b4cb2441ddb5b38fafec5cd0f45ea Mon Sep 17 00:00:00 2001 From: Brent Champion Date: Thu, 9 Oct 2025 14:29:06 -0400 Subject: [PATCH] chore: add deserialization for execution object --- .../execution.py | 61 ++++++++++++++++--- 1 file changed, 53 insertions(+), 8 deletions(-) diff --git a/src/aws_durable_execution_sdk_python_testing/execution.py b/src/aws_durable_execution_sdk_python_testing/execution.py index 58c01de..51342fb 100644 --- a/src/aws_durable_execution_sdk_python_testing/execution.py +++ b/src/aws_durable_execution_sdk_python_testing/execution.py @@ -3,7 +3,7 @@ import json from dataclasses import replace from datetime import UTC, datetime -from typing import TYPE_CHECKING +from typing import Any from uuid import uuid4 from aws_durable_execution_sdk_python.execution import ( @@ -24,15 +24,12 @@ IllegalStateException, InvalidParameterValueException, ) +from aws_durable_execution_sdk_python_testing.model import ( + StartDurableExecutionInput, +) from aws_durable_execution_sdk_python_testing.token import CheckpointToken -if TYPE_CHECKING: - from aws_durable_execution_sdk_python_testing.model import ( - StartDurableExecutionInput, - ) - - class Execution: """Execution state.""" @@ -51,7 +48,7 @@ def __init__( # TODO: this will need to persist/rehydrate depending on inmemory vs sqllite store self.token_sequence: int = 0 self.is_complete: bool = False - self.result: DurableExecutionInvocationOutput | None + self.result: DurableExecutionInvocationOutput | None = None self.consecutive_failed_invocation_attempts: int = 0 @staticmethod @@ -63,6 +60,54 @@ def new(input: StartDurableExecutionInput) -> Execution: # noqa: A002 durable_execution_arn=str(uuid4()), start_input=input, operations=[] ) + def to_dict(self) -> dict[str, Any]: + """Serialize execution to dictionary.""" + return { + "DurableExecutionArn": self.durable_execution_arn, + "StartInput": self.start_input.to_dict(), + "Operations": [op.to_dict() for op in self.operations], + "Updates": [update.to_dict() for update in self.updates], + "UsedTokens": list(self.used_tokens), + "TokenSequence": self.token_sequence, + "IsComplete": self.is_complete, + "Result": self.result.to_dict() if self.result else None, + "ConsecutiveFailedInvocationAttempts": self.consecutive_failed_invocation_attempts, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> Execution: + """Deserialize execution from dictionary.""" + # Reconstruct start_input + start_input = StartDurableExecutionInput.from_dict(data["StartInput"]) + + # Reconstruct operations + operations = [Operation.from_dict(op_data) for op_data in data["Operations"]] + + # Create execution + execution = cls( + durable_execution_arn=data["DurableExecutionArn"], + start_input=start_input, + operations=operations, + ) + + # Set additional fields + execution.updates = [ + OperationUpdate.from_dict(update_data) for update_data in data["Updates"] + ] + execution.used_tokens = set(data["UsedTokens"]) + execution.token_sequence = data["TokenSequence"] + execution.is_complete = data["IsComplete"] + execution.result = ( + DurableExecutionInvocationOutput.from_dict(data["Result"]) + if data["Result"] + else None + ) + execution.consecutive_failed_invocation_attempts = data[ + "ConsecutiveFailedInvocationAttempts" + ] + + return execution + def start(self) -> None: # not thread safe, prob should be if self.start_input.invocation_id is None: