From a0dc533ba6c8e42a3006e308cc79e5d9ac019a5d Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Thu, 18 Dec 2025 00:21:59 +0800 Subject: [PATCH 1/5] feat: add snapshot util --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/meson.build | 1 + src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/meson.build | 1 + src/iceberg/test/snapshot_util_test.cc | 372 +++++++++++++++++++++ src/iceberg/util/snapshot_util.cc | 373 ++++++++++++++++++++++ src/iceberg/util/snapshot_util_internal.h | 274 ++++++++++++++++ src/iceberg/util/timepoint.cc | 21 ++ src/iceberg/util/timepoint.h | 3 + 9 files changed, 1047 insertions(+) create mode 100644 src/iceberg/test/snapshot_util_test.cc create mode 100644 src/iceberg/util/snapshot_util.cc create mode 100644 src/iceberg/util/snapshot_util_internal.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 9c25015c3..0579c67d2 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -83,6 +83,7 @@ set(ICEBERG_SOURCES util/decimal.cc util/gzip_internal.cc util/murmurhash3_internal.cc + util/snapshot_util.cc util/temporal_util.cc util/timepoint.cc util/truncate_util.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 21a41dcf2..850f65905 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -105,6 +105,7 @@ iceberg_sources = files( 'util/decimal.cc', 'util/gzip_internal.cc', 'util/murmurhash3_internal.cc', + 'util/snapshot_util.cc', 'util/temporal_util.cc', 'util/timepoint.cc', 'util/truncate_util.cc', diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 2af7d1c4e..28178b883 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -70,6 +70,7 @@ add_iceberg_test(table_test SOURCES metrics_config_test.cc snapshot_test.cc + snapshot_util_test.cc table_metadata_builder_test.cc table_requirement_test.cc table_requirements_test.cc diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index 5ccab940c..fcd397b9e 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -47,6 +47,7 @@ iceberg_tests = { 'sources': files( 'metrics_config_test.cc', 'snapshot_test.cc', + 'snapshot_util_test.cc', 'table_metadata_builder_test.cc', 'table_requirement_test.cc', 'table_requirements_test.cc', diff --git a/src/iceberg/test/snapshot_util_test.cc b/src/iceberg/test/snapshot_util_test.cc new file mode 100644 index 000000000..0e521cfa9 --- /dev/null +++ b/src/iceberg/test/snapshot_util_test.cc @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#include + +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/sort_order.h" +#include "iceberg/table.h" +#include "iceberg/table_identifier.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/timepoint.h" + +namespace iceberg { + +namespace { + +// Schema for testing: id (int32), data (string) +std::shared_ptr CreateTestSchema() { + auto field1 = SchemaField::MakeRequired(1, "id", int32()); + auto field2 = SchemaField::MakeRequired(2, "data", string()); + return std::make_shared(std::vector{field1, field2}, 0); +} + +// Helper to create a snapshot +std::shared_ptr CreateSnapshot(int64_t snapshot_id, + std::optional parent_id, + int64_t sequence_number, + TimePointMs timestamp_ms) { + return std::make_shared( + Snapshot{.snapshot_id = snapshot_id, + .parent_snapshot_id = parent_id, + .sequence_number = sequence_number, + .timestamp_ms = timestamp_ms, + .manifest_list = + "s3://bucket/manifest-list-" + std::to_string(snapshot_id) + ".avro", + .summary = {}, + .schema_id = 0}); +} + +// Helper to create table metadata with snapshots +std::unique_ptr CreateTableMetadataWithSnapshots( + int64_t base_snapshot_id, int64_t main1_snapshot_id, int64_t main2_snapshot_id, + int64_t branch_snapshot_id, int64_t fork0_snapshot_id, int64_t fork1_snapshot_id, + int64_t fork2_snapshot_id, TimePointMs base_timestamp) { + auto metadata = std::make_unique(); + metadata->format_version = 2; + metadata->table_uuid = "test-uuid-1234"; + metadata->location = "s3://bucket/test"; + metadata->last_sequence_number = 10; + metadata->last_updated_ms = base_timestamp + std::chrono::milliseconds(3000); + metadata->last_column_id = 2; + metadata->current_schema_id = 0; + metadata->schemas.push_back(CreateTestSchema()); + metadata->default_spec_id = PartitionSpec::kInitialSpecId; + metadata->last_partition_id = 0; + metadata->current_snapshot_id = main2_snapshot_id; + metadata->default_sort_order_id = SortOrder::kInitialSortOrderId; + metadata->sort_orders.push_back(SortOrder::Unsorted()); + metadata->next_row_id = TableMetadata::kInitialRowId; + metadata->properties = TableProperties::default_properties(); + + // Create snapshots: base -> main1 -> main2 + auto base_snapshot = CreateSnapshot(base_snapshot_id, std::nullopt, 1, base_timestamp); + auto main1_snapshot = CreateSnapshot(main1_snapshot_id, base_snapshot_id, 2, + base_timestamp + std::chrono::milliseconds(1000)); + auto main2_snapshot = CreateSnapshot(main2_snapshot_id, main1_snapshot_id, 3, + base_timestamp + std::chrono::milliseconds(2000)); + + // Branch snapshot (from base) + auto branch_snapshot = CreateSnapshot(branch_snapshot_id, base_snapshot_id, 4, + base_timestamp + std::chrono::milliseconds(1500)); + + // Fork branch snapshots: fork0 -> fork1 -> fork2 (fork0 will be expired) + auto fork0_snapshot = CreateSnapshot(fork0_snapshot_id, base_snapshot_id, 5, + base_timestamp + std::chrono::milliseconds(500)); + auto fork1_snapshot = CreateSnapshot(fork1_snapshot_id, fork0_snapshot_id, 6, + base_timestamp + std::chrono::milliseconds(2500)); + auto fork2_snapshot = CreateSnapshot(fork2_snapshot_id, fork1_snapshot_id, 7, + base_timestamp + std::chrono::milliseconds(3000)); + + metadata->snapshots = {base_snapshot, main1_snapshot, main2_snapshot, + branch_snapshot, fork1_snapshot, fork2_snapshot}; + // Note: fork0 is expired, so it's not in the snapshots list + + // Snapshot log + metadata->snapshot_log = { + {.timestamp_ms = base_timestamp, .snapshot_id = base_snapshot_id}, + {.timestamp_ms = base_timestamp + std::chrono::milliseconds(1000), + .snapshot_id = main1_snapshot_id}, + {.timestamp_ms = base_timestamp + std::chrono::milliseconds(2000), + .snapshot_id = main2_snapshot_id}, + }; + + // Create refs + std::string branch_name = "b1"; + metadata->refs[branch_name] = std::make_shared( + SnapshotRef{.snapshot_id = branch_snapshot_id, .retention = SnapshotRef::Branch{}}); + + std::string fork_branch = "fork"; + metadata->refs[fork_branch] = std::make_shared( + SnapshotRef{.snapshot_id = fork2_snapshot_id, .retention = SnapshotRef::Branch{}}); + + return metadata; +} + +// Helper to extract snapshot IDs from a vector of snapshots +std::vector ExtractSnapshotIds( + const std::vector>& snapshots) { + std::vector ids; + ids.reserve(snapshots.size()); + for (const auto& snapshot : snapshots) { + ids.push_back(snapshot->snapshot_id); + } + return ids; +} + +} // namespace + +class SnapshotUtilTest : public ::testing::Test { + protected: + void SetUp() override { + base_timestamp_ = TimePointMs{std::chrono::milliseconds(1000000000000)}; + base_snapshot_id_ = 100; + main1_snapshot_id_ = 101; + main2_snapshot_id_ = 102; + branch_snapshot_id_ = 200; + fork0_snapshot_id_ = 300; + fork1_snapshot_id_ = 301; + fork2_snapshot_id_ = 302; + + auto metadata = CreateTableMetadataWithSnapshots( + base_snapshot_id_, main1_snapshot_id_, main2_snapshot_id_, branch_snapshot_id_, + fork0_snapshot_id_, fork1_snapshot_id_, fork2_snapshot_id_, base_timestamp_); + + TableIdentifier table_ident{.ns = {}, .name = "test"}; + table_ = std::make_unique(table_ident, std::move(metadata), + "s3://bucket/test/metadata.json", nullptr, nullptr); + } + + TimePointMs base_timestamp_; + int64_t base_snapshot_id_; + int64_t main1_snapshot_id_; + int64_t main2_snapshot_id_; + int64_t branch_snapshot_id_; + int64_t fork0_snapshot_id_; + int64_t fork1_snapshot_id_; + int64_t fork2_snapshot_id_; + std::unique_ptr
table_; +}; + +TEST_F(SnapshotUtilTest, IsParentAncestorOf) { + ICEBERG_UNWRAP_OR_FAIL( + auto result1, + SnapshotUtil::IsParentAncestorOf(*table_, main1_snapshot_id_, base_snapshot_id_)); + EXPECT_TRUE(result1); + + ICEBERG_UNWRAP_OR_FAIL( + auto result2, + SnapshotUtil::IsParentAncestorOf(*table_, branch_snapshot_id_, main1_snapshot_id_)); + EXPECT_FALSE(result2); + + // fork2's parent is fork1, fork1's parent is fork0 (expired) + ICEBERG_UNWRAP_OR_FAIL( + auto result3, + SnapshotUtil::IsParentAncestorOf(*table_, fork2_snapshot_id_, fork0_snapshot_id_)); + EXPECT_TRUE(result3); +} + +TEST_F(SnapshotUtilTest, IsAncestorOf) { + ICEBERG_UNWRAP_OR_FAIL( + auto result1, + SnapshotUtil::IsAncestorOf(*table_, main1_snapshot_id_, base_snapshot_id_)); + EXPECT_TRUE(result1); + + ICEBERG_UNWRAP_OR_FAIL( + auto result2, + SnapshotUtil::IsAncestorOf(*table_, branch_snapshot_id_, main1_snapshot_id_)); + EXPECT_FALSE(result2); + + // fork2 -> fork1 -> fork0 (expired, not in snapshots) + ICEBERG_UNWRAP_OR_FAIL( + auto result3, + SnapshotUtil::IsAncestorOf(*table_, fork2_snapshot_id_, fork0_snapshot_id_)); + EXPECT_FALSE(result3); // fork0 is expired, so not found + + // Test with current snapshot + ICEBERG_UNWRAP_OR_FAIL(auto result4, + SnapshotUtil::IsAncestorOf(*table_, main1_snapshot_id_)); + EXPECT_TRUE(result4); + + ICEBERG_UNWRAP_OR_FAIL(auto result5, + SnapshotUtil::IsAncestorOf(*table_, branch_snapshot_id_)); + EXPECT_FALSE(result5); +} + +TEST_F(SnapshotUtilTest, CurrentAncestors) { + ICEBERG_UNWRAP_OR_FAIL(auto ancestors, SnapshotUtil::CurrentAncestors(*table_)); + auto ids = ExtractSnapshotIds(ancestors); + EXPECT_EQ(ids, std::vector( + {main2_snapshot_id_, main1_snapshot_id_, base_snapshot_id_})); + + ICEBERG_UNWRAP_OR_FAIL(auto ancestor_ids, SnapshotUtil::CurrentAncestorIds(*table_)); + EXPECT_EQ(ancestor_ids, std::vector({main2_snapshot_id_, main1_snapshot_id_, + base_snapshot_id_})); +} + +TEST_F(SnapshotUtilTest, OldestAncestor) { + ICEBERG_UNWRAP_OR_FAIL(auto oldest, SnapshotUtil::OldestAncestor(*table_)); + ASSERT_TRUE(oldest.has_value()); + EXPECT_EQ(oldest.value()->snapshot_id, base_snapshot_id_); + + ICEBERG_UNWRAP_OR_FAIL(auto oldest_of_main2, + SnapshotUtil::OldestAncestorOf(*table_, main2_snapshot_id_)); + ASSERT_TRUE(oldest_of_main2.has_value()); + EXPECT_EQ(oldest_of_main2.value()->snapshot_id, base_snapshot_id_); + + ICEBERG_UNWRAP_OR_FAIL(auto oldest_after, + SnapshotUtil::OldestAncestorAfter( + *table_, base_timestamp_ + std::chrono::milliseconds(1))); + ASSERT_TRUE(oldest_after.has_value()); + EXPECT_EQ(oldest_after.value()->snapshot_id, main1_snapshot_id_); +} + +TEST_F(SnapshotUtilTest, SnapshotsBetween) { + ICEBERG_UNWRAP_OR_FAIL( + auto snapshot_ids, + SnapshotUtil::SnapshotIdsBetween(*table_, base_snapshot_id_, main2_snapshot_id_)); + EXPECT_EQ(snapshot_ids, std::vector({main2_snapshot_id_, main1_snapshot_id_})); + + ICEBERG_UNWRAP_OR_FAIL( + auto ancestors_between1, + SnapshotUtil::AncestorsBetween(*table_, main2_snapshot_id_, main1_snapshot_id_)); + auto ids1 = ExtractSnapshotIds(ancestors_between1); + EXPECT_EQ(ids1, std::vector({main2_snapshot_id_})); + + ICEBERG_UNWRAP_OR_FAIL( + auto ancestors_between2, + SnapshotUtil::AncestorsBetween(*table_, main2_snapshot_id_, branch_snapshot_id_)); + auto ids2 = ExtractSnapshotIds(ancestors_between2); + EXPECT_EQ(ids2, std::vector( + {main2_snapshot_id_, main1_snapshot_id_, base_snapshot_id_})); +} + +TEST_F(SnapshotUtilTest, AncestorsOf) { + // Test ancestors of fork2: fork2 -> fork1 (fork0 is expired, not in snapshots) + ICEBERG_UNWRAP_OR_FAIL(auto ancestors, + SnapshotUtil::AncestorsOf(*table_, fork2_snapshot_id_)); + auto ids = ExtractSnapshotIds(ancestors); + EXPECT_EQ(ids, std::vector({fork2_snapshot_id_, fork1_snapshot_id_})); +} + +TEST_F(SnapshotUtilTest, SchemaForRef) { + ICEBERG_UNWRAP_OR_FAIL(auto initial_schema, table_->schema()); + ASSERT_NE(initial_schema, nullptr); + + // Test with null/empty ref (main branch) + ICEBERG_UNWRAP_OR_FAIL(auto schema1, SnapshotUtil::SchemaFor(*table_, "")); + EXPECT_EQ(schema1->fields().size(), initial_schema->fields().size()); + + // Test with non-existing ref + ICEBERG_UNWRAP_OR_FAIL(auto schema2, + SnapshotUtil::SchemaFor(*table_, "non-existing-ref")); + EXPECT_EQ(schema2->fields().size(), initial_schema->fields().size()); + + // Test with main branch + ICEBERG_UNWRAP_OR_FAIL( + auto schema3, + SnapshotUtil::SchemaFor(*table_, std::string(SnapshotRef::kMainBranch))); + EXPECT_EQ(schema3->fields().size(), initial_schema->fields().size()); +} + +TEST_F(SnapshotUtilTest, SchemaForBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto initial_schema, table_->schema()); + ASSERT_NE(initial_schema, nullptr); + + std::string branch = "b1"; + ICEBERG_UNWRAP_OR_FAIL(auto schema, SnapshotUtil::SchemaFor(*table_, branch)); + // Branch should return current schema (not snapshot schema) + EXPECT_EQ(schema->fields().size(), initial_schema->fields().size()); +} + +TEST_F(SnapshotUtilTest, SchemaForTag) { + // Create a tag pointing to base snapshot + auto metadata = table_->metadata(); + std::string tag = "tag1"; + metadata->refs[tag] = std::make_shared( + SnapshotRef{.snapshot_id = base_snapshot_id_, .retention = SnapshotRef::Tag{}}); + + ICEBERG_UNWRAP_OR_FAIL(auto initial_schema, table_->schema()); + ASSERT_NE(initial_schema, nullptr); + + ICEBERG_UNWRAP_OR_FAIL(auto schema, SnapshotUtil::SchemaFor(*table_, tag)); + // Tag should return the schema of the snapshot it points to + // Since base snapshot has schema_id = 0, it should return the same schema + EXPECT_EQ(schema->fields().size(), initial_schema->fields().size()); +} + +TEST_F(SnapshotUtilTest, SnapshotAfter) { + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_after, + SnapshotUtil::SnapshotAfter(*table_, base_snapshot_id_)); + EXPECT_EQ(snapshot_after->snapshot_id, main1_snapshot_id_); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_after_main1, + SnapshotUtil::SnapshotAfter(*table_, main1_snapshot_id_)); + EXPECT_EQ(snapshot_after_main1->snapshot_id, main2_snapshot_id_); +} + +TEST_F(SnapshotUtilTest, SnapshotIdAsOfTime) { + // Test with timestamp before any snapshot + auto early_timestamp = base_timestamp_ - std::chrono::milliseconds(1000); + auto snapshot_id = SnapshotUtil::NullableSnapshotIdAsOfTime(*table_, early_timestamp); + EXPECT_FALSE(snapshot_id.has_value()); + + // Test with timestamp at base snapshot + auto snapshot_id1 = SnapshotUtil::NullableSnapshotIdAsOfTime(*table_, base_timestamp_); + ASSERT_TRUE(snapshot_id1.has_value()); + EXPECT_EQ(snapshot_id1.value(), base_snapshot_id_); + + // Test with timestamp between main1 and main2 + auto mid_timestamp = base_timestamp_ + std::chrono::milliseconds(1500); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id2, + SnapshotUtil::SnapshotIdAsOfTime(*table_, mid_timestamp)); + EXPECT_EQ(snapshot_id2, main1_snapshot_id_); +} + +TEST_F(SnapshotUtilTest, LatestSnapshot) { + // Test main branch + ICEBERG_UNWRAP_OR_FAIL( + auto main_snapshot, + SnapshotUtil::LatestSnapshot(*table_, std::string(SnapshotRef::kMainBranch))); + ASSERT_TRUE(main_snapshot.has_value()); + EXPECT_EQ(main_snapshot.value()->snapshot_id, main2_snapshot_id_); + + // Test branch + ICEBERG_UNWRAP_OR_FAIL(auto branch_snapshot, + SnapshotUtil::LatestSnapshot(*table_, "b1")); + ASSERT_TRUE(branch_snapshot.has_value()); + EXPECT_EQ(branch_snapshot.value()->snapshot_id, branch_snapshot_id_); + + // Test non-existing branch + ICEBERG_UNWRAP_OR_FAIL(auto non_existing, + SnapshotUtil::LatestSnapshot(*table_, "non-existing")); + ASSERT_TRUE(non_existing.has_value()); + // Should return current snapshot + EXPECT_EQ(non_existing.value()->snapshot_id, main2_snapshot_id_); +} + +} // namespace iceberg diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc new file mode 100644 index 000000000..5fea516af --- /dev/null +++ b/src/iceberg/util/snapshot_util.cc @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/timepoint.h" + +namespace iceberg { + +namespace {} // namespace + +Result>> SnapshotUtil::AncestorsOf( + const Table& table, int64_t snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id)); + if (!start) { + return InvalidArgument("Cannot find snapshot: {}", snapshot_id); + } + return AncestorsOf(table, start); +} + +Result SnapshotUtil::IsAncestorOf(const Table& table, int64_t snapshot_id, + int64_t ancestor_snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id)); + for (const auto& snapshot : ancestors) { + if (snapshot->snapshot_id == ancestor_snapshot_id) { + return true; + } + } + return false; +} + +Result SnapshotUtil::IsAncestorOf(const Table& table, + int64_t ancestor_snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot()); + return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id); +} + +Result SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapshot_id, + int64_t ancestor_parent_snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id)); + for (const auto& snapshot : ancestors) { + if (snapshot->parent_snapshot_id.has_value() && + snapshot->parent_snapshot_id.value() == ancestor_parent_snapshot_id) { + return true; + } + } + return false; +} + +Result>> SnapshotUtil::CurrentAncestors( + const Table& table) { + ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot()); + return AncestorsOf(table, current); +} + +Result> SnapshotUtil::CurrentAncestorIds(const Table& table) { + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table)); + return ToIds(ancestors); +} + +Result>> SnapshotUtil::OldestAncestor( + const Table& table) { + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table)); + if (ancestors.empty()) { + return std::nullopt; + } + return ancestors.back(); +} + +Result>> SnapshotUtil::OldestAncestorOf( + const Table& table, int64_t snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id)); + if (ancestors.empty()) { + return std::nullopt; + } + return ancestors.back(); +} + +Result>> SnapshotUtil::OldestAncestorAfter( + const Table& table, TimePointMs timestamp_ms) { + ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot()); + if (!current) { + // there are no snapshots or ancestors + return std::nullopt; + } + + std::shared_ptr last_snapshot = nullptr; + auto ancestors = AncestorsOf(table, current); + for (const auto& snapshot : ancestors) { + auto snapshot_timestamp_ms = snapshot->timestamp_ms; + if (snapshot_timestamp_ms < timestamp_ms) { + return last_snapshot ? std::make_optional(last_snapshot) : std::nullopt; + } else if (snapshot_timestamp_ms == timestamp_ms) { + return snapshot; + } + last_snapshot = snapshot; + } + + if (last_snapshot && !last_snapshot->parent_snapshot_id.has_value()) { + // this is the first snapshot in the table, return it + return last_snapshot; + } + + return ValidationFailed("Cannot find snapshot older than {}", + FormatTimestamp(timestamp_ms)); +} + +Result> SnapshotUtil::SnapshotIdsBetween(const Table& table, + int64_t from_snapshot_id, + int64_t to_snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto to_snapshot, table.SnapshotById(to_snapshot_id)); + if (!to_snapshot) { + return InvalidArgument("Cannot find snapshot: {}", to_snapshot_id); + } + + // Create a lookup function that returns null when snapshot_id equals from_snapshot_id + // This effectively stops traversal at from_snapshot_id (exclusive) + auto lookup = [&table, + from_snapshot_id](int64_t id) -> Result> { + if (id == from_snapshot_id) { + return nullptr; + } + return table.SnapshotById(id); + }; + + auto ancestors = AncestorsOf(to_snapshot, lookup); + return ToIds(ancestors); +} + +Result> SnapshotUtil::AncestorIdsBetween( + const Table& table, int64_t latest_snapshot_id, + const std::optional& oldest_snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE( + auto ancestors, AncestorsBetween(table, latest_snapshot_id, oldest_snapshot_id)); + return ToIds(ancestors); +} + +Result>> SnapshotUtil::AncestorsBetween( + const Table& table, int64_t latest_snapshot_id, + const std::optional& oldest_snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(latest_snapshot_id)); + if (!start) { + return InvalidArgument("Cannot find snapshot: {}", latest_snapshot_id); + } + + if (oldest_snapshot_id.has_value()) { + if (latest_snapshot_id == oldest_snapshot_id.value()) { + return std::vector>(); + } + + auto lookup = [&table, oldest_snapshot_id = oldest_snapshot_id.value()]( + int64_t id) -> Result> { + if (id == oldest_snapshot_id) { + return nullptr; + } + return table.SnapshotById(id); + }; + return AncestorsOf(start, lookup); + } else { + return AncestorsOf(table, start); + } +} + +std::vector> SnapshotUtil::AncestorsOf( + const Table& table, const std::shared_ptr& snapshot) { + std::vector> result; + if (!snapshot) { + return result; + } + + std::shared_ptr current = snapshot; + while (current) { + result.push_back(current); + if (!current->parent_snapshot_id.has_value()) { + break; + } + auto parent_result = table.SnapshotById(current->parent_snapshot_id.value()); + if (!parent_result.has_value()) { + // Parent snapshot not found (e.g., expired), stop traversal + break; + } + current = parent_result.value(); + } + + return result; +} + +std::vector> SnapshotUtil::AncestorsOf( + const std::shared_ptr& snapshot, + const std::function>(int64_t)>& lookup) { + std::vector> result; + if (!snapshot) { + return result; + } + + std::shared_ptr current = snapshot; + while (current) { + result.push_back(current); + if (!current->parent_snapshot_id.has_value()) { + break; + } + auto parent_result = lookup(current->parent_snapshot_id.value()); + if (!parent_result.has_value()) { + break; + } + auto parent = parent_result.value(); + if (!parent) { + break; + } + current = parent; + } + + return result; +} + +std::vector SnapshotUtil::ToIds( + const std::vector>& snapshots) { + std::vector ids; + ids.reserve(snapshots.size()); + for (const auto& snapshot : snapshots) { + ids.push_back(snapshot->snapshot_id); + } + return ids; +} + +Result> SnapshotUtil::SnapshotAfter(const Table& table, + int64_t snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto parent, table.SnapshotById(snapshot_id)); + if (!parent) { + return InvalidArgument("Cannot find parent snapshot: {}", snapshot_id); + } + + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table)); + for (const auto& current : ancestors) { + if (current->parent_snapshot_id.has_value() && + current->parent_snapshot_id.value() == snapshot_id) { + return current; + } + } + + return ValidationFailed( + "Cannot find snapshot after {}: not an ancestor of table's current snapshot", + snapshot_id); +} + +Result SnapshotUtil::SnapshotIdAsOfTime(const Table& table, + TimePointMs timestamp_ms) { + auto snapshot_id = NullableSnapshotIdAsOfTime(table, timestamp_ms); + if (!snapshot_id) { + return ValidationFailed("Cannot find a snapshot older than {}", + FormatTimestamp(timestamp_ms)); + } + return *snapshot_id; +} + +std::optional SnapshotUtil::NullableSnapshotIdAsOfTime( + const Table& table, TimePointMs timestamp_ms) { + std::optional snapshot_id = std::nullopt; + const auto& history = table.history(); + for (const auto& log_entry : history) { + if (log_entry.timestamp_ms <= timestamp_ms) { + snapshot_id = log_entry.snapshot_id; + } + } + return snapshot_id; +} + +Result> SnapshotUtil::SchemaFor(const Table& table, + int64_t snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, table.SnapshotById(snapshot_id)); + + if (snapshot->schema_id.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(auto schemas, table.schemas()); + auto it = schemas.get().find(snapshot->schema_id.value()); + if (it == schemas.get().end()) { + return ValidationFailed("Cannot find schema with schema id {}", + snapshot->schema_id.value()); + } + return it->second; + } + + return table.schema(); +} + +Result> SnapshotUtil::SchemaFor(const Table& table, + TimePointMs timestamp_ms) { + ICEBERG_ASSIGN_OR_RAISE(auto id, SnapshotIdAsOfTime(table, timestamp_ms)); + return SchemaFor(table, id); +} + +Result> SnapshotUtil::SchemaFor(const Table& table, + const std::string& ref) { + if (ref.empty() || ref == SnapshotRef::kMainBranch) { + return table.schema(); + } + + const auto& metadata = table.metadata(); + auto it = metadata->refs.find(ref); + if (it == metadata->refs.end() || it->second->type() == SnapshotRefType::kBranch) { + return table.schema(); + } + + return SchemaFor(table, it->second->snapshot_id); +} + +Result> SnapshotUtil::SchemaFor(const TableMetadata& metadata, + const std::string& ref) { + if (ref.empty() || ref == SnapshotRef::kMainBranch) { + return metadata.Schema(); + } + + auto it = metadata.refs.find(ref); + if (it == metadata.refs.end() || it->second->type() == SnapshotRefType::kBranch) { + return metadata.Schema(); + } + + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, metadata.SnapshotById(it->second->snapshot_id)); + if (!snapshot->schema_id.has_value()) { + return metadata.Schema(); + } + + return metadata.SchemaById(snapshot->schema_id); +} + +Result>> SnapshotUtil::LatestSnapshot( + const Table& table, const std::string& branch) { + if (branch.empty() || branch == SnapshotRef::kMainBranch) { + return table.current_snapshot(); + } + + const auto& metadata = table.metadata(); + auto it = metadata->refs.find(branch); + if (it == metadata->refs.end()) { + return table.current_snapshot(); + } + + return table.SnapshotById(it->second->snapshot_id); +} + +Result>> SnapshotUtil::LatestSnapshot( + const TableMetadata& metadata, const std::string& branch) { + if (branch.empty() || branch == SnapshotRef::kMainBranch) { + return metadata.Snapshot(); + } + + auto it = metadata.refs.find(branch); + if (it == metadata.refs.end()) { + return metadata.Snapshot(); + } + + return metadata.SnapshotById(it->second->snapshot_id); +} + +} // namespace iceberg diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h new file mode 100644 index 000000000..fcc100813 --- /dev/null +++ b/src/iceberg/util/snapshot_util_internal.h @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/util/timepoint.h" + +namespace iceberg { + +/// \brief Utility functions for working with snapshots +class ICEBERG_EXPORT SnapshotUtil { + public: + /// \brief Returns a vector of ancestors of the given snapshot. + /// + /// \param table The table + /// \param snapshot_id The snapshot ID to start from + /// \return A vector of ancestor snapshots + static Result>> AncestorsOf(const Table& table, + int64_t snapshot_id); + + /// \brief Returns whether ancestor_snapshot_id is an ancestor of snapshot_id. + /// + /// \param table The table to check + /// \param snapshot_id The snapshot ID to check + /// \param ancestor_snapshot_id The ancestor snapshot ID to check for + /// \return true if ancestor_snapshot_id is an ancestor of snapshot_id + static Result IsAncestorOf(const Table& table, int64_t snapshot_id, + int64_t ancestor_snapshot_id); + + /// \brief Returns whether ancestor_snapshot_id is an ancestor of the table's current + /// state. + /// + /// \param table The table to check + /// \param ancestor_snapshot_id The ancestor snapshot ID to check for + /// \return true if ancestor_snapshot_id is an ancestor of the current snapshot + static Result IsAncestorOf(const Table& table, int64_t ancestor_snapshot_id); + + /// \brief Returns whether some ancestor of snapshot_id has parentId matches + /// ancestor_parent_snapshot_id. + /// + /// \param table The table to check + /// \param snapshot_id The snapshot ID to check + /// \param ancestor_parent_snapshot_id The ancestor parent snapshot ID to check for + /// \return true if any ancestor has the given parent ID + static Result IsParentAncestorOf(const Table& table, int64_t snapshot_id, + int64_t ancestor_parent_snapshot_id); + + /// \brief Returns a vector that traverses the table's snapshots from the current to the + /// last known ancestor. + /// + /// \param table The table + /// \return A vector from the table's current snapshot to its last known ancestor + static Result>> CurrentAncestors( + const Table& table); + + /// \brief Return the snapshot IDs for the ancestors of the current table state. + /// + /// Ancestor IDs are ordered by commit time, descending. The first ID is the current + /// snapshot, followed by its parent, and so on. + /// + /// \param table The table + /// \return A vector of snapshot IDs of the known ancestor snapshots, including the + /// current ID + static Result> CurrentAncestorIds(const Table& table); + + /// \brief Traverses the history of the table's current snapshot and finds the oldest + /// Snapshot. + /// + /// \param table The table + /// \return The oldest snapshot, or nullopt if there is no current snapshot + static Result>> OldestAncestor( + const Table& table); + + /// \brief Traverses the history and finds the oldest ancestor of the specified + /// snapshot. + /// + /// Oldest ancestor is defined as the ancestor snapshot whose parent is null or has been + /// expired. If the specified snapshot has no parent or parent has been expired, the + /// specified snapshot itself is returned. + /// + /// \param table The table + /// \param snapshot_id The ID of the snapshot to find the oldest ancestor + /// \return The oldest snapshot, or nullopt if not found + static Result>> OldestAncestorOf( + const Table& table, int64_t snapshot_id); + + /// \brief Traverses the history of the table's current snapshot, finds the oldest + /// snapshot that was committed either at or after a given time. + /// + /// \param table The table + /// \param timestamp_ms A timestamp in milliseconds + /// \return The first snapshot after the given timestamp, or nullopt if the current + /// snapshot is older than the timestamp + static Result>> OldestAncestorAfter( + const Table& table, TimePointMs timestamp_ms); + + /// \brief Returns list of snapshot ids in the range (from_snapshot_id,to_snapshot_id] + /// + /// This method assumes that from_snapshot_id is an ancestor of to_snapshot_id. + /// + /// \param table The table + /// \param from_snapshot_id The starting snapshot ID (exclusive) + /// \param to_snapshot_id The ending snapshot ID (inclusive) + /// \return A vector of snapshot IDs in the range + static Result> SnapshotIdsBetween(const Table& table, + int64_t from_snapshot_id, + int64_t to_snapshot_id); + + /// \brief Returns a vector of ancestor IDs between two snapshots. + /// + /// \param table The table + /// \param latest_snapshot_id The latest snapshot ID + /// \param oldest_snapshot_id The oldest snapshot ID (optional, nullopt means all + /// ancestors) + /// \return A vector of snapshot IDs between the two snapshots + static Result> AncestorIdsBetween( + const Table& table, int64_t latest_snapshot_id, + const std::optional& oldest_snapshot_id); + + /// \brief Returns a vector of ancestors between two snapshots. + /// + /// \param table The table + /// \param latest_snapshot_id The latest snapshot ID + /// \param oldest_snapshot_id The oldest snapshot ID (optional, nullopt means all + /// ancestors) + /// \return A vector of ancestor snapshots between the two snapshots + static Result>> AncestorsBetween( + const Table& table, int64_t latest_snapshot_id, + const std::optional& oldest_snapshot_id); + + /// \brief Traverses the history of the table's current snapshot and finds the snapshot + /// with the given snapshot id as its parent. + /// + /// \param table The table + /// \param snapshot_id The parent snapshot ID + /// \return The snapshot for which the given snapshot is the parent + static Result> SnapshotAfter(const Table& table, + int64_t snapshot_id); + + /// \brief Returns the ID of the most recent snapshot for the table as of the timestamp. + /// + /// \param table The table + /// \param timestamp_ms The timestamp in millis since the Unix epoch + /// \return The snapshot ID + static Result SnapshotIdAsOfTime(const Table& table, TimePointMs timestamp_ms); + + /// \brief Returns the ID of the most recent snapshot for the table as of the timestamp, + /// or nullopt if not found. + /// + /// \param table The table + /// \param timestamp_ms The timestamp in millis since the Unix epoch + /// \return The snapshot ID, or nullopt if not found + static std::optional NullableSnapshotIdAsOfTime(const Table& table, + TimePointMs timestamp_ms); + + /// \brief Returns the schema of the table for the specified snapshot. + /// + /// \param table The table + /// \param snapshot_id The ID of the snapshot + /// \return The schema + static Result> SchemaFor(const Table& table, + int64_t snapshot_id); + + /// \brief Returns the schema of the table for the specified timestamp. + /// + /// \param table The table + /// \param timestamp_ms The timestamp in millis since the Unix epoch + /// \return The schema + static Result> SchemaFor(const Table& table, + TimePointMs timestamp_ms); + + /// \brief Return the schema of the snapshot at a given ref. + /// + /// If the ref does not exist or the ref is a branch, the table schema is returned + /// because it will be the schema when the new branch is created. If the ref is a tag, + /// then the snapshot schema is returned. + /// + /// \param table The table + /// \param ref Ref name of the table (empty string means main branch) + /// \return Schema of the specific snapshot at the given ref + static Result> SchemaFor(const Table& table, + const std::string& ref); + + /// \brief Return the schema of the snapshot at a given ref. + /// + /// If the ref does not exist or the ref is a branch, the table schema is returned + /// because it will be the schema when the new branch is created. If the ref is a tag, + /// then the snapshot schema is returned. + /// + /// \param metadata The table metadata + /// \param ref Ref name of the table (empty string means main branch) + /// \return Schema of the specific snapshot at the given branch + static Result> SchemaFor(const TableMetadata& metadata, + const std::string& ref); + + /// \brief Fetch the snapshot at the head of the given branch in the given table. + /// + /// This method calls Table::current_snapshot() instead of using branch API for the main + /// branch so that existing code still goes through the old code path to ensure + /// backwards compatibility. + /// + /// \param table The table + /// \param branch Branch name of the table (empty string means main branch) + /// \return The latest snapshot for the given branch + static Result>> LatestSnapshot( + const Table& table, const std::string& branch); + + /// \brief Fetch the snapshot at the head of the given branch in the given table. + /// + /// This method calls TableMetadata::Snapshot() instead of using branch API for the main + /// branch so that existing code still goes through the old code path to ensure + /// backwards compatibility. + /// + /// If branch does not exist, the table's latest snapshot is returned it will be the + /// schema when the new branch is created. + /// + /// \param metadata The table metadata + /// \param branch Branch name of the table metadata (empty string means main + /// branch) + /// \return The latest snapshot for the given branch + static Result>> LatestSnapshot( + const TableMetadata& metadata, const std::string& branch); + + private: + /// \brief Helper function to traverse ancestors of a snapshot. + /// + /// \param table The table + /// \param snapshot The snapshot to start from + /// \return A vector of ancestor snapshots + static std::vector> AncestorsOf( + const Table& table, const std::shared_ptr& snapshot); + + /// \brief Helper function to traverse ancestors of a snapshot using a lookup function. + /// + /// \param snapshot The snapshot to start from + /// \param lookup Function to lookup snapshots by ID + /// \return A vector of ancestor snapshots + static std::vector> AncestorsOf( + const std::shared_ptr& snapshot, + const std::function>(int64_t)>& lookup); + + /// \brief Helper function to convert snapshots to IDs. + /// + /// \param snapshots The snapshots + /// \return A vector of snapshot IDs + static std::vector ToIds( + const std::vector>& snapshots); +}; + +} // namespace iceberg diff --git a/src/iceberg/util/timepoint.cc b/src/iceberg/util/timepoint.cc index 6438e8e95..37e2ad78c 100644 --- a/src/iceberg/util/timepoint.cc +++ b/src/iceberg/util/timepoint.cc @@ -20,6 +20,8 @@ #include "iceberg/util/timepoint.h" #include +#include +#include namespace iceberg { @@ -43,4 +45,23 @@ int64_t UnixNsFromTimePointNs(TimePointNs time_point_ns) { .count(); } +std::string FormatTimestamp(TimePointMs timestamp_ms) { + // Convert TimePointMs to system_clock::time_point + auto unix_ms = UnixMsFromTimePointMs(timestamp_ms); + auto time_point = + std::chrono::system_clock::time_point(std::chrono::milliseconds(unix_ms)); + + auto time_t = std::chrono::system_clock::to_time_t(time_point); + + // Format as ISO 8601-like string: YYYY-MM-DD HH:MM:SS + std::ostringstream oss; + oss << std::put_time(std::gmtime(&time_t), "%Y-%m-%d %H:%M:%S"); + + // Add milliseconds + auto ms = unix_ms % 1000; + oss << "." << std::setfill('0') << std::setw(3) << ms << " UTC"; + + return oss.str(); +} + } // namespace iceberg diff --git a/src/iceberg/util/timepoint.h b/src/iceberg/util/timepoint.h index 538578752..5a5c7edeb 100644 --- a/src/iceberg/util/timepoint.h +++ b/src/iceberg/util/timepoint.h @@ -46,4 +46,7 @@ ICEBERG_EXPORT Result TimePointNsFromUnixNs(int64_t unix_ns); /// \brief Returns a Unix timestamp in nanoseconds from a TimePointNs ICEBERG_EXPORT int64_t UnixNsFromTimePointNs(TimePointNs time_point_ns); +/// \brief Returns a human-readable string representation of a TimePointMs +ICEBERG_EXPORT std::string FormatTimestamp(TimePointMs timestamp_ms); + } // namespace iceberg From 8c6600b59cac1f2f9237f5a84d8d989b49bd7a91 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Fri, 19 Dec 2025 22:07:36 +0800 Subject: [PATCH 2/5] fix: review comments --- src/iceberg/util/snapshot_util.cc | 140 +++++++++------------- src/iceberg/util/snapshot_util_internal.h | 5 +- 2 files changed, 62 insertions(+), 83 deletions(-) diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc index 5fea516af..cb2b1850d 100644 --- a/src/iceberg/util/snapshot_util.cc +++ b/src/iceberg/util/snapshot_util.cc @@ -17,23 +17,30 @@ * under the License. */ +#include + #include "iceberg/schema.h" #include "iceberg/snapshot.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" +#include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/timepoint.h" namespace iceberg { -namespace {} // namespace +#define ICEBERG_RETURN_NULLOPT_IF_NOT_FOUND(result) \ + if (!result.has_value()) [[unlikely]] { \ + if (result.error().kind == ErrorKind::kNotFound) { \ + return std::nullopt; \ + } \ + return std::unexpected(result.error()); \ + } Result>> SnapshotUtil::AncestorsOf( const Table& table, int64_t snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id)); - if (!start) { - return InvalidArgument("Cannot find snapshot: {}", snapshot_id); - } + ICEBERG_DCHECK(start, "Snapshot is null"); return AncestorsOf(table, start); } @@ -57,13 +64,11 @@ Result SnapshotUtil::IsAncestorOf(const Table& table, Result SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapshot_id, int64_t ancestor_parent_snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id)); - for (const auto& snapshot : ancestors) { - if (snapshot->parent_snapshot_id.has_value() && - snapshot->parent_snapshot_id.value() == ancestor_parent_snapshot_id) { - return true; - } - } - return false; + return std::ranges::any_of( + ancestors, [ancestor_parent_snapshot_id](const auto& snapshot) { + return snapshot->parent_snapshot_id.has_value() && + snapshot->parent_snapshot_id.value() == ancestor_parent_snapshot_id; + }); } Result>> SnapshotUtil::CurrentAncestors( @@ -97,40 +102,40 @@ Result>> SnapshotUtil::OldestAncestorOf( Result>> SnapshotUtil::OldestAncestorAfter( const Table& table, TimePointMs timestamp_ms) { - ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot()); + auto current_result = table.current_snapshot(); + ICEBERG_RETURN_NULLOPT_IF_NOT_FOUND(current_result); + auto current = current_result.value(); if (!current) { // there are no snapshots or ancestors return std::nullopt; } - std::shared_ptr last_snapshot = nullptr; - auto ancestors = AncestorsOf(table, current); + std::optional> last_snapshot = std::nullopt; + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, current)); for (const auto& snapshot : ancestors) { auto snapshot_timestamp_ms = snapshot->timestamp_ms; if (snapshot_timestamp_ms < timestamp_ms) { - return last_snapshot ? std::make_optional(last_snapshot) : std::nullopt; + return last_snapshot; } else if (snapshot_timestamp_ms == timestamp_ms) { return snapshot; } - last_snapshot = snapshot; + last_snapshot = std::move(snapshot); } - if (last_snapshot && !last_snapshot->parent_snapshot_id.has_value()) { + if (last_snapshot.has_value() && + !last_snapshot.value()->parent_snapshot_id.has_value()) { // this is the first snapshot in the table, return it return last_snapshot; } - return ValidationFailed("Cannot find snapshot older than {}", - FormatTimestamp(timestamp_ms)); + return NotFound("Cannot find snapshot older than {}", FormatTimestamp(timestamp_ms)); } Result> SnapshotUtil::SnapshotIdsBetween(const Table& table, int64_t from_snapshot_id, int64_t to_snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto to_snapshot, table.SnapshotById(to_snapshot_id)); - if (!to_snapshot) { - return InvalidArgument("Cannot find snapshot: {}", to_snapshot_id); - } + ICEBERG_DCHECK(to_snapshot, "Snapshot is null"); // Create a lookup function that returns null when snapshot_id equals from_snapshot_id // This effectively stops traversal at from_snapshot_id (exclusive) @@ -142,7 +147,7 @@ Result> SnapshotUtil::SnapshotIdsBetween(const Table& table return table.SnapshotById(id); }; - auto ancestors = AncestorsOf(to_snapshot, lookup); + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(to_snapshot, lookup)); return ToIds(ancestors); } @@ -158,9 +163,7 @@ Result>> SnapshotUtil::AncestorsBetween( const Table& table, int64_t latest_snapshot_id, const std::optional& oldest_snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(latest_snapshot_id)); - if (!start) { - return InvalidArgument("Cannot find snapshot: {}", latest_snapshot_id); - } + ICEBERG_DCHECK(start, "Snapshot is null"); if (oldest_snapshot_id.has_value()) { if (latest_snapshot_id == oldest_snapshot_id.value()) { @@ -180,37 +183,19 @@ Result>> SnapshotUtil::AncestorsBetween( } } -std::vector> SnapshotUtil::AncestorsOf( +Result>> SnapshotUtil::AncestorsOf( const Table& table, const std::shared_ptr& snapshot) { - std::vector> result; - if (!snapshot) { - return result; - } - - std::shared_ptr current = snapshot; - while (current) { - result.push_back(current); - if (!current->parent_snapshot_id.has_value()) { - break; - } - auto parent_result = table.SnapshotById(current->parent_snapshot_id.value()); - if (!parent_result.has_value()) { - // Parent snapshot not found (e.g., expired), stop traversal - break; - } - current = parent_result.value(); - } - - return result; + auto lookup = [&table](int64_t id) -> Result> { + return table.SnapshotById(id); + }; + return AncestorsOf(snapshot, lookup); } -std::vector> SnapshotUtil::AncestorsOf( +Result>> SnapshotUtil::AncestorsOf( const std::shared_ptr& snapshot, const std::function>(int64_t)>& lookup) { std::vector> result; - if (!snapshot) { - return result; - } + ICEBERG_DCHECK(snapshot, "Snapshot is null"); std::shared_ptr current = snapshot; while (current) { @@ -220,13 +205,13 @@ std::vector> SnapshotUtil::AncestorsOf( } auto parent_result = lookup(current->parent_snapshot_id.value()); if (!parent_result.has_value()) { - break; - } - auto parent = parent_result.value(); - if (!parent) { - break; + if (parent_result.error().kind == ErrorKind::kNotFound) { + // Parent snapshot not found (e.g., expired), stop traversal + break; + } + return std::unexpected(parent_result.error()); } - current = parent; + current = std::move(parent_result.value()); } return result; @@ -234,20 +219,16 @@ std::vector> SnapshotUtil::AncestorsOf( std::vector SnapshotUtil::ToIds( const std::vector>& snapshots) { - std::vector ids; - ids.reserve(snapshots.size()); - for (const auto& snapshot : snapshots) { - ids.push_back(snapshot->snapshot_id); - } - return ids; + return snapshots | std::ranges::views::transform([](const auto& snapshot) { + return snapshot->snapshot_id; + }) | + std::ranges::to>(); } Result> SnapshotUtil::SnapshotAfter(const Table& table, int64_t snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto parent, table.SnapshotById(snapshot_id)); - if (!parent) { - return InvalidArgument("Cannot find parent snapshot: {}", snapshot_id); - } + ICEBERG_DCHECK(parent, "Parent snapshot is null"); ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table)); for (const auto& current : ancestors) { @@ -257,7 +238,7 @@ Result> SnapshotUtil::SnapshotAfter(const Table& table } } - return ValidationFailed( + return NotFound( "Cannot find snapshot after {}: not an ancestor of table's current snapshot", snapshot_id); } @@ -287,6 +268,7 @@ std::optional SnapshotUtil::NullableSnapshotIdAsOfTime( Result> SnapshotUtil::SchemaFor(const Table& table, int64_t snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto snapshot, table.SnapshotById(snapshot_id)); + ICEBERG_DCHECK(snapshot, "Snapshot is null"); if (snapshot->schema_id.has_value()) { ICEBERG_ASSIGN_OR_RAISE(auto schemas, table.schemas()); @@ -343,31 +325,27 @@ Result> SnapshotUtil::SchemaFor(const TableMetadata& met Result>> SnapshotUtil::LatestSnapshot( const Table& table, const std::string& branch) { - if (branch.empty() || branch == SnapshotRef::kMainBranch) { - return table.current_snapshot(); - } - - const auto& metadata = table.metadata(); - auto it = metadata->refs.find(branch); - if (it == metadata->refs.end()) { - return table.current_snapshot(); - } - - return table.SnapshotById(it->second->snapshot_id); + return LatestSnapshot(*table.metadata(), branch); } Result>> SnapshotUtil::LatestSnapshot( const TableMetadata& metadata, const std::string& branch) { if (branch.empty() || branch == SnapshotRef::kMainBranch) { - return metadata.Snapshot(); + auto snapshot_result = metadata.Snapshot(); + ICEBERG_RETURN_NULLOPT_IF_NOT_FOUND(snapshot_result); + return snapshot_result.value(); } auto it = metadata.refs.find(branch); if (it == metadata.refs.end()) { - return metadata.Snapshot(); + auto snapshot_result = metadata.Snapshot(); + ICEBERG_RETURN_NULLOPT_IF_NOT_FOUND(snapshot_result); + return snapshot_result.value(); } - return metadata.SnapshotById(it->second->snapshot_id); + auto snapshot_result = metadata.SnapshotById(it->second->snapshot_id); + ICEBERG_RETURN_NULLOPT_IF_NOT_FOUND(snapshot_result); + return snapshot_result.value(); } } // namespace iceberg diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h index fcc100813..44f515826 100644 --- a/src/iceberg/util/snapshot_util_internal.h +++ b/src/iceberg/util/snapshot_util_internal.h @@ -32,6 +32,7 @@ namespace iceberg { /// \brief Utility functions for working with snapshots +/// \note All the returned std::shared_ptr are guaranteed to be not null. class ICEBERG_EXPORT SnapshotUtil { public: /// \brief Returns a vector of ancestors of the given snapshot. @@ -251,7 +252,7 @@ class ICEBERG_EXPORT SnapshotUtil { /// \param table The table /// \param snapshot The snapshot to start from /// \return A vector of ancestor snapshots - static std::vector> AncestorsOf( + static Result>> AncestorsOf( const Table& table, const std::shared_ptr& snapshot); /// \brief Helper function to traverse ancestors of a snapshot using a lookup function. @@ -259,7 +260,7 @@ class ICEBERG_EXPORT SnapshotUtil { /// \param snapshot The snapshot to start from /// \param lookup Function to lookup snapshots by ID /// \return A vector of ancestor snapshots - static std::vector> AncestorsOf( + static Result>> AncestorsOf( const std::shared_ptr& snapshot, const std::function>(int64_t)>& lookup); From 2dc17d09778eec64113fe53e14f7c553b586d95d Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Sat, 20 Dec 2025 00:02:42 +0800 Subject: [PATCH 3/5] fix: rebase conflict --- src/iceberg/test/snapshot_util_test.cc | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/iceberg/test/snapshot_util_test.cc b/src/iceberg/test/snapshot_util_test.cc index 0e521cfa9..3b2a235cf 100644 --- a/src/iceberg/test/snapshot_util_test.cc +++ b/src/iceberg/test/snapshot_util_test.cc @@ -31,6 +31,8 @@ #include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" #include "iceberg/test/matchers.h" +#include "iceberg/test/mock_catalog.h" +#include "iceberg/test/mock_io.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/timepoint.h" @@ -62,11 +64,11 @@ std::shared_ptr CreateSnapshot(int64_t snapshot_id, } // Helper to create table metadata with snapshots -std::unique_ptr CreateTableMetadataWithSnapshots( +std::shared_ptr CreateTableMetadataWithSnapshots( int64_t base_snapshot_id, int64_t main1_snapshot_id, int64_t main2_snapshot_id, int64_t branch_snapshot_id, int64_t fork0_snapshot_id, int64_t fork1_snapshot_id, int64_t fork2_snapshot_id, TimePointMs base_timestamp) { - auto metadata = std::make_unique(); + auto metadata = std::make_shared(); metadata->format_version = 2; metadata->table_uuid = "test-uuid-1234"; metadata->location = "s3://bucket/test"; @@ -157,8 +159,11 @@ class SnapshotUtilTest : public ::testing::Test { fork0_snapshot_id_, fork1_snapshot_id_, fork2_snapshot_id_, base_timestamp_); TableIdentifier table_ident{.ns = {}, .name = "test"}; - table_ = std::make_unique
(table_ident, std::move(metadata), - "s3://bucket/test/metadata.json", nullptr, nullptr); + auto io = std::make_shared(); + auto catalog = std::make_shared(); + table_ = std::move(Table::Make(table_ident, std::move(metadata), + "s3://bucket/test/metadata.json", io, catalog) + .value()); } TimePointMs base_timestamp_; @@ -169,7 +174,7 @@ class SnapshotUtilTest : public ::testing::Test { int64_t fork0_snapshot_id_; int64_t fork1_snapshot_id_; int64_t fork2_snapshot_id_; - std::unique_ptr
table_; + std::shared_ptr
table_; }; TEST_F(SnapshotUtilTest, IsParentAncestorOf) { From 219b03041cadc817797beceb41ca3fe8395787a5 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Sat, 20 Dec 2025 10:34:18 +0800 Subject: [PATCH 4/5] fix: resolve more feedback --- src/iceberg/util/snapshot_util.cc | 25 ++++++++++++----------- src/iceberg/util/snapshot_util_internal.h | 2 +- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc index cb2b1850d..a105e6437 100644 --- a/src/iceberg/util/snapshot_util.cc +++ b/src/iceberg/util/snapshot_util.cc @@ -47,17 +47,15 @@ Result>> SnapshotUtil::AncestorsOf( Result SnapshotUtil::IsAncestorOf(const Table& table, int64_t snapshot_id, int64_t ancestor_snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id)); - for (const auto& snapshot : ancestors) { - if (snapshot->snapshot_id == ancestor_snapshot_id) { - return true; - } - } - return false; + return std::ranges::any_of(ancestors, [ancestor_snapshot_id](const auto& snapshot) { + return snapshot->snapshot_id == ancestor_snapshot_id; + }); } Result SnapshotUtil::IsAncestorOf(const Table& table, int64_t ancestor_snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot()); + ICEBERG_DCHECK(current, "Current snapshot is null"); return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id); } @@ -73,8 +71,14 @@ Result SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapsh Result>> SnapshotUtil::CurrentAncestors( const Table& table) { - ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot()); - return AncestorsOf(table, current); + auto current_result = table.current_snapshot(); + if (!current_result.has_value()) { + if (current_result.error().kind == ErrorKind::kNotFound) { + return std::vector>(); + } + return std::unexpected(current_result.error()); + } + return AncestorsOf(table, current_result.value()); } Result> SnapshotUtil::CurrentAncestorIds(const Table& table) { @@ -105,10 +109,7 @@ Result>> SnapshotUtil::OldestAncestorAft auto current_result = table.current_snapshot(); ICEBERG_RETURN_NULLOPT_IF_NOT_FOUND(current_result); auto current = current_result.value(); - if (!current) { - // there are no snapshots or ancestors - return std::nullopt; - } + ICEBERG_DCHECK(current, "Current snapshot is null"); std::optional> last_snapshot = std::nullopt; ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, current)); diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h index 44f515826..77e555c71 100644 --- a/src/iceberg/util/snapshot_util_internal.h +++ b/src/iceberg/util/snapshot_util_internal.h @@ -78,7 +78,7 @@ class ICEBERG_EXPORT SnapshotUtil { static Result>> CurrentAncestors( const Table& table); - /// \brief Return the snapshot IDs for the ancestors of the current table state. + /// \brief Returns the snapshot IDs for the ancestors of the current table state. /// /// Ancestor IDs are ordered by commit time, descending. The first ID is the current /// snapshot, followed by its parent, and so on. From 3a7b9f239eb27339ba5abe0ccd1e9dcc37c09477 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Mon, 22 Dec 2025 16:37:54 +0800 Subject: [PATCH 5/5] refine error handling --- src/iceberg/avro/avro_stream_internal.cc | 17 ++- src/iceberg/exception.h | 2 +- src/iceberg/table_metadata.cc | 10 +- src/iceberg/table_metadata.h | 2 +- src/iceberg/test/snapshot_util_test.cc | 13 +- src/iceberg/type.cc | 24 ++-- src/iceberg/util/decimal.cc | 4 +- src/iceberg/util/macros.h | 17 +++ src/iceberg/util/snapshot_util.cc | 153 +++++++++------------- src/iceberg/util/snapshot_util_internal.h | 17 +-- src/iceberg/util/timepoint.cc | 5 +- src/iceberg/util/timepoint.h | 2 +- src/iceberg/util/uuid.cc | 2 +- 13 files changed, 128 insertions(+), 140 deletions(-) diff --git a/src/iceberg/avro/avro_stream_internal.cc b/src/iceberg/avro/avro_stream_internal.cc index f299b5233..e868bab4b 100644 --- a/src/iceberg/avro/avro_stream_internal.cc +++ b/src/iceberg/avro/avro_stream_internal.cc @@ -66,8 +66,9 @@ bool AvroInputStream::next(const uint8_t** data, size_t* len) { } void AvroInputStream::backup(size_t len) { - ICEBERG_CHECK(len <= buffer_pos_, "Cannot backup {} bytes, only {} bytes available", - len, buffer_pos_); + ICEBERG_CHECK_OR_DIE(len <= buffer_pos_, + "Cannot backup {} bytes, only {} bytes available", len, + buffer_pos_); buffer_pos_ -= len; byte_count_ -= len; @@ -88,7 +89,8 @@ size_t AvroInputStream::byteCount() const { return byte_count_; } void AvroInputStream::seek(int64_t position) { auto status = input_stream_->Seek(position); - ICEBERG_CHECK(status.ok(), "Failed to seek to {}, got {}", position, status.ToString()); + ICEBERG_CHECK_OR_DIE(status.ok(), "Failed to seek to {}, got {}", position, + status.ToString()); buffer_pos_ = 0; available_bytes_ = 0; @@ -116,8 +118,9 @@ bool AvroOutputStream::next(uint8_t** data, size_t* len) { } void AvroOutputStream::backup(size_t len) { - ICEBERG_CHECK(len <= buffer_pos_, "Cannot backup {} bytes, only {} bytes available", - len, buffer_pos_); + ICEBERG_CHECK_OR_DIE(len <= buffer_pos_, + "Cannot backup {} bytes, only {} bytes available", len, + buffer_pos_); buffer_pos_ -= len; } @@ -126,12 +129,12 @@ uint64_t AvroOutputStream::byteCount() const { return flushed_bytes_ + buffer_po void AvroOutputStream::flush() { if (buffer_pos_ > 0) { auto status = output_stream_->Write(buffer_.data(), buffer_pos_); - ICEBERG_CHECK(status.ok(), "Write failed {}", status.ToString()); + ICEBERG_CHECK_OR_DIE(status.ok(), "Write failed {}", status.ToString()); flushed_bytes_ += buffer_pos_; buffer_pos_ = 0; } auto status = output_stream_->Flush(); - ICEBERG_CHECK(status.ok(), "Flush failed {}", status.ToString()); + ICEBERG_CHECK_OR_DIE(status.ok(), "Flush failed {}", status.ToString()); } const std::shared_ptr<::arrow::io::OutputStream>& AvroOutputStream::arrow_output_stream() diff --git a/src/iceberg/exception.h b/src/iceberg/exception.h index bbb9c2283..0333eb12a 100644 --- a/src/iceberg/exception.h +++ b/src/iceberg/exception.h @@ -44,7 +44,7 @@ class ICEBERG_EXPORT ExpressionError : public IcebergError { explicit ExpressionError(const std::string& what) : IcebergError(what) {} }; -#define ICEBERG_CHECK(condition, ...) \ +#define ICEBERG_CHECK_OR_DIE(condition, ...) \ do { \ if (!(condition)) [[unlikely]] { \ throw iceberg::IcebergError(std::format(__VA_ARGS__)); \ diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index e787b6fd2..71b3fa302 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -69,9 +69,9 @@ Result> TableMetadata::Schema() const { } Result> TableMetadata::SchemaById( - const std::optional& schema_id) const { + std::optional schema_id) const { auto iter = std::ranges::find_if(schemas, [schema_id](const auto& schema) { - return schema->schema_id() == schema_id; + return schema != nullptr && schema->schema_id() == schema_id; }); if (iter == schemas.end()) { return NotFound("Schema with ID {} is not found", schema_id.value_or(-1)); @@ -81,7 +81,7 @@ Result> TableMetadata::SchemaById( Result> TableMetadata::PartitionSpec() const { auto iter = std::ranges::find_if(partition_specs, [this](const auto& spec) { - return spec->spec_id() == default_spec_id; + return spec != nullptr && spec->spec_id() == default_spec_id; }); if (iter == partition_specs.end()) { return NotFound("Default partition spec is not found"); @@ -91,7 +91,7 @@ Result> TableMetadata::PartitionSpec() const { Result> TableMetadata::SortOrder() const { auto iter = std::ranges::find_if(sort_orders, [this](const auto& order) { - return order->order_id() == default_sort_order_id; + return order != nullptr && order->order_id() == default_sort_order_id; }); if (iter == sort_orders.end()) { return NotFound("Default sort order is not found"); @@ -105,7 +105,7 @@ Result> TableMetadata::Snapshot() const { Result> TableMetadata::SnapshotById(int64_t snapshot_id) const { auto iter = std::ranges::find_if(snapshots, [snapshot_id](const auto& snapshot) { - return snapshot->snapshot_id == snapshot_id; + return snapshot != nullptr && snapshot->snapshot_id == snapshot_id; }); if (iter == snapshots.end()) { return NotFound("Snapshot with ID {} is not found", snapshot_id); diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index f7c260114..3f2f36101 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -128,7 +128,7 @@ struct ICEBERG_EXPORT TableMetadata { Result> Schema() const; /// \brief Get the current schema by ID, return NotFoundError if not found Result> SchemaById( - const std::optional& schema_id) const; + std::optional schema_id) const; /// \brief Get the current partition spec, return NotFoundError if not found Result> PartitionSpec() const; /// \brief Get the current sort order, return NotFoundError if not found diff --git a/src/iceberg/test/snapshot_util_test.cc b/src/iceberg/test/snapshot_util_test.cc index 3b2a235cf..e4e17251e 100644 --- a/src/iceberg/test/snapshot_util_test.cc +++ b/src/iceberg/test/snapshot_util_test.cc @@ -337,11 +337,11 @@ TEST_F(SnapshotUtilTest, SnapshotAfter) { TEST_F(SnapshotUtilTest, SnapshotIdAsOfTime) { // Test with timestamp before any snapshot auto early_timestamp = base_timestamp_ - std::chrono::milliseconds(1000); - auto snapshot_id = SnapshotUtil::NullableSnapshotIdAsOfTime(*table_, early_timestamp); + auto snapshot_id = SnapshotUtil::OptionalSnapshotIdAsOfTime(*table_, early_timestamp); EXPECT_FALSE(snapshot_id.has_value()); // Test with timestamp at base snapshot - auto snapshot_id1 = SnapshotUtil::NullableSnapshotIdAsOfTime(*table_, base_timestamp_); + auto snapshot_id1 = SnapshotUtil::OptionalSnapshotIdAsOfTime(*table_, base_timestamp_); ASSERT_TRUE(snapshot_id1.has_value()); EXPECT_EQ(snapshot_id1.value(), base_snapshot_id_); @@ -357,21 +357,18 @@ TEST_F(SnapshotUtilTest, LatestSnapshot) { ICEBERG_UNWRAP_OR_FAIL( auto main_snapshot, SnapshotUtil::LatestSnapshot(*table_, std::string(SnapshotRef::kMainBranch))); - ASSERT_TRUE(main_snapshot.has_value()); - EXPECT_EQ(main_snapshot.value()->snapshot_id, main2_snapshot_id_); + EXPECT_EQ(main_snapshot->snapshot_id, main2_snapshot_id_); // Test branch ICEBERG_UNWRAP_OR_FAIL(auto branch_snapshot, SnapshotUtil::LatestSnapshot(*table_, "b1")); - ASSERT_TRUE(branch_snapshot.has_value()); - EXPECT_EQ(branch_snapshot.value()->snapshot_id, branch_snapshot_id_); + EXPECT_EQ(branch_snapshot->snapshot_id, branch_snapshot_id_); // Test non-existing branch ICEBERG_UNWRAP_OR_FAIL(auto non_existing, SnapshotUtil::LatestSnapshot(*table_, "non-existing")); - ASSERT_TRUE(non_existing.has_value()); // Should return current snapshot - EXPECT_EQ(non_existing.value()->snapshot_id, main2_snapshot_id_); + EXPECT_EQ(non_existing->snapshot_id, main2_snapshot_id_); } } // namespace iceberg diff --git a/src/iceberg/type.cc b/src/iceberg/type.cc index 1cd5fb3ec..44512c0d3 100644 --- a/src/iceberg/type.cc +++ b/src/iceberg/type.cc @@ -141,9 +141,9 @@ StructType::InitFieldByLowerCaseName(const StructType& self) { } ListType::ListType(SchemaField element) : element_(std::move(element)) { - ICEBERG_CHECK(element_.name() == kElementName, - "ListType: child field name should be '{}', was '{}'", kElementName, - element_.name()); + ICEBERG_CHECK_OR_DIE(element_.name() == kElementName, + "ListType: child field name should be '{}', was '{}'", + kElementName, element_.name()); } ListType::ListType(int32_t field_id, std::shared_ptr type, bool optional) @@ -200,12 +200,12 @@ bool ListType::Equals(const Type& other) const { MapType::MapType(SchemaField key, SchemaField value) : fields_{std::move(key), std::move(value)} { - ICEBERG_CHECK(this->key().name() == kKeyName, - "MapType: key field name should be '{}', was '{}'", kKeyName, - this->key().name()); - ICEBERG_CHECK(this->value().name() == kValueName, - "MapType: value field name should be '{}', was '{}'", kValueName, - this->value().name()); + ICEBERG_CHECK_OR_DIE(this->key().name() == kKeyName, + "MapType: key field name should be '{}', was '{}'", kKeyName, + this->key().name()); + ICEBERG_CHECK_OR_DIE(this->value().name() == kValueName, + "MapType: value field name should be '{}', was '{}'", kValueName, + this->value().name()); } const SchemaField& MapType::key() const { return fields_[0]; } @@ -292,8 +292,8 @@ bool DoubleType::Equals(const Type& other) const { return other.type_id() == kTy DecimalType::DecimalType(int32_t precision, int32_t scale) : precision_(precision), scale_(scale) { - ICEBERG_CHECK(precision >= 0 && precision <= kMaxPrecision, - "DecimalType: precision must be in [0, 38], was {}", precision); + ICEBERG_CHECK_OR_DIE(precision >= 0 && precision <= kMaxPrecision, + "DecimalType: precision must be in [0, 38], was {}", precision); } int32_t DecimalType::precision() const { return precision_; } @@ -341,7 +341,7 @@ std::string UuidType::ToString() const { return "uuid"; } bool UuidType::Equals(const Type& other) const { return other.type_id() == kTypeId; } FixedType::FixedType(int32_t length) : length_(length) { - ICEBERG_CHECK(length >= 0, "FixedType: length must be >= 0, was {}", length); + ICEBERG_CHECK_OR_DIE(length >= 0, "FixedType: length must be >= 0, was {}", length); } int32_t FixedType::length() const { return length_; } diff --git a/src/iceberg/util/decimal.cc b/src/iceberg/util/decimal.cc index f33d93287..433d35869 100644 --- a/src/iceberg/util/decimal.cc +++ b/src/iceberg/util/decimal.cc @@ -300,8 +300,8 @@ bool RescaleWouldCauseDataLoss(const Decimal& value, int32_t delta_scale, Decimal::Decimal(std::string_view str) { auto result = Decimal::FromString(str); - ICEBERG_CHECK(result, "Failed to parse Decimal from string: {}, error: {}", str, - result.error().message); + ICEBERG_CHECK_OR_DIE(result, "Failed to parse Decimal from string: {}, error: {}", str, + result.error().message); *this = std::move(result.value()); } diff --git a/src/iceberg/util/macros.h b/src/iceberg/util/macros.h index 50ac13f27..c6919ab7b 100644 --- a/src/iceberg/util/macros.h +++ b/src/iceberg/util/macros.h @@ -42,8 +42,25 @@ ICEBERG_ASSIGN_OR_RAISE_IMPL(ICEBERG_ASSIGN_OR_RAISE_NAME(result_, __COUNTER__), lhs, \ rexpr) +// Macro for debug checks #define ICEBERG_DCHECK(expr, message) assert((expr) && (message)) +// Macro for precondition checks, usually used for function arguments +#define ICEBERG_PRECHECK(expr, ...) \ + do { \ + if (!(expr)) [[unlikely]] { \ + return InvalidArgument(__VA_ARGS__); \ + } \ + } while (0) + +// Macro for state checks, usually used for unexpected states +#define ICEBERG_CHECK(expr, ...) \ + do { \ + if (!(expr)) [[unlikely]] { \ + return Invalid(__VA_ARGS__); \ + } \ + } while (0) + #define ERROR_TO_EXCEPTION(error) \ if (error.kind == iceberg::ErrorKind::kInvalidExpression) { \ throw iceberg::ExpressionError(error.message); \ diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc index a105e6437..1243a1093 100644 --- a/src/iceberg/util/snapshot_util.cc +++ b/src/iceberg/util/snapshot_util.cc @@ -29,33 +29,34 @@ namespace iceberg { -#define ICEBERG_RETURN_NULLOPT_IF_NOT_FOUND(result) \ +// Shorthand to return for a NotFound error. +#define ICEBERG_ACTION_FOR_NOT_FOUND(result, action) \ if (!result.has_value()) [[unlikely]] { \ if (result.error().kind == ErrorKind::kNotFound) { \ - return std::nullopt; \ + action; \ } \ return std::unexpected(result.error()); \ } Result>> SnapshotUtil::AncestorsOf( const Table& table, int64_t snapshot_id) { - ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id)); - ICEBERG_DCHECK(start, "Snapshot is null"); - return AncestorsOf(table, start); + return table.SnapshotById(snapshot_id).and_then([&table](const auto& snapshot) { + return AncestorsOf(table, snapshot); + }); } Result SnapshotUtil::IsAncestorOf(const Table& table, int64_t snapshot_id, int64_t ancestor_snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id)); return std::ranges::any_of(ancestors, [ancestor_snapshot_id](const auto& snapshot) { - return snapshot->snapshot_id == ancestor_snapshot_id; + return snapshot != nullptr && snapshot->snapshot_id == ancestor_snapshot_id; }); } Result SnapshotUtil::IsAncestorOf(const Table& table, int64_t ancestor_snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot()); - ICEBERG_DCHECK(current, "Current snapshot is null"); + ICEBERG_CHECK(current != nullptr, "Current snapshot is null"); return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id); } @@ -64,7 +65,7 @@ Result SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapsh ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id)); return std::ranges::any_of( ancestors, [ancestor_parent_snapshot_id](const auto& snapshot) { - return snapshot->parent_snapshot_id.has_value() && + return snapshot != nullptr && snapshot->parent_snapshot_id.has_value() && snapshot->parent_snapshot_id.value() == ancestor_parent_snapshot_id; }); } @@ -72,18 +73,12 @@ Result SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapsh Result>> SnapshotUtil::CurrentAncestors( const Table& table) { auto current_result = table.current_snapshot(); - if (!current_result.has_value()) { - if (current_result.error().kind == ErrorKind::kNotFound) { - return std::vector>(); - } - return std::unexpected(current_result.error()); - } + ICEBERG_ACTION_FOR_NOT_FOUND(current_result, return {}); return AncestorsOf(table, current_result.value()); } Result> SnapshotUtil::CurrentAncestorIds(const Table& table) { - ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table)); - return ToIds(ancestors); + return CurrentAncestors(table).and_then(ToIds); } Result>> SnapshotUtil::OldestAncestor( @@ -107,9 +102,8 @@ Result>> SnapshotUtil::OldestAncestorOf( Result>> SnapshotUtil::OldestAncestorAfter( const Table& table, TimePointMs timestamp_ms) { auto current_result = table.current_snapshot(); - ICEBERG_RETURN_NULLOPT_IF_NOT_FOUND(current_result); - auto current = current_result.value(); - ICEBERG_DCHECK(current, "Current snapshot is null"); + ICEBERG_ACTION_FOR_NOT_FOUND(current_result, { return std::nullopt; }); + auto current = std::move(current_result.value()); std::optional> last_snapshot = std::nullopt; ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, current)); @@ -123,22 +117,20 @@ Result>> SnapshotUtil::OldestAncestorAft last_snapshot = std::move(snapshot); } - if (last_snapshot.has_value() && + if (last_snapshot.has_value() && last_snapshot.value() != nullptr && !last_snapshot.value()->parent_snapshot_id.has_value()) { // this is the first snapshot in the table, return it return last_snapshot; } + // the first ancestor after the given time can't be determined return NotFound("Cannot find snapshot older than {}", FormatTimestamp(timestamp_ms)); } Result> SnapshotUtil::SnapshotIdsBetween(const Table& table, int64_t from_snapshot_id, int64_t to_snapshot_id) { - ICEBERG_ASSIGN_OR_RAISE(auto to_snapshot, table.SnapshotById(to_snapshot_id)); - ICEBERG_DCHECK(to_snapshot, "Snapshot is null"); - - // Create a lookup function that returns null when snapshot_id equals from_snapshot_id + // Create a lookup function that returns null when snapshot_id equals from_snapshot_id. // This effectively stops traversal at from_snapshot_id (exclusive) auto lookup = [&table, from_snapshot_id](int64_t id) -> Result> { @@ -148,37 +140,36 @@ Result> SnapshotUtil::SnapshotIdsBetween(const Table& table return table.SnapshotById(id); }; - ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(to_snapshot, lookup)); - return ToIds(ancestors); + return table.SnapshotById(to_snapshot_id) + .and_then( + [&lookup](const auto& to_snapshot) { return AncestorsOf(to_snapshot, lookup); }) + .and_then(ToIds); } Result> SnapshotUtil::AncestorIdsBetween( const Table& table, int64_t latest_snapshot_id, const std::optional& oldest_snapshot_id) { - ICEBERG_ASSIGN_OR_RAISE( - auto ancestors, AncestorsBetween(table, latest_snapshot_id, oldest_snapshot_id)); - return ToIds(ancestors); + return AncestorsBetween(table, latest_snapshot_id, oldest_snapshot_id).and_then(ToIds); } Result>> SnapshotUtil::AncestorsBetween( const Table& table, int64_t latest_snapshot_id, - const std::optional& oldest_snapshot_id) { + std::optional oldest_snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(latest_snapshot_id)); - ICEBERG_DCHECK(start, "Snapshot is null"); if (oldest_snapshot_id.has_value()) { if (latest_snapshot_id == oldest_snapshot_id.value()) { - return std::vector>(); + return {}; } - auto lookup = [&table, oldest_snapshot_id = oldest_snapshot_id.value()]( - int64_t id) -> Result> { - if (id == oldest_snapshot_id) { - return nullptr; - } - return table.SnapshotById(id); - }; - return AncestorsOf(start, lookup); + return AncestorsOf(start, + [&table, oldest_snapshot_id = oldest_snapshot_id.value()]( + int64_t id) -> Result> { + if (id == oldest_snapshot_id) { + return nullptr; + } + return table.SnapshotById(id); + }); } else { return AncestorsOf(table, start); } @@ -186,54 +177,47 @@ Result>> SnapshotUtil::AncestorsBetween( Result>> SnapshotUtil::AncestorsOf( const Table& table, const std::shared_ptr& snapshot) { - auto lookup = [&table](int64_t id) -> Result> { - return table.SnapshotById(id); - }; - return AncestorsOf(snapshot, lookup); + return AncestorsOf(snapshot, [&table](int64_t id) { return table.SnapshotById(id); }); } Result>> SnapshotUtil::AncestorsOf( const std::shared_ptr& snapshot, const std::function>(int64_t)>& lookup) { - std::vector> result; - ICEBERG_DCHECK(snapshot, "Snapshot is null"); + ICEBERG_PRECHECK(snapshot != nullptr, "Snapshot is null"); std::shared_ptr current = snapshot; - while (current) { + std::vector> result; + + while (current != nullptr) { result.push_back(current); if (!current->parent_snapshot_id.has_value()) { break; } auto parent_result = lookup(current->parent_snapshot_id.value()); - if (!parent_result.has_value()) { - if (parent_result.error().kind == ErrorKind::kNotFound) { - // Parent snapshot not found (e.g., expired), stop traversal - break; - } - return std::unexpected(parent_result.error()); - } + ICEBERG_ACTION_FOR_NOT_FOUND(parent_result, { break; }); current = std::move(parent_result.value()); } return result; } -std::vector SnapshotUtil::ToIds( +Result> SnapshotUtil::ToIds( const std::vector>& snapshots) { - return snapshots | std::ranges::views::transform([](const auto& snapshot) { - return snapshot->snapshot_id; - }) | + return snapshots | + std::views::filter([](const auto& snapshot) { return snapshot != nullptr; }) | + std::views::transform( + [](const auto& snapshot) { return snapshot->snapshot_id; }) | std::ranges::to>(); } Result> SnapshotUtil::SnapshotAfter(const Table& table, int64_t snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto parent, table.SnapshotById(snapshot_id)); - ICEBERG_DCHECK(parent, "Parent snapshot is null"); + ICEBERG_CHECK(parent != nullptr, "Snapshot is null for id {}", snapshot_id); ICEBERG_ASSIGN_OR_RAISE(auto ancestors, CurrentAncestors(table)); for (const auto& current : ancestors) { - if (current->parent_snapshot_id.has_value() && + if (current != nullptr && current->parent_snapshot_id.has_value() && current->parent_snapshot_id.value() == snapshot_id) { return current; } @@ -246,19 +230,16 @@ Result> SnapshotUtil::SnapshotAfter(const Table& table Result SnapshotUtil::SnapshotIdAsOfTime(const Table& table, TimePointMs timestamp_ms) { - auto snapshot_id = NullableSnapshotIdAsOfTime(table, timestamp_ms); - if (!snapshot_id) { - return ValidationFailed("Cannot find a snapshot older than {}", - FormatTimestamp(timestamp_ms)); - } - return *snapshot_id; + auto snapshot_id = OptionalSnapshotIdAsOfTime(table, timestamp_ms); + ICEBERG_CHECK(snapshot_id.has_value(), "Cannot find a snapshot older than {}", + FormatTimestamp(timestamp_ms)); + return snapshot_id.value(); } -std::optional SnapshotUtil::NullableSnapshotIdAsOfTime( +std::optional SnapshotUtil::OptionalSnapshotIdAsOfTime( const Table& table, TimePointMs timestamp_ms) { std::optional snapshot_id = std::nullopt; - const auto& history = table.history(); - for (const auto& log_entry : history) { + for (const auto& log_entry : table.history()) { if (log_entry.timestamp_ms <= timestamp_ms) { snapshot_id = log_entry.snapshot_id; } @@ -269,25 +250,21 @@ std::optional SnapshotUtil::NullableSnapshotIdAsOfTime( Result> SnapshotUtil::SchemaFor(const Table& table, int64_t snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto snapshot, table.SnapshotById(snapshot_id)); - ICEBERG_DCHECK(snapshot, "Snapshot is null"); + ICEBERG_CHECK(snapshot, "Snapshot is null for id {}", snapshot_id); if (snapshot->schema_id.has_value()) { - ICEBERG_ASSIGN_OR_RAISE(auto schemas, table.schemas()); - auto it = schemas.get().find(snapshot->schema_id.value()); - if (it == schemas.get().end()) { - return ValidationFailed("Cannot find schema with schema id {}", - snapshot->schema_id.value()); - } - return it->second; + return table.metadata()->SchemaById(snapshot->schema_id.value()); } + // TODO(any): recover the schema by reading previous metadata files return table.schema(); } Result> SnapshotUtil::SchemaFor(const Table& table, TimePointMs timestamp_ms) { - ICEBERG_ASSIGN_OR_RAISE(auto id, SnapshotIdAsOfTime(table, timestamp_ms)); - return SchemaFor(table, id); + return SnapshotIdAsOfTime(table, timestamp_ms).and_then([&table](int64_t id) { + return SchemaFor(table, id); + }); } Result> SnapshotUtil::SchemaFor(const Table& table, @@ -298,7 +275,7 @@ Result> SnapshotUtil::SchemaFor(const Table& table, const auto& metadata = table.metadata(); auto it = metadata->refs.find(ref); - if (it == metadata->refs.end() || it->second->type() == SnapshotRefType::kBranch) { + if (it == metadata->refs.cend() || it->second->type() == SnapshotRefType::kBranch) { return table.schema(); } @@ -324,29 +301,23 @@ Result> SnapshotUtil::SchemaFor(const TableMetadata& met return metadata.SchemaById(snapshot->schema_id); } -Result>> SnapshotUtil::LatestSnapshot( +Result> SnapshotUtil::LatestSnapshot( const Table& table, const std::string& branch) { return LatestSnapshot(*table.metadata(), branch); } -Result>> SnapshotUtil::LatestSnapshot( +Result> SnapshotUtil::LatestSnapshot( const TableMetadata& metadata, const std::string& branch) { if (branch.empty() || branch == SnapshotRef::kMainBranch) { - auto snapshot_result = metadata.Snapshot(); - ICEBERG_RETURN_NULLOPT_IF_NOT_FOUND(snapshot_result); - return snapshot_result.value(); + return metadata.Snapshot(); } auto it = metadata.refs.find(branch); if (it == metadata.refs.end()) { - auto snapshot_result = metadata.Snapshot(); - ICEBERG_RETURN_NULLOPT_IF_NOT_FOUND(snapshot_result); - return snapshot_result.value(); + return metadata.Snapshot(); } - auto snapshot_result = metadata.SnapshotById(it->second->snapshot_id); - ICEBERG_RETURN_NULLOPT_IF_NOT_FOUND(snapshot_result); - return snapshot_result.value(); + return metadata.SnapshotById(it->second->snapshot_id); } } // namespace iceberg diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h index 77e555c71..e0d8830ff 100644 --- a/src/iceberg/util/snapshot_util_internal.h +++ b/src/iceberg/util/snapshot_util_internal.h @@ -115,7 +115,8 @@ class ICEBERG_EXPORT SnapshotUtil { /// \param table The table /// \param timestamp_ms A timestamp in milliseconds /// \return The first snapshot after the given timestamp, or nullopt if the current - /// snapshot is older than the timestamp + /// snapshot is older than the timestamp. If the first ancestor after the given time + /// can't be determined, returns a NotFound error. static Result>> OldestAncestorAfter( const Table& table, TimePointMs timestamp_ms); @@ -151,7 +152,7 @@ class ICEBERG_EXPORT SnapshotUtil { /// \return A vector of ancestor snapshots between the two snapshots static Result>> AncestorsBetween( const Table& table, int64_t latest_snapshot_id, - const std::optional& oldest_snapshot_id); + std::optional oldest_snapshot_id); /// \brief Traverses the history of the table's current snapshot and finds the snapshot /// with the given snapshot id as its parent. @@ -175,7 +176,7 @@ class ICEBERG_EXPORT SnapshotUtil { /// \param table The table /// \param timestamp_ms The timestamp in millis since the Unix epoch /// \return The snapshot ID, or nullopt if not found - static std::optional NullableSnapshotIdAsOfTime(const Table& table, + static std::optional OptionalSnapshotIdAsOfTime(const Table& table, TimePointMs timestamp_ms); /// \brief Returns the schema of the table for the specified snapshot. @@ -227,8 +228,8 @@ class ICEBERG_EXPORT SnapshotUtil { /// \param table The table /// \param branch Branch name of the table (empty string means main branch) /// \return The latest snapshot for the given branch - static Result>> LatestSnapshot( - const Table& table, const std::string& branch); + static Result> LatestSnapshot(const Table& table, + const std::string& branch); /// \brief Fetch the snapshot at the head of the given branch in the given table. /// @@ -243,8 +244,8 @@ class ICEBERG_EXPORT SnapshotUtil { /// \param branch Branch name of the table metadata (empty string means main /// branch) /// \return The latest snapshot for the given branch - static Result>> LatestSnapshot( - const TableMetadata& metadata, const std::string& branch); + static Result> LatestSnapshot(const TableMetadata& metadata, + const std::string& branch); private: /// \brief Helper function to traverse ancestors of a snapshot. @@ -268,7 +269,7 @@ class ICEBERG_EXPORT SnapshotUtil { /// /// \param snapshots The snapshots /// \return A vector of snapshot IDs - static std::vector ToIds( + static Result> ToIds( const std::vector>& snapshots); }; diff --git a/src/iceberg/util/timepoint.cc b/src/iceberg/util/timepoint.cc index 37e2ad78c..a8bc77080 100644 --- a/src/iceberg/util/timepoint.cc +++ b/src/iceberg/util/timepoint.cc @@ -45,12 +45,11 @@ int64_t UnixNsFromTimePointNs(TimePointNs time_point_ns) { .count(); } -std::string FormatTimestamp(TimePointMs timestamp_ms) { +std::string FormatTimestamp(TimePointMs time_point_ns) { // Convert TimePointMs to system_clock::time_point - auto unix_ms = UnixMsFromTimePointMs(timestamp_ms); + auto unix_ms = UnixMsFromTimePointMs(time_point_ns); auto time_point = std::chrono::system_clock::time_point(std::chrono::milliseconds(unix_ms)); - auto time_t = std::chrono::system_clock::to_time_t(time_point); // Format as ISO 8601-like string: YYYY-MM-DD HH:MM:SS diff --git a/src/iceberg/util/timepoint.h b/src/iceberg/util/timepoint.h index 5a5c7edeb..48e630ae4 100644 --- a/src/iceberg/util/timepoint.h +++ b/src/iceberg/util/timepoint.h @@ -47,6 +47,6 @@ ICEBERG_EXPORT Result TimePointNsFromUnixNs(int64_t unix_ns); ICEBERG_EXPORT int64_t UnixNsFromTimePointNs(TimePointNs time_point_ns); /// \brief Returns a human-readable string representation of a TimePointMs -ICEBERG_EXPORT std::string FormatTimestamp(TimePointMs timestamp_ms); +ICEBERG_EXPORT std::string FormatTimestamp(TimePointMs time_point_ns); } // namespace iceberg diff --git a/src/iceberg/util/uuid.cc b/src/iceberg/util/uuid.cc index 14256755d..9322deb93 100644 --- a/src/iceberg/util/uuid.cc +++ b/src/iceberg/util/uuid.cc @@ -204,7 +204,7 @@ Result Uuid::FromBytes(std::span bytes) { } uint8_t Uuid::operator[](size_t index) const { - ICEBERG_CHECK(index < kLength, "UUID index out of range: {}", index); + ICEBERG_CHECK_OR_DIE(index < kLength, "UUID index out of range: {}", index); return data_[index]; }