From c5b35408c39adb517cee839cc2aff1d266839eaa Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Wed, 6 May 2026 21:16:58 +0200 Subject: [PATCH 1/2] Cherry-pick of https://github.com/Altinity/ClickHouse/pull/1646 with unresolved conflict markers (resolution in next commit) --- Original cherry-pick message follows: Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls 26.3 Antalya port - fixes for s3Cluster distributed calls # Conflicts: # src/Processors/QueryPlan/ObjectFilterStep.cpp # src/Processors/QueryPlan/ObjectFilterStep.h # src/Processors/QueryPlan/ReadFromRemote.cpp # src/QueryPipeline/RemoteQueryExecutor.h # src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp # tests/integration/test_database_iceberg/test.py # tests/integration/test_s3_cluster/test.py --- src/Core/Settings.cpp | 16 + src/Core/Settings.h | 1 + src/Core/SettingsChangesHistory.cpp | 3 + src/Core/SettingsEnums.cpp | 5 + src/Core/SettingsEnums.h | 10 + .../ClusterProxy/executeQuery.cpp | 1 + src/Interpreters/Context.cpp | 5 +- src/Interpreters/PreparedSets.cpp | 26 +- src/Interpreters/PreparedSets.h | 7 + src/Planner/Planner.cpp | 28 ++ src/Planner/PlannerJoinTree.cpp | 4 +- src/Processors/QueryPlan/ObjectFilterStep.cpp | 4 + src/Processors/QueryPlan/ObjectFilterStep.h | 8 + .../QueryPlan/QueryPlanStepRegistry.cpp | 1 + src/Processors/QueryPlan/ReadFromRemote.cpp | 12 +- src/Processors/QueryPlan/ReadFromRemote.h | 2 + src/QueryPipeline/RemoteQueryExecutor.cpp | 11 +- src/QueryPipeline/RemoteQueryExecutor.h | 9 + src/Storages/IStorageCluster.cpp | 276 ++++++++++++- src/Storages/IStorageCluster.h | 10 + .../StorageObjectStorageCluster.cpp | 4 + .../extractTableFunctionFromSelectQuery.cpp | 32 +- .../extractTableFunctionFromSelectQuery.h | 4 + .../integration/test_database_iceberg/test.py | 147 ++++++- tests/integration/test_s3_cluster/test.py | 362 ++++++++++++++++++ .../test_cluster_joins.py | 154 ++++++++ .../test_cluster_table_function.py | 18 + .../0_stateless/02126_dist_desc.sql.j2 | 2 +- .../03550_analyzer_remote_view_columns.sql | 2 +- ...0_analyzer_distributed_global_in.reference | 2 +- .../03620_analyzer_distributed_global_in.sql | 2 +- 31 files changed, 1149 insertions(+), 19 deletions(-) create mode 100644 tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 6cb66f160596..ccb12fbdd058 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -2010,6 +2010,22 @@ Possible values: - `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` - `allow` — Allows the use of these types of subqueries. )", IMPORTANT) \ + DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"( +Changes the behaviour of object storage cluster function or table. + +ClickHouse applies this setting when the query contains the product of object storage cluster function or table, i.e. when the query for a object storage cluster function or table contains a non-GLOBAL subquery for the object storage cluster function or table. + +Restrictions: + +- Only applied for JOIN subqueries. +- Only if the FROM section uses a object storage cluster function or table. + +Possible values: + +- `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.` +- `global` — Unsupported for now. Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` +- `allow` — Default value. Allows the use of these types of subqueries. +)", 0) \ \ DECLARE(UInt64, max_concurrent_queries_for_all_users, 0, R"( Throw exception if the value of this setting is less or equal than the current number of simultaneously processed queries. diff --git a/src/Core/Settings.h b/src/Core/Settings.h index fb54d08ef3a9..9528f70a5c5f 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -60,6 +60,7 @@ class WriteBuffer; M(CLASS_NAME, DistributedCachePoolBehaviourOnLimit) /* Cloud only */ \ M(CLASS_NAME, DistributedDDLOutputMode) \ M(CLASS_NAME, DistributedProductMode) \ + M(CLASS_NAME, ObjectStorageClusterJoinMode) \ M(CLASS_NAME, Double) \ M(CLASS_NAME, EscapingRule) \ M(CLASS_NAME, Float) \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 677519aeb09a..9db2b1370f8d 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -97,6 +97,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() addSettingsChanges(settings_changes_history, "26.3.10.20001.altinityantalya", { // {"export_merge_tree_partition_task_timeout_seconds", "3600", "86400", "Increase default value to make it more realistic"}, + addSettingsChanges(settings_changes_history, "26.3.1.20001.altinityantalya", + { + {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, }); addSettingsChanges(settings_changes_history, "26.3", { diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index 7628c344d509..cddb9a99649f 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -96,6 +96,11 @@ IMPLEMENT_SETTING_ENUM(DistributedProductMode, ErrorCodes::UNKNOWN_DISTRIBUTED_P {"global", DistributedProductMode::GLOBAL}, {"allow", DistributedProductMode::ALLOW}}) +IMPLEMENT_SETTING_ENUM(ObjectStorageClusterJoinMode, ErrorCodes::BAD_ARGUMENTS, + {{"local", ObjectStorageClusterJoinMode::LOCAL}, + {"global", ObjectStorageClusterJoinMode::GLOBAL}, + {"allow", ObjectStorageClusterJoinMode::ALLOW}}) + IMPLEMENT_SETTING_ENUM(QueryResultCacheNondeterministicFunctionHandling, ErrorCodes::BAD_ARGUMENTS, {{"throw", QueryResultCacheNondeterministicFunctionHandling::Throw}, diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index ac9e068d3d1d..9e5e671cd2d3 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -165,6 +165,16 @@ enum class DistributedProductMode : uint8_t DECLARE_SETTING_ENUM(DistributedProductMode) +/// The setting for executing object storage cluster function or table JOIN sections. +enum class ObjectStorageClusterJoinMode : uint8_t +{ + LOCAL, /// Convert to local query + GLOBAL, /// Convert to global query + ALLOW /// Enable +}; + +DECLARE_SETTING_ENUM(ObjectStorageClusterJoinMode) + /// How the query result cache handles queries with non-deterministic functions, e.g. now() enum class QueryResultCacheNondeterministicFunctionHandling : uint8_t { diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 1d7b237232ce..5439c51fb9f9 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -482,6 +482,7 @@ void executeQuery( std::move(unavailable_shard_tracker)); read_from_remote->setStepDescription("Read from remote replica"); + read_from_remote->setIsRemoteFunction(is_remote_function); plan->addStep(std::move(read_from_remote)); plan->addInterpreterContext(new_context); plans.emplace_back(std::move(plan)); diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 6c02bd513cde..b1966fd89266 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -3312,8 +3312,11 @@ void Context::setCurrentQueryId(const String & query_id) client_info.current_query_id = query_id_to_set; - if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY) + if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY + && (getApplicationType() != ApplicationType::SERVER || client_info.initial_query_id.empty())) + { client_info.initial_query_id = client_info.current_query_id; + } } void Context::killCurrentQuery() const diff --git a/src/Interpreters/PreparedSets.cpp b/src/Interpreters/PreparedSets.cpp index 823995831676..e2671ad4d3da 100644 --- a/src/Interpreters/PreparedSets.cpp +++ b/src/Interpreters/PreparedSets.cpp @@ -196,6 +196,12 @@ SetAndKeyPtr FutureSetFromSubquery::detachSetAndKey() } SetPtr FutureSetFromSubquery::get() const +{ + std::lock_guard lock(mutex); + return get_unsafe(); +} + +SetPtr FutureSetFromSubquery::get_unsafe() const { if (set_and_key->set != nullptr && set_and_key->set->isCreated()) return set_and_key->set; @@ -205,6 +211,7 @@ SetPtr FutureSetFromSubquery::get() const void FutureSetFromSubquery::setQueryPlan(std::unique_ptr source_) { + std::lock_guard lock(mutex); source = std::move(source_); set_and_key->set->setHeader(source->getCurrentHeader()->getColumnsWithTypeAndName()); } @@ -256,6 +263,8 @@ void FutureSetFromSubquery::buildExternalTableFromInplaceSet(StoragePtr external void FutureSetFromSubquery::setExternalTable(StoragePtr external_table_) { + std::lock_guard lock(mutex); + if (set_and_key->set->isCreated()) { if (!set_and_key->set->hasExplicitSetElements()) @@ -269,12 +278,19 @@ void FutureSetFromSubquery::setExternalTable(StoragePtr external_table_) DataTypes FutureSetFromSubquery::getTypes() const { + std::lock_guard lock(mutex); return set_and_key->set->getElementsTypes(); } FutureSet::Hash FutureSetFromSubquery::getHash() const { return hash; } std::unique_ptr FutureSetFromSubquery::build(const SizeLimits & network_transfer_limits, const PreparedSetsCachePtr & prepared_sets_cache) +{ + std::lock_guard lock(mutex); + return build_unsafe(network_transfer_limits, prepared_sets_cache); +} + +std::unique_ptr FutureSetFromSubquery::build_unsafe(const SizeLimits & network_transfer_limits, const PreparedSetsCachePtr & prepared_sets_cache) { if (set_and_key->set->isCreated()) return nullptr; @@ -296,6 +312,8 @@ std::unique_ptr FutureSetFromSubquery::build(const SizeLimits & netwo void FutureSetFromSubquery::buildSetInplace(const ContextPtr & context) { + std::lock_guard lock(mutex); + if (external_table_set) external_table_set->buildSetInplace(context); @@ -303,7 +321,7 @@ void FutureSetFromSubquery::buildSetInplace(const ContextPtr & context) SizeLimits network_transfer_limits(settings[Setting::max_rows_to_transfer], settings[Setting::max_bytes_to_transfer], settings[Setting::transfer_overflow_mode]); auto prepared_sets_cache = context->getPreparedSetsCache(); - auto plan = build(network_transfer_limits, prepared_sets_cache); + auto plan = build_unsafe(network_transfer_limits, prepared_sets_cache); if (!plan) return; @@ -326,7 +344,9 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context) if (!context->getSettingsRef()[Setting::use_index_for_in_with_subqueries]) return nullptr; - if (auto set = get()) + std::lock_guard lock(mutex); + + if (auto set = get_unsafe()) { if (set->hasExplicitSetElements()) return set; @@ -348,7 +368,7 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context) SizeLimits network_transfer_limits(settings[Setting::max_rows_to_transfer], settings[Setting::max_bytes_to_transfer], settings[Setting::transfer_overflow_mode]); auto prepared_sets_cache = context->getPreparedSetsCache(); - auto plan = build(network_transfer_limits, prepared_sets_cache); + auto plan = build_unsafe(network_transfer_limits, prepared_sets_cache); if (!plan) return nullptr; diff --git a/src/Interpreters/PreparedSets.h b/src/Interpreters/PreparedSets.h index 85ecc55a840f..eea499a0043f 100644 --- a/src/Interpreters/PreparedSets.h +++ b/src/Interpreters/PreparedSets.h @@ -176,6 +176,11 @@ class FutureSetFromSubquery final : public FutureSet QueryPlan * getQueryPlan() { return source.get(); } private: + SetPtr get_unsafe() const; + std::unique_ptr build_unsafe( + const SizeLimits & network_transfer_limits, + const PreparedSetsCachePtr & prepared_sets_cache); + Hash hash; ASTPtr ast; SetAndKeyPtr set_and_key; @@ -183,6 +188,8 @@ class FutureSetFromSubquery final : public FutureSet std::unique_ptr source; QueryTreeNodePtr query_tree; + + mutable std::mutex mutex; }; using FutureSetFromSubqueryPtr = std::shared_ptr; diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp index 56c179c56d3b..c33e4f9d0799 100644 --- a/src/Planner/Planner.cpp +++ b/src/Planner/Planner.cpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include @@ -53,6 +54,7 @@ #include #include #include +#include #include @@ -145,6 +147,7 @@ namespace Setting extern const SettingsBool serialize_string_in_memory_with_zero_byte; extern const SettingsString temporary_files_codec; extern const SettingsNonZeroUInt64 temporary_files_buffer_size; + extern const SettingsBool use_hive_partitioning; } namespace ServerSetting @@ -585,6 +588,21 @@ ALWAYS_INLINE void addFilterStep( query_plan.addStep(std::move(where_step)); } +template +ALWAYS_INLINE void addObjectFilterStep( + QueryPlan & query_plan, + FilterAnalysisResult & filter_analysis_result, + const char (&step_description)[size]) +{ + auto actions = std::move(filter_analysis_result.filter_actions->dag); + + auto where_step = std::make_unique(query_plan.getCurrentHeader(), + std::move(actions), + filter_analysis_result.filter_column_name); + where_step->setStepDescription(step_description); + query_plan.addStep(std::move(where_step)); +} + Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context, const AggregationAnalysisResult & aggregation_analysis_result, const QueryAnalysisResult & query_analysis_result, @@ -2191,6 +2209,16 @@ void Planner::buildPlanForQueryNode() if (query_processing_info.isSecondStage() || query_processing_info.isFromAggregationState()) { + if (settings[Setting::use_hive_partitioning] + && !query_processing_info.isFirstStage() + && expression_analysis_result.hasWhere()) + { + if (typeid_cast(query_plan.getRootNode()->step.get())) + { + addObjectFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE"); + } + } + if (query_processing_info.isFromAggregationState()) { /// Aggregation was performed on remote shards diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 4f61d321797b..f9cfec2aaa34 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -1512,7 +1512,9 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres /// Overall, IStorage::read -> FetchColumns returns normal column names (except Distributed, which is inconsistent) /// Interpreter::getQueryPlan -> FetchColumns returns identifiers (why?) and this the reason for the bug ^ in Distributed /// Hopefully there is no other case when we read from Distributed up to FetchColumns. - if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns) + if (table_node && table_node->getStorage()->isRemote()) + updated_actions_dag_outputs.push_back(output_node); + else if (table_function_node && table_function_node->getStorage()->isRemote()) updated_actions_dag_outputs.push_back(output_node); } else diff --git a/src/Processors/QueryPlan/ObjectFilterStep.cpp b/src/Processors/QueryPlan/ObjectFilterStep.cpp index e7fab549d73a..054ccdc3f40c 100644 --- a/src/Processors/QueryPlan/ObjectFilterStep.cpp +++ b/src/Processors/QueryPlan/ObjectFilterStep.cpp @@ -15,7 +15,11 @@ namespace ErrorCodes } ObjectFilterStep::ObjectFilterStep( +<<<<<<< HEAD const SharedHeader & input_header_, +======= + SharedHeader input_header_, +>>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) ActionsDAG actions_dag_, String filter_column_name_) : actions_dag(std::move(actions_dag_)) diff --git a/src/Processors/QueryPlan/ObjectFilterStep.h b/src/Processors/QueryPlan/ObjectFilterStep.h index cbfaf05a0df1..593d44d6ed57 100644 --- a/src/Processors/QueryPlan/ObjectFilterStep.h +++ b/src/Processors/QueryPlan/ObjectFilterStep.h @@ -5,17 +5,25 @@ namespace DB { +<<<<<<< HEAD /// Implements WHERE condition only to filter objects in object storage /// Difference with FilterStep is that ObjectFilterStep is added only for distributed calls /// (table functions like `s3Cluster`) and is used only to filter objects, /// not to filter data after reading, because initiator can have not this column /// In query like `SELECT count() FROM s3Cluster('cluster', ...) WHERE key=42` /// column `key` does not exist in blocks getting from cluster replicas. +======= +/// Implements WHERE operation. +>>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) class ObjectFilterStep : public IQueryPlanStep { public: ObjectFilterStep( +<<<<<<< HEAD const SharedHeader & input_header_, +======= + SharedHeader input_header_, +>>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) ActionsDAG actions_dag_, String filter_column_name_); diff --git a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp index 35ef6f94f6ff..346761149913 100644 --- a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp +++ b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp @@ -55,6 +55,7 @@ void registerFilterStep(QueryPlanStepRegistry & registry); void registerTotalsHavingStep(QueryPlanStepRegistry & registry); void registerExtremesStep(QueryPlanStepRegistry & registry); void registerJoinStep(QueryPlanStepRegistry & registry); +void registerObjectFilterStep(QueryPlanStepRegistry & registry); void registerReadFromTableStep(QueryPlanStepRegistry & registry); void registerReadFromTableFunctionStep(QueryPlanStepRegistry & registry); diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 86c89fb86821..37a03a95bed6 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -572,7 +572,8 @@ void ReadFromRemote::addLazyPipe( my_stage = stage, my_storage = storage, add_agg_info, add_totals, add_extremes, async_read, async_query_sending, query_tree = shard.query_tree, planner_context = shard.planner_context, - pushed_down_filters, parallel_marshalling_threads]() mutable + pushed_down_filters, parallel_marshalling_threads, + my_is_remote_function = is_remote_function]() mutable -> QueryPipelineBuilder { auto current_settings = my_context->getSettingsRef(); @@ -669,7 +670,12 @@ void ReadFromRemote::addLazyPipe( auto remote_query_executor = std::make_shared( std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, stage_to_use, my_shard.query_plan, /*extension=*/std::nullopt, my_shard.shard_info.pool); +<<<<<<< HEAD remote_query_executor->setDistributedFanout(my_distributed_fanout); +======= + remote_query_executor->setRemoteFunction(my_is_remote_function); + remote_query_executor->setShardCount(my_shard_count); +>>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) auto pipe = createRemoteSourcePipe( remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending, parallel_marshalling_threads); @@ -764,6 +770,8 @@ void ReadFromRemote::addPipe( remote_query_executor->setPoolMode(PoolMode::GET_ONE); remote_query_executor->setDistributedFanout(shards.size() * shard.shard_info.per_replica_pools.size()); remote_query_executor->setUnavailableShardTracker(unavailable_shard_tracker); + remote_query_executor->setRemoteFunction(is_remote_function); + remote_query_executor->setShardCount(shard_count); if (!table_func_ptr) remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table); @@ -794,6 +802,8 @@ void ReadFromRemote::addPipe( remote_query_executor->setLogger(log); remote_query_executor->setDistributedFanout(shards.size()); remote_query_executor->setUnavailableShardTracker(unavailable_shard_tracker); + remote_query_executor->setRemoteFunction(is_remote_function); + remote_query_executor->setShardCount(shard_count); if (context->canUseTaskBasedParallelReplicas() || parallel_replicas_disabled) { diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h index 7ebc0f69d072..2e739e032be3 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.h +++ b/src/Processors/QueryPlan/ReadFromRemote.h @@ -51,6 +51,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase void enableMemoryBoundMerging(); void enforceAggregationInOrder(const SortDescription & sort_description); + void setIsRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; } bool hasSerializedPlan() const; @@ -69,6 +70,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase const String cluster_name; UnavailableShardTrackerPtr unavailable_shard_tracker; std::optional priority_func_factory; + bool is_remote_function = false; Pipes addPipes(const ClusterProxy::SelectStreamFactory::Shards & used_shards, const SharedHeader & out_header); diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index e6456d7880ff..1b524da44e8f 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -443,7 +443,16 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); ClientInfo modified_client_info = context->getClientInfo(); - modified_client_info.query_kind = query_kind; + + /// Doesn't support now "remote('1.1.1.{1,2}')"" + if (is_remote_function && (shard_count == 1)) + { + modified_client_info.setInitialQuery(); + modified_client_info.client_name = "ClickHouse server"; + modified_client_info.interface = ClientInfo::Interface::TCP; + } + else + modified_client_info.query_kind = query_kind; if (extension) modified_client_info.collaborate_with_initiator = true; diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 48e91df6556b..11027a95a65e 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -218,7 +218,13 @@ class RemoteQueryExecutor void setUnavailableShardTracker(UnavailableShardTrackerPtr tracker) { unavailable_shard_tracker = std::move(tracker); } +<<<<<<< HEAD void setDistributedFanout(size_t total_connections) { distributed_fanout = total_connections; } +======= + void setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; } + + void setShardCount(UInt32 shard_count_) { shard_count = shard_count_; } +>>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) const Block & getHeader() const { return *header; } const SharedHeader & getSharedHeader() const { return header; } @@ -316,6 +322,9 @@ class RemoteQueryExecutor bool packet_in_progress = false; #endif + bool is_remote_function = false; + UInt32 shard_count = 0; + /// Parts uuids, collected from remote replicas std::vector duplicated_part_uuids; diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 0d1ac9a23c44..6084b91929f8 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -19,6 +19,15 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include @@ -36,6 +45,12 @@ namespace Setting extern const SettingsBool parallel_replicas_local_plan; extern const SettingsString cluster_for_parallel_replicas; extern const SettingsNonZeroUInt64 max_parallel_replicas; + extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode; +} + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; } namespace ErrorCodes @@ -78,6 +93,207 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) getStorageSnapshot()->metadata); } +namespace +{ + +/* +Helping class to find in query tree first node of required type +*/ +class SearcherVisitor : public InDepthQueryTreeVisitorWithContext +{ +public: + using Base = InDepthQueryTreeVisitorWithContext; + using Base::Base; + + explicit SearcherVisitor(std::unordered_set types_, ContextPtr context) : Base(context), types(types_) {} + + bool needChildVisit(QueryTreeNodePtr & /*parent*/, QueryTreeNodePtr & /*child*/) + { + return getSubqueryDepth() <= 2 && !passed_node; + } + + void enterImpl(QueryTreeNodePtr & node) + { + if (passed_node) + return; + + auto node_type = node->getNodeType(); + + if (types.contains(node_type)) + passed_node = node; + } + + QueryTreeNodePtr getNode() const { return passed_node; } + +private: + std::unordered_set types; + QueryTreeNodePtr passed_node; +}; + +/* +Helping class to find all used columns with specific source +*/ +class CollectUsedColumnsForSourceVisitor : public InDepthQueryTreeVisitorWithContext +{ +public: + using Base = InDepthQueryTreeVisitorWithContext; + using Base::Base; + + explicit CollectUsedColumnsForSourceVisitor( + QueryTreeNodePtr source_, + ContextPtr context, + bool collect_columns_from_other_sources_ = false) + : Base(context) + , source(source_) + , collect_columns_from_other_sources(collect_columns_from_other_sources_) + {} + + void enterImpl(QueryTreeNodePtr & node) + { + auto node_type = node->getNodeType(); + + if (node_type != QueryTreeNodeType::COLUMN) + return; + + auto & column_node = node->as(); + auto column_source = column_node.getColumnSourceOrNull(); + if (!column_source) + return; + + if ((column_source == source) != collect_columns_from_other_sources) + { + const auto & name = column_node.getColumnName(); + if (!names.count(name)) + { + columns.emplace_back(column_node.getColumn()); + names.insert(name); + } + } + } + + const NamesAndTypes & getColumns() const { return columns; } + +private: + std::unordered_set names; + QueryTreeNodePtr source; + NamesAndTypes columns; + bool collect_columns_from_other_sources; +}; + +}; + +/* +Try to make subquery to send on nodes +Converts + + SELECT s3.c1, s3.c2, t.c3 + FROM + s3Cluster(...) AS s3 + JOIN + localtable as t + ON s3.key == t.key + +to + + SELECT s3.c1, s3.c2, s3.key + FROM + s3Cluster(...) AS s3 +*/ +void IStorageCluster::updateQueryWithJoinToSendIfNeeded( + ASTPtr & query_to_send, + QueryTreeNodePtr query_tree, + const ContextPtr & context) +{ + auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; + switch (object_storage_cluster_join_mode) + { + case ObjectStorageClusterJoinMode::LOCAL: + { + auto info = getQueryTreeInfo(query_tree, context); + + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) + { + auto modified_query_tree = query_tree->clone(); + + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + left_table_expression_searcher.visit(modified_query_tree); + auto table_function_node = left_table_expression_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + + QueryTreeNodePtr query_tree_distributed; + + auto & query_node = modified_query_tree->as(); + + if (info.has_join) + { + auto join_node = query_node.getJoinTree(); + query_tree_distributed = join_node->as()->getLeftTableExpression()->clone(); + } + else if (info.has_cross_join) + { + SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context); + join_searcher.visit(modified_query_tree); + auto cross_join_node = join_searcher.getNode(); + if (!cross_join_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find CROSS JOIN node"); + // CrossJoinNode contains vector of nodes. 0 is left expression, always exists. + query_tree_distributed = cross_join_node->as()->getTableExpressions()[0]->clone(); + } + + // Find add used columns from table function to make proper projection list + // Need to do before changing WHERE condition + CollectUsedColumnsForSourceVisitor collector(table_function_node, context); + collector.visit(modified_query_tree); + const auto & columns = collector.getColumns(); + + if (columns.empty()) + { + auto column_nodes_to_select = std::make_shared(); + column_nodes_to_select->getNodes().reserve(1); + column_nodes_to_select->getNodes().emplace_back(std::make_shared(1)); + query_node.getProjectionNode() = column_nodes_to_select; + } + else + { + query_node.resolveProjectionColumns(columns); + auto column_nodes_to_select = std::make_shared(); + column_nodes_to_select->getNodes().reserve(columns.size()); + for (auto & column : columns) + column_nodes_to_select->getNodes().emplace_back(std::make_shared(column, table_function_node)); + query_node.getProjectionNode() = column_nodes_to_select; + } + + if (info.has_local_columns_in_where) + { + if (query_node.getPrewhere()) + removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getPrewhere(), table_function_node, context); + if (query_node.getWhere()) + removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getWhere(), table_function_node, context); + } + + query_node.getOrderByNode() = std::make_shared(); + query_node.getGroupByNode() = std::make_shared(); + + if (query_tree_distributed) + { + // Left only table function to send on cluster nodes + modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + } + + query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); + } + + return; + } + case ObjectStorageClusterJoinMode::GLOBAL: + // TODO + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now"); + case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special + return; + } +} + /// The code executes on initiator void IStorageCluster::read( QueryPlan & query_plan, @@ -101,13 +317,15 @@ void IStorageCluster::read( SharedHeader sample_block; ASTPtr query_to_send = query_info.query; + updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context); + if (context->getSettingsRef()[Setting::allow_experimental_analyzer]) { - sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage)); + sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_to_send, context, SelectQueryOptions(processed_stage)); } else { - auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()); + auto interpreter = InterpreterSelectQuery(query_to_send, context, SelectQueryOptions(processed_stage).analyze()); sample_block = interpreter.getSampleBlock(); query_to_send = interpreter.getQueryInfo().query->clone(); } @@ -115,7 +333,7 @@ void IStorageCluster::read( updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context); RestoreQualifiedNamesVisitor::Data data; - data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as(), 0)); + data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as(), 0)); data.remote_table.database = context->getCurrentDatabase(); data.remote_table.table = getName(); RestoreQualifiedNamesVisitor(data).visit(query_to_send); @@ -210,9 +428,59 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const pipeline.init(std::move(pipe)); } +IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePtr query_tree, ContextPtr context) +{ + QueryTreeInfo info; + + auto & query_node = query_tree->as(); + if (auto join_node = query_node.getJoinTree()) + { + if (join_node->getNodeType() == QueryTreeNodeType::JOIN) + info.has_join = true; + else if (join_node->getNodeType() == QueryTreeNodeType::CROSS_JOIN) + info.has_cross_join = true; + } + + SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); + left_table_expression_searcher.visit(query_tree); + auto table_function_node = left_table_expression_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node"); + + if (query_node.hasWhere() || query_node.hasPrewhere()) + { + CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); + if (query_node.hasPrewhere()) + collector_where.visit(query_node.getPrewhere()); + if (query_node.hasWhere()) + collector_where.visit(query_node.getWhere()); + + // SELECT x FROM datalake.table WHERE x IN local.table. + // Need to modify 'WHERE' on remote node if it contains columns from other sources + // because remote node might not have those sources. + if (!collector_where.getColumns().empty()) + info.has_local_columns_in_where = true; + } + + return info; +} + QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( - ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const + ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const { + auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; + + if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW) + { + if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); + + auto info = getQueryTreeInfo(query_info.query_tree, context); + if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) + return QueryProcessingStage::Enum::FetchColumns; + } + /// Initiator executes query on remote node. if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY) if (to_stage >= QueryProcessingStage::Enum::WithMergeableState) diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 2d95a3d53863..96964a02dd35 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -55,12 +55,22 @@ class IStorageCluster : public IStorage protected: virtual void updateBeforeRead(const ContextPtr &) {} virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} + void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context); virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {} private: LoggerPtr log; String cluster_name; + + struct QueryTreeInfo + { + bool has_join = false; + bool has_cross_join = false; + bool has_local_columns_in_where = false; + }; + + static QueryTreeInfo getQueryTreeInfo(QueryTreeNodePtr query_tree, ContextPtr context); }; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 926f48928cec..5560d003a237 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -251,7 +251,11 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten local_context, predicate, filter, +<<<<<<< HEAD storage_metadata_snapshot->virtuals.getSampleBlock(VirtualsKind::All, VirtualsMaterializationPlace::Reader).getNamesAndTypesList(), +======= + getVirtualsList(), +>>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) hive_partition_columns_to_read_from_file_path, nullptr, local_context->getFileProgressCallback(), diff --git a/src/Storages/extractTableFunctionFromSelectQuery.cpp b/src/Storages/extractTableFunctionFromSelectQuery.cpp index 57302036c889..064f538eeae7 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionFromSelectQuery.cpp @@ -9,7 +9,7 @@ namespace DB { -ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query) { auto * select_query = query->as(); if (!select_query || !select_query->tables()) @@ -17,10 +17,36 @@ ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) auto * tables = select_query->tables()->as(); auto * table_expression = tables->children[0]->as()->table_expression->as(); - if (!table_expression->table_function) + return table_expression; +} + +ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query) +{ + auto table_expression = extractTableExpressionASTPtrFromSelectQuery(query); + return table_expression ? table_expression->table_function : nullptr; +} + +ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query) +{ + auto table_expression = extractTableExpressionASTPtrFromSelectQuery(query); + return table_expression ? table_expression->database_and_table_name : nullptr; +} + +ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +{ + auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query); + if (!table_function_ast) return nullptr; - return table_expression->table_function->as(); + return table_function_ast->as(); +} + +ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query) +{ + auto * table_function = extractTableFunctionFromSelectQuery(query); + if (!table_function) + return nullptr; + return table_function->arguments->as(); } } diff --git a/src/Storages/extractTableFunctionFromSelectQuery.h b/src/Storages/extractTableFunctionFromSelectQuery.h index c69cc7ce6c52..20cc1ae93896 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.h +++ b/src/Storages/extractTableFunctionFromSelectQuery.h @@ -6,7 +6,11 @@ namespace DB { +struct ASTTableExpression; +ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query); +ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query); +ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query); ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); } diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 5a08265699ef..4179e6bd06cf 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -14,12 +14,13 @@ import pytz from minio import Minio from pyiceberg.catalog import load_catalog -from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.partitioning import PartitionField, PartitionSpec, UNPARTITIONED_PARTITION_SPEC from pyiceberg.schema import Schema from pyiceberg.table.sorting import SortField, SortOrder from pyiceberg.transforms import DayTransform, IdentityTransform from pyiceberg.types import ( DoubleType, + LongType, FloatType, NestedField, StringType, @@ -27,6 +28,7 @@ TimestampType, TimestamptzType ) +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm from helpers.config_cluster import minio_secret_key, minio_access_key @@ -206,6 +208,7 @@ def started_cluster(): user_configs=[], stay_alive=True, with_iceberg_catalog=True, + with_zookeeper=True, ) logging.info("Starting cluster...") @@ -961,6 +964,7 @@ def test_gcs(started_cluster): assert "Google cloud storage converts to S3" in str(err.value) +<<<<<<< HEAD def test_invalid_auth_header_format(started_cluster): node = started_cluster.instances["node1"] @@ -978,3 +982,144 @@ def test_invalid_auth_header_format(started_cluster): """ ) assert "Invalid auth header format" in str(err.value) +======= +# TODO - turn on after merge alternative syntax +def _test_cluster_joins(started_cluster): + node = started_cluster.instances["node1"] + + test_ref = f"test_join_tables_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + table_name_2 = f"{test_ref}_table_2" + table_name_local = f"{test_ref}_table_local" + + root_namespace = f"{test_ref}_namespace" + + catalog = load_catalog_impl(started_cluster) + catalog.create_namespace(root_namespace) + + schema = Schema( + NestedField( + field_id=1, + name="tag", + field_type=LongType(), + required=False + ), + NestedField( + field_id=2, + name="name", + field_type=StringType(), + required=False, + ), + ) + table = create_table(catalog, root_namespace, table_name, schema, + partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER) + data = [{"tag": 1, "name": "John"}, {"tag": 2, "name": "Jack"}] + df = pa.Table.from_pylist(data) + table.append(df) + + schema2 = Schema( + NestedField( + field_id=1, + name="id", + field_type=LongType(), + required=False + ), + NestedField( + field_id=2, + name="second_name", + field_type=StringType(), + required=False, + ), + ) + table2 = create_table(catalog, root_namespace, table_name_2, schema2, + partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER) + data = [{"id": 1, "second_name": "Dow"}, {"id": 2, "second_name": "Sparrow"}] + df = pa.Table.from_pylist(data) + table2.append(df) + + node.query(f"CREATE TABLE `{table_name_local}` (id Int64, second_name String) ENGINE = Memory()") + node.query(f"INSERT INTO `{table_name_local}` VALUES (1, 'Silver'), (2, 'Black')") + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + res = node.query( + f""" + SELECT t1.name,t2.second_name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 + JOIN {CATALOG_NAME}.`{root_namespace}.{table_name_2}` AS t2 + ON t1.tag=t2.id + WHERE t1.tag < 10 AND t2.id < 20 + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\tSparrow\nJohn\tDow\n" + + res = node.query( + f""" + SELECT name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + WHERE tag in ( + SELECT id + FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\nJohn\n" + + res = node.query( + f""" + SELECT t1.name,t2.second_name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 + JOIN `{table_name_local}` AS t2 + ON t1.tag=t2.id + WHERE t1.tag < 10 AND t2.id < 20 + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\tBlack\nJohn\tSilver\n" + + res = node.query( + f""" + SELECT name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + WHERE tag in ( + SELECT id + FROM `{table_name_local}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\nJohn\n" + + res = node.query( + f""" + SELECT t1.name,t2.second_name + FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1 + CROSS JOIN `{table_name_local}` AS t2 + WHERE t1.tag < 10 AND t2.id < 20 + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "Jack\tBlack\nJack\tSilver\nJohn\tBlack\nJohn\tSilver\n" +>>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 079e0834de29..418698fbade0 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -3,6 +3,11 @@ import os import shutil import uuid +<<<<<<< HEAD +======= +import threading +import time +>>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) from email.errors import HeaderParseError import pytest @@ -512,9 +517,366 @@ def test_cluster_default_expression(started_cluster): assert result == expected_result +<<<<<<< HEAD @pytest.mark.parametrize("allow_experimental_analyzer", [0, 1]) @pytest.mark.parametrize("use_partition_strategy", [False, True]) def test_hive_partitioning(started_cluster, allow_experimental_analyzer, use_partition_strategy): +======= +def test_remote_hedged(started_cluster): + node = started_cluster.instances["s0_0_0"] + pure_s3 = node.query( + f""" + SELECT * from s3( + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', + 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + ORDER BY (name, value, polygon) + LIMIT 1 + """ + ) + s3_distributed = node.query( + f""" + SELECT * from remote('s0_0_1', s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')) + ORDER BY (name, value, polygon) + LIMIT 1 + SETTINGS use_hedged_requests=True + """ + ) + + assert TSV(pure_s3) == TSV(s3_distributed) + + +def test_remote_no_hedged(started_cluster): + node = started_cluster.instances["s0_0_0"] + pure_s3 = node.query( + f""" + SELECT * from s3( + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', + 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + ORDER BY (name, value, polygon) + LIMIT 1 + """ + ) + s3_distributed = node.query( + f""" + SELECT * from remote('s0_0_1', s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')) + ORDER BY (name, value, polygon) + LIMIT 1 + SETTINGS use_hedged_requests=False + """ + ) + + assert TSV(pure_s3) == TSV(s3_distributed) + + +@pytest.mark.parametrize("allow_experimental_analyzer", [0, 1]) +def test_hive_partitioning(started_cluster, allow_experimental_analyzer): + node = started_cluster.instances["s0_0_0"] + + node.query(f"SET allow_experimental_analyzer = {allow_experimental_analyzer}") + + for i in range(1, 5): + exists = node.query( + f""" + SELECT + count() + FROM s3('http://minio1:9001/root/data/hive/key={i}/*', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + GROUP BY ALL + FORMAT TSV + """ + ) + if int(exists) == 0: + node.query( + f""" + INSERT + INTO FUNCTION s3('http://minio1:9001/root/data/hive/key={i}/data.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + SELECT {i}, {i} + SETTINGS use_hive_partitioning = 0 + """ + ) + + settings = "enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_iceberg_metadata_files_cache = 0, use_parquet_metadata_cache = 0, use_page_cache_for_object_storage = 0" + + query_id_full = str(uuid.uuid4()) + result = node.query( + f""" + SELECT count() + FROM s3('http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + WHERE key <= 2 + FORMAT TSV + SETTINGS {settings}, use_hive_partitioning = 0 + """, + query_id=query_id_full, + ) + result = int(result) + assert result == 2 + + query_id_optimized = str(uuid.uuid4()) + result = node.query( + f""" + SELECT count() + FROM s3('http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + WHERE key <= 2 + FORMAT TSV + SETTINGS {settings}, use_hive_partitioning = 1 + """, + query_id=query_id_optimized, + ) + result = int(result) + assert result == 2 + + query_id_cluster_full = str(uuid.uuid4()) + result = node.query( + f""" + SELECT count() + FROM s3Cluster(cluster_simple, 'http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + WHERE key <= 2 + FORMAT TSV + SETTINGS {settings}, use_hive_partitioning = 0 + """, + query_id=query_id_cluster_full, + ) + result = int(result) + assert result == 2 + + query_id_cluster_optimized = str(uuid.uuid4()) + result = node.query( + f""" + SELECT count() + FROM s3Cluster(cluster_simple, 'http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + WHERE key <= 2 + FORMAT TSV + SETTINGS {settings}, use_hive_partitioning = 1 + """, + query_id=query_id_cluster_optimized, + ) + result = int(result) + assert result == 2 + + node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + + full_traffic = node.query( + f""" + SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) + FROM clusterAllReplicas(cluster_simple, system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_full}' + FORMAT TSV + """ + ) + full_traffic = int(full_traffic) + assert full_traffic > 0 # 612*4 + + optimized_traffic = node.query( + f""" + SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) + FROM clusterAllReplicas(cluster_simple, system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_optimized}' + FORMAT TSV + """ + ) + optimized_traffic = int(optimized_traffic) + assert optimized_traffic > 0 # 612*2 + assert full_traffic > optimized_traffic + + cluster_full_traffic = node.query( + f""" + SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) + FROM clusterAllReplicas(cluster_simple, system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_cluster_full}' + FORMAT TSV + """ + ) + cluster_full_traffic = int(cluster_full_traffic) + assert cluster_full_traffic == full_traffic + + cluster_optimized_traffic = node.query( + f""" + SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) + FROM clusterAllReplicas(cluster_simple, system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_cluster_optimized}' + FORMAT TSV + """ + ) + cluster_optimized_traffic = int(cluster_optimized_traffic) + assert cluster_optimized_traffic == optimized_traffic + + node.query("SET allow_experimental_analyzer = DEFAULT") + + +def test_joins(started_cluster): + node = started_cluster.instances["s0_0_0"] + + # Table join_table only exists on the node 's0_0_0'. + node.query("DROP TABLE IF EXISTS join_table SYNC") + node.query( + """ + CREATE TABLE IF NOT EXISTS join_table ( + id UInt32, + name String + ) ENGINE=MergeTree() + ORDER BY id; + """ + ) + + node.query( + f""" + INSERT INTO join_table + SELECT value, concat(name, '_jt') FROM s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))'); + """ + ) + + result1 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + res = list(map(str.split, result1.splitlines())) + assert len(res) == 25 + + for line in res: + if len(line) == 2: + assert line[1] == f"{line[0]}_jt" + else: + assert line == ["_jt"] # for empty name + + result2 = node.query( + f""" + SELECT t1.name, t2.name FROM + join_table AS t2 + JOIN + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + ON t1.value = t2.id + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + assert result1 == result2 + + # With WHERE clause with remote column only + result3 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t1.value % 2) + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + res = list(map(str.split, result3.splitlines())) + assert len(res) == 8 + + # With WHERE clause with local column only + result4 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t2.id % 2) + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + assert result3 == result4 + + # With WHERE clause with local and remote columns + result5 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t1.value % 2) AND ((t2.id % 3) == 2) + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + res = list(map(str.split, result5.splitlines())) + assert len(res) == 6 + + result6 = node.query( + f""" + SELECT name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + WHERE value IN (SELECT id FROM join_table) + ORDER BY name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + res = list(map(str.split, result6.splitlines())) + assert len(res) == 25 + + result7 = node.query( + f""" + SELECT count() FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON 1 + GROUP BY ALL + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + assert result7.strip() == "625" + + result8 = node.query( + f""" + SELECT count(), t2.id FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON 1 + GROUP BY ALL + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + res = list(map(str.split, result8.splitlines())) + assert len(res) == 25 + + +def test_graceful_shutdown(started_cluster): +>>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) node = started_cluster.instances["s0_0_0"] data_path = f"root/data/hive_{allow_experimental_analyzer}/{random_string(6)}" diff --git a/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py b/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py new file mode 100644 index 000000000000..bb637f8e8cc2 --- /dev/null +++ b/tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py @@ -0,0 +1,154 @@ +import pytest + +from helpers.iceberg_utils import ( + get_uuid_str, + get_creation_expression, + execute_spark_query_general, +) + +# TODO - turn on after merge alternative syntax +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) +def _test_cluster_joins(started_cluster_iceberg_with_spark, storage_type): + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + TABLE_NAME = "test_cluster_joins_" + storage_type + "_" + get_uuid_str() + TABLE_NAME_2 = "test_cluster_joins_2_" + storage_type + "_" + get_uuid_str() + TABLE_NAME_LOCAL = "test_cluster_joins_local_" + storage_type + "_" + get_uuid_str() + + def execute_spark_query(query: str, table_name): + return execute_spark_query_general( + spark, + started_cluster_iceberg_with_spark, + storage_type, + table_name, + query, + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME} ( + tag INT, + name VARCHAR(50) + ) + USING iceberg + OPTIONS('format-version'='2') + """, TABLE_NAME + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (1, 'john'), + (2, 'jack') + """, TABLE_NAME + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME_2} ( + id INT, + second_name VARCHAR(50) + ) + USING iceberg + OPTIONS('format-version'='2') + """, TABLE_NAME_2 + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME_2} VALUES + (1, 'dow'), + (2, 'sparrow') + """, TABLE_NAME_2 + ) + + creation_expression = get_creation_expression( + storage_type, TABLE_NAME, started_cluster_iceberg_with_spark, table_function=True, run_on_cluster=True + ) + + creation_expression_2 = get_creation_expression( + storage_type, TABLE_NAME_2, started_cluster_iceberg_with_spark, table_function=True, run_on_cluster=True + ) + + instance.query(f"CREATE TABLE `{TABLE_NAME_LOCAL}` (id Int64, second_name String) ENGINE = Memory()") + instance.query(f"INSERT INTO `{TABLE_NAME_LOCAL}` VALUES (1, 'silver'), (2, 'black')") + + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + JOIN {creation_expression_2} AS t2 + ON t1.tag=t2.id + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\tsparrow\njohn\tdow\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag in ( + SELECT id + FROM {creation_expression_2} + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\njohn\n" + + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + JOIN `{TABLE_NAME_LOCAL}` AS t2 + ON t1.tag=t2.id + WHERE t1.tag < 10 AND t2.id < 20 + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\tblack\njohn\tsilver\n" + + res = instance.query( + f""" + SELECT name + FROM {creation_expression} + WHERE tag in ( + SELECT id + FROM `{TABLE_NAME_LOCAL}` + ) + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\njohn\n" + + res = instance.query( + f""" + SELECT t1.name,t2.second_name + FROM {creation_expression} AS t1 + CROSS JOIN `{TABLE_NAME_LOCAL}` AS t2 + WHERE t1.tag < 10 AND t2.id < 20 + ORDER BY ALL + SETTINGS + object_storage_cluster='cluster_simple', + object_storage_cluster_join_mode='local' + """ + ) + + assert res == "jack\tblack\njack\tsilver\njohn\tblack\njohn\tsilver\n" diff --git a/tests/integration/test_storage_iceberg_with_spark/test_cluster_table_function.py b/tests/integration/test_storage_iceberg_with_spark/test_cluster_table_function.py index 39dae6e5fbd9..628a2149eb74 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_cluster_table_function.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_cluster_table_function.py @@ -132,6 +132,24 @@ def add_df(mode): # write 3 times assert int(instance.query(f"SELECT count() FROM {table_function_expr_cluster}")) == 100 * 3 + + # Cluster Query with node1 as coordinator + table_function_expr_cluster = get_creation_expression( + storage_type, + TABLE_NAME, + started_cluster_iceberg_with_spark, + table_function=True, + run_on_cluster=True, + ) + select_remote_cluster = ( + instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})") + .strip() + .split() + ) + assert len(select_remote_cluster) == 600 + assert select_remote_cluster == select_regular + + @pytest.mark.parametrize("format_version", ["1", "2"]) @pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_writes_cluster_table_function(started_cluster_iceberg_with_spark, format_version, storage_type): diff --git a/tests/queries/0_stateless/02126_dist_desc.sql.j2 b/tests/queries/0_stateless/02126_dist_desc.sql.j2 index c0e1b5f8abd2..93f23dc9eb14 100644 --- a/tests/queries/0_stateless/02126_dist_desc.sql.j2 +++ b/tests/queries/0_stateless/02126_dist_desc.sql.j2 @@ -10,7 +10,7 @@ select * from remote('{{host}}', {{args}}) format Null; {% endfor -%} system flush logs query_log; -select anyIf(query, is_initial_query), groupArrayIf(query, query_kind = 'Describe' and not is_initial_query) from system.query_log +select anyIf(query, initial_query_id == query_id), groupArrayIf(query, query_kind = 'Describe' and initial_query_id != query_id) from system.query_log where event_date >= yesterday() AND event_time >= now() - 600 AND type = 'QueryFinish' and diff --git a/tests/queries/0_stateless/03550_analyzer_remote_view_columns.sql b/tests/queries/0_stateless/03550_analyzer_remote_view_columns.sql index 833106925266..820a7cd95f2b 100644 --- a/tests/queries/0_stateless/03550_analyzer_remote_view_columns.sql +++ b/tests/queries/0_stateless/03550_analyzer_remote_view_columns.sql @@ -39,4 +39,4 @@ WHERE event_date >= yesterday() AND event_time >= now() - 600 AND AND log_comment = 'THIS IS A COMMENT TO MARK THE INITIAL QUERY' LIMIT 1) AND type = 'QueryFinish' - AND NOT is_initial_query; + AND query_id != initial_query_id; diff --git a/tests/queries/0_stateless/03620_analyzer_distributed_global_in.reference b/tests/queries/0_stateless/03620_analyzer_distributed_global_in.reference index 070fc644d1c5..673ead4122c6 100644 --- a/tests/queries/0_stateless/03620_analyzer_distributed_global_in.reference +++ b/tests/queries/0_stateless/03620_analyzer_distributed_global_in.reference @@ -60,7 +60,7 @@ CreatingSets (Create sets before main query execution) ReadFromSystemNumbers system flush logs query_log; -- SKIP: current_database = currentDatabase() -select normalizeQuery(replace(query, currentDatabase(), 'default')) from system.query_log where event_date >= yesterday() AND event_time >= now() - 600 and log_comment like '%' || currentDatabase() || '%' and not is_initial_query and type != 'QueryStart' and query_kind = 'Select' order by event_time_microseconds; +select normalizeQuery(replace(query, currentDatabase(), 'default')) from system.query_log where event_date >= yesterday() AND event_time >= now() - 600 and log_comment like '%' || currentDatabase() || '%' and initial_query_id != query_id and type != 'QueryStart' and query_kind = 'Select' order by event_time_microseconds; SELECT `__table1`.`x` AS `x`, `__table1`.`y` AS `y` FROM `default`.`tab0` AS `__table1` HAVING in(`x`, (SELECT `__table1`.`number` + ? AS `?` FROM numbers(?) AS `__table1`)) SELECT `__table1`.`x` AS `x`, `__table1`.`y` AS `y` FROM `default`.`tab0` AS `__table1` HAVING globalIn(`x`, `?`) SELECT `__table1`.`x` AS `x`, `__table1`.`y` AS `y` FROM `default`.`tab0` AS `__table1` HAVING in(`x`, (SELECT `__table1`.`number` + ? AS `?` FROM numbers(?) AS `__table1`)) diff --git a/tests/queries/0_stateless/03620_analyzer_distributed_global_in.sql b/tests/queries/0_stateless/03620_analyzer_distributed_global_in.sql index 031dccb75f89..87193c7e06a8 100644 --- a/tests/queries/0_stateless/03620_analyzer_distributed_global_in.sql +++ b/tests/queries/0_stateless/03620_analyzer_distributed_global_in.sql @@ -30,4 +30,4 @@ select * from (explain indexes=1, distributed=1 ); system flush logs query_log; -- SKIP: current_database = currentDatabase() -select normalizeQuery(replace(query, currentDatabase(), 'default')) from system.query_log where event_date >= yesterday() AND event_time >= now() - 600 and log_comment like '%' || currentDatabase() || '%' and not is_initial_query and type != 'QueryStart' and query_kind = 'Select' order by event_time_microseconds; +select normalizeQuery(replace(query, currentDatabase(), 'default')) from system.query_log where event_date >= yesterday() AND event_time >= now() - 600 and log_comment like '%' || currentDatabase() || '%' and initial_query_id != query_id and type != 'QueryStart' and query_kind = 'Select' order by event_time_microseconds; From 03c03aaeb8277cb9e4f348afde135d198a107fbd Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Sun, 7 Jun 2026 19:28:17 +0200 Subject: [PATCH 2/2] Resolve conflicts in cherry-pick of #1646 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Kept 26.4's const-ref signature for ObjectFilterStep constructor (bucket 2: 26.4 improved the parameter from value to const-ref). Kept 26.4's detailed comment in ObjectFilterStep.h (bucket 2: 26.4 added more descriptive documentation). Kept both setDistributedFanout (26.4 addition) and the source PR's setRemoteFunction/setShardCount in RemoteQueryExecutor.h and ReadFromRemote.cpp — they serve different purposes. Kept 26.4's inline virtuals expression in StorageObjectStorageCluster.cpp instead of getVirtualsList() (bucket 2: getVirtualsList does not exist on antalya-26.4; the verbose expression is the equivalent). In test_s3_cluster: kept new tests test_remote_hedged, test_remote_no_hedged, test_joins from source PR; kept 26.4's two-parameter test_hive_partitioning (with use_partition_strategy) instead of source PR's one-parameter version; dropped test_graceful_shutdown (pre-existing in 26.3, not added by PR #1646). In test_database_iceberg: kept both test_invalid_auth_header_format (26.4) and _test_cluster_joins (source PR). --- src/Processors/QueryPlan/ObjectFilterStep.cpp | 4 - src/Processors/QueryPlan/ObjectFilterStep.h | 8 - src/Processors/QueryPlan/ReadFromRemote.cpp | 3 - src/QueryPipeline/RemoteQueryExecutor.h | 4 +- .../StorageObjectStorageCluster.cpp | 4 - .../integration/test_database_iceberg/test.py | 5 +- tests/integration/test_s3_cluster/test.py | 146 +----------------- 7 files changed, 6 insertions(+), 168 deletions(-) diff --git a/src/Processors/QueryPlan/ObjectFilterStep.cpp b/src/Processors/QueryPlan/ObjectFilterStep.cpp index 054ccdc3f40c..e7fab549d73a 100644 --- a/src/Processors/QueryPlan/ObjectFilterStep.cpp +++ b/src/Processors/QueryPlan/ObjectFilterStep.cpp @@ -15,11 +15,7 @@ namespace ErrorCodes } ObjectFilterStep::ObjectFilterStep( -<<<<<<< HEAD const SharedHeader & input_header_, -======= - SharedHeader input_header_, ->>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) ActionsDAG actions_dag_, String filter_column_name_) : actions_dag(std::move(actions_dag_)) diff --git a/src/Processors/QueryPlan/ObjectFilterStep.h b/src/Processors/QueryPlan/ObjectFilterStep.h index 593d44d6ed57..cbfaf05a0df1 100644 --- a/src/Processors/QueryPlan/ObjectFilterStep.h +++ b/src/Processors/QueryPlan/ObjectFilterStep.h @@ -5,25 +5,17 @@ namespace DB { -<<<<<<< HEAD /// Implements WHERE condition only to filter objects in object storage /// Difference with FilterStep is that ObjectFilterStep is added only for distributed calls /// (table functions like `s3Cluster`) and is used only to filter objects, /// not to filter data after reading, because initiator can have not this column /// In query like `SELECT count() FROM s3Cluster('cluster', ...) WHERE key=42` /// column `key` does not exist in blocks getting from cluster replicas. -======= -/// Implements WHERE operation. ->>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) class ObjectFilterStep : public IQueryPlanStep { public: ObjectFilterStep( -<<<<<<< HEAD const SharedHeader & input_header_, -======= - SharedHeader input_header_, ->>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) ActionsDAG actions_dag_, String filter_column_name_); diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 37a03a95bed6..d870370ed08c 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -670,12 +670,9 @@ void ReadFromRemote::addLazyPipe( auto remote_query_executor = std::make_shared( std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, stage_to_use, my_shard.query_plan, /*extension=*/std::nullopt, my_shard.shard_info.pool); -<<<<<<< HEAD remote_query_executor->setDistributedFanout(my_distributed_fanout); -======= remote_query_executor->setRemoteFunction(my_is_remote_function); remote_query_executor->setShardCount(my_shard_count); ->>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) auto pipe = createRemoteSourcePipe( remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending, parallel_marshalling_threads); diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 11027a95a65e..07a090fe83c0 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -218,13 +218,11 @@ class RemoteQueryExecutor void setUnavailableShardTracker(UnavailableShardTrackerPtr tracker) { unavailable_shard_tracker = std::move(tracker); } -<<<<<<< HEAD void setDistributedFanout(size_t total_connections) { distributed_fanout = total_connections; } -======= + void setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; } void setShardCount(UInt32 shard_count_) { shard_count = shard_count_; } ->>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) const Block & getHeader() const { return *header; } const SharedHeader & getSharedHeader() const { return header; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 5560d003a237..926f48928cec 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -251,11 +251,7 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten local_context, predicate, filter, -<<<<<<< HEAD storage_metadata_snapshot->virtuals.getSampleBlock(VirtualsKind::All, VirtualsMaterializationPlace::Reader).getNamesAndTypesList(), -======= - getVirtualsList(), ->>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) hive_partition_columns_to_read_from_file_path, nullptr, local_context->getFileProgressCallback(), diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 4179e6bd06cf..8c5311bc2fb8 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -964,7 +964,6 @@ def test_gcs(started_cluster): assert "Google cloud storage converts to S3" in str(err.value) -<<<<<<< HEAD def test_invalid_auth_header_format(started_cluster): node = started_cluster.instances["node1"] @@ -982,7 +981,8 @@ def test_invalid_auth_header_format(started_cluster): """ ) assert "Invalid auth header format" in str(err.value) -======= + + # TODO - turn on after merge alternative syntax def _test_cluster_joins(started_cluster): node = started_cluster.instances["node1"] @@ -1122,4 +1122,3 @@ def _test_cluster_joins(started_cluster): ) assert res == "Jack\tBlack\nJack\tSilver\nJohn\tBlack\nJohn\tSilver\n" ->>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 418698fbade0..96e882d63014 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -3,11 +3,8 @@ import os import shutil import uuid -<<<<<<< HEAD -======= import threading import time ->>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) from email.errors import HeaderParseError import pytest @@ -517,11 +514,6 @@ def test_cluster_default_expression(started_cluster): assert result == expected_result -<<<<<<< HEAD -@pytest.mark.parametrize("allow_experimental_analyzer", [0, 1]) -@pytest.mark.parametrize("use_partition_strategy", [False, True]) -def test_hive_partitioning(started_cluster, allow_experimental_analyzer, use_partition_strategy): -======= def test_remote_hedged(started_cluster): node = started_cluster.instances["s0_0_0"] pure_s3 = node.query( @@ -576,139 +568,6 @@ def test_remote_no_hedged(started_cluster): assert TSV(pure_s3) == TSV(s3_distributed) -@pytest.mark.parametrize("allow_experimental_analyzer", [0, 1]) -def test_hive_partitioning(started_cluster, allow_experimental_analyzer): - node = started_cluster.instances["s0_0_0"] - - node.query(f"SET allow_experimental_analyzer = {allow_experimental_analyzer}") - - for i in range(1, 5): - exists = node.query( - f""" - SELECT - count() - FROM s3('http://minio1:9001/root/data/hive/key={i}/*', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') - GROUP BY ALL - FORMAT TSV - """ - ) - if int(exists) == 0: - node.query( - f""" - INSERT - INTO FUNCTION s3('http://minio1:9001/root/data/hive/key={i}/data.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') - SELECT {i}, {i} - SETTINGS use_hive_partitioning = 0 - """ - ) - - settings = "enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_iceberg_metadata_files_cache = 0, use_parquet_metadata_cache = 0, use_page_cache_for_object_storage = 0" - - query_id_full = str(uuid.uuid4()) - result = node.query( - f""" - SELECT count() - FROM s3('http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') - WHERE key <= 2 - FORMAT TSV - SETTINGS {settings}, use_hive_partitioning = 0 - """, - query_id=query_id_full, - ) - result = int(result) - assert result == 2 - - query_id_optimized = str(uuid.uuid4()) - result = node.query( - f""" - SELECT count() - FROM s3('http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') - WHERE key <= 2 - FORMAT TSV - SETTINGS {settings}, use_hive_partitioning = 1 - """, - query_id=query_id_optimized, - ) - result = int(result) - assert result == 2 - - query_id_cluster_full = str(uuid.uuid4()) - result = node.query( - f""" - SELECT count() - FROM s3Cluster(cluster_simple, 'http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') - WHERE key <= 2 - FORMAT TSV - SETTINGS {settings}, use_hive_partitioning = 0 - """, - query_id=query_id_cluster_full, - ) - result = int(result) - assert result == 2 - - query_id_cluster_optimized = str(uuid.uuid4()) - result = node.query( - f""" - SELECT count() - FROM s3Cluster(cluster_simple, 'http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') - WHERE key <= 2 - FORMAT TSV - SETTINGS {settings}, use_hive_partitioning = 1 - """, - query_id=query_id_cluster_optimized, - ) - result = int(result) - assert result == 2 - - node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") - - full_traffic = node.query( - f""" - SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) - FROM clusterAllReplicas(cluster_simple, system.query_log) - WHERE type='QueryFinish' AND initial_query_id='{query_id_full}' - FORMAT TSV - """ - ) - full_traffic = int(full_traffic) - assert full_traffic > 0 # 612*4 - - optimized_traffic = node.query( - f""" - SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) - FROM clusterAllReplicas(cluster_simple, system.query_log) - WHERE type='QueryFinish' AND initial_query_id='{query_id_optimized}' - FORMAT TSV - """ - ) - optimized_traffic = int(optimized_traffic) - assert optimized_traffic > 0 # 612*2 - assert full_traffic > optimized_traffic - - cluster_full_traffic = node.query( - f""" - SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) - FROM clusterAllReplicas(cluster_simple, system.query_log) - WHERE type='QueryFinish' AND initial_query_id='{query_id_cluster_full}' - FORMAT TSV - """ - ) - cluster_full_traffic = int(cluster_full_traffic) - assert cluster_full_traffic == full_traffic - - cluster_optimized_traffic = node.query( - f""" - SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) - FROM clusterAllReplicas(cluster_simple, system.query_log) - WHERE type='QueryFinish' AND initial_query_id='{query_id_cluster_optimized}' - FORMAT TSV - """ - ) - cluster_optimized_traffic = int(cluster_optimized_traffic) - assert cluster_optimized_traffic == optimized_traffic - - node.query("SET allow_experimental_analyzer = DEFAULT") - def test_joins(started_cluster): node = started_cluster.instances["s0_0_0"] @@ -875,8 +734,9 @@ def test_joins(started_cluster): assert len(res) == 25 -def test_graceful_shutdown(started_cluster): ->>>>>>> d9d3710bd9b (Merge pull request #1646 from Altinity/frontport/antalya-26.3/fix_remote_calls) +@pytest.mark.parametrize("allow_experimental_analyzer", [0, 1]) +@pytest.mark.parametrize("use_partition_strategy", [False, True]) +def test_hive_partitioning(started_cluster, allow_experimental_analyzer, use_partition_strategy): node = started_cluster.instances["s0_0_0"] data_path = f"root/data/hive_{allow_experimental_analyzer}/{random_string(6)}"