Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 12 additions & 16 deletions crates/misc/component-async-tests/tests/scenario/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,71 +54,67 @@ pub async fn async_watch_streams() -> Result<()> {
.run_concurrent(&mut store, async |store| {
futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1
})
.await??;
.await?;

// Test dropping and then watching the read end of a stream.
let (mut tx, rx) = instance.stream::<u8>(&mut store)?;
instance
.run_concurrent(&mut store, async |store| {
rx.close_with(store)?;
rx.close_with(store);
tx.watch_reader(store).await;
anyhow::Ok(())
})
.await??;
.await?;

// Test watching and then dropping the write end of a stream.
let (tx, mut rx) = instance.stream::<u8>(&mut store)?;
instance
.run_concurrent(&mut store, async |store| {
futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1
})
.await??;
.await?;

// Test dropping and then watching the write end of a stream.
let (tx, mut rx) = instance.stream::<u8>(&mut store)?;
instance
.run_concurrent(&mut store, async |store| {
tx.close_with(store)?;
tx.close_with(store);
rx.watch_writer(store).await;
anyhow::Ok(())
})
.await??;
.await?;

// Test watching and then dropping the read end of a future.
let (mut tx, rx) = instance.future::<u8>(&mut store, || 42)?;
instance
.run_concurrent(&mut store, async |store| {
futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1
})
.await??;
.await?;

// Test dropping and then watching the read end of a future.
let (mut tx, rx) = instance.future::<u8>(&mut store, || 42)?;
instance
.run_concurrent(&mut store, async |store| {
rx.close_with(store)?;
rx.close_with(store);
tx.watch_reader(store).await;
anyhow::Ok(())
})
.await??;
.await?;

// Test watching and then dropping the write end of a future.
let (tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
instance
.run_concurrent(&mut store, async |store| {
futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1
})
.await??;
.await?;

// Test dropping and then watching the write end of a future.
let (tx, mut rx) = instance.future::<u8>(&mut store, || 42)?;
instance
.run_concurrent(&mut store, async |store| {
tx.close_with(store)?;
tx.close_with(store);
rx.watch_writer(store).await;
anyhow::Ok(())
})
.await??;
.await?;

enum Event<'a> {
Write(Option<GuardedStreamWriter<'a, u8, Ctx>>),
Expand Down
1 change: 1 addition & 0 deletions crates/wasi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ test-programs-artifacts = { workspace = true }
tempfile = { workspace = true }
wasmtime = { workspace = true, features = ['cranelift', 'incremental-cache'] }
wasmtime-test-util = { workspace = true }
env_logger = { workspace = true }

[target.'cfg(unix)'.dependencies]
rustix = { workspace = true, features = ["event", "fs", "net"] }
Expand Down
4 changes: 2 additions & 2 deletions crates/wasi/src/p3/sockets/host/types/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,8 @@ impl HostTcpSocketWithStore for WasiSockets {
let (result_tx, result_rx) = instance
.future(&mut view, || Err(ErrorCode::InvalidState))
.context("failed to create future")?;
result_tx.close(&mut view)?;
data_tx.close(&mut view)?;
result_tx.close(&mut view);
data_tx.close(&mut view);
Ok((data_rx, result_rx))
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/wasi/tests/all/p3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl wasmtime_wasi::p2::IoView for Ctx {
}

async fn run(path: &str) -> anyhow::Result<()> {
let _ = env_logger::try_init();
let path = Path::new(path);
let engine = test_programs_artifacts::engine(|config| {
config.async_support(true);
Expand Down
6 changes: 3 additions & 3 deletions crates/wasmtime/src/runtime/component/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4277,9 +4277,9 @@ impl ConcurrentState {
fibers: &mut Vec<StoreFiber<'static>>,
futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
) {
for entry in mem::take(&mut self.table) {
if let Ok(set) = entry.downcast::<WaitableSet>() {
for mode in set.waiting.into_values() {
for entry in self.table.iter_mut() {
if let Some(set) = entry.downcast_mut::<WaitableSet>() {
for mode in mem::take(&mut set.waiting).into_values() {
if let WaitMode::Fiber(fiber) = mode {
fibers.push(fiber);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,10 @@ pub(super) struct FlatAbi {
/// them) which require access to the store in order to be disposed of properly.
trait DropWithStore: Sized {
/// Dispose of `self` using the specified store.
fn drop(&mut self, store: impl AsContextMut) -> Result<()>;
fn drop(&mut self, store: impl AsContextMut);

/// Dispose of `self` using the specified accessor.
fn drop_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
fn drop_with(&mut self, accessor: impl AsAccessor) {
accessor.as_accessor().with(|store| self.drop(store))
}
}
Expand Down Expand Up @@ -521,15 +521,15 @@ impl<T> FutureWriter<T> {
}

/// Close this `FutureWriter`, writing the default value.
pub fn close(mut self, store: impl AsContextMut) -> Result<()>
pub fn close(mut self, store: impl AsContextMut)
where
T: func::Lower + Send + Sync + 'static,
{
self.drop(store)
}

/// Close this `FutureWriter`, writing the default value.
pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()>
pub fn close_with(mut self, accessor: impl AsAccessor)
where
T: func::Lower + Send + Sync + 'static,
{
Expand All @@ -538,12 +538,13 @@ impl<T> FutureWriter<T> {
}

impl<T: func::Lower + Send + Sync + 'static> DropWithStore for FutureWriter<T> {
fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> {
fn drop(&mut self, mut store: impl AsContextMut) {
// `self` should never be used again, but leave an invalid handle there just in case.
let id = mem::replace(&mut self.id, TableId::new(0));
let default = self.default;
self.instance
.host_drop_writer(store.as_context_mut(), id, Some(&move || Ok(default())))
.unwrap()
}
}

Expand Down Expand Up @@ -714,25 +715,27 @@ impl<T> FutureReader<T> {
}

/// Close this `FutureReader`.
pub fn close(mut self, store: impl AsContextMut) -> Result<()> {
pub fn close(mut self, store: impl AsContextMut) {
self.drop(store)
}

/// Close this `FutureReader`.
pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> {
pub fn close_with(mut self, accessor: impl AsAccessor) {
accessor.as_accessor().with(|access| self.drop(access))
}
}

impl<T> DropWithStore for FutureReader<T> {
fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> {
fn drop(&mut self, mut store: impl AsContextMut) {
// `self` should never be used again, but leave an invalid handle there just in case.
let id = mem::replace(&mut self.id, TableId::new(0));
self.instance.host_drop_reader(
store.as_context_mut().0.traitobj_mut(),
id,
TransmitKind::Future,
)
self.instance
.host_drop_reader(
store.as_context_mut().0.traitobj_mut(),
id,
TransmitKind::Future,
)
.unwrap()
}
}

Expand Down Expand Up @@ -980,22 +983,23 @@ impl<T> StreamWriter<T> {
}

/// Close this `StreamWriter`.
pub fn close(mut self, store: impl AsContextMut) -> Result<()> {
pub fn close(mut self, store: impl AsContextMut) {
self.drop(store)
}

/// Close this `StreamWriter`.
pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> {
pub fn close_with(mut self, accessor: impl AsAccessor) {
accessor.as_accessor().with(|access| self.drop(access))
}
}

impl<T> DropWithStore for StreamWriter<T> {
fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> {
fn drop(&mut self, mut store: impl AsContextMut) {
// `self` should never be used again, but leave an invalid handle there just in case.
let id = mem::replace(&mut self.id, TableId::new(0));
self.instance
.host_drop_writer(store.as_context_mut(), id, None::<&dyn Fn() -> Result<()>>)
.unwrap()
}
}

Expand Down Expand Up @@ -1182,25 +1186,27 @@ impl<T> StreamReader<T> {
}

/// Close this `StreamReader`.
pub fn close(mut self, store: impl AsContextMut) -> Result<()> {
pub fn close(mut self, store: impl AsContextMut) {
self.drop(store)
}

/// Close this `StreamReader`.
pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> {
pub fn close_with(mut self, accessor: impl AsAccessor) {
accessor.as_accessor().with(|access| self.drop(access))
}
}

impl<T> DropWithStore for StreamReader<T> {
fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> {
fn drop(&mut self, mut store: impl AsContextMut) {
// `self` should never be used again, but leave an invalid handle there just in case.
let id = mem::replace(&mut self.id, TableId::new(0));
self.instance.host_drop_reader(
store.as_context_mut().0.traitobj_mut(),
id,
TransmitKind::Stream,
)
self.instance
.host_drop_reader(
store.as_context_mut().0.traitobj_mut(),
id,
TransmitKind::Stream,
)
.unwrap()
}
}

Expand Down
32 changes: 6 additions & 26 deletions crates/wasmtime/src/runtime/component/concurrent/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::collections::BTreeSet;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::vec::{self, Vec};
use std::vec::Vec;

pub struct TableId<T> {
rep: u32,
Expand Down Expand Up @@ -356,32 +356,12 @@ impl Table {
}
Ok(e)
}
}

pub struct TableIterator(vec::IntoIter<Entry>);

impl Iterator for TableIterator {
type Item = Box<dyn Any + Send + Sync>;

fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(entry) = self.0.next() {
if let Entry::Occupied { entry } = entry {
break Some(entry.entry);
}
} else {
break None;
}
}
}
}

impl IntoIterator for Table {
type Item = Box<dyn Any + Send + Sync>;
type IntoIter = TableIterator;

fn into_iter(self) -> TableIterator {
TableIterator(self.entries.into_iter())
pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (dyn Any + Send + Sync)> {
self.entries.iter_mut().filter_map(|entry| match entry {
Entry::Occupied { entry } => Some(&mut *entry.entry),
Entry::Free { .. } => None,
})
}
}

Expand Down