Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/runtime/supervisor/actor_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,12 @@ func (p *ActorPoolSimple) GetAllStates() map[string]*ServerState {

state := &ServerState{
Name: name,
Config: client.Config,
Enabled: client.Config.Enabled,
Config: client.GetConfig(),
Enabled: client.GetConfig().Enabled,
Connected: connected,
}

if client.Config.Quarantined {
if client.GetConfig().Quarantined {
state.Quarantined = true
}

Expand Down
26 changes: 13 additions & 13 deletions internal/runtime/supervisor/actor_pool_complex_reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
// ActorPool manages the lifecycle of server actors and provides stats for Supervisor.
// This replaces UpstreamAdapter with direct Actor integration (Phase 7.2).
type ActorPool struct {
actors map[string]*actor.Actor
mu sync.RWMutex
logger *zap.Logger
manager *upstream.Manager // Use existing manager for client creation
actors map[string]*actor.Actor
mu sync.RWMutex
logger *zap.Logger
manager *upstream.Manager // Use existing manager for client creation

// Event aggregation
eventCh chan Event
Expand Down Expand Up @@ -218,12 +218,12 @@ func (p *ActorPool) GetServerState(name string) (*ServerState, error) {

state := &ServerState{
Name: name,
Config: client.Config,
Enabled: client.Config.Enabled,
Config: client.GetConfig(),
Enabled: client.GetConfig().Enabled,
Connected: client.IsConnected(),
}

if client.Config.Quarantined {
if client.GetConfig().Quarantined {
state.Quarantined = true
}

Expand Down Expand Up @@ -258,12 +258,12 @@ func (p *ActorPool) GetAllStates() map[string]*ServerState {
connected := client.IsConnected()
state := &ServerState{
Name: name,
Config: client.Config,
Enabled: client.Config.Enabled,
Config: client.GetConfig(),
Enabled: client.GetConfig().Enabled,
Connected: connected,
}

if client.Config.Quarantined {
if client.GetConfig().Quarantined {
state.Quarantined = true
}

Expand Down Expand Up @@ -328,9 +328,9 @@ func (p *ActorPool) forwardActorEvents(name string, a *actor.Actor) {
ServerName: name,
Timestamp: event.Timestamp,
Payload: map[string]interface{}{
"connected": event.State == actor.StateConnected,
"state": string(event.State),
"actor_event": string(event.Type),
"connected": event.State == actor.StateConnected,
"state": string(event.State),
"actor_event": string(event.Type),
},
})
}
Expand Down
2 changes: 1 addition & 1 deletion internal/upstream/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func TestClient_Headers_Support(t *testing.T) {
require.NotNil(t, client)

// Test that headers are stored in config
assert.Equal(t, tt.headers, client.Config.Headers)
assert.Equal(t, tt.headers, client.GetConfig().Headers)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand Down
17 changes: 14 additions & 3 deletions internal/upstream/core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,21 @@ type Client struct {
// Cached tools list from successful immediate call
cachedTools []mcp.Tool

// Stderr monitoring
// monitoringMu serializes the stderr/process monitoring lifecycle methods
// (Start*/Stop*Monitoring). Connect (StartStderrMonitoring) and Disconnect
// (StopStderrMonitoring) can run concurrently on the same client during a
// reconcile-vs-shutdown overlap, racing the ctx/cancel/WaitGroup fields
// below (notably WG.Add vs WG.Wait). This mutex makes start and stop
// mutually exclusive. It is never held across c.mu.
monitoringMu sync.Mutex

// Stderr monitoring. stderrMonitoringDone is a per-cycle channel closed by
// the monitor goroutine when it exits; Stop waits on it instead of a reused
// sync.WaitGroup, so an abandoned (timed-out) wait never races a later
// Start's counter. All three fields are written only under monitoringMu.
stderrMonitoringCtx context.Context
stderrMonitoringCancel context.CancelFunc
stderrMonitoringWG sync.WaitGroup
stderrMonitoringDone chan struct{}

// Ring buffer of recent stderr lines from the subprocess.
// Populated by monitorStderr; surfaced in initialize failure messages so
Expand All @@ -92,7 +103,7 @@ type Client struct {
processGroupID int // Process group ID for proper cleanup
processMonitorCtx context.Context
processMonitorCancel context.CancelFunc
processMonitorWG sync.WaitGroup
processMonitorDone chan struct{}

// Docker container tracking
containerID string
Expand Down
117 changes: 71 additions & 46 deletions internal/upstream/core/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,24 @@ const (

// StartStderrMonitoring starts monitoring stderr output and logging it
func (c *Client) StartStderrMonitoring() {
c.monitoringMu.Lock()
defer c.monitoringMu.Unlock()

if c.stderr == nil || c.transportType != transportStdio {
return
}

// Create context for stderr monitoring
c.stderrMonitoringCtx, c.stderrMonitoringCancel = context.WithCancel(context.Background())
// Create context for stderr monitoring. The monitor goroutine receives the
// context and its done channel as locals so an abandoned (timed-out)
// goroutine never reads the shared fields a later Start may overwrite.
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
c.stderrMonitoringCtx, c.stderrMonitoringCancel = ctx, cancel
c.stderrMonitoringDone = done

c.stderrMonitoringWG.Add(1)
go func() {
defer c.stderrMonitoringWG.Done()
c.monitorStderr()
defer close(done)
c.monitorStderr(ctx)
}()

c.logger.Debug("Started stderr monitoring",
Expand All @@ -43,41 +50,55 @@ func (c *Client) StartStderrMonitoring() {

// StopStderrMonitoring stops stderr monitoring
func (c *Client) StopStderrMonitoring() {
if c.stderrMonitoringCancel != nil {
c.stderrMonitoringCancel()
c.monitoringMu.Lock()
defer c.monitoringMu.Unlock()

// Use a timeout for the wait to prevent hanging
done := make(chan struct{})
go func() {
c.stderrMonitoringWG.Wait()
close(done)
}()
if c.stderrMonitoringCancel == nil {
return
}

select {
case <-done:
c.logger.Debug("Stopped stderr monitoring",
zap.String("server", c.config.Name))
case <-time.After(500 * time.Millisecond):
c.logger.Warn("Stderr monitoring stop timed out after 500ms, forcing shutdown",
zap.String("server", c.config.Name))
}
c.stderrMonitoringCancel()
done := c.stderrMonitoringDone
c.stderrMonitoringCancel = nil
c.stderrMonitoringDone = nil
if done == nil {
return
}

// Wait for the monitor goroutine directly under monitoringMu (no detached
// waiter that could outlive the lock). On timeout the goroutine is abandoned;
// it closes its own done channel and touches only its captured ctx, so it
// cannot race a subsequent Start.
select {
case <-done:
c.logger.Debug("Stopped stderr monitoring",
zap.String("server", c.config.Name))
case <-time.After(500 * time.Millisecond):
c.logger.Warn("Stderr monitoring stop timed out after 500ms, forcing shutdown",
zap.String("server", c.config.Name))
}
}

// StartProcessMonitoring starts monitoring the underlying process
func (c *Client) StartProcessMonitoring() {
c.monitoringMu.Lock()
defer c.monitoringMu.Unlock()

// Start monitoring even if processCmd is nil for Docker containers
if c.processCmd == nil && !c.isDockerCommand {
return
}

// Create context for process monitoring
c.processMonitorCtx, c.processMonitorCancel = context.WithCancel(context.Background())
// Create context for process monitoring (ctx + done passed as locals; see
// StartStderrMonitoring for the abandoned-goroutine rationale).
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
c.processMonitorCtx, c.processMonitorCancel = ctx, cancel
c.processMonitorDone = done

c.processMonitorWG.Add(1)
go func() {
defer c.processMonitorWG.Done()
c.monitorProcess()
defer close(done)
c.monitorProcess(ctx)
}()

if c.processCmd != nil {
Expand All @@ -94,29 +115,33 @@ func (c *Client) StartProcessMonitoring() {

// StopProcessMonitoring stops process monitoring
func (c *Client) StopProcessMonitoring() {
if c.processMonitorCancel != nil {
c.processMonitorCancel()
c.monitoringMu.Lock()
defer c.monitoringMu.Unlock()

// Use a timeout for the wait to prevent hanging
done := make(chan struct{})
go func() {
c.processMonitorWG.Wait()
close(done)
}()
if c.processMonitorCancel == nil {
return
}

select {
case <-done:
c.logger.Debug("Stopped process monitoring",
zap.String("server", c.config.Name))
case <-time.After(500 * time.Millisecond):
c.logger.Warn("Process monitoring stop timed out after 500ms, forcing shutdown",
zap.String("server", c.config.Name))
}
c.processMonitorCancel()
done := c.processMonitorDone
c.processMonitorCancel = nil
c.processMonitorDone = nil
if done == nil {
return
}

select {
case <-done:
c.logger.Debug("Stopped process monitoring",
zap.String("server", c.config.Name))
case <-time.After(500 * time.Millisecond):
c.logger.Warn("Process monitoring stop timed out after 500ms, forcing shutdown",
zap.String("server", c.config.Name))
}
}

// monitorProcess monitors the underlying process health
func (c *Client) monitorProcess() {
func (c *Client) monitorProcess(ctx context.Context) {
// Only return early if we have neither processCmd nor Docker command
if c.processCmd == nil && !c.isDockerCommand {
return
Expand All @@ -130,7 +155,7 @@ func (c *Client) monitorProcess() {

for {
select {
case <-c.processMonitorCtx.Done():
case <-ctx.Done():
return
case <-ticker.C:
if isDocker {
Expand All @@ -141,11 +166,11 @@ func (c *Client) monitorProcess() {
}

// monitorStderr monitors stderr output and logs it to both main and server-specific logs
func (c *Client) monitorStderr() {
func (c *Client) monitorStderr(ctx context.Context) {
scanner := bufio.NewScanner(c.stderr)
for scanner.Scan() {
select {
case <-c.stderrMonitoringCtx.Done():
case <-ctx.Done():
return
default:
line := strings.TrimSpace(scanner.Text())
Expand Down
86 changes: 86 additions & 0 deletions internal/upstream/core/monitoring_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package core

import (
"io"
"strings"
"sync"
"testing"

"go.uber.org/zap"

"github.com/smart-mcp-proxy/mcpproxy-go/internal/config"
)

// TestStderrMonitoring_StartStopRace reproduces the Connect-vs-Disconnect race
// on the stderr-monitoring lifecycle fields (stderrMonitoringCtx/Cancel/WG).
// StartStderrMonitoring runs from connectStdio during a reconcile-driven Connect
// while StopStderrMonitoring runs from Disconnect during Manager.ShutdownAll, with
// no synchronization on those fields — the -race detector flags WG.Add (Start)
// vs WG.Wait (Stop). Run under `go test -race`: trips without monitoringMu, green
// with it. A reused empty stderr reader returns EOF immediately so monitorStderr
// exits at once and the loop stays fast.
func TestStderrMonitoring_StartStopRace(t *testing.T) {
c := &Client{
transportType: transportStdio,
stderr: strings.NewReader(""),
logger: zap.NewNop(),
config: &config.ServerConfig{Name: "race"},
}

const iterations = 500
var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
c.StartStderrMonitoring()
}
}()
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
c.StopStderrMonitoring()
}
}()

wg.Wait()
c.StopStderrMonitoring()
}

// TestStderrMonitoring_AbandonedMonitorNoRace models the round-5 escape: the
// monitor goroutine is still alive when Stop is called (its stderr Read blocks),
// so Stop hits the 500ms timeout and abandons it. With the old reused-WaitGroup
// design the abandoned WG.Wait raced the next cycle's WG.Add; the per-cycle done
// channel + ctx-as-param design must keep concurrent Start/Stop race-free even
// while a prior monitor lingers. A blocking pipe keeps monitorStderr alive;
// closing the writer on cleanup lets the leaked goroutines exit.
func TestStderrMonitoring_AbandonedMonitorNoRace(t *testing.T) {
pr, pw := io.Pipe()
t.Cleanup(func() { _ = pw.Close() })

c := &Client{
transportType: transportStdio,
stderr: pr, // Read blocks until the writer is closed
logger: zap.NewNop(),
config: &config.ServerConfig{Name: "race"},
}

const cycles = 4 // each Stop times out at 500ms; keep small
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < cycles; i++ {
c.StartStderrMonitoring()
}
}()
go func() {
defer wg.Done()
for i := 0; i < cycles; i++ {
c.StopStderrMonitoring()
}
}()
wg.Wait()
c.StopStderrMonitoring()
}
Loading
Loading