From 3529d73a669e7c95dc1b2192cd1522f8d67fe745 Mon Sep 17 00:00:00 2001 From: Kuchizu <70284260+Kuchizu@users.noreply.github.com> Date: Sat, 2 May 2026 20:06:18 +0300 Subject: [PATCH 1/6] feat(metrics): expose worker metrics over HTTP (#547) --- .env.sample | 11 ++- .env.test | 4 +- lib/metrics.ts | 199 +++++++++++++++++++++++++++++++++++-------------- runner.ts | 34 +++------ 4 files changed, 162 insertions(+), 86 deletions(-) diff --git a/.env.sample b/.env.sample index f5c19c7a..bcd8054f 100644 --- a/.env.sample +++ b/.env.sample @@ -25,11 +25,14 @@ EVENT_SECRET=hell # @codex_bot webhook for reports CODEX_BOT_WEBHOOK= -# address of prometheus pushgateway -PROMETHEUS_PUSHGATEWAY_URL= +# Port for VictoriaMetrics metrics endpoint. +PROMETHEUS_METRICS_PORT= -# pushgateway push interval in ms -PROMETHEUS_PUSHGATEWAY_INTERVAL=10000 +# Host for metrics endpoint binding. +PROMETHEUS_METRICS_HOST=0.0.0.0 + +# Path for metrics endpoint. +PROMETHEUS_METRICS_PATH=/metrics # Grouper memory log controls # Number of handled tasks between memory checkpoint logs diff --git a/.env.test b/.env.test index 237fe292..a60516a7 100644 --- a/.env.test +++ b/.env.test @@ -22,8 +22,8 @@ EVENT_SECRET=hell # @codex_bot webhook for reports CODEX_BOT_WEBHOOK= -# address of prometheus pushgateway -PROMETHEUS_PUSHGATEWAY= +# Port for VictoriaMetrics metrics endpoint. +PROMETHEUS_METRICS_PORT= # Feature flags diff --git a/lib/metrics.ts b/lib/metrics.ts index e26f208f..3cfb490f 100644 --- a/lib/metrics.ts +++ b/lib/metrics.ts @@ -1,16 +1,19 @@ import * as client from 'prom-client'; -import os from 'os'; -import { nanoid } from 'nanoid'; +import * as http from 'http'; import createLogger from './logger'; const register = new client.Registry(); const logger = createLogger(); -const DEFAULT_PUSH_INTERVAL_MS = 10_000; -const ID_SIZE = 5; -const METRICS_JOB_NAME = 'workers'; +const DEFAULT_METRICS_HOST = '0.0.0.0'; +const DEFAULT_METRICS_PATH = '/metrics'; +const MIN_PORT = 1; +const MAX_PORT = 65535; +const HTTP_OK = 200; +const HTTP_NOT_FOUND = 404; +const HTTP_INTERNAL_SERVER_ERROR = 500; -let pushInterval: NodeJS.Timeout | null = null; +let metricsServer: http.Server | null = null; let currentWorkerName = ''; client.collectDefaultMetrics({ register }); @@ -18,80 +21,160 @@ client.collectDefaultMetrics({ register }); export { register, client }; /** - * Parse push interval from environment. + * Parse metrics endpoint port from environment. */ -function getPushIntervalMs(): number { - const rawInterval = process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL; - const parsedInterval = rawInterval === undefined - ? DEFAULT_PUSH_INTERVAL_MS - : Number(rawInterval); - - const interval = Number.isFinite(parsedInterval) && parsedInterval > 0 - ? parsedInterval - : DEFAULT_PUSH_INTERVAL_MS; - - if (rawInterval !== undefined && interval !== parsedInterval) { - logger.warn(`[metrics] invalid PROMETHEUS_PUSHGATEWAY_INTERVAL="${rawInterval}", fallback to ${DEFAULT_PUSH_INTERVAL_MS}ms`); +function getMetricsPort(): number | null { + const rawPort = process.env.PROMETHEUS_METRICS_PORT; + + if (!rawPort) { + return null; + } + + const port = Number(rawPort); + + if (!Number.isInteger(port) || port < MIN_PORT || port > MAX_PORT) { + logger.warn(`[metrics] invalid PROMETHEUS_METRICS_PORT="${rawPort}"; expected an integer between ${MIN_PORT} and ${MAX_PORT}`); + + return null; + } + + return port; +} + +/** + * Read metrics endpoint path from environment. + */ +function getMetricsPath(): string { + const rawPath = process.env.PROMETHEUS_METRICS_PATH; + + if (!rawPath) { + return DEFAULT_METRICS_PATH; + } + + const path = rawPath.trim(); + + if (!path) { + logger.warn(`[metrics] invalid PROMETHEUS_METRICS_PATH="${rawPath}", fallback to ${DEFAULT_METRICS_PATH}`); + + return DEFAULT_METRICS_PATH; + } + + if (!path.startsWith('/')) { + const normalizedPath = `/${path}`; + + logger.warn(`[metrics] normalized PROMETHEUS_METRICS_PATH from "${rawPath}" to "${normalizedPath}"`); + + return normalizedPath; } - return interval; + return path; } /** - * Stop periodic push to pushgateway. + * Stop HTTP metrics endpoint. */ -export function stopMetricsPushing(): void { - if (!pushInterval) { +export function stopMetricsServer(): void { + if (!metricsServer) { return; } - clearInterval(pushInterval); - pushInterval = null; - logger.info(`[metrics] stopped pushing metrics for worker=${currentWorkerName}`); - currentWorkerName = ''; + const serverToStop = metricsServer; + const stoppedWorkerName = currentWorkerName; + + if (!serverToStop.listening) { + logger.info(`[metrics] endpoint already stopped for worker=${stoppedWorkerName}`); + + if (metricsServer === serverToStop) { + metricsServer = null; + currentWorkerName = ''; + } + + return; + } + + serverToStop.close((error) => { + if (error) { + logger.error(`[metrics] failed to stop endpoint for worker=${stoppedWorkerName}: ${error.message}`); + + return; + } + + if (metricsServer === serverToStop) { + metricsServer = null; + currentWorkerName = ''; + } + + logger.info(`[metrics] stopped endpoint for worker=${stoppedWorkerName}`); + }); } /** - * Start periodic push to pushgateway. + * Start HTTP metrics endpoint for scraper-based monitoring. * - * @param workerName - name of the worker for grouping. + * @param workerName - name of the worker for default metric labels. */ -export function startMetricsPushing(workerName: string): () => void { - const url = process.env.PROMETHEUS_PUSHGATEWAY_URL; +export function startMetricsServer(workerName: string): () => void { + const port = getMetricsPort(); - if (!url) { - return stopMetricsPushing; + if (!port) { + return stopMetricsServer; } - if (pushInterval) { - logger.warn(`[metrics] pushing is already started for worker=${currentWorkerName}, skip duplicate start for worker=${workerName}`); + if (metricsServer) { + logger.warn(`[metrics] endpoint is already started for worker=${currentWorkerName}, skip duplicate start for worker=${workerName}`); - return stopMetricsPushing; + return stopMetricsServer; } - const interval = getPushIntervalMs(); - const hostname = os.hostname(); - const id = nanoid(ID_SIZE); - const gateway = new client.Pushgateway(url, undefined, register); + const host = process.env.PROMETHEUS_METRICS_HOST || DEFAULT_METRICS_HOST; + const path = getMetricsPath(); + + register.setDefaultLabels({ worker: workerName }); + const server = http.createServer(async (request, response) => { + const requestPath = request.url?.split('?')[0]; + + if (requestPath === '/-/healthy') { + response.writeHead(HTTP_OK, { 'Content-Type': 'text/plain' }); + response.end('ok'); + + return; + } + + if (request.method !== 'GET' || requestPath !== path) { + response.writeHead(HTTP_NOT_FOUND, { 'Content-Type': 'text/plain' }); + response.end('not found'); + + return; + } + + try { + response.writeHead(HTTP_OK, { 'Content-Type': register.contentType }); + response.end(await register.metrics()); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + + logger.error(`[metrics] failed to render metrics: ${message}`); + response.writeHead(HTTP_INTERNAL_SERVER_ERROR, { 'Content-Type': 'text/plain' }); + response.end('metrics error'); + } + }); + + server.on('error', (error) => { + logger.error(`[metrics] endpoint error for worker=${workerName}: ${error.message}`); + + if (metricsServer === server) { + metricsServer = null; + currentWorkerName = ''; + } + }); + + metricsServer = server; currentWorkerName = workerName; - logger.info(`Start pushing metrics to ${url} every ${interval}ms (host: ${hostname}, id: ${id}, worker: ${workerName})`); - - pushInterval = setInterval(() => { - gateway.pushAdd({ - jobName: METRICS_JOB_NAME, - groupings: { - worker: workerName, - host: hostname, - id, - }, - }, (err) => { - if (err) { - logger.error(`Metrics push error: ${err.message || err}`); - } - }); - }, interval); - - return stopMetricsPushing; + server.listen(port, host, () => { + logger.info(`[metrics] endpoint started for worker=${workerName} at http://${host}:${port}${path}`); + }); + + return stopMetricsServer; } diff --git a/runner.ts b/runner.ts index e68fd090..1c19f97b 100644 --- a/runner.ts +++ b/runner.ts @@ -1,15 +1,8 @@ import * as utils from './lib/utils'; - -/* Prometheus client for pushing metrics to the pushgateway */ -// import os from 'os'; -// import * as promClient from 'prom-client'; -// import gcStats from 'prometheus-gc-stats'; -// import { nanoid } from 'nanoid'; -// import * as url from 'url'; import { Worker } from './lib/worker'; import HawkCatcher from '@hawk.so/nodejs'; import * as dotenv from 'dotenv'; -import { startMetricsPushing } from './lib/metrics'; +import { startMetricsServer } from './lib/metrics'; dotenv.config(); @@ -24,15 +17,15 @@ const BEGINNING_OF_ARGS = 2; */ const workerNames = process.argv.slice(BEGINNING_OF_ARGS); -/** +/** * Initialize HawkCatcher -*/ + */ if (process.env.HAWK_CATCHER_TOKEN) { HawkCatcher.init({ token: process.env.HAWK_CATCHER_TOKEN, context: { - workerTypes: workerNames.join(","), - } + workerTypes: workerNames.join(','), + }, }); } @@ -46,12 +39,10 @@ class WorkerRunner { */ private workers: Worker[] = []; - // private gateway?: promClient.Pushgateway; - /** - * Metrics push cleanup callback. + * Metrics endpoint cleanup callback. */ - private stopMetricsPushing?: () => void; + private stopMetricsServer?: () => void; /** * Create runner instance @@ -90,7 +81,7 @@ class WorkerRunner { * Run metrics exporter */ private startMetrics(): void { - if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) { + if (!process.env.PROMETHEUS_METRICS_PORT && !process.env.PROMETHEUS_PUSHGATEWAY_URL) { return; } @@ -105,10 +96,10 @@ class WorkerRunner { const workerTypeForMetrics = workerTypes.length === 1 ? workerTypes[0] : 'multi_worker_process'; if (workerTypes.length > 1) { - console.warn(`[metrics] ${workerTypes.length} workers are running in one process; pushing metrics as "${workerTypeForMetrics}" to avoid duplicated attribution`); + console.warn(`[metrics] ${workerTypes.length} workers are running in one process; exposing metrics as "${workerTypeForMetrics}"`); } - this.stopMetricsPushing = startMetricsPushing(workerTypeForMetrics); + this.stopMetricsServer = startMetricsServer(workerTypeForMetrics); } /** @@ -243,9 +234,8 @@ class WorkerRunner { */ private async stopWorker(worker: Worker): Promise { try { - // stop pushing metrics - this.stopMetricsPushing?.(); - this.stopMetricsPushing = undefined; + this.stopMetricsServer?.(); + this.stopMetricsServer = undefined; await worker.finish(); console.log( From 5885e7750ae4f82a210639458e838a5e81a3252a Mon Sep 17 00:00:00 2001 From: Kuchizu <70284260+Kuchizu@users.noreply.github.com> Date: Fri, 8 May 2026 04:51:42 +0300 Subject: [PATCH 2/6] perf(notifier): project only notifications field when loading rules (#551) * perf(notifier): project only notifications field when loading rules * test(notifier): update findOne assertion for projection arg --- workers/notifier/src/index.ts | 7 +++++-- workers/notifier/tests/worker.test.ts | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/workers/notifier/src/index.ts b/workers/notifier/src/index.ts index d2d17966..c629c6d5 100644 --- a/workers/notifier/src/index.ts +++ b/workers/notifier/src/index.ts @@ -193,9 +193,12 @@ export default class NotifierWorker extends Worker { */ private async getProjectNotificationRules(projectId: string): Promise { const connection = this.accountsDb.getConnection(); - const projects = connection.collection('projects'); + const projects = connection.collection<{ notifications?: Rule[] }>('projects'); - const project = await projects.findOne({ _id: new ObjectID(projectId) }); + const project = await projects.findOne( + { _id: new ObjectID(projectId) }, + { projection: { notifications: 1 } } + ); if (!project) { throw new Error('There is no project with given id'); diff --git a/workers/notifier/tests/worker.test.ts b/workers/notifier/tests/worker.test.ts index 1f128ac1..dc438cd6 100644 --- a/workers/notifier/tests/worker.test.ts +++ b/workers/notifier/tests/worker.test.ts @@ -242,7 +242,10 @@ describe('NotifierWorker', () => { await worker.handle(message); - expect(dbQueryMock).toBeCalledWith({ _id: new ObjectID(message.projectId) }); + expect(dbQueryMock).toBeCalledWith( + { _id: new ObjectID(message.projectId) }, + { projection: { notifications: 1 } } + ); }); it('should close db connection on finish', async () => { From 2f0bad09ad335113cf4e367cd0b0674193168331 Mon Sep 17 00:00:00 2001 From: Kuchizu <70284260+Kuchizu@users.noreply.github.com> Date: Tue, 12 May 2026 01:21:20 +0300 Subject: [PATCH 3/6] perf(paymaster): batch subscription check over workspaces (#550) * perf(paymaster): batch subscription check over workspaces * test(paymaster): cover multi-batch dispatch in subscription check * refactor(paymaster): pass batch into flush and clear it explicitly --- workers/paymaster/src/index.ts | 73 ++++++++++++++++++++++----- workers/paymaster/tests/index.test.ts | 48 ++++++++++++++++++ 2 files changed, 108 insertions(+), 13 deletions(-) diff --git a/workers/paymaster/src/index.ts b/workers/paymaster/src/index.ts index 94be50d9..7d588be5 100644 --- a/workers/paymaster/src/index.ts +++ b/workers/paymaster/src/index.ts @@ -34,6 +34,33 @@ const DAYS_LEFT_ALERT = [3, 2, 1]; // eslint-disable-next-line @typescript-eslint/no-magic-numbers const DAYS_AFTER_BLOCK_TO_REMIND = [1, 2, 3, 5, 7, 30]; +/** + * Bounds concurrent updateOne / addTask calls per subscription check tick. + */ +const WORKSPACE_PROCESSING_CONCURRENCY = 25; + +const WORKSPACE_CURSOR_BATCH_SIZE = 200; + +/** + * Keep in sync with fields read by `processWorkspaceSubscriptionCheck` and its helpers. + */ +const WORKSPACE_SUBSCRIPTION_PROJECTION = { + _id: 1, + name: 1, + tariffPlanId: 1, + lastChargeDate: 1, + paidUntil: 1, + isDebug: 1, + isBlocked: 1, + blockedDate: 1, + subscriptionId: 1, +} as const; + +const PLAN_PROJECTION = { + _id: 1, + monthlyCharge: 1, +} as const; + /** * Worker to check workspaces subscription status and ban workspaces without actual subscription */ @@ -151,7 +178,10 @@ export default class PaymasterWorker extends Worker { throw new Error('Plans collection is not initialized'); } - this.plans = await this.plansCollection.find({}).toArray(); + this.plans = await this.plansCollection + .find({}) + .project(PLAN_PROJECTION) + .toArray(); if (this.plans.length === 0) { throw new Error('Please add tariff plans to the database'); @@ -195,28 +225,45 @@ export default class PaymasterWorker extends Worker { * Called periodically, enumerate through workspaces and check if today is a payday for workspace subscription */ private async handleWorkspaceSubscriptionCheckEvent(): Promise { - const workspaces = await this.workspaces.find({}).toArray(); + const cursor = this.workspaces + .find({}) + .project(WORKSPACE_SUBSCRIPTION_PROJECTION) + .batchSize(WORKSPACE_CURSOR_BATCH_SIZE); + + let batch: WorkspaceDBScheme[] = []; - await Promise.all(workspaces - .filter(workspace => { + const flush = async (currentBatch: WorkspaceDBScheme[]): Promise => { + if (currentBatch.length === 0) { + return; + } + + await Promise.all(currentBatch.map((workspace) => this.processWorkspaceSubscriptionCheck(workspace))); + }; + + try { + for await (const workspace of cursor) { /** * Skip workspace without lastChargeDate */ if (!workspace.lastChargeDate) { - const error = new Error('[Paymaster] Workspace without lastChargeDate detected'); - - HawkCatcher.send(error, { + HawkCatcher.send(new Error('[Paymaster] Workspace without lastChargeDate detected'), { workspaceId: workspace._id.toString(), }); + continue; + } + + batch.push(workspace); - return false; + if (batch.length >= WORKSPACE_PROCESSING_CONCURRENCY) { + await flush(batch); + batch = []; } + } - return true; - }) - .map( - (workspace) => this.processWorkspaceSubscriptionCheck(workspace) - )); + await flush(batch); + } finally { + await cursor.close(); + } } /** diff --git a/workers/paymaster/tests/index.test.ts b/workers/paymaster/tests/index.test.ts index 51ff31fd..23a1bd69 100644 --- a/workers/paymaster/tests/index.test.ts +++ b/workers/paymaster/tests/index.test.ts @@ -794,6 +794,54 @@ describe('PaymasterWorker', () => { MockDate.reset(); }); + test('Should process every workspace when there are several batches', async () => { + /** + * 50 > WORKSPACE_PROCESSING_CONCURRENCY (25), so the subscription check + * has to flush more than one batch. + */ + const WORKSPACES_COUNT = 50; + const currentDate = new Date('2005-12-22'); + const plan = createPlanMock({ + monthlyCharge: 0, + isDefault: true, + }); + + const workspaces = Array.from({ length: WORKSPACES_COUNT }, () => + createWorkspaceMock({ + plan, + subscriptionId: null, + lastChargeDate: new Date('2005-11-22'), + isBlocked: false, + billingPeriodEventsCount: 0, + }) + ); + + await tariffCollection.insertOne(plan); + await workspacesCollection.insertMany(workspaces); + + MockDate.set(currentDate); + + const worker = new PaymasterWorker(); + const processSpy = jest + .spyOn(worker as any, 'processWorkspaceSubscriptionCheck') + .mockResolvedValue([null, false]); + + await worker.start(); + await worker.handle(WORKSPACE_SUBSCRIPTION_CHECK); + await worker.finish(); + + expect(processSpy).toHaveBeenCalledTimes(WORKSPACES_COUNT); + + const calledIds = processSpy.mock.calls + .map((call) => (call[0] as WorkspaceDBScheme)._id.toString()) + .sort(); + const expectedIds = workspaces.map((w) => w._id.toString()).sort(); + + expect(calledIds).toEqual(expectedIds); + + MockDate.reset(); + }); + afterAll(async () => { await connection.close(); MockDate.reset(); From 8f68626e3321eb3e278a088febed5c621d6af369 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Wed, 13 May 2026 19:15:57 +0300 Subject: [PATCH 4/6] Imp(limiter): fetch workspaces with cursor (#554) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * imp(): fetch workspaces one by one * imp(): add workspace projection * imp(): fix function overload * fix(): eslint and types * fix(): missing await --------- Co-authored-by: Егор Коновалов --- workers/limiter/src/dbHelper.ts | 109 +++++++++++++++++-------- workers/limiter/src/index.ts | 14 ++-- workers/limiter/tests/dbHelper.test.ts | 18 ++-- workers/limiter/types/index.ts | 7 +- 4 files changed, 101 insertions(+), 47 deletions(-) diff --git a/workers/limiter/src/dbHelper.ts b/workers/limiter/src/dbHelper.ts index b66336e9..dbe45da0 100644 --- a/workers/limiter/src/dbHelper.ts +++ b/workers/limiter/src/dbHelper.ts @@ -2,7 +2,7 @@ import { Collection, Db, ObjectId } from 'mongodb'; import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; import { WorkspaceWithTariffPlan } from '../types'; import HawkCatcher from '@hawk.so/nodejs'; -import { CriticalError } from '../../../lib/workerErrors'; +import { CriticalError, NonCriticalError } from '../../../lib/workerErrors'; /** * Class that implements methods used for interaction between limiter and db @@ -35,49 +35,24 @@ export class DbHelper { } /** - * Method that returns all workspaces with their tariff plans + * Method that yields all workspaces with their tariff plans */ - public async getWorkspacesWithTariffPlans():Promise; + public getWorkspacesWithTariffPlans(): AsyncGenerator; /** * Method that returns workspace with its tariff plan by its id * * @param id - id of the workspace to fetch */ - public async getWorkspacesWithTariffPlans(id: string):Promise; + public getWorkspacesWithTariffPlans(id: string): Promise; /** - * Returns workspace with its tariff plan by its id - * - * @param id - workspace id + * @param id - id of the workspace to fetch */ - public async getWorkspacesWithTariffPlans(id?: string):Promise { - /* eslint-disable-next-line */ - const queue: any[] = [ - { - $lookup: { - from: 'plans', - localField: 'tariffPlanId', - foreignField: '_id', - as: 'tariffPlan', - }, - }, - { - $unwind: { - path: '$tariffPlan', - }, - }, - ]; - + public getWorkspacesWithTariffPlans(id?: string): AsyncGenerator | Promise { if (id !== undefined) { - queue.unshift({ - $match: { - _id: new ObjectId(id), - }, - }); + return this.getOneWorkspaceWithTariffPlan(id); } - const workspacesArray = await this.workspacesCollection.aggregate(queue).toArray(); - - return (id !== undefined) ? workspacesArray[0] : workspacesArray; + return this.yieldWorkspacesWithTariffPlans(); } /** @@ -172,4 +147,72 @@ export class DbHelper { return this.projectsCollection.find(query).toArray(); } + + /** + * Returns a single workspace with its tariff plan by id + * + * @param id - workspace id + */ + private async getOneWorkspaceWithTariffPlan(id: string): Promise { + const pipeline = [ + { + $match: { + _id: new ObjectId(id), + }, + }, + ...this.tariffPlanLookupPipeline(), + ]; + + const workspace = await this.workspacesCollection.aggregate(pipeline).next(); + + if (workspace === null) { + throw new NonCriticalError(`Workspace ${id} not found`, { + workspaceId: id, + }); + } + + return workspace; + } + + /** + * Yields all workspaces with their tariff plans one by one + */ + private async * yieldWorkspacesWithTariffPlans(): AsyncGenerator { + const pipeline = this.tariffPlanLookupPipeline(); + const cursor = this.workspacesCollection.aggregate(pipeline); + + for await (const workspace of cursor) { + yield workspace; + } + } + + /* eslint-disable-next-line */ + private tariffPlanLookupPipeline(): any[] { + return [ + { + $lookup: { + from: 'plans', + localField: 'tariffPlanId', + foreignField: '_id', + as: 'tariffPlan', + }, + }, + { + $unwind: { + path: '$tariffPlan', + }, + }, + { + $project: { + _id: 1, + name: 1, + isBlocked: 1, + blockedDate: 1, + lastChargeDate: 1, + billingPeriodEventsCount: 1, + tariffPlan: 1, + }, + }, + ]; + } } \ No newline at end of file diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index 9cb2abf6..82ed2539 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -189,16 +189,16 @@ export default class LimiterWorker extends Worker { private async handleRegularWorkspacesCheck(): Promise { let message = ''; - const workspaces = await this.dbHelper.getWorkspacesWithTariffPlans(); + const workspaces = this.dbHelper.getWorkspacesWithTariffPlans(); const updatedWorkspaces: WorkspaceWithTariffPlan[] = []; - await Promise.all(workspaces.map(async (workspace) => { + for await (const workspace of workspaces) { /** * If workspace is already blocked - do nothing */ if (workspace.isBlocked) { - return; + continue; } const workspaceProjects = await this.dbHelper.getProjects(workspace._id.toString()); @@ -211,7 +211,7 @@ export default class LimiterWorker extends Worker { * If there are no projects to update - move on to next workspace */ if (projectsToUpdate.length === 0) { - return; + continue; } /** @@ -223,12 +223,12 @@ export default class LimiterWorker extends Worker { updatedWorkspace.isBlocked = true; updatedWorkspace.blockedDate = new Date(); - this.redis.appendBannedProjects(projectIds); + await this.redis.appendBannedProjects(projectIds); message += this.formSingleWorkspaceMessage(updatedWorkspace, projectsToUpdate, 'blocked'); } - })); + } - this.dbHelper.updateWorkspacesEventsCountAndIsBlocked(updatedWorkspaces); + await this.dbHelper.updateWorkspacesEventsCountAndIsBlocked(updatedWorkspaces); this.sendRegularReport(message); } diff --git a/workers/limiter/tests/dbHelper.test.ts b/workers/limiter/tests/dbHelper.test.ts index 3a4a49f8..947b070a 100644 --- a/workers/limiter/tests/dbHelper.test.ts +++ b/workers/limiter/tests/dbHelper.test.ts @@ -159,16 +159,22 @@ describe('DbHelper', () => { /** * Act */ - const result = await dbHelper.getWorkspacesWithTariffPlans(); + const cursor = dbHelper.getWorkspacesWithTariffPlans(); + + const workspaces = []; + + for await (const workspace of cursor) { + workspaces.push(workspace); + } /** * Assert */ - expect(result).toHaveLength(2); - expect(result[0].tariffPlan).toBeDefined(); - expect(result[1].tariffPlan).toBeDefined(); - expect(result[0].tariffPlan.eventsLimit).toBe(10); - expect(result[1].tariffPlan.eventsLimit).toBe(10000); + expect(workspaces).toHaveLength(2); + expect(workspaces[0].tariffPlan).toBeDefined(); + expect(workspaces[1].tariffPlan).toBeDefined(); + expect(workspaces[0].tariffPlan.eventsLimit).toBe(10); + expect(workspaces[1].tariffPlan.eventsLimit).toBe(10000); }); test('Should return single workspace with its tariff plan by id', async () => { diff --git a/workers/limiter/types/index.ts b/workers/limiter/types/index.ts index c04c3936..e788e5dc 100644 --- a/workers/limiter/types/index.ts +++ b/workers/limiter/types/index.ts @@ -3,4 +3,9 @@ import { PlanDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; /** * Workspace with its tariff plan */ -export type WorkspaceWithTariffPlan = WorkspaceDBScheme & {tariffPlan: PlanDBScheme}; +export type WorkspaceWithTariffPlan = Pick< + WorkspaceDBScheme, + '_id' | 'name' | 'isBlocked' | 'blockedDate' | 'lastChargeDate' | 'billingPeriodEventsCount' +> & { + tariffPlan: PlanDBScheme; +}; From 6fb756d46a66b7e4b7dabbdd9a6aa3b8e1312f28 Mon Sep 17 00:00:00 2001 From: Kuchizu <70284260+Kuchizu@users.noreply.github.com> Date: Wed, 3 Jun 2026 14:39:44 +0300 Subject: [PATCH 5/6] feat(db): support MONGO_APP_NAME for per-worker MongoDB attribution (#557) --- .env.sample | 3 +++ lib/db/controller.ts | 10 +++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/.env.sample b/.env.sample index bcd8054f..f2df9be2 100644 --- a/.env.sample +++ b/.env.sample @@ -16,6 +16,9 @@ MONGO_ACCOUNTS_DATABASE_URI=mongodb://localhost:27017/hawk # MongoDB URL for connecting to events database MONGO_EVENTS_DATABASE_URI=mongodb://localhost:27017/hawk_events +# MongoDB appName for per-worker load attribution (e.g. hawk-worker-limiter) +MONGO_APP_NAME= + # JWT secret key for projects tokens JWT_SECRET=qwerty diff --git a/lib/db/controller.ts b/lib/db/controller.ts index 1eb7b91e..220db627 100644 --- a/lib/db/controller.ts +++ b/lib/db/controller.ts @@ -20,6 +20,11 @@ export class DatabaseController { */ private readonly connectionUri: string; + /** + * Sent to MongoDB on handshake; overrides any `appName` query param in the URI + */ + private readonly appName?: string; + /** * GridFSBucket object * Used to store files in GridFS @@ -30,12 +35,14 @@ export class DatabaseController { * Creates controller instance * * @param connectionUri - mongo URI for connection + * @param appName - MongoDB appName, defaults to `process.env.MONGO_APP_NAME` */ - constructor(connectionUri: string) { + constructor(connectionUri: string, appName?: string) { if (!connectionUri) { throw new DatabaseConnectionError('Connection URI is not specified. Check .env'); } this.connectionUri = connectionUri; + this.appName = appName ?? process.env.MONGO_APP_NAME; } /** @@ -53,6 +60,7 @@ export class DatabaseController { this.connection = await connect(this.connectionUri, { useNewUrlParser: true, useUnifiedTopology: true, + ...(this.appName ? { appName: this.appName } : {}), }); this.db = await this.connection.db(); From c7ad00238e5aae6321a2b0b46d1b255e0bcd62c2 Mon Sep 17 00:00:00 2001 From: Kuchizu <70284260+Kuchizu@users.noreply.github.com> Date: Wed, 3 Jun 2026 14:40:29 +0300 Subject: [PATCH 6/6] perf(limiter): cache plans in memory instead of $lookup per workspace (#556) * perf(limiter): cache plans in memory instead of $lookup per workspace * perf(limiter): memoize missing plan ids to avoid repeated cache refreshes * test(limiter): cover tariff plan caching in dbHelper --- workers/limiter/src/dbHelper.ts | 170 ++++++++++++++++------- workers/limiter/src/index.ts | 7 +- workers/limiter/tests/dbHelper.test.ts | 179 ++++++++++++++++++++++++- 3 files changed, 306 insertions(+), 50 deletions(-) diff --git a/workers/limiter/src/dbHelper.ts b/workers/limiter/src/dbHelper.ts index dbe45da0..b4cc845f 100644 --- a/workers/limiter/src/dbHelper.ts +++ b/workers/limiter/src/dbHelper.ts @@ -1,9 +1,24 @@ import { Collection, Db, ObjectId } from 'mongodb'; -import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; +import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; import { WorkspaceWithTariffPlan } from '../types'; import HawkCatcher from '@hawk.so/nodejs'; import { CriticalError, NonCriticalError } from '../../../lib/workerErrors'; +const WORKSPACE_PROJECTION = { + _id: 1, + name: 1, + isBlocked: 1, + blockedDate: 1, + lastChargeDate: 1, + billingPeriodEventsCount: 1, + tariffPlanId: 1, +} as const; + +type WorkspaceForLimiter = Pick< + WorkspaceDBScheme, + '_id' | 'name' | 'isBlocked' | 'blockedDate' | 'lastChargeDate' | 'billingPeriodEventsCount' | 'tariffPlanId' +>; + /** * Class that implements methods used for interaction between limiter and db */ @@ -23,15 +38,49 @@ export class DbHelper { */ private workspacesCollection: Collection; + /** + * Collection with tariff plans + */ + private plansCollection: Collection; + + /** + * In-memory cache of tariff plans — avoids $lookup on the small plans collection per workspace + */ + private plans: PlanDBScheme[] = []; + + /** + * Plan ids that were still missing after a cache refresh — don't trigger more refreshes for them + */ + private knownMissingPlanIds: Set = new Set(); + /** * @param projects - projects collection * @param workspaces - workspaces collection + * @param plans - plans collection * @param eventsDbConnection - connection to events DB */ - constructor(projects: Collection, workspaces: Collection, eventsDbConnection: Db) { + constructor( + projects: Collection, + workspaces: Collection, + plans: Collection, + eventsDbConnection: Db + ) { this.eventsDbConnection = eventsDbConnection; this.projectsCollection = projects; this.workspacesCollection = workspaces; + this.plansCollection = plans; + } + + /** + * Fetches tariff plans from database and keeps them cached + */ + public async fetchPlans(): Promise { + this.plans = await this.plansCollection.find({}).toArray(); + this.knownMissingPlanIds.clear(); + + if (this.plans.length === 0) { + throw new CriticalError('Please add tariff plans to the database'); + } } /** @@ -148,22 +197,51 @@ export class DbHelper { return this.projectsCollection.find(query).toArray(); } + /** + * Returns plan from cache, refetches once on miss + * + * @param planId - id of the plan to find + */ + private async resolvePlan(planId: WorkspaceDBScheme['tariffPlanId']): Promise { + let plan = this.findPlanById(planId); + + if (plan) { + return plan; + } + + const planIdStr = planId.toString(); + + if (this.knownMissingPlanIds.has(planIdStr)) { + return null; + } + + await this.fetchPlans(); + plan = this.findPlanById(planId); + + if (!plan) { + this.knownMissingPlanIds.add(planIdStr); + } + + return plan ?? null; + } + + /** + * @param planId - id of the plan to find + */ + private findPlanById(planId: WorkspaceDBScheme['tariffPlanId']): PlanDBScheme | undefined { + return this.plans.find((plan) => plan._id.toString() === planId.toString()); + } + /** * Returns a single workspace with its tariff plan by id * * @param id - workspace id */ private async getOneWorkspaceWithTariffPlan(id: string): Promise { - const pipeline = [ - { - $match: { - _id: new ObjectId(id), - }, - }, - ...this.tariffPlanLookupPipeline(), - ]; - - const workspace = await this.workspacesCollection.aggregate(pipeline).next(); + const workspace = await this.workspacesCollection + .find({ _id: new ObjectId(id) }) + .project(WORKSPACE_PROJECTION) + .next(); if (workspace === null) { throw new NonCriticalError(`Workspace ${id} not found`, { @@ -171,48 +249,46 @@ export class DbHelper { }); } - return workspace; + const plan = await this.resolvePlan(workspace.tariffPlanId); + + if (!plan) { + throw new NonCriticalError(`Tariff plan ${workspace.tariffPlanId.toString()} not found for workspace ${id}`, { + workspaceId: id, + }); + } + + return { + ...workspace, + tariffPlan: plan, + }; } /** * Yields all workspaces with their tariff plans one by one */ private async * yieldWorkspacesWithTariffPlans(): AsyncGenerator { - const pipeline = this.tariffPlanLookupPipeline(); - const cursor = this.workspacesCollection.aggregate(pipeline); + const cursor = this.workspacesCollection + .find({}) + .project(WORKSPACE_PROJECTION); for await (const workspace of cursor) { - yield workspace; - } - } + const plan = await this.resolvePlan(workspace.tariffPlanId); - /* eslint-disable-next-line */ - private tariffPlanLookupPipeline(): any[] { - return [ - { - $lookup: { - from: 'plans', - localField: 'tariffPlanId', - foreignField: '_id', - as: 'tariffPlan', - }, - }, - { - $unwind: { - path: '$tariffPlan', - }, - }, - { - $project: { - _id: 1, - name: 1, - isBlocked: 1, - blockedDate: 1, - lastChargeDate: 1, - billingPeriodEventsCount: 1, - tariffPlan: 1, - }, - }, - ]; + if (!plan) { + HawkCatcher.send( + new Error(`[Limiter] Tariff plan not found for workspace`), + { + workspaceId: workspace._id.toString(), + tariffPlanId: workspace.tariffPlanId?.toString(), + } + ); + continue; + } + + yield { + ...workspace, + tariffPlan: plan, + }; + } } -} \ No newline at end of file +} diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index 82ed2539..6ed21cfe 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -3,7 +3,7 @@ import { Worker } from '../../../lib/worker'; import * as pkg from '../package.json'; import * as path from 'path'; import * as dotenv from 'dotenv'; -import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; +import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; import HawkCatcher from '@hawk.so/nodejs'; import { MS_IN_SEC } from '../../../lib/utils/consts'; import LimiterEvent, { BlockWorkspaceEvent, UnblockWorkspaceEvent } from '../types/eventTypes'; @@ -68,8 +68,11 @@ export default class LimiterWorker extends Worker { const projectsCollection = accountDbConnection.collection('projects'); const workspacesCollection = accountDbConnection.collection('workspaces'); + const plansCollection = accountDbConnection.collection('plans'); - this.dbHelper = new DbHelper(projectsCollection, workspacesCollection, eventsDbConnection); + this.dbHelper = new DbHelper(projectsCollection, workspacesCollection, plansCollection, eventsDbConnection); + + await this.dbHelper.fetchPlans(); await this.redis.initialize(); diff --git a/workers/limiter/tests/dbHelper.test.ts b/workers/limiter/tests/dbHelper.test.ts index 947b070a..3e8eba54 100644 --- a/workers/limiter/tests/dbHelper.test.ts +++ b/workers/limiter/tests/dbHelper.test.ts @@ -4,6 +4,8 @@ import { GroupedEventDBScheme, PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme import { DbHelper } from '../src/dbHelper'; import { mockedPlans } from './plans.mock'; import { MS_IN_SEC } from '../../../lib/utils/consts'; +import { CriticalError, NonCriticalError } from '../../../lib/workerErrors'; +import HawkCatcher from '@hawk.so/nodejs'; /** * Constant of last charge date in all workspaces for tests @@ -130,7 +132,8 @@ describe('DbHelper', () => { await planCollection.deleteMany({}); await planCollection.insertMany(Object.values(mockedPlans)); - dbHelper = new DbHelper(projectCollection, workspaceCollection, db); + dbHelper = new DbHelper(projectCollection, workspaceCollection, planCollection, db); + await dbHelper.fetchPlans(); }, 30000); // 30 seconds timeout for MongoDB connection and setup beforeEach(async () => { @@ -203,6 +206,180 @@ describe('DbHelper', () => { }); }); + describe('plans caching', () => { + /** + * Restore the default plans cache after tests that mutate the plans collection or the cache + */ + afterEach(async () => { + jest.restoreAllMocks(); + await planCollection.deleteMany({}); + await planCollection.insertMany(Object.values(mockedPlans)); + await dbHelper.fetchPlans(); + }); + + test('Should serve plans from the in-memory cache without reading the plans collection per workspace', async () => { + /** + * Arrange + */ + const workspace1 = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10, + billingPeriodEventsCount: 0, + lastChargeDate: new Date(), + }); + const workspace2 = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10000, + billingPeriodEventsCount: 0, + lastChargeDate: new Date(), + }); + + await workspaceCollection.insertMany([workspace1, workspace2]); + + /** + * Cache is already loaded in beforeAll — start watching the plans collection from now on + */ + const findSpy = jest.spyOn(planCollection, 'find'); + + /** + * Act + */ + const workspaces = []; + + for await (const workspace of dbHelper.getWorkspacesWithTariffPlans()) { + workspaces.push(workspace); + } + + /** + * Assert — both plans are resolved from the cache, the plans collection is never read + */ + expect(workspaces).toHaveLength(2); + expect(workspaces[0].tariffPlan.eventsLimit).toBe(10); + expect(workspaces[1].tariffPlan.eventsLimit).toBe(10000); + expect(findSpy).not.toHaveBeenCalled(); + }); + + test('Should refetch plans once on a cache miss and resolve a plan added after startup', async () => { + /** + * Arrange — a plan that exists in the database but not in the already-loaded cache + */ + const freshPlan: PlanDBScheme = { + _id: new ObjectId(), + name: 'Fresh plan', + monthlyCharge: 10, + monthlyChargeCurrency: 'RUB', + eventsLimit: 500, + isDefault: false, + }; + + await planCollection.insertOne(freshPlan); + + const workspace = createWorkspaceMock({ + plan: freshPlan, + billingPeriodEventsCount: 0, + lastChargeDate: new Date(), + }); + + await workspaceCollection.insertOne(workspace); + + const findSpy = jest.spyOn(planCollection, 'find'); + + /** + * Act + */ + const result = await dbHelper.getWorkspacesWithTariffPlans(workspace._id.toString()); + + /** + * Assert — the miss triggered exactly one refetch and the plan resolved from the refreshed cache + */ + expect(result.tariffPlan.eventsLimit).toBe(500); + expect(findSpy).toHaveBeenCalledTimes(1); + }); + + test('Should reload the plans collection only once for repeated misses of the same plan id', async () => { + /** + * Arrange — two workspaces referencing the same non-existent plan + */ + const missingPlanId = new ObjectId(); + + const workspace1 = createWorkspaceMock({ + plan: { + ...mockedPlans.eventsLimit10, + _id: missingPlanId, + }, + billingPeriodEventsCount: 0, + lastChargeDate: new Date(), + }); + const workspace2 = createWorkspaceMock({ + plan: { + ...mockedPlans.eventsLimit10, + _id: missingPlanId, + }, + billingPeriodEventsCount: 0, + lastChargeDate: new Date(), + }); + + await workspaceCollection.insertMany([workspace1, workspace2]); + + const findSpy = jest.spyOn(planCollection, 'find'); + const hawkCatcherSpy = jest.spyOn(HawkCatcher, 'send').mockImplementation(() => undefined); + + /** + * Act + */ + const workspaces = []; + + for await (const workspace of dbHelper.getWorkspacesWithTariffPlans()) { + workspaces.push(workspace); + } + + /** + * Assert — workspaces with a dangling plan are skipped and reported, and the missing id is + * memoized so the collection is reloaded only once despite two misses + */ + expect(workspaces).toHaveLength(0); + expect(findSpy).toHaveBeenCalledTimes(1); + expect(hawkCatcherSpy).toHaveBeenCalledTimes(2); + }); + + test('Should throw NonCriticalError when a single workspace references a non-existent plan', async () => { + /** + * Arrange + */ + const workspace = createWorkspaceMock({ + plan: { + ...mockedPlans.eventsLimit10, + _id: new ObjectId(), + }, + billingPeriodEventsCount: 0, + lastChargeDate: new Date(), + }); + + await workspaceCollection.insertOne(workspace); + + /** + * Act & Assert + */ + await expect( + dbHelper.getWorkspacesWithTariffPlans(workspace._id.toString()) + ).rejects.toThrow(NonCriticalError); + }); + + test('fetchPlans should throw CriticalError when the plans collection is empty', async () => { + /** + * Arrange — a helper pointed at an empty plans collection + */ + const emptyPlanCollection = db.collection('plans_empty_for_test'); + + await emptyPlanCollection.deleteMany({}); + + const helperWithoutPlans = new DbHelper(projectCollection, workspaceCollection, emptyPlanCollection, db); + + /** + * Act & Assert + */ + await expect(helperWithoutPlans.fetchPlans()).rejects.toThrow(CriticalError); + }); + }); + describe('updateWorkspacesEventsCountAndIsBlocked', () => { test('Should update multiple workspaces', async () => { /**