From 8777e77c8706215d5fa0f890922d237cbed18417 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 31 Jan 2026 02:20:15 +0500 Subject: [PATCH 1/9] chore(grouper): add counters to the grouper worker --- workers/grouper/src/index.ts | 181 +++++++++++++++++ workers/grouper/src/redisHelper.ts | 213 ++++++++++++++++++- workers/grouper/tests/index.test.ts | 304 ++++++++++++++++++++++++++++ 3 files changed, 697 insertions(+), 1 deletion(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 73f16fc7..8a10b369 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -26,6 +26,7 @@ import { rightTrim } from '../../../lib/utils/string'; import { hasValue } from '../../../lib/utils/hasValue'; /* eslint-disable-next-line no-unused-vars */ import { memoize } from '../../../lib/memoize'; +import TimeMs from '../../../lib/utils/time'; /** * eslint does not count decorators as a variable usage @@ -268,6 +269,186 @@ export default class GrouperWorker extends Worker { }); } } + + await this.incrementRateLimitCounter(task.projectId); + await this.recordProjectMetrics(task.projectId, 'events-stored'); + } + + /** + * Build RedisTimeSeries key for project metrics. + * + * @param projectId - id of the project + * @param metricType - metric type identifier + * @param granularity - time granularity + */ + private getTimeSeriesKey( + projectId: string, + metricType: string, + granularity: 'minutely' | 'hourly' | 'daily' + ): string { + return `ts:project-${metricType}:${projectId}:${granularity}`; + } + + /** + * Record project metrics to Redis TimeSeries. + * + * @param projectId - id of the project + * @param metricType - metric type identifier + */ + private async recordProjectMetrics(projectId: string, metricType: string): Promise { + const minutelyKey = this.getTimeSeriesKey(projectId, metricType, 'minutely'); + const hourlyKey = this.getTimeSeriesKey(projectId, metricType, 'hourly'); + const dailyKey = this.getTimeSeriesKey(projectId, metricType, 'daily'); + + const labels: Record = { + type: 'error', + status: metricType, + project: projectId, + }; + + const series = [ + { key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY }, + { key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK }, + { key: dailyKey, label: 'daily', retentionMs: 90 * TimeMs.DAY }, + ]; + + for (const { key, label, retentionMs } of series) { + try { + await this.redis.safeTsAdd(key, 1, labels, retentionMs); + } catch (error) { + this.logger.error(`Failed to add ${label} TS for ${metricType}`, error); + } + } + } + + /** + * Increment rate limit counters for the project. + * + * @param projectId - id of the project + */ + private async incrementRateLimitCounter(projectId: string): Promise { + try { + const settings = await this.getProjectRateLimitSettings(projectId); + + if (!settings) { + return; + } + + await this.redis.incrementRateLimitCounterForCurrentEvent( + projectId, + settings.eventsPeriod, + settings.eventsLimit + ); + } catch (error) { + this.logger.error(`Failed to increment rate limit counter for project ${projectId}`, error); + } + } + + /** + * Fetch and normalize rate limit settings + * Rate limit settings could appear in tarifPlan, workspace and project. + * All rateLimits have different priority. + * + * @param projectId - id of the project + */ + @memoize({ max: 200, ttl: MEMOIZATION_TTL, strategy: 'concat', skipCache: [null] }) + private async getProjectRateLimitSettings(projectId: string): Promise<{ eventsLimit: number; eventsPeriod: number } | null> { + if (!projectId || !mongodb.ObjectID.isValid(projectId)) { + return null; + } + + const accountsDb = this.accountsDb.getConnection(); + + /** + * Fetch project from the db + */ + const project = await accountsDb + .collection('projects') + .findOne( + { _id: new mongodb.ObjectId(projectId) }, + { projection: { rateLimitSettings: 1, workspaceId: 1 } } + ); + + if (!project) { + return null; + } + + const projectRateLimitSettings = project.rateLimitSettings as { N: number, T: number }; + const workspaceId = new mongodb.ObjectID(project.workspaceId); + + let planRateLimitSettings: { N: number, T: number}; + let workspaceRateLimitSettings: { N: number, T: number}; + + /** + * Fetch workspace from the db + */ + if (workspaceId) { + const workspace = await accountsDb + .collection('workspaces') + .findOne( + { _id: workspaceId }, + { projection: { rateLimitSettings: 1, tariffPlanId: 1 } } + ); + + workspaceRateLimitSettings = workspace?.rateLimitSettings as { N: number, T: number }; + + const planId = new mongodb.ObjectId(workspace?.tariffPlanId); + + /** + * Tarif plan from the db + */ + if (planId) { + const plan = await accountsDb + .collection('plans') + .findOne( + { _id: planId }, + { projection: { rateLimitSettings: 1 } } + ); + + planRateLimitSettings = plan?.rateLimitSettings; + } + } + + return this.normalizeRateLimitSettings( + planRateLimitSettings, + workspaceRateLimitSettings, + projectRateLimitSettings + ); + } + + /** + * Normalize rate limit settings shape from database. + * + * @param rateLimitLayers - raw settings documents in priority order + */ + private normalizeRateLimitSettings( + ...rateLimitLayers: { N: number, T: number }[] + ): { eventsLimit: number; eventsPeriod: number } | null { + let eventsLimit = 0; + let eventsPeriod = 0; + + for (const layer of rateLimitLayers) { + if (!layer) { + continue; + } + + const limit = layer.N as number; + const period = layer.T as number; + + if (limit !== undefined && limit > 0) { + eventsLimit = limit; + } + + if (period !== undefined && period > 0) { + eventsPeriod = period; + } + } + + if (eventsLimit <= 0 || eventsPeriod <= 0) { + return null; + } + + return { eventsLimit, eventsPeriod }; } /** diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index a655c24b..f0f45a7e 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -110,6 +110,217 @@ export default class RedisHelper { return result === null; } + /** + * Increments redis counter used for rate limiting + * + * @param projectId - id of the project which event belongs to + * @param eventsPeriod - rate limit period configured for the project + */ + public async incrementRateLimitCounterForCurrentEvent(projectId: string, eventsPeriod: number, limit: number): Promise { + const script = ` + local key = KEYS[1] + local field = ARGV[1] + local now = tonumber(ARGV[2]) + local period = tonumber(ARGV[3]) + local limit = tonumber(ARGV[4]) + + local current = redis.call('HGET', key, field) + + -- If no record yet, start a new window with count = 1 + if not current then + redis.call('HSET', key, field, now .. ':1') + return + end + + local timestamp, count = string.match(current, '(%d+):(%d+)') + timestamp = tonumber(timestamp) + count = tonumber(count) + + -- Check if we're in a new time window + if now - timestamp >= period then + redis.call('HSET', key, field, now .. ':1') + return + end + + -- Check if incrementing would exceed limit + if count + 1 > limit then + return + end + + -- Increment counter + redis.call('HSET', key, field, timestamp .. ':' .. (count + 1)) + ` + + const key = 'rate_limits'; + const now = Math.floor(Date.now() / 1000); + + await this.redisClient.eval(script, { + keys: [key], + arguments: [projectId, now.toString(), eventsPeriod.toString(), limit.toString()], + }); + } + + /** + * Build label arguments for RedisTimeSeries commands + * + * @param labels - labels to attach to the time series + */ + private buildLabelArguments(labels: Record): string[] { + const labelArgs: string[] = ['LABELS']; + + for (const [labelKey, labelValue] of Object.entries(labels)) { + labelArgs.push(labelKey, labelValue); + } + + return labelArgs; + } + + /** + * Creates a RedisTimeSeries key if it doesn't exist. + * + * @param key - time series key + * @param labels - labels to attach to the time series + * @param retentionMs - optional retention in milliseconds + */ + public async tsCreateIfNotExists( + key: string, + labels: Record, + retentionMs = 0 + ): Promise { + const exists = await this.redisClient.exists(key); + + if (exists > 0) { + return; + } + + const args: string[] = ['TS.CREATE', key]; + + if (retentionMs > 0) { + args.push('RETENTION', Math.floor(retentionMs).toString()); + } + + args.push(...this.buildLabelArguments(labels)); + + await this.redisClient.sendCommand(args); + } + + /** + * Increments a RedisTimeSeries key with labels and timestamp. + * + * @param key - time series key + * @param value - value to increment by + * @param timestampMs - timestamp in milliseconds, defaults to current time + * @param labels - labels to attach to the sample + */ + public async tsIncrBy( + key: string, + value: number, + timestampMs = 0, + labels: Record = {} + ): Promise { + const labelArgs = this.buildLabelArguments(labels); + const timestamp = timestampMs === 0 ? Date.now() : timestampMs; + + const args: string[] = [ + 'TS.INCRBY', + key, + value.toString(), + 'TIMESTAMP', + Math.floor(timestamp).toString(), + ...labelArgs, + ]; + + await this.redisClient.sendCommand(args); + } + + /** + * Ensures that a RedisTimeSeries key exists and increments it safely. + * + * @param key - time series key + * @param value - value to increment by + * @param labels - labels to attach to the time series + * @param retentionMs - optional retention in milliseconds + */ + public async safeTsIncrBy( + key: string, + value: number, + labels: Record, + retentionMs = 0 + ): Promise { + const timestamp = Date.now(); + + try { + await this.tsIncrBy(key, value, timestamp, labels); + } catch (error) { + if (error instanceof Error && error.message.includes('TSDB: key does not exist')) { + this.logger.warn(`TS key ${key} does not exist, creating it...`); + await this.tsCreateIfNotExists(key, labels, retentionMs); + await this.tsIncrBy(key, value, timestamp, labels); + } else { + throw error; + } + } + } + + /** + * Adds a sample to a RedisTimeSeries key. + * + * @param key - time series key + * @param value - value to add + * @param timestampMs - timestamp in milliseconds, defaults to current time + * @param labels - labels to attach to the sample + */ + public async tsAdd( + key: string, + value: number, + timestampMs = 0, + labels: Record = {} + ): Promise { + const labelArgs = this.buildLabelArguments(labels); + const timestamp = timestampMs === 0 ? Date.now() : timestampMs; + + const args: string[] = [ + 'TS.ADD', + key, + Math.floor(timestamp).toString(), + value.toString(), + 'ON_DUPLICATE', + 'SUM', + ...labelArgs, + ]; + + await this.redisClient.sendCommand(args); + } + + /** + * Ensures that a RedisTimeSeries key exists and adds a sample safely. + * + * @param key - time series key + * @param value - value to add + * @param labels - labels to attach to the time series + * @param retentionMs - optional retention in milliseconds + */ + public async safeTsAdd( + key: string, + value: number, + labels: Record, + retentionMs = 0 + ): Promise { + const timestamp = Date.now(); + + try { + await this.tsAdd(key, value, timestamp, labels); + } catch (error) { + if (error instanceof Error && error.message.includes('TSDB: key does not exist')) { + this.logger.warn(`TS key ${key} does not exist, creating it...`); + await this.tsCreateIfNotExists(key, labels, retentionMs); + await this.tsAdd(key, value, timestamp, labels); + } else { + throw error; + } + } + } + /** * Creates callback function for Redis operations * @@ -130,4 +341,4 @@ export default class RedisHelper { resolve(resp !== 'OK'); }; } -} +} \ No newline at end of file diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index ee781e98..2ae61810 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -7,6 +7,7 @@ import type { Collection } from 'mongodb'; import { MongoClient } from 'mongodb'; import type { ErrorsCatcherType, EventAddons, EventData } from '@hawk.so/types'; import { MS_IN_SEC } from '../../../lib/utils/consts'; +import TimeMs from '../../../lib/utils/time'; import * as mongodb from 'mongodb'; import { patch } from '@n1ru4l/json-patch-plus'; @@ -57,16 +58,41 @@ const projectIdMock = '5d206f7f9aaf7c0071d64596'; /** * Mock project data */ +const planIdMock = new mongodb.ObjectId(); +const workspaceIdMock = new mongodb.ObjectId(); + +const planMock = { + _id: planIdMock, + rateLimitSettings: { + N: 0, + T: 0, + }, +}; + +const workspaceMock = { + _id: workspaceIdMock, + tariffPlanId: planIdMock, + rateLimitSettings: { + N: 0, + T: 0, + }, +}; + const projectMock = { _id: new mongodb.ObjectId(projectIdMock), id: projectIdMock, name: 'Test Project', token: 'test-token', + workspaceId: workspaceIdMock, uidAdded: { id: 'test-user-id', }, unreadCount: 0, description: 'Test project for grouper worker tests', + rateLimitSettings: { + N: 0, + T: 0, + }, eventGroupingPatterns: [ { _id: mongodb.ObjectId(), pattern: 'New error .*', @@ -113,8 +139,30 @@ describe('GrouperWorker', () => { let dailyEventsCollection: Collection; let repetitionsCollection: Collection; let projectsCollection: Collection; + let workspacesCollection: Collection; + let plansCollection: Collection; let redisClient: RedisClientType; let worker: GrouperWorker; + const setPlanRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { + await plansCollection.updateOne( + { _id: planIdMock }, + { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { upsert: true } + ); + }; + const setWorkspaceRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { + await workspacesCollection.updateOne( + { _id: workspaceIdMock }, + { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { upsert: true } + ); + }; + const setProjectRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { + await projectsCollection.updateOne( + { _id: new mongodb.ObjectId(projectIdMock) }, + { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + ); + }; beforeAll(async () => { worker = new GrouperWorker(); @@ -133,12 +181,17 @@ describe('GrouperWorker', () => { dailyEventsCollection = connection.db().collection('dailyEvents:' + projectIdMock); repetitionsCollection = connection.db().collection('repetitions:' + projectIdMock); projectsCollection = accountsConnection.db().collection('projects'); + workspacesCollection = accountsConnection.db().collection('workspaces'); + plansCollection = accountsConnection.db().collection('plans'); /** * Create unique index for groupHash */ await eventsCollection.createIndex({ groupHash: 1 }, { unique: true }); + await plansCollection.insertOne(planMock); + await workspacesCollection.insertOne(workspaceMock); + /** * Insert mock project into accounts database */ @@ -155,9 +208,13 @@ describe('GrouperWorker', () => { */ beforeEach(async () => { worker.clearCache(); + delete (worker as any)['memoizeCache:getProjectRateLimitSettings']; await eventsCollection.deleteMany({}); await dailyEventsCollection.deleteMany({}); await repetitionsCollection.deleteMany({}); + await setPlanRateLimit(0, 0); + await setWorkspaceRateLimit(0, 0); + await setProjectRateLimit(0, 0); }); afterEach(async () => { @@ -743,10 +800,257 @@ describe('GrouperWorker', () => { }); }); + describe('Rate limits counter increment', () => { + const rateLimitsKey = 'rate_limits'; + + test('increments counter when handling an event', async () => { + await setProjectRateLimit(5, 60); + + let currentTime = 1_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + await worker.handle(generateTask()); + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).toBe(`${Math.floor(currentTime / 1000)}:1`); + } finally { + nowSpy.mockRestore(); + } + }); + + test('reuses window and increments while within limit', async () => { + await setProjectRateLimit(5, 60); + + let currentTime = 2_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + await worker.handle(generateTask()); + await worker.handle(generateTask()); + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(2); + } finally { + nowSpy.mockRestore(); + } + }); + + test('does not exceed configured limit within same window', async () => { + const eventsLimit = 3; + + await setProjectRateLimit(eventsLimit, 60); + + let currentTime = 3_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + for (let i = 0; i < eventsLimit + 2; i++) { + await worker.handle(generateTask()); + } + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(eventsLimit); + } finally { + nowSpy.mockRestore(); + } + }); + + test('resets window after period elapses', async () => { + await setProjectRateLimit(5, 2); + + let currentTime = 4_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + await worker.handle(generateTask()); + + currentTime += 3_000; // advance by 3 seconds + + await worker.handle(generateTask()); + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [timestamp, count] = (storedValue as string).split(':'); + + expect(Number(timestamp)).toBe(Math.floor(currentTime / 1000)); + expect(Number(count)).toBe(1); + } finally { + nowSpy.mockRestore(); + } + }); + + test('uses workspace limits when project overrides are absent', async () => { + await setPlanRateLimit(10, 60); + await setWorkspaceRateLimit(3, 60); + await setProjectRateLimit(0, 0); + + let currentTime = 5_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + for (let i = 0; i < 5; i++) { + await worker.handle(generateTask()); + } + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(3); + } finally { + nowSpy.mockRestore(); + } + }); + + test('falls back to plan limits when workspace settings are empty', async () => { + await setPlanRateLimit(4, 60); + await setWorkspaceRateLimit(0, 0); + await setProjectRateLimit(0, 0); + + let currentTime = 6_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + for (let i = 0; i < 6; i++) { + await worker.handle(generateTask()); + } + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(4); + } finally { + nowSpy.mockRestore(); + } + }); + + test('prefers project limits over workspace and plan', async () => { + await setPlanRateLimit(4, 60); + await setWorkspaceRateLimit(6, 60); + await setProjectRateLimit(8, 60); + + let currentTime = 7_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); + + try { + for (let i = 0; i < 10; i++) { + await worker.handle(generateTask()); + } + + const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); + + expect(storedValue).not.toBeNull(); + + const [, count] = (storedValue as string).split(':'); + + expect(Number(count)).toBe(8); + } finally { + nowSpy.mockRestore(); + } + }); + }); + + describe('Events-stored metrics', () => { + test('writes minutely, hourly, and daily samples after handling an event', async () => { + const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd'); + + try { + await worker.handle(generateTask()); + + expect(safeTsAddSpy).toHaveBeenCalledTimes(3); + + const expectedLabels = { + type: 'error', + status: 'events-stored', + project: projectIdMock, + }; + + expect(safeTsAddSpy).toHaveBeenNthCalledWith( + 1, + `ts:project-events-stored:${projectIdMock}:minutely`, + 1, + expectedLabels, + TimeMs.DAY, + ); + expect(safeTsAddSpy).toHaveBeenNthCalledWith( + 2, + `ts:project-events-stored:${projectIdMock}:hourly`, + 1, + expectedLabels, + TimeMs.WEEK, + ); + expect(safeTsAddSpy).toHaveBeenNthCalledWith( + 3, + `ts:project-events-stored:${projectIdMock}:daily`, + 1, + expectedLabels, + 90 * TimeMs.DAY, + ); + } finally { + safeTsAddSpy.mockRestore(); + } + }); + + test('logs when a time-series write fails but continues processing', async () => { + const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd'); + const loggerErrorSpy = jest.spyOn((worker as any).logger, 'error').mockImplementation(() => undefined); + const failure = new Error('TS failure'); + + safeTsAddSpy + .mockImplementationOnce(() => Promise.resolve()) + .mockImplementationOnce(async () => { throw failure; }) + .mockImplementationOnce(() => Promise.resolve()); + + try { + await worker.handle(generateTask()); + + expect(loggerErrorSpy).toHaveBeenCalledWith('Failed to add hourly TS for events-stored', failure); + expect(await eventsCollection.find().count()).toBe(1); + } finally { + safeTsAddSpy.mockRestore(); + loggerErrorSpy.mockRestore(); + } + }); + + test('records metrics exactly once per handled event', async () => { + const recordMetricsSpy = jest.spyOn(worker as any, 'recordProjectMetrics'); + + try { + await worker.handle(generateTask()); + + expect(recordMetricsSpy).toHaveBeenCalledTimes(1); + expect(recordMetricsSpy).toHaveBeenCalledWith(projectIdMock, 'events-stored'); + } finally { + recordMetricsSpy.mockRestore(); + } + }); + }); + afterAll(async () => { await redisClient.quit(); await worker.finish(); await projectsCollection.deleteMany({}); + await workspacesCollection.deleteMany({}); + await plansCollection.deleteMany({}); await accountsConnection.close(); await connection.close(); }); From 3badfa93c4ced6f085356c2d114fbef707fecee7 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 7 Feb 2026 17:49:23 +0300 Subject: [PATCH 2/9] chore(): eslint fix --- workers/grouper/src/index.ts | 1 + workers/grouper/src/redisHelper.ts | 33 +++++++++++++++--------------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 8a10b369..f42789ed 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -309,6 +309,7 @@ export default class GrouperWorker extends Worker { const series = [ { key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY }, { key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK }, + /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ { key: dailyKey, label: 'daily', retentionMs: 90 * TimeMs.DAY }, ]; diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index f0f45a7e..0d207208 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -2,6 +2,7 @@ import HawkCatcher from '@hawk.so/nodejs'; import type { RedisClientType } from 'redis'; import { createClient } from 'redis'; import createLogger from '../../../lib/logger'; +import { MS_IN_SEC } from '../../../lib/utils/consts'; /** * Class with helper functions for working with Redis @@ -152,7 +153,7 @@ export default class RedisHelper { ` const key = 'rate_limits'; - const now = Math.floor(Date.now() / 1000); + const now = Math.floor(Date.now() / MS_IN_SEC); await this.redisClient.eval(script, { keys: [key], @@ -160,21 +161,6 @@ export default class RedisHelper { }); } - /** - * Build label arguments for RedisTimeSeries commands - * - * @param labels - labels to attach to the time series - */ - private buildLabelArguments(labels: Record): string[] { - const labelArgs: string[] = ['LABELS']; - - for (const [labelKey, labelValue] of Object.entries(labels)) { - labelArgs.push(labelKey, labelValue); - } - - return labelArgs; - } - /** * Creates a RedisTimeSeries key if it doesn't exist. * @@ -321,6 +307,21 @@ export default class RedisHelper { } } + /** + * Build label arguments for RedisTimeSeries commands + * + * @param labels - labels to attach to the time series + */ + private buildLabelArguments(labels: Record): string[] { + const labelArgs: string[] = ['LABELS']; + + for (const [labelKey, labelValue] of Object.entries(labels)) { + labelArgs.push(labelKey, labelValue); + } + + return labelArgs; + } + /** * Creates callback function for Redis operations * From 0d00df96b33ac3ac2a5eb62a65575861f832ff86 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sat, 7 Feb 2026 17:59:33 +0300 Subject: [PATCH 3/9] chore(): clean up --- workers/grouper/src/index.ts | 39 ++++++++++++++++++----- workers/grouper/src/redisHelper.ts | 7 +++-- workers/grouper/tests/index.test.ts | 49 +++++++++++++++++++++-------- 3 files changed, 72 insertions(+), 23 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index f42789ed..52b9c705 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -307,10 +307,22 @@ export default class GrouperWorker extends Worker { }; const series = [ - { key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY }, - { key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK }, - /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ - { key: dailyKey, label: 'daily', retentionMs: 90 * TimeMs.DAY }, + { + key: minutelyKey, + label: 'minutely', + retentionMs: TimeMs.DAY, + }, + { + key: hourlyKey, + label: 'hourly', + retentionMs: TimeMs.WEEK, + }, + { + key: dailyKey, + label: 'daily', + // eslint-disable-next-line @typescript-eslint/no-magic-numbers + retentionMs: 90 * TimeMs.DAY, + }, ]; for (const { key, label, retentionMs } of series) { @@ -367,7 +379,12 @@ export default class GrouperWorker extends Worker { .collection('projects') .findOne( { _id: new mongodb.ObjectId(projectId) }, - { projection: { rateLimitSettings: 1, workspaceId: 1 } } + { + projection: { + rateLimitSettings: 1, + workspaceId: 1, + }, + } ); if (!project) { @@ -388,7 +405,12 @@ export default class GrouperWorker extends Worker { .collection('workspaces') .findOne( { _id: workspaceId }, - { projection: { rateLimitSettings: 1, tariffPlanId: 1 } } + { + projection: { + rateLimitSettings: 1, + tariffPlanId: 1, + }, + } ); workspaceRateLimitSettings = workspace?.rateLimitSettings as { N: number, T: number }; @@ -449,7 +471,10 @@ export default class GrouperWorker extends Worker { return null; } - return { eventsLimit, eventsPeriod }; + return { + eventsLimit, + eventsPeriod, + }; } /** diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index 0d207208..bed08418 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -116,6 +116,7 @@ export default class RedisHelper { * * @param projectId - id of the project which event belongs to * @param eventsPeriod - rate limit period configured for the project + * @param limit - current event count limit (from project / workspace / plan) */ public async incrementRateLimitCounterForCurrentEvent(projectId: string, eventsPeriod: number, limit: number): Promise { const script = ` @@ -150,13 +151,13 @@ export default class RedisHelper { -- Increment counter redis.call('HSET', key, field, timestamp .. ':' .. (count + 1)) - ` + `; const key = 'rate_limits'; const now = Math.floor(Date.now() / MS_IN_SEC); await this.redisClient.eval(script, { - keys: [key], + keys: [ key ], arguments: [projectId, now.toString(), eventsPeriod.toString(), limit.toString()], }); } @@ -313,7 +314,7 @@ export default class RedisHelper { * @param labels - labels to attach to the time series */ private buildLabelArguments(labels: Record): string[] { - const labelArgs: string[] = ['LABELS']; + const labelArgs: string[] = [ 'LABELS' ]; for (const [labelKey, labelValue] of Object.entries(labels)) { labelArgs.push(labelKey, labelValue); diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index 2ae61810..34bad1cc 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -146,21 +146,42 @@ describe('GrouperWorker', () => { const setPlanRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { await plansCollection.updateOne( { _id: planIdMock }, - { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { + $set: { + rateLimitSettings: { + N: eventsLimit, + T: eventsPeriod, + }, + }, + }, { upsert: true } ); }; const setWorkspaceRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { await workspacesCollection.updateOne( { _id: workspaceIdMock }, - { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { + $set: { + rateLimitSettings: { + N: eventsLimit, + T: eventsPeriod, + }, + }, + }, { upsert: true } ); }; const setProjectRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { await projectsCollection.updateOne( { _id: new mongodb.ObjectId(projectIdMock) }, - { $set: { rateLimitSettings: { N: eventsLimit, T: eventsPeriod } } }, + { + $set: { + rateLimitSettings: { + N: eventsLimit, + T: eventsPeriod, + }, + }, + } ); }; @@ -806,7 +827,7 @@ describe('GrouperWorker', () => { test('increments counter when handling an event', async () => { await setProjectRateLimit(5, 60); - let currentTime = 1_000_000; + const currentTime = 1_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -823,7 +844,7 @@ describe('GrouperWorker', () => { test('reuses window and increments while within limit', async () => { await setProjectRateLimit(5, 60); - let currentTime = 2_000_000; + const currentTime = 2_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -847,7 +868,7 @@ describe('GrouperWorker', () => { await setProjectRateLimit(eventsLimit, 60); - let currentTime = 3_000_000; + const currentTime = 3_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -898,7 +919,7 @@ describe('GrouperWorker', () => { await setWorkspaceRateLimit(3, 60); await setProjectRateLimit(0, 0); - let currentTime = 5_000_000; + const currentTime = 5_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -923,7 +944,7 @@ describe('GrouperWorker', () => { await setWorkspaceRateLimit(0, 0); await setProjectRateLimit(0, 0); - let currentTime = 6_000_000; + const currentTime = 6_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -948,7 +969,7 @@ describe('GrouperWorker', () => { await setWorkspaceRateLimit(6, 60); await setProjectRateLimit(8, 60); - let currentTime = 7_000_000; + const currentTime = 7_000_000; const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); try { @@ -989,21 +1010,21 @@ describe('GrouperWorker', () => { `ts:project-events-stored:${projectIdMock}:minutely`, 1, expectedLabels, - TimeMs.DAY, + TimeMs.DAY ); expect(safeTsAddSpy).toHaveBeenNthCalledWith( 2, `ts:project-events-stored:${projectIdMock}:hourly`, 1, expectedLabels, - TimeMs.WEEK, + TimeMs.WEEK ); expect(safeTsAddSpy).toHaveBeenNthCalledWith( 3, `ts:project-events-stored:${projectIdMock}:daily`, 1, expectedLabels, - 90 * TimeMs.DAY, + 90 * TimeMs.DAY ); } finally { safeTsAddSpy.mockRestore(); @@ -1017,7 +1038,9 @@ describe('GrouperWorker', () => { safeTsAddSpy .mockImplementationOnce(() => Promise.resolve()) - .mockImplementationOnce(async () => { throw failure; }) + .mockImplementationOnce(async () => { + throw failure; + }) .mockImplementationOnce(() => Promise.resolve()); try { From 20990d34f56a3846ff0a22f0c48845484aaea605 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 20:05:48 +0300 Subject: [PATCH 4/9] chore(grouper): remove redundant rate-limit increment logic --- workers/grouper/src/index.ts | 144 ------------------- workers/grouper/src/redisHelper.ts | 51 ------- workers/grouper/tests/index.test.ts | 215 ---------------------------- 3 files changed, 410 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 52b9c705..05aff09b 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -270,7 +270,6 @@ export default class GrouperWorker extends Worker { } } - await this.incrementRateLimitCounter(task.projectId); await this.recordProjectMetrics(task.projectId, 'events-stored'); } @@ -334,149 +333,6 @@ export default class GrouperWorker extends Worker { } } - /** - * Increment rate limit counters for the project. - * - * @param projectId - id of the project - */ - private async incrementRateLimitCounter(projectId: string): Promise { - try { - const settings = await this.getProjectRateLimitSettings(projectId); - - if (!settings) { - return; - } - - await this.redis.incrementRateLimitCounterForCurrentEvent( - projectId, - settings.eventsPeriod, - settings.eventsLimit - ); - } catch (error) { - this.logger.error(`Failed to increment rate limit counter for project ${projectId}`, error); - } - } - - /** - * Fetch and normalize rate limit settings - * Rate limit settings could appear in tarifPlan, workspace and project. - * All rateLimits have different priority. - * - * @param projectId - id of the project - */ - @memoize({ max: 200, ttl: MEMOIZATION_TTL, strategy: 'concat', skipCache: [null] }) - private async getProjectRateLimitSettings(projectId: string): Promise<{ eventsLimit: number; eventsPeriod: number } | null> { - if (!projectId || !mongodb.ObjectID.isValid(projectId)) { - return null; - } - - const accountsDb = this.accountsDb.getConnection(); - - /** - * Fetch project from the db - */ - const project = await accountsDb - .collection('projects') - .findOne( - { _id: new mongodb.ObjectId(projectId) }, - { - projection: { - rateLimitSettings: 1, - workspaceId: 1, - }, - } - ); - - if (!project) { - return null; - } - - const projectRateLimitSettings = project.rateLimitSettings as { N: number, T: number }; - const workspaceId = new mongodb.ObjectID(project.workspaceId); - - let planRateLimitSettings: { N: number, T: number}; - let workspaceRateLimitSettings: { N: number, T: number}; - - /** - * Fetch workspace from the db - */ - if (workspaceId) { - const workspace = await accountsDb - .collection('workspaces') - .findOne( - { _id: workspaceId }, - { - projection: { - rateLimitSettings: 1, - tariffPlanId: 1, - }, - } - ); - - workspaceRateLimitSettings = workspace?.rateLimitSettings as { N: number, T: number }; - - const planId = new mongodb.ObjectId(workspace?.tariffPlanId); - - /** - * Tarif plan from the db - */ - if (planId) { - const plan = await accountsDb - .collection('plans') - .findOne( - { _id: planId }, - { projection: { rateLimitSettings: 1 } } - ); - - planRateLimitSettings = plan?.rateLimitSettings; - } - } - - return this.normalizeRateLimitSettings( - planRateLimitSettings, - workspaceRateLimitSettings, - projectRateLimitSettings - ); - } - - /** - * Normalize rate limit settings shape from database. - * - * @param rateLimitLayers - raw settings documents in priority order - */ - private normalizeRateLimitSettings( - ...rateLimitLayers: { N: number, T: number }[] - ): { eventsLimit: number; eventsPeriod: number } | null { - let eventsLimit = 0; - let eventsPeriod = 0; - - for (const layer of rateLimitLayers) { - if (!layer) { - continue; - } - - const limit = layer.N as number; - const period = layer.T as number; - - if (limit !== undefined && limit > 0) { - eventsLimit = limit; - } - - if (period !== undefined && period > 0) { - eventsPeriod = period; - } - } - - if (eventsLimit <= 0 || eventsPeriod <= 0) { - return null; - } - - return { - eventsLimit, - eventsPeriod, - }; - } - /** * Trims source code lines in event's backtrace to prevent memory leaks * diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index bed08418..cc009cd5 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -111,57 +111,6 @@ export default class RedisHelper { return result === null; } - /** - * Increments redis counter used for rate limiting - * - * @param projectId - id of the project which event belongs to - * @param eventsPeriod - rate limit period configured for the project - * @param limit - current event count limit (from project / workspace / plan) - */ - public async incrementRateLimitCounterForCurrentEvent(projectId: string, eventsPeriod: number, limit: number): Promise { - const script = ` - local key = KEYS[1] - local field = ARGV[1] - local now = tonumber(ARGV[2]) - local period = tonumber(ARGV[3]) - local limit = tonumber(ARGV[4]) - - local current = redis.call('HGET', key, field) - - -- If no record yet, start a new window with count = 1 - if not current then - redis.call('HSET', key, field, now .. ':1') - return - end - - local timestamp, count = string.match(current, '(%d+):(%d+)') - timestamp = tonumber(timestamp) - count = tonumber(count) - - -- Check if we're in a new time window - if now - timestamp >= period then - redis.call('HSET', key, field, now .. ':1') - return - end - - -- Check if incrementing would exceed limit - if count + 1 > limit then - return - end - - -- Increment counter - redis.call('HSET', key, field, timestamp .. ':' .. (count + 1)) - `; - - const key = 'rate_limits'; - const now = Math.floor(Date.now() / MS_IN_SEC); - - await this.redisClient.eval(script, { - keys: [ key ], - arguments: [projectId, now.toString(), eventsPeriod.toString(), limit.toString()], - }); - } - /** * Creates a RedisTimeSeries key if it doesn't exist. * diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index 34bad1cc..2986bf7f 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -143,48 +143,6 @@ describe('GrouperWorker', () => { let plansCollection: Collection; let redisClient: RedisClientType; let worker: GrouperWorker; - const setPlanRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { - await plansCollection.updateOne( - { _id: planIdMock }, - { - $set: { - rateLimitSettings: { - N: eventsLimit, - T: eventsPeriod, - }, - }, - }, - { upsert: true } - ); - }; - const setWorkspaceRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { - await workspacesCollection.updateOne( - { _id: workspaceIdMock }, - { - $set: { - rateLimitSettings: { - N: eventsLimit, - T: eventsPeriod, - }, - }, - }, - { upsert: true } - ); - }; - const setProjectRateLimit = async (eventsLimit: number, eventsPeriod: number): Promise => { - await projectsCollection.updateOne( - { _id: new mongodb.ObjectId(projectIdMock) }, - { - $set: { - rateLimitSettings: { - N: eventsLimit, - T: eventsPeriod, - }, - }, - } - ); - }; - beforeAll(async () => { worker = new GrouperWorker(); @@ -229,13 +187,9 @@ describe('GrouperWorker', () => { */ beforeEach(async () => { worker.clearCache(); - delete (worker as any)['memoizeCache:getProjectRateLimitSettings']; await eventsCollection.deleteMany({}); await dailyEventsCollection.deleteMany({}); await repetitionsCollection.deleteMany({}); - await setPlanRateLimit(0, 0); - await setWorkspaceRateLimit(0, 0); - await setProjectRateLimit(0, 0); }); afterEach(async () => { @@ -821,175 +775,6 @@ describe('GrouperWorker', () => { }); }); - describe('Rate limits counter increment', () => { - const rateLimitsKey = 'rate_limits'; - - test('increments counter when handling an event', async () => { - await setProjectRateLimit(5, 60); - - const currentTime = 1_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - await worker.handle(generateTask()); - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).toBe(`${Math.floor(currentTime / 1000)}:1`); - } finally { - nowSpy.mockRestore(); - } - }); - - test('reuses window and increments while within limit', async () => { - await setProjectRateLimit(5, 60); - - const currentTime = 2_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - await worker.handle(generateTask()); - await worker.handle(generateTask()); - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(2); - } finally { - nowSpy.mockRestore(); - } - }); - - test('does not exceed configured limit within same window', async () => { - const eventsLimit = 3; - - await setProjectRateLimit(eventsLimit, 60); - - const currentTime = 3_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - for (let i = 0; i < eventsLimit + 2; i++) { - await worker.handle(generateTask()); - } - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(eventsLimit); - } finally { - nowSpy.mockRestore(); - } - }); - - test('resets window after period elapses', async () => { - await setProjectRateLimit(5, 2); - - let currentTime = 4_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - await worker.handle(generateTask()); - - currentTime += 3_000; // advance by 3 seconds - - await worker.handle(generateTask()); - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [timestamp, count] = (storedValue as string).split(':'); - - expect(Number(timestamp)).toBe(Math.floor(currentTime / 1000)); - expect(Number(count)).toBe(1); - } finally { - nowSpy.mockRestore(); - } - }); - - test('uses workspace limits when project overrides are absent', async () => { - await setPlanRateLimit(10, 60); - await setWorkspaceRateLimit(3, 60); - await setProjectRateLimit(0, 0); - - const currentTime = 5_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - for (let i = 0; i < 5; i++) { - await worker.handle(generateTask()); - } - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(3); - } finally { - nowSpy.mockRestore(); - } - }); - - test('falls back to plan limits when workspace settings are empty', async () => { - await setPlanRateLimit(4, 60); - await setWorkspaceRateLimit(0, 0); - await setProjectRateLimit(0, 0); - - const currentTime = 6_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - for (let i = 0; i < 6; i++) { - await worker.handle(generateTask()); - } - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(4); - } finally { - nowSpy.mockRestore(); - } - }); - - test('prefers project limits over workspace and plan', async () => { - await setPlanRateLimit(4, 60); - await setWorkspaceRateLimit(6, 60); - await setProjectRateLimit(8, 60); - - const currentTime = 7_000_000; - const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => currentTime); - - try { - for (let i = 0; i < 10; i++) { - await worker.handle(generateTask()); - } - - const storedValue = await redisClient.hGet(rateLimitsKey, projectIdMock); - - expect(storedValue).not.toBeNull(); - - const [, count] = (storedValue as string).split(':'); - - expect(Number(count)).toBe(8); - } finally { - nowSpy.mockRestore(); - } - }); - }); - describe('Events-stored metrics', () => { test('writes minutely, hourly, and daily samples after handling an event', async () => { const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd'); From a8a31c850928eef18b43a35b1ea21a1dc0d9273b Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 20:08:15 +0300 Subject: [PATCH 5/9] chore(grouper): remove redundant mocks --- workers/grouper/tests/index.test.ts | 34 ----------------------------- 1 file changed, 34 deletions(-) diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index 2986bf7f..af1c2cf2 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -58,41 +58,16 @@ const projectIdMock = '5d206f7f9aaf7c0071d64596'; /** * Mock project data */ -const planIdMock = new mongodb.ObjectId(); -const workspaceIdMock = new mongodb.ObjectId(); - -const planMock = { - _id: planIdMock, - rateLimitSettings: { - N: 0, - T: 0, - }, -}; - -const workspaceMock = { - _id: workspaceIdMock, - tariffPlanId: planIdMock, - rateLimitSettings: { - N: 0, - T: 0, - }, -}; - const projectMock = { _id: new mongodb.ObjectId(projectIdMock), id: projectIdMock, name: 'Test Project', token: 'test-token', - workspaceId: workspaceIdMock, uidAdded: { id: 'test-user-id', }, unreadCount: 0, description: 'Test project for grouper worker tests', - rateLimitSettings: { - N: 0, - T: 0, - }, eventGroupingPatterns: [ { _id: mongodb.ObjectId(), pattern: 'New error .*', @@ -139,8 +114,6 @@ describe('GrouperWorker', () => { let dailyEventsCollection: Collection; let repetitionsCollection: Collection; let projectsCollection: Collection; - let workspacesCollection: Collection; - let plansCollection: Collection; let redisClient: RedisClientType; let worker: GrouperWorker; beforeAll(async () => { @@ -160,17 +133,12 @@ describe('GrouperWorker', () => { dailyEventsCollection = connection.db().collection('dailyEvents:' + projectIdMock); repetitionsCollection = connection.db().collection('repetitions:' + projectIdMock); projectsCollection = accountsConnection.db().collection('projects'); - workspacesCollection = accountsConnection.db().collection('workspaces'); - plansCollection = accountsConnection.db().collection('plans'); /** * Create unique index for groupHash */ await eventsCollection.createIndex({ groupHash: 1 }, { unique: true }); - await plansCollection.insertOne(planMock); - await workspacesCollection.insertOne(workspaceMock); - /** * Insert mock project into accounts database */ @@ -857,8 +825,6 @@ describe('GrouperWorker', () => { await redisClient.quit(); await worker.finish(); await projectsCollection.deleteMany({}); - await workspacesCollection.deleteMany({}); - await plansCollection.deleteMany({}); await accountsConnection.close(); await connection.close(); }); From f9b181d2bffb4414b3c55b66c64c90b62f23af74 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 20:09:47 +0300 Subject: [PATCH 6/9] chore(): eslint fix --- workers/grouper/src/redisHelper.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index cc009cd5..d15df940 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -2,7 +2,6 @@ import HawkCatcher from '@hawk.so/nodejs'; import type { RedisClientType } from 'redis'; import { createClient } from 'redis'; import createLogger from '../../../lib/logger'; -import { MS_IN_SEC } from '../../../lib/utils/consts'; /** * Class with helper functions for working with Redis From a583997f84be9436e0e1ef82e18073365a1cb6e5 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 21:12:44 +0300 Subject: [PATCH 7/9] chore(): change metric type --- workers/grouper/src/index.ts | 2 +- workers/grouper/tests/index.test.ts | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 05aff09b..a0611383 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -270,7 +270,7 @@ export default class GrouperWorker extends Worker { } } - await this.recordProjectMetrics(task.projectId, 'events-stored'); + await this.recordProjectMetrics(task.projectId, 'events-accepted'); } /** diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index af1c2cf2..6ad01812 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -743,7 +743,7 @@ describe('GrouperWorker', () => { }); }); - describe('Events-stored metrics', () => { + describe('Events-accepted metrics', () => { test('writes minutely, hourly, and daily samples after handling an event', async () => { const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd'); @@ -754,27 +754,27 @@ describe('GrouperWorker', () => { const expectedLabels = { type: 'error', - status: 'events-stored', + status: 'events-accepted', project: projectIdMock, }; expect(safeTsAddSpy).toHaveBeenNthCalledWith( 1, - `ts:project-events-stored:${projectIdMock}:minutely`, + `ts:project-events-accepted:${projectIdMock}:minutely`, 1, expectedLabels, TimeMs.DAY ); expect(safeTsAddSpy).toHaveBeenNthCalledWith( 2, - `ts:project-events-stored:${projectIdMock}:hourly`, + `ts:project-events-accepted:${projectIdMock}:hourly`, 1, expectedLabels, TimeMs.WEEK ); expect(safeTsAddSpy).toHaveBeenNthCalledWith( 3, - `ts:project-events-stored:${projectIdMock}:daily`, + `ts:project-events-accepted:${projectIdMock}:daily`, 1, expectedLabels, 90 * TimeMs.DAY @@ -799,7 +799,7 @@ describe('GrouperWorker', () => { try { await worker.handle(generateTask()); - expect(loggerErrorSpy).toHaveBeenCalledWith('Failed to add hourly TS for events-stored', failure); + expect(loggerErrorSpy).toHaveBeenCalledWith('Failed to add hourly TS for events-accepted', failure); expect(await eventsCollection.find().count()).toBe(1); } finally { safeTsAddSpy.mockRestore(); @@ -814,7 +814,7 @@ describe('GrouperWorker', () => { await worker.handle(generateTask()); expect(recordMetricsSpy).toHaveBeenCalledTimes(1); - expect(recordMetricsSpy).toHaveBeenCalledWith(projectIdMock, 'events-stored'); + expect(recordMetricsSpy).toHaveBeenCalledWith(projectIdMock, 'events-accepted'); } finally { recordMetricsSpy.mockRestore(); } From 8ed970c057e680650a0a8ed52c63df64273a0f9f Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 21:44:50 +0300 Subject: [PATCH 8/9] Update workers/grouper/src/index.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- workers/grouper/src/index.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index a0611383..d0bdb4e6 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -324,13 +324,19 @@ export default class GrouperWorker extends Worker { }, ]; - for (const { key, label, retentionMs } of series) { - try { - await this.redis.safeTsAdd(key, 1, labels, retentionMs); - } catch (error) { - this.logger.error(`Failed to add ${label} TS for ${metricType}`, error); + const operations = series.map(({ key, label, retentionMs }) => ({ + label, + promise: this.redis.safeTsAdd(key, 1, labels, retentionMs), + })); + + const results = await Promise.allSettled(operations.map((op) => op.promise)); + + results.forEach((result, index) => { + if (result.status === 'rejected') { + const { label } = operations[index]; + this.logger.error(`Failed to add ${label} TS for ${metricType}`, result.reason); } - } + }); } /** From 45e527abb003c8d8ef5da53f044b9ffb84ed9089 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 8 Feb 2026 22:06:29 +0300 Subject: [PATCH 9/9] imp(): use lua for create if not exists, to avoid race-cond --- workers/grouper/src/redisHelper.ts | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index d15df940..ec486ddb 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -122,13 +122,16 @@ export default class RedisHelper { labels: Record, retentionMs = 0 ): Promise { - const exists = await this.redisClient.exists(key); + const script = ` + if redis.call('EXISTS', KEYS[1]) == 1 then + return 0 + end - if (exists > 0) { - return; - } + redis.call('TS.CREATE', KEYS[1], unpack(ARGV)) + return 1 + `; - const args: string[] = ['TS.CREATE', key]; + const args: string[] = []; if (retentionMs > 0) { args.push('RETENTION', Math.floor(retentionMs).toString()); @@ -136,7 +139,13 @@ export default class RedisHelper { args.push(...this.buildLabelArguments(labels)); - await this.redisClient.sendCommand(args); + await this.redisClient.eval( + script, + { + keys: [key], + arguments: args, + } + ); } /**