Skip to content

Commit f434f1c

Browse files
committed
feat: add otel instrumentation system
1 parent c26de83 commit f434f1c

18 files changed

+1250
-82
lines changed

README.md

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,77 @@ await QueueManager.init({
519519
})
520520
```
521521

522+
## OpenTelemetry Instrumentation (experimental)
523+
524+
> [!WARNING]
525+
> The OpenTelemetry instrumentation is experimental and its API may change in future releases.
526+
527+
`@boringnode/queue` ships with built-in OpenTelemetry instrumentation that creates **PRODUCER** spans for job dispatch and **CONSUMER** spans for job execution, following [OTel messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/).
528+
529+
### Quick Setup
530+
531+
```typescript
532+
import { QueueInstrumentation } from '@boringnode/queue/otel'
533+
import * as boringqueue from '@boringnode/queue'
534+
535+
const instrumentation = new QueueInstrumentation({
536+
messagingSystem: 'boringqueue', // default
537+
executionSpanLinkMode: 'link', // or 'parent'
538+
})
539+
540+
instrumentation.enable()
541+
instrumentation.manuallyRegister(boringqueue)
542+
```
543+
544+
The instrumentation patches `QueueManager.init()` to automatically inject its wrappers — no config changes needed in your queue setup.
545+
546+
### Span Attributes
547+
548+
The instrumentation uses standard [OTel messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/) where they map cleanly, plus a few queue-specific custom attributes.
549+
550+
| Attribute | Kind | Description |
551+
| ------------------------------- | ------- | ------------------------------------------ |
552+
| `messaging.system` | Semconv | `'boringqueue'` (configurable) |
553+
| `messaging.operation.name` | Semconv | `'publish'` or `'process'` |
554+
| `messaging.destination.name` | Semconv | Queue name |
555+
| `messaging.message.id` | Semconv | Job ID for single-message spans |
556+
| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch |
557+
| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt |
558+
| `messaging.job.name` | Custom | Job class name (e.g. `SendEmailJob`) |
559+
| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` |
560+
| `messaging.job.group_id` | Custom | Queue-specific group identifier |
561+
| `messaging.job.priority` | Custom | Queue-specific job priority |
562+
| `messaging.job.delay_ms` | Custom | Delay before the job becomes available |
563+
564+
### Trace Context Propagation
565+
566+
The instrumentation automatically propagates trace context from dispatch to execution:
567+
568+
- **Link mode** (default): Each job execution is an independent trace, linked to the dispatch span
569+
- **Parent mode**: Job execution is a child of the dispatch span (same trace)
570+
571+
Child spans created inside `execute()` (DB queries, HTTP calls, etc.) are automatically parented to the job consumer span.
572+
573+
### diagnostics_channel
574+
575+
Raw telemetry events are available via `diagnostics_channel` for custom subscribers:
576+
577+
```typescript
578+
import { tracingChannels } from '@boringnode/queue'
579+
580+
const { executeChannel } = tracingChannels
581+
582+
executeChannel.subscribe({
583+
start() {},
584+
end() {},
585+
asyncStart() {},
586+
asyncEnd(message) {
587+
console.log(`Job ${message.job.name} ${message.status} in ${message.duration}ms`)
588+
},
589+
error() {},
590+
})
591+
```
592+
522593
## Benchmarks
523594

524595
Performance comparison with BullMQ (5ms simulated work per job):

index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ export {
1313
fixedBackoff,
1414
} from './src/strategies/backoff_strategy.js'
1515
export * as errors from './src/exceptions.js'
16+
export * as tracingChannels from './src/tracing_channels.js'

package.json

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
],
1010
"exports": {
1111
".": "./build/index.js",
12+
"./otel": "./build/src/otel.js",
1213
"./drivers/*": "./build/src/drivers/*.js",
1314
"./contracts/*": "./build/src/contracts/*.js",
1415
"./types": "./build/src/types/index.js"
@@ -40,6 +41,11 @@
4041
"@japa/expect-type": "^2.0.4",
4142
"@japa/file-system": "^3.0.0",
4243
"@japa/runner": "^5.3.0",
44+
"@opentelemetry/api": "^1.9.0",
45+
"@opentelemetry/context-async-hooks": "^2.6.0",
46+
"@opentelemetry/core": "^2.6.0",
47+
"@opentelemetry/instrumentation": "^0.213.0",
48+
"@opentelemetry/sdk-trace-base": "^2.6.0",
4349
"@poppinss/ts-exec": "^1.4.4",
4450
"@types/better-sqlite3": "^7.6.13",
4551
"@types/node": "^24.11.0",
@@ -59,10 +65,22 @@
5965
"typescript": "^5.9.3"
6066
},
6167
"peerDependencies": {
68+
"@opentelemetry/api": "^1.9.0",
69+
"@opentelemetry/core": "^1.30.0 || ^2.0.0",
70+
"@opentelemetry/instrumentation": "^0.200.0",
6271
"ioredis": "^5.0.0",
6372
"knex": "^3.0.0"
6473
},
6574
"peerDependenciesMeta": {
75+
"@opentelemetry/api": {
76+
"optional": true
77+
},
78+
"@opentelemetry/core": {
79+
"optional": true
80+
},
81+
"@opentelemetry/instrumentation": {
82+
"optional": true
83+
},
6684
"ioredis": {
6785
"optional": true
6886
},

src/drivers/sync_adapter.ts

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { setTimeout as sleep } from 'node:timers/promises'
22
import { Locator } from '../locator.js'
33
import { QueueManager } from '../queue_manager.js'
44
import { JobExecutionRuntime } from '../job_runtime.js'
5+
import { executeChannel } from '../tracing_channels.js'
56
import type { Adapter, AcquiredJob } from '../contracts/adapter.js'
67
import type {
78
JobContext,
@@ -11,6 +12,7 @@ import type {
1112
ScheduleData,
1213
ScheduleListOptions,
1314
} from '../types/main.js'
15+
import type { JobExecuteMessage } from '../types/tracing_channels.js'
1416
import { DEFAULT_PRIORITY } from '../constants.js'
1517

1618
/**
@@ -165,40 +167,60 @@ export class SyncAdapter implements Adapter {
165167
defaultTimeout: configResolver.getWorkerTimeout(),
166168
})
167169
const jobFactory = QueueManager.getJobFactory()
170+
const executionWrapper = QueueManager.getExecutionWrapper()
168171
let attempts = jobData.attempts
169172

170173
while (true) {
174+
const now = Date.now()
175+
const acquiredJob: AcquiredJob = { ...jobData, attempts, acquiredAt: now }
176+
171177
const context: JobContext = {
172178
jobId: jobData.id,
173179
name: jobData.name,
174180
attempt: attempts + 1,
175181
queue,
176182
priority: jobData.priority ?? DEFAULT_PRIORITY,
177-
acquiredAt: new Date(),
183+
acquiredAt: new Date(now),
178184
stalledCount: jobData.stalledCount ?? 0,
179185
}
180186

181187
const jobInstance = jobFactory ? await jobFactory(JobClass) : new JobClass()
182188

183-
try {
184-
await runtime.execute(jobInstance, jobData.payload, context)
185-
return
186-
} catch (error) {
187-
const outcome = runtime.resolveFailure(error as Error, attempts)
189+
const startTime = performance.now()
190+
const executeMessage: JobExecuteMessage = { job: acquiredJob, queue }
191+
192+
const run = () => {
193+
return executeChannel.tracePromise(async () => {
194+
try {
195+
await runtime.execute(jobInstance, jobData.payload, context)
196+
executeMessage.status = 'completed'
197+
} catch (error) {
198+
const outcome = runtime.resolveFailure(error as Error, attempts)
199+
executeMessage.error = error as Error
200+
201+
if (outcome.type === 'failed') {
202+
executeMessage.status = 'failed'
203+
await jobInstance.failed?.(outcome.hookError)
204+
} else if (outcome.type === 'retry') {
205+
executeMessage.status = 'retrying'
206+
executeMessage.nextRetryAt = outcome.retryAt
207+
}
208+
}
188209

189-
if (outcome.type === 'failed') {
190-
await jobInstance.failed?.(outcome.hookError)
191-
return
192-
}
210+
executeMessage.duration = Number((performance.now() - startTime).toFixed(2))
211+
}, executeMessage)
212+
}
193213

194-
attempts++
214+
await executionWrapper(run, acquiredJob, queue)
195215

196-
if (outcome.type === 'retry' && outcome.retryAt) {
197-
const delay = outcome.retryAt.getTime() - Date.now()
216+
if (executeMessage.status !== 'retrying') return
198217

199-
if (delay > 0) {
200-
await sleep(delay)
201-
}
218+
attempts++
219+
220+
if (executeMessage.nextRetryAt) {
221+
const delay = executeMessage.nextRetryAt.getTime() - Date.now()
222+
if (delay > 0) {
223+
await sleep(delay)
202224
}
203225
}
204226
}

src/job_batch_dispatcher.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import debug from './debug.js'
22
import { randomUUID } from 'node:crypto'
33
import { QueueManager } from './queue_manager.js'
4+
import { dispatchChannel } from './tracing_channels.js'
45
import type { Adapter } from './contracts/adapter.js'
56
import type { DispatchManyResult } from './types/main.js'
7+
import type { JobDispatchMessage } from './types/tracing_channels.js'
68

79
/**
810
* Fluent builder for dispatching multiple jobs to the queue in a single batch.
@@ -143,9 +145,12 @@ export class JobBatchDispatcher<T> {
143145
* ```
144146
*/
145147
async run(): Promise<DispatchManyResult> {
148+
if (this.#payloads.length === 0) return { jobIds: [] }
149+
146150
debug('dispatching %d jobs of type %s', this.#payloads.length, this.#name)
147151

148152
const adapter = this.#getAdapterInstance()
153+
const wrapInternal = QueueManager.getInternalOperationWrapper()
149154

150155
const jobs = this.#payloads.map((payload) => ({
151156
id: randomUUID(),
@@ -156,11 +161,14 @@ export class JobBatchDispatcher<T> {
156161
groupId: this.#groupId,
157162
}))
158163

159-
await adapter.pushManyOn(this.#queue, jobs)
164+
const message: JobDispatchMessage = { jobs, queue: this.#queue }
160165

161-
return {
162-
jobIds: jobs.map((job) => job.id),
163-
}
166+
167+
await dispatchChannel.tracePromise(async () => {
168+
await wrapInternal(() => adapter.pushManyOn(this.#queue, jobs))
169+
}, message)
170+
171+
return { jobIds: jobs.map((job) => job.id) }
164172
}
165173

166174
/**

src/job_dispatcher.ts

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import debug from './debug.js'
22
import { randomUUID } from 'node:crypto'
33
import { QueueManager } from './queue_manager.js'
4+
import { dispatchChannel } from './tracing_channels.js'
45
import type { Adapter } from './contracts/adapter.js'
56
import type { DispatchResult, Duration } from './types/main.js'
7+
import type { JobDispatchMessage } from './types/tracing_channels.js'
68
import { parse } from './utils.js'
79

810
/**
@@ -184,8 +186,10 @@ export class JobDispatcher<T> {
184186
debug('dispatching job %s with id %s using payload %s', this.#name, id, this.#payload)
185187

186188
const adapter = this.#getAdapterInstance()
189+
const wrapInternal = QueueManager.getInternalOperationWrapper()
190+
const parsedDelay = this.#delay ? parse(this.#delay) : undefined
187191

188-
const payload = {
192+
const jobData = {
189193
id,
190194
name: this.#name,
191195
payload: this.#payload,
@@ -194,17 +198,17 @@ export class JobDispatcher<T> {
194198
groupId: this.#groupId,
195199
}
196200

197-
if (this.#delay) {
198-
const parsedDelay = parse(this.#delay)
201+
const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay }
199202

200-
await adapter.pushLaterOn(this.#queue, payload, parsedDelay)
201-
} else {
202-
await adapter.pushOn(this.#queue, payload)
203-
}
203+
await dispatchChannel.tracePromise(async () => {
204+
if (parsedDelay !== undefined) {
205+
await wrapInternal(() => adapter.pushLaterOn(this.#queue, jobData, parsedDelay))
206+
} else {
207+
await wrapInternal(() => adapter.pushOn(this.#queue, jobData))
208+
}
209+
}, message)
204210

205-
return {
206-
jobId: id,
207-
}
211+
return { jobId: id }
208212
}
209213

210214
/**

0 commit comments

Comments
 (0)