Skip to content

Commit 7b351be

Browse files
committed
1
1 parent c472f3c commit 7b351be

File tree

11 files changed

+794
-113
lines changed

11 files changed

+794
-113
lines changed

src/iceberg/snapshot.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ ICEBERG_EXPORT constexpr Result<SnapshotRefType> SnapshotRefTypeFromString(
6262

6363
/// \brief A reference to a snapshot, either a branch or a tag.
6464
struct ICEBERG_EXPORT SnapshotRef {
65+
static constexpr std::string_view kMainBranch = "main";
66+
6567
struct ICEBERG_EXPORT Branch {
6668
/// A positive number for the minimum number of snapshots to keep in a branch while
6769
/// expiring snapshots. Defaults to table property

src/iceberg/table_metadata.cc

Lines changed: 111 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121

2222
#include <algorithm>
2323
#include <chrono>
24+
#include <cstdint>
2425
#include <format>
26+
#include <optional>
2527
#include <ranges>
2628
#include <string>
29+
#include <unordered_map>
2730

2831
#include <nlohmann/json.hpp>
2932

@@ -39,12 +42,11 @@
3942
#include "iceberg/util/gzip_internal.h"
4043
#include "iceberg/util/macros.h"
4144
#include "iceberg/util/uuid.h"
42-
4345
namespace iceberg {
44-
4546
namespace {
4647
const TimePointMs kInvalidLastUpdatedMs = TimePointMs::min();
47-
}
48+
constexpr int64_t kLastAdded = -1;
49+
} // namespace
4850

4951
std::string ToString(const SnapshotLogEntry& entry) {
5052
return std::format("SnapshotLogEntry[timestampMillis={},snapshotId={}]",
@@ -274,11 +276,19 @@ struct TableMetadataBuilder::Impl {
274276

275277
// Change tracking
276278
std::vector<std::unique_ptr<TableUpdate>> changes;
279+
std::optional<int32_t> last_added_schema_id;
280+
std::optional<int32_t> last_added_order_id;
281+
std::optional<int32_t> last_added_spec_id;
277282

278283
// Metadata location tracking
279284
std::optional<std::string> metadata_location;
280285
std::optional<std::string> previous_metadata_location;
281286

287+
// indexes for convenience
288+
std::unordered_map<int32_t, std::shared_ptr<Schema>> schemas_by_id;
289+
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id;
290+
std::unordered_map<int32_t, std::shared_ptr<SortOrder>> sort_orders_by_id;
291+
282292
// Constructor for new table
283293
explicit Impl(int8_t format_version) : base(nullptr), metadata{} {
284294
metadata.format_version = format_version;
@@ -294,7 +304,22 @@ struct TableMetadataBuilder::Impl {
294304

295305
// Constructor from existing metadata
296306
explicit Impl(const TableMetadata* base_metadata)
297-
: base(base_metadata), metadata(*base_metadata) {}
307+
: base(base_metadata), metadata(*base_metadata) {
308+
// Initialize index maps from base metadata
309+
for (const auto& schema : metadata.schemas) {
310+
if (schema->schema_id().has_value()) {
311+
schemas_by_id.emplace(schema->schema_id().value(), schema);
312+
}
313+
}
314+
315+
for (const auto& spec : metadata.partition_specs) {
316+
specs_by_id.emplace(spec->spec_id(), spec);
317+
}
318+
319+
for (const auto& order : metadata.sort_orders) {
320+
sort_orders_by_id.emplace(order->order_id(), order);
321+
}
322+
}
298323
};
299324

300325
TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
@@ -434,16 +459,95 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveSchemas(
434459

435460
/// \brief Add (or reuse) the given sort order and make it the table default.
///
/// \param order The sort order to use as default. A null pointer is recorded as
///        an invalid-argument error on the builder instead of being dereferenced.
/// \return This builder, for chaining.
TableMetadataBuilder& TableMetadataBuilder::SetDefaultSortOrder(
    std::shared_ptr<SortOrder> order) {
  // Guard against a null shared_ptr: dereferencing it below would be undefined
  // behaviour.
  if (order == nullptr) {
    return AddError(ErrorKind::kInvalidArgument,
                    "Cannot set default sort order: sort order is null");
  }
  BUILDER_ASSIGN_OR_RETURN(auto order_id, AddSortOrderInternal(*order));
  return SetDefaultSortOrder(order_id);
}
439465

440466
/// \brief Set the table's default sort order by ID.
///
/// Passing `kLastAdded` selects the sort order most recently added through this
/// builder; if none has been added, an invalid-argument error is recorded.
/// Setting the ID that is already the default is a no-op and records no change.
///
/// \param order_id The sort order ID, or `kLastAdded`.
/// \return This builder, for chaining.
TableMetadataBuilder& TableMetadataBuilder::SetDefaultSortOrder(int32_t order_id) {
  // Use the file's named sentinel instead of a bare -1 so the check and the
  // recorded change below cannot drift apart.
  if (order_id == kLastAdded) {
    if (!impl_->last_added_order_id.has_value()) {
      return AddError(ErrorKind::kInvalidArgument,
                      "Cannot set last added sort order: no sort order has been added");
    }
    return SetDefaultSortOrder(impl_->last_added_order_id.value());
  }

  if (order_id == impl_->metadata.default_sort_order_id) {
    return *this;
  }

  impl_->metadata.default_sort_order_id = order_id;

  // When the new default is the order added last in this change set, the change
  // is recorded with the sentinel rather than the concrete ID.
  if (impl_->last_added_order_id == order_id) {
    impl_->changes.push_back(std::make_unique<table::SetDefaultSortOrder>(kLastAdded));
  } else {
    impl_->changes.push_back(std::make_unique<table::SetDefaultSortOrder>(order_id));
  }
  return *this;
}
488+
489+
/// \brief Register a sort order with the builder and return the ID it ends up with.
///
/// If an equivalent order is already indexed, its ID is returned and no change is
/// recorded. Otherwise the order is validated against the current schema, stored,
/// indexed, and an AddSortOrder change is appended.
///
/// \param order The sort order to add.
/// \return The reused or newly assigned order ID, or the error from the schema
///         lookup, validation, or SortOrder::Make.
Result<int32_t> TableMetadataBuilder::AddSortOrderInternal(const SortOrder& order) {
  const int32_t order_id = ReuseOrCreateNewSortOrderId(order);

  if (impl_->sort_orders_by_id.contains(order_id)) {
    // Already known. It becomes the "last added" order only if it was introduced
    // by an AddSortOrder change in this set of changes; otherwise the marker is
    // cleared.
    const bool added_in_this_change_set =
        impl_->last_added_order_id.has_value() &&
        std::ranges::any_of(impl_->changes, [order_id](const auto& change) {
          auto* added = dynamic_cast<table::AddSortOrder*>(change.get());
          return added != nullptr && added->sort_order()->order_id() == order_id;
        });
    if (added_in_this_change_set) {
      impl_->last_added_order_id = order_id;
    } else {
      impl_->last_added_order_id = std::nullopt;
    }
    return order_id;
  }

  // A new order must be valid against the current schema before it is stored.
  ICEBERG_ASSIGN_OR_RAISE(auto current_schema, impl_->metadata.Schema());
  ICEBERG_RETURN_UNEXPECTED(order.Validate(*current_schema));

  std::shared_ptr<SortOrder> stored_order;
  if (order.is_unsorted()) {
    stored_order = SortOrder::Unsorted();
  } else {
    // Unlike freshSortOrder from Java impl, we don't use field name from old bound
    // schema to rebuild the sort order; the fields are copied under the new ID.
    ICEBERG_ASSIGN_OR_RAISE(
        stored_order,
        SortOrder::Make(order_id, std::vector<SortField>(order.fields().begin(),
                                                         order.fields().end())));
  }

  impl_->metadata.sort_orders.push_back(stored_order);
  impl_->sort_orders_by_id.emplace(order_id, stored_order);

  impl_->changes.push_back(std::make_unique<table::AddSortOrder>(stored_order));
  impl_->last_added_order_id = order_id;
  return order_id;
}
443530

444531
/// \brief Add a sort order to the table metadata being built.
///
/// \param order The sort order to add. A null pointer is recorded as an
///        invalid-argument error on the builder instead of being dereferenced.
/// \return This builder, for chaining.
TableMetadataBuilder& TableMetadataBuilder::AddSortOrder(
    std::shared_ptr<SortOrder> order) {
  // Guard against a null shared_ptr: dereferencing it below would be undefined
  // behaviour.
  if (order == nullptr) {
    return AddError(ErrorKind::kInvalidArgument,
                    "Cannot add sort order: sort order is null");
  }
  // Only the side effects matter here; the resulting ID is intentionally unused.
  BUILDER_ASSIGN_OR_RETURN(auto order_id, AddSortOrderInternal(*order));
  return *this;
}
536+
537+
int32_t TableMetadataBuilder::ReuseOrCreateNewSortOrderId(const SortOrder& new_order) {
538+
if (new_order.is_unsorted()) {
539+
return SortOrder::kUnsortedOrderId;
540+
}
541+
// determine the next order id
542+
int32_t new_order_id = SortOrder::kInitialSortOrderId;
543+
for (const auto& order : impl_->metadata.sort_orders) {
544+
if (order->SameOrder(new_order)) {
545+
return order->order_id();
546+
} else if (new_order_id <= order->order_id()) {
547+
new_order_id = order->order_id() + 1;
548+
}
549+
}
550+
return new_order_id;
447551
}
448552

449553
TableMetadataBuilder& TableMetadataBuilder::AddSnapshot(

src/iceberg/table_metadata.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,17 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector {
436436
/// \brief Private constructor for building from existing metadata
437437
explicit TableMetadataBuilder(const TableMetadata* base);
438438

439+
/// \brief Internal method to add a sort order and return its ID
440+
/// \param order The sort order to add
441+
/// \return The ID of the added or reused sort order
442+
Result<int32_t> AddSortOrderInternal(const SortOrder& order);
443+
444+
/// \brief Internal method to check for existing sort order and reuse its ID or create a
445+
/// new one
446+
/// \param new_order The sort order to check
447+
/// \return The ID to use for this sort order (reused if exists, new otherwise)
448+
int32_t ReuseOrCreateNewSortOrderId(const SortOrder& new_order);
449+
439450
/// Internal state members
440451
struct Impl;
441452
std::unique_ptr<Impl> impl_;

src/iceberg/table_requirements.cc

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <memory>
2323

24+
#include "iceberg/snapshot.h"
2425
#include "iceberg/table_metadata.h"
2526
#include "iceberg/table_requirement.h"
2627
#include "iceberg/table_update.h"
@@ -36,12 +37,73 @@ Result<std::vector<std::unique_ptr<TableRequirement>>> TableUpdateContext::Build
3637
return std::move(requirements_);
3738
}
3839

40+
void TableUpdateContext::RequireLastAssignedFieldIdUnchanged() {
41+
if (!added_last_assigned_field_id_) {
42+
if (base_ != nullptr) {
43+
AddRequirement(
44+
std::make_unique<table::AssertLastAssignedFieldId>(base_->last_column_id));
45+
}
46+
added_last_assigned_field_id_ = true;
47+
}
48+
}
49+
50+
void TableUpdateContext::RequireCurrentSchemaIdUnchanged() {
51+
if (!added_current_schema_id_) {
52+
if (base_ != nullptr && !is_replace_) {
53+
AddRequirement(std::make_unique<table::AssertCurrentSchemaID>(
54+
base_->current_schema_id.value()));
55+
}
56+
added_current_schema_id_ = true;
57+
}
58+
}
59+
60+
void TableUpdateContext::RequireLastAssignedPartitionIdUnchanged() {
61+
if (!added_last_assigned_partition_id_) {
62+
if (base_ != nullptr) {
63+
AddRequirement(std::make_unique<table::AssertLastAssignedPartitionId>(
64+
base_->last_partition_id));
65+
}
66+
added_last_assigned_partition_id_ = true;
67+
}
68+
}
69+
70+
void TableUpdateContext::RequireDefaultSpecIdUnchanged() {
71+
if (!added_default_spec_id_) {
72+
if (base_ != nullptr && !is_replace_) {
73+
AddRequirement(
74+
std::make_unique<table::AssertDefaultSpecID>(base_->default_spec_id));
75+
}
76+
added_default_spec_id_ = true;
77+
}
78+
}
79+
80+
void TableUpdateContext::RequireDefaultSortOrderIdUnchanged() {
81+
if (!added_default_sort_order_id_) {
82+
if (base_ != nullptr && !is_replace_) {
83+
AddRequirement(std::make_unique<table::AssertDefaultSortOrderID>(
84+
base_->default_sort_order_id));
85+
}
86+
added_default_sort_order_id_ = true;
87+
}
88+
}
89+
90+
void TableUpdateContext::RequireNoBranchesChanged() {
91+
if (base_ != nullptr && !is_replace_) {
92+
for (const auto& [name, ref] : base_->refs) {
93+
if (ref->type() == SnapshotRefType::kBranch && name != SnapshotRef::kMainBranch) {
94+
AddRequirement(
95+
std::make_unique<table::AssertRefSnapshotID>(name, ref->snapshot_id));
96+
}
97+
}
98+
}
99+
}
100+
39101
/// \brief Build the requirements for a commit that creates a table.
///
/// Starts with a requirement that the table does not exist yet, then lets every
/// update add its own requirements to the shared context.
///
/// \param table_updates The updates that make up the create-table commit.
/// \return The accumulated requirements, as produced by TableUpdateContext::Build().
Result<std::vector<std::unique_ptr<TableRequirement>>> TableRequirements::ForCreateTable(
    const std::vector<std::unique_ptr<TableUpdate>>& table_updates) {
  // A create has no base metadata and is not a replace.
  TableUpdateContext create_context(nullptr, false);
  create_context.AddRequirement(std::make_unique<table::AssertDoesNotExist>());

  for (const auto& table_update : table_updates) {
    table_update->GenerateRequirements(create_context);
  }
  return create_context.Build();
}
@@ -52,7 +114,7 @@ Result<std::vector<std::unique_ptr<TableRequirement>>> TableRequirements::ForRep
52114
TableUpdateContext context(&base, true);
53115
context.AddRequirement(std::make_unique<table::AssertUUID>(base.table_uuid));
54116
for (const auto& update : table_updates) {
55-
ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context));
117+
update->GenerateRequirements(context);
56118
}
57119
return context.Build();
58120
}
@@ -63,7 +125,7 @@ Result<std::vector<std::unique_ptr<TableRequirement>>> TableRequirements::ForUpd
63125
TableUpdateContext context(&base, false);
64126
context.AddRequirement(std::make_unique<table::AssertUUID>(base.table_uuid));
65127
for (const auto& update : table_updates) {
66-
ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context));
128+
update->GenerateRequirements(context);
67129
}
68130
return context.Build();
69131
}

src/iceberg/table_requirements.h

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -68,27 +68,19 @@ class ICEBERG_EXPORT TableUpdateContext {
6868
/// \brief Build and return the list of requirements
6969
Result<std::vector<std::unique_ptr<TableRequirement>>> Build();
7070

71-
// Getters for deduplication flags
72-
bool added_last_assigned_field_id() const { return added_last_assigned_field_id_; }
73-
bool added_current_schema_id() const { return added_current_schema_id_; }
74-
bool added_last_assigned_partition_id() const {
75-
return added_last_assigned_partition_id_;
76-
}
77-
bool added_default_spec_id() const { return added_default_spec_id_; }
78-
bool added_default_sort_order_id() const { return added_default_sort_order_id_; }
79-
80-
// Setters for deduplication flags
81-
void set_added_last_assigned_field_id(bool value) {
82-
added_last_assigned_field_id_ = value;
83-
}
84-
void set_added_current_schema_id(bool value) { added_current_schema_id_ = value; }
85-
void set_added_last_assigned_partition_id(bool value) {
86-
added_last_assigned_partition_id_ = value;
87-
}
88-
void set_added_default_spec_id(bool value) { added_default_spec_id_ = value; }
89-
void set_added_default_sort_order_id(bool value) {
90-
added_default_sort_order_id_ = value;
91-
}
71+
// Helper methods to deduplicate requirements to add.
72+
/// \brief Require that the last assigned field ID remains unchanged
73+
void RequireLastAssignedFieldIdUnchanged();
74+
/// \brief Require that the current schema ID remains unchanged
75+
void RequireCurrentSchemaIdUnchanged();
76+
/// \brief Require that the last assigned partition ID remains unchanged
77+
void RequireLastAssignedPartitionIdUnchanged();
78+
/// \brief Require that the default spec ID remains unchanged
79+
void RequireDefaultSpecIdUnchanged();
80+
/// \brief Require that the default sort order ID remains unchanged
81+
void RequireDefaultSortOrderIdUnchanged();
82+
/// \brief Require that no branches have been changed
83+
void RequireNoBranchesChanged();
9284

9385
private:
9486
const TableMetadata* base_;

0 commit comments

Comments
 (0)