@@ -1532,7 +1532,7 @@ void TestBackpressure(BatchesMaker maker, int batch_size, int num_l_batches,
15321532 return true ;
15331533 };
15341534
1535- BusyWait (60 .0 , has_bp_been_applied);
1535+ BusyWait (3 .0 , has_bp_been_applied);
15361536 ASSERT_TRUE (has_bp_been_applied ());
15371537
15381538 gate.ReleaseAllBatches ();
@@ -1649,11 +1649,10 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16491649 EXPECT_FALSE (backpressure_monitor->is_paused ());
16501650 batch_producer_left.producer ().Push (l_batches.batches [l_cnt++]);
16511651 batch_producer_right.producer ().Push (r0_batches.batches [r_cnt++]);
1652- // }
16531652
1654- // One more batch should trigger back pressure
1653+ // this should trigger pause on sink
16551654
1656- BusyWait (60 .0 , [&]() { return backpressure_monitor->is_paused (); });
1655+ BusyWait (3 .0 , [&]() { return backpressure_monitor->is_paused (); });
16571656 arrow::io::internal::GetIOThreadPool ()->WaitForIdle ();
16581657 arrow::internal::GetCpuThreadPool ()->WaitForIdle ();
16591658
@@ -1678,38 +1677,33 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16781677 batch_producer_right.producer ().Push (r0_batches.batches [r_cnt++]);
16791678 }
16801679
1681- std::optional<ExecBatch> opt_batch;
1680+ BusyWait (3.0 , is_l_paused);
1681+ BusyWait (3.0 , is_r_paused);
1682+ arrow::io::internal::GetIOThreadPool ()->WaitForIdle ();
16821683 arrow::internal::GetCpuThreadPool ()->WaitForIdle ();
1683- // Read the batches from the sink to open up input of the asof join node
1684- for (uint32_t i = 0 ; i < thresholdOfBackpressureAsof - thresholdOfBackpressureAsofLow;
1685- i++) {
1686- SleepABit ();
1687- EXPECT_TRUE (is_l_paused ());
1688- EXPECT_TRUE (is_r_paused ());
1689- EXPECT_TRUE (backpressure_monitor->is_paused ());
1684+ // Verify pause propagates
1685+ EXPECT_TRUE (is_l_paused ());
1686+ EXPECT_TRUE (is_r_paused ());
16901687
1691- ASSERT_FINISHES_OK_AND_ASSIGN (opt_batch, sink_gen ());
1692- EXPECT_TRUE (opt_batch);
1693- }
1688+ batch_producer_left.producer ().Push (IterationEnd<std::optional<ExecBatch>>());
1689+ batch_producer_right.producer ().Push (IterationEnd<std::optional<ExecBatch>>());
16941690
1695- arrow::internal::GetCpuThreadPool ()-> WaitForIdle () ;
1691+ std::optional<ExecBatch> opt_batch ;
16961692
1697- // Finish the batches in the left and right producers
1698- for (uint32_t i = 0 ; i < thresholdOfBackpressureAsofLow + 1 ; i++) {
1699- SleepABit ();
1700- EXPECT_FALSE (is_l_paused ());
1701- EXPECT_FALSE (is_r_paused ());
1693+ for (uint32_t i = 1 ; i < thresholdOfBackpressureAsof - thresholdOfBackpressureAsofLow;
1694+ i++) {
17021695 ASSERT_FINISHES_OK_AND_ASSIGN (opt_batch, sink_gen ());
17031696 EXPECT_TRUE (opt_batch);
17041697 }
1698+ BusyWait (3.0 , [&]() { return !is_l_paused (); });
1699+ BusyWait (3.0 , [&]() { return !is_r_paused (); });
1700+ arrow::io::internal::GetIOThreadPool ()->WaitForIdle ();
1701+ arrow::internal::GetCpuThreadPool ()->WaitForIdle ();
17051702
17061703 EXPECT_FALSE (is_l_paused ());
17071704 EXPECT_FALSE (is_r_paused ());
17081705 EXPECT_FALSE (backpressure_monitor->is_paused ());
17091706
1710- batch_producer_left.producer ().Push (IterationEnd<std::optional<ExecBatch>>());
1711- batch_producer_right.producer ().Push (IterationEnd<std::optional<ExecBatch>>());
1712-
17131707 ASSERT_FINISHES_OK_AND_ASSIGN (opt_batch, sink_gen ());
17141708 EXPECT_FALSE (opt_batch);
17151709
0 commit comments