Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions src/iceberg/expression/residual_evaluator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,16 +299,12 @@ class UnpartitionedResidualEvaluator : public ResidualEvaluator {
public:
explicit UnpartitionedResidualEvaluator(std::shared_ptr<Expression> expr)
: ResidualEvaluator(std::move(expr), *PartitionSpec::Unpartitioned(),
*kEmptySchema_, true) {}
*Schema::EmptySchema(), true) {}

Result<std::shared_ptr<Expression>> ResidualFor(
const StructLike& /*partition_data*/) const override {
return expr_;
}

private:
// Store an empty schema to avoid dangling reference when passing to base class
inline static const std::shared_ptr<Schema> kEmptySchema_ = Schema::EmptySchema();
};

} // namespace
Expand Down
5 changes: 2 additions & 3 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,8 @@ Result<std::unique_ptr<Schema>> SchemaFromJson(const nlohmann::json& json) {
auto identifier_field_ids,
GetJsonValueOrDefault<std::vector<int32_t>>(json, kIdentifierFieldIds));

return std::make_unique<Schema>(std::move(fields),
schema_id_opt.value_or(Schema::kInitialSchemaId),
std::move(identifier_field_ids));
return Schema::Make(std::move(fields), schema_id_opt.value_or(Schema::kInitialSchemaId),
std::move(identifier_field_ids));
}

nlohmann::json ToJson(const PartitionField& partition_field) {
Expand Down
84 changes: 79 additions & 5 deletions src/iceberg/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <format>
#include <functional>
#include <stack>

#include "iceberg/result.h"
#include "iceberg/row/struct_like.h"
Expand All @@ -34,11 +35,25 @@

namespace iceberg {

Schema::Schema(std::vector<SchemaField> fields, int32_t schema_id,
std::vector<int32_t> identifier_field_ids)
: StructType(std::move(fields)),
schema_id_(schema_id),
identifier_field_ids_(std::move(identifier_field_ids)) {}
Schema::Schema(std::vector<SchemaField> fields, int32_t schema_id)
: StructType(std::move(fields)), schema_id_(schema_id) {}

Result<std::unique_ptr<Schema>> Schema::Make(std::vector<SchemaField> fields,
int32_t schema_id,
std::vector<int32_t> identifier_field_ids) {
auto schema = std::make_unique<Schema>(std::move(fields), schema_id);

if (!identifier_field_ids.empty()) {
auto id_to_parent = IndexParents(*schema);
for (auto field_id : identifier_field_ids) {
ICEBERG_RETURN_UNEXPECTED(
ValidateIdentifierFields(field_id, *schema, id_to_parent));
}
}

schema->identifier_field_ids_ = std::move(identifier_field_ids);
return schema;
}

Result<std::unique_ptr<Schema>> Schema::Make(
std::vector<SchemaField> fields, int32_t schema_id,
Expand All @@ -53,10 +68,69 @@ Result<std::unique_ptr<Schema>> Schema::Make(
}
fresh_identifier_ids.push_back(field.value().get().field_id());
}

if (!fresh_identifier_ids.empty()) {
auto id_to_parent = IndexParents(*schema);
for (auto field_id : fresh_identifier_ids) {
ICEBERG_RETURN_UNEXPECTED(
ValidateIdentifierFields(field_id, *schema, id_to_parent));
}
}

schema->identifier_field_ids_ = std::move(fresh_identifier_ids);
return schema;
}

Status Schema::ValidateIdentifierFields(
int32_t field_id, const Schema& schema,
const std::unordered_map<int32_t, int32_t>& id_to_parent) {
ICEBERG_ASSIGN_OR_RAISE(auto field_opt, schema.FindFieldById(field_id));
ICEBERG_PRECHECK(field_opt.has_value(),
"Cannot add field {} as an identifier field: field does not exist",
field_id);

const SchemaField& field = field_opt.value().get();
ICEBERG_PRECHECK(
field.type()->is_primitive(),
"Cannot add field {} as an identifier field: not a primitive type field", field_id);
ICEBERG_PRECHECK(!field.optional(),
"Cannot add field {} as an identifier field: not a required field",
field_id);
ICEBERG_PRECHECK(
field.type()->type_id() != TypeId::kDouble &&
field.type()->type_id() != TypeId::kFloat,
"Cannot add field {} as an identifier field: must not be float or double field",
field_id);

// check whether the nested field is in a chain of required struct fields
// exploring from root for better error message for list and map types
std::stack<int32_t> ancestors;
auto parent_it = id_to_parent.find(field.field_id());
while (parent_it != id_to_parent.end()) {
ancestors.push(parent_it->second);
parent_it = id_to_parent.find(parent_it->second);
}

while (!ancestors.empty()) {
ICEBERG_ASSIGN_OR_RAISE(auto parent_opt, schema.FindFieldById(ancestors.top()));
ICEBERG_PRECHECK(
parent_opt.has_value(),
"Cannot add field {} as an identifier field: parent field id {} does not exist",
field_id, ancestors.top());
const SchemaField& parent = parent_opt.value().get();
ICEBERG_PRECHECK(
parent.type()->type_id() == TypeId::kStruct,
"Cannot add field {} as an identifier field: must not be nested in {}", field_id,
*parent.type());
ICEBERG_PRECHECK(!parent.optional(),
"Cannot add field {} as an identifier field: must not be nested in "
"optional field {}",
field_id, parent.field_id());
ancestors.pop();
}
return {};
}

const std::shared_ptr<Schema>& Schema::EmptySchema() {
static const auto empty_schema =
std::make_shared<Schema>(std::vector<SchemaField>{}, kInitialSchemaId);
Expand Down
39 changes: 34 additions & 5 deletions src/iceberg/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,47 @@ class ICEBERG_EXPORT Schema : public StructType {
/// \brief Special value to select all columns from manifest files.
static constexpr std::string_view kAllColumns = "*";

explicit Schema(std::vector<SchemaField> fields, int32_t schema_id = kInitialSchemaId,
std::vector<int32_t> identifier_field_ids = {});
explicit Schema(std::vector<SchemaField> fields, int32_t schema_id = kInitialSchemaId);

/// \brief Create a schema.
///
/// \param fields The fields that make up the schema.
/// \param schema_id The unique identifier for this schema (default:kInitialSchemaId).
/// \param identifier_field_ids Field IDs that uniquely identify rows in the table.
/// \return A new Schema instance or Status if failed.
static Result<std::unique_ptr<Schema>> Make(std::vector<SchemaField> fields,
int32_t schema_id,
std::vector<int32_t> identifier_field_ids);

/// \brief Create a schema.
///
/// \param fields The fields that make up the schema.
/// \param schema_id The unique identifier for this schema (default: kInitialSchemaId).
/// \param identifier_field_names Canonical names of fields that uniquely identify rows
/// in the table (default: empty). \return A new Schema instance or Status if failed.
/// in the table.
/// \return A new Schema instance or Status if failed.
static Result<std::unique_ptr<Schema>> Make(
std::vector<SchemaField> fields, int32_t schema_id = kInitialSchemaId,
const std::vector<std::string>& identifier_field_names = {});
std::vector<SchemaField> fields, int32_t schema_id,
const std::vector<std::string>& identifier_field_names);

/// \brief Validate that the identifier field with the given ID is valid for the schema
///
/// This method checks that the specified field ID represents a valid identifier field
/// according to Iceberg's identifier field requirements. It verifies that the field:
/// - exists in the schema
/// - is a primitive type
/// - is not optional (required field)
/// - is not a float or double type
/// - is not nested within optional or non-struct parent fields
///
/// \param field_id The ID of the field to validate as an identifier field.
/// \param schema The schema containing the field to validate.
/// \param id_to_parent A mapping from field IDs to their parent field IDs for nested
/// field validation.
/// \return Status indicating success or failure of the validation.
static Status ValidateIdentifierFields(
int32_t field_id, const Schema& schema,
const std::unordered_map<int32_t, int32_t>& id_to_parent);

/// \brief Get an empty schema.
///
Expand Down
6 changes: 3 additions & 3 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -958,9 +958,9 @@ Result<int32_t> TableMetadataBuilder::Impl::AddSchema(const Schema& schema,

metadata_.last_column_id = new_last_column_id;

auto new_schema =
std::make_shared<Schema>(schema.fields() | std::ranges::to<std::vector>(),
new_schema_id, schema.IdentifierFieldIds());
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<Schema> new_schema,
Schema::Make(schema.fields() | std::ranges::to<std::vector>(),
new_schema_id, schema.IdentifierFieldIds()))

if (!schema_found) {
metadata_.schemas.push_back(new_schema);
Expand Down
25 changes: 11 additions & 14 deletions src/iceberg/test/assign_id_visitor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ std::shared_ptr<Type> CreateNestedStruct() {
});
}

Schema CreateNestedSchema(std::vector<int32_t> identifier_field_ids = {}) {
return Schema(
Result<std::unique_ptr<Schema>> CreateNestedSchema(
std::vector<int32_t> identifier_field_ids = {}) {
return Schema::Make(
{
SchemaField::MakeRequired(/*field_id=*/10, "id", iceberg::int64()),
SchemaField::MakeOptional(/*field_id=*/20, "list", CreateListOfStruct()),
Expand Down Expand Up @@ -108,11 +109,11 @@ TEST(AssignFreshIdVisitorTest, FlatSchema) {
}

TEST(AssignFreshIdVisitorTest, NestedSchema) {
Schema schema = CreateNestedSchema();
ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema());
std::atomic<int32_t> id = 0;
auto next_id = [&id]() { return ++id; };
ICEBERG_UNWRAP_OR_FAIL(auto fresh_schema,
AssignFreshIds(Schema::kInitialSchemaId, schema, next_id));
AssignFreshIds(Schema::kInitialSchemaId, *schema, next_id));

ASSERT_EQ(4, fresh_schema->fields().size());
for (int32_t i = 0; i < fresh_schema->fields().size(); ++i) {
Expand Down Expand Up @@ -169,20 +170,16 @@ TEST(AssignFreshIdVisitorTest, NestedSchema) {
}

TEST(AssignFreshIdVisitorTest, RefreshIdentifierId) {
std::atomic<int32_t> id = 0;
int32_t id = 0;
auto next_id = [&id]() { return ++id; };

Schema invalid_schema = CreateNestedSchema({10, 400});
// Invalid identified field id
auto result = AssignFreshIds(Schema::kInitialSchemaId, invalid_schema, next_id);
EXPECT_THAT(result, IsError(ErrorKind::kInvalidSchema));
EXPECT_THAT(result, HasErrorMessage("Cannot find"));

id = 0;
Schema schema = CreateNestedSchema({10, 301});
ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema({10, 301}));
ICEBERG_UNWRAP_OR_FAIL(auto fresh_schema,
AssignFreshIds(Schema::kInitialSchemaId, schema, next_id));
AssignFreshIds(Schema::kInitialSchemaId, *schema, next_id));
EXPECT_THAT(fresh_schema->IdentifierFieldIds(), testing::ElementsAre(1, 12));
ICEBERG_UNWRAP_OR_FAIL(auto identifier_field_names,
fresh_schema->IdentifierFieldNames());
EXPECT_THAT(identifier_field_names, testing::ElementsAre("id", "struct.outer_id"));
}

} // namespace iceberg
13 changes: 7 additions & 6 deletions src/iceberg/test/metadata_serde_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,13 @@ TEST(MetadataSerdeTest, DeserializeV2Valid) {
/*optional=*/false)},
/*schema_id=*/0);

auto expected_schema_2 = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField::MakeRequired(1, "x", int64()),
SchemaField::MakeRequired(2, "y", int64()),
SchemaField::MakeRequired(3, "z", int64())},
/*schema_id=*/1,
/*identifier_field_ids=*/std::vector<int32_t>{1, 2});
ICEBERG_UNWRAP_OR_FAIL(
std::shared_ptr<Schema> expected_schema_2,
Schema::Make(std::vector<SchemaField>{SchemaField::MakeRequired(1, "x", int64()),
SchemaField::MakeRequired(2, "y", int64()),
SchemaField::MakeRequired(3, "z", int64())},
/*schema_id=*/1,
/*identifier_field_ids=*/std::vector<int32_t>{1, 2}));

auto expected_spec_result = PartitionSpec::Make(
/*spec_id=*/0,
Expand Down
Loading
Loading