Skip to content

Commit 3ae3cff

Browse files
committed
refactor(avro): address PR review comments
- Replace 'new/legacy path' terminology with 'DirectDecoder/GenericDatum' - Add DecodeContext to reuse scratch buffers and avoid allocations - Combine decimal type validation checks - Add error handling for unsupported FieldProjection kinds - Cache avro_to_projection mapping in DecodeContext - Rename kAvroUseDirectDecoder to kAvroSkipDatum - Add test for map with non-string keys - Add test for column projection with subset and reordering - Create avro_scan benchmark executable - Convert tests to parameterized tests for both decoder modes - Fix temp file path handling for parameterized test names
1 parent 96b0521 commit 3ae3cff

File tree

8 files changed

+442
-174
lines changed

8 files changed

+442
-174
lines changed

src/iceberg/avro/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,9 @@
1616
# under the License.
1717

1818
iceberg_install_all_headers(iceberg/avro)
19+
20+
# avro_scan benchmark executable
21+
add_executable(avro_scan avro_scan.cc)
22+
target_link_libraries(avro_scan PRIVATE iceberg_bundle_static)
23+
set_target_properties(avro_scan PROPERTIES RUNTIME_OUTPUT_DIRECTORY
24+
"${CMAKE_BINARY_DIR}/src/iceberg/avro")

src/iceberg/avro/avro_direct_decoder.cc

Lines changed: 71 additions & 68 deletions
Large diffs are not rendered by default.

src/iceberg/avro/avro_direct_decoder_internal.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,22 @@
3333

3434
namespace iceberg::avro {
3535

36+
/// \brief Context for reusing scratch buffers during Avro decoding
37+
///
38+
/// Avoids frequent small allocations by reusing temporary buffers across
39+
/// multiple decode operations. This is particularly important for string,
40+
/// binary, and fixed-size data types.
41+
struct DecodeContext {
42+
// Scratch buffer for string decoding (reused across rows)
43+
std::string string_scratch;
44+
// Scratch buffer for binary/fixed/uuid/decimal data (reused across rows)
45+
std::vector<uint8_t> bytes_scratch;
46+
// Cache for avro field index to projection index mapping
47+
// Key: pointer to projections array (identifies struct schema)
48+
// Value: vector mapping avro field index -> projection index (-1 if not projected)
49+
std::unordered_map<const FieldProjection*, std::vector<int>> avro_to_projection_cache;
50+
};
51+
3652
/// \brief Directly decode Avro data to Arrow array builders without GenericDatum
3753
///
3854
/// Eliminates the GenericDatum intermediate layer by directly calling Avro decoder
@@ -61,10 +77,11 @@ namespace iceberg::avro {
6177
/// \param projection The field projections (from Project() function)
6278
/// \param projected_schema The target Iceberg schema after projection
6379
/// \param array_builder The Arrow array builder to append decoded data to
80+
/// \param ctx Decode context for reusing scratch buffers
6481
/// \return Status::OK if successful, or an error status
6582
Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder,
6683
const SchemaProjection& projection,
6784
const Schema& projected_schema,
68-
::arrow::ArrayBuilder* array_builder);
85+
::arrow::ArrayBuilder* array_builder, DecodeContext* ctx);
6986

7087
} // namespace iceberg::avro

src/iceberg/avro/avro_reader.cc

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,12 @@ struct ReadContext {
6767
std::shared_ptr<::arrow::Schema> arrow_schema_;
6868
// The builder to build the record batch.
6969
std::shared_ptr<::arrow::ArrayBuilder> builder_;
70-
// GenericDatum for legacy path (only used if direct decoder is disabled)
70+
// GenericDatum for GenericDatum-based decoding (only used if direct decoder is
71+
// disabled)
7172
std::unique_ptr<::avro::GenericDatum> datum_;
73+
// Decode context for reusing scratch buffers (only used if direct decoder is
74+
// enabled)
75+
DecodeContext decode_context_;
7276
};
7377

7478
// TODO(gang.wu): there are a lot to do to make this reader work.
@@ -84,8 +88,7 @@ class AvroReader::Impl {
8488
}
8589

8690
batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
87-
use_direct_decoder_ =
88-
options.properties->Get(ReaderProperties::kAvroUseDirectDecoder);
91+
use_direct_decoder_ = options.properties->Get(ReaderProperties::kAvroSkipDatum);
8992
read_schema_ = options.projection;
9093

9194
// Open the input stream and adapt to the avro interface.
@@ -97,13 +100,13 @@ class AvroReader::Impl {
97100
::avro::ValidSchema file_schema;
98101

99102
if (use_direct_decoder_) {
100-
// New path: Create base reader for direct decoder access
103+
// Create base reader for direct decoder access
101104
auto base_reader =
102105
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
103106
file_schema = base_reader->dataSchema();
104107
base_reader_ = std::move(base_reader);
105108
} else {
106-
// Legacy path: Create DataFileReader<GenericDatum>
109+
// Create DataFileReader<GenericDatum> for GenericDatum-based decoding
107110
auto datum_reader = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
108111
std::move(input_stream));
109112
file_schema = datum_reader->dataSchema();
@@ -165,17 +168,17 @@ class AvroReader::Impl {
165168
}
166169

167170
if (use_direct_decoder_) {
168-
// New path: Use direct decoder
171+
// Direct decoder: decode Avro to Arrow without GenericDatum
169172
if (!base_reader_->hasMore()) {
170173
break;
171174
}
172175
base_reader_->decr();
173176

174-
ICEBERG_RETURN_UNEXPECTED(
175-
DecodeAvroToBuilder(GetReaderSchema().root(), base_reader_->decoder(),
176-
projection_, *read_schema_, context_->builder_.get()));
177+
ICEBERG_RETURN_UNEXPECTED(DecodeAvroToBuilder(
178+
GetReaderSchema().root(), base_reader_->decoder(), projection_, *read_schema_,
179+
context_->builder_.get(), &context_->decode_context_));
177180
} else {
178-
// Legacy path: Use GenericDatum
181+
// GenericDatum-based decoding: decode via GenericDatum intermediate
179182
if (!datum_reader_->read(*context_->datum_)) {
180183
break;
181184
}
@@ -248,7 +251,7 @@ class AvroReader::Impl {
248251
}
249252
context_->builder_ = builder_result.MoveValueUnsafe();
250253

251-
// Initialize GenericDatum for legacy path
254+
// Initialize GenericDatum for GenericDatum-based decoding
252255
if (!use_direct_decoder_) {
253256
context_->datum_ = std::make_unique<::avro::GenericDatum>(GetReaderSchema());
254257
}
@@ -321,9 +324,9 @@ class AvroReader::Impl {
321324
std::shared_ptr<::iceberg::Schema> read_schema_;
322325
// The projection result to apply to the read schema.
323326
SchemaProjection projection_;
324-
// The avro reader base - provides direct access to decoder (new path).
327+
// The avro reader base - provides direct access to decoder for direct decoding.
325328
std::unique_ptr<::avro::DataFileReaderBase> base_reader_;
326-
// The datum reader for GenericDatum-based decoding (legacy path).
329+
// The datum reader for GenericDatum-based decoding.
327330
std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> datum_reader_;
328331
// The context to keep track of the reading progress.
329332
std::unique_ptr<ReadContext> context_;

src/iceberg/avro/avro_scan.cc

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <chrono>
21+
#include <iostream>
22+
#include <memory>
23+
#include <string>
24+
25+
#include <arrow/array.h>
26+
#include <arrow/c/bridge.h>
27+
#include <arrow/filesystem/localfs.h>
28+
#include <arrow/type.h>
29+
30+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
31+
#include "iceberg/avro/avro_register.h"
32+
#include "iceberg/file_reader.h"
33+
#include "iceberg/schema.h"
34+
35+
void PrintUsage(const char* program_name) {
36+
std::cerr << "Usage: " << program_name << " [options] <avro_file>\n"
37+
<< "Options:\n"
38+
<< " --skip-datum=<true|false> Use direct decoder (default: true)\n"
39+
<< " --batch-size=<N> Batch size for reading (default: 4096)\n"
40+
<< " --help Show this help message\n"
41+
<< "\nExample:\n"
42+
<< " " << program_name
43+
<< " --skip-datum=false --batch-size=1000 data.avro\n";
44+
}
45+
46+
int main(int argc, char* argv[]) {
47+
iceberg::avro::RegisterAll();
48+
49+
if (argc < 2) {
50+
PrintUsage(argv[0]);
51+
return 1;
52+
}
53+
54+
std::string avro_file;
55+
bool skip_datum = true;
56+
int64_t batch_size = 4096;
57+
58+
// Parse arguments
59+
for (int i = 1; i < argc; ++i) {
60+
std::string arg = argv[i];
61+
if (arg == "--help") {
62+
PrintUsage(argv[0]);
63+
return 0;
64+
} else if (arg.starts_with("--skip-datum=")) {
65+
std::string value = arg.substr(13);
66+
if (value == "true" || value == "1") {
67+
skip_datum = true;
68+
} else if (value == "false" || value == "0") {
69+
skip_datum = false;
70+
} else {
71+
std::cerr << "Invalid value for --skip-datum: " << value << "\n";
72+
return 1;
73+
}
74+
} else if (arg.starts_with("--batch-size=")) {
75+
batch_size = std::stoll(arg.substr(13));
76+
if (batch_size <= 0) {
77+
std::cerr << "Batch size must be positive\n";
78+
return 1;
79+
}
80+
} else if (arg[0] == '-') {
81+
std::cerr << "Unknown option: " << arg << "\n";
82+
PrintUsage(argv[0]);
83+
return 1;
84+
} else {
85+
avro_file = arg;
86+
}
87+
}
88+
89+
if (avro_file.empty()) {
90+
std::cerr << "Error: No Avro file specified\n";
91+
PrintUsage(argv[0]);
92+
return 1;
93+
}
94+
95+
std::cout << "Scanning Avro file: " << avro_file << "\n";
96+
std::cout << "Skip datum: " << (skip_datum ? "true" : "false") << "\n";
97+
std::cout << "Batch size: " << batch_size << "\n";
98+
std::cout << std::string(60, '-') << "\n";
99+
100+
auto local_fs = std::make_shared<::arrow::fs::LocalFileSystem>();
101+
auto file_io = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs);
102+
103+
// Get file info
104+
auto file_info_result = local_fs->GetFileInfo(avro_file);
105+
if (!file_info_result.ok()) {
106+
std::cerr << "Error: Cannot access file: " << file_info_result.status().message()
107+
<< "\n";
108+
return 1;
109+
}
110+
auto file_info = file_info_result.ValueOrDie();
111+
if (file_info.type() != ::arrow::fs::FileType::File) {
112+
std::cerr << "Error: Not a file: " << avro_file << "\n";
113+
return 1;
114+
}
115+
116+
std::cout << "File size: " << file_info.size() << " bytes\n";
117+
118+
// Configure reader properties
119+
auto reader_properties = iceberg::ReaderProperties::default_properties();
120+
reader_properties->Set(iceberg::ReaderProperties::kAvroSkipDatum, skip_datum);
121+
reader_properties->Set(iceberg::ReaderProperties::kBatchSize, batch_size);
122+
123+
// Open reader (without projection to read all columns)
124+
auto reader_result = iceberg::ReaderFactoryRegistry::Open(
125+
iceberg::FileFormatType::kAvro, {.path = avro_file,
126+
.length = file_info.size(),
127+
.io = file_io,
128+
.projection = nullptr,
129+
.properties = std::move(reader_properties)});
130+
131+
if (!reader_result.has_value()) {
132+
std::cerr << "Error opening reader: " << reader_result.error().message << "\n";
133+
return 1;
134+
}
135+
136+
auto reader = std::move(reader_result.value());
137+
138+
// Get schema
139+
auto schema_result = reader->Schema();
140+
if (!schema_result.has_value()) {
141+
std::cerr << "Error getting schema: " << schema_result.error().message << "\n";
142+
return 1;
143+
}
144+
auto arrow_schema = schema_result.value();
145+
auto arrow_schema_import = ::arrow::ImportType(&arrow_schema);
146+
if (!arrow_schema_import.ok()) {
147+
std::cerr << "Error importing schema: " << arrow_schema_import.status().message()
148+
<< "\n";
149+
return 1;
150+
}
151+
std::cout << "Schema: " << arrow_schema_import.ValueOrDie()->ToString() << "\n";
152+
std::cout << std::string(60, '-') << "\n";
153+
154+
// Scan file and measure time
155+
auto start = std::chrono::high_resolution_clock::now();
156+
157+
int64_t total_rows = 0;
158+
int64_t batch_count = 0;
159+
160+
while (true) {
161+
auto batch_result = reader->Next();
162+
if (!batch_result.has_value()) {
163+
std::cerr << "Error reading batch: " << batch_result.error().message << "\n";
164+
return 1;
165+
}
166+
167+
auto batch_opt = batch_result.value();
168+
if (!batch_opt.has_value()) {
169+
// End of file
170+
break;
171+
}
172+
173+
auto arrow_array = batch_opt.value();
174+
auto arrow_type = arrow_schema_import.ValueOrDie();
175+
auto array_import = ::arrow::ImportArray(&arrow_array, arrow_type);
176+
if (!array_import.ok()) {
177+
std::cerr << "Error importing array: " << array_import.status().message() << "\n";
178+
return 1;
179+
}
180+
181+
int64_t batch_rows = array_import.ValueOrDie()->length();
182+
total_rows += batch_rows;
183+
batch_count++;
184+
}
185+
186+
auto end = std::chrono::high_resolution_clock::now();
187+
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
188+
189+
// Print results
190+
std::cout << "\nResults:\n";
191+
std::cout << " Total rows: " << total_rows << "\n";
192+
std::cout << " Batches: " << batch_count << "\n";
193+
std::cout << " Time: " << duration.count() << " ms\n";
194+
std::cout << " Throughput: "
195+
<< (duration.count() > 0 ? (total_rows * 1000 / duration.count()) : 0)
196+
<< " rows/sec\n";
197+
std::cout << " Throughput: "
198+
<< (duration.count() > 0
199+
? (file_info.size() / 1024.0 / 1024.0) / (duration.count() / 1000.0)
200+
: 0)
201+
<< " MB/sec\n";
202+
203+
return 0;
204+
}

src/iceberg/file_reader.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,10 @@ class ReaderProperties : public ConfigBase<ReaderProperties> {
7676
/// \brief The batch size to read.
7777
inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096};
7878

79-
/// \brief Use direct Avro decoder (true) or GenericDatum-based decoder (false).
80-
/// Default: true (use direct decoder for better performance).
81-
inline static Entry<bool> kAvroUseDirectDecoder{"avro.use-direct-decoder", true};
79+
/// \brief Skip GenericDatum in Avro reader for better performance.
80+
/// When true, decode directly from Avro to Arrow without GenericDatum intermediate.
81+
/// Default: true (skip GenericDatum for better performance).
82+
inline static Entry<bool> kAvroSkipDatum{"read.avro.skip-datum", true};
8283

8384
/// \brief Create a default ReaderProperties instance.
8485
static std::unique_ptr<ReaderProperties> default_properties();

0 commit comments

Comments
 (0)