-
Notifications
You must be signed in to change notification settings - Fork 76
feat: support manifest&list writer part 1 #169
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
1. Add the Avro writer and its factory (without the write function, since the converter in pull/166 is not finished yet). 2. Add the internal implementation definitions for the manifest writer and the manifest list writer.
wgtmac
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest splitting this PR into separate ones to make review easier. Perhaps one for the Avro writer, one for the manifest list writer, and one for the manifest file writer.
| class ICEBERG_EXPORT Metrics { | ||
| public: | ||
| Metrics() = default; | ||
|
|
||
| explicit Metrics(int64_t row_count, | ||
| std::unordered_map<int64_t, int64_t> column_sizes = {}, | ||
| std::unordered_map<int64_t, int64_t> value_counts = {}, | ||
| std::unordered_map<int64_t, int64_t> null_value_counts = {}, | ||
| std::unordered_map<int64_t, int64_t> nan_value_counts = {}, | ||
| std::unordered_map<int64_t, Literal> lower_bounds = {}, | ||
| std::unordered_map<int64_t, Literal> upper_bounds = {}) | ||
| : row_count_(row_count), | ||
| column_sizes_(std::move(column_sizes)), | ||
| value_counts_(std::move(value_counts)), | ||
| null_value_counts_(std::move(null_value_counts)), | ||
| nan_value_counts_(std::move(nan_value_counts)), | ||
| lower_bounds_(std::move(lower_bounds)), | ||
| upper_bounds_(std::move(upper_bounds)) {} | ||
|
|
||
| private: | ||
| int64_t row_count_ = 0; | ||
| std::unordered_map<int64_t, int64_t> column_sizes_; | ||
| std::unordered_map<int64_t, int64_t> value_counts_; | ||
| std::unordered_map<int64_t, int64_t> null_value_counts_; | ||
| std::unordered_map<int64_t, int64_t> nan_value_counts_; | ||
| std::unordered_map<int64_t, Literal> lower_bounds_; | ||
| std::unordered_map<int64_t, Literal> upper_bounds_; | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| class ICEBERG_EXPORT Metrics { | |
| public: | |
| Metrics() = default; | |
| explicit Metrics(int64_t row_count, | |
| std::unordered_map<int64_t, int64_t> column_sizes = {}, | |
| std::unordered_map<int64_t, int64_t> value_counts = {}, | |
| std::unordered_map<int64_t, int64_t> null_value_counts = {}, | |
| std::unordered_map<int64_t, int64_t> nan_value_counts = {}, | |
| std::unordered_map<int64_t, Literal> lower_bounds = {}, | |
| std::unordered_map<int64_t, Literal> upper_bounds = {}) | |
| : row_count_(row_count), | |
| column_sizes_(std::move(column_sizes)), | |
| value_counts_(std::move(value_counts)), | |
| null_value_counts_(std::move(null_value_counts)), | |
| nan_value_counts_(std::move(nan_value_counts)), | |
| lower_bounds_(std::move(lower_bounds)), | |
| upper_bounds_(std::move(upper_bounds)) {} | |
| private: | |
| int64_t row_count_ = 0; | |
| std::unordered_map<int64_t, int64_t> column_sizes_; | |
| std::unordered_map<int64_t, int64_t> value_counts_; | |
| std::unordered_map<int64_t, int64_t> null_value_counts_; | |
| std::unordered_map<int64_t, int64_t> nan_value_counts_; | |
| std::unordered_map<int64_t, Literal> lower_bounds_; | |
| std::unordered_map<int64_t, Literal> upper_bounds_; | |
| }; | |
| struct ICEBERG_EXPORT Metrics { | |
| int64_t row_count = 0; | |
| std::unordered_map<int64_t, int64_t> column_sizes; | |
| std::unordered_map<int64_t, int64_t> value_counts; | |
| std::unordered_map<int64_t, int64_t> null_value_counts; | |
| std::unordered_map<int64_t, int64_t> nan_value_counts; | |
| std::unordered_map<int64_t, Literal> lower_bounds; | |
| std::unordered_map<int64_t, Literal> upper_bounds; | |
| }; |
What about making it a simple struct to enable aggregate initialization?
|
|
||
| #pragma once | ||
|
|
||
| /// \file iceberg/metrics.h |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some comment here? e.g. Iceberg file format metrics
| const std::vector<ManifestEntry>& entries) const = 0; | ||
|
|
||
| /// \brief Close writer and flush to storage. | ||
| virtual void Close() = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| virtual void Close() = 0; | |
| virtual Status Close() = 0; |
| virtual Status WriteManifestFiles(const std::vector<ManifestFile>& files) const = 0; | ||
|
|
||
| /// \brief Close writer and flush to storage. | ||
| virtual void Close() = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| virtual void Close() = 0; | |
| virtual Status Close() = 0; |
| /// When available, this information is used for planning scan tasks whose boundaries | ||
| /// are determined by these offsets. The returned list must be sorted in ascending order | ||
| /// Only valid after the file is closed. | ||
| virtual std::vector<int64_t> splitOffsets() = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| virtual std::vector<int64_t> splitOffsets() = 0; | |
| virtual std::vector<int64_t> split_offsets() = 0; |
| /// \brief Get the file statistics. | ||
| virtual std::shared_ptr<Metrics> metrics() = 0; | ||
|
|
||
| /// \brief Get the file length. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| /// \brief Get the file length. | |
| /// \brief Get the file length. | |
| /// Only valid after the file is closed. |
| /// \return Status of write results. | ||
| virtual Status Write(ArrowArray data) = 0; | ||
|
|
||
| /// \brief Get the file statistics. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| /// \brief Get the file statistics. | |
| /// \brief Get the file statistics. | |
| /// Only valid after the file is closed. |
| virtual Status Write(ArrowArray data) = 0; | ||
|
|
||
| /// \brief Get the file statistics. | ||
| virtual std::shared_ptr<Metrics> metrics() = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| virtual std::shared_ptr<Metrics> metrics() = 0; | |
| virtual Metrics metrics() = 0; |
Perhaps we can just return a simple struct instead of a shared_ptr?
| } // namespace | ||
|
|
||
| // A stateful context to keep track of the writing progress. | ||
| struct WriteContext {}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need a context? The writer is much simpler than the reader implementation.
| auto root = std::make_shared<::avro::NodeRecord>(); | ||
| ToAvroNodeVisitor visitor; | ||
| for (const auto& field : write_schema_->fields()) { | ||
| ::avro::NodePtr node; | ||
| ICEBERG_RETURN_UNEXPECTED(visitor.Visit(field, &node)); | ||
| root->addLeaf(node); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| auto root = std::make_shared<::avro::NodeRecord>(); | |
| ToAvroNodeVisitor visitor; | |
| for (const auto& field : write_schema_->fields()) { | |
| ::avro::NodePtr node; | |
| ICEBERG_RETURN_UNEXPECTED(visitor.Visit(field, &node)); | |
| root->addLeaf(node); | |
| } | |
| ::avro::NodePtr root; | |
| ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root)); |
OK, I will separate them into 2 PRs.
1. Add the Avro writer and its factory (without the write function, since the converter in pull/166 is not finished yet).
2. Add the internal implementation definitions for the manifest writer and the manifest list writer.