
Add PercentileTDigest support for MergeAndRollup aggregation #18088

Open
justahuman1 wants to merge 7 commits into apache:master from justahuman1:svallabh/tdigest-merge-rollup

Conversation

justahuman1 (Contributor) commented Apr 2, 2026

Summary

Adds PercentileTDigestAggregator for minion merge/rollup tasks, enabling TDigest sketch merging with configurable compression factor.

  • New PercentileTDigestAggregator implementing ValueAggregator
    • Note: Creates an empty digest instead of returning empty byte[]
  • Registers PERCENTILETDIGEST and PERCENTILERAWTDIGEST in ValueAggregatorFactory and MinionConstants.AVAILABLE_CORE_VALUE_AGGREGATORS
  • Allows compressionFactor in MergeRollupTaskGenerator validation

How to Configure

{
  "MergeRollupTask": {
    "daily.mergeType": "rollup",
    "daily.bucketTimePeriod": "1d",
    "daily.bufferTimePeriod": "1d",
    "<column_name>.aggregationType": "percentiletdigest",
    "daily.aggregationFunctionParameters.<column_name>.compressionFactor": "100"
  }
}
  • The column must be BYTES type in the schema
  • compressionFactor is optional (defaults to 100)
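For illustration, the optional compressionFactor parameter with its default of 100 could be read like this. This is a hypothetical sketch, not the PR's actual code; the class and method names are assumptions:

```java
import java.util.Map;

public class CompressionFactorParam {
  // Hypothetical sketch: read the optional compressionFactor aggregation
  // function parameter, falling back to the documented default of 100.
  static final int DEFAULT_COMPRESSION = 100;

  static int getCompression(Map<String, String> functionParameters) {
    if (functionParameters == null) {
      return DEFAULT_COMPRESSION;
    }
    String value = functionParameters.get("compressionFactor");
    return value == null ? DEFAULT_COMPRESSION : Integer.parseInt(value);
  }
}
```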

Test plan

  • Unit tests: PercentileTDigestAggregatorTest — default and custom compression
  • Validation tests: MergeRollupTaskGeneratorTest — valid and invalid compressionFactor
  • Executor integration tests: MergeRollupTDigestTaskExecutorTest

@justahuman1 justahuman1 changed the title feat: add PercentileTDigest support for MergeAndRollup aggregation Add PercentileTDigest support for MergeAndRollup aggregation Apr 2, 2026
Add PercentileTDigestAggregator for minion merge/rollup tasks, enabling
TDigest sketch merging with configurable compression factor.

- New PercentileTDigestAggregator implementing ValueAggregator
- Register PERCENTILETDIGEST and PERCENTILERAWTDIGEST in ValueAggregatorFactory
- Add both types to AVAILABLE_CORE_VALUE_AGGREGATORS in MinionConstants
- Allow compressionFactor in MergeRollupTaskGenerator validation
- Unit tests for the aggregator with default and custom compression
- Integration test exercising the full MergeRollupTaskExecutor pipeline
  with TDigest BYTES columns, multiple dimension groups, cross-segment
  merging, skewed distributions, and edge cases
@justahuman1 justahuman1 force-pushed the svallabh/tdigest-merge-rollup branch from 859f945 to 9ac773b Compare April 2, 2026 20:50
codecov-commenter commented Apr 2, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 63.48%. Comparing base (736cbf7) to head (99aa269).
⚠️ Report is 102 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18088      +/-   ##
============================================
- Coverage     63.75%   63.48%   -0.27%     
- Complexity     1573     1628      +55     
============================================
  Files          3167     3245      +78     
  Lines        191658   197363    +5705     
  Branches      29469    30533    +1064     
============================================
+ Hits         122198   125303    +3105     
- Misses        59851    62021    +2170     
- Partials       9609    10039     +430     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.45% <100.00%> (-0.30%) ⬇️
java-21 63.46% <100.00%> (+7.37%) ⬆️
temurin 63.48% <100.00%> (-0.27%) ⬇️
unittests 63.48% <100.00%> (-0.27%) ⬇️
unittests1 55.46% <100.00%> (-0.66%) ⬇️
unittests2 34.96% <17.39%> (+0.76%) ⬆️

Flags with carried forward coverage won't be shown.


Copilot AI (Contributor) left a comment

Pull request overview

Adds TDigest sketch merging support to Pinot’s minion merge/rollup pipeline by introducing a new PercentileTDigestAggregator, wiring it into the aggregator factory/minion constants, and extending task-config validation (plus new unit/integration-style tests) to support a configurable compressionFactor.

Changes:

  • Add PercentileTDigestAggregator and register PERCENTILETDIGEST / PERCENTILERAWTDIGEST in ValueAggregatorFactory.
  • Extend MergeRollupTaskGenerator validation to allow and validate compressionFactor.
  • Add unit and executor tests covering TDigest merging and compressionFactor validation.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.

Summary per file:
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java New minion rollup aggregator for merging serialized TDigests with optional compressionFactor.
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java Registers TDigest aggregator for PERCENTILETDIGEST and PERCENTILERAWTDIGEST.
pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java Adds TDigest aggregation types to available core value aggregators set.
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/.../MergeRollupTaskGenerator.java Allows/validates compressionFactor as an aggregation function parameter.
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java Unit tests for default/custom compression merge behavior.
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/.../MergeRollupTDigestTaskExecutorTest.java Integration-style tests for merge/rollup executor using TDigest bytes metric.
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/.../MergeRollupTaskGeneratorTest.java Adds valid/invalid compressionFactor validation tests.

Comment on lines +41 to +46
TDigest first = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize((byte[]) value1);
TDigest second = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize((byte[]) value2);
TDigest merged = TDigest.createMergingDigest(compression);
merged.add(first);
merged.add(second);
return ObjectSerDeUtils.TDIGEST_SER_DE.serialize(merged);

Copilot AI Apr 3, 2026


PercentileTDigestAggregator directly deserializes (byte[]) value1/value2. For BYTES columns, Pinot's default null is an empty byte[] (see FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES), and RollupReducer can aggregate such values when includeNullFields=false. Deserializing byte[0] will fail, so this aggregator should explicitly handle empty inputs (e.g., return the other value when one side is empty, and produce a serialized empty TDigest when both are empty).
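The guard described here can be sketched as follows. This is a minimal stdlib-only illustration with hypothetical names; the real aggregator would delegate the non-empty case to TDigest deserialization and merging:

```java
import java.util.function.BinaryOperator;

public class EmptyBytesGuard {
  // Treat an empty byte[] (Pinot's default null value for BYTES columns) as
  // a missing value: return the other side unchanged, and only invoke the
  // real merge function when both inputs are non-empty.
  static byte[] merge(byte[] left, byte[] right, BinaryOperator<byte[]> mergeFn) {
    if (left == null || left.length == 0) {
      return right;
    }
    if (right == null || right.length == 0) {
      return left;
    }
    return mergeFn.apply(left, right);
  }
}
```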

Contributor Author


addressed in d556a16

Comment on lines +516 to +520
// Segment 0: group1=[1..100], group2=[500..600]
List<GenericRow> seg0 = new ArrayList<>();
seg0.add(makeRow(GROUP_1, createTDigest(1, 101)));
seg0.add(makeRow(GROUP_2, createTDigest(500, 600)));
segments.add(seg0);

Copilot AI Apr 3, 2026


This comment is off by one: createTDigest(500, 600) adds values 500..599 (end is exclusive). Either adjust the comment to [500..599] or change the test data range to match.

Contributor Author


Done

…stAggregator

- Fix MergeRollupTDigestTaskExecutorTest: wrong imports for SchemaUtils
  and TableConfigUtils (should be SchemaSerDeUtils/TableConfigSerDeUtils)
- Fix MergeRollupTaskGeneratorTest: missing ImmutableMap import
- Add empty byte[] handling to PercentileTDigestAggregator to prevent
  BufferUnderflowException when BYTES columns have default null value
  (byte[0]). Follows same pattern as DistinctCountCPCSketchAggregator.
- Add unit tests for empty byte[] aggregation cases
- Clean up test assertion style: remove unnecessary .0 suffixes
Comment on lines +38 to +39
int compression = getCompression(functionParameters);
return ObjectSerDeUtils.TDIGEST_SER_DE.serialize(TDigest.createMergingDigest(compression));
Contributor


Does it work if you simply return empty bytes?

justahuman1 (Contributor, Author) commented Apr 13, 2026


Thanks for the quick review!

I don't think returning empty bytes is safe. The query-time aggregation function and RollupReducer both deserialize without checking for empty bytes. At LinkedIn, we recently hit this exact issue with HLL in production, where empty bytes caused a BufferUnderflowException during the query execution path (I'd like to send a separate PR adding a guard for all aggregators as well, if that makes sense). The point of serializing an empty digest was to ensure everything downstream can parse it properly.

What do you think/recommend?
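As a stdlib-only illustration of that failure mode (not the PR's code): deserializers typically begin by reading a fixed-size header field, and doing so on an empty buffer underflows immediately.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class EmptyBufferDemo {
  // Reading even a single int header field from an empty byte[] underflows,
  // which is the exception described above for empty sketch values.
  static boolean underflowsOnEmpty() {
    ByteBuffer empty = ByteBuffer.wrap(new byte[0]);
    try {
      empty.getInt();  // e.g. a serialized digest's encoding/version field
      return false;
    } catch (BufferUnderflowException e) {
      return true;
    }
  }
}
```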


I tried to create a static variable for this (for perf reasons), but the serialized bytes differ for empty digests created with different compression values. See the script below:

import com.tdunning.math.stats.TDigest;
import java.nio.ByteBuffer;
import java.util.Arrays;

public class TDigestTest {
    public static void main(String[] args) {
        TDigest d100 = TDigest.createMergingDigest(100);
        TDigest d200 = TDigest.createMergingDigest(200);

        ByteBuffer buf100 = ByteBuffer.allocate(d100.smallByteSize());
        d100.asSmallBytes(buf100);
        byte[] bytes100 = buf100.array();

        ByteBuffer buf200 = ByteBuffer.allocate(d200.smallByteSize());
        d200.asSmallBytes(buf200);
        byte[] bytes200 = buf200.array();

        System.out.println("compression=100 size: " + bytes100.length + " bytes: " + Arrays.toString(bytes100));
        System.out.println("compression=200 size: " + bytes200.length + " bytes: " + Arrays.toString(bytes200));
        System.out.println("equal: " + Arrays.equals(bytes100, bytes200));
    }
}

Output:

compression=100 size: 30 bytes: [0, 0, 0, 2, 127, -16, 0, 0, 0, 0, 0, 0, -1, -16, 0, 0, 0, 0, 0, 0, 66, -56, 0, 0, 0, -46, 1, -12, 0, 0]
compression=200 size: 30 bytes: [0, 0, 0, 2, 127, -16, 0, 0, 0, 0, 0, 0, -1, -16, 0, 0, 0, 0, 0, 0, 67, 72, 0, 0, 1, -102, 3, -24, 0, 0]

Contributor


If we allow empty bytes in the value aggregator (i.e. the raw value), we should also allow them in the aggregation function and roll-up reducer; they should follow the same contract for input values.

Contributor Author


Good point. Given that I am still new to the Pinot codebase, I would like to rely on your judgement. What do you believe is the right call? Should I make this change in the aggregation function and roll-up reducer or shall I return empty bytes as you initially suggested?

Contributor


Basically we want to treat empty bytes as null (i.e. a missing value). We should handle empty bytes in all 3 places.
It is okay to merge the current PR and fix this in a separate PR, where we modify all the different functions to follow this convention. Let me know how you want to proceed.

justahuman1 (Contributor, Author) commented Apr 18, 2026

Thanks for the patience @Jackie-Jiang; I was waiting for tests/builds to finish and also wanted to think this through. I updated this PR to treat empty bytes as missing in PercentileTDigestAggregator (it returns empty bytes now).

On extending this to the query-time aggregation functions (TDigest, HLL, ThetaSketch, KLL, etc.; only CPC currently handles empty bytes, per #17925): I've been discussing this internally and there's a real tradeoff worth flagging. The recent HLL incident on our side was actually caught because queries crashed on empty bytes from a stripped defaultNullValue; if it had silently returned 0, the misconfiguration might have sat undetected, producing silently wrong dashboards. Our team's take was that users should be intentional about setting defaultNullValue for BYTES columns.

That said, silent null-handling is still probably saner client-facing behavior in most cases, and extending CPC's pattern is a reasonable cleanup. I agree it should be a separate PR; I want to think through the tradeoff more (and probably add metrics so silent nulls aren't invisible) before picking it up.

Let me know if you're in favor of adding this null handling throughout T-Digest (and potentially other sketches) and I will send it out in a separate PR.

- Add JavaDoc to PercentileTDigestAggregator documenting raw/non-raw support
- Add factory tests verifying both PERCENTILETDIGEST and PERCENTILERAWTDIGEST
  resolve to the same aggregator
- Remove stale "nominal entries" comment in MergeRollupTaskGenerator
- Fix off-by-one in test comment: [500..600] -> [500..599]
Preconditions.checkState(allowedFunctionParameterNames.contains(functionParameterName.toLowerCase()),
"Aggregation function parameter name must be one of [lgK, samplingProbability, nominalEntries]!");
// check that function parameter value is valid for nominal entries
"Aggregation function parameter name must be one of [lgK, samplingProbability, nominalEntries,"
Contributor


Consider simply printing out allowedFunctionParameterNames

Contributor Author


Done. In case it matters, just want to call out that this results in a cosmetic difference (the set is lowercase).

[lgk, samplingprobability, nominalentries, compressionfactor]
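The suggested change can be sketched like this (a standalone illustration using a plain exception rather than Guava's Preconditions; the class and method names are assumptions):

```java
import java.util.Locale;
import java.util.Set;

public class ParamNameCheck {
  // Lower-cased allowed parameter names, as discussed above.
  static final Set<String> ALLOWED =
      Set.of("lgk", "samplingprobability", "nominalentries", "compressionfactor");

  // Embedding the set itself in the message keeps the error text in sync
  // with the allowed names, at the cosmetic cost of printing them lower-cased.
  static void validate(String functionParameterName) {
    if (!ALLOWED.contains(functionParameterName.toLowerCase(Locale.ROOT))) {
      throw new IllegalStateException(
          "Aggregation function parameter name must be one of " + ALLOWED + "!");
    }
  }
}
```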

- Treat empty byte[] as missing value in PercentileTDigestAggregator
  (pass through instead of synthesizing an empty TDigest); full
  cross-layer fix to follow in a separate PR
- Print allowedFunctionParameterNames in the error message so it stays
  in sync with the set
- Use Map.of instead of Guava ImmutableMap in tests
@Jackie-Jiang Jackie-Jiang added ingestion Related to data ingestion pipeline feature New functionality labels Apr 18, 2026