Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 111 additions & 7 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <format>
#include <optional>
#include <ranges>
#include <string>
#include <unordered_map>

#include <nlohmann/json.hpp>

Expand All @@ -39,12 +42,11 @@
#include "iceberg/util/gzip_internal.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/uuid.h"

namespace iceberg {

namespace {
const TimePointMs kInvalidLastUpdatedMs = TimePointMs::min();
}
constexpr int32_t kLastAdded = -1;
} // namespace

std::string ToString(const SnapshotLogEntry& entry) {
return std::format("SnapshotLogEntry[timestampMillis={},snapshotId={}]",
Expand Down Expand Up @@ -274,11 +276,19 @@ struct TableMetadataBuilder::Impl {

// Change tracking
std::vector<std::unique_ptr<TableUpdate>> changes;
std::optional<int32_t> last_added_schema_id;
std::optional<int32_t> last_added_order_id;
std::optional<int32_t> last_added_spec_id;

// Metadata location tracking
std::optional<std::string> metadata_location;
std::optional<std::string> previous_metadata_location;

// indexes for convenience
std::unordered_map<int32_t, std::shared_ptr<Schema>> schemas_by_id;
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id;
std::unordered_map<int32_t, std::shared_ptr<SortOrder>> sort_orders_by_id;

// Constructor for new table
explicit Impl(int8_t format_version) : base(nullptr), metadata{} {
metadata.format_version = format_version;
Expand All @@ -294,7 +304,22 @@ struct TableMetadataBuilder::Impl {

// Constructor from existing metadata
explicit Impl(const TableMetadata* base_metadata)
: base(base_metadata), metadata(*base_metadata) {}
: base(base_metadata), metadata(*base_metadata) {
// Initialize index maps from base metadata
for (const auto& schema : metadata.schemas) {
if (schema->schema_id().has_value()) {
schemas_by_id.emplace(schema->schema_id().value(), schema);
}
}

for (const auto& spec : metadata.partition_specs) {
specs_by_id.emplace(spec->spec_id(), spec);
}

for (const auto& order : metadata.sort_orders) {
sort_orders_by_id.emplace(order->order_id(), order);
}
}
};

TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
Expand Down Expand Up @@ -434,16 +459,95 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveSchemas(

TableMetadataBuilder& TableMetadataBuilder::SetDefaultSortOrder(
std::shared_ptr<SortOrder> order) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
BUILDER_ASSIGN_OR_RETURN(auto order_id, AddSortOrderInternal(*order));
return SetDefaultSortOrder(order_id);
}

TableMetadataBuilder& TableMetadataBuilder::SetDefaultSortOrder(int32_t order_id) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
if (order_id == -1) {
if (!impl_->last_added_order_id.has_value()) {
return AddError(ErrorKind::kInvalidArgument,
"Cannot set last added sort order: no sort order has been added");
}
return SetDefaultSortOrder(impl_->last_added_order_id.value());
}

if (order_id == impl_->metadata.default_sort_order_id) {
return *this;
}

impl_->metadata.default_sort_order_id = order_id;

if (impl_->last_added_order_id == std::make_optional(order_id)) {
impl_->changes.push_back(std::make_unique<table::SetDefaultSortOrder>(kLastAdded));
} else {
impl_->changes.push_back(std::make_unique<table::SetDefaultSortOrder>(order_id));
}
return *this;
}

Result<int32_t> TableMetadataBuilder::AddSortOrderInternal(const SortOrder& order) {
int32_t new_order_id = ReuseOrCreateNewSortOrderId(order);

if (impl_->sort_orders_by_id.find(new_order_id) != impl_->sort_orders_by_id.end()) {
// update last_added_order_id if the order was added in this set of changes (since it
// is now the last)
bool is_new_order =
impl_->last_added_order_id.has_value() &&
std::ranges::find_if(impl_->changes, [new_order_id](const auto& change) {
auto* add_sort_order = dynamic_cast<table::AddSortOrder*>(change.get());
return add_sort_order &&
add_sort_order->sort_order()->order_id() == new_order_id;
}) != impl_->changes.cend();
impl_->last_added_order_id =
is_new_order ? std::make_optional(new_order_id) : std::nullopt;
return new_order_id;
}

// Get current schema and validate the sort order against it
ICEBERG_ASSIGN_OR_RAISE(auto schema, impl_->metadata.Schema());
ICEBERG_RETURN_UNEXPECTED(order.Validate(*schema));

std::shared_ptr<SortOrder> new_order;
if (order.is_unsorted()) {
new_order = SortOrder::Unsorted();
} else {
// Unlike freshSortOrder from Java impl, we don't use field name from old bound
// schema to rebuild the sort order.
ICEBERG_ASSIGN_OR_RAISE(
new_order,
SortOrder::Make(new_order_id, std::vector<SortField>(order.fields().begin(),
order.fields().end())));
}

impl_->metadata.sort_orders.push_back(new_order);
impl_->sort_orders_by_id.emplace(new_order_id, new_order);

impl_->changes.push_back(std::make_unique<table::AddSortOrder>(new_order));
impl_->last_added_order_id = new_order_id;
return new_order_id;
}

TableMetadataBuilder& TableMetadataBuilder::AddSortOrder(
std::shared_ptr<SortOrder> order) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
BUILDER_ASSIGN_OR_RETURN(auto order_id, AddSortOrderInternal(*order));
return *this;
}

int32_t TableMetadataBuilder::ReuseOrCreateNewSortOrderId(const SortOrder& new_order) {
if (new_order.is_unsorted()) {
return SortOrder::kUnsortedOrderId;
}
// determine the next order id
int32_t new_order_id = SortOrder::kInitialSortOrderId;
for (const auto& order : impl_->metadata.sort_orders) {
if (order->SameOrder(new_order)) {
return order->order_id();
} else if (new_order_id <= order->order_id()) {
new_order_id = order->order_id() + 1;
}
}
return new_order_id;
}

TableMetadataBuilder& TableMetadataBuilder::AddSnapshot(
Expand Down
11 changes: 11 additions & 0 deletions src/iceberg/table_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,17 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector {
/// \brief Private constructor for building from existing metadata
explicit TableMetadataBuilder(const TableMetadata* base);

/// \brief Internal method to add a sort order and return its ID
/// \param order The sort order to add
/// \return The ID of the added or reused sort order
Result<int32_t> AddSortOrderInternal(const SortOrder& order);

/// \brief Internal method to check for existing sort order and reuse its ID or create a
/// new one
/// \param new_order The sort order to check
/// \return The ID to use for this sort order (reused if exists, new otherwise)
int32_t ReuseOrCreateNewSortOrderId(const SortOrder& new_order);

/// Internal state members
struct Impl;
std::unique_ptr<Impl> impl_;
Expand Down
Loading
Loading