Skip to content

Commit e595708

Browse files
Add V2 batch format with statistics collection
Introduce a V2 batch format that collects min/max statistics for each column to enable efficient filtering.

- Add LogRecordBatchStatistics and related classes for statistics collection
- Add StatisticsConfigUtils for parsing table.statistics.columns configuration
- Extend DefaultLogRecordBatch to support V2 format with statistics
- Place statistics data between header and records with StatisticsLength field
- Add comprehensive tests for statistics collection and parsing
1 parent 0f4d865 commit e595708

32 files changed

Lines changed: 4319 additions & 66 deletions

fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.memory.MemorySegment;
2424
import org.apache.fluss.metadata.PhysicalTablePath;
2525
import org.apache.fluss.record.ChangeType;
26+
import org.apache.fluss.record.LogRecordBatchStatisticsCollector;
2627
import org.apache.fluss.record.MemoryLogRecordsArrowBuilder;
2728
import org.apache.fluss.record.bytesview.BytesView;
2829
import org.apache.fluss.row.InternalRow;
@@ -55,11 +56,13 @@ public ArrowLogWriteBatch(
5556
int schemaId,
5657
ArrowWriter arrowWriter,
5758
AbstractPagedOutputView outputView,
58-
long createdMs) {
59+
long createdMs,
60+
LogRecordBatchStatisticsCollector statisticsCollector) {
5961
super(bucketId, physicalTablePath, createdMs);
6062
this.outputView = outputView;
6163
this.recordsBuilder =
62-
MemoryLogRecordsArrowBuilder.builder(schemaId, arrowWriter, outputView, true);
64+
MemoryLogRecordsArrowBuilder.builder(
65+
schemaId, arrowWriter, outputView, true, statisticsCollector);
6366
}
6467

6568
@Override

fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.fluss.metadata.TableBucket;
3333
import org.apache.fluss.metadata.TableInfo;
3434
import org.apache.fluss.metrics.MetricNames;
35+
import org.apache.fluss.record.LogRecordBatchStatisticsCollector;
3536
import org.apache.fluss.row.arrow.ArrowWriter;
3637
import org.apache.fluss.row.arrow.ArrowWriterPool;
3738
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
@@ -628,13 +629,20 @@ private WriteBatch createWriteBatch(
628629
outputView.getPreAllocatedSize(),
629630
tableInfo.getRowType(),
630631
tableInfo.getTableConfig().getArrowCompressionInfo());
632+
LogRecordBatchStatisticsCollector statisticsCollector = null;
633+
if (tableInfo.isStatisticsEnabled()) {
634+
statisticsCollector =
635+
new LogRecordBatchStatisticsCollector(
636+
tableInfo.getRowType(), tableInfo.getStatsIndexMapping());
637+
}
631638
return new ArrowLogWriteBatch(
632639
bucketId,
633640
physicalTablePath,
634641
tableInfo.getSchemaId(),
635642
arrowWriter,
636643
outputView,
637-
clock.milliseconds());
644+
clock.milliseconds(),
645+
statisticsCollector);
638646

639647
case COMPACTED_LOG:
640648
return new CompactedLogWriteBatch(

fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ void testAppendWithPreAllocatedMemorySegments() throws Exception {
136136
DATA1_ROW_TYPE,
137137
DEFAULT_COMPRESSION),
138138
new PreAllocatedPagedOutputView(memorySegmentList),
139-
System.currentTimeMillis());
139+
System.currentTimeMillis(),
140+
null);
140141
assertThat(arrowLogWriteBatch.pooledMemorySegments()).isEqualTo(memorySegmentList);
141142

142143
int count = 0;
@@ -210,7 +211,8 @@ void testArrowCompressionRatioEstimated() throws Exception {
210211
DATA1_TABLE_INFO.getSchemaId(),
211212
arrowWriter,
212213
new PreAllocatedPagedOutputView(memorySegmentList),
213-
System.currentTimeMillis());
214+
System.currentTimeMillis(),
215+
null);
214216

215217
int recordCount = 0;
216218
while (arrowLogWriteBatch.tryAppend(
@@ -310,7 +312,8 @@ private ArrowLogWriteBatch createArrowLogWriteBatch(TableBucket tb, int maxSizeI
310312
DATA1_ROW_TYPE,
311313
DEFAULT_COMPRESSION),
312314
new UnmanagedPagedOutputView(128),
313-
System.currentTimeMillis());
315+
System.currentTimeMillis(),
316+
null);
314317
}
315318

316319
private WriteCallback newWriteCallback() {

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1579,6 +1579,18 @@ public class ConfigOptions {
15791579
+ "This mode reduces storage and transmission costs but loses the ability to track previous values. "
15801580
+ "This option only affects primary key tables.");
15811581

1582+
/**
 * Table option {@code table.statistics.columns}: selects the columns for which statistics are
 * collected.
 *
 * <p>Per the option description: an empty string disables collection, {@code *} (the default)
 * selects every non-binary column, and any other value is read as a comma-separated list of
 * column names.
 */
public static final ConfigOption<String> TABLE_STATISTICS_COLUMNS =
1583+
key("table.statistics.columns")
1584+
.stringType()
1585+
.defaultValue("*")
1586+
.withDescription(
1587+
"Configures statistics collection for the table. "
1588+
+ "Empty string ('') means disable statistics collection completely. "
1589+
+ "The value '*' (default) means collect statistics for all non-binary columns. "
1590+
+ "Comma-separated list of column names means collect statistics only for the specified columns. "
1591+
+ "Binary and bytes columns are not supported for statistics collection. "
1592+
+ "Example: 'id,name,timestamp' to collect statistics only for specified columns.");
1593+
15821594
// ------------------------------------------------------------------------
15831595
// ConfigOptions for Kv
15841596
// ------------------------------------------------------------------------
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.config;
19+
20+
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.exception.InvalidConfigException;
22+
import org.apache.fluss.metadata.TableDescriptor;
23+
import org.apache.fluss.types.DataField;
24+
import org.apache.fluss.types.DataType;
25+
import org.apache.fluss.types.DataTypeChecks;
26+
import org.apache.fluss.types.RowType;
27+
28+
import java.util.Arrays;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.stream.Collectors;
32+
33+
/**
34+
* Utility class for validating table statistics configuration.
35+
*
36+
* <p>This provides simple validation methods that can be called during CREATE TABLE operations to
37+
* ensure statistics configuration is valid and compatible with the table schema.
38+
*/
39+
@Internal
40+
public class StatisticsConfigUtils {
41+
42+
private StatisticsConfigUtils() {}
43+
44+
/**
45+
* Validates statistics configuration for a table descriptor.
46+
*
47+
* @param tableDescriptor the table descriptor to validate
48+
* @throws InvalidConfigException if the statistics configuration is invalid
49+
*/
50+
public static void validateStatisticsConfig(TableDescriptor tableDescriptor) {
51+
Map<String, String> properties = tableDescriptor.getProperties();
52+
String statisticsColumns =
53+
properties.getOrDefault(ConfigOptions.TABLE_STATISTICS_COLUMNS.key(), "*");
54+
55+
// Empty string means statistics disabled - no validation needed
56+
if (statisticsColumns.isEmpty()) {
57+
return;
58+
}
59+
60+
RowType rowType = tableDescriptor.getSchema().getRowType();
61+
62+
// Wildcard means all non-binary columns - no validation needed
63+
if ("*".equals(statisticsColumns.trim())) {
64+
return;
65+
}
66+
67+
// Parse and validate specific column names
68+
List<String> columnNames = parseColumnNames(statisticsColumns);
69+
if (columnNames.isEmpty()) {
70+
throw new InvalidConfigException(
71+
"Statistics columns configuration cannot be empty. "
72+
+ "Use '*' to collect statistics for all non-binary columns, "
73+
+ "or use empty string '' to disable statistics collection.");
74+
}
75+
76+
validateColumns(rowType, columnNames);
77+
}
78+
79+
/**
80+
* Parses comma-separated column names from the configuration string.
81+
*
82+
* @param columnsConfig the configuration string
83+
* @return list of parsed column names
84+
*/
85+
private static List<String> parseColumnNames(String columnsConfig) {
86+
return Arrays.stream(columnsConfig.split(","))
87+
.map(String::trim)
88+
.filter(s -> !s.isEmpty())
89+
.collect(Collectors.toList());
90+
}
91+
92+
/**
93+
* Validates that the specified columns exist in the schema and are of supported types.
94+
*
95+
* @param rowType the table schema
96+
* @param statisticsColumns the list of column names to validate
97+
* @throws InvalidConfigException if validation fails
98+
*/
99+
private static void validateColumns(RowType rowType, List<String> statisticsColumns) {
100+
Map<String, DataType> columnTypeMap = buildColumnTypeMap(rowType);
101+
102+
for (String columnName : statisticsColumns) {
103+
// Check if column exists
104+
if (!columnTypeMap.containsKey(columnName)) {
105+
throw new InvalidConfigException(
106+
String.format(
107+
"Column '%s' specified in statistics collection does not exist in table schema",
108+
columnName));
109+
}
110+
111+
// Check if column type is supported
112+
DataType dataType = columnTypeMap.get(columnName);
113+
if (DataTypeChecks.isBinaryType(dataType)) {
114+
throw new InvalidConfigException(
115+
String.format(
116+
"Binary column '%s' cannot be included in statistics collection. "
117+
+ "Binary and bytes columns are not supported for statistics collection.",
118+
columnName));
119+
}
120+
}
121+
}
122+
123+
/**
124+
* Builds a map from column name to data type for quick lookup.
125+
*
126+
* @param rowType the table schema
127+
* @return map of column name to data type
128+
*/
129+
private static Map<String, DataType> buildColumnTypeMap(RowType rowType) {
130+
return rowType.getFields().stream()
131+
.collect(Collectors.toMap(DataField::getName, DataField::getType));
132+
}
133+
}

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@
2828
import org.apache.fluss.utils.AutoPartitionStrategy;
2929

3030
import java.time.Duration;
31+
import java.util.Arrays;
32+
import java.util.List;
3133
import java.util.Optional;
34+
import java.util.stream.Collectors;
3235

3336
/**
3437
* Helper class to get table configs (prefixed with "table.*" properties).
@@ -154,4 +157,42 @@ public AutoPartitionStrategy getAutoPartitionStrategy() {
154157
public long getAutoIncrementCacheSize() {
155158
return config.get(ConfigOptions.TABLE_AUTO_INCREMENT_CACHE_SIZE);
156159
}
160+
161+
/** Gets whether statistics collection is enabled for the table. */
162+
public boolean isStatisticsEnabled() {
163+
String columnsStr = config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS);
164+
return !columnsStr.isEmpty();
165+
}
166+
167+
/**
168+
* Gets the statistics columns configuration of the table.
169+
*
170+
* @return Optional containing the list of column names if specific columns are configured,
171+
* empty if all non-binary columns should be collected ("*" configuration), null if
172+
* statistics collection is disabled (empty string configuration)
173+
*/
174+
public Optional<List<String>> getStatisticsColumns() {
175+
String columnsStr = config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS);
176+
if (columnsStr.isEmpty()) {
177+
return null; // null means statistics collection is disabled
178+
}
179+
if ("*".equals(columnsStr)) {
180+
return Optional.empty(); // Empty means collect all non-binary columns
181+
}
182+
List<String> columns =
183+
Arrays.stream(columnsStr.split(","))
184+
.map(String::trim)
185+
.filter(s -> !s.isEmpty())
186+
.collect(Collectors.toList());
187+
return Optional.of(columns);
188+
}
189+
190+
/**
191+
* Checks whether the table is configured to collect statistics for all non-binary columns.
192+
*
193+
* @return true if configured with "*" (collect all non-binary columns), false otherwise
194+
*/
195+
public boolean isCollectAllNonBinaryColumns() {
196+
return "*".equals(config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS));
197+
}
157198
}

0 commit comments

Comments
 (0)