Skip to content

Commit cb14b54

Browse files
author
Rafał Hibner
committed
Handle PauseProducing in asof_join
1 parent 10f2c9c commit cb14b54

1 file changed

Lines changed: 51 additions & 2 deletions

File tree

cpp/src/arrow/acero/asof_join_node.cc

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1108,6 +1108,7 @@ class AsofJoinNode : public ExecNode {
11081108
EndFromProcessThread();
11091109
return;
11101110
}
1111+
backpressure_future_.Wait();
11111112
if (!Process()) {
11121113
return;
11131114
}
@@ -1520,8 +1521,51 @@ class AsofJoinNode : public ExecNode {
15201521
return Status::OK();
15211522
}
15221523

1523-
void PauseProducing(ExecNode* output, int32_t counter) override {}
1524-
void ResumeProducing(ExecNode* output, int32_t counter) override {}
1524+
void PauseProducing(ExecNode* output, int32_t counter) override {
1525+
std::lock_guard<std::mutex> lg(backpressure_mutex_);
1526+
if (counter <= last_backpressure_counter_) {
1527+
return;
1528+
}
1529+
last_backpressure_counter_ = counter;
1530+
if (!backpressure_future_.is_finished()) {
1531+
// Could happen if we get something like Pause(1) Pause(3) Resume(2)
1532+
return;
1533+
}
1534+
backpressure_future_ = Future<>::Make();
1535+
}
1536+
void ResumeProducing(ExecNode* output, int32_t counter) override {
1537+
Future<> to_finish;
1538+
{
1539+
std::lock_guard<std::mutex> lg(backpressure_mutex_);
1540+
if (counter <= last_backpressure_counter_) {
1541+
return;
1542+
}
1543+
last_backpressure_counter_ = counter;
1544+
if (backpressure_future_.is_finished()) {
1545+
return;
1546+
}
1547+
to_finish = backpressure_future_;
1548+
backpressure_future_ = Future<>::MakeFinished();
1549+
}
1550+
to_finish.MarkFinished();
1551+
}
1552+
1553+
Status StopProducing() override {
1554+
// GH-35837: ensure node is not paused
1555+
Future<> to_finish;
1556+
{
1557+
std::lock_guard<std::mutex> lg(backpressure_mutex_);
1558+
if (!backpressure_future_.is_finished()) {
1559+
to_finish = backpressure_future_;
1560+
backpressure_future_ = Future<>::MakeFinished();
1561+
}
1562+
}
1563+
if (to_finish.is_valid()) {
1564+
to_finish.MarkFinished();
1565+
}
1566+
// only then stop
1567+
return ExecNode::StopProducing();
1568+
}
15251569

15261570
Status StopProducingImpl() override {
15271571
#ifdef ARROW_ENABLE_THREADING
@@ -1549,6 +1593,11 @@ class AsofJoinNode : public ExecNode {
15491593
// Each input state corresponds to an input table
15501594
std::vector<std::unique_ptr<InputState>> state_;
15511595
std::mutex gate_;
1596+
1597+
std::mutex backpressure_mutex_;
1598+
std::atomic<int32_t> last_backpressure_counter_{0};
1599+
Future<> backpressure_future_ = Future<>::MakeFinished();
1600+
15521601
TolType tolerance_;
15531602
#ifndef NDEBUG
15541603
std::ostream* debug_os_;

0 commit comments

Comments
 (0)