From 03e8d03de65dc69787e28d0b118cb831692de9e2 Mon Sep 17 00:00:00 2001 From: Zehua Zou Date: Thu, 28 May 2026 20:44:03 +0800 Subject: [PATCH 1/3] feat: add executor pool support --- .../IcebergThirdpartyToolchain.cmake | 5 +- src/iceberg/CMakeLists.txt | 1 + src/iceberg/delete_file_index.cc | 215 ++++++----- src/iceberg/delete_file_index.h | 5 + .../manifest/manifest_filter_manager.cc | 144 ++++++-- .../manifest/manifest_filter_manager.h | 31 +- src/iceberg/manifest/manifest_group.cc | 117 ++++-- src/iceberg/manifest/manifest_group.h | 5 + .../manifest/manifest_merge_manager.cc | 44 ++- src/iceberg/manifest/manifest_merge_manager.h | 5 + src/iceberg/meson.build | 1 + src/iceberg/result.h | 9 + src/iceberg/table_scan.cc | 15 +- src/iceberg/table_scan.h | 7 + src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/arrow_test.cc | 81 ++++- src/iceberg/test/executor.h | 51 +++ src/iceberg/test/manifest_group_test.cc | 39 ++ src/iceberg/test/meson.build | 1 + src/iceberg/test/retry.h | 76 ++++ src/iceberg/test/retry_util_test.cc | 336 +++++++----------- src/iceberg/test/table_scan_test.cc | 67 ++++ src/iceberg/test/task_group_test.cc | 321 +++++++++++++++++ src/iceberg/update/expire_snapshots.cc | 43 ++- src/iceberg/update/expire_snapshots.h | 5 + src/iceberg/update/snapshot_update.cc | 15 +- src/iceberg/update/snapshot_update.h | 8 + src/iceberg/util/executor.h | 43 +++ src/iceberg/util/functional.h | 82 +++++ src/iceberg/util/lazy.h | 12 +- src/iceberg/util/meson.build | 3 + src/iceberg/util/retry_util.cc | 45 +-- src/iceberg/util/retry_util.h | 225 ++++++------ src/iceberg/util/task_group.cc | 100 ++++++ src/iceberg/util/task_group.h | 107 ++++++ 35 files changed, 1692 insertions(+), 573 deletions(-) create mode 100644 src/iceberg/test/executor.h create mode 100644 src/iceberg/test/retry.h create mode 100644 src/iceberg/test/task_group_test.cc create mode 100644 src/iceberg/util/executor.h create mode 100644 src/iceberg/util/functional.h create mode 100644 src/iceberg/util/task_group.cc create mode 100644 src/iceberg/util/task_group.h diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake b/cmake_modules/IcebergThirdpartyToolchain.cmake index a0d418e05..ea5fa951a 100644 --- a/cmake_modules/IcebergThirdpartyToolchain.cmake +++ b/cmake_modules/IcebergThirdpartyToolchain.cmake @@ -105,6 +105,7 @@ function(resolve_arrow_dependency) set(ARROW_S3 ${ICEBERG_S3}) set(ARROW_JSON ON) set(ARROW_PARQUET ON) + set(ARROW_ENABLE_THREADING ON) set(ARROW_SIMD_LEVEL "NONE") set(ARROW_RUNTIME_SIMD_LEVEL "NONE") set(ARROW_POSITION_INDEPENDENT_CODE ON) @@ -167,8 +168,8 @@ function(resolve_arrow_dependency) endif() # Arrow's exported static target interface may reference system libraries - # (e.g. OpenSSL, CURL, ZLIB) that consumers need to find. - list(APPEND ICEBERG_SYSTEM_DEPENDENCIES ZLIB) + # (e.g. Threads, OpenSSL, CURL, ZLIB) that consumers need to find. + list(APPEND ICEBERG_SYSTEM_DEPENDENCIES Threads ZLIB) if(ARROW_S3) list(APPEND ICEBERG_SYSTEM_DEPENDENCIES OpenSSL CURL) endif() diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 18cf70bdb..14964ca41 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -112,6 +112,7 @@ set(ICEBERG_SOURCES util/snapshot_util.cc util/string_util.cc util/struct_like_set.cc + util/task_group.cc util/temporal_util.cc util/timepoint.cc util/transform_util.cc diff --git a/src/iceberg/delete_file_index.cc b/src/iceberg/delete_file_index.cc index 7c8c35032..c4a5b0ca1 100644 --- a/src/iceberg/delete_file_index.cc +++ b/src/iceberg/delete_file_index.cc @@ -22,7 +22,9 @@ #include #include #include +#include #include +#include #include #include "iceberg/expression/expression.h" @@ -38,6 +40,7 @@ #include "iceberg/util/checked_cast.h" #include "iceberg/util/content_file_util.h" #include "iceberg/util/macros.h" +#include "iceberg/util/task_group.h" namespace iceberg { @@ -528,107 +531,159 @@ DeleteFileIndex::Builder& DeleteFileIndex::Builder::IgnoreResiduals() { return *this; } +DeleteFileIndex::Builder& DeleteFileIndex::Builder::PlanWith(OptionalExecutor executor) { + executor_ = executor; + return *this; +} + Result> DeleteFileIndex::Builder::LoadDeleteFiles() { - // Build expression caches per spec ID - std::unordered_map> part_expr_cache; + // TODO(zehua): Replace with a thread-safe LRU cache. + std::shared_mutex projected_expr_cache_mutex; + std::unordered_map> projected_expr_cache; + std::shared_mutex eval_cache_mutex; std::unordered_map> eval_cache; auto data_filter = ignore_residuals_ ? True::Instance() : data_filter_; - // Filter and read manifests into manifest entries - std::vector files; - for (const auto& manifest : delete_manifests_) { - if (manifest.content != ManifestContent::kDeletes) { - continue; + auto and_filters = + [](std::shared_ptr left, + std::shared_ptr right) -> Result> { + if (left && right) { + return And::MakeFolded(std::move(left), std::move(right)); } - if (!manifest.has_added_files() && !manifest.has_existing_files()) { - continue; + if (right) { + return right; + } + return left; + }; + + auto get_projected_expr = [&](int32_t spec_id, + const std::shared_ptr& spec) + -> Result> { + if (!data_filter_) { + return std::shared_ptr(); } - const int32_t spec_id = manifest.partition_spec_id; - auto spec_iter = specs_by_id_.find(spec_id); - ICEBERG_CHECK(spec_iter != specs_by_id_.cend(), - "Partition spec ID {} not found when loading delete files", spec_id); + { + std::shared_lock lock(projected_expr_cache_mutex); + auto iter = projected_expr_cache.find(spec_id); + if (iter != projected_expr_cache.end()) { + return iter->second; + } + } - const auto& spec = spec_iter->second; + std::lock_guard lock(projected_expr_cache_mutex); + auto iter = projected_expr_cache.find(spec_id); + if (iter != projected_expr_cache.end()) { + return iter->second; + } - // Get or compute projected partition expression - if (!part_expr_cache.contains(spec_id) && data_filter_) { - auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_); - ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_)); - part_expr_cache[spec_id] = std::move(projected); + auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_); + ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_)); + auto [inserted_iter, _] = projected_expr_cache.emplace(spec_id, std::move(projected)); + return inserted_iter->second; + }; + + auto get_manifest_evaluator = + [&](int32_t spec_id, const std::shared_ptr& spec, + const std::shared_ptr& filter) -> Result { + if (!filter) { + return nullptr; } - // Get or create manifest evaluator - if (!eval_cache.contains(spec_id)) { - auto filter = partition_filter_; - if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) { - if (filter) { - ICEBERG_ASSIGN_OR_RAISE(filter, And::Make(filter, it->second)); - } else { - filter = it->second; - } - } - if (filter) { - ICEBERG_ASSIGN_OR_RAISE(auto evaluator, - ManifestEvaluator::MakePartitionFilter( - std::move(filter), spec, *schema_, case_sensitive_)); - eval_cache[spec_id] = std::move(evaluator); + { + std::shared_lock lock(eval_cache_mutex); + auto iter = eval_cache.find(spec_id); + if (iter != eval_cache.end()) { + return iter->second.get(); } } - // Evaluate manifest against filter - if (auto it = eval_cache.find(spec_id); it != eval_cache.end()) { - ICEBERG_ASSIGN_OR_RAISE(auto should_match, it->second->Evaluate(manifest)); - if (!should_match) { - continue; // Manifest doesn't match filter - } + std::lock_guard lock(eval_cache_mutex); + auto iter = eval_cache.find(spec_id); + if (iter != eval_cache.end()) { + return iter->second.get(); } - // Read manifest entries - ICEBERG_ASSIGN_OR_RAISE(auto reader, - ManifestReader::Make(manifest, io_, schema_, spec)); - - auto partition_filter = partition_filter_; - if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) { - if (partition_filter) { - ICEBERG_ASSIGN_OR_RAISE(partition_filter, - And::Make(partition_filter, it->second)); - } else { - partition_filter = it->second; + ICEBERG_ASSIGN_OR_RAISE(auto evaluator, ManifestEvaluator::MakePartitionFilter( + filter, spec, *schema_, case_sensitive_)); + auto [inserted_iter, _] = eval_cache.emplace(spec_id, std::move(evaluator)); + return inserted_iter->second.get(); + }; + + std::vector> manifest_results(delete_manifests_.size()); + auto read_tasks = TaskGroup().SetExecutor(executor_); + for (auto&& [manifest, manifest_result] : + std::views::zip(delete_manifests_, manifest_results)) { + read_tasks.Submit([&]() -> Status { + if (manifest.content != ManifestContent::kDeletes) { + return {}; } - } - if (partition_filter) { - reader->FilterPartitions(std::move(partition_filter)); - } - if (partition_set_) { - reader->FilterPartitions(partition_set_); - } - reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats(); - - ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries()); - files.reserve(files.size() + entries.size()); - - for (auto& entry : entries) { - ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file"); - ICEBERG_CHECK(entry.sequence_number.has_value(), - "Missing sequence number from delete file: {}", - entry.data_file->file_path); - if (entry.sequence_number.value() > min_sequence_number_) { - auto& file = *entry.data_file; - // keep minimum stats to avoid memory pressure - std::unordered_set columns = - file.content == DataFile::Content::kPositionDeletes - ? std::unordered_set{MetadataColumns::kDeleteFilePathColumnId} - : std::unordered_set(file.equality_ids.begin(), - file.equality_ids.end()); - ContentFileUtil::DropUnselectedStats(*entry.data_file, columns); - files.emplace_back(std::move(entry)); + if (!manifest.has_added_files() && !manifest.has_existing_files()) { + return {}; } - } + + const int32_t spec_id = manifest.partition_spec_id; + auto spec_iter = specs_by_id_.find(spec_id); + ICEBERG_CHECK(spec_iter != specs_by_id_.cend(), + "Partition spec ID {} not found when loading delete files", spec_id); + + const auto& spec = spec_iter->second; + + ICEBERG_ASSIGN_OR_RAISE(auto projected_data_filter, + get_projected_expr(spec_id, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto delete_partition_filter, + and_filters(partition_filter_, projected_data_filter)); + ICEBERG_ASSIGN_OR_RAISE( + auto manifest_evaluator, + get_manifest_evaluator(spec_id, spec, delete_partition_filter)); + if (manifest_evaluator != nullptr) { + ICEBERG_ASSIGN_OR_RAISE(auto should_match, + manifest_evaluator->Evaluate(manifest)); + if (!should_match) { + return {}; + } + } + + // Read manifest entries + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(manifest, io_, schema_, spec)); + + if (delete_partition_filter) { + reader->FilterPartitions(std::move(delete_partition_filter)); + } + if (partition_set_) { + reader->FilterPartitions(partition_set_); + } + reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats(); + + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries()); + manifest_result.reserve(entries.size()); + + for (auto& entry : entries) { + ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file"); + ICEBERG_CHECK(entry.sequence_number.has_value(), + "Missing sequence number from delete file: {}", + entry.data_file->file_path); + if (entry.sequence_number.value() > min_sequence_number_) { + auto& file = *entry.data_file; + // keep minimum stats to avoid memory pressure + std::unordered_set columns = + file.content == DataFile::Content::kPositionDeletes + ? std::unordered_set{MetadataColumns::kDeleteFilePathColumnId} + : std::unordered_set(file.equality_ids.begin(), + file.equality_ids.end()); + ContentFileUtil::DropUnselectedStats(*entry.data_file, columns); + manifest_result.emplace_back(std::move(entry)); + } + } + return {}; + }); } + ICEBERG_RETURN_UNEXPECTED(std::move(read_tasks).Run()); - return files; + return manifest_results | std::views::join | std::views::as_rvalue | + std::ranges::to>(); } Status DeleteFileIndex::Builder::AddDV( diff --git a/src/iceberg/delete_file_index.h b/src/iceberg/delete_file_index.h index 5444281a0..9f3768edf 100644 --- a/src/iceberg/delete_file_index.h +++ b/src/iceberg/delete_file_index.h @@ -35,6 +35,7 @@ #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/util/error_collector.h" +#include "iceberg/util/executor.h" #include "iceberg/util/partition_value_util.h" namespace iceberg { @@ -356,6 +357,9 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector { /// \brief Ignore residual expressions after partition filtering. Builder& IgnoreResiduals(); + /// \brief Configure an optional executor for reading delete manifests. + Builder& PlanWith(OptionalExecutor executor); + /// \brief Build the DeleteFileIndex. Result> Build(); @@ -388,6 +392,7 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector { std::shared_ptr data_filter_; std::shared_ptr partition_filter_; std::shared_ptr partition_set_; + OptionalExecutor executor_; bool case_sensitive_ = true; bool ignore_residuals_ = false; }; diff --git a/src/iceberg/manifest/manifest_filter_manager.cc b/src/iceberg/manifest/manifest_filter_manager.cc index 086c94a78..9a977a544 100644 --- a/src/iceberg/manifest/manifest_filter_manager.cc +++ b/src/iceberg/manifest/manifest_filter_manager.cc @@ -19,6 +19,9 @@ #include "iceberg/manifest/manifest_filter_manager.h" +#include +#include +#include #include #include #include @@ -37,6 +40,7 @@ #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" #include "iceberg/util/macros.h" +#include "iceberg/util/task_group.h" namespace iceberg { @@ -76,15 +80,27 @@ ManifestFilterManager::~ManifestFilterManager() = default; Status ManifestFilterManager::DeleteByRowFilter(std::shared_ptr expr) { ICEBERG_PRECHECK(expr != nullptr, "Cannot delete files using filter: null"); ICEBERG_ASSIGN_OR_RAISE(delete_expr_, Or::MakeFolded(delete_expr_, std::move(expr))); - manifest_evaluator_cache_.clear(); - residual_evaluator_cache_.clear(); + { + std::lock_guard lock(manifest_evaluator_cache_mutex_); + manifest_evaluator_cache_.clear(); + } + { + std::lock_guard lock(residual_evaluator_cache_mutex_); + residual_evaluator_cache_.clear(); + } return {}; } void ManifestFilterManager::CaseSensitive(bool case_sensitive) { case_sensitive_ = case_sensitive; - manifest_evaluator_cache_.clear(); - residual_evaluator_cache_.clear(); + { + std::lock_guard lock(manifest_evaluator_cache_mutex_); + manifest_evaluator_cache_.clear(); + } + { + std::lock_guard lock(residual_evaluator_cache_mutex_); + residual_evaluator_cache_.clear(); + } } void ManifestFilterManager::DeleteFile(std::string_view path) { @@ -117,6 +133,11 @@ bool ManifestFilterManager::ContainsDeletes() const { !drop_partitions_.empty(); } +ManifestFilterManager& ManifestFilterManager::PlanWith(OptionalExecutor executor) { + executor_ = executor; + return *this; +} + Result ManifestFilterManager::CanContainDroppedFiles(const ManifestFile&) const { // TODO(Guotao): Use the manifest descriptor to skip unrelated object-delete // manifests once object-delete partitions are tracked separately. @@ -179,25 +200,53 @@ Result ManifestFilterManager::CanContainDeletedFiles( Result ManifestFilterManager::GetManifestEvaluator( const std::shared_ptr& schema, const PartitionSpecsById& specs_by_id, int32_t spec_id) { - auto& evaluator = manifest_evaluator_cache_[spec_id]; - if (!evaluator) { - ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecById(specs_by_id, spec_id)); - ICEBERG_ASSIGN_OR_RAISE(evaluator, ManifestEvaluator::MakeRowFilter( - delete_expr_, spec, *schema, case_sensitive_)); + { + std::shared_lock lock(manifest_evaluator_cache_mutex_); + auto iter = manifest_evaluator_cache_.find(spec_id); + if (iter != manifest_evaluator_cache_.end()) { + return iter->second.get(); + } } - return evaluator.get(); + + std::lock_guard lock(manifest_evaluator_cache_mutex_); + auto iter = manifest_evaluator_cache_.find(spec_id); + if (iter != manifest_evaluator_cache_.end()) { + return iter->second.get(); + } + + ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecById(specs_by_id, spec_id)); + ICEBERG_ASSIGN_OR_RAISE( + auto evaluator, + ManifestEvaluator::MakeRowFilter(delete_expr_, spec, *schema, case_sensitive_)); + auto [inserted_iter, _] = + manifest_evaluator_cache_.emplace(spec_id, std::move(evaluator)); + return inserted_iter->second.get(); } Result ManifestFilterManager::GetResidualEvaluator( const std::shared_ptr& schema, const PartitionSpecsById& specs_by_id, int32_t spec_id) { - auto& evaluator = residual_evaluator_cache_[spec_id]; - if (!evaluator) { - ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecById(specs_by_id, spec_id)); - ICEBERG_ASSIGN_OR_RAISE(evaluator, ResidualEvaluator::Make(delete_expr_, *spec, - *schema, case_sensitive_)); + { + std::shared_lock lock(residual_evaluator_cache_mutex_); + auto iter = residual_evaluator_cache_.find(spec_id); + if (iter != residual_evaluator_cache_.end()) { + return iter->second.get(); + } } - return evaluator.get(); + + std::lock_guard lock(residual_evaluator_cache_mutex_); + auto iter = residual_evaluator_cache_.find(spec_id); + if (iter != residual_evaluator_cache_.end()) { + return iter->second.get(); + } + + ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecById(specs_by_id, spec_id)); + ICEBERG_ASSIGN_OR_RAISE( + auto evaluator, + ResidualEvaluator::Make(delete_expr_, *spec, *schema, case_sensitive_)); + auto [inserted_iter, _] = + residual_evaluator_cache_.emplace(spec_id, std::move(evaluator)); + return inserted_iter->second.get(); } Result ManifestFilterManager::ShouldDelete(const ManifestEntry& entry, @@ -262,16 +311,15 @@ bool ManifestFilterManager::CanTrustManifestReferences( return false; } -Result ManifestFilterManager::FilterManifest( +Result ManifestFilterManager::FilterManifest( const std::shared_ptr& schema, const PartitionSpecsById& specs_by_id, const ManifestFile& manifest, bool trust_manifest_references, - const ManifestWriterFactory& writer_factory, - std::unordered_set& found_paths) { + const ManifestWriterFactory& writer_factory) { ICEBERG_ASSIGN_OR_RAISE( auto can_contain_deleted_files, CanContainDeletedFiles(manifest, schema, specs_by_id, trust_manifest_references)); if (!can_contain_deleted_files) { - return manifest; + return FilterManifestResult{.manifest = manifest}; } int32_t spec_id = manifest.partition_spec_id; @@ -283,11 +331,11 @@ Result ManifestFilterManager::FilterManifest( ICEBERG_ASSIGN_OR_RAISE(auto has_deleted_files, ManifestHasDeletedFiles(entries, schema, specs_by_id, spec_id)); if (!has_deleted_files) { - return manifest; + return FilterManifestResult{.manifest = manifest}; } return FilterManifestWithDeletedFiles(entries, spec_id, schema, specs_by_id, - writer_factory, found_paths); + writer_factory); } Result ManifestFilterManager::ManifestHasDeletedFiles( @@ -303,11 +351,12 @@ Result ManifestFilterManager::ManifestHasDeletedFiles( return false; } -Result ManifestFilterManager::FilterManifestWithDeletedFiles( +Result +ManifestFilterManager::FilterManifestWithDeletedFiles( const std::vector& entries, int32_t manifest_spec_id, const std::shared_ptr& schema, const PartitionSpecsById& specs_by_id, - const ManifestWriterFactory& writer_factory, - std::unordered_set& found_paths) { + const ManifestWriterFactory& writer_factory) { + FilterManifestResult result; ICEBERG_ASSIGN_OR_RAISE(auto writer, writer_factory(manifest_spec_id, manifest_content_)); for (const auto& entry : entries) { @@ -315,12 +364,12 @@ Result ManifestFilterManager::FilterManifestWithDeletedFiles( ShouldDelete(entry, schema, specs_by_id, manifest_spec_id)); if (should_delete) { if (entry.data_file && delete_paths_.count(entry.data_file->file_path)) { - found_paths.insert(entry.data_file->file_path); + result.found_paths.insert(entry.data_file->file_path); } if (entry.data_file) { // TODO(Guotao): Track duplicate deletes and avoid full DataFile copies when // summary generation can use lighter records. - delete_files_.insert(std::make_shared(*entry.data_file)); + result.deleted_files.insert(std::make_shared(*entry.data_file)); } ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry)); } else { @@ -329,7 +378,8 @@ Result ManifestFilterManager::FilterManifestWithDeletedFiles( } ICEBERG_RETURN_UNEXPECTED(writer->Close()); - return writer->ToManifestFile(); + ICEBERG_ASSIGN_OR_RAISE(result.manifest, writer->ToManifestFile()); + return result; } Status ManifestFilterManager::ValidateRequiredDeletes( @@ -401,19 +451,35 @@ Result> ManifestFilterManager::FilterManifests( } bool trust_manifest_references = CanTrustManifestReferences(manifests); - manifest_evaluator_cache_.clear(); - residual_evaluator_cache_.clear(); + { + std::lock_guard lock(manifest_evaluator_cache_mutex_); + manifest_evaluator_cache_.clear(); + } + { + std::lock_guard lock(residual_evaluator_cache_mutex_); + residual_evaluator_cache_.clear(); + } + + std::vector filter_results(manifests.size()); + auto filter_tasks = TaskGroup().SetExecutor(executor_); + for (auto&& [manifest, result] : std::views::zip(manifests, filter_results)) { + filter_tasks.Submit([&]() -> Status { + ICEBERG_ASSIGN_OR_RAISE(result, + FilterManifest(schema, specs_by_id, *manifest, + trust_manifest_references, writer_factory)); + return {}; + }); + } + ICEBERG_RETURN_UNEXPECTED(std::move(filter_tasks).Run()); - // TODO(Guotao): Parallelize manifest filtering with per-manifest results, then - // merge found paths and deleted files after the loop. std::vector filtered; - filtered.reserve(manifests.size()); - for (const auto* manifest_ptr : manifests) { - ICEBERG_ASSIGN_OR_RAISE( - auto filtered_manifest, - FilterManifest(schema, specs_by_id, *manifest_ptr, trust_manifest_references, - writer_factory, found_paths)); - filtered.push_back(std::move(filtered_manifest)); + filtered.reserve(filter_results.size()); + for (auto& [manifest, result_found_paths, result_deleted_files] : filter_results) { + found_paths.merge(result_found_paths); + for (auto& deleted_file : result_deleted_files) { + delete_files_.insert(std::move(deleted_file)); + } + filtered.push_back(std::move(manifest)); } ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(found_paths)); diff --git a/src/iceberg/manifest/manifest_filter_manager.h b/src/iceberg/manifest/manifest_filter_manager.h index 55258b2b1..deaf922c8 100644 --- a/src/iceberg/manifest/manifest_filter_manager.h +++ b/src/iceberg/manifest/manifest_filter_manager.h @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -36,6 +37,7 @@ #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/util/data_file_set.h" +#include "iceberg/util/executor.h" #include "iceberg/util/partition_value_util.h" namespace iceberg { @@ -119,6 +121,9 @@ class ICEBERG_EXPORT ManifestFilterManager { /// \brief Returns true if any delete condition has been registered. bool ContainsDeletes() const; + /// \brief Configure an optional executor for manifest filtering. + ManifestFilterManager& PlanWith(OptionalExecutor executor); + /// \brief Apply all accumulated delete conditions to the base snapshot's manifests. /// /// Manifests that cannot possibly contain deleted files are returned unchanged. @@ -172,23 +177,26 @@ class ICEBERG_EXPORT ManifestFilterManager { bool CanTrustManifestReferences( const std::vector& manifests) const; - Result FilterManifest(const std::shared_ptr& schema, - const PartitionSpecsById& specs_by_id, - const ManifestFile& manifest, - bool trust_manifest_references, - const ManifestWriterFactory& writer_factory, - std::unordered_set& found_paths); + struct FilterManifestResult { + ManifestFile manifest; + std::unordered_set found_paths; + DataFileSet deleted_files; + }; + + Result FilterManifest( + const std::shared_ptr& schema, const PartitionSpecsById& specs_by_id, + const ManifestFile& manifest, bool trust_manifest_references, + const ManifestWriterFactory& writer_factory); Result ManifestHasDeletedFiles(const std::vector& entries, const std::shared_ptr& schema, const PartitionSpecsById& specs_by_id, int32_t manifest_spec_id); - Result FilterManifestWithDeletedFiles( + Result FilterManifestWithDeletedFiles( const std::vector& entries, int32_t manifest_spec_id, const std::shared_ptr& schema, const PartitionSpecsById& specs_by_id, - const ManifestWriterFactory& writer_factory, - std::unordered_set& found_paths); + const ManifestWriterFactory& writer_factory); Status ValidateRequiredDeletes( const std::unordered_set& found_paths) const; @@ -219,9 +227,14 @@ class ICEBERG_EXPORT ManifestFilterManager { bool fail_missing_delete_paths_{false}; bool fail_any_delete_{false}; bool case_sensitive_{true}; + OptionalExecutor executor_; + // TODO(zehua): Replace with a thread-safe LRU cache. + std::shared_mutex manifest_evaluator_cache_mutex_; std::unordered_map> manifest_evaluator_cache_; + // TODO(zehua): Replace with a thread-safe LRU cache. + std::shared_mutex residual_evaluator_cache_mutex_; std::unordered_map> residual_evaluator_cache_; }; diff --git a/src/iceberg/manifest/manifest_group.cc b/src/iceberg/manifest/manifest_group.cc index 61bb57da2..1850d1a50 100644 --- a/src/iceberg/manifest/manifest_group.cc +++ b/src/iceberg/manifest/manifest_group.cc @@ -21,6 +21,9 @@ #include #include +#include +#include +#include #include #include #include @@ -42,6 +45,7 @@ #include "iceberg/util/checked_cast.h" #include "iceberg/util/content_file_util.h" #include "iceberg/util/macros.h" +#include "iceberg/util/task_group.h" namespace iceberg { @@ -189,6 +193,12 @@ ManifestGroup& ManifestGroup::ColumnsToKeepStats(std::unordered_set col return *this; } +ManifestGroup& ManifestGroup::PlanWith(OptionalExecutor executor) { + executor_ = executor; + delete_index_builder_.PlanWith(executor); + return *this; +} + Result>> ManifestGroup::PlanFiles() { auto create_file_scan_tasks = [this](std::vector&& entries, @@ -343,10 +353,23 @@ Result> ManifestGroup::MakeReader( Result>> ManifestGroup::ReadEntries() { + // TODO(zehua): Replace with a thread-safe LRU cache. + std::shared_mutex eval_cache_mutex; std::unordered_map> eval_cache; + auto get_manifest_evaluator = [&](int32_t spec_id) -> Result { - if (eval_cache.contains(spec_id)) { - return eval_cache[spec_id].get(); + { + std::shared_lock lock(eval_cache_mutex); + auto iter = eval_cache.find(spec_id); + if (iter != eval_cache.end()) { + return iter->second.get(); + } + } + + std::lock_guard lock(eval_cache_mutex); + auto iter = eval_cache.find(spec_id); + if (iter != eval_cache.end()) { + return iter->second.get(); } auto spec_iter = specs_by_id_.find(spec_id); @@ -376,57 +399,73 @@ ManifestGroup::ReadEntries() { Evaluator::Make(*DataFileFilterSchema(), file_filter_, case_sensitive_)); } - std::unordered_map> result; + std::vector>> manifest_results( + data_manifests_.size()); - // TODO(gangwu): Parallelize reading manifests - for (const auto& manifest : data_manifests_) { - const int32_t spec_id = manifest.partition_spec_id; + auto read_tasks = TaskGroup().SetExecutor(executor_); + for (auto&& [manifest, manifest_result] : + std::views::zip(data_manifests_, manifest_results)) { + read_tasks.Submit([&]() -> Status { + const int32_t spec_id = manifest.partition_spec_id; - ICEBERG_ASSIGN_OR_RAISE(auto manifest_evaluator, get_manifest_evaluator(spec_id)); - ICEBERG_ASSIGN_OR_RAISE(bool should_match, manifest_evaluator->Evaluate(manifest)); - if (!should_match) { - // Skip this manifest because it doesn't match partition filter - continue; - } + ICEBERG_ASSIGN_OR_RAISE(auto manifest_evaluator, get_manifest_evaluator(spec_id)); + ICEBERG_ASSIGN_OR_RAISE(bool should_match, manifest_evaluator->Evaluate(manifest)); + if (!should_match) { + // Skip this manifest because it doesn't match partition filter + return {}; + } - if (ignore_deleted_) { - // only scan manifests that have entries other than deletes - if (!manifest.has_added_files() && !manifest.has_existing_files()) { - continue; + if (ignore_deleted_) { + // only scan manifests that have entries other than deletes + if (!manifest.has_added_files() && !manifest.has_existing_files()) { + return {}; + } } - } - if (ignore_existing_) { - // only scan manifests that have entries other than existing - if (!manifest.has_added_files() && !manifest.has_deleted_files()) { - continue; + if (ignore_existing_) { + // only scan manifests that have entries other than existing + if (!manifest.has_added_files() && !manifest.has_deleted_files()) { + return {}; + } } - } - // Read manifest entries - ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest)); - ICEBERG_ASSIGN_OR_RAISE(auto entries, - ignore_deleted_ ? reader->LiveEntries() : reader->Entries()); + // Read manifest entries + ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest)); + ICEBERG_ASSIGN_OR_RAISE( + auto entries, ignore_deleted_ ? reader->LiveEntries() : reader->Entries()); - for (auto& entry : entries) { - if (ignore_existing_ && entry.status == ManifestStatus::kExisting) { - continue; - } + for (auto& entry : entries) { + if (ignore_existing_ && entry.status == ManifestStatus::kExisting) { + continue; + } - if (data_file_evaluator != nullptr) { - DataFileStructLike data_file(*entry.data_file); - ICEBERG_ASSIGN_OR_RAISE(bool should_match, - data_file_evaluator->Evaluate(data_file)); - if (!should_match) { + if (data_file_evaluator != nullptr) { + DataFileStructLike data_file(*entry.data_file); + ICEBERG_ASSIGN_OR_RAISE(bool should_match, + data_file_evaluator->Evaluate(data_file)); + if (!should_match) { + continue; + } + } + + if (!manifest_entry_predicate_(entry)) { continue; } - } - if (!manifest_entry_predicate_(entry)) { - continue; + manifest_result[spec_id].push_back(std::move(entry)); } + return {}; + }); + } + ICEBERG_RETURN_UNEXPECTED(std::move(read_tasks).Run()); - result[spec_id].push_back(std::move(entry)); + std::unordered_map> result; + for (auto& manifest_result : manifest_results) { + result.merge(manifest_result); + for (auto& [spec_id, entries] : manifest_result) { + auto& spec_entries = result[spec_id]; + spec_entries.insert(spec_entries.end(), std::make_move_iterator(entries.begin()), + std::make_move_iterator(entries.end())); } } diff --git a/src/iceberg/manifest/manifest_group.h b/src/iceberg/manifest/manifest_group.h index 10b552786..cbc935047 100644 --- a/src/iceberg/manifest/manifest_group.h +++ b/src/iceberg/manifest/manifest_group.h @@ -36,6 +36,7 @@ #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/util/error_collector.h" +#include "iceberg/util/executor.h" namespace iceberg { @@ -120,6 +121,9 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector { /// \param column_ids Field IDs of columns whose statistics should be preserved. ManifestGroup& ColumnsToKeepStats(std::unordered_set column_ids); + /// \brief Configure an optional executor for manifest planning. + ManifestGroup& PlanWith(OptionalExecutor executor); + /// \brief Plan scan tasks for all matching data files. Result>> PlanFiles(); @@ -158,6 +162,7 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector { std::function manifest_entry_predicate_; std::vector columns_; std::unordered_set columns_to_keep_stats_; + OptionalExecutor executor_; bool case_sensitive_ = true; bool ignore_deleted_ = false; bool ignore_existing_ = false; diff --git a/src/iceberg/manifest/manifest_merge_manager.cc b/src/iceberg/manifest/manifest_merge_manager.cc index 056dce3f5..b7d317cb2 100644 --- a/src/iceberg/manifest/manifest_merge_manager.cc +++ b/src/iceberg/manifest/manifest_merge_manager.cc @@ -31,6 +31,7 @@ #include "iceberg/manifest/manifest_reader.h" #include "iceberg/table_metadata.h" #include "iceberg/util/macros.h" +#include "iceberg/util/task_group.h" namespace iceberg { @@ -40,6 +41,11 @@ ManifestMergeManager::ManifestMergeManager(int64_t target_size_bytes, min_count_to_merge_(min_count_to_merge), merge_enabled_(merge_enabled) {} +ManifestMergeManager& ManifestMergeManager::PlanWith(OptionalExecutor executor) { + executor_ = executor; + return *this; +} + Result> ManifestMergeManager::MergeManifests( const std::vector& existing_manifests, const std::vector& new_manifests, int64_t snapshot_id, @@ -125,23 +131,31 @@ Result> ManifestMergeManager::MergeGroup( } std::ranges::reverse(bins); - // Process each bin: if the bin contains the newest manifest and is too small, - // pass its contents through unchanged. + std::vector> bin_results(bins.size()); + auto group_tasks = TaskGroup().SetExecutor(executor_); + for (auto&& [bin, result] : std::views::zip(bins, bin_results)) { + group_tasks.Submit([&]() -> Status { + bool contains_first = std::ranges::contains(bin, first); + if (contains_first && std::cmp_less(bin.size(), min_count_to_merge_)) { + result.reserve(bin.size()); + for (const auto* manifest : bin) { + result.push_back(*manifest); + } + } else { + ICEBERG_ASSIGN_OR_RAISE( + auto merged, FlushBin(bin, snapshot_id, metadata, file_io, writer_factory)); + result.push_back(std::move(merged)); + } + return {}; + }); + } + ICEBERG_RETURN_UNEXPECTED(std::move(group_tasks).Run()); + std::vector result; result.reserve(group.size()); - // TODO(Guotao): Flush independent bins in parallel and cache successful merged bins - // for commit retries. - for (auto& bin : bins) { - bool contains_first = std::ranges::find(bin, first) != bin.end(); - if (contains_first && std::cmp_less(bin.size(), min_count_to_merge_)) { - for (const auto* manifest : bin) { - result.push_back(*manifest); - } - } else { - ICEBERG_ASSIGN_OR_RAISE( - auto merged, FlushBin(bin, snapshot_id, metadata, file_io, writer_factory)); - result.push_back(std::move(merged)); - } + for (auto& bin_result : bin_results) { + result.insert(result.end(), std::make_move_iterator(bin_result.begin()), + std::make_move_iterator(bin_result.end())); } return result; diff --git a/src/iceberg/manifest/manifest_merge_manager.h b/src/iceberg/manifest/manifest_merge_manager.h index 16cc8d987..379d57f8a 100644 --- a/src/iceberg/manifest/manifest_merge_manager.h +++ b/src/iceberg/manifest/manifest_merge_manager.h @@ -31,6 +31,7 @@ #include "iceberg/manifest/manifest_writer.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" +#include "iceberg/util/executor.h" namespace iceberg { @@ -56,6 +57,9 @@ class ICEBERG_EXPORT ManifestMergeManager { ManifestMergeManager(const ManifestMergeManager&) = delete; ManifestMergeManager& operator=(const ManifestMergeManager&) = delete; + /// \brief Configure an optional executor for manifest merging. + ManifestMergeManager& PlanWith(OptionalExecutor executor); + /// \brief Merge existing and new manifests according to configured thresholds. /// /// Manifests are grouped by (partition_spec_id, content) — data and delete manifests @@ -109,6 +113,7 @@ class ICEBERG_EXPORT ManifestMergeManager { const int64_t target_size_bytes_; const int32_t min_count_to_merge_; const bool merge_enabled_; + OptionalExecutor executor_; }; } // namespace iceberg diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index a5a60b605..532a5ae91 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -134,6 +134,7 @@ iceberg_sources = files( 'util/snapshot_util.cc', 'util/string_util.cc', 'util/struct_like_set.cc', + 'util/task_group.cc', 'util/temporal_util.cc', 'util/timepoint.cc', 'util/transform_util.cc', diff --git a/src/iceberg/result.h b/src/iceberg/result.h index 765508705..01d17b299 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -19,9 +19,11 @@ #pragma once +#include #include #include #include +#include #include "iceberg/iceberg_export.h" @@ -126,4 +128,11 @@ DEFINE_ERROR_FUNCTION(ValidationFailed) #undef DEFINE_ERROR_FUNCTION +template +concept AsResult = std::derived_from, + Result::value_type>>; + +template +using ResultValueT = typename std::remove_cvref_t::value_type; + } // namespace iceberg diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 6881d34fb..fb4fbf3c5 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -288,6 +288,12 @@ TableScanBuilder& TableScanBuilder::MinRowsRequested( return *this; } +template +TableScanBuilder& TableScanBuilder::PlanWith(Executor& executor) { + context_.plan_executor = std::ref(executor); + return *this; +} + template TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snapshot_id) { ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(), @@ -538,7 +544,8 @@ Result>> DataTableScan::PlanFiles() co .Select(ScanColumns()) .FilterData(filter()) .IgnoreDeleted() - .ColumnsToKeepStats(context_.columns_to_keep_stats); + .ColumnsToKeepStats(context_.columns_to_keep_stats) + .PlanWith(context_.plan_executor); if (context_.ignore_residuals) { manifest_group->IgnoreResiduals(); } @@ -641,7 +648,8 @@ Result>> IncrementalAppendScan::PlanFi entry.status == ManifestStatus::kAdded; }) .IgnoreDeleted() - .ColumnsToKeepStats(context_.columns_to_keep_stats); + .ColumnsToKeepStats(context_.columns_to_keep_stats) + .PlanWith(context_.plan_executor); if (context_.ignore_residuals) { manifest_group->IgnoreResiduals(); @@ -737,7 +745,8 @@ IncrementalChangelogScan::PlanFiles(std::optional from_snapshot_id_excl snapshot_ids.contains(entry.snapshot_id.value()); }) .IgnoreExisting() - .ColumnsToKeepStats(context_.columns_to_keep_stats); + .ColumnsToKeepStats(context_.columns_to_keep_stats) + .PlanWith(context_.plan_executor); if (context_.ignore_residuals) { manifest_group->IgnoreResiduals(); diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 64fb3ffd1..21561d46d 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -32,6 +32,7 @@ #include "iceberg/table_metadata.h" #include "iceberg/type_fwd.h" #include "iceberg/util/error_collector.h" +#include "iceberg/util/executor.h" namespace iceberg { @@ -228,6 +229,7 @@ struct TableScanContext { std::optional to_snapshot_id; std::string branch{}; std::optional min_rows_requested; + OptionalExecutor plan_executor; // Validate the context parameters to see if they have conflicts. [[nodiscard]] Status Validate() const; @@ -302,6 +304,11 @@ class ICEBERG_TEMPLATE_CLASS_EXPORT TableScanBuilder : public ErrorCollector { /// \param num_rows The minimum number of rows requested TableScanBuilder& MinRowsRequested(int64_t num_rows); + /// \brief Configure an executor for manifest planning. + /// + /// The executor is borrowed and must outlive the built scan. + TableScanBuilder& PlanWith(Executor& executor); + /// \brief Request this scan to use the given snapshot by ID. /// \param snapshot_id a snapshot ID /// \note InvalidArgument will be returned if the snapshot cannot be found diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 997d18354..5e2e2b1d1 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -135,6 +135,7 @@ add_iceberg_test(util_test retry_util_test.cc string_util_test.cc struct_like_set_test.cc + task_group_test.cc temporal_util_test.cc transform_util_test.cc truncate_util_test.cc diff --git a/src/iceberg/test/arrow_test.cc b/src/iceberg/test/arrow_test.cc index dcfdb6b56..a954cce88 100644 --- a/src/iceberg/test/arrow_test.cc +++ b/src/iceberg/test/arrow_test.cc @@ -17,28 +17,39 @@ * under the License. */ +#include #include +#include +#include #include +#include +#include #include #include #include #include #include +#include #include +#include #include +#include "iceberg/arrow/arrow_status_internal.h" #include "iceberg/constants.h" +#include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" #include "iceberg/test/matchers.h" +#include "iceberg/util/executor.h" +#include "iceberg/util/task_group.h" namespace iceberg { struct ToArrowSchemaParam { std::shared_ptr iceberg_type; bool optional = true; - std::shared_ptr arrow_type; + std::shared_ptr<::arrow::DataType> arrow_type; }; class ToArrowSchemaTest : public ::testing::TestWithParam {}; @@ -89,17 +100,17 @@ INSTANTIATE_TEST_SUITE_P( ToArrowSchemaParam{.iceberg_type = iceberg::date(), .arrow_type = ::arrow::date32()}, ToArrowSchemaParam{.iceberg_type = iceberg::time(), - .arrow_type = ::arrow::time64(arrow::TimeUnit::MICRO)}, + .arrow_type = ::arrow::time64(::arrow::TimeUnit::MICRO)}, ToArrowSchemaParam{.iceberg_type = iceberg::timestamp(), - .arrow_type = ::arrow::timestamp(arrow::TimeUnit::MICRO)}, + .arrow_type = ::arrow::timestamp(::arrow::TimeUnit::MICRO)}, ToArrowSchemaParam{ .iceberg_type = iceberg::timestamp_tz(), - .arrow_type = ::arrow::timestamp(arrow::TimeUnit::MICRO, "UTC")}, + .arrow_type = ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC")}, ToArrowSchemaParam{.iceberg_type = iceberg::timestamp_ns(), - .arrow_type = ::arrow::timestamp(arrow::TimeUnit::NANO)}, + .arrow_type = ::arrow::timestamp(::arrow::TimeUnit::NANO)}, ToArrowSchemaParam{ .iceberg_type = iceberg::timestamptz_ns(), - .arrow_type = ::arrow::timestamp(arrow::TimeUnit::NANO, "UTC")}, + .arrow_type = ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC")}, ToArrowSchemaParam{.iceberg_type = iceberg::string(), .arrow_type = ::arrow::utf8()}, ToArrowSchemaParam{.iceberg_type = iceberg::binary(), @@ -234,7 +245,7 @@ TEST(ToArrowSchemaTest, MapType) { } struct FromArrowSchemaParam { - std::shared_ptr arrow_type; + std::shared_ptr<::arrow::DataType> arrow_type; bool optional = true; std::shared_ptr iceberg_type; }; @@ -288,17 +299,17 @@ INSTANTIATE_TEST_SUITE_P( .iceberg_type = iceberg::decimal(10, 2)}, FromArrowSchemaParam{.arrow_type = ::arrow::date32(), .iceberg_type = iceberg::date()}, - FromArrowSchemaParam{.arrow_type = ::arrow::time64(arrow::TimeUnit::MICRO), + FromArrowSchemaParam{.arrow_type = ::arrow::time64(::arrow::TimeUnit::MICRO), .iceberg_type = iceberg::time()}, - FromArrowSchemaParam{.arrow_type = ::arrow::timestamp(arrow::TimeUnit::MICRO), + FromArrowSchemaParam{.arrow_type = ::arrow::timestamp(::arrow::TimeUnit::MICRO), .iceberg_type = iceberg::timestamp()}, FromArrowSchemaParam{ - .arrow_type = ::arrow::timestamp(arrow::TimeUnit::MICRO, "UTC"), + .arrow_type = ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC"), .iceberg_type = std::make_shared()}, - FromArrowSchemaParam{.arrow_type = ::arrow::timestamp(arrow::TimeUnit::NANO), + FromArrowSchemaParam{.arrow_type = ::arrow::timestamp(::arrow::TimeUnit::NANO), .iceberg_type = iceberg::timestamp_ns()}, FromArrowSchemaParam{ - .arrow_type = ::arrow::timestamp(arrow::TimeUnit::NANO, "UTC"), + .arrow_type = ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC"), .iceberg_type = iceberg::timestamptz_ns()}, FromArrowSchemaParam{.arrow_type = ::arrow::utf8(), .iceberg_type = iceberg::string()}, @@ -467,4 +478,50 @@ TEST(FromArrowSchemaTest, MapType) { ASSERT_EQ(value.type()->type_id(), TypeId::kInt); } +TEST(ArrowExecutorAdapterTest, RunsTaskGroupOnThreadPool) { +#ifndef ARROW_ENABLE_THREADING + GTEST_SKIP() << "Test requires ARROW_ENABLE_THREADING=ON"; +#endif + + class ArrowExecutorAdapter final : public Executor { + public: + explicit ArrowExecutorAdapter(::arrow::internal::Executor& executor) + : executor_(executor) {} + + Status Submit(ExecutorTask task) override { + ICEBERG_ARROW_RETURN_NOT_OK(executor_.Spawn(std::move(task))); + return {}; + } + + private: + ::arrow::internal::Executor& executor_; + }; + + auto thread_pool = ::arrow::internal::ThreadPool::Make(2).ValueOrDie(); + ArrowExecutorAdapter executor(*thread_pool); + + std::mutex mutex; + std::vector thread_ids; + + auto status = TaskGroup<>() + .SetExecutor(std::ref(executor)) + .Submit([&]() -> Status { + std::lock_guard lock(mutex); + thread_ids.push_back(std::this_thread::get_id()); + return {}; + }) + .Submit([&]() -> Status { + std::lock_guard lock(mutex); + thread_ids.push_back(std::this_thread::get_id()); + return {}; + }) + .Run(); + + EXPECT_THAT(status, IsOk()); + EXPECT_EQ(thread_ids.size(), 2); + EXPECT_NE(thread_ids[0], std::this_thread::get_id()); + EXPECT_NE(thread_ids[1], std::this_thread::get_id()); + EXPECT_TRUE(thread_pool->Shutdown().ok()); +} + } // namespace iceberg diff --git a/src/iceberg/test/executor.h b/src/iceberg/test/executor.h new file mode 100644 index 000000000..45ec9d8e3 --- /dev/null +++ b/src/iceberg/test/executor.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "iceberg/result.h" +#include "iceberg/util/executor.h" + +namespace iceberg::test { + +class InlineExecutor final : public Executor { + public: + explicit InlineExecutor(Status submit_status = {}) + : submit_status_(std::move(submit_status)) {} + + Status Submit(ExecutorTask task) override { + ++submit_count_; + if (!submit_status_.has_value()) { + return std::unexpected(submit_status_.error()); + } + + std::move(task)(); + return {}; + } + + int submit_count() const { return submit_count_; } + + private: + Status submit_status_; + int submit_count_ = 0; +}; + +} // namespace iceberg::test diff --git a/src/iceberg/test/manifest_group_test.cc b/src/iceberg/test/manifest_group_test.cc index 70e2cea99..1cb0a64d1 100644 --- a/src/iceberg/test/manifest_group_test.cc +++ b/src/iceberg/test/manifest_group_test.cc @@ -39,6 +39,7 @@ #include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/table_scan.h" +#include "iceberg/test/executor.h" #include "iceberg/test/matchers.h" #include "iceberg/transform.h" #include "iceberg/type.h" @@ -329,6 +330,44 @@ TEST_P(ManifestGroupTest, IgnoreDeleted) { "/path/to/existing.parquet")); } +TEST_P(ManifestGroupTest, PlanWithExecutor) { + auto version = GetParam(); + + constexpr int64_t kSnapshotId = 1000L; + constexpr int64_t kSequenceNumber = 1L; + const auto partition_a = PartitionValues({Literal::Int(0)}); + const auto partition_b = PartitionValues({Literal::Int(1)}); + + auto manifest_a = + WriteDataManifest(version, kSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kSnapshotId, kSequenceNumber, + MakeDataFile("/path/to/data-a.parquet", partition_a, + partitioned_spec_->spec_id()))}, + partitioned_spec_); + auto manifest_b = + WriteDataManifest(version, kSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kSnapshotId, kSequenceNumber, + MakeDataFile("/path/to/data-b.parquet", partition_b, + partitioned_spec_->spec_id()))}, + partitioned_spec_); + + std::vector manifests = { + WriteAndReadManifestListEntry(version, kSnapshotId, kSequenceNumber, manifest_a), + WriteAndReadManifestListEntry(version, kSnapshotId, kSequenceNumber, manifest_b)}; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + + test::InlineExecutor executor; + group->PlanWith(std::ref(executor)); + + ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles()); + ASSERT_EQ(tasks.size(), 2); + EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre("/path/to/data-a.parquet", + "/path/to/data-b.parquet")); + EXPECT_EQ(executor.submit_count(), 2); +} + TEST_P(ManifestGroupTest, IgnoreExisting) { auto version = GetParam(); diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index b21a264b1..226217ca7 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -99,6 +99,7 @@ iceberg_tests = { 'roaring_position_bitmap_test.cc', 'string_util_test.cc', 'struct_like_set_test.cc', + 'task_group_test.cc', 'temporal_util_test.cc', 'transform_util_test.cc', 'truncate_util_test.cc', diff --git a/src/iceberg/test/retry.h b/src/iceberg/test/retry.h new file mode 100644 index 000000000..ce96ce163 --- /dev/null +++ b/src/iceberg/test/retry.h @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/util/retry_util.h" +#include "iceberg/util/retry_util_internal.h" + +namespace iceberg::test { + +using CommitFailedRetry = retry::OnlyRetryOn; + +using TransientIORetry = + retry::OnlyRetryOn; + +class FakeRetryEnvironment { + public: + using Duration = RetryTestHooks::Duration; + using TimePoint = RetryTestHooks::TimePoint; + + FakeRetryEnvironment() { + hooks_.now = [this]() { return now_; }; + hooks_.sleep_for = [this](Duration duration) { + sleep_durations_.push_back(duration); + now_ += duration; + }; + hooks_.jitter = [this](int32_t base_delay_ms) { + observed_base_delays_ms_.push_back(base_delay_ms); + return base_delay_ms + jitter_offset_ms_; + }; + } + + void Advance(Duration duration) { now_ += duration; } + + void SetJitterOffsetMs(int32_t jitter_offset_ms) { + jitter_offset_ms_ = jitter_offset_ms; + } + + const RetryTestHooks& hooks() const { return hooks_; } + + const std::vector& sleep_durations() const { return sleep_durations_; } + + const std::vector& observed_base_delays_ms() const { + return observed_base_delays_ms_; + } + + private: + RetryTestHooks hooks_; + TimePoint now_{}; + int32_t jitter_offset_ms_ = 0; + std::vector sleep_durations_; + std::vector observed_base_delays_ms_; +}; + +} // namespace iceberg::test diff --git a/src/iceberg/test/retry_util_test.cc b/src/iceberg/test/retry_util_test.cc index ead221910..1b2f35578 100644 --- a/src/iceberg/test/retry_util_test.cc +++ b/src/iceberg/test/retry_util_test.cc @@ -27,6 +27,7 @@ #include "iceberg/result.h" #include "iceberg/test/matchers.h" +#include "iceberg/test/retry.h" #include "iceberg/util/retry_util_internal.h" namespace iceberg { @@ -40,50 +41,16 @@ struct NonResultReturningTask { int operator()() const { return 1; } }; +using test::CommitFailedRetry; +using test::FakeRetryEnvironment; +using test::TransientIORetry; + static_assert(detail::RetryTask); static_assert(!detail::RetryTask); -static_assert(requires(RetryRunner runner, ResultReturningTask task) { +static_assert(requires(RetryRunner runner, ResultReturningTask task) { { runner.Run(task) } -> std::same_as>; }); - -class FakeRetryEnvironment { - public: - using Duration = RetryTestHooks::Duration; - using TimePoint = RetryTestHooks::TimePoint; - - FakeRetryEnvironment() { - hooks_.now = [this]() { return now_; }; - hooks_.sleep_for = [this](Duration duration) { - sleep_durations_.push_back(duration); - now_ += duration; - }; - hooks_.jitter = [this](int32_t base_delay_ms) { - observed_base_delays_ms_.push_back(base_delay_ms); - return base_delay_ms + jitter_offset_ms_; - }; - } - - void Advance(Duration duration) { now_ += duration; } - - void SetJitterOffsetMs(int32_t jitter_offset_ms) { - jitter_offset_ms_ = jitter_offset_ms; - } - - const RetryTestHooks& hooks() const { return hooks_; } - - const std::vector& sleep_durations() const { return sleep_durations_; } - - const std::vector& observed_base_delays_ms() const { - return observed_base_delays_ms_; - } - - private: - RetryTestHooks hooks_; - TimePoint now_{}; - int32_t jitter_offset_ms_ = 0; - std::vector sleep_durations_; - std::vector observed_base_delays_ms_; -}; +static_assert(retry::NoRetry::kMode == retry::RetryPolicyMode::kNoRetry); } // namespace @@ -91,11 +58,10 @@ TEST(RetryRunnerTest, SuccessOnFirstAttempt) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 3, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -113,11 +79,10 @@ TEST(RetryRunnerTest, RetryOnceThenSucceed) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 3, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -138,11 +103,10 @@ TEST(RetryRunnerTest, MaxAttemptsExhausted) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 2, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 2, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -159,11 +123,10 @@ TEST(RetryRunnerTest, OnlyRetryOnFilter) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 3, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -180,11 +143,10 @@ TEST(RetryRunnerTest, OnlyRetryOnMatchingError) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 2, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 2, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -201,41 +163,15 @@ TEST(RetryRunnerTest, OnlyRetryOnMatchingError) { EXPECT_EQ(attempts, 3); } -TEST(RetryRunnerTest, OnlyRetryOnTakesPrecedenceOverStopRetryOn) { - int call_count = 0; - int32_t attempts = 0; - - auto result = RetryRunner(RetryConfig{.num_retries = 2, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn(ErrorKind::kCommitFailed) - .StopRetryOn(ErrorKind::kCommitFailed) - .Run( - [&]() -> Result { - ++call_count; - if (call_count == 1) { - return CommitFailed("transient"); - } - return 100; - }, - &attempts); - - EXPECT_THAT(result, IsOk()); - EXPECT_EQ(*result, 100); - EXPECT_EQ(call_count, 2); - EXPECT_EQ(attempts, 2); -} - TEST(RetryRunnerTest, StopRetryOnMatchingError) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 5, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .StopRetryOn(ErrorKind::kCommitStateUnknown) + auto result = RetryRunner>( + RetryConfig{.num_retries = 5, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -252,11 +188,11 @@ TEST(RetryRunnerTest, StopRetryOnNonMatchingErrorAllowsRetry) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 2, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .StopRetryOn({ErrorKind::kCommitStateUnknown}) + auto result = RetryRunner>( + RetryConfig{.num_retries = 2, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -277,11 +213,11 @@ TEST(RetryRunnerTest, ZeroRetriesAllowsUnsetPolicyAndSkipsBackoffValidation) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 0, - .min_wait_ms = 0, - .max_wait_ms = 0, - .total_timeout_ms = 5000, - .scale_factor = 0.5}) + auto result = RetryRunner(RetryConfig{.num_retries = 0, + .min_wait_ms = 0, + .max_wait_ms = 0, + .total_timeout_ms = 5000, + .scale_factor = 0.5}) .Run( [&]() -> Result { ++call_count; @@ -298,10 +234,10 @@ TEST(RetryRunnerTest, NegativeRetriesFailsBeforeTaskRuns) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = -1, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) + auto result = RetryRunner(RetryConfig{.num_retries = -1, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -360,8 +296,7 @@ TEST(RetryRunnerTest, InvalidBackoffConfigFailsBeforeTaskRuns) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(test_case.config) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(test_case.config) .Run( [&]() -> Result { ++call_count; @@ -377,36 +312,14 @@ TEST(RetryRunnerTest, InvalidBackoffConfigFailsBeforeTaskRuns) { } } -TEST(RetryRunnerTest, UnsetRetryPolicyFailsBeforeTaskRuns) { +TEST(RetryRunnerTest, NoRetryWithRetries) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 1, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .Run( - [&]() -> Result { - ++call_count; - return CommitFailed("fail"); - }, - &attempts); - - EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); - EXPECT_THAT(result, HasErrorMessage("Retry policy must be explicitly configured")); - EXPECT_EQ(call_count, 0); - EXPECT_EQ(attempts, 0); -} - -TEST(RetryRunnerTest, EmptyOnlyRetryOnPolicyFailsBeforeTaskRuns) { - int call_count = 0; - int32_t attempts = 0; - - auto result = RetryRunner(RetryConfig{.num_retries = 1, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn(std::initializer_list{}) + auto result = RetryRunner(RetryConfig{.num_retries = 1, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) .Run( [&]() -> Result { ++call_count; @@ -416,30 +329,7 @@ TEST(RetryRunnerTest, EmptyOnlyRetryOnPolicyFailsBeforeTaskRuns) { EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); EXPECT_THAT(result, - HasErrorMessage("Retry policy must include at least one error kind")); - EXPECT_EQ(call_count, 0); - EXPECT_EQ(attempts, 0); -} - -TEST(RetryRunnerTest, EmptyStopRetryOnPolicyFailsBeforeTaskRuns) { - int call_count = 0; - int32_t attempts = 0; - - auto result = RetryRunner(RetryConfig{.num_retries = 1, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .StopRetryOn({}) - .Run( - [&]() -> Result { - ++call_count; - return CommitFailed("fail"); - }, - &attempts); - - EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); - EXPECT_THAT(result, - HasErrorMessage("Retry policy must include at least one error kind")); + HasErrorMessage("Retry policy must be enabled when num_retries > 0")); EXPECT_EQ(call_count, 0); EXPECT_EQ(attempts, 0); } @@ -450,11 +340,10 @@ TEST(RetryRunnerTest, TotalTimeoutStopsBeforeStartingAnotherAttempt) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 3, - .min_wait_ms = 20, - .max_wait_ms = 20, - .total_timeout_ms = 15}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 20, + .max_wait_ms = 20, + .total_timeout_ms = 15}) .Run( [&]() -> Result { ++call_count; @@ -478,11 +367,10 @@ TEST(RetryRunnerTest, TotalTimeoutStopsWhenDelayEqualsRemainingBudget) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 3, - .min_wait_ms = 10, - .max_wait_ms = 10, - .total_timeout_ms = 20}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 10, + .max_wait_ms = 10, + .total_timeout_ms = 20}) .Run( [&]() -> Result { ++call_count; @@ -504,11 +392,10 @@ TEST(RetryRunnerTest, NonPositiveTotalTimeoutDisablesDeadline) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 2, - .min_wait_ms = 10, - .max_wait_ms = 10, - .total_timeout_ms = 0}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 2, + .min_wait_ms = 10, + .max_wait_ms = 10, + .total_timeout_ms = 0}) .Run( [&]() -> Result { ++call_count; @@ -537,11 +424,10 @@ TEST(RetryRunnerTest, RetryDelayDoesNotExceedMaxWaitAfterJitter) { int call_count = 0; int32_t attempts = 0; - auto result = RetryRunner(RetryConfig{.num_retries = 1, - .min_wait_ms = 10, - .max_wait_ms = 10, - .total_timeout_ms = 0}) - .OnlyRetryOn(ErrorKind::kCommitFailed) + auto result = RetryRunner(RetryConfig{.num_retries = 1, + .min_wait_ms = 10, + .max_wait_ms = 10, + .total_timeout_ms = 0}) .Run( [&]() -> Result { ++call_count; @@ -603,24 +489,26 @@ TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) { int call_count = 0; int32_t attempts = 0; - auto result = - RetryRunner(RetryConfig{.num_retries = 5, - .min_wait_ms = 1, - .max_wait_ms = 10, - .total_timeout_ms = 5000}) - .OnlyRetryOn({ErrorKind::kCommitFailed, ErrorKind::kServiceUnavailable}) - .Run( - [&]() -> Result { - ++call_count; - if (call_count == 1) { - return CommitFailed("conflict"); - } - if (call_count == 2) { - return ServiceUnavailable("server busy"); - } - return 77; - }, - &attempts); + using CommitOrUnavailable = + retry::RetryPolicy; + + auto result = RetryRunner(RetryConfig{.num_retries = 5, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Result { + ++call_count; + if (call_count == 1) { + return CommitFailed("conflict"); + } + if (call_count == 2) { + return ServiceUnavailable("server busy"); + } + return 77; + }, + &attempts); EXPECT_THAT(result, IsOk()); EXPECT_EQ(*result, 77); @@ -628,4 +516,50 @@ TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) { EXPECT_EQ(attempts, 3); } +TEST(RetryRunnerTest, RetriesTransientIO) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Status { + ++call_count; + if (call_count == 1) { + return IOError("read failed"); + } + if (call_count == 2) { + return ServiceUnavailable("server busy"); + } + return {}; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); +} + +TEST(RetryRunnerTest, DoesNotRetryNotFound) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner(RetryConfig{.num_retries = 3, + .min_wait_ms = 1, + .max_wait_ms = 10, + .total_timeout_ms = 5000}) + .Run( + [&]() -> Status { + ++call_count; + return NotFound("missing file"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kNotFound)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + } // namespace iceberg diff --git a/src/iceberg/test/table_scan_test.cc b/src/iceberg/test/table_scan_test.cc index 11905a870..c0863e80d 100644 --- a/src/iceberg/test/table_scan_test.cc +++ b/src/iceberg/test/table_scan_test.cc @@ -30,6 +30,7 @@ #include "iceberg/expression/expressions.h" #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" +#include "iceberg/test/executor.h" #include "iceberg/test/scan_test_base.h" namespace iceberg { @@ -401,6 +402,72 @@ TEST_P(TableScanTest, PlanFilesWithMultipleManifests) { "/path/to/data2.parquet")); } +TEST_P(TableScanTest, PlanWithExecutor) { + auto version = GetParam(); + + const auto partition_a = PartitionValues({Literal::Int(0)}); + const auto partition_b = PartitionValues({Literal::Int(1)}); + + std::vector data_entries_1{MakeEntry( + ManifestStatus::kAdded, /*snapshot_id=*/1000L, /*sequence_number=*/1, + MakeDataFile("/path/to/data1.parquet", partition_a, partitioned_spec_->spec_id()))}; + auto data_manifest_1 = WriteDataManifest(version, /*snapshot_id=*/1000L, + std::move(data_entries_1), partitioned_spec_); + + std::vector data_entries_2{MakeEntry( + ManifestStatus::kAdded, /*snapshot_id=*/1000L, /*sequence_number=*/1, + MakeDataFile("/path/to/data2.parquet", partition_b, partitioned_spec_->spec_id()))}; + auto data_manifest_2 = WriteDataManifest(version, /*snapshot_id=*/1000L, + std::move(data_entries_2), partitioned_spec_); + + std::string manifest_list_path = + WriteManifestList(version, /*snapshot_id=*/1000L, /*sequence_number=*/1, + {data_manifest_1, data_manifest_2}); + + auto timestamp_ms = TimePointMsFromUnixMs(1609459200000L); + auto snapshot_with_manifests = + std::make_shared(Snapshot{.snapshot_id = 1000L, + .parent_snapshot_id = std::nullopt, + .sequence_number = 1L, + .timestamp_ms = timestamp_ms, + .manifest_list = manifest_list_path, + .summary = {}, + .schema_id = schema_->schema_id()}); + + auto metadata_with_manifests = std::make_shared( + TableMetadata{.format_version = 2, + .table_uuid = "test-table-uuid", + .location = "/tmp/table", + .last_sequence_number = 1L, + .last_updated_ms = timestamp_ms, + .last_column_id = 2, + .schemas = {schema_}, + .current_schema_id = schema_->schema_id(), + .partition_specs = {partitioned_spec_, unpartitioned_spec_}, + .default_spec_id = partitioned_spec_->spec_id(), + .last_partition_id = 1000, + .current_snapshot_id = 1000L, + .snapshots = {snapshot_with_manifests}, + .snapshot_log = {SnapshotLogEntry{.timestamp_ms = timestamp_ms, + .snapshot_id = 1000L}}, + .default_sort_order_id = 0, + .refs = {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 1000L, + .retention = SnapshotRef::Branch{}, + })}}}); + + test::InlineExecutor executor; + ICEBERG_UNWRAP_OR_FAIL(auto builder, + DataTableScanBuilder::Make(metadata_with_manifests, file_io_)); + builder->PlanWith(executor); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 2); + EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre("/path/to/data1.parquet", + "/path/to/data2.parquet")); + EXPECT_EQ(executor.submit_count(), 2); +} + TEST_P(TableScanTest, PlanFilesWithFilter) { auto version = GetParam(); diff --git a/src/iceberg/test/task_group_test.cc b/src/iceberg/test/task_group_test.cc new file mode 100644 index 000000000..a14f01827 --- /dev/null +++ b/src/iceberg/test/task_group_test.cc @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/util/task_group.h" + +#include +#include +#include +#include +#include + +#include + +#include "iceberg/result.h" +#include "iceberg/test/executor.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/retry.h" +#include "iceberg/util/functional.h" + +namespace iceberg { + +namespace { + +RetryConfig FastRetryConfig(int32_t num_retries = 2) { + return RetryConfig{.num_retries = num_retries, + .min_wait_ms = 1, + .max_wait_ms = 1, + .total_timeout_ms = 0}; +} + +} // namespace + +TEST(TaskGroupCompileTest, TaskConcepts) { + auto move_only_once_lambda = [value = std::make_unique(1)]() mutable -> Status { + return {}; + }; + using MoveOnlyOnceLambda = decltype(move_only_once_lambda); + + auto copyable_mutable_lambda = [attempt = 0]() mutable -> Status { + return ++attempt > 0 ? Status{} : Status{}; + }; + using CopyableMutableLambda = decltype(copyable_mutable_lambda); + + static_assert(!std::copy_constructible>); + static_assert(!std::default_initializable>); + static_assert(std::move_constructible>); + + static_assert(internal::OnceStatusTask); + static_assert(!internal::OnceStatusTask); + static_assert(internal::OnceStatusTask); + + static_assert(internal::RetryableStatusTask); + static_assert(internal::RetryableStatusTask); + static_assert(!internal::RetryableStatusTask); +} + +TEST(FnOnceTest, SupportsMoveOnlyCapture) { + auto value = std::make_unique(41); + FnOnce task([value = std::move(value)]() { return *value + 1; }); + + EXPECT_EQ(std::move(task)(), 42); +} + +TEST(TaskGroupTest, UsesExecutor) { + test::InlineExecutor executor; + TaskGroup<> group; + bool ran = false; + + group.SetExecutor(std::ref(executor)); + group.Submit([&]() -> Status { + ran = true; + return {}; + }); + + EXPECT_THAT(std::move(group).Run(), IsOk()); + EXPECT_TRUE(ran); + EXPECT_EQ(executor.submit_count(), 1); +} + +TEST(TaskGroupTest, ReturnsSubmitError) { + test::InlineExecutor executor(ServiceUnavailable("executor busy")); + TaskGroup<> group; + + group.SetExecutor(std::ref(executor)); + group.Submit([]() -> Status { return {}; }); + + EXPECT_THAT(std::move(group).Run(), IsError(ErrorKind::kServiceUnavailable)); + EXPECT_EQ(executor.submit_count(), 1); +} + +TEST(TaskGroupTest, DirectMoveOnlyTask) { + TaskGroup<> group; + auto value = std::make_unique(7); + int observed = 0; + + group.Submit([value = std::move(value), &observed]() mutable -> Status { + observed = *value; + return {}; + }); + + EXPECT_THAT(std::move(group).Run(), IsOk()); + EXPECT_EQ(observed, 7); +} + +TEST(TaskGroupTest, ClearsExecutor) { + test::InlineExecutor executor; + TaskGroup<> group; + int call_count = 0; + + group.SetExecutor(std::ref(executor)); + group.SetExecutor(std::nullopt); + group.Submit([&]() -> Status { + ++call_count; + return {}; + }); + + EXPECT_THAT(std::move(group).Run(), IsOk()); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(executor.submit_count(), 0); +} + +TEST(TaskGroupTest, FluentSubmit) { + test::InlineExecutor executor; + int call_count = 0; + + auto status = TaskGroup<>() + .SetExecutor(std::ref(executor)) + .Submit([&]() -> Status { + ++call_count; + return {}; + }) + .Submit([&]() -> Status { + ++call_count; + return {}; + }) + .Run(); + + EXPECT_THAT(status, IsOk()); + EXPECT_EQ(call_count, 2); + EXPECT_EQ(executor.submit_count(), 2); +} + +TEST(TaskGroupTest, DirectAggregatesErrors) { + TaskGroup<> group; + int call_count = 0; + + group.Submit([&]() -> Status { + ++call_count; + return IOError("first failure"); + }); + group.Submit([&]() -> Status { + ++call_count; + return ValidationFailed("second failure"); + }); + + auto status = std::move(group).Run(); + EXPECT_THAT(status, IsError(ErrorKind::kIOError)); + EXPECT_THAT(status, HasErrorMessage("Task group failed with 2 errors")); + EXPECT_THAT(status, HasErrorMessage("first failure")); + EXPECT_THAT(status, HasErrorMessage("second failure")); + EXPECT_EQ(call_count, 2); +} + +TEST(TaskGroupTest, ParallelSubmitsAll) { + test::InlineExecutor executor; + TaskGroup<> group; + int call_count = 0; + + group.SetExecutor(std::ref(executor)); + group.Submit([&]() -> Status { + ++call_count; + return {}; + }); + group.Submit([&]() -> Status { + ++call_count; + return {}; + }); + + EXPECT_THAT(std::move(group).Run(), IsOk()); + EXPECT_EQ(call_count, 2); + EXPECT_EQ(executor.submit_count(), 2); +} + +TEST(TaskGroupTest, ParallelAggregatesErrors) { + test::InlineExecutor executor; + TaskGroup<> group; + int call_count = 0; + + group.SetExecutor(std::ref(executor)); + group.Submit([&]() -> Status { + ++call_count; + return IOError("first failure"); + }); + group.Submit([&]() -> Status { + ++call_count; + return ValidationFailed("second failure"); + }); + + auto status = std::move(group).Run(); + EXPECT_THAT(status, IsError(ErrorKind::kIOError)); + EXPECT_THAT(status, HasErrorMessage("Task group failed with 2 errors")); + EXPECT_THAT(status, HasErrorMessage("first failure")); + EXPECT_THAT(status, HasErrorMessage("second failure")); + EXPECT_EQ(call_count, 2); + EXPECT_EQ(executor.submit_count(), 2); +} + +TEST(TaskGroupTest, ParallelSubmitErrors) { + test::InlineExecutor executor(ServiceUnavailable("executor busy")); + TaskGroup<> group; + int call_count = 0; + + group.SetExecutor(std::ref(executor)); + group.Submit([&]() -> Status { + ++call_count; + return {}; + }); + group.Submit([&]() -> Status { + ++call_count; + return {}; + }); + + auto status = std::move(group).Run(); + EXPECT_THAT(status, IsError(ErrorKind::kServiceUnavailable)); + EXPECT_THAT(status, HasErrorMessage("Task group failed with 2 errors")); + EXPECT_THAT(status, HasErrorMessage("executor busy")); + EXPECT_EQ(call_count, 0); + EXPECT_EQ(executor.submit_count(), 2); +} + +TEST(TaskGroupTest, RetriesTasks) { + test::FakeRetryEnvironment fake_retry; + ScopedRetryTestHooks retry_hooks(fake_retry.hooks()); + TaskGroup group{FastRetryConfig()}; + int call_count = 0; + + group.Submit([&]() -> Status { + ++call_count; + if (call_count == 1) { + return IOError("transient read failure"); + } + return {}; + }); + + EXPECT_THAT(std::move(group).Run(), IsOk()); + EXPECT_EQ(call_count, 2); + EXPECT_EQ(fake_retry.sleep_durations(), + std::vector( + {test::FakeRetryEnvironment::Duration(1)})); +} + +TEST(TaskGroupTest, DefaultRetryConfig) { + TaskGroup group; + int call_count = 0; + + group.Submit([&]() -> Status { + ++call_count; + return {}; + }); + + EXPECT_THAT(std::move(group).Run(), IsOk()); + EXPECT_EQ(call_count, 1); +} + +TEST(TaskGroupTest, DoesNotRetryNotFound) { + TaskGroup group{FastRetryConfig()}; + int call_count = 0; + + group.Submit([&]() -> Status { + ++call_count; + return NotFound("missing manifest"); + }); + + EXPECT_THAT(std::move(group).Run(), IsError(ErrorKind::kNotFound)); + EXPECT_EQ(call_count, 1); +} + +TEST(TaskGroupTest, RetryUsesExecutor) { + test::FakeRetryEnvironment fake_retry; + ScopedRetryTestHooks retry_hooks(fake_retry.hooks()); + test::InlineExecutor executor; + TaskGroup group{FastRetryConfig()}; + int first_task_calls = 0; + int second_task_calls = 0; + + group.SetExecutor(std::ref(executor)); + group.Submit([&]() -> Status { + ++first_task_calls; + if (first_task_calls == 1) { + return ServiceUnavailable("server busy"); + } + return {}; + }); + group.Submit([&]() -> Status { + ++second_task_calls; + return {}; + }); + + EXPECT_THAT(std::move(group).Run(), IsOk()); + EXPECT_EQ(first_task_calls, 2); + EXPECT_EQ(second_task_calls, 1); + EXPECT_EQ(executor.submit_count(), 2); +} + +} // namespace iceberg diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index c9ac9e4cd..12107d872 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -43,6 +44,7 @@ #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/string_util.h" +#include "iceberg/util/task_group.h" namespace iceberg { @@ -693,6 +695,11 @@ ExpireSnapshots& ExpireSnapshots::DeleteWith( return *this; } +ExpireSnapshots& ExpireSnapshots::PlanWith(Executor& executor) { + plan_executor_ = std::ref(executor); + return *this; +} + ExpireSnapshots& ExpireSnapshots::CleanupLevel(enum CleanupLevel level) { cleanup_level_ = level; return *this; @@ -865,20 +872,34 @@ Result ExpireSnapshots::Apply() { }); if (clean_expired_metadata_) { + std::vector> reachable_spec_ids(ids_to_retain.size()); + std::vector> reachable_schema_ids(ids_to_retain.size()); + TaskGroup<> metadata_tasks; + metadata_tasks.SetExecutor(plan_executor_); + for (auto&& [snapshot_id, spec_ids, schema_id] : + std::views::zip(ids_to_retain, reachable_spec_ids, reachable_schema_ids)) { + metadata_tasks.Submit([&]() -> Status { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, base.SnapshotById(snapshot_id)); + SnapshotCache snapshot_cache(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, + snapshot_cache.Manifests(ctx_->table->io())); + for (const auto& manifest : manifests) { + spec_ids.insert(manifest.partition_spec_id); + } + schema_id = snapshot->schema_id; + return {}; + }); + } + ICEBERG_RETURN_UNEXPECTED(std::move(metadata_tasks).Run()); + std::unordered_set reachable_specs = {base.default_spec_id}; std::unordered_set reachable_schemas = {base.current_schema_id}; - // TODO(xiao.dong) parallel processing - for (int64_t snapshot_id : ids_to_retain) { - ICEBERG_ASSIGN_OR_RAISE(auto snapshot, base.SnapshotById(snapshot_id)); - SnapshotCache snapshot_cache(snapshot.get()); - ICEBERG_ASSIGN_OR_RAISE(auto manifests, - snapshot_cache.Manifests(ctx_->table->io())); - for (const auto& manifest : manifests) { - reachable_specs.insert(manifest.partition_spec_id); - } - if (snapshot->schema_id.has_value()) { - reachable_schemas.insert(snapshot->schema_id.value()); + for (auto&& [spec_ids, schema_id] : + std::views::zip(reachable_spec_ids, reachable_schema_ids)) { + reachable_specs.merge(spec_ids); + if (schema_id.has_value()) { + reachable_schemas.insert(schema_id.value()); } } diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h index a5b6e3b32..8ed9d5b3c 100644 --- a/src/iceberg/update/expire_snapshots.h +++ b/src/iceberg/update/expire_snapshots.h @@ -32,6 +32,7 @@ #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/update/pending_update.h" +#include "iceberg/util/executor.h" #include "iceberg/util/timepoint.h" /// \file iceberg/update/expire_snapshots.h @@ -119,6 +120,9 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { /// \return Reference to this for method chaining. ExpireSnapshots& DeleteWith(std::function delete_func); + /// \brief Configure an executor for planning expired snapshot metadata. + ExpireSnapshots& PlanWith(Executor& executor); + /// \brief Configures the cleanup level for expired files. /// /// This method provides fine-grained control over which files are cleaned up during @@ -182,6 +186,7 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { std::function delete_func_; std::vector snapshot_ids_to_expire_; enum CleanupLevel cleanup_level_ { CleanupLevel::kAll }; + OptionalExecutor plan_executor_; bool clean_expired_metadata_{false}; bool specified_snapshot_id_{false}; diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index a59ebdc72..4640bac00 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc @@ -30,11 +30,12 @@ #include "iceberg/manifest/manifest_writer.h" #include "iceberg/manifest/rolling_manifest_writer.h" #include "iceberg/partition_summary_internal.h" -#include "iceberg/table.h" +#include "iceberg/table.h" // IWYU pragma: keep #include "iceberg/transaction.h" #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/string_util.h" +#include "iceberg/util/task_group.h" #include "iceberg/util/uuid.h" namespace iceberg { @@ -163,7 +164,7 @@ SnapshotUpdate::SnapshotUpdate(std::shared_ptr ctx) target_manifest_size_bytes_( base().properties.Get(TableProperties::kManifestTargetSizeBytes)) {} -// TODO(xxx): write manifests in parallel +// TODO(xxx): Split files into independent rolling-writer groups before parallelizing. Result> SnapshotUpdate::WriteDataManifests( std::span> files, const std::shared_ptr& spec, @@ -190,7 +191,7 @@ Result> SnapshotUpdate::WriteDataManifests( return rolling_writer.ToManifestFiles(); } -// TODO(xxx): write manifests in parallel +// TODO(xxx): Split files into independent rolling-writer groups before parallelizing. Result> SnapshotUpdate::WriteDeleteManifests( std::span> files, const std::shared_ptr& spec) { @@ -250,13 +251,17 @@ Result SnapshotUpdate::Apply() { } ICEBERG_ASSIGN_OR_RAISE(auto manifests, Apply(base(), parent_snapshot)); + auto metadata_tasks = TaskGroup().SetExecutor(plan_executor_); for (auto& manifest : manifests) { if (manifest.added_snapshot_id != kInvalidSnapshotId) { continue; } - // TODO(xxx): read in parallel and cache enriched manifests for retries - ICEBERG_ASSIGN_OR_RAISE(manifest, AddMetadata(manifest, ctx_->table->io(), base())); + metadata_tasks.Submit([&manifest, this]() -> Status { + ICEBERG_ASSIGN_OR_RAISE(manifest, AddMetadata(manifest, ctx_->table->io(), base())); + return {}; + }); } + ICEBERG_RETURN_UNEXPECTED(std::move(metadata_tasks).Run()); std::string manifest_list_path = ManifestListPath(); manifest_lists_.push_back(manifest_list_path); diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index f48e5f44d..3c193b1e5 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -33,6 +33,7 @@ #include "iceberg/snapshot.h" #include "iceberg/type_fwd.h" #include "iceberg/update/pending_update.h" +#include "iceberg/util/executor.h" namespace iceberg { @@ -77,6 +78,12 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { return self; } + /// \brief Configure an executor for manifest planning work. + auto& ScanManifestsWith(this auto& self, Executor& executor) { + self.plan_executor_ = std::ref(executor); + return self; + } + /// \brief Perform operations on a particular branch /// /// \param branch Which is name of SnapshotRef of type branch @@ -230,6 +237,7 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { std::vector manifest_lists_; const int64_t target_manifest_size_bytes_; std::optional snapshot_id_; + OptionalExecutor plan_executor_; bool stage_only_{false}; std::function delete_func_; std::string target_branch_{SnapshotRef::kMainBranch}; diff --git a/src/iceberg/util/executor.h b/src/iceberg/util/executor.h new file mode 100644 index 000000000..d7c8cb51a --- /dev/null +++ b/src/iceberg/util/executor.h @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/util/functional.h" + +namespace iceberg { + +using ExecutorTask = FnOnce; + +class ICEBERG_EXPORT Executor { + public: + virtual ~Executor() = default; + + /// \brief Schedule a task for execution. + virtual Status Submit(ExecutorTask task) = 0; +}; + +using OptionalExecutor = std::optional>; + +} // namespace iceberg diff --git a/src/iceberg/util/functional.h b/src/iceberg/util/functional.h new file mode 100644 index 000000000..c5ab2164b --- /dev/null +++ b/src/iceberg/util/functional.h @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Borrowed the file from Apache Arrow: +// https://github.com/apache/arrow/blob/main/cpp/src/arrow/util/functional.h + +#pragma once + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" + +namespace iceberg { + +namespace internal { + +template +concept RvalueInvocable = std::constructible_from, Fn> && + std::move_constructible> && + std::is_invocable_r_v&&, Args...>; + +} // namespace internal + +template +class FnOnce; + +template +class ICEBERG_TEMPLATE_CLASS_EXPORT FnOnce { + public: + template + requires(!std::same_as, FnOnce> && + internal::RvalueInvocable) + explicit FnOnce(Fn&& fn) : impl_(std::make_unique>(std::forward(fn))) {} + + FnOnce(FnOnce&&) noexcept = default; + FnOnce& operator=(FnOnce&&) noexcept = default; + FnOnce(const FnOnce&) = delete; + FnOnce& operator=(const FnOnce&) = delete; + + R operator()(Args... args) && { + return std::move(*impl_).Invoke(std::forward(args)...); + } + + private: + struct Impl { + virtual ~Impl() = default; + virtual R Invoke(Args&&... args) && = 0; + }; + + template + struct ImplFor final : Impl { + explicit ImplFor(Fn&& fn) : fn_(std::forward(fn)) {} + R Invoke(Args&&... args) && override { + return std::invoke(std::move(fn_), std::forward(args)...); + } + std::remove_cvref_t fn_; + }; + + std::unique_ptr impl_; +}; + +} // namespace iceberg diff --git a/src/iceberg/util/lazy.h b/src/iceberg/util/lazy.h index be7bcd41d..b31cbc786 100644 --- a/src/iceberg/util/lazy.h +++ b/src/iceberg/util/lazy.h @@ -34,20 +34,14 @@ namespace iceberg { template class Lazy { - template - struct Trait; - template - struct Trait { - using ReturnType = R::value_type; - }; + static R ExtractReturnType(R (*)(Args...)); // only declaration, never defined - using T = Trait::ReturnType; + using T = ResultValueT; public: template - requires std::invocable && - std::same_as, Result> + requires std::invocable Result> Get(Args&&... args) const { std::call_once( flag_, [this, &args...]() { value_ = InitFunc(std::forward(args)...); }); diff --git a/src/iceberg/util/meson.build b/src/iceberg/util/meson.build index d70855016..f3a45e599 100644 --- a/src/iceberg/util/meson.build +++ b/src/iceberg/util/meson.build @@ -26,8 +26,10 @@ install_headers( 'decimal.h', 'endian.h', 'error_collector.h', + 'executor.h', 'formattable.h', 'formatter.h', + 'functional.h', 'int128.h', 'lazy.h', 'location_util.h', @@ -37,6 +39,7 @@ install_headers( 'retry_util.h', 'string_util.h', 'struct_like_set.h', + 'task_group.h', 'temporal_util.h', 'timepoint.h', 'transform_util.h', diff --git a/src/iceberg/util/retry_util.cc b/src/iceberg/util/retry_util.cc index d6e0d509e..63cfc61ee 100644 --- a/src/iceberg/util/retry_util.cc +++ b/src/iceberg/util/retry_util.cc @@ -31,6 +31,7 @@ #include "iceberg/util/retry_util_internal.h" namespace iceberg { + namespace { const RetryTestHooks*& ActiveRetryTestHooks() { @@ -79,7 +80,7 @@ void SetActiveRetryTestHooks(const RetryTestHooks* hooks) { ActiveRetryTestHooks() = hooks; } -Status RetryRunner::ValidateConfig() const { +Status detail::RetryRunnerBase::ValidateConfig() const { if (config_.num_retries < 0) { return InvalidArgument("num_retries must be non-negative, got {}", config_.num_retries); @@ -103,48 +104,24 @@ Status RetryRunner::ValidateConfig() const { return InvalidArgument("scale_factor must be finite and at least 1.0, got {}", config_.scale_factor); } - if (retry_policy_mode_ == RetryPolicyMode::kUnset) { - return InvalidArgument( - "Retry policy must be explicitly configured with OnlyRetryOn(...) or " - "StopRetryOn(...) when num_retries > 0"); - } - if (retry_error_kinds_.empty()) { - return InvalidArgument("Retry policy must include at least one error kind"); - } - return {}; } -std::optional RetryRunner::ComputeDeadline() const { +std::optional +detail::RetryRunnerBase::ComputeDeadline() const { if (config_.total_timeout_ms <= 0) { return std::nullopt; } return RetryNow() + Duration(config_.total_timeout_ms); } -bool RetryRunner::HasTimedOut(const std::optional& deadline) const { +bool detail::RetryRunnerBase::HasTimedOut( + const std::optional& deadline) const { return deadline.has_value() && RetryNow() >= *deadline; } -bool RetryRunner::ShouldRetry(ErrorKind kind) const { - const bool policy_contains_kind = std::ranges::contains(retry_error_kinds_, kind); - switch (retry_policy_mode_) { - case RetryPolicyMode::kOnlyRetryOn: - return policy_contains_kind; - case RetryPolicyMode::kStopRetryOn: - return !policy_contains_kind; - case RetryPolicyMode::kUnset: - return false; - } - return false; -} - -bool RetryRunner::CanRetry(ErrorKind kind, int32_t attempt, int32_t max_attempts, - const std::optional& deadline) const { - return attempt < max_attempts && !HasTimedOut(deadline) && ShouldRetry(kind); -} - -std::optional RetryRunner::RetryDelayWithinBudget( +std::optional +detail::RetryRunnerBase::RetryDelayWithinBudget( int32_t attempt, const std::optional& deadline) const { const auto delay = Duration(CalculateDelay(attempt)); if (!deadline.has_value()) { @@ -164,8 +141,8 @@ std::optional RetryRunner::RetryDelayWithinBudget( return delay; } -bool RetryRunner::WaitForNextAttempt(int32_t attempt, - const std::optional& deadline) const { +bool detail::RetryRunnerBase::WaitForNextAttempt( + int32_t attempt, const std::optional& deadline) const { const auto delay = RetryDelayWithinBudget(attempt, deadline); if (!delay.has_value()) { return false; @@ -175,7 +152,7 @@ bool RetryRunner::WaitForNextAttempt(int32_t attempt, return !HasTimedOut(deadline); } -int32_t RetryRunner::CalculateDelay(int32_t attempt) const { +int32_t detail::RetryRunnerBase::CalculateDelay(int32_t attempt) const { const double base_delay = config_.min_wait_ms * std::pow(config_.scale_factor, attempt - 1); const int32_t delay_ms = static_cast( diff --git a/src/iceberg/util/retry_util.h b/src/iceberg/util/retry_util.h index 83e2cd5c8..656213976 100644 --- a/src/iceberg/util/retry_util.h +++ b/src/iceberg/util/retry_util.h @@ -20,38 +20,22 @@ #pragma once #include -#include #include #include -#include #include #include #include -#include #include "iceberg/iceberg_export.h" #include "iceberg/result.h" +#include "iceberg/util/macros.h" namespace iceberg { namespace detail { -template -struct IsResult : std::false_type {}; - -template -struct IsResult> : std::true_type {}; - -template -concept ResultType = IsResult>::value; - template -concept RetryTask = requires(F& f) { - { std::invoke(f) } -> ResultType; -}; - -template -using RetryTaskResult = std::remove_cvref_t>; +concept RetryTask = AsResult>; } // namespace detail @@ -69,76 +53,104 @@ struct ICEBERG_EXPORT RetryConfig { double scale_factor = 2.0; }; -/// \brief Utility class for running tasks with retry logic -/// -/// When retries are enabled (`num_retries > 0`), callers must explicitly configure -/// retry policy with `OnlyRetryOn(...)` or `StopRetryOn(...)`. -class ICEBERG_EXPORT RetryRunner { - public: - /// \brief Construct a RetryRunner with the given configuration - explicit RetryRunner(RetryConfig config = {}) : config_(std::move(config)) {} +namespace detail { - /// \brief Specify error types that should trigger a retry. - /// - /// When set, only errors matching one of these kinds will be retried. - /// All other errors will stop retries immediately. - /// - /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set, - /// StopRetryOn is ignored. - RetryRunner& OnlyRetryOn(std::initializer_list error_kinds) { - retry_policy_mode_ = RetryPolicyMode::kOnlyRetryOn; - retry_error_kinds_ = std::vector(error_kinds); - return *this; - } +class ICEBERG_EXPORT RetryRunnerBase { + protected: + explicit RetryRunnerBase(RetryConfig config) : config_(std::move(config)) {} - /// \brief Specify a single error type that should trigger a retry. - /// - /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set, - /// StopRetryOn is ignored. - RetryRunner& OnlyRetryOn(ErrorKind error_kind) { return OnlyRetryOn({error_kind}); } + using Clock = std::chrono::steady_clock; + using Duration = std::chrono::milliseconds; + using TimePoint = Clock::time_point; - /// \brief Specify error types that should stop retries immediately. - /// - /// When set, errors matching one of these kinds will not be retried. - /// All other errors will be retried. - /// - /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set, - /// StopRetryOn is ignored. - RetryRunner& StopRetryOn(std::initializer_list error_kinds) { - if (retry_policy_mode_ == RetryPolicyMode::kOnlyRetryOn) { - return *this; - } + /// \brief Validate retry counts and timing bounds. + Status ValidateConfig() const; + std::optional ComputeDeadline() const; + bool HasTimedOut(const std::optional& deadline) const; + std::optional RetryDelayWithinBudget( + int32_t attempt, const std::optional& deadline) const; + bool WaitForNextAttempt(int32_t attempt, + const std::optional& deadline) const; + /// \brief Calculate delay with exponential backoff and jitter + int32_t CalculateDelay(int32_t attempt) const; + + RetryConfig config_; +}; + +} // namespace detail + +namespace retry { + +enum class RetryPolicyMode { + kNoRetry, + kOnlyRetryOn, + kStopRetryOn, +}; - retry_policy_mode_ = RetryPolicyMode::kStopRetryOn; - retry_error_kinds_ = std::vector(error_kinds); - return *this; +template +struct RetryPolicy { + static_assert(Mode != RetryPolicyMode::kNoRetry || sizeof...(Kinds) == 0, + "NoRetry must not include error kinds"); + static_assert(Mode == RetryPolicyMode::kNoRetry || sizeof...(Kinds) > 0, + "RetryPolicy must include at least one error kind"); + + static constexpr RetryPolicyMode kMode = Mode; + static constexpr bool kEnabled = Mode != RetryPolicyMode::kNoRetry; + + static constexpr bool ShouldRetry(ErrorKind kind) { + if constexpr (Mode == RetryPolicyMode::kNoRetry) { + return false; + } else if constexpr (Mode == RetryPolicyMode::kOnlyRetryOn) { + return ((kind == Kinds) || ...); + } else { + return !((kind == Kinds) || ...); + } } +}; - /// \brief Specify a single error type that should stop retries immediately. - /// - /// \note OnlyRetryOn takes priority over StopRetryOn. If OnlyRetryOn is set, - /// StopRetryOn is ignored. - RetryRunner& StopRetryOn(ErrorKind error_kind) { return StopRetryOn({error_kind}); } +using NoRetry = RetryPolicy; + +template +using OnlyRetryOn = RetryPolicy; + +template +using StopRetryOn = RetryPolicy; + +template +inline constexpr bool kIsRetryPolicy = false; + +template +inline constexpr bool kIsRetryPolicy> = true; + +template +concept Policy = kIsRetryPolicy>; + +} // namespace retry + +/// \brief Utility class for running tasks with retry logic +/// +/// When retries are enabled (`num_retries > 0`), RetryPolicy must be an enabled +/// policy such as `retry::OnlyRetryOn<...>` or `retry::StopRetryOn<...>`. +template +class RetryRunner : private detail::RetryRunnerBase { + public: + /// \brief Construct a RetryRunner with the given configuration + explicit RetryRunner(RetryConfig config = {}) + : detail::RetryRunnerBase(std::move(config)) {} /// \brief Run a task that returns a Result /// - /// When `num_retries > 0`, the retry policy must be configured explicitly via - /// `OnlyRetryOn(...)` or `StopRetryOn(...)`. + /// When `num_retries > 0`, RetryPolicy must allow retrying matching errors. /// /// TODO: Replace attempt_counter with a metrics reporter once it is available. - template - requires detail::RetryTask - auto Run(F&& task, int32_t* attempt_counter = nullptr) -> detail::RetryTaskResult { - using TaskResult = detail::RetryTaskResult; - - const auto validation = ValidateConfig(); - if (!validation.has_value()) { - return TaskResult(std::unexpected(validation.error())); - } + template + auto Run(F&& task, int32_t* attempt_counter = nullptr) + -> std::remove_cvref_t> { + ICEBERG_RETURN_UNEXPECTED(ValidatePolicyConfig()); - const auto deadline = ComputeDeadline(); + const auto deadline = this->ComputeDeadline(); int32_t attempt = 0; - const int32_t max_attempts = config_.num_retries + 1; + const int32_t max_attempts = this->config_.num_retries + 1; while (true) { ++attempt; @@ -155,57 +167,42 @@ class ICEBERG_EXPORT RetryRunner { return result; } - if (!WaitForNextAttempt(attempt, deadline)) { + if (!this->WaitForNextAttempt(attempt, deadline)) { return result; } } } private: - enum class RetryPolicyMode { - // No retry policy was selected; invalid when retries are enabled. - kUnset, - // Retry only errors listed in retry_error_kinds_. - kOnlyRetryOn, - // Retry all errors except those listed in retry_error_kinds_. - kStopRetryOn, - }; + using TimePoint = detail::RetryRunnerBase::TimePoint; - using Clock = std::chrono::steady_clock; - using Duration = std::chrono::milliseconds; - using TimePoint = Clock::time_point; - - /// \brief Validate retry counts, timing bounds, and the selected retry policy. - Status ValidateConfig() const; - std::optional ComputeDeadline() const; - bool HasTimedOut(const std::optional& deadline) const; + Status ValidatePolicyConfig() const { + auto validation = this->ValidateConfig(); + if (!validation.has_value()) { + return validation; + } + if (this->config_.num_retries > 0 && !RetryPolicy::kEnabled) { + return InvalidArgument("Retry policy must be enabled when num_retries > 0"); + } + return {}; + } - /// \brief Check if the given error kind should trigger a retry. - bool ShouldRetry(ErrorKind kind) const; bool CanRetry(ErrorKind kind, int32_t attempt, int32_t max_attempts, - const std::optional& deadline) const; - std::optional RetryDelayWithinBudget( - int32_t attempt, const std::optional& deadline) const; - bool WaitForNextAttempt(int32_t attempt, - const std::optional& deadline) const; - /// \brief Calculate delay with exponential backoff and jitter - int32_t CalculateDelay(int32_t attempt) const; - - RetryConfig config_; - RetryPolicyMode retry_policy_mode_ = RetryPolicyMode::kUnset; - std::vector retry_error_kinds_; + const std::optional& deadline) const { + return attempt < max_attempts && !this->HasTimedOut(deadline) && + RetryPolicy::ShouldRetry(kind); + } }; /// \brief Helper function to create a RetryRunner with table commit configuration -ICEBERG_EXPORT inline RetryRunner MakeCommitRetryRunner(int32_t num_retries, - int32_t min_wait_ms, - int32_t max_wait_ms, - int32_t total_timeout_ms) { - return RetryRunner(RetryConfig{.num_retries = num_retries, - .min_wait_ms = min_wait_ms, - .max_wait_ms = max_wait_ms, - .total_timeout_ms = total_timeout_ms}) - .OnlyRetryOn(ErrorKind::kCommitFailed); +ICEBERG_EXPORT inline auto MakeCommitRetryRunner(int32_t num_retries, int32_t min_wait_ms, + int32_t max_wait_ms, + int32_t total_timeout_ms) { + return RetryRunner>( + RetryConfig{.num_retries = num_retries, + .min_wait_ms = min_wait_ms, + .max_wait_ms = max_wait_ms, + .total_timeout_ms = total_timeout_ms}); } } // namespace iceberg diff --git a/src/iceberg/util/task_group.cc b/src/iceberg/util/task_group.cc new file mode 100644 index 000000000..cbac305fd --- /dev/null +++ b/src/iceberg/util/task_group.cc @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/util/task_group.h" + +#include +#include +#include +#include + +#include "iceberg/util/macros.h" + +namespace iceberg::internal { + +namespace { + +Status AggregateTaskErrors(std::vector errors) { + if (errors.empty()) { + return {}; + } + if (errors.size() == 1) { + return std::unexpected(std::move(errors.front())); + } + + ErrorKind kind = errors.front().kind; + std::string message = std::format("Task group failed with {} errors:", errors.size()); + for (const auto& error : errors) { + message += std::format("\n - {}", error.message); + } + return std::unexpected(Error{.kind = kind, .message = std::move(message)}); +} + +Result> SubmitTask(Executor& executor, FnOnce task) { + std::promise promise; + auto future = promise.get_future(); + + ExecutorTask executor_task( + [promise = std::move(promise), task = std::move(task)]() mutable { + promise.set_value(std::move(task)()); + }); + + ICEBERG_RETURN_UNEXPECTED(executor.Submit(std::move(executor_task))); + + return future; +} + +} // namespace + +Status RunTasksSingleThreaded(std::vector> tasks) { + std::vector errors; + for (auto& task : tasks) { + auto status = std::move(task)(); + if (!status.has_value()) { + errors.push_back(std::move(status.error())); + } + } + return AggregateTaskErrors(std::move(errors)); +} + +Status RunTasksParallel(Executor& executor, std::vector> tasks) { + std::vector> futures; + futures.reserve(tasks.size()); + + std::vector errors; + for (auto& task : tasks) { + auto future = SubmitTask(executor, std::move(task)); + if (!future.has_value()) { + errors.push_back(std::move(future.error())); + continue; + } + futures.push_back(std::move(future.value())); + } + + for (auto& future : futures) { + auto status = future.get(); + if (!status.has_value()) { + errors.push_back(std::move(status.error())); + } + } + + return AggregateTaskErrors(std::move(errors)); +} + +} // namespace iceberg::internal diff --git a/src/iceberg/util/task_group.h b/src/iceberg/util/task_group.h new file mode 100644 index 000000000..15fd99ac1 --- /dev/null +++ b/src/iceberg/util/task_group.h @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/util/executor.h" +#include "iceberg/util/functional.h" +#include "iceberg/util/retry_util.h" + +namespace iceberg { + +namespace internal { + +template +concept OnceStatusTask = RvalueInvocable; + +template +concept RetryableStatusTask = + OnceStatusTask && std::copy_constructible>; + +ICEBERG_EXPORT Status RunTasksSingleThreaded(std::vector> tasks); + +ICEBERG_EXPORT Status RunTasksParallel(Executor& executor, + std::vector> tasks); + +} // namespace internal + +template +class ICEBERG_TEMPLATE_CLASS_EXPORT TaskGroup { + private: + static constexpr bool kRetryEnabled = !std::same_as; + + struct Empty {}; + + using RetryConfigStorage = std::conditional_t; + + public: + TaskGroup() = default; + + explicit TaskGroup(RetryConfig retry_config) + requires(kRetryEnabled) + : retry_config_(std::move(retry_config)) {} + + auto&& SetExecutor(this auto&& self, OptionalExecutor executor) { + self.executor_ = std::move(executor); + return std::forward(self); + } + + template + requires((!kRetryEnabled && internal::OnceStatusTask) || + (kRetryEnabled && internal::RetryableStatusTask)) + auto&& Submit(this auto&& self, F&& task) { + self.tasks_.emplace_back([&] { + if constexpr (!kRetryEnabled) { + return std::forward(task); + } else { + return [retry_config = self.retry_config_, + task = std::forward(task)]() -> Status { + return RetryRunner(retry_config).Run([&task]() -> Status { + auto attempt_task = task; + return std::invoke(std::move(attempt_task)); + }); + }; + } + }()); + return std::forward(self); + } + + Status Run() && { + if (!executor_.has_value()) { + return internal::RunTasksSingleThreaded(std::move(tasks_)); + } + return internal::RunTasksParallel(executor_->get(), std::move(tasks_)); + } + + private: + std::vector> tasks_; + OptionalExecutor executor_; + [[no_unique_address]] RetryConfigStorage retry_config_; +}; + +} // namespace iceberg From 669410576c3bd3172b1189a874cff93d8d78471a Mon Sep 17 00:00:00 2001 From: Zehua Zou Date: Fri, 29 May 2026 01:32:35 +0800 Subject: [PATCH 2/3] fix factory thread safe and adjust concept --- src/iceberg/table_scan.h | 4 +- src/iceberg/test/executor.h | 25 ++-- src/iceberg/test/fast_append_test.cc | 58 ++++++++- .../test/manifest_filter_manager_test.cc | 35 +++++- src/iceberg/test/manifest_group_test.cc | 2 +- .../test/manifest_merge_manager_test.cc | 30 +++++ src/iceberg/test/table_scan_test.cc | 2 +- src/iceberg/test/task_group_test.cc | 116 +++++++++++++----- src/iceberg/update/expire_snapshots.h | 4 + src/iceberg/update/snapshot_update.cc | 6 +- src/iceberg/update/snapshot_update.h | 15 ++- src/iceberg/util/task_group.h | 16 +-- 12 files changed, 254 insertions(+), 59 deletions(-) diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 21561d46d..a4e3293f4 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -306,7 +306,9 @@ class ICEBERG_TEMPLATE_CLASS_EXPORT TableScanBuilder : public ErrorCollector { /// \brief Configure an executor for manifest planning. /// - /// The executor is borrowed and must outlive the built scan. + /// The executor is borrowed and must outlive the built scan. Planning callbacks may + /// be called concurrently; callers must synchronize shared mutable state captured by + /// those callbacks. TableScanBuilder& PlanWith(Executor& executor); /// \brief Request this scan to use the given snapshot by ID. diff --git a/src/iceberg/test/executor.h b/src/iceberg/test/executor.h index 45ec9d8e3..5b4de65b0 100644 --- a/src/iceberg/test/executor.h +++ b/src/iceberg/test/executor.h @@ -19,33 +19,44 @@ #pragma once +#include +#include #include +#include #include "iceberg/result.h" #include "iceberg/util/executor.h" namespace iceberg::test { -class InlineExecutor final : public Executor { +class ThreadExecutor final : public Executor { public: - explicit InlineExecutor(Status submit_status = {}) + explicit ThreadExecutor(Status submit_status = {}) : submit_status_(std::move(submit_status)) {} + ~ThreadExecutor() override { + for (auto& thread : threads_) { + if (thread.joinable()) { + thread.join(); + } + } + } + Status Submit(ExecutorTask task) override { - ++submit_count_; + submit_count_.fetch_add(1, std::memory_order_relaxed); if (!submit_status_.has_value()) { return std::unexpected(submit_status_.error()); } - - std::move(task)(); + threads_.emplace_back(std::move(task)); return {}; } - int submit_count() const { return submit_count_; } + int submit_count() const { return submit_count_.load(std::memory_order_relaxed); } private: Status submit_status_; - int submit_count_ = 0; + std::atomic submit_count_{0}; + std::vector threads_; }; } // namespace iceberg::test diff --git a/src/iceberg/test/fast_append_test.cc b/src/iceberg/test/fast_append_test.cc index 6c77fad16..57c41d66c 100644 --- a/src/iceberg/test/fast_append_test.cc +++ b/src/iceberg/test/fast_append_test.cc @@ -20,6 +20,8 @@ #include "iceberg/update/fast_append.h" #include +#include +#include #include #include @@ -29,12 +31,31 @@ #include "iceberg/schema.h" #include "iceberg/table_metadata.h" #include "iceberg/test/matchers.h" -#include "iceberg/test/test_resource.h" #include "iceberg/test/update_test_base.h" -#include "iceberg/util/uuid.h" +#include "iceberg/transaction.h" namespace iceberg { +namespace { + +class TestSnapshotUpdate : public SnapshotUpdate { + public: + explicit TestSnapshotUpdate(std::shared_ptr ctx) + : SnapshotUpdate(std::move(ctx)) {} + + using SnapshotUpdate::ManifestPath; + + void CleanUncommitted(const std::unordered_set&) override {} + std::string operation() override { return "test"; } + Result> Apply(const TableMetadata&, + const std::shared_ptr&) override { + return std::vector{}; + } + std::unordered_map Summary() override { return {}; } +}; + +} // namespace + class FastAppendTest : public UpdateTestBase { protected: static void SetUpTestSuite() { avro::RegisterAll(); } @@ -78,6 +99,8 @@ class FastAppendTest : public UpdateTestBase { std::shared_ptr file_b_; }; +class SnapshotUpdateTest : public UpdateTestBase {}; + TEST_F(FastAppendTest, AppendDataFile) { std::shared_ptr fast_append; ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); @@ -188,4 +211,35 @@ TEST_F(FastAppendTest, SetSnapshotProperty) { EXPECT_EQ(snapshot->summary.at("custom-property"), "custom-value"); } +TEST_F(SnapshotUpdateTest, ConcurrentManifestPaths) { + ICEBERG_UNWRAP_OR_FAIL(auto ctx, + TransactionContext::Make(table_, TransactionKind::kUpdate)); + TestSnapshotUpdate update(std::move(ctx)); + + constexpr int kThreadCount = 8; + constexpr int kPathsPerThread = 32; + std::vector paths(kThreadCount * kPathsPerThread); + std::vector threads; + threads.reserve(kThreadCount); + + for (int thread_index = 0; thread_index < kThreadCount; ++thread_index) { + threads.emplace_back([&, thread_index] { + for (int path_index = 0; path_index < kPathsPerThread; ++path_index) { + paths[thread_index * kPathsPerThread + path_index] = update.ManifestPath(); + } + }); + } + + for (auto& thread : threads) { + thread.join(); + } + + std::unordered_set unique_paths(paths.begin(), paths.end()); + ASSERT_EQ(unique_paths.size(), paths.size()); + for (const auto& path : paths) { + EXPECT_THAT(path, ::testing::HasSubstr("/metadata/")); + EXPECT_THAT(path, ::testing::HasSubstr("-m")); + } +} + } // namespace iceberg diff --git a/src/iceberg/test/manifest_filter_manager_test.cc b/src/iceberg/test/manifest_filter_manager_test.cc index 7810509fa..75d99ceb0 100644 --- a/src/iceberg/test/manifest_filter_manager_test.cc +++ b/src/iceberg/test/manifest_filter_manager_test.cc @@ -19,6 +19,8 @@ #include "iceberg/manifest/manifest_filter_manager.h" +#include +#include #include #include #include @@ -27,7 +29,6 @@ #include #include "iceberg/avro/avro_register.h" -#include "iceberg/expression/expression.h" #include "iceberg/expression/expressions.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_reader.h" @@ -36,11 +37,11 @@ #include "iceberg/result.h" #include "iceberg/row/partition_values.h" #include "iceberg/schema.h" -#include "iceberg/table.h" #include "iceberg/table_metadata.h" +#include "iceberg/test/executor.h" #include "iceberg/test/matchers.h" #include "iceberg/test/update_test_base.h" -#include "iceberg/update/fast_append.h" +#include "iceberg/update/fast_append.h" // IWYU pragma: keep #include "iceberg/util/macros.h" namespace iceberg { @@ -379,4 +380,32 @@ TEST_F(ManifestFilterManagerTest, MultipleRowFiltersUseCombinedExpression) { EXPECT_EQ(entries[0].status, ManifestStatus::kDeleted); } +TEST_F(ManifestFilterManagerTest, ConcurrentWriterFactory) { + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, CommitFiles({file_a_})); + ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_b_})); + auto* metadata = table_->metadata().get(); + test::ThreadExecutor executor; + std::atomic path_counter = 0; + std::barrier<> factory_barrier(2); + + ManifestWriterFactory factory = + [&, fv = metadata->format_version]( + int32_t spec_id, + ManifestContent content) -> Result> { + factory_barrier.arrive_and_wait(); + + ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata->PartitionSpecById(spec_id)); + ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata->Schema()); + auto path = std::format("{}/metadata/parallel-flt-{}.avro", table_location_, + path_counter.fetch_add(1)); + return ManifestWriter::MakeWriter(fv, kTestSnapshotId, path, file_io_, spec, schema, + content); + }; + + ManifestFilterManager mgr(ManifestContent::kData, file_io_); + mgr.PlanWith(std::ref(executor)); + ASSERT_THAT(mgr.DeleteByRowFilter(Expressions::AlwaysTrue()), IsOk()); + EXPECT_THAT(mgr.FilterManifests(*metadata, snap, factory), IsOk()); +} + } // namespace iceberg diff --git a/src/iceberg/test/manifest_group_test.cc b/src/iceberg/test/manifest_group_test.cc index 1cb0a64d1..b3026ab4a 100644 --- a/src/iceberg/test/manifest_group_test.cc +++ b/src/iceberg/test/manifest_group_test.cc @@ -358,7 +358,7 @@ TEST_P(ManifestGroupTest, PlanWithExecutor) { auto group, ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); - test::InlineExecutor executor; + test::ThreadExecutor executor; group->PlanWith(std::ref(executor)); ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles()); diff --git a/src/iceberg/test/manifest_merge_manager_test.cc b/src/iceberg/test/manifest_merge_manager_test.cc index b19eace86..005c06980 100644 --- a/src/iceberg/test/manifest_merge_manager_test.cc +++ b/src/iceberg/test/manifest_merge_manager_test.cc @@ -19,6 +19,8 @@ #include "iceberg/manifest/manifest_merge_manager.h" +#include +#include #include #include #include @@ -41,6 +43,7 @@ #include "iceberg/schema_field.h" #include "iceberg/sort_order.h" #include "iceberg/table_metadata.h" +#include "iceberg/test/executor.h" #include "iceberg/test/matchers.h" #include "iceberg/transform.h" #include "iceberg/type.h" @@ -352,4 +355,31 @@ TEST_F(ManifestMergeManagerTest, PackEndOlderManifestsMergedNotNewest) { EXPECT_EQ(count, 3); } +TEST_F(ManifestMergeManagerTest, ConcurrentWriterFactory) { + ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m2, WriteManifest(kSpecId0, 1, /*size=*/100)); + ICEBERG_UNWRAP_OR_FAIL(auto m3, WriteManifest(kSpecId0, 1, /*size=*/100)); + + test::ThreadExecutor executor; + std::atomic path_counter = 0; + std::barrier<> factory_barrier(2); + ManifestWriterFactory factory = + [&](int32_t spec_id, + ManifestContent content) -> Result> { + factory_barrier.arrive_and_wait(); + + auto spec = spec_id == kSpecId0 ? spec0_ : spec1_; + auto path = std::format("parallel-merged-{}.avro", path_counter.fetch_add(1)); + return ManifestWriter::MakeWriter(kFormatVersion, kSnapshotId, path, file_io_, spec, + schema_, content); + }; + + ManifestMergeManager mgr(/*target=*/250, /*min_count=*/2, /*enabled=*/true); + mgr.PlanWith(std::ref(executor)); + EXPECT_THAT( + mgr.MergeManifests({m2, m3}, {m0, m1}, kSnapshotId, *metadata_, file_io_, factory), + IsOk()); +} + } // namespace iceberg diff --git a/src/iceberg/test/table_scan_test.cc b/src/iceberg/test/table_scan_test.cc index c0863e80d..3ac877fcc 100644 --- a/src/iceberg/test/table_scan_test.cc +++ b/src/iceberg/test/table_scan_test.cc @@ -456,7 +456,7 @@ TEST_P(TableScanTest, PlanWithExecutor) { .retention = SnapshotRef::Branch{}, })}}}); - test::InlineExecutor executor; + test::ThreadExecutor executor; ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata_with_manifests, file_io_)); builder->PlanWith(executor); diff --git a/src/iceberg/test/task_group_test.cc b/src/iceberg/test/task_group_test.cc index a14f01827..2f0da3de2 100644 --- a/src/iceberg/test/task_group_test.cc +++ b/src/iceberg/test/task_group_test.cc @@ -19,6 +19,7 @@ #include "iceberg/util/task_group.h" +#include #include #include #include @@ -47,10 +48,15 @@ RetryConfig FastRetryConfig(int32_t num_retries = 2) { } // namespace TEST(TaskGroupCompileTest, TaskConcepts) { - auto move_only_once_lambda = [value = std::make_unique(1)]() mutable -> Status { - return {}; + auto move_only_mutable_lambda = [value = std::make_unique(1)]() mutable -> Status { + return *value == 1 ? Status{} : IOError("unexpected value"); + }; + using MoveOnlyMutableLambda = decltype(move_only_mutable_lambda); + + auto move_only_const_lambda = [value = std::make_unique(1)]() -> Status { + return *value == 1 ? Status{} : IOError("unexpected value"); }; - using MoveOnlyOnceLambda = decltype(move_only_once_lambda); + using MoveOnlyConstLambda = decltype(move_only_const_lambda); auto copyable_mutable_lambda = [attempt = 0]() mutable -> Status { return ++attempt > 0 ? Status{} : Status{}; @@ -61,13 +67,23 @@ TEST(TaskGroupCompileTest, TaskConcepts) { static_assert(!std::default_initializable>); static_assert(std::move_constructible>); - static_assert(internal::OnceStatusTask); - static_assert(!internal::OnceStatusTask); + static_assert(internal::OnceStatusTask); + static_assert(!internal::OnceStatusTask); + static_assert(internal::OnceStatusTask); + static_assert(!internal::OnceStatusTask); + static_assert(internal::OnceStatusTask); static_assert(internal::OnceStatusTask); + static_assert(!internal::RepeatableStatusTask); + static_assert(internal::RepeatableStatusTask); + static_assert(internal::RepeatableStatusTask); + + static_assert(!internal::RetryableStatusTask); + static_assert(!internal::RetryableStatusTask); + static_assert(internal::RetryableStatusTask); + static_assert(!internal::RetryableStatusTask); static_assert(internal::RetryableStatusTask); static_assert(internal::RetryableStatusTask); - static_assert(!internal::RetryableStatusTask); } TEST(FnOnceTest, SupportsMoveOnlyCapture) { @@ -78,7 +94,7 @@ TEST(FnOnceTest, SupportsMoveOnlyCapture) { } TEST(TaskGroupTest, UsesExecutor) { - test::InlineExecutor executor; + test::ThreadExecutor executor; TaskGroup<> group; bool ran = false; @@ -94,7 +110,7 @@ TEST(TaskGroupTest, UsesExecutor) { } TEST(TaskGroupTest, ReturnsSubmitError) { - test::InlineExecutor executor(ServiceUnavailable("executor busy")); + test::ThreadExecutor executor(ServiceUnavailable("executor busy")); TaskGroup<> group; group.SetExecutor(std::ref(executor)); @@ -119,7 +135,7 @@ TEST(TaskGroupTest, DirectMoveOnlyTask) { } TEST(TaskGroupTest, ClearsExecutor) { - test::InlineExecutor executor; + test::ThreadExecutor executor; TaskGroup<> group; int call_count = 0; @@ -136,11 +152,9 @@ TEST(TaskGroupTest, ClearsExecutor) { } TEST(TaskGroupTest, FluentSubmit) { - test::InlineExecutor executor; int call_count = 0; auto status = TaskGroup<>() - .SetExecutor(std::ref(executor)) .Submit([&]() -> Status { ++call_count; return {}; @@ -153,7 +167,6 @@ TEST(TaskGroupTest, FluentSubmit) { EXPECT_THAT(status, IsOk()); EXPECT_EQ(call_count, 2); - EXPECT_EQ(executor.submit_count(), 2); } TEST(TaskGroupTest, DirectAggregatesErrors) { @@ -178,37 +191,37 @@ TEST(TaskGroupTest, DirectAggregatesErrors) { } TEST(TaskGroupTest, ParallelSubmitsAll) { - test::InlineExecutor executor; + test::ThreadExecutor executor; TaskGroup<> group; - int call_count = 0; + std::atomic call_count = 0; group.SetExecutor(std::ref(executor)); group.Submit([&]() -> Status { - ++call_count; + call_count.fetch_add(1, std::memory_order_relaxed); return {}; }); group.Submit([&]() -> Status { - ++call_count; + call_count.fetch_add(1, std::memory_order_relaxed); return {}; }); EXPECT_THAT(std::move(group).Run(), IsOk()); - EXPECT_EQ(call_count, 2); + EXPECT_EQ(call_count.load(std::memory_order_relaxed), 2); EXPECT_EQ(executor.submit_count(), 2); } TEST(TaskGroupTest, ParallelAggregatesErrors) { - test::InlineExecutor executor; + test::ThreadExecutor executor; TaskGroup<> group; - int call_count = 0; + std::atomic call_count = 0; group.SetExecutor(std::ref(executor)); group.Submit([&]() -> Status { - ++call_count; + call_count.fetch_add(1, std::memory_order_relaxed); return IOError("first failure"); }); group.Submit([&]() -> Status { - ++call_count; + call_count.fetch_add(1, std::memory_order_relaxed); return ValidationFailed("second failure"); }); @@ -217,12 +230,12 @@ TEST(TaskGroupTest, ParallelAggregatesErrors) { EXPECT_THAT(status, HasErrorMessage("Task group failed with 2 errors")); EXPECT_THAT(status, HasErrorMessage("first failure")); EXPECT_THAT(status, HasErrorMessage("second failure")); - EXPECT_EQ(call_count, 2); + EXPECT_EQ(call_count.load(std::memory_order_relaxed), 2); EXPECT_EQ(executor.submit_count(), 2); } TEST(TaskGroupTest, ParallelSubmitErrors) { - test::InlineExecutor executor(ServiceUnavailable("executor busy")); + test::ThreadExecutor executor(ServiceUnavailable("executor busy")); TaskGroup<> group; int call_count = 0; @@ -265,6 +278,47 @@ TEST(TaskGroupTest, RetriesTasks) { {test::FakeRetryEnvironment::Duration(1)})); } +TEST(TaskGroupTest, RetryReusesTaskState) { + test::FakeRetryEnvironment fake_retry; + ScopedRetryTestHooks retry_hooks(fake_retry.hooks()); + TaskGroup group{FastRetryConfig()}; + + group.Submit([attempt = 0]() mutable -> Status { + ++attempt; + if (attempt == 1) { + return IOError("transient read failure"); + } + return {}; + }); + + EXPECT_THAT(std::move(group).Run(), IsOk()); + EXPECT_EQ(fake_retry.sleep_durations(), + std::vector( + {test::FakeRetryEnvironment::Duration(1)})); +} + +TEST(TaskGroupTest, RetryAcceptsMoveOnlyRepeatableTask) { + test::FakeRetryEnvironment fake_retry; + ScopedRetryTestHooks retry_hooks(fake_retry.hooks()); + TaskGroup group{FastRetryConfig()}; + int call_count = 0; + auto value = std::make_unique(7); + + group.Submit([value = std::move(value), &call_count]() -> Status { + ++call_count; + if (call_count == 1) { + return IOError("transient read failure"); + } + return *value == 7 ? Status{} : IOError("unexpected value"); + }); + + EXPECT_THAT(std::move(group).Run(), IsOk()); + EXPECT_EQ(call_count, 2); + EXPECT_EQ(fake_retry.sleep_durations(), + std::vector( + {test::FakeRetryEnvironment::Duration(1)})); +} + TEST(TaskGroupTest, DefaultRetryConfig) { TaskGroup group; int call_count = 0; @@ -294,27 +348,27 @@ TEST(TaskGroupTest, DoesNotRetryNotFound) { TEST(TaskGroupTest, RetryUsesExecutor) { test::FakeRetryEnvironment fake_retry; ScopedRetryTestHooks retry_hooks(fake_retry.hooks()); - test::InlineExecutor executor; + test::ThreadExecutor executor; TaskGroup group{FastRetryConfig()}; - int first_task_calls = 0; - int second_task_calls = 0; + std::atomic first_task_calls = 0; + std::atomic second_task_calls = 0; group.SetExecutor(std::ref(executor)); group.Submit([&]() -> Status { - ++first_task_calls; - if (first_task_calls == 1) { + auto call_count = first_task_calls.fetch_add(1, std::memory_order_relaxed) + 1; + if (call_count == 1) { return ServiceUnavailable("server busy"); } return {}; }); group.Submit([&]() -> Status { - ++second_task_calls; + second_task_calls.fetch_add(1, std::memory_order_relaxed); return {}; }); EXPECT_THAT(std::move(group).Run(), IsOk()); - EXPECT_EQ(first_task_calls, 2); - EXPECT_EQ(second_task_calls, 1); + EXPECT_EQ(first_task_calls.load(std::memory_order_relaxed), 2); + EXPECT_EQ(second_task_calls.load(std::memory_order_relaxed), 1); EXPECT_EQ(executor.submit_count(), 2); } diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h index 8ed9d5b3c..68c82d071 100644 --- a/src/iceberg/update/expire_snapshots.h +++ b/src/iceberg/update/expire_snapshots.h @@ -121,6 +121,10 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { ExpireSnapshots& DeleteWith(std::function delete_func); /// \brief Configure an executor for planning expired snapshot metadata. + /// + /// The executor is borrowed and must outlive this update. Planning callbacks may be + /// called concurrently; callers must synchronize shared mutable state captured by + /// those callbacks. ExpireSnapshots& PlanWith(Executor& executor); /// \brief Configures the cleanup level for expired files. diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index 4640bac00..536dc667a 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc @@ -417,15 +417,17 @@ std::string SnapshotUpdate::ManifestListPath() { // Generate manifest list path // Format: {metadata_location}/snap-{snapshot_id}-{attempt}-{uuid}.avro int64_t snapshot_id = SnapshotId(); + auto attempt = attempt_.fetch_add(1, std::memory_order_relaxed) + 1; std::string filename = - std::format("snap-{}-{}-{}.avro", snapshot_id, ++attempt_, commit_uuid_); + std::format("snap-{}-{}-{}.avro", snapshot_id, attempt, commit_uuid_); return ctx_->MetadataFileLocation(filename); } std::string SnapshotUpdate::ManifestPath() { // Generate manifest path // Format: {metadata_location}/{uuid}-m{manifest_count}.avro - std::string filename = std::format("{}-m{}.avro", commit_uuid_, manifest_count_++); + auto manifest_count = manifest_count_.fetch_add(1, std::memory_order_relaxed); + std::string filename = std::format("{}-m{}.avro", commit_uuid_, manifest_count); return ctx_->MetadataFileLocation(filename); } diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index 3c193b1e5..8f9877a0f 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -19,6 +19,7 @@ #pragma once +#include #include #include #include @@ -79,6 +80,10 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { } /// \brief Configure an executor for manifest planning work. + /// + /// The executor is borrowed and must outlive this update. Planning callbacks may be + /// called concurrently; callers must synchronize shared mutable state captured by + /// those callbacks. auto& ScanManifestsWith(this auto& self, Executor& executor) { self.plan_executor_ = std::ref(executor); return self; @@ -154,8 +159,10 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { const std::string& target_branch() const { return target_branch_; } bool can_inherit_snapshot_id() const { return can_inherit_snapshot_id_; } const std::string& commit_uuid() const { return commit_uuid_; } - int32_t manifest_count() const { return manifest_count_; } - int32_t attempt() const { return attempt_; } + int32_t manifest_count() const { + return manifest_count_.load(std::memory_order_relaxed); + } + int32_t attempt() const { return attempt_.load(std::memory_order_relaxed); } int64_t target_manifest_size_bytes() const { return target_manifest_size_bytes_; } /// \brief Clean up any uncommitted manifests that were created. @@ -232,8 +239,8 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { private: const bool can_inherit_snapshot_id_{true}; const std::string commit_uuid_; - int32_t manifest_count_{0}; - int32_t attempt_{0}; + std::atomic manifest_count_{0}; + std::atomic attempt_{0}; std::vector manifest_lists_; const int64_t target_manifest_size_bytes_; std::optional snapshot_id_; diff --git a/src/iceberg/util/task_group.h b/src/iceberg/util/task_group.h index 15fd99ac1..97c87c716 100644 --- a/src/iceberg/util/task_group.h +++ b/src/iceberg/util/task_group.h @@ -39,9 +39,14 @@ namespace internal { template concept OnceStatusTask = RvalueInvocable; +template +concept RepeatableStatusTask = + std::is_invocable_r_v || + (std::copy_constructible && std::is_invocable_r_v); + template -concept RetryableStatusTask = - OnceStatusTask && std::copy_constructible>; +concept RetryableStatusTask = std::constructible_from, F> && + RepeatableStatusTask>; ICEBERG_EXPORT Status RunTasksSingleThreaded(std::vector> tasks); @@ -80,11 +85,8 @@ class ICEBERG_TEMPLATE_CLASS_EXPORT TaskGroup { return std::forward(task); } else { return [retry_config = self.retry_config_, - task = std::forward(task)]() -> Status { - return RetryRunner(retry_config).Run([&task]() -> Status { - auto attempt_task = task; - return std::invoke(std::move(attempt_task)); - }); + task = std::forward(task)]() mutable -> Status { + return RetryRunner(retry_config).Run(task); }; } }()); From 14e231c4642dacd46ecff995fb7965cd738a6047 Mon Sep 17 00:00:00 2001 From: Zehua Zou Date: Fri, 29 May 2026 11:57:18 +0800 Subject: [PATCH 3/3] update comments --- src/iceberg/table_scan.h | 4 ---- src/iceberg/update/expire_snapshots.h | 4 ---- src/iceberg/update/snapshot_update.h | 4 ---- src/iceberg/util/executor.h | 10 ++++++++++ 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index a4e3293f4..f0c1747b6 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -305,10 +305,6 @@ class ICEBERG_TEMPLATE_CLASS_EXPORT TableScanBuilder : public ErrorCollector { TableScanBuilder& MinRowsRequested(int64_t num_rows); /// \brief Configure an executor for manifest planning. - /// - /// The executor is borrowed and must outlive the built scan. Planning callbacks may - /// be called concurrently; callers must synchronize shared mutable state captured by - /// those callbacks. TableScanBuilder& PlanWith(Executor& executor); /// \brief Request this scan to use the given snapshot by ID. diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h index 68c82d071..8ed9d5b3c 100644 --- a/src/iceberg/update/expire_snapshots.h +++ b/src/iceberg/update/expire_snapshots.h @@ -121,10 +121,6 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { ExpireSnapshots& DeleteWith(std::function delete_func); /// \brief Configure an executor for planning expired snapshot metadata. - /// - /// The executor is borrowed and must outlive this update. Planning callbacks may be - /// called concurrently; callers must synchronize shared mutable state captured by - /// those callbacks. ExpireSnapshots& PlanWith(Executor& executor); /// \brief Configures the cleanup level for expired files. diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index 8f9877a0f..50c0fe0db 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -80,10 +80,6 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { } /// \brief Configure an executor for manifest planning work. - /// - /// The executor is borrowed and must outlive this update. Planning callbacks may be - /// called concurrently; callers must synchronize shared mutable state captured by - /// those callbacks. auto& ScanManifestsWith(this auto& self, Executor& executor) { self.plan_executor_ = std::ref(executor); return self; diff --git a/src/iceberg/util/executor.h b/src/iceberg/util/executor.h index d7c8cb51a..01aa83d7a 100644 --- a/src/iceberg/util/executor.h +++ b/src/iceberg/util/executor.h @@ -30,6 +30,16 @@ namespace iceberg { using ExecutorTask = FnOnce; +/// \brief Schedules iceberg-cpp internal planning tasks. +/// +/// Public APIs that accept an executor remain synchronous: the calling thread may block +/// while waiting for submitted tasks to finish. Callers must ensure the executor can +/// continue making progress while the caller is blocked. Calling those APIs from one of +/// the same bounded executor's worker threads can deadlock unless the executor supports +/// nested blocking work. +/// +/// When an executor is configured, planning callbacks may be called concurrently. Any +/// shared mutable state captured by those callbacks must be synchronized by the caller. class ICEBERG_EXPORT Executor { public: virtual ~Executor() = default;