Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/iceberg/catalog/rest/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
# specific language governing permissions and limitations
# under the License.

set(ICEBERG_REST_SOURCES rest_catalog.cc json_internal.cc)
set(ICEBERG_REST_SOURCES
catalog.cc
json_internal.cc
validator.cc
config.cc
http_client_internal.cc)

set(ICEBERG_REST_STATIC_BUILD_INTERFACE_LIBS)
set(ICEBERG_REST_SHARED_BUILD_INTERFACE_LIBS)
Expand Down
199 changes: 199 additions & 0 deletions src/iceberg/catalog/rest/catalog.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/catalog/rest/catalog.h"

#include <memory>
#include <utility>

#include <cpr/cpr.h>

#include "iceberg/catalog/rest/config.h"
#include "iceberg/catalog/rest/http_client_interal.h"
#include "iceberg/catalog/rest/json_internal.h"
#include "iceberg/catalog/rest/types.h"
#include "iceberg/catalog/rest/util.h"
#include "iceberg/json_internal.h"
#include "iceberg/result.h"
#include "iceberg/table.h"
#include "iceberg/util/macros.h"

namespace iceberg::rest {

Result<RestCatalog> RestCatalog::Make(RestCatalogConfig config) {
// Validate that uri is not empty
if (config.uri.empty()) {
return InvalidArgument("Rest catalog configuration property 'uri' is required.");
}
ICEBERG_ASSIGN_OR_RAISE(auto tmp_client, HttpClient::Make(config));
const std::string endpoint = config.GetConfigEndpoint();
cpr::Parameters params;
if (config.warehouse.has_value()) {
params.Add({"warehouse", config.warehouse.value()});
}
ICEBERG_ASSIGN_OR_RAISE(const auto& response, tmp_client->Get(endpoint, params));
switch (response.status_code) {
case cpr::status::HTTP_OK: {
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text));
ICEBERG_ASSIGN_OR_RAISE(auto server_config, CatalogConfigFromJson(json));
// Merge server config into client config, server config overrides > client config
// properties > server config defaults
auto final_props = std::move(server_config.defaults);
for (const auto& kv : config.properties_) {
final_props.insert_or_assign(kv.first, kv.second);
}

for (const auto& kv : server_config.overrides) {
final_props.insert_or_assign(kv.first, kv.second);
}
RestCatalogConfig final_config = {
.uri = config.uri,
.name = config.name,
.warehouse = config.warehouse,
.properties_ = std::move(final_props),
};
ICEBERG_ASSIGN_OR_RAISE(auto client, HttpClient::Make(final_config));
return RestCatalog(std::make_shared<RestCatalogConfig>(std::move(final_config)),
std::move(client));
};
default: {
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text));
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ErrorResponseFromJson(json));
return UnknownError("Error listing namespaces: {}", list_response.error.message);
}
}
}

RestCatalog::RestCatalog(std::shared_ptr<RestCatalogConfig> config,
std::unique_ptr<HttpClient> client)
: config_(std::move(config)), client_(std::move(client)) {}

std::string_view RestCatalog::name() const {
return config_->name.has_value() ? std::string_view(config_->name.value())
: std::string_view("");
}

Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns) const {
const std::string endpoint = config_->GetNamespacesEndpoint();
std::vector<Namespace> result;
std::string next_token;
while (true) {
cpr::Parameters params;
if (!ns.levels.empty()) {
params.Add({"parent", EncodeNamespaceForUrl(ns)});
}
if (!next_token.empty()) {
params.Add({"page_token", next_token});
}
ICEBERG_ASSIGN_OR_RAISE(const auto& response, client_->Get(endpoint, params));
switch (response.status_code) {
case cpr::status::HTTP_OK: {
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text));
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ListNamespacesResponseFromJson(json));
result.insert(result.end(), list_response.namespaces.begin(),
list_response.namespaces.end());
if (list_response.next_page_token.empty()) {
return result;
}
next_token = list_response.next_page_token;
continue;
}
case cpr::status::HTTP_NOT_FOUND: {
return NoSuchNamespace("Namespace not found");
}
default:
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text));
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ErrorResponseFromJson(json));
return UnknownError("Error listing namespaces: {}", list_response.error.message);
}
}
}

Status RestCatalog::CreateNamespace(
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
return NotImplemented("Not implemented");
}

Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespaceProperties(
const Namespace& ns) const {
return NotImplemented("Not implemented");
}

Status RestCatalog::DropNamespace(const Namespace& ns) {
return NotImplemented("Not implemented");
}

Result<bool> RestCatalog::NamespaceExists(const Namespace& ns) const {
return NotImplemented("Not implemented");
}

Status RestCatalog::UpdateNamespaceProperties(
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
const std::unordered_set<std::string>& removals) {
return NotImplemented("Not implemented");
}

Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns) const {
return NotImplemented("Not implemented");
}

Result<std::unique_ptr<Table>> RestCatalog::CreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
return NotImplemented("Not implemented");
}

Result<std::unique_ptr<Table>> RestCatalog::UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
return NotImplemented("Not implemented");
}

Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
return NotImplemented("Not implemented");
}

Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
return NotImplemented("Not implemented");
}

Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
return NotImplemented("Not implemented");
}

Result<std::unique_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& identifier) {
return NotImplemented("Not implemented");
}

Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
const TableIdentifier& identifier, const std::string& metadata_file_location) {
return NotImplemented("Not implemented");
}

std::unique_ptr<RestCatalog::TableBuilder> RestCatalog::BuildTable(
const TableIdentifier& identifier, const Schema& schema) const {
return nullptr;
}

} // namespace iceberg::rest
165 changes: 165 additions & 0 deletions src/iceberg/catalog/rest/catalog.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <memory>
#include <string>

#include <cpr/cpr.h>

#include "iceberg/catalog.h"
#include "iceberg/catalog/rest/config.h"
#include "iceberg/catalog/rest/http_client_interal.h"
#include "iceberg/catalog/rest/iceberg_rest_export.h"
#include "iceberg/catalog/rest/types.h"
#include "iceberg/result.h"

/// \file iceberg/catalog/rest/catalog.h
/// RestCatalog implementation for Iceberg REST API.

namespace iceberg::rest {

class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
public:
RestCatalog(const RestCatalog&) = delete;
RestCatalog& operator=(const RestCatalog&) = delete;
RestCatalog(RestCatalog&&) = default;
RestCatalog& operator=(RestCatalog&&) = default;

/// \brief Create a RestCatalog instance
///
/// \param config the configuration for the RestCatalog
/// \return a RestCatalog instance
static Result<RestCatalog> Make(RestCatalogConfig config);

/// \brief Return the name for this catalog
std::string_view name() const override;

/// \brief List child namespaces from the given namespace.
Result<std::vector<Namespace>> ListNamespaces(const Namespace& ns) const override;

/// \brief Create a namespace with associated properties.
Status CreateNamespace(
const Namespace& ns,
const std::unordered_map<std::string, std::string>& properties) override;

/// \brief Get metadata properties for a namespace.
Result<std::unordered_map<std::string, std::string>> GetNamespaceProperties(
const Namespace& ns) const override;

/// \brief Drop a namespace.
Status DropNamespace(const Namespace& ns) override;

/// \brief Check whether the namespace exists.
Result<bool> NamespaceExists(const Namespace& ns) const override;

/// \brief Update a namespace's properties by applying additions and removals.
Status UpdateNamespaceProperties(
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
const std::unordered_set<std::string>& removals) override;

/// \brief Return all the identifiers under this namespace
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const override;

/// \brief Create a table
Result<std::unique_ptr<Table>> CreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) override;

/// \brief Update a table
///
/// \param identifier a table identifier
/// \param requirements a list of table requirements
/// \param updates a list of table updates
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
Result<std::unique_ptr<Table>> UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;

/// \brief Start a transaction to create a table
///
/// \param identifier a table identifier
/// \param schema a schema
/// \param spec a partition spec
/// \param location a location for the table; leave empty if unspecified
/// \param properties a string map of table properties
/// \return a Transaction to create the table or ErrorKind::kAlreadyExists if the
/// table already exists
Result<std::shared_ptr<Transaction>> StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) override;

/// \brief Check whether table exists
///
/// \param identifier a table identifier
/// \return Result<bool> indicating table exists or not.
/// - On success, the table existence was successfully checked (actual
/// existence may be inferred elsewhere).
/// - On failure, contains error information.
Result<bool> TableExists(const TableIdentifier& identifier) const override;

/// \brief Drop a table; optionally delete data and metadata files
///
/// If purge is set to true the implementation should delete all data and metadata
/// files.
///
/// \param identifier a table identifier
/// \param purge if true, delete all data and metadata files in the table
/// \return Status indicating the outcome of the operation.
/// - On success, the table was dropped (or did not exist).
/// - On failure, contains error information.
Status DropTable(const TableIdentifier& identifier, bool purge) override;

/// \brief Load a table
///
/// \param identifier a table identifier
/// \return instance of Table implementation referred to by identifier or
/// ErrorKind::kNoSuchTable if the table does not exist
Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier& identifier) override;

/// \brief Register a table with the catalog if it does not exist
///
/// \param identifier a table identifier
/// \param metadata_file_location the location of a metadata file
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
Result<std::shared_ptr<Table>> RegisterTable(
const TableIdentifier& identifier,
const std::string& metadata_file_location) override;

/// \brief A builder used to create valid tables or start create/replace transactions
///
/// \param identifier a table identifier
/// \param schema a schema
/// \return the builder to create a table or start a create/replace transaction
std::unique_ptr<RestCatalog::TableBuilder> BuildTable(
const TableIdentifier& identifier, const Schema& schema) const override;

private:
RestCatalog(std::shared_ptr<RestCatalogConfig> config,
std::unique_ptr<HttpClient> client);

std::shared_ptr<RestCatalogConfig> config_;
std::unique_ptr<HttpClient> client_;
};

} // namespace iceberg::rest
Loading
Loading