Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
* [BUGFIX] Query Frontend: Add Native Histogram extraction logic in results cache #7167
* [BUGFIX] Alertmanager: Fix alertmanager reloading bug that removes user template files #7196
* [BUGFIX] Query Scheduler: If max_outstanding_requests_per_tenant value is updated to lesser value than the current number of requests in the queue, the excess requests (newest ones) will be dropped to prevent deadlocks. #7188
* [BUGFIX] Distributor: Return remote write V2 stats headers properly when the request is HA deduplicated. #7240

## 1.20.1 2025-12-03

Expand Down
9 changes: 8 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,14 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
if errors.Is(err, ha.ReplicasNotMatchError{}) {
// These samples have been deduped.
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numFloatSamples + numHistogramSamples))
return nil, httpgrpc.Errorf(http.StatusAccepted, "%s", err.Error())
var dedupResp *cortexpb.WriteResponse
if d.cfg.RemoteWriteV2Enabled {
dedupResp = &cortexpb.WriteResponse{}
dedupResp.Samples = int64(numFloatSamples)
dedupResp.Histograms = int64(numHistogramSamples)
dedupResp.Exemplars = int64(numExemplars)
}
return dedupResp, httpgrpc.Errorf(http.StatusAccepted, "%s", err.Error())
}

if errors.Is(err, ha.TooManyReplicaGroupsError{}) {
Expand Down
54 changes: 54 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,57 @@ func TestDistributor_PushHAInstances(t *testing.T) {
}
}

func TestDistributor_PushHA_RWv2DedupReturnsStats(t *testing.T) {
t.Parallel()
ctx := user.InjectOrgID(context.Background(), "user")

// When HA dedup occurs and RemoteWriteV2 is enabled, distributor must return
// WriteResponse with Samples/Histograms/Exemplars so the push handler can set RWv2 headers.
for _, enableHistogram := range []bool{false, true} {
t.Run(fmt.Sprintf("histogram=%v", enableHistogram), func(t *testing.T) {
t.Parallel()
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.AcceptHASamples = true
limits.MaxLabelValueLength = 15

ds, _, _, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
enableTracker: true,
remoteWriteV2Enabled: true,
})

d := ds[0]

// Accept "instance2" for cluster0; we will push from "instance0" so all samples are deduped.
err := d.HATracker.CheckReplica(ctx, "user", "cluster0", "instance2", time.Now())
require.NoError(t, err)

const numSamples = 5
request := makeWriteRequestHA(numSamples, "instance0", "cluster0", enableHistogram)
response, err := d.Push(ctx, request)

require.NotNil(t, response, "RWv2 HA dedup must return WriteResponse with stats")
if enableHistogram {
assert.Equal(t, int64(0), response.Samples)
assert.Equal(t, int64(numSamples), response.Histograms)
} else {
assert.Equal(t, int64(numSamples), response.Samples)
assert.Equal(t, int64(0), response.Histograms)
}
assert.Equal(t, int64(0), response.Exemplars)

httpResp, ok := httpgrpc.HTTPResponseFromError(err)
require.True(t, ok, "expected HTTP error")
assert.Equal(t, int32(http.StatusAccepted), httpResp.Code)
})
}
}

func TestDistributor_PushMixedHAInstances(t *testing.T) {
t.Parallel()
ctx := user.InjectOrgID(context.Background(), "user")
Expand Down Expand Up @@ -3075,6 +3126,7 @@ type prepConfig struct {
maxIngestionRate float64
replicationFactor int
enableTracker bool
remoteWriteV2Enabled bool
errFail error
tokens [][]uint32
useStreamPush bool
Expand Down Expand Up @@ -3225,6 +3277,8 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []
cfg.limits.HAMaxClusters = 100
}

distributorCfg.RemoteWriteV2Enabled = cfg.remoteWriteV2Enabled

overrides := validation.NewOverrides(*cfg.limits, nil)

reg := prometheus.NewPedanticRegistry()
Expand Down
12 changes: 9 additions & 3 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,27 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation
v1Req.Source = cortexpb.API
}

if resp, err := push(ctx, &v1Req.WriteRequest); err != nil {
if writeResp, err := push(ctx, &v1Req.WriteRequest); err != nil {
resp, ok := httpgrpc.HTTPResponseFromError(err)
setPRW2RespHeader(w, 0, 0, 0)
if !ok {
setPRW2RespHeader(w, 0, 0, 0)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// For the case of HA deduplication, add the stats headers from the push response.
if writeResp != nil {
setPRW2RespHeader(w, writeResp.Samples, writeResp.Histograms, writeResp.Exemplars)
} else {
setPRW2RespHeader(w, 0, 0, 0)
}
if resp.GetCode()/100 == 5 {
level.Error(logger).Log("msg", "push error", "err", err)
} else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests {
level.Warn(logger).Log("msg", "push refused", "err", err)
}
http.Error(w, string(resp.Body), int(resp.Code))
} else {
setPRW2RespHeader(w, resp.Samples, resp.Histograms, resp.Exemplars)
setPRW2RespHeader(w, writeResp.Samples, writeResp.Histograms, writeResp.Exemplars)
w.WriteHeader(http.StatusNoContent)
}
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/util/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"

Expand Down Expand Up @@ -476,6 +477,33 @@ func TestHandler_remoteWrite(t *testing.T) {
assert.Equal(t, "1", respHeader[rw20WrittenHistogramsHeader][0])
assert.Equal(t, "1", respHeader[rw20WrittenExemplarsHeader][0])
})
t.Run("remote write v2 HA dedup", func(t *testing.T) {
// HA dedup: push returns error with StatusAccepted (202) but also a non-nil WriteResponse with stats.
dedupWriteResp := &cortexpb.WriteResponse{
Samples: 5,
Histograms: 2,
Exemplars: 1,
}
dedupErr := httpgrpc.Errorf(http.StatusAccepted, "HA deduplication: samples deduped")
pushFunc := func(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
return dedupWriteResp, dedupErr
}

ctx := context.Background()
ctx = user.InjectOrgID(ctx, "user-1")
handler := Handler(true, 100000, overrides, nil, pushFunc)
req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true)
req = req.WithContext(ctx)
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)

assert.Equal(t, http.StatusAccepted, resp.Code)
respHeader := resp.Header()
assert.Equal(t, "5", respHeader[rw20WrittenSamplesHeader][0])
assert.Equal(t, "2", respHeader[rw20WrittenHistogramsHeader][0])
assert.Equal(t, "1", respHeader[rw20WrittenExemplarsHeader][0])
assert.Contains(t, resp.Body.String(), "HA deduplication")
})
}

func TestHandler_ContentTypeAndEncoding(t *testing.T) {
Expand Down
Loading