Skip to content
24 changes: 24 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,43 @@ To be released.
operators distinguish a slow-draining queue from a queue that sees
less traffic. [[#316], [#740], [#759]]

- Added OpenTelemetry metrics for ActivityPub fanout and activity
lifecycle events, complementing the per-recipient
`activitypub.delivery.*` counters and the per-task
`fedify.queue.task.*` metrics with an activity-level view of inbox
and outbox pressure:

- `activitypub.fanout.recipients` (histogram) records the number of
recipient inboxes produced by a single fanout enqueue.
- `activitypub.inbox.activity` (counter) classifies an inbound
activity via the new `activitypub.processing.result` attribute
as `queued`, `processed`, `retried`, `rejected`, or `abandoned`.
- `activitypub.outbox.activity` (counter) classifies an outbound
activity as `queued`, `retried`, or `abandoned`. Per-recipient
`sent`/`failed` rows remain on `activitypub.delivery.sent` and
`activitypub.delivery.permanent_failure` and are not duplicated.

The lifecycle counters cover only Fedify-managed events: queue
backends with `nativeRetrial` defer retry handling and therefore do
not record `retried` or `abandoned`. Recipient URLs, actor IDs,
and other high-cardinality identifiers are deliberately excluded
from the fanout histogram. [[#316], [#742], [#770]]

[#316]: https://github.com/fedify-dev/fedify/issues/316
[#619]: https://github.com/fedify-dev/fedify/issues/619
[#735]: https://github.com/fedify-dev/fedify/issues/735
[#736]: https://github.com/fedify-dev/fedify/issues/736
[#737]: https://github.com/fedify-dev/fedify/issues/737
[#740]: https://github.com/fedify-dev/fedify/issues/740
[#742]: https://github.com/fedify-dev/fedify/issues/742
[#748]: https://github.com/fedify-dev/fedify/pull/748
[#752]: https://github.com/fedify-dev/fedify/issues/752
[#753]: https://github.com/fedify-dev/fedify/pull/753
[#755]: https://github.com/fedify-dev/fedify/pull/755
[#757]: https://github.com/fedify-dev/fedify/pull/757
[#759]: https://github.com/fedify-dev/fedify/pull/759
[#769]: https://github.com/fedify-dev/fedify/pull/769
[#770]: https://github.com/fedify-dev/fedify/pull/770

### @fedify/fixture

Expand Down
111 changes: 94 additions & 17 deletions docs/manual/opentelemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,23 +296,26 @@ Instrumented metrics

Fedify records the following OpenTelemetry metrics:

| Metric name | Instrument | Unit | Description |
| --------------------------------------------- | ------------- | ----------- | ----------------------------------------------------------------------------------------------- |
| `activitypub.delivery.sent` | Counter | `{attempt}` | Counts outgoing ActivityPub delivery attempts. |
| `activitypub.delivery.permanent_failure` | Counter | `{failure}` | Counts outgoing deliveries abandoned as permanent failures. |
| `activitypub.delivery.duration` | Histogram | `ms` | Measures outgoing ActivityPub delivery attempt duration. |
| `activitypub.inbox.processing_duration` | Histogram | `ms` | Measures inbox listener processing duration. |
| `activitypub.signature.verification_failure` | Counter | `{failure}` | Counts failed signature verification for inbox requests. |
| `activitypub.signature.verification.duration` | Histogram | `ms` | Measures signature verification duration across HTTP, Linked Data, and Object Integrity Proofs. |
| `activitypub.signature.key_fetch.duration` | Histogram | `ms` | Measures public key lookup duration during signature verification. |
| `fedify.http.server.request.count` | Counter | `{request}` | Counts inbound HTTP requests handled by `Federation.fetch()`. |
| `fedify.http.server.request.duration` | Histogram | `ms` | Measures inbound HTTP request duration in `Federation.fetch()`. |
| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, and fanout tasks Fedify enqueued. |
| `fedify.queue.task.started` | Counter | `{task}` | Counts queue tasks Fedify began processing as a worker. |
| `fedify.queue.task.completed` | Counter | `{task}` | Counts queue tasks Fedify finished processing without throwing. |
| `fedify.queue.task.failed` | Counter | `{task}` | Counts queue tasks Fedify abandoned because processing threw. |
| `fedify.queue.task.duration` | Histogram | `ms` | Measures queue task processing duration in Fedify workers. |
| `fedify.queue.task.in_flight` | UpDownCounter | `{task}` | Tracks queue tasks currently in flight in this Fedify process. |
| Metric name | Instrument | Unit | Description |
| --------------------------------------------- | ------------- | ------------- | ----------------------------------------------------------------------------------------------- |
| `activitypub.delivery.sent` | Counter | `{attempt}` | Counts outgoing ActivityPub delivery attempts. |
| `activitypub.delivery.permanent_failure` | Counter | `{failure}` | Counts outgoing deliveries abandoned as permanent failures. |
| `activitypub.delivery.duration` | Histogram | `ms` | Measures outgoing ActivityPub delivery attempt duration. |
| `activitypub.inbox.activity` | Counter | `{activity}` | Classifies inbound activities by lifecycle outcome. |
| `activitypub.inbox.processing_duration` | Histogram | `ms` | Measures inbox listener processing duration. |
| `activitypub.outbox.activity` | Counter | `{activity}` | Classifies outbound activities by lifecycle outcome. |
| `activitypub.fanout.recipients` | Histogram | `{recipient}` | Records the recipient inbox count produced by a single fanout enqueue. |
| `activitypub.signature.verification_failure` | Counter | `{failure}` | Counts failed signature verification for inbox requests. |
| `activitypub.signature.verification.duration` | Histogram | `ms` | Measures signature verification duration across HTTP, Linked Data, and Object Integrity Proofs. |
| `activitypub.signature.key_fetch.duration` | Histogram | `ms` | Measures public key lookup duration during signature verification. |
| `fedify.http.server.request.count` | Counter | `{request}` | Counts inbound HTTP requests handled by `Federation.fetch()`. |
| `fedify.http.server.request.duration` | Histogram | `ms` | Measures inbound HTTP request duration in `Federation.fetch()`. |
| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, and fanout tasks Fedify enqueued. |
| `fedify.queue.task.started` | Counter | `{task}` | Counts queue tasks Fedify began processing as a worker. |
| `fedify.queue.task.completed` | Counter | `{task}` | Counts queue tasks Fedify finished processing without throwing. |
| `fedify.queue.task.failed` | Counter | `{task}` | Counts queue tasks Fedify abandoned because processing threw. |
| `fedify.queue.task.duration` | Histogram | `ms` | Measures queue task processing duration in Fedify workers. |
| `fedify.queue.task.in_flight` | UpDownCounter | `{task}` | Tracks queue tasks currently in flight in this Fedify process. |

### Metric attributes

Expand All @@ -327,6 +330,67 @@ Fedify records the following OpenTelemetry metrics:
: `activitypub.remote.host`, `activitypub.delivery.success`, and
`activitypub.activity.type` when Fedify knows the activity type.

`activitypub.inbox.activity`
: `activitypub.processing.result` is always present, and is one of:

- `queued`: the activity was accepted at the inbox endpoint and
enqueued for background processing.
- `processed`: the registered listener returned without throwing.
Recorded once per successful dispatch, immediately after the
listener completes and before the idempotency cache write so
a `kv.set()` failure does not lose the event.
- `retried`: Fedify enqueued a retry message after the listener
threw and the configured `inboxRetryPolicy` returned a delay.
- `rejected`: Fedify refused the activity at the routing layer
(idempotency cache hit, missing actor) or at processing time
(no listener for the activity type, no-queue listener error).
- `abandoned`: the inbox retry policy returned `null` and Fedify
gave up on the activity.

`activitypub.activity.type` is recorded whenever Fedify knows the
activity type, which is at every site listed above. Queue backends
that declare `nativeRetrial` are not represented in `retried` or
`abandoned` because Fedify defers retry handling to the backend
instead of re-enqueuing itself.

`activitypub.outbox.activity`
: `activitypub.processing.result` is always present, and is one of:

- `queued`: an outbox task was enqueued for an initial delivery
attempt (`attempt = 0`). Each recipient inbox enqueues its own
task, so fanned-out activities increment this counter once per
recipient. Retry re-enqueues are reported as `retried`, not
`queued`.
- `retried`: Fedify enqueued a retry message after a delivery
failed and the configured `outboxRetryPolicy` returned a delay.
- `abandoned`: Fedify gave up on the recipient. Recorded both
when the outbox retry policy returned `null` after exhausted
attempts and when the remote responded with a permanent-failure
status code listed in `permanentFailureStatusCodes` (`404` and
`410` by default). The per-recipient permanent-failure detail
(remote host, status code) stays on
[`activitypub.delivery.permanent_failure`](#instrumented-metrics).

`activitypub.activity.type` is always present. Per-recipient
`sent`/`failed` views live on
[`activitypub.delivery.sent`](#instrumented-metrics) (with the
`activitypub.delivery.success` attribute) and
[`activitypub.delivery.permanent_failure`](#instrumented-metrics);
they are not duplicated on this counter. Native-retrial backends
do not record `retried` or `abandoned`.

`activitypub.fanout.recipients`
: `activitypub.activity.type` is recorded whenever known. The
histogram value is the number of recipient inboxes the fanout task
expanded into (after shared-inbox grouping); one measurement per
fanout enqueue. Recipient URLs, actor IDs, and shared-inbox flags
are deliberately omitted to keep cardinality bounded. With the
default `fanout: "auto"` strategy, activities below the fanout
threshold (`< 5` recipients) are delivered directly without a
fanout task and do not appear in this histogram; passing
`fanout: "force"` always enqueues a fanout task, and
`fanout: "skip"` bypasses fanout regardless of recipient count.

Comment thread
dahlia marked this conversation as resolved.
`activitypub.inbox.processing_duration`
: `activitypub.activity.type`.

Expand Down Expand Up @@ -476,6 +540,18 @@ Reading both signals together (task throughput plus backlog depth)
makes it possible to distinguish a small, slow queue from a large, fast
one and to set alerting thresholds for delivery latency under load.

The `activitypub.inbox.activity`, `activitypub.outbox.activity`, and
`activitypub.fanout.recipients` metrics describe what is happening at
the *activity* level, complementing the per-recipient
`activitypub.delivery.*` counters and the per-task `fedify.queue.task.*`
metrics. Use them when you need to understand whether the pressure on
a slow queue comes from fanout size, retry volume, or activity-type
mix. Concrete per-task counts (initial enqueue vs. retry re-enqueue,
or processed-task throughput) remain available on `fedify.queue.task.enqueued`
(via the `fedify.queue.task.attempt` attribute) and
`fedify.queue.task.completed`; the activity-level counters are
intentionally not a queue-mechanism replacement.

Fedify records `activitypub.remote.host` as the URL hostname only; ports, paths,
and query strings are deliberately excluded to keep metric cardinality bounded.
Activity types use the same qualified URI form as Fedify's trace attributes,
Expand Down Expand Up @@ -519,6 +595,7 @@ for ActivityPub:
| `activitypub.activity.retries` | int | The ordinal number of activity resending attempt (if and only if it's retried). | `3` |
| `activitypub.delivery.attempt` | int | The zero-based delivery attempt number for a queued outgoing activity. | `0` |
| `activitypub.delivery.permanent_failure` | boolean | Whether an outgoing delivery failure will be abandoned instead of retried. | `true` |
| `activitypub.processing.result` | string | Lifecycle outcome of an inbox or outbox activity: `queued`, `processed`, `retried`, `rejected`, or `abandoned`. | `"retried"` |
| `activitypub.actor.id` | string | The URI of the actor object. | `"https://example.com/actor/1"` |
| `activitypub.actor.key.cached` | boolean | Whether the actor's public keys are cached. | `true` |
| `activitypub.actor.type` | string[] | The qualified URI(s) of the actor type(s). | `["https://www.w3.org/ns/activitystreams#Person"]` |
Expand Down
44 changes: 35 additions & 9 deletions packages/fedify/src/federation/inbox.ts
Comment thread
dahlia marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import type { KvKey, KvStore } from "./kv.ts";
import type { MessageQueue } from "./mq.ts";
import type { InboxMessage } from "./queue.ts";
import type { ActivityListenerSet } from "./activity-listener.ts";
import { getDurationMs, getFederationMetrics } from "./metrics.ts";
import {
getDurationMs,
getFederationMetrics,
recordInboxActivity,
} from "./metrics.ts";

export interface RouteActivityParameters<TContextData> {
context: Context<TContextData>;
Expand Down Expand Up @@ -141,12 +145,18 @@ export async function routeActivity<TContextData>(
code: SpanStatusCode.UNSET,
message: `Activity ${activity.id?.href} has already been processed.`,
});
recordInboxActivity(
meterProvider,
"rejected",
getTypeId(activity).href,
);
return "alreadyProcessed";
}
}
if (activity.actorId == null) {
logger.error("Missing actor.", { activity: json });
span.setStatus({ code: SpanStatusCode.ERROR, message: "Missing actor." });
recordInboxActivity(meterProvider, "rejected", getTypeId(activity).href);
return "missingActor";
}
span.setAttribute("activitypub.actor.id", activity.actorId.href);
Expand Down Expand Up @@ -182,6 +192,7 @@ export async function routeActivity<TContextData>(
{ role: "inbox", queue, activityType: getTypeId(activity).href },
0,
);
recordInboxActivity(meterProvider, "queued", getTypeId(activity).href);
logger.info(
"Activity {activityId} is enqueued.",
{ activityId: activity.id?.href, activity: json, recipient },
Expand All @@ -194,33 +205,38 @@ export async function routeActivity<TContextData>(
"activitypub.dispatch_inbox_listener",
{ kind: SpanKind.INTERNAL },
async (span) => {
const dispatched = inboxListeners?.dispatchWithClass(activity!);
const dispatched = inboxListeners?.dispatchWithClass(activity);
if (dispatched == null) {
logger.error(
"Unsupported activity type:\n{activity}",
{ activity: json, recipient },
);
span.setStatus({
code: SpanStatusCode.UNSET,
message: `Unsupported activity type: ${getTypeId(activity!).href}`,
message: `Unsupported activity type: ${getTypeId(activity).href}`,
});
recordInboxActivity(
meterProvider,
"rejected",
getTypeId(activity).href,
);
span.end();
return "unsupportedActivity";
}
const { class: cls, listener } = dispatched;
span.updateName(`activitypub.dispatch_inbox_listener ${cls.name}`);
try {
const activityType = getTypeId(activity!).href;
const activityType = getTypeId(activity).href;
const started = performance.now();
try {
await listener(
inboxContextFactory(
recipient,
json,
activity?.id?.href,
activity.id?.href,
activityType,
),
activity!,
activity,
);
} finally {
getFederationMetrics(meterProvider).recordInboxProcessingDuration(
Expand All @@ -236,7 +252,7 @@ export async function routeActivity<TContextData>(
"An unexpected error occurred in inbox error handler:\n{error}",
{
error,
activityId: activity!.id?.href,
activityId: activity.id?.href,
activity: json,
recipient,
},
Expand All @@ -246,23 +262,33 @@ export async function routeActivity<TContextData>(
"Failed to process the incoming activity {activityId}:\n{error}",
{
error,
activityId: activity!.id?.href,
activityId: activity.id?.href,
activity: json,
recipient,
},
);
span.setStatus({ code: SpanStatusCode.ERROR, message: String(error) });
recordInboxActivity(
meterProvider,
"rejected",
getTypeId(activity).href,
);
span.end();
return "error";
}
recordInboxActivity(
meterProvider,
"processed",
getTypeId(activity).href,
);
if (cacheKey != null) {
await kv.set(cacheKey, true, {
ttl: Temporal.Duration.from({ days: 1 }),
});
Comment thread
dahlia marked this conversation as resolved.
}
logger.info(
"Activity {activityId} has been processed.",
{ activityId: activity!.id?.href, activity: json, recipient },
{ activityId: activity.id?.href, activity: json, recipient },
);
span.end();
return "success";
Expand Down
Loading
Loading