From 4194e0a5cd8c9dedd657073d2e48ceaa530eb2ac Mon Sep 17 00:00:00 2001
From: Joel Dice
Date: Thu, 17 Jul 2025 15:10:50 -0600
Subject: [PATCH 1/3] refactor `{Stream,Future}{Reader,Writer}` APIs and internals

This makes several changes to how `{Stream,Future}{Reader,Writer}` work to make
them more efficient and, in some ways, more ergonomic:

- The background tasks have been removed, allowing reads and writes to complete
  without task context switching. We now allocate and use oneshot channels
  lazily, and only when the other end is not yet ready; this improves real-world
  performance benchmarks (e.g. wasi-http request handling) considerably.

- Instances of `{Stream,Future}Reader` can now be lifted and lowered directly;
  there's no need for `Host{Stream,Future}` anymore.

- The type parameter for `Stream{Reader,Writer}` no longer refers to the buffer
  type -- just the payload type (i.e. `StreamReader<T>` instead of
  `StreamReader<Option<T>>`), meaning any buffer type may be used for a given
  read or write operation. This also means the compiler needs help with type
  inference less often when calling `Instance::stream`.

- Instances of `{Stream,Future}{Reader,Writer}` now require access to the store
  in order to be disposed of properly. I've added RAII wrapper structs
  (`WithAccessor[AndValue]`) to help with this, and also updated `Store::drop`
  and `Instance::run_concurrent` to ensure the store thread-local is set when
  dropping futures closing over `&Accessor`s.

- In order to ensure that resources containing `{Stream,Future}{Reader,Writer}`
  instances are disposed of properly, I've added
  `LinkerInstance::resource_concurrent` and updated `wasmtime-wit-bindgen` to
  use it. This gives resource drop functions access to a `StoreContextMut` via
  an `Accessor`, allowing the stream and future handles to be disposed of.

- In order to make this work, I had to change `Accessor::instance` from an
  `Instance` to an `Option<Instance>`, which is awkward but temporary, since
  we're planning to remove `Accessor::instance` entirely once we've moved
  concurrent state from `ComponentInstance` to `Store`.

That problem of disposal is definitely the most awkward part of all this. In
simple cases, it's easy enough to ensure that read and write handles are
disposed of properly, but both `wasmtime-wasi` and `wasmtime-wasi-http` have
some pretty complicated functions where handles are passed between tasks and/or
stored inside resources, so it can be tricky to ensure proper disposal on all
code paths. I'm open to ideas for improving this, but I suspect we'll need new
Rust language features (e.g. linear types) to make it truly ergonomic, robust,
and efficient.

While testing the above, I discovered an issue where `Instance::poll_until`
would prematurely give up and return a "deadlock" trap error, believing there
was no further work to do, even though the future passed to it was ready to
resolve the next time it was polled. I've fixed this by polling that future one
last time and only trapping if it returns `Pending`.

Note that I've moved a few associated functions from `ConcurrentState` to
`Instance` (e.g. `guest_drop_writable` and others) since they now need access to
the store; they're otherwise unchanged. Apologies for the diff noise.

Finally, I've tweaked `wasmtime serve` to poll the guest for content before
handing the response to Hyper, which helps performance by ensuring the first
content chunk can be sent in the same TCP packet as the beginning of the
response.
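For readers unfamiliar with the reworked API, here is a minimal sketch of the
intended host-side usage, distilled from the tests updated below. It assumes an
already-instantiated component, a hypothetical `Ctx` store data type, the
`futures` crate for `join!`, and the signatures as they appear in this diff
(which may differ in detail); `echo_one_item` is an illustrative name, not part
of this change.

```rust
use wasmtime::Store;
use wasmtime::component::{GuardedStreamReader, GuardedStreamWriter, Instance};

/// Hypothetical store data type used only for this sketch.
struct Ctx;

/// Send a single `u32` from a host writer to a host reader.
async fn echo_one_item(store: &mut Store<Ctx>, instance: Instance) -> wasmtime::Result<()> {
    // The payload type is now the only type parameter; the buffer type is
    // chosen per operation (here `Option<u32>` for both the write and read).
    let (tx, rx) = instance.stream::<u32>(&mut *store)?;

    instance
        .run_concurrent(&mut *store, async move |accessor| -> wasmtime::Result<()> {
            // Wrap both ends in RAII guards so the handles are disposed of
            // (with store access) even if this future is dropped early.
            let mut tx = GuardedStreamWriter::new(accessor, tx);
            let mut rx = GuardedStreamReader::new(accessor, rx);

            // Reads and writes no longer take the store/accessor explicitly;
            // the guards carry it.
            let (_, item) = futures::join!(tx.write_all(Some(42)), rx.read(None));
            assert_eq!(item, Some(42));

            // Both ends are closed here when the guards go out of scope.
            Ok(())
        })
        .await?
}
```

The same pattern applies to futures via `GuardedFutureWriter` and
`GuardedFutureReader`; when an RAII guard isn't convenient, the unwrapped
handles can instead be disposed of explicitly with `close`/`close_with`.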
Signed-off-by: Joel Dice fix wasi p3 build and test failures Signed-off-by: Joel Dice use `ManuallyDrop` instead of `Option` in `Dropper` This allows us to drop its `value` field in-place, i.e. without moving it, thereby upholding the `Pin` guarantee. Signed-off-by: Joel Dice address review comments - Remove `DropWithStoreAndValue` and friends; go back to taking a `fn() -> T` parameter in `Instance::future` instead - Make `DropWithStore::drop[_with]` take `&mut self` instead of `self` - Make `WithAccessor` and `DropWithStore` private - Instead, I've added public `Guarded{Stream,Future}{Reader,Writer}` types for RAII - and also `{Stream,Future}{Reader,Writer}::close[_with]` methods - Use RAII in `FutureReader::read` and `FutureWriter::write` to ensure handles are dropped if the `Future` is dropped Signed-off-by: Joel Dice --- .../src/resource_stream.rs | 16 +- .../tests/scenario/streams.rs | 277 ++- .../tests/scenario/transmit.rs | 130 +- crates/wasi/src/p3/cli/host.rs | 37 +- .../src/runtime/component/concurrent.rs | 202 +- .../concurrent/futures_and_streams.rs | 2103 ++++++++--------- .../runtime/component/concurrent_disabled.rs | 8 +- .../wasmtime/src/runtime/component/linker.rs | 53 + crates/wasmtime/src/runtime/component/mod.rs | 5 +- .../wasmtime/src/runtime/component/store.rs | 7 +- .../wasmtime/src/runtime/component/values.rs | 10 +- crates/wasmtime/src/runtime/store.rs | 6 +- .../src/runtime/vm/component/libcalls.rs | 16 +- crates/wit-bindgen/src/lib.rs | 45 +- crates/wit-bindgen/src/rust.rs | 4 +- 15 files changed, 1528 insertions(+), 1391 deletions(-) diff --git a/crates/misc/component-async-tests/src/resource_stream.rs b/crates/misc/component-async-tests/src/resource_stream.rs index af030fbb6def..3b8bf30dda41 100644 --- a/crates/misc/component-async-tests/src/resource_stream.rs +++ b/crates/misc/component-async-tests/src/resource_stream.rs @@ -1,5 +1,7 @@ use anyhow::Result; -use wasmtime::component::{Accessor, AccessorTask, HostStream, Resource, StreamWriter}; +use wasmtime::component::{ + Accessor, AccessorTask, GuardedStreamWriter, Resource, StreamReader, StreamWriter, +}; use wasmtime_wasi::p2::IoView; use super::Ctx; @@ -36,20 +38,20 @@ impl bindings::local::local::resource_stream::HostWithStore for Ctx { async fn foo( accessor: &Accessor, count: u32, - ) -> wasmtime::Result>> { + ) -> wasmtime::Result>> { struct Task { - tx: StreamWriter>>, + tx: StreamWriter>, count: u32, } impl AccessorTask> for Task { async fn run(self, accessor: &Accessor) -> Result<()> { - let mut tx = self.tx; + let mut tx = GuardedStreamWriter::new(accessor, self.tx); for _ in 0..self.count { let item = accessor.with(|mut view| view.get().table().push(ResourceStreamX))?; - tx.write_all(accessor, Some(item)).await; + tx.write_all(Some(item)).await; } Ok(()) } @@ -57,10 +59,10 @@ impl bindings::local::local::resource_stream::HostWithStore for Ctx { let (tx, rx) = accessor.with(|mut view| { let instance = view.instance(); - instance.stream::<_, _, Option<_>>(&mut view) + instance.stream(&mut view) })?; accessor.spawn(Task { tx, count }); - Ok(rx.into()) + Ok(rx) } } diff --git a/crates/misc/component-async-tests/tests/scenario/streams.rs b/crates/misc/component-async-tests/tests/scenario/streams.rs index e77daf683aaf..f6a5972ae6e5 100644 --- a/crates/misc/component-async-tests/tests/scenario/streams.rs +++ b/crates/misc/component-async-tests/tests/scenario/streams.rs @@ -14,7 +14,10 @@ use { }, wasmtime::{ Engine, Store, - component::{Linker, ResourceTable, StreamReader, StreamWriter, VecBuffer}, + 
component::{ + GuardedFutureReader, GuardedStreamReader, GuardedStreamWriter, Linker, ResourceTable, + VecBuffer, + }, }, wasmtime_wasi::p2::WasiCtxBuilder, }; @@ -46,108 +49,114 @@ pub async fn async_watch_streams() -> Result<()> { let instance = linker.instantiate_async(&mut store, &component).await?; // Test watching and then dropping the read end of a stream. - let (mut tx, rx) = instance.stream::, Option<_>>(&mut store)?; + let (mut tx, rx) = instance.stream::(&mut store)?; instance .run_concurrent(&mut store, async |store| { - futures::join!(tx.watch_reader(store), async { - drop(rx); - }); + futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1 }) - .await?; + .await??; // Test dropping and then watching the read end of a stream. - let (mut tx, rx) = instance.stream::, Option<_>>(&mut store)?; - drop(rx); + let (mut tx, rx) = instance.stream::(&mut store)?; instance - .run_concurrent(&mut store, async |store| tx.watch_reader(store).await) - .await?; + .run_concurrent(&mut store, async |store| { + rx.close_with(store)?; + tx.watch_reader(store).await; + anyhow::Ok(()) + }) + .await??; // Test watching and then dropping the write end of a stream. - let (tx, mut rx) = instance.stream::, Option<_>>(&mut store)?; + let (tx, mut rx) = instance.stream::(&mut store)?; instance .run_concurrent(&mut store, async |store| { - futures::join!(rx.watch_writer(store), async { - drop(tx); - }); + futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1 }) - .await?; + .await??; // Test dropping and then watching the write end of a stream. - let (tx, mut rx) = instance.stream::, Option<_>>(&mut store)?; - drop(tx); + let (tx, mut rx) = instance.stream::(&mut store)?; instance - .run_concurrent(&mut store, async |store| rx.watch_writer(store).await) - .await?; + .run_concurrent(&mut store, async |store| { + tx.close_with(store)?; + rx.watch_writer(store).await; + anyhow::Ok(()) + }) + .await??; // Test watching and then dropping the read end of a future. - let (mut tx, rx) = instance.future::(|| 42, &mut store)?; + let (mut tx, rx) = instance.future::(&mut store, || 42)?; instance .run_concurrent(&mut store, async |store| { - futures::join!(tx.watch_reader(store), async { - drop(rx); - }); + futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1 }) - .await?; + .await??; // Test dropping and then watching the read end of a future. - let (mut tx, rx) = instance.future::(|| 42, &mut store)?; - drop(rx); + let (mut tx, rx) = instance.future::(&mut store, || 42)?; instance - .run_concurrent(&mut store, async |store| tx.watch_reader(store).await) - .await?; + .run_concurrent(&mut store, async |store| { + rx.close_with(store)?; + tx.watch_reader(store).await; + anyhow::Ok(()) + }) + .await??; // Test watching and then dropping the write end of a future. - let (tx, mut rx) = instance.future::(|| 42, &mut store)?; + let (tx, mut rx) = instance.future::(&mut store, || 42)?; instance .run_concurrent(&mut store, async |store| { - futures::join!(rx.watch_writer(store), async { - drop(tx); - }); + futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1 }) - .await?; + .await??; // Test dropping and then watching the write end of a future. 
- let (tx, mut rx) = instance.future::(|| 42, &mut store)?; - drop(tx); + let (tx, mut rx) = instance.future::(&mut store, || 42)?; instance - .run_concurrent(&mut store, async |store| rx.watch_writer(store).await) - .await?; + .run_concurrent(&mut store, async |store| { + tx.close_with(store)?; + rx.watch_writer(store).await; + anyhow::Ok(()) + }) + .await??; - enum Event { - Write(Option>>), - Read(Option>>, Option), + enum Event<'a> { + Write(Option>), + Read(Option>, Option), } // Test watching, then writing to, then dropping, then writing again to the // read end of a stream. + let (tx, rx) = instance.stream(&mut store)?; instance - .run_concurrent(&mut store, async |store| -> wasmtime::Result<_> { + .run_concurrent(&mut store, async move |store| -> wasmtime::Result<_> { + let mut tx = GuardedStreamWriter::new(store, tx); + let mut rx = GuardedStreamReader::new(store, rx); let mut futures = FuturesUnordered::new(); - let (mut tx, mut rx) = store.with(|s| instance.stream(s))?; assert!( - pin!(tx.watch_reader(store)) + pin!(tx.watch_reader()) .poll(&mut Context::from_waker(&Waker::noop())) .is_pending() ); futures.push( async move { - tx.write_all(store, Some(42)).await; + tx.write_all(Some(42)).await; let w = if tx.is_closed() { None } else { Some(tx) }; - Event::Write(w) + anyhow::Ok(Event::Write(w)) } .boxed(), ); futures.push( async move { - let b = rx.read(store, None).await; + let b = rx.read(None).await; let r = if rx.is_closed() { None } else { Some(rx) }; - Event::Read(r, b) + Ok(Event::Read(r, b)) } .boxed(), ); let mut rx = None; let mut tx = None; - while let Some(event) = futures.next().await { + while let Some(event) = futures.try_next().await? { match event { Event::Write(None) => unreachable!(), Event::Write(Some(new_tx)) => tx = Some(new_tx), @@ -161,8 +170,8 @@ pub async fn async_watch_streams() -> Result<()> { drop(rx); let mut tx = tx.take().unwrap(); - tx.watch_reader(store).await; - tx.write_all(store, Some(42)).await; + tx.watch_reader().await; + tx.write_all(Some(42)).await; assert!(tx.is_closed()); Ok(()) }) @@ -206,10 +215,10 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { let instance = linker.instantiate_async(&mut store, &component).await?; - enum StreamEvent { - FirstWrite(Option>>), - FirstRead(Option>>, Vec), - SecondWrite(Option>>), + enum StreamEvent<'a> { + FirstWrite(Option>), + FirstRead(Option>, Vec), + SecondWrite(Option>), GuestCompleted, } @@ -225,83 +234,95 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { let value = 42_u8; // First, test stream host->host - instance - .run_concurrent(&mut store, async |store| -> wasmtime::Result<_> { - let (mut tx, mut rx) = store.with(|mut s| instance.stream(&mut s))?; + { + let (tx, rx) = instance.stream(&mut store)?; + let values = values.clone(); - let mut futures = FuturesUnordered::new(); - futures.push({ - let values = values.clone(); - async move { - tx.write_all(store, values.into()).await; - StreamEvent::FirstWrite(if tx.is_closed() { None } else { Some(tx) }) - } - .boxed() - }); - futures.push( - async move { - let b = rx.read(store, Vec::with_capacity(3)).await; - let r = if rx.is_closed() { None } else { Some(rx) }; - StreamEvent::FirstRead(r, b) - } - .boxed(), - ); + instance + .run_concurrent(&mut store, async move |store| -> wasmtime::Result<_> { + let mut tx = GuardedStreamWriter::new(store, tx); + let mut rx = GuardedStreamReader::new(store, rx); - let mut count = 0; - while let Some(event) = futures.next().await { - count += 1; - match event { - 
StreamEvent::FirstWrite(Some(mut tx)) => { - if watch { - futures.push( - async move { - tx.watch_reader(store).await; - StreamEvent::SecondWrite(None) - } - .boxed(), - ); + let mut futures = FuturesUnordered::new(); + futures.push({ + let values = values.clone(); + async move { + tx.write_all(VecBuffer::from(values)).await; + anyhow::Ok(StreamEvent::FirstWrite(if tx.is_closed() { + None } else { - futures.push({ - let values = values.clone(); - async move { - tx.write_all(store, values.into()).await; - StreamEvent::SecondWrite(if tx.is_closed() { - None - } else { - Some(tx) - }) - } - .boxed() - }); - } + Some(tx) + })) } - StreamEvent::FirstWrite(None) => { - panic!("first write should have been accepted") - } - StreamEvent::FirstRead(Some(_), results) => { - assert_eq!(values, results); + .boxed() + }); + futures.push( + async move { + let b = rx.read(Vec::with_capacity(3)).await; + let r = if rx.is_closed() { None } else { Some(rx) }; + Ok(StreamEvent::FirstRead(r, b)) } - StreamEvent::FirstRead(None, _) => unreachable!(), - StreamEvent::SecondWrite(None) => {} - StreamEvent::SecondWrite(Some(_)) => { - panic!("second write should _not_ have been accepted") + .boxed(), + ); + + let mut count = 0; + while let Some(event) = futures.try_next().await? { + count += 1; + match event { + StreamEvent::FirstWrite(Some(mut tx)) => { + if watch { + futures.push( + async move { + tx.watch_reader().await; + Ok(StreamEvent::SecondWrite(None)) + } + .boxed(), + ); + } else { + futures.push({ + let values = values.clone(); + async move { + tx.write_all(VecBuffer::from(values)).await; + Ok(StreamEvent::SecondWrite(if tx.is_closed() { + None + } else { + Some(tx) + })) + } + .boxed() + }); + } + } + StreamEvent::FirstWrite(None) => { + panic!("first write should have been accepted") + } + StreamEvent::FirstRead(Some(_), results) => { + assert_eq!(values, results); + } + StreamEvent::FirstRead(None, _) => unreachable!(), + StreamEvent::SecondWrite(None) => {} + StreamEvent::SecondWrite(Some(_)) => { + panic!("second write should _not_ have been accepted") + } + StreamEvent::GuestCompleted => unreachable!(), } - StreamEvent::GuestCompleted => unreachable!(), } - } - assert_eq!(count, 3); - Ok(()) - }) - .await??; + assert_eq!(count, 3); + Ok(()) + }) + .await??; + } // Next, test futures host->host { - let (tx, rx) = instance.future(|| unreachable!(), &mut store)?; - let (mut tx_ignored, rx_ignored) = instance.future(|| 42u8, &mut store)?; + let (tx, rx) = instance.future(&mut store, || unreachable!())?; + let (mut tx_ignored, rx_ignored) = instance.future(&mut store, || unreachable!())?; instance - .run_concurrent(&mut store, async |store| { + .run_concurrent(&mut store, async move |store| { + let rx_ignored = GuardedFutureReader::new(store, rx_ignored); + let mut futures = FuturesUnordered::new(); futures.push(tx.write(store, value).map(FutureEvent::Write).boxed()); futures.push(rx.read(store).map(FutureEvent::Read).boxed()); @@ -348,24 +369,28 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { // Next, test stream host->guest { - let (mut tx, rx) = instance.stream::<_, _, Vec<_>>(&mut store)?; + let (tx, rx) = instance.stream(&mut store)?; let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?; + let values = values.clone(); + instance .run_concurrent(&mut store, async move |accessor| { + let mut tx = GuardedStreamWriter::new(accessor, tx); + let mut futures = FuturesUnordered::new(); futures.push( closed_streams .local_local_closed() - 
.call_read_stream(accessor, rx.into(), values.clone()) + .call_read_stream(accessor, rx, values.clone()) .map(|v| v.map(|()| StreamEvent::GuestCompleted)) .boxed(), ); futures.push({ let values = values.clone(); async move { - tx.write_all(accessor, values.into()).await; + tx.write_all(VecBuffer::from(values)).await; let w = if tx.is_closed() { None } else { Some(tx) }; Ok(StreamEvent::FirstWrite(w)) } @@ -380,7 +405,7 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { if watch { futures.push( async move { - tx.watch_reader(accessor).await; + tx.watch_reader().await; Ok(StreamEvent::SecondWrite(None)) } .boxed(), @@ -389,7 +414,7 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { futures.push({ let values = values.clone(); async move { - tx.write_all(accessor, values.into()).await; + tx.write_all(VecBuffer::from(values)).await; let w = if tx.is_closed() { None } else { Some(tx) }; Ok(StreamEvent::SecondWrite(w)) } @@ -418,8 +443,8 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { // Next, test futures host->guest { - let (tx, rx) = instance.future(|| unreachable!(), &mut store)?; - let (mut tx_ignored, rx_ignored) = instance.future(|| 0, &mut store)?; + let (tx, rx) = instance.future(&mut store, || unreachable!())?; + let (mut tx_ignored, rx_ignored) = instance.future(&mut store, || unreachable!())?; let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?; @@ -429,7 +454,7 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { futures.push( closed_streams .local_local_closed() - .call_read_future(accessor, rx.into(), value, rx_ignored.into()) + .call_read_future(accessor, rx, value, rx_ignored) .map(|v| v.map(|()| FutureEvent::GuestCompleted)) .boxed(), ); diff --git a/crates/misc/component-async-tests/tests/scenario/transmit.rs b/crates/misc/component-async-tests/tests/scenario/transmit.rs index 038c0e4cc8fc..6f40a9f083a0 100644 --- a/crates/misc/component-async-tests/tests/scenario/transmit.rs +++ b/crates/misc/component-async-tests/tests/scenario/transmit.rs @@ -12,8 +12,8 @@ use futures::{ stream::{FuturesUnordered, TryStreamExt}, }; use wasmtime::component::{ - Accessor, Component, HasSelf, HostFuture, HostStream, Instance, Linker, ResourceTable, - StreamReader, StreamWriter, Val, + Accessor, Component, FutureReader, GuardedFutureReader, GuardedStreamReader, + GuardedStreamWriter, HasSelf, Instance, Linker, ResourceTable, StreamReader, Val, }; use wasmtime::{AsContextMut, Engine, Store}; use wasmtime_wasi::p2::WasiCtxBuilder; @@ -175,17 +175,21 @@ pub trait TransmitTest { ) -> impl Future> + Send + 'a; fn into_params( - control: HostStream, - caller_stream: HostStream, - caller_future1: HostFuture, - caller_future2: HostFuture, + control: StreamReader, + caller_stream: StreamReader, + caller_future1: FutureReader, + caller_future2: FutureReader, ) -> Self::Params; fn from_result( store: impl AsContextMut, instance: Instance, result: Self::Result, - ) -> Result<(HostStream, HostFuture, HostFuture)>; + ) -> Result<( + StreamReader, + FutureReader, + FutureReader, + )>; } struct StaticTransmitTest; @@ -193,12 +197,16 @@ struct StaticTransmitTest; impl TransmitTest for StaticTransmitTest { type Instance = transmit::bindings::TransmitCallee; type Params = ( - HostStream, - HostStream, - HostFuture, - HostFuture, + StreamReader, + StreamReader, + FutureReader, + FutureReader, + ); + type Result = ( + StreamReader, + FutureReader, + FutureReader, ); - type Result = (HostStream, HostFuture, HostFuture); 
async fn instantiate( mut store: impl AsContextMut, @@ -221,10 +229,10 @@ impl TransmitTest for StaticTransmitTest { } fn into_params( - control: HostStream, - caller_stream: HostStream, - caller_future1: HostFuture, - caller_future2: HostFuture, + control: StreamReader, + caller_stream: StreamReader, + caller_future1: FutureReader, + caller_future2: FutureReader, ) -> Self::Params { (control, caller_stream, caller_future1, caller_future2) } @@ -233,7 +241,11 @@ impl TransmitTest for StaticTransmitTest { _: impl AsContextMut, _: Instance, result: Self::Result, - ) -> Result<(HostStream, HostFuture, HostFuture)> { + ) -> Result<( + StreamReader, + FutureReader, + FutureReader, + )> { Ok(result) } } @@ -283,10 +295,10 @@ impl TransmitTest for DynamicTransmitTest { } fn into_params( - control: HostStream, - caller_stream: HostStream, - caller_future1: HostFuture, - caller_future2: HostFuture, + control: StreamReader, + caller_stream: StreamReader, + caller_future1: FutureReader, + caller_future2: FutureReader, ) -> Self::Params { vec![ control.into_val(), @@ -300,13 +312,17 @@ impl TransmitTest for DynamicTransmitTest { mut store: impl AsContextMut, instance: Instance, result: Self::Result, - ) -> Result<(HostStream, HostFuture, HostFuture)> { + ) -> Result<( + StreamReader, + FutureReader, + FutureReader, + )> { let Val::Tuple(fields) = result else { unreachable!() }; - let stream = HostStream::from_val(store.as_context_mut(), instance, &fields[0])?; - let future1 = HostFuture::from_val(store.as_context_mut(), instance, &fields[1])?; - let future2 = HostFuture::from_val(store.as_context_mut(), instance, &fields[2])?; + let stream = StreamReader::from_val(store.as_context_mut(), instance, &fields[0])?; + let future1 = FutureReader::from_val(store.as_context_mut(), instance, &fields[1])?; + let future2 = FutureReader::from_val(store.as_context_mut(), instance, &fields[2])?; Ok((stream, future1, future2)) } } @@ -341,29 +357,32 @@ async fn test_transmit_with(component: &str) -> Re let (test, instance) = Test::instantiate(&mut store, &component, &linker).await?; - enum Event { + enum Event<'a, Test: TransmitTest> { Result(Test::Result), - ControlWriteA(Option>>), - ControlWriteB(Option>>), - ControlWriteC(Option>>), + ControlWriteA(Option>), + ControlWriteB(Option>), + ControlWriteC(Option>), ControlWriteD, WriteA, WriteB(bool), - ReadC(Option>>, Option), + ReadC(Option>, Option), ReadD(Option), - ReadNone(Option>>), + ReadNone(Option>), } - let (mut control_tx, control_rx) = instance.stream::<_, _, Option<_>>(&mut store)?; - let (mut caller_stream_tx, caller_stream_rx) = - instance.stream::<_, _, Option<_>>(&mut store)?; - let (caller_future1_tx, caller_future1_rx) = instance.future(|| unreachable!(), &mut store)?; - let (_caller_future2_tx, caller_future2_rx) = instance.future(|| unreachable!(), &mut store)?; + let (control_tx, control_rx) = instance.stream(&mut store)?; + let (caller_stream_tx, caller_stream_rx) = instance.stream(&mut store)?; + let (caller_future1_tx, caller_future1_rx) = instance.future(&mut store, || unreachable!())?; + let (_caller_future2_tx, caller_future2_rx) = instance.future(&mut store, || unreachable!())?; instance .run_concurrent(&mut store, async move |accessor| { + let mut control_tx = GuardedStreamWriter::new(accessor, control_tx); + let control_rx = GuardedStreamReader::new(accessor, control_rx); + let mut caller_stream_tx = GuardedStreamWriter::new(accessor, caller_stream_tx); + let mut futures = FuturesUnordered::< - Pin>> + Send>>, + Pin>> + Send>>, 
>::new(); let mut caller_future1_tx = Some(caller_future1_tx); let mut callee_stream_rx = None; @@ -373,7 +392,7 @@ async fn test_transmit_with(component: &str) -> Re futures.push( async move { control_tx - .write_all(accessor, Some(Control::ReadStream("a".into()))) + .write_all(Some(Control::ReadStream("a".into()))) .await; let w = if control_tx.is_closed() { None @@ -387,9 +406,7 @@ async fn test_transmit_with(component: &str) -> Re futures.push( async move { - caller_stream_tx - .write_all(accessor, Some(String::from("a"))) - .await; + caller_stream_tx.write_all(Some(String::from("a"))).await; Ok(Event::WriteA) } .boxed(), @@ -401,9 +418,9 @@ async fn test_transmit_with(component: &str) -> Re &test, Test::into_params( control_rx.into(), - caller_stream_rx.into(), - caller_future1_rx.into(), - caller_future2_rx.into(), + caller_stream_rx, + caller_future1_rx, + caller_future2_rx, ), ) .map(|v| v.map(Event::Result)) @@ -413,19 +430,16 @@ async fn test_transmit_with(component: &str) -> Re while let Some(event) = futures.try_next().await? { match event { Event::Result(result) => { - accessor.with(|mut store| { - let results = Test::from_result(&mut store, instance, result)?; - callee_stream_rx = Some(results.0.into_reader(&mut store)); - callee_future1_rx = Some(results.1.into_reader(&mut store)); - anyhow::Ok(()) - })?; + let (stream_rx, future_rx, _) = accessor + .with(|mut store| Test::from_result(&mut store, instance, result))?; + callee_stream_rx = Some(GuardedStreamReader::new(accessor, stream_rx)); + callee_future1_rx = Some(GuardedFutureReader::new(accessor, future_rx)); } Event::ControlWriteA(tx) => { futures.push( async move { let mut tx = tx.unwrap(); - tx.write_all(accessor, Some(Control::ReadFuture("b".into()))) - .await; + tx.write_all(Some(Control::ReadFuture("b".into()))).await; let w = if tx.is_closed() { None } else { Some(tx) }; Ok(Event::ControlWriteB(w)) } @@ -447,8 +461,7 @@ async fn test_transmit_with(component: &str) -> Re futures.push( async move { let mut tx = tx.unwrap(); - tx.write_all(accessor, Some(Control::WriteStream("c".into()))) - .await; + tx.write_all(Some(Control::WriteStream("c".into()))).await; let w = if tx.is_closed() { None } else { Some(tx) }; Ok(Event::ControlWriteC(w)) } @@ -460,7 +473,7 @@ async fn test_transmit_with(component: &str) -> Re let mut rx = callee_stream_rx.take().unwrap(); futures.push( async move { - let b = rx.read(accessor, None).await; + let b = rx.read(None).await; let r = if rx.is_closed() { None } else { Some(rx) }; Ok(Event::ReadC(r, b)) } @@ -471,8 +484,7 @@ async fn test_transmit_with(component: &str) -> Re futures.push( async move { let mut tx = tx.unwrap(); - tx.write_all(accessor, Some(Control::WriteFuture("d".into()))) - .await; + tx.write_all(Some(Control::WriteFuture("d".into()))).await; Ok(Event::ControlWriteD) } .boxed(), @@ -485,7 +497,7 @@ async fn test_transmit_with(component: &str) -> Re callee_future1_rx .take() .unwrap() - .read(accessor) + .read() .map(Event::ReadD) .map(Ok) .boxed(), @@ -499,7 +511,7 @@ async fn test_transmit_with(component: &str) -> Re let mut rx = callee_stream_rx.take().unwrap(); futures.push( async move { - rx.read(accessor, None).await; + rx.read(None).await; let r = if rx.is_closed() { None } else { Some(rx) }; Ok(Event::ReadNone(r)) } diff --git a/crates/wasi/src/p3/cli/host.rs b/crates/wasi/src/p3/cli/host.rs index c37a569594ee..b5d25fb4e1af 100644 --- a/crates/wasi/src/p3/cli/host.rs +++ b/crates/wasi/src/p3/cli/host.rs @@ -11,12 +11,13 @@ use bytes::BytesMut; use std::io::Cursor; 
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _}; use wasmtime::component::{ - Accessor, AccessorTask, HasData, HostStream, Resource, StreamReader, StreamWriter, + Accessor, AccessorTask, GuardedStreamReader, GuardedStreamWriter, HasData, Resource, + StreamReader, StreamWriter, }; struct InputTask { rx: T, - tx: StreamWriter>, + tx: StreamWriter, } impl AccessorTask> for InputTask @@ -26,15 +27,12 @@ where { async fn run(mut self, store: &Accessor) -> wasmtime::Result<()> { let mut buf = BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY); - while !self.tx.is_closed() { + let mut tx = GuardedStreamWriter::new(store, self.tx); + while !tx.is_closed() { match self.rx.read_buf(&mut buf).await { Ok(0) => return Ok(()), Ok(_) => { - buf = self - .tx - .write_all(store, Cursor::new(buf)) - .await - .into_inner(); + buf = tx.write_all(Cursor::new(buf)).await.into_inner(); buf.clear(); } Err(_err) => { @@ -48,7 +46,7 @@ where } struct OutputTask { - rx: StreamReader, + rx: StreamReader, tx: T, } @@ -59,8 +57,9 @@ where { async fn run(mut self, store: &Accessor) -> wasmtime::Result<()> { let mut buf = BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY); - while !self.rx.is_closed() { - buf = self.rx.read(store, buf).await; + let mut rx = GuardedStreamReader::new(store, self.rx); + while !rx.is_closed() { + buf = rx.read(buf).await; match self.tx.write_all(&buf).await { Ok(()) => { buf.clear(); @@ -140,18 +139,18 @@ impl terminal_stderr::Host for WasiCliCtxView<'_> { } impl stdin::HostWithStore for WasiCli { - async fn get_stdin(store: &Accessor) -> wasmtime::Result> { + async fn get_stdin(store: &Accessor) -> wasmtime::Result> { store.with(|mut view| { let instance = view.instance(); let (tx, rx) = instance - .stream::<_, _, BytesMut>(&mut view) + .stream(&mut view) .context("failed to create stream")?; let stdin = view.get().ctx.stdin.reader(); view.spawn(InputTask { rx: Box::into_pin(stdin), tx, }); - Ok(rx.into()) + Ok(rx) }) } } @@ -161,13 +160,12 @@ impl stdin::Host for WasiCliCtxView<'_> {} impl stdout::HostWithStore for WasiCli { async fn set_stdout( store: &Accessor, - data: HostStream, + data: StreamReader, ) -> wasmtime::Result<()> { store.with(|mut view| { - let stdout = data.into_reader(&mut view); let tx = view.get().ctx.stdout.writer(); view.spawn(OutputTask { - rx: stdout, + rx: data, tx: Box::into_pin(tx), }); Ok(()) @@ -180,13 +178,12 @@ impl stdout::Host for WasiCliCtxView<'_> {} impl stderr::HostWithStore for WasiCli { async fn set_stderr( store: &Accessor, - data: HostStream, + data: StreamReader, ) -> wasmtime::Result<()> { store.with(|mut view| { - let stderr = data.into_reader(&mut view); let tx = view.get().ctx.stderr.writer(); view.spawn(OutputTask { - rx: stderr, + rx: data, tx: Box::into_pin(tx), }); Ok(()) diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index 13108033678e..043f44cc70c6 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -69,7 +69,8 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::fmt; use std::future::Future; use std::marker::PhantomData; -use std::mem::{self, MaybeUninit}; +use std::mem::{self, ManuallyDrop, MaybeUninit}; +use std::ops::DerefMut; use std::pin::{Pin, pin}; use std::ptr::{self, NonNull}; use std::slice; @@ -88,8 +89,9 @@ use wasmtime_environ::component::{ pub use abort::AbortHandle; pub use futures_and_streams::{ - ErrorContext, FutureReader, 
FutureWriter, HostFuture, HostStream, ReadBuffer, StreamReader, - StreamWriter, VecBuffer, Watch, WriteBuffer, + ErrorContext, FutureReader, FutureWriter, GuardedFutureReader, GuardedFutureWriter, + GuardedStreamReader, GuardedStreamWriter, ReadBuffer, StreamReader, StreamWriter, VecBuffer, + WriteBuffer, }; pub(crate) use futures_and_streams::{ ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index, @@ -100,7 +102,7 @@ mod error_contexts; mod futures_and_streams; mod states; mod table; -mod tls; +pub(crate) mod tls; /// Constant defined in the Component Model spec to indicate that the async /// intrinsic (e.g. `future.write`) has not yet completed. @@ -228,7 +230,7 @@ where where T: 'static, { - self.accessor.instance.spawn_with_accessor( + self.accessor.instance.unwrap().spawn_with_accessor( self.store.as_context_mut(), self.accessor.clone_for_spawn(), task, @@ -328,7 +330,7 @@ where { token: StoreToken, get_data: fn(&mut T) -> D::Data<'_>, - instance: Instance, + instance: Option, } /// A helper trait to take any type of accessor-with-data in functions. @@ -415,7 +417,7 @@ impl Accessor { /// /// - `instance`: used to access the `Instance` to which this `Accessor` /// (and the future which closes over it) belongs - fn new(token: StoreToken, instance: Instance) -> Self { + pub(crate) fn new(token: StoreToken, instance: Option) -> Self { Self { token, get_data: |x| x, @@ -506,7 +508,7 @@ where where T: 'static, { - let instance = self.instance; + let instance = self.instance.unwrap(); let accessor = self.clone_for_spawn(); self.with(|mut access| { instance.spawn_with_accessor(access.as_context_mut(), accessor, task) @@ -515,7 +517,7 @@ where /// Retrieve the component instance of the caller. pub fn instance(&self) -> Instance { - self.instance + self.instance.unwrap() } fn clone_for_spawn(&self) -> Self { @@ -1291,11 +1293,33 @@ impl Instance { let mut store = store.as_context_mut(); let token = StoreToken::new(store.as_context_mut()); - self.poll_until(store.as_context_mut(), async move { - let accessor = Accessor::new(token, self); - fun(&accessor).await - }) - .await + struct Dropper<'a, T: 'static, V> { + store: StoreContextMut<'a, T>, + value: ManuallyDrop, + } + + impl<'a, T, V> Drop for Dropper<'a, T, V> { + fn drop(&mut self) { + tls::set(self.store.0.traitobj_mut(), || { + // SAFETY: Here we drop the value without moving it for the + // first and only time -- per the contract for `Drop::drop`, + // this code won't run again, and the `value` field will no + // longer be accessible. + unsafe { ManuallyDrop::drop(&mut self.value) } + }); + } + } + + let accessor = &Accessor::new(token, Some(self)); + let dropper = &mut Dropper { + store, + value: ManuallyDrop::new(fun(accessor)), + }; + // SAFETY: We never move `dropper` nor its `value` field. + let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) }; + + self.poll_until(dropper.store.as_context_mut(), future) + .await } /// Spawn a background task to run as part of this instance's event loop. 
@@ -1313,7 +1337,7 @@ impl Instance { task: impl AccessorTask, Result<()>>, ) -> AbortHandle { let mut store = store.as_context_mut(); - let accessor = Accessor::new(StoreToken::new(store.as_context_mut()), self); + let accessor = Accessor::new(StoreToken::new(store.as_context_mut()), Some(self)); self.spawn_with_accessor(store, accessor, task) } @@ -1352,10 +1376,8 @@ impl Instance { async fn poll_until( self, store: StoreContextMut<'_, T>, - future: impl Future, + mut future: Pin<&mut impl Future>, ) -> Result { - let mut future = pin!(future); - loop { // Take `ConcurrentState::futures` out of the instance so we can // poll it while also safely giving any of the futures inside access @@ -1408,34 +1430,49 @@ impl Instance { // outer loop in case there is another one ready to // complete. Poll::Ready(true) => Poll::Ready(Ok(Either::Right(Vec::new()))), - // In this case, there are no more pending futures - // in `ConcurrentState::futures`, there are no - // remaining work items, _and_ the future we were - // passed as an argument still hasn't completed, - // meaning we're stuck, so we return an error. The - // underlying assumption is that `future` depends on - // this component instance making such progress, and - // thus there's no point in continuing to poll it - // given we've run out of work to do. - // - // Note that we'd also reach this point if the host - // embedder passed e.g. a `std::future::Pending` to - // `Instance::run_concurrent`, in which case we'd - // return a "deadlock" error even when any and all - // tasks have completed normally. However, that's - // not how `Instance::run_concurrent` is intended - // (and documented) to be used, so it seems - // reasonable to lump that case in with "real" - // deadlocks. - // - // TODO: Once we've added host APIs for cancelling - // in-progress tasks, we can return some other, - // non-error value here, treating it as "normal" and - // giving the host embedder a chance to intervene by - // cancelling one or more tasks and/or starting new - // tasks capable of waking the existing ones. Poll::Ready(false) => { - Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock))) + // Poll the future we were passed one last time + // in case one of `ConcurrentState::futures` had + // the side effect of unblocking it. + if let Poll::Ready(value) = + self.set_tls(store.0, || future.as_mut().poll(cx)) + { + Poll::Ready(Ok(Either::Left(value))) + } else { + // In this case, there are no more pending + // futures in `ConcurrentState::futures`, + // there are no remaining work items, _and_ + // the future we were passed as an argument + // still hasn't completed, meaning we're + // stuck, so we return an error. The + // underlying assumption is that `future` + // depends on this component instance making + // such progress, and thus there's no point + // in continuing to poll it given we've run + // out of work to do. + // + // Note that we'd also reach this point if + // the host embedder passed e.g. a + // `std::future::Pending` to + // `Instance::run_concurrent`, in which case + // we'd return a "deadlock" error even when + // any and all tasks have completed + // normally. However, that's not how + // `Instance::run_concurrent` is intended + // (and documented) to be used, so it seems + // reasonable to lump that case in with + // "real" deadlocks. 
+ // + // TODO: Once we've added host APIs for + // cancelling in-progress tasks, we can + // return some other, non-error value here, + // treating it as "normal" and giving the + // host embedder a chance to intervene by + // cancelling one or more tasks and/or + // starting new tasks capable of waking the + // existing ones. + Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock))) + } } // There is at least one pending future in // `ConcurrentState::futures` and we have nothing @@ -2445,7 +2482,7 @@ impl Instance { { let token = StoreToken::new(store); async move { - let mut accessor = Accessor::new(token, self); + let mut accessor = Accessor::new(token, Some(self)); closure(&mut accessor).await } } @@ -3224,6 +3261,14 @@ pub trait VMComponentAsyncStore { address: u32, ) -> Result; + /// The `future.drop-writable` intrinsic. + fn future_drop_writable( + &mut self, + instance: Instance, + ty: TypeFutureTableIndex, + writer: u32, + ) -> Result<()>; + /// The `stream.write` intrinsic. fn stream_write( &mut self, @@ -3274,6 +3319,14 @@ pub trait VMComponentAsyncStore { count: u32, ) -> Result; + /// The `stream.drop-writable` intrinsic. + fn stream_drop_writable( + &mut self, + instance: Instance, + ty: TypeStreamTableIndex, + writer: u32, + ) -> Result<()>; + /// The `error-context.debug-message` intrinsic. fn error_context_debug_message( &mut self, @@ -3472,6 +3525,15 @@ impl VMComponentAsyncStore for StoreInner { .map(|result| result.encode()) } + fn future_drop_writable( + &mut self, + instance: Instance, + ty: TypeFutureTableIndex, + writer: u32, + ) -> Result<()> { + instance.guest_drop_writable(StoreContextMut(self), TableIndex::Future(ty), writer) + } + fn flat_stream_write( &mut self, instance: Instance, @@ -3526,6 +3588,15 @@ impl VMComponentAsyncStore for StoreInner { .map(|result| result.encode()) } + fn stream_drop_writable( + &mut self, + instance: Instance, + ty: TypeStreamTableIndex, + writer: u32, + ) -> Result<()> { + instance.guest_drop_writable(StoreContextMut(self), TableIndex::Stream(ty), writer) + } + fn error_context_debug_message( &mut self, instance: Instance, @@ -3545,7 +3616,7 @@ impl VMComponentAsyncStore for StoreInner { } /// Represents the output of a host task or background task. -enum HostTaskOutput { +pub(crate) enum HostTaskOutput { /// A plain result Result(Result<()>), /// A function to be run after the future completes (e.g. post-processing @@ -4185,7 +4256,7 @@ impl ConcurrentState { } } - /// Take ownership of any fibers owned by this object. + /// Take ownership of any fibers and futures owned by this object. /// /// This should be used when disposing of the `Store` containing this object /// in order to gracefully resolve any and all fibers using @@ -4193,33 +4264,58 @@ impl ConcurrentState { /// use-after-free bugs due to fibers which may still have access to the /// `Store`. /// + /// Additionally, the futures collected with this function should be dropped + /// within a `tls::set` call, which will ensure than any futures closing + /// over an `&Accessor` will have access to the store when dropped, allowing + /// e.g. `WithAccessor[AndValue]` instances to be disposed of without + /// panicking. + /// /// Note that this will leave the object in an inconsistent and unusable /// state, so it should only be used just prior to dropping it. 
- pub(crate) fn take_fibers(&mut self, vec: &mut Vec>) { + pub(crate) fn take_fibers_and_futures( + &mut self, + fibers: &mut Vec>, + futures: &mut Vec>, + ) { for entry in mem::take(&mut self.table) { if let Ok(set) = entry.downcast::() { for mode in set.waiting.into_values() { if let WaitMode::Fiber(fiber) = mode { - vec.push(fiber); + fibers.push(fiber); } } } } if let Some(fiber) = self.worker.take() { - vec.push(fiber); + fibers.push(fiber); } let mut take_items = |list| { for item in mem::take(list) { - if let WorkItem::ResumeFiber(fiber) = item { - vec.push(fiber); + match item { + WorkItem::ResumeFiber(fiber) => { + fibers.push(fiber); + } + WorkItem::PushFuture(future) => { + self.futures + .get_mut() + .unwrap() + .as_mut() + .unwrap() + .push(future.into_inner().unwrap()); + } + _ => {} } } }; take_items(&mut self.high_priority); take_items(&mut self.low_priority); + + if let Some(them) = self.futures.get_mut().unwrap().take() { + futures.push(them); + } } } diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index 8f495b99138a..74fb17c5ea9e 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -1,30 +1,30 @@ use super::table::{TableDebug, TableId}; use super::{ - Event, GlobalErrorContextRefCount, HostTaskOutput, LocalErrorContextRefCount, StateTable, - Waitable, WaitableCommon, WaitableState, + Event, GlobalErrorContextRefCount, LocalErrorContextRefCount, StateTable, Waitable, + WaitableCommon, WaitableState, }; -use crate::component::concurrent::{ConcurrentState, tls}; +use crate::component::concurrent::ConcurrentState; use crate::component::func::{self, LiftContext, LowerContext, Options}; use crate::component::matching::InstanceType; use crate::component::values::{ErrorContextAny, FutureAny, StreamAny}; -use crate::component::{AsAccessor, Instance, Lower, Val, WasmList, WasmStr}; +use crate::component::{ + Accessor, AsAccessor, HasData, HasSelf, Instance, Lower, Val, WasmList, WasmStr, +}; use crate::store::{StoreOpaque, StoreToken}; use crate::vm::VMStore; use crate::{AsContextMut, StoreContextMut, ValRaw}; use anyhow::{Context, Result, anyhow, bail}; use buffers::Extender; use buffers::UntypedWriteBuffer; -use futures::channel::{mpsc, oneshot}; -use futures::future::{self, FutureExt}; -use futures::stream::StreamExt; +use futures::channel::oneshot; use std::boxed::Box; use std::fmt; -use std::future::Future; +use std::future; use std::iter; use std::marker::PhantomData; -use std::mem::{self, MaybeUninit}; +use std::mem::{self, ManuallyDrop, MaybeUninit}; use std::string::{String, ToString}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{Poll, Waker}; use std::vec::Vec; use wasmtime_environ::component::{ @@ -177,157 +177,193 @@ fn waitable_state(ty: TableIndex, state: StreamFutureState) -> WaitableState { } } -/// Return a closure which matches a host write operation to a read (or drop) -/// operation. -/// -/// This may be used when the host initiates a write but there is no read -/// pending at the other end, in which case we construct a -/// `WriteState::HostReady` using the closure created here and leave it in -/// `TransmitState::write` for the reader to find and call when it's ready. +/// Complete a write initiated by a host-owned future or stream by matching it +/// with the specified `Reader`. 
fn accept_reader, U: 'static>( - store: StoreContextMut, + mut store: StoreContextMut, + instance: Instance, + reader: Reader, mut buffer: B, - tx: oneshot::Sender>, kind: TransmitKind, -) -> impl FnOnce(&mut dyn VMStore, Instance, Reader) -> Result -+ Send -+ Sync -+ 'static -+ use { - let token = StoreToken::new(store); - move |store, instance, reader| { - let code = match reader { - Reader::Guest { - options, - ty, - address, - count, - } => { - let mut store = token.as_context_mut(store); - let types = instance.id().get(store.0).component().types().clone(); - let count = buffer.remaining().len().min(count); - - let lower = - &mut LowerContext::new(store.as_context_mut(), options, &types, instance); - if address % usize::try_from(T::ALIGN32)? != 0 { - bail!("read pointer not aligned"); - } - lower - .as_slice_mut() - .get_mut(address..) - .and_then(|b| b.get_mut(..T::SIZE32 * count)) - .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?; - - if let Some(ty) = payload(ty, &types) { - T::linear_store_list_to_memory( - lower, - ty, - address, - &buffer.remaining()[..count], - )?; - } - - buffer.skip(count); - _ = tx.send(HostResult { +) -> Result<(HostResult, ReturnCode)> { + Ok(match reader { + Reader::Guest { + options, + ty, + address, + count, + } => { + let types = instance.id().get(store.0).component().types().clone(); + let count = buffer.remaining().len().min(count); + + let lower = &mut LowerContext::new(store.as_context_mut(), options, &types, instance); + if address % usize::try_from(T::ALIGN32)? != 0 { + bail!("read pointer not aligned"); + } + lower + .as_slice_mut() + .get_mut(address..) + .and_then(|b| b.get_mut(..T::SIZE32 * count)) + .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?; + + if let Some(ty) = payload(ty, &types) { + T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?; + } + + buffer.skip(count); + ( + HostResult { buffer, dropped: false, - }); - ReturnCode::completed(kind, count.try_into().unwrap()) - } - Reader::Host { accept } => { - let count = buffer.remaining().len(); - let mut untyped = UntypedWriteBuffer::new(&mut buffer); - let count = accept(&mut untyped, count); - _ = tx.send(HostResult { + }, + ReturnCode::completed(kind, count.try_into().unwrap()), + ) + } + Reader::Host { accept } => { + let count = buffer.remaining().len(); + let mut untyped = UntypedWriteBuffer::new(&mut buffer); + let count = accept(&mut untyped, count); + ( + HostResult { buffer, dropped: false, - }); - ReturnCode::completed(kind, count.try_into().unwrap()) - } - Reader::End => { - _ = tx.send(HostResult { - buffer, - dropped: true, - }); - ReturnCode::Dropped(0) - } - }; - - Ok(code) - } + }, + ReturnCode::completed(kind, count.try_into().unwrap()), + ) + } + Reader::End => ( + HostResult { + buffer, + dropped: true, + }, + ReturnCode::Dropped(0), + ), + }) } -/// Return a closure which matches a host read operation to a write (or drop) -/// operation. -/// -/// This may be used when the host initiates a read but there is no write -/// pending at the other end, in which case we construct a -/// `ReadState::HostReady` using the closure created here and leave it in -/// `TransmitState::read` for the writer to find and call when it's ready. +/// Complete a read initiated by a host-owned future or stream by matching it with the +/// specified `Writer`. 
fn accept_writer, U>( + writer: Writer, mut buffer: B, - tx: oneshot::Sender>, kind: TransmitKind, -) -> impl FnOnce(Writer) -> Result + Send + Sync + 'static { - move |writer| { - let count = match writer { - Writer::Guest { - lift, - ty, - address, - count, - } => { - let count = count.min(buffer.remaining_capacity()); - if T::IS_RUST_UNIT_TYPE { - // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a - // zero-sized type, so `MaybeUninit::uninit().assume_init()` - // is a valid way to populate the zero-sized buffer. - buffer.extend( - iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }) - .take(count), - ) - } else { - let ty = ty.unwrap(); - if address % usize::try_from(T::ALIGN32)? != 0 { - bail!("write pointer not aligned"); - } - lift.memory() - .get(address..) - .and_then(|b| b.get(..T::SIZE32 * count)) - .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?; - - let list = &WasmList::new(address, count, lift, ty)?; - T::linear_lift_into_from_memory(lift, list, &mut Extender(&mut buffer))? +) -> Result<(HostResult, ReturnCode)> { + Ok(match writer { + Writer::Guest { + lift, + ty, + address, + count, + } => { + let count = count.min(buffer.remaining_capacity()); + if T::IS_RUST_UNIT_TYPE { + // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a + // zero-sized type, so `MaybeUninit::uninit().assume_init()` + // is a valid way to populate the zero-sized buffer. + buffer.extend( + iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }) + .take(count), + ) + } else { + let ty = ty.unwrap(); + if address % usize::try_from(T::ALIGN32)? != 0 { + bail!("write pointer not aligned"); } - _ = tx.send(HostResult { - buffer, - dropped: false, - }); - ReturnCode::completed(kind, count.try_into().unwrap()) + lift.memory() + .get(address..) + .and_then(|b| b.get(..T::SIZE32 * count)) + .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?; + + let list = &WasmList::new(address, count, lift, ty)?; + T::linear_lift_into_from_memory(lift, list, &mut Extender(&mut buffer))? } - Writer::Host { - buffer: input, - count, - } => { - let count = count.min(buffer.remaining_capacity()); - buffer.move_from(input.get_mut::(), count); - _ = tx.send(HostResult { + ( + HostResult { buffer, dropped: false, - }); - ReturnCode::completed(kind, count.try_into().unwrap()) - } - Writer::End => { - _ = tx.send(HostResult { + }, + ReturnCode::completed(kind, count.try_into().unwrap()), + ) + } + Writer::Host { + buffer: input, + count, + } => { + let count = count.min(buffer.remaining_capacity()); + buffer.move_from(input.get_mut::(), count); + ( + HostResult { buffer, - dropped: true, - }); - ReturnCode::Dropped(0) - } - }; + dropped: false, + }, + ReturnCode::completed(kind, count.try_into().unwrap()), + ) + } + Writer::End => ( + HostResult { + buffer, + dropped: true, + }, + ReturnCode::Dropped(0), + ), + }) +} - Ok(count) - } +/// Return a `Future` which will resolve once the reader end corresponding to +/// the specified writer end of a future or stream is dropped. 
+async fn watch_reader(accessor: impl AsAccessor, instance: Instance, id: TableId) { + future::poll_fn(|cx| { + accessor + .as_accessor() + .with(|mut access| { + let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0); + let state_id = concurrent_state.get(id)?.state; + let state = concurrent_state.get_mut(state_id)?; + anyhow::Ok(if matches!(&state.read, ReadState::Dropped) { + Poll::Ready(()) + } else { + state.reader_watcher = Some(cx.waker().clone()); + Poll::Pending + }) + }) + .unwrap_or(Poll::Ready(())) + }) + .await +} + +/// Return a `Future` which will resolve once the writer end corresponding to +/// the specified reader end of a future or stream is dropped. +async fn watch_writer(accessor: impl AsAccessor, instance: Instance, id: TableId) { + future::poll_fn(|cx| { + accessor + .as_accessor() + .with(|mut access| { + let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0); + let state_id = concurrent_state.get(id)?.state; + let state = concurrent_state.get_mut(state_id)?; + anyhow::Ok( + if matches!( + &state.write, + WriteState::Dropped + | WriteState::GuestReady { + post_write: PostWrite::Drop, + .. + } + | WriteState::HostReady { + post_write: PostWrite::Drop, + .. + } + ) { + Poll::Ready(()) + } else { + state.writer_watcher = Some(cx.waker().clone()); + Poll::Pending + }, + ) + }) + .unwrap_or(Poll::Ready(())) + }) + .await } /// Represents the state of a stream or future handle from the perspective of a @@ -367,136 +403,78 @@ pub(super) struct FlatAbi { pub(super) align: u32, } -/// Represents a pending event on a host-owned write end of a stream or future. -/// -/// See `ComponentInstance::start_write_event_loop` for details. -enum WriteEvent { - /// Write the items in the specified buffer to the stream or future, and - /// return the result via the specified `Sender`. - Write { - buffer: B, - tx: oneshot::Sender>, - }, - /// Drop the write end of the stream or future. - Drop(Option B + Send + Sync>>), - /// Watch the read (i.e. opposite) end of this stream or future, dropping - /// the specified sender when it is dropped. - Watch { tx: oneshot::Sender<()> }, -} +/// Trait representing objects (such as streams, futures, or structs containing +/// them) which require access to the store in order to be disposed of properly. +trait DropWithStore: Sized { + /// Dispose of `self` using the specified store. + fn drop(&mut self, store: impl AsContextMut) -> Result<()>; -/// Represents a pending event on a host-owned read end of a stream or future. -/// -/// See `ComponentInstance::start_read_event_loop` for details. -enum ReadEvent { - /// Read as many items as the specified buffer will hold from the stream or - /// future, and return the result via the specified `Sender`. - Read { - buffer: B, - tx: oneshot::Sender>, - }, - /// Drop the read end of the stream or future. - Drop, - /// Watch the write (i.e. opposite) end of this stream or future, dropping - /// the specified sender when it is dropped. - Watch { tx: oneshot::Sender<()> }, -} - -/// Send the specified value to the specified `Sender`. -/// -/// This will panic if there is no room in the channel's buffer, so it should -/// only be used in a context where there is at least one empty spot in the -/// buffer. It will silently ignore any other error (e.g. if the `Receiver` has -/// been dropped). -fn send(tx: &mut mpsc::Sender, value: T) { - if let Err(e) = tx.try_send(value) { - if e.is_full() { - unreachable!(); - } + /// Dispose of `self` using the specified accessor. 
+ fn drop_with(&mut self, accessor: impl AsAccessor) -> Result<()> { + accessor.as_accessor().with(|store| self.drop(store)) } } -/// Wrapper struct which may be converted to the inner value as needed. +/// RAII wrapper for `DropWithStore` implementations. /// -/// This object is normally paired with a `Future` which represents a state -/// change on the inner value, resolving when that state change happens _or_ -/// when the `Watch` is converted back into the inner value -- whichever happens -/// first. -pub struct Watch { - inner: T, - waker: Arc>, +/// This may be used to automatically dispose of the wrapped object when it goes +/// out of scope. +struct WithAccessor<'a, T: DropWithStore, U: 'static, D: HasData + ?Sized = HasSelf> { + accessor: &'a Accessor, + inner: ManuallyDrop, } -enum WatchState { - Idle, - Waiting(Waker), - Done, -} - -impl Watch { - /// Convert this object into its inner value. - /// - /// Calling this function will cause the associated `Future` to resolve - /// immediately if it hasn't already. - pub fn into_inner(self) -> T { - let state = mem::replace(&mut *self.waker.lock().unwrap(), WatchState::Done); - if let WatchState::Waiting(waker) = state { - waker.wake(); +impl<'a, T: DropWithStore, U, D: HasData + ?Sized> WithAccessor<'a, T, U, D> { + /// Create a new instance wrapping the specified `inner` object. + fn new(accessor: &'a Accessor, inner: T) -> Self { + Self { + accessor, + inner: ManuallyDrop::new(inner), } - self.inner } -} -/// Wrap the specified `oneshot::Receiver` in a future which resolves when -/// either that `Receiver` resolves or `Watch::into_inner` has been called on -/// the returned `Watch`. -fn watch( - instance: Instance, - mut rx: oneshot::Receiver<()>, - inner: T, -) -> (impl Future + Send + 'static, Watch) { - let waker = Arc::new(Mutex::new(WatchState::Idle)); - ( - super::checked( - instance, - future::poll_fn({ - let waker = waker.clone(); + fn into_parts(self) -> (&'a Accessor, T) { + let accessor = self.accessor; + let mut me = ManuallyDrop::new(self); + // SAFETY: We've wrapped `self` in a `ManuallyDrop` and will not use or + // drop it after we've moved the `inner` field out. + let inner = unsafe { ManuallyDrop::take(&mut me.inner) }; + (accessor, inner) + } +} - move |cx| { - if rx.poll_unpin(cx).is_ready() { - return Poll::Ready(()); - } - let mut state = waker.lock().unwrap(); - match *state { - WatchState::Done => Poll::Ready(()), - _ => { - *state = WatchState::Waiting(cx.waker().clone()); - Poll::Pending - } - } - } - }), - ), - Watch { waker, inner }, - ) +impl<'a, T: DropWithStore, U, D: HasData + ?Sized> Drop for WithAccessor<'a, T, U, D> { + fn drop(&mut self) { + // SAFETY: `Drop::drop` is called at most once and after which `self` + // can no longer be used, thus ensuring `self.inner` will no longer be + // used. + // + // Technically we could avoid `unsafe` here and just call + // `self.inner.drop_with` instead, but then `T` would never by dropped. + // As of this writing, we don't use types for `T` which implement `Drop` + // anyway, but that could change later. + _ = unsafe { ManuallyDrop::take(&mut self.inner) }.drop_with(self.accessor); + } } /// Represents the writable end of a Component Model `future`. -pub struct FutureWriter { - default: Option T>, +/// +/// Note that `FutureWriter` instances must be disposed of using either `write` +/// or `close`; otherwise the in-store representation will leak and the reader +/// end will hang indefinitely. 
Consider using [`GuardedFutureWriter`] to +/// ensure that disposal happens automatically. +pub struct FutureWriter { + default: fn() -> T, + id: TableId, instance: Instance, - tx: Option>>>, } impl FutureWriter { - fn new( - default: fn() -> T, - tx: Option>>>, - instance: Instance, - ) -> Self { + fn new(default: fn() -> T, id: TableId, instance: Instance) -> Self { Self { - default: Some(default), + default, + id, instance, - tx, } } @@ -510,28 +488,22 @@ impl FutureWriter { /// /// Panics if the store that the [`Accessor`] is derived from does not own /// this future. - pub async fn write(mut self, accessor: impl AsAccessor, value: T) -> bool + pub async fn write(self, accessor: impl AsAccessor, value: T) -> bool where - T: Send + 'static, + T: func::Lower + Send + Sync + 'static, { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send( - &mut self.tx.as_mut().unwrap(), - WriteEvent::Write { - buffer: Some(value), - tx, - }, - ); - self.default = None; - let v = rx.await; - drop(self); - match v { + let accessor = accessor.as_accessor(); + + let me = WithAccessor::new(accessor, self); + let result = me + .inner + .instance + .host_write_async(accessor, me.inner.id, Some(value), TransmitKind::Future) + .await; + + match result { Ok(HostResult { dropped, .. }) => !dropped, - Err(_) => todo!("guarantee buffer recovery if event loop errors or panics"), + Err(_) => todo!("guarantee buffer recovery if `host_write` fails"), } } @@ -544,79 +516,153 @@ impl FutureWriter { /// /// Panics if the store that the [`Accessor`] is derived from does not own /// this future. - pub async fn watch_reader(&mut self, accessor: impl AsAccessor) + pub async fn watch_reader(&mut self, accessor: impl AsAccessor) { + watch_reader(accessor, self.instance, self.id).await + } + + /// Close this `FutureWriter`, writing the default value. + pub fn close(mut self, store: impl AsContextMut) -> Result<()> + where + T: func::Lower + Send + Sync + 'static, + { + self.drop(store) + } + + /// Close this `FutureWriter`, writing the default value. + pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> where - T: Send + 'static, + T: func::Lower + Send + Sync + 'static, { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send(&mut self.tx.as_mut().unwrap(), WriteEvent::Watch { tx }); - let (future, _watch) = watch(self.instance, rx, ()); - future.await; + accessor.as_accessor().with(|access| self.drop(access)) } } -impl Drop for FutureWriter { - fn drop(&mut self) { - if let Some(mut tx) = self.tx.take() { - send( - &mut tx, - WriteEvent::Drop(self.default.take().map(|v| { - Box::new(move || Some(v())) - as Box Option + Send + Sync + 'static> - })), - ); - } +impl DropWithStore for FutureWriter { + fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> { + // `self` should never be used again, but leave an invalid handle there just in case. + let id = mem::replace(&mut self.id, TableId::new(0)); + let default = self.default; + self.instance + .host_drop_writer(store.as_context_mut(), id, Some(&move || Ok(default()))) } } -/// Represents the readable end of a Component Model `future`. 
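// A minimal sketch (not part of this patch), assuming a hypothetical store data
// type `Ctx`: using the new `GuardedFutureWriter` RAII wrapper so the write end
// is always disposed of, even if the host future is dropped early.
//
// use wasmtime::component::{Accessor, FutureWriter, GuardedFutureWriter};
//
// struct Ctx;
//
// async fn complete(accessor: &Accessor<Ctx>, writer: FutureWriter<u32>) -> bool {
//     // If this function returned early (or its future were dropped) before the
//     // call to `write`, the guard would close the writer and the `default`
//     // passed to `Instance::future` would be delivered instead.
//     let writer = GuardedFutureWriter::new(accessor, writer);
//     // `write` consumes the guard; `false` means the guest dropped the read end.
//     writer.write(42).await
// }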
+/// A `FutureWriter` paired with an `Accessor`. /// -/// In order to actually read from or drop this `future`, first convert it to a -/// [`FutureReader`] using the `into_reader` method. +/// This is an RAII wrapper around `FutureWriter` that ensures it is closed when +/// dropped. +pub struct GuardedFutureWriter< + 'a, + T: func::Lower + Send + Sync + 'static, + U: 'static, + D: HasData + ?Sized = HasSelf, +>(WithAccessor<'a, FutureWriter, U, D>); + +impl<'a, T: func::Lower + Send + Sync + 'static, U: 'static, D: HasData + ?Sized> + GuardedFutureWriter<'a, T, U, D> +{ + /// Create a new `GuardedFutureWriter` with the specified `accessor` and `writer`. + pub fn new(accessor: &'a Accessor, writer: FutureWriter) -> Self { + Self(WithAccessor::new(accessor, writer)) + } + + /// Wrapper for `FutureWriter::write`. + pub async fn write(self, value: T) -> bool + where + T: func::Lower + Send + Sync + 'static, + { + let (accessor, writer) = self.0.into_parts(); + writer.write(accessor, value).await + } + + /// Wrapper for `FutureWriter::watch_reader`. + pub async fn watch_reader(&mut self) { + self.0.inner.watch_reader(self.0.accessor).await + } +} + +impl<'a, T: func::Lower + Send + Sync + 'static, U: 'static, D: HasData + ?Sized> + From> for FutureWriter +{ + fn from(writer: GuardedFutureWriter<'a, T, U, D>) -> Self { + writer.0.into_parts().1 + } +} + +/// Represents the readable end of a Component Model `future`. /// -/// Note that if a value of this type is dropped without either being converted -/// to a `FutureReader` or passed to the guest, any writes on the write end may -/// block forever. -pub struct HostFuture { +/// Note that `FutureReader` instances must be disposed of using either `read` +/// or `close`; otherwise the in-store representation will leak and the writer +/// end will hang indefinitely. Consider using [`GuardedFutureReader`] to +/// ensure that disposal happens automatically. +pub struct FutureReader { instance: Instance, - rep: u32, + id: TableId, _phantom: PhantomData, } -impl HostFuture { - /// Create a new `HostFuture`. - fn new(rep: u32, instance: Instance) -> Self { +impl FutureReader { + fn new(id: TableId, instance: Instance) -> Self { Self { instance, - rep, + id, _phantom: PhantomData, } } - /// Convert this object into a [`FutureReader`]. - pub fn into_reader(self, mut store: impl AsContextMut) -> FutureReader + /// Read the value from this `future`. + /// + /// The returned `Future` will yield `Err` if the guest has trapped + /// before it could produce a result. + /// + /// The [`Accessor`] provided can be acquired from [`Instance::run_concurrent`] or + /// from within a host function for example. + /// + /// # Panics + /// + /// Panics if the store that the [`Accessor`] is derived from does not own + /// this future. + pub async fn read(self, accessor: impl AsAccessor) -> Option where - T: func::Lower + func::Lift + Send + Sync + 'static, + T: func::Lift + Send + 'static, { - FutureReader { - instance: self.instance, - rep: self.rep, - tx: Some(self.instance.start_read_event_loop( - store.as_context_mut(), - self.rep, - TransmitKind::Future, - )), + let accessor = accessor.as_accessor(); + + let me = WithAccessor::new(accessor, self); + let result = me + .inner + .instance + .host_read_async(accessor, me.inner.id, None, TransmitKind::Future) + .await; + + if let Ok(HostResult { + mut buffer, + dropped: false, + }) = result + { + buffer.take() + } else { + None } } + /// Wait for the write end of this `future` to be dropped. 
+ /// + /// The [`Accessor`] provided can be acquired from + /// [`Instance::run_concurrent`] or from within a host function for example. + /// + /// # Panics + /// + /// Panics if the store that the [`Accessor`] is derived from does not own + /// this future. + pub async fn watch_writer(&mut self, accessor: impl AsAccessor) { + watch_writer(accessor, self.instance, self.id).await; + } + /// Convert this `FutureReader` into a [`Val`]. // See TODO comment for `FutureAny`; this is prone to handle leakage. pub fn into_val(self) -> Val { - Val::Future(FutureAny(self.rep)) + Val::Future(FutureAny(self.id.rep())) } /// Attempt to convert the specified [`Val`] to a `FutureReader`. @@ -629,10 +675,9 @@ impl HostFuture { bail!("expected `future`; got `{}`", value.desc()); }; let store = store.as_context_mut(); - instance - .concurrent_state_mut(store.0) - .get(TableId::::new(*rep))?; // Just make sure it's present - Ok(Self::new(*rep, instance)) + let id = TableId::::new(*rep); + instance.concurrent_state_mut(store.0).get(id)?; // Just make sure it's present + Ok(Self::new(id, instance)) } /// Transfer ownership of the read end of a future from a guest to the host. @@ -654,26 +699,48 @@ impl HostFuture { StreamFutureState::Busy => bail!("cannot transfer busy future"), } + let id = TableId::::new(rep); let concurrent_state = cx.instance_mut().concurrent_state_mut(); - let state = concurrent_state - .get(TableId::::new(rep))? - .state; + let state = concurrent_state.get(id)?.state; if concurrent_state.get(state)?.done { bail!("cannot lift future after previous read succeeded"); } - Ok(Self::new(rep, cx.instance_handle())) + Ok(Self::new(id, cx.instance_handle())) } _ => func::bad_type_info(), } } + + /// Close this `FutureReader`. + pub fn close(mut self, store: impl AsContextMut) -> Result<()> { + self.drop(store) + } + + /// Close this `FutureReader`. + pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> { + accessor.as_accessor().with(|access| self.drop(access)) + } } -impl fmt::Debug for HostFuture { +impl DropWithStore for FutureReader { + fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> { + // `self` should never be used again, but leave an invalid handle there just in case. + let id = mem::replace(&mut self.id, TableId::new(0)); + self.instance.host_drop_reader( + store.as_context_mut().0.traitobj_mut(), + id, + TransmitKind::Future, + ) + } +} + +impl fmt::Debug for FutureReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("HostFuture") - .field("rep", &self.rep) + f.debug_struct("FutureReader") + .field("id", &self.id) + .field("instance", &self.instance) .finish() } } @@ -705,7 +772,7 @@ pub(crate) fn lower_future_to_index( // SAFETY: This relies on the `ComponentType` implementation for `u32` being // safe and correct since we lift and lower future handles as `u32`s. -unsafe impl func::ComponentType for HostFuture { +unsafe impl func::ComponentType for FutureReader { const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4; type Lower = ::Lower; @@ -719,14 +786,18 @@ unsafe impl func::ComponentType for HostFuture { } // SAFETY: See the comment on the `ComponentType` `impl` for this type. 
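// A minimal sketch (not part of this patch), assuming a hypothetical store data
// type `Ctx`: round-tripping a `FutureReader` through the dynamic `Val`
// representation, e.g. for use with untyped function calls.
//
// use anyhow::Result;
// use wasmtime::Store;
// use wasmtime::component::{FutureReader, Instance, Val};
//
// struct Ctx;
//
// fn roundtrip(
//     store: &mut Store<Ctx>,
//     instance: Instance,
//     reader: FutureReader<String>,
// ) -> Result<FutureReader<String>> {
//     // `into_val` wraps the handle index; `from_val` checks that the handle
//     // is still present in the instance's table before handing it back.
//     let val: Val = reader.into_val();
//     FutureReader::from_val(store, instance, &val)
// }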
-unsafe impl func::Lower for HostFuture { +unsafe impl func::Lower for FutureReader { fn linear_lower_to_flat( &self, cx: &mut LowerContext<'_, U>, ty: InterfaceType, dst: &mut MaybeUninit, ) -> Result<()> { - lower_future_to_index(self.rep, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst) + lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat( + cx, + InterfaceType::U32, + dst, + ) } fn linear_lower_to_memory( @@ -735,7 +806,7 @@ unsafe impl func::Lower for HostFuture { ty: InterfaceType, offset: usize, ) -> Result<()> { - lower_future_to_index(self.rep, cx, ty)?.linear_lower_to_memory( + lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory( cx, InterfaceType::U32, offset, @@ -744,7 +815,7 @@ unsafe impl func::Lower for HostFuture { } // SAFETY: See the comment on the `ComponentType` `impl` for this type. -unsafe impl func::Lift for HostFuture { +unsafe impl func::Lift for FutureReader { fn linear_lift_from_flat( cx: &mut LiftContext<'_>, ty: InterfaceType, @@ -764,117 +835,63 @@ unsafe impl func::Lift for HostFuture { } } -impl From> for HostFuture { - fn from(mut value: FutureReader) -> Self { - value.tx.take(); - - Self { - instance: value.instance, - rep: value.rep, - _phantom: PhantomData, - } - } -} - -/// Represents the readable end of a Component Model `future`. +/// A `FutureReader` paired with an `Accessor`. /// -/// In order to pass this end to guest code, first convert it to a -/// [`HostFuture`] using the `into` method. -pub struct FutureReader { - instance: Instance, - rep: u32, - tx: Option>>>, -} +/// This is an RAII wrapper around `FutureReader` that ensures it is closed when +/// dropped. +pub struct GuardedFutureReader<'a, T, U: 'static, D: HasData + ?Sized = HasSelf>( + WithAccessor<'a, FutureReader, U, D>, +); -impl FutureReader { - fn new(rep: u32, tx: Option>>>, instance: Instance) -> Self { - Self { instance, rep, tx } +impl<'a, T, U: 'static, D: HasData + ?Sized> GuardedFutureReader<'a, T, U, D> { + /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`. + pub fn new(accessor: &'a Accessor, reader: FutureReader) -> Self { + Self(WithAccessor::new(accessor.as_accessor(), reader)) } - /// Read the value from this `future`. - /// - /// The returned `Future` will yield `None` if the guest has trapped - /// before it could produce a result. - /// - /// The [`Accessor`] provided can be acquired from [`Instance::run_concurrent`] or - /// from within a host function for example. - /// - /// # Panics - /// - /// Panics if the store that the [`Accessor`] is derived from does not own - /// this future. - pub async fn read(mut self, accessor: impl AsAccessor) -> Option + /// Wrapper for `FutureReader::read`. + pub async fn read(self) -> Option where - T: Send + 'static, + T: func::Lift + Send + Sync + 'static, { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send( - &mut self.tx.as_mut().unwrap(), - ReadEvent::Read { buffer: None, tx }, - ); - let v = rx.await; - drop(self); - - if let Ok(HostResult { - mut buffer, - dropped: false, - }) = v - { - buffer.take() - } else { - None - } + let (accessor, reader) = self.0.into_parts(); + reader.read(accessor).await } - /// Wait for the write end of this `future` to be dropped. 
- /// - /// The [`Accessor`] provided can be acquired from - /// [`Instance::run_concurrent`] or from within a host function for example. - /// - /// # Panics - /// - /// Panics if the store that the [`Accessor`] is derived from does not own - /// this future. - pub async fn watch_writer(&mut self, accessor: impl AsAccessor) - where - T: Send + 'static, - { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send(&mut self.tx.as_mut().unwrap(), ReadEvent::Watch { tx }); - let (future, _watch) = watch(self.instance, rx, ()); - future.await + /// Wrapper for `FutureReader::watch_writer`. + pub async fn watch_writer(&mut self) { + self.0.inner.watch_writer(self.0.accessor).await } } -impl Drop for FutureReader { - fn drop(&mut self) { - if let Some(mut tx) = self.tx.take() { - send(&mut tx, ReadEvent::Drop); - } +impl<'a, T, U: 'static, D: HasData + ?Sized> From> + for FutureReader +{ + fn from(reader: GuardedFutureReader<'a, T, U, D>) -> Self { + reader.0.into_parts().1 } } /// Represents the writable end of a Component Model `stream`. -pub struct StreamWriter { +/// +/// Note that `StreamWriter` instances must be disposed of using `close`; +/// otherwise the in-store representation will leak and the reader end will hang +/// indefinitely. Consider using [`GuardedStreamWriter`] to ensure that +/// disposal happens automatically. +pub struct StreamWriter { instance: Instance, + id: TableId, closed: bool, - tx: Option>>, + _phantom: PhantomData, } -impl StreamWriter { - fn new(tx: Option>>, instance: Instance) -> Self { +impl StreamWriter { + fn new(id: TableId, instance: Instance) -> Self { Self { instance, - tx, + id, closed: false, + _phantom: PhantomData, } } @@ -901,18 +918,22 @@ impl StreamWriter { /// /// Panics if the store that the [`Accessor`] is derived from does not own /// this future. - pub async fn write(&mut self, accessor: impl AsAccessor, buffer: B) -> B + pub async fn write(&mut self, accessor: impl AsAccessor, buffer: B) -> B where - B: Send + 'static, + T: func::Lower + 'static, + B: WriteBuffer, { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send(self.tx.as_mut().unwrap(), WriteEvent::Write { buffer, tx }); - let v = rx.await; - match v { + let result = self + .instance + .host_write_async( + accessor.as_accessor(), + self.id, + buffer, + TransmitKind::Stream, + ) + .await; + + match result { Ok(HostResult { buffer, dropped }) => { if self.closed { debug_assert!(dropped); @@ -920,7 +941,7 @@ impl StreamWriter { self.closed = dropped; buffer } - Err(_) => todo!("guarantee buffer recovery if event loop errors or panics"), + Err(_) => todo!("guarantee buffer recovery if `host_write` fails"), } } @@ -936,8 +957,9 @@ impl StreamWriter { /// /// Panics if the store that the [`Accessor`] is derived from does not own /// this future. 
- pub async fn write_all(&mut self, accessor: impl AsAccessor, mut buffer: B) -> B + pub async fn write_all(&mut self, accessor: impl AsAccessor, mut buffer: B) -> B where + T: func::Lower + 'static, B: WriteBuffer, { let accessor = accessor.as_accessor(); @@ -953,78 +975,168 @@ impl StreamWriter { /// /// Panics if the store that the [`Accessor`] is derived from does not own /// this future. - pub async fn watch_reader(&mut self, accessor: impl AsAccessor) + pub async fn watch_reader(&mut self, accessor: impl AsAccessor) { + watch_reader(accessor, self.instance, self.id).await + } + + /// Close this `StreamWriter`. + pub fn close(mut self, store: impl AsContextMut) -> Result<()> { + self.drop(store) + } + + /// Close this `StreamWriter`. + pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> { + accessor.as_accessor().with(|access| self.drop(access)) + } +} + +impl DropWithStore for StreamWriter { + fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> { + // `self` should never be used again, but leave an invalid handle there just in case. + let id = mem::replace(&mut self.id, TableId::new(0)); + self.instance + .host_drop_writer(store.as_context_mut(), id, None::<&dyn Fn() -> Result<()>>) + } +} + +/// A `StreamWriter` paired with an `Accessor`. +/// +/// This is an RAII wrapper around `StreamWriter` that ensures it is closed when +/// dropped. +pub struct GuardedStreamWriter<'a, T, U: 'static, D: HasData + ?Sized = HasSelf>( + WithAccessor<'a, StreamWriter, U, D>, +); + +impl<'a, T, U: 'static, D: HasData + ?Sized> GuardedStreamWriter<'a, T, U, D> { + /// Create a new `GuardedStreamWriter` with the specified `accessor` and `writer`. + pub fn new(accessor: &'a Accessor, writer: StreamWriter) -> Self { + Self(WithAccessor::new(accessor.as_accessor(), writer)) + } + + /// Wrapper for `StreamWriter::is_closed` + pub fn is_closed(&self) -> bool { + self.0.inner.is_closed() + } + + /// Wrapper for `StreamWriter::write`. + pub async fn write(&mut self, buffer: B) -> B where - B: Send + 'static, + T: func::Lower + 'static, + B: WriteBuffer, { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send(&mut self.tx.as_mut().unwrap(), WriteEvent::Watch { tx }); - let (future, _watch) = watch(self.instance, rx, ()); - future.await; + self.0.inner.write(self.0.accessor, buffer).await + } + + /// Wrapper for `StreamWriter::write_all`. + pub async fn write_all(&mut self, buffer: B) -> B + where + T: func::Lower + 'static, + B: WriteBuffer, + { + self.0.inner.write_all(self.0.accessor, buffer).await + } + + /// Wrapper for `StreamWriter::watch_reader`. + pub async fn watch_reader(&mut self) { + self.0.inner.watch_reader(self.0.accessor).await } } -impl Drop for StreamWriter { - fn drop(&mut self) { - if let Some(mut tx) = self.tx.take() { - send(&mut tx, WriteEvent::Drop(None)); - } +impl<'a, T, U: 'static, D: HasData + ?Sized> From> + for StreamWriter +{ + fn from(writer: GuardedStreamWriter<'a, T, U, D>) -> Self { + writer.0.into_parts().1 } } /// Represents the readable end of a Component Model `stream`. /// -/// In order to actually read from or drop this `stream`, first convert it to a -/// [`FutureReader`] using the `into_reader` method. 
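// A minimal sketch (not part of this patch), assuming a hypothetical store data
// type `Ctx`: sending items from the host through the `GuardedStreamWriter`
// RAII wrapper defined above, so the write end is closed (signalling
// end-of-stream) when it goes out of scope.
//
// use wasmtime::component::{Accessor, GuardedStreamWriter, StreamWriter};
//
// struct Ctx;
//
// async fn send_numbers(accessor: &Accessor<Ctx>, writer: StreamWriter<u32>) {
//     let mut writer = GuardedStreamWriter::new(accessor, writer);
//     for item in 0..10u32 {
//         // `Option<T>` works as a `WriteBuffer` here; `write_all` resolves
//         // once the buffer is drained or the reader is gone.
//         writer.write_all(Some(item)).await;
//         if writer.is_closed() {
//             break; // the guest dropped its read end
//         }
//     }
//     // The guard closes the stream here.
// }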
-/// -/// Note that if a value of this type is dropped without either being converted -/// to a `StreamReader` or passed to the guest, any writes on the write end may -/// block forever. -pub struct HostStream { +/// Note that `StreamReader` instances must be disposed of using `close`; +/// otherwise the in-store representation will leak and the writer end will hang +/// indefinitely. Consider using [`GuardedStreamReader`] to ensure that +/// disposal happens automatically. +pub struct StreamReader { instance: Instance, - rep: u32, + id: TableId, + closed: bool, _phantom: PhantomData, } -impl HostStream { - /// Create a new `HostStream`. - fn new(rep: u32, instance: Instance) -> Self { +impl StreamReader { + fn new(id: TableId, instance: Instance) -> Self { Self { instance, - rep, + id, + closed: false, _phantom: PhantomData, } } - /// Convert this object into a [`StreamReader`]. - pub fn into_reader(self, mut store: impl AsContextMut) -> StreamReader + /// Returns whether this stream is "closed" meaning that the other end of + /// the stream has been dropped. + pub fn is_closed(&self) -> bool { + self.closed + } + + /// Read values from this `stream`. + /// + /// The returned `Future` will yield a `(Some(_), _)` if the read completed + /// (possibly with zero items if the write was empty). It will return + /// `(None, _)` if the read failed due to the closure of the write end. In + /// either case, the returned buffer will be the same one passed as a + /// parameter, with zero or more items added. + /// + /// # Panics + /// + /// Panics if the store that the [`Accessor`] is derived from does not own + /// this future. + pub async fn read(&mut self, accessor: impl AsAccessor, buffer: B) -> B where - T: func::Lower + func::Lift + Send + 'static, - B: ReadBuffer, + T: func::Lift + 'static, + B: ReadBuffer + Send + 'static, { - StreamReader { - instance: self.instance, - rep: self.rep, - tx: Some(self.instance.start_read_event_loop( - store.as_context_mut(), - self.rep, + let result = self + .instance + .host_read_async( + accessor.as_accessor(), + self.id, + buffer, TransmitKind::Stream, - )), - closed: false, + ) + .await; + + match result { + Ok(HostResult { buffer, dropped }) => { + if self.closed { + debug_assert!(dropped); + } + self.closed = dropped; + buffer + } + Err(_) => { + todo!("guarantee buffer recovery if `host_read` fails") + } } } - /// Convert this `HostStream` into a [`Val`]. + /// Wait until the write end of this `stream` is dropped. + /// + /// # Panics + /// + /// Panics if the store that the [`Accessor`] is derived from does not own + /// this future. + pub async fn watch_writer(&mut self, accessor: impl AsAccessor) { + watch_writer(accessor, self.instance, self.id).await + } + + /// Convert this `StreamReader` into a [`Val`]. // See TODO comment for `StreamAny`; this is prone to handle leakage. pub fn into_val(self) -> Val { - Val::Stream(StreamAny(self.rep)) + Val::Stream(StreamAny(self.id.rep())) } - /// Attempt to convert the specified [`Val`] to a `HostStream`. + /// Attempt to convert the specified [`Val`] to a `StreamReader`. 
pub fn from_val( mut store: impl AsContextMut, instance: Instance, @@ -1034,10 +1146,9 @@ impl HostStream { bail!("expected `stream`; got `{}`", value.desc()); }; let store = store.as_context_mut(); - instance - .concurrent_state_mut(store.0) - .get(TableId::::new(*rep))?; // Just make sure it's present - Ok(Self::new(*rep, instance)) + let id = TableId::::new(*rep); + instance.concurrent_state_mut(store.0).get(id)?; // Just make sure it's present + Ok(Self::new(id, instance)) } /// Transfer ownership of the read end of a stream from a guest to the host. @@ -1062,17 +1173,42 @@ impl HostStream { StreamFutureState::Busy => bail!("cannot transfer busy stream"), } - Ok(Self::new(rep, cx.instance_handle())) + let id = TableId::::new(rep); + + Ok(Self::new(id, cx.instance_handle())) } _ => func::bad_type_info(), } } + + /// Close this `StreamReader`. + pub fn close(mut self, store: impl AsContextMut) -> Result<()> { + self.drop(store) + } + + /// Close this `StreamReader`. + pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> { + accessor.as_accessor().with(|access| self.drop(access)) + } +} + +impl DropWithStore for StreamReader { + fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> { + // `self` should never be used again, but leave an invalid handle there just in case. + let id = mem::replace(&mut self.id, TableId::new(0)); + self.instance.host_drop_reader( + store.as_context_mut().0.traitobj_mut(), + id, + TransmitKind::Stream, + ) + } } -impl fmt::Debug for HostStream { +impl fmt::Debug for StreamReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("HostStream") - .field("rep", &self.rep) + f.debug_struct("StreamReader") + .field("id", &self.id) + .field("instance", &self.instance) .finish() } } @@ -1104,7 +1240,7 @@ pub(crate) fn lower_stream_to_index( // SAFETY: This relies on the `ComponentType` implementation for `u32` being // safe and correct since we lift and lower stream handles as `u32`s. -unsafe impl func::ComponentType for HostStream { +unsafe impl func::ComponentType for StreamReader { const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4; type Lower = ::Lower; @@ -1118,14 +1254,18 @@ unsafe impl func::ComponentType for HostStream { } // SAFETY: See the comment on the `ComponentType` `impl` for this type. -unsafe impl func::Lower for HostStream { +unsafe impl func::Lower for StreamReader { fn linear_lower_to_flat( &self, cx: &mut LowerContext<'_, U>, ty: InterfaceType, dst: &mut MaybeUninit, ) -> Result<()> { - lower_stream_to_index(self.rep, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst) + lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat( + cx, + InterfaceType::U32, + dst, + ) } fn linear_lower_to_memory( @@ -1134,7 +1274,7 @@ unsafe impl func::Lower for HostStream { ty: InterfaceType, offset: usize, ) -> Result<()> { - lower_stream_to_index(self.rep, cx, ty)?.linear_lower_to_memory( + lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory( cx, InterfaceType::U32, offset, @@ -1143,7 +1283,7 @@ unsafe impl func::Lower for HostStream { } // SAFETY: See the comment on the `ComponentType` `impl` for this type. 
-unsafe impl func::Lift for HostStream { +unsafe impl func::Lift for StreamReader { fn linear_lift_from_flat( cx: &mut LiftContext<'_>, ty: InterfaceType, @@ -1157,114 +1297,51 @@ unsafe impl func::Lift for HostStream { cx: &mut LiftContext<'_>, ty: InterfaceType, bytes: &[u8], - ) -> Result { - let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?; - Self::lift_from_index(cx, ty, index) - } -} - -impl From> for HostStream { - fn from(mut value: StreamReader) -> Self { - value.tx.take(); - - Self { - instance: value.instance, - rep: value.rep, - _phantom: PhantomData, - } + ) -> Result { + let index = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?; + Self::lift_from_index(cx, ty, index) } } -/// Represents the readable end of a Component Model `stream`. +/// A `StreamReader` paired with an `Accessor`. /// -/// In order to pass this end to guest code, first convert it to a -/// [`HostStream`] using the `into` method. -pub struct StreamReader { - instance: Instance, - rep: u32, - tx: Option>>, - closed: bool, -} +/// This is an RAII wrapper around `StreamReader` that ensures it is closed when +/// dropped. +pub struct GuardedStreamReader<'a, T, U: 'static, D: HasData + ?Sized = HasSelf>( + WithAccessor<'a, StreamReader, U, D>, +); -impl StreamReader { - fn new(rep: u32, tx: Option>>, instance: Instance) -> Self { - Self { - instance, - rep, - tx, - closed: false, - } +impl<'a, T, U: 'static, D: HasData + ?Sized> GuardedStreamReader<'a, T, U, D> { + /// Create a new `GuardedStreamReader` with the specified `accessor` and `reader`. + pub fn new(accessor: &'a Accessor, reader: StreamReader) -> Self { + Self(WithAccessor::new(accessor.as_accessor(), reader)) } - /// Returns whether this stream is "closed" meaning that the other end of - /// the stream has been dropped. + /// Wrapper for `StreamReader::is_closed` pub fn is_closed(&self) -> bool { - self.closed + self.0.inner.is_closed() } - /// Read values from this `stream`. - /// - /// The returned `Future` will yield a `(Some(_), _)` if the read completed - /// (possibly with zero items if the write was empty). It will return - /// `(None, _)` if the read failed due to the closure of the write end. In - /// either case, the returned buffer will be the same one passed as a - /// parameter, with zero or more items added. - /// - /// # Panics - /// - /// Panics if the store that the [`Accessor`] is derived from does not own - /// this future. - pub async fn read(&mut self, accessor: impl AsAccessor, buffer: B) -> B + /// Wrapper for `StreamReader::read`. + pub async fn read(&mut self, buffer: B) -> B where - B: Send + 'static, + T: func::Lift + 'static, + B: ReadBuffer + Send + 'static, { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send(self.tx.as_mut().unwrap(), ReadEvent::Read { buffer, tx }); - let v = rx.await; - match v { - Ok(HostResult { buffer, dropped }) => { - if self.closed { - debug_assert!(dropped); - } - self.closed = dropped; - buffer - } - Err(_) => { - todo!("guarantee buffer recovery if event loop errors or panics") - } - } + self.0.inner.read(self.0.accessor, buffer).await } - /// Wait until the write end of this `stream` is dropped. - /// - /// # Panics - /// - /// Panics if the store that the [`Accessor`] is derived from does not own - /// this future. 
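// A minimal sketch (not part of this patch), assuming a hypothetical store data
// type `Ctx`: draining a stream on the host side with the `GuardedStreamReader`
// RAII wrapper defined above, reading one item at a time into an `Option` buffer.
//
// use wasmtime::component::{Accessor, GuardedStreamReader, StreamReader};
//
// struct Ctx;
//
// async fn collect(accessor: &Accessor<Ctx>, reader: StreamReader<u32>) -> Vec<u32> {
//     let mut reader = GuardedStreamReader::new(accessor, reader);
//     let mut items = Vec::new();
//     while !reader.is_closed() {
//         // The buffer passed in comes back with zero or more items added; a
//         // `None` result here is either an empty write or end-of-stream.
//         if let Some(item) = reader.read(None::<u32>).await {
//             items.push(item);
//         }
//     }
//     items // the guard closes the read end when dropped
// }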
- pub async fn watch_writer(&mut self, accessor: impl AsAccessor) - where - B: Send + 'static, - { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send(&mut self.tx.as_mut().unwrap(), ReadEvent::Watch { tx }); - let (future, _) = watch(self.instance, rx, ()); - future.await + /// Wrapper for `StreamReader::watch_writer`. + pub async fn watch_writer(&mut self) { + self.0.inner.watch_writer(self.0.accessor).await } } -impl Drop for StreamReader { - fn drop(&mut self) { - if let Some(mut tx) = self.tx.take() { - send(&mut tx, ReadEvent::Drop); - } +impl<'a, T, U: 'static, D: HasData + ?Sized> From> + for StreamReader +{ + fn from(reader: GuardedStreamReader<'a, T, U, D>) -> Self { + reader.0.into_parts().1 } } @@ -1431,14 +1508,14 @@ struct TransmitState { write: WriteState, /// See `ReadState` read: ReadState, - /// The `Sender`, if any, to be dropped when the write end of the stream or + /// The `Waker`, if any, to be woken when the write end of the stream or /// future is dropped. /// /// This will signal to the host-owned read end that the write end has been /// dropped. - writer_watcher: Option>, + writer_watcher: Option, /// Like `writer_watcher`, but for the reverse direction. - reader_watcher: Option>, + reader_watcher: Option, /// Whether futher values may be transmitted via this stream or future. done: bool, } @@ -1563,7 +1640,7 @@ enum Reader<'a> { }, /// The read end is owned by the host. Host { - accept: Box usize>, + accept: Box usize + 'a>, }, /// The read end has been dropped. End, @@ -1573,253 +1650,53 @@ impl Instance { /// Create a new Component Model `future` as pair of writable and readable ends, /// the latter of which may be passed to guest code. /// - /// The `default` parameter will be used if the returned `FutureWriter` is - /// dropped before `FutureWriter::write` is called. Since the write end of - /// a Component Model `future` must be written to before it is dropped, and - /// since Rust does not currently provide a way to statically enforce that - /// (e.g. linear typing), we use this mechanism to ensure a value is always - /// written prior to closing. - /// - /// If there's no plausible default value, and you're sure - /// `FutureWriter::write` will be called, you can consider passing `|| - /// unreachable!()` as the `default` parameter. + /// `default` is a callback to be used if the writable end of the future is + /// closed without having written a value. You may supply e.g. `|| + /// unreachable!()` if you're sure that won't happen. 
pub fn future( self, - default: fn() -> T, mut store: impl AsContextMut, + default: fn() -> T, ) -> Result<(FutureWriter, FutureReader)> { - let mut store = store.as_context_mut(); - let (write, read) = self.concurrent_state_mut(store.0).new_transmit()?; + let (write, read) = self + .concurrent_state_mut(store.as_context_mut().0) + .new_transmit()?; Ok(( - FutureWriter::new( - default, - Some(self.start_write_event_loop( - store.as_context_mut(), - write.rep(), - TransmitKind::Future, - )), - self, - ), - FutureReader::new( - read.rep(), - Some(self.start_read_event_loop( - store.as_context_mut(), - read.rep(), - TransmitKind::Future, - )), - self, - ), + FutureWriter::new(default, write, self), + FutureReader::new(read, self), )) } /// Create a new Component Model `stream` as pair of writable and readable ends, /// the latter of which may be passed to guest code. - pub fn stream< - T: func::Lower + func::Lift + Send + 'static, - W: WriteBuffer, - R: ReadBuffer, - >( + pub fn stream( self, mut store: impl AsContextMut, - ) -> Result<(StreamWriter, StreamReader)> { - let mut store = store.as_context_mut(); - let (write, read) = self.concurrent_state_mut(store.0).new_transmit()?; + ) -> Result<(StreamWriter, StreamReader)> { + let (write, read) = self + .concurrent_state_mut(store.as_context_mut().0) + .new_transmit()?; Ok(( - StreamWriter::new( - Some(self.start_write_event_loop( - store.as_context_mut(), - write.rep(), - TransmitKind::Stream, - )), - self, - ), - StreamReader::new( - read.rep(), - Some(self.start_read_event_loop( - store.as_context_mut(), - read.rep(), - TransmitKind::Stream, - )), - self, - ), + StreamWriter::new(write, self), + StreamReader::new(read, self), )) } - /// Spawn a background task to be polled in this instance's event loop. - /// - /// The spawned task will accept host events from the `Receiver` corresponding to - /// the returned `Sender`, handling each event it receives and then exiting - /// when the channel is dropped. - /// - /// We handle `StreamWriter` and `FutureWriter` operations this way so that - /// they can be initiated without access to the store and possibly outside - /// the instance's event loop, improving the ergonmics for host embedders. 
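// A minimal sketch (not part of this patch), assuming a hypothetical store data
// type `Ctx` and an `Accessor` obtained from `Instance::run_concurrent` or a
// host function: creating a future with the reworked constructor shown above,
// whose arguments are now the store handle followed by the `default` callback.
//
// use anyhow::Result;
// use wasmtime::component::{Accessor, FutureReader, FutureWriter};
//
// struct Ctx;
//
// fn new_result_future(
//     accessor: &Accessor<Ctx>,
// ) -> Result<(FutureWriter<u32>, FutureReader<u32>)> {
//     accessor.with(|mut access| {
//         let instance = access.instance();
//         // `|| 0` is written automatically if the writer is closed before a
//         // value has been written.
//         instance.future(&mut access, || 0)
//     })
// }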
- fn start_write_event_loop< - T: func::Lower + func::Lift + Send + 'static, - B: WriteBuffer, - U, - >( - self, - mut store: StoreContextMut, - rep: u32, - kind: TransmitKind, - ) -> mpsc::Sender> { - let (tx, mut rx) = mpsc::channel(1); - let id = TableId::::new(rep); - let run_on_drop = - RunOnDrop::new(move || log::trace!("write event loop for {id:?} dropped")); - let token = StoreToken::new(store.as_context_mut()); - let task = Box::pin( - async move { - log::trace!("write event loop for {id:?} started"); - let mut my_rep = None; - while let Some(event) = rx.next().await { - if my_rep.is_none() { - my_rep = Some(self.get_state_rep(rep)?); - } - let rep = my_rep.unwrap(); - match event { - WriteEvent::Write { buffer, tx } => tls::get(|store| { - self.host_write::<_, _, U>( - token.as_context_mut(store), - rep, - buffer, - PostWrite::Continue, - tx, - kind, - ) - })?, - WriteEvent::Drop(default) => tls::get(|store| { - if let Some(default) = default { - self.host_write::<_, _, U>( - token.as_context_mut(store), - rep, - default(), - PostWrite::Continue, - oneshot::channel().0, - kind, - )?; - } - self.concurrent_state_mut(store).host_drop_writer(rep, kind) - })?, - WriteEvent::Watch { tx } => tls::get(|store| { - let state = - self.concurrent_state_mut(store) - .get_mut(TableId::::new(rep))?; - if !matches!(&state.read, ReadState::Dropped) { - state.reader_watcher = Some(tx); - } - Ok::<_, anyhow::Error>(()) - })?, - } - } - Ok(()) - } - .map(move |v| { - run_on_drop.cancel(); - log::trace!("write event loop for {id:?} finished: {v:?}"); - HostTaskOutput::Result(v) - }), - ); - self.concurrent_state_mut(store.0).push_future(task); - tx - } - - /// Same as `Self::start_write_event_loop`, but for the read end of a stream - /// or future. - fn start_read_event_loop, U>( - self, - mut store: StoreContextMut, - rep: u32, - kind: TransmitKind, - ) -> mpsc::Sender> { - let (tx, mut rx) = mpsc::channel(1); - let id = TableId::::new(rep); - let run_on_drop = RunOnDrop::new(move || log::trace!("read event loop for {id:?} dropped")); - let token = StoreToken::new(store.as_context_mut()); - let task = Box::pin( - async move { - log::trace!("read event loop for {id:?} started"); - let mut my_rep = None; - while let Some(event) = rx.next().await { - if my_rep.is_none() { - my_rep = Some(self.get_state_rep(rep)?); - } - let rep = my_rep.unwrap(); - match event { - ReadEvent::Read { buffer, tx } => tls::get(|store| { - self.host_read::<_, _, U>( - token.as_context_mut(store), - rep, - buffer, - tx, - kind, - ) - })?, - ReadEvent::Drop => { - tls::get(|store| self.host_drop_reader(store, rep, kind))? - } - ReadEvent::Watch { tx } => tls::get(|store| { - let state = - self.concurrent_state_mut(store) - .get_mut(TableId::::new(rep))?; - if !matches!( - &state.write, - WriteState::Dropped - | WriteState::GuestReady { - post_write: PostWrite::Drop, - .. - } - | WriteState::HostReady { - post_write: PostWrite::Drop, - .. - } - ) { - state.writer_watcher = Some(tx); - } - Ok::<_, anyhow::Error>(()) - })?, - } - } - Ok(()) - } - .map(move |v| { - run_on_drop.cancel(); - log::trace!("read event loop for {id:?} finished: {v:?}"); - HostTaskOutput::Result(v) - }), - ); - self.concurrent_state_mut(store.0).push_future(task); - tx - } - /// Write to the specified stream or future from the host. 
- /// - /// # Arguments - /// - /// * `store` - The store to which this instance belongs - /// * `transmit_rep` - The `TransmitState` rep for the stream or future - /// * `buffer` - Buffer of values that should be written - /// * `post_write` - Whether the transmit should be dropped after write, possibly with an error context - /// * `tx` - Oneshot channel to notify when operation completes (or drop on error) - /// * `kind` - whether this is a stream or a future fn host_write, U>( self, mut store: StoreContextMut, - transmit_rep: u32, + id: TableId, mut buffer: B, - mut post_write: PostWrite, - tx: oneshot::Sender>, kind: TransmitKind, - ) -> Result<()> { - let mut store = store.as_context_mut(); - let transmit_id = TableId::::new(transmit_rep); + ) -> Result, oneshot::Receiver>>> { + let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state; let transmit = self .concurrent_state_mut(store.0) .get_mut(transmit_id) - .with_context(|| format!("retrieving state for transmit [{transmit_rep}]"))?; + .with_context(|| format!("retrieving state for transmit [{transmit_id:?}]"))?; log::trace!("host_write state {transmit_id:?}; {:?}", transmit.read); let new_state = if let ReadState::Dropped = &transmit.read { @@ -1828,23 +1705,31 @@ impl Instance { ReadState::Open }; - match mem::replace(&mut transmit.read, new_state) { + Ok(match mem::replace(&mut transmit.read, new_state) { ReadState::Open => { assert!(matches!(&transmit.write, WriteState::Open)); + let token = StoreToken::new(store.as_context_mut()); + let (tx, rx) = oneshot::channel(); let state = WriteState::HostReady { - accept: Box::new(accept_reader::( - store.as_context_mut(), - buffer, - tx, - kind, - )), - post_write, + accept: Box::new(move |store, instance, reader| { + let (result, code) = accept_reader::( + token.as_context_mut(store), + instance, + reader, + buffer, + kind, + )?; + _ = tx.send(result); + Ok(code) + }), + post_write: PostWrite::Continue, }; self.concurrent_state_mut(store.0) .get_mut(transmit_id)? .write = state; - post_write = PostWrite::Continue; + + Err(rx) } ReadState::GuestReady { @@ -1861,8 +1746,8 @@ impl Instance { } let read_handle = transmit.read_handle; - let code = accept_reader::(store.as_context_mut(), buffer, tx, kind)( - store.0.traitobj_mut(), + let (result, code) = accept_reader::( + store.as_context_mut(), self, Reader::Guest { options: &options, @@ -1870,6 +1755,8 @@ impl Instance { address, count, }, + buffer, + kind, )?; self.concurrent_state_mut(store.0).set_event( @@ -1885,6 +1772,8 @@ impl Instance { }, }, )?; + + Ok(result) } ReadState::HostReady { accept } => { @@ -1898,51 +1787,49 @@ impl Instance { unreachable!() }; - _ = tx.send(HostResult { + Ok(HostResult { buffer, dropped: false, - }); + }) } - ReadState::Dropped => { - _ = tx.send(HostResult { - buffer, - dropped: true, - }); - } - } + ReadState::Dropped => Ok(HostResult { + buffer, + dropped: true, + }), + }) + } - if let PostWrite::Drop = post_write { - self.concurrent_state_mut(store.0) - .host_drop_writer(transmit_rep, kind)?; + /// Async wrapper around `Self::host_write`. + async fn host_write_async>( + self, + accessor: impl AsAccessor, + id: TableId, + buffer: B, + kind: TransmitKind, + ) -> Result> { + match accessor + .as_accessor() + .with(move |mut access| self.host_write(access.as_context_mut(), id, buffer, kind))? + { + Ok(result) => Ok(result), + Err(rx) => Ok(rx.await?), } - - Ok(()) } /// Read from the specified stream or future from the host. 
- /// - /// # Arguments - /// - /// * `store` - The store to which this instance belongs - /// * `rep` - The `TransmitState` rep for the stream or future - /// * `buffer` - Buffer to receive values - /// * `tx` - Oneshot channel to notify when operation completes (or drop on error) - /// * `kind` - whether this is a stream or a future fn host_read, U>( self, - mut store: StoreContextMut, - rep: u32, + store: StoreContextMut, + id: TableId, mut buffer: B, - tx: oneshot::Sender>, kind: TransmitKind, - ) -> Result<()> { - let store = store.as_context_mut(); - let transmit_id = TableId::::new(rep); + ) -> Result, oneshot::Receiver>>> { + let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state; let transmit = self .concurrent_state_mut(store.0) .get_mut(transmit_id) - .with_context(|| rep.to_string())?; + .with_context(|| format!("retrieving state for transmit [{transmit_id:?}]"))?; log::trace!("host_read state {transmit_id:?}; {:?}", transmit.write); let new_state = if let WriteState::Dropped = &transmit.write { @@ -1951,13 +1838,20 @@ impl Instance { WriteState::Open }; - match mem::replace(&mut transmit.write, new_state) { + Ok(match mem::replace(&mut transmit.write, new_state) { WriteState::Open => { assert!(matches!(&transmit.read, ReadState::Open)); + let (tx, rx) = oneshot::channel(); transmit.read = ReadState::HostReady { - accept: Box::new(accept_writer::(buffer, tx, kind)), + accept: Box::new(move |writer| { + let (result, code) = accept_writer::(writer, buffer, kind)?; + _ = tx.send(result); + Ok(code) + }), }; + + Err(rx) } WriteState::GuestReady { @@ -1976,12 +1870,16 @@ impl Instance { let write_handle = transmit.write_handle; let lift = &mut LiftContext::new(store.0.store_opaque_mut(), &options, self); - let code = accept_writer::(buffer, tx, kind)(Writer::Guest { - ty: payload(ty, lift.types), - lift, - address, - count, - })?; + let (result, code) = accept_writer::( + Writer::Guest { + ty: payload(ty, lift.types), + lift, + address, + count, + }, + buffer, + kind, + )?; let state = self.concurrent_state_mut(store.0); let pending = if let PostWrite::Drop = post_write { @@ -2004,6 +1902,8 @@ impl Instance { }, }, )?; + + Ok(result) } WriteState::HostReady { accept, post_write } => { @@ -2011,13 +1911,9 @@ impl Instance { store.0.traitobj_mut(), self, Reader::Host { - accept: Box::new(move |input, count| { + accept: Box::new(|input, count| { let count = count.min(buffer.remaining_capacity()); buffer.move_from(input.get_mut::(), count); - _ = tx.send(HostResult { - buffer, - dropped: false, - }); count }), }, @@ -2028,36 +1924,49 @@ impl Instance { .get_mut(transmit_id)? .write = WriteState::Dropped; } - } - WriteState::Dropped => { - _ = tx.send(HostResult { + Ok(HostResult { buffer, - dropped: true, - }); + dropped: false, + }) } - } - Ok(()) + WriteState::Dropped => Ok(HostResult { + buffer, + dropped: true, + }), + }) + } + + /// Async wrapper around `Self::host_read`. + async fn host_read_async>( + self, + accessor: impl AsAccessor, + id: TableId, + buffer: B, + kind: TransmitKind, + ) -> Result> { + match accessor + .as_accessor() + .with(move |mut access| self.host_read(access.as_context_mut(), id, buffer, kind))? + { + Ok(result) => Ok(result), + Err(rx) => Ok(rx.await?), + } } /// Drop the read end of a stream or future read from the host. - /// - /// # Arguments - /// - /// * `store` - The store to which this instance belongs - /// * `transmit_rep` - The `TransmitState` rep for the stream or future. 
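// A minimal sketch (not part of this patch) of the rendezvous pattern used by
// the `host_write_async`/`host_read_async` helpers above: try to complete the
// operation synchronously while the store is borrowed, and only allocate a
// oneshot channel when the other end is not yet ready. The types below are
// stand-ins, not the real internal ones.
//
// use anyhow::Result;
// use futures::channel::oneshot;
//
// enum Attempt<R> {
//     Done(R),                       // the peer was already ready; result in hand
//     Pending(oneshot::Receiver<R>), // the peer will send the result later
// }
//
// async fn complete<R>(attempt: Attempt<R>) -> Result<R> {
//     match attempt {
//         Attempt::Done(result) => Ok(result),
//         // No channel or task switch is needed unless we actually have to wait.
//         Attempt::Pending(rx) => Ok(rx.await?),
//     }
// }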
fn host_drop_reader( self, store: &mut dyn VMStore, - transmit_rep: u32, + id: TableId, kind: TransmitKind, ) -> Result<()> { - let transmit_id = TableId::::new(transmit_rep); + let transmit_id = self.concurrent_state_mut(store).get(id)?.state; let state = self.concurrent_state_mut(store); let transmit = state .get_mut(transmit_id) - .with_context(|| format!("error closing reader {transmit_rep}"))?; + .with_context(|| format!("error closing reader {transmit_id:?}"))?; log::trace!( "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}", transmit.read, @@ -2065,59 +1974,186 @@ impl Instance { ); transmit.read = ReadState::Dropped; - transmit.reader_watcher = None; + if let Some(waker) = transmit.reader_watcher.take() { + waker.wake(); + } + + // If the write end is already dropped, it should stay dropped, + // otherwise, it should be opened. + let new_state = if let WriteState::Dropped = &transmit.write { + WriteState::Dropped + } else { + WriteState::Open + }; + + let write_handle = transmit.write_handle; + + match mem::replace(&mut transmit.write, new_state) { + // If a guest is waiting to write, notify it that the read end has + // been dropped. + WriteState::GuestReady { + ty, + handle, + post_write, + .. + } => { + if let PostWrite::Drop = post_write { + state.delete_transmit(transmit_id)?; + } else { + state.update_event( + write_handle.rep(), + match ty { + TableIndex::Future(ty) => Event::FutureWrite { + code: ReturnCode::Dropped(0), + pending: Some((ty, handle)), + }, + TableIndex::Stream(ty) => Event::StreamWrite { + code: ReturnCode::Dropped(0), + pending: Some((ty, handle)), + }, + }, + )?; + }; + } + + WriteState::HostReady { accept, .. } => { + accept(store, self, Reader::End)?; + } + + WriteState::Open => { + state.update_event( + write_handle.rep(), + match kind { + TransmitKind::Future => Event::FutureWrite { + code: ReturnCode::Dropped(0), + pending: None, + }, + TransmitKind::Stream => Event::StreamWrite { + code: ReturnCode::Dropped(0), + pending: None, + }, + }, + )?; + } + + WriteState::Dropped => { + log::trace!("host_drop_reader delete {transmit_id:?}"); + state.delete_transmit(transmit_id)?; + } + } + Ok(()) + } + + /// Drop the write end of a stream or future read from the host. + fn host_drop_writer( + self, + mut store: StoreContextMut, + id: TableId, + default: Option<&dyn Fn() -> Result>, + ) -> Result<()> { + let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state; + let token = StoreToken::new(store.as_context_mut()); + let transmit = self + .concurrent_state_mut(store.0) + .get_mut(transmit_id) + .with_context(|| format!("error closing writer {transmit_id:?}"))?; + log::trace!( + "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}", + transmit.read, + transmit.write + ); + + if let Some(waker) = transmit.writer_watcher.take() { + waker.wake(); + } + + // Existing queued transmits must be updated with information for the impending writer closure + match &mut transmit.write { + WriteState::GuestReady { .. } => { + unreachable!("can't call `host_drop_writer` on a guest-owned writer"); + } + WriteState::HostReady { post_write, .. } => { + *post_write = PostWrite::Drop; + } + v @ WriteState::Open => { + *v = if let (Some(default), false) = ( + default, + transmit.done || matches!(transmit.read, ReadState::Dropped), + ) { + // This is a future, and we haven't written a value yet -- + // write the default value. 
+ let default = default()?; + WriteState::HostReady { + accept: Box::new(move |store, instance, reader| { + let (_, code) = accept_reader::, U>( + token.as_context_mut(store), + instance, + reader, + Some(default), + TransmitKind::Future, + )?; + Ok(code) + }), + post_write: PostWrite::Drop, + } + } else { + WriteState::Dropped + }; + } + WriteState::Dropped => unreachable!("write state is already dropped"), + } - // If the write end is already dropped, it should stay dropped, - // otherwise, it should be opened. - let new_state = if let WriteState::Dropped = &transmit.write { - WriteState::Dropped + // If the existing read state is dropped, then there's nothing to read + // and we can keep it that way. + // + // If the read state was any other state, then we must set the new state to open + // to indicate that there *is* data to be read + let new_state = if let ReadState::Dropped = &transmit.read { + ReadState::Dropped } else { - WriteState::Open + ReadState::Open }; - let write_handle = transmit.write_handle; + let read_handle = transmit.read_handle; - match mem::replace(&mut transmit.write, new_state) { - // If a guest is waiting to write, notify it that the read end has - // been dropped. - WriteState::GuestReady { - ty, - handle, - post_write, - .. - } => { - if let PostWrite::Drop = post_write { - state.delete_transmit(transmit_id)?; - } else { - state.update_event( - write_handle.rep(), - match ty { - TableIndex::Future(ty) => Event::FutureWrite { - code: ReturnCode::Dropped(0), - pending: Some((ty, handle)), - }, - TableIndex::Stream(ty) => Event::StreamWrite { - code: ReturnCode::Dropped(0), - pending: Some((ty, handle)), - }, + // Swap in the new read state + match mem::replace(&mut transmit.read, new_state) { + // If the guest was ready to read, then we cannot drop the reader (or writer); + // we must deliver the event, and update the state associated with the handle to + // represent that a read must be performed + ReadState::GuestReady { ty, handle, .. } => { + // Ensure the final read of the guest is queued, with appropriate closure indicator + self.concurrent_state_mut(store.0).update_event( + read_handle.rep(), + match ty { + TableIndex::Future(ty) => Event::FutureRead { + code: ReturnCode::Dropped(0), + pending: Some((ty, handle)), }, - )?; - }; + TableIndex::Stream(ty) => Event::StreamRead { + code: ReturnCode::Dropped(0), + pending: Some((ty, handle)), + }, + }, + )?; } - WriteState::HostReady { accept, .. } => { - accept(store, self, Reader::End)?; + // If the host was ready to read, and the writer end is being dropped (host->host write?) 
+ // signal to the reader that we've reached the end of the stream + ReadState::HostReady { accept } => { + accept(Writer::End)?; } - WriteState::Open => { - state.update_event( - write_handle.rep(), - match kind { - TransmitKind::Future => Event::FutureWrite { + // If the read state is open, then there are no registered readers of the stream/future + ReadState::Open => { + self.concurrent_state_mut(store.0).update_event( + read_handle.rep(), + match default { + Some(_) => Event::FutureRead { code: ReturnCode::Dropped(0), pending: None, }, - TransmitKind::Stream => Event::StreamWrite { + None => Event::StreamRead { code: ReturnCode::Dropped(0), pending: None, }, @@ -2125,14 +2161,62 @@ impl Instance { )?; } - WriteState::Dropped => { - log::trace!("host_drop_reader delete {transmit_rep}"); - state.delete_transmit(transmit_id)?; + // If the read state was already dropped, then we can remove the transmit state completely + // (both writer and reader have been dropped) + ReadState::Dropped => { + log::trace!("host_drop_writer delete {transmit_id:?}"); + self.concurrent_state_mut(store.0) + .delete_transmit(transmit_id)?; } } Ok(()) } + /// Drop the writable end of the specified stream or future from the guest. + pub(super) fn guest_drop_writable( + self, + store: StoreContextMut, + ty: TableIndex, + writer: u32, + ) -> Result<()> { + let (transmit_rep, state) = self + .concurrent_state_mut(store.0) + .state_table(ty) + .remove_by_index(writer) + .context("failed to find writer")?; + let (state, kind) = match state { + WaitableState::Stream(_, state) => (state, TransmitKind::Stream), + WaitableState::Future(_, state) => (state, TransmitKind::Future), + _ => { + bail!("invalid stream or future handle"); + } + }; + match state { + StreamFutureState::Write { .. } => {} + StreamFutureState::Read { .. } => { + bail!("passed read end to `{{stream|future}}.drop-writable`") + } + StreamFutureState::Busy => bail!("cannot drop busy stream or future"), + } + + let id = TableId::::new(transmit_rep); + log::trace!("guest_drop_writable: drop writer {id:?}"); + match kind { + TransmitKind::Stream => { + self.host_drop_writer(store, id, None::<&dyn Fn() -> Result<()>>) + } + TransmitKind::Future => self.host_drop_writer( + store, + id, + Some(&|| { + Err::<(), _>(anyhow!( + "cannot drop future write end without first writing a value" + )) + }), + ), + } + } + /// Copy `count` items from `read_address` to `write_address` for the /// specified stream or future. fn copy( @@ -2724,9 +2808,8 @@ impl Instance { StreamFutureState::Busy => bail!("cannot drop busy stream or future"), } let id = TableId::::new(rep); - let rep = concurrent_state.get(id)?.state.rep(); log::trace!("guest_drop_readable: drop reader {id:?}"); - self.host_drop_reader(store, rep, kind) + self.host_drop_reader(store, id, kind) } /// Create a new error context for the given component. @@ -2842,40 +2925,6 @@ impl Instance { ) -> Result<()> { self.guest_drop_readable(store, TableIndex::Stream(ty), reader) } - - /// Retrieve the `TransmitState` rep for the specified `TransmitHandle` rep. - fn get_state_rep(&self, rep: u32) -> Result { - tls::get(|store| { - let transmit_handle = TableId::::new(rep); - Ok(self - .concurrent_state_mut(store) - .get(transmit_handle) - .with_context(|| format!("stream or future {transmit_handle:?} not found"))? - .state - .rep()) - }) - } -} - -/// Helper struct for running a closure on drop, e.g. for logging purposes. 
-struct RunOnDrop(Option); - -impl RunOnDrop { - fn new(fun: F) -> Self { - Self(Some(fun)) - } - - fn cancel(mut self) { - self.0 = None; - } -} - -impl Drop for RunOnDrop { - fn drop(&mut self) { - if let Some(fun) = self.0.take() { - fun(); - } - } } impl ConcurrentState { @@ -3099,113 +3148,6 @@ impl ConcurrentState { Ok(code) } - /// Drop the write end of a stream or future read from the host. - /// - /// # Arguments - /// - /// * `transmit_rep` - The `TransmitState` rep for the stream or future. - fn host_drop_writer(&mut self, transmit_rep: u32, kind: TransmitKind) -> Result<()> { - let transmit_id = TableId::::new(transmit_rep); - let transmit = self - .get_mut(transmit_id) - .with_context(|| format!("error closing writer {transmit_rep}"))?; - log::trace!( - "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}", - transmit.read, - transmit.write - ); - - transmit.writer_watcher = None; - - // Existing queued transmits must be updated with information for the impending writer closure - match &mut transmit.write { - WriteState::GuestReady { post_write, .. } => { - *post_write = PostWrite::Drop; - } - WriteState::HostReady { post_write, .. } => { - *post_write = PostWrite::Drop; - } - v @ WriteState::Open => { - if let (TransmitKind::Future, false) = ( - kind, - transmit.done || matches!(transmit.read, ReadState::Dropped), - ) { - bail!("cannot drop future write end without first writing a value") - } - - *v = WriteState::Dropped; - } - WriteState::Dropped => unreachable!("write state is already dropped"), - } - - // If the existing read state is dropped, then there's nothing to read - // and we can keep it that way. - // - // If the read state was any other state, then we must set the new state to open - // to indicate that there *is* data to be read - let new_state = if let ReadState::Dropped = &transmit.read { - ReadState::Dropped - } else { - ReadState::Open - }; - - let read_handle = transmit.read_handle; - - // Swap in the new read state - match mem::replace(&mut transmit.read, new_state) { - // If the guest was ready to read, then we cannot drop the reader (or writer) - // we must deliver the event, and update the state associated with the handle to - // represent that a read must be performed - ReadState::GuestReady { ty, handle, .. } => { - // Ensure the final read of the guest is queued, with appropriate closure indicator - self.update_event( - read_handle.rep(), - match ty { - TableIndex::Future(ty) => Event::FutureRead { - code: ReturnCode::Dropped(0), - pending: Some((ty, handle)), - }, - TableIndex::Stream(ty) => Event::StreamRead { - code: ReturnCode::Dropped(0), - pending: Some((ty, handle)), - }, - }, - )?; - } - - // If the host was ready to read, and the writer end is being dropped (host->host write?) 
- // signal to the reader that we've reached the end of the stream - ReadState::HostReady { accept } => { - accept(Writer::End)?; - } - - // If the read state is open, then there are no registered readers of the stream/future - ReadState::Open => { - self.update_event( - read_handle.rep(), - match kind { - TransmitKind::Future => Event::FutureRead { - code: ReturnCode::Dropped(0), - pending: None, - }, - TransmitKind::Stream => Event::StreamRead { - code: ReturnCode::Dropped(0), - pending: None, - }, - }, - )?; - } - - // If the read state was already dropped, then we can remove the transmit state completely - // (both writer and reader have been dropped) - ReadState::Dropped => { - log::trace!("host_drop_writer delete {transmit_rep}"); - self.delete_transmit(transmit_id)?; - } - } - Ok(()) - } - /// Cancel a pending write for the specified stream or future from the guest. fn guest_cancel_write( &mut self, @@ -3264,33 +3206,6 @@ impl ConcurrentState { self.host_cancel_read(rep) } - /// Drop the writable end of the specified stream or future from the guest. - fn guest_drop_writable(&mut self, ty: TableIndex, writer: u32) -> Result<()> { - let (transmit_rep, state) = self - .state_table(ty) - .remove_by_index(writer) - .context("failed to find writer")?; - let (state, kind) = match state { - WaitableState::Stream(_, state) => (state, TransmitKind::Stream), - WaitableState::Future(_, state) => (state, TransmitKind::Future), - _ => { - bail!("invalid stream or future handle"); - } - }; - match state { - StreamFutureState::Write { .. } => {} - StreamFutureState::Read { .. } => { - bail!("passed read end to `{{stream|future}}.drop-writable`") - } - StreamFutureState::Busy => bail!("cannot drop busy stream or future"), - } - - let id = TableId::::new(transmit_rep); - let transmit_rep = self.get(id)?.state.rep(); - log::trace!("guest_drop_writable: drop writer {id:?}"); - self.host_drop_writer(transmit_rep, kind) - } - /// Drop the specified error context. pub(crate) fn error_context_drop( &mut self, @@ -3411,15 +3326,6 @@ impl ConcurrentState { .map(|result| result.encode()) } - /// Implements the `future.drop-writable` intrinsic. - pub(crate) fn future_drop_writable( - &mut self, - ty: TypeFutureTableIndex, - writer: u32, - ) -> Result<()> { - self.guest_drop_writable(TableIndex::Future(ty), writer) - } - /// Implements the `stream.new` intrinsic. pub(crate) fn stream_new(&mut self, ty: TypeStreamTableIndex) -> Result { self.guest_new(TableIndex::Stream(ty)) @@ -3447,15 +3353,6 @@ impl ConcurrentState { .map(|result| result.encode()) } - /// Implements the `stream.drop-writable` intrinsic. - pub(crate) fn stream_drop_writable( - &mut self, - ty: TypeStreamTableIndex, - writer: u32, - ) -> Result<()> { - self.guest_drop_writable(TableIndex::Stream(ty), writer) - } - /// Transfer ownership of the specified future read end from one guest to /// another. pub(crate) fn future_transfer( diff --git a/crates/wasmtime/src/runtime/component/concurrent_disabled.rs b/crates/wasmtime/src/runtime/component/concurrent_disabled.rs index 1a661dd4da0e..186676180754 100644 --- a/crates/wasmtime/src/runtime/component/concurrent_disabled.rs +++ b/crates/wasmtime/src/runtime/component/concurrent_disabled.rs @@ -80,12 +80,12 @@ impl ErrorContext { } } -pub struct HostStream

<T> {
+pub struct StreamReader<T> {
     uninhabited: Uninhabited,
     _phantom: PhantomData<T>,
 }
 
-impl<T> HostStream<T> {
+impl<T> StreamReader<T> {
     pub(crate) fn into_val(self) -> Val {
         match self.uninhabited {}
     }
@@ -107,12 +107,12 @@ impl<T> HostStream<T> {
     }
 }
 
-pub struct HostFuture<T> {
+pub struct FutureReader<T> {
     uninhabited: Uninhabited,
     _phantom: PhantomData<T>,
 }
 
-impl<T> HostFuture<T> {
+impl<T> FutureReader<T>

{ pub(crate) fn into_val(self) -> Val { match self.uninhabited {} } diff --git a/crates/wasmtime/src/runtime/component/linker.rs b/crates/wasmtime/src/runtime/component/linker.rs index 71bae434742f..2f68d4b5e663 100644 --- a/crates/wasmtime/src/runtime/component/linker.rs +++ b/crates/wasmtime/src/runtime/component/linker.rs @@ -797,6 +797,59 @@ impl LinkerInstance<'_, T> { Ok(()) } + /// Identical to [`Self::resource`], except that it takes a concurrent destructor. + #[cfg(feature = "component-model-async")] + pub fn resource_concurrent(&mut self, name: &str, ty: ResourceType, dtor: F) -> Result<()> + where + T: Send + 'static, + F: Fn(&Accessor, u32) -> Pin> + Send + '_>> + + Send + + Sync + + 'static, + { + assert!( + self.engine.config().async_support, + "cannot use `resource_concurrent` without enabling async support in the config" + ); + // TODO: This isn't really concurrent -- it requires exclusive access to + // the store for the duration of the call, preventing guest code from + // running until it completes. We should make it concurrent and clean + // up the implementation to avoid using e.g. `Accessor::new` and + // `tls::set` directly. + let dtor = Arc::new(dtor); + let dtor = Arc::new(crate::func::HostFunc::wrap_inner( + &self.engine, + move |mut cx: crate::Caller<'_, T>, (param,): (u32,)| { + let dtor = dtor.clone(); + cx.as_context_mut().block_on(move |mut store| { + Box::pin(async move { + // NOTE: We currently pass `None` as the `instance` + // parameter to `Accessor::new` because we don't have ready + // access to it, meaning `dtor` will panic if it tries to + // use `Accessor::instance`. We could plumb that through + // from the `wasmtime-cranelift`-generated code, but we plan + // to remove `Accessor::instance` once all instances in a + // store share the same concurrent state, at which point we + // won't need it anyway. + let accessor = &Accessor::new( + crate::store::StoreToken::new(store.as_context_mut()), + None, + ); + let mut future = std::pin::pin!(dtor(accessor, param)); + std::future::poll_fn(|cx| { + crate::component::concurrent::tls::set(store.0.traitobj_mut(), || { + future.as_mut().poll(cx) + }) + }) + .await + }) + })? + }, + )); + self.insert(name, Definition::Resource(ty, dtor))?; + Ok(()) + } + /// Defines a nested instance within this instance. 
/// /// This can be used to describe arbitrarily nested levels of instances diff --git a/crates/wasmtime/src/runtime/component/mod.rs b/crates/wasmtime/src/runtime/component/mod.rs index cf7b662981a1..9219fd8513e0 100644 --- a/crates/wasmtime/src/runtime/component/mod.rs +++ b/crates/wasmtime/src/runtime/component/mod.rs @@ -120,8 +120,9 @@ pub use self::component::{Component, ComponentExportIndex}; #[cfg(feature = "component-model-async")] pub use self::concurrent::{ AbortHandle, Access, Accessor, AccessorTask, AsAccessor, ErrorContext, FutureReader, - FutureWriter, HostFuture, HostStream, ReadBuffer, StreamReader, StreamWriter, - VMComponentAsyncStore, VecBuffer, Watch, WriteBuffer, + FutureWriter, GuardedFutureReader, GuardedFutureWriter, GuardedStreamReader, + GuardedStreamWriter, ReadBuffer, StreamReader, StreamWriter, VMComponentAsyncStore, VecBuffer, + WriteBuffer, }; pub use self::func::{ ComponentNamedList, ComponentType, Func, Lift, Lower, TypedFunc, WasmList, WasmStr, diff --git a/crates/wasmtime/src/runtime/component/store.rs b/crates/wasmtime/src/runtime/component/store.rs index 71e214d72c5d..edb251ed5e5a 100644 --- a/crates/wasmtime/src/runtime/component/store.rs +++ b/crates/wasmtime/src/runtime/component/store.rs @@ -32,8 +32,9 @@ impl ComponentStoreData { } #[cfg(feature = "component-model-async")] - pub(crate) fn drop_fibers(store: &mut StoreOpaque) { + pub(crate) fn drop_fibers_and_futures(store: &mut StoreOpaque) { let mut fibers = Vec::new(); + let mut futures = Vec::new(); for (_, instance) in store.store_data_mut().components.instances.iter_mut() { let Some(instance) = instance.as_mut() else { continue; @@ -42,12 +43,14 @@ impl ComponentStoreData { instance .get_mut() .concurrent_state_mut() - .take_fibers(&mut fibers); + .take_fibers_and_futures(&mut fibers, &mut futures); } for mut fiber in fibers { fiber.dispose(store); } + + crate::component::concurrent::tls::set(store.traitobj_mut(), move || drop(futures)); } } diff --git a/crates/wasmtime/src/runtime/component/values.rs b/crates/wasmtime/src/runtime/component/values.rs index dbe129ca4370..943607280686 100644 --- a/crates/wasmtime/src/runtime/component/values.rs +++ b/crates/wasmtime/src/runtime/component/values.rs @@ -1,6 +1,6 @@ use crate::ValRaw; use crate::component::ResourceAny; -use crate::component::concurrent::{self, ErrorContext, HostFuture, HostStream}; +use crate::component::concurrent::{self, ErrorContext, FutureReader, StreamReader}; use crate::component::func::{Lift, LiftContext, Lower, LowerContext, desc}; use crate::prelude::*; use core::mem::MaybeUninit; @@ -207,10 +207,10 @@ impl Val { Val::Flags(flags) } InterfaceType::Future(_) => { - HostFuture::<()>::linear_lift_from_flat(cx, ty, next(src))?.into_val() + FutureReader::<()>::linear_lift_from_flat(cx, ty, next(src))?.into_val() } InterfaceType::Stream(_) => { - HostStream::<()>::linear_lift_from_flat(cx, ty, next(src))?.into_val() + StreamReader::<()>::linear_lift_from_flat(cx, ty, next(src))?.into_val() } InterfaceType::ErrorContext(_) => { ErrorContext::linear_lift_from_flat(cx, ty, next(src))?.into_val() @@ -337,10 +337,10 @@ impl Val { Val::Flags(flags) } InterfaceType::Future(_) => { - HostFuture::<()>::linear_lift_from_memory(cx, ty, bytes)?.into_val() + FutureReader::<()>::linear_lift_from_memory(cx, ty, bytes)?.into_val() } InterfaceType::Stream(_) => { - HostStream::<()>::linear_lift_from_memory(cx, ty, bytes)?.into_val() + StreamReader::<()>::linear_lift_from_memory(cx, ty, bytes)?.into_val() } InterfaceType::ErrorContext(_) => { 
ErrorContext::linear_lift_from_memory(cx, ty, bytes)?.into_val() diff --git a/crates/wasmtime/src/runtime/store.rs b/crates/wasmtime/src/runtime/store.rs index 901f4daa461c..33e4b6dae59d 100644 --- a/crates/wasmtime/src/runtime/store.rs +++ b/crates/wasmtime/src/runtime/store.rs @@ -664,8 +664,12 @@ impl Store { // attempting to drop the instances themselves since the fibers may need // to be resumed and allowed to exit cleanly before we yank the state // out from under them. + // + // This will also drop any futures which might use a `&Accessor` fields + // in their `Drop::drop` implementations, in which case they'll need to + // be called from with in the context of a `tls::set` closure. #[cfg(feature = "component-model-async")] - ComponentStoreData::drop_fibers(&mut self.inner); + ComponentStoreData::drop_fibers_and_futures(&mut self.inner); // Ensure all fiber stacks, even cached ones, are all flushed out to the // instance allocator. diff --git a/crates/wasmtime/src/runtime/vm/component/libcalls.rs b/crates/wasmtime/src/runtime/vm/component/libcalls.rs index 7577305f9d3c..be65a8820acc 100644 --- a/crates/wasmtime/src/runtime/vm/component/libcalls.rs +++ b/crates/wasmtime/src/runtime/vm/component/libcalls.rs @@ -1004,9 +1004,11 @@ fn future_drop_writable( ty: u32, writer: u32, ) -> Result<()> { - instance - .concurrent_state_mut(store) - .future_drop_writable(TypeFutureTableIndex::from_u32(ty), writer) + store.component_async_store().future_drop_writable( + instance, + TypeFutureTableIndex::from_u32(ty), + writer, + ) } #[cfg(feature = "component-model-async")] @@ -1103,9 +1105,11 @@ fn stream_drop_writable( ty: u32, writer: u32, ) -> Result<()> { - instance - .concurrent_state_mut(store) - .stream_drop_writable(TypeStreamTableIndex::from_u32(ty), writer) + store.component_async_store().stream_drop_writable( + instance, + TypeStreamTableIndex::from_u32(ty), + writer, + ) } #[cfg(feature = "component-model-async")] diff --git a/crates/wit-bindgen/src/lib.rs b/crates/wit-bindgen/src/lib.rs index 50f06777d341..d4811aa9499e 100644 --- a/crates/wit-bindgen/src/lib.rs +++ b/crates/wit-bindgen/src/lib.rs @@ -1516,9 +1516,25 @@ impl Wasmtime { let src = src.unwrap_or(&mut self.src); let gate = FeatureGate::open(src, stability); let camel = name.to_upper_camel_case(); + let flags = self.opts.imports.resource_drop_flags(resolve, key, name); if flags.contains(FunctionFlags::ASYNC) { - uwriteln!( + if flags.contains(FunctionFlags::STORE) { + uwriteln!( + src, + "{inst}.resource_concurrent( + \"{name}\", + {wt}::component::ResourceType::host::<{camel}>(), + move |caller: &{wt}::component::Accessor::, rep| {{ + {wt}::component::__internal::Box::pin(async move {{ + let accessor = &caller.with_data(host_getter); + Host{camel}Concurrent::drop(accessor, {wt}::component::Resource::new_own(rep)).await + }}) + }}, + )?;" + ) + } else { + uwriteln!( src, "{inst}.resource_async( \"{name}\", @@ -1530,6 +1546,7 @@ impl Wasmtime { }}, )?;" ) + } } else { uwriteln!( src, @@ -2874,6 +2891,27 @@ impl<'a> InterfaceGenerator<'a> { with_store_supertraits.join(" + "), ); ret.with_store_name = Some(format!("{trait_name}WithStore")); + + for extra in extra_functions { + match extra { + ExtraTraitMethod::ResourceDrop { name } => { + let flags = self.import_resource_drop_flags(name); + if !flags.contains(FunctionFlags::STORE) { + continue; + } + let camel = name.to_upper_camel_case(); + + assert!(flags.contains(FunctionFlags::ASYNC)); + + uwrite!( + self.src, + "fn drop(accessor: &{wt}::component::Accessor, rep: 
{wt}::component::Resource<{camel}>) -> impl ::core::future::Future> + Send where Self: Sized;" + ); + } + ExtraTraitMethod::ErrorConvert { .. } => {} + } + } + for (func, flags) in partition.with_store.iter() { self.generate_function_trait_sig(func, *flags); self.push_str(";\n"); @@ -2889,6 +2927,7 @@ impl<'a> InterfaceGenerator<'a> { " where _T: {}", with_store_supertraits.join(" + ") ); + uwriteln!(self.src, "{{}}"); } @@ -2907,8 +2946,12 @@ impl<'a> InterfaceGenerator<'a> { match extra { ExtraTraitMethod::ResourceDrop { name } => { let camel = name.to_upper_camel_case(); + let flags = self.import_resource_drop_flags(name); ret.all_func_flags |= flags; + if flags.contains(FunctionFlags::STORE) { + continue; + } uwrite!( self.src, "fn drop(&mut self, rep: {wt}::component::Resource<{camel}>) -> " diff --git a/crates/wit-bindgen/src/rust.rs b/crates/wit-bindgen/src/rust.rs index a5fccd5de07f..e2b6383379bd 100644 --- a/crates/wit-bindgen/src/rust.rs +++ b/crates/wit-bindgen/src/rust.rs @@ -176,12 +176,12 @@ pub trait RustGenerator<'a> { TypeDefKind::Future(ty) => { let wt = self.wasmtime_path(); let t = self.optional_ty(ty.as_ref(), TypeMode::Owned); - format!("{wt}::component::HostFuture<{t}>") + format!("{wt}::component::FutureReader<{t}>") } TypeDefKind::Stream(ty) => { let wt = self.wasmtime_path(); let t = self.optional_ty(ty.as_ref(), TypeMode::Owned); - format!("{wt}::component::HostStream<{t}>") + format!("{wt}::component::StreamReader<{t}>") } TypeDefKind::Handle(handle) => self.handle(handle), TypeDefKind::Resource => unreachable!(), From f1450b8a030db3a78fc96edaa117f17f3a1026dc Mon Sep 17 00:00:00 2001 From: Joel Dice Date: Wed, 30 Jul 2025 08:12:42 -0600 Subject: [PATCH 2/3] lower host stream/future writes in background task This avoids unsoundness due to guest realloc calls while there are host embedder frames on the stack. 
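In rough terms, the host write path now queues the actual lowering as a
background future instead of performing it inline while host embedder frames
are still live. A simplified sketch of the shape of that change (it mirrors
the hunk below; `accept` stands for the closure that lowers the payload into
guest memory, and the flat-payload fast path is omitted):

    // Defer the lowering to a background future and hand the caller a
    // oneshot receiver to await. The future is polled later by the
    // instance's event loop, once no host embedder frames remain on the
    // stack, so a re-entrant guest `realloc` call is safe at that point.
    let (tx, rx) = oneshot::channel();
    let token = StoreToken::new(store.as_context_mut());
    self.concurrent_state_mut(store.0)
        .push_future(Box::pin(async move {
            HostTaskOutput::Result(tls::get(|store| {
                _ = tx.send(accept(token.as_context_mut(store))?);
                Ok(())
            }))
        }));
    // Returning `Err(rx)` tells the caller the result is not ready yet and
    // must be awaited via the receiver.
    Err(rx)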
Signed-off-by: Joel Dice --- .../concurrent/futures_and_streams.rs | 79 +++++++++++++------ 1 file changed, 53 insertions(+), 26 deletions(-) diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index 74fb17c5ea9e..2809cb6b12f7 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -3,7 +3,7 @@ use super::{ Event, GlobalErrorContextRefCount, LocalErrorContextRefCount, StateTable, Waitable, WaitableCommon, WaitableState, }; -use crate::component::concurrent::ConcurrentState; +use crate::component::concurrent::{ConcurrentState, HostTaskOutput, tls}; use crate::component::func::{self, LiftContext, LowerContext, Options}; use crate::component::matching::InstanceType; use crate::component::values::{ErrorContextAny, FutureAny, StreamAny}; @@ -1746,34 +1746,61 @@ impl Instance { } let read_handle = transmit.read_handle; - let (result, code) = accept_reader::( - store.as_context_mut(), - self, - Reader::Guest { - options: &options, - ty, - address, - count, - }, - buffer, - kind, - )?; - - self.concurrent_state_mut(store.0).set_event( - read_handle.rep(), - match ty { - TableIndex::Future(ty) => Event::FutureRead { - code, - pending: Some((ty, handle)), + let accept = move |mut store: StoreContextMut| { + let (result, code) = accept_reader::( + store.as_context_mut(), + self, + Reader::Guest { + options: &options, + ty, + address, + count, }, - TableIndex::Stream(ty) => Event::StreamRead { - code, - pending: Some((ty, handle)), + buffer, + kind, + )?; + + self.concurrent_state_mut(store.0).set_event( + read_handle.rep(), + match ty { + TableIndex::Future(ty) => Event::FutureRead { + code, + pending: Some((ty, handle)), + }, + TableIndex::Stream(ty) => Event::StreamRead { + code, + pending: Some((ty, handle)), + }, }, - }, - )?; + )?; - Ok(result) + anyhow::Ok(result) + }; + + if + // TODO: Check if payload is "flat" + false { + // Optimize flat payloads (i.e. those which do not require + // calling the guest's realloc function) by lowering + // directly instead of using a oneshot::channel and + // background task. + Ok(accept(store)?) + } else { + // Otherwise, for payloads which may require a realloc call, + // use a oneshot::channel and background task. This is + // necessary because calling the guest while there are host + // embedder frames on the stack is unsound. 
+ let (tx, rx) = oneshot::channel(); + let token = StoreToken::new(store.as_context_mut()); + self.concurrent_state_mut(store.0) + .push_future(Box::pin(async move { + HostTaskOutput::Result(tls::get(|store| { + _ = tx.send(accept(token.as_context_mut(store))?); + Ok(()) + })) + })); + Err(rx) + } } ReadState::HostReady { accept } => { From 74ee080c0228a056c12b050aaeb24dbcc12c350c Mon Sep 17 00:00:00 2001 From: Joel Dice Date: Wed, 30 Jul 2025 09:17:55 -0600 Subject: [PATCH 3/3] fix `tcp.rs` regressions Signed-off-by: Joel Dice --- crates/wasi/src/p3/sockets/host/types/tcp.rs | 78 ++++++++++---------- 1 file changed, 41 insertions(+), 37 deletions(-) diff --git a/crates/wasi/src/p3/sockets/host/types/tcp.rs b/crates/wasi/src/p3/sockets/host/types/tcp.rs index 530fa3fbe144..82cd50a79c7f 100644 --- a/crates/wasi/src/p3/sockets/host/types/tcp.rs +++ b/crates/wasi/src/p3/sockets/host/types/tcp.rs @@ -14,8 +14,8 @@ use io_lifetimes::AsSocketlike as _; use rustix::io::Errno; use tokio::net::{TcpListener, TcpStream}; use wasmtime::component::{ - Accessor, AccessorTask, FutureWriter, HostFuture, HostStream, Resource, ResourceTable, - StreamWriter, + Accessor, AccessorTask, FutureReader, FutureWriter, GuardedFutureWriter, GuardedStreamWriter, + Resource, ResourceTable, StreamReader, StreamWriter, }; use crate::p3::DEFAULT_BUFFER_CAPACITY; @@ -57,16 +57,17 @@ fn get_socket_mut<'a>( struct ListenTask { listener: Arc, family: SocketAddressFamily, - tx: StreamWriter>>, + tx: StreamWriter>, options: NonInheritedOptions, } impl AccessorTask> for ListenTask { - async fn run(mut self, store: &Accessor) -> wasmtime::Result<()> { - while !self.tx.is_closed() { + async fn run(self, store: &Accessor) -> wasmtime::Result<()> { + let mut tx = GuardedStreamWriter::new(store, self.tx); + while !tx.is_closed() { let Some(res) = ({ let mut accept = pin!(self.listener.accept()); - let mut tx = pin!(self.tx.watch_reader(store)); + let mut tx = pin!(tx.watch_reader()); poll_fn(|cx| match tx.as_mut().poll(cx) { Poll::Ready(()) => return Poll::Ready(None), Poll::Pending => accept.as_mut().poll(cx).map(Some), @@ -121,8 +122,8 @@ impl AccessorTask> for ListenTask { .push(TcpSocket::from_state(state, self.family)) .context("failed to push socket resource to table") })?; - if let Some(socket) = self.tx.write(store, Some(socket)).await { - debug_assert!(self.tx.is_closed()); + if let Some(socket) = tx.write(Some(socket)).await { + debug_assert!(tx.is_closed()); store.with(|mut view| { view.get() .table @@ -143,32 +144,32 @@ struct ResultWriteTask { impl AccessorTask> for ResultWriteTask { async fn run(self, store: &Accessor) -> wasmtime::Result<()> { - self.result_tx.write(store, self.result).await; + GuardedFutureWriter::new(store, self.result_tx) + .write(self.result) + .await; Ok(()) } } struct ReceiveTask { stream: Arc, - data_tx: StreamWriter>, + data_tx: StreamWriter, result_tx: FutureWriter>, } impl AccessorTask> for ReceiveTask { - async fn run(mut self, store: &Accessor) -> wasmtime::Result<()> { + async fn run(self, store: &Accessor) -> wasmtime::Result<()> { let mut buf = BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY); + let mut data_tx = GuardedStreamWriter::new(store, self.data_tx); + let result_tx = GuardedFutureWriter::new(store, self.result_tx); let res = loop { match self.stream.try_read_buf(&mut buf) { Ok(0) => { break Ok(()); } Ok(..) 
=> { - buf = self - .data_tx - .write_all(store, Cursor::new(buf)) - .await - .into_inner(); - if self.data_tx.is_closed() { + buf = data_tx.write_all(Cursor::new(buf)).await.into_inner(); + if data_tx.is_closed() { break Ok(()); } buf.clear(); @@ -176,7 +177,7 @@ impl AccessorTask> for ReceiveTask { Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { let Some(res) = ({ let mut readable = pin!(self.stream.readable()); - let mut tx = pin!(self.data_tx.watch_reader(store)); + let mut tx = pin!(data_tx.watch_reader()); poll_fn(|cx| match tx.as_mut().poll(cx) { Poll::Ready(()) => return Poll::Ready(None), Poll::Pending => readable.as_mut().poll(cx).map(Some), @@ -203,7 +204,7 @@ impl AccessorTask> for ReceiveTask { // task are freed store.spawn(ResultWriteTask { result: res, - result_tx: self.result_tx, + result_tx: result_tx.into(), }); Ok(()) } @@ -284,14 +285,10 @@ impl HostTcpSocketWithStore for WasiSockets { async fn listen( store: &Accessor, socket: Resource, - ) -> wasmtime::Result>, ErrorCode>> { + ) -> wasmtime::Result>, ErrorCode>> { store.with(|mut view| { - let (tx, rx) = view - .instance() - .stream::<_, _, Option<_>>(&mut view) - .context("failed to create stream")?; if !view.get().ctx.allowed_network_uses.tcp { - return Ok(Err(ErrorCode::AccessDenied)); + return anyhow::Ok(Err(ErrorCode::AccessDenied)); } let TcpSocket { tcp_state, @@ -328,24 +325,29 @@ impl HostTcpSocketWithStore for WasiSockets { }; let listener = Arc::new(listener); *tcp_state = TcpState::Listening(Arc::clone(&listener)); + let family = *family; + let options = options.clone(); + let (tx, rx) = view + .instance() + .stream(&mut view) + .context("failed to create stream")?; let task = ListenTask { listener, - family: *family, + family, tx, - options: options.clone(), + options, }; view.spawn(task); - Ok(Ok(rx.into())) + Ok(Ok(rx)) }) } async fn send( store: &Accessor, socket: Resource, - data: HostStream, + data: StreamReader, ) -> wasmtime::Result> { let (stream, mut data) = match store.with(|mut view| -> wasmtime::Result<_> { - let data = data.into_reader::>(&mut view); let sock = get_socket(view.get().table, &socket)?; if let TcpState::Connected(stream) | TcpState::Receiving(stream) = &sock.tcp_state { Ok(Ok((Arc::clone(&stream), data))) @@ -387,32 +389,34 @@ impl HostTcpSocketWithStore for WasiSockets { async fn receive( store: &Accessor, socket: Resource, - ) -> wasmtime::Result<(HostStream, HostFuture>)> { + ) -> wasmtime::Result<(StreamReader, FutureReader>)> { store.with(|mut view| { let instance = view.instance(); let (data_tx, data_rx) = instance - .stream::<_, _, BytesMut>(&mut view) + .stream(&mut view) .context("failed to create stream")?; let TcpSocket { tcp_state, .. 
} = get_socket_mut(view.get().table, &socket)?; match mem::replace(tcp_state, TcpState::Closed) { TcpState::Connected(stream) => { *tcp_state = TcpState::Receiving(Arc::clone(&stream)); let (result_tx, result_rx) = instance - .future(|| unreachable!(), &mut view) + .future(&mut view, || unreachable!()) .context("failed to create future")?; view.spawn(ReceiveTask { stream, data_tx, result_tx, }); - Ok((data_rx.into(), result_rx.into())) + Ok((data_rx, result_rx)) } prev => { *tcp_state = prev; - let (_, result_rx) = instance - .future(|| Err(ErrorCode::InvalidState), &mut view) + let (result_tx, result_rx) = instance + .future(&mut view, || Err(ErrorCode::InvalidState)) .context("failed to create future")?; - Ok((data_rx.into(), result_rx.into())) + result_tx.close(&mut view)?; + data_tx.close(&mut view)?; + Ok((data_rx, result_rx)) } } })