Skip to content

Commit 4557a6b

Browse files
authored
feat: add support to update partition spec (#401)
1 parent a6cdea4 commit 4557a6b

22 files changed

+1711
-16
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,9 @@ set(ICEBERG_SOURCES
7676
transform_function.cc
7777
type.cc
7878
update/pending_update.cc
79-
update/update_sort_order.cc
79+
update/update_partition_spec.cc
8080
update/update_properties.cc
81+
update/update_sort_order.cc
8182
util/bucket_util.cc
8283
util/conversions.cc
8384
util/decimal.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ iceberg_sources = files(
9898
'transform_function.cc',
9999
'type.cc',
100100
'update/pending_update.cc',
101+
'update/update_partition_spec.cc',
101102
'update/update_properties.cc',
102103
'update/update_sort_order.cc',
103104
'util/bucket_util.cc',

src/iceberg/partition_spec.cc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "iceberg/partition_spec.h"
2121

2222
#include <algorithm>
23+
#include <cstddef>
2324
#include <cstdint>
2425
#include <format>
2526
#include <memory>
@@ -95,6 +96,27 @@ Result<std::unique_ptr<StructType>> PartitionSpec::PartitionType(
9596
return std::make_unique<StructType>(std::move(partition_fields));
9697
}
9798

99+
bool PartitionSpec::CompatibleWith(const PartitionSpec& other) const {
100+
if (Equals(other)) {
101+
return true;
102+
}
103+
104+
if (fields_.size() != other.fields_.size()) {
105+
return false;
106+
}
107+
108+
for (const auto& [lhs, rhs] :
109+
std::ranges::zip_view<std::span<const PartitionField>,
110+
std::span<const PartitionField>>{fields_, other.fields_}) {
111+
if (lhs.source_id() != rhs.source_id() || *lhs.transform() != *rhs.transform() ||
112+
lhs.name() != rhs.name()) {
113+
return false;
114+
}
115+
}
116+
117+
return true;
118+
}
119+
98120
std::string PartitionSpec::ToString() const {
99121
std::string repr = std::format("partition_spec[spec_id<{}>,\n", spec_id_);
100122
for (const auto& field : fields_) {
@@ -191,4 +213,13 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpec::Make(
191213
new PartitionSpec(spec_id, std::move(fields), last_assigned_field_id));
192214
}
193215

216+
// Returns true when the partition field ids of `spec` are consecutive and
// start at kLegacyPartitionDataIdStart, i.e. the field at position i has id
// kLegacyPartitionDataIdStart + i. A spec without fields trivially qualifies.
bool PartitionSpec::HasSequentialFieldIds(const PartitionSpec& spec) {
  // Track the expected id as int32_t so the comparison with field_id() stays
  // signed on both sides instead of mixing int32_t with a size_t loop index.
  int32_t expected_id = PartitionSpec::kLegacyPartitionDataIdStart;
  for (const auto& field : spec.fields()) {
    if (field.field_id() != expected_id) {
      return false;
    }
    ++expected_id;
  }
  return true;
}
224+
194225
} // namespace iceberg

src/iceberg/partition_spec.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
6464
/// \brief Get the partition type binding to the input schema.
6565
Result<std::unique_ptr<StructType>> PartitionType(const Schema& schema) const;
6666

67+
/// \brief Returns true if this spec is equivalent to the other, with partition field
68+
/// ids ignored. That is, if both specs have the same number of fields, field order,
69+
/// field name, source columns, and transforms.
70+
bool CompatibleWith(const PartitionSpec& other) const;
71+
6772
std::string ToString() const override;
6873

6974
int32_t last_assigned_field_id() const { return last_assigned_field_id_; }
@@ -111,6 +116,8 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
111116
int32_t spec_id, std::vector<PartitionField> fields,
112117
std::optional<int32_t> last_assigned_field_id = std::nullopt);
113118

119+
static bool HasSequentialFieldIds(const PartitionSpec& spec);
120+
114121
private:
115122
/// \brief Create a new partition spec.
116123
///

src/iceberg/table.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
#include "iceberg/table.h"
2121

22+
#include <memory>
23+
2224
#include "iceberg/catalog.h"
2325
#include "iceberg/partition_spec.h"
2426
#include "iceberg/result.h"
@@ -28,6 +30,7 @@
2830
#include "iceberg/table_properties.h"
2931
#include "iceberg/table_scan.h"
3032
#include "iceberg/transaction.h"
33+
#include "iceberg/update/update_partition_spec.h"
3134
#include "iceberg/update/update_properties.h"
3235
#include "iceberg/util/macros.h"
3336

@@ -147,6 +150,13 @@ Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
147150
/*auto_commit=*/false);
148151
}
149152

153+
// Creates an update-kind transaction on this table with auto-commit enabled
// and returns the UpdatePartitionSpec produced by that transaction. A failure
// to create the transaction is propagated to the caller.
Result<std::shared_ptr<UpdatePartitionSpec>> Table::NewUpdatePartitionSpec() {
  ICEBERG_ASSIGN_OR_RAISE(
      auto txn, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
                                  /*auto_commit=*/true));
  return txn->NewUpdatePartitionSpec();
}
159+
150160
Result<std::shared_ptr<UpdateProperties>> Table::NewUpdateProperties() {
151161
ICEBERG_ASSIGN_OR_RAISE(
152162
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,

src/iceberg/table.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
128128
/// \brief Create a new Transaction to commit multiple table operations at once.
129129
virtual Result<std::shared_ptr<Transaction>> NewTransaction();
130130

131+
/// \brief Create a new UpdatePartitionSpec to update the partition spec of this table
132+
/// and commit the changes.
133+
virtual Result<std::shared_ptr<UpdatePartitionSpec>> NewUpdatePartitionSpec();
134+
131135
/// \brief Create a new UpdateProperties to update table properties and commit the
132136
/// changes.
133137
virtual Result<std::shared_ptr<UpdateProperties>> NewUpdateProperties();

src/iceberg/table_metadata.cc

Lines changed: 97 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,15 @@
3737
#include "iceberg/exception.h"
3838
#include "iceberg/file_io.h"
3939
#include "iceberg/json_internal.h"
40+
#include "iceberg/partition_field.h"
4041
#include "iceberg/partition_spec.h"
4142
#include "iceberg/result.h"
4243
#include "iceberg/schema.h"
4344
#include "iceberg/snapshot.h"
4445
#include "iceberg/sort_order.h"
4546
#include "iceberg/table_properties.h"
4647
#include "iceberg/table_update.h"
48+
#include "iceberg/util/checked_cast.h"
4749
#include "iceberg/util/error_collector.h"
4850
#include "iceberg/util/gzip_internal.h"
4951
#include "iceberg/util/location_util.h"
@@ -428,7 +430,8 @@ class TableMetadataBuilder::Impl {
428430
Result<int32_t> AddSortOrder(const SortOrder& order);
429431
Status SetProperties(const std::unordered_map<std::string, std::string>& updated);
430432
Status RemoveProperties(const std::unordered_set<std::string>& removed);
431-
433+
Status SetDefaultPartitionSpec(int32_t spec_id);
434+
Result<int32_t> AddPartitionSpec(const PartitionSpec& spec);
432435
std::unique_ptr<TableMetadata> Build();
433436

434437
private:
@@ -438,6 +441,12 @@ class TableMetadataBuilder::Impl {
438441
/// \return The ID to use for this sort order (reused if exists, new otherwise)
439442
int32_t ReuseOrCreateNewSortOrderId(const SortOrder& new_order);
440443

444+
/// \brief Internal method to check for existing partition spec and reuse its ID or
445+
/// create a new one
446+
/// \param new_spec The partition spec to check
447+
/// \return The ID to use for this partition spec (reused if exists, new otherwise)
448+
int32_t ReuseOrCreateNewPartitionSpecId(const PartitionSpec& new_spec);
449+
441450
private:
442451
// Base metadata (nullptr for new tables)
443452
const TableMetadata* base_;
@@ -540,9 +549,10 @@ Result<int32_t> TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order)
540549
bool is_new_order =
541550
last_added_order_id_.has_value() &&
542551
std::ranges::find_if(changes_, [new_order_id](const auto& change) {
543-
auto* add_sort_order = dynamic_cast<table::AddSortOrder*>(change.get());
544-
return add_sort_order &&
545-
add_sort_order->sort_order()->order_id() == new_order_id;
552+
return change->kind() == TableUpdate::Kind::kAddSortOrder &&
553+
internal::checked_cast<const table::AddSortOrder&>(*change)
554+
.sort_order()
555+
->order_id() == new_order_id;
546556
}) != changes_.cend();
547557
last_added_order_id_ = is_new_order ? std::make_optional(new_order_id) : std::nullopt;
548558
return new_order_id;
@@ -572,6 +582,69 @@ Result<int32_t> TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order)
572582
return new_order_id;
573583
}
574584

585+
// Makes `spec_id` the default partition spec of the metadata being built and
// records a SetDefaultPartitionSpec change for it.
//
// \param spec_id The spec id to make default, or kLastAdded to select the spec
//        most recently added through this builder.
// \return An empty Status on success; a validation failure when kLastAdded is
//         requested but no spec has been added.
//
// Setting the id that is already the default is a no-op and records no change.
Status TableMetadataBuilder::Impl::SetDefaultPartitionSpec(int32_t spec_id) {
  // Use the named sentinel (the same one recorded in the change below) rather
  // than a bare literal.
  // NOTE(review): assumes kLastAdded has the value -1 — confirm at its definition.
  if (spec_id == kLastAdded) {
    if (!last_added_spec_id_.has_value()) {
      return ValidationFailed(
          "Cannot set last added partition spec: no partition spec has been added");
    }
    // Resolve the sentinel to the concrete id and retry.
    return SetDefaultPartitionSpec(last_added_spec_id_.value());
  }

  if (spec_id == metadata_.default_spec_id) {
    // the new spec is already current and no change is needed
    return {};
  }

  // NOTE(review): spec_id is not checked against specs_by_id_ here; confirm that
  // unknown ids are rejected elsewhere before the metadata is built.
  metadata_.default_spec_id = spec_id;

  // When the new default is the spec added in this set of changes, record the
  // sentinel instead of the concrete id.
  if (last_added_spec_id_ == std::make_optional(spec_id)) {
    changes_.push_back(std::make_unique<table::SetDefaultPartitionSpec>(kLastAdded));
  } else {
    changes_.push_back(std::make_unique<table::SetDefaultPartitionSpec>(spec_id));
  }
  return {};
}
607+
608+
Result<int32_t> TableMetadataBuilder::Impl::AddPartitionSpec(const PartitionSpec& spec) {
609+
int32_t new_spec_id = ReuseOrCreateNewPartitionSpecId(spec);
610+
611+
if (specs_by_id_.contains(new_spec_id)) {
612+
// update last_added_spec_id if the spec was added in this set of changes (since it
613+
// is now the last)
614+
bool is_new_spec =
615+
last_added_spec_id_.has_value() &&
616+
std::ranges::find_if(changes_, [new_spec_id](const auto& change) {
617+
return change->kind() == TableUpdate::Kind::kAddPartitionSpec &&
618+
internal::checked_cast<const table::AddPartitionSpec&>(*change)
619+
.spec()
620+
->spec_id() == new_spec_id;
621+
}) != changes_.cend();
622+
last_added_spec_id_ = is_new_spec ? std::make_optional(new_spec_id) : std::nullopt;
623+
return new_spec_id;
624+
}
625+
626+
// Get current schema and validate the partition spec against it
627+
ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata_.Schema());
628+
ICEBERG_RETURN_UNEXPECTED(spec.Validate(*schema, /*allow_missing_fields=*/false));
629+
ICEBERG_CHECK(
630+
metadata_.format_version > 1 || PartitionSpec::HasSequentialFieldIds(spec),
631+
"Spec does not use sequential IDs that are required in v1: {}", spec.ToString());
632+
633+
ICEBERG_ASSIGN_OR_RAISE(
634+
std::shared_ptr<PartitionSpec> new_spec,
635+
PartitionSpec::Make(new_spec_id, std::vector<PartitionField>(spec.fields().begin(),
636+
spec.fields().end())));
637+
metadata_.last_partition_id =
638+
std::max(metadata_.last_partition_id, new_spec->last_assigned_field_id());
639+
metadata_.partition_specs.push_back(new_spec);
640+
specs_by_id_.emplace(new_spec_id, new_spec);
641+
642+
changes_.push_back(std::make_unique<table::AddPartitionSpec>(new_spec));
643+
last_added_spec_id_ = new_spec_id;
644+
645+
return new_spec_id;
646+
}
647+
575648
Status TableMetadataBuilder::Impl::SetProperties(
576649
const std::unordered_map<std::string, std::string>& updated) {
577650
// If updated is empty, return early (no-op)
@@ -653,6 +726,20 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSortOrderId(
653726
return new_order_id;
654727
}
655728

729+
int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId(
730+
const PartitionSpec& new_spec) {
731+
// if the spec already exists, use the same ID. otherwise, use the highest ID + 1.
732+
int32_t new_spec_id = PartitionSpec::kInitialSpecId;
733+
for (const auto& spec : metadata_.partition_specs) {
734+
if (new_spec.CompatibleWith(*spec)) {
735+
return spec->spec_id();
736+
} else if (new_spec_id <= spec->spec_id()) {
737+
new_spec_id = spec->spec_id() + 1;
738+
}
739+
}
740+
return new_spec_id;
741+
}
742+
656743
TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
657744
: impl_(std::make_unique<Impl>(format_version)) {}
658745

@@ -723,16 +810,19 @@ TableMetadataBuilder& TableMetadataBuilder::AddSchema(std::shared_ptr<Schema> sc
723810

724811
TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec(
725812
std::shared_ptr<PartitionSpec> spec) {
726-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
813+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec_id, impl_->AddPartitionSpec(*spec));
814+
return SetDefaultPartitionSpec(spec_id);
727815
}
728816

729817
TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec(int32_t spec_id) {
730-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
818+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetDefaultPartitionSpec(spec_id));
819+
return *this;
731820
}
732821

733822
TableMetadataBuilder& TableMetadataBuilder::AddPartitionSpec(
734823
std::shared_ptr<PartitionSpec> spec) {
735-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
824+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec_id, impl_->AddPartitionSpec(*spec));
825+
return *this;
736826
}
737827

738828
TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs(

src/iceberg/table_update.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ void SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const {
7272
// AddPartitionSpec
7373

7474
void AddPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
75-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
75+
builder.AddPartitionSpec(spec_);
7676
}
7777

7878
void AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
@@ -82,7 +82,7 @@ void AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
8282
// SetDefaultPartitionSpec
8383

8484
void SetDefaultPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
85-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
85+
builder.SetDefaultPartitionSpec(spec_id_);
8686
}
8787

8888
void SetDefaultPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ if(ICEBERG_BUILD_BUNDLE)
156156
USE_BUNDLE
157157
SOURCES
158158
transaction_test.cc
159+
update_partition_spec_test.cc
159160
update_properties_test.cc
160161
update_sort_order_test.cc)
161162

0 commit comments

Comments
 (0)