Skip to content

Commit 53e15a9

Browse files
authored
feat: make schema id non-optional with default value (#449)
1 parent a89924d commit 53e15a9

17 files changed

+119
-140
lines changed

src/iceberg/expression/residual_evaluator.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,7 @@ class UnpartitionedResidualEvaluator : public ResidualEvaluator {
308308

309309
private:
310310
// Store an empty schema to avoid a dangling reference when passing it to the base class
311-
inline static const std::shared_ptr<Schema> kEmptySchema_ =
312-
std::make_shared<Schema>(std::vector<SchemaField>{}, std::nullopt);
311+
inline static const std::shared_ptr<Schema> kEmptySchema_ = Schema::EmptySchema();
313312
};
314313

315314
} // namespace

src/iceberg/json_internal.cc

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ nlohmann::json ToJson(const Type& type) {
308308

309309
nlohmann::json ToJson(const Schema& schema) {
310310
nlohmann::json json = ToJson(static_cast<const Type&>(schema));
311-
SetOptionalField(json, kSchemaId, schema.schema_id());
311+
json[kSchemaId] = schema.schema_id();
312312
// TODO(gangwu): add identifier-field-ids.
313313
return json;
314314
}
@@ -466,14 +466,16 @@ Result<std::unique_ptr<SchemaField>> FieldFromJson(const nlohmann::json& json) {
466466
}
467467

468468
Result<std::unique_ptr<Schema>> SchemaFromJson(const nlohmann::json& json) {
469-
ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));
469+
ICEBERG_ASSIGN_OR_RAISE(auto schema_id_opt,
470+
GetJsonValueOptional<int32_t>(json, kSchemaId));
470471
ICEBERG_ASSIGN_OR_RAISE(auto type, TypeFromJson(json));
471472

472473
if (type->type_id() != TypeId::kStruct) [[unlikely]] {
473474
return JsonParseError("Schema must be a struct type, but got {}", SafeDumpJson(json));
474475
}
475476

476477
auto& struct_type = static_cast<StructType&>(*type);
478+
auto schema_id = schema_id_opt.value_or(Schema::kInitialSchemaId);
477479
return FromStructType(std::move(struct_type), schema_id);
478480
}
479481

@@ -762,7 +764,7 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
762764
}
763765

764766
// write the current schema ID and schema list
765-
SetOptionalField(json, kCurrentSchemaId, table_metadata.current_schema_id);
767+
json[kCurrentSchemaId] = table_metadata.current_schema_id;
766768
json[kSchemas] = ToJsonList(table_metadata.schemas);
767769

768770
// for older readers, continue writing the default spec as "partition-spec"
@@ -824,8 +826,7 @@ namespace {
824826
///
825827
/// \return The current schema or parse error.
826828
Result<std::shared_ptr<Schema>> ParseSchemas(
827-
const nlohmann::json& json, int8_t format_version,
828-
std::optional<int32_t>& current_schema_id,
829+
const nlohmann::json& json, int8_t format_version, int32_t& current_schema_id,
829830
std::vector<std::shared_ptr<Schema>>& schemas) {
830831
std::shared_ptr<Schema> current_schema;
831832
if (json.contains(kSchemas)) {
@@ -848,7 +849,7 @@ Result<std::shared_ptr<Schema>> ParseSchemas(
848849
}
849850
if (!current_schema) {
850851
return JsonParseError("Cannot find schema with {}={} from {}", kCurrentSchemaId,
851-
current_schema_id.value(), SafeDumpJson(schema_array));
852+
current_schema_id, SafeDumpJson(schema_array));
852853
}
853854
} else {
854855
if (format_version != 1) {

src/iceberg/schema.cc

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@
3434

3535
namespace iceberg {
3636

37-
Schema::Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id,
37+
Schema::Schema(std::vector<SchemaField> fields, int32_t schema_id,
3838
std::vector<int32_t> identifier_field_ids)
3939
: StructType(std::move(fields)),
4040
schema_id_(schema_id),
4141
identifier_field_ids_(std::move(identifier_field_ids)) {}
4242

4343
Result<std::unique_ptr<Schema>> Schema::Make(
44-
std::vector<SchemaField> fields, std::optional<int32_t> schema_id,
44+
std::vector<SchemaField> fields, int32_t schema_id,
4545
const std::vector<std::string>& identifier_field_names) {
4646
auto schema = std::make_unique<Schema>(std::move(fields), schema_id);
4747

@@ -57,7 +57,13 @@ Result<std::unique_ptr<Schema>> Schema::Make(
5757
return schema;
5858
}
5959

60-
std::optional<int32_t> Schema::schema_id() const { return schema_id_; }
60+
const std::shared_ptr<Schema>& Schema::EmptySchema() {
61+
static const auto empty_schema =
62+
std::make_shared<Schema>(std::vector<SchemaField>{}, kInitialSchemaId);
63+
return empty_schema;
64+
}
65+
66+
int32_t Schema::schema_id() const { return schema_id_; }
6167

6268
std::string Schema::ToString() const {
6369
std::string repr = "schema<";
@@ -196,7 +202,7 @@ Result<std::unique_ptr<Schema>> Schema::Select(std::span<const std::string> name
196202
auto pruned_type, visitor.Visit(std::shared_ptr<StructType>(ToStructType(*this))));
197203

198204
if (!pruned_type) {
199-
return std::make_unique<Schema>(std::vector<SchemaField>{}, std::nullopt);
205+
return std::make_unique<Schema>(std::vector<SchemaField>{}, kInitialSchemaId);
200206
}
201207

202208
if (pruned_type->type_id() != TypeId::kStruct) {
@@ -214,7 +220,7 @@ Result<std::unique_ptr<Schema>> Schema::Project(
214220
auto project_type, visitor.Visit(std::shared_ptr<StructType>(ToStructType(*this))));
215221

216222
if (!project_type) {
217-
return std::make_unique<Schema>(std::vector<SchemaField>{}, std::nullopt);
223+
return std::make_unique<Schema>(std::vector<SchemaField>{}, kInitialSchemaId);
218224
}
219225

220226
if (project_type->type_id() != TypeId::kStruct) {

src/iceberg/schema.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ class ICEBERG_EXPORT Schema : public StructType {
5252
/// \brief Special value to select all columns from manifest files.
5353
static constexpr std::string_view kAllColumns = "*";
5454

55-
explicit Schema(std::vector<SchemaField> fields,
56-
std::optional<int32_t> schema_id = std::nullopt,
55+
explicit Schema(std::vector<SchemaField> fields, int32_t schema_id = kInitialSchemaId,
5756
std::vector<int32_t> identifier_field_ids = {});
5857

5958
/// \brief Create a schema.
@@ -63,14 +62,19 @@ class ICEBERG_EXPORT Schema : public StructType {
6362
/// \param identifier_field_names Canonical names of fields that uniquely identify rows
6463
/// in the table (default: empty). \return A new Schema instance, or an error Status on failure.
6564
static Result<std::unique_ptr<Schema>> Make(
66-
std::vector<SchemaField> fields, std::optional<int32_t> schema_id = std::nullopt,
65+
std::vector<SchemaField> fields, int32_t schema_id = kInitialSchemaId,
6766
const std::vector<std::string>& identifier_field_names = {});
6867

68+
/// \brief Get an empty schema.
69+
///
70+
/// An empty schema has no fields and its schema ID is kInitialSchemaId (0).
71+
static const std::shared_ptr<Schema>& EmptySchema();
72+
6973
/// \brief Get the schema ID.
7074
///
7175
/// A schema is identified by a unique ID for the purposes of schema
7276
/// evolution.
73-
std::optional<int32_t> schema_id() const;
77+
int32_t schema_id() const;
7478

7579
std::string ToString() const override;
7680

@@ -178,7 +182,7 @@ class ICEBERG_EXPORT Schema : public StructType {
178182
const Schema&);
179183
static Result<int32_t> InitHighestFieldId(const Schema&);
180184

181-
const std::optional<int32_t> schema_id_;
185+
const int32_t schema_id_;
182186
/// Field IDs that uniquely identify rows in the table.
183187
std::vector<int32_t> identifier_field_ids_;
184188
/// Mapping from field id to field.

src/iceberg/schema_internal.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,12 +304,13 @@ Result<std::shared_ptr<Type>> FromArrowSchema(const ArrowSchema& schema) {
304304
} // namespace
305305

306306
std::unique_ptr<Schema> FromStructType(StructType&& struct_type,
307-
std::optional<int32_t> schema_id) {
307+
std::optional<int32_t> schema_id_opt) {
308308
std::vector<SchemaField> fields;
309309
fields.reserve(struct_type.fields().size());
310310
for (auto& field : struct_type.fields()) {
311311
fields.emplace_back(std::move(field));
312312
}
313+
auto schema_id = schema_id_opt.value_or(Schema::kInitialSchemaId);
313314
return std::make_unique<Schema>(std::move(fields), schema_id);
314315
}
315316

src/iceberg/table_metadata.cc

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -233,13 +233,12 @@ Result<std::shared_ptr<Schema>> TableMetadata::Schema() const {
233233
return SchemaById(current_schema_id);
234234
}
235235

236-
Result<std::shared_ptr<Schema>> TableMetadata::SchemaById(
237-
std::optional<int32_t> schema_id) const {
236+
Result<std::shared_ptr<Schema>> TableMetadata::SchemaById(int32_t schema_id) const {
238237
auto iter = std::ranges::find_if(schemas, [schema_id](const auto& schema) {
239238
return schema != nullptr && schema->schema_id() == schema_id;
240239
});
241240
if (iter == schemas.end()) {
242-
return NotFound("Schema with ID {} is not found", schema_id.value_or(-1));
241+
return NotFound("Schema with ID {} is not found", schema_id);
243242
}
244243
return *iter;
245244
}
@@ -359,11 +358,8 @@ Result<TableMetadataCache::SnapshotsMapRef> TableMetadataCache::GetSnapshotsById
359358

360359
Result<TableMetadataCache::SchemasMap> TableMetadataCache::InitSchemasMap(
361360
const TableMetadata* metadata) {
362-
return metadata->schemas | std::views::filter([](const auto& schema) {
363-
return schema->schema_id().has_value();
364-
}) |
365-
std::views::transform([](const auto& schema) {
366-
return std::make_pair(schema->schema_id().value(), schema);
361+
return metadata->schemas | std::views::transform([](const auto& schema) {
362+
return std::make_pair(schema->schema_id(), schema);
367363
}) |
368364
std::ranges::to<SchemasMap>();
369365
}
@@ -548,9 +544,7 @@ class TableMetadataBuilder::Impl {
548544
: base_(base_metadata), metadata_(*base_metadata) {
549545
// Initialize index maps from base metadata
550546
for (const auto& schema : metadata_.schemas) {
551-
if (schema->schema_id().has_value()) {
552-
schemas_by_id_.emplace(schema->schema_id().value(), schema);
553-
}
547+
schemas_by_id_.emplace(schema->schema_id(), schema);
554548
}
555549

556550
for (const auto& spec : metadata_.partition_specs) {
@@ -920,14 +914,13 @@ Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {
920914

921915
Status TableMetadataBuilder::Impl::RemoveSchemas(
922916
const std::unordered_set<int32_t>& schema_ids) {
923-
auto current_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
917+
auto current_schema_id = metadata_.current_schema_id;
924918
ICEBERG_PRECHECK(!schema_ids.contains(current_schema_id),
925919
"Cannot remove current schema: {}", current_schema_id);
926920

927921
if (!schema_ids.empty()) {
928922
metadata_.schemas = metadata_.schemas | std::views::filter([&](const auto& schema) {
929-
return schema->schema_id().has_value() &&
930-
!schema_ids.contains(schema->schema_id().value());
923+
return !schema_ids.contains(schema->schema_id());
931924
}) |
932925
std::ranges::to<std::vector<std::shared_ptr<Schema>>>();
933926
changes_.push_back(std::make_unique<table::RemoveSchemas>(schema_ids));
@@ -999,7 +992,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataBuilder::Impl::Build() {
999992
std::chrono::system_clock::now().time_since_epoch())};
1000993
}
1001994

1002-
auto current_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
995+
auto current_schema_id = metadata_.current_schema_id;
1003996
auto schema_it = schemas_by_id_.find(current_schema_id);
1004997
ICEBERG_PRECHECK(schema_it != schemas_by_id_.end(),
1005998
"Current schema ID {} not found in schemas", current_schema_id);
@@ -1072,9 +1065,9 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId(
10721065
int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
10731066
const Schema& new_schema) const {
10741067
// if the schema already exists, use its id; otherwise use the highest id + 1
1075-
auto new_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
1068+
auto new_schema_id = metadata_.current_schema_id;
10761069
for (auto& schema : metadata_.schemas) {
1077-
auto schema_id = schema->schema_id().value_or(Schema::kInitialSchemaId);
1070+
auto schema_id = schema->schema_id();
10781071
if (schema->SameSchema(new_schema)) {
10791072
return schema_id;
10801073
} else if (new_schema_id <= schema_id) {

src/iceberg/table_metadata.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ struct ICEBERG_EXPORT TableMetadata {
9595
/// A list of schemas
9696
std::vector<std::shared_ptr<iceberg::Schema>> schemas;
9797
/// ID of the table's current schema
98-
std::optional<int32_t> current_schema_id;
98+
int32_t current_schema_id;
9999
/// A list of partition specs
100100
std::vector<std::shared_ptr<iceberg::PartitionSpec>> partition_specs;
101101
/// ID of the current partition spec that writers should use by default
@@ -136,8 +136,7 @@ struct ICEBERG_EXPORT TableMetadata {
136136
/// \brief Get the current schema, return NotFoundError if not found
137137
Result<std::shared_ptr<iceberg::Schema>> Schema() const;
138138
/// \brief Get the schema with the given ID, return NotFoundError if not found
139-
Result<std::shared_ptr<iceberg::Schema>> SchemaById(
140-
std::optional<int32_t> schema_id) const;
139+
Result<std::shared_ptr<iceberg::Schema>> SchemaById(int32_t schema_id) const;
141140
/// \brief Get the current partition spec, return NotFoundError if not found
142141
Result<std::shared_ptr<iceberg::PartitionSpec>> PartitionSpec() const;
143142
/// \brief Get the current sort order, return NotFoundError if not found

src/iceberg/table_requirement.cc

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,10 @@ Status AssertCurrentSchemaID::Validate(const TableMetadata* base) const {
9090
return CommitFailed("Requirement failed: current table metadata is missing");
9191
}
9292

93-
if (!base->current_schema_id.has_value()) {
94-
return CommitFailed(
95-
"Requirement failed: current schema ID is not set in table metadata");
96-
}
97-
98-
if (base->current_schema_id.value() != schema_id_) {
93+
if (base->current_schema_id != schema_id_) {
9994
return CommitFailed(
10095
"Requirement failed: current schema ID does not match, expected {} != {}",
101-
schema_id_, base->current_schema_id.value());
96+
schema_id_, base->current_schema_id);
10297
}
10398

10499
return {};

src/iceberg/table_requirements.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ void TableUpdateContext::RequireLastAssignedFieldIdUnchanged() {
5151
void TableUpdateContext::RequireCurrentSchemaIdUnchanged() {
5252
if (!added_current_schema_id_) {
5353
if (base_ != nullptr && !is_replace_) {
54-
AddRequirement(std::make_unique<table::AssertCurrentSchemaID>(
55-
base_->current_schema_id.value()));
54+
AddRequirement(
55+
std::make_unique<table::AssertCurrentSchemaID>(base_->current_schema_id));
5656
}
5757
added_current_schema_id_ = true;
5858
}

src/iceberg/table_scan.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,7 @@ Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
216216

217217
if (!context_.projected_schema) {
218218
const auto& snapshot = context_.snapshot;
219-
auto schema_id =
220-
snapshot->schema_id ? snapshot->schema_id : table_metadata->current_schema_id;
219+
auto schema_id = table_metadata->current_schema_id;
221220
ICEBERG_ASSIGN_OR_RAISE(auto schema, table_metadata->SchemaById(schema_id));
222221

223222
if (column_names_.empty()) {
@@ -231,7 +230,7 @@ Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
231230
auto field_opt = schema->GetFieldByName(column_name);
232231
if (!field_opt) {
233232
return InvalidArgument("Column {} not found in schema '{}'", column_name,
234-
*schema_id);
233+
schema_id);
235234
}
236235
projected_fields.emplace_back(field_opt.value()->get());
237236
}

0 commit comments

Comments
 (0)