@@ -186,7 +186,7 @@ impl<'a> generated::types::HostClientHandshake for WasiTlsCtx<'a> {
186186
187187 Ok ( self
188188 . table
189- . push ( FutureClientStreams ( StreamState :: Pending ( Box :: pin (
189+ . push ( FutureStreams ( StreamState :: Pending ( Box :: pin (
190190 async move {
191191 let connector = tokio_rustls:: TlsConnector :: from ( default_client_config ( ) ) ;
192192 connector
@@ -206,21 +206,20 @@ impl<'a> generated::types::HostClientHandshake for WasiTlsCtx<'a> {
206206 }
207207}
208208
209- /// Future TLS connection after the handshake is completed.
210- pub struct FutureClientStreams ( StreamState < Result < TlsStream < WasiStreams > > > ) ;
209+ struct FutureStreams < T > ( StreamState < Result < T > > ) ;
210+
211+ /// Library specific version of TLS connection after the handshake is completed.
212+ /// This alias allows it to use with wit-bindgen component generator which won't take generic types
213+ #[ allow( private_interfaces) ]
214+ pub type FutureClientStreams = FutureStreams < TlsStream < WasiStreams > > ;
211215
212216#[ async_trait]
213- impl Pollable for FutureClientStreams {
217+ impl < T : Send + ' static > Pollable for FutureStreams < T > {
214218 async fn ready ( & mut self ) {
215- match & self . 0 {
216- StreamState :: Pending ( _) => ( ) ,
219+ match & mut self . 0 {
217220 StreamState :: Ready ( _) | StreamState :: Closed => return ,
221+ StreamState :: Pending ( task) => self . 0 = StreamState :: Ready ( task. as_mut ( ) . await ) ,
218222 }
219-
220- let StreamState :: Pending ( future) = mem:: replace ( & mut self . 0 , StreamState :: Closed ) else {
221- unreachable ! ( )
222- } ;
223- self . 0 = StreamState :: Ready ( future. await ) ;
224223 }
225224}
226225
@@ -635,3 +634,41 @@ fn try_lock_for_stream<TlsWriter>(
635634 . try_lock ( )
636635 . map_err ( |_| StreamError :: trap ( "concurrent access to resource not supported" ) )
637636}
637+
638+
639+ #[ cfg( test) ]
640+ mod tests {
641+ use super :: * ;
642+ use tokio:: sync:: oneshot;
643+
644+ #[ tokio:: test]
645+ async fn test_future_client_streams_ready_can_be_canceled ( ) {
646+ let ( tx1, rx1) = oneshot:: channel :: < ( ) > ( ) ;
647+
648+ let mut future_streams = FutureStreams ( StreamState :: Pending ( Box :: pin ( async move {
649+ rx1. await . map_err ( |_| anyhow:: anyhow!( "oneshot canceled" ) )
650+ } ) ) ) ;
651+
652+ let mut fut = future_streams. ready ( ) ;
653+
654+ let mut cx = std:: task:: Context :: from_waker ( futures:: task:: noop_waker_ref ( ) ) ;
655+ assert ! ( fut. as_mut( ) . poll( & mut cx) . is_pending( ) ) ;
656+
657+ //cancel the readiness check
658+ drop ( fut) ;
659+
660+ match future_streams. 0 {
661+ StreamState :: Closed => panic ! ( "First future should be in Pending/ready state" ) ,
662+ _ => ( ) ,
663+ }
664+
665+ // make it ready and wait for it to progress
666+ tx1. send ( ( ) ) . unwrap ( ) ;
667+ future_streams. ready ( ) . await ;
668+
669+ match future_streams. 0 {
670+ StreamState :: Ready ( Ok ( ( ) ) ) => ( ) ,
671+ _ => panic ! ( "First future should be in Ready(Err) state" ) ,
672+ }
673+ }
674+ }
0 commit comments