Skip to content

Commit 608020a

Browse files
committed
Improve jobs.py: use update1, djblob, cleaner f-string
- Use variable assignment for pk_section instead of chr(10) in f-string - Change error_stack type from mediumblob to <djblob> - Use update1() in error() instead of raw SQL and deprecated _update() - Remove config.override(enable_python_native_blobs=True) wrapper Note: reserve() keeps raw SQL for atomic conditional update with rowcount check - this is required for safe concurrent job reservation.
1 parent 956fa27 commit 608020a

File tree

1 file changed

+14
-23
lines changed

1 file changed

+14
-23
lines changed

src/datajoint/jobs.py

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import logging
1010
import os
1111
import platform
12+
from datetime import datetime
1213
from typing import TYPE_CHECKING
1314

1415
from .errors import DataJointError, DuplicateError
@@ -134,10 +135,10 @@ def _build_definition(self) -> str:
134135
)
135136

136137
# Build primary key section
137-
pk_lines = [attr_def for _, attr_def in pk_attrs]
138+
pk_section = "\n".join(attr_def for _, attr_def in pk_attrs)
138139

139140
definition = f"""# Job queue for {self._target.class_name}
140-
{chr(10).join(pk_lines)}
141+
{pk_section}
141142
---
142143
status : enum('pending', 'reserved', 'success', 'error', 'ignore')
143144
priority : int # Lower = more urgent (0 = highest priority)
@@ -147,7 +148,7 @@ def _build_definition(self) -> str:
147148
completed_time=null : datetime(6) # When job completed
148149
duration=null : float # Execution duration in seconds
149150
error_message="" : varchar({ERROR_MESSAGE_LENGTH}) # Error message if failed
150-
error_stack=null : mediumblob # Full error traceback
151+
error_stack=null : <djblob> # Full error traceback
151152
user="" : varchar(255) # Database user who reserved/completed job
152153
host="" : varchar(255) # Hostname of worker
153154
pid=0 : int unsigned # Process ID of worker
@@ -417,27 +418,17 @@ def error(self, key: dict, error_message: str, error_stack: str = None) -> None:
417418
pk_attrs = [name for name, _ in self._get_fk_derived_primary_key()]
418419
job_key = {attr: key[attr] for attr in pk_attrs if attr in key}
419420

420-
key_conditions = " AND ".join(
421-
f"`{attr}`='{job_key[attr]}'" if isinstance(job_key[attr], str) else f"`{attr}`={job_key[attr]}"
422-
for attr in pk_attrs
423-
)
424-
425-
# Escape error message for SQL
426-
error_message_escaped = error_message.replace("'", "''").replace("\\", "\\\\")
427-
428-
sql = f"""
429-
UPDATE {self.full_table_name}
430-
SET status='error',
431-
completed_time=NOW(6),
432-
error_message='{error_message_escaped}'
433-
WHERE {key_conditions}
434-
"""
435-
self.connection.query(sql)
436-
437-
# Update error_stack separately using parameterized query if provided
421+
# Build update dict with all required fields
422+
update_row = {
423+
**job_key,
424+
"status": "error",
425+
"completed_time": datetime.now(),
426+
"error_message": error_message,
427+
}
438428
if error_stack is not None:
439-
with config.override(enable_python_native_blobs=True):
440-
(self & job_key)._update("error_stack", error_stack)
429+
update_row["error_stack"] = error_stack
430+
431+
self.update1(update_row)
441432

442433
def ignore(self, key: dict) -> None:
443434
"""

0 commit comments

Comments
 (0)