Skip to content

Commit 586642f

Browse files
committed
fix: Validate identifier fields in Schema::Make
1 parent ff8eea9 commit 586642f

File tree

5 files changed

+225
-89
lines changed

5 files changed

+225
-89
lines changed

src/iceberg/expression/residual_evaluator.cc

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -299,17 +299,12 @@ class UnpartitionedResidualEvaluator : public ResidualEvaluator {
299299
public:
300300
explicit UnpartitionedResidualEvaluator(std::shared_ptr<Expression> expr)
301301
: ResidualEvaluator(std::move(expr), *PartitionSpec::Unpartitioned(),
302-
*kEmptySchema_, true) {}
302+
*Schema::kEmptySchema, true) {}
303303

304304
Result<std::shared_ptr<Expression>> ResidualFor(
305305
const StructLike& /*partition_data*/) const override {
306306
return expr_;
307307
}
308-
309-
private:
310-
// Store an empty schema to avoid dangling reference when passing to base class
311-
inline static const std::shared_ptr<Schema> kEmptySchema_ =
312-
std::make_shared<Schema>(std::vector<SchemaField>{}, std::nullopt);
313308
};
314309

315310
} // namespace

src/iceberg/schema.cc

Lines changed: 84 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <format>
2323
#include <functional>
24+
#include <stack>
2425

2526
#include "iceberg/result.h"
2627
#include "iceberg/row/struct_like.h"
@@ -34,11 +35,80 @@
3435

3536
namespace iceberg {
3637

37-
Schema::Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id,
38-
std::vector<int32_t> identifier_field_ids)
39-
: StructType(std::move(fields)),
40-
schema_id_(schema_id),
41-
identifier_field_ids_(std::move(identifier_field_ids)) {}
38+
namespace {
39+
Status ValidateIdentifierFields(
40+
int32_t field_id, const Schema& schema,
41+
const std::unordered_map<int32_t, int32_t>& id_to_parent) {
42+
ICEBERG_ASSIGN_OR_RAISE(auto field_opt, schema.FindFieldById(field_id));
43+
ICEBERG_PRECHECK(field_opt.has_value(),
44+
"Cannot add field {} as an identifier field: field does not exist",
45+
field_id);
46+
47+
const SchemaField& field = field_opt.value().get();
48+
ICEBERG_PRECHECK(
49+
field.type()->is_primitive(),
50+
"Cannot add field {} as an identifier field: not a primitive type field", field_id);
51+
ICEBERG_PRECHECK(!field.optional(),
52+
"Cannot add field {} as an identifier field: not a required field",
53+
field_id);
54+
ICEBERG_PRECHECK(
55+
field.type()->type_id() != TypeId::kDouble &&
56+
field.type()->type_id() != TypeId::kFloat,
57+
"Cannot add field {} as an identifier field: must not be float or double field",
58+
field_id);
59+
60+
// check whether the nested field is in a chain of required struct fields
61+
// exploring from root for better error message for list and map types
62+
std::stack<int32_t> ancestors;
63+
auto parent_it = id_to_parent.find(field.field_id());
64+
while (parent_it != id_to_parent.end()) {
65+
ancestors.push(parent_it->second);
66+
parent_it = id_to_parent.find(parent_it->second);
67+
}
68+
69+
while (!ancestors.empty()) {
70+
ICEBERG_ASSIGN_OR_RAISE(auto parent_opt, schema.FindFieldById(ancestors.top()));
71+
ICEBERG_PRECHECK(
72+
parent_opt.has_value(),
73+
"Cannot add field {} as an identifier field: parent field id {} does not exist",
74+
field_id, ancestors.top());
75+
const SchemaField& parent = parent_opt.value().get();
76+
ICEBERG_PRECHECK(
77+
parent.type()->type_id() == TypeId::kStruct,
78+
"Cannot add field {} as an identifier field: must not be nested in {}", field_id,
79+
*parent.type());
80+
ICEBERG_PRECHECK(!parent.optional(),
81+
"Cannot add field {} as an identifier field: must not be nested in "
82+
"optional field {}",
83+
field_id, parent.field_id());
84+
ancestors.pop();
85+
}
86+
return {};
87+
}
88+
} // namespace
89+
90+
const std::shared_ptr<Schema> Schema::kEmptySchema =
91+
std::make_shared<Schema>(std::vector<SchemaField>{}, std::nullopt);
92+
93+
Schema::Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id)
94+
: StructType(std::move(fields)), schema_id_(schema_id) {}
95+
96+
Result<std::unique_ptr<Schema>> Schema::Make(std::vector<SchemaField> fields,
97+
std::optional<int32_t> schema_id,
98+
std::vector<int32_t> identifier_field_ids) {
99+
auto schema = std::make_unique<Schema>(std::move(fields), schema_id);
100+
101+
if (!identifier_field_ids.empty()) {
102+
auto id_to_parent = IndexParents(*schema);
103+
for (auto field_id : identifier_field_ids) {
104+
ICEBERG_RETURN_UNEXPECTED(
105+
ValidateIdentifierFields(field_id, *schema, id_to_parent));
106+
}
107+
}
108+
109+
schema->identifier_field_ids_ = std::move(identifier_field_ids);
110+
return schema;
111+
}
42112

43113
Result<std::unique_ptr<Schema>> Schema::Make(
44114
std::vector<SchemaField> fields, std::optional<int32_t> schema_id,
@@ -53,6 +123,15 @@ Result<std::unique_ptr<Schema>> Schema::Make(
53123
}
54124
fresh_identifier_ids.push_back(field.value().get().field_id());
55125
}
126+
127+
if (!fresh_identifier_ids.empty()) {
128+
auto id_to_parent = IndexParents(*schema);
129+
for (auto field_id : fresh_identifier_ids) {
130+
ICEBERG_RETURN_UNEXPECTED(
131+
ValidateIdentifierFields(field_id, *schema, id_to_parent));
132+
}
133+
}
134+
56135
schema->identifier_field_ids_ = std::move(fresh_identifier_ids);
57136
return schema;
58137
}

src/iceberg/schema.h

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,19 +52,33 @@ class ICEBERG_EXPORT Schema : public StructType {
5252
/// \brief Special value to select all columns from manifest files.
5353
static constexpr std::string_view kAllColumns = "*";
5454

55+
/// \brief An empty schema instance.
56+
static const std::shared_ptr<Schema> kEmptySchema;
57+
5558
explicit Schema(std::vector<SchemaField> fields,
56-
std::optional<int32_t> schema_id = std::nullopt,
57-
std::vector<int32_t> identifier_field_ids = {});
59+
std::optional<int32_t> schema_id = std::nullopt);
60+
61+
/// \brief Create a schema.
62+
///
63+
/// \param fields The fields that make up the schema.
64+
/// \param schema_id The unique identifier for this schema (default:
65+
/// kInitialSchemaId). \param identifier_field_ids Field IDs that uniquely identify
66+
/// rows in the table (default: empty). \return A new Schema instance or Status if
67+
/// failed.
68+
static Result<std::unique_ptr<Schema>> Make(std::vector<SchemaField> fields,
69+
std::optional<int32_t> schema_id,
70+
std::vector<int32_t> identifier_field_ids);
5871

5972
/// \brief Create a schema.
6073
///
6174
/// \param fields The fields that make up the schema.
6275
/// \param schema_id The unique identifier for this schema (default: kInitialSchemaId).
6376
/// \param identifier_field_names Canonical names of fields that uniquely identify rows
64-
/// in the table (default: empty). \return A new Schema instance or Status if failed.
77+
/// in the table (default: empty).
78+
/// \return A new Schema instance or Status if failed.
6579
static Result<std::unique_ptr<Schema>> Make(
66-
std::vector<SchemaField> fields, std::optional<int32_t> schema_id = std::nullopt,
67-
const std::vector<std::string>& identifier_field_names = {});
80+
std::vector<SchemaField> fields, std::optional<int32_t> schema_id,
81+
const std::vector<std::string>& identifier_field_names);
6882

6983
/// \brief Get the schema ID.
7084
///

src/iceberg/test/assign_id_visitor_test.cc

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,9 @@ std::shared_ptr<Type> CreateNestedStruct() {
7474
});
7575
}
7676

77-
Schema CreateNestedSchema(std::vector<int32_t> identifier_field_ids = {}) {
78-
return Schema(
77+
Result<std::unique_ptr<Schema>> CreateNestedSchema(
78+
std::vector<int32_t> identifier_field_ids = {}) {
79+
return Schema::Make(
7980
{
8081
SchemaField::MakeRequired(/*field_id=*/10, "id", iceberg::int64()),
8182
SchemaField::MakeOptional(/*field_id=*/20, "list", CreateListOfStruct()),
@@ -108,11 +109,11 @@ TEST(AssignFreshIdVisitorTest, FlatSchema) {
108109
}
109110

110111
TEST(AssignFreshIdVisitorTest, NestedSchema) {
111-
Schema schema = CreateNestedSchema();
112+
ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema());
112113
std::atomic<int32_t> id = 0;
113114
auto next_id = [&id]() { return ++id; };
114115
ICEBERG_UNWRAP_OR_FAIL(auto fresh_schema,
115-
AssignFreshIds(Schema::kInitialSchemaId, schema, next_id));
116+
AssignFreshIds(Schema::kInitialSchemaId, *schema, next_id));
116117

117118
ASSERT_EQ(4, fresh_schema->fields().size());
118119
for (int32_t i = 0; i < fresh_schema->fields().size(); ++i) {
@@ -169,20 +170,16 @@ TEST(AssignFreshIdVisitorTest, NestedSchema) {
169170
}
170171

171172
TEST(AssignFreshIdVisitorTest, RefreshIdentifierId) {
172-
std::atomic<int32_t> id = 0;
173+
int32_t id = 0;
173174
auto next_id = [&id]() { return ++id; };
174175

175-
Schema invalid_schema = CreateNestedSchema({10, 400});
176-
// Invalid identified field id
177-
auto result = AssignFreshIds(Schema::kInitialSchemaId, invalid_schema, next_id);
178-
EXPECT_THAT(result, IsError(ErrorKind::kInvalidSchema));
179-
EXPECT_THAT(result, HasErrorMessage("Cannot find"));
180-
181-
id = 0;
182-
Schema schema = CreateNestedSchema({10, 301});
176+
ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema({10, 301}));
183177
ICEBERG_UNWRAP_OR_FAIL(auto fresh_schema,
184-
AssignFreshIds(Schema::kInitialSchemaId, schema, next_id));
178+
AssignFreshIds(Schema::kInitialSchemaId, *schema, next_id));
185179
EXPECT_THAT(fresh_schema->IdentifierFieldIds(), testing::ElementsAre(1, 12));
180+
ICEBERG_UNWRAP_OR_FAIL(auto identifier_field_names,
181+
fresh_schema->IdentifierFieldNames());
182+
EXPECT_THAT(identifier_field_names, testing::ElementsAre("id", "struct.outer_id"));
186183
}
187184

188185
} // namespace iceberg

0 commit comments

Comments
 (0)