From 146d1c74fea4a795b6c50bda21e6ff62918110dd Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Mon, 18 May 2026 08:13:41 +0900 Subject: [PATCH 01/10] Scaffold ActivityPub lifecycle metric instruments Add the instrument, type, and helper plumbing for three new metrics introduced to close the gap between per-delivery counters (added in https://github.com/fedify-dev/fedify/issues/619) and per-queue-task counters (added in https://github.com/fedify-dev/fedify/pull/759). Operators want a view of activity-level pressure that does not depend on the queue mechanism: how many recipients a fanout produced and how many activities flowed through the inbox or outbox lifecycle. This commit only registers the instruments; the call sites that actually record measurements arrive in subsequent commits. Three new instruments: - activitypub.fanout.recipients (Histogram, {recipient}) records the recipient count for a single fanout. - activitypub.inbox.activity (Counter, {activity}) classifies an inbound activity as queued, processed, retried, rejected, or abandoned via the new activitypub.processing.result attribute. - activitypub.outbox.activity (Counter, {activity}) classifies an outbound activity as queued, retried, or abandoned via the same attribute. Per-recipient sent/failed views are left on activitypub.delivery.* and not duplicated here. Two new union types (InboxActivityResult, OutboxActivityResult) bound the attribute values so cardinality stays safe. Helpers (recordFanoutRecipients, recordInboxActivity, recordOutboxActivity) mirror the existing recordOutboxEnqueue style and route through the cached getFederationMetrics(meterProvider). https://github.com/fedify-dev/fedify/issues/742 Assisted-by: Claude Code:claude-opus-4-7 --- .../fedify/src/federation/metrics.test.ts | 113 ++++++++++++ packages/fedify/src/federation/metrics.ts | 163 ++++++++++++++++++ 2 files changed, 276 insertions(+) create mode 100644 packages/fedify/src/federation/metrics.test.ts diff --git a/packages/fedify/src/federation/metrics.test.ts b/packages/fedify/src/federation/metrics.test.ts new file mode 100644 index 000000000..eb7298308 --- /dev/null +++ b/packages/fedify/src/federation/metrics.test.ts @@ -0,0 +1,113 @@ +import { createTestMeterProvider, test } from "@fedify/fixture"; +import { assertEquals } from "@std/assert"; +import { + recordFanoutRecipients, + recordInboxActivity, + recordOutboxActivity, +} from "./metrics.ts"; + +test("recordFanoutRecipients() records the recipient count with activity type", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + recordFanoutRecipients( + meterProvider, + "https://www.w3.org/ns/activitystreams#Create", + 7, + ); + const measurements = recorder.getMeasurements( + "activitypub.fanout.recipients", + ); + assertEquals(measurements.length, 1); + assertEquals(measurements[0].type, "histogram"); + assertEquals(measurements[0].value, 7); + assertEquals( + measurements[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); +}); + +test("recordFanoutRecipients() omits activity type when unknown", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + recordFanoutRecipients(meterProvider, undefined, 0); + const measurements = recorder.getMeasurements( + "activitypub.fanout.recipients", + ); + assertEquals(measurements.length, 1); + assertEquals(measurements[0].value, 0); + assertEquals( + "activitypub.activity.type" in measurements[0].attributes, + false, + ); +}); + +test("recordInboxActivity() records counter with result and activity type", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + for ( + const result of [ + "queued", + "processed", + "retried", + "rejected", + "abandoned", + ] as const + ) { + recordInboxActivity( + meterProvider, + result, + "https://www.w3.org/ns/activitystreams#Follow", + ); + } + const measurements = recorder.getMeasurements("activitypub.inbox.activity"); + assertEquals(measurements.length, 5); + for (const m of measurements) { + assertEquals(m.type, "counter"); + assertEquals(m.value, 1); + assertEquals( + m.attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Follow", + ); + } + assertEquals( + measurements.map((m) => m.attributes["activitypub.processing.result"]), + ["queued", "processed", "retried", "rejected", "abandoned"], + ); +}); + +test("recordInboxActivity() omits activity type when unknown", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + recordInboxActivity(meterProvider, "rejected"); + const measurements = recorder.getMeasurements("activitypub.inbox.activity"); + assertEquals(measurements.length, 1); + assertEquals( + measurements[0].attributes["activitypub.processing.result"], + "rejected", + ); + assertEquals( + "activitypub.activity.type" in measurements[0].attributes, + false, + ); +}); + +test("recordOutboxActivity() records counter with result and activity type", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + for (const result of ["queued", "retried", "abandoned"] as const) { + recordOutboxActivity( + meterProvider, + result, + "https://www.w3.org/ns/activitystreams#Announce", + ); + } + const measurements = recorder.getMeasurements("activitypub.outbox.activity"); + assertEquals(measurements.length, 3); + for (const m of measurements) { + assertEquals(m.type, "counter"); + assertEquals(m.value, 1); + assertEquals( + m.attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Announce", + ); + } + assertEquals( + measurements.map((m) => m.attributes["activitypub.processing.result"]), + ["queued", "retried", "abandoned"], + ); +}); diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 64373c02e..11e9558a3 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -21,6 +21,52 @@ export type QueueTaskRole = "fanout" | "outbox" | "inbox"; */ export type QueueTaskResult = "completed" | "failed" | "aborted"; +/** + * The lifecycle event classification recorded on + * `activitypub.inbox.activity` as the `activitypub.processing.result` + * attribute. Tracks Fedify-managed events at the activity level, separate + * from the per-delivery and per-queue-task metrics. + * + * - `queued`: the activity was accepted at the inbox endpoint and + * enqueued for background processing. + * - `processed`: the registered listener finished without throwing. + * - `retried`: Fedify scheduled a retry after the listener threw. + * - `rejected`: Fedify refused to process the activity (missing actor, + * duplicate, unsupported type, or no-queue listener error). + * - `abandoned`: the inbox retry policy gave up after exhausted attempts. + * + * Native-retry message queue backends will not record `retried` or + * `abandoned` because Fedify defers retry handling to the backend. + * @since 2.3.0 + */ +export type InboxActivityResult = + | "queued" + | "processed" + | "retried" + | "rejected" + | "abandoned"; + +/** + * The lifecycle event classification recorded on + * `activitypub.outbox.activity` as the `activitypub.processing.result` + * attribute. Tracks Fedify-managed events at the outbox-task level, + * separate from the per-delivery counters defined in #619. + * + * - `queued`: an outbox or fanout task was enqueued for the activity + * (initial enqueue only; subsequent retry enqueues are recorded as + * `retried`). + * - `retried`: Fedify scheduled a retry after a delivery failure. + * - `abandoned`: the outbox retry policy gave up after exhausted + * attempts. + * + * The per-recipient `sent`/`failed` view lives on + * `activitypub.delivery.sent` and `activitypub.delivery.permanent_failure` + * and is not duplicated here. Native-retry backends will not record + * `retried` or `abandoned`. + * @since 2.3.0 + */ +export type OutboxActivityResult = "queued" | "retried" | "abandoned"; + /** * Common attributes shared by all queue task metrics. * @since 2.3.0 @@ -173,6 +219,9 @@ class FederationMetrics { readonly queueTaskFailed: Counter; readonly queueTaskDuration: Histogram; readonly queueTaskInFlight: UpDownCounter; + readonly fanoutRecipients: Histogram; + readonly inboxActivity: Counter; + readonly outboxActivity: Counter; constructor(meterProvider: MeterProvider) { const meter = meterProvider.getMeter(metadata.name, metadata.version); @@ -315,6 +364,28 @@ class FederationMetrics { unit: "{task}", }, ); + this.fanoutRecipients = meter.createHistogram( + "activitypub.fanout.recipients", + { + description: + "Number of recipient inboxes produced by an ActivityPub fanout " + + "task.", + unit: "{recipient}", + }, + ); + this.inboxActivity = meter.createCounter("activitypub.inbox.activity", { + description: + "ActivityPub activities observed at the inbox lifecycle level: " + + "queued, processed, retried, rejected, or abandoned.", + unit: "{activity}", + }); + this.outboxActivity = meter.createCounter("activitypub.outbox.activity", { + description: + "ActivityPub activities observed at the outbox lifecycle level: " + + "queued, retried, or abandoned. Per-recipient delivery counters " + + "live on `activitypub.delivery.*`.", + unit: "{activity}", + }); } recordDelivery( @@ -454,6 +525,47 @@ class FederationMetrics { } this.queueTaskDuration.record(durationMs, attributes); } + + recordFanoutRecipients(recipientCount: number, activityType?: string): void { + const attributes: Attributes = {}; + if (activityType != null) { + attributes["activitypub.activity.type"] = activityType; + } + this.fanoutRecipients.record(recipientCount, attributes); + } + + recordInboxActivity( + result: InboxActivityResult, + activityType?: string, + ): void { + this.inboxActivity.add( + 1, + buildActivityLifecycleAttributes(result, activityType), + ); + } + + recordOutboxActivity( + result: OutboxActivityResult, + activityType?: string, + ): void { + this.outboxActivity.add( + 1, + buildActivityLifecycleAttributes(result, activityType), + ); + } +} + +function buildActivityLifecycleAttributes( + result: InboxActivityResult | OutboxActivityResult, + activityType?: string, +): Attributes { + const attributes: Attributes = { + "activitypub.processing.result": result, + }; + if (activityType != null) { + attributes["activitypub.activity.type"] = activityType; + } + return attributes; } function buildQueueTaskAttributes( @@ -523,6 +635,57 @@ export function recordOutboxEnqueue( ); } +/** + * Records `activitypub.fanout.recipients` with the number of recipient + * inboxes a single fanout produced. The histogram is unitless count + * (one measurement per fanout enqueue). Recipient URLs are deliberately + * not recorded; only the activity type, when known. + * @since 2.3.0 + */ +export function recordFanoutRecipients( + meterProvider: MeterProvider | undefined, + activityType: string | undefined, + recipientCount: number, +): void { + getFederationMetrics(meterProvider).recordFanoutRecipients( + recipientCount, + activityType, + ); +} + +/** + * Records one `activitypub.inbox.activity` measurement. The + * `activitypub.processing.result` attribute is always present; + * `activitypub.activity.type` is recorded only when Fedify already knows + * the activity type. + * @since 2.3.0 + */ +export function recordInboxActivity( + meterProvider: MeterProvider | undefined, + result: InboxActivityResult, + activityType?: string, +): void { + getFederationMetrics(meterProvider).recordInboxActivity(result, activityType); +} + +/** + * Records one `activitypub.outbox.activity` measurement. The + * `activitypub.processing.result` attribute is always present; + * `activitypub.activity.type` is recorded only when Fedify already knows + * the activity type (it is always known for outbox lifecycle events). + * @since 2.3.0 + */ +export function recordOutboxActivity( + meterProvider: MeterProvider | undefined, + result: OutboxActivityResult, + activityType?: string, +): void { + getFederationMetrics(meterProvider).recordOutboxActivity( + result, + activityType, + ); +} + /** * Times an awaited public key fetch and records exactly one * `activitypub.signature.key_fetch.duration` measurement, classifying the From 84997c3ae5df994c09c19540e3d7121f2c5aaf65 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Mon, 18 May 2026 08:22:38 +0900 Subject: [PATCH 02/10] Record activitypub.fanout.recipients on fanout enqueue Record the new histogram metric at the point where Fedify enqueues a fanout message. The recipient count and activity type are already known there, and recording before the message is acted on means operators can see how much pressure each fanout produced even when the downstream outbox queue is slow or backed up. Recording is placed after the fanoutQueue.enqueue() await so that an enqueue failure does not inflate the histogram. Recipient URLs and actor IDs deliberately stay out of the metric; only the bounded activity type IRI accompanies the count. https://github.com/fedify-dev/fedify/issues/742 Assisted-by: Claude Code:claude-opus-4-7 --- .../fedify/src/federation/middleware.test.ts | 72 +++++++++++++++++++ packages/fedify/src/federation/middleware.ts | 6 ++ 2 files changed, 78 insertions(+) diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 43d101453..c6fa1396c 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -5367,6 +5367,78 @@ test("ContextImpl.sendActivity()", async (t) => { fetchMock.hardReset(); }); +test("ContextImpl.sendActivity() records fanout recipient metrics", async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue & { messages: Message[] } = { + messages: [], + enqueue(message) { + this.messages.push(message); + return Promise.resolve(); + }, + async listen() {}, + }; + const federation = new FederationImpl({ + kv, + contextLoaderFactory: () => mockDocumentLoader, + queue, + meterProvider, + }); + federation + .setActorDispatcher("/{identifier}", async (ctx, identifier) => { + if (identifier !== "john") return null; + const keys = await ctx.getActorKeyPairs(identifier); + return new vocab.Person({ + id: ctx.getActorUri(identifier), + preferredUsername: "john", + publicKey: keys[0].cryptographicKey, + assertionMethods: keys.map((k) => k.multikey), + }); + }) + .setKeyPairsDispatcher((_ctx, identifier) => { + if (identifier !== "john") return []; + return [ + { privateKey: rsaPrivateKey2, publicKey: rsaPublicKey2.publicKey! }, + { + privateKey: ed25519PrivateKey, + publicKey: ed25519PublicKey.publicKey!, + }, + ]; + }); + const ctx = new ContextImpl({ + data: undefined, + federation, + url: new URL("https://example.com/"), + documentLoader: mockDocumentLoader, + contextLoader: mockDocumentLoader, + }); + const activity = new vocab.Create({ + id: new URL("https://example.com/activity/1"), + actor: new URL("https://example.com/person"), + }); + const recipients = Array.from({ length: 7 }, (_, i) => ({ + id: new URL(`https://example${i + 1}.com/recipient`), + inboxId: new URL(`https://example${i + 1}.com/inbox`), + })); + await ctx.sendActivity({ username: "john" }, recipients, activity, { + fanout: "force", + }); + + assertEquals(queue.messages.length, 1); + assertEquals(queue.messages[0].type, "fanout"); + + const measurements = recorder.getMeasurements( + "activitypub.fanout.recipients", + ); + assertEquals(measurements.length, 1); + assertEquals(measurements[0].type, "histogram"); + assertEquals(measurements[0].value, recipients.length); + assertEquals( + measurements[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); +}); + test({ name: "ContextImpl.routeActivity()", permissions: { env: true, read: true }, diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index f52e00318..012b4cf25 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -110,6 +110,7 @@ import { isAbortError, type QueueTaskCommonAttributes, type QueueTaskResult, + recordFanoutRecipients, recordOutboxEnqueue, } from "./metrics.ts"; import type { MessageQueue } from "./mq.ts"; @@ -2725,6 +2726,11 @@ export class ContextImpl implements Context { }, 0, ); + recordFanoutRecipients( + this.federation.meterProvider, + message.activityType, + globalThis.Object.keys(message.inboxes).length, + ); return true; } From 9b05a4967b2a8dff11cbd9735701f2645ec596fb Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Mon, 18 May 2026 08:33:31 +0900 Subject: [PATCH 03/10] Record outbox activity lifecycle counters Wire the activitypub.outbox.activity counter at the three Fedify- managed lifecycle points for an outbound activity: - queued: recorded from recordOutboxEnqueue() when the message's attempt is 0, so both Context.sendActivity() and OutboxContext.forwardActivity() benefit without per-caller wiring. Retry re-enqueues (attempt > 0) intentionally skip this row; the retry-scheduling site records result=retried instead, with the failure context. - retried: recorded inside #listenOutboxMessage() after the retry message is enqueued. Native-retrial backends short- circuit earlier with a thrown error, so this counter remains a Fedify-managed signal only. - abandoned: recorded in the same handler when the retry policy returns null and Fedify gives up on the delivery. Per-recipient sent/failed are deliberately left on activitypub.delivery.sent and activitypub.delivery.permanent_failure so this counter stays activity-centric and does not duplicate the #619 delivery metrics. https://github.com/fedify-dev/fedify/issues/742 Assisted-by: Claude Code:claude-opus-4-7 Assisted-by: Codex:gpt-5.5 --- .../fedify/src/federation/metrics.test.ts | 42 +++++++ packages/fedify/src/federation/metrics.ts | 15 ++- .../fedify/src/federation/middleware.test.ts | 117 ++++++++++++++++++ packages/fedify/src/federation/middleware.ts | 11 ++ 4 files changed, 182 insertions(+), 3 deletions(-) diff --git a/packages/fedify/src/federation/metrics.test.ts b/packages/fedify/src/federation/metrics.test.ts index eb7298308..45ed54929 100644 --- a/packages/fedify/src/federation/metrics.test.ts +++ b/packages/fedify/src/federation/metrics.test.ts @@ -1,11 +1,22 @@ import { createTestMeterProvider, test } from "@fedify/fixture"; import { assertEquals } from "@std/assert"; +import type { MessageQueue } from "./mq.ts"; import { recordFanoutRecipients, recordInboxActivity, recordOutboxActivity, + recordOutboxEnqueue, } from "./metrics.ts"; +const noopQueue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, +}; + test("recordFanoutRecipients() records the recipient count with activity type", () => { const [meterProvider, recorder] = createTestMeterProvider(); recordFanoutRecipients( @@ -87,6 +98,37 @@ test("recordInboxActivity() omits activity type when unknown", () => { ); }); +test("recordOutboxEnqueue() also records activitypub.outbox.activity{queued} on initial enqueue", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + recordOutboxEnqueue(meterProvider, noopQueue, { + activityType: "https://www.w3.org/ns/activitystreams#Create", + attempt: 0, + }); + const queued = recorder.getMeasurements("activitypub.outbox.activity"); + assertEquals(queued.length, 1); + assertEquals(queued[0].type, "counter"); + assertEquals( + queued[0].attributes["activitypub.processing.result"], + "queued", + ); + assertEquals( + queued[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); +}); + +test("recordOutboxEnqueue() does not record outbox.activity{queued} on retry enqueues", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + recordOutboxEnqueue(meterProvider, noopQueue, { + activityType: "https://www.w3.org/ns/activitystreams#Create", + attempt: 1, + }); + assertEquals( + recorder.getMeasurements("activitypub.outbox.activity").length, + 0, + ); +}); + test("recordOutboxActivity() records counter with result and activity type", () => { const [meterProvider, recorder] = createTestMeterProvider(); for (const result of ["queued", "retried", "abandoned"] as const) { diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 11e9558a3..60421ac4d 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -612,12 +612,17 @@ export function getQueueBackend(queue?: MessageQueue): string | undefined { } /** - * Records `fedify.queue.task.enqueued` for an outgoing outbox enqueue. + * Records `fedify.queue.task.enqueued` for an outgoing outbox enqueue and, + * for the initial attempt, also records + * `activitypub.outbox.activity{queued}`. * * Both `Context.sendActivity()` and `OutboxContext.forwardActivity()` enqueue * outbox messages with the same metric attributes (role, queue, activity * type, attempt), so they share this helper rather than each defining a local - * closure. + * closure. Retry enqueues (attempt > 0) intentionally do not record a + * second `activitypub.outbox.activity{queued}`; retries are reported as + * `result=retried` from the retry-scheduling site, which has the failure + * context. * @since 2.3.0 */ export function recordOutboxEnqueue( @@ -625,7 +630,8 @@ export function recordOutboxEnqueue( outboxQueue: MessageQueue, message: { readonly activityType: string; readonly attempt: number }, ): void { - getFederationMetrics(meterProvider).recordQueueTaskEnqueued( + const metrics = getFederationMetrics(meterProvider); + metrics.recordQueueTaskEnqueued( { role: "outbox", queue: outboxQueue, @@ -633,6 +639,9 @@ export function recordOutboxEnqueue( }, message.attempt, ); + if (message.attempt === 0) { + metrics.recordOutboxActivity("queued", message.activityType); + } } /** diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index c6fa1396c..880c12a4f 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -3656,6 +3656,123 @@ test("FederationImpl.processQueuedTask()", async (t) => { assertEquals(queuedMessages, [{ ...inboxMessage, attempt: 1 }]); }); + await t.step( + "records activitypub.outbox.activity retry on transient failure", + async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + + await federation.processQueuedTask( + undefined, + { + type: "outbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + keys: [], + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + actor: "https://example.com/users/alice", + object: { type: "Note", content: "test" }, + }, + activityType: "https://www.w3.org/ns/activitystreams#Create", + inbox: "https://invalid-domain-that-does-not-exist.example/inbox", + sharedInbox: false, + started: new Date().toISOString(), + attempt: 0, + headers: {}, + traceContext: {}, + } satisfies OutboxMessage, + ); + + const outboxLifecycle = recorder.getMeasurements( + "activitypub.outbox.activity", + ); + assertEquals(outboxLifecycle.length, 1); + assertEquals(outboxLifecycle[0].type, "counter"); + assertEquals( + outboxLifecycle[0].attributes["activitypub.processing.result"], + "retried", + ); + assertEquals( + outboxLifecycle[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + }, + ); + + await t.step( + "records activitypub.outbox.activity abandoned when retry policy gives up", + async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + outboxRetryPolicy: () => null, + }); + + await federation.processQueuedTask( + undefined, + { + type: "outbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + keys: [], + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Follow", + actor: "https://example.com/users/alice", + object: "https://remote.example/users/bob", + }, + activityType: "https://www.w3.org/ns/activitystreams#Follow", + inbox: "https://invalid-domain-that-does-not-exist.example/inbox", + sharedInbox: false, + started: new Date().toISOString(), + attempt: 0, + headers: {}, + traceContext: {}, + } satisfies OutboxMessage, + ); + + const outboxLifecycle = recorder.getMeasurements( + "activitypub.outbox.activity", + ); + assertEquals(outboxLifecycle.length, 1); + assertEquals(outboxLifecycle[0].type, "counter"); + assertEquals( + outboxLifecycle[0].attributes["activitypub.processing.result"], + "abandoned", + ); + assertEquals( + outboxLifecycle[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Follow", + ); + }, + ); + await t.step("records queued inbox processing duration", async () => { const kv = new MemoryKvStore(); const [meterProvider, recorder] = createTestMeterProvider(); diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 012b4cf25..464b7dbc1 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -111,6 +111,7 @@ import { type QueueTaskCommonAttributes, type QueueTaskResult, recordFanoutRecipients, + recordOutboxActivity, recordOutboxEnqueue, } from "./metrics.ts"; import type { MessageQueue } from "./mq.ts"; @@ -900,6 +901,11 @@ export class FederationImpl }, retryMessage.attempt, ); + recordOutboxActivity( + this.meterProvider, + "retried", + retryMessage.activityType, + ); } } else { logger.error( @@ -907,6 +913,11 @@ export class FederationImpl "attempts; giving up:\n{error}", { ...logData, error }, ); + recordOutboxActivity( + this.meterProvider, + "abandoned", + message.activityType, + ); } return; } From eec221df99cd83ffed55b97b9036aa5a89db505b Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Mon, 18 May 2026 08:48:49 +0900 Subject: [PATCH 04/10] Record inbox activity lifecycle counters Wire the activitypub.inbox.activity counter at every Fedify-managed lifecycle point for an inbound activity, separating queue-mode and no-queue paths so the same five result values (queued, processed, retried, rejected, abandoned) classify both. In routeActivity() (no-queue routing and queue handoff): - rejected: idempotency cache hit at the HTTP routing layer, missing actor, no-queue listener error, and unsupported activity type with no registered listener. - queued: successful queue.enqueue() of the incoming message. - processed: successful no-queue listener completion, recorded immediately after the listener returns and before the idempotency cache write so a kv.set() failure does not lose the event. In #listenInboxMessage() (queue worker path): - rejected: the rare second idempotency cache hit at processing time (race with concurrent processing), and the no-listener case observed at queue-processing time. - processed: successful queued listener completion (in the same before-cache-write position). - retried: Fedify enqueued a retry message because the listener threw and the retry policy returned a delay. Native-retrial backends short-circuit earlier with a thrown error and are intentionally not counted, mirroring the outbox lifecycle. - abandoned: the inbox retry policy returned null. Tests cover the queue worker lifecycle (processed/retried/abandoned) via processQueuedTask() and the routing layer (queued/processed/ rejected for both unsupported-type and duplicate paths) via ContextImpl.routeActivity(). https://github.com/fedify-dev/fedify/issues/742 Assisted-by: Claude Code:claude-opus-4-7 Assisted-by: Codex:gpt-5.5 --- packages/fedify/src/federation/inbox.ts | 28 +- .../fedify/src/federation/middleware.test.ts | 331 ++++++++++++++++++ packages/fedify/src/federation/middleware.ts | 6 + 3 files changed, 364 insertions(+), 1 deletion(-) diff --git a/packages/fedify/src/federation/inbox.ts b/packages/fedify/src/federation/inbox.ts index 955e2b5b5..8c2bc5c39 100644 --- a/packages/fedify/src/federation/inbox.ts +++ b/packages/fedify/src/federation/inbox.ts @@ -22,7 +22,11 @@ import type { KvKey, KvStore } from "./kv.ts"; import type { MessageQueue } from "./mq.ts"; import type { InboxMessage } from "./queue.ts"; import type { ActivityListenerSet } from "./activity-listener.ts"; -import { getDurationMs, getFederationMetrics } from "./metrics.ts"; +import { + getDurationMs, + getFederationMetrics, + recordInboxActivity, +} from "./metrics.ts"; export interface RouteActivityParameters { context: Context; @@ -141,12 +145,18 @@ export async function routeActivity( code: SpanStatusCode.UNSET, message: `Activity ${activity.id?.href} has already been processed.`, }); + recordInboxActivity( + meterProvider, + "rejected", + getTypeId(activity).href, + ); return "alreadyProcessed"; } } if (activity.actorId == null) { logger.error("Missing actor.", { activity: json }); span.setStatus({ code: SpanStatusCode.ERROR, message: "Missing actor." }); + recordInboxActivity(meterProvider, "rejected", getTypeId(activity).href); return "missingActor"; } span.setAttribute("activitypub.actor.id", activity.actorId.href); @@ -182,6 +192,7 @@ export async function routeActivity( { role: "inbox", queue, activityType: getTypeId(activity).href }, 0, ); + recordInboxActivity(meterProvider, "queued", getTypeId(activity).href); logger.info( "Activity {activityId} is enqueued.", { activityId: activity.id?.href, activity: json, recipient }, @@ -204,6 +215,11 @@ export async function routeActivity( code: SpanStatusCode.UNSET, message: `Unsupported activity type: ${getTypeId(activity!).href}`, }); + recordInboxActivity( + meterProvider, + "rejected", + getTypeId(activity!).href, + ); span.end(); return "unsupportedActivity"; } @@ -252,9 +268,19 @@ export async function routeActivity( }, ); span.setStatus({ code: SpanStatusCode.ERROR, message: String(error) }); + recordInboxActivity( + meterProvider, + "rejected", + getTypeId(activity!).href, + ); span.end(); return "error"; } + recordInboxActivity( + meterProvider, + "processed", + getTypeId(activity!).href, + ); if (cacheKey != null) { await kv.set(cacheKey, true, { ttl: Temporal.Duration.from({ days: 1 }), diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 880c12a4f..7047caaea 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -3773,6 +3773,180 @@ test("FederationImpl.processQueuedTask()", async (t) => { }, ); + await t.step( + "records activitypub.inbox.activity processed on successful queued dispatch", + async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => {}); + + await federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://example.com/activities/queued-processed", + actor: "https://remote.example/users/alice", + object: { type: "Note", content: "Hello world" }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ); + + const inboxLifecycle = recorder.getMeasurements( + "activitypub.inbox.activity", + ); + assertEquals(inboxLifecycle.length, 1); + assertEquals(inboxLifecycle[0].type, "counter"); + assertEquals( + inboxLifecycle[0].attributes["activitypub.processing.result"], + "processed", + ); + assertEquals( + inboxLifecycle[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + }, + ); + + await t.step( + "records activitypub.inbox.activity retried on transient listener failure", + async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => { + throw new Error("Intended error for testing"); + }); + + await federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://example.com/activities/queued-retried", + actor: "https://remote.example/users/alice", + object: { type: "Note", content: "Hello world" }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ); + + const inboxLifecycle = recorder.getMeasurements( + "activitypub.inbox.activity", + ); + assertEquals(inboxLifecycle.length, 1); + assertEquals( + inboxLifecycle[0].attributes["activitypub.processing.result"], + "retried", + ); + assertEquals( + inboxLifecycle[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + }, + ); + + await t.step( + "records activitypub.inbox.activity abandoned when retry policy gives up", + async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + inboxRetryPolicy: () => null, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => { + throw new Error("Intended error for testing"); + }); + + await federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://example.com/activities/queued-abandoned", + actor: "https://remote.example/users/alice", + object: { type: "Note", content: "Hello world" }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ); + + const inboxLifecycle = recorder.getMeasurements( + "activitypub.inbox.activity", + ); + assertEquals(inboxLifecycle.length, 1); + assertEquals( + inboxLifecycle[0].attributes["activitypub.processing.result"], + "abandoned", + ); + assertEquals( + inboxLifecycle[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + }, + ); + await t.step("records queued inbox processing duration", async () => { const kv = new MemoryKvStore(); const [meterProvider, recorder] = createTestMeterProvider(); @@ -5769,6 +5943,163 @@ test({ }, }); +test({ + name: "ContextImpl.routeActivity() records inbox.activity lifecycle metrics", + permissions: { env: true, read: true }, + async fn() { + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(): Promise { + return Promise.resolve(); + }, + listen(): Promise { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv: new MemoryKvStore(), + meterProvider, + queue, + }); + federation.setInboxListeners("/u/{identifier}/i", "/i"); + + const ctx = new ContextImpl({ + url: new URL("https://example.com/"), + federation, + data: undefined, + documentLoader: mockDocumentLoader, + contextLoader: documentLoader, + }); + + const signedOffer = await signObject( + new vocab.Offer({ + id: new URL("https://example.com/offer-queued"), + actor: new URL("https://example.com/person2"), + }), + ed25519PrivateKey, + ed25519Multikey.id!, + ); + assert(await ctx.routeActivity(null, signedOffer)); + + const queued = recorder.getMeasurements("activitypub.inbox.activity"); + assertEquals(queued.length, 1); + assertEquals(queued[0].type, "counter"); + assertEquals( + queued[0].attributes["activitypub.processing.result"], + "queued", + ); + assertEquals( + queued[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Offer", + ); + }, +}); + +test({ + name: + "ContextImpl.routeActivity() records inbox.activity processed without queue", + permissions: { env: true, read: true }, + async fn() { + const [meterProvider, recorder] = createTestMeterProvider(); + const federation = new FederationImpl({ + kv: new MemoryKvStore(), + meterProvider, + }); + federation.setInboxListeners("/u/{identifier}/i", "/i") + .on(vocab.Offer, () => {}); + + const ctx = new ContextImpl({ + url: new URL("https://example.com/"), + federation, + data: undefined, + documentLoader: mockDocumentLoader, + contextLoader: documentLoader, + }); + + const signedOffer = await signObject( + new vocab.Offer({ + id: new URL("https://example.com/offer-processed"), + actor: new URL("https://example.com/person2"), + }), + ed25519PrivateKey, + ed25519Multikey.id!, + ); + assert(await ctx.routeActivity(null, signedOffer)); + + const processed = recorder.getMeasurements("activitypub.inbox.activity"); + assertEquals(processed.length, 1); + assertEquals( + processed[0].attributes["activitypub.processing.result"], + "processed", + ); + assertEquals( + processed[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Offer", + ); + }, +}); + +test({ + name: + "ContextImpl.routeActivity() records inbox.activity rejected for unsupported type and duplicates", + permissions: { env: true, read: true }, + async fn() { + const [meterProvider, recorder] = createTestMeterProvider(); + const federation = new FederationImpl({ + kv: new MemoryKvStore(), + meterProvider, + }); + federation.setInboxListeners("/u/{identifier}/i", "/i") + .on(vocab.Offer, () => {}); + + const ctx = new ContextImpl({ + url: new URL("https://example.com/"), + federation, + data: undefined, + documentLoader: mockDocumentLoader, + contextLoader: documentLoader, + }); + + // Unsupported activity type (Create has no listener). + const signedCreate = await signObject( + new vocab.Create({ + id: new URL("https://example.com/create-unsupported"), + actor: new URL("https://example.com/person2"), + }), + ed25519PrivateKey, + ed25519Multikey.id!, + ); + assert(await ctx.routeActivity(null, signedCreate)); + + // Duplicate Offer activity (re-route same id → idempotency cache hit). + const dupOffer = await signObject( + new vocab.Offer({ + id: new URL("https://example.com/offer-duplicate"), + actor: new URL("https://example.com/person2"), + }), + ed25519PrivateKey, + ed25519Multikey.id!, + ); + assert(await ctx.routeActivity(null, dupOffer)); + assert(await ctx.routeActivity(null, dupOffer)); + + const measurements = recorder.getMeasurements("activitypub.inbox.activity"); + const rejected = measurements.filter((m) => + m.attributes["activitypub.processing.result"] === "rejected" + ); + // One for the unsupported Create, one for the duplicate Offer. + assertEquals(rejected.length, 2); + assertEquals( + rejected[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + assertEquals( + rejected[1].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Offer", + ); + }, +}); + test("ContextImpl.getCollectionUri()", () => { const federation = new FederationImpl({ kv: new MemoryKvStore() }); const base = "https://example.com"; diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 464b7dbc1..79d1f9f68 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -111,6 +111,7 @@ import { type QueueTaskCommonAttributes, type QueueTaskResult, recordFanoutRecipients, + recordInboxActivity, recordOutboxActivity, recordOutboxEnqueue, } from "./metrics.ts"; @@ -972,6 +973,7 @@ export class FederationImpl activity: message.activity, recipient: message.identifier, }); + recordInboxActivity(this.meterProvider, "rejected", activityType); return; } } @@ -994,6 +996,7 @@ export class FederationImpl code: SpanStatusCode.ERROR, message: `Unsupported activity type: ${activityType}`, }); + recordInboxActivity(this.meterProvider, "rejected", activityType); span.end(); return; } @@ -1018,6 +1021,7 @@ export class FederationImpl getDurationMs(started), ); } + recordInboxActivity(this.meterProvider, "processed", activityType); } catch (error) { try { await this.inboxErrorHandler?.(context, error as Error); @@ -1092,6 +1096,7 @@ export class FederationImpl }, retryMessage.attempt, ); + recordInboxActivity(this.meterProvider, "retried", activityType); } } else { logger.error( @@ -1104,6 +1109,7 @@ export class FederationImpl recipient: message.identifier, }, ); + recordInboxActivity(this.meterProvider, "abandoned", activityType); } span.setStatus({ code: SpanStatusCode.ERROR, From ba884feb35accd0a4474a89ea7c03d9244b25bdb Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Mon, 18 May 2026 09:08:29 +0900 Subject: [PATCH 05/10] Document activitypub.fanout and activity lifecycle metrics Extend the OpenTelemetry manual page with the three new instruments introduced by https://github.com/fedify-dev/fedify/issues/742 and a narrative paragraph that ties them to the existing per-recipient and per-task metric families. Documentation changes: - Add three rows to the "Instrumented metrics" table for activitypub.fanout.recipients, activitypub.inbox.activity, and activitypub.outbox.activity. - Add per-instrument attribute description blocks listing the processing.result vocabulary (queued, processed, retried, rejected, abandoned for inbox; queued, retried, abandoned for outbox) and where each value is recorded. - Note the native-retrial caveat: queue backends that declare nativeRetrial defer retry handling, so retried and abandoned are not recorded for those backends. - Note the fanout strategy semantics: with the default fanout: "auto", activities below the 5-recipient threshold are delivered directly and do not appear in activitypub.fanout.recipients; fanout: "force" always enqueues a fanout task, and fanout: "skip" bypasses fanout. - Add a paragraph explaining that the activity-level counters complement the per-recipient activitypub.delivery.* counters and the per-task fedify.queue.task.* metrics rather than replacing them. - Add activitypub.processing.result to the ActivityPub semantic attributes reference table. Changelog: - Add an entry under the unreleased 2.3.0 section listing the three new instruments and the native-retrial caveat. https://github.com/fedify-dev/fedify/issues/742 Assisted-by: Claude Code:claude-opus-4-7 Assisted-by: Codex:gpt-5.5 --- CHANGES.md | 23 ++++++++ docs/manual/opentelemetry.md | 106 +++++++++++++++++++++++++++++------ 2 files changed, 112 insertions(+), 17 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 664494ca8..289c2e92d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -91,12 +91,35 @@ To be released. operators distinguish a slow-draining queue from a queue that sees less traffic. [[#316], [#740], [#759]] + - Added OpenTelemetry metrics for ActivityPub fanout and activity + lifecycle events, complementing the per-recipient + `activitypub.delivery.*` counters and the per-task + `fedify.queue.task.*` metrics with an activity-level view of inbox + and outbox pressure: + + - `activitypub.fanout.recipients` (histogram) records the number of + recipient inboxes produced by a single fanout enqueue. + - `activitypub.inbox.activity` (counter) classifies an inbound + activity via the new `activitypub.processing.result` attribute + as `queued`, `processed`, `retried`, `rejected`, or `abandoned`. + - `activitypub.outbox.activity` (counter) classifies an outbound + activity as `queued`, `retried`, or `abandoned`. Per-recipient + `sent`/`failed` rows remain on `activitypub.delivery.sent` and + `activitypub.delivery.permanent_failure` and are not duplicated. + + The lifecycle counters cover only Fedify-managed events: queue + backends with `nativeRetrial` defer retry handling and therefore do + not record `retried` or `abandoned`. Recipient URLs, actor IDs, + and other high-cardinality identifiers are deliberately excluded + from the fanout histogram. [[#316], [#742]] + [#316]: https://github.com/fedify-dev/fedify/issues/316 [#619]: https://github.com/fedify-dev/fedify/issues/619 [#735]: https://github.com/fedify-dev/fedify/issues/735 [#736]: https://github.com/fedify-dev/fedify/issues/736 [#737]: https://github.com/fedify-dev/fedify/issues/737 [#740]: https://github.com/fedify-dev/fedify/issues/740 +[#742]: https://github.com/fedify-dev/fedify/issues/742 [#748]: https://github.com/fedify-dev/fedify/pull/748 [#752]: https://github.com/fedify-dev/fedify/issues/752 [#753]: https://github.com/fedify-dev/fedify/pull/753 diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index 404293aa1..717fd589b 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -296,23 +296,26 @@ Instrumented metrics Fedify records the following OpenTelemetry metrics: -| Metric name | Instrument | Unit | Description | -| --------------------------------------------- | ------------- | ----------- | ----------------------------------------------------------------------------------------------- | -| `activitypub.delivery.sent` | Counter | `{attempt}` | Counts outgoing ActivityPub delivery attempts. | -| `activitypub.delivery.permanent_failure` | Counter | `{failure}` | Counts outgoing deliveries abandoned as permanent failures. | -| `activitypub.delivery.duration` | Histogram | `ms` | Measures outgoing ActivityPub delivery attempt duration. | -| `activitypub.inbox.processing_duration` | Histogram | `ms` | Measures inbox listener processing duration. | -| `activitypub.signature.verification_failure` | Counter | `{failure}` | Counts failed signature verification for inbox requests. | -| `activitypub.signature.verification.duration` | Histogram | `ms` | Measures signature verification duration across HTTP, Linked Data, and Object Integrity Proofs. | -| `activitypub.signature.key_fetch.duration` | Histogram | `ms` | Measures public key lookup duration during signature verification. | -| `fedify.http.server.request.count` | Counter | `{request}` | Counts inbound HTTP requests handled by `Federation.fetch()`. | -| `fedify.http.server.request.duration` | Histogram | `ms` | Measures inbound HTTP request duration in `Federation.fetch()`. | -| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, and fanout tasks Fedify enqueued. | -| `fedify.queue.task.started` | Counter | `{task}` | Counts queue tasks Fedify began processing as a worker. | -| `fedify.queue.task.completed` | Counter | `{task}` | Counts queue tasks Fedify finished processing without throwing. | -| `fedify.queue.task.failed` | Counter | `{task}` | Counts queue tasks Fedify abandoned because processing threw. | -| `fedify.queue.task.duration` | Histogram | `ms` | Measures queue task processing duration in Fedify workers. | -| `fedify.queue.task.in_flight` | UpDownCounter | `{task}` | Tracks queue tasks currently in flight in this Fedify process. | +| Metric name | Instrument | Unit | Description | +| --------------------------------------------- | ------------- | ------------- | ----------------------------------------------------------------------------------------------- | +| `activitypub.delivery.sent` | Counter | `{attempt}` | Counts outgoing ActivityPub delivery attempts. | +| `activitypub.delivery.permanent_failure` | Counter | `{failure}` | Counts outgoing deliveries abandoned as permanent failures. | +| `activitypub.delivery.duration` | Histogram | `ms` | Measures outgoing ActivityPub delivery attempt duration. | +| `activitypub.inbox.activity` | Counter | `{activity}` | Classifies inbound activities by lifecycle outcome. | +| `activitypub.inbox.processing_duration` | Histogram | `ms` | Measures inbox listener processing duration. | +| `activitypub.outbox.activity` | Counter | `{activity}` | Classifies outbound activities by lifecycle outcome. | +| `activitypub.fanout.recipients` | Histogram | `{recipient}` | Records the recipient inbox count produced by a single fanout enqueue. | +| `activitypub.signature.verification_failure` | Counter | `{failure}` | Counts failed signature verification for inbox requests. | +| `activitypub.signature.verification.duration` | Histogram | `ms` | Measures signature verification duration across HTTP, Linked Data, and Object Integrity Proofs. | +| `activitypub.signature.key_fetch.duration` | Histogram | `ms` | Measures public key lookup duration during signature verification. | +| `fedify.http.server.request.count` | Counter | `{request}` | Counts inbound HTTP requests handled by `Federation.fetch()`. | +| `fedify.http.server.request.duration` | Histogram | `ms` | Measures inbound HTTP request duration in `Federation.fetch()`. | +| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, and fanout tasks Fedify enqueued. | +| `fedify.queue.task.started` | Counter | `{task}` | Counts queue tasks Fedify began processing as a worker. | +| `fedify.queue.task.completed` | Counter | `{task}` | Counts queue tasks Fedify finished processing without throwing. | +| `fedify.queue.task.failed` | Counter | `{task}` | Counts queue tasks Fedify abandoned because processing threw. | +| `fedify.queue.task.duration` | Histogram | `ms` | Measures queue task processing duration in Fedify workers. | +| `fedify.queue.task.in_flight` | UpDownCounter | `{task}` | Tracks queue tasks currently in flight in this Fedify process. | ### Metric attributes @@ -327,6 +330,62 @@ Fedify records the following OpenTelemetry metrics: : `activitypub.remote.host`, `activitypub.delivery.success`, and `activitypub.activity.type` when Fedify knows the activity type. +`activitypub.inbox.activity` +: `activitypub.processing.result` is always present, and is one of: + + - `queued`: the activity was accepted at the inbox endpoint and + enqueued for background processing. + - `processed`: the registered listener returned without throwing. + Recorded once per successful dispatch, immediately after the + listener completes and before the idempotency cache write so + a `kv.set()` failure does not lose the event. + - `retried`: Fedify enqueued a retry message after the listener + threw and the configured `inboxRetryPolicy` returned a delay. + - `rejected`: Fedify refused the activity at the routing layer + (idempotency cache hit, missing actor) or at processing time + (no listener for the activity type, no-queue listener error). + - `abandoned`: the inbox retry policy returned `null` and Fedify + gave up on the activity. + + `activitypub.activity.type` is recorded whenever Fedify knows the + activity type, which is at every site listed above. Queue backends + that declare `nativeRetrial` are not represented in `retried` or + `abandoned` because Fedify defers retry handling to the backend + instead of re-enqueuing itself. + +`activitypub.outbox.activity` +: `activitypub.processing.result` is always present, and is one of: + + - `queued`: an outbox task was enqueued for an initial delivery + attempt (`attempt = 0`). Each recipient inbox enqueues its own + task, so fanned-out activities increment this counter once per + recipient. Retry re-enqueues are reported as `retried`, not + `queued`. + - `retried`: Fedify enqueued a retry message after a delivery + failed and the configured `outboxRetryPolicy` returned a delay. + - `abandoned`: the outbox retry policy returned `null` and Fedify + gave up on the recipient. + + `activitypub.activity.type` is always present. Per-recipient + `sent`/`failed` views live on + [`activitypub.delivery.sent`](#instrumented-metrics) (with the + `activitypub.delivery.success` attribute) and + [`activitypub.delivery.permanent_failure`](#instrumented-metrics); + they are not duplicated on this counter. Native-retrial backends + do not record `retried` or `abandoned`. + +`activitypub.fanout.recipients` +: `activitypub.activity.type` is recorded whenever known. The + histogram value is the number of recipient inboxes the fanout task + expanded into (after shared-inbox grouping); one measurement per + fanout enqueue. Recipient URLs, actor IDs, and shared-inbox flags + are deliberately omitted to keep cardinality bounded. With the + default `fanout: "auto"` strategy, activities below the fanout + threshold (`< 5` recipients) are delivered directly without a + fanout task and do not appear in this histogram; passing + `fanout: "force"` always enqueues a fanout task, and + `fanout: "skip"` bypasses fanout regardless of recipient count. + `activitypub.inbox.processing_duration` : `activitypub.activity.type`. @@ -476,6 +535,18 @@ Reading both signals together (task throughput plus backlog depth) makes it possible to distinguish a small, slow queue from a large, fast one and to set alerting thresholds for delivery latency under load. +The `activitypub.inbox.activity`, `activitypub.outbox.activity`, and +`activitypub.fanout.recipients` metrics describe what is happening at +the *activity* level, complementing the per-recipient +`activitypub.delivery.*` counters and the per-task `fedify.queue.task.*` +metrics. Use them when you need to understand whether the pressure on +a slow queue comes from fanout size, retry volume, or activity-type +mix. Concrete per-task counts (initial enqueue vs. retry re-enqueue, +or processed-task throughput) remain available on `fedify.queue.task.enqueued` +(via the `fedify.queue.task.attempt` attribute) and +`fedify.queue.task.completed`; the activity-level counters are +intentionally not a queue-mechanism replacement. + Fedify records `activitypub.remote.host` as the URL hostname only; ports, paths, and query strings are deliberately excluded to keep metric cardinality bounded. Activity types use the same qualified URI form as Fedify's trace attributes, @@ -519,6 +590,7 @@ for ActivityPub: | `activitypub.activity.retries` | int | The ordinal number of activity resending attempt (if and only if it's retried). | `3` | | `activitypub.delivery.attempt` | int | The zero-based delivery attempt number for a queued outgoing activity. | `0` | | `activitypub.delivery.permanent_failure` | boolean | Whether an outgoing delivery failure will be abandoned instead of retried. | `true` | +| `activitypub.processing.result` | string | Lifecycle outcome of an inbox or outbox activity: `queued`, `processed`, `retried`, `rejected`, or `abandoned`. | `"retried"` | | `activitypub.actor.id` | string | The URI of the actor object. | `"https://example.com/actor/1"` | | `activitypub.actor.key.cached` | boolean | Whether the actor's public keys are cached. | `true` | | `activitypub.actor.type` | string[] | The qualified URI(s) of the actor type(s). | `["https://www.w3.org/ns/activitystreams#Person"]` | From a77511af9d63372acc4571386c587c81ab010af5 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Mon, 18 May 2026 11:42:25 +0900 Subject: [PATCH 06/10] Clarify OutboxActivityResult queued JSDoc Codex pointed out the previous wording said an outbox or fanout task was enqueued, but Fedify only records activitypub.outbox.activity{queued} when an initial outbox message is enqueued (including the per-recipient outbox messages that the fanout worker produces, not the fanout enqueue itself). Update the exported type's JSDoc so it matches the manual page and the actual recording site. https://github.com/fedify-dev/fedify/issues/742 Assisted-by: Codex:gpt-5.5 Assisted-by: Claude Code:claude-opus-4-7 --- packages/fedify/src/federation/metrics.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 60421ac4d..eae39af3d 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -52,9 +52,11 @@ export type InboxActivityResult = * attribute. Tracks Fedify-managed events at the outbox-task level, * separate from the per-delivery counters defined in #619. * - * - `queued`: an outbox or fanout task was enqueued for the activity - * (initial enqueue only; subsequent retry enqueues are recorded as - * `retried`). + * - `queued`: an initial outbox task was enqueued for delivery. + * Recorded once per recipient inbox: fanned-out activities are + * counted as each per-recipient outbox task is enqueued by the + * fanout worker, not at the fanout-task enqueue itself. Retry + * re-enqueues are recorded as `retried`, not `queued`. * - `retried`: Fedify scheduled a retry after a delivery failure. * - `abandoned`: the outbox retry policy gave up after exhausted * attempts. From e1434e833c298c9e93300b45cb2ed2837cee675b Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Mon, 18 May 2026 11:50:47 +0900 Subject: [PATCH 07/10] Add a PR link to the changelog --- CHANGES.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 289c2e92d..a0fb2cad7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -111,7 +111,7 @@ To be released. backends with `nativeRetrial` defer retry handling and therefore do not record `retried` or `abandoned`. Recipient URLs, actor IDs, and other high-cardinality identifiers are deliberately excluded - from the fanout histogram. [[#316], [#742]] + from the fanout histogram. [[#316], [#742], [#770]] [#316]: https://github.com/fedify-dev/fedify/issues/316 [#619]: https://github.com/fedify-dev/fedify/issues/619 @@ -127,6 +127,7 @@ To be released. [#757]: https://github.com/fedify-dev/fedify/pull/757 [#759]: https://github.com/fedify-dev/fedify/pull/759 [#769]: https://github.com/fedify-dev/fedify/pull/769 +[#770]: https://github.com/fedify-dev/fedify/pull/770 ### @fedify/fixture From 53342a27db1e550e2b1f8f220b61cbfa243aab86 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Mon, 18 May 2026 12:30:29 +0900 Subject: [PATCH 08/10] Match recordFanoutRecipients to the instance method The wrapper used (meterProvider, activityType, recipientCount) while the underlying FederationMetrics.recordFanoutRecipients method takes (recipientCount, activityType?), and the sibling wrappers recordInboxActivity and recordOutboxActivity already mirror their instance methods. Reorder the wrapper to take recipientCount before activityType, switch to the optional `activityType?: string` form, and update the lone call site in middleware.ts plus the unit tests. https://github.com/fedify-dev/fedify/pull/770#discussion_r3256100370 Assisted-by: Claude Code:claude-opus-4-7 --- packages/fedify/src/federation/metrics.test.ts | 4 ++-- packages/fedify/src/federation/metrics.ts | 2 +- packages/fedify/src/federation/middleware.ts | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/fedify/src/federation/metrics.test.ts b/packages/fedify/src/federation/metrics.test.ts index 45ed54929..61df566ca 100644 --- a/packages/fedify/src/federation/metrics.test.ts +++ b/packages/fedify/src/federation/metrics.test.ts @@ -21,8 +21,8 @@ test("recordFanoutRecipients() records the recipient count with activity type", const [meterProvider, recorder] = createTestMeterProvider(); recordFanoutRecipients( meterProvider, - "https://www.w3.org/ns/activitystreams#Create", 7, + "https://www.w3.org/ns/activitystreams#Create", ); const measurements = recorder.getMeasurements( "activitypub.fanout.recipients", @@ -38,7 +38,7 @@ test("recordFanoutRecipients() records the recipient count with activity type", test("recordFanoutRecipients() omits activity type when unknown", () => { const [meterProvider, recorder] = createTestMeterProvider(); - recordFanoutRecipients(meterProvider, undefined, 0); + recordFanoutRecipients(meterProvider, 0); const measurements = recorder.getMeasurements( "activitypub.fanout.recipients", ); diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index eae39af3d..82d62fc49 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -655,8 +655,8 @@ export function recordOutboxEnqueue( */ export function recordFanoutRecipients( meterProvider: MeterProvider | undefined, - activityType: string | undefined, recipientCount: number, + activityType?: string, ): void { getFederationMetrics(meterProvider).recordFanoutRecipients( recipientCount, diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 79d1f9f68..5bff9ca99 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -2745,8 +2745,8 @@ export class ContextImpl implements Context { ); recordFanoutRecipients( this.federation.meterProvider, - message.activityType, globalThis.Object.keys(message.inboxes).length, + message.activityType, ); return true; } From bd4d2d94ce6d9e4273ce912d27269c7db57ebb64 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Mon, 18 May 2026 12:44:49 +0900 Subject: [PATCH 09/10] Record outbox abandoned on permanent failures The activitypub.outbox.activity counter previously emitted abandoned only when the outbox retry policy returned null after exhausted attempts. The permanent-failure branch of #listenOutboxMessage returned early after recording activitypub.delivery.permanent_failure and invoking outboxPermanentFailureHandler, so 404/410-style failures never received a terminal activity-level row. That left queued unreconcilable against retried + abandoned for these failures. Emit abandoned at that early return as well. The per-recipient permanent-failure detail (remote host, status code) stays on the existing activitypub.delivery.permanent_failure counter; the new abandoned row only marks the activity-level lifecycle as concluded. The OutboxActivityResult JSDoc and the activitypub.outbox.activity block in the OpenTelemetry manual page describe both abandon paths. A regression assertion is added to the existing 410 Gone test step. https://github.com/fedify-dev/fedify/pull/770#discussion_r3256100375 Assisted-by: Claude Code:claude-opus-4-7 --- docs/manual/opentelemetry.md | 9 +++++++-- packages/fedify/src/federation/metrics.ts | 8 ++++++-- packages/fedify/src/federation/middleware.test.ts | 13 +++++++++++++ packages/fedify/src/federation/middleware.ts | 5 +++++ 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index 717fd589b..fe38b3d78 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -363,8 +363,13 @@ Fedify records the following OpenTelemetry metrics: `queued`. - `retried`: Fedify enqueued a retry message after a delivery failed and the configured `outboxRetryPolicy` returned a delay. - - `abandoned`: the outbox retry policy returned `null` and Fedify - gave up on the recipient. + - `abandoned`: Fedify gave up on the recipient. Recorded both + when the outbox retry policy returned `null` after exhausted + attempts and when the remote responded with a permanent-failure + status code listed in `permanentFailureStatusCodes` (`404` and + `410` by default). The per-recipient permanent-failure detail + (remote host, status code) stays on + [`activitypub.delivery.permanent_failure`](#instrumented-metrics). `activitypub.activity.type` is always present. Per-recipient `sent`/`failed` views live on diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 82d62fc49..49a0ceddb 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -58,8 +58,12 @@ export type InboxActivityResult = * fanout worker, not at the fanout-task enqueue itself. Retry * re-enqueues are recorded as `retried`, not `queued`. * - `retried`: Fedify scheduled a retry after a delivery failure. - * - `abandoned`: the outbox retry policy gave up after exhausted - * attempts. + * - `abandoned`: Fedify gave up on the recipient. Recorded both + * when the outbox retry policy returns `null` after exhausted + * attempts and when the remote responded with a permanent-failure + * status code (`permanentFailureStatusCodes`, by default `404` or + * `410`); the per-recipient permanent-failure detail still lives + * on `activitypub.delivery.permanent_failure`. * * The per-recipient `sent`/`failed` view lives on * `activitypub.delivery.sent` and `activitypub.delivery.permanent_failure` diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 7047caaea..f2663e94e 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -4197,6 +4197,19 @@ test("FederationImpl.processQueuedTask() permanent failure", async (t) => { 410, ); + const abandoned = recorder.getMeasurements( + "activitypub.outbox.activity", + ); + assertEquals(abandoned.length, 1); + assertEquals( + abandoned[0].attributes["activitypub.processing.result"], + "abandoned", + ); + assertEquals( + abandoned[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + const events = exporter.getEvents( "activitypub.outbox", "activitypub.delivery.failed", diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 5bff9ca99..ec5d2629d 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -856,6 +856,11 @@ export class FederationImpl ); } } + recordOutboxActivity( + this.meterProvider, + "abandoned", + message.activityType, + ); return; } From f93da7bf84a377b3240a26656ab6973ca6306c25 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Mon, 18 May 2026 15:15:46 +0900 Subject: [PATCH 10/10] Drop redundant activity! assertions in routeActivity The activity parameter of routeActivity() is declared `activity: Activity` in RouteActivityParameters, not `Activity | null` or `Activity | undefined`, so the non-null assertions sprinkled through the function body have no work to do. The inner tracer.startActiveSpan callback also does not strip the parameter's narrowing, so the assertions inside it were never load-bearing either. Drop every `activity!` in this function, and reduce the matching `activity?.id?.href` to `activity.id?.href` for the same reason. https://github.com/fedify-dev/fedify/pull/770#discussion_r3256407233 Assisted-by: Claude Code:claude-opus-4-7 --- packages/fedify/src/federation/inbox.ts | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/fedify/src/federation/inbox.ts b/packages/fedify/src/federation/inbox.ts index 8c2bc5c39..ac33bb331 100644 --- a/packages/fedify/src/federation/inbox.ts +++ b/packages/fedify/src/federation/inbox.ts @@ -205,7 +205,7 @@ export async function routeActivity( "activitypub.dispatch_inbox_listener", { kind: SpanKind.INTERNAL }, async (span) => { - const dispatched = inboxListeners?.dispatchWithClass(activity!); + const dispatched = inboxListeners?.dispatchWithClass(activity); if (dispatched == null) { logger.error( "Unsupported activity type:\n{activity}", @@ -213,12 +213,12 @@ export async function routeActivity( ); span.setStatus({ code: SpanStatusCode.UNSET, - message: `Unsupported activity type: ${getTypeId(activity!).href}`, + message: `Unsupported activity type: ${getTypeId(activity).href}`, }); recordInboxActivity( meterProvider, "rejected", - getTypeId(activity!).href, + getTypeId(activity).href, ); span.end(); return "unsupportedActivity"; @@ -226,17 +226,17 @@ export async function routeActivity( const { class: cls, listener } = dispatched; span.updateName(`activitypub.dispatch_inbox_listener ${cls.name}`); try { - const activityType = getTypeId(activity!).href; + const activityType = getTypeId(activity).href; const started = performance.now(); try { await listener( inboxContextFactory( recipient, json, - activity?.id?.href, + activity.id?.href, activityType, ), - activity!, + activity, ); } finally { getFederationMetrics(meterProvider).recordInboxProcessingDuration( @@ -252,7 +252,7 @@ export async function routeActivity( "An unexpected error occurred in inbox error handler:\n{error}", { error, - activityId: activity!.id?.href, + activityId: activity.id?.href, activity: json, recipient, }, @@ -262,7 +262,7 @@ export async function routeActivity( "Failed to process the incoming activity {activityId}:\n{error}", { error, - activityId: activity!.id?.href, + activityId: activity.id?.href, activity: json, recipient, }, @@ -271,7 +271,7 @@ export async function routeActivity( recordInboxActivity( meterProvider, "rejected", - getTypeId(activity!).href, + getTypeId(activity).href, ); span.end(); return "error"; @@ -279,7 +279,7 @@ export async function routeActivity( recordInboxActivity( meterProvider, "processed", - getTypeId(activity!).href, + getTypeId(activity).href, ); if (cacheKey != null) { await kv.set(cacheKey, true, { @@ -288,7 +288,7 @@ export async function routeActivity( } logger.info( "Activity {activityId} has been processed.", - { activityId: activity!.id?.href, activity: json, recipient }, + { activityId: activity.id?.href, activity: json, recipient }, ); span.end(); return "success";