Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

build/
cmake-build/
cmake-build-debug/
cmake-build-release/
.DS_Store
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ set(ICEBERG_SOURCES
name_mapping.cc
partition_field.cc
partition_spec.cc
partition_summary.cc
row/arrow_array_wrapper.cc
row/manifest_wrapper.cc
row/struct_like.cc
Expand Down
8 changes: 4 additions & 4 deletions src/iceberg/avro/avro_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,19 @@ Status AvroWriter::Close() {
return {};
}

std::optional<Metrics> AvroWriter::metrics() {
Result<Metrics> AvroWriter::metrics() {
if (impl_->Closed()) {
// TODO(xiao.dong) implement metrics
return {};
}
return std::nullopt;
return Invalid("AvroWriter is not closed");
}

std::optional<int64_t> AvroWriter::length() {
Result<int64_t> AvroWriter::length() {
if (impl_->Closed()) {
return impl_->length();
}
return std::nullopt;
return Invalid("AvroWriter is not closed");
}

// Always yields an empty list: this writer does not report any split offsets.
std::vector<int64_t> AvroWriter::split_offsets() {
  return std::vector<int64_t>{};
}
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/avro/avro_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer {

Status Write(ArrowArray* data) final;

std::optional<Metrics> metrics() final;
Result<Metrics> metrics() final;

std::optional<int64_t> length() final;
Result<int64_t> length() final;

std::vector<int64_t> split_offsets() final;

Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/expression/literal.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ class ICEBERG_EXPORT Literal : public util::Formattable {
/// \return true if this literal represents a BelowMin value, false otherwise
bool IsBelowMin() const;

/// Check if this literal is null.
/// \brief Check if this literal is null.
/// \return true if this literal is null, false otherwise
bool IsNull() const;

/// Check if this literal is NaN.
/// \brief Check if this literal is NaN.
/// \return true if this literal is NaN, false otherwise
bool IsNaN() const;

Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ class ICEBERG_EXPORT Writer {

/// \brief Get the file statistics.
/// Only valid after the file is closed.
virtual std::optional<Metrics> metrics() = 0;
virtual Result<Metrics> metrics() = 0;

/// \brief Get the file length.
/// Only valid after the file is closed.
virtual std::optional<int64_t> length() = 0;
virtual Result<int64_t> length() = 0;

/// \brief Returns a list of recommended split locations, if applicable, empty
/// otherwise. When available, this information is used for planning scan tasks whose
Expand Down
12 changes: 9 additions & 3 deletions src/iceberg/manifest_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "iceberg/manifest_adapter.h"

#include <memory>
#include <utility>

#include <nanoarrow/nanoarrow.h>
Expand All @@ -28,7 +29,6 @@
#include "iceberg/manifest_list.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -141,10 +141,12 @@ Result<ArrowArray*> ManifestAdapter::FinishAppending() {
return &array_;
}

ManifestEntryAdapter::ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
ManifestEntryAdapter::ManifestEntryAdapter(std::optional<int64_t> snapshot_id_,
std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema,
ManifestContent content)
: partition_spec_(std::move(partition_spec)),
: snapshot_id_(snapshot_id_),
partition_spec_(std::move(partition_spec)),
current_schema_(std::move(current_schema)),
content_(content) {
if (!partition_spec_) {
Expand Down Expand Up @@ -386,6 +388,10 @@ Result<std::optional<int64_t>> ManifestEntryAdapter::GetContentSizeInBytes(
}

Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
if (entry.data_file == nullptr) [[unlikely]] {
return InvalidManifest("Missing required data_file field from manifest entry.");
}

const auto& fields = manifest_schema_->fields();
for (size_t i = 0; i < fields.size(); i++) {
const auto& field = fields[i];
Expand Down
12 changes: 10 additions & 2 deletions src/iceberg/manifest_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <memory>
#include <optional>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "iceberg/arrow_c_data.h"
Expand Down Expand Up @@ -61,7 +60,8 @@ class ICEBERG_EXPORT ManifestAdapter {
/// Implemented by different versions with version-specific schemas.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
ManifestEntryAdapter(std::optional<int64_t> snapshot_id_,
std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema, ManifestContent content);

~ManifestEntryAdapter() override;
Expand All @@ -72,6 +72,12 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {

/// \brief The manifest content kind this adapter was constructed with.
ManifestContent content() const {
  return content_;
}

/// \brief The snapshot id passed to the constructor; empty when none was given.
std::optional<int64_t> snapshot_id() const {
  return snapshot_id_;
}

/// \brief The partition spec held by this adapter (returned by reference, no copy).
const std::shared_ptr<PartitionSpec>& partition_spec() const {
  return partition_spec_;
}

/// \brief The partition struct type held by this adapter (returned by reference, no
/// copy).
const std::shared_ptr<StructType>& partition_type() const {
  return partition_type_;
}

protected:
Status AppendInternal(const ManifestEntry& entry);
Status AppendDataFile(ArrowArray* array,
Expand All @@ -91,8 +97,10 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
const DataFile& file) const;

protected:
std::optional<int64_t> snapshot_id_;
std::shared_ptr<PartitionSpec> partition_spec_;
std::shared_ptr<Schema> current_schema_;
std::shared_ptr<StructType> partition_type_;
std::shared_ptr<Schema> manifest_schema_;
const ManifestContent content_;
};
Expand Down
1 change: 0 additions & 1 deletion src/iceberg/manifest_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <memory>
#include <vector>

#include "iceberg/schema.h"
#include "iceberg/schema_field.h"
#include "iceberg/type.h"

Expand Down
50 changes: 41 additions & 9 deletions src/iceberg/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,6 @@ ICEBERG_EXPORT constexpr Result<ManifestStatus> ManifestStatusFromInt(
}
}

enum class ManifestContent {
kData = 0,
kDeletes = 1,
};

ICEBERG_EXPORT constexpr std::string_view ToString(ManifestContent content) noexcept;
ICEBERG_EXPORT constexpr Result<ManifestContent> ManifestContentFromString(
std::string_view str) noexcept;

/// \brief DataFile carries data file path, partition tuple, metrics, ...
struct ICEBERG_EXPORT DataFile {
/// \brief Content of a data file
Expand Down Expand Up @@ -277,6 +268,7 @@ struct ICEBERG_EXPORT DataFile {

bool operator==(const DataFile& other) const = default;

/// \brief Get the schema of the data file with the given partition type.
static std::shared_ptr<StructType> Type(std::shared_ptr<StructType> partition_type);
};

Expand Down Expand Up @@ -315,6 +307,33 @@ struct ICEBERG_EXPORT ManifestEntry {
inline static const SchemaField kFileSequenceNumber =
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());

/// \brief Check if this manifest entry is alive, i.e. it has not been deleted.
/// \return true if the status is kAdded or kExisting, false otherwise
constexpr bool IsAlive() const {
  return status == ManifestStatus::kAdded || status == ManifestStatus::kExisting;
}

/// \brief Return a copy of this entry with its status set to kAdded.
///
/// If the data file carries a first_row_id, the copy receives its own DataFile with
/// first_row_id cleared; the DataFile referenced by this entry is left unmodified.
/// A null data_file is tolerated and copied through as-is instead of being
/// dereferenced, so validation elsewhere can report the missing field.
ManifestEntry AsAdded() const {
  ManifestEntry copy = *this;
  copy.status = ManifestStatus::kAdded;
  // Guard against a null data_file before inspecting its fields.
  if (copy.data_file != nullptr && copy.data_file->first_row_id.has_value()) {
    // Clone before mutating so the DataFile still referenced by *this is untouched.
    copy.data_file = std::make_unique<DataFile>(*copy.data_file);
    copy.data_file->first_row_id = std::nullopt;
  }
  return copy;
}

/// \brief Return a copy of this entry whose status is kExisting.
/// All other fields are copied unchanged.
ManifestEntry AsExisting() const {
  auto existing = *this;
  existing.status = ManifestStatus::kExisting;
  return existing;
}

/// \brief Return a copy of this entry whose status is kDeleted.
/// All other fields are copied unchanged.
ManifestEntry AsDeleted() const {
  auto deleted = *this;
  deleted.status = ManifestStatus::kDeleted;
  return deleted;
}

bool operator==(const ManifestEntry& other) const;

static std::shared_ptr<StructType> TypeFromPartitionType(
Expand All @@ -323,6 +342,19 @@ struct ICEBERG_EXPORT ManifestEntry {
std::shared_ptr<StructType> datafile_type);
};

/// \brief Get the string name of a data file content type.
/// \return "data", "position_deletes" or "equality_deletes"
ICEBERG_EXPORT constexpr std::string_view ToString(DataFile::Content type) noexcept {
  if (type == DataFile::Content::kData) {
    return "data";
  }
  if (type == DataFile::Content::kPositionDeletes) {
    return "position_deletes";
  }
  if (type == DataFile::Content::kEqualityDeletes) {
    return "equality_deletes";
  }
  // Only reached for a value that is not one of the enumerators above.
  std::unreachable();
}

/// \brief Get the data file content type corresponding to the given int
ICEBERG_EXPORT constexpr Result<DataFile::Content> DataFileContentFromInt(
int content) noexcept {
Expand Down
32 changes: 16 additions & 16 deletions src/iceberg/manifest_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ struct ICEBERG_EXPORT PartitionFieldSummary {
static const StructType& Type();
};

/// \brief The type of files tracked by the manifest, either data or delete files; 0 for
/// all v1 manifests
enum class ManifestContent {
/// The manifest tracks data files (value 0).
kData = 0,
/// The manifest tracks delete files (value 1).
kDeletes = 1,
};

/// \brief Entry in a manifest list.
struct ICEBERG_EXPORT ManifestFile {
/// \brief The type of files tracked by the manifest, either data or delete files; 0 for
/// all v1 manifests
enum class Content {
/// The manifest content is data.
kData = 0,
/// The manifest content is deletes.
kDeletes = 1,
};

/// Field id: 500
/// Location of the manifest file
std::string manifest_path;
Expand All @@ -96,7 +96,7 @@ struct ICEBERG_EXPORT ManifestFile {
/// Field id: 517
/// The type of files tracked by the manifest, either data or delete files; 0 for all v1
/// manifests
Content content = Content::kData;
ManifestContent content = ManifestContent::kData;
/// Field id: 515
/// The sequence number when the manifest was added to the table; use 0 when reading v1
/// manifest lists
Expand Down Expand Up @@ -218,21 +218,21 @@ struct ICEBERG_EXPORT ManifestList {
};

/// \brief Get the name of the given manifest content type
ICEBERG_EXPORT constexpr std::string_view ToString(ManifestFile::Content type) noexcept {
ICEBERG_EXPORT inline constexpr std::string_view ToString(ManifestContent type) noexcept {
switch (type) {
case ManifestFile::Content::kData:
case ManifestContent::kData:
return "data";
case ManifestFile::Content::kDeletes:
case ManifestContent::kDeletes:
return "deletes";
}
std::unreachable();
}

/// \brief Get the manifest content type corresponding to the given name
ICEBERG_EXPORT constexpr Result<ManifestFile::Content> ManifestFileContentFromString(
ICEBERG_EXPORT inline constexpr Result<ManifestContent> ManifestContentFromString(
std::string_view str) noexcept {
if (str == "data") return ManifestFile::Content::kData;
if (str == "deletes") return ManifestFile::Content::kDeletes;
if (str == "data") return ManifestContent::kData;
if (str == "deletes") return ManifestContent::kDeletes;
return InvalidArgument("Invalid manifest content type: {}", str);
}

Expand Down
6 changes: 4 additions & 2 deletions src/iceberg/manifest_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
InheritableMetadataFactory::FromManifest(manifest));

return std::make_unique<ManifestReaderImpl>(std::move(reader), std::move(schema),
std::move(inheritable_metadata));
std::move(inheritable_metadata),
manifest.first_row_id);
}

Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
Expand All @@ -66,7 +67,8 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
.projection = schema}));
ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, InheritableMetadataFactory::Empty());
return std::make_unique<ManifestReaderImpl>(std::move(reader), std::move(schema),
std::move(inheritable_metadata));
std::move(inheritable_metadata),
std::nullopt);
}

Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(
Expand Down
Loading
Loading