Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added `AsyncStream` and `TaskStream`, functional equivalents of `AsyncTask` and `TaskRunner` for streams (aka async iterators). See the streaming example.
- Added `bevy_async_task::MAX_TIMEOUT`
- Added `bevy_async_task::DEFAULT_TIMEOUT`

Expand Down
58 changes: 58 additions & 0 deletions examples/cross_system_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//! Cross system example - This example shows how to start a stream from one system and poll it from
//! another through a resource.
use std::time::Duration;

use bevy::app::PanicHandlerPlugin;
use bevy::log::LogPlugin;
use bevy::prelude::*;
use bevy::tasks::AsyncComputeTaskPool;
use bevy_async_task::AsyncStream;
use bevy_async_task::AsyncStreamReceiver;
use bevy_async_task::sleep;
use bevy_tasks::futures_lite::Stream;
use bevy_tasks::futures_lite::StreamExt;
use bevy_tasks::futures_lite::stream;

#[derive(Resource, DerefMut, Deref, Default)]
struct MyStream(Option<AsyncStreamReceiver<u32>>);

/// An async stream that yields numbers over time
async fn async_number_stream() -> impl Stream<Item = u32> {
sleep(Duration::from_millis(500)).await;
stream::iter(vec![1, 2, 3, 4, 5]).then(|x| async move {
sleep(Duration::from_millis(500)).await;
x * 10
})
}

fn system1_start(mut my_stream: ResMut<'_, MyStream>) {
let stream = AsyncStream::lazy(async_number_stream());
let (fut, receiver) = stream.split();
my_stream.replace(receiver);
AsyncComputeTaskPool::get().spawn_local(fut).detach();
info!("Stream started!");
}

fn system2_poll(mut my_stream: ResMut<'_, MyStream>, mut exit: MessageWriter<'_, AppExit>) {
let Some(receiver) = my_stream.0.as_mut() else {
return;
};

while let Some(v) = receiver.try_recv() {
info!("Received {v}");
}
if receiver.is_finished() {
info!("Stream finished!");
exit.write(AppExit::Success);
}
}

/// Entry point
pub fn main() {
App::new()
.init_resource::<MyStream>()
.add_plugins((MinimalPlugins, LogPlugin::default(), PanicHandlerPlugin))
.add_systems(Startup, system1_start)
.add_systems(Update, system2_poll)
.run();
}
53 changes: 53 additions & 0 deletions examples/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//! Simple example - this demonstrates running one async stream continuously.

use std::task::Poll;
use std::time::Duration;

use bevy::app::PanicHandlerPlugin;
use bevy::log::LogPlugin;
use bevy::prelude::*;
use bevy_async_task::AsyncStream;
use bevy_async_task::TaskStream;
use bevy_async_task::sleep;
use bevy_tasks::futures_lite::Stream;
use bevy_tasks::futures_lite::StreamExt;
use bevy_tasks::futures_lite::stream;

/// An async stream that yields numbers over time
async fn async_number_stream() -> impl Stream<Item = u32> {
sleep(Duration::from_millis(500)).await;

stream::iter(vec![1, 2, 3, 4, 5]).then(|x| async move {
sleep(Duration::from_millis(500)).await;
x * 10
})
}

fn my_system(mut task_stream: TaskStream<'_, u32>) {
if task_stream.is_idle() {
// Start an async stream!
let stream = AsyncStream::lazy(async_number_stream());
task_stream.start(stream);
info!("Stream started!");
}

match task_stream.poll_next() {
Poll::Ready(Some(v)) => {
info!("Received {v}");
}
Poll::Ready(None) => {
info!("Stream finished!");
}
Poll::Pending => {
// Waiting for next item...
}
}
}

/// Entry point
pub fn main() {
App::new()
.add_plugins((MinimalPlugins, LogPlugin::default(), PanicHandlerPlugin))
.add_systems(Update, my_system)
.run();
}
19 changes: 19 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@

mod error;
mod receiver;
mod stream;
mod task;
mod task_pool;
mod task_runner;
mod task_stream;
mod util;

pub use error::TimeoutError;
pub use receiver::AsyncReceiver;
pub use receiver::AsyncStreamReceiver;
pub use stream::AsyncStream;
pub use task::AsyncTask;
pub use task::TimedAsyncTask;
pub use task_pool::TaskPool;
pub use task_pool::TimedTaskPool;
pub use task_runner::TaskRunner;
pub use task_runner::TimedTaskRunner;
pub use task_stream::TaskStream;
pub use util::pending;
pub use util::sleep;
pub use util::timeout;
Expand All @@ -28,3 +33,17 @@ pub const MAX_TIMEOUT: core::time::Duration = {
static I32_MAX_U64: u64 = 2_147_483_647;
core::time::Duration::from_millis(I32_MAX_U64)
};

#[doc(hidden)]
pub mod polyfill {
use bevy_tasks::ConditionalSend;
use bevy_tasks::futures_lite::Stream;

/// The missing conditional send stream trait.
/// see: <https://github.com/bevyengine/bevy/issues/22093>
pub trait ConditionalSendStream: Stream + ConditionalSend {}

impl<T: Stream + ConditionalSend> ConditionalSendStream for T {}
}

pub use polyfill::ConditionalSendStream;
49 changes: 45 additions & 4 deletions src/receiver.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;

use futures::task::AtomicWaker;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{self};

/// A channel that catches an [`AsyncTask`](crate::AsyncTask) result.
Expand All @@ -28,3 +28,44 @@ impl<T> AsyncReceiver<T> {
}
}
}

/// A channel that receives multiple items from an async stream.
#[derive(Debug)]
pub struct AsyncStreamReceiver<T> {
pub(crate) finished: Arc<AtomicBool>,
pub(crate) waker: Arc<AtomicWaker>,
pub(crate) receiver: mpsc::UnboundedReceiver<T>,
pub(crate) received: Arc<AtomicBool>,
}

impl<T> AsyncStreamReceiver<T> {
/// Returns whether the stream has finished producing items.
pub fn is_finished(&self) -> bool {
self.finished.load(Ordering::Relaxed) && self.receiver.is_empty()
}

/// Try to receive the next item from the stream without blocking.
/// Returns `Some(item)` if an item is available, `None` otherwise.
pub fn try_recv(&mut self) -> Option<T> {
match self.receiver.try_recv() {
Ok(item) => Some(item),
Err(_) => {
// Check if we're truly finished (no more items will come)
if self.finished.load(Ordering::Relaxed) {
// Signal to the producer that we're done
self.received.store(true, Ordering::Relaxed);
self.waker.wake();
}
None
}
}
}
}

impl<T> Drop for AsyncStreamReceiver<T> {
fn drop(&mut self) {
// Signal the producer we're done when dropped
self.received.store(true, Ordering::Relaxed);
self.waker.wake();
}
}
Loading