From 0160f51cc1bd96707fc60458c5476c8945cfcc36 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 10 May 2026 13:45:17 +0900 Subject: [PATCH 1/9] Add OpenTelemetry queue task metrics Issue #740 asked for metrics covering Fedify's enqueue and worker boundaries for inbox, outbox, and fanout work. Backlog depth from MessageQueue.getDepth() answers "how much is waiting"; these metrics answer "how fast are workers draining it, how often are they failing, and how many tasks are running right now in this process". This change adds six instruments to FederationMetrics: fedify.queue.task.enqueued Counter, every Fedify enqueue including retries. fedify.queue.task.started Counter, when a worker begins. fedify.queue.task.completed Counter, when processing returns without throwing. fedify.queue.task.failed Counter, when processing throws a non-abort error. fedify.queue.task.duration Histogram in ms. fedify.queue.task.in_flight UpDownCounter, process-local; the in-flight series intentionally omits per-message attributes so increments and decrements pair up cleanly. Common attributes are fedify.queue.role, fedify.queue.backend (queue.constructor.name verbatim, omitted for plain object literals), fedify.queue.native_retrial, and activitypub.activity.type when available. The enqueue counter additionally carries fedify.queue.task.attempt; the completion-side instruments carry fedify.queue.task.result (completed, failed, or aborted). Aborts are detected via error?.name === "AbortError". Instrumentation points: - Initial inbox enqueue in routeActivity(). - Initial outbox enqueue in sendActivityInternal() and queued forwardActivityInternal(). - Initial fanout enqueue in Context.sendActivity(). - Retry re-enqueues in #listenOutboxMessage() and #listenInboxMessage(), guarded so the metric only fires when the enqueue actually happens. - All three branches of processQueuedTask() are wrapped with started/in-flight/completed-or-failed-or-aborted/duration. #listenInboxMessage() now exposes the parsed activity type to its caller via an onActivityType callback so processQueuedTask can attach activitypub.activity.type to the inbox-task completion measurements. When a queue does not declare nativeRetrial and Fedify catches an inbox listener or outbox delivery error to schedule a retry, the worker boundary records result=completed; the retry is observable as a new fedify.queue.task.enqueued measurement with non-zero fedify.queue.task.attempt, and outbox-side delivery failures remain observable through activitypub.delivery.* and the activitypub.delivery.failed span event. The OpenTelemetry manual page now documents this contrast explicitly. Tests cover success, native-retrial failure, internally-handled retry, abort, and forwardActivity enqueue paths. CHANGES.md, the OpenTelemetry manual, and the message queue manual are updated. https://github.com/fedify-dev/fedify/issues/316 Closes https://github.com/fedify-dev/fedify/issues/740 Assisted-by: Claude Code:claude-opus-4-7 Assisted-by: Codex:gpt-5.2-codex --- CHANGES.md | 22 + docs/manual/mq.md | 8 + docs/manual/opentelemetry.md | 83 +++- .../fedify/src/federation/handler.test.ts | 97 ++++ packages/fedify/src/federation/inbox.ts | 4 + packages/fedify/src/federation/metrics.ts | 157 ++++++ .../fedify/src/federation/middleware.test.ts | 458 ++++++++++++++++++ packages/fedify/src/federation/middleware.ts | 198 ++++++-- 8 files changed, 980 insertions(+), 47 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ba213c2b2..6f628b0ec 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -51,10 +51,32 @@ To be released. deliberately exclude raw URLs, query strings, and identifier values to keep cardinality bounded. [[#316], [#736], [#757]] + - Added OpenTelemetry queue task metrics covering Fedify's enqueue and + worker boundaries for inbox, outbox, and fanout work: + `fedify.queue.task.enqueued` (Counter), `fedify.queue.task.started` + (Counter), `fedify.queue.task.completed` (Counter), + `fedify.queue.task.failed` (Counter), `fedify.queue.task.duration` + (Histogram), and `fedify.queue.task.in_flight` (UpDownCounter, process + local). Instruments carry `fedify.queue.role`, best-effort + `fedify.queue.backend` (the queue implementation's constructor name), + and `fedify.queue.native_retrial`. The enqueue/started/completed/ + failed/duration instruments additionally carry + `activitypub.activity.type` whenever Fedify knows the activity type + for the queued message; the in-flight UpDownCounter deliberately + omits per-message attributes so that increment and decrement + operations always pair up cleanly per attribute series. Enqueue + measurements additionally carry `fedify.queue.task.attempt` for + retries, and the completion-side instruments carry + `fedify.queue.task.result` (`completed`, `failed`, or `aborted`). + Together with `MessageQueue.getDepth()` reporting, these metrics let + operators distinguish a slow-draining queue from a queue that sees + less traffic. [[#316], [#740]] + [#316]: https://github.com/fedify-dev/fedify/issues/316 [#619]: https://github.com/fedify-dev/fedify/issues/619 [#735]: https://github.com/fedify-dev/fedify/issues/735 [#736]: https://github.com/fedify-dev/fedify/issues/736 +[#740]: https://github.com/fedify-dev/fedify/issues/740 [#748]: https://github.com/fedify-dev/fedify/pull/748 [#752]: https://github.com/fedify-dev/fedify/issues/752 [#753]: https://github.com/fedify-dev/fedify/pull/753 diff --git a/docs/manual/mq.md b/docs/manual/mq.md index db78998d3..e2948967d 100644 --- a/docs/manual/mq.md +++ b/docs/manual/mq.md @@ -1023,6 +1023,14 @@ outbox, and fanout work, observability code should report that queue once as a shared queue. Reporting the same `getDepth()` result separately for each logical role would double- or triple-count the backlog. +Queue depth covers only the *backend* side of the queue. To see what +Fedify's workers are doing with the dequeued messages — enqueue rate, task +processing duration, completion versus failure, and how many tasks are in +flight per process — read the matching [`fedify.queue.task.*` OpenTelemetry +metrics](./opentelemetry.md#instrumented-metrics). Backlog depth and task +throughput together let you tell a slowly draining queue apart from one +that simply sees less traffic. + [^amqp-depth]: `AmqpMessageQueue` can count the configured ready queues and delayed queues created by the same `AmqpMessageQueue` instance. AMQP 0-9-1 does not provide a portable queue-listing API, so diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index e6288ba9c..7cbe70e08 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -296,15 +296,21 @@ Instrumented metrics Fedify records the following OpenTelemetry metrics: -| Metric name | Instrument | Unit | Description | -| -------------------------------------------- | ---------- | ----------- | --------------------------------------------------------------- | -| `activitypub.delivery.sent` | Counter | `{attempt}` | Counts outgoing ActivityPub delivery attempts. | -| `activitypub.delivery.permanent_failure` | Counter | `{failure}` | Counts outgoing deliveries abandoned as permanent failures. | -| `activitypub.delivery.duration` | Histogram | `ms` | Measures outgoing ActivityPub delivery attempt duration. | -| `activitypub.inbox.processing_duration` | Histogram | `ms` | Measures inbox listener processing duration. | -| `activitypub.signature.verification_failure` | Counter | `{failure}` | Counts failed signature verification for inbox requests. | -| `fedify.http.server.request.count` | Counter | `{request}` | Counts inbound HTTP requests handled by `Federation.fetch()`. | -| `fedify.http.server.request.duration` | Histogram | `ms` | Measures inbound HTTP request duration in `Federation.fetch()`. | +| Metric name | Instrument | Unit | Description | +| -------------------------------------------- | ------------- | ----------- | --------------------------------------------------------------- | +| `activitypub.delivery.sent` | Counter | `{attempt}` | Counts outgoing ActivityPub delivery attempts. | +| `activitypub.delivery.permanent_failure` | Counter | `{failure}` | Counts outgoing deliveries abandoned as permanent failures. | +| `activitypub.delivery.duration` | Histogram | `ms` | Measures outgoing ActivityPub delivery attempt duration. | +| `activitypub.inbox.processing_duration` | Histogram | `ms` | Measures inbox listener processing duration. | +| `activitypub.signature.verification_failure` | Counter | `{failure}` | Counts failed signature verification for inbox requests. | +| `fedify.http.server.request.count` | Counter | `{request}` | Counts inbound HTTP requests handled by `Federation.fetch()`. | +| `fedify.http.server.request.duration` | Histogram | `ms` | Measures inbound HTTP request duration in `Federation.fetch()`. | +| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, and fanout tasks Fedify enqueued. | +| `fedify.queue.task.started` | Counter | `{task}` | Counts queue tasks Fedify began processing as a worker. | +| `fedify.queue.task.completed` | Counter | `{task}` | Counts queue tasks Fedify finished processing without throwing. | +| `fedify.queue.task.failed` | Counter | `{task}` | Counts queue tasks Fedify abandoned because processing threw. | +| `fedify.queue.task.duration` | Histogram | `ms` | Measures queue task processing duration in Fedify workers. | +| `fedify.queue.task.in_flight` | UpDownCounter | `{task}` | Tracks queue tasks currently in flight in this Fedify process. | ### Metric attributes @@ -339,6 +345,60 @@ Fedify records the following OpenTelemetry metrics: parameter names (for example `/users/{identifier}`) rather than the matched parameter values. +`fedify.queue.task.enqueued`, `fedify.queue.task.started`, +`fedify.queue.task.completed`, `fedify.queue.task.failed`, and +`fedify.queue.task.duration` +: `fedify.queue.role` (`inbox`, `outbox`, or `fanout`) is always present. + `fedify.queue.backend` is the queue implementation's constructor name + (for example `RedisMessageQueue`) when available; it is omitted for + queues whose constructor is the plain `Object` (for example, + `MessageQueue` instances built from an object literal). + `fedify.queue.native_retrial` reflects the queue backend's `nativeRetrial` + flag when set on the queue. `activitypub.activity.type` is recorded + whenever Fedify knows the activity type for the queued message; for inbox + tasks the type only becomes available after the activity is parsed, so the + *started* counter for inbox tasks may be recorded without it. + `fedify.queue.task.enqueued` additionally carries a zero-based + `fedify.queue.task.attempt` so that retry re-enqueues are distinguishable + from initial enqueues. `fedify.queue.task.completed`, + `fedify.queue.task.failed`, and `fedify.queue.task.duration` carry + `fedify.queue.task.result`, which is `completed` when processing returned + without throwing, `failed` when the worker re-threw a non-abort error, and + `aborted` when the worker re-threw an `AbortError` (for example, because a + graceful-shutdown `AbortSignal` interrupted processing). When the queue + backend does not declare `nativeRetrial`, Fedify catches inbox listener and + outbox delivery errors itself; if its retry policy still allows another + attempt, it schedules a retry by re-enqueuing the message and returns from + the worker without re-throwing, so the worker boundary records + `result=completed`. When the retry policy gives up, the worker also + returns normally (`result=completed`) without scheduling a retry. + Outbox-side activity failures remain observable through the + `activitypub.delivery.*` metrics and the `activitypub.delivery.failed` + span event, and any retry attempt — inbox or outbox — appears as a + `fedify.queue.task.enqueued` measurement with a non-zero + `fedify.queue.task.attempt`. Inbox listener errors that the retry policy + abandons are visible through error logs and the inbox span's error status, + but not through a dedicated metric. + +`fedify.queue.task.in_flight` +: `fedify.queue.role` and `fedify.queue.backend` (when available), plus + `fedify.queue.native_retrial` when set on the queue. Per-message + attributes such as `activitypub.activity.type`, + `fedify.queue.task.attempt`, and `fedify.queue.task.result` are + deliberately omitted so that increment and decrement operations always + pair up cleanly per attribute series. This UpDownCounter is + process-local: it tracks tasks currently being processed *in this + Fedify process*, not cross-process totals. Aggregate it across + replicas in your metrics backend. + +The `fedify.queue.task.*` metrics describe what Fedify's workers do with +queued messages. They complement the backend-side +[`MessageQueue.getDepth()` API](./mq.md#queue-depth-reporting), which +reports how many messages are currently waiting in the queue backend. +Reading both signals together — task throughput plus backlog depth — +makes it possible to distinguish a small, slow queue from a large, fast +one and to set alerting thresholds for delivery latency under load. + Fedify records `activitypub.remote.host` as the URL hostname only; ports, paths, and query strings are deliberately excluded to keep metric cardinality bounded. Activity types use the same qualified URI form as Fedify's trace attributes, @@ -407,6 +467,11 @@ for ActivityPub: | `fedify.object.values.{parameter}` | string[] | The argument values of the object dispatcher. | `["1", "2"]` | | `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` | | `fedify.collection.items` | number | The number of items in the collection page. It can be less than the total items. | `10` | +| `fedify.queue.role` | string | The Fedify queue role for the task: `inbox`, `outbox`, or `fanout`. | `"outbox"` | +| `fedify.queue.backend` | string | The queue implementation's constructor name (best-effort backend identifier). | `"RedisMessageQueue"` | +| `fedify.queue.native_retrial` | boolean | Whether the queue backend declares `nativeRetrial`, meaning Fedify defers retry handling to the backend. | `true` | +| `fedify.queue.task.attempt` | int | The zero-based attempt number recorded on `fedify.queue.task.enqueued`; non-zero for retry re-enqueues. | `1` | +| `fedify.queue.task.result` | string | The terminal outcome of queue task processing: `completed`, `failed`, or `aborted`. | `"failed"` | | `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` | | `http.response.status_code` | int | The HTTP response status code. | `200` | | `http_signatures.signature` | string | The signature of the HTTP request in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | diff --git a/packages/fedify/src/federation/handler.test.ts b/packages/fedify/src/federation/handler.test.ts index 95dc2b144..0ea50f429 100644 --- a/packages/fedify/src/federation/handler.test.ts +++ b/packages/fedify/src/federation/handler.test.ts @@ -57,6 +57,7 @@ import { import { ActivityListenerSet } from "./activity-listener.ts"; import { MemoryKvStore } from "./kv.ts"; import { createFederation } from "./middleware.ts"; +import type { MessageQueue } from "./mq.ts"; const QUOTE_CONTEXT_TERMS = { QuoteAuthorization: "https://w3id.org/fep/044f#QuoteAuthorization", @@ -2453,6 +2454,102 @@ test("handleInbox() records OpenTelemetry span events", async () => { ); }); +test("handleInbox() records fedify.queue.task.enqueued when queued", async () => { + const [meterProvider, recorder] = createTestMeterProvider(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + kv, + meterProvider, + }); + + const activity = new Create({ + id: new URL("https://example.com/activities/queued"), + actor: new URL("https://example.com/users/someone"), + object: new Note({ + id: new URL("https://example.com/note-queued"), + content: "Queue me up", + }), + }); + + const request = new Request("https://example.com/users/someone/inbox", { + method: "POST", + headers: { "Content-Type": "application/activity+json" }, + body: JSON.stringify(await activity.toJsonLd()), + }); + const signed = await signRequest( + request, + rsaPrivateKey3, + new URL("https://example.com/users/someone#main-key"), + ); + + const context = createRequestContext({ + federation, + request: signed, + url: new URL(signed.url), + data: undefined, + documentLoader: mockDocumentLoader, + contextLoader: mockDocumentLoader, + getActorUri(identifier: string) { + return new URL(`https://example.com/users/${identifier}`); + }, + }); + + const actorDispatcher: ActorDispatcher = (ctx, identifier) => { + if (identifier !== "someone") return null; + return new Person({ + id: ctx.getActorUri(identifier), + name: "Someone", + inbox: new URL("https://example.com/users/someone/inbox"), + publicKey: rsaPublicKey2, + }); + }; + + const queuedMessages: unknown[] = []; + const queue: MessageQueue = { + enqueue(message, _options) { + queuedMessages.push(message); + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + + const response = await handleInbox(signed, { + recipient: "someone", + context, + inboxContextFactory(_activity) { + return createInboxContext({ ...context, clone: undefined }); + }, + kv, + kvPrefixes: { + activityIdempotence: ["activityIdempotence"], + publicKey: ["publicKey"], + acceptSignatureNonce: ["acceptSignatureNonce"], + }, + actorDispatcher, + inboxListeners: new ActivityListenerSet>(), + inboxErrorHandler: undefined, + onNotFound: (_request) => new Response("Not found", { status: 404 }), + signatureTimeWindow: false, + skipSignatureVerification: true, + queue, + meterProvider, + }); + + assertEquals(response.status, 202); + assertEquals(queuedMessages.length, 1); + + const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); + assertEquals(enqueued.length, 1); + assertEquals(enqueued[0].attributes["fedify.queue.role"], "inbox"); + assertEquals(enqueued[0].attributes["fedify.queue.task.attempt"], 0); + assertEquals( + enqueued[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); +}); + test("handleInbox() records unverified HTTP signature details", async () => { const [tracerProvider, exporter] = createTestTracerProvider(); const [meterProvider, recorder] = createTestMeterProvider(); diff --git a/packages/fedify/src/federation/inbox.ts b/packages/fedify/src/federation/inbox.ts index db66692a8..955e2b5b5 100644 --- a/packages/fedify/src/federation/inbox.ts +++ b/packages/fedify/src/federation/inbox.ts @@ -178,6 +178,10 @@ export async function routeActivity( }); throw error; } + getFederationMetrics(meterProvider).recordQueueTaskEnqueued( + { role: "inbox", queue, activityType: getTypeId(activity).href }, + 0, + ); logger.info( "Activity {activityId} is enqueued.", { activityId: activity.id?.href, activity: json, recipient }, diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 2decbc9c9..119ae357e 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -4,8 +4,32 @@ import { type Histogram, type MeterProvider, metrics, + type UpDownCounter, } from "@opentelemetry/api"; import metadata from "../../deno.json" with { type: "json" }; +import type { MessageQueue } from "./mq.ts"; + +/** + * The role of a queued task, derived from the queued message's `type` field. + * @since 2.3.0 + */ +export type QueueTaskRole = "fanout" | "outbox" | "inbox"; + +/** + * The terminal result of a queued task processing attempt. + * @since 2.3.0 + */ +export type QueueTaskResult = "completed" | "failed" | "aborted"; + +/** + * Common attributes shared by all queue task metrics. + * @since 2.3.0 + */ +export interface QueueTaskCommonAttributes { + role: QueueTaskRole; + queue?: MessageQueue; + activityType?: string; +} class FederationMetrics { readonly deliverySent: Counter; @@ -15,6 +39,12 @@ class FederationMetrics { readonly inboxProcessingDuration: Histogram; readonly httpServerRequestCount: Counter; readonly httpServerRequestDuration: Histogram; + readonly queueTaskEnqueued: Counter; + readonly queueTaskStarted: Counter; + readonly queueTaskCompleted: Counter; + readonly queueTaskFailed: Counter; + readonly queueTaskDuration: Histogram; + readonly queueTaskInFlight: UpDownCounter; constructor(meterProvider: MeterProvider) { const meter = meterProvider.getMeter(metadata.name, metadata.version); @@ -84,6 +114,40 @@ class FederationMetrics { }, }, ); + this.queueTaskEnqueued = meter.createCounter("fedify.queue.task.enqueued", { + description: "Tasks Fedify enqueued for inbox, outbox, or fanout work.", + unit: "{task}", + }); + this.queueTaskStarted = meter.createCounter("fedify.queue.task.started", { + description: "Tasks Fedify began processing as a queue worker.", + unit: "{task}", + }); + this.queueTaskCompleted = meter.createCounter( + "fedify.queue.task.completed", + { + description: "Queue tasks Fedify finished processing without throwing.", + unit: "{task}", + }, + ); + this.queueTaskFailed = meter.createCounter("fedify.queue.task.failed", { + description: "Queue tasks Fedify abandoned because processing threw.", + unit: "{task}", + }); + this.queueTaskDuration = meter.createHistogram( + "fedify.queue.task.duration", + { + description: "Duration of queue task processing in Fedify workers.", + unit: "ms", + }, + ); + this.queueTaskInFlight = meter.createUpDownCounter( + "fedify.queue.task.in_flight", + { + description: + "Queue tasks currently being processed in this Fedify process.", + unit: "{task}", + }, + ); } recordDelivery( @@ -151,6 +215,99 @@ class FederationMetrics { this.httpServerRequestCount.add(1, attributes); this.httpServerRequestDuration.record(durationMs, attributes); } + + recordQueueTaskEnqueued( + common: QueueTaskCommonAttributes, + attempt: number, + ): void { + const attributes = buildQueueTaskAttributes(common); + attributes["fedify.queue.task.attempt"] = attempt; + this.queueTaskEnqueued.add(1, attributes); + } + + recordQueueTaskStarted(common: QueueTaskCommonAttributes): void { + this.queueTaskStarted.add(1, buildQueueTaskAttributes(common)); + } + + incrementQueueTaskInFlight(common: QueueTaskCommonAttributes): void { + this.queueTaskInFlight.add(1, buildQueueTaskInFlightAttributes(common)); + } + + decrementQueueTaskInFlight(common: QueueTaskCommonAttributes): void { + this.queueTaskInFlight.add(-1, buildQueueTaskInFlightAttributes(common)); + } + + recordQueueTaskOutcome( + common: QueueTaskCommonAttributes, + result: QueueTaskResult, + durationMs: number, + ): void { + const attributes = buildQueueTaskAttributes(common); + attributes["fedify.queue.task.result"] = result; + if (result === "completed") { + this.queueTaskCompleted.add(1, attributes); + } else if (result === "failed") { + this.queueTaskFailed.add(1, attributes); + } + this.queueTaskDuration.record(durationMs, attributes); + } +} + +function buildQueueTaskAttributes( + common: QueueTaskCommonAttributes, +): Attributes { + const attributes: Attributes = { + "fedify.queue.role": common.role, + }; + const backend = getQueueBackend(common.queue); + if (backend != null) { + attributes["fedify.queue.backend"] = backend; + } + const nativeRetrial = common.queue?.nativeRetrial; + if (typeof nativeRetrial === "boolean") { + attributes["fedify.queue.native_retrial"] = nativeRetrial; + } + if (common.activityType != null) { + attributes["activitypub.activity.type"] = common.activityType; + } + return attributes; +} + +function buildQueueTaskInFlightAttributes( + common: QueueTaskCommonAttributes, +): Attributes { + // The in-flight UpDownCounter is process-local and intentionally omits + // per-message attributes (activity type, attempt, result) so that + // increments and decrements pair up cleanly. + return buildQueueTaskAttributes({ role: common.role, queue: common.queue }); +} + +/** + * Returns the constructor name of the given message queue, when it is a + * meaningful identifier. Used as a best-effort `fedify.queue.backend` + * attribute on queue task metrics; returns `undefined` for plain object + * literals (whose constructor is `Object`) so the attribute does not appear + * with a non-informative value. + * @since 2.3.0 + */ +export function getQueueBackend(queue?: MessageQueue): string | undefined { + const name = queue?.constructor?.name; + if (name == null || name === "" || name === "Object") return undefined; + return name; +} + +/** + * Whether the given thrown value is an `AbortError`. + * + * `processQueuedTask` distinguishes aborted tasks (recorded as + * `fedify.queue.task.result=aborted`) from other failures so that backend + * shutdown signals do not inflate the `fedify.queue.task.failed` counter. + * @since 2.3.0 + */ +export function isAbortError(error: unknown): boolean { + if (error == null || typeof error !== "object") return false; + const name = (error as { name?: unknown }).name; + return typeof name === "string" && name === "AbortError"; } const KNOWN_HTTP_METHODS: ReadonlySet = new Set([ diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 7de8556cc..3296d2e0b 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -3129,9 +3129,11 @@ test("Federation.setOutboxListeners()", async (t) => { return Promise.resolve(); }, }; + const [meterProvider, recorder] = createTestMeterProvider(); const federation = new FederationImpl({ kv, contextLoaderFactory: () => mockDocumentLoader, + meterProvider, queue, }); federation @@ -3180,6 +3182,19 @@ test("Federation.setOutboxListeners()", async (t) => { assertEquals((enqueued[0] as OutboxMessage).actorIds, [ "https://remote.example/users/alice", ]); + + const enqueuedMetrics = recorder.getMeasurements( + "fedify.queue.task.enqueued", + ); + assertEquals(enqueuedMetrics.length, 1); + assertEquals( + enqueuedMetrics[0].attributes["fedify.queue.role"], + "outbox", + ); + assertEquals( + enqueuedMetrics[0].attributes["fedify.queue.task.attempt"], + 0, + ); }, ); }); @@ -3694,6 +3709,52 @@ test("FederationImpl.processQueuedTask()", async (t) => { durations[0].attributes["activitypub.activity.type"], "https://www.w3.org/ns/activitystreams#Create", ); + + const started = recorder.getMeasurements("fedify.queue.task.started"); + assertEquals(started.length, 1); + assertEquals(started[0].attributes["fedify.queue.role"], "inbox"); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + assertEquals(completed.length, 1); + assertEquals(completed[0].attributes["fedify.queue.role"], "inbox"); + assertEquals( + completed[0].attributes["fedify.queue.task.result"], + "completed", + ); + assertEquals( + completed[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + + assertEquals( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + + const taskDurations = recorder.getMeasurements( + "fedify.queue.task.duration", + ); + assertEquals(taskDurations.length, 1); + assertEquals(taskDurations[0].type, "histogram"); + assertEquals(taskDurations[0].attributes["fedify.queue.role"], "inbox"); + assertEquals( + taskDurations[0].attributes["fedify.queue.task.result"], + "completed", + ); + + const inFlight = recorder.getMeasurements("fedify.queue.task.in_flight"); + assertEquals(inFlight.length, 2); + assertEquals(inFlight[0].type, "upDownCounter"); + assertEquals(inFlight[0].value, 1); + assertEquals(inFlight[1].value, -1); + // The increment and decrement attribute bags must match exactly so that + // the in-flight gauge always nets to zero per attribute series. + assertEquals(inFlight[0].attributes, inFlight[1].attributes); + assertEquals(inFlight[0].attributes["fedify.queue.role"], "inbox"); + assertEquals( + inFlight[0].attributes["activitypub.activity.type"], + undefined, + ); }); }); @@ -4076,6 +4137,403 @@ test("FederationImpl.processQueuedTask() permanent failure", async (t) => { fetchMock.hardReset(); }); +test("FederationImpl.processQueuedTask() queue task metrics", async (t) => { + await t.step( + "records failed result when worker re-throws (nativeRetrial)", + async () => { + // With nativeRetrial=true the worker leaves retry handling to the queue + // backend, so an inbox listener exception propagates back out of + // processQueuedTask and is recorded as a failed outcome. + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + nativeRetrial: true, + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => { + throw new Error("Intended error for testing"); + }); + + await assertRejects( + () => + federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://remote.example/activities/2", + actor: "https://remote.example/users/alice", + object: { type: "Note", content: "Hello world" }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ), + Error, + ); + + assertEquals( + recorder.getMeasurements("fedify.queue.task.completed").length, + 0, + ); + const failed = recorder.getMeasurements("fedify.queue.task.failed"); + assertEquals(failed.length, 1); + assertEquals(failed[0].attributes["fedify.queue.role"], "inbox"); + assertEquals(failed[0].attributes["fedify.queue.task.result"], "failed"); + assertEquals(failed[0].attributes["fedify.queue.native_retrial"], true); + assertEquals( + failed[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + + const taskDurations = recorder.getMeasurements( + "fedify.queue.task.duration", + ); + assertEquals(taskDurations.length, 1); + assertEquals( + taskDurations[0].attributes["fedify.queue.task.result"], + "failed", + ); + + const inFlight = recorder.getMeasurements("fedify.queue.task.in_flight"); + assertEquals(inFlight.length, 2); + assertEquals(inFlight[0].value, 1); + assertEquals(inFlight[1].value, -1); + assertEquals(inFlight[0].attributes, inFlight[1].attributes); + }, + ); + + await t.step( + "records completed when retry handler swallows listener error", + async () => { + // With nativeRetrial=false the worker schedules a retry and returns + // normally, so processQueuedTask records a completed outcome and a + // separate retry enqueue. + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queuedMessages: Message[] = []; + const queue: MessageQueue = { + enqueue(message, _options) { + queuedMessages.push(message); + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => { + throw new Error("Intended error for testing"); + }); + + await federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://remote.example/activities/retry", + actor: "https://remote.example/users/alice", + object: { type: "Note", content: "Hello world" }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ); + assertEquals(queuedMessages.length, 1); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + assertEquals(completed.length, 1); + assertEquals( + completed[0].attributes["fedify.queue.task.result"], + "completed", + ); + + const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); + assertEquals(enqueued.length, 1); + assertEquals(enqueued[0].attributes["fedify.queue.role"], "inbox"); + assertEquals(enqueued[0].attributes["fedify.queue.task.attempt"], 1); + assertEquals( + enqueued[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + }, + ); + + await t.step( + "records aborted result when worker re-throws AbortError", + async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + nativeRetrial: true, + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => { + throw new DOMException("aborted", "AbortError"); + }); + + await assertRejects( + () => + federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://remote.example/activities/3", + actor: "https://remote.example/users/alice", + object: { type: "Note", content: "Hello world" }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ), + DOMException, + ); + + assertEquals( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + assertEquals( + recorder.getMeasurements("fedify.queue.task.completed").length, + 0, + ); + const taskDurations = recorder.getMeasurements( + "fedify.queue.task.duration", + ); + assertEquals(taskDurations.length, 1); + assertEquals( + taskDurations[0].attributes["fedify.queue.task.result"], + "aborted", + ); + }, + ); + + await t.step("records native_retrial and backend attributes", async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + class TestMessageQueue implements MessageQueue { + readonly nativeRetrial = true; + enqueue(_message: unknown, _options?: unknown): Promise { + return Promise.resolve(); + } + listen(_handler: unknown, _options?: unknown): Promise { + return Promise.resolve(); + } + } + const federation = new FederationImpl({ + kv, + meterProvider, + queue: new TestMessageQueue(), + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => {}); + + await federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://remote.example/activities/4", + actor: "https://remote.example/users/alice", + object: { type: "Note", content: "Hello world" }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + assertEquals(completed.length, 1); + assertEquals( + completed[0].attributes["fedify.queue.backend"], + "TestMessageQueue", + ); + assertEquals(completed[0].attributes["fedify.queue.native_retrial"], true); + }); + + await t.step("records outbox enqueue and worker metrics", async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox"); + + fetchMock.spyGlobal(); + fetchMock.post("https://remote.example/inbox", { status: 202 }); + try { + await federation.processQueuedTask( + undefined, + { + type: "outbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + keys: [], + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://example.com/activities/1", + actor: "https://example.com/users/alice", + object: { type: "Note", content: "test" }, + }, + activityType: "https://www.w3.org/ns/activitystreams#Create", + inbox: "https://remote.example/inbox", + sharedInbox: false, + started: new Date().toISOString(), + attempt: 0, + headers: {}, + traceContext: {}, + } satisfies OutboxMessage, + ); + } finally { + fetchMock.hardReset(); + } + + const started = recorder.getMeasurements("fedify.queue.task.started"); + assertEquals(started.length, 1); + assertEquals(started[0].attributes["fedify.queue.role"], "outbox"); + assertEquals( + started[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + assertEquals(completed.length, 1); + assertEquals( + completed[0].attributes["fedify.queue.task.result"], + "completed", + ); + }); + + await t.step( + "records started/completed for a fanout task with no recipients", + async () => { + // A fanout task with no inboxes drops out before sendActivity validates + // keys, so the worker still completes successfully. + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const exportedKey = await crypto.subtle.exportKey( + "jwk", + rsaPrivateKey3, + ); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox"); + + await federation.processQueuedTask(undefined, { + type: "fanout", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + keys: [ + { + keyId: "https://example.com/users/alice#main-key", + privateKey: exportedKey, + }, + ], + inboxes: {}, + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://example.com/activities/1", + actor: "https://example.com/users/alice", + object: { type: "Note", content: "test" }, + }, + activityType: "https://www.w3.org/ns/activitystreams#Create", + traceContext: {}, + }); + + const started = recorder.getMeasurements("fedify.queue.task.started"); + assertEquals(started.length, 1); + assertEquals(started[0].attributes["fedify.queue.role"], "fanout"); + assertEquals( + started[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + assertEquals(completed.length, 1); + assertEquals(completed[0].attributes["fedify.queue.role"], "fanout"); + assertEquals( + completed[0].attributes["fedify.queue.task.result"], + "completed", + ); + }, + ); +}); + test("ContextImpl.lookupObject()", async (t) => { // Note that this test only checks if allowPrivateAddress option affects // the ContextImpl.lookupObject() method. Other aspects of the method are diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index f29e3a84c..55a49a97e 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -107,6 +107,9 @@ import { getDurationMs, getFederationMetrics, getRemoteHost, + isAbortError, + type QueueTaskCommonAttributes, + type QueueTaskResult, } from "./metrics.ts"; import type { MessageQueue } from "./mq.ts"; import { acceptsJsonLd } from "./negotiation.ts"; @@ -488,8 +491,14 @@ export class FederationImpl context.active(), message.traceContext, ); + const meter = getFederationMetrics(this.meterProvider); return withContext({ messageId: message.id }, async () => { if (message.type === "fanout") { + const common: QueueTaskCommonAttributes = { + role: "fanout", + queue: this.fanoutQueue, + activityType: message.activityType, + }; await tracer.startActiveSpan( "activitypub.fanout", { @@ -510,15 +519,26 @@ export class FederationImpl message.activityId, ); } + meter.recordQueueTaskStarted(common); + meter.incrementQueueTaskInFlight(common); + const startedAt = performance.now(); + let outcome: QueueTaskResult = "completed"; try { await this.#listenFanoutMessage(contextData, message); } catch (e) { + outcome = isAbortError(e) ? "aborted" : "failed"; span.setStatus({ code: SpanStatusCode.ERROR, message: String(e), }); throw e; } finally { + meter.recordQueueTaskOutcome( + common, + outcome, + getDurationMs(startedAt), + ); + meter.decrementQueueTaskInFlight(common); span.end(); } }, @@ -526,6 +546,11 @@ export class FederationImpl }, ); } else if (message.type === "outbox") { + const common: QueueTaskCommonAttributes = { + role: "outbox", + queue: this.outboxQueue, + activityType: message.activityType, + }; await tracer.startActiveSpan( "activitypub.outbox", { @@ -547,15 +572,26 @@ export class FederationImpl message.activityId, ); } + meter.recordQueueTaskStarted(common); + meter.incrementQueueTaskInFlight(common); + const startedAt = performance.now(); + let outcome: QueueTaskResult = "completed"; try { await this.#listenOutboxMessage(contextData, message, span); } catch (e) { + outcome = isAbortError(e) ? "aborted" : "failed"; span.setStatus({ code: SpanStatusCode.ERROR, message: String(e), }); throw e; } finally { + meter.recordQueueTaskOutcome( + common, + outcome, + getDurationMs(startedAt), + ); + meter.decrementQueueTaskInFlight(common); span.end(); } }, @@ -563,6 +599,10 @@ export class FederationImpl }, ); } else if (message.type === "inbox") { + const common: QueueTaskCommonAttributes = { + role: "inbox", + queue: this.inboxQueue, + }; await tracer.startActiveSpan( "activitypub.inbox", { @@ -577,15 +617,33 @@ export class FederationImpl return await withContext( { traceId: spanCtx.traceId, spanId: spanCtx.spanId }, async () => { + meter.recordQueueTaskStarted(common); + meter.incrementQueueTaskInFlight(common); + const startedAt = performance.now(); + let outcome: QueueTaskResult = "completed"; try { - await this.#listenInboxMessage(contextData, message, span); + await this.#listenInboxMessage( + contextData, + message, + span, + (activityType) => { + common.activityType = activityType; + }, + ); } catch (e) { + outcome = isAbortError(e) ? "aborted" : "failed"; span.setStatus({ code: SpanStatusCode.ERROR, message: String(e), }); throw e; } finally { + meter.recordQueueTaskOutcome( + common, + outcome, + getDurationMs(startedAt), + ); + meter.decrementQueueTaskInFlight(common); span.end(); } }, @@ -809,17 +867,29 @@ export class FederationImpl "#{attempt}); retry...:\n{error}", { ...logData, error }, ); - await this.outboxQueue?.enqueue( - { - ...message, - attempt: message.attempt + 1, - } satisfies OutboxMessage, - { - delay: Temporal.Duration.compare(delay, { seconds: 0 }) < 0 - ? Temporal.Duration.from({ seconds: 0 }) - : delay, - }, - ); + const retryMessage = { + ...message, + attempt: message.attempt + 1, + } satisfies OutboxMessage; + const { outboxQueue } = this; + if (outboxQueue != null) { + await outboxQueue.enqueue( + retryMessage, + { + delay: Temporal.Duration.compare(delay, { seconds: 0 }) < 0 + ? Temporal.Duration.from({ seconds: 0 }) + : delay, + }, + ); + getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( + { + role: "outbox", + queue: outboxQueue, + activityType: retryMessage.activityType, + }, + retryMessage.attempt, + ); + } } else { logger.error( "Failed to send activity {activityId} to {inbox} after {attempt} " + @@ -839,6 +909,7 @@ export class FederationImpl ctxData: TContextData, message: InboxMessage, span: Span, + onActivityType?: (activityType: string) => void, ): Promise { const logger = getLogger(["fedify", "federation", "inbox"]); const baseUrl = new URL(message.baseUrl); @@ -860,7 +931,9 @@ export class FederationImpl } } const activity = await Activity.fromJsonLd(message.activity, context); - span.setAttribute("activitypub.activity.type", getTypeId(activity).href); + const activityType = getTypeId(activity).href; + span.setAttribute("activitypub.activity.type", activityType); + onActivityType?.(activityType); if (activity.id != null) { span.setAttribute("activitypub.activity.id", activity.id.href); } @@ -897,7 +970,7 @@ export class FederationImpl ); span.setStatus({ code: SpanStatusCode.ERROR, - message: `Unsupported activity type: ${getTypeId(activity).href}`, + message: `Unsupported activity type: ${activityType}`, }); span.end(); return; @@ -905,7 +978,6 @@ export class FederationImpl const { class: cls, listener } = dispatched; span.updateName(`activitypub.dispatch_inbox_listener ${cls.name}`); try { - const activityType = getTypeId(activity).href; const started = performance.now(); try { await listener( @@ -976,17 +1048,29 @@ export class FederationImpl recipient: message.identifier, }, ); - await this.inboxQueue?.enqueue( - { - ...message, - attempt: message.attempt + 1, - } satisfies InboxMessage, - { - delay: Temporal.Duration.compare(delay, { seconds: 0 }) < 0 - ? Temporal.Duration.from({ seconds: 0 }) - : delay, - }, - ); + const retryMessage = { + ...message, + attempt: message.attempt + 1, + } satisfies InboxMessage; + const { inboxQueue } = this; + if (inboxQueue != null) { + await inboxQueue.enqueue( + retryMessage, + { + delay: Temporal.Duration.compare(delay, { seconds: 0 }) < 0 + ? Temporal.Duration.from({ seconds: 0 }) + : delay, + }, + ); + getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( + { + role: "inbox", + queue: inboxQueue, + activityType, + }, + retryMessage.attempt, + ); + } } else { logger.error( "Failed to process the incoming activity {activityId} after " + @@ -1300,12 +1384,24 @@ export class FederationImpl messages.push({ message, orderingKey: messageOrderingKey }); } const { outboxQueue } = this; + const recordOutboxEnqueued = (message: OutboxMessage): void => { + getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( + { + role: "outbox", + queue: outboxQueue, + activityType: message.activityType, + }, + message.attempt, + ); + }; if (outboxQueue.enqueueMany == null) { - const promises: Promise[] = messages.map((m) => - outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }) + const promises: PromiseSettledResult[] = await Promise.allSettled( + messages.map(async (m) => { + await outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }); + recordOutboxEnqueued(m.message); + }), ); - const results = await Promise.allSettled(promises); - const errors = results + const errors = promises .filter((r) => r.status === "rejected") .map((r) => (r as PromiseRejectedResult).reason); if (errors.length > 0) { @@ -1325,11 +1421,15 @@ export class FederationImpl // Note: enqueueMany does not support per-message orderingKey, // so we fall back to individual enqueues when orderingKey is specified if (orderingKey != null) { - const promises: Promise[] = messages.map((m) => - outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }) + const promises: PromiseSettledResult[] = await Promise.allSettled( + messages.map(async (m) => { + await outboxQueue.enqueue(m.message, { + orderingKey: m.orderingKey, + }); + recordOutboxEnqueued(m.message); + }), ); - const results = await Promise.allSettled(promises); - const errors = results + const errors = promises .filter((r) => r.status === "rejected") .map((r) => (r as PromiseRejectedResult).reason); if (errors.length > 0) { @@ -1355,6 +1455,7 @@ export class FederationImpl ); throw error; } + for (const m of messages) recordOutboxEnqueued(m.message); } } } @@ -2640,6 +2741,14 @@ export class ContextImpl implements Context { message, { orderingKey: options.orderingKey }, ); + getFederationMetrics(this.federation.meterProvider).recordQueueTaskEnqueued( + { + role: "fanout", + queue: this.federation.fanoutQueue, + activityType: message.activityType, + }, + 0, + ); return true; } @@ -3271,12 +3380,24 @@ async function forwardActivityInternal( }); } const { outboxQueue } = ctx.federation; + const recordOutboxEnqueued = (message: OutboxMessage): void => { + getFederationMetrics(ctx.federation.meterProvider).recordQueueTaskEnqueued( + { + role: "outbox", + queue: outboxQueue, + activityType: message.activityType, + }, + message.attempt, + ); + }; if (outboxQueue.enqueueMany == null || orderingKey != null) { - const promises: Promise[] = messages.map((m) => - outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }) + const promises: PromiseSettledResult[] = await Promise.allSettled( + messages.map(async (m) => { + await outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }); + recordOutboxEnqueued(m.message); + }), ); - const results = await Promise.allSettled(promises); - const errors: unknown[] = results + const errors: unknown[] = promises .filter((r) => r.status === "rejected") .map((r) => (r as PromiseRejectedResult).reason); if (errors.length > 0) { @@ -3302,6 +3423,7 @@ async function forwardActivityInternal( ); throw error; } + for (const m of messages) recordOutboxEnqueued(m.message); } return true; } From f68096f24574705a144d2f86be4fe3e674cff2ae Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 10 May 2026 14:17:34 +0900 Subject: [PATCH 2/9] Refine OpenTelemetry queue metrics in CHANGES.md Improve the formatting of the OpenTelemetry queue task metrics list in CHANGES.md and add a reference to pull request #759. --- CHANGES.md | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 6f628b0ec..662e9f8e5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -53,16 +53,20 @@ To be released. - Added OpenTelemetry queue task metrics covering Fedify's enqueue and worker boundaries for inbox, outbox, and fanout work: - `fedify.queue.task.enqueued` (Counter), `fedify.queue.task.started` - (Counter), `fedify.queue.task.completed` (Counter), - `fedify.queue.task.failed` (Counter), `fedify.queue.task.duration` - (Histogram), and `fedify.queue.task.in_flight` (UpDownCounter, process - local). Instruments carry `fedify.queue.role`, best-effort + + - `fedify.queue.task.enqueued` (counter) + - `fedify.queue.task.started` (counter) + - `fedify.queue.task.completed` (counter) + - `fedify.queue.task.failed` (counter) + - `fedify.queue.task.duration` (histogram) + - `fedify.queue.task.in_flight` (up/down counter, process local) + + Instruments carry `fedify.queue.role`, best-effort `fedify.queue.backend` (the queue implementation's constructor name), and `fedify.queue.native_retrial`. The enqueue/started/completed/ failed/duration instruments additionally carry `activitypub.activity.type` whenever Fedify knows the activity type - for the queued message; the in-flight UpDownCounter deliberately + for the queued message; the in-flight up/down counter deliberately omits per-message attributes so that increment and decrement operations always pair up cleanly per attribute series. Enqueue measurements additionally carry `fedify.queue.task.attempt` for @@ -70,7 +74,7 @@ To be released. `fedify.queue.task.result` (`completed`, `failed`, or `aborted`). Together with `MessageQueue.getDepth()` reporting, these metrics let operators distinguish a slow-draining queue from a queue that sees - less traffic. [[#316], [#740]] + less traffic. [[#316], [#740], [#759]] [#316]: https://github.com/fedify-dev/fedify/issues/316 [#619]: https://github.com/fedify-dev/fedify/issues/619 @@ -82,6 +86,7 @@ To be released. [#753]: https://github.com/fedify-dev/fedify/pull/753 [#755]: https://github.com/fedify-dev/fedify/pull/755 [#757]: https://github.com/fedify-dev/fedify/pull/757 +[#759]: https://github.com/fedify-dev/fedify/pull/759 ### @fedify/fixture From 5946108ae09018cfd993d30f328fcde13894adf5 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 10 May 2026 14:42:44 +0900 Subject: [PATCH 3/9] Forward meterProvider through ContextImpl.routeActivity() ContextImpl.routeActivityInternal() called the routeActivity() helper in inbox.ts without passing this.federation.meterProvider, so the fedify.queue.task.enqueued counter recorded from the public Context.routeActivity() API path was emitted on the global MeterProvider instead of the configured one. The fetch-driven inbox enqueue path through handleInbox() forwarded the provider correctly, so this only affected callers using ctx.routeActivity() directly. Forward this.federation.meterProvider into the routeActivity() call and add a regression test that exercises the path with a configured test meter provider. https://github.com/fedify-dev/fedify/pull/759#discussion_r3214358845 Assisted-by: Claude Code:claude-opus-4-7 Assisted-by: Codex:gpt-5.2-codex --- .../fedify/src/federation/middleware.test.ts | 55 +++++++++++++++++++ packages/fedify/src/federation/middleware.ts | 1 + 2 files changed, 56 insertions(+) diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 3296d2e0b..11aa8eeda 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -5505,6 +5505,61 @@ test({ }, }); +test({ + name: "ContextImpl.routeActivity() forwards meterProvider to inbox enqueue", + permissions: { env: true, read: true }, + async fn() { + const [meterProvider, recorder] = createTestMeterProvider(); + const enqueued: Message[] = []; + const queue: MessageQueue = { + enqueue(message): Promise { + enqueued.push(message); + return Promise.resolve(); + }, + listen(): Promise { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv: new MemoryKvStore(), + meterProvider, + queue, + }); + federation.setInboxListeners("/u/{identifier}/i", "/i"); + + const ctx = new ContextImpl({ + url: new URL("https://example.com/"), + federation, + data: undefined, + documentLoader: mockDocumentLoader, + contextLoader: documentLoader, + }); + + const signedOffer = await signObject( + new vocab.Offer({ + actor: new URL("https://example.com/person2"), + }), + ed25519PrivateKey, + ed25519Multikey.id!, + ); + assert(await ctx.routeActivity(null, signedOffer)); + assertEquals(enqueued.length, 1); + + const enqueuedMetrics = recorder.getMeasurements( + "fedify.queue.task.enqueued", + ); + assertEquals(enqueuedMetrics.length, 1); + assertEquals( + enqueuedMetrics[0].attributes["fedify.queue.role"], + "inbox", + ); + assertEquals( + enqueuedMetrics[0].attributes["fedify.queue.task.attempt"], + 0, + ); + }, +}); + test("ContextImpl.getCollectionUri()", () => { const federation = new FederationImpl({ kv: new MemoryKvStore() }); const base = "https://example.com"; diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 55a49a97e..111176001 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -2962,6 +2962,7 @@ export class ContextImpl implements Context { kvPrefixes: this.federation.kvPrefixes, queue: this.federation.inboxQueue, span, + meterProvider: this.federation.meterProvider, tracerProvider: options.tracerProvider ?? this.tracerProvider, idempotencyStrategy: this.federation.idempotencyStrategy, }); From 643c23534e7818814da70fa32cd2eb919cc6d115 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 10 May 2026 14:43:41 +0900 Subject: [PATCH 4/9] Strengthen queue task metric test assertions Two CodeRabbit nitpicks on the queue task metric tests: 1. handleInbox() inbox-enqueue test (handler.test.ts) now also asserts the measurement's instrument type is "counter", and that the fedify.queue.backend attribute is undefined when the queue is an object literal (whose constructor.name is "Object" and is filtered by getQueueBackend()). 2. processQueuedTask() outbox worker step (middleware.test.ts) is renamed to "records outbox worker metrics on successful delivery", since the body asserts only worker-side started/completed and never exercised the enqueue counter. An assertion that no enqueue measurement is emitted on the success path is added so accidental double-counting in #listenOutboxMessage() would fail this test. https://github.com/fedify-dev/fedify/pull/759#discussion_r3214358838 https://github.com/fedify-dev/fedify/pull/759#discussion_r3214358850 Assisted-by: Claude Code:claude-opus-4-7 Assisted-by: Codex:gpt-5.2-codex --- .../fedify/src/federation/handler.test.ts | 4 + .../fedify/src/federation/middleware.test.ts | 129 ++++++++++-------- 2 files changed, 74 insertions(+), 59 deletions(-) diff --git a/packages/fedify/src/federation/handler.test.ts b/packages/fedify/src/federation/handler.test.ts index 0ea50f429..89bf2c9d5 100644 --- a/packages/fedify/src/federation/handler.test.ts +++ b/packages/fedify/src/federation/handler.test.ts @@ -2542,12 +2542,16 @@ test("handleInbox() records fedify.queue.task.enqueued when queued", async () => const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); assertEquals(enqueued.length, 1); + assertEquals(enqueued[0].type, "counter"); assertEquals(enqueued[0].attributes["fedify.queue.role"], "inbox"); assertEquals(enqueued[0].attributes["fedify.queue.task.attempt"], 0); assertEquals( enqueued[0].attributes["activitypub.activity.type"], "https://www.w3.org/ns/activitystreams#Create", ); + // The queue here is an object literal, so getQueueBackend() should omit the + // backend attribute rather than emit "Object". + assertEquals(enqueued[0].attributes["fedify.queue.backend"], undefined); }); test("handleInbox() records unverified HTTP signature details", async () => { diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 11aa8eeda..265f4b176 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -4403,69 +4403,80 @@ test("FederationImpl.processQueuedTask() queue task metrics", async (t) => { assertEquals(completed[0].attributes["fedify.queue.native_retrial"], true); }); - await t.step("records outbox enqueue and worker metrics", async () => { - const kv = new MemoryKvStore(); - const [meterProvider, recorder] = createTestMeterProvider(); - const queue: MessageQueue = { - enqueue(_message, _options) { - return Promise.resolve(); - }, - listen(_handler, _options) { - return Promise.resolve(); - }, - }; - const federation = new FederationImpl({ - kv, - meterProvider, - queue, - }); - federation.setInboxListeners("/users/{identifier}/inbox", "/inbox"); + await t.step( + "records outbox worker metrics on successful delivery", + async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox"); - fetchMock.spyGlobal(); - fetchMock.post("https://remote.example/inbox", { status: 202 }); - try { - await federation.processQueuedTask( - undefined, - { - type: "outbox", - id: crypto.randomUUID(), - baseUrl: "https://example.com", - keys: [], - activity: { - "@context": "https://www.w3.org/ns/activitystreams", - type: "Create", - id: "https://example.com/activities/1", - actor: "https://example.com/users/alice", - object: { type: "Note", content: "test" }, - }, - activityType: "https://www.w3.org/ns/activitystreams#Create", - inbox: "https://remote.example/inbox", - sharedInbox: false, - started: new Date().toISOString(), - attempt: 0, - headers: {}, - traceContext: {}, - } satisfies OutboxMessage, + fetchMock.spyGlobal(); + fetchMock.post("https://remote.example/inbox", { status: 202 }); + try { + await federation.processQueuedTask( + undefined, + { + type: "outbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + keys: [], + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://example.com/activities/1", + actor: "https://example.com/users/alice", + object: { type: "Note", content: "test" }, + }, + activityType: "https://www.w3.org/ns/activitystreams#Create", + inbox: "https://remote.example/inbox", + sharedInbox: false, + started: new Date().toISOString(), + attempt: 0, + headers: {}, + traceContext: {}, + } satisfies OutboxMessage, + ); + } finally { + fetchMock.hardReset(); + } + + const started = recorder.getMeasurements("fedify.queue.task.started"); + assertEquals(started.length, 1); + assertEquals(started[0].attributes["fedify.queue.role"], "outbox"); + assertEquals( + started[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", ); - } finally { - fetchMock.hardReset(); - } - const started = recorder.getMeasurements("fedify.queue.task.started"); - assertEquals(started.length, 1); - assertEquals(started[0].attributes["fedify.queue.role"], "outbox"); - assertEquals( - started[0].attributes["activitypub.activity.type"], - "https://www.w3.org/ns/activitystreams#Create", - ); + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + assertEquals(completed.length, 1); + assertEquals( + completed[0].attributes["fedify.queue.task.result"], + "completed", + ); - const completed = recorder.getMeasurements("fedify.queue.task.completed"); - assertEquals(completed.length, 1); - assertEquals( - completed[0].attributes["fedify.queue.task.result"], - "completed", - ); - }); + // Successful outbox delivery should not re-enqueue, so no enqueued + // measurement is expected on this path. The guard catches accidental + // double-counting if the implementation ever changes. + assertEquals( + recorder.getMeasurements("fedify.queue.task.enqueued").length, + 0, + ); + }, + ); await t.step( "records started/completed for a fanout task with no recipients", From f591b5c27c855d922db4749b31c40002d0019ae5 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 10 May 2026 15:06:21 +0900 Subject: [PATCH 5/9] Consolidate per-message outbox enqueue branches The per-message enqueue branch that ran when the queue lacked enqueueMany() was structurally identical to the one that ran when enqueueMany() existed but the caller specified an orderingKey: both fell back to individual enqueue() calls and recorded recordOutboxEnqueued() for each. Folding both conditions into a single fallback (`enqueueMany == null || orderingKey != null`) removes a copy of the Promise.allSettled error-aggregation block, leaving the enqueueMany() success path untouched. No behavior change. Assisted-by: Claude Code:claude-opus-4-7 --- packages/fedify/src/federation/middleware.ts | 50 +++++--------------- 1 file changed, 12 insertions(+), 38 deletions(-) diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 111176001..a79ad3a1f 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -1394,7 +1394,10 @@ export class FederationImpl message.attempt, ); }; - if (outboxQueue.enqueueMany == null) { + // enqueueMany does not support per-message orderingKey, so fall back to + // individual enqueues whenever orderingKey is specified or the backend + // does not implement enqueueMany. + if (outboxQueue.enqueueMany == null || orderingKey != null) { const promises: PromiseSettledResult[] = await Promise.allSettled( messages.map(async (m) => { await outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }); @@ -1418,45 +1421,16 @@ export class FederationImpl throw errors[0]; } } else { - // Note: enqueueMany does not support per-message orderingKey, - // so we fall back to individual enqueues when orderingKey is specified - if (orderingKey != null) { - const promises: PromiseSettledResult[] = await Promise.allSettled( - messages.map(async (m) => { - await outboxQueue.enqueue(m.message, { - orderingKey: m.orderingKey, - }); - recordOutboxEnqueued(m.message); - }), + try { + await outboxQueue.enqueueMany(messages.map((m) => m.message)); + } catch (error) { + logger.error( + "Failed to enqueue activity {activityId} to send later: {error}", + { activityId: activity.id!.href, error }, ); - const errors = promises - .filter((r) => r.status === "rejected") - .map((r) => (r as PromiseRejectedResult).reason); - if (errors.length > 0) { - logger.error( - "Failed to enqueue activity {activityId} to send later: {errors}", - { activityId: activity.id!.href, errors }, - ); - if (errors.length > 1) { - throw new AggregateError( - errors, - `Failed to enqueue activity ${activityId} to send later.`, - ); - } - throw errors[0]; - } - } else { - try { - await outboxQueue.enqueueMany(messages.map((m) => m.message)); - } catch (error) { - logger.error( - "Failed to enqueue activity {activityId} to send later: {error}", - { activityId: activity.id!.href, error }, - ); - throw error; - } - for (const m of messages) recordOutboxEnqueued(m.message); + throw error; } + for (const m of messages) recordOutboxEnqueued(m.message); } } From f6624d9c01806474e6dad75b93734c4381cde62e Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 10 May 2026 15:14:36 +0900 Subject: [PATCH 6/9] Set explicit buckets on fedify.queue.task.duration Use the same OpenTelemetry HTTP server semantic-conventions buckets (5 ms to 10 s) that fedify.http.server.request.duration already uses. Queue task durations span a similar range: in-process inbox listeners finish in single-digit milliseconds, while outbox delivery to a slow remote inbox can take several seconds. Without explicit buckets, exporters fall back to backend-specific defaults that vary across Prometheus, OTLP/Tempo, and Datadog and make latency dashboards inconsistent across deployments. https://github.com/fedify-dev/fedify/pull/759#discussion_r3214391894 Assisted-by: Claude Code:claude-opus-4-7 Assisted-by: Gemini Code Assist --- packages/fedify/src/federation/metrics.ts | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 119ae357e..49ab99dd1 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -138,6 +138,27 @@ class FederationMetrics { { description: "Duration of queue task processing in Fedify workers.", unit: "ms", + advice: { + // Reuse the OpenTelemetry HTTP server semantic-conventions buckets + // since queue task durations span a similar 5 ms to 10 s range + // (network-bound outbox delivery dominates the tail). + explicitBucketBoundaries: [ + 5, + 10, + 25, + 50, + 75, + 100, + 250, + 500, + 750, + 1000, + 2500, + 5000, + 7500, + 10000, + ], + }, }, ); this.queueTaskInFlight = meter.createUpDownCounter( From de2d3d91245ac2979067cce85075a217d4523757 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 10 May 2026 15:58:55 +0900 Subject: [PATCH 7/9] Share recordOutboxEnqueue() between outbox enqueue paths sendActivityInternal() and forwardActivityInternal() each defined an identical recordOutboxEnqueued() closure that just forwarded (activityType, attempt) to FederationMetrics.recordQueueTaskEnqueued() under role=outbox. Move the body to a single top-level function in metrics.ts, so future attribute additions only need one update site. The shared helper takes the meter provider, the outbox queue, and a structural { activityType, attempt } subset of OutboxMessage. Keeping the parameter as a structural type means metrics.ts does not need to import from queue.ts. https://github.com/fedify-dev/fedify/pull/759#discussion_r3214392678 Assisted-by: Claude Code:claude-opus-4-7 Assisted-by: CodeRabbit --- packages/fedify/src/federation/metrics.ts | 24 ++++++++++++ packages/fedify/src/federation/middleware.ts | 41 ++++++++------------ 2 files changed, 41 insertions(+), 24 deletions(-) diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 49ab99dd1..34169bcf2 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -317,6 +317,30 @@ export function getQueueBackend(queue?: MessageQueue): string | undefined { return name; } +/** + * Records `fedify.queue.task.enqueued` for an outgoing outbox enqueue. + * + * Both `Context.sendActivity()` and `OutboxContext.forwardActivity()` enqueue + * outbox messages with the same metric attributes (role, queue, activity + * type, attempt), so they share this helper rather than each defining a local + * closure. + * @since 2.3.0 + */ +export function recordOutboxEnqueue( + meterProvider: MeterProvider | undefined, + outboxQueue: MessageQueue, + message: { readonly activityType: string; readonly attempt: number }, +): void { + getFederationMetrics(meterProvider).recordQueueTaskEnqueued( + { + role: "outbox", + queue: outboxQueue, + activityType: message.activityType, + }, + message.attempt, + ); +} + /** * Whether the given thrown value is an `AbortError`. * diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index a79ad3a1f..b265dea6b 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -110,6 +110,7 @@ import { isAbortError, type QueueTaskCommonAttributes, type QueueTaskResult, + recordOutboxEnqueue, } from "./metrics.ts"; import type { MessageQueue } from "./mq.ts"; import { acceptsJsonLd } from "./negotiation.ts"; @@ -1384,16 +1385,6 @@ export class FederationImpl messages.push({ message, orderingKey: messageOrderingKey }); } const { outboxQueue } = this; - const recordOutboxEnqueued = (message: OutboxMessage): void => { - getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( - { - role: "outbox", - queue: outboxQueue, - activityType: message.activityType, - }, - message.attempt, - ); - }; // enqueueMany does not support per-message orderingKey, so fall back to // individual enqueues whenever orderingKey is specified or the backend // does not implement enqueueMany. @@ -1401,7 +1392,7 @@ export class FederationImpl const promises: PromiseSettledResult[] = await Promise.allSettled( messages.map(async (m) => { await outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }); - recordOutboxEnqueued(m.message); + recordOutboxEnqueue(this.meterProvider, outboxQueue, m.message); }), ); const errors = promises @@ -1430,7 +1421,9 @@ export class FederationImpl ); throw error; } - for (const m of messages) recordOutboxEnqueued(m.message); + for (const m of messages) { + recordOutboxEnqueue(this.meterProvider, outboxQueue, m.message); + } } } @@ -3355,21 +3348,15 @@ async function forwardActivityInternal( }); } const { outboxQueue } = ctx.federation; - const recordOutboxEnqueued = (message: OutboxMessage): void => { - getFederationMetrics(ctx.federation.meterProvider).recordQueueTaskEnqueued( - { - role: "outbox", - queue: outboxQueue, - activityType: message.activityType, - }, - message.attempt, - ); - }; if (outboxQueue.enqueueMany == null || orderingKey != null) { const promises: PromiseSettledResult[] = await Promise.allSettled( messages.map(async (m) => { await outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }); - recordOutboxEnqueued(m.message); + recordOutboxEnqueue( + ctx.federation.meterProvider, + outboxQueue, + m.message, + ); }), ); const errors: unknown[] = promises @@ -3398,7 +3385,13 @@ async function forwardActivityInternal( ); throw error; } - for (const m of messages) recordOutboxEnqueued(m.message); + for (const m of messages) { + recordOutboxEnqueue( + ctx.federation.meterProvider, + outboxQueue, + m.message, + ); + } } return true; } From 9ff3f303fb962f8c3cd2b03b034ba92dfe6d6f2e Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 10 May 2026 17:23:46 +0900 Subject: [PATCH 8/9] Leave queue task span unset on AbortError processQueuedTask() distinguishes aborted queue tasks via fedify.queue.task.result=aborted so that backend shutdown signals do not inflate the failed counter, but each catch block still set SpanStatusCode.ERROR on the consumer span unconditionally. The result was that trace error-rate views reported every cancelled task as a failure even though the metric side already excluded them. Per the OpenTelemetry "Recording errors" guidance, span status MUST remain UNSET for operations that ended without an actual error, and should only flip to ERROR when there is one. Cancellation does not qualify on its own. Guard the span.setStatus() calls in all three worker branches (fanout, outbox, inbox) on isAbortError(e) being false. The metric outcome classification is unchanged. An assertion on the abort-path test confirms the inbox span's status code stays UNSET. https://github.com/fedify-dev/fedify/pull/759#discussion_r3214471516 Assisted-by: Claude Code:claude-opus-4-7 Assisted-by: CodeRabbit --- .../fedify/src/federation/middleware.test.ts | 9 +++++ packages/fedify/src/federation/middleware.ts | 39 ++++++++++++------- 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 265f4b176..e5339a43c 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -44,6 +44,7 @@ import { rsaPublicKey3, } from "../testing/keys.ts"; import { FetchError, getDocumentLoader } from "@fedify/vocab-runtime"; +import { SpanStatusCode } from "@opentelemetry/api"; import { getAuthenticatedDocumentLoader } from "../utils/docloader.ts"; const documentLoader = getDocumentLoader(); @@ -4292,6 +4293,7 @@ test("FederationImpl.processQueuedTask() queue task metrics", async (t) => { async () => { const kv = new MemoryKvStore(); const [meterProvider, recorder] = createTestMeterProvider(); + const [tracerProvider, exporter] = createTestTracerProvider(); const queue: MessageQueue = { nativeRetrial: true, enqueue(_message, _options) { @@ -4304,6 +4306,7 @@ test("FederationImpl.processQueuedTask() queue task metrics", async (t) => { const federation = new FederationImpl({ kv, meterProvider, + tracerProvider, queue, }); federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") @@ -4351,6 +4354,12 @@ test("FederationImpl.processQueuedTask() queue task metrics", async (t) => { taskDurations[0].attributes["fedify.queue.task.result"], "aborted", ); + + // Per OpenTelemetry guidance, the inbox span should remain UNSET for + // cancellation and not flip into ERROR status. + const inboxSpans = exporter.getSpans("activitypub.inbox"); + assertEquals(inboxSpans.length, 1); + assertEquals(inboxSpans[0].status.code, SpanStatusCode.UNSET); }, ); diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index b265dea6b..b75237bb8 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -527,11 +527,14 @@ export class FederationImpl try { await this.#listenFanoutMessage(contextData, message); } catch (e) { - outcome = isAbortError(e) ? "aborted" : "failed"; - span.setStatus({ - code: SpanStatusCode.ERROR, - message: String(e), - }); + const aborted = isAbortError(e); + outcome = aborted ? "aborted" : "failed"; + if (!aborted) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: String(e), + }); + } throw e; } finally { meter.recordQueueTaskOutcome( @@ -580,11 +583,14 @@ export class FederationImpl try { await this.#listenOutboxMessage(contextData, message, span); } catch (e) { - outcome = isAbortError(e) ? "aborted" : "failed"; - span.setStatus({ - code: SpanStatusCode.ERROR, - message: String(e), - }); + const aborted = isAbortError(e); + outcome = aborted ? "aborted" : "failed"; + if (!aborted) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: String(e), + }); + } throw e; } finally { meter.recordQueueTaskOutcome( @@ -632,11 +638,14 @@ export class FederationImpl }, ); } catch (e) { - outcome = isAbortError(e) ? "aborted" : "failed"; - span.setStatus({ - code: SpanStatusCode.ERROR, - message: String(e), - }); + const aborted = isAbortError(e); + outcome = aborted ? "aborted" : "failed"; + if (!aborted) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: String(e), + }); + } throw e; } finally { meter.recordQueueTaskOutcome( From 55dd49a00ede0b69dff1809b6c60bbefc3b3bdfe Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Mon, 11 May 2026 18:45:28 +0900 Subject: [PATCH 9/9] Remove em dashes from queue metrics docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace spaced em dashes ( — ) with parentheses in the newly added queue task metrics documentation, per the project prose style that avoids em dashes in project-facing text. Assisted-by: Claude Code:claude-sonnet-4-6 --- docs/manual/mq.md | 4 ++-- docs/manual/opentelemetry.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/manual/mq.md b/docs/manual/mq.md index e2948967d..db77e9cad 100644 --- a/docs/manual/mq.md +++ b/docs/manual/mq.md @@ -1024,9 +1024,9 @@ shared queue. Reporting the same `getDepth()` result separately for each logical role would double- or triple-count the backlog. Queue depth covers only the *backend* side of the queue. To see what -Fedify's workers are doing with the dequeued messages — enqueue rate, task +Fedify's workers are doing with the dequeued messages (enqueue rate, task processing duration, completion versus failure, and how many tasks are in -flight per process — read the matching [`fedify.queue.task.*` OpenTelemetry +flight per process), read the matching [`fedify.queue.task.*` OpenTelemetry metrics](./opentelemetry.md#instrumented-metrics). Backlog depth and task throughput together let you tell a slowly draining queue apart from one that simply sees less traffic. diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index 7cbe70e08..3eb5acc64 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -374,7 +374,7 @@ Fedify records the following OpenTelemetry metrics: returns normally (`result=completed`) without scheduling a retry. Outbox-side activity failures remain observable through the `activitypub.delivery.*` metrics and the `activitypub.delivery.failed` - span event, and any retry attempt — inbox or outbox — appears as a + span event, and any retry attempt (inbox or outbox) appears as a `fedify.queue.task.enqueued` measurement with a non-zero `fedify.queue.task.attempt`. Inbox listener errors that the retry policy abandons are visible through error logs and the inbox span's error status, @@ -395,7 +395,7 @@ The `fedify.queue.task.*` metrics describe what Fedify's workers do with queued messages. They complement the backend-side [`MessageQueue.getDepth()` API](./mq.md#queue-depth-reporting), which reports how many messages are currently waiting in the queue backend. -Reading both signals together — task throughput plus backlog depth — +Reading both signals together (task throughput plus backlog depth) makes it possible to distinguish a small, slow queue from a large, fast one and to set alerting thresholds for delivery latency under load.