Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions src/iceberg/catalog/rest/http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "iceberg/catalog/rest/constant.h"
#include "iceberg/catalog/rest/error_handlers.h"
#include "iceberg/catalog/rest/json_internal.h"
#include "iceberg/catalog/rest/rest_util.h"
#include "iceberg/json_internal.h"
#include "iceberg/result.h"
#include "iceberg/util/macros.h"
Expand Down Expand Up @@ -63,6 +64,9 @@ std::unordered_map<std::string, std::string> HttpResponse::headers() const {

namespace {

/// \brief Default error type for unparseable REST responses.
constexpr std::string_view kRestExceptionType = "RESTException";

/// \brief Merges global default headers with request-specific headers.
///
/// Combines the global headers derived from RestCatalogProperties with the headers
Expand Down Expand Up @@ -96,16 +100,36 @@ bool IsSuccessful(int32_t status_code) {
|| status_code == 304; // Not Modified
}

/// \brief Builds a default ErrorResponse when the response body cannot be parsed.
ErrorResponse BuildDefaultErrorResponse(const cpr::Response& response) {
return {
.code = static_cast<uint32_t>(response.status_code),
.type = std::string(kRestExceptionType),
.message = !response.reason.empty() ? response.reason
: GetStandardReasonPhrase(response.status_code),
};
}

/// \brief Tries to parse the response body as an ErrorResponse.
Result<ErrorResponse> TryParseErrorResponse(const std::string& text) {
if (text.empty()) {
return InvalidArgument("Empty response body");
}
ICEBERG_ASSIGN_OR_RAISE(auto json_result, FromJsonString(text));
ICEBERG_ASSIGN_OR_RAISE(auto error_result, ErrorResponseFromJson(json_result));
return error_result;
}

/// \brief Handles failure responses by invoking the provided error handler.
Status HandleFailureResponse(const cpr::Response& response,
const ErrorHandler& error_handler) {
if (!IsSuccessful(response.status_code)) {
// TODO(gangwu): response status code is lost, wrap it with RestError.
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text));
ICEBERG_ASSIGN_OR_RAISE(auto error_response, ErrorResponseFromJson(json));
return error_handler.Accept(error_response);
if (IsSuccessful(response.status_code)) {
return {};
}
return {};
auto parse_result = TryParseErrorResponse(response.text);
const ErrorResponse final_error =
parse_result.value_or(BuildDefaultErrorResponse(response));
return error_handler.Accept(final_error);
}

} // namespace
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/catalog/rest/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ Result<LoadTableResult> LoadTableResultFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(result.metadata, TableMetadataFromJson(metadata_json));
ICEBERG_ASSIGN_OR_RAISE(result.config,
GetJsonValueOrDefault<decltype(result.config)>(json, kConfig));
ICEBERG_RETURN_UNEXPECTED(result.Validate());
return result;
}

Expand Down Expand Up @@ -257,6 +258,7 @@ Result<CreateNamespaceResponse> CreateNamespaceResponseFromJson(
ICEBERG_ASSIGN_OR_RAISE(
response.properties,
GetJsonValueOrDefault<decltype(response.properties)>(json, kProperties));
ICEBERG_RETURN_UNEXPECTED(response.Validate());
return response;
}

Expand All @@ -274,6 +276,7 @@ Result<GetNamespaceResponse> GetNamespaceResponseFromJson(const nlohmann::json&
ICEBERG_ASSIGN_OR_RAISE(
response.properties,
GetJsonValueOrDefault<decltype(response.properties)>(json, kProperties));
ICEBERG_RETURN_UNEXPECTED(response.Validate());
return response;
}

Expand Down
74 changes: 58 additions & 16 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
#include "iceberg/catalog/rest/rest_catalog.h"
#include "iceberg/catalog/rest/rest_util.h"
#include "iceberg/json_internal.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/table.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -99,7 +101,7 @@ Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns)
if (!next_token.empty()) {
params[kQueryParamPageToken] = next_token;
}
ICEBERG_ASSIGN_OR_RAISE(const auto& response,
ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Get(endpoint, params, /*headers=*/{},
*NamespaceErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
Expand All @@ -115,29 +117,69 @@ Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns)
}

Status RestCatalog::CreateNamespace(
[[maybe_unused]] const Namespace& ns,
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
return NotImplemented("Not implemented");
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
ICEBERG_ASSIGN_OR_RAISE(auto endpoint, paths_->Namespaces());
CreateNamespaceRequest request{.namespace_ = ns, .properties = properties};
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Post(endpoint, json_request, /*headers=*/{},
*NamespaceErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto create_response, CreateNamespaceResponseFromJson(json));
return {};
}

Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespaceProperties(
[[maybe_unused]] const Namespace& ns) const {
return NotImplemented("Not implemented");
}

Status RestCatalog::DropNamespace([[maybe_unused]] const Namespace& ns) {
return NotImplemented("Not implemented");
const Namespace& ns) const {
ICEBERG_ASSIGN_OR_RAISE(auto endpoint, paths_->Namespace_(ns));
ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Get(endpoint, /*params=*/{}, /*headers=*/{},
*NamespaceErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto get_response, GetNamespaceResponseFromJson(json));
return get_response.properties;
}

Result<bool> RestCatalog::NamespaceExists([[maybe_unused]] const Namespace& ns) const {
return NotImplemented("Not implemented");
Status RestCatalog::DropNamespace(const Namespace& ns) {
ICEBERG_ASSIGN_OR_RAISE(auto endpoint, paths_->Namespace_(ns));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Delete(endpoint, /*headers=*/{}, *DropNamespaceErrorHandler::Instance()));
return {};
}

Result<bool> RestCatalog::NamespaceExists(const Namespace& ns) const {
ICEBERG_ASSIGN_OR_RAISE(auto endpoint, paths_->Namespace_(ns));
// TODO(Feiyang Li): checks if the server supports the namespace exists endpoint, if
// not, triggers a fallback mechanism
auto response_or_error =
client_->Head(endpoint, /*headers=*/{}, *NamespaceErrorHandler::Instance());
if (!response_or_error.has_value()) {
const auto& error = response_or_error.error();
// catch NoSuchNamespaceException/404 and return false
if (error.kind == ErrorKind::kNoSuchNamespace) {
return false;
}
ICEBERG_RETURN_UNEXPECTED(response_or_error);
}
return true;
}

Status RestCatalog::UpdateNamespaceProperties(
[[maybe_unused]] const Namespace& ns,
[[maybe_unused]] const std::unordered_map<std::string, std::string>& updates,
[[maybe_unused]] const std::unordered_set<std::string>& removals) {
return NotImplemented("Not implemented");
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
const std::unordered_set<std::string>& removals) {
ICEBERG_ASSIGN_OR_RAISE(auto endpoint, paths_->NamespaceProperties(ns));
UpdateNamespacePropertiesRequest request{
.removals = std::vector<std::string>(removals.begin(), removals.end()),
.updates = updates};
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Post(endpoint, json_request, /*headers=*/{},
*NamespaceErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto update_response,
UpdateNamespacePropertiesResponseFromJson(json));
return {};
}

Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
Expand Down
131 changes: 131 additions & 0 deletions src/iceberg/catalog/rest/rest_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include "iceberg/catalog/rest/rest_util.h"

#include <format>

#include <cpr/util.h>

#include "iceberg/table_identifier.h"
Expand Down Expand Up @@ -120,4 +122,133 @@ std::unordered_map<std::string, std::string> MergeConfigs(
return merged;
}

std::string GetStandardReasonPhrase(int32_t status_code) {
switch (status_code) {
case 100:
return "Continue";
case 101:
return "Switching Protocols";
case 102:
return "Processing";
case 103:
return "Early Hints";
case 200:
return "OK";
case 201:
return "Created";
case 202:
return "Accepted";
case 203:
return "Non Authoritative Information";
case 204:
return "No Content";
case 205:
return "Reset Content";
case 206:
return "Partial Content";
case 207:
return "Multi-Status";
case 208:
return "Already Reported";
case 226:
return "IM Used";
case 300:
return "Multiple Choices";
case 301:
return "Moved Permanently";
case 302:
return "Moved Temporarily";
case 303:
return "See Other";
case 304:
return "Not Modified";
case 305:
return "Use Proxy";
case 307:
return "Temporary Redirect";
case 308:
return "Permanent Redirect";
case 400:
return "Bad Request";
case 401:
return "Unauthorized";
case 402:
return "Payment Required";
case 403:
return "Forbidden";
case 404:
return "Not Found";
case 405:
return "Method Not Allowed";
case 406:
return "Not Acceptable";
case 407:
return "Proxy Authentication Required";
case 408:
return "Request Timeout";
case 409:
return "Conflict";
case 410:
return "Gone";
case 411:
return "Length Required";
case 412:
return "Precondition Failed";
case 413:
return "Request Too Long";
case 414:
return "Request-URI Too Long";
case 415:
return "Unsupported Media Type";
case 416:
return "Requested Range Not Satisfiable";
case 417:
return "Expectation Failed";
case 421:
return "Misdirected Request";
case 422:
return "Unprocessable Content";
case 423:
return "Locked";
case 424:
return "Failed Dependency";
case 425:
return "Too Early";
case 426:
return "Upgrade Required";
case 428:
return "Precondition Required";
case 429:
return "Too Many Requests";
case 431:
return "Request Header Fields Too Large";
case 451:
return "Unavailable For Legal Reasons";
case 500:
return "Internal Server Error";
case 501:
return "Not Implemented";
case 502:
return "Bad Gateway";
case 503:
return "Service Unavailable";
case 504:
return "Gateway Timeout";
case 505:
return "Http Version Not Supported";
case 506:
return "Variant Also Negotiates";
case 507:
return "Insufficient Storage";
case 508:
return "Loop Detected";
case 510:
return "Not Extended";
case 511:
return "Network Authentication Required";
default:
return std::format("HTTP {}", status_code);
}
}

} // namespace iceberg::rest
9 changes: 9 additions & 0 deletions src/iceberg/catalog/rest/rest_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,13 @@ ICEBERG_REST_EXPORT std::unordered_map<std::string, std::string> MergeConfigs(
const std::unordered_map<std::string, std::string>& client_configs,
const std::unordered_map<std::string, std::string>& server_overrides);

/// \brief Get the standard HTTP reason phrase for a status code.
///
/// \details Returns the standard English reason phrase for common HTTP status codes.
/// For unknown status codes, returns a generic "HTTP {code}" message.
/// \param status_code The HTTP status code (e.g., 200, 404, 500).
/// \return The standard reason phrase string (e.g., "OK", "Not Found", "Internal Server
/// Error").
ICEBERG_REST_EXPORT std::string GetStandardReasonPhrase(int32_t status_code);

} // namespace iceberg::rest
4 changes: 2 additions & 2 deletions src/iceberg/catalog/rest/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ struct ICEBERG_REST_EXPORT CatalogConfig {

/// \brief JSON error payload returned in a response with further details on the error.
struct ICEBERG_REST_EXPORT ErrorResponse {
std::string message; // required
std::string type; // required
uint32_t code; // required
std::string type; // required
std::string message; // required
std::vector<std::string> stack;

/// \brief Validates the ErrorResponse.
Expand Down
Loading
Loading