Skip to content

Commit 41bc9f6

Browse files
FullyTypedAstraea Quinn S
authored andcommitted
Revert "fix: Migrate datetime from float to datetime object"
This reverts commit 1171521.
1 parent d7da110 commit 41bc9f6

File tree

5 files changed

+69
-93
lines changed

5 files changed

+69
-93
lines changed

src/aws_durable_execution_sdk_python/concurrency.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
import heapq
66
import logging
77
import threading
8+
import time
89
from abc import ABC, abstractmethod
910
from collections import Counter
1011
from concurrent.futures import Future, ThreadPoolExecutor
1112
from dataclasses import dataclass
12-
from datetime import UTC, datetime
1313
from enum import Enum
1414
from typing import TYPE_CHECKING, Generic, Self, TypeVar
1515

@@ -258,7 +258,7 @@ def __init__(self, executable: Executable[CallableType]):
258258
self.executable = executable
259259
self._status = BranchStatus.PENDING
260260
self._future: Future | None = None
261-
self._suspend_until: datetime | None = None
261+
self._suspend_until: float | None = None
262262
self._result: ResultType = None # type: ignore[assignment]
263263
self._is_result_set: bool = False
264264
self._error: Exception | None = None
@@ -293,7 +293,7 @@ def error(self) -> Exception:
293293
return self._error
294294

295295
@property
296-
def suspend_until(self) -> datetime | None:
296+
def suspend_until(self) -> float | None:
297297
"""Get suspend timestamp."""
298298
return self._suspend_until
299299

@@ -308,7 +308,7 @@ def can_resume(self) -> bool:
308308
return self._status is BranchStatus.SUSPENDED or (
309309
self._status is BranchStatus.SUSPENDED_WITH_TIMEOUT
310310
and self._suspend_until is not None
311-
and datetime.now(UTC) >= self._suspend_until
311+
and time.time() >= self._suspend_until
312312
)
313313

314314
@property
@@ -333,7 +333,7 @@ def suspend(self) -> None:
333333
self._status = BranchStatus.SUSPENDED
334334
self._suspend_until = None
335335

336-
def suspend_with_timeout(self, timestamp: datetime) -> None:
336+
def suspend_with_timeout(self, timestamp: float) -> None:
337337
"""Transition to SUSPENDED_WITH_TIMEOUT state."""
338338
self._status = BranchStatus.SUSPENDED_WITH_TIMEOUT
339339
self._suspend_until = timestamp
@@ -507,11 +507,11 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
507507
self.shutdown()
508508

509509
def schedule_resume(
510-
self, exe_state: ExecutableWithState, resume_time: datetime
510+
self, exe_state: ExecutableWithState, resume_time: float
511511
) -> None:
512512
"""Schedule a task to resume at the specified time."""
513513
with self._lock:
514-
heapq.heappush(self._pending_resumes, (resume_time.timestamp(), exe_state))
514+
heapq.heappush(self._pending_resumes, (resume_time, exe_state))
515515

516516
def shutdown(self) -> None:
517517
"""Shutdown the timer thread and cancel all pending resumes."""
@@ -534,7 +534,7 @@ def _timer_loop(self) -> None:
534534
self._shutdown.wait(timeout=0.1)
535535
continue
536536

537-
current_time = datetime.now(UTC).timestamp()
537+
current_time = time.time()
538538
if current_time >= next_resume_time:
539539
# Time to resume
540540
with self._lock:
@@ -675,7 +675,7 @@ def on_done(future: Future) -> None:
675675

676676
def should_execution_suspend(self) -> SuspendResult:
677677
"""Check if execution should suspend."""
678-
earliest_timestamp: datetime | None = None
678+
earliest_timestamp: float = float("inf")
679679
indefinite_suspend_task: (
680680
ExecutableWithState[CallableType, ResultType] | None
681681
) = None
@@ -685,16 +685,16 @@ def should_execution_suspend(self) -> SuspendResult:
685685
# Exit here! Still have tasks that can make progress, don't suspend.
686686
return SuspendResult.do_not_suspend()
687687
if exe_state.status is BranchStatus.SUSPENDED_WITH_TIMEOUT:
688-
if exe_state.suspend_until and (
689-
earliest_timestamp is None
690-
or exe_state.suspend_until < earliest_timestamp
688+
if (
689+
exe_state.suspend_until
690+
and exe_state.suspend_until < earliest_timestamp
691691
):
692692
earliest_timestamp = exe_state.suspend_until
693693
elif exe_state.status is BranchStatus.SUSPENDED:
694694
indefinite_suspend_task = exe_state
695695

696696
# All tasks are in final states and at least one of them is a suspend.
697-
if earliest_timestamp is not None:
697+
if earliest_timestamp != float("inf"):
698698
return SuspendResult.suspend(
699699
TimedSuspendExecution(
700700
"All concurrent work complete or suspended pending retry.",

src/aws_durable_execution_sdk_python/exceptions.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@
55

66
from __future__ import annotations
77

8+
import time
89
from dataclasses import dataclass
9-
from datetime import UTC, datetime, timedelta
1010
from enum import Enum
11+
from typing import TYPE_CHECKING
12+
13+
if TYPE_CHECKING:
14+
import datetime
1115

1216

1317
class TerminationReason(Enum):
@@ -146,10 +150,10 @@ class TimedSuspendExecution(SuspendExecution):
146150
This is a specialized form of SuspendExecution that includes a scheduled resume time.
147151
148152
Attributes:
149-
scheduled_timestamp (datetime): DateTime at which to resume.
153+
scheduled_timestamp (float): Unix timestamp in seconds at which to resume.
150154
"""
151155

152-
def __init__(self, message: str, scheduled_timestamp: datetime):
156+
def __init__(self, message: str, scheduled_timestamp: float):
153157
super().__init__(message)
154158
self.scheduled_timestamp = scheduled_timestamp
155159

@@ -168,23 +172,23 @@ def from_delay(cls, message: str, delay_seconds: int) -> TimedSuspendExecution:
168172
>>> exception = TimedSuspendExecution.from_delay("Waiting for callback", 30)
169173
>>> # Will suspend for 30 seconds from now
170174
"""
171-
resume_time = datetime.now(UTC) + timedelta(seconds=delay_seconds)
175+
resume_time = time.time() + delay_seconds
172176
return cls(message, scheduled_timestamp=resume_time)
173177

174178
@classmethod
175179
def from_datetime(
176-
cls, message: str, datetime_timestamp: datetime
180+
cls, message: str, datetime_timestamp: datetime.datetime
177181
) -> TimedSuspendExecution:
178182
"""Create a timed suspension with the delay calculated from now.
179183
180184
Args:
181185
message: Descriptive message for the suspension
182-
datetime_timestamp: DateTime timestamp at which to resume
186+
datetime_timestamp: Unix datetime timestamp in seconds at which to resume
183187
184188
Returns:
185189
TimedSuspendExecution: Instance with calculated resume time
186190
"""
187-
return cls(message, scheduled_timestamp=datetime_timestamp)
191+
return cls(message, scheduled_timestamp=datetime_timestamp.timestamp())
188192

189193

190194
class OrderedLockError(DurableExecutionsError):

src/aws_durable_execution_sdk_python/operation/wait.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from __future__ import annotations
44

55
import logging
6-
from datetime import UTC, datetime, timedelta
6+
import time
77
from typing import TYPE_CHECKING
88

99
from aws_durable_execution_sdk_python.exceptions import TimedSuspendExecution
@@ -48,6 +48,6 @@ def wait_handler(
4848
state.create_checkpoint(operation_update=operation)
4949

5050
# Calculate when to resume
51-
resume_time = datetime.now(UTC) + timedelta(seconds=seconds)
51+
resume_time = time.time() + seconds
5252
msg = f"Wait for {seconds} seconds"
5353
raise TimedSuspendExecution(msg, scheduled_timestamp=resume_time)

0 commit comments

Comments
 (0)