Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
if TYPE_CHECKING:
from aws_durable_execution_sdk_python_testing.execution import Execution
from aws_durable_execution_sdk_python_testing.scheduler import Scheduler
from aws_durable_execution_sdk_python_testing.store import ExecutionStore
from aws_durable_execution_sdk_python_testing.stores.base import ExecutionStore


class CheckpointProcessor:
Expand Down
24 changes: 24 additions & 0 deletions src/aws_durable_execution_sdk_python_testing/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
StartDurableExecutionInput,
)
from aws_durable_execution_sdk_python_testing.runner import WebRunner, WebRunnerConfig
from aws_durable_execution_sdk_python_testing.stores.base import StoreType
from aws_durable_execution_sdk_python_testing.web.server import WebServiceConfig


Expand All @@ -50,6 +51,10 @@ class CliConfig:
local_runner_region: str = "us-west-2"
local_runner_mode: str = "local"

# Store configuration
store_type: StoreType = StoreType.MEMORY
store_path: str | None = None

@classmethod
def from_environment(cls) -> CliConfig:
"""Create configuration from environment variables with defaults."""
Expand All @@ -69,6 +74,8 @@ def from_environment(cls) -> CliConfig:
),
local_runner_region=os.getenv("AWS_DEX_LOCAL_RUNNER_REGION", "us-west-2"),
local_runner_mode=os.getenv("AWS_DEX_LOCAL_RUNNER_MODE", "local"),
store_type=StoreType(os.getenv("AWS_DEX_STORE_TYPE", "memory")),
store_path=os.getenv("AWS_DEX_STORE_PATH"),
)


Expand Down Expand Up @@ -188,6 +195,17 @@ def _create_start_server_parser(self, subparsers) -> None:
default=self.config.local_runner_mode,
help=f"Local Runner mode (default: {self.config.local_runner_mode}, env: AWS_DEX_LOCAL_RUNNER_MODE)",
)
start_server_parser.add_argument(
"--store-type",
choices=[store_type.value for store_type in StoreType],
default=self.config.store_type.value,
help=f"Store type for execution persistence (default: {self.config.store_type.value}, env: AWS_DEX_STORE_TYPE)",
)
start_server_parser.add_argument(
"--store-path",
default=self.config.store_path,
help=f"Path for filesystem store (default: {self.config.store_path or '.durable_executions'}, env: AWS_DEX_STORE_PATH)",
)
start_server_parser.set_defaults(func=self.start_server_command)

def _create_invoke_parser(self, subparsers) -> None:
Expand Down Expand Up @@ -258,6 +276,8 @@ def start_server_command(self, args: argparse.Namespace) -> int:
local_runner_endpoint=args.local_runner_endpoint,
local_runner_region=args.local_runner_region,
local_runner_mode=args.local_runner_mode,
store_type=StoreType(args.store_type),
store_path=args.store_path,
)

logger.info(
Expand All @@ -273,6 +293,10 @@ def start_server_command(self, args: argparse.Namespace) -> int:
logger.info(" Local Runner Endpoint: %s", args.local_runner_endpoint)
logger.info(" Local Runner Region: %s", args.local_runner_region)
logger.info(" Local Runner Mode: %s", args.local_runner_mode)
logger.info(" Store Type: %s", args.store_type)
if StoreType(args.store_type) == StoreType.FILESYSTEM:
store_path = args.store_path or ".durable_executions"
logger.info(" Store Path: %s", store_path)

# Use runner as context manager for proper lifecycle
with WebRunner(runner_config) as runner:
Expand Down
16 changes: 8 additions & 8 deletions src/aws_durable_execution_sdk_python_testing/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

from aws_durable_execution_sdk_python_testing.invoker import Invoker
from aws_durable_execution_sdk_python_testing.scheduler import Event, Scheduler
from aws_durable_execution_sdk_python_testing.store import ExecutionStore
from aws_durable_execution_sdk_python_testing.stores.base import ExecutionStore

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -142,15 +142,15 @@ def get_execution_details(self, execution_arn: str) -> GetDurableExecutionRespon
durable_execution_name=execution.start_input.execution_name,
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
status=status,
start_timestamp=execution_op.start_timestamp.isoformat()
start_timestamp=execution_op.start_timestamp.timestamp()
if execution_op.start_timestamp
else datetime.now(UTC).isoformat(),
else datetime.now(UTC).timestamp(),
input_payload=execution_op.execution_details.input_payload
if execution_op.execution_details
else None,
result=result,
error=error,
end_timestamp=execution_op.end_timestamp.isoformat()
end_timestamp=execution_op.end_timestamp.timestamp()
if execution_op.end_timestamp
else None,
version="1.0",
Expand Down Expand Up @@ -223,10 +223,10 @@ def list_executions(
durable_execution_name=execution.start_input.execution_name,
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
status=execution_status,
start_timestamp=execution_op.start_timestamp.isoformat()
start_timestamp=execution_op.start_timestamp.timestamp()
if execution_op.start_timestamp
else datetime.now(UTC).isoformat(),
end_timestamp=execution_op.end_timestamp.isoformat()
else datetime.now(UTC).timestamp(),
end_timestamp=execution_op.end_timestamp.timestamp()
if execution_op.end_timestamp
else None,
)
Expand Down Expand Up @@ -333,7 +333,7 @@ def stop_execution(
# Stop the execution
self.fail_execution(execution_arn, stop_error)

return StopDurableExecutionResponse(stop_date=datetime.now(UTC).isoformat())
return StopDurableExecutionResponse(stop_date=datetime.now(UTC).timestamp())

def get_execution_state(
self,
Expand Down
10 changes: 5 additions & 5 deletions src/aws_durable_execution_sdk_python_testing/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ class GetDurableExecutionResponse:
durable_execution_name: str
function_arn: str
status: str
start_timestamp: str
start_timestamp: float
input_payload: str | None = None
result: str | None = None
error: ErrorObject | None = None
end_timestamp: str | None = None
end_timestamp: float | None = None
version: str | None = None

@classmethod
Expand Down Expand Up @@ -188,8 +188,8 @@ class Execution:
durable_execution_name: str
function_arn: str
status: str
start_timestamp: str
end_timestamp: str | None = None
start_timestamp: float
end_timestamp: float | None = None

@classmethod
def from_dict(cls, data: dict) -> Execution:
Expand Down Expand Up @@ -325,7 +325,7 @@ def to_dict(self) -> dict[str, Any]:
class StopDurableExecutionResponse:
"""Response from stopping a durable execution."""

stop_date: str
stop_date: float

@classmethod
def from_dict(cls, data: dict) -> StopDurableExecutionResponse:
Expand Down
23 changes: 20 additions & 3 deletions src/aws_durable_execution_sdk_python_testing/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,16 @@
StartDurableExecutionOutput,
)
from aws_durable_execution_sdk_python_testing.scheduler import Scheduler
from aws_durable_execution_sdk_python_testing.store import InMemoryExecutionStore
from aws_durable_execution_sdk_python_testing.stores.base import (
ExecutionStore,
StoreType,
)
from aws_durable_execution_sdk_python_testing.stores.filesystem import (
FileSystemExecutionStore,
)
from aws_durable_execution_sdk_python_testing.stores.memory import (
InMemoryExecutionStore,
)
from aws_durable_execution_sdk_python_testing.web.server import WebServer


Expand Down Expand Up @@ -83,6 +92,10 @@ class WebRunnerConfig:
local_runner_region: str = "us-west-2"
local_runner_mode: str = "local"

# Store configuration
store_type: StoreType = StoreType.MEMORY
store_path: str | None = None # Path for filesystem store


@dataclass(frozen=True)
class Operation:
Expand Down Expand Up @@ -543,7 +556,7 @@ def __init__(self, config: WebRunnerConfig) -> None:
self._config = config
self._server: WebServer | None = None
self._scheduler: Scheduler | None = None
self._store: InMemoryExecutionStore | None = None
self._store: ExecutionStore | None = None
self._invoker: LambdaInvoker | None = None
self._executor: Executor | None = None

Expand Down Expand Up @@ -581,7 +594,11 @@ def start(self) -> None:
raise DurableFunctionsLocalRunnerError(msg)

# Create dependencies and server
self._store = InMemoryExecutionStore()
if self._config.store_type == StoreType.FILESYSTEM:
store_path = self._config.store_path or ".durable_executions"
self._store = FileSystemExecutionStore.create(store_path)
else:
self._store = InMemoryExecutionStore()
self._scheduler = Scheduler()
self._invoker = LambdaInvoker(self._create_boto3_client())

Expand Down
50 changes: 0 additions & 50 deletions src/aws_durable_execution_sdk_python_testing/store.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

27 changes: 27 additions & 0 deletions src/aws_durable_execution_sdk_python_testing/stores/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Base classes and protocols for execution stores."""

from __future__ import annotations

from enum import Enum
from typing import TYPE_CHECKING, Protocol


if TYPE_CHECKING:
from aws_durable_execution_sdk_python_testing.execution import Execution


class StoreType(Enum):
"""Supported execution store types."""

MEMORY = "memory"
FILESYSTEM = "filesystem"


class ExecutionStore(Protocol):
"""Protocol for execution storage implementations."""

# ignore cover because coverage doesn't understand elipses
def save(self, execution: Execution) -> None: ... # pragma: no cover
def load(self, execution_arn: str) -> Execution: ... # pragma: no cover
def update(self, execution: Execution) -> None: ... # pragma: no cover
def list_all(self) -> list[Execution]: ... # pragma: no cover
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""File system-based execution store implementation."""

from __future__ import annotations

import json
import logging
from datetime import UTC, datetime
from pathlib import Path

from aws_durable_execution_sdk_python_testing.exceptions import (
DurableFunctionsLocalRunnerError,
)
from aws_durable_execution_sdk_python_testing.execution import Execution


class DateTimeEncoder(json.JSONEncoder):
"""Custom JSON encoder that handles datetime objects."""

def default(self, obj):
if isinstance(obj, datetime):
return obj.timestamp()
return super().default(obj)


def datetime_object_hook(obj):
"""JSON object hook to convert unix timestamps back to datetime objects."""
if isinstance(obj, dict):
for key, value in obj.items():
if isinstance(value, int | float) and key.endswith(("_timestamp", "_time")):
try: # noqa: SIM105
obj[key] = datetime.fromtimestamp(value, tz=UTC)
except (ValueError, OSError):
# Leave as number if not a valid timestamp
pass
return obj


class FileSystemExecutionStore:
"""File system-based execution store for persistence."""

def __init__(self, storage_dir: Path) -> None:
self._storage_dir = storage_dir

@classmethod
def create(cls, storage_dir: str | Path | None = None) -> FileSystemExecutionStore:
"""Create a FileSystemExecutionStore with directory creation.

Args:
storage_dir: Directory path for storage. Defaults to '.durable_executions'

Returns:
FileSystemExecutionStore instance with created directory
"""
path = Path(storage_dir) if storage_dir else Path(".durable_executions")
path.mkdir(exist_ok=True)
return cls(storage_dir=path)

def _get_file_path(self, execution_arn: str) -> Path:
"""Get file path for execution ARN."""
# Use ARN as filename with .json extension, replacing unsafe characters
safe_filename = execution_arn.replace(":", "_").replace("/", "_")
return self._storage_dir / f"{safe_filename}.json"

def save(self, execution: Execution) -> None:
"""Save execution to file system."""
file_path = self._get_file_path(execution.durable_execution_arn)
data = execution.to_dict()

with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, cls=DateTimeEncoder)

def load(self, execution_arn: str) -> Execution:
"""Load execution from file system."""
file_path = self._get_file_path(execution_arn)
if not file_path.exists():
msg = f"Execution {execution_arn} not found"
raise DurableFunctionsLocalRunnerError(msg)

with open(file_path, encoding="utf-8") as f:
data = json.load(f, object_hook=datetime_object_hook)

return Execution.from_dict(data)

def update(self, execution: Execution) -> None:
"""Update execution in file system (same as save)."""
self.save(execution)

def list_all(self) -> list[Execution]:
"""List all executions from file system."""
executions = []
for file_path in self._storage_dir.glob("*.json"):
try:
with open(file_path, encoding="utf-8") as f:
data = json.load(f, object_hook=datetime_object_hook)
executions.append(Execution.from_dict(data))
except (json.JSONDecodeError, KeyError, OSError) as e:
logging.warning("Skipping corrupted file %s: %s", file_path, e)
continue
return executions
Loading
Loading