[Chore](pipeline) adjust some unnecessary pipeline_task.terminate call#61768
[Chore](pipeline) adjust some unnecessary pipeline_task.terminate call#61768BiteTheDDDDt wants to merge 1 commit intoapache:masterfrom
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
|
/review |
There was a problem hiding this comment.
Pull request overview
Adjusts pipeline task shutdown behavior to avoid unnecessary PipelineTask::terminate() calls during execution and centralize termination triggering in Pipeline::make_all_runnable, aiming to make early-wakeup shutdown more controlled.
Changes:
- Removed
PipelineTask::terminate()call fromPipelineTask::execute()’s_wake_up_earlytermination path, keeping explicit operator termination via_root->terminate()and_sink->terminate(). - Refactored
Pipeline::make_all_runnable()to set_wake_up_earlyand callterminate()in a single loop over tasks.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
be/src/exec/pipeline/pipeline_task.cpp |
Stops calling PipelineTask::terminate() during _wake_up_early execution teardown; still terminates root/sink operators. |
be/src/exec/pipeline/pipeline.cpp |
Changes how tasks are marked early-wakeup and terminated when making a pipeline runnable. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (_sink->count_down_destination()) { | ||
| for (auto* task : _tasks) { | ||
| if (task) { | ||
| task->set_wake_up_early(wake_by); | ||
| } | ||
| } | ||
| for (auto* task : _tasks) { | ||
| if (task) { | ||
| task->terminate(); | ||
| } |
There was a problem hiding this comment.
make_all_runnable() now calls terminate() immediately after set_wake_up_early() for each task. This changes the ordering from “set wake_up_early for all tasks, then terminate all tasks” to an interleaved per-task ordering. If any Dependency objects are shared across tasks (e.g. local-exchange dependencies can be shared), terminating an earlier task can set _always_ready=true on a shared dependency before later tasks have _wake_up_early=true, reintroducing the exact race described in PipelineTask::execute() (task can observe _always_ready and close without running operator terminate). Consider restoring the two-phase loop (set wake_up_early for all tasks first, then terminate all tasks) or otherwise ensuring all tasks have _wake_up_early set before any task mutates shared dependencies via terminate().
| // By reading _is_pending_finish() (B) before the second read of _wake_up_early (A), | ||
| // if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe | ||
| // A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called. | ||
| if (_wake_up_early) { | ||
| terminate(); | ||
| THROW_IF_ERROR(_root->terminate(_state)); | ||
| THROW_IF_ERROR(_sink->terminate(_state)); | ||
| } |
There was a problem hiding this comment.
The inline comment block still says the ordering “ensures terminate() is called”, but this branch no longer calls PipelineTask::terminate() (only _root->terminate() / _sink->terminate()). Please update the comment to reflect the new behavior (i.e., what is being guaranteed now is that operator termination runs, while dependency unblocking via PipelineTask::terminate() happens elsewhere).
Code Review SummaryChanges reviewed: Merging two loops into one in Critical Checkpoint Conclusions
Verdict: No issues found. The changes are safe and the simplification is clean. |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
This pull request makes adjustments to the pipeline task termination logic to improve the reliability and correctness of task shutdown in the pipeline execution framework. The changes focus on ensuring that termination is handled in a more controlled and explicit manner.
Key changes include:
Pipeline Task Termination Logic:
terminate()withinPipelineTask::executewhen_wake_up_earlyis true, relying instead on explicit termination of the root and sink components.Pipeline::make_all_runnableto callterminate()on each task after setting the wake-up flag, consolidating task termination in one place and ensuring all tasks are properly terminated.