Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions be/src/exec/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,7 @@ void Pipeline::make_all_runnable(PipelineId wake_by) {
for (auto* task : _tasks) {
if (task) {
task->set_wake_up_early(wake_by);
}
}
for (auto* task : _tasks) {
if (task) {
task->terminate();
task->unblock_all_dependencies();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ void PipelineFragmentContext::cancel(const Status reason) {

for (auto& tasks : _tasks) {
for (auto& task : tasks) {
task.first->terminate();
task.first->unblock_all_dependencies();
}
}
}
Expand Down
34 changes: 19 additions & 15 deletions be/src/exec/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Status PipelineTask::prepare(const std::vector<TScanRangeParams>& scan_range, co
}
if (auto fragment = _fragment_context.lock()) {
if (fragment->get_query_ctx()->is_cancelled()) {
terminate();
unblock_all_dependencies();
return fragment->get_query_ctx()->exec_status();
}
} else {
Expand Down Expand Up @@ -344,7 +344,7 @@ bool PipelineTask::_is_blocked() {
});
}

void PipelineTask::terminate() {
void PipelineTask::unblock_all_dependencies() {
// We hold a lock to ensure that none of the dependencies are destroyed while they are used here.
std::unique_lock<std::mutex> lc(_dependency_lock);
auto fragment = _fragment_context.lock();
Expand All @@ -363,7 +363,8 @@ void PipelineTask::terminate() {
[&](Dependency* dep) { dep->set_ready(); });
_memory_sufficient_dependency->set_ready();
} catch (const doris::Exception& e) {
LOG(WARNING) << "Terminate failed: " << e.code() << ", " << e.to_string();
LOG(WARNING) << "unblock_all_dependencies failed: " << e.code() << ", "
<< e.to_string();
}
}
}
Expand Down Expand Up @@ -478,20 +479,21 @@ Status PipelineTask::execute(bool* done) {
} else if (_eos && !_spilling &&
(fragment_context->is_canceled() || !_is_pending_finish())) {
// Debug point for testing the race condition fix: inject set_wake_up_early() +
// terminate() here to simulate Thread B writing A then B between Thread A's two
// reads of _wake_up_early.
// unblock_all_dependencies() here to simulate Thread B writing A then B between
// Thread A's two reads of _wake_up_early.
DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
set_wake_up_early();
terminate();
unblock_all_dependencies();
});
*done = true;
}

// NOTE: The terminate() call is intentionally placed AFTER the _is_pending_finish() check
// above, not before. This ordering is critical to avoid a race condition:
// NOTE: The operator terminate() call is intentionally placed AFTER the
// _is_pending_finish() check above, not before. This ordering is critical to avoid a race
// condition with the seq_cst memory ordering guarantee:
//
// Pipeline::make_all_runnable() writes in this order:
// (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready]
// (A) set_wake_up_early() -> (B) unblock_all_dependencies() [sets finish_dep._always_ready]
//
// If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
// window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
Expand All @@ -502,9 +504,10 @@ Status PipelineTask::execute(bool* done) {
//
// By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
// if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe
// A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.
// A's effect (_wake_up_early=true) on this second read, ensuring operator terminate() is
// called. This relies on _wake_up_early and _always_ready both being std::atomic with the
// default seq_cst ordering — do not weaken them to relaxed or acq/rel.
if (_wake_up_early) {
terminate();
THROW_IF_ERROR(_root->terminate(_state));
THROW_IF_ERROR(_sink->terminate(_state));
}
Expand Down Expand Up @@ -708,7 +711,7 @@ Status PipelineTask::execute(bool* done) {
if (required_pipeline_id == pipeline_id() && required_task_id == task_id() &&
fragment_context->get_fragment_id() == required_fragment_id) {
_wake_up_early = true;
terminate();
unblock_all_dependencies();
} else if (required_pipeline_id == pipeline_id() &&
fragment_context->get_fragment_id() == required_fragment_id) {
LOG(WARNING) << "PipelineTask::execute.terminate sleep 5s";
Expand Down Expand Up @@ -759,9 +762,10 @@ Status PipelineTask::do_revoke_memory(const std::shared_ptr<SpillContext>& spill
fragment_context->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(
delta_cpu_time);

// If task is woke up early, we should terminate all operators, and this task could be closed immediately.
// If the task was woken up early, unblock all dependencies and terminate all operators,
// so that this task can be closed immediately.
if (_wake_up_early) {
terminate();
unblock_all_dependencies();
THROW_IF_ERROR(_root->terminate(_state));
THROW_IF_ERROR(_sink->terminate(_state));
_eos = true;
Expand Down Expand Up @@ -874,7 +878,7 @@ void PipelineTask::stop_if_finished() {
if (auto sink = _sink) {
if (sink->is_finished(_state)) {
set_wake_up_early();
terminate();
unblock_all_dependencies();
}
}
}
Expand Down
12 changes: 8 additions & 4 deletions be/src/exec/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
}

// Records the id of the thread this task is being run on.
//
// If `thread_id` differs from the currently stored `_thread_id`, the
// `_core_change_times` counter is updated by 1 and the stored id is replaced.
// The comparison must happen BEFORE the assignment: assigning first makes the
// two values always equal, so the counter would never be updated.
//
// Returns `*this` so that setter calls can be chained.
virtual PipelineTask& set_thread_id(int thread_id) {
    if (thread_id != _thread_id) {
        COUNTER_UPDATE(_core_change_times, 1);
        _thread_id = thread_id;
    }
    return *this;
}
Expand Down Expand Up @@ -130,8 +130,12 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
_wake_by = wake_by;
}

// Execution phase should be terminated. This is called if this task is canceled or waken up early.
void terminate();
// Unblock all dependencies so this task can never be blocked again.
// This is called when the task is woken up early or the fragment is canceled.
//
// NOTE: This does NOT call operator-level terminate() — operator terminate must run
// inside execute() on the worker thread because operator state is not thread-safe.
void unblock_all_dependencies();

// 1 used for update priority queue
// note(wb): an ugly implementation; needs refactoring later
Expand Down Expand Up @@ -317,7 +321,7 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
MonotonicStopWatch _state_change_watcher;
std::atomic<bool> _spilling = false;
const std::string _pipeline_name;
int _wake_by = -1;
std::atomic<int> _wake_by = -1;
};

using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;
Expand Down
2 changes: 1 addition & 1 deletion be/test/exec/pipeline/pipeline_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ TEST_F(PipelineTaskTest, TEST_TERMINATE) {
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 5000));
terminated = true;
task->set_wake_up_early();
task->terminate();
task->unblock_all_dependencies();
};

std::thread exec_thread(exec_func);
Expand Down
Loading