Skip to content

Commit 6bee2c7

Browse files
committed
refactor: wip
1 parent 244dca8 commit 6bee2c7

File tree

4 files changed

+61
-85
lines changed

4 files changed

+61
-85
lines changed

packages/utils/src/lib/profiler/profiler-node.ts

Lines changed: 36 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ import {
88
import { objectToEntries } from '../transform.js';
99
import {
1010
asOptions,
11+
errorToMarkerPayload,
1112
markerPayload,
12-
errorToMarkerPayload } from '../user-timing-extensibility-api-utils.js';
13+
} from '../user-timing-extensibility-api-utils.js';
1314
import type {
1415
ActionTrackEntryPayload,
1516
MarkerPayload,
@@ -109,27 +110,44 @@ export class NodejsProfiler<
109110
*/
110111

111112
constructor(options: NodejsProfilerOptions<DomainEvents, Tracks>) {
113+
// Pick ProfilerBufferOptions
112114
const {
113115
captureBufferedEntries,
114116
flushThreshold,
115117
maxQueueSize,
118+
...allButBufferOptions
119+
} = options;
120+
// Pick ProfilerPersistOptions
121+
const {
116122
format: profilerFormat,
117123
measureName,
118124
outDir = PROFILER_PERSIST_OUT_DIR,
119125
enabled,
120126
debug,
121127
...profilerOptions
122-
} = options;
128+
} = allButBufferOptions;
123129

124130
super({ ...profilerOptions, enabled, debug });
125131

126-
this.#initializeStorage(profilerFormat, {
132+
const { encodePerfEntry, ...format } = profilerFormat;
133+
134+
this.#sharder = new ShardedWal<DomainEvents>({
135+
debug,
136+
dir: process.env[PROFILER_OUT_DIR_ENV_VAR] ?? outDir,
137+
format: parseWalFormat<DomainEvents>(format),
138+
coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR,
139+
measureNameEnvVar: PROFILER_MEASURE_NAME_ENV_VAR,
140+
groupId: measureName,
141+
});
142+
143+
this.#shard = this.#sharder.shard();
144+
this.#performanceObserverSink = new PerformanceObserverSink({
145+
sink: this.#shard,
146+
encodePerfEntry,
127147
captureBufferedEntries,
128148
flushThreshold,
129149
maxQueueSize,
130-
measureName,
131-
outDir,
132-
debug,
150+
debug: this.isDebugMode(),
133151
});
134152

135153
this.#unsubscribeExitHandlers = subscribeProcessExit({
@@ -151,40 +169,6 @@ export class NodejsProfiler<
151169
}
152170
}
153171

154-
#initializeStorage(
155-
profilerFormat: ProfilerFormat<DomainEvents>,
156-
options: {
157-
captureBufferedEntries?: boolean;
158-
flushThreshold?: number;
159-
maxQueueSize?: number;
160-
measureName?: string;
161-
outDir: string;
162-
debug?: boolean;
163-
},
164-
) {
165-
const { encodePerfEntry, ...format } = profilerFormat;
166-
const { captureBufferedEntries, flushThreshold, maxQueueSize, measureName, outDir, debug } = options;
167-
168-
this.#sharder = new ShardedWal<DomainEvents>({
169-
debug,
170-
dir: process.env[PROFILER_OUT_DIR_ENV_VAR] ?? outDir,
171-
format: parseWalFormat<DomainEvents>(format),
172-
coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR,
173-
measureNameEnvVar: PROFILER_MEASURE_NAME_ENV_VAR,
174-
groupId: measureName,
175-
});
176-
177-
this.#shard = this.#sharder.shard();
178-
this.#performanceObserverSink = new PerformanceObserverSink({
179-
sink: this.#shard,
180-
encodePerfEntry,
181-
captureBufferedEntries,
182-
flushThreshold,
183-
maxQueueSize,
184-
debug: this.isDebugMode(),
185-
});
186-
}
187-
188172
/**
189173
* Creates a performance marker for a profiler state transition.
190174
*/
@@ -255,22 +239,22 @@ export class NodejsProfiler<
255239
switch (transition) {
256240
case 'idle->running':
257241
super.setEnabled(true);
258-
this.#shard.open();
259-
this.#performanceObserverSink.subscribe();
242+
this.#shard?.open();
243+
this.#performanceObserverSink?.subscribe();
260244
break;
261245

262246
case 'running->idle':
263247
super.setEnabled(false);
264-
this.#performanceObserverSink.unsubscribe();
265-
this.#shard.close();
248+
this.#performanceObserverSink?.unsubscribe();
249+
this.#shard?.close();
266250
break;
267251

268252
case 'running->closed':
269253
case 'idle->closed':
270254
super.setEnabled(false);
271-
this.#performanceObserverSink.unsubscribe();
272-
this.#shard.close();
273-
this.#sharder.finalizeIfCoordinator();
255+
this.#performanceObserverSink?.unsubscribe();
256+
this.#shard?.close();
257+
this.#sharder?.finalizeIfCoordinator();
274258
this.#unsubscribeExitHandlers?.();
275259
break;
276260

@@ -317,17 +301,17 @@ export class NodejsProfiler<
317301
state: sharderState,
318302
isCoordinator,
319303
...sharderStats
320-
} = this.#sharder.stats;
304+
} = this.#sharder?.stats ?? {};
321305

322306
return {
323307
profilerState: this.#state,
324308
debug: this.isDebugMode(),
325309
sharderState,
326310
...sharderStats,
327311
isCoordinator,
328-
shardOpen: !this.#shard.isClosed(),
329-
shardPath: this.#shard.getPath(),
330-
...this.#performanceObserverSink.getStats(),
312+
shardOpen: this.#shard?.isClosed(),
313+
shardPath: this.#shard?.getPath(),
314+
...this.#performanceObserverSink?.getStats(),
331315
};
332316
}
333317

@@ -336,6 +320,6 @@ export class NodejsProfiler<
336320
if (this.#state === 'closed') {
337321
return; // No-op if closed
338322
}
339-
this.#performanceObserverSink.flush();
323+
this.#performanceObserverSink?.flush();
340324
}
341325
}

packages/utils/src/lib/wal-sharded.ts

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ export class ShardedWal<T extends WalRecord = WalRecord> {
6767
},
6868
});
6969
readonly groupId = getUniqueTimeId();
70-
readonly #debug = false;
70+
readonly #debug: boolean = false;
7171
readonly #format: WalFormat<T>;
7272
readonly #dir: string = process.cwd();
7373
readonly #coordinatorIdEnvVar: string;
@@ -117,7 +117,7 @@ export class ShardedWal<T extends WalRecord = WalRecord> {
117117
* @param opt.measureNameEnvVar - Environment variable name for coordinating groupId across processes (optional)
118118
*/
119119
constructor(opt: {
120-
debug: boolean;
120+
debug?: boolean;
121121
dir?: string;
122122
format: WalFormat<T>;
123123
groupId?: string;
@@ -128,12 +128,17 @@ export class ShardedWal<T extends WalRecord = WalRecord> {
128128
const {
129129
dir,
130130
format,
131+
debug,
131132
groupId,
132133
coordinatorIdEnvVar,
133134
autoCoordinator = true,
134135
measureNameEnvVar,
135136
} = opt;
136137

138+
if (debug != null) {
139+
this.#debug = debug;
140+
}
141+
137142
// Determine groupId: use provided, then env var, or generate
138143
// eslint-disable-next-line functional/no-let
139144
let resolvedGroupId: string;
@@ -295,6 +300,11 @@ export class ShardedWal<T extends WalRecord = WalRecord> {
295300
.map(entry => path.join(groupDir, entry));
296301
}
297302

303+
/** Get shard file paths created by this instance */
304+
private getCreatedShardFiles() {
305+
return this.#createdShardFiles.filter(f => fs.existsSync(f));
306+
}
307+
298308
/**
299309
* Finalize all shards by merging them into a single output file.
300310
* Recovers all records from all shards, validates no errors, and writes merged result.
@@ -355,9 +365,11 @@ export class ShardedWal<T extends WalRecord = WalRecord> {
355365
return;
356366
}
357367

358-
this.shardFiles().forEach(f => {
359-
fs.unlinkSync(f);
360-
});
368+
this.getCreatedShardFiles()
369+
.filter(f => fs.existsSync(f))
370+
.forEach(f => {
371+
fs.unlinkSync(f);
372+
});
361373

362374
this.#state = 'cleaned';
363375
}
@@ -367,13 +379,13 @@ export class ShardedWal<T extends WalRecord = WalRecord> {
367379
lastRecover: this.#lastRecovery,
368380
state: this.#state,
369381
groupId: this.groupId,
370-
shardCount: this.shardFiles().length,
382+
shardCount: this.getCreatedShardFiles().length,
371383
isCoordinator: this.isCoordinator(),
372384
isFinalized: this.isFinalized(),
373385
isCleaned: this.isCleaned(),
374386
finalFilePath: this.getFinalFilePath(),
375-
shardFileCount: this.shardFiles().length,
376-
shardFiles: this.shardFiles(),
387+
shardFileCount: this.getCreatedShardFiles().length,
388+
shardFiles: this.getCreatedShardFiles(),
377389
};
378390
}
379391

packages/utils/src/lib/wal-sharded.unit.test.ts

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -502,27 +502,7 @@ describe('ShardedWal', () => {
502502

503503
expect(sw.stats.shardFiles).toHaveLength(0);
504504
sw.shard();
505-
expect(sw.stats.shardFiles).toHaveLength(1);
506-
507-
sw.cleanupIfCoordinator();
508-
expect(sw.getState()).toBe('cleaned');
509-
expect(sw.stats.shardFiles).toHaveLength(1);
510-
});
511-
512-
it('should support cleanupIfCoordinator method', () => {
513-
vol.fromJSON({
514-
'/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log':
515-
'content1',
516-
});
517-
518-
const sw = getShardedWal({
519-
dir: '/shards',
520-
format: { baseName: 'test', walExtension: '.log' },
521-
});
522-
523505
expect(sw.stats.shardFiles).toHaveLength(0);
524-
sw.shard();
525-
expect(sw.stats.shardFiles).toHaveLength(1);
526506

527507
sw.cleanupIfCoordinator();
528508
expect(sw.getState()).toBe('cleaned');

packages/utils/src/lib/wal.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ export type Codec<I, O = string> = {
1414

1515
export type InvalidEntry<O = string> = { __invalid: true; raw: O };
1616

17-
type CodecInput<C extends Codec<any, any>> =
18-
C extends Codec<infer I, any> ? I : never;
19-
type CodecOutput<C extends Codec<any, any>> =
20-
C extends Codec<any, infer O> ? O : never;
17+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
18+
type CodecInput<C> = C extends Codec<infer I, infer O> ? I : never;
19+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
20+
type CodecOutput<C> = C extends Codec<infer I, infer O> ? O : never;
2121

22-
export type TolerantCodec<C extends Codec<any, any>> = Codec<
22+
export type TolerantCodec<C> = Codec<
2323
CodecInput<C> | InvalidEntry<CodecOutput<C>>,
2424
CodecOutput<C>
2525
>;

0 commit comments

Comments
 (0)