Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- This release supports **Bevy 0.17**.

## [0.11.1] - 2025-12-21

- This release supports **Bevy 0.17**.

### Fixed

- Removed the need for tokio channels internally to reduce dependency graph
- Re-applied `async-compat` to native futures due to tokio reactor conflicts on native (particularly with reqwest).

## [0.11.0] - 2025-12-12

- This release supports **Bevy 0.17**.
Expand Down
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ clippy.wildcard_dependencies = "warn"
[lib]

[dependencies]
tokio = { version = "1.48.0", default-features = false, features = ["sync"] }

bevy_ecs = { version = "0.17.3", default-features = false, features = [
"multi_threaded",
"async_executor",
Expand All @@ -84,6 +82,7 @@ bevy = { version = "0.17.3", default-features = false, features = [

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
futures-timer = "3.0.3"
async-compat = "0.2.5"

[target.'cfg(target_arch = "wasm32")'.dependencies]
futures-timer = { version = "3.0.3", features = ["wasm-bindgen"] }
Expand Down
20 changes: 12 additions & 8 deletions src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;

use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::task::AtomicWaker;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{self};

/// A channel that catches an [`AsyncTask`](crate::AsyncTask) result.
#[derive(Debug)]
Expand All @@ -18,13 +18,13 @@ impl<T> AsyncReceiver<T> {
/// Poll the current thread waiting for the async result.
pub fn try_recv(&mut self) -> Option<T> {
match self.receiver.try_recv() {
Ok(t) => {
Ok(Some(t)) => {
self.receiver.close();
self.received.store(true, Ordering::Relaxed);
self.waker.wake(); // Wake the sender to drop
Some(t)
}
Err(_) => None,
Ok(None) | Err(_) => None,
}
}
}
Expand All @@ -41,16 +41,20 @@ pub struct AsyncStreamReceiver<T> {
impl<T> AsyncStreamReceiver<T> {
/// Returns whether the stream has finished producing items.
pub fn is_finished(&self) -> bool {
self.finished.load(Ordering::Relaxed) && self.receiver.is_empty()
self.finished.load(Ordering::Relaxed)
}

/// Try to receive the next item from the stream without blocking.
/// Returns `Some(item)` if an item is available, `None` otherwise.
pub fn try_recv(&mut self) -> Option<T> {
match self.receiver.try_recv() {
Ok(item) => Some(item),
match self.receiver.try_next() {
Ok(Some(item)) => Some(item),
Err(_) => {
// Check if we're truly finished (no more items will come)
// No message yet, and sender is not dropped.
None
}
Ok(None) => {
// Sender is closed, stream exhausted
if self.finished.load(Ordering::Relaxed) {
// Signal to the producer that we're done
self.received.store(true, Ordering::Relaxed);
Expand Down
8 changes: 5 additions & 3 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use bevy_tasks::ConditionalSendFuture;
use bevy_tasks::futures_lite::Stream;
use bevy_tasks::futures_lite::StreamExt;
use bevy_tasks::futures_lite::stream;
use futures::SinkExt;
use futures::channel::mpsc;
use futures::task::AtomicWaker;
use tokio::sync::mpsc;

use crate::ConditionalSendStream;
use crate::receiver::AsyncStreamReceiver;
Expand Down Expand Up @@ -59,7 +60,7 @@ impl<T: ConditionalSend + 'static> AsyncStream<T> {
impl ConditionalSendFuture<Output = ()>,
AsyncStreamReceiver<T>,
) {
let (tx, rx) = mpsc::unbounded_channel();
let (mut tx, rx) = mpsc::unbounded();
let finished = Arc::new(AtomicBool::new(false));
let finished_clone = Arc::clone(&finished);
let waker = Arc::new(AtomicWaker::new());
Expand All @@ -69,8 +70,9 @@ impl<T: ConditionalSend + 'static> AsyncStream<T> {

let fut = async move {
let mut stream = self.stream;

while let Some(item) = stream.next().await {
if tx.send(item).is_err() {
if tx.send(item).await.is_err() {
// Receiver dropped, exit early
break;
}
Expand Down
10 changes: 9 additions & 1 deletion src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::task::Poll;

#[cfg(not(target_arch = "wasm32"))]
use async_compat::CompatExt;
use bevy_tasks::ConditionalSend;
use bevy_tasks::ConditionalSendFuture;
use futures::channel::oneshot;
use futures::task::AtomicWaker;
use tokio::sync::oneshot;

use crate::AsyncReceiver;
use crate::error::TimeoutError;
Expand Down Expand Up @@ -91,7 +93,10 @@ where
let waker = waker.clone();
let received = received.clone();
async move {
#[cfg(target_arch = "wasm32")]
let result = self.fut.await;
#[cfg(not(target_arch = "wasm32"))]
let result = self.fut.compat().await;

if let Ok(()) = tx.send(result) {
// Wait for the receiver to get the result before dropping.
Expand Down Expand Up @@ -188,7 +193,10 @@ where
let waker = waker.clone();
let received = received.clone();
async move {
#[cfg(target_arch = "wasm32")]
let result = timeout(self.timeout, self.fut).await;
#[cfg(not(target_arch = "wasm32"))]
let result = timeout(self.timeout, self.fut.compat()).await;

if let Ok(()) = tx.send(result) {
// Wait for the receiver to get the result before dropping.
Expand Down
Loading