@@ -53,57 +53,59 @@ def __init__(self, sign_result_loc: str, request_id: str, rconf: RadasConfig) ->
5353 super ().__init__ ()
5454 self .sign_result_loc = sign_result_loc
5555 self .request_id = request_id
56- self .conn : Optional [Connection ] = None
57- self .message_handled = False
5856 self .sign_result_status : Optional [str ] = None
5957 self .sign_result_errors : List [str ] = []
6058 self .rconf = rconf
61- self .start_time = 0.0
62- self .timeout_check_delay = 30.0
63- self .ssl = SSLDomain (SSLDomain .MODE_CLIENT )
64- self .ssl .set_trusted_ca_db (self .rconf .root_ca ())
65- self .ssl .set_peer_authentication (SSLDomain .VERIFY_PEER )
66- self .ssl .set_credentials (
67- self .rconf .client_ca (),
68- self .rconf .client_key (),
69- self .rconf .client_key_password ()
70- )
59+ self ._conn : Optional [Connection ] = None
60+ self ._message_handled = False
61+ self ._start_time = 0.0
62+ self ._timeout_check_delay = 30.0
63+ self ._ssl : Optional [SSLDomain ] = None
64+ if rconf .ssl_enabled ():
65+ self ._ssl = SSLDomain (SSLDomain .MODE_CLIENT )
66+ self ._ssl .set_trusted_ca_db (self .rconf .root_ca ())
67+ self ._ssl .set_peer_authentication (SSLDomain .VERIFY_PEER )
68+ self ._ssl .set_credentials (
69+ self .rconf .client_ca (),
70+ self .rconf .client_key (),
71+ self .rconf .client_key_password ()
72+ )
7173 self .log = logging .getLogger ("charon.pkgs.radas_sign.RadasReceiver" )
7274
7375 def on_start (self , event : Event ) -> None :
7476 umb_target = self .rconf .umb_target ()
7577 container = event .container
76- self .conn = container .connect (
78+ self ._conn = container .connect (
7779 url = umb_target ,
78- ssl_domain = self .ssl ,
80+ ssl_domain = self ._ssl ,
7981 heartbeat = 500
8082 )
8183 receiver = container .create_receiver (
82- context = self .conn , source = self .rconf .result_queue (),
84+ context = self ._conn , source = self .rconf .result_queue (),
8385 )
8486 self .log .info ("Listening on %s, queue: %s" ,
8587 umb_target ,
8688 receiver .source .address )
87- self .start_time = time .time ()
88- container .schedule (self .timeout_check_delay , self )
89+ self ._start_time = time .time ()
90+ container .schedule (self ._timeout_check_delay , self )
8991
9092 def on_timer_task (self , event : Event ) -> None :
9193 current = time .time ()
9294 timeout = self .rconf .receiver_timeout ()
93- idle_time = current - self .start_time
95+ idle_time = current - self ._start_time
9496 self .log .debug ("Checking timeout: passed %s seconds, timeout time %s seconds" ,
9597 idle_time , timeout )
9698 if idle_time > self .rconf .receiver_timeout ():
9799 self .log .error ("The receiver did not receive messages for more than %s seconds,"
98100 " and needs to stop receiving and quit." , timeout )
99101 self ._close (event )
100102 else :
101- event .container .schedule (self .timeout_check_delay , self )
103+ event .container .schedule (self ._timeout_check_delay , self )
102104
103105 def on_message (self , event : Event ) -> None :
104106 self .log .debug ("Got message: %s" , event .message .body )
105107 self ._process_message (event .message .body )
106- if self .message_handled :
108+ if self ._message_handled :
107109 self .log .debug ("The signing result is handled." )
108110 self ._close (event )
109111
@@ -137,7 +139,7 @@ def _process_message(self, msg: Any) -> None:
137139 )
138140 return
139141
140- self .message_handled = True
142+ self ._message_handled = True
141143 self .log .info (
142144 "Start to process the sign event message, request_id %s is matched" , msg_request_id
143145 )
@@ -171,56 +173,60 @@ class RadasSender(MessagingHandler):
171173 this value construct from the cmd flag
172174 rconf (RadasConfig): the configurations for the radas messaging
173175 system.
176+ status (str): tell if status for message sending, only "success"
177+ means the message is sent successfully.
174178 """
175179 def __init__ (self , payload : Any , rconf : RadasConfig ):
176180 super (RadasSender , self ).__init__ ()
177181 self .payload = payload
178182 self .rconf = rconf
179- self .message_sent = False # Flag to track if message was sent
180183 self .status : Optional [str ] = None
181- self .retried = 0
182- self .pending : Optional [Message ] = None
183- self .message : Optional [Message ] = None
184- self .container : Optional [Container ] = None
185- self .sender : Optional [Sender ] = None
186- self .ssl = SSLDomain (SSLDomain .MODE_CLIENT )
187- self .ssl .set_trusted_ca_db (self .rconf .root_ca ())
188- self .ssl .set_peer_authentication (SSLDomain .VERIFY_PEER )
189- self .ssl .set_credentials (
190- self .rconf .client_ca (),
191- self .rconf .client_key (),
192- self .rconf .client_key_password ()
193- )
184+ self ._message_sent = False # Flag to track if message was sent
185+ self ._retried = 0
186+ self ._pending : Optional [Message ] = None
187+ self ._message : Optional [Message ] = None
188+ self ._container : Optional [Container ] = None
189+ self ._sender : Optional [Sender ] = None
190+ self ._ssl : Optional [SSLDomain ] = None
191+ if self .rconf .ssl_enabled ():
192+ self ._ssl = SSLDomain (SSLDomain .MODE_CLIENT )
193+ self ._ssl .set_trusted_ca_db (self .rconf .root_ca ())
194+ self ._ssl .set_peer_authentication (SSLDomain .VERIFY_PEER )
195+ self ._ssl .set_credentials (
196+ self .rconf .client_ca (),
197+ self .rconf .client_key (),
198+ self .rconf .client_key_password ()
199+ )
194200 self .log = logging .getLogger ("charon.pkgs.radas_sign.RadasSender" )
195201
196202 def on_start (self , event ):
197- self .container = event .container
198- conn = self .container .connect (
203+ self ._container = event .container
204+ conn = self ._container .connect (
199205 url = self .rconf .umb_target (),
200- ssl_domain = self .ssl
206+ ssl_domain = self ._ssl
201207 )
202208 if conn :
203- self .sender = self .container .create_sender (conn , self .rconf .request_queue ())
209+ self ._sender = self ._container .create_sender (conn , self .rconf .request_queue ())
204210
205211 def on_sendable (self , event ):
206- if not self .message_sent :
212+ if not self ._message_sent :
207213 msg = Message (body = self .payload , durable = True )
208214 self .log .debug ("Sending message: %s to %s" , msg .id , event .sender .target .address )
209215 self ._send_msg (msg )
210- self .message = msg
211- self .message_sent = True
216+ self ._message = msg
217+ self ._message_sent = True
212218
213219 def on_error (self , event ):
214220 self .log .error ("Error happened during message sending, reason %s" ,
215221 event .description )
216222 self .status = "failed"
217223
218224 def on_rejected (self , event ):
219- self .pending = self .message
225+ self ._pending = self ._message
220226 self ._handle_failed_delivery ("Rejected" )
221227
222228 def on_released (self , event ):
223- self .pending = self .message
229+ self ._pending = self ._message
224230 self ._handle_failed_delivery ("Released" )
225231
226232 def on_accepted (self , event ):
@@ -229,42 +235,42 @@ def on_accepted(self, event):
229235 self .close () # Close connection after confirmation
230236
231237 def on_timer_task (self , event ):
232- message_to_retry = self .message
238+ message_to_retry = self ._message
233239 self ._send_msg (message_to_retry )
234- self .pending = None
240+ self ._pending = None
235241
236242 def close (self ):
237243 self .log .info ("Message has been sent successfully, close connection" )
238- if self .sender :
239- self .sender .close ()
240- if self .container :
241- self .container .stop ()
244+ if self ._sender :
245+ self ._sender .close ()
246+ if self ._container :
247+ self ._container .stop ()
242248
243249 def _send_msg (self , msg : Message ):
244- if self .sender and self .sender .credit > 0 :
245- self .sender .send (msg )
250+ if self ._sender and self ._sender .credit > 0 :
251+ self ._sender .send (msg )
246252 self .log .debug ("Message %s sent" , msg .id )
247253 else :
248254 self .log .warning ("Sender not ready or no credit available" )
249255
250256 def _handle_failed_delivery (self , reason : str ):
251- if self .pending :
252- msg = self .pending
257+ if self ._pending :
258+ msg = self ._pending
253259 self .log .warning ("Message %s failed for reason: %s" , msg .id , reason )
254260 max_retries = self .rconf .radas_sign_timeout_retry_count ()
255- if self .retried < max_retries :
261+ if self ._retried < max_retries :
256262 # Schedule retry
257- self .retried = self .retried + 1
263+ self ._retried = self ._retried + 1
258264 self .log .info ("Scheduling retry %s/%s for message %s" ,
259- self .retried , max_retries , msg .id )
265+ self ._retried , max_retries , msg .id )
260266 # Schedule retry after delay
261- if self .container :
262- self .container .schedule (self .rconf .radas_sign_timeout_retry_interval (), self )
267+ if self ._container :
268+ self ._container .schedule (self .rconf .radas_sign_timeout_retry_interval (), self )
263269 else :
264270 # Max retries exceeded
265271 self .log .error ("Message %s failed after %s retries" , msg .id , max_retries )
266272 self .status = "failed"
267- self .pending = None
273+ self ._pending = None
268274 else :
269275 self .log .info ("Message has been sent successfully, close connection" )
270276 self .close ()
0 commit comments