Skip to content

Commit f17f69c

Browse files
committed
Wire Redis session storage and fix MultiSession lookup
Two bugs prevented Redis-backed VirtualMCPServer from working correctly across pods:

1. The vmcp process never read the SessionStorage field from its config, always using in-memory (local) storage regardless of operator config. Fixed by adding SessionStorageConfig to server.Config and a buildRedisConfig helper in commands.go that reads the provider, address, DB, and password (via THV_SESSION_REDIS_PASSWORD) from the vmcp config at startup.

2. With Redis-backed storage, sessionmanager.Manager.GetMultiSession() always returned (nil, false) for fully-formed sessions. The transportsession.Manager.Get() deserialises from Redis into a plain *StreamableSession, losing the defaultMultiSession type. The type assertion sess.(vmcpsession.MultiSession) therefore always failed, causing GetAdaptedTools to error, which triggered the cleanup defer to call Terminate(), marking the session terminated=true in Redis. The client's next request got a 404 "session terminated".

   Fixed by adding a node-local sync.Map (multiSessions) to sessionmanager.Manager as the authoritative store for live MultiSession objects. CreateSession stores the live session there after UpsertSession succeeds; GetMultiSession and Terminate consult the local map first before falling back to the pluggable backend.

Also adds:

- E2E test: VirtualMCPServer Redis Session Continuity — verifies that MCP sessions survive complete pod replacement when Redis is configured (ServiceAffinityNone, 2 replicas, all pods deleted and replaced).
- Redis image constant and DeployRedis/CleanupRedis helpers in the e2e test package.

Related-to: #4220
1 parent a771117 commit f17f69c

13 files changed

Lines changed: 684 additions & 203 deletions

File tree

cmd/vmcp/app/commands.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/stacklok/toolhive/pkg/container/runtime"
2222
"github.com/stacklok/toolhive/pkg/groups"
2323
"github.com/stacklok/toolhive/pkg/telemetry"
24+
transportsession "github.com/stacklok/toolhive/pkg/transport/session"
2425
"github.com/stacklok/toolhive/pkg/vmcp"
2526
"github.com/stacklok/toolhive/pkg/vmcp/aggregator"
2627
vmcpauth "github.com/stacklok/toolhive/pkg/vmcp/auth"
@@ -550,6 +551,11 @@ func runServe(cmd *cobra.Command, _ []string) error {
550551
OptimizerConfig: optCfg,
551552
SessionFactory: sessionFactory,
552553
}
554+
redisCfg, err := buildRedisConfig(cfg.SessionStorage)
555+
if err != nil {
556+
return fmt.Errorf("invalid session storage configuration: %w", err)
557+
}
558+
serverCfg.SessionStorageConfig = redisCfg
553559

554560
// Convert composite tool configurations to workflow definitions
555561
workflowDefs, err := vmcpserver.ConvertConfigToWorkflowDefinitions(cfg.CompositeTools)
@@ -570,3 +576,31 @@ func runServe(cmd *cobra.Command, _ []string) error {
570576
slog.Info(fmt.Sprintf("Starting Virtual MCP Server at %s", srv.Address()))
571577
return srv.Start(ctx)
572578
}
579+
580+
const defaultRedisKeyPrefix = "thv:session:"
581+
582+
// buildRedisConfig converts a vmcp SessionStorageConfig into a transport-layer
583+
// RedisConfig. Returns (nil, nil) when sessionStorage is nil or provider is not "redis".
584+
// Returns an error when provider is "redis" but required fields are invalid.
585+
// The Redis password is read from the THV_SESSION_REDIS_PASSWORD environment variable.
586+
func buildRedisConfig(sessionStorage *config.SessionStorageConfig) (*transportsession.RedisConfig, error) {
587+
if sessionStorage == nil || sessionStorage.Provider != "redis" {
588+
return nil, nil
589+
}
590+
if sessionStorage.Address == "" {
591+
return nil, fmt.Errorf("session storage provider is \"redis\" but address is empty")
592+
}
593+
keyPrefix := sessionStorage.KeyPrefix
594+
if keyPrefix == "" {
595+
keyPrefix = defaultRedisKeyPrefix
596+
}
597+
if keyPrefix[len(keyPrefix)-1] != ':' {
598+
return nil, fmt.Errorf("session storage key prefix %q must end with ':'", keyPrefix)
599+
}
600+
return &transportsession.RedisConfig{
601+
Addr: sessionStorage.Address,
602+
Password: os.Getenv(config.RedisPasswordEnvVar),
603+
DB: int(sessionStorage.DB),
604+
KeyPrefix: keyPrefix,
605+
}, nil
606+
}

pkg/transport/session/manager.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,21 @@ func (m *Manager) Count() int {
276276
return 0
277277
}
278278

279+
// TTL returns the session time-to-live configured for this manager.
280+
func (m *Manager) TTL() time.Duration {
281+
return m.ttl
282+
}
283+
284+
// Peek reports whether a session exists without refreshing its eviction TTL.
285+
// Returns (true, nil) if found, (false, nil) if definitively absent, and
286+
// (false, err) if the storage backend could not be reached.
287+
// Callers must treat a non-nil error as "unknown" rather than "not found".
288+
func (m *Manager) Peek(id string) (bool, error) {
289+
ctx, cancel := context.WithTimeout(context.Background(), defaultOperationTimeout)
290+
defer cancel()
291+
return m.storage.Peek(ctx, id)
292+
}
293+
279294
func (m *Manager) cleanupExpiredOnce() error {
280295
cutoff := time.Now().Add(-m.ttl)
281296
ctx, cancel := context.WithTimeout(context.Background(), cleanupOperationTimeout)

pkg/transport/session/storage.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@ type Storage interface {
2626
// that DeleteExpired does not evict sessions that are actively being accessed.
2727
Load(ctx context.Context, id string) (Session, error)
2828

29+
// Peek reports whether a session exists without refreshing its eviction TTL.
30+
// Returns (true, nil) if the session exists, (false, nil) if it definitively
31+
// does not exist, and (false, err) if the check could not be completed (e.g.
32+
// storage unavailable). Callers must treat non-nil errors as "unknown"
33+
// rather than "not found".
34+
Peek(ctx context.Context, id string) (bool, error)
35+
2936
// Delete removes a session from the storage backend.
3037
// It is not an error if the session doesn't exist.
3138
Delete(ctx context.Context, id string) error

pkg/transport/session/storage_local.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,17 @@ func (s *LocalStorage) Load(_ context.Context, id string) (Session, error) {
9090
return entry.session, nil
9191
}
9292

93+
// Peek reports whether a session exists in local storage without updating its
94+
// last-access timestamp. This avoids extending the TTL of idle sessions during
95+
// eviction-loop probes.
96+
func (s *LocalStorage) Peek(_ context.Context, id string) (bool, error) {
97+
if id == "" {
98+
return false, fmt.Errorf("cannot peek session with empty ID")
99+
}
100+
_, ok := s.sessions.Load(id)
101+
return ok, nil
102+
}
103+
93104
// Delete removes a session from local storage.
94105
func (s *LocalStorage) Delete(_ context.Context, id string) error {
95106
if id == "" {

pkg/transport/session/storage_redis.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,20 @@ func (s *RedisStorage) Load(ctx context.Context, id string) (Session, error) {
195195
return session, nil
196196
}
197197

198+
// Peek checks whether a session key exists in Redis without refreshing its TTL.
199+
// Uses EXISTS rather than GETEX so that idle sessions are not kept alive by the
200+
// eviction-loop probes in sessionmanager.
201+
func (s *RedisStorage) Peek(ctx context.Context, id string) (bool, error) {
202+
if id == "" {
203+
return false, fmt.Errorf("cannot peek session with empty ID")
204+
}
205+
n, err := s.client.Exists(ctx, s.key(id)).Result()
206+
if err != nil {
207+
return false, fmt.Errorf("failed to check session existence: %w", err)
208+
}
209+
return n > 0, nil
210+
}
211+
198212
// Delete removes the Redis key. A missing key is not an error.
199213
func (s *RedisStorage) Delete(ctx context.Context, id string) error {
200214
if id == "" {

pkg/vmcp/server/server.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,12 @@ type Config struct {
171171
// SessionFactory creates MultiSessions for session management.
172172
// Required; must not be nil.
173173
SessionFactory vmcpsession.MultiSessionFactory
174+
175+
// SessionStorageConfig configures the session storage backend.
176+
// When nil, an in-process local store is used (single-pod only).
177+
// When non-nil, a Redis-backed store is used so that session metadata
178+
// survives pod restarts, enabling horizontal scaling without sticky routing.
179+
SessionStorageConfig *transportsession.RedisConfig
174180
}
175181

176182
// Server is the Virtual MCP Server that aggregates multiple backends.
@@ -359,7 +365,18 @@ func New(
359365
// It intentionally carries no vmcp-specific state — backend connections, routing
360366
// tables, tool lists, and token binding all live in the separate sessionmanager.Manager,
361367
// keyed by the same session ID.
362-
sessionManager := transportsession.NewManager(cfg.SessionTTL, transportsession.NewStreamableSession)
368+
var sessionManager *transportsession.Manager
369+
if cfg.SessionStorageConfig != nil {
370+
sessionManager, err = transportsession.NewManagerWithRedis(
371+
ctx, cfg.SessionTTL, transportsession.NewStreamableSession, *cfg.SessionStorageConfig,
372+
)
373+
if err != nil {
374+
return nil, fmt.Errorf("failed to create Redis-backed session manager: %w", err)
375+
}
376+
slog.Info("using Redis-backed session storage", "addr", cfg.SessionStorageConfig.Addr)
377+
} else {
378+
sessionManager = transportsession.NewManager(cfg.SessionTTL, transportsession.NewStreamableSession)
379+
}
363380

364381
// Create handler factory (used by adapter and for future dynamic registration)
365382
handlerFactory := adapter.NewDefaultHandlerFactory(rt, backendClient)

0 commit comments

Comments
 (0)