Skip to content

Commit ef309f4

Browse files
authored
feat(avro): optimize writer performance by directly using encoder (#445)
Implement direct Avro encoder to eliminate GenericDatum intermediate layer, matching the decoder approach for better performance.
1 parent ba92781 commit ef309f4

File tree

11 files changed

+965
-255
lines changed

11 files changed

+965
-255
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ if(ICEBERG_BUILD_BUNDLE)
160160
arrow/arrow_fs_file_io.cc
161161
avro/avro_data_util.cc
162162
avro/avro_direct_decoder.cc
163+
avro/avro_direct_encoder.cc
163164
avro/avro_reader.cc
164165
avro/avro_writer.cc
165166
avro/avro_register.cc

src/iceberg/avro/CMakeLists.txt

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,3 @@
1616
# under the License.
1717

1818
iceberg_install_all_headers(iceberg/avro)
19-
20-
# avro_scan benchmark executable
21-
add_executable(avro_scan avro_scan.cc)
22-
target_link_libraries(avro_scan PRIVATE iceberg_bundle_static)
23-
set_target_properties(avro_scan PROPERTIES RUNTIME_OUTPUT_DIRECTORY
24-
"${CMAKE_BINARY_DIR}/src/iceberg/avro")

src/iceberg/avro/avro_direct_decoder.cc

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ namespace {
4545
Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder,
4646
const FieldProjection& projection,
4747
const SchemaField& projected_field,
48-
::arrow::ArrayBuilder* array_builder, DecodeContext* ctx);
48+
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx);
4949

5050
/// \brief Skip an Avro value based on its schema without decoding
5151
Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder) {
@@ -146,7 +146,7 @@ Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder)
146146
Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder,
147147
const std::span<const FieldProjection>& projections,
148148
const StructType& struct_type,
149-
::arrow::ArrayBuilder* array_builder, DecodeContext* ctx) {
149+
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) {
150150
if (avro_node->type() != ::avro::AVRO_RECORD) {
151151
return InvalidArgument("Expected Avro record, got type: {}", ToString(avro_node));
152152
}
@@ -157,15 +157,15 @@ Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
157157
// Build a map from Avro field index to projection index (cached per struct schema)
158158
// -1 means the field should be skipped
159159
const FieldProjection* cache_key = projections.data();
160-
auto cache_it = ctx->avro_to_projection_cache.find(cache_key);
160+
auto cache_it = ctx.avro_to_projection_cache.find(cache_key);
161161
std::vector<int>* avro_to_projection;
162162

163-
if (cache_it != ctx->avro_to_projection_cache.end()) {
163+
if (cache_it != ctx.avro_to_projection_cache.end()) {
164164
// Use cached mapping
165165
avro_to_projection = &cache_it->second;
166166
} else {
167167
// Build and cache the mapping
168-
auto [inserted_it, inserted] = ctx->avro_to_projection_cache.emplace(
168+
auto [inserted_it, inserted] = ctx.avro_to_projection_cache.emplace(
169169
cache_key, std::vector<int>(avro_node->leaves(), -1));
170170
avro_to_projection = &inserted_it->second;
171171

@@ -217,7 +217,7 @@ Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
217217
Status DecodeListToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder,
218218
const FieldProjection& element_projection,
219219
const ListType& list_type,
220-
::arrow::ArrayBuilder* array_builder, DecodeContext* ctx) {
220+
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) {
221221
if (avro_node->type() != ::avro::AVRO_ARRAY) {
222222
return InvalidArgument("Expected Avro array, got type: {}", ToString(avro_node));
223223
}
@@ -247,7 +247,7 @@ Status DecodeMapToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& dec
247247
const FieldProjection& key_projection,
248248
const FieldProjection& value_projection,
249249
const MapType& map_type, ::arrow::ArrayBuilder* array_builder,
250-
DecodeContext* ctx) {
250+
DecodeContext& ctx) {
251251
auto* map_builder = internal::checked_cast<::arrow::MapBuilder*>(array_builder);
252252

253253
if (avro_node->type() == ::avro::AVRO_MAP) {
@@ -317,7 +317,7 @@ Status DecodeNestedValueToBuilder(const ::avro::NodePtr& avro_node,
317317
const std::span<const FieldProjection>& projections,
318318
const NestedType& projected_type,
319319
::arrow::ArrayBuilder* array_builder,
320-
DecodeContext* ctx) {
320+
DecodeContext& ctx) {
321321
switch (projected_type.type_id()) {
322322
case TypeId::kStruct: {
323323
const auto& struct_type = internal::checked_cast<const StructType&>(projected_type);
@@ -354,7 +354,7 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
354354
::avro::Decoder& decoder,
355355
const SchemaField& projected_field,
356356
::arrow::ArrayBuilder* array_builder,
357-
DecodeContext* ctx) {
357+
DecodeContext& ctx) {
358358
const auto& projected_type = *projected_field.type();
359359
if (!projected_type.is_primitive()) {
360360
return InvalidArgument("Expected primitive type, got: {}", projected_type.ToString());
@@ -430,8 +430,8 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
430430
ToString(avro_node));
431431
}
432432
auto* builder = internal::checked_cast<::arrow::StringBuilder*>(array_builder);
433-
decoder.decodeString(ctx->string_scratch);
434-
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->string_scratch));
433+
decoder.decodeString(ctx.string_scratch);
434+
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx.string_scratch));
435435
return {};
436436
}
437437

@@ -441,9 +441,9 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
441441
ToString(avro_node));
442442
}
443443
auto* builder = internal::checked_cast<::arrow::BinaryBuilder*>(array_builder);
444-
decoder.decodeBytes(ctx->bytes_scratch);
444+
decoder.decodeBytes(ctx.bytes_scratch);
445445
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(
446-
ctx->bytes_scratch.data(), static_cast<int32_t>(ctx->bytes_scratch.size())));
446+
ctx.bytes_scratch.data(), static_cast<int32_t>(ctx.bytes_scratch.size())));
447447
return {};
448448
}
449449

@@ -456,9 +456,9 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
456456
auto* builder =
457457
internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder);
458458

459-
ctx->bytes_scratch.resize(fixed_type.length());
460-
decoder.decodeFixed(fixed_type.length(), ctx->bytes_scratch);
461-
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->bytes_scratch.data()));
459+
ctx.bytes_scratch.resize(fixed_type.length());
460+
decoder.decodeFixed(fixed_type.length(), ctx.bytes_scratch);
461+
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx.bytes_scratch.data()));
462462
return {};
463463
}
464464

@@ -472,9 +472,9 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
472472
auto* builder =
473473
internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder);
474474

475-
ctx->bytes_scratch.resize(16);
476-
decoder.decodeFixed(16, ctx->bytes_scratch);
477-
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->bytes_scratch.data()));
475+
ctx.bytes_scratch.resize(16);
476+
decoder.decodeFixed(16, ctx.bytes_scratch);
477+
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx.bytes_scratch.data()));
478478
return {};
479479
}
480480

@@ -489,11 +489,11 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
489489
size_t byte_width = avro_node->fixedSize();
490490
auto* builder = internal::checked_cast<::arrow::Decimal128Builder*>(array_builder);
491491

492-
ctx->bytes_scratch.resize(byte_width);
493-
decoder.decodeFixed(byte_width, ctx->bytes_scratch);
492+
ctx.bytes_scratch.resize(byte_width);
493+
decoder.decodeFixed(byte_width, ctx.bytes_scratch);
494494
ICEBERG_ARROW_ASSIGN_OR_RETURN(
495-
auto decimal, ::arrow::Decimal128::FromBigEndian(ctx->bytes_scratch.data(),
496-
ctx->bytes_scratch.size()));
495+
auto decimal, ::arrow::Decimal128::FromBigEndian(ctx.bytes_scratch.data(),
496+
ctx.bytes_scratch.size()));
497497
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(decimal));
498498
return {};
499499
}
@@ -548,7 +548,7 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
548548
Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder,
549549
const FieldProjection& projection,
550550
const SchemaField& projected_field,
551-
::arrow::ArrayBuilder* array_builder, DecodeContext* ctx) {
551+
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) {
552552
if (avro_node->type() == ::avro::AVRO_UNION) {
553553
const size_t branch_index = decoder.decodeUnionIndex();
554554

@@ -585,7 +585,7 @@ Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& d
585585
Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder,
586586
const SchemaProjection& projection,
587587
const Schema& projected_schema,
588-
::arrow::ArrayBuilder* array_builder, DecodeContext* ctx) {
588+
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) {
589589
return DecodeNestedValueToBuilder(avro_node, decoder, projection.fields,
590590
projected_schema, array_builder, ctx);
591591
}

src/iceberg/avro/avro_direct_decoder_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,6 @@ struct DecodeContext {
8282
Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder,
8383
const SchemaProjection& projection,
8484
const Schema& projected_schema,
85-
::arrow::ArrayBuilder* array_builder, DecodeContext* ctx);
85+
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx);
8686

8787
} // namespace iceberg::avro

0 commit comments

Comments
 (0)