Add PercentileTDigest support for MergeAndRollup aggregation #18088
justahuman1 wants to merge 7 commits into apache:master from
Conversation
Add PercentileTDigestAggregator for minion merge/rollup tasks, enabling TDigest sketch merging with configurable compression factor.
- New PercentileTDigestAggregator implementing ValueAggregator
- Register PERCENTILETDIGEST and PERCENTILERAWTDIGEST in ValueAggregatorFactory
- Add both types to AVAILABLE_CORE_VALUE_AGGREGATORS in MinionConstants
- Allow compressionFactor in MergeRollupTaskGenerator validation
- Unit tests for the aggregator with default and custom compression
- Integration test exercising the full MergeRollupTaskExecutor pipeline with TDigest BYTES columns, multiple dimension groups, cross-segment merging, skewed distributions, and edge cases
859f945 to 9ac773b
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files

@@ Coverage Diff @@
## master #18088 +/- ##
============================================
- Coverage 63.75% 63.48% -0.27%
- Complexity 1573 1628 +55
============================================
Files 3167 3245 +78
Lines 191658 197363 +5705
Branches 29469 30533 +1064
============================================
+ Hits 122198 125303 +3105
- Misses 59851 62021 +2170
- Partials 9609 10039 +430
Flags with carried forward coverage won't be shown.
Pull request overview
Adds TDigest sketch merging support to Pinot’s minion merge/rollup pipeline by introducing a new PercentileTDigestAggregator, wiring it into the aggregator factory/minion constants, and extending task-config validation (plus new unit/integration-style tests) to support a configurable compressionFactor.
Changes:
- Add PercentileTDigestAggregator and register PERCENTILETDIGEST/PERCENTILERAWTDIGEST in ValueAggregatorFactory.
- Extend MergeRollupTaskGenerator validation to allow and validate compressionFactor.
- Add unit and executor tests covering TDigest merging and compressionFactor validation.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java | New minion rollup aggregator for merging serialized TDigests with optional compressionFactor. |
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java | Registers TDigest aggregator for PERCENTILETDIGEST and PERCENTILERAWTDIGEST. |
| pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java | Adds TDigest aggregation types to available core value aggregators set. |
| pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/.../MergeRollupTaskGenerator.java | Allows/validates compressionFactor as an aggregation function parameter. |
| pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java | Unit tests for default/custom compression merge behavior. |
| pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/.../MergeRollupTDigestTaskExecutorTest.java | Integration-style tests for merge/rollup executor using TDigest bytes metric. |
| pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/.../MergeRollupTaskGeneratorTest.java | Adds valid/invalid compressionFactor validation tests. |
TDigest first = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize((byte[]) value1);
TDigest second = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize((byte[]) value2);
TDigest merged = TDigest.createMergingDigest(compression);
merged.add(first);
merged.add(second);
return ObjectSerDeUtils.TDIGEST_SER_DE.serialize(merged);
PercentileTDigestAggregator directly deserializes (byte[]) value1/value2. For BYTES columns, Pinot's default null is an empty byte[] (see FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES), and RollupReducer can aggregate such values when includeNullFields=false. Deserializing byte[0] will fail, so this aggregator should explicitly handle empty inputs (e.g., return the other value when one side is empty, and produce a serialized empty TDigest when both are empty).
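The guard the review asks for can be sketched as follows. This is a minimal, self-contained stand-in, not Pinot's actual aggregator: `mergeSerialized` plays the role of `PercentileTDigestAggregator.aggregate`, and the real TDigest (de)serialization is replaced by a placeholder concatenation so only the empty-input control flow is shown.

```java
import java.util.Arrays;

// Hypothetical sketch of the empty-byte[] guard suggested in the review.
// Treats an empty byte[] (Pinot's default null for BYTES columns) as a
// missing value instead of attempting to deserialize it.
public class EmptyBytesGuard {

  static byte[] mergeSerialized(byte[] value1, byte[] value2) {
    if (value1.length == 0) {
      // If both sides are empty, this also returns empty bytes.
      return value2;
    }
    if (value2.length == 0) {
      return value1;
    }
    // Placeholder for the real work: deserialize both inputs, merge them
    // into a new TDigest, and serialize the result.
    byte[] merged = new byte[value1.length + value2.length];
    System.arraycopy(value1, 0, merged, 0, value1.length);
    System.arraycopy(value2, 0, merged, value1.length, value2.length);
    return merged;
  }

  public static void main(String[] args) {
    byte[] empty = new byte[0];
    byte[] sketch = {1, 2, 3};
    if (!Arrays.equals(mergeSerialized(empty, sketch), sketch)
        || !Arrays.equals(mergeSerialized(sketch, empty), sketch)
        || mergeSerialized(empty, empty).length != 0) {
      throw new AssertionError("empty-input handling is broken");
    }
    System.out.println("ok");
  }
}
```

The key property is that a default-null `byte[0]` never reaches the deserializer, so no BufferUnderflowException can occur on that path.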
// Segment 0: group1=[1..100], group2=[500..600]
List<GenericRow> seg0 = new ArrayList<>();
seg0.add(makeRow(GROUP_1, createTDigest(1, 101)));
seg0.add(makeRow(GROUP_2, createTDigest(500, 600)));
segments.add(seg0);
This comment is off by one: createTDigest(500, 600) adds values 500..599 (end is exclusive). Either adjust the comment to [500..599] or change the test data range to match.
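The end-exclusive convention behind the off-by-one is easy to see in isolation. The snippet below mirrors the loop inside the test's hypothetical `createTDigest(start, end)` helper (the helper itself is not shown in this excerpt, so the loop shape is an assumption):

```java
// Illustrates the end-exclusive range convention: a loop over
// [start, end) adds the values start..end-1, never end itself.
public class ExclusiveRange {

  // Returns {count of values added, last value added} for an
  // end-exclusive loop like the one presumed inside createTDigest.
  static int[] addedRange(int start, int end) {
    int count = 0;
    int last = -1;
    for (int v = start; v < end; v++) { // end is exclusive
      count++;
      last = v;
    }
    return new int[]{count, last};
  }

  public static void main(String[] args) {
    int[] r = addedRange(500, 600);
    if (r[0] != 100 || r[1] != 599) {
      throw new AssertionError("expected 100 values ending at 599");
    }
    System.out.println("values=" + r[0] + ", last=" + r[1]);
  }
}
```

So `createTDigest(500, 600)` covers 500..599, which is why the comment should read [500..599].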
…stAggregator
- Fix MergeRollupTDigestTaskExecutorTest: wrong imports for SchemaUtils and TableConfigUtils (should be SchemaSerDeUtils/TableConfigSerDeUtils)
- Fix MergeRollupTaskGeneratorTest: missing ImmutableMap import
- Add empty byte[] handling to PercentileTDigestAggregator to prevent BufferUnderflowException when BYTES columns have the default null value (byte[0]). Follows the same pattern as DistinctCountCPCSketchAggregator.
- Add unit tests for empty byte[] aggregation cases
- Clean up test assertion style: remove unnecessary .0 suffixes
int compression = getCompression(functionParameters);
return ObjectSerDeUtils.TDIGEST_SER_DE.serialize(TDigest.createMergingDigest(compression));
Does it work if you simply return empty bytes?
Thanks for the quick review!
I don't think returning empty bytes is safe. The query-time aggregation function and RollupReducer both deserialize without checking for empty bytes. At LinkedIn, we recently hit this exact issue with HLL in production, where empty bytes caused a BufferUnderflowException during the query execution path (I'd like to send a separate PR adding a guard for all aggregators as well, if that makes sense). The reason for this empty serialization was to ensure everything downstream can parse it properly.
What do you think/recommend?
I tried to create a static constant for this (for perf reasons), but the serialized bytes differ for empty digests with different compression values. See the script below:
import com.tdunning.math.stats.TDigest;
import java.nio.ByteBuffer;
import java.util.Arrays;
public class TDigestTest {
public static void main(String[] args) {
TDigest d100 = TDigest.createMergingDigest(100);
TDigest d200 = TDigest.createMergingDigest(200);
ByteBuffer buf100 = ByteBuffer.allocate(d100.smallByteSize());
d100.asSmallBytes(buf100);
byte[] bytes100 = buf100.array();
ByteBuffer buf200 = ByteBuffer.allocate(d200.smallByteSize());
d200.asSmallBytes(buf200);
byte[] bytes200 = buf200.array();
System.out.println("compression=100 size: " + bytes100.length + " bytes: " + Arrays.toString(bytes100));
System.out.println("compression=200 size: " + bytes200.length + " bytes: " + Arrays.toString(bytes200));
System.out.println("equal: " + Arrays.equals(bytes100, bytes200));
}
}

Output:
compression=100 size: 30 bytes: [0, 0, 0, 2, 127, -16, 0, 0, 0, 0, 0, 0, -1, -16, 0, 0, 0, 0, 0, 0, 66, -56, 0, 0, 0, -46, 1, -12, 0, 0]
compression=200 size: 30 bytes: [0, 0, 0, 2, 127, -16, 0, 0, 0, 0, 0, 0, -1, -16, 0, 0, 0, 0, 0, 0, 67, 72, 0, 0, 1, -102, 3, -24, 0, 0]
If we allow empty bytes in the value aggregator (i.e. the raw value), we should also allow it in aggregation function and roll-up reducer. They should follow the same contract for input values
Good point. Given that I am still new to the Pinot codebase, I would like to rely on your judgement. What do you believe is the right call? Should I make this change in the aggregation function and roll-up reducer or shall I return empty bytes as you initially suggested?
Basically we want to treat empty bytes as null (i.e. missing value). We should handle empty bytes in all 3 places.
It is okay to merge the current PR and fix this in a separate PR, where we modify all the different functions to follow this convention. Let me know how you want to proceed.
Thanks for the patience @Jackie-Jiang. I was waiting for tests/builds to finish and also wanted to think this through. Updated this PR to treat empty bytes as missing in PercentileTDigestAggregator (it returns empty bytes now).
On extending this to the query-time aggregation functions (TDigest, HLL, ThetaSketch, KLL, etc. — only CPC currently handles empty bytes per #17925): I've been discussing internally and there's a real tradeoff worth flagging. The recent HLL incident on our side was actually caught because queries crashed on empty bytes from a stripped defaultNullValue — if it had silently returned 0, the misconfiguration might have sat undetected producing silently wrong dashboards. Our team's take was that users should be intentional about setting defaultNullValue for BYTES columns.
That said, silent null-handling is still probably saner client-facing behavior in most cases, and extending CPC's pattern is a reasonable cleanup. Agree it should be a separate PR — want to think through the tradeoff more (and probably add metrics so silent nulls aren't invisible) before picking it up.
Let me know if you're in favor of adding this null handling throughout T-Digest (and potentially other sketches) and I will send it out in a separate PR.
- Add JavaDoc to PercentileTDigestAggregator documenting raw/non-raw support
- Add factory tests verifying both PERCENTILETDIGEST and PERCENTILERAWTDIGEST resolve to the same aggregator
- Remove stale "nominal entries" comment in MergeRollupTaskGenerator
- Fix off-by-one in test comment: [500..600] -> [500..599]
Preconditions.checkState(allowedFunctionParameterNames.contains(functionParameterName.toLowerCase()),
    "Aggregation function parameter name must be one of [lgK, samplingProbability, nominalEntries]!");
// check that function parameter value is valid for nominal entries
    "Aggregation function parameter name must be one of [lgK, samplingProbability, nominalEntries,"
Consider simply print out allowedFunctionParameterNames
Done. In case it matters, just want to call out that this results in a cosmetic difference (the set is lowercase).
[lgk, samplingprobability, nominalentries, compressionfactor]
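The suggestion can be sketched like this. It is a stand-alone illustration, not Pinot's actual validation code: the set contents come from the conversation above, the `IllegalStateException` stands in for Guava's `Preconditions.checkState`, and the lowercase output matches the cosmetic difference noted in the reply.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Sketch: derive the validation error message from the allowed-name set
// itself, so the message can never drift out of sync with the set.
public class ParamValidation {

  // Names stored lowercase because the check lowercases the input;
  // printing the set therefore yields the lowercase form.
  static final Set<String> ALLOWED = new LinkedHashSet<>(List.of(
      "lgk", "samplingprobability", "nominalentries", "compressionfactor"));

  static void checkParameterName(String name) {
    if (!ALLOWED.contains(name.toLowerCase())) {
      // Stand-in for Preconditions.checkState(...)
      throw new IllegalStateException(
          "Aggregation function parameter name must be one of " + ALLOWED + "!");
    }
  }

  public static void main(String[] args) {
    checkParameterName("compressionFactor"); // accepted case-insensitively
    boolean rejected = false;
    try {
      checkParameterName("unknownParam");
    } catch (IllegalStateException e) {
      rejected = e.getMessage().contains("compressionfactor");
    }
    if (!rejected) {
      throw new AssertionError("unknownParam should have been rejected");
    }
    // Prints: [lgk, samplingprobability, nominalentries, compressionfactor]
    System.out.println(ALLOWED);
  }
}
```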
- Treat empty byte[] as a missing value in PercentileTDigestAggregator (pass through instead of synthesizing an empty TDigest); full cross-layer fix to follow in a separate PR
- Print allowedFunctionParameterNames in the error message so it stays in sync with the set
- Use Map.of instead of Guava ImmutableMap in tests
Summary
Adds PercentileTDigestAggregator for minion merge/rollup tasks, enabling TDigest sketch merging with configurable compression factor.
- New PercentileTDigestAggregator implementing ValueAggregator for byte[] values
- Register PERCENTILETDIGEST and PERCENTILERAWTDIGEST in ValueAggregatorFactory and MinionConstants.AVAILABLE_CORE_VALUE_AGGREGATORS
- Allow compressionFactor in MergeRollupTaskGenerator validation

How to Configure

```json
{
  "MergeRollupTask": {
    "daily.mergeType": "rollup",
    "daily.bucketTimePeriod": "1d",
    "daily.bufferTimePeriod": "1d",
    "<column_name>.aggregationType": "percentiletdigest",
    "daily.aggregationFunctionParameters.<column_name>.compressionFactor": "100"
  }
}
```

- The column must be of BYTES type in the schema
- compressionFactor is optional (defaults to 100)

Test plan
- PercentileTDigestAggregatorTest: default and custom compression
- MergeRollupTaskGeneratorTest: valid and invalid compressionFactor
- MergeRollupTDigestTaskExecutorTest