From 1c4cef0453220e53b15584e7b9bff232381d3480 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Pierzcha=C5=82a?= Date: Wed, 10 Jun 2026 20:04:34 +0200 Subject: [PATCH] refactor: extract daemon progress parsing --- src/__tests__/daemon-client-progress.test.ts | 132 +++++++++++++++++++ src/daemon-client-progress.ts | 58 +++++++- src/daemon-client.ts | 57 +++----- 3 files changed, 203 insertions(+), 44 deletions(-) create mode 100644 src/__tests__/daemon-client-progress.test.ts diff --git a/src/__tests__/daemon-client-progress.test.ts b/src/__tests__/daemon-client-progress.test.ts new file mode 100644 index 000000000..19db7b3ab --- /dev/null +++ b/src/__tests__/daemon-client-progress.test.ts @@ -0,0 +1,132 @@ +import assert from 'node:assert/strict'; +import { EventEmitter } from 'node:events'; +import type { Socket } from 'node:net'; +import { test } from 'vitest'; +import type { DaemonRequest, DaemonResponse } from '../daemon/types.ts'; +import { readDaemonSocketProgressResponse } from '../daemon-client-progress.ts'; +import { AppError } from '../utils/errors.ts'; + +type MockSocket = EventEmitter & { + ended: boolean; + encoding?: string; + end: () => MockSocket; + setEncoding: (encoding: BufferEncoding) => MockSocket; +}; + +function createMockSocket(): MockSocket { + const socket = new EventEmitter() as MockSocket; + socket.ended = false; + socket.end = () => { + socket.ended = true; + socket.emit('close'); + return socket; + }; + socket.setEncoding = (encoding) => { + socket.encoding = encoding; + return socket; + }; + return socket; +} + +function readSocketProgressResponse( + socket: MockSocket, + req: DaemonRequest, +): Promise { + let settled = false; + return new Promise((resolve, reject) => { + readDaemonSocketProgressResponse(socket as unknown as Socket, { + req, + isSettled: () => settled, + clearTimeout: () => {}, + resolve: (response) => { + settled = true; + resolve(response); + }, + reject: (error) => { + settled = true; + reject(error); + }, + }); + }); +} + +test('readDaemonSocketProgressResponse parses split progress lines before response envelopes', async () => { + const socket = createMockSocket(); + const req: DaemonRequest = { + session: 'default', + command: 'test', + positionals: ['/tmp/replays'], + flags: {}, + token: 'secret', + meta: { requestId: 'req-socket-progress', requestProgress: 'replay-test' }, + }; + let stderr = ''; + const originalStderrWrite = process.stderr.write.bind(process.stderr); + + try { + (process.stderr as any).write = ((chunk: unknown) => { + stderr += String(chunk); + return true; + }) as typeof process.stderr.write; + + const responsePromise = readSocketProgressResponse(socket, req); + const progressLine = JSON.stringify({ + type: 'progress', + event: { + type: 'replay-test', + file: '/tmp/01-login.ad', + title: 'Login flow', + status: 'fail', + index: 1, + total: 2, + attempt: 1, + maxAttempts: 2, + durationMs: 1234, + retrying: true, + message: 'first attempt failed', + }, + }); + const responseLine = JSON.stringify({ + type: 'response', + response: { ok: true, data: { via: 'socket-progress' } }, + }); + + socket.emit('data', progressLine.slice(0, 24)); + socket.emit('data', `${progressLine.slice(24)}\n${responseLine}\n`); + socket.emit('data', '{not-json-after-settle}\n'); + + await assert.doesNotReject(responsePromise); + assert.deepEqual(await responsePromise, { ok: true, data: { via: 'socket-progress' } }); + assert.equal(socket.encoding, 'utf8'); + assert.equal(socket.ended, true); + assert.match(stderr, /FAIL "Login flow" attempt 1\/2 retrying \(1\.23s\)/); + assert.match(stderr, / first attempt failed/); + } finally { + process.stderr.write = originalStderrWrite; + } +}); + +test('readDaemonSocketProgressResponse rejects invalid response lines with request context', async () => { + const socket = createMockSocket(); + const req: DaemonRequest = { + session: 'default', + command: 'snapshot', + positionals: [], + flags: {}, + token: 'secret', + meta: { requestId: 'req-invalid-socket-progress' }, + }; + + const responsePromise = readSocketProgressResponse(socket, req); + socket.emit('data', '{not-json}\n'); + + await assert.rejects( + responsePromise, + (error) => + error instanceof AppError && + error.code === 'COMMAND_FAILED' && + error.message === 'Invalid daemon response' && + error.details?.requestId === 'req-invalid-socket-progress' && + error.details?.line === '{not-json}', + ); +}); diff --git a/src/daemon-client-progress.ts b/src/daemon-client-progress.ts index 40949129a..f2c6b4abf 100644 --- a/src/daemon-client-progress.ts +++ b/src/daemon-client-progress.ts @@ -1,6 +1,7 @@ import http from 'node:http'; +import type { Socket } from 'node:net'; import { AppError } from './utils/errors.ts'; -import type { DaemonRequest } from './daemon/types.ts'; +import type { DaemonRequest, DaemonResponse } from './daemon/types.ts'; import type { RequestProgressEvent } from './daemon/request-progress.ts'; import { consumeTextLines } from './utils/line-stream.ts'; import { formatReplayTestProgressEvent } from './cli-test-progress.ts'; @@ -10,7 +11,7 @@ import { shouldStreamRequestProgress, } from './daemon/request-progress-protocol.ts'; -export function writeRequestProgressEvent(event: RequestProgressEvent): void { +function writeRequestProgressEvent(event: RequestProgressEvent): void { const line = formatReplayTestProgressEvent(event); if (line) process.stderr.write(`${line}\n`); } @@ -27,6 +28,59 @@ export function shouldReadDaemonProgressStream( ); } +export function readDaemonSocketProgressResponse( + socket: Socket, + options: { + req: DaemonRequest; + isSettled: () => boolean; + resolve: (response: DaemonResponse) => void; + reject: (error: unknown) => void; + clearTimeout: () => void; + }, +): void { + const { req, isSettled, resolve, reject, clearTimeout } = options; + let buffer = ''; + + const rejectInvalidLine = (line: string, error: unknown) => { + clearTimeout(); + reject( + new AppError( + 'COMMAND_FAILED', + 'Invalid daemon response', + { + requestId: req.meta?.requestId, + line, + }, + error instanceof Error ? error : undefined, + ), + ); + }; + + socket.setEncoding('utf8'); + socket.on('data', (chunk) => { + if (isSettled()) return; + const parsed = consumeTextLines(buffer, chunk); + buffer = parsed.buffer; + for (const line of parsed.lines) { + try { + const message = JSON.parse(line) as unknown; + if (isDaemonProgressEnvelope(message)) { + writeRequestProgressEvent(message.event); + continue; + } + const response = isDaemonResponseEnvelope(message) ? message.response : message; + clearTimeout(); + resolve(response as DaemonResponse); + socket.end(); + return; + } catch (error) { + rejectInvalidLine(line, error); + return; + } + } + }); +} + export function readDaemonHttpProgressResponse( res: http.IncomingMessage, options: { diff --git a/src/daemon-client.ts b/src/daemon-client.ts index 981b8b815..1d3b8d09d 100644 --- a/src/daemon-client.ts +++ b/src/daemon-client.ts @@ -6,7 +6,6 @@ import os from 'node:os'; import path from 'node:path'; import { sleep } from './utils/timeouts.ts'; import { AppError, toAppErrorCode } from './utils/errors.ts'; -import { consumeTextLines } from './utils/line-stream.ts'; import { readNodeHttpResponseBody } from './utils/node-http.ts'; import type { DaemonRequest as SharedDaemonRequest, @@ -29,13 +28,9 @@ import { PUBLIC_COMMANDS } from './command-catalog.ts'; import { shellQuote } from './utils/shell-quote.ts'; import { readDaemonHttpProgressResponse, + readDaemonSocketProgressResponse, shouldReadDaemonProgressStream, - writeRequestProgressEvent, } from './daemon-client-progress.ts'; -import { - isDaemonProgressEnvelope, - isDaemonResponseEnvelope, -} from './daemon/request-progress-protocol.ts'; import { materializeRemoteArtifacts, prepareRemoteRequestArtifacts } from './daemon-artifacts.ts'; export { computeDaemonCodeSignature } from './daemon/code-signature.ts'; export { downloadRemoteArtifact } from './daemon-artifacts.ts'; @@ -1043,42 +1038,20 @@ async function sendSocketRequest( }, timeoutMs) : undefined; - let buffer = ''; - socket.setEncoding('utf8'); - socket.on('data', (chunk) => { - if (settled) return; - const parsed = consumeTextLines(buffer, chunk); - buffer = parsed.buffer; - for (const line of parsed.lines) { - try { - const message = JSON.parse(line) as unknown; - if (isDaemonProgressEnvelope(message)) { - writeRequestProgressEvent(message.event); - continue; - } - const response = isDaemonResponseEnvelope(message) ? message.response : message; - settled = true; - socket.end(); - if (timeoutHandle) clearTimeout(timeoutHandle); - resolve(response as DaemonResponse); - return; - } catch (err) { - settled = true; - if (timeoutHandle) clearTimeout(timeoutHandle); - reject( - new AppError( - 'COMMAND_FAILED', - 'Invalid daemon response', - { - requestId: req.meta?.requestId, - line, - }, - err instanceof Error ? err : undefined, - ), - ); - return; - } - } + readDaemonSocketProgressResponse(socket, { + req, + isSettled: () => settled, + clearTimeout: () => { + if (timeoutHandle) clearTimeout(timeoutHandle); + }, + resolve: (response) => { + settled = true; + resolve(response); + }, + reject: (error) => { + settled = true; + reject(error); + }, }); socket.on('error', (err) => {