Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ void IStorageCluster::read(
{
auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value;
if (remote_initiator_cluster_name.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting 'object_storage_remote_initiator' can be used only with 'object_storage_remote_initiator_cluster' or 'object_storage_cluster'");
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Setting 'object_storage_remote_initiator' can be used only with 'object_storage_remote_initiator_cluster', 'object_storage_cluster', or cluster name in arguments");

/// rewrite query to execute `remote('remote_host', s3(...))`
/// remote_host can execute query itself or make on-cluster query depends on own `object_storage_cluster` setting
Expand Down
15 changes: 14 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ namespace Setting
extern const SettingsObjectStorageGranularityLevel cluster_table_function_split_granularity;
extern const SettingsBool parallel_replicas_for_cluster_engines;
extern const SettingsString object_storage_cluster;
extern const SettingsBool object_storage_remote_initiator;
extern const SettingsString object_storage_remote_initiator_cluster;
extern const SettingsInt64 delta_lake_snapshot_start_version;
extern const SettingsInt64 delta_lake_snapshot_end_version;
extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
Expand All @@ -46,6 +48,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_SETTING_VALUE;
extern const int BAD_ARGUMENTS;
}

String StorageObjectStorageCluster::getPathSample(ContextPtr context)
Expand Down Expand Up @@ -719,9 +722,19 @@ String StorageObjectStorageCluster::getClusterName(ContextPtr context) const
QueryProcessingStage::Enum StorageObjectStorageCluster::getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info) const
{
if (!isClusterSupported())
return QueryProcessingStage::Enum::FetchColumns;

/// Full query if fall back to pure storage.
if (getClusterName(context).empty())
if (getClusterName(context).empty() // Not cluster request
&& context->getSettingsRef()[Setting::object_storage_remote_initiator_cluster].value.empty()) // Not request with remote initiator
{
if (context->getSettingsRef()[Setting::object_storage_remote_initiator])
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Setting 'object_storage_remote_initiator' can be used only with 'object_storage_remote_initiator_cluster', 'object_storage_cluster', or cluster name in arguments");

return QueryProcessingStage::Enum::FetchColumns;
}

/// Distributed storage.
return IStorageCluster::getQueryProcessingStage(context, to_stage, storage_snapshot, query_info);
Expand Down
13 changes: 12 additions & 1 deletion src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,18 @@ void TableFunctionObjectStorageClusterFallback<Definition, Base>::parseArguments
const auto & settings = context->getSettingsRef();

is_cluster_function = !settings[Setting::object_storage_cluster].value.empty() && typename Base::Configuration().isClusterSupported();
is_remote = settings[Setting::object_storage_remote_initiator];
// Remote initiator requires 'object_storage_cluster' or 'object_storage_remote_initiator_cluster'
if (settings[Setting::object_storage_remote_initiator])
{
if (settings[Setting::object_storage_cluster].value.empty()
&& settings[Setting::object_storage_remote_initiator_cluster].value.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Setting 'object_storage_remote_initiator' can be used only with 'object_storage_remote_initiator_cluster', 'object_storage_cluster', or cluster name in arguments");
}

is_remote = true;
}

if (is_cluster_function)
{
Expand Down
75 changes: 75 additions & 0 deletions tests/integration/test_s3_cluster/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,81 @@ def test_object_storage_remote_initiator_without_cluster_function(started_cluste
"s0_1_0\tfoo"]


def test_object_storage_remote_initiator_aggregation(started_cluster):
node = started_cluster.instances["s0_0_0"]

# Remove initiator without cluster request
# Check that aggregation works on nodes
query_id = uuid.uuid4().hex

result = node.query(
f"""
SELECT sum(value) from s3(
'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
SETTINGS
object_storage_remote_initiator=1,
object_storage_remote_initiator_cluster='cluster_with_dots_and_user'
""",
query_id = query_id,
)

assert result == "67802152770\n"

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I'm not really fond of these magic constants as it's difficult to see for myself if that's actually the value we should be expecting, but I trust that you've verified it to be actually valid

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Me too, but I reused existed data files, https://github.com/Altinity/ClickHouse/blob/antalya-26.3/tests/integration/test_s3_cluster/data/clickhouse/part1.csv and others. Number means nothing, just summa of values in all files.


node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_all'")
result_rows = node.query(
f"""
SELECT sum(result_rows)
FROM clusterAllReplicas('cluster_all', system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id}'
AND is_initial_query = 0
ORDER BY ALL
FORMAT TSV
"""
).splitlines()

# Data processed on cluster 'hidden_cluster_with_username_and_password'.
# Cluster contains two nodes, each returns one row.
assert result_rows == ["2"]

# Remove initiator without cluster request
# Check that aggregation works on nodes
query_id = uuid.uuid4().hex

result = node.query(
f"""
SELECT value % 2 as bit, sum(value) from s3(
'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
GROUP BY bit
ORDER BY bit
SETTINGS
object_storage_remote_initiator=1,
object_storage_remote_initiator_cluster='cluster_with_dots_and_user'
""",
query_id = query_id,
)

assert result == "0\t41117771522\n1\t26684381248\n"

node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_all'")
result_rows = node.query(
f"""
SELECT sum(result_rows)
FROM clusterAllReplicas('cluster_all', system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id}'
AND is_initial_query = 0
ORDER BY ALL
FORMAT TSV
"""
).splitlines()

# Data processed on cluster 'hidden_cluster_with_username_and_password'.
# Cluster contains two nodes, each returns up to two rows, at least two rows totaly.
result_rows = int(result_rows[0])
assert result_rows >= 2 and result_rows <= 4


def test_hive_partitioning_with_where_condition(started_cluster):
node = started_cluster.instances["s0_0_0"]
test_id = uuid.uuid4().hex[:8]
Expand Down
Loading