11//! Types and traits for sharing `Bytes`.
22
33use std:: sync:: { Arc , Mutex } ;
4- use std:: sync:: atomic:: { AtomicBool , Ordering } ;
54use std:: collections:: VecDeque ;
65use anyhow:: bail;
76
87use bytes:: arc:: Bytes ;
98use super :: bytes_slab:: BytesSlab ;
109
11- use crate :: Result ;
12-
1310/// A target for `Bytes`.
1411pub trait BytesPush {
1512 // /// Pushes bytes at the instance.
1613 // fn push(&mut self, bytes: Bytes);
1714 /// Pushes many bytes at the instance.
18- fn extend < I : IntoIterator < Item =Bytes > > ( & mut self , iter : I ) -> Result < ( ) > ;
15+ fn extend < I : IntoIterator < Item =Bytes > > ( & mut self , iter : I ) -> crate :: Result < ( ) > ;
1916}
2017/// A source for `Bytes`.
2118pub trait BytesPull {
2219 // /// Pulls bytes from the instance.
2320 // fn pull(&mut self) -> Option<Bytes>;
2421 /// Drains many bytes from the instance.
25- fn drain_into ( & mut self , vec : & mut Vec < Bytes > ) -> Result < ( ) > ;
22+ fn drain_into ( & mut self , vec : & mut Vec < Bytes > ) -> crate :: Result < ( ) > ;
2623}
2724
25+ use std:: sync:: atomic:: { AtomicBool , Ordering } ;
2826/// An unbounded queue of bytes intended for point-to-point communication
2927/// between threads. Cloning returns another handle to the same queue.
3028///
@@ -46,19 +44,19 @@ impl MergeQueue {
4644 }
4745 }
4846 /// Indicates that all input handles to the queue have dropped.
49- pub fn is_complete ( & self ) -> Result < bool > {
47+ pub fn is_complete ( & self ) -> crate :: Result < bool > {
5048 if self . panic . load ( Ordering :: SeqCst ) { bail ! ( "MergeQueue poisoned." ) ; }
51- Ok ( Arc :: strong_count ( & self . queue ) == 1 && self . queue . lock ( ) . map_err ( |_ | anyhow:: anyhow!( "MergeQueue mutex poisoned. " ) ) ?. is_empty ( ) )
49+ Ok ( Arc :: strong_count ( & self . queue ) == 1 && self . queue . lock ( ) . map_err ( |e | anyhow:: anyhow!( "MergeQueue mutex poisoned: {e} " ) ) ?. is_empty ( ) )
5250 }
5351
54- /// TODO
52+ /// Mark self as poisoned, which causes all subsequent operations to error.
5553 pub fn poison ( & mut self ) {
5654 self . panic . store ( true , Ordering :: SeqCst ) ;
5755 }
5856}
5957
6058impl BytesPush for MergeQueue {
61- fn extend < I : IntoIterator < Item =Bytes > > ( & mut self , iterator : I ) -> Result < ( ) > {
59+ fn extend < I : IntoIterator < Item =Bytes > > ( & mut self , iterator : I ) -> crate :: Result < ( ) > {
6260
6361 if self . panic . load ( Ordering :: SeqCst ) { bail ! ( "MergeQueue poisoned." ) ; }
6462
@@ -68,7 +66,7 @@ impl BytesPush for MergeQueue {
6866 lock_ok = self . queue . try_lock ( ) ;
6967 }
7068 let mut queue = lock_ok
71- . map_err ( |_ | anyhow:: anyhow!( "MergeQueue mutex poisoned. " ) ) ?;
69+ . map_err ( |e | anyhow:: anyhow!( "MergeQueue mutex poisoned: {e} " ) ) ?;
7270
7371 let mut iterator = iterator. into_iter ( ) ;
7472 let mut should_ping = false ;
@@ -102,7 +100,7 @@ impl BytesPush for MergeQueue {
102100}
103101
104102impl BytesPull for MergeQueue {
105- fn drain_into ( & mut self , vec : & mut Vec < Bytes > ) -> Result < ( ) > {
103+ fn drain_into ( & mut self , vec : & mut Vec < Bytes > ) -> crate :: Result < ( ) > {
106104 if self . panic . load ( Ordering :: SeqCst ) { bail ! ( "MergeQueue poisoned." ) ; }
107105
108106 // try to acquire lock without going to sleep (Rust's lock() might yield)
@@ -111,7 +109,7 @@ impl BytesPull for MergeQueue {
111109 lock_ok = self . queue . try_lock ( ) ;
112110 }
113111 let mut queue = lock_ok
114- . map_err ( |_ | anyhow:: anyhow!( "MergeQueue mutex poisoned. " ) ) ?;
112+ . map_err ( |e | anyhow:: anyhow!( "MergeQueue mutex poisoned: {e} " ) ) ?;
115113
116114 vec. extend ( queue. drain ( ..) ) ;
117115 Ok ( ( ) )
@@ -128,7 +126,7 @@ impl Drop for MergeQueue {
128126 }
129127 else {
130128 // TODO: Perhaps this aggressive ordering can relax orderings elsewhere.
131- // if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
129+ if self . panic . load ( Ordering :: SeqCst ) { panic ! ( "MergeQueue poisoned." ) ; }
132130 }
133131 // Drop the queue before pinging.
134132 self . queue = Arc :: new ( Mutex :: new ( VecDeque :: new ( ) ) ) ;
@@ -146,7 +144,7 @@ pub struct SendEndpoint<P: BytesPush> {
146144impl < P : BytesPush > SendEndpoint < P > {
147145
148146 /// Moves `self.buffer` into `self.send`, replaces with empty buffer.
149- fn send_buffer ( & mut self ) -> Result < ( ) > {
147+ fn send_buffer ( & mut self ) -> crate :: Result < ( ) > {
150148 let valid_len = self . buffer . valid ( ) . len ( ) ;
151149 if valid_len > 0 {
152150 self . send . extend ( Some ( self . buffer . extract ( valid_len) ) ) ?;
@@ -164,12 +162,12 @@ impl<P: BytesPush> SendEndpoint<P> {
164162 /// Makes the next `bytes` bytes valid.
165163 ///
166164 /// The current implementation also sends the bytes, to ensure early visibility.
167- pub fn make_valid ( & mut self , bytes : usize ) -> Result < ( ) > {
165+ pub fn make_valid ( & mut self , bytes : usize ) -> crate :: Result < ( ) > {
168166 self . buffer . make_valid ( bytes) ;
169167 self . send_buffer ( )
170168 }
171169 /// Acquires a prefix of `self.empty()` of length at least `capacity`.
172- pub fn reserve ( & mut self , capacity : usize ) -> Result < & mut [ u8 ] > {
170+ pub fn reserve ( & mut self , capacity : usize ) -> crate :: Result < & mut [ u8 ] > {
173171
174172 if self . buffer . empty ( ) . len ( ) < capacity {
175173 self . send_buffer ( ) ?;
@@ -180,7 +178,7 @@ impl<P: BytesPush> SendEndpoint<P> {
180178 Ok ( self . buffer . empty ( ) )
181179 }
182180 /// Marks all written data as valid, makes visible.
183- pub fn publish ( & mut self ) -> Result < ( ) > {
181+ pub fn publish ( & mut self ) -> crate :: Result < ( ) > {
184182 self . send_buffer ( )
185183 }
186184}
0 commit comments