Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 230 additions & 0 deletions src/iceberg/avro/avro_data_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
* under the License.
*/

#include <ranges>

#include <arrow/array/builder_binary.h>
#include <arrow/array/builder_decimal.h>
#include <arrow/array/builder_nested.h>
#include <arrow/array/builder_primitive.h>
#include <arrow/extension_type.h>
#include <arrow/json/from_string.h>
#include <arrow/type.h>
#include <arrow/util/decimal.h>
Expand Down Expand Up @@ -451,4 +454,231 @@ Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node,
projected_schema, array_builder);
}

namespace {

// Union branch indices. ToAvroNodeVisitor places the null branch first (0) and the
// value branch second (1).
constexpr int64_t kNullBranch{0};
constexpr int64_t kValueBranch{1};

}  // namespace

/// \brief Extract the element at `index` of an Arrow array into an Avro datum.
///
/// The Avro type of `datum` is intentionally NOT validated against the Arrow type of
/// `array`: this is an internal API used by the Avro writer, where the GenericDatum,
/// Arrow array and Iceberg schemas are expected to be consistent, and the datum is
/// reused, so the check is not repeated on every call.
///
/// \param array The Arrow array to extract from.
/// \param index The index of the element to extract; must be in [0, array.length()).
/// \param datum The Avro datum to write to. If it is a union, the null branch is selected
/// for a null element and the value branch otherwise.
/// \return An error if `index` is out of range (InvalidArgument), if a null element is
/// extracted into a non-union datum (InvalidSchema), if the Arrow extension type is not
/// supported (NotSupported), or if the Arrow type or the Avro type backing a map is not
/// supported (InvalidArgument). Errors from nested elements are propagated.
Status ExtractDatumFromArray(const ::arrow::Array& array, int64_t index,
                             ::avro::GenericDatum* datum) {
  if (index < 0 || index >= array.length()) {
    return InvalidArgument("Cannot extract datum from array at index {} of length {}",
                           index, array.length());
  }

  if (array.IsNull(index)) {
    // A null can only be represented by the null branch of a union.
    if (!datum->isUnion()) [[unlikely]] {
      return InvalidSchema("Cannot extract null to non-union type: {}",
                           ::avro::toString(datum->type()));
    }
    datum->selectBranch(kNullBranch);
    return {};
  }

  // Non-null value: make sure a union datum exposes its value branch before writing.
  if (datum->isUnion()) {
    datum->selectBranch(kValueBranch);
  }

  switch (array.type()->id()) {
    case ::arrow::Type::BOOL: {
      const auto& bool_array =
          internal::checked_cast<const ::arrow::BooleanArray&>(array);
      datum->value<bool>() = bool_array.Value(index);
      return {};
    }

    case ::arrow::Type::INT32: {
      const auto& int32_array = internal::checked_cast<const ::arrow::Int32Array&>(array);
      datum->value<int32_t>() = int32_array.Value(index);
      return {};
    }

    case ::arrow::Type::INT64: {
      const auto& int64_array = internal::checked_cast<const ::arrow::Int64Array&>(array);
      datum->value<int64_t>() = int64_array.Value(index);
      return {};
    }

    case ::arrow::Type::FLOAT: {
      const auto& float_array = internal::checked_cast<const ::arrow::FloatArray&>(array);
      datum->value<float>() = float_array.Value(index);
      return {};
    }

    case ::arrow::Type::DOUBLE: {
      const auto& double_array =
          internal::checked_cast<const ::arrow::DoubleArray&>(array);
      datum->value<double>() = double_array.Value(index);
      return {};
    }

    // TODO(gangwu): support LARGE_STRING.
    case ::arrow::Type::STRING: {
      const auto& string_array =
          internal::checked_cast<const ::arrow::StringArray&>(array);
      datum->value<std::string>() = string_array.GetString(index);
      return {};
    }

    // TODO(gangwu): support LARGE_BINARY.
    case ::arrow::Type::BINARY: {
      const auto& binary_array =
          internal::checked_cast<const ::arrow::BinaryArray&>(array);
      std::string_view value = binary_array.GetView(index);
      datum->value<std::vector<uint8_t>>().assign(
          reinterpret_cast<const uint8_t*>(value.data()),
          reinterpret_cast<const uint8_t*>(value.data()) + value.size());
      return {};
    }

    case ::arrow::Type::FIXED_SIZE_BINARY: {
      const auto& fixed_array =
          internal::checked_cast<const ::arrow::FixedSizeBinaryArray&>(array);
      std::string_view value = fixed_array.GetView(index);
      auto& fixed_datum = datum->value<::avro::GenericFixed>();
      fixed_datum.value().assign(value.begin(), value.end());
      return {};
    }

    case ::arrow::Type::DECIMAL128: {
      const auto& decimal_array =
          internal::checked_cast<const ::arrow::Decimal128Array&>(array);
      std::string_view decimal_value = decimal_array.GetView(index);
      auto& fixed_datum = datum->value<::avro::GenericFixed>();
      auto& bytes = fixed_datum.value();
      bytes.assign(decimal_value.begin(), decimal_value.end());
      // The raw decimal bytes are written in reversed order.
      // NOTE(review): presumably this converts a little-endian in-memory value to the
      // big-endian order expected by the Avro decimal encoding -- confirm the behavior
      // on big-endian hosts.
      std::ranges::reverse(bytes);
      return {};
    }

    case ::arrow::Type::DATE32: {
      const auto& date_array = internal::checked_cast<const ::arrow::Date32Array&>(array);
      datum->value<int32_t>() = date_array.Value(index);
      return {};
    }

    case ::arrow::Type::TIME64: {
      const auto& time_array = internal::checked_cast<const ::arrow::Time64Array&>(array);
      datum->value<int64_t>() = time_array.Value(index);
      return {};
    }

    // For both timestamp and timestamp_tz with time unit as microsecond.
    case ::arrow::Type::TIMESTAMP: {
      const auto& timestamp_array =
          internal::checked_cast<const ::arrow::TimestampArray&>(array);
      datum->value<int64_t>() = timestamp_array.Value(index);
      return {};
    }

    case ::arrow::Type::EXTENSION: {
      // Only the UUID extension is handled; its storage is read as fixed-size binary.
      if (array.type()->name() == "arrow.uuid") {
        const auto& extension_array =
            internal::checked_cast<const ::arrow::ExtensionArray&>(array);
        const auto& fixed_array =
            internal::checked_cast<const ::arrow::FixedSizeBinaryArray&>(
                *extension_array.storage());
        std::string_view value = fixed_array.GetView(index);
        auto& fixed_datum = datum->value<::avro::GenericFixed>();
        fixed_datum.value().assign(value.begin(), value.end());
        return {};
      }

      return NotSupported("Unsupported Arrow extension type: {}", array.type()->name());
    }

    case ::arrow::Type::STRUCT: {
      const auto& struct_array =
          internal::checked_cast<const ::arrow::StructArray&>(array);
      auto& record = datum->value<::avro::GenericRecord>();
      // Struct field i is written to record field i.
      for (int i = 0; i < struct_array.num_fields(); ++i) {
        ICEBERG_RETURN_UNEXPECTED(
            ExtractDatumFromArray(*struct_array.field(i), index, &record.fieldAt(i)));
      }
      return {};
    }

    // TODO(gangwu): support LARGE_LIST.
    case ::arrow::Type::LIST: {
      const auto& list_array = internal::checked_cast<const ::arrow::ListArray&>(array);
      auto& avro_array = datum->value<::avro::GenericArray>();
      auto& elements = avro_array.value();

      auto start = list_array.value_offset(index);
      auto end = list_array.value_offset(index + 1);
      auto length = end - start;

      // Bind by reference to avoid copying the shared_ptr on every call.
      const auto& values = list_array.values();
      // Newly added slots are initialized from the element schema; existing slots are
      // overwritten below.
      elements.resize(length, ::avro::GenericDatum(avro_array.schema()->leafAt(0)));

      for (int64_t i = 0; i < length; ++i) {
        ICEBERG_RETURN_UNEXPECTED(
            ExtractDatumFromArray(*values, start + i, &elements[i]));
      }
      return {};
    }

    case ::arrow::Type::MAP: {
      const auto& map_array = internal::checked_cast<const ::arrow::MapArray&>(array);
      auto start = map_array.value_offset(index);
      auto end = map_array.value_offset(index + 1);
      auto length = end - start;

      // Bind by reference to avoid copying the shared_ptrs on every call.
      const auto& keys = map_array.keys();
      const auto& items = map_array.items();

      if (datum->type() == ::avro::AVRO_MAP) {
        // Handle regular Avro map
        auto& avro_map = datum->value<::avro::GenericMap>();
        auto value_node = avro_map.schema()->leafAt(1);

        auto& map_entries = avro_map.value();
        map_entries.resize(
            length, std::make_pair(std::string(), ::avro::GenericDatum(value_node)));

        // Keys of a regular Avro map are read as strings.
        const auto& key_array =
            internal::checked_cast<const ::arrow::StringArray&>(*keys);

        for (int64_t i = 0; i < length; ++i) {
          auto& map_entry = map_entries[i];
          map_entry.first = key_array.GetString(start + i);
          ICEBERG_RETURN_UNEXPECTED(
              ExtractDatumFromArray(*items, start + i, &map_entry.second));
        }
      } else if (datum->type() == ::avro::AVRO_ARRAY) {
        // Handle array-based map (list<struct<key, value>>)
        auto& avro_array = datum->value<::avro::GenericArray>();
        auto record_node = avro_array.schema()->leafAt(0);
        if (record_node->type() != ::avro::AVRO_RECORD || record_node->leaves() != 2) {
          return InvalidArgument(
              "Expected Avro record with 2 fields for map value, got: {}",
              ToString(record_node));
        }

        auto& elements = avro_array.value();
        elements.resize(length, ::avro::GenericDatum(record_node));

        for (int64_t i = 0; i < length; ++i) {
          auto& record = elements[i].value<::avro::GenericRecord>();
          ICEBERG_RETURN_UNEXPECTED(
              ExtractDatumFromArray(*keys, start + i, &record.fieldAt(0)));
          ICEBERG_RETURN_UNEXPECTED(
              ExtractDatumFromArray(*items, start + i, &record.fieldAt(1)));
        }
      } else {
        // Report the type by name, consistent with the null-handling error above,
        // instead of the raw enum value.
        return InvalidArgument("Unsupported Avro type for map: {}",
                               ::avro::toString(datum->type()));
      }
      return {};
    }

    default:
      return InvalidArgument("Unsupported Arrow array type: {}",
                             array.type()->ToString());
  }
}

} // namespace iceberg::avro
9 changes: 9 additions & 0 deletions src/iceberg/avro/avro_data_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,13 @@ Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node,
const Schema& projected_schema,
::arrow::ArrayBuilder* array_builder);

/// \brief Extract an Avro datum from an Arrow array.
///
/// A null element selects the null branch of a union datum; a non-null element selects
/// the value branch (when the datum is a union) and overwrites the datum's value.
///
/// \param array The Arrow array to extract from.
/// \param index The index of the element to extract. It must be in the range
/// [0, array.length()).
/// \param datum The Avro datum to extract to. Its Avro type should be consistent with the
/// Arrow type.
/// \return An error status if the index is out of range, if a null element is extracted
/// into a non-union datum, or if the Arrow type (or the Avro type backing a map) is not
/// supported.
Status ExtractDatumFromArray(const ::arrow::Array& array, int64_t index,
::avro::GenericDatum* datum);

} // namespace iceberg::avro
Loading
Loading