Skip to content

Commit 2e545cd

Browse files
committed
feat:implement AddSortOrder for table metadata
1 parent 52b9c5e commit 2e545cd

File tree

5 files changed

+269
-13
lines changed

5 files changed

+269
-13
lines changed

src/iceberg/table_metadata.cc

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@
2121

2222
#include <algorithm>
2323
#include <chrono>
24+
#include <cstdint>
2425
#include <format>
26+
#include <optional>
2527
#include <string>
28+
#include <unordered_map>
2629

2730
#include <nlohmann/json.hpp>
2831

@@ -216,6 +219,9 @@ struct TableMetadataBuilder::Impl {
216219

217220
// Change tracking
218221
std::vector<std::unique_ptr<TableUpdate>> changes;
222+
std::optional<int32_t> last_added_schema_id;
223+
std::optional<int32_t> last_added_order_id;
224+
std::optional<int32_t> last_added_spec_id;
219225

220226
// Error collection (since methods return *this and cannot throw)
221227
std::vector<Error> errors;
@@ -224,6 +230,11 @@ struct TableMetadataBuilder::Impl {
224230
std::optional<std::string> metadata_location;
225231
std::optional<std::string> previous_metadata_location;
226232

233+
// indexes for convenience
234+
std::unordered_map<int32_t, std::shared_ptr<Schema>> schemas_by_id;
235+
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id;
236+
std::unordered_map<int32_t, std::shared_ptr<SortOrder>> sort_orders_by_id;
237+
227238
// Constructor for new table
228239
explicit Impl(int8_t format_version) : base(nullptr), metadata{} {
229240
metadata.format_version = format_version;
@@ -239,7 +250,38 @@ struct TableMetadataBuilder::Impl {
239250

240251
// Constructor from existing metadata
241252
explicit Impl(const TableMetadata* base_metadata)
242-
: base(base_metadata), metadata(*base_metadata) {}
253+
: base(base_metadata), metadata(*base_metadata) {
254+
// Initialize index maps from base metadata
255+
for (const auto& schema : metadata.schemas) {
256+
if (schema->schema_id().has_value()) {
257+
schemas_by_id.emplace(schema->schema_id().value(), schema);
258+
}
259+
}
260+
261+
for (const auto& spec : metadata.partition_specs) {
262+
specs_by_id.emplace(spec->spec_id(), spec);
263+
}
264+
265+
for (const auto& order : metadata.sort_orders) {
266+
sort_orders_by_id.emplace(order->order_id(), order);
267+
}
268+
}
269+
270+
int32_t reuseOrCreateNewSortOrderId(const SortOrder& new_order) {
271+
if (new_order.is_unsorted()) {
272+
return SortOrder::kUnsortedOrderId;
273+
}
274+
// determine the next order id
275+
int32_t new_order_id = SortOrder::kInitialSortOrderId;
276+
for (const auto& order : metadata.sort_orders) {
277+
if (order->SameOrder(new_order)) {
278+
return order->order_id();
279+
} else if (new_order_id <= order->order_id()) {
280+
new_order_id = order->order_id() + 1;
281+
}
282+
}
283+
return new_order_id;
284+
}
243285
};
244286

245287
TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
@@ -361,7 +403,68 @@ TableMetadataBuilder& TableMetadataBuilder::SetDefaultSortOrder(int32_t order_id
361403

362404
TableMetadataBuilder& TableMetadataBuilder::AddSortOrder(
363405
std::shared_ptr<SortOrder> order) {
364-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
406+
int32_t new_order_id = impl_->reuseOrCreateNewSortOrderId(*order);
407+
408+
if (impl_->sort_orders_by_id.find(new_order_id) != impl_->sort_orders_by_id.end()) {
409+
// update last_added_order_id if the order was added in this set of changes (since it
410+
// is now the last)
411+
bool is_new_order = false;
412+
for (const auto& change : impl_->changes) {
413+
auto* add_sort_order = dynamic_cast<table::AddSortOrder*>(change.get());
414+
if (add_sort_order && add_sort_order->sort_order()->order_id() == new_order_id) {
415+
is_new_order = true;
416+
break;
417+
}
418+
}
419+
impl_->last_added_order_id =
420+
is_new_order ? std::make_optional(new_order_id) : std::nullopt;
421+
return *this;
422+
}
423+
424+
// Get current schema for validation
425+
auto schema_result = impl_->metadata.Schema();
426+
if (!schema_result) {
427+
impl_->errors.emplace_back(
428+
ErrorKind::kInvalidArgument,
429+
std::format("Cannot find current schema: {}", schema_result.error().message));
430+
return *this;
431+
}
432+
433+
auto schema = schema_result.value();
434+
435+
// Validate the sort order against the schema
436+
auto validate_status = order->Validate(*schema);
437+
if (!validate_status) {
438+
impl_->errors.emplace_back(
439+
ErrorKind::kInvalidArgument,
440+
std::format("Sort order validation failed: {}", validate_status.error().message));
441+
return *this;
442+
}
443+
444+
std::shared_ptr<SortOrder> new_order;
445+
if (order->is_unsorted()) {
446+
new_order = SortOrder::Unsorted();
447+
} else {
448+
// TODO(Li Feiyang): rebuild the sort order using new column ids (freshSortOrder)
449+
// For now, create a new sort order with the provided fields
450+
auto sort_order_result = SortOrder::Make(
451+
*schema, new_order_id,
452+
std::vector<SortField>(order->fields().begin(), order->fields().end()));
453+
if (!sort_order_result) {
454+
impl_->errors.emplace_back(ErrorKind::kInvalidArgument,
455+
std::format("Failed to create sort order: {}",
456+
sort_order_result.error().message));
457+
return *this;
458+
}
459+
new_order = std::move(sort_order_result.value());
460+
}
461+
462+
impl_->metadata.sort_orders.push_back(new_order);
463+
impl_->sort_orders_by_id.emplace(new_order_id, new_order);
464+
465+
impl_->changes.push_back(std::make_unique<table::AddSortOrder>(new_order));
466+
impl_->last_added_order_id = new_order_id;
467+
return *this;
365468
}
366469

367470
TableMetadataBuilder& TableMetadataBuilder::AddSnapshot(

src/iceberg/table_update.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,12 @@ Status RemoveSchemas::GenerateRequirements(TableUpdateContext& context) const {
124124
// AddSortOrder
125125

126126
void AddSortOrder::ApplyTo(TableMetadataBuilder& builder) const {
127-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
127+
builder.AddSortOrder(sort_order_);
128128
}
129129

130130
Status AddSortOrder::GenerateRequirements(TableUpdateContext& context) const {
131-
return NotImplemented("AddTableSortOrder::GenerateRequirements not implemented");
131+
// AddSortOrder doesn't generate any requirements
132+
return {};
132133
}
133134

134135
// SetDefaultSortOrder

src/iceberg/test/table_metadata_builder_test.cc

Lines changed: 126 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,33 @@
1919

2020
#include <memory>
2121
#include <string>
22+
#include <vector>
2223

2324
#include <gtest/gtest.h>
2425

2526
#include "iceberg/partition_spec.h"
27+
#include "iceberg/schema.h"
2628
#include "iceberg/snapshot.h"
29+
#include "iceberg/sort_field.h"
2730
#include "iceberg/sort_order.h"
2831
#include "iceberg/table_metadata.h"
2932
#include "iceberg/table_update.h"
3033
#include "iceberg/test/matchers.h"
34+
#include "iceberg/transform.h"
35+
#include "iceberg/type.h"
3136

3237
namespace iceberg {
3338

3439
namespace {
3540

41+
// Helper function to create a simple schema for testing
42+
std::shared_ptr<Schema> CreateTestSchema() {
43+
auto field1 = SchemaField::MakeRequired(1, "id", int32());
44+
auto field2 = SchemaField::MakeRequired(2, "data", string());
45+
auto field3 = SchemaField::MakeRequired(3, "ts", timestamp());
46+
return std::make_shared<Schema>(std::vector<SchemaField>{field1, field2, field3}, 0);
47+
}
48+
3649
// Helper function to create base metadata for tests
3750
std::unique_ptr<TableMetadata> CreateBaseMetadata() {
3851
auto metadata = std::make_unique<TableMetadata>();
@@ -41,11 +54,14 @@ std::unique_ptr<TableMetadata> CreateBaseMetadata() {
4154
metadata->location = "s3://bucket/test";
4255
metadata->last_sequence_number = 0;
4356
metadata->last_updated_ms = TimePointMs{std::chrono::milliseconds(1000)};
44-
metadata->last_column_id = 0;
57+
metadata->last_column_id = 3;
58+
metadata->current_schema_id = 0;
59+
metadata->schemas.push_back(CreateTestSchema());
4560
metadata->default_spec_id = PartitionSpec::kInitialSpecId;
4661
metadata->last_partition_id = 0;
4762
metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId;
4863
metadata->default_sort_order_id = SortOrder::kInitialSortOrderId;
64+
metadata->sort_orders.push_back(SortOrder::Unsorted());
4965
metadata->next_row_id = TableMetadata::kInitialRowId;
5066
return metadata;
5167
}
@@ -124,17 +140,121 @@ TEST(TableMetadataBuilderTest, AssignUUID) {
124140
EXPECT_EQ(metadata->table_uuid, "TEST-UUID-ABCD"); // Original case preserved
125141
}
126142

143+
// Test AddSortOrder
144+
TEST(TableMetadataBuilderTest, AddSortOrderBasic) {
145+
auto base = CreateBaseMetadata();
146+
auto builder = TableMetadataBuilder::BuildFrom(base.get());
147+
auto schema = CreateTestSchema();
148+
149+
// 1. Add unsorted - should reuse existing unsorted order
150+
builder->AddSortOrder(SortOrder::Unsorted());
151+
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
152+
ASSERT_EQ(metadata->sort_orders.size(), 1);
153+
EXPECT_TRUE(metadata->sort_orders[0]->is_unsorted());
154+
155+
// 2. Add basic sort order
156+
builder = TableMetadataBuilder::BuildFrom(base.get());
157+
SortField field1(1, Transform::Identity(), SortDirection::kAscending,
158+
NullOrder::kFirst);
159+
ICEBERG_UNWRAP_OR_FAIL(auto order1,
160+
SortOrder::Make(*schema, 1, std::vector<SortField>{field1}));
161+
builder->AddSortOrder(std::move(order1));
162+
ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
163+
ASSERT_EQ(metadata->sort_orders.size(), 2);
164+
EXPECT_EQ(metadata->sort_orders[1]->order_id(), 1);
165+
166+
// 3. Add duplicate - should be idempotent
167+
builder = TableMetadataBuilder::BuildFrom(base.get());
168+
ICEBERG_UNWRAP_OR_FAIL(auto order2,
169+
SortOrder::Make(*schema, 1, std::vector<SortField>{field1}));
170+
ICEBERG_UNWRAP_OR_FAIL(auto order3,
171+
SortOrder::Make(*schema, 1, std::vector<SortField>{field1}));
172+
builder->AddSortOrder(std::move(order2));
173+
builder->AddSortOrder(std::move(order3)); // Duplicate
174+
ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
175+
ASSERT_EQ(metadata->sort_orders.size(), 2); // Only one added
176+
177+
// 4. Add multiple different orders + verify ID reassignment
178+
builder = TableMetadataBuilder::BuildFrom(base.get());
179+
SortField field2(2, Transform::Identity(), SortDirection::kDescending,
180+
NullOrder::kLast);
181+
// User provides ID=99, Builder should reassign to ID=1
182+
ICEBERG_UNWRAP_OR_FAIL(auto order4,
183+
SortOrder::Make(*schema, 99, std::vector<SortField>{field1}));
184+
ICEBERG_UNWRAP_OR_FAIL(
185+
auto order5, SortOrder::Make(*schema, 2, std::vector<SortField>{field1, field2}));
186+
builder->AddSortOrder(std::move(order4));
187+
builder->AddSortOrder(std::move(order5));
188+
ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
189+
ASSERT_EQ(metadata->sort_orders.size(), 3);
190+
EXPECT_EQ(metadata->sort_orders[1]->order_id(), 1); // Reassigned from 99
191+
EXPECT_EQ(metadata->sort_orders[2]->order_id(), 2);
192+
}
193+
194+
TEST(TableMetadataBuilderTest, AddSortOrderInvalid) {
195+
auto base = CreateBaseMetadata();
196+
auto schema = CreateTestSchema();
197+
198+
// 1. Invalid field ID
199+
auto builder = TableMetadataBuilder::BuildFrom(base.get());
200+
SortField invalid_field(999, Transform::Identity(), SortDirection::kAscending,
201+
NullOrder::kFirst);
202+
ICEBERG_UNWRAP_OR_FAIL(auto order1,
203+
SortOrder::Make(1, std::vector<SortField>{invalid_field}));
204+
builder->AddSortOrder(std::move(order1));
205+
ASSERT_THAT(builder->Build(), IsError(ErrorKind::kCommitFailed));
206+
ASSERT_THAT(builder->Build(),
207+
HasErrorMessage("Cannot find source column for sort field"));
208+
209+
// 2. Invalid transform (Day transform on string type)
210+
builder = TableMetadataBuilder::BuildFrom(base.get());
211+
SortField invalid_transform(2, Transform::Day(), SortDirection::kAscending,
212+
NullOrder::kFirst);
213+
ICEBERG_UNWRAP_OR_FAIL(auto order2,
214+
SortOrder::Make(1, std::vector<SortField>{invalid_transform}));
215+
builder->AddSortOrder(std::move(order2));
216+
ASSERT_THAT(builder->Build(), IsError(ErrorKind::kCommitFailed));
217+
ASSERT_THAT(builder->Build(), HasErrorMessage("Invalid source type"));
218+
219+
// 3. Without schema
220+
builder = TableMetadataBuilder::BuildFromEmpty(2);
221+
builder->AssignUUID("test-uuid");
222+
SortField field(1, Transform::Identity(), SortDirection::kAscending, NullOrder::kFirst);
223+
ICEBERG_UNWRAP_OR_FAIL(auto order3,
224+
SortOrder::Make(*schema, 1, std::vector<SortField>{field}));
225+
builder->AddSortOrder(std::move(order3));
226+
ASSERT_THAT(builder->Build(), IsError(ErrorKind::kCommitFailed));
227+
ASSERT_THAT(builder->Build(), HasErrorMessage("Cannot find current schema"));
228+
}
229+
127230
// Test applying TableUpdate to builder
128231
TEST(TableMetadataBuilderTest, ApplyUpdate) {
232+
// Use base metadata with schema for all update tests
233+
auto base = CreateBaseMetadata();
234+
auto builder = TableMetadataBuilder::BuildFrom(base.get());
235+
129236
// Apply AssignUUID update
130-
auto builder = TableMetadataBuilder::BuildFromEmpty(2);
131-
table::AssignUUID update("apply-uuid");
132-
update.ApplyTo(*builder);
133-
// TODO(Li Feiyang): Add more update and `apply` once other build methods are
134-
// implemented
237+
table::AssignUUID uuid_update("apply-uuid");
238+
uuid_update.ApplyTo(*builder);
239+
240+
// Apply AddSortOrder update
241+
auto schema = CreateTestSchema();
242+
SortField sort_field(1, Transform::Identity(), SortDirection::kAscending,
243+
NullOrder::kFirst);
244+
ICEBERG_UNWRAP_OR_FAIL(auto sort_order_unique,
245+
SortOrder::Make(*schema, 1, std::vector<SortField>{sort_field}));
246+
auto sort_order = std::shared_ptr<SortOrder>(std::move(sort_order_unique));
247+
248+
table::AddSortOrder sort_order_update(sort_order);
249+
sort_order_update.ApplyTo(*builder);
250+
251+
// TODO(Li Feiyang): Apply more build methods once they are implemented
135252

253+
// Verify all updates were applied
136254
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
137255
EXPECT_EQ(metadata->table_uuid, "apply-uuid");
256+
ASSERT_EQ(metadata->sort_orders.size(), 2);
257+
EXPECT_EQ(metadata->sort_orders[1]->order_id(), 1);
138258
}
139259

140260
} // namespace iceberg

0 commit comments

Comments
 (0)