Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

build/
cmake-build/
cmake-build-debug/
cmake-build-release/
.DS_Store
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ set(ICEBERG_SOURCES
name_mapping.cc
partition_field.cc
partition_spec.cc
partition_summary_internal.cc
row/arrow_array_wrapper.cc
row/manifest_wrapper.cc
row/struct_like.cc
Expand Down
8 changes: 4 additions & 4 deletions src/iceberg/avro/avro_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,19 @@ Status AvroWriter::Close() {
return {};
}

std::optional<Metrics> AvroWriter::metrics() {
Result<Metrics> AvroWriter::metrics() {
if (impl_->Closed()) {
// TODO(xiao.dong) implement metrics
return {};
}
return std::nullopt;
return Invalid("AvroWriter is not closed");
}

std::optional<int64_t> AvroWriter::length() {
Result<int64_t> AvroWriter::length() {
if (impl_->Closed()) {
return impl_->length();
}
return std::nullopt;
return Invalid("AvroWriter is not closed");
}

std::vector<int64_t> AvroWriter::split_offsets() { return {}; }
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/avro/avro_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer {

Status Write(ArrowArray* data) final;

std::optional<Metrics> metrics() final;
Result<Metrics> metrics() final;

std::optional<int64_t> length() final;
Result<int64_t> length() final;

std::vector<int64_t> split_offsets() final;

Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/expression/literal.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ class ICEBERG_EXPORT Literal : public util::Formattable {
/// \return true if this literal represents a BelowMin value, false otherwise
bool IsBelowMin() const;

/// Check if this literal is null.
/// \brief Check if this literal is null.
/// \return true if this literal is null, false otherwise
bool IsNull() const;

/// Check if this literal is NaN.
/// \brief Check if this literal is NaN.
/// \return true if this literal is NaN, false otherwise
bool IsNaN() const;

Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ class ICEBERG_EXPORT Writer {

/// \brief Get the file statistics.
/// Only valid after the file is closed.
virtual std::optional<Metrics> metrics() = 0;
virtual Result<Metrics> metrics() = 0;

/// \brief Get the file length.
/// Only valid after the file is closed.
virtual std::optional<int64_t> length() = 0;
virtual Result<int64_t> length() = 0;

/// \brief Returns a list of recommended split locations, if applicable, empty
/// otherwise. When available, this information is used for planning scan tasks whose
Expand Down
62 changes: 56 additions & 6 deletions src/iceberg/manifest_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@

#include "iceberg/manifest_adapter.h"

#include <memory>
#include <utility>

#include <nanoarrow/nanoarrow.h>

#include "iceberg/arrow/nanoarrow_status_internal.h"
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
#include "iceberg/partition_summary_internal.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -141,10 +142,12 @@ Result<ArrowArray*> ManifestAdapter::FinishAppending() {
return &array_;
}

ManifestEntryAdapter::ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
ManifestEntryAdapter::ManifestEntryAdapter(std::optional<int64_t> snapshot_id_,
std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema,
ManifestContent content)
: partition_spec_(std::move(partition_spec)),
: snapshot_id_(snapshot_id_),
partition_spec_(std::move(partition_spec)),
current_schema_(std::move(current_schema)),
content_(content) {
if (!partition_spec_) {
Expand All @@ -161,6 +164,27 @@ ManifestEntryAdapter::~ManifestEntryAdapter() {
}
}

Result<ManifestFile> ManifestEntryAdapter::ToManifestFile() const {
ManifestFile manifest_file;
manifest_file.partition_spec_id = partition_spec_->spec_id();
manifest_file.content = content_;
// sequence_number and min_sequence_number with kInvalidSequenceNumber will be
// replace with real sequence number in `ManifestListWriter`.
manifest_file.sequence_number = TableMetadata::kInvalidSequenceNumber;
manifest_file.min_sequence_number =
min_sequence_number_.value_or(TableMetadata::kInvalidSequenceNumber);
manifest_file.existing_files_count = existing_files_count_;
manifest_file.added_files_count = add_files_count_;
manifest_file.existing_files_count = existing_files_count_;
manifest_file.deleted_files_count = delete_files_count_;
manifest_file.added_rows_count = add_rows_count_;
manifest_file.existing_rows_count = existing_rows_count_;
manifest_file.deleted_rows_count = delete_rows_count_;
ICEBERG_ASSIGN_OR_RAISE(auto partition_summary, partition_summary_->Summaries());
manifest_file.partitions = std::move(partition_summary);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ICEBERG_ASSIGN_OR_RAISE(auto partition_summary, partition_summary_->Summaries());
manifest_file.partitions = std::move(partition_summary);
ICEBERG_ASSIGN_OR_RAISE(manifest_file.partitions, partition_summary_->Summaries());

return manifest_file;
}

Status ManifestEntryAdapter::AppendPartitionValues(
ArrowArray* array, const std::shared_ptr<StructType>& partition_type,
const std::vector<Literal>& partition_values) {
Expand Down Expand Up @@ -318,13 +342,16 @@ Status ManifestEntryAdapter::AppendDataFile(
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
}
break;
case 142: // first_row_id (optional int64)
if (file.first_row_id.has_value()) {
ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.first_row_id.value()));
case 142: {
// first_row_id (optional int64)
ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, GetFirstRowId(file));
if (first_row_id.has_value()) {
ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, first_row_id.value()));
} else {
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
}
break;
}
case 143: {
// referenced_data_file (optional string)
ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file, GetReferenceDataFile(file));
Expand Down Expand Up @@ -386,6 +413,29 @@ Result<std::optional<int64_t>> ManifestEntryAdapter::GetContentSizeInBytes(
}

Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
if (entry.data_file == nullptr) [[unlikely]] {
return InvalidManifest("Missing required data_file field from manifest entry.");
}

switch (entry.status) {
case ManifestStatus::kAdded:
add_files_count_++;
add_rows_count_ += entry.data_file->record_count;
break;
case ManifestStatus::kExisting:
existing_files_count_++;
existing_rows_count_ += entry.data_file->record_count;
break;
case ManifestStatus::kDeleted:
delete_files_count_++;
delete_rows_count_ += entry.data_file->record_count;
break;
default:
std::unreachable();
}

ICEBERG_RETURN_UNEXPECTED(partition_summary_->Update(entry.data_file->partition));

const auto& fields = manifest_schema_->fields();
for (size_t i = 0; i < fields.size(); i++) {
const auto& field = fields[i];
Expand Down
19 changes: 17 additions & 2 deletions src/iceberg/manifest_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <memory>
#include <optional>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "iceberg/arrow_c_data.h"
Expand Down Expand Up @@ -61,7 +60,8 @@ class ICEBERG_EXPORT ManifestAdapter {
/// Implemented by different versions with version-specific schemas.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
ManifestEntryAdapter(std::optional<int64_t> snapshot_id_,
std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema, ManifestContent content);

~ManifestEntryAdapter() override;
Expand All @@ -72,6 +72,12 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {

ManifestContent content() const { return content_; }

std::optional<int64_t> snapshot_id() const { return snapshot_id_; }

/// \brief Create a ManifestFile object without setting file metadata, such as
/// location, file size, key metadata, etc.
Result<ManifestFile> ToManifestFile() const;

protected:
Status AppendInternal(const ManifestEntry& entry);
Status AppendDataFile(ArrowArray* array,
Expand All @@ -91,10 +97,19 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
const DataFile& file) const;

protected:
std::optional<int64_t> snapshot_id_;
std::shared_ptr<PartitionSpec> partition_spec_;
std::shared_ptr<Schema> current_schema_;
std::shared_ptr<Schema> manifest_schema_;
const ManifestContent content_;
int32_t add_files_count_{0};
int32_t existing_files_count_{0};
int32_t delete_files_count_{0};
int64_t add_rows_count_{0L};
int64_t existing_rows_count_{0L};
int64_t delete_rows_count_{0L};
std::optional<int64_t> min_sequence_number_{std::nullopt};
std::unique_ptr<PartitionSummary> partition_summary_;
};

/// \brief Adapter for appending a list of `ManifestFile`s to an `ArrowArray`.
Expand Down
45 changes: 36 additions & 9 deletions src/iceberg/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,6 @@ ICEBERG_EXPORT constexpr Result<ManifestStatus> ManifestStatusFromInt(
}
}

enum class ManifestContent {
kData = 0,
kDeletes = 1,
};

ICEBERG_EXPORT constexpr std::string_view ToString(ManifestContent content) noexcept;
ICEBERG_EXPORT constexpr Result<ManifestContent> ManifestContentFromString(
std::string_view str) noexcept;

/// \brief DataFile carries data file path, partition tuple, metrics, ...
struct ICEBERG_EXPORT DataFile {
/// \brief Content of a data file
Expand Down Expand Up @@ -315,6 +306,29 @@ struct ICEBERG_EXPORT ManifestEntry {
inline static const SchemaField kFileSequenceNumber =
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());

/// \brief Check if this manifest entry is deleted.
constexpr bool IsAlive() const {
return status == ManifestStatus::kAdded || status == ManifestStatus::kExisting;
}

ManifestEntry AsAdded() const {
ManifestEntry copy = *this;
copy.status = ManifestStatus::kAdded;
return copy;
}

ManifestEntry AsExisting() const {
ManifestEntry copy = *this;
copy.status = ManifestStatus::kExisting;
return copy;
}

ManifestEntry AsDeleted() const {
ManifestEntry copy = *this;
copy.status = ManifestStatus::kDeleted;
return copy;
}

bool operator==(const ManifestEntry& other) const;

static std::shared_ptr<StructType> TypeFromPartitionType(
Expand All @@ -323,6 +337,19 @@ struct ICEBERG_EXPORT ManifestEntry {
std::shared_ptr<StructType> datafile_type);
};

/// \brief Get the relative datafile content type name
ICEBERG_EXPORT constexpr std::string_view ToString(DataFile::Content type) noexcept {
switch (type) {
case DataFile::Content::kData:
return "data";
case DataFile::Content::kPositionDeletes:
return "position_deletes";
case DataFile::Content::kEqualityDeletes:
return "equality_deletes";
}
std::unreachable();
}

/// \brief Get the relative data file content type from int
ICEBERG_EXPORT constexpr Result<DataFile::Content> DataFileContentFromInt(
int content) noexcept {
Expand Down
32 changes: 16 additions & 16 deletions src/iceberg/manifest_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ struct ICEBERG_EXPORT PartitionFieldSummary {
static const StructType& Type();
};

/// \brief The type of files tracked by the manifest, either data or delete files; 0 for
/// all v1 manifests
enum class ManifestContent {
/// The manifest content is data.
kData = 0,
/// The manifest content is deletes.
kDeletes = 1,
};

/// \brief Entry in a manifest list.
struct ICEBERG_EXPORT ManifestFile {
/// \brief The type of files tracked by the manifest, either data or delete files; 0 for
/// all v1 manifests
enum class Content {
/// The manifest content is data.
kData = 0,
/// The manifest content is deletes.
kDeletes = 1,
};

/// Field id: 500
/// Location of the manifest file
std::string manifest_path;
Expand All @@ -96,7 +96,7 @@ struct ICEBERG_EXPORT ManifestFile {
/// Field id: 517
/// The type of files tracked by the manifest, either data or delete files; 0 for all v1
/// manifests
Content content = Content::kData;
ManifestContent content = ManifestContent::kData;
/// Field id: 515
/// The sequence number when the manifest was added to the table; use 0 when reading v1
/// manifest lists
Expand Down Expand Up @@ -218,21 +218,21 @@ struct ICEBERG_EXPORT ManifestList {
};

/// \brief Get the relative manifest content type name
ICEBERG_EXPORT constexpr std::string_view ToString(ManifestFile::Content type) noexcept {
ICEBERG_EXPORT constexpr std::string_view ToString(ManifestContent type) noexcept {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make it a static function.

switch (type) {
case ManifestFile::Content::kData:
case ManifestContent::kData:
return "data";
case ManifestFile::Content::kDeletes:
case ManifestContent::kDeletes:
return "deletes";
}
std::unreachable();
}

/// \brief Get the relative manifest content type from name
ICEBERG_EXPORT constexpr Result<ManifestFile::Content> ManifestFileContentFromString(
ICEBERG_EXPORT constexpr Result<ManifestContent> ManifestContentFromString(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make it a static function.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused here, are you worried that this function will be included in multiple translation units?
If so, constexpr implies inline, so this should be fine. We already use this pattern in several other places, correct me if I'm wrong.

std::string_view str) noexcept {
if (str == "data") return ManifestFile::Content::kData;
if (str == "deletes") return ManifestFile::Content::kDeletes;
if (str == "data") return ManifestContent::kData;
if (str == "deletes") return ManifestContent::kDeletes;
return InvalidArgument("Invalid manifest content type: {}", str);
}

Expand Down
Loading
Loading