From cf2a0eaa2004812e1a17030c0e1e0836bd35f4de Mon Sep 17 00:00:00 2001 From: Li Feiyang Date: Fri, 29 Aug 2025 18:03:52 +0800 Subject: [PATCH 1/3] feat: Introduce ArrowArrayReader factory on FileScanTask --- src/iceberg/arrow_array_reader.h | 46 +++++++ src/iceberg/avro/avro_reader.cc | 8 +- src/iceberg/avro/avro_reader.h | 2 +- src/iceberg/file_reader.h | 9 +- src/iceberg/parquet/parquet_reader.cc | 8 +- src/iceberg/parquet/parquet_reader.h | 2 +- src/iceberg/table_scan.cc | 41 +++++-- src/iceberg/table_scan.h | 54 ++++++--- test/CMakeLists.txt | 3 +- test/file_scan_task_test.cc | 168 ++++++++++++++++++++++++++ test/parquet_test.cc | 2 +- 11 files changed, 298 insertions(+), 45 deletions(-) create mode 100644 src/iceberg/arrow_array_reader.h create mode 100644 test/file_scan_task_test.cc diff --git a/src/iceberg/arrow_array_reader.h b/src/iceberg/arrow_array_reader.h new file mode 100644 index 000000000..93b2d47b3 --- /dev/null +++ b/src/iceberg/arrow_array_reader.h @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" + +namespace iceberg { + +class ICEBERG_EXPORT ArrowArrayReader { + public: + /// \brief Read next batch of data. + /// + /// \return std::nullopt if the reader has no more data, otherwise `ArrowArray`. + virtual Result> Next() = 0; + + /// \brief Get schema of data returned by `Next`. + virtual Result Schema() const = 0; + + /// \brief Close this reader and release all resources. + virtual Status Close() = 0; + + virtual ~ArrowArrayReader() = default; +}; + +} // namespace iceberg diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc index 048cd4997..ddb8d1df3 100644 --- a/src/iceberg/avro/avro_reader.cc +++ b/src/iceberg/avro/avro_reader.cc @@ -160,7 +160,7 @@ class AvroReader::Impl { return {}; } - Result Schema() { + Result Schema() const { if (!context_) { ICEBERG_RETURN_UNEXPECTED(InitReadContext()); } @@ -174,7 +174,7 @@ class AvroReader::Impl { } private: - Status InitReadContext() { + Status InitReadContext() const { context_ = std::make_unique(); context_->datum_ = std::make_unique<::avro::GenericDatum>(reader_->readerSchema()); @@ -232,14 +232,14 @@ class AvroReader::Impl { // The avro reader to read the data into a datum. std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_; // The context to keep track of the reading progress. - std::unique_ptr context_; + mutable std::unique_ptr context_; }; AvroReader::~AvroReader() = default; Result> AvroReader::Next() { return impl_->Next(); } -Result AvroReader::Schema() { return impl_->Schema(); } +Result AvroReader::Schema() const { return impl_->Schema(); } Status AvroReader::Open(const ReaderOptions& options) { impl_ = std::make_unique(); diff --git a/src/iceberg/avro/avro_reader.h b/src/iceberg/avro/avro_reader.h index 07737bb7b..6481fe276 100644 --- a/src/iceberg/avro/avro_reader.h +++ b/src/iceberg/avro/avro_reader.h @@ -37,7 +37,7 @@ class ICEBERG_BUNDLE_EXPORT AvroReader : public Reader { Result> Next() final; - Result Schema() final; + Result Schema() const final; private: class Impl; diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h index 8a59e33fe..0ef2cd7dc 100644 --- a/src/iceberg/file_reader.h +++ b/src/iceberg/file_reader.h @@ -26,6 +26,7 @@ #include #include +#include "iceberg/arrow_array_reader.h" #include "iceberg/arrow_c_data.h" #include "iceberg/file_format.h" #include "iceberg/result.h" @@ -34,7 +35,7 @@ namespace iceberg { /// \brief Base reader class to read data from different file formats. -class ICEBERG_EXPORT Reader { +class ICEBERG_EXPORT Reader : public ArrowArrayReader { public: virtual ~Reader() = default; Reader() = default; @@ -45,15 +46,15 @@ class ICEBERG_EXPORT Reader { virtual Status Open(const struct ReaderOptions& options) = 0; /// \brief Close the reader. - virtual Status Close() = 0; + Status Close() override = 0; /// \brief Read next data from file. /// /// \return std::nullopt if the reader has no more data, otherwise `ArrowArray`. - virtual Result> Next() = 0; + Result> Next() override = 0; /// \brief Get the schema of the data. - virtual Result Schema() = 0; + Result Schema() const override = 0; }; /// \brief A split of the file to read. diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc index 405b09f03..b6490f65e 100644 --- a/src/iceberg/parquet/parquet_reader.cc +++ b/src/iceberg/parquet/parquet_reader.cc @@ -173,7 +173,7 @@ class ParquetReader::Impl { } // Get the schema of the data - Result Schema() { + Result Schema() const { if (!context_) { ICEBERG_RETURN_UNEXPECTED(InitReadContext()); } @@ -185,7 +185,7 @@ class ParquetReader::Impl { } private: - Status InitReadContext() { + Status InitReadContext() const { context_ = std::make_unique(); // Build the output Arrow schema @@ -239,14 +239,14 @@ class ParquetReader::Impl { // Parquet file reader to create RecordBatchReader. std::unique_ptr<::parquet::arrow::FileReader> reader_; // The context to keep track of the reading progress. - std::unique_ptr context_; + mutable std::unique_ptr context_; }; ParquetReader::~ParquetReader() = default; Result> ParquetReader::Next() { return impl_->Next(); } -Result ParquetReader::Schema() { return impl_->Schema(); } +Result ParquetReader::Schema() const { return impl_->Schema(); } Status ParquetReader::Open(const ReaderOptions& options) { impl_ = std::make_unique(); diff --git a/src/iceberg/parquet/parquet_reader.h b/src/iceberg/parquet/parquet_reader.h index 23d34dfa9..47ecd44c2 100644 --- a/src/iceberg/parquet/parquet_reader.h +++ b/src/iceberg/parquet/parquet_reader.h @@ -37,7 +37,7 @@ class ICEBERG_BUNDLE_EXPORT ParquetReader : public Reader { Result> Next() final; - Result Schema() final; + Result Schema() const final; private: class Impl; diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index a7edd5d79..7be5cf0fb 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -21,7 +21,11 @@ #include #include +#include +#include + +#include "iceberg/file_reader.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/manifest_reader.h" @@ -33,18 +37,6 @@ namespace iceberg { -// implement FileScanTask -FileScanTask::FileScanTask(std::shared_ptr data_file) - : data_file_(std::move(data_file)) {} - -const std::shared_ptr& FileScanTask::data_file() const { return data_file_; } - -int64_t FileScanTask::size_bytes() const { return data_file_->file_size_in_bytes; } - -int32_t FileScanTask::files_count() const { return 1; } - -int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; } - TableScanBuilder::TableScanBuilder(std::shared_ptr table_metadata, std::shared_ptr file_io) : file_io_(std::move(file_io)) { @@ -178,4 +170,29 @@ Result>> DataTableScan::PlanFiles() co return tasks; } +FileScanTask::FileScanTask(std::shared_ptr data_file) + : data_file_(std::move(data_file)) {} + +const std::shared_ptr& FileScanTask::data_file() const { return data_file_; } + +int64_t FileScanTask::size_bytes() const { return data_file_->file_size_in_bytes; } + +int32_t FileScanTask::files_count() const { return 1; } + +int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; } + +Result> FileScanTask::ToArrowArrayReader( + const TableScanContext& context, const std::shared_ptr& io) const { + const ReaderOptions options{.path = data_file_->file_path, + .length = data_file_->file_size_in_bytes, + .io = io, + .projection = context.projected_schema, + .filter = context.filter}; + + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ReaderFactoryRegistry::Open(data_file_->file_format, options)); + + return std::move(reader); +} + } // namespace iceberg diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index dcfa72205..c72f9f539 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -22,6 +22,7 @@ #include #include +#include "iceberg/arrow_array_reader.h" #include "iceberg/manifest_entry.h" #include "iceberg/type_fwd.h" @@ -42,23 +43,6 @@ class ICEBERG_EXPORT ScanTask { virtual int64_t estimated_row_count() const = 0; }; -/// \brief Task representing a data file and its corresponding delete files. -class ICEBERG_EXPORT FileScanTask : public ScanTask { - public: - explicit FileScanTask(std::shared_ptr data_file); - - /// \brief The data file that should be read by this scan task. - const std::shared_ptr& data_file() const; - - int64_t size_bytes() const override; - int32_t files_count() const override; - int64_t estimated_row_count() const override; - - private: - /// \brief Data file metadata. - std::shared_ptr data_file_; -}; - /// \brief Scan context holding snapshot and scan-specific metadata. struct TableScanContext { /// \brief Table metadata. @@ -185,4 +169,40 @@ class ICEBERG_EXPORT DataTableScan : public TableScan { Result>> PlanFiles() const override; }; +/// \brief Task representing a data file and its corresponding delete files. +class ICEBERG_EXPORT FileScanTask : public ScanTask { + public: + explicit FileScanTask(std::shared_ptr data_file); + + /// \brief The data file that should be read by this scan task. + const std::shared_ptr& data_file() const; + + /// \brief The total size in bytes of the file split to be read. + int64_t size_bytes() const override; + + /// \brief The number of files that should be read by this scan task. + int32_t files_count() const override; + + /// \brief The number of rows that should be read by this scan task. + int64_t estimated_row_count() const override; + + /** + * \brief Creates and returns an ArrowArrayReader to read the data for this task. + * + * This acts as a factory to instantiate a file-format-specific reader (e.g., Parquet) + * based on the metadata in this task and the provided context. + * + * \param context The table scan context, used to configure the reader (e.g., with the + * projected schema). + * \param io The FileIO instance for accessing the file data. + * \return A Result containing a unique pointer to the reader, or an error on failure. + */ + Result> ToArrowArrayReader( + const TableScanContext& context, const std::shared_ptr& io) const; + + private: + /// \brief Data file metadata. + std::shared_ptr data_file_; +}; + } // namespace iceberg diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 8dbb3df86..ab24cb981 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -122,5 +122,6 @@ if(ICEBERG_BUILD_BUNDLE) SOURCES parquet_data_test.cc parquet_schema_test.cc - parquet_test.cc) + parquet_test.cc + file_scan_task_test.cc) endif() diff --git a/test/file_scan_task_test.cc b/test/file_scan_task_test.cc new file mode 100644 index 000000000..fedf10db7 --- /dev/null +++ b/test/file_scan_task_test.cc @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow_array_reader.h" +#include "iceberg/file_format.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/parquet/parquet_register.h" +#include "iceberg/schema.h" +#include "iceberg/table_scan.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "matchers.h" +#include "temp_file_test_base.h" + +namespace iceberg { + +class FileScanTaskTest : public TempFileTestBase { + protected: + static void SetUpTestSuite() { parquet::RegisterAll(); } + + void SetUp() override { + TempFileTestBase::SetUp(); + file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO(); + temp_parquet_file_ = CreateNewTempFilePathWithSuffix(".parquet"); + CreateSimpleParquetFile(); + } + + // Helper method to create a Parquet file with sample data. + // This is identical to the one in ParquetReaderTest. + void CreateSimpleParquetFile() { + const std::string kParquetFieldIdKey = "PARQUET:field_id"; + auto arrow_schema = ::arrow::schema( + {::arrow::field("id", ::arrow::int32(), /*nullable=*/false, + ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"1"})), + ::arrow::field("name", ::arrow::utf8(), /*nullable=*/true, + ::arrow::KeyValueMetadata::Make({kParquetFieldIdKey}, {"2"}))}); + auto table = ::arrow::Table::FromRecordBatches( + arrow_schema, {::arrow::RecordBatch::FromStructArray( + ::arrow::json::ArrayFromJSONString( + ::arrow::struct_(arrow_schema->fields()), + R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])") + .ValueOrDie()) + .ValueOrDie()}) + .ValueOrDie(); + + auto io = internal::checked_cast(*file_io_); + auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie(); + + ASSERT_TRUE(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(), + outfile, /*chunk_size=*/1024) + .ok()); + } + + // Helper method to verify the content of the next batch from the reader. + // This is identical to the one in ParquetReaderTest and AvroReaderTest. + void VerifyNextBatch(ArrowArrayReader& reader, std::string_view expected_json) { + // Boilerplate to get Arrow schema + auto schema_result = reader.Schema(); + ASSERT_THAT(schema_result, IsOk()); + auto arrow_c_schema = std::move(schema_result.value()); + auto import_schema_result = ::arrow::ImportType(&arrow_c_schema); + auto arrow_schema = import_schema_result.ValueOrDie(); + + // Boilerplate to get Arrow array + auto data = reader.Next(); + ASSERT_THAT(data, IsOk()) << "Reader.Next() failed: " << data.error().message; + ASSERT_TRUE(data.value().has_value()) << "Reader.Next() returned no data"; + auto arrow_c_array = data.value().value(); + auto data_result = ::arrow::ImportArray(&arrow_c_array, arrow_schema); + auto arrow_array = data_result.ValueOrDie(); + + // Verify data + auto expected_array = + ::arrow::json::ArrayFromJSONString(arrow_schema, expected_json).ValueOrDie(); + ASSERT_TRUE(arrow_array->Equals(*expected_array)) + << "Expected: " << expected_array->ToString() + << "\nGot: " << arrow_array->ToString(); + } + + // Helper method to verify that the reader is exhausted. + void VerifyExhausted(ArrowArrayReader& reader) { + auto data = reader.Next(); + ASSERT_THAT(data, IsOk()); + ASSERT_FALSE(data.value().has_value()); + } + + std::shared_ptr file_io_; + std::string temp_parquet_file_; +}; + +TEST_F(FileScanTaskTest, ReadFullSchema) { + auto data_file = std::make_shared(); + data_file->file_path = temp_parquet_file_; + data_file->file_format = FileFormatType::kParquet; + + auto io_internal = internal::checked_cast(*file_io_); + data_file->file_size_in_bytes = + io_internal.fs()->GetFileInfo(temp_parquet_file_).ValueOrDie().size(); + + TableScanContext context; + context.projected_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string())}); + + FileScanTask task(data_file); + + auto reader_result = task.ToArrowArrayReader(context, file_io_); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + + ASSERT_NO_FATAL_FAILURE( + VerifyNextBatch(*reader, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + +TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) { + auto data_file = std::make_shared(); + data_file->file_path = temp_parquet_file_; + data_file->file_format = FileFormatType::kParquet; + + auto io_internal = internal::checked_cast(*file_io_); + data_file->file_size_in_bytes = + io_internal.fs()->GetFileInfo(temp_parquet_file_).ValueOrDie().size(); + + TableScanContext context; + context.projected_schema = std::make_shared( + std::vector{SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "score", float64())}); + + FileScanTask task(data_file); + + auto reader_result = task.ToArrowArrayReader(context, file_io_); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + + ASSERT_NO_FATAL_FAILURE( + VerifyNextBatch(*reader, R"([["Foo", null], ["Bar", null], ["Baz", null]])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + +} // namespace iceberg diff --git a/test/parquet_test.cc b/test/parquet_test.cc index 122256bb3..825517857 100644 --- a/test/parquet_test.cc +++ b/test/parquet_test.cc @@ -28,7 +28,7 @@ #include #include "iceberg/arrow/arrow_fs_file_io_internal.h" -#include "iceberg/parquet/parquet_reader.h" +#include "iceberg/file_reader.h" #include "iceberg/parquet/parquet_register.h" #include "iceberg/schema.h" #include "iceberg/type.h" From 607bc4e22d4bcfc415d7e33805139b5b2ddc2bcc Mon Sep 17 00:00:00 2001 From: Li Feiyang Date: Tue, 2 Sep 2025 11:07:41 +0800 Subject: [PATCH 2/3] fix review --- src/iceberg/file_reader.h | 12 ------ src/iceberg/table_scan.cc | 54 +++++++++++++-------------- src/iceberg/table_scan.h | 73 +++++++++++++++++++------------------ test/CMakeLists.txt | 6 ++- test/file_scan_task_test.cc | 10 ++--- 5 files changed, 72 insertions(+), 83 deletions(-) diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h index 0ef2cd7dc..4bafefda1 100644 --- a/src/iceberg/file_reader.h +++ b/src/iceberg/file_reader.h @@ -37,24 +37,12 @@ namespace iceberg { /// \brief Base reader class to read data from different file formats. class ICEBERG_EXPORT Reader : public ArrowArrayReader { public: - virtual ~Reader() = default; Reader() = default; Reader(const Reader&) = delete; Reader& operator=(const Reader&) = delete; /// \brief Open the reader. virtual Status Open(const struct ReaderOptions& options) = 0; - - /// \brief Close the reader. - Status Close() override = 0; - - /// \brief Read next data from file. - /// - /// \return std::nullopt if the reader has no more data, otherwise `ArrowArray`. - Result> Next() override = 0; - - /// \brief Get the schema of the data. - Result Schema() const override = 0; }; /// \brief A split of the file to read. diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 7be5cf0fb..aa1719566 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -23,8 +23,6 @@ #include #include -#include - #include "iceberg/file_reader.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" @@ -37,6 +35,33 @@ namespace iceberg { +// implement FileScanTask +FileScanTask::FileScanTask(std::shared_ptr data_file) + : data_file_(std::move(data_file)) {} + +const std::shared_ptr& FileScanTask::data_file() const { return data_file_; } + +int64_t FileScanTask::size_bytes() const { return data_file_->file_size_in_bytes; } + +int32_t FileScanTask::files_count() const { return 1; } + +int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; } + +Result> FileScanTask::ToArrowArrayReader( + const std::shared_ptr& projected_schema, + const std::shared_ptr& filter, const std::shared_ptr& io) const { + const ReaderOptions options{.path = data_file_->file_path, + .length = data_file_->file_size_in_bytes, + .io = io, + .projection = projected_schema, + .filter = filter}; + + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ReaderFactoryRegistry::Open(data_file_->file_format, options)); + + return std::move(reader); +} + TableScanBuilder::TableScanBuilder(std::shared_ptr table_metadata, std::shared_ptr file_io) : file_io_(std::move(file_io)) { @@ -170,29 +195,4 @@ Result>> DataTableScan::PlanFiles() co return tasks; } -FileScanTask::FileScanTask(std::shared_ptr data_file) - : data_file_(std::move(data_file)) {} - -const std::shared_ptr& FileScanTask::data_file() const { return data_file_; } - -int64_t FileScanTask::size_bytes() const { return data_file_->file_size_in_bytes; } - -int32_t FileScanTask::files_count() const { return 1; } - -int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; } - -Result> FileScanTask::ToArrowArrayReader( - const TableScanContext& context, const std::shared_ptr& io) const { - const ReaderOptions options{.path = data_file_->file_path, - .length = data_file_->file_size_in_bytes, - .io = io, - .projection = context.projected_schema, - .filter = context.filter}; - - ICEBERG_ASSIGN_OR_RAISE(auto reader, - ReaderFactoryRegistry::Open(data_file_->file_format, options)); - - return std::move(reader); -} - } // namespace iceberg diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index c72f9f539..64c094605 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -43,6 +43,43 @@ class ICEBERG_EXPORT ScanTask { virtual int64_t estimated_row_count() const = 0; }; +/// \brief Task representing a data file and its corresponding delete files. +class ICEBERG_EXPORT FileScanTask : public ScanTask { + public: + explicit FileScanTask(std::shared_ptr data_file); + + /// \brief The data file that should be read by this scan task. + const std::shared_ptr& data_file() const; + + /// \brief The total size in bytes of the file split to be read. + int64_t size_bytes() const override; + + /// \brief The number of files that should be read by this scan task. + int32_t files_count() const override; + + /// \brief The number of rows that should be read by this scan task. + int64_t estimated_row_count() const override; + + /** + * \brief Returns an ArrowArrayReader to read the data for this task. + * + * This acts as a factory to instantiate a file-format-specific reader (e.g., Parquet) + * based on the metadata in this task and the provided parameters. + * + * \param projected_schema The projected schema for reading the data. + * \param filter Optional filter expression to apply during reading. + * \param io The FileIO instance for accessing the file data. + * \return A Result containing a unique pointer to the reader, or an error on failure. + */ + Result> ToArrowArrayReader( + const std::shared_ptr& projected_schema, + const std::shared_ptr& filter, const std::shared_ptr& io) const; + + private: + /// \brief Data file metadata. + std::shared_ptr data_file_; +}; + /// \brief Scan context holding snapshot and scan-specific metadata. struct TableScanContext { /// \brief Table metadata. @@ -169,40 +206,4 @@ class ICEBERG_EXPORT DataTableScan : public TableScan { Result>> PlanFiles() const override; }; -/// \brief Task representing a data file and its corresponding delete files. -class ICEBERG_EXPORT FileScanTask : public ScanTask { - public: - explicit FileScanTask(std::shared_ptr data_file); - - /// \brief The data file that should be read by this scan task. - const std::shared_ptr& data_file() const; - - /// \brief The total size in bytes of the file split to be read. - int64_t size_bytes() const override; - - /// \brief The number of files that should be read by this scan task. - int32_t files_count() const override; - - /// \brief The number of rows that should be read by this scan task. - int64_t estimated_row_count() const override; - - /** - * \brief Creates and returns an ArrowArrayReader to read the data for this task. - * - * This acts as a factory to instantiate a file-format-specific reader (e.g., Parquet) - * based on the metadata in this task and the provided context. - * - * \param context The table scan context, used to configure the reader (e.g., with the - * projected schema). - * \param io The FileIO instance for accessing the file data. - * \return A Result containing a unique pointer to the reader, or an error on failure. - */ - Result> ToArrowArrayReader( - const TableScanContext& context, const std::shared_ptr& io) const; - - private: - /// \brief Data file metadata. - std::shared_ptr data_file_; -}; - } // namespace iceberg diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index ab24cb981..bbce0d44c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -122,6 +122,8 @@ if(ICEBERG_BUILD_BUNDLE) SOURCES parquet_data_test.cc parquet_schema_test.cc - parquet_test.cc - file_scan_task_test.cc) + parquet_test.cc) + + add_iceberg_test(scan_test USE_BUNDLE SOURCES file_scan_task_test.cc) + endif() diff --git a/test/file_scan_task_test.cc b/test/file_scan_task_test.cc index fedf10db7..2b0c4485e 100644 --- a/test/file_scan_task_test.cc +++ b/test/file_scan_task_test.cc @@ -124,14 +124,13 @@ TEST_F(FileScanTaskTest, ReadFullSchema) { data_file->file_size_in_bytes = io_internal.fs()->GetFileInfo(temp_parquet_file_).ValueOrDie().size(); - TableScanContext context; - context.projected_schema = std::make_shared( + auto projected_schema = std::make_shared( std::vector{SchemaField::MakeRequired(1, "id", int32()), SchemaField::MakeOptional(2, "name", string())}); FileScanTask task(data_file); - auto reader_result = task.ToArrowArrayReader(context, file_io_); + auto reader_result = task.ToArrowArrayReader(projected_schema, nullptr, file_io_); ASSERT_THAT(reader_result, IsOk()); auto reader = std::move(reader_result.value()); @@ -149,14 +148,13 @@ TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) { data_file->file_size_in_bytes = io_internal.fs()->GetFileInfo(temp_parquet_file_).ValueOrDie().size(); - TableScanContext context; - context.projected_schema = std::make_shared( + auto projected_schema = std::make_shared( std::vector{SchemaField::MakeOptional(2, "name", string()), SchemaField::MakeOptional(3, "score", float64())}); FileScanTask task(data_file); - auto reader_result = task.ToArrowArrayReader(context, file_io_); + auto reader_result = task.ToArrowArrayReader(projected_schema, nullptr, file_io_); ASSERT_THAT(reader_result, IsOk()); auto reader = std::move(reader_result.value()); From dacc38d64c08c57885609108b2f5e33a9a77d53a Mon Sep 17 00:00:00 2001 From: Li Feiyang Date: Wed, 3 Sep 2025 17:30:16 +0800 Subject: [PATCH 3/3] fix review again --- src/iceberg/arrow_array_reader.h | 1 + src/iceberg/table_scan.cc | 6 +----- src/iceberg/table_scan.h | 6 ------ test/file_scan_task_test.cc | 8 ++++---- 4 files changed, 6 insertions(+), 15 deletions(-) diff --git a/src/iceberg/arrow_array_reader.h b/src/iceberg/arrow_array_reader.h index 93b2d47b3..a86f8162c 100644 --- a/src/iceberg/arrow_array_reader.h +++ b/src/iceberg/arrow_array_reader.h @@ -27,6 +27,7 @@ namespace iceberg { +/// \brief A reader interface that returns ArrowArray in a streaming fashion. class ICEBERG_EXPORT ArrowArrayReader { public: /// \brief Read next batch of data. diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index aa1719566..7ba5cdb8d 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -19,10 +19,6 @@ #include "iceberg/table_scan.h" -#include -#include -#include - #include "iceberg/file_reader.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" @@ -59,7 +55,7 @@ Result> FileScanTask::ToArrowArrayReader( ICEBERG_ASSIGN_OR_RAISE(auto reader, ReaderFactoryRegistry::Open(data_file_->file_format, options)); - return std::move(reader); + return reader; } TableScanBuilder::TableScanBuilder(std::shared_ptr table_metadata, diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 64c094605..99f8cae5f 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -51,21 +51,15 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { /// \brief The data file that should be read by this scan task. const std::shared_ptr& data_file() const; - /// \brief The total size in bytes of the file split to be read. int64_t size_bytes() const override; - /// \brief The number of files that should be read by this scan task. int32_t files_count() const override; - /// \brief The number of rows that should be read by this scan task. int64_t estimated_row_count() const override; /** * \brief Returns an ArrowArrayReader to read the data for this task. * - * This acts as a factory to instantiate a file-format-specific reader (e.g., Parquet) - * based on the metadata in this task and the provided parameters. - * * \param projected_schema The projected schema for reading the data. * \param filter Optional filter expression to apply during reading. * \param io The FileIO instance for accessing the file data. diff --git a/test/file_scan_task_test.cc b/test/file_scan_task_test.cc index 2b0c4485e..7643ee3e1 100644 --- a/test/file_scan_task_test.cc +++ b/test/file_scan_task_test.cc @@ -120,9 +120,9 @@ TEST_F(FileScanTaskTest, ReadFullSchema) { data_file->file_path = temp_parquet_file_; data_file->file_format = FileFormatType::kParquet; - auto io_internal = internal::checked_cast(*file_io_); + auto io = internal::checked_cast(*file_io_); data_file->file_size_in_bytes = - io_internal.fs()->GetFileInfo(temp_parquet_file_).ValueOrDie().size(); + io.fs()->GetFileInfo(temp_parquet_file_).ValueOrDie().size(); auto projected_schema = std::make_shared( std::vector{SchemaField::MakeRequired(1, "id", int32()), @@ -144,9 +144,9 @@ TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) { data_file->file_path = temp_parquet_file_; data_file->file_format = FileFormatType::kParquet; - auto io_internal = internal::checked_cast(*file_io_); + auto io = internal::checked_cast(*file_io_); data_file->file_size_in_bytes = - io_internal.fs()->GetFileInfo(temp_parquet_file_).ValueOrDie().size(); + io.fs()->GetFileInfo(temp_parquet_file_).ValueOrDie().size(); auto projected_schema = std::make_shared( std::vector{SchemaField::MakeOptional(2, "name", string()),