From 7674a64c4d0f328831caba8c2800812c36b1cbce Mon Sep 17 00:00:00 2001 From: Sayan Samanta Date: Fri, 22 May 2026 15:03:05 -0700 Subject: [PATCH 1/4] vibe code --- server/internal/plugins/telemetry/config.go | 36 ++++ server/internal/plugins/telemetry/manager.go | 193 ++++++++++++++++++ .../plugins/telemetry/manager_test.go | 167 +++++++++++++++ server/internal/plugins/telemetry/plugin.go | 44 ++++ server/internal/plugins/telemetry/types.go | 3 + 5 files changed, 443 insertions(+) create mode 100644 server/internal/plugins/telemetry/config.go create mode 100644 server/internal/plugins/telemetry/manager.go create mode 100644 server/internal/plugins/telemetry/manager_test.go create mode 100644 server/internal/plugins/telemetry/plugin.go create mode 100644 server/internal/plugins/telemetry/types.go diff --git a/server/internal/plugins/telemetry/config.go b/server/internal/plugins/telemetry/config.go new file mode 100644 index 000000000..15b99d121 --- /dev/null +++ b/server/internal/plugins/telemetry/config.go @@ -0,0 +1,36 @@ +package telemetry + +import ( + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +// Config controls the telemetry-forwarding plugin which mirrors WebRTC +// session connect/disconnect events into the kernel-images-api telemetry +// stream as live_view_connect / live_view_disconnect events. +type Config struct { + Enabled bool + Endpoint string +} + +// Init registers CLI/viper flags. Defaults are wired for the in-VM headful +// image where neko and kernel-images-api share localhost. +func (Config) Init(cmd *cobra.Command) error { + cmd.PersistentFlags().Bool("telemetry.enabled", false, "forward live-view session connect/disconnect events to kernel-images-api") + if err := viper.BindPFlag("telemetry.enabled", cmd.PersistentFlags().Lookup("telemetry.enabled")); err != nil { + return err + } + + cmd.PersistentFlags().String("telemetry.endpoint", "http://127.0.0.1:10001/telemetry/events", "kernel-images-api telemetry publish endpoint") + if err := viper.BindPFlag("telemetry.endpoint", cmd.PersistentFlags().Lookup("telemetry.endpoint")); err != nil { + return err + } + + return nil +} + +// Set hydrates Config from viper after flag/file/env parsing. +func (c *Config) Set() { + c.Enabled = viper.GetBool("telemetry.enabled") + c.Endpoint = viper.GetString("telemetry.endpoint") +} diff --git a/server/internal/plugins/telemetry/manager.go b/server/internal/plugins/telemetry/manager.go new file mode 100644 index 000000000..e34410d28 --- /dev/null +++ b/server/internal/plugins/telemetry/manager.go @@ -0,0 +1,193 @@ +package telemetry + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "sync" + "time" + + "github.com/m1k1o/neko/server/pkg/types" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +// Tunables; small enough that the worker can't block neko's session +// goroutines, large enough that a brief api downtime doesn't lose events. +const ( + queueDepth = 256 + defaultHTTPTimeout = 5 * time.Second +) + +// Manager subscribes to session connect/disconnect events and forwards them +// to kernel-images-api as live_view_* telemetry events. All HTTP work runs +// on a single background worker so neko's emitter callbacks stay non-blocking. +type Manager struct { + logger zerolog.Logger + config *Config + sessions types.SessionManager + httpClient *http.Client + + mu sync.Mutex + connectedAt map[string]time.Time + + eventsCh chan eventPayload + stopCh chan struct{} + wg sync.WaitGroup +} + +// NewManager builds a manager with sensible defaults; tests can override +// httpClient via the exported field after construction. +func NewManager(sessions types.SessionManager, config *Config) *Manager { + return &Manager{ + logger: log.With().Str("module", "telemetry").Logger(), + config: config, + sessions: sessions, + httpClient: &http.Client{Timeout: defaultHTTPTimeout}, + connectedAt: make(map[string]time.Time), + eventsCh: make(chan eventPayload, queueDepth), + stopCh: make(chan struct{}), + } +} + +// Start subscribes to session events and launches the publish worker. No-op +// when the plugin is disabled. +func (m *Manager) Start() error { + if !m.config.Enabled { + return nil + } + m.logger.Info().Str("endpoint", m.config.Endpoint).Msg("plugin enabled") + + m.wg.Add(1) + go m.worker() + + m.sessions.OnConnected(func(session types.Session) { + m.handleConnect(session.ID()) + }) + m.sessions.OnDisconnected(func(session types.Session) { + m.handleDisconnect(session.ID()) + }) + + return nil +} + +// Shutdown signals the worker to drain and exit, blocking until it does. +func (m *Manager) Shutdown() error { + if !m.config.Enabled { + return nil + } + close(m.stopCh) + m.wg.Wait() + return nil +} + +func (m *Manager) handleConnect(id string) { + m.mu.Lock() + m.connectedAt[id] = time.Now() + m.mu.Unlock() + + m.enqueue(eventPayload{ + Type: "live_view_connect", + Data: map[string]any{"session_id": id}, + }) +} + +func (m *Manager) handleDisconnect(id string) { + m.mu.Lock() + start, ok := m.connectedAt[id] + delete(m.connectedAt, id) + m.mu.Unlock() + + var durationMs float64 + if ok { + durationMs = float64(time.Since(start).Microseconds()) / 1000.0 + } + + m.enqueue(eventPayload{ + Type: "live_view_disconnect", + Data: map[string]any{"session_id": id, "duration_ms": durationMs}, + }) +} + +func (m *Manager) enqueue(ev eventPayload) { + select { + case m.eventsCh <- ev: + default: + // Drop rather than block neko's session goroutines. A backed-up + // kernel-images-api means we'd lose lifecycle pairs anyway. + m.logger.Warn().Str("type", ev.Type).Msg("telemetry queue full; dropping event") + } +} + +func (m *Manager) worker() { + defer m.wg.Done() + for { + select { + case <-m.stopCh: + // Best-effort drain of remaining events on shutdown so we don't + // lose paired connect/disconnects when neko exits cleanly. + for { + select { + case ev := <-m.eventsCh: + m.publish(ev) + default: + return + } + } + case ev := <-m.eventsCh: + m.publish(ev) + } + } +} + +func (m *Manager) publish(ev eventPayload) { + body := publishBody{ + Type: ev.Type, + Category: "system", + Source: publishSource{ + Kind: "local_process", + Event: "neko." + ev.Type, + }, + Data: ev.Data, + } + raw, err := json.Marshal(body) + if err != nil { + m.logger.Warn().Err(err).Msg("marshal telemetry body failed") + return + } + ctx, cancel := context.WithTimeout(context.Background(), defaultHTTPTimeout) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodPost, m.config.Endpoint, bytes.NewReader(raw)) + if err != nil { + m.logger.Warn().Err(err).Msg("build telemetry request failed") + return + } + req.Header.Set("Content-Type", "application/json") + resp, err := m.httpClient.Do(req) + if err != nil { + m.logger.Debug().Err(err).Str("type", ev.Type).Msg("telemetry POST failed") + return + } + _ = resp.Body.Close() + if resp.StatusCode/100 != 2 { + m.logger.Debug().Int("status", resp.StatusCode).Str("type", ev.Type).Msg("telemetry POST non-2xx") + } +} + +type eventPayload struct { + Type string + Data map[string]any +} + +type publishSource struct { + Kind string `json:"kind"` + Event string `json:"event,omitempty"` +} + +type publishBody struct { + Type string `json:"type"` + Category string `json:"category"` + Source publishSource `json:"source"` + Data map[string]any `json:"data"` +} diff --git a/server/internal/plugins/telemetry/manager_test.go b/server/internal/plugins/telemetry/manager_test.go new file mode 100644 index 000000000..2c8b7e1df --- /dev/null +++ b/server/internal/plugins/telemetry/manager_test.go @@ -0,0 +1,167 @@ +package telemetry + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/m1k1o/neko/server/internal/config" + intsession "github.com/m1k1o/neko/server/internal/session" + "github.com/m1k1o/neko/server/pkg/types" +) + +func TestPlugin_DisabledIsNoop(t *testing.T) { + srv, captured := newCapturingServer(t) + defer srv.Close() + + sm := newSessionManager(t) + m := NewManager(sm, &Config{Enabled: false, Endpoint: srv.URL}) + require.NoError(t, m.Start()) + defer func() { require.NoError(t, m.Shutdown()) }() + + connect(t, sm, "1") + time.Sleep(50 * time.Millisecond) + + require.Empty(t, captured.snapshot(), "no events expected when disabled") +} + +func TestPlugin_EmitsConnectAndDisconnect(t *testing.T) { + srv, captured := newCapturingServer(t) + defer srv.Close() + + sm := newSessionManager(t) + m := NewManager(sm, &Config{Enabled: true, Endpoint: srv.URL}) + require.NoError(t, m.Start()) + defer func() { require.NoError(t, m.Shutdown()) }() + + s, p := connect(t, sm, "session-1") + // Hold a connection for a measurable interval so duration_ms is positive. + time.Sleep(20 * time.Millisecond) + s.DisconnectWebSocketPeer(p, false) + + require.Eventually(t, func() bool { + return len(captured.snapshot()) >= 2 + }, 2*time.Second, 10*time.Millisecond, "expected 2 events") + + events := captured.snapshot() + require.GreaterOrEqual(t, len(events), 2) + + require.Equal(t, "live_view_connect", events[0].Type) + require.Equal(t, "system", events[0].Category) + require.Equal(t, "local_process", events[0].Source.Kind) + require.Equal(t, "neko.live_view_connect", events[0].Source.Event) + require.Equal(t, "session-1", events[0].Data["session_id"]) + require.NotContains(t, events[0].Data, "duration_ms") + + require.Equal(t, "live_view_disconnect", events[1].Type) + require.Equal(t, "system", events[1].Category) + require.Equal(t, "local_process", events[1].Source.Kind) + require.Equal(t, "neko.live_view_disconnect", events[1].Source.Event) + require.Equal(t, "session-1", events[1].Data["session_id"]) + dur, ok := events[1].Data["duration_ms"].(float64) + require.True(t, ok, "duration_ms should be a number") + require.Greater(t, dur, 0.0) +} + +func TestPlugin_DropsOnEndpointFailureWithoutBlocking(t *testing.T) { + // Endpoint that always 500s; plugin should still drain without blocking + // the session goroutines. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + sm := newSessionManager(t) + m := NewManager(sm, &Config{Enabled: true, Endpoint: srv.URL}) + require.NoError(t, m.Start()) + defer func() { require.NoError(t, m.Shutdown()) }() + + done := make(chan struct{}) + go func() { + s, p := connect(t, sm, "session-error") + s.DisconnectWebSocketPeer(p, false) + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("session connect/disconnect blocked on telemetry plugin") + } +} + +// capturingServer collects POSTed JSON bodies for assertion. +type capturingServer struct { + mu sync.Mutex + captured []capturedEvent +} + +type capturedEvent struct { + Type string `json:"type"` + Category string `json:"category"` + Source publishSource `json:"source"` + Data map[string]any `json:"data"` +} + +func (c *capturingServer) snapshot() []capturedEvent { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]capturedEvent, len(c.captured)) + copy(out, c.captured) + return out +} + +func newCapturingServer(t *testing.T) (*httptest.Server, *capturingServer) { + t.Helper() + c := &capturingServer{} + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Errorf("read body: %v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + var ev capturedEvent + if err := json.Unmarshal(body, &ev); err != nil { + t.Errorf("unmarshal body: %v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + c.mu.Lock() + c.captured = append(c.captured, ev) + c.mu.Unlock() + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"seq":1,"event":{}}`)) + })) + return srv, c +} + +type mockWebsocketPeer struct{} + +func (mockWebsocketPeer) Send(event string, payload any) {} +func (mockWebsocketPeer) Ping() error { return nil } +func (mockWebsocketPeer) Destroy(reason string) {} + +func newSessionManager(t *testing.T) *intsession.SessionManagerCtx { + t.Helper() + return intsession.New(&config.Session{}) +} + +func connect(t *testing.T, sm types.SessionManager, id string) (types.Session, types.WebSocketPeer) { + t.Helper() + s, ok := sm.Get(id) + if !ok { + var err error + s, _, err = sm.Create(id, types.MemberProfile{CanLogin: true, CanConnect: true, CanWatch: true}) + require.NoError(t, err) + } + p := &mockWebsocketPeer{} + s.ConnectWebSocketPeer(p) + return s, p +} diff --git a/server/internal/plugins/telemetry/plugin.go b/server/internal/plugins/telemetry/plugin.go new file mode 100644 index 000000000..d140145c6 --- /dev/null +++ b/server/internal/plugins/telemetry/plugin.go @@ -0,0 +1,44 @@ +package telemetry + +import ( + "github.com/m1k1o/neko/server/pkg/types" +) + +// Plugin forwards neko session lifecycle events to kernel-images-api as +// live_view_connect / live_view_disconnect telemetry events. +type Plugin struct { + config *Config + manager *Manager +} + +// NewPlugin constructs the telemetry plugin in disabled state; Start is a +// no-op until Config.Enabled is true. +func NewPlugin() *Plugin { + return &Plugin{ + config: &Config{}, + } +} + +// Name returns the plugin identifier used by the plugin manager. +func (p *Plugin) Name() string { + return PluginName +} + +// Config exposes the underlying config struct for flag binding. +func (p *Plugin) Config() types.PluginConfig { + return p.config +} + +// Start wires the lifecycle listeners and spins the worker goroutine. +func (p *Plugin) Start(m types.PluginManagers) error { + p.manager = NewManager(m.SessionManager, p.config) + return p.manager.Start() +} + +// Shutdown stops the worker goroutine and drains in-flight events. +func (p *Plugin) Shutdown() error { + if p.manager == nil { + return nil + } + return p.manager.Shutdown() +} diff --git a/server/internal/plugins/telemetry/types.go b/server/internal/plugins/telemetry/types.go new file mode 100644 index 000000000..72082f0e2 --- /dev/null +++ b/server/internal/plugins/telemetry/types.go @@ -0,0 +1,3 @@ +package telemetry + +const PluginName = "telemetry" From f727552446ee86774b79b113105b615ce3c521a2 Mon Sep 17 00:00:00 2001 From: Sayan Samanta Date: Fri, 22 May 2026 15:03:12 -0700 Subject: [PATCH 2/4] bumpity --- server/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/go.mod b/server/go.mod index 63e81660d..ea0042363 100644 --- a/server/go.mod +++ b/server/go.mod @@ -21,6 +21,7 @@ require ( github.com/rs/zerolog v1.34.0 github.com/spf13/cobra v1.9.1 github.com/spf13/viper v1.20.1 + github.com/stretchr/testify v1.10.0 ) require ( @@ -57,7 +58,6 @@ require ( github.com/spf13/afero v1.14.0 // indirect github.com/spf13/cast v1.9.2 // indirect github.com/spf13/pflag v1.0.7 // indirect - github.com/stretchr/testify v1.10.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/wlynxg/anet v0.0.5 // indirect golang.org/x/crypto v0.40.0 // indirect From f6c281c72691767ea934848ab65ace315eacdae4 Mon Sep 17 00:00:00 2001 From: Sayan Samanta Date: Fri, 22 May 2026 15:03:16 -0700 Subject: [PATCH 3/4] integrate --- server/internal/plugins/manager.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/internal/plugins/manager.go b/server/internal/plugins/manager.go index df51b6f8a..431272d85 100644 --- a/server/internal/plugins/manager.go +++ b/server/internal/plugins/manager.go @@ -14,6 +14,7 @@ import ( "github.com/m1k1o/neko/server/internal/plugins/chat" "github.com/m1k1o/neko/server/internal/plugins/filetransfer" "github.com/m1k1o/neko/server/internal/plugins/scaletozero" + "github.com/m1k1o/neko/server/internal/plugins/telemetry" "github.com/m1k1o/neko/server/pkg/types" ) @@ -49,6 +50,7 @@ func New(config *config.Plugins) *ManagerCtx { manager.plugins.addPlugin(filetransfer.NewPlugin()) manager.plugins.addPlugin(chat.NewPlugin()) manager.plugins.addPlugin(scaletozero.NewPlugin()) + manager.plugins.addPlugin(telemetry.NewPlugin()) return manager } From c787d1c1ede1debf33412e5507f20c2329b6ea8c Mon Sep 17 00:00:00 2001 From: Sayan Samanta Date: Fri, 22 May 2026 15:09:41 -0700 Subject: [PATCH 4/4] self review --- server/internal/plugins/telemetry/config.go | 6 ---- server/internal/plugins/telemetry/manager.go | 33 ++++++++----------- .../plugins/telemetry/manager_test.go | 4 +-- server/internal/plugins/telemetry/plugin.go | 11 ------- 4 files changed, 15 insertions(+), 39 deletions(-) diff --git a/server/internal/plugins/telemetry/config.go b/server/internal/plugins/telemetry/config.go index 15b99d121..553ac8b2c 100644 --- a/server/internal/plugins/telemetry/config.go +++ b/server/internal/plugins/telemetry/config.go @@ -5,16 +5,11 @@ import ( "github.com/spf13/viper" ) -// Config controls the telemetry-forwarding plugin which mirrors WebRTC -// session connect/disconnect events into the kernel-images-api telemetry -// stream as live_view_connect / live_view_disconnect events. type Config struct { Enabled bool Endpoint string } -// Init registers CLI/viper flags. Defaults are wired for the in-VM headful -// image where neko and kernel-images-api share localhost. func (Config) Init(cmd *cobra.Command) error { cmd.PersistentFlags().Bool("telemetry.enabled", false, "forward live-view session connect/disconnect events to kernel-images-api") if err := viper.BindPFlag("telemetry.enabled", cmd.PersistentFlags().Lookup("telemetry.enabled")); err != nil { @@ -29,7 +24,6 @@ func (Config) Init(cmd *cobra.Command) error { return nil } -// Set hydrates Config from viper after flag/file/env parsing. func (c *Config) Set() { c.Enabled = viper.GetBool("telemetry.enabled") c.Endpoint = viper.GetString("telemetry.endpoint") diff --git a/server/internal/plugins/telemetry/manager.go b/server/internal/plugins/telemetry/manager.go index e34410d28..9fa0448f9 100644 --- a/server/internal/plugins/telemetry/manager.go +++ b/server/internal/plugins/telemetry/manager.go @@ -13,16 +13,11 @@ import ( "github.com/rs/zerolog/log" ) -// Tunables; small enough that the worker can't block neko's session -// goroutines, large enough that a brief api downtime doesn't lose events. const ( queueDepth = 256 defaultHTTPTimeout = 5 * time.Second ) -// Manager subscribes to session connect/disconnect events and forwards them -// to kernel-images-api as live_view_* telemetry events. All HTTP work runs -// on a single background worker so neko's emitter callbacks stay non-blocking. type Manager struct { logger zerolog.Logger config *Config @@ -37,8 +32,6 @@ type Manager struct { wg sync.WaitGroup } -// NewManager builds a manager with sensible defaults; tests can override -// httpClient via the exported field after construction. func NewManager(sessions types.SessionManager, config *Config) *Manager { return &Manager{ logger: log.With().Str("module", "telemetry").Logger(), @@ -51,8 +44,6 @@ func NewManager(sessions types.SessionManager, config *Config) *Manager { } } -// Start subscribes to session events and launches the publish worker. No-op -// when the plugin is disabled. func (m *Manager) Start() error { if !m.config.Enabled { return nil @@ -72,7 +63,6 @@ func (m *Manager) Start() error { return nil } -// Shutdown signals the worker to drain and exit, blocking until it does. func (m *Manager) Shutdown() error { if !m.config.Enabled { return nil @@ -88,8 +78,9 @@ func (m *Manager) handleConnect(id string) { m.mu.Unlock() m.enqueue(eventPayload{ - Type: "live_view_connect", - Data: map[string]any{"session_id": id}, + Type: "live_view_connect", + SourceEvent: "neko.session.connected", + Data: map[string]any{"session_id": id}, }) } @@ -105,8 +96,9 @@ func (m *Manager) handleDisconnect(id string) { } m.enqueue(eventPayload{ - Type: "live_view_disconnect", - Data: map[string]any{"session_id": id, "duration_ms": durationMs}, + Type: "live_view_disconnect", + SourceEvent: "neko.session.disconnected", + Data: map[string]any{"session_id": id, "duration_ms": durationMs}, }) } @@ -125,8 +117,8 @@ func (m *Manager) worker() { for { select { case <-m.stopCh: - // Best-effort drain of remaining events on shutdown so we don't - // lose paired connect/disconnects when neko exits cleanly. + // Best-effort drain on shutdown so we don't lose paired + // connect/disconnects when neko exits cleanly. for { select { case ev := <-m.eventsCh: @@ -147,7 +139,7 @@ func (m *Manager) publish(ev eventPayload) { Category: "system", Source: publishSource{ Kind: "local_process", - Event: "neko." + ev.Type, + Event: ev.SourceEvent, }, Data: ev.Data, } @@ -176,13 +168,14 @@ func (m *Manager) publish(ev eventPayload) { } type eventPayload struct { - Type string - Data map[string]any + Type string + SourceEvent string + Data map[string]any } type publishSource struct { Kind string `json:"kind"` - Event string `json:"event,omitempty"` + Event string `json:"event"` } type publishBody struct { diff --git a/server/internal/plugins/telemetry/manager_test.go b/server/internal/plugins/telemetry/manager_test.go index 2c8b7e1df..5460d1134 100644 --- a/server/internal/plugins/telemetry/manager_test.go +++ b/server/internal/plugins/telemetry/manager_test.go @@ -55,14 +55,14 @@ func TestPlugin_EmitsConnectAndDisconnect(t *testing.T) { require.Equal(t, "live_view_connect", events[0].Type) require.Equal(t, "system", events[0].Category) require.Equal(t, "local_process", events[0].Source.Kind) - require.Equal(t, "neko.live_view_connect", events[0].Source.Event) + require.Equal(t, "neko.session.connected", events[0].Source.Event) require.Equal(t, "session-1", events[0].Data["session_id"]) require.NotContains(t, events[0].Data, "duration_ms") require.Equal(t, "live_view_disconnect", events[1].Type) require.Equal(t, "system", events[1].Category) require.Equal(t, "local_process", events[1].Source.Kind) - require.Equal(t, "neko.live_view_disconnect", events[1].Source.Event) + require.Equal(t, "neko.session.disconnected", events[1].Source.Event) require.Equal(t, "session-1", events[1].Data["session_id"]) dur, ok := events[1].Data["duration_ms"].(float64) require.True(t, ok, "duration_ms should be a number") diff --git a/server/internal/plugins/telemetry/plugin.go b/server/internal/plugins/telemetry/plugin.go index d140145c6..aa6d5e87d 100644 --- a/server/internal/plugins/telemetry/plugin.go +++ b/server/internal/plugins/telemetry/plugin.go @@ -4,41 +4,30 @@ import ( "github.com/m1k1o/neko/server/pkg/types" ) -// Plugin forwards neko session lifecycle events to kernel-images-api as -// live_view_connect / live_view_disconnect telemetry events. type Plugin struct { config *Config manager *Manager } -// NewPlugin constructs the telemetry plugin in disabled state; Start is a -// no-op until Config.Enabled is true. func NewPlugin() *Plugin { return &Plugin{ config: &Config{}, } } -// Name returns the plugin identifier used by the plugin manager. func (p *Plugin) Name() string { return PluginName } -// Config exposes the underlying config struct for flag binding. func (p *Plugin) Config() types.PluginConfig { return p.config } -// Start wires the lifecycle listeners and spins the worker goroutine. func (p *Plugin) Start(m types.PluginManagers) error { p.manager = NewManager(m.SessionManager, p.config) return p.manager.Start() } -// Shutdown stops the worker goroutine and drains in-flight events. func (p *Plugin) Shutdown() error { - if p.manager == nil { - return nil - } return p.manager.Shutdown() }