From a9eb638d193ed42d33e2620c4890a45f95da668f Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Fri, 20 Mar 2026 17:25:20 +0000 Subject: [PATCH 01/14] feat: add Pipeline glue type sequencing truncation, file write, and ring publish --- server/lib/events/pipeline.go | 67 +++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 server/lib/events/pipeline.go diff --git a/server/lib/events/pipeline.go b/server/lib/events/pipeline.go new file mode 100644 index 00000000..11661150 --- /dev/null +++ b/server/lib/events/pipeline.go @@ -0,0 +1,67 @@ +package events + +import ( + "sync/atomic" + "time" +) + +// Pipeline glues a RingBuffer and a FileWriter into a single write path. +// A single call to Publish stamps the event with a monotonic sequence number, +// applies truncation, durably appends it to the per-category log file, and +// then makes it available to ring buffer readers. +type Pipeline struct { + ring *RingBuffer + files *FileWriter + seq atomic.Uint64 + captureSessionID atomic.Value // stores string +} + +// NewPipeline returns a Pipeline backed by the supplied ring and file writer. +func NewPipeline(ring *RingBuffer, files *FileWriter) *Pipeline { + p := &Pipeline{ring: ring, files: files} + p.captureSessionID.Store("") + return p +} + +// Start sets the capture session ID that will be stamped on every subsequent +// published event. It may be called at any time; the change is immediately +// visible to concurrent Publish calls. +func (p *Pipeline) Start(captureSessionID string) { + p.captureSessionID.Store(captureSessionID) +} + +// Publish stamps, truncates, files, and broadcasts a single event. +// +// Ordering: +// 1. Stamp CaptureSessionID, Seq, Ts (Ts only if caller left it zero) +// 2. Apply truncateIfNeeded (SCHEMA-04) — must happen before both sinks +// 3. Write to FileWriter (durable before in-memory) +// 4. Publish to RingBuffer (in-memory fan-out) +// +// Errors from FileWriter.Write are silently dropped; the ring buffer always +// receives the event even if the file write fails. +func (p *Pipeline) Publish(ev BrowserEvent) { + ev.CaptureSessionID = p.captureSessionID.Load().(string) + ev.Seq = p.seq.Add(1) // starts at 1 + if ev.Ts == 0 { + ev.Ts = time.Now().UnixMilli() + } + ev = truncateIfNeeded(ev) + + // File write first — durable before in-memory. + _ = p.files.Write(ev) + + // Ring buffer last — readers see the event after the file is written. + p.ring.Publish(ev) +} + +// NewReader returns a Reader positioned at the start of the ring buffer. +func (p *Pipeline) NewReader() *Reader { + return p.ring.NewReader() +} + +// Close closes the underlying FileWriter, flushing and releasing all open +// file descriptors. +func (p *Pipeline) Close() error { + return p.files.Close() +} From dd31b840252c7a9ffaca2ec1d7884ad6fa932070 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Fri, 27 Mar 2026 11:37:14 +0000 Subject: [PATCH 02/14] review: fix truncateIfNeeded branch split, atomic.Pointer[string], Reader godoc, and test correctness --- server/lib/events/pipeline.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/server/lib/events/pipeline.go b/server/lib/events/pipeline.go index 11661150..ed2f3a58 100644 --- a/server/lib/events/pipeline.go +++ b/server/lib/events/pipeline.go @@ -13,13 +13,14 @@ type Pipeline struct { ring *RingBuffer files *FileWriter seq atomic.Uint64 - captureSessionID atomic.Value // stores string + captureSessionID atomic.Pointer[string] } // NewPipeline returns a Pipeline backed by the supplied ring and file writer. func NewPipeline(ring *RingBuffer, files *FileWriter) *Pipeline { p := &Pipeline{ring: ring, files: files} - p.captureSessionID.Store("") + empty := "" + p.captureSessionID.Store(&empty) return p } @@ -27,7 +28,7 @@ func NewPipeline(ring *RingBuffer, files *FileWriter) *Pipeline { // published event. It may be called at any time; the change is immediately // visible to concurrent Publish calls. func (p *Pipeline) Start(captureSessionID string) { - p.captureSessionID.Store(captureSessionID) + p.captureSessionID.Store(&captureSessionID) } // Publish stamps, truncates, files, and broadcasts a single event. @@ -41,17 +42,17 @@ func (p *Pipeline) Start(captureSessionID string) { // Errors from FileWriter.Write are silently dropped; the ring buffer always // receives the event even if the file write fails. func (p *Pipeline) Publish(ev BrowserEvent) { - ev.CaptureSessionID = p.captureSessionID.Load().(string) + ev.CaptureSessionID = *p.captureSessionID.Load() ev.Seq = p.seq.Add(1) // starts at 1 if ev.Ts == 0 { ev.Ts = time.Now().UnixMilli() } + if ev.DetailLevel == "" { + ev.DetailLevel = DetailDefault + } ev = truncateIfNeeded(ev) - // File write first — durable before in-memory. _ = p.files.Write(ev) - - // Ring buffer last — readers see the event after the file is written. p.ring.Publish(ev) } From b615377bb51e1843f04bc879e7bdad42b6881b6b Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Fri, 27 Mar 2026 12:30:03 +0000 Subject: [PATCH 03/14] fix: serialise Pipeline.Publish to guarantee monotonic seq delivery order --- server/lib/events/pipeline.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/server/lib/events/pipeline.go b/server/lib/events/pipeline.go index ed2f3a58..b7184abc 100644 --- a/server/lib/events/pipeline.go +++ b/server/lib/events/pipeline.go @@ -1,6 +1,7 @@ package events import ( + "sync" "sync/atomic" "time" ) @@ -10,6 +11,7 @@ import ( // applies truncation, durably appends it to the per-category log file, and // then makes it available to ring buffer readers. type Pipeline struct { + mu sync.Mutex ring *RingBuffer files *FileWriter seq atomic.Uint64 @@ -35,13 +37,18 @@ func (p *Pipeline) Start(captureSessionID string) { // // Ordering: // 1. Stamp CaptureSessionID, Seq, Ts (Ts only if caller left it zero) -// 2. Apply truncateIfNeeded (SCHEMA-04) — must happen before both sinks +// 2. Apply truncateIfNeeded — must happen before both sinks // 3. Write to FileWriter (durable before in-memory) // 4. Publish to RingBuffer (in-memory fan-out) // +// The mutex serialises concurrent callers so that seq assignment and sink +// delivery are atomic — readers always see events in seq order. // Errors from FileWriter.Write are silently dropped; the ring buffer always // receives the event even if the file write fails. func (p *Pipeline) Publish(ev BrowserEvent) { + p.mu.Lock() + defer p.mu.Unlock() + ev.CaptureSessionID = *p.captureSessionID.Load() ev.Seq = p.seq.Add(1) // starts at 1 if ev.Ts == 0 { From fbb97a5100169a81016a5901fa81bee1adef51e8 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 30 Mar 2026 15:01:13 +0000 Subject: [PATCH 04/14] review --- server/lib/events/pipeline.go | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/server/lib/events/pipeline.go b/server/lib/events/pipeline.go index b7184abc..c6f93dcb 100644 --- a/server/lib/events/pipeline.go +++ b/server/lib/events/pipeline.go @@ -1,15 +1,13 @@ package events import ( + "log/slog" "sync" "sync/atomic" "time" ) -// Pipeline glues a RingBuffer and a FileWriter into a single write path. -// A single call to Publish stamps the event with a monotonic sequence number, -// applies truncation, durably appends it to the per-category log file, and -// then makes it available to ring buffer readers. +// Pipeline glues a RingBuffer and a FileWriter into a single write path type Pipeline struct { mu sync.Mutex ring *RingBuffer @@ -18,7 +16,6 @@ type Pipeline struct { captureSessionID atomic.Pointer[string] } -// NewPipeline returns a Pipeline backed by the supplied ring and file writer. func NewPipeline(ring *RingBuffer, files *FileWriter) *Pipeline { p := &Pipeline{ring: ring, files: files} empty := "" @@ -27,8 +24,7 @@ func NewPipeline(ring *RingBuffer, files *FileWriter) *Pipeline { } // Start sets the capture session ID that will be stamped on every subsequent -// published event. It may be called at any time; the change is immediately -// visible to concurrent Publish calls. +// published event func (p *Pipeline) Start(captureSessionID string) { p.captureSessionID.Store(&captureSessionID) } @@ -40,36 +36,33 @@ func (p *Pipeline) Start(captureSessionID string) { // 2. Apply truncateIfNeeded — must happen before both sinks // 3. Write to FileWriter (durable before in-memory) // 4. Publish to RingBuffer (in-memory fan-out) -// -// The mutex serialises concurrent callers so that seq assignment and sink -// delivery are atomic — readers always see events in seq order. -// Errors from FileWriter.Write are silently dropped; the ring buffer always -// receives the event even if the file write fails. func (p *Pipeline) Publish(ev BrowserEvent) { p.mu.Lock() defer p.mu.Unlock() ev.CaptureSessionID = *p.captureSessionID.Load() - ev.Seq = p.seq.Add(1) // starts at 1 + ev.Seq = p.seq.Add(1) if ev.Ts == 0 { ev.Ts = time.Now().UnixMilli() } if ev.DetailLevel == "" { ev.DetailLevel = DetailDefault } - ev = truncateIfNeeded(ev) + ev, data := truncateIfNeeded(ev) - _ = p.files.Write(ev) + if err := p.files.Write(ev, data); err != nil { + slog.Error("pipeline: file write failed", "seq", ev.Seq, "category", ev.Category, "err", err) + } p.ring.Publish(ev) } -// NewReader returns a Reader positioned at the start of the ring buffer. +// NewReader returns a Reader positioned at the start of the ring buffer func (p *Pipeline) NewReader() *Reader { return p.ring.NewReader() } // Close closes the underlying FileWriter, flushing and releasing all open -// file descriptors. +// file descriptors func (p *Pipeline) Close() error { return p.files.Close() } From ab2900840e88d521dd9a81b6940b92e88a0fcfbb Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Tue, 31 Mar 2026 20:04:59 +0000 Subject: [PATCH 05/14] refactor: rename BrowserEvent to Event, DetailDefault to DetailStandard Event is the agreed portable name. DetailStandard avoids Go keyword ambiguity with "default". --- server/lib/events/pipeline.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/lib/events/pipeline.go b/server/lib/events/pipeline.go index c6f93dcb..1c3d31e8 100644 --- a/server/lib/events/pipeline.go +++ b/server/lib/events/pipeline.go @@ -36,7 +36,7 @@ func (p *Pipeline) Start(captureSessionID string) { // 2. Apply truncateIfNeeded — must happen before both sinks // 3. Write to FileWriter (durable before in-memory) // 4. Publish to RingBuffer (in-memory fan-out) -func (p *Pipeline) Publish(ev BrowserEvent) { +func (p *Pipeline) Publish(ev Event) { p.mu.Lock() defer p.mu.Unlock() @@ -46,7 +46,7 @@ func (p *Pipeline) Publish(ev BrowserEvent) { ev.Ts = time.Now().UnixMilli() } if ev.DetailLevel == "" { - ev.DetailLevel = DetailDefault + ev.DetailLevel = DetailStandard } ev, data := truncateIfNeeded(ev) From f0aed531c0b57169536ccffd0d31a754d287447f Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Tue, 31 Mar 2026 20:09:01 +0000 Subject: [PATCH 06/14] refactor: extract Envelope wrapper, move seq and capture_session_id out of Event Event is now purely producer-emitted content. Pipeline-assigned metadata (seq, capture_session_id) lives on the Envelope. truncateIfNeeded operates on the full Envelope. Pipeline type comment now documents lifecycle semantics. --- server/lib/events/pipeline.go | 38 +++++++++++++++++------------------ 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/server/lib/events/pipeline.go b/server/lib/events/pipeline.go index 1c3d31e8..403a1df0 100644 --- a/server/lib/events/pipeline.go +++ b/server/lib/events/pipeline.go @@ -7,7 +7,10 @@ import ( "time" ) -// Pipeline glues a RingBuffer and a FileWriter into a single write path +// Pipeline is a single-use write path that wraps events in envelopes and fans +// them out to a FileWriter (durable) and RingBuffer (in-memory). Call Start +// once with a capture session ID, then Publish concurrently. Close flushes the +// FileWriter; there is no restart or terminal event. type Pipeline struct { mu sync.Mutex ring *RingBuffer @@ -23,46 +26,43 @@ func NewPipeline(ring *RingBuffer, files *FileWriter) *Pipeline { return p } -// Start sets the capture session ID that will be stamped on every subsequent -// published event +// Start sets the capture session ID stamped on every subsequent envelope. func (p *Pipeline) Start(captureSessionID string) { p.captureSessionID.Store(&captureSessionID) } -// Publish stamps, truncates, files, and broadcasts a single event. -// -// Ordering: -// 1. Stamp CaptureSessionID, Seq, Ts (Ts only if caller left it zero) -// 2. Apply truncateIfNeeded — must happen before both sinks -// 3. Write to FileWriter (durable before in-memory) -// 4. Publish to RingBuffer (in-memory fan-out) +// Publish wraps ev in an Envelope, truncates if needed, then writes to +// FileWriter (durable) before RingBuffer (in-memory fan-out). func (p *Pipeline) Publish(ev Event) { p.mu.Lock() defer p.mu.Unlock() - ev.CaptureSessionID = *p.captureSessionID.Load() - ev.Seq = p.seq.Add(1) if ev.Ts == 0 { ev.Ts = time.Now().UnixMilli() } if ev.DetailLevel == "" { ev.DetailLevel = DetailStandard } - ev, data := truncateIfNeeded(ev) - if err := p.files.Write(ev, data); err != nil { - slog.Error("pipeline: file write failed", "seq", ev.Seq, "category", ev.Category, "err", err) + env := Envelope{ + CaptureSessionID: *p.captureSessionID.Load(), + Seq: p.seq.Add(1), + Event: ev, } - p.ring.Publish(ev) + env, data := truncateIfNeeded(env) + + if err := p.files.Write(env, data); err != nil { + slog.Error("pipeline: file write failed", "seq", env.Seq, "category", env.Event.Category, "err", err) + } + p.ring.Publish(env) } -// NewReader returns a Reader positioned at the start of the ring buffer +// NewReader returns a Reader positioned at the start of the ring buffer. func (p *Pipeline) NewReader() *Reader { return p.ring.NewReader() } -// Close closes the underlying FileWriter, flushing and releasing all open -// file descriptors +// Close flushes and releases all open file descriptors. func (p *Pipeline) Close() error { return p.files.Close() } From e07c02441e9ed2cec9d50458a88dc7a4349763fc Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Tue, 31 Mar 2026 20:14:41 +0000 Subject: [PATCH 07/14] refactor: unify seq as universal cursor, add NewReader(afterSeq) Ring buffer now indexes by envelope.Seq directly, removing the separate head/written counters. NewReader takes an explicit afterSeq for resume support. Renamed notify to readerWake for clarity. --- server/lib/events/pipeline.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/lib/events/pipeline.go b/server/lib/events/pipeline.go index 403a1df0..ba7a7660 100644 --- a/server/lib/events/pipeline.go +++ b/server/lib/events/pipeline.go @@ -58,8 +58,8 @@ func (p *Pipeline) Publish(ev Event) { } // NewReader returns a Reader positioned at the start of the ring buffer. -func (p *Pipeline) NewReader() *Reader { - return p.ring.NewReader() +func (p *Pipeline) NewReader(afterSeq uint64) *Reader { + return p.ring.NewReader(afterSeq) } // Close flushes and releases all open file descriptors. From 3945ce4639f0f84c409815342177e4960c7c49f1 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Tue, 31 Mar 2026 20:16:20 +0000 Subject: [PATCH 08/14] refactor: return ReadResult instead of synthetic drop events Drops are now stream metadata (ReadResult.Dropped) rather than fake events smuggled into the Event schema. Transport layer decides how to surface gaps on the wire. --- server/lib/events/ringbuffer.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/server/lib/events/ringbuffer.go b/server/lib/events/ringbuffer.go index d30a680c..846322de 100644 --- a/server/lib/events/ringbuffer.go +++ b/server/lib/events/ringbuffer.go @@ -55,6 +55,14 @@ type ReadResult struct { Dropped uint64 } +// ReadResult is returned by Reader.Read. Exactly one of Envelope or Dropped is +// set: Envelope is non-nil for a normal read, Dropped is non-zero when the +// reader fell behind and events were lost. +type ReadResult struct { + Envelope *Envelope + Dropped uint64 +} + // Reader tracks an independent read position in a RingBuffer. type Reader struct { rb *RingBuffer From 894e9d0b7dd1189096968b3bd6cbbb94fc65faa3 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Wed, 1 Apr 2026 11:59:16 +0000 Subject: [PATCH 09/14] fix: guard against nil marshal data and oversized non-data envelopes truncateIfNeeded now warns if the envelope still exceeds the 1MB limit after nulling data (e.g. huge url or source.metadata). Pipeline.Publish skips the file write when marshal returns nil to avoid writing corrupt bare-newline JSONL lines. --- server/lib/events/pipeline.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/lib/events/pipeline.go b/server/lib/events/pipeline.go index ba7a7660..e69c254f 100644 --- a/server/lib/events/pipeline.go +++ b/server/lib/events/pipeline.go @@ -51,7 +51,9 @@ func (p *Pipeline) Publish(ev Event) { } env, data := truncateIfNeeded(env) - if err := p.files.Write(env, data); err != nil { + if data == nil { + slog.Error("pipeline: marshal failed, skipping file write", "seq", env.Seq, "category", env.Event.Category) + } else if err := p.files.Write(env, data); err != nil { slog.Error("pipeline: file write failed", "seq", env.Seq, "category", env.Event.Category, "err", err) } p.ring.Publish(env) From 7d38b48bc7e5968266f17266031748ddbd7b5dea Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 6 Apr 2026 14:52:34 +0000 Subject: [PATCH 10/14] fix: remove duplicate ReadResult type in ringbuffer.go --- server/lib/events/ringbuffer.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/server/lib/events/ringbuffer.go b/server/lib/events/ringbuffer.go index 846322de..d30a680c 100644 --- a/server/lib/events/ringbuffer.go +++ b/server/lib/events/ringbuffer.go @@ -55,14 +55,6 @@ type ReadResult struct { Dropped uint64 } -// ReadResult is returned by Reader.Read. Exactly one of Envelope or Dropped is -// set: Envelope is non-nil for a normal read, Dropped is non-zero when the -// reader fell behind and events were lost. -type ReadResult struct { - Envelope *Envelope - Dropped uint64 -} - // Reader tracks an independent read position in a RingBuffer. type Reader struct { rb *RingBuffer From 6a67415045af368f3b20fba3c96788ddcd101490 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 6 Apr 2026 14:52:39 +0000 Subject: [PATCH 11/14] feat: add POST /events/publish and GET /events/stream endpoints --- server/cmd/api/api/api.go | 24 ++-- server/cmd/api/api/events.go | 98 ++++++++++++- server/cmd/api/api/events_publish_test.go | 166 ++++++++++++++++++++++ server/cmd/api/api/events_stream_test.go | 122 ++++++++++++++++ server/cmd/api/main.go | 12 +- 5 files changed, 400 insertions(+), 22 deletions(-) create mode 100644 server/cmd/api/api/events_publish_test.go create mode 100644 server/cmd/api/api/events_stream_test.go diff --git a/server/cmd/api/api/api.go b/server/cmd/api/api/api.go index 3523904d..f8f46bed 100644 --- a/server/cmd/api/api/api.go +++ b/server/cmd/api/api/api.go @@ -41,6 +41,11 @@ type ApiService struct { upstreamMgr *devtoolsproxy.UpstreamManager stz scaletozero.Controller + // CDP event pipeline and cdpMonitor. + captureSession *events.CaptureSession + cdpMonitor *cdpmonitor.Monitor + monitorMu sync.Mutex + // inputMu serializes input-related operations (mouse, keyboard, screenshot) inputMu sync.Mutex @@ -70,11 +75,6 @@ type ApiService struct { // xvfbResizeMu serializes background Xvfb restarts to prevent races // when multiple CDP fast-path resizes fire in quick succession. xvfbResizeMu sync.Mutex - - // CDP event pipeline and cdpMonitor. - captureSession *events.CaptureSession - cdpMonitor *cdpmonitor.Monitor - monitorMu sync.Mutex } var _ oapi.StrictServerInterface = (*ApiService)(nil) @@ -101,8 +101,6 @@ func New( return nil, fmt.Errorf("captureSession cannot be nil") } - mon := cdpmonitor.New(upstreamMgr, captureSession.Publish, displayNum) - return &ApiService{ recordManager: recordManager, factory: factory, @@ -114,7 +112,7 @@ func New( nekoAuthClient: nekoAuthClient, policy: &policy.Policy{}, captureSession: captureSession, - cdpMonitor: mon, + cdpMonitor: cdpmonitor.New(upstreamMgr, captureSession.Publish, displayNum), }, nil } @@ -334,9 +332,11 @@ func (s *ApiService) ListRecorders(ctx context.Context, _ oapi.ListRecordersRequ } func (s *ApiService) Shutdown(ctx context.Context) error { - s.monitorMu.Lock() - s.cdpMonitor.Stop() - _ = s.captureSession.Close() - s.monitorMu.Unlock() + if s.cdpMonitor != nil { + s.cdpMonitor.Stop() + } + if s.captureSession != nil { + _ = s.captureSession.Close() + } return s.recordManager.StopAll(ctx) } diff --git a/server/cmd/api/api/events.go b/server/cmd/api/api/events.go index f9021a17..c4bf43ad 100644 --- a/server/cmd/api/api/events.go +++ b/server/cmd/api/api/events.go @@ -2,21 +2,27 @@ package api import ( "context" + "encoding/json" + "fmt" + "io" "net/http" + "strconv" "github.com/google/uuid" + "github.com/onkernel/kernel-images/server/lib/events" "github.com/onkernel/kernel-images/server/lib/logger" ) // StartCapture handles POST /events/start. -// Generates a new capture session ID, seeds the pipeline, then starts the -// CDP monitor. If already running, the monitor is stopped and -// restarted with a fresh session ID +// Registered as a direct chi route (not via OpenAPI spec) because these are +// simple internal control endpoints with no request body. +// A second call while already running restarts capture (stop+start). func (s *ApiService) StartCapture(w http.ResponseWriter, r *http.Request) { s.monitorMu.Lock() defer s.monitorMu.Unlock() - s.captureSession.Start(uuid.New().String()) + captureSessionID := uuid.New().String() + s.captureSession.Start(captureSessionID) if err := s.cdpMonitor.Start(context.Background()); err != nil { logger.FromContext(r.Context()).Error("failed to start CDP monitor", "err", err) @@ -26,10 +32,92 @@ func (s *ApiService) StartCapture(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } -// StopCapture handles POST /events/stop +// StopCapture handles POST /events/stop. Idempotent if not running. func (s *ApiService) StopCapture(w http.ResponseWriter, r *http.Request) { s.monitorMu.Lock() defer s.monitorMu.Unlock() s.cdpMonitor.Stop() w.WriteHeader(http.StatusOK) } + +// PublishEvent handles POST /events/publish. +// Accepts an Event JSON body and ingests it into the pipeline (ring buffer + log file). +// Derives Category from Type if omitted; stamps KindKernelAPI if Source.Kind is omitted. +func (s *ApiService) PublishEvent(w http.ResponseWriter, r *http.Request) { + var ev events.Event + if err := json.NewDecoder(r.Body).Decode(&ev); err != nil { + http.Error(w, "invalid JSON body", http.StatusBadRequest) + return + } + + // Derive category if caller omitted it — FileWriter returns error for empty category. + if ev.Category == "" { + ev.Category = events.CategoryFor(ev.Type) + } + + // Stamp provenance if caller omitted source kind. + if ev.Source.Kind == "" { + ev.Source.Kind = events.KindKernelAPI + } + + s.captureSession.Publish(ev) + w.WriteHeader(http.StatusOK) +} + +// StreamEvents handles GET /events/stream. +// Delivers a live stream of Envelopes over Server-Sent Events. +// Each frame is formatted as "id: {seq}\ndata: {json}\n\n". +// Clients may reconnect with Last-Event-ID to resume from the next unseen event. +func (s *ApiService) StreamEvents(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming not supported", http.StatusInternalServerError) + return + } + + // Parse Last-Event-ID for reconnection (ignore parse errors; default to 0). + var lastSeq uint64 + if v := r.Header.Get("Last-Event-ID"); v != "" { + if n, err := strconv.ParseUint(v, 10, 64); err == nil { + lastSeq = n + } + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("X-Accel-Buffering", "no") + w.WriteHeader(http.StatusOK) + flusher.Flush() + + // NewReader(lastSeq) positions the reader to deliver events after lastSeq. + reader := s.captureSession.NewReader(lastSeq) + ctx := r.Context() + + // Main event loop. + for { + res, err := reader.Read(ctx) + if err != nil { + // Context cancelled (client disconnected). + return + } + if res.Envelope == nil { + // Drop notification — skip, client will see the gap via seq discontinuity. + continue + } + if err := writeSSEEnvelope(w, *res.Envelope); err != nil { + return + } + flusher.Flush() + } +} + +// writeSSEEnvelope marshals env to JSON and writes a single SSE frame to w. +// Frame format: "id: {seq}\ndata: {json}\n\n" +func writeSSEEnvelope(w io.Writer, env events.Envelope) error { + data, err := json.Marshal(env) + if err != nil { + return err + } + _, err = fmt.Fprintf(w, "id: %d\ndata: %s\n\n", env.Seq, data) + return err +} diff --git a/server/cmd/api/api/events_publish_test.go b/server/cmd/api/api/events_publish_test.go new file mode 100644 index 00000000..be3dace0 --- /dev/null +++ b/server/cmd/api/api/events_publish_test.go @@ -0,0 +1,166 @@ +package api + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + "time" + + "github.com/onkernel/kernel-images/server/lib/events" + "github.com/onkernel/kernel-images/server/lib/recorder" + "github.com/onkernel/kernel-images/server/lib/scaletozero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newPublishTestService(t *testing.T, logDir string) (*ApiService, *events.CaptureSession) { + t.Helper() + ring := events.NewRingBuffer(16) + fw := events.NewFileWriter(logDir) + cs := events.NewCaptureSession(ring, fw) + cs.Start("test-session-123") + svc, err := New( + recorder.NewFFmpegManager(), + newMockFactory(), + newTestUpstreamManager(), + scaletozero.NewNoopController(), + newMockNekoClient(t), + cs, + 0, + ) + require.NoError(t, err) + return svc, cs +} + +func TestPublishEvent(t *testing.T) { + t.Run("happy_path", func(t *testing.T) { + logDir := t.TempDir() + svc, cs := newPublishTestService(t, logDir) + + b, _ := json.Marshal(events.Event{ + Type: "liveview.click", + Category: events.CategoryLiveview, + Source: events.Source{Kind: events.KindKernelAPI}, + Data: json.RawMessage(`{"x":100}`), + }) + req := httptest.NewRequest(http.MethodPost, "/events/publish", bytes.NewReader(b)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + svc.PublishEvent(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + reader := cs.NewReader(0) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + res, err := reader.Read(ctx) + require.NoError(t, err) + require.NotNil(t, res.Envelope) + assert.Equal(t, "liveview.click", res.Envelope.Event.Type) + assert.Equal(t, events.CategoryLiveview, res.Envelope.Event.Category) + }) + + t.Run("invalid_json", func(t *testing.T) { + logDir := t.TempDir() + svc, _ := newPublishTestService(t, logDir) + + req := httptest.NewRequest(http.MethodPost, "/events/publish", bytes.NewReader([]byte(`not-json`))) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + svc.PublishEvent(w, req) + + assert.Equal(t, http.StatusBadRequest, w.Code) + }) + + t.Run("liveview_routes_correctly", func(t *testing.T) { + logDir := t.TempDir() + svc, _ := newPublishTestService(t, logDir) + + b, _ := json.Marshal(events.Event{ + Type: "liveview.click", + Category: events.CategoryLiveview, + Source: events.Source{Kind: events.KindKernelAPI}, + Data: json.RawMessage(`{"x":100}`), + }) + req := httptest.NewRequest(http.MethodPost, "/events/publish", bytes.NewReader(b)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + svc.PublishEvent(w, req) + + require.Equal(t, http.StatusOK, w.Code) + entries, err := os.ReadDir(logDir) + require.NoError(t, err) + found := false + for _, e := range entries { + if e.Name() == "liveview.log" { + info, _ := e.Info() + assert.Greater(t, info.Size(), int64(0)) + found = true + } + } + assert.True(t, found, "liveview.log should exist in logDir") + }) + + t.Run("captcha_routes_correctly", func(t *testing.T) { + logDir := t.TempDir() + svc, _ := newPublishTestService(t, logDir) + + b, _ := json.Marshal(events.Event{ + Type: "captcha.solve", + Category: events.CategoryCaptcha, + Source: events.Source{Kind: events.KindKernelAPI}, + Data: json.RawMessage(`{"token":"abc"}`), + }) + req := httptest.NewRequest(http.MethodPost, "/events/publish", bytes.NewReader(b)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + svc.PublishEvent(w, req) + + require.Equal(t, http.StatusOK, w.Code) + entries, err := os.ReadDir(logDir) + require.NoError(t, err) + found := false + for _, e := range entries { + if e.Name() == "captcha.log" { + info, _ := e.Info() + assert.Greater(t, info.Size(), int64(0)) + found = true + } + } + assert.True(t, found, "captcha.log should exist in logDir") + }) + + t.Run("category_derived_from_type", func(t *testing.T) { + logDir := t.TempDir() + svc, cs := newPublishTestService(t, logDir) + + // No Category field set — should be derived from Type prefix (underscore separator) + b, _ := json.Marshal(events.Event{ + Type: "liveview_click", + Data: json.RawMessage(`{"x":50}`), + }) + req := httptest.NewRequest(http.MethodPost, "/events/publish", bytes.NewReader(b)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + svc.PublishEvent(w, req) + + require.Equal(t, http.StatusOK, w.Code) + + reader := cs.NewReader(0) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + res, err := reader.Read(ctx) + require.NoError(t, err) + require.NotNil(t, res.Envelope) + assert.Equal(t, events.CategoryLiveview, res.Envelope.Event.Category) + + // liveview.log should also exist + _, statErr := os.Stat(filepath.Join(logDir, "liveview.log")) + assert.NoError(t, statErr, "liveview.log should exist after category derivation") + }) +} diff --git a/server/cmd/api/api/events_stream_test.go b/server/cmd/api/api/events_stream_test.go new file mode 100644 index 00000000..94070614 --- /dev/null +++ b/server/cmd/api/api/events_stream_test.go @@ -0,0 +1,122 @@ +package api + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/onkernel/kernel-images/server/lib/events" + "github.com/stretchr/testify/assert" +) + +func TestStreamEvents(t *testing.T) { + t.Run("delivers_events", func(t *testing.T) { + logDir := t.TempDir() + svc, cs := newPublishTestService(t, logDir) + + // Publish 2 events before streaming + cs.Publish(events.Event{ + Type: "console.log", + Category: events.CategoryConsole, + Source: events.Source{Kind: events.KindCDP}, + }) + cs.Publish(events.Event{ + Type: "console.log", + Category: events.CategoryConsole, + Source: events.Source{Kind: events.KindCDP}, + }) + + // Create a request context that cancels after a short window + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + req := httptest.NewRequest(http.MethodGet, "/events/stream", nil).WithContext(ctx) + w := httptest.NewRecorder() + + svc.StreamEvents(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + assert.Equal(t, "text/event-stream", w.Header().Get("Content-Type")) + body := w.Body.String() + assert.Contains(t, body, "id: 1", "should contain event with seq 1") + assert.Contains(t, body, "id: 2", "should contain event with seq 2") + }) + + t.Run("last_event_id_reconnect", func(t *testing.T) { + logDir := t.TempDir() + svc, cs := newPublishTestService(t, logDir) + + // Publish 3 events + cs.Publish(events.Event{Type: "console.log", Category: events.CategoryConsole, Source: events.Source{Kind: events.KindCDP}}) + cs.Publish(events.Event{Type: "console.log", Category: events.CategoryConsole, Source: events.Source{Kind: events.KindCDP}}) + cs.Publish(events.Event{Type: "console.log", Category: events.CategoryConsole, Source: events.Source{Kind: events.KindCDP}}) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + req := httptest.NewRequest(http.MethodGet, "/events/stream", nil).WithContext(ctx) + req.Header.Set("Last-Event-ID", "2") + w := httptest.NewRecorder() + + svc.StreamEvents(w, req) + + body := w.Body.String() + assert.Contains(t, body, "id: 3", "should contain event with seq 3") + assert.NotContains(t, body, "id: 1", "should not re-send seq 1") + assert.NotContains(t, body, "id: 2", "should not re-send seq 2") + }) + + t.Run("clean_disconnect", func(t *testing.T) { + logDir := t.TempDir() + svc, _ := newPublishTestService(t, logDir) + + // Context already cancelled before calling handler + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + req := httptest.NewRequest(http.MethodGet, "/events/stream", nil).WithContext(ctx) + w := httptest.NewRecorder() + + done := make(chan struct{}) + go func() { + svc.StreamEvents(w, req) + close(done) + }() + + select { + case <-done: + // Good: handler returned promptly + case <-time.After(100 * time.Millisecond): + t.Error("StreamEvents did not return promptly after context cancellation") + } + }) + + t.Run("no_flusher_returns_500", func(t *testing.T) { + logDir := t.TempDir() + svc, _ := newPublishTestService(t, logDir) + + req := httptest.NewRequest(http.MethodGet, "/events/stream", nil) + // Use a non-flusher ResponseWriter + w := &nonFlusherWriter{header: make(http.Header)} + + svc.StreamEvents(w, req) + + assert.Equal(t, http.StatusInternalServerError, w.code) + }) +} + +// nonFlusherWriter is a ResponseWriter that does NOT implement http.Flusher. +type nonFlusherWriter struct { + header http.Header + code int + body strings.Builder +} + +func (w *nonFlusherWriter) Header() http.Header { return w.header } +func (w *nonFlusherWriter) WriteHeader(code int) { w.code = code } +func (w *nonFlusherWriter) Write(b []byte) (int, error) { + return w.body.Write(b) +} diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index 767c4881..75a67039 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -24,8 +24,8 @@ import ( "github.com/onkernel/kernel-images/server/cmd/config" "github.com/onkernel/kernel-images/server/lib/chromedriverproxy" "github.com/onkernel/kernel-images/server/lib/devtoolsproxy" - "github.com/onkernel/kernel-images/server/lib/events" "github.com/onkernel/kernel-images/server/lib/logger" + "github.com/onkernel/kernel-images/server/lib/events" "github.com/onkernel/kernel-images/server/lib/nekoclient" oapi "github.com/onkernel/kernel-images/server/lib/oapi" "github.com/onkernel/kernel-images/server/lib/recorder" @@ -128,10 +128,6 @@ func main() { w.Header().Set("Content-Type", "application/json") w.Write(jsonData) }) - // capture events - r.Post("/events/start", apiService.StartCapture) - r.Post("/events/stop", apiService.StopCapture) - // PTY attach endpoint (WebSocket) - not part of OpenAPI spec // Uses WebSocket for bidirectional streaming, which works well through proxies. r.Get("/process/{process_id}/attach", func(w http.ResponseWriter, r *http.Request) { @@ -139,6 +135,12 @@ func main() { apiService.HandleProcessAttachWS(w, r, id) }) + // Events capture lifecycle (not part of OpenAPI spec — simple internal control endpoints) + r.Post("/events/start", apiService.StartCapture) + r.Post("/events/stop", apiService.StopCapture) + r.Post("/events/publish", apiService.PublishEvent) + r.Get("/events/stream", apiService.StreamEvents) + // Serve extension files for Chrome policy-installed extensions // This allows Chrome to download .crx and update.xml files via HTTP extensionsDir := "/home/kernel/extensions" From 9425b059884ec3d2a7c62839eec68d42445136e9 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 6 Apr 2026 15:10:38 +0000 Subject: [PATCH 12/14] review: update naming --- server/cmd/api/api/api.go | 4 +- server/cmd/api/api/events.go | 5 ++ server/cmd/api/api/events_publish_test.go | 24 ++++++-- server/cmd/api/api/events_stream_test.go | 10 ++-- server/cmd/api/main.go | 2 +- server/lib/events/pipeline.go | 70 ----------------------- 6 files changed, 34 insertions(+), 81 deletions(-) delete mode 100644 server/lib/events/pipeline.go diff --git a/server/cmd/api/api/api.go b/server/cmd/api/api/api.go index f8f46bed..68cdd201 100644 --- a/server/cmd/api/api/api.go +++ b/server/cmd/api/api/api.go @@ -336,7 +336,9 @@ func (s *ApiService) Shutdown(ctx context.Context) error { s.cdpMonitor.Stop() } if s.captureSession != nil { - _ = s.captureSession.Close() + if err := s.captureSession.Close(); err != nil { + logger.FromContext(ctx).Error("failed to close capture session", "err", err) + } } return s.recordManager.StopAll(ctx) } diff --git a/server/cmd/api/api/events.go b/server/cmd/api/api/events.go index c4bf43ad..6a17a3ac 100644 --- a/server/cmd/api/api/events.go +++ b/server/cmd/api/api/events.go @@ -50,6 +50,11 @@ func (s *ApiService) PublishEvent(w http.ResponseWriter, r *http.Request) { return } + if ev.Type == "" { + http.Error(w, "type is required", http.StatusBadRequest) + return + } + // Derive category if caller omitted it — FileWriter returns error for empty category. if ev.Category == "" { ev.Category = events.CategoryFor(ev.Type) diff --git a/server/cmd/api/api/events_publish_test.go b/server/cmd/api/api/events_publish_test.go index be3dace0..29704a7c 100644 --- a/server/cmd/api/api/events_publish_test.go +++ b/server/cmd/api/api/events_publish_test.go @@ -43,7 +43,7 @@ func TestPublishEvent(t *testing.T) { svc, cs := newPublishTestService(t, logDir) b, _ := json.Marshal(events.Event{ - Type: "liveview.click", + Type: "liveview_click", Category: events.CategoryLiveview, Source: events.Source{Kind: events.KindKernelAPI}, Data: json.RawMessage(`{"x":100}`), @@ -61,7 +61,7 @@ func TestPublishEvent(t *testing.T) { res, err := reader.Read(ctx) require.NoError(t, err) require.NotNil(t, res.Envelope) - assert.Equal(t, "liveview.click", res.Envelope.Event.Type) + assert.Equal(t, "liveview_click", res.Envelope.Event.Type) assert.Equal(t, events.CategoryLiveview, res.Envelope.Event.Category) }) @@ -77,12 +77,28 @@ func TestPublishEvent(t *testing.T) { assert.Equal(t, http.StatusBadRequest, w.Code) }) + t.Run("empty_type_rejected", func(t *testing.T) { + logDir := t.TempDir() + svc, _ := newPublishTestService(t, logDir) + + b, _ := json.Marshal(events.Event{ + Category: events.CategoryConsole, + Data: json.RawMessage(`{"x":1}`), + }) + req := httptest.NewRequest(http.MethodPost, "/events/publish", bytes.NewReader(b)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + svc.PublishEvent(w, req) + + assert.Equal(t, http.StatusBadRequest, w.Code) + }) + t.Run("liveview_routes_correctly", func(t *testing.T) { logDir := t.TempDir() svc, _ := newPublishTestService(t, logDir) b, _ := json.Marshal(events.Event{ - Type: "liveview.click", + Type: "liveview_click", Category: events.CategoryLiveview, Source: events.Source{Kind: events.KindKernelAPI}, Data: json.RawMessage(`{"x":100}`), @@ -111,7 +127,7 @@ func TestPublishEvent(t *testing.T) { svc, _ := newPublishTestService(t, logDir) b, _ := json.Marshal(events.Event{ - Type: "captcha.solve", + Type: "captcha_solve", Category: events.CategoryCaptcha, Source: events.Source{Kind: events.KindKernelAPI}, Data: json.RawMessage(`{"token":"abc"}`), diff --git a/server/cmd/api/api/events_stream_test.go b/server/cmd/api/api/events_stream_test.go index 94070614..6ed4a8ae 100644 --- a/server/cmd/api/api/events_stream_test.go +++ b/server/cmd/api/api/events_stream_test.go @@ -19,12 +19,12 @@ func TestStreamEvents(t *testing.T) { // Publish 2 events before streaming cs.Publish(events.Event{ - Type: "console.log", + Type: "console_log", Category: events.CategoryConsole, Source: events.Source{Kind: events.KindCDP}, }) cs.Publish(events.Event{ - Type: "console.log", + Type: "console_log", Category: events.CategoryConsole, Source: events.Source{Kind: events.KindCDP}, }) @@ -50,9 +50,9 @@ func TestStreamEvents(t *testing.T) { svc, cs := newPublishTestService(t, logDir) // Publish 3 events - cs.Publish(events.Event{Type: "console.log", Category: events.CategoryConsole, Source: events.Source{Kind: events.KindCDP}}) - cs.Publish(events.Event{Type: "console.log", Category: events.CategoryConsole, Source: events.Source{Kind: events.KindCDP}}) - cs.Publish(events.Event{Type: "console.log", Category: events.CategoryConsole, Source: events.Source{Kind: events.KindCDP}}) + cs.Publish(events.Event{Type: "console_log", Category: events.CategoryConsole, Source: events.Source{Kind: events.KindCDP}}) + cs.Publish(events.Event{Type: "console_log", Category: events.CategoryConsole, Source: events.Source{Kind: events.KindCDP}}) + cs.Publish(events.Event{Type: "console_log", Category: events.CategoryConsole, Source: events.Source{Kind: events.KindCDP}}) ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index 75a67039..50b5636b 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -24,8 +24,8 @@ import ( "github.com/onkernel/kernel-images/server/cmd/config" "github.com/onkernel/kernel-images/server/lib/chromedriverproxy" "github.com/onkernel/kernel-images/server/lib/devtoolsproxy" - "github.com/onkernel/kernel-images/server/lib/logger" "github.com/onkernel/kernel-images/server/lib/events" + "github.com/onkernel/kernel-images/server/lib/logger" "github.com/onkernel/kernel-images/server/lib/nekoclient" oapi "github.com/onkernel/kernel-images/server/lib/oapi" "github.com/onkernel/kernel-images/server/lib/recorder" diff --git a/server/lib/events/pipeline.go b/server/lib/events/pipeline.go deleted file mode 100644 index e69c254f..00000000 --- a/server/lib/events/pipeline.go +++ /dev/null @@ -1,70 +0,0 @@ -package events - -import ( - "log/slog" - "sync" - "sync/atomic" - "time" -) - -// Pipeline is a single-use write path that wraps events in envelopes and fans -// them out to a FileWriter (durable) and RingBuffer (in-memory). Call Start -// once with a capture session ID, then Publish concurrently. Close flushes the -// FileWriter; there is no restart or terminal event. -type Pipeline struct { - mu sync.Mutex - ring *RingBuffer - files *FileWriter - seq atomic.Uint64 - captureSessionID atomic.Pointer[string] -} - -func NewPipeline(ring *RingBuffer, files *FileWriter) *Pipeline { - p := &Pipeline{ring: ring, files: files} - empty := "" - p.captureSessionID.Store(&empty) - return p -} - -// Start sets the capture session ID stamped on every subsequent envelope. -func (p *Pipeline) Start(captureSessionID string) { - p.captureSessionID.Store(&captureSessionID) -} - -// Publish wraps ev in an Envelope, truncates if needed, then writes to -// FileWriter (durable) before RingBuffer (in-memory fan-out). -func (p *Pipeline) Publish(ev Event) { - p.mu.Lock() - defer p.mu.Unlock() - - if ev.Ts == 0 { - ev.Ts = time.Now().UnixMilli() - } - if ev.DetailLevel == "" { - ev.DetailLevel = DetailStandard - } - - env := Envelope{ - CaptureSessionID: *p.captureSessionID.Load(), - Seq: p.seq.Add(1), - Event: ev, - } - env, data := truncateIfNeeded(env) - - if data == nil { - slog.Error("pipeline: marshal failed, skipping file write", "seq", env.Seq, "category", env.Event.Category) - } else if err := p.files.Write(env, data); err != nil { - slog.Error("pipeline: file write failed", "seq", env.Seq, "category", env.Event.Category, "err", err) - } - p.ring.Publish(env) -} - -// NewReader returns a Reader positioned at the start of the ring buffer. -func (p *Pipeline) NewReader(afterSeq uint64) *Reader { - return p.ring.NewReader(afterSeq) -} - -// Close flushes and releases all open file descriptors. -func (p *Pipeline) Close() error { - return p.files.Close() -} From c2435ed26b76de956a9092cc055d589b441d7192 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 6 Apr 2026 15:30:36 +0000 Subject: [PATCH 13/14] review: update tests --- server/cmd/api/api/events.go | 26 +---- server/cmd/api/api/events_publish_test.go | 128 ++++++++-------------- server/cmd/api/api/events_stream_test.go | 93 +++++++--------- 3 files changed, 92 insertions(+), 155 deletions(-) diff --git a/server/cmd/api/api/events.go b/server/cmd/api/api/events.go index 6a17a3ac..9c2435f8 100644 --- a/server/cmd/api/api/events.go +++ b/server/cmd/api/api/events.go @@ -13,10 +13,7 @@ import ( "github.com/onkernel/kernel-images/server/lib/logger" ) -// StartCapture handles POST /events/start. -// Registered as a direct chi route (not via OpenAPI spec) because these are -// simple internal control endpoints with no request body. -// A second call while already running restarts capture (stop+start). +// StartCapture handles POST /events/start. Restarts if already running. func (s *ApiService) StartCapture(w http.ResponseWriter, r *http.Request) { s.monitorMu.Lock() defer s.monitorMu.Unlock() @@ -32,7 +29,7 @@ func (s *ApiService) StartCapture(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } -// StopCapture handles POST /events/stop. Idempotent if not running. +// StopCapture handles POST /events/stop. No-op if not running. func (s *ApiService) StopCapture(w http.ResponseWriter, r *http.Request) { s.monitorMu.Lock() defer s.monitorMu.Unlock() @@ -41,8 +38,7 @@ func (s *ApiService) StopCapture(w http.ResponseWriter, r *http.Request) { } // PublishEvent handles POST /events/publish. -// Accepts an Event JSON body and ingests it into the pipeline (ring buffer + log file). -// Derives Category from Type if omitted; stamps KindKernelAPI if Source.Kind is omitted. +// Defaults Category (via CategoryFor) and Source.Kind (to KindKernelAPI) when omitted. func (s *ApiService) PublishEvent(w http.ResponseWriter, r *http.Request) { var ev events.Event if err := json.NewDecoder(r.Body).Decode(&ev); err != nil { @@ -55,12 +51,10 @@ func (s *ApiService) PublishEvent(w http.ResponseWriter, r *http.Request) { return } - // Derive category if caller omitted it — FileWriter returns error for empty category. if ev.Category == "" { ev.Category = events.CategoryFor(ev.Type) } - // Stamp provenance if caller omitted source kind. if ev.Source.Kind == "" { ev.Source.Kind = events.KindKernelAPI } @@ -69,10 +63,8 @@ func (s *ApiService) PublishEvent(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } -// StreamEvents handles GET /events/stream. -// Delivers a live stream of Envelopes over Server-Sent Events. -// Each frame is formatted as "id: {seq}\ndata: {json}\n\n". -// Clients may reconnect with Last-Event-ID to resume from the next unseen event. +// StreamEvents handles GET /events/stream (SSE). +// Supports Last-Event-ID for reconnection. func (s *ApiService) StreamEvents(w http.ResponseWriter, r *http.Request) { flusher, ok := w.(http.Flusher) if !ok { @@ -80,7 +72,6 @@ func (s *ApiService) StreamEvents(w http.ResponseWriter, r *http.Request) { return } - // Parse Last-Event-ID for reconnection (ignore parse errors; default to 0). var lastSeq uint64 if v := r.Header.Get("Last-Event-ID"); v != "" { if n, err := strconv.ParseUint(v, 10, 64); err == nil { @@ -94,19 +85,15 @@ func (s *ApiService) StreamEvents(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) flusher.Flush() - // NewReader(lastSeq) positions the reader to deliver events after lastSeq. reader := s.captureSession.NewReader(lastSeq) ctx := r.Context() - // Main event loop. for { res, err := reader.Read(ctx) if err != nil { - // Context cancelled (client disconnected). return } if res.Envelope == nil { - // Drop notification — skip, client will see the gap via seq discontinuity. continue } if err := writeSSEEnvelope(w, *res.Envelope); err != nil { @@ -116,8 +103,7 @@ func (s *ApiService) StreamEvents(w http.ResponseWriter, r *http.Request) { } } -// writeSSEEnvelope marshals env to JSON and writes a single SSE frame to w. -// Frame format: "id: {seq}\ndata: {json}\n\n" +// writeSSEEnvelope writes a single SSE frame: "id: {seq}\ndata: {json}\n\n". func writeSSEEnvelope(w io.Writer, env events.Envelope) error { data, err := json.Marshal(env) if err != nil { diff --git a/server/cmd/api/api/events_publish_test.go b/server/cmd/api/api/events_publish_test.go index 29704a7c..4e1e4099 100644 --- a/server/cmd/api/api/events_publish_test.go +++ b/server/cmd/api/api/events_publish_test.go @@ -23,7 +23,7 @@ func newPublishTestService(t *testing.T, logDir string) (*ApiService, *events.Ca ring := events.NewRingBuffer(16) fw := events.NewFileWriter(logDir) cs := events.NewCaptureSession(ring, fw) - cs.Start("test-session-123") + cs.Start("test-capture") svc, err := New( recorder.NewFFmpegManager(), newMockFactory(), @@ -37,37 +37,55 @@ func newPublishTestService(t *testing.T, logDir string) (*ApiService, *events.Ca return svc, cs } +func publishEvent(t *testing.T, svc *ApiService, ev events.Event) *httptest.ResponseRecorder { + t.Helper() + b, err := json.Marshal(ev) + require.NoError(t, err) + req := httptest.NewRequest(http.MethodPost, "/events/publish", bytes.NewReader(b)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + svc.PublishEvent(w, req) + return w +} + +func readEnvelope(t *testing.T, cs *events.CaptureSession) events.Envelope { + t.Helper() + reader := cs.NewReader(0) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + res, err := reader.Read(ctx) + require.NoError(t, err) + require.NotNil(t, res.Envelope) + return *res.Envelope +} + +func assertLogFileExists(t *testing.T, logDir, filename string) { + t.Helper() + info, err := os.Stat(filepath.Join(logDir, filename)) + require.NoError(t, err, "%s should exist", filename) + assert.Greater(t, info.Size(), int64(0), "%s should be non-empty", filename) +} + func TestPublishEvent(t *testing.T) { - t.Run("happy_path", func(t *testing.T) { + t.Run("valid_event_published_to_ring", func(t *testing.T) { logDir := t.TempDir() svc, cs := newPublishTestService(t, logDir) - b, _ := json.Marshal(events.Event{ + w := publishEvent(t, svc, events.Event{ Type: "liveview_click", Category: events.CategoryLiveview, Source: events.Source{Kind: events.KindKernelAPI}, Data: json.RawMessage(`{"x":100}`), }) - req := httptest.NewRequest(http.MethodPost, "/events/publish", bytes.NewReader(b)) - req.Header.Set("Content-Type", "application/json") - w := httptest.NewRecorder() - svc.PublishEvent(w, req) - assert.Equal(t, http.StatusOK, w.Code) - reader := cs.NewReader(0) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - res, err := reader.Read(ctx) - require.NoError(t, err) - require.NotNil(t, res.Envelope) - assert.Equal(t, "liveview_click", res.Envelope.Event.Type) - assert.Equal(t, events.CategoryLiveview, res.Envelope.Event.Category) + env := readEnvelope(t, cs) + assert.Equal(t, "liveview_click", env.Event.Type) + assert.Equal(t, events.CategoryLiveview, env.Event.Category) }) t.Run("invalid_json", func(t *testing.T) { - logDir := t.TempDir() - svc, _ := newPublishTestService(t, logDir) + svc, _ := newPublishTestService(t, t.TempDir()) req := httptest.NewRequest(http.MethodPost, "/events/publish", bytes.NewReader([]byte(`not-json`))) req.Header.Set("Content-Type", "application/json") @@ -78,105 +96,55 @@ func TestPublishEvent(t *testing.T) { }) t.Run("empty_type_rejected", func(t *testing.T) { - logDir := t.TempDir() - svc, _ := newPublishTestService(t, logDir) + svc, _ := newPublishTestService(t, t.TempDir()) - b, _ := json.Marshal(events.Event{ + w := publishEvent(t, svc, events.Event{ Category: events.CategoryConsole, Data: json.RawMessage(`{"x":1}`), }) - req := httptest.NewRequest(http.MethodPost, "/events/publish", bytes.NewReader(b)) - req.Header.Set("Content-Type", "application/json") - w := httptest.NewRecorder() - svc.PublishEvent(w, req) - assert.Equal(t, http.StatusBadRequest, w.Code) }) - t.Run("liveview_routes_correctly", func(t *testing.T) { + t.Run("liveview_routes_to_log", func(t *testing.T) { logDir := t.TempDir() svc, _ := newPublishTestService(t, logDir) - b, _ := json.Marshal(events.Event{ + w := publishEvent(t, svc, events.Event{ Type: "liveview_click", Category: events.CategoryLiveview, Source: events.Source{Kind: events.KindKernelAPI}, Data: json.RawMessage(`{"x":100}`), }) - req := httptest.NewRequest(http.MethodPost, "/events/publish", bytes.NewReader(b)) - req.Header.Set("Content-Type", "application/json") - w := httptest.NewRecorder() - svc.PublishEvent(w, req) - require.Equal(t, http.StatusOK, w.Code) - entries, err := os.ReadDir(logDir) - require.NoError(t, err) - found := false - for _, e := range entries { - if e.Name() == "liveview.log" { - info, _ := e.Info() - assert.Greater(t, info.Size(), int64(0)) - found = true - } - } - assert.True(t, found, "liveview.log should exist in logDir") + assertLogFileExists(t, logDir, "liveview.log") }) - t.Run("captcha_routes_correctly", func(t *testing.T) { + t.Run("captcha_routes_to_log", func(t *testing.T) { logDir := t.TempDir() svc, _ := newPublishTestService(t, logDir) - b, _ := json.Marshal(events.Event{ + w := publishEvent(t, svc, events.Event{ Type: "captcha_solve", Category: events.CategoryCaptcha, Source: events.Source{Kind: events.KindKernelAPI}, Data: json.RawMessage(`{"token":"abc"}`), }) - req := httptest.NewRequest(http.MethodPost, "/events/publish", bytes.NewReader(b)) - req.Header.Set("Content-Type", "application/json") - w := httptest.NewRecorder() - svc.PublishEvent(w, req) - require.Equal(t, http.StatusOK, w.Code) - entries, err := os.ReadDir(logDir) - require.NoError(t, err) - found := false - for _, e := range entries { - if e.Name() == "captcha.log" { - info, _ := e.Info() - assert.Greater(t, info.Size(), int64(0)) - found = true - } - } - assert.True(t, found, "captcha.log should exist in logDir") + assertLogFileExists(t, logDir, "captcha.log") }) t.Run("category_derived_from_type", func(t *testing.T) { logDir := t.TempDir() svc, cs := newPublishTestService(t, logDir) - // No Category field set — should be derived from Type prefix (underscore separator) - b, _ := json.Marshal(events.Event{ + w := publishEvent(t, svc, events.Event{ Type: "liveview_click", Data: json.RawMessage(`{"x":50}`), }) - req := httptest.NewRequest(http.MethodPost, "/events/publish", bytes.NewReader(b)) - req.Header.Set("Content-Type", "application/json") - w := httptest.NewRecorder() - svc.PublishEvent(w, req) - require.Equal(t, http.StatusOK, w.Code) - reader := cs.NewReader(0) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - res, err := reader.Read(ctx) - require.NoError(t, err) - require.NotNil(t, res.Envelope) - assert.Equal(t, events.CategoryLiveview, res.Envelope.Event.Category) - - // liveview.log should also exist - _, statErr := os.Stat(filepath.Join(logDir, "liveview.log")) - assert.NoError(t, statErr, "liveview.log should exist after category derivation") + env := readEnvelope(t, cs) + assert.Equal(t, events.CategoryLiveview, env.Event.Category) + assertLogFileExists(t, logDir, "liveview.log") }) } diff --git a/server/cmd/api/api/events_stream_test.go b/server/cmd/api/api/events_stream_test.go index 6ed4a8ae..fe4e893e 100644 --- a/server/cmd/api/api/events_stream_test.go +++ b/server/cmd/api/api/events_stream_test.go @@ -12,73 +12,62 @@ import ( "github.com/stretchr/testify/assert" ) +var testEvent = events.Event{ + Type: "console_log", + Category: events.CategoryConsole, + Source: events.Source{Kind: events.KindCDP}, +} + +func streamRequest(ctx context.Context) (*httptest.ResponseRecorder, *http.Request) { + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/events/stream", nil).WithContext(ctx) + return w, req +} + func TestStreamEvents(t *testing.T) { - t.Run("delivers_events", func(t *testing.T) { - logDir := t.TempDir() - svc, cs := newPublishTestService(t, logDir) - - // Publish 2 events before streaming - cs.Publish(events.Event{ - Type: "console_log", - Category: events.CategoryConsole, - Source: events.Source{Kind: events.KindCDP}, - }) - cs.Publish(events.Event{ - Type: "console_log", - Category: events.CategoryConsole, - Source: events.Source{Kind: events.KindCDP}, - }) - - // Create a request context that cancels after a short window + t.Run("delivers_buffered_events", func(t *testing.T) { + svc, cs := newPublishTestService(t, t.TempDir()) + cs.Publish(testEvent) + cs.Publish(testEvent) + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() - - req := httptest.NewRequest(http.MethodGet, "/events/stream", nil).WithContext(ctx) - w := httptest.NewRecorder() + w, req := streamRequest(ctx) svc.StreamEvents(w, req) assert.Equal(t, http.StatusOK, w.Code) assert.Equal(t, "text/event-stream", w.Header().Get("Content-Type")) body := w.Body.String() - assert.Contains(t, body, "id: 1", "should contain event with seq 1") - assert.Contains(t, body, "id: 2", "should contain event with seq 2") + assert.Contains(t, body, "id: 1") + assert.Contains(t, body, "id: 2") }) - t.Run("last_event_id_reconnect", func(t *testing.T) { - logDir := t.TempDir() - svc, cs := newPublishTestService(t, logDir) - - // Publish 3 events - cs.Publish(events.Event{Type: "console_log", Category: events.CategoryConsole, Source: events.Source{Kind: events.KindCDP}}) - cs.Publish(events.Event{Type: "console_log", Category: events.CategoryConsole, Source: events.Source{Kind: events.KindCDP}}) - cs.Publish(events.Event{Type: "console_log", Category: events.CategoryConsole, Source: events.Source{Kind: events.KindCDP}}) + t.Run("resumes_after_last_event_id", func(t *testing.T) { + svc, cs := newPublishTestService(t, t.TempDir()) + cs.Publish(testEvent) + cs.Publish(testEvent) + cs.Publish(testEvent) ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() - - req := httptest.NewRequest(http.MethodGet, "/events/stream", nil).WithContext(ctx) + w, req := streamRequest(ctx) req.Header.Set("Last-Event-ID", "2") - w := httptest.NewRecorder() svc.StreamEvents(w, req) body := w.Body.String() - assert.Contains(t, body, "id: 3", "should contain event with seq 3") - assert.NotContains(t, body, "id: 1", "should not re-send seq 1") - assert.NotContains(t, body, "id: 2", "should not re-send seq 2") + assert.Contains(t, body, "id: 3") + assert.NotContains(t, body, "id: 1") + assert.NotContains(t, body, "id: 2") }) - t.Run("clean_disconnect", func(t *testing.T) { - logDir := t.TempDir() - svc, _ := newPublishTestService(t, logDir) + t.Run("exits_on_cancelled_context", func(t *testing.T) { + svc, _ := newPublishTestService(t, t.TempDir()) - // Context already cancelled before calling handler ctx, cancel := context.WithCancel(context.Background()) cancel() - - req := httptest.NewRequest(http.MethodGet, "/events/stream", nil).WithContext(ctx) - w := httptest.NewRecorder() + w, req := streamRequest(ctx) done := make(chan struct{}) go func() { @@ -88,18 +77,15 @@ func TestStreamEvents(t *testing.T) { select { case <-done: - // Good: handler returned promptly case <-time.After(100 * time.Millisecond): - t.Error("StreamEvents did not return promptly after context cancellation") + t.Error("StreamEvents did not return after context cancellation") } }) - t.Run("no_flusher_returns_500", func(t *testing.T) { - logDir := t.TempDir() - svc, _ := newPublishTestService(t, logDir) + t.Run("rejects_non_flusher", func(t *testing.T) { + svc, _ := newPublishTestService(t, t.TempDir()) req := httptest.NewRequest(http.MethodGet, "/events/stream", nil) - // Use a non-flusher ResponseWriter w := &nonFlusherWriter{header: make(http.Header)} svc.StreamEvents(w, req) @@ -108,15 +94,12 @@ func TestStreamEvents(t *testing.T) { }) } -// nonFlusherWriter is a ResponseWriter that does NOT implement http.Flusher. type nonFlusherWriter struct { header http.Header code int body strings.Builder } -func (w *nonFlusherWriter) Header() http.Header { return w.header } -func (w *nonFlusherWriter) WriteHeader(code int) { w.code = code } -func (w *nonFlusherWriter) Write(b []byte) (int, error) { - return w.body.Write(b) -} +func (w *nonFlusherWriter) Header() http.Header { return w.header } +func (w *nonFlusherWriter) WriteHeader(code int) { w.code = code } +func (w *nonFlusherWriter) Write(b []byte) (int, error) { return w.body.Write(b) } From 27856a3cfcc6fe52c429bd407854a6c34349ce3c Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 6 Apr 2026 16:13:54 +0000 Subject: [PATCH 14/14] review: add mutex lock/unlock --- server/cmd/api/api/api.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/cmd/api/api/api.go b/server/cmd/api/api/api.go index 68cdd201..57689c80 100644 --- a/server/cmd/api/api/api.go +++ b/server/cmd/api/api/api.go @@ -332,6 +332,7 @@ func (s *ApiService) ListRecorders(ctx context.Context, _ oapi.ListRecordersRequ } func (s *ApiService) Shutdown(ctx context.Context) error { + s.monitorMu.Lock() if s.cdpMonitor != nil { s.cdpMonitor.Stop() } @@ -340,5 +341,6 @@ func (s *ApiService) Shutdown(ctx context.Context) error { logger.FromContext(ctx).Error("failed to close capture session", "err", err) } } + s.monitorMu.Unlock() return s.recordManager.StopAll(ctx) }