Skip to content

Commit 607ff44

Browse files
committed
Improve worker resilience with RabbitMQ reconnects, structured ingest logging, retry vs DLQ error handling, and safer backend delivery logic
1 parent 51c68bd commit 607ff44

File tree

9 files changed

+384
-89
lines changed

9 files changed

+384
-89
lines changed

.env.example

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,18 @@
1-
NODE_ENV=development
21
RABBITMQ_URL=amqp://app:app@localhost:5672
3-
REDIS_URL=redis://localhost:6379
4-
API_GRAPHQL_URL=http://localhost:3000/graphql
52

63
QUEUE_RAW_EVENT=source.raw.v1
4+
QUEUE_RETRY_EVENT=source.raw.retry.v1
5+
QUEUE_DEAD_LETTER_EVENT=source.raw.dlq.v1
76
QUEUE_NORMALIZED_EVENT=source.normalized.v1
7+
8+
API_BASE_URL=http://localhost:3000
9+
GRAPHQL_PATH=/graphql
10+
# API_GRAPHQL_URL=http://localhost:3000/graphql
11+
API_INGEST_TOKEN=replace_me_ingest_token
12+
813
SHARED_CONTRACTS_DIR=../shared-contracts
14+
15+
RETRY_ATTEMPTS=5
16+
RETRY_BASE_DELAY_MS=5000
17+
PREFETCH=10
18+
LOG_LEVEL=info

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
FROM node:20-alpine AS deps
22
WORKDIR /app
33
COPY package.json package-lock.json* ./
4-
RUN npm install
4+
RUN npm ci --fetch-retries 5 --fetch-retry-mintimeout 20000 --fetch-retry-maxtimeout 120000
55

66
FROM deps AS build
77
WORKDIR /app

README.md

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
- читает `source.raw.v1` из RabbitMQ;
1212
- валидирует raw-события;
1313
- нормализует payload до формата `source.normalized.v1`;
14-
- публикует нормализованные события и отправляет ingest mutation в `backend-api`.
14+
- отправляет ingest mutation `ingestNormalizedItem` в `backend-api` с заголовком `x-ingest-token`;
15+
- после успешного ingest публикует нормализованные события в `source.normalized.v1`;
16+
- при transient ошибках отправляет сообщение в retry queue, а poison messages складывает в DLQ.
1517

1618
## Черновая реализация
1719

@@ -21,6 +23,20 @@
2123
- GraphQL-клиент отправки в backend (`src/backend-client.ts`);
2224
- Dockerfile и CI workflow.
2325

26+
## Переменные окружения
27+
28+
- `RABBITMQ_URL` - адрес RabbitMQ.
29+
- `QUEUE_RAW_EVENT` - основная очередь входящих raw-событий, по умолчанию `source.raw.v1`.
30+
- `QUEUE_RETRY_EVENT` - очередь retry, по умолчанию `source.raw.retry.v1`.
31+
- `QUEUE_DEAD_LETTER_EVENT` - очередь dead-letter, по умолчанию `source.raw.dlq.v1`.
32+
- `QUEUE_NORMALIZED_EVENT` - очередь нормализованных событий, по умолчанию `source.normalized.v1`.
33+
- `API_BASE_URL` - базовый URL `backend-api`, по умолчанию `http://localhost:3000`.
34+
- `GRAPHQL_PATH` - GraphQL path, по умолчанию `/graphql`.
35+
- `API_GRAPHQL_URL` - явный GraphQL URL. Если задан, имеет приоритет над `API_BASE_URL + GRAPHQL_PATH`.
36+
- `API_INGEST_TOKEN` - токен для заголовка `x-ingest-token`. Должен совпадать с `INGEST_API_TOKEN` в `backend-api`.
37+
- `SHARED_CONTRACTS_DIR` - путь к `shared-contracts`.
38+
- `RETRY_ATTEMPTS`, `RETRY_BASE_DELAY_MS`, `PREFETCH`, `LOG_LEVEL` - параметры обработки и логирования.
39+
2440
## Локальный запуск
2541

2642
```bash
@@ -29,13 +45,26 @@ npm install
2945
npm run start:dev
3046
```
3147

32-
## Важные переменные
48+
Для Docker Compose сервис ожидает:
49+
50+
- RabbitMQ на `RABBITMQ_URL`;
51+
- `backend-api` на `API_BASE_URL` или `API_GRAPHQL_URL`;
52+
- смонтированный `shared-contracts` в `SHARED_CONTRACTS_DIR`.
53+
54+
## Ожидаемые очереди
55+
56+
- `source.raw.v1` - входящие события.
57+
- `source.raw.retry.v1` - delayed retry queue.
58+
- `source.raw.dlq.v1` - dead-letter queue.
59+
- `source.normalized.v1` - успешно обработанные нормализованные события.
60+
61+
## Как проверить обработку
3362

34-
- `RABBITMQ_URL`
35-
- `QUEUE_RAW_EVENT`
36-
- `QUEUE_NORMALIZED_EVENT`
37-
- `API_GRAPHQL_URL`
38-
- `SHARED_CONTRACTS_DIR`
63+
1. Опубликуй валидное сообщение в `source.raw.v1`.
64+
2. В логах worker должны появиться строки `connected to rabbitmq`, `consuming queue`, `raw event validated`, `normalized event created`, `ingest success`, `published normalized event`, `message acknowledged`.
65+
3. Проверь, что сообщение появилось в `source.normalized.v1`, а запись создалась в `backend-api`.
66+
4. Для transient ошибки временно выключи `backend-api`: worker должен логировать `ingest failed` и `retry scheduled`, а не падать.
67+
5. Для poison message отправь невалидный JSON или payload, нарушающий schema: worker должен логировать `message dead-lettered`, сообщение должно уйти в `source.raw.dlq.v1`.
3968

4069
## Связи с другими репозиториями
4170

src/backend-client.ts

Lines changed: 72 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { PoisonMessageError, TransientMessageError, toErrorMessage } from "./errors";
12
import type { NormalizedSourceEvent } from "./types";
23

34
const INGEST_MUTATION = `
@@ -15,45 +16,83 @@ export async function sendToBackend(
1516
ingestToken: string,
1617
event: NormalizedSourceEvent
1718
): Promise<void> {
18-
const response = await fetch(graphqlUrl, {
19-
method: "POST",
20-
headers: {
21-
"content-type": "application/json",
22-
"x-ingest-token": ingestToken
23-
},
24-
body: JSON.stringify({
25-
query: INGEST_MUTATION,
26-
variables: {
27-
input: {
28-
externalId: event.externalId,
29-
source: event.source,
30-
title: event.title,
31-
description: event.description,
32-
customer: event.customer,
33-
supplier: event.supplier,
34-
amount: event.amount,
35-
currency: event.currency,
36-
publishedAt: event.publishedAt,
37-
deadlineAt: event.deadlineAt,
38-
payloadVersion: event.payloadVersion,
39-
sourceUrl: event.sourceUrl,
40-
status: event.status,
41-
rawPayload: { rawRef: event.rawRef, eventId: event.eventId },
42-
rawEvent: event.rawEvent
19+
let response: Response;
20+
21+
try {
22+
response = await fetch(graphqlUrl, {
23+
method: "POST",
24+
signal: AbortSignal.timeout(10000),
25+
headers: {
26+
"content-type": "application/json",
27+
"x-ingest-token": ingestToken
28+
},
29+
body: JSON.stringify({
30+
query: INGEST_MUTATION,
31+
variables: {
32+
input: {
33+
externalId: event.externalId,
34+
source: event.source,
35+
title: event.title,
36+
description: event.description,
37+
customer: event.customer,
38+
supplier: event.supplier,
39+
amount: event.amount,
40+
currency: event.currency,
41+
publishedAt: event.publishedAt,
42+
deadlineAt: event.deadlineAt,
43+
payloadVersion: event.payloadVersion,
44+
sourceUrl: event.sourceUrl,
45+
status: event.status,
46+
rawPayload: { rawRef: event.rawRef, eventId: event.eventId },
47+
rawEvent: event.rawEvent
48+
}
4349
}
44-
}
45-
})
46-
});
50+
})
51+
});
52+
} catch (error) {
53+
throw new TransientMessageError(
54+
`Backend API is unavailable at ${graphqlUrl}: ${toErrorMessage(error)}`,
55+
{ cause: error }
56+
);
57+
}
4758

4859
if (!response.ok) {
49-
throw new Error(`Backend API ответил с кодом ${response.status}`);
60+
const message = `Backend API responded with HTTP ${response.status}`;
61+
if (response.status >= 500 || response.status === 429 || response.status === 408) {
62+
throw new TransientMessageError(message);
63+
}
64+
65+
throw new PoisonMessageError(message);
5066
}
5167

52-
const payload = (await response.json()) as {
53-
errors?: Array<{ message: string }>;
54-
};
68+
let payload: { errors?: Array<{ message: string }> };
69+
70+
try {
71+
payload = (await response.json()) as {
72+
errors?: Array<{ message: string }>;
73+
};
74+
} catch (error) {
75+
throw new TransientMessageError("Backend API returned invalid JSON", { cause: error });
76+
}
5577

5678
if (payload.errors?.length) {
57-
throw new Error(`Ошибка GraphQL ingest: ${payload.errors.map((e) => e.message).join("; ")}`);
79+
const message = `GraphQL ingest failed: ${payload.errors.map((entry) => entry.message).join("; ")}`;
80+
if (isPoisonGraphqlError(message)) {
81+
throw new PoisonMessageError(message);
82+
}
83+
84+
throw new TransientMessageError(message);
5885
}
5986
}
87+
88+
function isPoisonGraphqlError(message: string): boolean {
89+
const normalized = message.toLowerCase();
90+
91+
return (
92+
normalized.includes("unauthorized") ||
93+
normalized.includes("forbidden") ||
94+
normalized.includes("invalid") ||
95+
normalized.includes("bad request") ||
96+
normalized.includes("validation")
97+
);
98+
}

src/config.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ import { z } from "zod";
33

44
const envSchema = z.object({
55
RABBITMQ_URL: z.string().default("amqp://app:app@localhost:5672"),
6-
API_GRAPHQL_URL: z.string().default("http://localhost:3000/graphql"),
6+
API_BASE_URL: z.string().url().optional(),
7+
API_GRAPHQL_URL: z.string().url().optional(),
8+
GRAPHQL_PATH: z.string().default("/graphql"),
79
API_INGEST_TOKEN: z.string().default("replace_me_ingest_token"),
810
QUEUE_RAW_EVENT: z.string().default("source.raw.v1"),
911
QUEUE_RETRY_EVENT: z.string().default("source.raw.retry.v1"),
@@ -15,4 +17,15 @@ const envSchema = z.object({
1517
PREFETCH: z.coerce.number().int().positive().default(10)
1618
});
1719

18-
export const config = envSchema.parse(process.env);
20+
const parsedEnv = envSchema.parse(process.env);
21+
22+
const apiBaseUrl = parsedEnv.API_BASE_URL ?? "http://localhost:3000";
23+
const graphqlPath = parsedEnv.GRAPHQL_PATH.startsWith("/")
24+
? parsedEnv.GRAPHQL_PATH
25+
: `/${parsedEnv.GRAPHQL_PATH}`;
26+
27+
export const config = {
28+
...parsedEnv,
29+
API_BASE_URL: apiBaseUrl,
30+
API_GRAPHQL_URL: parsedEnv.API_GRAPHQL_URL ?? new URL(graphqlPath, apiBaseUrl).toString()
31+
};

src/contracts/schema-validator.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { readFileSync } from "node:fs";
22
import { join } from "node:path";
3-
import Ajv from "ajv";
3+
import Ajv2020 from "ajv/dist/2020";
44
import addFormats from "ajv-formats";
5+
import { PoisonMessageError } from "../errors";
56
import type { NormalizedSourceEvent, RawSourceEvent } from "../types";
67

78
export function createSchemaValidators(sharedContractsDir: string) {
@@ -12,7 +13,7 @@ export function createSchemaValidators(sharedContractsDir: string) {
1213
readFileSync(join(sharedContractsDir, "events", "source-normalized.v1.json"), "utf-8")
1314
);
1415

15-
const ajv = new Ajv({ allErrors: true });
16+
const ajv = new Ajv2020({ allErrors: true });
1617
addFormats(ajv);
1718
const rawValidate = ajv.compile<RawSourceEvent>(rawSchema);
1819
const normalizedValidate = ajv.compile<NormalizedSourceEvent>(normalizedSchema);
@@ -22,7 +23,7 @@ export function createSchemaValidators(sharedContractsDir: string) {
2223
if (rawValidate(event)) {
2324
return;
2425
}
25-
throw new Error(
26+
throw new PoisonMessageError(
2627
`Raw schema validation failed: ${(rawValidate.errors ?? [])
2728
.map((err) => `${err.instancePath || "/"} ${err.message}`)
2829
.join("; ")}`
@@ -32,7 +33,7 @@ export function createSchemaValidators(sharedContractsDir: string) {
3233
if (normalizedValidate(event)) {
3334
return;
3435
}
35-
throw new Error(
36+
throw new PoisonMessageError(
3637
`Normalized schema validation failed: ${(normalizedValidate.errors ?? [])
3738
.map((err) => `${err.instancePath || "/"} ${err.message}`)
3839
.join("; ")}`

src/errors.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
export class WorkerError extends Error {
2+
constructor(
3+
message: string,
4+
public readonly kind: "poison" | "transient",
5+
options?: { cause?: unknown }
6+
) {
7+
super(message, options);
8+
this.name = new.target.name;
9+
}
10+
}
11+
12+
export class PoisonMessageError extends WorkerError {
13+
constructor(message: string, options?: { cause?: unknown }) {
14+
super(message, "poison", options);
15+
}
16+
}
17+
18+
export class TransientMessageError extends WorkerError {
19+
constructor(message: string, options?: { cause?: unknown }) {
20+
super(message, "transient", options);
21+
}
22+
}
23+
24+
export function isPoisonMessageError(error: unknown): error is PoisonMessageError {
25+
return error instanceof WorkerError && error.kind === "poison";
26+
}
27+
28+
export function isTransientMessageError(error: unknown): error is TransientMessageError {
29+
return error instanceof WorkerError && error.kind === "transient";
30+
}
31+
32+
export function toErrorMessage(error: unknown): string {
33+
if (error instanceof Error) {
34+
return error.message;
35+
}
36+
37+
return String(error);
38+
}

0 commit comments

Comments
 (0)