33from __future__ import print_function
44
55import threading
6- import time
76
87try :
98 from queue import Queue , Full , Empty
1514
1615__all__ = ["EventTime" , "FluentSender" ]
1716
18- _global_sender = None
19-
20- DEFAULT_QUEUE_TIMEOUT = 0.05
2117DEFAULT_QUEUE_MAXSIZE = 100
2218DEFAULT_QUEUE_CIRCULAR = False
2319
20+ _TOMBSTONE = object ()
21+
22+ _global_sender = None
23+
2424
2525def _set_global_sender (sender ): # pragma: no cover
2626 """ [For testing] Function to set global sender directly
@@ -42,8 +42,9 @@ def close(): # pragma: no cover
4242 get_global_sender ().close ()
4343
4444
45- class CommunicatorThread (threading .Thread ):
46- def __init__ (self , tag ,
45+ class FluentSender (sender .FluentSender ):
46+ def __init__ (self ,
47+ tag ,
4748 host = 'localhost' ,
4849 port = 24224 ,
4950 bufmax = 1 * 1024 * 1024 ,
@@ -52,76 +53,42 @@ def __init__(self, tag,
5253 buffer_overflow_handler = None ,
5354 nanosecond_precision = False ,
5455 msgpack_kwargs = None ,
55- queue_timeout = DEFAULT_QUEUE_TIMEOUT ,
5656 queue_maxsize = DEFAULT_QUEUE_MAXSIZE ,
57- queue_circular = DEFAULT_QUEUE_CIRCULAR , * args , ** kwargs ):
58- super (CommunicatorThread , self ).__init__ (** kwargs )
59- self ._queue = Queue (maxsize = queue_maxsize )
60- self ._do_run = True
61- self ._queue_timeout = queue_timeout
57+ queue_circular = DEFAULT_QUEUE_CIRCULAR ,
58+ ** kwargs ):
59+ """
60+ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
61+ """
62+ super (FluentSender , self ).__init__ (tag = tag , host = host , port = port , bufmax = bufmax , timeout = timeout ,
63+ verbose = verbose , buffer_overflow_handler = buffer_overflow_handler ,
64+ nanosecond_precision = nanosecond_precision ,
65+ msgpack_kwargs = msgpack_kwargs ,
66+ ** kwargs )
6267 self ._queue_maxsize = queue_maxsize
6368 self ._queue_circular = queue_circular
64- self ._conn_close_lock = threading .Lock ()
65- self ._sender = sender .FluentSender (tag = tag , host = host , port = port , bufmax = bufmax , timeout = timeout ,
66- verbose = verbose , buffer_overflow_handler = buffer_overflow_handler ,
67- nanosecond_precision = nanosecond_precision , msgpack_kwargs = msgpack_kwargs )
6869
69- def send (self , bytes_ ):
70- if self ._queue_circular and self ._queue .full ():
71- # discard oldest
72- try :
73- self ._queue .get (block = False )
74- except Empty : # pragma: no cover
75- pass
76- try :
77- self ._queue .put (bytes_ , block = (not self ._queue_circular ))
78- except Full :
79- return False
80- return True
70+ self ._thread_guard = threading .Event () # This ensures visibility across all variables
71+ self ._closed = False
8172
82- def run (self ):
83- while self ._do_run :
84- try :
85- bytes_ = self ._queue .get (block = True , timeout = self ._queue_timeout )
86- except Empty :
87- continue
88- with self ._conn_close_lock :
89- self ._sender ._send (bytes_ )
90-
91- def close (self , flush = True , discard = True ):
92- if discard :
93- while not self ._queue .empty ():
94- try :
95- self ._queue .get (block = False )
96- except Empty :
97- break
98- while flush and (not self ._queue .empty ()):
99- time .sleep (0.1 )
100- self ._do_run = False
101- self ._sender .close ()
102-
103- def _close (self ):
104- with self ._conn_close_lock :
105- self ._sender ._close ()
106-
107- @property
108- def last_error (self ):
109- return self ._sender .last_error
110-
111- @last_error .setter
112- def last_error (self , err ):
113- self ._sender .last_error = err
114-
115- def clear_last_error (self , _thread_id = None ):
116- self ._sender .clear_last_error (_thread_id = _thread_id )
117-
118- @property
119- def queue_timeout (self ):
120- return self ._queue_timeout
121-
122- @queue_timeout .setter
123- def queue_timeout (self , value ):
124- self ._queue_timeout = value
73+ self ._queue = Queue (maxsize = queue_maxsize )
74+ self ._send_thread = threading .Thread (target = self ._send_loop ,
75+ name = "AsyncFluentSender %d" % id (self ))
76+ self ._send_thread .daemon = True
77+ self ._send_thread .start ()
78+
79+ def close (self , flush = True ):
80+ with self .lock :
81+ if self ._closed :
82+ return
83+ self ._closed = True
84+ if not flush :
85+ while True :
86+ try :
87+ self ._queue .get (block = False )
88+ except Empty :
89+ break
90+ self ._queue .put (_TOMBSTONE )
91+ self ._send_thread .join ()
12592
12693 @property
12794 def queue_maxsize (self ):
@@ -135,91 +102,35 @@ def queue_blocking(self):
135102 def queue_circular (self ):
136103 return self ._queue_circular
137104
138-
139- class FluentSender (sender .FluentSender ):
140- def __init__ (self ,
141- tag ,
142- host = 'localhost' ,
143- port = 24224 ,
144- bufmax = 1 * 1024 * 1024 ,
145- timeout = 3.0 ,
146- verbose = False ,
147- buffer_overflow_handler = None ,
148- nanosecond_precision = False ,
149- msgpack_kwargs = None ,
150- queue_timeout = DEFAULT_QUEUE_TIMEOUT ,
151- queue_maxsize = DEFAULT_QUEUE_MAXSIZE ,
152- queue_circular = DEFAULT_QUEUE_CIRCULAR ,
153- ** kwargs ): # This kwargs argument is not used in __init__. This will be removed in the next major version.
154- super (FluentSender , self ).__init__ (tag = tag , host = host , port = port , bufmax = bufmax , timeout = timeout ,
155- verbose = verbose , buffer_overflow_handler = buffer_overflow_handler ,
156- nanosecond_precision = nanosecond_precision , msgpack_kwargs = msgpack_kwargs ,
157- ** kwargs )
158- self ._communicator = CommunicatorThread (tag = tag , host = host , port = port , bufmax = bufmax , timeout = timeout ,
159- verbose = verbose , buffer_overflow_handler = buffer_overflow_handler ,
160- nanosecond_precision = nanosecond_precision , msgpack_kwargs = msgpack_kwargs ,
161- queue_timeout = queue_timeout , queue_maxsize = queue_maxsize ,
162- queue_circular = queue_circular )
163- self ._communicator .start ()
164-
165105 def _send (self , bytes_ ):
166- return self ._communicator .send (bytes_ = bytes_ )
167-
168- def _close (self ):
169- # super(FluentSender, self)._close()
170- self ._communicator ._close ()
171-
172- def _send_internal (self , bytes_ ):
173- assert False # pragma: no cover
174-
175- def _send_data (self , bytes_ ):
176- assert False # pragma: no cover
177-
178- # override reconnect, so we don't open a socket here (since it
179- # will be opened by the CommunicatorThread)
180- def _reconnect (self ):
181- return
182-
183- def close (self ):
184- self ._communicator .close (flush = True )
185- self ._communicator .join ()
186- return super (FluentSender , self ).close ()
187-
188- @property
189- def last_error (self ):
190- return self ._communicator .last_error
191-
192- @last_error .setter
193- def last_error (self , err ):
194- self ._communicator .last_error = err
195-
196- def clear_last_error (self , _thread_id = None ):
197- self ._communicator .clear_last_error (_thread_id = _thread_id )
106+ with self .lock :
107+ if self ._closed :
108+ return False
109+ if self ._queue_circular and self ._queue .full ():
110+ # discard oldest
111+ try :
112+ self ._queue .get (block = False )
113+ except Empty : # pragma: no cover
114+ pass
115+ try :
116+ self ._queue .put (bytes_ , block = (not self ._queue_circular ))
117+ except Full : # pragma: no cover
118+ return False # this actually can't happen
198119
199- @property
200- def queue_timeout (self ):
201- return self ._communicator .queue_timeout
120+ return True
202121
203- @queue_timeout .setter
204- def queue_timeout (self , value ):
205- self ._communicator .queue_timeout = value
122+ def _send_loop (self ):
123+ send_internal = super (FluentSender , self )._send_internal
206124
207- @property
208- def queue_maxsize (self ):
209- return self ._communicator .queue_maxsize
210-
211- @property
212- def queue_blocking (self ):
213- return self ._communicator .queue_blocking
214-
215- @property
216- def queue_circular (self ):
217- return self ._communicator .queue_circular
125+ try :
126+ while True :
127+ bytes_ = self ._queue .get (block = True )
128+ if bytes_ is _TOMBSTONE :
129+ break
218130
219- def __enter__ (self ):
220- return self
131+ send_internal (bytes_ )
132+ finally :
133+ self ._close ()
221134
222- def __exit__ (self , typ , value , traceback ):
223- # give time to the comm. thread to send its queued messages
224- time .sleep (0.2 )
135+ def __exit__ (self , exc_type , exc_val , exc_tb ):
225136 self .close ()
0 commit comments