From fa6288bd001478d5ffc6adc83293612417a22fbe Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 27 Mar 2026 20:20:04 +0000 Subject: [PATCH] =?UTF-8?q?test:=20log-buffer,=20RWLock=20concurrency,=20S?= =?UTF-8?q?SE=20chunk=20splitting=20=E2=80=94=2013=20new=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cover three untested risk areas: dbt ring buffer overflow (ties to #249 TUI corruption fix), reader-writer lock starvation ordering, and SSE event parsing across chunk boundaries and abort signals. Co-Authored-By: Claude Opus 4.6 (1M context) https://claude.ai/code/session_01153R7Dh9BMKiarndEUraBk --- packages/dbt-tools/test/log-buffer.test.ts | 51 ++++++++++++ .../opencode/test/control-plane/sse.test.ts | 79 +++++++++++++++++++ packages/opencode/test/util/lock.test.ts | 68 ++++++++++++++++ 3 files changed, 198 insertions(+) create mode 100644 packages/dbt-tools/test/log-buffer.test.ts diff --git a/packages/dbt-tools/test/log-buffer.test.ts b/packages/dbt-tools/test/log-buffer.test.ts new file mode 100644 index 0000000000..cfc69037cb --- /dev/null +++ b/packages/dbt-tools/test/log-buffer.test.ts @@ -0,0 +1,51 @@ +import { describe, test, expect, beforeEach } from "bun:test" +import { bufferLog, getRecentDbtLogs, clearDbtLogs } from "../src/log-buffer" + +describe("dbt log-buffer", () => { + beforeEach(() => { + clearDbtLogs() + }) + + test("buffers log messages in insertion order", () => { + bufferLog("first") + bufferLog("second") + bufferLog("third") + expect(getRecentDbtLogs()).toEqual(["first", "second", "third"]) + }) + + test("evicts oldest entries when buffer exceeds 100", () => { + for (let i = 0; i < 105; i++) { + bufferLog(`msg-${i}`) + } + const logs = getRecentDbtLogs() + expect(logs).toHaveLength(100) + expect(logs[0]).toBe("msg-5") + expect(logs[99]).toBe("msg-104") + }) + + test("clearDbtLogs empties the buffer", () => { + bufferLog("something") + clearDbtLogs() + expect(getRecentDbtLogs()).toEqual([]) + }) + + test("getRecentDbtLogs returns a copy, not a reference", () => { + bufferLog("original") + const copy = getRecentDbtLogs() + copy.push("injected") + expect(getRecentDbtLogs()).toEqual(["original"]) + }) + + test("handles empty buffer", () => { + expect(getRecentDbtLogs()).toEqual([]) + }) + + test("buffer stays at exactly 100 after repeated overflow", () => { + for (let i = 0; i < 200; i++) { + bufferLog(`msg-${i}`) + } + expect(getRecentDbtLogs()).toHaveLength(100) + expect(getRecentDbtLogs()[0]).toBe("msg-100") + expect(getRecentDbtLogs()[99]).toBe("msg-199") + }) +}) diff --git a/packages/opencode/test/control-plane/sse.test.ts b/packages/opencode/test/control-plane/sse.test.ts index 78a8341c0e..78f96cfe8e 100644 --- a/packages/opencode/test/control-plane/sse.test.ts +++ b/packages/opencode/test/control-plane/sse.test.ts @@ -53,4 +53,83 @@ describe("control-plane/sse", () => { }, ]) }) + + test("handles events split across chunk boundaries", async () => { + const events: unknown[] = [] + const stop = new AbortController() + + await parseSSE( + stream(['data: {"type":"spl', 'it"}\n\n']), + stop.signal, + (event) => events.push(event), + ) + + expect(events).toEqual([{ type: "split" }]) + }) + + test("handles double newline split across chunks", async () => { + const events: unknown[] = [] + const stop = new AbortController() + + await parseSSE( + stream(['data: {"type":"boundary"}\n', '\ndata: {"type":"next"}\n\n']), + stop.signal, + (event) => events.push(event), + ) + + expect(events).toEqual([{ type: "boundary" }, { type: "next" }]) + }) + + test("ignores empty events (double newline with no data)", async () => { + const events: unknown[] = [] + const stop = new AbortController() + + await parseSSE( + stream(['\n\ndata: {"type":"real"}\n\n']), + stop.signal, + (event) => events.push(event), + ) + + expect(events).toEqual([{ type: "real" }]) + }) + + test("abort signal stops processing mid-stream", async () => { + const events: unknown[] = [] + const stop = new AbortController() + + // Stream that delivers chunks on demand via pull(); abort fires + // between the first and second read. + let pullCount = 0 + const body = new ReadableStream({ + pull(controller) { + const encoder = new TextEncoder() + pullCount++ + if (pullCount === 1) { + controller.enqueue(encoder.encode('data: {"type":"first"}\n\n')) + // Abort before next pull delivers second event + stop.abort() + } else { + controller.enqueue(encoder.encode('data: {"type":"second"}\n\n')) + controller.close() + } + }, + }) + + await parseSSE(body, stop.signal, (event) => events.push(event)) + + expect(events).toEqual([{ type: "first" }]) + }) + + test("handles bare \\r line endings", async () => { + const events: unknown[] = [] + const stop = new AbortController() + + await parseSSE( + stream(['data: {"type":"cr"}\r\r']), + stop.signal, + (event) => events.push(event), + ) + + expect(events).toEqual([{ type: "cr" }]) + }) }) diff --git a/packages/opencode/test/util/lock.test.ts b/packages/opencode/test/util/lock.test.ts index b877311e39..897fe0ea0d 100644 --- a/packages/opencode/test/util/lock.test.ts +++ b/packages/opencode/test/util/lock.test.ts @@ -69,4 +69,72 @@ describe("util.lock", () => { reader[Symbol.dispose]() }) + + test("concurrent readers: multiple readers can hold the lock simultaneously", async () => { + const key = "lock:" + Math.random().toString(36).slice(2) + const timeline: string[] = [] + + const r1 = await Lock.read(key) + timeline.push("r1-acquired") + + const r2 = await Lock.read(key) + timeline.push("r2-acquired") + + // Both acquired without blocking — readers are concurrent + expect(timeline).toEqual(["r1-acquired", "r2-acquired"]) + + r1[Symbol.dispose]() + r2[Symbol.dispose]() + }) + + test("writer starvation prevention: waiting writer blocks new readers", async () => { + const key = "lock:" + Math.random().toString(36).slice(2) + const timeline: string[] = [] + + // Acquire a reader + const r1 = await Lock.read(key) + timeline.push("r1-acquired") + + // Queue a writer (blocks because r1 is held) + let writerResolved = false + const writerTask = Lock.write(key).then((w) => { + writerResolved = true + timeline.push("writer-acquired") + return w + }) + + await flush() + expect(writerResolved).toBe(false) + + // Queue a second reader (should block because a writer is waiting — + // the source's process() prioritizes waitingWriters over waitingReaders) + let reader2Resolved = false + const reader2Task = Lock.read(key).then((r) => { + reader2Resolved = true + timeline.push("r2-acquired") + return r + }) + + await flush() + expect(reader2Resolved).toBe(false) + + // Release r1 — writer goes next (not reader2). + // process() calls the writer callback synchronously via shift()+nextWriter(), + // which resolves the writer promise on the next microtask. reader2 remains + // queued in waitingReaders until the writer releases. + r1[Symbol.dispose]() + + const writer = await writerTask + expect(writerResolved).toBe(true) + expect(reader2Resolved).toBe(false) + + // Release writer — now reader2 can proceed + writer[Symbol.dispose]() + const r2 = await reader2Task + expect(reader2Resolved).toBe(true) + + expect(timeline).toEqual(["r1-acquired", "writer-acquired", "r2-acquired"]) + + r2[Symbol.dispose]() + }) })