From 5201ade1f17fb9df712424915a709926c184b7c3 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 8 Jul 2025 11:11:38 +0800 Subject: [PATCH 01/10] feat: add manifest list reader - Add manifest list reader - Integrate with avro reader - Add simple ut --- src/iceberg/CMakeLists.txt | 2 + src/iceberg/arrow_c_data.h | 41 +++ src/iceberg/manifest_list.cc | 4 +- src/iceberg/manifest_reader.cc | 36 +++ src/iceberg/manifest_reader.h | 12 +- src/iceberg/manifest_reader_internal.cc | 242 ++++++++++++++++++ src/iceberg/manifest_reader_internal.h | 51 ++++ test/CMakeLists.txt | 10 +- test/manifest_list_reader_test.cc | 133 ++++++++++ ...-2bccd69e-d642-4816-bba0-261cd9bd0d93.avro | Bin 0 -> 4628 bytes 10 files changed, 519 insertions(+), 12 deletions(-) create mode 100644 src/iceberg/manifest_reader.cc create mode 100644 src/iceberg/manifest_reader_internal.cc create mode 100644 src/iceberg/manifest_reader_internal.h create mode 100644 test/manifest_list_reader_test.cc create mode 100644 test/resources/snap-7412193043800610213-1-2bccd69e-d642-4816-bba0-261cd9bd0d93.avro diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 4fb0d5429..55906cdf6 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -43,6 +43,8 @@ set(ICEBERG_SOURCES transform.cc transform_function.cc type.cc + manifest_reader.cc + manifest_reader_internal.cc util/murmurhash3_internal.cc util/timepoint.cc util/unreachable.cc diff --git a/src/iceberg/arrow_c_data.h b/src/iceberg/arrow_c_data.h index 7a4618ecd..55d708b23 100644 --- a/src/iceberg/arrow_c_data.h +++ b/src/iceberg/arrow_c_data.h @@ -73,4 +73,45 @@ struct ArrowArray { #endif // ARROW_C_DATA_INTERFACE +#ifndef ARROW_C_STREAM_INTERFACE +# define ARROW_C_STREAM_INTERFACE + +struct ArrowArrayStream { + // Callback to get the stream type + // (will be the same for all arrays in the stream). + // + // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowSchema must be released independently from the stream. + int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out); + + // Callback to get the next array + // (if no error and the array is released, the stream has ended) + // + // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowArray must be released independently from the stream. + int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out); + + // Callback to get optional detailed error information. + // This must only be called if the last stream operation failed + // with a non-0 return code. + // + // Return value: pointer to a null-terminated character array describing + // the last error, or NULL if no description is available. + // + // The returned pointer is only valid until the next operation on this stream + // (including release). + const char* (*get_last_error)(struct ArrowArrayStream*); + + // Release callback: release the stream's own resources. + // Note that arrays returned by `get_next` must be individually released. + void (*release)(struct ArrowArrayStream*); + + // Opaque producer-specific data + void* private_data; +}; + +#endif // ARROW_C_STREAM_INTERFACE + } // extern "C" diff --git a/src/iceberg/manifest_list.cc b/src/iceberg/manifest_list.cc index e85c42140..b9907604f 100644 --- a/src/iceberg/manifest_list.cc +++ b/src/iceberg/manifest_list.cc @@ -19,9 +19,7 @@ #include "iceberg/manifest_list.h" -#include - -#include "iceberg/type.h" +#include "iceberg/schema.h" namespace iceberg { diff --git a/src/iceberg/manifest_reader.cc b/src/iceberg/manifest_reader.cc new file mode 100644 index 000000000..830da3566 --- /dev/null +++ b/src/iceberg/manifest_reader.cc @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest_reader.h" + +#include "iceberg/manifest_reader_internal.h" + +namespace iceberg { + +std::shared_ptr ManifestReader::NewReader( + std::unique_ptr reader) { + return std::make_shared(std::move(reader)); +} + +std::shared_ptr ManifestListReader::NewReader( + std::unique_ptr reader) { + return std::make_shared(std::move(reader)); +} + +} // namespace iceberg diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h index 8827ec79a..4be4e50cd 100644 --- a/src/iceberg/manifest_reader.h +++ b/src/iceberg/manifest_reader.h @@ -23,7 +23,7 @@ /// Data reader interface for manifest files. #include -#include +#include #include "iceberg/file_reader.h" #include "iceberg/iceberg_export.h" @@ -34,19 +34,17 @@ namespace iceberg { /// \brief Read manifest entries from a manifest file. class ICEBERG_EXPORT ManifestReader { public: - virtual Result>> Entries() const = 0; + virtual Result>> Entries() const = 0; - private: - std::unique_ptr reader_; + static std::shared_ptr NewReader(std::unique_ptr reader); }; /// \brief Read manifest files from a manifest list file. class ICEBERG_EXPORT ManifestListReader { public: - virtual Result>> Files() const = 0; + virtual Result>> Files() const = 0; - private: - std::unique_ptr reader_; + static std::shared_ptr NewReader(std::unique_ptr reader); }; } // namespace iceberg diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc new file mode 100644 index 000000000..fe3780e09 --- /dev/null +++ b/src/iceberg/manifest_reader_internal.cc @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "manifest_reader_internal.h" + +#include + +#include + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/schema.h" +#include "iceberg/schema_internal.h" +#include "iceberg/type.h" + +namespace iceberg { + +#define ARROW_RETURN_IF_NOT_OK(status, error) \ + if (status != NANOARROW_OK) { \ + return InvalidArrowData("NanoArrow error: {}", error.message); \ + } + +Result>> ParseManifestListEntry( + ArrowSchema* schema, ArrowArray* array_in, const Schema& iceberg_schema) { + if (schema->n_children != array_in->n_children) { + return InvalidArgument("Columns size not match between schema:{} and array:{}", + schema->n_children, array_in->n_children); + } + if (iceberg_schema.fields().size() != array_in->n_children) { + return InvalidArgument("Columns size not match between schema:{} and array:{}", + iceberg_schema.fields().size(), array_in->n_children); + } + + ArrowError error; + ArrowArrayView array_view; + auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error); + ARROW_RETURN_IF_NOT_OK(status, error); + status = ArrowArrayViewSetArray(&array_view, array_in, &error); + ARROW_RETURN_IF_NOT_OK(status, error); + status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error); + ARROW_RETURN_IF_NOT_OK(status, error); + + std::vector> manifest_files; + manifest_files.resize(array_in->length); + for (auto& manifest_file : manifest_files) { + manifest_file = std::make_unique(); + } + + for (int64_t idx = 0; idx < array_in->n_children; idx++) { + const auto& field = iceberg_schema.GetFieldByIndex(idx); + if (!field.has_value()) { + ArrowArrayRelease(array_in); + ArrowArrayViewReset(&array_view); + return InvalidArgument("Field not found in schema: {}", idx); + } + auto field_name = field.value().get().name(); + auto view_of_column = array_view.children[idx]; + +#define PARSE_PRIMITIVE_FIELD(field_name) \ + for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { \ + if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \ + auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); \ + manifest_files[row_idx]->field_name = value; \ + } \ + } + + if (field_name == ManifestFile::kManifestPath.name()) { + for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { + if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { + auto value = ArrowArrayViewGetStringUnsafe(view_of_column, row_idx); + std::string path_str(value.data, value.size_bytes); + manifest_files[row_idx]->manifest_path = path_str; + } + } + } else if (field_name == ManifestFile::kManifestLength.name()) { + PARSE_PRIMITIVE_FIELD(manifest_length); + } else if (field_name == ManifestFile::kPartitionSpecId.name()) { + PARSE_PRIMITIVE_FIELD(partition_spec_id); + } else if (field_name == ManifestFile::kContent.name()) { + for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { + if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { + auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); + manifest_files[row_idx]->content = static_cast(value); + } + } + } else if (field_name == ManifestFile::kSequenceNumber.name()) { + PARSE_PRIMITIVE_FIELD(sequence_number); + } else if (field_name == ManifestFile::kMinSequenceNumber.name()) { + PARSE_PRIMITIVE_FIELD(min_sequence_number); + } else if (field_name == ManifestFile::kAddedSnapshotId.name()) { + PARSE_PRIMITIVE_FIELD(added_snapshot_id); + } else if (field_name == ManifestFile::kAddedFilesCount.name()) { + PARSE_PRIMITIVE_FIELD(added_files_count); + } else if (field_name == ManifestFile::kExistingFilesCount.name()) { + PARSE_PRIMITIVE_FIELD(existing_files_count); + } else if (field_name == ManifestFile::kDeletedFilesCount.name()) { + PARSE_PRIMITIVE_FIELD(deleted_files_count); + } else if (field_name == ManifestFile::kAddedRowsCount.name()) { + PARSE_PRIMITIVE_FIELD(added_rows_count); + } else if (field_name == ManifestFile::kExistingRowsCount.name()) { + PARSE_PRIMITIVE_FIELD(existing_rows_count); + } else if (field_name == ManifestFile::kDeletedRowsCount.name()) { + PARSE_PRIMITIVE_FIELD(deleted_rows_count); + } else if (field_name == ManifestFile::kPartitions.name()) { + // view_of_column is list> + auto manifest_count = view_of_column->length; + if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_LIST) { + return InvalidArgument("partitions field should be a list."); + } + auto view_of_list_iterm = view_of_column->children[0]; + // view_of_list_iterm is struct + if (view_of_list_iterm->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) { + return InvalidArgument("partitions list field should be a list."); + } + if (view_of_list_iterm->n_children != 4) { + return InvalidArgument("PartitionFieldSummary should have 4 fields."); + } + if (view_of_list_iterm->children[0]->storage_type != + ArrowType::NANOARROW_TYPE_BOOL) { + return InvalidArgument("contains_null should have be bool type column."); + } + auto contains_null = view_of_list_iterm->children[0]; + if (view_of_list_iterm->children[1]->storage_type != + ArrowType::NANOARROW_TYPE_BOOL) { + return InvalidArgument("contains_nan should have be bool type column."); + } + auto contains_nan = view_of_list_iterm->children[1]; + if (view_of_list_iterm->children[2]->storage_type != + ArrowType::NANOARROW_TYPE_BINARY) { + return InvalidArgument("lower_bound should have be binary type column."); + } + auto lower_bound_list = view_of_list_iterm->children[2]; + if (view_of_list_iterm->children[3]->storage_type != + ArrowType::NANOARROW_TYPE_BINARY) { + return InvalidArgument("upper_bound should have be binary type column."); + } + auto upper_bound_list = view_of_list_iterm->children[3]; + for (int64_t manifest_idx = 0; manifest_idx < manifest_count; manifest_idx++) { + auto offset = ArrowArrayViewListChildOffset(view_of_column, manifest_idx); + auto next_offset = + ArrowArrayViewListChildOffset(view_of_column, manifest_idx + 1); + // partitions from offset to next_offset belongs to manifest_idx + auto& manifest_file = manifest_files[manifest_idx]; + for (int64_t partition_idx = offset; partition_idx < next_offset; + partition_idx++) { + PartitionFieldSummary partition_field_summary; + if (!ArrowArrayViewIsNull(contains_null, partition_idx)) { + partition_field_summary.contains_null = + ArrowArrayViewGetIntUnsafe(contains_null, partition_idx); + } + if (!ArrowArrayViewIsNull(contains_nan, partition_idx)) { + partition_field_summary.contains_nan = + ArrowArrayViewGetIntUnsafe(contains_nan, partition_idx); + } + if (!ArrowArrayViewIsNull(lower_bound_list, partition_idx)) { + auto buffer = ArrowArrayViewGetBytesUnsafe(lower_bound_list, partition_idx); + partition_field_summary.lower_bound = std::vector( + buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); + } + if (!ArrowArrayViewIsNull(upper_bound_list, partition_idx)) { + auto buffer = ArrowArrayViewGetBytesUnsafe(upper_bound_list, partition_idx); + partition_field_summary.upper_bound = std::vector( + buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); + } + + manifest_file->partitions.emplace_back(partition_field_summary); + } + } + } else if (field_name == ManifestFile::kKeyMetadata.name()) { + for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { + if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { + auto value = ArrowArrayViewGetUIntUnsafe(view_of_column, row_idx); + manifest_files[row_idx]->key_metadata.push_back(value); + } + } + } else if (field_name == ManifestFile::kFirstRowId.name()) { + PARSE_PRIMITIVE_FIELD(first_row_id); + } else { + return InvalidArgument("Unsupported type: {}", field_name); + } + } +#undef PARSE_PRIMITIVE_FIELD + ArrowArrayRelease(array_in); + ArrowArrayViewReset(&array_view); + return manifest_files; +} // namespace iceberg + +Result>> ManifestReaderImpl::Entries() const { + return {}; +} + +Result>> ManifestListReaderImpl::Files() const { + std::vector> manifest_files; + auto arrow_schema = reader_->Schema(); + if (!arrow_schema.has_value()) { + return InvalidArgument("Get schema failed in reader:{}", + arrow_schema.error().message); + } + auto schema = FromArrowSchema(arrow_schema.value(), std::nullopt); + if (!schema.has_value()) { + return InvalidArgument("Parse iceberg schema failed:{}", schema.error().message); + } + while (true) { + auto result = reader_->Next(); + if (!result.has_value()) { + return InvalidArgument("Failed to read manifest list entry:{}", + result.error().message); + } + if (result.value().has_value()) { + auto parse_result = ParseManifestListEntry( + &arrow_schema.value(), &result.value().value(), *schema.value()); + if (!parse_result.has_value()) { + return InvalidArgument("Failed to parse manifest list entry:{}", + parse_result.error().message); + } + manifest_files.insert(manifest_files.end(), + std::make_move_iterator(parse_result.value().begin()), + std::make_move_iterator(parse_result.value().end())); + } else { + break; + } + } + return manifest_files; +} + +} // namespace iceberg diff --git a/src/iceberg/manifest_reader_internal.h b/src/iceberg/manifest_reader_internal.h new file mode 100644 index 000000000..187f37e99 --- /dev/null +++ b/src/iceberg/manifest_reader_internal.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/internal/manifest_reader_internal.h +/// Reader implement for manifest files. + +#include "iceberg/manifest_reader.h" + +namespace iceberg { + +/// \brief Read manifest entries from a manifest file. +class ManifestReaderImpl : public ManifestReader { + public: + ManifestReaderImpl(std::unique_ptr reader) : reader_(std::move(reader)) {} + + Result>> Entries() const override; + + private: + std::unique_ptr reader_; +}; + +/// \brief Read manifest files from a manifest list file. +class ManifestListReaderImpl : public ManifestListReader { + public: + ManifestListReaderImpl(std::unique_ptr reader) : reader_(std::move(reader)) {} + + Result>> Files() const override; + + private: + std::unique_ptr reader_; +}; + +} // namespace iceberg diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 62e6321a4..57bc971c7 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -78,10 +78,16 @@ add_test(NAME util_test COMMAND util_test) if(ICEBERG_BUILD_BUNDLE) add_executable(avro_test) - target_sources(avro_test PRIVATE avro_data_test.cc avro_test.cc avro_schema_test.cc - avro_stream_test.cc) + target_sources(avro_test + PRIVATE avro_data_test.cc + avro_test.cc + avro_schema_test.cc + avro_stream_test.cc + manifest_list_reader_test.cc + test_common.cc) target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main GTest::gmock) + target_include_directories(avro_test PRIVATE "${CMAKE_BINARY_DIR}") add_test(NAME avro_test COMMAND avro_test) add_executable(arrow_test) diff --git a/test/manifest_list_reader_test.cc b/test/manifest_list_reader_test.cc new file mode 100644 index 000000000..e57f2f9b9 --- /dev/null +++ b/test/manifest_list_reader_test.cc @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io.h" +#include "iceberg/avro/avro_reader.h" +#include "iceberg/manifest_list.h" +#include "iceberg/manifest_reader.h" +#include "iceberg/schema.h" +#include "matchers.h" +#include "temp_file_test_base.h" +#include "test_common.h" + +namespace iceberg { + +class ManifestListReaderTest : public TempFileTestBase { + protected: + static void SetUpTestSuite() { avro::AvroReader::Register(); } + + void SetUp() override { + TempFileTestBase::SetUp(); + local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>(); + file_io_ = std::make_shared(local_fs_); + } + + std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_; + std::shared_ptr file_io_; +}; + +TEST_F(ManifestListReaderTest, BasicTest) { + std::vector fields(ManifestFile::Type().fields().begin(), + ManifestFile::Type().fields().end()); + auto schema = std::make_shared(fields); + std::string path = GetResourcePath( + "snap-7412193043800610213-1-2bccd69e-d642-4816-bba0-261cd9bd0d93.avro"); + auto reader_result = ReaderFactoryRegistry::Open( + FileFormatType::kAvro, {.path = path, .io = file_io_, .projection = schema}); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + auto manifest_reader = ManifestListReader::NewReader(std::move(reader)); + auto read_result = manifest_reader->Files(); + ASSERT_EQ(read_result.has_value(), true); + ASSERT_EQ(read_result.value().size(), 4); + std::string test_dir_prefix = "/tmp/db/db/iceberg_test/metadata/"; + for (const auto& file : read_result.value()) { + auto manifest_path = file->manifest_path.substr(test_dir_prefix.size()); + if (manifest_path == "2bccd69e-d642-4816-bba0-261cd9bd0d93-m0.avro") { + ASSERT_EQ(file->added_snapshot_id, 7412193043800610213); + ASSERT_EQ(file->manifest_length, 7433); + ASSERT_EQ(file->sequence_number, 4); + ASSERT_EQ(file->min_sequence_number, 4); + ASSERT_EQ(file->partitions.size(), 1); + const auto& partition = file->partitions[0]; + ASSERT_EQ(partition.contains_null, false); + ASSERT_EQ(partition.contains_nan.value(), false); + ASSERT_EQ(partition.lower_bound.value(), + std::vector({'x', ';', 0x07, 0x00})); + ASSERT_EQ(partition.upper_bound.value(), + std::vector({'x', ';', 0x07, 0x00})); + } else if (manifest_path == "9b6ffacd-ef10-4abf-a89c-01c733696796-m0.avro") { + ASSERT_EQ(file->added_snapshot_id, 5485972788975780755); + ASSERT_EQ(file->manifest_length, 7431); + ASSERT_EQ(file->sequence_number, 3); + ASSERT_EQ(file->min_sequence_number, 3); + ASSERT_EQ(file->partitions.size(), 1); + const auto& partition = file->partitions[0]; + ASSERT_EQ(partition.contains_null, false); + ASSERT_EQ(partition.contains_nan.value(), false); + ASSERT_EQ(partition.lower_bound.value(), + std::vector({'(', 0x19, 0x07, 0x00})); + ASSERT_EQ(partition.upper_bound.value(), + std::vector({'(', 0x19, 0x07, 0x00})); + } else if (manifest_path == "2541e6b5-4923-4bd5-886d-72c6f7228400-m0.avro") { + ASSERT_EQ(file->added_snapshot_id, 1679468743751242972); + ASSERT_EQ(file->manifest_length, 7433); + ASSERT_EQ(file->sequence_number, 2); + ASSERT_EQ(file->min_sequence_number, 2); + ASSERT_EQ(file->partitions.size(), 1); + const auto& partition = file->partitions[0]; + ASSERT_EQ(partition.contains_null, false); + ASSERT_EQ(partition.contains_nan.value(), false); + ASSERT_EQ(partition.lower_bound.value(), + std::vector({0xd0, 0xd4, 0x06, 0x00})); + ASSERT_EQ(partition.upper_bound.value(), + std::vector({0xd0, 0xd4, 0x06, 0x00})); + } else if (manifest_path == "3118c801-d2e0-4df6-8c7a-7d4eaade32f8-m0.avro") { + ASSERT_EQ(file->added_snapshot_id, 1579605567338877265); + ASSERT_EQ(file->manifest_length, 7431); + ASSERT_EQ(file->sequence_number, 1); + ASSERT_EQ(file->min_sequence_number, 1); + ASSERT_EQ(file->partitions.size(), 1); + const auto& partition = file->partitions[0]; + ASSERT_EQ(partition.contains_null, false); + ASSERT_EQ(partition.contains_nan.value(), false); + ASSERT_EQ(partition.lower_bound.value(), + std::vector({0xb8, 0xd4, 0x06, 0x00})); + ASSERT_EQ(partition.upper_bound.value(), + std::vector({0xb8, 0xd4, 0x06, 0x00})); + } else { + ASSERT_TRUE(false) << "Unexpected manifest file: " << manifest_path; + } + ASSERT_EQ(file->partition_spec_id, 0); + ASSERT_EQ(file->content, ManifestFile::Content::kData); + ASSERT_EQ(file->added_files_count, 1); + ASSERT_EQ(file->existing_files_count, 0); + ASSERT_EQ(file->deleted_files_count, 0); + ASSERT_EQ(file->added_rows_count, 1); + ASSERT_EQ(file->existing_rows_count, 0); + ASSERT_EQ(file->deleted_rows_count, 0); + ASSERT_EQ(file->key_metadata.empty(), true); + } +} + +} // namespace iceberg diff --git a/test/resources/snap-7412193043800610213-1-2bccd69e-d642-4816-bba0-261cd9bd0d93.avro b/test/resources/snap-7412193043800610213-1-2bccd69e-d642-4816-bba0-261cd9bd0d93.avro new file mode 100644 index 0000000000000000000000000000000000000000..c2299391756f006abb61924138db9c15c64e7595 GIT binary patch literal 4628 zcmbVPTa46H7#3UvF#)59iGUs}nizN2b}l=!8<9H+cVRIS78uU7XLgF6c4$wx4!di( zX+$B}=n~;YgG5Xq5%mcq5fN8SAQD6aF(Dd6ffx}VR5TJjZKvm)&TVIWoX+X@{pb5{ zUzeuWj~F)*_LxSiC1+6%{`5evfD3sfbOl z&E?n{(z|(d&ET+C7#M264G>uBMg&wHqzf2XLAXU=i383<6RU1KDJzfU6k(=;t2(Ap zcalHB{5in3kOHs)98<-m@kTgU+0Zfa-Y=@agU$B;w1ANTIEz3aZ3hsiDGV*d^`Sj1={6&MzjDo_=dPgWqnu&{vuRZ;nT(av|rgWsiHf^KV?K*mn7 zYoTev0{NRXdydRH`n+^FnsG51)O2Vjx^n=kx<#7Q1rBi2hM^&-lL!7#Tw&T|T4ey# z3;y`Zx=l%%N7Dd}6ZAmME*(1UTm>>R4?27&A>dpy94XW*a4IMRZ4C<@ZcO1pk>oD4;|rzSuP|_yH@c@A#;1Fmw?vGBB`H|h4KoBQydr_bI>}E+$PrZ0 zCscf8ZiWUgv1C(y8iuF(a1Mkqu=EDuN-|;Z^(5w#J$Z%$)k^^FjCM(c--iNV#yu?7k7LFE77-8XO)obbN02y=U**yQleyzH{SGAAW6cXziBFM^k>@F-bFa>Bkn0 zeP;dFZ!g*X{=^r|OxCa8V=mtK<43#a$q)OE z{<>|Zm|B_GF+S0|cNU)t9E?=L+EKfgM3&9 zze~TfblKTm=LeUo5AWS{>Ko_G9RqFW1|D)&?OV9#^^?wtylMTuVdKr4N6w@Vj_PVT IpDNYrKc%`mrvLx| literal 0 HcmV?d00001 From d283d3848cd9184699be3dcfee2f69e97ff007e6 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 8 Jul 2025 11:32:31 +0800 Subject: [PATCH 02/10] fix manifest_list build issue --- src/iceberg/manifest_reader_internal.cc | 33 +++++++++++++------------ src/iceberg/manifest_reader_internal.h | 6 +++-- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc index fe3780e09..9dc5f1455 100644 --- a/src/iceberg/manifest_reader_internal.cc +++ b/src/iceberg/manifest_reader_internal.cc @@ -72,11 +72,11 @@ Result>> ParseManifestListEntry( auto field_name = field.value().get().name(); auto view_of_column = array_view.children[idx]; -#define PARSE_PRIMITIVE_FIELD(field_name) \ +#define PARSE_PRIMITIVE_FIELD(field_name, type) \ for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { \ if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \ auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); \ - manifest_files[row_idx]->field_name = value; \ + manifest_files[row_idx]->field_name = static_cast(value); \ } \ } @@ -89,9 +89,9 @@ Result>> ParseManifestListEntry( } } } else if (field_name == ManifestFile::kManifestLength.name()) { - PARSE_PRIMITIVE_FIELD(manifest_length); + PARSE_PRIMITIVE_FIELD(manifest_length, int64_t); } else if (field_name == ManifestFile::kPartitionSpecId.name()) { - PARSE_PRIMITIVE_FIELD(partition_spec_id); + PARSE_PRIMITIVE_FIELD(partition_spec_id, int32_t); } else if (field_name == ManifestFile::kContent.name()) { for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { @@ -100,23 +100,23 @@ Result>> ParseManifestListEntry( } } } else if (field_name == ManifestFile::kSequenceNumber.name()) { - PARSE_PRIMITIVE_FIELD(sequence_number); + PARSE_PRIMITIVE_FIELD(sequence_number, int64_t); } else if (field_name == ManifestFile::kMinSequenceNumber.name()) { - PARSE_PRIMITIVE_FIELD(min_sequence_number); + PARSE_PRIMITIVE_FIELD(min_sequence_number, int64_t); } else if (field_name == ManifestFile::kAddedSnapshotId.name()) { - PARSE_PRIMITIVE_FIELD(added_snapshot_id); + PARSE_PRIMITIVE_FIELD(added_snapshot_id, int64_t); } else if (field_name == ManifestFile::kAddedFilesCount.name()) { - PARSE_PRIMITIVE_FIELD(added_files_count); + PARSE_PRIMITIVE_FIELD(added_files_count, int32_t); } else if (field_name == ManifestFile::kExistingFilesCount.name()) { - PARSE_PRIMITIVE_FIELD(existing_files_count); + PARSE_PRIMITIVE_FIELD(existing_files_count, int32_t); } else if (field_name == ManifestFile::kDeletedFilesCount.name()) { - PARSE_PRIMITIVE_FIELD(deleted_files_count); + PARSE_PRIMITIVE_FIELD(deleted_files_count, int32_t); } else if (field_name == ManifestFile::kAddedRowsCount.name()) { - PARSE_PRIMITIVE_FIELD(added_rows_count); + PARSE_PRIMITIVE_FIELD(added_rows_count, int64_t); } else if (field_name == ManifestFile::kExistingRowsCount.name()) { - PARSE_PRIMITIVE_FIELD(existing_rows_count); + PARSE_PRIMITIVE_FIELD(existing_rows_count, int64_t); } else if (field_name == ManifestFile::kDeletedRowsCount.name()) { - PARSE_PRIMITIVE_FIELD(deleted_rows_count); + PARSE_PRIMITIVE_FIELD(deleted_rows_count, int64_t); } else if (field_name == ManifestFile::kPartitions.name()) { // view_of_column is list> auto manifest_count = view_of_column->length; @@ -185,12 +185,13 @@ Result>> ParseManifestListEntry( } else if (field_name == ManifestFile::kKeyMetadata.name()) { for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { - auto value = ArrowArrayViewGetUIntUnsafe(view_of_column, row_idx); - manifest_files[row_idx]->key_metadata.push_back(value); + auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_column, row_idx); + manifest_files[row_idx]->key_metadata = std::vector( + buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); } } } else if (field_name == ManifestFile::kFirstRowId.name()) { - PARSE_PRIMITIVE_FIELD(first_row_id); + PARSE_PRIMITIVE_FIELD(first_row_id, int64_t); } else { return InvalidArgument("Unsupported type: {}", field_name); } diff --git a/src/iceberg/manifest_reader_internal.h b/src/iceberg/manifest_reader_internal.h index 187f37e99..70fb484e4 100644 --- a/src/iceberg/manifest_reader_internal.h +++ b/src/iceberg/manifest_reader_internal.h @@ -29,7 +29,8 @@ namespace iceberg { /// \brief Read manifest entries from a manifest file. class ManifestReaderImpl : public ManifestReader { public: - ManifestReaderImpl(std::unique_ptr reader) : reader_(std::move(reader)) {} + explicit ManifestReaderImpl(std::unique_ptr reader) + : reader_(std::move(reader)) {} Result>> Entries() const override; @@ -40,7 +41,8 @@ class ManifestReaderImpl : public ManifestReader { /// \brief Read manifest files from a manifest list file. class ManifestListReaderImpl : public ManifestListReader { public: - ManifestListReaderImpl(std::unique_ptr reader) : reader_(std::move(reader)) {} + explicit ManifestListReaderImpl(std::unique_ptr reader) + : reader_(std::move(reader)) {} Result>> Files() const override; From f74159f6702a69caab87abe0e1e6098b5d741c93 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 8 Jul 2025 15:02:30 +0800 Subject: [PATCH 03/10] fix arrow memory leak --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/arrow_struct_guard.cc | 48 +++++++++++++++++++ src/iceberg/arrow_struct_guard.h | 64 +++++++++++++++++++++++++ src/iceberg/manifest_reader_internal.cc | 9 ++-- 4 files changed, 118 insertions(+), 4 deletions(-) create mode 100644 src/iceberg/arrow_struct_guard.cc create mode 100644 src/iceberg/arrow_struct_guard.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 55906cdf6..66a4c061c 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -45,6 +45,7 @@ set(ICEBERG_SOURCES type.cc manifest_reader.cc manifest_reader_internal.cc + arrow_struct_guard.cc util/murmurhash3_internal.cc util/timepoint.cc util/unreachable.cc diff --git a/src/iceberg/arrow_struct_guard.cc b/src/iceberg/arrow_struct_guard.cc new file mode 100644 index 000000000..0d808ca11 --- /dev/null +++ b/src/iceberg/arrow_struct_guard.cc @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/arrow_struct_guard.h" + +namespace iceberg::internal { + +ArrowArrayGuard::~ArrowArrayGuard() { + if (array_ != nullptr) { + ArrowArrayRelease(array_); + } +} + +ArrowSchemaGuard::~ArrowSchemaGuard() { + if (schema_ != nullptr) { + ArrowSchemaRelease(schema_); + } +} + +ArrowArrayViewGuard::~ArrowArrayViewGuard() { + if (view_ != nullptr) { + ArrowArrayViewReset(view_); + } +} + +ArrowArrayBufferGuard::~ArrowArrayBufferGuard() { + if (buffer_ != nullptr) { + ArrowBufferReset(buffer_); + } +} + +} // namespace iceberg::internal diff --git a/src/iceberg/arrow_struct_guard.h b/src/iceberg/arrow_struct_guard.h new file mode 100644 index 000000000..4e09c7a3a --- /dev/null +++ b/src/iceberg/arrow_struct_guard.h @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "iceberg/arrow_c_data.h" + +namespace iceberg::internal { + +class ArrowArrayGuard { + public: + ArrowArrayGuard(ArrowArray* array) : array_(array) {} + ~ArrowArrayGuard(); + + private: + ArrowArray* array_; +}; + +class ArrowSchemaGuard { + public: + ArrowSchemaGuard(ArrowSchema* schema) : schema_(schema) {} + ~ArrowSchemaGuard(); + + private: + ArrowSchema* schema_; +}; + +class ArrowArrayViewGuard { + public: + ArrowArrayViewGuard(ArrowArrayView* view) : view_(view) {} + ~ArrowArrayViewGuard(); + + private: + ArrowArrayView* view_; +}; + +class ArrowArrayBufferGuard { + public: + ArrowArrayBufferGuard(ArrowBuffer* buffer) : buffer_(buffer) {} + ~ArrowArrayBufferGuard(); + + private: + ArrowBuffer* buffer_; +}; + +} // namespace iceberg::internal diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc index 9dc5f1455..ee805bde5 100644 --- a/src/iceberg/manifest_reader_internal.cc +++ b/src/iceberg/manifest_reader_internal.cc @@ -23,6 +23,7 @@ #include +#include "iceberg/arrow_struct_guard.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/schema.h" @@ -51,6 +52,7 @@ Result>> ParseManifestListEntry( ArrowArrayView array_view; auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error); ARROW_RETURN_IF_NOT_OK(status, error); + internal::ArrowArrayViewGuard view_guard(&array_view); status = ArrowArrayViewSetArray(&array_view, array_in, &error); ARROW_RETURN_IF_NOT_OK(status, error); status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error); @@ -65,8 +67,6 @@ Result>> ParseManifestListEntry( for (int64_t idx = 0; idx < array_in->n_children; idx++) { const auto& field = iceberg_schema.GetFieldByIndex(idx); if (!field.has_value()) { - ArrowArrayRelease(array_in); - ArrowArrayViewReset(&array_view); return InvalidArgument("Field not found in schema: {}", idx); } auto field_name = field.value().get().name(); @@ -197,8 +197,6 @@ Result>> ParseManifestListEntry( } } #undef PARSE_PRIMITIVE_FIELD - ArrowArrayRelease(array_in); - ArrowArrayViewReset(&array_view); return manifest_files; } // namespace iceberg @@ -213,6 +211,7 @@ Result>> ManifestListReaderImpl::Files return InvalidArgument("Get schema failed in reader:{}", arrow_schema.error().message); } + internal::ArrowSchemaGuard schema_guard(&arrow_schema.value()); auto schema = FromArrowSchema(arrow_schema.value(), std::nullopt); if (!schema.has_value()) { return InvalidArgument("Parse iceberg schema failed:{}", schema.error().message); @@ -224,6 +223,7 @@ Result>> ManifestListReaderImpl::Files result.error().message); } if (result.value().has_value()) { + internal::ArrowArrayGuard array_guard(&result.value().value()); auto parse_result = ParseManifestListEntry( &arrow_schema.value(), &result.value().value(), *schema.value()); if (!parse_result.has_value()) { @@ -234,6 +234,7 @@ Result>> ManifestListReaderImpl::Files std::make_move_iterator(parse_result.value().begin()), std::make_move_iterator(parse_result.value().end())); } else { + // eof break; } } From 705c4ea59cc557f15605aef6900f15b8bc2c0483 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 8 Jul 2025 15:21:27 +0800 Subject: [PATCH 04/10] rename to internal --- src/iceberg/CMakeLists.txt | 2 +- ...row_struct_guard.cc => arrow_c_data_guard_internal.cc} | 2 +- ...arrow_struct_guard.h => arrow_c_data_guard_internal.h} | 8 ++++---- src/iceberg/manifest_reader_internal.cc | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) rename src/iceberg/{arrow_struct_guard.cc => arrow_c_data_guard_internal.cc} (96%) rename src/iceberg/{arrow_struct_guard.h => arrow_c_data_guard_internal.h} (82%) diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 66a4c061c..d448caa39 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -45,7 +45,7 @@ set(ICEBERG_SOURCES type.cc manifest_reader.cc manifest_reader_internal.cc - arrow_struct_guard.cc + arrow_c_data_guard_internal.cc util/murmurhash3_internal.cc util/timepoint.cc util/unreachable.cc diff --git a/src/iceberg/arrow_struct_guard.cc b/src/iceberg/arrow_c_data_guard_internal.cc similarity index 96% rename from src/iceberg/arrow_struct_guard.cc rename to src/iceberg/arrow_c_data_guard_internal.cc index 0d808ca11..5fb3f9fa5 100644 --- a/src/iceberg/arrow_struct_guard.cc +++ b/src/iceberg/arrow_c_data_guard_internal.cc @@ -17,7 +17,7 @@ * under the License. */ -#include "iceberg/arrow_struct_guard.h" +#include "iceberg/arrow_c_data_guard_internal.h" namespace iceberg::internal { diff --git a/src/iceberg/arrow_struct_guard.h b/src/iceberg/arrow_c_data_guard_internal.h similarity index 82% rename from src/iceberg/arrow_struct_guard.h rename to src/iceberg/arrow_c_data_guard_internal.h index 4e09c7a3a..8bce14e57 100644 --- a/src/iceberg/arrow_struct_guard.h +++ b/src/iceberg/arrow_c_data_guard_internal.h @@ -27,7 +27,7 @@ namespace iceberg::internal { class ArrowArrayGuard { public: - ArrowArrayGuard(ArrowArray* array) : array_(array) {} + explicit ArrowArrayGuard(ArrowArray* array) : array_(array) {} ~ArrowArrayGuard(); private: @@ -36,7 +36,7 @@ class ArrowArrayGuard { class ArrowSchemaGuard { public: - ArrowSchemaGuard(ArrowSchema* schema) : schema_(schema) {} + explicit ArrowSchemaGuard(ArrowSchema* schema) : schema_(schema) {} ~ArrowSchemaGuard(); private: @@ -45,7 +45,7 @@ class ArrowSchemaGuard { class ArrowArrayViewGuard { public: - ArrowArrayViewGuard(ArrowArrayView* view) : view_(view) {} + explicit ArrowArrayViewGuard(ArrowArrayView* view) : view_(view) {} ~ArrowArrayViewGuard(); private: @@ -54,7 +54,7 @@ class ArrowArrayViewGuard { class ArrowArrayBufferGuard { public: - ArrowArrayBufferGuard(ArrowBuffer* buffer) : buffer_(buffer) {} + explicit ArrowArrayBufferGuard(ArrowBuffer* buffer) : buffer_(buffer) {} ~ArrowArrayBufferGuard(); private: diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc index ee805bde5..3e31a0ffc 100644 --- a/src/iceberg/manifest_reader_internal.cc +++ b/src/iceberg/manifest_reader_internal.cc @@ -23,7 +23,7 @@ #include -#include "iceberg/arrow_struct_guard.h" +#include "iceberg/arrow_c_data_guard_internal.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/schema.h" From b80c0d2cb05b35e2047cf4e96d468bc9d8fdc730 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 8 Jul 2025 19:00:45 +0800 Subject: [PATCH 05/10] refactor MakeReader --- src/iceberg/manifest_reader.cc | 34 ++++++++++++++++++++----- src/iceberg/manifest_reader.h | 7 +++-- src/iceberg/manifest_reader_internal.cc | 21 +++++---------- src/iceberg/manifest_reader_internal.h | 12 ++++++--- test/manifest_list_reader_test.cc | 11 +++----- 5 files changed, 51 insertions(+), 34 deletions(-) diff --git a/src/iceberg/manifest_reader.cc b/src/iceberg/manifest_reader.cc index 830da3566..f36a41a21 100644 --- a/src/iceberg/manifest_reader.cc +++ b/src/iceberg/manifest_reader.cc @@ -19,18 +19,40 @@ #include "iceberg/manifest_reader.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" #include "iceberg/manifest_reader_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/macros.h" namespace iceberg { -std::shared_ptr ManifestReader::NewReader( - std::unique_ptr reader) { - return std::make_shared(std::move(reader)); +Result> ManifestReader::MakeReader( + const std::string& manifest_location, std::shared_ptr file_io, + std::shared_ptr partition_schema) { + auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema); + auto fields_span = manifest_entry_schema->fields(); + std::vector fields(fields_span.begin(), fields_span.end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE( + auto reader, + ReaderFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = manifest_location, .io = std::move(file_io), .projection = schema})); + return std::make_shared(std::move(reader), std::move(schema)); } -std::shared_ptr ManifestListReader::NewReader( - std::unique_ptr reader) { - return std::make_shared(std::move(reader)); +Result> ManifestListReader::MakeReader( + const std::string& manifest_list_location, std::shared_ptr file_io) { + std::vector fields(ManifestFile::Type().fields().begin(), + ManifestFile::Type().fields().end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE( + auto reader, + ReaderFactoryRegistry::Open(FileFormatType::kAvro, {.path = manifest_list_location, + .io = std::move(file_io), + .projection = schema})); + return std::make_shared(std::move(reader), std::move(schema)); } } // namespace iceberg diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h index 4be4e50cd..0a747475a 100644 --- a/src/iceberg/manifest_reader.h +++ b/src/iceberg/manifest_reader.h @@ -36,7 +36,9 @@ class ICEBERG_EXPORT ManifestReader { public: virtual Result>> Entries() const = 0; - static std::shared_ptr NewReader(std::unique_ptr reader); + static Result> MakeReader( + const std::string& manifest_location, std::shared_ptr file_io, + std::shared_ptr partition_schema); }; /// \brief Read manifest files from a manifest list file. @@ -44,7 +46,8 @@ class ICEBERG_EXPORT ManifestListReader { public: virtual Result>> Files() const = 0; - static std::shared_ptr NewReader(std::unique_ptr reader); + static Result> MakeReader( + const std::string& manifest_list_location, std::shared_ptr file_io); }; } // namespace iceberg diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc index 3e31a0ffc..fb9d0251e 100644 --- a/src/iceberg/manifest_reader_internal.cc +++ b/src/iceberg/manifest_reader_internal.cc @@ -29,12 +29,13 @@ #include "iceberg/schema.h" #include "iceberg/schema_internal.h" #include "iceberg/type.h" +#include "iceberg/util/macros.h" namespace iceberg { #define ARROW_RETURN_IF_NOT_OK(status, error) \ - if (status != NANOARROW_OK) { \ - return InvalidArrowData("NanoArrow error: {}", error.message); \ + if (status != NANOARROW_OK) [[unlikely]] { \ + return InvalidArrowData("Nanoarrow error: {}", error.message); \ } Result>> ParseManifestListEntry( @@ -206,16 +207,8 @@ Result>> ManifestReaderImpl::Entries( Result>> ManifestListReaderImpl::Files() const { std::vector> manifest_files; - auto arrow_schema = reader_->Schema(); - if (!arrow_schema.has_value()) { - return InvalidArgument("Get schema failed in reader:{}", - arrow_schema.error().message); - } - internal::ArrowSchemaGuard schema_guard(&arrow_schema.value()); - auto schema = FromArrowSchema(arrow_schema.value(), std::nullopt); - if (!schema.has_value()) { - return InvalidArgument("Parse iceberg schema failed:{}", schema.error().message); - } + ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema()); + internal::ArrowSchemaGuard schema_guard(&arrow_schema); while (true) { auto result = reader_->Next(); if (!result.has_value()) { @@ -224,8 +217,8 @@ Result>> ManifestListReaderImpl::Files } if (result.value().has_value()) { internal::ArrowArrayGuard array_guard(&result.value().value()); - auto parse_result = ParseManifestListEntry( - &arrow_schema.value(), &result.value().value(), *schema.value()); + auto parse_result = + ParseManifestListEntry(&arrow_schema, &result.value().value(), *schema_); if (!parse_result.has_value()) { return InvalidArgument("Failed to parse manifest list entry:{}", parse_result.error().message); diff --git a/src/iceberg/manifest_reader_internal.h b/src/iceberg/manifest_reader_internal.h index 70fb484e4..738682095 100644 --- a/src/iceberg/manifest_reader_internal.h +++ b/src/iceberg/manifest_reader_internal.h @@ -29,24 +29,28 @@ namespace iceberg { /// \brief Read manifest entries from a manifest file. class ManifestReaderImpl : public ManifestReader { public: - explicit ManifestReaderImpl(std::unique_ptr reader) - : reader_(std::move(reader)) {} + explicit ManifestReaderImpl(std::unique_ptr reader, + std::shared_ptr schema) + : schema_(std::move(schema)), reader_(std::move(reader)) {} Result>> Entries() const override; private: + std::shared_ptr schema_; std::unique_ptr reader_; }; /// \brief Read manifest files from a manifest list file. class ManifestListReaderImpl : public ManifestListReader { public: - explicit ManifestListReaderImpl(std::unique_ptr reader) - : reader_(std::move(reader)) {} + explicit ManifestListReaderImpl(std::unique_ptr reader, + std::shared_ptr schema) + : schema_(std::move(schema)), reader_(std::move(reader)) {} Result>> Files() const override; private: + std::shared_ptr schema_; std::unique_ptr reader_; }; diff --git a/test/manifest_list_reader_test.cc b/test/manifest_list_reader_test.cc index e57f2f9b9..2f3a7102b 100644 --- a/test/manifest_list_reader_test.cc +++ b/test/manifest_list_reader_test.cc @@ -47,16 +47,11 @@ class ManifestListReaderTest : public TempFileTestBase { }; TEST_F(ManifestListReaderTest, BasicTest) { - std::vector fields(ManifestFile::Type().fields().begin(), - ManifestFile::Type().fields().end()); - auto schema = std::make_shared(fields); std::string path = GetResourcePath( "snap-7412193043800610213-1-2bccd69e-d642-4816-bba0-261cd9bd0d93.avro"); - auto reader_result = ReaderFactoryRegistry::Open( - FileFormatType::kAvro, {.path = path, .io = file_io_, .projection = schema}); - ASSERT_THAT(reader_result, IsOk()); - auto reader = std::move(reader_result.value()); - auto manifest_reader = ManifestListReader::NewReader(std::move(reader)); + auto manifest_reader_result = ManifestListReader::MakeReader(path, file_io_); + ASSERT_EQ(manifest_reader_result.has_value(), true); + auto manifest_reader = manifest_reader_result.value(); auto read_result = manifest_reader->Files(); ASSERT_EQ(read_result.has_value(), true); ASSERT_EQ(read_result.value().size(), 4); From a9f4525a891eb767394e24817ce480ea403e14e0 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 8 Jul 2025 19:10:25 +0800 Subject: [PATCH 06/10] remove useless unique_ptr --- src/iceberg/manifest_reader.h | 4 +- src/iceberg/manifest_reader_internal.cc | 28 +++++----- src/iceberg/manifest_reader_internal.h | 4 +- test/manifest_list_reader_test.cc | 68 ++++++++++++------------- 4 files changed, 50 insertions(+), 54 deletions(-) diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h index 0a747475a..16e064dbe 100644 --- a/src/iceberg/manifest_reader.h +++ b/src/iceberg/manifest_reader.h @@ -34,7 +34,7 @@ namespace iceberg { /// \brief Read manifest entries from a manifest file. class ICEBERG_EXPORT ManifestReader { public: - virtual Result>> Entries() const = 0; + virtual Result> Entries() const = 0; static Result> MakeReader( const std::string& manifest_location, std::shared_ptr file_io, @@ -44,7 +44,7 @@ class ICEBERG_EXPORT ManifestReader { /// \brief Read manifest files from a manifest list file. class ICEBERG_EXPORT ManifestListReader { public: - virtual Result>> Files() const = 0; + virtual Result> Files() const = 0; static Result> MakeReader( const std::string& manifest_list_location, std::shared_ptr file_io); diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc index fb9d0251e..768886fab 100644 --- a/src/iceberg/manifest_reader_internal.cc +++ b/src/iceberg/manifest_reader_internal.cc @@ -38,8 +38,9 @@ namespace iceberg { return InvalidArrowData("Nanoarrow error: {}", error.message); \ } -Result>> ParseManifestListEntry( - ArrowSchema* schema, ArrowArray* array_in, const Schema& iceberg_schema) { +Result> ParseManifestListEntry(ArrowSchema* schema, + ArrowArray* array_in, + const Schema& iceberg_schema) { if (schema->n_children != array_in->n_children) { return InvalidArgument("Columns size not match between schema:{} and array:{}", schema->n_children, array_in->n_children); @@ -59,11 +60,8 @@ Result>> ParseManifestListEntry( status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error); ARROW_RETURN_IF_NOT_OK(status, error); - std::vector> manifest_files; + std::vector manifest_files; manifest_files.resize(array_in->length); - for (auto& manifest_file : manifest_files) { - manifest_file = std::make_unique(); - } for (int64_t idx = 0; idx < array_in->n_children; idx++) { const auto& field = iceberg_schema.GetFieldByIndex(idx); @@ -77,7 +75,7 @@ Result>> ParseManifestListEntry( for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { \ if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \ auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); \ - manifest_files[row_idx]->field_name = static_cast(value); \ + manifest_files[row_idx].field_name = static_cast(value); \ } \ } @@ -86,7 +84,7 @@ Result>> ParseManifestListEntry( if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { auto value = ArrowArrayViewGetStringUnsafe(view_of_column, row_idx); std::string path_str(value.data, value.size_bytes); - manifest_files[row_idx]->manifest_path = path_str; + manifest_files[row_idx].manifest_path = path_str; } } } else if (field_name == ManifestFile::kManifestLength.name()) { @@ -97,7 +95,7 @@ Result>> ParseManifestListEntry( for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); - manifest_files[row_idx]->content = static_cast(value); + manifest_files[row_idx].content = static_cast(value); } } } else if (field_name == ManifestFile::kSequenceNumber.name()) { @@ -180,14 +178,14 @@ Result>> ParseManifestListEntry( buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); } - manifest_file->partitions.emplace_back(partition_field_summary); + manifest_file.partitions.emplace_back(partition_field_summary); } } } else if (field_name == ManifestFile::kKeyMetadata.name()) { for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_column, row_idx); - manifest_files[row_idx]->key_metadata = std::vector( + manifest_files[row_idx].key_metadata = std::vector( buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); } } @@ -201,12 +199,10 @@ Result>> ParseManifestListEntry( return manifest_files; } // namespace iceberg -Result>> ManifestReaderImpl::Entries() const { - return {}; -} +Result> ManifestReaderImpl::Entries() const { return {}; } -Result>> ManifestListReaderImpl::Files() const { - std::vector> manifest_files; +Result> ManifestListReaderImpl::Files() const { + std::vector manifest_files; ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema()); internal::ArrowSchemaGuard schema_guard(&arrow_schema); while (true) { diff --git a/src/iceberg/manifest_reader_internal.h b/src/iceberg/manifest_reader_internal.h index 738682095..db2533359 100644 --- a/src/iceberg/manifest_reader_internal.h +++ b/src/iceberg/manifest_reader_internal.h @@ -33,7 +33,7 @@ class ManifestReaderImpl : public ManifestReader { std::shared_ptr schema) : schema_(std::move(schema)), reader_(std::move(reader)) {} - Result>> Entries() const override; + Result> Entries() const override; private: std::shared_ptr schema_; @@ -47,7 +47,7 @@ class ManifestListReaderImpl : public ManifestListReader { std::shared_ptr schema) : schema_(std::move(schema)), reader_(std::move(reader)) {} - Result>> Files() const override; + Result> Files() const override; private: std::shared_ptr schema_; diff --git a/test/manifest_list_reader_test.cc b/test/manifest_list_reader_test.cc index 2f3a7102b..5a98ff8db 100644 --- a/test/manifest_list_reader_test.cc +++ b/test/manifest_list_reader_test.cc @@ -57,14 +57,14 @@ TEST_F(ManifestListReaderTest, BasicTest) { ASSERT_EQ(read_result.value().size(), 4); std::string test_dir_prefix = "/tmp/db/db/iceberg_test/metadata/"; for (const auto& file : read_result.value()) { - auto manifest_path = file->manifest_path.substr(test_dir_prefix.size()); + auto manifest_path = file.manifest_path.substr(test_dir_prefix.size()); if (manifest_path == "2bccd69e-d642-4816-bba0-261cd9bd0d93-m0.avro") { - ASSERT_EQ(file->added_snapshot_id, 7412193043800610213); - ASSERT_EQ(file->manifest_length, 7433); - ASSERT_EQ(file->sequence_number, 4); - ASSERT_EQ(file->min_sequence_number, 4); - ASSERT_EQ(file->partitions.size(), 1); - const auto& partition = file->partitions[0]; + ASSERT_EQ(file.added_snapshot_id, 7412193043800610213); + ASSERT_EQ(file.manifest_length, 7433); + ASSERT_EQ(file.sequence_number, 4); + ASSERT_EQ(file.min_sequence_number, 4); + ASSERT_EQ(file.partitions.size(), 1); + const auto& partition = file.partitions[0]; ASSERT_EQ(partition.contains_null, false); ASSERT_EQ(partition.contains_nan.value(), false); ASSERT_EQ(partition.lower_bound.value(), @@ -72,12 +72,12 @@ TEST_F(ManifestListReaderTest, BasicTest) { ASSERT_EQ(partition.upper_bound.value(), std::vector({'x', ';', 0x07, 0x00})); } else if (manifest_path == "9b6ffacd-ef10-4abf-a89c-01c733696796-m0.avro") { - ASSERT_EQ(file->added_snapshot_id, 5485972788975780755); - ASSERT_EQ(file->manifest_length, 7431); - ASSERT_EQ(file->sequence_number, 3); - ASSERT_EQ(file->min_sequence_number, 3); - ASSERT_EQ(file->partitions.size(), 1); - const auto& partition = file->partitions[0]; + ASSERT_EQ(file.added_snapshot_id, 5485972788975780755); + ASSERT_EQ(file.manifest_length, 7431); + ASSERT_EQ(file.sequence_number, 3); + ASSERT_EQ(file.min_sequence_number, 3); + ASSERT_EQ(file.partitions.size(), 1); + const auto& partition = file.partitions[0]; ASSERT_EQ(partition.contains_null, false); ASSERT_EQ(partition.contains_nan.value(), false); ASSERT_EQ(partition.lower_bound.value(), @@ -85,12 +85,12 @@ TEST_F(ManifestListReaderTest, BasicTest) { ASSERT_EQ(partition.upper_bound.value(), std::vector({'(', 0x19, 0x07, 0x00})); } else if (manifest_path == "2541e6b5-4923-4bd5-886d-72c6f7228400-m0.avro") { - ASSERT_EQ(file->added_snapshot_id, 1679468743751242972); - ASSERT_EQ(file->manifest_length, 7433); - ASSERT_EQ(file->sequence_number, 2); - ASSERT_EQ(file->min_sequence_number, 2); - ASSERT_EQ(file->partitions.size(), 1); - const auto& partition = file->partitions[0]; + ASSERT_EQ(file.added_snapshot_id, 1679468743751242972); + ASSERT_EQ(file.manifest_length, 7433); + ASSERT_EQ(file.sequence_number, 2); + ASSERT_EQ(file.min_sequence_number, 2); + ASSERT_EQ(file.partitions.size(), 1); + const auto& partition = file.partitions[0]; ASSERT_EQ(partition.contains_null, false); ASSERT_EQ(partition.contains_nan.value(), false); ASSERT_EQ(partition.lower_bound.value(), @@ -98,12 +98,12 @@ TEST_F(ManifestListReaderTest, BasicTest) { ASSERT_EQ(partition.upper_bound.value(), std::vector({0xd0, 0xd4, 0x06, 0x00})); } else if (manifest_path == "3118c801-d2e0-4df6-8c7a-7d4eaade32f8-m0.avro") { - ASSERT_EQ(file->added_snapshot_id, 1579605567338877265); - ASSERT_EQ(file->manifest_length, 7431); - ASSERT_EQ(file->sequence_number, 1); - ASSERT_EQ(file->min_sequence_number, 1); - ASSERT_EQ(file->partitions.size(), 1); - const auto& partition = file->partitions[0]; + ASSERT_EQ(file.added_snapshot_id, 1579605567338877265); + ASSERT_EQ(file.manifest_length, 7431); + ASSERT_EQ(file.sequence_number, 1); + ASSERT_EQ(file.min_sequence_number, 1); + ASSERT_EQ(file.partitions.size(), 1); + const auto& partition = file.partitions[0]; ASSERT_EQ(partition.contains_null, false); ASSERT_EQ(partition.contains_nan.value(), false); ASSERT_EQ(partition.lower_bound.value(), @@ -113,15 +113,15 @@ TEST_F(ManifestListReaderTest, BasicTest) { } else { ASSERT_TRUE(false) << "Unexpected manifest file: " << manifest_path; } - ASSERT_EQ(file->partition_spec_id, 0); - ASSERT_EQ(file->content, ManifestFile::Content::kData); - ASSERT_EQ(file->added_files_count, 1); - ASSERT_EQ(file->existing_files_count, 0); - ASSERT_EQ(file->deleted_files_count, 0); - ASSERT_EQ(file->added_rows_count, 1); - ASSERT_EQ(file->existing_rows_count, 0); - ASSERT_EQ(file->deleted_rows_count, 0); - ASSERT_EQ(file->key_metadata.empty(), true); + ASSERT_EQ(file.partition_spec_id, 0); + ASSERT_EQ(file.content, ManifestFile::Content::kData); + ASSERT_EQ(file.added_files_count, 1); + ASSERT_EQ(file.existing_files_count, 0); + ASSERT_EQ(file.deleted_files_count, 0); + ASSERT_EQ(file.added_rows_count, 1); + ASSERT_EQ(file.existing_rows_count, 0); + ASSERT_EQ(file.deleted_rows_count, 0); + ASSERT_EQ(file.key_metadata.empty(), true); } } From 9712fd3ebac60bfa6de523496c22cd8782aeb118 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Mon, 14 Jul 2025 14:56:34 +0800 Subject: [PATCH 07/10] refactor some impl --- src/iceberg/manifest_reader.cc | 8 +- src/iceberg/manifest_reader.h | 4 +- src/iceberg/manifest_reader_internal.cc | 288 +++++++++++++----------- src/iceberg/manifest_reader_internal.h | 2 +- src/iceberg/result.h | 4 + test/manifest_list_reader_test.cc | 2 +- 6 files changed, 165 insertions(+), 143 deletions(-) diff --git a/src/iceberg/manifest_reader.cc b/src/iceberg/manifest_reader.cc index f36a41a21..587e01c6c 100644 --- a/src/iceberg/manifest_reader.cc +++ b/src/iceberg/manifest_reader.cc @@ -27,7 +27,7 @@ namespace iceberg { -Result> ManifestReader::MakeReader( +Result> ManifestReader::MakeReader( const std::string& manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema) { auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema); @@ -39,10 +39,10 @@ Result> ManifestReader::MakeReader( ReaderFactoryRegistry::Open( FileFormatType::kAvro, {.path = manifest_location, .io = std::move(file_io), .projection = schema})); - return std::make_shared(std::move(reader), std::move(schema)); + return std::make_unique(std::move(reader), std::move(schema)); } -Result> ManifestListReader::MakeReader( +Result> ManifestListReader::MakeReader( const std::string& manifest_list_location, std::shared_ptr file_io) { std::vector fields(ManifestFile::Type().fields().begin(), ManifestFile::Type().fields().end()); @@ -52,7 +52,7 @@ Result> ManifestListReader::MakeReader( ReaderFactoryRegistry::Open(FileFormatType::kAvro, {.path = manifest_list_location, .io = std::move(file_io), .projection = schema})); - return std::make_shared(std::move(reader), std::move(schema)); + return std::make_unique(std::move(reader), std::move(schema)); } } // namespace iceberg diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h index 16e064dbe..c354c6c90 100644 --- a/src/iceberg/manifest_reader.h +++ b/src/iceberg/manifest_reader.h @@ -36,7 +36,7 @@ class ICEBERG_EXPORT ManifestReader { public: virtual Result> Entries() const = 0; - static Result> MakeReader( + static Result> MakeReader( const std::string& manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema); }; @@ -46,7 +46,7 @@ class ICEBERG_EXPORT ManifestListReader { public: virtual Result> Files() const = 0; - static Result> MakeReader( + static Result> MakeReader( const std::string& manifest_list_location, std::shared_ptr file_io); }; diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc index 768886fab..6417ffd88 100644 --- a/src/iceberg/manifest_reader_internal.cc +++ b/src/iceberg/manifest_reader_internal.cc @@ -33,32 +33,95 @@ namespace iceberg { -#define ARROW_RETURN_IF_NOT_OK(status, error) \ +#define NANOARROW_RETURN_IF_NOT_OK(status, error) \ if (status != NANOARROW_OK) [[unlikely]] { \ return InvalidArrowData("Nanoarrow error: {}", error.message); \ } +Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column, + std::vector& manifest_files) { + auto manifest_count = view_of_column->length; + // view_of_column is list> + if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_LIST) { + return InvalidManifestList("partitions field should be a list."); + } + auto view_of_list_iterm = view_of_column->children[0]; + // view_of_list_iterm is struct + if (view_of_list_iterm->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) { + return InvalidManifestList("partitions list field should be a list."); + } + if (view_of_list_iterm->n_children != 4) { + return InvalidManifestList("PartitionFieldSummary should have 4 fields."); + } + if (view_of_list_iterm->children[0]->storage_type != ArrowType::NANOARROW_TYPE_BOOL) { + return InvalidManifestList("contains_null should have be bool type column."); + } + auto contains_null = view_of_list_iterm->children[0]; + if (view_of_list_iterm->children[1]->storage_type != ArrowType::NANOARROW_TYPE_BOOL) { + return InvalidManifestList("contains_nan should have be bool type column."); + } + auto contains_nan = view_of_list_iterm->children[1]; + if (view_of_list_iterm->children[2]->storage_type != ArrowType::NANOARROW_TYPE_BINARY) { + return InvalidManifestList("lower_bound should have be binary type column."); + } + auto lower_bound_list = view_of_list_iterm->children[2]; + if (view_of_list_iterm->children[3]->storage_type != ArrowType::NANOARROW_TYPE_BINARY) { + return InvalidManifestList("upper_bound should have be binary type column."); + } + auto upper_bound_list = view_of_list_iterm->children[3]; + for (int64_t manifest_idx = 0; manifest_idx < manifest_count; manifest_idx++) { + auto offset = ArrowArrayViewListChildOffset(view_of_column, manifest_idx); + auto next_offset = ArrowArrayViewListChildOffset(view_of_column, manifest_idx + 1); + // partitions from offset to next_offset belongs to manifest_idx + auto& manifest_file = manifest_files[manifest_idx]; + for (int64_t partition_idx = offset; partition_idx < next_offset; partition_idx++) { + PartitionFieldSummary partition_field_summary; + if (!ArrowArrayViewIsNull(contains_null, partition_idx)) { + partition_field_summary.contains_null = + ArrowArrayViewGetIntUnsafe(contains_null, partition_idx); + } + if (!ArrowArrayViewIsNull(contains_nan, partition_idx)) { + partition_field_summary.contains_nan = + ArrowArrayViewGetIntUnsafe(contains_nan, partition_idx); + } + if (!ArrowArrayViewIsNull(lower_bound_list, partition_idx)) { + auto buffer = ArrowArrayViewGetBytesUnsafe(lower_bound_list, partition_idx); + partition_field_summary.lower_bound = std::vector( + buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); + } + if (!ArrowArrayViewIsNull(upper_bound_list, partition_idx)) { + auto buffer = ArrowArrayViewGetBytesUnsafe(upper_bound_list, partition_idx); + partition_field_summary.upper_bound = std::vector( + buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); + } + + manifest_file.partitions.emplace_back(partition_field_summary); + } + } + return {}; +} + Result> ParseManifestListEntry(ArrowSchema* schema, ArrowArray* array_in, const Schema& iceberg_schema) { if (schema->n_children != array_in->n_children) { - return InvalidArgument("Columns size not match between schema:{} and array:{}", - schema->n_children, array_in->n_children); + return InvalidManifestList("Columns size not match between schema:{} and array:{}", + schema->n_children, array_in->n_children); } if (iceberg_schema.fields().size() != array_in->n_children) { - return InvalidArgument("Columns size not match between schema:{} and array:{}", - iceberg_schema.fields().size(), array_in->n_children); + return InvalidManifestList("Columns size not match between schema:{} and array:{}", + iceberg_schema.fields().size(), array_in->n_children); } ArrowError error; ArrowArrayView array_view; auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error); - ARROW_RETURN_IF_NOT_OK(status, error); + NANOARROW_RETURN_IF_NOT_OK(status, error); internal::ArrowArrayViewGuard view_guard(&array_view); status = ArrowArrayViewSetArray(&array_view, array_in, &error); - ARROW_RETURN_IF_NOT_OK(status, error); + NANOARROW_RETURN_IF_NOT_OK(status, error); status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error); - ARROW_RETURN_IF_NOT_OK(status, error); + NANOARROW_RETURN_IF_NOT_OK(status, error); std::vector manifest_files; manifest_files.resize(array_in->length); @@ -66,136 +129,94 @@ Result> ParseManifestListEntry(ArrowSchema* schema, for (int64_t idx = 0; idx < array_in->n_children; idx++) { const auto& field = iceberg_schema.GetFieldByIndex(idx); if (!field.has_value()) { - return InvalidArgument("Field not found in schema: {}", idx); + return InvalidSchema("Field index {} is not found in schema", idx); } auto field_name = field.value().get().name(); + bool required = !field.value().get().optional(); auto view_of_column = array_view.children[idx]; -#define PARSE_PRIMITIVE_FIELD(field_name, type) \ - for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { \ - if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \ - auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); \ - manifest_files[row_idx].field_name = static_cast(value); \ - } \ +#define PARSE_PRIMITIVE_FIELD(item, type) \ + for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { \ + if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \ + auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); \ + manifest_files[row_idx].item = static_cast(value); \ + } else if (required) { \ + return InvalidManifestList("Field {} is required but null at row {}", field_name, \ + row_idx); \ + } \ } - if (field_name == ManifestFile::kManifestPath.name()) { - for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { - if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { - auto value = ArrowArrayViewGetStringUnsafe(view_of_column, row_idx); - std::string path_str(value.data, value.size_bytes); - manifest_files[row_idx].manifest_path = path_str; - } - } - } else if (field_name == ManifestFile::kManifestLength.name()) { - PARSE_PRIMITIVE_FIELD(manifest_length, int64_t); - } else if (field_name == ManifestFile::kPartitionSpecId.name()) { - PARSE_PRIMITIVE_FIELD(partition_spec_id, int32_t); - } else if (field_name == ManifestFile::kContent.name()) { - for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { - if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { - auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); - manifest_files[row_idx].content = static_cast(value); - } - } - } else if (field_name == ManifestFile::kSequenceNumber.name()) { - PARSE_PRIMITIVE_FIELD(sequence_number, int64_t); - } else if (field_name == ManifestFile::kMinSequenceNumber.name()) { - PARSE_PRIMITIVE_FIELD(min_sequence_number, int64_t); - } else if (field_name == ManifestFile::kAddedSnapshotId.name()) { - PARSE_PRIMITIVE_FIELD(added_snapshot_id, int64_t); - } else if (field_name == ManifestFile::kAddedFilesCount.name()) { - PARSE_PRIMITIVE_FIELD(added_files_count, int32_t); - } else if (field_name == ManifestFile::kExistingFilesCount.name()) { - PARSE_PRIMITIVE_FIELD(existing_files_count, int32_t); - } else if (field_name == ManifestFile::kDeletedFilesCount.name()) { - PARSE_PRIMITIVE_FIELD(deleted_files_count, int32_t); - } else if (field_name == ManifestFile::kAddedRowsCount.name()) { - PARSE_PRIMITIVE_FIELD(added_rows_count, int64_t); - } else if (field_name == ManifestFile::kExistingRowsCount.name()) { - PARSE_PRIMITIVE_FIELD(existing_rows_count, int64_t); - } else if (field_name == ManifestFile::kDeletedRowsCount.name()) { - PARSE_PRIMITIVE_FIELD(deleted_rows_count, int64_t); - } else if (field_name == ManifestFile::kPartitions.name()) { - // view_of_column is list> - auto manifest_count = view_of_column->length; - if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_LIST) { - return InvalidArgument("partitions field should be a list."); - } - auto view_of_list_iterm = view_of_column->children[0]; - // view_of_list_iterm is struct - if (view_of_list_iterm->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) { - return InvalidArgument("partitions list field should be a list."); - } - if (view_of_list_iterm->n_children != 4) { - return InvalidArgument("PartitionFieldSummary should have 4 fields."); - } - if (view_of_list_iterm->children[0]->storage_type != - ArrowType::NANOARROW_TYPE_BOOL) { - return InvalidArgument("contains_null should have be bool type column."); - } - auto contains_null = view_of_list_iterm->children[0]; - if (view_of_list_iterm->children[1]->storage_type != - ArrowType::NANOARROW_TYPE_BOOL) { - return InvalidArgument("contains_nan should have be bool type column."); - } - auto contains_nan = view_of_list_iterm->children[1]; - if (view_of_list_iterm->children[2]->storage_type != - ArrowType::NANOARROW_TYPE_BINARY) { - return InvalidArgument("lower_bound should have be binary type column."); - } - auto lower_bound_list = view_of_list_iterm->children[2]; - if (view_of_list_iterm->children[3]->storage_type != - ArrowType::NANOARROW_TYPE_BINARY) { - return InvalidArgument("upper_bound should have be binary type column."); - } - auto upper_bound_list = view_of_list_iterm->children[3]; - for (int64_t manifest_idx = 0; manifest_idx < manifest_count; manifest_idx++) { - auto offset = ArrowArrayViewListChildOffset(view_of_column, manifest_idx); - auto next_offset = - ArrowArrayViewListChildOffset(view_of_column, manifest_idx + 1); - // partitions from offset to next_offset belongs to manifest_idx - auto& manifest_file = manifest_files[manifest_idx]; - for (int64_t partition_idx = offset; partition_idx < next_offset; - partition_idx++) { - PartitionFieldSummary partition_field_summary; - if (!ArrowArrayViewIsNull(contains_null, partition_idx)) { - partition_field_summary.contains_null = - ArrowArrayViewGetIntUnsafe(contains_null, partition_idx); - } - if (!ArrowArrayViewIsNull(contains_nan, partition_idx)) { - partition_field_summary.contains_nan = - ArrowArrayViewGetIntUnsafe(contains_nan, partition_idx); + switch (idx) { + case 0: + for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { + if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { + auto value = ArrowArrayViewGetStringUnsafe(view_of_column, row_idx); + std::string path_str(value.data, value.size_bytes); + manifest_files[row_idx].manifest_path = path_str; } - if (!ArrowArrayViewIsNull(lower_bound_list, partition_idx)) { - auto buffer = ArrowArrayViewGetBytesUnsafe(lower_bound_list, partition_idx); - partition_field_summary.lower_bound = std::vector( - buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); + } + break; + case 1: + PARSE_PRIMITIVE_FIELD(manifest_length, int64_t); + break; + case 2: + PARSE_PRIMITIVE_FIELD(partition_spec_id, int32_t); + break; + case 3: + for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { + if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { + auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); + manifest_files[row_idx].content = static_cast(value); } - if (!ArrowArrayViewIsNull(upper_bound_list, partition_idx)) { - auto buffer = ArrowArrayViewGetBytesUnsafe(upper_bound_list, partition_idx); - partition_field_summary.upper_bound = std::vector( + } + break; + case 4: + PARSE_PRIMITIVE_FIELD(sequence_number, int64_t); + break; + case 5: + PARSE_PRIMITIVE_FIELD(min_sequence_number, int64_t); + break; + case 6: + PARSE_PRIMITIVE_FIELD(added_snapshot_id, int64_t); + break; + case 7: + PARSE_PRIMITIVE_FIELD(added_files_count, int32_t); + break; + case 8: + PARSE_PRIMITIVE_FIELD(existing_files_count, int32_t); + break; + case 9: + PARSE_PRIMITIVE_FIELD(deleted_files_count, int32_t); + break; + case 10: + PARSE_PRIMITIVE_FIELD(added_rows_count, int64_t); + break; + case 11: + PARSE_PRIMITIVE_FIELD(existing_rows_count, int64_t); + break; + case 12: + PARSE_PRIMITIVE_FIELD(deleted_rows_count, int64_t); + break; + case 13: + ICEBERG_RETURN_UNEXPECTED( + ParsePartitionFieldSummaryList(view_of_column, manifest_files)); + break; + case 14: + for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { + if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { + auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_column, row_idx); + manifest_files[row_idx].key_metadata = std::vector( buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); } - - manifest_file.partitions.emplace_back(partition_field_summary); } - } - } else if (field_name == ManifestFile::kKeyMetadata.name()) { - for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { - if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { - auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_column, row_idx); - manifest_files[row_idx].key_metadata = std::vector( - buffer.data.as_char, buffer.data.as_char + buffer.size_bytes); - } - } - } else if (field_name == ManifestFile::kFirstRowId.name()) { - PARSE_PRIMITIVE_FIELD(first_row_id, int64_t); - } else { - return InvalidArgument("Unsupported type: {}", field_name); + break; + case 15: + PARSE_PRIMITIVE_FIELD(first_row_id, int64_t); + break; + default: + return InvalidManifestList("Unsupported type: {}", field_name); } } -#undef PARSE_PRIMITIVE_FIELD return manifest_files; } // namespace iceberg @@ -208,20 +229,17 @@ Result> ManifestListReaderImpl::Files() const { while (true) { auto result = reader_->Next(); if (!result.has_value()) { - return InvalidArgument("Failed to read manifest list entry:{}", - result.error().message); + return InvalidManifestList("Failed to read manifest list entry:{}", + result.error().message); } if (result.value().has_value()) { internal::ArrowArrayGuard array_guard(&result.value().value()); - auto parse_result = - ParseManifestListEntry(&arrow_schema, &result.value().value(), *schema_); - if (!parse_result.has_value()) { - return InvalidArgument("Failed to parse manifest list entry:{}", - parse_result.error().message); - } + ICEBERG_ASSIGN_OR_RAISE( + auto entries, + ParseManifestListEntry(&arrow_schema, &result.value().value(), *schema_)); manifest_files.insert(manifest_files.end(), - std::make_move_iterator(parse_result.value().begin()), - std::make_move_iterator(parse_result.value().end())); + std::make_move_iterator(entries.begin()), + std::make_move_iterator(entries.end())); } else { // eof break; diff --git a/src/iceberg/manifest_reader_internal.h b/src/iceberg/manifest_reader_internal.h index db2533359..a367ede2f 100644 --- a/src/iceberg/manifest_reader_internal.h +++ b/src/iceberg/manifest_reader_internal.h @@ -20,7 +20,7 @@ #pragma once /// \file iceberg/internal/manifest_reader_internal.h -/// Reader implement for manifest files. +/// Reader implementation for manifest list files and manifest files. #include "iceberg/manifest_reader.h" diff --git a/src/iceberg/result.h b/src/iceberg/result.h index e9f1dfbfd..f66b1187f 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -36,6 +36,8 @@ enum class ErrorKind { kInvalidArrowData, kInvalidExpression, kInvalidSchema, + kInvalidManifest, + kInvalidManifestList, kIOError, kJsonParseError, kNoSuchNamespace, @@ -81,6 +83,8 @@ DEFINE_ERROR_FUNCTION(InvalidArgument) DEFINE_ERROR_FUNCTION(InvalidArrowData) DEFINE_ERROR_FUNCTION(InvalidExpression) DEFINE_ERROR_FUNCTION(InvalidSchema) +DEFINE_ERROR_FUNCTION(InvalidManifest) +DEFINE_ERROR_FUNCTION(InvalidManifestList) DEFINE_ERROR_FUNCTION(IOError) DEFINE_ERROR_FUNCTION(JsonParseError) DEFINE_ERROR_FUNCTION(NoSuchNamespace) diff --git a/test/manifest_list_reader_test.cc b/test/manifest_list_reader_test.cc index 5a98ff8db..f825a589d 100644 --- a/test/manifest_list_reader_test.cc +++ b/test/manifest_list_reader_test.cc @@ -51,7 +51,7 @@ TEST_F(ManifestListReaderTest, BasicTest) { "snap-7412193043800610213-1-2bccd69e-d642-4816-bba0-261cd9bd0d93.avro"); auto manifest_reader_result = ManifestListReader::MakeReader(path, file_io_); ASSERT_EQ(manifest_reader_result.has_value(), true); - auto manifest_reader = manifest_reader_result.value(); + auto manifest_reader = std::move(manifest_reader_result.value()); auto read_result = manifest_reader->Files(); ASSERT_EQ(read_result.has_value(), true); ASSERT_EQ(read_result.value().size(), 4); From d77b8f2bba73bf47a05707f1408421789a1d3c05 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Mon, 14 Jul 2025 15:00:33 +0800 Subject: [PATCH 08/10] resolve conflict --- src/iceberg/manifest_reader.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h index c354c6c90..7f59eb55e 100644 --- a/src/iceberg/manifest_reader.h +++ b/src/iceberg/manifest_reader.h @@ -34,6 +34,7 @@ namespace iceberg { /// \brief Read manifest entries from a manifest file. class ICEBERG_EXPORT ManifestReader { public: + virtual ~ManifestReader() = default; virtual Result> Entries() const = 0; static Result> MakeReader( @@ -44,6 +45,7 @@ class ICEBERG_EXPORT ManifestReader { /// \brief Read manifest files from a manifest list file. class ICEBERG_EXPORT ManifestListReader { public: + virtual ~ManifestListReader() = default; virtual Result> Files() const = 0; static Result> MakeReader( From 34d0a06c63eb912484ea15b709d6e4d27ecaa4af Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Mon, 14 Jul 2025 15:39:36 +0800 Subject: [PATCH 09/10] fix conflict --- src/iceberg/manifest_reader.cc | 9 +++++++++ src/iceberg/manifest_reader.h | 8 ++------ src/iceberg/table_scan.cc | 6 +++--- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/iceberg/manifest_reader.cc b/src/iceberg/manifest_reader.cc index 587e01c6c..2f7233ff4 100644 --- a/src/iceberg/manifest_reader.cc +++ b/src/iceberg/manifest_reader.cc @@ -55,4 +55,13 @@ Result> ManifestListReader::MakeReader( return std::make_unique(std::move(reader), std::move(schema)); } +Result> CreateManifestListReader( + std::string_view file_path) { + return NotImplemented("CreateManifestListReader is not implemented yet."); +} + +Result> CreateManifestReader(std::string_view file_path) { + return NotImplemented("CreateManifestReader is not implemented yet."); +} + } // namespace iceberg diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h index 945e42406..da6104928 100644 --- a/src/iceberg/manifest_reader.h +++ b/src/iceberg/manifest_reader.h @@ -56,15 +56,11 @@ class ICEBERG_EXPORT ManifestListReader { /// \param file_path Path to the manifest list file. /// \return A Result containing the reader or an error. Result> CreateManifestListReader( - std::string_view file_path) { - return NotImplemented("CreateManifestListReader is not implemented yet."); -} + std::string_view file_path); /// \brief Creates a reader for a manifest file. /// \param file_path Path to the manifest file. /// \return A Result containing the reader or an error. -Result> CreateManifestReader(std::string_view file_path) { - return NotImplemented("CreateManifestReader is not implemented yet."); -} +Result> CreateManifestReader(std::string_view file_path); } // namespace iceberg diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 45539ef83..5172f3fd3 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -168,16 +168,16 @@ Result>> DataTableScan::PlanFiles() co std::vector> tasks; for (const auto& manifest_file : manifest_files) { ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader, - CreateManifestReader(manifest_file->manifest_path)); + CreateManifestReader(manifest_file.manifest_path)); ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries()); // TODO(gty404): filter manifests using partition spec and filter expression for (auto& manifest_entry : manifests) { - const auto& data_file = manifest_entry->data_file; + const auto& data_file = manifest_entry.data_file; switch (data_file->content) { case DataFile::Content::kData: - tasks.emplace_back(std::make_shared(manifest_entry->data_file)); + tasks.emplace_back(std::make_shared(manifest_entry.data_file)); break; case DataFile::Content::kPositionDeletes: case DataFile::Content::kEqualityDeletes: From 835b26abde5ac9da00ef61ef4ad0d4cefbf896b6 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Tue, 15 Jul 2025 11:45:22 +0800 Subject: [PATCH 10/10] use ManifestReader::MakeReader instead of CreateManifestReader --- src/iceberg/manifest_reader.cc | 31 +++++++++---------------- src/iceberg/manifest_reader.h | 23 ++++++++---------- src/iceberg/manifest_reader_internal.cc | 2 +- src/iceberg/table_scan.cc | 11 +++++---- 4 files changed, 29 insertions(+), 38 deletions(-) diff --git a/src/iceberg/manifest_reader.cc b/src/iceberg/manifest_reader.cc index 2f7233ff4..208263d57 100644 --- a/src/iceberg/manifest_reader.cc +++ b/src/iceberg/manifest_reader.cc @@ -28,40 +28,31 @@ namespace iceberg { Result> ManifestReader::MakeReader( - const std::string& manifest_location, std::shared_ptr file_io, + std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema) { auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema); auto fields_span = manifest_entry_schema->fields(); std::vector fields(fields_span.begin(), fields_span.end()); auto schema = std::make_shared(fields); ICEBERG_ASSIGN_OR_RAISE( - auto reader, - ReaderFactoryRegistry::Open( - FileFormatType::kAvro, - {.path = manifest_location, .io = std::move(file_io), .projection = schema})); + auto reader, ReaderFactoryRegistry::Open(FileFormatType::kAvro, + {.path = std::string(manifest_location), + .io = std::move(file_io), + .projection = schema})); return std::make_unique(std::move(reader), std::move(schema)); } Result> ManifestListReader::MakeReader( - const std::string& manifest_list_location, std::shared_ptr file_io) { + std::string_view manifest_list_location, std::shared_ptr file_io) { std::vector fields(ManifestFile::Type().fields().begin(), ManifestFile::Type().fields().end()); auto schema = std::make_shared(fields); - ICEBERG_ASSIGN_OR_RAISE( - auto reader, - ReaderFactoryRegistry::Open(FileFormatType::kAvro, {.path = manifest_list_location, - .io = std::move(file_io), - .projection = schema})); + ICEBERG_ASSIGN_OR_RAISE(auto reader, ReaderFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = std::string(manifest_list_location), + .io = std::move(file_io), + .projection = schema})); return std::make_unique(std::move(reader), std::move(schema)); } -Result> CreateManifestListReader( - std::string_view file_path) { - return NotImplemented("CreateManifestListReader is not implemented yet."); -} - -Result> CreateManifestReader(std::string_view file_path) { - return NotImplemented("CreateManifestReader is not implemented yet."); -} - } // namespace iceberg diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h index da6104928..5d231de0f 100644 --- a/src/iceberg/manifest_reader.h +++ b/src/iceberg/manifest_reader.h @@ -37,8 +37,12 @@ class ICEBERG_EXPORT ManifestReader { virtual ~ManifestReader() = default; virtual Result> Entries() const = 0; + /// \brief Creates a reader for a manifest file. + /// \param manifest_location Path to the manifest file. + /// \param file_io File IO implementation to use. + /// \return A Result containing the reader or an error. static Result> MakeReader( - const std::string& manifest_location, std::shared_ptr file_io, + std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema); }; @@ -48,19 +52,12 @@ class ICEBERG_EXPORT ManifestListReader { virtual ~ManifestListReader() = default; virtual Result> Files() const = 0; + /// \brief Creates a reader for the manifest list. + /// \param manifest_list_location Path to the manifest list file. + /// \param file_io File IO implementation to use. + /// \return A Result containing the reader or an error. static Result> MakeReader( - const std::string& manifest_list_location, std::shared_ptr file_io); + std::string_view manifest_list_location, std::shared_ptr file_io); }; -/// \brief Creates a reader for the manifest list. -/// \param file_path Path to the manifest list file. -/// \return A Result containing the reader or an error. -Result> CreateManifestListReader( - std::string_view file_path); - -/// \brief Creates a reader for a manifest file. -/// \param file_path Path to the manifest file. -/// \return A Result containing the reader or an error. -Result> CreateManifestReader(std::string_view file_path); - } // namespace iceberg diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc index 6417ffd88..92d51c631 100644 --- a/src/iceberg/manifest_reader_internal.cc +++ b/src/iceberg/manifest_reader_internal.cc @@ -218,7 +218,7 @@ Result> ParseManifestListEntry(ArrowSchema* schema, } } return manifest_files; -} // namespace iceberg +} Result> ManifestReaderImpl::Entries() const { return {}; } diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 5172f3fd3..8bc0a363d 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -161,14 +161,17 @@ DataTableScan::DataTableScan(TableScanContext context, std::shared_ptr f : TableScan(std::move(context), std::move(file_io)) {} Result>> DataTableScan::PlanFiles() const { - ICEBERG_ASSIGN_OR_RAISE(auto manifest_list_reader, - CreateManifestListReader(context_.snapshot->manifest_list)); + ICEBERG_ASSIGN_OR_RAISE( + auto manifest_list_reader, + ManifestListReader::MakeReader(context_.snapshot->manifest_list, file_io_)); ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files()); std::vector> tasks; for (const auto& manifest_file : manifest_files) { - ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader, - CreateManifestReader(manifest_file.manifest_path)); + ICEBERG_ASSIGN_OR_RAISE( + auto manifest_reader, + ManifestReader::MakeReader(manifest_file.manifest_path, file_io_, + /* TODO(xiao.dong) partition schema*/ nullptr)); ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries()); // TODO(gty404): filter manifests using partition spec and filter expression