-
Notifications
You must be signed in to change notification settings - Fork 207
Route MCP sessions to specific StatefulSet pods via headless DNS #4638
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -74,6 +74,9 @@ type HTTPTransport struct { | |||||||||
| // Mutex for protecting shared state | ||||||||||
| mutex sync.Mutex | ||||||||||
|
|
||||||||||
| // headlessService configures pod-specific routing for Kubernetes StatefulSet deployments. | ||||||||||
| headlessService *types.HeadlessServiceConfig | ||||||||||
|
|
||||||||||
| // sessionStorage overrides the default in-memory session store when set. | ||||||||||
| // Used for Redis-backed session sharing across replicas. | ||||||||||
| sessionStorage session.Storage | ||||||||||
|
|
@@ -239,6 +242,33 @@ func (t *HTTPTransport) setTargetURI(targetURI string) { | |||||||||
| t.targetURI = targetURI | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // resolveTargetURI determines the proxy target URI, base path, and raw query from the | ||||||||||
| // transport configuration. For remote MCP servers it parses the remote URL; for local | ||||||||||
| // containers it returns the pre-configured targetURI. | ||||||||||
| func (t *HTTPTransport) resolveTargetURI() (targetURI, remoteBasePath, remoteRawQuery string, err error) { | ||||||||||
| if t.remoteURL != "" { | ||||||||||
| remoteURL, err := url.Parse(t.remoteURL) | ||||||||||
| if err != nil { | ||||||||||
| return "", "", "", fmt.Errorf("failed to parse remote URL: %w", err) | ||||||||||
| } | ||||||||||
| targetURI = (&url.URL{Scheme: remoteURL.Scheme, Host: remoteURL.Host}).String() | ||||||||||
| remoteBasePath = remoteURL.Path | ||||||||||
| remoteRawQuery = remoteURL.RawQuery | ||||||||||
| slog.Debug("setting up transparent proxy to forward to remote URL", | ||||||||||
| "port", t.proxyPort, "target", targetURI, "base_path", remoteBasePath, "raw_query", remoteRawQuery) | ||||||||||
| return targetURI, remoteBasePath, remoteRawQuery, nil | ||||||||||
| } | ||||||||||
| if t.containerName == "" { | ||||||||||
| return "", "", "", transporterrors.ErrContainerNameNotSet | ||||||||||
| } | ||||||||||
| if t.targetURI == "" { | ||||||||||
| return "", "", "", fmt.Errorf("target URI not set for HTTP transport") | ||||||||||
| } | ||||||||||
| slog.Debug("setting up transparent proxy to forward to target", | ||||||||||
| "port", t.proxyPort, "target", t.targetURI) | ||||||||||
| return t.targetURI, "", "", nil | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Start initializes the transport and begins processing messages. | ||||||||||
| // The transport is responsible for starting the container. | ||||||||||
| // | ||||||||||
|
|
@@ -251,52 +281,15 @@ func (t *HTTPTransport) Start(ctx context.Context) error { | |||||||||
| return fmt.Errorf("container deployer not set") | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Determine target URI | ||||||||||
| var targetURI string | ||||||||||
|
|
||||||||||
| // remoteBasePath holds the path component from the remote URL (e.g., "/v2" from | ||||||||||
| // "https://mcp.asana.com/v2/mcp"). This must be prepended to incoming request | ||||||||||
| // paths so they reach the correct endpoint on the remote server. | ||||||||||
| var remoteBasePath string | ||||||||||
|
|
||||||||||
| // remoteRawQuery holds the raw query string from the remote URL (e.g., | ||||||||||
| // "toolsets=core,alerting" from "https://mcp.example.com/mcp?toolsets=core,alerting"). | ||||||||||
| // This must be forwarded on every outbound request or it is silently dropped. | ||||||||||
| var remoteRawQuery string | ||||||||||
|
|
||||||||||
| if t.remoteURL != "" { | ||||||||||
| // For remote MCP servers, construct target URI from remote URL | ||||||||||
| remoteURL, err := url.Parse(t.remoteURL) | ||||||||||
| if err != nil { | ||||||||||
| return fmt.Errorf("failed to parse remote URL: %w", err) | ||||||||||
| } | ||||||||||
| targetURI = (&url.URL{ | ||||||||||
| Scheme: remoteURL.Scheme, | ||||||||||
| Host: remoteURL.Host, | ||||||||||
| }).String() | ||||||||||
|
|
||||||||||
| // Extract the path prefix that needs to be prepended to incoming requests. | ||||||||||
| // The target URI only has scheme+host, so without this the remote path is lost. | ||||||||||
| remoteBasePath = remoteURL.Path | ||||||||||
|
|
||||||||||
| remoteRawQuery = remoteURL.RawQuery | ||||||||||
|
|
||||||||||
| //nolint:gosec // G706: logging proxy port and remote URL from config | ||||||||||
| slog.Debug("setting up transparent proxy to forward to remote URL", | ||||||||||
| "port", t.proxyPort, "target", targetURI, "base_path", remoteBasePath, "raw_query", remoteRawQuery) | ||||||||||
| } else { | ||||||||||
| if t.containerName == "" { | ||||||||||
| return transporterrors.ErrContainerNameNotSet | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // For local containers, use the configured target URI | ||||||||||
| if t.targetURI == "" { | ||||||||||
| return fmt.Errorf("target URI not set for HTTP transport") | ||||||||||
| } | ||||||||||
| targetURI = t.targetURI | ||||||||||
| //nolint:gosec // G706: logging proxy port and target URI from config | ||||||||||
| slog.Debug("setting up transparent proxy to forward to target", | ||||||||||
| "port", t.proxyPort, "target", targetURI) | ||||||||||
| targetURI, remoteBasePath, remoteRawQuery, err := t.resolveTargetURI() | ||||||||||
| if err != nil { | ||||||||||
| return err | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Create middlewares slice | ||||||||||
|
|
@@ -330,6 +323,21 @@ func (t *HTTPTransport) Start(ctx context.Context) error { | |||||||||
| proxyOptions = append(proxyOptions, transparent.WithSessionStorage(t.sessionStorage)) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Inject Redis-backed session storage for cross-replica session sharing. | ||||||||||
| if t.sessionStorage != nil { | ||||||||||
| proxyOptions = append(proxyOptions, transparent.WithSessionStorage(t.sessionStorage)) | ||||||||||
| } | ||||||||||
|
|
||||||||||
|
Comment on lines
+326
to
+330
|
||||||||||
| // Inject Redis-backed session storage for cross-replica session sharing. | |
| if t.sessionStorage != nil { | |
| proxyOptions = append(proxyOptions, transparent.WithSessionStorage(t.sessionStorage)) | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change updates StatefulSet spec.serviceName. Kubernetes treats several StatefulSet spec fields (including serviceName) as immutable; if so, server-side apply on an existing StatefulSet will fail during upgrades and the DNS fix won't take effect without a recreate. Consider adding a migration path (detect mismatch and recreate) or document the required rollout step for existing MCPServers.