diff --git a/CHANGELOG.md b/CHANGELOG.md index 03b5cc5154..ac5d506734 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ * Metrics: Renamed `cortex_parquet_queryable_cache_*` to `cortex_parquet_cache_*`. * Flags: Renamed `-querier.parquet-queryable-shard-cache-size` to `-querier.parquet-shard-cache-size` and `-querier.parquet-queryable-shard-cache-ttl` to `-querier.parquet-shard-cache-ttl`. * Config: Renamed `parquet_queryable_shard_cache_size` to `parquet_shard_cache_size` and `parquet_queryable_shard_cache_ttl` to `parquet_shard_cache_ttl`. +* [FEATURE] HATracker: Add experimental support for `memberlist` and `multi` as a KV store backend. #7284 * [FEATURE] Distributor: Add `-distributor.otlp.add-metric-suffixes` flag. If true, suffixes will be added to the metrics for name normalization. #7286 * [FEATURE] StoreGateway: Introduces a new parquet mode. #7046 * [FEATURE] StoreGateway: Add a parquet shard cache to parquet mode. #7166 diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 9ba9f31710..fa32ba010e 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -117,7 +117,7 @@ The next three options only apply when the querier is used together with the Que ### Ring/HA Tracker Store -The KVStore client is used by both the Ring and HA Tracker (HA Tracker doesn't support memberlist as KV store). +The KVStore client is used by both the Ring and HA Tracker (HA Tracker supports memberlist as a KV store as an experimental feature). - `{ring,distributor.ha-tracker}.prefix` The prefix for the keys in the store. Should end with a /. For example with a prefix of foo/, the key bar would be stored under foo/bar. - `{ring,distributor.ha-tracker}.store` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index fc43d615c4..65f443f762 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3103,9 +3103,8 @@ ha_tracker: # CLI flag: -distributor.ha-tracker.enable-startup-sync [enable_startup_sync: | default = false] - # Backend storage to use for the ring. Please be aware that memberlist is not - # supported by the HA tracker since gossip propagation is too slow for HA - # purposes. + # Backend storage to use for the ring. Memberlist support in the HA tracker is + # experimental, as gossip propagation delays may impact HA performance. kvstore: # Backend storage to use for the ring. Supported values are: consul, # dynamodb, etcd, inmemory, memberlist, multi. @@ -4574,6 +4573,10 @@ The `memberlist_config` configures the Gossip memberlist. # CLI flag: -memberlist.left-ingesters-timeout [left_ingesters_timeout: | default = 5m] +# How long to keep deleted keys (tombstones) in the KV store +# CLI flag: -memberlist.tombstone-timeout +[tombstone_timeout: | default = 5m] + # Timeout for leaving memberlist cluster. 
# CLI flag: -memberlist.leave-timeout [leave_timeout: | default = 5s] diff --git a/integration/integration_memberlist_single_binary_test.go b/integration/integration_memberlist_single_binary_test.go index a79642e1dd..14a720d9a9 100644 --- a/integration/integration_memberlist_single_binary_test.go +++ b/integration/integration_memberlist_single_binary_test.go @@ -11,6 +11,8 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -246,3 +248,245 @@ func TestSingleBinaryWithMemberlistScaling(t *testing.T) { "expected all instances to have %f ring members and %f tombstones", expectedRingMembers, expectedTombstones) } + +func TestHATrackerWithMemberlistClusterSync(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + minio := e2edb.NewMinio(9000, bucketName) + require.NoError(t, s.StartAndWaitReady(minio)) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-distributor.ha-tracker.enable": "true", + "-distributor.ha-tracker.enable-for-all-users": "true", + "-distributor.ha-tracker.cluster": "cluster", + "-distributor.ha-tracker.replica": "__replica__", + // Use memberlist as the KV store for the HA Tracker + "-distributor.ha-tracker.store": "memberlist", + + // To fast failover + "-distributor.ha-tracker.update-timeout": "1s", + "-distributor.ha-tracker.update-timeout-jitter-max": "0s", + "-distributor.ha-tracker.failover-timeout": "2s", + + // memberlist config + "-ring.store": "memberlist", + "-memberlist.bind-port": "8000", + }) + + cortex1 := newSingleBinary("cortex-1", "", "", flags) + cortex2 := newSingleBinary("cortex-2", "", networkName+"-cortex-1:8000", flags) + cortex3 := newSingleBinary("cortex-3", "", networkName+"-cortex-1:8000", flags) + + require.NoError(t, s.StartAndWaitReady(cortex1)) + require.NoError(t, s.StartAndWaitReady(cortex2, cortex3)) + + // Ensure both Cortex instances have successfully discovered each other in the memberlist cluster. + require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(3), "memberlist_client_cluster_members_count")) + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(3), "memberlist_client_cluster_members_count")) + require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(3), "memberlist_client_cluster_members_count")) + + // All Cortex servers should have 512 tokens, altogether 3 * 512. 
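
The flag sets used by these integration tests are only accepted because `HATrackerConfig.Validate()` (see the `pkg/ha/ha_tracker.go` hunk further down) now allows `memberlist` and `multi` alongside `consul` and `etcd`. A minimal sketch of that check, mirroring the new validation test cases in this PR and using only exported names from the `ha` and `flagext` packages:

```go
package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/ha"
	"github.com/cortexproject/cortex/pkg/util/flagext"
)

func main() {
	cfg := ha.HATrackerConfig{}
	flagext.DefaultValues(&cfg)

	// "consul" and "etcd" were previously the only accepted stores;
	// "memberlist" and "multi" now pass validation as well.
	cfg.KVStore.Store = "memberlist"
	fmt.Println(cfg.Validate()) // <nil>

	cfg.KVStore.Store = "inmemory"
	fmt.Println(cfg.Validate()) // invalid HATracker KV store type: inmemory
}
```
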
+ require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(3*512), "cortex_ring_tokens_total")) + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(3*512), "cortex_ring_tokens_total")) + require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(3*512), "cortex_ring_tokens_total")) + + now := time.Now() + userID := "user-1" + + client1, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + series, _ := generateSeries("foo", now, prompb.Label{Name: "__replica__", Value: "replica0"}, prompb.Label{Name: "cluster", Value: "cluster0"}) + // send to cortex1 + res, err := client1.Push([]prompb.TimeSeries{series[0]}) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(1), "cortex_ha_tracker_elected_replica_changes_total")) + // cortex-2 should be noticed HA reader via memberlist gossip + require.NoError(t, cortex2.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ha_tracker_elected_replica_changes_total"}, e2e.WaitMissingMetrics)) + // cortex-3 should be noticed HA reader via memberlist gossip + require.NoError(t, cortex3.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ha_tracker_elected_replica_changes_total"}, e2e.WaitMissingMetrics)) + + // Wait 5 seconds to ensure the FailoverTimeout (2s) has comfortably passed. + time.Sleep(5 * time.Second) + + client2, err := e2ecortex.NewClient(cortex2.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + series2, _ := generateSeries("foo", now.Add(time.Second*30), prompb.Label{Name: "__replica__", Value: "replica1"}, prompb.Label{Name: "cluster", Value: "cluster0"}) + // send to cortex2 + res2, err := client2.Push([]prompb.TimeSeries{series2[0]}) + require.NoError(t, err) + require.Equal(t, 200, res2.StatusCode) + + // cortex2 failover to replica1 + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(2), "cortex_ha_tracker_elected_replica_changes_total")) + // cortex-1 should be noticed changed HA reader via memberlist gossip + require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(2), "cortex_ha_tracker_elected_replica_changes_total")) + // cortex-3 should be noticed changed HA reader via memberlist gossip + require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(2), "cortex_ha_tracker_elected_replica_changes_total")) +} + +func TestHATrackerWithMemberlist(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + minio := e2edb.NewMinio(9000, bucketName) + require.NoError(t, s.StartAndWaitReady(minio)) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-distributor.ha-tracker.enable": "true", + "-distributor.ha-tracker.enable-for-all-users": "true", + "-distributor.ha-tracker.cluster": "cluster", + "-distributor.ha-tracker.replica": "__replica__", + // Use memberlist as the KV store for the HA Tracker + "-distributor.ha-tracker.store": "memberlist", + + // To fast failover + "-distributor.ha-tracker.update-timeout": "1s", + "-distributor.ha-tracker.update-timeout-jitter-max": "0s", + "-distributor.ha-tracker.failover-timeout": "2s", + + // memberlist config + "-ring.store": "memberlist", + "-memberlist.bind-port": "8000", + }) + + cortex := newSingleBinary("cortex", "", "", flags) + require.NoError(t, s.StartAndWaitReady(cortex)) + + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(1), "memberlist_client_cluster_members_count")) + 
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + now := time.Now() + numUsers := 100 + + for i := 1; i <= numUsers; i++ { + userID := fmt.Sprintf("user-%d", i) + client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + series, _ := generateSeries("foo", now, prompb.Label{Name: "__replica__", Value: "replica0"}, prompb.Label{Name: "cluster", Value: "cluster0"}) + res, err := client.Push([]prompb.TimeSeries{series[0]}) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers)), "cortex_ha_tracker_elected_replica_changes_total")) + + // Wait 5 seconds to ensure the FailoverTimeout (2s) has comfortably passed. + time.Sleep(5 * time.Second) + + for i := 1; i <= numUsers; i++ { + userID := fmt.Sprintf("user-%d", i) + client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + // This time, we send data from replica1 instead of replica0. + series, _ := generateSeries("foo", now.Add(time.Second*30), prompb.Label{Name: "__replica__", Value: "replica1"}, prompb.Label{Name: "cluster", Value: "cluster0"}) + res, err := client.Push([]prompb.TimeSeries{series[0]}) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + // Since the leader successfully failed over to replica1, the change count increments by 1 per user + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers*2)), "cortex_ha_tracker_elected_replica_changes_total")) +} + +func TestHATrackerWithMultiKV(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, bucketName) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-distributor.ha-tracker.enable": "true", + "-distributor.ha-tracker.enable-for-all-users": "true", + "-distributor.ha-tracker.cluster": "cluster", + "-distributor.ha-tracker.replica": "__replica__", + // Use memberlist as the KV store for the HA Tracker + "-distributor.ha-tracker.store": "multi", + + // To fast failover + "-distributor.ha-tracker.update-timeout": "1s", + "-distributor.ha-tracker.update-timeout-jitter-max": "0s", + "-distributor.ha-tracker.failover-timeout": "2s", + + // multi KV config + "-distributor.ha-tracker.multi.primary": "consul", + "-distributor.ha-tracker.multi.secondary": "memberlist", + "-distributor.ha-tracker.consul.hostname": consul.NetworkHTTPEndpoint(), + + // Enable data mirroring + "-distributor.ha-tracker.multi.mirror-enabled": "true", + + // memberlist config + "-ring.store": "memberlist", + "-memberlist.bind-port": "8000", + }) + + cortex := newSingleBinary("cortex", "", "", flags) + require.NoError(t, s.StartAndWaitReady(cortex)) + + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(1), "memberlist_client_cluster_members_count")) + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // mirror enabled + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(1)), "cortex_multikv_mirror_enabled")) + // consul as primary KV Store + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_multikv_primary_store"}, e2e.WaitMissingMetrics, + 
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "store", "consul"))), + ) + + now := time.Now() + numUsers := 100 + + for i := 1; i <= numUsers; i++ { + userID := fmt.Sprintf("user-%d", i) + client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + series, _ := generateSeries("foo", now, prompb.Label{Name: "__replica__", Value: "replica0"}, prompb.Label{Name: "cluster", Value: "cluster0"}) + res, err := client.Push([]prompb.TimeSeries{series[0]}) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers)), "cortex_ha_tracker_elected_replica_changes_total")) + + // Wait 5 seconds to ensure the FailoverTimeout (2s) has comfortably passed. + time.Sleep(5 * time.Second) + + for i := 1; i <= numUsers; i++ { + userID := fmt.Sprintf("user-%d", i) + client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + // This time, we send data from replica1 instead of replica0. + series, _ := generateSeries("foo", now.Add(time.Second*30), prompb.Label{Name: "__replica__", Value: "replica1"}, prompb.Label{Name: "cluster", Value: "cluster0"}) + res, err := client.Push([]prompb.TimeSeries{series[0]}) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + // Since the leader successfully failed over to replica1, the change count increments by 1 per user + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers*2)), "cortex_ha_tracker_elected_replica_changes_total")) + // Two keys (1 cluster with 2 replicas) per user should be written to the memberlist (secondary store) + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers*2)), "cortex_multikv_mirror_writes_total")) +} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index fbc6db605f..4fe4502d89 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -37,6 +37,7 @@ import ( "github.com/cortexproject/cortex/pkg/flusher" "github.com/cortexproject/cortex/pkg/frontend" "github.com/cortexproject/cortex/pkg/frontend/transport" + "github.com/cortexproject/cortex/pkg/ha" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/overrides" "github.com/cortexproject/cortex/pkg/parquetconverter" @@ -821,6 +822,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) { t.Cfg.MemberlistKV.MetricsRegisterer = reg t.Cfg.MemberlistKV.Codecs = []codec.Codec{ ring.GetCodec(), + ha.GetReplicaDescCodec(), } dnsProviderReg := prometheus.WrapRegistererWithPrefix( "cortex_", @@ -835,6 +837,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) { // Update the config. 
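
Registering `ha.GetReplicaDescCodec()` above is what lets gossiped HA tracker entries be decoded on receipt: each `KeyValuePair` carries a codec ID, and the memberlist KV looks the codec up by that ID before merging. A minimal encode/decode round trip through that codec — a sketch only, assuming the `Encode`/`Decode`/`CodecID` methods of `codec.Codec` from `pkg/ring/kv/codec`, with illustrative field values:

```go
package main

import (
	"fmt"
	"time"

	"github.com/cortexproject/cortex/pkg/ha"
)

func main() {
	c := ha.GetReplicaDescCodec()

	// ReceivedAt is a millisecond timestamp, matching the timestamp.FromTime
	// values used by the HA tracker tests in this PR.
	in := &ha.ReplicaDesc{Replica: "replica-0", ReceivedAt: time.Now().UnixMilli()}

	buf, err := c.Encode(in)
	if err != nil {
		panic(err)
	}

	out, err := c.Decode(buf)
	if err != nil {
		panic(err)
	}

	// The codec ID is the handle initMemberlistKV's registration keys decoding on.
	fmt.Println(c.CodecID(), out.(*ha.ReplicaDesc).Replica)
}
```
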
t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.Distributor.HATrackerConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV diff --git a/pkg/ha/ha_tracker.go b/pkg/ha/ha_tracker.go index 11b38eb79c..d798e07d18 100644 --- a/pkg/ha/ha_tracker.go +++ b/pkg/ha/ha_tracker.go @@ -21,6 +21,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/ring/kv/codec" + "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -72,7 +73,7 @@ type HATrackerConfig struct { // of tracked keys is large. EnableStartupSync bool `yaml:"enable_startup_sync"` - KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes."` + KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Memberlist support in the HA tracker is experimental, as gossip propagation delays may impact HA performance."` } // RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix @@ -80,6 +81,92 @@ func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", "", f) } +func (d *ReplicaDesc) Clone() any { + return proto.Clone(d) +} + +// Merge merges other ReplicaDesc into this one and can be sent out to other clients. +func (d *ReplicaDesc) Merge(mergeable memberlist.Mergeable, _ bool) (memberlist.Mergeable, error) { + if mergeable == nil { + return nil, nil + } + + other, ok := mergeable.(*ReplicaDesc) + if !ok { + return nil, fmt.Errorf("expected *ha.ReplicaDesc, got %T", mergeable) + } + + if other == nil { + return nil, nil + } + + getLatestTime := func(desc *ReplicaDesc) int64 { + if desc.DeletedAt > desc.ReceivedAt { + return desc.DeletedAt + } + return desc.ReceivedAt + } + + curLatest := getLatestTime(d) + otherLatest := getLatestTime(other) + + if otherLatest > curLatest { + // If other is more recent, take it. + return d.apply(other), nil + } + + if otherLatest < curLatest { + // If the current is more recent, ignore the incoming data. + return nil, nil + } + + // If timestamps are the same, we take deleted one. + isCurDeleted := d.DeletedAt == curLatest && d.DeletedAt > 0 + isOtherIsDeleted := other.DeletedAt == otherLatest && other.DeletedAt > 0 + + if isOtherIsDeleted && !isCurDeleted { + // If other has been deleted, take it. + return d.apply(other), nil + } + if isCurDeleted && !isOtherIsDeleted { + // If the current has been deleted, ignore the incoming data. + return nil, nil + } + + // If timestamps are exactly equal but replicas differ, use lexicographic ordering + if other.Replica != d.Replica { + if other.Replica < d.Replica { + return d.apply(other), nil + } + } + + // No change (same timestamp, same replica) + return nil, nil +} + +// apply performs an in-place update of the current descriptor and returns a cloned result. 
+func (d *ReplicaDesc) apply(other *ReplicaDesc) *ReplicaDesc { + d.Replica = other.Replica + d.ReceivedAt = other.ReceivedAt + d.DeletedAt = other.DeletedAt + return proto.Clone(d).(*ReplicaDesc) +} + +// MergeContent describes content of this Mergeable. +// For ReplicaDesc, we return the replica name. +func (d *ReplicaDesc) MergeContent() []string { + if d.Replica == "" { + return nil + } + return []string{d.Replica} +} + +// RemoveTombstones is a no-op for ReplicaDesc. +func (d *ReplicaDesc) RemoveTombstones(_ time.Time) (total, removed int) { + // No-op: HATracker manages tombstones via cleanupOldReplicas + return +} + // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *HATrackerConfig) RegisterFlagsWithPrefix(flagPrefix string, kvPrefix string, f *flag.FlagSet) { finalFlagPrefix := "" @@ -116,12 +203,13 @@ func (cfg *HATrackerConfig) Validate() error { return fmt.Errorf(errInvalidFailoverTimeout, cfg.FailoverTimeout, minFailureTimeout) } - // Tracker kv store only supports consul and etcd. - storeAllowedList := []string{"consul", "etcd"} - if slices.Contains(storeAllowedList, cfg.KVStore.Store) { - return nil + // Tracker kv store only supports consul, etcd, memberlist, and multi. + storeAllowedList := []string{"consul", "etcd", "memberlist", "multi"} + if !slices.Contains(storeAllowedList, cfg.KVStore.Store) { + return fmt.Errorf("invalid HATracker KV store type: %s", cfg.KVStore.Store) } - return fmt.Errorf("invalid HATracker KV store type: %s", cfg.KVStore.Store) + + return nil } func GetReplicaDescCodec() codec.Proto { diff --git a/pkg/ha/ha_tracker_test.go b/pkg/ha/ha_tracker_test.go index 882a4c8868..552c582d2f 100644 --- a/pkg/ha/ha_tracker_test.go +++ b/pkg/ha/ha_tracker_test.go @@ -3,11 +3,13 @@ package ha import ( "context" "fmt" + "math" "strings" "testing" "time" "github.com/go-kit/log" + "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" @@ -19,7 +21,9 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/ring/kv/codec" "github.com/cortexproject/cortex/pkg/ring/kv/consul" + "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" util_log "github.com/cortexproject/cortex/pkg/util/log" @@ -108,14 +112,23 @@ func TestHATrackerConfig_Validate(t *testing.T) { }(), expectedErr: nil, }, - "should failed with invalid kv store": { + "should pass with memberlist kv store": { cfg: func() HATrackerConfig { cfg := HATrackerConfig{} flagext.DefaultValues(&cfg) cfg.KVStore.Store = "memberlist" return cfg }(), - expectedErr: fmt.Errorf("invalid HATracker KV store type: %s", "memberlist"), + expectedErr: nil, + }, + "should pass with multi kv store": { + cfg: func() HATrackerConfig { + cfg := HATrackerConfig{} + flagext.DefaultValues(&cfg) + cfg.KVStore.Store = "multi" + return cfg + }(), + expectedErr: nil, }, } @@ -127,6 +140,89 @@ func TestHATrackerConfig_Validate(t *testing.T) { } } +type dnsProviderMock struct { + resolved []string +} + +func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string, flushOld bool) error { + p.resolved = addrs + return nil +} + +func (p dnsProviderMock) Addresses() []string { + return p.resolved +} + +// TestHATracker_CleanupDeletesArePropagated demonstrates that when the HA tracker's +// background cleanup loop 
removes stale replicas, it correctly triggers a memberlist +// KV.Delete which in turn generates a tombstone broadcast. +func TestHATracker_CleanupDeletesArePropagatedWithMemberlist(t *testing.T) { + ctx := context.Background() + logger := log.NewNopLogger() + reg := prometheus.NewPedanticRegistry() + + var cfg memberlist.KVConfig + flagext.DefaultValues(&cfg) + replicaDescCodec := GetReplicaDescCodec() + cfg.Codecs = []codec.Codec{replicaDescCodec} + cfg.RetransmitMult = 1 + + mkv := memberlist.NewKV(cfg, logger, &dnsProviderMock{}, reg) + require.NoError(t, services.StartAndAwaitRunning(ctx, mkv)) + defer services.StopAndAwaitTerminated(ctx, mkv) //nolint:errcheck + + client, err := memberlist.NewClient(mkv, replicaDescCodec) + require.NoError(t, err) + + trackerCfg := HATrackerConfig{ + EnableHATracker: false, + KVStore: kv.Config{ + Store: "memberlist", + }, + } + tracker, err := NewHATracker(trackerCfg, nil, HATrackerStatusConfig{}, reg, "", logger) + require.NoError(t, err) + + // Inject our test memberlist client into the tracker + tracker.cfg.EnableHATracker = true + tracker.client = client + + userID := "user1" + cluster := "cluster1" + replica := "replica0" + key := userID + "/" + cluster + + // Inject initial HA data (simulates receiving a sample) + now := time.Now() + err = tracker.CheckReplica(ctx, userID, cluster, replica, now) + require.NoError(t, err) + + // Drain the broadcast queue to clear out the CAS notification from CheckReplica + mkv.GetBroadcasts(0, math.MaxInt32) + + // To call c.client.Delete(ctx, key) in cleanupOldReplicas + futureDeadline := now.Add(time.Hour) + + // This will see (ReceivedAt < deadline) and CAS the entry to set DeletedAt = time.Now() + tracker.cleanupOldReplicas(ctx, futureDeadline) + require.Equal(t, float64(1), testutil.ToFloat64(tracker.replicasMarkedForDeletion)) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.deletedReplicas)) + + // This will see (DeletedAt > 0) and (DeletedAt < deadline), calling c.client.Delete(ctx, key) + tracker.cleanupOldReplicas(ctx, futureDeadline) + require.Equal(t, float64(1), testutil.ToFloat64(tracker.deletedReplicas)) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.markingForDeletionsFailed)) + + // Verify Local Deletion + val, err := client.Get(ctx, key) + require.NoError(t, err) + require.Nil(t, val, "HA Tracker cleanup should have deleted the key locally") + + // Verify Broadcast Generation + broadcasts := mkv.GetBroadcasts(0, math.MaxInt32) + require.NotEmpty(t, broadcasts, "Cleanup Delete should generate a broadcast for tombstone propagation") +} + // Test that values are set in the HATracker after WatchPrefix has found it in the KVStore. 
func TestWatchPrefixAssignment(t *testing.T) { t.Parallel() @@ -945,3 +1041,280 @@ func checkReplicaDeletionState(t *testing.T, duration time.Duration, c *HATracke require.Equal(t, expectedMarkedForDeletion, markedForDeletion, "KV entry marked for deletion") } } + +func TestReplicaDesc_Merge(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + current *ReplicaDesc + other *ReplicaDesc + expectChange bool + expectedResult *ReplicaDesc + }{ + { + name: "merge with more recent replica", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica2", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica2", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + }, + { + name: "merge with older replica - no change", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica2", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + expectChange: false, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + }, + { + name: "merge with deleted replica", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + }, + { + name: "undelete with more recent replica", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(time.Minute)), + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + DeletedAt: 0, + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + DeletedAt: 0, + }, + }, + { + name: "merge with nil other", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: nil, + expectChange: false, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + }, + { + name: "merge deleted with more recent deleted", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(time.Minute)), + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + }, + { + name: "same timestamp, different replica - choose lexicographically smaller", + current: &ReplicaDesc{ + Replica: "replica-b", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica-a", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: 
"replica-a", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + }, + { + name: "same timestamp, same replica - no change", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + expectChange: false, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var change memberlist.Mergeable + var err error + + if tt.other != nil { + change, err = tt.current.Merge(tt.other, false) + } else { + change, err = tt.current.Merge(nil, false) + } + + require.NoError(t, err) + + if tt.expectChange { + require.NotNil(t, change, "expected a change to be returned") + } else { + require.Nil(t, change, "expected no change to be returned") + } + + assert.Equal(t, tt.expectedResult.Replica, tt.current.Replica) + assert.Equal(t, tt.expectedResult.ReceivedAt, tt.current.ReceivedAt) + assert.Equal(t, tt.expectedResult.DeletedAt, tt.current.DeletedAt) + }) + } +} + +func TestReplicaDesc_Merge_Commutativity(t *testing.T) { + tests := []struct { + name string + descA *ReplicaDesc + descB *ReplicaDesc + }{ + { + name: "Same replica: New vs Older", + descA: &ReplicaDesc{ + Replica: "replica-A", + ReceivedAt: 200, + DeletedAt: 0, + }, + descB: &ReplicaDesc{ + Replica: "replica-A", + ReceivedAt: 50, + DeletedAt: 100, + }, + }, + { + name: "Same Timestamps - Lexicographical Tie-break", + descA: &ReplicaDesc{ + Replica: "replica-A", + ReceivedAt: 100, + DeletedAt: 0, + }, + descB: &ReplicaDesc{ + Replica: "replica-B", + ReceivedAt: 100, + DeletedAt: 0, + }, + }, + { + name: "Concurrent Deletions with Different Timestamps", + descA: &ReplicaDesc{ + Replica: "replica-A", + ReceivedAt: 50, + DeletedAt: 150, + }, + descB: &ReplicaDesc{ + Replica: "replica-A", + ReceivedAt: 50, + DeletedAt: 120, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // A merges B + nodeA := proto.Clone(tt.descA).(*ReplicaDesc) + incomingB := proto.Clone(tt.descB).(*ReplicaDesc) + _, _ = nodeA.Merge(incomingB, false) + + // B merges A + nodeB := proto.Clone(tt.descB).(*ReplicaDesc) + incomingA := proto.Clone(tt.descA).(*ReplicaDesc) + _, _ = nodeB.Merge(incomingA, false) + + // Check if both nodes converged to the exact same state + isSame := (nodeA.Replica == nodeB.Replica) && + (nodeA.ReceivedAt == nodeB.ReceivedAt) && + (nodeA.DeletedAt == nodeB.DeletedAt) + + if !isSame { + t.Errorf("Commutativity violation in '%s'!\n"+ + "Result of A.Merge(B): Replica=%s, ReceivedAt=%d, DeletedAt=%d\n"+ + "Result of B.Merge(A): Replica=%s, ReceivedAt=%d, DeletedAt=%d", + tt.name, + nodeA.Replica, nodeA.ReceivedAt, nodeA.DeletedAt, + nodeB.Replica, nodeB.ReceivedAt, nodeB.DeletedAt) + } + }) + } +} + +func TestReplicaDesc_MergeContent(t *testing.T) { + desc := &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(time.Now()), + DeletedAt: 0, + } + + content := desc.MergeContent() + require.Equal(t, []string{"replica1"}, content) + + emptyDesc := &ReplicaDesc{} + emptyContent := emptyDesc.MergeContent() + require.Nil(t, emptyContent) +} diff --git a/pkg/ring/kv/memberlist/kv.pb.go b/pkg/ring/kv/memberlist/kv.pb.go index 29030cfe9b..8997293472 100644 --- a/pkg/ring/kv/memberlist/kv.pb.go +++ b/pkg/ring/kv/memberlist/kv.pb.go @@ -76,6 +76,10 @@ type KeyValuePair struct { Value []byte 
`protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` // ID of the codec used to write the value Codec string `protobuf:"bytes,3,opt,name=codec,proto3" json:"codec,omitempty"` + // Indicates whether the key is marked as logically deleted (Tombstone). + Deleted bool `protobuf:"varint,4,opt,name=deleted,proto3" json:"deleted,omitempty"` + // Unix timestamp (in milliseconds) of the last update or deletion. + UpdatedAt int64 `protobuf:"varint,5,opt,name=updatedAt,proto3" json:"updatedAt,omitempty"` } func (m *KeyValuePair) Reset() { *m = KeyValuePair{} } @@ -131,6 +135,20 @@ func (m *KeyValuePair) GetCodec() string { return "" } +func (m *KeyValuePair) GetDeleted() bool { + if m != nil { + return m.Deleted + } + return false +} + +func (m *KeyValuePair) GetUpdatedAt() int64 { + if m != nil { + return m.UpdatedAt + } + return 0 +} + func init() { proto.RegisterType((*KeyValueStore)(nil), "memberlist.KeyValueStore") proto.RegisterType((*KeyValuePair)(nil), "memberlist.KeyValuePair") @@ -139,21 +157,24 @@ func init() { func init() { proto.RegisterFile("kv.proto", fileDescriptor_2216fe83c9c12408) } var fileDescriptor_2216fe83c9c12408 = []byte{ - // 218 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xc8, 0x2e, 0xd3, 0x2b, - 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0x4d, 0xcd, 0x4d, 0x4a, 0x2d, 0xca, 0xc9, 0x2c, 0x2e, - 0x91, 0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x0b, 0xeb, 0x83, 0x58, 0x10, 0x15, 0x4a, 0xf6, 0x5c, - 0xbc, 0xde, 0xa9, 0x95, 0x61, 0x89, 0x39, 0xa5, 0xa9, 0xc1, 0x25, 0xf9, 0x45, 0xa9, 0x42, 0x7a, - 0x5c, 0xac, 0x05, 0x89, 0x99, 0x45, 0xc5, 0x12, 0x8c, 0x0a, 0xcc, 0x1a, 0xdc, 0x46, 0x12, 0x7a, - 0x08, 0x23, 0xf4, 0x60, 0x2a, 0x03, 0x12, 0x33, 0x8b, 0x82, 0x20, 0xca, 0x94, 0x7c, 0xb8, 0x78, - 0x90, 0x85, 0x85, 0x04, 0xb8, 0x98, 0xb3, 0x53, 0x2b, 0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, - 0x40, 0x4c, 0x21, 0x11, 0x2e, 0xd6, 0x32, 0x90, 0xb4, 0x04, 0x93, 0x02, 0xa3, 0x06, 0x4f, 0x10, - 0x84, 0x03, 0x12, 0x4d, 0xce, 0x4f, 0x49, 0x4d, 0x96, 0x60, 0x06, 0xab, 0x84, 0x70, 0x9c, 0x4c, - 0x2e, 0x3c, 0x94, 0x63, 0xb8, 0xf1, 0x50, 0x8e, 0xe1, 0xc3, 0x43, 0x39, 0xc6, 0x86, 0x47, 0x72, - 0x8c, 0x2b, 0x1e, 0xc9, 0x31, 0x9e, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, - 0x72, 0x8c, 0x2f, 0x1e, 0xc9, 0x31, 0x7c, 0x78, 0x24, 0xc7, 0x38, 0xe1, 0xb1, 0x1c, 0xc3, 0x85, - 0xc7, 0x72, 0x0c, 0x37, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, 0xfd, 0x62, 0x0c, 0x08, 0x00, 0x00, - 0xff, 0xff, 0xbe, 0xcc, 0xb6, 0xd8, 0xf9, 0x00, 0x00, 0x00, + // 260 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x90, 0xb1, 0x4e, 0xf3, 0x30, + 0x14, 0x46, 0x7d, 0xff, 0xfc, 0x81, 0xd6, 0x14, 0x09, 0x59, 0x1d, 0x2c, 0x84, 0xae, 0xa2, 0x4e, + 0x9e, 0x82, 0x04, 0xec, 0x08, 0x56, 0x16, 0x64, 0x24, 0xf6, 0xb4, 0xbe, 0xaa, 0xa2, 0xa6, 0x72, + 0xe4, 0xba, 0x95, 0xba, 0x31, 0xf0, 0x00, 0x3c, 0x06, 0x8f, 0xc2, 0x98, 0xb1, 0x23, 0x71, 0x16, + 0xc6, 0x3e, 0x02, 0x4a, 0xa2, 0xaa, 0x6c, 0xdf, 0x39, 0x3e, 0xf2, 0x70, 0xf9, 0x60, 0xb1, 0x49, + 0x4b, 0x67, 0xbd, 0x15, 0x7c, 0x49, 0xcb, 0x29, 0xb9, 0x22, 0x5f, 0xf9, 0xcb, 0xf1, 0xdc, 0xce, + 0x6d, 0xa7, 0xaf, 0xdb, 0xd5, 0x17, 0x93, 0x7b, 0x7e, 0xfe, 0x44, 0xdb, 0xd7, 0xac, 0x58, 0xd3, + 0x8b, 0xb7, 0x8e, 0x44, 0xca, 0xe3, 0x32, 0xcb, 0xdd, 0x4a, 0x42, 0x12, 0xa9, 0xb3, 0x1b, 0x99, + 0x1e, 0xbf, 0x48, 0x0f, 0xe5, 0x73, 0x96, 0x3b, 0xdd, 0x67, 0x93, 0x77, 0xe0, 0xa3, 0xbf, 0x5e, + 0x5c, 0xf0, 0x68, 0x41, 0x5b, 0x09, 0x09, 0xa8, 0xa1, 0x6e, 0xa7, 0x18, 
0xf3, 0x78, 0xd3, 0x3e, + 0xcb, 0x7f, 0x09, 0xa8, 0x91, 0xee, 0xa1, 0xb5, 0x33, 0x6b, 0x68, 0x26, 0xa3, 0xae, 0xec, 0x41, + 0x48, 0x7e, 0x6a, 0xa8, 0x20, 0x4f, 0x46, 0xfe, 0x4f, 0x40, 0x0d, 0xf4, 0x01, 0xc5, 0x15, 0x1f, + 0xae, 0x4b, 0x93, 0x79, 0x32, 0x0f, 0x5e, 0xc6, 0x09, 0xa8, 0x48, 0x1f, 0xc5, 0xe3, 0x5d, 0x55, + 0x23, 0xdb, 0xd5, 0xc8, 0xf6, 0x35, 0xc2, 0x5b, 0x40, 0xf8, 0x0c, 0x08, 0x5f, 0x01, 0xa1, 0x0a, + 0x08, 0xdf, 0x01, 0xe1, 0x27, 0x20, 0xdb, 0x07, 0x84, 0x8f, 0x06, 0x59, 0xd5, 0x20, 0xdb, 0x35, + 0xc8, 0xa6, 0x27, 0xdd, 0x11, 0x6e, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xf1, 0x94, 0xac, 0x1d, + 0x32, 0x01, 0x00, 0x00, } func (this *KeyValueStore) Equal(that interface{}) bool { @@ -213,6 +234,12 @@ func (this *KeyValuePair) Equal(that interface{}) bool { if this.Codec != that1.Codec { return false } + if this.Deleted != that1.Deleted { + return false + } + if this.UpdatedAt != that1.UpdatedAt { + return false + } return true } func (this *KeyValueStore) GoString() string { @@ -231,11 +258,13 @@ func (this *KeyValuePair) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 9) s = append(s, "&memberlist.KeyValuePair{") s = append(s, "Key: "+fmt.Sprintf("%#v", this.Key)+",\n") s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") s = append(s, "Codec: "+fmt.Sprintf("%#v", this.Codec)+",\n") + s = append(s, "Deleted: "+fmt.Sprintf("%#v", this.Deleted)+",\n") + s = append(s, "UpdatedAt: "+fmt.Sprintf("%#v", this.UpdatedAt)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -304,6 +333,21 @@ func (m *KeyValuePair) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.UpdatedAt != 0 { + i = encodeVarintKv(dAtA, i, uint64(m.UpdatedAt)) + i-- + dAtA[i] = 0x28 + } + if m.Deleted { + i-- + if m.Deleted { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x20 + } if len(m.Codec) > 0 { i -= len(m.Codec) copy(dAtA[i:], m.Codec) @@ -372,6 +416,12 @@ func (m *KeyValuePair) Size() (n int) { if l > 0 { n += 1 + l + sovKv(uint64(l)) } + if m.Deleted { + n += 2 + } + if m.UpdatedAt != 0 { + n += 1 + sovKv(uint64(m.UpdatedAt)) + } return n } @@ -404,6 +454,8 @@ func (this *KeyValuePair) String() string { `Key:` + fmt.Sprintf("%v", this.Key) + `,`, `Value:` + fmt.Sprintf("%v", this.Value) + `,`, `Codec:` + fmt.Sprintf("%v", this.Codec) + `,`, + `Deleted:` + fmt.Sprintf("%v", this.Deleted) + `,`, + `UpdatedAt:` + fmt.Sprintf("%v", this.UpdatedAt) + `,`, `}`, }, "") return s @@ -630,6 +682,45 @@ func (m *KeyValuePair) Unmarshal(dAtA []byte) error { } m.Codec = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Deleted", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Deleted = bool(v != 0) + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field UpdatedAt", wireType) + } + m.UpdatedAt = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.UpdatedAt |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipKv(dAtA[iNdEx:]) diff --git a/pkg/ring/kv/memberlist/kv.proto b/pkg/ring/kv/memberlist/kv.proto index 
700b2f5305..42d4828b76 100644 --- a/pkg/ring/kv/memberlist/kv.proto +++ b/pkg/ring/kv/memberlist/kv.proto @@ -19,4 +19,10 @@ message KeyValuePair { // ID of the codec used to write the value string codec = 3; + + // Indicates whether the key is marked as logically deleted (Tombstone). + bool deleted = 4; + + // Unix timestamp (in milliseconds) of the last update or deletion. + int64 updatedAt = 5; } diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index 48b5bd9266..b635a54cee 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -29,6 +29,8 @@ import ( const ( maxCasRetries = 10 // max retries in CAS operation noChangeDetectedRetrySleep = time.Second // how long to sleep after no change was detected in CAS + + tombstoneSweepInterval = time.Second * 30 ) // Client implements kv.Client interface, by using memberlist.KV @@ -72,7 +74,12 @@ func (c *Client) Get(ctx context.Context, key string) (any, error) { // Delete is part of kv.Client interface. func (c *Client) Delete(ctx context.Context, key string) error { - return errors.New("memberlist does not support Delete") + err := c.awaitKVRunningOrStopping(ctx) + if err != nil { + return err + } + + return c.kv.Delete(key) } // CAS is part of kv.Client interface @@ -156,6 +163,9 @@ type KVConfig struct { // Remove LEFT ingesters from ring after this timeout. LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout"` + // How long to keep deleted keys (tombstones) in the KV store + TombstoneTimeout time.Duration `yaml:"tombstone_timeout"` + // Timeout used when leaving the memberlist cluster. LeaveTimeout time.Duration `yaml:"leave_timeout"` @@ -188,6 +198,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.BoolVar(&cfg.AbortIfJoinFails, prefix+"memberlist.abort-if-join-fails", true, "If this node fails to join memberlist cluster, abort.") f.DurationVar(&cfg.RejoinInterval, prefix+"memberlist.rejoin-interval", 0, "If not 0, how often to rejoin the cluster. Occasional rejoin can help to fix the cluster split issue, and is harmless otherwise. For example when using only few components as a seed nodes (via -memberlist.join), then it's recommended to use rejoin. If -memberlist.join points to dynamic service that resolves to all gossiping nodes (eg. Kubernetes headless service), then rejoin is not needed.") f.DurationVar(&cfg.LeftIngestersTimeout, prefix+"memberlist.left-ingesters-timeout", 5*time.Minute, "How long to keep LEFT ingesters in the ring.") + f.DurationVar(&cfg.TombstoneTimeout, prefix+"memberlist.tombstone-timeout", 5*time.Minute, "How long to keep deleted keys (tombstones) in the KV store") f.DurationVar(&cfg.LeaveTimeout, prefix+"memberlist.leave-timeout", 5*time.Second, "Timeout for leaving memberlist cluster.") f.DurationVar(&cfg.GossipInterval, prefix+"memberlist.gossip-interval", mlDefaults.GossipInterval, "How often to gossip.") f.IntVar(&cfg.GossipNodes, prefix+"memberlist.gossip-nodes", mlDefaults.GossipNodes, "How many nodes to gossip to.") @@ -279,6 +290,7 @@ type KV struct { storeValuesDesc *prometheus.Desc storeTombstones *prometheus.GaugeVec storeRemovedTombstones *prometheus.CounterVec + sweptTombstones prometheus.Counter memberlistMembersCount prometheus.GaugeFunc memberlistHealthScore prometheus.GaugeFunc @@ -314,6 +326,9 @@ type valueDesc struct { // ID of codec used to write this value. Only used when sending full state. 
codecID string + + deleted bool // mark for deleted + updatedAt time.Time // time when deletion or update to resolve conflict } func (v valueDesc) Clone() (result valueDesc) { @@ -488,16 +503,44 @@ func (m *KV) running(ctx context.Context) error { tickerChan = t.C } + var sweepTickerChan <-chan time.Time + if m.cfg.TombstoneTimeout > 0 { + sweepTicker := time.NewTicker(tombstoneSweepInterval) + defer sweepTicker.Stop() + sweepTickerChan = sweepTicker.C + } + for { select { case <-tickerChan: m.rejoinMemberlist(ctx) + case <-sweepTickerChan: + m.sweepTombstones() case <-ctx.Done(): return nil } } } +func (m *KV) sweepTombstones() { + m.storeMu.Lock() + defer m.storeMu.Unlock() + + now := time.Now() + sweptCount := 0 + + for key, val := range m.store { + if val.deleted && now.Sub(val.updatedAt) > m.cfg.TombstoneTimeout { + delete(m.store, key) + sweptCount++ + } + } + + if sweptCount > 0 { + m.sweptTombstones.Add(float64(sweptCount)) + } +} + func (m *KV) rejoinMemberlist(ctx context.Context) { members := m.discoverMembers(ctx, m.cfg.JoinMembers) @@ -649,7 +692,11 @@ func (m *KV) List(prefix string) []string { defer m.storeMu.Unlock() var keys []string - for k := range m.store { + for k, v := range m.store { + if v.deleted { + continue + } + if strings.HasPrefix(k, prefix) { keys = append(keys, k) } @@ -670,6 +717,10 @@ func (m *KV) get(key string, codec codec.Codec) (out any, version uint, err erro v := m.store[key].Clone() m.storeMu.Unlock() + if v.deleted { + return nil, v.version, nil + } + if v.value != nil { // remove ALL tombstones before returning to client. // No need for clients to see them. @@ -679,6 +730,37 @@ func (m *KV) get(key string, codec codec.Codec) (out any, version uint, err erro return v.value, v.version, nil } +// Delete marks the key as deleted (Tombstone) and broadcasts this state to peers. +func (m *KV) Delete(key string) error { + m.storeMu.Lock() + val, ok := m.store[key] + m.storeMu.Unlock() + + // If the key is not found or is marked as deleted, do nothing. + if !ok || val.deleted { + return nil + } + + codec := m.GetCodec(val.codecID) + if codec == nil { + level.Error(m.logger).Log("msg", "failed to mark for deleted: invalid codec for key", "codec", val.codecID, "key", key) + return fmt.Errorf("invalid codec %s for key %s", val.codecID, key) + } + + change, newvar, err := m.mergeValueForKey(key, val.value, 0, codec, true, time.Now()) + if err != nil { + level.Error(m.logger).Log("msg", "failed to mark for deleted: error ", "key", key, "err", err) + return err + } + + if newvar > 0 { + m.notifyWatchers(key) + m.broadcastNewValue(key, change, newvar, codec) + } + + return nil +} + // WatchKey watches for value changes for given key. When value changes, 'f' function is called with the // latest value. Notifications that arrive while 'f' is running are coalesced into one subsequent 'f' call. // @@ -888,7 +970,7 @@ outer: } m.casFailures.Inc() - return fmt.Errorf("failed to CAS-update key %s: %v", key, lastError) + return fmt.Errorf("failed to CAS-update key %s: %w", key, lastError) } // returns change, error (or nil, if CAS succeeded), and whether to retry or not. 
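
Taken together, `Delete`, the new `deleted`/`updatedAt` metadata on `valueDesc`, and the sweep loop give memberlist a soft-delete path: `Delete` marks the entry as a tombstone and broadcasts that state, `Get`/`List` stop returning the key, and `sweepTombstones` drops the entry once it has been deleted for longer than `-memberlist.tombstone-timeout` (checked every 30 seconds). A condensed, self-contained sketch of that lifecycle from a client's perspective — the single-node setup and the DNS-provider stub mirror the tests in this PR, and the key and values are illustrative, not prescriptive:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/cortexproject/cortex/pkg/ha"
	"github.com/cortexproject/cortex/pkg/ring/kv/codec"
	"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
	"github.com/cortexproject/cortex/pkg/util/flagext"
	"github.com/cortexproject/cortex/pkg/util/services"
)

// Stand-in DNS provider, shaped like the dnsProviderMock used by the tests.
type dnsProviderStub struct{ resolved []string }

func (p *dnsProviderStub) Resolve(_ context.Context, addrs []string, _ bool) error {
	p.resolved = addrs
	return nil
}

func (p *dnsProviderStub) Addresses() []string { return p.resolved }

func main() {
	ctx := context.Background()

	var cfg memberlist.KVConfig
	flagext.DefaultValues(&cfg)
	cfg.Codecs = []codec.Codec{ha.GetReplicaDescCodec()}
	cfg.TombstoneTimeout = time.Minute // tombstones older than this are removed by the periodic sweep

	mkv := memberlist.NewKV(cfg, log.NewNopLogger(), &dnsProviderStub{}, prometheus.NewPedanticRegistry())
	if err := services.StartAndAwaitRunning(ctx, mkv); err != nil {
		panic(err)
	}
	defer services.StopAndAwaitTerminated(ctx, mkv) //nolint:errcheck

	client, err := memberlist.NewClient(mkv, ha.GetReplicaDescCodec())
	if err != nil {
		panic(err)
	}

	const key = "user-1/cluster-1" // illustrative key

	// Write a value, then soft-delete it.
	err = client.CAS(ctx, key, func(in interface{}) (interface{}, bool, error) {
		return &ha.ReplicaDesc{Replica: "replica-0", ReceivedAt: time.Now().UnixMilli()}, true, nil
	})
	if err != nil {
		panic(err)
	}
	if err := client.Delete(ctx, key); err != nil {
		panic(err)
	}

	// The tombstone hides the key from readers and is queued for gossip;
	// after TombstoneTimeout the sweep loop removes the entry for good.
	v, _ := client.Get(ctx, key)
	fmt.Println(v == nil) // true
}
```
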
@@ -901,7 +983,7 @@ func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in any) (out any out, retry, err := f(val) if err != nil { - return nil, 0, retry, fmt.Errorf("fn returned error: %v", err) + return nil, 0, retry, fmt.Errorf("fn returned error: %w", err) } if out == nil { @@ -917,7 +999,7 @@ func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in any) (out any // To support detection of removed items from value, we will only allow CAS operation to // succeed if version hasn't changed, i.e. state hasn't changed since running 'f'. - change, newver, err := m.mergeValueForKey(key, r, ver, codec) + change, newver, err := m.mergeValueForKey(key, r, ver, codec, false, time.Now()) if err == errVersionMismatch { return nil, 0, retry, err } @@ -942,7 +1024,28 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec return } - kvPair := KeyValuePair{Key: key, Value: data, Codec: codec.CodecID()} + // Fetch current metadata from the store to broadcast + m.storeMu.Lock() + desc, ok := m.store[key] + m.storeMu.Unlock() + + var deleted bool + var updatedAt int64 + if ok { + deleted = desc.deleted + if !desc.updatedAt.IsZero() { + updatedAt = desc.updatedAt.UnixMilli() + } + } + + kvPair := KeyValuePair{ + Key: key, + Value: data, + Codec: codec.CodecID(), + Deleted: deleted, + UpdatedAt: updatedAt, + } + pairData, err := kvPair.Marshal() if err != nil { level.Error(m.logger).Log("msg", "failed to serialize KV pair", "key", key, "version", version, "err", err) @@ -997,8 +1100,14 @@ func (m *KV) NotifyMsg(msg []byte) { return } + // Convert int64 UnixMilli back to time.Time + var parsedTime time.Time + if kvPair.UpdatedAt != 0 { + parsedTime = time.UnixMilli(kvPair.UpdatedAt) + } + // we have a ring update! Let's merge it with our version of the ring for given key - mod, version, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec) + mod, version, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec, kvPair.Deleted, parsedTime) changes := []string(nil) if mod != nil { @@ -1090,6 +1199,12 @@ func (m *KV) LocalState(join bool) []byte { kvPair.Key = key kvPair.Value = encoded kvPair.Codec = val.codecID + kvPair.Deleted = val.deleted + if !val.updatedAt.IsZero() { + kvPair.UpdatedAt = val.updatedAt.UnixMilli() + } else { + kvPair.UpdatedAt = 0 + } ser, err := kvPair.Marshal() if err != nil { @@ -1169,8 +1284,14 @@ func (m *KV) MergeRemoteState(data []byte, join bool) { continue } + // Convert int64 UnixMilli back to time.Time + var parsedTime time.Time + if kvPair.UpdatedAt != 0 { + parsedTime = time.UnixMilli(kvPair.UpdatedAt) + } + // we have both key and value, try to merge it with our state - change, newver, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec) + change, newver, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec, kvPair.Deleted, parsedTime) changes := []string(nil) if change != nil { @@ -1198,7 +1319,7 @@ func (m *KV) MergeRemoteState(data []byte, join bool) { } } -func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.Codec) (Mergeable, uint, error) { +func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.Codec, deleted bool, updatedAt time.Time) (Mergeable, uint, error) { decodedValue, err := codec.Decode(incomingData) if err != nil { return nil, 0, fmt.Errorf("failed to decode value: %v", err) @@ -1209,14 +1330,14 @@ func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec. 
return nil, 0, fmt.Errorf("expected Mergeable, got: %T", decodedValue) } - return m.mergeValueForKey(key, incomingValue, 0, codec) + return m.mergeValueForKey(key, incomingValue, 0, codec, deleted, updatedAt) } // Merges incoming value with value we have in our store. Returns "a change" that can be sent to other // cluster members to update their state, and new version of the value. // If CAS version is specified, then merging will fail if state has changed already, and errVersionMismatch is reported. // If no modification occurred, new version is 0. -func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion uint, codec codec.Codec) (Mergeable, uint, error) { +func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion uint, codec codec.Codec, deleted bool, updatedAt time.Time) (Mergeable, uint, error) { m.storeMu.Lock() defer m.storeMu.Unlock() @@ -1233,11 +1354,42 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui return nil, 0, err } - // No change, don't store it. - if change == nil || len(change.MergeContent()) == 0 { + hasDataChanged := change != nil && len(change.MergeContent()) > 0 + // Time-based conflict resolution (LWW - Last Write Wins) + newUpdatedAt := curr.updatedAt + newDeleted := curr.deleted + statusChanged := false + + if updatedAt.After(curr.updatedAt) { + // Incoming data is newer + if deleted != curr.deleted || deleted { + // The deleted state differs from the current one or deleted + newDeleted = deleted + statusChanged = true + } + } else if updatedAt.Equal(curr.updatedAt) && deleted && !curr.deleted { + // Timestamps are exactly the same, prioritize deletion (Tombstone) + newDeleted = true + statusChanged = true + } + + // Neither the data nor the metadata changed + if !hasDataChanged && !statusChanged { return nil, 0, nil } + // data or state has been changed, update the timestamp + if updatedAt.After(curr.updatedAt) { + newUpdatedAt = updatedAt + } + + // If the value itself hasn't changed but the delete status (Tombstone) + // has changed, we still need to broadcast this metadata change to ensure + // the cluster agrees on the deleted state. + if !hasDataChanged && statusChanged { + change = result + } + if m.cfg.LeftIngestersTimeout > 0 { limit := time.Now().Add(-m.cfg.LeftIngestersTimeout) total, removed := result.RemoveTombstones(limit) @@ -1250,22 +1402,28 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui // Note that "result" and "change" may actually be the same Mergeable. That is why we // call RemoveTombstones on "result" first, so that we get the correct metrics. Calling // RemoveTombstones twice with same limit should be noop. - change.RemoveTombstones(limit) - if len(change.MergeContent()) == 0 { - return nil, 0, nil + if change != nil { + change.RemoveTombstones(limit) + if len(change.MergeContent()) == 0 && !statusChanged { + return nil, 0, nil + } } } newVersion := curr.version + 1 m.store[key] = valueDesc{ - value: result, - version: newVersion, - codecID: codec.CodecID(), + value: result, + version: newVersion, + codecID: codec.CodecID(), + deleted: newDeleted, + updatedAt: newUpdatedAt, } // The "changes" returned by Merge() can contain references to the "result" // state. Therefore, make sure we clone it before releasing the lock. 
- change = change.Clone().(Mergeable) + if change != nil { + change = change.Clone().(Mergeable) + } return change, newVersion, nil } diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go index 2ac167665a..002ccb340f 100644 --- a/pkg/ring/kv/memberlist/memberlist_client_test.go +++ b/pkg/ring/kv/memberlist/memberlist_client_test.go @@ -18,6 +18,7 @@ import ( "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1171,7 +1172,7 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) { "a": {Timestamp: now.Unix() - 5, State: ACTIVE}, "b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}}, "c": {Timestamp: now.Unix(), State: ACTIVE}, - }})) + }}, false, 0)) // Check two things here: // 1) state of value in KV store @@ -1266,7 +1267,7 @@ func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) { require.Equal(t, tc.valueBeforeSend, d, "valueBeforeSend") } - kv.NotifyMsg(marshalKeyValuePair(t, key, codec, tc.msgToSend)) + kv.NotifyMsg(marshalKeyValuePair(t, key, codec, tc.msgToSend, false, 0)) bs := kv.GetBroadcasts(0, math.MaxInt32) if tc.broadcastMessage == nil { @@ -1286,6 +1287,397 @@ func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) { } } +// TestDeleteIsPropagatedToOtherNodes demonstrates that the updated KV.Delete +// correctly removes the key, generates a tombstone broadcast, and prevents +// the key from reappearing when older gossip is received from peers. +func TestDeleteIsPropagatedToOtherNodes(t *testing.T) { + c := dataCodec{} + + cfg := KVConfig{} + cfg.RetransmitMult = 1 + cfg.Codecs = append(cfg.Codecs, c) + + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) + defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck + + client, err := NewClient(mkv, c) + require.NoError(t, err) + + testKey := "test-key" + now := time.Now() + + // Write a value via CAS (simulates node1 writing). + cas(t, client, testKey, func(in *data) (*data, bool, error) { + d := &data{Members: map[string]member{}} + d.Members["replica0"] = member{Timestamp: now.Unix(), State: ACTIVE} + return d, true, nil + }) + + // Drain any broadcast messages from the CAS above so our queue is empty. + mkv.GetBroadcasts(0, math.MaxInt32) + + // Verify the key exists. + d := getData(t, client, testKey) + require.NotNil(t, d) + require.Contains(t, d.Members, "replica0") + + // Delete the key. + err = client.Delete(context.Background(), testKey) + require.NoError(t, err) + + // A broadcast should be generated for the delete. + broadcasts := mkv.GetBroadcasts(0, math.MaxInt32) + require.NotEmpty(t, broadcasts, "Delete should generate a broadcast to propagate the tombstone") + + // Simulate another node (node2) gossiping back the old value it still has. + // This is what happens during push-pull sync or regular gossip. + mkv.NotifyMsg(marshalKeyValuePair(t, testKey, c, &data{ + Members: map[string]member{ + "replica0": {Timestamp: now.Unix(), State: ACTIVE}, // Same timestamp as the original creation + }, + }, false, now.UnixMilli())) + + // The key should remain deleted because the tombstone takes precedence. 
+ d = getData(t, client, testKey) + require.Nil(t, d, "key should remain deleted after gossip from peer.") +} + +// TestResurrectDeletedKeyWithNewerMetadata demonstrates that a tombstone +// can be resurrected if a gossip message arrives with a newer UpdatedAt metadata timestamp. +func TestResurrectDeletedKeyWithNewerMetadata(t *testing.T) { + c := dataCodec{} + + cfg := KVConfig{} + cfg.RetransmitMult = 1 + cfg.Codecs = append(cfg.Codecs, c) + + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) + defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck + + client, err := NewClient(mkv, c) + require.NoError(t, err) + + testKey := "resurrect-key" + now := time.Now() + + // Create a key (ACTIVE state) + cas(t, client, testKey, func(in *data) (*data, bool, error) { + d := getOrCreateData(in) + d.Members["replica0"] = member{Timestamp: now.Unix(), State: ACTIVE} + return d, true, nil + }) + + // Delete the key (Create a Tombstone) + err = client.Delete(context.Background(), testKey) + require.NoError(t, err) + + // Verify deletion + d := getData(t, client, testKey) + require.Nil(t, d, "The key should be successfully deleted.") + + // Prepare a new state with a future timestamp + futureTime := now.Add(time.Minute) + futureData := &data{ + Members: map[string]member{ + "replica0": {Timestamp: futureTime.Unix(), State: ACTIVE}, + }, + } + + // Send a gossip message with a newer metadata timestamp (UpdatedAt). + msg := marshalKeyValuePair(t, testKey, c, futureData, false, futureTime.UnixMilli()) + mkv.NotifyMsg(msg) + + // The key should be resurrected + resurrectedData := getData(t, client, testKey) + require.NotNil(t, resurrectedData, "The key should be resurrected after receiving a message with a newer UpdatedAt.") + require.Contains(t, resurrectedData.Members, "replica0") + require.Equal(t, ACTIVE, resurrectedData.Members["replica0"].State) + require.Equal(t, futureTime.Unix(), resurrectedData.Members["replica0"].Timestamp) +} + +func TestDeleteIdempotencyAndList(t *testing.T) { + c := dataCodec{} + + cfg := KVConfig{} + cfg.RetransmitMult = 1 + cfg.Codecs = append(cfg.Codecs, c) + + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) + defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck + + client, err := NewClient(mkv, c) + require.NoError(t, err) + + ctx := context.Background() + + err = client.Delete(ctx, "non-existent-key") + require.NoError(t, err) + require.Equal(t, 0, len(mkv.GetBroadcasts(0, math.MaxInt32)), "Deleting a non-existent key should not trigger a broadcast") + + key1 := "prefix-key1" + key2 := "prefix-key2" + now := time.Now() + + createFn := func(in *data) (*data, bool, error) { + d := &data{Members: map[string]member{"node1": {Timestamp: now.Unix(), State: ACTIVE}}} + return d, true, nil + } + cas(t, client, key1, createFn) + cas(t, client, key2, createFn) + + // Drain broadcasts caused by the CAS operations + mkv.GetBroadcasts(0, math.MaxInt32) + + keys, err := client.List(ctx, "prefix-") + require.NoError(t, err) + require.ElementsMatch(t, []string{key1, key2}, keys, "Both keys should appear in the List before deletion") + + // Delete key1 + err = client.Delete(ctx, key1) + require.NoError(t, err) + require.Equal(t, 1, len(mkv.GetBroadcasts(0, math.MaxInt32)), "A successful 
deletion should trigger exactly 1 broadcast") + + keys, err = client.List(ctx, "prefix-") + require.NoError(t, err) + require.ElementsMatch(t, []string{key2}, keys, "The deleted key1 should be excluded from the List") + + err = client.Delete(ctx, key1) + require.NoError(t, err) + require.Equal(t, 0, len(mkv.GetBroadcasts(0, math.MaxInt32)), "Deleting an already tombstoned key should not trigger an additional broadcast") +} + +func TestDeleteTriggersWatchKeyNotification(t *testing.T) { + c := dataCodec{} + + cfg := KVConfig{} + cfg.RetransmitMult = 1 + cfg.Codecs = append(cfg.Codecs, c) + + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) + defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck + + client, err := NewClient(mkv, c) + require.NoError(t, err) + + testKey := "watch-delete-key" + now := time.Now() + + // Channel to collect watched values + watchCh := make(chan any, 5) + ctx := t.Context() + + // Start watching the key in a separate goroutine + go client.WatchKey(ctx, testKey, func(val any) bool { + watchCh <- val + return true // Keep watching + }) + + // Add a small sleep to give the WatchKey goroutine enough time to + // register its internal channel. + time.Sleep(100 * time.Millisecond) + + // Create the key + cas(t, client, testKey, func(in *data) (*data, bool, error) { + d := &data{Members: map[string]member{"node1": {Timestamp: now.Unix(), State: ACTIVE}}} + return d, true, nil + }) + + // Expect a non-nil value from the watcher upon creation + var val any + select { + case val = <-watchCh: + require.NotNil(t, val, "Watcher should receive a non-nil value upon creation") + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for watch notification on creation") + } + + // Delete the key + err = client.Delete(context.Background(), testKey) + require.NoError(t, err) + + // Expect a nil value from the watcher (Tombstone translates to nil for the client) + select { + case val = <-watchCh: + require.Nil(t, val, "Watcher should receive a nil value upon deletion") + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for watch notification on deletion") + } +} + +func TestDeletePropagatedViaLocalStateSync(t *testing.T) { + c := dataCodec{} + + cfg := KVConfig{} + cfg.Codecs = append(cfg.Codecs, c) + + // Set up Node 1 + mkv1 := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1)) + defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck + + client1, err := NewClient(mkv1, c) + require.NoError(t, err) + + testKey := "sync-tombstone-key" + now := time.Now() + + // Create and then Delete the key on Node 1 + cas(t, client1, testKey, func(in *data) (*data, bool, error) { + d := &data{Members: map[string]member{"node1": {Timestamp: now.Unix(), State: ACTIVE}}} + return d, true, nil + }) + require.NoError(t, client1.Delete(context.Background(), testKey)) + + // Generate the full state dump from Node 1 (Simulating Push/Pull sync) + stateData := mkv1.LocalState(false) + require.NotEmpty(t, stateData, "LocalState should contain data (including tombstones)") + + // Set up Node 2 + mkv2 := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv2)) + defer 
services.StopAndAwaitTerminated(context.Background(), mkv2) //nolint:errcheck
+
+	client2, err := NewClient(mkv2, c)
+	require.NoError(t, err)
+
+	// Apply the state dump from Node 1 to Node 2
+	mkv2.MergeRemoteState(stateData, false)
+
+	// The value should be nil when accessed via the client
+	val := get(t, client2, testKey)
+	require.Nil(t, val, "Key should be unreadable (nil) on Node 2 after sync")
+
+	// Inspect Node 2's internal store to ensure it's recorded as a tombstone, not just missing
+	mkv2.storeMu.Lock()
+	desc, ok := mkv2.store[testKey]
+	mkv2.storeMu.Unlock()
+
+	require.True(t, ok, "Key must exist in Node 2's internal store")
+	require.True(t, desc.deleted, "Key must be explicitly marked as deleted (Tombstone) on Node 2")
+	require.NotZero(t, desc.updatedAt, "Tombstone must have a valid updatedAt timestamp on Node 2")
+}
+
+func TestDeleteAndCAS(t *testing.T) {
+	withFixtures(t, func(t *testing.T, kv *Client) {
+		name := "test-node"
+
+		cas(t, kv, key, func(in *data) (*data, bool, error) {
+			d := getOrCreateData(in)
+			d.Members[name] = member{
+				Timestamp: time.Now().Unix(),
+				Tokens:    generateTokens(128),
+				State:     ACTIVE,
+			}
+			return d, true, nil
+		})
+
+		r := getData(t, kv, key)
+		require.NotNil(t, r)
+		require.Contains(t, r.Members, name)
+
+		// Delete the key
+		err := kv.Delete(context.Background(), key)
+		require.NoError(t, err)
+
+		val := get(t, kv, key)
+		require.Nil(t, val)
+
+		// verify the entry is kept as a tombstone: still in the store, marked deleted, with updatedAt set
+		kv.kv.storeMu.Lock()
+		desc, ok := kv.kv.store[key]
+		kv.kv.storeMu.Unlock()
+		require.True(t, ok)
+		require.True(t, desc.deleted)
+		require.NotZero(t, desc.updatedAt)
+
+		cas(t, kv, key, func(in *data) (*data, bool, error) {
+			d := getOrCreateData(in)
+			d.Members[name] = member{
+				Timestamp: time.Now().Unix(),
+				Tokens:    generateTokens(128),
+				State:     ACTIVE,
+			}
+			return d, true, nil
+		})
+
+		// verify the key is resurrected by the CAS
+		val = get(t, kv, key)
+		require.NotNil(t, val)
+
+		// after the CAS, the deleted flag should be cleared
+		kv.kv.storeMu.Lock()
+		desc, ok = kv.kv.store[key]
+		kv.kv.storeMu.Unlock()
+		require.True(t, ok)
+		require.False(t, desc.deleted)
+	})
+}
+
+func TestSweepTombstones(t *testing.T) {
+	c := dataCodec{}
+
+	var cfg KVConfig
+	flagext.DefaultValues(&cfg)
+	cfg.TCPTransport = TCPTransportConfig{
+		BindAddrs: []string{"localhost"},
+	}
+	cfg.Codecs = []codec.Codec{c}
+	cfg.TombstoneTimeout = 1 * time.Second
+
+	mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv))
+	defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck
+
+	kv, err := NewClient(mkv, c)
+	require.NoError(t, err)
+
+	key1 := "key-to-sweep"
+	key2 := "key-to-keep"
+
+	// Insert dummy data for both keys
+	dummyUpdate := func(in *data) (*data, bool, error) {
+		d := getOrCreateData(in)
+		d.Members["node"] = member{Timestamp: time.Now().Unix(), State: ACTIVE}
+		return d, true, nil
+	}
+	cas(t, kv, key1, dummyUpdate)
+	cas(t, kv, key2, dummyUpdate)
+
+	// Mark both keys as tombstones directly in the store, with different ages
+	mkv.storeMu.Lock()
+	desc1 := mkv.store[key1]
+	desc2 := mkv.store[key2]
+
+	// key1's tombstone is older than TombstoneTimeout, so it should be swept
+	desc1.deleted = true
+	desc1.updatedAt = time.Now().Add(-2 * time.Second)
+	mkv.store[key1] = desc1
+
+	// key2's tombstone is fresh, so it should be kept
+	desc2.deleted = true
+	desc2.updatedAt = time.Now()
+	mkv.store[key2] = desc2
+	mkv.storeMu.Unlock()
+
+	mkv.sweepTombstones()
+
+	mkv.storeMu.Lock()
+	_, ok1 := mkv.store[key1]
+	_, ok2 := mkv.store[key2]
+	mkv.storeMu.Unlock()
+
+	// 
verify the result of the sweep + require.False(t, ok1) + require.True(t, ok2) + + // verify metric + require.Equal(t, float64(1), testutil.ToFloat64(mkv.sweptTombstones)) +} + func decodeDataFromMarshalledKeyValuePair(t *testing.T, marshalledKVP []byte, key string, codec dataCodec) *data { kvp := KeyValuePair{} require.NoError(t, kvp.Unmarshal(marshalledKVP)) @@ -1298,11 +1690,17 @@ func decodeDataFromMarshalledKeyValuePair(t *testing.T, marshalledKVP []byte, ke return d } -func marshalKeyValuePair(t *testing.T, key string, codec codec.Codec, value any) []byte { +func marshalKeyValuePair(t *testing.T, key string, codec codec.Codec, value any, deleted bool, updatedAt int64) []byte { data, err := codec.Encode(value) require.NoError(t, err) - kvp := KeyValuePair{Key: key, Codec: codec.CodecID(), Value: data} + kvp := KeyValuePair{ + Key: key, + Codec: codec.CodecID(), + Value: data, + Deleted: deleted, + UpdatedAt: updatedAt, + } data, err = kvp.Marshal() require.NoError(t, err) return data diff --git a/pkg/ring/kv/memberlist/metrics.go b/pkg/ring/kv/memberlist/metrics.go index 759140de88..23d561ce37 100644 --- a/pkg/ring/kv/memberlist/metrics.go +++ b/pkg/ring/kv/memberlist/metrics.go @@ -131,6 +131,13 @@ func (m *KV) createAndRegisterMetrics() { Help: "Total number of tombstones which have been removed from KV store values", }, []string{"key"}) + m.sweptTombstones = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ + Namespace: m.cfg.MetricsNamespace, + Subsystem: subsystem, + Name: "kv_store_swept_tombstones_total", + Help: "Total number of deleted keys (tombstones) removed from the local store.", + }) + m.memberlistMembersCount = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index e202bb3a8a..4597e7a291 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -3756,7 +3756,7 @@ "x-format": "duration" }, "kvstore": { - "description": "Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes.", + "description": "Backend storage to use for the ring. Memberlist support in the HA tracker is experimental, as gossip propagation delays may impact HA performance.", "properties": { "consul": { "$ref": "#/definitions/consul_config" @@ -5751,6 +5751,13 @@ "description": "Override the expected name on the server certificate.", "type": "string", "x-cli-flag": "memberlist.tls-server-name" + }, + "tombstone_timeout": { + "default": "5m0s", + "description": "How long to keep deleted keys (tombstones) in the KV store", + "type": "string", + "x-cli-flag": "memberlist.tombstone-timeout", + "x-format": "duration" } }, "type": "object"
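
For reference, the delete/tombstone lifecycle exercised by the tests above can be condensed into a single sketch. This is a hypothetical example, not part of the change: it reuses the in-package test helpers that already appear in this diff (dataCodec, dnsProviderMock, cas, get, getData, getOrCreateData), while the function name, the key and replica names, and the 1s/2s timings are illustrative only; it also calls sweepTombstones explicitly, since this diff does not show whether a background sweep loop exists.

// Hypothetical sketch of the tombstone lifecycle, built only from helpers shown above.
func TestTombstoneLifecycleSketch(t *testing.T) {
	c := dataCodec{}

	var cfg KVConfig
	flagext.DefaultValues(&cfg)
	cfg.TCPTransport = TCPTransportConfig{BindAddrs: []string{"localhost"}}
	cfg.Codecs = []codec.Codec{c}
	cfg.TombstoneTimeout = 1 * time.Second // normally set via -memberlist.tombstone-timeout (default 5m)

	mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv))
	defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck

	client, err := NewClient(mkv, c)
	require.NoError(t, err)

	// 1. Write a value.
	cas(t, client, "sketch-key", func(in *data) (*data, bool, error) {
		d := getOrCreateData(in)
		d.Members["replica0"] = member{Timestamp: time.Now().Unix(), State: ACTIVE}
		return d, true, nil
	})

	// 2. Delete it. The entry stays in the local store as a tombstone
	//    (deleted=true, updatedAt set) and a broadcast propagates the
	//    deletion to peers via gossip and push/pull sync.
	require.NoError(t, client.Delete(context.Background(), "sketch-key"))

	// 3. Clients now observe nil for the deleted key.
	require.Nil(t, getData(t, client, "sketch-key"))

	// 4. Once the tombstone is older than TombstoneTimeout, the sweep removes
	//    it from the local store and increments the swept-tombstones counter.
	time.Sleep(2 * time.Second)
	mkv.sweepTombstones()
}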