Skip to content

Commit ee5e27c

Browse files
committed
RawSocketSender.send() should try to flush the buffer after it's full
1 parent 907ab99 commit ee5e27c

File tree

3 files changed

+100
-4
lines changed

3 files changed

+100
-4
lines changed

src/main/java/org/fluentd/logger/sender/RawSocketSender.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,24 @@ protected boolean emit(Event event) {
139139
return send(bytes);
140140
}
141141

142+
private boolean flushBuffer() {
143+
if (reconnector.enableReconnection(System.currentTimeMillis())) {
144+
flush();
145+
if (pendings.position() == 0) {
146+
return true;
147+
}
148+
}
149+
150+
return false;
151+
}
152+
142153
private synchronized boolean send(byte[] bytes) {
143154
// buffering
144155
if (pendings.position() + bytes.length > pendings.capacity()) {
145-
LOG.error("Cannot send logs to " + server.toString());
146-
return false;
156+
if (!flushBuffer()) {
157+
LOG.error("Cannot send logs to " + server.toString());
158+
return false;
159+
}
147160
}
148161
pendings.put(bytes);
149162

src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,4 +333,79 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
333333
assertTrue(event.data.keySet().contains("key0"));
334334
assertTrue(event.data.values().contains("v3"));
335335
}
336+
337+
@Test
338+
public void testReconnectAfterBufferFull() throws Exception {
339+
final CountDownLatch bufferFull = new CountDownLatch(1);
340+
341+
// start mock fluentd
342+
int port = MockFluentd.randomPort(); // Use a random port available
343+
final List<Event> elist = new ArrayList<Event>();
344+
final MockFluentd fluentd = new MockFluentd(port, new MockFluentd.MockProcess() {
345+
public void process(MessagePack msgpack, Socket socket) throws IOException {
346+
BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
347+
try {
348+
Unpacker unpacker = msgpack.createUnpacker(in);
349+
while (true) {
350+
Event e = unpacker.read(Event.class);
351+
elist.add(e);
352+
}
353+
//socket.close();
354+
} catch (EOFException e) {
355+
// ignore
356+
}
357+
}
358+
});
359+
360+
ExecutorService executor = Executors.newSingleThreadExecutor();
361+
executor.execute(new Runnable() {
362+
@Override
363+
public void run() {
364+
try {
365+
bufferFull.await(20, TimeUnit.SECONDS);
366+
fluentd.start();
367+
} catch (InterruptedException e) {
368+
e.printStackTrace();
369+
}
370+
}
371+
});
372+
373+
// start senders
374+
Sender sender = new RawSocketSender("localhost", port);
375+
String tag = "tag";
376+
int i;
377+
for (i = 0; i < 1000000; i++) { // Enough to fill the sender's buffer
378+
Map<String, Object> record = new HashMap<String, Object>();
379+
record.put("num", i);
380+
record.put("str", "name" + i);
381+
382+
if (bufferFull.getCount() == 0) {
383+
// After starting the fluentd
384+
if (sender.emit(tag, record)) {
385+
// Succeeded in flush buffer
386+
break;
387+
}
388+
}
389+
else {
390+
if (!sender.emit(tag, record)) {
391+
// Buffer full. Need to recover the fluentd
392+
bufferFull.countDown();
393+
Thread.sleep(2000);
394+
}
395+
}
396+
}
397+
398+
// close sender sockets
399+
sender.close();
400+
401+
// wait for unpacking event data on fluentd
402+
Thread.sleep(2000);
403+
404+
// close mock server sockets
405+
fluentd.close();
406+
407+
// check data
408+
assertEquals(0, bufferFull.getCount());
409+
assertEquals(i, elist.size());
410+
}
336411
}

src/test/java/org/fluentd/logger/util/MockFluentd.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,16 @@ private static Object toObject(Unpacker u, Value v) {
8484
}
8585
}
8686

87+
private final int port;
8788
private ServerSocket serverSocket;
8889

8990
private MockProcess process;
9091

9192
private AtomicBoolean started = new AtomicBoolean(false);
9293
private AtomicBoolean finished = new AtomicBoolean(false);
9394

94-
public MockFluentd(int port, MockProcess mockProcess) throws IOException {
95-
serverSocket = new ServerSocket(port);
95+
public MockFluentd(int port, MockProcess mockProcess) {
96+
this.port = port;
9697
process = mockProcess;
9798
}
9899

@@ -112,6 +113,13 @@ public static int randomPort() throws IOException {
112113

113114

114115
public void run() {
116+
try {
117+
serverSocket = new ServerSocket(port);
118+
} catch (IOException e) {
119+
e.printStackTrace();
120+
throw new RuntimeException("Failed to start MockFluentd process", e);
121+
}
122+
115123
_logger.debug("Started MockFluentd port:" + serverSocket.getLocalPort());
116124

117125
while (!finished.get()) {

0 commit comments

Comments
 (0)