From 3b7ba8a903813824cabede3a3941a0a7fd1d7356 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Thu, 26 Mar 2026 10:47:48 +0800 Subject: [PATCH 1/3] fix cancel_at_time not work at parallel_sink/parallel_outfile --- be/src/exec/operator/result_file_sink_operator.cpp | 6 +++++- be/src/exec/operator/result_sink_operator.cpp | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/be/src/exec/operator/result_file_sink_operator.cpp b/be/src/exec/operator/result_file_sink_operator.cpp index 77eceae8e8ce9b..fa14bb8f5c5a62 100644 --- a/be/src/exec/operator/result_file_sink_operator.cpp +++ b/be/src/exec/operator/result_file_sink_operator.cpp @@ -135,9 +135,13 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(written_rows); RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows)); } + // In parallel outfile mode, the buffer is registered under query_id; otherwise + // it is registered under fragment_instance_id. Pass the matching key so the + // deferred cancel actually finds and removes the buffer entry. state->exec_env()->result_mgr()->cancel_at_time( time(nullptr) + config::result_buffer_cancelled_interval_time, - state->fragment_instance_id()); + state->query_options().enable_parallel_outfile ? state->query_id() + : state->fragment_instance_id()); return Base::close(state, exec_status); } diff --git a/be/src/exec/operator/result_sink_operator.cpp b/be/src/exec/operator/result_sink_operator.cpp index b361cb7b6e1f90..b5057ddd07471b 100644 --- a/be/src/exec/operator/result_sink_operator.cpp +++ b/be/src/exec/operator/result_sink_operator.cpp @@ -199,9 +199,13 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) { } RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows)); } + // In parallel result sink mode, the buffer is registered under query_id; otherwise + // it is registered under fragment_instance_id. Pass the matching key so the + // deferred cancel actually finds and removes the buffer entry. state->exec_env()->result_mgr()->cancel_at_time( time(nullptr) + config::result_buffer_cancelled_interval_time, - state->fragment_instance_id()); + state->query_options().enable_parallel_result_sink ? state->query_id() + : state->fragment_instance_id()); RETURN_IF_ERROR(Base::close(state, exec_status)); return final_status; } From 020f877ebe160c80472db11371de2c08859e8a56 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Thu, 26 Mar 2026 11:02:20 +0800 Subject: [PATCH 2/3] update --- .../operator/result_file_sink_operator.cpp | 21 ++++++++++++------- be/src/exec/operator/result_sink_operator.cpp | 21 ++++++++++++------- be/src/runtime/result_block_buffer.cpp | 6 +++++- be/src/runtime/result_block_buffer.h | 18 ++++++++++++++-- 4 files changed, 47 insertions(+), 19 deletions(-) diff --git a/be/src/exec/operator/result_file_sink_operator.cpp b/be/src/exec/operator/result_file_sink_operator.cpp index fa14bb8f5c5a62..26c46fc05f4e34 100644 --- a/be/src/exec/operator/result_file_sink_operator.cpp +++ b/be/src/exec/operator/result_file_sink_operator.cpp @@ -133,15 +133,20 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) if (_sender) { int64_t written_rows = _writer == nullptr ? 0 : _writer->get_written_rows(); state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(written_rows); - RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows)); + bool is_fully_closed = false; + RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows, + is_fully_closed)); + // Schedule deferred cleanup only when the last instance closes the shared + // buffer. In parallel outfile mode the buffer is keyed by query_id; in + // non-parallel mode it is keyed by fragment_instance_id. Either way, + // _sender->buffer_id() returns the correct registration key, so there is + // no need to branch on enable_parallel_outfile here. + if (is_fully_closed) { + state->exec_env()->result_mgr()->cancel_at_time( + time(nullptr) + config::result_buffer_cancelled_interval_time, + _sender->buffer_id()); + } } - // In parallel outfile mode, the buffer is registered under query_id; otherwise - // it is registered under fragment_instance_id. Pass the matching key so the - // deferred cancel actually finds and removes the buffer entry. - state->exec_env()->result_mgr()->cancel_at_time( - time(nullptr) + config::result_buffer_cancelled_interval_time, - state->query_options().enable_parallel_outfile ? state->query_id() - : state->fragment_instance_id()); return Base::close(state, exec_status); } diff --git a/be/src/exec/operator/result_sink_operator.cpp b/be/src/exec/operator/result_sink_operator.cpp index b5057ddd07471b..da684ffc8d18b8 100644 --- a/be/src/exec/operator/result_sink_operator.cpp +++ b/be/src/exec/operator/result_sink_operator.cpp @@ -197,15 +197,20 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) { state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows( written_rows); } - RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows)); + bool is_fully_closed = false; + RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows, + is_fully_closed)); + // Schedule deferred cleanup only when the last instance closes the shared + // buffer. In parallel result-sink mode the buffer is keyed by query_id; + // in non-parallel mode it is keyed by fragment_instance_id. Either way, + // _sender->buffer_id() returns the correct registration key, so there is + // no need to branch on enable_parallel_result_sink here. + if (is_fully_closed) { + state->exec_env()->result_mgr()->cancel_at_time( + time(nullptr) + config::result_buffer_cancelled_interval_time, + _sender->buffer_id()); + } } - // In parallel result sink mode, the buffer is registered under query_id; otherwise - // it is registered under fragment_instance_id. Pass the matching key so the - // deferred cancel actually finds and removes the buffer entry. - state->exec_env()->result_mgr()->cancel_at_time( - time(nullptr) + config::result_buffer_cancelled_interval_time, - state->query_options().enable_parallel_result_sink ? state->query_id() - : state->fragment_instance_id()); RETURN_IF_ERROR(Base::close(state, exec_status)); return final_status; } diff --git a/be/src/runtime/result_block_buffer.cpp b/be/src/runtime/result_block_buffer.cpp index 828af5da2c70ba..df595f29cc0109 100644 --- a/be/src/runtime/result_block_buffer.cpp +++ b/be/src/runtime/result_block_buffer.cpp @@ -60,7 +60,7 @@ ResultBlockBuffer::ResultBlockBuffer(TUniqueId id, RuntimeState* template Status ResultBlockBuffer::close(const TUniqueId& id, Status exec_status, - int64_t num_rows) { + int64_t num_rows, bool& is_fully_closed) { std::unique_lock l(_lock); _returned_rows.fetch_add(num_rows); // close will be called multiple times and error status needs to be collected. @@ -77,9 +77,13 @@ Status ResultBlockBuffer::close(const TUniqueId& id, Status exec_ print_id(id)); } if (!_result_sink_dependencies.empty()) { + // Still waiting for other instances to finish; this is not the final close. + is_fully_closed = false; return _status; } + // All instances have closed: the buffer is now fully closed. + is_fully_closed = true; _is_close = true; _arrow_data_arrival.notify_all(); diff --git a/be/src/runtime/result_block_buffer.h b/be/src/runtime/result_block_buffer.h index 40df3b0538b643..a14d9e831ffef3 100644 --- a/be/src/runtime/result_block_buffer.h +++ b/be/src/runtime/result_block_buffer.h @@ -56,9 +56,21 @@ class ResultBlockBufferBase { ResultBlockBufferBase() = default; virtual ~ResultBlockBufferBase() = default; - virtual Status close(const TUniqueId& id, Status exec_status, int64_t num_rows) = 0; + // Close one fragment instance's contribution to this buffer. When the last + // registered instance calls close(), |is_fully_closed| is set to true, + // indicating that no more producers will write to this buffer and callers may + // safely schedule deferred cleanup. The buffer is keyed in ResultBufferMgr + // under buffer_id(); use that id (not the per-instance fragment_instance_id) + // when scheduling cancel_at_time() for the deferred cleanup. + virtual Status close(const TUniqueId& id, Status exec_status, int64_t num_rows, + bool& is_fully_closed) = 0; virtual void cancel(const Status& reason) = 0; + // The id under which this buffer was registered in ResultBufferMgr. + // In parallel result-sink mode this equals query_id; in non-parallel mode + // it equals fragment_instance_id. + [[nodiscard]] virtual const TUniqueId& buffer_id() const = 0; + [[nodiscard]] virtual std::shared_ptr mem_tracker() = 0; virtual void set_dependency(const TUniqueId& id, std::shared_ptr result_sink_dependency) = 0; @@ -74,9 +86,11 @@ class ResultBlockBuffer : public ResultBlockBufferBase { Status add_batch(RuntimeState* state, std::shared_ptr& result); Status get_batch(std::shared_ptr ctx); - Status close(const TUniqueId& id, Status exec_status, int64_t num_rows) override; + Status close(const TUniqueId& id, Status exec_status, int64_t num_rows, + bool& is_fully_closed) override; void cancel(const Status& reason) override; + [[nodiscard]] const TUniqueId& buffer_id() const override { return _fragment_id; } [[nodiscard]] std::shared_ptr mem_tracker() override { return _mem_tracker; } void set_dependency(const TUniqueId& id, std::shared_ptr result_sink_dependency) override; From 5c329a34c95e1799ea9cedeffdbda593d097c4af Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Thu, 26 Mar 2026 11:16:48 +0800 Subject: [PATCH 3/3] fix ut --- be/test/exec/sink/arrow_result_block_buffer_test.cpp | 12 +++++++++--- be/test/exec/sink/result_block_buffer_test.cpp | 12 +++++++++--- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/be/test/exec/sink/arrow_result_block_buffer_test.cpp b/be/test/exec/sink/arrow_result_block_buffer_test.cpp index 3cb6939b78c08b..a87a03d154274a 100644 --- a/be/test/exec/sink/arrow_result_block_buffer_test.cpp +++ b/be/test/exec/sink/arrow_result_block_buffer_test.cpp @@ -173,7 +173,9 @@ TEST_F(ArrowResultBlockBufferTest, TestArrowResultBlockBuffer) { EXPECT_FALSE(fail); } { - EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0).ok()); + bool is_fully_closed = false; + EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0, is_fully_closed).ok()); + EXPECT_TRUE(is_fully_closed); EXPECT_EQ(buffer._instance_rows[ins_id], 0); EXPECT_TRUE(buffer._instance_rows_in_queue.empty()); EXPECT_EQ(buffer._waiting_rpc.size(), 0); @@ -305,8 +307,10 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) { EXPECT_FALSE(fail); } { - EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(), + bool is_fully_closed = false; + EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(), ErrorCode::INTERNAL_ERROR); + EXPECT_TRUE(is_fully_closed); EXPECT_EQ(buffer._instance_rows[ins_id], 0); EXPECT_TRUE(buffer._instance_rows_in_queue.empty()); EXPECT_EQ(buffer._waiting_rpc.size(), 0); @@ -324,8 +328,10 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) { new_ins_id.lo = 1; auto new_dep = Dependency::create_shared(0, 0, "Test", true); buffer.set_dependency(new_ins_id, new_dep); - EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(), + bool is_fully_closed = true; // will be set to false since new_dep remains + EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(), ErrorCode::INTERNAL_ERROR); + EXPECT_FALSE(is_fully_closed); EXPECT_FALSE(data); EXPECT_FALSE(close); EXPECT_FALSE(fail); diff --git a/be/test/exec/sink/result_block_buffer_test.cpp b/be/test/exec/sink/result_block_buffer_test.cpp index 427bf0d8e7165e..46ea26c3d794e2 100644 --- a/be/test/exec/sink/result_block_buffer_test.cpp +++ b/be/test/exec/sink/result_block_buffer_test.cpp @@ -160,7 +160,9 @@ TEST_F(MysqlResultBlockBufferTest, TestMySQLResultBlockBuffer) { EXPECT_FALSE(fail); } { - EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0).ok()); + bool is_fully_closed = false; + EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0, is_fully_closed).ok()); + EXPECT_TRUE(is_fully_closed); EXPECT_EQ(buffer._instance_rows[ins_id], 0); EXPECT_TRUE(buffer._instance_rows_in_queue.empty()); EXPECT_EQ(buffer._waiting_rpc.size(), 0); @@ -289,8 +291,10 @@ TEST_F(MysqlResultBlockBufferTest, TestErrorClose) { EXPECT_FALSE(fail); } { - EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(), + bool is_fully_closed = false; + EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(), ErrorCode::INTERNAL_ERROR); + EXPECT_TRUE(is_fully_closed); EXPECT_EQ(buffer._instance_rows[ins_id], 0); EXPECT_TRUE(buffer._instance_rows_in_queue.empty()); EXPECT_EQ(buffer._waiting_rpc.size(), 0); @@ -308,8 +312,10 @@ TEST_F(MysqlResultBlockBufferTest, TestErrorClose) { new_ins_id.lo = 1; auto new_dep = Dependency::create_shared(0, 0, "Test", true); buffer.set_dependency(new_ins_id, new_dep); - EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(), + bool is_fully_closed = true; // will be set to false since new_dep remains + EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(), ErrorCode::INTERNAL_ERROR); + EXPECT_FALSE(is_fully_closed); EXPECT_FALSE(data); EXPECT_FALSE(close); EXPECT_FALSE(fail);