Skip to content

Commit e71a0ba

Browse files
committed
feat: implement filesystem store
1 parent 128c297 commit e71a0ba

File tree

13 files changed

+509
-70
lines changed

13 files changed

+509
-70
lines changed

src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
if TYPE_CHECKING:
2828
from aws_durable_execution_sdk_python_testing.execution import Execution
2929
from aws_durable_execution_sdk_python_testing.scheduler import Scheduler
30-
from aws_durable_execution_sdk_python_testing.store import ExecutionStore
30+
from aws_durable_execution_sdk_python_testing.stores import ExecutionStore
3131

3232

3333
class CheckpointProcessor:

src/aws_durable_execution_sdk_python_testing/cli.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
StartDurableExecutionInput,
3232
)
3333
from aws_durable_execution_sdk_python_testing.runner import WebRunner, WebRunnerConfig
34+
from aws_durable_execution_sdk_python_testing.stores import StoreType
3435
from aws_durable_execution_sdk_python_testing.web.server import WebServiceConfig
3536

3637

@@ -50,6 +51,10 @@ class CliConfig:
5051
local_runner_region: str = "us-west-2"
5152
local_runner_mode: str = "local"
5253

54+
# Store configuration
55+
store_type: str = "memory"
56+
store_path: str | None = None
57+
5358
@classmethod
5459
def from_environment(cls) -> CliConfig:
5560
"""Create configuration from environment variables with defaults."""
@@ -69,6 +74,8 @@ def from_environment(cls) -> CliConfig:
6974
),
7075
local_runner_region=os.getenv("AWS_DEX_LOCAL_RUNNER_REGION", "us-west-2"),
7176
local_runner_mode=os.getenv("AWS_DEX_LOCAL_RUNNER_MODE", "local"),
77+
store_type=os.getenv("AWS_DEX_STORE_TYPE", "memory"),
78+
store_path=os.getenv("AWS_DEX_STORE_PATH"),
7279
)
7380

7481

@@ -188,6 +195,17 @@ def _create_start_server_parser(self, subparsers) -> None:
188195
default=self.config.local_runner_mode,
189196
help=f"Local Runner mode (default: {self.config.local_runner_mode}, env: AWS_DEX_LOCAL_RUNNER_MODE)",
190197
)
198+
start_server_parser.add_argument(
199+
"--store-type",
200+
choices=[store_type.value for store_type in StoreType],
201+
default=self.config.store_type,
202+
help=f"Store type for execution persistence (default: {self.config.store_type}, env: AWS_DEX_STORE_TYPE)",
203+
)
204+
start_server_parser.add_argument(
205+
"--store-path",
206+
default=self.config.store_path,
207+
help=f"Path for filesystem store (default: {self.config.store_path or '.durable_executions'}, env: AWS_DEX_STORE_PATH)",
208+
)
191209
start_server_parser.set_defaults(func=self.start_server_command)
192210

193211
def _create_invoke_parser(self, subparsers) -> None:
@@ -258,6 +276,8 @@ def start_server_command(self, args: argparse.Namespace) -> int:
258276
local_runner_endpoint=args.local_runner_endpoint,
259277
local_runner_region=args.local_runner_region,
260278
local_runner_mode=args.local_runner_mode,
279+
store_type=args.store_type,
280+
store_path=args.store_path,
261281
)
262282

263283
logger.info(
@@ -273,6 +293,10 @@ def start_server_command(self, args: argparse.Namespace) -> int:
273293
logger.info(" Local Runner Endpoint: %s", args.local_runner_endpoint)
274294
logger.info(" Local Runner Region: %s", args.local_runner_region)
275295
logger.info(" Local Runner Mode: %s", args.local_runner_mode)
296+
logger.info(" Store Type: %s", args.store_type)
297+
if args.store_type == "filesystem":
298+
store_path = args.store_path or ".durable_executions"
299+
logger.info(" Store Path: %s", store_path)
276300

277301
# Use runner as context manager for proper lifecycle
278302
with WebRunner(runner_config) as runner:

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848

4949
from aws_durable_execution_sdk_python_testing.invoker import Invoker
5050
from aws_durable_execution_sdk_python_testing.scheduler import Event, Scheduler
51-
from aws_durable_execution_sdk_python_testing.store import ExecutionStore
51+
from aws_durable_execution_sdk_python_testing.stores import ExecutionStore
5252

5353
logger = logging.getLogger(__name__)
5454

@@ -142,15 +142,15 @@ def get_execution_details(self, execution_arn: str) -> GetDurableExecutionRespon
142142
durable_execution_name=execution.start_input.execution_name,
143143
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
144144
status=status,
145-
start_timestamp=execution_op.start_timestamp.isoformat()
145+
start_timestamp=execution_op.start_timestamp.timestamp()
146146
if execution_op.start_timestamp
147-
else datetime.now(UTC).isoformat(),
147+
else datetime.now(UTC).timestamp(),
148148
input_payload=execution_op.execution_details.input_payload
149149
if execution_op.execution_details
150150
else None,
151151
result=result,
152152
error=error,
153-
end_timestamp=execution_op.end_timestamp.isoformat()
153+
end_timestamp=execution_op.end_timestamp.timestamp()
154154
if execution_op.end_timestamp
155155
else None,
156156
version="1.0",
@@ -223,10 +223,10 @@ def list_executions(
223223
durable_execution_name=execution.start_input.execution_name,
224224
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
225225
status=execution_status,
226-
start_timestamp=execution_op.start_timestamp.isoformat()
226+
start_timestamp=execution_op.start_timestamp.timestamp()
227227
if execution_op.start_timestamp
228-
else datetime.now(UTC).isoformat(),
229-
end_timestamp=execution_op.end_timestamp.isoformat()
228+
else datetime.now(UTC).timestamp(),
229+
end_timestamp=execution_op.end_timestamp.timestamp()
230230
if execution_op.end_timestamp
231231
else None,
232232
)
@@ -333,7 +333,7 @@ def stop_execution(
333333
# Stop the execution
334334
self.fail_execution(execution_arn, stop_error)
335335

336-
return StopDurableExecutionResponse(stop_date=datetime.now(UTC).isoformat())
336+
return StopDurableExecutionResponse(stop_date=datetime.now(UTC).timestamp())
337337

338338
def get_execution_state(
339339
self,

src/aws_durable_execution_sdk_python_testing/model.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,11 @@ class GetDurableExecutionResponse:
131131
durable_execution_name: str
132132
function_arn: str
133133
status: str
134-
start_timestamp: str
134+
start_timestamp: float
135135
input_payload: str | None = None
136136
result: str | None = None
137137
error: ErrorObject | None = None
138-
end_timestamp: str | None = None
138+
end_timestamp: float | None = None
139139
version: str | None = None
140140

141141
@classmethod
@@ -188,8 +188,8 @@ class Execution:
188188
durable_execution_name: str
189189
function_arn: str
190190
status: str
191-
start_timestamp: str
192-
end_timestamp: str | None = None
191+
start_timestamp: float
192+
end_timestamp: float | None = None
193193

194194
@classmethod
195195
def from_dict(cls, data: dict) -> Execution:
@@ -325,7 +325,7 @@ def to_dict(self) -> dict[str, Any]:
325325
class StopDurableExecutionResponse:
326326
"""Response from stopping a durable execution."""
327327

328-
stop_date: str
328+
stop_date: float
329329

330330
@classmethod
331331
def from_dict(cls, data: dict) -> StopDurableExecutionResponse:

src/aws_durable_execution_sdk_python_testing/runner.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@
4848
StartDurableExecutionOutput,
4949
)
5050
from aws_durable_execution_sdk_python_testing.scheduler import Scheduler
51-
from aws_durable_execution_sdk_python_testing.store import InMemoryExecutionStore
51+
from aws_durable_execution_sdk_python_testing.stores import (
52+
ExecutionStore,
53+
FileSystemExecutionStore,
54+
InMemoryExecutionStore,
55+
)
5256
from aws_durable_execution_sdk_python_testing.web.server import WebServer
5357

5458

@@ -83,6 +87,10 @@ class WebRunnerConfig:
8387
local_runner_region: str = "us-west-2"
8488
local_runner_mode: str = "local"
8589

90+
# Store configuration
91+
store_type: str = "memory" # "memory" or "filesystem"
92+
store_path: str | None = None # Path for filesystem store
93+
8694

8795
@dataclass(frozen=True)
8896
class Operation:
@@ -543,7 +551,7 @@ def __init__(self, config: WebRunnerConfig) -> None:
543551
self._config = config
544552
self._server: WebServer | None = None
545553
self._scheduler: Scheduler | None = None
546-
self._store: InMemoryExecutionStore | None = None
554+
self._store: ExecutionStore | None = None
547555
self._invoker: LambdaInvoker | None = None
548556
self._executor: Executor | None = None
549557

@@ -581,7 +589,11 @@ def start(self) -> None:
581589
raise DurableFunctionsLocalRunnerError(msg)
582590

583591
# Create dependencies and server
584-
self._store = InMemoryExecutionStore()
592+
if self._config.store_type == "filesystem":
593+
store_path = self._config.store_path or ".durable_executions"
594+
self._store = FileSystemExecutionStore.create(store_path)
595+
else:
596+
self._store = InMemoryExecutionStore()
585597
self._scheduler = Scheduler()
586598
self._invoker = LambdaInvoker(self._create_boto3_client())
587599

src/aws_durable_execution_sdk_python_testing/store.py

Lines changed: 0 additions & 50 deletions
This file was deleted.
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
"""Execution stores for persisting durable function executions."""
2+
3+
from __future__ import annotations
4+
5+
from enum import Enum
6+
from typing import TYPE_CHECKING, Protocol
7+
8+
from aws_durable_execution_sdk_python_testing.stores.filesystem import (
9+
FileSystemExecutionStore,
10+
)
11+
from aws_durable_execution_sdk_python_testing.stores.memory import (
12+
InMemoryExecutionStore,
13+
)
14+
15+
16+
if TYPE_CHECKING:
17+
from aws_durable_execution_sdk_python_testing.execution import Execution
18+
19+
20+
class StoreType(Enum):
21+
"""Supported execution store types."""
22+
23+
MEMORY = "memory"
24+
FILESYSTEM = "filesystem"
25+
26+
27+
class ExecutionStore(Protocol):
28+
"""Protocol for execution storage implementations."""
29+
30+
# ignore cover because coverage doesn't understand elipses
31+
def save(self, execution: Execution) -> None: ... # pragma: no cover
32+
def load(self, execution_arn: str) -> Execution: ... # pragma: no cover
33+
def update(self, execution: Execution) -> None: ... # pragma: no cover
34+
def list_all(self) -> list[Execution]: ... # pragma: no cover
35+
36+
37+
__all__ = [
38+
"StoreType",
39+
"ExecutionStore",
40+
"InMemoryExecutionStore",
41+
"FileSystemExecutionStore",
42+
]
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
"""File system-based execution store implementation."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
import logging
7+
from datetime import UTC, datetime
8+
from pathlib import Path
9+
10+
from aws_durable_execution_sdk_python_testing.exceptions import (
11+
DurableFunctionsLocalRunnerError,
12+
)
13+
from aws_durable_execution_sdk_python_testing.execution import Execution
14+
15+
16+
class DateTimeEncoder(json.JSONEncoder):
17+
"""Custom JSON encoder that handles datetime objects."""
18+
19+
def default(self, obj):
20+
if isinstance(obj, datetime):
21+
return obj.timestamp()
22+
return super().default(obj)
23+
24+
25+
def datetime_object_hook(obj):
26+
"""JSON object hook to convert unix timestamps back to datetime objects."""
27+
if isinstance(obj, dict):
28+
for key, value in obj.items():
29+
if isinstance(value, int | float) and key.endswith(("_timestamp", "_time")):
30+
try: # noqa: SIM105
31+
obj[key] = datetime.fromtimestamp(value, tz=UTC)
32+
except (ValueError, OSError):
33+
# Leave as number if not a valid timestamp
34+
pass
35+
return obj
36+
37+
38+
class FileSystemExecutionStore:
39+
"""File system-based execution store for persistence."""
40+
41+
def __init__(self, storage_dir: Path) -> None:
42+
self._storage_dir = storage_dir
43+
44+
@classmethod
45+
def create(cls, storage_dir: str | Path | None = None) -> FileSystemExecutionStore:
46+
"""Create a FileSystemExecutionStore with directory creation.
47+
48+
Args:
49+
storage_dir: Directory path for storage. Defaults to '.durable_executions'
50+
51+
Returns:
52+
FileSystemExecutionStore instance with created directory
53+
"""
54+
path = Path(storage_dir) if storage_dir else Path(".durable_executions")
55+
path.mkdir(exist_ok=True)
56+
return cls(storage_dir=path)
57+
58+
def _get_file_path(self, execution_arn: str) -> Path:
59+
"""Get file path for execution ARN."""
60+
# Use ARN as filename with .json extension, replacing unsafe characters
61+
safe_filename = execution_arn.replace(":", "_").replace("/", "_")
62+
return self._storage_dir / f"{safe_filename}.json"
63+
64+
def save(self, execution: Execution) -> None:
65+
"""Save execution to file system."""
66+
file_path = self._get_file_path(execution.durable_execution_arn)
67+
data = execution.to_dict()
68+
69+
with open(file_path, "w", encoding="utf-8") as f:
70+
json.dump(data, f, indent=2, cls=DateTimeEncoder)
71+
72+
def load(self, execution_arn: str) -> Execution:
73+
"""Load execution from file system."""
74+
file_path = self._get_file_path(execution_arn)
75+
if not file_path.exists():
76+
msg = f"Execution {execution_arn} not found"
77+
raise DurableFunctionsLocalRunnerError(msg)
78+
79+
with open(file_path, encoding="utf-8") as f:
80+
data = json.load(f, object_hook=datetime_object_hook)
81+
82+
return Execution.from_dict(data)
83+
84+
def update(self, execution: Execution) -> None:
85+
"""Update execution in file system (same as save)."""
86+
self.save(execution)
87+
88+
def list_all(self) -> list[Execution]:
89+
"""List all executions from file system."""
90+
executions = []
91+
for file_path in self._storage_dir.glob("*.json"):
92+
try:
93+
with open(file_path, encoding="utf-8") as f:
94+
data = json.load(f, object_hook=datetime_object_hook)
95+
executions.append(Execution.from_dict(data))
96+
except (json.JSONDecodeError, KeyError, OSError) as e:
97+
logging.warning("Skipping corrupted file %s: %s", file_path, e)
98+
continue
99+
return executions

0 commit comments

Comments
 (0)