From 657f22f0601f1ad7bc1f80fb9a68681906eccb4d Mon Sep 17 00:00:00 2001 From: Xinli Shang Date: Sun, 28 Dec 2025 13:17:55 -0800 Subject: [PATCH 1/6] feat: add FileWriter base interface for data file writers Add iceberg/data subdirectory with FileWriter base interface that defines common operations for writing Iceberg data files, including data files, equality delete files, and position delete files. - Add FileWriter interface with Write, Length, Close, and Metadata methods - Add WriteResult struct to hold metadata for produced files - Add comprehensive unit tests with MockFileWriter implementation - Update build system to include new data subdirectory Related to #441 task 1 --- src/iceberg/CMakeLists.txt | 2 + src/iceberg/data/CMakeLists.txt | 18 +++ src/iceberg/data/writer.cc | 27 ++++ src/iceberg/data/writer.h | 91 +++++++++++ src/iceberg/test/CMakeLists.txt | 2 + src/iceberg/test/data_writer_test.cc | 218 +++++++++++++++++++++++++++ 6 files changed, 358 insertions(+) create mode 100644 src/iceberg/data/CMakeLists.txt create mode 100644 src/iceberg/data/writer.cc create mode 100644 src/iceberg/data/writer.h create mode 100644 src/iceberg/test/data_writer_test.cc diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 519757d26..a65bb5b37 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -20,6 +20,7 @@ set(ICEBERG_INCLUDES "$" set(ICEBERG_SOURCES arrow_c_data_guard_internal.cc catalog/memory/in_memory_catalog.cc + data/writer.cc expression/aggregate.cc expression/binder.cc expression/evaluator.cc @@ -142,6 +143,7 @@ add_iceberg_lib(iceberg iceberg_install_all_headers(iceberg) add_subdirectory(catalog) +add_subdirectory(data) add_subdirectory(expression) add_subdirectory(manifest) add_subdirectory(row) diff --git a/src/iceberg/data/CMakeLists.txt b/src/iceberg/data/CMakeLists.txt new file mode 100644 index 000000000..e50b8b989 --- /dev/null +++ b/src/iceberg/data/CMakeLists.txt @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +iceberg_install_all_headers(iceberg/data) diff --git a/src/iceberg/data/writer.cc b/src/iceberg/data/writer.cc new file mode 100644 index 000000000..4a18c8053 --- /dev/null +++ b/src/iceberg/data/writer.cc @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/data/writer.h" + +namespace iceberg { + +// FileWriter is a pure virtual interface class. +// Implementations will be provided in subsequent tasks. + +} // namespace iceberg diff --git a/src/iceberg/data/writer.h b/src/iceberg/data/writer.h new file mode 100644 index 000000000..8b18eeafe --- /dev/null +++ b/src/iceberg/data/writer.h @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/data/writer.h +/// Base interface for Iceberg data file writers. + +#include +#include +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Base interface for data file writers. +/// +/// This interface defines the common operations for writing Iceberg data files, +/// including data files, equality delete files, and position delete files. +/// +/// Typical usage: +/// 1. Create a writer instance (via concrete implementation) +/// 2. Call Write() one or more times to write data +/// 3. Call Close() to finalize the file +/// 4. Call Metadata() to get file metadata (only valid after Close()) +/// +/// \note This interface is not thread-safe. Concurrent calls to Write() +/// from multiple threads on the same instance are not supported. +/// +/// \note This interface uses PascalCase method naming (Write, Length, Close, Metadata) +/// to distinguish it from the lower-level iceberg/file_writer.h::Writer interface which +/// uses lowercase naming. FileWriter is the Iceberg-specific data file writer abstraction, +/// while Writer is the file format-level abstraction. +class ICEBERG_EXPORT FileWriter { + public: + virtual ~FileWriter() = default; + + /// \brief Write a batch of records. + /// + /// \param data Arrow array containing the records to write. + /// \return Status indicating success or failure. + virtual Status Write(ArrowArray* data) = 0; + + /// \brief Get the current number of bytes written. + /// + /// \return Result containing the number of bytes written or an error. + virtual Result Length() const = 0; + + /// \brief Close the writer and finalize the file. + /// + /// \return Status indicating success or failure. + virtual Status Close() = 0; + + /// \brief File metadata for all files produced by the writer. + struct ICEBERG_EXPORT WriteResult { + /// Usually a writer produces a single data or delete file. + /// Position delete writer may produce multiple file-scoped delete files. + /// In the future, multiple files can be produced if file rolling is supported. + std::vector> data_files; + }; + + /// \brief Get file metadata for all files produced by this writer. + /// + /// This method should be called after Close() to retrieve the metadata + /// for all files written by this writer. + /// + /// \return Result containing the write result or an error. + virtual Result Metadata() = 0; +}; + +} // namespace iceberg diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 4b2c0f473..14dd808b3 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -113,6 +113,8 @@ add_iceberg_test(util_test add_iceberg_test(roaring_test SOURCES roaring_test.cc) +add_iceberg_test(data_writer_test SOURCES data_writer_test.cc) + if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(avro_test USE_BUNDLE diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc new file mode 100644 index 000000000..6df8929fa --- /dev/null +++ b/src/iceberg/test/data_writer_test.cc @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/data/writer.h" + +#include +#include + +#include +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/result.h" +#include "iceberg/test/matchers.h" + +namespace iceberg { + +// Mock implementation of FileWriter for testing +class MockFileWriter : public FileWriter { + public: + MockFileWriter() : bytes_written_(0), is_closed_(false), write_count_(0) {} + + Status Write(ArrowArray* data) override { + if (is_closed_) { + return Invalid("Writer is closed"); + } + if (data == nullptr) { + return Invalid("Null data provided"); + } + write_count_++; + // Simulate writing some bytes + bytes_written_ += 1024; + return {}; + } + + Result Length() const override { return bytes_written_; } + + Status Close() override { + if (is_closed_) { + return Invalid("Writer already closed"); + } + is_closed_ = true; + return {}; + } + + Result Metadata() override { + if (!is_closed_) { + return Invalid("Writer must be closed before getting metadata"); + } + + WriteResult result; + auto data_file = std::make_shared(); + data_file->file_path = "/test/data/file.parquet"; + data_file->file_format = FileFormatType::kParquet; + data_file->record_count = write_count_ * 100; + data_file->file_size_in_bytes = bytes_written_; + result.data_files.push_back(data_file); + + return result; + } + + bool is_closed() const { return is_closed_; } + int32_t write_count() const { return write_count_; } + + private: + int64_t bytes_written_; + bool is_closed_; + int32_t write_count_; +}; + +TEST(FileWriterTest, BasicWriteOperation) { + MockFileWriter writer; + + // Create a dummy ArrowArray (normally this would contain actual data) + ArrowArray dummy_array = {}; + + ASSERT_THAT(writer.Write(&dummy_array), IsOk()); + ASSERT_EQ(writer.write_count(), 1); + + auto length_result = writer.Length(); + ASSERT_THAT(length_result, IsOk()); + ASSERT_EQ(*length_result, 1024); +} + +TEST(FileWriterTest, MultipleWrites) { + MockFileWriter writer; + ArrowArray dummy_array = {}; + + // Write multiple times + for (int i = 0; i < 5; i++) { + ASSERT_THAT(writer.Write(&dummy_array), IsOk()); + } + + ASSERT_EQ(writer.write_count(), 5); + + auto length_result = writer.Length(); + ASSERT_THAT(length_result, IsOk()); + ASSERT_EQ(*length_result, 5120); // 5 * 1024 +} + +TEST(FileWriterTest, WriteNullData) { + MockFileWriter writer; + + auto status = writer.Write(nullptr); + ASSERT_THAT(status, HasErrorMessage("Null data provided")); +} + +TEST(FileWriterTest, CloseWriter) { + MockFileWriter writer; + ArrowArray dummy_array = {}; + + ASSERT_THAT(writer.Write(&dummy_array), IsOk()); + ASSERT_FALSE(writer.is_closed()); + + ASSERT_THAT(writer.Close(), IsOk()); + ASSERT_TRUE(writer.is_closed()); +} + +TEST(FileWriterTest, DoubleClose) { + MockFileWriter writer; + + ASSERT_THAT(writer.Close(), IsOk()); + auto status = writer.Close(); + ASSERT_THAT(status, HasErrorMessage("Writer already closed")); +} + +TEST(FileWriterTest, WriteAfterClose) { + MockFileWriter writer; + ArrowArray dummy_array = {}; + + ASSERT_THAT(writer.Close(), IsOk()); + + auto status = writer.Write(&dummy_array); + ASSERT_THAT(status, HasErrorMessage("Writer is closed")); +} + +TEST(FileWriterTest, MetadataBeforeClose) { + MockFileWriter writer; + ArrowArray dummy_array = {}; + + ASSERT_THAT(writer.Write(&dummy_array), IsOk()); + + auto metadata_result = writer.Metadata(); + ASSERT_THAT(metadata_result, HasErrorMessage("Writer must be closed before getting metadata")); +} + +TEST(FileWriterTest, MetadataAfterClose) { + MockFileWriter writer; + ArrowArray dummy_array = {}; + + // Write some data + ASSERT_THAT(writer.Write(&dummy_array), IsOk()); + ASSERT_THAT(writer.Write(&dummy_array), IsOk()); + ASSERT_THAT(writer.Write(&dummy_array), IsOk()); + + // Close the writer + ASSERT_THAT(writer.Close(), IsOk()); + + // Get metadata + auto metadata_result = writer.Metadata(); + ASSERT_THAT(metadata_result, IsOk()); + + const auto& result = *metadata_result; + ASSERT_EQ(result.data_files.size(), 1); + + const auto& data_file = result.data_files[0]; + ASSERT_EQ(data_file->file_path, "/test/data/file.parquet"); + ASSERT_EQ(data_file->file_format, FileFormatType::kParquet); + ASSERT_EQ(data_file->record_count, 300); // 3 writes * 100 records + ASSERT_EQ(data_file->file_size_in_bytes, 3072); // 3 * 1024 +} + +TEST(FileWriterTest, WriteResultStructure) { + FileWriter::WriteResult result; + + // Test that WriteResult can hold multiple data files + auto data_file1 = std::make_shared(); + data_file1->file_path = "/test/data/file1.parquet"; + data_file1->record_count = 100; + + auto data_file2 = std::make_shared(); + data_file2->file_path = "/test/data/file2.parquet"; + data_file2->record_count = 200; + + result.data_files.push_back(data_file1); + result.data_files.push_back(data_file2); + + ASSERT_EQ(result.data_files.size(), 2); + ASSERT_EQ(result.data_files[0]->file_path, "/test/data/file1.parquet"); + ASSERT_EQ(result.data_files[0]->record_count, 100); + ASSERT_EQ(result.data_files[1]->file_path, "/test/data/file2.parquet"); + ASSERT_EQ(result.data_files[1]->record_count, 200); +} + +TEST(FileWriterTest, EmptyWriteResult) { + FileWriter::WriteResult result; + ASSERT_EQ(result.data_files.size(), 0); + ASSERT_TRUE(result.data_files.empty()); +} + +} // namespace iceberg From 5cae02953c26c9315cd1f9acfbe6f5c4eef43dec Mon Sep 17 00:00:00 2001 From: Xinli Shang Date: Sun, 28 Dec 2025 13:38:20 -0800 Subject: [PATCH 2/6] fix: apply clang-format and use default member initializers - Use default member initializers in MockFileWriter - Fix include ordering per clang-format - Fix line wrapping in comments and assertions --- src/iceberg/data/writer.h | 4 ++-- src/iceberg/test/data_writer_test.cc | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/iceberg/data/writer.h b/src/iceberg/data/writer.h index 8b18eeafe..9d1942fde 100644 --- a/src/iceberg/data/writer.h +++ b/src/iceberg/data/writer.h @@ -49,8 +49,8 @@ namespace iceberg { /// /// \note This interface uses PascalCase method naming (Write, Length, Close, Metadata) /// to distinguish it from the lower-level iceberg/file_writer.h::Writer interface which -/// uses lowercase naming. FileWriter is the Iceberg-specific data file writer abstraction, -/// while Writer is the file format-level abstraction. +/// uses lowercase naming. FileWriter is the Iceberg-specific data file writer +/// abstraction, while Writer is the file format-level abstraction. class ICEBERG_EXPORT FileWriter { public: virtual ~FileWriter() = default; diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc index 6df8929fa..df7ea9d89 100644 --- a/src/iceberg/test/data_writer_test.cc +++ b/src/iceberg/test/data_writer_test.cc @@ -17,8 +17,6 @@ * under the License. */ -#include "iceberg/data/writer.h" - #include #include @@ -26,6 +24,7 @@ #include #include "iceberg/arrow_c_data.h" +#include "iceberg/data/writer.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/result.h" #include "iceberg/test/matchers.h" @@ -35,7 +34,7 @@ namespace iceberg { // Mock implementation of FileWriter for testing class MockFileWriter : public FileWriter { public: - MockFileWriter() : bytes_written_(0), is_closed_(false), write_count_(0) {} + MockFileWriter() = default; Status Write(ArrowArray* data) override { if (is_closed_) { @@ -80,9 +79,9 @@ class MockFileWriter : public FileWriter { int32_t write_count() const { return write_count_; } private: - int64_t bytes_written_; - bool is_closed_; - int32_t write_count_; + int64_t bytes_written_ = 0; + bool is_closed_ = false; + int32_t write_count_ = 0; }; TEST(FileWriterTest, BasicWriteOperation) { @@ -158,7 +157,8 @@ TEST(FileWriterTest, MetadataBeforeClose) { ASSERT_THAT(writer.Write(&dummy_array), IsOk()); auto metadata_result = writer.Metadata(); - ASSERT_THAT(metadata_result, HasErrorMessage("Writer must be closed before getting metadata")); + ASSERT_THAT(metadata_result, + HasErrorMessage("Writer must be closed before getting metadata")); } TEST(FileWriterTest, MetadataAfterClose) { @@ -183,7 +183,7 @@ TEST(FileWriterTest, MetadataAfterClose) { const auto& data_file = result.data_files[0]; ASSERT_EQ(data_file->file_path, "/test/data/file.parquet"); ASSERT_EQ(data_file->file_format, FileFormatType::kParquet); - ASSERT_EQ(data_file->record_count, 300); // 3 writes * 100 records + ASSERT_EQ(data_file->record_count, 300); // 3 writes * 100 records ASSERT_EQ(data_file->file_size_in_bytes, 3072); // 3 * 1024 } From 994e7da3f9520ce23b8610bafddb66427ced6b4b Mon Sep 17 00:00:00 2001 From: Xinli Shang Date: Mon, 29 Dec 2025 10:06:36 -0800 Subject: [PATCH 3/6] Update src/iceberg/data/writer.h Co-authored-by: Gang Wu --- src/iceberg/data/writer.h | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/iceberg/data/writer.h b/src/iceberg/data/writer.h index 9d1942fde..9876f8cbf 100644 --- a/src/iceberg/data/writer.h +++ b/src/iceberg/data/writer.h @@ -46,11 +46,6 @@ namespace iceberg { /// /// \note This interface is not thread-safe. Concurrent calls to Write() /// from multiple threads on the same instance are not supported. -/// -/// \note This interface uses PascalCase method naming (Write, Length, Close, Metadata) -/// to distinguish it from the lower-level iceberg/file_writer.h::Writer interface which -/// uses lowercase naming. FileWriter is the Iceberg-specific data file writer -/// abstraction, while Writer is the file format-level abstraction. class ICEBERG_EXPORT FileWriter { public: virtual ~FileWriter() = default; From 0b5f88083e0c514659f957c741df56b0ba4a8300 Mon Sep 17 00:00:00 2001 From: Xinli Shang Date: Mon, 29 Dec 2025 10:11:12 -0800 Subject: [PATCH 4/6] build: move data_writer_test to ICEBERG_BUILD_BUNDLE block The test depends on Avro and Parquet libraries, so it should be built within the ICEBERG_BUILD_BUNDLE conditional block. --- src/iceberg/test/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 22907995f..8ae7cfd1e 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -113,8 +113,6 @@ add_iceberg_test(util_test add_iceberg_test(roaring_test SOURCES roaring_test.cc) -add_iceberg_test(data_writer_test SOURCES data_writer_test.cc) - if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(avro_test USE_BUNDLE @@ -167,6 +165,8 @@ if(ICEBERG_BUILD_BUNDLE) update_properties_test.cc update_sort_order_test.cc) + add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc) + endif() if(ICEBERG_BUILD_REST) From 1af767dfc5ca41bf1e23a0d0b759a5bb70c34be0 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 30 Dec 2025 15:10:17 +0800 Subject: [PATCH 5/6] refine includes --- src/iceberg/data/writer.cc | 3 +-- src/iceberg/data/writer.h | 8 ++------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/src/iceberg/data/writer.cc b/src/iceberg/data/writer.cc index 4a18c8053..65b172475 100644 --- a/src/iceberg/data/writer.cc +++ b/src/iceberg/data/writer.cc @@ -21,7 +21,6 @@ namespace iceberg { -// FileWriter is a pure virtual interface class. -// Implementations will be provided in subsequent tasks. +FileWriter::~FileWriter() = default; } // namespace iceberg diff --git a/src/iceberg/data/writer.h b/src/iceberg/data/writer.h index 9876f8cbf..fcfe7e90a 100644 --- a/src/iceberg/data/writer.h +++ b/src/iceberg/data/writer.h @@ -23,13 +23,12 @@ /// Base interface for Iceberg data file writers. #include -#include #include #include "iceberg/arrow_c_data.h" #include "iceberg/iceberg_export.h" -#include "iceberg/manifest/manifest_entry.h" #include "iceberg/result.h" +#include "iceberg/type_fwd.h" namespace iceberg { @@ -43,12 +42,9 @@ namespace iceberg { /// 2. Call Write() one or more times to write data /// 3. Call Close() to finalize the file /// 4. Call Metadata() to get file metadata (only valid after Close()) -/// -/// \note This interface is not thread-safe. Concurrent calls to Write() -/// from multiple threads on the same instance are not supported. class ICEBERG_EXPORT FileWriter { public: - virtual ~FileWriter() = default; + virtual ~FileWriter(); /// \brief Write a batch of records. /// From 41c7a4b8c825fb858528c3f57f9ef89e21a4d0e9 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 30 Dec 2025 15:27:34 +0800 Subject: [PATCH 6/6] Update src/iceberg/data/writer.h --- src/iceberg/data/writer.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/iceberg/data/writer.h b/src/iceberg/data/writer.h index fcfe7e90a..6c8400911 100644 --- a/src/iceberg/data/writer.h +++ b/src/iceberg/data/writer.h @@ -23,6 +23,7 @@ /// Base interface for Iceberg data file writers. #include +#include #include #include "iceberg/arrow_c_data.h"