diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index d90c05421..82f60729a 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -23,6 +23,7 @@ set(ICEBERG_SOURCES expression/expression.cc expression/literal.cc file_reader.cc + file_writer.cc json_internal.cc manifest_entry.cc manifest_list.cc @@ -46,6 +47,8 @@ set(ICEBERG_SOURCES type.cc manifest_reader.cc manifest_reader_internal.cc + manifest_writer.cc + manifest_writer_internal.cc arrow_c_data_guard_internal.cc util/murmurhash3_internal.cc util/timepoint.cc @@ -107,6 +110,7 @@ if(ICEBERG_BUILD_BUNDLE) arrow/arrow_fs_file_io.cc avro/avro_data_util.cc avro/avro_reader.cc + avro/avro_writer.cc avro/avro_schema_util.cc avro/avro_register.cc avro/avro_stream_internal.cc diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc new file mode 100644 index 000000000..e205ff8a8 --- /dev/null +++ b/src/iceberg/avro/avro_writer.cc @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/avro/avro_writer.h" + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/avro/avro_stream_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg::avro { + +namespace { + +Result> CreateOutputStream(const WriterOptions& options, + int64_t buffer_size) { + auto io = internal::checked_pointer_cast(options.io); + auto result = io->fs()->OpenOutputStream(options.path); + if (!result.ok()) { + return IOError("Failed to open file {} for {}", options.path, + result.status().message()); + } + return std::make_unique(result.MoveValueUnsafe(), buffer_size); +} + +} // namespace + +// A stateful context to keep track of the writing progress. +struct WriteContext {}; + +class AvroWriter::Impl { + public: + Status Open(const WriterOptions& options) { + write_schema_ = options.schema; + + auto root = std::make_shared<::avro::NodeRecord>(); + ToAvroNodeVisitor visitor; + for (const auto& field : write_schema_->fields()) { + ::avro::NodePtr node; + ICEBERG_RETURN_UNEXPECTED(visitor.Visit(field, &node)); + root->addLeaf(node); + } + avro_schema_ = std::make_shared<::avro::ValidSchema>(root); + + // Open the output stream and adapt to the avro interface. + constexpr int64_t kDefaultBufferSize = 1024 * 1024; + ICEBERG_ASSIGN_OR_RAISE(auto output_stream, + CreateOutputStream(options, kDefaultBufferSize)); + + writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>( + std::move(output_stream), *avro_schema_); + return {}; + } + + Status Write(ArrowArray /*data*/) { + if (!context_) { + ICEBERG_RETURN_UNEXPECTED(InitWriteContext()); + } + // TODO(xiao.dong) convert data and write to avro + // total_bytes_+= written_bytes; + return {}; + } + + Status Close() { + if (writer_ != nullptr) { + writer_->close(); + writer_.reset(); + } + context_.reset(); + return {}; + } + + bool Closed() const { return writer_ == nullptr; } + + int64_t length() { return total_bytes_; } + + private: + Status InitWriteContext() { return {}; } + + private: + int64_t total_bytes_ = 0; + // The schema to write. + std::shared_ptr<::iceberg::Schema> write_schema_; + // The avro schema to write. + std::shared_ptr<::avro::ValidSchema> avro_schema_; + // The avro writer to write the data into a datum. + std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_; + // The context to keep track of the writing progress. + std::unique_ptr context_; +}; + +AvroWriter::~AvroWriter() = default; + +Status AvroWriter::Write(ArrowArray data) { return impl_->Write(data); } + +Status AvroWriter::Open(const WriterOptions& options) { + impl_ = std::make_unique(); + return impl_->Open(options); +} + +Status AvroWriter::Close() { + if (!impl_->Closed()) { + return impl_->Close(); + } + return {}; +} + +std::shared_ptr AvroWriter::metrics() { + if (impl_->Closed()) { + // TODO(xiao.dong) implement metrics + return std::make_shared(); + } + return nullptr; +} + +int64_t AvroWriter::length() { + if (impl_->Closed()) { + return impl_->length(); + } + return 0; +} + +std::vector AvroWriter::splitOffsets() { return {}; } + +void AvroWriter::Register() { + static WriterFactoryRegistry avro_writer_register( + FileFormatType::kAvro, + []() -> Result> { return std::make_unique(); }); +} + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_writer.h b/src/iceberg/avro/avro_writer.h new file mode 100644 index 000000000..61c1176f4 --- /dev/null +++ b/src/iceberg/avro/avro_writer.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include "iceberg/file_writer.h" +#include "iceberg/iceberg_bundle_export.h" + +namespace iceberg::avro { + +/// \brief A writer ArrowArray to Avro files. +class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer { + public: + AvroWriter() = default; + + ~AvroWriter() override; + + Status Open(const WriterOptions& options) final; + + Status Close() final; + + Status Write(ArrowArray data) final; + + std::shared_ptr metrics() final; + + int64_t length() final; + + std::vector splitOffsets() final; + + /// \brief Register this Avro writer implementation. + static void Register(); + + private: + class Impl; + std::unique_ptr impl_; +}; + +} // namespace iceberg::avro diff --git a/src/iceberg/file_writer.cc b/src/iceberg/file_writer.cc new file mode 100644 index 000000000..e5dbea347 --- /dev/null +++ b/src/iceberg/file_writer.cc @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/file_writer.h" + +#include + +#include "iceberg/result.h" +#include "iceberg/util/formatter.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +WriterFactory GetNotImplementedFactory(FileFormatType format_type) { + return [format_type]() -> Result> { + return NotImplemented("Missing writer factory for file format: {}", format_type); + }; +} + +} // namespace + +WriterFactory& WriterFactoryRegistry::GetFactory(FileFormatType format_type) { + static std::unordered_map factories = { + {FileFormatType::kAvro, GetNotImplementedFactory(FileFormatType::kAvro)}, + {FileFormatType::kParquet, GetNotImplementedFactory(FileFormatType::kParquet)}, + {FileFormatType::kOrc, GetNotImplementedFactory(FileFormatType::kOrc)}, + {FileFormatType::kPuffin, GetNotImplementedFactory(FileFormatType::kPuffin)}, + }; + return factories.at(format_type); +} + +WriterFactoryRegistry::WriterFactoryRegistry(FileFormatType format_type, + WriterFactory factory) { + GetFactory(format_type) = std::move(factory); +} + +Result> WriterFactoryRegistry::Open( + FileFormatType format_type, const WriterOptions& options) { + ICEBERG_ASSIGN_OR_RAISE(auto writer, GetFactory(format_type)()); + ICEBERG_RETURN_UNEXPECTED(writer->Open(options)); + return writer; +} + +} // namespace iceberg diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h index 6c8e75e34..6a0f511bd 100644 --- a/src/iceberg/file_writer.h +++ b/src/iceberg/file_writer.h @@ -28,7 +28,9 @@ #include "iceberg/arrow_c_data.h" #include "iceberg/file_format.h" +#include "iceberg/metrics.h" #include "iceberg/result.h" +#include "iceberg/schema.h" #include "iceberg/type_fwd.h" namespace iceberg { @@ -38,7 +40,7 @@ struct ICEBERG_EXPORT WriterOptions { /// \brief The path to the file to write. std::string path; /// \brief The schema of the data to write. - ArrowSchema schema; + std::shared_ptr schema; /// \brief FileIO instance to open the file. Writer implementations should down cast it /// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses /// `ArrowFileSystemFileIO` as the default implementation. @@ -65,6 +67,19 @@ class ICEBERG_EXPORT Writer { /// /// \return Status of write results. virtual Status Write(ArrowArray data) = 0; + + /// \brief Get the file statistics. + virtual std::shared_ptr metrics() = 0; + + /// \brief Get the file length. + virtual int64_t length() = 0; + + /// \brief Get the file length. + /// Returns a list of recommended split locations, if applicable, null otherwise. + /// When available, this information is used for planning scan tasks whose boundaries + /// are determined by these offsets. The returned list must be sorted in ascending order + /// Only valid after the file is closed. + virtual std::vector splitOffsets() = 0; }; /// \brief Factory function to create a writer of a specific file format. diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc new file mode 100644 index 000000000..a72adc9ad --- /dev/null +++ b/src/iceberg/manifest_writer.cc @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest_writer.h" + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/manifest_writer_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> ManifestWriter::MakeWriter( + std::string_view manifest_location, std::shared_ptr file_io, + std::shared_ptr partition_schema) { + auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema); + auto fields_span = manifest_entry_schema->fields(); + std::vector fields(fields_span.begin(), fields_span.end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro, + {.path = std::string(manifest_location), + .schema = schema, + .io = std::move(file_io)})); + return std::make_unique(std::move(writer), std::move(schema)); +} + +Result> ManifestListWriter::MakeWriter( + std::string_view manifest_list_location, std::shared_ptr file_io) { + std::vector fields(ManifestFile::Type().fields().begin(), + ManifestFile::Type().fields().end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = std::string(manifest_list_location), + .schema = schema, + .io = std::move(file_io)})); + return std::make_unique(std::move(writer), std::move(schema)); +} + +} // namespace iceberg diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index 3c5091b3d..76e012451 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -35,9 +35,16 @@ namespace iceberg { class ICEBERG_EXPORT ManifestWriter { public: virtual ~ManifestWriter() = default; + + /// \brief Write manifest entries to file + /// \param entries List of manifest entries to write. + /// \return Status::OK() if all entries were written successfully virtual Status WriteManifestEntries( const std::vector& entries) const = 0; + /// \brief Close writer and flush to storage. + virtual void Close() = 0; + /// \brief Creates a writer for a manifest file. /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. @@ -51,8 +58,15 @@ class ICEBERG_EXPORT ManifestWriter { class ICEBERG_EXPORT ManifestListWriter { public: virtual ~ManifestListWriter() = default; + + /// \brief Write manifest file list to mainifest list file. + /// \param files List of manifest files to write. + /// \return Status::OK() if all files were written successfully virtual Status WriteManifestFiles(const std::vector& files) const = 0; + /// \brief Close writer and flush to storage. + virtual void Close() = 0; + /// \brief Creates a writer for the manifest list. /// \param manifest_list_location Path to the manifest list file. /// \param file_io File IO implementation to use. diff --git a/src/iceberg/manifest_writer_internal.cc b/src/iceberg/manifest_writer_internal.cc new file mode 100644 index 000000000..b127b2cfd --- /dev/null +++ b/src/iceberg/manifest_writer_internal.cc @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "manifest_writer_internal.h" + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/schema.h" +#include "iceberg/type.h" + +namespace iceberg { + +Status ManifestWriterImpl::WriteManifestEntries( + const std::vector& /*entries*/) const { + // TODO(xiao.dong) convert entries to arrow data + return {}; +} + +Status ManifestListWriterImpl::WriteManifestFiles( + const std::vector& /*files*/) const { + // TODO(xiao.dong) convert manifest files to arrow data + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/manifest_writer_internal.h b/src/iceberg/manifest_writer_internal.h new file mode 100644 index 000000000..6bb393751 --- /dev/null +++ b/src/iceberg/manifest_writer_internal.h @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/internal/manifest_writer_internal.h +/// Writer implementation for manifest list files and manifest files. + +#include "iceberg/manifest_writer.h" + +namespace iceberg { + +/// \brief Write manifest entries to a manifest file. +class ManifestWriterImpl : public ManifestWriter { + public: + explicit ManifestWriterImpl(std::unique_ptr writer, + std::shared_ptr schema) + : schema_(std::move(schema)), writer_(std::move(writer)) {} + + Status WriteManifestEntries(const std::vector& entries) const override; + + void Close() override {} + + private: + std::shared_ptr schema_; + std::unique_ptr writer_; +}; + +/// \brief Write manifest files to a manifest list file. +class ManifestListWriterImpl : public ManifestListWriter { + public: + explicit ManifestListWriterImpl(std::unique_ptr writer, + std::shared_ptr schema) + : schema_(std::move(schema)), writer_(std::move(writer)) {} + + Status WriteManifestFiles(const std::vector& files) const override; + + void Close() override {} + + private: + std::shared_ptr schema_; + std::unique_ptr writer_; +}; + +} // namespace iceberg diff --git a/src/iceberg/metrics.h b/src/iceberg/metrics.h new file mode 100644 index 000000000..244e349b0 --- /dev/null +++ b/src/iceberg/metrics.h @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/metrics.h + +#include + +#include "iceberg/expression/literal.h" +#include "iceberg/iceberg_export.h" + +namespace iceberg { + +/// \brief Iceberg file format metrics +class ICEBERG_EXPORT Metrics { + public: + Metrics() = default; + + explicit Metrics(int64_t row_count, + std::unordered_map column_sizes = {}, + std::unordered_map value_counts = {}, + std::unordered_map null_value_counts = {}, + std::unordered_map nan_value_counts = {}, + std::unordered_map lower_bounds = {}, + std::unordered_map upper_bounds = {}) + : row_count_(row_count), + column_sizes_(std::move(column_sizes)), + value_counts_(std::move(value_counts)), + null_value_counts_(std::move(null_value_counts)), + nan_value_counts_(std::move(nan_value_counts)), + lower_bounds_(std::move(lower_bounds)), + upper_bounds_(std::move(upper_bounds)) {} + + private: + int64_t row_count_ = 0; + std::unordered_map column_sizes_; + std::unordered_map value_counts_; + std::unordered_map null_value_counts_; + std::unordered_map nan_value_counts_; + std::unordered_map lower_bounds_; + std::unordered_map upper_bounds_; +}; + +} // namespace iceberg