Skip to content

Commit 76d837f

Browse files
committed
feat: add CDP pipeline, cdpmonitor, and wire into API service
1 parent 1c77850 commit 76d837f

File tree

10 files changed

+244
-31
lines changed

10 files changed

+244
-31
lines changed

server/cmd/api/api/api.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import (
99
"sync"
1010
"time"
1111

12+
"github.com/onkernel/kernel-images/server/lib/cdpmonitor"
1213
"github.com/onkernel/kernel-images/server/lib/devtoolsproxy"
14+
"github.com/onkernel/kernel-images/server/lib/events"
1315
"github.com/onkernel/kernel-images/server/lib/logger"
1416
"github.com/onkernel/kernel-images/server/lib/nekoclient"
1517
oapi "github.com/onkernel/kernel-images/server/lib/oapi"
@@ -68,11 +70,24 @@ type ApiService struct {
6870
// xvfbResizeMu serializes background Xvfb restarts to prevent races
6971
// when multiple CDP fast-path resizes fire in quick succession.
7072
xvfbResizeMu sync.Mutex
73+
74+
// CDP event pipeline and cdpMonitor.
75+
eventsPipeline *events.Pipeline
76+
cdpMonitor *cdpmonitor.Monitor
77+
monitorMu sync.Mutex
7178
}
7279

7380
var _ oapi.StrictServerInterface = (*ApiService)(nil)
7481

75-
func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFactory, upstreamMgr *devtoolsproxy.UpstreamManager, stz scaletozero.Controller, nekoAuthClient *nekoclient.AuthClient) (*ApiService, error) {
82+
func New(
83+
recordManager recorder.RecordManager,
84+
factory recorder.FFmpegRecorderFactory,
85+
upstreamMgr *devtoolsproxy.UpstreamManager,
86+
stz scaletozero.Controller,
87+
nekoAuthClient *nekoclient.AuthClient,
88+
eventsPipeline *events.Pipeline,
89+
displayNum int,
90+
) (*ApiService, error) {
7691
switch {
7792
case recordManager == nil:
7893
return nil, fmt.Errorf("recordManager cannot be nil")
@@ -82,8 +97,12 @@ func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFa
8297
return nil, fmt.Errorf("upstreamMgr cannot be nil")
8398
case nekoAuthClient == nil:
8499
return nil, fmt.Errorf("nekoAuthClient cannot be nil")
100+
case eventsPipeline == nil:
101+
return nil, fmt.Errorf("eventsPipeline cannot be nil")
85102
}
86103

104+
mon := cdpmonitor.New(upstreamMgr, eventsPipeline.Publish, displayNum)
105+
87106
return &ApiService{
88107
recordManager: recordManager,
89108
factory: factory,
@@ -94,6 +113,8 @@ func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFa
94113
stz: stz,
95114
nekoAuthClient: nekoAuthClient,
96115
policy: &policy.Policy{},
116+
eventsPipeline: eventsPipeline,
117+
cdpMonitor: mon,
97118
}, nil
98119
}
99120

@@ -313,5 +334,9 @@ func (s *ApiService) ListRecorders(ctx context.Context, _ oapi.ListRecordersRequ
313334
}
314335

315336
func (s *ApiService) Shutdown(ctx context.Context) error {
337+
s.monitorMu.Lock()
338+
s.cdpMonitor.Stop()
339+
_ = s.eventsPipeline.Close()
340+
s.monitorMu.Unlock()
316341
return s.recordManager.StopAll(ctx)
317342
}

server/cmd/api/api/api_test.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"log/slog"
1313

1414
"github.com/onkernel/kernel-images/server/lib/devtoolsproxy"
15+
"github.com/onkernel/kernel-images/server/lib/events"
1516
"github.com/onkernel/kernel-images/server/lib/nekoclient"
1617
oapi "github.com/onkernel/kernel-images/server/lib/oapi"
1718
"github.com/onkernel/kernel-images/server/lib/recorder"
@@ -25,7 +26,7 @@ func TestApiService_StartRecording(t *testing.T) {
2526

2627
t.Run("success", func(t *testing.T) {
2728
mgr := recorder.NewFFmpegManager()
28-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
29+
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
2930
require.NoError(t, err)
3031

3132
resp, err := svc.StartRecording(ctx, oapi.StartRecordingRequestObject{})
@@ -39,7 +40,7 @@ func TestApiService_StartRecording(t *testing.T) {
3940

4041
t.Run("already recording", func(t *testing.T) {
4142
mgr := recorder.NewFFmpegManager()
42-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
43+
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
4344
require.NoError(t, err)
4445

4546
// First start should succeed
@@ -54,7 +55,7 @@ func TestApiService_StartRecording(t *testing.T) {
5455

5556
t.Run("custom ids don't collide", func(t *testing.T) {
5657
mgr := recorder.NewFFmpegManager()
57-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
58+
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
5859
require.NoError(t, err)
5960

6061
for i := 0; i < 5; i++ {
@@ -87,7 +88,7 @@ func TestApiService_StopRecording(t *testing.T) {
8788

8889
t.Run("no active recording", func(t *testing.T) {
8990
mgr := recorder.NewFFmpegManager()
90-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
91+
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
9192
require.NoError(t, err)
9293

9394
resp, err := svc.StopRecording(ctx, oapi.StopRecordingRequestObject{})
@@ -100,7 +101,7 @@ func TestApiService_StopRecording(t *testing.T) {
100101
rec := &mockRecorder{id: "default", isRecordingFlag: true}
101102
require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder")
102103

103-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
104+
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
104105
require.NoError(t, err)
105106
resp, err := svc.StopRecording(ctx, oapi.StopRecordingRequestObject{})
106107
require.NoError(t, err)
@@ -115,7 +116,7 @@ func TestApiService_StopRecording(t *testing.T) {
115116

116117
force := true
117118
req := oapi.StopRecordingRequestObject{Body: &oapi.StopRecordingJSONRequestBody{ForceStop: &force}}
118-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
119+
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
119120
require.NoError(t, err)
120121
resp, err := svc.StopRecording(ctx, req)
121122
require.NoError(t, err)
@@ -129,7 +130,7 @@ func TestApiService_DownloadRecording(t *testing.T) {
129130

130131
t.Run("not found", func(t *testing.T) {
131132
mgr := recorder.NewFFmpegManager()
132-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
133+
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
133134
require.NoError(t, err)
134135
resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{})
135136
require.NoError(t, err)
@@ -149,7 +150,7 @@ func TestApiService_DownloadRecording(t *testing.T) {
149150
rec := &mockRecorder{id: "default", isRecordingFlag: true, recordingData: randomBytes(minRecordingSizeInBytes - 1)}
150151
require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder")
151152

152-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
153+
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
153154
require.NoError(t, err)
154155
// will return a 202 when the recording is too small
155156
resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{})
@@ -179,7 +180,7 @@ func TestApiService_DownloadRecording(t *testing.T) {
179180
rec := &mockRecorder{id: "default", recordingData: data}
180181
require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder")
181182

182-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
183+
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
183184
require.NoError(t, err)
184185
resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{})
185186
require.NoError(t, err)
@@ -199,7 +200,7 @@ func TestApiService_Shutdown(t *testing.T) {
199200
rec := &mockRecorder{id: "default", isRecordingFlag: true}
200201
require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder")
201202

202-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
203+
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
203204
require.NoError(t, err)
204205

205206
require.NoError(t, svc.Shutdown(ctx))
@@ -303,10 +304,16 @@ func newMockNekoClient(t *testing.T) *nekoclient.AuthClient {
303304
return client
304305
}
305306

307+
func newEventsPipeline() *events.Pipeline {
308+
ring := events.NewRingBuffer(64)
309+
fw := events.NewFileWriter(os.TempDir())
310+
return events.NewPipeline(ring, fw)
311+
}
312+
306313
func TestApiService_PatchChromiumFlags(t *testing.T) {
307314
ctx := context.Background()
308315
mgr := recorder.NewFFmpegManager()
309-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
316+
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
310317
require.NoError(t, err)
311318

312319
// Test with valid flags

server/cmd/api/api/display_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func testFFmpegFactory(t *testing.T, tempDir string) recorder.FFmpegRecorderFact
3434

3535
func newTestServiceWithFactory(t *testing.T, mgr recorder.RecordManager, factory recorder.FFmpegRecorderFactory) *ApiService {
3636
t.Helper()
37-
svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t))
37+
svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newEventsPipeline(), 0)
3838
require.NoError(t, err)
3939
return svc
4040
}

server/cmd/api/api/events.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package api
2+
3+
import (
4+
"net/http"
5+
6+
"github.com/google/uuid"
7+
"github.com/onkernel/kernel-images/server/lib/logger"
8+
)
9+
10+
// StartCapture handles POST /events/start.
11+
// Generates a new capture session ID, seeds the pipeline, then starts the
12+
// CDP monitor. If already running, the monitor is stopped and
13+
// restarted with a fresh session ID
14+
func (s *ApiService) StartCapture(w http.ResponseWriter, r *http.Request) {
15+
s.monitorMu.Lock()
16+
defer s.monitorMu.Unlock()
17+
18+
s.eventsPipeline.Start(uuid.New().String())
19+
20+
if err := s.cdpMonitor.Start(r.Context()); err != nil {
21+
logger.FromContext(r.Context()).Error("failed to start CDP monitor", "err", err)
22+
http.Error(w, "failed to start capture", http.StatusInternalServerError)
23+
return
24+
}
25+
w.WriteHeader(http.StatusOK)
26+
}
27+
28+
// StopCapture handles POST /events/stop
29+
func (s *ApiService) StopCapture(w http.ResponseWriter, r *http.Request) {
30+
s.monitorMu.Lock()
31+
defer s.monitorMu.Unlock()
32+
s.cdpMonitor.Stop()
33+
w.WriteHeader(http.StatusOK)
34+
}

server/cmd/api/main.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/onkernel/kernel-images/server/cmd/config"
2525
"github.com/onkernel/kernel-images/server/lib/chromedriverproxy"
2626
"github.com/onkernel/kernel-images/server/lib/devtoolsproxy"
27+
"github.com/onkernel/kernel-images/server/lib/events"
2728
"github.com/onkernel/kernel-images/server/lib/logger"
2829
"github.com/onkernel/kernel-images/server/lib/nekoclient"
2930
oapi "github.com/onkernel/kernel-images/server/lib/oapi"
@@ -90,12 +91,19 @@ func main() {
9091
os.Exit(1)
9192
}
9293

94+
// Construct events pipeline
95+
eventsRing := events.NewRingBuffer(1024)
96+
eventsFileWriter := events.NewFileWriter("/var/log")
97+
eventsPipeline := events.NewPipeline(eventsRing, eventsFileWriter)
98+
9399
apiService, err := api.New(
94100
recorder.NewFFmpegManager(),
95101
recorder.NewFFmpegRecorderFactory(config.PathToFFmpeg, defaultParams, stz),
96102
upstreamMgr,
97103
stz,
98104
nekoAuthClient,
105+
eventsPipeline,
106+
config.DisplayNum,
99107
)
100108
if err != nil {
101109
slogger.Error("failed to create api service", "err", err)
@@ -120,6 +128,10 @@ func main() {
120128
w.Header().Set("Content-Type", "application/json")
121129
w.Write(jsonData)
122130
})
131+
// capture events
132+
r.Post("/events/start", apiService.StartCapture)
133+
r.Post("/events/stop", apiService.StopCapture)
134+
123135
// PTY attach endpoint (WebSocket) - not part of OpenAPI spec
124136
// Uses WebSocket for bidirectional streaming, which works well through proxies.
125137
r.Get("/process/{process_id}/attach", func(w http.ResponseWriter, r *http.Request) {

server/lib/cdpmonitor/monitor.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package cdpmonitor
2+
3+
import (
4+
"context"
5+
"sync/atomic"
6+
7+
"github.com/onkernel/kernel-images/server/lib/events"
8+
)
9+
10+
// UpstreamProvider abstracts *devtoolsproxy.UpstreamManager for testability.
11+
type UpstreamProvider interface {
12+
Current() string
13+
Subscribe() (<-chan string, func())
14+
}
15+
16+
// PublishFunc publishes an Event to the pipeline.
17+
type PublishFunc func(ev events.Event)
18+
19+
// Monitor manages a CDP WebSocket connection with auto-attach session fan-out.
20+
// Single-use per capture session: call Start to begin, Stop to tear down.
21+
type Monitor struct {
22+
running atomic.Bool
23+
}
24+
25+
// New creates a Monitor. displayNum is the X display for ffmpeg screenshots.
26+
func New(_ UpstreamProvider, _ PublishFunc, _ int) *Monitor {
27+
return &Monitor{}
28+
}
29+
30+
// IsRunning reports whether the monitor is actively capturing.
31+
func (m *Monitor) IsRunning() bool {
32+
return m.running.Load()
33+
}
34+
35+
// Start begins CDP capture. Restarts if already running.
36+
func (m *Monitor) Start(_ context.Context) error {
37+
return nil
38+
}
39+
40+
// Stop tears down the monitor. Safe to call multiple times.
41+
func (m *Monitor) Stop() {}

server/lib/events/event.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package events
33
import (
44
"encoding/json"
55
"log/slog"
6+
"strings"
67
)
78

89
// maxS2RecordBytes is the maximum record size for the S2 event pipeline (1 MB).
@@ -50,7 +51,7 @@ const (
5051
// Event is the portable event schema. It contains only producer-emitted content;
5152
// pipeline metadata (seq, capture session) lives on the Envelope.
5253
type Event struct {
53-
Ts int64 `json:"ts"` // Unix microseconds (µs since epoch)
54+
Ts int64 `json:"ts"`
5455
Type string `json:"type"`
5556
Category EventCategory `json:"category"`
5657
Source Source `json:"source"`
@@ -67,6 +68,26 @@ type Envelope struct {
6768
Event Event `json:"event"`
6869
}
6970

71+
// CategoryFor derives an EventCategory from an event type string.
72+
// It splits on the first underscore and maps the prefix to a category.
73+
func CategoryFor(eventType string) EventCategory {
74+
prefix, _, _ := strings.Cut(eventType, "_")
75+
switch prefix {
76+
case "console":
77+
return CategoryConsole
78+
case "network":
79+
return CategoryNetwork
80+
case "page", "navigation", "dom", "target":
81+
return CategoryPage
82+
case "interaction", "layout", "scroll":
83+
return CategoryInteraction
84+
case "screenshot", "monitor":
85+
return CategorySystem
86+
default:
87+
return CategorySystem
88+
}
89+
}
90+
7091
// truncateIfNeeded marshals env and returns the (possibly truncated) envelope.
7192
// If the envelope still exceeds maxS2RecordBytes after nulling data (e.g. huge
7293
// url or source.metadata), it is returned as-is — callers must handle nil data.

0 commit comments

Comments
 (0)