Skip to content

Commit 0f40163

Browse files
committed
fix: review comments
1 parent 637a55a commit 0f40163

22 files changed

+382
-311
lines changed

src/iceberg/avro/avro_writer.cc

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -154,19 +154,19 @@ Status AvroWriter::Close() {
154154
return {};
155155
}
156156

157-
std::optional<Metrics> AvroWriter::metrics() {
157+
Result<Metrics> AvroWriter::metrics() {
158158
if (impl_->Closed()) {
159159
// TODO(xiao.dong) implement metrics
160160
return {};
161161
}
162-
return std::nullopt;
162+
return Invalid("AvroWriter is not closed");
163163
}
164164

165-
std::optional<int64_t> AvroWriter::length() {
165+
Result<int64_t> AvroWriter::length() {
166166
if (impl_->Closed()) {
167167
return impl_->length();
168168
}
169-
return std::nullopt;
169+
return Invalid("AvroWriter is not closed");
170170
}
171171

172172
std::vector<int64_t> AvroWriter::split_offsets() { return {}; }

src/iceberg/avro/avro_writer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer {
3737

3838
Status Write(ArrowArray* data) final;
3939

40-
std::optional<Metrics> metrics() final;
40+
Result<Metrics> metrics() final;
4141

42-
std::optional<int64_t> length() final;
42+
Result<int64_t> length() final;
4343

4444
std::vector<int64_t> split_offsets() final;
4545

src/iceberg/file_writer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,11 @@ class ICEBERG_EXPORT Writer {
9797

9898
/// \brief Get the file statistics.
9999
/// Only valid after the file is closed.
100-
virtual std::optional<Metrics> metrics() = 0;
100+
virtual Result<Metrics> metrics() = 0;
101101

102102
/// \brief Get the file length.
103103
/// Only valid after the file is closed.
104-
virtual std::optional<int64_t> length() = 0;
104+
virtual Result<int64_t> length() = 0;
105105

106106
/// \brief Returns a list of recommended split locations, if applicable, empty
107107
/// otherwise. When available, this information is used for planning scan tasks whose

src/iceberg/manifest_adapter.cc

Lines changed: 32 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -164,32 +164,7 @@ ManifestEntryAdapter::~ManifestEntryAdapter() {
164164
}
165165
}
166166

167-
Status ManifestEntryAdapter::AddEntry(ManifestEntry& entry) {
168-
ICEBERG_RETURN_UNEXPECTED(CheckDataFile(*entry.data_file));
169-
entry.status = ManifestStatus::kAdded;
170-
entry.snapshot_id = snapshot_id_;
171-
if (entry.sequence_number.has_value() &&
172-
entry.sequence_number.value() < TableMetadata::kInitialSequenceNumber) {
173-
entry.sequence_number = std::nullopt;
174-
}
175-
entry.file_sequence_number = std::nullopt;
176-
return AddEntryInternal(entry);
177-
}
178-
179-
Status ManifestEntryAdapter::AddDeleteEntry(ManifestEntry& entry) {
180-
ICEBERG_RETURN_UNEXPECTED(CheckDataFile(*entry.data_file));
181-
entry.status = ManifestStatus::kDeleted;
182-
entry.snapshot_id = snapshot_id_;
183-
return AddEntryInternal(entry);
184-
}
185-
186-
Status ManifestEntryAdapter::AddExistingEntry(ManifestEntry& entry) {
187-
ICEBERG_RETURN_UNEXPECTED(CheckDataFile(*entry.data_file));
188-
entry.status = ManifestStatus::kExisting;
189-
return AddEntryInternal(entry);
190-
}
191-
192-
ManifestFile ManifestEntryAdapter::ToManifestFile() const {
167+
Result<ManifestFile> ManifestEntryAdapter::ToManifestFile() const {
193168
ManifestFile manifest_file;
194169
manifest_file.partition_spec_id = partition_spec_->spec_id();
195170
manifest_file.content = content_;
@@ -199,75 +174,17 @@ ManifestFile ManifestEntryAdapter::ToManifestFile() const {
199174
manifest_file.min_sequence_number =
200175
min_sequence_number_.value_or(TableMetadata::kInvalidSequenceNumber);
201176
manifest_file.existing_files_count = existing_files_count_;
202-
manifest_file.added_snapshot_id = snapshot_id_.value_or(Snapshot::kInvalidSnapshotId);
203177
manifest_file.added_files_count = add_files_count_;
204178
manifest_file.existing_files_count = existing_files_count_;
205179
manifest_file.deleted_files_count = delete_files_count_;
206180
manifest_file.added_rows_count = add_rows_count_;
207181
manifest_file.existing_rows_count = existing_rows_count_;
208182
manifest_file.deleted_rows_count = delete_rows_count_;
209-
manifest_file.partitions = std::move(partition_summary_->Summaries());
183+
ICEBERG_ASSIGN_OR_RAISE(auto partition_summary, partition_summary_->Summaries());
184+
manifest_file.partitions = std::move(partition_summary);
210185
return manifest_file;
211186
}
212187

213-
Status ManifestEntryAdapter::CheckDataFile(const DataFile& file) const {
214-
switch (content_) {
215-
case ManifestContent::kData:
216-
if (file.content != DataFile::Content::kData) {
217-
return InvalidArgument(
218-
"Manifest content type: data, data file content should be: data, but got: {}",
219-
ToString(file.content));
220-
}
221-
break;
222-
case ManifestContent::kDeletes:
223-
if (file.content != DataFile::Content::kPositionDeletes &&
224-
file.content != DataFile::Content::kEqualityDeletes) {
225-
return InvalidArgument(
226-
"Manifest content type: deletes, data file content should be: "
227-
"position_deletes or equality_deletes, but got: {}",
228-
ToString(file.content));
229-
}
230-
break;
231-
default:
232-
std::unreachable();
233-
}
234-
return {};
235-
}
236-
237-
Status ManifestEntryAdapter::AddEntryInternal(const ManifestEntry& entry) {
238-
if (entry.data_file == nullptr) [[unlikely]] {
239-
return InvalidManifest("Missing required data_file field from manifest entry.");
240-
}
241-
242-
switch (entry.status) {
243-
case ManifestStatus::kAdded:
244-
add_files_count_++;
245-
add_rows_count_ += entry.data_file->record_count;
246-
break;
247-
case ManifestStatus::kExisting:
248-
existing_files_count_++;
249-
existing_rows_count_ += entry.data_file->record_count;
250-
break;
251-
case ManifestStatus::kDeleted:
252-
delete_files_count_++;
253-
delete_rows_count_ += entry.data_file->record_count;
254-
break;
255-
default:
256-
std::unreachable();
257-
}
258-
259-
ICEBERG_RETURN_UNEXPECTED(partition_summary_->Update(entry.data_file->partition));
260-
261-
if (entry.IsAlive() && entry.sequence_number.has_value()) {
262-
if (!min_sequence_number_.has_value() ||
263-
entry.sequence_number.value() < min_sequence_number_.value()) {
264-
min_sequence_number_ = entry.sequence_number.value();
265-
}
266-
}
267-
268-
return AppendInternal(entry);
269-
}
270-
271188
Status ManifestEntryAdapter::AppendPartitionValues(
272189
ArrowArray* array, const std::shared_ptr<StructType>& partition_type,
273190
const std::vector<Literal>& partition_values) {
@@ -425,13 +342,16 @@ Status ManifestEntryAdapter::AppendDataFile(
425342
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
426343
}
427344
break;
428-
case 142: // first_row_id (optional int64)
429-
if (file.first_row_id.has_value()) {
430-
ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.first_row_id.value()));
345+
case 142: {
346+
// first_row_id (optional int64)
347+
ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, GetFirstRowId(file));
348+
if (first_row_id.has_value()) {
349+
ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, first_row_id.value()));
431350
} else {
432351
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
433352
}
434353
break;
354+
}
435355
case 143: {
436356
// referenced_data_file (optional string)
437357
ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file, GetReferenceDataFile(file));
@@ -493,6 +413,29 @@ Result<std::optional<int64_t>> ManifestEntryAdapter::GetContentSizeInBytes(
493413
}
494414

495415
Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
416+
if (entry.data_file == nullptr) [[unlikely]] {
417+
return InvalidManifest("Missing required data_file field from manifest entry.");
418+
}
419+
420+
switch (entry.status) {
421+
case ManifestStatus::kAdded:
422+
add_files_count_++;
423+
add_rows_count_ += entry.data_file->record_count;
424+
break;
425+
case ManifestStatus::kExisting:
426+
existing_files_count_++;
427+
existing_rows_count_ += entry.data_file->record_count;
428+
break;
429+
case ManifestStatus::kDeleted:
430+
delete_files_count_++;
431+
delete_rows_count_ += entry.data_file->record_count;
432+
break;
433+
default:
434+
std::unreachable();
435+
}
436+
437+
ICEBERG_RETURN_UNEXPECTED(partition_summary_->Update(entry.data_file->partition));
438+
496439
const auto& fields = manifest_schema_->fields();
497440
for (size_t i = 0; i < fields.size(); i++) {
498441
const auto& field = fields[i];

src/iceberg/manifest_adapter.h

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -66,39 +66,19 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
6666

6767
~ManifestEntryAdapter() override;
6868

69-
/// \brief Add a new entry to the manifest.
70-
///
71-
/// This method will update following status of the entry:
72-
/// - Update the entry status to `Added`
73-
/// - Set the snapshot id to the current snapshot id
74-
/// - Set the sequence number to nullopt if it is invalid(smaller than 0)
75-
/// - Set the file sequence number to nullopt
76-
virtual Status AddEntry(ManifestEntry& entry);
77-
78-
/// \brief Add a delete entry to the manifest.
79-
///
80-
/// This method will update following status of the entry:
81-
/// - Update the entry status to `Deleted`
82-
/// - Set the snapshot id to the current snapshot id
83-
virtual Status AddDeleteEntry(ManifestEntry& entry);
84-
85-
/// \brief Add an existing entry to the manifest.
86-
///
87-
/// This method will update following status of the entry:
88-
/// - Update the entry status to `Existing`
89-
virtual Status AddExistingEntry(ManifestEntry& entry);
69+
virtual Status Append(const ManifestEntry& entry) = 0;
9070

9171
const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
9272

9373
ManifestContent content() const { return content_; }
9474

75+
std::optional<int64_t> snapshot_id() const { return snapshot_id_; }
76+
9577
/// \brief Create a ManifestFile object without setting file metadata, such as
9678
/// location, file size, key metadata, etc.
97-
ManifestFile ToManifestFile() const;
79+
Result<ManifestFile> ToManifestFile() const;
9880

9981
protected:
100-
Status CheckDataFile(const DataFile& file) const;
101-
Status AddEntryInternal(const ManifestEntry& entry);
10282
Status AppendInternal(const ManifestEntry& entry);
10383
Status AppendDataFile(ArrowArray* array,
10484
const std::shared_ptr<StructType>& data_file_type,
@@ -139,7 +119,7 @@ class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter {
139119
ManifestFileAdapter() = default;
140120
~ManifestFileAdapter() override;
141121

142-
virtual Status Append(ManifestFile& file) = 0;
122+
virtual Status Append(const ManifestFile& file) = 0;
143123

144124
const std::shared_ptr<Schema>& schema() const { return manifest_list_schema_; }
145125

src/iceberg/manifest_entry.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,9 +311,21 @@ struct ICEBERG_EXPORT ManifestEntry {
311311
return status == ManifestStatus::kAdded || status == ManifestStatus::kExisting;
312312
}
313313

314-
/// \brief Create a copy of this manifest entry.
315-
ManifestEntry Copy() const {
314+
ManifestEntry AsAdded() const {
316315
ManifestEntry copy = *this;
316+
copy.status = ManifestStatus::kAdded;
317+
return copy;
318+
}
319+
320+
ManifestEntry AsExisting() const {
321+
ManifestEntry copy = *this;
322+
copy.status = ManifestStatus::kExisting;
323+
return copy;
324+
}
325+
326+
ManifestEntry AsDeleted() const {
327+
ManifestEntry copy = *this;
328+
copy.status = ManifestStatus::kDeleted;
317329
return copy;
318330
}
319331

src/iceberg/manifest_list.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -198,12 +198,6 @@ struct ICEBERG_EXPORT ManifestFile {
198198
bool operator==(const ManifestFile& other) const = default;
199199

200200
static const std::shared_ptr<Schema>& Type();
201-
202-
/// \brief Create a copy of this manifest file.
203-
ManifestFile Copy() const {
204-
ManifestFile copy = *this;
205-
return copy;
206-
}
207201
};
208202

209203
/// Snapshots are embedded in table metadata, but the list of manifests for a snapshot are

0 commit comments

Comments
 (0)