From f434f1cc16333f2dede47b6f087ff36dd4eab97c Mon Sep 17 00:00:00 2001 From: Julien Ripouteau Date: Mon, 23 Mar 2026 12:53:13 +0100 Subject: [PATCH] feat: add otel instrumentation system --- README.md | 71 ++++++ index.ts | 1 + package.json | 18 ++ src/drivers/sync_adapter.ts | 54 +++-- src/job_batch_dispatcher.ts | 16 +- src/job_dispatcher.ts | 24 +- src/otel.ts | 304 ++++++++++++++++++++++++++ src/queue_manager.ts | 36 +++ src/tracing_channels.ts | 27 +++ src/types/index.ts | 4 + src/types/main.ts | 19 ++ src/types/tracing_channels.ts | 49 +++++ src/worker.ts | 115 +++++----- tests/helpers/setup_tracing.ts | 36 +++ tests/job_dispatcher.spec.ts | 19 ++ tests/otel.spec.ts | 389 +++++++++++++++++++++++++++++++++ tsup.config.ts | 8 +- yarn.lock | 142 +++++++++++- 18 files changed, 1250 insertions(+), 82 deletions(-) create mode 100644 src/otel.ts create mode 100644 src/tracing_channels.ts create mode 100644 src/types/tracing_channels.ts create mode 100644 tests/helpers/setup_tracing.ts create mode 100644 tests/otel.spec.ts diff --git a/README.md b/README.md index 92fb173..330b4be 100644 --- a/README.md +++ b/README.md @@ -519,6 +519,77 @@ await QueueManager.init({ }) ``` +## OpenTelemetry Instrumentation (experimental) + +> [!WARNING] +> The OpenTelemetry instrumentation is experimental and its API may change in future releases. + +`@boringnode/queue` ships with built-in OpenTelemetry instrumentation that creates **PRODUCER** spans for job dispatch and **CONSUMER** spans for job execution, following [OTel messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/). 
+ +### Quick Setup + +```typescript +import { QueueInstrumentation } from '@boringnode/queue/otel' +import * as boringqueue from '@boringnode/queue' + +const instrumentation = new QueueInstrumentation({ + messagingSystem: 'boringqueue', // default + executionSpanLinkMode: 'link', // or 'parent' +}) + +instrumentation.enable() +instrumentation.manuallyRegister(boringqueue) +``` + +The instrumentation patches `QueueManager.init()` to automatically inject its wrappers — no config changes needed in your queue setup. + +### Span Attributes + +The instrumentation uses standard [OTel messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/) where they map cleanly, plus a few queue-specific custom attributes. + +| Attribute | Kind | Description | +| ------------------------------- | ------- | ------------------------------------------ | +| `messaging.system` | Semconv | `'boringqueue'` (configurable) | +| `messaging.operation.name` | Semconv | `'publish'` or `'process'` | +| `messaging.destination.name` | Semconv | Queue name | +| `messaging.message.id` | Semconv | Job ID for single-message spans | +| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch | +| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt | +| `messaging.job.name` | Custom | Job class name (e.g. 
`SendEmailJob`) | +| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` | +| `messaging.job.group_id` | Custom | Queue-specific group identifier | +| `messaging.job.priority` | Custom | Queue-specific job priority | +| `messaging.job.delay_ms` | Custom | Delay before the job becomes available | + +### Trace Context Propagation + +The instrumentation automatically propagates trace context from dispatch to execution: + +- **Link mode** (default): Each job execution is an independent trace, linked to the dispatch span +- **Parent mode**: Job execution is a child of the dispatch span (same trace) + +Child spans created inside `execute()` (DB queries, HTTP calls, etc.) are automatically parented to the job consumer span. + +### diagnostics_channel + +Raw telemetry events are available via `diagnostics_channel` for custom subscribers: + +```typescript +import { tracingChannels } from '@boringnode/queue' + +const { executeChannel } = tracingChannels + +executeChannel.subscribe({ + start() {}, + end() {}, + asyncStart() {}, + asyncEnd(message) { + console.log(`Job ${message.job.name} ${message.status} in ${message.duration}ms`) + }, + error() {}, +}) +``` + ## Benchmarks Performance comparison with BullMQ (5ms simulated work per job): diff --git a/index.ts b/index.ts index 2a99268..ac3860b 100644 --- a/index.ts +++ b/index.ts @@ -13,3 +13,4 @@ export { fixedBackoff, } from './src/strategies/backoff_strategy.js' export * as errors from './src/exceptions.js' +export * as tracingChannels from './src/tracing_channels.js' diff --git a/package.json b/package.json index b62a34f..6a3df5e 100644 --- a/package.json +++ b/package.json @@ -9,6 +9,7 @@ ], "exports": { ".": "./build/index.js", + "./otel": "./build/src/otel.js", "./drivers/*": "./build/src/drivers/*.js", "./contracts/*": "./build/src/contracts/*.js", "./types": "./build/src/types/index.js" @@ -40,6 +41,11 @@ "@japa/expect-type": "^2.0.4", "@japa/file-system": "^3.0.0", "@japa/runner": "^5.3.0", 
+ "@opentelemetry/api": "^1.9.0", + "@opentelemetry/context-async-hooks": "^2.6.0", + "@opentelemetry/core": "^2.6.0", + "@opentelemetry/instrumentation": "^0.213.0", + "@opentelemetry/sdk-trace-base": "^2.6.0", "@poppinss/ts-exec": "^1.4.4", "@types/better-sqlite3": "^7.6.13", "@types/node": "^24.11.0", @@ -59,10 +65,22 @@ "typescript": "^5.9.3" }, "peerDependencies": { + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/core": "^1.30.0 || ^2.0.0", + "@opentelemetry/instrumentation": ">=0.200.0 <1.0.0", "ioredis": "^5.0.0", "knex": "^3.0.0" }, "peerDependenciesMeta": { + "@opentelemetry/api": { + "optional": true + }, + "@opentelemetry/core": { + "optional": true + }, + "@opentelemetry/instrumentation": { + "optional": true + }, "ioredis": { "optional": true }, diff --git a/src/drivers/sync_adapter.ts b/src/drivers/sync_adapter.ts index 66f8318..796db00 100644 --- a/src/drivers/sync_adapter.ts +++ b/src/drivers/sync_adapter.ts @@ -2,6 +2,7 @@ import { setTimeout as sleep } from 'node:timers/promises' import { Locator } from '../locator.js' import { QueueManager } from '../queue_manager.js' import { JobExecutionRuntime } from '../job_runtime.js' +import { executeChannel } from '../tracing_channels.js' import type { Adapter, AcquiredJob } from '../contracts/adapter.js' import type { JobContext, @@ -11,6 +12,7 @@ import type { ScheduleData, ScheduleListOptions, } from '../types/main.js' +import type { JobExecuteMessage } from '../types/tracing_channels.js' import { DEFAULT_PRIORITY } from '../constants.js' /** @@ -165,40 +167,60 @@ export class SyncAdapter implements Adapter { defaultTimeout: configResolver.getWorkerTimeout(), }) const jobFactory = QueueManager.getJobFactory() + const executionWrapper = QueueManager.getExecutionWrapper() let attempts = jobData.attempts while (true) { + const now = Date.now() + const acquiredJob: AcquiredJob = { ...jobData, attempts, acquiredAt: now } + const context: JobContext = { jobId: jobData.id, name: jobData.name, attempt: attempts + 
1, queue, priority: jobData.priority ?? DEFAULT_PRIORITY, - acquiredAt: new Date(), + acquiredAt: new Date(now), stalledCount: jobData.stalledCount ?? 0, } const jobInstance = jobFactory ? await jobFactory(JobClass) : new JobClass() - try { - await runtime.execute(jobInstance, jobData.payload, context) - return - } catch (error) { - const outcome = runtime.resolveFailure(error as Error, attempts) + const startTime = performance.now() + const executeMessage: JobExecuteMessage = { job: acquiredJob, queue } + + const run = () => { + return executeChannel.tracePromise(async () => { + try { + await runtime.execute(jobInstance, jobData.payload, context) + executeMessage.status = 'completed' + } catch (error) { + const outcome = runtime.resolveFailure(error as Error, attempts) + executeMessage.error = error as Error + + if (outcome.type === 'failed') { + executeMessage.status = 'failed' + await jobInstance.failed?.(outcome.hookError) + } else if (outcome.type === 'retry') { + executeMessage.status = 'retrying' + executeMessage.nextRetryAt = outcome.retryAt + } + } - if (outcome.type === 'failed') { - await jobInstance.failed?.(outcome.hookError) - return - } + executeMessage.duration = Number((performance.now() - startTime).toFixed(2)) + }, executeMessage) + } - attempts++ + await executionWrapper(run, acquiredJob, queue) - if (outcome.type === 'retry' && outcome.retryAt) { - const delay = outcome.retryAt.getTime() - Date.now() + if (executeMessage.status !== 'retrying') return - if (delay > 0) { - await sleep(delay) - } + attempts++ + + if (executeMessage.nextRetryAt) { + const delay = executeMessage.nextRetryAt.getTime() - Date.now() + if (delay > 0) { + await sleep(delay) } } } diff --git a/src/job_batch_dispatcher.ts b/src/job_batch_dispatcher.ts index 43b9cff..433e5c6 100644 --- a/src/job_batch_dispatcher.ts +++ b/src/job_batch_dispatcher.ts @@ -1,8 +1,10 @@ import debug from './debug.js' import { randomUUID } from 'node:crypto' import { QueueManager } from 
'./queue_manager.js' +import { dispatchChannel } from './tracing_channels.js' import type { Adapter } from './contracts/adapter.js' import type { DispatchManyResult } from './types/main.js' +import type { JobDispatchMessage } from './types/tracing_channels.js' /** * Fluent builder for dispatching multiple jobs to the queue in a single batch. @@ -143,9 +145,12 @@ export class JobBatchDispatcher { * ``` */ async run(): Promise { + if (this.#payloads.length === 0) return { jobIds: [] } + debug('dispatching %d jobs of type %s', this.#payloads.length, this.#name) const adapter = this.#getAdapterInstance() + const wrapInternal = QueueManager.getInternalOperationWrapper() const jobs = this.#payloads.map((payload) => ({ id: randomUUID(), @@ -156,11 +161,14 @@ export class JobBatchDispatcher { groupId: this.#groupId, })) - await adapter.pushManyOn(this.#queue, jobs) + const message: JobDispatchMessage = { jobs, queue: this.#queue } - return { - jobIds: jobs.map((job) => job.id), - } + + await dispatchChannel.tracePromise(async () => { + await wrapInternal(() => adapter.pushManyOn(this.#queue, jobs)) + }, message) + + return { jobIds: jobs.map((job) => job.id) } } /** diff --git a/src/job_dispatcher.ts b/src/job_dispatcher.ts index 07a058e..89ada1c 100644 --- a/src/job_dispatcher.ts +++ b/src/job_dispatcher.ts @@ -1,8 +1,10 @@ import debug from './debug.js' import { randomUUID } from 'node:crypto' import { QueueManager } from './queue_manager.js' +import { dispatchChannel } from './tracing_channels.js' import type { Adapter } from './contracts/adapter.js' import type { DispatchResult, Duration } from './types/main.js' +import type { JobDispatchMessage } from './types/tracing_channels.js' import { parse } from './utils.js' /** @@ -184,8 +186,10 @@ export class JobDispatcher { debug('dispatching job %s with id %s using payload %s', this.#name, id, this.#payload) const adapter = this.#getAdapterInstance() + const wrapInternal = QueueManager.getInternalOperationWrapper() + const 
parsedDelay = this.#delay ? parse(this.#delay) : undefined - const payload = { + const jobData = { id, name: this.#name, payload: this.#payload, @@ -194,17 +198,17 @@ export class JobDispatcher { groupId: this.#groupId, } - if (this.#delay) { - const parsedDelay = parse(this.#delay) + const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay } - await adapter.pushLaterOn(this.#queue, payload, parsedDelay) - } else { - await adapter.pushOn(this.#queue, payload) - } + await dispatchChannel.tracePromise(async () => { + if (parsedDelay !== undefined) { + await wrapInternal(() => adapter.pushLaterOn(this.#queue, jobData, parsedDelay)) + } else { + await wrapInternal(() => adapter.pushOn(this.#queue, jobData)) + } + }, message) - return { - jobId: id, - } + return { jobId: id } } /** diff --git a/src/otel.ts b/src/otel.ts new file mode 100644 index 0000000..033cf59 --- /dev/null +++ b/src/otel.ts @@ -0,0 +1,304 @@ +import { + context, + propagation, + trace, + SpanKind, + SpanStatusCode, + ROOT_CONTEXT, + type Span, + type Link, +} from '@opentelemetry/api' +import type { TracingChannelSubscribers } from 'node:diagnostics_channel' +import { suppressTracing } from '@opentelemetry/core' +import { InstrumentationBase } from '@opentelemetry/instrumentation' +import type { InstrumentationConfig } from '@opentelemetry/instrumentation' +import { dispatchChannel, executeChannel } from './tracing_channels.js' +import type { AcquiredJob } from './contracts/adapter.js' +import type { JobDispatchMessage, JobExecuteMessage } from './types/tracing_channels.js' + +export interface QueueInstrumentationConfig extends InstrumentationConfig { + /** + * How execution spans relate to the dispatch span. + * + * - `'link'` (default): Independent trace, linked to dispatch span + * - `'parent'`: Child of the dispatch span (same trace) + */ + executionSpanLinkMode?: 'link' | 'parent' + + /** + * The messaging system identifier. 
+ * + * @default 'boringqueue' + */ + messagingSystem?: string +} + +/** + * OpenTelemetry instrumentation for @boringnode/queue. + * + * Creates PRODUCER spans for job dispatch and CONSUMER spans for + * job execution, following OTel messaging semantic conventions. + * + * Uses `diagnostics_channel` for span lifecycle management and + * patches `QueueManager.init()` to inject wrappers automatically. + */ +export class QueueInstrumentation extends InstrumentationBase { + protected subscribed = false + protected executeSpans = new Map() + protected dispatchSpans = new WeakMap() + protected executeHandlers?: TracingChannelSubscribers + protected dispatchHandlers?: TracingChannelSubscribers + + #originalInit?: (...args: any[]) => any + #patchedManager?: { init: (...args: any[]) => any } + + constructor(config: QueueInstrumentationConfig = {}) { + super('@boringnode/queue', '0.1.0', config) + } + + get #messagingSystem(): string { + return this.getConfig().messagingSystem ?? 'boringqueue' + } + + get #executionSpanLinkMode(): 'link' | 'parent' { + return this.getConfig().executionSpanLinkMode ?? 'link' + } + + /** + * Required by InstrumentationBase. Returns undefined since we use + * diagnostics_channel instead of module patching. + */ + protected init() { + return undefined + } + + /** + * Subscribes to diagnostics_channels for span lifecycle. + */ + enable() { + super.enable() + if (this.subscribed !== undefined) this.#subscribe() + } + + /** + * Unsubscribes from diagnostics_channels and restores patched methods. + */ + disable() { + if (this.subscribed !== undefined) { + this.#unsubscribe() + this.#unpatchInit() + } + + super.disable() + } + + /** + * Patches `QueueManager.init()` to auto-inject OTel wrappers + * and subscribes to diagnostics_channels. 
+ */ + manuallyRegister(queueModule: { QueueManager: { init: (...args: any[]) => any } }) { + this.#patchInit(queueModule.QueueManager) + this.#subscribe() + } + + #patchInit(manager: { init: (...args: any[]) => any }) { + if (this.#originalInit) return + + this.#patchedManager = manager + this.#originalInit = manager.init.bind(manager) + const instrumentation = this + + manager.init = async (config: any) => { + return this.#originalInit!({ + ...config, + internalOperationWrapper: (fn: () => Promise) => { + return context.with(suppressTracing(context.active()), fn) + }, + executionWrapper: (fn: () => Promise, job: AcquiredJob, queue: string) => { + return instrumentation.#wrapExecution(fn, job, queue) + }, + }) + } + } + + #unpatchInit() { + if (!this.#originalInit || !this.#patchedManager) return + + this.#patchedManager.init = this.#originalInit + this.#originalInit = undefined + this.#patchedManager = undefined + } + + #subscribe() { + if (this.subscribed) return + if (!this.isEnabled()) return + + this.subscribed = true + + this.executeHandlers = { + start: () => {}, + end: () => {}, + asyncStart: () => {}, + asyncEnd: (msg) => this.#handleExecuteAsyncEnd(msg as unknown as JobExecuteMessage), + error: () => {}, + } + + this.dispatchHandlers = { + start: (msg) => this.#handleDispatchStart(msg as unknown as JobDispatchMessage), + end: () => {}, + asyncStart: () => {}, + asyncEnd: (msg) => this.#handleDispatchAsyncEnd(msg as unknown as JobDispatchMessage), + error: () => {}, + } + + executeChannel.subscribe(this.executeHandlers as any) + dispatchChannel.subscribe(this.dispatchHandlers as any) + } + + #unsubscribe() { + if (!this.subscribed) return + + if (this.executeHandlers) executeChannel.unsubscribe(this.executeHandlers as any) + if (this.dispatchHandlers) dispatchChannel.unsubscribe(this.dispatchHandlers as any) + + this.subscribed = false + this.executeHandlers = undefined + this.dispatchHandlers = undefined + this.executeSpans.clear() + this.dispatchSpans = 
new WeakMap() + } + + /** + * Called on dispatchChannel `start` — injects trace context into jobData + * and creates a PRODUCER span. + */ + #handleDispatchStart(message: JobDispatchMessage) { + const attributes = this.#buildDispatchAttributes(message) + const span = this.tracer.startSpan(`publish ${message.queue}`, { + kind: SpanKind.PRODUCER, + attributes, + }) + + const dispatchContext = trace.setSpan(context.active(), span) + for (const job of message.jobs) { + if (!job.traceContext) job.traceContext = {} + propagation.inject(dispatchContext, job.traceContext) + } + + this.dispatchSpans.set(message, span) + } + + #handleDispatchAsyncEnd(message: JobDispatchMessage) { + const span = this.dispatchSpans.get(message) + if (!span) return + + if (message.error) { + span.recordException(message.error) + span.setStatus({ code: SpanStatusCode.ERROR, message: message.error.message }) + } + + span.end() + this.dispatchSpans.delete(message) + } + + /** + * Called by `executionWrapper` config — creates CONSUMER span and wraps + * execution in OTel context for proper child span parenting. + */ + #wrapExecution(fn: () => Promise, job: AcquiredJob, queue: string): Promise { + // Spans are only ended and evicted by the asyncEnd channel subscriber, so run untraced when not subscribed. + if (!this.subscribed) return fn() + + const extractedContext = this.#extractParentContext(job.traceContext) + const parentSpanContext = trace.getSpanContext(extractedContext) + + let baseContext: typeof extractedContext + let links: Link[] + + if (this.#executionSpanLinkMode === 'parent' && parentSpanContext) { + baseContext = extractedContext + links = [] + } else { + links = parentSpanContext ? 
[{ context: parentSpanContext }] : [] + baseContext = ROOT_CONTEXT + } + + const span = this.tracer.startSpan( + `process ${queue}`, + { + kind: SpanKind.CONSUMER, + attributes: this.#buildExecuteAttributes(job, queue), + links, + }, + baseContext + ) + + this.executeSpans.set(job.id, span) + const executionContext = trace.setSpan(baseContext, span) + + return context.with(executionContext, fn) + } + + #handleExecuteAsyncEnd(message: JobExecuteMessage) { + const span = this.executeSpans.get(message.job.id) + if (!span) return + + if (message.status) span.setAttribute('messaging.job.status', message.status) + if (message.error) span.recordException(message.error) + + if (message.status === 'retrying' && message.nextRetryAt) { + span.addEvent('messaging.retry', { + 'messaging.message.retry.count': message.job.attempts + 1, + 'messaging.job.retry_at': message.nextRetryAt.toISOString(), + }) + } + + if (message.status === 'failed') { + span.setStatus({ code: SpanStatusCode.ERROR, message: message.error?.message }) + } else { + span.setStatus({ code: SpanStatusCode.OK }) + } + + span.end() + this.executeSpans.delete(message.job.id) + } + + #extractParentContext(traceContext?: Record) { + if (!traceContext || Object.keys(traceContext).length === 0) return ROOT_CONTEXT + return propagation.extract(ROOT_CONTEXT, traceContext) + } + + #buildDispatchAttributes(message: JobDispatchMessage) { + const firstJob = message.jobs[0] + const attributes: Record = { + 'messaging.system': this.#messagingSystem, + 'messaging.operation.name': 'publish', + 'messaging.operation.type': 'send', + 'messaging.destination.name': message.queue, + 'messaging.job.name': firstJob.name, + } + + if (message.jobs.length === 1) attributes['messaging.message.id'] = firstJob.id + if (message.jobs.length > 1) attributes['messaging.batch.message_count'] = message.jobs.length + if (firstJob.groupId) attributes['messaging.job.group_id'] = firstJob.groupId + if (firstJob.priority !== undefined) 
attributes['messaging.job.priority'] = firstJob.priority + if (message.delay !== undefined) attributes['messaging.job.delay_ms'] = message.delay + + return attributes + } + + #buildExecuteAttributes(job: AcquiredJob, queue: string) { + const attributes: Record = { + 'entry_point.type': 'job', + 'messaging.system': this.#messagingSystem, + 'messaging.operation.name': 'process', + 'messaging.operation.type': 'process', + 'messaging.destination.name': queue, + 'messaging.message.id': job.id, + 'messaging.message.retry.count': job.attempts, + 'messaging.job.name': job.name, + } + + if (job.groupId) attributes['messaging.job.group_id'] = job.groupId + if (job.priority !== undefined) attributes['messaging.job.priority'] = job.priority + + return attributes + } +} diff --git a/src/queue_manager.ts b/src/queue_manager.ts index 7bdf6b3..30e493e 100644 --- a/src/queue_manager.ts +++ b/src/queue_manager.ts @@ -7,12 +7,17 @@ import { QueueConfigResolver } from './queue_config_resolver.js' import type { Adapter } from './contracts/adapter.js' import type { AdapterFactory, JobFactory, QueueManagerConfig } from './types/main.js' +const noopInternalOperationWrapper: NonNullable = async (fn) => fn() +const noopExecutionWrapper: NonNullable = async (fn) => fn() + type QueueManagerFakeState = { defaultAdapter: string adapters: Record adapterInstances: Map logger: Logger jobFactory?: JobFactory + internalOperationWrapper?: QueueManagerConfig['internalOperationWrapper'] + executionWrapper?: QueueManagerConfig['executionWrapper'] configResolver: QueueConfigResolver fakeAdapter: FakeAdapter } @@ -55,6 +60,8 @@ class QueueManagerSingleton { #adapterInstances: Map = new Map() #logger: Logger = consoleLogger #jobFactory?: JobFactory + #internalOperationWrapper?: QueueManagerConfig['internalOperationWrapper'] + #executionWrapper?: QueueManagerConfig['executionWrapper'] #configResolver: QueueConfigResolver = new QueueConfigResolver({}) #fakeState?: QueueManagerFakeState @@ -93,6 +100,8 @@ 
class QueueManagerSingleton { this.#adapters = config.adapters this.#logger = config.logger ?? consoleLogger this.#jobFactory = config.jobFactory + this.#internalOperationWrapper = config.internalOperationWrapper + this.#executionWrapper = config.executionWrapper this.#configResolver = QueueConfigResolver.from(config) if (config.locations && config.locations.length > 0) { @@ -242,6 +251,8 @@ class QueueManagerSingleton { adapterInstances: this.#adapterInstances, logger: this.#logger, jobFactory: this.#jobFactory, + internalOperationWrapper: this.#internalOperationWrapper, + executionWrapper: this.#executionWrapper, configResolver: this.#configResolver, fakeAdapter, } @@ -281,6 +292,8 @@ class QueueManagerSingleton { this.#adapterInstances = state.adapterInstances this.#logger = state.logger this.#jobFactory = state.jobFactory + this.#internalOperationWrapper = state.internalOperationWrapper + this.#executionWrapper = state.executionWrapper this.#configResolver = state.configResolver } @@ -293,6 +306,13 @@ class QueueManagerSingleton { return this.#jobFactory } + /** + * Whether the queue manager has been initialized. + */ + isInitialized(): boolean { + return this.#initialized + } + /** * Get the configured logger used by the queue runtime. */ @@ -300,6 +320,20 @@ class QueueManagerSingleton { return this.#logger } + /** + * Get the configured internal operation wrapper. + */ + getInternalOperationWrapper() { + return this.#internalOperationWrapper ?? noopInternalOperationWrapper + } + + /** + * Get the configured execution wrapper. + */ + getExecutionWrapper() { + return this.#executionWrapper ?? noopExecutionWrapper + } + /** * Get the resolver responsible for effective queue/job runtime config. 
*/ @@ -362,6 +396,8 @@ class QueueManagerSingleton { this.#adapterInstances.clear() this.#initialized = false + this.#internalOperationWrapper = undefined + this.#executionWrapper = undefined this.#configResolver = new QueueConfigResolver({}) this.#fakeState = undefined } diff --git a/src/tracing_channels.ts b/src/tracing_channels.ts new file mode 100644 index 0000000..f7095bc --- /dev/null +++ b/src/tracing_channels.ts @@ -0,0 +1,27 @@ +/* + * @boringnode/queue + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +import diagnostics_channel from 'node:diagnostics_channel' +import type { JobDispatchMessage, JobExecuteMessage } from './types/tracing_channels.js' + +/** + * Traces job dispatch operations (push to queue). + * Fires for single dispatch, batch dispatch, and scheduled job dispatch. + */ +export const dispatchChannel = diagnostics_channel.tracingChannel< + 'boringqueue.job.dispatch', + JobDispatchMessage +>('boringqueue.job.dispatch') + +/** + * Traces job execution by the worker or sync adapter. + * Each retry attempt fires a separate trace. 
+ */ +export const executeChannel = diagnostics_channel.tracingChannel< + 'boringqueue.job.execute', + JobExecuteMessage +>('boringqueue.job.execute') diff --git a/src/types/index.ts b/src/types/index.ts index c908aee..52b8f1d 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -27,3 +27,7 @@ export type { } from './main.js' export type { Adapter, AcquiredJob } from '../contracts/adapter.js' + +export type { JobDispatchMessage, JobExecuteMessage } from './tracing_channels.js' + +export type { QueueInstrumentationConfig } from '../otel.js' diff --git a/src/types/main.ts b/src/types/main.ts index e4991ac..4494669 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -1,5 +1,6 @@ import type { BackoffStrategy as BackoffStrategyClass } from '../strategies/backoff_strategy.js' import type { Adapter } from '../contracts/adapter.js' +import type { AcquiredJob } from '../contracts/adapter.js' import type { Logger } from '../logger.js' import { Job } from '../job.js' @@ -120,6 +121,12 @@ export interface JobData { * ``` */ groupId?: string + + /** + * Serialized trace context for distributed tracing. + * Injected by OTel plugin at dispatch time. + */ + traceContext?: Record } /** @@ -503,4 +510,16 @@ export interface QueueManagerConfig { * ``` */ jobFactory?: JobFactory + + /** + * Wraps internal adapter operations (Redis, Knex calls) to suppress + * or customize instrumentation. Used by OTel to suppress child spans. + */ + internalOperationWrapper?: (fn: () => Promise) => Promise + + /** + * Wraps job execution to inject tracing context or custom behavior. + * Called around `runtime.execute()` for each job attempt. 
+ */ + executionWrapper?: (fn: () => Promise, job: AcquiredJob, queue: string) => Promise } diff --git a/src/types/tracing_channels.ts b/src/types/tracing_channels.ts new file mode 100644 index 0000000..e2507d8 --- /dev/null +++ b/src/types/tracing_channels.ts @@ -0,0 +1,49 @@ +/* + * @boringnode/queue + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +import type { AcquiredJob } from '../contracts/adapter.js' +import type { JobData } from './main.js' + +/** + * Tracing data structure for job dispatch events. + */ +export type JobDispatchMessage = { + /** The jobs being dispatched (single dispatch = array of one) */ + jobs: JobData[] + + /** Target queue name */ + queue: string + + /** Delay in milliseconds before the job becomes available */ + delay?: number + + /** Error that caused the dispatch to fail */ + error?: Error +} + +/** + * Tracing data structure for job execution events. + */ +export type JobExecuteMessage = { + /** The acquired job being executed */ + job: AcquiredJob + + /** Queue the job was acquired from */ + queue: string + + /** Execution outcome (set in asyncEnd) */ + status?: 'completed' | 'failed' | 'retrying' + + /** Execution duration in milliseconds (set in asyncEnd) */ + duration?: number + + /** Error that caused the failure (set in asyncEnd) */ + error?: Error + + /** When the next retry is scheduled (set in asyncEnd for retrying jobs) */ + nextRetryAt?: Date +} diff --git a/src/worker.ts b/src/worker.ts index a40738a..5316d00 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -5,8 +5,10 @@ import { parse } from './utils.js' import { QueueManager } from './queue_manager.js' import { JobPool } from './job_pool.js' import { JobExecutionRuntime } from './job_runtime.js' +import { dispatchChannel, executeChannel } from './tracing_channels.js' import type { Adapter, AcquiredJob } from './contracts/adapter.js' -import type { JobContext, JobOptions, 
QueueManagerConfig, WorkerCycle } from './types/main.js' +import type { JobContext, JobOptions, JobRetention, QueueManagerConfig, WorkerCycle } from './types/main.js' +import type { JobDispatchMessage, JobExecuteMessage } from './types/tracing_channels.js' import { Locator } from './locator.js' import { DEFAULT_PRIORITY } from './constants.js' import type { Job } from './job.js' @@ -60,6 +62,7 @@ export class Worker { readonly #onShutdownSignal?: () => void | Promise #adapter!: Adapter + #wrapInternal: (fn: () => Promise) => Promise = (fn) => fn() #running = false #initialized = false #generator?: AsyncGenerator @@ -109,6 +112,7 @@ export class Worker { this.#adapter = QueueManager.use() this.#adapter.setWorkerId(this.#id) + this.#wrapInternal = QueueManager.getInternalOperationWrapper() this.#initialized = true @@ -337,49 +341,58 @@ export class Worker { defaultTimeout: configResolver.getWorkerTimeout(), }) - try { - await runtime.execute(instance, payload, context) - await this.#adapter.completeJob(job.id, queue, retention.removeOnComplete) - - const duration = (performance.now() - startTime).toFixed(2) - debug('worker %s: successfully executed job %s in %dms', this.#id, job.id, duration) - } catch (e) { - const outcome = runtime.resolveFailure(e as Error, job.attempts) - - if (outcome.type === 'failed' && outcome.reason === 'timeout') { - debug('worker %s: job %s timed out and failOnTimeout is set', this.#id, job.id) - await this.#adapter.failJob(job.id, queue, outcome.storageError, retention.removeOnFail) - await instance.failed?.(outcome.hookError) - return - } + const executeMessage: JobExecuteMessage = { job, queue } + + const run = () => { + return executeChannel.tracePromise(async () => { + try { + await runtime.execute(instance, payload, context) + await this.#wrapInternal(() => this.#adapter.completeJob(job.id, queue, retention.removeOnComplete)) + executeMessage.status = 'completed' + debug('worker %s: successfully executed job %s in %dms', this.#id, 
job.id, (performance.now() - startTime).toFixed(2)) + } catch (e) { + await this.#handleExecutionFailure({ error: e as Error, job, queue, instance, runtime, retention, executeMessage }) + } - if (outcome.type === 'failed' && outcome.reason === 'no-retries') { - debug('worker %s: job %s has no retries configured, marking as failed', this.#id, job.id) - await this.#adapter.failJob(job.id, queue, outcome.storageError, retention.removeOnFail) - await instance.failed?.(outcome.hookError) - return - } + executeMessage.duration = Number((performance.now() - startTime).toFixed(2)) + }, executeMessage) + } - if (outcome.type === 'failed' && outcome.reason === 'max-attempts') { - debug( - 'worker %s: job %s has exceeded max retries (%d), marking as failed', - this.#id, - job.id, - runtime.maxRetries - ) - await this.#adapter.failJob(job.id, queue, outcome.storageError, retention.removeOnFail) - await instance.failed?.(outcome.hookError) - - return - } + const executionWrapper = QueueManager.getExecutionWrapper() + await executionWrapper(run, job, queue) + } - if (outcome.type === 'retry' && outcome.retryAt) { - debug('worker %s: job %s will retry at %s', this.#id, job.id, outcome.retryAt.toISOString()) - await this.#adapter.retryJob(job.id, queue, outcome.retryAt) - return - } + async #handleExecutionFailure(options: { + error: Error + job: AcquiredJob + queue: string + instance: Job + runtime: JobExecutionRuntime + retention: { removeOnComplete?: JobRetention; removeOnFail?: JobRetention } + executeMessage: JobExecuteMessage + }) { + const outcome = options.runtime.resolveFailure(options.error, options.job.attempts) + options.executeMessage.error = options.error + + if (outcome.type === 'failed') { + options.executeMessage.status = 'failed' + await this.#wrapInternal(() => + this.#adapter.failJob(options.job.id, options.queue, outcome.storageError, options.retention.removeOnFail) + ) + await options.instance.failed?.(outcome.hookError) + return + } + + if (outcome.type !== 
'retry') return - await this.#adapter.retryJob(job.id, queue) + options.executeMessage.status = 'retrying' + options.executeMessage.nextRetryAt = outcome.retryAt + + if (outcome.retryAt) { + debug('worker %s: job %s will retry at %s', this.#id, options.job.id, outcome.retryAt.toISOString()) + await this.#wrapInternal(() => this.#adapter.retryJob(options.job.id, options.queue, outcome.retryAt)) + } else { + await this.#wrapInternal(() => this.#adapter.retryJob(options.job.id, options.queue)) } } @@ -413,14 +426,14 @@ export class Worker { } catch (error) { debug('worker %s: failed to initialize job %s (%s)', this.#id, job.id, job.name) const retention = QueueManager.getConfigResolver().resolveJobOptions(queue) - await this.#adapter.failJob(job.id, queue, error as Error, retention.removeOnFail) + await this.#wrapInternal(() => this.#adapter.failJob(job.id, queue, error as Error, retention.removeOnFail)) throw error } } async #acquireNextJob(queues: string[]): Promise<{ job: AcquiredJob; queue: string } | null> { for (const queue of queues) { - const job = await this.#adapter.popFrom(queue) + const job = await this.#wrapInternal(() => this.#adapter.popFrom(queue)) if (!job) { continue @@ -444,10 +457,8 @@ export class Worker { this.#lastStalledCheck = now for (const queue of queues) { - const recovered = await this.#adapter.recoverStalledJobs( - queue, - this.#stalledThreshold, - this.#maxStalledCount + const recovered = await this.#wrapInternal(() => + this.#adapter.recoverStalledJobs(queue, this.#stalledThreshold, this.#maxStalledCount) ) if (recovered > 0) { @@ -492,7 +503,7 @@ export class Worker { async #dispatchDueSchedules(): Promise { // Keep claiming due schedules until there are none left while (true) { - const schedule = await this.#adapter.claimDueSchedule() + const schedule = await this.#wrapInternal(() => this.#adapter.claimDueSchedule()) if (!schedule) { break @@ -510,14 +521,18 @@ export class Worker { const JobClass = Locator.get(schedule.name) const 
queue = JobClass?.options?.queue ?? 'default' - // Dispatch the job to the queue - await this.#adapter.pushOn(queue, { + const jobData = { id: randomUUID(), name: schedule.name, payload: schedule.payload, attempts: 0, priority: JobClass?.options?.priority, - }) + } + + const message: JobDispatchMessage = { jobs: [jobData], queue } + await dispatchChannel.tracePromise(async () => { + await this.#wrapInternal(() => this.#adapter.pushOn(queue, jobData)) + }, message) } } } diff --git a/tests/helpers/setup_tracing.ts b/tests/helpers/setup_tracing.ts new file mode 100644 index 0000000..6677e52 --- /dev/null +++ b/tests/helpers/setup_tracing.ts @@ -0,0 +1,36 @@ +import { trace, context, propagation } from '@opentelemetry/api' +import { + BasicTracerProvider, + InMemorySpanExporter, + SimpleSpanProcessor, +} from '@opentelemetry/sdk-trace-base' +import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks' +import { W3CTraceContextPropagator } from '@opentelemetry/core' + +let initialized = false +let exporter: InMemorySpanExporter +let provider: BasicTracerProvider + +export function setupTracing() { + if (!initialized) { + exporter = new InMemorySpanExporter() + const contextManager = new AsyncLocalStorageContextManager() + + context.setGlobalContextManager(contextManager.enable()) + propagation.setGlobalPropagator(new W3CTraceContextPropagator()) + provider = new BasicTracerProvider({ spanProcessors: [new SimpleSpanProcessor(exporter)] }) + + trace.setGlobalTracerProvider(provider) + initialized = true + } + + return { exporter, provider } +} + +export function resetSpans() { + exporter?.reset() +} + +export function getFinishedSpans() { + return exporter?.getFinishedSpans() ?? 
[] +} diff --git a/tests/job_dispatcher.spec.ts b/tests/job_dispatcher.spec.ts index 552d229..7c71311 100644 --- a/tests/job_dispatcher.spec.ts +++ b/tests/job_dispatcher.spec.ts @@ -6,6 +6,25 @@ import { JobDispatcher } from '../src/job_dispatcher.js' import { JobBatchDispatcher } from '../src/job_batch_dispatcher.js' test.group('JobDispatcher', () => { + test('should wrap adapter calls with internalOperationWrapper', async ({ assert }) => { + const sharedAdapter = memory()() + let internalCalls = 0 + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + internalOperationWrapper: async (fn) => { + internalCalls++ + return fn() + }, + }) + + await new JobDispatcher('WrappedJob', { foo: 'bar' }).run() + await new JobBatchDispatcher('WrappedBatchJob', [{ foo: 1 }, { foo: 2 }]).run() + + assert.equal(internalCalls, 2) + }) + test('should dispatch job correctly', async ({ assert }) => { const sharedAdapter = memory()() diff --git a/tests/otel.spec.ts b/tests/otel.spec.ts new file mode 100644 index 0000000..4ac7b86 --- /dev/null +++ b/tests/otel.spec.ts @@ -0,0 +1,389 @@ +import { test } from '@japa/runner' +import { trace, SpanStatusCode, SpanKind } from '@opentelemetry/api' +import { setupTracing, resetSpans, getFinishedSpans } from './helpers/setup_tracing.js' +import { QueueInstrumentation } from '../src/otel.js' +import { dispatchChannel, executeChannel } from '../src/tracing_channels.js' +import type { JobExecuteMessage, JobDispatchMessage } from '../src/types/tracing_channels.js' +import type { AcquiredJob } from '../src/contracts/adapter.js' +import type { JobData } from '../src/types/main.js' + +function makeJob(overrides: Partial = {}): AcquiredJob { + return { + id: 'job-1', + name: 'TestJob', + payload: {}, + attempts: 0, + acquiredAt: Date.now(), + ...overrides, + } +} + +/** + * Creates an instrumentation with a fake QueueManager, + * captures the injected wrappers from the patched init. 
+ */ +async function setupWithWrappers(config: ConstructorParameters[0] = {}) { + const instrumentation = new QueueInstrumentation(config) + instrumentation.enable() + + let capturedConfig: any + const fakeManager = { + init: async (cfg: any) => { capturedConfig = cfg }, + } + + instrumentation.manuallyRegister({ QueueManager: fakeManager }) + await fakeManager.init({ default: 'memory', adapters: {} }) + + return { + instrumentation, + fakeManager, + executionWrapper: capturedConfig.executionWrapper as (fn: () => Promise, job: AcquiredJob, queue: string) => Promise, + internalOperationWrapper: capturedConfig.internalOperationWrapper as (fn: () => Promise) => Promise, + } +} + +test.group('QueueInstrumentation | lifecycle', (group) => { + group.setup(() => { setupTracing() }) + group.each.setup(() => resetSpans()) + + test('enable() is idempotent', ({ assert }) => { + const instrumentation = new QueueInstrumentation() + instrumentation.enable() + instrumentation.enable() + assert.isTrue(instrumentation.isEnabled()) + instrumentation.disable() + }) + + test('disable() cleans up', ({ assert }) => { + const instrumentation = new QueueInstrumentation() + instrumentation.enable() + instrumentation.disable() + assert.isFalse(instrumentation.isEnabled()) + }) + + test('disable() is idempotent', ({ assert }) => { + const instrumentation = new QueueInstrumentation() + instrumentation.enable() + instrumentation.disable() + instrumentation.disable() + assert.isFalse(instrumentation.isEnabled()) + }) + + test('can re-enable after disable', ({ assert }) => { + const instrumentation = new QueueInstrumentation() + instrumentation.enable() + instrumentation.disable() + instrumentation.enable() + assert.isTrue(instrumentation.isEnabled()) + instrumentation.disable() + }) +}) + +test.group('QueueInstrumentation | dispatch via DC', (group) => { + group.setup(() => { setupTracing() }) + group.each.setup(() => resetSpans()) + + test('creates PRODUCER span when no active span', async ({ 
assert }) => { + const { instrumentation } = await setupWithWrappers() + + const jobData: JobData = { id: 'job-1', name: 'SendEmailJob', payload: {}, attempts: 0 } + const message: JobDispatchMessage = { jobs: [jobData], queue: 'emails' } + await dispatchChannel.tracePromise(async () => {}, message) + + const spans = getFinishedSpans() + assert.lengthOf(spans, 1) + assert.equal(spans[0].name, 'publish emails') + assert.equal(spans[0].kind, SpanKind.PRODUCER) + assert.equal(spans[0].attributes['messaging.system'], 'boringqueue') + assert.equal(spans[0].attributes['messaging.destination.name'], 'emails') + assert.equal(spans[0].attributes['messaging.message.id'], 'job-1') + assert.equal(spans[0].attributes['messaging.job.name'], 'SendEmailJob') + + instrumentation.disable() + }) + + test('creates PRODUCER span as child of active span', async ({ assert }) => { + const { instrumentation } = await setupWithWrappers() + const jobData: JobData = { id: 'job-2', name: 'ProcessJob', payload: {}, attempts: 0 } + + const tracer = trace.getTracer('test') + await tracer.startActiveSpan('http-request', async (parentSpan) => { + const message: JobDispatchMessage = { jobs: [jobData], queue: 'default' } + await dispatchChannel.tracePromise(async () => {}, message) + parentSpan.end() + }) + + const spans = getFinishedSpans() + const parentSpan = spans.find((span) => span.name === 'http-request') + const producerSpan = spans.find((span) => span.name === 'publish default') + assert.lengthOf(spans, 2) + assert.isDefined(parentSpan) + assert.isDefined(producerSpan) + assert.equal(producerSpan!.parentSpanContext?.spanId, parentSpan!.spanContext().spanId) + assert.equal(jobData.traceContext?.traceparent?.split('-')[2], producerSpan!.spanContext().spanId) + + instrumentation.disable() + }) + + test('injects trace context into jobData', async ({ assert }) => { + const { instrumentation } = await setupWithWrappers() + + const jobData: JobData = { id: 'job-3', name: 'TestJob', payload: {}, 
attempts: 0 } + const message: JobDispatchMessage = { jobs: [jobData], queue: 'default' } + await dispatchChannel.tracePromise(async () => {}, message) + + assert.isDefined(jobData.traceContext) + assert.property(jobData.traceContext!, 'traceparent') + + instrumentation.disable() + }) + + test('includes delay_ms and batch_count attributes', async ({ assert }) => { + const { instrumentation } = await setupWithWrappers() + + const jobData: JobData = { id: 'job-4', name: 'DelayedJob', payload: {}, attempts: 0 } + const message: JobDispatchMessage = { jobs: [jobData], queue: 'default', delay: 5000 } + await dispatchChannel.tracePromise(async () => {}, message) + + const spans = getFinishedSpans() + assert.equal(spans[0].attributes['messaging.job.delay_ms'], 5000) + + instrumentation.disable() + }) + + test('batch dispatch injects trace context into every job', async ({ assert }) => { + const { instrumentation } = await setupWithWrappers() + + const jobs: JobData[] = [ + { id: 'batch-1', name: 'BatchJob', payload: {}, attempts: 0 }, + { id: 'batch-2', name: 'BatchJob', payload: {}, attempts: 0 }, + ] + + await dispatchChannel.tracePromise(async () => {}, { jobs, queue: 'default' }) + + const [span] = getFinishedSpans() + assert.property(jobs[0].traceContext!, 'traceparent') + assert.property(jobs[1].traceContext!, 'traceparent') + assert.equal(jobs[0].traceContext!.traceparent, jobs[1].traceContext!.traceparent) + assert.notProperty(span.attributes, 'messaging.message.id') + assert.equal(span.attributes['messaging.batch.message_count'], 2) + + instrumentation.disable() + }) +}) + +test.group('QueueInstrumentation | execute via executionWrapper', (group) => { + group.setup(() => { setupTracing() }) + group.each.setup(() => resetSpans()) + + test('creates CONSUMER span with semconv attributes', async ({ assert }) => { + const { instrumentation, executionWrapper } = await setupWithWrappers() + + const job = makeJob({ id: 'attr-1', name: 'WorkerJob' }) + const message: 
JobExecuteMessage = { job, queue: 'default', status: 'completed' } + + await executeChannel.tracePromise(async () => { + await executionWrapper(async () => {}, job, 'default') + }, message) + + const spans = getFinishedSpans() + const span = spans.find((s) => s.kind === SpanKind.CONSUMER) + assert.isDefined(span) + assert.equal(span!.name, 'process default') + assert.equal(span!.attributes['messaging.system'], 'boringqueue') + assert.equal(span!.attributes['messaging.operation.name'], 'process') + assert.equal(span!.attributes['messaging.destination.name'], 'default') + assert.equal(span!.attributes['messaging.message.id'], 'attr-1') + assert.equal(span!.attributes['messaging.job.name'], 'WorkerJob') + assert.equal(span!.attributes['entry_point.type'], 'job') + + instrumentation.disable() + }) + + test('completed job sets OK status', async ({ assert }) => { + const { instrumentation, executionWrapper } = await setupWithWrappers() + + const job = makeJob({ id: 'ok-1' }) + const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' } + + await executeChannel.tracePromise(async () => { + await executionWrapper(async () => {}, job, 'default') + }, message) + + const spans = getFinishedSpans() + const span = spans.find((s) => s.kind === SpanKind.CONSUMER) + assert.equal(span!.status.code, SpanStatusCode.OK) + assert.equal(span!.attributes['messaging.job.status'], 'completed') + + instrumentation.disable() + }) + + test('failed job sets ERROR status', async ({ assert }) => { + const { instrumentation, executionWrapper } = await setupWithWrappers() + + const job = makeJob({ id: 'fail-1' }) + const error = new Error('Job crashed') + const message: JobExecuteMessage = { job, queue: 'default', status: 'failed', error } + + await executeChannel.tracePromise(async () => { + await executionWrapper(async () => {}, job, 'default') + }, message) + + const spans = getFinishedSpans() + const span = spans.find((s) => s.kind === SpanKind.CONSUMER) + 
assert.equal(span!.status.code, SpanStatusCode.ERROR) + + instrumentation.disable() + }) + + test('retrying job records exception and retry event', async ({ assert }) => { + const { instrumentation, executionWrapper } = await setupWithWrappers() + + const job = makeJob({ id: 'retry-1' }) + const message: JobExecuteMessage = { + job, + queue: 'default', + status: 'retrying', + error: new Error('Transient'), + nextRetryAt: new Date('2025-01-01T00:00:00Z'), + } + + await executeChannel.tracePromise(async () => { + await executionWrapper(async () => {}, job, 'default') + }, message) + + const spans = getFinishedSpans() + const span = spans.find((s) => s.kind === SpanKind.CONSUMER) + assert.equal(span!.status.code, SpanStatusCode.OK) + assert.equal(span!.attributes['messaging.job.status'], 'retrying') + + const retryEvent = span!.events.find((e) => e.name === 'messaging.retry') + assert.isDefined(retryEvent) + + instrumentation.disable() + }) + + test('child spans are parented to consumer span', async ({ assert }) => { + const { instrumentation, executionWrapper } = await setupWithWrappers() + + const job = makeJob({ id: 'ctx-1' }) + const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' } + + await executeChannel.tracePromise(async () => { + await executionWrapper(async () => { + const tracer = trace.getTracer('test-child') + const childSpan = tracer.startSpan('child-operation') + childSpan.end() + }, job, 'default') + }, message) + + const spans = getFinishedSpans() + const consumerSpan = spans.find((s) => s.name === 'process default') + const childSpan = spans.find((s) => s.name === 'child-operation') + + assert.isDefined(consumerSpan) + assert.isDefined(childSpan) + assert.equal(childSpan!.parentSpanContext?.spanId, consumerSpan!.spanContext().spanId) + + instrumentation.disable() + }) +}) + +test.group('QueueInstrumentation | trace linking', (group) => { + group.setup(() => { setupTracing() }) + group.each.setup(() => resetSpans()) + + 
test('link mode links consumer to dispatch trace', async ({ assert }) => { + const { instrumentation, executionWrapper } = await setupWithWrappers() + + const jobData: JobData = { id: 'link-1', name: 'LinkedJob', payload: {}, attempts: 0 } + await dispatchChannel.tracePromise(async () => {}, { jobs: [jobData], queue: 'default' }) + + const dispatchTraceId = getFinishedSpans()[0].spanContext().traceId + + const job = makeJob({ id: 'link-1', name: 'LinkedJob', traceContext: jobData.traceContext }) + const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' } + + await executeChannel.tracePromise(async () => { + await executionWrapper(async () => {}, job, 'default') + }, message) + + const consumerSpan = getFinishedSpans().find((s) => s.kind === SpanKind.CONSUMER) + assert.isNotEmpty(consumerSpan!.links) + assert.equal(consumerSpan!.links[0].context.traceId, dispatchTraceId) + assert.notEqual(consumerSpan!.spanContext().traceId, dispatchTraceId) + + instrumentation.disable() + }) + + test('parent mode makes consumer child of dispatch', async ({ assert }) => { + const { instrumentation, executionWrapper } = await setupWithWrappers({ executionSpanLinkMode: 'parent' }) + + const jobData: JobData = { id: 'parent-1', name: 'ParentedJob', payload: {}, attempts: 0 } + await dispatchChannel.tracePromise(async () => {}, { jobs: [jobData], queue: 'default' }) + + const dispatchTraceId = getFinishedSpans()[0].spanContext().traceId + + const job = makeJob({ id: 'parent-1', name: 'ParentedJob', traceContext: jobData.traceContext }) + const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' } + + await executeChannel.tracePromise(async () => { + await executionWrapper(async () => {}, job, 'default') + }, message) + + const consumerSpan = getFinishedSpans().find((s) => s.kind === SpanKind.CONSUMER) + assert.equal(consumerSpan!.spanContext().traceId, dispatchTraceId) + assert.isEmpty(consumerSpan!.links) + + instrumentation.disable() + }) 
+}) + +test.group('QueueInstrumentation | manuallyRegister', (group) => { + group.setup(() => { setupTracing() }) + group.each.setup(() => resetSpans()) + + test('patches init to inject wrappers', async ({ assert }) => { + const { executionWrapper, internalOperationWrapper } = await setupWithWrappers() + + assert.isFunction(executionWrapper) + assert.isFunction(internalOperationWrapper) + }) + + test('internalOperationWrapper suppresses tracing', async ({ assert }) => { + const { instrumentation, internalOperationWrapper } = await setupWithWrappers() + + const tracer = trace.getTracer('test') + await tracer.startActiveSpan('parent', async (parentSpan) => { + await internalOperationWrapper(async () => { + const suppressed = tracer.startSpan('should-be-suppressed') + suppressed.end() + }) + parentSpan.end() + }) + + const spans = getFinishedSpans() + const suppressedSpan = spans.find((s) => s.name === 'should-be-suppressed') + assert.isUndefined(suppressedSpan) + + instrumentation.disable() + }) +}) + +test.group('QueueInstrumentation | custom config', (group) => { + group.setup(() => { setupTracing() }) + group.each.setup(() => resetSpans()) + + test('custom messagingSystem attribute', async ({ assert }) => { + const { instrumentation } = await setupWithWrappers({ messagingSystem: 'my-queue' }) + + const jobData: JobData = { id: 'custom-1', name: 'Job', payload: {}, attempts: 0 } + await dispatchChannel.tracePromise(async () => {}, { jobs: [jobData], queue: 'default' }) + + const spans = getFinishedSpans() + const span = spans.find((s) => s.attributes['messaging.system'] === 'my-queue') + assert.isDefined(span) + + instrumentation.disable() + }) +}) diff --git a/tsup.config.ts b/tsup.config.ts index 697f5a8..8325a6e 100644 --- a/tsup.config.ts +++ b/tsup.config.ts @@ -1,7 +1,13 @@ import { defineConfig } from 'tsup' export default defineConfig({ - entry: ['./index.ts', './src/types/*.ts', './src/drivers/*.ts', './src/contracts/*.ts'], + entry: [ + './index.ts', + 
'./src/otel.ts', + './src/types/*.ts', + './src/drivers/*.ts', + './src/contracts/*.ts', + ], outDir: './build', clean: true, format: 'esm', diff --git a/yarn.lock b/yarn.lock index d9247fe..c0f8484 100644 --- a/yarn.lock +++ b/yarn.lock @@ -29,6 +29,11 @@ __metadata: "@japa/file-system": "npm:^3.0.0" "@japa/runner": "npm:^5.3.0" "@lukeed/ms": "npm:^2.0.2" + "@opentelemetry/api": "npm:^1.9.0" + "@opentelemetry/context-async-hooks": "npm:^2.6.0" + "@opentelemetry/core": "npm:^2.6.0" + "@opentelemetry/instrumentation": "npm:^0.213.0" + "@opentelemetry/sdk-trace-base": "npm:^2.6.0" "@poppinss/ts-exec": "npm:^1.4.4" "@poppinss/utils": "npm:^7.0.1" "@types/better-sqlite3": "npm:^7.6.13" @@ -49,9 +54,18 @@ __metadata: tsup: "npm:^8.5.1" typescript: "npm:^5.9.3" peerDependencies: + "@opentelemetry/api": ^1.9.0 + "@opentelemetry/core": ^1.30.0 || ^2.0.0 + "@opentelemetry/instrumentation": ^0.200.0 ioredis: ^5.0.0 knex: ^3.0.0 peerDependenciesMeta: + "@opentelemetry/api": + optional: true + "@opentelemetry/core": + optional: true + "@opentelemetry/instrumentation": + optional: true ioredis: optional: true knex: @@ -915,6 +929,87 @@ __metadata: languageName: node linkType: hard +"@opentelemetry/api-logs@npm:0.213.0": + version: 0.213.0 + resolution: "@opentelemetry/api-logs@npm:0.213.0" + dependencies: + "@opentelemetry/api": "npm:^1.3.0" + checksum: 10c0/74421639d518b6d060aae8b0693feb6e7bcf2674951dfadc5ad111fc7285b1ba403010fbf2c5cd82b84267c31faef0bb72e4d61c008c5ec572471a0100e5c493 + languageName: node + linkType: hard + +"@opentelemetry/api@npm:^1.3.0, @opentelemetry/api@npm:^1.9.0": + version: 1.9.0 + resolution: "@opentelemetry/api@npm:1.9.0" + checksum: 10c0/9aae2fe6e8a3a3eeb6c1fdef78e1939cf05a0f37f8a4fae4d6bf2e09eb1e06f966ece85805626e01ba5fab48072b94f19b835449e58b6d26720ee19a58298add + languageName: node + linkType: hard + +"@opentelemetry/context-async-hooks@npm:^2.6.0": + version: 2.6.0 + resolution: "@opentelemetry/context-async-hooks@npm:2.6.0" + peerDependencies: + 
"@opentelemetry/api": ">=1.0.0 <1.10.0" + checksum: 10c0/c36e087d2a097cb8a5e397d15bee07e941ee53ff9f1f9b438028d017536f635021f9b36c7dc5c96a65189e81a17eccc718d3588d26aa29bbacf24a1c22cbc767 + languageName: node + linkType: hard + +"@opentelemetry/core@npm:2.6.0, @opentelemetry/core@npm:^2.6.0": + version: 2.6.0 + resolution: "@opentelemetry/core@npm:2.6.0" + dependencies: + "@opentelemetry/semantic-conventions": "npm:^1.29.0" + peerDependencies: + "@opentelemetry/api": ">=1.0.0 <1.10.0" + checksum: 10c0/526854a3d8917c82b41bfea6ed48f2e9ae38705d1758710b86f879e5c4910b9dbe7fa03d36205e98ebebe4854ae781116d8f298a10cd0fe2e51138e75926ec3a + languageName: node + linkType: hard + +"@opentelemetry/instrumentation@npm:^0.213.0": + version: 0.213.0 + resolution: "@opentelemetry/instrumentation@npm:0.213.0" + dependencies: + "@opentelemetry/api-logs": "npm:0.213.0" + import-in-the-middle: "npm:^3.0.0" + require-in-the-middle: "npm:^8.0.0" + peerDependencies: + "@opentelemetry/api": ^1.3.0 + checksum: 10c0/fd62fefb278b2609a8cb67f39bc0967c6e70a2501e0bd21647071a935654df6ac320f784a8babb69ff0502edc15d5980009836ec55a89280c6b3d5c5baf4b455 + languageName: node + linkType: hard + +"@opentelemetry/resources@npm:2.6.0": + version: 2.6.0 + resolution: "@opentelemetry/resources@npm:2.6.0" + dependencies: + "@opentelemetry/core": "npm:2.6.0" + "@opentelemetry/semantic-conventions": "npm:^1.29.0" + peerDependencies: + "@opentelemetry/api": ">=1.3.0 <1.10.0" + checksum: 10c0/9c75654690c0917be948ed18453f3085a54541f0db8c8b728515f5a26b67c5fc1f6acd2644462e17368045e3c3fd65f728e8bd0a19c31fe12cc443fa0f0f058c + languageName: node + linkType: hard + +"@opentelemetry/sdk-trace-base@npm:^2.6.0": + version: 2.6.0 + resolution: "@opentelemetry/sdk-trace-base@npm:2.6.0" + dependencies: + "@opentelemetry/core": "npm:2.6.0" + "@opentelemetry/resources": "npm:2.6.0" + "@opentelemetry/semantic-conventions": "npm:^1.29.0" + peerDependencies: + "@opentelemetry/api": ">=1.3.0 <1.10.0" + checksum: 
10c0/92363197f17d37adf206299da95f683fc039a77f9477caad4bbd6599561cab699c29a1a72830cb1271d7ffc50cd2ed061e85e92f69d06e3fa1946bb742a41ce2 + languageName: node + linkType: hard + +"@opentelemetry/semantic-conventions@npm:^1.29.0": + version: 1.40.0 + resolution: "@opentelemetry/semantic-conventions@npm:1.40.0" + checksum: 10c0/3259de0ea11b52eb70e44c12eba21448392baf9cb74c37b62071c4a5ed7fb89b61e194f3898d40ac6bfa7293617a0e132876cb6e355472b66de0cdb13c50b529 + languageName: node + linkType: hard + "@oxfmt/binding-android-arm-eabi@npm:0.36.0": version: 0.36.0 resolution: "@oxfmt/binding-android-arm-eabi@npm:0.36.0" @@ -1747,6 +1842,15 @@ __metadata: languageName: node linkType: hard +"acorn-import-attributes@npm:^1.9.5": + version: 1.9.5 + resolution: "acorn-import-attributes@npm:1.9.5" + peerDependencies: + acorn: ^8 + checksum: 10c0/5926eaaead2326d5a86f322ff1b617b0f698aa61dc719a5baa0e9d955c9885cc71febac3fb5bacff71bbf2c4f9c12db2056883c68c53eb962c048b952e1e013d + languageName: node + linkType: hard + "acorn@npm:^8.15.0": version: 8.15.0 resolution: "acorn@npm:8.15.0" @@ -2110,6 +2214,13 @@ __metadata: languageName: node linkType: hard +"cjs-module-lexer@npm:^2.2.0": + version: 2.2.0 + resolution: "cjs-module-lexer@npm:2.2.0" + checksum: 10c0/aec4ca58f87145fac221386790ecaae8b012f2e2359a45acb61d8c75ea4fa84f6ea869f17abc1a7e91a808eff0fed581209632f03540de16f72f0a28f5fd35ac + languageName: node + linkType: hard + "cli-cursor@npm:^5.0.0": version: 5.0.0 resolution: "cli-cursor@npm:5.0.0" @@ -2266,7 +2377,7 @@ __metadata: languageName: node linkType: hard -"debug@npm:4, debug@npm:^4.3.4, debug@npm:^4.4.0": +"debug@npm:4, debug@npm:^4.3.4, debug@npm:^4.3.5, debug@npm:^4.4.0": version: 4.4.3 resolution: "debug@npm:4.4.3" dependencies: @@ -3047,6 +3158,18 @@ __metadata: languageName: node linkType: hard +"import-in-the-middle@npm:^3.0.0": + version: 3.0.0 + resolution: "import-in-the-middle@npm:3.0.0" + dependencies: + acorn: "npm:^8.15.0" + acorn-import-attributes: "npm:^1.9.5" + 
cjs-module-lexer: "npm:^2.2.0" + module-details-from-path: "npm:^1.0.4" + checksum: 10c0/8533418c271416f185d09b880c86d4d7668f788f7ec8679338bda0d1789c99244a4915d1d580d74e919f145cf63efdf2e76877584fbace034c6ced4ac651d126 + languageName: node + linkType: hard + "imurmurhash@npm:^0.1.4": version: 0.1.4 resolution: "imurmurhash@npm:0.1.4" @@ -3740,6 +3863,13 @@ __metadata: languageName: node linkType: hard +"module-details-from-path@npm:^1.0.3, module-details-from-path@npm:^1.0.4": + version: 1.0.4 + resolution: "module-details-from-path@npm:1.0.4" + checksum: 10c0/10863413e96dab07dee917eae07afe46f7bf853065cc75a7d2a718adf67574857fb64f8a2c0c9af12ac733a9a8cf652db7ed39b95f7a355d08106cb9cc50c83b + languageName: node + linkType: hard + "ms@npm:2.1.2": version: 2.1.2 resolution: "ms@npm:2.1.2" @@ -4765,6 +4895,16 @@ __metadata: languageName: node linkType: hard +"require-in-the-middle@npm:^8.0.0": + version: 8.0.1 + resolution: "require-in-the-middle@npm:8.0.1" + dependencies: + debug: "npm:^4.3.5" + module-details-from-path: "npm:^1.0.3" + checksum: 10c0/4b3d29adfff873585dceffa9ddb8f33bb6599001ddff758503e0e5ade2ae6d20d691314125bb13679fa75a19893338e11953d4702dd2fea181e95c0f8316b29b + languageName: node + linkType: hard + "resolve-from@npm:^5.0.0": version: 5.0.0 resolution: "resolve-from@npm:5.0.0"