-
Notifications
You must be signed in to change notification settings - Fork 851
nh support for metrics aggregation #7359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
62d56be
cfc5cee
4740331
751d91e
ba81824
67b5d04
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ import ( | |
| "errors" | ||
| "fmt" | ||
| "math" | ||
| "sort" | ||
| "strings" | ||
| "sync" | ||
|
|
||
|
|
@@ -467,10 +468,77 @@ func (s *SummaryData) Metric(desc *prometheus.Desc, labelValues ...string) prome | |
| } | ||
|
|
||
// HistogramData accumulates the data needed to build a histogram Metric.
//
// Classic histogram state lives in the unexported fields. Native histogram
// state lives in the exported fields: Schema and ZeroThreshold are copied from
// the first native histogram that gets added, and every histogram aggregated
// afterwards is expected to use the same schema, because a bucket index means
// something different under each schema.
type HistogramData struct {
	sampleCount uint64
	sampleSum   float64
	buckets     map[float64]uint64 // classic buckets: upper bound -> cumulative count

	// Native histogram fields.
	nativeMode      bool // switched on when the first native histogram is added
	Schema          int32
	ZeroThreshold   float64
	ZeroCount       uint64
	PositiveBuckets map[int]int64 // bucket index -> count
	NegativeBuckets map[int]int64 // bucket index -> count
}
|
|
||
| // isNative returns true if the dto.Histogram carries native histogram data. | ||
| // The authoritative signal is Schema being set (non-nil), which client_golang | ||
| // always populates for native/classic histograms regardless of observation count. | ||
| // Spans alone are insufficient because a zero-observation dual histogram has | ||
| // no spans but still has Schema set. | ||
| func isNative(histo *dto.Histogram) bool { | ||
| return histo.Schema != nil | ||
| } | ||
|
|
||
| func (d *HistogramData) hasNative() bool { | ||
| return d.nativeMode | ||
| } | ||
|
|
||
| // spansCountsToBucketMap converts native histogram spans and absolute bucket counts | ||
| // into a map of bucket index -> count for easier aggregation. | ||
| func spansCountsToBucketMap(spans []*dto.BucketSpan, counts []int64) map[int]int64 { | ||
| if len(spans) == 0 { | ||
| return nil | ||
| } | ||
| bucketMap := make(map[int]int64, len(counts)) | ||
| var idx int32 | ||
| bucketIdx := 0 | ||
| for _, sp := range spans { | ||
| idx += sp.GetOffset() | ||
| for j := 0; j < int(sp.GetLength()) && bucketIdx < len(counts); j++ { | ||
| bucketMap[int(idx)] += counts[bucketIdx] | ||
| idx++ | ||
| bucketIdx++ | ||
| } | ||
| } | ||
| return bucketMap | ||
| } | ||
|
|
||
// deltasToCountsInt turns delta-encoded bucket counts into absolute counts by
// keeping a running sum. The result always has the same length as deltas and
// is never nil.
func deltasToCountsInt(deltas []int64) []int64 {
	absolute := make([]int64, 0, len(deltas))
	var running int64
	for _, delta := range deltas {
		running += delta
		absolute = append(absolute, running)
	}
	return absolute
}
|
|
||
// mergeBucketMaps adds every count in src to the matching bucket index in dst
// and returns the merged map. dst is modified in place; when dst is nil a new
// map is allocated, so the result is never nil.
func mergeBucketMaps(dst, src map[int]int64) map[int]int64 {
	merged := dst
	if merged == nil {
		merged = make(map[int]int64)
	}
	for bucket := range src {
		merged[bucket] += src[bucket]
	}
	return merged
}
|
|
||
| // AddHistogram adds histogram from gathered metrics to this histogram data. | ||
|
|
@@ -481,6 +549,26 @@ func (d *HistogramData) AddHistogram(histo *dto.Histogram) { | |
| d.sampleCount += histo.GetSampleCount() | ||
| d.sampleSum += histo.GetSampleSum() | ||
|
|
||
| if isNative(histo) { | ||
| // Initialize schema/threshold on first native histogram | ||
| if !d.hasNative() { | ||
| d.Schema = histo.GetSchema() | ||
| d.ZeroThreshold = histo.GetZeroThreshold() | ||
| d.nativeMode = true | ||
| } | ||
| d.ZeroCount += histo.GetZeroCount() | ||
|
|
||
| posCounts := deltasToCountsInt(histo.GetPositiveDelta()) | ||
| negCounts := deltasToCountsInt(histo.GetNegativeDelta()) | ||
|
|
||
| posMap := spansCountsToBucketMap(histo.GetPositiveSpan(), posCounts) | ||
| negMap := spansCountsToBucketMap(histo.GetNegativeSpan(), negCounts) | ||
|
|
||
| d.PositiveBuckets = mergeBucketMaps(d.PositiveBuckets, posMap) | ||
| d.NegativeBuckets = mergeBucketMaps(d.NegativeBuckets, negMap) | ||
| } | ||
|
|
||
| // Always collect classic buckets | ||
| histoBuckets := histo.GetBucket() | ||
| if len(histoBuckets) > 0 && d.buckets == nil { | ||
| d.buckets = map[float64]uint64{} | ||
|
|
@@ -499,7 +587,18 @@ func (d *HistogramData) AddHistogram(histo *dto.Histogram) { | |
| func (d *HistogramData) AddHistogramData(histo HistogramData) { | ||
| d.sampleCount += histo.sampleCount | ||
| d.sampleSum += histo.sampleSum | ||
| if histo.hasNative() { | ||
| if !d.hasNative() { | ||
| d.Schema = histo.Schema | ||
| d.ZeroThreshold = histo.ZeroThreshold | ||
| d.nativeMode = true | ||
| } | ||
| d.ZeroCount += histo.ZeroCount | ||
| d.PositiveBuckets = mergeBucketMaps(d.PositiveBuckets, histo.PositiveBuckets) | ||
| d.NegativeBuckets = mergeBucketMaps(d.NegativeBuckets, histo.NegativeBuckets) | ||
| } | ||
|
|
||
| // Always merge classic buckets. | ||
| if len(histo.buckets) > 0 && d.buckets == nil { | ||
| d.buckets = map[float64]uint64{} | ||
| } | ||
|
|
@@ -510,11 +609,84 @@ func (d *HistogramData) AddHistogramData(histo HistogramData) { | |
| } | ||
| } | ||
|
|
||
| // nativeHistogramMetric is basically the same as constNativeHistogram struct in prometheus histogram.go | ||
| // we need to create this new struct because the existing method NewConstNativeHistogram method in prometheus | ||
| // does not populate classic histogram fields. without this the NH compatible metrics are only exposed in NH format | ||
| // and classic histogram buckets are not exposed. | ||
| type nativeHistogramMetric struct { | ||
| desc *prometheus.Desc | ||
| dto.Histogram | ||
| labelPairs []*dto.LabelPair | ||
| } | ||
|
|
||
| func (m *nativeHistogramMetric) Desc() *prometheus.Desc { return m.desc } | ||
| func (m *nativeHistogramMetric) Write(out *dto.Metric) error { | ||
| out.Histogram = &m.Histogram | ||
| out.Label = m.labelPairs | ||
| return nil | ||
| } | ||
|
|
||
| // Metric returns prometheus metric from this histogram data. | ||
| // | ||
| // Note that returned metric shares bucket with this HistogramData, so avoid | ||
| // doing more modifications to this HistogramData after calling Metric. | ||
| func (d *HistogramData) Metric(desc *prometheus.Desc, labelValues ...string) prometheus.Metric { | ||
| if d.hasNative() { | ||
| // Build native spans+deltas from bucket maps. | ||
| posSpans, posDeltas := makeBucketsFromMap(d.PositiveBuckets) | ||
| negSpans, negDeltas := makeBucketsFromMap(d.NegativeBuckets) | ||
|
|
||
| schema := d.Schema | ||
| zt := d.ZeroThreshold | ||
| sc := d.sampleCount | ||
| ss := d.sampleSum | ||
| zc := d.ZeroCount | ||
|
|
||
| // Build classic buckets if available | ||
| var buckets []*dto.Bucket | ||
| if len(d.buckets) > 0 { | ||
| buckets = make([]*dto.Bucket, 0, len(d.buckets)) | ||
| for ub, cc := range d.buckets { | ||
| upperBound := ub | ||
| cumCount := cc | ||
| buckets = append(buckets, &dto.Bucket{ | ||
| UpperBound: &upperBound, | ||
| CumulativeCount: &cumCount, | ||
| }) | ||
| } | ||
| sort.Slice(buckets, func(i, j int) bool { | ||
| return buckets[i].GetUpperBound() < buckets[j].GetUpperBound() | ||
| }) | ||
| } | ||
|
|
||
| // Sentinel span for native histograms with no observations. | ||
| // This is required to distinguish an empty native histogram from a classic histogram. | ||
| // This matches the prometheus behavior (histogram.go:1958) | ||
| if zt == 0 && zc == 0 && len(posSpans) == 0 && len(negSpans) == 0 { | ||
| posSpans = []*dto.BucketSpan{{ | ||
| Offset: proto.Int32(0), | ||
| Length: proto.Uint32(0), | ||
| }} | ||
| } | ||
|
|
||
| // Construct histogram in-place within the struct (no intermediate copy) | ||
| return &nativeHistogramMetric{ | ||
| desc: desc, | ||
| Histogram: dto.Histogram{ | ||
| Schema: &schema, | ||
| ZeroThreshold: &zt, | ||
| SampleCount: &sc, | ||
| SampleSum: &ss, | ||
| ZeroCount: &zc, | ||
| PositiveSpan: posSpans, | ||
| PositiveDelta: posDeltas, | ||
| NegativeSpan: negSpans, | ||
| NegativeDelta: negDeltas, | ||
| Bucket: buckets, | ||
| }, | ||
| labelPairs: prometheus.MakeLabelPairs(desc, labelValues), | ||
| } | ||
| } | ||
| return prometheus.MustNewConstHistogram(desc, d.sampleCount, d.sampleSum, d.buckets, labelValues...) | ||
| } | ||
|
|
||
|
|
@@ -897,24 +1069,120 @@ func mergeCounter(mf1, mf2 *dto.Metric) { | |
| } | ||
|
|
||
| func mergeHistogram(mf1, mf2 *dto.Metric) { | ||
| bucketMap := map[float64]uint64{} | ||
|
|
||
| for _, bucket := range append(mf1.Histogram.GetBucket(), mf2.Histogram.GetBucket()...) { | ||
| bucketMap[bucket.GetUpperBound()] += bucket.GetCumulativeCount() | ||
| } | ||
|
|
||
| var newBucket []*dto.Bucket | ||
| for upperBound, cumulativeCount := range bucketMap { | ||
| ubValue := upperBound | ||
| ccValue := cumulativeCount | ||
| newBucket = append(newBucket, &dto.Bucket{UpperBound: &ubValue, CumulativeCount: &ccValue}) | ||
| } | ||
|
|
||
| newSampleCount := *mf1.Histogram.SampleCount + *mf2.Histogram.SampleCount | ||
| newSampleSum := *mf1.Histogram.SampleSum + *mf2.Histogram.SampleSum | ||
| mf1.Histogram.Bucket = newBucket | ||
| mf1.Histogram.SampleCount = &newSampleCount | ||
| mf1.Histogram.SampleSum = &newSampleSum | ||
|
|
||
| h1 := mf1.Histogram | ||
| h2 := mf2.Histogram | ||
|
|
||
| // Merge native histogram data if present. | ||
| // We'll process both native AND classic data below and expose in both formats | ||
| if isNative(h1) || isNative(h2) { | ||
| // Use schema/threshold from whichever side has native data (they should match). | ||
| if !isNative(h1) { | ||
| schema := h2.GetSchema() | ||
| h1.Schema = &schema | ||
| zt := h2.GetZeroThreshold() | ||
| h1.ZeroThreshold = &zt | ||
| } | ||
|
Comment on lines
+1082
to
+1089
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I wonder whether h1 and h2 always have the same schema.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For this usecase for aggregating in |
||
|
|
||
| // Merge zero bucket counts | ||
| zc := h1.GetZeroCount() + h2.GetZeroCount() | ||
| h1.ZeroCount = &zc | ||
|
|
||
| // Convert spans+deltas to bucket maps, merge them, then convert back | ||
| posCounts1 := deltasToCountsInt(h1.GetPositiveDelta()) | ||
| posCounts2 := deltasToCountsInt(h2.GetPositiveDelta()) | ||
| negCounts1 := deltasToCountsInt(h1.GetNegativeDelta()) | ||
| negCounts2 := deltasToCountsInt(h2.GetNegativeDelta()) | ||
|
|
||
| posMap := mergeBucketMaps( | ||
| spansCountsToBucketMap(h1.GetPositiveSpan(), posCounts1), | ||
| spansCountsToBucketMap(h2.GetPositiveSpan(), posCounts2), | ||
| ) | ||
| negMap := mergeBucketMaps( | ||
| spansCountsToBucketMap(h1.GetNegativeSpan(), negCounts1), | ||
| spansCountsToBucketMap(h2.GetNegativeSpan(), negCounts2), | ||
| ) | ||
|
|
||
| h1.PositiveSpan, h1.PositiveDelta = makeBucketsFromMap(posMap) | ||
| h1.NegativeSpan, h1.NegativeDelta = makeBucketsFromMap(negMap) | ||
| } | ||
|
|
||
| // Merge classic histogram buckets if present. | ||
| if len(h1.GetBucket()) > 0 || len(h2.GetBucket()) > 0 { | ||
| bucketMap := map[float64]uint64{} | ||
| for _, bucket := range append(h1.GetBucket(), h2.GetBucket()...) { | ||
| bucketMap[bucket.GetUpperBound()] += bucket.GetCumulativeCount() | ||
| } | ||
|
|
||
| var newBucket []*dto.Bucket | ||
| for upperBound, cumulativeCount := range bucketMap { | ||
| ubValue := upperBound | ||
| ccValue := cumulativeCount | ||
| newBucket = append(newBucket, &dto.Bucket{UpperBound: &ubValue, CumulativeCount: &ccValue}) | ||
| } | ||
| h1.Bucket = newBucket | ||
| } | ||
| } | ||
|
|
||
| // makeBucketsFromMap converts a bucket index->count map back into the | ||
| // spans+deltas encoding used by the native histogram proto representation. | ||
| // | ||
| // This implementation is the same as makeBucketsFromMap from prometheus | ||
| // (histogram.go:2006), so it includes the same gap-filling optimization. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
| func makeBucketsFromMap(buckets map[int]int64) ([]*dto.BucketSpan, []int64) { | ||
| if len(buckets) == 0 { | ||
| return nil, nil | ||
| } | ||
| var ii []int | ||
| for k := range buckets { | ||
| ii = append(ii, k) | ||
| } | ||
| sort.Ints(ii) | ||
|
|
||
| var ( | ||
| spans []*dto.BucketSpan | ||
| deltas []int64 | ||
| prevCount int64 | ||
| nextI int | ||
| ) | ||
|
|
||
| appendDelta := func(count int64) { | ||
| *spans[len(spans)-1].Length++ | ||
| deltas = append(deltas, count-prevCount) | ||
| prevCount = count | ||
| } | ||
|
|
||
| for n, i := range ii { | ||
| count := buckets[i] | ||
| // Multiple spans with only small gaps in between are probably | ||
| // encoded more efficiently as one larger span with a few empty | ||
| // buckets. Needs some research to find the sweet spot. For now, | ||
| // we assume that gaps of one or two buckets should not create | ||
| // a new span. | ||
| iDelta := int32(i - nextI) | ||
| if n == 0 || iDelta > 2 { | ||
| // We have to create a new span, either because we are | ||
| // at the very beginning, or because we have found a gap | ||
| // of more than two buckets. | ||
| spans = append(spans, &dto.BucketSpan{ | ||
| Offset: proto.Int32(iDelta), | ||
| Length: proto.Uint32(0), | ||
| }) | ||
| } else { | ||
| // We have found a small gap (or no gap at all). | ||
| // Insert empty buckets as needed. | ||
| for range iDelta { | ||
| appendDelta(0) | ||
| } | ||
| } | ||
| appendDelta(count) | ||
| nextI = i + 1 | ||
| } | ||
| return spans, deltas | ||
| } | ||
|
|
||
| func mergeSummary(mf1 *dto.Metric, mf2 *dto.Metric) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add the code link instead of just file and line.