Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 53 additions & 8 deletions src/aws_durable_execution_sdk_python_testing/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
from dataclasses import replace
from datetime import UTC, datetime
from typing import TYPE_CHECKING
from typing import Any
from uuid import uuid4

from aws_durable_execution_sdk_python.execution import (
Expand All @@ -24,15 +24,12 @@
IllegalStateException,
InvalidParameterValueException,
)
from aws_durable_execution_sdk_python_testing.model import (
StartDurableExecutionInput,
)
from aws_durable_execution_sdk_python_testing.token import CheckpointToken


if TYPE_CHECKING:
from aws_durable_execution_sdk_python_testing.model import (
StartDurableExecutionInput,
)


class Execution:
"""Execution state."""

Expand All @@ -51,7 +48,7 @@ def __init__(
# TODO: this will need to persist/rehydrate depending on inmemory vs sqllite store
self.token_sequence: int = 0
self.is_complete: bool = False
self.result: DurableExecutionInvocationOutput | None
self.result: DurableExecutionInvocationOutput | None = None
self.consecutive_failed_invocation_attempts: int = 0

@staticmethod
Expand All @@ -63,6 +60,54 @@ def new(input: StartDurableExecutionInput) -> Execution: # noqa: A002
durable_execution_arn=str(uuid4()), start_input=input, operations=[]
)

def to_dict(self) -> dict[str, Any]:
"""Serialize execution to dictionary."""
return {
"DurableExecutionArn": self.durable_execution_arn,
"StartInput": self.start_input.to_dict(),
"Operations": [op.to_dict() for op in self.operations],
"Updates": [update.to_dict() for update in self.updates],
"UsedTokens": list(self.used_tokens),
"TokenSequence": self.token_sequence,
"IsComplete": self.is_complete,
"Result": self.result.to_dict() if self.result else None,
"ConsecutiveFailedInvocationAttempts": self.consecutive_failed_invocation_attempts,
}

@classmethod
def from_dict(cls, data: dict[str, Any]) -> Execution:
"""Deserialize execution from dictionary."""
# Reconstruct start_input
start_input = StartDurableExecutionInput.from_dict(data["StartInput"])

# Reconstruct operations
operations = [Operation.from_dict(op_data) for op_data in data["Operations"]]

# Create execution
execution = cls(
durable_execution_arn=data["DurableExecutionArn"],
start_input=start_input,
operations=operations,
)

# Set additional fields
execution.updates = [
OperationUpdate.from_dict(update_data) for update_data in data["Updates"]
]
execution.used_tokens = set(data["UsedTokens"])
execution.token_sequence = data["TokenSequence"]
execution.is_complete = data["IsComplete"]
execution.result = (
DurableExecutionInvocationOutput.from_dict(data["Result"])
if data["Result"]
else None
)
execution.consecutive_failed_invocation_attempts = data[
"ConsecutiveFailedInvocationAttempts"
]

return execution

def start(self) -> None:
# not thread safe, prob should be
if self.start_input.invocation_id is None:
Expand Down