diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 9c25015c3..0579c67d2 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -83,6 +83,7 @@ set(ICEBERG_SOURCES util/decimal.cc util/gzip_internal.cc util/murmurhash3_internal.cc + util/snapshot_util.cc util/temporal_util.cc util/timepoint.cc util/truncate_util.cc diff --git a/src/iceberg/avro/avro_stream_internal.cc b/src/iceberg/avro/avro_stream_internal.cc index f299b5233..e868bab4b 100644 --- a/src/iceberg/avro/avro_stream_internal.cc +++ b/src/iceberg/avro/avro_stream_internal.cc @@ -66,8 +66,9 @@ bool AvroInputStream::next(const uint8_t** data, size_t* len) { } void AvroInputStream::backup(size_t len) { - ICEBERG_CHECK(len <= buffer_pos_, "Cannot backup {} bytes, only {} bytes available", - len, buffer_pos_); + ICEBERG_CHECK_OR_DIE(len <= buffer_pos_, + "Cannot backup {} bytes, only {} bytes available", len, + buffer_pos_); buffer_pos_ -= len; byte_count_ -= len; @@ -88,7 +89,8 @@ size_t AvroInputStream::byteCount() const { return byte_count_; } void AvroInputStream::seek(int64_t position) { auto status = input_stream_->Seek(position); - ICEBERG_CHECK(status.ok(), "Failed to seek to {}, got {}", position, status.ToString()); + ICEBERG_CHECK_OR_DIE(status.ok(), "Failed to seek to {}, got {}", position, + status.ToString()); buffer_pos_ = 0; available_bytes_ = 0; @@ -116,8 +118,9 @@ bool AvroOutputStream::next(uint8_t** data, size_t* len) { } void AvroOutputStream::backup(size_t len) { - ICEBERG_CHECK(len <= buffer_pos_, "Cannot backup {} bytes, only {} bytes available", - len, buffer_pos_); + ICEBERG_CHECK_OR_DIE(len <= buffer_pos_, + "Cannot backup {} bytes, only {} bytes available", len, + buffer_pos_); buffer_pos_ -= len; } @@ -126,12 +129,12 @@ uint64_t AvroOutputStream::byteCount() const { return flushed_bytes_ + buffer_po void AvroOutputStream::flush() { if (buffer_pos_ > 0) { auto status = output_stream_->Write(buffer_.data(), buffer_pos_); - ICEBERG_CHECK(status.ok(), "Write failed {}", status.ToString()); + ICEBERG_CHECK_OR_DIE(status.ok(), "Write failed {}", status.ToString()); flushed_bytes_ += buffer_pos_; buffer_pos_ = 0; } auto status = output_stream_->Flush(); - ICEBERG_CHECK(status.ok(), "Flush failed {}", status.ToString()); + ICEBERG_CHECK_OR_DIE(status.ok(), "Flush failed {}", status.ToString()); } const std::shared_ptr<::arrow::io::OutputStream>& AvroOutputStream::arrow_output_stream() diff --git a/src/iceberg/exception.h b/src/iceberg/exception.h index bbb9c2283..0333eb12a 100644 --- a/src/iceberg/exception.h +++ b/src/iceberg/exception.h @@ -44,7 +44,7 @@ class ICEBERG_EXPORT ExpressionError : public IcebergError { explicit ExpressionError(const std::string& what) : IcebergError(what) {} }; -#define ICEBERG_CHECK(condition, ...) \ +#define ICEBERG_CHECK_OR_DIE(condition, ...) \ do { \ if (!(condition)) [[unlikely]] { \ throw iceberg::IcebergError(std::format(__VA_ARGS__)); \ diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 21a41dcf2..850f65905 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -105,6 +105,7 @@ iceberg_sources = files( 'util/decimal.cc', 'util/gzip_internal.cc', 'util/murmurhash3_internal.cc', + 'util/snapshot_util.cc', 'util/temporal_util.cc', 'util/timepoint.cc', 'util/truncate_util.cc', diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index e787b6fd2..71b3fa302 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -69,9 +69,9 @@ Result> TableMetadata::Schema() const { } Result> TableMetadata::SchemaById( - const std::optional& schema_id) const { + std::optional schema_id) const { auto iter = std::ranges::find_if(schemas, [schema_id](const auto& schema) { - return schema->schema_id() == schema_id; + return schema != nullptr && schema->schema_id() == schema_id; }); if (iter == schemas.end()) { return NotFound("Schema with ID {} is not found", schema_id.value_or(-1)); @@ -81,7 +81,7 @@ Result> TableMetadata::SchemaById( Result> TableMetadata::PartitionSpec() const { auto iter = std::ranges::find_if(partition_specs, [this](const auto& spec) { - return spec->spec_id() == default_spec_id; + return spec != nullptr && spec->spec_id() == default_spec_id; }); if (iter == partition_specs.end()) { return NotFound("Default partition spec is not found"); @@ -91,7 +91,7 @@ Result> TableMetadata::PartitionSpec() const { Result> TableMetadata::SortOrder() const { auto iter = std::ranges::find_if(sort_orders, [this](const auto& order) { - return order->order_id() == default_sort_order_id; + return order != nullptr && order->order_id() == default_sort_order_id; }); if (iter == sort_orders.end()) { return NotFound("Default sort order is not found"); @@ -105,7 +105,7 @@ Result> TableMetadata::Snapshot() const { Result> TableMetadata::SnapshotById(int64_t snapshot_id) const { auto iter = std::ranges::find_if(snapshots, [snapshot_id](const auto& snapshot) { - return snapshot->snapshot_id == snapshot_id; + return snapshot != nullptr && snapshot->snapshot_id == snapshot_id; }); if (iter == snapshots.end()) { return NotFound("Snapshot with ID {} is not found", snapshot_id); diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index f7c260114..3f2f36101 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -128,7 +128,7 @@ struct ICEBERG_EXPORT TableMetadata { Result> Schema() const; /// \brief Get the current schema by ID, return NotFoundError if not found Result> SchemaById( - const std::optional& schema_id) const; + std::optional schema_id) const; /// \brief Get the current partition spec, return NotFoundError if not found Result> PartitionSpec() const; /// \brief Get the current sort order, return NotFoundError if not found diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 2af7d1c4e..28178b883 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -70,6 +70,7 @@ add_iceberg_test(table_test SOURCES metrics_config_test.cc snapshot_test.cc + snapshot_util_test.cc table_metadata_builder_test.cc table_requirement_test.cc table_requirements_test.cc diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index 5ccab940c..fcd397b9e 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -47,6 +47,7 @@ iceberg_tests = { 'sources': files( 'metrics_config_test.cc', 'snapshot_test.cc', + 'snapshot_util_test.cc', 'table_metadata_builder_test.cc', 'table_requirement_test.cc', 'table_requirements_test.cc', diff --git a/src/iceberg/test/snapshot_util_test.cc b/src/iceberg/test/snapshot_util_test.cc new file mode 100644 index 000000000..e4e17251e --- /dev/null +++ b/src/iceberg/test/snapshot_util_test.cc @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#include + +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/sort_order.h" +#include "iceberg/table.h" +#include "iceberg/table_identifier.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/mock_catalog.h" +#include "iceberg/test/mock_io.h" +#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/timepoint.h" + +namespace iceberg { + +namespace { + +// Schema for testing: id (int32), data (string) +std::shared_ptr CreateTestSchema() { + auto field1 = SchemaField::MakeRequired(1, "id", int32()); + auto field2 = SchemaField::MakeRequired(2, "data", string()); + return std::make_shared(std::vector{field1, field2}, 0); +} + +// Helper to create a snapshot +std::shared_ptr CreateSnapshot(int64_t snapshot_id, + std::optional parent_id, + int64_t sequence_number, + TimePointMs timestamp_ms) { + return std::make_shared( + Snapshot{.snapshot_id = snapshot_id, + .parent_snapshot_id = parent_id, + .sequence_number = sequence_number, + .timestamp_ms = timestamp_ms, + .manifest_list = + "s3://bucket/manifest-list-" + std::to_string(snapshot_id) + ".avro", + .summary = {}, + .schema_id = 0}); +} + +// Helper to create table metadata with snapshots +std::shared_ptr CreateTableMetadataWithSnapshots( + int64_t base_snapshot_id, int64_t main1_snapshot_id, int64_t main2_snapshot_id, + int64_t branch_snapshot_id, int64_t fork0_snapshot_id, int64_t fork1_snapshot_id, + int64_t fork2_snapshot_id, TimePointMs base_timestamp) { + auto metadata = std::make_shared(); + metadata->format_version = 2; + metadata->table_uuid = "test-uuid-1234"; + metadata->location = "s3://bucket/test"; + metadata->last_sequence_number = 10; + metadata->last_updated_ms = base_timestamp + std::chrono::milliseconds(3000); + metadata->last_column_id = 2; + metadata->current_schema_id = 0; + metadata->schemas.push_back(CreateTestSchema()); + metadata->default_spec_id = PartitionSpec::kInitialSpecId; + metadata->last_partition_id = 0; + metadata->current_snapshot_id = main2_snapshot_id; + metadata->default_sort_order_id = SortOrder::kInitialSortOrderId; + metadata->sort_orders.push_back(SortOrder::Unsorted()); + metadata->next_row_id = TableMetadata::kInitialRowId; + metadata->properties = TableProperties::default_properties(); + + // Create snapshots: base -> main1 -> main2 + auto base_snapshot = CreateSnapshot(base_snapshot_id, std::nullopt, 1, base_timestamp); + auto main1_snapshot = CreateSnapshot(main1_snapshot_id, base_snapshot_id, 2, + base_timestamp + std::chrono::milliseconds(1000)); + auto main2_snapshot = CreateSnapshot(main2_snapshot_id, main1_snapshot_id, 3, + base_timestamp + std::chrono::milliseconds(2000)); + + // Branch snapshot (from base) + auto branch_snapshot = CreateSnapshot(branch_snapshot_id, base_snapshot_id, 4, + base_timestamp + std::chrono::milliseconds(1500)); + + // Fork branch snapshots: fork0 -> fork1 -> fork2 (fork0 will be expired) + auto fork0_snapshot = CreateSnapshot(fork0_snapshot_id, base_snapshot_id, 5, + base_timestamp + std::chrono::milliseconds(500)); + auto fork1_snapshot = CreateSnapshot(fork1_snapshot_id, fork0_snapshot_id, 6, + base_timestamp + std::chrono::milliseconds(2500)); + auto fork2_snapshot = CreateSnapshot(fork2_snapshot_id, fork1_snapshot_id, 7, + base_timestamp + std::chrono::milliseconds(3000)); + + metadata->snapshots = {base_snapshot, main1_snapshot, main2_snapshot, + branch_snapshot, fork1_snapshot, fork2_snapshot}; + // Note: fork0 is expired, so it's not in the snapshots list + + // Snapshot log + metadata->snapshot_log = { + {.timestamp_ms = base_timestamp, .snapshot_id = base_snapshot_id}, + {.timestamp_ms = base_timestamp + std::chrono::milliseconds(1000), + .snapshot_id = main1_snapshot_id}, + {.timestamp_ms = base_timestamp + std::chrono::milliseconds(2000), + .snapshot_id = main2_snapshot_id}, + }; + + // Create refs + std::string branch_name = "b1"; + metadata->refs[branch_name] = std::make_shared( + SnapshotRef{.snapshot_id = branch_snapshot_id, .retention = SnapshotRef::Branch{}}); + + std::string fork_branch = "fork"; + metadata->refs[fork_branch] = std::make_shared( + SnapshotRef{.snapshot_id = fork2_snapshot_id, .retention = SnapshotRef::Branch{}}); + + return metadata; +} + +// Helper to extract snapshot IDs from a vector of snapshots +std::vector ExtractSnapshotIds( + const std::vector>& snapshots) { + std::vector ids; + ids.reserve(snapshots.size()); + for (const auto& snapshot : snapshots) { + ids.push_back(snapshot->snapshot_id); + } + return ids; +} + +} // namespace + +class SnapshotUtilTest : public ::testing::Test { + protected: + void SetUp() override { + base_timestamp_ = TimePointMs{std::chrono::milliseconds(1000000000000)}; + base_snapshot_id_ = 100; + main1_snapshot_id_ = 101; + main2_snapshot_id_ = 102; + branch_snapshot_id_ = 200; + fork0_snapshot_id_ = 300; + fork1_snapshot_id_ = 301; + fork2_snapshot_id_ = 302; + + auto metadata = CreateTableMetadataWithSnapshots( + base_snapshot_id_, main1_snapshot_id_, main2_snapshot_id_, branch_snapshot_id_, + fork0_snapshot_id_, fork1_snapshot_id_, fork2_snapshot_id_, base_timestamp_); + + TableIdentifier table_ident{.ns = {}, .name = "test"}; + auto io = std::make_shared(); + auto catalog = std::make_shared(); + table_ = std::move(Table::Make(table_ident, std::move(metadata), + "s3://bucket/test/metadata.json", io, catalog) + .value()); + } + + TimePointMs base_timestamp_; + int64_t base_snapshot_id_; + int64_t main1_snapshot_id_; + int64_t main2_snapshot_id_; + int64_t branch_snapshot_id_; + int64_t fork0_snapshot_id_; + int64_t fork1_snapshot_id_; + int64_t fork2_snapshot_id_; + std::shared_ptr table_; +}; + +TEST_F(SnapshotUtilTest, IsParentAncestorOf) { + ICEBERG_UNWRAP_OR_FAIL( + auto result1, + SnapshotUtil::IsParentAncestorOf(*table_, main1_snapshot_id_, base_snapshot_id_)); + EXPECT_TRUE(result1); + + ICEBERG_UNWRAP_OR_FAIL( + auto result2, + SnapshotUtil::IsParentAncestorOf(*table_, branch_snapshot_id_, main1_snapshot_id_)); + EXPECT_FALSE(result2); + + // fork2's parent is fork1, fork1's parent is fork0 (expired) + ICEBERG_UNWRAP_OR_FAIL( + auto result3, + SnapshotUtil::IsParentAncestorOf(*table_, fork2_snapshot_id_, fork0_snapshot_id_)); + EXPECT_TRUE(result3); +} + +TEST_F(SnapshotUtilTest, IsAncestorOf) { + ICEBERG_UNWRAP_OR_FAIL( + auto result1, + SnapshotUtil::IsAncestorOf(*table_, main1_snapshot_id_, base_snapshot_id_)); + EXPECT_TRUE(result1); + + ICEBERG_UNWRAP_OR_FAIL( + auto result2, + SnapshotUtil::IsAncestorOf(*table_, branch_snapshot_id_, main1_snapshot_id_)); + EXPECT_FALSE(result2); + + // fork2 -> fork1 -> fork0 (expired, not in snapshots) + ICEBERG_UNWRAP_OR_FAIL( + auto result3, + SnapshotUtil::IsAncestorOf(*table_, fork2_snapshot_id_, fork0_snapshot_id_)); + EXPECT_FALSE(result3); // fork0 is expired, so not found + + // Test with current snapshot + ICEBERG_UNWRAP_OR_FAIL(auto result4, + SnapshotUtil::IsAncestorOf(*table_, main1_snapshot_id_)); + EXPECT_TRUE(result4); + + ICEBERG_UNWRAP_OR_FAIL(auto result5, + SnapshotUtil::IsAncestorOf(*table_, branch_snapshot_id_)); + EXPECT_FALSE(result5); +} + +TEST_F(SnapshotUtilTest, CurrentAncestors) { + ICEBERG_UNWRAP_OR_FAIL(auto ancestors, SnapshotUtil::CurrentAncestors(*table_)); + auto ids = ExtractSnapshotIds(ancestors); + EXPECT_EQ(ids, std::vector( + {main2_snapshot_id_, main1_snapshot_id_, base_snapshot_id_})); + + ICEBERG_UNWRAP_OR_FAIL(auto ancestor_ids, SnapshotUtil::CurrentAncestorIds(*table_)); + EXPECT_EQ(ancestor_ids, std::vector({main2_snapshot_id_, main1_snapshot_id_, + base_snapshot_id_})); +} + +TEST_F(SnapshotUtilTest, OldestAncestor) { + ICEBERG_UNWRAP_OR_FAIL(auto oldest, SnapshotUtil::OldestAncestor(*table_)); + ASSERT_TRUE(oldest.has_value()); + EXPECT_EQ(oldest.value()->snapshot_id, base_snapshot_id_); + + ICEBERG_UNWRAP_OR_FAIL(auto oldest_of_main2, + SnapshotUtil::OldestAncestorOf(*table_, main2_snapshot_id_)); + ASSERT_TRUE(oldest_of_main2.has_value()); + EXPECT_EQ(oldest_of_main2.value()->snapshot_id, base_snapshot_id_); + + ICEBERG_UNWRAP_OR_FAIL(auto oldest_after, + SnapshotUtil::OldestAncestorAfter( + *table_, base_timestamp_ + std::chrono::milliseconds(1))); + ASSERT_TRUE(oldest_after.has_value()); + EXPECT_EQ(oldest_after.value()->snapshot_id, main1_snapshot_id_); +} + +TEST_F(SnapshotUtilTest, SnapshotsBetween) { + ICEBERG_UNWRAP_OR_FAIL( + auto snapshot_ids, + SnapshotUtil::SnapshotIdsBetween(*table_, base_snapshot_id_, main2_snapshot_id_)); + EXPECT_EQ(snapshot_ids, std::vector({main2_snapshot_id_, main1_snapshot_id_})); + + ICEBERG_UNWRAP_OR_FAIL( + auto ancestors_between1, + SnapshotUtil::AncestorsBetween(*table_, main2_snapshot_id_, main1_snapshot_id_)); + auto ids1 = ExtractSnapshotIds(ancestors_between1); + EXPECT_EQ(ids1, std::vector({main2_snapshot_id_})); + + ICEBERG_UNWRAP_OR_FAIL( + auto ancestors_between2, + SnapshotUtil::AncestorsBetween(*table_, main2_snapshot_id_, branch_snapshot_id_)); + auto ids2 = ExtractSnapshotIds(ancestors_between2); + EXPECT_EQ(ids2, std::vector( + {main2_snapshot_id_, main1_snapshot_id_, base_snapshot_id_})); +} + +TEST_F(SnapshotUtilTest, AncestorsOf) { + // Test ancestors of fork2: fork2 -> fork1 (fork0 is expired, not in snapshots) + ICEBERG_UNWRAP_OR_FAIL(auto ancestors, + SnapshotUtil::AncestorsOf(*table_, fork2_snapshot_id_)); + auto ids = ExtractSnapshotIds(ancestors); + EXPECT_EQ(ids, std::vector({fork2_snapshot_id_, fork1_snapshot_id_})); +} + +TEST_F(SnapshotUtilTest, SchemaForRef) { + ICEBERG_UNWRAP_OR_FAIL(auto initial_schema, table_->schema()); + ASSERT_NE(initial_schema, nullptr); + + // Test with null/empty ref (main branch) + ICEBERG_UNWRAP_OR_FAIL(auto schema1, SnapshotUtil::SchemaFor(*table_, "")); + EXPECT_EQ(schema1->fields().size(), initial_schema->fields().size()); + + // Test with non-existing ref + ICEBERG_UNWRAP_OR_FAIL(auto schema2, + SnapshotUtil::SchemaFor(*table_, "non-existing-ref")); + EXPECT_EQ(schema2->fields().size(), initial_schema->fields().size()); + + // Test with main branch + ICEBERG_UNWRAP_OR_FAIL( + auto schema3, + SnapshotUtil::SchemaFor(*table_, std::string(SnapshotRef::kMainBranch))); + EXPECT_EQ(schema3->fields().size(), initial_schema->fields().size()); +} + +TEST_F(SnapshotUtilTest, SchemaForBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto initial_schema, table_->schema()); + ASSERT_NE(initial_schema, nullptr); + + std::string branch = "b1"; + ICEBERG_UNWRAP_OR_FAIL(auto schema, SnapshotUtil::SchemaFor(*table_, branch)); + // Branch should return current schema (not snapshot schema) + EXPECT_EQ(schema->fields().size(), initial_schema->fields().size()); +} + +TEST_F(SnapshotUtilTest, SchemaForTag) { + // Create a tag pointing to base snapshot + auto metadata = table_->metadata(); + std::string tag = "tag1"; + metadata->refs[tag] = std::make_shared( + SnapshotRef{.snapshot_id = base_snapshot_id_, .retention = SnapshotRef::Tag{}}); + + ICEBERG_UNWRAP_OR_FAIL(auto initial_schema, table_->schema()); + ASSERT_NE(initial_schema, nullptr); + + ICEBERG_UNWRAP_OR_FAIL(auto schema, SnapshotUtil::SchemaFor(*table_, tag)); + // Tag should return the schema of the snapshot it points to + // Since base snapshot has schema_id = 0, it should return the same schema + EXPECT_EQ(schema->fields().size(), initial_schema->fields().size()); +} + +TEST_F(SnapshotUtilTest, SnapshotAfter) { + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_after, + SnapshotUtil::SnapshotAfter(*table_, base_snapshot_id_)); + EXPECT_EQ(snapshot_after->snapshot_id, main1_snapshot_id_); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_after_main1, + SnapshotUtil::SnapshotAfter(*table_, main1_snapshot_id_)); + EXPECT_EQ(snapshot_after_main1->snapshot_id, main2_snapshot_id_); +} + +TEST_F(SnapshotUtilTest, SnapshotIdAsOfTime) { + // Test with timestamp before any snapshot + auto early_timestamp = base_timestamp_ - std::chrono::milliseconds(1000); + auto snapshot_id = SnapshotUtil::OptionalSnapshotIdAsOfTime(*table_, early_timestamp); + EXPECT_FALSE(snapshot_id.has_value()); + + // Test with timestamp at base snapshot + auto snapshot_id1 = SnapshotUtil::OptionalSnapshotIdAsOfTime(*table_, base_timestamp_); + ASSERT_TRUE(snapshot_id1.has_value()); + EXPECT_EQ(snapshot_id1.value(), base_snapshot_id_); + + // Test with timestamp between main1 and main2 + auto mid_timestamp = base_timestamp_ + std::chrono::milliseconds(1500); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id2, + SnapshotUtil::SnapshotIdAsOfTime(*table_, mid_timestamp)); + EXPECT_EQ(snapshot_id2, main1_snapshot_id_); +} + +TEST_F(SnapshotUtilTest, LatestSnapshot) { + // Test main branch + ICEBERG_UNWRAP_OR_FAIL( + auto main_snapshot, + SnapshotUtil::LatestSnapshot(*table_, std::string(SnapshotRef::kMainBranch))); + EXPECT_EQ(main_snapshot->snapshot_id, main2_snapshot_id_); + + // Test branch + ICEBERG_UNWRAP_OR_FAIL(auto branch_snapshot, + SnapshotUtil::LatestSnapshot(*table_, "b1")); + EXPECT_EQ(branch_snapshot->snapshot_id, branch_snapshot_id_); + + // Test non-existing branch + ICEBERG_UNWRAP_OR_FAIL(auto non_existing, + SnapshotUtil::LatestSnapshot(*table_, "non-existing")); + // Should return current snapshot + EXPECT_EQ(non_existing->snapshot_id, main2_snapshot_id_); +} + +} // namespace iceberg diff --git a/src/iceberg/type.cc b/src/iceberg/type.cc index 1cd5fb3ec..44512c0d3 100644 --- a/src/iceberg/type.cc +++ b/src/iceberg/type.cc @@ -141,9 +141,9 @@ StructType::InitFieldByLowerCaseName(const StructType& self) { } ListType::ListType(SchemaField element) : element_(std::move(element)) { - ICEBERG_CHECK(element_.name() == kElementName, - "ListType: child field name should be '{}', was '{}'", kElementName, - element_.name()); + ICEBERG_CHECK_OR_DIE(element_.name() == kElementName, + "ListType: child field name should be '{}', was '{}'", + kElementName, element_.name()); } ListType::ListType(int32_t field_id, std::shared_ptr type, bool optional) @@ -200,12 +200,12 @@ bool ListType::Equals(const Type& other) const { MapType::MapType(SchemaField key, SchemaField value) : fields_{std::move(key), std::move(value)} { - ICEBERG_CHECK(this->key().name() == kKeyName, - "MapType: key field name should be '{}', was '{}'", kKeyName, - this->key().name()); - ICEBERG_CHECK(this->value().name() == kValueName, - "MapType: value field name should be '{}', was '{}'", kValueName, - this->value().name()); + ICEBERG_CHECK_OR_DIE(this->key().name() == kKeyName, + "MapType: key field name should be '{}', was '{}'", kKeyName, + this->key().name()); + ICEBERG_CHECK_OR_DIE(this->value().name() == kValueName, + "MapType: value field name should be '{}', was '{}'", kValueName, + this->value().name()); } const SchemaField& MapType::key() const { return fields_[0]; } @@ -292,8 +292,8 @@ bool DoubleType::Equals(const Type& other) const { return other.type_id() == kTy DecimalType::DecimalType(int32_t precision, int32_t scale) : precision_(precision), scale_(scale) { - ICEBERG_CHECK(precision >= 0 && precision <= kMaxPrecision, - "DecimalType: precision must be in [0, 38], was {}", precision); + ICEBERG_CHECK_OR_DIE(precision >= 0 && precision <= kMaxPrecision, + "DecimalType: precision must be in [0, 38], was {}", precision); } int32_t DecimalType::precision() const { return precision_; } @@ -341,7 +341,7 @@ std::string UuidType::ToString() const { return "uuid"; } bool UuidType::Equals(const Type& other) const { return other.type_id() == kTypeId; } FixedType::FixedType(int32_t length) : length_(length) { - ICEBERG_CHECK(length >= 0, "FixedType: length must be >= 0, was {}", length); + ICEBERG_CHECK_OR_DIE(length >= 0, "FixedType: length must be >= 0, was {}", length); } int32_t FixedType::length() const { return length_; } diff --git a/src/iceberg/util/decimal.cc b/src/iceberg/util/decimal.cc index f33d93287..433d35869 100644 --- a/src/iceberg/util/decimal.cc +++ b/src/iceberg/util/decimal.cc @@ -300,8 +300,8 @@ bool RescaleWouldCauseDataLoss(const Decimal& value, int32_t delta_scale, Decimal::Decimal(std::string_view str) { auto result = Decimal::FromString(str); - ICEBERG_CHECK(result, "Failed to parse Decimal from string: {}, error: {}", str, - result.error().message); + ICEBERG_CHECK_OR_DIE(result, "Failed to parse Decimal from string: {}, error: {}", str, + result.error().message); *this = std::move(result.value()); } diff --git a/src/iceberg/util/macros.h b/src/iceberg/util/macros.h index 50ac13f27..c6919ab7b 100644 --- a/src/iceberg/util/macros.h +++ b/src/iceberg/util/macros.h @@ -42,8 +42,25 @@ ICEBERG_ASSIGN_OR_RAISE_IMPL(ICEBERG_ASSIGN_OR_RAISE_NAME(result_, __COUNTER__), lhs, \ rexpr) +// Macro for debug checks #define ICEBERG_DCHECK(expr, message) assert((expr) && (message)) +// Macro for precondition checks, usually used for function arguments +#define ICEBERG_PRECHECK(expr, ...) \ + do { \ + if (!(expr)) [[unlikely]] { \ + return InvalidArgument(__VA_ARGS__); \ + } \ + } while (0) + +// Macro for state checks, usually used for unexpected states +#define ICEBERG_CHECK(expr, ...) \ + do { \ + if (!(expr)) [[unlikely]] { \ + return Invalid(__VA_ARGS__); \ + } \ + } while (0) + #define ERROR_TO_EXCEPTION(error) \ if (error.kind == iceberg::ErrorKind::kInvalidExpression) { \ throw iceberg::ExpressionError(error.message); \ diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc new file mode 100644 index 000000000..1243a1093 --- /dev/null +++ b/src/iceberg/util/snapshot_util.cc @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/timepoint.h" + +namespace iceberg { + +// Shorthand to return for a NotFound error. +#define ICEBERG_ACTION_FOR_NOT_FOUND(result, action) \ + if (!result.has_value()) [[unlikely]] { \ + if (result.error().kind == ErrorKind::kNotFound) { \ + action; \ + } \ + return std::unexpected(result.error()); \ + } + +Result>> SnapshotUtil::AncestorsOf( + const Table& table, int64_t snapshot_id) { + return table.SnapshotById(snapshot_id).and_then([&table](const auto& snapshot) { + return AncestorsOf(table, snapshot); + }); +} + +Result SnapshotUtil::IsAncestorOf(const Table& table, int64_t snapshot_id, + int64_t ancestor_snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id)); + return std::ranges::any_of(ancestors, [ancestor_snapshot_id](const auto& snapshot) { + return snapshot != nullptr && snapshot->snapshot_id == ancestor_snapshot_id; + }); +} + +Result SnapshotUtil::IsAncestorOf(const Table& table, + int64_t ancestor_snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot()); + ICEBERG_CHECK(current != nullptr, "Current snapshot is null"); + return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id); +} + +Result SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapshot_id, + int64_t ancestor_parent_snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id)); + return std::ranges::any_of( + ancestors, [ancestor_parent_snapshot_id](const auto& snapshot) { + return snapshot != nullptr && snapshot->parent_snapshot_id.has_value() && + snapshot->parent_snapshot_id.value() == ancestor_parent_snapshot_id; + }); +} + +Result>> SnapshotUtil::CurrentAncestors( + const Table& table) { + auto current_result = table.current_snapshot(); + ICEBERG_ACTION_FOR_NOT_FOUND(current_result, return {}); + return AncestorsOf(table, current_result.value()); +} + +Result> SnapshotUtil::CurrentAncestorIds(const Table& table) { + return CurrentAncestors(table).and_then(ToIds); +} + +Result>> SnapshotUtil::OldestAncestor( + const Table& table) { + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table)); + if (ancestors.empty()) { + return std::nullopt; + } + return ancestors.back(); +} + +Result>> SnapshotUtil::OldestAncestorOf( + const Table& table, int64_t snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id)); + if (ancestors.empty()) { + return std::nullopt; + } + return ancestors.back(); +} + +Result>> SnapshotUtil::OldestAncestorAfter( + const Table& table, TimePointMs timestamp_ms) { + auto current_result = table.current_snapshot(); + ICEBERG_ACTION_FOR_NOT_FOUND(current_result, { return std::nullopt; }); + auto current = std::move(current_result.value()); + + std::optional> last_snapshot = std::nullopt; + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, current)); + for (const auto& snapshot : ancestors) { + auto snapshot_timestamp_ms = snapshot->timestamp_ms; + if (snapshot_timestamp_ms < timestamp_ms) { + return last_snapshot; + } else if (snapshot_timestamp_ms == timestamp_ms) { + return snapshot; + } + last_snapshot = std::move(snapshot); + } + + if (last_snapshot.has_value() && last_snapshot.value() != nullptr && + !last_snapshot.value()->parent_snapshot_id.has_value()) { + // this is the first snapshot in the table, return it + return last_snapshot; + } + + // the first ancestor after the given time can't be determined + return NotFound("Cannot find snapshot older than {}", FormatTimestamp(timestamp_ms)); +} + +Result> SnapshotUtil::SnapshotIdsBetween(const Table& table, + int64_t from_snapshot_id, + int64_t to_snapshot_id) { + // Create a lookup function that returns null when snapshot_id equals from_snapshot_id. + // This effectively stops traversal at from_snapshot_id (exclusive) + auto lookup = [&table, + from_snapshot_id](int64_t id) -> Result> { + if (id == from_snapshot_id) { + return nullptr; + } + return table.SnapshotById(id); + }; + + return table.SnapshotById(to_snapshot_id) + .and_then( + [&lookup](const auto& to_snapshot) { return AncestorsOf(to_snapshot, lookup); }) + .and_then(ToIds); +} + +Result> SnapshotUtil::AncestorIdsBetween( + const Table& table, int64_t latest_snapshot_id, + const std::optional& oldest_snapshot_id) { + return AncestorsBetween(table, latest_snapshot_id, oldest_snapshot_id).and_then(ToIds); +} + +Result>> SnapshotUtil::AncestorsBetween( + const Table& table, int64_t latest_snapshot_id, + std::optional oldest_snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(latest_snapshot_id)); + + if (oldest_snapshot_id.has_value()) { + if (latest_snapshot_id == oldest_snapshot_id.value()) { + return {}; + } + + return AncestorsOf(start, + [&table, oldest_snapshot_id = oldest_snapshot_id.value()]( + int64_t id) -> Result> { + if (id == oldest_snapshot_id) { + return nullptr; + } + return table.SnapshotById(id); + }); + } else { + return AncestorsOf(table, start); + } +} + +Result>> SnapshotUtil::AncestorsOf( + const Table& table, const std::shared_ptr& snapshot) { + return AncestorsOf(snapshot, [&table](int64_t id) { return table.SnapshotById(id); }); +} + +Result>> SnapshotUtil::AncestorsOf( + const std::shared_ptr& snapshot, + const std::function>(int64_t)>& lookup) { + ICEBERG_PRECHECK(snapshot != nullptr, "Snapshot is null"); + + std::shared_ptr current = snapshot; + std::vector> result; + + while (current != nullptr) { + result.push_back(current); + if (!current->parent_snapshot_id.has_value()) { + break; + } + auto parent_result = lookup(current->parent_snapshot_id.value()); + ICEBERG_ACTION_FOR_NOT_FOUND(parent_result, { break; }); + current = std::move(parent_result.value()); + } + + return result; +} + +Result> SnapshotUtil::ToIds( + const std::vector>& snapshots) { + return snapshots | + std::views::filter([](const auto& snapshot) { return snapshot != nullptr; }) | + std::views::transform( + [](const auto& snapshot) { return snapshot->snapshot_id; }) | + std::ranges::to>(); +} + +Result> SnapshotUtil::SnapshotAfter(const Table& table, + int64_t snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto parent, table.SnapshotById(snapshot_id)); + ICEBERG_CHECK(parent != nullptr, "Snapshot is null for id {}", snapshot_id); + + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table)); + for (const auto& current : ancestors) { + if (current != nullptr && current->parent_snapshot_id.has_value() && + current->parent_snapshot_id.value() == snapshot_id) { + return current; + } + } + + return NotFound( + "Cannot find snapshot after {}: not an ancestor of table's current snapshot", + snapshot_id); +} + +Result SnapshotUtil::SnapshotIdAsOfTime(const Table& table, + TimePointMs timestamp_ms) { + auto snapshot_id = OptionalSnapshotIdAsOfTime(table, timestamp_ms); + ICEBERG_CHECK(snapshot_id.has_value(), "Cannot find a snapshot older than {}", + FormatTimestamp(timestamp_ms)); + return snapshot_id.value(); +} + +std::optional SnapshotUtil::OptionalSnapshotIdAsOfTime( + const Table& table, TimePointMs timestamp_ms) { + std::optional snapshot_id = std::nullopt; + for (const auto& log_entry : table.history()) { + if (log_entry.timestamp_ms <= timestamp_ms) { + snapshot_id = log_entry.snapshot_id; + } + } + return snapshot_id; +} + +Result> SnapshotUtil::SchemaFor(const Table& table, + int64_t snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, table.SnapshotById(snapshot_id)); + ICEBERG_CHECK(snapshot, "Snapshot is null for id {}", snapshot_id); + + if (snapshot->schema_id.has_value()) { + return table.metadata()->SchemaById(snapshot->schema_id.value()); + } + + // TODO(any): recover the schema by reading previous metadata files + return table.schema(); +} + +Result> SnapshotUtil::SchemaFor(const Table& table, + TimePointMs timestamp_ms) { + return SnapshotIdAsOfTime(table, timestamp_ms).and_then([&table](int64_t id) { + return SchemaFor(table, id); + }); +} + +Result> SnapshotUtil::SchemaFor(const Table& table, + const std::string& ref) { + if (ref.empty() || ref == SnapshotRef::kMainBranch) { + return table.schema(); + } + + const auto& metadata = table.metadata(); + auto it = metadata->refs.find(ref); + if (it == metadata->refs.cend() || it->second->type() == SnapshotRefType::kBranch) { + return table.schema(); + } + + return SchemaFor(table, it->second->snapshot_id); +} + +Result> SnapshotUtil::SchemaFor(const TableMetadata& metadata, + const std::string& ref) { + if (ref.empty() || ref == SnapshotRef::kMainBranch) { + return metadata.Schema(); + } + + auto it = metadata.refs.find(ref); + if (it == metadata.refs.end() || it->second->type() == SnapshotRefType::kBranch) { + return metadata.Schema(); + } + + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, metadata.SnapshotById(it->second->snapshot_id)); + if (!snapshot->schema_id.has_value()) { + return metadata.Schema(); + } + + return metadata.SchemaById(snapshot->schema_id); +} + +Result> SnapshotUtil::LatestSnapshot( + const Table& table, const std::string& branch) { + return LatestSnapshot(*table.metadata(), branch); +} + +Result> SnapshotUtil::LatestSnapshot( + const TableMetadata& metadata, const std::string& branch) { + if (branch.empty() || branch == SnapshotRef::kMainBranch) { + return metadata.Snapshot(); + } + + auto it = metadata.refs.find(branch); + if (it == metadata.refs.end()) { + return metadata.Snapshot(); + } + + return metadata.SnapshotById(it->second->snapshot_id); +} + +} // namespace iceberg diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h new file mode 100644 index 000000000..e0d8830ff --- /dev/null +++ b/src/iceberg/util/snapshot_util_internal.h @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/util/timepoint.h" + +namespace iceberg { + +/// \brief Utility functions for working with snapshots +/// \note All the returned std::shared_ptr are guaranteed to be not null. +class ICEBERG_EXPORT SnapshotUtil { + public: + /// \brief Returns a vector of ancestors of the given snapshot. + /// + /// \param table The table + /// \param snapshot_id The snapshot ID to start from + /// \return A vector of ancestor snapshots + static Result>> AncestorsOf(const Table& table, + int64_t snapshot_id); + + /// \brief Returns whether ancestor_snapshot_id is an ancestor of snapshot_id. + /// + /// \param table The table to check + /// \param snapshot_id The snapshot ID to check + /// \param ancestor_snapshot_id The ancestor snapshot ID to check for + /// \return true if ancestor_snapshot_id is an ancestor of snapshot_id + static Result IsAncestorOf(const Table& table, int64_t snapshot_id, + int64_t ancestor_snapshot_id); + + /// \brief Returns whether ancestor_snapshot_id is an ancestor of the table's current + /// state. + /// + /// \param table The table to check + /// \param ancestor_snapshot_id The ancestor snapshot ID to check for + /// \return true if ancestor_snapshot_id is an ancestor of the current snapshot + static Result IsAncestorOf(const Table& table, int64_t ancestor_snapshot_id); + + /// \brief Returns whether some ancestor of snapshot_id has parentId matches + /// ancestor_parent_snapshot_id. + /// + /// \param table The table to check + /// \param snapshot_id The snapshot ID to check + /// \param ancestor_parent_snapshot_id The ancestor parent snapshot ID to check for + /// \return true if any ancestor has the given parent ID + static Result IsParentAncestorOf(const Table& table, int64_t snapshot_id, + int64_t ancestor_parent_snapshot_id); + + /// \brief Returns a vector that traverses the table's snapshots from the current to the + /// last known ancestor. + /// + /// \param table The table + /// \return A vector from the table's current snapshot to its last known ancestor + static Result>> CurrentAncestors( + const Table& table); + + /// \brief Returns the snapshot IDs for the ancestors of the current table state. + /// + /// Ancestor IDs are ordered by commit time, descending. The first ID is the current + /// snapshot, followed by its parent, and so on. + /// + /// \param table The table + /// \return A vector of snapshot IDs of the known ancestor snapshots, including the + /// current ID + static Result> CurrentAncestorIds(const Table& table); + + /// \brief Traverses the history of the table's current snapshot and finds the oldest + /// Snapshot. + /// + /// \param table The table + /// \return The oldest snapshot, or nullopt if there is no current snapshot + static Result>> OldestAncestor( + const Table& table); + + /// \brief Traverses the history and finds the oldest ancestor of the specified + /// snapshot. + /// + /// Oldest ancestor is defined as the ancestor snapshot whose parent is null or has been + /// expired. If the specified snapshot has no parent or parent has been expired, the + /// specified snapshot itself is returned. + /// + /// \param table The table + /// \param snapshot_id The ID of the snapshot to find the oldest ancestor + /// \return The oldest snapshot, or nullopt if not found + static Result>> OldestAncestorOf( + const Table& table, int64_t snapshot_id); + + /// \brief Traverses the history of the table's current snapshot, finds the oldest + /// snapshot that was committed either at or after a given time. + /// + /// \param table The table + /// \param timestamp_ms A timestamp in milliseconds + /// \return The first snapshot after the given timestamp, or nullopt if the current + /// snapshot is older than the timestamp. If the first ancestor after the given time + /// can't be determined, returns a NotFound error. + static Result>> OldestAncestorAfter( + const Table& table, TimePointMs timestamp_ms); + + /// \brief Returns list of snapshot ids in the range (from_snapshot_id,to_snapshot_id] + /// + /// This method assumes that from_snapshot_id is an ancestor of to_snapshot_id. + /// + /// \param table The table + /// \param from_snapshot_id The starting snapshot ID (exclusive) + /// \param to_snapshot_id The ending snapshot ID (inclusive) + /// \return A vector of snapshot IDs in the range + static Result> SnapshotIdsBetween(const Table& table, + int64_t from_snapshot_id, + int64_t to_snapshot_id); + + /// \brief Returns a vector of ancestor IDs between two snapshots. + /// + /// \param table The table + /// \param latest_snapshot_id The latest snapshot ID + /// \param oldest_snapshot_id The oldest snapshot ID (optional, nullopt means all + /// ancestors) + /// \return A vector of snapshot IDs between the two snapshots + static Result> AncestorIdsBetween( + const Table& table, int64_t latest_snapshot_id, + const std::optional& oldest_snapshot_id); + + /// \brief Returns a vector of ancestors between two snapshots. + /// + /// \param table The table + /// \param latest_snapshot_id The latest snapshot ID + /// \param oldest_snapshot_id The oldest snapshot ID (optional, nullopt means all + /// ancestors) + /// \return A vector of ancestor snapshots between the two snapshots + static Result>> AncestorsBetween( + const Table& table, int64_t latest_snapshot_id, + std::optional oldest_snapshot_id); + + /// \brief Traverses the history of the table's current snapshot and finds the snapshot + /// with the given snapshot id as its parent. + /// + /// \param table The table + /// \param snapshot_id The parent snapshot ID + /// \return The snapshot for which the given snapshot is the parent + static Result> SnapshotAfter(const Table& table, + int64_t snapshot_id); + + /// \brief Returns the ID of the most recent snapshot for the table as of the timestamp. + /// + /// \param table The table + /// \param timestamp_ms The timestamp in millis since the Unix epoch + /// \return The snapshot ID + static Result SnapshotIdAsOfTime(const Table& table, TimePointMs timestamp_ms); + + /// \brief Returns the ID of the most recent snapshot for the table as of the timestamp, + /// or nullopt if not found. + /// + /// \param table The table + /// \param timestamp_ms The timestamp in millis since the Unix epoch + /// \return The snapshot ID, or nullopt if not found + static std::optional OptionalSnapshotIdAsOfTime(const Table& table, + TimePointMs timestamp_ms); + + /// \brief Returns the schema of the table for the specified snapshot. + /// + /// \param table The table + /// \param snapshot_id The ID of the snapshot + /// \return The schema + static Result> SchemaFor(const Table& table, + int64_t snapshot_id); + + /// \brief Returns the schema of the table for the specified timestamp. + /// + /// \param table The table + /// \param timestamp_ms The timestamp in millis since the Unix epoch + /// \return The schema + static Result> SchemaFor(const Table& table, + TimePointMs timestamp_ms); + + /// \brief Return the schema of the snapshot at a given ref. + /// + /// If the ref does not exist or the ref is a branch, the table schema is returned + /// because it will be the schema when the new branch is created. If the ref is a tag, + /// then the snapshot schema is returned. + /// + /// \param table The table + /// \param ref Ref name of the table (empty string means main branch) + /// \return Schema of the specific snapshot at the given ref + static Result> SchemaFor(const Table& table, + const std::string& ref); + + /// \brief Return the schema of the snapshot at a given ref. + /// + /// If the ref does not exist or the ref is a branch, the table schema is returned + /// because it will be the schema when the new branch is created. If the ref is a tag, + /// then the snapshot schema is returned. + /// + /// \param metadata The table metadata + /// \param ref Ref name of the table (empty string means main branch) + /// \return Schema of the specific snapshot at the given branch + static Result> SchemaFor(const TableMetadata& metadata, + const std::string& ref); + + /// \brief Fetch the snapshot at the head of the given branch in the given table. + /// + /// This method calls Table::current_snapshot() instead of using branch API for the main + /// branch so that existing code still goes through the old code path to ensure + /// backwards compatibility. + /// + /// \param table The table + /// \param branch Branch name of the table (empty string means main branch) + /// \return The latest snapshot for the given branch + static Result> LatestSnapshot(const Table& table, + const std::string& branch); + + /// \brief Fetch the snapshot at the head of the given branch in the given table. + /// + /// This method calls TableMetadata::Snapshot() instead of using branch API for the main + /// branch so that existing code still goes through the old code path to ensure + /// backwards compatibility. + /// + /// If branch does not exist, the table's latest snapshot is returned it will be the + /// schema when the new branch is created. + /// + /// \param metadata The table metadata + /// \param branch Branch name of the table metadata (empty string means main + /// branch) + /// \return The latest snapshot for the given branch + static Result> LatestSnapshot(const TableMetadata& metadata, + const std::string& branch); + + private: + /// \brief Helper function to traverse ancestors of a snapshot. + /// + /// \param table The table + /// \param snapshot The snapshot to start from + /// \return A vector of ancestor snapshots + static Result>> AncestorsOf( + const Table& table, const std::shared_ptr& snapshot); + + /// \brief Helper function to traverse ancestors of a snapshot using a lookup function. + /// + /// \param snapshot The snapshot to start from + /// \param lookup Function to lookup snapshots by ID + /// \return A vector of ancestor snapshots + static Result>> AncestorsOf( + const std::shared_ptr& snapshot, + const std::function>(int64_t)>& lookup); + + /// \brief Helper function to convert snapshots to IDs. + /// + /// \param snapshots The snapshots + /// \return A vector of snapshot IDs + static Result> ToIds( + const std::vector>& snapshots); +}; + +} // namespace iceberg diff --git a/src/iceberg/util/timepoint.cc b/src/iceberg/util/timepoint.cc index 6438e8e95..a8bc77080 100644 --- a/src/iceberg/util/timepoint.cc +++ b/src/iceberg/util/timepoint.cc @@ -20,6 +20,8 @@ #include "iceberg/util/timepoint.h" #include +#include +#include namespace iceberg { @@ -43,4 +45,22 @@ int64_t UnixNsFromTimePointNs(TimePointNs time_point_ns) { .count(); } +std::string FormatTimestamp(TimePointMs time_point_ns) { + // Convert TimePointMs to system_clock::time_point + auto unix_ms = UnixMsFromTimePointMs(time_point_ns); + auto time_point = + std::chrono::system_clock::time_point(std::chrono::milliseconds(unix_ms)); + auto time_t = std::chrono::system_clock::to_time_t(time_point); + + // Format as ISO 8601-like string: YYYY-MM-DD HH:MM:SS + std::ostringstream oss; + oss << std::put_time(std::gmtime(&time_t), "%Y-%m-%d %H:%M:%S"); + + // Add milliseconds + auto ms = unix_ms % 1000; + oss << "." << std::setfill('0') << std::setw(3) << ms << " UTC"; + + return oss.str(); +} + } // namespace iceberg diff --git a/src/iceberg/util/timepoint.h b/src/iceberg/util/timepoint.h index 538578752..48e630ae4 100644 --- a/src/iceberg/util/timepoint.h +++ b/src/iceberg/util/timepoint.h @@ -46,4 +46,7 @@ ICEBERG_EXPORT Result TimePointNsFromUnixNs(int64_t unix_ns); /// \brief Returns a Unix timestamp in nanoseconds from a TimePointNs ICEBERG_EXPORT int64_t UnixNsFromTimePointNs(TimePointNs time_point_ns); +/// \brief Returns a human-readable string representation of a TimePointMs +ICEBERG_EXPORT std::string FormatTimestamp(TimePointMs time_point_ns); + } // namespace iceberg diff --git a/src/iceberg/util/uuid.cc b/src/iceberg/util/uuid.cc index 14256755d..9322deb93 100644 --- a/src/iceberg/util/uuid.cc +++ b/src/iceberg/util/uuid.cc @@ -204,7 +204,7 @@ Result Uuid::FromBytes(std::span bytes) { } uint8_t Uuid::operator[](size_t index) const { - ICEBERG_CHECK(index < kLength, "UUID index out of range: {}", index); + ICEBERG_CHECK_OR_DIE(index < kLength, "UUID index out of range: {}", index); return data_[index]; }