diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 275d71fce..7218d76e0 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -28,6 +28,7 @@ set(ICEBERG_SOURCES expression/inclusive_metrics_evaluator.cc expression/literal.cc expression/predicate.cc + expression/residual_evaluator.cc expression/rewrite_not.cc expression/strict_metrics_evaluator.cc expression/term.cc diff --git a/src/iceberg/expression/expression_visitor.h b/src/iceberg/expression/expression_visitor.h index d66382453..27cfb99c1 100644 --- a/src/iceberg/expression/expression_visitor.h +++ b/src/iceberg/expression/expression_visitor.h @@ -260,10 +260,8 @@ class ICEBERG_EXPORT BoundVisitor : public ExpressionVisitor { /// \brief Visit an unbound predicate. /// - /// Bound visitors do not support unbound predicates. - /// /// \param pred The unbound predicate - Result Predicate(const std::shared_ptr& pred) final { + Result Predicate(const std::shared_ptr& pred) override { ICEBERG_DCHECK(pred != nullptr, "UnboundPredicate cannot be null"); return NotSupported("Not a bound predicate: {}", pred->ToString()); } diff --git a/src/iceberg/expression/meson.build b/src/iceberg/expression/meson.build index 8e312791b..f3b748482 100644 --- a/src/iceberg/expression/meson.build +++ b/src/iceberg/expression/meson.build @@ -26,6 +26,7 @@ install_headers( 'inclusive_metrics_evaluator.h', 'literal.h', 'predicate.h', + 'residual_evaluator.h', 'rewrite_not.h', 'strict_metrics_evaluator.h', 'term.h', diff --git a/src/iceberg/expression/residual_evaluator.cc b/src/iceberg/expression/residual_evaluator.cc new file mode 100644 index 000000000..e818199ed --- /dev/null +++ b/src/iceberg/expression/residual_evaluator.cc @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/residual_evaluator.h" + +#include "iceberg/expression/expression.h" +#include "iceberg/expression/expression_visitor.h" +#include "iceberg/expression/predicate.h" +#include "iceberg/partition_spec.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/schema.h" +#include "iceberg/schema_internal.h" +#include "iceberg/transform.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +std::shared_ptr always_true() { return True::Instance(); } +std::shared_ptr always_false() { return False::Instance(); } + +class ResidualVisitor : public BoundVisitor> { + public: + static Result Make(const PartitionSpec& spec, const Schema& schema, + const StructLike& partition_data, + bool case_sensitive) { + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec.PartitionType(schema)); + auto partition_schema = FromStructType(std::move(*partition_type), std::nullopt); + return ResidualVisitor(spec, schema, std::move(partition_schema), partition_data, + case_sensitive); + } + + Result> AlwaysTrue() override { return always_true(); } + + Result> AlwaysFalse() override { return always_false(); } + + Result> Not( + const std::shared_ptr& child_result) override { + return Not::MakeFolded(child_result); + } + + Result> And( + const std::shared_ptr& left_result, + const std::shared_ptr& right_result) override { + return And::MakeFolded(left_result, 
right_result); + } + + Result> Or( + const std::shared_ptr& left_result, + const std::shared_ptr& right_result) override { + return Or::MakeFolded(left_result, right_result); + } + + Result> IsNull( + const std::shared_ptr& expr) override { + return expr->Evaluate(partition_data_).transform([](const auto& value) { + return value.IsNull() ? always_true() : always_false(); + }); + } + + Result> NotNull( + const std::shared_ptr& expr) override { + return expr->Evaluate(partition_data_).transform([](const auto& value) { + return value.IsNull() ? always_false() : always_true(); + }); + } + + Result> IsNaN(const std::shared_ptr& expr) override { + return expr->Evaluate(partition_data_).transform([](const auto& value) { + return value.IsNaN() ? always_true() : always_false(); + }); + } + + Result> NotNaN( + const std::shared_ptr& expr) override { + return expr->Evaluate(partition_data_).transform([](const auto& value) { + return value.IsNaN() ? always_false() : always_true(); + }); + } + + Result> Lt(const std::shared_ptr& expr, + const Literal& lit) override { + return expr->Evaluate(partition_data_).transform([&lit](const auto& value) { + return value < lit ? always_true() : always_false(); + }); + } + + Result> LtEq(const std::shared_ptr& expr, + const Literal& lit) override { + return expr->Evaluate(partition_data_).transform([&lit](const auto& value) { + return value <= lit ? always_true() : always_false(); + }); + } + + Result> Gt(const std::shared_ptr& expr, + const Literal& lit) override { + return expr->Evaluate(partition_data_).transform([&lit](const auto& value) { + return value > lit ? always_true() : always_false(); + }); + } + + Result> GtEq(const std::shared_ptr& expr, + const Literal& lit) override { + return expr->Evaluate(partition_data_).transform([&lit](const auto& value) { + return value >= lit ? 
always_true() : always_false(); + }); + } + + Result> Eq(const std::shared_ptr& expr, + const Literal& lit) override { + return expr->Evaluate(partition_data_).transform([&lit](const auto& value) { + return value == lit ? always_true() : always_false(); + }); + } + + Result> NotEq(const std::shared_ptr& expr, + const Literal& lit) override { + return expr->Evaluate(partition_data_).transform([&lit](const auto& value) { + return value != lit ? always_true() : always_false(); + }); + } + + Result> StartsWith(const std::shared_ptr& expr, + const Literal& lit) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, expr->Evaluate(partition_data_)); + + if (!std::holds_alternative(value.value()) || + !std::holds_alternative(lit.value())) { + return InvalidExpression("Both value and literal should be strings"); + } + + const auto& str_value = std::get(value.value()); + const auto& str_prefix = std::get(lit.value()); + return str_value.starts_with(str_prefix) ? always_true() : always_false(); + } + + Result> NotStartsWith(const std::shared_ptr& expr, + const Literal& lit) override { + ICEBERG_ASSIGN_OR_RAISE(auto value, expr->Evaluate(partition_data_)); + + if (!std::holds_alternative(value.value()) || + !std::holds_alternative(lit.value())) { + return InvalidExpression("Both value and literal should be strings"); + } + + const auto& str_value = std::get(value.value()); + const auto& str_prefix = std::get(lit.value()); + return str_value.starts_with(str_prefix) ? always_false() : always_true(); + } + + Result> In( + const std::shared_ptr& expr, + const BoundSetPredicate::LiteralSet& literal_set) override { + return expr->Evaluate(partition_data_).transform([&literal_set](const auto& value) { + return literal_set.contains(value) ? 
always_true() : always_false(); + }); + } + + Result> NotIn( + const std::shared_ptr& expr, + const BoundSetPredicate::LiteralSet& literal_set) override { + return expr->Evaluate(partition_data_).transform([&literal_set](const auto& value) { + return literal_set.contains(value) ? always_false() : always_true(); + }); + } + + Result> Predicate( + const std::shared_ptr& pred) override; + + Result> Predicate( + const std::shared_ptr& pred) override { + ICEBERG_ASSIGN_OR_RAISE(auto bound, pred->Bind(schema_, case_sensitive_)); + if (bound->is_bound_predicate()) { + ICEBERG_ASSIGN_OR_RAISE( + auto residual, Predicate(std::dynamic_pointer_cast(bound))); + if (residual->is_bound_predicate()) { + // replace inclusive original unbound predicate + return pred; + } + return residual; + } + // if binding didn't result in a Predicate, return the expression + return bound; + } + + private: + ResidualVisitor(const PartitionSpec& spec, const Schema& schema, + std::unique_ptr partition_schema, + const StructLike& partition_data, bool case_sensitive) + : spec_(spec), + schema_(schema), + partition_schema_(std::move(partition_schema)), + partition_data_(partition_data), + case_sensitive_(case_sensitive) {} + + const PartitionSpec& spec_; + const Schema& schema_; + std::unique_ptr partition_schema_; + const StructLike& partition_data_; + bool case_sensitive_; +}; + +Result> ResidualVisitor::Predicate( + const std::shared_ptr& pred) { + // Get the strict projection and inclusive projection of this predicate in partition + // data, then use them to determine whether to return the original predicate. The + // strict projection returns true iff the original predicate would have returned true, + // so the predicate can be eliminated if the strict projection evaluates to true. + // Similarly the inclusive projection returns false iff the original predicate would + // have returned false, so the predicate can also be eliminated if the inclusive + // projection evaluates to false. 
+ + // If there is no strict projection or if it evaluates to false, then return the + // predicate. + ICEBERG_ASSIGN_OR_RAISE( + auto parts, spec_.GetFieldsBySourceId(pred->reference()->field().field_id())); + if (parts.empty()) { + // Not associated with a partition field, can't be evaluated + return pred; + } + + for (const auto& part : parts) { + // Check the strict projection + ICEBERG_ASSIGN_OR_RAISE(auto strict_projection, part.get().transform()->ProjectStrict( + part.get().name(), pred)); + std::shared_ptr strict_result = nullptr; + + if (strict_projection != nullptr) { + ICEBERG_ASSIGN_OR_RAISE( + auto bound_strict, + strict_projection->Bind(*partition_schema_, case_sensitive_)); + if (bound_strict->is_bound_predicate()) { + ICEBERG_ASSIGN_OR_RAISE( + strict_result, BoundVisitor::Predicate( + std::dynamic_pointer_cast(bound_strict))); + } else { + // If the result is not a predicate, then it must be a constant like alwaysTrue + // or alwaysFalse + strict_result = std::move(bound_strict); + } + } + + if (strict_result != nullptr && strict_result->op() == Expression::Operation::kTrue) { + // If strict is true, returning true + return always_true(); + } + + // Check the inclusive projection + ICEBERG_ASSIGN_OR_RAISE(auto inclusive_projection, + part.get().transform()->Project(part.get().name(), pred)); + std::shared_ptr inclusive_result = nullptr; + + if (inclusive_projection != nullptr) { + ICEBERG_ASSIGN_OR_RAISE( + auto bound_inclusive, + inclusive_projection->Bind(*partition_schema_, case_sensitive_)); + + if (bound_inclusive->is_bound_predicate()) { + ICEBERG_ASSIGN_OR_RAISE( + inclusive_result, + BoundVisitor::Predicate( + std::dynamic_pointer_cast(bound_inclusive))); + } else { + // If the result is not a predicate, then it must be a constant like alwaysTrue + // or alwaysFalse + inclusive_result = std::move(bound_inclusive); + } + } + + if (inclusive_result != nullptr && + inclusive_result->op() == Expression::Operation::kFalse) { + // If inclusive is 
false, returning false + return always_false(); + } + } + + // Neither strict nor inclusive predicate was conclusive, returning the original pred + return pred; +} + +// Unpartitioned residual evaluator that always returns the original expression +class UnpartitionedResidualEvaluator : public ResidualEvaluator { + public: + explicit UnpartitionedResidualEvaluator(std::shared_ptr expr) + : ResidualEvaluator(std::move(expr), *PartitionSpec::Unpartitioned(), + *kEmptySchema_, true) {} + + Result> ResidualFor( + const StructLike& /*partition_data*/) const override { + return expr_; + } + + private: + // Store an empty schema to avoid dangling reference when passing to base class + inline static const std::shared_ptr kEmptySchema_ = + std::make_shared(std::vector{}, std::nullopt); +}; + +} // namespace + +ResidualEvaluator::ResidualEvaluator(std::shared_ptr expr, + const PartitionSpec& spec, const Schema& schema, + bool case_sensitive) + : expr_(std::move(expr)), + spec_(spec), + schema_(schema), + case_sensitive_(case_sensitive) {} + +ResidualEvaluator::~ResidualEvaluator() = default; + +Result> ResidualEvaluator::Unpartitioned( + std::shared_ptr expr) { + return std::unique_ptr( + new UnpartitionedResidualEvaluator(std::move(expr))); +} + +Result> ResidualEvaluator::Make( + std::shared_ptr expr, const PartitionSpec& spec, const Schema& schema, + bool case_sensitive) { + if (spec.fields().empty()) { + return Unpartitioned(std::move(expr)); + } + return std::unique_ptr( + new ResidualEvaluator(std::move(expr), spec, schema, case_sensitive)); +} + +Result> ResidualEvaluator::ResidualFor( + const StructLike& partition_data) const { + ICEBERG_ASSIGN_OR_RAISE( + auto visitor, + ResidualVisitor::Make(spec_, schema_, partition_data, case_sensitive_)); + return Visit, ResidualVisitor>(expr_, visitor); +} + +} // namespace iceberg diff --git a/src/iceberg/expression/residual_evaluator.h b/src/iceberg/expression/residual_evaluator.h new file mode 100644 index 
000000000..60bf67f25 --- /dev/null +++ b/src/iceberg/expression/residual_evaluator.h @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/residual_evaluator.h +/// Residual evaluator for finding residual expressions after partition evaluation. + +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Finds the residuals for an Expression using the partitions in the given +/// PartitionSpec. +/// +/// A residual expression is made by partially evaluating an expression using partition +/// values. 
For example, if a table is partitioned by day(utc_timestamp) and is read +/// with a filter expression utc_timestamp >= a and utc_timestamp <= b, then there are +/// 4 possible residual expressions for the partition data, d: +/// +/// - If d > day(a) and d < day(b), the residual is always true +/// - If d == day(a) and d != day(b), the residual is utc_timestamp >= a +/// - If d == day(b) and d != day(a), the residual is utc_timestamp <= b +/// - If d == day(a) == day(b), the residual is utc_timestamp >= a and utc_timestamp <= b +/// +/// Partition data is passed using StructLike. Residuals are returned by ResidualFor(). +class ICEBERG_EXPORT ResidualEvaluator { + public: + /// \brief Return a residual evaluator for an unpartitioned PartitionSpec. + /// + /// \param expr An expression + /// \return A residual evaluator that always returns the expression + static Result> Unpartitioned( + std::shared_ptr expr); + + /// \brief Return a residual evaluator for a PartitionSpec and Expression. + /// + /// \param expr An expression + /// \param spec A partition spec (held by reference; must outlive the evaluator) + /// \param schema Schema to bind expressions against (must outlive the evaluator) + /// \param case_sensitive Whether field name matching is case-sensitive + /// \return A residual evaluator for the expression + static Result> Make(std::shared_ptr expr, + const PartitionSpec& spec, + const Schema& schema, + bool case_sensitive = true); + + virtual ~ResidualEvaluator(); + + /// \brief Returns a residual expression for the given partition values. 
+ /// + /// \param partition_data Partition data values + /// \return The residual of this evaluator's expression from the partition values + virtual Result> ResidualFor( + const StructLike& partition_data) const; + + protected: + ResidualEvaluator(std::shared_ptr expr, const PartitionSpec& spec, + const Schema& schema, bool case_sensitive); + + std::shared_ptr expr_; + + private: + const PartitionSpec& spec_; + const Schema& schema_; + bool case_sensitive_; +}; + +} // namespace iceberg diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index c139c66b5..c10c5a827 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -50,6 +50,7 @@ iceberg_sources = files( 'expression/inclusive_metrics_evaluator.cc', 'expression/literal.cc', 'expression/predicate.cc', + 'expression/residual_evaluator.cc', 'expression/rewrite_not.cc', 'expression/strict_metrics_evaluator.cc', 'expression/term.cc', diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc index 0c2dda124..b0f1144c1 100644 --- a/src/iceberg/partition_spec.cc +++ b/src/iceberg/partition_spec.cc @@ -60,7 +60,8 @@ int32_t PartitionSpec::spec_id() const { return spec_id_; } std::span PartitionSpec::fields() const { return fields_; } -Result> PartitionSpec::PartitionType(const Schema& schema) { +Result> PartitionSpec::PartitionType( + const Schema& schema) const { if (fields_.empty()) { return std::make_unique(std::vector{}); } @@ -154,6 +155,26 @@ Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields) return {}; } +Result>> +PartitionSpec::GetFieldsBySourceId(int32_t source_id) const { + ICEBERG_ASSIGN_OR_RAISE(auto source_id_to_fields, source_id_to_fields_.Get(*this)); + if (auto it = source_id_to_fields.get().find(source_id); + it != source_id_to_fields.get().cend()) { + return it->second; + } + // Note that it is not an error to not find any partition fields for a source id. 
+ return std::vector{}; +} + +Result PartitionSpec::InitSourceIdToFieldsMap( + const PartitionSpec& self) { + SourceIdToFieldsMap source_id_to_fields; + for (const auto& field : self.fields_) { + source_id_to_fields[field.source_id()].emplace_back(std::cref(field)); + } + return source_id_to_fields; +} + Result> PartitionSpec::Make( const Schema& schema, int32_t spec_id, std::vector fields, bool allow_missing_fields, std::optional last_assigned_field_id) { diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h index 12beb9c97..7f8f67822 100644 --- a/src/iceberg/partition_spec.h +++ b/src/iceberg/partition_spec.h @@ -27,6 +27,7 @@ #include #include #include +#include #include #include "iceberg/iceberg_export.h" @@ -34,6 +35,7 @@ #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/util/formattable.h" +#include "iceberg/util/lazy.h" namespace iceberg { @@ -60,7 +62,7 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable { std::span fields() const; /// \brief Get the partition type binding to the input schema. - Result> PartitionType(const Schema&); + Result> PartitionType(const Schema& schema) const; std::string ToString() const override; @@ -77,6 +79,13 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable { /// \return Error status if the partition spec is invalid. Status Validate(const Schema& schema, bool allow_missing_fields) const; + /// \brief Get the partition fields by source ID. + /// \param source_id The id of the source field. + /// \return The partition fields whose source is the given field, or an empty vector + /// if no partition field uses that source ID. + using PartitionFieldRef = std::reference_wrapper; + Result> GetFieldsBySourceId(int32_t source_id) const; + /// \brief Create a PartitionSpec binding to a schema. /// \param schema The schema to bind the partition spec to. /// \param spec_id The spec ID. 
@@ -116,9 +125,13 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable { /// \brief Compare two partition specs for equality. bool Equals(const PartitionSpec& other) const; + using SourceIdToFieldsMap = std::unordered_map>; + static Result InitSourceIdToFieldsMap(const PartitionSpec&); + const int32_t spec_id_; std::vector fields_; int32_t last_assigned_field_id_; + Lazy source_id_to_fields_; }; } // namespace iceberg diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 9892e3d4f..f9cfb8485 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -88,6 +88,7 @@ add_iceberg_test(expression_test inclusive_metrics_evaluator_test.cc inclusive_metrics_evaluator_with_transform_test.cc predicate_test.cc + residual_evaluator_test.cc strict_metrics_evaluator_test.cc) add_iceberg_test(json_serde_test diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index c73abe188..0f8b9291b 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -65,6 +65,7 @@ iceberg_tests = { 'inclusive_metrics_evaluator_with_transform_test.cc', 'literal_test.cc', 'predicate_test.cc', + 'residual_evaluator_test.cc', 'strict_metrics_evaluator_test.cc', ), }, diff --git a/src/iceberg/test/residual_evaluator_test.cc b/src/iceberg/test/residual_evaluator_test.cc new file mode 100644 index 000000000..bef17d2bc --- /dev/null +++ b/src/iceberg/test/residual_evaluator_test.cc @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/residual_evaluator.h" + +#include +#include +#include + +#include + +#include "iceberg/expression/expressions.h" +#include "iceberg/expression/literal.h" +#include "iceberg/expression/predicate.h" +#include "iceberg/partition_field.h" +#include "iceberg/partition_spec.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" +#include "iceberg/test/matchers.h" +#include "iceberg/transform.h" +#include "iceberg/type.h" + +namespace iceberg { + +class ResidualEvaluatorTest : public ::testing::Test { + protected: + void SetUp() override {} + + // Helper function to assert residual operation + void AssertResidualOp(const std::shared_ptr& spec, + const std::shared_ptr& schema, + const std::shared_ptr& pred, + const Literal& partition_value, + Expression::Operation expected_op) { + ICEBERG_UNWRAP_OR_FAIL(auto evaluator, + ResidualEvaluator::Make(pred, *spec, *schema, true)); + PartitionValues partition_data(partition_value); + ICEBERG_UNWRAP_OR_FAIL(auto residual, evaluator->ResidualFor(partition_data)); + EXPECT_EQ(residual->op(), expected_op); + } + + // Helper function to assert residual is the same as original predicate + void AssertResidualPredicate(const std::shared_ptr& spec, + const std::shared_ptr& schema, + const std::shared_ptr& pred, + const Literal& partition_value) { + ICEBERG_UNWRAP_OR_FAIL(auto evaluator, + ResidualEvaluator::Make(pred, *spec, *schema, true)); + PartitionValues partition_data(partition_value); + ICEBERG_UNWRAP_OR_FAIL(auto residual, 
evaluator->ResidualFor(partition_data)); + ASSERT_TRUE(residual->is_unbound_predicate()); + auto unbound_residual = std::dynamic_pointer_cast(residual); + ASSERT_NE(unbound_residual, nullptr); + auto unbound_original = std::dynamic_pointer_cast(pred); + ASSERT_NE(unbound_original, nullptr); + EXPECT_EQ(unbound_residual->op(), unbound_original->op()); + EXPECT_EQ(unbound_residual->reference()->name(), + unbound_original->reference()->name()); + // Check literal value + auto residual_impl = + std::dynamic_pointer_cast>(unbound_residual); + auto original_impl = + std::dynamic_pointer_cast>(unbound_original); + ASSERT_NE(residual_impl, nullptr); + ASSERT_NE(original_impl, nullptr); + ASSERT_EQ(residual_impl->literals().size(), original_impl->literals().size()); + if (!residual_impl->literals().empty()) { + EXPECT_EQ(residual_impl->literals()[0].value(), + original_impl->literals()[0].value()); + } + } +}; + +TEST_F(ResidualEvaluatorTest, IdentityTransformResiduals) { + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(50, "dateint", int32()), + SchemaField::MakeOptional(51, "hour", int32())}, + std::nullopt); + + auto identity_transform = Transform::Identity(); + PartitionField pt_field(50, 1000, "dateint", identity_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec_unique, + PartitionSpec::Make(*schema, 0, {pt_field}, false)); + auto spec = std::shared_ptr(spec_unique.release()); + + // Create expression: (dateint < 20170815 AND dateint > 20170801) OR + // (dateint == 20170815 AND hour < 12) OR + // (dateint == 20170801 AND hour > 11) + auto expr = Expressions::Or( + Expressions::Or( + Expressions::And(Expressions::LessThan("dateint", Literal::Int(20170815)), + Expressions::GreaterThan("dateint", Literal::Int(20170801))), + Expressions::And(Expressions::Equal("dateint", Literal::Int(20170815)), + Expressions::LessThan("hour", Literal::Int(12)))), + Expressions::And(Expressions::Equal("dateint", Literal::Int(20170801)), + 
Expressions::GreaterThan("hour", Literal::Int(11)))); + + ICEBERG_UNWRAP_OR_FAIL(auto evaluator, + ResidualEvaluator::Make(expr, *spec, *schema, true)); + + // Equal to the upper date bound + PartitionValues partition_data1(Literal::Int(20170815)); + ICEBERG_UNWRAP_OR_FAIL(auto residual1, evaluator->ResidualFor(partition_data1)); + ASSERT_TRUE(residual1->is_unbound_predicate()); + auto unbound1 = std::dynamic_pointer_cast(residual1); + ASSERT_NE(unbound1, nullptr); + EXPECT_EQ(unbound1->op(), Expression::Operation::kLt); + EXPECT_EQ(unbound1->reference()->name(), "hour"); + // Access literal through literals() span + auto unbound1_impl = + std::dynamic_pointer_cast>(unbound1); + ASSERT_NE(unbound1_impl, nullptr); + ASSERT_EQ(unbound1_impl->literals().size(), 1); + EXPECT_EQ(unbound1_impl->literals()[0].value(), Literal::Int(12).value()); + + // Equal to the lower date bound + PartitionValues partition_data2(Literal::Int(20170801)); + ICEBERG_UNWRAP_OR_FAIL(auto residual2, evaluator->ResidualFor(partition_data2)); + ASSERT_TRUE(residual2->is_unbound_predicate()); + auto unbound2 = std::dynamic_pointer_cast(residual2); + ASSERT_NE(unbound2, nullptr); + EXPECT_EQ(unbound2->op(), Expression::Operation::kGt); + EXPECT_EQ(unbound2->reference()->name(), "hour"); + // Access literal through literals() span + auto unbound2_impl = + std::dynamic_pointer_cast>(unbound2); + ASSERT_NE(unbound2_impl, nullptr); + ASSERT_EQ(unbound2_impl->literals().size(), 1); + EXPECT_EQ(unbound2_impl->literals()[0].value(), Literal::Int(11).value()); + + // Inside the date range + PartitionValues partition_data3(Literal::Int(20170812)); + ICEBERG_UNWRAP_OR_FAIL(auto residual3, evaluator->ResidualFor(partition_data3)); + EXPECT_EQ(residual3->op(), Expression::Operation::kTrue); + + // Outside the date range + PartitionValues partition_data4(Literal::Int(20170817)); + ICEBERG_UNWRAP_OR_FAIL(auto residual4, evaluator->ResidualFor(partition_data4)); + EXPECT_EQ(residual4->op(), 
Expression::Operation::kFalse); +} + +TEST_F(ResidualEvaluatorTest, CaseInsensitiveIdentityTransformResiduals) { + auto schema = std::make_shared( + std::vector{SchemaField::MakeOptional(50, "dateint", int32()), + SchemaField::MakeOptional(51, "hour", int32())}, + std::nullopt); + + auto identity_transform = Transform::Identity(); + PartitionField pt_field(50, 1000, "dateint", identity_transform); + ICEBERG_UNWRAP_OR_FAIL(auto spec_unique, + PartitionSpec::Make(*schema, 0, {pt_field}, false)); + auto spec = std::shared_ptr(spec_unique.release()); + + // Create expression with mixed case field names + auto expr = Expressions::Or( + Expressions::Or( + Expressions::And(Expressions::LessThan("DATEINT", Literal::Int(20170815)), + Expressions::GreaterThan("dateint", Literal::Int(20170801))), + Expressions::And(Expressions::Equal("dateint", Literal::Int(20170815)), + Expressions::LessThan("HOUR", Literal::Int(12)))), + Expressions::And(Expressions::Equal("DateInt", Literal::Int(20170801)), + Expressions::GreaterThan("hOUr", Literal::Int(11)))); + + ICEBERG_UNWRAP_OR_FAIL(auto evaluator, + ResidualEvaluator::Make(expr, *spec, *schema, false)); + + // Equal to the upper date bound + PartitionValues partition_data1(Literal::Int(20170815)); + ICEBERG_UNWRAP_OR_FAIL(auto residual1, evaluator->ResidualFor(partition_data1)); + ASSERT_TRUE(residual1->is_unbound_predicate()); + auto unbound1 = std::dynamic_pointer_cast(residual1); + ASSERT_NE(unbound1, nullptr); + EXPECT_EQ(unbound1->op(), Expression::Operation::kLt); + EXPECT_EQ(unbound1->reference()->name(), "HOUR"); + // Access literal through literals() span + auto unbound1_impl = + std::dynamic_pointer_cast>(unbound1); + ASSERT_NE(unbound1_impl, nullptr); + ASSERT_EQ(unbound1_impl->literals().size(), 1); + EXPECT_EQ(unbound1_impl->literals()[0].value(), Literal::Int(12).value()); + + // Equal to the lower date bound + PartitionValues partition_data2(Literal::Int(20170801)); + ICEBERG_UNWRAP_OR_FAIL(auto residual2, 
evaluator->ResidualFor(partition_data2));
+  ASSERT_TRUE(residual2->is_unbound_predicate());
+  auto unbound2 = std::dynamic_pointer_cast(residual2);
+  ASSERT_NE(unbound2, nullptr);
+  EXPECT_EQ(unbound2->op(), Expression::Operation::kGt);
+  EXPECT_EQ(unbound2->reference()->name(), "hOUr");
+  // Access literal through literals() span
+  auto unbound2_impl =
+      std::dynamic_pointer_cast>(unbound2);
+  ASSERT_NE(unbound2_impl, nullptr);
+  ASSERT_EQ(unbound2_impl->literals().size(), 1);
+  EXPECT_EQ(unbound2_impl->literals()[0].value(), Literal::Int(11).value());
+
+  // Inside the date range
+  PartitionValues partition_data3(Literal::Int(20170812));
+  ICEBERG_UNWRAP_OR_FAIL(auto residual3, evaluator->ResidualFor(partition_data3));
+  EXPECT_EQ(residual3->op(), Expression::Operation::kTrue);
+
+  // Outside the date range
+  PartitionValues partition_data4(Literal::Int(20170817));
+  ICEBERG_UNWRAP_OR_FAIL(auto residual4, evaluator->ResidualFor(partition_data4));
+  EXPECT_EQ(residual4->op(), Expression::Operation::kFalse);
+}
+
+// Checks ResidualEvaluator::Unpartitioned over a mix of expressions: the two constants,
+// comparisons, null/NaN checks, IN / NOT IN, and the prefix predicates. For each one, the
+// residual computed for a default-constructed PartitionValues must report the same
+// operation as the input expression.
+TEST_F(ResidualEvaluatorTest, UnpartitionedResiduals) {
+  std::vector> expressions = {
+      Expressions::AlwaysTrue(),
+      Expressions::AlwaysFalse(),
+      Expressions::LessThan("a", Literal::Int(5)),
+      Expressions::GreaterThanOrEqual("b", Literal::Int(16)),
+      Expressions::NotNull("c"),
+      Expressions::IsNull("d"),
+      Expressions::In("e", {Literal::Int(1), Literal::Int(2), Literal::Int(3)}),
+      Expressions::NotIn("f", {Literal::Int(1), Literal::Int(2), Literal::Int(3)}),
+      Expressions::NotNaN("g"),
+      Expressions::IsNaN("h"),
+      Expressions::StartsWith("data", "abcd"),
+      Expressions::NotStartsWith("data", "abcd")};
+
+  PartitionValues empty_partition;
+
+  for (const auto& expr : expressions) {
+    ICEBERG_UNWRAP_OR_FAIL(auto evaluator, ResidualEvaluator::Unpartitioned(expr));
+    ICEBERG_UNWRAP_OR_FAIL(auto residual, evaluator->ResidualFor(empty_partition));
+    // For unpartitioned tables the residual should be the original expression. Only the
+    // operation is compared here, not the term or the literals.
+    EXPECT_EQ(residual->op(), expr->op());
+  }
+}
+
+// IN on an identity-partitioned column. The schema has optional int32 fields "dateint"
+// and "hour"; the spec has a single "dateint" partition field using the identity
+// transform. A partition value that appears in the IN list must give a kTrue residual,
+// and one that does not must give kFalse.
+TEST_F(ResidualEvaluatorTest, In) {
+  auto schema = std::make_shared(
+      std::vector{SchemaField::MakeOptional(50, "dateint", int32()),
+                  SchemaField::MakeOptional(51, "hour", int32())},
+      std::nullopt);
+
+  auto identity_transform = Transform::Identity();
+  PartitionField pt_field(50, 1000, "dateint", identity_transform);
+  ICEBERG_UNWRAP_OR_FAIL(auto spec_unique,
+                         PartitionSpec::Make(*schema, 0, {pt_field}, false));
+  auto spec = std::shared_ptr(spec_unique.release());
+
+  auto expr = Expressions::In("dateint", {Literal::Int(20170815), Literal::Int(20170816),
+                                          Literal::Int(20170817)});
+
+  // NOTE(review): the trailing `true` is presumably the case_sensitive flag -- confirm
+  // against ResidualEvaluator::Make.
+  ICEBERG_UNWRAP_OR_FAIL(auto evaluator,
+                         ResidualEvaluator::Make(expr, *spec, *schema, true));
+
+  // 20170815 is one of the IN literals.
+  PartitionValues partition_data1(Literal::Int(20170815));
+  ICEBERG_UNWRAP_OR_FAIL(auto residual1, evaluator->ResidualFor(partition_data1));
+  EXPECT_EQ(residual1->op(), Expression::Operation::kTrue);
+
+  // 20180815 is not in the list.
+  PartitionValues partition_data2(Literal::Int(20180815));
+  ICEBERG_UNWRAP_OR_FAIL(auto residual2, evaluator->ResidualFor(partition_data2));
+  EXPECT_EQ(residual2->op(), Expression::Operation::kFalse);
+}
+
+// NOT IN on an identity-partitioned column; mirrors the In test with the expectations
+// swapped. A partition value outside the list must give a kTrue residual, and a value
+// inside the list must give kFalse.
+TEST_F(ResidualEvaluatorTest, NotIn) {
+  auto schema = std::make_shared(
+      std::vector{SchemaField::MakeOptional(50, "dateint", int32()),
+                  SchemaField::MakeOptional(51, "hour", int32())},
+      std::nullopt);
+
+  auto identity_transform = Transform::Identity();
+  PartitionField pt_field(50, 1000, "dateint", identity_transform);
+  ICEBERG_UNWRAP_OR_FAIL(auto spec_unique,
+                         PartitionSpec::Make(*schema, 0, {pt_field}, false));
+  auto spec = std::shared_ptr(spec_unique.release());
+
+  auto expr = Expressions::NotIn(
+      "dateint",
+      {Literal::Int(20170815), Literal::Int(20170816), Literal::Int(20170817)});
+
+  ICEBERG_UNWRAP_OR_FAIL(auto evaluator,
+                         ResidualEvaluator::Make(expr, *spec, *schema, true));
+
+  // 20180815 is not among the NOT IN literals.
+  PartitionValues partition_data1(Literal::Int(20180815));
+  ICEBERG_UNWRAP_OR_FAIL(auto residual1, evaluator->ResidualFor(partition_data1));
+  EXPECT_EQ(residual1->op(), Expression::Operation::kTrue);
+
+  // 20170815 is one of the NOT IN literals.
+  PartitionValues partition_data2(Literal::Int(20170815));
+  ICEBERG_UNWRAP_OR_FAIL(auto residual2, evaluator->ResidualFor(partition_data2));
+  EXPECT_EQ(residual2->op(), Expression::Operation::kFalse);
+}
+
+// IS_NAN on identity-partitioned floating-point columns, exercised once for the float64
+// field "double" and once for the float32 field "float" (each with its own one-field
+// spec). A NaN partition value must give a kTrue residual; an ordinary number (2.0 or
+// 3.0f) must give kFalse.
+TEST_F(ResidualEvaluatorTest, IsNaN) {
+  auto schema = std::make_shared(
+      std::vector{SchemaField::MakeOptional(50, "double", float64()),
+                  SchemaField::MakeOptional(51, "float", float32())},
+      std::nullopt);
+
+  // Test double field
+  auto identity_transform = Transform::Identity();
+  PartitionField pt_field_double(50, 1000, "double", identity_transform);
+  ICEBERG_UNWRAP_OR_FAIL(auto spec_double_unique,
+                         PartitionSpec::Make(*schema, 0, {pt_field_double}, false));
+  auto spec_double = std::shared_ptr(spec_double_unique.release());
+
+  auto expr_double = Expressions::IsNaN("double");
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto evaluator_double,
+      ResidualEvaluator::Make(expr_double, *spec_double, *schema, true));
+
+  PartitionValues partition_data_nan_double(Literal::Double(std::nan("")));
+  ICEBERG_UNWRAP_OR_FAIL(auto residual_nan_double,
+                         evaluator_double->ResidualFor(partition_data_nan_double));
+  EXPECT_EQ(residual_nan_double->op(), Expression::Operation::kTrue);
+
+  PartitionValues partition_data_double(Literal::Double(2.0));
+  ICEBERG_UNWRAP_OR_FAIL(auto residual_double,
+                         evaluator_double->ResidualFor(partition_data_double));
+  EXPECT_EQ(residual_double->op(), Expression::Operation::kFalse);
+
+  // Test float field
+  PartitionField pt_field_float(51, 1001, "float", identity_transform);
+  ICEBERG_UNWRAP_OR_FAIL(auto spec_float_unique,
+                         PartitionSpec::Make(*schema, 0, {pt_field_float}, false));
+  auto spec_float = std::shared_ptr(spec_float_unique.release());
+
+  auto expr_float = Expressions::IsNaN("float");
+  ICEBERG_UNWRAP_OR_FAIL(auto evaluator_float,
+                         ResidualEvaluator::Make(expr_float, *spec_float, *schema, true));
+
+  PartitionValues partition_data_nan_float(Literal::Float(std::nanf("")));
+  ICEBERG_UNWRAP_OR_FAIL(auto residual_nan_float,
+                         evaluator_float->ResidualFor(partition_data_nan_float));
+  EXPECT_EQ(residual_nan_float->op(), Expression::Operation::kTrue);
+
+  PartitionValues partition_data_float(Literal::Float(3.0f));
+  ICEBERG_UNWRAP_OR_FAIL(auto residual_float,
+                         evaluator_float->ResidualFor(partition_data_float));
+  EXPECT_EQ(residual_float->op(), Expression::Operation::kFalse);
+}
+
+// NOT_NAN on identity-partitioned floating-point columns; mirrors the IsNaN test with the
+// expectations swapped. A NaN partition value must give a kFalse residual; an ordinary
+// number (2.0 or 3.0f) must give kTrue.
+TEST_F(ResidualEvaluatorTest, NotNaN) {
+  auto schema = std::make_shared(
+      std::vector{SchemaField::MakeOptional(50, "double", float64()),
+                  SchemaField::MakeOptional(51, "float", float32())},
+      std::nullopt);
+
+  // Test double field
+  auto identity_transform = Transform::Identity();
+  PartitionField pt_field_double(50, 1000, "double", identity_transform);
+  ICEBERG_UNWRAP_OR_FAIL(auto spec_double_unique,
+                         PartitionSpec::Make(*schema, 0, {pt_field_double}, false));
+  auto spec_double = std::shared_ptr(spec_double_unique.release());
+
+  auto expr_double = Expressions::NotNaN("double");
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto evaluator_double,
+      ResidualEvaluator::Make(expr_double, *spec_double, *schema, true));
+
+  PartitionValues partition_data_nan_double(Literal::Double(std::nan("")));
+  ICEBERG_UNWRAP_OR_FAIL(auto residual_nan_double,
+                         evaluator_double->ResidualFor(partition_data_nan_double));
+  EXPECT_EQ(residual_nan_double->op(), Expression::Operation::kFalse);
+
+  PartitionValues partition_data_double(Literal::Double(2.0));
+  ICEBERG_UNWRAP_OR_FAIL(auto residual_double,
+                         evaluator_double->ResidualFor(partition_data_double));
+  EXPECT_EQ(residual_double->op(), Expression::Operation::kTrue);
+
+  // Test float field
+  PartitionField pt_field_float(51, 1001, "float", identity_transform);
+  ICEBERG_UNWRAP_OR_FAIL(auto spec_float_unique,
+                         PartitionSpec::Make(*schema, 0, {pt_field_float}, false));
+  auto spec_float = std::shared_ptr(spec_float_unique.release());
+
+  auto expr_float = Expressions::NotNaN("float");
+  ICEBERG_UNWRAP_OR_FAIL(auto evaluator_float,
+                         ResidualEvaluator::Make(expr_float, *spec_float, *schema, true));
+
+  PartitionValues partition_data_nan_float(Literal::Float(std::nanf("")));
+  ICEBERG_UNWRAP_OR_FAIL(auto residual_nan_float,
+                         evaluator_float->ResidualFor(partition_data_nan_float));
+  EXPECT_EQ(residual_nan_float->op(), Expression::Operation::kFalse);
+
+  PartitionValues partition_data_float(Literal::Float(3.0f));
+  ICEBERG_UNWRAP_OR_FAIL(auto residual_float,
+                         evaluator_float->ResidualFor(partition_data_float));
+  EXPECT_EQ(residual_float->op(), Expression::Operation::kTrue);
+}
+
+// Residuals under truncate(10) on the int32 column "value". Each comparison operator is
+// tried with literal 100 (the "lower bound" groups) and literal 99 (the "upper bound"
+// groups) against the neighbouring partition values. AssertResidualOp is passed the
+// constant operation (kTrue / kFalse) expected for the residual.
+// NOTE(review): AssertResidualOp and AssertResidualPredicate are defined outside this
+// chunk. AssertResidualPredicate presumably checks that the residual is still the
+// original predicate, i.e. the partition value alone cannot decide it -- confirm against
+// the helper's definition.
+TEST_F(ResidualEvaluatorTest, IntegerTruncateTransformResiduals) {
+  auto schema = std::make_shared(
+      std::vector{SchemaField::MakeOptional(50, "value", int32())},
+      std::nullopt);
+
+  // Valid partitions would be 0, 10, 20...90, 100 etc., so 100 is the smallest value in
+  // partition 100 and 99 is the largest value in partition 90.
+  auto truncate_transform = Transform::Truncate(10);
+  PartitionField pt_field(50, 1000, "value", truncate_transform);
+  ICEBERG_UNWRAP_OR_FAIL(auto spec_unique,
+                         PartitionSpec::Make(*schema, 0, {pt_field}, false));
+  auto spec = std::shared_ptr(spec_unique.release());
+
+  // Less than lower bound
+  AssertResidualOp(spec, schema, Expressions::LessThan("value", Literal::Int(100)),
+                   Literal::Int(110), Expression::Operation::kFalse);
+  AssertResidualOp(spec, schema, Expressions::LessThan("value", Literal::Int(100)),
+                   Literal::Int(100), Expression::Operation::kFalse);
+  AssertResidualOp(spec, schema, Expressions::LessThan("value", Literal::Int(100)),
+                   Literal::Int(90), Expression::Operation::kTrue);
+
+  // Less than upper bound
+  AssertResidualOp(spec, schema, Expressions::LessThan("value", Literal::Int(99)),
+                   Literal::Int(100), Expression::Operation::kFalse);
+  AssertResidualPredicate(spec, schema, Expressions::LessThan("value", Literal::Int(99)),
+                          Literal::Int(90));
+  AssertResidualOp(spec, schema, Expressions::LessThan("value", Literal::Int(99)),
+                   Literal::Int(80), Expression::Operation::kTrue);
+
+  // Less than equals lower bound
+  AssertResidualOp(spec, schema, Expressions::LessThanOrEqual("value", Literal::Int(100)),
+                   Literal::Int(110), Expression::Operation::kFalse);
+  AssertResidualPredicate(spec, schema,
+                          Expressions::LessThanOrEqual("value", Literal::Int(100)),
+                          Literal::Int(100));
+  AssertResidualOp(spec, schema, Expressions::LessThanOrEqual("value", Literal::Int(100)),
+                   Literal::Int(90), Expression::Operation::kTrue);
+
+  // Less than equals upper bound
+  AssertResidualOp(spec, schema, Expressions::LessThanOrEqual("value", Literal::Int(99)),
+                   Literal::Int(100), Expression::Operation::kFalse);
+  AssertResidualOp(spec, schema, Expressions::LessThanOrEqual("value", Literal::Int(99)),
+                   Literal::Int(90), Expression::Operation::kTrue);
+  AssertResidualOp(spec, schema, Expressions::LessThanOrEqual("value", Literal::Int(99)),
+                   Literal::Int(80), Expression::Operation::kTrue);
+
+  // Greater than lower bound
+  AssertResidualOp(spec, schema, Expressions::GreaterThan("value", Literal::Int(100)),
+                   Literal::Int(110), Expression::Operation::kTrue);
+  AssertResidualPredicate(spec, schema,
+                          Expressions::GreaterThan("value", Literal::Int(100)),
+                          Literal::Int(100));
+  AssertResidualOp(spec, schema, Expressions::GreaterThan("value", Literal::Int(100)),
+                   Literal::Int(90), Expression::Operation::kFalse);
+
+  // Greater than upper bound
+  AssertResidualOp(spec, schema, Expressions::GreaterThan("value", Literal::Int(99)),
+                   Literal::Int(100), Expression::Operation::kTrue);
+  AssertResidualOp(spec, schema, Expressions::GreaterThan("value", Literal::Int(99)),
+                   Literal::Int(90), Expression::Operation::kFalse);
+  AssertResidualOp(spec, schema, Expressions::GreaterThan("value", Literal::Int(99)),
+                   Literal::Int(80), Expression::Operation::kFalse);
+
+  // Greater than equals lower bound
+  AssertResidualOp(spec, schema,
+                   Expressions::GreaterThanOrEqual("value", Literal::Int(100)),
+                   Literal::Int(110), Expression::Operation::kTrue);
+  AssertResidualOp(spec, schema,
+                   Expressions::GreaterThanOrEqual("value", Literal::Int(100)),
+                   Literal::Int(100), Expression::Operation::kTrue);
+  AssertResidualOp(spec, schema,
+                   Expressions::GreaterThanOrEqual("value", Literal::Int(100)),
+                   Literal::Int(90), Expression::Operation::kFalse);
+
+  // Greater than equals upper bound
+  AssertResidualOp(spec, schema,
+                   Expressions::GreaterThanOrEqual("value", Literal::Int(99)),
+                   Literal::Int(100), Expression::Operation::kTrue);
+  AssertResidualPredicate(spec, schema,
+                          Expressions::GreaterThanOrEqual("value", Literal::Int(99)),
+                          Literal::Int(90));
+  AssertResidualOp(spec, schema,
+                   Expressions::GreaterThanOrEqual("value", Literal::Int(99)),
+                   Literal::Int(80), Expression::Operation::kFalse);
+
+  // Equal lower bound
+  AssertResidualOp(spec, schema, Expressions::Equal("value", Literal::Int(100)),
+                   Literal::Int(110), Expression::Operation::kFalse);
+  AssertResidualPredicate(spec, schema, Expressions::Equal("value", Literal::Int(100)),
+                          Literal::Int(100));
+  AssertResidualOp(spec, schema, Expressions::Equal("value", Literal::Int(100)),
+                   Literal::Int(90), Expression::Operation::kFalse);
+
+  // Equal upper bound
+  AssertResidualOp(spec, schema, Expressions::Equal("value", Literal::Int(99)),
+                   Literal::Int(100), Expression::Operation::kFalse);
+  AssertResidualPredicate(spec, schema, Expressions::Equal("value", Literal::Int(99)),
+                          Literal::Int(90));
+  AssertResidualOp(spec, schema, Expressions::Equal("value", Literal::Int(99)),
+                   Literal::Int(80), Expression::Operation::kFalse);
+
+  // Not equal lower bound
+  AssertResidualOp(spec, schema, Expressions::NotEqual("value", Literal::Int(100)),
+                   Literal::Int(110), Expression::Operation::kTrue);
+  AssertResidualPredicate(spec, schema, Expressions::NotEqual("value", Literal::Int(100)),
+                          Literal::Int(100));
+  AssertResidualOp(spec, schema, Expressions::NotEqual("value", Literal::Int(100)),
+                   Literal::Int(90), Expression::Operation::kTrue);
+
+  // Not equal upper bound
+  AssertResidualOp(spec, schema, Expressions::NotEqual("value", Literal::Int(99)),
+                   Literal::Int(100), Expression::Operation::kTrue);
+  AssertResidualPredicate(spec, schema, Expressions::NotEqual("value", Literal::Int(99)),
+                          Literal::Int(90));
+  AssertResidualOp(spec, schema, Expressions::NotEqual("value", Literal::Int(99)),
+                   Literal::Int(80), Expression::Operation::kTrue);
+}
+
+// Residuals under truncate(2) on the string column "value". Every predicate uses the
+// literal "bcd" and is checked against the partition values "ab", "bc" and "cd"; the
+// starts-with groups additionally pass the longer values "bcd" / "bcdd".
+// NOTE(review): AssertResidualOp and AssertResidualPredicate are defined outside this
+// chunk; see their definitions for exactly what each one asserts.
+TEST_F(ResidualEvaluatorTest, StringTruncateTransformResiduals) {
+  auto schema = std::make_shared(
+      std::vector{SchemaField::MakeOptional(50, "value", string())},
+      std::nullopt);
+
+  // Valid partitions would be two-letter strings, e.g. "ab", "bc", etc.
+  auto truncate_transform = Transform::Truncate(2);
+  PartitionField pt_field(50, 1000, "value", truncate_transform);
+  ICEBERG_UNWRAP_OR_FAIL(auto spec_unique,
+                         PartitionSpec::Make(*schema, 0, {pt_field}, false));
+  auto spec = std::shared_ptr(spec_unique.release());
+
+  // Less than
+  AssertResidualOp(spec, schema, Expressions::LessThan("value", Literal::String("bcd")),
+                   Literal::String("ab"), Expression::Operation::kTrue);
+  AssertResidualPredicate(spec, schema,
+                          Expressions::LessThan("value", Literal::String("bcd")),
+                          Literal::String("bc"));
+  AssertResidualOp(spec, schema, Expressions::LessThan("value", Literal::String("bcd")),
+                   Literal::String("cd"), Expression::Operation::kFalse);
+
+  // Less than equals
+  AssertResidualOp(spec, schema,
+                   Expressions::LessThanOrEqual("value", Literal::String("bcd")),
+                   Literal::String("ab"), Expression::Operation::kTrue);
+  AssertResidualPredicate(spec, schema,
+                          Expressions::LessThanOrEqual("value", Literal::String("bcd")),
+                          Literal::String("bc"));
+  AssertResidualOp(spec, schema,
+                   Expressions::LessThanOrEqual("value", Literal::String("bcd")),
+                   Literal::String("cd"), Expression::Operation::kFalse);
+
+  // Greater than
+  AssertResidualOp(spec, schema,
+                   Expressions::GreaterThan("value", Literal::String("bcd")),
+                   Literal::String("ab"), Expression::Operation::kFalse);
+  AssertResidualPredicate(spec, schema,
+                          Expressions::GreaterThan("value", Literal::String("bcd")),
+                          Literal::String("bc"));
+  AssertResidualOp(spec, schema,
+                   Expressions::GreaterThan("value", Literal::String("bcd")),
+                   Literal::String("cd"), Expression::Operation::kTrue);
+
+  // Greater than equals
+  AssertResidualOp(spec, schema,
+                   Expressions::GreaterThanOrEqual("value", Literal::String("bcd")),
+                   Literal::String("ab"), Expression::Operation::kFalse);
+  AssertResidualPredicate(
+      spec, schema, Expressions::GreaterThanOrEqual("value", Literal::String("bcd")),
+      Literal::String("bc"));
+  AssertResidualOp(spec, schema,
+                   Expressions::GreaterThanOrEqual("value", Literal::String("bcd")),
+                   Literal::String("cd"), Expression::Operation::kTrue);
+
+  // Equal
+  AssertResidualOp(spec, schema, Expressions::Equal("value", Literal::String("bcd")),
+                   Literal::String("ab"), Expression::Operation::kFalse);
+  AssertResidualPredicate(spec, schema,
+                          Expressions::Equal("value", Literal::String("bcd")),
+                          Literal::String("bc"));
+  AssertResidualOp(spec, schema, Expressions::Equal("value", Literal::String("bcd")),
+                   Literal::String("cd"), Expression::Operation::kFalse);
+
+  // Not equal
+  AssertResidualOp(spec, schema, Expressions::NotEqual("value", Literal::String("bcd")),
+                   Literal::String("ab"), Expression::Operation::kTrue);
+  AssertResidualPredicate(spec, schema,
+                          Expressions::NotEqual("value", Literal::String("bcd")),
+                          Literal::String("bc"));
+  AssertResidualOp(spec, schema, Expressions::NotEqual("value", Literal::String("bcd")),
+                   Literal::String("cd"), Expression::Operation::kTrue);
+
+  // Starts with
+  AssertResidualOp(spec, schema, Expressions::StartsWith("value", "bcd"),
+                   Literal::String("ab"), Expression::Operation::kFalse);
+  AssertResidualPredicate(spec, schema, Expressions::StartsWith("value", "bcd"),
+                          Literal::String("bc"));
+  AssertResidualOp(spec, schema, Expressions::StartsWith("value", "bcd"),
+                   Literal::String("cd"), Expression::Operation::kFalse);
+  AssertResidualPredicate(spec, schema, Expressions::StartsWith("value", "bcd"),
+                          Literal::String("bcdd"));
+
+  // Not starts with
+  AssertResidualOp(spec, schema, Expressions::NotStartsWith("value", "bcd"),
+                   Literal::String("ab"), Expression::Operation::kTrue);
+  AssertResidualPredicate(spec, schema, Expressions::NotStartsWith("value", "bcd"),
+                          Literal::String("bc"));
+  AssertResidualOp(spec, schema, Expressions::NotStartsWith("value", "bcd"),
+                   Literal::String("cd"), Expression::Operation::kTrue);
+  AssertResidualPredicate(spec, schema, Expressions::NotStartsWith("value", "bcd"),
+                          Literal::String("bcd"));
+  AssertResidualPredicate(spec, schema, Expressions::NotStartsWith("value", "bcd"),
+                          Literal::String("bcdd"));
+}
+
+}  // namespace iceberg