diff --git a/config/config.go b/config/config.go index 2581e6da..1bf6d50f 100644 --- a/config/config.go +++ b/config/config.go @@ -46,8 +46,10 @@ type FailOverConfig struct { // EnableSlaveHAUpdate controls whether HA logic marks failed slave nodes and // propagates the updated topology. Requires kvrocks to support node status // modification (new versions only). Defaults to false for backward compatibility. - EnableSlaveHAUpdate bool `yaml:"enable_slave_ha_update"` - WaitForSync bool `yaml:"wait_for_sync"` + EnableSlaveHAUpdate bool `yaml:"enable_slave_ha_update"` + WaitForSync bool `yaml:"wait_for_sync"` + VoteTimeoutMs int `yaml:"vote_timeout_ms"` + VoteThresholdRatio float64 `yaml:"vote_threshold_ratio"` } type ControllerConfig struct { @@ -81,6 +83,8 @@ func DefaultFailOverConfig() *FailOverConfig { return &FailOverConfig{ PingIntervalSeconds: 3, MaxPingCount: 5, + VoteTimeoutMs: 2000, + VoteThresholdRatio: 0.6, } } @@ -104,6 +108,12 @@ func (c *Config) Validate() error { if c.Controller.FailOver.PingIntervalSeconds < 1 { return errors.New("ping interval required >= 1s") } + if c.Controller.FailOver.VoteTimeoutMs < 1 { + return errors.New("vote_timeout_ms required >= 1 (0 causes all peer votes to time out immediately)") + } + if c.Controller.FailOver.VoteThresholdRatio <= 0 || c.Controller.FailOver.VoteThresholdRatio > 1.0 { + return errors.New("vote_threshold_ratio must be in range (0, 1.0]") + } hostPort := strings.Split(c.Addr, ":") if hostPort[0] == "0.0.0.0" || hostPort[0] == "127.0.0.1" { logger.Get().Warn("Leader forward may not work if the host is " + hostPort[0]) diff --git a/config/config_test.go b/config/config_test.go index a2002fe3..2ac5504a 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -25,12 +25,20 @@ import ( "github.com/stretchr/testify/assert" ) +func TestDefaultFailOverConfig_VoteDefaults(t *testing.T) { + cfg := DefaultFailOverConfig() + assert.Equal(t, 2000, cfg.VoteTimeoutMs) + assert.InDelta(t, 0.6, cfg.VoteThresholdRatio, 1e-9) +} + func TestDefaultControllerConfigSet(t *testing.T) { cfg := Default() expectedControllerConfig := &ControllerConfig{ FailOver: &FailOverConfig{ PingIntervalSeconds: 3, MaxPingCount: 5, + VoteTimeoutMs: 2000, + VoteThresholdRatio: 0.6, }, } diff --git a/controller/cluster.go b/controller/cluster.go index b6f78bcb..507b763a 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -22,13 +22,17 @@ package controller import ( "context" "errors" + "fmt" + "math" "strings" "sync" "time" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "github.com/apache/kvrocks-controller/logger" + "github.com/apache/kvrocks-controller/metrics" "github.com/apache/kvrocks-controller/store" ) @@ -37,11 +41,19 @@ var ( ErrRestoringBackUp = errors.New("LOADING kvrocks is restoring the db from backup") ) +type failoverProposal struct { + namespace string + clusterName string + shardIndex int + failedNodeID string +} + type ClusterCheckOptions struct { pingInterval time.Duration maxFailureCount int64 enableSlaveHAUpdate bool failoverOpts store.FailoverOptions + voteThresholdRatio float64 } type ClusterChecker struct { @@ -55,7 +67,20 @@ type ClusterChecker struct { failureMu sync.Mutex failureCounts map[string]int64 - syncCh chan struct{} + + lastProbeMu sync.Mutex + lastProbeTime map[string]time.Time + + failoverProposalCh chan failoverProposal + + coordinateMu sync.Mutex + coordinateCtx context.Context + coordinateCancelFn context.CancelFunc + coordinateDoneCh chan struct{} // closed when coordinateLoop exits; nil when not running + + voter Voter + + syncCh chan struct{} ctx context.Context cancelFn context.CancelFunc @@ -71,12 +96,16 @@ func NewClusterChecker(s store.Store, ns, cluster string) *ClusterChecker { clusterStore: s, options: ClusterCheckOptions{ - pingInterval: time.Second * 3, - maxFailureCount: 5, - failoverOpts: store.DefaultFailoverOptions(), + pingInterval: time.Second * 3, + maxFailureCount: 5, + failoverOpts: store.DefaultFailoverOptions(), + voteThresholdRatio: 0.6, }, - failureCounts: make(map[string]int64), - syncCh: make(chan struct{}, 1), + failureCounts: make(map[string]int64), + lastProbeTime: make(map[string]time.Time), + failoverProposalCh: make(chan failoverProposal, 1), + voter: nopVoter{}, + syncCh: make(chan struct{}, 1), ctx: ctx, cancelFn: cancel, @@ -117,6 +146,66 @@ func (c *ClusterChecker) WithFailoverOptions(opts store.FailoverOptions) *Cluste return c } +func (c *ClusterChecker) WithVoter(v Voter) *ClusterChecker { + c.voter = v + return c +} + +func (c *ClusterChecker) WithVoteThresholdRatio(ratio float64) *ClusterChecker { + if ratio > 0 && ratio <= 1.0 { + c.options.voteThresholdRatio = ratio + } + return c +} + +// ShouldVote returns a VoteResponse describing whether this node's probe data +// justifies approving a failover for the given kvrocks node. The response +// includes diagnostic fields (failure count, soft threshold, last-probe age) +// so that the requesting leader can log them when a peer votes NO, making it +// possible to answer "why didn't failover happen?" in production. +func (c *ClusterChecker) ShouldVote(nodeID string) VoteResponse { + softThreshold := int64(math.Ceil( + float64(c.options.maxFailureCount) * c.options.voteThresholdRatio)) + freshnessWindow := c.options.pingInterval * 2 + + c.failureMu.Lock() + count := c.failureCounts[nodeID] + c.failureMu.Unlock() + + c.lastProbeMu.Lock() + lastProbe := c.lastProbeTime[nodeID] + c.lastProbeMu.Unlock() + + if lastProbe.IsZero() { + return VoteResponse{ + Vote: false, + Reason: "no probe data for node", + SoftThreshold: softThreshold, + } + } + + agoMs := time.Since(lastProbe).Milliseconds() + vote := count >= softThreshold && time.Since(lastProbe) < freshnessWindow + + var reason string + if !vote { + if count < softThreshold { + reason = fmt.Sprintf("failure count %d below soft threshold %d", count, softThreshold) + } else { + reason = fmt.Sprintf("last probe stale (%dms ago, window %dms)", + agoMs, freshnessWindow.Milliseconds()) + } + } + + return VoteResponse{ + Vote: vote, + Reason: reason, + FailureCount: count, + SoftThreshold: softThreshold, + LastProbeAgoMs: agoMs, + } +} + func (c *ClusterChecker) probeNode(ctx context.Context, node store.Node) (int64, error) { clusterInfo, err := node.GetClusterInfo(ctx) if err != nil { @@ -144,6 +233,9 @@ func (c *ClusterChecker) increaseFailureCount(shardIndex int, node store.Node) i c.failureCounts[id] += 1 count := c.failureCounts[id] c.failureMu.Unlock() + metrics.Get().NodeFailureCount.With(prometheus.Labels{ + "namespace": c.namespace, "cluster": c.clusterName, "node_id": id, + }).Set(float64(count)) if !node.IsMaster() { if c.options.enableSlaveHAUpdate && count >= c.options.maxFailureCount && !node.Failed() { @@ -170,30 +262,24 @@ func (c *ClusterChecker) increaseFailureCount(shardIndex int, node store.Node) i return count } - if count%c.options.maxFailureCount == 0 || count > c.options.maxFailureCount { - log := logger.Get().With( + if count%c.options.maxFailureCount == 0 { + logger.Get().With( zap.String("cluster_name", c.clusterName), zap.String("id", node.ID()), zap.Bool("is_master", node.IsMaster()), - zap.String("addr", node.Addr())) - cluster, err := c.clusterStore.GetCluster(c.ctx, c.namespace, c.clusterName) - if err != nil { - log.Error("Failed to get the cluster info", zap.Error(err)) - return count - } - _, newMaster, promoteErr := cluster.PromoteNewMaster(c.ctx, shardIndex, node.ID(), "", c.options.failoverOpts) - if promoteErr != nil { - log.Error("Failed to promote the new master", zap.Error(promoteErr)) - return count - } - if updateErr := c.clusterStore.UpdateCluster(c.ctx, c.namespace, cluster); updateErr != nil { - log.Error("Failed to persist cluster after promoting new master", zap.Error(updateErr)) - return count + zap.String("addr", node.Addr()), + zap.Int64("failure_count", count), + ).Warn("Master failure threshold reached, proposing failover") + select { + case c.failoverProposalCh <- failoverProposal{ + namespace: c.namespace, + clusterName: c.clusterName, + shardIndex: shardIndex, + failedNodeID: node.ID(), + }: + default: + // previous proposal still being processed } - // the node is normal if it can be elected as the new master, - // because it requires the node is healthy. - c.resetFailureCount(newMaster.ID()) - log.With(zap.String("new_master_id", newMaster.ID())).Info("Promote the new master") } return count } @@ -202,6 +288,34 @@ func (c *ClusterChecker) resetFailureCount(nodeID string) { c.failureMu.Lock() delete(c.failureCounts, nodeID) c.failureMu.Unlock() + metrics.Get().NodeFailureCount.With(prometheus.Labels{ + "namespace": c.namespace, "cluster": c.clusterName, "node_id": nodeID, + }).Set(0) +} + +// pruneStaleEntries removes failure-count and probe-time map entries for nodes +// that are no longer present in the current cluster topology. It is called after +// each probe round so that removing a node from a cluster eventually frees the +// memory held for it, preventing unbounded map growth. +func (c *ClusterChecker) pruneStaleEntries(activeIDs map[string]struct{}) { + c.failureMu.Lock() + for id := range c.failureCounts { + if _, active := activeIDs[id]; !active { + delete(c.failureCounts, id) + metrics.Get().NodeFailureCount.Delete(prometheus.Labels{ + "namespace": c.namespace, "cluster": c.clusterName, "node_id": id, + }) + } + } + c.failureMu.Unlock() + + c.lastProbeMu.Lock() + for id := range c.lastProbeTime { + if _, active := activeIDs[id]; !active { + delete(c.lastProbeTime, id) + } + } + c.lastProbeMu.Unlock() } func (c *ClusterChecker) sendSyncEvent() { @@ -243,6 +357,15 @@ func (c *ClusterChecker) syncClusterToNodes(ctx context.Context) error { } func (c *ClusterChecker) parallelProbeNodes(ctx context.Context, cluster *store.Cluster) { + // Snapshot active node IDs before probing so we can prune stale map entries + // for nodes removed from the topology after all goroutines finish. + activeIDs := make(map[string]struct{}) + for _, shard := range cluster.Shards { + for _, node := range shard.Nodes { + activeIDs[node.ID()] = struct{}{} + } + } + var mu sync.Mutex var latestNodeVersion int64 = 0 var latestClusterNodesStr string @@ -260,6 +383,11 @@ func (c *ClusterChecker) parallelProbeNodes(ctx context.Context, cluster *store. zap.String("addr", n.Addr()), ) version, err := c.probeNode(ctx, n) + // Record probe time regardless of outcome so ShouldVote has fresh data. + c.lastProbeMu.Lock() + c.lastProbeTime[n.ID()] = time.Now() + c.lastProbeMu.Unlock() + // Don't sync the cluster info to the node if it is restoring the db from backup if errors.Is(err, ErrRestoringBackUp) { log.Error("The node is restoring the db from backup") @@ -270,6 +398,14 @@ func (c *ClusterChecker) parallelProbeNodes(ctx context.Context, cluster *store. log.With(zap.Error(err), zap.Int64("failure_count", failureCount), ).Warn("Failed to probe the node") + isMaster := "false" + if n.IsMaster() { + isMaster = "true" + } + metrics.Get().ProbeFailures.With(prometheus.Labels{ + "namespace": c.namespace, "cluster": c.clusterName, + "node_id": n.ID(), "is_master": isMaster, + }).Inc() return } log.Debug("Probe the clusterName node") @@ -305,6 +441,8 @@ func (c *ClusterChecker) parallelProbeNodes(ctx context.Context, cluster *store. } wg.Wait() + c.pruneStaleEntries(activeIDs) + if latestNodeVersion > cluster.Version.Load() && latestClusterNodesStr != "" { latestClusterInfo, err := store.ParseCluster(latestClusterNodesStr) if err != nil { @@ -462,7 +600,108 @@ func (c *ClusterChecker) migrationLoop() { } } +// StartCoordinate starts the coordinateLoop goroutine (idempotent). +// Call only when this node is the leader. +func (c *ClusterChecker) StartCoordinate() { + c.coordinateMu.Lock() + defer c.coordinateMu.Unlock() + if c.coordinateCancelFn != nil { + return + } + coordCtx, cancel := context.WithCancel(context.Background()) + c.coordinateCtx = coordCtx + c.coordinateCancelFn = cancel + doneCh := make(chan struct{}) + c.coordinateDoneCh = doneCh + c.wg.Add(1) + go c.coordinateLoop(doneCh) +} + +// StopCoordinate stops the coordinateLoop goroutine and blocks until it has +// fully exited. This prevents a stale coordinateLoop from racing with a newly +// started one after a leader-change. Idempotent. +func (c *ClusterChecker) StopCoordinate() { + c.coordinateMu.Lock() + if c.coordinateCancelFn == nil { + c.coordinateMu.Unlock() + return + } + c.coordinateCancelFn() + c.coordinateCancelFn = nil + doneCh := c.coordinateDoneCh + c.coordinateDoneCh = nil + c.coordinateMu.Unlock() // release before blocking + + if doneCh != nil { + <-doneCh // wait for coordinateLoop goroutine to fully exit + } +} + +func (c *ClusterChecker) coordinateLoop(doneCh chan struct{}) { + defer close(doneCh) // signal exit AFTER wg.Done so Close()'s wg.Wait is clean + defer c.wg.Done() + for { + select { + case proposal := <-c.failoverProposalCh: + c.handleProposal(c.coordinateCtx, proposal) + case <-c.coordinateCtx.Done(): + return + } + } +} + +func (c *ClusterChecker) handleProposal(ctx context.Context, p failoverProposal) { + log := logger.Get().With( + zap.String("namespace", p.namespace), + zap.String("cluster", p.clusterName), + zap.Int("shard_index", p.shardIndex), + zap.String("failed_node", p.failedNodeID), + ) + metrics.Get().FailoverProposals.With(prometheus.Labels{ + "namespace": p.namespace, "cluster": p.clusterName, + }).Inc() + + approved, err := c.voter.RequestVotes(ctx, VoteRequest{ + Namespace: p.namespace, + ClusterName: p.clusterName, + ShardIndex: p.shardIndex, + FailedNodeID: p.failedNodeID, + }) + if err != nil { + log.Error("Vote request failed", zap.Error(err)) + metrics.Get().FailoverBlocked.With(prometheus.Labels{ + "namespace": p.namespace, "cluster": p.clusterName, "reason": "vote_error", + }).Inc() + return + } + if !approved { + log.Info("Failover blocked by peer vote") + return + } + + cluster, err := c.clusterStore.GetCluster(ctx, p.namespace, p.clusterName) + if err != nil { + log.Error("Failed to get cluster for failover", zap.Error(err)) + return + } + _, newMaster, err := cluster.PromoteNewMaster(ctx, p.shardIndex, p.failedNodeID, "", c.options.failoverOpts) + if err != nil { + log.Error("Failed to promote new master", zap.Error(err)) + return + } + if err := c.clusterStore.UpdateCluster(ctx, p.namespace, cluster); err != nil { + log.Error("Failed to persist cluster after failover", zap.Error(err)) + return + } + c.resetFailureCount(newMaster.ID()) + metrics.Get().FailoverCompleted.With(prometheus.Labels{ + "namespace": p.namespace, "cluster": p.clusterName, + }).Inc() + log.With(zap.String("new_master", newMaster.ID())).Info("Failover completed") +} + func (c *ClusterChecker) Close() { + c.StopCoordinate() c.cancelFn() c.wg.Wait() } diff --git a/controller/cluster_test.go b/controller/cluster_test.go index a3774446..b7707832 100644 --- a/controller/cluster_test.go +++ b/controller/cluster_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/apache/kvrocks-controller/consts" @@ -117,6 +118,7 @@ func TestCluster_FailureCount(t *testing.T) { require.NoError(t, s.CreateCluster(ctx, ns, clusterInfo)) + voteCalled := make(chan struct{}, 1) cluster := &ClusterChecker{ clusterStore: s, namespace: ns, @@ -125,26 +127,50 @@ func TestCluster_FailureCount(t *testing.T) { pingInterval: time.Second, maxFailureCount: 3, enableSlaveHAUpdate: true, + voteThresholdRatio: 0.6, }, - failureCounts: make(map[string]int64), - syncCh: make(chan struct{}, 1), + failureCounts: make(map[string]int64), + lastProbeTime: make(map[string]time.Time), + failoverProposalCh: make(chan failoverProposal, 1), + voter: voterFunc(func(_ context.Context, req VoteRequest) (bool, error) { + voteCalled <- struct{}{} + return true, nil // approve so PromoteNewMaster runs + }), + syncCh: make(chan struct{}, 1), } + cluster.ctx, cluster.cancelFn = context.WithCancel(context.Background()) + cluster.StartCoordinate() + defer cluster.Close() require.EqualValues(t, 1, clusterInfo.Version.Load()) for i := int64(0); i < cluster.options.maxFailureCount-1; i++ { require.EqualValues(t, i+1, cluster.increaseFailureCount(0, mockNode2)) } for i := int64(0); i < cluster.options.maxFailureCount; i++ { - require.EqualValues(t, i+1, cluster.increaseFailureCount(0, mockNode0)) + cluster.increaseFailureCount(0, mockNode0) } + + // Voter called: fast-fail if the coordinateLoop never picks up the proposal. + select { + case <-voteCalled: + case <-time.After(500 * time.Millisecond): + t.Fatal("voter not called within timeout") + } + + // Wait for handleProposal to complete. UpdateCluster increments Version + // atomically AFTER PromoteNewMaster/SetRole, so Version==2 is the correct + // synchronization point — it prevents a data race between the coordinator + // goroutine's SetRole write and the test goroutine's IsMaster read. + require.Eventually(t, + func() bool { return clusterInfo.Version.Load() == 2 }, + 500*time.Millisecond, 5*time.Millisecond, + "failover did not complete") + require.False(t, mockNode0.IsMaster()) // mockNode2 should become the new master since its sequence is the largest require.True(t, mockNode2.IsMaster()) require.EqualValues(t, 2, clusterInfo.Version.Load()) - require.EqualValues(t, 0, cluster.failureCounts[mockNode2.Addr()]) - require.True(t, mockNode2.IsMaster()) - // Slave failure count keeps increasing; at threshold the slave is auto-marked as failed. for i := int64(0); i < cluster.options.maxFailureCount*2; i++ { require.EqualValues(t, i+1, cluster.increaseFailureCount(0, mockNode3)) @@ -193,10 +219,15 @@ func TestCluster_SlaveFailureAutoOffline(t *testing.T) { pingInterval: time.Second, maxFailureCount: 3, enableSlaveHAUpdate: true, + voteThresholdRatio: 0.6, }, - failureCounts: make(map[string]int64), - syncCh: make(chan struct{}, 1), + failureCounts: make(map[string]int64), + lastProbeTime: make(map[string]time.Time), + failoverProposalCh: make(chan failoverProposal, 1), + voter: nopVoter{}, + syncCh: make(chan struct{}, 1), } + checker.ctx, checker.cancelFn = context.WithCancel(context.Background()) // Slave should not be marked as failed before reaching threshold require.False(t, mockSlave1.Failed()) @@ -307,3 +338,318 @@ func TestCluster_MigrateSlot(t *testing.T) { defer ticker.Stop() <-ticker.C } + +// TestPruneStaleEntries_DirectCall verifies the pruning helper itself: +// entries for absent nodes are deleted, entries for active nodes are kept. +func TestPruneStaleEntries_DirectCall(t *testing.T) { + checker := &ClusterChecker{ + failureCounts: map[string]int64{ + "active-node": 2, + "removed-node": 5, + }, + lastProbeTime: map[string]time.Time{ + "active-node": time.Now(), + "removed-node": time.Now(), + }, + } + + checker.pruneStaleEntries(map[string]struct{}{"active-node": {}}) + + checker.failureMu.Lock() + _, activeExists := checker.failureCounts["active-node"] + _, removedExists := checker.failureCounts["removed-node"] + checker.failureMu.Unlock() + assert.True(t, activeExists, "active node failure count must be kept") + assert.False(t, removedExists, "removed node failure count must be pruned") + + checker.lastProbeMu.Lock() + _, activeExists = checker.lastProbeTime["active-node"] + _, removedExists = checker.lastProbeTime["removed-node"] + checker.lastProbeMu.Unlock() + assert.True(t, activeExists, "active node probe time must be kept") + assert.False(t, removedExists, "removed node probe time must be pruned") +} + +// TestParallelProbeNodes_PrunesRemovedNodes verifies the integration path: +// entries accumulated for a node that has been removed from the cluster topology +// are cleaned up on the next probe round. +func TestParallelProbeNodes_PrunesRemovedNodes(t *testing.T) { + ctx := context.Background() + s := NewMockClusterStore() + + activeNode := store.NewClusterMockNode() + activeNode.SetRole(store.RoleMaster) + removedNode := store.NewClusterMockNode() + removedNode.SetRole(store.RoleSlave) + + fullCluster := &store.Cluster{ + Name: "test", + Shards: []*store.Shard{{ + Nodes: []store.Node{activeNode, removedNode}, + SlotRanges: []store.SlotRange{{Start: 0, Stop: 16383}}, + MigratingSlot: &store.MigratingSlot{}, + TargetShardIndex: -1, + }}, + } + fullCluster.Version.Store(1) + require.NoError(t, s.CreateCluster(ctx, "ns", fullCluster)) + + checker := NewClusterChecker(s, "ns", "test") + + // Simulate entries that were built up while removedNode was still active. + checker.failureMu.Lock() + checker.failureCounts[removedNode.ID()] = 3 + checker.failureMu.Unlock() + checker.lastProbeMu.Lock() + checker.lastProbeTime[removedNode.ID()] = time.Now() + checker.lastProbeMu.Unlock() + + // Probe with a topology from which removedNode has been evicted. + reducedCluster := &store.Cluster{ + Name: "test", + Shards: []*store.Shard{{ + Nodes: []store.Node{activeNode}, + SlotRanges: []store.SlotRange{{Start: 0, Stop: 16383}}, + MigratingSlot: &store.MigratingSlot{}, + TargetShardIndex: -1, + }}, + } + reducedCluster.Version.Store(1) + checker.parallelProbeNodes(ctx, reducedCluster) + + checker.failureMu.Lock() + _, exists := checker.failureCounts[removedNode.ID()] + checker.failureMu.Unlock() + assert.False(t, exists, "failure count for removed node must be pruned after probe round") + + checker.lastProbeMu.Lock() + _, exists = checker.lastProbeTime[removedNode.ID()] + checker.lastProbeMu.Unlock() + assert.False(t, exists, "lastProbeTime for removed node must be pruned after probe round") +} + +// newTestChecker creates a ClusterChecker with voteThresholdRatio=0.6, +// maxFailureCount=5, pingInterval=3s — soft threshold = ceil(5*0.6) = 3. +func newTestChecker() *ClusterChecker { + checker := NewClusterChecker(nil, "test-ns", "test-cluster") + checker.options.maxFailureCount = 5 + checker.options.voteThresholdRatio = 0.6 + checker.options.pingInterval = 3 * time.Second + return checker +} + +func TestShouldVote_SufficientCountAndFresh(t *testing.T) { + c := newTestChecker() + c.failureCounts["nodeA"] = 5 + c.lastProbeTime["nodeA"] = time.Now() + assert.True(t, c.ShouldVote("nodeA").Vote) +} + +func TestShouldVote_ExactSoftThreshold(t *testing.T) { + c := newTestChecker() + c.failureCounts["nodeA"] = 3 + c.lastProbeTime["nodeA"] = time.Now() + assert.True(t, c.ShouldVote("nodeA").Vote) +} + +func TestShouldVote_CountBelowThreshold(t *testing.T) { + c := newTestChecker() + c.failureCounts["nodeA"] = 2 + c.lastProbeTime["nodeA"] = time.Now() + assert.False(t, c.ShouldVote("nodeA").Vote) +} + +func TestShouldVote_StaleData(t *testing.T) { + c := newTestChecker() + c.failureCounts["nodeA"] = 5 + c.lastProbeTime["nodeA"] = time.Now().Add(-10 * time.Second) // stale (> 2*3s) + assert.False(t, c.ShouldVote("nodeA").Vote) +} + +func TestShouldVote_NeverProbed(t *testing.T) { + c := newTestChecker() + assert.False(t, c.ShouldVote("unknown").Vote) +} + +func TestShouldVote_AfterReset(t *testing.T) { + c := newTestChecker() + c.failureCounts["nodeA"] = 5 + c.lastProbeTime["nodeA"] = time.Now() + c.resetFailureCount("nodeA") + assert.False(t, c.ShouldVote("nodeA").Vote) +} + +// TestStopCoordinate_WaitsForLoopToExit verifies Fix 1: StopCoordinate must +// block until the coordinateLoop goroutine has fully exited, not just until the +// cancel signal has been sent. Without the fix, StopCoordinate returns while +// handleProposal is still executing, creating a window where two coordinateLoops +// can run concurrently after a rapid leader-change cycle. +func TestStopCoordinate_WaitsForLoopToExit(t *testing.T) { + gate := make(chan struct{}) // released by test to unblock the voter + entered := make(chan struct{}) // closed when voter goroutine is active + + checker := &ClusterChecker{ + namespace: "ns", + clusterName: "test", + failoverProposalCh: make(chan failoverProposal, 1), + syncCh: make(chan struct{}, 1), + failureCounts: make(map[string]int64), + lastProbeTime: make(map[string]time.Time), + voter: voterFunc(func(_ context.Context, _ VoteRequest) (bool, error) { + close(entered) // signal: voter is now running + <-gate // hold until test releases + return false, nil + }), + } + checker.ctx, checker.cancelFn = context.WithCancel(context.Background()) + checker.StartCoordinate() + defer checker.Close() + + // Deliver a proposal so coordinateLoop enters handleProposal. + checker.failoverProposalCh <- failoverProposal{namespace: "ns", clusterName: "test"} + <-entered // voter is now blocked inside handleProposal + + stopped := make(chan struct{}) + go func() { + checker.StopCoordinate() + close(stopped) + }() + + // StopCoordinate must NOT return while the voter still holds the gate. + select { + case <-stopped: + t.Fatal("StopCoordinate returned before coordinateLoop exited") + case <-time.After(60 * time.Millisecond): + // correct: still waiting + } + + // Release the voter — coordinateLoop can now exit. + close(gate) + + select { + case <-stopped: + // correct + case <-time.After(500 * time.Millisecond): + t.Fatal("StopCoordinate did not return after coordinateLoop exited") + } +} + +// TestIncreaseFailureCount_NoSpuriousProposals verifies Fix 2: proposals must +// only be sent at exact multiples of maxFailureCount (3, 6, 9 …), not on every +// tick once count exceeds the threshold (the old "|| count > max" bug). +func TestIncreaseFailureCount_NoSpuriousProposals(t *testing.T) { + masterNode := store.NewClusterMockNode() + masterNode.SetRole(store.RoleMaster) + + voterCh := make(chan struct{}, 10) + checker := &ClusterChecker{ + namespace: "ns", + clusterName: "test", + options: ClusterCheckOptions{ + maxFailureCount: 3, + voteThresholdRatio: 0.6, + }, + failureCounts: make(map[string]int64), + lastProbeTime: make(map[string]time.Time), + failoverProposalCh: make(chan failoverProposal, 1), + syncCh: make(chan struct{}, 1), + voter: voterFunc(func(_ context.Context, _ VoteRequest) (bool, error) { + voterCh <- struct{}{} + return false, nil + }), + } + checker.ctx, checker.cancelFn = context.WithCancel(context.Background()) + checker.StartCoordinate() + defer checker.Close() + + maxF := checker.options.maxFailureCount + + // counts 1 .. maxF-1: no proposal expected + for i := int64(0); i < maxF-1; i++ { + checker.increaseFailureCount(0, masterNode) + } + time.Sleep(30 * time.Millisecond) + assert.Empty(t, voterCh, "no proposal before threshold") + + // count = maxF: proposal expected + checker.increaseFailureCount(0, masterNode) + require.Eventually(t, func() bool { return len(voterCh) >= 1 }, + 300*time.Millisecond, 5*time.Millisecond, "proposal at threshold") + <-voterCh // drain + + // count = maxF+1: must NOT generate a new proposal (the old bug) + checker.increaseFailureCount(0, masterNode) + time.Sleep(50 * time.Millisecond) + assert.Empty(t, voterCh, "count=maxFailureCount+1 must not trigger a proposal") + + // count = 2*maxF: next multiple → proposal expected + for i := int64(1); i < maxF; i++ { + checker.increaseFailureCount(0, masterNode) + } + require.Eventually(t, func() bool { return len(voterCh) >= 1 }, + 300*time.Millisecond, 5*time.Millisecond, "proposal at 2× threshold") +} + +// voterFunc lets tests inject a voter without a real VoteCoordinator. +type voterFunc func(ctx context.Context, req VoteRequest) (bool, error) + +func (f voterFunc) RequestVotes(ctx context.Context, req VoteRequest) (bool, error) { + return f(ctx, req) +} + +func TestFailoverProposal_SentOnThreshold(t *testing.T) { + s := NewMockClusterStore() + masterNode := store.NewClusterMockNode() + masterNode.SetRole(store.RoleMaster) + slaveNode := store.NewClusterMockNode() + slaveNode.SetRole(store.RoleSlave) + slaveNode.Sequence = 10 + + ctx := context.Background() + clusterInfo := &store.Cluster{ + Name: "test-cluster", + Shards: []*store.Shard{{ + Nodes: []store.Node{masterNode, slaveNode}, + SlotRanges: []store.SlotRange{{Start: 0, Stop: 16383}}, + MigratingSlot: &store.MigratingSlot{}, + TargetShardIndex: -1, + }}, + } + clusterInfo.Version.Store(1) + require.NoError(t, s.CreateCluster(ctx, "ns", clusterInfo)) + + voteCalled := make(chan VoteRequest, 1) + checker := &ClusterChecker{ + clusterStore: s, + namespace: "ns", + clusterName: "test-cluster", + options: ClusterCheckOptions{ + pingInterval: time.Second, + maxFailureCount: 3, + voteThresholdRatio: 0.6, + }, + failureCounts: make(map[string]int64), + lastProbeTime: make(map[string]time.Time), + failoverProposalCh: make(chan failoverProposal, 1), + voter: voterFunc(func(_ context.Context, req VoteRequest) (bool, error) { + voteCalled <- req + return false, nil // return false to avoid PromoteNewMaster with partial state + }), + syncCh: make(chan struct{}, 1), + } + checker.ctx, checker.cancelFn = context.WithCancel(context.Background()) + checker.StartCoordinate() + defer checker.Close() + + for i := int64(0); i < checker.options.maxFailureCount; i++ { + checker.increaseFailureCount(0, masterNode) + } + + select { + case req := <-voteCalled: + assert.Equal(t, masterNode.ID(), req.FailedNodeID) + assert.Equal(t, "test-cluster", req.ClusterName) + case <-time.After(500 * time.Millisecond): + t.Fatal("coordinateLoop did not call voter within timeout") + } +} diff --git a/controller/controller.go b/controller/controller.go index 5d2c446d..f8b2659e 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -43,6 +43,7 @@ const ( type Controller struct { config *config.ControllerConfig clusterStore *store.ClusterStore + voter Voter mu sync.Mutex clusters map[string]*ClusterChecker @@ -57,6 +58,7 @@ func New(s *store.ClusterStore, config *config.ControllerConfig) (*Controller, e c := &Controller{ config: config, clusterStore: s, + voter: nopVoter{}, clusters: make(map[string]*ClusterChecker), readyCh: make(chan struct{}, 1), closeCh: make(chan struct{}), @@ -65,6 +67,17 @@ func New(s *store.ClusterStore, config *config.ControllerConfig) (*Controller, e return c, nil } +func (c *Controller) WithVoter(v Voter) *Controller { + c.voter = v + return c +} + +// GetClusterChecker returns the ClusterChecker for the given cluster. +// Used by the /internal/vote HTTP handler. +func (c *Controller) GetClusterChecker(namespace, clusterName string) (*ClusterChecker, error) { + return c.getCluster(namespace, clusterName) +} + func (c *Controller) Start(ctx context.Context) error { if !c.state.CompareAndSwap(stateInit, stateRunning) { return nil @@ -81,12 +94,12 @@ func (c *Controller) WaitForReady() { <-c.readyCh } -// suspend stops the controller from processing events if it's not the leader +// suspend stops only the coordinateLoop on all checkers (probeLoop keeps running). +// All checkers remain in c.clusters so non-leader nodes keep probing. func (c *Controller) suspend() { c.mu.Lock() - for key, cluster := range c.clusters { - cluster.Close() - delete(c.clusters, key) + for _, cluster := range c.clusters { + cluster.StopCoordinate() } c.mu.Unlock() } @@ -110,25 +123,34 @@ func (c *Controller) resume(ctx context.Context) error { return nil } -func (c *Controller) becomeLeader(ctx context.Context, prevTermLeader string) { - if prevTermLeader == c.clusterStore.ID() { - return +func (c *Controller) startAllCoordinate() { + c.mu.Lock() + for _, cluster := range c.clusters { + cluster.StartCoordinate() } - if err := c.resume(ctx); err != nil { - logger.Get().Error("Failed to resume the controller", zap.Error(err)) + c.mu.Unlock() +} + +func (c *Controller) becomeLeader(_ context.Context, prevTermLeader string) { + if prevTermLeader == c.clusterStore.ID() { return } - logger.Get().Info("Became the leader, resume the controller") + c.startAllCoordinate() + logger.Get().Info("Became the leader, started coordinate loops") } func (c *Controller) syncLoop(ctx context.Context) { defer c.wg.Done() - prevTermLeader := "" + // All nodes load checkers at startup so probeLoop runs everywhere. + if err := c.resume(ctx); err != nil { + logger.Get().Error("Failed to resume cluster checkers", zap.Error(err)) + } + if c.clusterStore.IsLeader() { - c.becomeLeader(ctx, prevTermLeader) + c.startAllCoordinate() } - prevTermLeader = c.clusterStore.Leader() + prevTermLeader := c.clusterStore.Leader() c.readyCh <- struct{}{} for { @@ -140,12 +162,11 @@ func (c *Controller) syncLoop(ctx context.Context) { prevTermLeader = c.clusterStore.ID() } } else { - if prevTermLeader != c.clusterStore.ID() { - continue + if prevTermLeader == c.clusterStore.ID() { + c.suspend() + prevTermLeader = c.clusterStore.Leader() + logger.Get().Warn("Lost the leader, suspended coordinate loops") } - c.suspend() - prevTermLeader = c.clusterStore.Leader() - logger.Get().Warn("Lost the leader, suspend the controller") } case <-c.closeCh: return @@ -158,16 +179,20 @@ func (c *Controller) leaderEventLoop() { for { select { case event := <-c.clusterStore.Notify(): - if !c.clusterStore.IsLeader() || event.Type != store.EventCluster { + if event.Type != store.EventCluster { continue } switch event.Command { case store.CommandCreate: + // All nodes maintain a checker set so they can respond to /internal/vote c.addCluster(event.Namespace, event.Cluster) case store.CommandRemove: c.removeCluster(event.Namespace, event.Cluster) case store.CommandUpdate: - c.updateCluster(event.Namespace, event.Cluster) + // Only the leader needs to sync topology changes to kvrocks nodes + if c.clusterStore.IsLeader() { + c.updateCluster(event.Namespace, event.Cluster) + } default: logger.Get().Error("Unknown command", zap.Any("event", event)) } @@ -187,14 +212,23 @@ func (c *Controller) addCluster(namespace, clusterName string) { return } - cluster := NewClusterChecker(c.clusterStore, namespace, clusterName). - WithPingInterval(time.Duration(c.config.FailOver.PingIntervalSeconds) * time.Second). - WithMaxFailureCount(c.config.FailOver.MaxPingCount). - WithSlaveHAUpdate(c.config.FailOver.EnableSlaveHAUpdate) - cluster.Start() + checker := NewClusterChecker(c.clusterStore, namespace, clusterName). + WithVoter(c.voter) + if c.config != nil { + checker. + WithPingInterval(time.Duration(c.config.FailOver.PingIntervalSeconds) * time.Second). + WithMaxFailureCount(c.config.FailOver.MaxPingCount). + WithSlaveHAUpdate(c.config.FailOver.EnableSlaveHAUpdate). + WithVoteThresholdRatio(c.config.FailOver.VoteThresholdRatio) + } + checker.Start() + + if c.clusterStore.IsLeader() { + checker.StartCoordinate() + } c.mu.Lock() - c.clusters[key] = cluster + c.clusters[key] = checker c.mu.Unlock() } @@ -240,7 +274,13 @@ func (c *Controller) Close() { return } - c.suspend() + c.mu.Lock() + for key, cluster := range c.clusters { + cluster.Close() + delete(c.clusters, key) + } + c.mu.Unlock() + close(c.readyCh) close(c.closeCh) c.wg.Wait() diff --git a/controller/vote_coordinator.go b/controller/vote_coordinator.go new file mode 100644 index 00000000..19e61e12 --- /dev/null +++ b/controller/vote_coordinator.go @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package controller + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + + "github.com/apache/kvrocks-controller/logger" + "github.com/apache/kvrocks-controller/metrics" + "github.com/apache/kvrocks-controller/store" +) + +// Voter is implemented by VoteCoordinator and by nopVoter (default single-node). +type Voter interface { + RequestVotes(ctx context.Context, req VoteRequest) (bool, error) +} + +// VoteRequest is the JSON body sent to /internal/vote on each peer. +type VoteRequest struct { + Namespace string `json:"namespace"` + ClusterName string `json:"cluster_name"` + ShardIndex int `json:"shard_index"` + FailedNodeID string `json:"failed_node_id"` +} + +// VoteResponse is the JSON body returned by /internal/vote. +// The diagnostic fields (FailureCount, SoftThreshold, LastProbeAgoMs) are +// populated by the peer so the leader can log them when a peer votes NO, +// making it possible to answer "why didn't failover happen?" in production. +type VoteResponse struct { + Vote bool `json:"vote"` + Reason string `json:"reason,omitempty"` + FailureCount int64 `json:"failure_count,omitempty"` + SoftThreshold int64 `json:"soft_threshold,omitempty"` + LastProbeAgoMs int64 `json:"last_probe_ago_ms,omitempty"` +} + +// nopVoter always approves — used as default when no coordinator is configured. +type nopVoter struct{} + +func (nopVoter) RequestVotes(_ context.Context, _ VoteRequest) (bool, error) { + return true, nil +} + +// VoteCoordinator asks all active peer controllers to vote before a failover. +type VoteCoordinator struct { + clusterStore *store.ClusterStore + voteTimeout time.Duration + httpClient *http.Client +} + +// NewVoteCoordinator creates a coordinator. voteTimeout is the per-peer HTTP deadline. +func NewVoteCoordinator(s *store.ClusterStore, voteTimeout time.Duration) *VoteCoordinator { + return &VoteCoordinator{ + clusterStore: s, + voteTimeout: voteTimeout, + httpClient: &http.Client{Timeout: voteTimeout + 500*time.Millisecond}, + } +} + +// RequestVotes sends a vote request to every active peer concurrently. +// Returns true only when all active peers respond YES. +// An empty peer list (single-node deployment) returns true immediately. +// If a peer times out and its lease is still alive it is treated as NO +// (network partition protection). If its lease has expired it is excluded. +func (v *VoteCoordinator) RequestVotes(ctx context.Context, req VoteRequest) (approved bool, err error) { + start := time.Now() + defer func() { + result := "approved" + if err != nil { + result = "error" + } else if !approved { + result = "blocked" + } + metrics.Get().VoteRoundDurationMs.With(prometheus.Labels{ + "namespace": req.Namespace, "cluster": req.ClusterName, "result": result, + }).Observe(float64(time.Since(start).Milliseconds())) + }() + + activePeers, err := v.clusterStore.ListActivePeers(ctx) + if err != nil { + return false, err + } + metrics.Get().ActivePeersCount.With(prometheus.Labels{}).Set(float64(len(activePeers))) + if len(activePeers) == 0 { + return true, nil + } + + logger.Get().Info("Starting vote round", + zap.String("namespace", req.Namespace), + zap.String("cluster", req.ClusterName), + zap.String("failed_node", req.FailedNodeID), + zap.Int("peer_count", len(activePeers)), + ) + + type result struct { + peerID string + resp VoteResponse + callErr error + elapsedMs int64 + } + + resultCh := make(chan result, len(activePeers)) + for _, peer := range activePeers { + go func(p store.PeerInfo) { + start := time.Now() + voteCtx, cancel := context.WithTimeout(ctx, v.voteTimeout) + defer cancel() + resp, callErr := v.callVote(voteCtx, p.HTTPAddr, req) + resultCh <- result{ + peerID: p.ID, + resp: resp, + callErr: callErr, + elapsedMs: time.Since(start).Milliseconds(), + } + }(peer) + } + + for i := 0; i < len(activePeers); i++ { + r := <-resultCh + if r.callErr != nil { + // Timed out or network error — check if the lease was alive at the + // start of this vote round. peerTTL (15 s) >> voteTimeout (≤ 500 ms), + // so a lease that was fresh when activePeers was fetched cannot have + // expired by the time a call fails; re-fetching the list would return + // identical data and waste a store round-trip per failing peer. + if peerExists(activePeers, r.peerID) { + logger.Get().Warn("Failover blocked: peer unreachable but lease alive", + zap.String("peer_id", r.peerID), + zap.Int64("elapsed_ms", r.elapsedMs), + zap.Error(r.callErr), + ) + metrics.Get().FailoverBlocked.With(prometheus.Labels{ + "namespace": req.Namespace, "cluster": req.ClusterName, + "reason": "peer_unreachable", + }).Inc() + return false, nil + } + logger.Get().Info("Peer lease expired, excluding from quorum", + zap.String("peer_id", r.peerID), + ) + continue + } + if !r.resp.Vote { + logger.Get().Warn("Failover blocked: peer voted NO", + zap.String("peer_id", r.peerID), + zap.Int64("elapsed_ms", r.elapsedMs), + zap.String("reason", r.resp.Reason), + zap.Int64("failure_count", r.resp.FailureCount), + zap.Int64("soft_threshold", r.resp.SoftThreshold), + zap.Int64("last_probe_ago_ms", r.resp.LastProbeAgoMs), + ) + metrics.Get().FailoverBlocked.With(prometheus.Labels{ + "namespace": req.Namespace, "cluster": req.ClusterName, + "reason": "peer_voted_no", + }).Inc() + return false, nil + } + logger.Get().Debug("Peer voted YES", + zap.String("peer_id", r.peerID), + zap.Int64("elapsed_ms", r.elapsedMs), + zap.Int64("failure_count", r.resp.FailureCount), + ) + } + logger.Get().Info("Vote round approved by all peers", + zap.String("namespace", req.Namespace), + zap.String("cluster", req.ClusterName), + zap.String("failed_node", req.FailedNodeID), + zap.Int("peer_count", len(activePeers)), + ) + return true, nil +} + +func (v *VoteCoordinator) callVote(ctx context.Context, peerAddr string, req VoteRequest) (VoteResponse, error) { + body, err := json.Marshal(req) + if err != nil { + return VoteResponse{}, err + } + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, + "http://"+peerAddr+"/internal/vote", bytes.NewReader(body)) + if err != nil { + return VoteResponse{}, err + } + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := v.httpClient.Do(httpReq) + if err != nil { + return VoteResponse{}, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return VoteResponse{}, fmt.Errorf("peer returned HTTP %d", resp.StatusCode) + } + var voteResp VoteResponse + if err := json.NewDecoder(resp.Body).Decode(&voteResp); err != nil { + return VoteResponse{}, err + } + return voteResp, nil +} + +func peerExists(peers []store.PeerInfo, id string) bool { + for _, p := range peers { + if p.ID == id { + return true + } + } + return false +} diff --git a/controller/vote_coordinator_test.go b/controller/vote_coordinator_test.go new file mode 100644 index 00000000..d2162b81 --- /dev/null +++ b/controller/vote_coordinator_test.go @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package controller + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/kvrocks-controller/store" + "github.com/apache/kvrocks-controller/store/engine" +) + +func makeVoteServer(vote bool) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = json.NewEncoder(w).Encode(VoteResponse{Vote: vote}) + })) +} + +func makeSlowServer(delay time.Duration) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(delay) + _ = json.NewEncoder(w).Encode(VoteResponse{Vote: true}) + })) +} + +var testVoteReq = VoteRequest{ + Namespace: "ns", + ClusterName: "cluster", + ShardIndex: 0, + FailedNodeID: "dead-node", +} + +func TestRequestVotes_NoPeers_ReturnsTrue(t *testing.T) { + m := engine.NewMock() + m.SetID("solo") + s := store.NewClusterStore(m) + coord := NewVoteCoordinator(s, 500*time.Millisecond) + + approved, err := coord.RequestVotes(context.Background(), testVoteReq) + require.NoError(t, err) + assert.True(t, approved) +} + +func TestCallVote_YesResponse(t *testing.T) { + peer := makeVoteServer(true) + defer peer.Close() + + coord := &VoteCoordinator{ + voteTimeout: 500 * time.Millisecond, + httpClient: &http.Client{Timeout: 500 * time.Millisecond}, + } + addr := peer.URL[len("http://"):] + resp, err := coord.callVote(context.Background(), addr, testVoteReq) + require.NoError(t, err) + assert.True(t, resp.Vote) +} + +func TestCallVote_NoResponse(t *testing.T) { + peer := makeVoteServer(false) + defer peer.Close() + + coord := &VoteCoordinator{ + voteTimeout: 500 * time.Millisecond, + httpClient: &http.Client{Timeout: 500 * time.Millisecond}, + } + addr := peer.URL[len("http://"):] + resp, err := coord.callVote(context.Background(), addr, testVoteReq) + require.NoError(t, err) + assert.False(t, resp.Vote) +} + +func TestRequestVotes_OneNo_ReturnsFalse(t *testing.T) { + yesServer := makeVoteServer(true) + defer yesServer.Close() + noServer := makeVoteServer(false) + defer noServer.Close() + + ctx := context.Background() + m := engine.NewMock() + m.SetID("leader") + // Register two peers with the current timestamp so ListActivePeers treats them as live. + now := time.Now().Unix() + _ = m.Set(ctx, "/kvrocks/peers/peer-yes", []byte(fmt.Sprintf("%s|%d", yesServer.URL[len("http://"):], now))) + _ = m.Set(ctx, "/kvrocks/peers/peer-no", []byte(fmt.Sprintf("%s|%d", noServer.URL[len("http://"):], now))) + + s := store.NewClusterStore(m) + coord := NewVoteCoordinator(s, 500*time.Millisecond) + + approved, err := coord.RequestVotes(ctx, testVoteReq) + require.NoError(t, err) + assert.False(t, approved) +} + +func TestRequestVotes_AllYes_ReturnsTrue(t *testing.T) { + yes1 := makeVoteServer(true) + defer yes1.Close() + yes2 := makeVoteServer(true) + defer yes2.Close() + + ctx := context.Background() + m := engine.NewMock() + m.SetID("leader") + now := time.Now().Unix() + _ = m.Set(ctx, "/kvrocks/peers/peer-1", []byte(fmt.Sprintf("%s|%d", yes1.URL[len("http://"):], now))) + _ = m.Set(ctx, "/kvrocks/peers/peer-2", []byte(fmt.Sprintf("%s|%d", yes2.URL[len("http://"):], now))) + + s := store.NewClusterStore(m) + coord := NewVoteCoordinator(s, 500*time.Millisecond) + + approved, err := coord.RequestVotes(ctx, testVoteReq) + require.NoError(t, err) + assert.True(t, approved) +} + +func TestCallVote_Timeout(t *testing.T) { + slow := makeSlowServer(1 * time.Second) + defer slow.Close() + + coord := &VoteCoordinator{ + voteTimeout: 100 * time.Millisecond, + httpClient: &http.Client{Timeout: 200 * time.Millisecond}, + } + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + _, err := coord.callVote(ctx, slow.URL[len("http://"):], testVoteReq) + assert.Error(t, err) +} diff --git a/metrics/setup.go b/metrics/setup.go index ecede8b8..f8f383f2 100644 --- a/metrics/setup.go +++ b/metrics/setup.go @@ -26,10 +26,38 @@ import ( ) type performanceMetrics struct { + // HTTP performance metrics (populated by middleware) Latencies *prometheus.HistogramVec HTTPCodes *prometheus.CounterVec Payload *prometheus.CounterVec HTTPServerPanics *prometheus.CounterVec + + // HA voting and failover metrics + // + // FailoverProposals counts every time the coordinateLoop dequeues a + // proposal and calls RequestVotes, regardless of outcome. + FailoverProposals *prometheus.CounterVec // labels: namespace, cluster + // FailoverCompleted counts successful failovers (UpdateCluster persisted). + FailoverCompleted *prometheus.CounterVec // labels: namespace, cluster + // FailoverBlocked counts proposals that were blocked before promotion. + // reason: "peer_voted_no" | "peer_unreachable" | "vote_error" + FailoverBlocked *prometheus.CounterVec // labels: namespace, cluster, reason + // VoteRoundDurationMs is the wall-clock duration of one RequestVotes call. + // result: "approved" | "blocked" | "error" + VoteRoundDurationMs *prometheus.HistogramVec // labels: namespace, cluster, result + // NodeFailureCount is the current consecutive probe-failure count for each + // kvrocks node. Useful for "approaching threshold" alerts. + NodeFailureCount *prometheus.GaugeVec // labels: namespace, cluster, node_id + // ProbeFailures counts every individual probe failure, enabling rate-based + // alerting ("node unreachable right now") independently of the failure-count + // threshold used for failover decisions. + // is_master: "true" | "false" — master failures warrant stricter alert thresholds. + ProbeFailures *prometheus.CounterVec // labels: namespace, cluster, node_id, is_master + // ActivePeersCount is the number of live peer controllers visible to this node + // at the time of the most recent vote round. Drops to 0 in single-node mode. + // Alert when this falls below the expected cluster size — the controller cluster + // has lost redundancy even if kvrocks failover still works. + ActivePeersCount *prometheus.GaugeVec // labels: (none — node-scoped) } var _metrics *performanceMetrics @@ -55,6 +83,21 @@ func NewHistogramHelper(ns, subsystem, name string, buckets []float64, labels .. return histogram } +// NewGaugeHelper creates and registers a prometheus gauge metric. +func NewGaugeHelper(ns, subsystem, name string, labels ...string) *prometheus.GaugeVec { + ns = strings.ReplaceAll(ns, "-", "_") + subsystem = strings.ReplaceAll(subsystem, "-", "_") + name = strings.ReplaceAll(name, "-", "_") + g := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Subsystem: subsystem, + Name: name, + Help: name, + }, labels) + prometheus.MustRegister(g) + return g +} + // NewCounterHelper was used to fast create and register prometheus counter metric func NewCounterHelper(ns, subsystem, name string, labels ...string) *prometheus.CounterVec { ns = strings.ReplaceAll(ns, "-", "_") @@ -78,10 +121,22 @@ func setupMetrics() { newCounter := func(name string, labels ...string) *prometheus.CounterVec { return NewCounterHelper(_namespace, _subsystem, name, labels...) } + newGauge := func(name string, labels ...string) *prometheus.GaugeVec { + return NewGaugeHelper(_namespace, _subsystem, name, labels...) + } + voteBuckets := prometheus.ExponentialBuckets(1, 2, 12) // 1ms … 4096ms _metrics = &performanceMetrics{ Latencies: newHistogram("request_latency", labels...), HTTPCodes: newCounter("http_code", labels...), Payload: newCounter("http_payload", labels...), + + ActivePeersCount: newGauge("active_peers_count"), + ProbeFailures: newCounter("probe_failures_total", "namespace", "cluster", "node_id", "is_master"), + FailoverProposals: newCounter("failover_proposals_total", "namespace", "cluster"), + FailoverCompleted: newCounter("failover_completed_total", "namespace", "cluster"), + FailoverBlocked: newCounter("failover_blocked_total", "namespace", "cluster", "reason"), + VoteRoundDurationMs: NewHistogramHelper(_namespace, _subsystem, "vote_round_duration_ms", voteBuckets, "namespace", "cluster", "result"), + NodeFailureCount: newGauge("node_failure_count", "namespace", "cluster", "node_id"), } } diff --git a/server/api/internal.go b/server/api/internal.go new file mode 100644 index 00000000..95dd5b9d --- /dev/null +++ b/server/api/internal.go @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package api + +import ( + "net/http" + + "github.com/gin-gonic/gin" + + "github.com/apache/kvrocks-controller/controller" +) + +// InternalHandler handles controller-to-controller internal RPC endpoints. +// These routes must NOT be gated by the leader-redirect middleware. +type InternalHandler struct { + ctrl *controller.Controller +} + +func NewInternalHandler(ctrl *controller.Controller) *InternalHandler { + return &InternalHandler{ctrl: ctrl} +} + +// Vote responds to a failover vote request from the leader controller. +// POST /internal/vote +func (h *InternalHandler) Vote(c *gin.Context) { + var req controller.VoteRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + checker, err := h.ctrl.GetClusterChecker(req.Namespace, req.ClusterName) + if err != nil { + // This node has no checker for the cluster — abstain rather than veto. + c.JSON(http.StatusOK, controller.VoteResponse{ + Vote: true, + Reason: "no local checker, abstain", + }) + return + } + + c.JSON(http.StatusOK, checker.ShouldVote(req.FailedNodeID)) +} diff --git a/server/api/internal_test.go b/server/api/internal_test.go new file mode 100644 index 00000000..24952ad8 --- /dev/null +++ b/server/api/internal_test.go @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package api_test + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/kvrocks-controller/controller" + "github.com/apache/kvrocks-controller/server/api" + "github.com/apache/kvrocks-controller/store" + "github.com/apache/kvrocks-controller/store/engine" +) + +func TestVoteHandler_NoChecker_Abstains(t *testing.T) { + gin.SetMode(gin.TestMode) + + s := store.NewClusterStore(engine.NewMock()) + ctrl, err := controller.New(s, nil) + require.NoError(t, err) + + h := api.NewInternalHandler(ctrl) + r := gin.New() + r.POST("/internal/vote", h.Vote) + + body, _ := json.Marshal(controller.VoteRequest{ + Namespace: "ns", + ClusterName: "missing-cluster", + ShardIndex: 0, + FailedNodeID: "node-1", + }) + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/internal/vote", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + r.ServeHTTP(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + var resp controller.VoteResponse + require.NoError(t, json.NewDecoder(w.Body).Decode(&resp)) + assert.True(t, resp.Vote, "no checker should abstain (true)") +} diff --git a/server/middleware/middleware.go b/server/middleware/middleware.go index 36f40563..1d3fa4a4 100644 --- a/server/middleware/middleware.go +++ b/server/middleware/middleware.go @@ -24,6 +24,7 @@ import ( "errors" "net/http" "strconv" + "strings" "time" "github.com/apache/kvrocks-controller/store/engine/raft" @@ -61,6 +62,11 @@ func CollectMetrics(c *gin.Context) { } func RedirectIfNotLeader(c *gin.Context) { + // Internal routes must be reachable on all nodes (e.g. /internal/vote). + if strings.HasPrefix(c.Request.URL.Path, "/internal/") { + c.Next() + return + } storage, _ := c.MustGet(consts.ContextKeyStore).(*store.ClusterStore) if storage.Leader() == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "no leader now, please retry later"}) diff --git a/server/middleware/middleware_test.go b/server/middleware/middleware_test.go new file mode 100644 index 00000000..008b3832 --- /dev/null +++ b/server/middleware/middleware_test.go @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package middleware_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/assert" + + "github.com/apache/kvrocks-controller/consts" + "github.com/apache/kvrocks-controller/server/middleware" + "github.com/apache/kvrocks-controller/store" + "github.com/apache/kvrocks-controller/store/engine" +) + +func TestRedirectIfNotLeader_SkipsInternalPaths(t *testing.T) { + gin.SetMode(gin.TestMode) + r := gin.New() + + // Make this node a follower: engine ID differs from Leader() + // Mock.Leader() always returns "mock_store_engine"; set ID to something else. + mockEngine := engine.NewMock() + mockEngine.SetID("follower-node") + s := store.NewClusterStore(mockEngine) + + r.Use(func(c *gin.Context) { + c.Set(consts.ContextKeyStore, s) + c.Next() + }, middleware.RedirectIfNotLeader) + + r.POST("/internal/vote", func(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{"ok": true}) + }) + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/internal/vote", nil) + r.ServeHTTP(w, req) + + // Must NOT be redirected — internal routes bypass leader check. + // Check Location header is absent: if the middleware ran its redirect branch, + // http.Redirect would have set Location regardless of whether it wrote a body. + assert.Equal(t, http.StatusOK, w.Code) + assert.Empty(t, w.Header().Get("Location"), "redirect must not be issued for /internal/ paths") +} diff --git a/server/route.go b/server/route.go index 113a1a09..1bf37344 100644 --- a/server/route.go +++ b/server/route.go @@ -32,6 +32,12 @@ import ( func (srv *Server) initHandlers() { engine := srv.engine + + // Internal routes registered BEFORE global middleware so they bypass + // RedirectIfNotLeader — all nodes must be able to serve these endpoints. + internalHandler := api.NewInternalHandler(srv.controller) + engine.POST("/internal/vote", internalHandler.Vote) + engine.Use(middleware.CollectMetrics, func(c *gin.Context) { c.Set(consts.ContextKeyStore, srv.store) c.Next() diff --git a/server/server.go b/server/server.go index fdb14cdb..6b006309 100644 --- a/server/server.go +++ b/server/server.go @@ -33,6 +33,7 @@ import ( "github.com/apache/kvrocks-controller/store/engine/raft" "github.com/gin-gonic/gin" + "go.uber.org/zap" "github.com/apache/kvrocks-controller/config" "github.com/apache/kvrocks-controller/controller" @@ -84,10 +85,16 @@ func NewServer(cfg *config.Config) (*Server, error) { } clusterStore := store.NewClusterStore(persist) + + voteTimeout := time.Duration(cfg.Controller.FailOver.VoteTimeoutMs) * time.Millisecond + voter := controller.NewVoteCoordinator(clusterStore, voteTimeout) + ctrl, err := controller.New(clusterStore, cfg.Controller) if err != nil { return nil, err } + ctrl.WithVoter(voter) + gin.SetMode(gin.ReleaseMode) return &Server{ store: clusterStore, @@ -133,6 +140,11 @@ func (srv *Server) Start(ctx context.Context) error { if ok := srv.store.IsReady(ctx); !ok { return fmt.Errorf("the cluster store is not ready") } + // Register this node so peers can discover it for voting. + if err := srv.store.RegisterSelf(ctx, srv.config.Addr); err != nil { + // Non-fatal: single-node mode still works without registration. + logger.Get().Warn("Failed to register self for peer discovery", zap.Error(err)) + } if err := srv.controller.Start(ctx); err != nil { return err } diff --git a/store/engine/engine_inmemory.go b/store/engine/engine_inmemory.go index 00c17dd7..3f31a0cd 100644 --- a/store/engine/engine_inmemory.go +++ b/store/engine/engine_inmemory.go @@ -32,14 +32,22 @@ var _ Engine = (*Mock)(nil) type Mock struct { mu sync.Mutex values map[string]string + id string } func NewMock() *Mock { return &Mock{ values: make(map[string]string), + id: "mock_store_engine", } } +func (m *Mock) SetID(id string) { + m.mu.Lock() + defer m.mu.Unlock() + m.id = id +} + func (m *Mock) Get(_ context.Context, key string) ([]byte, error) { m.mu.Lock() defer m.mu.Unlock() @@ -103,7 +111,9 @@ func (m *Mock) Close() error { } func (m *Mock) ID() string { - return "mock_store_engine" + m.mu.Lock() + defer m.mu.Unlock() + return m.id } func (m *Mock) Leader() string { diff --git a/store/helper.go b/store/helper.go index 5bb4f65f..16a08fe2 100644 --- a/store/helper.go +++ b/store/helper.go @@ -25,6 +25,8 @@ import ( const nsPrefix = "/kvrocks/metadata" +const peerKeyPrefix = "/kvrocks/peers/" + func appendPrefix(ns string) string { return nsPrefix + "/" + ns } diff --git a/store/store.go b/store/store.go index fc319fcc..35f3e557 100644 --- a/store/store.go +++ b/store/store.go @@ -24,11 +24,15 @@ import ( "context" "encoding/json" "fmt" - "github.com/apache/kvrocks-controller/logger" - "go.uber.org/zap" + "strconv" + "strings" "sync" + "time" + + "go.uber.org/zap" "github.com/apache/kvrocks-controller/consts" + "github.com/apache/kvrocks-controller/logger" "github.com/apache/kvrocks-controller/store/engine" ) @@ -68,6 +72,96 @@ func NewClusterStore(e engine.Engine) *ClusterStore { } } +const ( + // peerTTL is how long a peer registration is considered live without a renewal. + // Must be > peerRenewalInterval so a single missed tick doesn't evict a healthy peer. + peerTTL = 15 * time.Second + // peerRenewalInterval is how often RegisterSelf refreshes its timestamp in the store. + peerRenewalInterval = 5 * time.Second +) + +// parsePeerEntry parses a stored peer value of the form "addr|unixTimestamp". +// Returns ok=false for entries missing or having an unparseable timestamp field; +// callers should treat those as stale. +func parsePeerEntry(val string) (addr string, ts int64, ok bool) { + idx := strings.LastIndex(val, "|") + if idx < 0 { + return val, 0, false + } + n, err := strconv.ParseInt(val[idx+1:], 10, 64) + if err != nil { + return val, 0, false + } + return val[:idx], n, true +} + +// PeerInfo holds the ID and HTTP address of a peer controller node. +type PeerInfo struct { + ID string + HTTPAddr string +} + +// RegisterSelf writes this node's HTTP address into the store so other +// controllers can discover it. The value is stored as "addr|unixTimestamp" +// and refreshed every peerRenewalInterval so peers can detect liveness via +// timestamp staleness. The background goroutine runs until ctx is cancelled. +func (s *ClusterStore) RegisterSelf(ctx context.Context, httpAddr string) error { + key := peerKeyPrefix + s.e.ID() + val := fmt.Sprintf("%s|%d", httpAddr, time.Now().Unix()) + if err := s.e.Set(ctx, key, []byte(val)); err != nil { + return err + } + go func() { + ticker := time.NewTicker(peerRenewalInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + v := fmt.Sprintf("%s|%d", httpAddr, time.Now().Unix()) + if err := s.e.Set(ctx, key, []byte(v)); err != nil { + logger.Get().Warn("Failed to renew peer registration", zap.Error(err)) + } + case <-ctx.Done(): + // Clean shutdown: delete our key immediately so peers stop seeing us + // instead of waiting up to peerTTL for the timestamp to expire. + delCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + if err := s.e.Delete(delCtx, key); err != nil { + logger.Get().Warn("Failed to deregister peer on shutdown", zap.Error(err)) + } + return + } + } + }() + return nil +} + +// ListActivePeers returns all registered peer nodes whose timestamps are within +// peerTTL, excluding this node. Entries with missing or stale timestamps are +// silently excluded so dead controllers don't block quorum. +func (s *ClusterStore) ListActivePeers(ctx context.Context) ([]PeerInfo, error) { + selfID := s.e.ID() + entries, err := s.e.List(ctx, peerKeyPrefix) + if err != nil { + return nil, err + } + var peers []PeerInfo + for _, entry := range entries { + if entry.Key == selfID { + continue + } + addr, ts, ok := parsePeerEntry(string(entry.Value)) + if !ok || time.Since(time.Unix(ts, 0)) > peerTTL { + continue // stale or unparseable — exclude from quorum + } + peers = append(peers, PeerInfo{ + ID: entry.Key, + HTTPAddr: addr, + }) + } + return peers, nil +} + func (s *ClusterStore) IsReady(ctx context.Context) bool { return s.e.IsReady(ctx) } diff --git a/store/store_test.go b/store/store_test.go index b0517f70..9035ca37 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -21,14 +21,127 @@ package store import ( "context" + "fmt" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/apache/kvrocks-controller/consts" "github.com/apache/kvrocks-controller/store/engine" ) +func TestRegisterSelfAndListActivePeers(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + m := engine.NewMock() + + // Register node-2 + m.SetID("node-2") + s2 := NewClusterStore(m) + require.NoError(t, s2.RegisterSelf(ctx, "10.0.0.2:9379")) + + // Register node-3 + m.SetID("node-3") + s3 := NewClusterStore(m) + require.NoError(t, s3.RegisterSelf(ctx, "10.0.0.3:9379")) + + // Query from node-1's perspective + m.SetID("node-1") + s1 := NewClusterStore(m) + peers, err := s1.ListActivePeers(ctx) + require.NoError(t, err) + require.Len(t, peers, 2) + addrs := map[string]string{} + for _, p := range peers { + addrs[p.ID] = p.HTTPAddr + } + assert.Equal(t, "10.0.0.2:9379", addrs["node-2"]) + assert.Equal(t, "10.0.0.3:9379", addrs["node-3"]) + assert.NotContains(t, addrs, "node-1") +} + +func TestListActivePeers_Empty(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + m := engine.NewMock() + m.SetID("solo") + s := NewClusterStore(m) + require.NoError(t, s.RegisterSelf(ctx, "10.0.0.1:9379")) + + peers, err := s.ListActivePeers(ctx) + require.NoError(t, err) + assert.Empty(t, peers) +} + +// TestRegisterSelf_DeletesKeyOnShutdown verifies Fix 3: when the context passed +// to RegisterSelf is cancelled (clean shutdown), the background goroutine must +// delete the peer key immediately instead of letting it expire via TTL. Without +// the fix, rolling restarts leave a stale key for up to peerTTL (15 s), causing +// VoteCoordinator to treat the dead node as "alive but unreachable" and block +// failover for the entire TTL window. +func TestRegisterSelf_DeletesKeyOnShutdown(t *testing.T) { + m := engine.NewMock() + + // Register as node-dying. + m.SetID("node-dying") + sDying := NewClusterStore(m) + ctx, cancel := context.WithCancel(context.Background()) + require.NoError(t, sDying.RegisterSelf(ctx, "10.0.0.50:9379")) + + // Confirm the registration is visible from a different node's perspective. + m.SetID("node-viewer") + sViewer := NewClusterStore(m) + peers, err := sViewer.ListActivePeers(context.Background()) + require.NoError(t, err) + require.Len(t, peers, 1, "peer should be visible before shutdown") + + // Simulate clean shutdown by cancelling the context. + cancel() + + // The background goroutine should delete the key almost immediately. + require.Eventually(t, + func() bool { + p, e := sViewer.ListActivePeers(context.Background()) + return e == nil && len(p) == 0 + }, + 500*time.Millisecond, 10*time.Millisecond, + "peer key must be deleted on clean shutdown, not left to expire via TTL", + ) +} + +func TestListActivePeers_StaleEntryExcluded(t *testing.T) { + ctx := context.Background() + m := engine.NewMock() + m.SetID("viewer") + s := NewClusterStore(m) + + // Write a peer entry with a timestamp older than peerTTL. + staleTS := time.Now().Add(-2 * peerTTL).Unix() + require.NoError(t, m.Set(ctx, peerKeyPrefix+"dead-node", + []byte(fmt.Sprintf("10.0.0.99:9379|%d", staleTS)))) + + peers, err := s.ListActivePeers(ctx) + require.NoError(t, err) + assert.Empty(t, peers, "stale peer must be excluded from quorum") +} + +func TestListActivePeers_OldFormatExcluded(t *testing.T) { + ctx := context.Background() + m := engine.NewMock() + m.SetID("viewer") + s := NewClusterStore(m) + + // Write a peer entry in the old format (no timestamp) — must also be excluded. + require.NoError(t, m.Set(ctx, peerKeyPrefix+"legacy-node", + []byte("10.0.0.50:9379"))) + + peers, err := s.ListActivePeers(ctx) + require.NoError(t, err) + assert.Empty(t, peers, "legacy format peer (no timestamp) must be excluded") +} + func TestClusterStore(t *testing.T) { ctx := context.Background() store := NewClusterStore(engine.NewMock())