Skip to content

Commit 61a7de5

Browse files
authored
feat: add schema update to table metadata builder (#437)
1 parent a6ff753 commit 61a7de5

15 files changed

+711
-45
lines changed

src/iceberg/partition_spec.cc

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ bool PartitionSpec::Equals(const PartitionSpec& other) const {
131131
}
132132

133133
Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields) const {
134+
ICEBERG_RETURN_UNEXPECTED(ValidatePartitionName(schema, *this));
135+
134136
std::unordered_map<int32_t, int32_t> parents = IndexParents(schema);
135137
for (const auto& partition_field : fields_) {
136138
ICEBERG_ASSIGN_OR_RAISE(auto source_field,
@@ -177,6 +179,43 @@ Status PartitionSpec::Validate(const Schema& schema, bool allow_missing_fields)
177179
return {};
178180
}
179181

182+
/// \brief Validates the names of all partition fields of a spec against a schema.
///
/// Every partition name must be non-empty and unique within the spec. A name that
/// matches a schema field is accepted only for identity and void transforms, and only
/// when the partition field is sourced from that same schema field.
///
/// \param schema The schema whose field names are checked for conflicts.
/// \param spec The partition spec whose field names are validated.
/// \return A default (success) status when every name is valid; an InvalidArgument
/// error for a duplicate or conflicting name; otherwise whatever the empty-name
/// precheck or the schema lookup reports.
Status PartitionSpec::ValidatePartitionName(const Schema& schema,
                                            const PartitionSpec& spec) {
  std::unordered_set<std::string> seen_names;
  for (const auto& partition_field : spec.fields()) {
    auto name = std::string(partition_field.name());
    ICEBERG_PRECHECK(!name.empty(), "Cannot use empty partition name: {}", name);

    // insert() reports whether the name was newly added, so a single hash lookup both
    // detects a duplicate and records the name.
    if (!seen_names.insert(name).second) {
      return InvalidArgument("Cannot use partition name more than once: {}", name);
    }

    ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldByName(name));
    const auto transform_type = partition_field.transform()->transform_type();
    const bool may_share_schema_name = transform_type == TransformType::kIdentity ||
                                       transform_type == TransformType::kVoid;

    if (!schema_field.has_value()) {
      // No schema field carries this name, so there is nothing to conflict with.
      continue;
    }

    if (!may_share_schema_name) {
      // For all other transforms a partition name must not collide with a schema field
      // name.
      return InvalidArgument(
          "Cannot create partition from name that exists in schema: {}", name);
    }

    // For identity/void transforms a collision is tolerated only when the partition is
    // sourced from the schema field that owns the name.
    if (schema_field.value().get().field_id() != partition_field.source_id()) {
      return InvalidArgument(
          "Cannot create identity partition sourced from different field in schema: {}",
          name);
    }
  }

  return {};
}
218+
180219
Result<std::vector<std::reference_wrapper<const PartitionField>>>
181220
PartitionSpec::GetFieldsBySourceId(int32_t source_id) const {
182221
ICEBERG_ASSIGN_OR_RAISE(auto source_id_to_fields, source_id_to_fields_.Get(*this));

src/iceberg/partition_spec.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,15 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
7979

8080
/// \brief Validates the partition spec against a schema.
8181
/// \param schema The schema to validate against.
82-
/// \param allowMissingFields Whether to skip validation for partition fields whose
82+
/// \param allow_missing_fields Whether to skip validation for partition fields whose
8383
/// source columns have been dropped from the schema.
8484
/// \return Error status if the partition spec is invalid.
8585
Status Validate(const Schema& schema, bool allow_missing_fields) const;
8686

87+
/// \brief Validates that partition field names are non-empty, unique within the spec,
88+
/// and do not collide with schema field names (identity/void of the same field excepted).
89+
static Status ValidatePartitionName(const Schema& schema, const PartitionSpec& spec);
90+
8791
/// \brief Get the partition fields by source ID.
8892
/// \param source_id The id of the source field.
8993
/// \return The partition fields by source ID, or NotFound if the source field is not

src/iceberg/schema.cc

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "iceberg/result.h"
2626
#include "iceberg/row/struct_like.h"
2727
#include "iceberg/schema_internal.h"
28+
#include "iceberg/table_metadata.h"
2829
#include "iceberg/type.h"
2930
#include "iceberg/util/formatter.h" // IWYU pragma: keep
3031
#include "iceberg/util/macros.h"
@@ -147,6 +148,20 @@ Result<std::unordered_map<int32_t, std::vector<size_t>>> Schema::InitIdToPositio
147148
return visitor.Finish();
148149
}
149150

151+
/// \brief Computes the largest field id present in the schema's id-to-field index.
///
/// \param self The schema whose id-to-field index is scanned.
/// \return The largest key of the index, kInitialColumnId when the index is empty, or
/// the error reported while obtaining the index.
Result<int32_t> Schema::InitHighestFieldId(const Schema& self) {
  ICEBERG_ASSIGN_OR_RAISE(auto fields_by_id, self.id_to_field_.Get(self));

  // Stays empty until the first entry is seen, so an empty index is told apart from
  // one whose largest id happens to equal the fallback value.
  std::optional<int32_t> highest;
  for (const auto& entry : fields_by_id.get()) {
    if (!highest.has_value() || *highest < entry.first) {
      highest = entry.first;
    }
  }

  if (!highest.has_value()) {
    return kInitialColumnId;
  }
  return *highest;
}
164+
150165
Result<std::unique_ptr<StructLikeAccessor>> Schema::GetAccessorById(
151166
int32_t field_id) const {
152167
ICEBERG_ASSIGN_OR_RAISE(auto id_to_position_path, id_to_position_path_.Get(*this));
@@ -227,4 +242,33 @@ Result<std::vector<std::string>> Schema::IdentifierFieldNames() const {
227242
return names;
228243
}
229244

245+
/// \brief Returns the highest field id of this schema.
///
/// Delegates to the lazily initialized highest_field_id_ member and forwards whatever
/// result (value or error) it yields.
Result<int32_t> Schema::HighestFieldId() const { return highest_field_id_.Get(*this); }
246+
247+
bool Schema::SameSchema(const Schema& other) const {
248+
return fields_ == other.fields_ && identifier_field_ids_ == other.identifier_field_ids_;
249+
}
250+
251+
/// \brief Checks that every field type in the schema is available in a format version.
///
/// \param format_version The table format version to validate against.
/// \return A default (success) status when no field type needs a newer format version;
/// an InvalidSchema error naming the first offending field encountered; or the error
/// reported while obtaining the id-to-field index.
Status Schema::Validate(int32_t format_version) const {
  // The id-to-field index is iterated so that every indexed field is checked.
  ICEBERG_ASSIGN_OR_RAISE(auto fields_by_id, id_to_field_.Get(*this));
  const auto& min_versions = TableMetadata::kMinFormatVersions;

  for (const auto& entry : fields_by_id.get()) {
    const auto& field = entry.second.get();

    // Only types listed in the table carry a minimum format version requirement.
    const auto required = min_versions.find(field.type()->type_id());
    if (required != min_versions.end()) {
      int32_t min_format_version = required->second;
      if (format_version < min_format_version) {
        return InvalidSchema("Invalid type for {}: {} is not supported until v{}",
                             field.name(), *field.type(), min_format_version);
      }
    }

    // TODO(GuoTao.yu): Check default values when they are supported
  }

  return {};
}
273+
230274
} // namespace iceberg

src/iceberg/schema.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ namespace iceberg {
4646
class ICEBERG_EXPORT Schema : public StructType {
4747
public:
4848
static constexpr int32_t kInitialSchemaId = 0;
49+
static constexpr int32_t kInitialColumnId = 0;
4950
static constexpr int32_t kInvalidColumnId = -1;
5051

5152
/// \brief Special value to select all columns from manifest files.
@@ -130,6 +131,23 @@ class ICEBERG_EXPORT Schema : public StructType {
130131
/// \brief Return the canonical field names of the identifier fields.
131132
Result<std::vector<std::string>> IdentifierFieldNames() const;
132133

134+
/// \brief Get the highest field ID in the schema.
135+
/// \return The highest field ID.
136+
Result<int32_t> HighestFieldId() const;
137+
138+
/// \brief Checks whether this schema is equivalent to another schema while ignoring the
139+
/// schema id.
140+
bool SameSchema(const Schema& other) const;
141+
142+
/// \brief Validate the schema for a given format version.
143+
///
144+
/// This validates that the schema does not contain types that were released in later
145+
/// format versions.
146+
///
147+
/// \param format_version The format version to validate against.
148+
/// \return Error status if the schema is invalid.
149+
Status Validate(int32_t format_version) const;
150+
133151
friend bool operator==(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs); }
134152

135153
private:
@@ -158,6 +176,7 @@ class ICEBERG_EXPORT Schema : public StructType {
158176
InitLowerCaseNameToIdMap(const Schema&);
159177
static Result<std::unordered_map<int32_t, std::vector<size_t>>> InitIdToPositionPath(
160178
const Schema&);
179+
static Result<int32_t> InitHighestFieldId(const Schema&);
161180

162181
const std::optional<int32_t> schema_id_;
163182
/// Field IDs that uniquely identify rows in the table.
@@ -170,6 +189,8 @@ class ICEBERG_EXPORT Schema : public StructType {
170189
Lazy<InitLowerCaseNameToIdMap> lowercase_name_to_id_;
171190
/// Mapping from field id to (nested) position path to access the field.
172191
Lazy<InitIdToPositionPath> id_to_position_path_;
192+
/// Highest field ID in the schema.
193+
Lazy<InitHighestFieldId> highest_field_id_;
173194
};
174195

175196
} // namespace iceberg

0 commit comments

Comments
 (0)