diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index f0312612d7..032b9ea8c2 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -3504,28 +3504,10 @@ mod tests { let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap(); let runtime = tokio::runtime::Runtime::new().unwrap(); - let (tx, mut rx) = mpsc::channel(1); - - // Separate thread to send the EOF signal once we've processed the only input batch - runtime.spawn(async move { - // Create a dictionary array with 100 values, and use it as input to the execution. - let keys = Int32Array::new((0..(row_count as i32)).map(|n| n % 4).collect(), None); - let values = Int32Array::from(vec![0, 1, 2, 3]); - let input_array = DictionaryArray::new(keys, Arc::new(values)); - let input_batch1 = InputBatch::Batch(vec![Arc::new(input_array)], row_count); - let input_batch2 = InputBatch::EOF; - - let batches = vec![input_batch1, input_batch2]; - - for batch in batches.into_iter() { - tx.send(batch).await.unwrap(); - } - }); - runtime.block_on(async move { + let mut eof_sent = false; + let mut got_result = false; loop { - let batch = rx.recv().await.unwrap(); - scans[0].set_input_batch(batch); match poll!(stream.next()) { Poll::Ready(Some(batch)) => { assert!(batch.is_ok(), "got error {}", batch.unwrap_err()); @@ -3533,13 +3515,22 @@ mod tests { assert_eq!(batch.num_rows(), row_count / 4); // dictionary should be unpacked assert!(matches!(batch.column(0).data_type(), DataType::Int32)); + got_result = true; } Poll::Ready(None) => { break; } - _ => {} + Poll::Pending => { + // Stream needs more input (e.g. FilterExec's batch coalescer + // is accumulating). Send EOF to flush. + if !eof_sent { + scans[0].set_input_batch(InputBatch::EOF); + eof_sent = true; + } + } } } + assert!(got_result, "Expected at least one result batch"); }); } @@ -3589,29 +3580,10 @@ mod tests { let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap(); let runtime = tokio::runtime::Runtime::new().unwrap(); - let (tx, mut rx) = mpsc::channel(1); - - // Separate thread to send the EOF signal once we've processed the only input batch - runtime.spawn(async move { - // Create a dictionary array with 100 values, and use it as input to the execution. - let keys = Int32Array::new((0..(row_count as i32)).map(|n| n % 4).collect(), None); - let values = StringArray::from(vec!["foo", "bar", "hello", "comet"]); - let input_array = DictionaryArray::new(keys, Arc::new(values)); - let input_batch1 = InputBatch::Batch(vec![Arc::new(input_array)], row_count); - - let input_batch2 = InputBatch::EOF; - - let batches = vec![input_batch1, input_batch2]; - - for batch in batches.into_iter() { - tx.send(batch).await.unwrap(); - } - }); - runtime.block_on(async move { + let mut eof_sent = false; + let mut got_result = false; loop { - let batch = rx.recv().await.unwrap(); - scans[0].set_input_batch(batch); match poll!(stream.next()) { Poll::Ready(Some(batch)) => { assert!(batch.is_ok(), "got error {}", batch.unwrap_err()); @@ -3619,13 +3591,22 @@ mod tests { assert_eq!(batch.num_rows(), row_count / 4); // string/binary should no longer be packed with dictionary assert!(matches!(batch.column(0).data_type(), DataType::Utf8)); + got_result = true; } Poll::Ready(None) => { break; } - _ => {} + Poll::Pending => { + // Stream needs more input (e.g. FilterExec's batch coalescer + // is accumulating). Send EOF to flush. + if !eof_sent { + scans[0].set_input_batch(InputBatch::EOF); + eof_sent = true; + } + } } } + assert!(got_result, "Expected at least one result batch"); }); }