diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f2b02e6a64..a6485a61f68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077 * [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152 * [FEATURE] Ingester: Add experimental active series queried metric. #7173 +* [ENHANCEMENT] Distributor: Add `cortex_distributor_push_requests_total` metric to track the number of push requests by type. #7239 * [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203 * [ENHANCEMENT] HATracker: Add `-distributor.ha-tracker.enable-startup-sync` flag. If enabled, the ha-tracker fetches all tracked keys on startup to populate the local cache. #7213 * [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191 diff --git a/integration/remote_write_v2_test.go b/integration/remote_write_v2_test.go index ed0f1902fc6..0c7cc62245e 100644 --- a/integration/remote_write_v2_test.go +++ b/integration/remote_write_v2_test.go @@ -368,6 +368,8 @@ func TestIngest(t *testing.T) { require.NoError(t, err) testPushHeader(t, writeStats, 0, 1, 0) + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(5), []string{"cortex_distributor_push_requests_total"}, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "type", "prw2")))) + testHistogramTimestamp := now.Add(blockRangePeriod * 2) expectedNH := tsdbutil.GenerateTestHistogram(int64(histogramIdx)) result, err = c.Query(`test_nh`, testHistogramTimestamp) diff --git a/pkg/api/api.go b/pkg/api/api.go index e77d082d436..c449869daf5 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -13,6 +13,8 @@ import ( "github.com/grafana/regexp" "github.com/klauspost/compress/gzhttp" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/httputil" @@ -280,10 +282,16 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) { } // RegisterDistributor registers the endpoints associated with the distributor. -func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) { +func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides, reg prometheus.Registerer) { distributorpb.RegisterDistributorServer(a.server.GRPC, d) - a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + requestTotal := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "distributor_push_requests_total", + Help: "Total number of push requests by type.", + }, []string{"type"}) + + a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST") a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status") @@ -295,7 +303,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET") // Legacy Routes - a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST") a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET") } @@ -328,12 +336,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config, overri a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST") a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST") - a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. + a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging. // Legacy Routes a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST") a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST") - a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. + a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging. } func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 3b56b6a8fb8..09c18bbbab9 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -258,7 +258,7 @@ func (t *Cortex) initGrpcClientServices() (serv services.Service, err error) { } func (t *Cortex) initDistributor() (serv services.Service, err error) { - t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor, t.Overrides) + t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor, t.Overrides, prometheus.DefaultRegisterer) return nil, nil } diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index d15c5e51e2b..0e6d6aed424 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -8,6 +8,7 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/exp/api/remote" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/schema" @@ -34,13 +35,16 @@ const ( rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written" rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written" rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written" + + labelValuePRW1 = "prw1" + labelValuePRW2 = "prw2" ) // Func defines the type of the push. It is similar to http.HandlerFunc. type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) // Handler is a http.Handler which accepts WriteRequests. -func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { +func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func, requestTotal *prometheus.CounterVec) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := log.WithContext(ctx, log.Logger) @@ -151,6 +155,10 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation return } + if requestTotal != nil { + requestTotal.WithLabelValues(getTypeLabel(msgType)).Inc() + } + if msgType != remote.WriteV1MessageType && msgType != remote.WriteV2MessageType { level.Error(logger).Log("Not accepted msg type", "msgType", msgType, "err", err) http.Error(w, err.Error(), http.StatusUnsupportedMediaType) @@ -290,3 +298,11 @@ func convertV2ToV1Exemplars(b *labels.ScratchBuilder, symbols []string, v2Exempl } return v1Exemplars, nil } + +func getTypeLabel(msgType remote.WriteMessageType) string { + if msgType == remote.WriteV1MessageType { + return labelValuePRW1 + } + + return labelValuePRW2 +} diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index bf21863eea4..957bc9a866a 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -10,6 +10,8 @@ import ( "time" "github.com/golang/snappy" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" @@ -126,7 +128,7 @@ func Benchmark_Handler(b *testing.B) { testSeriesNums := []int{10, 100, 500, 1000} for _, seriesNum := range testSeriesNums { b.Run(fmt.Sprintf("PRW1 with %d series", seriesNum), func(b *testing.B) { - handler := Handler(true, 1000000, overrides, nil, mockHandler) + handler := Handler(true, 1000000, overrides, nil, mockHandler, nil) req, err := createPRW1HTTPRequest(seriesNum) require.NoError(b, err) @@ -140,7 +142,7 @@ func Benchmark_Handler(b *testing.B) { } }) b.Run(fmt.Sprintf("PRW2 with %d series", seriesNum), func(b *testing.B) { - handler := Handler(true, 1000000, overrides, nil, mockHandler) + handler := Handler(true, 1000000, overrides, nil, mockHandler, nil) req, err := createPRW2HTTPRequest(seriesNum) require.NoError(b, err) @@ -453,7 +455,7 @@ func TestHandler_remoteWrite(t *testing.T) { overrides := validation.NewOverrides(limits, nil) t.Run("remote write v1", func(t *testing.T) { - handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API)) + handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), nil) req := createRequest(t, createPrometheusRemoteWriteProtobuf(t), false) resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) @@ -463,7 +465,7 @@ func TestHandler_remoteWrite(t *testing.T) { ctx := context.Background() ctx = user.InjectOrgID(ctx, "user-1") - handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API)) + handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), nil) req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true) req = req.WithContext(ctx) resp := httptest.NewRecorder() @@ -484,7 +486,7 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { overrides := validation.NewOverrides(limits, nil) sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") - handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) + handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API), nil) tests := []struct { description string @@ -612,7 +614,7 @@ func TestHandler_cortexWriteRequest(t *testing.T) { overrides := validation.NewOverrides(limits, nil) sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") - handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) + handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API), nil) t.Run("remote write v1", func(t *testing.T) { req := createRequest(t, createCortexWriteRequestProtobuf(t, false, cortexpb.API), false) @@ -642,12 +644,49 @@ func TestHandler_ignoresSkipLabelNameValidationIfSet(t *testing.T) { createRequest(t, createCortexWriteRequestProtobuf(t, true, cortexpb.RULE), false), } { resp := httptest.NewRecorder() - handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.RULE)) + handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.RULE), nil) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) } } +func TestHandler_MetricCollection(t *testing.T) { + var limits validation.Limits + flagext.DefaultValues(&limits) + overrides := validation.NewOverrides(limits, nil) + + counter := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "test_counter", + Help: "test help", + }, []string{"type"}) + + handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), counter) + + t.Run("counts v1 requests", func(t *testing.T) { + req := createRequest(t, createPrometheusRemoteWriteProtobuf(t), false) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(t, http.StatusOK, resp.Code) + + val := testutil.ToFloat64(counter.WithLabelValues("prw1")) + assert.Equal(t, 1.0, val) + }) + + t.Run("counts v2 requests", func(t *testing.T) { + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + + req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true) + req = req.WithContext(ctx) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(t, http.StatusNoContent, resp.Code) + + val := testutil.ToFloat64(counter.WithLabelValues("prw2")) + assert.Equal(t, 1.0, val) + }) +} + func verifyWriteRequestHandler(t *testing.T, expectSource cortexpb.SourceEnum) func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) { t.Helper() return func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) {