Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ set(ICEBERG_SOURCES
partition_summary.cc
row/arrow_array_wrapper.cc
row/manifest_wrapper.cc
row/partition_values.cc
row/struct_like.cc
schema.cc
schema_field.cc
Expand Down
6 changes: 3 additions & 3 deletions src/iceberg/manifest_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,17 +165,17 @@ ManifestEntryAdapter::~ManifestEntryAdapter() {

Status ManifestEntryAdapter::AppendPartitionValues(
ArrowArray* array, const std::shared_ptr<StructType>& partition_type,
const std::vector<Literal>& partition_values) {
const PartitionValues& partition_values) {
if (array->n_children != partition_type->fields().size()) [[unlikely]] {
return InvalidArrowData("Arrow array of partition does not match partition type.");
}
if (partition_values.size() != partition_type->fields().size()) [[unlikely]] {
if (partition_values.num_fields() != partition_type->fields().size()) [[unlikely]] {
return InvalidArrowData("Literal list of partition does not match partition type.");
}
auto fields = partition_type->fields();

for (size_t i = 0; i < fields.size(); i++) {
const auto& partition_value = partition_values[i];
const auto& partition_value = partition_values.ValueAt(i)->get();
const auto& partition_field = fields[i];
auto child_array = array->children[i];
if (partition_value.IsNull()) {
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/manifest_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
const DataFile& file);
static Status AppendPartitionValues(ArrowArray* array,
const std::shared_ptr<StructType>& partition_type,
const std::vector<Literal>& partition_values);
const PartitionValues& partition_values);

virtual Result<std::optional<int64_t>> GetSequenceNumber(
const ManifestEntry& entry) const;
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
#include <string>
#include <vector>

#include "iceberg/expression/literal.h"
#include "iceberg/file_format.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/row/partition_values.h"
#include "iceberg/schema_field.h"
#include "iceberg/type.h"

Expand Down Expand Up @@ -79,7 +79,7 @@ struct ICEBERG_EXPORT DataFile {
/// Field id: 102
/// Partition data tuple, schema based on the partition spec output using partition
/// field ids
std::vector<Literal> partition;
PartitionValues partition;
/// Field id: 103
/// Number of records in this file, or the cardinality of a deletion vector
int64_t record_count = 0;
Expand Down
15 changes: 7 additions & 8 deletions src/iceberg/manifest_reader_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -297,27 +297,26 @@ Status ParseLiteral(ArrowArrayView* view_of_partition, int64_t row_idx,
std::vector<ManifestEntry>& manifest_entries) {
if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) {
auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx);
manifest_entries[row_idx].data_file->partition.emplace_back(
Literal::Boolean(value != 0));
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Boolean(value != 0));
} else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_INT32) {
auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Int(value));
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Int(value));
} else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_INT64) {
auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Long(value));
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Long(value));
} else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_FLOAT) {
auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Float(value));
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Float(value));
} else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_DOUBLE) {
auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Double(value));
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Double(value));
} else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_STRING) {
auto value = ArrowArrayViewGetStringUnsafe(view_of_partition, row_idx);
manifest_entries[row_idx].data_file->partition.emplace_back(
manifest_entries[row_idx].data_file->partition.AddValue(
Literal::String(std::string(value.data, value.size_bytes)));
} else if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BINARY) {
auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_partition, row_idx);
manifest_entries[row_idx].data_file->partition.emplace_back(
manifest_entries[row_idx].data_file->partition.AddValue(
Literal::Binary(std::vector<uint8_t>(buffer.data.as_char,
buffer.data.as_char + buffer.size_bytes)));
} else {
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ iceberg_sources = files(
'partition_summary.cc',
'row/arrow_array_wrapper.cc',
'row/manifest_wrapper.cc',
'row/partition_values.cc',
'row/struct_like.cc',
'schema.cc',
'schema_field.cc',
Expand Down
17 changes: 9 additions & 8 deletions src/iceberg/partition_summary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "iceberg/manifest_list.h"
#include "iceberg/partition_summary_internal.h"
#include "iceberg/result.h"
#include "iceberg/row/partition_values.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep
#include "iceberg/util/macros.h"
Expand Down Expand Up @@ -74,18 +75,18 @@ PartitionSummary::PartitionSummary(const StructType& partition_type) {
}
}

Status PartitionSummary::Update(const std::vector<Literal>& partition_values) {
if (partition_values.size() != field_stats_.size()) [[unlikely]] {
Status PartitionSummary::Update(const PartitionValues& partition_values) {
if (partition_values.num_fields() != field_stats_.size()) [[unlikely]] {
return InvalidArgument("partition values size {} does not match field stats size {}",
partition_values.size(), field_stats_.size());
partition_values.num_fields(), field_stats_.size());
}

for (size_t i = 0; i < partition_values.size(); i++) {
for (size_t i = 0; i < partition_values.num_fields(); i++) {
ICEBERG_ASSIGN_OR_RAISE(auto val, partition_values.ValueAt(i));
ICEBERG_ASSIGN_OR_RAISE(
auto literal,
partition_values[i].CastTo(
internal::checked_pointer_cast<PrimitiveType>(field_stats_[i].type())));
ICEBERG_RETURN_UNEXPECTED(field_stats_[i].Update(literal));
auto lit, val.get().CastTo(internal::checked_pointer_cast<PrimitiveType>(
field_stats_[i].type())));
ICEBERG_RETURN_UNEXPECTED(field_stats_[i].Update(lit));
}
return {};
}
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/partition_summary_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class PartitionSummary {
explicit PartitionSummary(const StructType& partition_type);

/// \brief Update the partition summary with partition values.
Status Update(const std::vector<Literal>& partition_values);
Status Update(const PartitionValues& partition_values);

/// \brief Get the list of partition field summaries.
Result<std::vector<PartitionFieldSummary>> Summaries() const;
Expand Down
7 changes: 6 additions & 1 deletion src/iceberg/row/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
# under the License.

install_headers(
['arrow_array_wrapper.h', 'manifest_wrapper.h', 'struct_like.h'],
[
'arrow_array_wrapper.h',
'manifest_wrapper.h',
'partition_values.h',
'struct_like.h',
],
subdir: 'iceberg/row',
)
91 changes: 91 additions & 0 deletions src/iceberg/row/partition_values.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/row/partition_values.h"

namespace iceberg {

PartitionValues& PartitionValues::operator=(const PartitionValues& other) {
if (this != &other) {
values_ = other.values_;
}
return *this;
}

bool PartitionValues::operator==(const PartitionValues& other) const {
return values_ == other.values_;
}

Result<Scalar> PartitionValues::GetField(size_t pos) const {
if (pos >= values_.size()) {
return InvalidArgument(
"Position {} is out of bounds for PartitionValues with {} fields", pos,
values_.size());
}

const auto& literal = values_[pos];

// Handle null values
if (literal.IsNull()) {
return Scalar{std::monostate{}};
}

// Convert Literal to Scalar based on type
switch (literal.type()->type_id()) {
case TypeId::kBoolean:
return Scalar{std::get<bool>(literal.value())};
case TypeId::kInt:
case TypeId::kDate:
return Scalar{std::get<int32_t>(literal.value())};
case TypeId::kLong:
case TypeId::kTime:
case TypeId::kTimestamp:
case TypeId::kTimestampTz:
return Scalar{std::get<int64_t>(literal.value())};
case TypeId::kFloat:
return Scalar{std::get<float>(literal.value())};
case TypeId::kDouble:
return Scalar{std::get<double>(literal.value())};
case TypeId::kString: {
const auto& str = std::get<std::string>(literal.value());
return Scalar{std::string_view(str)};
}
case TypeId::kBinary:
case TypeId::kFixed: {
const auto& bytes = std::get<std::vector<uint8_t>>(literal.value());
return Scalar{
std::string_view(reinterpret_cast<const char*>(bytes.data()), bytes.size())};
}
case TypeId::kDecimal:
return Scalar{std::get<Decimal>(literal.value())};
default:
return NotSupported("Cannot convert literal of type {} to Scalar",
literal.type()->ToString());
}
}

Result<std::reference_wrapper<const Literal>> PartitionValues::ValueAt(size_t pos) const {
if (pos >= values_.size()) {
return InvalidArgument("Cannot get partition value at {} from {} fields", pos,
values_.size());
}
return std::cref(values_[pos]);
}

} // namespace iceberg
75 changes: 75 additions & 0 deletions src/iceberg/row/partition_values.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/row/partition_values.h
/// Wrapper classes for partition value related data structures.

#include <functional>
#include <span>
#include <utility>

#include "iceberg/expression/literal.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/row/struct_like.h"

namespace iceberg {

/// \brief StructLike wrapper for a vector of literals that represent partition values.
class ICEBERG_EXPORT PartitionValues : public StructLike {
public:
PartitionValues() = default;
explicit PartitionValues(std::vector<Literal> values) : values_(std::move(values)) {}
explicit PartitionValues(Literal value) : values_({std::move(value)}) {}

PartitionValues(const PartitionValues& other) : values_(other.values_) {}
PartitionValues& operator=(const PartitionValues& other);

PartitionValues(PartitionValues&&) noexcept = default;
PartitionValues& operator=(PartitionValues&&) noexcept = default;

~PartitionValues() override = default;

Result<Scalar> GetField(size_t pos) const override;

size_t num_fields() const override { return values_.size(); }

/// \brief Get the partition field value at the given position.
/// \param pos The position of the field in the struct.
/// \return A reference to the partition field value.
Result<std::reference_wrapper<const Literal>> ValueAt(size_t pos) const;

/// \brief Add a value to the partition values.
/// \param value The value to add.
void AddValue(Literal value) { values_.emplace_back(std::move(value)); }

/// \brief Reset the partition values.
/// \param values The values to reset to.
void Reset(std::vector<Literal> values) { values_ = std::move(values); }

std::span<const Literal> values() const { return values_; }

bool operator==(const PartitionValues& other) const;

private:
std::vector<Literal> values_;
};

} // namespace iceberg
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ add_iceberg_test(schema_test
transform_test.cc
partition_field_test.cc
partition_spec_test.cc
partition_value_test.cc
sort_field_test.cc
sort_order_test.cc
snapshot_test.cc
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/test/manifest_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class ManifestV1Test : public ManifestReaderWriterTestBase {
entry.data_file = std::make_shared<DataFile>();
entry.data_file->file_path = test_dir_prefix + paths[i];
entry.data_file->file_format = FileFormatType::kParquet;
entry.data_file->partition.emplace_back(Literal::Int(partitions[i]));
entry.data_file->partition.AddValue(Literal::Int(partitions[i]));
entry.data_file->record_count = 1;
entry.data_file->file_size_in_bytes = 1375;
entry.data_file->column_sizes = {{1, 49}, {2, 49}, {3, 49}, {4, 49}};
Expand Down
6 changes: 3 additions & 3 deletions src/iceberg/test/manifest_writer_versions_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@

#include "iceberg/arrow/arrow_file_io.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/expression/literal.h"
#include "iceberg/file_format.h"
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
#include "iceberg/manifest_reader.h"
#include "iceberg/manifest_writer.h"
#include "iceberg/metrics.h"
#include "iceberg/partition_spec.h"
#include "iceberg/row/partition_values.h"
#include "iceberg/schema.h"
#include "iceberg/schema_field.h"
#include "iceberg/table_metadata.h"
Expand All @@ -54,8 +54,8 @@ constexpr FileFormatType kFormat = FileFormatType::kAvro;
constexpr int32_t kSortOrderId = 2;
constexpr int64_t kFirstRowId = 100L;

const std::vector<Literal> kPartition = {Literal::String("cheesy"), Literal::Int(10),
Literal::Int(3)};
const PartitionValues kPartition =
PartitionValues({Literal::String("cheesy"), Literal::Int(10), Literal::Int(3)});
const std::vector<int32_t> kEqualityIds = {1};

const auto kMetrics = Metrics{
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ iceberg_tests = {
'name_mapping_test.cc',
'partition_field_test.cc',
'partition_spec_test.cc',
'partition_value_test.cc',
'schema_field_test.cc',
'schema_test.cc',
'schema_util_test.cc',
Expand Down
Loading
Loading