diff --git a/src/iceberg/catalog/rest/CMakeLists.txt b/src/iceberg/catalog/rest/CMakeLists.txt index 38d897270..881b3d39a 100644 --- a/src/iceberg/catalog/rest/CMakeLists.txt +++ b/src/iceberg/catalog/rest/CMakeLists.txt @@ -15,7 +15,14 @@ # specific language governing permissions and limitations # under the License. -set(ICEBERG_REST_SOURCES rest_catalog.cc json_internal.cc) +set(ICEBERG_REST_SOURCES + rest_catalog.cc + catalog_properties.cc + error_handlers.cc + http_client.cc + json_internal.cc + resource_paths.cc + rest_util.cc) set(ICEBERG_REST_STATIC_BUILD_INTERFACE_LIBS) set(ICEBERG_REST_SHARED_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/catalog/rest/catalog_properties.cc b/src/iceberg/catalog/rest/catalog_properties.cc new file mode 100644 index 000000000..4d956837c --- /dev/null +++ b/src/iceberg/catalog/rest/catalog_properties.cc @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/catalog/rest/catalog_properties.h" + +#include + +namespace iceberg::rest { + +std::unique_ptr RestCatalogProperties::default_properties() { + return std::unique_ptr(new RestCatalogProperties()); +} + +std::unique_ptr RestCatalogProperties::FromMap( + const std::unordered_map& properties) { + auto rest_catalog_config = + std::unique_ptr(new RestCatalogProperties()); + rest_catalog_config->configs_ = properties; + return rest_catalog_config; +} + +std::unordered_map RestCatalogProperties::ExtractHeaders() + const { + return Extract(kHeaderPrefix); +} + +Result RestCatalogProperties::Uri() const { + auto it = configs_.find(kUri.key()); + if (it == configs_.end() || it->second.empty()) { + return InvalidArgument("Rest catalog configuration property 'uri' is required."); + } + return it->second; +} + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/catalog_properties.h b/src/iceberg/catalog/rest/catalog_properties.h new file mode 100644 index 000000000..d351b50fc --- /dev/null +++ b/src/iceberg/catalog/rest/catalog_properties.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/result.h" +#include "iceberg/util/config.h" + +/// \file iceberg/catalog/rest/catalog_properties.h +/// \brief RestCatalogProperties implementation for Iceberg REST API. + +namespace iceberg::rest { + +/// \brief Configuration class for a REST Catalog. +class ICEBERG_REST_EXPORT RestCatalogProperties + : public ConfigBase { + public: + template + using Entry = const ConfigBase::Entry; + + /// \brief The URI of the REST catalog server. + inline static Entry kUri{"uri", ""}; + /// \brief The name of the catalog. + inline static Entry kName{"name", ""}; + /// \brief The warehouse path. + inline static Entry kWarehouse{"warehouse", ""}; + /// \brief The optional prefix for REST API paths. + inline static Entry kPrefix{"prefix", ""}; + /// \brief The prefix for HTTP headers. + inline static constexpr std::string_view kHeaderPrefix = "header."; + + /// \brief Create a default RestCatalogProperties instance. + static std::unique_ptr default_properties(); + + /// \brief Create a RestCatalogProperties instance from a map of key-value pairs. + static std::unique_ptr FromMap( + const std::unordered_map& properties); + + /// \brief Returns HTTP headers to be added to every request. + std::unordered_map ExtractHeaders() const; + + /// \brief Get the URI of the REST catalog server. + /// \return The URI if configured, or an error if not set or empty. + Result Uri() const; + + private: + RestCatalogProperties() = default; +}; + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/constant.h b/src/iceberg/catalog/rest/constant.h new file mode 100644 index 000000000..0a6e8d0c0 --- /dev/null +++ b/src/iceberg/catalog/rest/constant.h @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "iceberg/version.h" + +/// \file iceberg/catalog/rest/constant.h +/// Constant values for Iceberg REST API. + +namespace iceberg::rest { + +inline const std::string kHeaderContentType = "Content-Type"; +inline const std::string kHeaderAccept = "Accept"; +inline const std::string kHeaderXClientVersion = "X-Client-Version"; +inline const std::string kHeaderUserAgent = "User-Agent"; + +inline const std::string kMimeTypeApplicationJson = "application/json"; +inline const std::string kMimeTypeFormUrlEncoded = "application/x-www-form-urlencoded"; +inline const std::string kUserAgentPrefix = "iceberg-cpp/"; +inline const std::string kUserAgent = "iceberg-cpp/" ICEBERG_VERSION_STRING; + +inline const std::string kQueryParamParent = "parent"; +inline const std::string kQueryParamPageToken = "page_token"; + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/error_handlers.cc b/src/iceberg/catalog/rest/error_handlers.cc new file mode 100644 index 000000000..8465c0018 --- /dev/null +++ b/src/iceberg/catalog/rest/error_handlers.cc @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/catalog/rest/error_handlers.h" + +#include + +#include "iceberg/catalog/rest/types.h" + +namespace iceberg::rest { + +namespace { + +constexpr std::string_view kIllegalArgumentException = "IllegalArgumentException"; +constexpr std::string_view kNoSuchNamespaceException = "NoSuchNamespaceException"; +constexpr std::string_view kNamespaceNotEmptyException = "NamespaceNotEmptyException"; + +} // namespace + +const std::shared_ptr& DefaultErrorHandler::Instance() { + static const std::shared_ptr instance{new DefaultErrorHandler()}; + return instance; +} + +Status DefaultErrorHandler::Accept(const ErrorModel& error) const { + switch (error.code) { + case 400: + if (error.type == kIllegalArgumentException) { + return InvalidArgument(error.message); + } + return BadRequest("Malformed request: {}", error.message); + case 401: + return NotAuthorized("Not authorized: {}", error.message); + case 403: + return Forbidden("Forbidden: {}", error.message); + case 405: + case 406: + break; + case 500: + return InternalServerError("Server error: {}: {}", error.type, error.message); + case 501: + return NotSupported(error.message); + case 503: + return ServiceUnavailable("Service unavailable: {}", error.message); + } + + return RestError("Code: {}, message: {}", error.code, error.message); +} + +const std::shared_ptr& NamespaceErrorHandler::Instance() { + static const std::shared_ptr instance{ + new NamespaceErrorHandler()}; + return instance; +} + +Status NamespaceErrorHandler::Accept(const ErrorModel& error) const { + switch (error.code) { + case 400: + if (error.type == kNamespaceNotEmptyException) { + return NamespaceNotEmpty(error.message); + } + return BadRequest("Malformed request: {}", error.message); + case 404: + return NoSuchNamespace(error.message); + case 409: + return AlreadyExists(error.message); + case 422: + return RestError("Unable to process: {}", error.message); + } + + return DefaultErrorHandler::Accept(error); +} + +const std::shared_ptr& DropNamespaceErrorHandler::Instance() { + static const std::shared_ptr instance{ + new DropNamespaceErrorHandler()}; + return instance; +} + +Status DropNamespaceErrorHandler::Accept(const ErrorModel& error) const { + if (error.code == 409) { + return NamespaceNotEmpty(error.message); + } + + return NamespaceErrorHandler::Accept(error); +} + +const std::shared_ptr& TableErrorHandler::Instance() { + static const std::shared_ptr instance{new TableErrorHandler()}; + return instance; +} + +Status TableErrorHandler::Accept(const ErrorModel& error) const { + switch (error.code) { + case 404: + if (error.type == kNoSuchNamespaceException) { + return NoSuchNamespace(error.message); + } + return NoSuchTable(error.message); + case 409: + return AlreadyExists(error.message); + } + + return DefaultErrorHandler::Accept(error); +} + +const std::shared_ptr& ViewErrorHandler::Instance() { + static const std::shared_ptr instance{new ViewErrorHandler()}; + return instance; +} + +Status ViewErrorHandler::Accept(const ErrorModel& error) const { + switch (error.code) { + case 404: + if (error.type == kNoSuchNamespaceException) { + return NoSuchNamespace(error.message); + } + return NoSuchView(error.message); + case 409: + return AlreadyExists(error.message); + } + + return DefaultErrorHandler::Accept(error); +} + +const std::shared_ptr& TableCommitErrorHandler::Instance() { + static const std::shared_ptr instance{ + new TableCommitErrorHandler()}; + return instance; +} + +Status TableCommitErrorHandler::Accept(const ErrorModel& error) const { + switch (error.code) { + case 404: + return NoSuchTable(error.message); + case 409: + return CommitFailed("Commit failed: {}", error.message); + case 500: + case 502: + case 503: + case 504: + return CommitStateUnknown("Service failed: {}: {}", error.code, error.message); + } + + return DefaultErrorHandler::Accept(error); +} + +const std::shared_ptr& ViewCommitErrorHandler::Instance() { + static const std::shared_ptr instance{ + new ViewCommitErrorHandler()}; + return instance; +} + +Status ViewCommitErrorHandler::Accept(const ErrorModel& error) const { + switch (error.code) { + case 404: + return NoSuchView(error.message); + case 409: + return CommitFailed("Commit failed: {}", error.message); + case 500: + case 502: + case 503: + case 504: + return CommitStateUnknown("Service failed: {}: {}", error.code, error.message); + } + + return DefaultErrorHandler::Accept(error); +} + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/error_handlers.h b/src/iceberg/catalog/rest/error_handlers.h new file mode 100644 index 000000000..072d70442 --- /dev/null +++ b/src/iceberg/catalog/rest/error_handlers.h @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/catalog/rest/type_fwd.h" +#include "iceberg/result.h" + +/// \file iceberg/catalog/rest/error_handlers.h +/// Error handlers for different HTTP error types in Iceberg REST API. + +namespace iceberg::rest { + +/// \brief Error handler interface for processing REST API error responses. Maps HTTP +/// status codes to appropriate ErrorKind values following the Iceberg REST specification. +class ICEBERG_REST_EXPORT ErrorHandler { + public: + virtual ~ErrorHandler() = default; + + // TODO(Li Feiyang):removing ErrorModel as the inner layer and directly using + // ErrorResponse + + /// \brief Process an error response and return an appropriate Error. + /// + /// \param error The error model parsed from the HTTP response body + /// \return An Error object with appropriate ErrorKind and message + virtual Status Accept(const ErrorModel& error) const = 0; +}; + +/// \brief Default error handler for REST API responses. +class ICEBERG_REST_EXPORT DefaultErrorHandler : public ErrorHandler { + public: + /// \brief Returns the singleton instance + static const std::shared_ptr& Instance(); + + Status Accept(const ErrorModel& error) const override; + + protected: + constexpr DefaultErrorHandler() = default; +}; + +/// \brief Namespace-specific error handler for create/read/update operations. +class ICEBERG_REST_EXPORT NamespaceErrorHandler : public DefaultErrorHandler { + public: + /// \brief Returns the singleton instance + static const std::shared_ptr& Instance(); + + Status Accept(const ErrorModel& error) const override; + + protected: + constexpr NamespaceErrorHandler() = default; +}; + +/// \brief Error handler for drop namespace operations. +class ICEBERG_REST_EXPORT DropNamespaceErrorHandler : public NamespaceErrorHandler { + public: + /// \brief Returns the singleton instance + static const std::shared_ptr& Instance(); + + Status Accept(const ErrorModel& error) const override; + + private: + constexpr DropNamespaceErrorHandler() = default; +}; + +/// \brief Table-level error handler. +class ICEBERG_REST_EXPORT TableErrorHandler : public DefaultErrorHandler { + public: + /// \brief Returns the singleton instance + static const std::shared_ptr& Instance(); + + Status Accept(const ErrorModel& error) const override; + + private: + constexpr TableErrorHandler() = default; +}; + +/// \brief View-level error handler. +class ICEBERG_REST_EXPORT ViewErrorHandler : public DefaultErrorHandler { + public: + /// \brief Returns the singleton instance + static const std::shared_ptr& Instance(); + + Status Accept(const ErrorModel& error) const override; + + private: + constexpr ViewErrorHandler() = default; +}; + +/// \brief Table commit operation error handler. +class ICEBERG_REST_EXPORT TableCommitErrorHandler : public DefaultErrorHandler { + public: + /// \brief Returns the singleton instance + static const std::shared_ptr& Instance(); + + Status Accept(const ErrorModel& error) const override; + + private: + constexpr TableCommitErrorHandler() = default; +}; + +/// \brief View commit operation error handler. +class ICEBERG_REST_EXPORT ViewCommitErrorHandler : public DefaultErrorHandler { + public: + /// \brief Returns the singleton instance + static const std::shared_ptr& Instance(); + + Status Accept(const ErrorModel& error) const override; + + private: + constexpr ViewCommitErrorHandler() = default; +}; + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/http_client.cc b/src/iceberg/catalog/rest/http_client.cc new file mode 100644 index 000000000..1b026c66e --- /dev/null +++ b/src/iceberg/catalog/rest/http_client.cc @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/catalog/rest/http_client.h" + +#include +#include + +#include "iceberg/catalog/rest/constant.h" +#include "iceberg/catalog/rest/error_handlers.h" +#include "iceberg/catalog/rest/json_internal.h" +#include "iceberg/json_internal.h" +#include "iceberg/result.h" +#include "iceberg/util/macros.h" + +namespace iceberg::rest { + +class HttpResponse::Impl { + public: + explicit Impl(cpr::Response&& response) : response_(std::move(response)) {} + ~Impl() = default; + + int32_t status_code() const { return static_cast(response_.status_code); } + + std::string body() const { return response_.text; } + + std::unordered_map headers() const { + return {response_.header.begin(), response_.header.end()}; + } + + private: + cpr::Response response_; +}; + +HttpResponse::HttpResponse() = default; +HttpResponse::~HttpResponse() = default; +HttpResponse::HttpResponse(HttpResponse&&) noexcept = default; +HttpResponse& HttpResponse::operator=(HttpResponse&&) noexcept = default; + +int32_t HttpResponse::status_code() const { return impl_->status_code(); } + +std::string HttpResponse::body() const { return impl_->body(); } + +std::unordered_map HttpResponse::headers() const { + return impl_->headers(); +} + +namespace { + +/// \brief Merges global default headers with request-specific headers. +/// +/// Combines the global headers derived from RestCatalogProperties with the headers +/// passed in the specific request. Request-specific headers have higher priority +/// and will override global defaults if the keys conflict (e.g., overriding +/// the default "Content-Type"). +cpr::Header MergeHeaders(const std::unordered_map& defaults, + const std::unordered_map& overrides) { + cpr::Header combined_headers = {defaults.begin(), defaults.end()}; + for (const auto& [key, val] : overrides) { + combined_headers.insert_or_assign(key, val); + } + return combined_headers; +} + +/// \brief Converts a map of string key-value pairs to cpr::Parameters. +cpr::Parameters GetParameters( + const std::unordered_map& params) { + cpr::Parameters cpr_params; + for (const auto& [key, val] : params) { + cpr_params.Add({key, val}); + } + return cpr_params; +} + +/// \brief Checks if the HTTP status code indicates a successful response. +bool IsSuccessful(int32_t status_code) { + return status_code == 200 // OK + || status_code == 202 // Accepted + || status_code == 204 // No Content + || status_code == 304; // Not Modified +} + +/// \brief Handles failure responses by invoking the provided error handler. +Status HandleFailureResponse(const cpr::Response& response, + const ErrorHandler& error_handler) { + if (!IsSuccessful(response.status_code)) { + // TODO(gangwu): response status code is lost, wrap it with RestError. + ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text)); + ICEBERG_ASSIGN_OR_RAISE(auto error_response, ErrorResponseFromJson(json)); + return error_handler.Accept(error_response.error); + } + return {}; +} + +} // namespace + +void HttpClient::PrepareSession( + const std::string& path, + const std::unordered_map& request_headers, + const std::unordered_map& params) { + session_->SetUrl(cpr::Url{path}); + session_->SetParameters(GetParameters(params)); + session_->RemoveContent(); + auto final_headers = MergeHeaders(default_headers_, request_headers); + session_->SetHeader(final_headers); +} + +HttpClient::HttpClient(std::unordered_map default_headers) + : default_headers_{std::move(default_headers)}, + session_{std::make_unique()} { + // Set default Content-Type for all requests (including GET/HEAD/DELETE). + // Many systems require that content type is set regardless and will fail, + // even on an empty bodied request. + default_headers_[kHeaderContentType] = kMimeTypeApplicationJson; + default_headers_[kHeaderUserAgent] = kUserAgent; +} + +HttpClient::~HttpClient() = default; + +Result HttpClient::Get( + const std::string& path, const std::unordered_map& params, + const std::unordered_map& headers, + const ErrorHandler& error_handler) { + cpr::Response response; + { + std::scoped_lock lock(session_mutex_); + PrepareSession(path, headers, params); + response = session_->Get(); + } + + ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); + HttpResponse http_response; + http_response.impl_ = std::make_unique(std::move(response)); + return http_response; +} + +Result HttpClient::Post( + const std::string& path, const std::string& body, + const std::unordered_map& headers, + const ErrorHandler& error_handler) { + cpr::Response response; + { + std::scoped_lock lock(session_mutex_); + PrepareSession(path, headers); + session_->SetBody(cpr::Body{body}); + response = session_->Post(); + } + + ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); + HttpResponse http_response; + http_response.impl_ = std::make_unique(std::move(response)); + return http_response; +} + +Result HttpClient::PostForm( + const std::string& path, + const std::unordered_map& form_data, + const std::unordered_map& headers, + const ErrorHandler& error_handler) { + cpr::Response response; + + { + std::scoped_lock lock(session_mutex_); + + // Override default Content-Type (application/json) with form-urlencoded + auto form_headers = headers; + form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded; + + PrepareSession(path, form_headers); + std::vector pair_list; + pair_list.reserve(form_data.size()); + for (const auto& [key, val] : form_data) { + pair_list.emplace_back(key, val); + } + session_->SetPayload(cpr::Payload(pair_list.begin(), pair_list.end())); + + response = session_->Post(); + } + + ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); + HttpResponse http_response; + http_response.impl_ = std::make_unique(std::move(response)); + return http_response; +} + +Result HttpClient::Head( + const std::string& path, const std::unordered_map& headers, + const ErrorHandler& error_handler) { + cpr::Response response; + { + std::scoped_lock lock(session_mutex_); + PrepareSession(path, headers); + response = session_->Head(); + } + + ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); + HttpResponse http_response; + http_response.impl_ = std::make_unique(std::move(response)); + return http_response; +} + +Result HttpClient::Delete( + const std::string& path, const std::unordered_map& headers, + const ErrorHandler& error_handler) { + cpr::Response response; + { + std::scoped_lock lock(session_mutex_); + PrepareSession(path, headers); + response = session_->Delete(); + } + + ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); + HttpResponse http_response; + http_response.impl_ = std::make_unique(std::move(response)); + return http_response; +} + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/http_client.h b/src/iceberg/catalog/rest/http_client.h new file mode 100644 index 000000000..56c9f2902 --- /dev/null +++ b/src/iceberg/catalog/rest/http_client.h @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/catalog/rest/type_fwd.h" +#include "iceberg/result.h" + +/// \file iceberg/catalog/rest/http_client.h +/// \brief Http client for Iceberg REST API. + +namespace cpr { +class Session; +} // namespace cpr + +namespace iceberg::rest { + +/// \brief A simple wrapper for cpr::Response. +/// +/// This class encapsulates the details of the underlying cpr library's response, +/// providing a consistent interface that is independent of the specific network +/// library used. +class ICEBERG_REST_EXPORT HttpResponse { + public: + HttpResponse(); + ~HttpResponse(); + + HttpResponse(const HttpResponse&) = delete; + HttpResponse& operator=(const HttpResponse&) = delete; + HttpResponse(HttpResponse&&) noexcept; + HttpResponse& operator=(HttpResponse&&) noexcept; + + /// \brief Get the HTTP status code of the response. + int32_t status_code() const; + + /// \brief Get the body of the response as a string. + std::string body() const; + + /// \brief Get the headers of the response as a map. + std::unordered_map headers() const; + + private: + friend class HttpClient; + class Impl; + std::unique_ptr impl_; +}; + +/// \brief HTTP client for making requests to Iceberg REST Catalog API. +class ICEBERG_REST_EXPORT HttpClient { + public: + explicit HttpClient(std::unordered_map default_headers = {}); + ~HttpClient(); + + HttpClient(const HttpClient&) = delete; + HttpClient& operator=(const HttpClient&) = delete; + HttpClient(HttpClient&&) = delete; + HttpClient& operator=(HttpClient&&) = delete; + + /// \brief Sends a GET request. + Result Get(const std::string& path, + const std::unordered_map& params, + const std::unordered_map& headers, + const ErrorHandler& error_handler); + + /// \brief Sends a POST request. + Result Post(const std::string& path, const std::string& body, + const std::unordered_map& headers, + const ErrorHandler& error_handler); + + /// \brief Sends a POST request with form data. + Result PostForm( + const std::string& path, + const std::unordered_map& form_data, + const std::unordered_map& headers, + const ErrorHandler& error_handler); + + /// \brief Sends a HEAD request. + Result Head(const std::string& path, + const std::unordered_map& headers, + const ErrorHandler& error_handler); + + /// \brief Sends a DELETE request. + Result Delete(const std::string& path, + const std::unordered_map& headers, + const ErrorHandler& error_handler); + + private: + void PrepareSession(const std::string& path, + const std::unordered_map& request_headers, + const std::unordered_map& params = {}); + + std::unordered_map default_headers_; + + // TODO(Li Feiyang): use connection pool to support external multi-threaded concurrent + // calls + std::unique_ptr session_; + mutable std::mutex session_mutex_; +}; + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/meson.build b/src/iceberg/catalog/rest/meson.build index 5f1f635ab..89a68850e 100644 --- a/src/iceberg/catalog/rest/meson.build +++ b/src/iceberg/catalog/rest/meson.build @@ -15,7 +15,15 @@ # specific language governing permissions and limitations # under the License. -iceberg_rest_sources = files('json_internal.cc', 'rest_catalog.cc') +iceberg_rest_sources = files( + 'catalog_properties.cc', + 'error_handlers.cc', + 'http_client.cc', + 'json_internal.cc', + 'resource_paths.cc', + 'rest_catalog.cc', + 'rest_util.cc', +) # cpr does not export symbols, so on Windows it must # be used as a static lib cpr_needs_static = ( @@ -46,4 +54,19 @@ iceberg_rest_dep = declare_dependency( meson.override_dependency('iceberg-rest', iceberg_rest_dep) pkg.generate(iceberg_rest_lib) -install_headers(['rest_catalog.h', 'types.h'], subdir: 'iceberg/catalog/rest') +install_headers( + [ + 'catalog_properties.h', + 'constant.h', + 'error_handlers.h', + 'http_client.h', + 'iceberg_rest_export.h', + 'json_internal.h', + 'resource_paths.h', + 'rest_catalog.h', + 'rest_util.h', + 'type_fwd.h', + 'types.h', + ], + subdir: 'iceberg/catalog/rest', +) diff --git a/src/iceberg/catalog/rest/resource_paths.cc b/src/iceberg/catalog/rest/resource_paths.cc new file mode 100644 index 000000000..c81467c4e --- /dev/null +++ b/src/iceberg/catalog/rest/resource_paths.cc @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/catalog/rest/resource_paths.h" + +#include + +#include "iceberg/catalog/rest/rest_util.h" +#include "iceberg/table_identifier.h" +#include "iceberg/util/macros.h" + +namespace iceberg::rest { + +Result> ResourcePaths::Make(std::string base_uri, + const std::string& prefix) { + if (base_uri.empty()) { + return InvalidArgument("Base URI is empty"); + } + return std::unique_ptr(new ResourcePaths(std::move(base_uri), prefix)); +} + +ResourcePaths::ResourcePaths(std::string base_uri, const std::string& prefix) + : base_uri_(std::move(base_uri)), prefix_(prefix.empty() ? "" : (prefix + "/")) {} + +Status ResourcePaths::SetBaseUri(const std::string& base_uri) { + if (base_uri.empty()) { + return InvalidArgument("Base URI is empty"); + } + base_uri_ = base_uri; + return {}; +} + +Result ResourcePaths::Config() const { + return std::format("{}/v1/{}config", base_uri_, prefix_); +} + +Result ResourcePaths::OAuth2Tokens() const { + return std::format("{}/v1/{}oauth/tokens", base_uri_, prefix_); +} + +Result ResourcePaths::Namespaces() const { + return std::format("{}/v1/{}namespaces", base_uri_, prefix_); +} + +Result ResourcePaths::Namespace_(const Namespace& ns) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ns)); + return std::format("{}/v1/{}namespaces/{}", base_uri_, prefix_, encoded_namespace); +} + +Result ResourcePaths::NamespaceProperties(const Namespace& ns) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ns)); + return std::format("{}/v1/{}namespaces/{}/properties", base_uri_, prefix_, + encoded_namespace); +} + +Result ResourcePaths::Tables(const Namespace& ns) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ns)); + return std::format("{}/v1/{}namespaces/{}/tables", base_uri_, prefix_, + encoded_namespace); +} + +Result ResourcePaths::Table(const TableIdentifier& ident) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns)); + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name)); + return std::format("{}/v1/{}namespaces/{}/tables/{}", base_uri_, prefix_, + encoded_namespace, encoded_table_name); +} + +Result ResourcePaths::Register(const Namespace& ns) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ns)); + return std::format("{}/v1/{}namespaces/{}/register", base_uri_, prefix_, + encoded_namespace); +} + +Result ResourcePaths::Rename() const { + return std::format("{}/v1/{}tables/rename", base_uri_, prefix_); +} + +Result ResourcePaths::Metrics(const TableIdentifier& ident) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns)); + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name)); + return std::format("{}/v1/{}namespaces/{}/tables/{}/metrics", base_uri_, prefix_, + encoded_namespace, encoded_table_name); +} + +Result ResourcePaths::Credentials(const TableIdentifier& ident) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns)); + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name)); + return std::format("{}/v1/{}namespaces/{}/tables/{}/credentials", base_uri_, prefix_, + encoded_namespace, encoded_table_name); +} + +Result ResourcePaths::CommitTransaction() const { + return std::format("{}/v1/{}transactions/commit", base_uri_, prefix_); +} + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/resource_paths.h b/src/iceberg/catalog/rest/resource_paths.h new file mode 100644 index 000000000..9d0bdda63 --- /dev/null +++ b/src/iceberg/catalog/rest/resource_paths.h @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/catalog/rest/type_fwd.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +/// \file iceberg/catalog/rest/resource_paths.h +/// \brief Resource path construction for Iceberg REST API endpoints. + +namespace iceberg::rest { + +/// \brief Resource path builder for Iceberg REST catalog endpoints. +/// +/// This class constructs REST API endpoint URLs for various catalog operations. +class ICEBERG_REST_EXPORT ResourcePaths { + public: + /// \brief Construct a ResourcePaths with base URI and optional prefix. + /// \param base_uri The base URI of the REST catalog server (without trailing slash) + /// \param prefix Optional prefix for REST API paths (default: empty) + /// \return A unique_ptr to ResourcePaths instance + static Result> Make(std::string base_uri, + const std::string& prefix); + + /// \brief Set the base URI of the REST catalog server. + Status SetBaseUri(const std::string& base_uri); + + /// \brief Get the /v1/{prefix}/config endpoint path. + Result Config() const; + + /// \brief Get the /v1/{prefix}/oauth/tokens endpoint path. + Result OAuth2Tokens() const; + + /// \brief Get the /v1/{prefix}/namespaces endpoint path. + Result Namespaces() const; + + /// \brief Get the /v1/{prefix}/namespaces/{namespace} endpoint path. + Result Namespace_(const Namespace& ns) const; + + /// \brief Get the /v1/{prefix}/namespaces/{namespace}/properties endpoint path. + Result NamespaceProperties(const Namespace& ns) const; + + /// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables endpoint path. + Result Tables(const Namespace& ns) const; + + /// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table} endpoint path. + Result Table(const TableIdentifier& ident) const; + + /// \brief Get the /v1/{prefix}/namespaces/{namespace}/register endpoint path. + Result Register(const Namespace& ns) const; + + /// \brief Get the /v1/{prefix}/tables/rename endpoint path. + Result Rename() const; + + /// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/metrics endpoint + /// path. + Result Metrics(const TableIdentifier& ident) const; + + /// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/credentials + /// endpoint path. + Result Credentials(const TableIdentifier& ident) const; + + /// \brief Get the /v1/{prefix}/transactions/commit endpoint path. + Result CommitTransaction() const; + + private: + ResourcePaths(std::string base_uri, const std::string& prefix); + + std::string base_uri_; // required + const std::string prefix_; // optional +}; + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index cd008e9b2..dff52e2af 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -19,26 +19,185 @@ #include "iceberg/catalog/rest/rest_catalog.h" +#include +#include #include -#include +#include -#include "iceberg/catalog/rest/types.h" +#include "iceberg/catalog/rest/catalog_properties.h" +#include "iceberg/catalog/rest/constant.h" +#include "iceberg/catalog/rest/error_handlers.h" +#include "iceberg/catalog/rest/http_client.h" +#include "iceberg/catalog/rest/json_internal.h" +#include "iceberg/catalog/rest/resource_paths.h" +#include "iceberg/catalog/rest/rest_catalog.h" +#include "iceberg/catalog/rest/rest_util.h" +#include "iceberg/json_internal.h" +#include "iceberg/result.h" +#include "iceberg/table.h" +#include "iceberg/util/macros.h" + +namespace iceberg::rest { + +namespace { + +// Fetch server config and merge it with client config +Result> FetchConfig( + const ResourcePaths& paths, const RestCatalogProperties& config) { + ICEBERG_ASSIGN_OR_RAISE(auto config_endpoint, paths.Config()); + HttpClient client(config.ExtractHeaders()); + ICEBERG_ASSIGN_OR_RAISE(const auto response, + client.Get(config_endpoint, /*params=*/{}, /*headers=*/{}, + *DefaultErrorHandler::Instance())); + ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body())); + ICEBERG_ASSIGN_OR_RAISE(auto server_config, CatalogConfigFromJson(json)); + + // Merge priorities: server overrides > client config > server defaults + return RestCatalogProperties::FromMap( + MergeConfigs(server_config.overrides, config.configs(), server_config.defaults)); +} + +} // namespace + +RestCatalog::~RestCatalog() = default; + +Result> RestCatalog::Make( + const RestCatalogProperties& config) { + ICEBERG_ASSIGN_OR_RAISE(auto uri, config.Uri()); + ICEBERG_ASSIGN_OR_RAISE( + auto paths, ResourcePaths::Make(std::string(TrimTrailingSlash(uri)), + config.Get(RestCatalogProperties::kPrefix))); + ICEBERG_ASSIGN_OR_RAISE(auto final_config, FetchConfig(*paths, config)); + + // Update resource paths based on the final config + ICEBERG_ASSIGN_OR_RAISE(auto final_uri, final_config->Uri()); + ICEBERG_RETURN_UNEXPECTED(paths->SetBaseUri(std::string(TrimTrailingSlash(final_uri)))); + + return std::unique_ptr( + new RestCatalog(std::move(final_config), std::move(paths))); +} + +RestCatalog::RestCatalog(std::unique_ptr config, + std::unique_ptr paths) + : config_(std::move(config)), + client_(std::make_unique(config_->ExtractHeaders())), + paths_(std::move(paths)), + name_(config_->Get(RestCatalogProperties::kName)) {} + +std::string_view RestCatalog::name() const { return name_; } + +Result> RestCatalog::ListNamespaces(const Namespace& ns) const { + ICEBERG_ASSIGN_OR_RAISE(auto endpoint, paths_->Namespaces()); + std::vector result; + std::string next_token; + while (true) { + std::unordered_map params; + if (!ns.levels.empty()) { + ICEBERG_ASSIGN_OR_RAISE(params[kQueryParamParent], EncodeNamespace(ns)); + } + if (!next_token.empty()) { + params[kQueryParamPageToken] = next_token; + } + ICEBERG_ASSIGN_OR_RAISE(const auto& response, + client_->Get(endpoint, params, /*headers=*/{}, + *NamespaceErrorHandler::Instance())); + ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body())); + ICEBERG_ASSIGN_OR_RAISE(auto list_response, ListNamespacesResponseFromJson(json)); + result.insert(result.end(), list_response.namespaces.begin(), + list_response.namespaces.end()); + if (list_response.next_page_token.empty()) { + return result; + } + next_token = list_response.next_page_token; + } + return result; +} + +Status RestCatalog::CreateNamespace( + [[maybe_unused]] const Namespace& ns, + [[maybe_unused]] const std::unordered_map& properties) { + return NotImplemented("Not implemented"); +} + +Result> RestCatalog::GetNamespaceProperties( + [[maybe_unused]] const Namespace& ns) const { + return NotImplemented("Not implemented"); +} -namespace iceberg::catalog::rest { +Status RestCatalog::DropNamespace([[maybe_unused]] const Namespace& ns) { + return NotImplemented("Not implemented"); +} + +Result RestCatalog::NamespaceExists([[maybe_unused]] const Namespace& ns) const { + return NotImplemented("Not implemented"); +} + +Status RestCatalog::UpdateNamespaceProperties( + [[maybe_unused]] const Namespace& ns, + [[maybe_unused]] const std::unordered_map& updates, + [[maybe_unused]] const std::unordered_set& removals) { + return NotImplemented("Not implemented"); +} + +Result> RestCatalog::ListTables( + [[maybe_unused]] const Namespace& ns) const { + return NotImplemented("Not implemented"); +} + +Result> RestCatalog::CreateTable( + [[maybe_unused]] const TableIdentifier& identifier, + [[maybe_unused]] const Schema& schema, [[maybe_unused]] const PartitionSpec& spec, + [[maybe_unused]] const std::string& location, + [[maybe_unused]] const std::unordered_map& properties) { + return NotImplemented("Not implemented"); +} + +Result> RestCatalog::UpdateTable( + [[maybe_unused]] const TableIdentifier& identifier, + [[maybe_unused]] const std::vector>& requirements, + [[maybe_unused]] const std::vector>& updates) { + return NotImplemented("Not implemented"); +} + +Result> RestCatalog::StageCreateTable( + [[maybe_unused]] const TableIdentifier& identifier, + [[maybe_unused]] const Schema& schema, [[maybe_unused]] const PartitionSpec& spec, + [[maybe_unused]] const std::string& location, + [[maybe_unused]] const std::unordered_map& properties) { + return NotImplemented("Not implemented"); +} + +Status RestCatalog::DropTable([[maybe_unused]] const TableIdentifier& identifier, + [[maybe_unused]] bool purge) { + return NotImplemented("Not implemented"); +} -RestCatalog::RestCatalog(const std::string& base_url) : base_url_(std::move(base_url)) {} +Result RestCatalog::TableExists( + [[maybe_unused]] const TableIdentifier& identifier) const { + return NotImplemented("Not implemented"); +} + +Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from, + [[maybe_unused]] const TableIdentifier& to) { + return NotImplemented("Not implemented"); +} + +Result> RestCatalog::LoadTable( + [[maybe_unused]] const TableIdentifier& identifier) { + return NotImplemented("Not implemented"); +} -cpr::Response RestCatalog::GetConfig() { - cpr::Url url = cpr::Url{base_url_ + "/v1/config"}; - cpr::Response r = cpr::Get(url); - return r; +Result> RestCatalog::RegisterTable( + [[maybe_unused]] const TableIdentifier& identifier, + [[maybe_unused]] const std::string& metadata_file_location) { + return NotImplemented("Not implemented"); } -cpr::Response RestCatalog::ListNamespaces() { - cpr::Url url = cpr::Url{base_url_ + "/v1/namespaces"}; - cpr::Response r = cpr::Get(url); - return r; +std::unique_ptr RestCatalog::BuildTable( + [[maybe_unused]] const TableIdentifier& identifier, + [[maybe_unused]] const Schema& schema) const { + return nullptr; } -} // namespace iceberg::catalog::rest +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index 7b3e205c1..84ab2b9c8 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -1,41 +1,112 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ #pragma once +#include #include -#include - +#include "iceberg/catalog.h" #include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/catalog/rest/type_fwd.h" +#include "iceberg/result.h" + +/// \file iceberg/catalog/rest/rest_catalog.h +/// RestCatalog implementation for Iceberg REST API. -namespace iceberg::catalog::rest { +namespace iceberg::rest { -class ICEBERG_REST_EXPORT RestCatalog { +/// \brief Rest catalog implementation. +class ICEBERG_REST_EXPORT RestCatalog : public Catalog { public: - explicit RestCatalog(const std::string& base_url); - ~RestCatalog() = default; + ~RestCatalog() override; + + RestCatalog(const RestCatalog&) = delete; + RestCatalog& operator=(const RestCatalog&) = delete; + RestCatalog(RestCatalog&&) = delete; + RestCatalog& operator=(RestCatalog&&) = delete; + + /// \brief Create a RestCatalog instance + /// + /// \param config the configuration for the RestCatalog + /// \return a unique_ptr to RestCatalog instance + static Result> Make(const RestCatalogProperties& config); + + std::string_view name() const override; + + Result> ListNamespaces(const Namespace& ns) const override; + + Status CreateNamespace( + const Namespace& ns, + const std::unordered_map& properties) override; + + Result> GetNamespaceProperties( + const Namespace& ns) const override; + + Status DropNamespace(const Namespace& ns) override; + + Result NamespaceExists(const Namespace& ns) const override; - cpr::Response GetConfig(); + Status UpdateNamespaceProperties( + const Namespace& ns, const std::unordered_map& updates, + const std::unordered_set& removals) override; - cpr::Response ListNamespaces(); + Result> ListTables(const Namespace& ns) const override; + + Result> CreateTable( + const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, + const std::string& location, + const std::unordered_map& properties) override; + + Result> UpdateTable( + const TableIdentifier& identifier, + const std::vector>& requirements, + const std::vector>& updates) override; + + Result> StageCreateTable( + const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, + const std::string& location, + const std::unordered_map& properties) override; + + Result TableExists(const TableIdentifier& identifier) const override; + + Status RenameTable(const TableIdentifier& from, const TableIdentifier& to) override; + + Status DropTable(const TableIdentifier& identifier, bool purge) override; + + Result> LoadTable(const TableIdentifier& identifier) override; + + Result> RegisterTable( + const TableIdentifier& identifier, + const std::string& metadata_file_location) override; + + std::unique_ptr BuildTable( + const TableIdentifier& identifier, const Schema& schema) const override; private: - std::string base_url_; + RestCatalog(std::unique_ptr config, + std::unique_ptr paths); + + std::unique_ptr config_; + std::unique_ptr client_; + std::unique_ptr paths_; + std::string name_; }; -} // namespace iceberg::catalog::rest +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_util.cc b/src/iceberg/catalog/rest/rest_util.cc new file mode 100644 index 000000000..5a0f166d5 --- /dev/null +++ b/src/iceberg/catalog/rest/rest_util.cc @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/catalog/rest/rest_util.h" + +#include + +#include "iceberg/table_identifier.h" +#include "iceberg/util/macros.h" + +namespace iceberg::rest { + +namespace { +const std::string kNamespaceEscapeSeparator = "%1F"; +} + +std::string_view TrimTrailingSlash(std::string_view str) { + while (!str.empty() && str.back() == '/') { + str.remove_suffix(1); + } + return str; +} + +Result EncodeString(std::string_view str_to_encode) { + if (str_to_encode.empty()) { + return ""; + } + + // Use CPR's urlEncode which internally calls libcurl's curl_easy_escape() + cpr::util::SecureString encoded = cpr::util::urlEncode(str_to_encode); + if (encoded.empty()) { + return InvalidArgument("Failed to encode string '{}'", str_to_encode); + } + + return std::string{encoded.data(), encoded.size()}; +} + +Result DecodeString(std::string_view str_to_decode) { + if (str_to_decode.empty()) { + return ""; + } + + // Use CPR's urlDecode which internally calls libcurl's curl_easy_unescape() + cpr::util::SecureString decoded = cpr::util::urlDecode(str_to_decode); + if (decoded.empty()) { + return InvalidArgument("Failed to decode string '{}'", str_to_decode); + } + + return std::string{decoded.data(), decoded.size()}; +} + +Result EncodeNamespace(const Namespace& ns_to_encode) { + if (ns_to_encode.levels.empty()) { + return ""; + } + + ICEBERG_ASSIGN_OR_RAISE(std::string result, EncodeString(ns_to_encode.levels.front())); + + for (size_t i = 1; i < ns_to_encode.levels.size(); ++i) { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_level, + EncodeString(ns_to_encode.levels[i])); + result.append(kNamespaceEscapeSeparator); + result.append(std::move(encoded_level)); + } + + return result; +} + +Result DecodeNamespace(std::string_view str_to_decode) { + if (str_to_decode.empty()) { + return Namespace{.levels = {}}; + } + + Namespace ns{}; + std::string::size_type start = 0; + std::string::size_type end = str_to_decode.find(kNamespaceEscapeSeparator); + + while (end != std::string::npos) { + ICEBERG_ASSIGN_OR_RAISE(std::string decoded_level, + DecodeString(str_to_decode.substr(start, end - start))); + ns.levels.push_back(std::move(decoded_level)); + start = end + kNamespaceEscapeSeparator.size(); + end = str_to_decode.find(kNamespaceEscapeSeparator, start); + } + + ICEBERG_ASSIGN_OR_RAISE(std::string decoded_level, + DecodeString(str_to_decode.substr(start))); + ns.levels.push_back(std::move(decoded_level)); + return ns; +} + +std::unordered_map MergeConfigs( + const std::unordered_map& server_defaults, + const std::unordered_map& client_configs, + const std::unordered_map& server_overrides) { + // Merge with precedence: server_overrides > client_configs > server_defaults + auto merged = server_defaults; + for (const auto& [key, value] : client_configs) { + merged.insert_or_assign(key, value); + } + for (const auto& [key, value] : server_overrides) { + merged.insert_or_assign(key, value); + } + return merged; +} + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_util.h b/src/iceberg/catalog/rest/rest_util.h new file mode 100644 index 000000000..895bb2fb4 --- /dev/null +++ b/src/iceberg/catalog/rest/rest_util.h @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg::rest { + +/// \brief Trim trailing slashes from a string. +/// +/// \param str A string to trim. +/// \return The trimmed string with all trailing slashes removed. +ICEBERG_REST_EXPORT std::string_view TrimTrailingSlash(std::string_view str); + +/// \brief URL-encode a string (RFC 3986). +/// +/// \details This implementation uses libcurl (via CPR), which follows RFC 3986 strictly: +/// - Unreserved characters: [A-Z], [a-z], [0-9], "-", "_", ".", "~" +/// - Space is encoded as "%20" (unlike Java's URLEncoder which uses "+"). +/// - All other characters are percent-encoded (%XX). +/// \param str_to_encode The string to encode. +/// \return The URL-encoded string or InvalidArgument if the string is invalid. +ICEBERG_REST_EXPORT Result EncodeString(std::string_view str_to_encode); + +/// \brief URL-decode a string. +/// +/// \details Decodes percent-encoded characters (e.g., "%20" -> space). Uses libcurl's URL +/// decoding via the CPR library. +/// \param str_to_decode The encoded string to decode. +/// \return The decoded string or InvalidArgument if the string is invalid. +ICEBERG_REST_EXPORT Result DecodeString(std::string_view str_to_decode); + +/// \brief Encode a Namespace into a URL-safe component. +/// +/// \details Encodes each level separately using EncodeString, then joins them with "%1F". +/// \param ns_to_encode The namespace to encode. +/// \return The percent-encoded namespace string suitable for URLs. +ICEBERG_REST_EXPORT Result EncodeNamespace(const Namespace& ns_to_encode); + +/// \brief Decode a URL-encoded namespace string back to a Namespace. +/// +/// \details Splits by "%1F" (the URL-encoded form of ASCII Unit Separator), then decodes +/// each level separately using DecodeString. +/// \param str_to_decode The percent-encoded namespace string. +/// \return The decoded Namespace. +ICEBERG_REST_EXPORT Result DecodeNamespace(std::string_view str_to_decode); + +/// \brief Merge catalog configuration properties. +/// +/// \details Merges three sets of configuration properties following the precedence order: +/// server overrides > client configs > server defaults. +/// \param server_defaults Default properties provided by the server. +/// \param client_configs Configuration properties from the client. +/// \param server_overrides Override properties enforced by the server. +/// \return A merged map containing all properties with correct precedence. +ICEBERG_REST_EXPORT std::unordered_map MergeConfigs( + const std::unordered_map& server_defaults, + const std::unordered_map& client_configs, + const std::unordered_map& server_overrides); + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/type_fwd.h b/src/iceberg/catalog/rest/type_fwd.h new file mode 100644 index 000000000..082630f2d --- /dev/null +++ b/src/iceberg/catalog/rest/type_fwd.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/catalog/rest/type_fwd.h +/// Forward declarations and enum definitions for Iceberg REST API types. + +namespace iceberg::rest { + +struct ErrorModel; + +class ErrorHandler; +class HttpClient; +class ResourcePaths; +class RestCatalog; +class RestCatalogProperties; + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/types.h b/src/iceberg/catalog/rest/types.h index 2e32f967f..0b589bdfd 100644 --- a/src/iceberg/catalog/rest/types.h +++ b/src/iceberg/catalog/rest/types.h @@ -19,8 +19,6 @@ #pragma once -#include -#include #include #include #include diff --git a/src/iceberg/result.h b/src/iceberg/result.h index 743473b10..ddc428a23 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -30,9 +30,12 @@ namespace iceberg { /// \brief Error types for iceberg. enum class ErrorKind { kAlreadyExists, + kBadRequest, kCommitFailed, kCommitStateUnknown, kDecompressError, + kForbidden, + kInternalServerError, kInvalid, // For general invalid errors kInvalidArgument, kInvalidArrowData, @@ -42,12 +45,17 @@ enum class ErrorKind { kInvalidSchema, kIOError, kJsonParseError, + kNamespaceNotEmpty, kNoSuchNamespace, kNoSuchTable, + kNoSuchView, kNotAllowed, + kNotAuthorized, kNotFound, kNotImplemented, kNotSupported, + kRestError, + kServiceUnavailable, kUnknownError, kValidationFailed, }; @@ -77,12 +85,18 @@ using Status = Result; -> std::unexpected { \ return std::unexpected( \ {ErrorKind::k##name, std::format(fmt, std::forward(args)...)}); \ + } \ + inline auto name(const std::string& message) -> std::unexpected { \ + return std::unexpected({ErrorKind::k##name, message}); \ } DEFINE_ERROR_FUNCTION(AlreadyExists) +DEFINE_ERROR_FUNCTION(BadRequest) DEFINE_ERROR_FUNCTION(CommitFailed) DEFINE_ERROR_FUNCTION(CommitStateUnknown) DEFINE_ERROR_FUNCTION(DecompressError) +DEFINE_ERROR_FUNCTION(Forbidden) +DEFINE_ERROR_FUNCTION(InternalServerError) DEFINE_ERROR_FUNCTION(Invalid) DEFINE_ERROR_FUNCTION(InvalidArgument) DEFINE_ERROR_FUNCTION(InvalidArrowData) @@ -92,12 +106,17 @@ DEFINE_ERROR_FUNCTION(InvalidManifestList) DEFINE_ERROR_FUNCTION(InvalidSchema) DEFINE_ERROR_FUNCTION(IOError) DEFINE_ERROR_FUNCTION(JsonParseError) +DEFINE_ERROR_FUNCTION(NamespaceNotEmpty) DEFINE_ERROR_FUNCTION(NoSuchNamespace) DEFINE_ERROR_FUNCTION(NoSuchTable) +DEFINE_ERROR_FUNCTION(NoSuchView) DEFINE_ERROR_FUNCTION(NotAllowed) +DEFINE_ERROR_FUNCTION(NotAuthorized) DEFINE_ERROR_FUNCTION(NotFound) DEFINE_ERROR_FUNCTION(NotImplemented) DEFINE_ERROR_FUNCTION(NotSupported) +DEFINE_ERROR_FUNCTION(RestError) +DEFINE_ERROR_FUNCTION(ServiceUnavailable) DEFINE_ERROR_FUNCTION(UnknownError) DEFINE_ERROR_FUNCTION(ValidationFailed) diff --git a/src/iceberg/table_identifier.h b/src/iceberg/table_identifier.h index b145e75f3..bef9b81dd 100644 --- a/src/iceberg/table_identifier.h +++ b/src/iceberg/table_identifier.h @@ -33,6 +33,8 @@ namespace iceberg { /// \brief A namespace in a catalog. struct ICEBERG_EXPORT Namespace { std::vector levels; + + bool operator==(const Namespace& other) const { return levels == other.levels; } }; /// \brief Identifies a table in iceberg catalog. @@ -40,6 +42,10 @@ struct ICEBERG_EXPORT TableIdentifier { Namespace ns; std::string name; + bool operator==(const TableIdentifier& other) const { + return ns == other.ns && name == other.name; + } + /// \brief Validates the TableIdentifier. Status Validate() const { if (name.empty()) { diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 25a03932d..c36d33da6 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -176,16 +176,17 @@ if(ICEBERG_BUILD_REST) add_executable(${test_name}) target_include_directories(${test_name} PRIVATE "${CMAKE_BINARY_DIR}/iceberg/test/") - target_sources(${test_name} PRIVATE ${ARG_SOURCES}) - target_link_libraries(${test_name} PRIVATE GTest::gtest_main GTest::gmock iceberg_rest_static) - add_test(NAME ${test_name} COMMAND ${test_name}) endfunction() - add_rest_iceberg_test(rest_catalog_test SOURCES rest_catalog_test.cc - rest_json_internal_test.cc) + add_rest_iceberg_test(rest_catalog_test + SOURCES + rest_catalog_test.cc + rest_json_internal_test.cc + rest_util_test.cc) + target_include_directories(rest_catalog_test PRIVATE ${cpp-httplib_SOURCE_DIR}) endif() diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index 4e5d30ad1..72b09a9ec 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -93,6 +93,7 @@ if get_option('rest').enabled() 'sources': files( 'rest_catalog_test.cc', 'rest_json_internal_test.cc', + 'rest_util_test.cc', ), 'dependencies': [iceberg_rest_dep, cpp_httplib_dep], }, diff --git a/src/iceberg/test/rest_catalog_test.cc b/src/iceberg/test/rest_catalog_test.cc index fda9ef6de..40befeedf 100644 --- a/src/iceberg/test/rest_catalog_test.cc +++ b/src/iceberg/test/rest_catalog_test.cc @@ -19,93 +19,152 @@ #include "iceberg/catalog/rest/rest_catalog.h" -#include - -#include -#include +#include +#include #include #include -#include -namespace iceberg::catalog::rest { +#include "iceberg/catalog/rest/catalog_properties.h" +#include "iceberg/table_identifier.h" +#include "iceberg/test/matchers.h" + +namespace iceberg::rest { -class RestCatalogIntegrationTest : public ::testing::Test { +// Test fixture for REST catalog tests, This assumes you have a local REST catalog service +// running Default configuration: http://localhost:8181. +class RestCatalogTest : public ::testing::Test { protected: void SetUp() override { - server_ = std::make_unique(); - port_ = server_->bind_to_any_port("127.0.0.1"); - - server_thread_ = std::thread([this]() { server_->listen_after_bind(); }); + // Default configuration for local testing + // You can override this with environment variables if needed + const char* uri_env = std::getenv("ICEBERG_REST_URI"); + const char* warehouse_env = std::getenv("ICEBERG_REST_WAREHOUSE"); + + std::string uri = uri_env ? uri_env : "http://localhost:8181"; + std::string warehouse = warehouse_env ? warehouse_env : "default"; + + config_ = RestCatalogProperties::default_properties(); + config_->Set(RestCatalogProperties::kUri, uri) + .Set(RestCatalogProperties::kName, std::string("test_catalog")) + .Set(RestCatalogProperties::kWarehouse, warehouse); } - void TearDown() override { - server_->stop(); - if (server_thread_.joinable()) { - server_thread_.join(); - } - } + void TearDown() override {} - std::unique_ptr server_; - int port_ = -1; - std::thread server_thread_; + std::unique_ptr config_; }; -TEST_F(RestCatalogIntegrationTest, DISABLED_GetConfigSuccessfully) { - server_->Get("/v1/config", [](const httplib::Request&, httplib::Response& res) { - res.status = 200; - res.set_content(R"({"warehouse": "s3://test-bucket"})", "application/json"); - }); +TEST_F(RestCatalogTest, DISABLED_MakeCatalogSuccess) { + auto catalog_result = RestCatalog::Make(*config_); + EXPECT_THAT(catalog_result, IsOk()); - std::string base_uri = "http://127.0.0.1:" + std::to_string(port_); - RestCatalog catalog(base_uri); - cpr::Response response = catalog.GetConfig(); + if (catalog_result.has_value()) { + auto& catalog = catalog_result.value(); + EXPECT_EQ(catalog->name(), "test_catalog"); + } +} - ASSERT_EQ(response.error.code, cpr::ErrorCode::OK); - ASSERT_EQ(response.status_code, 200); +TEST_F(RestCatalogTest, DISABLED_MakeCatalogEmptyUri) { + auto invalid_config = RestCatalogProperties::default_properties(); + invalid_config->Set(RestCatalogProperties::kUri, std::string("")); - auto json_body = nlohmann::json::parse(response.text); - EXPECT_EQ(json_body["warehouse"], "s3://test-bucket"); + auto catalog_result = RestCatalog::Make(*invalid_config); + EXPECT_THAT(catalog_result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(catalog_result, HasErrorMessage("uri")); } -TEST_F(RestCatalogIntegrationTest, DISABLED_ListNamespacesReturnsMultipleResults) { - server_->Get("/v1/namespaces", [](const httplib::Request&, httplib::Response& res) { - res.status = 200; - res.set_content(R"({ - "namespaces": [ - ["accounting", "db"], - ["production", "db"] - ] - })", - "application/json"); - }); - - std::string base_uri = "http://127.0.0.1:" + std::to_string(port_); - RestCatalog catalog(base_uri); - cpr::Response response = catalog.ListNamespaces(); - - ASSERT_EQ(response.error.code, cpr::ErrorCode::OK); - ASSERT_EQ(response.status_code, 200); - - auto json_body = nlohmann::json::parse(response.text); - ASSERT_TRUE(json_body.contains("namespaces")); - EXPECT_EQ(json_body["namespaces"].size(), 2); - EXPECT_THAT(json_body["namespaces"][0][0], "accounting"); +TEST_F(RestCatalogTest, DISABLED_MakeCatalogWithCustomProperties) { + auto custom_config = RestCatalogProperties::default_properties(); + custom_config + ->Set(RestCatalogProperties::kUri, config_->Get(RestCatalogProperties::kUri)) + .Set(RestCatalogProperties::kName, config_->Get(RestCatalogProperties::kName)) + .Set(RestCatalogProperties::kWarehouse, + config_->Get(RestCatalogProperties::kWarehouse)) + .Set(RestCatalogProperties::Entry{"custom_prop", ""}, + std::string("custom_value")) + .Set(RestCatalogProperties::Entry{"timeout", ""}, + std::string("30000")); + + auto catalog_result = RestCatalog::Make(*custom_config); + EXPECT_THAT(catalog_result, IsOk()); } -TEST_F(RestCatalogIntegrationTest, DISABLED_HandlesServerError) { - server_->Get("/v1/config", [](const httplib::Request&, httplib::Response& res) { - res.status = 500; - res.set_content("Internal Server Error", "text/plain"); - }); +TEST_F(RestCatalogTest, DISABLED_ListNamespaces) { + auto catalog_result = RestCatalog::Make(*config_); + ASSERT_THAT(catalog_result, IsOk()); + auto& catalog = catalog_result.value(); - std::string base_uri = "http://127.0.0.1:" + std::to_string(port_); - RestCatalog catalog(base_uri); - cpr::Response response = catalog.GetConfig(); + Namespace ns{.levels = {}}; + auto result = catalog->ListNamespaces(ns); + EXPECT_THAT(result, IsOk()); + EXPECT_FALSE(result->empty()); + EXPECT_EQ(result->front().levels, (std::vector{"my_namespace_test2"})); +} + +TEST_F(RestCatalogTest, DISABLED_CreateNamespaceNotImplemented) { + auto catalog_result = RestCatalog::Make(*config_); + ASSERT_THAT(catalog_result, IsOk()); + auto catalog = std::move(catalog_result.value()); + + Namespace ns{.levels = {"test_namespace"}}; + std::unordered_map props = {{"owner", "test"}}; + + auto result = catalog->CreateNamespace(ns, props); + EXPECT_THAT(result, IsError(ErrorKind::kNotImplemented)); +} - ASSERT_EQ(response.error.code, cpr::ErrorCode::OK); - ASSERT_EQ(response.status_code, 500); - ASSERT_EQ(response.text, "Internal Server Error"); +TEST_F(RestCatalogTest, DISABLED_IntegrationTestFullNamespaceWorkflow) { + auto catalog_result = RestCatalog::Make(*config_); + ASSERT_THAT(catalog_result, IsOk()); + auto catalog = std::move(catalog_result.value()); + + // 1. List initial namespaces + Namespace root{.levels = {}}; + auto list_result1 = catalog->ListNamespaces(root); + ASSERT_THAT(list_result1, IsOk()); + size_t initial_count = list_result1->size(); + + // 2. Create a new namespace + Namespace test_ns{.levels = {"integration_test_ns"}}; + std::unordered_map props = { + {"owner", "test"}, {"created_by", "rest_catalog_test"}}; + auto create_result = catalog->CreateNamespace(test_ns, props); + EXPECT_THAT(create_result, IsOk()); + + // 3. Verify namespace exists + auto exists_result = catalog->NamespaceExists(test_ns); + EXPECT_THAT(exists_result, HasValue(::testing::Eq(true))); + + // 4. List namespaces again (should have one more) + auto list_result2 = catalog->ListNamespaces(root); + ASSERT_THAT(list_result2, IsOk()); + EXPECT_EQ(list_result2->size(), initial_count + 1); + + // 5. Get namespace properties + auto props_result = catalog->GetNamespaceProperties(test_ns); + ASSERT_THAT(props_result, IsOk()); + EXPECT_EQ((*props_result)["owner"], "test"); + + // 6. Update properties + std::unordered_map updates = { + {"description", "test namespace"}}; + std::unordered_set removals = {}; + auto update_result = catalog->UpdateNamespaceProperties(test_ns, updates, removals); + EXPECT_THAT(update_result, IsOk()); + + // 7. Verify updated properties + auto props_result2 = catalog->GetNamespaceProperties(test_ns); + ASSERT_THAT(props_result2, IsOk()); + EXPECT_EQ((*props_result2)["description"], "test namespace"); + + // 8. Drop the namespace (cleanup) + auto drop_result = catalog->DropNamespace(test_ns); + EXPECT_THAT(drop_result, IsOk()); + + // 9. Verify namespace no longer exists + auto exists_result2 = catalog->NamespaceExists(test_ns); + EXPECT_THAT(exists_result2, HasValue(::testing::Eq(false))); } -} // namespace iceberg::catalog::rest +} // namespace iceberg::rest diff --git a/src/iceberg/test/rest_util_test.cc b/src/iceberg/test/rest_util_test.cc new file mode 100644 index 000000000..b95a220bb --- /dev/null +++ b/src/iceberg/test/rest_util_test.cc @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/catalog/rest/rest_util.h" + +#include + +#include "iceberg/table_identifier.h" +#include "iceberg/test/matchers.h" + +namespace iceberg::rest { + +TEST(RestUtilTest, TrimTrailingSlash) { + EXPECT_EQ(TrimTrailingSlash("https://foo"), "https://foo"); + EXPECT_EQ(TrimTrailingSlash("https://foo/"), "https://foo"); + EXPECT_EQ(TrimTrailingSlash("https://foo////"), "https://foo"); +} + +TEST(RestUtilTest, RoundTripUrlEncodeDecodeNamespace) { + // {"dogs"} + EXPECT_THAT(EncodeNamespace(Namespace{.levels = {"dogs"}}), + HasValue(::testing::Eq("dogs"))); + EXPECT_THAT(DecodeNamespace("dogs"), + HasValue(::testing::Eq(Namespace{.levels = {"dogs"}}))); + + // {"dogs.named.hank"} + EXPECT_THAT(EncodeNamespace(Namespace{.levels = {"dogs.named.hank"}}), + HasValue(::testing::Eq("dogs.named.hank"))); + EXPECT_THAT(DecodeNamespace("dogs.named.hank"), + HasValue(::testing::Eq(Namespace{.levels = {"dogs.named.hank"}}))); + + // {"dogs/named/hank"} + EXPECT_THAT(EncodeNamespace(Namespace{.levels = {"dogs/named/hank"}}), + HasValue(::testing::Eq("dogs%2Fnamed%2Fhank"))); + EXPECT_THAT(DecodeNamespace("dogs%2Fnamed%2Fhank"), + HasValue(::testing::Eq(Namespace{.levels = {"dogs/named/hank"}}))); + + // {"dogs", "named", "hank"} + EXPECT_THAT(EncodeNamespace(Namespace{.levels = {"dogs", "named", "hank"}}), + HasValue(::testing::Eq("dogs%1Fnamed%1Fhank"))); + EXPECT_THAT(DecodeNamespace("dogs%1Fnamed%1Fhank"), + HasValue(::testing::Eq(Namespace{.levels = {"dogs", "named", "hank"}}))); + + // {"dogs.and.cats", "named", "hank.or.james-westfall"} + EXPECT_THAT(EncodeNamespace(Namespace{ + .levels = {"dogs.and.cats", "named", "hank.or.james-westfall"}}), + HasValue(::testing::Eq("dogs.and.cats%1Fnamed%1Fhank.or.james-westfall"))); + EXPECT_THAT(DecodeNamespace("dogs.and.cats%1Fnamed%1Fhank.or.james-westfall"), + HasValue(::testing::Eq(Namespace{ + .levels = {"dogs.and.cats", "named", "hank.or.james-westfall"}}))); + + // empty namespace + EXPECT_THAT(EncodeNamespace(Namespace{.levels = {}}), HasValue(::testing::Eq(""))); + EXPECT_THAT(DecodeNamespace(""), HasValue(::testing::Eq(Namespace{.levels = {}}))); +} + +TEST(RestUtilTest, EncodeString) { + // RFC 3986 unreserved characters should not be encoded + EXPECT_THAT(EncodeString("abc123XYZ"), HasValue(::testing::Eq("abc123XYZ"))); + EXPECT_THAT(EncodeString("test-file_name.txt~backup"), + HasValue(::testing::Eq("test-file_name.txt~backup"))); + + // Spaces and special characters should be encoded + EXPECT_THAT(EncodeString("hello world"), HasValue(::testing::Eq("hello%20world"))); + EXPECT_THAT(EncodeString("test@example.com"), + HasValue(::testing::Eq("test%40example.com"))); + EXPECT_THAT(EncodeString("path/to/file"), HasValue(::testing::Eq("path%2Fto%2Ffile"))); + EXPECT_THAT(EncodeString("key=value&foo=bar"), + HasValue(::testing::Eq("key%3Dvalue%26foo%3Dbar"))); + EXPECT_THAT(EncodeString("100%"), HasValue(::testing::Eq("100%25"))); + EXPECT_THAT(EncodeString("hello\x1Fworld"), HasValue(::testing::Eq("hello%1Fworld"))); + EXPECT_THAT(EncodeString(""), HasValue(::testing::Eq(""))); +} + +TEST(RestUtilTest, DecodeString) { + // Decode percent-encoded strings + EXPECT_THAT(DecodeString("hello%20world"), HasValue(::testing::Eq("hello world"))); + EXPECT_THAT(DecodeString("test%40example.com"), + HasValue(::testing::Eq("test@example.com"))); + EXPECT_THAT(DecodeString("path%2Fto%2Ffile"), HasValue(::testing::Eq("path/to/file"))); + EXPECT_THAT(DecodeString("key%3Dvalue%26foo%3Dbar"), + HasValue(::testing::Eq("key=value&foo=bar"))); + EXPECT_THAT(DecodeString("100%25"), HasValue(::testing::Eq("100%"))); + + // ASCII Unit Separator (0x1F) + EXPECT_THAT(DecodeString("hello%1Fworld"), HasValue(::testing::Eq("hello\x1Fworld"))); + + // Unreserved characters remain unchanged + EXPECT_THAT(DecodeString("test-file_name.txt~backup"), + HasValue(::testing::Eq("test-file_name.txt~backup"))); + EXPECT_THAT(DecodeString(""), HasValue(::testing::Eq(""))); +} + +TEST(RestUtilTest, EncodeDecodeStringRoundTrip) { + std::vector test_cases = {"hello world", + "test@example.com", + "path/to/file", + "key=value&foo=bar", + "100%", + "hello\x1Fworld", + "special!@#$%^&*()chars", + "mixed-123_test.file~ok", + ""}; + + for (const auto& test : test_cases) { + ICEBERG_UNWRAP_OR_FAIL(std::string encoded, EncodeString(test)); + ICEBERG_UNWRAP_OR_FAIL(std::string decoded, DecodeString(encoded)); + EXPECT_EQ(decoded, test) << "Round-trip failed for: " << test; + } +} + +TEST(RestUtilTest, MergeConfigs) { + std::unordered_map server_defaults = { + {"default1", "value1"}, {"default2", "value2"}, {"common", "default_value"}}; + + std::unordered_map client_configs = { + {"client1", "value1"}, {"common", "client_value"}, {"extra", "client_value"}}; + + std::unordered_map server_overrides = { + {"override1", "value1"}, {"common", "override_value"}}; + + auto merged = MergeConfigs(server_defaults, client_configs, server_overrides); + + EXPECT_EQ(merged.size(), 6); + + // Check precedence: server_overrides > client_configs > server_defaults + EXPECT_EQ(merged["default1"], "value1"); + EXPECT_EQ(merged["default2"], "value2"); + EXPECT_EQ(merged["client1"], "value1"); + EXPECT_EQ(merged["override1"], "value1"); + EXPECT_EQ(merged["common"], "override_value"); + EXPECT_EQ(merged["extra"], "client_value"); + + // Test with empty maps + auto merged_empty = MergeConfigs({}, {{"key", "value"}}, {}); + EXPECT_EQ(merged_empty.size(), 1); + EXPECT_EQ(merged_empty["key"], "value"); +} + +} // namespace iceberg::rest diff --git a/src/iceberg/util/config.h b/src/iceberg/util/config.h index 7a3a28b40..8fb715534 100644 --- a/src/iceberg/util/config.h +++ b/src/iceberg/util/config.h @@ -112,6 +112,19 @@ class ConfigBase { const std::unordered_map& configs() const { return configs_; } + /// \brief Extracts the prefix from the configuration. + /// \param prefix The prefix to extract. + /// \return A map of entries that match the prefix with prefix removed. + std::unordered_map Extract(std::string_view prefix) const { + std::unordered_map extracted; + for (const auto& [key, value] : configs_) { + if (key.starts_with(prefix)) { + extracted[key.substr(prefix.length())] = value; + } + } + return extracted; + } + protected: std::unordered_map configs_; };