From 4a70d3bf22466ffa05e1d1d554d8add60d621ce8 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 6 Feb 2026 11:11:08 +0100 Subject: [PATCH 01/18] object_storage_cluster_join_mode global --- src/Storages/IStorageCluster.cpp | 81 ++++++++++++++++++++++++-------- src/Storages/IStorageCluster.h | 8 +++- 2 files changed, 67 insertions(+), 22 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 3058408f36bf..7ef8c4c7fc41 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -20,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -104,11 +106,14 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext; using Base::Base; - explicit SearcherVisitor(std::unordered_set types_, ContextPtr context) : Base(context), types(types_) {} + explicit SearcherVisitor(std::unordered_set types_, size_t entry_, ContextPtr context) + : Base(context) + , types(types_) + , entry(entry_) {} bool needChildVisit(QueryTreeNodePtr & /*parent*/, QueryTreeNodePtr & /*child*/) { - return getSubqueryDepth() <= 2 && !passed_node; + return getSubqueryDepth() <= 2 && !passed_node && !current_entry; } void enterImpl(QueryTreeNodePtr & node) @@ -119,13 +124,19 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContextgetNodeType(); if (types.contains(node_type)) - passed_node = node; + { + ++current_entry; + if (current_entry == entry) + passed_node = node; + } } QueryTreeNodePtr getNode() const { return passed_node; } private: std::unordered_set types; + size_t entry; + size_t current_entry = 0; QueryTreeNodePtr passed_node; }; @@ -192,15 +203,24 @@ Converts localtable as t ON s3.key == t.key -to +to (object_storage_cluster_join_mode='local') SELECT s3.c1, s3.c2, s3.key FROM s3Cluster(...) AS s3 + +or (object_storage_cluster_join_mode='global') + + SELECT s3.c1, s3.c2, t.c3 + FROM + s3Cluster(...) as s3 + JOIN + values('key UInt32, data String', (1, 'one'), (2, 'two'), ...) as t + ON s3.key == t.key */ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( ASTPtr & query_to_send, - QueryTreeNodePtr query_tree, + SelectQueryInfo query_info, const ContextPtr & context) { auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; @@ -208,17 +228,17 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( { case ObjectStorageClusterJoinMode::LOCAL: { - auto info = getQueryTreeInfo(query_tree, context); + auto info = getQueryTreeInfo(query_info.query_tree, context); if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) { - auto modified_query_tree = query_tree->clone(); + auto modified_query_tree = query_info.query_tree->clone(); - SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context); left_table_expression_searcher.visit(modified_query_tree); auto table_function_node = left_table_expression_searcher.getNode(); if (!table_function_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find left table function node"); QueryTreeNodePtr query_tree_distributed; @@ -231,7 +251,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( } else if (info.has_cross_join) { - SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context); + SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, 1, context); join_searcher.visit(modified_query_tree); auto cross_join_node = join_searcher.getNode(); if (!cross_join_node) @@ -286,8 +306,21 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( return; } case ObjectStorageClusterJoinMode::GLOBAL: - // TODO - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now"); + { + auto info = getQueryTreeInfo(query_info.query_tree, context); + + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) + { + auto modified_query_tree = query_info.query_tree->clone(); + + rewriteJoinToGlobalJoin(modified_query_tree, context); + modified_query_tree = buildQueryTreeForShard(query_info.planner_context, modified_query_tree, /*allow_global_join_for_right_table*/ true); + query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); + send_external_tables = true; + } + + return; + } case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special return; } @@ -316,7 +349,7 @@ void IStorageCluster::read( SharedHeader sample_block; ASTPtr query_to_send = query_info.query; - updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context); + updateQueryWithJoinToSendIfNeeded(query_to_send, query_info, context); if (context->getSettingsRef()[Setting::allow_experimental_analyzer]) { @@ -343,6 +376,10 @@ void IStorageCluster::read( auto this_ptr = std::static_pointer_cast(shared_from_this()); + std::optional external_tables = std::nullopt; + if (send_external_tables) + external_tables = query_info.planner_context->getMutableQueryContext()->getExternalTables(); + auto reading = std::make_unique( column_names, query_info, @@ -353,7 +390,8 @@ void IStorageCluster::read( std::move(query_to_send), processed_stage, cluster, - log); + log, + external_tables); query_plan.addStep(std::move(reading)); } @@ -401,7 +439,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const new_context, /*throttler=*/nullptr, scalars, - Tables(), + external_tables.has_value() ? *external_tables : Tables(), processed_stage, nullptr, RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)}); @@ -439,7 +477,7 @@ IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePt info.has_cross_join = true; } - SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context); left_table_expression_searcher.visit(query_tree); auto table_function_node = left_table_expression_searcher.getNode(); if (!table_function_node) @@ -472,11 +510,14 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( { if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) throw Exception(ErrorCodes::NOT_IMPLEMENTED, - "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); + "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=false"); - auto info = getQueryTreeInfo(query_info.query_tree, context); - if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) - return QueryProcessingStage::Enum::FetchColumns; + if (object_storage_cluster_join_mode == ObjectStorageClusterJoinMode::LOCAL) + { + auto info = getQueryTreeInfo(query_info.query_tree, context); + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) + return QueryProcessingStage::Enum::FetchColumns; + } } /// Initiator executes query on remote node. diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 96964a02dd35..6a1641acc7e9 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -55,13 +55,14 @@ class IStorageCluster : public IStorage protected: virtual void updateBeforeRead(const ContextPtr &) {} virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} - void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context); + void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, SelectQueryInfo query_info, const ContextPtr & context); virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {} private: LoggerPtr log; String cluster_name; + bool send_external_tables = false; struct QueryTreeInfo { @@ -91,7 +92,8 @@ class ReadFromCluster : public SourceStepWithFilter ASTPtr query_to_send_, QueryProcessingStage::Enum processed_stage_, ClusterPtr cluster_, - LoggerPtr log_) + LoggerPtr log_, + std::optional external_tables_) : SourceStepWithFilter( std::move(sample_block), column_names_, @@ -103,6 +105,7 @@ class ReadFromCluster : public SourceStepWithFilter , processed_stage(processed_stage_) , cluster(std::move(cluster_)) , log(log_) + , external_tables(external_tables_) { } @@ -114,6 +117,7 @@ class ReadFromCluster : public SourceStepWithFilter LoggerPtr log; std::optional extension; + std::optional external_tables; void createExtension(const ActionsDAG::Node * predicate); ContextPtr updateSettings(const Settings & settings); From 972f1ee2196c14d5a19c9f5cfad5d29ab701c3f5 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 26 Feb 2026 10:20:22 +0100 Subject: [PATCH 02/18] Global gross join --- src/Storages/IStorageCluster.cpp | 6 +++- src/Storages/buildQueryTreeForShard.cpp | 43 +++++++++++++++++++++---- src/Storages/buildQueryTreeForShard.h | 6 +++- 3 files changed, 47 insertions(+), 8 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 7ef8c4c7fc41..116ff5c8c7cd 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -314,7 +314,11 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( auto modified_query_tree = query_info.query_tree->clone(); rewriteJoinToGlobalJoin(modified_query_tree, context); - modified_query_tree = buildQueryTreeForShard(query_info.planner_context, modified_query_tree, /*allow_global_join_for_right_table*/ true); + modified_query_tree = buildQueryTreeForShard( + query_info.planner_context, + modified_query_tree, + /*allow_global_join_for_right_table*/ true, + /*find_cross_join*/ true); query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); send_external_tables = true; } diff --git a/src/Storages/buildQueryTreeForShard.cpp b/src/Storages/buildQueryTreeForShard.cpp index 939dcfdfaa1a..cf43fa25c2f1 100644 --- a/src/Storages/buildQueryTreeForShard.cpp +++ b/src/Storages/buildQueryTreeForShard.cpp @@ -42,6 +42,7 @@ namespace Setting extern const SettingsBool prefer_global_in_and_join; extern const SettingsBool enable_add_distinct_to_in_subqueries; extern const SettingsInt64 optimize_const_name_size; + extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode; } namespace ErrorCodes @@ -120,8 +121,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito using Base = InDepthQueryTreeVisitorWithContext; using Base::Base; - explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_) + explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_, bool find_cross_join_) : Base(context_) + , find_cross_join(find_cross_join_) {} struct InFunctionOrJoin @@ -157,9 +159,11 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito { auto * function_node = node->as(); auto * join_node = node->as(); + CrossJoinNode * cross_join_node = find_cross_join ? node->as() : nullptr; if ((function_node && isNameOfGlobalInFunction(function_node->getFunctionName())) || - (join_node && join_node->getLocality() == JoinLocality::Global)) + (join_node && join_node->getLocality() == JoinLocality::Global) || + cross_join_node) { InFunctionOrJoin in_function_or_join_entry; in_function_or_join_entry.query_node = node; @@ -223,7 +227,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito replacement_table_expression->setTableExpressionModifiers(*table_expression_modifiers); replacement_map.emplace(table_node.get(), std::move(replacement_table_expression)); } - else if ((distributed_product_mode == DistributedProductMode::GLOBAL || getSettings()[Setting::prefer_global_in_and_join]) && + else if ((distributed_product_mode == DistributedProductMode::GLOBAL || + getSettings()[Setting::prefer_global_in_and_join] || + (find_cross_join && getSettings()[Setting::object_storage_cluster_join_mode] == ObjectStorageClusterJoinMode::GLOBAL)) && !in_function_or_join_stack.empty()) { auto * in_or_join_node_to_modify = in_function_or_join_stack.back().query_node.get(); @@ -257,6 +263,8 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito std::vector in_function_or_join_stack; std::unordered_map replacement_map; std::vector global_in_or_join_nodes; + + bool find_cross_join = false; }; /** Replaces large constant values with `__getScalar` function calls to avoid @@ -504,14 +512,18 @@ QueryTreeNodePtr getSubqueryFromTableExpression( } -QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table) +QueryTreeNodePtr buildQueryTreeForShard( + const PlannerContextPtr & planner_context, + QueryTreeNodePtr query_tree_to_modify, + bool allow_global_join_for_right_table, + bool find_cross_join) { CollectColumnSourceToColumnsVisitor collect_column_source_to_columns_visitor; collect_column_source_to_columns_visitor.visit(query_tree_to_modify); const auto & column_source_to_columns = collect_column_source_to_columns_visitor.getColumnSourceToColumns(); - DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext()); + DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext(), find_cross_join); visitor.visit(query_tree_to_modify); auto replacement_map = visitor.getReplacementMap(); @@ -550,6 +562,24 @@ QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_contex replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node)); continue; } + if (auto * cross_join_node = global_in_or_join_node.query_node->as()) + { + auto tables_count = cross_join_node->getTableExpressions().size(); + for (size_t i = 1; i < tables_count; ++i) + { + QueryTreeNodePtr join_table_expression = cross_join_node->getTableExpressions()[i]; + + auto subquery_node = getSubqueryFromTableExpression(join_table_expression, column_source_to_columns, planner_context->getQueryContext()); + + auto temporary_table_expression_node = executeSubqueryNode(subquery_node, + planner_context->getMutableQueryContext(), + global_in_or_join_node.subquery_depth); + temporary_table_expression_node->setAlias(join_table_expression->getAlias()); + + replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node)); + } + continue; + } if (auto * in_function_node = global_in_or_join_node.query_node->as()) { auto & in_function_subquery_node = in_function_node->getArguments().getNodes().at(1); @@ -661,7 +691,8 @@ class RewriteJoinToGlobalJoinVisitor : public InDepthQueryTreeVisitorWithContext { if (auto * join_node = node->as()) { - bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join]; + bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join] + && getContext()->getSettingsRef()[Setting::object_storage_cluster_join_mode] != ObjectStorageClusterJoinMode::GLOBAL; bool should_use_global_join = !prefer_local_join || !allStoragesAreMergeTree(join_node->getRightTableExpression()); if (should_use_global_join) join_node->setLocality(JoinLocality::Global); diff --git a/src/Storages/buildQueryTreeForShard.h b/src/Storages/buildQueryTreeForShard.h index 90cbfd36f660..bcbac10b55e0 100644 --- a/src/Storages/buildQueryTreeForShard.h +++ b/src/Storages/buildQueryTreeForShard.h @@ -16,7 +16,11 @@ using PlannerContextPtr = std::shared_ptr; class Context; using ContextPtr = std::shared_ptr; -QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table); +QueryTreeNodePtr buildQueryTreeForShard( + const PlannerContextPtr & planner_context, + QueryTreeNodePtr query_tree_to_modify, + bool allow_global_join_for_right_table, + bool find_cross_join = false); void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify, ContextPtr context); From b6bf151389bb8a6a5983794b959ac021ca90a7a3 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 2 Mar 2026 12:48:35 +0100 Subject: [PATCH 03/18] Test for object_storage_cluster_join_mode='global' --- tests/integration/test_s3_cluster/test.py | 26 +++++++++++++---------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index f437d9afb11f..35a63c83617c 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -102,7 +102,7 @@ def started_cluster(): yield cluster finally: - shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/")) + shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"), ignore_errors=True) cluster.shutdown() @@ -697,7 +697,8 @@ def test_hive_partitioning(started_cluster, allow_experimental_analyzer): node.query("SET allow_experimental_analyzer = DEFAULT") -def test_joins(started_cluster): +@pytest.mark.parametrize("join_mode", ["local", "global"]) +def test_joins(started_cluster, join_mode): node = started_cluster.instances["s0_0_0"] # Table join_table only exists on the node 's0_0_0'. @@ -731,7 +732,7 @@ def test_joins(started_cluster): join_table AS t2 ON t1.value = t2.id ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -754,7 +755,7 @@ def test_joins(started_cluster): 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 ON t1.value = t2.id ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -772,7 +773,7 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t1.value % 2) ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -791,7 +792,7 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t2.id % 2) ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -809,27 +810,29 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t1.value % 2) AND ((t2.id % 3) == 2) ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) res = list(map(str.split, result5.splitlines())) assert len(res) == 6 + # With WHERE clause with global subquery result6 = node.query( f""" SELECT name FROM s3Cluster('cluster_simple', 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') - WHERE value IN (SELECT id FROM join_table) + WHERE value GLOBAL IN (SELECT id FROM join_table) ORDER BY name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) res = list(map(str.split, result6.splitlines())) assert len(res) == 25 + # With WHERE clause without columns in condition result7 = node.query( f""" SELECT count() FROM @@ -840,11 +843,12 @@ def test_joins(started_cluster): join_table AS t2 ON 1 GROUP BY ALL - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) assert result7.strip() == "625" + # With WHERE clause without columns in condition and with local column in SELECT result8 = node.query( f""" SELECT count(), t2.id FROM @@ -855,7 +859,7 @@ def test_joins(started_cluster): join_table AS t2 ON 1 GROUP BY ALL - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) res = list(map(str.split, result8.splitlines())) From 06b4edea156a9b9ecd6b58aaf176958539ca86ee Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 9 Mar 2026 17:31:16 +0100 Subject: [PATCH 04/18] Fix file identifier in rescheduleTasksFromReplica --- ...rageObjectStorageStableTaskDistributor.cpp | 38 +++++++++++-------- ...torageObjectStorageStableTaskDistributor.h | 2 + 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 13e3721a076e..8700ddad19a0 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -125,7 +125,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t auto next_file = files.back(); files.pop_back(); - auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : getAbsolutePathFromObjectInfo(next_file).value_or(next_file->getIdentifier()); + auto file_identifier = getFileIdentifier(next_file); auto it = unprocessed_files.find(file_identifier); if (it == unprocessed_files.end()) continue; @@ -170,18 +170,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } } - String file_identifier; - if (send_over_whole_archive && object_info->isArchive()) - { - file_identifier = object_info->getPathOrPathToArchiveIfArchive(); - LOG_TEST(log, "Will send over the whole archive {} to replicas. " - "This will be suboptimal, consider turning on " - "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); - } - else - { - file_identifier = getAbsolutePathFromObjectInfo(object_info).value_or(object_info->getIdentifier()); - } + String file_identifier = getFileIdentifier(object_info, true); if (iceberg_read_optimization_enabled) { @@ -250,7 +239,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s auto next_file = it->second.first; unprocessed_files.erase(it); - auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : getAbsolutePathFromObjectInfo(next_file).value_or(next_file->getPath()); + auto file_path = getFileIdentifier(next_file); LOG_TRACE( log, "Iterator exhausted. Assigning unprocessed file {} to replica {} from matched replica {}", @@ -312,11 +301,28 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_ for (const auto & file : processed_file_list_ptr->second) { - auto file_replica_idx = getReplicaForFile(file->getPath()); - unprocessed_files.emplace(file->getPath(), std::make_pair(file, file_replica_idx)); + auto file_identifier = getFileIdentifier(file); + auto file_replica_idx = getReplicaForFile(file_identifier); + unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); connection_to_files[file_replica_idx].push_back(file); } replica_to_files_to_be_processed.erase(number_of_current_replica); } +String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log) const +{ + if (send_over_whole_archive && file_object->isArchive()) + { + auto file_identifier = file_object->getPathOrPathToArchiveIfArchive(); + if (write_to_log) + { + LOG_TEST(log, "Will send over the whole archive {} to replicas. " + "This will be suboptimal, consider turning on " + "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); + } + return file_identifier; + } + return getAbsolutePathFromObjectInfo(file_object).value_or(file_object->getIdentifier()); +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index 0cd10ac7188e..3a5a16998be3 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -41,6 +41,8 @@ class StorageObjectStorageStableTaskDistributor void saveLastNodeActivity(size_t number_of_current_replica); + String getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log = false) const; + const std::shared_ptr iterator; const bool send_over_whole_archive; From e15af07db05d37d6ab5c6cf571bd8f3c4f533519 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 13 Mar 2026 17:41:51 +0100 Subject: [PATCH 05/18] Check context existing before use --- src/Storages/IStorageCluster.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 116ff5c8c7cd..fbb3de939fac 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -381,7 +381,7 @@ void IStorageCluster::read( auto this_ptr = std::static_pointer_cast(shared_from_this()); std::optional external_tables = std::nullopt; - if (send_external_tables) + if (send_external_tables && query_info.planner_context && query_info.planner_context->getMutableQueryContext()) external_tables = query_info.planner_context->getMutableQueryContext()->getExternalTables(); auto reading = std::make_unique( From 604c78876da585430c03f1189bcd722d8d891230 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 16 Mar 2026 12:26:37 +0100 Subject: [PATCH 06/18] Fix unsynchronized access to replica_to_files_to_be_processed --- ...rageObjectStorageStableTaskDistributor.cpp | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 8700ddad19a0..ce4a21cd2137 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -49,13 +49,16 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb saveLastNodeActivity(number_of_current_replica); - auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); - if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Replica number {} was marked as lost, can't set task for it anymore", - number_of_current_replica - ); + { + std::lock_guard lock(mutex); + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} was marked as lost, can't set task for it anymore", + number_of_current_replica + ); + } // 1. Check pre-queued files first auto file = getPreQueuedFile(number_of_current_replica); @@ -67,7 +70,19 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb file = getAnyUnprocessedFile(number_of_current_replica); if (file) - processed_file_list_ptr->second.push_back(file); + { + std::lock_guard lock(mutex); + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + { // It is possible that replica was lost after check in the begining of the method + auto file_identifier = getFileIdentifier(file); + auto file_replica_idx = getReplicaForFile(file_identifier); + unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); + connection_to_files[file_replica_idx].push_back(file); + } + else + processed_file_list_ptr->second.push_back(file); + } return file; } @@ -183,7 +198,13 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } } - size_t file_replica_idx = getReplicaForFile(file_identifier); + size_t file_replica_idx; + + { + std::lock_guard lock(mutex); + file_replica_idx = getReplicaForFile(file_identifier); + } + if (file_replica_idx == number_of_current_replica) { LOG_TRACE( From b69eb1967b9114ca717145332dd570b9adc1bc40 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 23 Mar 2026 10:53:18 +0100 Subject: [PATCH 07/18] Fix rescheduleTasksFromReplica --- .../StorageObjectStorageStableTaskDistributor.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 13e3721a076e..2f9571b38b85 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -310,13 +310,14 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_ "All replicas were marked as lost" ); - for (const auto & file : processed_file_list_ptr->second) + auto files = std::move(processed_file_list_ptr->second); + replica_to_files_to_be_processed.erase(number_of_current_replica); + for (const auto & file : files) { auto file_replica_idx = getReplicaForFile(file->getPath()); unprocessed_files.emplace(file->getPath(), std::make_pair(file, file_replica_idx)); connection_to_files[file_replica_idx].push_back(file); } - replica_to_files_to_be_processed.erase(number_of_current_replica); } } From 0cd90a87ab7b2ad83d24df87d96af5d3de7858c2 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 23 Mar 2026 12:13:50 +0100 Subject: [PATCH 08/18] Try to fix again --- src/Storages/IStorageCluster.cpp | 5 ++--- src/Storages/IStorageCluster.h | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 5888b2644879..12c14e61493e 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -331,7 +331,6 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( /*allow_global_join_for_right_table*/ true, /*find_cross_join*/ true); query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); - send_external_tables = true; } return; @@ -412,7 +411,7 @@ void IStorageCluster::read( auto this_ptr = std::static_pointer_cast(shared_from_this()); std::optional external_tables = std::nullopt; - if (send_external_tables && query_info.planner_context && query_info.planner_context->getMutableQueryContext()) + if (query_info.planner_context && query_info.planner_context->getMutableQueryContext()) external_tables = query_info.planner_context->getMutableQueryContext()->getExternalTables(); auto reading = std::make_unique( @@ -615,7 +614,7 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( { if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) throw Exception(ErrorCodes::NOT_IMPLEMENTED, - "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=false"); + "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); if (object_storage_cluster_join_mode == ObjectStorageClusterJoinMode::LOCAL) { diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 6108673d60d4..65fa86589cad 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -108,7 +108,6 @@ class IStorageCluster : public IStorage LoggerPtr log; String cluster_name; - bool send_external_tables = false; struct QueryTreeInfo { From 904d00effc310cd7ef70c6ac81b45dd61e87cfc8 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 23 Mar 2026 16:37:59 +0100 Subject: [PATCH 09/18] Fix setting description --- src/Core/Settings.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 6e9d3ad9b276..7b6acb79b3c5 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -1930,7 +1930,7 @@ ClickHouse applies this setting when the query contains the product of object st Possible values: - `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.` -- `global` — Unsupported for now. Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` +- `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` Right table executes first and is added to the secondary query as temporay table. - `allow` — Default value. Allows the use of these types of subqueries. )", 0) \ \ From 000a737da41dc9eac812de9978ce682dc50a28dd Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 24 Mar 2026 09:49:02 +0100 Subject: [PATCH 10/18] User/password auth for object_storage_remote_initiator --- src/Storages/IStorageCluster.cpp | 17 ++++- .../test_s3_cluster/configs/cluster.xml | 33 +++++++++ .../test_s3_cluster/configs/users.xml | 4 + tests/integration/test_s3_cluster/test.py | 73 +++++++++++++++++++ 4 files changed, 126 insertions(+), 1 deletion(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 3f09a0a595bf..5efac8ba0f98 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -396,6 +396,10 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote( const std::string & cluster_name_from_settings, ASTPtr query_to_send) { + /// TODO: Allow to use secret for remote queries + if (!cluster->getSecret().empty()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Can't convert query to remote when cluster uses secret"); + auto host_addresses = cluster->getShardsAddresses(); if (host_addresses.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings); @@ -436,7 +440,18 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote( if (!table_expression) throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression"); - auto remote_query = makeASTFunction(remote_function_name, make_intrusive(host_name), table_expression->table_function); + boost::intrusive_ptr remote_query; + + if (shard_addresses[0].user_specified) + { + remote_query = makeASTFunction(remote_function_name, + make_intrusive(host_name), + table_expression->table_function, + make_intrusive(shard_addresses[0].user), + make_intrusive(shard_addresses[0].password)); + } + else + remote_query = makeASTFunction(remote_function_name, make_intrusive(host_name), table_expression->table_function); table_expression->table_function = remote_query; diff --git a/tests/integration/test_s3_cluster/configs/cluster.xml b/tests/integration/test_s3_cluster/configs/cluster.xml index 0452a383a709..7f3dab539985 100644 --- a/tests/integration/test_s3_cluster/configs/cluster.xml +++ b/tests/integration/test_s3_cluster/configs/cluster.xml @@ -76,6 +76,39 @@ + + + + s0_0_1 + 9000 + foo + bar + + + s0_1_0 + 9000 + foo + bar + + + + + + baz + + + s0_0_1 + 9000 + foo + + + s0_1_0 + 9000 + foo + + + + diff --git a/tests/integration/test_s3_cluster/configs/users.xml b/tests/integration/test_s3_cluster/configs/users.xml index 4b6ba057ecb1..95d2d329cac0 100644 --- a/tests/integration/test_s3_cluster/configs/users.xml +++ b/tests/integration/test_s3_cluster/configs/users.xml @@ -5,5 +5,9 @@ default 1 + + bar + default + diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 77626af827ee..0bb542db6291 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1218,6 +1218,7 @@ def test_joins(started_cluster): def test_object_storage_remote_initiator(started_cluster): node = started_cluster.instances["s0_0_0"] + # Simple cluster query_id = uuid.uuid4().hex result = node.query( f""" @@ -1245,6 +1246,7 @@ def test_object_storage_remote_initiator(started_cluster): # initial node + describe table + remote initiator + 2 subqueries on replicas assert queries == ["5"] + # Cluster with dots in the host names query_id = uuid.uuid4().hex result = node.query( f""" @@ -1271,3 +1273,74 @@ def test_object_storage_remote_initiator(started_cluster): # initial node + describe table + remote initiator + 2 subqueries on replicas assert queries == ["5"] + + users = node.query( + f""" + SELECT DISTINCT hostname, user + FROM clusterAllReplicas('cluster_all', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + ORDER BY ALL + FORMAT TSV + """ + ).splitlines() + + assert users == ["c2.s0_0_0\tdefault", + "c2.s0_0_1\tdefault", + "s0_0_0\tdefault"] + + # Cluster with user and password + query_id = uuid.uuid4().hex + result = node.query( + f""" + SELECT * from s3Cluster( + 'cluster_with_username_and_password', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) + SETTINGS object_storage_remote_initiator=1 + """, + query_id = query_id, + ) + + assert result is not None + + node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_all'") + queries = node.query( + f""" + SELECT count() + FROM clusterAllReplicas('cluster_all', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + FORMAT TSV + """ + ).splitlines() + + # initial node + describe table + remote initiator + 2 subqueries on replicas + assert queries == ["5"] + + users = node.query( + f""" + SELECT DISTINCT hostname, user + FROM clusterAllReplicas('cluster_all', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + ORDER BY ALL + FORMAT TSV + """ + ).splitlines() + + assert users == ["s0_0_0\tdefault", + "s0_0_1\tfoo", + "s0_1_0\tfoo"] + + # Cluster with secret + query_id = uuid.uuid4().hex + result = node.query_and_get_error( + f""" + SELECT * from s3Cluster( + 'cluster_with_secret', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) + SETTINGS object_storage_remote_initiator=1 + """, + query_id = query_id, + ) + + assert "Can't convert query to remote when cluster uses secret" in result From d4b850d2d52ecdf7ced9b1a87b0608bc5bfa3732 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 24 Mar 2026 14:11:26 +0100 Subject: [PATCH 11/18] Keep function settings in remote call --- src/Analyzer/FunctionNode.cpp | 31 +++++++++++++++++++++++++- src/Analyzer/FunctionNode.h | 15 +++++++++++++ src/Analyzer/QueryTreeBuilder.cpp | 7 +++++- src/Analyzer/Resolve/QueryAnalyzer.cpp | 3 +++ src/Storages/StorageDistributed.cpp | 1 + 5 files changed, 55 insertions(+), 2 deletions(-) diff --git a/src/Analyzer/FunctionNode.cpp b/src/Analyzer/FunctionNode.cpp index db2e4c558335..d1a93bfc46b9 100644 --- a/src/Analyzer/FunctionNode.cpp +++ b/src/Analyzer/FunctionNode.cpp @@ -12,6 +12,7 @@ #include #include +#include #include @@ -164,6 +165,13 @@ void FunctionNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state buffer << '\n' << std::string(indent + 2, ' ') << "WINDOW\n"; getWindowNode()->dumpTreeImpl(buffer, format_state, indent + 4); } + + if (!settings_changes.empty()) + { + buffer << '\n' << std::string(indent + 2, ' ') << "SETTINGS"; + for (const auto & change : settings_changes) + buffer << fmt::format(" {}={}", change.name, fieldToString(change.value)); + } } bool FunctionNode::isEqualImpl(const IQueryTreeNode & rhs, CompareOptions compare_options) const @@ -171,7 +179,7 @@ bool FunctionNode::isEqualImpl(const IQueryTreeNode & rhs, CompareOptions compar const auto & rhs_typed = assert_cast(rhs); if (function_name != rhs_typed.function_name || isAggregateFunction() != rhs_typed.isAggregateFunction() || isOrdinaryFunction() != rhs_typed.isOrdinaryFunction() || isWindowFunction() != rhs_typed.isWindowFunction() - || nulls_action != rhs_typed.nulls_action) + || nulls_action != rhs_typed.nulls_action || settings_changes != rhs_typed.settings_changes) return false; /// is_operator is ignored here because it affects only AST formatting @@ -206,6 +214,17 @@ void FunctionNode::updateTreeHashImpl(HashState & hash_state, CompareOptions com hash_state.update(isWindowFunction()); hash_state.update(nulls_action); + hash_state.update(settings_changes.size()); + for (const auto & change : settings_changes) + { + hash_state.update(change.name.size()); + hash_state.update(change.name); + + const auto & value_dump = change.value.dump(); + hash_state.update(value_dump.size()); + hash_state.update(value_dump); + } + /// is_operator is ignored here because it affects only AST formatting if (!compare_options.compare_types) @@ -230,6 +249,7 @@ QueryTreeNodePtr FunctionNode::cloneImpl() const result_function->nulls_action = nulls_action; result_function->wrap_with_nullable = wrap_with_nullable; result_function->is_operator = is_operator; + result_function->settings_changes = settings_changes; return result_function; } @@ -292,6 +312,15 @@ ASTPtr FunctionNode::toASTImpl(const ConvertToASTOptions & options) const function_ast->window_definition = window_node->toAST(new_options); } + if (!settings_changes.empty()) + { + auto settings_ast = make_intrusive(); + settings_ast->changes = settings_changes; + settings_ast->is_standalone = false; + function_ast->arguments->children.push_back(settings_ast); + function_ast->children.push_back(std::move(settings_ast)); + } + return function_ast; } diff --git a/src/Analyzer/FunctionNode.h b/src/Analyzer/FunctionNode.h index c0005016def6..0ec99c9ab40c 100644 --- a/src/Analyzer/FunctionNode.h +++ b/src/Analyzer/FunctionNode.h @@ -10,6 +10,7 @@ #include #include #include +#include namespace DB { @@ -204,6 +205,18 @@ class FunctionNode final : public IQueryTreeNode wrap_with_nullable = true; } + /// Get settings changes passed to table function + const SettingsChanges & getSettingsChanges() const + { + return settings_changes; + } + + /// Set settings changes passed as last argument to table function + void setSettingsChanges(SettingsChanges settings_changes_) + { + settings_changes = std::move(settings_changes_); + } + void dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, size_t indent) const override; protected: @@ -228,6 +241,8 @@ class FunctionNode final : public IQueryTreeNode static constexpr size_t arguments_child_index = 1; static constexpr size_t window_child_index = 2; static constexpr size_t children_size = window_child_index + 1; + + SettingsChanges settings_changes; }; } diff --git a/src/Analyzer/QueryTreeBuilder.cpp b/src/Analyzer/QueryTreeBuilder.cpp index 80caa1c2ba86..284accbb1687 100644 --- a/src/Analyzer/QueryTreeBuilder.cpp +++ b/src/Analyzer/QueryTreeBuilder.cpp @@ -699,7 +699,12 @@ QueryTreeNodePtr QueryTreeBuilder::buildExpression(const ASTPtr & expression, co { const auto & function_arguments_list = function->arguments->as()->children; for (const auto & argument : function_arguments_list) - function_node->getArguments().getNodes().push_back(buildExpression(argument, context)); + { + if (const auto * ast_set = argument->as()) + function_node->setSettingsChanges(ast_set->changes); + else + function_node->getArguments().getNodes().push_back(buildExpression(argument, context)); + } } if (function->is_window_function) diff --git a/src/Analyzer/Resolve/QueryAnalyzer.cpp b/src/Analyzer/Resolve/QueryAnalyzer.cpp index f221978682ee..2bb294bb9ae1 100644 --- a/src/Analyzer/Resolve/QueryAnalyzer.cpp +++ b/src/Analyzer/Resolve/QueryAnalyzer.cpp @@ -57,6 +57,8 @@ #include #include +#include + namespace DB { namespace Setting @@ -3861,6 +3863,7 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node, { auto table_function_node_to_resolve_typed = std::make_shared(table_function_argument_function_name); table_function_node_to_resolve_typed->getArgumentsNode() = table_function_argument_function->getArgumentsNode(); + table_function_node_to_resolve_typed->setSettingsChanges(table_function_argument_function->getSettingsChanges()); QueryTreeNodePtr table_function_node_to_resolve = std::move(table_function_node_to_resolve_typed); if (table_function_argument_function_name == "view") diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 0c0ad60c3e85..5d8c06418467 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1057,6 +1057,7 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, auto table_function_node = std::make_shared(remote_table_function_node.getFunctionName()); table_function_node->getArgumentsNode() = remote_table_function_node.getArgumentsNode(); + table_function_node->setSettingsChanges(remote_table_function_node.getSettingsChanges()); if (table_expression_modifiers) table_function_node->setTableExpressionModifiers(*table_expression_modifiers); From 0c95253d2c93683b4f691418323f8f7d80bfbfb6 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 25 Mar 2026 10:20:14 +0100 Subject: [PATCH 12/18] object_storage_remote_initiator_cluster setting --- src/Core/Settings.cpp | 3 +++ src/Core/SettingsChangesHistory.cpp | 9 +-------- src/Storages/IStorageCluster.cpp | 13 ++++++++----- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 6e9d3ad9b276..8b020a1afee6 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7640,6 +7640,9 @@ Rewrite expressions like 'x IN subquery' to JOIN. This might be useful for optim )", EXPERIMENTAL) \ DECLARE(Bool, object_storage_remote_initiator, false, R"( Execute request to object storage as remote on one of object_storage_cluster nodes. +)", EXPERIMENTAL) \ + DECLARE(String, object_storage_remote_initiator_cluster, "", R"( +Cluster to choose remote initiator, when `object_storage_remote_initiator` is true. When empty, `object_storage_cluster` is used. )", EXPERIMENTAL) \ DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"( Allow Iceberg read optimization based on Iceberg metadata. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 1088a8163b0b..6cc6617b62f6 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -42,9 +42,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() addSettingsChanges(settings_changes_history, "26.1.3.20001.altinityantalya", { {"iceberg_partition_timezone", "", "", "New setting."}, - // {"object_storage_max_nodes", 0, 0, "Antalya: New setting"}, {"s3_propagate_credentials_to_other_storages", false, false, "New setting"}, {"export_merge_tree_part_filename_pattern", "", "{part_name}_{checksum}", "New setting"}, + {"object_storage_remote_initiator_cluster", "", "", "New setting."}, }); addSettingsChanges(settings_changes_history, "26.1", { @@ -245,26 +245,19 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, {"lock_object_storage_task_distribution_ms", 500, 500, "New setting."}, {"allow_retries_in_cluster_requests", false, false, "New setting"}, - // {"object_storage_remote_initiator", false, false, "New setting."}, {"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."}, {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, {"export_merge_tree_partition_force_export", false, false, "New setting."}, {"export_merge_tree_partition_max_retries", 3, 3, "New setting."}, {"export_merge_tree_partition_manifest_ttl", 180, 180, "New setting."}, {"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."}, - // {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}, {"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."}, {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}, {"enable_alias_marker", true, true, "New setting."}, - // {"input_format_parquet_use_native_reader_v3", false, true, "Seems stable"}, - // {"input_format_parquet_verify_checksums", true, true, "New setting."}, - // {"output_format_parquet_write_checksums", false, true, "New setting."}, {"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."}, {"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."}, {"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."}, {"export_merge_tree_partition_system_table_prefer_remote_information", true, true, "New setting."}, - // {"cluster_table_function_split_granularity", "file", "file", "New setting."}, - // {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."}, {"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."}, {"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."}, {"object_storage_cluster", "", "", "Antalya: New setting"}, diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 5efac8ba0f98..a05c9a6edf63 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -49,11 +49,10 @@ namespace Setting extern const SettingsBool async_query_sending_for_remote; extern const SettingsBool async_socket_for_remote; extern const SettingsBool skip_unavailable_shards; - extern const SettingsBool parallel_replicas_local_plan; - extern const SettingsString cluster_for_parallel_replicas; extern const SettingsNonZeroUInt64 max_parallel_replicas; extern const SettingsUInt64 object_storage_max_nodes; extern const SettingsBool object_storage_remote_initiator; + extern const SettingsString object_storage_remote_initiator_cluster; extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode; } @@ -330,8 +329,6 @@ void IStorageCluster::read( const auto & settings = context->getSettingsRef(); - auto cluster = getClusterImpl(context, cluster_name_from_settings, isObjectStorage() ? settings[Setting::object_storage_max_nodes] : 0); - /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) SharedHeader sample_block; @@ -354,7 +351,11 @@ void IStorageCluster::read( if (settings[Setting::object_storage_remote_initiator]) { - auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send); + auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value; + if (remote_initiator_cluster_name.empty()) + remote_initiator_cluster_name = cluster_name_from_settings; + auto remote_initiator_cluster = getClusterImpl(context, remote_initiator_cluster_name); + auto storage_and_context = convertToRemote(remote_initiator_cluster, context, remote_initiator_cluster_name, query_to_send); auto src_distributed = std::dynamic_pointer_cast(storage_and_context.storage); auto modified_query_info = query_info; modified_query_info.cluster = src_distributed->getCluster(); @@ -363,6 +364,8 @@ void IStorageCluster::read( return; } + auto cluster = getClusterImpl(context, cluster_name_from_settings, isObjectStorage() ? settings[Setting::object_storage_max_nodes] : 0); + RestoreQualifiedNamesVisitor::Data data; data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as(), 0)); data.remote_table.database = context->getCurrentDatabase(); From 8dab965e62d204d0c986c027021e56db7f50d616 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 25 Mar 2026 11:22:03 +0100 Subject: [PATCH 13/18] Fix remote query with clustre, unknown on initial node --- src/TableFunctions/ITableFunctionCluster.h | 8 +-- .../configs/hidden_clusters.xml | 20 +++++++ tests/integration/test_s3_cluster/test.py | 52 ++++++++++++++++++- 3 files changed, 75 insertions(+), 5 deletions(-) create mode 100644 tests/integration/test_s3_cluster/configs/hidden_clusters.xml diff --git a/src/TableFunctions/ITableFunctionCluster.h b/src/TableFunctions/ITableFunctionCluster.h index 975322d054b3..920f271f0535 100644 --- a/src/TableFunctions/ITableFunctionCluster.h +++ b/src/TableFunctions/ITableFunctionCluster.h @@ -75,9 +75,11 @@ class ITableFunctionCluster : public Base /// Cluster name is always the first cluster_name = checkAndGetLiteralArgument(args[0], "cluster_name"); - - if (!context->tryGetCluster(cluster_name)) - throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "Requested cluster '{}' not found", cluster_name); + /// Remove check cluster existing here + /// In query like + /// remote('remote_host', xxxCluster('remote_cluster', ...)) + /// 'remote_cluster' can be defined only on 'remote_host' + /// If cluster not exists, query falls later /// Just cut the first arg (cluster_name) and try to parse other table function arguments as is args.erase(args.begin()); diff --git a/tests/integration/test_s3_cluster/configs/hidden_clusters.xml b/tests/integration/test_s3_cluster/configs/hidden_clusters.xml new file mode 100644 index 000000000000..8816cca1c79b --- /dev/null +++ b/tests/integration/test_s3_cluster/configs/hidden_clusters.xml @@ -0,0 +1,20 @@ + + + + + + s0_0_1 + 9000 + foo + bar + + + s0_1_0 + 9000 + foo + bar + + + + + diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 0bb542db6291..43fd2d48c81f 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -116,7 +116,7 @@ def started_cluster(): ) cluster.add_instance( "c2.s0_0_0", - main_configs=["configs/cluster.xml", "configs/named_collections.xml"], + main_configs=["configs/cluster.xml", "configs/named_collections.xml", "configs/hidden_clusters.xml"], user_configs=["configs/users.xml"], macros={"replica": "replica1", "shard": "shard1"}, with_zookeeper=True, @@ -124,7 +124,7 @@ def started_cluster(): ) cluster.add_instance( "c2.s0_0_1", - main_configs=["configs/cluster.xml", "configs/named_collections.xml"], + main_configs=["configs/cluster.xml", "configs/named_collections.xml", "configs/hidden_clusters.xml"], user_configs=["configs/users.xml"], macros={"replica": "replica2", "shard": "shard1"}, with_zookeeper=True, @@ -1344,3 +1344,51 @@ def test_object_storage_remote_initiator(started_cluster): ) assert "Can't convert query to remote when cluster uses secret" in result + + # Different cluster for remote initiator and query execution + # with `hidden_cluster_with_username_and_password` existed only in `cluster_with_dots` nodes + query_id = uuid.uuid4().hex + + result = node.query( + f""" + SELECT * from s3( + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) + SETTINGS + object_storage_remote_initiator=1, + object_storage_cluster='hidden_cluster_with_username_and_password', + object_storage_remote_initiator_cluster='cluster_with_dots' + """, + query_id = query_id, + ) + + assert result is not None + + node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_all'") + queries = node.query( + f""" + SELECT count() + FROM clusterAllReplicas('cluster_all', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + FORMAT TSV + """ + ).splitlines() + + # initial node + describe table + remote initiator + 2 subqueries on replicas + assert queries == ["5"] + + users = node.query( + f""" + SELECT DISTINCT hostname, user + FROM clusterAllReplicas('cluster_all', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + ORDER BY ALL + FORMAT TSV + """ + ).splitlines() + + # Random host from 'cluster_with_dots' for remote query + assert users[0] in ["c2.s0_0_0\tdefault", "c2.s0_0_1\tdefault"] + assert users[1:] == ["s0_0_0\tdefault", + "s0_0_1\tfoo", + "s0_1_0\tfoo"] From a0d1972000a7735b35f02f87b416a6949efc9552 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 25 Mar 2026 13:20:08 +0100 Subject: [PATCH 14/18] Fix FunctionNode::toASTImpl --- src/Analyzer/FunctionNode.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Analyzer/FunctionNode.cpp b/src/Analyzer/FunctionNode.cpp index d1a93bfc46b9..26b76784c6ca 100644 --- a/src/Analyzer/FunctionNode.cpp +++ b/src/Analyzer/FunctionNode.cpp @@ -318,7 +318,6 @@ ASTPtr FunctionNode::toASTImpl(const ConvertToASTOptions & options) const settings_ast->changes = settings_changes; settings_ast->is_standalone = false; function_ast->arguments->children.push_back(settings_ast); - function_ast->children.push_back(std::move(settings_ast)); } return function_ast; From d7c4beebbdbfef007d02ce4a084efb7d1db512ef Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 25 Mar 2026 14:04:30 +0100 Subject: [PATCH 15/18] Remove unused header --- src/Analyzer/Resolve/QueryAnalyzer.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Analyzer/Resolve/QueryAnalyzer.cpp b/src/Analyzer/Resolve/QueryAnalyzer.cpp index 2bb294bb9ae1..ecffbeaab0ba 100644 --- a/src/Analyzer/Resolve/QueryAnalyzer.cpp +++ b/src/Analyzer/Resolve/QueryAnalyzer.cpp @@ -57,8 +57,6 @@ #include #include -#include - namespace DB { namespace Setting From df2595a7ce236f8e8a76d2d4aa3ea303e0837ea9 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 26 Mar 2026 12:26:21 +0100 Subject: [PATCH 16/18] Fix test --- .../0_stateless/01625_constraints_index_append.reference | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/01625_constraints_index_append.reference b/tests/queries/0_stateless/01625_constraints_index_append.reference index 4593d63d1cf7..b68b514ca8bd 100644 --- a/tests/queries/0_stateless/01625_constraints_index_append.reference +++ b/tests/queries/0_stateless/01625_constraints_index_append.reference @@ -13,7 +13,7 @@ Prewhere info Prewhere filter Prewhere filter column: less(multiply(2, b), 100) - Filter column: and(indexHint(greater(plus(i, 40), 0)), equals(a, 0)) (removed) + Filter column: and(equals(a, 0), indexHint(greater(plus(i, 40), 0))) (removed) Prewhere info Prewhere filter Prewhere filter column: equals(a, 0) @@ -24,7 +24,7 @@ Prewhere info Prewhere filter Prewhere filter column: greaterOrEquals(a, 0) - Filter column: and(indexHint(less(i, 100)), less(multiply(2, b), 100)) (removed) + Filter column: and(less(multiply(2, b), 100), indexHint(less(i, 100))) (removed) Prewhere info Prewhere filter Prewhere filter column: less(multiply(2, b), 100) From d8b6b829b666595f3c2b6ba5e1bf9b301a343bfb Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 27 Mar 2026 15:51:27 +0100 Subject: [PATCH 17/18] Add comments --- src/Storages/IStorageCluster.cpp | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index a05c9a6edf63..585b4182292f 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -349,8 +349,16 @@ void IStorageCluster::read( updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context); + /// In case the current node is not supposed to initiate the clustered query + /// Sends this query to a remote initiator using the `remote` table function if (settings[Setting::object_storage_remote_initiator]) { + /// Re-writes queries in the form of: + /// Input: SELECT * FROM iceberg(...) SETTINGS object_storage_cluster='swarm', object_storage_remote_initiator=1 + /// Output: SELECT * FROM remote('remote_host', icebergCluster('swarm', ...) + /// Where `remote_host` is a random host from the cluster which will execute the query + /// This means the initiator node belongs to the same cluster that will execute the query + /// In case remote_initiator_cluster_name is set, the initiator might be set to a different cluster auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value; if (remote_initiator_cluster_name.empty()) remote_initiator_cluster_name = cluster_name_from_settings; @@ -424,6 +432,7 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote( /// Clean object_storage_remote_initiator setting to avoid infinite remote call auto new_context = Context::createCopy(context); new_context->setSetting("object_storage_remote_initiator", false); + new_context->setSetting("object_storage_remote_initiator_cluster", false); auto * select_query = query_to_send->as(); if (!select_query) @@ -446,7 +455,7 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote( boost::intrusive_ptr remote_query; if (shard_addresses[0].user_specified) - { + { // with user/password for clsuter access remote query is executed from this user, add it in query parameters remote_query = makeASTFunction(remote_function_name, make_intrusive(host_name), table_expression->table_function, @@ -454,7 +463,9 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote( make_intrusive(shard_addresses[0].password)); } else + { // without specified user/password remote query is executed from default user remote_query = makeASTFunction(remote_function_name, make_intrusive(host_name), table_expression->table_function); + } table_expression->table_function = remote_query; From a5eee1de2f6431b87d8293555ca84616dfa76356 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 30 Mar 2026 11:10:47 +0200 Subject: [PATCH 18/18] Fix setting object_storage_remote_initiator_cluster cleanup --- src/Storages/IStorageCluster.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 585b4182292f..2f1caaa6b974 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -432,7 +432,7 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote( /// Clean object_storage_remote_initiator setting to avoid infinite remote call auto new_context = Context::createCopy(context); new_context->setSetting("object_storage_remote_initiator", false); - new_context->setSetting("object_storage_remote_initiator_cluster", false); + new_context->setSetting("object_storage_remote_initiator_cluster", String("")); auto * select_query = query_to_send->as(); if (!select_query)