Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions packages/dbt-tools/test/log-buffer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { describe, test, expect, beforeEach } from "bun:test"
import { bufferLog, getRecentDbtLogs, clearDbtLogs } from "../src/log-buffer"

describe("dbt log-buffer", () => {
  // Every test starts from a freshly cleared buffer.
  beforeEach(() => {
    clearDbtLogs()
  })

  test("buffers log messages in insertion order", () => {
    const messages = ["first", "second", "third"]
    for (const message of messages) {
      bufferLog(message)
    }
    expect(getRecentDbtLogs()).toEqual(messages)
  })

  test("evicts oldest entries when buffer exceeds 100", () => {
    for (const message of Array.from({ length: 105 }, (_, index) => `msg-${index}`)) {
      bufferLog(message)
    }
    const logs = getRecentDbtLogs()
    expect(logs).toHaveLength(100)
    // msg-0 through msg-4 were dropped to make room.
    expect(logs[0]).toBe("msg-5")
    expect(logs[99]).toBe("msg-104")
  })

  test("clearDbtLogs empties the buffer", () => {
    bufferLog("something")
    clearDbtLogs()
    expect(getRecentDbtLogs()).toEqual([])
  })

  test("getRecentDbtLogs returns a copy, not a reference", () => {
    bufferLog("original")
    const snapshot = getRecentDbtLogs()
    snapshot.push("injected")
    // Mutating the returned array must not leak into the internal buffer.
    expect(getRecentDbtLogs()).toEqual(["original"])
  })

  test("handles empty buffer", () => {
    expect(getRecentDbtLogs()).toEqual([])
  })

  test("buffer stays at exactly 100 after repeated overflow", () => {
    for (let index = 0; index < 200; index++) {
      bufferLog(`msg-${index}`)
    }
    const logs = getRecentDbtLogs()
    expect(logs).toHaveLength(100)
    expect(logs[0]).toBe("msg-100")
    expect(logs[99]).toBe("msg-199")
  })
})
79 changes: 79 additions & 0 deletions packages/opencode/test/control-plane/sse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,83 @@ describe("control-plane/sse", () => {
},
])
})

test("handles events split across chunk boundaries", async () => {
  const received: unknown[] = []
  const controller = new AbortController()

  // The JSON payload is deliberately cut mid-token; the parser must
  // stitch the two chunks back together before decoding the event.
  const chunks = ['data: {"type":"spl', 'it"}\n\n']
  await parseSSE(stream(chunks), controller.signal, (event) => {
    received.push(event)
  })

  expect(received).toEqual([{ type: "split" }])
})

test("handles double newline split across chunks", async () => {
  const collected: unknown[] = []
  const abort = new AbortController()

  // The blank-line event delimiter itself straddles the chunk boundary:
  // the "\n" ending chunk one pairs with the "\n" opening chunk two.
  const chunks = ['data: {"type":"boundary"}\n', '\ndata: {"type":"next"}\n\n']
  await parseSSE(stream(chunks), abort.signal, (event) => {
    collected.push(event)
  })

  expect(collected).toEqual([{ type: "boundary" }, { type: "next" }])
})

test("ignores empty events (double newline with no data)", async () => {
  const seen: unknown[] = []
  const abort = new AbortController()

  // The leading "\n\n" forms an event containing no data lines; the
  // parser should skip it instead of emitting an empty event.
  await parseSSE(
    stream(['\n\ndata: {"type":"real"}\n\n']),
    abort.signal,
    (event) => {
      seen.push(event)
    },
  )

  expect(seen).toEqual([{ type: "real" }])
})

test("abort signal stops processing mid-stream", async () => {
  const received: unknown[] = []
  const abortController = new AbortController()
  const encoder = new TextEncoder()

  // A pull()-driven stream hands out one event per read. The abort
  // fires immediately after the first chunk is enqueued, so the signal
  // is already set before the second chunk can be requested.
  let reads = 0
  const body = new ReadableStream<Uint8Array>({
    pull(controller) {
      reads += 1
      if (reads === 1) {
        controller.enqueue(encoder.encode('data: {"type":"first"}\n\n'))
        abortController.abort()
      } else {
        controller.enqueue(encoder.encode('data: {"type":"second"}\n\n'))
        controller.close()
      }
    },
  })

  await parseSSE(body, abortController.signal, (event) => {
    received.push(event)
  })

  expect(received).toEqual([{ type: "first" }])
})

test("handles bare \\r line endings", async () => {
  const received: unknown[] = []
  const abort = new AbortController()

  // Lone carriage returns are valid SSE line terminators, so "\r\r"
  // marks end-of-line followed immediately by end-of-event.
  await parseSSE(
    stream(['data: {"type":"cr"}\r\r']),
    abort.signal,
    (event) => {
      received.push(event)
    },
  )

  expect(received).toEqual([{ type: "cr" }])
})
})
68 changes: 68 additions & 0 deletions packages/opencode/test/util/lock.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,72 @@ describe("util.lock", () => {

reader[Symbol.dispose]()
})

test("concurrent readers: multiple readers can hold the lock simultaneously", async () => {
  // Random key so parallel test runs never contend on the same lock.
  const key = "lock:" + Math.random().toString(36).slice(2)
  const order: string[] = []

  const firstReader = await Lock.read(key)
  order.push("r1-acquired")

  const secondReader = await Lock.read(key)
  order.push("r2-acquired")

  // Neither await blocked: read locks are shared.
  expect(order).toEqual(["r1-acquired", "r2-acquired"])

  firstReader[Symbol.dispose]()
  secondReader[Symbol.dispose]()
})

// Verifies writer-priority ordering: once a writer is queued, newly
// arriving readers must wait behind it even though a reader currently
// holds the lock. The assertions are interleaved with flush() calls,
// which presumably drain pending microtasks so that "still blocked"
// checks are meaningful — confirm against flush()'s definition above.
test("writer starvation prevention: waiting writer blocks new readers", async () => {
  // Random key so parallel test runs never contend on the same lock.
  const key = "lock:" + Math.random().toString(36).slice(2)
  const timeline: string[] = []

  // Acquire a reader
  const r1 = await Lock.read(key)
  timeline.push("r1-acquired")

  // Queue a writer (blocks because r1 is held)
  let writerResolved = false
  const writerTask = Lock.write(key).then((w) => {
    writerResolved = true
    timeline.push("writer-acquired")
    return w
  })

  await flush()
  expect(writerResolved).toBe(false)

  // Queue a second reader (should block because a writer is waiting —
  // the source's process() prioritizes waitingWriters over waitingReaders)
  let reader2Resolved = false
  const reader2Task = Lock.read(key).then((r) => {
    reader2Resolved = true
    timeline.push("r2-acquired")
    return r
  })

  await flush()
  expect(reader2Resolved).toBe(false)

  // Release r1 — writer goes next (not reader2).
  // process() calls the writer callback synchronously via shift()+nextWriter(),
  // which resolves the writer promise on the next microtask. reader2 remains
  // queued in waitingReaders until the writer releases.
  r1[Symbol.dispose]()

  // Awaiting writerTask yields enough microtasks for the .then above to
  // run; reader2 must still be parked behind the held write lock.
  const writer = await writerTask
  expect(writerResolved).toBe(true)
  expect(reader2Resolved).toBe(false)

  // Release writer — now reader2 can proceed
  writer[Symbol.dispose]()
  const r2 = await reader2Task
  expect(reader2Resolved).toBe(true)

  // Full acquisition order: reader, then writer, then the late reader.
  expect(timeline).toEqual(["r1-acquired", "writer-acquired", "r2-acquired"])

  r2[Symbol.dispose]()
})
})
Loading