Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/Processors/Formats/Impl/Parquet/Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ void Reader::prefilterAndInitRowGroups(const std::optional<std::unordered_set<UI
auto add_prewhere_outputs = [&](const ActionsDAG & actions)
{
for (const auto * node : actions.getOutputs())
if (sample_block->has(node->result_name))
if (node->type != ActionsDAG::ActionType::INPUT && sample_block->has(node->result_name))
schemer.external_columns.push_back(node->result_name);
};
if (row_level_filter)
Expand Down Expand Up @@ -688,7 +688,8 @@ void Reader::preparePrewhere()
else
{
if (!prewhere_output_column_idxs.contains(idx_in_output_block))
throw Exception(ErrorCodes::LOGICAL_ERROR, "PREWHERE appears to use its own output as input");
throw Exception(ErrorCodes::LOGICAL_ERROR, "PREWHERE appears to use its own output as input: column '{}' (idx {})",
col.name, idx_in_output_block);
}
step.input_idxs.push_back(idx_in_output_block);
}
Expand Down Expand Up @@ -2063,7 +2064,7 @@ void Reader::applyPrewhere(RowSubgroup & row_subgroup, const RowGroup & row_grou
const ColumnWithTypeAndName & col = extended_sample_block.getByPosition(idx_in_output_block);
block.insert({getOrFormOutputColumn(row_subgroup, idx_in_output_block), col.type, col.name});
}
addDummyColumnWithRowCount(block, row_subgroup.filter.rows_total);
addDummyColumnWithRowCount(block, row_subgroup.filter.rows_pass);

step.actions.execute(block);

Expand Down
10 changes: 10 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,15 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
return res;
}

bool supportsPrewhere() const override
{
#if USE_AVRO
    /// PREWHERE push-down is enabled only for the Iceberg metadata flavor of
    /// data lake; every other metadata implementation declines it.
    constexpr bool metadata_is_iceberg = std::is_same_v<DataLakeMetadata, IcebergMetadata>;
    return metadata_is_iceberg;
#else
    /// IcebergMetadata is unavailable without Avro support, so no PREWHERE.
    return false;
#endif
}

private:
const DataLakeStorageSettingsPtr settings;
ObjectStoragePtr ready_object_storage;
Expand Down Expand Up @@ -599,6 +608,7 @@ class StorageIcebergConfiguration : public StorageObjectStorageConfiguration, pu
bool supportsFileIterator() const override { return getImpl().supportsFileIterator(); }
bool supportsParallelInsert() const override { return getImpl().supportsParallelInsert(); }
bool supportsWrites() const override { return getImpl().supportsWrites(); }
bool supportsPrewhere() const override { return getImpl().supportsPrewhere(); }

bool supportsPartialPathPrefix() const override { return getImpl().supportsPartialPathPrefix(); }

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ StorageObjectStorage::StorageObjectStorage(
/// There's probably no reason for this, and it should just copy those fields like the others.
/// * If the table contains files in different formats, with only some of them supporting
/// prewhere, things break.
supports_prewhere = !configuration->isDataLakeConfiguration() && format_supports_prewhere;
supports_prewhere = configuration->supportsPrewhere() && format_supports_prewhere;
supports_tuple_elements = format_supports_prewhere;

StorageInMemoryMetadata metadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ class StorageObjectStorageConfiguration
return false;
}

/// Whether this storage configuration can accept PREWHERE push-down.
/// Supported by default; specific configurations may override to opt out.
virtual bool supportsPrewhere() const { return true; }

virtual void drop(ContextPtr) {}

virtual bool isClusterSupported() const { return true; }
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
auto mapper = configuration->getColumnMapperForObject(object_info);
if (!mapper)
return format_filter_info;
return std::make_shared<FormatFilterInfo>(format_filter_info->filter_actions_dag, format_filter_info->context.lock(), mapper, nullptr, nullptr);
return std::make_shared<FormatFilterInfo>(format_filter_info->filter_actions_dag, format_filter_info->context.lock(), mapper, format_filter_info->row_level_filter, format_filter_info->prewhere_info);
}();

LOG_DEBUG(
Expand Down
10 changes: 7 additions & 3 deletions src/Storages/prepareReadingFromFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,19 +258,23 @@ Names filterTupleColumnsToRead(NamesAndTypesList & requested_columns)

ReadFromFormatInfo updateFormatPrewhereInfo(const ReadFromFormatInfo & info, const FilterDAGInfoPtr & row_level_filter, const PrewhereInfoPtr & prewhere_info)
{
chassert(prewhere_info);
chassert(prewhere_info || row_level_filter);

if (info.prewhere_info || info.row_level_filter)
if (info.prewhere_info)
throw Exception(ErrorCodes::LOGICAL_ERROR, "updateFormatPrewhereInfo called more than once");

ReadFromFormatInfo new_info;
new_info.prewhere_info = prewhere_info;
new_info.row_level_filter = row_level_filter;

/// Removes columns that are only used as prewhere input.
/// Adds prewhere outputs (the actual prewhere filter column is only added if
/// !remove_prewhere_column; but there may also be subexpressions computed by prewhere
/// expression and preserved for use further down the query pipeline).
new_info.format_header = SourceStepWithFilter::applyPrewhereActions(info.format_header, row_level_filter, prewhere_info);
/// If row_level_filter was already applied in a previous call, don't re-apply it;
/// only apply the new prewhere_info on top.
new_info.format_header = SourceStepWithFilter::applyPrewhereActions(
info.format_header, info.row_level_filter ? nullptr : row_level_filter, prewhere_info);

/// We assume that any format that supports prewhere also supports subset of subcolumns, so we
/// don't need to replace subcolumns with their nested columns etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
@pytest.mark.parametrize("is_table_function", [False, True])
def test_minmax_pruning(started_cluster_iceberg_with_spark, storage_type, is_table_function):
@pytest.mark.parametrize("use_prewhere", [False, True])
def test_minmax_pruning(started_cluster_iceberg_with_spark, storage_type, is_table_function, use_prewhere):
instance = started_cluster_iceberg_with_spark.instances["node1"]
spark = started_cluster_iceberg_with_spark.spark_session
TABLE_NAME = "test_minmax_pruning_" + storage_type + "_" + get_uuid_str()
Expand Down Expand Up @@ -95,6 +96,11 @@ def check_validity_and_get_prunned_files(select_expression):
instance, TABLE_NAME, settings1, settings2, 'IcebergMinMaxIndexPrunedFiles', select_expression
)


where_condition = "WHERE"
if use_prewhere and not is_table_function:
where_condition = "PREWHERE"

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} ORDER BY ALL"
Expand All @@ -103,55 +109,55 @@ def check_validity_and_get_prunned_files(select_expression):
)
assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE date <= '2024-01-25' ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} date <= '2024-01-25' ORDER BY ALL"
)
== 3
)
assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE ts <= timestamp('2024-03-20 14:00:00.000000') ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} ts <= timestamp('2024-03-20 14:00:00.000000') ORDER BY ALL"
)
== 3
)

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE tag == 1 ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} tag == 1 ORDER BY ALL"
)
== 3
)

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE tag <= 1 ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} tag <= 1 ORDER BY ALL"
)
== 3
)

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE name == 'vasilisa' ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} name == 'vasilisa' ORDER BY ALL"
)
== 3
)

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE name < 'kek' ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} name < 'kek' ORDER BY ALL"
)
== 2
)

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE number == 8 ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} number == 8 ORDER BY ALL"
)
== 3
)

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE number <= 5 ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} number <= 5 ORDER BY ALL"
)
== 3
)
Expand All @@ -163,7 +169,7 @@ def check_validity_and_get_prunned_files(select_expression):

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE date3 <= '2024-01-25' ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} date3 <= '2024-01-25' ORDER BY ALL"
)
== 3
)
Expand All @@ -172,14 +178,14 @@ def check_validity_and_get_prunned_files(select_expression):

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE tag <= 1 ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} tag <= 1 ORDER BY ALL"
)
== 3
)

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE time_struct.a <= '2024-02-01' ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} time_struct.a <= '2024-02-01' ORDER BY ALL"
)
== 3
)
Expand All @@ -190,7 +196,7 @@ def check_validity_and_get_prunned_files(select_expression):

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE time_struct.a <= '2024-02-01' ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} time_struct.a <= '2024-02-01' ORDER BY ALL"
)
== 4
)
Expand All @@ -211,15 +217,15 @@ def check_validity_and_get_prunned_files(select_expression):

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE ddd >= 100 ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} ddd >= 100 ORDER BY ALL"
)
== 2
)
# Spark stores rounded values of decimals; this query checks that we work around it.
# See the code where we parse the lower and upper bounds.
assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE ddd >= toDecimal64('17.21', 3) ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} ddd >= toDecimal64('17.21', 3) ORDER BY ALL"
)
== 1
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
0 0
1 0
2 0
3 0
4 0
5 0
6 0
7 0
8 0
9 0
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- Tags: no-fasttest

-- Regression test: PREWHERE on a Parquet file with a column declared in the schema
-- but not present in the actual file. When the missing column appears both in the
-- PREWHERE condition and in the SELECT list, it stays in the format header as a
-- pass-through INPUT node in the prewhere DAG. The `add_prewhere_outputs` lambda
-- incorrectly added such INPUT nodes to `external_columns`, which prevented the
-- SchemaConverter from creating a missing-column entry. This caused "KeyCondition
-- uses PREWHERE output" or "PREWHERE appears to use its own output as input"
-- exceptions (LOGICAL_ERROR).

-- Use the native Parquet reader v3 (the code path under test), tolerate schema
-- columns absent from the file, and allow re-running the INSERT into the same file.
SET input_format_parquet_use_native_reader_v3 = 1;
SET input_format_parquet_allow_missing_columns = 1;
SET engine_file_truncate_on_insert = 1;

-- Write a Parquet file that contains only the `number` column.
INSERT INTO FUNCTION file(currentDatabase() || '_04001.parquet')
SELECT number FROM numbers(10);

-- `extra` is not in the Parquet file; it defaults to 0 with allow_missing_columns.
-- The SELECT uses both `number` and `extra`, so `extra` remains in format_header as
-- a pass-through. Without the fix, `add_prewhere_outputs` incorrectly added `extra`
-- to `external_columns`, preventing SchemaConverter from creating a missing column.
SELECT number, extra FROM file(currentDatabase() || '_04001.parquet', Parquet, 'number UInt64, extra UInt64')
PREWHERE extra = 0
ORDER BY number;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
--- Row policy filters URL Parquet table ---
1 a
2 b
--- Row policy with WHERE on URL Parquet table ---
1 a
--- Row policy count on URL Parquet table ---
2
46 changes: 46 additions & 0 deletions tests/queries/0_stateless/04053_row_policy_object_storage.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/usr/bin/env bash
# Tags: no-replicated-database, no-fasttest

# Regression test: row policy on a URL table with Parquet format caused
# "Logical error: 'prewhere_info'" because updateFormatPrewhereInfo asserted
# prewhere_info was non-null, but only row_level_filter was set.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Unique per-run user name so concurrent runs of this test don't collide.
user="user04053_${CLICKHOUSE_DATABASE}_$RANDOM"
db=${CLICKHOUSE_DATABASE}

# Setup: a backing MergeTree table, a URL table that serves its contents as
# Parquet over the local HTTP interface, and a restricted user whose row
# policy (id <= 2) on the URL table is applied as a row-level filter.
${CLICKHOUSE_CLIENT} <<EOF
DROP TABLE IF EXISTS ${db}.source_data;
CREATE TABLE ${db}.source_data (id UInt32, value String) ENGINE = MergeTree ORDER BY id;
INSERT INTO ${db}.source_data VALUES (1, 'a'), (2, 'b'), (3, 'c');

DROP TABLE IF EXISTS ${db}.url_parquet;
CREATE TABLE ${db}.url_parquet (id UInt32, value String)
ENGINE = URL('http://127.0.0.1:${CLICKHOUSE_PORT_HTTP}/?query=SELECT+*+FROM+${db}.source_data+FORMAT+Parquet', 'Parquet');

DROP USER IF EXISTS ${user};
CREATE USER ${user} IDENTIFIED WITH no_password;
GRANT SELECT ON ${db}.url_parquet TO ${user};

DROP ROW POLICY IF EXISTS rp_04053 ON ${db}.url_parquet;
CREATE ROW POLICY rp_04053 ON ${db}.url_parquet FOR SELECT USING id <= 2 TO ${user};
EOF

# Plain SELECT: row policy alone (row_level_filter without prewhere_info).
echo "--- Row policy filters URL Parquet table ---"
${CLICKHOUSE_CLIENT} --user ${user} --query "SELECT * FROM ${db}.url_parquet ORDER BY id"

# Row policy combined with a WHERE condition.
echo "--- Row policy with WHERE on URL Parquet table ---"
${CLICKHOUSE_CLIENT} --user ${user} --query "SELECT * FROM ${db}.url_parquet WHERE value = 'a' ORDER BY id"

# Aggregation must also see only the policy-filtered rows.
echo "--- Row policy count on URL Parquet table ---"
${CLICKHOUSE_CLIENT} --user ${user} --query "SELECT count() FROM ${db}.url_parquet"

# Cleanup.
${CLICKHOUSE_CLIENT} <<EOF
DROP ROW POLICY IF EXISTS rp_04053 ON ${db}.url_parquet;
DROP USER IF EXISTS ${user};
DROP TABLE IF EXISTS ${db}.url_parquet;
DROP TABLE IF EXISTS ${db}.source_data;
EOF
Loading