diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c39161..4c8ecf7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `AsyncStream` and `TaskStream`, functional equivalents of `AsyncTask` and `TaskRunner` for streams (aka async iterators). See the streaming example. - Added `bevy_async_task::MAX_TIMEOUT` - Added `bevy_async_task::DEFAULT_TIMEOUT` diff --git a/examples/cross_system_stream.rs b/examples/cross_system_stream.rs new file mode 100644 index 0000000..d492464 --- /dev/null +++ b/examples/cross_system_stream.rs @@ -0,0 +1,58 @@ +//! Cross system example - This example shows how to start a stream from one system and poll it from +//! another through a resource. +use std::time::Duration; + +use bevy::app::PanicHandlerPlugin; +use bevy::log::LogPlugin; +use bevy::prelude::*; +use bevy::tasks::AsyncComputeTaskPool; +use bevy_async_task::AsyncStream; +use bevy_async_task::AsyncStreamReceiver; +use bevy_async_task::sleep; +use bevy_tasks::futures_lite::Stream; +use bevy_tasks::futures_lite::StreamExt; +use bevy_tasks::futures_lite::stream; + +#[derive(Resource, DerefMut, Deref, Default)] +struct MyStream(Option>); + +/// An async stream that yields numbers over time +async fn async_number_stream() -> impl Stream { + sleep(Duration::from_millis(500)).await; + stream::iter(vec![1, 2, 3, 4, 5]).then(|x| async move { + sleep(Duration::from_millis(500)).await; + x * 10 + }) +} + +fn system1_start(mut my_stream: ResMut<'_, MyStream>) { + let stream = AsyncStream::lazy(async_number_stream()); + let (fut, receiver) = stream.split(); + my_stream.replace(receiver); + AsyncComputeTaskPool::get().spawn_local(fut).detach(); + info!("Stream started!"); +} + +fn system2_poll(mut my_stream: ResMut<'_, MyStream>, mut exit: MessageWriter<'_, AppExit>) { + let Some(receiver) = my_stream.0.as_mut() else { + return; + }; + + while let Some(v) = receiver.try_recv() { + info!("Received {v}"); + } + if receiver.is_finished() { + info!("Stream finished!"); + exit.write(AppExit::Success); + } +} + +/// Entry point +pub fn main() { + App::new() + .init_resource::() + .add_plugins((MinimalPlugins, LogPlugin::default(), PanicHandlerPlugin)) + .add_systems(Startup, system1_start) + .add_systems(Update, system2_poll) + .run(); +} diff --git a/examples/stream.rs b/examples/stream.rs new file mode 100644 index 0000000..295c4cb --- /dev/null +++ b/examples/stream.rs @@ -0,0 +1,53 @@ +//! Simple example - this demonstrates running one async stream continuously. + +use std::task::Poll; +use std::time::Duration; + +use bevy::app::PanicHandlerPlugin; +use bevy::log::LogPlugin; +use bevy::prelude::*; +use bevy_async_task::AsyncStream; +use bevy_async_task::TaskStream; +use bevy_async_task::sleep; +use bevy_tasks::futures_lite::Stream; +use bevy_tasks::futures_lite::StreamExt; +use bevy_tasks::futures_lite::stream; + +/// An async stream that yields numbers over time +async fn async_number_stream() -> impl Stream { + sleep(Duration::from_millis(500)).await; + + stream::iter(vec![1, 2, 3, 4, 5]).then(|x| async move { + sleep(Duration::from_millis(500)).await; + x * 10 + }) +} + +fn my_system(mut task_stream: TaskStream<'_, u32>) { + if task_stream.is_idle() { + // Start an async stream! + let stream = AsyncStream::lazy(async_number_stream()); + task_stream.start(stream); + info!("Stream started!"); + } + + match task_stream.poll_next() { + Poll::Ready(Some(v)) => { + info!("Received {v}"); + } + Poll::Ready(None) => { + info!("Stream finished!"); + } + Poll::Pending => { + // Waiting for next item... + } + } +} + +/// Entry point +pub fn main() { + App::new() + .add_plugins((MinimalPlugins, LogPlugin::default(), PanicHandlerPlugin)) + .add_systems(Update, my_system) + .run(); +} diff --git a/src/lib.rs b/src/lib.rs index 7802900..71aee3e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,19 +2,24 @@ mod error; mod receiver; +mod stream; mod task; mod task_pool; mod task_runner; +mod task_stream; mod util; pub use error::TimeoutError; pub use receiver::AsyncReceiver; +pub use receiver::AsyncStreamReceiver; +pub use stream::AsyncStream; pub use task::AsyncTask; pub use task::TimedAsyncTask; pub use task_pool::TaskPool; pub use task_pool::TimedTaskPool; pub use task_runner::TaskRunner; pub use task_runner::TimedTaskRunner; +pub use task_stream::TaskStream; pub use util::pending; pub use util::sleep; pub use util::timeout; @@ -28,3 +33,17 @@ pub const MAX_TIMEOUT: core::time::Duration = { static I32_MAX_U64: u64 = 2_147_483_647; core::time::Duration::from_millis(I32_MAX_U64) }; + +#[doc(hidden)] +pub mod polyfill { + use bevy_tasks::ConditionalSend; + use bevy_tasks::futures_lite::Stream; + + /// The missing conditional send stream trait. + /// see: + pub trait ConditionalSendStream: Stream + ConditionalSend {} + + impl ConditionalSendStream for T {} +} + +pub use polyfill::ConditionalSendStream; diff --git a/src/receiver.rs b/src/receiver.rs index ec7cc44..befb1f8 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -1,9 +1,9 @@ -use std::sync::{ - Arc, - atomic::{AtomicBool, Ordering}, -}; +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use futures::task::AtomicWaker; +use tokio::sync::mpsc; use tokio::sync::oneshot::{self}; /// A channel that catches an [`AsyncTask`](crate::AsyncTask) result. @@ -28,3 +28,44 @@ impl AsyncReceiver { } } } + +/// A channel that receives multiple items from an async stream. +#[derive(Debug)] +pub struct AsyncStreamReceiver { + pub(crate) finished: Arc, + pub(crate) waker: Arc, + pub(crate) receiver: mpsc::UnboundedReceiver, + pub(crate) received: Arc, +} + +impl AsyncStreamReceiver { + /// Returns whether the stream has finished producing items. + pub fn is_finished(&self) -> bool { + self.finished.load(Ordering::Relaxed) && self.receiver.is_empty() + } + + /// Try to receive the next item from the stream without blocking. + /// Returns `Some(item)` if an item is available, `None` otherwise. + pub fn try_recv(&mut self) -> Option { + match self.receiver.try_recv() { + Ok(item) => Some(item), + Err(_) => { + // Check if we're truly finished (no more items will come) + if self.finished.load(Ordering::Relaxed) { + // Signal to the producer that we're done + self.received.store(true, Ordering::Relaxed); + self.waker.wake(); + } + None + } + } + } +} + +impl Drop for AsyncStreamReceiver { + fn drop(&mut self) { + // Signal the producer we're done when dropped + self.received.store(true, Ordering::Relaxed); + self.waker.wake(); + } +} diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..1b7897c --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,562 @@ +use std::fmt; +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; + +use bevy_tasks::ConditionalSend; +use bevy_tasks::ConditionalSendFuture; +use bevy_tasks::futures_lite::Stream; +use bevy_tasks::futures_lite::StreamExt; +use bevy_tasks::futures_lite::stream; +use futures::task::AtomicWaker; +use tokio::sync::mpsc; + +use crate::ConditionalSendStream; +use crate::receiver::AsyncStreamReceiver; + +/// An async stream task that yields multiple items. +pub struct AsyncStream { + stream: Pin + 'static>>, +} + +impl fmt::Debug for AsyncStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AsyncStream") + .field("stream", &"") + .finish() + } +} + +impl AsyncStream { + /// Create a new async stream from a future that produces a stream. + pub fn lazy(fut: F) -> Self + where + F: ConditionalSendFuture + 'static, + S: ConditionalSendStream + 'static, + { + Self { + stream: Box::pin(stream::once_future(fut).flatten()), + } + } + + /// Create a new async stream from a stream. + pub fn new(stream: S) -> Self + where + S: ConditionalSendStream + 'static, + S::Item: ConditionalSend + 'static, + { + Self { + stream: Box::pin(stream), + } + } + + /// Split the stream into a runnable future and receiver. + /// This is a low-level operation and only useful for specific needs. + pub fn split( + self, + ) -> ( + impl ConditionalSendFuture, + AsyncStreamReceiver, + ) { + let (tx, rx) = mpsc::unbounded_channel(); + let finished = Arc::new(AtomicBool::new(false)); + let finished_clone = Arc::clone(&finished); + let waker = Arc::new(AtomicWaker::new()); + let waker_clone = Arc::clone(&waker); + let received = Arc::new(AtomicBool::new(false)); + let received_clone = Arc::clone(&received); + + let fut = async move { + let mut stream = self.stream; + while let Some(item) = stream.next().await { + if tx.send(item).is_err() { + // Receiver dropped, exit early + break; + } + } + finished_clone.store(true, Ordering::Relaxed); + waker_clone.wake(); + + // Wait for receiver to acknowledge it's done reading + futures::future::poll_fn(|cx| { + waker_clone.register(cx.waker()); + if received_clone.load(Ordering::Relaxed) { + std::task::Poll::Ready(()) + } else { + std::task::Poll::Pending + } + }) + .await; + }; + + ( + fut, + AsyncStreamReceiver { + finished, + waker, + receiver: rx, + received, + }, + ) + } +} + +impl From for AsyncStream +where + T: ConditionalSend + 'static, + S: Stream + Send + 'static, +{ + fn from(stream: S) -> Self { + Self::new(stream) + } +} + +#[cfg(not(target_arch = "wasm32"))] +#[cfg(test)] +mod test { + use core::time::Duration; + + use bevy_tasks::futures_lite::stream; + use futures::FutureExt; + use futures::pin_mut; + use futures_timer::Delay; + use tokio::select; + + use super::*; + + #[tokio::test] + async fn test_stream_basic() { + let items = vec![1, 2, 3, 4, 5]; + let stream = stream::iter(items.clone()); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + assert_eq!(None, rx.try_recv()); + assert!(!rx.is_finished()); + + // Spawn + tokio::spawn(fut); + + // Collect all items + let mut collected = Vec::new(); + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(100)).fuse(); + pin_mut!(timeout, fetch); + + 'result: loop { + select! { + _ = (&mut fetch).fuse() => { + if let Some(v) = rx.try_recv() { + collected.push(v); + if collected.len() == items.len() { + break 'result; + } + fetch.reset(Duration::from_millis(1)); + } else if rx.is_finished() && collected.len() == items.len() { + break 'result; + } else { + fetch.reset(Duration::from_millis(1)); + } + } + _ = &mut timeout => panic!("timeout") + }; + } + + assert_eq!(items, collected); + assert!(rx.is_finished()); + } + + #[tokio::test] + async fn test_stream_empty() { + let stream = stream::empty::(); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + assert_eq!(None, rx.try_recv()); + assert!(!rx.is_finished()); + + // Spawn + tokio::spawn(fut); + + // Wait for stream to finish + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(100)).fuse(); + pin_mut!(timeout, fetch); + + 'result: loop { + select! { + _ = (&mut fetch).fuse() => { + if rx.is_finished() { + break 'result; + } + fetch.reset(Duration::from_millis(1)); + } + _ = &mut timeout => panic!("timeout") + }; + } + + assert_eq!(None, rx.try_recv()); + assert!(rx.is_finished()); + } + + #[tokio::test] + async fn test_stream_single_item() { + let stream = stream::once(42); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + // Spawn + tokio::spawn(fut); + + // Wait for item + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(100)).fuse(); + pin_mut!(timeout, fetch); + + 'result: loop { + select! { + _ = (&mut fetch).fuse() => { + if let Some(v) = rx.try_recv() { + assert_eq!(42, v); + break 'result; + } + fetch.reset(Duration::from_millis(1)); + } + _ = &mut timeout => panic!("timeout") + }; + } + + // Wait for finish + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(100)).fuse(); + pin_mut!(timeout, fetch); + + 'finish: loop { + select! { + _ = (&mut fetch).fuse() => { + if rx.is_finished() { + break 'finish; + } + fetch.reset(Duration::from_millis(1)); + } + _ = &mut timeout => panic!("timeout") + }; + } + + assert!(rx.is_finished()); + } + + #[tokio::test] + async fn test_stream_async_items() { + // Create a stream that yields items with delays + let stream = stream::iter(vec![1, 2, 3]).then(|x| async move { + Delay::new(Duration::from_millis(5)).await; + x * 2 + }); + + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + // Spawn + tokio::spawn(fut); + + // Collect all items + let mut collected = Vec::new(); + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(500)).fuse(); + pin_mut!(timeout, fetch); + + 'result: loop { + select! { + _ = (&mut fetch).fuse() => { + if let Some(v) = rx.try_recv() { + collected.push(v); + if collected.len() == 3 { + break 'result; + } + } + fetch.reset(Duration::from_millis(1)); + } + _ = &mut timeout => panic!("timeout") + }; + } + + assert_eq!(vec![2, 4, 6], collected); + } + + #[tokio::test] + async fn test_stream_large_batch() { + let items: Vec = (0..100).collect(); + let stream = stream::iter(items.clone()); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + // Spawn + tokio::spawn(fut); + + // Collect all items + let mut collected = Vec::new(); + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(500)).fuse(); + pin_mut!(timeout, fetch); + + 'result: loop { + select! { + _ = (&mut fetch).fuse() => { + while let Some(v) = rx.try_recv() { + collected.push(v); + } + if rx.is_finished() && collected.len() == items.len() { + break 'result; + } + fetch.reset(Duration::from_millis(1)); + } + _ = &mut timeout => panic!("timeout") + }; + } + + assert_eq!(items, collected); + } + + #[tokio::test] + async fn test_stream_from_conversion() { + let items = vec![1, 2, 3]; + let stream = stream::iter(items.clone()); + let task: AsyncStream = stream.into(); + let (fut, mut rx) = task.split(); + + // Spawn + tokio::spawn(fut); + + // Collect all items + let mut collected = Vec::new(); + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(100)).fuse(); + pin_mut!(timeout, fetch); + + 'result: loop { + select! { + _ = (&mut fetch).fuse() => { + while let Some(v) = rx.try_recv() { + collected.push(v); + } + if rx.is_finished() { + break 'result; + } + fetch.reset(Duration::from_millis(1)); + } + _ = &mut timeout => panic!("timeout") + }; + } + + assert_eq!(items, collected); + } + + #[tokio::test] + async fn test_stream_receiver_dropped_early() { + let items = vec![1, 2, 3, 4, 5]; + let stream = stream::iter(items); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + // Spawn + let handle = tokio::spawn(fut); + + // Get first item then drop receiver + let fetch = Delay::new(Duration::from_millis(1)); + let timeout = Delay::new(Duration::from_millis(100)).fuse(); + pin_mut!(timeout, fetch); + + 'result: loop { + select! { + _ = (&mut fetch).fuse() => { + if let Some(v) = rx.try_recv() { + assert_eq!(1, v); + break 'result; + } + fetch.reset(Duration::from_millis(1)); + } + _ = &mut timeout => panic!("timeout") + }; + } + + // Drop receiver + drop(rx); + + // Task should complete without panic + let timeout = Delay::new(Duration::from_millis(100)); + tokio::select! { + _ = handle => {}, + _ = timeout => panic!("task didn't complete after receiver dropped") + } + } +} + +#[cfg(target_arch = "wasm32")] +#[cfg(test)] +mod test { + use bevy_tasks::futures_lite::stream; + use wasm_bindgen::JsValue; + use wasm_bindgen_futures::JsFuture; + use wasm_bindgen_test::wasm_bindgen_test; + + use crate::AsyncStream; + + #[wasm_bindgen_test] + async fn test_stream_basic() { + let items = vec![1, 2, 3, 4, 5]; + let stream = stream::iter(items.clone()); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + assert_eq!(None, rx.try_recv()); + assert!(!rx.is_finished()); + + // Convert to Promise and await it. + JsFuture::from(wasm_bindgen_futures::future_to_promise(async move { + fut.await; + Ok(JsValue::NULL) + })) + .await + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); + + // Collect all items + let mut collected = Vec::new(); + while let Some(v) = rx.try_recv() { + collected.push(v); + } + + assert_eq!(items, collected); + assert!(rx.is_finished()); + } + + #[wasm_bindgen_test] + async fn test_stream_empty() { + let stream = stream::empty::(); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + assert_eq!(None, rx.try_recv()); + assert!(!rx.is_finished()); + + // Convert to Promise and await it. + JsFuture::from(wasm_bindgen_futures::future_to_promise(async move { + fut.await; + Ok(JsValue::NULL) + })) + .await + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); + + assert_eq!(None, rx.try_recv()); + assert!(rx.is_finished()); + } + + #[wasm_bindgen_test] + async fn test_stream_single_item() { + let stream = stream::once(42); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + // Convert to Promise and await it. + JsFuture::from(wasm_bindgen_futures::future_to_promise(async move { + fut.await; + Ok(JsValue::NULL) + })) + .await + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); + + let item = rx.try_recv().unwrap_or_else(|| { + panic!("expected item after await"); + }); + assert_eq!(42, item); + assert!(rx.is_finished()); + } + + #[wasm_bindgen_test] + async fn test_stream_async_items() { + use bevy_tasks::futures_lite::StreamExt; + + // Create a stream that yields items with delays + let stream = stream::iter(vec![1, 2, 3]).then(|x| async move { x * 2 }); + + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + // Convert to Promise and await it. + JsFuture::from(wasm_bindgen_futures::future_to_promise(async move { + fut.await; + Ok(JsValue::NULL) + })) + .await + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); + + // Collect all items + let mut collected = Vec::new(); + while let Some(v) = rx.try_recv() { + collected.push(v); + } + + assert_eq!(vec![2, 4, 6], collected); + } + + #[wasm_bindgen_test] + async fn test_stream_large_batch() { + let items: Vec = (0..100).collect(); + let stream = stream::iter(items.clone()); + let task = AsyncStream::new(stream); + let (fut, mut rx) = task.split(); + + // Convert to Promise and await it. + JsFuture::from(wasm_bindgen_futures::future_to_promise(async move { + fut.await; + Ok(JsValue::NULL) + })) + .await + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); + + // Collect all items + let mut collected = Vec::new(); + while let Some(v) = rx.try_recv() { + collected.push(v); + } + + assert_eq!(items, collected); + } + + #[wasm_bindgen_test] + async fn test_stream_from_conversion() { + let items = vec![1, 2, 3]; + let stream = stream::iter(items.clone()); + let task: AsyncStream = stream.into(); + let (fut, mut rx) = task.split(); + + // Convert to Promise and await it. + JsFuture::from(wasm_bindgen_futures::future_to_promise(async move { + fut.await; + Ok(JsValue::NULL) + })) + .await + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); + + // Collect all items + let mut collected = Vec::new(); + while let Some(v) = rx.try_recv() { + collected.push(v); + } + + assert_eq!(items, collected); + } +} diff --git a/src/task.rs b/src/task.rs index 27e1f1b..967f600 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,6 +1,5 @@ use core::time::Duration; use std::fmt::Debug; -use std::future::Future; use std::future::pending; use std::pin::Pin; use std::sync::Arc; @@ -86,7 +85,7 @@ where /// Split the task into a runnable future and receiver. /// This is a low-level operation and only useful for specific needs. #[must_use] - pub fn split(self) -> (Pin>>, AsyncReceiver) { + pub fn split(self) -> (impl ConditionalSendFuture, AsyncReceiver) { let (tx, rx) = oneshot::channel(); let waker = Arc::new(AtomicWaker::new()); let received = Arc::new(AtomicBool::new(false)); @@ -184,7 +183,7 @@ where pub fn split( self, ) -> ( - Pin>>, + impl ConditionalSendFuture, AsyncReceiver>, ) { let (tx, rx) = oneshot::channel(); diff --git a/src/task_stream.rs b/src/task_stream.rs new file mode 100644 index 0000000..e9b2c1e --- /dev/null +++ b/src/task_stream.rs @@ -0,0 +1,128 @@ +use std::task::Poll; + +use bevy_ecs::component::Tick; +use bevy_ecs::system::ExclusiveSystemParam; +use bevy_ecs::system::ReadOnlySystemParam; +use bevy_ecs::system::SystemMeta; +use bevy_ecs::system::SystemParam; +use bevy_ecs::world::World; +use bevy_ecs::world::unsafe_world_cell::UnsafeWorldCell; +use bevy_platform::cell::SyncCell; +use bevy_tasks::AsyncComputeTaskPool; +use bevy_tasks::ConditionalSend; + +use crate::AsyncStream; +use crate::receiver::AsyncStreamReceiver; + +/// A Bevy [`SystemParam`] to execute async streams in the background. +#[derive(Debug)] +pub struct TaskStream<'s, T>(pub(crate) &'s mut Option>); + +impl TaskStream<'_, T> +where + T: ConditionalSend + 'static, +{ + /// Returns whether the stream is idle (not started). + pub fn is_idle(&self) -> bool { + self.0.is_none() + } + + /// Returns whether the stream is pending (running, but not finished). + pub fn is_pending(&self) -> bool { + if let Some(rx) = &self.0 { + !rx.is_finished() + } else { + false + } + } + + /// Returns whether the stream is finished. + pub fn is_finished(&self) -> bool { + if let Some(rx) = &self.0 { + rx.is_finished() + } else { + false + } + } + + /// Start an async stream in the background. If there is an existing stream + /// pending, it will be dropped and replaced with the given stream. + pub fn start(&mut self, stream: impl Into>) { + let stream = stream.into(); + let (fut, rx) = stream.split(); + let task_pool = AsyncComputeTaskPool::get(); + let handle = task_pool.spawn(fut); + handle.detach(); + self.0.replace(rx); + } + + /// Forget the stream being run. This does not stop the stream. + /// + /// Note: Bevy does not support cancelling a task on web currently. + pub fn forget(&mut self) { + self.0.take(); + } + + /// Poll for the next item. Returns `Poll::Ready(Some(T))` if an item is available, + /// `Poll::Ready(None)` if the stream is finished, or `Poll::Pending` if waiting. + pub fn poll_next(&mut self) -> Poll> { + match self.0.as_mut() { + Some(rx) => match rx.try_recv() { + Some(item) => Poll::Ready(Some(item)), + None => { + if rx.is_finished() { + self.0.take(); + Poll::Ready(None) + } else { + Poll::Pending + } + } + }, + None => Poll::Ready(None), + } + } +} + +impl ExclusiveSystemParam for TaskStream<'_, T> { + type State = SyncCell>>; + type Item<'s> = TaskStream<'s, T>; + + fn init(_world: &mut World, _system_meta: &mut SystemMeta) -> Self::State { + SyncCell::new(None) + } + + fn get_param<'s>(state: &'s mut Self::State, _system_meta: &SystemMeta) -> Self::Item<'s> { + TaskStream(state.get()) + } +} + +// SAFETY: only local state is accessed +unsafe impl ReadOnlySystemParam for TaskStream<'_, T> {} + +// SAFETY: only local state is accessed +unsafe impl SystemParam for TaskStream<'_, T> { + type State = SyncCell>>; + type Item<'w, 's> = TaskStream<'s, T>; + + fn init_state(_world: &mut World) -> Self::State { + SyncCell::new(None) + } + + #[inline] + unsafe fn get_param<'w, 's>( + state: &'s mut Self::State, + _system_meta: &SystemMeta, + _world: UnsafeWorldCell<'w>, + _change_tick: Tick, + ) -> Self::Item<'w, 's> { + TaskStream(state.get()) + } + + fn init_access( + _state: &Self::State, + _system_meta: &mut SystemMeta, + _component_access_set: &mut bevy_ecs::query::FilteredAccessSet, + _world: &mut World, + ) { + } +}