Skip to content

Commit 18a85e2

Browse files
shangxinliwgtmac
andauthored
feat: add FileWriter base interface for data file writers (#446)
Add iceberg/data subdirectory with FileWriter base interface that defines common operations for writing Iceberg data files, including data files, equality delete files, and position delete files. --------- Co-authored-by: Gang Wu <ustcwg@gmail.com>
1 parent 2bd9747 commit 18a85e2

File tree

6 files changed

+349
-0
lines changed

6 files changed

+349
-0
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
2020
set(ICEBERG_SOURCES
2121
arrow_c_data_guard_internal.cc
2222
catalog/memory/in_memory_catalog.cc
23+
data/writer.cc
2324
delete_file_index.cc
2425
expression/aggregate.cc
2526
expression/binder.cc
@@ -147,6 +148,7 @@ add_iceberg_lib(iceberg
147148
iceberg_install_all_headers(iceberg)
148149

149150
add_subdirectory(catalog)
151+
add_subdirectory(data)
150152
add_subdirectory(expression)
151153
add_subdirectory(manifest)
152154
add_subdirectory(row)

src/iceberg/data/CMakeLists.txt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
iceberg_install_all_headers(iceberg/data)

src/iceberg/data/writer.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/data/writer.h"
21+
22+
namespace iceberg {
23+
24+
FileWriter::~FileWriter() = default;
25+
26+
} // namespace iceberg

src/iceberg/data/writer.h

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/data/writer.h
23+
/// Base interface for Iceberg data file writers.
24+
25+
#include <cstdint>
26+
#include <memory>
27+
#include <vector>
28+
29+
#include "iceberg/arrow_c_data.h"
30+
#include "iceberg/iceberg_export.h"
31+
#include "iceberg/result.h"
32+
#include "iceberg/type_fwd.h"
33+
34+
namespace iceberg {
35+
36+
/// \brief Base interface for data file writers.
37+
///
38+
/// This interface defines the common operations for writing Iceberg data files,
39+
/// including data files, equality delete files, and position delete files.
40+
///
41+
/// Typical usage:
42+
/// 1. Create a writer instance (via concrete implementation)
43+
/// 2. Call Write() one or more times to write data
44+
/// 3. Call Close() to finalize the file
45+
/// 4. Call Metadata() to get file metadata (only valid after Close())
46+
class ICEBERG_EXPORT FileWriter {
47+
public:
48+
virtual ~FileWriter();
49+
50+
/// \brief Write a batch of records.
51+
///
52+
/// \param data Arrow array containing the records to write.
53+
/// \return Status indicating success or failure.
54+
virtual Status Write(ArrowArray* data) = 0;
55+
56+
/// \brief Get the current number of bytes written.
57+
///
58+
/// \return Result containing the number of bytes written or an error.
59+
virtual Result<int64_t> Length() const = 0;
60+
61+
/// \brief Close the writer and finalize the file.
62+
///
63+
/// \return Status indicating success or failure.
64+
virtual Status Close() = 0;
65+
66+
/// \brief File metadata for all files produced by the writer.
67+
struct ICEBERG_EXPORT WriteResult {
68+
/// Usually a writer produces a single data or delete file.
69+
/// Position delete writer may produce multiple file-scoped delete files.
70+
/// In the future, multiple files can be produced if file rolling is supported.
71+
std::vector<std::shared_ptr<DataFile>> data_files;
72+
};
73+
74+
/// \brief Get file metadata for all files produced by this writer.
75+
///
76+
/// This method should be called after Close() to retrieve the metadata
77+
/// for all files written by this writer.
78+
///
79+
/// \return Result containing the write result or an error.
80+
virtual Result<WriteResult> Metadata() = 0;
81+
};
82+
83+
} // namespace iceberg

src/iceberg/test/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ if(ICEBERG_BUILD_BUNDLE)
166166
update_properties_test.cc
167167
update_sort_order_test.cc)
168168

169+
add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc)
170+
169171
endif()
170172

171173
if(ICEBERG_BUILD_REST)
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <memory>
21+
#include <vector>
22+
23+
#include <gmock/gmock.h>
24+
#include <gtest/gtest.h>
25+
26+
#include "iceberg/arrow_c_data.h"
27+
#include "iceberg/data/writer.h"
28+
#include "iceberg/manifest/manifest_entry.h"
29+
#include "iceberg/result.h"
30+
#include "iceberg/test/matchers.h"
31+
32+
namespace iceberg {
33+
34+
// Mock implementation of FileWriter for testing
35+
class MockFileWriter : public FileWriter {
36+
public:
37+
MockFileWriter() = default;
38+
39+
Status Write(ArrowArray* data) override {
40+
if (is_closed_) {
41+
return Invalid("Writer is closed");
42+
}
43+
if (data == nullptr) {
44+
return Invalid("Null data provided");
45+
}
46+
write_count_++;
47+
// Simulate writing some bytes
48+
bytes_written_ += 1024;
49+
return {};
50+
}
51+
52+
Result<int64_t> Length() const override { return bytes_written_; }
53+
54+
Status Close() override {
55+
if (is_closed_) {
56+
return Invalid("Writer already closed");
57+
}
58+
is_closed_ = true;
59+
return {};
60+
}
61+
62+
Result<WriteResult> Metadata() override {
63+
if (!is_closed_) {
64+
return Invalid("Writer must be closed before getting metadata");
65+
}
66+
67+
WriteResult result;
68+
auto data_file = std::make_shared<DataFile>();
69+
data_file->file_path = "/test/data/file.parquet";
70+
data_file->file_format = FileFormatType::kParquet;
71+
data_file->record_count = write_count_ * 100;
72+
data_file->file_size_in_bytes = bytes_written_;
73+
result.data_files.push_back(data_file);
74+
75+
return result;
76+
}
77+
78+
bool is_closed() const { return is_closed_; }
79+
int32_t write_count() const { return write_count_; }
80+
81+
private:
82+
int64_t bytes_written_ = 0;
83+
bool is_closed_ = false;
84+
int32_t write_count_ = 0;
85+
};
86+
87+
TEST(FileWriterTest, BasicWriteOperation) {
88+
MockFileWriter writer;
89+
90+
// Create a dummy ArrowArray (normally this would contain actual data)
91+
ArrowArray dummy_array = {};
92+
93+
ASSERT_THAT(writer.Write(&dummy_array), IsOk());
94+
ASSERT_EQ(writer.write_count(), 1);
95+
96+
auto length_result = writer.Length();
97+
ASSERT_THAT(length_result, IsOk());
98+
ASSERT_EQ(*length_result, 1024);
99+
}
100+
101+
TEST(FileWriterTest, MultipleWrites) {
102+
MockFileWriter writer;
103+
ArrowArray dummy_array = {};
104+
105+
// Write multiple times
106+
for (int i = 0; i < 5; i++) {
107+
ASSERT_THAT(writer.Write(&dummy_array), IsOk());
108+
}
109+
110+
ASSERT_EQ(writer.write_count(), 5);
111+
112+
auto length_result = writer.Length();
113+
ASSERT_THAT(length_result, IsOk());
114+
ASSERT_EQ(*length_result, 5120); // 5 * 1024
115+
}
116+
117+
TEST(FileWriterTest, WriteNullData) {
118+
MockFileWriter writer;
119+
120+
auto status = writer.Write(nullptr);
121+
ASSERT_THAT(status, HasErrorMessage("Null data provided"));
122+
}
123+
124+
TEST(FileWriterTest, CloseWriter) {
125+
MockFileWriter writer;
126+
ArrowArray dummy_array = {};
127+
128+
ASSERT_THAT(writer.Write(&dummy_array), IsOk());
129+
ASSERT_FALSE(writer.is_closed());
130+
131+
ASSERT_THAT(writer.Close(), IsOk());
132+
ASSERT_TRUE(writer.is_closed());
133+
}
134+
135+
TEST(FileWriterTest, DoubleClose) {
136+
MockFileWriter writer;
137+
138+
ASSERT_THAT(writer.Close(), IsOk());
139+
auto status = writer.Close();
140+
ASSERT_THAT(status, HasErrorMessage("Writer already closed"));
141+
}
142+
143+
TEST(FileWriterTest, WriteAfterClose) {
144+
MockFileWriter writer;
145+
ArrowArray dummy_array = {};
146+
147+
ASSERT_THAT(writer.Close(), IsOk());
148+
149+
auto status = writer.Write(&dummy_array);
150+
ASSERT_THAT(status, HasErrorMessage("Writer is closed"));
151+
}
152+
153+
TEST(FileWriterTest, MetadataBeforeClose) {
154+
MockFileWriter writer;
155+
ArrowArray dummy_array = {};
156+
157+
ASSERT_THAT(writer.Write(&dummy_array), IsOk());
158+
159+
auto metadata_result = writer.Metadata();
160+
ASSERT_THAT(metadata_result,
161+
HasErrorMessage("Writer must be closed before getting metadata"));
162+
}
163+
164+
TEST(FileWriterTest, MetadataAfterClose) {
165+
MockFileWriter writer;
166+
ArrowArray dummy_array = {};
167+
168+
// Write some data
169+
ASSERT_THAT(writer.Write(&dummy_array), IsOk());
170+
ASSERT_THAT(writer.Write(&dummy_array), IsOk());
171+
ASSERT_THAT(writer.Write(&dummy_array), IsOk());
172+
173+
// Close the writer
174+
ASSERT_THAT(writer.Close(), IsOk());
175+
176+
// Get metadata
177+
auto metadata_result = writer.Metadata();
178+
ASSERT_THAT(metadata_result, IsOk());
179+
180+
const auto& result = *metadata_result;
181+
ASSERT_EQ(result.data_files.size(), 1);
182+
183+
const auto& data_file = result.data_files[0];
184+
ASSERT_EQ(data_file->file_path, "/test/data/file.parquet");
185+
ASSERT_EQ(data_file->file_format, FileFormatType::kParquet);
186+
ASSERT_EQ(data_file->record_count, 300); // 3 writes * 100 records
187+
ASSERT_EQ(data_file->file_size_in_bytes, 3072); // 3 * 1024
188+
}
189+
190+
TEST(FileWriterTest, WriteResultStructure) {
191+
FileWriter::WriteResult result;
192+
193+
// Test that WriteResult can hold multiple data files
194+
auto data_file1 = std::make_shared<DataFile>();
195+
data_file1->file_path = "/test/data/file1.parquet";
196+
data_file1->record_count = 100;
197+
198+
auto data_file2 = std::make_shared<DataFile>();
199+
data_file2->file_path = "/test/data/file2.parquet";
200+
data_file2->record_count = 200;
201+
202+
result.data_files.push_back(data_file1);
203+
result.data_files.push_back(data_file2);
204+
205+
ASSERT_EQ(result.data_files.size(), 2);
206+
ASSERT_EQ(result.data_files[0]->file_path, "/test/data/file1.parquet");
207+
ASSERT_EQ(result.data_files[0]->record_count, 100);
208+
ASSERT_EQ(result.data_files[1]->file_path, "/test/data/file2.parquet");
209+
ASSERT_EQ(result.data_files[1]->record_count, 200);
210+
}
211+
212+
TEST(FileWriterTest, EmptyWriteResult) {
213+
FileWriter::WriteResult result;
214+
ASSERT_EQ(result.data_files.size(), 0);
215+
ASSERT_TRUE(result.data_files.empty());
216+
}
217+
218+
} // namespace iceberg

0 commit comments

Comments
 (0)