feat: add executor pool support#687
Conversation
08fa54a to
8ac58e2
Compare
wgtmac
left a comment
There was a problem hiding this comment.
Nice work. The abstraction is clean and the test coverage is solid. Since the default executor is nullopt, existing paths stay single-threaded, so the risk is well contained.
A couple of things before merge:
- The PR has no description. Worth adding the motivation and a short design note, ideally with the parallel/serial numbers that justify it.
- The
RetryRunnerchange from a runtime fluent API (OnlyRetryOn/StopRetryOn) to the compile-timeRetryRunner<Policy>is a breaking API change that's independent of executor support. Consider splitting it into its own PR so it gets reviewed on its own merits.
Left a few inline notes.
|
|
||
| ExecutorTask executor_task( | ||
| [promise = std::move(promise), task = std::move(task)]() mutable { | ||
| promise.set_value(std::move(task)()); |
There was a problem hiding this comment.
If a task throws instead of returning a Status, the promise is never set and the exception escapes on the worker thread, while future.get() sees a broken promise. The repo uses Result/Status rather than exceptions, so this is unlikely in our own code, but Executor is a public extension point and users may plug in pools where it matters. Wrapping the call in try/catch and turning the exception into a Status would be safer.
There was a problem hiding this comment.
Errors resulting from violating calling conventions are the user's responsibility, not the library's.
| if (!executor_.has_value()) { | ||
| return internal::RunTasksSingleThreaded(std::move(tasks_)); | ||
| } | ||
| return internal::RunTasksParallel(executor_->get(), std::move(tasks_)); |
There was a problem hiding this comment.
Run() blocks on future.get() from the calling thread. That's fine for the current sequential call sites, but PlanWith is now public API. If someone drives a TaskGroup from a worker thread of the same bounded executor, it can deadlock (pool saturated while waiting on a task queued behind it). Worth documenting that the driving thread must not be one of the executor's own workers.
There was a problem hiding this comment.
The issue of a thread pool's tasks spawning more tasks and then blocking themselves is a problem that I believe any thread pool with a thread limit will encounter, not just a problem with this particular PR.
| result[spec_id].push_back(std::move(entry)); | ||
| std::unordered_map<int32_t, std::vector<ManifestEntry>> result; | ||
| for (auto& manifest_result : manifest_results) { | ||
| result.merge(manifest_result); |
There was a problem hiding this comment.
This depends on merge() moving out the non-conflicting nodes and leaving only the conflicting keys, which are then appended. It's correct, but takes a moment to parse. A plain loop doing result[spec_id].insert(...) for every entry reads more directly, at negligible cost.
There was a problem hiding this comment.
merge() does not involve memory allocation and is more efficient.
wgtmac
left a comment
There was a problem hiding this comment.
A few more minor notes, non-blocking.
| std::vector<FilterManifestResult> filter_results(manifests.size()); | ||
| auto filter_tasks = TaskGroup().SetExecutor(executor_); | ||
| for (auto&& [manifest, result] : std::views::zip(manifests, filter_results)) { | ||
| filter_tasks.Submit([&]() -> Status { |
There was a problem hiding this comment.
This is safe today: the bindings refer to elements of manifests and filter_results, both of which outlive Run(). But capturing a loop's structured bindings by [&] into a deferred closure is an easy footgun. Capturing explicitly (e.g. [&manifest, &result, ...]) makes the intent clearer and harder to break later.
There was a problem hiding this comment.
There are too many variables captured by this lambda. It is inappropriate to write the names clearly.
wgtmac
left a comment
There was a problem hiding this comment.
Another pass, this time on the interface design and extensibility.
The overall shape is good: one virtual Executor that engines implement, threaded through the builders via PlanWith. The Arrow adapter test is a nice proof that an external pool drops in with basically one line, so "bring your own threadpool" is well covered.
On future async directions (C++23 coroutines, P2300 std::execution): the model here is synchronous and blocking, TaskGroup::Run() fans out and blocks on std::future::get(), and the planning APIs return Result<...> directly. So this is a parallel-for primitive, not a step toward a sender/receiver pipeline. That's a reasonable scope for now, I'd just flag it explicitly so nobody expects this interface to extend into async later, it'll be a separate one. Details inline.
| virtual ~Executor() = default; | ||
|
|
||
| /// \brief Schedule a task for execution. | ||
| virtual Status Submit(ExecutorTask task) = 0; |
There was a problem hiding this comment.
This is a fire-and-forget execute-style primitive (closer to the abandoned P0443 executor.execute than to P2300's scheduler/sender). Completion is tracked outside, via the std::promise/future plumbing in RunTasksParallel. Fine for a blocking parallel-for, but it doesn't lay groundwork for coroutines or std::execution: those need the executor to hand back something awaitable/composable, and the planning APIs (PlanFiles() -> Result<...>) are synchronous anyway. Going async later would be a separate interface, not an extension of this one. Worth stating in the header that Executor is a parallel-dispatch primitive, not an async scheduler.
Separately: ExecutorTask being move-only is the right call and matches Arrow, but pools whose submit takes a copyable std::function will need std::move_only_function or a small shim to adapt.
| virtual Status Submit(ExecutorTask task) = 0; | ||
| }; | ||
|
|
||
| using OptionalExecutor = std::optional<std::reference_wrapper<Executor>>; |
There was a problem hiding this comment.
std::optional<std::reference_wrapper<Executor>> is a little awkward to use (executor_->get() at the call sites). A plain Executor* expresses "nullable, non-owning borrow" just as well and reads cleaner everywhere it's passed. Minor.
There was a problem hiding this comment.
Holding a pointer requires a check to determine whether it needs to be destructed, which is detrimental to future code maintainability.
| futures.reserve(tasks.size()); | ||
|
|
||
| std::vector<Error> errors; | ||
| for (auto& task : tasks) { |
There was a problem hiding this comment.
All tasks are submitted up front, each with its own promise/future. For a handful of manifests that's fine, but as a general primitive there's no concurrency bound and no fail-fast: N tasks always queue N and allocate N futures, and if one fails early the rest still run to completion. Probably out of scope for this PR, but worth a note if engines may push large fan-outs through here.
| /// The executor is borrowed and must outlive this update. Planning callbacks may be | ||
| /// called concurrently; callers must synchronize shared mutable state captured by | ||
| /// those callbacks. | ||
| auto& ScanManifestsWith(this auto& self, Executor& executor) { |
There was a problem hiding this comment.
Naming: every other entry point uses PlanWith (TableScanBuilder, ExpireSnapshots, ManifestGroup, ManifestFilterManager, ManifestMergeManager), but this one is ScanManifestsWith. Worth settling on one name for a consistent public surface.
There was a problem hiding this comment.
This is the behavior of apache/iceberg repo.
wgtmac
left a comment
There was a problem hiding this comment.
One more pass. The main thing I found is a documentation gap around a new concurrency contract that now leaks onto user-supplied callbacks.
Once an executor is set, the user's ManifestWriterFactory and the shared FileIO get called from multiple worker threads. The tests already account for this (the factories use an atomic counter plus a barrier), so the requirement is understood, it's just not written down anywhere a downstream engine would see it. Worth documenting on the public surface.
| ManifestMergeManager& operator=(const ManifestMergeManager&) = delete; | ||
|
|
||
| /// \brief Configure an optional executor for manifest merging. | ||
| ManifestMergeManager& PlanWith(OptionalExecutor executor); |
There was a problem hiding this comment.
Once an executor is set here, the user-supplied ManifestWriterFactory gets invoked concurrently from worker threads (parallel FlushBin). The tests already guard against this with an atomic path counter + barrier, which confirms the intent, but the requirement isn't documented. A downstream engine implementing a factory won't know it has to be thread-safe. Worth a \note on this PlanWith (and the other PlanWith entry points / the ManifestWriterFactory typedef) stating the factory must be safe to call concurrently when an executor is configured.
There was a problem hiding this comment.
The documentation is on the Executor interface. There are too many PlanWith and putting comments there will result in too much redundancy.
No description provided.