Add V2 batch format with statistics collection#2886
Add V2 batch format with statistics collection#2886platinumhamburg wants to merge 1 commit intoapache:mainfrom
Conversation
af26717 to
836948c
Compare
There was a problem hiding this comment.
Pull request overview
This PR introduces a new V2 Arrow log record batch format that can embed per-column min/max + null-count statistics between the batch header and record payload, enabling future filter pushdown improvements (Issue #2885 / FIP-10).
Changes:
- Add statistics collection/serialization/parsing APIs and implementations (collector/writer/parser + batch-access API).
- Extend Arrow log batch building, reading, and projection logic to account for a V2 layout with an optional statistics section and a
StatisticsLengthheader field. - Add table-level configuration (
table.statistics.columns) utilities and validation, and wire statistics collection into the client write path.
Reviewed changes
Copilot reviewed 32 out of 32 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-server/src/main/java/org/apache/fluss/server/kv/wal/ArrowWalBuilder.java | Adapts builder call sites to new statistics-capable Arrow batch builder signature. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java | Validates table statistics configuration on CREATE TABLE. |
| fluss-common/src/test/java/org/apache/fluss/record/TestData.java | Adds schemas/data for statistics-related tests. |
| fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java | Extends batch builder tests to cover V2 + statistics. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsTestUtils.java | Adds reusable utilities for generating batches with statistics in tests. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsTest.java | Adds end-to-end tests for statistics extraction/caching across batch impls. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsParserTest.java | Adds tests for statistics parsing/validation helpers. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsCollectorTest.java | Adds tests for collector behavior across types/nulls/mappings. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchFormatTest.java | Adds V2 header/offset assertions and stats-aware offsets. |
| fluss-common/src/test/java/org/apache/fluss/config/StatisticsConfigUtilsTest.java | Adds tests for statistics config validation. |
| fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java | Adds isBinaryType helper used by stats config/mapping logic. |
| fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java | Exposes RowType schema via getSchema() for stats wiring. |
| fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java | Adds AlignedRow.from(RowType, InternalRow) conversion helper. |
| fluss-common/src/main/java/org/apache/fluss/record/bytesview/MultiBytesView.java | Enhances builder API (addBytes(BytesView), isEmpty) and improves file-region merging. |
| fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java | Implements V2 batch building with optional embedded statistics + CRC handling changes. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatisticsWriter.java | New: serializes statistics in a compact schema-aware format. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatisticsParser.java | New: parses/validates statistics payloads from multiple memory sources. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatisticsCollector.java | New: collects per-column min/max/null-counts during batch build. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatistics.java | New: statistics interface exposed from LogRecordBatch. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchFormat.java | Adds V2 constants/layout helpers, statistics offsets, and header mutation helper. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java | Adds getStatistics(ReadContext) API to batches. |
| fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java | Updates projection to skip stats section for V2 and clear stats header fields. |
| fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java | Adds stats parsing and provides optional “trim stats” BytesView generation for V2. |
| fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatchStatistics.java | New: zero-copy statistics view with full-schema wrappers + mapping logic. |
| fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java | Updates record decoding offsets for V2 (stats-aware) and implements statistics parsing/caching. |
| fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java | Computes and caches stats column index mappings based on table config. |
| fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java | Adds statistics config accessors and enablement checks. |
| fluss-common/src/main/java/org/apache/fluss/config/StatisticsConfigUtils.java | New: validates table.statistics.columns against schema + supported types. |
| fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java | Adds table.statistics.columns option definition/documentation. |
| fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java | Updates tests for ArrowLogWriteBatch constructor signature. |
| fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java | Wires per-table statistics collection into batch creation when enabled. |
| fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java | Passes statistics collector into Arrow batch builder. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatchStatistics.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java
Show resolved
Hide resolved
fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java
Outdated
Show resolved
Hide resolved
836948c to
51dd84c
Compare
Introduce V2 batch format that collects min/max statistics for each column to enable efficient filtering. - Add LogRecordBatchStatistics and related classes for statistics collection - Add StatisticsConfigUtils for parsing table.statistics.columns configuration - Extend DefaultLogRecordBatch to support V2 format with statistics - Place statistics data between header and records with StatisticsLength field - Add comprehensive tests for statistics collection and parsing
51dd84c to
e595708
Compare
wuchong
left a comment
There was a problem hiding this comment.
Thanks @platinumhamburg , I left some comments.
| * | ||
| * @return true if configured with "*" (collect all non-binary columns), false otherwise | ||
| */ | ||
| public boolean isCollectAllNonBinaryColumns() { |
| .map(String::trim) | ||
| .filter(s -> !s.isEmpty()) | ||
| .collect(Collectors.toList()); | ||
| return Optional.of(columns); |
There was a problem hiding this comment.
nit: Besides, the logic seems is a little duplicated with StatisticsConfigUtils. Maybe we can reuse some code.
| public static final ConfigOption<String> TABLE_STATISTICS_COLUMNS = | ||
| key("table.statistics.columns") | ||
| .stringType() | ||
| .defaultValue("*") |
There was a problem hiding this comment.
We should use noDefaultValue() or '' as default value. Forcing all columns to collect stats is a big overhead and introduces regression to users.
| public static void validateStatisticsConfig(TableDescriptor tableDescriptor) { | ||
| Map<String, String> properties = tableDescriptor.getProperties(); | ||
| String statisticsColumns = | ||
| properties.getOrDefault(ConfigOptions.TABLE_STATISTICS_COLUMNS.key(), "*"); |
There was a problem hiding this comment.
Default should be null or empty.
| return new MemoryLogRecordsArrowBuilder( | ||
| BUILDER_DEFAULT_OFFSET, | ||
| schemaId, | ||
| CURRENT_LOG_MAGIC_VALUE, |
There was a problem hiding this comment.
- mark the
statisticsCollectorparameter as@Nullable. - we should use version V0 if statisticsCollector is null, and use V1 if is not null. Otherwise, we always use the V0 (
CURRENT_LOG_MAGIC_VALUEis V0), and the stats is not serialized.
| AbstractPagedOutputView outputView, | ||
| long createdMs) { | ||
| long createdMs, | ||
| LogRecordBatchStatisticsCollector statisticsCollector) { |
| * @param bytesView the bytes view to add | ||
| * @return this builder instance for method chaining | ||
| */ | ||
| public Builder addBytes(BytesView bytesView) { |
| /** Gets whether statistics collection is enabled for the table. */ | ||
| public boolean isStatisticsEnabled() { | ||
| String columnsStr = config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS); | ||
| return !columnsStr.isEmpty(); |
There was a problem hiding this comment.
Should be return columnsStr != null && !columnsStr.isEmpty(); if we making TABLE_STATISTICS_COLUMNS defaults to null.
| + "The value '*' (default) means collect statistics for all non-binary columns. " | ||
| + "Comma-separated list of column names means collect statistics only for the specified columns. " | ||
| + "Binary and bytes columns are not supported for statistics collection. " | ||
| + "Example: 'id,name,timestamp' to collect statistics only for specified columns."); |
There was a problem hiding this comment.
Could you also update the documentation in Markdown? Additionally, we need to address compatibility concerns. As I understand it, enabling column statistics requires a higher format version, which mandates that downstream jobs be upgraded to the latest version first (Fluss v1.0 or later), otherwise the consumer jobs will fail when parsing V1/V2 format logs. Please ensure this requirement is clearly stated in the documentation.
Introduce V2 batch format that collects min/max statistics for each column to enable efficient filtering.
Purpose
Linked issue: close #2885
Brief change log
Tests
API and Format
Documentation