Skip to content

Commit 7f02aaa

Browse files
committed
Add asyncsender options to get a circular queue and/or with a bounded size.
NOTE: by default the async sender queue is blocking and non-circular. Enabling the `queue_circular` async sender option, makes the sender queue circular and non-blocking, useful in high throughput low-latency applications. Please note that with circular mode enabled, back-pressure could cause the queue to fill up and start discarding events (the oldest not yet sent).
1 parent af4fd66 commit 7f02aaa

File tree

2 files changed

+212
-4
lines changed

2 files changed

+212
-4
lines changed

fluent/asyncsender.py

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
_global_sender = None
1515

1616
DEFAULT_QUEUE_TIMEOUT = 0.05
17+
DEFAULT_QUEUE_MAXSIZE = 100
18+
DEFAULT_QUEUE_CIRCULAR = False
1719

1820

1921
def _set_global_sender(sender):
@@ -46,19 +48,29 @@ def __init__(self, tag,
4648
buffer_overflow_handler=None,
4749
nanosecond_precision=False,
4850
msgpack_kwargs=None,
49-
queue_timeout=DEFAULT_QUEUE_TIMEOUT, *args, **kwargs):
51+
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
52+
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
53+
queue_circular=DEFAULT_QUEUE_CIRCULAR, *args, **kwargs):
5054
super(CommunicatorThread, self).__init__(**kwargs)
51-
self._queue = Queue()
55+
self._queue = Queue(maxsize=queue_maxsize)
5256
self._do_run = True
5357
self._queue_timeout = queue_timeout
58+
self._queue_maxsize = queue_maxsize
59+
self._queue_circular = queue_circular
5460
self._conn_close_lock = threading.Lock()
5561
self._sender = sender.FluentSender(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
5662
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
5763
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs)
5864

5965
def send(self, bytes_):
66+
if self._queue_circular and self._queue.full():
67+
# discard oldest
68+
try:
69+
self._queue.get(block=False)
70+
except Empty:
71+
pass
6072
try:
61-
self._queue.put(bytes_)
73+
self._queue.put(bytes_, block=(not self._queue_circular))
6274
except Full:
6375
return False
6476
return True
@@ -114,6 +126,18 @@ def queue_timeout(self):
114126
def queue_timeout(self, value):
115127
self._queue_timeout = value
116128

129+
@property
130+
def queue_maxsize(self):
131+
return self._queue_maxsize
132+
133+
@property
134+
def queue_blocking(self):
135+
return not self._queue_circular
136+
137+
@property
138+
def queue_circular(self):
139+
return self._queue_circular
140+
117141
def __enter__(self):
118142
return self
119143

@@ -133,6 +157,8 @@ def __init__(self,
133157
nanosecond_precision=False,
134158
msgpack_kwargs=None,
135159
queue_timeout=DEFAULT_QUEUE_TIMEOUT,
160+
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
161+
queue_circular=DEFAULT_QUEUE_CIRCULAR,
136162
**kwargs): # This kwargs argument is not used in __init__. This will be removed in the next major version.
137163
super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
138164
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
@@ -141,7 +167,8 @@ def __init__(self,
141167
self._communicator = CommunicatorThread(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout,
142168
verbose=verbose, buffer_overflow_handler=buffer_overflow_handler,
143169
nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs,
144-
queue_timeout=queue_timeout)
170+
queue_timeout=queue_timeout, queue_maxsize=queue_maxsize,
171+
queue_circular=queue_circular)
145172
self._communicator.start()
146173

147174
def _send(self, bytes_):
@@ -186,6 +213,18 @@ def queue_timeout(self):
186213
def queue_timeout(self, value):
187214
self._communicator.queue_timeout = value
188215

216+
@property
217+
def queue_maxsize(self):
218+
return self._communicator.queue_maxsize
219+
220+
@property
221+
def queue_blocking(self):
222+
return self._communicator.queue_blocking
223+
224+
@property
225+
def queue_circular(self):
226+
return self._communicator.queue_circular
227+
189228
def __enter__(self):
190229
return self
191230

tests/test_asyncsender.py

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,27 @@ def test_connect_exception_during_sender_init(self, mock_socket):
145145
self.assertEqual(self._sender.last_error.args[0], EXCEPTION_MSG)
146146

147147

148+
class TestSenderDefaultProperties(unittest.TestCase):
149+
def setUp(self):
150+
super(TestSenderDefaultProperties, self).setUp()
151+
self._server = mockserver.MockRecvServer('localhost')
152+
self._sender = fluent.asyncsender.FluentSender(tag='test',
153+
port=self._server.port)
154+
155+
def tearDown(self):
156+
self._sender.close()
157+
158+
def test_default_properties(self):
159+
sender = self._sender
160+
self.assertTrue(sender.queue_blocking)
161+
self.assertFalse(sender.queue_circular)
162+
self.assertTrue(isinstance(sender.queue_maxsize, int))
163+
self.assertTrue(sender.queue_maxsize > 0)
164+
self.assertTrue(isinstance(sender.queue_timeout, (int, float)))
165+
self.assertTrue(sender.queue_timeout > 0)
166+
sender._close()
167+
168+
148169
class TestSenderWithTimeout(unittest.TestCase):
149170
def setUp(self):
150171
super(TestSenderWithTimeout, self).setUp()
@@ -195,3 +216,151 @@ def test_event_time(self):
195216
time = fluent.asyncsender.EventTime(1490061367.8616468906402588)
196217
self.assertEqual(time.code, 0)
197218
self.assertEqual(time.data, b'X\xd0\x8873[\xb0*')
219+
220+
221+
class TestSenderWithTimeoutAndCircular(unittest.TestCase):
222+
Q_SIZE = 3
223+
224+
def setUp(self):
225+
super(TestSenderWithTimeoutAndCircular, self).setUp()
226+
self._server = mockserver.MockRecvServer('localhost')
227+
self._sender = fluent.asyncsender.FluentSender(tag='test',
228+
port=self._server.port,
229+
queue_timeout=0.04,
230+
queue_maxsize=self.Q_SIZE,
231+
queue_circular=True)
232+
233+
def tearDown(self):
234+
self._sender.close()
235+
236+
def get_data(self):
237+
return self._server.get_recieved()
238+
239+
def test_simple(self):
240+
sender = self._sender
241+
242+
self.assertEqual(self._sender.queue_maxsize, self.Q_SIZE)
243+
self.assertEqual(self._sender.queue_circular, True)
244+
self.assertEqual(self._sender.queue_blocking, False)
245+
246+
ok = sender.emit('foo1', {'bar': 'baz1'})
247+
self.assertTrue(ok)
248+
ok = sender.emit('foo2', {'bar': 'baz2'})
249+
self.assertTrue(ok)
250+
ok = sender.emit('foo3', {'bar': 'baz3'})
251+
self.assertTrue(ok)
252+
ok = sender.emit('foo4', {'bar': 'baz4'})
253+
self.assertTrue(ok)
254+
ok = sender.emit('foo5', {'bar': 'baz5'})
255+
self.assertTrue(ok)
256+
time.sleep(0.5)
257+
sender._close()
258+
data = self.get_data()
259+
eq = self.assertEqual
260+
eq(self.Q_SIZE, len(data))
261+
eq(3, len(data[0]))
262+
eq('test.foo3', data[0][0])
263+
eq({'bar': 'baz3'}, data[0][2])
264+
self.assertTrue(data[0][1])
265+
self.assertTrue(isinstance(data[0][1], int))
266+
267+
eq(3, len(data[2]))
268+
eq('test.foo5', data[2][0])
269+
eq({'bar': 'baz5'}, data[2][2])
270+
271+
272+
class TestSenderWithTimeoutMaxSizeNonCircular(unittest.TestCase):
273+
Q_SIZE = 3
274+
275+
def setUp(self):
276+
super(TestSenderWithTimeoutMaxSizeNonCircular, self).setUp()
277+
self._server = mockserver.MockRecvServer('localhost')
278+
self._sender = fluent.asyncsender.FluentSender(tag='test',
279+
port=self._server.port,
280+
queue_timeout=0.04,
281+
queue_maxsize=self.Q_SIZE)
282+
283+
def tearDown(self):
284+
self._sender.close()
285+
286+
def get_data(self):
287+
return self._server.get_recieved()
288+
289+
def test_simple(self):
290+
sender = self._sender
291+
292+
self.assertEqual(self._sender.queue_maxsize, self.Q_SIZE)
293+
self.assertEqual(self._sender.queue_blocking, True)
294+
self.assertEqual(self._sender.queue_circular, False)
295+
296+
ok = sender.emit('foo1', {'bar': 'baz1'})
297+
self.assertTrue(ok)
298+
ok = sender.emit('foo2', {'bar': 'baz2'})
299+
self.assertTrue(ok)
300+
ok = sender.emit('foo3', {'bar': 'baz3'})
301+
self.assertTrue(ok)
302+
ok = sender.emit('foo4', {'bar': 'baz4'})
303+
self.assertTrue(ok)
304+
ok = sender.emit('foo5', {'bar': 'baz5'})
305+
self.assertTrue(ok)
306+
time.sleep(0.5)
307+
sender._close()
308+
data = self.get_data()
309+
eq = self.assertEqual
310+
print(data)
311+
eq(5, len(data))
312+
eq(3, len(data[0]))
313+
eq('test.foo1', data[0][0])
314+
eq({'bar': 'baz1'}, data[0][2])
315+
self.assertTrue(data[0][1])
316+
self.assertTrue(isinstance(data[0][1], int))
317+
318+
eq(3, len(data[2]))
319+
eq('test.foo3', data[2][0])
320+
eq({'bar': 'baz3'}, data[2][2])
321+
322+
323+
class TestSenderUnlimitedSize(unittest.TestCase):
324+
Q_SIZE = 3
325+
326+
def setUp(self):
327+
super(TestSenderUnlimitedSize, self).setUp()
328+
self._server = mockserver.MockRecvServer('localhost')
329+
self._sender = fluent.asyncsender.FluentSender(tag='test',
330+
port=self._server.port,
331+
queue_timeout=0.04,
332+
queue_maxsize=0)
333+
334+
def tearDown(self):
335+
self._sender.close()
336+
337+
def get_data(self):
338+
return self._server.get_recieved()
339+
340+
def test_simple(self):
341+
sender = self._sender
342+
343+
self.assertEqual(self._sender.queue_maxsize, 0)
344+
self.assertEqual(self._sender.queue_blocking, True)
345+
self.assertEqual(self._sender.queue_circular, False)
346+
347+
NUM = 1000
348+
for i in range(1, NUM+1):
349+
ok = sender.emit("foo{}".format(i), {'bar': "baz{}".format(i)})
350+
self.assertTrue(ok)
351+
time.sleep(0.5)
352+
sender._close()
353+
data = self.get_data()
354+
eq = self.assertEqual
355+
eq(NUM, len(data))
356+
el = data[0]
357+
eq(3, len(el))
358+
eq('test.foo1', el[0])
359+
eq({'bar': 'baz1'}, el[2])
360+
self.assertTrue(el[1])
361+
self.assertTrue(isinstance(el[1], int))
362+
363+
el = data[NUM-1]
364+
eq(3, len(el))
365+
eq("test.foo{}".format(NUM), el[0])
366+
eq({'bar': "baz{}".format(NUM)}, el[2])

0 commit comments

Comments
 (0)