diff --git a/internal/runtime/configsvc/service.go b/internal/runtime/configsvc/service.go index 78aabad3..0a189a26 100644 --- a/internal/runtime/configsvc/service.go +++ b/internal/runtime/configsvc/service.go @@ -200,8 +200,28 @@ func (s *Service) Subscribe(ctx context.Context) <-chan Update { s.logger.Debug("New configuration subscriber", zap.Int("total_subscribers", len(s.subscribers))) - // Send initial snapshot to new subscriber + // Send initial snapshot to new subscriber. go func() { + // Best-effort early-out if the subscriber canceled before we ran. + select { + case <-ctx.Done(): + s.Unsubscribe(ch) + return + default: + } + + // Deliver the initial snapshot under subMu(R) so Close()/Unsubscribe() + // (which close ch under the subMu write lock) cannot race or panic this + // send — a send on a closed channel both data-races and panics. The + // membership check covers both teardown paths: Close() nils the slice and + // Unsubscribe() removes this ch. The buffer (cap 10) is empty at subscribe + // time, so a non-blocking send always reaches a still-live subscriber; + // staying non-blocking avoids holding the lock across a blocking send. + s.subMu.RLock() + defer s.subMu.RUnlock() + if !s.isSubscribedLocked(ch) { + return + } select { case ch <- Update{ Snapshot: s.Current(), @@ -209,14 +229,24 @@ func (s *Service) Subscribe(ctx context.Context) <-chan Update { ChangedAt: time.Now(), Source: "subscription", }: - case <-ctx.Done(): - s.Unsubscribe(ch) + default: } }() return ch } +// isSubscribedLocked reports whether ch is still a live subscriber. Callers must +// hold subMu (read or write). +func (s *Service) isSubscribedLocked(ch chan Update) bool { + for _, sub := range s.subscribers { + if sub == ch { + return true + } + } + return false +} + // Unsubscribe removes a subscriber channel and closes it. 
func (s *Service) Unsubscribe(ch <-chan Update) { s.subMu.Lock() diff --git a/internal/runtime/configsvc/service_test.go b/internal/runtime/configsvc/service_test.go index 6bd81407..5be731eb 100644 --- a/internal/runtime/configsvc/service_test.go +++ b/internal/runtime/configsvc/service_test.go @@ -2,6 +2,7 @@ package configsvc import ( "context" + "sync" "testing" "time" @@ -328,6 +329,34 @@ func TestSnapshot_ServerNames(t *testing.T) { } } +// TestService_SubscribeCloseRace reproduces the close-vs-send data race surfaced +// by TestHandleUpstreamServers_AddFromRegistry_* under -race (MCP-816 / MCP-809 +// RV-3). Subscribe spawns a goroutine that sends the initial snapshot on the +// subscriber channel (service.go:206) while holding no lock; Close (service.go:261) +// closes that same channel under subMu. A send racing the close both data-races +// and can panic ("send on closed channel"). Run under `go test -race`: trips +// before the lock+membership-guarded send, green after. Run with high parallelism +// so a Subscribe-init send overlaps the Close in most iterations. +func TestService_SubscribeCloseRace(t *testing.T) { + const iterations = 200 + + var wg sync.WaitGroup + for i := 0; i < iterations; i++ { + wg.Add(1) + go func() { + defer wg.Done() + svc := NewService(&config.Config{Listen: "127.0.0.1:8080"}, "/tmp/config.json", zap.NewNop()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Subscribe schedules the init-snapshot send goroutine; Close races it. 
+ _ = svc.Subscribe(ctx) + svc.Close() + }() + } + wg.Wait() +} + func TestService_Close(t *testing.T) { cfg := &config.Config{ Listen: "127.0.0.1:8080", diff --git a/internal/server/consistency_crosssurface_test.go b/internal/server/consistency_crosssurface_test.go new file mode 100644 index 00000000..5253c4be --- /dev/null +++ b/internal/server/consistency_crosssurface_test.go @@ -0,0 +1,165 @@ +package server + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/smart-mcp-proxy/mcpproxy-go/internal/config" + "github.com/smart-mcp-proxy/mcpproxy-go/internal/contracts" + "github.com/smart-mcp-proxy/mcpproxy-go/internal/httpapi" +) + +// Spec 070 keystone regression (T021 / CN-004 / FR-010 / SC-004). +// +// Every add surface (REST, MCP, CLI) funnels through the single keystone +// AddServerFromRegistryRef, so the registry-result -> config.ServerConfig +// normalization lives in exactly one place. This test is the guard that keeps +// it that way: it drives the SAME logical add — same (registry, serverId, name, +// env) — through each surface against its own isolated server, then asserts the +// PERSISTED config.ServerConfig is byte-identical across all three (modulo the +// Created/Updated timestamps) and that every one is quarantined (SC-004). +// +// If a future change lets one surface bypass the keystone (e.g. the Web UI's +// old client-side install_cmd parsing, or a surface that forgets the quarantine +// default), the persisted configs diverge and this test fails. +// +// Surfaces exercised in-process: +// - MCP: the real upstream_servers handler (operation=add_from_registry), +// which extracts args from the MCP request (note: env arrives as env_json). 
+// - REST: a real HTTP POST to the actual chi router handler +// (POST /api/v1/registries/{id}/servers/{serverId}/add), exercising JSON +// body decode + URL param extraction + auth. +// - CLI add path: the CLI is a thin HTTP client of the REST route, so its +// config-derivation contribution bottoms out at the same controller method +// (AddServerFromRegistryRef). The full CLI binary->daemon path is separately +// covered end-to-end by TestRegistryAddCLIE2E. +func TestCrossSurfaceConsistency_RegistryAdd(t *testing.T) { + // One stdio entry whose install command declares a required input + // (${API_KEY}); supplying it via env exercises required-input satisfaction + // AND env carry-through on the persisted config. + servers := []map[string]interface{}{ + {"id": "everything", "name": "everything", "installCmd": "npx -y srv --key ${API_KEY}"}, + } + startTestRegistry(t, servers) // registers id="testreg" against a local httptest server + + const ( + regID = "testreg" + serverID = "everything" + addName = "consistency-srv" + ) + env := map[string]string{"API_KEY": "secret-123"} + + // --- Surface 1: MCP ------------------------------------------------------- + srvMCP := newConsistencyServer(t) + proxy := createTestMCPProxyServer(t) + proxy.mainServer = srvMCP + + envJSON, err := json.Marshal(env) + require.NoError(t, err) + mcpResult := callAddFromRegistry(t, proxy, map[string]interface{}{ + "operation": "add_from_registry", + "registry": regID, + "id": serverID, + "name": addName, + "env_json": string(envJSON), + }) + require.False(t, mcpResult.IsError, "MCP add must succeed: %v", mcpResult.Content) + mcpCfg := persistedServer(t, srvMCP, addName) + + // --- Surface 2: REST (real HTTP through the chi router) ------------------- + srvREST := newConsistencyServer(t) + api := httpapi.NewServer(srvREST, zap.NewNop().Sugar(), nil) + + body, err := json.Marshal(contracts.AddFromRegistryRequest{Name: addName, Env: env}) + require.NoError(t, err) + req := 
httptest.NewRequest(http.MethodPost, + "/api/v1/registries/"+regID+"/servers/"+serverID+"/add", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-API-Key", consistencyAPIKey) + rec := httptest.NewRecorder() + api.Router().ServeHTTP(rec, req) + require.Equal(t, http.StatusOK, rec.Code, "REST add must succeed: %s", rec.Body.String()) + restCfg := persistedServer(t, srvREST, addName) + + // --- Surface 3: CLI add path (shared controller bottom) ------------------- + srvCLI := newConsistencyServer(t) + cliCfg, rerr, err := srvCLI.AddServerFromRegistryRef(context.Background(), regID, serverID, addName, env, nil) + require.NoError(t, err) + require.Nil(t, rerr) + require.NotNil(t, cliCfg) + cliPersisted := persistedServer(t, srvCLI, addName) + + // --- Cross-surface byte-identity (CN-004) --------------------------------- + mcpJSON := canonicalServerJSON(t, mcpCfg) + restJSON := canonicalServerJSON(t, restCfg) + cliJSON := canonicalServerJSON(t, cliPersisted) + + assert.Equal(t, mcpJSON, restJSON, "REST add must persist a byte-identical config to MCP add") + assert.Equal(t, mcpJSON, cliJSON, "CLI add path must persist a byte-identical config to MCP add") + + // --- Quarantine invariant (SC-004 / CN-002) ------------------------------- + assert.True(t, mcpCfg.Quarantined, "MCP-added server must be quarantined") + assert.True(t, restCfg.Quarantined, "REST-added server must be quarantined") + assert.True(t, cliPersisted.Quarantined, "CLI-added server must be quarantined") + + // --- Sanity on the shared derivation ------------------------------------- + assert.Equal(t, "stdio", mcpCfg.Protocol) + assert.Equal(t, "npx", mcpCfg.Command) + assert.Equal(t, []string{"-y", "srv", "--key", "${API_KEY}"}, mcpCfg.Args) + assert.Equal(t, "secret-123", mcpCfg.Env["API_KEY"]) + assert.True(t, mcpCfg.Enabled) +} + +const consistencyAPIKey = "t021-consistency-key" + +// newConsistencyServer builds an isolated *Server (own data dir + storage) 
with +// a known API key so the REST surface can authenticate. The storage handle is +// closed on cleanup so the temp-dir removal succeeds on Windows. +func newConsistencyServer(t *testing.T) *Server { + t.Helper() + cfg := config.DefaultConfig() + cfg.DataDir = t.TempDir() + cfg.Listen = "127.0.0.1:0" + cfg.APIKey = consistencyAPIKey + srv, err := NewServer(cfg, zap.NewNop()) + require.NoError(t, err) + t.Cleanup(func() { _ = srv.Shutdown() }) + return srv +} + +// persistedServer reads the actually-persisted ServerConfig back from the +// server's live config snapshot (not the function return value), so the +// comparison reflects what reached storage. +func persistedServer(t *testing.T, srv *Server, name string) *config.ServerConfig { + t.Helper() + cfg := srv.runtime.Config() + require.NotNil(t, cfg, "runtime config must be available") + for _, sc := range cfg.Servers { + if sc != nil && sc.Name == name { + return sc + } + } + t.Fatalf("server %q not found in persisted config", name) + return nil +} + +// canonicalServerJSON serializes a ServerConfig with the per-add timestamps +// zeroed, so byte-comparison reflects only the derived/persisted fields. +func canonicalServerJSON(t *testing.T, sc *config.ServerConfig) string { + t.Helper() + clone := *sc + clone.Created = time.Time{} + clone.Updated = time.Time{} + b, err := json.Marshal(&clone) + require.NoError(t, err) + return string(b) +} diff --git a/internal/upstream/core/monitoring.go b/internal/upstream/core/monitoring.go index 809a684d..0e743cae 100644 --- a/internal/upstream/core/monitoring.go +++ b/internal/upstream/core/monitoring.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "fmt" + "io" "os" "strings" "time" @@ -31,9 +32,17 @@ func (c *Client) StartStderrMonitoring() { return } + // Capture the stderr reader as a local under monitoringMu. 
connectStdio + // reassigns c.stderr on every (re)connect (connection_stdio.go:217); passing + // the reader as a goroutine arg keeps monitorStderr from reading the shared + // field, so a later reconnect's write never races a lingering monitor's read + // (the connectStdio↔monitorStderr data race, MCP-816). + stderr := c.stderr + // Create context for stderr monitoring. The monitor goroutine receives the - // context and its done channel as locals so an abandoned (timed-out) - // goroutine never reads the shared fields a later Start may overwrite. + // context, stderr reader, and its done channel as locals so an abandoned + // (timed-out) goroutine never reads the shared fields a later Start may + // overwrite. ctx, cancel := context.WithCancel(context.Background()) done := make(chan struct{}) c.stderrMonitoringCtx, c.stderrMonitoringCancel = ctx, cancel @@ -41,7 +50,7 @@ func (c *Client) StartStderrMonitoring() { go func() { defer close(done) - c.monitorStderr(ctx) + c.monitorStderr(ctx, stderr) }() c.logger.Debug("Started stderr monitoring", @@ -165,9 +174,12 @@ func (c *Client) monitorProcess(ctx context.Context) { } } -// monitorStderr monitors stderr output and logs it to both main and server-specific logs -func (c *Client) monitorStderr(ctx context.Context) { - scanner := bufio.NewScanner(c.stderr) +// monitorStderr monitors stderr output and logs it to both main and server-specific logs. +// The stderr reader is passed as an argument (captured under monitoringMu by the +// caller) rather than read from c.stderr, so a concurrent connectStdio reassigning +// the shared field cannot race this goroutine's read (MCP-816). 
+func (c *Client) monitorStderr(ctx context.Context, stderr io.Reader) { + scanner := bufio.NewScanner(stderr) for scanner.Scan() { select { case <-ctx.Done(): diff --git a/internal/upstream/core/monitoring_race_test.go b/internal/upstream/core/monitoring_race_test.go index 6ab87c5a..20618f7d 100644 --- a/internal/upstream/core/monitoring_race_test.go +++ b/internal/upstream/core/monitoring_race_test.go @@ -48,6 +48,49 @@ func TestStderrMonitoring_StartStopRace(t *testing.T) { c.StopStderrMonitoring() } +// TestStderrMonitoring_ReassignDuringMonitorNoRace reproduces the +// connectStdio(connection_stdio.go:217)-write vs monitorStderr(monitoring.go:170)-read +// data race on the shared c.stderr field, surfaced by TestCrossSurfaceConsistency_RegistryAdd +// in CI (MCP-816 / MCP-809 RV-3). connectStdio reassigns c.stderr on every +// (re)connect, then starts the monitor; the monitor goroutine built its scanner +// from the shared c.stderr field instead of a captured local, so a reconnect's +// field write raced the lingering goroutine's read. Run under `go test -race`: +// trips before the capture-local fix, green after. Empty readers EOF immediately +// so each monitor goroutine exits at once and the loop stays fast. +func TestStderrMonitoring_ReassignDuringMonitorNoRace(t *testing.T) { + c := &Client{ + transportType: transportStdio, + stderr: strings.NewReader(""), + logger: zap.NewNop(), + config: &config.ServerConfig{Name: "race"}, + } + + const iterations = 500 + var wg sync.WaitGroup + wg.Add(2) + + // Mimics connectStdio: reassign c.stderr on each reconnect, then (re)start + // the monitor. The reassignment is the field write the detector flagged at + // connection_stdio.go:217; the monitor goroutine spawned by the previous + // iteration reads c.stderr at monitoring.go:170 concurrently. 
+ go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + c.stderr = strings.NewReader("") + c.StartStderrMonitoring() + } + }() + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + c.StopStderrMonitoring() + } + }() + + wg.Wait() + c.StopStderrMonitoring() +} + // TestStderrMonitoring_AbandonedMonitorNoRace models the round-5 escape: the // monitor goroutine is still alive when Stop is called (its stderr Read blocks), // so Stop hits the 500ms timeout and abandons it. With the old reused-WaitGroup diff --git a/specs/070-registry-easy-upstream-add/spec.md b/specs/070-registry-easy-upstream-add/spec.md index 89e205a6..8cf90f6d 100644 --- a/specs/070-registry-easy-upstream-add/spec.md +++ b/specs/070-registry-easy-upstream-add/spec.md @@ -15,6 +15,10 @@ - Q: Add-from-registry today via MCP? → A: Works, but the agent must hand-construct the upstream config from a search result; a convenience "add from registry result" mode is desired. - Q: Does quarantine-by-default hold on add? → A: Yes, on every surface (they share the core add path); this MUST be preserved as an invariant. +### Session 2026-06-01 (O3 — post-implementation amendment) + +- Amendment (O3): The US1/US2 "gaps" recorded on 2026-05-31 were partly **stale**. During implementation it was found that the Web UI already exposed an "Add to MCP" path and the CLI already *listed and searched* registries; the genuine gap was narrower — each surface re-implemented add logic (including a client-side parse on the Web UI, a CN-001 risk) instead of sharing one path, and the CLI could not add directly *from a search result*. The real work delivered was therefore **de-duplicating every surface (Web UI, CLI, MCP) onto the single `AddServerFromRegistry` keystone core op** (`internal/registries/`), so quarantine-by-default and validation live in exactly one place. 
This landed in PR #555 / commit `354580f4` and is guarded by a cross-surface consistency regression test (`internal/server/consistency_crosssurface_test.go`, T021 / CN-004 / SC-004). The other decision artifacts are already reflected in shipped code — O1 (required-input heuristic), O2 (key-absent skip), O4 (P2 in scope); O3 (this note) was the only outstanding artifact. + ## User Scenarios & Testing *(mandatory)* ### User Story 1 — Add a discovered server to upstream from the Web UI in one flow (Priority: P1)