From 4d944db49c16a53599bb7393c791699b031cba99 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Thu, 17 Jul 2025 11:01:56 +0800 Subject: [PATCH 1/2] feat: RegisterTable support for InMemoryCatalog --- src/iceberg/catalog.h | 3 +- src/iceberg/catalog/in_memory_catalog.cc | 227 +++++------------------ src/iceberg/catalog/in_memory_catalog.h | 17 +- test/CMakeLists.txt | 12 +- test/in_memory_catalog_test.cc | 63 ++++++- 5 files changed, 126 insertions(+), 196 deletions(-) diff --git a/src/iceberg/catalog.h b/src/iceberg/catalog.h index a882f4d61..1f3373577 100644 --- a/src/iceberg/catalog.h +++ b/src/iceberg/catalog.h @@ -166,8 +166,7 @@ class ICEBERG_EXPORT Catalog { /// \param identifier a table identifier /// \return instance of Table implementation referred to by identifier or /// ErrorKind::kNoSuchTable if the table does not exist - virtual Result> LoadTable( - const TableIdentifier& identifier) const = 0; + virtual Result> LoadTable(const TableIdentifier& identifier) = 0; /// \brief Register a table with the catalog if it does not exist /// diff --git a/src/iceberg/catalog/in_memory_catalog.cc b/src/iceberg/catalog/in_memory_catalog.cc index 67e2b0c3f..af6aa146f 100644 --- a/src/iceberg/catalog/in_memory_catalog.cc +++ b/src/iceberg/catalog/in_memory_catalog.cc @@ -21,17 +21,14 @@ #include #include // IWYU pragma: keep -#include -#include #include "iceberg/exception.h" #include "iceberg/table.h" +#include "iceberg/table_metadata.h" #include "iceberg/util/macros.h" namespace iceberg { -namespace { - /// \brief A hierarchical namespace that manages namespaces and table metadata in-memory. /// /// Each InMemoryNamespace represents a namespace level and can contain properties, @@ -317,117 +314,56 @@ Result InMemoryNamespace::GetTableMetadataLocation( return it->second; } -} // namespace - -class ICEBERG_EXPORT InMemoryCatalogImpl { - public: - InMemoryCatalogImpl(std::string name, std::shared_ptr file_io, - std::string warehouse_location, - std::unordered_map properties); - - std::string_view name() const; - - Status CreateNamespace(const Namespace& ns, - const std::unordered_map& properties); - - Result> ListNamespaces(const Namespace& ns) const; - - Status DropNamespace(const Namespace& ns); - - Result NamespaceExists(const Namespace& ns) const; - - Result> GetNamespaceProperties( - const Namespace& ns) const; - - Status UpdateNamespaceProperties( - const Namespace& ns, const std::unordered_map& updates, - const std::unordered_set& removals); - - Result> ListTables(const Namespace& ns) const; - - Result> CreateTable( - const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, - const std::string& location, - const std::unordered_map& properties); - - Result> UpdateTable( - const TableIdentifier& identifier, - const std::vector>& requirements, - const std::vector>& updates); - - Result> StageCreateTable( - const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, - const std::string& location, - const std::unordered_map& properties); - - Result TableExists(const TableIdentifier& identifier) const; - - Status DropTable(const TableIdentifier& identifier, bool purge); - - Result> LoadTable(const TableIdentifier& identifier) const; - - Result> RegisterTable(const TableIdentifier& identifier, - const std::string& metadata_file_location); - - std::unique_ptr BuildTable(const TableIdentifier& identifier, - const Schema& schema) const; - - private: - std::string catalog_name_; - std::unordered_map properties_; - std::shared_ptr file_io_; - std::string warehouse_location_; - std::unique_ptr root_namespace_; - mutable std::recursive_mutex mutex_; -}; - -InMemoryCatalogImpl::InMemoryCatalogImpl( - std::string name, std::shared_ptr file_io, std::string warehouse_location, - std::unordered_map properties) +InMemoryCatalog::InMemoryCatalog( + std::string const& name, std::shared_ptr const& file_io, + std::string const& warehouse_location, + std::unordered_map const& properties) : catalog_name_(std::move(name)), properties_(std::move(properties)), file_io_(std::move(file_io)), warehouse_location_(std::move(warehouse_location)), root_namespace_(std::make_unique()) {} -std::string_view InMemoryCatalogImpl::name() const { return catalog_name_; } +InMemoryCatalog::~InMemoryCatalog() = default; + +std::string_view InMemoryCatalog::name() const { return catalog_name_; } -Status InMemoryCatalogImpl::CreateNamespace( +Status InMemoryCatalog::CreateNamespace( const Namespace& ns, const std::unordered_map& properties) { std::unique_lock lock(mutex_); return root_namespace_->CreateNamespace(ns, properties); } -Result> InMemoryCatalogImpl::ListNamespaces( +Result> +InMemoryCatalog::GetNamespaceProperties(const Namespace& ns) const { + std::unique_lock lock(mutex_); + return root_namespace_->GetProperties(ns); +} + +Result> InMemoryCatalog::ListNamespaces( const Namespace& ns) const { std::unique_lock lock(mutex_); return root_namespace_->ListNamespaces(ns); } -Status InMemoryCatalogImpl::DropNamespace(const Namespace& ns) { +Status InMemoryCatalog::DropNamespace(const Namespace& ns) { std::unique_lock lock(mutex_); return root_namespace_->DropNamespace(ns); } -Result InMemoryCatalogImpl::NamespaceExists(const Namespace& ns) const { +Result InMemoryCatalog::NamespaceExists(const Namespace& ns) const { std::unique_lock lock(mutex_); return root_namespace_->NamespaceExists(ns); } -Result> -InMemoryCatalogImpl::GetNamespaceProperties(const Namespace& ns) const { - std::unique_lock lock(mutex_); - return root_namespace_->GetProperties(ns); -} - -Status InMemoryCatalogImpl::UpdateNamespaceProperties( +Status InMemoryCatalog::UpdateNamespaceProperties( const Namespace& ns, const std::unordered_map& updates, const std::unordered_set& removals) { std::unique_lock lock(mutex_); return root_namespace_->UpdateNamespaceProperties(ns, updates, removals); } -Result> InMemoryCatalogImpl::ListTables( +Result> InMemoryCatalog::ListTables( const Namespace& ns) const { std::unique_lock lock(mutex_); const auto& table_names = root_namespace_->ListTables(ns); @@ -440,44 +376,60 @@ Result> InMemoryCatalogImpl::ListTables( return table_idents; } -Result> InMemoryCatalogImpl::CreateTable( +Result> InMemoryCatalog::CreateTable( const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, const std::string& location, const std::unordered_map& properties) { return NotImplemented("create table"); } -Result> InMemoryCatalogImpl::UpdateTable( +Result> InMemoryCatalog::UpdateTable( const TableIdentifier& identifier, const std::vector>& requirements, const std::vector>& updates) { return NotImplemented("update table"); } -Result> InMemoryCatalogImpl::StageCreateTable( +Result> InMemoryCatalog::StageCreateTable( const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, const std::string& location, const std::unordered_map& properties) { return NotImplemented("stage create table"); } -Result InMemoryCatalogImpl::TableExists(const TableIdentifier& identifier) const { +Result InMemoryCatalog::TableExists(const TableIdentifier& identifier) const { std::unique_lock lock(mutex_); return root_namespace_->TableExists(identifier); } -Status InMemoryCatalogImpl::DropTable(const TableIdentifier& identifier, bool purge) { +Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool purge) { std::unique_lock lock(mutex_); // TODO(Guotao): Delete all metadata files if purge is true. return root_namespace_->UnregisterTable(identifier); } -Result> InMemoryCatalogImpl::LoadTable( - const TableIdentifier& identifier) const { - return NotImplemented("load table"); +Result> InMemoryCatalog::LoadTable( + const TableIdentifier& identifier) { + if (!file_io_) [[unlikely]] { + return InvalidArgument("file_io is not set for catalog {}", catalog_name_); + } + + Result metadata_location; + { + std::unique_lock lock(mutex_); + ICEBERG_ASSIGN_OR_RAISE(metadata_location, + root_namespace_->GetTableMetadataLocation(identifier)); + } + + ICEBERG_ASSIGN_OR_RAISE(auto metadata, + TableMetadataUtil::Read(*file_io_, metadata_location.value())); + + return std::make_unique(identifier, std::move(metadata), + metadata_location.value(), file_io_, + std::static_pointer_cast(shared_from_this())); } -Result> InMemoryCatalogImpl::RegisterTable( +Result> InMemoryCatalog::RegisterTable( const TableIdentifier& identifier, const std::string& metadata_file_location) { std::unique_lock lock(mutex_); if (!root_namespace_->NamespaceExists(identifier.ns)) { @@ -489,95 +441,6 @@ Result> InMemoryCatalogImpl::RegisterTable( return LoadTable(identifier); } -std::unique_ptr InMemoryCatalogImpl::BuildTable( - const TableIdentifier& identifier, const Schema& schema) const { - throw IcebergError("not implemented"); -} - -InMemoryCatalog::InMemoryCatalog( - std::string const& name, std::shared_ptr const& file_io, - std::string const& warehouse_location, - std::unordered_map const& properties) - : impl_(std::make_unique(name, file_io, warehouse_location, - properties)) {} - -InMemoryCatalog::~InMemoryCatalog() = default; - -std::string_view InMemoryCatalog::name() const { return impl_->name(); } - -Status InMemoryCatalog::CreateNamespace( - const Namespace& ns, const std::unordered_map& properties) { - return impl_->CreateNamespace(ns, properties); -} - -Result> -InMemoryCatalog::GetNamespaceProperties(const Namespace& ns) const { - return impl_->GetNamespaceProperties(ns); -} - -Result> InMemoryCatalog::ListNamespaces( - const Namespace& ns) const { - return impl_->ListNamespaces(ns); -} - -Status InMemoryCatalog::DropNamespace(const Namespace& ns) { - return impl_->DropNamespace(ns); -} - -Result InMemoryCatalog::NamespaceExists(const Namespace& ns) const { - return impl_->NamespaceExists(ns); -} - -Status InMemoryCatalog::UpdateNamespaceProperties( - const Namespace& ns, const std::unordered_map& updates, - const std::unordered_set& removals) { - return impl_->UpdateNamespaceProperties(ns, updates, removals); -} - -Result> InMemoryCatalog::ListTables( - const Namespace& ns) const { - return impl_->ListTables(ns); -} - -Result> InMemoryCatalog::CreateTable( - const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, - const std::string& location, - const std::unordered_map& properties) { - return impl_->CreateTable(identifier, schema, spec, location, properties); -} - -Result> InMemoryCatalog::UpdateTable( - const TableIdentifier& identifier, - const std::vector>& requirements, - const std::vector>& updates) { - return impl_->UpdateTable(identifier, requirements, updates); -} - -Result> InMemoryCatalog::StageCreateTable( - const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, - const std::string& location, - const std::unordered_map& properties) { - return impl_->StageCreateTable(identifier, schema, spec, location, properties); -} - -Result InMemoryCatalog::TableExists(const TableIdentifier& identifier) const { - return impl_->TableExists(identifier); -} - -Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool purge) { - return impl_->DropTable(identifier, purge); -} - -Result> InMemoryCatalog::LoadTable( - const TableIdentifier& identifier) const { - return impl_->LoadTable(identifier); -} - -Result> InMemoryCatalog::RegisterTable( - const TableIdentifier& identifier, const std::string& metadata_file_location) { - return impl_->RegisterTable(identifier, metadata_file_location); -} - std::unique_ptr InMemoryCatalog::BuildTable( const TableIdentifier& identifier, const Schema& schema) const { throw IcebergError("not implemented"); diff --git a/src/iceberg/catalog/in_memory_catalog.h b/src/iceberg/catalog/in_memory_catalog.h index c8e24b5db..e3da403e5 100644 --- a/src/iceberg/catalog/in_memory_catalog.h +++ b/src/iceberg/catalog/in_memory_catalog.h @@ -19,9 +19,12 @@ #pragma once +#include + #include "iceberg/catalog.h" namespace iceberg { + /** * @brief An in-memory implementation of the Iceberg Catalog interface. * @@ -32,7 +35,9 @@ namespace iceberg { * @note This class is **not** suitable for production use. * All data will be lost when the process exits. */ -class ICEBERG_EXPORT InMemoryCatalog : public Catalog { +class ICEBERG_EXPORT InMemoryCatalog + : public Catalog, + public std::enable_shared_from_this { public: InMemoryCatalog(std::string const& name, std::shared_ptr const& file_io, std::string const& warehouse_location, @@ -79,8 +84,7 @@ class ICEBERG_EXPORT InMemoryCatalog : public Catalog { Status DropTable(const TableIdentifier& identifier, bool purge) override; - Result> LoadTable( - const TableIdentifier& identifier) const override; + Result> LoadTable(const TableIdentifier& identifier) override; Result> RegisterTable( const TableIdentifier& identifier, @@ -90,7 +94,12 @@ class ICEBERG_EXPORT InMemoryCatalog : public Catalog { const Schema& schema) const override; private: - std::unique_ptr impl_; + std::string catalog_name_; + std::unordered_map properties_; + std::shared_ptr file_io_; + std::string warehouse_location_; + std::unique_ptr root_namespace_; + mutable std::recursive_mutex mutex_; }; } // namespace iceberg diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index a565cb64a..33d027cf0 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -44,11 +44,6 @@ target_sources(schema_test target_link_libraries(schema_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock) add_test(NAME schema_test COMMAND schema_test) -add_executable(catalog_test) -target_sources(catalog_test PRIVATE in_memory_catalog_test.cc) -target_link_libraries(catalog_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock) -add_test(NAME catalog_test COMMAND catalog_test) - add_executable(table_test) target_include_directories(table_test PRIVATE "${CMAKE_BINARY_DIR}") target_sources(table_test PRIVATE test_common.cc json_internal_test.cc table_test.cc @@ -89,4 +84,11 @@ if(ICEBERG_BUILD_BUNDLE) target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main GTest::gmock) add_test(NAME arrow_test COMMAND arrow_test) + + add_executable(catalog_test) + target_include_directories(catalog_test PRIVATE "${CMAKE_BINARY_DIR}") + target_sources(catalog_test PRIVATE test_common.cc in_memory_catalog_test.cc) + target_link_libraries(catalog_test PRIVATE iceberg_bundle_static GTest::gtest_main + GTest::gmock) + add_test(NAME catalog_test COMMAND catalog_test) endif() diff --git a/test/in_memory_catalog_test.cc b/test/in_memory_catalog_test.cc index c76d78878..2ccad8910 100644 --- a/test/in_memory_catalog_test.cc +++ b/test/in_memory_catalog_test.cc @@ -19,24 +19,64 @@ #include "iceberg/catalog/in_memory_catalog.h" +#include + +#include #include #include +#include "iceberg/arrow/arrow_fs_file_io.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" #include "matchers.h" +#include "test_common.h" namespace iceberg { class InMemoryCatalogTest : public ::testing::Test { protected: void SetUp() override { - file_io_ = nullptr; // TODO(Guotao): A real FileIO instance needs to be constructed. + file_io_ = std::make_shared( + std::make_shared<::arrow::fs::LocalFileSystem>()); std::unordered_map properties = {{"prop1", "val1"}}; - catalog_ = std::make_unique("test_catalog", file_io_, + catalog_ = std::make_shared("test_catalog", file_io_, "/tmp/warehouse/", properties); } + void TearDown() override { + // Clean up the temporary files created for the table metadata + for (const auto& path : created_temp_paths_) { + std::error_code ec; + if (std::filesystem::is_directory(path, ec)) { + std::filesystem::remove_all(path, ec); + } else { + std::filesystem::remove(path, ec); + } + } + } + + std::string GenerateTestTableLocation(std::string table_name) { + std::filesystem::path temp_dir = std::filesystem::temp_directory_path(); + const auto info = ::testing::UnitTest::GetInstance()->current_test_info(); + auto table_location = std::format("{}iceberg_test_{}_{}/{}/", temp_dir.string(), + info->test_suite_name(), info->name(), table_name); + // generate a unique directory for the table + std::error_code ec; + std::filesystem::create_directories(table_location, ec); + if (ec) { + throw std::runtime_error( + std::format("Failed to create temporary directory: {}, error message: {}", + table_location, ec.message())); + } + + created_temp_paths_.push_back(table_location); + return table_location; + } + std::shared_ptr file_io_; - std::unique_ptr catalog_; + std::shared_ptr catalog_; + // Used to store temporary paths created during the test + std::vector created_temp_paths_; }; TEST_F(InMemoryCatalogTest, CatalogName) { @@ -58,6 +98,23 @@ TEST_F(InMemoryCatalogTest, TableExists) { EXPECT_THAT(result, HasValue(::testing::Eq(false))); } +TEST_F(InMemoryCatalogTest, RegisterTable) { + TableIdentifier tableIdent{.ns = {}, .name = "t1"}; + + std::unique_ptr metadata; + ASSERT_NO_FATAL_FAILURE(ReadTableMetadata("TableMetadataV2Valid.json", &metadata)); + + auto table_location = GenerateTestTableLocation(tableIdent.name); + auto metadata_location = std::format("{}v1.metadata.json", table_location); + auto status = TableMetadataUtil::Write(*file_io_, metadata_location, *metadata); + EXPECT_THAT(status, IsOk()); + + auto table = catalog_->RegisterTable(tableIdent, metadata_location); + EXPECT_THAT(table, IsOk()); + ASSERT_EQ(table.value()->name().name, "t1"); + ASSERT_EQ(table.value()->location(), "s3://bucket/test/location"); +} + TEST_F(InMemoryCatalogTest, DropTable) { TableIdentifier tableIdent{.ns = {}, .name = "t1"}; auto result = catalog_->DropTable(tableIdent, false); From 00b314859c5cc3d8d34142280ab45f79c9a7f908 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Thu, 17 Jul 2025 13:50:12 +0800 Subject: [PATCH 2/2] feat: RegisterTable support for InMemoryCatalog --- test/in_memory_catalog_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/in_memory_catalog_test.cc b/test/in_memory_catalog_test.cc index 2ccad8910..1fe598a2c 100644 --- a/test/in_memory_catalog_test.cc +++ b/test/in_memory_catalog_test.cc @@ -58,7 +58,7 @@ class InMemoryCatalogTest : public ::testing::Test { std::string GenerateTestTableLocation(std::string table_name) { std::filesystem::path temp_dir = std::filesystem::temp_directory_path(); const auto info = ::testing::UnitTest::GetInstance()->current_test_info(); - auto table_location = std::format("{}iceberg_test_{}_{}/{}/", temp_dir.string(), + auto table_location = std::format("{}/iceberg_test_{}_{}/{}/", temp_dir.string(), info->test_suite_name(), info->name(), table_name); // generate a unique directory for the table std::error_code ec;