From 828ad81ca5c479cd34dcca643bfa53945193a484 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Mon, 14 Jul 2025 23:09:45 +0800 Subject: [PATCH 1/3] refactor: Add SchemaById and SnapshotById to TableMetadata --- src/iceberg/table.cc | 9 +-------- src/iceberg/table_metadata.cc | 21 +++++++++++++++------ src/iceberg/table_metadata.h | 5 +++++ src/iceberg/table_scan.cc | 30 +++++++----------------------- src/iceberg/table_scan.h | 2 +- 5 files changed, 29 insertions(+), 38 deletions(-) diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index ed533cbd5..3bbb3b824 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -90,14 +90,7 @@ Result> Table::current_snapshot() const { } Result> Table::SnapshotById(int64_t snapshot_id) const { - auto iter = std::ranges::find_if(metadata_->snapshots, - [this, &snapshot_id](const auto& snapshot) { - return snapshot->snapshot_id == snapshot_id; - }); - if (iter == metadata_->snapshots.end()) { - return NotFound("Snapshot with ID {} is not found", snapshot_id); - } - return *iter; + return metadata_->SnapshotById(snapshot_id); } const std::vector>& Table::snapshots() const { diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index b820517b2..e58d06aeb 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -47,11 +47,16 @@ std::string ToString(const MetadataLogEntry& entry) { } Result> TableMetadata::Schema() const { - auto iter = std::ranges::find_if(schemas, [this](const auto& schema) { - return schema->schema_id() == current_schema_id; + return SchemaById(current_schema_id); +} + +Result> TableMetadata::SchemaById( + const std::optional& schema_id) const { + auto iter = std::ranges::find_if(schemas, [schema_id](const auto& schema) { + return schema->schema_id() == schema_id; }); if (iter == schemas.end()) { - return NotFound("Current schema is not found"); + return NotFound("Schema with ID {} is not found", schema_id.value_or(-1)); } return *iter; } @@ -77,11 +82,15 @@ 
Result> TableMetadata::SortOrder() const { } Result> TableMetadata::Snapshot() const { - auto iter = std::ranges::find_if(snapshots, [this](const auto& snapshot) { - return snapshot->snapshot_id == current_snapshot_id; + return SnapshotById(current_snapshot_id); +} + +Result> TableMetadata::SnapshotById(int64_t snapshot_id) const { + auto iter = std::ranges::find_if(snapshots, [snapshot_id](const auto& snapshot) { + return snapshot->snapshot_id == snapshot_id; }); if (iter == snapshots.end()) { - return NotFound("Current snapshot with ID {} is not found", current_snapshot_id); + return NotFound("Snapshot with ID {} is not found", snapshot_id); } return *iter; } diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index c34091aee..da4d677b3 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -123,12 +123,17 @@ struct ICEBERG_EXPORT TableMetadata { /// \brief Get the current schema, return NotFoundError if not found Result> Schema() const; + /// \brief Get the schema with the given ID, return NotFoundError if not found + Result> SchemaById( + const std::optional& schema_id) const; /// \brief Get the current partition spec, return NotFoundError if not found Result> PartitionSpec() const; /// \brief Get the current sort order, return NotFoundError if not found Result> SortOrder() const; /// \brief Get the current snapshot, return NotFoundError if not found Result> Snapshot() const; + /// \brief Get the snapshot of this table with the given id + Result> SnapshotById(int64_t snapshot_id) const; friend bool operator==(const TableMetadata& lhs, const TableMetadata& rhs); }; diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 45539ef83..25702b6a4 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -34,8 +34,8 @@ namespace iceberg { // implement FileScanTask -FileScanTask::FileScanTask(std::shared_ptr file) - : data_file_(std::move(file)) {} +FileScanTask::FileScanTask(std::shared_ptr 
data_file) + : data_file_(std::move(data_file)) {} const std::shared_ptr& FileScanTask::data_file() const { return data_file_; } @@ -94,32 +94,13 @@ Result> TableScanBuilder::Build() { return InvalidArgument("No snapshot ID specified for table {}", table_metadata->table_uuid); } - auto iter = std::ranges::find_if( - table_metadata->snapshots, - [id = *snapshot_id](const auto& snapshot) { return snapshot->snapshot_id == id; }); - if (iter == table_metadata->snapshots.end()) { - return NotFound("Snapshot with ID {} is not found", *snapshot_id); - } - context_.snapshot = *iter; + ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_metadata->SnapshotById(*snapshot_id)); if (!context_.projected_schema) { const auto& snapshot = context_.snapshot; auto schema_id = snapshot->schema_id ? snapshot->schema_id : table_metadata->current_schema_id; - if (!schema_id) { - return InvalidArgument("No schema ID found in snapshot {} for table {}", - snapshot->snapshot_id, table_metadata->table_uuid); - } - - const auto& schemas = table_metadata->schemas; - const auto it = std::ranges::find_if(schemas, [id = *schema_id](const auto& schema) { - return schema->schema_id() == id; - }); - if (it == schemas.end()) { - return InvalidArgument("Schema {} in snapshot {} is not found", - *snapshot->schema_id, snapshot->snapshot_id); - } - const auto& schema = *it; + ICEBERG_ASSIGN_OR_RAISE(auto schema, table_metadata->SchemaById(schema_id)); if (column_names_.empty()) { context_.projected_schema = schema; @@ -139,6 +120,9 @@ Result> TableScanBuilder::Build() { context_.projected_schema = std::make_shared(std::move(projected_fields), schema->schema_id()); } + } else if (!column_names_.empty()) { + return InvalidArgument( + "Cannot specify column names when a projected schema is provided"); } return std::make_unique(std::move(context_), file_io_); diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 09dd0885d..dcfa72205 100644 --- a/src/iceberg/table_scan.h +++ 
b/src/iceberg/table_scan.h @@ -45,7 +45,7 @@ class ICEBERG_EXPORT ScanTask { /// \brief Task representing a data file and its corresponding delete files. class ICEBERG_EXPORT FileScanTask : public ScanTask { public: - explicit FileScanTask(std::shared_ptr file); + explicit FileScanTask(std::shared_ptr data_file); /// \brief The data file that should be read by this scan task. const std::shared_ptr& data_file() const; From 598768c8fac37b5411a2403b1248c421ab209615 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Wed, 16 Jul 2025 09:47:29 +0800 Subject: [PATCH 2/3] add test --- test/metadata_serde_test.cc | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/test/metadata_serde_test.cc b/test/metadata_serde_test.cc index 9070503c0..0bb8e928c 100644 --- a/test/metadata_serde_test.cc +++ b/test/metadata_serde_test.cc @@ -106,12 +106,21 @@ TEST_F(MetadataSerdeTest, DeserializeV2Valid) { /*optional=*/false); schema_fields.emplace_back(/*field_id=*/3, "z", iceberg::int64(), /*optional=*/false); - auto expected_schema = - std::make_shared(std::move(schema_fields), /*schema_id=*/1); + auto expected_schema = std::make_shared(schema_fields, /*schema_id=*/1); auto schema = metadata->Schema(); ASSERT_TRUE(schema.has_value()); EXPECT_EQ(*(schema.value().get()), *expected_schema); + auto schema_v1 = metadata->SchemaById(1); + ASSERT_TRUE(schema_v1.has_value()); + EXPECT_EQ(*(schema_v1.value().get()), *expected_schema); + + auto expected_schema_v0 = std::make_shared( + std::vector{schema_fields.at(0)}, /*schema_id=*/0); + auto schema_v0 = metadata->SchemaById(0); + ASSERT_TRUE(schema_v0.has_value()); + EXPECT_EQ(*(schema_v0.value().get()), *expected_schema_v0); + // Compare partition spec EXPECT_EQ(metadata->default_spec_id, 0); std::vector partition_fields; @@ -164,6 +173,13 @@ TEST_F(MetadataSerdeTest, DeserializeV2Valid) { for (size_t i = 0; i < expected_snapshots.size(); ++i) { EXPECT_EQ(*metadata->snapshots[i], expected_snapshots[i]); } + auto 
snapshot_v0 = metadata->SnapshotById(3051729675574597004); + ASSERT_TRUE(snapshot_v0.has_value()); + EXPECT_EQ(*snapshot_v0.value(), expected_snapshots[0]); + + auto snapshot_v1 = metadata->SnapshotById(3055729675574597004); + ASSERT_TRUE(snapshot_v1.has_value()); + EXPECT_EQ(*snapshot_v1.value(), expected_snapshots[1]); // Compare snapshot logs std::vector expected_snapshot_log{ From 03240f094a64cba6e7f69e9f725d69aaff937949 Mon Sep 17 00:00:00 2001 From: Guotao Yu Date: Wed, 16 Jul 2025 10:02:58 +0800 Subject: [PATCH 3/3] retrigger build --- test/metadata_serde_test.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test/metadata_serde_test.cc b/test/metadata_serde_test.cc index 0bb8e928c..73e5dd3a5 100644 --- a/test/metadata_serde_test.cc +++ b/test/metadata_serde_test.cc @@ -111,10 +111,12 @@ TEST_F(MetadataSerdeTest, DeserializeV2Valid) { ASSERT_TRUE(schema.has_value()); EXPECT_EQ(*(schema.value().get()), *expected_schema); + // schema with ID 1 auto schema_v1 = metadata->SchemaById(1); ASSERT_TRUE(schema_v1.has_value()); EXPECT_EQ(*(schema_v1.value().get()), *expected_schema); + // schema with ID 0 auto expected_schema_v0 = std::make_shared( std::vector{schema_fields.at(0)}, /*schema_id=*/0); auto schema_v0 = metadata->SchemaById(0); @@ -173,10 +175,13 @@ TEST_F(MetadataSerdeTest, DeserializeV2Valid) { for (size_t i = 0; i < expected_snapshots.size(); ++i) { EXPECT_EQ(*metadata->snapshots[i], expected_snapshots[i]); } + + // snapshot with ID 3051729675574597004 auto snapshot_v0 = metadata->SnapshotById(3051729675574597004); ASSERT_TRUE(snapshot_v0.has_value()); EXPECT_EQ(*snapshot_v0.value(), expected_snapshots[0]); + // snapshot with ID 3055729675574597004 auto snapshot_v1 = metadata->SnapshotById(3055729675574597004); ASSERT_TRUE(snapshot_v1.has_value()); EXPECT_EQ(*snapshot_v1.value(), expected_snapshots[1]);