From dd35d60a19447a628370dd2b1fd1c8976e652dc4 Mon Sep 17 00:00:00 2001
From: Konstantin Vedernikov <75157521+scanhex12@users.noreply.github.com>
Date: Wed, 11 Feb 2026 10:17:32 +0000
Subject: [PATCH] Merge pull request #96620 from scanhex12/iceberg_partitioing_fix

Iceberg partitioning fix
---
 .../DataLakes/Iceberg/ChunkPartitioner.cpp   | 12 +++----
 .../test_writes_with_partitioned_table.py    | 34 +++++++++++++++++++
 2 files changed, 39 insertions(+), 7 deletions(-)

diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ChunkPartitioner.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ChunkPartitioner.cpp
index f5930b5742e9..c61c566c1744 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ChunkPartitioner.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ChunkPartitioner.cpp
@@ -93,6 +93,8 @@ ChunkPartitioner::partitionChunk(const Chunk & chunk)
     }
 
     std::vector transform_results(chunk.getNumRows());
+    ColumnRawPtrs raw_columns;
+    Columns functions_columns;
     for (size_t transform_ind = 0; transform_ind < functions.size(); ++transform_ind)
     {
         ColumnsWithTypeAndName arguments;
@@ -115,6 +117,8 @@ ChunkPartitioner::partitionChunk(const Chunk & chunk)
         }
 
         auto result = functions[transform_ind]->build(arguments)->execute(arguments, std::make_shared(), chunk.getNumRows(), false);
+        functions_columns.push_back(result);
+        raw_columns.push_back(result.get());
         for (size_t i = 0; i < chunk.getNumRows(); ++i)
         {
             Field field;
@@ -127,12 +131,9 @@ ChunkPartitioner::partitionChunk(const Chunk & chunk)
     {
         return transform_results[row_num];
     };
-
+    std::vector> result;
     PODArray partition_num_to_first_row;
     IColumn::Selector selector;
-    ColumnRawPtrs raw_columns;
-    for (const auto & column : chunk.getColumns())
-        raw_columns.push_back(column.get());
 
     buildScatterSelector(raw_columns, partition_num_to_first_row, selector, 0, Context::getGlobalContextInstance());
 
@@ -164,9 +165,6 @@ ChunkPartitioner::partitionChunk(const Chunk & chunk)
             result_columns[0].second[col] = chunk.getColumns()[col]->cloneFinalized();
         }
     }
-
-    std::vector> result;
-    result.reserve(result_columns.size());
     for (auto && [key, partition_columns] : result_columns)
     {
         size_t column_size = partition_columns[0]->size();
diff --git a/tests/integration/test_storage_iceberg_with_spark/test_writes_with_partitioned_table.py b/tests/integration/test_storage_iceberg_with_spark/test_writes_with_partitioned_table.py
index 7667d4dff2ee..93e41fae9475 100644
--- a/tests/integration/test_storage_iceberg_with_spark/test_writes_with_partitioned_table.py
+++ b/tests/integration/test_storage_iceberg_with_spark/test_writes_with_partitioned_table.py
@@ -83,3 +83,37 @@ def execute_spark_query(query: str):
     df = spark.read.format("iceberg").load(f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}").collect()
 
     assert len(df) == 10
+
+
+@pytest.mark.parametrize("format_version", ["2"])
+@pytest.mark.parametrize("storage_type", ["s3"])
+def test_writes_with_partitioned_table_count_partitions(started_cluster_iceberg_with_spark, format_version, storage_type):
+    instance = started_cluster_iceberg_with_spark.instances["node1"]
+
+    TABLE_NAME = "test_writes_with_partitioned_table_count_partitions_" + storage_type + "_" + get_uuid_str()
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark, "(y String)", partition_by="(icebergBucket(16, y))", format_version=format_version)
+    values = ""
+    num_rows = 1000
+    target = []
+    for i in range(num_rows):
+        values += f"({i})"
+        target.append(str(i))
+        if i != num_rows - 1:
+            values += ","
+= "," + target = '\n'.join(sorted(target)) + instance.query(f"INSERT INTO {TABLE_NAME} VALUES {values};", settings={"allow_experimental_insert_into_iceberg": 1}) + assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL") == target + "\n" + + files = default_download_directory( + started_cluster_iceberg_with_spark, + storage_type, + f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}/", + f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}/", + ) + + num_pq_files = 0 + for file in files: + if file[-7:] == 'parquet': + num_pq_files += 1 + + assert num_pq_files == 16