diff --git a/CHANGES.md b/CHANGES.md index 664494ca8..a0fb2cad7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -91,12 +91,35 @@ To be released. operators distinguish a slow-draining queue from a queue that sees less traffic. [[#316], [#740], [#759]] + - Added OpenTelemetry metrics for ActivityPub fanout and activity + lifecycle events, complementing the per-recipient + `activitypub.delivery.*` counters and the per-task + `fedify.queue.task.*` metrics with an activity-level view of inbox + and outbox pressure: + + - `activitypub.fanout.recipients` (histogram) records the number of + recipient inboxes produced by a single fanout enqueue. + - `activitypub.inbox.activity` (counter) classifies an inbound + activity via the new `activitypub.processing.result` attribute + as `queued`, `processed`, `retried`, `rejected`, or `abandoned`. + - `activitypub.outbox.activity` (counter) classifies an outbound + activity as `queued`, `retried`, or `abandoned`. Per-recipient + `sent`/`failed` rows remain on `activitypub.delivery.sent` and + `activitypub.delivery.permanent_failure` and are not duplicated. + + The lifecycle counters cover only Fedify-managed events: queue + backends with `nativeRetrial` defer retry handling and therefore do + not record `retried` or `abandoned`. Recipient URLs, actor IDs, + and other high-cardinality identifiers are deliberately excluded + from the fanout histogram. [[#316], [#742], [#770]] + [#316]: https://github.com/fedify-dev/fedify/issues/316 [#619]: https://github.com/fedify-dev/fedify/issues/619 [#735]: https://github.com/fedify-dev/fedify/issues/735 [#736]: https://github.com/fedify-dev/fedify/issues/736 [#737]: https://github.com/fedify-dev/fedify/issues/737 [#740]: https://github.com/fedify-dev/fedify/issues/740 +[#742]: https://github.com/fedify-dev/fedify/issues/742 [#748]: https://github.com/fedify-dev/fedify/pull/748 [#752]: https://github.com/fedify-dev/fedify/issues/752 [#753]: https://github.com/fedify-dev/fedify/pull/753 @@ -104,6 +127,7 @@ To be released. [#757]: https://github.com/fedify-dev/fedify/pull/757 [#759]: https://github.com/fedify-dev/fedify/pull/759 [#769]: https://github.com/fedify-dev/fedify/pull/769 +[#770]: https://github.com/fedify-dev/fedify/pull/770 ### @fedify/fixture diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index 404293aa1..fe38b3d78 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -296,23 +296,26 @@ Instrumented metrics Fedify records the following OpenTelemetry metrics: -| Metric name | Instrument | Unit | Description | -| --------------------------------------------- | ------------- | ----------- | ----------------------------------------------------------------------------------------------- | -| `activitypub.delivery.sent` | Counter | `{attempt}` | Counts outgoing ActivityPub delivery attempts. | -| `activitypub.delivery.permanent_failure` | Counter | `{failure}` | Counts outgoing deliveries abandoned as permanent failures. | -| `activitypub.delivery.duration` | Histogram | `ms` | Measures outgoing ActivityPub delivery attempt duration. | -| `activitypub.inbox.processing_duration` | Histogram | `ms` | Measures inbox listener processing duration. | -| `activitypub.signature.verification_failure` | Counter | `{failure}` | Counts failed signature verification for inbox requests. | -| `activitypub.signature.verification.duration` | Histogram | `ms` | Measures signature verification duration across HTTP, Linked Data, and Object Integrity Proofs. | -| `activitypub.signature.key_fetch.duration` | Histogram | `ms` | Measures public key lookup duration during signature verification. | -| `fedify.http.server.request.count` | Counter | `{request}` | Counts inbound HTTP requests handled by `Federation.fetch()`. | -| `fedify.http.server.request.duration` | Histogram | `ms` | Measures inbound HTTP request duration in `Federation.fetch()`. | -| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, and fanout tasks Fedify enqueued. | -| `fedify.queue.task.started` | Counter | `{task}` | Counts queue tasks Fedify began processing as a worker. | -| `fedify.queue.task.completed` | Counter | `{task}` | Counts queue tasks Fedify finished processing without throwing. | -| `fedify.queue.task.failed` | Counter | `{task}` | Counts queue tasks Fedify abandoned because processing threw. | -| `fedify.queue.task.duration` | Histogram | `ms` | Measures queue task processing duration in Fedify workers. | -| `fedify.queue.task.in_flight` | UpDownCounter | `{task}` | Tracks queue tasks currently in flight in this Fedify process. | +| Metric name | Instrument | Unit | Description | +| --------------------------------------------- | ------------- | ------------- | ----------------------------------------------------------------------------------------------- | +| `activitypub.delivery.sent` | Counter | `{attempt}` | Counts outgoing ActivityPub delivery attempts. | +| `activitypub.delivery.permanent_failure` | Counter | `{failure}` | Counts outgoing deliveries abandoned as permanent failures. | +| `activitypub.delivery.duration` | Histogram | `ms` | Measures outgoing ActivityPub delivery attempt duration. | +| `activitypub.inbox.activity` | Counter | `{activity}` | Classifies inbound activities by lifecycle outcome. | +| `activitypub.inbox.processing_duration` | Histogram | `ms` | Measures inbox listener processing duration. | +| `activitypub.outbox.activity` | Counter | `{activity}` | Classifies outbound activities by lifecycle outcome. | +| `activitypub.fanout.recipients` | Histogram | `{recipient}` | Records the recipient inbox count produced by a single fanout enqueue. | +| `activitypub.signature.verification_failure` | Counter | `{failure}` | Counts failed signature verification for inbox requests. | +| `activitypub.signature.verification.duration` | Histogram | `ms` | Measures signature verification duration across HTTP, Linked Data, and Object Integrity Proofs. | +| `activitypub.signature.key_fetch.duration` | Histogram | `ms` | Measures public key lookup duration during signature verification. | +| `fedify.http.server.request.count` | Counter | `{request}` | Counts inbound HTTP requests handled by `Federation.fetch()`. | +| `fedify.http.server.request.duration` | Histogram | `ms` | Measures inbound HTTP request duration in `Federation.fetch()`. | +| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, and fanout tasks Fedify enqueued. | +| `fedify.queue.task.started` | Counter | `{task}` | Counts queue tasks Fedify began processing as a worker. | +| `fedify.queue.task.completed` | Counter | `{task}` | Counts queue tasks Fedify finished processing without throwing. | +| `fedify.queue.task.failed` | Counter | `{task}` | Counts queue tasks Fedify abandoned because processing threw. | +| `fedify.queue.task.duration` | Histogram | `ms` | Measures queue task processing duration in Fedify workers. | +| `fedify.queue.task.in_flight` | UpDownCounter | `{task}` | Tracks queue tasks currently in flight in this Fedify process. | ### Metric attributes @@ -327,6 +330,67 @@ Fedify records the following OpenTelemetry metrics: : `activitypub.remote.host`, `activitypub.delivery.success`, and `activitypub.activity.type` when Fedify knows the activity type. +`activitypub.inbox.activity` +: `activitypub.processing.result` is always present, and is one of: + + - `queued`: the activity was accepted at the inbox endpoint and + enqueued for background processing. + - `processed`: the registered listener returned without throwing. + Recorded once per successful dispatch, immediately after the + listener completes and before the idempotency cache write so + a `kv.set()` failure does not lose the event. + - `retried`: Fedify enqueued a retry message after the listener + threw and the configured `inboxRetryPolicy` returned a delay. + - `rejected`: Fedify refused the activity at the routing layer + (idempotency cache hit, missing actor) or at processing time + (no listener for the activity type, no-queue listener error). + - `abandoned`: the inbox retry policy returned `null` and Fedify + gave up on the activity. + + `activitypub.activity.type` is recorded whenever Fedify knows the + activity type, which is at every site listed above. Queue backends + that declare `nativeRetrial` are not represented in `retried` or + `abandoned` because Fedify defers retry handling to the backend + instead of re-enqueuing itself. + +`activitypub.outbox.activity` +: `activitypub.processing.result` is always present, and is one of: + + - `queued`: an outbox task was enqueued for an initial delivery + attempt (`attempt = 0`). Each recipient inbox enqueues its own + task, so fanned-out activities increment this counter once per + recipient. Retry re-enqueues are reported as `retried`, not + `queued`. + - `retried`: Fedify enqueued a retry message after a delivery + failed and the configured `outboxRetryPolicy` returned a delay. + - `abandoned`: Fedify gave up on the recipient. Recorded both + when the outbox retry policy returned `null` after exhausted + attempts and when the remote responded with a permanent-failure + status code listed in `permanentFailureStatusCodes` (`404` and + `410` by default). The per-recipient permanent-failure detail + (remote host, status code) stays on + [`activitypub.delivery.permanent_failure`](#instrumented-metrics). + + `activitypub.activity.type` is always present. Per-recipient + `sent`/`failed` views live on + [`activitypub.delivery.sent`](#instrumented-metrics) (with the + `activitypub.delivery.success` attribute) and + [`activitypub.delivery.permanent_failure`](#instrumented-metrics); + they are not duplicated on this counter. Native-retrial backends + do not record `retried` or `abandoned`. + +`activitypub.fanout.recipients` +: `activitypub.activity.type` is recorded whenever known. The + histogram value is the number of recipient inboxes the fanout task + expanded into (after shared-inbox grouping); one measurement per + fanout enqueue. Recipient URLs, actor IDs, and shared-inbox flags + are deliberately omitted to keep cardinality bounded. With the + default `fanout: "auto"` strategy, activities below the fanout + threshold (`< 5` recipients) are delivered directly without a + fanout task and do not appear in this histogram; passing + `fanout: "force"` always enqueues a fanout task, and + `fanout: "skip"` bypasses fanout regardless of recipient count. + `activitypub.inbox.processing_duration` : `activitypub.activity.type`. @@ -476,6 +540,18 @@ Reading both signals together (task throughput plus backlog depth) makes it possible to distinguish a small, slow queue from a large, fast one and to set alerting thresholds for delivery latency under load. +The `activitypub.inbox.activity`, `activitypub.outbox.activity`, and +`activitypub.fanout.recipients` metrics describe what is happening at +the *activity* level, complementing the per-recipient +`activitypub.delivery.*` counters and the per-task `fedify.queue.task.*` +metrics. Use them when you need to understand whether the pressure on +a slow queue comes from fanout size, retry volume, or activity-type +mix. Concrete per-task counts (initial enqueue vs. retry re-enqueue, +or processed-task throughput) remain available on `fedify.queue.task.enqueued` +(via the `fedify.queue.task.attempt` attribute) and +`fedify.queue.task.completed`; the activity-level counters are +intentionally not a queue-mechanism replacement. + Fedify records `activitypub.remote.host` as the URL hostname only; ports, paths, and query strings are deliberately excluded to keep metric cardinality bounded. Activity types use the same qualified URI form as Fedify's trace attributes, @@ -519,6 +595,7 @@ for ActivityPub: | `activitypub.activity.retries` | int | The ordinal number of activity resending attempt (if and only if it's retried). | `3` | | `activitypub.delivery.attempt` | int | The zero-based delivery attempt number for a queued outgoing activity. | `0` | | `activitypub.delivery.permanent_failure` | boolean | Whether an outgoing delivery failure will be abandoned instead of retried. | `true` | +| `activitypub.processing.result` | string | Lifecycle outcome of an inbox or outbox activity: `queued`, `processed`, `retried`, `rejected`, or `abandoned`. | `"retried"` | | `activitypub.actor.id` | string | The URI of the actor object. | `"https://example.com/actor/1"` | | `activitypub.actor.key.cached` | boolean | Whether the actor's public keys are cached. | `true` | | `activitypub.actor.type` | string[] | The qualified URI(s) of the actor type(s). | `["https://www.w3.org/ns/activitystreams#Person"]` | diff --git a/packages/fedify/src/federation/inbox.ts b/packages/fedify/src/federation/inbox.ts index 955e2b5b5..ac33bb331 100644 --- a/packages/fedify/src/federation/inbox.ts +++ b/packages/fedify/src/federation/inbox.ts @@ -22,7 +22,11 @@ import type { KvKey, KvStore } from "./kv.ts"; import type { MessageQueue } from "./mq.ts"; import type { InboxMessage } from "./queue.ts"; import type { ActivityListenerSet } from "./activity-listener.ts"; -import { getDurationMs, getFederationMetrics } from "./metrics.ts"; +import { + getDurationMs, + getFederationMetrics, + recordInboxActivity, +} from "./metrics.ts"; export interface RouteActivityParameters { context: Context; @@ -141,12 +145,18 @@ export async function routeActivity( code: SpanStatusCode.UNSET, message: `Activity ${activity.id?.href} has already been processed.`, }); + recordInboxActivity( + meterProvider, + "rejected", + getTypeId(activity).href, + ); return "alreadyProcessed"; } } if (activity.actorId == null) { logger.error("Missing actor.", { activity: json }); span.setStatus({ code: SpanStatusCode.ERROR, message: "Missing actor." }); + recordInboxActivity(meterProvider, "rejected", getTypeId(activity).href); return "missingActor"; } span.setAttribute("activitypub.actor.id", activity.actorId.href); @@ -182,6 +192,7 @@ export async function routeActivity( { role: "inbox", queue, activityType: getTypeId(activity).href }, 0, ); + recordInboxActivity(meterProvider, "queued", getTypeId(activity).href); logger.info( "Activity {activityId} is enqueued.", { activityId: activity.id?.href, activity: json, recipient }, @@ -194,7 +205,7 @@ export async function routeActivity( "activitypub.dispatch_inbox_listener", { kind: SpanKind.INTERNAL }, async (span) => { - const dispatched = inboxListeners?.dispatchWithClass(activity!); + const dispatched = inboxListeners?.dispatchWithClass(activity); if (dispatched == null) { logger.error( "Unsupported activity type:\n{activity}", @@ -202,25 +213,30 @@ export async function routeActivity( ); span.setStatus({ code: SpanStatusCode.UNSET, - message: `Unsupported activity type: ${getTypeId(activity!).href}`, + message: `Unsupported activity type: ${getTypeId(activity).href}`, }); + recordInboxActivity( + meterProvider, + "rejected", + getTypeId(activity).href, + ); span.end(); return "unsupportedActivity"; } const { class: cls, listener } = dispatched; span.updateName(`activitypub.dispatch_inbox_listener ${cls.name}`); try { - const activityType = getTypeId(activity!).href; + const activityType = getTypeId(activity).href; const started = performance.now(); try { await listener( inboxContextFactory( recipient, json, - activity?.id?.href, + activity.id?.href, activityType, ), - activity!, + activity, ); } finally { getFederationMetrics(meterProvider).recordInboxProcessingDuration( @@ -236,7 +252,7 @@ export async function routeActivity( "An unexpected error occurred in inbox error handler:\n{error}", { error, - activityId: activity!.id?.href, + activityId: activity.id?.href, activity: json, recipient, }, @@ -246,15 +262,25 @@ export async function routeActivity( "Failed to process the incoming activity {activityId}:\n{error}", { error, - activityId: activity!.id?.href, + activityId: activity.id?.href, activity: json, recipient, }, ); span.setStatus({ code: SpanStatusCode.ERROR, message: String(error) }); + recordInboxActivity( + meterProvider, + "rejected", + getTypeId(activity).href, + ); span.end(); return "error"; } + recordInboxActivity( + meterProvider, + "processed", + getTypeId(activity).href, + ); if (cacheKey != null) { await kv.set(cacheKey, true, { ttl: Temporal.Duration.from({ days: 1 }), @@ -262,7 +288,7 @@ export async function routeActivity( } logger.info( "Activity {activityId} has been processed.", - { activityId: activity!.id?.href, activity: json, recipient }, + { activityId: activity.id?.href, activity: json, recipient }, ); span.end(); return "success"; diff --git a/packages/fedify/src/federation/metrics.test.ts b/packages/fedify/src/federation/metrics.test.ts new file mode 100644 index 000000000..61df566ca --- /dev/null +++ b/packages/fedify/src/federation/metrics.test.ts @@ -0,0 +1,155 @@ +import { createTestMeterProvider, test } from "@fedify/fixture"; +import { assertEquals } from "@std/assert"; +import type { MessageQueue } from "./mq.ts"; +import { + recordFanoutRecipients, + recordInboxActivity, + recordOutboxActivity, + recordOutboxEnqueue, +} from "./metrics.ts"; + +const noopQueue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, +}; + +test("recordFanoutRecipients() records the recipient count with activity type", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + recordFanoutRecipients( + meterProvider, + 7, + "https://www.w3.org/ns/activitystreams#Create", + ); + const measurements = recorder.getMeasurements( + "activitypub.fanout.recipients", + ); + assertEquals(measurements.length, 1); + assertEquals(measurements[0].type, "histogram"); + assertEquals(measurements[0].value, 7); + assertEquals( + measurements[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); +}); + +test("recordFanoutRecipients() omits activity type when unknown", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + recordFanoutRecipients(meterProvider, 0); + const measurements = recorder.getMeasurements( + "activitypub.fanout.recipients", + ); + assertEquals(measurements.length, 1); + assertEquals(measurements[0].value, 0); + assertEquals( + "activitypub.activity.type" in measurements[0].attributes, + false, + ); +}); + +test("recordInboxActivity() records counter with result and activity type", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + for ( + const result of [ + "queued", + "processed", + "retried", + "rejected", + "abandoned", + ] as const + ) { + recordInboxActivity( + meterProvider, + result, + "https://www.w3.org/ns/activitystreams#Follow", + ); + } + const measurements = recorder.getMeasurements("activitypub.inbox.activity"); + assertEquals(measurements.length, 5); + for (const m of measurements) { + assertEquals(m.type, "counter"); + assertEquals(m.value, 1); + assertEquals( + m.attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Follow", + ); + } + assertEquals( + measurements.map((m) => m.attributes["activitypub.processing.result"]), + ["queued", "processed", "retried", "rejected", "abandoned"], + ); +}); + +test("recordInboxActivity() omits activity type when unknown", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + recordInboxActivity(meterProvider, "rejected"); + const measurements = recorder.getMeasurements("activitypub.inbox.activity"); + assertEquals(measurements.length, 1); + assertEquals( + measurements[0].attributes["activitypub.processing.result"], + "rejected", + ); + assertEquals( + "activitypub.activity.type" in measurements[0].attributes, + false, + ); +}); + +test("recordOutboxEnqueue() also records activitypub.outbox.activity{queued} on initial enqueue", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + recordOutboxEnqueue(meterProvider, noopQueue, { + activityType: "https://www.w3.org/ns/activitystreams#Create", + attempt: 0, + }); + const queued = recorder.getMeasurements("activitypub.outbox.activity"); + assertEquals(queued.length, 1); + assertEquals(queued[0].type, "counter"); + assertEquals( + queued[0].attributes["activitypub.processing.result"], + "queued", + ); + assertEquals( + queued[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); +}); + +test("recordOutboxEnqueue() does not record outbox.activity{queued} on retry enqueues", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + recordOutboxEnqueue(meterProvider, noopQueue, { + activityType: "https://www.w3.org/ns/activitystreams#Create", + attempt: 1, + }); + assertEquals( + recorder.getMeasurements("activitypub.outbox.activity").length, + 0, + ); +}); + +test("recordOutboxActivity() records counter with result and activity type", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + for (const result of ["queued", "retried", "abandoned"] as const) { + recordOutboxActivity( + meterProvider, + result, + "https://www.w3.org/ns/activitystreams#Announce", + ); + } + const measurements = recorder.getMeasurements("activitypub.outbox.activity"); + assertEquals(measurements.length, 3); + for (const m of measurements) { + assertEquals(m.type, "counter"); + assertEquals(m.value, 1); + assertEquals( + m.attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Announce", + ); + } + assertEquals( + measurements.map((m) => m.attributes["activitypub.processing.result"]), + ["queued", "retried", "abandoned"], + ); +}); diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 64373c02e..49a0ceddb 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -21,6 +21,58 @@ export type QueueTaskRole = "fanout" | "outbox" | "inbox"; */ export type QueueTaskResult = "completed" | "failed" | "aborted"; +/** + * The lifecycle event classification recorded on + * `activitypub.inbox.activity` as the `activitypub.processing.result` + * attribute. Tracks Fedify-managed events at the activity level, separate + * from the per-delivery and per-queue-task metrics. + * + * - `queued`: the activity was accepted at the inbox endpoint and + * enqueued for background processing. + * - `processed`: the registered listener finished without throwing. + * - `retried`: Fedify scheduled a retry after the listener threw. + * - `rejected`: Fedify refused to process the activity (missing actor, + * duplicate, unsupported type, or no-queue listener error). + * - `abandoned`: the inbox retry policy gave up after exhausted attempts. + * + * Native-retry message queue backends will not record `retried` or + * `abandoned` because Fedify defers retry handling to the backend. + * @since 2.3.0 + */ +export type InboxActivityResult = + | "queued" + | "processed" + | "retried" + | "rejected" + | "abandoned"; + +/** + * The lifecycle event classification recorded on + * `activitypub.outbox.activity` as the `activitypub.processing.result` + * attribute. Tracks Fedify-managed events at the outbox-task level, + * separate from the per-delivery counters defined in #619. + * + * - `queued`: an initial outbox task was enqueued for delivery. + * Recorded once per recipient inbox: fanned-out activities are + * counted as each per-recipient outbox task is enqueued by the + * fanout worker, not at the fanout-task enqueue itself. Retry + * re-enqueues are recorded as `retried`, not `queued`. + * - `retried`: Fedify scheduled a retry after a delivery failure. + * - `abandoned`: Fedify gave up on the recipient. Recorded both + * when the outbox retry policy returns `null` after exhausted + * attempts and when the remote responded with a permanent-failure + * status code (`permanentFailureStatusCodes`, by default `404` or + * `410`); the per-recipient permanent-failure detail still lives + * on `activitypub.delivery.permanent_failure`. + * + * The per-recipient `sent`/`failed` view lives on + * `activitypub.delivery.sent` and `activitypub.delivery.permanent_failure` + * and is not duplicated here. Native-retry backends will not record + * `retried` or `abandoned`. + * @since 2.3.0 + */ +export type OutboxActivityResult = "queued" | "retried" | "abandoned"; + /** * Common attributes shared by all queue task metrics. * @since 2.3.0 @@ -173,6 +225,9 @@ class FederationMetrics { readonly queueTaskFailed: Counter; readonly queueTaskDuration: Histogram; readonly queueTaskInFlight: UpDownCounter; + readonly fanoutRecipients: Histogram; + readonly inboxActivity: Counter; + readonly outboxActivity: Counter; constructor(meterProvider: MeterProvider) { const meter = meterProvider.getMeter(metadata.name, metadata.version); @@ -315,6 +370,28 @@ class FederationMetrics { unit: "{task}", }, ); + this.fanoutRecipients = meter.createHistogram( + "activitypub.fanout.recipients", + { + description: + "Number of recipient inboxes produced by an ActivityPub fanout " + + "task.", + unit: "{recipient}", + }, + ); + this.inboxActivity = meter.createCounter("activitypub.inbox.activity", { + description: + "ActivityPub activities observed at the inbox lifecycle level: " + + "queued, processed, retried, rejected, or abandoned.", + unit: "{activity}", + }); + this.outboxActivity = meter.createCounter("activitypub.outbox.activity", { + description: + "ActivityPub activities observed at the outbox lifecycle level: " + + "queued, retried, or abandoned. Per-recipient delivery counters " + + "live on `activitypub.delivery.*`.", + unit: "{activity}", + }); } recordDelivery( @@ -454,6 +531,47 @@ class FederationMetrics { } this.queueTaskDuration.record(durationMs, attributes); } + + recordFanoutRecipients(recipientCount: number, activityType?: string): void { + const attributes: Attributes = {}; + if (activityType != null) { + attributes["activitypub.activity.type"] = activityType; + } + this.fanoutRecipients.record(recipientCount, attributes); + } + + recordInboxActivity( + result: InboxActivityResult, + activityType?: string, + ): void { + this.inboxActivity.add( + 1, + buildActivityLifecycleAttributes(result, activityType), + ); + } + + recordOutboxActivity( + result: OutboxActivityResult, + activityType?: string, + ): void { + this.outboxActivity.add( + 1, + buildActivityLifecycleAttributes(result, activityType), + ); + } +} + +function buildActivityLifecycleAttributes( + result: InboxActivityResult | OutboxActivityResult, + activityType?: string, +): Attributes { + const attributes: Attributes = { + "activitypub.processing.result": result, + }; + if (activityType != null) { + attributes["activitypub.activity.type"] = activityType; + } + return attributes; } function buildQueueTaskAttributes( @@ -500,12 +618,17 @@ export function getQueueBackend(queue?: MessageQueue): string | undefined { } /** - * Records `fedify.queue.task.enqueued` for an outgoing outbox enqueue. + * Records `fedify.queue.task.enqueued` for an outgoing outbox enqueue and, + * for the initial attempt, also records + * `activitypub.outbox.activity{queued}`. * * Both `Context.sendActivity()` and `OutboxContext.forwardActivity()` enqueue * outbox messages with the same metric attributes (role, queue, activity * type, attempt), so they share this helper rather than each defining a local - * closure. + * closure. Retry enqueues (attempt > 0) intentionally do not record a + * second `activitypub.outbox.activity{queued}`; retries are reported as + * `result=retried` from the retry-scheduling site, which has the failure + * context. * @since 2.3.0 */ export function recordOutboxEnqueue( @@ -513,7 +636,8 @@ export function recordOutboxEnqueue( outboxQueue: MessageQueue, message: { readonly activityType: string; readonly attempt: number }, ): void { - getFederationMetrics(meterProvider).recordQueueTaskEnqueued( + const metrics = getFederationMetrics(meterProvider); + metrics.recordQueueTaskEnqueued( { role: "outbox", queue: outboxQueue, @@ -521,6 +645,60 @@ export function recordOutboxEnqueue( }, message.attempt, ); + if (message.attempt === 0) { + metrics.recordOutboxActivity("queued", message.activityType); + } +} + +/** + * Records `activitypub.fanout.recipients` with the number of recipient + * inboxes a single fanout produced. The histogram is unitless count + * (one measurement per fanout enqueue). Recipient URLs are deliberately + * not recorded; only the activity type, when known. + * @since 2.3.0 + */ +export function recordFanoutRecipients( + meterProvider: MeterProvider | undefined, + recipientCount: number, + activityType?: string, +): void { + getFederationMetrics(meterProvider).recordFanoutRecipients( + recipientCount, + activityType, + ); +} + +/** + * Records one `activitypub.inbox.activity` measurement. The + * `activitypub.processing.result` attribute is always present; + * `activitypub.activity.type` is recorded only when Fedify already knows + * the activity type. + * @since 2.3.0 + */ +export function recordInboxActivity( + meterProvider: MeterProvider | undefined, + result: InboxActivityResult, + activityType?: string, +): void { + getFederationMetrics(meterProvider).recordInboxActivity(result, activityType); +} + +/** + * Records one `activitypub.outbox.activity` measurement. The + * `activitypub.processing.result` attribute is always present; + * `activitypub.activity.type` is recorded only when Fedify already knows + * the activity type (it is always known for outbox lifecycle events). + * @since 2.3.0 + */ +export function recordOutboxActivity( + meterProvider: MeterProvider | undefined, + result: OutboxActivityResult, + activityType?: string, +): void { + getFederationMetrics(meterProvider).recordOutboxActivity( + result, + activityType, + ); } /** diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 43d101453..f2663e94e 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -3656,6 +3656,297 @@ test("FederationImpl.processQueuedTask()", async (t) => { assertEquals(queuedMessages, [{ ...inboxMessage, attempt: 1 }]); }); + await t.step( + "records activitypub.outbox.activity retry on transient failure", + async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + + await federation.processQueuedTask( + undefined, + { + type: "outbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + keys: [], + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + actor: "https://example.com/users/alice", + object: { type: "Note", content: "test" }, + }, + activityType: "https://www.w3.org/ns/activitystreams#Create", + inbox: "https://invalid-domain-that-does-not-exist.example/inbox", + sharedInbox: false, + started: new Date().toISOString(), + attempt: 0, + headers: {}, + traceContext: {}, + } satisfies OutboxMessage, + ); + + const outboxLifecycle = recorder.getMeasurements( + "activitypub.outbox.activity", + ); + assertEquals(outboxLifecycle.length, 1); + assertEquals(outboxLifecycle[0].type, "counter"); + assertEquals( + outboxLifecycle[0].attributes["activitypub.processing.result"], + "retried", + ); + assertEquals( + outboxLifecycle[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + }, + ); + + await t.step( + "records activitypub.outbox.activity abandoned when retry policy gives up", + async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + outboxRetryPolicy: () => null, + }); + + await federation.processQueuedTask( + undefined, + { + type: "outbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + keys: [], + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Follow", + actor: "https://example.com/users/alice", + object: "https://remote.example/users/bob", + }, + activityType: "https://www.w3.org/ns/activitystreams#Follow", + inbox: "https://invalid-domain-that-does-not-exist.example/inbox", + sharedInbox: false, + started: new Date().toISOString(), + attempt: 0, + headers: {}, + traceContext: {}, + } satisfies OutboxMessage, + ); + + const outboxLifecycle = recorder.getMeasurements( + "activitypub.outbox.activity", + ); + assertEquals(outboxLifecycle.length, 1); + assertEquals(outboxLifecycle[0].type, "counter"); + assertEquals( + outboxLifecycle[0].attributes["activitypub.processing.result"], + "abandoned", + ); + assertEquals( + outboxLifecycle[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Follow", + ); + }, + ); + + await t.step( + "records activitypub.inbox.activity processed on successful queued dispatch", + async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => {}); + + await federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://example.com/activities/queued-processed", + actor: "https://remote.example/users/alice", + object: { type: "Note", content: "Hello world" }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ); + + const inboxLifecycle = recorder.getMeasurements( + "activitypub.inbox.activity", + ); + assertEquals(inboxLifecycle.length, 1); + assertEquals(inboxLifecycle[0].type, "counter"); + assertEquals( + inboxLifecycle[0].attributes["activitypub.processing.result"], + "processed", + ); + assertEquals( + inboxLifecycle[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + }, + ); + + await t.step( + "records activitypub.inbox.activity retried on transient listener failure", + async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => { + throw new Error("Intended error for testing"); + }); + + await federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://example.com/activities/queued-retried", + actor: "https://remote.example/users/alice", + object: { type: "Note", content: "Hello world" }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ); + + const inboxLifecycle = recorder.getMeasurements( + "activitypub.inbox.activity", + ); + assertEquals(inboxLifecycle.length, 1); + assertEquals( + inboxLifecycle[0].attributes["activitypub.processing.result"], + "retried", + ); + assertEquals( + inboxLifecycle[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + }, + ); + + await t.step( + "records activitypub.inbox.activity abandoned when retry policy gives up", + async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + inboxRetryPolicy: () => null, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => { + throw new Error("Intended error for testing"); + }); + + await federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://example.com/activities/queued-abandoned", + actor: "https://remote.example/users/alice", + object: { type: "Note", content: "Hello world" }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ); + + const inboxLifecycle = recorder.getMeasurements( + "activitypub.inbox.activity", + ); + assertEquals(inboxLifecycle.length, 1); + assertEquals( + inboxLifecycle[0].attributes["activitypub.processing.result"], + "abandoned", + ); + assertEquals( + inboxLifecycle[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + }, + ); + await t.step("records queued inbox processing duration", async () => { const kv = new MemoryKvStore(); const [meterProvider, recorder] = createTestMeterProvider(); @@ -3906,6 +4197,19 @@ test("FederationImpl.processQueuedTask() permanent failure", async (t) => { 410, ); + const abandoned = recorder.getMeasurements( + "activitypub.outbox.activity", + ); + assertEquals(abandoned.length, 1); + assertEquals( + abandoned[0].attributes["activitypub.processing.result"], + "abandoned", + ); + assertEquals( + abandoned[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + const events = exporter.getEvents( "activitypub.outbox", "activitypub.delivery.failed", @@ -5367,6 +5671,78 @@ test("ContextImpl.sendActivity()", async (t) => { fetchMock.hardReset(); }); +test("ContextImpl.sendActivity() records fanout recipient metrics", async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue & { messages: Message[] } = { + messages: [], + enqueue(message) { + this.messages.push(message); + return Promise.resolve(); + }, + async listen() {}, + }; + const federation = new FederationImpl({ + kv, + contextLoaderFactory: () => mockDocumentLoader, + queue, + meterProvider, + }); + federation + .setActorDispatcher("/{identifier}", async (ctx, identifier) => { + if (identifier !== "john") return null; + const keys = await ctx.getActorKeyPairs(identifier); + return new vocab.Person({ + id: ctx.getActorUri(identifier), + preferredUsername: "john", + publicKey: keys[0].cryptographicKey, + assertionMethods: keys.map((k) => k.multikey), + }); + }) + .setKeyPairsDispatcher((_ctx, identifier) => { + if (identifier !== "john") return []; + return [ + { privateKey: rsaPrivateKey2, publicKey: rsaPublicKey2.publicKey! }, + { + privateKey: ed25519PrivateKey, + publicKey: ed25519PublicKey.publicKey!, + }, + ]; + }); + const ctx = new ContextImpl({ + data: undefined, + federation, + url: new URL("https://example.com/"), + documentLoader: mockDocumentLoader, + contextLoader: mockDocumentLoader, + }); + const activity = new vocab.Create({ + id: new URL("https://example.com/activity/1"), + actor: new URL("https://example.com/person"), + }); + const recipients = Array.from({ length: 7 }, (_, i) => ({ + id: new URL(`https://example${i + 1}.com/recipient`), + inboxId: new URL(`https://example${i + 1}.com/inbox`), + })); + await ctx.sendActivity({ username: "john" }, recipients, activity, { + fanout: "force", + }); + + assertEquals(queue.messages.length, 1); + assertEquals(queue.messages[0].type, "fanout"); + + const measurements = recorder.getMeasurements( + "activitypub.fanout.recipients", + ); + assertEquals(measurements.length, 1); + assertEquals(measurements[0].type, "histogram"); + assertEquals(measurements[0].value, recipients.length); + assertEquals( + measurements[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); +}); + test({ name: "ContextImpl.routeActivity()", permissions: { env: true, read: true }, @@ -5580,6 +5956,163 @@ test({ }, }); +test({ + name: "ContextImpl.routeActivity() records inbox.activity lifecycle metrics", + permissions: { env: true, read: true }, + async fn() { + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(): Promise { + return Promise.resolve(); + }, + listen(): Promise { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv: new MemoryKvStore(), + meterProvider, + queue, + }); + federation.setInboxListeners("/u/{identifier}/i", "/i"); + + const ctx = new ContextImpl({ + url: new URL("https://example.com/"), + federation, + data: undefined, + documentLoader: mockDocumentLoader, + contextLoader: documentLoader, + }); + + const signedOffer = await signObject( + new vocab.Offer({ + id: new URL("https://example.com/offer-queued"), + actor: new URL("https://example.com/person2"), + }), + ed25519PrivateKey, + ed25519Multikey.id!, + ); + assert(await ctx.routeActivity(null, signedOffer)); + + const queued = recorder.getMeasurements("activitypub.inbox.activity"); + assertEquals(queued.length, 1); + assertEquals(queued[0].type, "counter"); + assertEquals( + queued[0].attributes["activitypub.processing.result"], + "queued", + ); + assertEquals( + queued[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Offer", + ); + }, +}); + +test({ + name: + "ContextImpl.routeActivity() records inbox.activity processed without queue", + permissions: { env: true, read: true }, + async fn() { + const [meterProvider, recorder] = createTestMeterProvider(); + const federation = new FederationImpl({ + kv: new MemoryKvStore(), + meterProvider, + }); + federation.setInboxListeners("/u/{identifier}/i", "/i") + .on(vocab.Offer, () => {}); + + const ctx = new ContextImpl({ + url: new URL("https://example.com/"), + federation, + data: undefined, + documentLoader: mockDocumentLoader, + contextLoader: documentLoader, + }); + + const signedOffer = await signObject( + new vocab.Offer({ + id: new URL("https://example.com/offer-processed"), + actor: new URL("https://example.com/person2"), + }), + ed25519PrivateKey, + ed25519Multikey.id!, + ); + assert(await ctx.routeActivity(null, signedOffer)); + + const processed = recorder.getMeasurements("activitypub.inbox.activity"); + assertEquals(processed.length, 1); + assertEquals( + processed[0].attributes["activitypub.processing.result"], + "processed", + ); + assertEquals( + processed[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Offer", + ); + }, +}); + +test({ + name: + "ContextImpl.routeActivity() records inbox.activity rejected for unsupported type and duplicates", + permissions: { env: true, read: true }, + async fn() { + const [meterProvider, recorder] = createTestMeterProvider(); + const federation = new FederationImpl({ + kv: new MemoryKvStore(), + meterProvider, + }); + federation.setInboxListeners("/u/{identifier}/i", "/i") + .on(vocab.Offer, () => {}); + + const ctx = new ContextImpl({ + url: new URL("https://example.com/"), + federation, + data: undefined, + documentLoader: mockDocumentLoader, + contextLoader: documentLoader, + }); + + // Unsupported activity type (Create has no listener). + const signedCreate = await signObject( + new vocab.Create({ + id: new URL("https://example.com/create-unsupported"), + actor: new URL("https://example.com/person2"), + }), + ed25519PrivateKey, + ed25519Multikey.id!, + ); + assert(await ctx.routeActivity(null, signedCreate)); + + // Duplicate Offer activity (re-route same id → idempotency cache hit). + const dupOffer = await signObject( + new vocab.Offer({ + id: new URL("https://example.com/offer-duplicate"), + actor: new URL("https://example.com/person2"), + }), + ed25519PrivateKey, + ed25519Multikey.id!, + ); + assert(await ctx.routeActivity(null, dupOffer)); + assert(await ctx.routeActivity(null, dupOffer)); + + const measurements = recorder.getMeasurements("activitypub.inbox.activity"); + const rejected = measurements.filter((m) => + m.attributes["activitypub.processing.result"] === "rejected" + ); + // One for the unsupported Create, one for the duplicate Offer. + assertEquals(rejected.length, 2); + assertEquals( + rejected[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + assertEquals( + rejected[1].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Offer", + ); + }, +}); + test("ContextImpl.getCollectionUri()", () => { const federation = new FederationImpl({ kv: new MemoryKvStore() }); const base = "https://example.com"; diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index f52e00318..ec5d2629d 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -110,6 +110,9 @@ import { isAbortError, type QueueTaskCommonAttributes, type QueueTaskResult, + recordFanoutRecipients, + recordInboxActivity, + recordOutboxActivity, recordOutboxEnqueue, } from "./metrics.ts"; import type { MessageQueue } from "./mq.ts"; @@ -853,6 +856,11 @@ export class FederationImpl ); } } + recordOutboxActivity( + this.meterProvider, + "abandoned", + message.activityType, + ); return; } @@ -899,6 +907,11 @@ export class FederationImpl }, retryMessage.attempt, ); + recordOutboxActivity( + this.meterProvider, + "retried", + retryMessage.activityType, + ); } } else { logger.error( @@ -906,6 +919,11 @@ export class FederationImpl "attempts; giving up:\n{error}", { ...logData, error }, ); + recordOutboxActivity( + this.meterProvider, + "abandoned", + message.activityType, + ); } return; } @@ -960,6 +978,7 @@ export class FederationImpl activity: message.activity, recipient: message.identifier, }); + recordInboxActivity(this.meterProvider, "rejected", activityType); return; } } @@ -982,6 +1001,7 @@ export class FederationImpl code: SpanStatusCode.ERROR, message: `Unsupported activity type: ${activityType}`, }); + recordInboxActivity(this.meterProvider, "rejected", activityType); span.end(); return; } @@ -1006,6 +1026,7 @@ export class FederationImpl getDurationMs(started), ); } + recordInboxActivity(this.meterProvider, "processed", activityType); } catch (error) { try { await this.inboxErrorHandler?.(context, error as Error); @@ -1080,6 +1101,7 @@ export class FederationImpl }, retryMessage.attempt, ); + recordInboxActivity(this.meterProvider, "retried", activityType); } } else { logger.error( @@ -1092,6 +1114,7 @@ export class FederationImpl recipient: message.identifier, }, ); + recordInboxActivity(this.meterProvider, "abandoned", activityType); } span.setStatus({ code: SpanStatusCode.ERROR, @@ -2725,6 +2748,11 @@ export class ContextImpl implements Context { }, 0, ); + recordFanoutRecipients( + this.federation.meterProvider, + globalThis.Object.keys(message.inboxes).length, + message.activityType, + ); return true; }