From 2942dfe79842aa9d35ce4b68ddc9add3b19476a2 Mon Sep 17 00:00:00 2001 From: mingfeng Date: Wed, 7 Jan 2026 10:40:39 +0800 Subject: [PATCH 1/5] [wip][core]PartitionStatistics support bucket count --- docs/static/rest-catalog-open-api.yaml | 6 +++ .../apache/paimon/partition/Partition.java | 7 ++- .../paimon/partition/PartitionStatistics.java | 21 ++++++-- .../paimon/partition/PartitionTest.java | 4 ++ .../org/apache/paimon/catalog/Catalog.java | 1 + .../paimon/manifest/PartitionEntry.java | 48 +++++++++++++++---- .../operation/commit/CommitScanner.java | 13 +++++ .../paimon/table/format/FormatTableScan.java | 2 +- .../paimon/table/system/PartitionsTable.java | 3 +- .../utils/PartitionStatisticsReporter.java | 17 ++++++- .../partition/PartitionExpireTableTest.java | 2 +- .../apache/paimon/rest/MockRESTMessage.java | 2 +- .../apache/paimon/rest/RESTCatalogServer.java | 2 + .../paimon/flink/RESTCatalogITCase.java | 48 +++++++++++++++++++ .../org/apache/paimon/hive/HiveCatalog.java | 6 +++ .../apache/paimon/hive/HiveCatalogTest.java | 3 +- 16 files changed, 166 insertions(+), 19 deletions(-) diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml index fcf5c53d11a0..47585a1f1381 100644 --- a/docs/static/rest-catalog-open-api.yaml +++ b/docs/static/rest-catalog-open-api.yaml @@ -3252,6 +3252,9 @@ components: lastFileCreationTime: type: integer format: int64 + bucketCount: + type: integer + format: int32 done: type: boolean createdAt: @@ -3283,6 +3286,9 @@ components: type: integer format: int64 lastFileCreationTime: + type: integer + format: int32 + bucketCount: type: integer format: int64 ####################################### diff --git a/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java b/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java index 427b61464f4d..85f1e945c982 100644 --- a/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java +++ b/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java @@ -81,13 +81,14 @@ public Partition( @JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes, @JsonProperty(FIELD_FILE_COUNT) long fileCount, @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long lastFileCreationTime, + @JsonProperty(FIELD_BUCKET_COUNT) int bucketCount, @JsonProperty(FIELD_DONE) boolean done, @JsonProperty(FIELD_CREATED_AT) @Nullable Long createdAt, @JsonProperty(FIELD_CREATED_BY) @Nullable String createdBy, @JsonProperty(FIELD_UPDATED_AT) @Nullable Long updatedAt, @JsonProperty(FIELD_UPDATED_BY) @Nullable String updatedBy, @JsonProperty(FIELD_OPTIONS) @Nullable Map options) { - super(spec, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime); + super(spec, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime, bucketCount); this.done = done; this.createdAt = createdAt; this.createdBy = createdBy; @@ -102,6 +103,7 @@ public Partition( long fileSizeInBytes, long fileCount, long lastFileCreationTime, + int bucketCount, boolean done) { this( spec, @@ -109,6 +111,7 @@ public Partition( fileSizeInBytes, fileCount, lastFileCreationTime, + bucketCount, done, null, null, @@ -188,6 +191,8 @@ public String toString() { + fileCount + ", lastFileCreationTime=" + lastFileCreationTime + + ", bucketCount=" + + bucketCount + ", done=" + done + ", createdAt=" diff --git a/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java b/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java index 44b61c19d5f5..40d754601227 100644 --- a/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java +++ b/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java @@ -44,6 +44,7 @@ public class PartitionStatistics implements Serializable { public static final String FIELD_FILE_SIZE_IN_BYTES = "fileSizeInBytes"; public static final String FIELD_FILE_COUNT = "fileCount"; public static final String FIELD_LAST_FILE_CREATION_TIME = "lastFileCreationTime"; + public static final String FIELD_BUCKET_COUNT = "bucketCount"; @JsonProperty(FIELD_SPEC) protected final Map spec; @@ -60,18 +61,23 @@ public class PartitionStatistics implements Serializable { @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) protected final long lastFileCreationTime; + @JsonProperty(FIELD_BUCKET_COUNT) + protected final int bucketCount; + @JsonCreator public PartitionStatistics( @JsonProperty(FIELD_SPEC) Map spec, @JsonProperty(FIELD_RECORD_COUNT) long recordCount, @JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes, @JsonProperty(FIELD_FILE_COUNT) long fileCount, - @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long lastFileCreationTime) { + @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long lastFileCreationTime, + @JsonProperty(FIELD_BUCKET_COUNT) int bucketCount) { this.spec = spec; this.recordCount = recordCount; this.fileSizeInBytes = fileSizeInBytes; this.fileCount = fileCount; this.lastFileCreationTime = lastFileCreationTime; + this.bucketCount = bucketCount; } @JsonGetter(FIELD_SPEC) @@ -99,6 +105,11 @@ public long lastFileCreationTime() { return lastFileCreationTime; } + @JsonGetter(FIELD_BUCKET_COUNT) + public int bucketCount() { + return bucketCount; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -112,12 +123,14 @@ public boolean equals(Object o) { && fileSizeInBytes == that.fileSizeInBytes && fileCount == that.fileCount && lastFileCreationTime == that.lastFileCreationTime - && Objects.equals(spec, that.spec); + && Objects.equals(spec, that.spec) + && Objects.equals(bucketCount, that.bucketCount); } @Override public int hashCode() { - return Objects.hash(spec, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime); + return Objects.hash( + spec, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime, bucketCount); } @Override @@ -133,6 +146,8 @@ public String toString() { + fileCount + ", lastFileCreationTime=" + lastFileCreationTime + + ", bucketCount=" + + bucketCount + '}'; } } diff --git a/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java b/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java index 7ec342d38deb..5d2cfabe07b6 100644 --- a/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java +++ b/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java @@ -40,6 +40,7 @@ void testJsonSerializationWithNullValues() { 1024L, // fileSizeInBytes 2L, // fileCount System.currentTimeMillis(), // lastFileCreationTime + 10, // bucketCount false, // done null, // createdAt null, // createdBy @@ -57,6 +58,7 @@ void testJsonSerializationWithNullValues() { assertThat(json).contains("done"); assertThat(json).contains("recordCount"); + assertThat(json).contains("bucketCount"); } @Test @@ -69,6 +71,7 @@ void testJsonSerializationWithNonNullValues() { 1024L, 2L, System.currentTimeMillis(), + 10, // bucketCount true, 1234567890L, // createdAt "user1", // createdBy @@ -78,6 +81,7 @@ void testJsonSerializationWithNonNullValues() { String json = JsonSerdeUtil.toFlatJson(partition); + assertThat(json).contains("bucketCount"); assertThat(json).contains("createdAt"); assertThat(json).contains("createdBy"); assertThat(json).contains("updatedAt"); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 74e35dde3dac..ad47b8555ed2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -1073,6 +1073,7 @@ List authTableQuery(Identifier identifier, @Nullable List select String NUM_FILES_PROP = "numFiles"; String TOTAL_SIZE_PROP = "totalSize"; String LAST_UPDATE_TIME_PROP = "lastUpdateTime"; + String BUCKET_COUNT = "bucketCount"; // ======================= Exceptions =============================== diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java index d3417bb71c33..2cdb52d269bb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; import static org.apache.paimon.manifest.FileKind.ADD; import static org.apache.paimon.manifest.FileKind.DELETE; @@ -43,18 +44,21 @@ public class PartitionEntry { private final long fileSizeInBytes; private final long fileCount; private final long lastFileCreationTime; + private final int bucketCount; public PartitionEntry( BinaryRow partition, long recordCount, long fileSizeInBytes, long fileCount, - long lastFileCreationTime) { + long lastFileCreationTime, + int bucketCount) { this.partition = partition; this.recordCount = recordCount; this.fileSizeInBytes = fileSizeInBytes; this.fileCount = fileCount; this.lastFileCreationTime = lastFileCreationTime; + this.bucketCount = bucketCount; } public BinaryRow partition() { @@ -83,7 +87,8 @@ public PartitionEntry merge(PartitionEntry entry) { recordCount + entry.recordCount, fileSizeInBytes + entry.fileSizeInBytes, fileCount + entry.fileCount, - Math.max(lastFileCreationTime, entry.lastFileCreationTime)); + Math.max(lastFileCreationTime, entry.lastFileCreationTime), + Math.max(bucketCount, entry.bucketCount)); } public Partition toPartition(InternalRowPartitionComputer computer) { @@ -93,6 +98,7 @@ public Partition toPartition(InternalRowPartitionComputer computer) { fileSizeInBytes, fileCount, lastFileCreationTime, + bucketCount, false); } @@ -102,15 +108,27 @@ public PartitionStatistics toPartitionStatistics(InternalRowPartitionComputer co recordCount, fileSizeInBytes, fileCount, - lastFileCreationTime); + lastFileCreationTime, + bucketCount); + } + + public PartitionStatistics toPartitionStatistics( + InternalRowPartitionComputer computer, Integer dynamicBucketCount) { + return new PartitionStatistics( + computer.generatePartValues(partition), + recordCount, + fileSizeInBytes, + fileCount, + lastFileCreationTime, + dynamicBucketCount); } public static PartitionEntry fromManifestEntry(ManifestEntry entry) { - return fromDataFile(entry.partition(), entry.kind(), entry.file()); + return fromDataFile(entry.partition(), entry.kind(), entry.file(), entry.totalBuckets()); } public static PartitionEntry fromDataFile( - BinaryRow partition, FileKind kind, DataFileMeta file) { + BinaryRow partition, FileKind kind, DataFileMeta file, Integer bucketCount) { long recordCount = file.rowCount(); long fileSizeInBytes = file.fileSize(); long fileCount = 1; @@ -120,7 +138,12 @@ public static PartitionEntry fromDataFile( fileCount = -fileCount; } return new PartitionEntry( - partition, recordCount, fileSizeInBytes, fileCount, file.creationTimeEpochMillis()); + partition, + recordCount, + fileSizeInBytes, + fileCount, + file.creationTimeEpochMillis(), + Optional.ofNullable(bucketCount).orElse(0)); } public static Collection merge(Collection fileEntries) { @@ -139,7 +162,8 @@ public static Collection mergeSplits(Collection split for (DataSplit split : splits) { BinaryRow partition = split.partition(); for (DataFileMeta file : split.dataFiles()) { - PartitionEntry partitionEntry = fromDataFile(partition, ADD, file); + PartitionEntry partitionEntry = + fromDataFile(partition, ADD, file, split.totalBuckets()); partitions.compute( partition, (part, old) -> old == null ? partitionEntry : old.merge(partitionEntry)); @@ -170,12 +194,18 @@ public boolean equals(Object o) { && fileSizeInBytes == that.fileSizeInBytes && fileCount == that.fileCount && lastFileCreationTime == that.lastFileCreationTime - && Objects.equals(partition, that.partition); + && Objects.equals(partition, that.partition) + && Objects.equals(bucketCount, that.bucketCount); } @Override public int hashCode() { return Objects.hash( - partition, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime); + partition, + recordCount, + fileSizeInBytes, + fileCount, + lastFileCreationTime, + bucketCount); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java index da8b5c7563c9..77d63564c26e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.manifest.BucketEntry; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.IndexManifestFile; @@ -34,7 +35,9 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static java.util.Collections.emptyList; @@ -122,4 +125,14 @@ public CommitChanges readOverwriteChanges( indexChangesWithOverwrite.addAll(indexFiles); return new CommitChanges(changesWithOverwrite, emptyList(), indexChangesWithOverwrite); } + + public Map readBucketCounts(Snapshot snapshot, List partitions) { + List bucketEntries = + scan.withSnapshot(snapshot).withPartitionFilter(partitions).readBucketEntries(); + Map result = new HashMap<>(); + for (BucketEntry entry : bucketEntries) { + result.compute(entry.partition(), (k, v) -> (v == null ? 0 : v) + 1); + } + return result; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java index 20cbb9b31d50..91040a19cd2b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java @@ -107,7 +107,7 @@ public List listPartitionEntries() { List partitionEntries = new ArrayList<>(); for (Pair, Path> partition2Path : partition2Paths) { BinaryRow row = toPartitionRow(partition2Path.getKey()); - partitionEntries.add(new PartitionEntry(row, -1L, -1L, -1L, -1L)); + partitionEntries.add(new PartitionEntry(row, -1L, -1L, -1L, -1L, -1)); } return partitionEntries; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java index d3314edfa3a7..0bdeb3635bcc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java @@ -301,7 +301,8 @@ private PartitionEntry toPartitionEntry(Partition partition) { partition.recordCount(), partition.fileSizeInBytes(), partition.fileCount(), - partition.lastFileCreationTime()); + partition.lastFileCreationTime(), + partition.bucketCount()); } private Timestamp toTimestamp(Long epochMillis) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java index c27e1768fd1f..4c2fde36ffc3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java @@ -37,6 +37,7 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; +import java.util.Optional; import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath; @@ -73,6 +74,7 @@ public void report(String partition, long modifyTimeMillis) throws Exception { long rowCount = 0; long totalSize = 0; long fileCount = 0; + int bucketCount = 0; for (DataSplit split : splits) { List fileMetas = split.dataFiles(); fileCount += fileMetas.size(); @@ -80,11 +82,24 @@ public void report(String partition, long modifyTimeMillis) throws Exception { rowCount += fileMeta.rowCount(); totalSize += fileMeta.fileSize(); } + Integer splitTotalBuckets = split.totalBuckets(); + if (splitTotalBuckets != null) { + bucketCount = Math.max(bucketCount, splitTotalBuckets); + } + bucketCount = + Math.max( + bucketCount, + Optional.ofNullable(split.totalBuckets()).orElse(bucketCount)); } PartitionStatistics partitionStats = new PartitionStatistics( - partitionSpec, rowCount, totalSize, fileCount, modifyTimeMillis); + partitionSpec, + rowCount, + totalSize, + fileCount, + modifyTimeMillis, + bucketCount); LOG.info("alter partition {} with statistic {}.", partitionSpec, partitionStats); partitionHandler.alterPartitions(Collections.singletonList(partitionStats)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java index b78506a825fa..47ba63d9fd92 100644 --- a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java @@ -51,7 +51,7 @@ public void testCustomExpire() throws Exception { Table table = catalog.getTable(identifier()); String path = table.options().get("path"); - PartitionEntry expire = new PartitionEntry(BinaryRow.singleColumn(1), 1, 1, 1, 1); + PartitionEntry expire = new PartitionEntry(BinaryRow.singleColumn(1), 1, 1, 1, 1, 1); TABLE_EXPIRE_PARTITIONS.put(path, Collections.singletonList(expire)); write(table, GenericRow.of(1, 1)); write(table, GenericRow.of(2, 2)); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java index f487815e4d61..597202714325 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java @@ -150,7 +150,7 @@ public static AlterTableRequest alterTableRequest() { public static ListPartitionsResponse listPartitionsResponse() { Map spec = new HashMap<>(); spec.put("f0", "1"); - Partition partition = new Partition(spec, 1, 1, 1, 1, false); + Partition partition = new Partition(spec, 1, 1, 1, 1, 1, false); return new ListPartitionsResponse(ImmutableList.of(partition)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 544b4903850c..67aa6594dcc6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -2405,6 +2405,7 @@ private MockResponse commitSnapshot( .lastFileCreationTime(), stats .lastFileCreationTime()), + stats.bucketCount(), oldPartition.done(), oldPartition.createdAt(), oldPartition.createdBy(), @@ -2630,6 +2631,7 @@ private Partition toPartition(PartitionStatistics stats) { stats.fileSizeInBytes(), stats.fileCount(), stats.lastFileCreationTime(), + stats.bucketCount(), false, System.currentTimeMillis(), "created", diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java index d79aa713c9dd..d413d41eb774 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java @@ -151,4 +151,52 @@ public void testFunction() throws Exception { sql(String.format("DROP FUNCTION %s.%s", DATABASE_NAME, functionName)); assertThat(catalog.functionExists(functionObjectPath)).isFalse(); } + + @Test + public void testBucketCountStatistics() throws Exception { + String fixedBucketTableName = "fixed_bucket_tbl"; + sql( + String.format( + "CREATE TABLE %s.%s (a INT, b INT, p INT) PARTITIONED BY (p) WITH ('bucket'='2', 'bucket-key'='a')", + DATABASE_NAME, fixedBucketTableName)); + sql( + String.format( + "INSERT INTO %s.%s VALUES (1, 10, 1), (2, 20, 1)", + DATABASE_NAME, fixedBucketTableName)); + validateBucketCount(DATABASE_NAME, fixedBucketTableName, 2); + + String dynamicBucketTableName = "dynamic_bucket_tbl"; + // enable metastore.partitioned-table to trigger + // org.apache.paimon.rest.RESTCatalogServer.partitionsApiHandle + sql( + String.format( + "CREATE TABLE %s.%s (a INT, b INT, p INT) PARTITIONED BY (p) WITH ('bucket'='-1', 'metastore.partitioned-table' = 'true')", + DATABASE_NAME, dynamicBucketTableName)); + sql( + String.format( + "INSERT INTO %s.dynamic_bucket_tbl VALUES (1, 10, 1), (2, 20, 1)", + DATABASE_NAME)); + validateBucketCount(DATABASE_NAME, "dynamic_bucket_tbl", 1); + + String postponeBucketTableName = "postpone_bucket_tbl"; + sql( + String.format( + "CREATE TABLE %s.%s (a INT, b INT, p INT, PRIMARY KEY (p, a) NOT ENFORCED) PARTITIONED BY (p) WITH ('bucket'='-2')", + DATABASE_NAME, postponeBucketTableName)); + sql( + String.format( + "INSERT INTO %s.%s VALUES (1, 10, 1), (2, 20, 1)", + DATABASE_NAME, postponeBucketTableName)); + validateBucketCount(DATABASE_NAME, postponeBucketTableName, 1); + } + + private void validateBucketCount( + String databaseName, String tableName, Integer expectedBucketCount) throws Exception { + Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); + org.apache.paimon.catalog.Catalog paimonCatalog = ((FlinkCatalog) catalog).catalog(); + List partitions = + paimonCatalog.listPartitions(Identifier.create(databaseName, tableName)); + assertThat(partitions).isNotEmpty(); + assertThat(partitions.get(0).bucketCount()).isEqualTo(expectedBucketCount); + } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index f63cd4846447..c7662f654727 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -479,6 +479,8 @@ && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) { String modifyTimeSeconds = String.valueOf(partition.lastFileCreationTime() / 1000); statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds); + statistic.put(BUCKET_COUNT, String.valueOf(partition.bucketCount())); + // just for being compatible with hive metastore statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds); @@ -558,6 +560,9 @@ public List listPartitions(Identifier ide parameters.getOrDefault( LAST_UPDATE_TIME_PROP, System.currentTimeMillis() + "")); + int bucketCount = + Integer.parseInt( + parameters.getOrDefault(BUCKET_COUNT, "1")); return new org.apache.paimon.partition.Partition( Collections.singletonMap( tagToPartitionField, part.getValues().get(0)), @@ -565,6 +570,7 @@ public List listPartitions(Identifier ide fileSizeInBytes, fileCount, lastFileCreationTime, + bucketCount, false); }) .collect(Collectors.toList()); diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java index 5e196067168d..de04b0c8381b 100644 --- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java +++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java @@ -530,7 +530,7 @@ public void testAlterPartitions() throws Exception { long fileCreationTime = System.currentTimeMillis(); PartitionStatistics partition = new PartitionStatistics( - Collections.singletonMap("dt", "20250101"), 1, 2, 3, fileCreationTime); + Collections.singletonMap("dt", "20250101"), 1, 2, 3, fileCreationTime, 4); catalog.alterPartitions(alterIdentifier, Collections.singletonList(partition)); Partition partitionFromServer = catalog.listPartitions(alterIdentifier).get(0); checkPartition( @@ -540,6 +540,7 @@ public void testAlterPartitions() throws Exception { 2, 3, fileCreationTime, + 4, false), partitionFromServer); From b78690865707cbc5ff6f2b0544e1fe09a88c09f7 Mon Sep 17 00:00:00 2001 From: mingfeng Date: Wed, 7 Jan 2026 15:28:48 +0800 Subject: [PATCH 2/5] fix fail tests --- .../org/apache/paimon/manifest/PartitionEntry.java | 4 ++++ .../utils/PartitionStatisticsReporterTest.java | 2 +- .../org/apache/paimon/flink/RESTCatalogITCase.java | 14 ++++++++------ 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java index 2cdb52d269bb..2d2e1f3b8bb1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java @@ -81,6 +81,10 @@ public long lastFileCreationTime() { return lastFileCreationTime; } + public int bucketCount() { + return bucketCount; + } + public PartitionEntry merge(PartitionEntry entry) { return new PartitionEntry( partition, diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java index 5ffb50e95f5b..e4fad3195a0c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java @@ -135,7 +135,7 @@ public void close() { assertThat(partitionParams).containsKey("c1=a/"); assertThat(partitionParams.get("c1=a/").toString()) .isEqualTo( - "{spec={c1=a}, recordCount=2, fileSizeInBytes=705, fileCount=1, lastFileCreationTime=1729598544974}"); + "{spec={c1=a}, recordCount=2, fileSizeInBytes=705, fileCount=1, lastFileCreationTime=1729598544974, bucketCount=0}"); action.close(); assertThat(closed).isTrue(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java index d413d41eb774..5a623734049c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.partition.Partition; import org.apache.paimon.rest.RESTToken; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; @@ -192,11 +193,12 @@ public void testBucketCountStatistics() throws Exception { private void validateBucketCount( String databaseName, String tableName, Integer expectedBucketCount) throws Exception { - Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); - org.apache.paimon.catalog.Catalog paimonCatalog = ((FlinkCatalog) catalog).catalog(); - List partitions = - paimonCatalog.listPartitions(Identifier.create(databaseName, tableName)); - assertThat(partitions).isNotEmpty(); - assertThat(partitions.get(0).bucketCount()).isEqualTo(expectedBucketCount); + Catalog flinkCatalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); + try (org.apache.paimon.catalog.Catalog catalog = ((FlinkCatalog) flinkCatalog).catalog()) { + List partitions = + catalog.listPartitions(Identifier.create(databaseName, tableName)); + assertThat(partitions).isNotEmpty(); + assertThat(partitions.get(0).bucketCount()).isEqualTo(expectedBucketCount); + } } } From 9e42afcdf40fab44a019537e896b23b8296ad439 Mon Sep 17 00:00:00 2001 From: mingfeng Date: Thu, 8 Jan 2026 08:24:16 +0800 Subject: [PATCH 3/5] use raw totalBuckets --- docs/static/rest-catalog-open-api.yaml | 4 +- .../apache/paimon/partition/Partition.java | 12 +++--- .../paimon/partition/PartitionStatistics.java | 24 +++++------ .../paimon/partition/PartitionTest.java | 8 ++-- .../org/apache/paimon/catalog/Catalog.java | 2 +- .../paimon/manifest/PartitionEntry.java | 41 ++++++++----------- .../operation/commit/CommitScanner.java | 13 ------ .../paimon/table/system/PartitionsTable.java | 2 +- .../utils/PartitionStatisticsReporter.java | 13 ++---- .../apache/paimon/rest/RESTCatalogServer.java | 4 +- .../PartitionStatisticsReporterTest.java | 2 +- .../paimon/flink/RESTCatalogITCase.java | 24 +++++------ .../org/apache/paimon/hive/HiveCatalog.java | 8 ++-- 13 files changed, 64 insertions(+), 93 deletions(-) diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml index 47585a1f1381..a01fede7cfa1 100644 --- a/docs/static/rest-catalog-open-api.yaml +++ b/docs/static/rest-catalog-open-api.yaml @@ -3252,7 +3252,7 @@ components: lastFileCreationTime: type: integer format: int64 - bucketCount: + totalBuckets: type: integer format: int32 done: @@ -3288,7 +3288,7 @@ components: lastFileCreationTime: type: integer format: int32 - bucketCount: + totalBuckets: type: integer format: int64 ####################################### diff --git a/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java b/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java index 85f1e945c982..c9fa4e099752 100644 --- a/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java +++ b/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java @@ -81,14 +81,14 @@ public Partition( @JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes, @JsonProperty(FIELD_FILE_COUNT) long fileCount, @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long lastFileCreationTime, - @JsonProperty(FIELD_BUCKET_COUNT) int bucketCount, + @JsonProperty(FIELD_TOTAL_BUCKETS) int totalBuckets, @JsonProperty(FIELD_DONE) boolean done, @JsonProperty(FIELD_CREATED_AT) @Nullable Long createdAt, @JsonProperty(FIELD_CREATED_BY) @Nullable String createdBy, @JsonProperty(FIELD_UPDATED_AT) @Nullable Long updatedAt, @JsonProperty(FIELD_UPDATED_BY) @Nullable String updatedBy, @JsonProperty(FIELD_OPTIONS) @Nullable Map options) { - super(spec, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime, bucketCount); + super(spec, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime, totalBuckets); this.done = done; this.createdAt = createdAt; this.createdBy = createdBy; @@ -103,7 +103,7 @@ public Partition( long fileSizeInBytes, long fileCount, long lastFileCreationTime, - int bucketCount, + int totalBuckets, boolean done) { this( spec, @@ -111,7 +111,7 @@ public Partition( fileSizeInBytes, fileCount, lastFileCreationTime, - bucketCount, + totalBuckets, done, null, null, @@ -191,8 +191,8 @@ public String toString() { + fileCount + ", lastFileCreationTime=" + lastFileCreationTime - + ", bucketCount=" - + bucketCount + + ", totalBuckets=" + + totalBuckets + ", done=" + done + ", createdAt=" diff --git a/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java b/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java index 40d754601227..02bfc12661f3 100644 --- a/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java +++ b/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java @@ -44,7 +44,7 @@ public class PartitionStatistics implements Serializable { public static final String FIELD_FILE_SIZE_IN_BYTES = "fileSizeInBytes"; public static final String FIELD_FILE_COUNT = "fileCount"; public static final String FIELD_LAST_FILE_CREATION_TIME = "lastFileCreationTime"; - public static final String FIELD_BUCKET_COUNT = "bucketCount"; + public static final String FIELD_TOTAL_BUCKETS = "totalBuckets"; @JsonProperty(FIELD_SPEC) protected final Map spec; @@ -61,8 +61,8 @@ public class PartitionStatistics implements Serializable { @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) protected final long lastFileCreationTime; - @JsonProperty(FIELD_BUCKET_COUNT) - protected final int bucketCount; + @JsonProperty(FIELD_TOTAL_BUCKETS) + protected final int totalBuckets; @JsonCreator public PartitionStatistics( @@ -71,13 +71,13 @@ public PartitionStatistics( @JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes, @JsonProperty(FIELD_FILE_COUNT) long fileCount, @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long lastFileCreationTime, - @JsonProperty(FIELD_BUCKET_COUNT) int bucketCount) { + @JsonProperty(FIELD_TOTAL_BUCKETS) int totalBuckets) { this.spec = spec; this.recordCount = recordCount; this.fileSizeInBytes = fileSizeInBytes; this.fileCount = fileCount; this.lastFileCreationTime = lastFileCreationTime; - this.bucketCount = bucketCount; + this.totalBuckets = totalBuckets; } @JsonGetter(FIELD_SPEC) @@ -105,9 +105,9 @@ public long lastFileCreationTime() { return lastFileCreationTime; } - @JsonGetter(FIELD_BUCKET_COUNT) - public int bucketCount() { - return bucketCount; + @JsonGetter(FIELD_TOTAL_BUCKETS) + public int totalBuckets() { + return totalBuckets; } @Override @@ -124,13 +124,13 @@ public boolean equals(Object o) { && fileCount == that.fileCount && lastFileCreationTime == that.lastFileCreationTime && Objects.equals(spec, that.spec) - && Objects.equals(bucketCount, that.bucketCount); + && Objects.equals(totalBuckets, that.totalBuckets); } @Override public int hashCode() { return Objects.hash( - spec, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime, bucketCount); + spec, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime, totalBuckets); } @Override @@ -146,8 +146,8 @@ public String toString() { + fileCount + ", lastFileCreationTime=" + lastFileCreationTime - + ", bucketCount=" - + bucketCount + + ", totalBuckets=" + + totalBuckets + '}'; } } diff --git a/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java b/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java index 5d2cfabe07b6..b589a75cf1b6 100644 --- a/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java +++ b/paimon-api/src/test/java/org/apache/paimon/partition/PartitionTest.java @@ -40,7 +40,7 @@ void testJsonSerializationWithNullValues() { 1024L, // fileSizeInBytes 2L, // fileCount System.currentTimeMillis(), // lastFileCreationTime - 10, // bucketCount + 10, // totalBuckets false, // done null, // createdAt null, // createdBy @@ -58,7 +58,7 @@ void testJsonSerializationWithNullValues() { assertThat(json).contains("done"); assertThat(json).contains("recordCount"); - assertThat(json).contains("bucketCount"); + assertThat(json).contains("totalBuckets"); } @Test @@ -71,7 +71,7 @@ void testJsonSerializationWithNonNullValues() { 1024L, 2L, System.currentTimeMillis(), - 10, // bucketCount + 10, // totalBuckets true, 1234567890L, // createdAt "user1", // createdBy @@ -81,7 +81,7 @@ void testJsonSerializationWithNonNullValues() { String json = JsonSerdeUtil.toFlatJson(partition); - assertThat(json).contains("bucketCount"); + assertThat(json).contains("totalBuckets"); assertThat(json).contains("createdAt"); assertThat(json).contains("createdBy"); assertThat(json).contains("updatedAt"); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index ad47b8555ed2..f5118fb32c5a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -1073,7 +1073,7 @@ List authTableQuery(Identifier identifier, @Nullable List select String NUM_FILES_PROP = "numFiles"; String TOTAL_SIZE_PROP = "totalSize"; String LAST_UPDATE_TIME_PROP = "lastUpdateTime"; - String BUCKET_COUNT = "bucketCount"; + String TOTAL_BUCKETS = "totalBuckets"; // ======================= Exceptions =============================== diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java index 2d2e1f3b8bb1..6d57c1c4cfba 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java @@ -44,7 +44,7 @@ public class PartitionEntry { private final long fileSizeInBytes; private final long fileCount; private final long lastFileCreationTime; - private final int bucketCount; + private final int totalBuckets; public PartitionEntry( BinaryRow partition, @@ -52,13 +52,13 @@ public PartitionEntry( long fileSizeInBytes, long fileCount, long lastFileCreationTime, - int bucketCount) { + int totalBuckets) { this.partition = partition; this.recordCount = recordCount; this.fileSizeInBytes = fileSizeInBytes; this.fileCount = fileCount; this.lastFileCreationTime = lastFileCreationTime; - this.bucketCount = bucketCount; + this.totalBuckets = totalBuckets; } public BinaryRow partition() { @@ -81,8 +81,8 @@ public long lastFileCreationTime() { return lastFileCreationTime; } - public int bucketCount() { - return bucketCount; + public int totalBuckets() { + return totalBuckets; } public PartitionEntry merge(PartitionEntry entry) { @@ -92,7 +92,7 @@ public PartitionEntry merge(PartitionEntry entry) { fileSizeInBytes + entry.fileSizeInBytes, fileCount + entry.fileCount, Math.max(lastFileCreationTime, entry.lastFileCreationTime), - Math.max(bucketCount, entry.bucketCount)); + entry.totalBuckets); } public Partition toPartition(InternalRowPartitionComputer computer) { @@ -102,7 +102,7 @@ public Partition toPartition(InternalRowPartitionComputer computer) { fileSizeInBytes, fileCount, lastFileCreationTime, - bucketCount, + totalBuckets, false); } @@ -113,18 +113,7 @@ public PartitionStatistics toPartitionStatistics(InternalRowPartitionComputer co fileSizeInBytes, fileCount, lastFileCreationTime, - bucketCount); - } - - public PartitionStatistics toPartitionStatistics( - InternalRowPartitionComputer computer, Integer dynamicBucketCount) { - return new PartitionStatistics( - computer.generatePartValues(partition), - recordCount, - fileSizeInBytes, - fileCount, - lastFileCreationTime, - dynamicBucketCount); + totalBuckets); } public static PartitionEntry fromManifestEntry(ManifestEntry entry) { @@ -132,7 +121,7 @@ public static PartitionEntry fromManifestEntry(ManifestEntry entry) { } public static PartitionEntry fromDataFile( - BinaryRow partition, FileKind kind, DataFileMeta file, Integer bucketCount) { + BinaryRow partition, FileKind kind, DataFileMeta file, int totalBuckets) { long recordCount = file.rowCount(); long fileSizeInBytes = file.fileSize(); long fileCount = 1; @@ -147,7 +136,7 @@ public static PartitionEntry fromDataFile( fileSizeInBytes, fileCount, file.creationTimeEpochMillis(), - Optional.ofNullable(bucketCount).orElse(0)); + totalBuckets); } public static Collection merge(Collection fileEntries) { @@ -167,7 +156,11 @@ public static Collection mergeSplits(Collection split BinaryRow partition = split.partition(); for (DataFileMeta file : split.dataFiles()) { PartitionEntry partitionEntry = - fromDataFile(partition, ADD, file, split.totalBuckets()); + fromDataFile( + partition, + ADD, + file, + Optional.ofNullable(split.totalBuckets()).orElse(0)); partitions.compute( partition, (part, old) -> old == null ? partitionEntry : old.merge(partitionEntry)); @@ -199,7 +192,7 @@ public boolean equals(Object o) { && fileCount == that.fileCount && lastFileCreationTime == that.lastFileCreationTime && Objects.equals(partition, that.partition) - && Objects.equals(bucketCount, that.bucketCount); + && Objects.equals(totalBuckets, that.totalBuckets); } @Override @@ -210,6 +203,6 @@ public int hashCode() { fileSizeInBytes, fileCount, lastFileCreationTime, - bucketCount); + totalBuckets); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java index 77d63564c26e..da8b5c7563c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java @@ -21,7 +21,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.manifest.BucketEntry; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.IndexManifestFile; @@ -35,9 +34,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import static java.util.Collections.emptyList; @@ -125,14 +122,4 @@ public CommitChanges readOverwriteChanges( indexChangesWithOverwrite.addAll(indexFiles); return new CommitChanges(changesWithOverwrite, emptyList(), indexChangesWithOverwrite); } - - public Map readBucketCounts(Snapshot snapshot, List partitions) { - List bucketEntries = - scan.withSnapshot(snapshot).withPartitionFilter(partitions).readBucketEntries(); - Map result = new HashMap<>(); - for (BucketEntry entry : bucketEntries) { - result.compute(entry.partition(), (k, v) -> (v == null ? 0 : v) + 1); - } - return result; - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java index 0bdeb3635bcc..22d2c852830a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java @@ -302,7 +302,7 @@ private PartitionEntry toPartitionEntry(Partition partition) { partition.fileSizeInBytes(), partition.fileCount(), partition.lastFileCreationTime(), - partition.bucketCount()); + partition.totalBuckets()); } private Timestamp toTimestamp(Long epochMillis) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java index 4c2fde36ffc3..da8d47ba8c3d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java @@ -74,7 +74,7 @@ public void report(String partition, long modifyTimeMillis) throws Exception { long rowCount = 0; long totalSize = 0; long fileCount = 0; - int bucketCount = 0; + int totalBuckets = 0; for (DataSplit split : splits) { List fileMetas = split.dataFiles(); fileCount += fileMetas.size(); @@ -82,14 +82,7 @@ public void report(String partition, long modifyTimeMillis) throws Exception { rowCount += fileMeta.rowCount(); totalSize += fileMeta.fileSize(); } - Integer splitTotalBuckets = split.totalBuckets(); - if (splitTotalBuckets != null) { - bucketCount = Math.max(bucketCount, splitTotalBuckets); - } - bucketCount = - Math.max( - bucketCount, - Optional.ofNullable(split.totalBuckets()).orElse(bucketCount)); + totalBuckets = Optional.ofNullable(split.totalBuckets()).orElse(0); } PartitionStatistics partitionStats = @@ -99,7 +92,7 @@ public void report(String partition, long modifyTimeMillis) throws Exception { totalSize, fileCount, modifyTimeMillis, - bucketCount); + totalBuckets); LOG.info("alter partition {} with statistic {}.", partitionSpec, partitionStats); partitionHandler.alterPartitions(Collections.singletonList(partitionStats)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 67aa6594dcc6..e84fd00f4559 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -2405,7 +2405,7 @@ private MockResponse commitSnapshot( .lastFileCreationTime(), stats .lastFileCreationTime()), - stats.bucketCount(), + stats.totalBuckets(), oldPartition.done(), oldPartition.createdAt(), oldPartition.createdBy(), @@ -2631,7 +2631,7 @@ private Partition toPartition(PartitionStatistics stats) { stats.fileSizeInBytes(), stats.fileCount(), stats.lastFileCreationTime(), - stats.bucketCount(), + stats.totalBuckets(), false, System.currentTimeMillis(), "created", diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java index e4fad3195a0c..7517fac6f1ea 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java @@ -135,7 +135,7 @@ public void close() { assertThat(partitionParams).containsKey("c1=a/"); assertThat(partitionParams.get("c1=a/").toString()) .isEqualTo( - "{spec={c1=a}, recordCount=2, fileSizeInBytes=705, fileCount=1, lastFileCreationTime=1729598544974, bucketCount=0}"); + "{spec={c1=a}, recordCount=2, fileSizeInBytes=705, fileCount=1, lastFileCreationTime=1729598544974, totalBuckets=-1}"); action.close(); assertThat(closed).isTrue(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java index 5a623734049c..b383d2594482 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java @@ -154,21 +154,19 @@ public void testFunction() throws Exception { } @Test - public void testBucketCountStatistics() throws Exception { + public void testTotalBucketsStatistics() throws Exception { String fixedBucketTableName = "fixed_bucket_tbl"; - sql( + batchSql( String.format( "CREATE TABLE %s.%s (a INT, b INT, p INT) PARTITIONED BY (p) WITH ('bucket'='2', 'bucket-key'='a')", DATABASE_NAME, fixedBucketTableName)); - sql( + batchSql( String.format( "INSERT INTO %s.%s VALUES (1, 10, 1), (2, 20, 1)", DATABASE_NAME, fixedBucketTableName)); - validateBucketCount(DATABASE_NAME, fixedBucketTableName, 2); + validateTotalBuckets(DATABASE_NAME, fixedBucketTableName, 2); String dynamicBucketTableName = "dynamic_bucket_tbl"; - // enable metastore.partitioned-table to trigger - // org.apache.paimon.rest.RESTCatalogServer.partitionsApiHandle sql( String.format( "CREATE TABLE %s.%s (a INT, b INT, p INT) PARTITIONED BY (p) WITH ('bucket'='-1', 'metastore.partitioned-table' = 'true')", @@ -177,28 +175,28 @@ public void testBucketCountStatistics() throws Exception { String.format( "INSERT INTO %s.dynamic_bucket_tbl VALUES (1, 10, 1), (2, 20, 1)", DATABASE_NAME)); - validateBucketCount(DATABASE_NAME, "dynamic_bucket_tbl", 1); + validateTotalBuckets(DATABASE_NAME, "dynamic_bucket_tbl", -1); String postponeBucketTableName = "postpone_bucket_tbl"; - sql( + batchSql( String.format( "CREATE TABLE %s.%s (a INT, b INT, p INT, PRIMARY KEY (p, a) NOT ENFORCED) PARTITIONED BY (p) WITH ('bucket'='-2')", DATABASE_NAME, postponeBucketTableName)); - sql( + batchSql( String.format( "INSERT INTO %s.%s VALUES (1, 10, 1), (2, 20, 1)", DATABASE_NAME, postponeBucketTableName)); - validateBucketCount(DATABASE_NAME, postponeBucketTableName, 1); + validateTotalBuckets(DATABASE_NAME, postponeBucketTableName, 1); } - private void validateBucketCount( - String databaseName, String tableName, Integer expectedBucketCount) throws Exception { + private void validateTotalBuckets( + String databaseName, String tableName, Integer expectedTotalBuckets) throws Exception { Catalog flinkCatalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); try (org.apache.paimon.catalog.Catalog catalog = ((FlinkCatalog) flinkCatalog).catalog()) { List partitions = catalog.listPartitions(Identifier.create(databaseName, tableName)); assertThat(partitions).isNotEmpty(); - assertThat(partitions.get(0).bucketCount()).isEqualTo(expectedBucketCount); + assertThat(partitions.get(0).totalBuckets()).isEqualTo(expectedTotalBuckets); } } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index c7662f654727..98c14def5820 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -479,7 +479,7 @@ && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) { String modifyTimeSeconds = String.valueOf(partition.lastFileCreationTime() / 1000); statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds); - statistic.put(BUCKET_COUNT, String.valueOf(partition.bucketCount())); + statistic.put(TOTAL_BUCKETS, String.valueOf(partition.totalBuckets())); // just for being compatible with hive metastore statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds); @@ -560,9 +560,9 @@ public List listPartitions(Identifier ide parameters.getOrDefault( LAST_UPDATE_TIME_PROP, System.currentTimeMillis() + "")); - int bucketCount = + int totalBuckets = Integer.parseInt( - parameters.getOrDefault(BUCKET_COUNT, "1")); + parameters.getOrDefault(TOTAL_BUCKETS, "0")); return new org.apache.paimon.partition.Partition( Collections.singletonMap( tagToPartitionField, part.getValues().get(0)), @@ -570,7 +570,7 @@ public List listPartitions(Identifier ide fileSizeInBytes, fileCount, lastFileCreationTime, - bucketCount, + totalBuckets, false); }) .collect(Collectors.toList()); From 004aba5fbc5c34bff17d96c15be4ea43ebabf29f Mon Sep 17 00:00:00 2001 From: mingfeng Date: Thu, 8 Jan 2026 10:43:36 +0800 Subject: [PATCH 4/5] pypaimon support total_buckets --- docs/static/rest-catalog-open-api.yaml | 4 ++-- .../org/apache/paimon/partition/PartitionStatistics.java | 4 ++-- .../java/org/apache/paimon/manifest/PartitionEntry.java | 4 ++-- .../java/org/apache/paimon/flink/RESTCatalogITCase.java | 2 +- paimon-python/pypaimon/snapshot/snapshot_commit.py | 7 +++++-- paimon-python/pypaimon/write/file_store_commit.py | 6 ++++-- 6 files changed, 16 insertions(+), 11 deletions(-) diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml index a01fede7cfa1..0aa17e2b09f6 100644 --- a/docs/static/rest-catalog-open-api.yaml +++ b/docs/static/rest-catalog-open-api.yaml @@ -3287,10 +3287,10 @@ components: format: int64 lastFileCreationTime: type: integer - format: int32 + format: int64 totalBuckets: type: integer - format: int64 + format: int32 ####################################### # Examples of different values # ####################################### diff --git a/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java b/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java index 02bfc12661f3..3c322b5a39c6 100644 --- a/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java +++ b/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java @@ -123,8 +123,8 @@ public boolean equals(Object o) { && fileSizeInBytes == that.fileSizeInBytes && fileCount == that.fileCount && lastFileCreationTime == that.lastFileCreationTime - && Objects.equals(spec, that.spec) - && Objects.equals(totalBuckets, that.totalBuckets); + && totalBuckets == that.totalBuckets + && Objects.equals(spec, that.spec); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java index 6d57c1c4cfba..aaee93602a9e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java @@ -191,8 +191,8 @@ public boolean equals(Object o) { && fileSizeInBytes == that.fileSizeInBytes && fileCount == that.fileCount && lastFileCreationTime == that.lastFileCreationTime - && Objects.equals(partition, that.partition) - && Objects.equals(totalBuckets, that.totalBuckets); + && totalBuckets == that.totalBuckets + && Objects.equals(partition, that.partition); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java index b383d2594482..034816d6a0fc 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java @@ -169,7 +169,7 @@ public void testTotalBucketsStatistics() throws Exception { String dynamicBucketTableName = "dynamic_bucket_tbl"; sql( String.format( - "CREATE TABLE %s.%s (a INT, b INT, p INT) PARTITIONED BY (p) WITH ('bucket'='-1', 'metastore.partitioned-table' = 'true')", + "CREATE TABLE %s.%s (a INT, b INT, p INT) PARTITIONED BY (p) WITH ('bucket'='-1')", DATABASE_NAME, dynamicBucketTableName)); sql( String.format( diff --git a/paimon-python/pypaimon/snapshot/snapshot_commit.py b/paimon-python/pypaimon/snapshot/snapshot_commit.py index 50727b6ce37c..156036165a4e 100644 --- a/paimon-python/pypaimon/snapshot/snapshot_commit.py +++ b/paimon-python/pypaimon/snapshot/snapshot_commit.py @@ -39,11 +39,12 @@ class PartitionStatistics: file_size_in_bytes: int = json_field("fileSizeInBytes", default=0) file_count: int = json_field("fileCount", default=0) last_file_creation_time: int = json_field("lastFileCreationTime", default_factory=lambda: int(time.time() * 1000)) + total_buckets: int = json_field("totalBuckets", default=0) @classmethod def create(cls, partition_spec: Dict[str, str] = None, record_count: int = 0, file_count: int = 0, file_size_in_bytes: int = 0, - last_file_creation_time: int = None) -> 'PartitionStatistics': + last_file_creation_time: int = None, total_buckets: int = 0) -> 'PartitionStatistics': """ Factory method to create PartitionStatistics with backward compatibility. @@ -53,6 +54,7 @@ def create(cls, partition_spec: Dict[str, str] = None, record_count: int = 0, file_count: Number of files file_size_in_bytes: Total file size in bytes last_file_creation_time: Last file creation time in milliseconds + total_buckets: Total number of buckets in the partition Returns: PartitionStatistics instance @@ -62,7 +64,8 @@ def create(cls, partition_spec: Dict[str, str] = None, record_count: int = 0, record_count=record_count, file_count=file_count, file_size_in_bytes=file_size_in_bytes, - last_file_creation_time=last_file_creation_time or int(time.time() * 1000) + last_file_creation_time=last_file_creation_time or int(time.time() * 1000), + total_buckets=total_buckets ) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index a5b9fd969349..6a1f4c50b15f 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -288,7 +288,8 @@ def _generate_partition_statistics(self, commit_entries: List[ManifestEntry]) -> 'record_count': 0, 'file_count': 0, 'file_size_in_bytes': 0, - 'last_file_creation_time': 0 + 'last_file_creation_time': 0, + 'total_buckets': entry.total_buckets } # Following Java implementation: PartitionEntry.fromDataFile() @@ -323,7 +324,8 @@ def _generate_partition_statistics(self, commit_entries: List[ManifestEntry]) -> record_count=stats['record_count'], file_count=stats['file_count'], file_size_in_bytes=stats['file_size_in_bytes'], - last_file_creation_time=stats['last_file_creation_time'] + last_file_creation_time=stats['last_file_creation_time'], + total_buckets=stats['total_buckets'] ) for stats in partition_stats.values() ] From c4da200689edae6254e81b23b85343be1c1bf92f Mon Sep 17 00:00:00 2001 From: mingfeng Date: Wed, 14 Jan 2026 17:27:49 +0800 Subject: [PATCH 5/5] add PartitionStatistics deserialization test --- .../paimon/partition/PartitionStatistics.java | 2 + .../partition/PartitionStatisticsTest.java | 44 +++++++++++++++++++ 2 files changed, 46 insertions(+) create mode 100644 paimon-api/src/test/java/org/apache/paimon/partition/PartitionStatisticsTest.java diff --git a/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java b/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java index 3c322b5a39c6..ab87f02ed6a1 100644 --- a/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java +++ b/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java @@ -61,6 +61,8 @@ public class PartitionStatistics implements Serializable { @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) protected final long lastFileCreationTime; + // defaults to 0 if this field is absent in the serialized data (e.g., from an older Paimon + // version) @JsonProperty(FIELD_TOTAL_BUCKETS) protected final int totalBuckets; diff --git a/paimon-api/src/test/java/org/apache/paimon/partition/PartitionStatisticsTest.java b/paimon-api/src/test/java/org/apache/paimon/partition/PartitionStatisticsTest.java new file mode 100644 index 000000000000..d9fd8e8bb162 --- /dev/null +++ b/paimon-api/src/test/java/org/apache/paimon/partition/PartitionStatisticsTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.partition; + +import org.apache.paimon.utils.JsonSerdeUtil; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link PartitionStatistics}. */ +public class PartitionStatisticsTest { + + @Test + void testLegacyPartitionStatisticsDeserialization() { + String legacyPartitionStatisticsJson = + "{\"spec\":{\"pt\":\"1\"},\"recordCount\":100,\"fileSizeInBytes\":1024,\"fileCount\":2,\"lastFileCreationTime\":123456789}"; + PartitionStatistics stats = + JsonSerdeUtil.fromJson(legacyPartitionStatisticsJson, PartitionStatistics.class); + + assertThat(stats.spec()).containsEntry("pt", "1"); + assertThat(stats.recordCount()).isEqualTo(100); + assertThat(stats.fileSizeInBytes()).isEqualTo(1024); + assertThat(stats.fileCount()).isEqualTo(2); + assertThat(stats.lastFileCreationTime()).isEqualTo(123456789L); + assertThat(stats.totalBuckets()).isEqualTo(0); + } +}