From 173a6196562cac5582e74bd493e942133b8e21f4 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 18 Mar 2026 09:13:02 +0000 Subject: [PATCH 1/2] Merge pull request #99548 from ClickHouse/parallelize-object-storage-output Parallelize object storage output --- .../QueryPlan/ReadFromObjectStorageStep.cpp | 10 ++++++- .../QueryPlan/ReadFromObjectStorageStep.h | 1 + ...bject_storage_parallelize_output.reference | 1 + ...04040_object_storage_parallelize_output.sh | 29 +++++++++++++++++++ 4 files changed, 40 insertions(+), 1 deletion(-) create mode 100644 tests/queries/0_stateless/04040_object_storage_parallelize_output.reference create mode 100755 tests/queries/0_stateless/04040_object_storage_parallelize_output.sh diff --git a/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp b/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp index 9f77f1aaf3db..4fe19b1fd540 100644 --- a/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp +++ b/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp @@ -23,7 +23,7 @@ namespace DB namespace Setting { - extern const SettingsMaxThreads max_threads; + extern const SettingsBool parallelize_output_from_storages; } @@ -50,6 +50,7 @@ ReadFromObjectStorageStep::ReadFromObjectStorageStep( , need_only_count(need_only_count_) , max_block_size(max_block_size_) , num_streams(num_streams_) + , max_num_streams(num_streams_) , distributed_processing(distributed_processing_) { } @@ -128,6 +129,13 @@ void ReadFromObjectStorageStep::initializePipeline(QueryPipelineBuilder & pipeli if (pipe.empty()) pipe = Pipe(std::make_shared(std::make_shared(info.source_header))); + size_t output_ports = pipe.numOutputPorts(); + const bool parallelize_output = context->getSettingsRef()[Setting::parallelize_output_from_storages]; + if (parallelize_output + && FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->format, context) + && output_ports > 0 && output_ports < max_num_streams) + pipe.resize(max_num_streams); + for (const auto & processor : pipe.getProcessors()) processors.emplace_back(processor); diff --git a/src/Processors/QueryPlan/ReadFromObjectStorageStep.h b/src/Processors/QueryPlan/ReadFromObjectStorageStep.h index 6a08ae1d387f..272c6e917f46 100644 --- a/src/Processors/QueryPlan/ReadFromObjectStorageStep.h +++ b/src/Processors/QueryPlan/ReadFromObjectStorageStep.h @@ -56,6 +56,7 @@ class ReadFromObjectStorageStep : public SourceStepWithFilter const bool need_only_count; const size_t max_block_size; size_t num_streams; + const size_t max_num_streams; const bool distributed_processing; void createIterator(); diff --git a/tests/queries/0_stateless/04040_object_storage_parallelize_output.reference b/tests/queries/0_stateless/04040_object_storage_parallelize_output.reference new file mode 100644 index 000000000000..4f746a3120fe --- /dev/null +++ b/tests/queries/0_stateless/04040_object_storage_parallelize_output.reference @@ -0,0 +1 @@ +Resize 1 → 4 diff --git a/tests/queries/0_stateless/04040_object_storage_parallelize_output.sh b/tests/queries/0_stateless/04040_object_storage_parallelize_output.sh new file mode 100755 index 000000000000..87a61b2f737e --- /dev/null +++ b/tests/queries/0_stateless/04040_object_storage_parallelize_output.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +# Verify that reading a single file from object storage (ENGINE = S3) +# parallelizes the pipeline output via Resize, so downstream processors +# like AggregatingTransform can run on multiple threads. +# Without this, queries on a single S3/data-lake Parquet file run +# entirely single-threaded (e.g. Q28 in ClickBench: 79× slower). + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +${CLICKHOUSE_CLIENT} --query " + CREATE TABLE test_s3_${CLICKHOUSE_DATABASE} (x UInt64, y String) + ENGINE = S3('http://localhost:19999/dummy.parquet', Parquet); +" + +# The pipeline should contain 'Resize 1 → 4' between the source and +# the processing transforms, proving the single source output is +# distributed across max_threads workers. +${CLICKHOUSE_CLIENT} --query " + EXPLAIN PIPELINE + SELECT count(), sum(x) FROM test_s3_${CLICKHOUSE_DATABASE} + GROUP BY y + SETTINGS max_threads = 4; +" | grep -o 'Resize 1 → 4' + +${CLICKHOUSE_CLIENT} --query "DROP TABLE test_s3_${CLICKHOUSE_DATABASE};" From dab5942b2fbd17b04bffa015422464fd739fa53e Mon Sep 17 00:00:00 2001 From: Mikhail Koviazin Date: Thu, 26 Mar 2026 08:31:48 +0100 Subject: [PATCH 2/2] fix build --- src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp b/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp index 4fe19b1fd540..bb3a2b137dbc 100644 --- a/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp +++ b/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp @@ -132,7 +132,7 @@ void ReadFromObjectStorageStep::initializePipeline(QueryPipelineBuilder & pipeli size_t output_ports = pipe.numOutputPorts(); const bool parallelize_output = context->getSettingsRef()[Setting::parallelize_output_from_storages]; if (parallelize_output - && FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->format, context) + && FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->getFormat(), context) && output_ports > 0 && output_ports < max_num_streams) pipe.resize(max_num_streams);