From 31c06e59cfd6f21cf63f34abea7d840449a8d00f Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Thu, 28 Aug 2025 11:26:10 +0800 Subject: [PATCH 1/9] basic support parquet writer --- .gitignore | 3 + src/iceberg/CMakeLists.txt | 3 +- src/iceberg/parquet/parquet_writer.cc | 149 ++++++++++++++++++++++++++ src/iceberg/parquet/parquet_writer.h | 53 +++++++++ 4 files changed, 207 insertions(+), 1 deletion(-) create mode 100644 src/iceberg/parquet/parquet_writer.cc create mode 100644 src/iceberg/parquet/parquet_writer.h diff --git a/.gitignore b/.gitignore index 2b7154b0d..74df5bd65 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,6 @@ cmake-build-release/ # intellij files .idea + +# vscode files +.vscode diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index bccd4d9d7..68cbd35c6 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -115,7 +115,8 @@ if(ICEBERG_BUILD_BUNDLE) parquet/parquet_data_util.cc parquet/parquet_reader.cc parquet/parquet_register.cc - parquet/parquet_schema_util.cc) + parquet/parquet_schema_util.cc + parquet/parquet_writer.cc) # Libraries to link with exported libiceberg_bundle.{so,a}. set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS) diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc new file mode 100644 index 000000000..c69bd3823 --- /dev/null +++ b/src/iceberg/parquet/parquet_writer.cc @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+#include "iceberg/parquet/parquet_writer.h"
+
+#include <memory>
+
+#include <arrow/c/bridge.h>
+#include <arrow/memory_pool.h>
+#include <arrow/util/key_value_metadata.h>
+#include <parquet/arrow/schema.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/file_writer.h>
+#include <parquet/properties.h>
+
+#include "iceberg/arrow/arrow_error_transform_internal.h"
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::parquet {
+
+namespace {
+
+Result<std::shared_ptr<::arrow::io::OutputStream>> OpenOutputStream(
+    const WriterOptions& options) {
+  auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, io->fs()->OpenOutputStream(options.path));
+  return output;
+}
+
+}  // namespace
+
+class ParquetWriter::Impl {
+ public:
+  Status Open(const WriterOptions& options) {
+    auto writer_properties =
+        ::parquet::WriterProperties::Builder().memory_pool(pool_)->build();
+    auto arrow_writer_properties = ::parquet::default_arrow_writer_properties();
+
+    ArrowSchema c_schema;
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &c_schema));
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, ::arrow::ImportSchema(&c_schema));
+
+    std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor;
+    ICEBERG_ARROW_RETURN_NOT_OK(
+        ::parquet::arrow::ToParquetSchema(arrow_schema_.get(), *writer_properties,
+                                          *arrow_writer_properties, &schema_descriptor));
+    auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>(
+        schema_descriptor->schema_root());
+
+    std::shared_ptr<::arrow::KeyValueMetadata> metadata =
+        ::arrow::key_value_metadata(options.properties);
+
+    ICEBERG_ASSIGN_OR_RAISE(auto output_stream, OpenOutputStream(options));
+    auto file_writer = ::parquet::ParquetFileWriter::Open(
+        std::move(output_stream), schema_node, writer_properties, metadata);
+    ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileWriter::Make(
+        pool_, std::move(file_writer), arrow_schema_, arrow_writer_properties, &writer_));
+
+    return {};
+  }
+
+  Status Write(ArrowArray array) {
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch,
+                                   ::arrow::ImportRecordBatch(&array, arrow_schema_));
+
+    ICEBERG_ARROW_RETURN_NOT_OK(writer_->WriteRecordBatch(*batch));
+
+    return {};
+  }
+
+  // Close the writer and release resources
+  Status Close() {
+    if (writer_ != nullptr) {
+      ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
+      writer_.reset();
+    }
+    return {};
+  }
+
+  bool Closed() const { return writer_ == nullptr; }
+
+ private:
+  // TODO(gangwu): make memory pool configurable
+  ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
+  // Arrow schema of the data to write to the Parquet file.
+  std::shared_ptr<::arrow::Schema> arrow_schema_;
+  // Parquet file writer to write ArrowArray.
+  std::unique_ptr<::parquet::arrow::FileWriter> writer_;
+};
+
+ParquetWriter::~ParquetWriter() = default;
+
+Status ParquetWriter::Open(const WriterOptions& options) {
+  impl_ = std::make_unique<Impl>();
+  return impl_->Open(options);
+}
+
+Status ParquetWriter::Write(ArrowArray array) { return impl_->Write(array); }
+
+Status ParquetWriter::Close() { return impl_->Close(); }
+
+std::optional<Metrics> ParquetWriter::metrics() {
+  if (!impl_->Closed()) {
+    return std::nullopt;
+  }
+  return {};
+}
+
+std::optional<int64_t> ParquetWriter::length() {
+  if (!impl_->Closed()) {
+    return std::nullopt;
+  }
+  return {};
+}
+
+std::vector<int64_t> ParquetWriter::split_offsets() {
+  if (!impl_->Closed()) {
+    return {};
+  }
+  return {};
+}
+
+void ParquetWriter::Register() {
+  static WriterFactoryRegistry parquet_writer_register(
+      FileFormatType::kParquet, []() -> Result<std::unique_ptr<Writer>> {
+        return std::make_unique<ParquetWriter>();
+      });
+}
+
+}  // namespace iceberg::parquet
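
Taken together, the Impl above gives the writer a strict lifecycle: Open() builds the Parquet schema and file writer, Write() imports each ArrowArray through the Arrow C data interface, and Close() must run before any of the statistics accessors report data. A minimal usage sketch follows (hypothetical caller, not part of the patch; `schema` and `io` stand for an already-constructed iceberg Schema and FileIO):

    // Hedged sketch of the intended call sequence; error handling uses the
    // macros the codebase already provides.
    Status WriteOneBatch(std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> io,
                         ArrowArray batch) {
      // Resolve the Parquet writer through the factory registered above.
      ICEBERG_ASSIGN_OR_RAISE(
          auto writer,
          WriterFactoryRegistry::Open(
              FileFormatType::kParquet,
              {.path = "data.parquet", .schema = schema, .io = io}));
      ICEBERG_RETURN_UNEXPECTED(writer->Write(batch));
      // metrics(), length() and split_offsets() only return data after Close().
      return writer->Close();
    }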
diff --git a/src/iceberg/parquet/parquet_writer.h b/src/iceberg/parquet/parquet_writer.h
new file mode 100644
index 000000000..1bc09944c
--- /dev/null
+++ b/src/iceberg/parquet/parquet_writer.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include "iceberg/file_writer.h"
+#include "iceberg/iceberg_bundle_export.h"
+
+namespace iceberg::parquet {
+
+/// \brief A writer that writes ArrowArray to Parquet files.
+class ICEBERG_BUNDLE_EXPORT ParquetWriter : public Writer {
+ public:
+  ParquetWriter() = default;
+
+  ~ParquetWriter() override;
+
+  Status Open(const WriterOptions& options) final;
+
+  Status Close() final;
+
+  Status Write(ArrowArray array) final;
+
+  std::optional<Metrics> metrics() final;
+
+  std::optional<int64_t> length() final;
+
+  std::vector<int64_t> split_offsets() final;
+
+  static void Register();
+
+ private:
+  class Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+}  // namespace iceberg::parquet

From eb8d8b9a42895f29b4c34503891af4bec73235db Mon Sep 17 00:00:00 2001
From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com>
Date: Mon, 1 Sep 2025 14:23:03 +0800
Subject: [PATCH 2/9] add some tests

---
 src/iceberg/constants.h                    |  28 ++++
 src/iceberg/parquet/parquet_reader.cc      |   7 +-
 src/iceberg/parquet/parquet_register.cc    |   2 -
 src/iceberg/parquet/parquet_schema_util.cc |   3 +-
 src/iceberg/parquet/parquet_writer.cc      |  19 ++-
 src/iceberg/parquet/parquet_writer.h       |   2 -
 src/iceberg/schema_internal.h              |   3 +-
 src/iceberg/type.cc                        |  12 ++
 src/iceberg/type.h                         |  16 ++
 test/parquet_test.cc                       | 176 ++++++++++++++++++++-
 10 files changed, 245 insertions(+), 23 deletions(-)
 create mode 100644 src/iceberg/constants.h

diff --git a/src/iceberg/constants.h b/src/iceberg/constants.h
new file mode 100644
index 000000000..61d10e2d6
--- /dev/null
+++ b/src/iceberg/constants.h
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <string_view>
+
+namespace iceberg {
+
+constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
+
+}  // namespace iceberg
diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc
index 405b09f03..376a56832 100644
--- a/src/iceberg/parquet/parquet_reader.cc
+++ b/src/iceberg/parquet/parquet_reader.cc
@@ -125,9 +125,9 @@ class ParquetReader::Impl {
     arrow_reader_properties.set_arrow_extensions_enabled(true);
 
     // Open the Parquet file reader
-    ICEBERG_ASSIGN_OR_RAISE(auto input_stream, OpenInputStream(options));
+    ICEBERG_ASSIGN_OR_RAISE(input_stream_, OpenInputStream(options));
     auto file_reader =
-        ::parquet::ParquetFileReader::Open(std::move(input_stream), reader_properties);
+        ::parquet::ParquetFileReader::Open(input_stream_, reader_properties);
     ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make(
         pool_, std::move(file_reader), arrow_reader_properties, &reader_));
 
@@ -169,6 +169,7 @@ class ParquetReader::Impl {
     }
 
     reader_.reset();
+    ICEBERG_ARROW_RETURN_NOT_OK(input_stream_->Close());
     return {};
   }
 
@@ -240,6 +241,8 @@ class ParquetReader::Impl {
   std::unique_ptr<::parquet::arrow::FileReader> reader_;
   // The context to keep track of the reading progress.
   std::unique_ptr<ReadContext> context_;
+  // The input stream to read Parquet file.
+  std::shared_ptr<::arrow::io::RandomAccessFile> input_stream_;
 };
 
 ParquetReader::~ParquetReader() = default;
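
The reader-side change mirrors what the writer does: the Impl now owns the stream it opened, so Close() can release the file handle deterministically instead of leaving that to the ParquetFileReader that previously took ownership via std::move. The ownership pattern, reduced to its essentials (a simplified sketch, not the patch's exact code):

    // Simplified sketch: keep the stream next to the reader and close it
    // exactly once, after the reader has been dropped.
    class Impl {
     public:
      Status Close() {
        reader_.reset();  // release the FileReader first
        if (input_stream_ != nullptr) {
          ICEBERG_ARROW_RETURN_NOT_OK(input_stream_->Close());
          input_stream_.reset();
        }
        return {};
      }

     private:
      std::shared_ptr<::arrow::io::RandomAccessFile> input_stream_;
      std::unique_ptr<::parquet::arrow::FileReader> reader_;
    };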
diff --git a/src/iceberg/parquet/parquet_register.cc b/src/iceberg/parquet/parquet_register.cc
index 19988cd29..d79e158c1 100644
--- a/src/iceberg/parquet/parquet_register.cc
+++ b/src/iceberg/parquet/parquet_register.cc
@@ -21,8 +21,6 @@
 
 namespace iceberg::parquet {
 
-void RegisterWriter() {}
-
 void RegisterAll() {
   RegisterReader();
   RegisterWriter();
diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc
index 361489973..088c15c04 100644
--- a/src/iceberg/parquet/parquet_schema_util.cc
+++ b/src/iceberg/parquet/parquet_schema_util.cc
@@ -26,6 +26,7 @@
 #include
 #include
 
+#include "iceberg/constants.h"
 #include "iceberg/metadata_columns.h"
 #include "iceberg/parquet/parquet_schema_util_internal.h"
 #include "iceberg/result.h"
@@ -38,8 +39,6 @@ namespace iceberg::parquet {
 
 namespace {
 
-constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
-
 std::optional<int32_t> FieldIdFromMetadata(
     const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
   if (!metadata) {
diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc
index c69bd3823..91d294127 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -69,9 +69,9 @@ class ParquetWriter::Impl {
     std::shared_ptr<::arrow::KeyValueMetadata> metadata =
         ::arrow::key_value_metadata(options.properties);
 
-    ICEBERG_ASSIGN_OR_RAISE(auto output_stream, OpenOutputStream(options));
-    auto file_writer = ::parquet::ParquetFileWriter::Open(
-        std::move(output_stream), schema_node, writer_properties, metadata);
+    ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
+    auto file_writer = ::parquet::ParquetFileWriter::Open(output_stream_, schema_node,
+                                                          writer_properties, metadata);
     ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileWriter::Make(
         pool_, std::move(file_writer), arrow_schema_, arrow_writer_properties, &writer_));
 
@@ -89,10 +89,13 @@ class ParquetWriter::Impl {
 
   // Close the writer and release resources
   Status Close() {
-    if (writer_ != nullptr) {
-      ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
-      writer_.reset();
+    if (writer_ == nullptr) {
+      return {};  // Already closed
     }
+
+    ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
+    writer_.reset();
+    ICEBERG_ARROW_RETURN_NOT_OK(output_stream_->Close());
     return {};
   }
 
@@ -105,6 +108,8 @@ class ParquetWriter::Impl {
   std::shared_ptr<::arrow::Schema> arrow_schema_;
   // Parquet file writer to write ArrowArray.
   std::unique_ptr<::parquet::arrow::FileWriter> writer_;
+  // The output stream to write Parquet file.
+  std::shared_ptr<::arrow::io::OutputStream> output_stream_;
 };
 
 ParquetWriter::~ParquetWriter() = default;
 
@@ -139,7 +144,7 @@ std::vector<int64_t> ParquetWriter::split_offsets() {
   return {};
 }
 
-void ParquetWriter::Register() {
+void RegisterWriter() {
   static WriterFactoryRegistry parquet_writer_register(
       FileFormatType::kParquet, []() -> Result<std::unique_ptr<Writer>> {
         return std::make_unique<ParquetWriter>();
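
Note why output_stream_ becomes a member here instead of staying a local that is moved into ParquetFileWriter::Open(): later patches in this series read the final file size off the stream once the writer is closed. The close ordering that eventually lands (patches 4 and 5, shown here as a sketch) is:

    // Sketch of the eventual Close() ordering. Tell() only reflects the full
    // file size after writer_->Close() has flushed the Parquet footer.
    ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
    writer_.reset();
    ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell());
    ICEBERG_ARROW_RETURN_NOT_OK(output_stream_->Close());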
diff --git a/src/iceberg/parquet/parquet_writer.h b/src/iceberg/parquet/parquet_writer.h
index 1bc09944c..5371f3810 100644
--- a/src/iceberg/parquet/parquet_writer.h
+++ b/src/iceberg/parquet/parquet_writer.h
@@ -43,8 +43,6 @@ class ICEBERG_BUNDLE_EXPORT ParquetWriter : public Writer {
 
   std::vector<int64_t> split_offsets() final;
 
-  static void Register();
-
  private:
   class Impl;
   std::unique_ptr<Impl> impl_;
diff --git a/src/iceberg/schema_internal.h b/src/iceberg/schema_internal.h
index 1a31e68f2..7624f061d 100644
--- a/src/iceberg/schema_internal.h
+++ b/src/iceberg/schema_internal.h
@@ -24,6 +24,7 @@
 
 #include <string_view>
 
+#include "iceberg/constants.h"
 #include "iceberg/iceberg_export.h"
 #include "iceberg/result.h"
 #include "iceberg/type_fwd.h"
@@ -33,7 +34,7 @@ namespace iceberg {
 // Apache Arrow C++ uses "PARQUET:field_id" to store field IDs for Parquet.
 // Here we follow a similar convention for Iceberg but we might also add
 // "PARQUET:field_id" in the future once we implement a Parquet writer.
-constexpr std::string_view kFieldIdKey = "ICEBERG:field_id";
+constexpr std::string_view kFieldIdKey = kParquetFieldIdKey;
 
 /// \brief Convert an Iceberg schema to an Arrow schema.
 ///
diff --git a/src/iceberg/type.cc b/src/iceberg/type.cc
index e66f96daf..f77b76173 100644
--- a/src/iceberg/type.cc
+++ b/src/iceberg/type.cc
@@ -319,4 +319,16 @@ std::shared_ptr<FixedType> fixed(int32_t length) {
   return std::make_shared<FixedType>(length);
 }
 
+std::shared_ptr<MapType> map(SchemaField key, SchemaField value) {
+  return std::make_shared<MapType>(key, value);
+}
+
+std::shared_ptr<ListType> list(SchemaField element) {
+  return std::make_shared<ListType>(std::move(element));
+}
+
+std::shared_ptr<StructType> struct_(std::vector<SchemaField> fields) {
+  return std::make_shared<StructType>(std::move(fields));
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/type.h b/src/iceberg/type.h
index 78c0141b1..d49f19fcd 100644
--- a/src/iceberg/type.h
+++ b/src/iceberg/type.h
@@ -487,6 +487,22 @@ ICEBERG_EXPORT std::shared_ptr<DecimalType> decimal(int32_t precision, int32_t s
 /// \return A shared pointer to the FixedType instance.
 ICEBERG_EXPORT std::shared_ptr<FixedType> fixed(int32_t length);
 
+/// \brief Create a StructType with the given fields.
+/// \param fields The fields of the struct.
+/// \return A shared pointer to the StructType instance.
+ICEBERG_EXPORT std::shared_ptr<StructType> struct_(std::vector<SchemaField> fields);
+
+/// \brief Create a ListType with the given element field.
+/// \param element The element field of the list.
+/// \return A shared pointer to the ListType instance.
+ICEBERG_EXPORT std::shared_ptr<ListType> list(SchemaField element);
+
+/// \brief Create a MapType with the given key and value fields.
+/// \param key The key field of the map.
+/// \param value The value field of the map.
+/// \return A shared pointer to the MapType instance.
+ICEBERG_EXPORT std::shared_ptr<MapType> map(SchemaField key, SchemaField value);
+
 /// @}
 
 }  // namespace iceberg
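
The new struct_(), list() and map() factories cut a lot of boilerplate when declaring nested schemas. For illustration only (field names and IDs are made up, not from the patch), a list-of-structs column can now be written as:

    // Illustrative use of the factory helpers added above; not part of the patch.
    auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
        SchemaField::MakeRequired(
            1, "points",
            list(SchemaField::MakeRequired(
                2, "element",
                struct_({
                    SchemaField::MakeRequired(3, "x", int32()),
                    SchemaField::MakeRequired(4, "y", int32()),
                }))))});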
diff --git a/test/parquet_test.cc b/test/parquet_test.cc
index 122256bb3..8a2679b4e 100644
--- a/test/parquet_test.cc
+++ b/test/parquet_test.cc
@@ -17,38 +17,118 @@
  * under the License.
  */
 
+#include
+
 #include
 #include
 #include
 #include
 #include
+#include
 #include
+#include
+#include
+#include
+#include
+#include
+#include
 #include
 #include
 #include
 
+#include "iceberg/arrow/arrow_error_transform_internal.h"
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
-#include "iceberg/parquet/parquet_reader.h"
 #include "iceberg/parquet/parquet_register.h"
 #include "iceberg/schema.h"
 #include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
 #include "matchers.h"
-#include "temp_file_test_base.h"
 
 namespace iceberg::parquet {
 
+namespace {
+
+Status WriteTable(std::shared_ptr<::arrow::Array> data,
+                  const WriterOptions& writer_options) {
+  ArrowArray arr;
+  ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr));
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet, writer_options));
+  {
+    internal::ArrowArrayGuard guard(&arr);
+    ICEBERG_RETURN_UNEXPECTED(writer->Write(arr));
+  }
+  return writer->Close();
+}
+
+Status ReadTable(std::optional<std::shared_ptr<::arrow::Array>>* out,
+                 const ReaderOptions& reader_options) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto reader, ReaderFactoryRegistry::Open(FileFormatType::kParquet, reader_options));
+  ICEBERG_ASSIGN_OR_RAISE(auto read_data, reader->Next());
+
+  if (!read_data.has_value()) {
+    *out = std::nullopt;
+    return {};
+  }
+  auto arrow_c_array = read_data.value();
+
+  ArrowSchema arrow_schema;
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*reader_options.projection, &arrow_schema));
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(*out,
+                                 ::arrow::ImportArray(&arrow_c_array, &arrow_schema));
+  return {};
+}
+
+void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> schema,
+                 std::shared_ptr<::arrow::Array>* out) {
+  std::shared_ptr<FileIO> file_io = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+  const std::string basePath = "base.parquet";
+
+  ASSERT_THAT(WriteTable(data, {.path = basePath, .schema = schema, .io = file_io}),
+              IsOk());
+
+  std::optional<std::shared_ptr<::arrow::Array>> out_array;
+  ASSERT_THAT(
+      ReadTable(&out_array, {.path = basePath, .io = file_io, .projection = schema}),
+      IsOk());
+
+  ASSERT_TRUE(out_array.has_value()) << "Reader.Next() returned no data";
+  *out = std::move(out_array.value());
+}
+
+}  // namespace
+
-class ParquetReaderTest : public TempFileTestBase {
+class ParquetReaderTest : public ::testing::Test {
  protected:
   static void SetUpTestSuite() { parquet::RegisterAll(); }
 
   void SetUp() override {
-    TempFileTestBase::SetUp();
-    file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO();
-    temp_parquet_file_ = CreateNewTempFilePathWithSuffix(".parquet");
+    file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+    temp_parquet_file_ = "parquet_reader_test.parquet";
   }
 
   void CreateSimpleParquetFile() {
+    auto schema = std::make_shared<Schema>(
+        std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                                 SchemaField::MakeOptional(2, "name", string())});
+
+    ArrowSchema arrow_c_schema;
+    ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
+    auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+    auto array =
+        ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()),
+                                           R"([[1, "Foo"],[2, "Bar"],[3, "Baz"]])")
+            .ValueOrDie();
+
+    ASSERT_TRUE(WriteTable(
+        array, {.path = temp_parquet_file_, .schema = schema, .io = file_io_}));
+  }
+
+  void CreateSplitParquetFile() {
     const std::string kParquetFieldIdKey = "PARQUET:field_id";
     auto arrow_schema = ::arrow::schema(
         {::arrow::field("id", ::arrow::int32(), /*nullable=*/false,
@@ -70,6 +150,7 @@ class ParquetReaderTest : public TempFileTestBase {
     ASSERT_TRUE(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(),
                                              outfile, /*chunk_size=*/2)
                     .ok());
+    ASSERT_TRUE(outfile->Close().ok());
   }
 
   void VerifyNextBatch(Reader& reader, std::string_view expected_json) {
@@ -162,7 +243,7 @@ TEST_F(ParquetReaderTest, ReadWithBatchSize) {
 }
 
 TEST_F(ParquetReaderTest, ReadSplit) {
-  CreateSimpleParquetFile();
+  CreateSplitParquetFile();
 
   // Read split offsets
   auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
@@ -204,4 +285,85 @@ TEST_F(ParquetReaderTest, ReadSplit) {
   }
 }
 
+class ParquetReadWrite : public ::testing::Test {
+ protected:
+  static void SetUpTestSuite() { parquet::RegisterAll(); }
+};
+
+TEST_F(ParquetReadWrite, EmptyStruct) {
+  auto schema =
+      std::make_shared<Schema>(std::vector<SchemaField>{SchemaField::MakeRequired(
+          1, "empty_struct", std::make_shared<StructType>(std::vector<SchemaField>{}))});
+
+  ArrowSchema arrow_c_schema;
+  ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
+  auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+  auto array = ::arrow::json::ArrayFromJSONString(
+                   ::arrow::struct_(arrow_schema->fields()), R"([null, {}])")
+                   .ValueOrDie();
+
+  std::shared_ptr<FileIO> file_io = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+  const std::string basePath = "base.parquet";
+
+  ASSERT_THAT(WriteTable(array, {.path = basePath, .schema = schema, .io = file_io}),
+              IsError(ErrorKind::kUnknownError));
+}
+
+TEST_F(ParquetReadWrite, SimpleStructRoundTrip) {
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeOptional(1, "a",
+                                struct_({
+                                    SchemaField::MakeOptional(2, "a1", int64()),
+                                    SchemaField::MakeOptional(3, "a2", string()),
+                                })),
+  });
+
+  ArrowSchema arrow_c_schema;
+  ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
+  auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+  auto array =
+      ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()),
+                                         R"([[{"a1": 1, "a2": "abc"}],
+                                             [{"a1": 0}],
+                                             [{"a2": "edf"}],
+                                             [{}]])")
+          .ValueOrDie();
+
+  std::shared_ptr<::arrow::Array> out;
+  DoRoundtrip(array, schema, &out);
+
+  ASSERT_TRUE(out->Equals(*array));
+}
+
+TEST_F(ParquetReadWrite, SimpleTypeRoundTrip) {
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeOptional(1, "a", boolean()),
+      SchemaField::MakeOptional(2, "b", int32()),
+      SchemaField::MakeOptional(3, "c", int64()),
+      SchemaField::MakeOptional(4, "d", float32()),
+      SchemaField::MakeOptional(5, "e", float64()),
+      SchemaField::MakeOptional(6, "f", string()),
+      SchemaField::MakeOptional(7, "g", time()),
+      SchemaField::MakeOptional(8, "h", timestamp()),
+  });
+
+  ArrowSchema arrow_c_schema;
+  ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
+  auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+  auto array = ::arrow::json::ArrayFromJSONString(
+                   ::arrow::struct_(arrow_schema->fields()),
+                   R"([[true, 1, 2, 1.1, 1.2, "abc", 44614000, 1756696503000000],
+                       [false, 0, 0, 0, 0, "", 0, 0],
+                       [null, null, null, null, null, null, null, null]])")
+                   .ValueOrDie();
+
+  std::shared_ptr<::arrow::Array> out;
+  DoRoundtrip(array, schema, &out);
+
+  ASSERT_TRUE(out->Equals(*array));
+}
+
 }  // namespace iceberg::parquet
From e4cc3e295a9882882ed557a7bfab28ffbf606c5d Mon Sep 17 00:00:00 2001
From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com>
Date: Mon, 1 Sep 2025 19:14:20 +0800
Subject: [PATCH 3/9] fix some review comments

---
 src/iceberg/parquet/parquet_reader.cc |  4 +--
 src/iceberg/parquet/parquet_writer.cc |  9 ++----
 src/iceberg/schema_internal.cc        |  7 +++--
 src/iceberg/schema_internal.h         |  6 ----
 test/arrow_test.cc                    | 27 +++++++++--------
 test/parquet_test.cc                  | 43 +++++++++++----------------
 6 files changed, 40 insertions(+), 56 deletions(-)

diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc
index 376a56832..4c86802b3 100644
--- a/src/iceberg/parquet/parquet_reader.cc
+++ b/src/iceberg/parquet/parquet_reader.cc
@@ -237,12 +237,12 @@ class ParquetReader::Impl {
   std::shared_ptr<::iceberg::Schema> read_schema_;
   // The projection result to apply to the read schema.
   SchemaProjection projection_;
+  // The input stream to read Parquet file.
+  std::shared_ptr<::arrow::io::RandomAccessFile> input_stream_;
   // Parquet file reader to create RecordBatchReader.
   std::unique_ptr<::parquet::arrow::FileReader> reader_;
   // The context to keep track of the reading progress.
   std::unique_ptr<ReadContext> context_;
-  // The input stream to read Parquet file.
-  std::shared_ptr<::arrow::io::RandomAccessFile> input_stream_;
 };
 
 ParquetReader::~ParquetReader() = default;
diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc
index 91d294127..afd4e0a11 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -66,12 +66,9 @@ class ParquetWriter::Impl {
     auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>(
         schema_descriptor->schema_root());
 
-    std::shared_ptr<::arrow::KeyValueMetadata> metadata =
-        ::arrow::key_value_metadata(options.properties);
-
     ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
     auto file_writer = ::parquet::ParquetFileWriter::Open(output_stream_, schema_node,
-                                                          writer_properties, metadata);
+                                                          writer_properties);
     ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileWriter::Make(
         pool_, std::move(file_writer), arrow_schema_, arrow_writer_properties, &writer_));
 
@@ -106,10 +103,10 @@ class ParquetWriter::Impl {
   ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
   // Schema to write from the Parquet file.
   std::shared_ptr<::arrow::Schema> arrow_schema_;
-  // Parquet file writer to write ArrowArray.
-  std::unique_ptr<::parquet::arrow::FileWriter> writer_;
   // The output stream to write Parquet file.
   std::shared_ptr<::arrow::io::OutputStream> output_stream_;
+  // Parquet file writer to write ArrowArray.
+ std::unique_ptr<::parquet::arrow::FileWriter> writer_; }; ParquetWriter::~ParquetWriter() = default; diff --git a/src/iceberg/schema_internal.cc b/src/iceberg/schema_internal.cc index 124c595d5..beb973b28 100644 --- a/src/iceberg/schema_internal.cc +++ b/src/iceberg/schema_internal.cc @@ -24,6 +24,7 @@ #include #include +#include "iceberg/constants.h" #include "iceberg/schema.h" #include "iceberg/type.h" #include "iceberg/util/macros.h" @@ -45,7 +46,7 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderInit(&metadata_buffer, nullptr)); if (field_id.has_value()) { NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderAppend( - &metadata_buffer, ArrowCharView(std::string(kFieldIdKey).c_str()), + &metadata_buffer, ArrowCharView(std::string(kParquetFieldIdKey).c_str()), ArrowCharView(std::to_string(field_id.value()).c_str()))); } @@ -185,8 +186,8 @@ int32_t GetFieldId(const ArrowSchema& schema) { return kUnknownFieldId; } - ArrowStringView field_id_key{.data = kFieldIdKey.data(), - .size_bytes = kFieldIdKey.size()}; + ArrowStringView field_id_key{.data = kParquetFieldIdKey.data(), + .size_bytes = kParquetFieldIdKey.size()}; ArrowStringView field_id_value; if (ArrowMetadataGetValue(schema.metadata, field_id_key, &field_id_value) != NANOARROW_OK) { diff --git a/src/iceberg/schema_internal.h b/src/iceberg/schema_internal.h index 7624f061d..8b290852a 100644 --- a/src/iceberg/schema_internal.h +++ b/src/iceberg/schema_internal.h @@ -24,18 +24,12 @@ #include -#include "iceberg/constants.h" #include "iceberg/iceberg_export.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" namespace iceberg { -// Apache Arrow C++ uses "PARQUET:field_id" to store field IDs for Parquet. -// Here we follow a similar convention for Iceberg but we might also add -// "PARQUET:field_id" in the future once we implement a Parquet writer. -constexpr std::string_view kFieldIdKey = kParquetFieldIdKey; - /// \brief Convert an Iceberg schema to an Arrow schema. /// /// \param[in] schema The Iceberg schema to convert. 
diff --git a/test/arrow_test.cc b/test/arrow_test.cc
index 202463df0..d76020845 100644
--- a/test/arrow_test.cc
+++ b/test/arrow_test.cc
@@ -28,6 +28,7 @@
 #include
 #include
 
+#include "iceberg/constants.h"
 #include "iceberg/schema.h"
 #include "iceberg/schema_internal.h"
 #include "matchers.h"
@@ -64,8 +65,8 @@ TEST_P(ToArrowSchemaTest, PrimitiveType) {
   ASSERT_TRUE(field->type()->Equals(param.arrow_type));
 
   auto metadata = field->metadata();
-  ASSERT_TRUE(metadata->Contains(kFieldIdKey));
-  ASSERT_EQ(metadata->Get(kFieldIdKey), std::to_string(kFieldId));
+  ASSERT_TRUE(metadata->Contains(kParquetFieldIdKey));
+  ASSERT_EQ(metadata->Get(kParquetFieldIdKey), std::to_string(kFieldId));
 }
 
 INSTANTIATE_TEST_SUITE_P(
@@ -112,8 +113,8 @@ void CheckArrowField(const ::arrow::Field& field, ::arrow::Type::type type_id,
 
   auto metadata = field.metadata();
   ASSERT_TRUE(metadata != nullptr);
-  ASSERT_TRUE(metadata->Contains(kFieldIdKey));
-  ASSERT_EQ(metadata->Get(kFieldIdKey), std::to_string(field_id));
+  ASSERT_TRUE(metadata->Contains(kParquetFieldIdKey));
+  ASSERT_EQ(metadata->Get(kParquetFieldIdKey), std::to_string(field_id));
 }
 
 }  // namespace
@@ -241,7 +242,7 @@ TEST_P(FromArrowSchemaTest, PrimitiveType) {
   auto metadata =
       ::arrow::key_value_metadata(std::unordered_map<std::string, std::string>{
-          {std::string(kFieldIdKey), std::to_string(kFieldId)}});
+          {std::string(kParquetFieldIdKey), std::to_string(kFieldId)}});
   auto arrow_schema = ::arrow::schema({::arrow::field(
       std::string(kFieldName), param.arrow_type, param.optional, std::move(metadata))});
   ArrowSchema exported_schema;
@@ -309,16 +310,16 @@ TEST(FromArrowSchemaTest, StructType) {
   auto int_field = ::arrow::field(
       std::string(kIntFieldName), ::arrow::int32(), /*nullable=*/false,
       ::arrow::key_value_metadata(std::unordered_map<std::string, std::string>{
-          {std::string(kFieldIdKey), std::to_string(kIntFieldId)}}));
+          {std::string(kParquetFieldIdKey), std::to_string(kIntFieldId)}}));
   auto str_field = ::arrow::field(
       std::string(kStrFieldName), ::arrow::utf8(), /*nullable=*/true,
       ::arrow::key_value_metadata(std::unordered_map<std::string, std::string>{
-          {std::string(kFieldIdKey), std::to_string(kStrFieldId)}}));
+          {std::string(kParquetFieldIdKey), std::to_string(kStrFieldId)}}));
   auto struct_type = ::arrow::struct_({int_field, str_field});
   auto struct_field = ::arrow::field(
       std::string(kStructFieldName), struct_type, /*nullable=*/false,
       ::arrow::key_value_metadata(std::unordered_map<std::string, std::string>{
-          {std::string(kFieldIdKey), std::to_string(kStructFieldId)}}));
+          {std::string(kParquetFieldIdKey), std::to_string(kStructFieldId)}}));
   auto arrow_schema = ::arrow::schema({struct_field});
   ArrowSchema exported_schema;
   ASSERT_TRUE(::arrow::ExportSchema(*arrow_schema, &exported_schema).ok());
@@ -363,12 +364,12 @@ TEST(FromArrowSchemaTest, ListType) {
   auto element_field = ::arrow::field(
       std::string(kElemFieldName), ::arrow::int64(), /*nullable=*/true,
       ::arrow::key_value_metadata(std::unordered_map<std::string, std::string>{
-          {std::string(kFieldIdKey), std::to_string(kElemFieldId)}}));
+          {std::string(kParquetFieldIdKey), std::to_string(kElemFieldId)}}));
   auto list_type = ::arrow::list(element_field);
   auto list_field = ::arrow::field(
       std::string(kListFieldName), list_type, /*nullable=*/false,
       ::arrow::key_value_metadata(std::unordered_map<std::string, std::string>{
-          {std::string(kFieldIdKey), std::to_string(kListFieldId)}}));
+          {std::string(kParquetFieldIdKey), std::to_string(kListFieldId)}}));
 
   auto arrow_schema = ::arrow::schema({list_field});
   ArrowSchema exported_schema;
@@ -410,16 +411,16 @@ TEST(FromArrowSchemaTest, MapType) {
   auto key_field = ::arrow::field(
       std::string(kKeyFieldName), ::arrow::utf8(), /*nullable=*/false,
       ::arrow::key_value_metadata(std::unordered_map<std::string, std::string>{
-          {std::string(kFieldIdKey), std::to_string(kKeyFieldId)}}));
+          {std::string(kParquetFieldIdKey), std::to_string(kKeyFieldId)}}));
   auto value_field = ::arrow::field(
       std::string(kValueFieldName), ::arrow::int32(), /*nullable=*/true,
       ::arrow::key_value_metadata(std::unordered_map<std::string, std::string>{
-          {std::string(kFieldIdKey), std::to_string(kValueFieldId)}}));
+          {std::string(kParquetFieldIdKey), std::to_string(kValueFieldId)}}));
   auto map_type = std::make_shared<::arrow::MapType>(key_field, value_field);
   auto map_field = ::arrow::field(
       std::string(kMapFieldName), map_type, /*nullable=*/true,
       ::arrow::key_value_metadata(std::unordered_map<std::string, std::string>{
-          {std::string(kFieldIdKey), std::to_string(kFieldId)}}));
+          {std::string(kParquetFieldIdKey), std::to_string(kFieldId)}}));
 
   auto arrow_schema = ::arrow::schema({map_field});
   ArrowSchema exported_schema;
diff --git a/test/parquet_test.cc b/test/parquet_test.cc
index 8a2679b4e..ada6d73ff 100644
--- a/test/parquet_test.cc
+++ b/test/parquet_test.cc
@@ -26,20 +26,19 @@
 #include
 #include
 #include
-#include
-#include
-#include
-#include
-#include
-#include
 #include
 #include
 #include
 
 #include "iceberg/arrow/arrow_error_transform_internal.h"
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/file_writer.h"
 #include "iceberg/parquet/parquet_register.h"
+#include "iceberg/result.h"
 #include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
 #include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
 #include "iceberg/util/macros.h"
@@ -51,15 +50,11 @@ namespace {
 
 Status WriteTable(std::shared_ptr<::arrow::Array> data,
                   const WriterOptions& writer_options) {
-  ArrowArray arr;
-  ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr));
-
   ICEBERG_ASSIGN_OR_RAISE(
       auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet, writer_options));
-  {
-    internal::ArrowArrayGuard guard(&arr);
-    ICEBERG_RETURN_UNEXPECTED(writer->Write(arr));
-  }
+  ArrowArray arr;
+  ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr));
+  ICEBERG_RETURN_UNEXPECTED(writer->Write(arr));
   return writer->Close();
 }
 
@@ -323,13 +318,10 @@ TEST_F(ParquetReadWrite, SimpleStructRoundTrip) {
   ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
   auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
 
-  auto array =
-      ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()),
-                                         R"([[{"a1": 1, "a2": "abc"}],
-                                             [{"a1": 0}],
-                                             [{"a2": "edf"}],
-                                             [{}]])")
-          .ValueOrDie();
+  auto array = ::arrow::json::ArrayFromJSONString(
+                   ::arrow::struct_(arrow_schema->fields()),
+                   R"([[{"a1": 1, "a2": "abc"}], [{"a1": 0}], [{"a2": "edf"}], [{}]])")
+                   .ValueOrDie();
 
   std::shared_ptr<::arrow::Array> out;
   DoRoundtrip(array, schema, &out);
@@ -353,12 +345,11 @@ TEST_F(ParquetReadWrite, SimpleTypeRoundTrip) {
   ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
   auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
 
-  auto array = ::arrow::json::ArrayFromJSONString(
-                   ::arrow::struct_(arrow_schema->fields()),
-                   R"([[true, 1, 2, 1.1, 1.2, "abc", 44614000, 1756696503000000],
-                       [false, 0, 0, 0, 0, "", 0, 0],
-                       [null, null, null, null, null, null, null, null]])")
-                   .ValueOrDie();
+  auto array =
+      ::arrow::json::ArrayFromJSONString(
+          ::arrow::struct_(arrow_schema->fields()),
+          R"([[true, 1, 2, 1.1, 1.2, "abc", 44614000, 1756696503000000], [false, 0, 0, 0, 0, "", 0, 0], [null, null, null, null, null, null, null, null]])")
+          .ValueOrDie();

From 5e630c4efc06fea630a4911c4f1c15587d3ba8fa Mon Sep 17 00:00:00 2001
From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com>
Date: Mon, 1 Sep 2025 19:38:16 +0800
Subject: [PATCH 4/9] implement length() and split_offsets()

---
 src/iceberg/parquet/parquet_writer.cc | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc
index afd4e0a11..db1846e10 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -90,14 +90,25 @@ class ParquetWriter::Impl {
       return {};  // Already closed
     }
 
+    auto& metadata = writer_->metadata();
+    split_offsets_.reserve(metadata->num_row_groups());
+    for (int i = 0; i < metadata->num_row_groups(); ++i) {
+      split_offsets_.push_back(metadata->RowGroup(i)->file_offset());
+    }
     ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
     writer_.reset();
+
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell());
     ICEBERG_ARROW_RETURN_NOT_OK(output_stream_->Close());
     return {};
   }
 
   bool Closed() const { return writer_ == nullptr; }
 
+  int64_t length() const { return total_bytes_; }
+
+  std::vector<int64_t> split_offsets() const { return split_offsets_; }
+
  private:
   // TODO(gangwu): make memory pool configurable
   ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
@@ -107,6 +118,10 @@ class ParquetWriter::Impl {
   std::shared_ptr<::arrow::io::OutputStream> output_stream_;
   // Parquet file writer to write ArrowArray.
   std::unique_ptr<::parquet::arrow::FileWriter> writer_;
+  // Total length of the written Parquet file.
+  int64_t total_bytes_;
+  // Row group start offsets in the Parquet file.
+  std::vector<int64_t> split_offsets_;
 };
 
 ParquetWriter::~ParquetWriter() = default;
 
@@ -131,14 +146,14 @@ std::optional<int64_t> ParquetWriter::length() {
   if (!impl_->Closed()) {
     return std::nullopt;
   }
-  return {};
+  return impl_->length();
 }
 
 std::vector<int64_t> ParquetWriter::split_offsets() {
   if (!impl_->Closed()) {
     return {};
   }
-  return {};
+  return impl_->split_offsets();
 }
 
 void RegisterWriter() {
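
split_offsets() exposes the starting file offset of each row group, which Iceberg records in data file metadata as candidate split points for scan planning. There is an ordering subtlety in the hunk above: it reads RowGroup(i)->file_offset() before writer_->Close() has written the footer, and the very next patch flips that order. The reliable sequence is:

    // Collect row-group offsets only after the footer is flushed (see PATCH 5/9).
    ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
    auto& metadata = writer_->metadata();
    split_offsets_.reserve(metadata->num_row_groups());
    for (int i = 0; i < metadata->num_row_groups(); ++i) {
      split_offsets_.push_back(metadata->RowGroup(i)->file_offset());
    }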
From e7b2bbf062e3b41830c3d8744ec3d82a6e40760b Mon Sep 17 00:00:00 2001
From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com>
Date: Mon, 1 Sep 2025 19:56:08 +0800
Subject: [PATCH 5/9] fix length() and pass length() to reader_options

---
 src/iceberg/parquet/parquet_writer.cc |  2 +-
 test/parquet_test.cc                  | 28 ++++++++++++++++++---------
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc
index db1846e10..c782a3d6e 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -90,12 +90,12 @@ class ParquetWriter::Impl {
       return {};  // Already closed
     }
 
+    ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
     auto& metadata = writer_->metadata();
     split_offsets_.reserve(metadata->num_row_groups());
     for (int i = 0; i < metadata->num_row_groups(); ++i) {
       split_offsets_.push_back(metadata->RowGroup(i)->file_offset());
     }
-    ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close());
     writer_.reset();
 
     ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell());
diff --git a/test/parquet_test.cc b/test/parquet_test.cc
index ada6d73ff..b7f966452 100644
--- a/test/parquet_test.cc
+++ b/test/parquet_test.cc
@@ -48,14 +48,18 @@ namespace iceberg::parquet {
 
 namespace {
 
+Status WriteTableInner(Writer& writer, std::shared_ptr<::arrow::Array> data) {
+  ArrowArray arr;
+  ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr));
+  ICEBERG_RETURN_UNEXPECTED(writer.Write(arr));
+  return writer.Close();
+}
+
 Status WriteTable(std::shared_ptr<::arrow::Array> data,
                   const WriterOptions& writer_options) {
   ICEBERG_ASSIGN_OR_RAISE(
       auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet, writer_options));
-  ArrowArray arr;
-  ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr));
-  ICEBERG_RETURN_UNEXPECTED(writer->Write(arr));
-  return writer->Close();
+  return WriteTableInner(*writer, data);
 }
 
 Status ReadTable(std::optional<std::shared_ptr<::arrow::Array>>* out,
@@ -82,13 +86,19 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> s
   std::shared_ptr<FileIO> file_io = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
   const std::string basePath = "base.parquet";
 
-  ASSERT_THAT(WriteTable(data, {.path = basePath, .schema = schema, .io = file_io}),
-              IsOk());
+  auto writer_data = WriterFactoryRegistry::Open(
+      FileFormatType::kParquet, {.path = basePath, .schema = schema, .io = file_io});
+  ASSERT_THAT(writer_data, IsOk())
+      << "Failed to create writer: " << writer_data.error().message;
+  auto writer = std::move(writer_data.value());
+  ASSERT_THAT(WriteTableInner(*writer, data), IsOk());
 
   std::optional<std::shared_ptr<::arrow::Array>> out_array;
-  ASSERT_THAT(
-      ReadTable(&out_array, {.path = basePath, .io = file_io, .projection = schema}),
-      IsOk());
+  ASSERT_THAT(ReadTable(&out_array, {.path = basePath,
+                                     .length = writer->length(),
+                                     .io = file_io,
+                                     .projection = schema}),
+              IsOk());
 
   ASSERT_TRUE(out_array.has_value()) << "Reader.Next() returned no data";
   *out = std::move(out_array.value());
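
Forwarding writer->length() into ReaderOptions is what makes the in-memory round trip robust: the writer already knows the exact byte count after Close(), so the reader does not need a separate file-size lookup against the mock filesystem. Schematically (assumed shape of ReaderOptions, matching the designated initializers used in the tests):

    // Assumed flow: the post-Close() length feeds the reader directly.
    auto length = writer->length();  // std::optional<int64_t>, engaged after Close()
    ReaderOptions options{.path = path,
                          .length = length,  // skips a separate file-size lookup
                          .io = io,
                          .projection = schema};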
From 02a05ccb0d20e2c982e15a3285b4a6c898ceff39 Mon Sep 17 00:00:00 2001
From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com>
Date: Tue, 2 Sep 2025 10:27:53 +0800
Subject: [PATCH 6/9] fix review

---
 test/parquet_test.cc | 24 +++++++++++-------------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/test/parquet_test.cc b/test/parquet_test.cc
index b7f966452..923a9fedc 100644
--- a/test/parquet_test.cc
+++ b/test/parquet_test.cc
@@ -62,27 +62,27 @@ Status WriteTable(std::shared_ptr<::arrow::Array> data,
   return WriteTableInner(*writer, data);
 }
 
-Status ReadTable(std::optional<std::shared_ptr<::arrow::Array>>* out,
+Status ReadTable(std::shared_ptr<::arrow::Array>& out,
                  const ReaderOptions& reader_options) {
   ICEBERG_ASSIGN_OR_RAISE(
       auto reader, ReaderFactoryRegistry::Open(FileFormatType::kParquet, reader_options));
   ICEBERG_ASSIGN_OR_RAISE(auto read_data, reader->Next());
 
   if (!read_data.has_value()) {
-    *out = std::nullopt;
+    out = nullptr;
     return {};
   }
   auto arrow_c_array = read_data.value();
 
   ArrowSchema arrow_schema;
   ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*reader_options.projection, &arrow_schema));
-  ICEBERG_ARROW_ASSIGN_OR_RETURN(*out,
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(out,
                                  ::arrow::ImportArray(&arrow_c_array, &arrow_schema));
   return {};
 }
 
 void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> schema,
-                 std::shared_ptr<::arrow::Array>* out) {
+                 std::shared_ptr<::arrow::Array>& out) {
   std::shared_ptr<FileIO> file_io = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
   const std::string basePath = "base.parquet";
 
@@ -93,15 +93,13 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> s
   auto writer = std::move(writer_data.value());
   ASSERT_THAT(WriteTableInner(*writer, data), IsOk());
 
-  std::optional<std::shared_ptr<::arrow::Array>> out_array;
-  ASSERT_THAT(ReadTable(&out_array, {.path = basePath,
-                                     .length = writer->length(),
-                                     .io = file_io,
-                                     .projection = schema}),
+  ASSERT_THAT(ReadTable(out, {.path = basePath,
+                              .length = writer->length(),
+                              .io = file_io,
+                              .projection = schema}),
               IsOk());
 
-  ASSERT_TRUE(out_array.has_value()) << "Reader.Next() returned no data";
-  *out = std::move(out_array.value());
+  ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data";
 }
 
 }  // namespace
@@ -334,7 +332,7 @@ TEST_F(ParquetReadWrite, SimpleStructRoundTrip) {
   std::shared_ptr<::arrow::Array> out;
-  DoRoundtrip(array, schema, &out);
+  DoRoundtrip(array, schema, out);
 
   ASSERT_TRUE(out->Equals(*array));
 }
@@ -362,7 +360,7 @@ TEST_F(ParquetReadWrite, SimpleTypeRoundTrip) {
   std::shared_ptr<::arrow::Array> out;
-  DoRoundtrip(array, schema, &out);
+  DoRoundtrip(array, schema, out);
 
   ASSERT_TRUE(out->Equals(*array));
 }

From 117b480764b710d3b4bf96fe436dfc4f4ddeefd8 Mon Sep 17 00:00:00 2001
From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com>
Date: Tue, 2 Sep 2025 11:21:34 +0800
Subject: [PATCH 7/9] fix review

---
 src/iceberg/arrow/arrow_error_transform_internal.h | 2 ++
 test/parquet_test.cc                               | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/iceberg/arrow/arrow_error_transform_internal.h b/src/iceberg/arrow/arrow_error_transform_internal.h
index cf64892f5..a19b2f992 100644
--- a/src/iceberg/arrow/arrow_error_transform_internal.h
+++ b/src/iceberg/arrow/arrow_error_transform_internal.h
@@ -30,6 +30,8 @@ inline ErrorKind ToErrorKind(const ::arrow::Status& status) {
   switch (status.code()) {
     case ::arrow::StatusCode::IOError:
       return ErrorKind::kIOError;
+    case ::arrow::StatusCode::NotImplemented:
+      return ErrorKind::kNotImplemented;
     default:
       return ErrorKind::kUnknownError;
   }
diff --git a/test/parquet_test.cc b/test/parquet_test.cc
index 923a9fedc..3b8e06a83 100644
--- a/test/parquet_test.cc
+++ b/test/parquet_test.cc
@@ -310,7 +310,7 @@
TEST_F(ParquetReadWrite, EmptyStruct) { const std::string basePath = "base.parquet"; ASSERT_THAT(WriteTable(array, {.path = basePath, .schema = schema, .io = file_io}), - IsError(ErrorKind::kUnknownError)); + IsError(ErrorKind::kNotImplemented)); } TEST_F(ParquetReadWrite, SimpleStructRoundTrip) { From a036f8258337daa60bae13464f63f42af3ad6c14 Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Tue, 2 Sep 2025 18:10:28 +0800 Subject: [PATCH 8/9] fix review --- src/iceberg/parquet/parquet_writer.cc | 9 ++++---- test/parquet_test.cc | 30 +++++++++++++-------------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc index c782a3d6e..37e25a0aa 100644 --- a/src/iceberg/parquet/parquet_writer.cc +++ b/src/iceberg/parquet/parquet_writer.cc @@ -67,10 +67,11 @@ class ParquetWriter::Impl { schema_descriptor->schema_root()); ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options)); - auto file_writer = ::parquet::ParquetFileWriter::Open(output_stream_, schema_node, - writer_properties); - ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileWriter::Make( - pool_, std::move(file_writer), arrow_schema_, arrow_writer_properties, &writer_)); + auto file_writer = ::parquet::ParquetFileWriter::Open( + output_stream_, std::move(schema_node), std::move(writer_properties)); + ICEBERG_ARROW_RETURN_NOT_OK( + ::parquet::arrow::FileWriter::Make(pool_, std::move(file_writer), arrow_schema_, + std::move(arrow_writer_properties), &writer_)); return {}; } diff --git a/test/parquet_test.cc b/test/parquet_test.cc index 3b8e06a83..c7158508a 100644 --- a/test/parquet_test.cc +++ b/test/parquet_test.cc @@ -48,21 +48,21 @@ namespace iceberg::parquet { namespace { -Status WriteTableInner(Writer& writer, std::shared_ptr<::arrow::Array> data) { +Status WriteArrayInner(Writer& writer, std::shared_ptr<::arrow::Array> data) { ArrowArray arr; ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr)); ICEBERG_RETURN_UNEXPECTED(writer.Write(arr)); return writer.Close(); } -Status WriteTable(std::shared_ptr<::arrow::Array> data, +Status WriteArray(std::shared_ptr<::arrow::Array> data, const WriterOptions& writer_options) { ICEBERG_ASSIGN_OR_RAISE( auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet, writer_options)); - return WriteTableInner(*writer, data); + return WriteArrayInner(*writer, data); } -Status ReadTable(std::shared_ptr<::arrow::Array>& out, +Status ReadArray(std::shared_ptr<::arrow::Array>& out, const ReaderOptions& reader_options) { ICEBERG_ASSIGN_OR_RAISE( auto reader, ReaderFactoryRegistry::Open(FileFormatType::kParquet, reader_options)); @@ -74,8 +74,7 @@ Status ReadTable(std::shared_ptr<::arrow::Array>& out, } auto arrow_c_array = read_data.value(); - ArrowSchema arrow_schema; - ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*reader_options.projection, &arrow_schema)); + ICEBERG_ASSIGN_OR_RAISE(ArrowSchema arrow_schema, reader->Schema()); ICEBERG_ARROW_ASSIGN_OR_RETURN(out, ::arrow::ImportArray(&arrow_c_array, &arrow_schema)); return {}; @@ -91,9 +90,9 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr s ASSERT_THAT(writer_data, IsOk()) << "Failed to create writer: " << writer_data.error().message; auto writer = std::move(writer_data.value()); - ASSERT_THAT(WriteTableInner(*writer, data), IsOk()); + ASSERT_THAT(WriteArrayInner(*writer, data), IsOk()); - ASSERT_THAT(ReadTable(out, {.path = basePath, + ASSERT_THAT(ReadArray(out, {.path = basePath, 
.length = writer->length(), .io = file_io, .projection = schema}), @@ -127,7 +126,7 @@ class ParquetReaderTest : public ::testing::Test { R"([[1, "Foo"],[2, "Bar"],[3, "Baz"]])") .ValueOrDie(); - ASSERT_TRUE(WriteTable( + ASSERT_TRUE(WriteArray( array, {.path = temp_parquet_file_, .schema = schema, .io = file_io_})); } @@ -309,7 +308,7 @@ TEST_F(ParquetReadWrite, EmptyStruct) { std::shared_ptr file_io = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); const std::string basePath = "base.parquet"; - ASSERT_THAT(WriteTable(array, {.path = basePath, .schema = schema, .io = file_io}), + ASSERT_THAT(WriteArray(array, {.path = basePath, .schema = schema, .io = file_io}), IsError(ErrorKind::kNotImplemented)); } @@ -353,11 +352,12 @@ TEST_F(ParquetReadWrite, SimpleTypeRoundTrip) { ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); - auto array = - ::arrow::json::ArrayFromJSONString( - ::arrow::struct_(arrow_schema->fields()), - R"([[true, 1, 2, 1.1, 1.2, "abc", 44614000, 1756696503000000], [false, 0, 0, 0, 0, "", 0, 0], [null, null, null, null, null, null, null, null]])") - .ValueOrDie(); + auto array = ::arrow::json::ArrayFromJSONString( + ::arrow::struct_(arrow_schema->fields()), + R"([[true, 1, 2, 1.1, 1.2, "abc", 44614000, 1756696503000000], + [false, 0, 0, 0, 0, "", 0, 0], + [null, null, null, null, null, null, null, null]])") + .ValueOrDie(); std::shared_ptr<::arrow::Array> out; DoRoundtrip(array, schema, out); From 30d1559532d754e24a6d8a8f0b0c650312cf479c Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Wed, 3 Sep 2025 13:33:10 +0800 Subject: [PATCH 9/9] fix review --- src/iceberg/parquet/parquet_writer.cc | 2 +- test/parquet_test.cc | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc index 37e25a0aa..2d2cd5406 100644 --- a/src/iceberg/parquet/parquet_writer.cc +++ b/src/iceberg/parquet/parquet_writer.cc @@ -120,7 +120,7 @@ class ParquetWriter::Impl { // Parquet file writer to write ArrowArray. std::unique_ptr<::parquet::arrow::FileWriter> writer_; // Total length of the written Parquet file. - int64_t total_bytes_; + int64_t total_bytes_{0}; // Row group start offsets in the Parquet file. 
   std::vector<int64_t> split_offsets_;
 };
 
diff --git a/test/parquet_test.cc b/test/parquet_test.cc
index c7158508a..0c42b8463 100644
--- a/test/parquet_test.cc
+++ b/test/parquet_test.cc
@@ -48,7 +48,7 @@ namespace iceberg::parquet {
 
 namespace {
 
-Status WriteArrayInner(Writer& writer, std::shared_ptr<::arrow::Array> data) {
+Status WriteArray(std::shared_ptr<::arrow::Array> data, Writer& writer) {
   ArrowArray arr;
   ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr));
   ICEBERG_RETURN_UNEXPECTED(writer.Write(arr));
   return writer.Close();
@@ -59,7 +59,7 @@ Status WriteArray(std::shared_ptr<::arrow::Array> data,
                   const WriterOptions& writer_options) {
   ICEBERG_ASSIGN_OR_RAISE(
       auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet, writer_options));
-  return WriteArrayInner(*writer, data);
+  return WriteArray(data, *writer);
 }
 
 Status ReadArray(std::shared_ptr<::arrow::Array>& out,
@@ -90,7 +90,7 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> s
   ASSERT_THAT(writer_data, IsOk())
       << "Failed to create writer: " << writer_data.error().message;
   auto writer = std::move(writer_data.value());
-  ASSERT_THAT(WriteArrayInner(*writer, data), IsOk());
+  ASSERT_THAT(WriteArray(data, *writer), IsOk());
 
   ASSERT_THAT(ReadArray(out, {.path = basePath,
                               .length = writer->length(),