diff --git a/be/src/exec/operator/result_file_sink_operator.cpp b/be/src/exec/operator/result_file_sink_operator.cpp index 77eceae8e8ce9b..26c46fc05f4e34 100644 --- a/be/src/exec/operator/result_file_sink_operator.cpp +++ b/be/src/exec/operator/result_file_sink_operator.cpp @@ -133,11 +133,20 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) if (_sender) { int64_t written_rows = _writer == nullptr ? 0 : _writer->get_written_rows(); state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(written_rows); - RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows)); + bool is_fully_closed = false; + RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows, + is_fully_closed)); + // Schedule deferred cleanup only when the last instance closes the shared + // buffer. In parallel outfile mode the buffer is keyed by query_id; in + // non-parallel mode it is keyed by fragment_instance_id. Either way, + // _sender->buffer_id() returns the correct registration key, so there is + // no need to branch on enable_parallel_outfile here. + if (is_fully_closed) { + state->exec_env()->result_mgr()->cancel_at_time( + time(nullptr) + config::result_buffer_cancelled_interval_time, + _sender->buffer_id()); + } } - state->exec_env()->result_mgr()->cancel_at_time( - time(nullptr) + config::result_buffer_cancelled_interval_time, - state->fragment_instance_id()); return Base::close(state, exec_status); } diff --git a/be/src/exec/operator/result_sink_operator.cpp b/be/src/exec/operator/result_sink_operator.cpp index b361cb7b6e1f90..da684ffc8d18b8 100644 --- a/be/src/exec/operator/result_sink_operator.cpp +++ b/be/src/exec/operator/result_sink_operator.cpp @@ -197,11 +197,20 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) { state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows( written_rows); } - RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows)); + bool is_fully_closed = false; + RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows, + is_fully_closed)); + // Schedule deferred cleanup only when the last instance closes the shared + // buffer. In parallel result-sink mode the buffer is keyed by query_id; + // in non-parallel mode it is keyed by fragment_instance_id. Either way, + // _sender->buffer_id() returns the correct registration key, so there is + // no need to branch on enable_parallel_result_sink here. + if (is_fully_closed) { + state->exec_env()->result_mgr()->cancel_at_time( + time(nullptr) + config::result_buffer_cancelled_interval_time, + _sender->buffer_id()); + } } - state->exec_env()->result_mgr()->cancel_at_time( - time(nullptr) + config::result_buffer_cancelled_interval_time, - state->fragment_instance_id()); RETURN_IF_ERROR(Base::close(state, exec_status)); return final_status; } diff --git a/be/src/runtime/result_block_buffer.cpp b/be/src/runtime/result_block_buffer.cpp index 828af5da2c70ba..df595f29cc0109 100644 --- a/be/src/runtime/result_block_buffer.cpp +++ b/be/src/runtime/result_block_buffer.cpp @@ -60,7 +60,7 @@ ResultBlockBuffer::ResultBlockBuffer(TUniqueId id, RuntimeState* template Status ResultBlockBuffer::close(const TUniqueId& id, Status exec_status, - int64_t num_rows) { + int64_t num_rows, bool& is_fully_closed) { std::unique_lock l(_lock); _returned_rows.fetch_add(num_rows); // close will be called multiple times and error status needs to be collected. @@ -77,9 +77,13 @@ Status ResultBlockBuffer::close(const TUniqueId& id, Status exec_ print_id(id)); } if (!_result_sink_dependencies.empty()) { + // Still waiting for other instances to finish; this is not the final close. + is_fully_closed = false; return _status; } + // All instances have closed: the buffer is now fully closed. + is_fully_closed = true; _is_close = true; _arrow_data_arrival.notify_all(); diff --git a/be/src/runtime/result_block_buffer.h b/be/src/runtime/result_block_buffer.h index 40df3b0538b643..a14d9e831ffef3 100644 --- a/be/src/runtime/result_block_buffer.h +++ b/be/src/runtime/result_block_buffer.h @@ -56,9 +56,21 @@ class ResultBlockBufferBase { ResultBlockBufferBase() = default; virtual ~ResultBlockBufferBase() = default; - virtual Status close(const TUniqueId& id, Status exec_status, int64_t num_rows) = 0; + // Close one fragment instance's contribution to this buffer. When the last + // registered instance calls close(), |is_fully_closed| is set to true, + // indicating that no more producers will write to this buffer and callers may + // safely schedule deferred cleanup. The buffer is keyed in ResultBufferMgr + // under buffer_id(); use that id (not the per-instance fragment_instance_id) + // when scheduling cancel_at_time() for the deferred cleanup. + virtual Status close(const TUniqueId& id, Status exec_status, int64_t num_rows, + bool& is_fully_closed) = 0; virtual void cancel(const Status& reason) = 0; + // The id under which this buffer was registered in ResultBufferMgr. + // In parallel result-sink mode this equals query_id; in non-parallel mode + // it equals fragment_instance_id. + [[nodiscard]] virtual const TUniqueId& buffer_id() const = 0; + [[nodiscard]] virtual std::shared_ptr mem_tracker() = 0; virtual void set_dependency(const TUniqueId& id, std::shared_ptr result_sink_dependency) = 0; @@ -74,9 +86,11 @@ class ResultBlockBuffer : public ResultBlockBufferBase { Status add_batch(RuntimeState* state, std::shared_ptr& result); Status get_batch(std::shared_ptr ctx); - Status close(const TUniqueId& id, Status exec_status, int64_t num_rows) override; + Status close(const TUniqueId& id, Status exec_status, int64_t num_rows, + bool& is_fully_closed) override; void cancel(const Status& reason) override; + [[nodiscard]] const TUniqueId& buffer_id() const override { return _fragment_id; } [[nodiscard]] std::shared_ptr mem_tracker() override { return _mem_tracker; } void set_dependency(const TUniqueId& id, std::shared_ptr result_sink_dependency) override; diff --git a/be/test/exec/sink/arrow_result_block_buffer_test.cpp b/be/test/exec/sink/arrow_result_block_buffer_test.cpp index 3cb6939b78c08b..a87a03d154274a 100644 --- a/be/test/exec/sink/arrow_result_block_buffer_test.cpp +++ b/be/test/exec/sink/arrow_result_block_buffer_test.cpp @@ -173,7 +173,9 @@ TEST_F(ArrowResultBlockBufferTest, TestArrowResultBlockBuffer) { EXPECT_FALSE(fail); } { - EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0).ok()); + bool is_fully_closed = false; + EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0, is_fully_closed).ok()); + EXPECT_TRUE(is_fully_closed); EXPECT_EQ(buffer._instance_rows[ins_id], 0); EXPECT_TRUE(buffer._instance_rows_in_queue.empty()); EXPECT_EQ(buffer._waiting_rpc.size(), 0); @@ -305,8 +307,10 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) { EXPECT_FALSE(fail); } { - EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(), + bool is_fully_closed = false; + EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(), ErrorCode::INTERNAL_ERROR); + EXPECT_TRUE(is_fully_closed); EXPECT_EQ(buffer._instance_rows[ins_id], 0); EXPECT_TRUE(buffer._instance_rows_in_queue.empty()); EXPECT_EQ(buffer._waiting_rpc.size(), 0); @@ -324,8 +328,10 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) { new_ins_id.lo = 1; auto new_dep = Dependency::create_shared(0, 0, "Test", true); buffer.set_dependency(new_ins_id, new_dep); - EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(), + bool is_fully_closed = true; // will be set to false since new_dep remains + EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(), ErrorCode::INTERNAL_ERROR); + EXPECT_FALSE(is_fully_closed); EXPECT_FALSE(data); EXPECT_FALSE(close); EXPECT_FALSE(fail); diff --git a/be/test/exec/sink/result_block_buffer_test.cpp b/be/test/exec/sink/result_block_buffer_test.cpp index 427bf0d8e7165e..46ea26c3d794e2 100644 --- a/be/test/exec/sink/result_block_buffer_test.cpp +++ b/be/test/exec/sink/result_block_buffer_test.cpp @@ -160,7 +160,9 @@ TEST_F(MysqlResultBlockBufferTest, TestMySQLResultBlockBuffer) { EXPECT_FALSE(fail); } { - EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0).ok()); + bool is_fully_closed = false; + EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0, is_fully_closed).ok()); + EXPECT_TRUE(is_fully_closed); EXPECT_EQ(buffer._instance_rows[ins_id], 0); EXPECT_TRUE(buffer._instance_rows_in_queue.empty()); EXPECT_EQ(buffer._waiting_rpc.size(), 0); @@ -289,8 +291,10 @@ TEST_F(MysqlResultBlockBufferTest, TestErrorClose) { EXPECT_FALSE(fail); } { - EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(), + bool is_fully_closed = false; + EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(), ErrorCode::INTERNAL_ERROR); + EXPECT_TRUE(is_fully_closed); EXPECT_EQ(buffer._instance_rows[ins_id], 0); EXPECT_TRUE(buffer._instance_rows_in_queue.empty()); EXPECT_EQ(buffer._waiting_rpc.size(), 0); @@ -308,8 +312,10 @@ TEST_F(MysqlResultBlockBufferTest, TestErrorClose) { new_ins_id.lo = 1; auto new_dep = Dependency::create_shared(0, 0, "Test", true); buffer.set_dependency(new_ins_id, new_dep); - EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(), + bool is_fully_closed = true; // will be set to false since new_dep remains + EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(), ErrorCode::INTERNAL_ERROR); + EXPECT_FALSE(is_fully_closed); EXPECT_FALSE(data); EXPECT_FALSE(close); EXPECT_FALSE(fail);