Skip to content

Commit 4efd9d3

Browse files
committed
polish impls
1 parent f1685be commit 4efd9d3

17 files changed

+360
-279
lines changed

src/iceberg/catalog/rest/catalog_properties.cc

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,7 @@ std::unique_ptr<RestCatalogProperties> RestCatalogProperties::FromMap(
3737

3838
std::unordered_map<std::string, std::string> RestCatalogProperties::ExtractHeaders()
3939
const {
40-
std::unordered_map<std::string, std::string> headers;
41-
constexpr std::string_view prefix = "header.";
42-
for (const auto& [key, value] : configs_) {
43-
if (key.starts_with(prefix)) {
44-
std::string header_name = key.substr(prefix.length());
45-
if (header_name.empty() || value.empty()) {
46-
continue;
47-
}
48-
headers[header_name] = value;
49-
}
50-
}
51-
return headers;
40+
return Extract(kHeaderPrefix);
5241
}
5342

5443
Result<std::string_view> RestCatalogProperties::Uri() const {

src/iceberg/catalog/rest/catalog_properties.h

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,14 @@ class ICEBERG_REST_EXPORT RestCatalogProperties
4141

4242
/// \brief The URI of the REST catalog server.
4343
inline static Entry<std::string> kUri{"uri", ""};
44-
4544
/// \brief The name of the catalog.
4645
inline static Entry<std::string> kName{"name", ""};
47-
4846
/// \brief The warehouse path.
4947
inline static Entry<std::string> kWarehouse{"warehouse", ""};
50-
5148
/// \brief The optional prefix for REST API paths.
5249
inline static Entry<std::string> kPrefix{"prefix", ""};
50+
/// \brief The prefix for HTTP headers.
51+
inline static constexpr std::string_view kHeaderPrefix = "header.";
5352

5453
/// \brief Create a default RestCatalogProperties instance.
5554
static std::unique_ptr<RestCatalogProperties> default_properties();
@@ -59,9 +58,6 @@ class ICEBERG_REST_EXPORT RestCatalogProperties
5958
const std::unordered_map<std::string, std::string>& properties);
6059

6160
/// \brief Returns HTTP headers to be added to every request.
62-
///
63-
/// This includes any key prefixed with "header." in the properties.
64-
/// \return A map of headers with the prefix removed from the keys.
6561
std::unordered_map<std::string, std::string> ExtractHeaders() const;
6662

6763
/// \brief Get the URI of the REST catalog server.

src/iceberg/catalog/rest/error_handlers.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
#include <string_view>
2323

24+
#include "iceberg/catalog/rest/types.h"
25+
2426
namespace iceberg::rest {
2527

2628
namespace {
@@ -58,7 +60,7 @@ Status DefaultErrorHandler::Accept(const ErrorModel& error) const {
5860
return ServiceUnavailable("Service unavailable: {}", error.message);
5961
}
6062

61-
return RestError("Unable to process: {}", error.message);
63+
return RestError("Code: {}, message: {}", error.code, error.message);
6264
}
6365

6466
const std::shared_ptr<NamespaceErrorHandler>& NamespaceErrorHandler::Instance() {

src/iceberg/catalog/rest/error_handlers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
#include <memory>
2323

2424
#include "iceberg/catalog/rest/iceberg_rest_export.h"
25-
#include "iceberg/catalog/rest/types.h"
25+
#include "iceberg/catalog/rest/type_fwd.h"
2626
#include "iceberg/result.h"
2727

2828
/// \file iceberg/catalog/rest/error_handlers.h

src/iceberg/catalog/rest/http_client.cc

Lines changed: 50 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
#include <nlohmann/json.hpp>
2424

2525
#include "iceberg/catalog/rest/constant.h"
26+
#include "iceberg/catalog/rest/error_handlers.h"
2627
#include "iceberg/catalog/rest/json_internal.h"
2728
#include "iceberg/json_internal.h"
29+
#include "iceberg/result.h"
2830
#include "iceberg/util/macros.h"
2931

3032
namespace iceberg::rest {
@@ -47,11 +49,8 @@ class HttpResponse::Impl {
4749
};
4850

4951
HttpResponse::HttpResponse(HttpResponse&&) noexcept = default;
50-
5152
HttpResponse& HttpResponse::operator=(HttpResponse&&) noexcept = default;
52-
5353
HttpResponse::HttpResponse() = default;
54-
5554
HttpResponse::~HttpResponse() = default;
5655

5756
int32_t HttpResponse::status_code() const { return impl_->status_code(); }
@@ -101,6 +100,7 @@ bool IsSuccessful(int32_t status_code) {
101100
Status HandleFailureResponse(const cpr::Response& response,
102101
const ErrorHandler& error_handler) {
103102
if (!IsSuccessful(response.status_code)) {
103+
// TODO(gangwu): response status code is lost, wrap it with RestError.
104104
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text));
105105
ICEBERG_ASSIGN_OR_RAISE(auto error_response, ErrorResponseFromJson(json));
106106
return error_handler.Accept(error_response.error);
@@ -137,30 +137,34 @@ Result<HttpResponse> HttpClient::Get(
137137
const std::string& path, const std::unordered_map<std::string, std::string>& params,
138138
const std::unordered_map<std::string, std::string>& headers,
139139
const ErrorHandler& error_handler) {
140-
std::scoped_lock<std::mutex> lock(session_mutex_);
140+
cpr::Response response;
141+
{
142+
std::scoped_lock<std::mutex> lock(session_mutex_);
143+
PrepareSession(path, headers, params);
144+
response = session_->Get();
145+
}
141146

142-
PrepareSession(path, headers, params);
143-
cpr::Response response = session_->Get();
144147
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
145-
auto impl = std::make_unique<HttpResponse::Impl>(std::move(response));
146148
HttpResponse http_response;
147-
http_response.impl_ = std::move(impl);
149+
http_response.impl_ = std::make_unique<HttpResponse::Impl>(std::move(response));
148150
return http_response;
149151
}
150152

151153
Result<HttpResponse> HttpClient::Post(
152154
const std::string& path, const std::string& body,
153155
const std::unordered_map<std::string, std::string>& headers,
154156
const ErrorHandler& error_handler) {
155-
std::scoped_lock<std::mutex> lock(session_mutex_);
157+
cpr::Response response;
158+
{
159+
std::scoped_lock<std::mutex> lock(session_mutex_);
160+
PrepareSession(path, headers);
161+
session_->SetBody(cpr::Body{body});
162+
response = session_->Post();
163+
}
156164

157-
PrepareSession(path, headers);
158-
session_->SetBody(cpr::Body{body});
159-
cpr::Response response = session_->Post();
160165
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
161-
auto impl = std::make_unique<HttpResponse::Impl>(std::move(response));
162166
HttpResponse http_response;
163-
http_response.impl_ = std::move(impl);
167+
http_response.impl_ = std::make_unique<HttpResponse::Impl>(std::move(response));
164168
return http_response;
165169
}
166170

@@ -169,52 +173,61 @@ Result<HttpResponse> HttpClient::PostForm(
169173
const std::unordered_map<std::string, std::string>& form_data,
170174
const std::unordered_map<std::string, std::string>& headers,
171175
const ErrorHandler& error_handler) {
172-
std::scoped_lock<std::mutex> lock(session_mutex_);
176+
cpr::Response response;
173177

174-
// Override default Content-Type (application/json) with form-urlencoded
175-
auto form_headers = headers;
176-
form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded;
178+
{
179+
std::scoped_lock<std::mutex> lock(session_mutex_);
177180

178-
PrepareSession(path, form_headers);
179-
std::vector<cpr::Pair> pair_list;
180-
pair_list.reserve(form_data.size());
181-
for (const auto& [key, val] : form_data) {
182-
pair_list.emplace_back(key, val);
181+
// Override default Content-Type (application/json) with form-urlencoded
182+
auto form_headers = headers;
183+
form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded;
184+
185+
PrepareSession(path, form_headers);
186+
std::vector<cpr::Pair> pair_list;
187+
pair_list.reserve(form_data.size());
188+
for (const auto& [key, val] : form_data) {
189+
pair_list.emplace_back(key, val);
190+
}
191+
session_->SetPayload(cpr::Payload(pair_list.begin(), pair_list.end()));
192+
193+
response = session_->Post();
183194
}
184-
session_->SetPayload(cpr::Payload(pair_list.begin(), pair_list.end()));
185-
cpr::Response response = session_->Post();
195+
186196
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
187-
auto impl = std::make_unique<HttpResponse::Impl>(std::move(response));
188197
HttpResponse http_response;
189-
http_response.impl_ = std::move(impl);
198+
http_response.impl_ = std::make_unique<HttpResponse::Impl>(std::move(response));
190199
return http_response;
191200
}
192201

193202
Result<HttpResponse> HttpClient::Head(
194203
const std::string& path, const std::unordered_map<std::string, std::string>& headers,
195204
const ErrorHandler& error_handler) {
196-
std::scoped_lock<std::mutex> lock(session_mutex_);
205+
cpr::Response response;
206+
{
207+
std::scoped_lock<std::mutex> lock(session_mutex_);
208+
PrepareSession(path, headers);
209+
response = session_->Head();
210+
}
197211

198-
PrepareSession(path, headers);
199-
cpr::Response response = session_->Head();
200212
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
201-
auto impl = std::make_unique<HttpResponse::Impl>(std::move(response));
202213
HttpResponse http_response;
203-
http_response.impl_ = std::move(impl);
214+
http_response.impl_ = std::make_unique<HttpResponse::Impl>(std::move(response));
204215
return http_response;
205216
}
206217

207218
Result<HttpResponse> HttpClient::Delete(
208219
const std::string& path, const std::unordered_map<std::string, std::string>& headers,
209220
const ErrorHandler& error_handler) {
210-
std::scoped_lock<std::mutex> lock(session_mutex_);
221+
cpr::Response response;
222+
{
223+
std::scoped_lock<std::mutex> lock(session_mutex_);
224+
PrepareSession(path, headers);
225+
response = session_->Delete();
226+
}
211227

212-
PrepareSession(path, headers);
213-
cpr::Response response = session_->Delete();
214228
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
215-
auto impl = std::make_unique<HttpResponse::Impl>(std::move(response));
216229
HttpResponse http_response;
217-
http_response.impl_ = std::move(impl);
230+
http_response.impl_ = std::make_unique<HttpResponse::Impl>(std::move(response));
218231
return http_response;
219232
}
220233

src/iceberg/catalog/rest/http_client.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
#include <string>
2626
#include <unordered_map>
2727

28-
#include "iceberg/catalog/rest/error_handlers.h"
2928
#include "iceberg/catalog/rest/iceberg_rest_export.h"
29+
#include "iceberg/catalog/rest/type_fwd.h"
3030
#include "iceberg/result.h"
3131

3232
/// \file iceberg/catalog/rest/http_client.h
@@ -114,12 +114,10 @@ class ICEBERG_REST_EXPORT HttpClient {
114114

115115
std::unordered_map<std::string, std::string> default_headers_;
116116

117-
// Mutex to protect the non-thread-safe cpr::Session.
118-
mutable std::mutex session_mutex_;
119-
120117
// TODO(Li Feiyang): use connection pool to support external multi-threaded concurrent
121118
// calls
122119
std::unique_ptr<cpr::Session> session_;
120+
mutable std::mutex session_mutex_;
123121
};
124122

125123
} // namespace iceberg::rest

src/iceberg/catalog/rest/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ install_headers(
6565
'resource_paths.h',
6666
'rest_catalog.h',
6767
'rest_util.h',
68+
'type_fwd.h',
6869
'types.h',
6970
],
7071
subdir: 'iceberg/catalog/rest',

0 commit comments

Comments
 (0)