From 71af76929c79b92e62ec7027d8177d0ac86e5a64 Mon Sep 17 00:00:00 2001 From: "Spencer C. Imbleau" Date: Thu, 11 Dec 2025 12:41:59 -0500 Subject: [PATCH 1/6] feat: add stream runner --- src/lib.rs | 4 ++ src/receiver.rs | 31 +++++++++-- src/stream.rs | 71 +++++++++++++++++++++++++ src/task_stream.rs | 128 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 230 insertions(+), 4 deletions(-) create mode 100644 src/stream.rs create mode 100644 src/task_stream.rs diff --git a/src/lib.rs b/src/lib.rs index 7802900..03be98b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,19 +2,23 @@ mod error; mod receiver; +mod stream; mod task; mod task_pool; mod task_runner; +mod task_stream; mod util; pub use error::TimeoutError; pub use receiver::AsyncReceiver; +pub use stream::AsyncStream; pub use task::AsyncTask; pub use task::TimedAsyncTask; pub use task_pool::TaskPool; pub use task_pool::TimedTaskPool; pub use task_runner::TaskRunner; pub use task_runner::TimedTaskRunner; +pub use task_stream::TaskStream; pub use util::pending; pub use util::sleep; pub use util::timeout; diff --git a/src/receiver.rs b/src/receiver.rs index ec7cc44..2a1e7b0 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -1,9 +1,9 @@ -use std::sync::{ - Arc, - atomic::{AtomicBool, Ordering}, -}; +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use futures::task::AtomicWaker; +use tokio::sync::mpsc; use tokio::sync::oneshot::{self}; /// A channel that catches an [`AsyncTask`](crate::AsyncTask) result. @@ -28,3 +28,26 @@ impl AsyncReceiver { } } } + +/// A channel that receives multiple items from an async stream. +#[derive(Debug)] +pub struct AsyncStreamReceiver { + pub(crate) finished: Arc, + pub(crate) waker: Arc, + pub(crate) receiver: mpsc::UnboundedReceiver, +} + +impl AsyncStreamReceiver { + /// Returns whether the stream has finished producing items. + pub fn is_finished(&self) -> bool { + self.finished.load(Ordering::Relaxed) + } + + /// Try to receive the next item from the stream without blocking. + pub fn try_recv(&mut self) -> Option { + match self.receiver.try_recv() { + Ok(item) => Some(item), + Err(_) => None, + } + } +} diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..c60869f --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,71 @@ +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; + +use bevy_tasks::ConditionalSend; +use bevy_tasks::futures_lite::Stream; +use bevy_tasks::futures_lite::StreamExt; +use futures::task::AtomicWaker; +use tokio::sync::mpsc; + +use crate::receiver::AsyncStreamReceiver; + +/// An async stream task that yields multiple items. +pub struct AsyncStream { + stream: Pin + Send + 'static>>, +} + +impl AsyncStream { + pub fn new(stream: S) -> Self + where + S: Stream + Send + 'static, + { + Self { + stream: Box::pin(stream), + } + } + + pub fn split( + self, + ) -> ( + impl std::future::Future + Send, + AsyncStreamReceiver, + ) { + let (tx, rx) = mpsc::unbounded_channel(); + let finished = Arc::new(AtomicBool::new(false)); + let finished_clone = Arc::clone(&finished); + let waker = Arc::new(AtomicWaker::new()); + let waker_clone = Arc::clone(&waker); + + let fut = async move { + let mut stream = self.stream; + while let Some(item) = stream.next().await { + if tx.send(item).is_err() { + break; + } + } + finished_clone.store(true, Ordering::Relaxed); + waker_clone.wake(); + }; + + ( + fut, + AsyncStreamReceiver { + finished, + waker, + receiver: rx, + }, + ) + } +} + +impl From for AsyncStream +where + T: ConditionalSend + 'static, + S: Stream + Send + 'static, +{ + fn from(stream: S) -> Self { + Self::new(stream) + } +} diff --git a/src/task_stream.rs b/src/task_stream.rs new file mode 100644 index 0000000..1ee72e6 --- /dev/null +++ b/src/task_stream.rs @@ -0,0 +1,128 @@ +use std::task::Poll; + +use bevy_ecs::component::Tick; +use bevy_ecs::system::ExclusiveSystemParam; +use bevy_ecs::system::ReadOnlySystemParam; +use bevy_ecs::system::SystemMeta; +use bevy_ecs::system::SystemParam; +use bevy_ecs::world::World; +use bevy_ecs::world::unsafe_world_cell::UnsafeWorldCell; +use bevy_platform::cell::SyncCell; +use bevy_tasks::AsyncComputeTaskPool; +use bevy_tasks::ConditionalSend; + +use crate::AsyncStream; +use crate::receiver::AsyncStreamReceiver; + +/// A Bevy [`SystemParam`] to execute async streams in the background. +#[derive(Debug)] +pub struct TaskStream<'s, T>(pub(crate) &'s mut Option>); + +impl TaskStream<'_, T> +where + T: ConditionalSend + 'static, +{ + /// Returns whether the stream is idle (not started). + pub fn is_idle(&self) -> bool { + self.0.is_none() + } + + /// Returns whether the stream is active (started but not finished). + pub fn is_active(&self) -> bool { + if let Some(rx) = &self.0 { + !rx.is_finished() + } else { + false + } + } + + /// Returns whether the stream is finished. + pub fn is_finished(&self) -> bool { + if let Some(rx) = &self.0 { + rx.is_finished() + } else { + false + } + } + + /// Start an async stream in the background. If there is an existing stream + /// pending, it will be dropped and replaced with the given stream. + pub fn start(&mut self, stream: impl Into>) { + let stream = stream.into(); + let (fut, rx) = stream.split(); + let task_pool = AsyncComputeTaskPool::get(); + let handle = task_pool.spawn(fut); + handle.detach(); + self.0.replace(rx); + } + + /// Forget the stream being run. This does not stop the stream. + /// + /// Note: Bevy does not support cancelling a task on web currently. + pub fn forget(&mut self) { + self.0.take(); + } + + /// Poll for the next item. Returns `Poll::Ready(Some(T))` if an item is available, + /// `Poll::Ready(None)` if the stream is finished, or `Poll::Pending` if waiting. + pub fn poll_next(&mut self) -> Poll> { + match self.0.as_mut() { + Some(rx) => match rx.try_recv() { + Some(item) => Poll::Ready(Some(item)), + None => { + if rx.is_finished() { + self.0.take(); + Poll::Ready(None) + } else { + Poll::Pending + } + } + }, + None => Poll::Ready(None), + } + } +} + +impl ExclusiveSystemParam for TaskStream<'_, T> { + type State = SyncCell>>; + type Item<'s> = TaskStream<'s, T>; + + fn init(_world: &mut World, _system_meta: &mut SystemMeta) -> Self::State { + SyncCell::new(None) + } + + fn get_param<'s>(state: &'s mut Self::State, _system_meta: &SystemMeta) -> Self::Item<'s> { + TaskStream(state.get()) + } +} + +// SAFETY: only local state is accessed +unsafe impl ReadOnlySystemParam for TaskStream<'_, T> {} + +// SAFETY: only local state is accessed +unsafe impl SystemParam for TaskStream<'_, T> { + type State = SyncCell>>; + type Item<'w, 's> = TaskStream<'s, T>; + + fn init_state(_world: &mut World) -> Self::State { + SyncCell::new(None) + } + + #[inline] + unsafe fn get_param<'w, 's>( + state: &'s mut Self::State, + _system_meta: &SystemMeta, + _world: UnsafeWorldCell<'w>, + _change_tick: Tick, + ) -> Self::Item<'w, 's> { + TaskStream(state.get()) + } + + fn init_access( + _state: &Self::State, + _system_meta: &mut SystemMeta, + _component_access_set: &mut bevy_ecs::query::FilteredAccessSet, + _world: &mut World, + ) { + } +} From d0e562eed0fc48f831cb3700e7efbd79a4887f0f Mon Sep 17 00:00:00 2001 From: "Spencer C. Imbleau" Date: Thu, 11 Dec 2025 23:50:33 -0500 Subject: [PATCH 2/6] feat: finish streaming api --- examples/cross_system_stream.rs | 59 ++++ examples/stream.rs | 53 ++++ src/lib.rs | 15 + src/receiver.rs | 22 +- src/stream.rs | 499 +++++++++++++++++++++++++++++++- src/task.rs | 4 +- 6 files changed, 644 insertions(+), 8 deletions(-) create mode 100644 examples/cross_system_stream.rs create mode 100644 examples/stream.rs diff --git a/examples/cross_system_stream.rs b/examples/cross_system_stream.rs new file mode 100644 index 0000000..e51ff94 --- /dev/null +++ b/examples/cross_system_stream.rs @@ -0,0 +1,59 @@ +//! Cross system example - This example shows how to start a stream from one system and poll it from +//! another through a resource. +use std::task::Poll; +use std::time::Duration; + +use bevy::app::PanicHandlerPlugin; +use bevy::log::LogPlugin; +use bevy::prelude::*; +use bevy::tasks::AsyncComputeTaskPool; +use bevy_async_task::AsyncStream; +use bevy_async_task::AsyncStreamReceiver; +use bevy_async_task::sleep; +use bevy_tasks::futures_lite::Stream; +use bevy_tasks::futures_lite::StreamExt; +use bevy_tasks::futures_lite::stream; + +#[derive(Resource, DerefMut, Deref, Default)] +struct MyStream(Option>); + +/// An async stream that yields numbers over time +async fn async_number_stream() -> impl Stream { + sleep(Duration::from_millis(500)).await; + stream::iter(vec![1, 2, 3, 4, 5]).then(|x| async move { + sleep(Duration::from_millis(500)).await; + x * 10 + }) +} + +fn system1_start(mut my_stream: ResMut<'_, MyStream>) { + let stream = AsyncStream::lazy(async_number_stream()); + let (fut, receiver) = stream.split(); + my_stream.replace(receiver); + AsyncComputeTaskPool::get().spawn_local(fut).detach(); + info!("Stream started!"); +} + +fn system2_poll(mut my_stream: ResMut<'_, MyStream>, mut exit: MessageWriter<'_, AppExit>) { + let Some(receiver) = my_stream.0.as_mut() else { + return; + }; + + while let Some(v) = receiver.try_recv() { + info!("Received {v}"); + } + if receiver.is_finished() { + info!("Stream finished!"); + exit.write(AppExit::Success); + } +} + +/// Entry point +pub fn main() { + App::new() + .init_resource::() + .add_plugins((MinimalPlugins, LogPlugin::default(), PanicHandlerPlugin)) + .add_systems(Startup, system1_start) + .add_systems(Update, system2_poll) + .run(); +} diff --git a/examples/stream.rs b/examples/stream.rs new file mode 100644 index 0000000..295c4cb --- /dev/null +++ b/examples/stream.rs @@ -0,0 +1,53 @@ +//! Simple example - this demonstrates running one async stream continuously. + +use std::task::Poll; +use std::time::Duration; + +use bevy::app::PanicHandlerPlugin; +use bevy::log::LogPlugin; +use bevy::prelude::*; +use bevy_async_task::AsyncStream; +use bevy_async_task::TaskStream; +use bevy_async_task::sleep; +use bevy_tasks::futures_lite::Stream; +use bevy_tasks::futures_lite::StreamExt; +use bevy_tasks::futures_lite::stream; + +/// An async stream that yields numbers over time +async fn async_number_stream() -> impl Stream { + sleep(Duration::from_millis(500)).await; + + stream::iter(vec![1, 2, 3, 4, 5]).then(|x| async move { + sleep(Duration::from_millis(500)).await; + x * 10 + }) +} + +fn my_system(mut task_stream: TaskStream<'_, u32>) { + if task_stream.is_idle() { + // Start an async stream! + let stream = AsyncStream::lazy(async_number_stream()); + task_stream.start(stream); + info!("Stream started!"); + } + + match task_stream.poll_next() { + Poll::Ready(Some(v)) => { + info!("Received {v}"); + } + Poll::Ready(None) => { + info!("Stream finished!"); + } + Poll::Pending => { + // Waiting for next item... + } + } +} + +/// Entry point +pub fn main() { + App::new() + .add_plugins((MinimalPlugins, LogPlugin::default(), PanicHandlerPlugin)) + .add_systems(Update, my_system) + .run(); +} diff --git a/src/lib.rs b/src/lib.rs index 03be98b..71aee3e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ mod util; pub use error::TimeoutError; pub use receiver::AsyncReceiver; +pub use receiver::AsyncStreamReceiver; pub use stream::AsyncStream; pub use task::AsyncTask; pub use task::TimedAsyncTask; @@ -32,3 +33,17 @@ pub const MAX_TIMEOUT: core::time::Duration = { static I32_MAX_U64: u64 = 2_147_483_647; core::time::Duration::from_millis(I32_MAX_U64) }; + +#[doc(hidden)] +pub mod polyfill { + use bevy_tasks::ConditionalSend; + use bevy_tasks::futures_lite::Stream; + + /// The missing conditional send stream trait. + /// see: + pub trait ConditionalSendStream: Stream + ConditionalSend {} + + impl ConditionalSendStream for T {} +} + +pub use polyfill::ConditionalSendStream; diff --git a/src/receiver.rs b/src/receiver.rs index 2a1e7b0..befb1f8 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -35,19 +35,37 @@ pub struct AsyncStreamReceiver { pub(crate) finished: Arc, pub(crate) waker: Arc, pub(crate) receiver: mpsc::UnboundedReceiver, + pub(crate) received: Arc, } impl AsyncStreamReceiver { /// Returns whether the stream has finished producing items. pub fn is_finished(&self) -> bool { - self.finished.load(Ordering::Relaxed) + self.finished.load(Ordering::Relaxed) && self.receiver.is_empty() } /// Try to receive the next item from the stream without blocking. + /// Returns `Some(item)` if an item is available, `None` otherwise. pub fn try_recv(&mut self) -> Option { match self.receiver.try_recv() { Ok(item) => Some(item), - Err(_) => None, + Err(_) => { + // Check if we're truly finished (no more items will come) + if self.finished.load(Ordering::Relaxed) { + // Signal to the producer that we're done + self.received.store(true, Ordering::Relaxed); + self.waker.wake(); + } + None + } } } } + +impl Drop for AsyncStreamReceiver { + fn drop(&mut self) { + // Signal the producer we're done when dropped + self.received.store(true, Ordering::Relaxed); + self.waker.wake(); + } +} diff --git a/src/stream.rs b/src/stream.rs index c60869f..1b7897c 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,35 +1,62 @@ +use std::fmt; use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use bevy_tasks::ConditionalSend; +use bevy_tasks::ConditionalSendFuture; use bevy_tasks::futures_lite::Stream; use bevy_tasks::futures_lite::StreamExt; +use bevy_tasks::futures_lite::stream; use futures::task::AtomicWaker; use tokio::sync::mpsc; +use crate::ConditionalSendStream; use crate::receiver::AsyncStreamReceiver; /// An async stream task that yields multiple items. -pub struct AsyncStream { - stream: Pin + Send + 'static>>, +pub struct AsyncStream { + stream: Pin + 'static>>, +} + +impl fmt::Debug for AsyncStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AsyncStream") + .field("stream", &"") + .finish() + } } impl AsyncStream { + /// Create a new async stream from a future that produces a stream. + pub fn lazy(fut: F) -> Self + where + F: ConditionalSendFuture + 'static, + S: ConditionalSendStream + 'static, + { + Self { + stream: Box::pin(stream::once_future(fut).flatten()), + } + } + + /// Create a new async stream from a stream. pub fn new(stream: S) -> Self where - S: Stream + Send + 'static, + S: ConditionalSendStream + 'static, + S::Item: ConditionalSend + 'static, { Self { stream: Box::pin(stream), } } + /// Split the stream into a runnable future and receiver. + /// This is a low-level operation and only useful for specific needs. pub fn split( self, ) -> ( - impl std::future::Future + Send, + impl ConditionalSendFuture, AsyncStreamReceiver, ) { let (tx, rx) = mpsc::unbounded_channel(); @@ -37,16 +64,30 @@ impl AsyncStream { let finished_clone = Arc::clone(&finished); let waker = Arc::new(AtomicWaker::new()); let waker_clone = Arc::clone(&waker); + let received = Arc::new(AtomicBool::new(false)); + let received_clone = Arc::clone(&received); let fut = async move { let mut stream = self.stream; while let Some(item) = stream.next().await { if tx.send(item).is_err() { + // Receiver dropped, exit early break; } } finished_clone.store(true, Ordering::Relaxed); waker_clone.wake(); + + // Wait for receiver to acknowledge it's done reading + futures::future::poll_fn(|cx| { + waker_clone.register(cx.waker()); + if received_clone.load(Ordering::Relaxed) { + std::task::Poll::Ready(()) + } else { + std::task::Poll::Pending + } + }) + .await; }; ( @@ -55,6 +96,7 @@ impl AsyncStream { finished, waker, receiver: rx, + received, }, ) } @@ -69,3 +111,452 @@ where Self::new(stream) } } + +#[cfg(not(target_arch = "wasm32"))] +#[cfg(test)] +mod test { + use core::time::Duration; + + use bevy_tasks::futures_lite::stream; + use futures::FutureExt; + use futures::pin_mut; + use futures_timer::Delay; + use tokio::select; + + use super::*; + + #[tokio::test] + async fn test_stream_basic() { + let items = vec![1, 2, 3, 4, 5]; + let stream = stream::iter(items.clone()); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + assert_eq!(None, rx.try_recv()); + assert!(!rx.is_finished()); + + // Spawn + tokio::spawn(fut); + + // Collect all items + let mut collected = Vec::new(); + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(100)).fuse(); + pin_mut!(timeout, fetch); + + 'result: loop { + select! { + _ = (&mut fetch).fuse() => { + if let Some(v) = rx.try_recv() { + collected.push(v); + if collected.len() == items.len() { + break 'result; + } + fetch.reset(Duration::from_millis(1)); + } else if rx.is_finished() && collected.len() == items.len() { + break 'result; + } else { + fetch.reset(Duration::from_millis(1)); + } + } + _ = &mut timeout => panic!("timeout") + }; + } + + assert_eq!(items, collected); + assert!(rx.is_finished()); + } + + #[tokio::test] + async fn test_stream_empty() { + let stream = stream::empty::(); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + assert_eq!(None, rx.try_recv()); + assert!(!rx.is_finished()); + + // Spawn + tokio::spawn(fut); + + // Wait for stream to finish + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(100)).fuse(); + pin_mut!(timeout, fetch); + + 'result: loop { + select! { + _ = (&mut fetch).fuse() => { + if rx.is_finished() { + break 'result; + } + fetch.reset(Duration::from_millis(1)); + } + _ = &mut timeout => panic!("timeout") + }; + } + + assert_eq!(None, rx.try_recv()); + assert!(rx.is_finished()); + } + + #[tokio::test] + async fn test_stream_single_item() { + let stream = stream::once(42); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + // Spawn + tokio::spawn(fut); + + // Wait for item + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(100)).fuse(); + pin_mut!(timeout, fetch); + + 'result: loop { + select! { + _ = (&mut fetch).fuse() => { + if let Some(v) = rx.try_recv() { + assert_eq!(42, v); + break 'result; + } + fetch.reset(Duration::from_millis(1)); + } + _ = &mut timeout => panic!("timeout") + }; + } + + // Wait for finish + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(100)).fuse(); + pin_mut!(timeout, fetch); + + 'finish: loop { + select! { + _ = (&mut fetch).fuse() => { + if rx.is_finished() { + break 'finish; + } + fetch.reset(Duration::from_millis(1)); + } + _ = &mut timeout => panic!("timeout") + }; + } + + assert!(rx.is_finished()); + } + + #[tokio::test] + async fn test_stream_async_items() { + // Create a stream that yields items with delays + let stream = stream::iter(vec![1, 2, 3]).then(|x| async move { + Delay::new(Duration::from_millis(5)).await; + x * 2 + }); + + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + // Spawn + tokio::spawn(fut); + + // Collect all items + let mut collected = Vec::new(); + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(500)).fuse(); + pin_mut!(timeout, fetch); + + 'result: loop { + select! { + _ = (&mut fetch).fuse() => { + if let Some(v) = rx.try_recv() { + collected.push(v); + if collected.len() == 3 { + break 'result; + } + } + fetch.reset(Duration::from_millis(1)); + } + _ = &mut timeout => panic!("timeout") + }; + } + + assert_eq!(vec![2, 4, 6], collected); + } + + #[tokio::test] + async fn test_stream_large_batch() { + let items: Vec = (0..100).collect(); + let stream = stream::iter(items.clone()); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + // Spawn + tokio::spawn(fut); + + // Collect all items + let mut collected = Vec::new(); + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(500)).fuse(); + pin_mut!(timeout, fetch); + + 'result: loop { + select! { + _ = (&mut fetch).fuse() => { + while let Some(v) = rx.try_recv() { + collected.push(v); + } + if rx.is_finished() && collected.len() == items.len() { + break 'result; + } + fetch.reset(Duration::from_millis(1)); + } + _ = &mut timeout => panic!("timeout") + }; + } + + assert_eq!(items, collected); + } + + #[tokio::test] + async fn test_stream_from_conversion() { + let items = vec![1, 2, 3]; + let stream = stream::iter(items.clone()); + let task: AsyncStream = stream.into(); + let (fut, mut rx) = task.split(); + + // Spawn + tokio::spawn(fut); + + // Collect all items + let mut collected = Vec::new(); + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(100)).fuse(); + pin_mut!(timeout, fetch); + + 'result: loop { + select! { + _ = (&mut fetch).fuse() => { + while let Some(v) = rx.try_recv() { + collected.push(v); + } + if rx.is_finished() { + break 'result; + } + fetch.reset(Duration::from_millis(1)); + } + _ = &mut timeout => panic!("timeout") + }; + } + + assert_eq!(items, collected); + } + + #[tokio::test] + async fn test_stream_receiver_dropped_early() { + let items = vec![1, 2, 3, 4, 5]; + let stream = stream::iter(items); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + // Spawn + let handle = tokio::spawn(fut); + + // Get first item then drop receiver + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(100)).fuse(); + pin_mut!(timeout, fetch); + + 'result: loop { + select! { + _ = (&mut fetch).fuse() => { + if let Some(v) = rx.try_recv() { + assert_eq!(1, v); + break 'result; + } + fetch.reset(Duration::from_millis(1)); + } + _ = &mut timeout => panic!("timeout") + }; + } + + // Drop receiver + drop(rx); + + // Task should complete without panic + let timeout = Delay::new(Duration::from_millis(100)); + tokio::select! { + _ = handle => {}, + _ = timeout => panic!("task didn't complete after receiver dropped") + } + } +} + +#[cfg(target_arch = "wasm32")] +#[cfg(test)] +mod test { + use bevy_tasks::futures_lite::stream; + use wasm_bindgen::JsValue; + use wasm_bindgen_futures::JsFuture; + use wasm_bindgen_test::wasm_bindgen_test; + + use crate::AsyncStream; + + #[wasm_bindgen_test] + async fn test_stream_basic() { + let items = vec![1, 2, 3, 4, 5]; + let stream = stream::iter(items.clone()); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + assert_eq!(None, rx.try_recv()); + assert!(!rx.is_finished()); + + // Convert to Promise and await it. + JsFuture::from(wasm_bindgen_futures::future_to_promise(async move { + fut.await; + Ok(JsValue::NULL) + })) + .await + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); + + // Collect all items + let mut collected = Vec::new(); + while let Some(v) = rx.try_recv() { + collected.push(v); + } + + assert_eq!(items, collected); + assert!(rx.is_finished()); + } + + #[wasm_bindgen_test] + async fn test_stream_empty() { + let stream = stream::empty::(); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + assert_eq!(None, rx.try_recv()); + assert!(!rx.is_finished()); + + // Convert to Promise and await it. + JsFuture::from(wasm_bindgen_futures::future_to_promise(async move { + fut.await; + Ok(JsValue::NULL) + })) + .await + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); + + assert_eq!(None, rx.try_recv()); + assert!(rx.is_finished()); + } + + #[wasm_bindgen_test] + async fn test_stream_single_item() { + let stream = stream::once(42); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + // Convert to Promise and await it. + JsFuture::from(wasm_bindgen_futures::future_to_promise(async move { + fut.await; + Ok(JsValue::NULL) + })) + .await + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); + + let item = rx.try_recv().unwrap_or_else(|| { + panic!("expected item after await"); + }); + assert_eq!(42, item); + assert!(rx.is_finished()); + } + + #[wasm_bindgen_test] + async fn test_stream_async_items() { + use bevy_tasks::futures_lite::StreamExt; + + // Create a stream that yields items with delays + let stream = stream::iter(vec![1, 2, 3]).then(|x| async move { x * 2 }); + + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + // Convert to Promise and await it. + JsFuture::from(wasm_bindgen_futures::future_to_promise(async move { + fut.await; + Ok(JsValue::NULL) + })) + .await + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); + + // Collect all items + let mut collected = Vec::new(); + while let Some(v) = rx.try_recv() { + collected.push(v); + } + + assert_eq!(vec![2, 4, 6], collected); + } + + #[wasm_bindgen_test] + async fn test_stream_large_batch() { + let items: Vec = (0..100).collect(); + let stream = stream::iter(items.clone()); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + // Convert to Promise and await it. + JsFuture::from(wasm_bindgen_futures::future_to_promise(async move { + fut.await; + Ok(JsValue::NULL) + })) + .await + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); + + // Collect all items + let mut collected = Vec::new(); + while let Some(v) = rx.try_recv() { + collected.push(v); + } + + assert_eq!(items, collected); + } + + #[wasm_bindgen_test] + async fn test_stream_from_conversion() { + let items = vec![1, 2, 3]; + let stream = stream::iter(items.clone()); + let task: AsyncStream = stream.into(); + let (fut, mut rx) = task.split(); + + // Convert to Promise and await it. + JsFuture::from(wasm_bindgen_futures::future_to_promise(async move { + fut.await; + Ok(JsValue::NULL) + })) + .await + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); + + // Collect all items + let mut collected = Vec::new(); + while let Some(v) = rx.try_recv() { + collected.push(v); + } + + assert_eq!(items, collected); + } +} diff --git a/src/task.rs b/src/task.rs index 27e1f1b..c6b29fa 100644 --- a/src/task.rs +++ b/src/task.rs @@ -86,7 +86,7 @@ where /// Split the task into a runnable future and receiver. /// This is a low-level operation and only useful for specific needs. #[must_use] - pub fn split(self) -> (Pin>>, AsyncReceiver) { + pub fn split(self) -> (impl ConditionalSendFuture, AsyncReceiver) { let (tx, rx) = oneshot::channel(); let waker = Arc::new(AtomicWaker::new()); let received = Arc::new(AtomicBool::new(false)); @@ -184,7 +184,7 @@ where pub fn split( self, ) -> ( - Pin>>, + impl ConditionalSendFuture, AsyncReceiver>, ) { let (tx, rx) = oneshot::channel(); From 17a0af74f1765162e5303ef7c52634d3285083a8 Mon Sep 17 00:00:00 2001 From: "Spencer C. Imbleau" Date: Thu, 11 Dec 2025 23:57:21 -0500 Subject: [PATCH 3/6] fix: clippy --- examples/cross_system_stream.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/cross_system_stream.rs b/examples/cross_system_stream.rs index e51ff94..d492464 100644 --- a/examples/cross_system_stream.rs +++ b/examples/cross_system_stream.rs @@ -1,6 +1,5 @@ //! Cross system example - This example shows how to start a stream from one system and poll it from //! another through a resource. -use std::task::Poll; use std::time::Duration; use bevy::app::PanicHandlerPlugin; From 178de004ed9fd023d6f7ed34916a886aaad6ad90 Mon Sep 17 00:00:00 2001 From: "Spencer C. Imbleau" Date: Thu, 11 Dec 2025 23:58:25 -0500 Subject: [PATCH 4/6] docs: update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c39161..4c8ecf7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `AsyncStream` and `TaskStream`, functional equivalents of `AsyncTask` and `TaskRunner` for streams (aka async iterators). See the streaming example. - Added `bevy_async_task::MAX_TIMEOUT` - Added `bevy_async_task::DEFAULT_TIMEOUT` From fc43da493ee24172be475ec5babbce6c574d6ae8 Mon Sep 17 00:00:00 2001 From: "Spencer C. Imbleau" Date: Fri, 12 Dec 2025 00:01:26 -0500 Subject: [PATCH 5/6] feat: match taskrunner api --- src/task_stream.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/task_stream.rs b/src/task_stream.rs index 1ee72e6..e9b2c1e 100644 --- a/src/task_stream.rs +++ b/src/task_stream.rs @@ -27,8 +27,8 @@ where self.0.is_none() } - /// Returns whether the stream is active (started but not finished). - pub fn is_active(&self) -> bool { + /// Returns whether the stream is pending (running, but not finished). + pub fn is_pending(&self) -> bool { if let Some(rx) = &self.0 { !rx.is_finished() } else { From a3f30d08f822b3d2f66ef97814ee462db33c5b5f Mon Sep 17 00:00:00 2001 From: "Spencer C. Imbleau" Date: Fri, 12 Dec 2025 00:02:50 -0500 Subject: [PATCH 6/6] fix: lint --- src/task.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/task.rs b/src/task.rs index c6b29fa..967f600 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,6 +1,5 @@ use core::time::Duration; use std::fmt::Debug; -use std::future::Future; use std::future::pending; use std::pin::Pin; use std::sync::Arc;