feat: add executor pool support #687

Open
HuaHuaY wants to merge 3 commits into apache:main from HuaHuaY:infra

Contributor

@HuaHuaY HuaHuaY commented May 28, 2026

No description provided.

@HuaHuaY HuaHuaY force-pushed the infra branch 2 times, most recently from 08fa54a to 8ac58e2 on May 28, 2026 12:52
Member

@wgtmac wgtmac left a comment

Nice work. The abstraction is clean and the test coverage is solid. Since the default executor is nullopt, existing paths stay single-threaded, so the risk is well contained.

A couple of things before merge:

  • The PR has no description. Worth adding the motivation and a short design note, ideally with the parallel/serial numbers that justify it.
  • The RetryRunner change from a runtime fluent API (OnlyRetryOn/StopRetryOn) to the compile-time RetryRunner<Policy> is a breaking API change that's independent of executor support. Consider splitting it into its own PR so it gets reviewed on its own merits.

Left a few inline notes.


ExecutorTask executor_task(
    [promise = std::move(promise), task = std::move(task)]() mutable {
      promise.set_value(std::move(task)());
Member

If a task throws instead of returning a Status, the promise is never set and the exception escapes on the worker thread, while future.get() sees a broken promise. The repo uses Result/Status rather than exceptions, so this is unlikely in our own code, but Executor is a public extension point and users may plug in pools where it matters. Wrapping the call in try/catch and turning the exception into a Status would be safer.
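A minimal sketch of the wrapping suggested above, assuming a stand-in `Status` type and a hypothetical helper `RunAndFulfil` (neither is the PR's actual code). It shows one way to make sure the promise is always fulfilled, even when the task throws:

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <string>
#include <utility>

// Stand-in for the library's Status type (hypothetical, for illustration only).
struct Status {
  bool ok = true;
  std::string message;
  static Status OK() { return {}; }
  static Status Error(std::string msg) { return {false, std::move(msg)}; }
};

// Runs `task` and always fulfils `promise`. Any exception is converted into an
// error Status, so the waiting future never observes a broken promise.
template <typename Task>
void RunAndFulfil(std::promise<Status> promise, Task task) {
  Status status;
  try {
    status = task();
  } catch (const std::exception& e) {
    status = Status::Error(e.what());
  } catch (...) {
    status = Status::Error("unknown exception");
  }
  promise.set_value(std::move(status));
}
```

With this shape, a throwing task surfaces as an ordinary error `Status` on `future.get()` instead of escaping on the worker thread.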

Contributor Author

Errors resulting from violating calling conventions are the user's responsibility, not the library's.

if (!executor_.has_value()) {
  return internal::RunTasksSingleThreaded(std::move(tasks_));
}
return internal::RunTasksParallel(executor_->get(), std::move(tasks_));
Member

Run() blocks on future.get() from the calling thread. That's fine for the current sequential call sites, but PlanWith is now public API. If someone drives a TaskGroup from a worker thread of the same bounded executor, it can deadlock (pool saturated while waiting on a task queued behind it). Worth documenting that the driving thread must not be one of the executor's own workers.
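To make the hazard concrete, here is a small reproduction under stated assumptions: `SingleWorkerPool` is a hypothetical one-thread pool written for this sketch, not the PR's `Executor`. The outer task runs on the only worker and waits on an inner task queued behind it. A timeout is used so the demo terminates; an untimed `get()` would block forever.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical one-worker pool, used only to make the hazard reproducible.
class SingleWorkerPool {
 public:
  SingleWorkerPool() : worker_([this] { Loop(); }) {}
  ~SingleWorkerPool() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      done_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }
  void Submit(std::function<void()> fn) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      queue_.push(std::move(fn));
    }
    cv_.notify_one();
  }

 private:
  void Loop() {
    for (;;) {
      std::function<void()> fn;
      {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return done_ || !queue_.empty(); });
        if (queue_.empty()) return;  // shutdown requested and nothing left to run
        fn = std::move(queue_.front());
        queue_.pop();
      }
      fn();
    }
  }
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> queue_;
  bool done_ = false;
  std::thread worker_;  // declared last so the other members exist before it starts
};

// Returns true if a wait issued from the pool's own worker timed out, i.e. the
// inner task could not start while the outer task was blocking the worker.
bool NestedWaitStalls() {
  std::promise<bool> outer_done;
  std::future<bool> outer = outer_done.get_future();
  SingleWorkerPool pool;  // destroyed (joined) before the promise and future
  pool.Submit([&] {
    auto inner_done = std::make_shared<std::promise<void>>();
    std::future<void> inner = inner_done->get_future();
    pool.Submit([inner_done] { inner_done->set_value(); });
    // A plain inner.get() here would never return; the timeout keeps the demo finite.
    const bool stalled =
        inner.wait_for(std::chrono::milliseconds(50)) == std::future_status::timeout;
    outer_done.set_value(stalled);
  });
  return outer.get();
}
```

The same shape applies to any bounded pool once all of its workers are blocked waiting on tasks queued behind them.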

Contributor Author

A pool task that spawns more tasks and then blocks on them is a problem for any thread pool with a thread limit; it is not specific to this PR.

result[spec_id].push_back(std::move(entry));
std::unordered_map<int32_t, std::vector<ManifestEntry>> result;
for (auto& manifest_result : manifest_results) {
  result.merge(manifest_result);
Member

This depends on merge() moving out the non-conflicting nodes and leaving only the conflicting keys, which are then appended. It's correct, but takes a moment to parse. A plain loop doing result[spec_id].insert(...) for every entry reads more directly, at negligible cost.
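For readers unfamiliar with the `merge()` behavior described here, a small sketch, assuming `std::string` as a stand-in for `ManifestEntry` and a hypothetical helper name `MergeInto`:

```cpp
#include <cassert>
#include <cstdint>
#include <iterator>
#include <string>
#include <unordered_map>
#include <vector>

using Entries = std::unordered_map<int32_t, std::vector<std::string>>;

// Folds `partial` into `result`. merge() splices over every node whose key is
// not yet in `result`. Nodes whose keys already exist stay behind in `partial`,
// and their vectors are appended in the loop that follows.
void MergeInto(Entries& result, Entries& partial) {
  result.merge(partial);
  for (auto& [spec_id, entries] : partial) {
    auto& target = result[spec_id];
    target.insert(target.end(), std::make_move_iterator(entries.begin()),
                  std::make_move_iterator(entries.end()));
  }
}
```

Only conflicting keys pay for an element-wise append; everything else is moved by relinking nodes.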

Contributor Author

merge() does not involve memory allocation and is more efficient.

Member

@wgtmac wgtmac left a comment

A few more minor notes, non-blocking.

std::vector<FilterManifestResult> filter_results(manifests.size());
auto filter_tasks = TaskGroup().SetExecutor(executor_);
for (auto&& [manifest, result] : std::views::zip(manifests, filter_results)) {
  filter_tasks.Submit([&]() -> Status {
Member

This is safe today: the bindings refer to elements of manifests and filter_results, both of which outlive Run(). But capturing a loop's structured bindings by [&] into a deferred closure is an easy footgun. Capturing explicitly (e.g. [&manifest, &result, ...]) makes the intent clearer and harder to break later.

Contributor Author

This lambda captures too many variables, so listing each one by name is impractical.

Comment thread src/iceberg/update/snapshot_update.cc
Comment thread src/iceberg/util/retry_util.h
Member

@wgtmac wgtmac left a comment

Another pass, this time on the interface design and extensibility.

The overall shape is good: one virtual Executor that engines implement, threaded through the builders via PlanWith. The Arrow adapter test is a nice proof that an external pool drops in with basically one line, so "bring your own threadpool" is well covered.

On future async directions (C++20 coroutines, P2300 std::execution): the model here is synchronous and blocking. TaskGroup::Run() fans out and blocks on std::future::get(), and the planning APIs return Result<...> directly. So this is a parallel-for primitive, not a step toward a sender/receiver pipeline. That's a reasonable scope for now; I'd just flag it explicitly so nobody expects this interface to extend into async later. That would be a separate interface. Details inline.

virtual ~Executor() = default;

/// \brief Schedule a task for execution.
virtual Status Submit(ExecutorTask task) = 0;
Member

This is a fire-and-forget execute-style primitive (closer to the abandoned P0443 executor.execute than to P2300's scheduler/sender). Completion is tracked outside, via the std::promise/future plumbing in RunTasksParallel. Fine for a blocking parallel-for, but it doesn't lay groundwork for coroutines or std::execution: those need the executor to hand back something awaitable/composable, and the planning APIs (PlanFiles() -> Result<...>) are synchronous anyway. Going async later would be a separate interface, not an extension of this one. Worth stating in the header that Executor is a parallel-dispatch primitive, not an async scheduler.

Separately: ExecutorTask being move-only is the right call and matches Arrow, but pools whose submit takes a copyable std::function will need std::move_only_function or a small shim to adapt.
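One possible form of the "small shim" mentioned above, for toolchains without `std::move_only_function`. `MakeCopyable` is a hypothetical helper name, not part of the PR or of Arrow:

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Wraps a move-only callable in a copyable std::function by parking it on the
// heap behind a shared_ptr. All copies of the wrapper share the one callable,
// so it should still be invoked only once.
template <typename MoveOnlyTask>
std::function<void()> MakeCopyable(MoveOnlyTask task) {
  auto shared = std::make_shared<MoveOnlyTask>(std::move(task));
  return [shared]() { (*shared)(); };
}
```

The cost is one extra heap allocation per task, which is usually small next to the work a planning task does.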

  virtual Status Submit(ExecutorTask task) = 0;
};

using OptionalExecutor = std::optional<std::reference_wrapper<Executor>>;
Member

std::optional<std::reference_wrapper<Executor>> is a little awkward to use (executor_->get() at the call sites). A plain Executor* expresses "nullable, non-owning borrow" just as well and reads cleaner everywhere it's passed. Minor.
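A side-by-side sketch of the two call-site shapes being compared. The `Executor` here is a minimal stand-in, and `Concurrency()` is a hypothetical method added only so the example has something to call:

```cpp
#include <cassert>
#include <functional>
#include <optional>

// Minimal stand-in for the PR's Executor interface (illustrative only).
struct Executor {
  virtual ~Executor() = default;
  virtual int Concurrency() const = 0;  // hypothetical, not in the PR
};

struct FixedExecutor : Executor {
  int Concurrency() const override { return 4; }
};

using OptionalExecutor = std::optional<std::reference_wrapper<Executor>>;

// Form used in the PR: unwrap with has_value() and ->get().
int WorkersViaOptional(OptionalExecutor executor) {
  if (!executor.has_value()) return 1;
  return executor->get().Concurrency();
}

// Form suggested in the review: a nullable, non-owning raw pointer.
int WorkersViaPointer(Executor* executor) {
  if (executor == nullptr) return 1;
  return executor->Concurrency();
}
```

Both forms are non-owning; the difference is mainly how the "no executor" case and the unwrap read at each call site.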

Contributor Author

A raw pointer leaves it unclear whether the holder is responsible for destroying the object, which hurts future maintainability.

futures.reserve(tasks.size());

std::vector<Error> errors;
for (auto& task : tasks) {
Member

All tasks are submitted up front, each with its own promise/future. For a handful of manifests that's fine, but as a general primitive there's no concurrency bound and no fail-fast: N tasks always queue N and allocate N futures, and if one fails early the rest still run to completion. Probably out of scope for this PR, but worth a note if engines may push large fan-outs through here.
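For reference, a rough sketch of what a bounded, fail-fast fan-out could look like. It uses plain `std::thread` workers and a hypothetical `RunBounded` helper rather than the PR's `Executor` and `TaskGroup`, and tasks return `bool` instead of `Status` to keep it short:

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Runs `tasks` on at most `max_workers` threads and stops handing out new
// tasks once any task reports failure (returns false). Returns the number of
// tasks that were started. Tasks already in flight still run to completion.
std::size_t RunBounded(const std::vector<std::function<bool()>>& tasks,
                       std::size_t max_workers) {
  std::atomic<std::size_t> next{0};
  std::atomic<std::size_t> started{0};
  std::atomic<bool> failed{false};

  auto worker = [&] {
    while (!failed.load()) {
      const std::size_t index = next.fetch_add(1);
      if (index >= tasks.size()) return;
      started.fetch_add(1);
      if (!tasks[index]()) failed.store(true);
    }
  };

  std::vector<std::thread> threads;
  const std::size_t worker_count = std::min(max_workers, tasks.size());
  threads.reserve(worker_count);
  for (std::size_t i = 0; i < worker_count; ++i) threads.emplace_back(worker);
  for (auto& thread : threads) thread.join();
  return started.load();
}
```

Workers pull indices from a shared counter, so at most `max_workers` tasks are in flight and nothing new starts after the first failure.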

/// The executor is borrowed and must outlive this update. Planning callbacks may be
/// called concurrently; callers must synchronize shared mutable state captured by
/// those callbacks.
auto& ScanManifestsWith(this auto& self, Executor& executor) {
Member

Naming: every other entry point uses PlanWith (TableScanBuilder, ExpireSnapshots, ManifestGroup, ManifestFilterManager, ManifestMergeManager), but this one is ScanManifestsWith. Worth settling on one name for a consistent public surface.

Contributor Author

This matches the naming used in the apache/iceberg repo.

Member

@wgtmac wgtmac left a comment

One more pass. The main thing I found is a documentation gap around a new concurrency contract that now leaks onto user-supplied callbacks.

Once an executor is set, the user's ManifestWriterFactory and the shared FileIO get called from multiple worker threads. The tests already account for this (the factories use an atomic counter plus a barrier), so the requirement is understood; it's just not written down anywhere a downstream engine would see it. Worth documenting on the public surface.

ManifestMergeManager& operator=(const ManifestMergeManager&) = delete;

/// \brief Configure an optional executor for manifest merging.
ManifestMergeManager& PlanWith(OptionalExecutor executor);
Member

Once an executor is set here, the user-supplied ManifestWriterFactory gets invoked concurrently from worker threads (parallel FlushBin). The tests already guard against this with an atomic path counter + barrier, which confirms the intent, but the requirement isn't documented. A downstream engine implementing a factory won't know it has to be thread-safe. Worth a \note on this PlanWith (and the other PlanWith entry points / the ManifestWriterFactory typedef) stating the factory must be safe to call concurrently when an executor is configured.
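As an illustration of what "safe to call concurrently" means for a downstream factory, a sketch with a hypothetical `ManifestPathFactory` (the class name, method, and path format are assumptions, not the PR's `ManifestWriterFactory`). An atomic counter gives each call a unique path even under concurrent use:

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <thread>
#include <vector>

// Hypothetical factory: hands out a unique manifest path per call. The atomic
// counter makes it safe to call from several worker threads at once.
class ManifestPathFactory {
 public:
  std::string NextPath() {
    const int id = counter_.fetch_add(1);
    return "manifest-" + std::to_string(id) + ".avro";
  }

 private:
  std::atomic<int> counter_{0};
};

// Calls the factory from `thread_count` threads and returns the number of
// distinct paths produced. It equals the total number of calls when no two
// calls were handed the same id.
std::size_t CountDistinctPaths(std::size_t thread_count,
                               std::size_t calls_per_thread) {
  ManifestPathFactory factory;
  std::vector<std::vector<std::string>> per_thread(thread_count);
  std::vector<std::thread> threads;
  for (std::size_t t = 0; t < thread_count; ++t) {
    threads.emplace_back([&factory, &per_thread, t, calls_per_thread] {
      for (std::size_t i = 0; i < calls_per_thread; ++i) {
        per_thread[t].push_back(factory.NextPath());
      }
    });
  }
  for (auto& thread : threads) thread.join();
  std::set<std::string> distinct;
  for (const auto& paths : per_thread) distinct.insert(paths.begin(), paths.end());
  return distinct.size();
}
```

A factory that used a plain `int` counter instead would have a data race here, which is the kind of mistake a documented contract helps downstream engines avoid.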

Contributor Author

The documentation is on the Executor interface. There are too many PlanWith entry points, and repeating the comment on each of them would be redundant.

Comment thread src/iceberg/delete_file_index.cc