From ab44f87f8d27bc20870cdf9665f69256aefa27ba Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 30 Jul 2025 11:52:26 -0700 Subject: [PATCH 1/4] Don't support fallible drop in futures_and_streams This commit is a refinement of #11325 to use `.unwrap()` internally instead of ignoring errors from dropping futures and streams. Fallible drop isn't supported in Rust and these shouldn't panic assuming the host is properly matching handles to stores. --- .../tests/scenario/streams.rs | 28 +++++----- .../concurrent/futures_and_streams.rs | 54 ++++++++++--------- 2 files changed, 42 insertions(+), 40 deletions(-) diff --git a/crates/misc/component-async-tests/tests/scenario/streams.rs b/crates/misc/component-async-tests/tests/scenario/streams.rs index f6a5972ae6e5..a0d45fce642a 100644 --- a/crates/misc/component-async-tests/tests/scenario/streams.rs +++ b/crates/misc/component-async-tests/tests/scenario/streams.rs @@ -54,17 +54,16 @@ pub async fn async_watch_streams() -> Result<()> { .run_concurrent(&mut store, async |store| { futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1 }) - .await??; + .await?; // Test dropping and then watching the read end of a stream. let (mut tx, rx) = instance.stream::(&mut store)?; instance .run_concurrent(&mut store, async |store| { - rx.close_with(store)?; + rx.close_with(store); tx.watch_reader(store).await; - anyhow::Ok(()) }) - .await??; + .await?; // Test watching and then dropping the write end of a stream. let (tx, mut rx) = instance.stream::(&mut store)?; @@ -72,17 +71,16 @@ pub async fn async_watch_streams() -> Result<()> { .run_concurrent(&mut store, async |store| { futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1 }) - .await??; + .await?; // Test dropping and then watching the write end of a stream. let (tx, mut rx) = instance.stream::(&mut store)?; instance .run_concurrent(&mut store, async |store| { - tx.close_with(store)?; + tx.close_with(store); rx.watch_writer(store).await; - anyhow::Ok(()) }) - .await??; + .await?; // Test watching and then dropping the read end of a future. let (mut tx, rx) = instance.future::(&mut store, || 42)?; @@ -90,17 +88,16 @@ pub async fn async_watch_streams() -> Result<()> { .run_concurrent(&mut store, async |store| { futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1 }) - .await??; + .await?; // Test dropping and then watching the read end of a future. let (mut tx, rx) = instance.future::(&mut store, || 42)?; instance .run_concurrent(&mut store, async |store| { - rx.close_with(store)?; + rx.close_with(store); tx.watch_reader(store).await; - anyhow::Ok(()) }) - .await??; + .await?; // Test watching and then dropping the write end of a future. let (tx, mut rx) = instance.future::(&mut store, || 42)?; @@ -108,17 +105,16 @@ pub async fn async_watch_streams() -> Result<()> { .run_concurrent(&mut store, async |store| { futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1 }) - .await??; + .await?; // Test dropping and then watching the write end of a future. let (tx, mut rx) = instance.future::(&mut store, || 42)?; instance .run_concurrent(&mut store, async |store| { - tx.close_with(store)?; + tx.close_with(store); rx.watch_writer(store).await; - anyhow::Ok(()) }) - .await??; + .await?; enum Event<'a> { Write(Option>), diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index 2809cb6b12f7..f98e633293f1 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -407,10 +407,10 @@ pub(super) struct FlatAbi { /// them) which require access to the store in order to be disposed of properly. trait DropWithStore: Sized { /// Dispose of `self` using the specified store. - fn drop(&mut self, store: impl AsContextMut) -> Result<()>; + fn drop(&mut self, store: impl AsContextMut); /// Dispose of `self` using the specified accessor. - fn drop_with(&mut self, accessor: impl AsAccessor) -> Result<()> { + fn drop_with(&mut self, accessor: impl AsAccessor) { accessor.as_accessor().with(|store| self.drop(store)) } } @@ -521,7 +521,7 @@ impl FutureWriter { } /// Close this `FutureWriter`, writing the default value. - pub fn close(mut self, store: impl AsContextMut) -> Result<()> + pub fn close(mut self, store: impl AsContextMut) where T: func::Lower + Send + Sync + 'static, { @@ -529,7 +529,7 @@ impl FutureWriter { } /// Close this `FutureWriter`, writing the default value. - pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> + pub fn close_with(mut self, accessor: impl AsAccessor) where T: func::Lower + Send + Sync + 'static, { @@ -538,12 +538,13 @@ impl FutureWriter { } impl DropWithStore for FutureWriter { - fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> { + fn drop(&mut self, mut store: impl AsContextMut) { // `self` should never be used again, but leave an invalid handle there just in case. let id = mem::replace(&mut self.id, TableId::new(0)); let default = self.default; self.instance .host_drop_writer(store.as_context_mut(), id, Some(&move || Ok(default()))) + .unwrap() } } @@ -714,25 +715,27 @@ impl FutureReader { } /// Close this `FutureReader`. - pub fn close(mut self, store: impl AsContextMut) -> Result<()> { + pub fn close(mut self, store: impl AsContextMut) { self.drop(store) } /// Close this `FutureReader`. - pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> { + pub fn close_with(mut self, accessor: impl AsAccessor) { accessor.as_accessor().with(|access| self.drop(access)) } } impl DropWithStore for FutureReader { - fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> { + fn drop(&mut self, mut store: impl AsContextMut) { // `self` should never be used again, but leave an invalid handle there just in case. let id = mem::replace(&mut self.id, TableId::new(0)); - self.instance.host_drop_reader( - store.as_context_mut().0.traitobj_mut(), - id, - TransmitKind::Future, - ) + self.instance + .host_drop_reader( + store.as_context_mut().0.traitobj_mut(), + id, + TransmitKind::Future, + ) + .unwrap() } } @@ -980,22 +983,23 @@ impl StreamWriter { } /// Close this `StreamWriter`. - pub fn close(mut self, store: impl AsContextMut) -> Result<()> { + pub fn close(mut self, store: impl AsContextMut) { self.drop(store) } /// Close this `StreamWriter`. - pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> { + pub fn close_with(mut self, accessor: impl AsAccessor) { accessor.as_accessor().with(|access| self.drop(access)) } } impl DropWithStore for StreamWriter { - fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> { + fn drop(&mut self, mut store: impl AsContextMut) { // `self` should never be used again, but leave an invalid handle there just in case. let id = mem::replace(&mut self.id, TableId::new(0)); self.instance .host_drop_writer(store.as_context_mut(), id, None::<&dyn Fn() -> Result<()>>) + .unwrap() } } @@ -1182,25 +1186,27 @@ impl StreamReader { } /// Close this `StreamReader`. - pub fn close(mut self, store: impl AsContextMut) -> Result<()> { + pub fn close(mut self, store: impl AsContextMut) { self.drop(store) } /// Close this `StreamReader`. - pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> { + pub fn close_with(mut self, accessor: impl AsAccessor) { accessor.as_accessor().with(|access| self.drop(access)) } } impl DropWithStore for StreamReader { - fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> { + fn drop(&mut self, mut store: impl AsContextMut) { // `self` should never be used again, but leave an invalid handle there just in case. let id = mem::replace(&mut self.id, TableId::new(0)); - self.instance.host_drop_reader( - store.as_context_mut().0.traitobj_mut(), - id, - TransmitKind::Stream, - ) + self.instance + .host_drop_reader( + store.as_context_mut().0.traitobj_mut(), + id, + TransmitKind::Stream, + ) + .unwrap() } } From 6d1a578f9085b1848dd3f2b4aacf0a1dfc1d563d Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 30 Jul 2025 12:56:59 -0700 Subject: [PATCH 2/4] Fix build of wasmtime-wasi --- crates/wasi/src/p3/sockets/host/types/tcp.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/wasi/src/p3/sockets/host/types/tcp.rs b/crates/wasi/src/p3/sockets/host/types/tcp.rs index 82cd50a79c7f..aca7b959ba91 100644 --- a/crates/wasi/src/p3/sockets/host/types/tcp.rs +++ b/crates/wasi/src/p3/sockets/host/types/tcp.rs @@ -414,8 +414,8 @@ impl HostTcpSocketWithStore for WasiSockets { let (result_tx, result_rx) = instance .future(&mut view, || Err(ErrorCode::InvalidState)) .context("failed to create future")?; - result_tx.close(&mut view)?; - data_tx.close(&mut view)?; + result_tx.close(&mut view); + data_tx.close(&mut view); Ok((data_rx, result_rx)) } } From 93818e5a8f6e536a7156c97867617ce65e925b38 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 30 Jul 2025 13:53:37 -0700 Subject: [PATCH 3/4] Don't empty the table during store destruction Leave it around while futures/fibers are being manually dropped so any destructors associated there get access to the table (as required by streams/futures/etc). --- Cargo.lock | 1 + crates/wasi/Cargo.toml | 1 + crates/wasi/tests/all/p3/mod.rs | 1 + .../src/runtime/component/concurrent.rs | 6 ++-- .../src/runtime/component/concurrent/table.rs | 30 ++++--------------- 5 files changed, 11 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d4d21ca7df9f..248d52ac3657 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4979,6 +4979,7 @@ dependencies = [ "cap-rand", "cap-std", "cap-time-ext", + "env_logger 0.11.5", "fs-set-times", "futures", "io-extras", diff --git a/crates/wasi/Cargo.toml b/crates/wasi/Cargo.toml index a225646dc138..a2d5081ae873 100644 --- a/crates/wasi/Cargo.toml +++ b/crates/wasi/Cargo.toml @@ -45,6 +45,7 @@ test-programs-artifacts = { workspace = true } tempfile = { workspace = true } wasmtime = { workspace = true, features = ['cranelift', 'incremental-cache'] } wasmtime-test-util = { workspace = true } +env_logger = { workspace = true } [target.'cfg(unix)'.dependencies] rustix = { workspace = true, features = ["event", "fs", "net"] } diff --git a/crates/wasi/tests/all/p3/mod.rs b/crates/wasi/tests/all/p3/mod.rs index 3adf0cd3f4a7..1d057490baec 100644 --- a/crates/wasi/tests/all/p3/mod.rs +++ b/crates/wasi/tests/all/p3/mod.rs @@ -46,6 +46,7 @@ impl wasmtime_wasi::p2::IoView for Ctx { } async fn run(path: &str) -> anyhow::Result<()> { + let _ = env_logger::try_init(); let path = Path::new(path); let engine = test_programs_artifacts::engine(|config| { config.async_support(true); diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index 043f44cc70c6..628df13954ac 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -4277,9 +4277,9 @@ impl ConcurrentState { fibers: &mut Vec>, futures: &mut Vec>, ) { - for entry in mem::take(&mut self.table) { - if let Ok(set) = entry.downcast::() { - for mode in set.waiting.into_values() { + for entry in self.table.iter_mut() { + if let Some(set) = entry.downcast_mut::() { + for mode in mem::take(&mut set.waiting).into_values() { if let WaitMode::Fiber(fiber) = mode { fibers.push(fiber); } diff --git a/crates/wasmtime/src/runtime/component/concurrent/table.rs b/crates/wasmtime/src/runtime/component/concurrent/table.rs index d46a0dfe7005..474d8b2ebdbc 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/table.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/table.rs @@ -356,32 +356,12 @@ impl Table { } Ok(e) } -} - -pub struct TableIterator(vec::IntoIter); - -impl Iterator for TableIterator { - type Item = Box; - fn next(&mut self) -> Option { - loop { - if let Some(entry) = self.0.next() { - if let Entry::Occupied { entry } = entry { - break Some(entry.entry); - } - } else { - break None; - } - } - } -} - -impl IntoIterator for Table { - type Item = Box; - type IntoIter = TableIterator; - - fn into_iter(self) -> TableIterator { - TableIterator(self.entries.into_iter()) + pub fn iter_mut(&mut self) -> impl Iterator { + self.entries.iter_mut().filter_map(|entry| match entry { + Entry::Occupied { entry } => Some(&mut *entry.entry), + Entry::Free { .. } => None, + }) } } From f7ea4f0be12a967b2d8394bc9af993de014d3837 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 30 Jul 2025 14:03:44 -0700 Subject: [PATCH 4/4] Remove unused import --- crates/wasmtime/src/runtime/component/concurrent/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/wasmtime/src/runtime/component/concurrent/table.rs b/crates/wasmtime/src/runtime/component/concurrent/table.rs index 474d8b2ebdbc..382eb07e6add 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/table.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/table.rs @@ -13,7 +13,7 @@ use std::collections::BTreeSet; use std::fmt; use std::hash::{Hash, Hasher}; use std::marker::PhantomData; -use std::vec::{self, Vec}; +use std::vec::Vec; pub struct TableId { rep: u32,