Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions workers/grouper/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { rightTrim } from '../../../lib/utils/string';
import { hasValue } from '../../../lib/utils/hasValue';
/* eslint-disable-next-line no-unused-vars */
import { memoize } from '../../../lib/memoize';
import TimeMs from '../../../lib/utils/time';

/**
* eslint does not count decorators as a variable usage
Expand Down Expand Up @@ -268,6 +269,74 @@ export default class GrouperWorker extends Worker {
});
}
}

await this.recordProjectMetrics(task.projectId, 'events-accepted');
}

/**
* Build RedisTimeSeries key for project metrics.
*
* @param projectId - id of the project
* @param metricType - metric type identifier
* @param granularity - time granularity
*/
private getTimeSeriesKey(
projectId: string,
metricType: string,
granularity: 'minutely' | 'hourly' | 'daily'
): string {
return `ts:project-${metricType}:${projectId}:${granularity}`;
}

/**
* Record project metrics to Redis TimeSeries.
*
* @param projectId - id of the project
* @param metricType - metric type identifier
*/
private async recordProjectMetrics(projectId: string, metricType: string): Promise<void> {
const minutelyKey = this.getTimeSeriesKey(projectId, metricType, 'minutely');
const hourlyKey = this.getTimeSeriesKey(projectId, metricType, 'hourly');
const dailyKey = this.getTimeSeriesKey(projectId, metricType, 'daily');

const labels: Record<string, string> = {
type: 'error',
status: metricType,
project: projectId,
};

const series = [
{
key: minutelyKey,
label: 'minutely',
retentionMs: TimeMs.DAY,
},
{
key: hourlyKey,
label: 'hourly',
retentionMs: TimeMs.WEEK,
},
{
key: dailyKey,
label: 'daily',
// eslint-disable-next-line @typescript-eslint/no-magic-numbers
retentionMs: 90 * TimeMs.DAY,
},
];

const operations = series.map(({ key, label, retentionMs }) => ({
label,
promise: this.redis.safeTsAdd(key, 1, labels, retentionMs),
}));

const results = await Promise.allSettled(operations.map((op) => op.promise));

results.forEach((result, index) => {
if (result.status === 'rejected') {
const { label } = operations[index];
this.logger.error(`Failed to add ${label} TS for ${metricType}`, result.reason);
}
});
}

/**
Expand Down
172 changes: 171 additions & 1 deletion workers/grouper/src/redisHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,176 @@ export default class RedisHelper {
return result === null;
}

/**
* Creates a RedisTimeSeries key if it doesn't exist.
*
* @param key - time series key
* @param labels - labels to attach to the time series
* @param retentionMs - optional retention in milliseconds
*/
public async tsCreateIfNotExists(
key: string,
labels: Record<string, string>,
retentionMs = 0
): Promise<void> {
const script = `
if redis.call('EXISTS', KEYS[1]) == 1 then
return 0
end

redis.call('TS.CREATE', KEYS[1], unpack(ARGV))
return 1
`;

const args: string[] = [];

if (retentionMs > 0) {
args.push('RETENTION', Math.floor(retentionMs).toString());
}

args.push(...this.buildLabelArguments(labels));

await this.redisClient.eval(
script,
{
keys: [key],
arguments: args,
}
);
}

/**
* Increments a RedisTimeSeries key with labels and timestamp.
*
* @param key - time series key
* @param value - value to increment by
* @param timestampMs - timestamp in milliseconds, defaults to current time
* @param labels - labels to attach to the sample
*/
public async tsIncrBy(
key: string,
value: number,
timestampMs = 0,
labels: Record<string, string> = {}
): Promise<void> {
const labelArgs = this.buildLabelArguments(labels);
const timestamp = timestampMs === 0 ? Date.now() : timestampMs;

const args: string[] = [
'TS.INCRBY',
key,
value.toString(),
'TIMESTAMP',
Math.floor(timestamp).toString(),
...labelArgs,
];

await this.redisClient.sendCommand(args);
}

/**
* Ensures that a RedisTimeSeries key exists and increments it safely.
*
* @param key - time series key
* @param value - value to increment by
* @param labels - labels to attach to the time series
* @param retentionMs - optional retention in milliseconds
*/
public async safeTsIncrBy(
key: string,
value: number,
labels: Record<string, string>,
retentionMs = 0
): Promise<void> {
const timestamp = Date.now();

try {
await this.tsIncrBy(key, value, timestamp, labels);
} catch (error) {
if (error instanceof Error && error.message.includes('TSDB: key does not exist')) {
this.logger.warn(`TS key ${key} does not exist, creating it...`);
await this.tsCreateIfNotExists(key, labels, retentionMs);
await this.tsIncrBy(key, value, timestamp, labels);
} else {
throw error;
}
}
}

/**
* Adds a sample to a RedisTimeSeries key.
*
* @param key - time series key
* @param value - value to add
* @param timestampMs - timestamp in milliseconds, defaults to current time
* @param labels - labels to attach to the sample
*/
public async tsAdd(
key: string,
value: number,
timestampMs = 0,
labels: Record<string, string> = {}
): Promise<void> {
const labelArgs = this.buildLabelArguments(labels);
const timestamp = timestampMs === 0 ? Date.now() : timestampMs;

const args: string[] = [
'TS.ADD',
key,
Math.floor(timestamp).toString(),
value.toString(),
'ON_DUPLICATE',
'SUM',
...labelArgs,
];

await this.redisClient.sendCommand(args);
}

/**
* Ensures that a RedisTimeSeries key exists and adds a sample safely.
*
* @param key - time series key
* @param value - value to add
* @param labels - labels to attach to the time series
* @param retentionMs - optional retention in milliseconds
*/
public async safeTsAdd(
key: string,
value: number,
labels: Record<string, string>,
retentionMs = 0
): Promise<void> {
const timestamp = Date.now();

try {
await this.tsAdd(key, value, timestamp, labels);
} catch (error) {
if (error instanceof Error && error.message.includes('TSDB: key does not exist')) {
this.logger.warn(`TS key ${key} does not exist, creating it...`);
await this.tsCreateIfNotExists(key, labels, retentionMs);
await this.tsAdd(key, value, timestamp, labels);
} else {
throw error;
}
}
}

/**
* Build label arguments for RedisTimeSeries commands
*
* @param labels - labels to attach to the time series
*/
private buildLabelArguments(labels: Record<string, string>): string[] {
const labelArgs: string[] = [ 'LABELS' ];

for (const [labelKey, labelValue] of Object.entries(labels)) {
labelArgs.push(labelKey, labelValue);
}

return labelArgs;
Comment on lines +273 to +280
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buildLabelArguments always returns ['LABELS'] even when labels is empty. Because tsAdd/tsIncrBy default labels = {}, this can generate RedisTimeSeries commands ending with a bare LABELS token, which Redis will reject as a wrong-arity command. Consider returning an empty array when there are no labels (or make labels required for these public methods).

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we never pass empty labels object

}

/**
* Creates callback function for Redis operations
*
Expand All @@ -130,4 +300,4 @@ export default class RedisHelper {
resolve(resp !== 'OK');
};
}
}
}
80 changes: 79 additions & 1 deletion workers/grouper/tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { Collection } from 'mongodb';
import { MongoClient } from 'mongodb';
import type { ErrorsCatcherType, EventAddons, EventData } from '@hawk.so/types';
import { MS_IN_SEC } from '../../../lib/utils/consts';
import TimeMs from '../../../lib/utils/time';
import * as mongodb from 'mongodb';
import { patch } from '@n1ru4l/json-patch-plus';

Expand Down Expand Up @@ -115,7 +116,6 @@ describe('GrouperWorker', () => {
let projectsCollection: Collection;
let redisClient: RedisClientType;
let worker: GrouperWorker;

beforeAll(async () => {
worker = new GrouperWorker();

Expand Down Expand Up @@ -743,6 +743,84 @@ describe('GrouperWorker', () => {
});
});

describe('Events-accepted metrics', () => {
test('writes minutely, hourly, and daily samples after handling an event', async () => {
const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd');

try {
await worker.handle(generateTask());

expect(safeTsAddSpy).toHaveBeenCalledTimes(3);

const expectedLabels = {
type: 'error',
status: 'events-accepted',
project: projectIdMock,
};

expect(safeTsAddSpy).toHaveBeenNthCalledWith(
1,
`ts:project-events-accepted:${projectIdMock}:minutely`,
1,
expectedLabels,
TimeMs.DAY
);
expect(safeTsAddSpy).toHaveBeenNthCalledWith(
2,
`ts:project-events-accepted:${projectIdMock}:hourly`,
1,
expectedLabels,
TimeMs.WEEK
);
expect(safeTsAddSpy).toHaveBeenNthCalledWith(
3,
`ts:project-events-accepted:${projectIdMock}:daily`,
1,
expectedLabels,
90 * TimeMs.DAY
);
} finally {
safeTsAddSpy.mockRestore();
}
});

test('logs when a time-series write fails but continues processing', async () => {
const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd');
const loggerErrorSpy = jest.spyOn((worker as any).logger, 'error').mockImplementation(() => undefined);
const failure = new Error('TS failure');

safeTsAddSpy
.mockImplementationOnce(() => Promise.resolve())
.mockImplementationOnce(async () => {
throw failure;
})
.mockImplementationOnce(() => Promise.resolve());

try {
await worker.handle(generateTask());

expect(loggerErrorSpy).toHaveBeenCalledWith('Failed to add hourly TS for events-accepted', failure);
expect(await eventsCollection.find().count()).toBe(1);
} finally {
safeTsAddSpy.mockRestore();
loggerErrorSpy.mockRestore();
}
});

test('records metrics exactly once per handled event', async () => {
const recordMetricsSpy = jest.spyOn(worker as any, 'recordProjectMetrics');

try {
await worker.handle(generateTask());

expect(recordMetricsSpy).toHaveBeenCalledTimes(1);
expect(recordMetricsSpy).toHaveBeenCalledWith(projectIdMock, 'events-accepted');
} finally {
recordMetricsSpy.mockRestore();
}
});
});

afterAll(async () => {
await redisClient.quit();
await worker.finish();
Expand Down
Loading