diff --git a/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp b/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp index 9f77f1aaf3db..bb3a2b137dbc 100644 --- a/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp +++ b/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp @@ -23,7 +23,7 @@ namespace DB namespace Setting { - extern const SettingsMaxThreads max_threads; + extern const SettingsBool parallelize_output_from_storages; } @@ -50,6 +50,7 @@ ReadFromObjectStorageStep::ReadFromObjectStorageStep( , need_only_count(need_only_count_) , max_block_size(max_block_size_) , num_streams(num_streams_) + , max_num_streams(num_streams_) , distributed_processing(distributed_processing_) { } @@ -128,6 +129,13 @@ void ReadFromObjectStorageStep::initializePipeline(QueryPipelineBuilder & pipeli if (pipe.empty()) pipe = Pipe(std::make_shared<NullSource>(std::make_shared<const Block>(info.source_header))); + size_t output_ports = pipe.numOutputPorts(); + const bool parallelize_output = context->getSettingsRef()[Setting::parallelize_output_from_storages]; + if (parallelize_output + && FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->getFormat(), context) + && output_ports > 0 && output_ports < max_num_streams) + pipe.resize(max_num_streams); + for (const auto & processor : pipe.getProcessors()) processors.emplace_back(processor); diff --git a/src/Processors/QueryPlan/ReadFromObjectStorageStep.h b/src/Processors/QueryPlan/ReadFromObjectStorageStep.h index 6a08ae1d387f..272c6e917f46 100644 --- a/src/Processors/QueryPlan/ReadFromObjectStorageStep.h +++ b/src/Processors/QueryPlan/ReadFromObjectStorageStep.h @@ -56,6 +56,7 @@ class ReadFromObjectStorageStep : public SourceStepWithFilter const bool need_only_count; const size_t max_block_size; size_t num_streams; + const size_t max_num_streams; const bool distributed_processing; void createIterator(); diff --git a/tests/queries/0_stateless/04040_object_storage_parallelize_output.reference 
b/tests/queries/0_stateless/04040_object_storage_parallelize_output.reference new file mode 100644 index 000000000000..4f746a3120fe --- /dev/null +++ b/tests/queries/0_stateless/04040_object_storage_parallelize_output.reference @@ -0,0 +1 @@ +Resize 1 → 4 diff --git a/tests/queries/0_stateless/04040_object_storage_parallelize_output.sh b/tests/queries/0_stateless/04040_object_storage_parallelize_output.sh new file mode 100755 index 000000000000..87a61b2f737e --- /dev/null +++ b/tests/queries/0_stateless/04040_object_storage_parallelize_output.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +# Verify that reading a single file from object storage (ENGINE = S3) +# parallelizes the pipeline output via Resize, so downstream processors +# like AggregatingTransform can run on multiple threads. +# Without this, queries on a single S3/data-lake Parquet file run +# entirely single-threaded (e.g. Q28 in ClickBench: 79× slower). + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +${CLICKHOUSE_CLIENT} --query " + CREATE TABLE test_s3_${CLICKHOUSE_DATABASE} (x UInt64, y String) + ENGINE = S3('http://localhost:19999/dummy.parquet', Parquet); +" + +# The pipeline should contain 'Resize 1 → 4' between the source and +# the processing transforms, proving the single source output is +# distributed across max_threads workers. +${CLICKHOUSE_CLIENT} --query " + EXPLAIN PIPELINE + SELECT count(), sum(x) FROM test_s3_${CLICKHOUSE_DATABASE} + GROUP BY y + SETTINGS max_threads = 4; +" | grep -o 'Resize 1 → 4' + +${CLICKHOUSE_CLIENT} --query "DROP TABLE test_s3_${CLICKHOUSE_DATABASE};"