Skip to content

Commit 1171521

Browse files
FullyTypedAstraea Quinn S
authored andcommitted
fix: Migrate datetime from float to datetime object
- Replace float for timestamps with actual datetime objects as those are given back by boto3
1 parent a71e9eb commit 1171521

File tree

5 files changed

+93
-69
lines changed

5 files changed

+93
-69
lines changed

src/aws_durable_execution_sdk_python/concurrency.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
import heapq
66
import logging
77
import threading
8-
import time
98
from abc import ABC, abstractmethod
109
from collections import Counter
1110
from concurrent.futures import Future, ThreadPoolExecutor
1211
from dataclasses import dataclass
12+
from datetime import UTC, datetime
1313
from enum import Enum
1414
from typing import TYPE_CHECKING, Generic, Self, TypeVar
1515

@@ -258,7 +258,7 @@ def __init__(self, executable: Executable[CallableType]):
258258
self.executable = executable
259259
self._status = BranchStatus.PENDING
260260
self._future: Future | None = None
261-
self._suspend_until: float | None = None
261+
self._suspend_until: datetime | None = None
262262
self._result: ResultType = None # type: ignore[assignment]
263263
self._is_result_set: bool = False
264264
self._error: Exception | None = None
@@ -293,7 +293,7 @@ def error(self) -> Exception:
293293
return self._error
294294

295295
@property
296-
def suspend_until(self) -> float | None:
296+
def suspend_until(self) -> datetime | None:
297297
"""Get suspend timestamp."""
298298
return self._suspend_until
299299

@@ -308,7 +308,7 @@ def can_resume(self) -> bool:
308308
return self._status is BranchStatus.SUSPENDED or (
309309
self._status is BranchStatus.SUSPENDED_WITH_TIMEOUT
310310
and self._suspend_until is not None
311-
and time.time() >= self._suspend_until
311+
and datetime.now(UTC) >= self._suspend_until
312312
)
313313

314314
@property
@@ -333,7 +333,7 @@ def suspend(self) -> None:
333333
self._status = BranchStatus.SUSPENDED
334334
self._suspend_until = None
335335

336-
def suspend_with_timeout(self, timestamp: float) -> None:
336+
def suspend_with_timeout(self, timestamp: datetime) -> None:
337337
"""Transition to SUSPENDED_WITH_TIMEOUT state."""
338338
self._status = BranchStatus.SUSPENDED_WITH_TIMEOUT
339339
self._suspend_until = timestamp
@@ -507,11 +507,11 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
507507
self.shutdown()
508508

509509
def schedule_resume(
510-
self, exe_state: ExecutableWithState, resume_time: float
510+
self, exe_state: ExecutableWithState, resume_time: datetime
511511
) -> None:
512512
"""Schedule a task to resume at the specified time."""
513513
with self._lock:
514-
heapq.heappush(self._pending_resumes, (resume_time, exe_state))
514+
heapq.heappush(self._pending_resumes, (resume_time.timestamp(), exe_state))
515515

516516
def shutdown(self) -> None:
517517
"""Shutdown the timer thread and cancel all pending resumes."""
@@ -534,7 +534,7 @@ def _timer_loop(self) -> None:
534534
self._shutdown.wait(timeout=0.1)
535535
continue
536536

537-
current_time = time.time()
537+
current_time = datetime.now(UTC).timestamp()
538538
if current_time >= next_resume_time:
539539
# Time to resume
540540
with self._lock:
@@ -675,7 +675,7 @@ def on_done(future: Future) -> None:
675675

676676
def should_execution_suspend(self) -> SuspendResult:
677677
"""Check if execution should suspend."""
678-
earliest_timestamp: float = float("inf")
678+
earliest_timestamp: datetime | None = None
679679
indefinite_suspend_task: (
680680
ExecutableWithState[CallableType, ResultType] | None
681681
) = None
@@ -685,16 +685,16 @@ def should_execution_suspend(self) -> SuspendResult:
685685
# Exit here! Still have tasks that can make progress, don't suspend.
686686
return SuspendResult.do_not_suspend()
687687
if exe_state.status is BranchStatus.SUSPENDED_WITH_TIMEOUT:
688-
if (
689-
exe_state.suspend_until
690-
and exe_state.suspend_until < earliest_timestamp
688+
if exe_state.suspend_until and (
689+
earliest_timestamp is None
690+
or exe_state.suspend_until < earliest_timestamp
691691
):
692692
earliest_timestamp = exe_state.suspend_until
693693
elif exe_state.status is BranchStatus.SUSPENDED:
694694
indefinite_suspend_task = exe_state
695695

696696
# All tasks are in final states and at least one of them is a suspend.
697-
if earliest_timestamp != float("inf"):
697+
if earliest_timestamp is not None:
698698
return SuspendResult.suspend(
699699
TimedSuspendExecution(
700700
"All concurrent work complete or suspended pending retry.",

src/aws_durable_execution_sdk_python/exceptions.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,8 @@
55

66
from __future__ import annotations
77

8-
import time
98
from dataclasses import dataclass
10-
from typing import TYPE_CHECKING
11-
12-
if TYPE_CHECKING:
13-
import datetime
9+
from datetime import UTC, datetime, timedelta
1410

1511

1612
class DurableExecutionsError(Exception):
@@ -75,10 +71,10 @@ class TimedSuspendExecution(SuspendExecution):
7571
This is a specialized form of SuspendExecution that includes a scheduled resume time.
7672
7773
Attributes:
78-
scheduled_timestamp (float): Unix timestamp in seconds at which to resume.
74+
scheduled_timestamp (datetime): DateTime at which to resume.
7975
"""
8076

81-
def __init__(self, message: str, scheduled_timestamp: float):
77+
def __init__(self, message: str, scheduled_timestamp: datetime):
8278
super().__init__(message)
8379
self.scheduled_timestamp = scheduled_timestamp
8480

@@ -97,23 +93,23 @@ def from_delay(cls, message: str, delay_seconds: int) -> TimedSuspendExecution:
9793
>>> exception = TimedSuspendExecution.from_delay("Waiting for callback", 30)
9894
>>> # Will suspend for 30 seconds from now
9995
"""
100-
resume_time = time.time() + delay_seconds
96+
resume_time = datetime.now(UTC) + timedelta(seconds=delay_seconds)
10197
return cls(message, scheduled_timestamp=resume_time)
10298

10399
@classmethod
104100
def from_datetime(
105-
cls, message: str, datetime_timestamp: datetime.datetime
101+
cls, message: str, datetime_timestamp: datetime
106102
) -> TimedSuspendExecution:
107103
"""Create a timed suspension with the delay calculated from now.
108104
109105
Args:
110106
message: Descriptive message for the suspension
111-
datetime_timestamp: Unix datetime timestamp in seconds at which to resume
107+
datetime_timestamp: DateTime timestamp at which to resume
112108
113109
Returns:
114110
TimedSuspendExecution: Instance with calculated resume time
115111
"""
116-
return cls(message, scheduled_timestamp=datetime_timestamp.timestamp())
112+
return cls(message, scheduled_timestamp=datetime_timestamp)
117113

118114

119115
class OrderedLockError(DurableExecutionsError):

src/aws_durable_execution_sdk_python/operation/wait.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from __future__ import annotations
44

55
import logging
6-
import time
6+
from datetime import UTC, datetime, timedelta
77
from typing import TYPE_CHECKING
88

99
from aws_durable_execution_sdk_python.exceptions import TimedSuspendExecution
@@ -48,6 +48,6 @@ def wait_handler(
4848
state.create_checkpoint(operation_update=operation)
4949

5050
# Calculate when to resume
51-
resume_time = time.time() + seconds
51+
resume_time = datetime.now(UTC) + timedelta(seconds=seconds)
5252
msg = f"Wait for {seconds} seconds"
5353
raise TimedSuspendExecution(msg, scheduled_timestamp=resume_time)

0 commit comments

Comments
 (0)