Skip to content

Commit a2fa579

Browse files
committed
feat(rest): implement create table
1 parent 25daf33 commit a2fa579

File tree

12 files changed

+308
-41
lines changed

12 files changed

+308
-41
lines changed

src/iceberg/catalog.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <vector>
2828

2929
#include "iceberg/result.h"
30+
#include "iceberg/sort_order.h"
3031
#include "iceberg/table_identifier.h"
3132
#include "iceberg/type_fwd.h"
3233

@@ -107,12 +108,13 @@ class ICEBERG_EXPORT Catalog {
107108
/// \param identifier a table identifier
108109
/// \param schema a schema
109110
/// \param spec a partition spec
111+
/// \param order a sort order
110112
/// \param location a location for the table; leave empty if unspecified
111113
/// \param properties a string map of table properties
112114
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
113115
virtual Result<std::unique_ptr<Table>> CreateTable(
114116
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
115-
const std::string& location,
117+
const SortOrder& order, const std::string& location,
116118
const std::unordered_map<std::string, std::string>& properties) = 0;
117119

118120
/// \brief Update a table
@@ -131,13 +133,14 @@ class ICEBERG_EXPORT Catalog {
131133
/// \param identifier a table identifier
132134
/// \param schema a schema
133135
/// \param spec a partition spec
136+
/// \param order a sort order
134137
/// \param location a location for the table; leave empty if unspecified
135138
/// \param properties a string map of table properties
136139
/// \return a Transaction to create the table or ErrorKind::kAlreadyExists if the
137140
/// table already exists
138141
virtual Result<std::shared_ptr<Transaction>> StageCreateTable(
139142
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
140-
const std::string& location,
143+
const SortOrder& order, const std::string& location,
141144
const std::unordered_map<std::string, std::string>& properties) = 0;
142145

143146
/// \brief Check whether table exists

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
401401

402402
Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
403403
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
404-
const std::string& location,
404+
const SortOrder& order, const std::string& location,
405405
const std::unordered_map<std::string, std::string>& properties) {
406406
std::unique_lock lock(mutex_);
407407
return NotImplemented("create table");
@@ -441,7 +441,7 @@ Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
441441

442442
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
443443
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
444-
const std::string& location,
444+
const SortOrder& order, const std::string& location,
445445
const std::unordered_map<std::string, std::string>& properties) {
446446
std::unique_lock lock(mutex_);
447447
return NotImplemented("stage create table");

src/iceberg/catalog/memory/in_memory_catalog.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ class ICEBERG_EXPORT InMemoryCatalog
7272

7373
Result<std::unique_ptr<Table>> CreateTable(
7474
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
75-
const std::string& location,
75+
const SortOrder& order, const std::string& location,
7676
const std::unordered_map<std::string, std::string>& properties) override;
7777

7878
Result<std::unique_ptr<Table>> UpdateTable(
@@ -82,7 +82,7 @@ class ICEBERG_EXPORT InMemoryCatalog
8282

8383
Result<std::shared_ptr<Transaction>> StageCreateTable(
8484
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
85-
const std::string& location,
85+
const SortOrder& order, const std::string& location,
8686
const std::unordered_map<std::string, std::string>& properties) override;
8787

8888
Result<bool> TableExists(const TableIdentifier& identifier) const override;

src/iceberg/catalog/rest/json_internal.cc

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "iceberg/catalog/rest/json_internal.h"
2121

22+
#include <memory>
2223
#include <string>
2324
#include <utility>
2425
#include <vector>
@@ -27,6 +28,8 @@
2728

2829
#include "iceberg/catalog/rest/types.h"
2930
#include "iceberg/json_internal.h"
31+
#include "iceberg/partition_spec.h"
32+
#include "iceberg/sort_order.h"
3033
#include "iceberg/table_identifier.h"
3134
#include "iceberg/util/json_util_internal.h"
3235
#include "iceberg/util/macros.h"
@@ -336,6 +339,57 @@ Result<ListTablesResponse> ListTablesResponseFromJson(const nlohmann::json& json
336339
return response;
337340
}
338341

342+
nlohmann::json ToJson(const CreateTableRequest& request) {
343+
nlohmann::json json;
344+
json[kName] = request.name;
345+
SetOptionalStringField(json, kLocation, request.location);
346+
if (request.schema) {
347+
json[kSchema] = ToJson(*request.schema);
348+
}
349+
if (request.partition_spec) {
350+
json[kPartitionSpec] = ToJson(*request.partition_spec);
351+
}
352+
if (request.write_order) {
353+
json[kWriteOrder] = ToJson(*request.write_order);
354+
}
355+
if (request.stage_create) {
356+
json[kStageCreate] = request.stage_create;
357+
}
358+
SetContainerField(json, kProperties, request.properties);
359+
return json;
360+
}
361+
362+
Result<CreateTableRequest> CreateTableRequestFromJson(const nlohmann::json& json) {
363+
CreateTableRequest request;
364+
ICEBERG_ASSIGN_OR_RAISE(request.name, GetJsonValue<std::string>(json, kName));
365+
ICEBERG_ASSIGN_OR_RAISE(request.location,
366+
GetJsonValueOrDefault<std::string>(json, kLocation));
367+
ICEBERG_ASSIGN_OR_RAISE(auto schema_json, GetJsonValue<nlohmann::json>(json, kSchema));
368+
ICEBERG_ASSIGN_OR_RAISE(request.schema, SchemaFromJson(schema_json));
369+
370+
if (auto spec_json_result = GetJsonValue<nlohmann::json>(json, kPartitionSpec);
371+
spec_json_result.has_value()) {
372+
ICEBERG_ASSIGN_OR_RAISE(
373+
request.partition_spec,
374+
PartitionSpecFromJson(request.schema, spec_json_result.value(),
375+
PartitionSpec::kInitialSpecId));
376+
}
377+
378+
if (auto order_json_result = GetJsonValue<nlohmann::json>(json, kWriteOrder);
379+
order_json_result.has_value()) {
380+
ICEBERG_ASSIGN_OR_RAISE(request.write_order,
381+
SortOrderFromJson(order_json_result.value(), request.schema));
382+
}
383+
384+
ICEBERG_ASSIGN_OR_RAISE(request.stage_create,
385+
GetJsonValueOrDefault<bool>(json, kStageCreate, false));
386+
ICEBERG_ASSIGN_OR_RAISE(
387+
request.properties,
388+
GetJsonValueOrDefault<decltype(request.properties)>(json, kProperties));
389+
ICEBERG_RETURN_UNEXPECTED(request.Validate());
390+
return request;
391+
}
392+
339393
#define ICEBERG_DEFINE_FROM_JSON(Model) \
340394
template <> \
341395
Result<Model> FromJson<Model>(const nlohmann::json& json) { \
@@ -354,5 +408,6 @@ ICEBERG_DEFINE_FROM_JSON(ListTablesResponse)
354408
ICEBERG_DEFINE_FROM_JSON(LoadTableResult)
355409
ICEBERG_DEFINE_FROM_JSON(RegisterTableRequest)
356410
ICEBERG_DEFINE_FROM_JSON(RenameTableRequest)
411+
ICEBERG_DEFINE_FROM_JSON(CreateTableRequest)
357412

358413
} // namespace iceberg::rest

src/iceberg/catalog/rest/json_internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ ICEBERG_DECLARE_JSON_SERDE(ListTablesResponse)
5555
ICEBERG_DECLARE_JSON_SERDE(LoadTableResult)
5656
ICEBERG_DECLARE_JSON_SERDE(RegisterTableRequest)
5757
ICEBERG_DECLARE_JSON_SERDE(RenameTableRequest)
58+
ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
5859

5960
#undef ICEBERG_DECLARE_JSON_SERDE
6061

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ Result<CatalogConfig> FetchServerConfig(const ResourcePaths& paths,
7777

7878
RestCatalog::~RestCatalog() = default;
7979

80-
Result<std::unique_ptr<RestCatalog>> RestCatalog::Make(
81-
const RestCatalogProperties& config) {
80+
Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
81+
const RestCatalogProperties& config, std::shared_ptr<FileIO> file_io) {
8282
ICEBERG_ASSIGN_OR_RAISE(auto uri, config.Uri());
8383
ICEBERG_ASSIGN_OR_RAISE(
8484
auto paths, ResourcePaths::Make(std::string(TrimTrailingSlash(uri)),
@@ -103,18 +103,21 @@ Result<std::unique_ptr<RestCatalog>> RestCatalog::Make(
103103
ICEBERG_ASSIGN_OR_RAISE(auto final_uri, final_config->Uri());
104104
ICEBERG_RETURN_UNEXPECTED(paths->SetBaseUri(std::string(TrimTrailingSlash(final_uri))));
105105

106-
return std::unique_ptr<RestCatalog>(
107-
new RestCatalog(std::move(final_config), std::move(paths), std::move(endpoints)));
106+
return std::shared_ptr<RestCatalog>(
107+
new RestCatalog(std::move(final_config), std::move(paths), std::move(endpoints),
108+
std::move(file_io)));
108109
}
109110

110111
RestCatalog::RestCatalog(std::unique_ptr<RestCatalogProperties> config,
111112
std::unique_ptr<ResourcePaths> paths,
112-
std::unordered_set<Endpoint> endpoints)
113+
std::unordered_set<Endpoint> endpoints,
114+
std::shared_ptr<FileIO> file_io)
113115
: config_(std::move(config)),
114116
client_(std::make_unique<HttpClient>(config_->ExtractHeaders())),
115117
paths_(std::move(paths)),
116118
name_(config_->Get(RestCatalogProperties::kName)),
117-
supported_endpoints_(std::move(endpoints)) {}
119+
supported_endpoints_(std::move(endpoints)),
120+
file_io_(std::move(file_io)) {}
118121

119122
std::string_view RestCatalog::name() const { return name_; }
120123

@@ -241,11 +244,50 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
241244
}
242245

243246
Result<std::unique_ptr<Table>> RestCatalog::CreateTable(
244-
[[maybe_unused]] const TableIdentifier& identifier,
245-
[[maybe_unused]] const Schema& schema, [[maybe_unused]] const PartitionSpec& spec,
246-
[[maybe_unused]] const std::string& location,
247-
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
248-
return NotImplemented("Not implemented");
247+
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
248+
const SortOrder& order, const std::string& location,
249+
const std::unordered_map<std::string, std::string>& properties) {
250+
ICEBERG_RETURN_UNEXPECTED(CheckEndpoint(supported_endpoints_, Endpoint::CreateTable()));
251+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns));
252+
253+
// Schema, partition spec, and sort order are not copyable due to Lazy<> members,
254+
// recreate them
255+
auto new_schema = std::make_shared<Schema>(
256+
std::vector<SchemaField>(schema.fields().begin(), schema.fields().end()),
257+
schema.schema_id());
258+
259+
ICEBERG_ASSIGN_OR_RAISE(
260+
std::shared_ptr<PartitionSpec> partition_spec,
261+
PartitionSpec::Make(
262+
spec.spec_id(),
263+
std::vector<PartitionField>(spec.fields().begin(), spec.fields().end()),
264+
spec.last_assigned_field_id()));
265+
266+
ICEBERG_ASSIGN_OR_RAISE(
267+
std::shared_ptr<SortOrder> sort_order,
268+
SortOrder::Make(order.order_id(), std::vector<SortField>(order.fields().begin(),
269+
order.fields().end())));
270+
271+
CreateTableRequest request{
272+
.name = identifier.name,
273+
.location = location,
274+
.schema = new_schema,
275+
.partition_spec = partition_spec,
276+
.write_order = sort_order,
277+
.stage_create = false,
278+
.properties = properties,
279+
};
280+
281+
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
282+
ICEBERG_ASSIGN_OR_RAISE(
283+
const auto response,
284+
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
285+
286+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
287+
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
288+
return std::make_unique<Table>(identifier, std::move(load_result.metadata),
289+
std::move(load_result.metadata_location), file_io_,
290+
std::static_pointer_cast<Catalog>(shared_from_this()));
249291
}
250292

251293
Result<std::unique_ptr<Table>> RestCatalog::UpdateTable(
@@ -258,7 +300,7 @@ Result<std::unique_ptr<Table>> RestCatalog::UpdateTable(
258300
Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
259301
[[maybe_unused]] const TableIdentifier& identifier,
260302
[[maybe_unused]] const Schema& schema, [[maybe_unused]] const PartitionSpec& spec,
261-
[[maybe_unused]] const std::string& location,
303+
[[maybe_unused]] const SortOrder& order, [[maybe_unused]] const std::string& location,
262304
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
263305
return NotImplemented("Not implemented");
264306
}

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
#pragma once
2121

2222
#include <memory>
23-
#include <set>
2423
#include <string>
24+
#include <unordered_set>
2525

2626
#include "iceberg/catalog.h"
2727
#include "iceberg/catalog/rest/endpoint.h"
@@ -35,7 +35,8 @@
3535
namespace iceberg::rest {
3636

3737
/// \brief Rest catalog implementation.
38-
class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
38+
class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
39+
public std::enable_shared_from_this<RestCatalog> {
3940
public:
4041
~RestCatalog() override;
4142

@@ -47,8 +48,10 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
4748
/// \brief Create a RestCatalog instance
4849
///
4950
/// \param config the configuration for the RestCatalog
50-
/// \return a unique_ptr to RestCatalog instance
51-
static Result<std::unique_ptr<RestCatalog>> Make(const RestCatalogProperties& config);
51+
/// \param file_io the FileIO instance to use for table operations
52+
/// \return a shared_ptr to RestCatalog instance
53+
static Result<std::shared_ptr<RestCatalog>> Make(const RestCatalogProperties& config,
54+
std::shared_ptr<FileIO> file_io);
5255

5356
std::string_view name() const override;
5457

@@ -73,7 +76,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
7376

7477
Result<std::unique_ptr<Table>> CreateTable(
7578
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
76-
const std::string& location,
79+
const SortOrder& order, const std::string& location,
7780
const std::unordered_map<std::string, std::string>& properties) override;
7881

7982
Result<std::unique_ptr<Table>> UpdateTable(
@@ -83,7 +86,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
8386

8487
Result<std::shared_ptr<Transaction>> StageCreateTable(
8588
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
86-
const std::string& location,
89+
const SortOrder& order, const std::string& location,
8790
const std::unordered_map<std::string, std::string>& properties) override;
8891

8992
Result<bool> TableExists(const TableIdentifier& identifier) const override;
@@ -101,13 +104,14 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
101104
private:
102105
RestCatalog(std::unique_ptr<RestCatalogProperties> config,
103106
std::unique_ptr<ResourcePaths> paths,
104-
std::unordered_set<Endpoint> endpoints);
107+
std::unordered_set<Endpoint> endpoints, std::shared_ptr<FileIO> file_io);
105108

106109
std::unique_ptr<RestCatalogProperties> config_;
107110
std::unique_ptr<HttpClient> client_;
108111
std::unique_ptr<ResourcePaths> paths_;
109112
std::string name_;
110113
std::unordered_set<Endpoint> supported_endpoints_;
114+
std::shared_ptr<FileIO> file_io_;
111115
};
112116

113117
} // namespace iceberg::rest

src/iceberg/catalog/rest/types.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "iceberg/catalog/rest/endpoint.h"
2828
#include "iceberg/catalog/rest/iceberg_rest_export.h"
2929
#include "iceberg/result.h"
30+
#include "iceberg/schema.h"
3031
#include "iceberg/table_identifier.h"
3132
#include "iceberg/type_fwd.h"
3233
#include "iceberg/util/macros.h"
@@ -126,6 +127,28 @@ struct ICEBERG_REST_EXPORT RenameTableRequest {
126127
}
127128
};
128129

130+
/// \brief Request to create a table.
131+
struct ICEBERG_REST_EXPORT CreateTableRequest {
132+
std::string name; // required
133+
std::string location;
134+
std::shared_ptr<Schema> schema; // required
135+
std::shared_ptr<PartitionSpec> partition_spec;
136+
std::shared_ptr<SortOrder> write_order;
137+
bool stage_create = false;
138+
std::unordered_map<std::string, std::string> properties;
139+
140+
/// \brief Validates the CreateTableRequest.
141+
Status Validate() const {
142+
if (name.empty()) {
143+
return Invalid("Missing table name");
144+
}
145+
if (!schema) {
146+
return Invalid("Missing schema");
147+
}
148+
return {};
149+
}
150+
};
151+
129152
/// \brief An opaque token that allows clients to make use of pagination for list APIs.
130153
using PageToken = std::string;
131154

src/iceberg/json_internal.cc

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -220,18 +220,6 @@ Result<std::unique_ptr<SortOrder>> SortOrderFromJson(
220220
return SortOrder::Make(*current_schema, order_id, std::move(sort_fields));
221221
}
222222

223-
Result<std::unique_ptr<SortOrder>> SortOrderFromJson(const nlohmann::json& json) {
224-
ICEBERG_ASSIGN_OR_RAISE(auto order_id, GetJsonValue<int32_t>(json, kOrderId));
225-
ICEBERG_ASSIGN_OR_RAISE(auto fields, GetJsonValue<nlohmann::json>(json, kFields));
226-
227-
std::vector<SortField> sort_fields;
228-
for (const auto& field_json : fields) {
229-
ICEBERG_ASSIGN_OR_RAISE(auto sort_field, SortFieldFromJson(field_json));
230-
sort_fields.push_back(std::move(*sort_field));
231-
}
232-
return SortOrder::Make(order_id, std::move(sort_fields));
233-
}
234-
235223
nlohmann::json ToJson(const SchemaField& field) {
236224
nlohmann::json json;
237225
json[kId] = field.field_id();

src/iceberg/table.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ std::unique_ptr<Transaction> Table::NewTransaction() const {
120120

121121
const std::shared_ptr<FileIO>& Table::io() const { return io_; }
122122

123+
const std::shared_ptr<TableMetadata>& Table::metadata() const { return metadata_; }
124+
123125
std::unique_ptr<TableScanBuilder> Table::NewScan() const {
124126
return std::make_unique<TableScanBuilder>(metadata_, io_);
125127
}

0 commit comments

Comments
 (0)