Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions internal/runtime/configsvc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,23 +200,53 @@ func (s *Service) Subscribe(ctx context.Context) <-chan Update {
s.logger.Debug("New configuration subscriber",
zap.Int("total_subscribers", len(s.subscribers)))

// Send initial snapshot to new subscriber
// Send initial snapshot to new subscriber.
go func() {
// Best-effort early-out if the subscriber canceled before we ran.
select {
case <-ctx.Done():
s.Unsubscribe(ch)
return
default:
}

// Deliver the initial snapshot under subMu(R) so Close()/Unsubscribe()
// (which close ch under the subMu write lock) cannot race or panic this
// send — a send on a closed channel both data-races and panics. The
// membership check covers both teardown paths: Close() nils the slice and
// Unsubscribe() removes this ch. The buffer (cap 10) is empty at subscribe
// time, so a non-blocking send always reaches a still-live subscriber;
// staying non-blocking avoids holding the lock across a blocking send.
s.subMu.RLock()
defer s.subMu.RUnlock()
if !s.isSubscribedLocked(ch) {
return
}
select {
case ch <- Update{
Snapshot: s.Current(),
Type: UpdateTypeInit,
ChangedAt: time.Now(),
Source: "subscription",
}:
case <-ctx.Done():
s.Unsubscribe(ch)
default:
}
}()

return ch
}

// isSubscribedLocked reports whether ch is still a live subscriber. Callers must
// hold subMu (read or write).
func (s *Service) isSubscribedLocked(ch chan Update) bool {
for _, sub := range s.subscribers {
if sub == ch {
return true
}
}
return false
}

// Unsubscribe removes a subscriber channel and closes it.
func (s *Service) Unsubscribe(ch <-chan Update) {
s.subMu.Lock()
Expand Down
29 changes: 29 additions & 0 deletions internal/runtime/configsvc/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package configsvc

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -328,6 +329,34 @@ func TestSnapshot_ServerNames(t *testing.T) {
}
}

// TestService_SubscribeCloseRace reproduces the close-vs-send data race surfaced
// by TestHandleUpstreamServers_AddFromRegistry_* under -race (MCP-816 / MCP-809
// RV-3). Subscribe spawns a goroutine that sends the initial snapshot on the
// subscriber channel (service.go:206) while holding no lock; Close (service.go:261)
// closes that same channel under subMu. A send racing the close both data-races
// and can panic ("send on closed channel"). Run under `go test -race`: trips
// before the lock+membership-guarded send, green after. Run with high parallelism
// so a Subscribe-init send overlaps the Close in most iterations.
func TestService_SubscribeCloseRace(t *testing.T) {
const iterations = 200

var wg sync.WaitGroup
for i := 0; i < iterations; i++ {
wg.Add(1)
go func() {
defer wg.Done()
svc := NewService(&config.Config{Listen: "127.0.0.1:8080"}, "/tmp/config.json", zap.NewNop())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Subscribe schedules the init-snapshot send goroutine; Close races it.
_ = svc.Subscribe(ctx)
svc.Close()
}()
}
wg.Wait()
}

func TestService_Close(t *testing.T) {
cfg := &config.Config{
Listen: "127.0.0.1:8080",
Expand Down
165 changes: 165 additions & 0 deletions internal/server/consistency_crosssurface_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package server

import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/smart-mcp-proxy/mcpproxy-go/internal/config"
"github.com/smart-mcp-proxy/mcpproxy-go/internal/contracts"
"github.com/smart-mcp-proxy/mcpproxy-go/internal/httpapi"
)

// Spec 070 keystone regression (T021 / CN-004 / FR-010 / SC-004).
//
// Every add surface (REST, MCP, CLI) funnels through the single keystone
// AddServerFromRegistryRef, so the registry-result -> config.ServerConfig
// normalization lives in exactly one place. This test is the guard that keeps
// it that way: it drives the SAME logical add — same (registry, serverId, name,
// env) — through each surface against its own isolated server, then asserts the
// PERSISTED config.ServerConfig is byte-identical across all three (modulo the
// Created/Updated timestamps) and that every one is quarantined (SC-004).
//
// If a future change lets one surface bypass the keystone (e.g. the Web UI's
// old client-side install_cmd parsing, or a surface that forgets the quarantine
// default), the persisted configs diverge and this test fails.
//
// Surfaces exercised in-process:
// - MCP: the real upstream_servers handler (operation=add_from_registry),
// which extracts args from the MCP request (note: env arrives as env_json).
// - REST: a real HTTP POST to the actual chi router handler
// (POST /api/v1/registries/{id}/servers/{serverId}/add), exercising JSON
// body decode + URL param extraction + auth.
// - CLI add path: the CLI is a thin HTTP client of the REST route, so its
// config-derivation contribution bottoms out at the same controller method
// (AddServerFromRegistryRef). The full CLI binary->daemon path is separately
// covered end-to-end by TestRegistryAddCLIE2E.
func TestCrossSurfaceConsistency_RegistryAdd(t *testing.T) {
// One stdio entry whose install command declares a required input
// (${API_KEY}); supplying it via env exercises required-input satisfaction
// AND env carry-through on the persisted config.
servers := []map[string]interface{}{
{"id": "everything", "name": "everything", "installCmd": "npx -y srv --key ${API_KEY}"},
}
startTestRegistry(t, servers) // registers id="testreg" against a local httptest server

const (
regID = "testreg"
serverID = "everything"
addName = "consistency-srv"
)
env := map[string]string{"API_KEY": "secret-123"}

// --- Surface 1: MCP -------------------------------------------------------
srvMCP := newConsistencyServer(t)
proxy := createTestMCPProxyServer(t)
proxy.mainServer = srvMCP

envJSON, err := json.Marshal(env)
require.NoError(t, err)
mcpResult := callAddFromRegistry(t, proxy, map[string]interface{}{
"operation": "add_from_registry",
"registry": regID,
"id": serverID,
"name": addName,
"env_json": string(envJSON),
})
require.False(t, mcpResult.IsError, "MCP add must succeed: %v", mcpResult.Content)
mcpCfg := persistedServer(t, srvMCP, addName)

// --- Surface 2: REST (real HTTP through the chi router) -------------------
srvREST := newConsistencyServer(t)
api := httpapi.NewServer(srvREST, zap.NewNop().Sugar(), nil)

body, err := json.Marshal(contracts.AddFromRegistryRequest{Name: addName, Env: env})
require.NoError(t, err)
req := httptest.NewRequest(http.MethodPost,
"/api/v1/registries/"+regID+"/servers/"+serverID+"/add", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-API-Key", consistencyAPIKey)
rec := httptest.NewRecorder()
api.Router().ServeHTTP(rec, req)
require.Equal(t, http.StatusOK, rec.Code, "REST add must succeed: %s", rec.Body.String())
restCfg := persistedServer(t, srvREST, addName)

// --- Surface 3: CLI add path (shared controller bottom) -------------------
srvCLI := newConsistencyServer(t)
cliCfg, rerr, err := srvCLI.AddServerFromRegistryRef(context.Background(), regID, serverID, addName, env, nil)
require.NoError(t, err)
require.Nil(t, rerr)
require.NotNil(t, cliCfg)
cliPersisted := persistedServer(t, srvCLI, addName)

// --- Cross-surface byte-identity (CN-004) ---------------------------------
mcpJSON := canonicalServerJSON(t, mcpCfg)
restJSON := canonicalServerJSON(t, restCfg)
cliJSON := canonicalServerJSON(t, cliPersisted)

assert.Equal(t, mcpJSON, restJSON, "REST add must persist a byte-identical config to MCP add")
assert.Equal(t, mcpJSON, cliJSON, "CLI add path must persist a byte-identical config to MCP add")

// --- Quarantine invariant (SC-004 / CN-002) -------------------------------
assert.True(t, mcpCfg.Quarantined, "MCP-added server must be quarantined")
assert.True(t, restCfg.Quarantined, "REST-added server must be quarantined")
assert.True(t, cliPersisted.Quarantined, "CLI-added server must be quarantined")

// --- Sanity on the shared derivation -------------------------------------
assert.Equal(t, "stdio", mcpCfg.Protocol)
assert.Equal(t, "npx", mcpCfg.Command)
assert.Equal(t, []string{"-y", "srv", "--key", "${API_KEY}"}, mcpCfg.Args)
assert.Equal(t, "secret-123", mcpCfg.Env["API_KEY"])
assert.True(t, mcpCfg.Enabled)
}

const consistencyAPIKey = "t021-consistency-key"

// newConsistencyServer builds an isolated *Server (own data dir + storage) with
// a known API key so the REST surface can authenticate. The storage handle is
// closed on cleanup so the temp-dir removal succeeds on Windows.
func newConsistencyServer(t *testing.T) *Server {
t.Helper()
cfg := config.DefaultConfig()
cfg.DataDir = t.TempDir()
cfg.Listen = "127.0.0.1:0"
cfg.APIKey = consistencyAPIKey
srv, err := NewServer(cfg, zap.NewNop())
require.NoError(t, err)
t.Cleanup(func() { _ = srv.Shutdown() })
return srv
}

// persistedServer reads the actually-persisted ServerConfig back from the
// server's live config snapshot (not the function return value), so the
// comparison reflects what reached storage.
func persistedServer(t *testing.T, srv *Server, name string) *config.ServerConfig {
t.Helper()
cfg := srv.runtime.Config()
require.NotNil(t, cfg, "runtime config must be available")
for _, sc := range cfg.Servers {
if sc != nil && sc.Name == name {
return sc
}
}
t.Fatalf("server %q not found in persisted config", name)
return nil
}

// canonicalServerJSON serializes a ServerConfig with the per-add timestamps
// zeroed, so byte-comparison reflects only the derived/persisted fields.
func canonicalServerJSON(t *testing.T, sc *config.ServerConfig) string {
t.Helper()
clone := *sc
clone.Created = time.Time{}
clone.Updated = time.Time{}
b, err := json.Marshal(&clone)
require.NoError(t, err)
return string(b)
}
24 changes: 18 additions & 6 deletions internal/upstream/core/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"context"
"fmt"
"io"
"os"
"strings"
"time"
Expand Down Expand Up @@ -31,17 +32,25 @@ func (c *Client) StartStderrMonitoring() {
return
}

// Capture the stderr reader as a local under monitoringMu. connectStdio
// reassigns c.stderr on every (re)connect (connection_stdio.go:217); passing
// the reader as a goroutine arg keeps monitorStderr from reading the shared
// field, so a later reconnect's write never races a lingering monitor's read
// (the connectStdio↔monitorStderr data race, MCP-816).
stderr := c.stderr

// Create context for stderr monitoring. The monitor goroutine receives the
// context and its done channel as locals so an abandoned (timed-out)
// goroutine never reads the shared fields a later Start may overwrite.
// context, stderr reader, and its done channel as locals so an abandoned
// (timed-out) goroutine never reads the shared fields a later Start may
// overwrite.
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
c.stderrMonitoringCtx, c.stderrMonitoringCancel = ctx, cancel
c.stderrMonitoringDone = done

go func() {
defer close(done)
c.monitorStderr(ctx)
c.monitorStderr(ctx, stderr)
}()

c.logger.Debug("Started stderr monitoring",
Expand Down Expand Up @@ -165,9 +174,12 @@ func (c *Client) monitorProcess(ctx context.Context) {
}
}

// monitorStderr monitors stderr output and logs it to both main and server-specific logs
func (c *Client) monitorStderr(ctx context.Context) {
scanner := bufio.NewScanner(c.stderr)
// monitorStderr monitors stderr output and logs it to both main and server-specific logs.
// The stderr reader is passed as an argument (captured under monitoringMu by the
// caller) rather than read from c.stderr, so a concurrent connectStdio reassigning
// the shared field cannot race this goroutine's read (MCP-816).
func (c *Client) monitorStderr(ctx context.Context, stderr io.Reader) {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
select {
case <-ctx.Done():
Expand Down
43 changes: 43 additions & 0 deletions internal/upstream/core/monitoring_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,49 @@ func TestStderrMonitoring_StartStopRace(t *testing.T) {
c.StopStderrMonitoring()
}

// TestStderrMonitoring_ReassignDuringMonitorNoRace reproduces the
// connectStdio(connection_stdio.go:217)-write vs monitorStderr(monitoring.go:170)-read
// data race on the shared c.stderr field, surfaced by TestCrossSurfaceConsistency_RegistryAdd
// in CI (MCP-816 / MCP-809 RV-3). connectStdio reassigns c.stderr on every
// (re)connect, then starts the monitor; the monitor goroutine built its scanner
// from the shared c.stderr field instead of a captured local, so a reconnect's
// field write raced the lingering goroutine's read. Run under `go test -race`:
// trips before the capture-local fix, green after. Empty readers EOF immediately
// so each monitor goroutine exits at once and the loop stays fast.
func TestStderrMonitoring_ReassignDuringMonitorNoRace(t *testing.T) {
c := &Client{
transportType: transportStdio,
stderr: strings.NewReader(""),
logger: zap.NewNop(),
config: &config.ServerConfig{Name: "race"},
}

const iterations = 500
var wg sync.WaitGroup
wg.Add(2)

// Mimics connectStdio: reassign c.stderr on each reconnect, then (re)start
// the monitor. The reassignment is the field write the detector flagged at
// connection_stdio.go:217; the monitor goroutine spawned by the previous
// iteration reads c.stderr at monitoring.go:170 concurrently.
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
c.stderr = strings.NewReader("")
c.StartStderrMonitoring()
}
}()
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
c.StopStderrMonitoring()
}
}()

wg.Wait()
c.StopStderrMonitoring()
}

// TestStderrMonitoring_AbandonedMonitorNoRace models the round-5 escape: the
// monitor goroutine is still alive when Stop is called (its stderr Read blocks),
// so Stop hits the 500ms timeout and abandons it. With the old reused-WaitGroup
Expand Down
4 changes: 4 additions & 0 deletions specs/070-registry-easy-upstream-add/spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
- Q: Add-from-registry today via MCP? → A: Works, but the agent must hand-construct the upstream config from a search result; a convenience "add from registry result" mode is desired.
- Q: Does quarantine-by-default hold on add? → A: Yes, on every surface (they share the core add path); this MUST be preserved as an invariant.

### Session 2026-06-01 (O3 — post-implementation amendment)

- Amendment (O3): The US1/US2 "gaps" recorded on 2026-05-31 were partly **stale**. During implementation it was found that the Web UI already exposed an "Add to MCP" path and the CLI already *listed and searched* registries; the genuine gap was narrower — each surface re-implemented add logic (including a client-side parse on the Web UI, a CN-001 risk) instead of sharing one path, and the CLI could not add directly *from a search result*. The real work delivered was therefore **de-duplicating every surface (Web UI, CLI, MCP) onto the single `AddServerFromRegistry` keystone core op** (`internal/registries/`), so quarantine-by-default and validation live in exactly one place. This landed in PR #555 / commit `354580f4` and is guarded by a cross-surface consistency regression (`internal/server/consistency_crosssurface_test.go`, T021 / CN-004 / SC-004). The other decision artifacts are already reflected in shipped code — O1 (required-input heuristic), O2 (key-absent skip), O4 (P2 in scope); O3 (this note) was the only outstanding artifact.

## User Scenarios & Testing *(mandatory)*

### User Story 1 — Add a discovered server to upstream from the Web UI in one flow (Priority: P1)
Expand Down
Loading