1+ use futures:: executor:: block_on;
12use std:: future:: Future ;
23use std:: time:: Duration ;
34use std:: {
45 marker:: PhantomData ,
56 sync:: {
6- atomic:: { AtomicBool , AtomicU64 , AtomicUsize , Ordering } ,
7+ atomic:: { AtomicBool , AtomicU64 , Ordering } ,
78 Arc ,
89 } ,
910} ;
1011
1112use dashmap:: DashMap ;
1213use futures:: { future:: BoxFuture , FutureExt } ;
14+ use tokio:: sync:: mpsc;
1315use tokio:: sync:: mpsc:: channel;
14- use tokio:: sync:: { mpsc, Mutex } ;
1516use tokio:: time:: sleep;
16- use tracing:: { debug , error , trace} ;
17+ use tracing:: { error , info , trace} ;
1718
1819use rabbitmq_stream_protocol:: { message:: Message , ResponseCode , ResponseKind } ;
1920
@@ -60,18 +61,49 @@ impl ConfirmationStatus {
6061}
6162
6263pub struct ProducerInternal {
63- client : Client ,
64+ client : Arc < Client > ,
6465 stream : String ,
6566 producer_id : u8 ,
66- batch_size : usize ,
6767 publish_sequence : Arc < AtomicU64 > ,
6868 waiting_confirmations : WaiterMap ,
6969 closed : Arc < AtomicBool > ,
70- accumulator : MessageAccumulator ,
71- publish_version : u16 ,
70+ sender : mpsc:: Sender < ClientMessage > ,
7271 filter_value_extractor : Option < FilterValueExtractor > ,
7372}
7473
74+ impl Drop for ProducerInternal {
75+ fn drop ( & mut self ) {
76+ block_on ( async {
77+ if let Err ( e) = self . close ( ) . await {
78+ error ! ( error = ?e, "Error closing producer" ) ;
79+ }
80+ } ) ;
81+ }
82+ }
83+
84+ impl ProducerInternal {
85+ pub async fn close ( & self ) -> Result < ( ) , ProducerCloseError > {
86+ match self
87+ . closed
88+ . compare_exchange ( false , true , Ordering :: SeqCst , Ordering :: SeqCst )
89+ {
90+ Ok ( false ) => {
91+ let response = self . client . delete_publisher ( self . producer_id ) . await ?;
92+ if response. is_ok ( ) {
93+ self . client . close ( ) . await ?;
94+ Ok ( ( ) )
95+ } else {
96+ Err ( ProducerCloseError :: Close {
97+ status : response. code ( ) . clone ( ) ,
98+ stream : self . stream . clone ( ) ,
99+ } )
100+ }
101+ }
102+ _ => Ok ( ( ) ) , // Already closed
103+ }
104+ }
105+ }
106+
75107/// API for publising messages to RabbitMQ stream
76108#[ derive( Clone ) ]
77109pub struct Producer < T > ( Arc < ProducerInternal > , PhantomData < T > ) ;
@@ -139,22 +171,29 @@ impl<T> ProducerBuilder<T> {
139171 } ;
140172
141173 if response. is_ok ( ) {
174+ let ( sender, receiver) = mpsc:: channel ( self . batch_size ) ;
175+
176+ let client = Arc :: new ( client) ;
142177 let producer = ProducerInternal {
143178 producer_id,
144- batch_size : self . batch_size ,
145179 stream : stream. to_string ( ) ,
146180 client,
147181 publish_sequence,
148182 waiting_confirmations,
149- publish_version,
150183 closed : Arc :: new ( AtomicBool :: new ( false ) ) ,
151- accumulator : MessageAccumulator :: new ( self . batch_size ) ,
184+ sender ,
152185 filter_value_extractor : self . filter_value_extractor ,
153186 } ;
154187
155188 let internal_producer = Arc :: new ( producer) ;
156- let producer = Producer ( internal_producer. clone ( ) , PhantomData ) ;
157- schedule_batch_send ( internal_producer) ;
189+ schedule_batch_send (
190+ self . batch_size ,
191+ receiver,
192+ internal_producer. client . clone ( ) ,
193+ producer_id,
194+ publish_version,
195+ ) ;
196+ let producer = Producer ( internal_producer, PhantomData ) ;
158197
159198 Ok ( producer)
160199 } else {
@@ -205,78 +244,33 @@ impl<T> ProducerBuilder<T> {
205244 }
206245}
207246
208- pub struct MessageAccumulator {
209- sender : mpsc:: Sender < ClientMessage > ,
210- receiver : Mutex < mpsc:: Receiver < ClientMessage > > ,
211- message_count : AtomicUsize ,
212- }
213-
214- impl MessageAccumulator {
215- pub fn new ( batch_size : usize ) -> Self {
216- let ( sender, receiver) = mpsc:: channel ( batch_size) ;
217- Self {
218- sender,
219- receiver : Mutex :: new ( receiver) ,
220- message_count : AtomicUsize :: new ( 0 ) ,
221- }
222- }
223-
224- pub async fn add ( & self , message : ClientMessage ) -> RabbitMQStreamResult < ( ) > {
225- match self . sender . send ( message) . await {
226- Ok ( _) => {
227- self . message_count . fetch_add ( 1 , Ordering :: Relaxed ) ;
228- Ok ( ( ) )
229- }
230- Err ( e) => Err ( ClientError :: GenericError ( Box :: new ( e) ) ) ,
231- }
232- }
233-
234- pub async fn get ( & self , buffer : & mut Vec < ClientMessage > , batch_size : usize ) -> ( bool , usize ) {
235- let mut receiver = self . receiver . lock ( ) . await ;
236-
237- let count = receiver. recv_many ( buffer, batch_size) . await ;
238- self . message_count . fetch_sub ( count, Ordering :: Relaxed ) ;
239-
240- // `recv_many` returns 0 only if the channel is closed
241- // Read https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.recv_many
242- ( count == 0 , count)
243- }
244- }
245-
246- fn schedule_batch_send ( producer : Arc < ProducerInternal > ) {
247+ fn schedule_batch_send (
248+ batch_size : usize ,
249+ mut receiver : mpsc:: Receiver < ClientMessage > ,
250+ client : Arc < Client > ,
251+ producer_id : u8 ,
252+ publish_version : u16 ,
253+ ) {
247254 tokio:: task:: spawn ( async move {
248- let mut buffer = Vec :: with_capacity ( producer . batch_size ) ;
255+ let mut buffer = Vec :: with_capacity ( batch_size) ;
249256 loop {
250- let ( is_closed, count) = producer
251- . accumulator
252- . get ( & mut buffer, producer. batch_size )
253- . await ;
257+ let count = receiver. recv_many ( & mut buffer, batch_size) . await ;
254258
255- if is_closed {
256- error ! ( " Channel is closed and this is bad" ) ;
259+ if count == 0 || buffer . is_empty ( ) {
260+ // Channel is closed, exit the loop
257261 break ;
258262 }
259263
260- if count > 0 {
261- debug ! ( "Sending batch of {} messages" , count) ;
262- let messages: Vec < _ > = buffer. drain ( ..count) . collect ( ) ;
263- match producer
264- . client
265- . publish ( producer. producer_id , messages, producer. publish_version )
266- . await
267- {
268- Ok ( _) => { }
269- Err ( e) => {
270- error ! ( "Error publishing batch {:?}" , e) ;
271-
272- // Stop loop if producer is closed
273- if producer. closed . load ( Ordering :: Relaxed ) {
274- break ;
275- }
276- }
277- } ;
278- }
264+ let messages: Vec < _ > = buffer. drain ( ..count) . collect ( ) ;
265+ match client. publish ( producer_id, messages, publish_version) . await {
266+ Ok ( _) => { }
267+ Err ( e) => {
268+ error ! ( "Error publishing batch {:?}" , e) ;
269+ }
270+ } ;
279271 }
272+
273+ info ! ( "Batch send task finished" ) ;
280274 } ) ;
281275}
282276
@@ -455,10 +449,13 @@ impl<T> Producer<T> {
455449 . waiting_confirmations
456450 . insert ( publishing_id, ProducerMessageWaiter :: Once ( waiter) ) ;
457451
458- self . 0 . accumulator . add ( msg) . await ?;
452+ if let Err ( e) = self . 0 . sender . send ( msg) . await {
453+ return Err ( ClientError :: GenericError ( Box :: new ( e) ) ) ?;
454+ }
459455
460456 Ok ( ( ) )
461457 }
458+
462459 async fn internal_batch_send < Fut > (
463460 & self ,
464461 messages : Vec < Message > ,
@@ -488,7 +485,9 @@ impl<T> Producer<T> {
488485 }
489486
490487 // Queue the message for sending
491- self . 0 . accumulator . add ( client_message) . await ?;
488+ if let Err ( e) = self . 0 . sender . send ( client_message) . await {
489+ return Err ( ClientError :: GenericError ( Box :: new ( e) ) ) ?;
490+ }
492491 self . 0
493492 . waiting_confirmations
494493 . insert ( publishing_id, ProducerMessageWaiter :: Shared ( waiter. clone ( ) ) ) ;
@@ -500,27 +499,9 @@ impl<T> Producer<T> {
500499 pub fn is_closed ( & self ) -> bool {
501500 self . 0 . closed . load ( Ordering :: Relaxed )
502501 }
503- // TODO handle producer state after close
502+
504503 pub async fn close ( self ) -> Result < ( ) , ProducerCloseError > {
505- match self
506- . 0
507- . closed
508- . compare_exchange ( false , true , Ordering :: SeqCst , Ordering :: SeqCst )
509- {
510- Ok ( false ) => {
511- let response = self . 0 . client . delete_publisher ( self . 0 . producer_id ) . await ?;
512- if response. is_ok ( ) {
513- self . 0 . client . close ( ) . await ?;
514- Ok ( ( ) )
515- } else {
516- Err ( ProducerCloseError :: Close {
517- status : response. code ( ) . clone ( ) ,
518- stream : self . 0 . stream . clone ( ) ,
519- } )
520- }
521- }
522- _ => Err ( ProducerCloseError :: AlreadyClosed ) ,
523- }
504+ self . 0 . close ( ) . await
524505 }
525506}
526507
0 commit comments