Skip to content

Commit 51c68bd

Browse files
committed
feat: harden normalization worker
1 parent 20d3708 commit 51c68bd

File tree

11 files changed

+2935
-43
lines changed

11 files changed

+2935
-43
lines changed

package-lock.json

Lines changed: 2681 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,29 @@
11
{
22
"name": "processing-worker",
3-
"version": "0.1.0",
3+
"version": "1.0.0",
44
"private": true,
5-
"description": "Черновой worker для нормализации и передачи данных в backend-api.",
5+
"description": "Production-ready normalization worker for AIMSORA with retry/DLQ handling and strict contract validation.",
66
"scripts": {
77
"build": "tsc -p tsconfig.json",
88
"start": "node dist/main.js",
99
"start:dev": "ts-node-dev --respawn --transpile-only src/main.ts",
10-
"check": "tsc --noEmit"
10+
"check": "tsc --noEmit",
11+
"test": "vitest run"
1112
},
1213
"dependencies": {
1314
"ajv": "^8.17.1",
15+
"ajv-formats": "^3.0.1",
1416
"amqplib": "^0.10.5",
1517
"dotenv": "^16.4.7"
18+
,"pino": "^9.7.0",
19+
"zod": "^3.24.2"
1620
},
1721
"devDependencies": {
1822
"@types/amqplib": "^0.10.7",
1923
"@types/node": "^22.10.5",
2024
"ts-node-dev": "^2.0.0",
21-
"typescript": "^5.7.2"
25+
"typescript": "^5.7.2",
26+
"vitest": "^3.1.1"
2227
},
2328
"engines": {
2429
"node": ">=20"

src/backend-client.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,21 @@ mutation IngestNormalizedItem($input: IngestNormalizedItemInput!) {
55
ingestNormalizedItem(input: $input) {
66
accepted
77
idempotencyKey
8+
procurementId
89
}
910
}
1011
`;
1112

1213
export async function sendToBackend(
1314
graphqlUrl: string,
15+
ingestToken: string,
1416
event: NormalizedSourceEvent
1517
): Promise<void> {
1618
const response = await fetch(graphqlUrl, {
1719
method: "POST",
1820
headers: {
19-
"content-type": "application/json"
21+
"content-type": "application/json",
22+
"x-ingest-token": ingestToken
2023
},
2124
body: JSON.stringify({
2225
query: INGEST_MUTATION,
@@ -25,13 +28,18 @@ export async function sendToBackend(
2528
externalId: event.externalId,
2629
source: event.source,
2730
title: event.title,
31+
description: event.description,
2832
customer: event.customer,
2933
supplier: event.supplier,
3034
amount: event.amount,
3135
currency: event.currency,
3236
publishedAt: event.publishedAt,
37+
deadlineAt: event.deadlineAt,
3338
payloadVersion: event.payloadVersion,
34-
rawPayload: { rawRef: event.rawRef, eventId: event.eventId }
39+
sourceUrl: event.sourceUrl,
40+
status: event.status,
41+
rawPayload: { rawRef: event.rawRef, eventId: event.eventId },
42+
rawEvent: event.rawEvent
3543
}
3644
}
3745
})

src/config.ts

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
11
import "dotenv/config";
2+
import { z } from "zod";
23

3-
export const config = {
4-
rabbitmqUrl: process.env.RABBITMQ_URL ?? "amqp://app:app@localhost:5672",
5-
apiGraphqlUrl: process.env.API_GRAPHQL_URL ?? "http://localhost:3000/graphql",
6-
queueRaw: process.env.QUEUE_RAW_EVENT ?? "source.raw.v1",
7-
queueNormalized: process.env.QUEUE_NORMALIZED_EVENT ?? "source.normalized.v1",
8-
sharedContractsDir: process.env.SHARED_CONTRACTS_DIR ?? "../shared-contracts"
9-
};
4+
const envSchema = z.object({
5+
RABBITMQ_URL: z.string().default("amqp://app:app@localhost:5672"),
6+
API_GRAPHQL_URL: z.string().default("http://localhost:3000/graphql"),
7+
API_INGEST_TOKEN: z.string().default("replace_me_ingest_token"),
8+
QUEUE_RAW_EVENT: z.string().default("source.raw.v1"),
9+
QUEUE_RETRY_EVENT: z.string().default("source.raw.retry.v1"),
10+
QUEUE_DEAD_LETTER_EVENT: z.string().default("source.raw.dlq.v1"),
11+
QUEUE_NORMALIZED_EVENT: z.string().default("source.normalized.v1"),
12+
SHARED_CONTRACTS_DIR: z.string().default("../shared-contracts"),
13+
RETRY_ATTEMPTS: z.coerce.number().int().positive().default(5),
14+
RETRY_BASE_DELAY_MS: z.coerce.number().int().positive().default(5000),
15+
PREFETCH: z.coerce.number().int().positive().default(10)
16+
});
17+
18+
export const config = envSchema.parse(process.env);

src/contracts/schema-validator.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { readFileSync } from "node:fs";
22
import { join } from "node:path";
33
import Ajv from "ajv";
4+
import addFormats from "ajv-formats";
45
import type { NormalizedSourceEvent, RawSourceEvent } from "../types";
56

67
export function createSchemaValidators(sharedContractsDir: string) {
@@ -12,6 +13,7 @@ export function createSchemaValidators(sharedContractsDir: string) {
1213
);
1314

1415
const ajv = new Ajv({ allErrors: true });
16+
addFormats(ajv);
1517
const rawValidate = ajv.compile<RawSourceEvent>(rawSchema);
1618
const normalizedValidate = ajv.compile<NormalizedSourceEvent>(normalizedSchema);
1719

src/logger.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import pino from "pino";
2+
3+
export const logger = pino({
4+
level: process.env.LOG_LEVEL ?? "info",
5+
base: { service: "processing-worker" }
6+
});

src/main.ts

Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,67 @@ import type { ConsumeMessage } from "amqplib";
22
import { sendToBackend } from "./backend-client";
33
import { config } from "./config";
44
import { createSchemaValidators } from "./contracts/schema-validator";
5+
import { logger } from "./logger";
56
import { QueueClient } from "./messaging/queue-client";
67
import { normalizeRawEvent } from "./normalize";
78
import type { RawSourceEvent } from "./types";
89

910
async function bootstrap(): Promise<void> {
10-
const queue = new QueueClient(config.rabbitmqUrl);
11-
const validators = createSchemaValidators(config.sharedContractsDir);
11+
const queue = new QueueClient(config.RABBITMQ_URL);
12+
const validators = createSchemaValidators(config.SHARED_CONTRACTS_DIR);
1213

13-
await queue.init();
14-
await queue.assertQueue(config.queueRaw);
15-
await queue.assertQueue(config.queueNormalized);
14+
await queue.init(config.PREFETCH);
15+
await queue.assertRetryTopology(
16+
config.QUEUE_RAW_EVENT,
17+
config.QUEUE_RETRY_EVENT,
18+
config.QUEUE_DEAD_LETTER_EVENT
19+
);
20+
await queue.assertQueue(config.QUEUE_NORMALIZED_EVENT);
1621

17-
console.log("[processing-worker] запущен", {
18-
queueRaw: config.queueRaw,
19-
queueNormalized: config.queueNormalized,
20-
apiGraphqlUrl: config.apiGraphqlUrl
21-
});
22+
logger.info(
23+
{
24+
queueRaw: config.QUEUE_RAW_EVENT,
25+
queueRetry: config.QUEUE_RETRY_EVENT,
26+
queueDeadLetter: config.QUEUE_DEAD_LETTER_EVENT,
27+
queueNormalized: config.QUEUE_NORMALIZED_EVENT,
28+
apiGraphqlUrl: config.API_GRAPHQL_URL
29+
},
30+
"processing-worker started"
31+
);
32+
33+
queue.consume(config.QUEUE_RAW_EVENT, async (message: ConsumeMessage) => {
34+
const attempt = queue.getRetryCount(message);
35+
36+
try {
37+
const raw = queue.parseMessage<RawSourceEvent>(message);
38+
validators.validateRaw(raw);
2239

23-
queue.consume(config.queueRaw, async (message: ConsumeMessage) => {
24-
const raw = queue.parseMessage<RawSourceEvent>(message);
25-
validators.validateRaw(raw);
40+
const normalized = normalizeRawEvent(raw);
41+
validators.validateNormalized(normalized);
2642

27-
const normalized = normalizeRawEvent(raw);
28-
validators.validateNormalized(normalized);
43+
await queue.publish(config.QUEUE_NORMALIZED_EVENT, normalized);
44+
await sendToBackend(config.API_GRAPHQL_URL, config.API_INGEST_TOKEN, normalized);
45+
queue.ack(message);
2946

30-
await queue.publish(config.queueNormalized, normalized);
31-
await sendToBackend(config.apiGraphqlUrl, normalized);
47+
logger.info(
48+
{ eventId: raw.eventId, source: raw.source, externalId: normalized.externalId },
49+
"raw event normalized and ingested"
50+
);
51+
} catch (error) {
52+
const reason = error instanceof Error ? error.message : "Unknown worker error";
53+
if (attempt < config.RETRY_ATTEMPTS) {
54+
await queue.retry(message, config.QUEUE_RETRY_EVENT, attempt + 1, config.RETRY_BASE_DELAY_MS);
55+
logger.warn({ err: error, attempt: attempt + 1 }, "message scheduled for retry");
56+
return;
57+
}
3258

33-
console.log(`[processing-worker] обработано событие ${raw.eventId}`);
59+
await queue.deadLetter(message, config.QUEUE_DEAD_LETTER_EVENT, reason);
60+
logger.error({ err: error, attempt }, "message moved to dead-letter queue");
61+
}
3462
});
3563
}
3664

3765
void bootstrap().catch((error) => {
38-
console.error("[processing-worker] фатальная ошибка", error);
66+
logger.error({ err: error }, "processing-worker crashed");
3967
process.exit(1);
4068
});

src/messaging/queue-client.ts

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
import { connect, type Channel, type Connection, type ConsumeMessage } from "amqplib";
1+
import { connect, type Channel, type ChannelModel, type ConsumeMessage, type Options } from "amqplib";
22

33
export class QueueClient {
4-
private connection?: Connection;
4+
private connection?: ChannelModel;
55
private channel?: Channel;
66

77
constructor(private readonly rabbitmqUrl: string) {}
88

9-
async init(): Promise<void> {
9+
async init(prefetch = 10): Promise<void> {
1010
this.connection = await connect(this.rabbitmqUrl);
1111
this.channel = await this.connection.createChannel();
12+
await this.channel.prefetch(prefetch);
1213
}
1314

1415
async assertQueue(queue: string): Promise<void> {
@@ -18,14 +19,29 @@ export class QueueClient {
1819
await this.channel.assertQueue(queue, { durable: true });
1920
}
2021

21-
async publish(queue: string, payload: unknown): Promise<void> {
22+
async assertRetryTopology(mainQueue: string, retryQueue: string, deadLetterQueue: string): Promise<void> {
23+
if (!this.channel) {
24+
throw new Error("QueueClient не инициализирован");
25+
}
26+
27+
await this.channel.assertQueue(deadLetterQueue, { durable: true });
28+
await this.channel.assertQueue(retryQueue, {
29+
durable: true,
30+
deadLetterExchange: "",
31+
deadLetterRoutingKey: mainQueue
32+
});
33+
await this.channel.assertQueue(mainQueue, { durable: true });
34+
}
35+
36+
async publish(queue: string, payload: unknown, options?: Options.Publish): Promise<void> {
2237
if (!this.channel) {
2338
throw new Error("QueueClient не инициализирован");
2439
}
2540

2641
this.channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), {
2742
contentType: "application/json",
28-
persistent: true
43+
persistent: true,
44+
...options
2945
});
3046
}
3147

@@ -43,13 +59,50 @@ export class QueueClient {
4359
await handler(msg);
4460
this.channel?.ack(msg);
4561
} catch (error) {
46-
console.error("[processing-worker] Ошибка обработки сообщения", error);
47-
this.channel?.nack(msg, false, false);
62+
throw error;
4863
}
4964
});
5065
}
5166

5267
parseMessage<T>(message: ConsumeMessage): T {
5368
return JSON.parse(message.content.toString("utf-8")) as T;
5469
}
70+
71+
ack(message: ConsumeMessage): void {
72+
this.channel?.ack(message);
73+
}
74+
75+
async retry(
76+
message: ConsumeMessage,
77+
retryQueue: string,
78+
attempt: number,
79+
baseDelayMs: number
80+
): Promise<void> {
81+
await this.publish(retryQueue, JSON.parse(message.content.toString("utf-8")), {
82+
expiration: String(baseDelayMs * 2 ** attempt),
83+
headers: {
84+
...(message.properties.headers ?? {}),
85+
"x-retry-count": attempt
86+
}
87+
});
88+
this.ack(message);
89+
}
90+
91+
async deadLetter(
92+
message: ConsumeMessage,
93+
deadLetterQueue: string,
94+
reason: string
95+
): Promise<void> {
96+
await this.publish(deadLetterQueue, {
97+
reason,
98+
failedAt: new Date().toISOString(),
99+
payload: JSON.parse(message.content.toString("utf-8"))
100+
});
101+
this.ack(message);
102+
}
103+
104+
getRetryCount(message: ConsumeMessage): number {
105+
const raw = message.properties.headers?.["x-retry-count"];
106+
return typeof raw === "number" ? raw : Number(raw ?? 0);
107+
}
55108
}

src/normalize.spec.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { describe, expect, it } from "vitest";
2+
import { normalizeRawEvent } from "./normalize";
3+
import type { RawSourceEvent } from "./types";
4+
5+
describe("normalizeRawEvent", () => {
6+
it("normalizes demo-source payload", () => {
7+
const input: RawSourceEvent = {
8+
eventId: "evt-1",
9+
runKey: "demo-run",
10+
source: "demo-source",
11+
collectedAt: "2026-03-30T10:00:00.000Z",
12+
url: "https://example.org",
13+
payloadVersion: "v1",
14+
artifacts: [],
15+
raw: {
16+
externalId: "demo-1",
17+
title: "Поставка оборудования",
18+
customer: "АО Демонстрационная АЭС",
19+
amount: 1200
20+
}
21+
};
22+
23+
const normalized = normalizeRawEvent(input);
24+
25+
expect(normalized.externalId).toBe("demo-1");
26+
expect(normalized.title).toBe("Поставка оборудования");
27+
expect(normalized.customer).toBe("АО Демонстрационная АЭС");
28+
expect(normalized.amount).toBe(1200);
29+
expect(normalized.sourceUrl).toBe("https://example.org");
30+
});
31+
32+
it("normalizes find-tender payload using ocid", () => {
33+
const input: RawSourceEvent = {
34+
eventId: "evt-2",
35+
runKey: "find-tender-run",
36+
source: "find-tender",
37+
collectedAt: "2026-03-30T10:00:00.000Z",
38+
url: "https://www.find-tender.service.gov.uk/Notice/008889-2026",
39+
payloadVersion: "v1",
40+
artifacts: [],
41+
raw: {
42+
ocid: "ocds-h6vhtk-061410",
43+
title: "E-Disclosure Services",
44+
buyer: "Care Quality Commission",
45+
supplier: "KLDISCOVERY LIMITED",
46+
amount: 150000,
47+
currency: "GBP"
48+
}
49+
};
50+
51+
const normalized = normalizeRawEvent(input);
52+
53+
expect(normalized.externalId).toBe("ocds-h6vhtk-061410");
54+
expect(normalized.title).toBe("E-Disclosure Services");
55+
expect(normalized.customer).toBe("Care Quality Commission");
56+
expect(normalized.sourceUrl).toContain("find-tender.service.gov.uk");
57+
});
58+
});

0 commit comments

Comments
 (0)