Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions cmd/thv-operator/controllers/mcpserver_runconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ func (r *MCPServerReconciler) createRunConfigFromMCPServer(m *mcpv1alpha1.MCPSer
return runConfig, nil
}

// populateScalingConfig sets BackendReplicas and SessionRedis on the RunConfig from the MCPServer spec.
// Fields are only set when present in the spec; nil means "not configured" and is left as-is.
// populateScalingConfig sets BackendReplicas, SessionRedis, and HeadlessService on the RunConfig
// from the MCPServer spec. Fields are only set when present in the spec; nil means "not configured".
func populateScalingConfig(runConfig *runner.RunConfig, m *mcpv1alpha1.MCPServer) {
hasBackendReplicas := m.Spec.BackendReplicas != nil
hasRedis := m.Spec.SessionStorage != nil && m.Spec.SessionStorage.Provider == mcpv1alpha1.SessionStorageProviderRedis
Expand All @@ -305,6 +305,18 @@ func populateScalingConfig(runConfig *runner.RunConfig, m *mcpv1alpha1.MCPServer
if hasBackendReplicas {
val := *m.Spec.BackendReplicas
runConfig.ScalingConfig.BackendReplicas = &val

// Always populate headless service config when BackendReplicas is set.
// This enables the proxy runner to route each session to a specific pod via
// headless DNS (e.g. myserver-0.mcp-myserver-headless.default.svc.cluster.local)
// so sessions survive proxy-runner restarts. For single-replica StatefulSets,
// ordinal 0 is always selected deterministically.
runConfig.ScalingConfig.HeadlessService = &transporttypes.HeadlessServiceConfig{
StatefulSetName: m.Name,
ServiceName: fmt.Sprintf("mcp-%s-headless", m.Name),
Namespace: m.Namespace,
Replicas: val,
}
}

if hasRedis {
Expand Down
25 changes: 25 additions & 0 deletions docs/server/docs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions docs/server/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions docs/server/swagger.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/container/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func buildStatefulSetSpec(
spec := appsv1apply.StatefulSetSpec().
WithSelector(metav1apply.LabelSelector().
WithMatchLabels(map[string]string{"app": containerName})).
WithServiceName(containerName).
WithServiceName(fmt.Sprintf("mcp-%s-headless", containerName)).
WithTemplate(podTemplateSpec)
Comment on lines 447 to 451
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change updates StatefulSet spec.serviceName. Kubernetes treats several StatefulSet spec fields (including serviceName) as immutable; if so, server-side apply on an existing StatefulSet will fail during upgrades and the DNS fix won't take effect without a recreate. Consider adding a migration path (detect mismatch and recreate) or document the required rollout step for existing MCPServers.

Copilot uses AI. Check for mistakes.
if options != nil && options.ScalingConfig != nil && options.ScalingConfig.BackendReplicas != nil {
spec = spec.WithReplicas(*options.ScalingConfig.BackendReplicas)
Expand Down
6 changes: 6 additions & 0 deletions pkg/runner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ type ScalingConfig struct {
// The Redis password is not included — it is injected as env var THV_SESSION_REDIS_PASSWORD.
// +optional
SessionRedis *SessionRedisConfig `json:"session_redis,omitempty" yaml:"session_redis,omitempty"`

// HeadlessService holds the information needed to construct pod-specific headless DNS URLs
// for session-affinity routing in Kubernetes StatefulSet deployments.
// Populated by the operator whenever BackendReplicas is set (including single-replica).
// +optional
HeadlessService *types.HeadlessServiceConfig `json:"headless_service,omitempty" yaml:"headless_service,omitempty"`
}

// SessionRedisConfig contains non-sensitive Redis connection parameters used for distributed
Expand Down
8 changes: 7 additions & 1 deletion pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/stacklok/toolhive/pkg/transport"
"github.com/stacklok/toolhive/pkg/transport/session"
"github.com/stacklok/toolhive/pkg/transport/types"
vmcpconfig "github.com/stacklok/toolhive/pkg/vmcp/config"
"github.com/stacklok/toolhive/pkg/workloads/statuses"
)

Expand Down Expand Up @@ -354,6 +355,11 @@ func (r *Runner) Run(ctx context.Context) error {
}
}

// Enable pod-specific session routing for Kubernetes StatefulSet backends.
if r.Config.ScalingConfig != nil && r.Config.ScalingConfig.HeadlessService != nil {
transportConfig.HeadlessService = r.Config.ScalingConfig.HeadlessService
}

// When Redis session storage is configured, create a Redis-backed session store
// so sessions are shared across proxy replicas instead of being pod-local.
if r.Config.ScalingConfig != nil && r.Config.ScalingConfig.SessionRedis != nil {
Expand All @@ -364,7 +370,7 @@ func (r *Runner) Run(ctx context.Context) error {
}
storage, err := session.NewRedisStorage(ctx, session.RedisConfig{
Addr: redisCfg.Address,
Password: os.Getenv(session.RedisPasswordEnvVar),
Password: os.Getenv(vmcpconfig.RedisPasswordEnvVar),
DB: int(redisCfg.DB),
KeyPrefix: keyPrefix,
}, session.DefaultSessionTTL)
Expand Down
2 changes: 2 additions & 0 deletions pkg/transport/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (*Factory) Create(config types.Config, opts ...Option) (types.Transport, er
config.Middlewares...,
)
httpTransport.sessionStorage = config.SessionStorage
httpTransport.headlessService = config.HeadlessService
tr = httpTransport
case types.TransportTypeStreamableHTTP:
httpTransport := NewHTTPTransport(
Expand All @@ -91,6 +92,7 @@ func (*Factory) Create(config types.Config, opts ...Option) (types.Transport, er
config.Middlewares...,
)
httpTransport.sessionStorage = config.SessionStorage
httpTransport.headlessService = config.HeadlessService
tr = httpTransport
case types.TransportTypeInspector:
// HTTP transport is not implemented yet
Expand Down
88 changes: 48 additions & 40 deletions pkg/transport/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ type HTTPTransport struct {
// Mutex for protecting shared state
mutex sync.Mutex

// headlessService configures pod-specific routing for Kubernetes StatefulSet deployments.
headlessService *types.HeadlessServiceConfig

// sessionStorage overrides the default in-memory session store when set.
// Used for Redis-backed session sharing across replicas.
sessionStorage session.Storage
Expand Down Expand Up @@ -239,6 +242,33 @@ func (t *HTTPTransport) setTargetURI(targetURI string) {
t.targetURI = targetURI
}

// resolveTargetURI determines the proxy target URI, base path, and raw query from the
// transport configuration. For remote MCP servers it parses the remote URL; for local
// containers it returns the pre-configured targetURI.
func (t *HTTPTransport) resolveTargetURI() (targetURI, remoteBasePath, remoteRawQuery string, err error) {
if t.remoteURL != "" {
remoteURL, err := url.Parse(t.remoteURL)
if err != nil {
return "", "", "", fmt.Errorf("failed to parse remote URL: %w", err)
}
targetURI = (&url.URL{Scheme: remoteURL.Scheme, Host: remoteURL.Host}).String()
remoteBasePath = remoteURL.Path
remoteRawQuery = remoteURL.RawQuery
slog.Debug("setting up transparent proxy to forward to remote URL",
"port", t.proxyPort, "target", targetURI, "base_path", remoteBasePath, "raw_query", remoteRawQuery)
return targetURI, remoteBasePath, remoteRawQuery, nil
}
if t.containerName == "" {
return "", "", "", transporterrors.ErrContainerNameNotSet
}
if t.targetURI == "" {
return "", "", "", fmt.Errorf("target URI not set for HTTP transport")
}
slog.Debug("setting up transparent proxy to forward to target",
"port", t.proxyPort, "target", t.targetURI)
return t.targetURI, "", "", nil
}

// Start initializes the transport and begins processing messages.
// The transport is responsible for starting the container.
//
Expand All @@ -251,52 +281,15 @@ func (t *HTTPTransport) Start(ctx context.Context) error {
return fmt.Errorf("container deployer not set")
}

// Determine target URI
var targetURI string

// remoteBasePath holds the path component from the remote URL (e.g., "/v2" from
// "https://mcp.asana.com/v2/mcp"). This must be prepended to incoming request
// paths so they reach the correct endpoint on the remote server.
var remoteBasePath string

// remoteRawQuery holds the raw query string from the remote URL (e.g.,
// "toolsets=core,alerting" from "https://mcp.example.com/mcp?toolsets=core,alerting").
// This must be forwarded on every outbound request or it is silently dropped.
var remoteRawQuery string

if t.remoteURL != "" {
// For remote MCP servers, construct target URI from remote URL
remoteURL, err := url.Parse(t.remoteURL)
if err != nil {
return fmt.Errorf("failed to parse remote URL: %w", err)
}
targetURI = (&url.URL{
Scheme: remoteURL.Scheme,
Host: remoteURL.Host,
}).String()

// Extract the path prefix that needs to be prepended to incoming requests.
// The target URI only has scheme+host, so without this the remote path is lost.
remoteBasePath = remoteURL.Path

remoteRawQuery = remoteURL.RawQuery

//nolint:gosec // G706: logging proxy port and remote URL from config
slog.Debug("setting up transparent proxy to forward to remote URL",
"port", t.proxyPort, "target", targetURI, "base_path", remoteBasePath, "raw_query", remoteRawQuery)
} else {
if t.containerName == "" {
return transporterrors.ErrContainerNameNotSet
}

// For local containers, use the configured target URI
if t.targetURI == "" {
return fmt.Errorf("target URI not set for HTTP transport")
}
targetURI = t.targetURI
//nolint:gosec // G706: logging proxy port and target URI from config
slog.Debug("setting up transparent proxy to forward to target",
"port", t.proxyPort, "target", targetURI)
targetURI, remoteBasePath, remoteRawQuery, err := t.resolveTargetURI()
if err != nil {
return err
}

// Create middlewares slice
Expand Down Expand Up @@ -330,6 +323,21 @@ func (t *HTTPTransport) Start(ctx context.Context) error {
proxyOptions = append(proxyOptions, transparent.WithSessionStorage(t.sessionStorage))
}

// Inject Redis-backed session storage for cross-replica session sharing.
if t.sessionStorage != nil {
proxyOptions = append(proxyOptions, transparent.WithSessionStorage(t.sessionStorage))
}

Comment on lines +326 to +330
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

proxyOptions appends WithSessionStorage(t.sessionStorage) twice. WithSessionStorage stops and recreates the sessionManager, so applying it twice is redundant work and can have side effects (extra Stop()/recreate) during proxy construction. Remove the duplicate block and keep a single append when sessionStorage != nil.

Suggested change
// Inject Redis-backed session storage for cross-replica session sharing.
if t.sessionStorage != nil {
proxyOptions = append(proxyOptions, transparent.WithSessionStorage(t.sessionStorage))
}

Copilot uses AI. Check for mistakes.
// Enable pod-specific routing for Kubernetes StatefulSet backends.
// When configured, each new session is pinned to a specific pod via headless DNS
// so that session routing survives proxy-runner restarts.
if t.headlessService != nil {
proxyOptions = append(proxyOptions, transparent.WithPodHeadlessService(
t.headlessService.StatefulSetName, t.headlessService.ServiceName,
t.headlessService.Namespace, t.headlessService.Replicas,
))
}

// Create the transparent proxy
t.proxy = transparent.NewTransparentProxyWithOptions(
t.host,
Expand Down
Loading
Loading