Skip to content

Commit 4d16df6

Browse files
merge: resolve conflict in jobs.py reserve method
Keep master's atomic SQL UPDATE for job reservation (prevents race conditions) while threading connection-scoped config through _get_job_version(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2 parents f0ef848 + 0bd6e02 commit 4d16df6

File tree

6 files changed

+25
-77
lines changed

6 files changed

+25
-77
lines changed

src/datajoint/jobs.py

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import platform
1414
import subprocess
1515

16-
from .condition import AndList, Not
16+
from .condition import AndList, Not, make_condition
1717
from .errors import DataJointError, DuplicateError
1818
from .heading import Heading
1919
from .table import Table
@@ -435,8 +435,10 @@ def reserve(self, key: dict) -> bool:
435435
"""
436436
Attempt to reserve a pending job for processing.
437437
438-
Updates status to ``'reserved'`` if currently ``'pending'`` and
439-
``scheduled_time <= now``.
438+
Atomically updates status to ``'reserved'`` if currently ``'pending'``
439+
and ``scheduled_time <= now``, using a single UPDATE with a WHERE clause
440+
that includes the status check. This prevents race conditions where
441+
multiple workers could reserve the same job simultaneously.
440442
441443
Parameters
442444
----------
@@ -448,33 +450,26 @@ def reserve(self, key: dict) -> bool:
448450
bool
449451
True if reservation successful, False if job not available.
450452
"""
451-
# Check if job is pending and scheduled (use CURRENT_TIMESTAMP(3) for datetime(3) precision)
452-
job = (self & key & "status='pending'" & "scheduled_time <= CURRENT_TIMESTAMP(3)").to_dicts()
453-
454-
if not job:
455-
return False
456-
457-
# Get server time for reserved_time
458-
server_now = self.connection.query("SELECT CURRENT_TIMESTAMP").fetchone()[0]
459-
460-
# Build update row with primary key and new values
461453
pk = self._get_pk(key)
462-
update_row = {
463-
**pk,
464-
"status": "reserved",
465-
"reserved_time": server_now,
466-
"host": platform.node(),
467-
"pid": os.getpid(),
468-
"connection_id": self.connection.connection_id,
469-
"user": self.connection.get_user(),
470-
"version": _get_job_version(self.connection._config),
471-
}
472-
473-
try:
474-
self.update1(update_row)
475-
return True
476-
except Exception:
477-
return False
454+
where = make_condition(self, pk, set())
455+
qi = self.adapter.quote_identifier
456+
assignments = ", ".join(f"{qi(k)}=%s" for k in ("status", "host", "pid", "connection_id", "user", "version"))
457+
query = (
458+
f"UPDATE {self.full_table_name} "
459+
f"SET {assignments}, {qi('reserved_time')}=CURRENT_TIMESTAMP(3) "
460+
f"WHERE {where} AND {qi('status')}='pending' "
461+
f"AND {qi('scheduled_time')} <= CURRENT_TIMESTAMP(3)"
462+
)
463+
args = [
464+
"reserved",
465+
platform.node(),
466+
os.getpid(),
467+
self.connection.connection_id,
468+
self.connection.get_user(),
469+
_get_job_version(self.connection._config),
470+
]
471+
cursor = self.connection.query(query, args=args)
472+
return cursor.rowcount == 1
478473

479474
def complete(self, key: dict, duration: float | None = None) -> None:
480475
"""

src/datajoint/schemas.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -320,26 +320,6 @@ def _decorate_table(self, table_class: type, context: dict[str, Any], assert_dec
320320
def __repr__(self):
321321
return "Schema `{name}`\n".format(name=self.database)
322322

323-
@property
324-
def size_on_disk(self) -> int:
325-
"""
326-
Return the total size of all tables in the schema.
327-
328-
Returns
329-
-------
330-
int
331-
Size in bytes (data + indices).
332-
"""
333-
self._assert_exists()
334-
return int(
335-
self.connection.query(
336-
"""
337-
SELECT SUM(data_length + index_length)
338-
FROM information_schema.tables WHERE table_schema='{db}'
339-
""".format(db=self.database)
340-
).fetchone()[0]
341-
)
342-
343323
def make_classes(self, into: dict[str, Any] | None = None) -> None:
344324
"""
345325
Create Python table classes for tables in the schema.

src/datajoint/table.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1250,22 +1250,6 @@ def drop(self, prompt: bool | None = None):
12501250
FreeTable(self.connection, table).drop_quick()
12511251
logger.info("Tables dropped. Restart kernel.")
12521252

1253-
@property
1254-
def size_on_disk(self):
1255-
"""
1256-
Return the size of data and indices in bytes on the storage device.
1257-
1258-
Returns
1259-
-------
1260-
int
1261-
Size of data and indices in bytes.
1262-
"""
1263-
ret = self.connection.query(
1264-
'SHOW TABLE STATUS FROM `{database}` WHERE NAME="{table}"'.format(database=self.database, table=self.table_name),
1265-
as_dict=True,
1266-
).fetchone()
1267-
return ret["Data_length"] + ret["Index_length"]
1268-
12691253
def describe(self, context=None, printout=False):
12701254
"""
12711255
Return the definition string for the query using DataJoint DDL.

src/datajoint/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
# version bump auto managed by Github Actions:
22
# label_prs.yaml(prep), release.yaml(bump), post_release.yaml(edit)
33
# manually set this version will be eventually overwritten by the above actions
4-
__version__ = "2.1.0"
4+
__version__ = "2.1.1"

tests/integration/test_relation.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -282,11 +282,5 @@ def relation_selector(attr):
282282
), "Regular expression matches for {name} but should not".format(name=name)
283283

284284

285-
def test_table_size(experiment):
286-
"""test getting the size of the table and its indices in bytes"""
287-
number_of_bytes = experiment.size_on_disk
288-
assert isinstance(number_of_bytes, int) and number_of_bytes > 100
289-
290-
291285
def test_repr_html(ephys):
292286
assert ephys._repr_html_().strip().startswith("<style")

tests/integration/test_schema.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,6 @@ def schema_empty(connection_test, schema_any, prefix):
5656
# Don't drop the schema since schema_any still needs it
5757

5858

59-
def test_schema_size_on_disk(schema_any):
60-
number_of_bytes = schema_any.size_on_disk
61-
assert isinstance(number_of_bytes, int)
62-
63-
6459
def test_schema_list(schema_any):
6560
schemas = dj.list_schemas(connection=schema_any.connection)
6661
assert schema_any.database in schemas

0 commit comments

Comments
 (0)