From 6a99dab1437e710196180ab92deadc273cd4da51 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Fri, 30 Jan 2026 18:13:29 +0900 Subject: [PATCH 1/2] Add push request count metric by type Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + integration/remote_write_v2_test.go | 2 ++ pkg/api/api.go | 18 +++++++--- pkg/cortex/modules.go | 2 +- pkg/util/push/push.go | 18 +++++++++- pkg/util/push/push_test.go | 51 +++++++++++++++++++++++++---- 6 files changed, 79 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b76bcb3f999..9e4cbf487e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077 * [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152 * [FEATURE] Ingester: Add experimental active series queried metric. #7173 +* [ENHANCEMENT] Distributor: Add `cortex_distributor_push_requests_total` metric to track the number of push requests by type. #7239 * [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203 * [ENHANCEMENT] HATracker: Add `-distributor.ha-tracker.enable-startup-sync` flag. If enabled, the ha-tracker fetches all tracked keys on startup to populate the local cache. #7213 * [ENHANCEMENT] Distributor: Add validation to ensure remote write v2 requests contain at least one sample or histogram. #7201 diff --git a/integration/remote_write_v2_test.go b/integration/remote_write_v2_test.go index d640155f7df..2ac1cae490e 100644 --- a/integration/remote_write_v2_test.go +++ b/integration/remote_write_v2_test.go @@ -368,6 +368,8 @@ func TestIngest(t *testing.T) { require.NoError(t, err) testPushHeader(t, writeStats, 0, 1, 0) + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(5), []string{"cortex_distributor_push_requests_total"}, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "type", "prw2")))) + testHistogramTimestamp := now.Add(blockRangePeriod * 2) expectedNH := tsdbutil.GenerateTestHistogram(int64(histogramIdx)) result, err = c.Query(`test_nh`, testHistogramTimestamp) diff --git a/pkg/api/api.go b/pkg/api/api.go index e77d082d436..c449869daf5 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -13,6 +13,8 @@ import ( "github.com/grafana/regexp" "github.com/klauspost/compress/gzhttp" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/httputil" @@ -280,10 +282,16 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) { } // RegisterDistributor registers the endpoints associated with the distributor. -func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) { +func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides, reg prometheus.Registerer) { distributorpb.RegisterDistributorServer(a.server.GRPC, d) - a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + requestTotal := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "distributor_push_requests_total", + Help: "Total number of push requests by type.", + }, []string{"type"}) + + a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST") a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status") @@ -295,7 +303,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET") // Legacy Routes - a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST") a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET") } @@ -328,12 +336,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config, overri a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST") a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST") - a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. + a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging. // Legacy Routes a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST") a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST") - a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. + a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging. } func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 3b56b6a8fb8..09c18bbbab9 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -258,7 +258,7 @@ func (t *Cortex) initGrpcClientServices() (serv services.Service, err error) { } func (t *Cortex) initDistributor() (serv services.Service, err error) { - t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor, t.Overrides) + t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor, t.Overrides, prometheus.DefaultRegisterer) return nil, nil } diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index 4bd8598b9a9..1ec90b23497 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -8,6 +8,7 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/exp/api/remote" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/schema" @@ -34,13 +35,16 @@ const ( rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written" rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written" rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written" + + labelValuePRW1 = "prw1" + labelValuePRW2 = "prw2" ) // Func defines the type of the push. It is similar to http.HandlerFunc. type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) // Handler is a http.Handler which accepts WriteRequests. -func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { +func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func, requestTotal *prometheus.CounterVec) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := log.WithContext(ctx, log.Logger) @@ -150,6 +154,10 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation return } + if requestTotal != nil { + requestTotal.WithLabelValues(getTypeLabel(msgType)).Inc() + } + if msgType != remote.WriteV1MessageType && msgType != remote.WriteV2MessageType { level.Error(logger).Log("Not accepted msg type", "msgType", msgType, "err", err) http.Error(w, err.Error(), http.StatusUnsupportedMediaType) @@ -295,3 +303,11 @@ func convertV2ToV1Exemplars(b *labels.ScratchBuilder, symbols []string, v2Exempl } return v1Exemplars, nil } + +func getTypeLabel(msgType remote.WriteMessageType) string { + if msgType == remote.WriteV1MessageType { + return labelValuePRW1 + } + + return labelValuePRW2 +} diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index e8d1208b9d7..e5837734dea 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -10,6 +10,8 @@ import ( "time" "github.com/golang/snappy" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" @@ -126,7 +128,7 @@ func Benchmark_Handler(b *testing.B) { testSeriesNums := []int{10, 100, 500, 1000} for _, seriesNum := range testSeriesNums { b.Run(fmt.Sprintf("PRW1 with %d series", seriesNum), func(b *testing.B) { - handler := Handler(true, 1000000, overrides, nil, mockHandler) + handler := Handler(true, 1000000, overrides, nil, mockHandler, nil) req, err := createPRW1HTTPRequest(seriesNum) require.NoError(b, err) @@ -140,7 +142,7 @@ func Benchmark_Handler(b *testing.B) { } }) b.Run(fmt.Sprintf("PRW2 with %d series", seriesNum), func(b *testing.B) { - handler := Handler(true, 1000000, overrides, nil, mockHandler) + handler := Handler(true, 1000000, overrides, nil, mockHandler, nil) req, err := createPRW2HTTPRequest(seriesNum) require.NoError(b, err) @@ -511,7 +513,7 @@ func TestHandler_remoteWrite(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctx := context.Background() ctx = user.InjectOrgID(ctx, "user-1") - handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API)) + handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), nil) body, isV2 := test.createBody() req := createRequest(t, body, isV2) @@ -661,7 +663,7 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { - handler := Handler(test.remoteWrite2Enabled, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) + handler := Handler(test.remoteWrite2Enabled, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API), nil) if test.isV2 { ctx := context.Background() @@ -688,7 +690,7 @@ func TestHandler_cortexWriteRequest(t *testing.T) { overrides := validation.NewOverrides(limits, nil) sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") - handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API)) + handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API), nil) t.Run("remote write v1", func(t *testing.T) { req := createRequest(t, createCortexWriteRequestProtobuf(t, false, cortexpb.API), false) @@ -718,12 +720,49 @@ func TestHandler_ignoresSkipLabelNameValidationIfSet(t *testing.T) { createRequest(t, createCortexWriteRequestProtobuf(t, true, cortexpb.RULE), false), } { resp := httptest.NewRecorder() - handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.RULE)) + handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.RULE), nil) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) } } +func TestHandler_MetricCollection(t *testing.T) { + var limits validation.Limits + flagext.DefaultValues(&limits) + overrides := validation.NewOverrides(limits, nil) + + counter := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "test_counter", + Help: "test help", + }, []string{"type"}) + + handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), counter) + + t.Run("counts v1 requests", func(t *testing.T) { + req := createRequest(t, createPrometheusRemoteWriteProtobuf(t), false) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(t, http.StatusOK, resp.Code) + + val := testutil.ToFloat64(counter.WithLabelValues("prw1")) + assert.Equal(t, 1.0, val) + }) + + t.Run("counts v2 requests", func(t *testing.T) { + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + + req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true) + req = req.WithContext(ctx) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(t, http.StatusNoContent, resp.Code) + + val := testutil.ToFloat64(counter.WithLabelValues("prw2")) + assert.Equal(t, 1.0, val) + }) +} + func verifyWriteRequestHandler(t *testing.T, expectSource cortexpb.SourceEnum) func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) { t.Helper() return func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) { From 7497c88cc847cb04b056058e980425a1f83f1186 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 2 Feb 2026 14:48:22 +0900 Subject: [PATCH 2/2] add otlp type Signed-off-by: SungJin1212 --- pkg/api/api.go | 2 +- pkg/util/push/otlp.go | 7 ++++++- pkg/util/push/otlp_test.go | 38 +++++++++++++++++++++++++++++++++++--- pkg/util/push/push.go | 1 + 4 files changed, 43 insertions(+), 5 deletions(-) diff --git a/pkg/api/api.go b/pkg/api/api.go index c449869daf5..2d1830619b4 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -292,7 +292,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib }, []string{"type"}) a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST") - a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics") diff --git a/pkg/util/push/otlp.go b/pkg/util/push/otlp.go index e81b18515d1..bc162946042 100644 --- a/pkg/util/push/otlp.go +++ b/pkg/util/push/otlp.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" @@ -35,7 +36,7 @@ const ( ) // OTLPHandler is a http.Handler which accepts OTLP metrics. -func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { +func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func, requestTotal *prometheus.CounterVec) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := util_log.WithContext(ctx, util_log.Logger) @@ -59,6 +60,10 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri return } + if requestTotal != nil { + requestTotal.WithLabelValues(labelValueOTLP).Inc() + } + prwReq := cortexpb.WriteRequest{ Source: cortexpb.API, Metadata: nil, diff --git a/pkg/util/push/otlp_test.go b/pkg/util/push/otlp_test.go index 67dc70f8b0e..3429d2d0c73 100644 --- a/pkg/util/push/otlp_test.go +++ b/pkg/util/push/otlp_test.go @@ -13,6 +13,8 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" @@ -631,7 +633,7 @@ func BenchmarkOTLPWriteHandlerCompression(b *testing.B) { mockPushFunc := func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { return &cortexpb.WriteResponse{}, nil } - handler := OTLPHandler(10000, overrides, cfg, nil, mockPushFunc) + handler := OTLPHandler(10000, overrides, cfg, nil, mockPushFunc, nil) b.Run("json with no compression", func(b *testing.B) { req, err := getOTLPHttpRequest(&exportRequest, jsonContentType, "") @@ -701,7 +703,7 @@ func BenchmarkOTLPWriteHandlerPush(b *testing.B) { mockPushFunc := func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { return &cortexpb.WriteResponse{}, nil } - handler := OTLPHandler(1000000, overrides, cfg, nil, mockPushFunc) + handler := OTLPHandler(1000000, overrides, cfg, nil, mockPushFunc, nil) tests := []struct { description string @@ -775,6 +777,36 @@ func BenchmarkOTLPWriteHandlerPush(b *testing.B) { } } +func TestOTLPHandler_MetricCollection(t *testing.T) { + cfg := distributor.OTLPConfig{ + ConvertAllAttributes: false, + DisableTargetInfo: false, + } + + exportRequest := generateOTLPWriteRequest() + + counter := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "test_counter", + Help: "test help", + }, []string{"type"}) + + req, err := getOTLPHttpRequest(&exportRequest, pbContentType, "") + require.NoError(t, err) + + push := verifyOTLPWriteRequestHandler(t, cortexpb.API) + overrides := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) + handler := OTLPHandler(100000, overrides, cfg, nil, push, counter) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(t, http.StatusOK, resp.StatusCode) + + val := testutil.ToFloat64(counter.WithLabelValues("otlp")) + assert.Equal(t, 1.0, val) +} + func TestOTLPWriteHandler(t *testing.T) { cfg := distributor.OTLPConfig{ ConvertAllAttributes: false, @@ -863,7 +895,7 @@ func TestOTLPWriteHandler(t *testing.T) { push := verifyOTLPWriteRequestHandler(t, cortexpb.API) overrides := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) - handler := OTLPHandler(test.maxRecvMsgSize, overrides, cfg, nil, push) + handler := OTLPHandler(test.maxRecvMsgSize, overrides, cfg, nil, push, nil) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index 1ec90b23497..f3f5d33cf81 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -38,6 +38,7 @@ const ( labelValuePRW1 = "prw1" labelValuePRW2 = "prw2" + labelValueOTLP = "otlp" ) // Func defines the type of the push. It is similar to http.HandlerFunc.