Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion src/Analyzer/FunctionNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <DataTypes/DataTypeSet.h>

#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSetQuery.h>

#include <Functions/IFunction.h>

Expand Down Expand Up @@ -164,14 +165,21 @@ void FunctionNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state
buffer << '\n' << std::string(indent + 2, ' ') << "WINDOW\n";
getWindowNode()->dumpTreeImpl(buffer, format_state, indent + 4);
}

if (!settings_changes.empty())
{
buffer << '\n' << std::string(indent + 2, ' ') << "SETTINGS";
for (const auto & change : settings_changes)
buffer << fmt::format(" {}={}", change.name, toString(change.value));
}
}

bool FunctionNode::isEqualImpl(const IQueryTreeNode & rhs, CompareOptions compare_options) const
{
const auto & rhs_typed = assert_cast<const FunctionNode &>(rhs);
if (function_name != rhs_typed.function_name || isAggregateFunction() != rhs_typed.isAggregateFunction()
|| isOrdinaryFunction() != rhs_typed.isOrdinaryFunction() || isWindowFunction() != rhs_typed.isWindowFunction()
|| nulls_action != rhs_typed.nulls_action)
|| nulls_action != rhs_typed.nulls_action || settings_changes != rhs_typed.settings_changes)
return false;

if (!compare_options.compare_types)
Expand Down Expand Up @@ -204,6 +212,17 @@ void FunctionNode::updateTreeHashImpl(HashState & hash_state, CompareOptions com
hash_state.update(isWindowFunction());
hash_state.update(nulls_action);

hash_state.update(settings_changes.size());
for (const auto & change : settings_changes)
{
hash_state.update(change.name.size());
hash_state.update(change.name);

const auto & value_dump = change.value.dump();
hash_state.update(value_dump.size());
hash_state.update(value_dump);
}

if (!compare_options.compare_types)
return;

Expand All @@ -225,6 +244,7 @@ QueryTreeNodePtr FunctionNode::cloneImpl() const
result_function->kind = kind;
result_function->nulls_action = nulls_action;
result_function->wrap_with_nullable = wrap_with_nullable;
result_function->settings_changes = settings_changes;

return result_function;
}
Expand Down Expand Up @@ -275,6 +295,14 @@ ASTPtr FunctionNode::toASTImpl(const ConvertToASTOptions & options) const
function_ast->window_definition = window_node->toAST(new_options);
}

if (!settings_changes.empty())
{
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->changes = settings_changes;
settings_ast->is_standalone = false;
function_ast->arguments->children.push_back(settings_ast);
}

return function_ast;
}

Expand Down
15 changes: 15 additions & 0 deletions src/Analyzer/FunctionNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <Functions/IFunction.h>
#include <Parsers/NullsAction.h>
#include <Common/typeid_cast.h>
#include <Common/SettingsChanges.h>

namespace DB
{
Expand Down Expand Up @@ -202,6 +203,18 @@ class FunctionNode final : public IQueryTreeNode
wrap_with_nullable = true;
}

/// Get settings changes passed to table function
const SettingsChanges & getSettingsChanges() const
{
return settings_changes;
}

/// Set settings changes passed as last argument to table function
void setSettingsChanges(SettingsChanges settings_changes_)
{
settings_changes = std::move(settings_changes_);
}

void dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, size_t indent) const override;

protected:
Expand All @@ -224,6 +237,8 @@ class FunctionNode final : public IQueryTreeNode
static constexpr size_t arguments_child_index = 1;
static constexpr size_t window_child_index = 2;
static constexpr size_t children_size = window_child_index + 1;

SettingsChanges settings_changes;
};

}
7 changes: 6 additions & 1 deletion src/Analyzer/QueryTreeBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,12 @@ QueryTreeNodePtr QueryTreeBuilder::buildExpression(const ASTPtr & expression, co
{
const auto & function_arguments_list = function->arguments->as<ASTExpressionList>()->children;
for (const auto & argument : function_arguments_list)
function_node->getArguments().getNodes().push_back(buildExpression(argument, context));
{
if (const auto * ast_set = argument->as<ASTSetQuery>())
function_node->setSettingsChanges(ast_set->changes);
else
function_node->getArguments().getNodes().push_back(buildExpression(argument, context));
}
}

if (function->is_window_function)
Expand Down
1 change: 1 addition & 0 deletions src/Analyzer/Resolve/QueryAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5013,6 +5013,7 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
{
auto table_function_node_to_resolve_typed = std::make_shared<TableFunctionNode>(table_function_argument_function_name);
table_function_node_to_resolve_typed->getArgumentsNode() = table_function_argument_function->getArgumentsNode();
table_function_node_to_resolve_typed->setSettingsChanges(table_function_argument_function->getSettingsChanges());

QueryTreeNodePtr table_function_node_to_resolve = std::move(table_function_node_to_resolve_typed);
if (table_function_argument_function_name == "view")
Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7156,6 +7156,9 @@ Allow Iceberg read optimization based on Iceberg metadata.
)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
)", EXPERIMENTAL) \
DECLARE(String, object_storage_remote_initiator_cluster, "", R"(
Cluster to choose remote initiator, when `object_storage_remote_initiator` is true. When empty, `object_storage_cluster` is used.
)", EXPERIMENTAL) \
\
/** Experimental timeSeries* aggregate functions. */ \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
addSettingsChanges(settings_changes_history, "25.8.16.20002",
{
{"allow_local_data_lakes", false, false, "New setting."},
{"object_storage_remote_initiator_cluster", "", "", "New setting."},
});
addSettingsChanges(settings_changes_history, "25.8.9.2000",
{
Expand Down
41 changes: 35 additions & 6 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,11 @@ namespace Setting
extern const SettingsBool async_query_sending_for_remote;
extern const SettingsBool async_socket_for_remote;
extern const SettingsBool skip_unavailable_shards;
extern const SettingsBool parallel_replicas_local_plan;
extern const SettingsString cluster_for_parallel_replicas;
extern const SettingsNonZeroUInt64 max_parallel_replicas;
extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode;
extern const SettingsUInt64 object_storage_max_nodes;
extern const SettingsBool object_storage_remote_initiator;
extern const SettingsString object_storage_remote_initiator_cluster;
}

namespace ErrorCodes
Expand Down Expand Up @@ -324,8 +323,6 @@ void IStorageCluster::read(

const auto & settings = context->getSettingsRef();

auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]);

/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)

SharedHeader sample_block;
Expand All @@ -346,9 +343,21 @@ void IStorageCluster::read(

updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);

/// In case the current node is not supposed to initiate the clustered query
/// Sends this query to a remote initiator using the `remote` table function
if (settings[Setting::object_storage_remote_initiator])
{
auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send);
/// Re-writes queries in the form of:
/// Input: SELECT * FROM iceberg(...) SETTINGS object_storage_cluster='swarm', object_storage_remote_initiator=1
/// Output: SELECT * FROM remote('remote_host', icebergCluster('swarm', ...)
/// Where `remote_host` is a random host from the cluster which will execute the query
/// This means the initiator node belongs to the same cluster that will execute the query
/// In case remote_initiator_cluster_name is set, the initiator might be set to a different cluster
auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value;
if (remote_initiator_cluster_name.empty())
remote_initiator_cluster_name = cluster_name_from_settings;
auto remote_initiator_cluster = getClusterImpl(context, remote_initiator_cluster_name);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Apply object_storage_max_nodes to initiator cluster selection

IStorageCluster::read now builds remote_initiator_cluster without the object_storage_max_nodes cap, so object_storage_remote_initiator=1 can pick a remote initiator from hosts that the query intentionally excluded via object_storage_max_nodes. In environments that use this cap to avoid unhealthy/unconfigured nodes, this introduces nondeterministic routing failures (auth/connectivity/cluster visibility) even though the setting is configured. The initiator cluster lookup should use the same max-host limit as the normal cluster path.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

@ianton-ru ianton-ru Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's ok, with object_storage_max_nodes getCluster chooses this number of random cluster nodes, and for remote initiator also random node is selected, Select random node from random nodes sublist gives the same result as just select random node from full list.

auto storage_and_context = convertToRemote(remote_initiator_cluster, context, remote_initiator_cluster_name, query_to_send);
auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(storage_and_context.storage);
auto modified_query_info = query_info;
modified_query_info.cluster = src_distributed->getCluster();
Expand All @@ -357,6 +366,8 @@ void IStorageCluster::read(
return;
}

auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]);

RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as<ASTSelectQuery &>(), 0));
data.remote_table.database = context->getCurrentDatabase();
Expand Down Expand Up @@ -390,6 +401,10 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
const std::string & cluster_name_from_settings,
ASTPtr query_to_send)
{
/// TODO: Allow to use secret for remote queries
if (!cluster->getSecret().empty())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Can't convert query to remote when cluster uses secret");

auto host_addresses = cluster->getShardsAddresses();
if (host_addresses.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings);
Expand All @@ -411,6 +426,7 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
/// Clean object_storage_remote_initiator setting to avoid infinite remote call
auto new_context = Context::createCopy(context);
new_context->setSetting("object_storage_remote_initiator", false);
new_context->setSetting("object_storage_remote_initiator_cluster", String(""));

auto * select_query = query_to_send->as<ASTSelectQuery>();
if (!select_query)
Expand All @@ -430,7 +446,20 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
if (!table_expression)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression");

auto remote_query = makeASTFunction(remote_function_name, std::make_shared<ASTLiteral>(host_name), table_expression->table_function);
std::shared_ptr<ASTFunction> remote_query;

if (shard_addresses[0].user_specified)
{ // with user/password for clsuter access remote query is executed from this user, add it in query parameters
remote_query = makeASTFunction(remote_function_name,
std::make_shared<ASTLiteral>(host_name),
table_expression->table_function,
std::make_shared<ASTLiteral>(shard_addresses[0].user),
std::make_shared<ASTLiteral>(shard_addresses[0].password));
}
else
{ // without specified user/password remote query is executed from default user
remote_query = makeASTFunction(remote_function_name, std::make_shared<ASTLiteral>(host_name), table_expression->table_function);
}

table_expression->table_function = remote_query;

Expand Down
1 change: 1 addition & 0 deletions src/Storages/StorageDistributed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,7 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,

auto table_function_node = std::make_shared<TableFunctionNode>(remote_table_function_node.getFunctionName());
table_function_node->getArgumentsNode() = remote_table_function_node.getArgumentsNode();
table_function_node->setSettingsChanges(remote_table_function_node.getSettingsChanges());

if (table_expression_modifiers)
table_function_node->setTableExpressionModifiers(*table_expression_modifiers);
Expand Down
8 changes: 5 additions & 3 deletions src/TableFunctions/ITableFunctionCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ class ITableFunctionCluster : public Base

/// Cluster name is always the first
cluster_name = checkAndGetLiteralArgument<String>(args[0], "cluster_name");

if (!context->tryGetCluster(cluster_name))
throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "Requested cluster '{}' not found", cluster_name);
/// Remove check cluster existing here
/// In query like
/// remote('remote_host', xxxCluster('remote_cluster', ...))
/// 'remote_cluster' can be defined only on 'remote_host'
/// If cluster not exists, query falls later

/// Just cut the first arg (cluster_name) and try to parse other table function arguments as is
args.erase(args.begin());
Expand Down
33 changes: 33 additions & 0 deletions tests/integration/test_s3_cluster/configs/cluster.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,39 @@
</shard>
</cluster_with_dots>

<cluster_with_username_and_password>
<shard>
<replica>
<host>s0_0_1</host>
<port>9000</port>
<user>foo</user>
<password>bar</password>
</replica>
<replica>
<host>s0_1_0</host>
<port>9000</port>
<user>foo</user>
<password>bar</password>
</replica>
</shard>
</cluster_with_username_and_password>

<cluster_with_secret>
<secret>baz</secret>
<shard>
<replica>
<host>s0_0_1</host>
<port>9000</port>
<user>foo</user>
</replica>
<replica>
<host>s0_1_0</host>
<port>9000</port>
<user>foo</user>
</replica>
</shard>
</cluster_with_secret>

<cluster_all>
<shard>
<replica>
Expand Down
20 changes: 20 additions & 0 deletions tests/integration/test_s3_cluster/configs/hidden_clusters.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<clickhouse>
<remote_servers>
<hidden_cluster_with_username_and_password>
<shard>
<replica>
<host>s0_0_1</host>
<port>9000</port>
<user>foo</user>
<password>bar</password>
</replica>
<replica>
<host>s0_1_0</host>
<port>9000</port>
<user>foo</user>
<password>bar</password>
</replica>
</shard>
</hidden_cluster_with_username_and_password>
</remote_servers>
</clickhouse>
4 changes: 4 additions & 0 deletions tests/integration/test_s3_cluster/configs/users.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,9 @@
<profile>default</profile>
<named_collection_control>1</named_collection_control>
</default>
<foo>
<password>bar</password>
<profile>default</profile>
</foo>
</users>
</clickhouse>
Loading
Loading