From b4caaae0c218917bb2b0bc4d5c97b9fc2dc2a140 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Mon, 24 Nov 2025 18:06:41 +0800 Subject: [PATCH] feat: add expression evaluators 1 add ManifestEvaluator 2 add ResidualEvaluator 3 add InclusiveMetricsEvaluator 4 add StrictMetricsEvaluator --- src/iceberg/CMakeLists.txt | 4 + src/iceberg/expression/expression_visitor.h | 2 +- .../expression/inclusive_metrics_evaluator.cc | 543 ++++++++++++++++++ .../expression/inclusive_metrics_evaluator.h | 74 +++ src/iceberg/expression/manifest_evaluator.cc | 190 ++++++ src/iceberg/expression/manifest_evaluator.h | 84 +++ src/iceberg/expression/residual_evaluator.cc | 245 ++++++++ src/iceberg/expression/residual_evaluator.h | 92 +++ .../expression/strict_metrics_evaluator.cc | 466 +++++++++++++++ .../expression/strict_metrics_evaluator.h | 79 +++ src/iceberg/result.h | 2 + 11 files changed, 1780 insertions(+), 1 deletion(-) create mode 100644 src/iceberg/expression/inclusive_metrics_evaluator.cc create mode 100644 src/iceberg/expression/inclusive_metrics_evaluator.h create mode 100644 src/iceberg/expression/manifest_evaluator.cc create mode 100644 src/iceberg/expression/manifest_evaluator.h create mode 100644 src/iceberg/expression/residual_evaluator.cc create mode 100644 src/iceberg/expression/residual_evaluator.h create mode 100644 src/iceberg/expression/strict_metrics_evaluator.cc create mode 100644 src/iceberg/expression/strict_metrics_evaluator.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index dd78dc6bc..713527c7b 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -25,8 +25,12 @@ set(ICEBERG_SOURCES expression/expression.cc expression/expressions.cc expression/literal.cc + expression/manifest_evaluator.cc expression/predicate.cc expression/rewrite_not.cc + expression/residual_evaluator.cc + expression/inclusive_metrics_evaluator.cc + expression/strict_metrics_evaluator.cc expression/term.cc file_reader.cc file_writer.cc diff --git a/src/iceberg/expression/expression_visitor.h b/src/iceberg/expression/expression_visitor.h index aeafa9298..4bf403106 100644 --- a/src/iceberg/expression/expression_visitor.h +++ b/src/iceberg/expression/expression_visitor.h @@ -245,7 +245,7 @@ class ICEBERG_EXPORT BoundVisitor : public ExpressionVisitor { /// Bound visitors do not support unbound predicates. /// /// \param pred The unbound predicate - Result Predicate(const std::shared_ptr& pred) final { + Result Predicate(const std::shared_ptr& pred) override { ICEBERG_DCHECK(pred != nullptr, "UnboundPredicate cannot be null"); return NotSupported("Not a bound predicate: {}", pred->ToString()); } diff --git a/src/iceberg/expression/inclusive_metrics_evaluator.cc b/src/iceberg/expression/inclusive_metrics_evaluator.cc new file mode 100644 index 000000000..1015f677b --- /dev/null +++ b/src/iceberg/expression/inclusive_metrics_evaluator.cc @@ -0,0 +1,543 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/inclusive_metrics_evaluator.h" + +#include "iceberg/expression/binder.h" +#include "iceberg/expression/expression_visitor.h" +#include "iceberg/expression/rewrite_not.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/schema.h" +#include "iceberg/transform.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { +static const bool ROWS_MIGHT_MATCH = true; +static const bool ROWS_CANNOT_MATCH = false; +static const int32_t IN_PREDICATE_LIMIT = 200; +} // namespace + +class InclusiveMetricsVisitor : public BoundVisitor { + public: + explicit InclusiveMetricsVisitor(const DataFile& data_file) : data_file_(data_file) {} + + Result AlwaysTrue() override { return ROWS_MIGHT_MATCH; } + + Result AlwaysFalse() override { return ROWS_CANNOT_MATCH; } + + Result Not(bool child_result) override { return !child_result; } + + Result And(bool left_result, bool right_result) override { + return left_result && right_result; + } + + Result Or(bool left_result, bool right_result) override { + return left_result || right_result; + } + + Result IsNull(const std::shared_ptr& term) override { + if (IsNonNullPreserving(term)) { + // number of non-nulls is the same as for the ref + int id = term->reference()->field().field_id(); + if (!MayContainNull(id)) { + return ROWS_CANNOT_MATCH; + } + } + return ROWS_MIGHT_MATCH; + } + + Result NotNull(const std::shared_ptr& term) override { + // no need to check whether the field is required because binding evaluates that case + // if the column has no non-null values, the expression cannot match + + // all terms are null preserving. see #isNullPreserving(Bound) + int id = term->reference()->field().field_id(); + if (ContainsNullsOnly(id)) { + return ROWS_CANNOT_MATCH; + } + + return ROWS_MIGHT_MATCH; + } + + Result IsNaN(const std::shared_ptr& term) override { + // when there's no nanCounts information, but we already know the column only contains + // null, it's guaranteed that there's no NaN value + int id = term->reference()->field().field_id(); + if (ContainsNullsOnly(id)) { + return ROWS_CANNOT_MATCH; + } + auto ptr = std::dynamic_pointer_cast(term); + if (ptr == nullptr) { + return ROWS_MIGHT_MATCH; + } + if (!data_file_.nan_value_counts.empty() && + data_file_.nan_value_counts.contains(id) && + data_file_.nan_value_counts.at(id) == 0) { + return ROWS_CANNOT_MATCH; + } + return ROWS_MIGHT_MATCH; + } + + Result NotNaN(const std::shared_ptr& term) override { + auto ptr = std::dynamic_pointer_cast(term); + if (ptr == nullptr) { + // identity transforms are already removed by this time + return ROWS_MIGHT_MATCH; + } + + int id = term->reference()->field().field_id(); + + if (ContainsNaNsOnly(id)) { + return ROWS_CANNOT_MATCH; + } + + return ROWS_MIGHT_MATCH; + } + + Result Lt(const std::shared_ptr& term, const Literal& lit) override { + // all terms are null preserving. see #isNullPreserving(Bound) + int id = term->reference()->field().field_id(); + if (ContainsNullsOnly(id) || ContainsNaNsOnly(id)) { + return ROWS_CANNOT_MATCH; + } + auto lower_result = LowerBound(term); + if (!lower_result.has_value() || lower_result.value().IsNaN()) { + // NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more. + return ROWS_MIGHT_MATCH; + } + const auto& lower = lower_result.value(); + + // this also works for transforms that are order preserving: + // if a transform f is order preserving, a < b means that f(a) <= f(b). + // because lower <= a for all values of a in the file, f(lower) <= f(a). + // when f(lower) >= X then f(a) >= f(lower) >= X, so there is no a such that f(a) < X + // f(lower) >= X means rows cannot match + if (lit >= lower) { + return ROWS_CANNOT_MATCH; + } + + return ROWS_MIGHT_MATCH; + } + + Result LtEq(const std::shared_ptr& term, const Literal& lit) override { + // all terms are null preserving. see #isNullPreserving(Bound) + int id = term->reference()->field().field_id(); + if (ContainsNullsOnly(id) || ContainsNaNsOnly(id)) { + return ROWS_CANNOT_MATCH; + } + + auto lower_result = LowerBound(term); + if (!lower_result.has_value() || lower_result.value().IsNaN()) { + // NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more. + return ROWS_MIGHT_MATCH; + } + const auto& lower = lower_result.value(); + + // this also works for transforms that are order preserving: + // if a transform f is order preserving, a < b means that f(a) <= f(b). + // because lower <= a for all values of a in the file, f(lower) <= f(a). + // when f(lower) > X then f(a) >= f(lower) > X, so there is no a such that f(a) <= X + // f(lower) > X means rows cannot match + if (lit > lower) { + return ROWS_CANNOT_MATCH; + } + + return ROWS_MIGHT_MATCH; + } + + Result Gt(const std::shared_ptr& term, const Literal& lit) override { + // all terms are null preserving. see #isNullPreserving(Bound) + int id = term->reference()->field().field_id(); + if (ContainsNullsOnly(id) || ContainsNaNsOnly(id)) { + return ROWS_CANNOT_MATCH; + } + + auto upper_result = UpperBound(term); + if (!upper_result.has_value()) { + return ROWS_MIGHT_MATCH; + } + const auto& upper = upper_result.value(); + + if (lit <= upper) { + return ROWS_CANNOT_MATCH; + } + + return ROWS_MIGHT_MATCH; + } + + Result GtEq(const std::shared_ptr& term, const Literal& lit) override { + // all terms are null preserving. see #isNullPreserving(Bound) + int id = term->reference()->field().field_id(); + if (ContainsNullsOnly(id) || ContainsNaNsOnly(id)) { + return ROWS_CANNOT_MATCH; + } + + auto upper_result = UpperBound(term); + if (!upper_result.has_value()) { + return ROWS_MIGHT_MATCH; + } + const auto& upper = upper_result.value(); + if (lit < upper) { + return ROWS_CANNOT_MATCH; + } + + return ROWS_MIGHT_MATCH; + } + + Result Eq(const std::shared_ptr& term, const Literal& lit) override { + // all terms are null preserving. see #isNullPreserving(Bound) + int id = term->reference()->field().field_id(); + if (ContainsNullsOnly(id) || ContainsNaNsOnly(id)) { + return ROWS_CANNOT_MATCH; + } + + auto lower_result = LowerBound(term); + if (lower_result.has_value()) { + const auto& lower = lower_result.value(); + if (!lower.IsNaN() && lower > lit) { + return ROWS_CANNOT_MATCH; + } + } + + auto upper_result = UpperBound(term); + if (!upper_result.has_value()) { + return ROWS_MIGHT_MATCH; + } + const auto& upper = upper_result.value(); + if (upper != lit) { + return ROWS_CANNOT_MATCH; + } + + return ROWS_MIGHT_MATCH; + } + + Result NotEq(const std::shared_ptr& term, + const Literal& lit) override { + // because the bounds are not necessarily a min or max value, this cannot be answered + // using them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value in col. + return ROWS_MIGHT_MATCH; + } + + Result In(const std::shared_ptr& term, + const BoundSetPredicate::LiteralSet& literal_set) override { + // all terms are null preserving. see #isNullPreserving(Bound) + int id = term->reference()->field().field_id(); + if (ContainsNullsOnly(id) || ContainsNaNsOnly(id)) { + return ROWS_CANNOT_MATCH; + } + + if (literal_set.size() > IN_PREDICATE_LIMIT) { + // skip evaluating the predicate if the number of values is too big + return ROWS_MIGHT_MATCH; + } + + auto lower_result = LowerBound(term); + if (!lower_result.has_value() || lower_result.value().IsNaN()) { + // NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more. + return ROWS_MIGHT_MATCH; + } + const auto& lower = lower_result.value(); + std::vector literals; + for (const auto& lit : literal_set) { + if (lit >= lower) { + literals.emplace_back(lit); + } + } + // if all values are less than lower bound, rows cannot match + if (literals.empty()) { + return ROWS_CANNOT_MATCH; + } + + auto upper_result = UpperBound(term); + if (!upper_result.has_value()) { + return ROWS_MIGHT_MATCH; + } + const auto& upper = upper_result.value(); + std::erase_if(literals, [&](const Literal& x) { return x > upper; }); + // if remaining values are greater than upper bound, rows cannot match + if (literals.empty()) { + return ROWS_CANNOT_MATCH; + } + + return ROWS_MIGHT_MATCH; + } + + Result NotIn(const std::shared_ptr& term, + const BoundSetPredicate::LiteralSet& literal_set) override { + // because the bounds are not necessarily a min or max value, this cannot be answered + // using them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a value in + // col. + return ROWS_MIGHT_MATCH; + } + + Result StartsWith(const std::shared_ptr& term, + const Literal& lit) override { + if (term->kind() == BoundTerm::Kind::kTransform && + std::dynamic_pointer_cast(term)->transform()->transform_type() != + TransformType::kIdentity) { + // truncate must be rewritten in binding. the result is either always or never + // compatible + return ROWS_MIGHT_MATCH; + } + + int id = term->reference()->field().field_id(); + if (ContainsNullsOnly(id)) { + return ROWS_CANNOT_MATCH; + } + if (lit.type()->type_id() != TypeId::kString) { + return ROWS_CANNOT_MATCH; + } + std::string prefix = get(lit.value()); + + auto lower_result = LowerBound(term); + if (!lower_result.has_value()) { + return ROWS_MIGHT_MATCH; + } + const auto& lower = lower_result.value(); + auto lower_str = get(lower.value()); + // truncate lower bound so that its length in bytes is not greater than the length of + // prefix + int length = std::min(prefix.size(), lower_str.size()); + // if prefix of lower bound is greater than prefix, rows cannot match + if (lower_str.substr(0, length) > prefix.substr(0, length)) { + return ROWS_CANNOT_MATCH; + } + + auto upper_result = UpperBound(term); + if (!upper_result.has_value()) { + return ROWS_MIGHT_MATCH; + } + const auto& upper = upper_result.value(); + auto upper_str = get(upper.value()); + // truncate upper bound so that its length in bytes is not greater than the length of + // prefix + length = std::min(prefix.size(), upper_str.size()); + // if prefix of upper bound is less than prefix, rows cannot match + if (upper_str.substr(0, length) < prefix.substr(0, length)) { + return ROWS_CANNOT_MATCH; + } + + return ROWS_MIGHT_MATCH; + } + + Result NotStartsWith(const std::shared_ptr& term, + const Literal& lit) override { + // the only transforms that produce strings are truncate and identity, which work with + // this + int id = term->reference()->field().field_id(); + if (MayContainNull(id)) { + return ROWS_MIGHT_MATCH; + } + + if (lit.type()->type_id() != TypeId::kString) { + return ROWS_CANNOT_MATCH; + } + std::string prefix = get(lit.value()); + + // notStartsWith will match unless all values must start with the prefix. This happens + // when the lower and upper bounds both start with the prefix. + auto lower_result = LowerBound(term); + auto upper_result = UpperBound(term); + if (!lower_result.has_value() || !upper_result.has_value()) { + return ROWS_MIGHT_MATCH; + } + const auto& lower = lower_result.value(); + const auto& upper = upper_result.value(); + auto lower_str = get(lower.value()); + auto upper_str = get(upper.value()); + + // if lower is shorter than the prefix then lower doesn't start with the prefix + if (lower_str.size() < prefix.size()) { + return ROWS_MIGHT_MATCH; + } + + if (prefix == lower_str.substr(0, prefix.size())) { + // if upper is shorter than the prefix then upper can't start with the prefix + if (upper_str.size() < prefix.size()) { + return ROWS_MIGHT_MATCH; + } + if (upper_str.substr(0, prefix.size()) == prefix) { + // both bounds match the prefix, so all rows must match the prefix and therefore + // do not satisfy the predicate + return ROWS_CANNOT_MATCH; + } + } + + return ROWS_MIGHT_MATCH; + } + + private: + bool MayContainNull(int32_t id) { + return data_file_.null_value_counts.empty() || + !data_file_.null_value_counts.contains(id) || + data_file_.null_value_counts.at(id) != 0; + } + + bool ContainsNullsOnly(int32_t id) { + return !data_file_.value_counts.empty() && data_file_.value_counts.contains(id) && + !data_file_.null_value_counts.empty() && + data_file_.null_value_counts.contains(id) && + data_file_.value_counts.at(id) - data_file_.null_value_counts.at(id) == 0; + } + + bool ContainsNaNsOnly(int32_t id) { + return !data_file_.nan_value_counts.empty() && + data_file_.nan_value_counts.contains(id) && !data_file_.value_counts.empty() && + data_file_.value_counts.at(id) == data_file_.nan_value_counts.at(id); + } + + Result LowerBound(const std::shared_ptr& term) { + if (term->kind() == BoundTerm::Kind::kReference) { + return ParseLowerBound(*std::dynamic_pointer_cast(term)); + } else if (term->kind() == BoundTerm::Kind::kTransform) { + return TransformLowerBound(*std::dynamic_pointer_cast(term)); + } else if (term->kind() == BoundTerm::Kind::kExtract) { + // TODO(xiao.dong) handle extract lower and upper bounds + return NotImplemented("Extract lower bound not implemented."); + } else { + return NotFound("Lower bound not found."); + } + } + + Result UpperBound(const std::shared_ptr& term) { + if (term->kind() == BoundTerm::Kind::kReference) { + return ParseUpperBound(*std::dynamic_pointer_cast(term)); + } else if (term->kind() == BoundTerm::Kind::kTransform) { + return TransformUpperBound(*std::dynamic_pointer_cast(term)); + } else if (term->kind() == BoundTerm::Kind::kExtract) { + // TODO(xiao.dong) handle extract lower and upper bounds + return NotImplemented("Extract upper bound not implemented."); + } else { + return NotFound("Upper bound not found."); + } + } + + Result ParseLowerBound(const BoundReference& ref) { + int id = ref.field().field_id(); + auto type = ref.type(); + if (!type->is_primitive()) { + return InvalidStats("Lower bound of non-primitive type is not supported."); + } + auto primitive_type = std::dynamic_pointer_cast(type); + if (!data_file_.lower_bounds.empty() && data_file_.lower_bounds.contains(id)) { + ICEBERG_ASSIGN_OR_RAISE( + auto lower, + Literal::Deserialize(data_file_.lower_bounds.at(id), primitive_type)); + return lower; + } + + return NotFound("Lower bound not found."); + } + + Result ParseUpperBound(const BoundReference& ref) { + int id = ref.field().field_id(); + auto type = ref.type(); + if (!type->is_primitive()) { + return InvalidStats("Upper bound of non-primitive type is not supported."); + } + auto primitive_type = std::dynamic_pointer_cast(type); + if (!data_file_.upper_bounds.empty() && data_file_.upper_bounds.contains(id)) { + ICEBERG_ASSIGN_OR_RAISE( + auto upper, + Literal::Deserialize(data_file_.upper_bounds.at(id), primitive_type)); + return upper; + } + + return NotFound("Upper bound not found."); + } + + Result TransformLowerBound(BoundTransform& boundTransform) { + auto transform = boundTransform.transform(); + if (transform->PreservesOrder()) { + ICEBERG_ASSIGN_OR_RAISE(auto lower, ParseLowerBound(*boundTransform.reference())); + ICEBERG_ASSIGN_OR_RAISE(auto transform_func, + transform->Bind(boundTransform.reference()->type())); + return transform_func->Transform(lower); + } + + return NotFound("Transform lower bound not found."); + } + + Result TransformUpperBound(BoundTransform& boundTransform) { + auto transform = boundTransform.transform(); + if (transform->PreservesOrder()) { + ICEBERG_ASSIGN_OR_RAISE(auto upper, ParseLowerBound(*boundTransform.reference())); + ICEBERG_ASSIGN_OR_RAISE(auto transform_func, + transform->Bind(boundTransform.reference()->type())); + return transform_func->Transform(upper); + } + + return NotFound("Transform upper bound not found."); + } + + // TODO(xiao.dong) handle extract lower and upper bounds + /* + Literal extractLowerBound(const BoundExtract& bound) { + + } + + Literal extractUpperBound(const BoundExtract& bound) { + + } +*/ + + /** Returns true if the expression term produces a non-null value for non-null input. */ + bool IsNonNullPreserving(const std::shared_ptr& term) { + if (term->kind() == BoundTerm::Kind::kReference) { + return true; + } else if (term->kind() == BoundTerm::Kind::kTransform) { + return std::dynamic_pointer_cast(term) + ->transform() + ->PreservesOrder(); + } + // a non-null variant does not necessarily contain a specific field + // and unknown bound terms are not non-null preserving + return false; + } + + private: + const DataFile& data_file_; +}; + +InclusiveMetricsEvaluator::InclusiveMetricsEvaluator( + const std::shared_ptr& expr) + : expr_(std::move(expr)) {} + +InclusiveMetricsEvaluator::~InclusiveMetricsEvaluator() = default; + +Result> InclusiveMetricsEvaluator::Make( + std::shared_ptr expr, const Schema& schema, bool case_sensitive) { + ICEBERG_ASSIGN_OR_RAISE(auto rewrite_expr, RewriteNot::Visit(std::move(expr))); + ICEBERG_ASSIGN_OR_RAISE(auto bound_expr, + Binder::Bind(schema, rewrite_expr, case_sensitive)); + return std::unique_ptr( + new InclusiveMetricsEvaluator(std::move(bound_expr))); +} + +Result InclusiveMetricsEvaluator::Eval(const DataFile& data_file) const { + if (data_file.record_count <= 0) { + return ROWS_CANNOT_MATCH; + } + InclusiveMetricsVisitor visitor(data_file); + return Visit(expr_, visitor); +} + +} // namespace iceberg diff --git a/src/iceberg/expression/inclusive_metrics_evaluator.h b/src/iceberg/expression/inclusive_metrics_evaluator.h new file mode 100644 index 000000000..4a5869f91 --- /dev/null +++ b/src/iceberg/expression/inclusive_metrics_evaluator.h @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/inclusive_metrics_evaluator.h +/// +/// Evaluates an Expression on a DataFile to test whether rows in the file may match. +/// +/// This evaluation is inclusive: it returns true if a file may match and false if it +/// cannot match. +/// +/// Files are passed to #eval(ContentFile), which returns true if the file may contain +/// matching rows and false if the file cannot contain matching rows. Files may be skipped +/// if and only if the return value of eval is false. +/// +/// Due to the comparison implementation of ORC stats, for float/double columns in ORC +/// files, if the first value in a file is NaN, metrics of this file will report NaN for +/// both upper and lower bound despite that the column could contain non-NaN data. Thus in +/// some scenarios explicitly checks for NaN is necessary in order to not skip files that +/// may contain matching data. +/// + +#include + +#include "iceberg/expression/expression.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +class ICEBERG_EXPORT InclusiveMetricsEvaluator { + public: + /// \brief Make a inclusive metrics evaluator + /// + /// \param expr The expression to evaluate + /// \param schema The schema of the table + /// \param case_sensitive Whether field name matching is case-sensitive + static Result> Make( + std::shared_ptr expr, const Schema& schema, bool case_sensitive = true); + + ~InclusiveMetricsEvaluator(); + + /// \brief Evaluate the expression against a DataFile. + /// + /// \param data_file The data file to evaluate + /// \return true if the file matches the expression, false otherwise, or error + Result Eval(const DataFile& data_file) const; + + private: + explicit InclusiveMetricsEvaluator(const std::shared_ptr& expr); + + private: + std::shared_ptr expr_; +}; + +} // namespace iceberg diff --git a/src/iceberg/expression/manifest_evaluator.cc b/src/iceberg/expression/manifest_evaluator.cc new file mode 100644 index 000000000..5bfac2ac2 --- /dev/null +++ b/src/iceberg/expression/manifest_evaluator.cc @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/manifest_evaluator.h" + +#include "iceberg/expression/binder.h" +#include "iceberg/expression/expression_visitor.h" +#include "iceberg/expression/rewrite_not.h" +#include "iceberg/manifest_list.h" +#include "iceberg/schema.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +class ManifestEvalVisitor : public BoundVisitor { + public: + explicit ManifestEvalVisitor(const ManifestFile& manifest) + : manifest_(manifest), stats_(manifest.partitions) {} + + Result AlwaysTrue() override { return true; } + + Result AlwaysFalse() override { return false; } + + Result Not(bool child_result) override { return !child_result; } + + Result And(bool left_result, bool right_result) override { + return left_result && right_result; + } + + Result Or(bool left_result, bool right_result) override { + return left_result || right_result; + } + + Result IsNull(const std::shared_ptr& term) override { + // TODO(xiao.dong) need a wrapper like PartitionStructLike + return NotImplemented("NotImplemented"); + // ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(manifest_)); + // return value.IsNull(); + } + + Result NotNull(const std::shared_ptr& term) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, IsNull(term)); + return !value; + } + + Result IsNaN(const std::shared_ptr& term) override { + // TODO(xiao.dong) need a wrapper like PartitionStructLike + return NotImplemented("NotImplemented"); + // ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(manifest_)); + // return value.IsNaN(); + } + + Result NotNaN(const std::shared_ptr& term) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, IsNaN(term)); + return !value; + } + + Result Lt(const std::shared_ptr& term, const Literal& lit) override { + // TODO(xiao.dong) need a wrapper like PartitionStructLike + return NotImplemented("NotImplemented"); + // ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(manifest_)); + // return value < lit; + } + + Result LtEq(const std::shared_ptr& term, const Literal& lit) override { + // TODO(xiao.dong) need a wrapper like PartitionStructLike + return NotImplemented("NotImplemented"); + // ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(manifest_)); + // return value <= lit; + } + + Result Gt(const std::shared_ptr& term, const Literal& lit) override { + // TODO(xiao.dong) need a wrapper like PartitionStructLike + return NotImplemented("NotImplemented"); + // ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(manifest_)); + // return value > lit; + } + + Result GtEq(const std::shared_ptr& term, const Literal& lit) override { + // TODO(xiao.dong) need a wrapper like PartitionStructLike + return NotImplemented("NotImplemented"); + // ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(manifest_)); + // return value >= lit; + } + + Result Eq(const std::shared_ptr& term, const Literal& lit) override { + // TODO(xiao.dong) need a wrapper like PartitionStructLike + return NotImplemented("NotImplemented"); + // ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(manifest_)); + // return value == lit; + } + + Result NotEq(const std::shared_ptr& term, + const Literal& lit) override { + ICEBERG_ASSIGN_OR_RAISE(auto eq_result, Eq(term, lit)); + return !eq_result; + } + + Result In(const std::shared_ptr& term, + const BoundSetPredicate::LiteralSet& literal_set) override { + // TODO(xiao.dong) need a wrapper like PartitionStructLike + return NotImplemented("NotImplemented"); + // ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(manifest_)); + // return literal_set.contains(value); + } + + Result NotIn(const std::shared_ptr& term, + const BoundSetPredicate::LiteralSet& literal_set) override { + ICEBERG_ASSIGN_OR_RAISE(auto in_result, In(term, literal_set)); + return !in_result; + } + + Result StartsWith(const std::shared_ptr& term, + const Literal& lit) override { + // TODO(xiao.dong) need a wrapper like PartitionStructLike + return NotImplemented("NotImplemented"); + // + // ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(manifest_)); + // + // // Both value and literal should be strings + // if (!std::holds_alternative(value.value()) || + // !std::holds_alternative(lit.value())) { + // return false; + // } + // + // const auto& str_value = std::get(value.value()); + // const auto& str_prefix = std::get(lit.value()); + // return str_value.starts_with(str_prefix); + } + + Result NotStartsWith(const std::shared_ptr& term, + const Literal& lit) override { + ICEBERG_ASSIGN_OR_RAISE(auto starts_result, StartsWith(term, lit)); + return !starts_result; + } + + private: + const ManifestFile& manifest_; + const std::vector& stats_; +}; + +ManifestEvaluator::ManifestEvaluator(const std::shared_ptr& expr) + : expr_(std::move(expr)) {} + +ManifestEvaluator::~ManifestEvaluator() = default; + +Result> ManifestEvaluator::MakeRowFilter( + [[maybe_unused]] std::shared_ptr expr, + [[maybe_unused]] const std::shared_ptr spec, + [[maybe_unused]] const Schema& schema, [[maybe_unused]] bool case_sensitive) { + // TODO(xiao.dong) we need a projection util to project row filter to the partition col + return NotImplemented("ManifestEvaluator::MakeRowFilter"); +} + +Result> ManifestEvaluator::MakePartitionFilter( + std::shared_ptr expr, const std::shared_ptr spec, + const Schema& schema, bool case_sensitive) { + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec->PartitionType(schema)); + auto field_span = partition_type->fields(); + std::vector fields(field_span.begin(), field_span.end()); + auto partition_schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE(auto rewrite_expr, RewriteNot::Visit(std::move(expr))); + ICEBERG_ASSIGN_OR_RAISE(auto partition_expr, + Binder::Bind(*partition_schema, rewrite_expr, case_sensitive)); + return std::unique_ptr( + new ManifestEvaluator(std::move(partition_expr))); +} + +Result ManifestEvaluator::Eval(const ManifestFile& manifest) const { + ManifestEvalVisitor visitor(manifest); + return Visit(expr_, visitor); +} + +} // namespace iceberg diff --git a/src/iceberg/expression/manifest_evaluator.h b/src/iceberg/expression/manifest_evaluator.h new file mode 100644 index 000000000..4f19a61f4 --- /dev/null +++ b/src/iceberg/expression/manifest_evaluator.h @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/manifest_evaluator.h +/// +/// Evaluates an Expression on a ManifestFile to test whether the file contains +/// matching partitions. +/// +/// For row expressions, evaluation is inclusive: it returns true if a file +/// may match and false if it cannot match. +/// +/// Files are passed to #eval(ManifestFile), which returns true if the manifest may +/// contain data files that match the partition expression. Manifest files may be +/// skipped if and only if the return value of eval is false. +/// + +#include + +#include "iceberg/expression/expression.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/partition_spec.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Evaluates an Expression against manifest. +/// \note: The evaluator is thread-safe. +class ICEBERG_EXPORT ManifestEvaluator { + public: + /// \brief Make a manifest evaluator for RowFilter + /// + /// \param expr The expression to evaluate + /// \param spec The partition spec + /// \param schema The schema of the table + /// \param case_sensitive Whether field name matching is case-sensitive + static Result> MakeRowFilter( + std::shared_ptr expr, const std::shared_ptr spec, + const Schema& schema, bool case_sensitive = true); + + /// \brief Make a manifest evaluator for PartitionFilter + /// + /// \param expr The expression to evaluate + /// \param spec The partition spec + /// \param schema The schema of the table + /// \param case_sensitive Whether field name matching is case-sensitive + static Result> MakePartitionFilter( + std::shared_ptr expr, const std::shared_ptr spec, + const Schema& schema, bool case_sensitive = true); + + ~ManifestEvaluator(); + + /// \brief Evaluate the expression against a manifest. + /// + /// \param manifest The manifest to evaluate + /// \return true if the row matches the expression, false otherwise, or error + Result Eval(const ManifestFile& manifest) const; + + private: + explicit ManifestEvaluator(const std::shared_ptr& expr); + + private: + std::shared_ptr expr_; +}; + +} // namespace iceberg diff --git a/src/iceberg/expression/residual_evaluator.cc b/src/iceberg/expression/residual_evaluator.cc new file mode 100644 index 000000000..451e3cd0e --- /dev/null +++ b/src/iceberg/expression/residual_evaluator.cc @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/residual_evaluator.h" + +#include "iceberg/expression/binder.h" +#include "iceberg/expression/expression_visitor.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/expression/rewrite_not.h" +#include "iceberg/schema.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +class ResidualEvalVisitor : public BoundVisitor> { + public: + explicit ResidualEvalVisitor(const StructLike& partition_data, + std::shared_ptr schema, bool case_sensitive) + : case_sensitive_(case_sensitive), + schema_(schema), + partition_data_(partition_data) {} + + Result> AlwaysTrue() override { + return Expressions::AlwaysTrue(); + } + + Result> AlwaysFalse() override { + return Expressions::AlwaysFalse(); + } + + Result> Not( + const std::shared_ptr& child_result) override { + return Expressions::Not(child_result); + } + + Result> And( + const std::shared_ptr& left_result, + const std::shared_ptr& right_result) override { + return Expressions::And(left_result, right_result); + } + + Result> Or( + const std::shared_ptr& left_result, + const std::shared_ptr& right_result) override { + return Expressions::Or(left_result, right_result); + } + + Result> IsNull( + const std::shared_ptr& term) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(partition_data_)); + return value.IsNull() ? AlwaysTrue() : AlwaysFalse(); + } + + Result> NotNull( + const std::shared_ptr& term) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(partition_data_)); + return value.IsNull() ? AlwaysFalse() : AlwaysTrue(); + } + + Result> IsNaN( + const std::shared_ptr& term) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(partition_data_)); + return value.IsNaN() ? AlwaysTrue() : AlwaysFalse(); + } + + Result> NotNaN( + const std::shared_ptr& term) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(partition_data_)); + return value.IsNaN() ? AlwaysFalse() : AlwaysTrue(); + } + + Result> Lt(const std::shared_ptr& term, + const Literal& lit) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(partition_data_)); + return value < lit ? AlwaysTrue() : AlwaysFalse(); + } + + Result> LtEq(const std::shared_ptr& term, + const Literal& lit) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(partition_data_)); + return value <= lit ? AlwaysTrue() : AlwaysFalse(); + } + + Result> Gt(const std::shared_ptr& term, + const Literal& lit) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(partition_data_)); + return value > lit ? AlwaysTrue() : AlwaysFalse(); + } + + Result> GtEq(const std::shared_ptr& term, + const Literal& lit) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(partition_data_)); + return value >= lit ? AlwaysTrue() : AlwaysFalse(); + } + + Result> Eq(const std::shared_ptr& term, + const Literal& lit) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(partition_data_)); + return value == lit ? AlwaysTrue() : AlwaysFalse(); + } + + Result> NotEq(const std::shared_ptr& term, + const Literal& lit) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(partition_data_)); + return value != lit ? AlwaysTrue() : AlwaysFalse(); + } + + Result> In( + const std::shared_ptr& term, + const BoundSetPredicate::LiteralSet& literal_set) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(partition_data_)); + return literal_set.contains(value) ? AlwaysTrue() : AlwaysFalse(); + } + + Result> NotIn( + const std::shared_ptr& term, + const BoundSetPredicate::LiteralSet& literal_set) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(partition_data_)); + return literal_set.contains(value) ? AlwaysFalse() : AlwaysTrue(); + } + + Result> StartsWith(const std::shared_ptr& term, + const Literal& lit) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(partition_data_)); + + // Both value and literal should be strings + if (!std::holds_alternative(value.value()) || + !std::holds_alternative(lit.value())) { + return AlwaysFalse(); + } + + const auto& str_value = std::get(value.value()); + const auto& str_prefix = std::get(lit.value()); + return str_value.starts_with(str_prefix) ? AlwaysTrue() : AlwaysFalse(); + } + + Result> NotStartsWith( + const std::shared_ptr& term, const Literal& lit) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, term->Evaluate(partition_data_)); + + // Both value and literal should be strings + if (!std::holds_alternative(value.value()) || + !std::holds_alternative(lit.value())) { + return AlwaysTrue(); + } + + const auto& str_value = std::get(value.value()); + const auto& str_prefix = std::get(lit.value()); + return str_value.starts_with(str_prefix) ? AlwaysFalse() : AlwaysTrue(); + } + + Result> Predicate( + const std::shared_ptr& pred) override { + // TODO(xiao.dong) need to do Transform::project + return nullptr; + } + + Result> Predicate( + const std::shared_ptr& pred) override { + ICEBERG_ASSIGN_OR_RAISE(auto bound, pred->Bind(*schema_, case_sensitive_)); + if (bound->is_bound_predicate()) { + ICEBERG_ASSIGN_OR_RAISE( + auto bound_residual, + Predicate(std::dynamic_pointer_cast(bound))); + if (bound_residual->is_bound_predicate() || + bound_residual->is_unbound_predicate()) { + return pred; // replace inclusive original unbound predicate + } + return bound_residual; // use the non-predicate residual (e.g. alwaysTrue) + } + // if binding didn't result in a Predicate, return the expression + return bound; + } + + private: + bool case_sensitive_; + std::shared_ptr schema_; + const StructLike& partition_data_; +}; + +class UnpartitionedResidualEvaluator : public ResidualEvaluator { + public: + UnpartitionedResidualEvaluator(std::shared_ptr expr, + std::shared_ptr schema, bool case_sensitive) + : ResidualEvaluator(std::move(expr), std::move(schema), case_sensitive) {} + + Result> ResidualFor( + const StructLike& partition_data) const override { + return expr_; + } +}; + +ResidualEvaluator::ResidualEvaluator(const std::shared_ptr& expr, + std::shared_ptr schema, bool case_sensitive) + : case_sensitive_(case_sensitive), + expr_(std::move(expr)), + schema_(std::move(schema)) {} + +ResidualEvaluator::~ResidualEvaluator() = default; + +Result> ResidualEvaluator::Make( + std::shared_ptr expr, const std::shared_ptr spec, + std::shared_ptr schema, bool case_sensitive) { + if (spec->fields().empty()) { + return MakeUnpartitioned(expr); + } + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec->PartitionType(*schema)); + auto field_span = partition_type->fields(); + std::vector fields(field_span.begin(), field_span.end()); + auto partition_schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE(auto rewrite_expr, RewriteNot::Visit(std::move(expr))); + ICEBERG_ASSIGN_OR_RAISE(auto partition_expr, + Binder::Bind(*partition_schema, rewrite_expr, case_sensitive)); + return std::unique_ptr(new ResidualEvaluator( + std::move(partition_expr), std::move(schema), case_sensitive)); +} + +Result> ResidualEvaluator::MakeUnpartitioned( + std::shared_ptr expr) { + return std::make_unique(std::move(expr), nullptr, true); +} + +Result> ResidualEvaluator::ResidualFor( + const StructLike& partition_data) const { + ResidualEvalVisitor visitor(partition_data, schema_, case_sensitive_); + return Visit, ResidualEvalVisitor>(expr_, visitor); +} + +} // namespace iceberg diff --git a/src/iceberg/expression/residual_evaluator.h b/src/iceberg/expression/residual_evaluator.h new file mode 100644 index 000000000..75004ec1f --- /dev/null +++ b/src/iceberg/expression/residual_evaluator.h @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/residual_evaluator.h +/// +/// Finds the residuals for an Expression the partitions in the given PartitionSpec. +/// +/// A residual expression is made by partially evaluating an expression using partition +/// values. For example, if a table is partitioned by day(utc_timestamp) and is read with +/// a filter expression utc_timestamp >= a and utc_timestamp <= b, then there are 4 +/// possible residuals expressions for the partition data, d: +/// +///
    +///
  • If d > day(a) and d < day(b), the residual is always true +///
  • If d == day(a) and d != day(b), the residual is utc_timestamp >= a +///
  • if d == day(b) and d != day(a), the residual is utc_timestamp <= b +///
  • If d == day(a) == day(b), the residual is utc_timestamp >= a and +/// utc_timestamp <= b +///
+/// +///

Partition data is passed using {@link StructLike}. Residuals are returned by +/// #residualFor(StructLike). +/// +///

This class is thread-safe. +/// + +#include + +#include "iceberg/expression/expression.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/partition_spec.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +class ICEBERG_EXPORT ResidualEvaluator { + public: + /// \brief Make a residual evaluator + /// + /// \param expr The expression to evaluate + /// \param spec The partition spec + /// \param schema The schema of the table + /// \param case_sensitive Whether field name matching is case-sensitive + static Result> Make( + std::shared_ptr expr, const std::shared_ptr spec, + std::shared_ptr schema, bool case_sensitive = true); + + /// \brief Make a residual evaluator for unpartitioned tables + /// + /// \param expr The expression to evaluate + static Result> MakeUnpartitioned( + std::shared_ptr expr); + + ~ResidualEvaluator(); + + /// \brief Returns a residual expression for the given partition values. + /// + /// \param partition_data The partition data values + /// \return the residual of this evaluator's expression from the partition values + virtual Result> ResidualFor( + const StructLike& partition_data) const; + + protected: + explicit ResidualEvaluator(const std::shared_ptr& expr, + std::shared_ptr schema, bool case_sensitive); + + protected: + bool case_sensitive_; + std::shared_ptr expr_; + std::shared_ptr schema_; +}; + +} // namespace iceberg diff --git a/src/iceberg/expression/strict_metrics_evaluator.cc b/src/iceberg/expression/strict_metrics_evaluator.cc new file mode 100644 index 000000000..8b7cafe35 --- /dev/null +++ b/src/iceberg/expression/strict_metrics_evaluator.cc @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/strict_metrics_evaluator.h" + +#include "iceberg/expression/binder.h" +#include "iceberg/expression/expression_visitor.h" +#include "iceberg/expression/rewrite_not.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/transform.h" +#include "iceberg/type.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { +static const bool ROWS_MUST_MATCH = true; +static const bool ROWS_MIGHT_NOT_MATCH = false; +} // namespace + +class StrictMetricsVisitor : public BoundVisitor { + public: + explicit StrictMetricsVisitor(const DataFile& data_file, std::shared_ptr schema) + : data_file_(data_file), schema_(std::move(schema)) {} + + // TODO(xiao.dong) handleNonReference not implement + + Result AlwaysTrue() override { return ROWS_MUST_MATCH; } + + Result AlwaysFalse() override { return ROWS_MIGHT_NOT_MATCH; } + + Result Not(bool child_result) override { return !child_result; } + + Result And(bool left_result, bool right_result) override { + return left_result && right_result; + } + + Result Or(bool left_result, bool right_result) override { + return left_result || right_result; + } + + Result IsNull(const std::shared_ptr& term) override { + int id = term->reference()->field().field_id(); + if (IsNestedColumn(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + if (ContainsNullsOnly(id)) { + return ROWS_MUST_MATCH; + } + return ROWS_MIGHT_NOT_MATCH; + } + + Result NotNull(const std::shared_ptr& term) override { + // no need to check whether the field is required because binding evaluates that case + // if the column has any null values, the expression does not match + int id = term->reference()->field().field_id(); + if (IsNestedColumn(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (!data_file_.null_value_counts.empty() && + data_file_.null_value_counts.contains(id) && + data_file_.null_value_counts.at(id) == 0) { + return ROWS_MUST_MATCH; + } + + return ROWS_MIGHT_NOT_MATCH; + } + + Result IsNaN(const std::shared_ptr& term) override { + int id = term->reference()->field().field_id(); + + if (ContainsNaNsOnly(id)) { + return ROWS_MUST_MATCH; + } + + return ROWS_MIGHT_NOT_MATCH; + } + + Result NotNaN(const std::shared_ptr& term) override { + int id = term->reference()->field().field_id(); + + if (!data_file_.nan_value_counts.empty() && + data_file_.nan_value_counts.contains(id) && + data_file_.nan_value_counts.at(id) == 0) { + return ROWS_MUST_MATCH; + } + + if (ContainsNullsOnly(id)) { + return ROWS_MUST_MATCH; + } + + return ROWS_MIGHT_NOT_MATCH; + } + + Result Lt(const std::shared_ptr& term, const Literal& lit) override { + // Rows must match when: <----------Min----Max---X-------> + int id = term->reference()->field().field_id(); + if (IsNestedColumn(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (CanContainNulls(id) || CanContainNaNs(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (!data_file_.upper_bounds.empty() && data_file_.upper_bounds.contains(id)) { + ICEBERG_ASSIGN_OR_RAISE(auto upper, + ParseBound(term, data_file_.upper_bounds.at(id))); + if (upper < lit) { + return ROWS_MUST_MATCH; + } + } + + return ROWS_MIGHT_NOT_MATCH; + } + + Result LtEq(const std::shared_ptr& term, const Literal& lit) override { + // Rows must match when: <----------Min----Max---X-------> + int id = term->reference()->field().field_id(); + if (IsNestedColumn(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (CanContainNulls(id) || CanContainNaNs(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (!data_file_.upper_bounds.empty() && data_file_.upper_bounds.contains(id)) { + ICEBERG_ASSIGN_OR_RAISE(auto upper, + ParseBound(term, data_file_.upper_bounds.at(id))); + + if (upper <= lit) { + return ROWS_MUST_MATCH; + } + } + + return ROWS_MIGHT_NOT_MATCH; + } + + Result Gt(const std::shared_ptr& term, const Literal& lit) override { + // Rows must match when: <-------X---Min----Max----------> + int id = term->reference()->field().field_id(); + if (IsNestedColumn(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (CanContainNulls(id) || CanContainNaNs(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (!data_file_.lower_bounds.empty() && data_file_.lower_bounds.contains(id)) { + ICEBERG_ASSIGN_OR_RAISE(auto lower, + ParseBound(term, data_file_.lower_bounds.at(id))); + + if (lower.IsNaN()) { + // NaN indicates unreliable bounds. See the StrictMetricsEvaluator docs for + // more. + return ROWS_MIGHT_NOT_MATCH; + } + + if (lower > lit) { + return ROWS_MUST_MATCH; + } + } + + return ROWS_MIGHT_NOT_MATCH; + } + + Result GtEq(const std::shared_ptr& term, const Literal& lit) override { + // Rows must match when: <-------X---Min----Max----------> + int id = term->reference()->field().field_id(); + if (IsNestedColumn(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (CanContainNulls(id) || CanContainNaNs(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (!data_file_.lower_bounds.empty() && data_file_.lower_bounds.contains(id)) { + ICEBERG_ASSIGN_OR_RAISE(auto lower, + ParseBound(term, data_file_.lower_bounds.at(id))); + + if (lower.IsNaN()) { + // NaN indicates unreliable bounds. See the StrictMetricsEvaluator docs for + // more. + return ROWS_MIGHT_NOT_MATCH; + } + + if (lower >= lit) { + return ROWS_MUST_MATCH; + } + } + + return ROWS_MIGHT_NOT_MATCH; + } + + Result Eq(const std::shared_ptr& term, const Literal& lit) override { + // Rows must match when Min == X == Max + int id = term->reference()->field().field_id(); + if (IsNestedColumn(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (CanContainNulls(id) || CanContainNaNs(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (!data_file_.lower_bounds.empty() && data_file_.lower_bounds.contains(id) && + !data_file_.upper_bounds.empty() && data_file_.upper_bounds.contains(id)) { + ICEBERG_ASSIGN_OR_RAISE(auto lower, + ParseBound(term, data_file_.lower_bounds.at(id))); + + if (lower != lit) { + return ROWS_MIGHT_NOT_MATCH; + } + + ICEBERG_ASSIGN_OR_RAISE(auto upper, + ParseBound(term, data_file_.upper_bounds.at(id))); + + if (upper != lit) { + return ROWS_MIGHT_NOT_MATCH; + } + + return ROWS_MUST_MATCH; + } + + return ROWS_MIGHT_NOT_MATCH; + } + + Result NotEq(const std::shared_ptr& term, + const Literal& lit) override { + // Rows must match when X < Min or Max < X because it is not in the range + int id = term->reference()->field().field_id(); + if (IsNestedColumn(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (ContainsNullsOnly(id) || ContainsNaNsOnly(id)) { + return ROWS_MUST_MATCH; + } + + if (!data_file_.lower_bounds.empty() && data_file_.lower_bounds.contains(id)) { + ICEBERG_ASSIGN_OR_RAISE(auto lower, + ParseBound(term, data_file_.lower_bounds.at(id))); + + if (lower.IsNaN()) { + // NaN indicates unreliable bounds. See the StrictMetricsEvaluator docs for + // more. + return ROWS_MIGHT_NOT_MATCH; + } + + if (lower > lit) { + return ROWS_MUST_MATCH; + } + } + + if (!data_file_.upper_bounds.empty() && data_file_.upper_bounds.contains(id)) { + ICEBERG_ASSIGN_OR_RAISE(auto upper, + ParseBound(term, data_file_.upper_bounds.at(id))); + + if (upper < lit) { + return ROWS_MUST_MATCH; + } + } + + return ROWS_MIGHT_NOT_MATCH; + } + + Result In(const std::shared_ptr& term, + const BoundSetPredicate::LiteralSet& literal_set) override { + int id = term->reference()->field().field_id(); + if (IsNestedColumn(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (CanContainNulls(id) || CanContainNaNs(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (!data_file_.lower_bounds.empty() && data_file_.lower_bounds.contains(id) && + !data_file_.upper_bounds.empty() && data_file_.upper_bounds.contains(id)) { + // similar to the implementation in eq, first check if the lower bound is in the + // set + ICEBERG_ASSIGN_OR_RAISE(auto lower, + ParseBound(term, data_file_.lower_bounds.at(id))); + + if (!literal_set.contains(lower)) { + return ROWS_MIGHT_NOT_MATCH; + } + + // check if the upper bound is in the set + ICEBERG_ASSIGN_OR_RAISE(auto upper, + ParseBound(term, data_file_.upper_bounds.at(id))); + if (!literal_set.contains(upper)) { + return ROWS_MIGHT_NOT_MATCH; + } + + // finally check if the lower bound and the upper bound are equal + if (lower != upper) { + return ROWS_MIGHT_NOT_MATCH; + } + + // All values must be in the set if the lower bound and the upper bound are in the + // set and are equal. + return ROWS_MUST_MATCH; + } + + return ROWS_MIGHT_NOT_MATCH; + } + + Result NotIn(const std::shared_ptr& term, + const BoundSetPredicate::LiteralSet& literal_set) override { + int id = term->reference()->field().field_id(); + if (IsNestedColumn(id)) { + return ROWS_MIGHT_NOT_MATCH; + } + + if (ContainsNullsOnly(id) || ContainsNaNsOnly(id)) { + return ROWS_MUST_MATCH; + } + std::vector literals; + if (!data_file_.lower_bounds.empty() && data_file_.lower_bounds.contains(id)) { + ICEBERG_ASSIGN_OR_RAISE(auto lower, + ParseBound(term, data_file_.lower_bounds.at(id))); + + if (lower.IsNaN()) { + // NaN indicates unreliable bounds. See the StrictMetricsEvaluator docs for + // more. + return ROWS_MIGHT_NOT_MATCH; + } + + for (const auto& lit : literal_set) { + if (lit >= lower) { + literals.emplace_back(lit); + } + } + // if all values are less than lower bound, rows must + // match (notIn). + if (literals.empty()) { + return ROWS_MUST_MATCH; + } + } + + if (!data_file_.upper_bounds.empty() && data_file_.upper_bounds.contains(id)) { + ICEBERG_ASSIGN_OR_RAISE(auto upper, + ParseBound(term, data_file_.upper_bounds.at(id))); + std::erase_if(literals, [&](const Literal& x) { return x > upper; }); + if (literals.empty()) { + // if all remaining values are greater than upper bound, + // rows must match + // (notIn). + return ROWS_MUST_MATCH; + } + } + + return ROWS_MIGHT_NOT_MATCH; + } + + Result StartsWith(const std::shared_ptr& term, + const Literal& lit) override { + return ROWS_MIGHT_NOT_MATCH; + } + + Result NotStartsWith(const std::shared_ptr& term, + const Literal& lit) override { + // TODO(xiao.dong) Handle cases that definitely cannot match, + // such as notStartsWith("x") when + // the bounds are ["a", "b"]. + return ROWS_MIGHT_NOT_MATCH; + } + + private: + Result ParseBound(const std::shared_ptr& term, + const std::vector& stats) { + auto type = term->reference()->type(); + if (!type->is_primitive()) { + return InvalidStats("Bound of non-primitive type is not supported."); + } + auto primitive_type = std::dynamic_pointer_cast(type); + ICEBERG_ASSIGN_OR_RAISE(auto bound, Literal::Deserialize(stats, primitive_type)); + return bound; + } + + bool CanContainNulls(int32_t id) { + return data_file_.null_value_counts.empty() || + (data_file_.null_value_counts.contains(id) && + data_file_.null_value_counts.at(id) > 0); + } + + bool CanContainNaNs(int32_t id) { + // nan counts might be null for early version writers when nan counters are not + // populated. + return !data_file_.nan_value_counts.empty() && + data_file_.nan_value_counts.contains(id) && + data_file_.nan_value_counts.at(id) > 0; + } + + bool ContainsNullsOnly(int32_t id) { + return !data_file_.value_counts.empty() && data_file_.value_counts.contains(id) && + !data_file_.null_value_counts.empty() && + data_file_.null_value_counts.contains(id) && + data_file_.value_counts.at(id) - data_file_.null_value_counts.at(id) == 0; + } + + bool ContainsNaNsOnly(int32_t id) { + return !data_file_.nan_value_counts.empty() && + data_file_.nan_value_counts.contains(id) && !data_file_.value_counts.empty() && + data_file_.value_counts.at(id) == data_file_.nan_value_counts.at(id); + } + + bool IsNestedColumn(int id) { + auto field = schema_->GetFieldById(id); + return !field.has_value() || !field.value().has_value() || + field.value()->get().type()->is_nested(); + } + + private: + const DataFile& data_file_; + std::shared_ptr schema_; +}; + +StrictMetricsEvaluator::StrictMetricsEvaluator(const std::shared_ptr& expr, + std::shared_ptr schema) + : expr_(std::move(expr)), schema_(std::move(schema)) {} + +StrictMetricsEvaluator::~StrictMetricsEvaluator() = default; + +Result> StrictMetricsEvaluator::Make( + std::shared_ptr expr, std::shared_ptr schema, + bool case_sensitive) { + ICEBERG_ASSIGN_OR_RAISE(auto rewrite_expr, RewriteNot::Visit(std::move(expr))); + ICEBERG_ASSIGN_OR_RAISE(auto bound_expr, + Binder::Bind(*schema, rewrite_expr, case_sensitive)); + return std::unique_ptr( + new StrictMetricsEvaluator(std::move(bound_expr), std::move(schema))); +} + +Result StrictMetricsEvaluator::Eval(const DataFile& data_file) const { + if (data_file.record_count <= 0) { + return ROWS_MIGHT_NOT_MATCH; + } + StrictMetricsVisitor visitor(data_file, schema_); + return Visit(expr_, visitor); +} + +} // namespace iceberg diff --git a/src/iceberg/expression/strict_metrics_evaluator.h b/src/iceberg/expression/strict_metrics_evaluator.h new file mode 100644 index 000000000..a6dc9ea4f --- /dev/null +++ b/src/iceberg/expression/strict_metrics_evaluator.h @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/strict_metrics_evaluator.h +/// +/// Evaluates an Expression on a DataFile to test whether all rows in the file match. +/// +/// This evaluation is strict: it returns true if all rows in a file must match the +/// expression. For example, if a file's ts column has min X and max Y, this evaluator +/// will return true for ts < Y+1 but not for ts < Y-1. +/// +/// Files are passed to #eval(ContentFile), which returns true if all rows in the file +/// must contain matching rows and false if the file may contain rows that do not match. +/// +/// Due to the comparison implementation of ORC stats, for float/double columns in ORC +/// files, if the first value in a file is NaN, metrics of this file will report NaN for +/// both upper and lower bound despite that the column could contain non-NaN data. Thus in +/// some scenarios explicitly checks for NaN is necessary in order to not include files +/// that may contain rows that don't match. +/// + +#include + +#include "iceberg/expression/expression.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Evaluates an Expression against manifest. +/// \note: The evaluator is thread-safe. +class ICEBERG_EXPORT StrictMetricsEvaluator { + public: + /// \brief Make a strict metrics evaluator + /// + /// \param expr The expression to evaluate + /// \param schema The schema of the table + /// \param case_sensitive Whether field name matching is case-sensitive + static Result> Make( + std::shared_ptr expr, std::shared_ptr schema, + bool case_sensitive = true); + + ~StrictMetricsEvaluator(); + + /// \brief Evaluate the expression against a DataFile. + /// + /// \param data_file The data file to evaluate + /// \return true if the file matches the expression, false otherwise, or error + Result Eval(const DataFile& data_file) const; + + private: + explicit StrictMetricsEvaluator(const std::shared_ptr& expr, + std::shared_ptr schema); + + private: + std::shared_ptr expr_; + std::shared_ptr schema_; +}; + +} // namespace iceberg diff --git a/src/iceberg/result.h b/src/iceberg/result.h index 99df37247..b91134f52 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -40,6 +40,7 @@ enum class ErrorKind { kInvalidSchema, kInvalidManifest, kInvalidManifestList, + kInvalidStats, kIOError, kJsonParseError, kNoSuchNamespace, @@ -89,6 +90,7 @@ DEFINE_ERROR_FUNCTION(InvalidExpression) DEFINE_ERROR_FUNCTION(InvalidSchema) DEFINE_ERROR_FUNCTION(InvalidManifest) DEFINE_ERROR_FUNCTION(InvalidManifestList) +DEFINE_ERROR_FUNCTION(InvalidStats) DEFINE_ERROR_FUNCTION(IOError) DEFINE_ERROR_FUNCTION(JsonParseError) DEFINE_ERROR_FUNCTION(NoSuchNamespace)