From bc8b933aa0722a6298e96aa063ec19ed649a7d1e Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 30 Jan 2026 17:21:14 -0800 Subject: [PATCH 1/3] return remote write v2 response stats correctly when write is HA deduplicated Signed-off-by: yeya24 --- pkg/distributor/distributor.go | 9 ++++- pkg/distributor/distributor_test.go | 55 +++++++++++++++++++++++++++++ pkg/util/push/push.go | 12 +++++-- pkg/util/push/push_test.go | 28 +++++++++++++++ 4 files changed, 100 insertions(+), 4 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 6ac313200a0..1e65920c2b6 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -776,7 +776,14 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co if errors.Is(err, ha.ReplicasNotMatchError{}) { // These samples have been deduped. d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numFloatSamples + numHistogramSamples)) - return nil, httpgrpc.Errorf(http.StatusAccepted, "%s", err.Error()) + var dedupResp *cortexpb.WriteResponse + if d.cfg.RemoteWriteV2Enabled { + dedupResp = &cortexpb.WriteResponse{} + dedupResp.Samples = int64(numFloatSamples) + dedupResp.Histograms = int64(numHistogramSamples) + dedupResp.Exemplars = int64(numExemplars) + } + return dedupResp, httpgrpc.Errorf(http.StatusAccepted, "%s", err.Error()) } if errors.Is(err, ha.TooManyReplicaGroupsError{}) { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 9cfc6a1501c..654eb8535b7 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1301,6 +1301,58 @@ func TestDistributor_PushHAInstances(t *testing.T) { } } +func TestDistributor_PushHA_RWv2DedupReturnsStats(t *testing.T) { + t.Parallel() + ctx := user.InjectOrgID(context.Background(), "user") + + // When HA dedup occurs and RemoteWriteV2 is enabled, distributor must return + // WriteResponse with Samples/Histograms/Exemplars so the push handler can set RWv2 headers. + for _, enableHistogram := range []bool{false, true} { + enableHistogram := enableHistogram + t.Run(fmt.Sprintf("histogram=%v", enableHistogram), func(t *testing.T) { + t.Parallel() + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.AcceptHASamples = true + limits.MaxLabelValueLength = 15 + + ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + enableTracker: true, + remoteWriteV2Enabled: true, + }) + + d := ds[0] + + // Accept "instance2" for cluster0; we will push from "instance0" so all samples are deduped. + err := d.HATracker.CheckReplica(ctx, "user", "cluster0", "instance2", time.Now()) + require.NoError(t, err) + + const numSamples = 5 + request := makeWriteRequestHA(numSamples, "instance0", "cluster0", enableHistogram) + response, err := d.Push(ctx, request) + + require.NotNil(t, response, "RWv2 HA dedup must return WriteResponse with stats") + if enableHistogram { + assert.Equal(t, int64(0), response.Samples) + assert.Equal(t, int64(numSamples), response.Histograms) + } else { + assert.Equal(t, int64(numSamples), response.Samples) + assert.Equal(t, int64(0), response.Histograms) + } + assert.Equal(t, int64(0), response.Exemplars) + + httpResp, ok := httpgrpc.HTTPResponseFromError(err) + require.True(t, ok, "expected HTTP error") + assert.Equal(t, int32(http.StatusAccepted), httpResp.Code) + }) + } +} + func TestDistributor_PushMixedHAInstances(t *testing.T) { t.Parallel() ctx := user.InjectOrgID(context.Background(), "user") @@ -3075,6 +3127,7 @@ type prepConfig struct { maxIngestionRate float64 replicationFactor int enableTracker bool + remoteWriteV2Enabled bool errFail error tokens [][]uint32 useStreamPush bool @@ -3225,6 +3278,8 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, [] cfg.limits.HAMaxClusters = 100 } + distributorCfg.RemoteWriteV2Enabled = cfg.remoteWriteV2Enabled + overrides := validation.NewOverrides(*cfg.limits, nil) reg := prometheus.NewPedanticRegistry() diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index d15c5e51e2b..0ffdca4d37b 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -118,13 +118,19 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation v1Req.Source = cortexpb.API } - if resp, err := push(ctx, &v1Req.WriteRequest); err != nil { + if writeResp, err := push(ctx, &v1Req.WriteRequest); err != nil { resp, ok := httpgrpc.HTTPResponseFromError(err) - setPRW2RespHeader(w, 0, 0, 0) if !ok { + setPRW2RespHeader(w, 0, 0, 0) http.Error(w, err.Error(), http.StatusInternalServerError) return } + // For the case of HA deduplication, add the stats headers from the push response. + if writeResp != nil { + setPRW2RespHeader(w, writeResp.Samples, writeResp.Histograms, writeResp.Exemplars) + } else { + setPRW2RespHeader(w, 0, 0, 0) + } if resp.GetCode()/100 == 5 { level.Error(logger).Log("msg", "push error", "err", err) } else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests { @@ -132,7 +138,7 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation } http.Error(w, string(resp.Body), int(resp.Code)) } else { - setPRW2RespHeader(w, resp.Samples, resp.Histograms, resp.Exemplars) + setPRW2RespHeader(w, writeResp.Samples, writeResp.Histograms, writeResp.Exemplars) w.WriteHeader(http.StatusNoContent) } } diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index bf21863eea4..ba6b343d4f4 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" @@ -476,6 +477,33 @@ func TestHandler_remoteWrite(t *testing.T) { assert.Equal(t, "1", respHeader[rw20WrittenHistogramsHeader][0]) assert.Equal(t, "1", respHeader[rw20WrittenExemplarsHeader][0]) }) + t.Run("remote write v2 HA dedup", func(t *testing.T) { + // HA dedup: push returns error with StatusAccepted (202) but also a non-nil WriteResponse with stats. + dedupWriteResp := &cortexpb.WriteResponse{ + Samples: 5, + Histograms: 2, + Exemplars: 1, + } + dedupErr := httpgrpc.Errorf(http.StatusAccepted, "HA deduplication: samples deduped") + pushFunc := func(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { + return dedupWriteResp, dedupErr + } + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + handler := Handler(true, 100000, overrides, nil, pushFunc) + req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true) + req = req.WithContext(ctx) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + + assert.Equal(t, http.StatusAccepted, resp.Code) + respHeader := resp.Header() + assert.Equal(t, "5", respHeader[rw20WrittenSamplesHeader][0]) + assert.Equal(t, "2", respHeader[rw20WrittenHistogramsHeader][0]) + assert.Equal(t, "1", respHeader[rw20WrittenExemplarsHeader][0]) + assert.Contains(t, resp.Body.String(), "HA deduplication") + }) } func TestHandler_ContentTypeAndEncoding(t *testing.T) { From a172be095288297819c1fff8d3d9ebb7ca457cc9 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 30 Jan 2026 17:31:59 -0800 Subject: [PATCH 2/3] changelog Signed-off-by: yeya24 --- CHANGELOG.md | 1 + pkg/distributor/distributor_test.go | 12 ++++++------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f2b02e6a64..3cc77be21c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ * [BUGFIX] Query Frontend: Add Native Histogram extraction logic in results cache #7167 * [BUGFIX] Alertmanager: Fix alertmanager reloading bug that removes user template files #7196 * [BUGFIX] Query Scheduler: If max_outstanding_requests_per_tenant value is updated to lesser value than the current number of requests in the queue, the excess requests (newest ones) will be dropped to prevent deadlocks. #7188 +* [BUGFIX] Distributor: Return remote write V2 stats headers properly when the request is HA deduplicated. #7240 ## 1.20.1 2025-12-03 diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 654eb8535b7..40050f783f8 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1317,12 +1317,12 @@ func TestDistributor_PushHA_RWv2DedupReturnsStats(t *testing.T) { limits.MaxLabelValueLength = 15 ds, _, _, _ := prepare(t, prepConfig{ - numIngesters: 3, - happyIngesters: 3, - numDistributors: 1, - shardByAllLabels: true, - limits: &limits, - enableTracker: true, + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + enableTracker: true, remoteWriteV2Enabled: true, }) From d30504a34bd7d583c28e91c539c47bd6f719f1a7 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 30 Jan 2026 19:17:49 -0800 Subject: [PATCH 3/3] fix lint Signed-off-by: yeya24 --- pkg/distributor/distributor_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 40050f783f8..cd2d742b5ab 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1308,7 +1308,6 @@ func TestDistributor_PushHA_RWv2DedupReturnsStats(t *testing.T) { // When HA dedup occurs and RemoteWriteV2 is enabled, distributor must return // WriteResponse with Samples/Histograms/Exemplars so the push handler can set RWv2 headers. for _, enableHistogram := range []bool{false, true} { - enableHistogram := enableHistogram t.Run(fmt.Sprintf("histogram=%v", enableHistogram), func(t *testing.T) { t.Parallel() var limits validation.Limits