Skip to content

Commit ddc9bd5

Browse files
author
Rares Polenciuc
committed
feat: add SQLite execution store with query capabilities
- Add SQLiteExecutionStore with database persistence and indexing - Implement ExecutionQuery system for filtering by function name, execution name, status - Add QueryResult with pagination support (limit, offset, has_more) - Refactor BaseExecutionStore with shared query processing logic - Update Executor to use new query system for efficient list operations - Add comprehensive test coverage for all store implementations - Support concurrent access patterns with proper database handling
1 parent 48eae39 commit ddc9bd5

File tree

9 files changed

+1426
-67
lines changed

9 files changed

+1426
-67
lines changed

src/aws_durable_execution_sdk_python_testing/executor.py

Lines changed: 39 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ def list_executions(
162162
function_version: str | None = None, # noqa: ARG002
163163
execution_name: str | None = None,
164164
status_filter: str | None = None,
165-
time_after: str | None = None, # noqa: ARG002
166-
time_before: str | None = None, # noqa: ARG002
165+
time_after: str | None = None,
166+
time_before: str | None = None,
167167
marker: str | None = None,
168168
max_items: int | None = None,
169169
reverse_order: bool = False, # noqa: FBT001, FBT002
@@ -184,77 +184,53 @@ def list_executions(
184184
Returns:
185185
ListDurableExecutionsResponse: List of executions with pagination
186186
"""
187-
# Get all executions from store
188-
all_executions = self._store.list_all()
187+
from aws_durable_execution_sdk_python_testing.stores.base import ExecutionQuery
189188

190-
# Apply filters
191-
filtered_executions = []
192-
for execution in all_executions:
193-
# Filter by function name
194-
if function_name and execution.start_input.function_name != function_name:
195-
continue
189+
# Convert marker to offset
190+
offset = 0
191+
if marker:
192+
try:
193+
offset = int(marker)
194+
except ValueError:
195+
offset = 0
196196

197-
# Filter by execution name
198-
if (
199-
execution_name
200-
and execution.start_input.execution_name != execution_name
201-
):
202-
continue
197+
# Query store
198+
query = ExecutionQuery(
199+
function_name=function_name,
200+
execution_name=execution_name,
201+
status_filter=status_filter,
202+
time_after=time_after,
203+
time_before=time_before,
204+
limit=max_items or 50,
205+
offset=offset,
206+
reverse_order=reverse_order
207+
)
203208

204-
# Determine execution status
205-
execution_status = "RUNNING"
206-
if execution.is_complete:
207-
if (
208-
execution.result
209-
and execution.result.status == InvocationStatus.SUCCEEDED
210-
):
211-
execution_status = "SUCCEEDED"
212-
else:
213-
execution_status = "FAILED"
214-
215-
# Filter by status
216-
if status_filter and execution_status != status_filter:
217-
continue
218-
219-
# Convert to ExecutionSummary
209+
result = self._store.query(query)
210+
211+
# Convert to ExecutionSummary objects
212+
execution_summaries = []
213+
for execution in result.executions:
220214
execution_op = execution.get_operation_execution_started()
221-
execution_summary = ExecutionSummary(
215+
status = "RUNNING"
216+
if execution.is_complete:
217+
status = "SUCCEEDED" if execution.result and execution.result.status == InvocationStatus.SUCCEEDED else "FAILED"
218+
219+
execution_summaries.append(ExecutionSummary(
222220
durable_execution_arn=execution.durable_execution_arn,
223221
durable_execution_name=execution.start_input.execution_name,
224222
function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}",
225-
status=execution_status,
226-
start_timestamp=execution_op.start_timestamp.timestamp()
227-
if execution_op.start_timestamp
228-
else datetime.now(UTC).timestamp(),
229-
end_timestamp=execution_op.end_timestamp.timestamp()
230-
if execution_op.end_timestamp
231-
else None,
232-
)
233-
filtered_executions.append(execution_summary)
234-
235-
# Sort by start date
236-
filtered_executions.sort(key=lambda e: e.start_timestamp, reverse=reverse_order)
223+
status=status,
224+
start_timestamp=execution_op.start_timestamp.timestamp() if execution_op.start_timestamp else datetime.now(UTC).timestamp(),
225+
end_timestamp=execution_op.end_timestamp.timestamp() if execution_op.end_timestamp else None,
226+
))
237227

238-
# Apply pagination
239-
if max_items is None:
240-
max_items = 50
241-
242-
start_index = 0
243-
if marker:
244-
try:
245-
start_index = int(marker)
246-
except ValueError:
247-
start_index = 0
248-
249-
end_index = start_index + max_items
250-
paginated_executions = filtered_executions[start_index:end_index]
251-
252-
next_marker = None
253-
if end_index < len(filtered_executions):
254-
next_marker = str(end_index)
228+
# Generate next marker
229+
next_marker = str(offset + len(execution_summaries)) if result.has_more else None
255230

256231
return ListDurableExecutionsResponse(
257-
durable_executions=paginated_executions, next_marker=next_marker
232+
durable_executions=execution_summaries,
233+
next_marker=next_marker
258234
)
259235

260236
def list_executions_by_function(

src/aws_durable_execution_sdk_python_testing/stores/base.py

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22

33
from __future__ import annotations
44

5+
from dataclasses import dataclass
6+
from datetime import timezone
57
from enum import Enum
68
from typing import TYPE_CHECKING, Protocol
79

10+
from aws_durable_execution_sdk_python.execution import InvocationStatus
811

912
if TYPE_CHECKING:
1013
from aws_durable_execution_sdk_python_testing.execution import Execution
@@ -15,6 +18,30 @@ class StoreType(Enum):
1518

1619
MEMORY = "memory"
1720
FILESYSTEM = "filesystem"
21+
SQLITE = "sqlite"
22+
23+
24+
@dataclass
25+
class ExecutionQuery:
26+
"""Query parameters for execution filtering."""
27+
function_name: str | None = None
28+
execution_name: str | None = None
29+
status_filter: str | None = None
30+
time_after: str | None = None
31+
time_before: str | None = None
32+
limit: int | None = None
33+
offset: int = 0
34+
reverse_order: bool = False
35+
36+
37+
@dataclass
38+
class QueryResult:
39+
"""Result of a query with pagination info."""
40+
executions: list[Execution]
41+
total_count: int
42+
has_more: bool
43+
44+
1845

1946

2047
class ExecutionStore(Protocol):
@@ -24,4 +51,64 @@ class ExecutionStore(Protocol):
2451
def save(self, execution: Execution) -> None: ... # pragma: no cover
2552
def load(self, execution_arn: str) -> Execution: ... # pragma: no cover
2653
def update(self, execution: Execution) -> None: ... # pragma: no cover
27-
def list_all(self) -> list[Execution]: ... # pragma: no cover
54+
def query(self, query: ExecutionQuery) -> QueryResult: ... # pragma: no cover
55+
def list_all(self) -> list[Execution]: ... # pragma: no cover # Keep for backward compatibility
56+
57+
class BaseExecutionStore(ExecutionStore):
58+
"""Base implementation for execution stores with shared query logic."""
59+
60+
@staticmethod
61+
def process_query(executions: list[Execution], query: ExecutionQuery) -> QueryResult:
62+
"""Apply filtering, sorting, and pagination to executions."""
63+
# Apply filters
64+
filtered = []
65+
for execution in executions:
66+
if query.function_name and execution.start_input.function_name != query.function_name:
67+
continue
68+
if query.execution_name and execution.start_input.execution_name != query.execution_name:
69+
continue
70+
71+
# Status filtering
72+
if query.status_filter:
73+
status = "RUNNING"
74+
if execution.is_complete:
75+
status = "SUCCEEDED" if execution.result and execution.result.status == InvocationStatus.SUCCEEDED else "FAILED"
76+
if status != query.status_filter:
77+
continue
78+
79+
filtered.append(execution)
80+
81+
# Sort by start timestamp
82+
def get_sort_key(e):
83+
try:
84+
op = e.get_operation_execution_started()
85+
if op.start_timestamp:
86+
return op.start_timestamp.timestamp() if hasattr(op.start_timestamp, 'timestamp') else op.start_timestamp.replace(tzinfo=timezone.utc).timestamp()
87+
else:
88+
return 0
89+
except Exception: # noqa: BLE001
90+
return 0
91+
92+
filtered.sort(key=get_sort_key, reverse=query.reverse_order)
93+
94+
total_count = len(filtered)
95+
96+
# Apply pagination
97+
if query.limit:
98+
end_idx = query.offset + query.limit
99+
paginated = filtered[query.offset:end_idx]
100+
has_more = end_idx < total_count
101+
else:
102+
paginated = filtered[query.offset:]
103+
has_more = False
104+
105+
return QueryResult(
106+
executions=paginated,
107+
total_count=total_count,
108+
has_more=has_more
109+
)
110+
111+
def query(self, query: ExecutionQuery) -> QueryResult:
112+
"""Apply filtering, sorting, and pagination to executions."""
113+
executions = self.list_all()
114+
return self.process_query(executions, query)

src/aws_durable_execution_sdk_python_testing/stores/filesystem.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
ResourceNotFoundException,
1212
)
1313
from aws_durable_execution_sdk_python_testing.execution import Execution
14+
from aws_durable_execution_sdk_python_testing.stores.base import (
15+
BaseExecutionStore,
16+
)
1417

1518

1619
class DateTimeEncoder(json.JSONEncoder):
@@ -35,7 +38,7 @@ def datetime_object_hook(obj):
3538
return obj
3639

3740

38-
class FileSystemExecutionStore:
41+
class FileSystemExecutionStore(BaseExecutionStore):
3942
"""File system-based execution store for persistence."""
4043

4144
def __init__(self, storage_dir: Path) -> None:

src/aws_durable_execution_sdk_python_testing/stores/memory.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,17 @@
66
from typing import TYPE_CHECKING
77

88

9+
10+
from aws_durable_execution_sdk_python_testing.stores.base import (
11+
BaseExecutionStore,
12+
)
13+
14+
915
if TYPE_CHECKING:
1016
from aws_durable_execution_sdk_python_testing.execution import Execution
1117

1218

13-
class InMemoryExecutionStore:
19+
class InMemoryExecutionStore(BaseExecutionStore):
1420
"""Dict-based storage for testing."""
1521

1622
def __init__(self) -> None:

0 commit comments

Comments
 (0)