From 5e640dca34c92e3f1373233cc44abb0c6dc4a556 Mon Sep 17 00:00:00 2001 From: Ehab Younes Date: Mon, 4 May 2026 17:28:01 +0300 Subject: [PATCH 1/5] feat(telemetry): local JSONL sink with per-session files LocalJsonlSink is the first concrete TelemetrySink. Each VS Code session writes its own files at globalStorage/telemetry/telemetry-YYYY-MM-DD-{sessionId8}.jsonl, with .N.jsonl size segments. One writer per file means concurrent windows do not race on appends or rotation, so no lockfile is needed. Tunables come from coder.telemetry.localJsonl (object-typed, not registered in package.json yet, same convention as coder.telemetry.level). Defaults: 15s flush, 100-event batch trigger, 500-event hard cap, 5MB per file, 30 day age, 100MB total. The sink subscribes via watchConfigurationChanges so changes apply without a window reload. cleanupFiles moves from sshProcess.ts to src/util/fileCleanup.ts and is shared between both. PathResolver gains getTelemetryPath(). Also drops a redundant value re-check in TelemetryService's level watcher: readLevel already sanitizes its return. Closes #901 --- src/core/container.ts | 14 +- src/core/pathResolver.ts | 10 + src/remote/sshProcess.ts | 88 +--- src/telemetry/service.ts | 8 +- src/telemetry/sinks/localJsonlSink.ts | 327 +++++++++++++++ src/util/fileCleanup.ts | 91 ++++ .../telemetry/sinks/localJsonlSink.test.ts | 390 ++++++++++++++++++ test/unit/util/fileCleanup.test.ts | 123 ++++++ 8 files changed, 969 insertions(+), 82 deletions(-) create mode 100644 src/telemetry/sinks/localJsonlSink.ts create mode 100644 src/util/fileCleanup.ts create mode 100644 test/unit/telemetry/sinks/localJsonlSink.test.ts create mode 100644 test/unit/util/fileCleanup.test.ts diff --git a/src/core/container.ts b/src/core/container.ts index 85934605..1b61741e 100644 --- a/src/core/container.ts +++ b/src/core/container.ts @@ -4,6 +4,7 @@ import { CoderApi } from "../api/coderApi"; import { LoginCoordinator } from "../login/loginCoordinator"; import { OAuthCallback } from "../oauth/oauthCallback"; import { TelemetryService } from "../telemetry/service"; +import { LocalJsonlSink } from "../telemetry/sinks/localJsonlSink"; import { SpeedtestPanelFactory } from "../webviews/speedtest/speedtestPanelFactory"; import { DuplicateWorkspaceIpc } from "../workspace/duplicateWorkspaceIpc"; @@ -90,7 +91,18 @@ export class ServiceContainer implements vscode.Disposable { context.extensionUri, this.logger, ); - this.telemetryService = new TelemetryService(context, [], this.logger); + const localJsonlSink = LocalJsonlSink.start( + { + baseDir: this.pathResolver.getTelemetryPath(), + sessionId: vscode.env.sessionId, + }, + this.logger, + ); + this.telemetryService = new TelemetryService( + context, + [localJsonlSink], + this.logger, + ); } getPathResolver(): PathResolver { diff --git a/src/core/pathResolver.ts b/src/core/pathResolver.ts index f02d078f..719e878e 100644 --- a/src/core/pathResolver.ts +++ b/src/core/pathResolver.ts @@ -49,6 +49,16 @@ export class PathResolver { return path.join(this.basePath, "net"); } + /** + * Return the directory where local telemetry JSONL files are written. + * + * Files within this directory are managed by `LocalJsonlSink`, which + * creates the directory on activation if it does not already exist. + */ + public getTelemetryPath(): string { + return path.join(this.basePath, "telemetry"); + } + /** * Return the proxy log directory from the `coder.proxyLogDirectory` setting * or the `CODER_SSH_LOG_DIR` environment variable, falling back to the `log` diff --git a/src/remote/sshProcess.ts b/src/remote/sshProcess.ts index 5a3a2e2e..dceef3e3 100644 --- a/src/remote/sshProcess.ts +++ b/src/remote/sshProcess.ts @@ -4,6 +4,7 @@ import * as path from "node:path"; import * as vscode from "vscode"; import { findPort } from "../util"; +import { cleanupFiles } from "../util/fileCleanup"; import { NetworkStatusReporter } from "./networkStatus"; @@ -80,72 +81,6 @@ export class SshProcessMonitor implements vscode.Disposable { private lastStaleSearchTime = 0; private readonly reporter: NetworkStatusReporter; - /** - * Helper to clean up files in a directory. - * Stats files in parallel, applies selection criteria, then deletes in parallel. - */ - private static async cleanupFiles( - dir: string, - fileType: string, - logger: Logger, - options: { - filter: (name: string) => boolean; - select: ( - files: Array<{ name: string; mtime: number }>, - now: number, - ) => Array<{ name: string }>; - }, - ): Promise { - try { - const now = Date.now(); - const files = await fs.readdir(dir); - - // Gather file stats in parallel - const withStats = await Promise.all( - files.filter(options.filter).map(async (name) => { - try { - const stats = await fs.stat(path.join(dir, name)); - return { name, mtime: stats.mtime.getTime() }; - } catch (error) { - if ((error as NodeJS.ErrnoException).code !== "ENOENT") { - logger.debug(`Failed to stat ${fileType} ${name}`, error); - } - return null; - } - }), - ); - - const toDelete = options.select( - withStats.filter((f) => f !== null), - now, - ); - - // Delete files in parallel - const results = await Promise.all( - toDelete.map(async (file) => { - try { - await fs.unlink(path.join(dir, file.name)); - return file.name; - } catch (error) { - if ((error as NodeJS.ErrnoException).code !== "ENOENT") { - logger.debug(`Failed to delete ${fileType} ${file.name}`, error); - } - return null; - } - }), - ); - - const deletedFiles = results.filter((name) => name !== null); - if (deletedFiles.length > 0) { - logger.debug( - `Cleaned up ${deletedFiles.length} ${fileType}(s): ${deletedFiles.join(", ")}`, - ); - } - } catch { - // Directory may not exist yet, ignore - } - } - /** * Cleans up network info files older than the specified age. */ @@ -154,15 +89,11 @@ export class SshProcessMonitor implements vscode.Disposable { maxAgeMs: number, logger: Logger, ): Promise { - await SshProcessMonitor.cleanupFiles( - networkInfoPath, - "network info file", - logger, - { - filter: (name) => name.endsWith(".json"), - select: (files, now) => files.filter((f) => now - f.mtime > maxAgeMs), - }, - ); + await cleanupFiles(networkInfoPath, logger, { + fileType: "network info file", + match: (name) => name.endsWith(".json"), + pick: (files, now) => files.filter((f) => now - f.mtime > maxAgeMs), + }); } /** @@ -175,9 +106,10 @@ export class SshProcessMonitor implements vscode.Disposable { maxAgeMs: number, logger: Logger, ): Promise { - await SshProcessMonitor.cleanupFiles(logDir, "log file", logger, { - filter: (name) => name.startsWith("coder-ssh") && name.endsWith(".log"), - select: (files, now) => + await cleanupFiles(logDir, logger, { + fileType: "log file", + match: (name) => name.startsWith("coder-ssh") && name.endsWith(".log"), + pick: (files, now) => files .toSorted((a, b) => a.mtime - b.mtime) // oldest first .slice(0, -maxFilesToKeep) // keep the newest maxFilesToKeep diff --git a/src/telemetry/service.ts b/src/telemetry/service.ts index 78803668..ec2b2d34 100644 --- a/src/telemetry/service.ts +++ b/src/telemetry/service.ts @@ -56,11 +56,13 @@ export class TelemetryService implements vscode.Disposable { this.#configWatcher = watchConfigurationChanges( [{ setting: TELEMETRY_LEVEL_SETTING, getValue: readLevel }], (changes) => { - const raw = changes.get(TELEMETRY_LEVEL_SETTING); - if (!isTelemetryLevel(raw)) { + const next = changes.get(TELEMETRY_LEVEL_SETTING) as + | TelemetryLevel + | undefined; + if (!next) { return; } - this.#applyLevelChange(raw).catch((err) => { + this.#applyLevelChange(next).catch((err) => { this.logger.warn("Telemetry level change failed", err); }); }, diff --git a/src/telemetry/sinks/localJsonlSink.ts b/src/telemetry/sinks/localJsonlSink.ts new file mode 100644 index 00000000..010ce89c --- /dev/null +++ b/src/telemetry/sinks/localJsonlSink.ts @@ -0,0 +1,327 @@ +import * as fs from "node:fs/promises"; +import * as path from "node:path"; +import * as vscode from "vscode"; + +import { watchConfigurationChanges } from "../../configWatcher"; +import { cleanupFiles } from "../../util/fileCleanup"; + +import type { Logger } from "../../logging/logger"; +import type { TelemetryEvent, TelemetryLevel, TelemetrySink } from "../event"; + +const SINK_NAME = "local-jsonl"; +const FILE_PREFIX = "telemetry-"; +const FILE_SUFFIX = ".jsonl"; +const MS_PER_DAY = 24 * 60 * 60 * 1000; + +const SETTING_NAME = "coder.telemetry.localJsonl"; + +const DEFAULTS: LocalJsonlConfig = { + flushIntervalMs: 15_000, + flushBatchSize: 100, + bufferLimit: 500, + maxFileBytes: 5 * 1024 * 1024, + maxAgeDays: 30, + maxTotalBytes: 100 * 1024 * 1024, +}; + +export interface LocalJsonlSinkOptions { + baseDir: string; + sessionId: string; +} + +export interface LocalJsonlConfig { + flushIntervalMs: number; + flushBatchSize: number; + bufferLimit: number; + maxFileBytes: number; + maxAgeDays: number; + maxTotalBytes: number; +} + +interface CurrentFile { + date: string; + segment: number; + size: number; +} + +/** + * Writes telemetry events as JSON Lines. + * + * Each session writes its own files (`telemetry-YYYY-MM-DD-{sessionId8}.jsonl`, + * with `.N.jsonl` suffixes added once `maxFileBytes` is exceeded), so multiple + * VS Code windows sharing globalStorage cannot race on appends or rotation. + * Cleanup runs across every session's files for shared retention. + * + * `write` is synchronous and never throws. Disk I/O happens in `flush` and + * `dispose`, which catch errors and log them via the provided logger. + * + * Tunables come from the `coder.telemetry.localJsonl` setting (object-typed, + * not registered in package.json) and update reactively. + */ +export class LocalJsonlSink implements TelemetrySink { + public readonly name = SINK_NAME; + public readonly minLevel: TelemetryLevel = "local"; + + readonly #baseDir: string; + readonly #sessionSlug: string; + readonly #logger: Logger; + readonly #buffer: string[] = []; + readonly #configWatcher: vscode.Disposable; + #config: LocalJsonlConfig; + #flushTimer: NodeJS.Timeout | null = null; + #flushChain: Promise = Promise.resolve(); + #hasQueued = false; + #current: CurrentFile = { date: "", segment: 0, size: 0 }; + #disposed = false; + #overflowWarned = false; + + private constructor( + opts: LocalJsonlSinkOptions, + config: LocalJsonlConfig, + logger: Logger, + ) { + this.#baseDir = opts.baseDir; + this.#sessionSlug = toSessionSlug(opts.sessionId); + this.#logger = logger; + this.#config = config; + this.#configWatcher = watchConfigurationChanges( + [{ setting: SETTING_NAME, getValue: readConfig }], + (changes) => { + const next = changes.get(SETTING_NAME) as LocalJsonlConfig | undefined; + if (next) { + this.#config = next; + } + }, + ); + this.#scheduleNextFlush(); + } + + public static start( + opts: LocalJsonlSinkOptions, + logger: Logger, + ): LocalJsonlSink { + const config = readConfig(); + const sink = new LocalJsonlSink(opts, config, logger); + void cleanupOldTelemetryFiles(opts.baseDir, config, logger); + return sink; + } + + public write(event: TelemetryEvent): void { + if (this.#disposed) { + return; + } + let line: string; + try { + line = serializeEvent(event); + } catch (err) { + this.#logger.warn(`Telemetry sink '${this.name}' serialize failed`, err); + return; + } + this.#buffer.push(line); + + if (this.#buffer.length > this.#config.bufferLimit) { + const dropped = this.#buffer.length - this.#config.bufferLimit; + this.#buffer.splice(0, dropped); + if (!this.#overflowWarned) { + this.#overflowWarned = true; + this.#logger.warn( + `Telemetry sink '${this.name}' buffer overflow: dropped ${dropped} oldest event(s)`, + ); + } + } + + if (this.#buffer.length >= this.#config.flushBatchSize) { + void this.flush(); + } + } + + /** + * Coalesces concurrent flush requests. While a flush is running, at most + * one more is queued; further callers receive that same queued promise. + * Resolves once the buffer state at the time of the call has been written + * (or attempted; failures are logged, not thrown). + */ + public flush(): Promise { + if (this.#hasQueued) { + return this.#flushChain; + } + this.#hasQueued = true; + const next = (): Promise => { + this.#hasQueued = false; + return this.#doFlush(); + }; + this.#flushChain = this.#flushChain.then(next, next); + return this.#flushChain; + } + + public async dispose(): Promise { + if (this.#disposed) { + return; + } + this.#disposed = true; + this.#configWatcher.dispose(); + if (this.#flushTimer) { + clearTimeout(this.#flushTimer); + this.#flushTimer = null; + } + await this.flush(); + } + + #scheduleNextFlush(): void { + if (this.#disposed) { + return; + } + this.#flushTimer = setTimeout(() => { + this.flush() + .catch((err) => { + this.#logger.warn( + `Telemetry sink '${this.name}' scheduled flush failed`, + err, + ); + }) + .finally(() => { + this.#scheduleNextFlush(); + }); + }, this.#config.flushIntervalMs); + } + + async #doFlush(): Promise { + if (this.#buffer.length === 0) { + return; + } + const lines = this.#buffer.splice(0); + const payload = lines.join(""); + const next = this.#nextFile(payload.length); + const target = path.join(this.#baseDir, this.#fileName(next)); + try { + await fs.appendFile(target, payload, "utf8"); + this.#current = { ...next, size: next.size + payload.length }; + this.#overflowWarned = false; + } catch (err) { + this.#logger.warn(`Telemetry sink '${this.name}' flush failed`, err); + } + } + + #nextFile(payloadSize: number): CurrentFile { + const today = todayUtc(); + if (this.#current.date !== today) { + return { date: today, segment: 0, size: 0 }; + } + if ( + this.#current.size > 0 && + this.#current.size + payloadSize > this.#config.maxFileBytes + ) { + return { date: today, segment: this.#current.segment + 1, size: 0 }; + } + return this.#current; + } + + #fileName(file: CurrentFile): string { + const segment = file.segment > 0 ? `.${file.segment}` : ""; + return `${FILE_PREFIX}${file.date}-${this.#sessionSlug}${segment}${FILE_SUFFIX}`; + } +} + +async function cleanupOldTelemetryFiles( + baseDir: string, + config: LocalJsonlConfig, + logger: Logger, +): Promise { + try { + await fs.mkdir(baseDir, { recursive: true }); + } catch (err) { + logger.warn(`Telemetry sink '${SINK_NAME}' could not create base dir`, err); + return; + } + const maxAgeMs = config.maxAgeDays * MS_PER_DAY; + const { maxTotalBytes } = config; + await cleanupFiles(baseDir, logger, { + fileType: "telemetry file", + match: (name) => name.startsWith(FILE_PREFIX) && name.endsWith(FILE_SUFFIX), + pick: (files, now) => { + const toDelete: Array<{ name: string }> = []; + const fresh: typeof files = []; + let total = 0; + for (const f of files) { + if (now - f.mtime > maxAgeMs) { + toDelete.push({ name: f.name }); + } else { + fresh.push(f); + total += f.size; + } + } + fresh.sort((a, b) => a.mtime - b.mtime); + for (const f of fresh) { + if (total <= maxTotalBytes) { + break; + } + toDelete.push({ name: f.name }); + total -= f.size; + } + return toDelete; + }, + }); +} + +/** Reads `coder.telemetry.localJsonl`, falling back to defaults for invalid values. */ +function readConfig(): LocalJsonlConfig { + const raw = vscode.workspace.getConfiguration().get(SETTING_NAME); + const obj = + raw && typeof raw === "object" ? (raw as Record) : {}; + return { + flushIntervalMs: positiveNumber( + obj.flushIntervalMs, + DEFAULTS.flushIntervalMs, + ), + flushBatchSize: positiveNumber(obj.flushBatchSize, DEFAULTS.flushBatchSize), + bufferLimit: positiveNumber(obj.bufferLimit, DEFAULTS.bufferLimit), + maxFileBytes: positiveNumber(obj.maxFileBytes, DEFAULTS.maxFileBytes), + maxAgeDays: positiveNumber(obj.maxAgeDays, DEFAULTS.maxAgeDays), + maxTotalBytes: positiveNumber(obj.maxTotalBytes, DEFAULTS.maxTotalBytes), + }; +} + +function positiveNumber(value: unknown, fallback: number): number { + return typeof value === "number" && value > 0 ? value : fallback; +} + +function todayUtc(): string { + return new Date().toISOString().slice(0, 10); +} + +function toSessionSlug(sessionId: string): string { + const cleaned = sessionId.replace(/[^a-zA-Z0-9]/g, ""); + return cleaned.slice(0, 8) || "anon0000"; +} + +function serializeEvent(event: TelemetryEvent): string { + const out: Record = { + event_id: event.eventId, + event_name: event.eventName, + timestamp: event.timestamp, + event_sequence: event.eventSequence, + context: { + extension_version: event.context.extensionVersion, + machine_id: event.context.machineId, + session_id: event.context.sessionId, + os_type: event.context.osType, + os_version: event.context.osVersion, + host_arch: event.context.hostArch, + platform_name: event.context.platformName, + platform_version: event.context.platformVersion, + deployment_url: event.context.deploymentUrl, + }, + properties: event.properties, + measurements: event.measurements, + }; + if (event.traceId !== undefined) { + out.trace_id = event.traceId; + } + if (event.parentEventId !== undefined) { + out.parent_event_id = event.parentEventId; + } + if (event.error !== undefined) { + out.error = event.error; + } + return JSON.stringify(out) + "\n"; +} diff --git a/src/util/fileCleanup.ts b/src/util/fileCleanup.ts new file mode 100644 index 00000000..185a764e --- /dev/null +++ b/src/util/fileCleanup.ts @@ -0,0 +1,91 @@ +import * as fs from "node:fs/promises"; +import * as path from "node:path"; + +import type { Logger } from "../logging/logger"; + +export interface FileCleanupCandidate { + name: string; + mtime: number; + size: number; +} + +export interface FileCleanupOptions { + /** Human-readable noun used in log messages, e.g. "telemetry file". */ + fileType: string; + /** + * Optional name-based filter applied before stat. Use this to skip + * unrelated entries cheaply. Files for which `match` returns false are + * never stat'd or passed to `pick`. + */ + match?: (name: string) => boolean; + /** + * Picks files to delete after stat. Receives `{ name, mtime, size }` for + * each survivor of `match`, plus the current time. + */ + pick: (files: FileCleanupCandidate[], now: number) => Array<{ name: string }>; +} + +/** + * Stats files in `dir` in parallel (after the optional name-based `match`), + * lets `pick` choose which to delete, then unlinks them in parallel. + * Tolerates concurrent deletes from other processes (ENOENT is swallowed). + * Never throws; failures are logged via `logger.debug`. + */ +export async function cleanupFiles( + dir: string, + logger: Logger, + options: FileCleanupOptions, +): Promise { + const { fileType, match, pick } = options; + try { + const now = Date.now(); + const names = await fs.readdir(dir); + const candidates = match ? names.filter(match) : names; + + const withStats = await Promise.all( + candidates.map(async (name) => { + try { + const stats = await fs.stat(path.join(dir, name)); + return { + name, + mtime: stats.mtime.getTime(), + size: stats.size, + }; + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + logger.debug(`Failed to stat ${fileType} ${name}`, error); + } + return null; + } + }), + ); + + const toDelete = pick( + withStats.filter((f) => f !== null), + now, + ); + + const deleted = await Promise.all( + toDelete.map(async (file) => { + try { + await fs.unlink(path.join(dir, file.name)); + return file.name; + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + logger.debug(`Failed to delete ${fileType} ${file.name}`, error); + } + return null; + } + }), + ); + + const successful = deleted.filter((name) => name !== null); + if (successful.length > 0) { + logger.debug( + `Cleaned up ${successful.length} ${fileType}(s): ${successful.join(", ")}`, + ); + } + } catch { + // Directory may not exist yet, ignore. + } +} diff --git a/test/unit/telemetry/sinks/localJsonlSink.test.ts b/test/unit/telemetry/sinks/localJsonlSink.test.ts new file mode 100644 index 00000000..39ff2a9f --- /dev/null +++ b/test/unit/telemetry/sinks/localJsonlSink.test.ts @@ -0,0 +1,390 @@ +import { vol } from "memfs"; +import * as fsPromises from "node:fs/promises"; +import { + afterEach, + beforeEach, + describe, + expect, + it, + vi, + type MockInstance, +} from "vitest"; + +import { + LocalJsonlSink, + type LocalJsonlConfig, + type LocalJsonlSinkOptions, +} from "@/telemetry/sinks/localJsonlSink"; + +import { + createMockLogger, + MockConfigurationProvider, +} from "../../../mocks/testHelpers"; + +import type * as fs from "node:fs"; + +import type { TelemetryEvent } from "@/telemetry/event"; + +vi.mock("node:fs/promises", async () => { + const memfs: { fs: typeof fs } = await vi.importActual("memfs"); + return memfs.fs.promises; +}); + +const SETTING_NAME = "coder.telemetry.localJsonl"; +const BASE_DIR = "/telemetry"; +const SESSION_ID = "12345678-aaaa-bbbb-cccc-dddddddddddd"; +const SESSION_SLUG = "12345678"; +const OTHER_SLUG = "ffeeddcc"; + +// Effectively-disabled interval so tests that don't drive timers themselves +// never see the flush timer fire. +const TEST_CONFIG: Partial = { + flushIntervalMs: 1_000_000, +}; + +function todayUtc(): string { + return new Date().toISOString().slice(0, 10); +} + +function fileFor(date: string, slug = SESSION_SLUG, segment = 0): string { + const seg = segment > 0 ? `.${segment}` : ""; + return `${BASE_DIR}/telemetry-${date}-${slug}${seg}.jsonl`; +} + +function readJsonl(filePath: string): Array> { + const raw = vol.readFileSync(filePath, "utf8") as string; + return raw + .split("\n") + .filter((l) => l.length > 0) + .map((l) => JSON.parse(l)); +} + +function setMtimeAgo(filePath: string, ageMs: number): void { + const t = (Date.now() - ageMs) / 1000; + vol.utimesSync(filePath, t, t); +} + +describe("LocalJsonlSink", () => { + let sinks: LocalJsonlSink[]; + let nextSeq: () => number; + let configProvider: MockConfigurationProvider; + + function makeEvent(overrides: Partial = {}): TelemetryEvent { + const seq = nextSeq(); + return { + eventId: `id-${seq}`, + eventName: "test.event", + timestamp: "2026-05-04T12:00:00.000Z", + eventSequence: seq, + context: { + extensionVersion: "1.14.5", + machineId: "machine-id", + sessionId: "session-id", + osType: "linux", + osVersion: "6.0.0", + hostArch: "x64", + platformName: "Visual Studio Code", + platformVersion: "1.106.0", + deploymentUrl: "https://coder.example.com", + }, + properties: {}, + measurements: {}, + ...overrides, + }; + } + + function makeSink( + config: Partial = {}, + opts: Partial = {}, + ): { + sink: LocalJsonlSink; + logger: ReturnType; + } { + configProvider.set(SETTING_NAME, { ...TEST_CONFIG, ...config }); + const logger = createMockLogger(); + const sink = LocalJsonlSink.start( + { baseDir: BASE_DIR, sessionId: SESSION_ID, ...opts }, + logger, + ); + sinks.push(sink); + return { sink, logger }; + } + + function todaysFile(slug = SESSION_SLUG, segment = 0): string { + return fileFor(todayUtc(), slug, segment); + } + + beforeEach(() => { + vi.restoreAllMocks(); + vol.reset(); + let counter = 0; + nextSeq = () => counter++; + sinks = []; + configProvider = new MockConfigurationProvider(); + }); + + afterEach(async () => { + for (const s of sinks) { + await s.dispose(); + } + vi.useRealTimers(); + vol.reset(); + }); + + it("flushes the buffer when the interval fires", async () => { + vi.useFakeTimers(); + const { sink } = makeSink({ flushIntervalMs: 1000 }); + + sink.write(makeEvent()); + sink.write(makeEvent()); + expect(vol.existsSync(todaysFile())).toBe(false); + + await vi.advanceTimersByTimeAsync(1000); + expect(readJsonl(todaysFile())).toHaveLength(2); + }); + + it("flushes early once the buffer reaches flushBatchSize", async () => { + const { sink } = makeSink({ flushBatchSize: 3 }); + + sink.write(makeEvent()); + sink.write(makeEvent()); + expect(vol.existsSync(todaysFile())).toBe(false); + + sink.write(makeEvent()); + await vi.waitFor(() => expect(vol.existsSync(todaysFile())).toBe(true)); + expect(readJsonl(todaysFile())).toHaveLength(3); + }); + + it("drops oldest events and warns once when the buffer exceeds bufferLimit", async () => { + const { sink, logger } = makeSink({ + bufferLimit: 5, + flushBatchSize: 10_000, + }); + + for (let i = 0; i < 8; i++) { + sink.write(makeEvent()); + } + await sink.flush(); + + const overflows = vi + .mocked(logger.warn) + .mock.calls.filter((c) => String(c[0]).includes("buffer overflow")); + expect(overflows).toHaveLength(1); + expect(readJsonl(todaysFile()).map((l) => l.event_sequence)).toEqual([ + 3, 4, 5, 6, 7, + ]); + }); + + it("flushes pending events on dispose", async () => { + const { sink } = makeSink(); + sink.write(makeEvent()); + sink.write(makeEvent()); + + await sink.dispose(); + + expect(readJsonl(todaysFile())).toHaveLength(2); + }); + + it("rotates to a numbered segment once maxFileBytes is exceeded", async () => { + // A serialized event is around 400 bytes, so 900 holds 2 events but not 3. + const { sink } = makeSink({ maxFileBytes: 900 }); + + for (let i = 0; i < 3; i++) { + sink.write(makeEvent()); + await sink.flush(); + } + + expect(readJsonl(todaysFile(SESSION_SLUG, 0))).toHaveLength(2); + expect(readJsonl(todaysFile(SESSION_SLUG, 1))).toHaveLength(1); + }); + + it("starts a fresh file on UTC date rollover", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-04T23:59:00.000Z")); + const { sink } = makeSink(); + + sink.write(makeEvent()); + await sink.flush(); + expect(readJsonl(fileFor("2026-05-04"))).toHaveLength(1); + + vi.setSystemTime(new Date("2026-05-05T00:01:00.000Z")); + sink.write(makeEvent()); + await sink.flush(); + + expect(readJsonl(fileFor("2026-05-04"))).toHaveLength(1); + expect(readJsonl(fileFor("2026-05-05"))).toHaveLength(1); + }); + + it("deletes telemetry files older than maxAgeDays at startup", async () => { + const dayMs = 24 * 60 * 60 * 1000; + const today = todayUtc(); + vol.fromJSON({ + [`${BASE_DIR}/telemetry-2025-01-01-aaaa1111.jsonl`]: "{}\n", + [`${BASE_DIR}/telemetry-2025-01-02-aaaa1111.jsonl`]: "{}\n", + [`${BASE_DIR}/telemetry-${today}-bbbb2222.jsonl`]: "{}\n", + }); + setMtimeAgo(`${BASE_DIR}/telemetry-2025-01-01-aaaa1111.jsonl`, 60 * dayMs); + setMtimeAgo(`${BASE_DIR}/telemetry-2025-01-02-aaaa1111.jsonl`, 60 * dayMs); + + makeSink({ maxAgeDays: 30 }); + + await vi.waitFor(() => { + expect(vol.readdirSync(BASE_DIR)).toEqual([ + `telemetry-${today}-bbbb2222.jsonl`, + ]); + }); + }); + + it("trims oldest files when total size exceeds maxTotalBytes", async () => { + const dayMs = 24 * 60 * 60 * 1000; + const big = "x".repeat(2000); + vol.fromJSON({ + [`${BASE_DIR}/telemetry-2026-04-01-a.jsonl`]: big, + [`${BASE_DIR}/telemetry-2026-04-02-b.jsonl`]: big, + [`${BASE_DIR}/telemetry-2026-04-03-c.jsonl`]: big, + }); + setMtimeAgo(`${BASE_DIR}/telemetry-2026-04-01-a.jsonl`, 5 * dayMs); + setMtimeAgo(`${BASE_DIR}/telemetry-2026-04-02-b.jsonl`, 4 * dayMs); + setMtimeAgo(`${BASE_DIR}/telemetry-2026-04-03-c.jsonl`, 3 * dayMs); + + makeSink({ maxAgeDays: 365, maxTotalBytes: 4500 }); + + await vi.waitFor(() => { + expect(vol.readdirSync(BASE_DIR).toSorted()).toEqual([ + "telemetry-2026-04-02-b.jsonl", + "telemetry-2026-04-03-c.jsonl", + ]); + }); + }); + + it("emits valid snake_case JSONL with optional fields when set, omitting when not", async () => { + const { sink } = makeSink(); + + sink.write(makeEvent()); // no optional fields + sink.write( + makeEvent({ + eventName: "remote.connect", + properties: { result: "success" }, + measurements: { durationMs: 12.5 }, + traceId: "trace-1", + parentEventId: "parent-1", + error: { message: "nope", type: "TypeError", code: "E_FAIL" }, + }), + ); + await sink.dispose(); + + const [bare, full] = readJsonl(todaysFile()); + expect(bare).not.toHaveProperty("trace_id"); + expect(bare).not.toHaveProperty("parent_event_id"); + expect(bare).not.toHaveProperty("error"); + expect(full).toMatchObject({ + event_id: expect.any(String), + event_name: "remote.connect", + event_sequence: 1, + trace_id: "trace-1", + parent_event_id: "parent-1", + error: { message: "nope", type: "TypeError", code: "E_FAIL" }, + context: { + extension_version: "1.14.5", + deployment_url: "https://coder.example.com", + platform_name: "Visual Studio Code", + }, + properties: { result: "success" }, + measurements: { durationMs: 12.5 }, + }); + }); + + it("logs but does not throw when fs.appendFile rejects", async () => { + const { sink } = makeSink(); + vi.spyOn(fsPromises, "appendFile").mockRejectedValueOnce(new Error("boom")); + + sink.write(makeEvent()); + await expect(sink.flush()).resolves.toBeUndefined(); + + // Sink keeps working after a failure. + sink.write(makeEvent()); + await sink.flush(); + expect(readJsonl(todaysFile())).toHaveLength(1); + }); + + it("two sinks with different sessions write to disjoint files without corruption", async () => { + const { sink: a } = makeSink(); + const { sink: b } = makeSink( + {}, + { sessionId: "ffeeddcc-1111-2222-3333-444444444444" }, + ); + + for (let i = 0; i < 5; i++) { + a.write(makeEvent()); + b.write(makeEvent()); + } + await Promise.all([a.flush(), b.flush()]); + + expect(readJsonl(todaysFile(SESSION_SLUG))).toHaveLength(5); + expect(readJsonl(todaysFile(OTHER_SLUG))).toHaveLength(5); + }); + + it("coalesces concurrent flush requests into at most two appendFile calls", async () => { + const { sink } = makeSink(); + + let resolveFirst!: () => void; + const firstAppendDone = new Promise((r) => { + resolveFirst = r; + }); + const realAppend: typeof fsPromises.appendFile = + fsPromises.appendFile.bind(fsPromises); + const spy: MockInstance = vi + .spyOn(fsPromises, "appendFile") + .mockImplementationOnce(async (target, data, opts) => { + await firstAppendDone; + return realAppend(target, data, opts); + }); + + sink.write(makeEvent()); + const p1 = sink.flush(); + // Yield so doFlush #1 captures the buffer and reaches `await appendFile`. + await Promise.resolve(); + await Promise.resolve(); + + sink.write(makeEvent()); + const p2 = sink.flush(); + const p3 = sink.flush(); + expect(p3).toBe(p2); + + resolveFirst(); + await Promise.all([p1, p2, p3]); + + expect(spy).toHaveBeenCalledTimes(2); + expect(readJsonl(todaysFile()).map((l) => l.event_sequence)).toEqual([ + 0, 1, + ]); + }); + + it("picks up config changes reactively", async () => { + const { sink } = makeSink({ flushBatchSize: 100 }); + + sink.write(makeEvent()); + sink.write(makeEvent()); + expect(vol.existsSync(todaysFile())).toBe(false); + + // Lower the batch threshold; the next write should flush. + configProvider.set(SETTING_NAME, { ...TEST_CONFIG, flushBatchSize: 3 }); + sink.write(makeEvent()); + + await vi.waitFor(() => expect(vol.existsSync(todaysFile())).toBe(true)); + expect(readJsonl(todaysFile())).toHaveLength(3); + }); + + it("write() does not throw when an event cannot be serialized", async () => { + const { sink } = makeSink(); + const bad = makeEvent(); + (bad.properties as Record).circular = BigInt(1); + + expect(() => sink.write(bad)).not.toThrow(); + + // Sink remains usable for valid events. + sink.write(makeEvent()); + await sink.flush(); + expect(readJsonl(todaysFile())).toHaveLength(1); + }); +}); diff --git a/test/unit/util/fileCleanup.test.ts b/test/unit/util/fileCleanup.test.ts new file mode 100644 index 00000000..8d485def --- /dev/null +++ b/test/unit/util/fileCleanup.test.ts @@ -0,0 +1,123 @@ +import { vol } from "memfs"; +import * as fsPromises from "node:fs/promises"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { cleanupFiles, type FileCleanupCandidate } from "@/util/fileCleanup"; + +import { createMockLogger } from "../../mocks/testHelpers"; + +import type * as fs from "node:fs"; + +type PickFn = ( + files: FileCleanupCandidate[], + now: number, +) => Array<{ name: string }>; + +vi.mock("node:fs/promises", async () => { + const memfs: { fs: typeof fs } = await vi.importActual("memfs"); + return memfs.fs.promises; +}); + +describe("cleanupFiles", () => { + const logger = createMockLogger(); + + beforeEach(() => { + vi.restoreAllMocks(); + vol.reset(); + }); + + afterEach(() => { + vol.reset(); + }); + + it("does not throw when directory is missing", async () => { + await expect( + cleanupFiles("/nope", logger, { + fileType: "thing", + pick: () => [], + }), + ).resolves.toBeUndefined(); + }); + + it("passes every file's name, mtime, size, and now to the pick callback", async () => { + vol.fromJSON({ "/d/a": "hello", "/d/b": "world!" }); + vol.utimesSync("/d/a", 1_700_000_000, 1_700_000_000); + const before = Date.now(); + const pick = vi.fn(() => []); + + await cleanupFiles("/d", logger, { fileType: "thing", pick }); + + const [files, now] = pick.mock.calls[0]; + expect(files.toSorted((x, y) => x.name.localeCompare(y.name))).toEqual([ + { name: "a", mtime: 1_700_000_000_000, size: 5 }, + expect.objectContaining({ name: "b", size: 6 }), + ]); + expect(now).toBeGreaterThanOrEqual(before); + }); + + it("unlinks files chosen by pick and leaves the rest", async () => { + vol.fromJSON({ "/d/a": "1", "/d/b": "2", "/d/c": "3" }); + + await cleanupFiles("/d", logger, { + fileType: "thing", + pick: (files) => files.filter((f) => f.name !== "b"), + }); + + expect(vol.readdirSync("/d")).toEqual(["b"]); + }); + + it("tolerates ENOENT when a file disappears between stat and unlink", async () => { + vol.fromJSON({ "/d/a": "" }); + const localLogger = createMockLogger(); + + await cleanupFiles("/d", localLogger, { + fileType: "thing", + pick: (files) => { + vol.unlinkSync("/d/a"); + return files; + }, + }); + + expect(localLogger.warn).not.toHaveBeenCalled(); + expect(localLogger.error).not.toHaveBeenCalled(); + }); + + it("skips stat for files rejected by `match`", async () => { + vol.fromJSON({ + "/d/keep.json": "{}", + "/d/skip.txt": "no", + "/d/keep-too.json": "{}", + }); + const statSpy = vi.spyOn(fsPromises, "stat"); + + const pick = vi.fn(() => []); + await cleanupFiles("/d", logger, { + fileType: "thing", + match: (n) => n.endsWith(".json"), + pick, + }); + + const stattedNames = statSpy.mock.calls + .map((c) => String(c[0]).split("/").pop()) + .toSorted(); + expect(stattedNames).toEqual(["keep-too.json", "keep.json"]); + expect(pick.mock.calls[0][0].map((f) => f.name).toSorted()).toEqual([ + "keep-too.json", + "keep.json", + ]); + }); + + it("logs a debug summary listing the deleted files", async () => { + vol.fromJSON({ "/d/a": "" }); + const localLogger = createMockLogger(); + + await cleanupFiles("/d", localLogger, { + fileType: "widget", + pick: (files) => files, + }); + + expect(localLogger.debug).toHaveBeenCalledWith( + expect.stringContaining("Cleaned up 1 widget(s): a"), + ); + }); +}); From 20d3006ffa44f499730167e0775b07f07c467762 Mon Sep 17 00:00:00 2001 From: Ehab Younes Date: Mon, 4 May 2026 18:11:41 +0300 Subject: [PATCH 2/5] refactor(telemetry): pull settings into src/settings, simplify sink and tests Move readTelemetryLevel and readLocalJsonlConfig (with the setting key constants and defaults) into src/settings/telemetry.ts, matching the existing notifications and cli settings pattern. Both functions take a WorkspaceConfiguration argument so they are easy to test. LocalJsonlSink keeps its static start() factory but the constructor is now side-effect-free: it just assigns fields. start() is the single place that reads the config, subscribes the watcher, schedules the flush timer, and kicks off cleanup. The class implements vscode.Disposable explicitly. Tighten comments on the sink and on cleanupFiles. Inline #fileName into #doFlush since it had only one caller. Tests: per-test setup() harness instead of beforeEach-installed helpers. Module-level state is gone, all setup happens inside each test through setup(), and afterEach disposes any active sinks. Drop the "warns once" log-side-effect assertion from the buffer overflow test; the file content already proves the behavior. fileCleanup tests now verify the contract by what is left in the volume after the call rather than by spying on fs.stat or pick. The debug-summary log assertion is dropped, and the promise-identity check in the coalescer test is dropped (call count plus final file content already prove the contract). --- src/settings/telemetry.ts | 69 ++++++ src/telemetry/service.ts | 20 +- src/telemetry/sinks/localJsonlSink.ts | 202 ++++++++---------- src/util/fileCleanup.ts | 20 +- .../telemetry/sinks/localJsonlSink.test.ts | 191 +++++++---------- test/unit/util/fileCleanup.test.ts | 99 +++------ 6 files changed, 280 insertions(+), 321 deletions(-) create mode 100644 src/settings/telemetry.ts diff --git a/src/settings/telemetry.ts b/src/settings/telemetry.ts new file mode 100644 index 00000000..f35c1132 --- /dev/null +++ b/src/settings/telemetry.ts @@ -0,0 +1,69 @@ +import type { WorkspaceConfiguration } from "vscode"; + +import type { TelemetryLevel } from "../telemetry/event"; + +export const TELEMETRY_LEVEL_SETTING = "coder.telemetry.level"; +export const LOCAL_JSONL_SETTING = "coder.telemetry.localJsonl"; + +/** Telemetry level. Falls back to `local` for any invalid value. */ +export function readTelemetryLevel( + cfg: Pick, +): TelemetryLevel { + const value = cfg.get(TELEMETRY_LEVEL_SETTING); + return value === "off" || value === "local" ? value : "local"; +} + +export interface LocalJsonlConfig { + flushIntervalMs: number; + flushBatchSize: number; + bufferLimit: number; + maxFileBytes: number; + maxAgeDays: number; + maxTotalBytes: number; +} + +export const LOCAL_JSONL_DEFAULTS: LocalJsonlConfig = { + flushIntervalMs: 15_000, + flushBatchSize: 100, + bufferLimit: 500, + maxFileBytes: 5 * 1024 * 1024, + maxAgeDays: 30, + maxTotalBytes: 100 * 1024 * 1024, +}; + +/** Reads the local JSONL sink config, defaulting any missing or invalid + * field. Each field must be a positive number to override the default. */ +export function readLocalJsonlConfig( + cfg: Pick, +): LocalJsonlConfig { + const raw = cfg.get(LOCAL_JSONL_SETTING); + const obj = + raw && typeof raw === "object" ? (raw as Record) : {}; + return { + flushIntervalMs: positiveNumber( + obj.flushIntervalMs, + LOCAL_JSONL_DEFAULTS.flushIntervalMs, + ), + flushBatchSize: positiveNumber( + obj.flushBatchSize, + LOCAL_JSONL_DEFAULTS.flushBatchSize, + ), + bufferLimit: positiveNumber( + obj.bufferLimit, + LOCAL_JSONL_DEFAULTS.bufferLimit, + ), + maxFileBytes: positiveNumber( + obj.maxFileBytes, + LOCAL_JSONL_DEFAULTS.maxFileBytes, + ), + maxAgeDays: positiveNumber(obj.maxAgeDays, LOCAL_JSONL_DEFAULTS.maxAgeDays), + maxTotalBytes: positiveNumber( + obj.maxTotalBytes, + LOCAL_JSONL_DEFAULTS.maxTotalBytes, + ), + }; +} + +function positiveNumber(value: unknown, fallback: number): number { + return typeof value === "number" && value > 0 ? value : fallback; +} diff --git a/src/telemetry/service.ts b/src/telemetry/service.ts index ec2b2d34..00efd011 100644 --- a/src/telemetry/service.ts +++ b/src/telemetry/service.ts @@ -2,6 +2,10 @@ import * as vscode from "vscode"; import { watchConfigurationChanges } from "../configWatcher"; import { type Logger } from "../logging/logger"; +import { + TELEMETRY_LEVEL_SETTING, + readTelemetryLevel, +} from "../settings/telemetry"; import { buildSession, @@ -15,13 +19,14 @@ import { } from "./event"; import { NOOP_SPAN, type Span } from "./span"; -const TELEMETRY_LEVEL_SETTING = "coder.telemetry.level"; - const LEVEL_ORDER: Readonly> = { off: 0, local: 1, }; +const readLevel = (): TelemetryLevel => + readTelemetryLevel(vscode.workspace.getConfiguration()); + /** Trace context shared by all events in one trace. */ interface SpanOptions { traceId: string; @@ -279,14 +284,3 @@ export class TelemetryService implements vscode.Disposable { } } } - -function readLevel(): TelemetryLevel { - const value = vscode.workspace - .getConfiguration() - .get(TELEMETRY_LEVEL_SETTING); - return isTelemetryLevel(value) ? value : "local"; -} - -function isTelemetryLevel(value: unknown): value is TelemetryLevel { - return value === "off" || value === "local"; -} diff --git a/src/telemetry/sinks/localJsonlSink.ts b/src/telemetry/sinks/localJsonlSink.ts index 010ce89c..876e82a4 100644 --- a/src/telemetry/sinks/localJsonlSink.ts +++ b/src/telemetry/sinks/localJsonlSink.ts @@ -3,6 +3,11 @@ import * as path from "node:path"; import * as vscode from "vscode"; import { watchConfigurationChanges } from "../../configWatcher"; +import { + LOCAL_JSONL_SETTING, + readLocalJsonlConfig, + type LocalJsonlConfig, +} from "../../settings/telemetry"; import { cleanupFiles } from "../../util/fileCleanup"; import type { Logger } from "../../logging/logger"; @@ -13,31 +18,11 @@ const FILE_PREFIX = "telemetry-"; const FILE_SUFFIX = ".jsonl"; const MS_PER_DAY = 24 * 60 * 60 * 1000; -const SETTING_NAME = "coder.telemetry.localJsonl"; - -const DEFAULTS: LocalJsonlConfig = { - flushIntervalMs: 15_000, - flushBatchSize: 100, - bufferLimit: 500, - maxFileBytes: 5 * 1024 * 1024, - maxAgeDays: 30, - maxTotalBytes: 100 * 1024 * 1024, -}; - export interface LocalJsonlSinkOptions { baseDir: string; sessionId: string; } -export interface LocalJsonlConfig { - flushIntervalMs: number; - flushBatchSize: number; - bufferLimit: number; - maxFileBytes: number; - maxAgeDays: number; - maxTotalBytes: number; -} - interface CurrentFile { date: string; segment: number; @@ -45,20 +30,14 @@ interface CurrentFile { } /** - * Writes telemetry events as JSON Lines. - * - * Each session writes its own files (`telemetry-YYYY-MM-DD-{sessionId8}.jsonl`, - * with `.N.jsonl` suffixes added once `maxFileBytes` is exceeded), so multiple - * VS Code windows sharing globalStorage cannot race on appends or rotation. - * Cleanup runs across every session's files for shared retention. - * - * `write` is synchronous and never throws. Disk I/O happens in `flush` and - * `dispose`, which catch errors and log them via the provided logger. - * - * Tunables come from the `coder.telemetry.localJsonl` setting (object-typed, - * not registered in package.json) and update reactively. + * Writes telemetry events as JSON Lines. Each VS Code session writes its + * own files (`telemetry-YYYY-MM-DD-{sessionId8}.jsonl` plus `.N.jsonl` size + * segments), so concurrent windows cannot race on appends or rotation. + * `write` is sync and never throws; disk I/O happens in `flush` and + * `dispose` and catches errors. Tunables come from `coder.telemetry.localJsonl` + * and update reactively. */ -export class LocalJsonlSink implements TelemetrySink { +export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { public readonly name = SINK_NAME; public readonly minLevel: TelemetryLevel = "local"; @@ -66,8 +45,8 @@ export class LocalJsonlSink implements TelemetrySink { readonly #sessionSlug: string; readonly #logger: Logger; readonly #buffer: string[] = []; - readonly #configWatcher: vscode.Disposable; #config: LocalJsonlConfig; + #configWatcher: vscode.Disposable | null = null; #flushTimer: NodeJS.Timeout | null = null; #flushChain: Promise = Promise.resolve(); #hasQueued = false; @@ -77,32 +56,41 @@ export class LocalJsonlSink implements TelemetrySink { private constructor( opts: LocalJsonlSinkOptions, - config: LocalJsonlConfig, logger: Logger, + config: LocalJsonlConfig, ) { this.#baseDir = opts.baseDir; this.#sessionSlug = toSessionSlug(opts.sessionId); this.#logger = logger; this.#config = config; - this.#configWatcher = watchConfigurationChanges( - [{ setting: SETTING_NAME, getValue: readConfig }], - (changes) => { - const next = changes.get(SETTING_NAME) as LocalJsonlConfig | undefined; - if (next) { - this.#config = next; - } - }, - ); - this.#scheduleNextFlush(); } + /** Constructs a sink and starts its timer, config watcher, and cleanup. */ public static start( opts: LocalJsonlSinkOptions, logger: Logger, ): LocalJsonlSink { - const config = readConfig(); - const sink = new LocalJsonlSink(opts, config, logger); - void cleanupOldTelemetryFiles(opts.baseDir, config, logger); + const config = readLocalJsonlConfig(vscode.workspace.getConfiguration()); + const sink = new LocalJsonlSink(opts, logger, config); + sink.#configWatcher = watchConfigurationChanges( + [ + { + setting: LOCAL_JSONL_SETTING, + getValue: () => + readLocalJsonlConfig(vscode.workspace.getConfiguration()), + }, + ], + (changes) => { + const next = changes.get(LOCAL_JSONL_SETTING) as + | LocalJsonlConfig + | undefined; + if (next) { + sink.#config = next; + } + }, + ); + sink.#scheduleNextFlush(); + void sink.#cleanupOldFiles(); return sink; } @@ -137,9 +125,7 @@ export class LocalJsonlSink implements TelemetrySink { /** * Coalesces concurrent flush requests. While a flush is running, at most - * one more is queued; further callers receive that same queued promise. - * Resolves once the buffer state at the time of the call has been written - * (or attempted; failures are logged, not thrown). + * one more is queued, and further callers share that queued promise. */ public flush(): Promise { if (this.#hasQueued) { @@ -159,7 +145,7 @@ export class LocalJsonlSink implements TelemetrySink { return; } this.#disposed = true; - this.#configWatcher.dispose(); + this.#configWatcher?.dispose(); if (this.#flushTimer) { clearTimeout(this.#flushTimer); this.#flushTimer = null; @@ -167,6 +153,7 @@ export class LocalJsonlSink implements TelemetrySink { await this.flush(); } + // Chained on completion (not setInterval) so flushes never overlap or pile up. #scheduleNextFlush(): void { if (this.#disposed) { return; @@ -192,12 +179,18 @@ export class LocalJsonlSink implements TelemetrySink { const lines = this.#buffer.splice(0); const payload = lines.join(""); const next = this.#nextFile(payload.length); - const target = path.join(this.#baseDir, this.#fileName(next)); + const seg = next.segment > 0 ? `.${next.segment}` : ""; + const target = path.join( + this.#baseDir, + `${FILE_PREFIX}${next.date}-${this.#sessionSlug}${seg}${FILE_SUFFIX}`, + ); try { await fs.appendFile(target, payload, "utf8"); this.#current = { ...next, size: next.size + payload.length }; this.#overflowWarned = false; } catch (err) { + // Leave #current and #overflowWarned alone: the next flush re-evaluates + // rotation, and the warn flag fires again on the next overflow burst. this.#logger.warn(`Telemetry sink '${this.name}' flush failed`, err); } } @@ -216,73 +209,46 @@ export class LocalJsonlSink implements TelemetrySink { return this.#current; } - #fileName(file: CurrentFile): string { - const segment = file.segment > 0 ? `.${file.segment}` : ""; - return `${FILE_PREFIX}${file.date}-${this.#sessionSlug}${segment}${FILE_SUFFIX}`; - } -} - -async function cleanupOldTelemetryFiles( - baseDir: string, - config: LocalJsonlConfig, - logger: Logger, -): Promise { - try { - await fs.mkdir(baseDir, { recursive: true }); - } catch (err) { - logger.warn(`Telemetry sink '${SINK_NAME}' could not create base dir`, err); - return; - } - const maxAgeMs = config.maxAgeDays * MS_PER_DAY; - const { maxTotalBytes } = config; - await cleanupFiles(baseDir, logger, { - fileType: "telemetry file", - match: (name) => name.startsWith(FILE_PREFIX) && name.endsWith(FILE_SUFFIX), - pick: (files, now) => { - const toDelete: Array<{ name: string }> = []; - const fresh: typeof files = []; - let total = 0; - for (const f of files) { - if (now - f.mtime > maxAgeMs) { - toDelete.push({ name: f.name }); - } else { - fresh.push(f); - total += f.size; + async #cleanupOldFiles(): Promise { + try { + await fs.mkdir(this.#baseDir, { recursive: true }); + } catch (err) { + this.#logger.warn( + `Telemetry sink '${this.name}' could not create base dir`, + err, + ); + return; + } + const maxAgeMs = this.#config.maxAgeDays * MS_PER_DAY; + const { maxTotalBytes } = this.#config; + await cleanupFiles(this.#baseDir, this.#logger, { + fileType: "telemetry file", + match: (name) => + name.startsWith(FILE_PREFIX) && name.endsWith(FILE_SUFFIX), + pick: (files, now) => { + const toDelete: Array<{ name: string }> = []; + const fresh: typeof files = []; + let total = 0; + for (const f of files) { + if (now - f.mtime > maxAgeMs) { + toDelete.push({ name: f.name }); + } else { + fresh.push(f); + total += f.size; + } } - } - fresh.sort((a, b) => a.mtime - b.mtime); - for (const f of fresh) { - if (total <= maxTotalBytes) { - break; + fresh.sort((a, b) => a.mtime - b.mtime); + for (const f of fresh) { + if (total <= maxTotalBytes) { + break; + } + toDelete.push({ name: f.name }); + total -= f.size; } - toDelete.push({ name: f.name }); - total -= f.size; - } - return toDelete; - }, - }); -} - -/** Reads `coder.telemetry.localJsonl`, falling back to defaults for invalid values. */ -function readConfig(): LocalJsonlConfig { - const raw = vscode.workspace.getConfiguration().get(SETTING_NAME); - const obj = - raw && typeof raw === "object" ? (raw as Record) : {}; - return { - flushIntervalMs: positiveNumber( - obj.flushIntervalMs, - DEFAULTS.flushIntervalMs, - ), - flushBatchSize: positiveNumber(obj.flushBatchSize, DEFAULTS.flushBatchSize), - bufferLimit: positiveNumber(obj.bufferLimit, DEFAULTS.bufferLimit), - maxFileBytes: positiveNumber(obj.maxFileBytes, DEFAULTS.maxFileBytes), - maxAgeDays: positiveNumber(obj.maxAgeDays, DEFAULTS.maxAgeDays), - maxTotalBytes: positiveNumber(obj.maxTotalBytes, DEFAULTS.maxTotalBytes), - }; -} - -function positiveNumber(value: unknown, fallback: number): number { - return typeof value === "number" && value > 0 ? value : fallback; + return toDelete; + }, + }); + } } function todayUtc(): string { diff --git a/src/util/fileCleanup.ts b/src/util/fileCleanup.ts index 185a764e..55b960b4 100644 --- a/src/util/fileCleanup.ts +++ b/src/util/fileCleanup.ts @@ -10,26 +10,18 @@ export interface FileCleanupCandidate { } export interface FileCleanupOptions { - /** Human-readable noun used in log messages, e.g. "telemetry file". */ + /** Noun used in log messages, e.g. "telemetry file". */ fileType: string; - /** - * Optional name-based filter applied before stat. Use this to skip - * unrelated entries cheaply. Files for which `match` returns false are - * never stat'd or passed to `pick`. - */ + /** Name-based filter applied before stat to skip unrelated entries. */ match?: (name: string) => boolean; - /** - * Picks files to delete after stat. Receives `{ name, mtime, size }` for - * each survivor of `match`, plus the current time. - */ + /** Picks files to delete from the stat'd survivors of `match`. */ pick: (files: FileCleanupCandidate[], now: number) => Array<{ name: string }>; } /** - * Stats files in `dir` in parallel (after the optional name-based `match`), - * lets `pick` choose which to delete, then unlinks them in parallel. - * Tolerates concurrent deletes from other processes (ENOENT is swallowed). - * Never throws; failures are logged via `logger.debug`. + * Lists files in `dir`, filters by name, stats and unlinks the picks in + * parallel. ENOENT is swallowed so concurrent deletes are safe. Never + * throws; failures go to `logger.debug`. */ export async function cleanupFiles( dir: string, diff --git a/test/unit/telemetry/sinks/localJsonlSink.test.ts b/test/unit/telemetry/sinks/localJsonlSink.test.ts index 39ff2a9f..cae42e5c 100644 --- a/test/unit/telemetry/sinks/localJsonlSink.test.ts +++ b/test/unit/telemetry/sinks/localJsonlSink.test.ts @@ -10,11 +10,8 @@ import { type MockInstance, } from "vitest"; -import { - LocalJsonlSink, - type LocalJsonlConfig, - type LocalJsonlSinkOptions, -} from "@/telemetry/sinks/localJsonlSink"; +import { type LocalJsonlConfig } from "@/settings/telemetry"; +import { LocalJsonlSink } from "@/telemetry/sinks/localJsonlSink"; import { createMockLogger, @@ -34,48 +31,64 @@ const SETTING_NAME = "coder.telemetry.localJsonl"; const BASE_DIR = "/telemetry"; const SESSION_ID = "12345678-aaaa-bbbb-cccc-dddddddddddd"; const SESSION_SLUG = "12345678"; -const OTHER_SLUG = "ffeeddcc"; - -// Effectively-disabled interval so tests that don't drive timers themselves -// never see the flush timer fire. -const TEST_CONFIG: Partial = { - flushIntervalMs: 1_000_000, -}; -function todayUtc(): string { - return new Date().toISOString().slice(0, 10); -} +const todayUtc = (): string => new Date().toISOString().slice(0, 10); -function fileFor(date: string, slug = SESSION_SLUG, segment = 0): string { +const fileFor = (date: string, slug = SESSION_SLUG, segment = 0): string => { const seg = segment > 0 ? `.${segment}` : ""; return `${BASE_DIR}/telemetry-${date}-${slug}${seg}.jsonl`; -} +}; + +const todaysFile = (slug?: string, segment?: number): string => + fileFor(todayUtc(), slug, segment); -function readJsonl(filePath: string): Array> { - const raw = vol.readFileSync(filePath, "utf8") as string; - return raw +const readJsonl = (filePath: string): Array> => + (vol.readFileSync(filePath, "utf8") as string) .split("\n") .filter((l) => l.length > 0) .map((l) => JSON.parse(l)); -} -function setMtimeAgo(filePath: string, ageMs: number): void { +const setMtimeAgo = (filePath: string, ageMs: number): void => { const t = (Date.now() - ageMs) / 1000; vol.utimesSync(filePath, t, t); -} +}; describe("LocalJsonlSink", () => { - let sinks: LocalJsonlSink[]; - let nextSeq: () => number; - let configProvider: MockConfigurationProvider; + let active: LocalJsonlSink[]; + let provider: MockConfigurationProvider; + + beforeEach(() => { + vi.restoreAllMocks(); + vol.reset(); + active = []; + provider = new MockConfigurationProvider(); + }); + + afterEach(async () => { + for (const s of active) { + await s.dispose(); + } + vi.useRealTimers(); + vol.reset(); + }); - function makeEvent(overrides: Partial = {}): TelemetryEvent { - const seq = nextSeq(); - return { + function setup( + config: Partial = {}, + sessionId = SESSION_ID, + ) { + provider.set(SETTING_NAME, { flushIntervalMs: 1_000_000, ...config }); + const logger = createMockLogger(); + const sink = LocalJsonlSink.start({ baseDir: BASE_DIR, sessionId }, logger); + active.push(sink); + + let seq = 0; + const makeEvent = ( + overrides: Partial = {}, + ): TelemetryEvent => ({ eventId: `id-${seq}`, eventName: "test.event", timestamp: "2026-05-04T12:00:00.000Z", - eventSequence: seq, + eventSequence: seq++, context: { extensionVersion: "1.14.5", machineId: "machine-id", @@ -90,50 +103,14 @@ describe("LocalJsonlSink", () => { properties: {}, measurements: {}, ...overrides, - }; - } - - function makeSink( - config: Partial = {}, - opts: Partial = {}, - ): { - sink: LocalJsonlSink; - logger: ReturnType; - } { - configProvider.set(SETTING_NAME, { ...TEST_CONFIG, ...config }); - const logger = createMockLogger(); - const sink = LocalJsonlSink.start( - { baseDir: BASE_DIR, sessionId: SESSION_ID, ...opts }, - logger, - ); - sinks.push(sink); - return { sink, logger }; - } + }); - function todaysFile(slug = SESSION_SLUG, segment = 0): string { - return fileFor(todayUtc(), slug, segment); + return { sink, logger, makeEvent }; } - beforeEach(() => { - vi.restoreAllMocks(); - vol.reset(); - let counter = 0; - nextSeq = () => counter++; - sinks = []; - configProvider = new MockConfigurationProvider(); - }); - - afterEach(async () => { - for (const s of sinks) { - await s.dispose(); - } - vi.useRealTimers(); - vol.reset(); - }); - it("flushes the buffer when the interval fires", async () => { vi.useFakeTimers(); - const { sink } = makeSink({ flushIntervalMs: 1000 }); + const { sink, makeEvent } = setup({ flushIntervalMs: 1000 }); sink.write(makeEvent()); sink.write(makeEvent()); @@ -144,7 +121,7 @@ describe("LocalJsonlSink", () => { }); it("flushes early once the buffer reaches flushBatchSize", async () => { - const { sink } = makeSink({ flushBatchSize: 3 }); + const { sink, makeEvent } = setup({ flushBatchSize: 3 }); sink.write(makeEvent()); sink.write(makeEvent()); @@ -155,8 +132,8 @@ describe("LocalJsonlSink", () => { expect(readJsonl(todaysFile())).toHaveLength(3); }); - it("drops oldest events and warns once when the buffer exceeds bufferLimit", async () => { - const { sink, logger } = makeSink({ + it("drops the oldest events when the buffer exceeds bufferLimit", async () => { + const { sink, makeEvent } = setup({ bufferLimit: 5, flushBatchSize: 10_000, }); @@ -166,17 +143,13 @@ describe("LocalJsonlSink", () => { } await sink.flush(); - const overflows = vi - .mocked(logger.warn) - .mock.calls.filter((c) => String(c[0]).includes("buffer overflow")); - expect(overflows).toHaveLength(1); expect(readJsonl(todaysFile()).map((l) => l.event_sequence)).toEqual([ 3, 4, 5, 6, 7, ]); }); it("flushes pending events on dispose", async () => { - const { sink } = makeSink(); + const { sink, makeEvent } = setup(); sink.write(makeEvent()); sink.write(makeEvent()); @@ -187,7 +160,7 @@ describe("LocalJsonlSink", () => { it("rotates to a numbered segment once maxFileBytes is exceeded", async () => { // A serialized event is around 400 bytes, so 900 holds 2 events but not 3. - const { sink } = makeSink({ maxFileBytes: 900 }); + const { sink, makeEvent } = setup({ maxFileBytes: 900 }); for (let i = 0; i < 3; i++) { sink.write(makeEvent()); @@ -201,11 +174,10 @@ describe("LocalJsonlSink", () => { it("starts a fresh file on UTC date rollover", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-05-04T23:59:00.000Z")); - const { sink } = makeSink(); + const { sink, makeEvent } = setup(); sink.write(makeEvent()); await sink.flush(); - expect(readJsonl(fileFor("2026-05-04"))).toHaveLength(1); vi.setSystemTime(new Date("2026-05-05T00:01:00.000Z")); sink.write(makeEvent()); @@ -226,13 +198,13 @@ describe("LocalJsonlSink", () => { setMtimeAgo(`${BASE_DIR}/telemetry-2025-01-01-aaaa1111.jsonl`, 60 * dayMs); setMtimeAgo(`${BASE_DIR}/telemetry-2025-01-02-aaaa1111.jsonl`, 60 * dayMs); - makeSink({ maxAgeDays: 30 }); + setup({ maxAgeDays: 30 }); - await vi.waitFor(() => { + await vi.waitFor(() => expect(vol.readdirSync(BASE_DIR)).toEqual([ `telemetry-${today}-bbbb2222.jsonl`, - ]); - }); + ]), + ); }); it("trims oldest files when total size exceeds maxTotalBytes", async () => { @@ -247,20 +219,20 @@ describe("LocalJsonlSink", () => { setMtimeAgo(`${BASE_DIR}/telemetry-2026-04-02-b.jsonl`, 4 * dayMs); setMtimeAgo(`${BASE_DIR}/telemetry-2026-04-03-c.jsonl`, 3 * dayMs); - makeSink({ maxAgeDays: 365, maxTotalBytes: 4500 }); + setup({ maxAgeDays: 365, maxTotalBytes: 4500 }); - await vi.waitFor(() => { + await vi.waitFor(() => expect(vol.readdirSync(BASE_DIR).toSorted()).toEqual([ "telemetry-2026-04-02-b.jsonl", "telemetry-2026-04-03-c.jsonl", - ]); - }); + ]), + ); }); it("emits valid snake_case JSONL with optional fields when set, omitting when not", async () => { - const { sink } = makeSink(); + const { sink, makeEvent } = setup(); - sink.write(makeEvent()); // no optional fields + sink.write(makeEvent()); sink.write( makeEvent({ eventName: "remote.connect", @@ -294,45 +266,40 @@ describe("LocalJsonlSink", () => { }); }); - it("logs but does not throw when fs.appendFile rejects", async () => { - const { sink } = makeSink(); + it("does not throw when fs.appendFile rejects, and recovers on the next flush", async () => { + const { sink, makeEvent } = setup(); vi.spyOn(fsPromises, "appendFile").mockRejectedValueOnce(new Error("boom")); sink.write(makeEvent()); await expect(sink.flush()).resolves.toBeUndefined(); - // Sink keeps working after a failure. sink.write(makeEvent()); await sink.flush(); expect(readJsonl(todaysFile())).toHaveLength(1); }); - it("two sinks with different sessions write to disjoint files without corruption", async () => { - const { sink: a } = makeSink(); - const { sink: b } = makeSink( - {}, - { sessionId: "ffeeddcc-1111-2222-3333-444444444444" }, - ); + it("two sinks with different sessions write to disjoint files", async () => { + const a = setup(); + const b = setup({}, "ffeeddcc-1111-2222-3333-444444444444"); for (let i = 0; i < 5; i++) { - a.write(makeEvent()); - b.write(makeEvent()); + a.sink.write(a.makeEvent()); + b.sink.write(b.makeEvent()); } - await Promise.all([a.flush(), b.flush()]); + await Promise.all([a.sink.flush(), b.sink.flush()]); expect(readJsonl(todaysFile(SESSION_SLUG))).toHaveLength(5); - expect(readJsonl(todaysFile(OTHER_SLUG))).toHaveLength(5); + expect(readJsonl(todaysFile("ffeeddcc"))).toHaveLength(5); }); it("coalesces concurrent flush requests into at most two appendFile calls", async () => { - const { sink } = makeSink(); + const { sink, makeEvent } = setup(); let resolveFirst!: () => void; const firstAppendDone = new Promise((r) => { resolveFirst = r; }); - const realAppend: typeof fsPromises.appendFile = - fsPromises.appendFile.bind(fsPromises); + const realAppend = fsPromises.appendFile.bind(fsPromises); const spy: MockInstance = vi .spyOn(fsPromises, "appendFile") .mockImplementationOnce(async (target, data, opts) => { @@ -349,7 +316,6 @@ describe("LocalJsonlSink", () => { sink.write(makeEvent()); const p2 = sink.flush(); const p3 = sink.flush(); - expect(p3).toBe(p2); resolveFirst(); await Promise.all([p1, p2, p3]); @@ -361,14 +327,16 @@ describe("LocalJsonlSink", () => { }); it("picks up config changes reactively", async () => { - const { sink } = makeSink({ flushBatchSize: 100 }); + const { sink, makeEvent } = setup({ flushBatchSize: 100 }); sink.write(makeEvent()); sink.write(makeEvent()); expect(vol.existsSync(todaysFile())).toBe(false); - // Lower the batch threshold; the next write should flush. - configProvider.set(SETTING_NAME, { ...TEST_CONFIG, flushBatchSize: 3 }); + provider.set(SETTING_NAME, { + flushIntervalMs: 1_000_000, + flushBatchSize: 3, + }); sink.write(makeEvent()); await vi.waitFor(() => expect(vol.existsSync(todaysFile())).toBe(true)); @@ -376,13 +344,12 @@ describe("LocalJsonlSink", () => { }); it("write() does not throw when an event cannot be serialized", async () => { - const { sink } = makeSink(); + const { sink, makeEvent } = setup(); const bad = makeEvent(); (bad.properties as Record).circular = BigInt(1); expect(() => sink.write(bad)).not.toThrow(); - // Sink remains usable for valid events. sink.write(makeEvent()); await sink.flush(); expect(readJsonl(todaysFile())).toHaveLength(1); diff --git a/test/unit/util/fileCleanup.test.ts b/test/unit/util/fileCleanup.test.ts index 8d485def..8d53ef19 100644 --- a/test/unit/util/fileCleanup.test.ts +++ b/test/unit/util/fileCleanup.test.ts @@ -1,26 +1,18 @@ import { vol } from "memfs"; -import * as fsPromises from "node:fs/promises"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { cleanupFiles, type FileCleanupCandidate } from "@/util/fileCleanup"; +import { cleanupFiles } from "@/util/fileCleanup"; import { createMockLogger } from "../../mocks/testHelpers"; import type * as fs from "node:fs"; -type PickFn = ( - files: FileCleanupCandidate[], - now: number, -) => Array<{ name: string }>; - vi.mock("node:fs/promises", async () => { const memfs: { fs: typeof fs } = await vi.importActual("memfs"); return memfs.fs.promises; }); describe("cleanupFiles", () => { - const logger = createMockLogger(); - beforeEach(() => { vi.restoreAllMocks(); vol.reset(); @@ -30,33 +22,20 @@ describe("cleanupFiles", () => { vol.reset(); }); - it("does not throw when directory is missing", async () => { + function setup(files: Record = {}) { + vol.fromJSON(files); + return { logger: createMockLogger() }; + } + + it("does not throw when the directory is missing", async () => { + const { logger } = setup(); await expect( - cleanupFiles("/nope", logger, { - fileType: "thing", - pick: () => [], - }), + cleanupFiles("/nope", logger, { fileType: "thing", pick: () => [] }), ).resolves.toBeUndefined(); }); - it("passes every file's name, mtime, size, and now to the pick callback", async () => { - vol.fromJSON({ "/d/a": "hello", "/d/b": "world!" }); - vol.utimesSync("/d/a", 1_700_000_000, 1_700_000_000); - const before = Date.now(); - const pick = vi.fn(() => []); - - await cleanupFiles("/d", logger, { fileType: "thing", pick }); - - const [files, now] = pick.mock.calls[0]; - expect(files.toSorted((x, y) => x.name.localeCompare(y.name))).toEqual([ - { name: "a", mtime: 1_700_000_000_000, size: 5 }, - expect.objectContaining({ name: "b", size: 6 }), - ]); - expect(now).toBeGreaterThanOrEqual(before); - }); - - it("unlinks files chosen by pick and leaves the rest", async () => { - vol.fromJSON({ "/d/a": "1", "/d/b": "2", "/d/c": "3" }); + it("unlinks the files chosen by pick and leaves the rest", async () => { + const { logger } = setup({ "/d/a": "1", "/d/b": "2", "/d/c": "3" }); await cleanupFiles("/d", logger, { fileType: "thing", @@ -66,58 +45,50 @@ describe("cleanupFiles", () => { expect(vol.readdirSync("/d")).toEqual(["b"]); }); - it("tolerates ENOENT when a file disappears between stat and unlink", async () => { - vol.fromJSON({ "/d/a": "" }); - const localLogger = createMockLogger(); + it("exposes mtime, size, and the current time so pick can filter on them", async () => { + const { logger } = setup({ + "/d/old-big": "x".repeat(100), + "/d/new-small": "x", + }); + // 1970-01-01: definitely older than `now`. + vol.utimesSync("/d/old-big", 1, 1); - await cleanupFiles("/d", localLogger, { + await cleanupFiles("/d", logger, { fileType: "thing", - pick: (files) => { - vol.unlinkSync("/d/a"); - return files; - }, + pick: (files, now) => + files.filter((f) => now - f.mtime > 1000 && f.size > 50), }); - expect(localLogger.warn).not.toHaveBeenCalled(); - expect(localLogger.error).not.toHaveBeenCalled(); + expect(vol.readdirSync("/d")).toEqual(["new-small"]); }); - it("skips stat for files rejected by `match`", async () => { - vol.fromJSON({ + it("only feeds pick the files matched by `match`", async () => { + const { logger } = setup({ "/d/keep.json": "{}", "/d/skip.txt": "no", "/d/keep-too.json": "{}", }); - const statSpy = vi.spyOn(fsPromises, "stat"); - const pick = vi.fn(() => []); await cleanupFiles("/d", logger, { fileType: "thing", match: (n) => n.endsWith(".json"), - pick, + pick: (files) => files, }); - const stattedNames = statSpy.mock.calls - .map((c) => String(c[0]).split("/").pop()) - .toSorted(); - expect(stattedNames).toEqual(["keep-too.json", "keep.json"]); - expect(pick.mock.calls[0][0].map((f) => f.name).toSorted()).toEqual([ - "keep-too.json", - "keep.json", - ]); + expect(vol.readdirSync("/d")).toEqual(["skip.txt"]); }); - it("logs a debug summary listing the deleted files", async () => { - vol.fromJSON({ "/d/a": "" }); - const localLogger = createMockLogger(); + it("keeps going when a file disappears between stat and unlink", async () => { + const { logger } = setup({ "/d/a": "1", "/d/b": "2" }); - await cleanupFiles("/d", localLogger, { - fileType: "widget", - pick: (files) => files, + await cleanupFiles("/d", logger, { + fileType: "thing", + pick: (files) => { + vol.unlinkSync("/d/a"); + return files; + }, }); - expect(localLogger.debug).toHaveBeenCalledWith( - expect.stringContaining("Cleaned up 1 widget(s): a"), - ); + expect(vol.readdirSync("/d")).toEqual([]); }); }); From 359fd45e7f3deed9ad6610f19d597cf95b27a1a0 Mon Sep 17 00:00:00 2001 From: Ehab Younes Date: Tue, 5 May 2026 02:13:12 +0300 Subject: [PATCH 3/5] fix(telemetry): address review feedback on local JSONL sink - Register coder.telemetry.level and coder.telemetry.localJsonl in package.json so they show up in the settings UI with descriptions, enums, per-field minimums, and defaults. - Use Buffer.byteLength for UTF-8 size accounting so rotation triggers on actual bytes, not UTF-16 code units. - Seed #current.size from disk on the first flush after activation so an Extension Host restart does not let segment 0 grow past maxFileBytes. - Reset #overflowWarned in the flush failure path and report the discarded event count, so persistent failures stay visible in logs. - Warn when bufferLimit < flushBatchSize at start and on config change; cross-field invariant is otherwise silently broken. - Scope cleanupFiles' outer try/catch to readdir only, log non-ENOENT errors at debug, and basename-clamp pick names so unlinks cannot escape the cleanup directory. - Refactor LocalJsonlSink for readability: extract pickByAgeAndSize, reorder methods newspaper-style, drop the unused #ready/#dirEnsured flags now that mkdir runs on every flush (idempotent and cheap). - Add tests for the settings module, cleanup edge cases, the warn reset invariant, post-dispose writes, and multi-file deletion. --- package.json | 70 ++++++ src/telemetry/sinks/localJsonlSink.ts | 215 +++++++++++------- src/util/fileCleanup.ts | 94 ++++---- test/unit/settings/telemetry.test.ts | 99 ++++++++ .../telemetry/sinks/localJsonlSink.test.ts | 75 +++++- test/unit/util/fileCleanup.test.ts | 27 +++ 6 files changed, 447 insertions(+), 133 deletions(-) create mode 100644 test/unit/settings/telemetry.test.ts diff --git a/package.json b/package.json index 66e4c87e..098f36fd 100644 --- a/package.json +++ b/package.json @@ -189,6 +189,76 @@ "Logs everything from *headers* plus request and response bodies (may include sensitive data)" ], "default": "basic" + }, + "coder.telemetry.level": { + "markdownDescription": "Controls Coder extension telemetry collection. Used to diagnose extension issues.", + "type": "string", + "enum": [ + "off", + "local" + ], + "markdownEnumDescriptions": [ + "Disable telemetry collection.", + "Record events on this machine only." + ], + "default": "local", + "tags": [ + "telemetry" + ] + }, + "coder.telemetry.localJsonl": { + "markdownDescription": "Tunables for the local JSONL telemetry sink. Active when `#coder.telemetry.level#` is at least `local`. Events are written to JSONL files under the extension's global storage directory. Missing or invalid fields fall back to defaults.", + "type": "object", + "additionalProperties": false, + "properties": { + "flushIntervalMs": { + "type": "number", + "minimum": 1000, + "default": 15000, + "markdownDescription": "Interval in milliseconds between scheduled flushes of the in-memory buffer to disk." + }, + "flushBatchSize": { + "type": "number", + "minimum": 1, + "default": 100, + "markdownDescription": "Buffer size that triggers an early flush before the next scheduled interval." + }, + "bufferLimit": { + "type": "number", + "minimum": 1, + "default": 500, + "markdownDescription": "Maximum number of events held in memory. Oldest events are dropped on overflow. Should be at least `flushBatchSize`." + }, + "maxFileBytes": { + "type": "number", + "minimum": 1024, + "default": 5242880, + "markdownDescription": "Maximum size in bytes of a single JSONL file before rotating to a new segment." + }, + "maxAgeDays": { + "type": "number", + "minimum": 1, + "default": 30, + "markdownDescription": "Telemetry files older than this many days are deleted at activation." + }, + "maxTotalBytes": { + "type": "number", + "minimum": 1024, + "default": 104857600, + "markdownDescription": "Total bytes across all telemetry files. Oldest files are deleted at activation until under the cap." + } + }, + "default": { + "flushIntervalMs": 15000, + "flushBatchSize": 100, + "bufferLimit": 500, + "maxFileBytes": 5242880, + "maxAgeDays": 30, + "maxTotalBytes": 104857600 + }, + "tags": [ + "telemetry" + ] } } }, diff --git a/src/telemetry/sinks/localJsonlSink.ts b/src/telemetry/sinks/localJsonlSink.ts index 876e82a4..06fc71f7 100644 --- a/src/telemetry/sinks/localJsonlSink.ts +++ b/src/telemetry/sinks/localJsonlSink.ts @@ -8,7 +8,10 @@ import { readLocalJsonlConfig, type LocalJsonlConfig, } from "../../settings/telemetry"; -import { cleanupFiles } from "../../util/fileCleanup"; +import { + cleanupFiles, + type FileCleanupCandidate, +} from "../../util/fileCleanup"; import type { Logger } from "../../logging/logger"; import type { TelemetryEvent, TelemetryLevel, TelemetrySink } from "../event"; @@ -71,24 +74,9 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { logger: Logger, ): LocalJsonlSink { const config = readLocalJsonlConfig(vscode.workspace.getConfiguration()); + warnIfBufferTooSmall(config, logger); const sink = new LocalJsonlSink(opts, logger, config); - sink.#configWatcher = watchConfigurationChanges( - [ - { - setting: LOCAL_JSONL_SETTING, - getValue: () => - readLocalJsonlConfig(vscode.workspace.getConfiguration()), - }, - ], - (changes) => { - const next = changes.get(LOCAL_JSONL_SETTING) as - | LocalJsonlConfig - | undefined; - if (next) { - sink.#config = next; - } - }, - ); + sink.#configWatcher = sink.#watchConfig(); sink.#scheduleNextFlush(); void sink.#cleanupOldFiles(); return sink; @@ -106,18 +94,7 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { return; } this.#buffer.push(line); - - if (this.#buffer.length > this.#config.bufferLimit) { - const dropped = this.#buffer.length - this.#config.bufferLimit; - this.#buffer.splice(0, dropped); - if (!this.#overflowWarned) { - this.#overflowWarned = true; - this.#logger.warn( - `Telemetry sink '${this.name}' buffer overflow: dropped ${dropped} oldest event(s)`, - ); - } - } - + this.#enforceBufferLimit(); if (this.#buffer.length >= this.#config.flushBatchSize) { void this.flush(); } @@ -132,11 +109,11 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { return this.#flushChain; } this.#hasQueued = true; - const next = (): Promise => { + const run = async (): Promise => { this.#hasQueued = false; - return this.#doFlush(); + await this.#doFlush(); }; - this.#flushChain = this.#flushChain.then(next, next); + this.#flushChain = this.#flushChain.then(run, run); return this.#flushChain; } @@ -153,6 +130,42 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { await this.flush(); } + #enforceBufferLimit(): void { + const overage = this.#buffer.length - this.#config.bufferLimit; + if (overage <= 0) { + return; + } + this.#buffer.splice(0, overage); + if (!this.#overflowWarned) { + this.#overflowWarned = true; + this.#logger.warn( + `Telemetry sink '${this.name}' buffer overflow: dropped ${overage} oldest event(s)`, + ); + } + } + + #watchConfig(): vscode.Disposable { + return watchConfigurationChanges( + [ + { + setting: LOCAL_JSONL_SETTING, + getValue: () => + readLocalJsonlConfig(vscode.workspace.getConfiguration()), + }, + ], + (changes) => { + const next = changes.get(LOCAL_JSONL_SETTING) as + | LocalJsonlConfig + | undefined; + if (!next) { + return; + } + warnIfBufferTooSmall(next, this.#logger); + this.#config = next; + }, + ); + } + // Chained on completion (not setInterval) so flushes never overlap or pile up. #scheduleNextFlush(): void { if (this.#disposed) { @@ -166,9 +179,7 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { err, ); }) - .finally(() => { - this.#scheduleNextFlush(); - }); + .finally(() => this.#scheduleNextFlush()); }, this.#config.flushIntervalMs); } @@ -176,85 +187,117 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { if (this.#buffer.length === 0) { return; } + // Capture before any await so concurrent writes go to the next batch. const lines = this.#buffer.splice(0); const payload = lines.join(""); - const next = this.#nextFile(payload.length); - const seg = next.segment > 0 ? `.${next.segment}` : ""; - const target = path.join( - this.#baseDir, - `${FILE_PREFIX}${next.date}-${this.#sessionSlug}${seg}${FILE_SUFFIX}`, - ); + const payloadBytes = Buffer.byteLength(payload, "utf8"); try { - await fs.appendFile(target, payload, "utf8"); - this.#current = { ...next, size: next.size + payload.length }; + await this.#append(payload, payloadBytes); this.#overflowWarned = false; } catch (err) { - // Leave #current and #overflowWarned alone: the next flush re-evaluates - // rotation, and the warn flag fires again on the next overflow burst. - this.#logger.warn(`Telemetry sink '${this.name}' flush failed`, err); + // Reset so the next overflow burst is logged on repeated failures. + this.#overflowWarned = false; + this.#logger.warn( + `Telemetry sink '${this.name}' flush failed, ${lines.length} event(s) discarded`, + err, + ); } } - #nextFile(payloadSize: number): CurrentFile { + async #append(payload: string, payloadBytes: number): Promise { + // mkdir is idempotent with recursive:true; cheap enough to do per flush. + await fs.mkdir(this.#baseDir, { recursive: true }); + const next = await this.#nextFile(payloadBytes); + await fs.appendFile(this.#segmentPath(next), payload, "utf8"); + this.#current = { ...next, size: next.size + payloadBytes }; + } + + async #nextFile(payloadSize: number): Promise { const today = todayUtc(); if (this.#current.date !== today) { - return { date: today, segment: 0, size: 0 }; + // Seed from disk: an Extension Host restart may have left bytes in + // today's segment 0 from earlier in this session. + const target = this.#segmentPath({ date: today, segment: 0 }); + const existing = await statBytes(target); + return { date: today, segment: 0, size: existing }; } - if ( - this.#current.size > 0 && - this.#current.size + payloadSize > this.#config.maxFileBytes - ) { + const wouldExceed = + this.#current.size + payloadSize > this.#config.maxFileBytes; + if (this.#current.size > 0 && wouldExceed) { return { date: today, segment: this.#current.segment + 1, size: 0 }; } return this.#current; } + #segmentPath(file: { date: string; segment: number }): string { + const seg = file.segment > 0 ? `.${file.segment}` : ""; + return path.join( + this.#baseDir, + `${FILE_PREFIX}${file.date}-${this.#sessionSlug}${seg}${FILE_SUFFIX}`, + ); + } + async #cleanupOldFiles(): Promise { - try { - await fs.mkdir(this.#baseDir, { recursive: true }); - } catch (err) { - this.#logger.warn( - `Telemetry sink '${this.name}' could not create base dir`, - err, - ); - return; - } - const maxAgeMs = this.#config.maxAgeDays * MS_PER_DAY; - const { maxTotalBytes } = this.#config; await cleanupFiles(this.#baseDir, this.#logger, { fileType: "telemetry file", match: (name) => name.startsWith(FILE_PREFIX) && name.endsWith(FILE_SUFFIX), - pick: (files, now) => { - const toDelete: Array<{ name: string }> = []; - const fresh: typeof files = []; - let total = 0; - for (const f of files) { - if (now - f.mtime > maxAgeMs) { - toDelete.push({ name: f.name }); - } else { - fresh.push(f); - total += f.size; - } - } - fresh.sort((a, b) => a.mtime - b.mtime); - for (const f of fresh) { - if (total <= maxTotalBytes) { - break; - } - toDelete.push({ name: f.name }); - total -= f.size; - } - return toDelete; - }, + pick: pickByAgeAndSize( + this.#config.maxAgeDays * MS_PER_DAY, + this.#config.maxTotalBytes, + ), }); } } +function pickByAgeAndSize(maxAgeMs: number, maxTotalBytes: number) { + return ( + files: FileCleanupCandidate[], + now: number, + ): Array<{ name: string }> => { + const toDelete: Array<{ name: string }> = []; + const survivors: FileCleanupCandidate[] = []; + let totalBytes = 0; + for (const file of files) { + if (now - file.mtime > maxAgeMs) { + toDelete.push({ name: file.name }); + } else { + survivors.push(file); + totalBytes += file.size; + } + } + survivors.sort((a, b) => a.mtime - b.mtime); + for (const file of survivors) { + if (totalBytes <= maxTotalBytes) { + break; + } + toDelete.push({ name: file.name }); + totalBytes -= file.size; + } + return toDelete; + }; +} + +async function statBytes(target: string): Promise { + try { + return (await fs.stat(target)).size; + } catch { + return 0; + } +} + function todayUtc(): string { return new Date().toISOString().slice(0, 10); } +function warnIfBufferTooSmall(config: LocalJsonlConfig, logger: Logger): void { + if (config.bufferLimit < config.flushBatchSize) { + logger.warn( + `Telemetry sink '${SINK_NAME}' bufferLimit (${config.bufferLimit}) is below flushBatchSize (${config.flushBatchSize}); the batch-size flush trigger is unreachable and overflow will drop events instead. Raise bufferLimit or lower flushBatchSize.`, + ); + } +} + function toSessionSlug(sessionId: string): string { const cleaned = sessionId.replace(/[^a-zA-Z0-9]/g, ""); return cleaned.slice(0, 8) || "anon0000"; diff --git a/src/util/fileCleanup.ts b/src/util/fileCleanup.ts index 55b960b4..263adbbe 100644 --- a/src/util/fileCleanup.ts +++ b/src/util/fileCleanup.ts @@ -29,55 +29,63 @@ export async function cleanupFiles( options: FileCleanupOptions, ): Promise { const { fileType, match, pick } = options; + const now = Date.now(); + let names: string[]; try { - const now = Date.now(); - const names = await fs.readdir(dir); - const candidates = match ? names.filter(match) : names; + names = await fs.readdir(dir); + } catch (error) { + // ENOENT just means the dir hasn't been created yet; anything else + // (EACCES, EMFILE, ...) is a real failure worth surfacing. + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + logger.debug(`Failed to read ${fileType} directory ${dir}`, error); + } + return; + } + const candidates = match ? names.filter(match) : names; - const withStats = await Promise.all( - candidates.map(async (name) => { - try { - const stats = await fs.stat(path.join(dir, name)); - return { - name, - mtime: stats.mtime.getTime(), - size: stats.size, - }; - } catch (error) { - if ((error as NodeJS.ErrnoException).code !== "ENOENT") { - logger.debug(`Failed to stat ${fileType} ${name}`, error); - } - return null; + const withStats = await Promise.all( + candidates.map(async (name) => { + try { + const stats = await fs.stat(path.join(dir, name)); + return { + name, + mtime: stats.mtime.getTime(), + size: stats.size, + }; + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + logger.debug(`Failed to stat ${fileType} ${name}`, error); } - }), - ); + return null; + } + }), + ); - const toDelete = pick( - withStats.filter((f) => f !== null), - now, - ); + const toDelete = pick( + withStats.filter((f) => f !== null), + now, + ); - const deleted = await Promise.all( - toDelete.map(async (file) => { - try { - await fs.unlink(path.join(dir, file.name)); - return file.name; - } catch (error) { - if ((error as NodeJS.ErrnoException).code !== "ENOENT") { - logger.debug(`Failed to delete ${fileType} ${file.name}`, error); - } - return null; + const deleted = await Promise.all( + toDelete.map(async (file) => { + // Basename only; never let `pick` escape `dir`. + const safeName = path.basename(file.name); + try { + await fs.unlink(path.join(dir, safeName)); + return safeName; + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + logger.debug(`Failed to delete ${fileType} ${safeName}`, error); } - }), - ); + return null; + } + }), + ); - const successful = deleted.filter((name) => name !== null); - if (successful.length > 0) { - logger.debug( - `Cleaned up ${successful.length} ${fileType}(s): ${successful.join(", ")}`, - ); - } - } catch { - // Directory may not exist yet, ignore. + const successful = deleted.filter((name) => name !== null); + if (successful.length > 0) { + logger.debug( + `Cleaned up ${successful.length} ${fileType}(s): ${successful.join(", ")}`, + ); } } diff --git a/test/unit/settings/telemetry.test.ts b/test/unit/settings/telemetry.test.ts new file mode 100644 index 00000000..3a728eb5 --- /dev/null +++ b/test/unit/settings/telemetry.test.ts @@ -0,0 +1,99 @@ +import { beforeEach, describe, expect, it } from "vitest"; + +import { + LOCAL_JSONL_DEFAULTS, + LOCAL_JSONL_SETTING, + TELEMETRY_LEVEL_SETTING, + readLocalJsonlConfig, + readTelemetryLevel, +} from "@/settings/telemetry"; + +import { MockConfigurationProvider } from "../../mocks/testHelpers"; + +describe("telemetry settings", () => { + let config: MockConfigurationProvider; + + beforeEach(() => { + config = new MockConfigurationProvider(); + }); + + describe("readTelemetryLevel", () => { + it.each([ + ["off", "off"], + ["local", "local"], + [undefined, "local"], + ["bogus", "local"], + [42, "local"], + [null, "local"], + ])("when value is %p, returns %p", (value, expected) => { + if (value !== undefined) { + config.set(TELEMETRY_LEVEL_SETTING, value); + } + expect(readTelemetryLevel(config)).toBe(expected); + }); + }); + + describe("readLocalJsonlConfig", () => { + it("returns defaults when unset", () => { + expect(readLocalJsonlConfig(config)).toEqual(LOCAL_JSONL_DEFAULTS); + }); + + it.each([ + ["a string", "nope"], + ["a boolean", true], + ["null", null], + ["an array", [1, 2]], + ])("returns defaults when the raw value is %s", (_, raw) => { + config.set(LOCAL_JSONL_SETTING, raw); + expect(readLocalJsonlConfig(config)).toEqual(LOCAL_JSONL_DEFAULTS); + }); + + it("accepts a fully-specified object", () => { + const custom = { + flushIntervalMs: 1_000, + flushBatchSize: 10, + bufferLimit: 50, + maxFileBytes: 1024, + maxAgeDays: 7, + maxTotalBytes: 4096, + }; + config.set(LOCAL_JSONL_SETTING, custom); + expect(readLocalJsonlConfig(config)).toEqual(custom); + }); + + it.each([ + ["zero", 0], + ["a negative", -1], + ["NaN", Number.NaN], + ["a numeric string", "100"], + ["a boolean", true], + ])("falls back per-field when a value is %s", (_, bad) => { + config.set(LOCAL_JSONL_SETTING, { flushIntervalMs: bad }); + expect(readLocalJsonlConfig(config).flushIntervalMs).toBe( + LOCAL_JSONL_DEFAULTS.flushIntervalMs, + ); + }); + + it("merges valid fields with defaults for invalid ones", () => { + config.set(LOCAL_JSONL_SETTING, { + flushIntervalMs: 5_000, + flushBatchSize: -1, + }); + expect(readLocalJsonlConfig(config)).toEqual({ + ...LOCAL_JSONL_DEFAULTS, + flushIntervalMs: 5_000, + }); + }); + + it("returns bufferLimit and flushBatchSize as written, without clamping", () => { + config.set(LOCAL_JSONL_SETTING, { + flushBatchSize: 200, + bufferLimit: 50, + }); + expect(readLocalJsonlConfig(config)).toMatchObject({ + flushBatchSize: 200, + bufferLimit: 50, + }); + }); + }); +}); diff --git a/test/unit/telemetry/sinks/localJsonlSink.test.ts b/test/unit/telemetry/sinks/localJsonlSink.test.ts index cae42e5c..30d96587 100644 --- a/test/unit/telemetry/sinks/localJsonlSink.test.ts +++ b/test/unit/telemetry/sinks/localJsonlSink.test.ts @@ -10,7 +10,10 @@ import { type MockInstance, } from "vitest"; -import { type LocalJsonlConfig } from "@/settings/telemetry"; +import { + LOCAL_JSONL_SETTING, + type LocalJsonlConfig, +} from "@/settings/telemetry"; import { LocalJsonlSink } from "@/telemetry/sinks/localJsonlSink"; import { @@ -27,7 +30,6 @@ vi.mock("node:fs/promises", async () => { return memfs.fs.promises; }); -const SETTING_NAME = "coder.telemetry.localJsonl"; const BASE_DIR = "/telemetry"; const SESSION_ID = "12345678-aaaa-bbbb-cccc-dddddddddddd"; const SESSION_SLUG = "12345678"; @@ -76,7 +78,10 @@ describe("LocalJsonlSink", () => { config: Partial = {}, sessionId = SESSION_ID, ) { - provider.set(SETTING_NAME, { flushIntervalMs: 1_000_000, ...config }); + provider.set(LOCAL_JSONL_SETTING, { + flushIntervalMs: 1_000_000, + ...config, + }); const logger = createMockLogger(); const sink = LocalJsonlSink.start({ baseDir: BASE_DIR, sessionId }, logger); active.push(sink); @@ -148,6 +153,35 @@ describe("LocalJsonlSink", () => { ]); }); + it("emits one overflow warning per burst regardless of flush outcome", async () => { + const { sink, logger, makeEvent } = setup({ + bufferLimit: 5, + flushBatchSize: 10_000, + }); + const overflowWarnings = (): number => + vi.mocked(logger.warn).mock.calls.filter((c) => { + const arg = c[0]; + return typeof arg === "string" && arg.includes("buffer overflow"); + }).length; + const overflowBuffer = (): void => { + for (let i = 0; i < 8; i++) { + sink.write(makeEvent()); + } + }; + + overflowBuffer(); + await sink.flush(); + + vi.spyOn(fsPromises, "appendFile").mockRejectedValueOnce(new Error("boom")); + overflowBuffer(); + await sink.flush(); + + overflowBuffer(); + await sink.flush(); + + expect(overflowWarnings()).toBe(3); + }); + it("flushes pending events on dispose", async () => { const { sink, makeEvent } = setup(); sink.write(makeEvent()); @@ -158,6 +192,18 @@ describe("LocalJsonlSink", () => { expect(readJsonl(todaysFile())).toHaveLength(2); }); + it("ignores writes after dispose", async () => { + const { sink, makeEvent } = setup(); + sink.write(makeEvent()); + sink.write(makeEvent()); + await sink.dispose(); + + sink.write(makeEvent()); + sink.write(makeEvent()); + + expect(readJsonl(todaysFile())).toHaveLength(2); + }); + it("rotates to a numbered segment once maxFileBytes is exceeded", async () => { // A serialized event is around 400 bytes, so 900 holds 2 events but not 3. const { sink, makeEvent } = setup({ maxFileBytes: 900 }); @@ -229,6 +275,27 @@ describe("LocalJsonlSink", () => { ); }); + it("keeps deleting until total size is under maxTotalBytes", async () => { + const dayMs = 24 * 60 * 60 * 1000; + const big = "x".repeat(2000); + vol.fromJSON({ + [`${BASE_DIR}/telemetry-2026-04-01-a.jsonl`]: big, + [`${BASE_DIR}/telemetry-2026-04-02-b.jsonl`]: big, + [`${BASE_DIR}/telemetry-2026-04-03-c.jsonl`]: big, + }); + setMtimeAgo(`${BASE_DIR}/telemetry-2026-04-01-a.jsonl`, 5 * dayMs); + setMtimeAgo(`${BASE_DIR}/telemetry-2026-04-02-b.jsonl`, 4 * dayMs); + setMtimeAgo(`${BASE_DIR}/telemetry-2026-04-03-c.jsonl`, 3 * dayMs); + + setup({ maxAgeDays: 365, maxTotalBytes: 2500 }); + + await vi.waitFor(() => + expect(vol.readdirSync(BASE_DIR)).toEqual([ + "telemetry-2026-04-03-c.jsonl", + ]), + ); + }); + it("emits valid snake_case JSONL with optional fields when set, omitting when not", async () => { const { sink, makeEvent } = setup(); @@ -333,7 +400,7 @@ describe("LocalJsonlSink", () => { sink.write(makeEvent()); expect(vol.existsSync(todaysFile())).toBe(false); - provider.set(SETTING_NAME, { + provider.set(LOCAL_JSONL_SETTING, { flushIntervalMs: 1_000_000, flushBatchSize: 3, }); diff --git a/test/unit/util/fileCleanup.test.ts b/test/unit/util/fileCleanup.test.ts index 8d53ef19..1dadde6b 100644 --- a/test/unit/util/fileCleanup.test.ts +++ b/test/unit/util/fileCleanup.test.ts @@ -1,4 +1,5 @@ import { vol } from "memfs"; +import * as fsPromises from "node:fs/promises"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { cleanupFiles } from "@/util/fileCleanup"; @@ -91,4 +92,30 @@ describe("cleanupFiles", () => { expect(vol.readdirSync("/d")).toEqual([]); }); + + it("does not throw when readdir fails for reasons other than ENOENT", async () => { + const { logger } = setup(); + const err = Object.assign(new Error("denied"), { code: "EACCES" }); + vi.spyOn(fsPromises, "readdir").mockRejectedValueOnce(err); + + const pick = vi.fn(() => []); + await expect( + cleanupFiles("/d", logger, { fileType: "thing", pick }), + ).resolves.toBeUndefined(); + expect(pick).not.toHaveBeenCalled(); + }); + + it("clamps pick names to their basename so unlinks cannot escape the directory", async () => { + const { logger } = setup({ + "/d/inside.txt": "x", + "/outside.txt": "y", + }); + + await cleanupFiles("/d", logger, { + fileType: "thing", + pick: () => [{ name: "../outside.txt" }], + }); + + expect(vol.existsSync("/outside.txt")).toBe(true); + }); }); From 85b3cbc8d6359d3261f2c615edb47a4d380867a3 Mon Sep 17 00:00:00 2001 From: Ehab Younes Date: Tue, 5 May 2026 12:40:37 +0300 Subject: [PATCH 4/5] fix(telemetry): second round of review feedback on local JSONL sink - Apply rotation check after seeding from disk so an EH restart that resumes a full segment 0 immediately rotates instead of overrunning. - Skip files this session is writing to in cleanup, so a low maxTotalBytes cannot delete events the sink just wrote. - Enforce per-field minimums in readLocalJsonlConfig matching the package.json schema, so direct settings.json edits below the floor fall back to defaults instead of producing pathological values. - Tighten minimums where the schema floor was meaninglessly low (bufferLimit 1 -> 10, maxFileBytes 1024 -> 4096, maxTotalBytes 1024 -> 4096). - Re-schedule the flush timer when flushIntervalMs changes so a config update takes effect immediately rather than waiting up to one full interval. - Drop the dead .catch in #scheduleNextFlush; flush() never rejects. - Distinguish ENOENT in statBytes; log other errors at debug. - Make LocalJsonlConfig fields readonly; un-export MINIMUMS. - Add tests for: oversized single payload (rotation guard), the bufferLimit-too-small warning, and the third-caller coalesce invariant (p2 === p3). --- package.json | 6 +- src/settings/telemetry.ts | 62 +++++++++--------- src/telemetry/sinks/localJsonlSink.ts | 65 ++++++++++++------- test/unit/settings/telemetry.test.ts | 4 +- .../telemetry/sinks/localJsonlSink.test.ts | 47 +++++++++++--- 5 files changed, 113 insertions(+), 71 deletions(-) diff --git a/package.json b/package.json index 098f36fd..e389916f 100644 --- a/package.json +++ b/package.json @@ -225,13 +225,13 @@ }, "bufferLimit": { "type": "number", - "minimum": 1, + "minimum": 10, "default": 500, "markdownDescription": "Maximum number of events held in memory. Oldest events are dropped on overflow. Should be at least `flushBatchSize`." }, "maxFileBytes": { "type": "number", - "minimum": 1024, + "minimum": 4096, "default": 5242880, "markdownDescription": "Maximum size in bytes of a single JSONL file before rotating to a new segment." }, @@ -243,7 +243,7 @@ }, "maxTotalBytes": { "type": "number", - "minimum": 1024, + "minimum": 4096, "default": 104857600, "markdownDescription": "Total bytes across all telemetry files. Oldest files are deleted at activation until under the cap." } diff --git a/src/settings/telemetry.ts b/src/settings/telemetry.ts index f35c1132..7b32dff7 100644 --- a/src/settings/telemetry.ts +++ b/src/settings/telemetry.ts @@ -14,12 +14,12 @@ export function readTelemetryLevel( } export interface LocalJsonlConfig { - flushIntervalMs: number; - flushBatchSize: number; - bufferLimit: number; - maxFileBytes: number; - maxAgeDays: number; - maxTotalBytes: number; + readonly flushIntervalMs: number; + readonly flushBatchSize: number; + readonly bufferLimit: number; + readonly maxFileBytes: number; + readonly maxAgeDays: number; + readonly maxTotalBytes: number; } export const LOCAL_JSONL_DEFAULTS: LocalJsonlConfig = { @@ -31,39 +31,39 @@ export const LOCAL_JSONL_DEFAULTS: LocalJsonlConfig = { maxTotalBytes: 100 * 1024 * 1024, }; -/** Reads the local JSONL sink config, defaulting any missing or invalid - * field. Each field must be a positive number to override the default. */ +// Mirrors the schema minimums in package.json. +const MINIMUMS: LocalJsonlConfig = { + flushIntervalMs: 1000, + flushBatchSize: 1, + bufferLimit: 10, + maxFileBytes: 4096, + maxAgeDays: 1, + maxTotalBytes: 4096, +}; + +/** Missing or below-minimum fields fall back to the default. */ export function readLocalJsonlConfig( cfg: Pick, ): LocalJsonlConfig { const raw = cfg.get(LOCAL_JSONL_SETTING); const obj = raw && typeof raw === "object" ? (raw as Record) : {}; + const read = (key: keyof LocalJsonlConfig): number => + numberAtLeast(obj[key], MINIMUMS[key], LOCAL_JSONL_DEFAULTS[key]); return { - flushIntervalMs: positiveNumber( - obj.flushIntervalMs, - LOCAL_JSONL_DEFAULTS.flushIntervalMs, - ), - flushBatchSize: positiveNumber( - obj.flushBatchSize, - LOCAL_JSONL_DEFAULTS.flushBatchSize, - ), - bufferLimit: positiveNumber( - obj.bufferLimit, - LOCAL_JSONL_DEFAULTS.bufferLimit, - ), - maxFileBytes: positiveNumber( - obj.maxFileBytes, - LOCAL_JSONL_DEFAULTS.maxFileBytes, - ), - maxAgeDays: positiveNumber(obj.maxAgeDays, LOCAL_JSONL_DEFAULTS.maxAgeDays), - maxTotalBytes: positiveNumber( - obj.maxTotalBytes, - LOCAL_JSONL_DEFAULTS.maxTotalBytes, - ), + flushIntervalMs: read("flushIntervalMs"), + flushBatchSize: read("flushBatchSize"), + bufferLimit: read("bufferLimit"), + maxFileBytes: read("maxFileBytes"), + maxAgeDays: read("maxAgeDays"), + maxTotalBytes: read("maxTotalBytes"), }; } -function positiveNumber(value: unknown, fallback: number): number { - return typeof value === "number" && value > 0 ? value : fallback; +function numberAtLeast( + value: unknown, + minimum: number, + fallback: number, +): number { + return typeof value === "number" && value >= minimum ? value : fallback; } diff --git a/src/telemetry/sinks/localJsonlSink.ts b/src/telemetry/sinks/localJsonlSink.ts index 06fc71f7..ff4a24a5 100644 --- a/src/telemetry/sinks/localJsonlSink.ts +++ b/src/telemetry/sinks/localJsonlSink.ts @@ -161,28 +161,34 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { return; } warnIfBufferTooSmall(next, this.#logger); + const intervalChanged = + next.flushIntervalMs !== this.#config.flushIntervalMs; this.#config = next; + if (intervalChanged) { + this.#rescheduleFlush(); + } }, ); } - // Chained on completion (not setInterval) so flushes never overlap or pile up. + // Self-rescheduling timer so flushes can never overlap or pile up. #scheduleNextFlush(): void { if (this.#disposed) { return; } this.#flushTimer = setTimeout(() => { - this.flush() - .catch((err) => { - this.#logger.warn( - `Telemetry sink '${this.name}' scheduled flush failed`, - err, - ); - }) - .finally(() => this.#scheduleNextFlush()); + void this.flush().finally(() => this.#scheduleNextFlush()); }, this.#config.flushIntervalMs); } + #rescheduleFlush(): void { + if (this.#flushTimer) { + clearTimeout(this.#flushTimer); + this.#flushTimer = null; + } + this.#scheduleNextFlush(); + } + async #doFlush(): Promise { if (this.#buffer.length === 0) { return; @@ -205,7 +211,6 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { } async #append(payload: string, payloadBytes: number): Promise { - // mkdir is idempotent with recursive:true; cheap enough to do per flush. await fs.mkdir(this.#baseDir, { recursive: true }); const next = await this.#nextFile(payloadBytes); await fs.appendFile(this.#segmentPath(next), payload, "utf8"); @@ -214,19 +219,22 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { async #nextFile(payloadSize: number): Promise { const today = todayUtc(); - if (this.#current.date !== today) { - // Seed from disk: an Extension Host restart may have left bytes in - // today's segment 0 from earlier in this session. - const target = this.#segmentPath({ date: today, segment: 0 }); - const existing = await statBytes(target); - return { date: today, segment: 0, size: existing }; - } - const wouldExceed = - this.#current.size + payloadSize > this.#config.maxFileBytes; - if (this.#current.size > 0 && wouldExceed) { - return { date: today, segment: this.#current.segment + 1, size: 0 }; + const seeded = + this.#current.date === today + ? this.#current + : await this.#seedFromDisk(today); + const wouldExceed = seeded.size + payloadSize > this.#config.maxFileBytes; + if (seeded.size > 0 && wouldExceed) { + return { date: today, segment: seeded.segment + 1, size: 0 }; } - return this.#current; + return seeded; + } + + /** Picks up bytes left in segment 0 by a prior Extension Host activation in the same VS Code session. */ + async #seedFromDisk(today: string): Promise { + const target = this.#segmentPath({ date: today, segment: 0 }); + const size = await statBytes(target, this.#logger); + return { date: today, segment: 0, size }; } #segmentPath(file: { date: string; segment: number }): string { @@ -238,10 +246,14 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { } async #cleanupOldFiles(): Promise { + // Skip files this session is writing to: cleanup must not race with our own appends. + const sessionMarker = `-${this.#sessionSlug}`; await cleanupFiles(this.#baseDir, this.#logger, { fileType: "telemetry file", match: (name) => - name.startsWith(FILE_PREFIX) && name.endsWith(FILE_SUFFIX), + name.startsWith(FILE_PREFIX) && + name.endsWith(FILE_SUFFIX) && + !name.includes(sessionMarker), pick: pickByAgeAndSize( this.#config.maxAgeDays * MS_PER_DAY, this.#config.maxTotalBytes, @@ -278,10 +290,13 @@ function pickByAgeAndSize(maxAgeMs: number, maxTotalBytes: number) { }; } -async function statBytes(target: string): Promise { +async function statBytes(target: string, logger: Logger): Promise { try { return (await fs.stat(target)).size; - } catch { + } catch (err) { + if ((err as NodeJS.ErrnoException).code !== "ENOENT") { + logger.debug(`stat ${target} failed; treating size as 0`, err); + } return 0; } } diff --git a/test/unit/settings/telemetry.test.ts b/test/unit/settings/telemetry.test.ts index 3a728eb5..7fb2d35b 100644 --- a/test/unit/settings/telemetry.test.ts +++ b/test/unit/settings/telemetry.test.ts @@ -53,9 +53,9 @@ describe("telemetry settings", () => { flushIntervalMs: 1_000, flushBatchSize: 10, bufferLimit: 50, - maxFileBytes: 1024, + maxFileBytes: 8192, maxAgeDays: 7, - maxTotalBytes: 4096, + maxTotalBytes: 8192, }; config.set(LOCAL_JSONL_SETTING, custom); expect(readLocalJsonlConfig(config)).toEqual(custom); diff --git a/test/unit/telemetry/sinks/localJsonlSink.test.ts b/test/unit/telemetry/sinks/localJsonlSink.test.ts index 30d96587..d96f0c9a 100644 --- a/test/unit/telemetry/sinks/localJsonlSink.test.ts +++ b/test/unit/telemetry/sinks/localJsonlSink.test.ts @@ -139,23 +139,34 @@ describe("LocalJsonlSink", () => { it("drops the oldest events when the buffer exceeds bufferLimit", async () => { const { sink, makeEvent } = setup({ - bufferLimit: 5, + bufferLimit: 10, flushBatchSize: 10_000, }); - for (let i = 0; i < 8; i++) { + for (let i = 0; i < 13; i++) { sink.write(makeEvent()); } await sink.flush(); expect(readJsonl(todaysFile()).map((l) => l.event_sequence)).toEqual([ - 3, 4, 5, 6, 7, + 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, ]); }); + it("warns when bufferLimit is below flushBatchSize", () => { + const { logger } = setup({ bufferLimit: 10, flushBatchSize: 100 }); + + const warned = vi + .mocked(logger.warn) + .mock.calls.some( + (c) => typeof c[0] === "string" && c[0].includes("is below"), + ); + expect(warned).toBe(true); + }); + it("emits one overflow warning per burst regardless of flush outcome", async () => { const { sink, logger, makeEvent } = setup({ - bufferLimit: 5, + bufferLimit: 10, flushBatchSize: 10_000, }); const overflowWarnings = (): number => @@ -164,7 +175,7 @@ describe("LocalJsonlSink", () => { return typeof arg === "string" && arg.includes("buffer overflow"); }).length; const overflowBuffer = (): void => { - for (let i = 0; i < 8; i++) { + for (let i = 0; i < 13; i++) { sink.write(makeEvent()); } }; @@ -205,11 +216,13 @@ describe("LocalJsonlSink", () => { }); it("rotates to a numbered segment once maxFileBytes is exceeded", async () => { - // A serialized event is around 400 bytes, so 900 holds 2 events but not 3. - const { sink, makeEvent } = setup({ maxFileBytes: 900 }); + // Padded events are ~2000 bytes; 4500 holds 2 events but not 3. + const { sink, makeEvent } = setup({ maxFileBytes: 4500 }); + const padded = (): TelemetryEvent => + makeEvent({ properties: { pad: "x".repeat(1500) } }); for (let i = 0; i < 3; i++) { - sink.write(makeEvent()); + sink.write(padded()); await sink.flush(); } @@ -217,6 +230,18 @@ describe("LocalJsonlSink", () => { expect(readJsonl(todaysFile(SESSION_SLUG, 1))).toHaveLength(1); }); + it("keeps a single oversized payload in segment 0 instead of rotating", async () => { + // Single event larger than maxFileBytes. Without the `size > 0` + // guard, every event would skip segment 0 and start at .1. + const { sink, makeEvent } = setup({ maxFileBytes: 4096 }); + + sink.write(makeEvent({ properties: { huge: "x".repeat(5000) } })); + await sink.flush(); + + expect(readJsonl(todaysFile(SESSION_SLUG, 0))).toHaveLength(1); + expect(vol.existsSync(todaysFile(SESSION_SLUG, 1))).toBe(false); + }); + it("starts a fresh file on UTC date rollover", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-05-04T23:59:00.000Z")); @@ -277,7 +302,7 @@ describe("LocalJsonlSink", () => { it("keeps deleting until total size is under maxTotalBytes", async () => { const dayMs = 24 * 60 * 60 * 1000; - const big = "x".repeat(2000); + const big = "x".repeat(3000); vol.fromJSON({ [`${BASE_DIR}/telemetry-2026-04-01-a.jsonl`]: big, [`${BASE_DIR}/telemetry-2026-04-02-b.jsonl`]: big, @@ -287,7 +312,7 @@ describe("LocalJsonlSink", () => { setMtimeAgo(`${BASE_DIR}/telemetry-2026-04-02-b.jsonl`, 4 * dayMs); setMtimeAgo(`${BASE_DIR}/telemetry-2026-04-03-c.jsonl`, 3 * dayMs); - setup({ maxAgeDays: 365, maxTotalBytes: 2500 }); + setup({ maxAgeDays: 365, maxTotalBytes: 5000 }); await vi.waitFor(() => expect(vol.readdirSync(BASE_DIR)).toEqual([ @@ -383,6 +408,8 @@ describe("LocalJsonlSink", () => { sink.write(makeEvent()); const p2 = sink.flush(); const p3 = sink.flush(); + // Third caller while one is queued must share the queued promise. + expect(p3).toBe(p2); resolveFirst(); await Promise.all([p1, p2, p3]); From 405c5d191cd370009b56e5e725bd756e888c0eaa Mon Sep 17 00:00:00 2001 From: Ehab Younes Date: Wed, 6 May 2026 23:54:42 +0300 Subject: [PATCH 5/5] refactor(telemetry): self-review polish on local sink MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename setting `coder.telemetry.localJsonl` → `coder.telemetry.local` (aligns with the level value; drops format from the user-facing key). - Rename internals: LocalSinkConfig / LOCAL_SINK_SETTING / readLocalSinkConfig. - fileCleanup: rename options for clarity — fileType → label, match → filter, pick → select. Update sshProcess.ts caller. - pathResolver#getTelemetryPath: trim oversized comment. - localJsonlSink: fold reschedule into scheduleNextFlush; use spread-style optional fields in serializeEvent for consistency with service.ts. - package.json: tighten setting descriptions, drop confusing "at least local" wording. - Tests: harden coalescing test with a deterministic appendFile barrier; switch warning matcher to stringContaining; rename symbols. --- package.json | 14 +-- src/core/pathResolver.ts | 5 +- src/remote/sshProcess.ts | 12 +-- src/settings/telemetry.ts | 41 ++++--- src/telemetry/sinks/localJsonlSink.ts | 101 ++++++++---------- src/util/fileCleanup.ts | 37 ++++--- test/unit/settings/telemetry.test.ts | 34 +++--- .../telemetry/sinks/localJsonlSink.test.ts | 85 +++++++-------- test/unit/util/fileCleanup.test.ts | 38 +++---- 9 files changed, 176 insertions(+), 191 deletions(-) diff --git a/package.json b/package.json index e389916f..f2b9bda1 100644 --- a/package.json +++ b/package.json @@ -206,8 +206,8 @@ "telemetry" ] }, - "coder.telemetry.localJsonl": { - "markdownDescription": "Tunables for the local JSONL telemetry sink. Active when `#coder.telemetry.level#` is at least `local`. Events are written to JSONL files under the extension's global storage directory. Missing or invalid fields fall back to defaults.", + "coder.telemetry.local": { + "markdownDescription": "Tunables for the local telemetry sink, which writes events as JSON Lines under the extension's global storage. Used when `#coder.telemetry.level#` is `local`. Missing or invalid fields fall back to defaults.", "type": "object", "additionalProperties": false, "properties": { @@ -215,13 +215,13 @@ "type": "number", "minimum": 1000, "default": 15000, - "markdownDescription": "Interval in milliseconds between scheduled flushes of the in-memory buffer to disk." + "markdownDescription": "How often, in milliseconds, buffered events are written to disk." }, "flushBatchSize": { "type": "number", "minimum": 1, "default": 100, - "markdownDescription": "Buffer size that triggers an early flush before the next scheduled interval." + "markdownDescription": "Number of buffered events that triggers an immediate write, ahead of the scheduled interval." }, "bufferLimit": { "type": "number", @@ -233,19 +233,19 @@ "type": "number", "minimum": 4096, "default": 5242880, - "markdownDescription": "Maximum size in bytes of a single JSONL file before rotating to a new segment." + "markdownDescription": "Maximum size, in bytes, of a single telemetry file before the sink rotates to a new one." }, "maxAgeDays": { "type": "number", "minimum": 1, "default": 30, - "markdownDescription": "Telemetry files older than this many days are deleted at activation." + "markdownDescription": "Telemetry files older than this many days are deleted when the extension activates." }, "maxTotalBytes": { "type": "number", "minimum": 4096, "default": 104857600, - "markdownDescription": "Total bytes across all telemetry files. Oldest files are deleted at activation until under the cap." + "markdownDescription": "Cap, in bytes, on the combined size of telemetry files. Oldest files are deleted on activation until the total is under the cap." } }, "default": { diff --git a/src/core/pathResolver.ts b/src/core/pathResolver.ts index 719e878e..ee94b268 100644 --- a/src/core/pathResolver.ts +++ b/src/core/pathResolver.ts @@ -50,10 +50,7 @@ export class PathResolver { } /** - * Return the directory where local telemetry JSONL files are written. - * - * Files within this directory are managed by `LocalJsonlSink`, which - * creates the directory on activation if it does not already exist. + * Return the directory where telemetry files are written. */ public getTelemetryPath(): string { return path.join(this.basePath, "telemetry"); diff --git a/src/remote/sshProcess.ts b/src/remote/sshProcess.ts index dceef3e3..a117fa71 100644 --- a/src/remote/sshProcess.ts +++ b/src/remote/sshProcess.ts @@ -90,9 +90,9 @@ export class SshProcessMonitor implements vscode.Disposable { logger: Logger, ): Promise { await cleanupFiles(networkInfoPath, logger, { - fileType: "network info file", - match: (name) => name.endsWith(".json"), - pick: (files, now) => files.filter((f) => now - f.mtime > maxAgeMs), + label: "network info file", + filter: (name) => name.endsWith(".json"), + select: (files, now) => files.filter((f) => now - f.mtime > maxAgeMs), }); } @@ -107,9 +107,9 @@ export class SshProcessMonitor implements vscode.Disposable { logger: Logger, ): Promise { await cleanupFiles(logDir, logger, { - fileType: "log file", - match: (name) => name.startsWith("coder-ssh") && name.endsWith(".log"), - pick: (files, now) => + label: "log file", + filter: (name) => name.startsWith("coder-ssh") && name.endsWith(".log"), + select: (files, now) => files .toSorted((a, b) => a.mtime - b.mtime) // oldest first .slice(0, -maxFilesToKeep) // keep the newest maxFilesToKeep diff --git a/src/settings/telemetry.ts b/src/settings/telemetry.ts index 7b32dff7..e082c615 100644 --- a/src/settings/telemetry.ts +++ b/src/settings/telemetry.ts @@ -3,9 +3,9 @@ import type { WorkspaceConfiguration } from "vscode"; import type { TelemetryLevel } from "../telemetry/event"; export const TELEMETRY_LEVEL_SETTING = "coder.telemetry.level"; -export const LOCAL_JSONL_SETTING = "coder.telemetry.localJsonl"; +export const LOCAL_SINK_SETTING = "coder.telemetry.local"; -/** Telemetry level. Falls back to `local` for any invalid value. */ +/** Telemetry level. Falls back to `local` for unknown or invalid values. */ export function readTelemetryLevel( cfg: Pick, ): TelemetryLevel { @@ -13,7 +13,7 @@ export function readTelemetryLevel( return value === "off" || value === "local" ? value : "local"; } -export interface LocalJsonlConfig { +export interface LocalSinkConfig { readonly flushIntervalMs: number; readonly flushBatchSize: number; readonly bufferLimit: number; @@ -22,7 +22,7 @@ export interface LocalJsonlConfig { readonly maxTotalBytes: number; } -export const LOCAL_JSONL_DEFAULTS: LocalJsonlConfig = { +export const LOCAL_SINK_DEFAULTS: LocalSinkConfig = { flushIntervalMs: 15_000, flushBatchSize: 100, bufferLimit: 500, @@ -31,8 +31,9 @@ export const LOCAL_JSONL_DEFAULTS: LocalJsonlConfig = { maxTotalBytes: 100 * 1024 * 1024, }; -// Mirrors the schema minimums in package.json. -const MINIMUMS: LocalJsonlConfig = { +// Defense in depth: VS Code does not enforce JSON schema at runtime, so users +// can drop in any value via settings.json. Mirrors the minimums in package.json. +const LOCAL_SINK_MINIMUMS: LocalSinkConfig = { flushIntervalMs: 1000, flushBatchSize: 1, bufferLimit: 10, @@ -41,15 +42,21 @@ const MINIMUMS: LocalJsonlConfig = { maxTotalBytes: 4096, }; -/** Missing or below-minimum fields fall back to the default. */ -export function readLocalJsonlConfig( +/** Per-field: missing, non-numeric, or below-minimum values fall back to defaults. */ +export function readLocalSinkConfig( cfg: Pick, -): LocalJsonlConfig { - const raw = cfg.get(LOCAL_JSONL_SETTING); +): LocalSinkConfig { + const raw = cfg.get(LOCAL_SINK_SETTING); const obj = - raw && typeof raw === "object" ? (raw as Record) : {}; - const read = (key: keyof LocalJsonlConfig): number => - numberAtLeast(obj[key], MINIMUMS[key], LOCAL_JSONL_DEFAULTS[key]); + raw && typeof raw === "object" && !Array.isArray(raw) + ? (raw as Record) + : {}; + const read = (key: keyof LocalSinkConfig): number => { + const value = obj[key]; + return typeof value === "number" && value >= LOCAL_SINK_MINIMUMS[key] + ? value + : LOCAL_SINK_DEFAULTS[key]; + }; return { flushIntervalMs: read("flushIntervalMs"), flushBatchSize: read("flushBatchSize"), @@ -59,11 +66,3 @@ export function readLocalJsonlConfig( maxTotalBytes: read("maxTotalBytes"), }; } - -function numberAtLeast( - value: unknown, - minimum: number, - fallback: number, -): number { - return typeof value === "number" && value >= minimum ? value : fallback; -} diff --git a/src/telemetry/sinks/localJsonlSink.ts b/src/telemetry/sinks/localJsonlSink.ts index ff4a24a5..3cfe94ad 100644 --- a/src/telemetry/sinks/localJsonlSink.ts +++ b/src/telemetry/sinks/localJsonlSink.ts @@ -4,9 +4,9 @@ import * as vscode from "vscode"; import { watchConfigurationChanges } from "../../configWatcher"; import { - LOCAL_JSONL_SETTING, - readLocalJsonlConfig, - type LocalJsonlConfig, + LOCAL_SINK_SETTING, + readLocalSinkConfig, + type LocalSinkConfig, } from "../../settings/telemetry"; import { cleanupFiles, @@ -37,7 +37,7 @@ interface CurrentFile { * own files (`telemetry-YYYY-MM-DD-{sessionId8}.jsonl` plus `.N.jsonl` size * segments), so concurrent windows cannot race on appends or rotation. * `write` is sync and never throws; disk I/O happens in `flush` and - * `dispose` and catches errors. Tunables come from `coder.telemetry.localJsonl` + * `dispose` and catches errors. Tunables come from `coder.telemetry.local` * and update reactively. */ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { @@ -48,7 +48,7 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { readonly #sessionSlug: string; readonly #logger: Logger; readonly #buffer: string[] = []; - #config: LocalJsonlConfig; + #config: LocalSinkConfig; #configWatcher: vscode.Disposable | null = null; #flushTimer: NodeJS.Timeout | null = null; #flushChain: Promise = Promise.resolve(); @@ -60,7 +60,7 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { private constructor( opts: LocalJsonlSinkOptions, logger: Logger, - config: LocalJsonlConfig, + config: LocalSinkConfig, ) { this.#baseDir = opts.baseDir; this.#sessionSlug = toSessionSlug(opts.sessionId); @@ -73,7 +73,7 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { opts: LocalJsonlSinkOptions, logger: Logger, ): LocalJsonlSink { - const config = readLocalJsonlConfig(vscode.workspace.getConfiguration()); + const config = readLocalSinkConfig(vscode.workspace.getConfiguration()); warnIfBufferTooSmall(config, logger); const sink = new LocalJsonlSink(opts, logger, config); sink.#configWatcher = sink.#watchConfig(); @@ -148,14 +148,14 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { return watchConfigurationChanges( [ { - setting: LOCAL_JSONL_SETTING, + setting: LOCAL_SINK_SETTING, getValue: () => - readLocalJsonlConfig(vscode.workspace.getConfiguration()), + readLocalSinkConfig(vscode.workspace.getConfiguration()), }, ], (changes) => { - const next = changes.get(LOCAL_JSONL_SETTING) as - | LocalJsonlConfig + const next = changes.get(LOCAL_SINK_SETTING) as + | LocalSinkConfig | undefined; if (!next) { return; @@ -165,7 +165,7 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { next.flushIntervalMs !== this.#config.flushIntervalMs; this.#config = next; if (intervalChanged) { - this.#rescheduleFlush(); + this.#scheduleNextFlush(); } }, ); @@ -173,6 +173,10 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { // Self-rescheduling timer so flushes can never overlap or pile up. #scheduleNextFlush(): void { + if (this.#flushTimer) { + clearTimeout(this.#flushTimer); + this.#flushTimer = null; + } if (this.#disposed) { return; } @@ -181,14 +185,6 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { }, this.#config.flushIntervalMs); } - #rescheduleFlush(): void { - if (this.#flushTimer) { - clearTimeout(this.#flushTimer); - this.#flushTimer = null; - } - this.#scheduleNextFlush(); - } - async #doFlush(): Promise { if (this.#buffer.length === 0) { return; @@ -249,12 +245,12 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { // Skip files this session is writing to: cleanup must not race with our own appends. const sessionMarker = `-${this.#sessionSlug}`; await cleanupFiles(this.#baseDir, this.#logger, { - fileType: "telemetry file", - match: (name) => + label: "telemetry file", + filter: (name) => name.startsWith(FILE_PREFIX) && name.endsWith(FILE_SUFFIX) && !name.includes(sessionMarker), - pick: pickByAgeAndSize( + select: selectByAgeAndSize( this.#config.maxAgeDays * MS_PER_DAY, this.#config.maxTotalBytes, ), @@ -262,7 +258,7 @@ export class LocalJsonlSink implements TelemetrySink, vscode.Disposable { } } -function pickByAgeAndSize(maxAgeMs: number, maxTotalBytes: number) { +function selectByAgeAndSize(maxAgeMs: number, maxTotalBytes: number) { return ( files: FileCleanupCandidate[], now: number, @@ -305,7 +301,7 @@ function todayUtc(): string { return new Date().toISOString().slice(0, 10); } -function warnIfBufferTooSmall(config: LocalJsonlConfig, logger: Logger): void { +function warnIfBufferTooSmall(config: LocalSinkConfig, logger: Logger): void { if (config.bufferLimit < config.flushBatchSize) { logger.warn( `Telemetry sink '${SINK_NAME}' bufferLimit (${config.bufferLimit}) is below flushBatchSize (${config.flushBatchSize}); the batch-size flush trigger is unreachable and overflow will drop events instead. Raise bufferLimit or lower flushBatchSize.`, @@ -319,33 +315,30 @@ function toSessionSlug(sessionId: string): string { } function serializeEvent(event: TelemetryEvent): string { - const out: Record = { - event_id: event.eventId, - event_name: event.eventName, - timestamp: event.timestamp, - event_sequence: event.eventSequence, - context: { - extension_version: event.context.extensionVersion, - machine_id: event.context.machineId, - session_id: event.context.sessionId, - os_type: event.context.osType, - os_version: event.context.osVersion, - host_arch: event.context.hostArch, - platform_name: event.context.platformName, - platform_version: event.context.platformVersion, - deployment_url: event.context.deploymentUrl, - }, - properties: event.properties, - measurements: event.measurements, - }; - if (event.traceId !== undefined) { - out.trace_id = event.traceId; - } - if (event.parentEventId !== undefined) { - out.parent_event_id = event.parentEventId; - } - if (event.error !== undefined) { - out.error = event.error; - } - return JSON.stringify(out) + "\n"; + return ( + JSON.stringify({ + event_id: event.eventId, + event_name: event.eventName, + timestamp: event.timestamp, + event_sequence: event.eventSequence, + context: { + extension_version: event.context.extensionVersion, + machine_id: event.context.machineId, + session_id: event.context.sessionId, + os_type: event.context.osType, + os_version: event.context.osVersion, + host_arch: event.context.hostArch, + platform_name: event.context.platformName, + platform_version: event.context.platformVersion, + deployment_url: event.context.deploymentUrl, + }, + properties: event.properties, + measurements: event.measurements, + ...(event.traceId !== undefined && { trace_id: event.traceId }), + ...(event.parentEventId !== undefined && { + parent_event_id: event.parentEventId, + }), + ...(event.error !== undefined && { error: event.error }), + }) + "\n" + ); } diff --git a/src/util/fileCleanup.ts b/src/util/fileCleanup.ts index 263adbbe..c8804286 100644 --- a/src/util/fileCleanup.ts +++ b/src/util/fileCleanup.ts @@ -10,25 +10,28 @@ export interface FileCleanupCandidate { } export interface FileCleanupOptions { - /** Noun used in log messages, e.g. "telemetry file". */ - fileType: string; - /** Name-based filter applied before stat to skip unrelated entries. */ - match?: (name: string) => boolean; - /** Picks files to delete from the stat'd survivors of `match`. */ - pick: (files: FileCleanupCandidate[], now: number) => Array<{ name: string }>; + /** Label for log messages, e.g. "telemetry file". */ + label: string; + /** Cheap name-based predicate; non-matching entries are skipped before stat. */ + filter?: (name: string) => boolean; + /** From the stat'd survivors of `filter`, returns the files to delete. */ + select: ( + files: FileCleanupCandidate[], + now: number, + ) => Array<{ name: string }>; } /** - * Lists files in `dir`, filters by name, stats and unlinks the picks in - * parallel. ENOENT is swallowed so concurrent deletes are safe. Never - * throws; failures go to `logger.debug`. + * Lists files in `dir`, applies `filter` to names, stats the survivors, and + * unlinks whatever `select` returns. ENOENT is swallowed so concurrent + * deletes are safe. Never throws; failures go to `logger.debug`. */ export async function cleanupFiles( dir: string, logger: Logger, options: FileCleanupOptions, ): Promise { - const { fileType, match, pick } = options; + const { label, filter, select } = options; const now = Date.now(); let names: string[]; try { @@ -37,11 +40,11 @@ export async function cleanupFiles( // ENOENT just means the dir hasn't been created yet; anything else // (EACCES, EMFILE, ...) is a real failure worth surfacing. if ((error as NodeJS.ErrnoException).code !== "ENOENT") { - logger.debug(`Failed to read ${fileType} directory ${dir}`, error); + logger.debug(`Failed to read ${label} directory ${dir}`, error); } return; } - const candidates = match ? names.filter(match) : names; + const candidates = filter ? names.filter(filter) : names; const withStats = await Promise.all( candidates.map(async (name) => { @@ -54,28 +57,28 @@ export async function cleanupFiles( }; } catch (error) { if ((error as NodeJS.ErrnoException).code !== "ENOENT") { - logger.debug(`Failed to stat ${fileType} ${name}`, error); + logger.debug(`Failed to stat ${label} ${name}`, error); } return null; } }), ); - const toDelete = pick( + const toDelete = select( withStats.filter((f) => f !== null), now, ); const deleted = await Promise.all( toDelete.map(async (file) => { - // Basename only; never let `pick` escape `dir`. + // Basename only; never let `select` escape `dir`. const safeName = path.basename(file.name); try { await fs.unlink(path.join(dir, safeName)); return safeName; } catch (error) { if ((error as NodeJS.ErrnoException).code !== "ENOENT") { - logger.debug(`Failed to delete ${fileType} ${safeName}`, error); + logger.debug(`Failed to delete ${label} ${safeName}`, error); } return null; } @@ -85,7 +88,7 @@ export async function cleanupFiles( const successful = deleted.filter((name) => name !== null); if (successful.length > 0) { logger.debug( - `Cleaned up ${successful.length} ${fileType}(s): ${successful.join(", ")}`, + `Cleaned up ${successful.length} ${label}(s): ${successful.join(", ")}`, ); } } diff --git a/test/unit/settings/telemetry.test.ts b/test/unit/settings/telemetry.test.ts index 7fb2d35b..cfc7f2c6 100644 --- a/test/unit/settings/telemetry.test.ts +++ b/test/unit/settings/telemetry.test.ts @@ -1,10 +1,10 @@ import { beforeEach, describe, expect, it } from "vitest"; import { - LOCAL_JSONL_DEFAULTS, - LOCAL_JSONL_SETTING, + LOCAL_SINK_DEFAULTS, + LOCAL_SINK_SETTING, TELEMETRY_LEVEL_SETTING, - readLocalJsonlConfig, + readLocalSinkConfig, readTelemetryLevel, } from "@/settings/telemetry"; @@ -33,9 +33,9 @@ describe("telemetry settings", () => { }); }); - describe("readLocalJsonlConfig", () => { + describe("readLocalSinkConfig", () => { it("returns defaults when unset", () => { - expect(readLocalJsonlConfig(config)).toEqual(LOCAL_JSONL_DEFAULTS); + expect(readLocalSinkConfig(config)).toEqual(LOCAL_SINK_DEFAULTS); }); it.each([ @@ -44,8 +44,8 @@ describe("telemetry settings", () => { ["null", null], ["an array", [1, 2]], ])("returns defaults when the raw value is %s", (_, raw) => { - config.set(LOCAL_JSONL_SETTING, raw); - expect(readLocalJsonlConfig(config)).toEqual(LOCAL_JSONL_DEFAULTS); + config.set(LOCAL_SINK_SETTING, raw); + expect(readLocalSinkConfig(config)).toEqual(LOCAL_SINK_DEFAULTS); }); it("accepts a fully-specified object", () => { @@ -57,8 +57,8 @@ describe("telemetry settings", () => { maxAgeDays: 7, maxTotalBytes: 8192, }; - config.set(LOCAL_JSONL_SETTING, custom); - expect(readLocalJsonlConfig(config)).toEqual(custom); + config.set(LOCAL_SINK_SETTING, custom); + expect(readLocalSinkConfig(config)).toEqual(custom); }); it.each([ @@ -68,29 +68,29 @@ describe("telemetry settings", () => { ["a numeric string", "100"], ["a boolean", true], ])("falls back per-field when a value is %s", (_, bad) => { - config.set(LOCAL_JSONL_SETTING, { flushIntervalMs: bad }); - expect(readLocalJsonlConfig(config).flushIntervalMs).toBe( - LOCAL_JSONL_DEFAULTS.flushIntervalMs, + config.set(LOCAL_SINK_SETTING, { flushIntervalMs: bad }); + expect(readLocalSinkConfig(config).flushIntervalMs).toBe( + LOCAL_SINK_DEFAULTS.flushIntervalMs, ); }); it("merges valid fields with defaults for invalid ones", () => { - config.set(LOCAL_JSONL_SETTING, { + config.set(LOCAL_SINK_SETTING, { flushIntervalMs: 5_000, flushBatchSize: -1, }); - expect(readLocalJsonlConfig(config)).toEqual({ - ...LOCAL_JSONL_DEFAULTS, + expect(readLocalSinkConfig(config)).toEqual({ + ...LOCAL_SINK_DEFAULTS, flushIntervalMs: 5_000, }); }); it("returns bufferLimit and flushBatchSize as written, without clamping", () => { - config.set(LOCAL_JSONL_SETTING, { + config.set(LOCAL_SINK_SETTING, { flushBatchSize: 200, bufferLimit: 50, }); - expect(readLocalJsonlConfig(config)).toMatchObject({ + expect(readLocalSinkConfig(config)).toMatchObject({ flushBatchSize: 200, bufferLimit: 50, }); diff --git a/test/unit/telemetry/sinks/localJsonlSink.test.ts b/test/unit/telemetry/sinks/localJsonlSink.test.ts index d96f0c9a..55da1ddf 100644 --- a/test/unit/telemetry/sinks/localJsonlSink.test.ts +++ b/test/unit/telemetry/sinks/localJsonlSink.test.ts @@ -1,19 +1,8 @@ import { vol } from "memfs"; import * as fsPromises from "node:fs/promises"; -import { - afterEach, - beforeEach, - describe, - expect, - it, - vi, - type MockInstance, -} from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { - LOCAL_JSONL_SETTING, - type LocalJsonlConfig, -} from "@/settings/telemetry"; +import { LOCAL_SINK_SETTING, type LocalSinkConfig } from "@/settings/telemetry"; import { LocalJsonlSink } from "@/telemetry/sinks/localJsonlSink"; import { @@ -75,10 +64,10 @@ describe("LocalJsonlSink", () => { }); function setup( - config: Partial = {}, + config: Partial = {}, sessionId = SESSION_ID, ) { - provider.set(LOCAL_JSONL_SETTING, { + provider.set(LOCAL_SINK_SETTING, { flushIntervalMs: 1_000_000, ...config, }); @@ -156,12 +145,9 @@ describe("LocalJsonlSink", () => { it("warns when bufferLimit is below flushBatchSize", () => { const { logger } = setup({ bufferLimit: 10, flushBatchSize: 100 }); - const warned = vi - .mocked(logger.warn) - .mock.calls.some( - (c) => typeof c[0] === "string" && c[0].includes("is below"), - ); - expect(warned).toBe(true); + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining("bufferLimit"), + ); }); it("emits one overflow warning per burst regardless of flush outcome", async () => { @@ -169,11 +155,6 @@ describe("LocalJsonlSink", () => { bufferLimit: 10, flushBatchSize: 10_000, }); - const overflowWarnings = (): number => - vi.mocked(logger.warn).mock.calls.filter((c) => { - const arg = c[0]; - return typeof arg === "string" && arg.includes("buffer overflow"); - }).length; const overflowBuffer = (): void => { for (let i = 0; i < 13; i++) { sink.write(makeEvent()); @@ -190,7 +171,12 @@ describe("LocalJsonlSink", () => { overflowBuffer(); await sink.flush(); - expect(overflowWarnings()).toBe(3); + const overflowWarnings = vi + .mocked(logger.warn) + .mock.calls.filter( + (c) => typeof c[0] === "string" && c[0].includes("buffer overflow"), + ); + expect(overflowWarnings).toHaveLength(3); }); it("flushes pending events on dispose", async () => { @@ -384,40 +370,47 @@ describe("LocalJsonlSink", () => { expect(readJsonl(todaysFile("ffeeddcc"))).toHaveLength(5); }); - it("coalesces concurrent flush requests into at most two appendFile calls", async () => { + it("coalesces concurrent flushes so events write exactly once", async () => { const { sink, makeEvent } = setup(); - let resolveFirst!: () => void; - const firstAppendDone = new Promise((r) => { - resolveFirst = r; + // Block the first append until we've enqueued more flushes; signal back + // when the in-flight write reaches `await appendFile` so the test can + // proceed deterministically without juggling microtasks. + let signalFirstStarted!: () => void; + const firstAppendStarted = new Promise((r) => { + signalFirstStarted = r; + }); + let releaseFirst!: () => void; + const firstAppendBlocked = new Promise((r) => { + releaseFirst = r; }); const realAppend = fsPromises.appendFile.bind(fsPromises); - const spy: MockInstance = vi + const spy = vi .spyOn(fsPromises, "appendFile") .mockImplementationOnce(async (target, data, opts) => { - await firstAppendDone; + signalFirstStarted(); + await firstAppendBlocked; return realAppend(target, data, opts); }); sink.write(makeEvent()); - const p1 = sink.flush(); - // Yield so doFlush #1 captures the buffer and reaches `await appendFile`. - await Promise.resolve(); - await Promise.resolve(); + const inFlight = sink.flush(); + await firstAppendStarted; sink.write(makeEvent()); - const p2 = sink.flush(); - const p3 = sink.flush(); - // Third caller while one is queued must share the queued promise. - expect(p3).toBe(p2); + sink.write(makeEvent()); + const queuedA = sink.flush(); + const queuedB = sink.flush(); - resolveFirst(); - await Promise.all([p1, p2, p3]); + releaseFirst(); + await Promise.all([inFlight, queuedA, queuedB]); - expect(spy).toHaveBeenCalledTimes(2); expect(readJsonl(todaysFile()).map((l) => l.event_sequence)).toEqual([ - 0, 1, + 0, 1, 2, ]); + // One in-flight + at most one queued; multiple flush() calls do not + // pile up into separate writes. + expect(spy.mock.calls.length).toBeLessThanOrEqual(2); }); it("picks up config changes reactively", async () => { @@ -427,7 +420,7 @@ describe("LocalJsonlSink", () => { sink.write(makeEvent()); expect(vol.existsSync(todaysFile())).toBe(false); - provider.set(LOCAL_JSONL_SETTING, { + provider.set(LOCAL_SINK_SETTING, { flushIntervalMs: 1_000_000, flushBatchSize: 3, }); diff --git a/test/unit/util/fileCleanup.test.ts b/test/unit/util/fileCleanup.test.ts index 1dadde6b..a765669b 100644 --- a/test/unit/util/fileCleanup.test.ts +++ b/test/unit/util/fileCleanup.test.ts @@ -31,22 +31,22 @@ describe("cleanupFiles", () => { it("does not throw when the directory is missing", async () => { const { logger } = setup(); await expect( - cleanupFiles("/nope", logger, { fileType: "thing", pick: () => [] }), + cleanupFiles("/nope", logger, { label: "thing", select: () => [] }), ).resolves.toBeUndefined(); }); - it("unlinks the files chosen by pick and leaves the rest", async () => { + it("unlinks the files chosen by select and leaves the rest", async () => { const { logger } = setup({ "/d/a": "1", "/d/b": "2", "/d/c": "3" }); await cleanupFiles("/d", logger, { - fileType: "thing", - pick: (files) => files.filter((f) => f.name !== "b"), + label: "thing", + select: (files) => files.filter((f) => f.name !== "b"), }); expect(vol.readdirSync("/d")).toEqual(["b"]); }); - it("exposes mtime, size, and the current time so pick can filter on them", async () => { + it("exposes mtime, size, and the current time so select can filter on them", async () => { const { logger } = setup({ "/d/old-big": "x".repeat(100), "/d/new-small": "x", @@ -55,15 +55,15 @@ describe("cleanupFiles", () => { vol.utimesSync("/d/old-big", 1, 1); await cleanupFiles("/d", logger, { - fileType: "thing", - pick: (files, now) => + label: "thing", + select: (files, now) => files.filter((f) => now - f.mtime > 1000 && f.size > 50), }); expect(vol.readdirSync("/d")).toEqual(["new-small"]); }); - it("only feeds pick the files matched by `match`", async () => { + it("only feeds select the files matched by `filter`", async () => { const { logger } = setup({ "/d/keep.json": "{}", "/d/skip.txt": "no", @@ -71,9 +71,9 @@ describe("cleanupFiles", () => { }); await cleanupFiles("/d", logger, { - fileType: "thing", - match: (n) => n.endsWith(".json"), - pick: (files) => files, + label: "thing", + filter: (n) => n.endsWith(".json"), + select: (files) => files, }); expect(vol.readdirSync("/d")).toEqual(["skip.txt"]); @@ -83,8 +83,8 @@ describe("cleanupFiles", () => { const { logger } = setup({ "/d/a": "1", "/d/b": "2" }); await cleanupFiles("/d", logger, { - fileType: "thing", - pick: (files) => { + label: "thing", + select: (files) => { vol.unlinkSync("/d/a"); return files; }, @@ -98,22 +98,22 @@ describe("cleanupFiles", () => { const err = Object.assign(new Error("denied"), { code: "EACCES" }); vi.spyOn(fsPromises, "readdir").mockRejectedValueOnce(err); - const pick = vi.fn(() => []); + const select = vi.fn(() => []); await expect( - cleanupFiles("/d", logger, { fileType: "thing", pick }), + cleanupFiles("/d", logger, { label: "thing", select }), ).resolves.toBeUndefined(); - expect(pick).not.toHaveBeenCalled(); + expect(select).not.toHaveBeenCalled(); }); - it("clamps pick names to their basename so unlinks cannot escape the directory", async () => { + it("clamps select names to their basename so unlinks cannot escape the directory", async () => { const { logger } = setup({ "/d/inside.txt": "x", "/outside.txt": "y", }); await cleanupFiles("/d", logger, { - fileType: "thing", - pick: () => [{ name: "../outside.txt" }], + label: "thing", + select: () => [{ name: "../outside.txt" }], }); expect(vol.existsSync("/outside.txt")).toBe(true);