Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions be/src/exec/operator/result_file_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,20 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
if (_sender) {
int64_t written_rows = _writer == nullptr ? 0 : _writer->get_written_rows();
state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(written_rows);
RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows));
bool is_fully_closed = false;
RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows,
is_fully_closed));
// Schedule deferred cleanup only when the last instance closes the shared
// buffer. In parallel outfile mode the buffer is keyed by query_id; in
// non-parallel mode it is keyed by fragment_instance_id. Either way,
// _sender->buffer_id() returns the correct registration key, so there is
// no need to branch on enable_parallel_outfile here.
if (is_fully_closed) {
state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
_sender->buffer_id());
}
}
state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
state->fragment_instance_id());

return Base::close(state, exec_status);
}
Expand Down
17 changes: 13 additions & 4 deletions be/src/exec/operator/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,20 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) {
state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(
written_rows);
}
RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows));
bool is_fully_closed = false;
RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows,
is_fully_closed));
// Schedule deferred cleanup only when the last instance closes the shared
// buffer. In parallel result-sink mode the buffer is keyed by query_id;
// in non-parallel mode it is keyed by fragment_instance_id. Either way,
// _sender->buffer_id() returns the correct registration key, so there is
// no need to branch on enable_parallel_result_sink here.
if (is_fully_closed) {
state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
_sender->buffer_id());
}
}
state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
state->fragment_instance_id());
RETURN_IF_ERROR(Base::close(state, exec_status));
return final_status;
}
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/result_block_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ ResultBlockBuffer<ResultCtxType>::ResultBlockBuffer(TUniqueId id, RuntimeState*

template <typename ResultCtxType>
Status ResultBlockBuffer<ResultCtxType>::close(const TUniqueId& id, Status exec_status,
int64_t num_rows) {
int64_t num_rows, bool& is_fully_closed) {
std::unique_lock<std::mutex> l(_lock);
_returned_rows.fetch_add(num_rows);
// close will be called multiple times and error status needs to be collected.
Expand All @@ -77,9 +77,13 @@ Status ResultBlockBuffer<ResultCtxType>::close(const TUniqueId& id, Status exec_
print_id(id));
}
if (!_result_sink_dependencies.empty()) {
// Still waiting for other instances to finish; this is not the final close.
is_fully_closed = false;
return _status;
}

// All instances have closed: the buffer is now fully closed.
is_fully_closed = true;
_is_close = true;
_arrow_data_arrival.notify_all();

Expand Down
18 changes: 16 additions & 2 deletions be/src/runtime/result_block_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,21 @@ class ResultBlockBufferBase {
ResultBlockBufferBase() = default;
virtual ~ResultBlockBufferBase() = default;

virtual Status close(const TUniqueId& id, Status exec_status, int64_t num_rows) = 0;
// Close one fragment instance's contribution to this buffer. When the last
// registered instance calls close(), |is_fully_closed| is set to true,
// indicating that no more producers will write to this buffer and callers may
// safely schedule deferred cleanup. The buffer is keyed in ResultBufferMgr
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Build Break] The close() signature changed from 3 to 4 parameters, but the following test call sites were not updated and will fail to compile:

  • be/test/exec/sink/result_block_buffer_test.cpp lines 163, 292, 311
  • be/test/exec/sink/arrow_result_block_buffer_test.cpp lines 176, 308, 327

All 6 sites call buffer.close(ins_id, Status::..., 0) with only 3 arguments. They need to be updated to pass a bool variable for the new bool& is_fully_closed out-parameter, e.g.:

bool is_fully_closed = false;
EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0, is_fully_closed).ok());

Also consider adding test assertions on is_fully_closed itself — e.g., verify it is set to false when other dependencies remain, and to true when the last instance closes.

// under buffer_id(); use that id (not the per-instance fragment_instance_id)
// when scheduling cancel_at_time() for the deferred cleanup.
virtual Status close(const TUniqueId& id, Status exec_status, int64_t num_rows,
bool& is_fully_closed) = 0;
virtual void cancel(const Status& reason) = 0;

// The id under which this buffer was registered in ResultBufferMgr.
// In parallel result-sink mode this equals query_id; in non-parallel mode
// it equals fragment_instance_id.
[[nodiscard]] virtual const TUniqueId& buffer_id() const = 0;

[[nodiscard]] virtual std::shared_ptr<MemTrackerLimiter> mem_tracker() = 0;
virtual void set_dependency(const TUniqueId& id,
std::shared_ptr<Dependency> result_sink_dependency) = 0;
Expand All @@ -74,9 +86,11 @@ class ResultBlockBuffer : public ResultBlockBufferBase {

Status add_batch(RuntimeState* state, std::shared_ptr<InBlockType>& result);
Status get_batch(std::shared_ptr<ResultCtxType> ctx);
Status close(const TUniqueId& id, Status exec_status, int64_t num_rows) override;
Status close(const TUniqueId& id, Status exec_status, int64_t num_rows,
bool& is_fully_closed) override;
void cancel(const Status& reason) override;

[[nodiscard]] const TUniqueId& buffer_id() const override { return _fragment_id; }
[[nodiscard]] std::shared_ptr<MemTrackerLimiter> mem_tracker() override { return _mem_tracker; }
void set_dependency(const TUniqueId& id,
std::shared_ptr<Dependency> result_sink_dependency) override;
Expand Down
12 changes: 9 additions & 3 deletions be/test/exec/sink/arrow_result_block_buffer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ TEST_F(ArrowResultBlockBufferTest, TestArrowResultBlockBuffer) {
EXPECT_FALSE(fail);
}
{
EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0).ok());
bool is_fully_closed = false;
EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0, is_fully_closed).ok());
EXPECT_TRUE(is_fully_closed);
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_EQ(buffer._waiting_rpc.size(), 0);
Expand Down Expand Up @@ -305,8 +307,10 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) {
EXPECT_FALSE(fail);
}
{
EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(),
bool is_fully_closed = false;
EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(),
ErrorCode::INTERNAL_ERROR);
EXPECT_TRUE(is_fully_closed);
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_EQ(buffer._waiting_rpc.size(), 0);
Expand All @@ -324,8 +328,10 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) {
new_ins_id.lo = 1;
auto new_dep = Dependency::create_shared(0, 0, "Test", true);
buffer.set_dependency(new_ins_id, new_dep);
EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(),
bool is_fully_closed = true; // will be set to false since new_dep remains
EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(),
ErrorCode::INTERNAL_ERROR);
EXPECT_FALSE(is_fully_closed);
EXPECT_FALSE(data);
EXPECT_FALSE(close);
EXPECT_FALSE(fail);
Expand Down
12 changes: 9 additions & 3 deletions be/test/exec/sink/result_block_buffer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ TEST_F(MysqlResultBlockBufferTest, TestMySQLResultBlockBuffer) {
EXPECT_FALSE(fail);
}
{
EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0).ok());
bool is_fully_closed = false;
EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0, is_fully_closed).ok());
EXPECT_TRUE(is_fully_closed);
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_EQ(buffer._waiting_rpc.size(), 0);
Expand Down Expand Up @@ -289,8 +291,10 @@ TEST_F(MysqlResultBlockBufferTest, TestErrorClose) {
EXPECT_FALSE(fail);
}
{
EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(),
bool is_fully_closed = false;
EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(),
ErrorCode::INTERNAL_ERROR);
EXPECT_TRUE(is_fully_closed);
EXPECT_EQ(buffer._instance_rows[ins_id], 0);
EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
EXPECT_EQ(buffer._waiting_rpc.size(), 0);
Expand All @@ -308,8 +312,10 @@ TEST_F(MysqlResultBlockBufferTest, TestErrorClose) {
new_ins_id.lo = 1;
auto new_dep = Dependency::create_shared(0, 0, "Test", true);
buffer.set_dependency(new_ins_id, new_dep);
EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(),
bool is_fully_closed = true; // will be set to false since new_dep remains
EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(),
ErrorCode::INTERNAL_ERROR);
EXPECT_FALSE(is_fully_closed);
EXPECT_FALSE(data);
EXPECT_FALSE(close);
EXPECT_FALSE(fail);
Expand Down
Loading