Skip to content

Commit 226ceb9

Browse files
authored
refactor: refactor sharded wal to handle coordinator (#1237)
1 parent 18acf3c commit 226ceb9

File tree

10 files changed

+1440
-592
lines changed

10 files changed

+1440
-592
lines changed

packages/utils/src/lib/create-runner-files.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { writeFile } from 'node:fs/promises';
22
import path from 'node:path';
3-
import { threadId } from 'node:worker_threads';
43
import type { RunnerFilesPaths } from '@code-pushup/models';
54
import { ensureDirectoryExists, pluginWorkDir } from './file-system.js';
5+
import { getUniqueProcessThreadId } from './process-id.js';
66

77
/**
88
* Function to create timestamp nested plugin runner files for config and output.
@@ -14,9 +14,7 @@ export async function createRunnerFiles(
1414
pluginSlug: string,
1515
configJSON: string,
1616
): Promise<RunnerFilesPaths> {
17-
// Use timestamp + process ID + threadId
18-
// This prevents race conditions when running the same plugin for multiple projects in parallel
19-
const uniqueId = `${(performance.timeOrigin + performance.now()) * 10}-${process.pid}-${threadId}`;
17+
const uniqueId = getUniqueProcessThreadId();
2018
const runnerWorkDir = path.join(pluginWorkDir(pluginSlug), uniqueId);
2119
const runnerConfigPath = path.join(runnerWorkDir, 'plugin-config.json');
2220
const runnerOutputPath = path.join(runnerWorkDir, 'runner-output.json');

packages/utils/src/lib/errors.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,22 @@ export function stringifyError(
3030
}
3131
return JSON.stringify(error);
3232
}
33+
34+
/**
35+
* Extends an error with a new message and keeps the original as the cause.
36+
* This helps to keep the stacktrace intact and enables better debugging.
37+
* @param error - The error to extend
38+
* @param message - The new message to add to the error
39+
* @param appendOriginalMessage - Whether to add the original error message after new message
40+
* @returns A new error with the extended message and the original as cause
41+
*/
42+
export function extendError(
43+
error: unknown,
44+
message: string,
45+
{ appendOriginalMessage = false } = {},
46+
) {
47+
const errorMessage = appendOriginalMessage
48+
? `${message}\n${stringifyError(error)}`
49+
: message;
50+
return new Error(errorMessage, { cause: error });
51+
}

packages/utils/src/lib/profiler/constants.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,9 @@ export const SHARDED_WAL_COORDINATOR_ID_ENV_VAR =
3131
* Used as the base name for sharded WAL files (e.g., "trace" in "trace.json").
3232
*/
3333
export const PROFILER_PERSIST_BASENAME = 'trace';
34+
35+
/**
36+
* Name for current measure.
37+
* Used as the name for the sharded folder.
38+
*/
39+
export const PROFILER_MEASURE_NAME_ENV_VAR = 'CP_PROFILER_MEASURE_NAME';

packages/utils/src/lib/profiler/profiler-node.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@ import type {
1616
ActionTrackEntryPayload,
1717
MarkerPayload,
1818
} from '../user-timing-extensibility-api.type.js';
19+
import { getShardedPath } from '../wal-sharded.js';
1920
import {
2021
type AppendableSink,
22+
type WalRecord,
2123
WriteAheadLogFile,
22-
getShardedPath,
2324
} from '../wal.js';
2425
import {
2526
PROFILER_DEBUG_ENV_VAR,
@@ -73,7 +74,7 @@ export type NodejsProfilerOptions<
7374
* @template Tracks - Record type defining available track names and their configurations
7475
*/
7576
export class NodejsProfiler<
76-
DomainEvents extends string | object,
77+
DomainEvents extends WalRecord,
7778
Tracks extends Record<string, ActionTrackEntryPayload> = Record<
7879
string,
7980
ActionTrackEntryPayload
@@ -118,6 +119,7 @@ export class NodejsProfiler<
118119
filename ??
119120
path.join(
120121
process.cwd(),
122+
// @TODO remove in PR https://github.com/code-pushup/cli/pull/1231 in favour of class method getShardedFileName
121123
getShardedPath({
122124
dir: 'tmp/profiles',
123125
groupId: getUniqueTimeId(),
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
import fs from 'node:fs';
2+
import path from 'node:path';
3+
import { SHARDED_WAL_COORDINATOR_ID_ENV_VAR } from './profiler/constants.js';
4+
import { ShardedWal } from './wal-sharded.js';
5+
import { type WalFormat, type WalRecord, stringCodec } from './wal.js';
6+
7+
describe('ShardedWal Integration', () => {
8+
const testDir = path.join(
9+
process.cwd(),
10+
'tmp',
11+
'int',
12+
'utils',
13+
'wal-sharded',
14+
);
15+
const makeMockFormat = <T extends WalRecord>(
16+
overrides: Partial<WalFormat<T>>,
17+
): WalFormat<T> => {
18+
const {
19+
baseName = 'wal',
20+
walExtension = '.log',
21+
finalExtension = '.json',
22+
codec = stringCodec<T>(),
23+
finalizer = records => `${JSON.stringify(records)}\n`,
24+
} = overrides;
25+
26+
return {
27+
baseName,
28+
walExtension,
29+
finalExtension,
30+
codec,
31+
finalizer,
32+
};
33+
};
34+
let shardedWal: ShardedWal;
35+
36+
beforeEach(() => {
37+
if (fs.existsSync(testDir)) {
38+
fs.rmSync(testDir, { recursive: true, force: true });
39+
}
40+
fs.mkdirSync(testDir, { recursive: true });
41+
});
42+
43+
afterEach(() => {
44+
if (shardedWal) {
45+
shardedWal.cleanupIfCoordinator();
46+
}
47+
if (fs.existsSync(testDir)) {
48+
fs.rmSync(testDir, { recursive: true, force: true });
49+
}
50+
});
51+
52+
it('should create and finalize shards correctly', () => {
53+
shardedWal = new ShardedWal({
54+
debug: false,
55+
dir: testDir,
56+
format: makeMockFormat({
57+
baseName: 'trace',
58+
}),
59+
coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR,
60+
groupId: 'create-finalize',
61+
});
62+
63+
const shard1 = shardedWal.shard();
64+
shard1.open();
65+
shard1.append('record1');
66+
shard1.append('record2');
67+
shard1.close();
68+
69+
const shard2 = shardedWal.shard();
70+
shard2.open();
71+
shard2.append('record3');
72+
shard2.close();
73+
74+
shardedWal.finalize();
75+
76+
const finalFile = path.join(
77+
testDir,
78+
shardedWal.groupId,
79+
`trace.create-finalize.json`,
80+
);
81+
expect(fs.existsSync(finalFile)).toBeTrue();
82+
83+
const content = fs.readFileSync(finalFile, 'utf8');
84+
const records = JSON.parse(content.trim());
85+
expect(records).toEqual(['record1', 'record2', 'record3']);
86+
});
87+
88+
it('should merge multiple shards correctly', () => {
89+
shardedWal = new ShardedWal({
90+
debug: false,
91+
dir: testDir,
92+
format: makeMockFormat({
93+
baseName: 'merged',
94+
}),
95+
coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR,
96+
groupId: 'merge-shards',
97+
});
98+
99+
// eslint-disable-next-line functional/no-loop-statements
100+
for (let i = 1; i <= 5; i++) {
101+
const shard = shardedWal.shard();
102+
shard.open();
103+
shard.append(`record-from-shard-${i}`);
104+
shard.close();
105+
}
106+
107+
shardedWal.finalize();
108+
109+
const finalFile = path.join(
110+
testDir,
111+
shardedWal.groupId,
112+
`merged.merge-shards.json`,
113+
);
114+
const content = fs.readFileSync(finalFile, 'utf8');
115+
const records = JSON.parse(content.trim());
116+
expect(records).toHaveLength(5);
117+
expect(records[0]).toBe('record-from-shard-1');
118+
expect(records[4]).toBe('record-from-shard-5');
119+
});
120+
121+
it('should expose recovery details in stats when debug is true', () => {
122+
shardedWal = new ShardedWal({
123+
debug: true,
124+
dir: testDir,
125+
format: makeMockFormat({
126+
baseName: 'test',
127+
}),
128+
coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR,
129+
groupId: 'invalid-entries',
130+
});
131+
132+
const shard = shardedWal.shard();
133+
shard.open();
134+
shard.append('valid1');
135+
shard.append('invalid');
136+
shard.append('valid2');
137+
shard.close();
138+
139+
shardedWal.finalize();
140+
// When debug is true, lastRecovery should contain recovery results
141+
expect(shardedWal.stats.lastRecovery).toHaveLength(1);
142+
expect(shardedWal.stats.lastRecovery[0]).toMatchObject({
143+
file: expect.stringContaining('test.'),
144+
result: expect.objectContaining({
145+
records: expect.arrayContaining(['valid1', 'invalid', 'valid2']),
146+
errors: [],
147+
partialTail: null,
148+
}),
149+
});
150+
151+
const finalFile = path.join(
152+
testDir,
153+
shardedWal.groupId,
154+
`test.invalid-entries.json`,
155+
);
156+
const content = fs.readFileSync(finalFile, 'utf8');
157+
const records = JSON.parse(content.trim());
158+
expect(records).toEqual(['valid1', 'invalid', 'valid2']);
159+
});
160+
161+
it('should cleanup shard files after finalization', () => {
162+
shardedWal = new ShardedWal({
163+
debug: false,
164+
dir: testDir,
165+
format: makeMockFormat({
166+
baseName: 'cleanup-test',
167+
}),
168+
coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR,
169+
groupId: 'cleanup-test',
170+
});
171+
172+
const shard1 = shardedWal.shard();
173+
shard1.open();
174+
shard1.append('record1');
175+
shard1.close();
176+
177+
const shard2 = shardedWal.shard();
178+
shard2.open();
179+
shard2.append('record2');
180+
shard2.close();
181+
182+
shardedWal.finalize();
183+
184+
const finalFile = path.join(
185+
testDir,
186+
shardedWal.groupId,
187+
`cleanup-test.cleanup-test.json`,
188+
);
189+
expect(fs.existsSync(finalFile)).toBeTrue();
190+
191+
shardedWal.cleanupIfCoordinator();
192+
193+
const groupDir = path.join(testDir, shardedWal.groupId);
194+
const files = fs.readdirSync(groupDir);
195+
expect(files).not.toContain(expect.stringMatching(/cleanup-test.*\.log$/));
196+
expect(files).toContain(`cleanup-test.cleanup-test.json`);
197+
});
198+
199+
it('should use custom options in finalizer', () => {
200+
shardedWal = new ShardedWal({
201+
debug: false,
202+
dir: testDir,
203+
format: makeMockFormat({
204+
baseName: 'custom',
205+
finalizer: (records, opt) =>
206+
`${JSON.stringify({ records, metadata: opt })}\n`,
207+
}),
208+
coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR,
209+
groupId: 'custom-finalizer',
210+
});
211+
212+
const shard = shardedWal.shard();
213+
shard.open();
214+
shard.append('record1');
215+
shard.close();
216+
217+
shardedWal.finalize({ version: '2.0', timestamp: Date.now() });
218+
219+
const finalFile = path.join(
220+
testDir,
221+
shardedWal.groupId,
222+
`custom.custom-finalizer.json`,
223+
);
224+
const content = fs.readFileSync(finalFile, 'utf8');
225+
const result = JSON.parse(content.trim());
226+
expect(result.records).toEqual(['record1']);
227+
expect(result.metadata).toEqual({
228+
version: '2.0',
229+
timestamp: expect.any(Number),
230+
});
231+
});
232+
233+
it('should handle empty shards correctly', () => {
234+
shardedWal = new ShardedWal({
235+
debug: false,
236+
dir: testDir,
237+
format: makeMockFormat({
238+
baseName: 'empty',
239+
}),
240+
coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR,
241+
groupId: 'empty-shards',
242+
});
243+
244+
const groupDir = path.join(testDir, shardedWal.groupId);
245+
fs.mkdirSync(groupDir, { recursive: true });
246+
247+
shardedWal.finalize();
248+
249+
const finalFile = path.join(
250+
testDir,
251+
shardedWal.groupId,
252+
`empty.${shardedWal.groupId}.json`,
253+
);
254+
expect(fs.existsSync(finalFile)).toBeTrue();
255+
const content = fs.readFileSync(finalFile, 'utf8');
256+
expect(content.trim()).toBe('[]');
257+
});
258+
});

0 commit comments

Comments
 (0)