Commit ea90062

Add job management documentation
Document the jobs system for coordinating distributed populate() operations: job states, refresh(), status queries, configuration, and distributed processing patterns.
1 parent 46f362d commit ea90062

2 files changed: +219 -0

docs/mkdocs.yaml

Lines changed: 1 addition & 0 deletions

@@ -42,6 +42,7 @@ nav:
       - Make Method: operations/make.md
       - Populate: operations/populate.md
       - Key Source: operations/key-source.md
+      - Jobs: operations/jobs.md
       - Distributed: operations/distributed.md
   - Queries:
       - query/principles.md

docs/src/operations/jobs.md

Lines changed: 218 additions & 0 deletions
@@ -0,0 +1,218 @@

# Job Management

DataJoint provides a job reservation system for coordinating distributed `populate()`
operations across multiple workers. Each auto-populated table (`dj.Imported` or
`dj.Computed`) has an associated hidden jobs table that tracks processing status.

## Overview

The jobs system enables the following; a minimal end-to-end sketch follows the list:

- **Distributed computing**: Multiple workers can process the same table without conflicts
- **Progress tracking**: Monitor pending, reserved, completed, and failed jobs
- **Error management**: Track and retry failed computations
- **Priority scheduling**: Process urgent jobs first
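
In outline, these pieces combine into a simple workflow: one process queues work with
`refresh()`, any number of workers call `populate(reserve_jobs=True)`, and progress is
read from the jobs table. Each call is documented in the sections below:

```python
# queue any missing keys as pending jobs
ProcessedData.jobs.refresh()

# each worker claims and processes pending jobs without duplicating work
ProcessedData.populate(reserve_jobs=True)

# check how far along the table is
print(ProcessedData.jobs.progress())
```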

## Accessing the Jobs Table

Every auto-populated table has a `.jobs` attribute:

```python
@schema
class ProcessedData(dj.Computed):
    definition = """
    -> RawData
    ---
    result : float
    """

    def make(self, key):
        # computation logic
        self.insert1(dict(key, result=compute(key)))

# Access the jobs table
ProcessedData.jobs
```

## Job States

Jobs can be in one of five states:

| Status | Description |
|--------|-------------|
| `pending` | Queued and ready for processing |
| `reserved` | Currently being processed by a worker |
| `success` | Completed successfully |
| `error` | Failed with an error |
| `ignore` | Manually marked to skip |
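
The status values above can also be used to restrict the jobs table directly, since it
behaves like any other DataJoint table; a small sketch (the same pattern appears under
"Clear Jobs" below):

```python
# restrict the jobs table on status (values from the table above)
failed = ProcessedData.jobs & "status='error'"
in_flight = ProcessedData.jobs & "status='reserved'"
```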

## Refreshing the Job Queue

The `refresh()` method updates the jobs queue by adding new jobs and removing stale ones:

```python
# Add jobs for all missing keys
ProcessedData.jobs.refresh()

# Add jobs for specific restrictions
ProcessedData.jobs.refresh("subject_id > 10")

# Set priority (lower = more urgent, default: 5)
ProcessedData.jobs.refresh(priority=1)

# Delay job availability by 60 seconds
ProcessedData.jobs.refresh(delay=60)
```

**Returns**: `{'added': int, 'removed': int}` - counts of jobs added and stale jobs removed.
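
The returned counts are convenient for logging what a refresh actually did:

```python
# report what refresh() changed
result = ProcessedData.jobs.refresh("subject_id > 10", priority=1)
print(f"added {result['added']} jobs, removed {result['removed']} stale jobs")
```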

### Parameters

| Parameter | Default | Description |
|-----------|---------|-------------|
| `restrictions` | None | Filter conditions for the `key_source` |
| `delay` | 0 | Seconds until jobs become available |
| `priority` | 5 | Job priority (lower = more urgent) |
| `stale_timeout` | 3600 | Seconds before checking pending jobs for staleness |
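
The parameters can be combined in a single call. A sketch of a low-urgency, delayed
refresh; the specific values are illustrative only:

```python
# queue new work at low urgency, hold it for 5 minutes, and re-check pending
# jobs for staleness after 2 hours
ProcessedData.jobs.refresh(
    "subject_id > 10",   # restriction on the key_source
    delay=300,           # seconds before these jobs become available
    priority=7,          # lower numbers are more urgent; 5 is the default
    stale_timeout=7200,  # seconds before pending jobs are checked for staleness
)
```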

## Querying Job Status

### Filter by Status

```python
# Pending jobs
ProcessedData.jobs.pending

# Reserved (in-progress) jobs
ProcessedData.jobs.reserved

# Completed jobs
ProcessedData.jobs.completed

# Failed jobs
ProcessedData.jobs.errors

# Ignored jobs
ProcessedData.jobs.ignored
```

### Progress Summary

```python
ProcessedData.jobs.progress()
# Returns: {'pending': 50, 'reserved': 2, 'success': 100, 'error': 3, 'ignore': 1, 'total': 156}
```
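
The returned dictionary makes a simple progress monitor straightforward. A sketch that
polls until no work is pending or in flight (the 30-second interval is arbitrary):

```python
import time

# poll the jobs table until nothing is pending or reserved
while True:
    p = ProcessedData.jobs.progress()
    print(f"{p['success']}/{p['total']} done, {p['pending']} pending, "
          f"{p['reserved']} running, {p['error']} failed")
    if p["pending"] == 0 and p["reserved"] == 0:
        break
    time.sleep(30)
```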

### Fetch Pending Jobs

```python
# Get up to 10 highest-priority pending jobs
keys = ProcessedData.jobs.fetch_pending(limit=10)

# Get pending jobs with priority 3 or more urgent (i.e., priority <= 3)
keys = ProcessedData.jobs.fetch_pending(priority=3)
```
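
The fetched keys can then be handed to `populate()` so a worker processes just that
batch. A sketch, assuming `fetch_pending()` returns primary-key dictionaries (as the
variable name above suggests) and that `populate()` accepts them as restrictions in the
usual DataJoint way:

```python
# claim and process a small batch of the most urgent pending keys
keys = ProcessedData.jobs.fetch_pending(limit=10)
if keys:
    ProcessedData.populate(keys, reserve_jobs=True)
```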

## Managing Jobs

### Mark Keys to Ignore

Skip specific keys during populate:

```python
ProcessedData.jobs.ignore({"subject_id": 5, "session_id": 3})
```
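
To make an ignored key eligible again, one option is to delete its job record and let a
later `refresh()` re-queue it; a sketch based on the delete pattern in the next
subsection, assuming removed keys are re-added by `refresh()`:

```python
# un-ignore a key: remove its job record, then re-queue it
(ProcessedData.jobs & {"subject_id": 5, "session_id": 3} & "status='ignore'").delete()
ProcessedData.jobs.refresh()
```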

### Clear Jobs

```python
# Delete all jobs
ProcessedData.jobs.delete()

# Delete specific jobs
(ProcessedData.jobs & "status='error'").delete()

# Drop the entire jobs table
ProcessedData.jobs.drop()
```

### View Error Details

```python
# View error messages
ProcessedData.jobs.errors.fetch("KEY", "error_message")

# Get the full error traceback for one failed key (`key` is a primary-key dict)
error_job = (ProcessedData.jobs.errors & key).fetch1()
print(error_job["error_stack"])
```
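
Putting these pieces together, a typical recovery pass inspects the failures, fixes the
underlying `make()` logic or data, clears the error records, and re-queues them. A
sketch, assuming `refresh()` re-adds keys whose job records were deleted:

```python
# retry everything that failed: drop the error records and re-queue the keys
(ProcessedData.jobs & "status='error'").delete()
ProcessedData.jobs.refresh()
ProcessedData.populate(reserve_jobs=True)
```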

## Configuration

Configure job behavior in `datajoint.json`:

```json
{
    "jobs": {
        "default_priority": 5,
        "stale_timeout": 3600,
        "keep_completed": false
    }
}
```

| Setting | Default | Description |
|---------|---------|-------------|
| `jobs.default_priority` | 5 | Default priority for new jobs |
| `jobs.stale_timeout` | 3600 | Seconds before pending jobs are checked for staleness |
| `jobs.keep_completed` | false | Keep job records after successful completion |
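
Because `datajoint.json` is an ordinary JSON file, the `jobs` block can also be updated
programmatically. A sketch that merges new values into an existing local config without
clobbering other settings (the values shown are illustrative):

```python
import json
import os

path = "datajoint.json"  # local DataJoint config file, as referenced above
settings = {}
if os.path.exists(path):
    with open(path) as f:
        settings = json.load(f)
settings.setdefault("jobs", {}).update({"default_priority": 3, "keep_completed": True})
with open(path, "w") as f:
    json.dump(settings, f, indent=2)
```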

## Jobs Table Schema

The jobs table stores:

| Attribute | Type | Description |
|-----------|------|-------------|
| *primary key* | (varies) | Primary key derived from the target table via its foreign keys |
| `status` | enum | pending, reserved, success, error, ignore |
| `priority` | int | Lower = more urgent |
| `created_time` | datetime | When the job was added |
| `scheduled_time` | datetime | Process on or after this time |
| `reserved_time` | datetime | When the job was reserved |
| `completed_time` | datetime | When the job completed |
| `duration` | float | Execution duration in seconds |
| `error_message` | varchar | Error message if failed |
| `error_stack` | blob | Full error traceback |
| `user` | varchar | Database user |
| `host` | varchar | Worker hostname |
| `pid` | int | Worker process ID |
| `connection_id` | bigint | MySQL connection ID |
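
These attributes can be fetched like any other columns, which is handy for quick
performance checks. A sketch that reports run time and worker host for completed jobs,
assuming `jobs.keep_completed` is enabled so success records are retained:

```python
# how long did completed jobs take, and which workers ran them?
durations, hosts = ProcessedData.jobs.completed.fetch("duration", "host")
for seconds, host in zip(durations, hosts):
    print(f"{host}: {seconds:.1f}s")
```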

## Distributed Processing Example

Run multiple workers to process a table in parallel:

```python
# Worker script (run on multiple machines)
import datajoint as dj

schema = dj.Schema('my_pipeline')

@schema
class Analysis(dj.Computed):
    definition = """
    -> Experiment
    ---
    result : float
    """

    def make(self, key):
        data = (Experiment & key).fetch1('data')
        self.insert1(dict(key, result=analyze(data)))

# Each worker runs:
Analysis.populate(reserve_jobs=True)
```

With `reserve_jobs=True`, workers coordinate through the jobs table to avoid
processing the same key twice.
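
For long-lived workers it is often preferable to keep claiming work and record failures
rather than stop at the first error. A sketch, assuming `populate()` accepts
`suppress_errors` as in current DataJoint releases; the polling interval is arbitrary:

```python
import time

# keep processing until the queue drains; failed keys are recorded in the
# jobs table as errors instead of raising
while True:
    Analysis.populate(reserve_jobs=True, suppress_errors=True)
    if Analysis.jobs.progress()["pending"] == 0:
        break
    time.sleep(10)  # brief pause before checking for newly queued jobs
```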
