diff --git a/CHANGELOG.md b/CHANGELOG.md index efc085818a..faf33b8c93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ # Changelog ## master / unreleased - +* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 * [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357 * [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355 diff --git a/pkg/util/metrics_helper.go b/pkg/util/metrics_helper.go index 9aafae7afb..168034b772 100644 --- a/pkg/util/metrics_helper.go +++ b/pkg/util/metrics_helper.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math" + "sort" "strings" "sync" @@ -467,10 +468,77 @@ func (s *SummaryData) Metric(desc *prometheus.Desc, labelValues ...string) prome } // HistogramData keeps data required to build histogram Metric. +// +// For native histograms, the Schema and ZeroThreshold are set from the first +// native histogram encountered. All aggregated histograms should use the same +// schema, as bucket indices have different meanings across different schemas. type HistogramData struct { sampleCount uint64 sampleSum float64 buckets map[float64]uint64 + + // Native histogram fields + nativeMode bool + Schema int32 + ZeroThreshold float64 + ZeroCount uint64 + PositiveBuckets map[int]int64 // bucket index -> count + NegativeBuckets map[int]int64 +} + +// isNative returns true if the dto.Histogram carries native histogram data. +// The authoritative signal is Schema being set (non-nil), which client_golang +// always populates for native/classic histograms regardless of observation count. 
+// Spans alone are insufficient because a zero-observation dual histogram has +// no spans but still has Schema set. +func isNative(histo *dto.Histogram) bool { + return histo.Schema != nil +} + +func (d *HistogramData) hasNative() bool { + return d.nativeMode +} + +// spansCountsToBucketMap converts native histogram spans and absolute bucket counts +// into a map of bucket index -> count for easier aggregation. +func spansCountsToBucketMap(spans []*dto.BucketSpan, counts []int64) map[int]int64 { + if len(spans) == 0 { + return nil + } + bucketMap := make(map[int]int64, len(counts)) + var idx int32 + bucketIdx := 0 + for _, sp := range spans { + idx += sp.GetOffset() + for j := 0; j < int(sp.GetLength()) && bucketIdx < len(counts); j++ { + bucketMap[int(idx)] += counts[bucketIdx] + idx++ + bucketIdx++ + } + } + return bucketMap +} + +// deltasToCountsInt converts delta-encoded bucket counts to absolute counts. +func deltasToCountsInt(deltas []int64) []int64 { + counts := make([]int64, len(deltas)) + var cur int64 + for i, d := range deltas { + cur += int64(d) + counts[i] = cur + } + return counts +} + +// mergeBucketMaps merges src bucket map into dst bucket map by summing counts for each bucket index. +func mergeBucketMaps(dst, src map[int]int64) map[int]int64 { + if dst == nil { + dst = make(map[int]int64) + } + for idx, count := range src { + dst[idx] += count + } + return dst } // AddHistogram adds histogram from gathered metrics to this histogram data. 
@@ -481,6 +549,26 @@ func (d *HistogramData) AddHistogram(histo *dto.Histogram) { d.sampleCount += histo.GetSampleCount() d.sampleSum += histo.GetSampleSum() + if isNative(histo) { + // Initialize schema/threshold on first native histogram + if !d.hasNative() { + d.Schema = histo.GetSchema() + d.ZeroThreshold = histo.GetZeroThreshold() + d.nativeMode = true + } + d.ZeroCount += histo.GetZeroCount() + + posCounts := deltasToCountsInt(histo.GetPositiveDelta()) + negCounts := deltasToCountsInt(histo.GetNegativeDelta()) + + posMap := spansCountsToBucketMap(histo.GetPositiveSpan(), posCounts) + negMap := spansCountsToBucketMap(histo.GetNegativeSpan(), negCounts) + + d.PositiveBuckets = mergeBucketMaps(d.PositiveBuckets, posMap) + d.NegativeBuckets = mergeBucketMaps(d.NegativeBuckets, negMap) + } + + // Always collect classic buckets histoBuckets := histo.GetBucket() if len(histoBuckets) > 0 && d.buckets == nil { d.buckets = map[float64]uint64{} @@ -499,7 +587,18 @@ func (d *HistogramData) AddHistogram(histo *dto.Histogram) { func (d *HistogramData) AddHistogramData(histo HistogramData) { d.sampleCount += histo.sampleCount d.sampleSum += histo.sampleSum + if histo.hasNative() { + if !d.hasNative() { + d.Schema = histo.Schema + d.ZeroThreshold = histo.ZeroThreshold + d.nativeMode = true + } + d.ZeroCount += histo.ZeroCount + d.PositiveBuckets = mergeBucketMaps(d.PositiveBuckets, histo.PositiveBuckets) + d.NegativeBuckets = mergeBucketMaps(d.NegativeBuckets, histo.NegativeBuckets) + } + // Always merge classic buckets. if len(histo.buckets) > 0 && d.buckets == nil { d.buckets = map[float64]uint64{} } @@ -510,11 +609,84 @@ func (d *HistogramData) AddHistogramData(histo HistogramData) { } } +// nativeHistogramMetric is basically the same as constNativeHistogram struct in prometheus histogram.go +// we need to create this new struct because the existing NewConstNativeHistogram method in prometheus +// does not populate classic histogram fields.
Without this, NH-compatible metrics are only exposed in NH format +// and classic histogram buckets are not exposed. +type nativeHistogramMetric struct { + desc *prometheus.Desc + dto.Histogram + labelPairs []*dto.LabelPair +} + +func (m *nativeHistogramMetric) Desc() *prometheus.Desc { return m.desc } +func (m *nativeHistogramMetric) Write(out *dto.Metric) error { + out.Histogram = &m.Histogram + out.Label = m.labelPairs + return nil +} + // Metric returns prometheus metric from this histogram data. // // Note that returned metric shares bucket with this HistogramData, so avoid // doing more modifications to this HistogramData after calling Metric. func (d *HistogramData) Metric(desc *prometheus.Desc, labelValues ...string) prometheus.Metric { + if d.hasNative() { + // Build native spans+deltas from bucket maps. + posSpans, posDeltas := makeBucketsFromMap(d.PositiveBuckets) + negSpans, negDeltas := makeBucketsFromMap(d.NegativeBuckets) + + schema := d.Schema + zt := d.ZeroThreshold + sc := d.sampleCount + ss := d.sampleSum + zc := d.ZeroCount + + // Build classic buckets if available + var buckets []*dto.Bucket + if len(d.buckets) > 0 { + buckets = make([]*dto.Bucket, 0, len(d.buckets)) + for ub, cc := range d.buckets { + upperBound := ub + cumCount := cc + buckets = append(buckets, &dto.Bucket{ + UpperBound: &upperBound, + CumulativeCount: &cumCount, + }) + } + sort.Slice(buckets, func(i, j int) bool { + return buckets[i].GetUpperBound() < buckets[j].GetUpperBound() + }) + } + + // Sentinel span for native histograms with no observations. + // This is required to distinguish an empty native histogram from a classic histogram.
+ // This matches the prometheus behavior (histogram.go:1958) + if zt == 0 && zc == 0 && len(posSpans) == 0 && len(negSpans) == 0 { + posSpans = []*dto.BucketSpan{{ + Offset: proto.Int32(0), + Length: proto.Uint32(0), + }} + } + + // Construct histogram in-place within the struct (no intermediate copy) + return &nativeHistogramMetric{ + desc: desc, + Histogram: dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: &sc, + SampleSum: &ss, + ZeroCount: &zc, + PositiveSpan: posSpans, + PositiveDelta: posDeltas, + NegativeSpan: negSpans, + NegativeDelta: negDeltas, + Bucket: buckets, + }, + labelPairs: prometheus.MakeLabelPairs(desc, labelValues), + } + } return prometheus.MustNewConstHistogram(desc, d.sampleCount, d.sampleSum, d.buckets, labelValues...) } @@ -897,24 +1069,120 @@ func mergeCounter(mf1, mf2 *dto.Metric) { } func mergeHistogram(mf1, mf2 *dto.Metric) { - bucketMap := map[float64]uint64{} - - for _, bucket := range append(mf1.Histogram.GetBucket(), mf2.Histogram.GetBucket()...) { - bucketMap[bucket.GetUpperBound()] += bucket.GetCumulativeCount() - } - - var newBucket []*dto.Bucket - for upperBound, cumulativeCount := range bucketMap { - ubValue := upperBound - ccValue := cumulativeCount - newBucket = append(newBucket, &dto.Bucket{UpperBound: &ubValue, CumulativeCount: &ccValue}) - } - newSampleCount := *mf1.Histogram.SampleCount + *mf2.Histogram.SampleCount newSampleSum := *mf1.Histogram.SampleSum + *mf2.Histogram.SampleSum - mf1.Histogram.Bucket = newBucket mf1.Histogram.SampleCount = &newSampleCount mf1.Histogram.SampleSum = &newSampleSum + + h1 := mf1.Histogram + h2 := mf2.Histogram + + // Merge native histogram data if present. + // We'll process both native AND classic data below and expose in both formats + if isNative(h1) || isNative(h2) { + // Use schema/threshold from whichever side has native data (they should match). 
+ if !isNative(h1) { + schema := h2.GetSchema() + h1.Schema = &schema + zt := h2.GetZeroThreshold() + h1.ZeroThreshold = &zt + } + + // Merge zero bucket counts + zc := h1.GetZeroCount() + h2.GetZeroCount() + h1.ZeroCount = &zc + + // Convert spans+deltas to bucket maps, merge them, then convert back + posCounts1 := deltasToCountsInt(h1.GetPositiveDelta()) + posCounts2 := deltasToCountsInt(h2.GetPositiveDelta()) + negCounts1 := deltasToCountsInt(h1.GetNegativeDelta()) + negCounts2 := deltasToCountsInt(h2.GetNegativeDelta()) + + posMap := mergeBucketMaps( + spansCountsToBucketMap(h1.GetPositiveSpan(), posCounts1), + spansCountsToBucketMap(h2.GetPositiveSpan(), posCounts2), + ) + negMap := mergeBucketMaps( + spansCountsToBucketMap(h1.GetNegativeSpan(), negCounts1), + spansCountsToBucketMap(h2.GetNegativeSpan(), negCounts2), + ) + + h1.PositiveSpan, h1.PositiveDelta = makeBucketsFromMap(posMap) + h1.NegativeSpan, h1.NegativeDelta = makeBucketsFromMap(negMap) + } + + // Merge classic histogram buckets if present. + if len(h1.GetBucket()) > 0 || len(h2.GetBucket()) > 0 { + bucketMap := map[float64]uint64{} + for _, bucket := range append(h1.GetBucket(), h2.GetBucket()...) { + bucketMap[bucket.GetUpperBound()] += bucket.GetCumulativeCount() + } + + var newBucket []*dto.Bucket + for upperBound, cumulativeCount := range bucketMap { + ubValue := upperBound + ccValue := cumulativeCount + newBucket = append(newBucket, &dto.Bucket{UpperBound: &ubValue, CumulativeCount: &ccValue}) + } + h1.Bucket = newBucket + } +} + +// makeBucketsFromMap converts a bucket index->count map back into the +// spans+deltas encoding used by the native histogram proto representation.
+// +// This implementation is the same as makeBucketsFromMap from prometheus +// (histogram.go:2006) to include the gap-filling optimization +func makeBucketsFromMap(buckets map[int]int64) ([]*dto.BucketSpan, []int64) { + if len(buckets) == 0 { + return nil, nil + } + var ii []int + for k := range buckets { + ii = append(ii, k) + } + sort.Ints(ii) + + var ( + spans []*dto.BucketSpan + deltas []int64 + prevCount int64 + nextI int + ) + + appendDelta := func(count int64) { + *spans[len(spans)-1].Length++ + deltas = append(deltas, count-prevCount) + prevCount = count + } + + for n, i := range ii { + count := buckets[i] + // Multiple spans with only small gaps in between are probably + // encoded more efficiently as one larger span with a few empty + // buckets. Needs some research to find the sweet spot. For now, + // we assume that gaps of one or two buckets should not create + // a new span. + iDelta := int32(i - nextI) + if n == 0 || iDelta > 2 { + // We have to create a new span, either because we are + // at the very beginning, or because we have found a gap + // of more than two buckets. + spans = append(spans, &dto.BucketSpan{ + Offset: proto.Int32(iDelta), + Length: proto.Uint32(0), + }) + } else { + // We have found a small gap (or no gap at all). + // Insert empty buckets as needed. 
+ for range iDelta { + appendDelta(0) + } + } + appendDelta(count) + nextI = i + 1 + } + return spans, deltas } func mergeSummary(mf1 *dto.Metric, mf2 *dto.Metric) { diff --git a/pkg/util/metrics_helper_test.go b/pkg/util/metrics_helper_test.go index 85d9895389..0b3dbc52b6 100644 --- a/pkg/util/metrics_helper_test.go +++ b/pkg/util/metrics_helper_test.go @@ -1258,3 +1258,270 @@ func verifyLabels(t *testing.T, m prometheus.Collector, filter map[string]string require.Equal(t, expectedLabels, result) } + +// TestIsNative tests the native histogram detection function +func TestIsNative(t *testing.T) { + // Native histogram has Schema set + schema := int32(0) + nativeHisto := &dto.Histogram{ + Schema: &schema, + } + require.True(t, isNative(nativeHisto)) + + // Classic histogram has no Schema + classicHisto := &dto.Histogram{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, + }, + } + require.False(t, isNative(classicHisto)) + + // Empty histogram + require.False(t, isNative(&dto.Histogram{})) +} + +// TestHistogramData_AddHistogram_Native tests adding native histograms +func TestHistogramData_AddHistogram_Native(t *testing.T) { + hd := &HistogramData{} + + // Create a native histogram + schema := int32(0) + zt := 0.001 + histo := &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + ZeroCount: proto.Uint64(2), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(2)}, + }, + PositiveDelta: []int64{5, 3}, // deltas: [5, 3] -> counts: [5, 8] + NegativeSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + }, + NegativeDelta: []int64{2}, // deltas: [2] -> counts: [2] + } + + hd.AddHistogram(histo) + + require.True(t, hd.hasNative()) + require.Equal(t, int32(0), hd.Schema) + require.Equal(t, 0.001, hd.ZeroThreshold) + require.Equal(t, 
uint64(10), hd.sampleCount) + require.Equal(t, 100.0, hd.sampleSum) + require.Equal(t, uint64(2), hd.ZeroCount) + require.Equal(t, int64(5), hd.PositiveBuckets[0]) + require.Equal(t, int64(8), hd.PositiveBuckets[1]) + require.Equal(t, int64(2), hd.NegativeBuckets[0]) +} + +// TestHistogramData_AddHistogram_Classic tests adding classic histograms +func TestHistogramData_AddHistogram_Classic(t *testing.T) { + hd := &HistogramData{} + + histo := &dto.Histogram{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, + {UpperBound: proto.Float64(5.0), CumulativeCount: proto.Uint64(8)}, + {UpperBound: proto.Float64(10.0), CumulativeCount: proto.Uint64(10)}, + }, + } + + hd.AddHistogram(histo) + + require.False(t, hd.hasNative()) + require.Equal(t, uint64(10), hd.sampleCount) + require.Equal(t, 100.0, hd.sampleSum) + require.Equal(t, uint64(5), hd.buckets[1.0]) + require.Equal(t, uint64(8), hd.buckets[5.0]) + require.Equal(t, uint64(10), hd.buckets[10.0]) +} + +// TestHistogramData_AddHistogram_DualFormat tests adding dual-format histograms (both native and classic) +func TestHistogramData_AddHistogram_DualFormat(t *testing.T) { + hd := &HistogramData{} + + schema := int32(0) + zt := 0.001 + histo := &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + ZeroCount: proto.Uint64(2), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + }, + PositiveDelta: []int64{5}, + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, + {UpperBound: proto.Float64(5.0), CumulativeCount: proto.Uint64(8)}, + }, + } + + hd.AddHistogram(histo) + + // Both native and classic should be populated + require.True(t, hd.hasNative()) + require.Equal(t, uint64(2), hd.ZeroCount) + require.Equal(t, int64(5), hd.PositiveBuckets[0]) + require.Equal(t, 
uint64(5), hd.buckets[1.0]) + require.Equal(t, uint64(8), hd.buckets[5.0]) +} + +// TestHistogramData_AddHistogram_Multiple tests merging multiple histograms +func TestHistogramData_AddHistogram_Multiple(t *testing.T) { + hd := &HistogramData{} + + schema := int32(0) + zt := 0.001 + + // First histogram + histo1 := &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + ZeroCount: proto.Uint64(2), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(2)}, + }, + PositiveDelta: []int64{5, 3}, // counts: [5, 8] + } + + // Second histogram + histo2 := &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(5), + SampleSum: proto.Float64(50.0), + ZeroCount: proto.Uint64(1), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(1), Length: proto.Uint32(1)}, // bucket 1 + }, + PositiveDelta: []int64{2}, // counts: [2] at bucket 1 + } + + hd.AddHistogram(histo1) + hd.AddHistogram(histo2) + + require.Equal(t, uint64(15), hd.sampleCount) // 10 + 5 + require.Equal(t, 150.0, hd.sampleSum) // 100 + 50 + require.Equal(t, uint64(3), hd.ZeroCount) // 2 + 1 + require.Equal(t, int64(5), hd.PositiveBuckets[0]) + require.Equal(t, int64(10), hd.PositiveBuckets[1]) // 8 + 2 +} + +// TestMakeBucketsFromMap tests the conversion from bucket map to spans and deltas +func TestMakeBucketsFromMap(t *testing.T) { + tests := []struct { + name string + buckets map[int]int64 + expectedSpans int + expectedDeltas []int64 + }{ + { + name: "empty bucket map", + buckets: map[int]int64{}, + expectedSpans: 0, + expectedDeltas: nil, + }, + { + name: "single bucket", + buckets: map[int]int64{0: 5}, + expectedSpans: 1, + expectedDeltas: []int64{5}, + }, + { + name: "contiguous buckets", + buckets: map[int]int64{0: 5, 1: 8, 2: 3}, + expectedSpans: 1, + expectedDeltas: []int64{5, 3, -5}, // deltas from previous count + }, + { + name: "buckets with small gap (filled)", + 
buckets: map[int]int64{0: 5, 2: 3}, // gap of 1 + expectedSpans: 1, + expectedDeltas: []int64{5, -5, 3}, // includes zero for gap + }, + { + name: "buckets with large gap (new span)", + buckets: map[int]int64{0: 5, 5: 3}, // gap of 4 + expectedSpans: 2, + expectedDeltas: []int64{5, -2}, // deltas carry across spans: bucket0=5, bucket5=3-5=-2 + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + spans, deltas := makeBucketsFromMap(tt.buckets) + require.Equal(t, tt.expectedSpans, len(spans)) + if tt.expectedDeltas != nil { + require.Equal(t, tt.expectedDeltas, deltas) + } + }) + } +} + +// TestMergeHistogram tests the mergeHistogram function +func TestMergeHistogram(t *testing.T) { + schema := int32(0) + zt := 0.001 + + // First metric with native histogram + m1 := &dto.Metric{ + Histogram: &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(100.0), + ZeroCount: proto.Uint64(2), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + }, + PositiveDelta: []int64{5}, + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(5)}, + }, + }, + } + + // Second metric with native histogram + m2 := &dto.Metric{ + Histogram: &dto.Histogram{ + Schema: &schema, + ZeroThreshold: &zt, + SampleCount: proto.Uint64(5), + SampleSum: proto.Float64(50.0), + ZeroCount: proto.Uint64(1), + PositiveSpan: []*dto.BucketSpan{ + {Offset: proto.Int32(0), Length: proto.Uint32(1)}, + }, + PositiveDelta: []int64{3}, + Bucket: []*dto.Bucket{ + {UpperBound: proto.Float64(1.0), CumulativeCount: proto.Uint64(3)}, + }, + }, + } + + mergeHistogram(m1, m2) + + h := m1.Histogram + require.Equal(t, uint64(15), h.GetSampleCount()) // 10 + 5 + require.Equal(t, 150.0, h.GetSampleSum()) // 100 + 50 + require.Equal(t, uint64(3), h.GetZeroCount()) // 2 + 1 + + // Check classic buckets merged + require.Equal(t, 1, len(h.Bucket)) + require.Equal(t, uint64(8), 
h.Bucket[0].GetCumulativeCount()) // 5 + 3 + + // Check native buckets merged (should be 5 + 3 = 8) + posCounts := deltasToCountsInt(h.GetPositiveDelta()) + require.Equal(t, 1, len(posCounts)) + require.Equal(t, int64(8), posCounts[0]) +}