Skip to content

Commit ed4672b

Browse files
authored
feat: add projection evaluators for expression (#399)
1 parent f93f54d commit ed4672b

File tree

9 files changed

+1407
-1
lines changed

9 files changed

+1407
-1
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ set(ICEBERG_SOURCES
2929
expression/literal.cc
3030
expression/manifest_evaluator.cc
3131
expression/predicate.cc
32+
expression/projections.cc
3233
expression/residual_evaluator.cc
3334
expression/rewrite_not.cc
3435
expression/strict_metrics_evaluator.cc

src/iceberg/expression/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ install_headers(
2727
'literal.h',
2828
'manifest_evaluator.h',
2929
'predicate.h',
30+
'projections.h',
3031
'residual_evaluator.h',
3132
'rewrite_not.h',
3233
'strict_metrics_evaluator.h',
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/expression/projections.h"
21+
22+
#include <memory>
23+
24+
#include "iceberg/expression/expression.h"
25+
#include "iceberg/expression/expression_visitor.h"
26+
#include "iceberg/expression/predicate.h"
27+
#include "iceberg/expression/rewrite_not.h"
28+
#include "iceberg/partition_spec.h"
29+
#include "iceberg/result.h"
30+
#include "iceberg/transform.h"
31+
#include "iceberg/util/macros.h"
32+
33+
namespace iceberg {
34+
35+
/// \brief Base visitor shared by the inclusive and strict projections.
///
/// Handles the constant and logical nodes of an expression tree and binds unbound
/// predicates against the schema. Projecting a bound predicate is left to subclasses;
/// the implementation in this class rejects it with an error.
class ProjectionVisitor : public ExpressionVisitor<std::shared_ptr<Expression>> {
 public:
  /// \param spec partition spec to project onto (stored by reference, not copied)
  /// \param schema schema used to bind unbound predicates (stored by reference)
  /// \param case_sensitive passed through to UnboundPredicate::Bind
  ProjectionVisitor(const PartitionSpec& spec, const Schema& schema, bool case_sensitive)
      : spec_(spec), schema_(schema), case_sensitive_(case_sensitive) {}

  ~ProjectionVisitor() override = default;

  /// Constant true projects to constant true.
  Result<std::shared_ptr<Expression>> AlwaysTrue() override { return True::Instance(); }

  /// Constant false projects to constant false.
  Result<std::shared_ptr<Expression>> AlwaysFalse() override { return False::Instance(); }

  /// NOT nodes cannot be projected; meeting one is reported as an error.
  Result<std::shared_ptr<Expression>> Not(
      const std::shared_ptr<Expression>& child_result) override {
    return InvalidExpression("Project called on expression with a not");
  }

  /// Combine the already-projected children of an AND node.
  Result<std::shared_ptr<Expression>> And(
      const std::shared_ptr<Expression>& left_result,
      const std::shared_ptr<Expression>& right_result) override {
    return And::MakeFolded(left_result, right_result);
  }

  /// Combine the already-projected children of an OR node.
  Result<std::shared_ptr<Expression>> Or(
      const std::shared_ptr<Expression>& left_result,
      const std::shared_ptr<Expression>& right_result) override {
    return Or::MakeFolded(left_result, right_result);
  }

  /// \brief Bind the predicate to the schema, then project the bound form.
  ///
  /// \param pred an unbound predicate on data rows
  /// \return the projection of the bound predicate, the bind result itself when it is
  /// not a bound predicate, or the bind error
  Result<std::shared_ptr<Expression>> Predicate(
      const std::shared_ptr<UnboundPredicate>& pred) override {
    ICEBERG_ASSIGN_OR_RAISE(auto bound, pred->Bind(schema_, case_sensitive_));
    if (!bound->is_bound_predicate()) {
      // Binding produced something other than a bound predicate; hand it back as is.
      return bound;
    }
    // Dispatches virtually to the subclass's bound-predicate projection.
    return Predicate(std::dynamic_pointer_cast<BoundPredicate>(bound));
  }

  /// Fallback for bound predicates; subclasses override this with a real projection.
  Result<std::shared_ptr<Expression>> Predicate(
      const std::shared_ptr<BoundPredicate>& pred) override {
    return InvalidExpression("Bound predicates are not supported in projections");
  }

 protected:
  const PartitionSpec& spec_;
  const Schema& schema_;
  bool case_sensitive_;
};
82+
83+
ProjectionEvaluator::ProjectionEvaluator(std::unique_ptr<ProjectionVisitor> visitor)
84+
: visitor_(std::move(visitor)) {}
85+
86+
ProjectionEvaluator::~ProjectionEvaluator() = default;
87+
88+
/// \brief Inclusive projection visitor.
89+
///
90+
/// Uses AND to combine projections from multiple partition fields.
91+
class InclusiveProjectionVisitor : public ProjectionVisitor {
92+
public:
93+
~InclusiveProjectionVisitor() override = default;
94+
95+
InclusiveProjectionVisitor(const PartitionSpec& spec, const Schema& schema,
96+
bool case_sensitive)
97+
: ProjectionVisitor(spec, schema, case_sensitive) {}
98+
99+
Result<std::shared_ptr<Expression>> Predicate(
100+
const std::shared_ptr<BoundPredicate>& pred) override {
101+
ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
102+
// Find partition fields that match the predicate's term
103+
ICEBERG_ASSIGN_OR_RAISE(
104+
auto parts, spec_.GetFieldsBySourceId(pred->reference()->field().field_id()));
105+
if (parts.empty()) {
106+
// The predicate has no partition column
107+
return AlwaysTrue();
108+
}
109+
110+
// Project the predicate for each partition field and combine with AND
111+
//
112+
// consider (d = 2019-01-01) with bucket(7, d) and bucket(5, d)
113+
// projections: b1 = bucket(7, '2019-01-01') = 5, b2 = bucket(5, '2019-01-01') = 0
114+
// any value where b1 != 5 or any value where b2 != 0 cannot be the '2019-01-01'
115+
//
116+
// similarly, if partitioning by day(ts) and hour(ts), the more restrictive
117+
// projection should be used. ts = 2019-01-01T01:00:00 produces day=2019-01-01 and
118+
// hour=2019-01-01-01. the value will be in 2019-01-01-01 and not in 2019-01-01-02.
119+
std::shared_ptr<Expression> result = True::Instance();
120+
for (const auto& part : parts) {
121+
ICEBERG_ASSIGN_OR_RAISE(auto projected,
122+
part.get().transform()->Project(part.get().name(), pred));
123+
if (projected != nullptr) {
124+
ICEBERG_ASSIGN_OR_RAISE(result,
125+
And::MakeFolded(std::move(result), std::move(projected)));
126+
}
127+
}
128+
129+
return result;
130+
}
131+
};
132+
133+
/// \brief Strict projection evaluator.
134+
///
135+
/// Uses OR to combine projections from multiple partition fields.
136+
class StrictProjectionVisitor : public ProjectionVisitor {
137+
public:
138+
~StrictProjectionVisitor() override = default;
139+
140+
StrictProjectionVisitor(const PartitionSpec& spec, const Schema& schema,
141+
bool case_sensitive)
142+
: ProjectionVisitor(spec, schema, case_sensitive) {}
143+
144+
Result<std::shared_ptr<Expression>> Predicate(
145+
const std::shared_ptr<BoundPredicate>& pred) override {
146+
ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
147+
// Find partition fields that match the predicate's term
148+
ICEBERG_ASSIGN_OR_RAISE(
149+
auto parts, spec_.GetFieldsBySourceId(pred->reference()->field().field_id()));
150+
if (parts.empty()) {
151+
// The predicate has no matching partition columns
152+
return AlwaysFalse();
153+
}
154+
155+
// Project the predicate for each partition field and combine with OR
156+
//
157+
// consider (ts > 2019-01-01T01:00:00) with day(ts) and hour(ts)
158+
// projections: d >= 2019-01-02 and h >= 2019-01-01-02 (note the inclusive bounds).
159+
// any timestamp where either projection predicate is true must match the original
160+
// predicate. For example, ts = 2019-01-01T03:00:00 matches the hour projection but
161+
// not the day, but does match the original predicate.
162+
std::shared_ptr<Expression> result = False::Instance();
163+
for (const auto& part : parts) {
164+
ICEBERG_ASSIGN_OR_RAISE(
165+
auto projected, part.get().transform()->ProjectStrict(part.get().name(), pred));
166+
if (projected != nullptr) {
167+
ICEBERG_ASSIGN_OR_RAISE(result,
168+
Or::MakeFolded(std::move(result), std::move(projected)));
169+
}
170+
}
171+
172+
return result;
173+
}
174+
};
175+
176+
/// \brief Rewrite NOT nodes out of the expression, then run the projection visitor.
///
/// \param expr an expression on data rows
/// \return the projected expression, or the error raised by the rewrite or the visitor
Result<std::shared_ptr<Expression>> ProjectionEvaluator::Project(
    const std::shared_ptr<Expression>& expr) {
  using ExprPtr = std::shared_ptr<Expression>;

  // The visitor rejects NOT nodes, so they are first pushed down to the leaf
  // predicates. Without this step the default expression returned for a predicate that
  // cannot be projected would not be correct.
  ICEBERG_ASSIGN_OR_RAISE(auto not_free, RewriteNot::Visit(expr));

  // Visit through the base type; virtual dispatch selects inclusive or strict behavior.
  return Visit<ExprPtr, ProjectionVisitor>(not_free, *visitor_);
}
187+
188+
std::unique_ptr<ProjectionEvaluator> Projections::Inclusive(const PartitionSpec& spec,
189+
const Schema& schema,
190+
bool case_sensitive) {
191+
auto visitor =
192+
std::make_unique<InclusiveProjectionVisitor>(spec, schema, case_sensitive);
193+
return std::unique_ptr<ProjectionEvaluator>(
194+
new ProjectionEvaluator(std::move(visitor)));
195+
}
196+
197+
std::unique_ptr<ProjectionEvaluator> Projections::Strict(const PartitionSpec& spec,
198+
const Schema& schema,
199+
bool case_sensitive) {
200+
auto visitor = std::make_unique<StrictProjectionVisitor>(spec, schema, case_sensitive);
201+
return std::unique_ptr<ProjectionEvaluator>(
202+
new ProjectionEvaluator(std::move(visitor)));
203+
}
204+
205+
} // namespace iceberg
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/expression/projections.h
23+
/// Utils to project expressions on rows to expressions on partitions.
24+
25+
#include <memory>
26+
27+
#include "iceberg/iceberg_export.h"
28+
#include "iceberg/result.h"
29+
#include "iceberg/type_fwd.h"
30+
31+
namespace iceberg {
32+
33+
/// \brief A class that projects expressions for a table's data rows into expressions on
34+
/// the table's partition values, for a table's partition spec.
35+
class ICEBERG_EXPORT ProjectionEvaluator {
36+
public:
37+
~ProjectionEvaluator();
38+
39+
/// \brief Project the given row expression to a partition expression.
40+
///
41+
/// \param expr an expression on data rows
42+
/// \return an expression on partition data (depends on the projection)
43+
Result<std::shared_ptr<Expression>> Project(const std::shared_ptr<Expression>& expr);
44+
45+
private:
46+
friend class Projections;
47+
48+
/// \brief Create a ProjectionEvaluator.
49+
///
50+
/// \param visitor The projection visitor to use
51+
explicit ProjectionEvaluator(std::unique_ptr<class ProjectionVisitor> visitor);
52+
53+
std::unique_ptr<ProjectionVisitor> visitor_;
54+
};
55+
56+
/// \brief Utils to project expressions on rows to expressions on partitions.
57+
///
58+
/// There are two types of projections: inclusive and strict.
59+
///
60+
/// An inclusive projection guarantees that if an expression matches a row, the projected
61+
/// expression will match the row's partition.
62+
///
63+
/// A strict projection guarantees that if a partition matches a projected expression,
64+
/// then all rows in that partition will match the original expression.
65+
struct ICEBERG_EXPORT Projections {
66+
/// \brief Creates an inclusive ProjectionEvaluator for the partition spec.
67+
///
68+
/// An evaluator is used to project expressions for a table's data rows into expressions
69+
/// on the table's partition values. The evaluator returned by this function is
70+
/// inclusive and will build expressions with the following guarantee: if the original
71+
/// expression matches a row, then the projected expression will match that row's
72+
/// partition.
73+
///
74+
/// Each predicate in the expression is projected using Transform::Project.
75+
///
76+
/// \param spec a partition spec
77+
/// \param schema a schema
78+
/// \param case_sensitive whether the Projection should consider case sensitivity on
79+
/// column names or not. Defaults to true (case sensitive).
80+
/// \return an inclusive projection evaluator for the partition spec
81+
static std::unique_ptr<ProjectionEvaluator> Inclusive(const PartitionSpec& spec,
82+
const Schema& schema,
83+
bool case_sensitive = true);
84+
85+
/// \brief Creates a strict ProjectionEvaluator for the partition spec.
86+
///
87+
/// An evaluator is used to project expressions for a table's data rows into expressions
88+
/// on the table's partition values. The evaluator returned by this function is strict
89+
/// and will build expressions with the following guarantee: if the projected expression
90+
/// matches a partition, then the original expression will match all rows in that
91+
/// partition.
92+
///
93+
/// Each predicate in the expression is projected using Transform::ProjectStrict.
94+
///
95+
/// \param spec a partition spec
96+
/// \param schema a schema
97+
/// \param case_sensitive whether the Projection should consider case sensitivity on
98+
/// column names or not. Defaults to true (case sensitive).
99+
/// \return a strict projection evaluator for the partition spec
100+
static std::unique_ptr<ProjectionEvaluator> Strict(const PartitionSpec& spec,
101+
const Schema& schema,
102+
bool case_sensitive = true);
103+
};
104+
105+
} // namespace iceberg

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ iceberg_sources = files(
5151
'expression/literal.cc',
5252
'expression/manifest_evaluator.cc',
5353
'expression/predicate.cc',
54+
'expression/projections.cc',
5455
'expression/residual_evaluator.cc',
5556
'expression/rewrite_not.cc',
5657
'expression/strict_metrics_evaluator.cc',

src/iceberg/partition_spec.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
114114
private:
115115
/// \brief Create a new partition spec.
116116
///
117-
/// \param schema The table schema.
118117
/// \param spec_id The spec ID.
119118
/// \param fields The partition fields.
120119
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ add_iceberg_test(expression_test
8989
inclusive_metrics_evaluator_test.cc
9090
inclusive_metrics_evaluator_with_transform_test.cc
9191
predicate_test.cc
92+
projections_test.cc
9293
residual_evaluator_test.cc
9394
strict_metrics_evaluator_test.cc)
9495

src/iceberg/test/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ iceberg_tests = {
6666
'literal_test.cc',
6767
'manifest_evaluator_test.cc',
6868
'predicate_test.cc',
69+
'projections_test.cc',
6970
'residual_evaluator_test.cc',
7071
'strict_metrics_evaluator_test.cc',
7172
),

0 commit comments

Comments
 (0)