From 26487c4a385cf47c6acec6298dbb4e1fa6b88b20 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Thu, 26 Mar 2026 16:08:18 +0800 Subject: [PATCH 1/3] adjust some unnecessary pipeline_task.terminate call --- be/src/exec/pipeline/pipeline.cpp | 4 ---- be/src/exec/pipeline/pipeline_task.cpp | 1 - 2 files changed, 5 deletions(-) diff --git a/be/src/exec/pipeline/pipeline.cpp b/be/src/exec/pipeline/pipeline.cpp index 78a2cffafad989..b06b3a59d3d755 100644 --- a/be/src/exec/pipeline/pipeline.cpp +++ b/be/src/exec/pipeline/pipeline.cpp @@ -125,10 +125,6 @@ void Pipeline::make_all_runnable(PipelineId wake_by) { for (auto* task : _tasks) { if (task) { task->set_wake_up_early(wake_by); - } - } - for (auto* task : _tasks) { - if (task) { task->terminate(); } } diff --git a/be/src/exec/pipeline/pipeline_task.cpp b/be/src/exec/pipeline/pipeline_task.cpp index 1b82530ebb8b03..9eef9217ab3374 100644 --- a/be/src/exec/pipeline/pipeline_task.cpp +++ b/be/src/exec/pipeline/pipeline_task.cpp @@ -504,7 +504,6 @@ Status PipelineTask::execute(bool* done) { // if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe // A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called. if (_wake_up_early) { - terminate(); THROW_IF_ERROR(_root->terminate(_state)); THROW_IF_ERROR(_sink->terminate(_state)); } From 911aa399bff51d64446ce9b12a983b44fef3953d Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Mon, 30 Mar 2026 12:59:42 +0800 Subject: [PATCH 2/3] update some minor fix and refactor --- be/src/exec/pipeline/pipeline.cpp | 2 +- .../pipeline/pipeline_fragment_context.cpp | 2 +- be/src/exec/pipeline/pipeline_task.cpp | 33 +++++++++++-------- be/src/exec/pipeline/pipeline_task.h | 12 ++++--- be/test/exec/pipeline/pipeline_task_test.cpp | 6 ++-- 5 files changed, 33 insertions(+), 22 deletions(-) diff --git a/be/src/exec/pipeline/pipeline.cpp b/be/src/exec/pipeline/pipeline.cpp index b06b3a59d3d755..de3c852ada1bb6 100644 --- a/be/src/exec/pipeline/pipeline.cpp +++ b/be/src/exec/pipeline/pipeline.cpp @@ -125,7 +125,7 @@ void Pipeline::make_all_runnable(PipelineId wake_by) { for (auto* task : _tasks) { if (task) { task->set_wake_up_early(wake_by); - task->terminate(); + task->unblock_all_dependencies(); } } } diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index 64f1bbf8c30422..40177f59acab94 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -257,7 +257,7 @@ void PipelineFragmentContext::cancel(const Status reason) { for (auto& tasks : _tasks) { for (auto& task : tasks) { - task.first->terminate(); + task.first->unblock_all_dependencies(); } } } diff --git a/be/src/exec/pipeline/pipeline_task.cpp b/be/src/exec/pipeline/pipeline_task.cpp index 9eef9217ab3374..5999854d5b1fcf 100644 --- a/be/src/exec/pipeline/pipeline_task.cpp +++ b/be/src/exec/pipeline/pipeline_task.cpp @@ -160,7 +160,7 @@ Status PipelineTask::prepare(const std::vector& scan_range, co } if (auto fragment = _fragment_context.lock()) { if (fragment->get_query_ctx()->is_cancelled()) { - terminate(); + unblock_all_dependencies(); return fragment->get_query_ctx()->exec_status(); } } else { @@ -344,7 +344,7 @@ bool PipelineTask::_is_blocked() { }); } -void PipelineTask::terminate() { +void PipelineTask::unblock_all_dependencies() { // We use a lock to assure all dependencies are not deconstructed here. std::unique_lock lc(_dependency_lock); auto fragment = _fragment_context.lock(); @@ -363,7 +363,8 @@ void PipelineTask::terminate() { [&](Dependency* dep) { dep->set_ready(); }); _memory_sufficient_dependency->set_ready(); } catch (const doris::Exception& e) { - LOG(WARNING) << "Terminate failed: " << e.code() << ", " << e.to_string(); + LOG(WARNING) << "unblock_all_dependencies failed: " << e.code() << ", " + << e.to_string(); } } } @@ -478,20 +479,21 @@ Status PipelineTask::execute(bool* done) { } else if (_eos && !_spilling && (fragment_context->is_canceled() || !_is_pending_finish())) { // Debug point for testing the race condition fix: inject set_wake_up_early() + - // terminate() here to simulate Thread B writing A then B between Thread A's two - // reads of _wake_up_early. + // unblock_all_dependencies() here to simulate Thread B writing A then B between + // Thread A's two reads of _wake_up_early. DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", { set_wake_up_early(); - terminate(); + unblock_all_dependencies(); }); *done = true; } - // NOTE: The terminate() call is intentionally placed AFTER the _is_pending_finish() check - // above, not before. This ordering is critical to avoid a race condition: + // NOTE: The operator terminate() call is intentionally placed AFTER the + // _is_pending_finish() check above, not before. This ordering is critical to avoid a race + // condition with the seq_cst memory ordering guarantee: // // Pipeline::make_all_runnable() writes in this order: - // (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready] + // (A) set_wake_up_early() -> (B) unblock_all_dependencies() [sets finish_dep._always_ready] // // If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a // window where Thread A reads _wake_up_early=false, then Thread B writes both A and B, @@ -502,7 +504,9 @@ Status PipelineTask::execute(bool* done) { // // By reading _is_pending_finish() (B) before the second read of _wake_up_early (A), // if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe - // A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called. + // A's effect (_wake_up_early=true) on this second read, ensuring operator terminate() is + // called. This relies on _wake_up_early and _always_ready both being std::atomic with the + // default seq_cst ordering — do not weaken them to relaxed or acq/rel. if (_wake_up_early) { THROW_IF_ERROR(_root->terminate(_state)); THROW_IF_ERROR(_sink->terminate(_state)); @@ -707,7 +711,7 @@ Status PipelineTask::execute(bool* done) { if (required_pipeline_id == pipeline_id() && required_task_id == task_id() && fragment_context->get_fragment_id() == required_fragment_id) { _wake_up_early = true; - terminate(); + unblock_all_dependencies(); } else if (required_pipeline_id == pipeline_id() && fragment_context->get_fragment_id() == required_fragment_id) { LOG(WARNING) << "PipelineTask::execute.terminate sleep 5s"; @@ -758,9 +762,10 @@ Status PipelineTask::do_revoke_memory(const std::shared_ptr& spill fragment_context->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms( delta_cpu_time); - // If task is woke up early, we should terminate all operators, and this task could be closed immediately. + // If task is woke up early, unblock all dependencies and terminate all operators, + // so this task could be closed immediately. if (_wake_up_early) { - terminate(); + unblock_all_dependencies(); THROW_IF_ERROR(_root->terminate(_state)); THROW_IF_ERROR(_sink->terminate(_state)); _eos = true; @@ -873,7 +878,7 @@ void PipelineTask::stop_if_finished() { if (auto sink = _sink) { if (sink->is_finished(_state)) { set_wake_up_early(); - terminate(); + unblock_all_dependencies(); } } } diff --git a/be/src/exec/pipeline/pipeline_task.h b/be/src/exec/pipeline/pipeline_task.h index 950eb8fe2f9428..999bdb07116428 100644 --- a/be/src/exec/pipeline/pipeline_task.h +++ b/be/src/exec/pipeline/pipeline_task.h @@ -71,9 +71,9 @@ class PipelineTask : public std::enable_shared_from_this { } virtual PipelineTask& set_thread_id(int thread_id) { - _thread_id = thread_id; if (thread_id != _thread_id) { COUNTER_UPDATE(_core_change_times, 1); + _thread_id = thread_id; } return *this; } @@ -130,8 +130,12 @@ class PipelineTask : public std::enable_shared_from_this { _wake_by = wake_by; } - // Execution phase should be terminated. This is called if this task is canceled or waken up early. - void terminate(); + // Unblock all dependencies so this task can never be blocked again. + // This is called when the task is woken up early or the fragment is canceled. + // + // NOTE: This does NOT call operator-level terminate() — operator terminate must run + // inside execute() on the worker thread because operator state is not thread-safe. + void unblock_all_dependencies(); // 1 used for update priority queue // note(wb) an ugly implementation, need refactor later @@ -317,7 +321,7 @@ class PipelineTask : public std::enable_shared_from_this { MonotonicStopWatch _state_change_watcher; std::atomic _spilling = false; const std::string _pipeline_name; - int _wake_by = -1; + std::atomic _wake_by = -1; }; using PipelineTaskSPtr = std::shared_ptr; diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp b/be/test/exec/pipeline/pipeline_task_test.cpp index db61819c4dac12..9453c633e2f1d5 100644 --- a/be/test/exec/pipeline/pipeline_task_test.cpp +++ b/be/test/exec/pipeline/pipeline_task_test.cpp @@ -435,7 +435,7 @@ TEST_F(PipelineTaskTest, TEST_TERMINATE) { std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 5000)); terminated = true; task->set_wake_up_early(); - task->terminate(); + task->unblock_all_dependencies(); }; std::thread exec_thread(exec_func); @@ -1238,7 +1238,9 @@ TEST_F(PipelineTaskTest, TEST_SHOULD_TRIGGER_REVOKING) { query_mem_tracker->set_limit(wg_mem_limit); } // Case 4: reserve_size too small (reserve * parallelism <= query_limit / 5) -> false - { EXPECT_FALSE(task->_should_trigger_revoking(wg_mem_limit / 5)); } + { + EXPECT_FALSE(task->_should_trigger_revoking(wg_mem_limit / 5)); + } // Case 5: no memory pressure (neither query tracker nor wg watermark) -> false { // consumption + reserve = 100MB + 250MB = 350MB < 90% of 1GB (900MB); wg not at watermark From ab7b81b531bffff57ea389a86b08782a04c7de64 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Mon, 30 Mar 2026 13:02:22 +0800 Subject: [PATCH 3/3] fmt --- be/test/exec/pipeline/pipeline_task_test.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp b/be/test/exec/pipeline/pipeline_task_test.cpp index 9453c633e2f1d5..fb55afbc9de3f3 100644 --- a/be/test/exec/pipeline/pipeline_task_test.cpp +++ b/be/test/exec/pipeline/pipeline_task_test.cpp @@ -1238,9 +1238,7 @@ TEST_F(PipelineTaskTest, TEST_SHOULD_TRIGGER_REVOKING) { query_mem_tracker->set_limit(wg_mem_limit); } // Case 4: reserve_size too small (reserve * parallelism <= query_limit / 5) -> false - { - EXPECT_FALSE(task->_should_trigger_revoking(wg_mem_limit / 5)); - } + { EXPECT_FALSE(task->_should_trigger_revoking(wg_mem_limit / 5)); } // Case 5: no memory pressure (neither query tracker nor wg watermark) -> false { // consumption + reserve = 100MB + 250MB = 350MB < 90% of 1GB (900MB); wg not at watermark