diff --git a/examples/read_write_demo.cpp b/examples/read_write_demo.cpp index 549ca272..13d97576 100644 --- a/examples/read_write_demo.cpp +++ b/examples/read_write_demo.cpp @@ -145,6 +145,11 @@ paimon::Status Run(const std::string& root_path, const std::string& db_name, auto result_array = arrow_result.ValueUnsafe(); result_array_vector.push_back(result_array); } + + // Safely close the reader and release resources + batch_reader->Close(); + batch_reader.reset(); + auto chunk_result = arrow::ChunkedArray::Make(result_array_vector); if (!chunk_result.ok()) { return paimon::Status::Invalid(chunk_result.status().ToString()); diff --git a/include/paimon/memory/memory_pool.h b/include/paimon/memory/memory_pool.h index cb3e8cc4..737416c8 100644 --- a/include/paimon/memory/memory_pool.h +++ b/include/paimon/memory/memory_pool.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -30,6 +31,7 @@ namespace paimon { class MemoryPool; +class MemoryPoolAdaptorHolder; /// Create a default implementation of memory pool. /// @return Unique pointer to a newly created `MemoryPool` instance. @@ -42,7 +44,7 @@ PAIMON_EXPORT std::shared_ptr GetDefaultPool(); /// Abstract base class for memory pool implementations that provides controlled memory management. class PAIMON_EXPORT MemoryPool { public: - virtual ~MemoryPool() = default; + virtual ~MemoryPool(); MemoryPool& operator=(const MemoryPool& other) = delete; MemoryPool& operator=(MemoryPool& other) = delete; MemoryPool& operator=(MemoryPool&& other) = delete; @@ -110,6 +112,22 @@ class PAIMON_EXPORT MemoryPool { /// @return Peak memory usage in bytes. virtual uint64_t MaxMemoryUsage() const = 0; + /// Adapts this memory pool to a specified memory pool adaptor type. + /// + /// Returns a pointer to the memory pool adaptor of the specified type. The adaptor + /// is lazily created on first access and cached by this memory pool instance for + /// subsequent calls. + /// This method is thread-safe. + /// + /// @tparam MemoryPoolAdaptor Type of memory pool adaptor to retrieve. Must inherit + /// from paimon::MemoryPoolAdaptor. + /// @return Pointer to the requested memory pool adaptor. + template + MemoryPoolAdaptor* AsSpecifiedMemoryPool() { + void* adaptor = GetAdaptor(MemoryPoolAdaptor::Identifier(), MemoryPoolAdaptor::Create); + return static_cast(adaptor); + } + /// Custom deleter for use with std::unique_ptr that integrates with memory pools. /// /// AllocatorDelete provides automatic memory deallocation through the memory pool @@ -141,7 +159,7 @@ class PAIMON_EXPORT MemoryPool { return *this; } - AllocatorDelete& operator=(AllocatorDelete&& other) { + AllocatorDelete& operator=(AllocatorDelete&& other) noexcept { pool = other.GetPool(); size = other.GetSize(); return *this; @@ -208,6 +226,15 @@ class PAIMON_EXPORT MemoryPool { return std::unique_ptr>(reinterpret_cast(p), AllocatorDelete(*this, sizeof(T))); } + + using AdaptorPtr = std::unique_ptr; + using AdaptorCreator = std::function; + + private: + void* GetAdaptor(const std::string& identifier, const AdaptorCreator& creator); + + std::unique_ptr holder_; + std::once_flag flag_; }; template diff --git a/include/paimon/memory/memory_pool_adaptor.h b/include/paimon/memory/memory_pool_adaptor.h new file mode 100644 index 00000000..bbff69d9 --- /dev/null +++ b/include/paimon/memory/memory_pool_adaptor.h @@ -0,0 +1,63 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/memory/memory_pool.h" + +namespace paimon { + +/// CRTP base class for memory pool adaptors. +/// +/// This class provides the common interface required by MemoryPool::AsSpecifiedMemoryPool(). +/// Subclasses should inherit from MemoryPoolAdaptor and implement: +/// - A static Identifier() method returning a unique string identifier +/// - A constructor accepting MemoryPool& as parameter +/// +/// @tparam Adaptor The derived adaptor class (CRTP pattern). +/// +/// @example +/// class MyPoolAdaptor : public SomePoolInterface, +/// public MemoryPoolAdaptor { +/// public: +/// explicit MyPoolAdaptor(MemoryPool& pool) : pool_(pool) {} +/// static std::string Identifier() { return "MyPoolAdaptor"; } +/// // ... implement SomePoolInterface methods ... +/// private: +/// MemoryPool& pool_; +/// }; +/// +/// SomePoolInterface* AsSomePool(MemoryPool& pool) { +/// return pool.AsSpecifiedMemoryPool(); +/// } +template +class MemoryPoolAdaptor { + public: + static std::string Identifier() { + return Adaptor::Identifier(); + } + + static MemoryPool::AdaptorPtr Create(MemoryPool& pool) { + auto adaptor = std::make_unique(pool); + return MemoryPool::AdaptorPtr(adaptor.release(), + [](void* ptr) { delete static_cast(ptr); }); + } +}; + +} // namespace paimon diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 964bc668..e8da57b9 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -113,7 +113,6 @@ set(PAIMON_COMMON_SRCS common/types/data_type_json_parser.cpp common/types/row_kind.cpp common/types/row_type.cpp - common/utils/arrow/mem_utils.cpp common/utils/binary_row_partition_computer.cpp common/utils/bit_set.cpp common/utils/bloom_filter.cpp @@ -388,8 +387,8 @@ if(PAIMON_BUILD_TESTS) common/types/data_type_json_parser_test.cpp common/types/row_kind_test.cpp common/types/data_type_test.cpp + common/utils/arrow/arrow_memory_pool_adaptor_test.cpp common/utils/arrow/arrow_utils_test.cpp - common/utils/arrow/mem_utils_test.cpp common/utils/arrow/status_utils_test.cpp common/utils/concurrent_hash_map_test.cpp common/utils/projected_row_test.cpp diff --git a/src/paimon/common/global_index/complete_index_score_batch_reader.cpp b/src/paimon/common/global_index/complete_index_score_batch_reader.cpp index d622438f..c6a05ea1 100644 --- a/src/paimon/common/global_index/complete_index_score_batch_reader.cpp +++ b/src/paimon/common/global_index/complete_index_score_batch_reader.cpp @@ -28,19 +28,21 @@ #include "paimon/common/reader/reader_utils.h" #include "paimon/common/table/special_fields.h" #include "paimon/common/types/row_kind.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/status.h" + namespace paimon { CompleteIndexScoreBatchReader::CompleteIndexScoreBatchReader( std::unique_ptr&& reader, const std::vector& scores, const std::shared_ptr& pool) - : arrow_pool_(GetArrowPool(pool)), reader_(std::move(reader)), scores_(scores) {} + : memory_pool_(pool), reader_(std::move(reader)), scores_(scores) {} Result CompleteIndexScoreBatchReader::NextBatch() { PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap batch_with_bitmap, NextBatchWithBitmap()); - return ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap), arrow_pool_.get()); + return ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap), + AsArrowMemoryPool(*memory_pool_)); } void CompleteIndexScoreBatchReader::UpdateScoreFieldIndex(const arrow::StructType* struct_type) { @@ -77,8 +79,9 @@ Result CompleteIndexScoreBatchReader::NextBatc // prepare index score array std::unique_ptr index_score_builder; - PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::MakeBuilder( - arrow_pool_.get(), SpecialFields::IndexScore().Type(), &index_score_builder)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::MakeBuilder(AsArrowMemoryPool(*memory_pool_), + SpecialFields::IndexScore().Type(), + &index_score_builder)); auto typed_builder = dynamic_cast(index_score_builder.get()); assert(typed_builder); PAIMON_RETURN_NOT_OK_FROM_ARROW(typed_builder->Reserve(struct_array->length())); diff --git a/src/paimon/common/global_index/complete_index_score_batch_reader.h b/src/paimon/common/global_index/complete_index_score_batch_reader.h index 72743067..feb2b3eb 100644 --- a/src/paimon/common/global_index/complete_index_score_batch_reader.h +++ b/src/paimon/common/global_index/complete_index_score_batch_reader.h @@ -60,7 +60,7 @@ class CompleteIndexScoreBatchReader : public BatchReader { size_t score_cursor_ = 0; int32_t index_score_field_idx_ = -1; std::vector field_names_with_score_; - std::unique_ptr arrow_pool_; + std::shared_ptr memory_pool_; std::unique_ptr reader_; std::vector scores_; }; diff --git a/src/paimon/common/memory/memory_pool.cpp b/src/paimon/common/memory/memory_pool.cpp index a0f35736..2d28f2c4 100644 --- a/src/paimon/common/memory/memory_pool.cpp +++ b/src/paimon/common/memory/memory_pool.cpp @@ -22,6 +22,9 @@ #include #include #include +#include + +#include "paimon/common/memory/memory_pool_adaptor_holder.h" namespace paimon { @@ -92,6 +95,14 @@ uint64_t MemoryPoolImpl::CurrentUsage() const { return total_allocated_size.load(); } +MemoryPool::~MemoryPool() = default; + +void* MemoryPool::GetAdaptor(const std::string& identifier, const AdaptorCreator& creator) { + std::call_once(flag_, [this]() { holder_ = std::make_unique(); }); + MemoryPoolAdaptorSlot& slot = holder_->GetOrCreateSlot(identifier, creator); + return slot.GetOrCreateAdaptor(*this); +} + PAIMON_EXPORT std::shared_ptr GetDefaultPool() { static std::shared_ptr internal = std::make_shared(); return internal; diff --git a/src/paimon/common/memory/memory_pool_adaptor_holder.h b/src/paimon/common/memory/memory_pool_adaptor_holder.h new file mode 100644 index 00000000..cbc0b989 --- /dev/null +++ b/src/paimon/common/memory/memory_pool_adaptor_holder.h @@ -0,0 +1,67 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/memory/memory_pool.h" + +namespace paimon { +class MemoryPoolAdaptorSlot { + public: + explicit MemoryPoolAdaptorSlot(MemoryPool::AdaptorCreator creator) + : creator_(std::move(creator)) {} + + void* GetOrCreateAdaptor(MemoryPool& pool) { + std::call_once(once_flag_, [this, &pool] { ptr_ = creator_(pool); }); + return ptr_.get(); + } + + private: + MemoryPool::AdaptorCreator creator_; + MemoryPool::AdaptorPtr ptr_{nullptr, [](void*) {}}; + std::once_flag once_flag_; +}; + +class MemoryPoolAdaptorHolder { + public: + MemoryPoolAdaptorSlot& GetOrCreateSlot(const std::string& identifier, + const MemoryPool::AdaptorCreator& creator) { + auto snapshot = std::atomic_load(&slot_map_); + if (auto it = snapshot->find(identifier); it != snapshot->end()) { + return *it->second; + } + auto new_slot = std::make_shared(creator); + while (true) { + auto new_snapshot = std::make_shared(*snapshot); + auto [slot, inserted] = new_snapshot->emplace(identifier, new_slot); + if (std::atomic_compare_exchange_strong(&slot_map_, &snapshot, new_snapshot)) { + return *slot->second; + } + } + } + + private: + using SlotPtr = std::shared_ptr; + using SlotMap = std::unordered_map; + std::shared_ptr slot_map_{std::make_shared()}; +}; + +} // namespace paimon diff --git a/src/paimon/common/reader/complete_row_kind_batch_reader.cpp b/src/paimon/common/reader/complete_row_kind_batch_reader.cpp index d4e377a7..8ee88235 100644 --- a/src/paimon/common/reader/complete_row_kind_batch_reader.cpp +++ b/src/paimon/common/reader/complete_row_kind_batch_reader.cpp @@ -29,6 +29,7 @@ #include "paimon/common/table/special_fields.h" #include "paimon/common/types/data_field.h" #include "paimon/common/types/row_kind.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/status.h" @@ -37,7 +38,8 @@ namespace paimon { Result CompleteRowKindBatchReader::NextBatch() { PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap batch_with_bitmap, NextBatchWithBitmap()); - return ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap), arrow_pool_.get()); + return ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap), + AsArrowMemoryPool(*memory_pool_)); } Result CompleteRowKindBatchReader::NextBatchWithBitmap() { @@ -82,8 +84,8 @@ Result> CompleteRowKindBatchReader::PrepareRowKind auto row_kind_scalar = std::make_shared(RowKind::Insert()->ToByteValue()); PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( - row_kind_array_, - arrow::MakeArrayFromScalar(*row_kind_scalar, struct_array_length, arrow_pool_.get())); + row_kind_array_, arrow::MakeArrayFromScalar(*row_kind_scalar, struct_array_length, + AsArrowMemoryPool(*memory_pool_))); return row_kind_array_; } else { return row_kind_array_->Slice(0, struct_array_length); diff --git a/src/paimon/common/reader/complete_row_kind_batch_reader.h b/src/paimon/common/reader/complete_row_kind_batch_reader.h index 548ab3d5..f8ec8666 100644 --- a/src/paimon/common/reader/complete_row_kind_batch_reader.h +++ b/src/paimon/common/reader/complete_row_kind_batch_reader.h @@ -24,7 +24,6 @@ #include "arrow/api.h" #include "arrow/array/array_base.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/reader/batch_reader.h" #include "paimon/result.h" @@ -40,7 +39,7 @@ class CompleteRowKindBatchReader : public BatchReader { public: CompleteRowKindBatchReader(std::unique_ptr&& reader, const std::shared_ptr& pool) - : arrow_pool_(GetArrowPool(pool)), reader_(std::move(reader)) {} + : memory_pool_(pool), reader_(std::move(reader)) {} Result NextBatch() override; @@ -62,7 +61,7 @@ class CompleteRowKindBatchReader : public BatchReader { void UpdateFieldNamesWithRowKind(const std::shared_ptr& struct_array); private: - std::unique_ptr arrow_pool_; + std::shared_ptr memory_pool_; std::unique_ptr reader_; std::shared_ptr row_kind_array_; std::vector field_names_with_row_kind_; diff --git a/src/paimon/common/reader/concat_batch_reader.cpp b/src/paimon/common/reader/concat_batch_reader.cpp index 8c172c8d..2a1f0bbc 100644 --- a/src/paimon/common/reader/concat_batch_reader.cpp +++ b/src/paimon/common/reader/concat_batch_reader.cpp @@ -21,19 +21,20 @@ #include "arrow/c/abi.h" #include "paimon/common/metrics/metrics_impl.h" #include "paimon/common/reader/reader_utils.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" namespace paimon { class MemoryPool; ConcatBatchReader::ConcatBatchReader(std::vector>&& readers, const std::shared_ptr& pool) - : arrow_pool_(GetArrowPool(pool)), readers_(std::move(readers)), current_(0) {} + : memory_pool_(pool), readers_(std::move(readers)), current_(0) {} Result ConcatBatchReader::NextBatch() { PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap batch_with_bitmap, NextBatchWithBitmap()); - return ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap), arrow_pool_.get()); + return ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap), + AsArrowMemoryPool(*memory_pool_)); } void ConcatBatchReader::Close() { diff --git a/src/paimon/common/reader/concat_batch_reader.h b/src/paimon/common/reader/concat_batch_reader.h index 4fc06516..a72e9742 100644 --- a/src/paimon/common/reader/concat_batch_reader.h +++ b/src/paimon/common/reader/concat_batch_reader.h @@ -41,7 +41,7 @@ class ConcatBatchReader : public BatchReader { std::shared_ptr GetReaderMetrics() const override; private: - std::unique_ptr arrow_pool_; + std::shared_ptr memory_pool_; std::vector> readers_; size_t current_; }; diff --git a/src/paimon/common/reader/data_evolution_file_reader.cpp b/src/paimon/common/reader/data_evolution_file_reader.cpp index 019ef782..ca0a37e9 100644 --- a/src/paimon/common/reader/data_evolution_file_reader.cpp +++ b/src/paimon/common/reader/data_evolution_file_reader.cpp @@ -21,8 +21,9 @@ #include "fmt/format.h" #include "paimon/common/metrics/metrics_impl.h" #include "paimon/common/reader/reader_utils.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" + namespace paimon { Result> DataEvolutionFileReader::Create( std::vector>&& readers, @@ -40,9 +41,8 @@ Result> DataEvolutionFileReader::Create if (readers.size() <= 1) { return Status::Invalid("readers size is supposed to be more than 1"); } - return std::unique_ptr( - new DataEvolutionFileReader(std::move(readers), read_schema, read_batch_size, - reader_offsets, field_offsets, GetArrowPool(pool))); + return std::unique_ptr(new DataEvolutionFileReader( + std::move(readers), read_schema, read_batch_size, reader_offsets, field_offsets, pool)); } Result DataEvolutionFileReader::NextBatchWithBitmap() { @@ -101,7 +101,7 @@ Result> DataEvolutionFileReader::GetOrCreateNonExi PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( non_exist_array_vec_[field_idx], arrow::MakeArrayOfNull(read_schema_->field(field_idx)->type(), array_length, - arrow_pool_.get())); + AsArrowMemoryPool(*memory_pool_))); } if (non_exist_array_vec_[field_idx]->length() == array_length) { return non_exist_array_vec_[field_idx]; @@ -170,8 +170,9 @@ Result> DataEvolutionFileReader::NextBatchForSingl return concat_array_vec[0]; } // TODO(xinyu.lxy) remove data copy for efficiency - PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr concat_array, - arrow::Concatenate(concat_array_vec, arrow_pool_.get())); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr concat_array, + arrow::Concatenate(concat_array_vec, AsArrowMemoryPool(*memory_pool_))); assert(concat_array->length() == total_array_length); assert(concat_array->length() <= read_batch_size_); return concat_array; diff --git a/src/paimon/common/reader/data_evolution_file_reader.h b/src/paimon/common/reader/data_evolution_file_reader.h index 2ba5853e..f3861e22 100644 --- a/src/paimon/common/reader/data_evolution_file_reader.h +++ b/src/paimon/common/reader/data_evolution_file_reader.h @@ -72,8 +72,8 @@ class DataEvolutionFileReader : public BatchReader { const std::shared_ptr& read_schema, int32_t read_batch_size, const std::vector& reader_offsets, const std::vector& field_offsets, - const std::shared_ptr& arrow_pool) - : arrow_pool_(arrow_pool), + const std::shared_ptr& memory_pool) + : memory_pool_(memory_pool), readers_(std::move(readers)), read_schema_(read_schema), read_batch_size_(read_batch_size), @@ -90,7 +90,7 @@ class DataEvolutionFileReader : public BatchReader { int64_t array_length); private: - std::shared_ptr arrow_pool_; + std::shared_ptr memory_pool_; std::vector> readers_; std::shared_ptr read_schema_; int32_t read_batch_size_; diff --git a/src/paimon/common/reader/data_evolution_file_reader_test.cpp b/src/paimon/common/reader/data_evolution_file_reader_test.cpp index 51f8f7bf..b1b04870 100644 --- a/src/paimon/common/reader/data_evolution_file_reader_test.cpp +++ b/src/paimon/common/reader/data_evolution_file_reader_test.cpp @@ -25,7 +25,6 @@ #include "arrow/ipc/api.h" #include "arrow/util/range.h" #include "gtest/gtest.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/memory/memory_pool.h" #include "paimon/testing/mock/mock_file_batch_reader.h" #include "paimon/testing/utils/read_result_collector.h" @@ -109,7 +108,7 @@ class DataEvolutionFileReaderTest : public ::testing::Test, readers.push_back(std::move(file_batch_reader)); DataEvolutionFileReader fake_data_evolution_reader( std::move(readers), /*read_schema=*/arrow::schema({}), read_batch_size, - /*reader_offsets=*/{}, /*field_offsets=*/{}, GetArrowPool(pool_)); + /*reader_offsets=*/{}, /*field_offsets=*/{}, pool_); arrow::ArrayVector result_array_vec; while (true) { ASSERT_OK_AND_ASSIGN(auto result_array, diff --git a/src/paimon/common/reader/predicate_batch_reader.cpp b/src/paimon/common/reader/predicate_batch_reader.cpp index da4ece4b..af650f8b 100644 --- a/src/paimon/common/reader/predicate_batch_reader.cpp +++ b/src/paimon/common/reader/predicate_batch_reader.cpp @@ -30,7 +30,7 @@ #include "fmt/format.h" #include "paimon/common/predicate/predicate_filter.h" #include "paimon/common/reader/reader_utils.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/predicate/predicate.h" #include "paimon/status.h" @@ -41,9 +41,7 @@ class MemoryPool; PredicateBatchReader::PredicateBatchReader(std::unique_ptr&& reader, const std::shared_ptr& predicate_filter, const std::shared_ptr& pool) - : arrow_pool_(GetArrowPool(pool)), - reader_(std::move(reader)), - predicate_filter_(predicate_filter) {} + : memory_pool_(pool), reader_(std::move(reader)), predicate_filter_(predicate_filter) {} Result> PredicateBatchReader::Create( std::unique_ptr&& reader, const std::shared_ptr& predicate, @@ -63,7 +61,8 @@ Result> PredicateBatchReader::Create( Result PredicateBatchReader::NextBatch() { PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap batch_with_bitmap, NextBatchWithBitmap()); - return ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap), arrow_pool_.get()); + return ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap), + AsArrowMemoryPool(*memory_pool_)); } Result PredicateBatchReader::NextBatchWithBitmap() { diff --git a/src/paimon/common/reader/predicate_batch_reader.h b/src/paimon/common/reader/predicate_batch_reader.h index d5619281..40a49d30 100644 --- a/src/paimon/common/reader/predicate_batch_reader.h +++ b/src/paimon/common/reader/predicate_batch_reader.h @@ -60,7 +60,7 @@ class PredicateBatchReader : public BatchReader { Result Filter(const std::shared_ptr& array) const; private: - std::unique_ptr arrow_pool_; + std::shared_ptr memory_pool_; std::unique_ptr reader_; std::shared_ptr predicate_filter_; }; diff --git a/src/paimon/common/utils/arrow/mem_utils.cpp b/src/paimon/common/utils/arrow/arrow_memory_pool_adaptor.h similarity index 76% rename from src/paimon/common/utils/arrow/mem_utils.cpp rename to src/paimon/common/utils/arrow/arrow_memory_pool_adaptor.h index 9bafac88..8fc682e1 100644 --- a/src/paimon/common/utils/arrow/mem_utils.cpp +++ b/src/paimon/common/utils/arrow/arrow_memory_pool_adaptor.h @@ -14,24 +14,25 @@ * limitations under the License. */ -#include "paimon/common/utils/arrow/mem_utils.h" +#pragma once -#include -#include #include #include "arrow/memory_pool.h" #include "arrow/status.h" #include "paimon/memory/memory_pool.h" +#include "paimon/memory/memory_pool_adaptor.h" namespace paimon { -class ArrowMemPoolAdaptor : public arrow::MemoryPool { +class ArrowMemoryPoolAdaptor : public arrow::MemoryPool, + public MemoryPoolAdaptor { public: - explicit ArrowMemPoolAdaptor(paimon::MemoryPool& pool) : pool_(pool) {} + static std::string Identifier() { + return "ArrowMemoryPoolAdaptor"; + } - explicit ArrowMemPoolAdaptor(const std::shared_ptr& pool) - : pool_(*pool), life_holder_(pool) {} + explicit ArrowMemoryPoolAdaptor(paimon::MemoryPool& pool) : pool_(pool) {} arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override { *out = reinterpret_cast(pool_.Malloc(size, alignment)); @@ -75,16 +76,11 @@ class ArrowMemPoolAdaptor : public arrow::MemoryPool { private: paimon::MemoryPool& pool_; - std::shared_ptr life_holder_; arrow::internal::MemoryPoolStats stats_; }; -std::unique_ptr GetArrowPool(MemoryPool& pool) { - return std::make_unique(pool); -} - -std::unique_ptr GetArrowPool(const std::shared_ptr& pool) { - return std::make_unique(pool); +inline arrow::MemoryPool* AsArrowMemoryPool(paimon::MemoryPool& pool) { + return pool.AsSpecifiedMemoryPool(); } } // namespace paimon diff --git a/src/paimon/common/utils/arrow/arrow_memory_pool_adaptor_test.cpp b/src/paimon/common/utils/arrow/arrow_memory_pool_adaptor_test.cpp new file mode 100644 index 00000000..2ee93dd3 --- /dev/null +++ b/src/paimon/common/utils/arrow/arrow_memory_pool_adaptor_test.cpp @@ -0,0 +1,75 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" + +#include "gtest/gtest.h" +#include "paimon/memory/memory_pool.h" + +namespace paimon::test { + +TEST(MemUtilsTest, TestSimple) { + const int64_t alignment = 64; + auto pool = GetMemoryPool(); + auto arrow_pool = AsArrowMemoryPool(*pool); + ASSERT_EQ("Paimon Pool", arrow_pool->backend_name()); + ASSERT_EQ(0, arrow_pool->total_bytes_allocated()); + ASSERT_EQ(0, arrow_pool->num_allocations()); + + uint8_t* ptr1 = nullptr; + ASSERT_TRUE(arrow_pool->Allocate(10, alignment, &ptr1).ok()); + ASSERT_TRUE(ptr1); + ASSERT_EQ(10, arrow_pool->total_bytes_allocated()); + ASSERT_EQ(10, arrow_pool->bytes_allocated()); + ASSERT_EQ(10, arrow_pool->max_memory()); + ASSERT_EQ(1, arrow_pool->num_allocations()); + + // test malloc and free + uint8_t* ptr2 = nullptr; + ASSERT_TRUE(arrow_pool->Allocate(20, alignment, &ptr2).ok()); + ASSERT_TRUE(ptr2); + ASSERT_EQ(30, arrow_pool->bytes_allocated()); + ASSERT_EQ(30, arrow_pool->max_memory()); + arrow_pool->Free(ptr2, 20, alignment); + ASSERT_EQ(10, arrow_pool->bytes_allocated()); + ASSERT_EQ(30, arrow_pool->max_memory()); + ASSERT_EQ(2, arrow_pool->num_allocations()); + + // test realloc with nullptr + uint8_t* ptr3 = nullptr; + ASSERT_TRUE(arrow_pool->Reallocate(/*old_size=*/0, /*new_size=*/40, alignment, &ptr3).ok()); + ASSERT_TRUE(ptr3); + ASSERT_EQ(50, arrow_pool->bytes_allocated()); + ASSERT_EQ(50, arrow_pool->max_memory()); + ASSERT_EQ(3, arrow_pool->num_allocations()); + + uint8_t* ptr3_old = ptr3; + // test realloc with same size + ASSERT_TRUE(arrow_pool->Reallocate(/*old_size=*/40, /*new_size=*/40, alignment, &ptr3).ok()); + ASSERT_EQ(ptr3_old, ptr3); + ASSERT_EQ(50, arrow_pool->bytes_allocated()); + ASSERT_EQ(50, arrow_pool->max_memory()); + ASSERT_EQ(3, arrow_pool->num_allocations()); + + arrow_pool->Free(ptr1, 10, alignment); + arrow_pool->Free(ptr3, 40, alignment); + ASSERT_EQ(0, arrow_pool->bytes_allocated()); + ASSERT_EQ(70, arrow_pool->total_bytes_allocated()); + ASSERT_EQ(3, arrow_pool->num_allocations()); + ASSERT_EQ(50, arrow_pool->max_memory()); +} + +} // namespace paimon::test diff --git a/src/paimon/common/utils/arrow/mem_utils.h b/src/paimon/common/utils/arrow/mem_utils.h deleted file mode 100644 index 3f38eb9d..00000000 --- a/src/paimon/common/utils/arrow/mem_utils.h +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2024-present Alibaba Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -#include "arrow/memory_pool.h" -#include "paimon/memory/memory_pool.h" -#include "paimon/visibility.h" - -namespace paimon { -class MemoryPool; - -PAIMON_EXPORT std::unique_ptr GetArrowPool(MemoryPool& pool); - -PAIMON_EXPORT std::unique_ptr GetArrowPool( - const std::shared_ptr& pool); - -} // namespace paimon diff --git a/src/paimon/common/utils/arrow/mem_utils_test.cpp b/src/paimon/common/utils/arrow/mem_utils_test.cpp deleted file mode 100644 index 2881d79b..00000000 --- a/src/paimon/common/utils/arrow/mem_utils_test.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2024-present Alibaba Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "paimon/common/utils/arrow/mem_utils.h" - -#include "gtest/gtest.h" -#include "paimon/memory/memory_pool.h" - -namespace paimon::test { - -TEST(MemUtilsTest, TestSimple) { - const int64_t alignment = 64; - auto pool = GetArrowPool(GetDefaultPool()); - ASSERT_EQ("Paimon Pool", pool->backend_name()); - ASSERT_EQ(0, pool->total_bytes_allocated()); - ASSERT_EQ(0, pool->num_allocations()); - - uint8_t* ptr1 = nullptr; - ASSERT_TRUE(pool->Allocate(10, alignment, &ptr1).ok()); - ASSERT_TRUE(ptr1); - ASSERT_EQ(10, pool->total_bytes_allocated()); - ASSERT_EQ(10, pool->bytes_allocated()); - ASSERT_EQ(10, pool->max_memory()); - ASSERT_EQ(1, pool->num_allocations()); - - // test malloc and free - uint8_t* ptr2 = nullptr; - ASSERT_TRUE(pool->Allocate(20, alignment, &ptr2).ok()); - ASSERT_TRUE(ptr2); - ASSERT_EQ(30, pool->bytes_allocated()); - ASSERT_EQ(30, pool->max_memory()); - pool->Free(ptr2, 20, alignment); - ASSERT_EQ(10, pool->bytes_allocated()); - ASSERT_EQ(30, pool->max_memory()); - ASSERT_EQ(2, pool->num_allocations()); - - // test realloc with nullptr - uint8_t* ptr3 = nullptr; - ASSERT_TRUE(pool->Reallocate(/*old_size=*/0, /*new_size=*/40, alignment, &ptr3).ok()); - ASSERT_TRUE(ptr3); - ASSERT_EQ(50, pool->bytes_allocated()); - ASSERT_EQ(50, pool->max_memory()); - ASSERT_EQ(3, pool->num_allocations()); - - uint8_t* ptr3_old = ptr3; - // test realloc with same size - ASSERT_TRUE(pool->Reallocate(/*old_size=*/40, /*new_size=*/40, alignment, &ptr3).ok()); - ASSERT_EQ(ptr3_old, ptr3); - ASSERT_EQ(50, pool->bytes_allocated()); - ASSERT_EQ(50, pool->max_memory()); - ASSERT_EQ(3, pool->num_allocations()); - - pool->Free(ptr1, 10, alignment); - pool->Free(ptr3, 40, alignment); - ASSERT_EQ(0, pool->bytes_allocated()); - ASSERT_EQ(70, pool->total_bytes_allocated()); - ASSERT_EQ(3, pool->num_allocations()); - ASSERT_EQ(50, pool->max_memory()); -} - -} // namespace paimon::test diff --git a/src/paimon/core/casting/cast_executor_test.cpp b/src/paimon/core/casting/cast_executor_test.cpp index 838d0b60..4e8b3c9f 100644 --- a/src/paimon/core/casting/cast_executor_test.cpp +++ b/src/paimon/core/casting/cast_executor_test.cpp @@ -30,7 +30,7 @@ #include "arrow/array/builder_dict.h" #include "arrow/ipc/json_simple.h" #include "gtest/gtest.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/common/utils/decimal_utils.h" #include "paimon/common/utils/field_type_utils.h" @@ -89,7 +89,7 @@ class CastExecutorTest : public ::testing::Test { const std::shared_ptr& src_array, const std::shared_ptr& expected_array) const { ASSERT_OK_AND_ASSIGN(std::shared_ptr target_array, - cast_executor->Cast(src_array, target_type, arrow_pool_.get())); + cast_executor->Cast(src_array, target_type, arrow_pool_)); ASSERT_TRUE( target_array->Equals(expected_array, arrow::EqualOptions::Defaults().nans_equal(true))) << "target:" << target_array->ToString() << "expected:" << expected_array->ToString(); @@ -100,7 +100,7 @@ class CastExecutorTest : public ::testing::Test { const std::shared_ptr& src_array, const std::shared_ptr& expected_array) const { ASSERT_OK_AND_ASSIGN(std::shared_ptr target_array, - cast_executor->Cast(src_array, target_type, arrow_pool_.get())); + cast_executor->Cast(src_array, target_type, arrow_pool_)); ASSERT_TRUE(target_array->ApproxEquals(expected_array, arrow::EqualOptions::Defaults().nans_equal(true))) << "target:" << target_array->ToString() << "expected:" << expected_array->ToString(); @@ -113,7 +113,7 @@ class CastExecutorTest : public ::testing::Test { auto src_array = arrow::ipc::internal::json::ArrayFromJSON(src_type, src_array_str).ValueOrDie(); ASSERT_OK_AND_ASSIGN(std::shared_ptr target_array, - cast_executor->Cast(src_array, target_type, arrow_pool_.get())); + cast_executor->Cast(src_array, target_type, arrow_pool_)); auto expected_array = arrow::ipc::internal::json::ArrayFromJSON(target_type, target_array_str).ValueOrDie(); ASSERT_TRUE(target_array->Equals(expected_array)) @@ -125,7 +125,7 @@ class CastExecutorTest : public ::testing::Test { const std::string& src_array_str) const { auto src_array = arrow::ipc::internal::json::ArrayFromJSON(src_type, src_array_str).ValueOrDie(); - auto target_array = cast_executor->Cast(src_array, target_type, arrow_pool_.get()); + auto target_array = cast_executor->Cast(src_array, target_type, arrow_pool_); EXPECT_FALSE(target_array.ok()) << target_array.value()->ToString(); return target_array.status().ToString(); } @@ -204,7 +204,7 @@ class CastExecutorTest : public ::testing::Test { static constexpr double MIN_DOUBLE = std::numeric_limits::lowest(); private: - std::shared_ptr arrow_pool_ = GetArrowPool(GetDefaultPool()); + arrow::MemoryPool* arrow_pool_ = AsArrowMemoryPool(*GetDefaultPool()); }; TEST_F(CastExecutorTest, TestNumericPrimitiveCastExecutorCastLiteral) { diff --git a/src/paimon/core/casting/casting_utils_test.cpp b/src/paimon/core/casting/casting_utils_test.cpp index 69bf213e..6b5d453a 100644 --- a/src/paimon/core/casting/casting_utils_test.cpp +++ b/src/paimon/core/casting/casting_utils_test.cpp @@ -20,12 +20,13 @@ #include "arrow/ipc/api.h" #include "gtest/gtest.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" +#include "paimon/memory/memory_pool.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { class CastingUtilsTest : public ::testing::Test { - std::shared_ptr arrow_pool_ = GetArrowPool(GetDefaultPool()); + arrow::MemoryPool* arrow_pool_ = AsArrowMemoryPool(*GetDefaultPool()); }; TEST_F(CastingUtilsTest, TestDictionaryToString) { @@ -41,11 +42,11 @@ TEST_F(CastingUtilsTest, TestDictionaryToString) { arrow::utf8(), R"(["bar", "bazr", "foo", "bazr", "foo"])") .ValueOrDie(); - auto pool = GetArrowPool(GetDefaultPool()); + auto pool = AsArrowMemoryPool(*GetDefaultPool()); arrow::compute::CastOptions options = arrow::compute::CastOptions::Safe(); ASSERT_OK_AND_ASSIGN( auto result_array, - CastingUtils::Cast(dict_array, /*target_type=*/arrow::utf8(), options, pool.get())); + CastingUtils::Cast(dict_array, /*target_type=*/arrow::utf8(), options, pool)); ASSERT_TRUE(result_array->Equals(string_array)); } @@ -62,7 +63,7 @@ TEST_F(CastingUtilsTest, TestTimestampToTimestampWithTimezone) { .ValueOr(nullptr); ASSERT_TRUE(target_array); ASSERT_OK_AND_ASSIGN(auto result_array, CastingUtils::TimestampToTimestampWithTimezone( - src_array, target_ts_type, arrow_pool_.get())); + src_array, target_ts_type, arrow_pool_)); ASSERT_TRUE(target_array->Equals(result_array)); } @@ -77,9 +78,9 @@ TEST_F(CastingUtilsTest, TestTimestampToTimestampWithTimezoneInvalid) { auto target_type = arrow::timestamp(arrow::TimeUnit::NANO, "Asia/Shanghai"); auto target_ts_type = arrow::internal::checked_pointer_cast(target_type); - ASSERT_NOK_WITH_MSG(CastingUtils::TimestampToTimestampWithTimezone( - src_array, target_ts_type, arrow_pool_.get()), - "time unit of src and target type mismatch"); + ASSERT_NOK_WITH_MSG( + CastingUtils::TimestampToTimestampWithTimezone(src_array, target_ts_type, arrow_pool_), + "time unit of src and target type mismatch"); } { auto src_array = @@ -91,8 +92,7 @@ TEST_F(CastingUtilsTest, TestTimestampToTimestampWithTimezoneInvalid) { auto target_ts_type = arrow::internal::checked_pointer_cast(target_type); ASSERT_NOK_WITH_MSG( - CastingUtils::TimestampToTimestampWithTimezone(src_array, target_ts_type, - arrow_pool_.get()), + CastingUtils::TimestampToTimestampWithTimezone(src_array, target_ts_type, arrow_pool_), "src value must be local time (no tz), target value must be UTC (with tz)"); } { @@ -104,10 +104,10 @@ TEST_F(CastingUtilsTest, TestTimestampToTimestampWithTimezoneInvalid) { auto target_type = arrow::timestamp(arrow::TimeUnit::SECOND, "Europe/Warsaw"); auto target_ts_type = arrow::internal::checked_pointer_cast(target_type); - ASSERT_NOK_WITH_MSG(CastingUtils::TimestampToTimestampWithTimezone( - src_array, target_ts_type, arrow_pool_.get()), - "Timestamp doesn't exist in timezone 'Europe/Warsaw': 2015-03-29 " - "02:30:00 is in a gap between"); + ASSERT_NOK_WITH_MSG( + CastingUtils::TimestampToTimestampWithTimezone(src_array, target_ts_type, arrow_pool_), + "Timestamp doesn't exist in timezone 'Europe/Warsaw': 2015-03-29 " + "02:30:00 is in a gap between"); } } @@ -125,7 +125,7 @@ TEST_F(CastingUtilsTest, TestTimestampWithTimezoneToTimestamp) { .ValueOr(nullptr); ASSERT_TRUE(target_array); ASSERT_OK_AND_ASSIGN(auto result_array, CastingUtils::TimestampWithTimezoneToTimestamp( - src_array, target_ts_type, arrow_pool_.get())); + src_array, target_ts_type, arrow_pool_)); ASSERT_TRUE(target_array->Equals(result_array)); } @@ -140,9 +140,9 @@ TEST_F(CastingUtilsTest, TestTimestampWithTimezoneToTimestampInvalid) { auto target_type = arrow::timestamp(arrow::TimeUnit::SECOND); auto target_ts_type = arrow::internal::checked_pointer_cast(target_type); - ASSERT_NOK_WITH_MSG(CastingUtils::TimestampWithTimezoneToTimestamp( - src_array, target_ts_type, arrow_pool_.get()), - "in timezone converter, time unit of src and target type mismatch"); + ASSERT_NOK_WITH_MSG( + CastingUtils::TimestampWithTimezoneToTimestamp(src_array, target_ts_type, arrow_pool_), + "in timezone converter, time unit of src and target type mismatch"); } { auto src_array = arrow::ipc::internal::json::ArrayFromJSON( @@ -153,9 +153,9 @@ TEST_F(CastingUtilsTest, TestTimestampWithTimezoneToTimestampInvalid) { auto target_type = arrow::timestamp(arrow::TimeUnit::SECOND, "Asia/Tokyo"); auto target_ts_type = arrow::internal::checked_pointer_cast(target_type); - ASSERT_NOK_WITH_MSG(CastingUtils::TimestampWithTimezoneToTimestamp( - src_array, target_ts_type, arrow_pool_.get()), - "target value must be local time (no tz)"); + ASSERT_NOK_WITH_MSG( + CastingUtils::TimestampWithTimezoneToTimestamp(src_array, target_ts_type, arrow_pool_), + "target value must be local time (no tz)"); } } diff --git a/src/paimon/core/io/complete_row_tracking_fields_reader.cpp b/src/paimon/core/io/complete_row_tracking_fields_reader.cpp index 92b859ef..7b99bf10 100644 --- a/src/paimon/core/io/complete_row_tracking_fields_reader.cpp +++ b/src/paimon/core/io/complete_row_tracking_fields_reader.cpp @@ -24,7 +24,7 @@ #include "arrow/c/abi.h" #include "arrow/c/bridge.h" #include "paimon/common/table/special_fields.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" namespace paimon { @@ -33,7 +33,7 @@ CompleteRowTrackingFieldsBatchReader::CompleteRowTrackingFieldsBatchReader( int64_t snapshot_id, const std::shared_ptr& pool) : first_row_id_(first_row_id), snapshot_id_(snapshot_id), - arrow_pool_(GetArrowPool(pool)), + memory_pool_(pool), reader_(std::move(reader)) {} Status CompleteRowTrackingFieldsBatchReader::SetReadSchema( @@ -148,7 +148,8 @@ Status CompleteRowTrackingFieldsBatchReader::ConvertRowTrackingField( // condition2: special field all null auto scalar = std::make_shared(init_value); PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( - special_array, arrow::MakeArrayFromScalar(*scalar, array_length, arrow_pool_.get())); + special_array, + arrow::MakeArrayFromScalar(*scalar, array_length, AsArrowMemoryPool(*memory_pool_))); auto typed_special_array = arrow::internal::checked_pointer_cast>( special_array); diff --git a/src/paimon/core/io/complete_row_tracking_fields_reader.h b/src/paimon/core/io/complete_row_tracking_fields_reader.h index 812ae5d5..21b2aa53 100644 --- a/src/paimon/core/io/complete_row_tracking_fields_reader.h +++ b/src/paimon/core/io/complete_row_tracking_fields_reader.h @@ -80,7 +80,7 @@ class CompleteRowTrackingFieldsBatchReader : public FileBatchReader { private: std::optional first_row_id_; int64_t snapshot_id_ = -1; - std::shared_ptr arrow_pool_; + std::shared_ptr memory_pool_; std::shared_ptr read_schema_; std::unique_ptr reader_; }; diff --git a/src/paimon/core/io/field_mapping_reader.cpp b/src/paimon/core/io/field_mapping_reader.cpp index 427e7741..4d3f1905 100644 --- a/src/paimon/core/io/field_mapping_reader.cpp +++ b/src/paimon/core/io/field_mapping_reader.cpp @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #include "paimon/core/io/field_mapping_reader.h" #include @@ -30,7 +31,7 @@ #include "fmt/format.h" #include "paimon/common/data/binary_string.h" #include "paimon/common/types/data_field.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/core/casting/cast_executor.h" #include "paimon/core/casting/casting_utils.h" @@ -46,7 +47,7 @@ FieldMappingReader::FieldMappingReader(int32_t field_count, std::unique_ptr&& mapping, const std::shared_ptr& pool) : field_count_(field_count), - arrow_pool_(GetArrowPool(pool)), + memory_pool_(pool), reader_(std::move(reader)), partition_(partition), partition_info_(mapping->partition_info), @@ -86,16 +87,16 @@ Result> FieldMappingReader::CastNonPartitionArrayI auto dict_array = std::dynamic_pointer_cast(single_column_array); if (dict_array) { - PAIMON_ASSIGN_OR_RAISE( - single_column_array, - CastingUtils::Cast(dict_array, /*target_type=*/arrow::utf8(), - arrow::compute::CastOptions::Safe(), arrow_pool_.get())); + PAIMON_ASSIGN_OR_RAISE(single_column_array, + CastingUtils::Cast(dict_array, /*target_type=*/arrow::utf8(), + arrow::compute::CastOptions::Safe(), + AsArrowMemoryPool(*memory_pool_))); } PAIMON_ASSIGN_OR_RAISE( std::shared_ptr casted, non_partition_info_.cast_executors[i]->Cast( single_column_array, non_partition_info_.non_partition_read_schema[i].Type(), - arrow_pool_.get())); + AsArrowMemoryPool(*memory_pool_))); casted_array.push_back(casted); casted_field_names.push_back(non_partition_info_.non_partition_read_schema[i].Name()); } else { @@ -177,7 +178,7 @@ Result> FieldMappingReader::GenerateSinglePartitio // for null partition value PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( std::shared_ptr null_array, - arrow::MakeArrayOfNull(type, batch_size, arrow_pool_.get())); + arrow::MakeArrayOfNull(type, batch_size, AsArrowMemoryPool(*memory_pool_))); return null_array; } auto type_id = type->id(); @@ -242,7 +243,7 @@ Result> FieldMappingReader::GenerateSinglePartitio } PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( std::shared_ptr arrow_array, - arrow::MakeArrayFromScalar(*scalar, batch_size, arrow_pool_.get())); + arrow::MakeArrayFromScalar(*scalar, batch_size, AsArrowMemoryPool(*memory_pool_))); return arrow_array; } @@ -271,9 +272,9 @@ Result> FieldMappingReader::GenerateNonExistArray( non_exist_array.reserve(non_exist_field_info_.value().non_exist_read_schema.size()); non_exist_field_names.reserve(non_exist_field_info_.value().non_exist_read_schema.size()); for (const auto& non_exist_field : non_exist_field_info_.value().non_exist_read_schema) { - PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( - std::shared_ptr null_array, - arrow::MakeArrayOfNull(non_exist_field.Type(), batch_size, arrow_pool_.get())); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr null_array, + arrow::MakeArrayOfNull(non_exist_field.Type(), batch_size, + AsArrowMemoryPool(*memory_pool_))); non_exist_array.push_back(null_array); non_exist_field_names.push_back(non_exist_field.Name()); } diff --git a/src/paimon/core/io/field_mapping_reader.h b/src/paimon/core/io/field_mapping_reader.h index d23bb60c..f3823d9c 100644 --- a/src/paimon/core/io/field_mapping_reader.h +++ b/src/paimon/core/io/field_mapping_reader.h @@ -27,7 +27,6 @@ #include "arrow/c/abi.h" #include "arrow/c/bridge.h" #include "paimon/common/data/binary_row.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/core/partition/partition_info.h" #include "paimon/core/utils/field_mapping.h" #include "paimon/reader/batch_reader.h" @@ -85,7 +84,7 @@ class FieldMappingReader : public BatchReader { bool need_mapping_ = false; bool need_casting_ = false; int32_t field_count_; - std::shared_ptr arrow_pool_; + std::shared_ptr memory_pool_; std::unique_ptr reader_; BinaryRow partition_ = BinaryRow::EmptyRow(); diff --git a/src/paimon/core/io/key_value_meta_projection_consumer.cpp b/src/paimon/core/io/key_value_meta_projection_consumer.cpp index b46aac42..8ea4fb48 100644 --- a/src/paimon/core/io/key_value_meta_projection_consumer.cpp +++ b/src/paimon/core/io/key_value_meta_projection_consumer.cpp @@ -30,7 +30,7 @@ #include "paimon/common/data/internal_row.h" #include "paimon/common/table/special_fields.h" #include "paimon/common/types/row_kind.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/reader/batch_reader.h" #include "paimon/status.h" @@ -40,11 +40,10 @@ class MemoryPool; Result> KeyValueMetaProjectionConsumer::Create( const std::shared_ptr& target_schema, const std::shared_ptr& pool) { - auto arrow_pool = GetArrowPool(pool); // target fields of output array: special fields + value fields std::unique_ptr array_builder; PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::MakeBuilder( - arrow_pool.get(), arrow::struct_(target_schema->fields()), &array_builder)); + AsArrowMemoryPool(*pool), arrow::struct_(target_schema->fields()), &array_builder)); auto struct_builder = arrow::internal::checked_pointer_cast(std::move(array_builder)); @@ -73,8 +72,8 @@ Result> KeyValueMetaProjectionCo appenders.emplace_back(func); } return std::unique_ptr(new KeyValueMetaProjectionConsumer( - reserve_count, std::move(appenders), std::move(struct_builder), std::move(arrow_pool), - sequence_appender, value_kind_appender)); + reserve_count, std::move(appenders), std::move(struct_builder), pool, sequence_appender, + value_kind_appender)); } Result KeyValueMetaProjectionConsumer::NextBatch( diff --git a/src/paimon/core/io/key_value_meta_projection_consumer.h b/src/paimon/core/io/key_value_meta_projection_consumer.h index ea5a8991..cdab3fb8 100644 --- a/src/paimon/core/io/key_value_meta_projection_consumer.h +++ b/src/paimon/core/io/key_value_meta_projection_consumer.h @@ -48,11 +48,11 @@ class KeyValueMetaProjectionConsumer : public RowToArrowArrayConverter&& appenders, std::unique_ptr&& array_builder, - std::unique_ptr&& arrow_pool, + std::shared_ptr memory_pool, arrow::Int64Builder* sequence_appender, arrow::Int8Builder* value_kind_appender) : RowToArrowArrayConverter(reserve_count, std::move(appenders), std::move(array_builder), - std::move(arrow_pool)), + std::move(memory_pool)), sequence_appender_(sequence_appender), value_kind_appender_(value_kind_appender) {} diff --git a/src/paimon/core/io/key_value_projection_consumer.cpp b/src/paimon/core/io/key_value_projection_consumer.cpp index 69e51b57..560ff435 100644 --- a/src/paimon/core/io/key_value_projection_consumer.cpp +++ b/src/paimon/core/io/key_value_projection_consumer.cpp @@ -26,7 +26,7 @@ #include "arrow/c/abi.h" #include "arrow/util/checked_cast.h" #include "paimon/common/data/internal_row.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/core/key_value.h" #include "paimon/status.h" @@ -41,10 +41,9 @@ Result> KeyValueProjectionConsumer:: return Status::Invalid( "target_schema and target_to_src_mapping mismatch in KeyValueProjectionConsumer"); } - auto arrow_pool = GetArrowPool(pool); std::unique_ptr array_builder; PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::MakeBuilder( - arrow_pool.get(), std::make_shared(target_schema->fields()), + AsArrowMemoryPool(*pool), std::make_shared(target_schema->fields()), &array_builder)); auto struct_builder = @@ -60,9 +59,9 @@ Result> KeyValueProjectionConsumer:: AppendField(/*use_view=*/true, struct_builder->field_builder(i), &reserve_count)); appenders.emplace_back(func); } - return std::unique_ptr(new KeyValueProjectionConsumer( - reserve_count, std::move(appenders), std::move(struct_builder), std::move(arrow_pool), - target_to_src_mapping)); + return std::unique_ptr( + new KeyValueProjectionConsumer(reserve_count, std::move(appenders), + std::move(struct_builder), pool, target_to_src_mapping)); } Result KeyValueProjectionConsumer::NextBatch( diff --git a/src/paimon/core/io/key_value_projection_consumer.h b/src/paimon/core/io/key_value_projection_consumer.h index 71cfa69a..5f0f9137 100644 --- a/src/paimon/core/io/key_value_projection_consumer.h +++ b/src/paimon/core/io/key_value_projection_consumer.h @@ -51,10 +51,10 @@ class KeyValueProjectionConsumer private: KeyValueProjectionConsumer(int32_t reserve_count, std::vector&& appenders, std::unique_ptr&& array_builder, - std::unique_ptr&& arrow_pool, + std::shared_ptr memory_pool, const std::vector& target_to_src_mapping) : RowToArrowArrayConverter(reserve_count, std::move(appenders), std::move(array_builder), - std::move(arrow_pool)), + std::move(memory_pool)), target_to_src_mapping_(target_to_src_mapping) {} std::vector target_to_src_mapping_; diff --git a/src/paimon/core/io/key_value_projection_reader_test.cpp b/src/paimon/core/io/key_value_projection_reader_test.cpp index 91cbc563..91650cf2 100644 --- a/src/paimon/core/io/key_value_projection_reader_test.cpp +++ b/src/paimon/core/io/key_value_projection_reader_test.cpp @@ -33,7 +33,7 @@ #include "arrow/util/checked_cast.h" #include "gtest/gtest.h" #include "paimon/common/types/data_field.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/core/io/async_key_value_projection_reader.h" #include "paimon/core/io/concat_key_value_record_reader.h" @@ -166,9 +166,8 @@ TEST_P(KeyValueProjectionReaderTest, TestBulkData) { arrow::schema(arrow::FieldVector({fields[2], fields[3], fields[4], fields[5]})); std::shared_ptr src_type = arrow::struct_({fields}); - auto arrow_pool = GetArrowPool(pool_); std::unique_ptr array_builder; - ASSERT_TRUE(arrow::MakeBuilder(arrow_pool.get(), src_type, &array_builder).ok()); + ASSERT_TRUE(arrow::MakeBuilder(AsArrowMemoryPool(*pool_), src_type, &array_builder).ok()); auto struct_builder = arrow::internal::checked_pointer_cast(std::move(array_builder)); diff --git a/src/paimon/core/io/meta_to_arrow_array_converter.cpp b/src/paimon/core/io/meta_to_arrow_array_converter.cpp index a06298b7..3579fdc7 100644 --- a/src/paimon/core/io/meta_to_arrow_array_converter.cpp +++ b/src/paimon/core/io/meta_to_arrow_array_converter.cpp @@ -13,8 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #include "paimon/core/io/meta_to_arrow_array_converter.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" + namespace paimon { Result> MetaToArrowArrayConverter::Create( const std::shared_ptr& meta_data_type, @@ -23,10 +26,9 @@ Result> MetaToArrowArrayConverter::Cr if (!struct_type) { return Status::Invalid("meta_data_type in MetaToArrowArrayConverter must be struct type"); } - auto arrow_pool = GetArrowPool(pool); std::unique_ptr array_builder; PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::MakeBuilder( - arrow_pool.get(), arrow::struct_(struct_type->fields()), &array_builder)); + AsArrowMemoryPool(*pool), arrow::struct_(struct_type->fields()), &array_builder)); auto struct_builder = arrow::internal::checked_pointer_cast(std::move(array_builder)); @@ -42,7 +44,7 @@ Result> MetaToArrowArrayConverter::Cr appenders.emplace_back(func); } return std::unique_ptr(new MetaToArrowArrayConverter( - reserve_count, std::move(appenders), std::move(struct_builder), std::move(arrow_pool))); + reserve_count, std::move(appenders), std::move(struct_builder), pool)); } Result> MetaToArrowArrayConverter::NextBatch( diff --git a/src/paimon/core/io/meta_to_arrow_array_converter.h b/src/paimon/core/io/meta_to_arrow_array_converter.h index 267d32c9..eb484a0c 100644 --- a/src/paimon/core/io/meta_to_arrow_array_converter.h +++ b/src/paimon/core/io/meta_to_arrow_array_converter.h @@ -48,8 +48,8 @@ class MetaToArrowArrayConverter private: MetaToArrowArrayConverter(int32_t reserve_count, std::vector&& appenders, std::unique_ptr&& array_builder, - std::unique_ptr&& arrow_pool) + std::shared_ptr memory_pool) : RowToArrowArrayConverter(reserve_count, std::move(appenders), std::move(array_builder), - std::move(arrow_pool)) {} + std::move(memory_pool)) {} }; } // namespace paimon diff --git a/src/paimon/core/io/row_to_arrow_array_converter.h b/src/paimon/core/io/row_to_arrow_array_converter.h index 9d8ea932..cdbf1600 100644 --- a/src/paimon/core/io/row_to_arrow_array_converter.h +++ b/src/paimon/core/io/row_to_arrow_array_converter.h @@ -24,7 +24,6 @@ #include "arrow/c/bridge.h" #include "paimon/common/data/internal_array.h" #include "paimon/common/data/internal_map.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/core/key_value.h" @@ -49,7 +48,7 @@ class RowToArrowArrayConverter { std::function; RowToArrowArrayConverter(int32_t reserve_count, std::vector&& appenders, std::unique_ptr&& array_builder, - std::unique_ptr&& arrow_pool); + std::shared_ptr&& memory_pool); static Result AppendField(bool use_view, arrow::ArrayBuilder* array_builder, int32_t* reserve_count); @@ -69,7 +68,7 @@ class RowToArrowArrayConverter { protected: std::vector reserved_sizes_; - std::unique_ptr arrow_pool_; + std::shared_ptr memory_pool_; std::vector appenders_; std::unique_ptr array_builder_; }; @@ -83,9 +82,9 @@ template RowToArrowArrayConverter::RowToArrowArrayConverter( int32_t reserve_count, std::vector::AppendValueFunc>&& appenders, std::unique_ptr&& array_builder, - std::unique_ptr&& arrow_pool) + std::shared_ptr&& memory_pool) : reserved_sizes_(reserve_count, -1), - arrow_pool_(std::move(arrow_pool)), + memory_pool_(std::move(memory_pool)), appenders_(std::move(appenders)), array_builder_(std::move(array_builder)) {} diff --git a/src/paimon/core/manifest/index_manifest_entry_serializer.h b/src/paimon/core/manifest/index_manifest_entry_serializer.h index 718cf40e..b486d52d 100644 --- a/src/paimon/core/manifest/index_manifest_entry_serializer.h +++ b/src/paimon/core/manifest/index_manifest_entry_serializer.h @@ -21,7 +21,6 @@ #include #include "arrow/status.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/core/manifest/index_manifest_entry.h" #include "paimon/core/utils/versioned_object_serializer.h" #include "paimon/result.h" @@ -36,7 +35,7 @@ class MemoryPool; class IndexManifestEntrySerializer : public VersionedObjectSerializer { public: explicit IndexManifestEntrySerializer(const std::shared_ptr& pool) - : VersionedObjectSerializer(pool), arrow_pool_(GetArrowPool(pool_)) {} + : VersionedObjectSerializer(pool) {} int32_t GetVersion() const override { return VERSION; @@ -48,6 +47,5 @@ class IndexManifestEntrySerializer : public VersionedObjectSerializer arrow_pool_; }; } // namespace paimon diff --git a/src/paimon/core/manifest/manifest_entry_serializer.h b/src/paimon/core/manifest/manifest_entry_serializer.h index 8160d8c2..e7cc5001 100644 --- a/src/paimon/core/manifest/manifest_entry_serializer.h +++ b/src/paimon/core/manifest/manifest_entry_serializer.h @@ -22,7 +22,6 @@ #include "arrow/api.h" #include "arrow/c/bridge.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/core/io/data_file_meta_serializer.h" #include "paimon/core/manifest/manifest_entry.h" #include "paimon/core/utils/versioned_object_serializer.h" @@ -42,7 +41,7 @@ class ManifestEntrySerializer : public VersionedObjectSerializer public: explicit ManifestEntrySerializer(const std::shared_ptr& pool) : VersionedObjectSerializer(pool), - arrow_pool_(GetArrowPool(pool_)), + memory_pool_(pool_), data_file_meta_serializer_(pool) {} int32_t GetVersion() const override { @@ -56,7 +55,7 @@ class ManifestEntrySerializer : public VersionedObjectSerializer private: static constexpr int32_t VERSION_1 = 1; static constexpr int32_t VERSION_2 = 2; - std::unique_ptr arrow_pool_; + std::shared_ptr memory_pool_; DataFileMetaSerializer data_file_meta_serializer_; }; } // namespace paimon diff --git a/src/paimon/core/manifest/manifest_file_meta_serializer.h b/src/paimon/core/manifest/manifest_file_meta_serializer.h index fbdbfc60..2505d3fd 100644 --- a/src/paimon/core/manifest/manifest_file_meta_serializer.h +++ b/src/paimon/core/manifest/manifest_file_meta_serializer.h @@ -21,7 +21,6 @@ #include #include "arrow/api.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/core/manifest/manifest_entry.h" #include "paimon/core/manifest/manifest_file_meta.h" #include "paimon/core/utils/versioned_object_serializer.h" @@ -41,7 +40,7 @@ class MemoryPool; class ManifestFileMetaSerializer : public VersionedObjectSerializer { public: explicit ManifestFileMetaSerializer(const std::shared_ptr& pool) - : VersionedObjectSerializer(pool), arrow_pool_(GetArrowPool(pool_)) {} + : VersionedObjectSerializer(pool) {} int32_t GetVersion() const override { return VERSION_2; @@ -54,8 +53,6 @@ class ManifestFileMetaSerializer : public VersionedObjectSerializer arrow_pool_; }; } // namespace paimon diff --git a/src/paimon/core/postpone/postpone_bucket_writer.cpp b/src/paimon/core/postpone/postpone_bucket_writer.cpp index 001264b8..e6632ed8 100644 --- a/src/paimon/core/postpone/postpone_bucket_writer.cpp +++ b/src/paimon/core/postpone/postpone_bucket_writer.cpp @@ -34,7 +34,7 @@ #include "paimon/common/table/special_fields.h" #include "paimon/common/types/data_field.h" #include "paimon/common/types/row_kind.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/scope_guard.h" #include "paimon/core/io/compact_increment.h" @@ -59,7 +59,6 @@ PostponeBucketWriter::PostponeBucketWriter(const std::vector& trimm const CoreOptions& options, const std::shared_ptr& pool) : pool_(pool), - arrow_pool_(GetArrowPool(pool)), trimmed_primary_keys_(trimmed_primary_keys), options_(options), path_factory_(path_factory), @@ -148,7 +147,7 @@ Result> PostponeBucketWriter::PrepareSequenceNumbe PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( sequence_number_array_, arrow::MakeArrayFromScalar(*sequence_number_scalar, value_array_length, - arrow_pool_.get())); + AsArrowMemoryPool(*pool_))); return sequence_number_array_; } assert(sequence_number_array_->length() >= value_array_length); @@ -163,7 +162,8 @@ Result> PostponeBucketWriter::PrepareRowKindArray( std::make_shared(RowKind::Insert()->ToByteValue()); PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( std::shared_ptr scalar_array, - arrow::MakeArrayFromScalar(*row_kind_scalar, value_array_length, arrow_pool_.get())); + arrow::MakeArrayFromScalar(*row_kind_scalar, value_array_length, + AsArrowMemoryPool(*pool_))); auto typed_row_kind_array = arrow::internal::checked_pointer_cast>( scalar_array); diff --git a/src/paimon/core/postpone/postpone_bucket_writer.h b/src/paimon/core/postpone/postpone_bucket_writer.h index 036c638c..77b1999f 100644 --- a/src/paimon/core/postpone/postpone_bucket_writer.h +++ b/src/paimon/core/postpone/postpone_bucket_writer.h @@ -102,7 +102,6 @@ class PostponeBucketWriter : public BatchWriter { private: std::shared_ptr pool_; - std::unique_ptr arrow_pool_; std::vector trimmed_primary_keys_; CoreOptions options_; std::shared_ptr path_factory_; diff --git a/src/paimon/core/utils/manifest_meta_reader.cpp b/src/paimon/core/utils/manifest_meta_reader.cpp index b52bae54..faffb548 100644 --- a/src/paimon/core/utils/manifest_meta_reader.cpp +++ b/src/paimon/core/utils/manifest_meta_reader.cpp @@ -31,7 +31,7 @@ #include "arrow/compute/cast.h" #include "arrow/type.h" #include "arrow/util/checked_cast.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/status.h" @@ -41,7 +41,7 @@ class MemoryPool; ManifestMetaReader::ManifestMetaReader(std::unique_ptr&& reader, const std::shared_ptr& target_type, const std::shared_ptr& pool) - : reader_(std::move(reader)), target_type_(target_type), pool_(GetArrowPool(pool)) {} + : reader_(std::move(reader)), target_type_(target_type), pool_(pool) {} Result ManifestMetaReader::NextBatch() { PAIMON_ASSIGN_OR_RAISE(ReadBatch src_result, reader_->NextBatch()); @@ -53,8 +53,9 @@ Result ManifestMetaReader::NextBatch() { PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_array, arrow::ImportArray(c_array.get(), c_schema.get())); - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr target_array, - AlignArrayWithSchema(arrow_array, target_type_, pool_.get())); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr target_array, + AlignArrayWithSchema(arrow_array, target_type_, AsArrowMemoryPool(*pool_))); std::unique_ptr target_c_arrow_array = std::make_unique(); std::unique_ptr target_c_schema = std::make_unique(); diff --git a/src/paimon/core/utils/manifest_meta_reader.h b/src/paimon/core/utils/manifest_meta_reader.h index 087f1b40..205de895 100644 --- a/src/paimon/core/utils/manifest_meta_reader.h +++ b/src/paimon/core/utils/manifest_meta_reader.h @@ -22,7 +22,6 @@ #include "arrow/c/bridge.h" #include "arrow/memory_pool.h" #include "arrow/type.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/memory/memory_pool.h" #include "paimon/reader/batch_reader.h" #include "paimon/result.h" @@ -66,7 +65,7 @@ class PAIMON_EXPORT ManifestMetaReader : public BatchReader { std::unique_ptr reader_; std::shared_ptr target_type_; - std::unique_ptr pool_; + std::shared_ptr pool_; }; } // namespace paimon diff --git a/src/paimon/core/utils/manifest_meta_reader_test.cpp b/src/paimon/core/utils/manifest_meta_reader_test.cpp index c77604a4..e08da8dc 100644 --- a/src/paimon/core/utils/manifest_meta_reader_test.cpp +++ b/src/paimon/core/utils/manifest_meta_reader_test.cpp @@ -23,7 +23,7 @@ #include "arrow/ipc/json_simple.h" #include "arrow/type.h" #include "gtest/gtest.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/memory/memory_pool.h" #include "paimon/status.h" #include "paimon/testing/utils/testharness.h" @@ -49,9 +49,9 @@ TEST(ManifestMetaReaderTest, TestNoNeedCompleteNonExistField) { .ValueOrDie(); arrow::FieldVector target_fields = fields; auto target_arrow_type = arrow::struct_(target_fields); - auto arrow_pool = GetArrowPool(GetDefaultPool()); + auto arrow_pool = AsArrowMemoryPool(*GetDefaultPool()); ASSERT_OK_AND_ASSIGN(auto ret, ManifestMetaReader::AlignArrayWithSchema( - src_array, target_arrow_type, arrow_pool.get())); + src_array, target_arrow_type, arrow_pool)); ASSERT_TRUE(src_array->Equals(ret)) << ret->ToString(); } TEST(ManifestMetaReaderTest, TestCompleteNonExistFieldSimple) { @@ -81,9 +81,9 @@ TEST(ManifestMetaReaderTest, TestCompleteNonExistFieldSimple) { field("sub3", arrow::boolean()), field("sub4", arrow::int8())})), }; auto target_arrow_type = arrow::struct_(target_fields); - auto arrow_pool = GetArrowPool(GetDefaultPool()); + auto arrow_pool = AsArrowMemoryPool(*GetDefaultPool()); ASSERT_OK_AND_ASSIGN(auto ret, ManifestMetaReader::AlignArrayWithSchema( - src_array, target_arrow_type, arrow_pool.get())); + src_array, target_arrow_type, arrow_pool)); auto expected_array = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({target_fields}), R"([ @@ -122,9 +122,9 @@ TEST(ManifestMetaReaderTest, TestCompleteNonExistFieldNested) { field("c", arrow::int64())}), arrow::boolean()))}; auto target_arrow_type = arrow::struct_(target_fields); - auto arrow_pool = GetArrowPool(GetDefaultPool()); + auto arrow_pool = AsArrowMemoryPool(*GetDefaultPool()); ASSERT_OK_AND_ASSIGN(auto ret, ManifestMetaReader::AlignArrayWithSchema( - src_array, target_arrow_type, arrow_pool.get())); + src_array, target_arrow_type, arrow_pool)); auto expected_array = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({target_fields}), R"([ @@ -170,9 +170,9 @@ TEST(ManifestMetaReaderTest, TestCastInt32Type) { auto target_arrow_type = arrow::struct_(target_fields); auto target_array = arrow::ipc::internal::json::ArrayFromJSON(target_arrow_type, array_json).ValueOrDie(); - auto arrow_pool = GetArrowPool(GetDefaultPool()); + auto arrow_pool = AsArrowMemoryPool(*GetDefaultPool()); ASSERT_OK_AND_ASSIGN(auto result_array, ManifestMetaReader::AlignArrayWithSchema( - src_array, target_arrow_type, arrow_pool.get())); + src_array, target_arrow_type, arrow_pool)); ASSERT_TRUE(target_array->Equals(result_array)) << result_array->ToString(); } diff --git a/src/paimon/format/avro/avro_file_batch_reader.cpp b/src/paimon/format/avro/avro_file_batch_reader.cpp index 1cccdf1a..b09cb0b9 100644 --- a/src/paimon/format/avro/avro_file_batch_reader.cpp +++ b/src/paimon/format/avro/avro_file_batch_reader.cpp @@ -22,8 +22,8 @@ #include "arrow/c/bridge.h" #include "fmt/format.h" #include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/arrow_utils.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/scope_guard.h" #include "paimon/format/avro/avro_input_stream_impl.h" @@ -36,11 +36,9 @@ AvroFileBatchReader::AvroFileBatchReader(const std::shared_ptr& inp const std::shared_ptr<::arrow::DataType>& file_data_type, std::unique_ptr<::avro::DataFileReaderBase>&& reader, std::unique_ptr&& array_builder, - std::unique_ptr&& arrow_pool, int32_t batch_size, const std::shared_ptr& pool) : pool_(pool), - arrow_pool_(std::move(arrow_pool)), input_stream_(input_stream), file_data_type_(file_data_type), reader_(std::move(reader)), @@ -71,12 +69,11 @@ Result> AvroFileBatchReader::Create( const auto& avro_file_schema = reader->dataSchema(); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::DataType> file_data_type, AvroSchemaConverter::AvroSchemaToArrowDataType(avro_file_schema)); - auto arrow_pool = GetArrowPool(pool); PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr array_builder, - arrow::MakeBuilder(file_data_type, arrow_pool.get())); + arrow::MakeBuilder(file_data_type, AsArrowMemoryPool(*pool))); return std::unique_ptr( new AvroFileBatchReader(input_stream, file_data_type, std::move(reader), - std::move(array_builder), std::move(arrow_pool), batch_size, pool)); + std::move(array_builder), batch_size, pool)); } Result> AvroFileBatchReader::CreateDataFileReader( @@ -151,8 +148,8 @@ Status AvroFileBatchReader::SetReadSchema(::ArrowSchema* read_schema, CalculateReadFieldsProjection(file_schema, arrow_read_schema->fields())); array_builder_->Reset(); std::shared_ptr<::arrow::DataType> read_data_type = arrow::struct_(arrow_read_schema->fields()); - PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(array_builder_, - arrow::MakeBuilder(read_data_type, arrow_pool_.get())); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + array_builder_, arrow::MakeBuilder(read_data_type, AsArrowMemoryPool(*pool_))); return Status::OK(); } diff --git a/src/paimon/format/avro/avro_file_batch_reader.h b/src/paimon/format/avro/avro_file_batch_reader.h index 38aa054c..cc9367a7 100644 --- a/src/paimon/format/avro/avro_file_batch_reader.h +++ b/src/paimon/format/avro/avro_file_batch_reader.h @@ -75,14 +75,12 @@ class AvroFileBatchReader : public FileBatchReader { AvroFileBatchReader(const std::shared_ptr& input_stream, const std::shared_ptr<::arrow::DataType>& file_data_type, std::unique_ptr<::avro::DataFileReaderBase>&& reader, - std::unique_ptr&& array_builder, - std::unique_ptr&& arrow_pool, int32_t batch_size, + std::unique_ptr&& array_builder, int32_t batch_size, const std::shared_ptr& pool); static constexpr size_t BUFFER_SIZE = 1024 * 1024; // 1M std::shared_ptr pool_; - std::unique_ptr arrow_pool_; std::shared_ptr input_stream_; std::shared_ptr<::arrow::DataType> file_data_type_; std::unique_ptr<::avro::DataFileReaderBase> reader_; diff --git a/src/paimon/format/avro/avro_format_writer_test.cpp b/src/paimon/format/avro/avro_format_writer_test.cpp index ec0b5651..3b010484 100644 --- a/src/paimon/format/avro/avro_format_writer_test.cpp +++ b/src/paimon/format/avro/avro_format_writer_test.cpp @@ -27,7 +27,6 @@ #include "arrow/io/file.h" #include "arrow/memory_pool.h" #include "gtest/gtest.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/common/utils/path_util.h" #include "paimon/format/avro/avro_file_batch_reader.h" #include "paimon/format/file_format.h" @@ -49,7 +48,6 @@ class AvroFormatWriterTest : public ::testing::Test { ASSERT_TRUE(dir_); fs_ = std::make_shared(); pool_ = GetDefaultPool(); - arrow_pool_ = GetArrowPool(pool_); } void TearDown() override {} @@ -157,7 +155,6 @@ class AvroFormatWriterTest : public ::testing::Test { std::unique_ptr dir_; std::shared_ptr fs_; std::shared_ptr pool_; - std::shared_ptr arrow_pool_; }; TEST_F(AvroFormatWriterTest, TestWriteWithVariousBatchSize) { diff --git a/src/paimon/format/blob/blob_file_batch_reader.cpp b/src/paimon/format/blob/blob_file_batch_reader.cpp index d47e4173..69514636 100644 --- a/src/paimon/format/blob/blob_file_batch_reader.cpp +++ b/src/paimon/format/blob/blob_file_batch_reader.cpp @@ -29,7 +29,7 @@ #include "paimon/common/executor/future.h" #include "paimon/common/io/offset_input_stream.h" #include "paimon/common/metrics/metrics_impl.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/delta_varint_compressor.h" #include "paimon/common/utils/stream_utils.h" @@ -105,7 +105,6 @@ BlobFileBatchReader::BlobFileBatchReader(const std::shared_ptr& inp batch_size_(batch_size), blob_as_descriptor_(blob_as_descriptor), pool_(pool), - arrow_pool_(GetArrowPool(pool_)), metrics_(std::make_shared()) { target_blob_row_indexes_.resize(target_blob_lengths_.size()); std::iota(target_blob_row_indexes_.begin(), target_blob_row_indexes_.end(), 0); @@ -159,7 +158,7 @@ Status BlobFileBatchReader::SetReadSchema(::ArrowSchema* read_schema, Result> BlobFileBatchReader::NextBlobOffsets( int32_t rows_to_read) const { - arrow::TypedBufferBuilder buffer_builder(arrow_pool_.get()); + arrow::TypedBufferBuilder buffer_builder(AsArrowMemoryPool(*pool_)); PAIMON_RETURN_NOT_OK_FROM_ARROW(buffer_builder.Reserve(rows_to_read + 1)); PAIMON_RETURN_NOT_OK_FROM_ARROW(buffer_builder.Append(0)); int64_t data_length = 0; @@ -180,8 +179,9 @@ Result> BlobFileBatchReader::NextBlobContents( const size_t i = current_pos_ + k; total_length += GetTargetContentLength(i); } - PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr data_buffer, - arrow::AllocateBuffer(total_length, arrow_pool_.get())); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr data_buffer, + arrow::AllocateBuffer(total_length, AsArrowMemoryPool(*pool_))); uint8_t* buffer = data_buffer->mutable_data(); for (int32_t k = 0; k < rows_to_read; ++k) { const size_t i = current_pos_ + k; @@ -260,7 +260,7 @@ Result> BlobFileBatchReader::ToArrowArray( return Status::Invalid("target type is nullptr, call SetReadSchema first"); } PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr array_builder, - arrow::MakeBuilder(target_type_, arrow_pool_.get())); + arrow::MakeBuilder(target_type_, AsArrowMemoryPool(*pool_))); auto builder = dynamic_cast(array_builder.get()); if (builder == nullptr) { return Status::Invalid("cast to struct builder failed"); diff --git a/src/paimon/format/blob/blob_file_batch_reader.h b/src/paimon/format/blob/blob_file_batch_reader.h index d2a76673..90204fa8 100644 --- a/src/paimon/format/blob/blob_file_batch_reader.h +++ b/src/paimon/format/blob/blob_file_batch_reader.h @@ -160,7 +160,6 @@ class BlobFileBatchReader : public FileBatchReader { const int32_t batch_size_; const bool blob_as_descriptor_; std::shared_ptr pool_; - std::shared_ptr arrow_pool_; std::shared_ptr target_type_; std::shared_ptr metrics_; diff --git a/src/paimon/format/orc/orc_adapter_test.cpp b/src/paimon/format/orc/orc_adapter_test.cpp index bf310de3..fbf70fdb 100644 --- a/src/paimon/format/orc/orc_adapter_test.cpp +++ b/src/paimon/format/orc/orc_adapter_test.cpp @@ -41,7 +41,7 @@ #include "paimon/common/utils/date_time_utils.h" #include "paimon/format/orc/orc_format_defs.h" #include "paimon/format/orc/orc_input_stream_impl.h" -#include "paimon/format/orc/orc_memory_pool.h" +#include "paimon/format/orc/orc_memory_pool_adaptor.h" #include "paimon/format/orc/orc_output_stream_impl.h" #include "paimon/fs/file_system.h" #include "paimon/fs/local/local_file_system.h" @@ -630,8 +630,8 @@ TEST_F(OrcAdapterTest, TestBridgeForMapType) { TEST_F(OrcAdapterTest, TestDataBufferSetData) { { // buffer1 !ownBuf - OrcMemoryPool orc_pool(GetDefaultPool()); - ::orc::DataBuffer buffer1(orc_pool, /*size=*/0, /*ownBuf=*/false); + ::orc::MemoryPool* orc_pool = AsOrcMemoryPool(*GetDefaultPool()); + ::orc::DataBuffer buffer1(*orc_pool, /*size=*/0, /*ownBuf=*/false); std::vector data = {1, 2, 3, 1}; buffer1.setData(data.data(), /*bufSize=*/4 * 4); @@ -646,8 +646,8 @@ TEST_F(OrcAdapterTest, TestDataBufferSetData) { } { // buffer1 ownBuf - OrcMemoryPool orc_pool(GetDefaultPool()); - ::orc::DataBuffer buffer1(orc_pool, /*size=*/4, /*ownBuf=*/true); + ::orc::MemoryPool* orc_pool = AsOrcMemoryPool(*GetDefaultPool()); + ::orc::DataBuffer buffer1(*orc_pool, /*size=*/4, /*ownBuf=*/true); buffer1[0] = 1; buffer1[1] = 2; buffer1[2] = 3; diff --git a/src/paimon/format/orc/orc_file_batch_reader.cpp b/src/paimon/format/orc/orc_file_batch_reader.cpp index 26f8862b..35032bf8 100644 --- a/src/paimon/format/orc/orc_file_batch_reader.cpp +++ b/src/paimon/format/orc/orc_file_batch_reader.cpp @@ -27,7 +27,6 @@ #include "fmt/format.h" #include "orc/OrcFile.hh" #include "paimon/common/metrics/metrics_impl.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/options_utils.h" #include "paimon/common/utils/scope_guard.h" @@ -35,7 +34,7 @@ #include "paimon/format/orc/orc_adapter.h" #include "paimon/format/orc/orc_format_defs.h" #include "paimon/format/orc/orc_input_stream_impl.h" -#include "paimon/format/orc/orc_memory_pool.h" +#include "paimon/format/orc/orc_memory_pool_adaptor.h" #include "paimon/format/orc/orc_metrics.h" #include "paimon/format/orc/predicate_converter.h" @@ -44,11 +43,9 @@ namespace paimon::orc { OrcFileBatchReader::OrcFileBatchReader(std::unique_ptr<::orc::ReaderMetrics>&& reader_metrics, std::unique_ptr&& reader, const std::map& options, - const std::shared_ptr& arrow_pool, - const std::shared_ptr<::orc::MemoryPool>& orc_pool) + const std::shared_ptr& memory_pool) : options_(options), - arrow_pool_(arrow_pool), - orc_pool_(orc_pool), + memory_pool_(memory_pool), reader_metrics_(std::move(reader_metrics)), reader_(std::move(reader)), metrics_(std::make_shared()) {} @@ -64,9 +61,7 @@ Result> OrcFileBatchReader::Create( return Status::Invalid("memory pool is nullptr"); } uint64_t natural_read_size = input_stream->getNaturalReadSize(); - auto orc_pool = std::make_shared(pool); - std::shared_ptr arrow_pool = GetArrowPool(pool); - reader_options.setMemoryPool(*orc_pool); + reader_options.setMemoryPool(*AsOrcMemoryPool(*pool)); std::unique_ptr<::orc::ReaderMetrics> reader_metrics; PAIMON_ASSIGN_OR_RAISE( @@ -83,12 +78,11 @@ Result> OrcFileBatchReader::Create( std::unique_ptr<::orc::Reader> reader = ::orc::createReader(std::move(input_stream), reader_options); - PAIMON_ASSIGN_OR_RAISE( - std::unique_ptr reader_wrapper, - OrcReaderWrapper::Create(std::move(reader), file_name, batch_size, natural_read_size, - options, arrow_pool, orc_pool)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr reader_wrapper, + OrcReaderWrapper::Create(std::move(reader), file_name, batch_size, + natural_read_size, options, pool)); auto orc_file_batch_reader = std::unique_ptr(new OrcFileBatchReader( - std::move(reader_metrics), std::move(reader_wrapper), options, arrow_pool, orc_pool)); + std::move(reader_metrics), std::move(reader_wrapper), options, pool)); PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::ArrowSchema> file_schema, orc_file_batch_reader->GetFileSchema()); PAIMON_RETURN_NOT_OK(orc_file_batch_reader->SetReadSchema( diff --git a/src/paimon/format/orc/orc_file_batch_reader.h b/src/paimon/format/orc/orc_file_batch_reader.h index ca4043b3..f5b38c11 100644 --- a/src/paimon/format/orc/orc_file_batch_reader.h +++ b/src/paimon/format/orc/orc_file_batch_reader.h @@ -58,8 +58,6 @@ class OrcFileBatchReader : public PrefetchFileBatchReader { return reader_->SetReadRanges(read_ranges); } - // Important: output ArrowArray is allocated on arrow_pool_ whose lifecycle holds in - // OrcFileBatchReader. Therefore, we need to hold BatchReader when using output ArrowArray. Result NextBatch() override; uint64_t GetPreviousBatchFirstRowNumber() const override { @@ -98,8 +96,7 @@ class OrcFileBatchReader : public PrefetchFileBatchReader { OrcFileBatchReader(std::unique_ptr<::orc::ReaderMetrics>&& reader_metrics, std::unique_ptr&& reader, const std::map& options, - const std::shared_ptr& arrow_pool, - const std::shared_ptr<::orc::MemoryPool>& orc_pool); + const std::shared_ptr& memory_pool); static void GetSubColumnIds(const ::orc::Type* type, std::vector* col_ids); @@ -115,8 +112,7 @@ class OrcFileBatchReader : public PrefetchFileBatchReader { std::map options_; - std::shared_ptr arrow_pool_; - std::shared_ptr<::orc::MemoryPool> orc_pool_; + std::shared_ptr memory_pool_; std::unique_ptr<::orc::ReaderMetrics> reader_metrics_; std::unique_ptr reader_; diff --git a/src/paimon/format/orc/orc_file_batch_reader_test.cpp b/src/paimon/format/orc/orc_file_batch_reader_test.cpp index ea6b1c07..ae6beea9 100644 --- a/src/paimon/format/orc/orc_file_batch_reader_test.cpp +++ b/src/paimon/format/orc/orc_file_batch_reader_test.cpp @@ -32,7 +32,6 @@ #include "paimon/format/orc/orc_format_defs.h" #include "paimon/format/orc/orc_format_writer.h" #include "paimon/format/orc/orc_input_stream_impl.h" -#include "paimon/format/orc/orc_memory_pool.h" #include "paimon/format/orc/orc_metrics.h" #include "paimon/format/orc/orc_output_stream_impl.h" #include "paimon/fs/local/local_file_system.h" @@ -249,7 +248,6 @@ TEST_F(OrcFileBatchReaderTest, TestSetReadSchema) { } TEST_F(OrcFileBatchReaderTest, TestCreateRowReaderOptions) { - auto orc_pool = std::make_shared(pool_); std::vector target_column_ids; { // read all fields && default options diff --git a/src/paimon/format/orc/orc_format_writer.cpp b/src/paimon/format/orc/orc_format_writer.cpp index 0d7c99bc..5856924a 100644 --- a/src/paimon/format/orc/orc_format_writer.cpp +++ b/src/paimon/format/orc/orc_format_writer.cpp @@ -39,9 +39,10 @@ #include "paimon/core/schema/arrow_schema_validator.h" #include "paimon/format/orc/orc_adapter.h" #include "paimon/format/orc/orc_format_defs.h" -#include "paimon/format/orc/orc_memory_pool.h" +#include "paimon/format/orc/orc_memory_pool_adaptor.h" #include "paimon/format/orc/orc_metrics.h" #include "paimon/macros.h" +#include "paimon/memory/memory_pool.h" namespace paimon { class MemoryPool; @@ -50,7 +51,7 @@ struct ArrowArray; namespace paimon::orc { -OrcFormatWriter::OrcFormatWriter(const std::shared_ptr& orc_memory_pool, +OrcFormatWriter::OrcFormatWriter(const std::shared_ptr& memory_pool, std::unique_ptr<::orc::OutputStream>&& output_stream, std::unique_ptr<::orc::WriterMetrics>&& writer_metrics, std::unique_ptr<::orc::Writer>&& writer, @@ -58,7 +59,7 @@ OrcFormatWriter::OrcFormatWriter(const std::shared_ptr& orc_memor std::unique_ptr<::orc::Type>&& orc_type, const ::orc::WriterOptions& writer_options, const std::shared_ptr& data_type) - : orc_memory_pool_(orc_memory_pool), + : memory_pool_(memory_pool), output_stream_(std::move(output_stream)), writer_metrics_(std::move(writer_metrics)), writer_(std::move(writer)), @@ -78,10 +79,8 @@ Result> OrcFormatWriter::Create( try { PAIMON_ASSIGN_OR_RAISE(::orc::WriterOptions writer_options, PrepareWriterOptions(options, compression, data_type)); - std::shared_ptr orc_memory_pool; if (pool) { - orc_memory_pool = std::make_shared(pool); - writer_options.setMemoryPool(orc_memory_pool.get()); + writer_options.setMemoryPool(AsOrcMemoryPool(*pool)); } std::unique_ptr<::orc::WriterMetrics> writer_metrics; @@ -97,7 +96,7 @@ Result> OrcFormatWriter::Create( assert(writer); std::unique_ptr<::orc::ColumnVectorBatch> orc_batch = writer->createRowBatch(batch_size); return std::unique_ptr(new OrcFormatWriter( - orc_memory_pool, std::move(output_stream), std::move(writer_metrics), std::move(writer), + pool, std::move(output_stream), std::move(writer_metrics), std::move(writer), std::move(orc_batch), std::move(orc_type), writer_options, data_type)); } catch (const std::exception& e) { return Status::Invalid( diff --git a/src/paimon/format/orc/orc_format_writer.h b/src/paimon/format/orc/orc_format_writer.h index fd5a1fa9..320b262c 100644 --- a/src/paimon/format/orc/orc_format_writer.h +++ b/src/paimon/format/orc/orc_format_writer.h @@ -28,7 +28,6 @@ #include "orc/Vector.hh" #include "orc/Writer.hh" #include "paimon/format/format_writer.h" -#include "paimon/format/orc/orc_memory_pool.h" #include "paimon/metrics.h" #include "paimon/result.h" #include "paimon/status.h" @@ -39,9 +38,6 @@ class Schema; } // namespace arrow namespace paimon { class MemoryPool; -namespace orc { -class OrcMemoryPool; -} // namespace orc } // namespace paimon struct ArrowArray; @@ -65,7 +61,7 @@ class OrcFormatWriter : public FormatWriter { std::shared_ptr GetWriterMetrics() const override; private: - OrcFormatWriter(const std::shared_ptr& orc_memory_pool, + OrcFormatWriter(const std::shared_ptr& memory_pool, std::unique_ptr<::orc::OutputStream>&& output_stream, std::unique_ptr<::orc::WriterMetrics>&& writer_metrics, std::unique_ptr<::orc::Writer>&& writer, @@ -83,7 +79,7 @@ class OrcFormatWriter : public FormatWriter { static Result<::orc::CompressionKind> ToOrcCompressionKind(const std::string& file_compression); private: - std::shared_ptr orc_memory_pool_; + std::shared_ptr memory_pool_; std::unique_ptr<::orc::OutputStream> output_stream_; std::unique_ptr<::orc::WriterMetrics> writer_metrics_; std::unique_ptr<::orc::Writer> writer_; diff --git a/src/paimon/format/orc/orc_memory_pool.h b/src/paimon/format/orc/orc_memory_pool_adaptor.h similarity index 72% rename from src/paimon/format/orc/orc_memory_pool.h rename to src/paimon/format/orc/orc_memory_pool_adaptor.h index 0da96756..8d51d54f 100644 --- a/src/paimon/format/orc/orc_memory_pool.h +++ b/src/paimon/format/orc/orc_memory_pool_adaptor.h @@ -19,15 +19,20 @@ #include #include "orc/MemoryPool.hh" -#include "paimon/common/utils/concurrent_hash_map.h" #include "paimon/memory/memory_pool.h" +#include "paimon/memory/memory_pool_adaptor.h" namespace paimon::orc { -class OrcMemoryPool : public ::orc::MemoryPool { +class OrcMemoryPoolAdaptor : public ::orc::MemoryPool, + public MemoryPoolAdaptor { public: + static std::string Identifier() { + return "OrcMemoryPoolAdaptor"; + } + using SizeType = uint64_t; - explicit OrcMemoryPool(const std::shared_ptr& pool) : pool_(pool) {} + explicit OrcMemoryPoolAdaptor(paimon::MemoryPool& pool) : pool_(pool) {} char* malloc(SizeType size) override { if (size == 0) { return ZERO_SIZE_AREA; @@ -35,7 +40,7 @@ class OrcMemoryPool : public ::orc::MemoryPool { if (size > std::numeric_limits::max() - HEADER_SIZE) { return nullptr; } - if (void* ret = pool_->Malloc(size + HEADER_SIZE)) { + if (void* ret = pool_.Malloc(size + HEADER_SIZE)) { *reinterpret_cast(ret) = size; return reinterpret_cast(ret) + HEADER_SIZE; } @@ -47,7 +52,7 @@ class OrcMemoryPool : public ::orc::MemoryPool { } char* raw = p - HEADER_SIZE; SizeType size = *reinterpret_cast(raw); - pool_->Free(raw, size + HEADER_SIZE); + pool_.Free(raw, size + HEADER_SIZE); } private: @@ -55,7 +60,11 @@ class OrcMemoryPool : public ::orc::MemoryPool { static constexpr size_t HEADER_SIZE = (sizeof(SizeType) + ALIGNMENT - 1) & ~(ALIGNMENT - 1); alignas(ALIGNMENT) inline static char ZERO_SIZE_AREA[1]; - std::shared_ptr pool_; + paimon::MemoryPool& pool_; }; +inline ::orc::MemoryPool* AsOrcMemoryPool(paimon::MemoryPool& pool) { + return pool.AsSpecifiedMemoryPool(); +} + } // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_reader_wrapper.cpp b/src/paimon/format/orc/orc_reader_wrapper.cpp index 77253168..5e50b3ba 100644 --- a/src/paimon/format/orc/orc_reader_wrapper.cpp +++ b/src/paimon/format/orc/orc_reader_wrapper.cpp @@ -23,6 +23,7 @@ #include "fmt/format.h" #include "orc/OrcFile.hh" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/scope_guard.h" @@ -76,9 +77,9 @@ Result OrcReaderWrapper::Next() { } ScopeGuard guard([this]() { has_error_ = true; }); assert(orc_batch->numElements > 0); - PAIMON_ASSIGN_OR_RAISE( - std::shared_ptr array, - OrcAdapter::AppendBatch(target_type_, orc_batch.get(), arrow_pool_.get())); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr array, + OrcAdapter::AppendBatch(target_type_, orc_batch.get(), + AsArrowMemoryPool(*memory_pool_))); PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), c_schema.get())); next_row_ = GetRowNumber() + orc_batch->numElements; guard.Release(); diff --git a/src/paimon/format/orc/orc_reader_wrapper.h b/src/paimon/format/orc/orc_reader_wrapper.h index 01c50c27..1f1a3eae 100644 --- a/src/paimon/format/orc/orc_reader_wrapper.h +++ b/src/paimon/format/orc/orc_reader_wrapper.h @@ -49,14 +49,12 @@ class OrcReaderWrapper { static Result> Create( std::unique_ptr<::orc::Reader> reader, const std::string& file_name, int32_t batch_size, uint64_t natural_read_size, const std::map& options, - const std::shared_ptr& arrow_pool, - const std::shared_ptr<::orc::MemoryPool>& orc_pool) { + const std::shared_ptr& memory_pool) { PAIMON_ASSIGN_OR_RAISE( std::unique_ptr range_generator, ReadRangeGenerator::Create(reader.get(), natural_read_size, options)); - return std::unique_ptr( - new OrcReaderWrapper(std::move(reader), std::move(range_generator), file_name, - batch_size, arrow_pool, orc_pool)); + return std::unique_ptr(new OrcReaderWrapper( + std::move(reader), std::move(range_generator), file_name, batch_size, memory_pool)); } Status SeekToRow(uint64_t row_number); @@ -115,14 +113,12 @@ class OrcReaderWrapper { OrcReaderWrapper(std::unique_ptr<::orc::Reader> reader, std::unique_ptr range_generator, const std::string& file_name, int32_t batch_size, - const std::shared_ptr& arrow_pool, - const std::shared_ptr<::orc::MemoryPool>& orc_pool) + const std::shared_ptr& memory_pool) : reader_(std::move(reader)), range_generator_(std::move(range_generator)), file_name_(file_name), batch_size_(batch_size), - arrow_pool_(arrow_pool), - orc_pool_(orc_pool) {} + memory_pool_(memory_pool) {} std::unique_ptr<::orc::Reader> reader_; std::unique_ptr<::orc::RowReader> row_reader_; @@ -132,8 +128,7 @@ class OrcReaderWrapper { const std::string file_name_; const int32_t batch_size_; - std::shared_ptr arrow_pool_; - std::shared_ptr<::orc::MemoryPool> orc_pool_; + std::shared_ptr memory_pool_; std::shared_ptr target_type_; diff --git a/src/paimon/format/orc/orc_reader_wrapper_test.cpp b/src/paimon/format/orc/orc_reader_wrapper_test.cpp index 9c50c8eb..3d6ca3fa 100644 --- a/src/paimon/format/orc/orc_reader_wrapper_test.cpp +++ b/src/paimon/format/orc/orc_reader_wrapper_test.cpp @@ -21,7 +21,6 @@ #include "gtest/gtest.h" #include "orc/OrcFile.hh" #include "paimon/common/reader/reader_utils.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/testing/utils/testharness.h" namespace paimon::orc::test { @@ -72,8 +71,7 @@ TEST_F(OrcReaderWrapperTest, NextRowToRead) { /*batch_size=*/2, /*natural_read_size=*/0, /*options=*/options, - /*arrow_pool=*/GetArrowPool(GetDefaultPool()), - /*orc_pool=*/nullptr)); + /*memory_pool=*/GetDefaultPool())); auto data_types = arrow::struct_({arrow::field("col1", arrow::int64()), arrow::field("col2", arrow::utf8())}); ::orc::RowReaderOptions row_opts; diff --git a/src/paimon/format/orc/orc_stats_extractor.cpp b/src/paimon/format/orc/orc_stats_extractor.cpp index 5d043278..7f04d12e 100644 --- a/src/paimon/format/orc/orc_stats_extractor.cpp +++ b/src/paimon/format/orc/orc_stats_extractor.cpp @@ -35,7 +35,7 @@ #include "paimon/format/column_stats.h" #include "paimon/format/orc/orc_format_defs.h" #include "paimon/format/orc/orc_input_stream_impl.h" -#include "paimon/format/orc/orc_memory_pool.h" +#include "paimon/format/orc/orc_memory_pool_adaptor.h" #include "paimon/fs/file_system.h" #include "paimon/predicate/literal.h" #include "paimon/status.h" @@ -58,8 +58,7 @@ OrcStatsExtractor::ExtractWithFileInfo(const std::shared_ptr& file_s OrcInputStreamImpl::Create(input_stream, DEFAULT_NATURAL_READ_SIZE)); try { ::orc::ReaderOptions reader_options; - auto orc_pool = std::make_shared(pool); - reader_options.setMemoryPool(*orc_pool); + reader_options.setMemoryPool(*AsOrcMemoryPool(*pool)); std::unique_ptr<::orc::Reader> reader = ::orc::createReader(std::move(orc_input_stream), reader_options); assert(reader); diff --git a/src/paimon/format/parquet/file_reader_wrapper_test.cpp b/src/paimon/format/parquet/file_reader_wrapper_test.cpp index 6a617719..a513a4bf 100644 --- a/src/paimon/format/parquet/file_reader_wrapper_test.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper_test.cpp @@ -28,7 +28,7 @@ #include "arrow/io/caching.h" #include "arrow/memory_pool.h" #include "gtest/gtest.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/path_util.h" #include "paimon/format/parquet/parquet_field_id_converter.h" @@ -55,7 +55,6 @@ class FileReaderWrapperTest : public ::testing::Test { ASSERT_TRUE(dir_); fs_ = std::make_shared(); pool_ = GetDefaultPool(); - arrow_pool_ = GetArrowPool(pool_); batch_size_ = 512; } void TearDown() override {} @@ -117,7 +116,7 @@ class FileReaderWrapperTest : public ::testing::Test { Result> PrepareReaderWrapper(const std::string& file_path) { PAIMON_ASSIGN_OR_RAISE(std::shared_ptr in, fs_->Open(file_path)); PAIMON_ASSIGN_OR_RAISE(uint64_t file_length, in->Length()); - auto input_stream = std::make_unique(in, arrow_pool_, file_length); + auto input_stream = std::make_unique(in, pool_, file_length); ::parquet::arrow::FileReaderBuilder file_reader_builder; ::parquet::ReaderProperties reader_properties; reader_properties.enable_buffered_stream(); @@ -130,7 +129,7 @@ class FileReaderWrapperTest : public ::testing::Test { arrow_reader_props.set_use_threads(true); arrow_reader_props.set_cache_options(arrow::io::CacheOptions::Defaults()); std::unique_ptr<::parquet::arrow::FileReader> file_reader; - PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.memory_pool(arrow_pool_.get()) + PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.memory_pool(AsArrowMemoryPool(*pool_)) ->properties(arrow_reader_props) ->Build(&file_reader)); return FileReaderWrapper::Create(std::move(file_reader)); @@ -150,7 +149,7 @@ class FileReaderWrapperTest : public ::testing::Test { auto writer_properties = builder.build(); ASSERT_OK_AND_ASSIGN( std::shared_ptr format_writer, - ParquetFormatWriter::Create(out, arrow_schema, writer_properties, arrow_pool_)); + ParquetFormatWriter::Create(out, arrow_schema, writer_properties, pool_)); AddRecordBatchOnce(format_writer, struct_type, /*record_batch_size=*/row_count, /*offset=*/0); @@ -164,7 +163,6 @@ class FileReaderWrapperTest : public ::testing::Test { std::unique_ptr dir_; std::shared_ptr fs_; std::shared_ptr pool_; - std::shared_ptr arrow_pool_; int32_t batch_size_; }; diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index cfb09102..7ce50483 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -35,6 +35,7 @@ #include "arrow/util/range.h" #include "fmt/format.h" #include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/options_utils.h" #include "paimon/format/parquet/parquet_field_id_converter.h" @@ -58,9 +59,9 @@ namespace paimon::parquet { ParquetFileBatchReader::ParquetFileBatchReader( std::shared_ptr&& input_stream, std::unique_ptr&& reader, const std::map& options, - const std::shared_ptr& arrow_pool) + const std::shared_ptr& memory_pool) : options_(options), - arrow_pool_(arrow_pool), + memory_pool_(memory_pool), input_stream_(std::move(input_stream)), reader_(std::move(reader)), read_ranges_(reader_->GetAllRowGroupRanges()), @@ -68,8 +69,8 @@ ParquetFileBatchReader::ParquetFileBatchReader( Result> ParquetFileBatchReader::Create( std::shared_ptr&& input_stream, - const std::shared_ptr& pool, - const std::map& options, int32_t batch_size) { + const std::shared_ptr& pool, const std::map& options, + int32_t batch_size) { assert(input_stream); PAIMON_ASSIGN_OR_RAISE(::parquet::ReaderProperties reader_properties, CreateReaderProperties(pool, options)); @@ -80,7 +81,7 @@ Result> ParquetFileBatchReader::Create( PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.Open(input_stream, reader_properties)); std::unique_ptr<::parquet::arrow::FileReader> file_reader; - PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.memory_pool(pool.get()) + PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.memory_pool(AsArrowMemoryPool(*pool)) ->properties(arrow_reader_properties) ->Build(&file_reader)); @@ -230,7 +231,7 @@ Result ParquetFileBatchReader::NextBatch() { array->type(), read_data_type_)); if (need_cast) { PAIMON_ASSIGN_OR_RAISE(array, ParquetTimestampConverter::CastArrayForTimestamp( - array, read_data_type_, arrow_pool_)); + array, read_data_type_, memory_pool_)); } PAIMON_ASSIGN_OR_RAISE(need_cast, ParquetTimestampConverter::NeedCastArrayForTimestamp( array->type(), read_data_type_)); @@ -253,8 +254,7 @@ Result>> ParquetFileBatchReader::GenRe } Result<::parquet::ReaderProperties> ParquetFileBatchReader::CreateReaderProperties( - const std::shared_ptr& pool, - const std::map& options) { + const std::shared_ptr& pool, const std::map& options) { ::parquet::ReaderProperties reader_properties; // TODO(jinli.zjw): set more ReaderProperties (compare with java) reader_properties.enable_buffered_stream(); @@ -262,8 +262,8 @@ Result<::parquet::ReaderProperties> ParquetFileBatchReader::CreateReaderProperti } Result<::parquet::ArrowReaderProperties> ParquetFileBatchReader::CreateArrowReaderProperties( - const std::shared_ptr& pool, - const std::map& options, int32_t batch_size) { + const std::shared_ptr& pool, const std::map& options, + int32_t batch_size) { PAIMON_ASSIGN_OR_RAISE(bool use_threads, OptionsUtils::GetValueFromMap(options, PARQUET_READ_USE_THREADS, DEFAULT_PARQUET_READ_USE_THREADS)); diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h b/src/paimon/format/parquet/parquet_file_batch_reader.h index 6294eecd..b660d309 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.h +++ b/src/paimon/format/parquet/parquet_file_batch_reader.h @@ -61,8 +61,8 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { public: static Result> Create( std::shared_ptr&& input_stream, - const std::shared_ptr& pool, - const std::map& options, int32_t batch_size); + const std::shared_ptr& pool, const std::map& options, + int32_t batch_size); // For timestamp type, we return the schema stored in file, e.g., second in parquet file will // store as milli. @@ -76,9 +76,6 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { return reader_->SeekToRow(row_number); } - // Important: output ArrowArray is allocated on arrow_pool_ whose lifecycle holds in - // ParquetFileBatchReader. Therefore, we need to hold BatchReader when using output - // ArrowArray. Result NextBatch() override; Result>> GenReadRanges( @@ -128,15 +125,14 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { ParquetFileBatchReader(std::shared_ptr&& input_stream, std::unique_ptr&& reader, const std::map& options, - const std::shared_ptr& arrow_pool); + const std::shared_ptr& memory_pool); static Result<::parquet::ReaderProperties> CreateReaderProperties( - const std::shared_ptr& pool, - const std::map& options); + const std::shared_ptr& pool, const std::map& options); static Result<::parquet::ArrowReaderProperties> CreateArrowReaderProperties( - const std::shared_ptr& pool, - const std::map& options, int32_t batch_size); + const std::shared_ptr& pool, const std::map& options, + int32_t batch_size); static void FlattenSchema(const std::shared_ptr& type, int32_t* index, std::vector* index_vector) { @@ -163,8 +159,7 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { private: std::map options_; - // hold the lifecycle of arrow memory pool. - std::shared_ptr arrow_pool_; + std::shared_ptr memory_pool_; std::shared_ptr input_stream_; std::unique_ptr reader_; diff --git a/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp index 1c4863e5..e719448e 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp @@ -32,7 +32,6 @@ #include "arrow/util/thread_pool.h" #include "gtest/gtest.h" #include "paimon/common/types/data_field.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/common/utils/path_util.h" #include "paimon/defs.h" @@ -65,7 +64,7 @@ class ParquetFileBatchReaderTest : public ::testing::Test, dir_ = paimon::test::UniqueTestDirectory::Create(); ASSERT_TRUE(dir_); fs_ = std::make_shared(); - pool_ = GetArrowPool(GetDefaultPool()); + pool_ = GetDefaultPool(); batch_size_ = 10; file_path_ = PathUtil::JoinPath(dir_->Str(), "test.parquet"); @@ -156,7 +155,7 @@ class ParquetFileBatchReaderTest : public ::testing::Test, std::string file_path_; std::unique_ptr dir_; std::shared_ptr fs_; - std::shared_ptr pool_; + std::shared_ptr pool_; int32_t batch_size_; std::shared_ptr schema_; std::shared_ptr struct_array_; diff --git a/src/paimon/format/parquet/parquet_format_writer.cpp b/src/paimon/format/parquet/parquet_format_writer.cpp index 338cd5ca..1e0c1a08 100644 --- a/src/paimon/format/parquet/parquet_format_writer.cpp +++ b/src/paimon/format/parquet/parquet_format_writer.cpp @@ -22,9 +22,11 @@ #include "arrow/c/bridge.h" #include "arrow/record_batch.h" #include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/format/parquet/parquet_format_defs.h" #include "paimon/format/parquet/parquet_output_stream_impl.h" +#include "paimon/memory/memory_pool.h" #include "paimon/metrics.h" #include "parquet/arrow/writer.h" #include "parquet/properties.h" @@ -44,15 +46,15 @@ Result> ParquetFormatWriter::Create( const std::shared_ptr& output_stream, const std::shared_ptr& schema, const std::shared_ptr<::parquet::WriterProperties>& writer_properties, - const std::shared_ptr& pool) { + const std::shared_ptr& pool) { auto out = std::make_shared(output_stream); ::parquet::ArrowWriterProperties::Builder arrow_properties_builder; auto arrow_writer_properties = arrow_properties_builder.enable_deprecated_int96_timestamps()->build(); PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( std::unique_ptr<::parquet::arrow::FileWriter> file_writer, - ::parquet::arrow::FileWriter::Open(*schema, pool.get(), out, writer_properties, - arrow_writer_properties)); + ::parquet::arrow::FileWriter::Open(*schema, AsArrowMemoryPool(*pool), out, + writer_properties, arrow_writer_properties)); return std::unique_ptr( new ParquetFormatWriter(std::move(file_writer), out, schema, pool)); } @@ -92,7 +94,7 @@ Result ParquetFormatWriter::GetEstimateLength() const { ParquetFormatWriter::ParquetFormatWriter(std::unique_ptr<::parquet::arrow::FileWriter> writer, const std::shared_ptr& out, const std::shared_ptr& schema, - const std::shared_ptr& pool) + const std::shared_ptr& pool) : pool_(pool), out_(out), writer_(std::move(writer)), diff --git a/src/paimon/format/parquet/parquet_format_writer.h b/src/paimon/format/parquet/parquet_format_writer.h index 9b4589e7..e6557cff 100644 --- a/src/paimon/format/parquet/parquet_format_writer.h +++ b/src/paimon/format/parquet/parquet_format_writer.h @@ -19,7 +19,6 @@ #include #include -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/format/format_writer.h" #include "paimon/format/parquet/parquet_output_stream_impl.h" #include "paimon/fs/file_system.h" @@ -54,7 +53,7 @@ class ParquetFormatWriter : public FormatWriter { const std::shared_ptr& output_stream, const std::shared_ptr& schema, const std::shared_ptr<::parquet::WriterProperties>& writer_properties, - const std::shared_ptr& pool); + const std::shared_ptr& pool); Status AddBatch(ArrowArray* batch) override; @@ -72,11 +71,11 @@ class ParquetFormatWriter : public FormatWriter { ParquetFormatWriter(std::unique_ptr<::parquet::arrow::FileWriter> writer, const std::shared_ptr& out, const std::shared_ptr& schema, - const std::shared_ptr& pool); + const std::shared_ptr& pool); Result GetEstimateLength() const; - std::shared_ptr pool_; + std::shared_ptr pool_; std::shared_ptr out_; std::unique_ptr<::parquet::arrow::FileWriter> writer_; std::shared_ptr schema_; diff --git a/src/paimon/format/parquet/parquet_format_writer_test.cpp b/src/paimon/format/parquet/parquet_format_writer_test.cpp index 2c3a39f3..c961a40e 100644 --- a/src/paimon/format/parquet/parquet_format_writer_test.cpp +++ b/src/paimon/format/parquet/parquet_format_writer_test.cpp @@ -33,7 +33,7 @@ #include "arrow/ipc/api.h" #include "arrow/memory_pool.h" #include "gtest/gtest.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/common/utils/path_util.h" #include "paimon/format/parquet/parquet_field_id_converter.h" @@ -63,7 +63,6 @@ class ParquetFormatWriterTest : public ::testing::Test { ASSERT_TRUE(dir_); fs_ = std::make_shared(); pool_ = GetDefaultPool(); - arrow_pool_ = GetArrowPool(pool_); } void TearDown() override {} @@ -122,10 +121,11 @@ class ParquetFormatWriterTest : public ::testing::Test { } void CheckResult(const std::string& file_path, int32_t row_count) const { - auto file = arrow::io::ReadableFile::Open(file_path, arrow_pool_.get()); + auto file = arrow::io::ReadableFile::Open(file_path, AsArrowMemoryPool(*pool_)); ASSERT_TRUE(file.ok()); std::unique_ptr<::parquet::arrow::FileReader> reader; - auto status = ::parquet::arrow::OpenFile(file.ValueOrDie(), arrow_pool_.get(), &reader); + auto status = + ::parquet::arrow::OpenFile(file.ValueOrDie(), AsArrowMemoryPool(*pool_), &reader); ASSERT_TRUE(status.ok()) << status.ToString(); const ::parquet::FileMetaData* metadata = reader->parquet_reader()->metadata().get(); const ::parquet::SchemaDescriptor* schema = metadata->schema(); @@ -176,7 +176,6 @@ class ParquetFormatWriterTest : public ::testing::Test { std::unique_ptr dir_; std::shared_ptr fs_; std::shared_ptr pool_; - std::shared_ptr arrow_pool_; }; TEST_F(ParquetFormatWriterTest, TestWriteWithVariousBatchSize) { @@ -196,7 +195,7 @@ TEST_F(ParquetFormatWriterTest, TestWriteWithVariousBatchSize) { auto writer_properties = builder.build(); ASSERT_OK_AND_ASSIGN( auto format_writer, - ParquetFormatWriter::Create(out, arrow_schema, writer_properties, arrow_pool_)); + ParquetFormatWriter::Create(out, arrow_schema, writer_properties, pool_)); auto array = PrepareArray(struct_type, record_batch_size); auto arrow_array = std::make_unique(); ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok()); @@ -227,9 +226,8 @@ TEST_F(ParquetFormatWriterTest, TestWriteMultipleTimes) { ::parquet::WriterProperties::Builder builder; builder.write_batch_size(10); auto writer_properties = builder.build(); - ASSERT_OK_AND_ASSIGN( - std::shared_ptr format_writer, - ParquetFormatWriter::Create(out, arrow_schema, writer_properties, arrow_pool_)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr format_writer, + ParquetFormatWriter::Create(out, arrow_schema, writer_properties, pool_)); // add batch first time, 6 rows AddRecordBatchOnce(format_writer, struct_type, 6, 0); @@ -269,9 +267,8 @@ TEST_F(ParquetFormatWriterTest, TestGetEstimateLength) { fs_->Create(file_path, /*overwrite=*/false)); ::parquet::WriterProperties::Builder builder; auto writer_properties = builder.build(); - ASSERT_OK_AND_ASSIGN( - std::shared_ptr format_writer, - ParquetFormatWriter::Create(out, arrow_schema, writer_properties, arrow_pool_)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr format_writer, + ParquetFormatWriter::Create(out, arrow_schema, writer_properties, pool_)); // add batch first time, 1 row AddRecordBatchOnce(format_writer, struct_type, 1, 0); @@ -306,7 +303,7 @@ TEST_F(ParquetFormatWriterTest, TestTimestampType) { auto writer_properties = builder.build(); ASSERT_OK_AND_ASSIGN(std::shared_ptr format_writer, ParquetFormatWriter::Create(out, std::make_shared(fields), - writer_properties, arrow_pool_)); + writer_properties, pool_)); auto array = std::dynamic_pointer_cast( arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ diff --git a/src/paimon/format/parquet/parquet_input_output_stream_test.cpp b/src/paimon/format/parquet/parquet_input_output_stream_test.cpp index 020e583e..10f59c1e 100644 --- a/src/paimon/format/parquet/parquet_input_output_stream_test.cpp +++ b/src/paimon/format/parquet/parquet_input_output_stream_test.cpp @@ -22,7 +22,6 @@ #include "arrow/io/type_fwd.h" #include "arrow/util/future.h" #include "gtest/gtest.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/format/parquet/parquet_input_stream_impl.h" #include "paimon/format/parquet/parquet_output_stream_impl.h" #include "paimon/fs/file_system.h" @@ -63,8 +62,7 @@ TEST(ParquetInputOutputStreamTest, TestInOutStream) { // in stream ASSERT_OK_AND_ASSIGN(std::shared_ptr in, file_system->Open(file_name)); ASSERT_OK_AND_ASSIGN(uint64_t length, in->Length()); - auto in_stream = - std::make_unique(in, GetArrowPool(GetDefaultPool()), length); + auto in_stream = std::make_unique(in, GetDefaultPool(), length); int64_t file_length = in_stream->GetSize().ValueOr(-1); ASSERT_EQ(file_length, data.length()); int64_t in_tell1 = in_stream->Tell().ValueOr(-1); diff --git a/src/paimon/format/parquet/parquet_input_stream_impl.cpp b/src/paimon/format/parquet/parquet_input_stream_impl.cpp index 3168db9a..5c63ed56 100644 --- a/src/paimon/format/parquet/parquet_input_stream_impl.cpp +++ b/src/paimon/format/parquet/parquet_input_stream_impl.cpp @@ -21,9 +21,11 @@ #include "arrow/api.h" #include "arrow/util/future.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/fs/file_system.h" #include "paimon/macros.h" +#include "paimon/memory/memory_pool.h" #include "paimon/result.h" #include "paimon/status.h" @@ -35,7 +37,7 @@ namespace paimon::parquet { ParquetInputStreamImpl::ParquetInputStreamImpl( const std::shared_ptr<::paimon::InputStream>& input_stream, - const std::shared_ptr& pool, uint64_t file_size) + const std::shared_ptr& pool, uint64_t file_size) : input_stream_(input_stream), pool_(pool), file_size_(file_size) {} ParquetInputStreamImpl::~ParquetInputStreamImpl() { @@ -56,7 +58,7 @@ arrow::Result ParquetInputStreamImpl::Read(int64_t nbytes, void* out) { arrow::Result> ParquetInputStreamImpl::Read(int64_t nbytes) { ARROW_ASSIGN_OR_RAISE(std::shared_ptr buffer, - arrow::AllocateResizableBuffer(nbytes, pool_.get())); + arrow::AllocateResizableBuffer(nbytes, AsArrowMemoryPool(*pool_))); ARROW_ASSIGN_OR_RAISE(int64_t read_bytes, Read(nbytes, buffer->mutable_data())); if (read_bytes < nbytes) { RETURN_NOT_OK(buffer->Resize(read_bytes)); @@ -75,7 +77,7 @@ arrow::Result ParquetInputStreamImpl::ReadAt(int64_t position, int64_t arrow::Result> ParquetInputStreamImpl::ReadAt(int64_t position, int64_t nbytes) { ARROW_ASSIGN_OR_RAISE(std::shared_ptr buffer, - arrow::AllocateResizableBuffer(nbytes, pool_.get())); + arrow::AllocateResizableBuffer(nbytes, AsArrowMemoryPool(*pool_))); ARROW_ASSIGN_OR_RAISE(int64_t read_bytes, ReadAt(position, nbytes, buffer->mutable_data())); if (read_bytes < nbytes) { RETURN_NOT_OK(buffer->Resize(read_bytes)); @@ -86,7 +88,7 @@ arrow::Result> ParquetInputStreamImpl::ReadAt(int arrow::Future> ParquetInputStreamImpl::ReadAsync( const arrow::io::IOContext& io_context, int64_t position, int64_t nbytes) { arrow::Result> buffer_result = - arrow::AllocateResizableBuffer(nbytes, pool_.get()); + arrow::AllocateResizableBuffer(nbytes, AsArrowMemoryPool(*pool_)); auto fut = arrow::Future>::Make(); if (PAIMON_UNLIKELY(!buffer_result.ok())) { fut.MarkFinished(buffer_result.status()); diff --git a/src/paimon/format/parquet/parquet_input_stream_impl.h b/src/paimon/format/parquet/parquet_input_stream_impl.h index a20684fc..b488f984 100644 --- a/src/paimon/format/parquet/parquet_input_stream_impl.h +++ b/src/paimon/format/parquet/parquet_input_stream_impl.h @@ -33,7 +33,7 @@ namespace paimon::parquet { class ParquetInputStreamImpl : public arrow::io::RandomAccessFile { public: ParquetInputStreamImpl(const std::shared_ptr<::paimon::InputStream>& input_stream, - const std::shared_ptr& pool, uint64_t file_size); + const std::shared_ptr& pool, uint64_t file_size); ~ParquetInputStreamImpl() override; // NOTE: In paimon file system definition, position + nbytes should not exceed file_size_. @@ -55,7 +55,7 @@ class ParquetInputStreamImpl : public arrow::io::RandomAccessFile { private: arrow::Status DoClose(); std::shared_ptr<::paimon::InputStream> input_stream_; - std::shared_ptr pool_; + std::shared_ptr pool_; uint64_t file_size_; bool closed_ = false; }; diff --git a/src/paimon/format/parquet/parquet_reader_builder.h b/src/paimon/format/parquet/parquet_reader_builder.h index 6ab8062f..9b5e5042 100644 --- a/src/paimon/format/parquet/parquet_reader_builder.h +++ b/src/paimon/format/parquet/parquet_reader_builder.h @@ -21,7 +21,6 @@ #include #include -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/format/parquet/parquet_file_batch_reader.h" #include "paimon/format/parquet/parquet_input_stream_impl.h" #include "paimon/format/reader_builder.h" @@ -44,9 +43,8 @@ class ParquetReaderBuilder : public ReaderBuilder { Result> Build( const std::shared_ptr& path) const override { PAIMON_ASSIGN_OR_RAISE(uint64_t file_length, path->Length()); - std::shared_ptr arrow_pool = GetArrowPool(pool_); - auto input_stream = std::make_unique(path, arrow_pool, file_length); - return ParquetFileBatchReader::Create(std::move(input_stream), arrow_pool, options_, + auto input_stream = std::make_unique(path, pool_, file_length); + return ParquetFileBatchReader::Create(std::move(input_stream), pool_, options_, batch_size_); } diff --git a/src/paimon/format/parquet/parquet_stats_extractor.cpp b/src/paimon/format/parquet/parquet_stats_extractor.cpp index 82264adf..fe59a8a5 100644 --- a/src/paimon/format/parquet/parquet_stats_extractor.cpp +++ b/src/paimon/format/parquet/parquet_stats_extractor.cpp @@ -25,7 +25,7 @@ #include "arrow/type.h" #include "arrow/util/checked_cast.h" #include "fmt/format.h" -#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/data/decimal.h" @@ -274,10 +274,9 @@ ParquetStatsExtractor::ExtractWithFileInfo(const std::shared_ptr& fi PAIMON_ASSIGN_OR_RAISE(std::unique_ptr input_stream, file_system->Open(path)); assert(input_stream); PAIMON_ASSIGN_OR_RAISE(uint64_t file_length, input_stream->Length()); - std::shared_ptr parquet_memory_pool = GetArrowPool(pool); - auto parquet_input_file = std::make_shared( - std::move(input_stream), parquet_memory_pool, file_length); - ::parquet::ReaderProperties read_properties(parquet_memory_pool.get()); + auto parquet_input_file = + std::make_shared(std::move(input_stream), pool, file_length); + ::parquet::ReaderProperties read_properties(AsArrowMemoryPool(*pool)); read_properties.enable_buffered_stream(); ::parquet::arrow::FileReaderBuilder file_reader_builder; PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.Open(parquet_input_file, read_properties)); diff --git a/src/paimon/format/parquet/parquet_stats_extractor_test.cpp b/src/paimon/format/parquet/parquet_stats_extractor_test.cpp index d2ec2f98..ffc3dbbc 100644 --- a/src/paimon/format/parquet/parquet_stats_extractor_test.cpp +++ b/src/paimon/format/parquet/parquet_stats_extractor_test.cpp @@ -32,7 +32,6 @@ #include "arrow/memory_pool.h" #include "gtest/gtest.h" #include "paimon/common/data/binary_row.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/common/utils/path_util.h" #include "paimon/common/utils/uuid.h" @@ -63,7 +62,6 @@ class ParquetStatsExtractorTest : public ::testing::Test { auto arrow_schema = arrow::schema(fields); auto struct_type = arrow::struct_(fields); std::map options; - std::shared_ptr pool = GetArrowPool(GetDefaultPool()); std::shared_ptr fs = std::make_shared(); std::string file_name; ASSERT_TRUE(UUID::Generate(&file_name)); @@ -72,8 +70,9 @@ class ParquetStatsExtractorTest : public ::testing::Test { fs->Create(file_path, /*overwrite=*/false)); ::parquet::WriterProperties::Builder builder; builder.enable_store_decimal_as_integer(); - ASSERT_OK_AND_ASSIGN(auto format_writer, - ParquetFormatWriter::Create(out, arrow_schema, builder.build(), pool)); + ASSERT_OK_AND_ASSIGN( + auto format_writer, + ParquetFormatWriter::Create(out, arrow_schema, builder.build(), GetDefaultPool())); auto array = arrow::ipc::internal::json::ArrayFromJSON(struct_type, input).ValueOrDie(); auto arrow_array = std::make_unique(); ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok()); @@ -268,11 +267,10 @@ TEST_F(ParquetStatsExtractorTest, TestNullForAllType) { ASSERT_OK_AND_ASSIGN(std::shared_ptr out, fs->Create(file_name, /*overwrite=*/false)); auto pool = GetDefaultPool(); - std::shared_ptr arrow_pool = GetArrowPool(pool); ::parquet::WriterProperties::Builder builder; builder.enable_store_decimal_as_integer(); ASSERT_OK_AND_ASSIGN(auto format_writer, - ParquetFormatWriter::Create(out, schema, builder.build(), arrow_pool)); + ParquetFormatWriter::Create(out, schema, builder.build(), pool)); auto src_array = std::dynamic_pointer_cast( arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({fields}), R"([ [null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null] diff --git a/src/paimon/format/parquet/parquet_timestamp_converter.cpp b/src/paimon/format/parquet/parquet_timestamp_converter.cpp index f12ad4cc..2b47cb31 100644 --- a/src/paimon/format/parquet/parquet_timestamp_converter.cpp +++ b/src/paimon/format/parquet/parquet_timestamp_converter.cpp @@ -22,6 +22,7 @@ #include "arrow/type.h" #include "fmt/format.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/core/casting/timestamp_to_timestamp_cast_executor.h" @@ -148,7 +149,7 @@ Result ParquetTimestampConverter::NeedCastArrayForTimestamp( Result> ParquetTimestampConverter::CastArrayForTimestamp( const std::shared_ptr& array, const std::shared_ptr& target_data_type, - const std::shared_ptr& arrow_pool) { + const std::shared_ptr& memory_pool) { arrow::Type::type type = array->type()->id(); switch (type) { case arrow::Type::type::STRUCT: { @@ -161,7 +162,7 @@ Result> ParquetTimestampConverter::CastArrayForTim const auto& field = struct_array->field(i); PAIMON_ASSIGN_OR_RAISE( std::shared_ptr sub_array, - CastArrayForTimestamp(field, target_data_type->field(i)->type(), arrow_pool)); + CastArrayForTimestamp(field, target_data_type->field(i)->type(), memory_pool)); target_sub_arrays.push_back(sub_array); target_names.push_back(target_data_type->field(i)->name()); } @@ -177,10 +178,10 @@ Result> ParquetTimestampConverter::CastArrayForTim auto* map_type = arrow::internal::checked_cast(target_data_type.get()); PAIMON_ASSIGN_OR_RAISE( std::shared_ptr key_array, - CastArrayForTimestamp(map_array->keys(), map_type->key_type(), arrow_pool)); + CastArrayForTimestamp(map_array->keys(), map_type->key_type(), memory_pool)); PAIMON_ASSIGN_OR_RAISE( std::shared_ptr item_array, - CastArrayForTimestamp(map_array->items(), map_type->item_type(), arrow_pool)); + CastArrayForTimestamp(map_array->items(), map_type->item_type(), memory_pool)); return std::make_shared( arrow::map(key_array->type(), item_array->type()), map_array->length(), map_array->value_offsets(), key_array, item_array, map_array->null_bitmap(), @@ -192,7 +193,7 @@ Result> ParquetTimestampConverter::CastArrayForTim arrow::internal::checked_cast(target_data_type.get()); PAIMON_ASSIGN_OR_RAISE( std::shared_ptr value_array, - CastArrayForTimestamp(list_array->values(), list_type->value_type(), arrow_pool)); + CastArrayForTimestamp(list_array->values(), list_type->value_type(), memory_pool)); return std::make_shared( arrow::list(value_array->type()), list_array->length(), list_array->value_offsets(), value_array, list_array->null_bitmap(), list_array->null_count(), @@ -211,7 +212,7 @@ Result> ParquetTimestampConverter::CastArrayForTim auto cast_executor = std::make_shared(); PAIMON_ASSIGN_OR_RAISE( std::shared_ptr target_array, - cast_executor->Cast(array, target_data_type, arrow_pool.get())); + cast_executor->Cast(array, target_data_type, AsArrowMemoryPool(*memory_pool))); return target_array; } if (src_type->timezone() != ts_target_type->timezone()) { diff --git a/src/paimon/format/parquet/parquet_timestamp_converter.h b/src/paimon/format/parquet/parquet_timestamp_converter.h index b1283adc..41fdd5f1 100644 --- a/src/paimon/format/parquet/parquet_timestamp_converter.h +++ b/src/paimon/format/parquet/parquet_timestamp_converter.h @@ -38,7 +38,7 @@ class ParquetTimestampConverter { static Result> CastArrayForTimestamp( const std::shared_ptr& array, const std::shared_ptr& target_data_type, - const std::shared_ptr& arrow_pool); + const std::shared_ptr& memory_pool); }; } // namespace paimon::parquet diff --git a/src/paimon/format/parquet/parquet_timestamp_converter_test.cpp b/src/paimon/format/parquet/parquet_timestamp_converter_test.cpp index 7490ddfb..65e3eb98 100644 --- a/src/paimon/format/parquet/parquet_timestamp_converter_test.cpp +++ b/src/paimon/format/parquet/parquet_timestamp_converter_test.cpp @@ -21,7 +21,6 @@ #include "arrow/api.h" #include "arrow/ipc/api.h" #include "gtest/gtest.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/memory/memory_pool.h" #include "paimon/testing/utils/testharness.h" @@ -119,10 +118,9 @@ TEST(ParquetTimestampConverterTest, TestCastArrayForTimestamp) { ])") .ValueOrDie()); - std::shared_ptr pool = GetArrowPool(GetDefaultPool()); ASSERT_OK_AND_ASSIGN(std::shared_ptr result_array, ParquetTimestampConverter::CastArrayForTimestamp( - array, arrow::struct_(target_fields), pool)); + array, arrow::struct_(target_fields), GetDefaultPool())); auto expected_array = std::dynamic_pointer_cast( arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(target_fields), R"([ diff --git a/src/paimon/format/parquet/parquet_writer_builder.cpp b/src/paimon/format/parquet/parquet_writer_builder.cpp index e2dc013d..6abd6c34 100644 --- a/src/paimon/format/parquet/parquet_writer_builder.cpp +++ b/src/paimon/format/parquet/parquet_writer_builder.cpp @@ -22,6 +22,7 @@ #include "arrow/util/compression.h" #include "arrow/util/type_fwd.h" #include "fmt/format.h" +#include "paimon/common/utils/arrow/arrow_memory_pool_adaptor.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/options_utils.h" #include "paimon/common/utils/string_utils.h" @@ -42,7 +43,7 @@ Result> ParquetWriterBuilder::Build( const std::shared_ptr& out, const std::string& compression) { PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<::parquet::WriterProperties> writer_properties, PrepareWriterProperties(compression)); - return ParquetFormatWriter::Create(out, schema_, writer_properties, pool_); + return ParquetFormatWriter::Create(out, schema_, writer_properties, memory_pool_); } Result> ParquetWriterBuilder::PrepareWriterProperties( @@ -52,7 +53,7 @@ Result> ParquetWriterBuilder::Prepa arrow::Compression::type compression_type, arrow::util::Codec::GetCompressionType(StringUtils::ToLowerCase(compression))); ::parquet::WriterProperties::Builder builder; - builder.memory_pool(pool_.get()); + builder.memory_pool(AsArrowMemoryPool(*memory_pool_)); builder.write_batch_size(batch_size_); builder.compression(compression_type); PAIMON_ASSIGN_OR_RAISE( diff --git a/src/paimon/format/parquet/parquet_writer_builder.h b/src/paimon/format/parquet/parquet_writer_builder.h index e0a78007..9e4cae6b 100644 --- a/src/paimon/format/parquet/parquet_writer_builder.h +++ b/src/paimon/format/parquet/parquet_writer_builder.h @@ -26,7 +26,6 @@ #include "arrow/type.h" #include "arrow/util/compression.h" #include "arrow/util/type_fwd.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/format/format_writer.h" #include "paimon/format/writer_builder.h" #include "paimon/fs/file_system.h" @@ -51,14 +50,14 @@ class ParquetWriterBuilder : public WriterBuilder { ParquetWriterBuilder(const std::shared_ptr& schema, int32_t batch_size, const std::map& options) : batch_size_(batch_size), - pool_(GetArrowPool(GetDefaultPool())), + memory_pool_(GetDefaultPool()), schema_(schema), options_(options) { assert(schema); } WriterBuilder* WithMemoryPool(const std::shared_ptr& pool) override { - pool_ = GetArrowPool(pool); + memory_pool_ = pool; return this; } @@ -75,7 +74,7 @@ class ParquetWriterBuilder : public WriterBuilder { const std::string& compression); int32_t batch_size_ = -1; - std::shared_ptr pool_; + std::shared_ptr memory_pool_; std::shared_ptr schema_; std::map options_; }; diff --git a/src/paimon/format/parquet/predicate_pushdown_test.cpp b/src/paimon/format/parquet/predicate_pushdown_test.cpp index bf834b31..383b5bd2 100644 --- a/src/paimon/format/parquet/predicate_pushdown_test.cpp +++ b/src/paimon/format/parquet/predicate_pushdown_test.cpp @@ -29,7 +29,6 @@ #include "arrow/c/bridge.h" #include "arrow/ipc/json_simple.h" #include "gtest/gtest.h" -#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/common/utils/decimal_utils.h" #include "paimon/data/decimal.h" #include "paimon/data/timestamp.h" @@ -59,7 +58,6 @@ class PredicatePushdownTest : public ::testing::Test { public: void SetUp() override { pool_ = GetDefaultPool(); - arrow_pool_ = GetArrowPool(pool_); batch_size_ = 10; arrow::FieldVector fields = { @@ -91,9 +89,8 @@ class PredicatePushdownTest : public ::testing::Test { ::parquet::WriterProperties::Builder builder; builder.write_batch_size(batch_size_); auto writer_properties = builder.build(); - ASSERT_OK_AND_ASSIGN( - auto format_writer, - ParquetFormatWriter::Create(out, data_schema, writer_properties, arrow_pool_)); + ASSERT_OK_AND_ASSIGN(auto format_writer, ParquetFormatWriter::Create( + out, data_schema, writer_properties, pool_)); ASSERT_OK(format_writer->AddBatch(data_arrow_array.get())); ASSERT_OK(format_writer->Finish()); ASSERT_OK(out->Close()); @@ -106,14 +103,14 @@ class PredicatePushdownTest : public ::testing::Test { paimon::parquet::DEFAULT_PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT) { ASSERT_OK_AND_ASSIGN(std::shared_ptr in, fs_->Open(file_name_)); ASSERT_OK_AND_ASSIGN(uint64_t length, in->Length()); - auto in_stream = std::make_shared(in, arrow_pool_, length); + auto in_stream = std::make_shared(in, pool_, length); std::map options; options[paimon::parquet::PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT] = std::to_string(predicate_node_count_limit); - ASSERT_OK_AND_ASSIGN(auto batch_reader, - ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_, - options, batch_size_)); + ASSERT_OK_AND_ASSIGN( + auto batch_reader, + ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size_)); std::unique_ptr c_schema = std::make_unique(); auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get()); ASSERT_TRUE(arrow_status.ok()); @@ -131,7 +128,6 @@ class PredicatePushdownTest : public ::testing::Test { } private: - std::shared_ptr arrow_pool_; std::shared_ptr pool_; int32_t batch_size_; std::shared_ptr struct_array_;