Skip to content

Commit 7836c3e

Browse files
committed
feat: add schema update to table metadata builder
1 parent 9c00bfa commit 7836c3e

File tree

10 files changed

+509
-26
lines changed

10 files changed

+509
-26
lines changed

src/iceberg/partition_spec.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,42 @@ Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields)
155155
return {};
156156
}
157157

158+
Status PartitionSpec::ValidatePartitionName(const Schema& schema) const {
159+
std::unordered_set<std::string> partition_names;
160+
for (const auto& partition_field : fields_) {
161+
auto name = std::string(partition_field.name());
162+
if (name.empty()) {
163+
return InvalidArgument("Cannot use empty partition name: {}", name);
164+
}
165+
if (partition_names.contains(name)) {
166+
return InvalidArgument("Cannot use partition name more than once: {}", name);
167+
}
168+
partition_names.insert(name);
169+
170+
ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldByName(name));
171+
auto transform_type = partition_field.transform()->transform_type();
172+
if (transform_type == TransformType::kIdentity) {
173+
// for identity transform case we allow conflicts between partition and schema field
174+
// name as long as they are sourced from the same schema field
175+
if (schema_field.has_value() &&
176+
schema_field.value().get().field_id() != partition_field.source_id()) {
177+
return InvalidArgument(
178+
"Cannot create identity partition sourced from different field in schema: {}",
179+
name);
180+
}
181+
} else {
182+
// for all other transforms we don't allow conflicts between partition name and
183+
// schema field name
184+
if (schema_field.has_value()) {
185+
return InvalidArgument(
186+
"Cannot create partition from name that exists in schema: {}", name);
187+
}
188+
}
189+
}
190+
191+
return {};
192+
}
193+
158194
Result<std::vector<std::reference_wrapper<const PartitionField>>>
159195
PartitionSpec::GetFieldsBySourceId(int32_t source_id) const {
160196
ICEBERG_ASSIGN_OR_RAISE(auto source_id_to_fields, source_id_to_fields_.Get(*this));

src/iceberg/partition_spec.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
7979
/// \return Error status if the partition spec is invalid.
8080
Status Validate(const Schema& schema, bool allow_missing_fields) const;
8181

82+
// \brief Validates the partition field names are unique within the partition spec and
83+
// schema.
84+
Status ValidatePartitionName(const Schema& schema) const;
85+
8286
/// \brief Get the partition fields by source ID.
8387
/// \param source_id The id of the source field.
8488
/// \return The partition fields by source ID, or NotFound if the source field is not

src/iceberg/schema.cc

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "iceberg/util/macros.h"
3131
#include "iceberg/util/type_util.h"
3232
#include "iceberg/util/visit_type.h"
33+
#include "table_metadata.h"
3334

3435
namespace iceberg {
3536

@@ -179,4 +180,43 @@ Result<std::unique_ptr<Schema>> Schema::Project(
179180
std::nullopt);
180181
}
181182

183+
Result<int32_t> Schema::HighestFieldId() const {
184+
ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this));
185+
186+
if (id_to_field.get().empty()) {
187+
return kInitialColumnId;
188+
}
189+
190+
auto max_it = std::ranges::max_element(
191+
id_to_field.get(),
192+
[](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; });
193+
194+
return max_it->first;
195+
}
196+
197+
bool Schema::SameSchema(const Schema& other) const { return fields_ == other.fields_; }
198+
199+
Status Schema::Validate(int32_t format_version) const {
200+
// Get all fields including nested ones
201+
ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this));
202+
203+
// Check each field's type and defaults
204+
for (const auto& [field_id, field_ref] : id_to_field.get()) {
205+
const auto& field = field_ref.get();
206+
207+
// Check if the field's type requires a minimum format version
208+
if (auto it = TableMetadata::kMinFormatVersions.find(field.type()->type_id());
209+
it != TableMetadata::kMinFormatVersions.end()) {
210+
if (int32_t min_format_version = it->second; format_version < min_format_version) {
211+
return InvalidSchema("Invalid type for {}: {} is not supported until v{}",
212+
field.name(), *field.type(), min_format_version);
213+
}
214+
}
215+
216+
// TODO(GuoTao.yu): Check default values when they are supported
217+
}
218+
219+
return {};
220+
}
221+
182222
} // namespace iceberg

src/iceberg/schema.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ namespace iceberg {
4646
class ICEBERG_EXPORT Schema : public StructType {
4747
public:
4848
static constexpr int32_t kInitialSchemaId = 0;
49+
static constexpr int32_t kInitialColumnId = 0;
4950
static constexpr int32_t kInvalidColumnId = -1;
5051

5152
explicit Schema(std::vector<SchemaField> fields,
@@ -103,6 +104,23 @@ class ICEBERG_EXPORT Schema : public StructType {
103104
Result<std::unique_ptr<Schema>> Project(
104105
const std::unordered_set<int32_t>& field_ids) const;
105106

107+
/// \brief Get the highest field ID in the schema.
108+
/// \return The highest field ID.
109+
Result<int32_t> HighestFieldId() const;
110+
111+
/// \brief Checks whether this schema is equivalent to another schema while ignoring the
112+
/// schema id.
113+
bool SameSchema(const Schema& other) const;
114+
115+
/// \brief Validate the schema for a given format version.
116+
///
117+
/// This validates that the schema does not contain types that were released in later
118+
/// format versions.
119+
///
120+
/// \param format_version The format version to validate against.
121+
/// \return Error status if the schema is invalid.
122+
Status Validate(int32_t format_version) const;
123+
106124
friend bool operator==(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs); }
107125

108126
private:

src/iceberg/table_metadata.cc

Lines changed: 197 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -522,16 +522,166 @@ TableMetadataBuilder& TableMetadataBuilder::UpgradeFormatVersion(
522522
}
523523

524524
TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema(
525-
std::shared_ptr<Schema> schema, int32_t new_last_column_id) {
526-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
525+
std::shared_ptr<Schema> const& schema, int32_t new_last_column_id) {
526+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_id,
527+
AddSchemaInternal(*schema, new_last_column_id));
528+
return SetCurrentSchema(schema_id);
527529
}
528530

529531
TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema(int32_t schema_id) {
530-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
532+
if (schema_id == kLastAdded) {
533+
ICEBERG_BUILDER_CHECK(impl_->last_added_schema_id.has_value(),
534+
"Cannot set last added schema: no schema has been added");
535+
return SetCurrentSchema(impl_->last_added_schema_id.value());
536+
}
537+
538+
if (impl_->metadata.current_schema_id == schema_id) {
539+
return *this;
540+
}
541+
542+
auto it = impl_->schemas_by_id.find(schema_id);
543+
ICEBERG_BUILDER_CHECK(it != impl_->schemas_by_id.end(),
544+
"Cannot set current schema to unknown schema: {}", schema_id);
545+
const auto& schema = it->second;
546+
547+
// Rebuild all partition specs for the new current schema
548+
std::vector<std::shared_ptr<PartitionSpec>> updated_specs;
549+
for (const auto& spec : impl_->metadata.partition_specs) {
550+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto updated_spec, UpdateSpecSchema(*schema, *spec));
551+
updated_specs.push_back(std::move(updated_spec));
552+
}
553+
impl_->metadata.partition_specs = std::move(updated_specs);
554+
impl_->specs_by_id.clear();
555+
for (const auto& spec : impl_->metadata.partition_specs) {
556+
impl_->specs_by_id.emplace(spec->spec_id(), spec);
557+
}
558+
559+
// Rebuild all sort orders for the new current schema
560+
std::vector<std::shared_ptr<SortOrder>> updated_orders;
561+
for (const auto& order : impl_->metadata.sort_orders) {
562+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto updated_order,
563+
UpdateSortOrderSchema(*schema, *order));
564+
updated_orders.push_back(std::move(updated_order));
565+
}
566+
impl_->metadata.sort_orders = std::move(updated_orders);
567+
impl_->sort_orders_by_id.clear();
568+
for (const auto& order : impl_->metadata.sort_orders) {
569+
impl_->sort_orders_by_id.emplace(order->order_id(), order);
570+
}
571+
572+
// Set the current schema ID
573+
impl_->metadata.current_schema_id = schema_id;
574+
575+
// Record the change
576+
if (impl_->last_added_schema_id.has_value() &&
577+
impl_->last_added_schema_id.value() == schema_id) {
578+
impl_->changes.push_back(std::make_unique<table::SetCurrentSchema>(kLastAdded));
579+
} else {
580+
impl_->changes.push_back(std::make_unique<table::SetCurrentSchema>(schema_id));
581+
}
582+
583+
return *this;
531584
}
532585

533-
TableMetadataBuilder& TableMetadataBuilder::AddSchema(std::shared_ptr<Schema> schema) {
534-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
586+
Result<std::shared_ptr<PartitionSpec>> TableMetadataBuilder::UpdateSpecSchema(
587+
const Schema& schema, const PartitionSpec& partition_spec) {
588+
// UpdateSpecSchema: Update partition spec to use the new schema
589+
// This preserves the partition spec structure but rebinds it to the new schema
590+
591+
// Copy all fields from the partition spec. IDs should not change.
592+
std::vector<PartitionField> fields;
593+
fields.reserve(partition_spec.fields().size());
594+
int32_t last_assigned_field_id = PartitionSpec::kLegacyPartitionDataIdStart;
595+
for (const auto& field : partition_spec.fields()) {
596+
fields.push_back(field);
597+
last_assigned_field_id = std::max(last_assigned_field_id, field.field_id());
598+
}
599+
600+
// Build without validation because the schema may have changed in a way that makes
601+
// this spec invalid. The spec should still be preserved so that older metadata can
602+
// be interpreted.
603+
ICEBERG_ASSIGN_OR_RAISE(auto new_partition_spec,
604+
PartitionSpec::Make(partition_spec.spec_id(), std::move(fields),
605+
last_assigned_field_id));
606+
607+
// Validate the new partition name against the new schema
608+
ICEBERG_RETURN_UNEXPECTED(new_partition_spec->ValidatePartitionName(schema));
609+
return new_partition_spec;
610+
}
611+
612+
Result<std::unique_ptr<SortOrder>> TableMetadataBuilder::UpdateSortOrderSchema(
613+
const Schema& schema, const SortOrder& sort_order) {
614+
// Build without validation because the schema may have changed in a way that makes
615+
// this order invalid. The order should still be preserved so that older metadata can
616+
// be interpreted.
617+
auto fields = sort_order.fields();
618+
std::vector<SortField> new_fields{fields.begin(), fields.end()};
619+
return SortOrder::Make(sort_order.order_id(), std::move(new_fields));
620+
}
621+
622+
TableMetadataBuilder& TableMetadataBuilder::AddSchema(
623+
std::shared_ptr<Schema> const& schema) {
624+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto highest_field_id, schema->HighestFieldId());
625+
AddSchemaInternal(*schema, std::max(impl_->metadata.last_column_id, highest_field_id));
626+
return *this;
627+
}
628+
629+
Result<int32_t> TableMetadataBuilder::AddSchemaInternal(const Schema& schema,
630+
int32_t new_last_column_id) {
631+
if (new_last_column_id < impl_->metadata.last_column_id) {
632+
return InvalidArgument("Invalid last column ID: {} < {} (previous last column ID)",
633+
new_last_column_id, impl_->metadata.last_column_id);
634+
}
635+
636+
ICEBERG_RETURN_UNEXPECTED(schema.Validate(impl_->metadata.format_version));
637+
638+
auto new_schema_id = ReuseOrCreateNewSchemaId(schema);
639+
if (impl_->schemas_by_id.find(new_schema_id) != impl_->schemas_by_id.end()) {
640+
// update last_added_schema_id if the schema was added in this set of changes (since
641+
// it is now the last)
642+
bool is_new_schema =
643+
impl_->last_added_schema_id.has_value() &&
644+
std::ranges::find_if(impl_->changes, [new_schema_id](const auto& change) {
645+
if (change->kind() != TableUpdate::Kind::kAddSchema) {
646+
return false;
647+
}
648+
auto* add_schema = dynamic_cast<table::AddSchema*>(change.get());
649+
return add_schema->schema()->schema_id().value_or(Schema::kInitialSchemaId) ==
650+
new_schema_id;
651+
}) != impl_->changes.cend();
652+
impl_->last_added_schema_id =
653+
is_new_schema ? std::make_optional(new_schema_id) : std::nullopt;
654+
return new_schema_id;
655+
}
656+
657+
auto new_schema = std::make_shared<Schema>(
658+
std::vector<SchemaField>(schema.fields().begin(), schema.fields().end()),
659+
new_schema_id);
660+
661+
impl_->metadata.schemas.push_back(new_schema);
662+
impl_->schemas_by_id.emplace(new_schema_id, new_schema);
663+
664+
impl_->changes.push_back(
665+
std::make_unique<table::AddSchema>(new_schema, new_last_column_id));
666+
impl_->metadata.last_column_id = new_last_column_id;
667+
impl_->last_added_schema_id = new_schema_id;
668+
669+
return new_schema_id;
670+
}
671+
672+
int32_t TableMetadataBuilder::ReuseOrCreateNewSchemaId(const Schema& new_schema) const {
673+
// if the schema already exists, use its id; otherwise use the highest id + 1
674+
auto new_schema_id =
675+
impl_->metadata.current_schema_id.value_or(Schema::kInitialSchemaId);
676+
for (auto& schema : impl_->metadata.schemas) {
677+
auto schema_id = schema->schema_id().value_or(Schema::kInitialSchemaId);
678+
if (schema->SameSchema(new_schema)) {
679+
return schema_id;
680+
} else if (new_schema_id <= schema_id) {
681+
new_schema_id = schema_id + 1;
682+
}
683+
}
684+
return new_schema_id;
535685
}
536686

537687
TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec(
@@ -555,7 +705,23 @@ TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs(
555705

556706
TableMetadataBuilder& TableMetadataBuilder::RemoveSchemas(
557707
const std::vector<int32_t>& schema_ids) {
558-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
708+
std::unordered_set<int32_t> schema_ids_to_remove(schema_ids.begin(), schema_ids.end());
709+
auto current_schema_id =
710+
impl_->metadata.current_schema_id.value_or(Schema::kInitialSchemaId);
711+
ICEBERG_BUILDER_CHECK(schema_ids_to_remove.contains(current_schema_id),
712+
"Cannot remove current schema: {}", current_schema_id);
713+
714+
if (!schema_ids_to_remove.empty()) {
715+
impl_->metadata.schemas =
716+
impl_->metadata.schemas | std::views::filter([&](const auto& schema) {
717+
return !schema_ids_to_remove.contains(
718+
schema->schema_id().value_or(Schema::kInitialSchemaId));
719+
}) |
720+
std::ranges::to<std::vector<std::shared_ptr<Schema>>>();
721+
impl_->changes.push_back(
722+
std::make_unique<table::RemoveSchemas>(std::move(schema_ids_to_remove)));
723+
}
724+
return *this;
559725
}
560726

561727
TableMetadataBuilder& TableMetadataBuilder::SetDefaultSortOrder(
@@ -595,9 +761,11 @@ Result<int32_t> TableMetadataBuilder::AddSortOrderInternal(const SortOrder& orde
595761
bool is_new_order =
596762
impl_->last_added_order_id.has_value() &&
597763
std::ranges::find_if(impl_->changes, [new_order_id](const auto& change) {
764+
if (change->kind() != TableUpdate::Kind::kAddSortOrder) {
765+
return false;
766+
}
598767
auto* add_sort_order = dynamic_cast<table::AddSortOrder*>(change.get());
599-
return add_sort_order &&
600-
add_sort_order->sort_order()->order_id() == new_order_id;
768+
return add_sort_order->sort_order()->order_id() == new_order_id;
601769
}) != impl_->changes.cend();
602770
impl_->last_added_order_id =
603771
is_new_order ? std::make_optional(new_order_id) : std::nullopt;
@@ -764,6 +932,27 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataBuilder::Build() {
764932
std::chrono::system_clock::now().time_since_epoch())};
765933
}
766934

935+
auto current_schema_id =
936+
impl_->metadata.current_schema_id.value_or(Schema::kInitialSchemaId);
937+
auto schema_it = impl_->schemas_by_id.find(current_schema_id);
938+
ICEBERG_PRECHECK(schema_it != impl_->schemas_by_id.end(),
939+
"Current schema ID {} not found in schemas", current_schema_id);
940+
const auto& current_schema = schema_it->second;
941+
{
942+
auto spec_it = impl_->specs_by_id.find(impl_->metadata.default_spec_id);
943+
// FIXME(GuoTao.yu): Default spec must exist after we support update partition spec
944+
if (spec_it != impl_->specs_by_id.end()) {
945+
ICEBERG_RETURN_UNEXPECTED(
946+
spec_it->second->Validate(*current_schema, /*allow_missing_fields=*/false));
947+
}
948+
auto sort_order_it =
949+
impl_->sort_orders_by_id.find(impl_->metadata.default_sort_order_id);
950+
ICEBERG_PRECHECK(sort_order_it != impl_->sort_orders_by_id.end(),
951+
"Default sort order ID {} not found in sort orders",
952+
impl_->metadata.default_sort_order_id);
953+
ICEBERG_RETURN_UNEXPECTED(sort_order_it->second->Validate(*current_schema));
954+
}
955+
767956
// 4. Buildup metadata_log from base metadata
768957
int32_t max_metadata_log_size =
769958
impl_->metadata.properties.Get(TableProperties::kMetadataPreviousVersionsMax);

0 commit comments

Comments
 (0)