Skip to content

Commit d4ad8f7

Browse files
authored
feat(avro): support writing multiple blocks (#470)
1 parent efe0e37 commit d4ad8f7

File tree

2 files changed

+54
-1
lines changed

2 files changed

+54
-1
lines changed

src/iceberg/avro/avro_writer.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ class DirectEncoderBackend : public AvroWriteBackend {
8080

8181
Status WriteRow(const Schema& write_schema, const ::arrow::Array& array,
8282
int64_t row_index) override {
83+
writer_->syncIfNeeded();
8384
ICEBERG_RETURN_UNEXPECTED(EncodeArrowToAvro(avro_root_node_, writer_->encoder(),
8485
write_schema, array, row_index,
8586
encode_ctx_));

src/iceberg/test/avro_test.cc

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,27 @@
1818
*/
1919

2020
#include <sstream>
21+
#include <unordered_map>
2122

2223
#include <arrow/array.h>
2324
#include <arrow/array/array_base.h>
2425
#include <arrow/c/bridge.h>
2526
#include <arrow/json/from_string.h>
27+
#include <avro/DataFile.hh>
28+
#include <avro/Generic.hh>
2629
#include <gtest/gtest.h>
2730

2831
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
2932
#include "iceberg/avro/avro_register.h"
33+
#include "iceberg/avro/avro_stream_internal.h"
3034
#include "iceberg/avro/avro_writer.h"
3135
#include "iceberg/file_reader.h"
3236
#include "iceberg/metadata_columns.h"
3337
#include "iceberg/schema.h"
3438
#include "iceberg/schema_internal.h"
3539
#include "iceberg/test/matchers.h"
3640
#include "iceberg/type.h"
41+
#include "iceberg/util/checked_cast.h"
3742

3843
namespace iceberg::avro {
3944

@@ -639,7 +644,9 @@ class AvroWriterTest : public ::testing::Test,
639644
skip_datum_ = GetParam();
640645
}
641646

642-
void WriteAvroFile(std::shared_ptr<Schema> schema, const std::string& json_data) {
647+
void WriteAvroFile(
648+
std::shared_ptr<Schema> schema, const std::string& json_data,
649+
const std::unordered_map<std::string, std::string>& extra_properties = {}) {
643650
ArrowSchema arrow_c_schema;
644651
ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
645652

@@ -660,6 +667,9 @@ class AvroWriterTest : public ::testing::Test,
660667

661668
auto writer_properties = WriterProperties::default_properties();
662669
writer_properties->Set(WriterProperties::kAvroSkipDatum, skip_datum_);
670+
for (const auto& [key, value] : extra_properties) {
671+
writer_properties->mutable_configs().emplace(key, value);
672+
}
663673

664674
auto writer_result = WriterFactoryRegistry::Open(
665675
FileFormatType::kAvro, {.path = temp_avro_file_,
@@ -884,6 +894,48 @@ TEST_P(AvroWriterTest, WriteLargeDataset) {
884894
VerifyWrittenData(json.str());
885895
}
886896

897+
TEST_P(AvroWriterTest, MultipleAvroBlocks) {
898+
auto schema = std::make_shared<Schema>(
899+
std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
900+
SchemaField::MakeRequired(2, "name", string())});
901+
902+
const std::string json_data = R"([
903+
[1, "Alice_with_a_very_long_name_to_exceed_sync_interval"],
904+
[2, "Bob_with_another_very_long_name_to_exceed_sync_interval"],
905+
[3, "Charlie_with_yet_another_very_long_name_to_exceed_sync"],
906+
[4, "David_with_a_super_long_name_that_will_exceed_interval"],
907+
[5, "Eve_with_an_extremely_long_name_to_force_new_block_here"]
908+
])";
909+
910+
const std::vector<std::pair</*sync_interval*/ std::string, /*num_blocks*/ size_t>>
911+
test_cases = {{"32", 5}, {"65536", 1}};
912+
913+
for (const auto& [interval, num_blocks] : test_cases) {
914+
WriteAvroFile(schema, json_data,
915+
{{WriterProperties::kAvroSyncInterval.key(), interval}});
916+
VerifyWrittenData(json_data);
917+
918+
// Use raw avro-cpp reader to count blocks by tracking previousSync() changes
919+
auto mock_io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(file_io_);
920+
auto input = mock_io->fs()->OpenInputFile(temp_avro_file_).ValueOrDie();
921+
auto input_stream = std::make_unique<AvroInputStream>(std::move(input), 1024 * 1024);
922+
::avro::DataFileReader<::avro::GenericDatum> avro_reader(std::move(input_stream));
923+
::avro::GenericDatum datum(avro_reader.dataSchema());
924+
925+
size_t block_count = 0;
926+
int64_t last_sync = -1;
927+
928+
while (avro_reader.read(datum)) {
929+
if (int64_t current_sync = avro_reader.previousSync(); current_sync != last_sync) {
930+
block_count++;
931+
last_sync = current_sync;
932+
}
933+
}
934+
935+
ASSERT_EQ(block_count, num_blocks);
936+
}
937+
}
938+
887939
// Instantiate parameterized tests for both direct encoder and GenericDatum paths
888940
INSTANTIATE_TEST_SUITE_P(DirectEncoderModes, AvroWriterTest,
889941
::testing::Values(true, false),

0 commit comments

Comments
 (0)