7 changes: 7 additions & 0 deletions cmd/thv-operator/api/v1alpha1/virtualmcpserver_types.go
@@ -388,6 +388,13 @@ type OperationalConfig struct
// FailureHandling configures failure handling behavior
// +optional
FailureHandling *FailureHandlingConfig `json:"failureHandling,omitempty"`

// CapabilityCacheTTL is the time-to-live for cached capability entries.
// Defaults to 5 minutes if not specified.
// Shorter values (e.g., "10s") can be used for testing to see capability updates sooner.
// +kubebuilder:default="5m"
// +optional
CapabilityCacheTTL string `json:"capabilityCacheTTL,omitempty"`
}

// TimeoutConfig configures timeout settings
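As a hedged illustration of the field's documented semantics, here is a minimal sketch of how a consumer might resolve the effective TTL (the helper name is hypothetical; only the 5-minute default comes from the field's doc comment):

// resolveCapabilityCacheTTL illustrates the documented fallback: empty or
// unparsable values resolve to the 5-minute default.
func resolveCapabilityCacheTTL(raw string) time.Duration {
	const defaultTTL = 5 * time.Minute
	if raw == "" {
		return defaultTTL
	}
	ttl, err := time.ParseDuration(raw)
	if err != nil {
		return defaultTTL
	}
	return ttl
}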
299 changes: 199 additions & 100 deletions cmd/thv-operator/controllers/virtualmcpserver_controller.go
@@ -13,6 +13,7 @@ import (
"sync"
"time"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
@@ -1128,7 +1129,13 @@ func countBackendHealth(ctx context.Context, backends []mcpv1alpha1.DiscoveredBackend
return ready, unhealthy
}

// determineStatusFromBackends evaluates backend health to determine status
// determineStatusFromBackends evaluates backend health to determine status.
// Backend health is based on:
// - Workload phase (MCPServer/MCPRemoteProxy phase): Always evaluated
// - Runtime health checks: Only when health monitoring is configured
//
// When health monitoring is disabled, backends with healthy workload phase are marked as Ready.
// When health monitoring is enabled, backends must pass both workload phase checks and runtime health checks.
func (*VirtualMCPServerReconciler) determineStatusFromBackends(
ctx context.Context,
vmcp *mcpv1alpha1.VirtualMCPServer,
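The decision model described in the doc comment above can be condensed into a small sketch (a hypothetical helper, not part of this change):

// backendReady condenses the two-source health model: workload phase is
// always authoritative; runtime health checks apply only when health
// monitoring is configured.
func backendReady(phaseHealthy, healthMonitoringEnabled, runtimeHealthy bool) bool {
	if !phaseHealthy {
		return false // unhealthy phase always wins
	}
	if !healthMonitoringEnabled {
		return true // healthy phase alone suffices
	}
	return runtimeHealthy // must also pass runtime health checks
}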
@@ -1640,6 +1647,9 @@ func (r *VirtualMCPServerReconciler) discoverBackends(
discoveredBackends := make([]mcpv1alpha1.DiscoveredBackend, 0, len(typedWorkloads))
now := metav1.Now()

// Check if health monitoring is enabled for this VirtualMCPServer
healthMonitoringEnabled := isHealthMonitoringEnabled(vmcp)

// Convert vmcp.Backend to DiscoveredBackend for all workloads in the group
for _, workloadInfo := range typedWorkloads {
backend, found := discoveredBackendMap[workloadInfo.Name]
@@ -1653,49 +1663,28 @@ func (r *VirtualMCPServerReconciler) discoverBackends(
continue
}

// Convert vmcp.Backend to DiscoveredBackend
// Map health status from BackendHealthStatus to CRD status using centralized mapping
backendStatus := mapHealthStatusToCRDStatus(string(backend.HealthStatus))

// Extract auth config reference and check workload phase based on workload type
// Using pre-fetched maps instead of individual Get calls
authConfigRef := ""
authType := ""
switch workloadInfo.Type {
case workloads.WorkloadTypeMCPServer:
if mcpServer, found := mcpServerMap[workloadInfo.Name]; found {
if mcpServer.Spec.ExternalAuthConfigRef != nil {
authConfigRef = mcpServer.Spec.ExternalAuthConfigRef.Name
authType = mcpv1alpha1.BackendAuthTypeExternalAuthConfigRef
}
// Override backend status based on MCPServer phase for non-ready states
if isPhaseUnhealthy(string(mcpServer.Status.Phase)) {
backendStatus = mcpv1alpha1.BackendStatusUnavailable
ctxLogger.V(1).Info("Backend MCPServer not ready, marking as unavailable",
"name", workloadInfo.Name,
"phase", mcpServer.Status.Phase)
}
}
case workloads.WorkloadTypeMCPRemoteProxy:
if mcpRemoteProxy, found := mcpRemoteProxyMap[workloadInfo.Name]; found {
if mcpRemoteProxy.Spec.ExternalAuthConfigRef != nil {
authConfigRef = mcpRemoteProxy.Spec.ExternalAuthConfigRef.Name
authType = mcpv1alpha1.BackendAuthTypeExternalAuthConfigRef
}
// Override backend status based on MCPRemoteProxy phase for non-ready states
if isPhaseUnhealthy(string(mcpRemoteProxy.Status.Phase)) {
backendStatus = mcpv1alpha1.BackendStatusUnavailable
ctxLogger.V(1).Info("Backend MCPRemoteProxy not ready, marking as unavailable",
"name", workloadInfo.Name,
"phase", mcpRemoteProxy.Status.Phase)
}
}
// Get workload information (auth config, phase) using pre-fetched maps
wlInfo := getWorkloadInfo(workloadInfo.Name, workloadInfo.Type, mcpServerMap, mcpRemoteProxyMap)

// Determine backend status based on workload phase and health monitoring config
if wlInfo.found {
backendStatus = determineBackendStatusFromPhase(
backendStatus,
wlInfo.phase,
healthMonitoringEnabled,
string(workloadInfo.Type),
workloadInfo.Name,
ctxLogger,
)
}

discoveredBackend := mcpv1alpha1.DiscoveredBackend{
Name: backend.Name,
AuthConfigRef: authConfigRef,
AuthType: authType,
AuthConfigRef: wlInfo.authConfigRef,
AuthType: wlInfo.authType,
Status: backendStatus,
LastHealthCheck: now,
URL: backend.BaseURL,
@@ -1706,7 +1695,7 @@ func (r *VirtualMCPServerReconciler) discoverBackends(
"name", backend.Name,
"status", backendStatus,
"url", backend.BaseURL,
"authConfigRef", authConfigRef)
"authConfigRef", wlInfo.authConfigRef)
}

// Query vmcp health status and update backend statuses if health monitoring is enabled
@@ -1715,74 +1704,16 @@ func (r *VirtualMCPServerReconciler) discoverBackends(
// Performance: Health status responses are cached with healthStatusCacheTTL to reduce HTTP
// overhead from frequent reconciliations while maintaining relatively fresh health data.
// The vmcp health endpoint itself returns cached results from periodic health checks.
if vmcp.Status.URL != "" {
// Parse health check interval from spec to derive query timeout
var healthCheckInterval time.Duration
if vmcp.Spec.Operational != nil && vmcp.Spec.Operational.FailureHandling != nil &&
vmcp.Spec.Operational.FailureHandling.HealthCheckInterval != "" {
if interval, err := time.ParseDuration(vmcp.Spec.Operational.FailureHandling.HealthCheckInterval); err == nil {
healthCheckInterval = interval
}
}

if vmcp.Status.URL != "" && healthMonitoringEnabled {
healthCheckInterval := getHealthCheckInterval(vmcp)
healthStatus := r.queryVMCPHealthStatus(ctx, vmcp.Status.URL, healthCheckInterval)

if healthStatus != nil {
ctxLogger.Info("Updating backend status from vmcp health checks",
"vmcp_url", vmcp.Status.URL,
"backend_count", len(healthStatus))

for i := range discoveredBackends {
// Health status is keyed by BackendID, which equals backend.Name for workloads
// (see pkg/vmcp/workloads/k8s.go where ID and Name are both set to workload.Name)
if healthInfo, found := healthStatus[discoveredBackends[i].Name]; found {
// Map vmcp health status to CRD backend status using centralized mapping
newStatus := mapHealthStatusToCRDStatus(healthInfo.Status)

// IMPORTANT: Re-check current phase to determine if we should trust vmcp health status
// If the workload's phase is currently unhealthy (Pending/Failed/Terminating),
// we must keep it unavailable even if vmcp health checks report it as healthy.
// The phase check is authoritative for workload-level health.
shouldPreserveUnavailable := false
if mcpServer, found := mcpServerMap[discoveredBackends[i].Name]; found {
if isPhaseUnhealthy(string(mcpServer.Status.Phase)) {
shouldPreserveUnavailable = true
ctxLogger.Info("Preserving unavailable status - MCPServer phase is unhealthy",
"name", discoveredBackends[i].Name,
"phase", mcpServer.Status.Phase,
"vmcp_health_status", newStatus)
}
} else if mcpRemoteProxy, found := mcpRemoteProxyMap[discoveredBackends[i].Name]; found {
if isPhaseUnhealthy(string(mcpRemoteProxy.Status.Phase)) {
shouldPreserveUnavailable = true
ctxLogger.Info("Preserving unavailable status - MCPRemoteProxy phase is unhealthy",
"name", discoveredBackends[i].Name,
"phase", mcpRemoteProxy.Status.Phase,
"vmcp_health_status", newStatus)
}
}

// Update LastHealthCheck with actual health check timestamp from vmcp
// Do this BEFORE the shouldPreserveUnavailable check so timestamp is always fresh
if !healthInfo.LastCheckTime.IsZero() {
discoveredBackends[i].LastHealthCheck = metav1.NewTime(healthInfo.LastCheckTime)
}

if shouldPreserveUnavailable {
// Skip status update but keep timestamp fresh (already updated above)
continue
}

// Update status if changed
if newStatus != discoveredBackends[i].Status {
ctxLogger.Info("Backend health check updated status",
"name", discoveredBackends[i].Name,
"old_status", discoveredBackends[i].Status,
"new_status", newStatus,
"health_status", healthInfo.Status)
discoveredBackends[i].Status = newStatus
}
}
}
r.updateBackendsFromHealthStatus(ctx, discoveredBackends, healthStatus, mcpServerMap, mcpRemoteProxyMap)
} else {
ctxLogger.Info("Health monitoring not enabled or failed to query vmcp health endpoint",
"vmcp_url", vmcp.Status.URL)
@@ -1803,6 +1734,174 @@ func isPhaseUnhealthy(phase string) bool {
phase == string(mcpv1alpha1.MCPRemoteProxyPhaseTerminating)
}

// isHealthMonitoringEnabled checks if health monitoring is configured for the VirtualMCPServer.
func isHealthMonitoringEnabled(vmcp *mcpv1alpha1.VirtualMCPServer) bool {
return vmcp.Spec.Operational != nil &&
vmcp.Spec.Operational.FailureHandling != nil &&
vmcp.Spec.Operational.FailureHandling.HealthCheckInterval != ""
}

// getHealthCheckInterval parses and returns the health check interval from VirtualMCPServer spec.
// Returns zero duration if health monitoring is not configured or parsing fails.
func getHealthCheckInterval(vmcp *mcpv1alpha1.VirtualMCPServer) time.Duration {
if !isHealthMonitoringEnabled(vmcp) {
return 0
}

interval, err := time.ParseDuration(vmcp.Spec.Operational.FailureHandling.HealthCheckInterval)
if err != nil {
return 0
}
return interval
}

// workloadInfo contains information extracted from a workload resource.
type workloadInfo struct {
authConfigRef string
authType string
phase string
found bool
}

// getWorkloadInfo retrieves auth config and phase information from a workload.
// Returns workloadInfo with found=false if workload is not in the provided maps.
func getWorkloadInfo(
workloadName string,
workloadType workloads.WorkloadType,
mcpServerMap map[string]*mcpv1alpha1.MCPServer,
mcpRemoteProxyMap map[string]*mcpv1alpha1.MCPRemoteProxy,
) workloadInfo {
info := workloadInfo{found: false}

switch workloadType {
case workloads.WorkloadTypeMCPServer:
if mcpServer, found := mcpServerMap[workloadName]; found {
info.found = true
info.phase = string(mcpServer.Status.Phase)
if mcpServer.Spec.ExternalAuthConfigRef != nil {
info.authConfigRef = mcpServer.Spec.ExternalAuthConfigRef.Name
info.authType = mcpv1alpha1.BackendAuthTypeExternalAuthConfigRef
}
}
case workloads.WorkloadTypeMCPRemoteProxy:
if mcpRemoteProxy, found := mcpRemoteProxyMap[workloadName]; found {
info.found = true
info.phase = string(mcpRemoteProxy.Status.Phase)
if mcpRemoteProxy.Spec.ExternalAuthConfigRef != nil {
info.authConfigRef = mcpRemoteProxy.Spec.ExternalAuthConfigRef.Name
info.authType = mcpv1alpha1.BackendAuthTypeExternalAuthConfigRef
}
}
}

return info
}

// determineBackendStatusFromPhase determines backend status based on workload phase and health monitoring config.
// When health monitoring is disabled and phase is healthy, backends are marked as Ready.
// When phase is unhealthy, backend is always marked as Unavailable regardless of health monitoring.
func determineBackendStatusFromPhase(
currentStatus string,
phase string,
healthMonitoringEnabled bool,
workloadType string,
workloadName string,
logger logr.Logger,
) string {
// Phase is unhealthy - always mark as unavailable
if isPhaseUnhealthy(phase) {
logger.V(1).Info("Backend workload not ready, marking as unavailable",
"name", workloadName,
"type", workloadType,
"phase", phase)
return mcpv1alpha1.BackendStatusUnavailable
}

// Phase is healthy but status is unknown (no runtime health checks)
// When health monitoring is disabled, treat this as ready
if currentStatus == mcpv1alpha1.BackendStatusUnknown && !healthMonitoringEnabled {
logger.V(1).Info("Health monitoring disabled with healthy phase, marking backend as ready",
"name", workloadName,
"type", workloadType,
"phase", phase)
return mcpv1alpha1.BackendStatusReady
}

// Keep current status
return currentStatus
}

// updateBackendsFromHealthStatus updates backend statuses from vmcp health check results.
// Phase checks are authoritative - unhealthy phases override health check results.
// Health status is keyed by BackendID, which equals backend.Name for workloads
// (see pkg/vmcp/workloads/k8s.go where ID and Name are both set to workload.Name).
func (*VirtualMCPServerReconciler) updateBackendsFromHealthStatus(
ctx context.Context,
discoveredBackends []mcpv1alpha1.DiscoveredBackend,
healthStatus map[string]*BackendHealthInfo,
mcpServerMap map[string]*mcpv1alpha1.MCPServer,
mcpRemoteProxyMap map[string]*mcpv1alpha1.MCPRemoteProxy,
) {
ctxLogger := log.FromContext(ctx)

for i := range discoveredBackends {
healthInfo, found := healthStatus[discoveredBackends[i].Name]
if !found {
continue
}

// Map vmcp health status to CRD backend status
newStatus := mapHealthStatusToCRDStatus(healthInfo.Status)

// IMPORTANT: Re-check current phase to determine if we should trust vmcp health status
// If the workload's phase is currently unhealthy (Pending/Failed/Terminating),
// we must keep it unavailable even if vmcp health checks report it as healthy.
// The phase check is authoritative for workload-level health.
shouldPreserveUnavailable := false

// Check MCPServer phase
if mcpServer, found := mcpServerMap[discoveredBackends[i].Name]; found {
if isPhaseUnhealthy(string(mcpServer.Status.Phase)) {
shouldPreserveUnavailable = true
ctxLogger.Info("Preserving unavailable status - MCPServer phase is unhealthy",
"name", discoveredBackends[i].Name,
"phase", mcpServer.Status.Phase,
"vmcp_health_status", newStatus)
}
} else if mcpRemoteProxy, found := mcpRemoteProxyMap[discoveredBackends[i].Name]; found {
// Check MCPRemoteProxy phase
if isPhaseUnhealthy(string(mcpRemoteProxy.Status.Phase)) {
shouldPreserveUnavailable = true
ctxLogger.Info("Preserving unavailable status - MCPRemoteProxy phase is unhealthy",
"name", discoveredBackends[i].Name,
"phase", mcpRemoteProxy.Status.Phase,
"vmcp_health_status", newStatus)
}
}

// Update LastHealthCheck with actual health check timestamp from vmcp
// Do this BEFORE the shouldPreserveUnavailable check so timestamp is always fresh
if !healthInfo.LastCheckTime.IsZero() {
discoveredBackends[i].LastHealthCheck = metav1.NewTime(healthInfo.LastCheckTime)
}

if shouldPreserveUnavailable {
// Skip status update but keep timestamp fresh (already updated above)
continue
}

// Update status if changed
if newStatus != discoveredBackends[i].Status {
ctxLogger.Info("Backend health check updated status",
"name", discoveredBackends[i].Name,
"old_status", discoveredBackends[i].Status,
"new_status", newStatus,
"health_status", healthInfo.Status)
discoveredBackends[i].Status = newStatus
}
}
}

// mapHealthStatusToCRDStatus converts health status strings to CRD backend status constants.
// This provides a centralized mapping between vmcp health statuses and VirtualMCPServer CRD statuses.
//
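The function body is elided in this view; its shape is presumably a switch along these lines (the health status strings here are assumptions, while the CRD constants appear elsewhere in this diff):

// Hypothetical shape of the centralized mapping; the actual cases live in
// the elided body.
func mapHealthStatusToCRDStatusSketch(healthStatus string) string {
	switch healthStatus {
	case "healthy":
		return mcpv1alpha1.BackendStatusReady
	case "unhealthy":
		return mcpv1alpha1.BackendStatusUnavailable
	default:
		return mcpv1alpha1.BackendStatusUnknown
	}
}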
7 changes: 7 additions & 0 deletions cmd/thv-operator/pkg/vmcpconfig/converter.go
@@ -921,5 +921,12 @@ func (*Converter) convertOperational(
}
}

// Parse capability cache TTL
if vmcp.Spec.Operational.CapabilityCacheTTL != "" {
if duration, err := time.ParseDuration(vmcp.Spec.Operational.CapabilityCacheTTL); err == nil {
operational.CapabilityCacheTTL = vmcpconfig.Duration(duration)
}
}

return operational
}
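Since parse errors are dropped silently here, only Go duration syntax takes effect; a short illustrative check (not part of the change):

// Illustrative only: "5m", "10s", and "1h30m" parse successfully, while
// "5 minutes" fails, leaving CapabilityCacheTTL unset so the default applies.
for _, raw := range []string{"5m", "10s", "1h30m", "5 minutes"} {
	d, err := time.ParseDuration(raw)
	fmt.Println(raw, d, err)
}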