diff --git a/server/go.mod b/server/go.mod index 63e81660d..ea0042363 100644 --- a/server/go.mod +++ b/server/go.mod @@ -21,6 +21,7 @@ require ( github.com/rs/zerolog v1.34.0 github.com/spf13/cobra v1.9.1 github.com/spf13/viper v1.20.1 + github.com/stretchr/testify v1.10.0 ) require ( @@ -57,7 +58,6 @@ require ( github.com/spf13/afero v1.14.0 // indirect github.com/spf13/cast v1.9.2 // indirect github.com/spf13/pflag v1.0.7 // indirect - github.com/stretchr/testify v1.10.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/wlynxg/anet v0.0.5 // indirect golang.org/x/crypto v0.40.0 // indirect diff --git a/server/internal/plugins/manager.go b/server/internal/plugins/manager.go index df51b6f8a..431272d85 100644 --- a/server/internal/plugins/manager.go +++ b/server/internal/plugins/manager.go @@ -14,6 +14,7 @@ import ( "github.com/m1k1o/neko/server/internal/plugins/chat" "github.com/m1k1o/neko/server/internal/plugins/filetransfer" "github.com/m1k1o/neko/server/internal/plugins/scaletozero" + "github.com/m1k1o/neko/server/internal/plugins/telemetry" "github.com/m1k1o/neko/server/pkg/types" ) @@ -49,6 +50,7 @@ func New(config *config.Plugins) *ManagerCtx { manager.plugins.addPlugin(filetransfer.NewPlugin()) manager.plugins.addPlugin(chat.NewPlugin()) manager.plugins.addPlugin(scaletozero.NewPlugin()) + manager.plugins.addPlugin(telemetry.NewPlugin()) return manager } diff --git a/server/internal/plugins/telemetry/config.go b/server/internal/plugins/telemetry/config.go new file mode 100644 index 000000000..553ac8b2c --- /dev/null +++ b/server/internal/plugins/telemetry/config.go @@ -0,0 +1,30 @@ +package telemetry + +import ( + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +type Config struct { + Enabled bool + Endpoint string +} + +func (Config) Init(cmd *cobra.Command) error { + cmd.PersistentFlags().Bool("telemetry.enabled", false, "forward live-view session connect/disconnect events to kernel-images-api") + if err := viper.BindPFlag("telemetry.enabled", cmd.PersistentFlags().Lookup("telemetry.enabled")); err != nil { + return err + } + + cmd.PersistentFlags().String("telemetry.endpoint", "http://127.0.0.1:10001/telemetry/events", "kernel-images-api telemetry publish endpoint") + if err := viper.BindPFlag("telemetry.endpoint", cmd.PersistentFlags().Lookup("telemetry.endpoint")); err != nil { + return err + } + + return nil +} + +func (c *Config) Set() { + c.Enabled = viper.GetBool("telemetry.enabled") + c.Endpoint = viper.GetString("telemetry.endpoint") +} diff --git a/server/internal/plugins/telemetry/manager.go b/server/internal/plugins/telemetry/manager.go new file mode 100644 index 000000000..9fa0448f9 --- /dev/null +++ b/server/internal/plugins/telemetry/manager.go @@ -0,0 +1,186 @@ +package telemetry + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "sync" + "time" + + "github.com/m1k1o/neko/server/pkg/types" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +const ( + queueDepth = 256 + defaultHTTPTimeout = 5 * time.Second +) + +type Manager struct { + logger zerolog.Logger + config *Config + sessions types.SessionManager + httpClient *http.Client + + mu sync.Mutex + connectedAt map[string]time.Time + + eventsCh chan eventPayload + stopCh chan struct{} + wg sync.WaitGroup +} + +func NewManager(sessions types.SessionManager, config *Config) *Manager { + return &Manager{ + logger: log.With().Str("module", "telemetry").Logger(), + config: config, + sessions: sessions, + httpClient: &http.Client{Timeout: defaultHTTPTimeout}, + connectedAt: make(map[string]time.Time), + eventsCh: make(chan eventPayload, queueDepth), + stopCh: make(chan struct{}), + } +} + +func (m *Manager) Start() error { + if !m.config.Enabled { + return nil + } + m.logger.Info().Str("endpoint", m.config.Endpoint).Msg("plugin enabled") + + m.wg.Add(1) + go m.worker() + + m.sessions.OnConnected(func(session types.Session) { + m.handleConnect(session.ID()) + }) + m.sessions.OnDisconnected(func(session types.Session) { + m.handleDisconnect(session.ID()) + }) + + return nil +} + +func (m *Manager) Shutdown() error { + if !m.config.Enabled { + return nil + } + close(m.stopCh) + m.wg.Wait() + return nil +} + +func (m *Manager) handleConnect(id string) { + m.mu.Lock() + m.connectedAt[id] = time.Now() + m.mu.Unlock() + + m.enqueue(eventPayload{ + Type: "live_view_connect", + SourceEvent: "neko.session.connected", + Data: map[string]any{"session_id": id}, + }) +} + +func (m *Manager) handleDisconnect(id string) { + m.mu.Lock() + start, ok := m.connectedAt[id] + delete(m.connectedAt, id) + m.mu.Unlock() + + var durationMs float64 + if ok { + durationMs = float64(time.Since(start).Microseconds()) / 1000.0 + } + + m.enqueue(eventPayload{ + Type: "live_view_disconnect", + SourceEvent: "neko.session.disconnected", + Data: map[string]any{"session_id": id, "duration_ms": durationMs}, + }) +} + +func (m *Manager) enqueue(ev eventPayload) { + select { + case m.eventsCh <- ev: + default: + // Drop rather than block neko's session goroutines. A backed-up + // kernel-images-api means we'd lose lifecycle pairs anyway. + m.logger.Warn().Str("type", ev.Type).Msg("telemetry queue full; dropping event") + } +} + +func (m *Manager) worker() { + defer m.wg.Done() + for { + select { + case <-m.stopCh: + // Best-effort drain on shutdown so we don't lose paired + // connect/disconnects when neko exits cleanly. + for { + select { + case ev := <-m.eventsCh: + m.publish(ev) + default: + return + } + } + case ev := <-m.eventsCh: + m.publish(ev) + } + } +} + +func (m *Manager) publish(ev eventPayload) { + body := publishBody{ + Type: ev.Type, + Category: "system", + Source: publishSource{ + Kind: "local_process", + Event: ev.SourceEvent, + }, + Data: ev.Data, + } + raw, err := json.Marshal(body) + if err != nil { + m.logger.Warn().Err(err).Msg("marshal telemetry body failed") + return + } + ctx, cancel := context.WithTimeout(context.Background(), defaultHTTPTimeout) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodPost, m.config.Endpoint, bytes.NewReader(raw)) + if err != nil { + m.logger.Warn().Err(err).Msg("build telemetry request failed") + return + } + req.Header.Set("Content-Type", "application/json") + resp, err := m.httpClient.Do(req) + if err != nil { + m.logger.Debug().Err(err).Str("type", ev.Type).Msg("telemetry POST failed") + return + } + _ = resp.Body.Close() + if resp.StatusCode/100 != 2 { + m.logger.Debug().Int("status", resp.StatusCode).Str("type", ev.Type).Msg("telemetry POST non-2xx") + } +} + +type eventPayload struct { + Type string + SourceEvent string + Data map[string]any +} + +type publishSource struct { + Kind string `json:"kind"` + Event string `json:"event"` +} + +type publishBody struct { + Type string `json:"type"` + Category string `json:"category"` + Source publishSource `json:"source"` + Data map[string]any `json:"data"` +} diff --git a/server/internal/plugins/telemetry/manager_test.go b/server/internal/plugins/telemetry/manager_test.go new file mode 100644 index 000000000..5460d1134 --- /dev/null +++ b/server/internal/plugins/telemetry/manager_test.go @@ -0,0 +1,167 @@ +package telemetry + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/m1k1o/neko/server/internal/config" + intsession "github.com/m1k1o/neko/server/internal/session" + "github.com/m1k1o/neko/server/pkg/types" +) + +func TestPlugin_DisabledIsNoop(t *testing.T) { + srv, captured := newCapturingServer(t) + defer srv.Close() + + sm := newSessionManager(t) + m := NewManager(sm, &Config{Enabled: false, Endpoint: srv.URL}) + require.NoError(t, m.Start()) + defer func() { require.NoError(t, m.Shutdown()) }() + + connect(t, sm, "1") + time.Sleep(50 * time.Millisecond) + + require.Empty(t, captured.snapshot(), "no events expected when disabled") +} + +func TestPlugin_EmitsConnectAndDisconnect(t *testing.T) { + srv, captured := newCapturingServer(t) + defer srv.Close() + + sm := newSessionManager(t) + m := NewManager(sm, &Config{Enabled: true, Endpoint: srv.URL}) + require.NoError(t, m.Start()) + defer func() { require.NoError(t, m.Shutdown()) }() + + s, p := connect(t, sm, "session-1") + // Hold a connection for a measurable interval so duration_ms is positive. + time.Sleep(20 * time.Millisecond) + s.DisconnectWebSocketPeer(p, false) + + require.Eventually(t, func() bool { + return len(captured.snapshot()) >= 2 + }, 2*time.Second, 10*time.Millisecond, "expected 2 events") + + events := captured.snapshot() + require.GreaterOrEqual(t, len(events), 2) + + require.Equal(t, "live_view_connect", events[0].Type) + require.Equal(t, "system", events[0].Category) + require.Equal(t, "local_process", events[0].Source.Kind) + require.Equal(t, "neko.session.connected", events[0].Source.Event) + require.Equal(t, "session-1", events[0].Data["session_id"]) + require.NotContains(t, events[0].Data, "duration_ms") + + require.Equal(t, "live_view_disconnect", events[1].Type) + require.Equal(t, "system", events[1].Category) + require.Equal(t, "local_process", events[1].Source.Kind) + require.Equal(t, "neko.session.disconnected", events[1].Source.Event) + require.Equal(t, "session-1", events[1].Data["session_id"]) + dur, ok := events[1].Data["duration_ms"].(float64) + require.True(t, ok, "duration_ms should be a number") + require.Greater(t, dur, 0.0) +} + +func TestPlugin_DropsOnEndpointFailureWithoutBlocking(t *testing.T) { + // Endpoint that always 500s; plugin should still drain without blocking + // the session goroutines. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + sm := newSessionManager(t) + m := NewManager(sm, &Config{Enabled: true, Endpoint: srv.URL}) + require.NoError(t, m.Start()) + defer func() { require.NoError(t, m.Shutdown()) }() + + done := make(chan struct{}) + go func() { + s, p := connect(t, sm, "session-error") + s.DisconnectWebSocketPeer(p, false) + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("session connect/disconnect blocked on telemetry plugin") + } +} + +// capturingServer collects POSTed JSON bodies for assertion. +type capturingServer struct { + mu sync.Mutex + captured []capturedEvent +} + +type capturedEvent struct { + Type string `json:"type"` + Category string `json:"category"` + Source publishSource `json:"source"` + Data map[string]any `json:"data"` +} + +func (c *capturingServer) snapshot() []capturedEvent { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]capturedEvent, len(c.captured)) + copy(out, c.captured) + return out +} + +func newCapturingServer(t *testing.T) (*httptest.Server, *capturingServer) { + t.Helper() + c := &capturingServer{} + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Errorf("read body: %v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + var ev capturedEvent + if err := json.Unmarshal(body, &ev); err != nil { + t.Errorf("unmarshal body: %v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + c.mu.Lock() + c.captured = append(c.captured, ev) + c.mu.Unlock() + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"seq":1,"event":{}}`)) + })) + return srv, c +} + +type mockWebsocketPeer struct{} + +func (mockWebsocketPeer) Send(event string, payload any) {} +func (mockWebsocketPeer) Ping() error { return nil } +func (mockWebsocketPeer) Destroy(reason string) {} + +func newSessionManager(t *testing.T) *intsession.SessionManagerCtx { + t.Helper() + return intsession.New(&config.Session{}) +} + +func connect(t *testing.T, sm types.SessionManager, id string) (types.Session, types.WebSocketPeer) { + t.Helper() + s, ok := sm.Get(id) + if !ok { + var err error + s, _, err = sm.Create(id, types.MemberProfile{CanLogin: true, CanConnect: true, CanWatch: true}) + require.NoError(t, err) + } + p := &mockWebsocketPeer{} + s.ConnectWebSocketPeer(p) + return s, p +} diff --git a/server/internal/plugins/telemetry/plugin.go b/server/internal/plugins/telemetry/plugin.go new file mode 100644 index 000000000..aa6d5e87d --- /dev/null +++ b/server/internal/plugins/telemetry/plugin.go @@ -0,0 +1,33 @@ +package telemetry + +import ( + "github.com/m1k1o/neko/server/pkg/types" +) + +type Plugin struct { + config *Config + manager *Manager +} + +func NewPlugin() *Plugin { + return &Plugin{ + config: &Config{}, + } +} + +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Config() types.PluginConfig { + return p.config +} + +func (p *Plugin) Start(m types.PluginManagers) error { + p.manager = NewManager(m.SessionManager, p.config) + return p.manager.Start() +} + +func (p *Plugin) Shutdown() error { + return p.manager.Shutdown() +} diff --git a/server/internal/plugins/telemetry/types.go b/server/internal/plugins/telemetry/types.go new file mode 100644 index 000000000..72082f0e2 --- /dev/null +++ b/server/internal/plugins/telemetry/types.go @@ -0,0 +1,3 @@ +package telemetry + +const PluginName = "telemetry"