Skip to content

Commit de47dde

Browse files
committed
feat: implement filesystem store
1 parent 128c297 commit de47dde

File tree

13 files changed

+485
-70
lines changed

13 files changed

+485
-70
lines changed

src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
if TYPE_CHECKING:
2828
from aws_durable_execution_sdk_python_testing.execution import Execution
2929
from aws_durable_execution_sdk_python_testing.scheduler import Scheduler
30-
from aws_durable_execution_sdk_python_testing.store import ExecutionStore
30+
from aws_durable_execution_sdk_python_testing.stores import ExecutionStore
3131

3232

3333
class CheckpointProcessor:

src/aws_durable_execution_sdk_python_testing/cli.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ class CliConfig:
5050
local_runner_region: str = "us-west-2"
5151
local_runner_mode: str = "local"
5252

53+
# Store configuration
54+
store_type: str = "memory"
55+
store_path: str | None = None
56+
5357
@classmethod
5458
def from_environment(cls) -> CliConfig:
5559
"""Create configuration from environment variables with defaults."""
@@ -69,6 +73,8 @@ def from_environment(cls) -> CliConfig:
6973
),
7074
local_runner_region=os.getenv("AWS_DEX_LOCAL_RUNNER_REGION", "us-west-2"),
7175
local_runner_mode=os.getenv("AWS_DEX_LOCAL_RUNNER_MODE", "local"),
76+
store_type=os.getenv("AWS_DEX_STORE_TYPE", "memory"),
77+
store_path=os.getenv("AWS_DEX_STORE_PATH"),
7278
)
7379

7480

@@ -188,6 +194,17 @@ def _create_start_server_parser(self, subparsers) -> None:
188194
default=self.config.local_runner_mode,
189195
help=f"Local Runner mode (default: {self.config.local_runner_mode}, env: AWS_DEX_LOCAL_RUNNER_MODE)",
190196
)
197+
start_server_parser.add_argument(
198+
"--store-type",
199+
choices=["memory", "filesystem"],
200+
default=self.config.store_type,
201+
help=f"Store type for execution persistence (default: {self.config.store_type}, env: AWS_DEX_STORE_TYPE)",
202+
)
203+
start_server_parser.add_argument(
204+
"--store-path",
205+
default=self.config.store_path,
206+
help=f"Path for filesystem store (default: {self.config.store_path or '.durable_executions'}, env: AWS_DEX_STORE_PATH)",
207+
)
191208
start_server_parser.set_defaults(func=self.start_server_command)
192209

193210
def _create_invoke_parser(self, subparsers) -> None:
@@ -258,6 +275,8 @@ def start_server_command(self, args: argparse.Namespace) -> int:
258275
local_runner_endpoint=args.local_runner_endpoint,
259276
local_runner_region=args.local_runner_region,
260277
local_runner_mode=args.local_runner_mode,
278+
store_type=args.store_type,
279+
store_path=args.store_path,
261280
)
262281

263282
logger.info(
@@ -273,6 +292,10 @@ def start_server_command(self, args: argparse.Namespace) -> int:
273292
logger.info(" Local Runner Endpoint: %s", args.local_runner_endpoint)
274293
logger.info(" Local Runner Region: %s", args.local_runner_region)
275294
logger.info(" Local Runner Mode: %s", args.local_runner_mode)
295+
logger.info(" Store Type: %s", args.store_type)
296+
if args.store_type == "filesystem":
297+
store_path = args.store_path or ".durable_executions"
298+
logger.info(" Store Path: %s", store_path)
276299

277300
# Use runner as context manager for proper lifecycle
278301
with WebRunner(runner_config) as runner:

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848

4949
from aws_durable_execution_sdk_python_testing.invoker import Invoker
5050
from aws_durable_execution_sdk_python_testing.scheduler import Event, Scheduler
51-
from aws_durable_execution_sdk_python_testing.store import ExecutionStore
51+
from aws_durable_execution_sdk_python_testing.stores import ExecutionStore
5252

5353
logger = logging.getLogger(__name__)
5454

@@ -142,15 +142,15 @@ def get_execution_details(self, execution_arn: str) -> GetDurableExecutionRespon
142142
durable_execution_name=execution.start_input.execution_name,
143143
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
144144
status=status,
145-
start_timestamp=execution_op.start_timestamp.isoformat()
145+
start_timestamp=execution_op.start_timestamp.timestamp()
146146
if execution_op.start_timestamp
147-
else datetime.now(UTC).isoformat(),
147+
else datetime.now(UTC).timestamp(),
148148
input_payload=execution_op.execution_details.input_payload
149149
if execution_op.execution_details
150150
else None,
151151
result=result,
152152
error=error,
153-
end_timestamp=execution_op.end_timestamp.isoformat()
153+
end_timestamp=execution_op.end_timestamp.timestamp()
154154
if execution_op.end_timestamp
155155
else None,
156156
version="1.0",
@@ -223,10 +223,10 @@ def list_executions(
223223
durable_execution_name=execution.start_input.execution_name,
224224
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
225225
status=execution_status,
226-
start_timestamp=execution_op.start_timestamp.isoformat()
226+
start_timestamp=execution_op.start_timestamp.timestamp()
227227
if execution_op.start_timestamp
228-
else datetime.now(UTC).isoformat(),
229-
end_timestamp=execution_op.end_timestamp.isoformat()
228+
else datetime.now(UTC).timestamp(),
229+
end_timestamp=execution_op.end_timestamp.timestamp()
230230
if execution_op.end_timestamp
231231
else None,
232232
)
@@ -333,7 +333,7 @@ def stop_execution(
333333
# Stop the execution
334334
self.fail_execution(execution_arn, stop_error)
335335

336-
return StopDurableExecutionResponse(stop_date=datetime.now(UTC).isoformat())
336+
return StopDurableExecutionResponse(stop_date=datetime.now(UTC).timestamp())
337337

338338
def get_execution_state(
339339
self,

src/aws_durable_execution_sdk_python_testing/model.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,11 @@ class GetDurableExecutionResponse:
131131
durable_execution_name: str
132132
function_arn: str
133133
status: str
134-
start_timestamp: str
134+
start_timestamp: float
135135
input_payload: str | None = None
136136
result: str | None = None
137137
error: ErrorObject | None = None
138-
end_timestamp: str | None = None
138+
end_timestamp: float | None = None
139139
version: str | None = None
140140

141141
@classmethod
@@ -188,8 +188,8 @@ class Execution:
188188
durable_execution_name: str
189189
function_arn: str
190190
status: str
191-
start_timestamp: str
192-
end_timestamp: str | None = None
191+
start_timestamp: float
192+
end_timestamp: float | None = None
193193

194194
@classmethod
195195
def from_dict(cls, data: dict) -> Execution:
@@ -325,7 +325,7 @@ def to_dict(self) -> dict[str, Any]:
325325
class StopDurableExecutionResponse:
326326
"""Response from stopping a durable execution."""
327327

328-
stop_date: str
328+
stop_date: float
329329

330330
@classmethod
331331
def from_dict(cls, data: dict) -> StopDurableExecutionResponse:

src/aws_durable_execution_sdk_python_testing/runner.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@
4848
StartDurableExecutionOutput,
4949
)
5050
from aws_durable_execution_sdk_python_testing.scheduler import Scheduler
51-
from aws_durable_execution_sdk_python_testing.store import InMemoryExecutionStore
51+
from aws_durable_execution_sdk_python_testing.stores import (
52+
ExecutionStore,
53+
FileSystemExecutionStore,
54+
InMemoryExecutionStore,
55+
)
5256
from aws_durable_execution_sdk_python_testing.web.server import WebServer
5357

5458

@@ -83,6 +87,10 @@ class WebRunnerConfig:
8387
local_runner_region: str = "us-west-2"
8488
local_runner_mode: str = "local"
8589

90+
# Store configuration
91+
store_type: str = "memory" # "memory" or "filesystem"
92+
store_path: str | None = None # Path for filesystem store
93+
8694

8795
@dataclass(frozen=True)
8896
class Operation:
@@ -543,7 +551,7 @@ def __init__(self, config: WebRunnerConfig) -> None:
543551
self._config = config
544552
self._server: WebServer | None = None
545553
self._scheduler: Scheduler | None = None
546-
self._store: InMemoryExecutionStore | None = None
554+
self._store: ExecutionStore | None = None
547555
self._invoker: LambdaInvoker | None = None
548556
self._executor: Executor | None = None
549557

@@ -581,7 +589,11 @@ def start(self) -> None:
581589
raise DurableFunctionsLocalRunnerError(msg)
582590

583591
# Create dependencies and server
584-
self._store = InMemoryExecutionStore()
592+
if self._config.store_type == "filesystem":
593+
store_path = self._config.store_path or ".durable_executions"
594+
self._store = FileSystemExecutionStore(store_path)
595+
else:
596+
self._store = InMemoryExecutionStore()
585597
self._scheduler = Scheduler()
586598
self._invoker = LambdaInvoker(self._create_boto3_client())
587599

src/aws_durable_execution_sdk_python_testing/store.py

Lines changed: 0 additions & 50 deletions
This file was deleted.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""Execution stores for persisting durable function executions."""
2+
3+
from __future__ import annotations
4+
5+
from typing import TYPE_CHECKING, Protocol
6+
7+
from aws_durable_execution_sdk_python_testing.stores.filesystem import (
8+
FileSystemExecutionStore,
9+
)
10+
from aws_durable_execution_sdk_python_testing.stores.memory import (
11+
InMemoryExecutionStore,
12+
)
13+
14+
15+
if TYPE_CHECKING:
16+
from aws_durable_execution_sdk_python_testing.execution import Execution
17+
18+
19+
class ExecutionStore(Protocol):
20+
"""Protocol for execution storage implementations."""
21+
22+
# ignore cover because coverage doesn't understand elipses
23+
def save(self, execution: Execution) -> None: ... # pragma: no cover
24+
def load(self, execution_arn: str) -> Execution: ... # pragma: no cover
25+
def update(self, execution: Execution) -> None: ... # pragma: no cover
26+
def list_all(self) -> list[Execution]: ... # pragma: no cover
27+
28+
29+
__all__ = ["ExecutionStore", "InMemoryExecutionStore", "FileSystemExecutionStore"]
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
"""File system-based execution store implementation."""
2+
3+
from __future__ import annotations
4+
5+
import contextlib
6+
import json
7+
import logging
8+
import threading
9+
from datetime import UTC, datetime
10+
from pathlib import Path
11+
from typing import TYPE_CHECKING
12+
13+
14+
if TYPE_CHECKING:
15+
from aws_durable_execution_sdk_python_testing.execution import Execution
16+
17+
18+
class DateTimeEncoder(json.JSONEncoder):
19+
"""Custom JSON encoder that handles datetime objects."""
20+
21+
def default(self, obj):
22+
if isinstance(obj, datetime):
23+
return obj.timestamp()
24+
return super().default(obj)
25+
26+
27+
def datetime_object_hook(obj):
28+
"""JSON object hook to convert unix timestamps back to datetime objects."""
29+
if isinstance(obj, dict):
30+
for key, value in obj.items():
31+
if isinstance(value, int | float) and key.endswith(("_timestamp", "_time")):
32+
with contextlib.suppress(ValueError, OSError):
33+
obj[key] = datetime.fromtimestamp(value, tz=UTC)
34+
return obj
35+
36+
37+
class FileSystemExecutionStore:
38+
"""File system-based execution store for persistence."""
39+
40+
def __init__(self, storage_dir: str | Path = ".durable_executions") -> None:
41+
self._storage_dir = Path(storage_dir)
42+
self._storage_dir.mkdir(exist_ok=True)
43+
self._lock = threading.RLock()
44+
45+
def _get_file_path(self, execution_arn: str) -> Path:
46+
"""Get file path for execution ARN."""
47+
# Use ARN as filename with .json extension, replacing unsafe characters
48+
safe_filename = execution_arn.replace(":", "_").replace("/", "_")
49+
return self._storage_dir / f"{safe_filename}.json"
50+
51+
def save(self, execution: Execution) -> None:
52+
"""Save execution to file system."""
53+
with self._lock:
54+
file_path = self._get_file_path(execution.durable_execution_arn)
55+
data = execution.to_dict()
56+
57+
# Write atomically using temporary file
58+
temp_path = file_path.with_suffix(".tmp")
59+
try:
60+
with open(temp_path, "w", encoding="utf-8") as f:
61+
json.dump(data, f, indent=2, cls=DateTimeEncoder)
62+
temp_path.replace(file_path)
63+
except Exception:
64+
if temp_path.exists():
65+
temp_path.unlink()
66+
raise
67+
68+
def load(self, execution_arn: str) -> Execution:
69+
"""Load execution from file system."""
70+
from aws_durable_execution_sdk_python_testing.execution import Execution
71+
72+
with self._lock:
73+
file_path = self._get_file_path(execution_arn)
74+
if not file_path.exists():
75+
msg = f"Execution {execution_arn} not found"
76+
raise KeyError(msg)
77+
78+
with open(file_path, encoding="utf-8") as f:
79+
data = json.load(f, object_hook=datetime_object_hook)
80+
81+
return Execution.from_dict(data)
82+
83+
def update(self, execution: Execution) -> None:
84+
"""Update execution in file system (same as save)."""
85+
self.save(execution)
86+
87+
def list_all(self) -> list[Execution]:
88+
"""List all executions from file system."""
89+
from aws_durable_execution_sdk_python_testing.execution import Execution
90+
91+
with self._lock:
92+
executions = []
93+
for file_path in self._storage_dir.glob("*.json"):
94+
try:
95+
with open(file_path, encoding="utf-8") as f:
96+
data = json.load(f, object_hook=datetime_object_hook)
97+
executions.append(Execution.from_dict(data))
98+
except (json.JSONDecodeError, KeyError, OSError) as e:
99+
logging.warning("Skipping corrupted file %s: %s", file_path, e)
100+
continue
101+
return executions

0 commit comments

Comments
 (0)