From 7bf5fca3a7be52bf78868eba34f5144a3caaece9 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Thu, 14 Aug 2025 18:00:17 +0800 Subject: [PATCH 01/14] feat: add manifest&manifest list writer 1 add v1v2v3 writer definition 2 --- src/iceberg/CMakeLists.txt | 2 + src/iceberg/manifest_writer.cc | 90 ++++++++++++++++ src/iceberg/manifest_writer.h | 32 +++++- src/iceberg/manifest_writer_internal.cc | 69 ++++++++++++ src/iceberg/manifest_writer_internal.h | 136 ++++++++++++++++++++++++ 5 files changed, 324 insertions(+), 5 deletions(-) create mode 100644 src/iceberg/manifest_writer.cc create mode 100644 src/iceberg/manifest_writer_internal.cc create mode 100644 src/iceberg/manifest_writer_internal.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index b509c2579..cf757e3a3 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -48,6 +48,8 @@ set(ICEBERG_SOURCES type.cc manifest_reader.cc manifest_reader_internal.cc + manifest_writer.cc + manifest_writer_internal.cc arrow_c_data_guard_internal.cc util/murmurhash3_internal.cc util/timepoint.cc diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc new file mode 100644 index 000000000..13b9ed5ef --- /dev/null +++ b/src/iceberg/manifest_writer.cc @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest_writer.h" + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/manifest_writer_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> ManifestWriter::MakeWriter( + int32_t format_version, int64_t first_row_id, std::string_view manifest_location, + std::shared_ptr file_io, std::shared_ptr partition_schema) { + auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema); + auto fields_span = manifest_entry_schema->fields(); + std::vector fields(fields_span.begin(), fields_span.end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro, + {.path = std::string(manifest_location), + .schema = schema, + .io = std::move(file_io)})); + switch (format_version) { + case 1: + return std::make_unique(first_row_id, std::move(writer), + std::move(schema)); + case 2: + return std::make_unique(first_row_id, std::move(writer), + std::move(schema)); + case 3: + return std::make_unique(first_row_id, std::move(writer), + std::move(schema)); + + default: + return InvalidArgument("Unsupported manifest format version: {}", format_version); + } +} + +Result> ManifestListWriter::MakeWriter( + int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id, + int64_t sequence_number, int64_t first_row_id, + std::string_view manifest_list_location, std::shared_ptr file_io) { + std::vector fields(ManifestFile::Type().fields().begin(), + ManifestFile::Type().fields().end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = std::string(manifest_list_location), + .schema = schema, + .io = std::move(file_io)})); + switch (format_version) { + case 1: + return std::make_unique(snapshot_id, parent_snapshot_id, + sequence_number, first_row_id, + std::move(writer), std::move(schema)); + case 2: + return std::make_unique(snapshot_id, parent_snapshot_id, + sequence_number, first_row_id, + std::move(writer), std::move(schema)); + case 3: + return std::make_unique(snapshot_id, parent_snapshot_id, + sequence_number, first_row_id, + std::move(writer), std::move(schema)); + + default: + return InvalidArgument("Unsupported manifest list format version: {}", + format_version); + } +} + +} // namespace iceberg diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index 3c5091b3d..517a7dacf 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -35,29 +35,51 @@ namespace iceberg { class ICEBERG_EXPORT ManifestWriter { public: virtual ~ManifestWriter() = default; - virtual Status WriteManifestEntries( - const std::vector& entries) const = 0; + + /// \brief Write manifest entry to file + /// \param entry Manifest entry to write. + /// \return Status::OK() if all entry was written successfully + virtual Status WriteManifestEntry(const ManifestEntry& entry) const = 0; + + /// \brief Close writer and flush to storage. + virtual Status Close() = 0; /// \brief Creates a writer for a manifest file. + /// \param format_version Format version of the manifest. + /// \param first_row_id First row ID of the snapshot. /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. /// \return A Result containing the writer or an error. static Result> MakeWriter( - std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr partition_schema); + int32_t format_version, int64_t first_row_id, std::string_view manifest_location, + std::shared_ptr file_io, std::shared_ptr partition_schema); }; /// \brief Write manifest files to a manifest list file. class ICEBERG_EXPORT ManifestListWriter { public: virtual ~ManifestListWriter() = default; - virtual Status WriteManifestFiles(const std::vector& files) const = 0; + + /// \brief Write manifest file list to manifest list file. + /// \param file Manifest file to write. + /// \return Status::OK() if all file was written successfully + virtual Status WriteManifestFile(const ManifestFile& file) const = 0; + + /// \brief Close writer and flush to storage. + virtual Status Close() = 0; /// \brief Creates a writer for the manifest list. + /// \param format_version Format version of the manifest list. + /// \param snapshot_id ID of the snapshot. + /// \param parent_snapshot_id ID of the parent snapshot. + /// \param sequence_number Sequence number of the snapshot. + /// \param first_row_id First row ID of the snapshot. /// \param manifest_list_location Path to the manifest list file. /// \param file_io File IO implementation to use. /// \return A Result containing the writer or an error. static Result> MakeWriter( + int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id, + int64_t sequence_number, int64_t first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io); }; diff --git a/src/iceberg/manifest_writer_internal.cc b/src/iceberg/manifest_writer_internal.cc new file mode 100644 index 000000000..a6faa35fa --- /dev/null +++ b/src/iceberg/manifest_writer_internal.cc @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "manifest_writer_internal.h" + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/schema.h" + +namespace iceberg { + +Status ManifestWriterV1::WriteManifestEntry(const ManifestEntry& entry) const { + // TODO(xiao.dong) convert entries to arrow data + return {}; +} + +Status ManifestWriterV1::Close() { return {}; } + +Status ManifestWriterV2::WriteManifestEntry(const ManifestEntry& entry) const { + // TODO(xiao.dong) convert entries to arrow data + return {}; +} + +Status ManifestWriterV2::Close() { return {}; } + +Status ManifestWriterV3::WriteManifestEntry(const ManifestEntry& entry) const { + // TODO(xiao.dong) convert entries to arrow data + return {}; +} + +Status ManifestWriterV3::Close() { return {}; } + +Status ManifestListWriterV1::WriteManifestFile(const ManifestFile& file) const { + // TODO(xiao.dong) convert manifest files to arrow data + return {}; +} + +Status ManifestListWriterV1::Close() { return {}; } + +Status ManifestListWriterV2::WriteManifestFile(const ManifestFile& file) const { + // TODO(xiao.dong) convert manifest files to arrow data + return {}; +} + +Status ManifestListWriterV2::Close() { return {}; } + +Status ManifestListWriterV3::WriteManifestFile(const ManifestFile& file) const { + // TODO(xiao.dong) convert manifest files to arrow data + return {}; +} + +Status ManifestListWriterV3::Close() { return {}; } +} // namespace iceberg diff --git a/src/iceberg/manifest_writer_internal.h b/src/iceberg/manifest_writer_internal.h new file mode 100644 index 000000000..9efca8c31 --- /dev/null +++ b/src/iceberg/manifest_writer_internal.h @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/internal/manifest_writer_internal.h +/// Writer implementation for manifest list files and manifest files. + +#include "iceberg/manifest_writer.h" + +namespace iceberg { + +/// \brief Write manifest entries to a manifest file. +class ManifestWriterImpl : public ManifestWriter { + public: + explicit ManifestWriterImpl(int64_t first_row_id, std::unique_ptr writer, + std::shared_ptr schema) + : schema_(std::move(schema)), writer_(std::move(writer)) {} + + private: + std::shared_ptr schema_; + std::unique_ptr writer_; +}; + +/// \brief Write v1 manifest entries to a manifest file. +class ManifestWriterV1 : public ManifestWriterImpl { + public: + explicit ManifestWriterV1(int64_t first_row_id, std::unique_ptr writer, + std::shared_ptr schema) + : ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {} + + Status WriteManifestEntry(const ManifestEntry& entry) const override; + + Status Close() override; +}; + +/// \brief Write v2 manifest entries to a manifest file. +class ManifestWriterV2 : public ManifestWriterImpl { + public: + explicit ManifestWriterV2(int64_t first_row_id, std::unique_ptr writer, + std::shared_ptr schema) + : ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {} + + Status WriteManifestEntry(const ManifestEntry& entry) const override; + + Status Close() override; +}; + +/// \brief Write v3 manifest entries to a manifest file. +class ManifestWriterV3 : public ManifestWriterImpl { + public: + explicit ManifestWriterV3(int64_t first_row_id, std::unique_ptr writer, + std::shared_ptr schema) + : ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {} + + Status WriteManifestEntry(const ManifestEntry& entry) const override; + + Status Close() override; +}; + +/// \brief Write manifest files to a manifest list file. +class ManifestListWriterImpl : public ManifestListWriter { + public: + explicit ManifestListWriterImpl(int64_t snapshot_id, int64_t parent_snapshot_id, + int64_t sequence_number, int64_t first_row_id, + std::unique_ptr writer, + std::shared_ptr schema) + : schema_(std::move(schema)), writer_(std::move(writer)) {} + + private: + std::shared_ptr schema_; + std::unique_ptr writer_; +}; + +/// \brief Write v1 manifest files to a manifest list file. +class ManifestListWriterV1 : public ManifestListWriterImpl { + public: + explicit ManifestListWriterV1(int64_t snapshot_id, int64_t parent_snapshot_id, + int64_t sequence_number, int64_t first_row_id, + std::unique_ptr writer, + std::shared_ptr schema) + : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number, + first_row_id, std::move(writer), std::move(schema)) {} + + Status WriteManifestFile(const ManifestFile& file) const override; + + Status Close() override; +}; + +/// \brief Write v2 manifest files to a manifest list file. +class ManifestListWriterV2 : public ManifestListWriterImpl { + public: + explicit ManifestListWriterV2(int64_t snapshot_id, int64_t parent_snapshot_id, + int64_t sequence_number, int64_t first_row_id, + std::unique_ptr writer, + std::shared_ptr schema) + : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number, + first_row_id, std::move(writer), std::move(schema)) {} + + Status WriteManifestFile(const ManifestFile& file) const override; + + Status Close() override; +}; + +/// \brief Write v3 manifest files to a manifest list file. +class ManifestListWriterV3 : public ManifestListWriterImpl { + public: + explicit ManifestListWriterV3(int64_t snapshot_id, int64_t parent_snapshot_id, + int64_t sequence_number, int64_t first_row_id, + std::unique_ptr writer, + std::shared_ptr schema) + : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number, + first_row_id, std::move(writer), std::move(schema)) {} + + Status WriteManifestFile(const ManifestFile& file) const override; + + Status Close() override; +}; + +} // namespace iceberg From 3f589ac8f57388bc2d0826435772e3e375c2367b Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Fri, 15 Aug 2025 10:09:18 +0800 Subject: [PATCH 02/14] add v1v2v3 metadata wrapper --- src/iceberg/manifest_writer.cc | 19 +++++---- src/iceberg/manifest_writer.h | 6 ++- src/iceberg/manifest_writer_internal.h | 58 ++++++++++++++++++-------- src/iceberg/v1_metadata.h | 53 +++++++++++++++++++++++ src/iceberg/v2_metadata.h | 58 ++++++++++++++++++++++++++ src/iceberg/v3_metadata.h | 58 ++++++++++++++++++++++++++ 6 files changed, 224 insertions(+), 28 deletions(-) create mode 100644 src/iceberg/v1_metadata.h create mode 100644 src/iceberg/v2_metadata.h create mode 100644 src/iceberg/v3_metadata.h diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc index 13b9ed5ef..866db4a3e 100644 --- a/src/iceberg/manifest_writer.cc +++ b/src/iceberg/manifest_writer.cc @@ -28,8 +28,9 @@ namespace iceberg { Result> ManifestWriter::MakeWriter( - int32_t format_version, int64_t first_row_id, std::string_view manifest_location, - std::shared_ptr file_io, std::shared_ptr partition_schema) { + int32_t format_version, int64_t snapshot_id, int64_t first_row_id, + std::string_view manifest_location, std::shared_ptr file_io, + std::shared_ptr partition_schema) { auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema); auto fields_span = manifest_entry_schema->fields(); std::vector fields(fields_span.begin(), fields_span.end()); @@ -41,14 +42,14 @@ Result> ManifestWriter::MakeWriter( .io = std::move(file_io)})); switch (format_version) { case 1: - return std::make_unique(first_row_id, std::move(writer), + return std::make_unique(snapshot_id, std::move(writer), std::move(schema)); case 2: - return std::make_unique(first_row_id, std::move(writer), + return std::make_unique(snapshot_id, std::move(writer), std::move(schema)); case 3: - return std::make_unique(first_row_id, std::move(writer), - std::move(schema)); + return std::make_unique(snapshot_id, first_row_id, + std::move(writer), std::move(schema)); default: return InvalidArgument("Unsupported manifest format version: {}", format_version); @@ -70,12 +71,12 @@ Result> ManifestListWriter::MakeWriter( switch (format_version) { case 1: return std::make_unique(snapshot_id, parent_snapshot_id, - sequence_number, first_row_id, + std::move(writer), std::move(schema)); case 2: return std::make_unique(snapshot_id, parent_snapshot_id, - sequence_number, first_row_id, - std::move(writer), std::move(schema)); + sequence_number, std::move(writer), + std::move(schema)); case 3: return std::make_unique(snapshot_id, parent_snapshot_id, sequence_number, first_row_id, diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index 517a7dacf..5930243db 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -46,13 +46,15 @@ class ICEBERG_EXPORT ManifestWriter { /// \brief Creates a writer for a manifest file. /// \param format_version Format version of the manifest. + /// \param snapshot_id ID of the snapshot. /// \param first_row_id First row ID of the snapshot. /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. /// \return A Result containing the writer or an error. static Result> MakeWriter( - int32_t format_version, int64_t first_row_id, std::string_view manifest_location, - std::shared_ptr file_io, std::shared_ptr partition_schema); + int32_t format_version, int64_t snapshot_id, int64_t first_row_id, + std::string_view manifest_location, std::shared_ptr file_io, + std::shared_ptr partition_schema); }; /// \brief Write manifest files to a manifest list file. diff --git a/src/iceberg/manifest_writer_internal.h b/src/iceberg/manifest_writer_internal.h index 9efca8c31..6acbe61e2 100644 --- a/src/iceberg/manifest_writer_internal.h +++ b/src/iceberg/manifest_writer_internal.h @@ -23,13 +23,16 @@ /// Writer implementation for manifest list files and manifest files. #include "iceberg/manifest_writer.h" +#include "iceberg/v1_metadata.h" +#include "iceberg/v2_metadata.h" +#include "iceberg/v3_metadata.h" namespace iceberg { /// \brief Write manifest entries to a manifest file. class ManifestWriterImpl : public ManifestWriter { public: - explicit ManifestWriterImpl(int64_t first_row_id, std::unique_ptr writer, + explicit ManifestWriterImpl(int64_t snapshot_id, std::unique_ptr writer, std::shared_ptr schema) : schema_(std::move(schema)), writer_(std::move(writer)) {} @@ -41,44 +44,55 @@ class ManifestWriterImpl : public ManifestWriter { /// \brief Write v1 manifest entries to a manifest file. class ManifestWriterV1 : public ManifestWriterImpl { public: - explicit ManifestWriterV1(int64_t first_row_id, std::unique_ptr writer, + explicit ManifestWriterV1(int64_t snapshot_id, std::unique_ptr writer, std::shared_ptr schema) - : ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {} + : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)) {} Status WriteManifestEntry(const ManifestEntry& entry) const override; Status Close() override; + + private: + V1MetaData::ManifestEntryWrapper wrapper_; }; /// \brief Write v2 manifest entries to a manifest file. class ManifestWriterV2 : public ManifestWriterImpl { public: - explicit ManifestWriterV2(int64_t first_row_id, std::unique_ptr writer, + explicit ManifestWriterV2(int64_t snapshot_id, std::unique_ptr writer, std::shared_ptr schema) - : ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {} + : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)), + wrapper_(snapshot_id) {} Status WriteManifestEntry(const ManifestEntry& entry) const override; Status Close() override; + + private: + V2MetaData::ManifestEntryWrapper wrapper_; }; /// \brief Write v3 manifest entries to a manifest file. class ManifestWriterV3 : public ManifestWriterImpl { public: - explicit ManifestWriterV3(int64_t first_row_id, std::unique_ptr writer, + explicit ManifestWriterV3(int64_t snapshot_id, int64_t first_row_id, + std::unique_ptr writer, std::shared_ptr schema) - : ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {} + : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)), + wrapper_(snapshot_id) {} Status WriteManifestEntry(const ManifestEntry& entry) const override; Status Close() override; + + private: + V3MetaData::ManifestEntryWrapper wrapper_; }; /// \brief Write manifest files to a manifest list file. class ManifestListWriterImpl : public ManifestListWriter { public: explicit ManifestListWriterImpl(int64_t snapshot_id, int64_t parent_snapshot_id, - int64_t sequence_number, int64_t first_row_id, std::unique_ptr writer, std::shared_ptr schema) : schema_(std::move(schema)), writer_(std::move(writer)) {} @@ -92,30 +106,36 @@ class ManifestListWriterImpl : public ManifestListWriter { class ManifestListWriterV1 : public ManifestListWriterImpl { public: explicit ManifestListWriterV1(int64_t snapshot_id, int64_t parent_snapshot_id, - int64_t sequence_number, int64_t first_row_id, + std::unique_ptr writer, std::shared_ptr schema) - : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number, - first_row_id, std::move(writer), std::move(schema)) {} + : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, std::move(writer), + std::move(schema)) {} Status WriteManifestFile(const ManifestFile& file) const override; Status Close() override; + + private: + V1MetaData::ManifestFileWrapper wrapper_; }; /// \brief Write v2 manifest files to a manifest list file. class ManifestListWriterV2 : public ManifestListWriterImpl { public: explicit ManifestListWriterV2(int64_t snapshot_id, int64_t parent_snapshot_id, - int64_t sequence_number, int64_t first_row_id, - std::unique_ptr writer, + int64_t sequence_number, std::unique_ptr writer, std::shared_ptr schema) - : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number, - first_row_id, std::move(writer), std::move(schema)) {} + : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, std::move(writer), + std::move(schema)), + wrapper_(snapshot_id, sequence_number) {} Status WriteManifestFile(const ManifestFile& file) const override; Status Close() override; + + private: + V2MetaData::ManifestFileWrapper wrapper_; }; /// \brief Write v3 manifest files to a manifest list file. @@ -125,12 +145,16 @@ class ManifestListWriterV3 : public ManifestListWriterImpl { int64_t sequence_number, int64_t first_row_id, std::unique_ptr writer, std::shared_ptr schema) - : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number, - first_row_id, std::move(writer), std::move(schema)) {} + : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, std::move(writer), + std::move(schema)), + wrapper_(snapshot_id, sequence_number) {} Status WriteManifestFile(const ManifestFile& file) const override; Status Close() override; + + private: + V3MetaData::ManifestFileWrapper wrapper_; }; } // namespace iceberg diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h new file mode 100644 index 000000000..6fb47a37f --- /dev/null +++ b/src/iceberg/v1_metadata.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/v1_metadata.h + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" + +namespace iceberg { + +/// \brief v1 metadata wrapper. +/// +/// Wrapper for v1 manifest list and manifest entry. +class V1MetaData { + public: + /// \brief v1 manifest file wrapper. + struct ManifestFileWrapper : public ManifestFile { + explicit ManifestFileWrapper() {} + + ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; } + }; + + /// \brief v1 manifest entry wrapper. + struct ManifestEntryWrapper : public ManifestEntry { + explicit ManifestEntryWrapper() {} + + ManifestEntry wrap(ManifestEntry entry) { return *this; } + }; + + static ManifestFileWrapper manifestFileWrapper() { return ManifestFileWrapper(); } + + static ManifestEntryWrapper manifestEntryWrapper() { return ManifestEntryWrapper(); } +}; + +} // namespace iceberg diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h new file mode 100644 index 000000000..7de4c06ce --- /dev/null +++ b/src/iceberg/v2_metadata.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/v2_metadata.h + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" + +namespace iceberg { + +/// \brief v2 metadata wrapper. +/// +/// Wrapper for v2 manifest list and manifest entry. +class V2MetaData { + public: + /// \brief v2 manifest file wrapper. + struct ManifestFileWrapper : public ManifestFile { + explicit ManifestFileWrapper(int64_t commit_snapshotId, int64_t sequence_number) {} + + ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; } + }; + + /// \brief v2 manifest entry wrapper. + struct ManifestEntryWrapper : public ManifestEntry { + explicit ManifestEntryWrapper(int64_t commit_snapshot_id) {} + + ManifestEntry wrap(ManifestEntry entry) { return *this; } + }; + + static ManifestFileWrapper manifestFileWrapper(int64_t commit_snapshotId, + int64_t sequence_number) { + return ManifestFileWrapper(commit_snapshotId, sequence_number); + } + + static ManifestEntryWrapper manifestEntryWrapper(int64_t commit_snapshot_id) { + return ManifestEntryWrapper(commit_snapshot_id); + } +}; + +} // namespace iceberg diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h new file mode 100644 index 000000000..efbceda4a --- /dev/null +++ b/src/iceberg/v3_metadata.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/v3_metadata.h + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" + +namespace iceberg { + +/// \brief v3 metadata wrapper. +/// +/// Wrapper for v3 manifest list and manifest entry. +class V3MetaData { + public: + /// \brief v3 manifest file wrapper. + struct ManifestFileWrapper : public ManifestFile { + explicit ManifestFileWrapper(int64_t commit_snapshotId, int64_t sequence_number) {} + + ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; } + }; + + /// \brief v3 manifest entry wrapper. + struct ManifestEntryWrapper : public ManifestEntry { + explicit ManifestEntryWrapper(int64_t commit_snapshot_id) {} + + ManifestEntry wrap(ManifestEntry entry) { return *this; } + }; + + static ManifestFileWrapper manifestFileWrapper(int64_t commit_snapshotId, + int64_t sequence_number) { + return ManifestFileWrapper(commit_snapshotId, sequence_number); + } + + static ManifestEntryWrapper manifestEntryWrapper(int64_t commit_snapshot_id) { + return ManifestEntryWrapper(commit_snapshot_id); + } +}; + +} // namespace iceberg From fd3650fee23f3eb391ad175eb5f04bae3f1d9f73 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Fri, 15 Aug 2025 11:17:22 +0800 Subject: [PATCH 03/14] fix cpplint --- src/iceberg/v1_metadata.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index 6fb47a37f..79e410316 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -33,14 +33,14 @@ class V1MetaData { public: /// \brief v1 manifest file wrapper. struct ManifestFileWrapper : public ManifestFile { - explicit ManifestFileWrapper() {} + ManifestFileWrapper() = default; ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; } }; /// \brief v1 manifest entry wrapper. struct ManifestEntryWrapper : public ManifestEntry { - explicit ManifestEntryWrapper() {} + ManifestEntryWrapper() = default; ManifestEntry wrap(ManifestEntry entry) { return *this; } }; From 73e115e6466d746b8c5d3e017ac33cfbf6e31387 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Fri, 15 Aug 2025 11:35:59 +0800 Subject: [PATCH 04/14] fix cpplint --- src/iceberg/v1_metadata.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index 79e410316..8f2ea07ea 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -45,9 +45,9 @@ class V1MetaData { ManifestEntry wrap(ManifestEntry entry) { return *this; } }; - static ManifestFileWrapper manifestFileWrapper() { return ManifestFileWrapper(); } + static ManifestFileWrapper manifestFileWrapper() { return {}; } - static ManifestEntryWrapper manifestEntryWrapper() { return ManifestEntryWrapper(); } + static ManifestEntryWrapper manifestEntryWrapper() { return {}; } }; } // namespace iceberg From cf01c0849be58c41a4d59b83c71297d2867e9189 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Mon, 18 Aug 2025 14:31:40 +0800 Subject: [PATCH 05/14] fix comment --- src/iceberg/manifest_writer.cc | 40 ++++++++------ src/iceberg/manifest_writer.h | 30 +++++++---- src/iceberg/manifest_writer_internal.cc | 72 ++++++++++++++++++++++--- src/iceberg/manifest_writer_internal.h | 45 +++++++++++++--- src/iceberg/v1_metadata.h | 4 +- src/iceberg/v2_metadata.h | 4 +- src/iceberg/v3_metadata.h | 6 ++- 7 files changed, 156 insertions(+), 45 deletions(-) diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc index 866db4a3e..ac412aea2 100644 --- a/src/iceberg/manifest_writer.cc +++ b/src/iceberg/manifest_writer.cc @@ -27,11 +27,12 @@ namespace iceberg { -Result> ManifestWriter::MakeWriter( - int32_t format_version, int64_t snapshot_id, int64_t first_row_id, +Result> ManifestWriter::Make( + int32_t format_version, int64_t snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema) { - auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema); + auto manifest_entry_schema = + ManifestEntry::TypeFromPartitionType(std::move(partition_schema)); auto fields_span = manifest_entry_schema->fields(); std::vector fields(fields_span.begin(), fields_span.end()); auto schema = std::make_shared(fields); @@ -48,17 +49,21 @@ Result> ManifestWriter::MakeWriter( return std::make_unique(snapshot_id, std::move(writer), std::move(schema)); case 3: - return std::make_unique(snapshot_id, first_row_id, + // first_row_id is required for V3 manifest entry + if (!first_row_id.has_value()) { + return InvalidManifest("first_row_id is required for V3 manifest entry"); + } + return std::make_unique(snapshot_id, first_row_id.value(), std::move(writer), std::move(schema)); default: - return InvalidArgument("Unsupported manifest format version: {}", format_version); + return NotSupported("Unsupported manifest format version: {}", format_version); } } -Result> ManifestListWriter::MakeWriter( +Result> ManifestListWriter::Make( int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id, - int64_t sequence_number, int64_t first_row_id, + std::optional sequence_number, std::optional first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io) { std::vector fields(ManifestFile::Type().fields().begin(), ManifestFile::Type().fields().end()); @@ -71,20 +76,25 @@ Result> ManifestListWriter::MakeWriter( switch (format_version) { case 1: return std::make_unique(snapshot_id, parent_snapshot_id, - std::move(writer), std::move(schema)); case 2: return std::make_unique(snapshot_id, parent_snapshot_id, - sequence_number, std::move(writer), - std::move(schema)); - case 3: - return std::make_unique(snapshot_id, parent_snapshot_id, - sequence_number, first_row_id, + sequence_number.value(), std::move(writer), std::move(schema)); + case 3: + // sequence_number&first_row_id is required for V3 manifest list + if (!sequence_number.has_value()) { + return InvalidManifestList("sequence_number is required for V3 manifest list"); + } + if (!first_row_id.has_value()) { + return InvalidManifestList("first_row_id is required for V3 manifest list"); + } + return std::make_unique( + snapshot_id, parent_snapshot_id, sequence_number.value(), first_row_id.value(), + std::move(writer), std::move(schema)); default: - return InvalidArgument("Unsupported manifest list format version: {}", - format_version); + return NotSupported("Unsupported manifest list format version: {}", format_version); } } diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index 5930243db..6ddc17f1f 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -36,10 +36,15 @@ class ICEBERG_EXPORT ManifestWriter { public: virtual ~ManifestWriter() = default; - /// \brief Write manifest entry to file + /// \brief Write manifest entry to file. /// \param entry Manifest entry to write. - /// \return Status::OK() if all entry was written successfully - virtual Status WriteManifestEntry(const ManifestEntry& entry) const = 0; + /// \return Status::OK() if entry was written successfully + virtual Status Add(const ManifestEntry& entry) = 0; + + /// \brief Write manifest entries to file. + /// \param entries Manifest entries to write. + /// \return Status::OK() if all entries were written successfully + virtual Status AddAll(const std::vector& entries) = 0; /// \brief Close writer and flush to storage. virtual Status Close() = 0; @@ -51,8 +56,8 @@ class ICEBERG_EXPORT ManifestWriter { /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. /// \return A Result containing the writer or an error. - static Result> MakeWriter( - int32_t format_version, int64_t snapshot_id, int64_t first_row_id, + static Result> Make( + int32_t format_version, int64_t snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema); }; @@ -62,10 +67,15 @@ class ICEBERG_EXPORT ManifestListWriter { public: virtual ~ManifestListWriter() = default; - /// \brief Write manifest file list to manifest list file. + /// \brief Write manifest file to manifest list file. /// \param file Manifest file to write. - /// \return Status::OK() if all file was written successfully - virtual Status WriteManifestFile(const ManifestFile& file) const = 0; + /// \return Status::OK() if file was written successfully + virtual Status Add(const ManifestFile& file) = 0; + + /// \brief Write manifest file list to manifest list file. + /// \param files Manifest file list to write. + /// \return Status::OK() if all files were written successfully + virtual Status AddAll(const std::vector& files) = 0; /// \brief Close writer and flush to storage. virtual Status Close() = 0; @@ -79,9 +89,9 @@ class ICEBERG_EXPORT ManifestListWriter { /// \param manifest_list_location Path to the manifest list file. /// \param file_io File IO implementation to use. /// \return A Result containing the writer or an error. - static Result> MakeWriter( + static Result> Make( int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id, - int64_t sequence_number, int64_t first_row_id, + std::optional sequence_number, std::optional first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io); }; diff --git a/src/iceberg/manifest_writer_internal.cc b/src/iceberg/manifest_writer_internal.cc index a6faa35fa..d5b30d8af 100644 --- a/src/iceberg/manifest_writer_internal.cc +++ b/src/iceberg/manifest_writer_internal.cc @@ -25,45 +25,105 @@ namespace iceberg { -Status ManifestWriterV1::WriteManifestEntry(const ManifestEntry& entry) const { +Status ManifestWriterV1::Add(const ManifestEntry& entry) { + // TODO(xiao.dong) convert entries to arrow data + return {}; +} + +Status ManifestWriterV1::AddAll(const std::vector& files) { // TODO(xiao.dong) convert entries to arrow data return {}; } Status ManifestWriterV1::Close() { return {}; } -Status ManifestWriterV2::WriteManifestEntry(const ManifestEntry& entry) const { +ManifestEntry ManifestWriterV1::prepare(const ManifestEntry& entry) { + return wrapper_.Wrap(entry); +} + +Status ManifestWriterV2::Add(const ManifestEntry& entry) { + // TODO(xiao.dong) convert entries to arrow data + return {}; +} + +Status ManifestWriterV2::AddAll(const std::vector& files) { // TODO(xiao.dong) convert entries to arrow data return {}; } Status ManifestWriterV2::Close() { return {}; } -Status ManifestWriterV3::WriteManifestEntry(const ManifestEntry& entry) const { +ManifestEntry ManifestWriterV2::prepare(const ManifestEntry& entry) { + return wrapper_.Wrap(entry); +} + +Status ManifestWriterV3::Add(const ManifestEntry& entry) { + // TODO(xiao.dong) convert entries to arrow data + return {}; +} + +Status ManifestWriterV3::AddAll(const std::vector& files) { // TODO(xiao.dong) convert entries to arrow data return {}; } Status ManifestWriterV3::Close() { return {}; } -Status ManifestListWriterV1::WriteManifestFile(const ManifestFile& file) const { +ManifestEntry ManifestWriterV3::prepare(const ManifestEntry& entry) { + return wrapper_.Wrap(entry); +} + +Status ManifestListWriterV1::Add(const ManifestFile& file) { + // TODO(xiao.dong) convert manifest files to arrow data + return {}; +} + +Status ManifestListWriterV1::AddAll(const std::vector& files) { // TODO(xiao.dong) convert manifest files to arrow data return {}; } Status ManifestListWriterV1::Close() { return {}; } -Status ManifestListWriterV2::WriteManifestFile(const ManifestFile& file) const { +ManifestFile ManifestListWriterV1::prepare(const ManifestFile& file) { + return wrapper_.Wrap(file); +} + +Status ManifestListWriterV2::Add(const ManifestFile& file) { + // TODO(xiao.dong) convert manifest files to arrow data + return {}; +} + +Status ManifestListWriterV2::AddAll(const std::vector& files) { // TODO(xiao.dong) convert manifest files to arrow data return {}; } Status ManifestListWriterV2::Close() { return {}; } -Status ManifestListWriterV3::WriteManifestFile(const ManifestFile& file) const { +ManifestFile ManifestListWriterV2::prepare(const ManifestFile& file) { + return wrapper_.Wrap(file); +} + +Status ManifestListWriterV3::Add(const ManifestFile& file) { + // TODO(xiao.dong) convert manifest files to arrow data + return {}; +} + +Status ManifestListWriterV3::AddAll(const std::vector& files) { // TODO(xiao.dong) convert manifest files to arrow data return {}; } Status ManifestListWriterV3::Close() { return {}; } + +ManifestFile ManifestListWriterV3::prepare(const ManifestFile& file) { + if (file.content != ManifestFile::Content::kData || file.first_row_id.has_value()) { + return wrapper_.Wrap(file, std::nullopt); + } + auto result = wrapper_.Wrap(file, next_row_id_); + next_row_id_ += + file.existing_rows_count.value_or(0) + file.added_rows_count.value_or(0); + return result; +} } // namespace iceberg diff --git a/src/iceberg/manifest_writer_internal.h b/src/iceberg/manifest_writer_internal.h index 6acbe61e2..0ae711942 100644 --- a/src/iceberg/manifest_writer_internal.h +++ b/src/iceberg/manifest_writer_internal.h @@ -36,6 +36,9 @@ class ManifestWriterImpl : public ManifestWriter { std::shared_ptr schema) : schema_(std::move(schema)), writer_(std::move(writer)) {} + protected: + virtual ManifestEntry prepare(const ManifestEntry& entry) = 0; + private: std::shared_ptr schema_; std::unique_ptr writer_; @@ -48,10 +51,13 @@ class ManifestWriterV1 : public ManifestWriterImpl { std::shared_ptr schema) : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)) {} - Status WriteManifestEntry(const ManifestEntry& entry) const override; - + Status Add(const ManifestEntry& entry) override; + Status AddAll(const std::vector& entries) override; Status Close() override; + protected: + ManifestEntry prepare(const ManifestEntry& entry) override; + private: V1MetaData::ManifestEntryWrapper wrapper_; }; @@ -64,10 +70,14 @@ class ManifestWriterV2 : public ManifestWriterImpl { : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)), wrapper_(snapshot_id) {} - Status WriteManifestEntry(const ManifestEntry& entry) const override; + Status Add(const ManifestEntry& entry) override; + Status AddAll(const std::vector& entries) override; Status Close() override; + protected: + ManifestEntry prepare(const ManifestEntry& entry) override; + private: V2MetaData::ManifestEntryWrapper wrapper_; }; @@ -81,10 +91,14 @@ class ManifestWriterV3 : public ManifestWriterImpl { : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)), wrapper_(snapshot_id) {} - Status WriteManifestEntry(const ManifestEntry& entry) const override; + Status Add(const ManifestEntry& entry) override; + Status AddAll(const std::vector& entries) override; Status Close() override; + protected: + ManifestEntry prepare(const ManifestEntry& entry) override; + private: V3MetaData::ManifestEntryWrapper wrapper_; }; @@ -97,6 +111,9 @@ class ManifestListWriterImpl : public ManifestListWriter { std::shared_ptr schema) : schema_(std::move(schema)), writer_(std::move(writer)) {} + protected: + virtual ManifestFile prepare(const ManifestFile& file) = 0; + private: std::shared_ptr schema_; std::unique_ptr writer_; @@ -112,10 +129,13 @@ class ManifestListWriterV1 : public ManifestListWriterImpl { : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, std::move(writer), std::move(schema)) {} - Status WriteManifestFile(const ManifestFile& file) const override; - + Status Add(const ManifestFile& file) override; + Status AddAll(const std::vector& files) override; Status Close() override; + protected: + ManifestFile prepare(const ManifestFile& file) override; + private: V1MetaData::ManifestFileWrapper wrapper_; }; @@ -130,10 +150,14 @@ class ManifestListWriterV2 : public ManifestListWriterImpl { std::move(schema)), wrapper_(snapshot_id, sequence_number) {} - Status WriteManifestFile(const ManifestFile& file) const override; + Status Add(const ManifestFile& file) override; + Status AddAll(const std::vector& files) override; Status Close() override; + protected: + ManifestFile prepare(const ManifestFile& file) override; + private: V2MetaData::ManifestFileWrapper wrapper_; }; @@ -149,11 +173,16 @@ class ManifestListWriterV3 : public ManifestListWriterImpl { std::move(schema)), wrapper_(snapshot_id, sequence_number) {} - Status WriteManifestFile(const ManifestFile& file) const override; + Status Add(const ManifestFile& file) override; + Status AddAll(const std::vector& files) override; Status Close() override; + protected: + ManifestFile prepare(const ManifestFile& file) override; + private: + int64_t next_row_id_ = 0; V3MetaData::ManifestFileWrapper wrapper_; }; diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index 8f2ea07ea..4ec9a12fc 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -35,14 +35,14 @@ class V1MetaData { struct ManifestFileWrapper : public ManifestFile { ManifestFileWrapper() = default; - ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; } + ManifestFile Wrap(ManifestFile file) { return *this; } }; /// \brief v1 manifest entry wrapper. struct ManifestEntryWrapper : public ManifestEntry { ManifestEntryWrapper() = default; - ManifestEntry wrap(ManifestEntry entry) { return *this; } + ManifestEntry Wrap(ManifestEntry entry) { return *this; } }; static ManifestFileWrapper manifestFileWrapper() { return {}; } diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h index 7de4c06ce..e264e6eef 100644 --- a/src/iceberg/v2_metadata.h +++ b/src/iceberg/v2_metadata.h @@ -35,14 +35,14 @@ class V2MetaData { struct ManifestFileWrapper : public ManifestFile { explicit ManifestFileWrapper(int64_t commit_snapshotId, int64_t sequence_number) {} - ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; } + ManifestFile Wrap(ManifestFile file) { return *this; } }; /// \brief v2 manifest entry wrapper. struct ManifestEntryWrapper : public ManifestEntry { explicit ManifestEntryWrapper(int64_t commit_snapshot_id) {} - ManifestEntry wrap(ManifestEntry entry) { return *this; } + ManifestEntry Wrap(ManifestEntry entry) { return *this; } }; static ManifestFileWrapper manifestFileWrapper(int64_t commit_snapshotId, diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h index efbceda4a..c5d257c92 100644 --- a/src/iceberg/v3_metadata.h +++ b/src/iceberg/v3_metadata.h @@ -35,14 +35,16 @@ class V3MetaData { struct ManifestFileWrapper : public ManifestFile { explicit ManifestFileWrapper(int64_t commit_snapshotId, int64_t sequence_number) {} - ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; } + ManifestFile Wrap(ManifestFile file, std::optional first_row_id) { + return *this; + } }; /// \brief v3 manifest entry wrapper. struct ManifestEntryWrapper : public ManifestEntry { explicit ManifestEntryWrapper(int64_t commit_snapshot_id) {} - ManifestEntry wrap(ManifestEntry entry) { return *this; } + ManifestEntry Wrap(ManifestEntry entry) { return *this; } }; static ManifestFileWrapper manifestFileWrapper(int64_t commit_snapshotId, From 9e70336b649facd90efa56c2451ef869202378df Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Wed, 20 Aug 2025 15:45:56 +0800 Subject: [PATCH 06/14] refactor interface --- src/iceberg/CMakeLists.txt | 1 - src/iceberg/manifest_writer.cc | 240 ++++++++++++++++++------ src/iceberg/manifest_writer.h | 76 +++++++- src/iceberg/manifest_writer_internal.cc | 129 ------------- src/iceberg/manifest_writer_internal.h | 189 ------------------- src/iceberg/metadata_adapter.h | 66 +++++++ src/iceberg/v1_metadata.h | 49 +++-- src/iceberg/v2_metadata.h | 48 ++--- src/iceberg/v3_metadata.h | 52 ++--- src/iceberg/v4_metadata.h | 64 +++++++ 10 files changed, 464 insertions(+), 450 deletions(-) delete mode 100644 src/iceberg/manifest_writer_internal.cc delete mode 100644 src/iceberg/manifest_writer_internal.h create mode 100644 src/iceberg/metadata_adapter.h create mode 100644 src/iceberg/v4_metadata.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index cf757e3a3..dac3ddd23 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -49,7 +49,6 @@ set(ICEBERG_SOURCES manifest_reader.cc manifest_reader_internal.cc manifest_writer.cc - manifest_writer_internal.cc arrow_c_data_guard_internal.cc util/murmurhash3_internal.cc util/timepoint.cc diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc index ac412aea2..043eff304 100644 --- a/src/iceberg/manifest_writer.cc +++ b/src/iceberg/manifest_writer.cc @@ -21,81 +21,205 @@ #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" -#include "iceberg/manifest_writer_internal.h" #include "iceberg/schema.h" #include "iceberg/util/macros.h" +#include "iceberg/v1_metadata.h" +#include "iceberg/v2_metadata.h" +#include "iceberg/v3_metadata.h" +#include "iceberg/v4_metadata.h" namespace iceberg { -Result> ManifestWriter::Make( - int32_t format_version, int64_t snapshot_id, std::optional first_row_id, - std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr partition_schema) { +/// \brief Write manifest files to a manifest list file. +class ManifestWriterImpl : public ManifestWriter { + public: + ManifestWriterImpl(std::unique_ptr writer, + std::unique_ptr adapter) + : writer_(std::move(writer)), adapter_(std::move(adapter)) {} + + Status Add(const ManifestEntry& entry) override { + if (adapter_->size() >= kBatchSize) { + ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); + ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); + ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending()); + } + return adapter_->Append(entry); + } + + Status AddAll(const std::vector& entries) override { + for (const auto& entry : entries) { + ICEBERG_RETURN_UNEXPECTED(Add(entry)); + } + return {}; + } + + Status Close() override { + if (adapter_->size() > 0) { + ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); + ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); + } + return {}; + } + + private: + static constexpr int64_t kBatchSize = 1024; + std::unique_ptr writer_; + std::unique_ptr adapter_; +}; + +std::shared_ptr ParseSchema(std::shared_ptr partition_schema) { auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(std::move(partition_schema)); auto fields_span = manifest_entry_schema->fields(); std::vector fields(fields_span.begin(), fields_span.end()); - auto schema = std::make_shared(fields); + return std::make_shared(fields); +} + +Result> OpenFileWriter(std::string_view location, + const std::shared_ptr schema, + std::shared_ptr file_io) { ICEBERG_ASSIGN_OR_RAISE( - auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro, - {.path = std::string(manifest_location), - .schema = schema, - .io = std::move(file_io)})); - switch (format_version) { - case 1: - return std::make_unique(snapshot_id, std::move(writer), - std::move(schema)); - case 2: - return std::make_unique(snapshot_id, std::move(writer), - std::move(schema)); - case 3: - // first_row_id is required for V3 manifest entry - if (!first_row_id.has_value()) { - return InvalidManifest("first_row_id is required for V3 manifest entry"); - } - return std::make_unique(snapshot_id, first_row_id.value(), - std::move(writer), std::move(schema)); - - default: - return NotSupported("Unsupported manifest format version: {}", format_version); + auto writer, + WriterFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = std::string(location), .schema = schema, .io = std::move(file_io)})); + return writer; +} + +Result> ManifestWriter::MakeV1Writer( + std::optional snapshot_id, std::string_view manifest_location, + std::shared_ptr file_io, std::shared_ptr partition_schema) { + auto schema = ParseSchema(partition_schema); + ICEBERG_ASSIGN_OR_RAISE(auto writer, + OpenFileWriter(manifest_location, schema, std::move(file_io))); + auto adapter = std::make_unique(snapshot_id, std::move(schema)); + return std::make_unique(std::move(writer), std::move(adapter)); +} + +Result> ManifestWriter::MakeV2Writer( + std::optional snapshot_id, std::string_view manifest_location, + std::shared_ptr file_io, std::shared_ptr partition_schema) { + auto schema = ParseSchema(partition_schema); + ICEBERG_ASSIGN_OR_RAISE(auto writer, + OpenFileWriter(manifest_location, schema, std::move(file_io))); + auto adapter = std::make_unique(snapshot_id, std::move(schema)); + return std::make_unique(std::move(writer), std::move(adapter)); +} + +Result> ManifestWriter::MakeV3Writer( + std::optional snapshot_id, std::optional first_row_id, + std::string_view manifest_location, std::shared_ptr file_io, + std::shared_ptr partition_schema) { + auto schema = ParseSchema(partition_schema); + ICEBERG_ASSIGN_OR_RAISE(auto writer, + OpenFileWriter(manifest_location, schema, std::move(file_io))); + auto adapter = std::make_unique(snapshot_id, first_row_id, + std::move(schema)); + return std::make_unique(std::move(writer), std::move(adapter)); +} + +Result> ManifestWriter::MakeV4Writer( + std::optional snapshot_id, std::optional first_row_id, + std::string_view manifest_location, std::shared_ptr file_io, + std::shared_ptr partition_schema) { + auto schema = ParseSchema(partition_schema); + ICEBERG_ASSIGN_OR_RAISE(auto writer, + OpenFileWriter(manifest_location, schema, std::move(file_io))); + auto adapter = std::make_unique(snapshot_id, first_row_id, + std::move(schema)); + return std::make_unique(std::move(writer), std::move(adapter)); +} + +/// \brief Write manifest files to a manifest list file. +class ManifestListWriterImpl : public ManifestListWriter { + public: + ManifestListWriterImpl(std::unique_ptr writer, + std::unique_ptr adapter) + : writer_(std::move(writer)), adapter_(std::move(adapter)) {} + + Status Add(const ManifestFile& file) override { + if (adapter_->size() >= kBatchSize) { + ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); + ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); + ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending()); + } + return adapter_->Append(file); + } + + Status AddAll(const std::vector& files) override { + for (const auto& file : files) { + ICEBERG_RETURN_UNEXPECTED(Add(file)); + } + return {}; } + + Status Close() override { + if (adapter_->size() > 0) { + ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); + ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); + } + return {}; + } + + private: + static constexpr int64_t kBatchSize = 1024; + std::unique_ptr writer_; + std::unique_ptr adapter_; +}; + +Result> ManifestListWriter::MakeV1Writer( + int64_t snapshot_id, std::optional parent_snapshot_id, + std::string_view manifest_list_location, std::shared_ptr file_io) { + std::vector fields(ManifestFile::Type().fields().begin(), + ManifestFile::Type().fields().end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io))); + auto adapter = std::make_unique(snapshot_id, parent_snapshot_id, + std::move(schema)); + return std::make_unique(std::move(writer), std::move(adapter)); } -Result> ManifestListWriter::Make( - int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id, - std::optional sequence_number, std::optional first_row_id, +Result> ManifestListWriter::MakeV2Writer( + int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::string_view manifest_list_location, + std::shared_ptr file_io) { + std::vector fields(ManifestFile::Type().fields().begin(), + ManifestFile::Type().fields().end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io))); + auto adapter = std::make_unique( + snapshot_id, parent_snapshot_id, sequence_number, std::move(schema)); + return std::make_unique(std::move(writer), std::move(adapter)); +} + +Result> ManifestListWriter::MakeV3Writer( + int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::optional first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io) { std::vector fields(ManifestFile::Type().fields().begin(), ManifestFile::Type().fields().end()); auto schema = std::make_shared(fields); - ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open( - FileFormatType::kAvro, - {.path = std::string(manifest_list_location), - .schema = schema, - .io = std::move(file_io)})); - switch (format_version) { - case 1: - return std::make_unique(snapshot_id, parent_snapshot_id, - std::move(writer), std::move(schema)); - case 2: - return std::make_unique(snapshot_id, parent_snapshot_id, - sequence_number.value(), - std::move(writer), std::move(schema)); - case 3: - // sequence_number&first_row_id is required for V3 manifest list - if (!sequence_number.has_value()) { - return InvalidManifestList("sequence_number is required for V3 manifest list"); - } - if (!first_row_id.has_value()) { - return InvalidManifestList("first_row_id is required for V3 manifest list"); - } - return std::make_unique( - snapshot_id, parent_snapshot_id, sequence_number.value(), first_row_id.value(), - std::move(writer), std::move(schema)); - - default: - return NotSupported("Unsupported manifest list format version: {}", format_version); - } + ICEBERG_ASSIGN_OR_RAISE( + auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io))); + auto adapter = std::make_unique( + snapshot_id, parent_snapshot_id, sequence_number, first_row_id, std::move(schema)); + return std::make_unique(std::move(writer), std::move(adapter)); +} + +Result> ManifestListWriter::MakeV4Writer( + int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::optional first_row_id, + std::string_view manifest_list_location, std::shared_ptr file_io) { + std::vector fields(ManifestFile::Type().fields().begin(), + ManifestFile::Type().fields().end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io))); + auto adapter = std::make_unique( + snapshot_id, parent_snapshot_id, sequence_number, first_row_id, std::move(schema)); + return std::make_unique(std::move(writer), std::move(adapter)); } } // namespace iceberg diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index 6ddc17f1f..9326ff7b5 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -50,14 +50,42 @@ class ICEBERG_EXPORT ManifestWriter { virtual Status Close() = 0; /// \brief Creates a writer for a manifest file. - /// \param format_version Format version of the manifest. + /// \param snapshot_id ID of the snapshot. + /// \param manifest_location Path to the manifest file. + /// \param file_io File IO implementation to use. + /// \return A Result containing the writer or an error. + static Result> MakeV1Writer( + std::optional snapshot_id, std::string_view manifest_location, + std::shared_ptr file_io, std::shared_ptr partition_schema); + + /// \brief Creates a writer for a manifest file. + /// \param snapshot_id ID of the snapshot. + /// \param manifest_location Path to the manifest file. + /// \param file_io File IO implementation to use. + /// \return A Result containing the writer or an error. + static Result> MakeV2Writer( + std::optional snapshot_id, std::string_view manifest_location, + std::shared_ptr file_io, std::shared_ptr partition_schema); + + /// \brief Creates a writer for a manifest file. /// \param snapshot_id ID of the snapshot. /// \param first_row_id First row ID of the snapshot. /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. /// \return A Result containing the writer or an error. - static Result> Make( - int32_t format_version, int64_t snapshot_id, std::optional first_row_id, + static Result> MakeV3Writer( + std::optional snapshot_id, std::optional first_row_id, + std::string_view manifest_location, std::shared_ptr file_io, + std::shared_ptr partition_schema); + + /// \brief Creates a writer for a manifest file. + /// \param snapshot_id ID of the snapshot. + /// \param first_row_id First row ID of the snapshot. + /// \param manifest_location Path to the manifest file. + /// \param file_io File IO implementation to use. + /// \return A Result containing the writer or an error. + static Result> MakeV4Writer( + std::optional snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema); }; @@ -80,8 +108,42 @@ class ICEBERG_EXPORT ManifestListWriter { /// \brief Close writer and flush to storage. virtual Status Close() = 0; + /// \brief Creates a writer for the v1 manifest list. + /// \param snapshot_id ID of the snapshot. + /// \param parent_snapshot_id ID of the parent snapshot. + /// \param manifest_list_location Path to the manifest list file. + /// \param file_io File IO implementation to use. + /// \return A Result containing the writer or an error. + static Result> MakeV1Writer( + int64_t snapshot_id, std::optional parent_snapshot_id, + std::string_view manifest_list_location, std::shared_ptr file_io); + + /// \brief Creates a writer for the manifest list. + /// \param snapshot_id ID of the snapshot. + /// \param parent_snapshot_id ID of the parent snapshot. + /// \param sequence_number Sequence number of the snapshot. + /// \param manifest_list_location Path to the manifest list file. + /// \param file_io File IO implementation to use. + /// \return A Result containing the writer or an error. + static Result> MakeV2Writer( + int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::string_view manifest_list_location, + std::shared_ptr file_io); + + /// \brief Creates a writer for the manifest list. + /// \param snapshot_id ID of the snapshot. + /// \param parent_snapshot_id ID of the parent snapshot. + /// \param sequence_number Sequence number of the snapshot. + /// \param first_row_id First row ID of the snapshot. + /// \param manifest_list_location Path to the manifest list file. + /// \param file_io File IO implementation to use. + /// \return A Result containing the writer or an error. + static Result> MakeV3Writer( + int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::optional first_row_id, + std::string_view manifest_list_location, std::shared_ptr file_io); + /// \brief Creates a writer for the manifest list. - /// \param format_version Format version of the manifest list. /// \param snapshot_id ID of the snapshot. /// \param parent_snapshot_id ID of the parent snapshot. /// \param sequence_number Sequence number of the snapshot. @@ -89,9 +151,9 @@ class ICEBERG_EXPORT ManifestListWriter { /// \param manifest_list_location Path to the manifest list file. /// \param file_io File IO implementation to use. /// \return A Result containing the writer or an error. - static Result> Make( - int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id, - std::optional sequence_number, std::optional first_row_id, + static Result> MakeV4Writer( + int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::optional first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io); }; diff --git a/src/iceberg/manifest_writer_internal.cc b/src/iceberg/manifest_writer_internal.cc deleted file mode 100644 index d5b30d8af..000000000 --- a/src/iceberg/manifest_writer_internal.cc +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "manifest_writer_internal.h" - -#include "iceberg/manifest_entry.h" -#include "iceberg/manifest_list.h" -#include "iceberg/schema.h" - -namespace iceberg { - -Status ManifestWriterV1::Add(const ManifestEntry& entry) { - // TODO(xiao.dong) convert entries to arrow data - return {}; -} - -Status ManifestWriterV1::AddAll(const std::vector& files) { - // TODO(xiao.dong) convert entries to arrow data - return {}; -} - -Status ManifestWriterV1::Close() { return {}; } - -ManifestEntry ManifestWriterV1::prepare(const ManifestEntry& entry) { - return wrapper_.Wrap(entry); -} - -Status ManifestWriterV2::Add(const ManifestEntry& entry) { - // TODO(xiao.dong) convert entries to arrow data - return {}; -} - -Status ManifestWriterV2::AddAll(const std::vector& files) { - // TODO(xiao.dong) convert entries to arrow data - return {}; -} - -Status ManifestWriterV2::Close() { return {}; } - -ManifestEntry ManifestWriterV2::prepare(const ManifestEntry& entry) { - return wrapper_.Wrap(entry); -} - -Status ManifestWriterV3::Add(const ManifestEntry& entry) { - // TODO(xiao.dong) convert entries to arrow data - return {}; -} - -Status ManifestWriterV3::AddAll(const std::vector& files) { - // TODO(xiao.dong) convert entries to arrow data - return {}; -} - -Status ManifestWriterV3::Close() { return {}; } - -ManifestEntry ManifestWriterV3::prepare(const ManifestEntry& entry) { - return wrapper_.Wrap(entry); -} - -Status ManifestListWriterV1::Add(const ManifestFile& file) { - // TODO(xiao.dong) convert manifest files to arrow data - return {}; -} - -Status ManifestListWriterV1::AddAll(const std::vector& files) { - // TODO(xiao.dong) convert manifest files to arrow data - return {}; -} - -Status ManifestListWriterV1::Close() { return {}; } - -ManifestFile ManifestListWriterV1::prepare(const ManifestFile& file) { - return wrapper_.Wrap(file); -} - -Status ManifestListWriterV2::Add(const ManifestFile& file) { - // TODO(xiao.dong) convert manifest files to arrow data - return {}; -} - -Status ManifestListWriterV2::AddAll(const std::vector& files) { - // TODO(xiao.dong) convert manifest files to arrow data - return {}; -} - -Status ManifestListWriterV2::Close() { return {}; } - -ManifestFile ManifestListWriterV2::prepare(const ManifestFile& file) { - return wrapper_.Wrap(file); -} - -Status ManifestListWriterV3::Add(const ManifestFile& file) { - // TODO(xiao.dong) convert manifest files to arrow data - return {}; -} - -Status ManifestListWriterV3::AddAll(const std::vector& files) { - // TODO(xiao.dong) convert manifest files to arrow data - return {}; -} - -Status ManifestListWriterV3::Close() { return {}; } - -ManifestFile ManifestListWriterV3::prepare(const ManifestFile& file) { - if (file.content != ManifestFile::Content::kData || file.first_row_id.has_value()) { - return wrapper_.Wrap(file, std::nullopt); - } - auto result = wrapper_.Wrap(file, next_row_id_); - next_row_id_ += - file.existing_rows_count.value_or(0) + file.added_rows_count.value_or(0); - return result; -} -} // namespace iceberg diff --git a/src/iceberg/manifest_writer_internal.h b/src/iceberg/manifest_writer_internal.h deleted file mode 100644 index 0ae711942..000000000 --- a/src/iceberg/manifest_writer_internal.h +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -/// \file iceberg/internal/manifest_writer_internal.h -/// Writer implementation for manifest list files and manifest files. - -#include "iceberg/manifest_writer.h" -#include "iceberg/v1_metadata.h" -#include "iceberg/v2_metadata.h" -#include "iceberg/v3_metadata.h" - -namespace iceberg { - -/// \brief Write manifest entries to a manifest file. -class ManifestWriterImpl : public ManifestWriter { - public: - explicit ManifestWriterImpl(int64_t snapshot_id, std::unique_ptr writer, - std::shared_ptr schema) - : schema_(std::move(schema)), writer_(std::move(writer)) {} - - protected: - virtual ManifestEntry prepare(const ManifestEntry& entry) = 0; - - private: - std::shared_ptr schema_; - std::unique_ptr writer_; -}; - -/// \brief Write v1 manifest entries to a manifest file. -class ManifestWriterV1 : public ManifestWriterImpl { - public: - explicit ManifestWriterV1(int64_t snapshot_id, std::unique_ptr writer, - std::shared_ptr schema) - : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)) {} - - Status Add(const ManifestEntry& entry) override; - Status AddAll(const std::vector& entries) override; - Status Close() override; - - protected: - ManifestEntry prepare(const ManifestEntry& entry) override; - - private: - V1MetaData::ManifestEntryWrapper wrapper_; -}; - -/// \brief Write v2 manifest entries to a manifest file. -class ManifestWriterV2 : public ManifestWriterImpl { - public: - explicit ManifestWriterV2(int64_t snapshot_id, std::unique_ptr writer, - std::shared_ptr schema) - : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)), - wrapper_(snapshot_id) {} - - Status Add(const ManifestEntry& entry) override; - Status AddAll(const std::vector& entries) override; - - Status Close() override; - - protected: - ManifestEntry prepare(const ManifestEntry& entry) override; - - private: - V2MetaData::ManifestEntryWrapper wrapper_; -}; - -/// \brief Write v3 manifest entries to a manifest file. -class ManifestWriterV3 : public ManifestWriterImpl { - public: - explicit ManifestWriterV3(int64_t snapshot_id, int64_t first_row_id, - std::unique_ptr writer, - std::shared_ptr schema) - : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)), - wrapper_(snapshot_id) {} - - Status Add(const ManifestEntry& entry) override; - Status AddAll(const std::vector& entries) override; - - Status Close() override; - - protected: - ManifestEntry prepare(const ManifestEntry& entry) override; - - private: - V3MetaData::ManifestEntryWrapper wrapper_; -}; - -/// \brief Write manifest files to a manifest list file. -class ManifestListWriterImpl : public ManifestListWriter { - public: - explicit ManifestListWriterImpl(int64_t snapshot_id, int64_t parent_snapshot_id, - std::unique_ptr writer, - std::shared_ptr schema) - : schema_(std::move(schema)), writer_(std::move(writer)) {} - - protected: - virtual ManifestFile prepare(const ManifestFile& file) = 0; - - private: - std::shared_ptr schema_; - std::unique_ptr writer_; -}; - -/// \brief Write v1 manifest files to a manifest list file. -class ManifestListWriterV1 : public ManifestListWriterImpl { - public: - explicit ManifestListWriterV1(int64_t snapshot_id, int64_t parent_snapshot_id, - - std::unique_ptr writer, - std::shared_ptr schema) - : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, std::move(writer), - std::move(schema)) {} - - Status Add(const ManifestFile& file) override; - Status AddAll(const std::vector& files) override; - Status Close() override; - - protected: - ManifestFile prepare(const ManifestFile& file) override; - - private: - V1MetaData::ManifestFileWrapper wrapper_; -}; - -/// \brief Write v2 manifest files to a manifest list file. -class ManifestListWriterV2 : public ManifestListWriterImpl { - public: - explicit ManifestListWriterV2(int64_t snapshot_id, int64_t parent_snapshot_id, - int64_t sequence_number, std::unique_ptr writer, - std::shared_ptr schema) - : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, std::move(writer), - std::move(schema)), - wrapper_(snapshot_id, sequence_number) {} - - Status Add(const ManifestFile& file) override; - Status AddAll(const std::vector& files) override; - - Status Close() override; - - protected: - ManifestFile prepare(const ManifestFile& file) override; - - private: - V2MetaData::ManifestFileWrapper wrapper_; -}; - -/// \brief Write v3 manifest files to a manifest list file. -class ManifestListWriterV3 : public ManifestListWriterImpl { - public: - explicit ManifestListWriterV3(int64_t snapshot_id, int64_t parent_snapshot_id, - int64_t sequence_number, int64_t first_row_id, - std::unique_ptr writer, - std::shared_ptr schema) - : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, std::move(writer), - std::move(schema)), - wrapper_(snapshot_id, sequence_number) {} - - Status Add(const ManifestFile& file) override; - Status AddAll(const std::vector& files) override; - - Status Close() override; - - protected: - ManifestFile prepare(const ManifestFile& file) override; - - private: - int64_t next_row_id_ = 0; - V3MetaData::ManifestFileWrapper wrapper_; -}; - -} // namespace iceberg diff --git a/src/iceberg/metadata_adapter.h b/src/iceberg/metadata_adapter.h new file mode 100644 index 000000000..16d200823 --- /dev/null +++ b/src/iceberg/metadata_adapter.h @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/metadata_adapter.h +/// Base class of adapter for v1v2v3v4 metadata. + +#include "iceberg/arrow_c_data.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/result.h" + +namespace iceberg { + +// \brief Implemented by different versions with different schemas to +// append a list of `ManifestEntry`s to an `ArrowArray`. +class ManifestEntryAdapter { + public: + ManifestEntryAdapter() = default; + virtual ~ManifestEntryAdapter() = default; + + virtual Status StartAppending() = 0; + virtual Status Append(const ManifestEntry& entry) = 0; + virtual Result FinishAppending() = 0; + int64_t size() const { return size_; } + + protected: + ArrowArray array_; // array to append `ManifestEntry`s + int64_t size_ = 0; +}; + +// \brief Implemented by different versions with different schemas to +// append a list of `ManifestFile`s to an `ArrowArray`. +class ManifestFileAdapter { + public: + ManifestFileAdapter() = default; + virtual ~ManifestFileAdapter() = default; + + virtual Status StartAppending() = 0; + virtual Status Append(const ManifestFile& file) = 0; + virtual Result FinishAppending() = 0; + int64_t size() const { return size_; } + + protected: + ArrowArray array_; // array to append `ManifestFile`s + int64_t size_ = 0; +}; + +} // namespace iceberg diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index 4ec9a12fc..1fe2dc672 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -23,31 +23,40 @@ #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" +#include "iceberg/metadata_adapter.h" namespace iceberg { -/// \brief v1 metadata wrapper. -/// -/// Wrapper for v1 manifest list and manifest entry. -class V1MetaData { +/// \brief Adapter to convert V1 ManifestEntry to `ArrowArray`. +class ManifestEntryAdapterV1 : public ManifestEntryAdapter { public: - /// \brief v1 manifest file wrapper. - struct ManifestFileWrapper : public ManifestFile { - ManifestFileWrapper() = default; - - ManifestFile Wrap(ManifestFile file) { return *this; } - }; - - /// \brief v1 manifest entry wrapper. - struct ManifestEntryWrapper : public ManifestEntry { - ManifestEntryWrapper() = default; - - ManifestEntry Wrap(ManifestEntry entry) { return *this; } - }; - - static ManifestFileWrapper manifestFileWrapper() { return {}; } + ManifestEntryAdapterV1(std::optional snapshot_id, + std::shared_ptr schema) { + // TODO: init v1 schema + } + Status StartAppending() override { return {}; } + Status Append(const ManifestEntry& entry) override { return {}; } + Result FinishAppending() override { return {}; } + + private: + std::shared_ptr manifest_schema_; + ArrowSchema schema_; // converted from manifest_schema_ +}; - static ManifestEntryWrapper manifestEntryWrapper() { return {}; } +/// \brief Adapter to convert V1 ManifestFile to `ArrowArray`. +class ManifestFileAdapterV1 : public ManifestFileAdapter { + public: + ManifestFileAdapterV1(int64_t snapshot_id, std::optional parent_snapshot_id, + std::shared_ptr schema) { + // TODO: init v1 schema + } + Status StartAppending() override { return {}; } + Status Append(const ManifestFile& file) override { return {}; } + Result FinishAppending() override { return {}; } + + private: + std::shared_ptr manifest_list_schema_; + ArrowSchema schema_; // converted from manifest_list_schema_ }; } // namespace iceberg diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h index e264e6eef..7959a7408 100644 --- a/src/iceberg/v2_metadata.h +++ b/src/iceberg/v2_metadata.h @@ -23,36 +23,40 @@ #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" +#include "iceberg/metadata_adapter.h" namespace iceberg { -/// \brief v2 metadata wrapper. -/// -/// Wrapper for v2 manifest list and manifest entry. -class V2MetaData { +/// \brief Adapter to convert V2 ManifestEntry to `ArrowArray`. +class ManifestEntryAdapterV2 : public ManifestEntryAdapter { public: - /// \brief v2 manifest file wrapper. - struct ManifestFileWrapper : public ManifestFile { - explicit ManifestFileWrapper(int64_t commit_snapshotId, int64_t sequence_number) {} - - ManifestFile Wrap(ManifestFile file) { return *this; } - }; - - /// \brief v2 manifest entry wrapper. - struct ManifestEntryWrapper : public ManifestEntry { - explicit ManifestEntryWrapper(int64_t commit_snapshot_id) {} + ManifestEntryAdapterV2(std::optional snapshot_id, + std::shared_ptr schema) { + // TODO: init v2 schema + } + Status StartAppending() override { return {}; } + Status Append(const ManifestEntry& entry) override { return {}; } + Result FinishAppending() override { return {}; } - ManifestEntry Wrap(ManifestEntry entry) { return *this; } - }; + private: + std::shared_ptr manifest_schema_; + ArrowSchema schema_; // converted from manifest_schema_ +}; - static ManifestFileWrapper manifestFileWrapper(int64_t commit_snapshotId, - int64_t sequence_number) { - return ManifestFileWrapper(commit_snapshotId, sequence_number); +/// \brief Adapter to convert V2 ManifestFile to `ArrowArray`. +class ManifestFileAdapterV2 : public ManifestFileAdapter { + public: + ManifestFileAdapterV2(int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::shared_ptr schema) { + // TODO: init v2 schema } + Status StartAppending() override { return {}; } + Status Append(const ManifestFile& file) override { return {}; } + Result FinishAppending() override { return {}; } - static ManifestEntryWrapper manifestEntryWrapper(int64_t commit_snapshot_id) { - return ManifestEntryWrapper(commit_snapshot_id); - } + private: + std::shared_ptr manifest_list_schema_; + ArrowSchema schema_; // converted from manifest_list_schema_ }; } // namespace iceberg diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h index c5d257c92..00d3b27a3 100644 --- a/src/iceberg/v3_metadata.h +++ b/src/iceberg/v3_metadata.h @@ -23,38 +23,42 @@ #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" +#include "iceberg/metadata_adapter.h" namespace iceberg { -/// \brief v3 metadata wrapper. -/// -/// Wrapper for v3 manifest list and manifest entry. -class V3MetaData { +/// \brief Adapter to convert V3ManifestEntry to `ArrowArray`. +class ManifestEntryAdapterV3 : public ManifestEntryAdapter { public: - /// \brief v3 manifest file wrapper. - struct ManifestFileWrapper : public ManifestFile { - explicit ManifestFileWrapper(int64_t commit_snapshotId, int64_t sequence_number) {} - - ManifestFile Wrap(ManifestFile file, std::optional first_row_id) { - return *this; - } - }; - - /// \brief v3 manifest entry wrapper. - struct ManifestEntryWrapper : public ManifestEntry { - explicit ManifestEntryWrapper(int64_t commit_snapshot_id) {} + ManifestEntryAdapterV3(std::optional snapshot_id, + std::optional first_row_id, + std::shared_ptr schema) { + // TODO: init v3 schema + } + Status StartAppending() override { return {}; } + Status Append(const ManifestEntry& entry) override { return {}; } + Result FinishAppending() override { return {}; } - ManifestEntry Wrap(ManifestEntry entry) { return *this; } - }; + private: + std::shared_ptr manifest_schema_; + ArrowSchema schema_; // converted from manifest_schema_ +}; - static ManifestFileWrapper manifestFileWrapper(int64_t commit_snapshotId, - int64_t sequence_number) { - return ManifestFileWrapper(commit_snapshotId, sequence_number); +/// \brief Adapter to convert V3 ManifestFile to `ArrowArray`. +class ManifestFileAdapterV3 : public ManifestFileAdapter { + public: + ManifestFileAdapterV3(int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::optional first_row_id, + std::shared_ptr schema) { + // TODO: init v3 schema } + Status StartAppending() override { return {}; } + Status Append(const ManifestFile& file) override { return {}; } + Result FinishAppending() override { return {}; } - static ManifestEntryWrapper manifestEntryWrapper(int64_t commit_snapshot_id) { - return ManifestEntryWrapper(commit_snapshot_id); - } + private: + std::shared_ptr manifest_list_schema_; + ArrowSchema schema_; // converted from manifest_list_schema_ }; } // namespace iceberg diff --git a/src/iceberg/v4_metadata.h b/src/iceberg/v4_metadata.h new file mode 100644 index 000000000..04d27469a --- /dev/null +++ b/src/iceberg/v4_metadata.h @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/v4_metadata.h + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/metadata_adapter.h" + +namespace iceberg { + +/// \brief Adapter to convert V4 ManifestEntry to `ArrowArray`. +class ManifestEntryAdapterV4 : public ManifestEntryAdapter { + public: + ManifestEntryAdapterV4(std::optional snapshot_id, + std::optional first_row_id, + std::shared_ptr schema) { + // TODO: init v4 schema + } + Status StartAppending() override { return {}; } + Status Append(const ManifestEntry& entry) override { return {}; } + Result FinishAppending() override { return {}; } + + private: + std::shared_ptr manifest_schema_; + ArrowSchema schema_; // converted from manifest_schema_ +}; + +/// \brief Adapter to convert V4 ManifestFile to `ArrowArray`. +class ManifestFileAdapterV4 : public ManifestFileAdapter { + public: + ManifestFileAdapterV4(int64_t snapshot_id, std::optional parent_snapshot_id, + int64_t sequence_number, std::optional first_row_id, + std::shared_ptr schema) { + // TODO: init v4 schema + } + Status StartAppending() override { return {}; } + Status Append(const ManifestFile& file) override { return {}; } + Result FinishAppending() override { return {}; } + + private: + std::shared_ptr manifest_list_schema_; + ArrowSchema schema_; // converted from manifest_list_schema_ +}; + +} // namespace iceberg From 4cb91b5e04bbf1a21a1b5f078fde136fd5b533a9 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Wed, 20 Aug 2025 16:51:40 +0800 Subject: [PATCH 07/14] fix todo --- src/iceberg/v1_metadata.h | 4 ++-- src/iceberg/v2_metadata.h | 4 ++-- src/iceberg/v3_metadata.h | 4 ++-- src/iceberg/v4_metadata.h | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index 1fe2dc672..b010b284d 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -32,7 +32,7 @@ class ManifestEntryAdapterV1 : public ManifestEntryAdapter { public: ManifestEntryAdapterV1(std::optional snapshot_id, std::shared_ptr schema) { - // TODO: init v1 schema + // TODO(xiao.dong): init v1 schema } Status StartAppending() override { return {}; } Status Append(const ManifestEntry& entry) override { return {}; } @@ -48,7 +48,7 @@ class ManifestFileAdapterV1 : public ManifestFileAdapter { public: ManifestFileAdapterV1(int64_t snapshot_id, std::optional parent_snapshot_id, std::shared_ptr schema) { - // TODO: init v1 schema + // TODO(xiao.dong): init v1 schema } Status StartAppending() override { return {}; } Status Append(const ManifestFile& file) override { return {}; } diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h index 7959a7408..185de6510 100644 --- a/src/iceberg/v2_metadata.h +++ b/src/iceberg/v2_metadata.h @@ -32,7 +32,7 @@ class ManifestEntryAdapterV2 : public ManifestEntryAdapter { public: ManifestEntryAdapterV2(std::optional snapshot_id, std::shared_ptr schema) { - // TODO: init v2 schema + // TODO(xiao.dong): init v2 schema } Status StartAppending() override { return {}; } Status Append(const ManifestEntry& entry) override { return {}; } @@ -48,7 +48,7 @@ class ManifestFileAdapterV2 : public ManifestFileAdapter { public: ManifestFileAdapterV2(int64_t snapshot_id, std::optional parent_snapshot_id, int64_t sequence_number, std::shared_ptr schema) { - // TODO: init v2 schema + // TODO(xiao.dong): init v2 schema } Status StartAppending() override { return {}; } Status Append(const ManifestFile& file) override { return {}; } diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h index 00d3b27a3..20c655f0f 100644 --- a/src/iceberg/v3_metadata.h +++ b/src/iceberg/v3_metadata.h @@ -33,7 +33,7 @@ class ManifestEntryAdapterV3 : public ManifestEntryAdapter { ManifestEntryAdapterV3(std::optional snapshot_id, std::optional first_row_id, std::shared_ptr schema) { - // TODO: init v3 schema + // TODO(xiao.dong): init v3 schema } Status StartAppending() override { return {}; } Status Append(const ManifestEntry& entry) override { return {}; } @@ -50,7 +50,7 @@ class ManifestFileAdapterV3 : public ManifestFileAdapter { ManifestFileAdapterV3(int64_t snapshot_id, std::optional parent_snapshot_id, int64_t sequence_number, std::optional first_row_id, std::shared_ptr schema) { - // TODO: init v3 schema + // TODO(xiao.dong): init v3 schema } Status StartAppending() override { return {}; } Status Append(const ManifestFile& file) override { return {}; } diff --git a/src/iceberg/v4_metadata.h b/src/iceberg/v4_metadata.h index 04d27469a..54eea075b 100644 --- a/src/iceberg/v4_metadata.h +++ b/src/iceberg/v4_metadata.h @@ -33,7 +33,7 @@ class ManifestEntryAdapterV4 : public ManifestEntryAdapter { ManifestEntryAdapterV4(std::optional snapshot_id, std::optional first_row_id, std::shared_ptr schema) { - // TODO: init v4 schema + // TODO(xiao.dong): init v4 schema } Status StartAppending() override { return {}; } Status Append(const ManifestEntry& entry) override { return {}; } @@ -50,7 +50,7 @@ class ManifestFileAdapterV4 : public ManifestFileAdapter { ManifestFileAdapterV4(int64_t snapshot_id, std::optional parent_snapshot_id, int64_t sequence_number, std::optional first_row_id, std::shared_ptr schema) { - // TODO: init v4 schema + // TODO(xiao.dong): init v4 schema } Status StartAppending() override { return {}; } Status Append(const ManifestFile& file) override { return {}; } From 515c0add4ebd68d154b440c3074cfc96c83fbb6d Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Wed, 20 Aug 2025 19:01:13 +0800 Subject: [PATCH 08/14] remove v4 --- ...{metadata_adapter.h => manifest_adapter.h} | 0 src/iceberg/manifest_writer.cc | 59 ++++++----------- src/iceberg/manifest_writer.h | 24 ------- src/iceberg/v1_metadata.h | 2 +- src/iceberg/v2_metadata.h | 2 +- src/iceberg/v3_metadata.h | 2 +- src/iceberg/v4_metadata.h | 64 ------------------- 7 files changed, 24 insertions(+), 129 deletions(-) rename src/iceberg/{metadata_adapter.h => manifest_adapter.h} (100%) delete mode 100644 src/iceberg/v4_metadata.h diff --git a/src/iceberg/metadata_adapter.h b/src/iceberg/manifest_adapter.h similarity index 100% rename from src/iceberg/metadata_adapter.h rename to src/iceberg/manifest_adapter.h diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc index 043eff304..649b3ed66 100644 --- a/src/iceberg/manifest_writer.cc +++ b/src/iceberg/manifest_writer.cc @@ -26,7 +26,6 @@ #include "iceberg/v1_metadata.h" #include "iceberg/v2_metadata.h" #include "iceberg/v3_metadata.h" -#include "iceberg/v4_metadata.h" namespace iceberg { @@ -67,14 +66,6 @@ class ManifestWriterImpl : public ManifestWriter { std::unique_ptr adapter_; }; -std::shared_ptr ParseSchema(std::shared_ptr partition_schema) { - auto manifest_entry_schema = - ManifestEntry::TypeFromPartitionType(std::move(partition_schema)); - auto fields_span = manifest_entry_schema->fields(); - std::vector fields(fields_span.begin(), fields_span.end()); - return std::make_shared(fields); -} - Result> OpenFileWriter(std::string_view location, const std::shared_ptr schema, std::shared_ptr file_io) { @@ -89,7 +80,12 @@ Result> OpenFileWriter(std::string_view location, Result> ManifestWriter::MakeV1Writer( std::optional snapshot_id, std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema) { - auto schema = ParseSchema(partition_schema); + // TODO(xiao.dong) parse v1 schema + auto manifest_entry_schema = + ManifestEntry::TypeFromPartitionType(std::move(partition_schema)); + auto fields_span = manifest_entry_schema->fields(); + std::vector fields(fields_span.begin(), fields_span.end()); + auto schema = std::make_shared(fields); ICEBERG_ASSIGN_OR_RAISE(auto writer, OpenFileWriter(manifest_location, schema, std::move(file_io))); auto adapter = std::make_unique(snapshot_id, std::move(schema)); @@ -99,7 +95,12 @@ Result> ManifestWriter::MakeV1Writer( Result> ManifestWriter::MakeV2Writer( std::optional snapshot_id, std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema) { - auto schema = ParseSchema(partition_schema); + // TODO(xiao.dong) parse v2 schema + auto manifest_entry_schema = + ManifestEntry::TypeFromPartitionType(std::move(partition_schema)); + auto fields_span = manifest_entry_schema->fields(); + std::vector fields(fields_span.begin(), fields_span.end()); + auto schema = std::make_shared(fields); ICEBERG_ASSIGN_OR_RAISE(auto writer, OpenFileWriter(manifest_location, schema, std::move(file_io))); auto adapter = std::make_unique(snapshot_id, std::move(schema)); @@ -110,7 +111,12 @@ Result> ManifestWriter::MakeV3Writer( std::optional snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema) { - auto schema = ParseSchema(partition_schema); + // TODO(xiao.dong) parse v3 schema + auto manifest_entry_schema = + ManifestEntry::TypeFromPartitionType(std::move(partition_schema)); + auto fields_span = manifest_entry_schema->fields(); + std::vector fields(fields_span.begin(), fields_span.end()); + auto schema = std::make_shared(fields); ICEBERG_ASSIGN_OR_RAISE(auto writer, OpenFileWriter(manifest_location, schema, std::move(file_io))); auto adapter = std::make_unique(snapshot_id, first_row_id, @@ -118,18 +124,6 @@ Result> ManifestWriter::MakeV3Writer( return std::make_unique(std::move(writer), std::move(adapter)); } -Result> ManifestWriter::MakeV4Writer( - std::optional snapshot_id, std::optional first_row_id, - std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr partition_schema) { - auto schema = ParseSchema(partition_schema); - ICEBERG_ASSIGN_OR_RAISE(auto writer, - OpenFileWriter(manifest_location, schema, std::move(file_io))); - auto adapter = std::make_unique(snapshot_id, first_row_id, - std::move(schema)); - return std::make_unique(std::move(writer), std::move(adapter)); -} - /// \brief Write manifest files to a manifest list file. class ManifestListWriterImpl : public ManifestListWriter { public: @@ -170,6 +164,7 @@ class ManifestListWriterImpl : public ManifestListWriter { Result> ManifestListWriter::MakeV1Writer( int64_t snapshot_id, std::optional parent_snapshot_id, std::string_view manifest_list_location, std::shared_ptr file_io) { + // TODO(xiao.dong) parse v1 schema std::vector fields(ManifestFile::Type().fields().begin(), ManifestFile::Type().fields().end()); auto schema = std::make_shared(fields); @@ -184,6 +179,7 @@ Result> ManifestListWriter::MakeV2Writer( int64_t snapshot_id, std::optional parent_snapshot_id, int64_t sequence_number, std::string_view manifest_list_location, std::shared_ptr file_io) { + // TODO(xiao.dong) parse v2 schema std::vector fields(ManifestFile::Type().fields().begin(), ManifestFile::Type().fields().end()); auto schema = std::make_shared(fields); @@ -198,6 +194,7 @@ Result> ManifestListWriter::MakeV3Writer( int64_t snapshot_id, std::optional parent_snapshot_id, int64_t sequence_number, std::optional first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io) { + // TODO(xiao.dong) parse v3 schema std::vector fields(ManifestFile::Type().fields().begin(), ManifestFile::Type().fields().end()); auto schema = std::make_shared(fields); @@ -208,18 +205,4 @@ Result> ManifestListWriter::MakeV3Writer( return std::make_unique(std::move(writer), std::move(adapter)); } -Result> ManifestListWriter::MakeV4Writer( - int64_t snapshot_id, std::optional parent_snapshot_id, - int64_t sequence_number, std::optional first_row_id, - std::string_view manifest_list_location, std::shared_ptr file_io) { - std::vector fields(ManifestFile::Type().fields().begin(), - ManifestFile::Type().fields().end()); - auto schema = std::make_shared(fields); - ICEBERG_ASSIGN_OR_RAISE( - auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io))); - auto adapter = std::make_unique( - snapshot_id, parent_snapshot_id, sequence_number, first_row_id, std::move(schema)); - return std::make_unique(std::move(writer), std::move(adapter)); -} - } // namespace iceberg diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index 9326ff7b5..51c246b26 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -77,17 +77,6 @@ class ICEBERG_EXPORT ManifestWriter { std::optional snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema); - - /// \brief Creates a writer for a manifest file. - /// \param snapshot_id ID of the snapshot. - /// \param first_row_id First row ID of the snapshot. - /// \param manifest_location Path to the manifest file. - /// \param file_io File IO implementation to use. - /// \return A Result containing the writer or an error. - static Result> MakeV4Writer( - std::optional snapshot_id, std::optional first_row_id, - std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr partition_schema); }; /// \brief Write manifest files to a manifest list file. @@ -142,19 +131,6 @@ class ICEBERG_EXPORT ManifestListWriter { int64_t snapshot_id, std::optional parent_snapshot_id, int64_t sequence_number, std::optional first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io); - - /// \brief Creates a writer for the manifest list. - /// \param snapshot_id ID of the snapshot. - /// \param parent_snapshot_id ID of the parent snapshot. - /// \param sequence_number Sequence number of the snapshot. - /// \param first_row_id First row ID of the snapshot. - /// \param manifest_list_location Path to the manifest list file. - /// \param file_io File IO implementation to use. - /// \return A Result containing the writer or an error. - static Result> MakeV4Writer( - int64_t snapshot_id, std::optional parent_snapshot_id, - int64_t sequence_number, std::optional first_row_id, - std::string_view manifest_list_location, std::shared_ptr file_io); }; } // namespace iceberg diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index b010b284d..57be46ba7 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -21,9 +21,9 @@ /// \file iceberg/v1_metadata.h +#include "iceberg/manifest_adapter.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" -#include "iceberg/metadata_adapter.h" namespace iceberg { diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h index 185de6510..e0344802b 100644 --- a/src/iceberg/v2_metadata.h +++ b/src/iceberg/v2_metadata.h @@ -21,9 +21,9 @@ /// \file iceberg/v2_metadata.h +#include "iceberg/manifest_adapter.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" -#include "iceberg/metadata_adapter.h" namespace iceberg { diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h index 20c655f0f..c9a137915 100644 --- a/src/iceberg/v3_metadata.h +++ b/src/iceberg/v3_metadata.h @@ -21,9 +21,9 @@ /// \file iceberg/v3_metadata.h +#include "iceberg/manifest_adapter.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" -#include "iceberg/metadata_adapter.h" namespace iceberg { diff --git a/src/iceberg/v4_metadata.h b/src/iceberg/v4_metadata.h deleted file mode 100644 index 54eea075b..000000000 --- a/src/iceberg/v4_metadata.h +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -/// \file iceberg/v4_metadata.h - -#include "iceberg/manifest_entry.h" -#include "iceberg/manifest_list.h" -#include "iceberg/metadata_adapter.h" - -namespace iceberg { - -/// \brief Adapter to convert V4 ManifestEntry to `ArrowArray`. -class ManifestEntryAdapterV4 : public ManifestEntryAdapter { - public: - ManifestEntryAdapterV4(std::optional snapshot_id, - std::optional first_row_id, - std::shared_ptr schema) { - // TODO(xiao.dong): init v4 schema - } - Status StartAppending() override { return {}; } - Status Append(const ManifestEntry& entry) override { return {}; } - Result FinishAppending() override { return {}; } - - private: - std::shared_ptr manifest_schema_; - ArrowSchema schema_; // converted from manifest_schema_ -}; - -/// \brief Adapter to convert V4 ManifestFile to `ArrowArray`. -class ManifestFileAdapterV4 : public ManifestFileAdapter { - public: - ManifestFileAdapterV4(int64_t snapshot_id, std::optional parent_snapshot_id, - int64_t sequence_number, std::optional first_row_id, - std::shared_ptr schema) { - // TODO(xiao.dong): init v4 schema - } - Status StartAppending() override { return {}; } - Status Append(const ManifestFile& file) override { return {}; } - Result FinishAppending() override { return {}; } - - private: - std::shared_ptr manifest_list_schema_; - ArrowSchema schema_; // converted from manifest_list_schema_ -}; - -} // namespace iceberg From 854176a5f7e126c3af3babff61637502e46ed8d5 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Thu, 21 Aug 2025 15:55:14 +0800 Subject: [PATCH 09/14] fix comment --- src/iceberg/manifest_adapter.h | 31 +++++----- src/iceberg/manifest_writer.cc | 110 +++++++++++++-------------------- src/iceberg/manifest_writer.h | 31 ++++++++-- 3 files changed, 83 insertions(+), 89 deletions(-) diff --git a/src/iceberg/manifest_adapter.h b/src/iceberg/manifest_adapter.h index 16d200823..69bbecc26 100644 --- a/src/iceberg/manifest_adapter.h +++ b/src/iceberg/manifest_adapter.h @@ -29,38 +29,39 @@ namespace iceberg { -// \brief Implemented by different versions with different schemas to -// append a list of `ManifestEntry`s to an `ArrowArray`. -class ManifestEntryAdapter { +// \brief Base class of adapter for v1v2v3v4 metadata. +class ICEBERG_EXPORT MetadataAdapter { public: - ManifestEntryAdapter() = default; - virtual ~ManifestEntryAdapter() = default; + MetadataAdapter() = default; + virtual ~MetadataAdapter() = default; virtual Status StartAppending() = 0; - virtual Status Append(const ManifestEntry& entry) = 0; virtual Result FinishAppending() = 0; int64_t size() const { return size_; } protected: - ArrowArray array_; // array to append `ManifestEntry`s + ArrowArray array_; int64_t size_ = 0; }; +// \brief Implemented by different versions with different schemas to +// append a list of `ManifestEntry`s to an `ArrowArray`. +class ICEBERG_EXPORT ManifestEntryAdapter : public MetadataAdapter { + public: + ManifestEntryAdapter() = default; + virtual ~ManifestEntryAdapter() = default; + + virtual Status Append(const ManifestEntry& entry) = 0; +}; + // \brief Implemented by different versions with different schemas to // append a list of `ManifestFile`s to an `ArrowArray`. -class ManifestFileAdapter { +class ICEBERG_EXPORT ManifestFileAdapter : public MetadataAdapter { public: ManifestFileAdapter() = default; virtual ~ManifestFileAdapter() = default; - virtual Status StartAppending() = 0; virtual Status Append(const ManifestFile& file) = 0; - virtual Result FinishAppending() = 0; - int64_t size() const { return size_; } - - protected: - ArrowArray array_; // array to append `ManifestFile`s - int64_t size_ = 0; }; } // namespace iceberg diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc index 649b3ed66..671cae449 100644 --- a/src/iceberg/manifest_writer.cc +++ b/src/iceberg/manifest_writer.cc @@ -29,42 +29,29 @@ namespace iceberg { -/// \brief Write manifest files to a manifest list file. -class ManifestWriterImpl : public ManifestWriter { - public: - ManifestWriterImpl(std::unique_ptr writer, - std::unique_ptr adapter) - : writer_(std::move(writer)), adapter_(std::move(adapter)) {} - - Status Add(const ManifestEntry& entry) override { - if (adapter_->size() >= kBatchSize) { - ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); - ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); - ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending()); - } - return adapter_->Append(entry); +Status ManifestWriter::Add(const ManifestEntry& entry) { + if (adapter_->size() >= kBatchSize) { + ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); + ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); + ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending()); } + return adapter_->Append(entry); +} - Status AddAll(const std::vector& entries) override { - for (const auto& entry : entries) { - ICEBERG_RETURN_UNEXPECTED(Add(entry)); - } - return {}; +Status ManifestWriter::AddAll(const std::vector& entries) { + for (const auto& entry : entries) { + ICEBERG_RETURN_UNEXPECTED(Add(entry)); } + return {}; +} - Status Close() override { - if (adapter_->size() > 0) { - ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); - ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); - } - return {}; +Status ManifestWriter::Close() { + if (adapter_->size() > 0) { + ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); + ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); } - - private: - static constexpr int64_t kBatchSize = 1024; - std::unique_ptr writer_; - std::unique_ptr adapter_; -}; + return {}; +} Result> OpenFileWriter(std::string_view location, const std::shared_ptr schema, @@ -89,7 +76,7 @@ Result> ManifestWriter::MakeV1Writer( ICEBERG_ASSIGN_OR_RAISE(auto writer, OpenFileWriter(manifest_location, schema, std::move(file_io))); auto adapter = std::make_unique(snapshot_id, std::move(schema)); - return std::make_unique(std::move(writer), std::move(adapter)); + return std::make_unique(std::move(writer), std::move(adapter)); } Result> ManifestWriter::MakeV2Writer( @@ -104,7 +91,7 @@ Result> ManifestWriter::MakeV2Writer( ICEBERG_ASSIGN_OR_RAISE(auto writer, OpenFileWriter(manifest_location, schema, std::move(file_io))); auto adapter = std::make_unique(snapshot_id, std::move(schema)); - return std::make_unique(std::move(writer), std::move(adapter)); + return std::make_unique(std::move(writer), std::move(adapter)); } Result> ManifestWriter::MakeV3Writer( @@ -121,45 +108,32 @@ Result> ManifestWriter::MakeV3Writer( OpenFileWriter(manifest_location, schema, std::move(file_io))); auto adapter = std::make_unique(snapshot_id, first_row_id, std::move(schema)); - return std::make_unique(std::move(writer), std::move(adapter)); + return std::make_unique(std::move(writer), std::move(adapter)); } -/// \brief Write manifest files to a manifest list file. -class ManifestListWriterImpl : public ManifestListWriter { - public: - ManifestListWriterImpl(std::unique_ptr writer, - std::unique_ptr adapter) - : writer_(std::move(writer)), adapter_(std::move(adapter)) {} - - Status Add(const ManifestFile& file) override { - if (adapter_->size() >= kBatchSize) { - ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); - ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); - ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending()); - } - return adapter_->Append(file); +Status ManifestListWriter::Add(const ManifestFile& file) { + if (adapter_->size() >= kBatchSize) { + ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); + ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); + ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending()); } + return adapter_->Append(file); +} - Status AddAll(const std::vector& files) override { - for (const auto& file : files) { - ICEBERG_RETURN_UNEXPECTED(Add(file)); - } - return {}; +Status ManifestListWriter::AddAll(const std::vector& files) { + for (const auto& file : files) { + ICEBERG_RETURN_UNEXPECTED(Add(file)); } + return {}; +} - Status Close() override { - if (adapter_->size() > 0) { - ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); - ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); - } - return {}; +Status ManifestListWriter::Close() { + if (adapter_->size() > 0) { + ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending()); + ICEBERG_RETURN_UNEXPECTED(writer_->Write(array)); } - - private: - static constexpr int64_t kBatchSize = 1024; - std::unique_ptr writer_; - std::unique_ptr adapter_; -}; + return {}; +} Result> ManifestListWriter::MakeV1Writer( int64_t snapshot_id, std::optional parent_snapshot_id, @@ -172,7 +146,7 @@ Result> ManifestListWriter::MakeV1Writer( auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io))); auto adapter = std::make_unique(snapshot_id, parent_snapshot_id, std::move(schema)); - return std::make_unique(std::move(writer), std::move(adapter)); + return std::make_unique(std::move(writer), std::move(adapter)); } Result> ManifestListWriter::MakeV2Writer( @@ -187,7 +161,7 @@ Result> ManifestListWriter::MakeV2Writer( auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io))); auto adapter = std::make_unique( snapshot_id, parent_snapshot_id, sequence_number, std::move(schema)); - return std::make_unique(std::move(writer), std::move(adapter)); + return std::make_unique(std::move(writer), std::move(adapter)); } Result> ManifestListWriter::MakeV3Writer( @@ -202,7 +176,7 @@ Result> ManifestListWriter::MakeV3Writer( auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io))); auto adapter = std::make_unique( snapshot_id, parent_snapshot_id, sequence_number, first_row_id, std::move(schema)); - return std::make_unique(std::move(writer), std::move(adapter)); + return std::make_unique(std::move(writer), std::move(adapter)); } } // namespace iceberg diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index 51c246b26..fc3cffb66 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -27,6 +27,7 @@ #include "iceberg/file_writer.h" #include "iceberg/iceberg_export.h" +#include "iceberg/manifest_adapter.h" #include "iceberg/type_fwd.h" namespace iceberg { @@ -34,20 +35,24 @@ namespace iceberg { /// \brief Write manifest entries to a manifest file. class ICEBERG_EXPORT ManifestWriter { public: + ManifestWriter(std::unique_ptr writer, + std::unique_ptr adapter) + : writer_(std::move(writer)), adapter_(std::move(adapter)) {} + virtual ~ManifestWriter() = default; /// \brief Write manifest entry to file. /// \param entry Manifest entry to write. /// \return Status::OK() if entry was written successfully - virtual Status Add(const ManifestEntry& entry) = 0; + Status Add(const ManifestEntry& entry); /// \brief Write manifest entries to file. /// \param entries Manifest entries to write. /// \return Status::OK() if all entries were written successfully - virtual Status AddAll(const std::vector& entries) = 0; + Status AddAll(const std::vector& entries); /// \brief Close writer and flush to storage. - virtual Status Close() = 0; + Status Close(); /// \brief Creates a writer for a manifest file. /// \param snapshot_id ID of the snapshot. @@ -77,25 +82,34 @@ class ICEBERG_EXPORT ManifestWriter { std::optional snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema); + + private: + static constexpr int64_t kBatchSize = 1024; + std::unique_ptr writer_; + std::unique_ptr adapter_; }; /// \brief Write manifest files to a manifest list file. class ICEBERG_EXPORT ManifestListWriter { public: + ManifestListWriter(std::unique_ptr writer, + std::unique_ptr adapter) + : writer_(std::move(writer)), adapter_(std::move(adapter)) {} + virtual ~ManifestListWriter() = default; /// \brief Write manifest file to manifest list file. /// \param file Manifest file to write. /// \return Status::OK() if file was written successfully - virtual Status Add(const ManifestFile& file) = 0; + Status Add(const ManifestFile& file); /// \brief Write manifest file list to manifest list file. /// \param files Manifest file list to write. /// \return Status::OK() if all files were written successfully - virtual Status AddAll(const std::vector& files) = 0; + Status AddAll(const std::vector& files); /// \brief Close writer and flush to storage. - virtual Status Close() = 0; + Status Close(); /// \brief Creates a writer for the v1 manifest list. /// \param snapshot_id ID of the snapshot. @@ -131,6 +145,11 @@ class ICEBERG_EXPORT ManifestListWriter { int64_t snapshot_id, std::optional parent_snapshot_id, int64_t sequence_number, std::optional first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io); + + private: + static constexpr int64_t kBatchSize = 1024; + std::unique_ptr writer_; + std::unique_ptr adapter_; }; } // namespace iceberg From f8072cbab37c474baca845c5ad7b70e6f9ed7f4f Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Thu, 21 Aug 2025 16:15:31 +0800 Subject: [PATCH 10/14] fix cpplint --- src/iceberg/manifest_adapter.h | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/iceberg/manifest_adapter.h b/src/iceberg/manifest_adapter.h index 69bbecc26..534074a8c 100644 --- a/src/iceberg/manifest_adapter.h +++ b/src/iceberg/manifest_adapter.h @@ -30,10 +30,10 @@ namespace iceberg { // \brief Base class of adapter for v1v2v3v4 metadata. -class ICEBERG_EXPORT MetadataAdapter { +class ICEBERG_EXPORT ManifestAdapter { public: - MetadataAdapter() = default; - virtual ~MetadataAdapter() = default; + ManifestAdapter() = default; + virtual ~ManifestAdapter() = default; virtual Status StartAppending() = 0; virtual Result FinishAppending() = 0; @@ -46,20 +46,20 @@ class ICEBERG_EXPORT MetadataAdapter { // \brief Implemented by different versions with different schemas to // append a list of `ManifestEntry`s to an `ArrowArray`. -class ICEBERG_EXPORT ManifestEntryAdapter : public MetadataAdapter { +class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter { public: ManifestEntryAdapter() = default; - virtual ~ManifestEntryAdapter() = default; + ~ManifestEntryAdapter() override = default; virtual Status Append(const ManifestEntry& entry) = 0; }; // \brief Implemented by different versions with different schemas to // append a list of `ManifestFile`s to an `ArrowArray`. -class ICEBERG_EXPORT ManifestFileAdapter : public MetadataAdapter { +class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter { public: ManifestFileAdapter() = default; - virtual ~ManifestFileAdapter() = default; + ~ManifestFileAdapter() override = default; virtual Status Append(const ManifestFile& file) = 0; }; From bdc2300862850954b94953531f0cf7dac834060e Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 26 Aug 2025 14:08:49 +0800 Subject: [PATCH 11/14] fix comments --- src/iceberg/manifest_adapter.h | 5 ++--- src/iceberg/manifest_writer.cc | 8 ++++---- src/iceberg/v1_metadata.h | 2 -- src/iceberg/v2_metadata.h | 2 -- src/iceberg/v3_metadata.h | 2 -- 5 files changed, 6 insertions(+), 13 deletions(-) diff --git a/src/iceberg/manifest_adapter.h b/src/iceberg/manifest_adapter.h index 534074a8c..2ffe51be3 100644 --- a/src/iceberg/manifest_adapter.h +++ b/src/iceberg/manifest_adapter.h @@ -23,13 +23,12 @@ /// Base class of adapter for v1v2v3v4 metadata. #include "iceberg/arrow_c_data.h" -#include "iceberg/manifest_entry.h" -#include "iceberg/manifest_list.h" #include "iceberg/result.h" +#include "iceberg/type_fwd.h" namespace iceberg { -// \brief Base class of adapter for v1v2v3v4 metadata. +// \brief Base class to append manifest metadata to Arrow array. class ICEBERG_EXPORT ManifestAdapter { public: ManifestAdapter() = default; diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc index 671cae449..27fd3f767 100644 --- a/src/iceberg/manifest_writer.cc +++ b/src/iceberg/manifest_writer.cc @@ -54,13 +54,13 @@ Status ManifestWriter::Close() { } Result> OpenFileWriter(std::string_view location, - const std::shared_ptr schema, + std::shared_ptr schema, std::shared_ptr file_io) { ICEBERG_ASSIGN_OR_RAISE( auto writer, - WriterFactoryRegistry::Open( - FileFormatType::kAvro, - {.path = std::string(location), .schema = schema, .io = std::move(file_io)})); + WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path = std::string(location), + .schema = std::move(schema), + .io = std::move(file_io)})); return writer; } diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index 57be46ba7..5805593d5 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -22,8 +22,6 @@ /// \file iceberg/v1_metadata.h #include "iceberg/manifest_adapter.h" -#include "iceberg/manifest_entry.h" -#include "iceberg/manifest_list.h" namespace iceberg { diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h index e0344802b..bbf2e37e6 100644 --- a/src/iceberg/v2_metadata.h +++ b/src/iceberg/v2_metadata.h @@ -22,8 +22,6 @@ /// \file iceberg/v2_metadata.h #include "iceberg/manifest_adapter.h" -#include "iceberg/manifest_entry.h" -#include "iceberg/manifest_list.h" namespace iceberg { diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h index c9a137915..4f5fd2701 100644 --- a/src/iceberg/v3_metadata.h +++ b/src/iceberg/v3_metadata.h @@ -22,8 +22,6 @@ /// \file iceberg/v3_metadata.h #include "iceberg/manifest_adapter.h" -#include "iceberg/manifest_entry.h" -#include "iceberg/manifest_list.h" namespace iceberg { From 6be59655676c996a927f24853d232bf9873016ec Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 26 Aug 2025 14:35:46 +0800 Subject: [PATCH 12/14] fix cpplint --- src/iceberg/v1_metadata.h | 2 ++ src/iceberg/v2_metadata.h | 2 ++ src/iceberg/v3_metadata.h | 2 ++ 3 files changed, 6 insertions(+) diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index 5805593d5..7e91da7b3 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -21,6 +21,8 @@ /// \file iceberg/v1_metadata.h +#include + #include "iceberg/manifest_adapter.h" namespace iceberg { diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h index bbf2e37e6..d6ff6aa3a 100644 --- a/src/iceberg/v2_metadata.h +++ b/src/iceberg/v2_metadata.h @@ -21,6 +21,8 @@ /// \file iceberg/v2_metadata.h +#include + #include "iceberg/manifest_adapter.h" namespace iceberg { diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h index 4f5fd2701..e7bcc3552 100644 --- a/src/iceberg/v3_metadata.h +++ b/src/iceberg/v3_metadata.h @@ -21,6 +21,8 @@ /// \file iceberg/v3_metadata.h +#include + #include "iceberg/manifest_adapter.h" namespace iceberg { From d2978d888a133cebb07041f05cce906d84b79205 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Wed, 27 Aug 2025 10:35:46 +0800 Subject: [PATCH 13/14] revert cpplinter --- .github/workflows/cpp-linter.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cpp-linter.yml b/.github/workflows/cpp-linter.yml index 1100c6525..8a3d4cca5 100644 --- a/.github/workflows/cpp-linter.yml +++ b/.github/workflows/cpp-linter.yml @@ -39,7 +39,7 @@ jobs: mkdir build && cd build cmake .. -DCMAKE_EXPORT_COMPILE_COMMANDS=ON cmake --build . - - uses: cpp-linter/cpp-linter-action@d7155ea6699028b6b09b0457e26b3c5d73f0ed46 + - uses: cpp-linter/cpp-linter-action@f91c446a32ae3eb9f98fef8c9ed4c7cb613a4f8a id: linter continue-on-error: true env: From baec4f2caf065003ee79b6da2a01dcdc511d5ab1 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Wed, 27 Aug 2025 11:35:51 +0800 Subject: [PATCH 14/14] fix comments --- src/iceberg/manifest_writer.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index fc3cffb66..c9ec30702 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -20,7 +20,7 @@ #pragma once /// \file iceberg/manifest_writer.h -/// Data writer interface for manifest files. +/// Data writer interface for manifest files and manifest list files. #include #include