diff --git a/src/Analyzer/FunctionNode.cpp b/src/Analyzer/FunctionNode.cpp index db2e4c558335..26b76784c6ca 100644 --- a/src/Analyzer/FunctionNode.cpp +++ b/src/Analyzer/FunctionNode.cpp @@ -12,6 +12,7 @@ #include #include +#include #include @@ -164,6 +165,13 @@ void FunctionNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state buffer << '\n' << std::string(indent + 2, ' ') << "WINDOW\n"; getWindowNode()->dumpTreeImpl(buffer, format_state, indent + 4); } + + if (!settings_changes.empty()) + { + buffer << '\n' << std::string(indent + 2, ' ') << "SETTINGS"; + for (const auto & change : settings_changes) + buffer << fmt::format(" {}={}", change.name, fieldToString(change.value)); + } } bool FunctionNode::isEqualImpl(const IQueryTreeNode & rhs, CompareOptions compare_options) const @@ -171,7 +179,7 @@ bool FunctionNode::isEqualImpl(const IQueryTreeNode & rhs, CompareOptions compar const auto & rhs_typed = assert_cast(rhs); if (function_name != rhs_typed.function_name || isAggregateFunction() != rhs_typed.isAggregateFunction() || isOrdinaryFunction() != rhs_typed.isOrdinaryFunction() || isWindowFunction() != rhs_typed.isWindowFunction() - || nulls_action != rhs_typed.nulls_action) + || nulls_action != rhs_typed.nulls_action || settings_changes != rhs_typed.settings_changes) return false; /// is_operator is ignored here because it affects only AST formatting @@ -206,6 +214,17 @@ void FunctionNode::updateTreeHashImpl(HashState & hash_state, CompareOptions com hash_state.update(isWindowFunction()); hash_state.update(nulls_action); + hash_state.update(settings_changes.size()); + for (const auto & change : settings_changes) + { + hash_state.update(change.name.size()); + hash_state.update(change.name); + + const auto & value_dump = change.value.dump(); + hash_state.update(value_dump.size()); + hash_state.update(value_dump); + } + /// is_operator is ignored here because it affects only AST formatting if (!compare_options.compare_types) @@ -230,6 +249,7 @@ 
QueryTreeNodePtr FunctionNode::cloneImpl() const result_function->nulls_action = nulls_action; result_function->wrap_with_nullable = wrap_with_nullable; result_function->is_operator = is_operator; + result_function->settings_changes = settings_changes; return result_function; } @@ -292,6 +312,14 @@ ASTPtr FunctionNode::toASTImpl(const ConvertToASTOptions & options) const function_ast->window_definition = window_node->toAST(new_options); } + if (!settings_changes.empty()) + { + auto settings_ast = make_intrusive(); + settings_ast->changes = settings_changes; + settings_ast->is_standalone = false; + function_ast->arguments->children.push_back(settings_ast); + } + return function_ast; } diff --git a/src/Analyzer/FunctionNode.h b/src/Analyzer/FunctionNode.h index c0005016def6..0ec99c9ab40c 100644 --- a/src/Analyzer/FunctionNode.h +++ b/src/Analyzer/FunctionNode.h @@ -10,6 +10,7 @@ #include #include #include +#include namespace DB { @@ -204,6 +205,18 @@ class FunctionNode final : public IQueryTreeNode wrap_with_nullable = true; } + /// Get settings changes passed to table function + const SettingsChanges & getSettingsChanges() const + { + return settings_changes; + } + + /// Set settings changes passed as last argument to table function + void setSettingsChanges(SettingsChanges settings_changes_) + { + settings_changes = std::move(settings_changes_); + } + void dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, size_t indent) const override; protected: @@ -228,6 +241,8 @@ class FunctionNode final : public IQueryTreeNode static constexpr size_t arguments_child_index = 1; static constexpr size_t window_child_index = 2; static constexpr size_t children_size = window_child_index + 1; + + SettingsChanges settings_changes; }; } diff --git a/src/Analyzer/QueryTreeBuilder.cpp b/src/Analyzer/QueryTreeBuilder.cpp index 80caa1c2ba86..284accbb1687 100644 --- a/src/Analyzer/QueryTreeBuilder.cpp +++ b/src/Analyzer/QueryTreeBuilder.cpp @@ -699,7 +699,12 @@ QueryTreeNodePtr 
QueryTreeBuilder::buildExpression(const ASTPtr & expression, co { const auto & function_arguments_list = function->arguments->as()->children; for (const auto & argument : function_arguments_list) - function_node->getArguments().getNodes().push_back(buildExpression(argument, context)); + { + if (const auto * ast_set = argument->as()) + function_node->setSettingsChanges(ast_set->changes); + else + function_node->getArguments().getNodes().push_back(buildExpression(argument, context)); + } } if (function->is_window_function) diff --git a/src/Analyzer/Resolve/QueryAnalyzer.cpp b/src/Analyzer/Resolve/QueryAnalyzer.cpp index f221978682ee..ecffbeaab0ba 100644 --- a/src/Analyzer/Resolve/QueryAnalyzer.cpp +++ b/src/Analyzer/Resolve/QueryAnalyzer.cpp @@ -3861,6 +3861,7 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node, { auto table_function_node_to_resolve_typed = std::make_shared(table_function_argument_function_name); table_function_node_to_resolve_typed->getArgumentsNode() = table_function_argument_function->getArgumentsNode(); + table_function_node_to_resolve_typed->setSettingsChanges(table_function_argument_function->getSettingsChanges()); QueryTreeNodePtr table_function_node_to_resolve = std::move(table_function_node_to_resolve_typed); if (table_function_argument_function_name == "view") diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index fda56c37291d..f7503c94af28 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -1930,7 +1930,7 @@ ClickHouse applies this setting when the query contains the product of object st Possible values: - `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.` -- `global` — Unsupported for now. 
Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` +- `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` Right table executes first and is added to the secondary query as temporary table. - `allow` — Default value. Allows the use of these types of subqueries. )", 0) \ \ @@ -7658,6 +7658,9 @@ Rewrite expressions like 'x IN subquery' to JOIN. This might be useful for optim )", EXPERIMENTAL) \ DECLARE(Bool, object_storage_remote_initiator, false, R"( Execute request to object storage as remote on one of object_storage_cluster nodes. +)", EXPERIMENTAL) \ + DECLARE(String, object_storage_remote_initiator_cluster, "", R"( +Cluster to choose remote initiator, when `object_storage_remote_initiator` is true. When empty, `object_storage_cluster` is used. )", EXPERIMENTAL) \ DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"( Allow Iceberg read optimization based on Iceberg metadata. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 55ba5928add7..3e1bb4759792 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -42,9 +42,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() addSettingsChanges(settings_changes_history, "26.1.3.20001.altinityantalya", { {"iceberg_partition_timezone", "", "", "New setting."}, - // {"object_storage_max_nodes", 0, 0, "Antalya: New setting"}, {"s3_propagate_credentials_to_other_storages", false, false, "New setting"}, {"export_merge_tree_part_filename_pattern", "", "{part_name}_{checksum}", "New setting"}, + {"object_storage_remote_initiator_cluster", "", "", "New setting."}, {"iceberg_metadata_staleness_ms", 0, 0, "New setting allowing using cached metadata version at READ operations to prevent fetching from remote catalog"}, }); addSettingsChanges(settings_changes_history, "26.1", @@ -247,26 +247,19 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, {"lock_object_storage_task_distribution_ms", 500, 500, "New setting."}, {"allow_retries_in_cluster_requests", false, false, "New setting"}, - // {"object_storage_remote_initiator", false, false, "New setting."}, {"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."}, {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, {"export_merge_tree_partition_force_export", false, false, "New setting."}, {"export_merge_tree_partition_max_retries", 3, 3, "New setting."}, {"export_merge_tree_partition_manifest_ttl", 180, 180, "New setting."}, {"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."}, - // {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}, {"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."}, {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}, {"enable_alias_marker", true, true, "New setting."}, - // {"input_format_parquet_use_native_reader_v3", false, true, "Seems stable"}, - // {"input_format_parquet_verify_checksums", true, true, "New setting."}, - // {"output_format_parquet_write_checksums", false, true, "New setting."}, {"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."}, {"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."}, {"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."}, {"export_merge_tree_partition_system_table_prefer_remote_information", true, true, "New setting."}, - // {"cluster_table_function_split_granularity", "file", "file", "New setting."}, - // {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."}, {"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."}, 
{"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."}, {"object_storage_cluster", "", "", "Antalya: New setting"}, diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 3f09a0a595bf..6d9b480dc4d0 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -24,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -49,11 +51,10 @@ namespace Setting extern const SettingsBool async_query_sending_for_remote; extern const SettingsBool async_socket_for_remote; extern const SettingsBool skip_unavailable_shards; - extern const SettingsBool parallel_replicas_local_plan; - extern const SettingsString cluster_for_parallel_replicas; extern const SettingsNonZeroUInt64 max_parallel_replicas; extern const SettingsUInt64 object_storage_max_nodes; extern const SettingsBool object_storage_remote_initiator; + extern const SettingsString object_storage_remote_initiator_cluster; extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode; } @@ -116,11 +117,14 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext; using Base::Base; - explicit SearcherVisitor(std::unordered_set types_, ContextPtr context) : Base(context), types(types_) {} + explicit SearcherVisitor(std::unordered_set types_, size_t entry_, ContextPtr context) + : Base(context) + , types(types_) + , entry(entry_) {} bool needChildVisit(QueryTreeNodePtr & /*parent*/, QueryTreeNodePtr & /*child*/) { - return getSubqueryDepth() <= 2 && !passed_node; + return getSubqueryDepth() <= 2 && !passed_node && !current_entry; } void enterImpl(QueryTreeNodePtr & node) @@ -131,13 +135,19 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContextgetNodeType(); if (types.contains(node_type)) - passed_node = node; + { + ++current_entry; + if (current_entry == entry) + passed_node = node; + } } 
QueryTreeNodePtr getNode() const { return passed_node; } private: std::unordered_set types; + size_t entry; + size_t current_entry = 0; QueryTreeNodePtr passed_node; }; @@ -204,15 +214,24 @@ Converts localtable as t ON s3.key == t.key -to +to (object_storage_cluster_join_mode='local') SELECT s3.c1, s3.c2, s3.key FROM s3Cluster(...) AS s3 + +or (object_storage_cluster_join_mode='global') + + SELECT s3.c1, s3.c2, t.c3 + FROM + s3Cluster(...) as s3 + JOIN + values('key UInt32, data String', (1, 'one'), (2, 'two'), ...) as t + ON s3.key == t.key */ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( ASTPtr & query_to_send, - QueryTreeNodePtr query_tree, + SelectQueryInfo query_info, const ContextPtr & context) { auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; @@ -220,17 +239,17 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( { case ObjectStorageClusterJoinMode::LOCAL: { - auto info = getQueryTreeInfo(query_tree, context); + auto info = getQueryTreeInfo(query_info.query_tree, context); if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) { - auto modified_query_tree = query_tree->clone(); + auto modified_query_tree = query_info.query_tree->clone(); - SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context); left_table_expression_searcher.visit(modified_query_tree); auto table_function_node = left_table_expression_searcher.getNode(); if (!table_function_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find left table function node"); QueryTreeNodePtr query_tree_distributed; @@ -243,7 +262,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( } else if (info.has_cross_join) { - 
SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context); + SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, 1, context); join_searcher.visit(modified_query_tree); auto cross_join_node = join_searcher.getNode(); if (!cross_join_node) @@ -298,8 +317,24 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( return; } case ObjectStorageClusterJoinMode::GLOBAL: - // TODO - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now"); + { + auto info = getQueryTreeInfo(query_info.query_tree, context); + + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) + { + auto modified_query_tree = query_info.query_tree->clone(); + + rewriteJoinToGlobalJoin(modified_query_tree, context); + modified_query_tree = buildQueryTreeForShard( + query_info.planner_context, + modified_query_tree, + /*allow_global_join_for_right_table*/ true, + /*find_cross_join*/ true); + query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); + } + + return; + } case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special return; } @@ -330,14 +365,12 @@ void IStorageCluster::read( const auto & settings = context->getSettingsRef(); - auto cluster = getClusterImpl(context, cluster_name_from_settings, isObjectStorage() ? settings[Setting::object_storage_max_nodes] : 0); - /// Calculate the header. 
This is significant, because some columns could be thrown away in some cases like query with count(*) SharedHeader sample_block; ASTPtr query_to_send = query_info.query; - updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context); + updateQueryWithJoinToSendIfNeeded(query_to_send, query_info, context); if (settings[Setting::allow_experimental_analyzer]) { @@ -352,9 +385,21 @@ void IStorageCluster::read( updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context); + /// In case the current node is not supposed to initiate the clustered query + /// Sends this query to a remote initiator using the `remote` table function if (settings[Setting::object_storage_remote_initiator]) { - auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send); + /// Re-writes queries in the form of: + /// Input: SELECT * FROM iceberg(...) SETTINGS object_storage_cluster='swarm', object_storage_remote_initiator=1 + /// Output: SELECT * FROM remote('remote_host', icebergCluster('swarm', ...) 
+ /// Where `remote_host` is a random host from the cluster which will execute the query + /// This means the initiator node belongs to the same cluster that will execute the query + /// In case remote_initiator_cluster_name is set, the initiator might be set to a different cluster + auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value; + if (remote_initiator_cluster_name.empty()) + remote_initiator_cluster_name = cluster_name_from_settings; + auto remote_initiator_cluster = getClusterImpl(context, remote_initiator_cluster_name); + auto storage_and_context = convertToRemote(remote_initiator_cluster, context, remote_initiator_cluster_name, query_to_send); auto src_distributed = std::dynamic_pointer_cast(storage_and_context.storage); auto modified_query_info = query_info; modified_query_info.cluster = src_distributed->getCluster(); @@ -363,6 +408,8 @@ void IStorageCluster::read( return; } + auto cluster = getClusterImpl(context, cluster_name_from_settings, isObjectStorage() ? 
settings[Setting::object_storage_max_nodes] : 0); + RestoreQualifiedNamesVisitor::Data data; data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as(), 0)); data.remote_table.database = context->getCurrentDatabase(); @@ -375,6 +422,10 @@ void IStorageCluster::read( auto this_ptr = std::static_pointer_cast(shared_from_this()); + std::optional external_tables = std::nullopt; + if (query_info.planner_context && query_info.planner_context->getMutableQueryContext()) + external_tables = query_info.planner_context->getMutableQueryContext()->getExternalTables(); + auto reading = std::make_unique( column_names, query_info, @@ -385,7 +436,8 @@ void IStorageCluster::read( std::move(query_to_send), processed_stage, cluster, - log); + log, + external_tables); query_plan.addStep(std::move(reading)); } @@ -396,6 +448,10 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote( const std::string & cluster_name_from_settings, ASTPtr query_to_send) { + /// TODO: Allow to use secret for remote queries + if (!cluster->getSecret().empty()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Can't convert query to remote when cluster uses secret"); + auto host_addresses = cluster->getShardsAddresses(); if (host_addresses.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings); @@ -417,6 +473,7 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote( /// Clean object_storage_remote_initiator setting to avoid infinite remote call auto new_context = Context::createCopy(context); new_context->setSetting("object_storage_remote_initiator", false); + new_context->setSetting("object_storage_remote_initiator_cluster", String("")); auto * select_query = query_to_send->as(); if (!select_query) @@ -436,7 +493,20 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote( if (!table_expression) throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression"); - auto 
remote_query = makeASTFunction(remote_function_name, make_intrusive(host_name), table_expression->table_function); + boost::intrusive_ptr remote_query; + + if (shard_addresses[0].user_specified) + { // with user/password for cluster access remote query is executed from this user, add it in query parameters + remote_query = makeASTFunction(remote_function_name, + make_intrusive(host_name), + table_expression->table_function, + make_intrusive(shard_addresses[0].user), + make_intrusive(shard_addresses[0].password)); + } + else + { // without specified user/password remote query is executed from default user + remote_query = makeASTFunction(remote_function_name, make_intrusive(host_name), table_expression->table_function); + } table_expression->table_function = remote_query; @@ -504,7 +574,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const new_context, /*throttler=*/nullptr, scalars, - Tables(), + external_tables.has_value() ? *external_tables : Tables(), processed_stage, nullptr, RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)}); @@ -542,7 +612,7 @@ IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePt info.has_cross_join = true; } - SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context); left_table_expression_searcher.visit(query_tree); auto table_function_node = left_table_expression_searcher.getNode(); if (!table_function_node) @@ -577,9 +647,12 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( throw Exception(ErrorCodes::NOT_IMPLEMENTED, "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); - auto info = getQueryTreeInfo(query_info.query_tree, context); - if (info.has_join || 
info.has_cross_join || info.has_local_columns_in_where) - return QueryProcessingStage::Enum::FetchColumns; + if (object_storage_cluster_join_mode == ObjectStorageClusterJoinMode::LOCAL) + { + auto info = getQueryTreeInfo(query_info.query_tree, context); + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) + return QueryProcessingStage::Enum::FetchColumns; + } } /// Initiator executes query on remote node. diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 69ebd0b777fb..65fa86589cad 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -63,7 +63,7 @@ class IStorageCluster : public IStorage protected: virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} - void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context); + void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, SelectQueryInfo query_info, const ContextPtr & context); virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {} @@ -137,7 +137,8 @@ class ReadFromCluster : public SourceStepWithFilter ASTPtr query_to_send_, QueryProcessingStage::Enum processed_stage_, ClusterPtr cluster_, - LoggerPtr log_) + LoggerPtr log_, + std::optional external_tables_) : SourceStepWithFilter( std::move(sample_block), column_names_, @@ -149,6 +150,7 @@ class ReadFromCluster : public SourceStepWithFilter , processed_stage(processed_stage_) , cluster(std::move(cluster_)) , log(log_) + , external_tables(external_tables_) { } @@ -160,6 +162,7 @@ class ReadFromCluster : public SourceStepWithFilter LoggerPtr log; std::optional extension; + std::optional external_tables; void createExtension(const ActionsDAG::Node * predicate); ContextPtr updateSettings(const Settings & settings); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp 
b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 13e3721a076e..812fc3a9287e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -49,13 +49,16 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb saveLastNodeActivity(number_of_current_replica); - auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); - if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Replica number {} was marked as lost, can't set task for it anymore", - number_of_current_replica - ); + { + std::lock_guard lock(mutex); + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} was marked as lost, can't set task for it anymore", + number_of_current_replica + ); + } // 1. 
Check pre-queued files first auto file = getPreQueuedFile(number_of_current_replica); @@ -67,7 +70,19 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb file = getAnyUnprocessedFile(number_of_current_replica); if (file) - processed_file_list_ptr->second.push_back(file); + { + std::lock_guard lock(mutex); + auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica); + if (processed_file_list_ptr == replica_to_files_to_be_processed.end()) + { // It is possible that replica was lost after check in the beginning of the method + auto file_identifier = getFileIdentifier(file); + auto file_replica_idx = getReplicaForFile(file_identifier); + unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); + connection_to_files[file_replica_idx].push_back(file); + } + else + processed_file_list_ptr->second.push_back(file); + } return file; } @@ -125,7 +140,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t auto next_file = files.back(); files.pop_back(); - auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : getAbsolutePathFromObjectInfo(next_file).value_or(next_file->getIdentifier()); + auto file_identifier = getFileIdentifier(next_file); auto it = unprocessed_files.find(file_identifier); if (it == unprocessed_files.end()) continue; @@ -170,18 +185,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } } - String file_identifier; - if (send_over_whole_archive && object_info->isArchive()) - { - file_identifier = object_info->getPathOrPathToArchiveIfArchive(); - LOG_TEST(log, "Will send over the whole archive {} to replicas. 
" - "This will be suboptimal, consider turning on " - "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); - } - else - { - file_identifier = getAbsolutePathFromObjectInfo(object_info).value_or(object_info->getIdentifier()); - } + String file_identifier = getFileIdentifier(object_info, true); if (iceberg_read_optimization_enabled) { @@ -194,7 +198,13 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } } - size_t file_replica_idx = getReplicaForFile(file_identifier); + size_t file_replica_idx; + + { + std::lock_guard lock(mutex); + file_replica_idx = getReplicaForFile(file_identifier); + } + if (file_replica_idx == number_of_current_replica) { LOG_TRACE( @@ -250,7 +260,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s auto next_file = it->second.first; unprocessed_files.erase(it); - auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : getAbsolutePathFromObjectInfo(next_file).value_or(next_file->getPath()); + auto file_path = getFileIdentifier(next_file); LOG_TRACE( log, "Iterator exhausted. 
Assigning unprocessed file {} to replica {} from matched replica {}", @@ -310,13 +320,31 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_ "All replicas were marked as lost" ); - for (const auto & file : processed_file_list_ptr->second) + auto files = std::move(processed_file_list_ptr->second); + replica_to_files_to_be_processed.erase(number_of_current_replica); + for (const auto & file : files) { - auto file_replica_idx = getReplicaForFile(file->getPath()); - unprocessed_files.emplace(file->getPath(), std::make_pair(file, file_replica_idx)); + auto file_identifier = getFileIdentifier(file); + auto file_replica_idx = getReplicaForFile(file_identifier); + unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx)); connection_to_files[file_replica_idx].push_back(file); } - replica_to_files_to_be_processed.erase(number_of_current_replica); +} + +String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log) const +{ + if (send_over_whole_archive && file_object->isArchive()) + { + auto file_identifier = file_object->getPathOrPathToArchiveIfArchive(); + if (write_to_log) + { + LOG_TEST(log, "Will send over the whole archive {} to replicas. 
" + "This will be suboptimal, consider turning on " + "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); + } + return file_identifier; + } + return getAbsolutePathFromObjectInfo(file_object).value_or(file_object->getIdentifier()); } } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index 0cd10ac7188e..3a5a16998be3 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -41,6 +41,8 @@ class StorageObjectStorageStableTaskDistributor void saveLastNodeActivity(size_t number_of_current_replica); + String getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log = false) const; + const std::shared_ptr iterator; const bool send_over_whole_archive; diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 0c0ad60c3e85..5d8c06418467 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1057,6 +1057,7 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, auto table_function_node = std::make_shared(remote_table_function_node.getFunctionName()); table_function_node->getArgumentsNode() = remote_table_function_node.getArgumentsNode(); + table_function_node->setSettingsChanges(remote_table_function_node.getSettingsChanges()); if (table_expression_modifiers) table_function_node->setTableExpressionModifiers(*table_expression_modifiers); diff --git a/src/Storages/buildQueryTreeForShard.cpp b/src/Storages/buildQueryTreeForShard.cpp index 273858fc3982..d07f2a816561 100644 --- a/src/Storages/buildQueryTreeForShard.cpp +++ b/src/Storages/buildQueryTreeForShard.cpp @@ -42,6 +42,7 @@ namespace Setting extern const SettingsBool prefer_global_in_and_join; extern const SettingsBool enable_add_distinct_to_in_subqueries; extern const SettingsInt64 
optimize_const_name_size; + extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode; } namespace ErrorCodes @@ -120,8 +121,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito using Base = InDepthQueryTreeVisitorWithContext; using Base::Base; - explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_) + explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_, bool find_cross_join_) : Base(context_) + , find_cross_join(find_cross_join_) {} struct InFunctionOrJoin @@ -157,9 +159,11 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito { auto * function_node = node->as(); auto * join_node = node->as(); + CrossJoinNode * cross_join_node = find_cross_join ? node->as() : nullptr; if ((function_node && isNameOfGlobalInFunction(function_node->getFunctionName())) || - (join_node && join_node->getLocality() == JoinLocality::Global)) + (join_node && join_node->getLocality() == JoinLocality::Global) || + cross_join_node) { InFunctionOrJoin in_function_or_join_entry; in_function_or_join_entry.query_node = node; @@ -223,7 +227,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito replacement_table_expression->setTableExpressionModifiers(*table_expression_modifiers); replacement_map.emplace(table_node.get(), std::move(replacement_table_expression)); } - else if ((distributed_product_mode == DistributedProductMode::GLOBAL || getSettings()[Setting::prefer_global_in_and_join]) && + else if ((distributed_product_mode == DistributedProductMode::GLOBAL || + getSettings()[Setting::prefer_global_in_and_join] || + (find_cross_join && getSettings()[Setting::object_storage_cluster_join_mode] == ObjectStorageClusterJoinMode::GLOBAL)) && !in_function_or_join_stack.empty()) { auto * in_or_join_node_to_modify = in_function_or_join_stack.back().query_node.get(); @@ -257,6 +263,8 @@ class DistributedProductModeRewriteInJoinVisitor 
: public InDepthQueryTreeVisito std::vector in_function_or_join_stack; std::unordered_map replacement_map; std::vector global_in_or_join_nodes; + + bool find_cross_join = false; }; /** Replaces large constant values with `__getScalar` function calls to avoid @@ -504,14 +512,18 @@ QueryTreeNodePtr getSubqueryFromTableExpression( } -QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table) +QueryTreeNodePtr buildQueryTreeForShard( + const PlannerContextPtr & planner_context, + QueryTreeNodePtr query_tree_to_modify, + bool allow_global_join_for_right_table, + bool find_cross_join) { CollectColumnSourceToColumnsVisitor collect_column_source_to_columns_visitor; collect_column_source_to_columns_visitor.visit(query_tree_to_modify); const auto & column_source_to_columns = collect_column_source_to_columns_visitor.getColumnSourceToColumns(); - DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext()); + DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext(), find_cross_join); visitor.visit(query_tree_to_modify); auto replacement_map = visitor.getReplacementMap(); @@ -550,6 +562,24 @@ QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_contex replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node)); continue; } + if (auto * cross_join_node = global_in_or_join_node.query_node->as()) + { + auto tables_count = cross_join_node->getTableExpressions().size(); + for (size_t i = 1; i < tables_count; ++i) + { + QueryTreeNodePtr join_table_expression = cross_join_node->getTableExpressions()[i]; + + auto subquery_node = getSubqueryFromTableExpression(join_table_expression, column_source_to_columns, planner_context->getQueryContext()); + + auto temporary_table_expression_node = executeSubqueryNode(subquery_node, + planner_context->getMutableQueryContext(), + 
global_in_or_join_node.subquery_depth); + temporary_table_expression_node->setAlias(join_table_expression->getAlias()); + + replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node)); + } + continue; + } if (auto * in_function_node = global_in_or_join_node.query_node->as()) { auto & in_function_subquery_node = in_function_node->getArguments().getNodes().at(1); @@ -661,7 +691,8 @@ class RewriteJoinToGlobalJoinVisitor : public InDepthQueryTreeVisitorWithContext { if (auto * join_node = node->as()) { - bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join]; + bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join] + && getContext()->getSettingsRef()[Setting::object_storage_cluster_join_mode] != ObjectStorageClusterJoinMode::GLOBAL; bool should_use_global_join = !prefer_local_join || !allStoragesAreMergeTree(join_node->getRightTableExpression()); if (should_use_global_join) join_node->setLocality(JoinLocality::Global); diff --git a/src/Storages/buildQueryTreeForShard.h b/src/Storages/buildQueryTreeForShard.h index 90cbfd36f660..bcbac10b55e0 100644 --- a/src/Storages/buildQueryTreeForShard.h +++ b/src/Storages/buildQueryTreeForShard.h @@ -16,7 +16,11 @@ using PlannerContextPtr = std::shared_ptr; class Context; using ContextPtr = std::shared_ptr; -QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table); +QueryTreeNodePtr buildQueryTreeForShard( + const PlannerContextPtr & planner_context, + QueryTreeNodePtr query_tree_to_modify, + bool allow_global_join_for_right_table, + bool find_cross_join = false); void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify, ContextPtr context); diff --git a/src/TableFunctions/ITableFunctionCluster.h b/src/TableFunctions/ITableFunctionCluster.h index 975322d054b3..920f271f0535 100644 --- 
a/src/TableFunctions/ITableFunctionCluster.h +++ b/src/TableFunctions/ITableFunctionCluster.h @@ -75,9 +75,11 @@ class ITableFunctionCluster : public Base /// Cluster name is always the first cluster_name = checkAndGetLiteralArgument(args[0], "cluster_name"); - - if (!context->tryGetCluster(cluster_name)) - throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "Requested cluster '{}' not found", cluster_name); + /// Do not check cluster existence here. + /// In a query like + /// remote('remote_host', xxxCluster('remote_cluster', ...)) + /// 'remote_cluster' may be defined only on 'remote_host'. + /// If the cluster does not exist, the query fails later. /// Just cut the first arg (cluster_name) and try to parse other table function arguments as is args.erase(args.begin()); diff --git a/tests/integration/test_s3_cluster/configs/cluster.xml b/tests/integration/test_s3_cluster/configs/cluster.xml index 0452a383a709..7f3dab539985 100644 --- a/tests/integration/test_s3_cluster/configs/cluster.xml +++ b/tests/integration/test_s3_cluster/configs/cluster.xml @@ -76,6 +76,39 @@ + + + + s0_0_1 + 9000 + foo + bar + + + s0_1_0 + 9000 + foo + bar + + + + + + baz + + + s0_0_1 + 9000 + foo + + + s0_1_0 + 9000 + foo + + + + diff --git a/tests/integration/test_s3_cluster/configs/hidden_clusters.xml b/tests/integration/test_s3_cluster/configs/hidden_clusters.xml new file mode 100644 index 000000000000..8816cca1c79b --- /dev/null +++ b/tests/integration/test_s3_cluster/configs/hidden_clusters.xml @@ -0,0 +1,20 @@ + + + + + + s0_0_1 + 9000 + foo + bar + + + s0_1_0 + 9000 + foo + bar + + + + + diff --git a/tests/integration/test_s3_cluster/configs/users.xml b/tests/integration/test_s3_cluster/configs/users.xml index 4b6ba057ecb1..95d2d329cac0 100644 --- a/tests/integration/test_s3_cluster/configs/users.xml +++ b/tests/integration/test_s3_cluster/configs/users.xml @@ -5,5 +5,9 @@ default 1 + + bar + default + diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py
index 77626af827ee..5332c26cffec 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -116,7 +116,7 @@ def started_cluster(): ) cluster.add_instance( "c2.s0_0_0", - main_configs=["configs/cluster.xml", "configs/named_collections.xml"], + main_configs=["configs/cluster.xml", "configs/named_collections.xml", "configs/hidden_clusters.xml"], user_configs=["configs/users.xml"], macros={"replica": "replica1", "shard": "shard1"}, with_zookeeper=True, @@ -124,7 +124,7 @@ def started_cluster(): ) cluster.add_instance( "c2.s0_0_1", - main_configs=["configs/cluster.xml", "configs/named_collections.xml"], + main_configs=["configs/cluster.xml", "configs/named_collections.xml", "configs/hidden_clusters.xml"], user_configs=["configs/users.xml"], macros={"replica": "replica2", "shard": "shard1"}, with_zookeeper=True, @@ -141,7 +141,7 @@ def started_cluster(): yield cluster finally: - shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/")) + shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"), ignore_errors=True) cluster.shutdown() @@ -1050,7 +1050,8 @@ def test_hive_partitioning(started_cluster, allow_experimental_analyzer): node.query("SET allow_experimental_analyzer = DEFAULT") -def test_joins(started_cluster): +@pytest.mark.parametrize("join_mode", ["local", "global"]) +def test_joins(started_cluster, join_mode): node = started_cluster.instances["s0_0_0"] # Table join_table only exists on the node 's0_0_0'. 
@@ -1084,7 +1085,7 @@ def test_joins(started_cluster): join_table AS t2 ON t1.value = t2.id ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -1107,7 +1108,7 @@ def test_joins(started_cluster): 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 ON t1.value = t2.id ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -1125,7 +1126,7 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t1.value % 2) ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -1144,7 +1145,7 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t2.id % 2) ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) @@ -1162,27 +1163,29 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t1.value % 2) AND ((t2.id % 3) == 2) ORDER BY t1.name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) res = list(map(str.split, result5.splitlines())) assert len(res) == 6 + # With WHERE clause with global subquery result6 = node.query( f""" SELECT name FROM s3Cluster('cluster_simple', 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') - WHERE value IN (SELECT id FROM join_table) + WHERE value GLOBAL IN (SELECT id FROM join_table) ORDER BY name - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) res = list(map(str.split, result6.splitlines())) assert len(res) == 25 + # With WHERE clause without columns in condition result7 = node.query( f""" SELECT count() FROM @@ 
-1193,11 +1196,12 @@ def test_joins(started_cluster): join_table AS t2 ON 1 GROUP BY ALL - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) assert result7.strip() == "625" + # With WHERE clause without columns in condition and with local column in SELECT result8 = node.query( f""" SELECT count(), t2.id FROM @@ -1208,7 +1212,7 @@ def test_joins(started_cluster): join_table AS t2 ON 1 GROUP BY ALL - SETTINGS object_storage_cluster_join_mode='local'; + SETTINGS object_storage_cluster_join_mode='{join_mode}'; """ ) res = list(map(str.split, result8.splitlines())) @@ -1218,6 +1222,7 @@ def test_joins(started_cluster): def test_object_storage_remote_initiator(started_cluster): node = started_cluster.instances["s0_0_0"] + # Simple cluster query_id = uuid.uuid4().hex result = node.query( f""" @@ -1245,6 +1250,7 @@ def test_object_storage_remote_initiator(started_cluster): # initial node + describe table + remote initiator + 2 subqueries on replicas assert queries == ["5"] + # Cluster with dots in the host names query_id = uuid.uuid4().hex result = node.query( f""" @@ -1271,3 +1277,122 @@ def test_object_storage_remote_initiator(started_cluster): # initial node + describe table + remote initiator + 2 subqueries on replicas assert queries == ["5"] + + users = node.query( + f""" + SELECT DISTINCT hostname, user + FROM clusterAllReplicas('cluster_all', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + ORDER BY ALL + FORMAT TSV + """ + ).splitlines() + + assert users == ["c2.s0_0_0\tdefault", + "c2.s0_0_1\tdefault", + "s0_0_0\tdefault"] + + # Cluster with user and password + query_id = uuid.uuid4().hex + result = node.query( + f""" + SELECT * from s3Cluster( + 'cluster_with_username_and_password', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY 
(name, value, polygon) + SETTINGS object_storage_remote_initiator=1 + """, + query_id = query_id, + ) + + assert result is not None + + node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_all'") + queries = node.query( + f""" + SELECT count() + FROM clusterAllReplicas('cluster_all', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + FORMAT TSV + """ + ).splitlines() + + # initial node + describe table + remote initiator + 2 subqueries on replicas + assert queries == ["5"] + + users = node.query( + f""" + SELECT DISTINCT hostname, user + FROM clusterAllReplicas('cluster_all', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + ORDER BY ALL + FORMAT TSV + """ + ).splitlines() + + assert users == ["s0_0_0\tdefault", + "s0_0_1\tfoo", + "s0_1_0\tfoo"] + + # Cluster with secret + query_id = uuid.uuid4().hex + result = node.query_and_get_error( + f""" + SELECT * from s3Cluster( + 'cluster_with_secret', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) + SETTINGS object_storage_remote_initiator=1 + """, + query_id = query_id, + ) + + assert "Can't convert query to remote when cluster uses secret" in result + + # Different cluster for remote initiator and query execution + # with `hidden_cluster_with_username_and_password` existed only in `cluster_with_dots` nodes + query_id = uuid.uuid4().hex + + result = node.query( + f""" + SELECT * from s3( + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) + SETTINGS + object_storage_remote_initiator=1, + object_storage_cluster='hidden_cluster_with_username_and_password', + object_storage_remote_initiator_cluster='cluster_with_dots' + """, + query_id = query_id, + ) + + 
assert result is not None + + node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_all'") + queries = node.query( + f""" + SELECT count() + FROM clusterAllReplicas('cluster_all', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + FORMAT TSV + """ + ).splitlines() + + # initial node + describe table + remote initiator + 2 subqueries on replicas + assert queries == ["5"] + + users = node.query( + f""" + SELECT DISTINCT hostname, user + FROM clusterAllReplicas('cluster_all', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + ORDER BY ALL + FORMAT TSV + """ + ).splitlines() + + # Random host from 'cluster_with_dots' for remote query + assert users[0] in ["c2.s0_0_0\tdefault", "c2.s0_0_1\tdefault"] + assert users[1:] == ["s0_0_0\tdefault", + "s0_0_1\tfoo", + "s0_1_0\tfoo"] diff --git a/tests/queries/0_stateless/01625_constraints_index_append.reference b/tests/queries/0_stateless/01625_constraints_index_append.reference index 4593d63d1cf7..b68b514ca8bd 100644 --- a/tests/queries/0_stateless/01625_constraints_index_append.reference +++ b/tests/queries/0_stateless/01625_constraints_index_append.reference @@ -13,7 +13,7 @@ Prewhere info Prewhere filter Prewhere filter column: less(multiply(2, b), 100) - Filter column: and(indexHint(greater(plus(i, 40), 0)), equals(a, 0)) (removed) + Filter column: and(equals(a, 0), indexHint(greater(plus(i, 40), 0))) (removed) Prewhere info Prewhere filter Prewhere filter column: equals(a, 0) @@ -24,7 +24,7 @@ Prewhere info Prewhere filter Prewhere filter column: greaterOrEquals(a, 0) - Filter column: and(indexHint(less(i, 100)), less(multiply(2, b), 100)) (removed) + Filter column: and(less(multiply(2, b), 100), indexHint(less(i, 100))) (removed) Prewhere info Prewhere filter Prewhere filter column: less(multiply(2, b), 100)