Skip to content

Commit f3d62c3

Browse files
committed
feat: add support to write metadata for avro and parquet
1 parent 7f7f85b commit f3d62c3

File tree

7 files changed

+46
-13
lines changed

7 files changed

+46
-13
lines changed

src/iceberg/avro/avro_writer.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class AvroWriter::Impl {
7575
CreateOutputStream(options, kDefaultBufferSize));
7676
arrow_output_stream_ = output_stream->arrow_output_stream();
7777
std::map<std::string, std::vector<uint8_t>> metadata;
78-
for (const auto& [key, value] : options.properties) {
78+
for (const auto& [key, value] : options.metadata) {
7979
std::vector<uint8_t> vec;
8080
vec.reserve(value.size());
8181
vec.assign(value.begin(), value.end());

src/iceberg/file_reader.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class ICEBERG_EXPORT Reader {
4242
Reader& operator=(const Reader&) = delete;
4343

4444
/// \brief Open the reader.
45-
virtual Status Open(const struct ReaderOptions& options) = 0;
45+
virtual Status Open(const ReaderOptions& options) = 0;
4646

4747
/// \brief Close the reader.
4848
virtual Status Close() = 0;
@@ -93,6 +93,7 @@ struct ICEBERG_EXPORT ReaderOptions {
9393
/// that may have different field names than the current schema.
9494
std::shared_ptr<class NameMapping> name_mapping;
9595
/// \brief Format-specific or implementation-specific properties.
96+
/// TODO(gangwu): replace it with a new class inherited from `ConfigBase`.
9697
std::unordered_map<std::string, std::string> properties;
9798
};
9899

src/iceberg/file_writer.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ struct ICEBERG_EXPORT WriterOptions {
4444
/// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses
4545
/// `ArrowFileSystemFileIO` as the default implementation.
4646
std::shared_ptr<class FileIO> io;
47+
/// \brief Metadata to write to the file.
48+
std::unordered_map<std::string, std::string> metadata;
4749
/// \brief Format-specific or implementation-specific properties.
50+
/// TODO(gangwu): replace it with a new class inherited from `ConfigBase`.
4851
std::unordered_map<std::string, std::string> properties;
4952
};
5053

@@ -57,7 +60,7 @@ class ICEBERG_EXPORT Writer {
5760
Writer& operator=(const Writer&) = delete;
5861

5962
/// \brief Open the writer.
60-
virtual Status Open(const struct WriterOptions& options) = 0;
63+
virtual Status Open(const WriterOptions& options) = 0;
6164

6265
/// \brief Close the writer.
6366
virtual Status Close() = 0;

src/iceberg/parquet/parquet_writer.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ class ParquetWriter::Impl {
6868

6969
ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
7070
auto file_writer = ::parquet::ParquetFileWriter::Open(
71-
output_stream_, std::move(schema_node), std::move(writer_properties));
71+
output_stream_, std::move(schema_node), std::move(writer_properties),
72+
std::make_shared<::arrow::KeyValueMetadata>(options.metadata));
7273
ICEBERG_ARROW_RETURN_NOT_OK(
7374
::parquet::arrow::FileWriter::Make(pool_, std::move(file_writer), arrow_schema_,
7475
std::move(arrow_writer_properties), &writer_));

src/iceberg/test/avro_test.cc

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,13 @@ class AvroReaderTest : public TempFileTestBase {
123123
auto export_result = ::arrow::ExportArray(*array, &arrow_array);
124124
ASSERT_TRUE(export_result.ok());
125125

126-
auto writer_result = WriterFactoryRegistry::Open(
127-
FileFormatType::kAvro,
128-
{.path = temp_avro_file_, .schema = schema, .io = file_io_});
126+
std::unordered_map<std::string, std::string> metadata = {{"k1", "v1"}, {"k2", "v2"}};
127+
128+
auto writer_result =
129+
WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path = temp_avro_file_,
130+
.schema = schema,
131+
.io = file_io_,
132+
.metadata = metadata});
129133
ASSERT_TRUE(writer_result.has_value());
130134
auto writer = std::move(writer_result.value());
131135
ASSERT_THAT(writer->Write(&arrow_array), IsOk());
@@ -144,6 +148,15 @@ class AvroReaderTest : public TempFileTestBase {
144148
auto reader = std::move(reader_result.value());
145149
ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string));
146150
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
151+
152+
auto metadata_result = reader->Metadata();
153+
ASSERT_THAT(metadata_result, IsOk());
154+
auto read_metadata = std::move(metadata_result.value());
155+
for (const auto& [key, value] : metadata) {
156+
auto it = read_metadata.find(key);
157+
ASSERT_NE(it, read_metadata.end());
158+
ASSERT_EQ(it->second, value);
159+
}
147160
}
148161

149162
std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;

src/iceberg/test/parquet_test.cc

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ Status WriteArray(std::shared_ptr<::arrow::Array> data,
6363
}
6464

6565
Status ReadArray(std::shared_ptr<::arrow::Array>& out,
66-
const ReaderOptions& reader_options) {
66+
const ReaderOptions& reader_options,
67+
std::unordered_map<std::string, std::string>* metadata) {
6768
ICEBERG_ASSIGN_OR_RAISE(
6869
auto reader, ReaderFactoryRegistry::Open(FileFormatType::kParquet, reader_options));
6970
ICEBERG_ASSIGN_OR_RAISE(auto read_data, reader->Next());
@@ -77,6 +78,11 @@ Status ReadArray(std::shared_ptr<::arrow::Array>& out,
7778
ICEBERG_ASSIGN_OR_RAISE(ArrowSchema arrow_schema, reader->Schema());
7879
ICEBERG_ARROW_ASSIGN_OR_RETURN(out,
7980
::arrow::ImportArray(&arrow_c_array, &arrow_schema));
81+
82+
if (metadata) {
83+
ICEBERG_ASSIGN_OR_RAISE(*metadata, reader->Metadata());
84+
}
85+
8086
return {};
8187
}
8288

@@ -85,18 +91,25 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> s
8591
std::shared_ptr<FileIO> file_io = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
8692
const std::string basePath = "base.parquet";
8793

94+
std::unordered_map<std::string, std::string> metadata = {{"k1", "v1"}, {"k2", "v2"}};
95+
8896
auto writer_data = WriterFactoryRegistry::Open(
89-
FileFormatType::kParquet, {.path = basePath, .schema = schema, .io = file_io});
97+
FileFormatType::kParquet,
98+
{.path = basePath, .schema = schema, .io = file_io, .metadata = metadata});
9099
ASSERT_THAT(writer_data, IsOk())
91100
<< "Failed to create writer: " << writer_data.error().message;
92101
auto writer = std::move(writer_data.value());
93102
ASSERT_THAT(WriteArray(data, *writer), IsOk());
94103

95-
ASSERT_THAT(ReadArray(out, {.path = basePath,
96-
.length = writer->length(),
97-
.io = file_io,
98-
.projection = schema}),
104+
std::unordered_map<std::string, std::string> read_metadata;
105+
ASSERT_THAT(ReadArray(out,
106+
{.path = basePath,
107+
.length = writer->length(),
108+
.io = file_io,
109+
.projection = schema},
110+
&read_metadata),
99111
IsOk());
112+
ASSERT_EQ(read_metadata, metadata);
100113

101114
ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data";
102115
}

src/iceberg/type_fwd.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ class ManifestListWriter;
139139
class ManifestReader;
140140
class ManifestWriter;
141141

142+
struct ReaderOptions;
143+
struct WriterOptions;
142144
class Reader;
143145
class Writer;
144146

0 commit comments

Comments
 (0)