diff --git a/crates/misc/component-async-tests/src/resource_stream.rs b/crates/misc/component-async-tests/src/resource_stream.rs index af030fbb6def..3b8bf30dda41 100644 --- a/crates/misc/component-async-tests/src/resource_stream.rs +++ b/crates/misc/component-async-tests/src/resource_stream.rs @@ -1,5 +1,7 @@ use anyhow::Result; -use wasmtime::component::{Accessor, AccessorTask, HostStream, Resource, StreamWriter}; +use wasmtime::component::{ + Accessor, AccessorTask, GuardedStreamWriter, Resource, StreamReader, StreamWriter, +}; use wasmtime_wasi::p2::IoView; use super::Ctx; @@ -36,20 +38,20 @@ impl bindings::local::local::resource_stream::HostWithStore for Ctx { async fn foo( accessor: &Accessor, count: u32, - ) -> wasmtime::Result>> { + ) -> wasmtime::Result>> { struct Task { - tx: StreamWriter>>, + tx: StreamWriter>, count: u32, } impl AccessorTask> for Task { async fn run(self, accessor: &Accessor) -> Result<()> { - let mut tx = self.tx; + let mut tx = GuardedStreamWriter::new(accessor, self.tx); for _ in 0..self.count { let item = accessor.with(|mut view| view.get().table().push(ResourceStreamX))?; - tx.write_all(accessor, Some(item)).await; + tx.write_all(Some(item)).await; } Ok(()) } @@ -57,10 +59,10 @@ impl bindings::local::local::resource_stream::HostWithStore for Ctx { let (tx, rx) = accessor.with(|mut view| { let instance = view.instance(); - instance.stream::<_, _, Option<_>>(&mut view) + instance.stream(&mut view) })?; accessor.spawn(Task { tx, count }); - Ok(rx.into()) + Ok(rx) } } diff --git a/crates/misc/component-async-tests/tests/scenario/streams.rs b/crates/misc/component-async-tests/tests/scenario/streams.rs index e77daf683aaf..f6a5972ae6e5 100644 --- a/crates/misc/component-async-tests/tests/scenario/streams.rs +++ b/crates/misc/component-async-tests/tests/scenario/streams.rs @@ -14,7 +14,10 @@ use { }, wasmtime::{ Engine, Store, - component::{Linker, ResourceTable, StreamReader, StreamWriter, VecBuffer}, + component::{ + GuardedFutureReader, GuardedStreamReader, GuardedStreamWriter, Linker, ResourceTable, + VecBuffer, + }, }, wasmtime_wasi::p2::WasiCtxBuilder, }; @@ -46,108 +49,114 @@ pub async fn async_watch_streams() -> Result<()> { let instance = linker.instantiate_async(&mut store, &component).await?; // Test watching and then dropping the read end of a stream. - let (mut tx, rx) = instance.stream::, Option<_>>(&mut store)?; + let (mut tx, rx) = instance.stream::(&mut store)?; instance .run_concurrent(&mut store, async |store| { - futures::join!(tx.watch_reader(store), async { - drop(rx); - }); + futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1 }) - .await?; + .await??; // Test dropping and then watching the read end of a stream. - let (mut tx, rx) = instance.stream::, Option<_>>(&mut store)?; - drop(rx); + let (mut tx, rx) = instance.stream::(&mut store)?; instance - .run_concurrent(&mut store, async |store| tx.watch_reader(store).await) - .await?; + .run_concurrent(&mut store, async |store| { + rx.close_with(store)?; + tx.watch_reader(store).await; + anyhow::Ok(()) + }) + .await??; // Test watching and then dropping the write end of a stream. - let (tx, mut rx) = instance.stream::, Option<_>>(&mut store)?; + let (tx, mut rx) = instance.stream::(&mut store)?; instance .run_concurrent(&mut store, async |store| { - futures::join!(rx.watch_writer(store), async { - drop(tx); - }); + futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1 }) - .await?; + .await??; // Test dropping and then watching the write end of a stream. - let (tx, mut rx) = instance.stream::, Option<_>>(&mut store)?; - drop(tx); + let (tx, mut rx) = instance.stream::(&mut store)?; instance - .run_concurrent(&mut store, async |store| rx.watch_writer(store).await) - .await?; + .run_concurrent(&mut store, async |store| { + tx.close_with(store)?; + rx.watch_writer(store).await; + anyhow::Ok(()) + }) + .await??; // Test watching and then dropping the read end of a future. - let (mut tx, rx) = instance.future::(|| 42, &mut store)?; + let (mut tx, rx) = instance.future::(&mut store, || 42)?; instance .run_concurrent(&mut store, async |store| { - futures::join!(tx.watch_reader(store), async { - drop(rx); - }); + futures::join!(tx.watch_reader(store), async { rx.close_with(store) }).1 }) - .await?; + .await??; // Test dropping and then watching the read end of a future. - let (mut tx, rx) = instance.future::(|| 42, &mut store)?; - drop(rx); + let (mut tx, rx) = instance.future::(&mut store, || 42)?; instance - .run_concurrent(&mut store, async |store| tx.watch_reader(store).await) - .await?; + .run_concurrent(&mut store, async |store| { + rx.close_with(store)?; + tx.watch_reader(store).await; + anyhow::Ok(()) + }) + .await??; // Test watching and then dropping the write end of a future. - let (tx, mut rx) = instance.future::(|| 42, &mut store)?; + let (tx, mut rx) = instance.future::(&mut store, || 42)?; instance .run_concurrent(&mut store, async |store| { - futures::join!(rx.watch_writer(store), async { - drop(tx); - }); + futures::join!(rx.watch_writer(store), async { tx.close_with(store) }).1 }) - .await?; + .await??; // Test dropping and then watching the write end of a future. - let (tx, mut rx) = instance.future::(|| 42, &mut store)?; - drop(tx); + let (tx, mut rx) = instance.future::(&mut store, || 42)?; instance - .run_concurrent(&mut store, async |store| rx.watch_writer(store).await) - .await?; + .run_concurrent(&mut store, async |store| { + tx.close_with(store)?; + rx.watch_writer(store).await; + anyhow::Ok(()) + }) + .await??; - enum Event { - Write(Option>>), - Read(Option>>, Option), + enum Event<'a> { + Write(Option>), + Read(Option>, Option), } // Test watching, then writing to, then dropping, then writing again to the // read end of a stream. + let (tx, rx) = instance.stream(&mut store)?; instance - .run_concurrent(&mut store, async |store| -> wasmtime::Result<_> { + .run_concurrent(&mut store, async move |store| -> wasmtime::Result<_> { + let mut tx = GuardedStreamWriter::new(store, tx); + let mut rx = GuardedStreamReader::new(store, rx); let mut futures = FuturesUnordered::new(); - let (mut tx, mut rx) = store.with(|s| instance.stream(s))?; assert!( - pin!(tx.watch_reader(store)) + pin!(tx.watch_reader()) .poll(&mut Context::from_waker(&Waker::noop())) .is_pending() ); futures.push( async move { - tx.write_all(store, Some(42)).await; + tx.write_all(Some(42)).await; let w = if tx.is_closed() { None } else { Some(tx) }; - Event::Write(w) + anyhow::Ok(Event::Write(w)) } .boxed(), ); futures.push( async move { - let b = rx.read(store, None).await; + let b = rx.read(None).await; let r = if rx.is_closed() { None } else { Some(rx) }; - Event::Read(r, b) + Ok(Event::Read(r, b)) } .boxed(), ); let mut rx = None; let mut tx = None; - while let Some(event) = futures.next().await { + while let Some(event) = futures.try_next().await? { match event { Event::Write(None) => unreachable!(), Event::Write(Some(new_tx)) => tx = Some(new_tx), @@ -161,8 +170,8 @@ pub async fn async_watch_streams() -> Result<()> { drop(rx); let mut tx = tx.take().unwrap(); - tx.watch_reader(store).await; - tx.write_all(store, Some(42)).await; + tx.watch_reader().await; + tx.write_all(Some(42)).await; assert!(tx.is_closed()); Ok(()) }) @@ -206,10 +215,10 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { let instance = linker.instantiate_async(&mut store, &component).await?; - enum StreamEvent { - FirstWrite(Option>>), - FirstRead(Option>>, Vec), - SecondWrite(Option>>), + enum StreamEvent<'a> { + FirstWrite(Option>), + FirstRead(Option>, Vec), + SecondWrite(Option>), GuestCompleted, } @@ -225,83 +234,95 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { let value = 42_u8; // First, test stream host->host - instance - .run_concurrent(&mut store, async |store| -> wasmtime::Result<_> { - let (mut tx, mut rx) = store.with(|mut s| instance.stream(&mut s))?; + { + let (tx, rx) = instance.stream(&mut store)?; + let values = values.clone(); - let mut futures = FuturesUnordered::new(); - futures.push({ - let values = values.clone(); - async move { - tx.write_all(store, values.into()).await; - StreamEvent::FirstWrite(if tx.is_closed() { None } else { Some(tx) }) - } - .boxed() - }); - futures.push( - async move { - let b = rx.read(store, Vec::with_capacity(3)).await; - let r = if rx.is_closed() { None } else { Some(rx) }; - StreamEvent::FirstRead(r, b) - } - .boxed(), - ); + instance + .run_concurrent(&mut store, async move |store| -> wasmtime::Result<_> { + let mut tx = GuardedStreamWriter::new(store, tx); + let mut rx = GuardedStreamReader::new(store, rx); - let mut count = 0; - while let Some(event) = futures.next().await { - count += 1; - match event { - StreamEvent::FirstWrite(Some(mut tx)) => { - if watch { - futures.push( - async move { - tx.watch_reader(store).await; - StreamEvent::SecondWrite(None) - } - .boxed(), - ); + let mut futures = FuturesUnordered::new(); + futures.push({ + let values = values.clone(); + async move { + tx.write_all(VecBuffer::from(values)).await; + anyhow::Ok(StreamEvent::FirstWrite(if tx.is_closed() { + None } else { - futures.push({ - let values = values.clone(); - async move { - tx.write_all(store, values.into()).await; - StreamEvent::SecondWrite(if tx.is_closed() { - None - } else { - Some(tx) - }) - } - .boxed() - }); - } + Some(tx) + })) } - StreamEvent::FirstWrite(None) => { - panic!("first write should have been accepted") - } - StreamEvent::FirstRead(Some(_), results) => { - assert_eq!(values, results); + .boxed() + }); + futures.push( + async move { + let b = rx.read(Vec::with_capacity(3)).await; + let r = if rx.is_closed() { None } else { Some(rx) }; + Ok(StreamEvent::FirstRead(r, b)) } - StreamEvent::FirstRead(None, _) => unreachable!(), - StreamEvent::SecondWrite(None) => {} - StreamEvent::SecondWrite(Some(_)) => { - panic!("second write should _not_ have been accepted") + .boxed(), + ); + + let mut count = 0; + while let Some(event) = futures.try_next().await? { + count += 1; + match event { + StreamEvent::FirstWrite(Some(mut tx)) => { + if watch { + futures.push( + async move { + tx.watch_reader().await; + Ok(StreamEvent::SecondWrite(None)) + } + .boxed(), + ); + } else { + futures.push({ + let values = values.clone(); + async move { + tx.write_all(VecBuffer::from(values)).await; + Ok(StreamEvent::SecondWrite(if tx.is_closed() { + None + } else { + Some(tx) + })) + } + .boxed() + }); + } + } + StreamEvent::FirstWrite(None) => { + panic!("first write should have been accepted") + } + StreamEvent::FirstRead(Some(_), results) => { + assert_eq!(values, results); + } + StreamEvent::FirstRead(None, _) => unreachable!(), + StreamEvent::SecondWrite(None) => {} + StreamEvent::SecondWrite(Some(_)) => { + panic!("second write should _not_ have been accepted") + } + StreamEvent::GuestCompleted => unreachable!(), } - StreamEvent::GuestCompleted => unreachable!(), } - } - assert_eq!(count, 3); - Ok(()) - }) - .await??; + assert_eq!(count, 3); + Ok(()) + }) + .await??; + } // Next, test futures host->host { - let (tx, rx) = instance.future(|| unreachable!(), &mut store)?; - let (mut tx_ignored, rx_ignored) = instance.future(|| 42u8, &mut store)?; + let (tx, rx) = instance.future(&mut store, || unreachable!())?; + let (mut tx_ignored, rx_ignored) = instance.future(&mut store, || unreachable!())?; instance - .run_concurrent(&mut store, async |store| { + .run_concurrent(&mut store, async move |store| { + let rx_ignored = GuardedFutureReader::new(store, rx_ignored); + let mut futures = FuturesUnordered::new(); futures.push(tx.write(store, value).map(FutureEvent::Write).boxed()); futures.push(rx.read(store).map(FutureEvent::Read).boxed()); @@ -348,24 +369,28 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { // Next, test stream host->guest { - let (mut tx, rx) = instance.stream::<_, _, Vec<_>>(&mut store)?; + let (tx, rx) = instance.stream(&mut store)?; let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?; + let values = values.clone(); + instance .run_concurrent(&mut store, async move |accessor| { + let mut tx = GuardedStreamWriter::new(accessor, tx); + let mut futures = FuturesUnordered::new(); futures.push( closed_streams .local_local_closed() - .call_read_stream(accessor, rx.into(), values.clone()) + .call_read_stream(accessor, rx, values.clone()) .map(|v| v.map(|()| StreamEvent::GuestCompleted)) .boxed(), ); futures.push({ let values = values.clone(); async move { - tx.write_all(accessor, values.into()).await; + tx.write_all(VecBuffer::from(values)).await; let w = if tx.is_closed() { None } else { Some(tx) }; Ok(StreamEvent::FirstWrite(w)) } @@ -380,7 +405,7 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { if watch { futures.push( async move { - tx.watch_reader(accessor).await; + tx.watch_reader().await; Ok(StreamEvent::SecondWrite(None)) } .boxed(), @@ -389,7 +414,7 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { futures.push({ let values = values.clone(); async move { - tx.write_all(accessor, values.into()).await; + tx.write_all(VecBuffer::from(values)).await; let w = if tx.is_closed() { None } else { Some(tx) }; Ok(StreamEvent::SecondWrite(w)) } @@ -418,8 +443,8 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { // Next, test futures host->guest { - let (tx, rx) = instance.future(|| unreachable!(), &mut store)?; - let (mut tx_ignored, rx_ignored) = instance.future(|| 0, &mut store)?; + let (tx, rx) = instance.future(&mut store, || unreachable!())?; + let (mut tx_ignored, rx_ignored) = instance.future(&mut store, || unreachable!())?; let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?; @@ -429,7 +454,7 @@ pub async fn test_closed_streams(watch: bool) -> Result<()> { futures.push( closed_streams .local_local_closed() - .call_read_future(accessor, rx.into(), value, rx_ignored.into()) + .call_read_future(accessor, rx, value, rx_ignored) .map(|v| v.map(|()| FutureEvent::GuestCompleted)) .boxed(), ); diff --git a/crates/misc/component-async-tests/tests/scenario/transmit.rs b/crates/misc/component-async-tests/tests/scenario/transmit.rs index 038c0e4cc8fc..6f40a9f083a0 100644 --- a/crates/misc/component-async-tests/tests/scenario/transmit.rs +++ b/crates/misc/component-async-tests/tests/scenario/transmit.rs @@ -12,8 +12,8 @@ use futures::{ stream::{FuturesUnordered, TryStreamExt}, }; use wasmtime::component::{ - Accessor, Component, HasSelf, HostFuture, HostStream, Instance, Linker, ResourceTable, - StreamReader, StreamWriter, Val, + Accessor, Component, FutureReader, GuardedFutureReader, GuardedStreamReader, + GuardedStreamWriter, HasSelf, Instance, Linker, ResourceTable, StreamReader, Val, }; use wasmtime::{AsContextMut, Engine, Store}; use wasmtime_wasi::p2::WasiCtxBuilder; @@ -175,17 +175,21 @@ pub trait TransmitTest { ) -> impl Future> + Send + 'a; fn into_params( - control: HostStream, - caller_stream: HostStream, - caller_future1: HostFuture, - caller_future2: HostFuture, + control: StreamReader, + caller_stream: StreamReader, + caller_future1: FutureReader, + caller_future2: FutureReader, ) -> Self::Params; fn from_result( store: impl AsContextMut, instance: Instance, result: Self::Result, - ) -> Result<(HostStream, HostFuture, HostFuture)>; + ) -> Result<( + StreamReader, + FutureReader, + FutureReader, + )>; } struct StaticTransmitTest; @@ -193,12 +197,16 @@ struct StaticTransmitTest; impl TransmitTest for StaticTransmitTest { type Instance = transmit::bindings::TransmitCallee; type Params = ( - HostStream, - HostStream, - HostFuture, - HostFuture, + StreamReader, + StreamReader, + FutureReader, + FutureReader, + ); + type Result = ( + StreamReader, + FutureReader, + FutureReader, ); - type Result = (HostStream, HostFuture, HostFuture); async fn instantiate( mut store: impl AsContextMut, @@ -221,10 +229,10 @@ impl TransmitTest for StaticTransmitTest { } fn into_params( - control: HostStream, - caller_stream: HostStream, - caller_future1: HostFuture, - caller_future2: HostFuture, + control: StreamReader, + caller_stream: StreamReader, + caller_future1: FutureReader, + caller_future2: FutureReader, ) -> Self::Params { (control, caller_stream, caller_future1, caller_future2) } @@ -233,7 +241,11 @@ impl TransmitTest for StaticTransmitTest { _: impl AsContextMut, _: Instance, result: Self::Result, - ) -> Result<(HostStream, HostFuture, HostFuture)> { + ) -> Result<( + StreamReader, + FutureReader, + FutureReader, + )> { Ok(result) } } @@ -283,10 +295,10 @@ impl TransmitTest for DynamicTransmitTest { } fn into_params( - control: HostStream, - caller_stream: HostStream, - caller_future1: HostFuture, - caller_future2: HostFuture, + control: StreamReader, + caller_stream: StreamReader, + caller_future1: FutureReader, + caller_future2: FutureReader, ) -> Self::Params { vec![ control.into_val(), @@ -300,13 +312,17 @@ impl TransmitTest for DynamicTransmitTest { mut store: impl AsContextMut, instance: Instance, result: Self::Result, - ) -> Result<(HostStream, HostFuture, HostFuture)> { + ) -> Result<( + StreamReader, + FutureReader, + FutureReader, + )> { let Val::Tuple(fields) = result else { unreachable!() }; - let stream = HostStream::from_val(store.as_context_mut(), instance, &fields[0])?; - let future1 = HostFuture::from_val(store.as_context_mut(), instance, &fields[1])?; - let future2 = HostFuture::from_val(store.as_context_mut(), instance, &fields[2])?; + let stream = StreamReader::from_val(store.as_context_mut(), instance, &fields[0])?; + let future1 = FutureReader::from_val(store.as_context_mut(), instance, &fields[1])?; + let future2 = FutureReader::from_val(store.as_context_mut(), instance, &fields[2])?; Ok((stream, future1, future2)) } } @@ -341,29 +357,32 @@ async fn test_transmit_with(component: &str) -> Re let (test, instance) = Test::instantiate(&mut store, &component, &linker).await?; - enum Event { + enum Event<'a, Test: TransmitTest> { Result(Test::Result), - ControlWriteA(Option>>), - ControlWriteB(Option>>), - ControlWriteC(Option>>), + ControlWriteA(Option>), + ControlWriteB(Option>), + ControlWriteC(Option>), ControlWriteD, WriteA, WriteB(bool), - ReadC(Option>>, Option), + ReadC(Option>, Option), ReadD(Option), - ReadNone(Option>>), + ReadNone(Option>), } - let (mut control_tx, control_rx) = instance.stream::<_, _, Option<_>>(&mut store)?; - let (mut caller_stream_tx, caller_stream_rx) = - instance.stream::<_, _, Option<_>>(&mut store)?; - let (caller_future1_tx, caller_future1_rx) = instance.future(|| unreachable!(), &mut store)?; - let (_caller_future2_tx, caller_future2_rx) = instance.future(|| unreachable!(), &mut store)?; + let (control_tx, control_rx) = instance.stream(&mut store)?; + let (caller_stream_tx, caller_stream_rx) = instance.stream(&mut store)?; + let (caller_future1_tx, caller_future1_rx) = instance.future(&mut store, || unreachable!())?; + let (_caller_future2_tx, caller_future2_rx) = instance.future(&mut store, || unreachable!())?; instance .run_concurrent(&mut store, async move |accessor| { + let mut control_tx = GuardedStreamWriter::new(accessor, control_tx); + let control_rx = GuardedStreamReader::new(accessor, control_rx); + let mut caller_stream_tx = GuardedStreamWriter::new(accessor, caller_stream_tx); + let mut futures = FuturesUnordered::< - Pin>> + Send>>, + Pin>> + Send>>, >::new(); let mut caller_future1_tx = Some(caller_future1_tx); let mut callee_stream_rx = None; @@ -373,7 +392,7 @@ async fn test_transmit_with(component: &str) -> Re futures.push( async move { control_tx - .write_all(accessor, Some(Control::ReadStream("a".into()))) + .write_all(Some(Control::ReadStream("a".into()))) .await; let w = if control_tx.is_closed() { None @@ -387,9 +406,7 @@ async fn test_transmit_with(component: &str) -> Re futures.push( async move { - caller_stream_tx - .write_all(accessor, Some(String::from("a"))) - .await; + caller_stream_tx.write_all(Some(String::from("a"))).await; Ok(Event::WriteA) } .boxed(), @@ -401,9 +418,9 @@ async fn test_transmit_with(component: &str) -> Re &test, Test::into_params( control_rx.into(), - caller_stream_rx.into(), - caller_future1_rx.into(), - caller_future2_rx.into(), + caller_stream_rx, + caller_future1_rx, + caller_future2_rx, ), ) .map(|v| v.map(Event::Result)) @@ -413,19 +430,16 @@ async fn test_transmit_with(component: &str) -> Re while let Some(event) = futures.try_next().await? { match event { Event::Result(result) => { - accessor.with(|mut store| { - let results = Test::from_result(&mut store, instance, result)?; - callee_stream_rx = Some(results.0.into_reader(&mut store)); - callee_future1_rx = Some(results.1.into_reader(&mut store)); - anyhow::Ok(()) - })?; + let (stream_rx, future_rx, _) = accessor + .with(|mut store| Test::from_result(&mut store, instance, result))?; + callee_stream_rx = Some(GuardedStreamReader::new(accessor, stream_rx)); + callee_future1_rx = Some(GuardedFutureReader::new(accessor, future_rx)); } Event::ControlWriteA(tx) => { futures.push( async move { let mut tx = tx.unwrap(); - tx.write_all(accessor, Some(Control::ReadFuture("b".into()))) - .await; + tx.write_all(Some(Control::ReadFuture("b".into()))).await; let w = if tx.is_closed() { None } else { Some(tx) }; Ok(Event::ControlWriteB(w)) } @@ -447,8 +461,7 @@ async fn test_transmit_with(component: &str) -> Re futures.push( async move { let mut tx = tx.unwrap(); - tx.write_all(accessor, Some(Control::WriteStream("c".into()))) - .await; + tx.write_all(Some(Control::WriteStream("c".into()))).await; let w = if tx.is_closed() { None } else { Some(tx) }; Ok(Event::ControlWriteC(w)) } @@ -460,7 +473,7 @@ async fn test_transmit_with(component: &str) -> Re let mut rx = callee_stream_rx.take().unwrap(); futures.push( async move { - let b = rx.read(accessor, None).await; + let b = rx.read(None).await; let r = if rx.is_closed() { None } else { Some(rx) }; Ok(Event::ReadC(r, b)) } @@ -471,8 +484,7 @@ async fn test_transmit_with(component: &str) -> Re futures.push( async move { let mut tx = tx.unwrap(); - tx.write_all(accessor, Some(Control::WriteFuture("d".into()))) - .await; + tx.write_all(Some(Control::WriteFuture("d".into()))).await; Ok(Event::ControlWriteD) } .boxed(), @@ -485,7 +497,7 @@ async fn test_transmit_with(component: &str) -> Re callee_future1_rx .take() .unwrap() - .read(accessor) + .read() .map(Event::ReadD) .map(Ok) .boxed(), @@ -499,7 +511,7 @@ async fn test_transmit_with(component: &str) -> Re let mut rx = callee_stream_rx.take().unwrap(); futures.push( async move { - rx.read(accessor, None).await; + rx.read(None).await; let r = if rx.is_closed() { None } else { Some(rx) }; Ok(Event::ReadNone(r)) } diff --git a/crates/wasi/src/p3/cli/host.rs b/crates/wasi/src/p3/cli/host.rs index c37a569594ee..b5d25fb4e1af 100644 --- a/crates/wasi/src/p3/cli/host.rs +++ b/crates/wasi/src/p3/cli/host.rs @@ -11,12 +11,13 @@ use bytes::BytesMut; use std::io::Cursor; use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _}; use wasmtime::component::{ - Accessor, AccessorTask, HasData, HostStream, Resource, StreamReader, StreamWriter, + Accessor, AccessorTask, GuardedStreamReader, GuardedStreamWriter, HasData, Resource, + StreamReader, StreamWriter, }; struct InputTask { rx: T, - tx: StreamWriter>, + tx: StreamWriter, } impl AccessorTask> for InputTask @@ -26,15 +27,12 @@ where { async fn run(mut self, store: &Accessor) -> wasmtime::Result<()> { let mut buf = BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY); - while !self.tx.is_closed() { + let mut tx = GuardedStreamWriter::new(store, self.tx); + while !tx.is_closed() { match self.rx.read_buf(&mut buf).await { Ok(0) => return Ok(()), Ok(_) => { - buf = self - .tx - .write_all(store, Cursor::new(buf)) - .await - .into_inner(); + buf = tx.write_all(Cursor::new(buf)).await.into_inner(); buf.clear(); } Err(_err) => { @@ -48,7 +46,7 @@ where } struct OutputTask { - rx: StreamReader, + rx: StreamReader, tx: T, } @@ -59,8 +57,9 @@ where { async fn run(mut self, store: &Accessor) -> wasmtime::Result<()> { let mut buf = BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY); - while !self.rx.is_closed() { - buf = self.rx.read(store, buf).await; + let mut rx = GuardedStreamReader::new(store, self.rx); + while !rx.is_closed() { + buf = rx.read(buf).await; match self.tx.write_all(&buf).await { Ok(()) => { buf.clear(); @@ -140,18 +139,18 @@ impl terminal_stderr::Host for WasiCliCtxView<'_> { } impl stdin::HostWithStore for WasiCli { - async fn get_stdin(store: &Accessor) -> wasmtime::Result> { + async fn get_stdin(store: &Accessor) -> wasmtime::Result> { store.with(|mut view| { let instance = view.instance(); let (tx, rx) = instance - .stream::<_, _, BytesMut>(&mut view) + .stream(&mut view) .context("failed to create stream")?; let stdin = view.get().ctx.stdin.reader(); view.spawn(InputTask { rx: Box::into_pin(stdin), tx, }); - Ok(rx.into()) + Ok(rx) }) } } @@ -161,13 +160,12 @@ impl stdin::Host for WasiCliCtxView<'_> {} impl stdout::HostWithStore for WasiCli { async fn set_stdout( store: &Accessor, - data: HostStream, + data: StreamReader, ) -> wasmtime::Result<()> { store.with(|mut view| { - let stdout = data.into_reader(&mut view); let tx = view.get().ctx.stdout.writer(); view.spawn(OutputTask { - rx: stdout, + rx: data, tx: Box::into_pin(tx), }); Ok(()) @@ -180,13 +178,12 @@ impl stdout::Host for WasiCliCtxView<'_> {} impl stderr::HostWithStore for WasiCli { async fn set_stderr( store: &Accessor, - data: HostStream, + data: StreamReader, ) -> wasmtime::Result<()> { store.with(|mut view| { - let stderr = data.into_reader(&mut view); let tx = view.get().ctx.stderr.writer(); view.spawn(OutputTask { - rx: stderr, + rx: data, tx: Box::into_pin(tx), }); Ok(()) diff --git a/crates/wasi/src/p3/sockets/host/types/tcp.rs b/crates/wasi/src/p3/sockets/host/types/tcp.rs index 530fa3fbe144..82cd50a79c7f 100644 --- a/crates/wasi/src/p3/sockets/host/types/tcp.rs +++ b/crates/wasi/src/p3/sockets/host/types/tcp.rs @@ -14,8 +14,8 @@ use io_lifetimes::AsSocketlike as _; use rustix::io::Errno; use tokio::net::{TcpListener, TcpStream}; use wasmtime::component::{ - Accessor, AccessorTask, FutureWriter, HostFuture, HostStream, Resource, ResourceTable, - StreamWriter, + Accessor, AccessorTask, FutureReader, FutureWriter, GuardedFutureWriter, GuardedStreamWriter, + Resource, ResourceTable, StreamReader, StreamWriter, }; use crate::p3::DEFAULT_BUFFER_CAPACITY; @@ -57,16 +57,17 @@ fn get_socket_mut<'a>( struct ListenTask { listener: Arc, family: SocketAddressFamily, - tx: StreamWriter>>, + tx: StreamWriter>, options: NonInheritedOptions, } impl AccessorTask> for ListenTask { - async fn run(mut self, store: &Accessor) -> wasmtime::Result<()> { - while !self.tx.is_closed() { + async fn run(self, store: &Accessor) -> wasmtime::Result<()> { + let mut tx = GuardedStreamWriter::new(store, self.tx); + while !tx.is_closed() { let Some(res) = ({ let mut accept = pin!(self.listener.accept()); - let mut tx = pin!(self.tx.watch_reader(store)); + let mut tx = pin!(tx.watch_reader()); poll_fn(|cx| match tx.as_mut().poll(cx) { Poll::Ready(()) => return Poll::Ready(None), Poll::Pending => accept.as_mut().poll(cx).map(Some), @@ -121,8 +122,8 @@ impl AccessorTask> for ListenTask { .push(TcpSocket::from_state(state, self.family)) .context("failed to push socket resource to table") })?; - if let Some(socket) = self.tx.write(store, Some(socket)).await { - debug_assert!(self.tx.is_closed()); + if let Some(socket) = tx.write(Some(socket)).await { + debug_assert!(tx.is_closed()); store.with(|mut view| { view.get() .table @@ -143,32 +144,32 @@ struct ResultWriteTask { impl AccessorTask> for ResultWriteTask { async fn run(self, store: &Accessor) -> wasmtime::Result<()> { - self.result_tx.write(store, self.result).await; + GuardedFutureWriter::new(store, self.result_tx) + .write(self.result) + .await; Ok(()) } } struct ReceiveTask { stream: Arc, - data_tx: StreamWriter>, + data_tx: StreamWriter, result_tx: FutureWriter>, } impl AccessorTask> for ReceiveTask { - async fn run(mut self, store: &Accessor) -> wasmtime::Result<()> { + async fn run(self, store: &Accessor) -> wasmtime::Result<()> { let mut buf = BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY); + let mut data_tx = GuardedStreamWriter::new(store, self.data_tx); + let result_tx = GuardedFutureWriter::new(store, self.result_tx); let res = loop { match self.stream.try_read_buf(&mut buf) { Ok(0) => { break Ok(()); } Ok(..) => { - buf = self - .data_tx - .write_all(store, Cursor::new(buf)) - .await - .into_inner(); - if self.data_tx.is_closed() { + buf = data_tx.write_all(Cursor::new(buf)).await.into_inner(); + if data_tx.is_closed() { break Ok(()); } buf.clear(); @@ -176,7 +177,7 @@ impl AccessorTask> for ReceiveTask { Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { let Some(res) = ({ let mut readable = pin!(self.stream.readable()); - let mut tx = pin!(self.data_tx.watch_reader(store)); + let mut tx = pin!(data_tx.watch_reader()); poll_fn(|cx| match tx.as_mut().poll(cx) { Poll::Ready(()) => return Poll::Ready(None), Poll::Pending => readable.as_mut().poll(cx).map(Some), @@ -203,7 +204,7 @@ impl AccessorTask> for ReceiveTask { // task are freed store.spawn(ResultWriteTask { result: res, - result_tx: self.result_tx, + result_tx: result_tx.into(), }); Ok(()) } @@ -284,14 +285,10 @@ impl HostTcpSocketWithStore for WasiSockets { async fn listen( store: &Accessor, socket: Resource, - ) -> wasmtime::Result>, ErrorCode>> { + ) -> wasmtime::Result>, ErrorCode>> { store.with(|mut view| { - let (tx, rx) = view - .instance() - .stream::<_, _, Option<_>>(&mut view) - .context("failed to create stream")?; if !view.get().ctx.allowed_network_uses.tcp { - return Ok(Err(ErrorCode::AccessDenied)); + return anyhow::Ok(Err(ErrorCode::AccessDenied)); } let TcpSocket { tcp_state, @@ -328,24 +325,29 @@ impl HostTcpSocketWithStore for WasiSockets { }; let listener = Arc::new(listener); *tcp_state = TcpState::Listening(Arc::clone(&listener)); + let family = *family; + let options = options.clone(); + let (tx, rx) = view + .instance() + .stream(&mut view) + .context("failed to create stream")?; let task = ListenTask { listener, - family: *family, + family, tx, - options: options.clone(), + options, }; view.spawn(task); - Ok(Ok(rx.into())) + Ok(Ok(rx)) }) } async fn send( store: &Accessor, socket: Resource, - data: HostStream, + data: StreamReader, ) -> wasmtime::Result> { let (stream, mut data) = match store.with(|mut view| -> wasmtime::Result<_> { - let data = data.into_reader::>(&mut view); let sock = get_socket(view.get().table, &socket)?; if let TcpState::Connected(stream) | TcpState::Receiving(stream) = &sock.tcp_state { Ok(Ok((Arc::clone(&stream), data))) @@ -387,32 +389,34 @@ impl HostTcpSocketWithStore for WasiSockets { async fn receive( store: &Accessor, socket: Resource, - ) -> wasmtime::Result<(HostStream, HostFuture>)> { + ) -> wasmtime::Result<(StreamReader, FutureReader>)> { store.with(|mut view| { let instance = view.instance(); let (data_tx, data_rx) = instance - .stream::<_, _, BytesMut>(&mut view) + .stream(&mut view) .context("failed to create stream")?; let TcpSocket { tcp_state, .. } = get_socket_mut(view.get().table, &socket)?; match mem::replace(tcp_state, TcpState::Closed) { TcpState::Connected(stream) => { *tcp_state = TcpState::Receiving(Arc::clone(&stream)); let (result_tx, result_rx) = instance - .future(|| unreachable!(), &mut view) + .future(&mut view, || unreachable!()) .context("failed to create future")?; view.spawn(ReceiveTask { stream, data_tx, result_tx, }); - Ok((data_rx.into(), result_rx.into())) + Ok((data_rx, result_rx)) } prev => { *tcp_state = prev; - let (_, result_rx) = instance - .future(|| Err(ErrorCode::InvalidState), &mut view) + let (result_tx, result_rx) = instance + .future(&mut view, || Err(ErrorCode::InvalidState)) .context("failed to create future")?; - Ok((data_rx.into(), result_rx.into())) + result_tx.close(&mut view)?; + data_tx.close(&mut view)?; + Ok((data_rx, result_rx)) } } }) diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index 13108033678e..043f44cc70c6 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -69,7 +69,8 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::fmt; use std::future::Future; use std::marker::PhantomData; -use std::mem::{self, MaybeUninit}; +use std::mem::{self, ManuallyDrop, MaybeUninit}; +use std::ops::DerefMut; use std::pin::{Pin, pin}; use std::ptr::{self, NonNull}; use std::slice; @@ -88,8 +89,9 @@ use wasmtime_environ::component::{ pub use abort::AbortHandle; pub use futures_and_streams::{ - ErrorContext, FutureReader, FutureWriter, HostFuture, HostStream, ReadBuffer, StreamReader, - StreamWriter, VecBuffer, Watch, WriteBuffer, + ErrorContext, FutureReader, FutureWriter, GuardedFutureReader, GuardedFutureWriter, + GuardedStreamReader, GuardedStreamWriter, ReadBuffer, StreamReader, StreamWriter, VecBuffer, + WriteBuffer, }; pub(crate) use futures_and_streams::{ ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index, @@ -100,7 +102,7 @@ mod error_contexts; mod futures_and_streams; mod states; mod table; -mod tls; +pub(crate) mod tls; /// Constant defined in the Component Model spec to indicate that the async /// intrinsic (e.g. `future.write`) has not yet completed. @@ -228,7 +230,7 @@ where where T: 'static, { - self.accessor.instance.spawn_with_accessor( + self.accessor.instance.unwrap().spawn_with_accessor( self.store.as_context_mut(), self.accessor.clone_for_spawn(), task, @@ -328,7 +330,7 @@ where { token: StoreToken, get_data: fn(&mut T) -> D::Data<'_>, - instance: Instance, + instance: Option, } /// A helper trait to take any type of accessor-with-data in functions. @@ -415,7 +417,7 @@ impl Accessor { /// /// - `instance`: used to access the `Instance` to which this `Accessor` /// (and the future which closes over it) belongs - fn new(token: StoreToken, instance: Instance) -> Self { + pub(crate) fn new(token: StoreToken, instance: Option) -> Self { Self { token, get_data: |x| x, @@ -506,7 +508,7 @@ where where T: 'static, { - let instance = self.instance; + let instance = self.instance.unwrap(); let accessor = self.clone_for_spawn(); self.with(|mut access| { instance.spawn_with_accessor(access.as_context_mut(), accessor, task) @@ -515,7 +517,7 @@ where /// Retrieve the component instance of the caller. pub fn instance(&self) -> Instance { - self.instance + self.instance.unwrap() } fn clone_for_spawn(&self) -> Self { @@ -1291,11 +1293,33 @@ impl Instance { let mut store = store.as_context_mut(); let token = StoreToken::new(store.as_context_mut()); - self.poll_until(store.as_context_mut(), async move { - let accessor = Accessor::new(token, self); - fun(&accessor).await - }) - .await + struct Dropper<'a, T: 'static, V> { + store: StoreContextMut<'a, T>, + value: ManuallyDrop, + } + + impl<'a, T, V> Drop for Dropper<'a, T, V> { + fn drop(&mut self) { + tls::set(self.store.0.traitobj_mut(), || { + // SAFETY: Here we drop the value without moving it for the + // first and only time -- per the contract for `Drop::drop`, + // this code won't run again, and the `value` field will no + // longer be accessible. + unsafe { ManuallyDrop::drop(&mut self.value) } + }); + } + } + + let accessor = &Accessor::new(token, Some(self)); + let dropper = &mut Dropper { + store, + value: ManuallyDrop::new(fun(accessor)), + }; + // SAFETY: We never move `dropper` nor its `value` field. + let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) }; + + self.poll_until(dropper.store.as_context_mut(), future) + .await } /// Spawn a background task to run as part of this instance's event loop. @@ -1313,7 +1337,7 @@ impl Instance { task: impl AccessorTask, Result<()>>, ) -> AbortHandle { let mut store = store.as_context_mut(); - let accessor = Accessor::new(StoreToken::new(store.as_context_mut()), self); + let accessor = Accessor::new(StoreToken::new(store.as_context_mut()), Some(self)); self.spawn_with_accessor(store, accessor, task) } @@ -1352,10 +1376,8 @@ impl Instance { async fn poll_until( self, store: StoreContextMut<'_, T>, - future: impl Future, + mut future: Pin<&mut impl Future>, ) -> Result { - let mut future = pin!(future); - loop { // Take `ConcurrentState::futures` out of the instance so we can // poll it while also safely giving any of the futures inside access @@ -1408,34 +1430,49 @@ impl Instance { // outer loop in case there is another one ready to // complete. Poll::Ready(true) => Poll::Ready(Ok(Either::Right(Vec::new()))), - // In this case, there are no more pending futures - // in `ConcurrentState::futures`, there are no - // remaining work items, _and_ the future we were - // passed as an argument still hasn't completed, - // meaning we're stuck, so we return an error. The - // underlying assumption is that `future` depends on - // this component instance making such progress, and - // thus there's no point in continuing to poll it - // given we've run out of work to do. - // - // Note that we'd also reach this point if the host - // embedder passed e.g. a `std::future::Pending` to - // `Instance::run_concurrent`, in which case we'd - // return a "deadlock" error even when any and all - // tasks have completed normally. However, that's - // not how `Instance::run_concurrent` is intended - // (and documented) to be used, so it seems - // reasonable to lump that case in with "real" - // deadlocks. - // - // TODO: Once we've added host APIs for cancelling - // in-progress tasks, we can return some other, - // non-error value here, treating it as "normal" and - // giving the host embedder a chance to intervene by - // cancelling one or more tasks and/or starting new - // tasks capable of waking the existing ones. Poll::Ready(false) => { - Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock))) + // Poll the future we were passed one last time + // in case one of `ConcurrentState::futures` had + // the side effect of unblocking it. + if let Poll::Ready(value) = + self.set_tls(store.0, || future.as_mut().poll(cx)) + { + Poll::Ready(Ok(Either::Left(value))) + } else { + // In this case, there are no more pending + // futures in `ConcurrentState::futures`, + // there are no remaining work items, _and_ + // the future we were passed as an argument + // still hasn't completed, meaning we're + // stuck, so we return an error. The + // underlying assumption is that `future` + // depends on this component instance making + // such progress, and thus there's no point + // in continuing to poll it given we've run + // out of work to do. + // + // Note that we'd also reach this point if + // the host embedder passed e.g. a + // `std::future::Pending` to + // `Instance::run_concurrent`, in which case + // we'd return a "deadlock" error even when + // any and all tasks have completed + // normally. However, that's not how + // `Instance::run_concurrent` is intended + // (and documented) to be used, so it seems + // reasonable to lump that case in with + // "real" deadlocks. + // + // TODO: Once we've added host APIs for + // cancelling in-progress tasks, we can + // return some other, non-error value here, + // treating it as "normal" and giving the + // host embedder a chance to intervene by + // cancelling one or more tasks and/or + // starting new tasks capable of waking the + // existing ones. + Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock))) + } } // There is at least one pending future in // `ConcurrentState::futures` and we have nothing @@ -2445,7 +2482,7 @@ impl Instance { { let token = StoreToken::new(store); async move { - let mut accessor = Accessor::new(token, self); + let mut accessor = Accessor::new(token, Some(self)); closure(&mut accessor).await } } @@ -3224,6 +3261,14 @@ pub trait VMComponentAsyncStore { address: u32, ) -> Result; + /// The `future.drop-writable` intrinsic. + fn future_drop_writable( + &mut self, + instance: Instance, + ty: TypeFutureTableIndex, + writer: u32, + ) -> Result<()>; + /// The `stream.write` intrinsic. fn stream_write( &mut self, @@ -3274,6 +3319,14 @@ pub trait VMComponentAsyncStore { count: u32, ) -> Result; + /// The `stream.drop-writable` intrinsic. + fn stream_drop_writable( + &mut self, + instance: Instance, + ty: TypeStreamTableIndex, + writer: u32, + ) -> Result<()>; + /// The `error-context.debug-message` intrinsic. fn error_context_debug_message( &mut self, @@ -3472,6 +3525,15 @@ impl VMComponentAsyncStore for StoreInner { .map(|result| result.encode()) } + fn future_drop_writable( + &mut self, + instance: Instance, + ty: TypeFutureTableIndex, + writer: u32, + ) -> Result<()> { + instance.guest_drop_writable(StoreContextMut(self), TableIndex::Future(ty), writer) + } + fn flat_stream_write( &mut self, instance: Instance, @@ -3526,6 +3588,15 @@ impl VMComponentAsyncStore for StoreInner { .map(|result| result.encode()) } + fn stream_drop_writable( + &mut self, + instance: Instance, + ty: TypeStreamTableIndex, + writer: u32, + ) -> Result<()> { + instance.guest_drop_writable(StoreContextMut(self), TableIndex::Stream(ty), writer) + } + fn error_context_debug_message( &mut self, instance: Instance, @@ -3545,7 +3616,7 @@ impl VMComponentAsyncStore for StoreInner { } /// Represents the output of a host task or background task. -enum HostTaskOutput { +pub(crate) enum HostTaskOutput { /// A plain result Result(Result<()>), /// A function to be run after the future completes (e.g. post-processing @@ -4185,7 +4256,7 @@ impl ConcurrentState { } } - /// Take ownership of any fibers owned by this object. + /// Take ownership of any fibers and futures owned by this object. /// /// This should be used when disposing of the `Store` containing this object /// in order to gracefully resolve any and all fibers using @@ -4193,33 +4264,58 @@ impl ConcurrentState { /// use-after-free bugs due to fibers which may still have access to the /// `Store`. /// + /// Additionally, the futures collected with this function should be dropped + /// within a `tls::set` call, which will ensure than any futures closing + /// over an `&Accessor` will have access to the store when dropped, allowing + /// e.g. `WithAccessor[AndValue]` instances to be disposed of without + /// panicking. + /// /// Note that this will leave the object in an inconsistent and unusable /// state, so it should only be used just prior to dropping it. - pub(crate) fn take_fibers(&mut self, vec: &mut Vec>) { + pub(crate) fn take_fibers_and_futures( + &mut self, + fibers: &mut Vec>, + futures: &mut Vec>, + ) { for entry in mem::take(&mut self.table) { if let Ok(set) = entry.downcast::() { for mode in set.waiting.into_values() { if let WaitMode::Fiber(fiber) = mode { - vec.push(fiber); + fibers.push(fiber); } } } } if let Some(fiber) = self.worker.take() { - vec.push(fiber); + fibers.push(fiber); } let mut take_items = |list| { for item in mem::take(list) { - if let WorkItem::ResumeFiber(fiber) = item { - vec.push(fiber); + match item { + WorkItem::ResumeFiber(fiber) => { + fibers.push(fiber); + } + WorkItem::PushFuture(future) => { + self.futures + .get_mut() + .unwrap() + .as_mut() + .unwrap() + .push(future.into_inner().unwrap()); + } + _ => {} } } }; take_items(&mut self.high_priority); take_items(&mut self.low_priority); + + if let Some(them) = self.futures.get_mut().unwrap().take() { + futures.push(them); + } } } diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index 8f495b99138a..2809cb6b12f7 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -1,30 +1,30 @@ use super::table::{TableDebug, TableId}; use super::{ - Event, GlobalErrorContextRefCount, HostTaskOutput, LocalErrorContextRefCount, StateTable, - Waitable, WaitableCommon, WaitableState, + Event, GlobalErrorContextRefCount, LocalErrorContextRefCount, StateTable, Waitable, + WaitableCommon, WaitableState, }; -use crate::component::concurrent::{ConcurrentState, tls}; +use crate::component::concurrent::{ConcurrentState, HostTaskOutput, tls}; use crate::component::func::{self, LiftContext, LowerContext, Options}; use crate::component::matching::InstanceType; use crate::component::values::{ErrorContextAny, FutureAny, StreamAny}; -use crate::component::{AsAccessor, Instance, Lower, Val, WasmList, WasmStr}; +use crate::component::{ + Accessor, AsAccessor, HasData, HasSelf, Instance, Lower, Val, WasmList, WasmStr, +}; use crate::store::{StoreOpaque, StoreToken}; use crate::vm::VMStore; use crate::{AsContextMut, StoreContextMut, ValRaw}; use anyhow::{Context, Result, anyhow, bail}; use buffers::Extender; use buffers::UntypedWriteBuffer; -use futures::channel::{mpsc, oneshot}; -use futures::future::{self, FutureExt}; -use futures::stream::StreamExt; +use futures::channel::oneshot; use std::boxed::Box; use std::fmt; -use std::future::Future; +use std::future; use std::iter; use std::marker::PhantomData; -use std::mem::{self, MaybeUninit}; +use std::mem::{self, ManuallyDrop, MaybeUninit}; use std::string::{String, ToString}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{Poll, Waker}; use std::vec::Vec; use wasmtime_environ::component::{ @@ -177,157 +177,193 @@ fn waitable_state(ty: TableIndex, state: StreamFutureState) -> WaitableState { } } -/// Return a closure which matches a host write operation to a read (or drop) -/// operation. -/// -/// This may be used when the host initiates a write but there is no read -/// pending at the other end, in which case we construct a -/// `WriteState::HostReady` using the closure created here and leave it in -/// `TransmitState::write` for the reader to find and call when it's ready. +/// Complete a write initiated by a host-owned future or stream by matching it +/// with the specified `Reader`. fn accept_reader, U: 'static>( - store: StoreContextMut, + mut store: StoreContextMut, + instance: Instance, + reader: Reader, mut buffer: B, - tx: oneshot::Sender>, kind: TransmitKind, -) -> impl FnOnce(&mut dyn VMStore, Instance, Reader) -> Result -+ Send -+ Sync -+ 'static -+ use { - let token = StoreToken::new(store); - move |store, instance, reader| { - let code = match reader { - Reader::Guest { - options, - ty, - address, - count, - } => { - let mut store = token.as_context_mut(store); - let types = instance.id().get(store.0).component().types().clone(); - let count = buffer.remaining().len().min(count); - - let lower = - &mut LowerContext::new(store.as_context_mut(), options, &types, instance); - if address % usize::try_from(T::ALIGN32)? != 0 { - bail!("read pointer not aligned"); - } - lower - .as_slice_mut() - .get_mut(address..) - .and_then(|b| b.get_mut(..T::SIZE32 * count)) - .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?; - - if let Some(ty) = payload(ty, &types) { - T::linear_store_list_to_memory( - lower, - ty, - address, - &buffer.remaining()[..count], - )?; - } - - buffer.skip(count); - _ = tx.send(HostResult { +) -> Result<(HostResult, ReturnCode)> { + Ok(match reader { + Reader::Guest { + options, + ty, + address, + count, + } => { + let types = instance.id().get(store.0).component().types().clone(); + let count = buffer.remaining().len().min(count); + + let lower = &mut LowerContext::new(store.as_context_mut(), options, &types, instance); + if address % usize::try_from(T::ALIGN32)? != 0 { + bail!("read pointer not aligned"); + } + lower + .as_slice_mut() + .get_mut(address..) + .and_then(|b| b.get_mut(..T::SIZE32 * count)) + .ok_or_else(|| anyhow::anyhow!("read pointer out of bounds of memory"))?; + + if let Some(ty) = payload(ty, &types) { + T::linear_store_list_to_memory(lower, ty, address, &buffer.remaining()[..count])?; + } + + buffer.skip(count); + ( + HostResult { buffer, dropped: false, - }); - ReturnCode::completed(kind, count.try_into().unwrap()) - } - Reader::Host { accept } => { - let count = buffer.remaining().len(); - let mut untyped = UntypedWriteBuffer::new(&mut buffer); - let count = accept(&mut untyped, count); - _ = tx.send(HostResult { + }, + ReturnCode::completed(kind, count.try_into().unwrap()), + ) + } + Reader::Host { accept } => { + let count = buffer.remaining().len(); + let mut untyped = UntypedWriteBuffer::new(&mut buffer); + let count = accept(&mut untyped, count); + ( + HostResult { buffer, dropped: false, - }); - ReturnCode::completed(kind, count.try_into().unwrap()) - } - Reader::End => { - _ = tx.send(HostResult { - buffer, - dropped: true, - }); - ReturnCode::Dropped(0) - } - }; - - Ok(code) - } + }, + ReturnCode::completed(kind, count.try_into().unwrap()), + ) + } + Reader::End => ( + HostResult { + buffer, + dropped: true, + }, + ReturnCode::Dropped(0), + ), + }) } -/// Return a closure which matches a host read operation to a write (or drop) -/// operation. -/// -/// This may be used when the host initiates a read but there is no write -/// pending at the other end, in which case we construct a -/// `ReadState::HostReady` using the closure created here and leave it in -/// `TransmitState::read` for the writer to find and call when it's ready. +/// Complete a read initiated by a host-owned future or stream by matching it with the +/// specified `Writer`. fn accept_writer, U>( + writer: Writer, mut buffer: B, - tx: oneshot::Sender>, kind: TransmitKind, -) -> impl FnOnce(Writer) -> Result + Send + Sync + 'static { - move |writer| { - let count = match writer { - Writer::Guest { - lift, - ty, - address, - count, - } => { - let count = count.min(buffer.remaining_capacity()); - if T::IS_RUST_UNIT_TYPE { - // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a - // zero-sized type, so `MaybeUninit::uninit().assume_init()` - // is a valid way to populate the zero-sized buffer. - buffer.extend( - iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }) - .take(count), - ) - } else { - let ty = ty.unwrap(); - if address % usize::try_from(T::ALIGN32)? != 0 { - bail!("write pointer not aligned"); - } - lift.memory() - .get(address..) - .and_then(|b| b.get(..T::SIZE32 * count)) - .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?; - - let list = &WasmList::new(address, count, lift, ty)?; - T::linear_lift_into_from_memory(lift, list, &mut Extender(&mut buffer))? +) -> Result<(HostResult, ReturnCode)> { + Ok(match writer { + Writer::Guest { + lift, + ty, + address, + count, + } => { + let count = count.min(buffer.remaining_capacity()); + if T::IS_RUST_UNIT_TYPE { + // SAFETY: `T::IS_RUST_UNIT_TYPE` is only true for `()`, a + // zero-sized type, so `MaybeUninit::uninit().assume_init()` + // is a valid way to populate the zero-sized buffer. + buffer.extend( + iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }) + .take(count), + ) + } else { + let ty = ty.unwrap(); + if address % usize::try_from(T::ALIGN32)? != 0 { + bail!("write pointer not aligned"); } - _ = tx.send(HostResult { - buffer, - dropped: false, - }); - ReturnCode::completed(kind, count.try_into().unwrap()) + lift.memory() + .get(address..) + .and_then(|b| b.get(..T::SIZE32 * count)) + .ok_or_else(|| anyhow::anyhow!("write pointer out of bounds of memory"))?; + + let list = &WasmList::new(address, count, lift, ty)?; + T::linear_lift_into_from_memory(lift, list, &mut Extender(&mut buffer))? } - Writer::Host { - buffer: input, - count, - } => { - let count = count.min(buffer.remaining_capacity()); - buffer.move_from(input.get_mut::(), count); - _ = tx.send(HostResult { + ( + HostResult { buffer, dropped: false, - }); - ReturnCode::completed(kind, count.try_into().unwrap()) - } - Writer::End => { - _ = tx.send(HostResult { + }, + ReturnCode::completed(kind, count.try_into().unwrap()), + ) + } + Writer::Host { + buffer: input, + count, + } => { + let count = count.min(buffer.remaining_capacity()); + buffer.move_from(input.get_mut::(), count); + ( + HostResult { buffer, - dropped: true, - }); - ReturnCode::Dropped(0) - } - }; + dropped: false, + }, + ReturnCode::completed(kind, count.try_into().unwrap()), + ) + } + Writer::End => ( + HostResult { + buffer, + dropped: true, + }, + ReturnCode::Dropped(0), + ), + }) +} - Ok(count) - } +/// Return a `Future` which will resolve once the reader end corresponding to +/// the specified writer end of a future or stream is dropped. +async fn watch_reader(accessor: impl AsAccessor, instance: Instance, id: TableId) { + future::poll_fn(|cx| { + accessor + .as_accessor() + .with(|mut access| { + let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0); + let state_id = concurrent_state.get(id)?.state; + let state = concurrent_state.get_mut(state_id)?; + anyhow::Ok(if matches!(&state.read, ReadState::Dropped) { + Poll::Ready(()) + } else { + state.reader_watcher = Some(cx.waker().clone()); + Poll::Pending + }) + }) + .unwrap_or(Poll::Ready(())) + }) + .await +} + +/// Return a `Future` which will resolve once the writer end corresponding to +/// the specified reader end of a future or stream is dropped. +async fn watch_writer(accessor: impl AsAccessor, instance: Instance, id: TableId) { + future::poll_fn(|cx| { + accessor + .as_accessor() + .with(|mut access| { + let concurrent_state = instance.concurrent_state_mut(access.as_context_mut().0); + let state_id = concurrent_state.get(id)?.state; + let state = concurrent_state.get_mut(state_id)?; + anyhow::Ok( + if matches!( + &state.write, + WriteState::Dropped + | WriteState::GuestReady { + post_write: PostWrite::Drop, + .. + } + | WriteState::HostReady { + post_write: PostWrite::Drop, + .. + } + ) { + Poll::Ready(()) + } else { + state.writer_watcher = Some(cx.waker().clone()); + Poll::Pending + }, + ) + }) + .unwrap_or(Poll::Ready(())) + }) + .await } /// Represents the state of a stream or future handle from the perspective of a @@ -367,136 +403,78 @@ pub(super) struct FlatAbi { pub(super) align: u32, } -/// Represents a pending event on a host-owned write end of a stream or future. -/// -/// See `ComponentInstance::start_write_event_loop` for details. -enum WriteEvent { - /// Write the items in the specified buffer to the stream or future, and - /// return the result via the specified `Sender`. - Write { - buffer: B, - tx: oneshot::Sender>, - }, - /// Drop the write end of the stream or future. - Drop(Option B + Send + Sync>>), - /// Watch the read (i.e. opposite) end of this stream or future, dropping - /// the specified sender when it is dropped. - Watch { tx: oneshot::Sender<()> }, -} - -/// Represents a pending event on a host-owned read end of a stream or future. -/// -/// See `ComponentInstance::start_read_event_loop` for details. -enum ReadEvent { - /// Read as many items as the specified buffer will hold from the stream or - /// future, and return the result via the specified `Sender`. - Read { - buffer: B, - tx: oneshot::Sender>, - }, - /// Drop the read end of the stream or future. - Drop, - /// Watch the write (i.e. opposite) end of this stream or future, dropping - /// the specified sender when it is dropped. - Watch { tx: oneshot::Sender<()> }, -} +/// Trait representing objects (such as streams, futures, or structs containing +/// them) which require access to the store in order to be disposed of properly. +trait DropWithStore: Sized { + /// Dispose of `self` using the specified store. + fn drop(&mut self, store: impl AsContextMut) -> Result<()>; -/// Send the specified value to the specified `Sender`. -/// -/// This will panic if there is no room in the channel's buffer, so it should -/// only be used in a context where there is at least one empty spot in the -/// buffer. It will silently ignore any other error (e.g. if the `Receiver` has -/// been dropped). -fn send(tx: &mut mpsc::Sender, value: T) { - if let Err(e) = tx.try_send(value) { - if e.is_full() { - unreachable!(); - } + /// Dispose of `self` using the specified accessor. + fn drop_with(&mut self, accessor: impl AsAccessor) -> Result<()> { + accessor.as_accessor().with(|store| self.drop(store)) } } -/// Wrapper struct which may be converted to the inner value as needed. +/// RAII wrapper for `DropWithStore` implementations. /// -/// This object is normally paired with a `Future` which represents a state -/// change on the inner value, resolving when that state change happens _or_ -/// when the `Watch` is converted back into the inner value -- whichever happens -/// first. -pub struct Watch { - inner: T, - waker: Arc>, +/// This may be used to automatically dispose of the wrapped object when it goes +/// out of scope. +struct WithAccessor<'a, T: DropWithStore, U: 'static, D: HasData + ?Sized = HasSelf> { + accessor: &'a Accessor, + inner: ManuallyDrop, } -enum WatchState { - Idle, - Waiting(Waker), - Done, -} - -impl Watch { - /// Convert this object into its inner value. - /// - /// Calling this function will cause the associated `Future` to resolve - /// immediately if it hasn't already. - pub fn into_inner(self) -> T { - let state = mem::replace(&mut *self.waker.lock().unwrap(), WatchState::Done); - if let WatchState::Waiting(waker) = state { - waker.wake(); +impl<'a, T: DropWithStore, U, D: HasData + ?Sized> WithAccessor<'a, T, U, D> { + /// Create a new instance wrapping the specified `inner` object. + fn new(accessor: &'a Accessor, inner: T) -> Self { + Self { + accessor, + inner: ManuallyDrop::new(inner), } - self.inner } -} -/// Wrap the specified `oneshot::Receiver` in a future which resolves when -/// either that `Receiver` resolves or `Watch::into_inner` has been called on -/// the returned `Watch`. -fn watch( - instance: Instance, - mut rx: oneshot::Receiver<()>, - inner: T, -) -> (impl Future + Send + 'static, Watch) { - let waker = Arc::new(Mutex::new(WatchState::Idle)); - ( - super::checked( - instance, - future::poll_fn({ - let waker = waker.clone(); + fn into_parts(self) -> (&'a Accessor, T) { + let accessor = self.accessor; + let mut me = ManuallyDrop::new(self); + // SAFETY: We've wrapped `self` in a `ManuallyDrop` and will not use or + // drop it after we've moved the `inner` field out. + let inner = unsafe { ManuallyDrop::take(&mut me.inner) }; + (accessor, inner) + } +} - move |cx| { - if rx.poll_unpin(cx).is_ready() { - return Poll::Ready(()); - } - let mut state = waker.lock().unwrap(); - match *state { - WatchState::Done => Poll::Ready(()), - _ => { - *state = WatchState::Waiting(cx.waker().clone()); - Poll::Pending - } - } - } - }), - ), - Watch { waker, inner }, - ) +impl<'a, T: DropWithStore, U, D: HasData + ?Sized> Drop for WithAccessor<'a, T, U, D> { + fn drop(&mut self) { + // SAFETY: `Drop::drop` is called at most once and after which `self` + // can no longer be used, thus ensuring `self.inner` will no longer be + // used. + // + // Technically we could avoid `unsafe` here and just call + // `self.inner.drop_with` instead, but then `T` would never by dropped. + // As of this writing, we don't use types for `T` which implement `Drop` + // anyway, but that could change later. + _ = unsafe { ManuallyDrop::take(&mut self.inner) }.drop_with(self.accessor); + } } /// Represents the writable end of a Component Model `future`. -pub struct FutureWriter { - default: Option T>, +/// +/// Note that `FutureWriter` instances must be disposed of using either `write` +/// or `close`; otherwise the in-store representation will leak and the reader +/// end will hang indefinitely. Consider using [`GuardedFutureWriter`] to +/// ensure that disposal happens automatically. +pub struct FutureWriter { + default: fn() -> T, + id: TableId, instance: Instance, - tx: Option>>>, } impl FutureWriter { - fn new( - default: fn() -> T, - tx: Option>>>, - instance: Instance, - ) -> Self { + fn new(default: fn() -> T, id: TableId, instance: Instance) -> Self { Self { - default: Some(default), + default, + id, instance, - tx, } } @@ -510,28 +488,22 @@ impl FutureWriter { /// /// Panics if the store that the [`Accessor`] is derived from does not own /// this future. - pub async fn write(mut self, accessor: impl AsAccessor, value: T) -> bool + pub async fn write(self, accessor: impl AsAccessor, value: T) -> bool where - T: Send + 'static, + T: func::Lower + Send + Sync + 'static, { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send( - &mut self.tx.as_mut().unwrap(), - WriteEvent::Write { - buffer: Some(value), - tx, - }, - ); - self.default = None; - let v = rx.await; - drop(self); - match v { + let accessor = accessor.as_accessor(); + + let me = WithAccessor::new(accessor, self); + let result = me + .inner + .instance + .host_write_async(accessor, me.inner.id, Some(value), TransmitKind::Future) + .await; + + match result { Ok(HostResult { dropped, .. }) => !dropped, - Err(_) => todo!("guarantee buffer recovery if event loop errors or panics"), + Err(_) => todo!("guarantee buffer recovery if `host_write` fails"), } } @@ -544,79 +516,153 @@ impl FutureWriter { /// /// Panics if the store that the [`Accessor`] is derived from does not own /// this future. - pub async fn watch_reader(&mut self, accessor: impl AsAccessor) + pub async fn watch_reader(&mut self, accessor: impl AsAccessor) { + watch_reader(accessor, self.instance, self.id).await + } + + /// Close this `FutureWriter`, writing the default value. + pub fn close(mut self, store: impl AsContextMut) -> Result<()> + where + T: func::Lower + Send + Sync + 'static, + { + self.drop(store) + } + + /// Close this `FutureWriter`, writing the default value. + pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> where - T: Send + 'static, + T: func::Lower + Send + Sync + 'static, { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send(&mut self.tx.as_mut().unwrap(), WriteEvent::Watch { tx }); - let (future, _watch) = watch(self.instance, rx, ()); - future.await; + accessor.as_accessor().with(|access| self.drop(access)) } } -impl Drop for FutureWriter { - fn drop(&mut self) { - if let Some(mut tx) = self.tx.take() { - send( - &mut tx, - WriteEvent::Drop(self.default.take().map(|v| { - Box::new(move || Some(v())) - as Box Option + Send + Sync + 'static> - })), - ); - } +impl DropWithStore for FutureWriter { + fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> { + // `self` should never be used again, but leave an invalid handle there just in case. + let id = mem::replace(&mut self.id, TableId::new(0)); + let default = self.default; + self.instance + .host_drop_writer(store.as_context_mut(), id, Some(&move || Ok(default()))) } } -/// Represents the readable end of a Component Model `future`. +/// A `FutureWriter` paired with an `Accessor`. /// -/// In order to actually read from or drop this `future`, first convert it to a -/// [`FutureReader`] using the `into_reader` method. +/// This is an RAII wrapper around `FutureWriter` that ensures it is closed when +/// dropped. +pub struct GuardedFutureWriter< + 'a, + T: func::Lower + Send + Sync + 'static, + U: 'static, + D: HasData + ?Sized = HasSelf, +>(WithAccessor<'a, FutureWriter, U, D>); + +impl<'a, T: func::Lower + Send + Sync + 'static, U: 'static, D: HasData + ?Sized> + GuardedFutureWriter<'a, T, U, D> +{ + /// Create a new `GuardedFutureWriter` with the specified `accessor` and `writer`. + pub fn new(accessor: &'a Accessor, writer: FutureWriter) -> Self { + Self(WithAccessor::new(accessor, writer)) + } + + /// Wrapper for `FutureWriter::write`. + pub async fn write(self, value: T) -> bool + where + T: func::Lower + Send + Sync + 'static, + { + let (accessor, writer) = self.0.into_parts(); + writer.write(accessor, value).await + } + + /// Wrapper for `FutureWriter::watch_reader`. + pub async fn watch_reader(&mut self) { + self.0.inner.watch_reader(self.0.accessor).await + } +} + +impl<'a, T: func::Lower + Send + Sync + 'static, U: 'static, D: HasData + ?Sized> + From> for FutureWriter +{ + fn from(writer: GuardedFutureWriter<'a, T, U, D>) -> Self { + writer.0.into_parts().1 + } +} + +/// Represents the readable end of a Component Model `future`. /// -/// Note that if a value of this type is dropped without either being converted -/// to a `FutureReader` or passed to the guest, any writes on the write end may -/// block forever. -pub struct HostFuture { +/// Note that `FutureReader` instances must be disposed of using either `read` +/// or `close`; otherwise the in-store representation will leak and the writer +/// end will hang indefinitely. Consider using [`GuardedFutureReader`] to +/// ensure that disposal happens automatically. +pub struct FutureReader { instance: Instance, - rep: u32, + id: TableId, _phantom: PhantomData, } -impl HostFuture { - /// Create a new `HostFuture`. - fn new(rep: u32, instance: Instance) -> Self { +impl FutureReader { + fn new(id: TableId, instance: Instance) -> Self { Self { instance, - rep, + id, _phantom: PhantomData, } } - /// Convert this object into a [`FutureReader`]. - pub fn into_reader(self, mut store: impl AsContextMut) -> FutureReader + /// Read the value from this `future`. + /// + /// The returned `Future` will yield `Err` if the guest has trapped + /// before it could produce a result. + /// + /// The [`Accessor`] provided can be acquired from [`Instance::run_concurrent`] or + /// from within a host function for example. + /// + /// # Panics + /// + /// Panics if the store that the [`Accessor`] is derived from does not own + /// this future. + pub async fn read(self, accessor: impl AsAccessor) -> Option where - T: func::Lower + func::Lift + Send + Sync + 'static, + T: func::Lift + Send + 'static, { - FutureReader { - instance: self.instance, - rep: self.rep, - tx: Some(self.instance.start_read_event_loop( - store.as_context_mut(), - self.rep, - TransmitKind::Future, - )), + let accessor = accessor.as_accessor(); + + let me = WithAccessor::new(accessor, self); + let result = me + .inner + .instance + .host_read_async(accessor, me.inner.id, None, TransmitKind::Future) + .await; + + if let Ok(HostResult { + mut buffer, + dropped: false, + }) = result + { + buffer.take() + } else { + None } } + /// Wait for the write end of this `future` to be dropped. + /// + /// The [`Accessor`] provided can be acquired from + /// [`Instance::run_concurrent`] or from within a host function for example. + /// + /// # Panics + /// + /// Panics if the store that the [`Accessor`] is derived from does not own + /// this future. + pub async fn watch_writer(&mut self, accessor: impl AsAccessor) { + watch_writer(accessor, self.instance, self.id).await; + } + /// Convert this `FutureReader` into a [`Val`]. // See TODO comment for `FutureAny`; this is prone to handle leakage. pub fn into_val(self) -> Val { - Val::Future(FutureAny(self.rep)) + Val::Future(FutureAny(self.id.rep())) } /// Attempt to convert the specified [`Val`] to a `FutureReader`. @@ -629,10 +675,9 @@ impl HostFuture { bail!("expected `future`; got `{}`", value.desc()); }; let store = store.as_context_mut(); - instance - .concurrent_state_mut(store.0) - .get(TableId::::new(*rep))?; // Just make sure it's present - Ok(Self::new(*rep, instance)) + let id = TableId::::new(*rep); + instance.concurrent_state_mut(store.0).get(id)?; // Just make sure it's present + Ok(Self::new(id, instance)) } /// Transfer ownership of the read end of a future from a guest to the host. @@ -654,26 +699,48 @@ impl HostFuture { StreamFutureState::Busy => bail!("cannot transfer busy future"), } + let id = TableId::::new(rep); let concurrent_state = cx.instance_mut().concurrent_state_mut(); - let state = concurrent_state - .get(TableId::::new(rep))? - .state; + let state = concurrent_state.get(id)?.state; if concurrent_state.get(state)?.done { bail!("cannot lift future after previous read succeeded"); } - Ok(Self::new(rep, cx.instance_handle())) + Ok(Self::new(id, cx.instance_handle())) } _ => func::bad_type_info(), } } + + /// Close this `FutureReader`. + pub fn close(mut self, store: impl AsContextMut) -> Result<()> { + self.drop(store) + } + + /// Close this `FutureReader`. + pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> { + accessor.as_accessor().with(|access| self.drop(access)) + } } -impl fmt::Debug for HostFuture { +impl DropWithStore for FutureReader { + fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> { + // `self` should never be used again, but leave an invalid handle there just in case. + let id = mem::replace(&mut self.id, TableId::new(0)); + self.instance.host_drop_reader( + store.as_context_mut().0.traitobj_mut(), + id, + TransmitKind::Future, + ) + } +} + +impl fmt::Debug for FutureReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("HostFuture") - .field("rep", &self.rep) + f.debug_struct("FutureReader") + .field("id", &self.id) + .field("instance", &self.instance) .finish() } } @@ -705,7 +772,7 @@ pub(crate) fn lower_future_to_index( // SAFETY: This relies on the `ComponentType` implementation for `u32` being // safe and correct since we lift and lower future handles as `u32`s. -unsafe impl func::ComponentType for HostFuture { +unsafe impl func::ComponentType for FutureReader { const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4; type Lower = ::Lower; @@ -719,14 +786,18 @@ unsafe impl func::ComponentType for HostFuture { } // SAFETY: See the comment on the `ComponentType` `impl` for this type. -unsafe impl func::Lower for HostFuture { +unsafe impl func::Lower for FutureReader { fn linear_lower_to_flat( &self, cx: &mut LowerContext<'_, U>, ty: InterfaceType, dst: &mut MaybeUninit, ) -> Result<()> { - lower_future_to_index(self.rep, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst) + lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat( + cx, + InterfaceType::U32, + dst, + ) } fn linear_lower_to_memory( @@ -735,7 +806,7 @@ unsafe impl func::Lower for HostFuture { ty: InterfaceType, offset: usize, ) -> Result<()> { - lower_future_to_index(self.rep, cx, ty)?.linear_lower_to_memory( + lower_future_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory( cx, InterfaceType::U32, offset, @@ -744,7 +815,7 @@ unsafe impl func::Lower for HostFuture { } // SAFETY: See the comment on the `ComponentType` `impl` for this type. -unsafe impl func::Lift for HostFuture { +unsafe impl func::Lift for FutureReader { fn linear_lift_from_flat( cx: &mut LiftContext<'_>, ty: InterfaceType, @@ -764,117 +835,63 @@ unsafe impl func::Lift for HostFuture { } } -impl From> for HostFuture { - fn from(mut value: FutureReader) -> Self { - value.tx.take(); - - Self { - instance: value.instance, - rep: value.rep, - _phantom: PhantomData, - } - } -} - -/// Represents the readable end of a Component Model `future`. +/// A `FutureReader` paired with an `Accessor`. /// -/// In order to pass this end to guest code, first convert it to a -/// [`HostFuture`] using the `into` method. -pub struct FutureReader { - instance: Instance, - rep: u32, - tx: Option>>>, -} +/// This is an RAII wrapper around `FutureReader` that ensures it is closed when +/// dropped. +pub struct GuardedFutureReader<'a, T, U: 'static, D: HasData + ?Sized = HasSelf>( + WithAccessor<'a, FutureReader, U, D>, +); -impl FutureReader { - fn new(rep: u32, tx: Option>>>, instance: Instance) -> Self { - Self { instance, rep, tx } +impl<'a, T, U: 'static, D: HasData + ?Sized> GuardedFutureReader<'a, T, U, D> { + /// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`. + pub fn new(accessor: &'a Accessor, reader: FutureReader) -> Self { + Self(WithAccessor::new(accessor.as_accessor(), reader)) } - /// Read the value from this `future`. - /// - /// The returned `Future` will yield `None` if the guest has trapped - /// before it could produce a result. - /// - /// The [`Accessor`] provided can be acquired from [`Instance::run_concurrent`] or - /// from within a host function for example. - /// - /// # Panics - /// - /// Panics if the store that the [`Accessor`] is derived from does not own - /// this future. - pub async fn read(mut self, accessor: impl AsAccessor) -> Option + /// Wrapper for `FutureReader::read`. + pub async fn read(self) -> Option where - T: Send + 'static, + T: func::Lift + Send + Sync + 'static, { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send( - &mut self.tx.as_mut().unwrap(), - ReadEvent::Read { buffer: None, tx }, - ); - let v = rx.await; - drop(self); - - if let Ok(HostResult { - mut buffer, - dropped: false, - }) = v - { - buffer.take() - } else { - None - } + let (accessor, reader) = self.0.into_parts(); + reader.read(accessor).await } - /// Wait for the write end of this `future` to be dropped. - /// - /// The [`Accessor`] provided can be acquired from - /// [`Instance::run_concurrent`] or from within a host function for example. - /// - /// # Panics - /// - /// Panics if the store that the [`Accessor`] is derived from does not own - /// this future. - pub async fn watch_writer(&mut self, accessor: impl AsAccessor) - where - T: Send + 'static, - { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send(&mut self.tx.as_mut().unwrap(), ReadEvent::Watch { tx }); - let (future, _watch) = watch(self.instance, rx, ()); - future.await + /// Wrapper for `FutureReader::watch_writer`. + pub async fn watch_writer(&mut self) { + self.0.inner.watch_writer(self.0.accessor).await } } -impl Drop for FutureReader { - fn drop(&mut self) { - if let Some(mut tx) = self.tx.take() { - send(&mut tx, ReadEvent::Drop); - } +impl<'a, T, U: 'static, D: HasData + ?Sized> From> + for FutureReader +{ + fn from(reader: GuardedFutureReader<'a, T, U, D>) -> Self { + reader.0.into_parts().1 } } /// Represents the writable end of a Component Model `stream`. -pub struct StreamWriter { +/// +/// Note that `StreamWriter` instances must be disposed of using `close`; +/// otherwise the in-store representation will leak and the reader end will hang +/// indefinitely. Consider using [`GuardedStreamWriter`] to ensure that +/// disposal happens automatically. +pub struct StreamWriter { instance: Instance, + id: TableId, closed: bool, - tx: Option>>, + _phantom: PhantomData, } -impl StreamWriter { - fn new(tx: Option>>, instance: Instance) -> Self { +impl StreamWriter { + fn new(id: TableId, instance: Instance) -> Self { Self { instance, - tx, + id, closed: false, + _phantom: PhantomData, } } @@ -901,18 +918,22 @@ impl StreamWriter { /// /// Panics if the store that the [`Accessor`] is derived from does not own /// this future. - pub async fn write(&mut self, accessor: impl AsAccessor, buffer: B) -> B + pub async fn write(&mut self, accessor: impl AsAccessor, buffer: B) -> B where - B: Send + 'static, + T: func::Lower + 'static, + B: WriteBuffer, { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send(self.tx.as_mut().unwrap(), WriteEvent::Write { buffer, tx }); - let v = rx.await; - match v { + let result = self + .instance + .host_write_async( + accessor.as_accessor(), + self.id, + buffer, + TransmitKind::Stream, + ) + .await; + + match result { Ok(HostResult { buffer, dropped }) => { if self.closed { debug_assert!(dropped); @@ -920,7 +941,7 @@ impl StreamWriter { self.closed = dropped; buffer } - Err(_) => todo!("guarantee buffer recovery if event loop errors or panics"), + Err(_) => todo!("guarantee buffer recovery if `host_write` fails"), } } @@ -936,8 +957,9 @@ impl StreamWriter { /// /// Panics if the store that the [`Accessor`] is derived from does not own /// this future. - pub async fn write_all(&mut self, accessor: impl AsAccessor, mut buffer: B) -> B + pub async fn write_all(&mut self, accessor: impl AsAccessor, mut buffer: B) -> B where + T: func::Lower + 'static, B: WriteBuffer, { let accessor = accessor.as_accessor(); @@ -953,78 +975,168 @@ impl StreamWriter { /// /// Panics if the store that the [`Accessor`] is derived from does not own /// this future. - pub async fn watch_reader(&mut self, accessor: impl AsAccessor) - where - B: Send + 'static, - { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send(&mut self.tx.as_mut().unwrap(), WriteEvent::Watch { tx }); - let (future, _watch) = watch(self.instance, rx, ()); - future.await; + pub async fn watch_reader(&mut self, accessor: impl AsAccessor) { + watch_reader(accessor, self.instance, self.id).await + } + + /// Close this `StreamWriter`. + pub fn close(mut self, store: impl AsContextMut) -> Result<()> { + self.drop(store) + } + + /// Close this `StreamWriter`. + pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> { + accessor.as_accessor().with(|access| self.drop(access)) } } -impl Drop for StreamWriter { - fn drop(&mut self) { - if let Some(mut tx) = self.tx.take() { - send(&mut tx, WriteEvent::Drop(None)); - } +impl DropWithStore for StreamWriter { + fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> { + // `self` should never be used again, but leave an invalid handle there just in case. + let id = mem::replace(&mut self.id, TableId::new(0)); + self.instance + .host_drop_writer(store.as_context_mut(), id, None::<&dyn Fn() -> Result<()>>) } } -/// Represents the readable end of a Component Model `stream`. +/// A `StreamWriter` paired with an `Accessor`. /// -/// In order to actually read from or drop this `stream`, first convert it to a -/// [`FutureReader`] using the `into_reader` method. -/// -/// Note that if a value of this type is dropped without either being converted -/// to a `StreamReader` or passed to the guest, any writes on the write end may -/// block forever. -pub struct HostStream { - instance: Instance, - rep: u32, - _phantom: PhantomData, -} +/// This is an RAII wrapper around `StreamWriter` that ensures it is closed when +/// dropped. +pub struct GuardedStreamWriter<'a, T, U: 'static, D: HasData + ?Sized = HasSelf>( + WithAccessor<'a, StreamWriter, U, D>, +); -impl HostStream { - /// Create a new `HostStream`. - fn new(rep: u32, instance: Instance) -> Self { - Self { - instance, - rep, - _phantom: PhantomData, - } +impl<'a, T, U: 'static, D: HasData + ?Sized> GuardedStreamWriter<'a, T, U, D> { + /// Create a new `GuardedStreamWriter` with the specified `accessor` and `writer`. + pub fn new(accessor: &'a Accessor, writer: StreamWriter) -> Self { + Self(WithAccessor::new(accessor.as_accessor(), writer)) } - /// Convert this object into a [`StreamReader`]. - pub fn into_reader(self, mut store: impl AsContextMut) -> StreamReader - where - T: func::Lower + func::Lift + Send + 'static, - B: ReadBuffer, + /// Wrapper for `StreamWriter::is_closed` + pub fn is_closed(&self) -> bool { + self.0.inner.is_closed() + } + + /// Wrapper for `StreamWriter::write`. + pub async fn write(&mut self, buffer: B) -> B + where + T: func::Lower + 'static, + B: WriteBuffer, { - StreamReader { - instance: self.instance, - rep: self.rep, - tx: Some(self.instance.start_read_event_loop( - store.as_context_mut(), - self.rep, - TransmitKind::Stream, - )), + self.0.inner.write(self.0.accessor, buffer).await + } + + /// Wrapper for `StreamWriter::write_all`. + pub async fn write_all(&mut self, buffer: B) -> B + where + T: func::Lower + 'static, + B: WriteBuffer, + { + self.0.inner.write_all(self.0.accessor, buffer).await + } + + /// Wrapper for `StreamWriter::watch_reader`. + pub async fn watch_reader(&mut self) { + self.0.inner.watch_reader(self.0.accessor).await + } +} + +impl<'a, T, U: 'static, D: HasData + ?Sized> From> + for StreamWriter +{ + fn from(writer: GuardedStreamWriter<'a, T, U, D>) -> Self { + writer.0.into_parts().1 + } +} + +/// Represents the readable end of a Component Model `stream`. +/// +/// Note that `StreamReader` instances must be disposed of using `close`; +/// otherwise the in-store representation will leak and the writer end will hang +/// indefinitely. Consider using [`GuardedStreamReader`] to ensure that +/// disposal happens automatically. +pub struct StreamReader { + instance: Instance, + id: TableId, + closed: bool, + _phantom: PhantomData, +} + +impl StreamReader { + fn new(id: TableId, instance: Instance) -> Self { + Self { + instance, + id, closed: false, + _phantom: PhantomData, + } + } + + /// Returns whether this stream is "closed" meaning that the other end of + /// the stream has been dropped. + pub fn is_closed(&self) -> bool { + self.closed + } + + /// Read values from this `stream`. + /// + /// The returned `Future` will yield a `(Some(_), _)` if the read completed + /// (possibly with zero items if the write was empty). It will return + /// `(None, _)` if the read failed due to the closure of the write end. In + /// either case, the returned buffer will be the same one passed as a + /// parameter, with zero or more items added. + /// + /// # Panics + /// + /// Panics if the store that the [`Accessor`] is derived from does not own + /// this future. + pub async fn read(&mut self, accessor: impl AsAccessor, buffer: B) -> B + where + T: func::Lift + 'static, + B: ReadBuffer + Send + 'static, + { + let result = self + .instance + .host_read_async( + accessor.as_accessor(), + self.id, + buffer, + TransmitKind::Stream, + ) + .await; + + match result { + Ok(HostResult { buffer, dropped }) => { + if self.closed { + debug_assert!(dropped); + } + self.closed = dropped; + buffer + } + Err(_) => { + todo!("guarantee buffer recovery if `host_read` fails") + } } } - /// Convert this `HostStream` into a [`Val`]. + /// Wait until the write end of this `stream` is dropped. + /// + /// # Panics + /// + /// Panics if the store that the [`Accessor`] is derived from does not own + /// this future. + pub async fn watch_writer(&mut self, accessor: impl AsAccessor) { + watch_writer(accessor, self.instance, self.id).await + } + + /// Convert this `StreamReader` into a [`Val`]. // See TODO comment for `StreamAny`; this is prone to handle leakage. pub fn into_val(self) -> Val { - Val::Stream(StreamAny(self.rep)) + Val::Stream(StreamAny(self.id.rep())) } - /// Attempt to convert the specified [`Val`] to a `HostStream`. + /// Attempt to convert the specified [`Val`] to a `StreamReader`. pub fn from_val( mut store: impl AsContextMut, instance: Instance, @@ -1034,10 +1146,9 @@ impl HostStream { bail!("expected `stream`; got `{}`", value.desc()); }; let store = store.as_context_mut(); - instance - .concurrent_state_mut(store.0) - .get(TableId::::new(*rep))?; // Just make sure it's present - Ok(Self::new(*rep, instance)) + let id = TableId::::new(*rep); + instance.concurrent_state_mut(store.0).get(id)?; // Just make sure it's present + Ok(Self::new(id, instance)) } /// Transfer ownership of the read end of a stream from a guest to the host. @@ -1062,17 +1173,42 @@ impl HostStream { StreamFutureState::Busy => bail!("cannot transfer busy stream"), } - Ok(Self::new(rep, cx.instance_handle())) + let id = TableId::::new(rep); + + Ok(Self::new(id, cx.instance_handle())) } _ => func::bad_type_info(), } } + + /// Close this `StreamReader`. + pub fn close(mut self, store: impl AsContextMut) -> Result<()> { + self.drop(store) + } + + /// Close this `StreamReader`. + pub fn close_with(mut self, accessor: impl AsAccessor) -> Result<()> { + accessor.as_accessor().with(|access| self.drop(access)) + } +} + +impl DropWithStore for StreamReader { + fn drop(&mut self, mut store: impl AsContextMut) -> Result<()> { + // `self` should never be used again, but leave an invalid handle there just in case. + let id = mem::replace(&mut self.id, TableId::new(0)); + self.instance.host_drop_reader( + store.as_context_mut().0.traitobj_mut(), + id, + TransmitKind::Stream, + ) + } } -impl fmt::Debug for HostStream { +impl fmt::Debug for StreamReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("HostStream") - .field("rep", &self.rep) + f.debug_struct("StreamReader") + .field("id", &self.id) + .field("instance", &self.instance) .finish() } } @@ -1104,7 +1240,7 @@ pub(crate) fn lower_stream_to_index( // SAFETY: This relies on the `ComponentType` implementation for `u32` being // safe and correct since we lift and lower stream handles as `u32`s. -unsafe impl func::ComponentType for HostStream { +unsafe impl func::ComponentType for StreamReader { const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4; type Lower = ::Lower; @@ -1118,14 +1254,18 @@ unsafe impl func::ComponentType for HostStream { } // SAFETY: See the comment on the `ComponentType` `impl` for this type. -unsafe impl func::Lower for HostStream { +unsafe impl func::Lower for StreamReader { fn linear_lower_to_flat( &self, cx: &mut LowerContext<'_, U>, ty: InterfaceType, dst: &mut MaybeUninit, ) -> Result<()> { - lower_stream_to_index(self.rep, cx, ty)?.linear_lower_to_flat(cx, InterfaceType::U32, dst) + lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_flat( + cx, + InterfaceType::U32, + dst, + ) } fn linear_lower_to_memory( @@ -1134,7 +1274,7 @@ unsafe impl func::Lower for HostStream { ty: InterfaceType, offset: usize, ) -> Result<()> { - lower_stream_to_index(self.rep, cx, ty)?.linear_lower_to_memory( + lower_stream_to_index(self.id.rep(), cx, ty)?.linear_lower_to_memory( cx, InterfaceType::U32, offset, @@ -1143,7 +1283,7 @@ unsafe impl func::Lower for HostStream { } // SAFETY: See the comment on the `ComponentType` `impl` for this type. -unsafe impl func::Lift for HostStream { +unsafe impl func::Lift for StreamReader { fn linear_lift_from_flat( cx: &mut LiftContext<'_>, ty: InterfaceType, @@ -1163,108 +1303,45 @@ unsafe impl func::Lift for HostStream { } } -impl From> for HostStream { - fn from(mut value: StreamReader) -> Self { - value.tx.take(); - - Self { - instance: value.instance, - rep: value.rep, - _phantom: PhantomData, - } - } -} - -/// Represents the readable end of a Component Model `stream`. +/// A `StreamReader` paired with an `Accessor`. /// -/// In order to pass this end to guest code, first convert it to a -/// [`HostStream`] using the `into` method. -pub struct StreamReader { - instance: Instance, - rep: u32, - tx: Option>>, - closed: bool, -} +/// This is an RAII wrapper around `StreamReader` that ensures it is closed when +/// dropped. +pub struct GuardedStreamReader<'a, T, U: 'static, D: HasData + ?Sized = HasSelf>( + WithAccessor<'a, StreamReader, U, D>, +); -impl StreamReader { - fn new(rep: u32, tx: Option>>, instance: Instance) -> Self { - Self { - instance, - rep, - tx, - closed: false, - } +impl<'a, T, U: 'static, D: HasData + ?Sized> GuardedStreamReader<'a, T, U, D> { + /// Create a new `GuardedStreamReader` with the specified `accessor` and `reader`. + pub fn new(accessor: &'a Accessor, reader: StreamReader) -> Self { + Self(WithAccessor::new(accessor.as_accessor(), reader)) } - /// Returns whether this stream is "closed" meaning that the other end of - /// the stream has been dropped. + /// Wrapper for `StreamReader::is_closed` pub fn is_closed(&self) -> bool { - self.closed + self.0.inner.is_closed() } - /// Read values from this `stream`. - /// - /// The returned `Future` will yield a `(Some(_), _)` if the read completed - /// (possibly with zero items if the write was empty). It will return - /// `(None, _)` if the read failed due to the closure of the write end. In - /// either case, the returned buffer will be the same one passed as a - /// parameter, with zero or more items added. - /// - /// # Panics - /// - /// Panics if the store that the [`Accessor`] is derived from does not own - /// this future. - pub async fn read(&mut self, accessor: impl AsAccessor, buffer: B) -> B + /// Wrapper for `StreamReader::read`. + pub async fn read(&mut self, buffer: B) -> B where - B: Send + 'static, + T: func::Lift + 'static, + B: ReadBuffer + Send + 'static, { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send(self.tx.as_mut().unwrap(), ReadEvent::Read { buffer, tx }); - let v = rx.await; - match v { - Ok(HostResult { buffer, dropped }) => { - if self.closed { - debug_assert!(dropped); - } - self.closed = dropped; - buffer - } - Err(_) => { - todo!("guarantee buffer recovery if event loop errors or panics") - } - } + self.0.inner.read(self.0.accessor, buffer).await } - /// Wait until the write end of this `stream` is dropped. - /// - /// # Panics - /// - /// Panics if the store that the [`Accessor`] is derived from does not own - /// this future. - pub async fn watch_writer(&mut self, accessor: impl AsAccessor) - where - B: Send + 'static, - { - // FIXME: this is intended to be used in the future to directly - // manipulate state for this future within the store without having to - // go through an mpsc. - let _accessor = accessor.as_accessor(); - let (tx, rx) = oneshot::channel(); - send(&mut self.tx.as_mut().unwrap(), ReadEvent::Watch { tx }); - let (future, _) = watch(self.instance, rx, ()); - future.await + /// Wrapper for `StreamReader::watch_writer`. + pub async fn watch_writer(&mut self) { + self.0.inner.watch_writer(self.0.accessor).await } } -impl Drop for StreamReader { - fn drop(&mut self) { - if let Some(mut tx) = self.tx.take() { - send(&mut tx, ReadEvent::Drop); - } +impl<'a, T, U: 'static, D: HasData + ?Sized> From> + for StreamReader +{ + fn from(reader: GuardedStreamReader<'a, T, U, D>) -> Self { + reader.0.into_parts().1 } } @@ -1431,14 +1508,14 @@ struct TransmitState { write: WriteState, /// See `ReadState` read: ReadState, - /// The `Sender`, if any, to be dropped when the write end of the stream or + /// The `Waker`, if any, to be woken when the write end of the stream or /// future is dropped. /// /// This will signal to the host-owned read end that the write end has been /// dropped. - writer_watcher: Option>, + writer_watcher: Option, /// Like `writer_watcher`, but for the reverse direction. - reader_watcher: Option>, + reader_watcher: Option, /// Whether futher values may be transmitted via this stream or future. done: bool, } @@ -1563,7 +1640,7 @@ enum Reader<'a> { }, /// The read end is owned by the host. Host { - accept: Box usize>, + accept: Box usize + 'a>, }, /// The read end has been dropped. End, @@ -1573,253 +1650,53 @@ impl Instance { /// Create a new Component Model `future` as pair of writable and readable ends, /// the latter of which may be passed to guest code. /// - /// The `default` parameter will be used if the returned `FutureWriter` is - /// dropped before `FutureWriter::write` is called. Since the write end of - /// a Component Model `future` must be written to before it is dropped, and - /// since Rust does not currently provide a way to statically enforce that - /// (e.g. linear typing), we use this mechanism to ensure a value is always - /// written prior to closing. - /// - /// If there's no plausible default value, and you're sure - /// `FutureWriter::write` will be called, you can consider passing `|| - /// unreachable!()` as the `default` parameter. + /// `default` is a callback to be used if the writable end of the future is + /// closed without having written a value. You may supply e.g. `|| + /// unreachable!()` if you're sure that won't happen. pub fn future( self, - default: fn() -> T, mut store: impl AsContextMut, + default: fn() -> T, ) -> Result<(FutureWriter, FutureReader)> { - let mut store = store.as_context_mut(); - let (write, read) = self.concurrent_state_mut(store.0).new_transmit()?; + let (write, read) = self + .concurrent_state_mut(store.as_context_mut().0) + .new_transmit()?; Ok(( - FutureWriter::new( - default, - Some(self.start_write_event_loop( - store.as_context_mut(), - write.rep(), - TransmitKind::Future, - )), - self, - ), - FutureReader::new( - read.rep(), - Some(self.start_read_event_loop( - store.as_context_mut(), - read.rep(), - TransmitKind::Future, - )), - self, - ), + FutureWriter::new(default, write, self), + FutureReader::new(read, self), )) } /// Create a new Component Model `stream` as pair of writable and readable ends, /// the latter of which may be passed to guest code. - pub fn stream< - T: func::Lower + func::Lift + Send + 'static, - W: WriteBuffer, - R: ReadBuffer, - >( + pub fn stream( self, mut store: impl AsContextMut, - ) -> Result<(StreamWriter, StreamReader)> { - let mut store = store.as_context_mut(); - let (write, read) = self.concurrent_state_mut(store.0).new_transmit()?; + ) -> Result<(StreamWriter, StreamReader)> { + let (write, read) = self + .concurrent_state_mut(store.as_context_mut().0) + .new_transmit()?; Ok(( - StreamWriter::new( - Some(self.start_write_event_loop( - store.as_context_mut(), - write.rep(), - TransmitKind::Stream, - )), - self, - ), - StreamReader::new( - read.rep(), - Some(self.start_read_event_loop( - store.as_context_mut(), - read.rep(), - TransmitKind::Stream, - )), - self, - ), + StreamWriter::new(write, self), + StreamReader::new(read, self), )) } - /// Spawn a background task to be polled in this instance's event loop. - /// - /// The spawned task will accept host events from the `Receiver` corresponding to - /// the returned `Sender`, handling each event it receives and then exiting - /// when the channel is dropped. - /// - /// We handle `StreamWriter` and `FutureWriter` operations this way so that - /// they can be initiated without access to the store and possibly outside - /// the instance's event loop, improving the ergonmics for host embedders. - fn start_write_event_loop< - T: func::Lower + func::Lift + Send + 'static, - B: WriteBuffer, - U, - >( - self, - mut store: StoreContextMut, - rep: u32, - kind: TransmitKind, - ) -> mpsc::Sender> { - let (tx, mut rx) = mpsc::channel(1); - let id = TableId::::new(rep); - let run_on_drop = - RunOnDrop::new(move || log::trace!("write event loop for {id:?} dropped")); - let token = StoreToken::new(store.as_context_mut()); - let task = Box::pin( - async move { - log::trace!("write event loop for {id:?} started"); - let mut my_rep = None; - while let Some(event) = rx.next().await { - if my_rep.is_none() { - my_rep = Some(self.get_state_rep(rep)?); - } - let rep = my_rep.unwrap(); - match event { - WriteEvent::Write { buffer, tx } => tls::get(|store| { - self.host_write::<_, _, U>( - token.as_context_mut(store), - rep, - buffer, - PostWrite::Continue, - tx, - kind, - ) - })?, - WriteEvent::Drop(default) => tls::get(|store| { - if let Some(default) = default { - self.host_write::<_, _, U>( - token.as_context_mut(store), - rep, - default(), - PostWrite::Continue, - oneshot::channel().0, - kind, - )?; - } - self.concurrent_state_mut(store).host_drop_writer(rep, kind) - })?, - WriteEvent::Watch { tx } => tls::get(|store| { - let state = - self.concurrent_state_mut(store) - .get_mut(TableId::::new(rep))?; - if !matches!(&state.read, ReadState::Dropped) { - state.reader_watcher = Some(tx); - } - Ok::<_, anyhow::Error>(()) - })?, - } - } - Ok(()) - } - .map(move |v| { - run_on_drop.cancel(); - log::trace!("write event loop for {id:?} finished: {v:?}"); - HostTaskOutput::Result(v) - }), - ); - self.concurrent_state_mut(store.0).push_future(task); - tx - } - - /// Same as `Self::start_write_event_loop`, but for the read end of a stream - /// or future. - fn start_read_event_loop, U>( - self, - mut store: StoreContextMut, - rep: u32, - kind: TransmitKind, - ) -> mpsc::Sender> { - let (tx, mut rx) = mpsc::channel(1); - let id = TableId::::new(rep); - let run_on_drop = RunOnDrop::new(move || log::trace!("read event loop for {id:?} dropped")); - let token = StoreToken::new(store.as_context_mut()); - let task = Box::pin( - async move { - log::trace!("read event loop for {id:?} started"); - let mut my_rep = None; - while let Some(event) = rx.next().await { - if my_rep.is_none() { - my_rep = Some(self.get_state_rep(rep)?); - } - let rep = my_rep.unwrap(); - match event { - ReadEvent::Read { buffer, tx } => tls::get(|store| { - self.host_read::<_, _, U>( - token.as_context_mut(store), - rep, - buffer, - tx, - kind, - ) - })?, - ReadEvent::Drop => { - tls::get(|store| self.host_drop_reader(store, rep, kind))? - } - ReadEvent::Watch { tx } => tls::get(|store| { - let state = - self.concurrent_state_mut(store) - .get_mut(TableId::::new(rep))?; - if !matches!( - &state.write, - WriteState::Dropped - | WriteState::GuestReady { - post_write: PostWrite::Drop, - .. - } - | WriteState::HostReady { - post_write: PostWrite::Drop, - .. - } - ) { - state.writer_watcher = Some(tx); - } - Ok::<_, anyhow::Error>(()) - })?, - } - } - Ok(()) - } - .map(move |v| { - run_on_drop.cancel(); - log::trace!("read event loop for {id:?} finished: {v:?}"); - HostTaskOutput::Result(v) - }), - ); - self.concurrent_state_mut(store.0).push_future(task); - tx - } - /// Write to the specified stream or future from the host. - /// - /// # Arguments - /// - /// * `store` - The store to which this instance belongs - /// * `transmit_rep` - The `TransmitState` rep for the stream or future - /// * `buffer` - Buffer of values that should be written - /// * `post_write` - Whether the transmit should be dropped after write, possibly with an error context - /// * `tx` - Oneshot channel to notify when operation completes (or drop on error) - /// * `kind` - whether this is a stream or a future fn host_write, U>( self, mut store: StoreContextMut, - transmit_rep: u32, + id: TableId, mut buffer: B, - mut post_write: PostWrite, - tx: oneshot::Sender>, kind: TransmitKind, - ) -> Result<()> { - let mut store = store.as_context_mut(); - let transmit_id = TableId::::new(transmit_rep); + ) -> Result, oneshot::Receiver>>> { + let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state; let transmit = self .concurrent_state_mut(store.0) .get_mut(transmit_id) - .with_context(|| format!("retrieving state for transmit [{transmit_rep}]"))?; + .with_context(|| format!("retrieving state for transmit [{transmit_id:?}]"))?; log::trace!("host_write state {transmit_id:?}; {:?}", transmit.read); let new_state = if let ReadState::Dropped = &transmit.read { @@ -1828,23 +1705,31 @@ impl Instance { ReadState::Open }; - match mem::replace(&mut transmit.read, new_state) { + Ok(match mem::replace(&mut transmit.read, new_state) { ReadState::Open => { assert!(matches!(&transmit.write, WriteState::Open)); + let token = StoreToken::new(store.as_context_mut()); + let (tx, rx) = oneshot::channel(); let state = WriteState::HostReady { - accept: Box::new(accept_reader::( - store.as_context_mut(), - buffer, - tx, - kind, - )), - post_write, + accept: Box::new(move |store, instance, reader| { + let (result, code) = accept_reader::( + token.as_context_mut(store), + instance, + reader, + buffer, + kind, + )?; + _ = tx.send(result); + Ok(code) + }), + post_write: PostWrite::Continue, }; self.concurrent_state_mut(store.0) .get_mut(transmit_id)? .write = state; - post_write = PostWrite::Continue; + + Err(rx) } ReadState::GuestReady { @@ -1861,30 +1746,61 @@ impl Instance { } let read_handle = transmit.read_handle; - let code = accept_reader::(store.as_context_mut(), buffer, tx, kind)( - store.0.traitobj_mut(), - self, - Reader::Guest { - options: &options, - ty, - address, - count, - }, - )?; - - self.concurrent_state_mut(store.0).set_event( - read_handle.rep(), - match ty { - TableIndex::Future(ty) => Event::FutureRead { - code, - pending: Some((ty, handle)), + let accept = move |mut store: StoreContextMut| { + let (result, code) = accept_reader::( + store.as_context_mut(), + self, + Reader::Guest { + options: &options, + ty, + address, + count, }, - TableIndex::Stream(ty) => Event::StreamRead { - code, - pending: Some((ty, handle)), + buffer, + kind, + )?; + + self.concurrent_state_mut(store.0).set_event( + read_handle.rep(), + match ty { + TableIndex::Future(ty) => Event::FutureRead { + code, + pending: Some((ty, handle)), + }, + TableIndex::Stream(ty) => Event::StreamRead { + code, + pending: Some((ty, handle)), + }, }, - }, - )?; + )?; + + anyhow::Ok(result) + }; + + if + // TODO: Check if payload is "flat" + false { + // Optimize flat payloads (i.e. those which do not require + // calling the guest's realloc function) by lowering + // directly instead of using a oneshot::channel and + // background task. + Ok(accept(store)?) + } else { + // Otherwise, for payloads which may require a realloc call, + // use a oneshot::channel and background task. This is + // necessary because calling the guest while there are host + // embedder frames on the stack is unsound. + let (tx, rx) = oneshot::channel(); + let token = StoreToken::new(store.as_context_mut()); + self.concurrent_state_mut(store.0) + .push_future(Box::pin(async move { + HostTaskOutput::Result(tls::get(|store| { + _ = tx.send(accept(token.as_context_mut(store))?); + Ok(()) + })) + })); + Err(rx) + } } ReadState::HostReady { accept } => { @@ -1898,51 +1814,49 @@ impl Instance { unreachable!() }; - _ = tx.send(HostResult { + Ok(HostResult { buffer, dropped: false, - }); + }) } - ReadState::Dropped => { - _ = tx.send(HostResult { - buffer, - dropped: true, - }); - } - } + ReadState::Dropped => Ok(HostResult { + buffer, + dropped: true, + }), + }) + } - if let PostWrite::Drop = post_write { - self.concurrent_state_mut(store.0) - .host_drop_writer(transmit_rep, kind)?; + /// Async wrapper around `Self::host_write`. + async fn host_write_async>( + self, + accessor: impl AsAccessor, + id: TableId, + buffer: B, + kind: TransmitKind, + ) -> Result> { + match accessor + .as_accessor() + .with(move |mut access| self.host_write(access.as_context_mut(), id, buffer, kind))? + { + Ok(result) => Ok(result), + Err(rx) => Ok(rx.await?), } - - Ok(()) } /// Read from the specified stream or future from the host. - /// - /// # Arguments - /// - /// * `store` - The store to which this instance belongs - /// * `rep` - The `TransmitState` rep for the stream or future - /// * `buffer` - Buffer to receive values - /// * `tx` - Oneshot channel to notify when operation completes (or drop on error) - /// * `kind` - whether this is a stream or a future fn host_read, U>( self, - mut store: StoreContextMut, - rep: u32, + store: StoreContextMut, + id: TableId, mut buffer: B, - tx: oneshot::Sender>, kind: TransmitKind, - ) -> Result<()> { - let store = store.as_context_mut(); - let transmit_id = TableId::::new(rep); + ) -> Result, oneshot::Receiver>>> { + let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state; let transmit = self .concurrent_state_mut(store.0) .get_mut(transmit_id) - .with_context(|| rep.to_string())?; + .with_context(|| format!("retrieving state for transmit [{transmit_id:?}]"))?; log::trace!("host_read state {transmit_id:?}; {:?}", transmit.write); let new_state = if let WriteState::Dropped = &transmit.write { @@ -1951,13 +1865,20 @@ impl Instance { WriteState::Open }; - match mem::replace(&mut transmit.write, new_state) { + Ok(match mem::replace(&mut transmit.write, new_state) { WriteState::Open => { assert!(matches!(&transmit.read, ReadState::Open)); + let (tx, rx) = oneshot::channel(); transmit.read = ReadState::HostReady { - accept: Box::new(accept_writer::(buffer, tx, kind)), + accept: Box::new(move |writer| { + let (result, code) = accept_writer::(writer, buffer, kind)?; + _ = tx.send(result); + Ok(code) + }), }; + + Err(rx) } WriteState::GuestReady { @@ -1976,12 +1897,16 @@ impl Instance { let write_handle = transmit.write_handle; let lift = &mut LiftContext::new(store.0.store_opaque_mut(), &options, self); - let code = accept_writer::(buffer, tx, kind)(Writer::Guest { - ty: payload(ty, lift.types), - lift, - address, - count, - })?; + let (result, code) = accept_writer::( + Writer::Guest { + ty: payload(ty, lift.types), + lift, + address, + count, + }, + buffer, + kind, + )?; let state = self.concurrent_state_mut(store.0); let pending = if let PostWrite::Drop = post_write { @@ -2004,6 +1929,8 @@ impl Instance { }, }, )?; + + Ok(result) } WriteState::HostReady { accept, post_write } => { @@ -2011,13 +1938,9 @@ impl Instance { store.0.traitobj_mut(), self, Reader::Host { - accept: Box::new(move |input, count| { + accept: Box::new(|input, count| { let count = count.min(buffer.remaining_capacity()); buffer.move_from(input.get_mut::(), count); - _ = tx.send(HostResult { - buffer, - dropped: false, - }); count }), }, @@ -2028,36 +1951,49 @@ impl Instance { .get_mut(transmit_id)? .write = WriteState::Dropped; } - } - WriteState::Dropped => { - _ = tx.send(HostResult { + Ok(HostResult { buffer, - dropped: true, - }); + dropped: false, + }) } - } - Ok(()) + WriteState::Dropped => Ok(HostResult { + buffer, + dropped: true, + }), + }) + } + + /// Async wrapper around `Self::host_read`. + async fn host_read_async>( + self, + accessor: impl AsAccessor, + id: TableId, + buffer: B, + kind: TransmitKind, + ) -> Result> { + match accessor + .as_accessor() + .with(move |mut access| self.host_read(access.as_context_mut(), id, buffer, kind))? + { + Ok(result) => Ok(result), + Err(rx) => Ok(rx.await?), + } } /// Drop the read end of a stream or future read from the host. - /// - /// # Arguments - /// - /// * `store` - The store to which this instance belongs - /// * `transmit_rep` - The `TransmitState` rep for the stream or future. fn host_drop_reader( self, store: &mut dyn VMStore, - transmit_rep: u32, + id: TableId, kind: TransmitKind, ) -> Result<()> { - let transmit_id = TableId::::new(transmit_rep); + let transmit_id = self.concurrent_state_mut(store).get(id)?.state; let state = self.concurrent_state_mut(store); let transmit = state .get_mut(transmit_id) - .with_context(|| format!("error closing reader {transmit_rep}"))?; + .with_context(|| format!("error closing reader {transmit_id:?}"))?; log::trace!( "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}", transmit.read, @@ -2065,59 +2001,186 @@ impl Instance { ); transmit.read = ReadState::Dropped; - transmit.reader_watcher = None; + if let Some(waker) = transmit.reader_watcher.take() { + waker.wake(); + } + + // If the write end is already dropped, it should stay dropped, + // otherwise, it should be opened. + let new_state = if let WriteState::Dropped = &transmit.write { + WriteState::Dropped + } else { + WriteState::Open + }; + + let write_handle = transmit.write_handle; + + match mem::replace(&mut transmit.write, new_state) { + // If a guest is waiting to write, notify it that the read end has + // been dropped. + WriteState::GuestReady { + ty, + handle, + post_write, + .. + } => { + if let PostWrite::Drop = post_write { + state.delete_transmit(transmit_id)?; + } else { + state.update_event( + write_handle.rep(), + match ty { + TableIndex::Future(ty) => Event::FutureWrite { + code: ReturnCode::Dropped(0), + pending: Some((ty, handle)), + }, + TableIndex::Stream(ty) => Event::StreamWrite { + code: ReturnCode::Dropped(0), + pending: Some((ty, handle)), + }, + }, + )?; + }; + } + + WriteState::HostReady { accept, .. } => { + accept(store, self, Reader::End)?; + } + + WriteState::Open => { + state.update_event( + write_handle.rep(), + match kind { + TransmitKind::Future => Event::FutureWrite { + code: ReturnCode::Dropped(0), + pending: None, + }, + TransmitKind::Stream => Event::StreamWrite { + code: ReturnCode::Dropped(0), + pending: None, + }, + }, + )?; + } + + WriteState::Dropped => { + log::trace!("host_drop_reader delete {transmit_id:?}"); + state.delete_transmit(transmit_id)?; + } + } + Ok(()) + } + + /// Drop the write end of a stream or future read from the host. + fn host_drop_writer( + self, + mut store: StoreContextMut, + id: TableId, + default: Option<&dyn Fn() -> Result>, + ) -> Result<()> { + let transmit_id = self.concurrent_state_mut(store.0).get(id)?.state; + let token = StoreToken::new(store.as_context_mut()); + let transmit = self + .concurrent_state_mut(store.0) + .get_mut(transmit_id) + .with_context(|| format!("error closing writer {transmit_id:?}"))?; + log::trace!( + "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}", + transmit.read, + transmit.write + ); + + if let Some(waker) = transmit.writer_watcher.take() { + waker.wake(); + } + + // Existing queued transmits must be updated with information for the impending writer closure + match &mut transmit.write { + WriteState::GuestReady { .. } => { + unreachable!("can't call `host_drop_writer` on a guest-owned writer"); + } + WriteState::HostReady { post_write, .. } => { + *post_write = PostWrite::Drop; + } + v @ WriteState::Open => { + *v = if let (Some(default), false) = ( + default, + transmit.done || matches!(transmit.read, ReadState::Dropped), + ) { + // This is a future, and we haven't written a value yet -- + // write the default value. + let default = default()?; + WriteState::HostReady { + accept: Box::new(move |store, instance, reader| { + let (_, code) = accept_reader::, U>( + token.as_context_mut(store), + instance, + reader, + Some(default), + TransmitKind::Future, + )?; + Ok(code) + }), + post_write: PostWrite::Drop, + } + } else { + WriteState::Dropped + }; + } + WriteState::Dropped => unreachable!("write state is already dropped"), + } - // If the write end is already dropped, it should stay dropped, - // otherwise, it should be opened. - let new_state = if let WriteState::Dropped = &transmit.write { - WriteState::Dropped + // If the existing read state is dropped, then there's nothing to read + // and we can keep it that way. + // + // If the read state was any other state, then we must set the new state to open + // to indicate that there *is* data to be read + let new_state = if let ReadState::Dropped = &transmit.read { + ReadState::Dropped } else { - WriteState::Open + ReadState::Open }; - let write_handle = transmit.write_handle; + let read_handle = transmit.read_handle; - match mem::replace(&mut transmit.write, new_state) { - // If a guest is waiting to write, notify it that the read end has - // been dropped. - WriteState::GuestReady { - ty, - handle, - post_write, - .. - } => { - if let PostWrite::Drop = post_write { - state.delete_transmit(transmit_id)?; - } else { - state.update_event( - write_handle.rep(), - match ty { - TableIndex::Future(ty) => Event::FutureWrite { - code: ReturnCode::Dropped(0), - pending: Some((ty, handle)), - }, - TableIndex::Stream(ty) => Event::StreamWrite { - code: ReturnCode::Dropped(0), - pending: Some((ty, handle)), - }, + // Swap in the new read state + match mem::replace(&mut transmit.read, new_state) { + // If the guest was ready to read, then we cannot drop the reader (or writer); + // we must deliver the event, and update the state associated with the handle to + // represent that a read must be performed + ReadState::GuestReady { ty, handle, .. } => { + // Ensure the final read of the guest is queued, with appropriate closure indicator + self.concurrent_state_mut(store.0).update_event( + read_handle.rep(), + match ty { + TableIndex::Future(ty) => Event::FutureRead { + code: ReturnCode::Dropped(0), + pending: Some((ty, handle)), }, - )?; - }; + TableIndex::Stream(ty) => Event::StreamRead { + code: ReturnCode::Dropped(0), + pending: Some((ty, handle)), + }, + }, + )?; } - WriteState::HostReady { accept, .. } => { - accept(store, self, Reader::End)?; + // If the host was ready to read, and the writer end is being dropped (host->host write?) + // signal to the reader that we've reached the end of the stream + ReadState::HostReady { accept } => { + accept(Writer::End)?; } - WriteState::Open => { - state.update_event( - write_handle.rep(), - match kind { - TransmitKind::Future => Event::FutureWrite { + // If the read state is open, then there are no registered readers of the stream/future + ReadState::Open => { + self.concurrent_state_mut(store.0).update_event( + read_handle.rep(), + match default { + Some(_) => Event::FutureRead { code: ReturnCode::Dropped(0), pending: None, }, - TransmitKind::Stream => Event::StreamWrite { + None => Event::StreamRead { code: ReturnCode::Dropped(0), pending: None, }, @@ -2125,14 +2188,62 @@ impl Instance { )?; } - WriteState::Dropped => { - log::trace!("host_drop_reader delete {transmit_rep}"); - state.delete_transmit(transmit_id)?; + // If the read state was already dropped, then we can remove the transmit state completely + // (both writer and reader have been dropped) + ReadState::Dropped => { + log::trace!("host_drop_writer delete {transmit_id:?}"); + self.concurrent_state_mut(store.0) + .delete_transmit(transmit_id)?; } } Ok(()) } + /// Drop the writable end of the specified stream or future from the guest. + pub(super) fn guest_drop_writable( + self, + store: StoreContextMut, + ty: TableIndex, + writer: u32, + ) -> Result<()> { + let (transmit_rep, state) = self + .concurrent_state_mut(store.0) + .state_table(ty) + .remove_by_index(writer) + .context("failed to find writer")?; + let (state, kind) = match state { + WaitableState::Stream(_, state) => (state, TransmitKind::Stream), + WaitableState::Future(_, state) => (state, TransmitKind::Future), + _ => { + bail!("invalid stream or future handle"); + } + }; + match state { + StreamFutureState::Write { .. } => {} + StreamFutureState::Read { .. } => { + bail!("passed read end to `{{stream|future}}.drop-writable`") + } + StreamFutureState::Busy => bail!("cannot drop busy stream or future"), + } + + let id = TableId::::new(transmit_rep); + log::trace!("guest_drop_writable: drop writer {id:?}"); + match kind { + TransmitKind::Stream => { + self.host_drop_writer(store, id, None::<&dyn Fn() -> Result<()>>) + } + TransmitKind::Future => self.host_drop_writer( + store, + id, + Some(&|| { + Err::<(), _>(anyhow!( + "cannot drop future write end without first writing a value" + )) + }), + ), + } + } + /// Copy `count` items from `read_address` to `write_address` for the /// specified stream or future. fn copy( @@ -2724,9 +2835,8 @@ impl Instance { StreamFutureState::Busy => bail!("cannot drop busy stream or future"), } let id = TableId::::new(rep); - let rep = concurrent_state.get(id)?.state.rep(); log::trace!("guest_drop_readable: drop reader {id:?}"); - self.host_drop_reader(store, rep, kind) + self.host_drop_reader(store, id, kind) } /// Create a new error context for the given component. @@ -2842,40 +2952,6 @@ impl Instance { ) -> Result<()> { self.guest_drop_readable(store, TableIndex::Stream(ty), reader) } - - /// Retrieve the `TransmitState` rep for the specified `TransmitHandle` rep. - fn get_state_rep(&self, rep: u32) -> Result { - tls::get(|store| { - let transmit_handle = TableId::::new(rep); - Ok(self - .concurrent_state_mut(store) - .get(transmit_handle) - .with_context(|| format!("stream or future {transmit_handle:?} not found"))? - .state - .rep()) - }) - } -} - -/// Helper struct for running a closure on drop, e.g. for logging purposes. -struct RunOnDrop(Option); - -impl RunOnDrop { - fn new(fun: F) -> Self { - Self(Some(fun)) - } - - fn cancel(mut self) { - self.0 = None; - } -} - -impl Drop for RunOnDrop { - fn drop(&mut self) { - if let Some(fun) = self.0.take() { - fun(); - } - } } impl ConcurrentState { @@ -3099,113 +3175,6 @@ impl ConcurrentState { Ok(code) } - /// Drop the write end of a stream or future read from the host. - /// - /// # Arguments - /// - /// * `transmit_rep` - The `TransmitState` rep for the stream or future. - fn host_drop_writer(&mut self, transmit_rep: u32, kind: TransmitKind) -> Result<()> { - let transmit_id = TableId::::new(transmit_rep); - let transmit = self - .get_mut(transmit_id) - .with_context(|| format!("error closing writer {transmit_rep}"))?; - log::trace!( - "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}", - transmit.read, - transmit.write - ); - - transmit.writer_watcher = None; - - // Existing queued transmits must be updated with information for the impending writer closure - match &mut transmit.write { - WriteState::GuestReady { post_write, .. } => { - *post_write = PostWrite::Drop; - } - WriteState::HostReady { post_write, .. } => { - *post_write = PostWrite::Drop; - } - v @ WriteState::Open => { - if let (TransmitKind::Future, false) = ( - kind, - transmit.done || matches!(transmit.read, ReadState::Dropped), - ) { - bail!("cannot drop future write end without first writing a value") - } - - *v = WriteState::Dropped; - } - WriteState::Dropped => unreachable!("write state is already dropped"), - } - - // If the existing read state is dropped, then there's nothing to read - // and we can keep it that way. - // - // If the read state was any other state, then we must set the new state to open - // to indicate that there *is* data to be read - let new_state = if let ReadState::Dropped = &transmit.read { - ReadState::Dropped - } else { - ReadState::Open - }; - - let read_handle = transmit.read_handle; - - // Swap in the new read state - match mem::replace(&mut transmit.read, new_state) { - // If the guest was ready to read, then we cannot drop the reader (or writer) - // we must deliver the event, and update the state associated with the handle to - // represent that a read must be performed - ReadState::GuestReady { ty, handle, .. } => { - // Ensure the final read of the guest is queued, with appropriate closure indicator - self.update_event( - read_handle.rep(), - match ty { - TableIndex::Future(ty) => Event::FutureRead { - code: ReturnCode::Dropped(0), - pending: Some((ty, handle)), - }, - TableIndex::Stream(ty) => Event::StreamRead { - code: ReturnCode::Dropped(0), - pending: Some((ty, handle)), - }, - }, - )?; - } - - // If the host was ready to read, and the writer end is being dropped (host->host write?) - // signal to the reader that we've reached the end of the stream - ReadState::HostReady { accept } => { - accept(Writer::End)?; - } - - // If the read state is open, then there are no registered readers of the stream/future - ReadState::Open => { - self.update_event( - read_handle.rep(), - match kind { - TransmitKind::Future => Event::FutureRead { - code: ReturnCode::Dropped(0), - pending: None, - }, - TransmitKind::Stream => Event::StreamRead { - code: ReturnCode::Dropped(0), - pending: None, - }, - }, - )?; - } - - // If the read state was already dropped, then we can remove the transmit state completely - // (both writer and reader have been dropped) - ReadState::Dropped => { - log::trace!("host_drop_writer delete {transmit_rep}"); - self.delete_transmit(transmit_id)?; - } - } - Ok(()) - } - /// Cancel a pending write for the specified stream or future from the guest. fn guest_cancel_write( &mut self, @@ -3264,33 +3233,6 @@ impl ConcurrentState { self.host_cancel_read(rep) } - /// Drop the writable end of the specified stream or future from the guest. - fn guest_drop_writable(&mut self, ty: TableIndex, writer: u32) -> Result<()> { - let (transmit_rep, state) = self - .state_table(ty) - .remove_by_index(writer) - .context("failed to find writer")?; - let (state, kind) = match state { - WaitableState::Stream(_, state) => (state, TransmitKind::Stream), - WaitableState::Future(_, state) => (state, TransmitKind::Future), - _ => { - bail!("invalid stream or future handle"); - } - }; - match state { - StreamFutureState::Write { .. } => {} - StreamFutureState::Read { .. } => { - bail!("passed read end to `{{stream|future}}.drop-writable`") - } - StreamFutureState::Busy => bail!("cannot drop busy stream or future"), - } - - let id = TableId::::new(transmit_rep); - let transmit_rep = self.get(id)?.state.rep(); - log::trace!("guest_drop_writable: drop writer {id:?}"); - self.host_drop_writer(transmit_rep, kind) - } - /// Drop the specified error context. pub(crate) fn error_context_drop( &mut self, @@ -3411,15 +3353,6 @@ impl ConcurrentState { .map(|result| result.encode()) } - /// Implements the `future.drop-writable` intrinsic. - pub(crate) fn future_drop_writable( - &mut self, - ty: TypeFutureTableIndex, - writer: u32, - ) -> Result<()> { - self.guest_drop_writable(TableIndex::Future(ty), writer) - } - /// Implements the `stream.new` intrinsic. pub(crate) fn stream_new(&mut self, ty: TypeStreamTableIndex) -> Result { self.guest_new(TableIndex::Stream(ty)) @@ -3447,15 +3380,6 @@ impl ConcurrentState { .map(|result| result.encode()) } - /// Implements the `stream.drop-writable` intrinsic. - pub(crate) fn stream_drop_writable( - &mut self, - ty: TypeStreamTableIndex, - writer: u32, - ) -> Result<()> { - self.guest_drop_writable(TableIndex::Stream(ty), writer) - } - /// Transfer ownership of the specified future read end from one guest to /// another. pub(crate) fn future_transfer( diff --git a/crates/wasmtime/src/runtime/component/concurrent_disabled.rs b/crates/wasmtime/src/runtime/component/concurrent_disabled.rs index 1a661dd4da0e..186676180754 100644 --- a/crates/wasmtime/src/runtime/component/concurrent_disabled.rs +++ b/crates/wasmtime/src/runtime/component/concurrent_disabled.rs @@ -80,12 +80,12 @@ impl ErrorContext { } } -pub struct HostStream

{ +pub struct StreamReader

{ uninhabited: Uninhabited, _phantom: PhantomData

, } -impl

HostStream

{ +impl

StreamReader

{ pub(crate) fn into_val(self) -> Val { match self.uninhabited {} } @@ -107,12 +107,12 @@ impl

HostStream

{ } } -pub struct HostFuture

{ +pub struct FutureReader

{ uninhabited: Uninhabited, _phantom: PhantomData

, } -impl

HostFuture

{ +impl

FutureReader

{ pub(crate) fn into_val(self) -> Val { match self.uninhabited {} } diff --git a/crates/wasmtime/src/runtime/component/linker.rs b/crates/wasmtime/src/runtime/component/linker.rs index 71bae434742f..2f68d4b5e663 100644 --- a/crates/wasmtime/src/runtime/component/linker.rs +++ b/crates/wasmtime/src/runtime/component/linker.rs @@ -797,6 +797,59 @@ impl LinkerInstance<'_, T> { Ok(()) } + /// Identical to [`Self::resource`], except that it takes a concurrent destructor. + #[cfg(feature = "component-model-async")] + pub fn resource_concurrent(&mut self, name: &str, ty: ResourceType, dtor: F) -> Result<()> + where + T: Send + 'static, + F: Fn(&Accessor, u32) -> Pin> + Send + '_>> + + Send + + Sync + + 'static, + { + assert!( + self.engine.config().async_support, + "cannot use `resource_concurrent` without enabling async support in the config" + ); + // TODO: This isn't really concurrent -- it requires exclusive access to + // the store for the duration of the call, preventing guest code from + // running until it completes. We should make it concurrent and clean + // up the implementation to avoid using e.g. `Accessor::new` and + // `tls::set` directly. + let dtor = Arc::new(dtor); + let dtor = Arc::new(crate::func::HostFunc::wrap_inner( + &self.engine, + move |mut cx: crate::Caller<'_, T>, (param,): (u32,)| { + let dtor = dtor.clone(); + cx.as_context_mut().block_on(move |mut store| { + Box::pin(async move { + // NOTE: We currently pass `None` as the `instance` + // parameter to `Accessor::new` because we don't have ready + // access to it, meaning `dtor` will panic if it tries to + // use `Accessor::instance`. We could plumb that through + // from the `wasmtime-cranelift`-generated code, but we plan + // to remove `Accessor::instance` once all instances in a + // store share the same concurrent state, at which point we + // won't need it anyway. + let accessor = &Accessor::new( + crate::store::StoreToken::new(store.as_context_mut()), + None, + ); + let mut future = std::pin::pin!(dtor(accessor, param)); + std::future::poll_fn(|cx| { + crate::component::concurrent::tls::set(store.0.traitobj_mut(), || { + future.as_mut().poll(cx) + }) + }) + .await + }) + })? + }, + )); + self.insert(name, Definition::Resource(ty, dtor))?; + Ok(()) + } + /// Defines a nested instance within this instance. /// /// This can be used to describe arbitrarily nested levels of instances diff --git a/crates/wasmtime/src/runtime/component/mod.rs b/crates/wasmtime/src/runtime/component/mod.rs index cf7b662981a1..9219fd8513e0 100644 --- a/crates/wasmtime/src/runtime/component/mod.rs +++ b/crates/wasmtime/src/runtime/component/mod.rs @@ -120,8 +120,9 @@ pub use self::component::{Component, ComponentExportIndex}; #[cfg(feature = "component-model-async")] pub use self::concurrent::{ AbortHandle, Access, Accessor, AccessorTask, AsAccessor, ErrorContext, FutureReader, - FutureWriter, HostFuture, HostStream, ReadBuffer, StreamReader, StreamWriter, - VMComponentAsyncStore, VecBuffer, Watch, WriteBuffer, + FutureWriter, GuardedFutureReader, GuardedFutureWriter, GuardedStreamReader, + GuardedStreamWriter, ReadBuffer, StreamReader, StreamWriter, VMComponentAsyncStore, VecBuffer, + WriteBuffer, }; pub use self::func::{ ComponentNamedList, ComponentType, Func, Lift, Lower, TypedFunc, WasmList, WasmStr, diff --git a/crates/wasmtime/src/runtime/component/store.rs b/crates/wasmtime/src/runtime/component/store.rs index 71e214d72c5d..edb251ed5e5a 100644 --- a/crates/wasmtime/src/runtime/component/store.rs +++ b/crates/wasmtime/src/runtime/component/store.rs @@ -32,8 +32,9 @@ impl ComponentStoreData { } #[cfg(feature = "component-model-async")] - pub(crate) fn drop_fibers(store: &mut StoreOpaque) { + pub(crate) fn drop_fibers_and_futures(store: &mut StoreOpaque) { let mut fibers = Vec::new(); + let mut futures = Vec::new(); for (_, instance) in store.store_data_mut().components.instances.iter_mut() { let Some(instance) = instance.as_mut() else { continue; @@ -42,12 +43,14 @@ impl ComponentStoreData { instance .get_mut() .concurrent_state_mut() - .take_fibers(&mut fibers); + .take_fibers_and_futures(&mut fibers, &mut futures); } for mut fiber in fibers { fiber.dispose(store); } + + crate::component::concurrent::tls::set(store.traitobj_mut(), move || drop(futures)); } } diff --git a/crates/wasmtime/src/runtime/component/values.rs b/crates/wasmtime/src/runtime/component/values.rs index dbe129ca4370..943607280686 100644 --- a/crates/wasmtime/src/runtime/component/values.rs +++ b/crates/wasmtime/src/runtime/component/values.rs @@ -1,6 +1,6 @@ use crate::ValRaw; use crate::component::ResourceAny; -use crate::component::concurrent::{self, ErrorContext, HostFuture, HostStream}; +use crate::component::concurrent::{self, ErrorContext, FutureReader, StreamReader}; use crate::component::func::{Lift, LiftContext, Lower, LowerContext, desc}; use crate::prelude::*; use core::mem::MaybeUninit; @@ -207,10 +207,10 @@ impl Val { Val::Flags(flags) } InterfaceType::Future(_) => { - HostFuture::<()>::linear_lift_from_flat(cx, ty, next(src))?.into_val() + FutureReader::<()>::linear_lift_from_flat(cx, ty, next(src))?.into_val() } InterfaceType::Stream(_) => { - HostStream::<()>::linear_lift_from_flat(cx, ty, next(src))?.into_val() + StreamReader::<()>::linear_lift_from_flat(cx, ty, next(src))?.into_val() } InterfaceType::ErrorContext(_) => { ErrorContext::linear_lift_from_flat(cx, ty, next(src))?.into_val() @@ -337,10 +337,10 @@ impl Val { Val::Flags(flags) } InterfaceType::Future(_) => { - HostFuture::<()>::linear_lift_from_memory(cx, ty, bytes)?.into_val() + FutureReader::<()>::linear_lift_from_memory(cx, ty, bytes)?.into_val() } InterfaceType::Stream(_) => { - HostStream::<()>::linear_lift_from_memory(cx, ty, bytes)?.into_val() + StreamReader::<()>::linear_lift_from_memory(cx, ty, bytes)?.into_val() } InterfaceType::ErrorContext(_) => { ErrorContext::linear_lift_from_memory(cx, ty, bytes)?.into_val() diff --git a/crates/wasmtime/src/runtime/store.rs b/crates/wasmtime/src/runtime/store.rs index 901f4daa461c..33e4b6dae59d 100644 --- a/crates/wasmtime/src/runtime/store.rs +++ b/crates/wasmtime/src/runtime/store.rs @@ -664,8 +664,12 @@ impl Store { // attempting to drop the instances themselves since the fibers may need // to be resumed and allowed to exit cleanly before we yank the state // out from under them. + // + // This will also drop any futures which might use a `&Accessor` fields + // in their `Drop::drop` implementations, in which case they'll need to + // be called from with in the context of a `tls::set` closure. #[cfg(feature = "component-model-async")] - ComponentStoreData::drop_fibers(&mut self.inner); + ComponentStoreData::drop_fibers_and_futures(&mut self.inner); // Ensure all fiber stacks, even cached ones, are all flushed out to the // instance allocator. diff --git a/crates/wasmtime/src/runtime/vm/component/libcalls.rs b/crates/wasmtime/src/runtime/vm/component/libcalls.rs index 7577305f9d3c..be65a8820acc 100644 --- a/crates/wasmtime/src/runtime/vm/component/libcalls.rs +++ b/crates/wasmtime/src/runtime/vm/component/libcalls.rs @@ -1004,9 +1004,11 @@ fn future_drop_writable( ty: u32, writer: u32, ) -> Result<()> { - instance - .concurrent_state_mut(store) - .future_drop_writable(TypeFutureTableIndex::from_u32(ty), writer) + store.component_async_store().future_drop_writable( + instance, + TypeFutureTableIndex::from_u32(ty), + writer, + ) } #[cfg(feature = "component-model-async")] @@ -1103,9 +1105,11 @@ fn stream_drop_writable( ty: u32, writer: u32, ) -> Result<()> { - instance - .concurrent_state_mut(store) - .stream_drop_writable(TypeStreamTableIndex::from_u32(ty), writer) + store.component_async_store().stream_drop_writable( + instance, + TypeStreamTableIndex::from_u32(ty), + writer, + ) } #[cfg(feature = "component-model-async")] diff --git a/crates/wit-bindgen/src/lib.rs b/crates/wit-bindgen/src/lib.rs index 50f06777d341..d4811aa9499e 100644 --- a/crates/wit-bindgen/src/lib.rs +++ b/crates/wit-bindgen/src/lib.rs @@ -1516,9 +1516,25 @@ impl Wasmtime { let src = src.unwrap_or(&mut self.src); let gate = FeatureGate::open(src, stability); let camel = name.to_upper_camel_case(); + let flags = self.opts.imports.resource_drop_flags(resolve, key, name); if flags.contains(FunctionFlags::ASYNC) { - uwriteln!( + if flags.contains(FunctionFlags::STORE) { + uwriteln!( + src, + "{inst}.resource_concurrent( + \"{name}\", + {wt}::component::ResourceType::host::<{camel}>(), + move |caller: &{wt}::component::Accessor::, rep| {{ + {wt}::component::__internal::Box::pin(async move {{ + let accessor = &caller.with_data(host_getter); + Host{camel}Concurrent::drop(accessor, {wt}::component::Resource::new_own(rep)).await + }}) + }}, + )?;" + ) + } else { + uwriteln!( src, "{inst}.resource_async( \"{name}\", @@ -1530,6 +1546,7 @@ impl Wasmtime { }}, )?;" ) + } } else { uwriteln!( src, @@ -2874,6 +2891,27 @@ impl<'a> InterfaceGenerator<'a> { with_store_supertraits.join(" + "), ); ret.with_store_name = Some(format!("{trait_name}WithStore")); + + for extra in extra_functions { + match extra { + ExtraTraitMethod::ResourceDrop { name } => { + let flags = self.import_resource_drop_flags(name); + if !flags.contains(FunctionFlags::STORE) { + continue; + } + let camel = name.to_upper_camel_case(); + + assert!(flags.contains(FunctionFlags::ASYNC)); + + uwrite!( + self.src, + "fn drop(accessor: &{wt}::component::Accessor, rep: {wt}::component::Resource<{camel}>) -> impl ::core::future::Future> + Send where Self: Sized;" + ); + } + ExtraTraitMethod::ErrorConvert { .. } => {} + } + } + for (func, flags) in partition.with_store.iter() { self.generate_function_trait_sig(func, *flags); self.push_str(";\n"); @@ -2889,6 +2927,7 @@ impl<'a> InterfaceGenerator<'a> { " where _T: {}", with_store_supertraits.join(" + ") ); + uwriteln!(self.src, "{{}}"); } @@ -2907,8 +2946,12 @@ impl<'a> InterfaceGenerator<'a> { match extra { ExtraTraitMethod::ResourceDrop { name } => { let camel = name.to_upper_camel_case(); + let flags = self.import_resource_drop_flags(name); ret.all_func_flags |= flags; + if flags.contains(FunctionFlags::STORE) { + continue; + } uwrite!( self.src, "fn drop(&mut self, rep: {wt}::component::Resource<{camel}>) -> " diff --git a/crates/wit-bindgen/src/rust.rs b/crates/wit-bindgen/src/rust.rs index a5fccd5de07f..e2b6383379bd 100644 --- a/crates/wit-bindgen/src/rust.rs +++ b/crates/wit-bindgen/src/rust.rs @@ -176,12 +176,12 @@ pub trait RustGenerator<'a> { TypeDefKind::Future(ty) => { let wt = self.wasmtime_path(); let t = self.optional_ty(ty.as_ref(), TypeMode::Owned); - format!("{wt}::component::HostFuture<{t}>") + format!("{wt}::component::FutureReader<{t}>") } TypeDefKind::Stream(ty) => { let wt = self.wasmtime_path(); let t = self.optional_ty(ty.as_ref(), TypeMode::Owned); - format!("{wt}::component::HostStream<{t}>") + format!("{wt}::component::StreamReader<{t}>") } TypeDefKind::Handle(handle) => self.handle(handle), TypeDefKind::Resource => unreachable!(),