Skip to content

Commit 4bbd133

Browse files
authored
feat(writer): support length() for both open and closed writers (#433)
Modified Writer::length() to return current flushed position when open and final file length when closed. Previously length() required the writer to be closed.
1 parent a94cf21 commit 4bbd133

File tree

2 files changed

+21
-14
lines changed

2 files changed

+21
-14
lines changed

src/iceberg/avro/avro_writer.cc

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,14 @@ class AvroWriter::Impl {
120120

121121
bool Closed() const { return writer_ == nullptr; }
122122

123-
int64_t length() { return total_bytes_; }
123+
Result<int64_t> length() {
124+
if (Closed()) {
125+
return total_bytes_;
126+
}
127+
// Return current flushed length when writer is still open
128+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto current_pos, arrow_output_stream_->Tell());
129+
return current_pos;
130+
}
124131

125132
private:
126133
// The schema to write.
@@ -135,6 +142,7 @@ class AvroWriter::Impl {
135142
std::unique_ptr<::avro::GenericDatum> datum_;
136143
// Arrow schema to write data.
137144
ArrowSchema arrow_schema_;
145+
// Total length of the written Avro file.
138146
int64_t total_bytes_ = 0;
139147
};
140148

@@ -162,12 +170,7 @@ Result<Metrics> AvroWriter::metrics() {
162170
return Invalid("AvroWriter is not closed");
163171
}
164172

165-
Result<int64_t> AvroWriter::length() {
166-
if (impl_->Closed()) {
167-
return impl_->length();
168-
}
169-
return Invalid("AvroWriter is not closed");
170-
}
173+
Result<int64_t> AvroWriter::length() { return impl_->length(); }
171174

172175
std::vector<int64_t> AvroWriter::split_offsets() { return {}; }
173176

src/iceberg/parquet/parquet_writer.cc

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,16 @@ class ParquetWriter::Impl {
107107

108108
bool Closed() const { return writer_ == nullptr; }
109109

110-
int64_t length() const { return total_bytes_; }
110+
Result<int64_t> length() {
111+
if (Closed()) {
112+
return total_bytes_;
113+
}
114+
// Return current flushed length when writer is still open.
115+
// It would be good if we could get the number of buffered bytes
116+
// from the internal RowGroupWriter.
117+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto current_pos, output_stream_->Tell());
118+
return current_pos;
119+
}
111120

112121
std::vector<int64_t> split_offsets() const { return split_offsets_; }
113122

@@ -144,12 +153,7 @@ Result<Metrics> ParquetWriter::metrics() {
144153
return {};
145154
}
146155

147-
Result<int64_t> ParquetWriter::length() {
148-
if (!impl_->Closed()) {
149-
return Invalid("ParquetWriter is not closed");
150-
}
151-
return impl_->length();
152-
}
156+
Result<int64_t> ParquetWriter::length() { return impl_->length(); }
153157

154158
std::vector<int64_t> ParquetWriter::split_offsets() {
155159
if (!impl_->Closed()) {

0 commit comments

Comments
 (0)