diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 73f16fc7..d0bdb4e6 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -26,6 +26,7 @@ import { rightTrim } from '../../../lib/utils/string'; import { hasValue } from '../../../lib/utils/hasValue'; /* eslint-disable-next-line no-unused-vars */ import { memoize } from '../../../lib/memoize'; +import TimeMs from '../../../lib/utils/time'; /** * eslint does not count decorators as a variable usage @@ -268,6 +269,74 @@ export default class GrouperWorker extends Worker { }); } } + + await this.recordProjectMetrics(task.projectId, 'events-accepted'); + } + + /** + * Build RedisTimeSeries key for project metrics. + * + * @param projectId - id of the project + * @param metricType - metric type identifier + * @param granularity - time granularity + */ + private getTimeSeriesKey( + projectId: string, + metricType: string, + granularity: 'minutely' | 'hourly' | 'daily' + ): string { + return `ts:project-${metricType}:${projectId}:${granularity}`; + } + + /** + * Record project metrics to Redis TimeSeries. + * + * @param projectId - id of the project + * @param metricType - metric type identifier + */ + private async recordProjectMetrics(projectId: string, metricType: string): Promise { + const minutelyKey = this.getTimeSeriesKey(projectId, metricType, 'minutely'); + const hourlyKey = this.getTimeSeriesKey(projectId, metricType, 'hourly'); + const dailyKey = this.getTimeSeriesKey(projectId, metricType, 'daily'); + + const labels: Record = { + type: 'error', + status: metricType, + project: projectId, + }; + + const series = [ + { + key: minutelyKey, + label: 'minutely', + retentionMs: TimeMs.DAY, + }, + { + key: hourlyKey, + label: 'hourly', + retentionMs: TimeMs.WEEK, + }, + { + key: dailyKey, + label: 'daily', + // eslint-disable-next-line @typescript-eslint/no-magic-numbers + retentionMs: 90 * TimeMs.DAY, + }, + ]; + + const operations = series.map(({ key, label, retentionMs }) => ({ + label, + promise: this.redis.safeTsAdd(key, 1, labels, retentionMs), + })); + + const results = await Promise.allSettled(operations.map((op) => op.promise)); + + results.forEach((result, index) => { + if (result.status === 'rejected') { + const { label } = operations[index]; + this.logger.error(`Failed to add ${label} TS for ${metricType}`, result.reason); + } + }); } /** diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index a655c24b..ec486ddb 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -110,6 +110,176 @@ export default class RedisHelper { return result === null; } + /** + * Creates a RedisTimeSeries key if it doesn't exist. + * + * @param key - time series key + * @param labels - labels to attach to the time series + * @param retentionMs - optional retention in milliseconds + */ + public async tsCreateIfNotExists( + key: string, + labels: Record, + retentionMs = 0 + ): Promise { + const script = ` + if redis.call('EXISTS', KEYS[1]) == 1 then + return 0 + end + + redis.call('TS.CREATE', KEYS[1], unpack(ARGV)) + return 1 + `; + + const args: string[] = []; + + if (retentionMs > 0) { + args.push('RETENTION', Math.floor(retentionMs).toString()); + } + + args.push(...this.buildLabelArguments(labels)); + + await this.redisClient.eval( + script, + { + keys: [key], + arguments: args, + } + ); + } + + /** + * Increments a RedisTimeSeries key with labels and timestamp. + * + * @param key - time series key + * @param value - value to increment by + * @param timestampMs - timestamp in milliseconds, defaults to current time + * @param labels - labels to attach to the sample + */ + public async tsIncrBy( + key: string, + value: number, + timestampMs = 0, + labels: Record = {} + ): Promise { + const labelArgs = this.buildLabelArguments(labels); + const timestamp = timestampMs === 0 ? Date.now() : timestampMs; + + const args: string[] = [ + 'TS.INCRBY', + key, + value.toString(), + 'TIMESTAMP', + Math.floor(timestamp).toString(), + ...labelArgs, + ]; + + await this.redisClient.sendCommand(args); + } + + /** + * Ensures that a RedisTimeSeries key exists and increments it safely. + * + * @param key - time series key + * @param value - value to increment by + * @param labels - labels to attach to the time series + * @param retentionMs - optional retention in milliseconds + */ + public async safeTsIncrBy( + key: string, + value: number, + labels: Record, + retentionMs = 0 + ): Promise { + const timestamp = Date.now(); + + try { + await this.tsIncrBy(key, value, timestamp, labels); + } catch (error) { + if (error instanceof Error && error.message.includes('TSDB: key does not exist')) { + this.logger.warn(`TS key ${key} does not exist, creating it...`); + await this.tsCreateIfNotExists(key, labels, retentionMs); + await this.tsIncrBy(key, value, timestamp, labels); + } else { + throw error; + } + } + } + + /** + * Adds a sample to a RedisTimeSeries key. + * + * @param key - time series key + * @param value - value to add + * @param timestampMs - timestamp in milliseconds, defaults to current time + * @param labels - labels to attach to the sample + */ + public async tsAdd( + key: string, + value: number, + timestampMs = 0, + labels: Record = {} + ): Promise { + const labelArgs = this.buildLabelArguments(labels); + const timestamp = timestampMs === 0 ? Date.now() : timestampMs; + + const args: string[] = [ + 'TS.ADD', + key, + Math.floor(timestamp).toString(), + value.toString(), + 'ON_DUPLICATE', + 'SUM', + ...labelArgs, + ]; + + await this.redisClient.sendCommand(args); + } + + /** + * Ensures that a RedisTimeSeries key exists and adds a sample safely. + * + * @param key - time series key + * @param value - value to add + * @param labels - labels to attach to the time series + * @param retentionMs - optional retention in milliseconds + */ + public async safeTsAdd( + key: string, + value: number, + labels: Record, + retentionMs = 0 + ): Promise { + const timestamp = Date.now(); + + try { + await this.tsAdd(key, value, timestamp, labels); + } catch (error) { + if (error instanceof Error && error.message.includes('TSDB: key does not exist')) { + this.logger.warn(`TS key ${key} does not exist, creating it...`); + await this.tsCreateIfNotExists(key, labels, retentionMs); + await this.tsAdd(key, value, timestamp, labels); + } else { + throw error; + } + } + } + + /** + * Build label arguments for RedisTimeSeries commands + * + * @param labels - labels to attach to the time series + */ + private buildLabelArguments(labels: Record): string[] { + const labelArgs: string[] = [ 'LABELS' ]; + + for (const [labelKey, labelValue] of Object.entries(labels)) { + labelArgs.push(labelKey, labelValue); + } + + return labelArgs; + } + /** * Creates callback function for Redis operations * @@ -130,4 +300,4 @@ export default class RedisHelper { resolve(resp !== 'OK'); }; } -} +} \ No newline at end of file diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index ee781e98..6ad01812 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -7,6 +7,7 @@ import type { Collection } from 'mongodb'; import { MongoClient } from 'mongodb'; import type { ErrorsCatcherType, EventAddons, EventData } from '@hawk.so/types'; import { MS_IN_SEC } from '../../../lib/utils/consts'; +import TimeMs from '../../../lib/utils/time'; import * as mongodb from 'mongodb'; import { patch } from '@n1ru4l/json-patch-plus'; @@ -115,7 +116,6 @@ describe('GrouperWorker', () => { let projectsCollection: Collection; let redisClient: RedisClientType; let worker: GrouperWorker; - beforeAll(async () => { worker = new GrouperWorker(); @@ -743,6 +743,84 @@ describe('GrouperWorker', () => { }); }); + describe('Events-accepted metrics', () => { + test('writes minutely, hourly, and daily samples after handling an event', async () => { + const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd'); + + try { + await worker.handle(generateTask()); + + expect(safeTsAddSpy).toHaveBeenCalledTimes(3); + + const expectedLabels = { + type: 'error', + status: 'events-accepted', + project: projectIdMock, + }; + + expect(safeTsAddSpy).toHaveBeenNthCalledWith( + 1, + `ts:project-events-accepted:${projectIdMock}:minutely`, + 1, + expectedLabels, + TimeMs.DAY + ); + expect(safeTsAddSpy).toHaveBeenNthCalledWith( + 2, + `ts:project-events-accepted:${projectIdMock}:hourly`, + 1, + expectedLabels, + TimeMs.WEEK + ); + expect(safeTsAddSpy).toHaveBeenNthCalledWith( + 3, + `ts:project-events-accepted:${projectIdMock}:daily`, + 1, + expectedLabels, + 90 * TimeMs.DAY + ); + } finally { + safeTsAddSpy.mockRestore(); + } + }); + + test('logs when a time-series write fails but continues processing', async () => { + const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd'); + const loggerErrorSpy = jest.spyOn((worker as any).logger, 'error').mockImplementation(() => undefined); + const failure = new Error('TS failure'); + + safeTsAddSpy + .mockImplementationOnce(() => Promise.resolve()) + .mockImplementationOnce(async () => { + throw failure; + }) + .mockImplementationOnce(() => Promise.resolve()); + + try { + await worker.handle(generateTask()); + + expect(loggerErrorSpy).toHaveBeenCalledWith('Failed to add hourly TS for events-accepted', failure); + expect(await eventsCollection.find().count()).toBe(1); + } finally { + safeTsAddSpy.mockRestore(); + loggerErrorSpy.mockRestore(); + } + }); + + test('records metrics exactly once per handled event', async () => { + const recordMetricsSpy = jest.spyOn(worker as any, 'recordProjectMetrics'); + + try { + await worker.handle(generateTask()); + + expect(recordMetricsSpy).toHaveBeenCalledTimes(1); + expect(recordMetricsSpy).toHaveBeenCalledWith(projectIdMock, 'events-accepted'); + } finally { + recordMetricsSpy.mockRestore(); + } + }); + }); + afterAll(async () => { await redisClient.quit(); await worker.finish();