From 521160fc815a05b29ebc8a54d08460d6e8918ec3 Mon Sep 17 00:00:00 2001 From: Brent Champion Date: Thu, 9 Oct 2025 18:25:37 -0400 Subject: [PATCH 1/2] feat: implement filesystem store --- .../checkpoint/processor.py | 2 +- .../cli.py | 24 ++ .../executor.py | 16 +- .../model.py | 10 +- .../runner.py | 18 +- .../store.py | 50 ---- .../stores/__init__.py | 42 +++ .../stores/filesystem.py | 99 +++++++ .../stores/memory.py | 28 ++ tests/checkpoint/processor_test.py | 2 +- tests/stores/__init__.py | 1 + tests/stores/filesystem_store_test.py | 264 ++++++++++++++++++ .../memory_store_test.py} | 6 +- 13 files changed, 492 insertions(+), 70 deletions(-) delete mode 100644 src/aws_durable_execution_sdk_python_testing/store.py create mode 100644 src/aws_durable_execution_sdk_python_testing/stores/__init__.py create mode 100644 src/aws_durable_execution_sdk_python_testing/stores/filesystem.py create mode 100644 src/aws_durable_execution_sdk_python_testing/stores/memory.py create mode 100644 tests/stores/__init__.py create mode 100644 tests/stores/filesystem_store_test.py rename tests/{store_test.py => stores/memory_store_test.py} (96%) diff --git a/src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py b/src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py index 9d37962..c15762f 100644 --- a/src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py +++ b/src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py @@ -27,7 +27,7 @@ if TYPE_CHECKING: from aws_durable_execution_sdk_python_testing.execution import Execution from aws_durable_execution_sdk_python_testing.scheduler import Scheduler - from aws_durable_execution_sdk_python_testing.store import ExecutionStore + from aws_durable_execution_sdk_python_testing.stores import ExecutionStore class CheckpointProcessor: diff --git a/src/aws_durable_execution_sdk_python_testing/cli.py b/src/aws_durable_execution_sdk_python_testing/cli.py index 89facdc..c23847d 100644 --- a/src/aws_durable_execution_sdk_python_testing/cli.py +++ b/src/aws_durable_execution_sdk_python_testing/cli.py @@ -31,6 +31,7 @@ StartDurableExecutionInput, ) from aws_durable_execution_sdk_python_testing.runner import WebRunner, WebRunnerConfig +from aws_durable_execution_sdk_python_testing.stores import StoreType from aws_durable_execution_sdk_python_testing.web.server import WebServiceConfig @@ -50,6 +51,10 @@ class CliConfig: local_runner_region: str = "us-west-2" local_runner_mode: str = "local" + # Store configuration + store_type: str = "memory" + store_path: str | None = None + @classmethod def from_environment(cls) -> CliConfig: """Create configuration from environment variables with defaults.""" @@ -69,6 +74,8 @@ def from_environment(cls) -> CliConfig: ), local_runner_region=os.getenv("AWS_DEX_LOCAL_RUNNER_REGION", "us-west-2"), local_runner_mode=os.getenv("AWS_DEX_LOCAL_RUNNER_MODE", "local"), + store_type=os.getenv("AWS_DEX_STORE_TYPE", "memory"), + store_path=os.getenv("AWS_DEX_STORE_PATH"), ) @@ -188,6 +195,17 @@ def _create_start_server_parser(self, subparsers) -> None: default=self.config.local_runner_mode, help=f"Local Runner mode (default: {self.config.local_runner_mode}, env: AWS_DEX_LOCAL_RUNNER_MODE)", ) + start_server_parser.add_argument( + "--store-type", + choices=[store_type.value for store_type in StoreType], + default=self.config.store_type, + help=f"Store type for execution persistence (default: {self.config.store_type}, env: AWS_DEX_STORE_TYPE)", + ) + start_server_parser.add_argument( + "--store-path", + default=self.config.store_path, + help=f"Path for filesystem store (default: {self.config.store_path or '.durable_executions'}, env: AWS_DEX_STORE_PATH)", + ) start_server_parser.set_defaults(func=self.start_server_command) def _create_invoke_parser(self, subparsers) -> None: @@ -258,6 +276,8 @@ def start_server_command(self, args: argparse.Namespace) -> int: local_runner_endpoint=args.local_runner_endpoint, local_runner_region=args.local_runner_region, local_runner_mode=args.local_runner_mode, + store_type=args.store_type, + store_path=args.store_path, ) logger.info( @@ -273,6 +293,10 @@ def start_server_command(self, args: argparse.Namespace) -> int: logger.info(" Local Runner Endpoint: %s", args.local_runner_endpoint) logger.info(" Local Runner Region: %s", args.local_runner_region) logger.info(" Local Runner Mode: %s", args.local_runner_mode) + logger.info(" Store Type: %s", args.store_type) + if args.store_type == "filesystem": + store_path = args.store_path or ".durable_executions" + logger.info(" Store Path: %s", store_path) # Use runner as context manager for proper lifecycle with WebRunner(runner_config) as runner: diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index 08aa467..fe621af 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -48,7 +48,7 @@ from aws_durable_execution_sdk_python_testing.invoker import Invoker from aws_durable_execution_sdk_python_testing.scheduler import Event, Scheduler - from aws_durable_execution_sdk_python_testing.store import ExecutionStore + from aws_durable_execution_sdk_python_testing.stores import ExecutionStore logger = logging.getLogger(__name__) @@ -142,15 +142,15 @@ def get_execution_details(self, execution_arn: str) -> GetDurableExecutionRespon durable_execution_name=execution.start_input.execution_name, function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}", status=status, - start_timestamp=execution_op.start_timestamp.isoformat() + start_timestamp=execution_op.start_timestamp.timestamp() if execution_op.start_timestamp - else datetime.now(UTC).isoformat(), + else datetime.now(UTC).timestamp(), input_payload=execution_op.execution_details.input_payload if execution_op.execution_details else None, result=result, error=error, - end_timestamp=execution_op.end_timestamp.isoformat() + end_timestamp=execution_op.end_timestamp.timestamp() if execution_op.end_timestamp else None, version="1.0", @@ -223,10 +223,10 @@ def list_executions( durable_execution_name=execution.start_input.execution_name, function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}", status=execution_status, - start_timestamp=execution_op.start_timestamp.isoformat() + start_timestamp=execution_op.start_timestamp.timestamp() if execution_op.start_timestamp - else datetime.now(UTC).isoformat(), - end_timestamp=execution_op.end_timestamp.isoformat() + else datetime.now(UTC).timestamp(), + end_timestamp=execution_op.end_timestamp.timestamp() if execution_op.end_timestamp else None, ) @@ -333,7 +333,7 @@ def stop_execution( # Stop the execution self.fail_execution(execution_arn, stop_error) - return StopDurableExecutionResponse(stop_date=datetime.now(UTC).isoformat()) + return StopDurableExecutionResponse(stop_date=datetime.now(UTC).timestamp()) def get_execution_state( self, diff --git a/src/aws_durable_execution_sdk_python_testing/model.py b/src/aws_durable_execution_sdk_python_testing/model.py index 8753c8a..39ae665 100644 --- a/src/aws_durable_execution_sdk_python_testing/model.py +++ b/src/aws_durable_execution_sdk_python_testing/model.py @@ -131,11 +131,11 @@ class GetDurableExecutionResponse: durable_execution_name: str function_arn: str status: str - start_timestamp: str + start_timestamp: float input_payload: str | None = None result: str | None = None error: ErrorObject | None = None - end_timestamp: str | None = None + end_timestamp: float | None = None version: str | None = None @classmethod @@ -188,8 +188,8 @@ class Execution: durable_execution_name: str function_arn: str status: str - start_timestamp: str - end_timestamp: str | None = None + start_timestamp: float + end_timestamp: float | None = None @classmethod def from_dict(cls, data: dict) -> Execution: @@ -325,7 +325,7 @@ def to_dict(self) -> dict[str, Any]: class StopDurableExecutionResponse: """Response from stopping a durable execution.""" - stop_date: str + stop_date: float @classmethod def from_dict(cls, data: dict) -> StopDurableExecutionResponse: diff --git a/src/aws_durable_execution_sdk_python_testing/runner.py b/src/aws_durable_execution_sdk_python_testing/runner.py index 68e5522..c4fad56 100644 --- a/src/aws_durable_execution_sdk_python_testing/runner.py +++ b/src/aws_durable_execution_sdk_python_testing/runner.py @@ -48,7 +48,11 @@ StartDurableExecutionOutput, ) from aws_durable_execution_sdk_python_testing.scheduler import Scheduler -from aws_durable_execution_sdk_python_testing.store import InMemoryExecutionStore +from aws_durable_execution_sdk_python_testing.stores import ( + ExecutionStore, + FileSystemExecutionStore, + InMemoryExecutionStore, +) from aws_durable_execution_sdk_python_testing.web.server import WebServer @@ -83,6 +87,10 @@ class WebRunnerConfig: local_runner_region: str = "us-west-2" local_runner_mode: str = "local" + # Store configuration + store_type: str = "memory" # "memory" or "filesystem" + store_path: str | None = None # Path for filesystem store + @dataclass(frozen=True) class Operation: @@ -543,7 +551,7 @@ def __init__(self, config: WebRunnerConfig) -> None: self._config = config self._server: WebServer | None = None self._scheduler: Scheduler | None = None - self._store: InMemoryExecutionStore | None = None + self._store: ExecutionStore | None = None self._invoker: LambdaInvoker | None = None self._executor: Executor | None = None @@ -581,7 +589,11 @@ def start(self) -> None: raise DurableFunctionsLocalRunnerError(msg) # Create dependencies and server - self._store = InMemoryExecutionStore() + if self._config.store_type == "filesystem": + store_path = self._config.store_path or ".durable_executions" + self._store = FileSystemExecutionStore.create(store_path) + else: + self._store = InMemoryExecutionStore() self._scheduler = Scheduler() self._invoker = LambdaInvoker(self._create_boto3_client()) diff --git a/src/aws_durable_execution_sdk_python_testing/store.py b/src/aws_durable_execution_sdk_python_testing/store.py deleted file mode 100644 index 1d2b655..0000000 --- a/src/aws_durable_execution_sdk_python_testing/store.py +++ /dev/null @@ -1,50 +0,0 @@ -"""Datestore for the execution data.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING, Protocol - - -if TYPE_CHECKING: - from aws_durable_execution_sdk_python_testing.execution import Execution - - -class ExecutionStore(Protocol): - # ignore cover because coverage doesn't understand elipses - def save(self, execution: Execution) -> None: ... # pragma: no cover - def load(self, execution_arn: str) -> Execution: ... # pragma: no cover - def update(self, execution: Execution) -> None: ... # pragma: no cover - def list_all(self) -> list[Execution]: ... # pragma: no cover - - -class InMemoryExecutionStore(ExecutionStore): - # Dict-based storage for testing - def __init__(self) -> None: - self._store: dict[str, Execution] = {} - - def save(self, execution: Execution) -> None: - self._store[execution.durable_execution_arn] = execution - - def load(self, execution_arn: str) -> Execution: - return self._store[execution_arn] - - def update(self, execution: Execution) -> None: - self._store[execution.durable_execution_arn] = execution - - def list_all(self) -> list[Execution]: - return list(self._store.values()) - - -# class SQLiteExecutionStore(ExecutionStore): -# # SQLite persistence for web server -# def __init__(self) -> None: -# pass - -# def save(self, execution: Execution) -> None: -# pass - -# def load(self, execution_arn: str) -> Execution: -# return Execution.new() - -# def update(self, execution: Execution) -> None: -# pass diff --git a/src/aws_durable_execution_sdk_python_testing/stores/__init__.py b/src/aws_durable_execution_sdk_python_testing/stores/__init__.py new file mode 100644 index 0000000..c61af00 --- /dev/null +++ b/src/aws_durable_execution_sdk_python_testing/stores/__init__.py @@ -0,0 +1,42 @@ +"""Execution stores for persisting durable function executions.""" + +from __future__ import annotations + +from enum import Enum +from typing import TYPE_CHECKING, Protocol + +from aws_durable_execution_sdk_python_testing.stores.filesystem import ( + FileSystemExecutionStore, +) +from aws_durable_execution_sdk_python_testing.stores.memory import ( + InMemoryExecutionStore, +) + + +if TYPE_CHECKING: + from aws_durable_execution_sdk_python_testing.execution import Execution + + +class StoreType(Enum): + """Supported execution store types.""" + + MEMORY = "memory" + FILESYSTEM = "filesystem" + + +class ExecutionStore(Protocol): + """Protocol for execution storage implementations.""" + + # ignore cover because coverage doesn't understand elipses + def save(self, execution: Execution) -> None: ... # pragma: no cover + def load(self, execution_arn: str) -> Execution: ... # pragma: no cover + def update(self, execution: Execution) -> None: ... # pragma: no cover + def list_all(self) -> list[Execution]: ... # pragma: no cover + + +__all__ = [ + "StoreType", + "ExecutionStore", + "InMemoryExecutionStore", + "FileSystemExecutionStore", +] diff --git a/src/aws_durable_execution_sdk_python_testing/stores/filesystem.py b/src/aws_durable_execution_sdk_python_testing/stores/filesystem.py new file mode 100644 index 0000000..3b5e0c9 --- /dev/null +++ b/src/aws_durable_execution_sdk_python_testing/stores/filesystem.py @@ -0,0 +1,99 @@ +"""File system-based execution store implementation.""" + +from __future__ import annotations + +import json +import logging +from datetime import UTC, datetime +from pathlib import Path + +from aws_durable_execution_sdk_python_testing.exceptions import ( + DurableFunctionsLocalRunnerError, +) +from aws_durable_execution_sdk_python_testing.execution import Execution + + +class DateTimeEncoder(json.JSONEncoder): + """Custom JSON encoder that handles datetime objects.""" + + def default(self, obj): + if isinstance(obj, datetime): + return obj.timestamp() + return super().default(obj) + + +def datetime_object_hook(obj): + """JSON object hook to convert unix timestamps back to datetime objects.""" + if isinstance(obj, dict): + for key, value in obj.items(): + if isinstance(value, int | float) and key.endswith(("_timestamp", "_time")): + try: # noqa: SIM105 + obj[key] = datetime.fromtimestamp(value, tz=UTC) + except (ValueError, OSError): + # Leave as number if not a valid timestamp + pass + return obj + + +class FileSystemExecutionStore: + """File system-based execution store for persistence.""" + + def __init__(self, storage_dir: Path) -> None: + self._storage_dir = storage_dir + + @classmethod + def create(cls, storage_dir: str | Path | None = None) -> FileSystemExecutionStore: + """Create a FileSystemExecutionStore with directory creation. + + Args: + storage_dir: Directory path for storage. Defaults to '.durable_executions' + + Returns: + FileSystemExecutionStore instance with created directory + """ + path = Path(storage_dir) if storage_dir else Path(".durable_executions") + path.mkdir(exist_ok=True) + return cls(storage_dir=path) + + def _get_file_path(self, execution_arn: str) -> Path: + """Get file path for execution ARN.""" + # Use ARN as filename with .json extension, replacing unsafe characters + safe_filename = execution_arn.replace(":", "_").replace("/", "_") + return self._storage_dir / f"{safe_filename}.json" + + def save(self, execution: Execution) -> None: + """Save execution to file system.""" + file_path = self._get_file_path(execution.durable_execution_arn) + data = execution.to_dict() + + with open(file_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, cls=DateTimeEncoder) + + def load(self, execution_arn: str) -> Execution: + """Load execution from file system.""" + file_path = self._get_file_path(execution_arn) + if not file_path.exists(): + msg = f"Execution {execution_arn} not found" + raise DurableFunctionsLocalRunnerError(msg) + + with open(file_path, encoding="utf-8") as f: + data = json.load(f, object_hook=datetime_object_hook) + + return Execution.from_dict(data) + + def update(self, execution: Execution) -> None: + """Update execution in file system (same as save).""" + self.save(execution) + + def list_all(self) -> list[Execution]: + """List all executions from file system.""" + executions = [] + for file_path in self._storage_dir.glob("*.json"): + try: + with open(file_path, encoding="utf-8") as f: + data = json.load(f, object_hook=datetime_object_hook) + executions.append(Execution.from_dict(data)) + except (json.JSONDecodeError, KeyError, OSError) as e: + logging.warning("Skipping corrupted file %s: %s", file_path, e) + continue + return executions diff --git a/src/aws_durable_execution_sdk_python_testing/stores/memory.py b/src/aws_durable_execution_sdk_python_testing/stores/memory.py new file mode 100644 index 0000000..482bef9 --- /dev/null +++ b/src/aws_durable_execution_sdk_python_testing/stores/memory.py @@ -0,0 +1,28 @@ +"""In-memory execution store implementation.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + + +if TYPE_CHECKING: + from aws_durable_execution_sdk_python_testing.execution import Execution + + +class InMemoryExecutionStore: + """Dict-based storage for testing.""" + + def __init__(self) -> None: + self._store: dict[str, Execution] = {} + + def save(self, execution: Execution) -> None: + self._store[execution.durable_execution_arn] = execution + + def load(self, execution_arn: str) -> Execution: + return self._store[execution_arn] + + def update(self, execution: Execution) -> None: + self._store[execution.durable_execution_arn] = execution + + def list_all(self) -> list[Execution]: + return list(self._store.values()) diff --git a/tests/checkpoint/processor_test.py b/tests/checkpoint/processor_test.py index 450570d..c686f7a 100644 --- a/tests/checkpoint/processor_test.py +++ b/tests/checkpoint/processor_test.py @@ -20,7 +20,7 @@ ) from aws_durable_execution_sdk_python_testing.execution import Execution from aws_durable_execution_sdk_python_testing.scheduler import Scheduler -from aws_durable_execution_sdk_python_testing.store import ExecutionStore +from aws_durable_execution_sdk_python_testing.stores import ExecutionStore from aws_durable_execution_sdk_python_testing.token import CheckpointToken diff --git a/tests/stores/__init__.py b/tests/stores/__init__.py new file mode 100644 index 0000000..dbc7145 --- /dev/null +++ b/tests/stores/__init__.py @@ -0,0 +1 @@ +"""Tests for store implementations.""" diff --git a/tests/stores/filesystem_store_test.py b/tests/stores/filesystem_store_test.py new file mode 100644 index 0000000..1eb1538 --- /dev/null +++ b/tests/stores/filesystem_store_test.py @@ -0,0 +1,264 @@ +"""Tests for FileSystemExecutionStore.""" + +import tempfile +from pathlib import Path + +import pytest + +from aws_durable_execution_sdk_python_testing.exceptions import ( + DurableFunctionsLocalRunnerError, +) +from aws_durable_execution_sdk_python_testing.execution import Execution +from aws_durable_execution_sdk_python_testing.model import StartDurableExecutionInput +from aws_durable_execution_sdk_python_testing.stores.filesystem import ( + FileSystemExecutionStore, +) + + +@pytest.fixture +def temp_storage_dir(): + """Create a temporary directory for testing.""" + with tempfile.TemporaryDirectory() as temp_dir: + yield Path(temp_dir) + + +@pytest.fixture +def store(temp_storage_dir): + """Create a FileSystemExecutionStore with temporary storage.""" + return FileSystemExecutionStore.create(temp_storage_dir) + + +@pytest.fixture +def sample_execution(): + """Create a sample execution for testing.""" + input_data = StartDurableExecutionInput( + account_id="123456789012", + function_name="test-function", + function_qualifier="$LATEST", + execution_name="test-execution", + execution_timeout_seconds=300, + execution_retention_period_days=7, + invocation_id="test-invocation-id", + ) + return Execution.new(input_data) + + +def test_filesystem_execution_store_save_and_load(store, sample_execution): + """Test saving and loading an execution.""" + store.save(sample_execution) + loaded_execution = store.load(sample_execution.durable_execution_arn) + + assert ( + loaded_execution.durable_execution_arn == sample_execution.durable_execution_arn + ) + assert ( + loaded_execution.start_input.function_name + == sample_execution.start_input.function_name + ) + assert ( + loaded_execution.start_input.execution_name + == sample_execution.start_input.execution_name + ) + assert loaded_execution.token_sequence == sample_execution.token_sequence + assert loaded_execution.is_complete == sample_execution.is_complete + + +def test_filesystem_execution_store_load_nonexistent(store): + """Test loading a nonexistent execution raises DurableFunctionsLocalRunnerError.""" + with pytest.raises( + DurableFunctionsLocalRunnerError, match="Execution nonexistent-arn not found" + ): + store.load("nonexistent-arn") + + +def test_filesystem_execution_store_update(store, sample_execution): + """Test updating an execution.""" + store.save(sample_execution) + + sample_execution.is_complete = True + sample_execution.token_sequence = 5 + store.update(sample_execution) + + loaded_execution = store.load(sample_execution.durable_execution_arn) + assert loaded_execution.is_complete is True + assert loaded_execution.token_sequence == 5 + + +def test_filesystem_execution_store_update_overwrites(store, temp_storage_dir): + """Test that update overwrites existing execution.""" + input_data = StartDurableExecutionInput( + account_id="123456789012", + function_name="test-function", + function_qualifier="$LATEST", + execution_name="test-execution", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ) + execution1 = Execution.new(input_data) + execution2 = Execution.new(input_data) + execution2.durable_execution_arn = execution1.durable_execution_arn + execution2.token_sequence = 10 + + store.save(execution1) + store.update(execution2) + + loaded_execution = store.load(execution1.durable_execution_arn) + assert loaded_execution.token_sequence == 10 + + +def test_filesystem_execution_store_multiple_executions(store): + """Test storing multiple executions.""" + input_data1 = StartDurableExecutionInput( + account_id="123456789012", + function_name="test-function-1", + function_qualifier="$LATEST", + execution_name="test-execution-1", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ) + input_data2 = StartDurableExecutionInput( + account_id="123456789012", + function_name="test-function-2", + function_qualifier="$LATEST", + execution_name="test-execution-2", + execution_timeout_seconds=600, + execution_retention_period_days=14, + ) + + execution1 = Execution.new(input_data1) + execution2 = Execution.new(input_data2) + + store.save(execution1) + store.save(execution2) + + loaded_execution1 = store.load(execution1.durable_execution_arn) + loaded_execution2 = store.load(execution2.durable_execution_arn) + + assert loaded_execution1.durable_execution_arn == execution1.durable_execution_arn + assert loaded_execution2.durable_execution_arn == execution2.durable_execution_arn + assert loaded_execution1.start_input.function_name == "test-function-1" + assert loaded_execution2.start_input.function_name == "test-function-2" + + +def test_filesystem_execution_store_list_all_empty(store): + """Test list_all method with empty store.""" + result = store.list_all() + assert result == [] + + +def test_filesystem_execution_store_list_all_with_executions(store): + """Test list_all method with multiple executions.""" + # Create test executions + input_data1 = StartDurableExecutionInput( + account_id="123456789012", + function_name="test-function-1", + function_qualifier="$LATEST", + execution_name="test-execution-1", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ) + input_data2 = StartDurableExecutionInput( + account_id="123456789012", + function_name="test-function-2", + function_qualifier="$LATEST", + execution_name="test-execution-2", + execution_timeout_seconds=600, + execution_retention_period_days=14, + ) + input_data3 = StartDurableExecutionInput( + account_id="123456789012", + function_name="test-function-3", + function_qualifier="$LATEST", + execution_name="test-execution-3", + execution_timeout_seconds=900, + execution_retention_period_days=21, + ) + + execution1 = Execution.new(input_data1) + execution2 = Execution.new(input_data2) + execution3 = Execution.new(input_data3) + + # Save executions + store.save(execution1) + store.save(execution2) + store.save(execution3) + + # Test list_all + result = store.list_all() + + assert len(result) == 3 + arns = {execution.durable_execution_arn for execution in result} + assert execution1.durable_execution_arn in arns + assert execution2.durable_execution_arn in arns + assert execution3.durable_execution_arn in arns + + +def test_filesystem_execution_store_file_path_generation( + store, sample_execution, temp_storage_dir +): + """Test that file paths are generated correctly with safe filenames.""" + arn_with_colons = "arn:aws:lambda:us-east-1:123456789012:durable-execution:test" + expected_filename = ( + "arn_aws_lambda_us-east-1_123456789012_durable-execution_test.json" + ) + + # Test by saving and checking the file exists with expected name + sample_execution.durable_execution_arn = arn_with_colons + store.save(sample_execution) + + expected_file = temp_storage_dir / expected_filename + assert expected_file.exists() + + +def test_filesystem_execution_store_corrupted_file_handling(store, temp_storage_dir): + """Test that corrupted files are skipped during list_all.""" + # Create a valid execution + input_data = StartDurableExecutionInput( + account_id="123456789012", + function_name="test-function", + function_qualifier="$LATEST", + execution_name="test-execution", + execution_timeout_seconds=300, + execution_retention_period_days=7, + ) + execution = Execution.new(input_data) + store.save(execution) + + # Create a corrupted file + corrupted_file = temp_storage_dir / "corrupted.json" + with open(corrupted_file, "w") as f: + f.write("invalid json content") + + # list_all should skip the corrupted file and return only valid executions + result = store.list_all() + assert len(result) == 1 + assert result[0].durable_execution_arn == execution.durable_execution_arn + + +def test_filesystem_execution_store_custom_storage_dir(): + """Test creating store with custom storage directory.""" + with tempfile.TemporaryDirectory() as temp_dir: + custom_dir = Path(temp_dir) / "custom_storage" + FileSystemExecutionStore.create(custom_dir) + + # Directory should be created + assert custom_dir.exists() + assert custom_dir.is_dir() + + +def test_filesystem_execution_store_init_no_side_effects(): + """Test that __init__ doesn't create directories (no side effects).""" + with tempfile.TemporaryDirectory() as temp_dir: + nonexistent_dir = Path(temp_dir) / "nonexistent" + + # __init__ should not create the directory + FileSystemExecutionStore(nonexistent_dir) + assert not nonexistent_dir.exists() + + +def test_filesystem_execution_store_thread_safety_basic(store, sample_execution): + """Basic test that operations work without locking (atomic file operations).""" + # Test that basic operations work - atomic file operations provide thread safety + store.save(sample_execution) + loaded = store.load(sample_execution.durable_execution_arn) + assert loaded.durable_execution_arn == sample_execution.durable_execution_arn diff --git a/tests/store_test.py b/tests/stores/memory_store_test.py similarity index 96% rename from tests/store_test.py rename to tests/stores/memory_store_test.py index 32e29aa..a58cf54 100644 --- a/tests/store_test.py +++ b/tests/stores/memory_store_test.py @@ -1,4 +1,4 @@ -"""Tests for store module.""" +"""Tests for InMemoryExecutionStore.""" from unittest.mock import Mock @@ -6,7 +6,9 @@ from aws_durable_execution_sdk_python_testing.execution import Execution from aws_durable_execution_sdk_python_testing.model import StartDurableExecutionInput -from aws_durable_execution_sdk_python_testing.store import InMemoryExecutionStore +from aws_durable_execution_sdk_python_testing.stores.memory import ( + InMemoryExecutionStore, +) def test_in_memory_execution_store_save_and_load(): From d9e647710ceb4ca43b07c06555f8e997f13de785 Mon Sep 17 00:00:00 2001 From: Brent Champion Date: Tue, 14 Oct 2025 18:59:53 -0400 Subject: [PATCH 2/2] feat: implement filesystem store --- .../checkpoint/processor.py | 2 +- .../cli.py | 14 +++---- .../executor.py | 2 +- .../runner.py | 11 +++-- .../stores/__init__.py | 41 ------------------- .../stores/base.py | 27 ++++++++++++ tests/checkpoint/processor_test.py | 2 +- 7 files changed, 45 insertions(+), 54 deletions(-) create mode 100644 src/aws_durable_execution_sdk_python_testing/stores/base.py diff --git a/src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py b/src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py index c15762f..9189706 100644 --- a/src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py +++ b/src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py @@ -27,7 +27,7 @@ if TYPE_CHECKING: from aws_durable_execution_sdk_python_testing.execution import Execution from aws_durable_execution_sdk_python_testing.scheduler import Scheduler - from aws_durable_execution_sdk_python_testing.stores import ExecutionStore + from aws_durable_execution_sdk_python_testing.stores.base import ExecutionStore class CheckpointProcessor: diff --git a/src/aws_durable_execution_sdk_python_testing/cli.py b/src/aws_durable_execution_sdk_python_testing/cli.py index c23847d..d6d82d3 100644 --- a/src/aws_durable_execution_sdk_python_testing/cli.py +++ b/src/aws_durable_execution_sdk_python_testing/cli.py @@ -31,7 +31,7 @@ StartDurableExecutionInput, ) from aws_durable_execution_sdk_python_testing.runner import WebRunner, WebRunnerConfig -from aws_durable_execution_sdk_python_testing.stores import StoreType +from aws_durable_execution_sdk_python_testing.stores.base import StoreType from aws_durable_execution_sdk_python_testing.web.server import WebServiceConfig @@ -52,7 +52,7 @@ class CliConfig: local_runner_mode: str = "local" # Store configuration - store_type: str = "memory" + store_type: StoreType = StoreType.MEMORY store_path: str | None = None @classmethod @@ -74,7 +74,7 @@ def from_environment(cls) -> CliConfig: ), local_runner_region=os.getenv("AWS_DEX_LOCAL_RUNNER_REGION", "us-west-2"), local_runner_mode=os.getenv("AWS_DEX_LOCAL_RUNNER_MODE", "local"), - store_type=os.getenv("AWS_DEX_STORE_TYPE", "memory"), + store_type=StoreType(os.getenv("AWS_DEX_STORE_TYPE", "memory")), store_path=os.getenv("AWS_DEX_STORE_PATH"), ) @@ -198,8 +198,8 @@ def _create_start_server_parser(self, subparsers) -> None: start_server_parser.add_argument( "--store-type", choices=[store_type.value for store_type in StoreType], - default=self.config.store_type, - help=f"Store type for execution persistence (default: {self.config.store_type}, env: AWS_DEX_STORE_TYPE)", + default=self.config.store_type.value, + help=f"Store type for execution persistence (default: {self.config.store_type.value}, env: AWS_DEX_STORE_TYPE)", ) start_server_parser.add_argument( "--store-path", @@ -276,7 +276,7 @@ def start_server_command(self, args: argparse.Namespace) -> int: local_runner_endpoint=args.local_runner_endpoint, local_runner_region=args.local_runner_region, local_runner_mode=args.local_runner_mode, - store_type=args.store_type, + store_type=StoreType(args.store_type), store_path=args.store_path, ) @@ -294,7 +294,7 @@ def start_server_command(self, args: argparse.Namespace) -> int: logger.info(" Local Runner Region: %s", args.local_runner_region) logger.info(" Local Runner Mode: %s", args.local_runner_mode) logger.info(" Store Type: %s", args.store_type) - if args.store_type == "filesystem": + if StoreType(args.store_type) == StoreType.FILESYSTEM: store_path = args.store_path or ".durable_executions" logger.info(" Store Path: %s", store_path) diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index fe621af..77ce59a 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -48,7 +48,7 @@ from aws_durable_execution_sdk_python_testing.invoker import Invoker from aws_durable_execution_sdk_python_testing.scheduler import Event, Scheduler - from aws_durable_execution_sdk_python_testing.stores import ExecutionStore + from aws_durable_execution_sdk_python_testing.stores.base import ExecutionStore logger = logging.getLogger(__name__) diff --git a/src/aws_durable_execution_sdk_python_testing/runner.py b/src/aws_durable_execution_sdk_python_testing/runner.py index c4fad56..a79c617 100644 --- a/src/aws_durable_execution_sdk_python_testing/runner.py +++ b/src/aws_durable_execution_sdk_python_testing/runner.py @@ -48,9 +48,14 @@ StartDurableExecutionOutput, ) from aws_durable_execution_sdk_python_testing.scheduler import Scheduler -from aws_durable_execution_sdk_python_testing.stores import ( +from aws_durable_execution_sdk_python_testing.stores.base import ( ExecutionStore, + StoreType, +) +from aws_durable_execution_sdk_python_testing.stores.filesystem import ( FileSystemExecutionStore, +) +from aws_durable_execution_sdk_python_testing.stores.memory import ( InMemoryExecutionStore, ) from aws_durable_execution_sdk_python_testing.web.server import WebServer @@ -88,7 +93,7 @@ class WebRunnerConfig: local_runner_mode: str = "local" # Store configuration - store_type: str = "memory" # "memory" or "filesystem" + store_type: StoreType = StoreType.MEMORY store_path: str | None = None # Path for filesystem store @@ -589,7 +594,7 @@ def start(self) -> None: raise DurableFunctionsLocalRunnerError(msg) # Create dependencies and server - if self._config.store_type == "filesystem": + if self._config.store_type == StoreType.FILESYSTEM: store_path = self._config.store_path or ".durable_executions" self._store = FileSystemExecutionStore.create(store_path) else: diff --git a/src/aws_durable_execution_sdk_python_testing/stores/__init__.py b/src/aws_durable_execution_sdk_python_testing/stores/__init__.py index c61af00..8b13789 100644 --- a/src/aws_durable_execution_sdk_python_testing/stores/__init__.py +++ b/src/aws_durable_execution_sdk_python_testing/stores/__init__.py @@ -1,42 +1 @@ -"""Execution stores for persisting durable function executions.""" -from __future__ import annotations - -from enum import Enum -from typing import TYPE_CHECKING, Protocol - -from aws_durable_execution_sdk_python_testing.stores.filesystem import ( - FileSystemExecutionStore, -) -from aws_durable_execution_sdk_python_testing.stores.memory import ( - InMemoryExecutionStore, -) - - -if TYPE_CHECKING: - from aws_durable_execution_sdk_python_testing.execution import Execution - - -class StoreType(Enum): - """Supported execution store types.""" - - MEMORY = "memory" - FILESYSTEM = "filesystem" - - -class ExecutionStore(Protocol): - """Protocol for execution storage implementations.""" - - # ignore cover because coverage doesn't understand elipses - def save(self, execution: Execution) -> None: ... # pragma: no cover - def load(self, execution_arn: str) -> Execution: ... # pragma: no cover - def update(self, execution: Execution) -> None: ... # pragma: no cover - def list_all(self) -> list[Execution]: ... # pragma: no cover - - -__all__ = [ - "StoreType", - "ExecutionStore", - "InMemoryExecutionStore", - "FileSystemExecutionStore", -] diff --git a/src/aws_durable_execution_sdk_python_testing/stores/base.py b/src/aws_durable_execution_sdk_python_testing/stores/base.py new file mode 100644 index 0000000..f4943e9 --- /dev/null +++ b/src/aws_durable_execution_sdk_python_testing/stores/base.py @@ -0,0 +1,27 @@ +"""Base classes and protocols for execution stores.""" + +from __future__ import annotations + +from enum import Enum +from typing import TYPE_CHECKING, Protocol + + +if TYPE_CHECKING: + from aws_durable_execution_sdk_python_testing.execution import Execution + + +class StoreType(Enum): + """Supported execution store types.""" + + MEMORY = "memory" + FILESYSTEM = "filesystem" + + +class ExecutionStore(Protocol): + """Protocol for execution storage implementations.""" + + # ignore cover because coverage doesn't understand elipses + def save(self, execution: Execution) -> None: ... # pragma: no cover + def load(self, execution_arn: str) -> Execution: ... # pragma: no cover + def update(self, execution: Execution) -> None: ... # pragma: no cover + def list_all(self) -> list[Execution]: ... # pragma: no cover diff --git a/tests/checkpoint/processor_test.py b/tests/checkpoint/processor_test.py index c686f7a..1df24cc 100644 --- a/tests/checkpoint/processor_test.py +++ b/tests/checkpoint/processor_test.py @@ -20,7 +20,7 @@ ) from aws_durable_execution_sdk_python_testing.execution import Execution from aws_durable_execution_sdk_python_testing.scheduler import Scheduler -from aws_durable_execution_sdk_python_testing.stores import ExecutionStore +from aws_durable_execution_sdk_python_testing.stores.base import ExecutionStore from aws_durable_execution_sdk_python_testing.token import CheckpointToken