Skip to content

[Chore](pipeline) adjust some unnecessary pipeline_task.terminate call#61768

Open
BiteTheDDDDt wants to merge 1 commit intoapache:masterfrom
BiteTheDDDDt:dev_0326
Open

[Chore](pipeline) adjust some unnecessary pipeline_task.terminate call#61768
BiteTheDDDDt wants to merge 1 commit intoapache:masterfrom
BiteTheDDDDt:dev_0326

Conversation

@BiteTheDDDDt
Copy link
Contributor

This pull request makes adjustments to the pipeline task termination logic to improve the reliability and correctness of task shutdown in the pipeline execution framework. The changes focus on ensuring that termination is handled in a more controlled and explicit manner.

Key changes include:

Pipeline Task Termination Logic:

  • Removed the direct call to terminate() within PipelineTask::execute when _wake_up_early is true, relying instead on explicit termination of the root and sink components.
  • Updated Pipeline::make_all_runnable to call terminate() on each task after setting the wake-up flag, consolidating task termination in one place and ensuring all tasks are properly terminated.

Copilot AI review requested due to automatic review settings March 26, 2026 08:11
@hello-stephen
Copy link
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@BiteTheDDDDt
Copy link
Contributor Author

run buildall

@BiteTheDDDDt
Copy link
Contributor Author

/review

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adjusts pipeline task shutdown behavior to avoid unnecessary PipelineTask::terminate() calls during execution and centralize termination triggering in Pipeline::make_all_runnable, aiming to make early-wakeup shutdown more controlled.

Changes:

  • Removed PipelineTask::terminate() call from PipelineTask::execute()’s _wake_up_early termination path, keeping explicit operator termination via _root->terminate() and _sink->terminate().
  • Refactored Pipeline::make_all_runnable() to set _wake_up_early and call terminate() in a single loop over tasks.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
be/src/exec/pipeline/pipeline_task.cpp Stops calling PipelineTask::terminate() during _wake_up_early execution teardown; still terminates root/sink operators.
be/src/exec/pipeline/pipeline.cpp Changes how tasks are marked early-wakeup and terminated when making a pipeline runnable.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 124 to 129
if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
task->set_wake_up_early(wake_by);
}
}
for (auto* task : _tasks) {
if (task) {
task->terminate();
}
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make_all_runnable() now calls terminate() immediately after set_wake_up_early() for each task. This changes the ordering from “set wake_up_early for all tasks, then terminate all tasks” to an interleaved per-task ordering. If any Dependency objects are shared across tasks (e.g. local-exchange dependencies can be shared), terminating an earlier task can set _always_ready=true on a shared dependency before later tasks have _wake_up_early=true, reintroducing the exact race described in PipelineTask::execute() (task can observe _always_ready and close without running operator terminate). Consider restoring the two-phase loop (set wake_up_early for all tasks first, then terminate all tasks) or otherwise ensuring all tasks have _wake_up_early set before any task mutates shared dependencies via terminate().

Copilot uses AI. Check for mistakes.
Comment on lines 503 to 509
// By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
// if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe
// A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.
if (_wake_up_early) {
terminate();
THROW_IF_ERROR(_root->terminate(_state));
THROW_IF_ERROR(_sink->terminate(_state));
}
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inline comment block still says the ordering “ensures terminate() is called”, but this branch no longer calls PipelineTask::terminate() (only _root->terminate() / _sink->terminate()). Please update the comment to reflect the new behavior (i.e., what is being guaranteed now is that operator termination runs, while dependency unblocking via PipelineTask::terminate() happens elsewhere).

Copilot uses AI. Check for mistakes.
@github-actions
Copy link
Contributor

Code Review Summary

Changes reviewed: Merging two loops into one in Pipeline::make_all_runnable(), and removing the redundant PipelineTask::terminate() call from PipelineTask::execute()'s deferred lambda.

Critical Checkpoint Conclusions

  • Goal and correctness: The PR removes a redundant PipelineTask::terminate() call from execute() and simplifies the loop in make_all_runnable(). Both changes are correct.
  • Concurrency analysis:
    • The per-task ordering guarantee (set_wake_up_early before terminate) described in the comment at lines 490-505 of pipeline_task.cpp is preserved in the single-loop version, because for each individual task, set_wake_up_early() is still called before terminate().
    • Cross-task effects via shared dependencies (_execution_dependencies, _memory_sufficient_dependency) do not affect the race condition because _is_pending_finish() only checks per-task _finish_dependencies.
    • All code paths that set _wake_up_early=true (make_all_runnable, stop_if_finished) call PipelineTask::terminate() immediately after, so the removed call in execute() was indeed redundant (idempotent double-call).
  • Parallel code paths: do_revoke_memory() at line 763 still calls terminate() in its deferred lambda when _wake_up_early is true. This is a minor inconsistency with this PR's change but is not incorrect — it serves as an idempotent safety net in the spill path.
  • Lifecycle / memory / error handling: No issues — PipelineTask::terminate() is guarded by !is_finalized() and the _dependency_lock mutex, so late or redundant calls are safe.
  • Test coverage: Existing regression tests (test_terminate.groovy, test_slow_close.groovy) cover the wake-up-early and make_all_runnable paths with debug points. No new test is needed for this simplification.
  • Incompatible changes: None — this is a pure refactoring with no behavioral change.
  • Configuration / observability: Not applicable.

Verdict: No issues found. The changes are safe and the simplification is clean.

@hello-stephen
Copy link
Contributor

BE UT Coverage Report

Increment line coverage 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 52.87% (19911/37659)
Line Coverage 36.40% (186532/512489)
Region Coverage 32.64% (144599/442970)
Branch Coverage 33.86% (63422/187308)

@hello-stephen
Copy link
Contributor

BE Regression && UT Coverage Report

Increment line coverage 100% (0/0) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 71.84% (26498/36884)
Line Coverage 54.71% (279524/510944)
Region Coverage 51.87% (231901/447095)
Branch Coverage 53.34% (100213/187874)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants