diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f2b02e6a64..2164d604b8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ * [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185 * [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210 * [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217 +* [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 * [BUGFIX] Ruler: Add XFunctions validation support. #7111 * [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132 diff --git a/integration/remote_write_v2_test.go b/integration/remote_write_v2_test.go index ed0f1902fc6..d640155f7df 100644 --- a/integration/remote_write_v2_test.go +++ b/integration/remote_write_v2_test.go @@ -194,7 +194,7 @@ func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) { symbols1, series, _ := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"}) _, err = c.PushV2(symbols1, series) require.Error(t, err) - require.Contains(t, err.Error(), "sent v2 request; got 2xx, but PRW 2.0 response header statistics indicate 0 samples, 0 histograms and 0 exemplars were accepted") + require.Contains(t, err.Error(), "io.prometheus.write.v2.Request protobuf message is not accepted by this server; only accepts prometheus.WriteRequest") // sample result, err := c.Query("test_series", now) diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index d15c5e51e2b..abb4103a97f 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -137,43 +137,44 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation } } - if remoteWrite2Enabled { - // follow Prometheus https://github.com/prometheus/prometheus/blob/v3.3.1/storage/remote/write_handler.go#L121 - contentType := r.Header.Get("Content-Type") - if contentType == "" { - contentType = appProtoContentType - } + // follow Prometheus https://github.com/prometheus/prometheus/blob/v3.3.1/storage/remote/write_handler.go#L121 + contentType := r.Header.Get("Content-Type") + if contentType == "" { + contentType = appProtoContentType + } - msgType, err := remote.ParseProtoMsg(contentType) - if err != nil { - level.Error(logger).Log("Error decoding remote write request", "err", err) - http.Error(w, err.Error(), http.StatusUnsupportedMediaType) - return - } + msgType, err := remote.ParseProtoMsg(contentType) + if err != nil { + level.Error(logger).Log("Error decoding remote write request", "err", err) + http.Error(w, err.Error(), http.StatusUnsupportedMediaType) + return + } - if msgType != remote.WriteV1MessageType && msgType != remote.WriteV2MessageType { - level.Error(logger).Log("Not accepted msg type", "msgType", msgType, "err", err) - http.Error(w, err.Error(), http.StatusUnsupportedMediaType) - return - } + if msgType != remote.WriteV1MessageType && msgType != remote.WriteV2MessageType { + level.Error(logger).Log("Not accepted msg type", "msgType", msgType, "err", err) + http.Error(w, err.Error(), http.StatusUnsupportedMediaType) + return + } - enc := r.Header.Get("Content-Encoding") - if enc == "" { - } else if enc != compression.Snappy { - err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, compression.Snappy) - level.Error(logger).Log("Error decoding remote write request", "err", err) - http.Error(w, err.Error(), http.StatusUnsupportedMediaType) - return - } + enc := r.Header.Get("Content-Encoding") + if enc == "" { + } else if enc != compression.Snappy { + err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, compression.Snappy) + level.Error(logger).Log("Error decoding remote write request", "err", err) + http.Error(w, err.Error(), http.StatusUnsupportedMediaType) + return + } - switch msgType { - case remote.WriteV1MessageType: - handlePRW1() - case remote.WriteV2MessageType: - handlePRW2() - } - } else { + switch msgType { + case remote.WriteV1MessageType: handlePRW1() + case remote.WriteV2MessageType: + if !remoteWrite2Enabled { + errMsg := fmt.Sprintf("%v protobuf message is not accepted by this server; only accepts %v", msgType, remote.WriteV1MessageType) + http.Error(w, errMsg, http.StatusUnsupportedMediaType) + return + } + handlePRW2() } }) } diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index bf21863eea4..cf264705721 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -484,13 +484,13 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { overrides := validation.NewOverrides(limits, nil) sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") - handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) tests := []struct { - description string - reqHeaders map[string]string - expectedCode int - isV2 bool + description string + reqHeaders map[string]string + expectedCode int + isV2 bool + remoteWrite2Enabled bool }{ { description: "[RW 2.0] correct content-type", @@ -499,8 +499,9 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { "Content-Encoding": "snappy", remoteWriteVersionHeader: "2.0.0", }, - expectedCode: http.StatusNoContent, - isV2: true, + expectedCode: http.StatusNoContent, + isV2: true, + remoteWrite2Enabled: true, }, { description: "[RW 1.0] correct content-type", @@ -529,8 +530,9 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { "Content-Encoding": "snappy", remoteWriteVersionHeader: "2.0.0", }, - expectedCode: http.StatusUnsupportedMediaType, - isV2: true, + expectedCode: http.StatusUnsupportedMediaType, + isV2: true, + remoteWrite2Enabled: true, }, { description: "[RW 2.0] wrong content-encoding", @@ -539,13 +541,26 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { "Content-Encoding": "zstd", remoteWriteVersionHeader: "2.0.0", }, - expectedCode: http.StatusUnsupportedMediaType, - isV2: true, + expectedCode: http.StatusUnsupportedMediaType, + isV2: true, + remoteWrite2Enabled: true, }, { - description: "no header, should treated as RW 1.0", - expectedCode: http.StatusOK, - isV2: false, + description: "[RW 2.0] V2 disabled", + reqHeaders: map[string]string{ + "Content-Type": appProtoV2ContentType, + "Content-Encoding": "snappy", + remoteWriteVersionHeader: "2.0.0", + }, + expectedCode: http.StatusUnsupportedMediaType, + isV2: true, + remoteWrite2Enabled: false, + }, + { + description: "no header, should treated as RW 1.0", + expectedCode: http.StatusOK, + isV2: false, + remoteWrite2Enabled: true, }, { description: "missing content-type, should treated as RW 1.0", @@ -553,8 +568,9 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { "Content-Encoding": "snappy", remoteWriteVersionHeader: "2.0.0", }, - expectedCode: http.StatusOK, - isV2: false, + expectedCode: http.StatusOK, + isV2: false, + remoteWrite2Enabled: true, }, { description: "missing content-encoding", @@ -562,8 +578,9 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { "Content-Type": appProtoV2ContentType, remoteWriteVersionHeader: "2.0.0", }, - expectedCode: http.StatusNoContent, - isV2: true, + expectedCode: http.StatusNoContent, + isV2: true, + remoteWrite2Enabled: true, }, { description: "missing remote write version, should treated based on Content-type", @@ -571,8 +588,9 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { "Content-Type": appProtoV2ContentType, "Content-Encoding": "snappy", }, - expectedCode: http.StatusNoContent, - isV2: true, + expectedCode: http.StatusNoContent, + isV2: true, + remoteWrite2Enabled: true, }, { description: "missing remote write version, should treated based on Content-type", @@ -580,13 +598,16 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { "Content-Type": appProtoV1ContentType, "Content-Encoding": "snappy", }, - expectedCode: http.StatusOK, - isV2: false, + expectedCode: http.StatusOK, + isV2: false, + remoteWrite2Enabled: true, }, } for _, test := range tests { t.Run(test.description, func(t *testing.T) { + handler := Handler(test.remoteWrite2Enabled, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) + if test.isV2 { ctx := context.Background() ctx = user.InjectOrgID(ctx, "user-1")