Skip to content

Commit 2ea5a17

Browse files
Merge pull request #1336 from datajoint/replace-now-with-current-timestamp
Replace NOW() with CURRENT_TIMESTAMP for PostgreSQL compatibility
2 parents ac98154 + c1d109e commit 2ea5a17

File tree

3 files changed

+18
-18
lines changed

3 files changed

+18
-18
lines changed

src/datajoint/autopopulate.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -486,11 +486,11 @@ def handler(signum, frame):
486486
refresh = config.jobs.auto_refresh
487487
if refresh:
488488
# Use delay=-1 to ensure jobs are immediately schedulable
489-
# (avoids race condition with scheduled_time <= NOW(3) check)
489+
# (avoids race condition with scheduled_time <= CURRENT_TIMESTAMP(3) check)
490490
self.jobs.refresh(*restrictions, priority=priority, delay=-1)
491491

492-
# Fetch pending jobs ordered by priority (use NOW(3) to match CURRENT_TIMESTAMP(3) precision)
493-
pending_query = self.jobs.pending & "scheduled_time <= NOW(3)"
492+
# Fetch pending jobs ordered by priority (use CURRENT_TIMESTAMP(3) for datetime(3) precision)
493+
pending_query = self.jobs.pending & "scheduled_time <= CURRENT_TIMESTAMP(3)"
494494
if priority is not None:
495495
pending_query = pending_query & f"priority <= {priority}"
496496

src/datajoint/jobs.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -375,8 +375,8 @@ def refresh(
375375
new_key_list = new_keys.keys()
376376

377377
if new_key_list:
378-
# Always use MySQL server time for scheduling (NOW(3) matches datetime(3) precision)
379-
scheduled_time = self.connection.query(f"SELECT NOW(3) + INTERVAL {delay} SECOND").fetchone()[0]
378+
# Use server time for scheduling (CURRENT_TIMESTAMP(3) matches datetime(3) precision)
379+
scheduled_time = self.connection.query(f"SELECT CURRENT_TIMESTAMP(3) + INTERVAL {delay} SECOND").fetchone()[0]
380380

381381
for key in new_key_list:
382382
job_entry = {
@@ -402,19 +402,19 @@ def refresh(
402402
self.insert1({**key, "status": "pending", "priority": priority})
403403
result["re_pended"] += 1
404404

405-
# 3. Remove stale jobs (not ignore status) - use MySQL NOW() for consistent timing
405+
# 3. Remove stale jobs (not ignore status) - use server CURRENT_TIMESTAMP for consistent timing
406406
if stale_timeout > 0:
407-
old_jobs = self & f"created_time < NOW() - INTERVAL {stale_timeout} SECOND" & 'status != "ignore"'
407+
old_jobs = self & f"created_time < CURRENT_TIMESTAMP - INTERVAL {stale_timeout} SECOND" & 'status != "ignore"'
408408

409409
for key in old_jobs.keys():
410410
# Check if key still in key_source
411411
if not (key_source & key):
412412
(self & key).delete_quick()
413413
result["removed"] += 1
414414

415-
# 4. Handle orphaned reserved jobs - use MySQL NOW() for consistent timing
415+
# 4. Handle orphaned reserved jobs - use server CURRENT_TIMESTAMP for consistent timing
416416
if orphan_timeout is not None and orphan_timeout > 0:
417-
orphaned_jobs = self.reserved & f"reserved_time < NOW() - INTERVAL {orphan_timeout} SECOND"
417+
orphaned_jobs = self.reserved & f"reserved_time < CURRENT_TIMESTAMP - INTERVAL {orphan_timeout} SECOND"
418418

419419
for key in orphaned_jobs.keys():
420420
(self & key).delete_quick()
@@ -440,14 +440,14 @@ def reserve(self, key: dict) -> bool:
440440
bool
441441
True if reservation successful, False if job not available.
442442
"""
443-
# Check if job is pending and scheduled (use NOW(3) to match CURRENT_TIMESTAMP(3) precision)
444-
job = (self & key & 'status="pending"' & "scheduled_time <= NOW(3)").to_dicts()
443+
# Check if job is pending and scheduled (use CURRENT_TIMESTAMP(3) for datetime(3) precision)
444+
job = (self & key & 'status="pending"' & "scheduled_time <= CURRENT_TIMESTAMP(3)").to_dicts()
445445

446446
if not job:
447447
return False
448448

449-
# Get MySQL server time for reserved_time
450-
server_now = self.connection.query("SELECT NOW()").fetchone()[0]
449+
# Get server time for reserved_time
450+
server_now = self.connection.query("SELECT CURRENT_TIMESTAMP").fetchone()[0]
451451

452452
# Build update row with primary key and new values
453453
pk = self._get_pk(key)
@@ -489,8 +489,8 @@ def complete(self, key: dict, duration: float | None = None) -> None:
489489
from .settings import config
490490

491491
if config.jobs.keep_completed:
492-
# Use MySQL server time for completed_time
493-
server_now = self.connection.query("SELECT NOW()").fetchone()[0]
492+
# Use server time for completed_time
493+
server_now = self.connection.query("SELECT CURRENT_TIMESTAMP").fetchone()[0]
494494
pk = self._get_pk(key)
495495
update_row = {
496496
**pk,
@@ -519,8 +519,8 @@ def error(self, key: dict, error_message: str, error_stack: str | None = None) -
519519
if len(error_message) > ERROR_MESSAGE_LENGTH:
520520
error_message = error_message[: ERROR_MESSAGE_LENGTH - len(TRUNCATION_APPENDIX)] + TRUNCATION_APPENDIX
521521

522-
# Use MySQL server time for completed_time
523-
server_now = self.connection.query("SELECT NOW()").fetchone()[0]
522+
# Use server time for completed_time
523+
server_now = self.connection.query("SELECT CURRENT_TIMESTAMP").fetchone()[0]
524524

525525
pk = self._get_pk(key)
526526
update_row = {

tests/integration/test_autopopulate.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def test_populate_exclude_error_and_ignore_jobs(clean_autopopulate, subject, exp
6666
assert not experiment, "table already filled?"
6767

6868
# Refresh jobs to create pending entries
69-
# Use delay=-1 to ensure jobs are immediately schedulable (avoids race condition with NOW(3))
69+
# Use delay=-1 to ensure jobs are immediately schedulable (avoids race condition with CURRENT_TIMESTAMP(3))
7070
experiment.jobs.refresh(delay=-1)
7171

7272
keys = experiment.jobs.pending.keys(limit=2)

0 commit comments

Comments
 (0)