Skip to content

Commit b2594d2

Browse files
committed
feat: add groupId support for job grouping
1 parent bd3b8d1 commit b2594d2

File tree

5 files changed

+153
-1
lines changed

5 files changed

+153
-1
lines changed

.changelog/group-id.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Job grouping
2+
3+
## New feature
4+
5+
### Group ID
6+
7+
Jobs can now be assigned to a group using the `groupId` option. This allows organizing related jobs together for easier monitoring and filtering in UIs.
8+
9+
```typescript
10+
// Assign jobs to a group
11+
await SendEmailJob.dispatch({ to: 'user@example.com' })
12+
.group('newsletter-jan-2025')
13+
.run()
14+
15+
// Combine with other options
16+
await ExportJob.dispatch({ userId: 1 })
17+
.group('batch-export-123')
18+
.toQueue('exports')
19+
.priority(2)
20+
.run()
21+
```
22+
23+
### Use cases
24+
25+
- **Batch operations**: Group all jobs from a newsletter send, bulk export, or data migration
26+
- **Monitoring**: Filter and view related jobs together in queue UIs
27+
- **Debugging**: Easily find all jobs related to a specific operation
28+
29+
### API
30+
31+
The `groupId` is stored with the job data and can be accessed via:
32+
33+
```typescript
34+
const record = await adapter.getJob('job-id', 'queue-name')
35+
console.log(record.data.groupId) // 'newsletter-jan-2025'
36+
```

examples/jobs/metrics_job.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ interface MetricsJobPayload {
1919
export default class MetricsJob extends Job<MetricsJobPayload> {
2020
static options: JobOptions = {
2121
queue: 'metrics',
22-
removeOnComplete: true,
22+
removeOnComplete: { count: 10 },
2323
removeOnFail: { count: 100 },
2424
}
2525

src/job_dispatcher.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export class JobDispatcher<T> {
4444
#adapter?: string | (() => Adapter)
4545
#delay?: Duration
4646
#priority?: number
47+
#groupId?: string
4748

4849
/**
4950
* Create a new job dispatcher.
@@ -121,6 +122,30 @@ export class JobDispatcher<T> {
121122
return this
122123
}
123124

125+
/**
126+
* Assign this job to a group.
127+
*
128+
* Jobs with the same groupId can be filtered and displayed together
129+
* in monitoring UIs. Useful for batch operations like newsletters
130+
* or bulk exports.
131+
*
132+
* @param groupId - Group identifier
133+
* @returns This dispatcher for chaining
134+
*
135+
* @example
136+
* ```typescript
137+
* // Group newsletter jobs together
138+
* await SendEmailJob.dispatch({ to: 'user@example.com' })
139+
* .group('newsletter-jan-2025')
140+
* .run()
141+
* ```
142+
*/
143+
group(groupId: string): this {
144+
this.#groupId = groupId
145+
146+
return this
147+
}
148+
124149
/**
125150
* Use a specific adapter for this job.
126151
*
@@ -166,6 +191,7 @@ export class JobDispatcher<T> {
166191
payload: this.#payload,
167192
attempts: 0,
168193
priority: this.#priority,
194+
groupId: this.#groupId,
169195
}
170196

171197
if (this.#delay) {

src/types/main.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,22 @@ export interface JobData {
9090
* Number of times this job was recovered from stalled state.
9191
*/
9292
stalledCount?: number
93+
94+
/**
95+
* Optional group identifier for organizing related jobs.
96+
*
97+
* Jobs with the same groupId can be filtered and displayed together
98+
* in monitoring UIs. Useful for batch operations like newsletters
99+
* or bulk exports.
100+
*
101+
* @example
102+
* ```typescript
103+
* await SendEmailJob.dispatch({ to: 'user@example.com' })
104+
* .group('newsletter-jan-2025')
105+
* .run()
106+
* ```
107+
*/
108+
groupId?: string
93109
}
94110

95111
/**

tests/job_dispatcher.spec.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,3 +222,77 @@ test.group('JobDispatcher | DispatchResult', () => {
222222
assert.property(result, 'jobId')
223223
})
224224
})
225+
226+
test.group('JobDispatcher | groupId', () => {
227+
test('should dispatch job with groupId', async ({ assert }) => {
228+
const sharedAdapter = memory()()
229+
230+
const localConfig = {
231+
default: 'memory',
232+
adapters: { memory: () => sharedAdapter },
233+
}
234+
235+
await QueueManager.init(localConfig)
236+
237+
const dispatcher = new JobDispatcher('NewsletterJob', { userId: 1 })
238+
239+
const { jobId } = await dispatcher.group('newsletter-jan-2025').run()
240+
241+
assert.isString(jobId)
242+
243+
const job = await sharedAdapter.pop()
244+
245+
assert.isNotNull(job)
246+
assert.equal(job!.id, jobId)
247+
assert.equal(job!.groupId, 'newsletter-jan-2025')
248+
})
249+
250+
test('should dispatch multiple jobs with same groupId', async ({ assert }) => {
251+
const sharedAdapter = memory()()
252+
253+
const localConfig = {
254+
default: 'memory',
255+
adapters: { memory: () => sharedAdapter },
256+
}
257+
258+
await QueueManager.init(localConfig)
259+
260+
const groupId = 'batch-export-123'
261+
262+
await new JobDispatcher('ExportJob', { userId: 1 }).group(groupId).run()
263+
await new JobDispatcher('ExportJob', { userId: 2 }).group(groupId).run()
264+
await new JobDispatcher('ExportJob', { userId: 3 }).group(groupId).run()
265+
266+
const job1 = await sharedAdapter.pop()
267+
const job2 = await sharedAdapter.pop()
268+
const job3 = await sharedAdapter.pop()
269+
270+
assert.equal(job1!.groupId, groupId)
271+
assert.equal(job2!.groupId, groupId)
272+
assert.equal(job3!.groupId, groupId)
273+
})
274+
275+
test('should work with other options like priority and queue', async ({ assert }) => {
276+
const sharedAdapter = memory()()
277+
278+
const localConfig = {
279+
default: 'memory',
280+
adapters: { memory: () => sharedAdapter },
281+
}
282+
283+
await QueueManager.init(localConfig)
284+
285+
const { jobId } = await new JobDispatcher('ImportJob', { file: 'data.csv' })
286+
.group('import-batch-456')
287+
.toQueue('imports')
288+
.priority(2)
289+
.run()
290+
291+
const job = await sharedAdapter.popFrom('imports')
292+
293+
assert.isNotNull(job)
294+
assert.equal(job!.id, jobId)
295+
assert.equal(job!.groupId, 'import-batch-456')
296+
assert.equal(job!.priority, 2)
297+
})
298+
})

0 commit comments

Comments
 (0)