Skip to content

Commit d7af0cf

Browse files
committed
Merge pull request #27 from fluent/issue_17
RawSocketSender.send() should try to flush the buffer after it's full
2 parents 4ecd3f2 + 2e32201 commit d7af0cf

File tree

3 files changed

+100
-4
lines changed

3 files changed

+100
-4
lines changed

src/main/java/org/fluentd/logger/sender/RawSocketSender.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,24 @@ protected boolean emit(Event event) {
139139
return send(bytes);
140140
}
141141

142+
private boolean flushBuffer() {
143+
if (reconnector.enableReconnection(System.currentTimeMillis())) {
144+
flush();
145+
if (pendings.position() == 0) {
146+
return true;
147+
}
148+
}
149+
150+
return false;
151+
}
152+
142153
private synchronized boolean send(byte[] bytes) {
143154
// buffering
144155
if (pendings.position() + bytes.length > pendings.capacity()) {
145-
LOG.error("Cannot send logs to " + server.toString());
146-
return false;
156+
if (!flushBuffer()) {
157+
LOG.error("Cannot send logs to " + server.toString());
158+
return false;
159+
}
147160
}
148161
pendings.put(bytes);
149162

src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,4 +333,79 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
333333
assertTrue(event.data.keySet().contains("key0"));
334334
assertTrue(event.data.values().contains("v3"));
335335
}
336+
337+
@Test
338+
public void testReconnectAfterBufferFull() throws Exception {
339+
final CountDownLatch bufferFull = new CountDownLatch(1);
340+
341+
// start mock fluentd
342+
int port = MockFluentd.randomPort(); // Use a random port available
343+
final List<Event> elist = new ArrayList<Event>();
344+
final MockFluentd fluentd = new MockFluentd(port, new MockFluentd.MockProcess() {
345+
public void process(MessagePack msgpack, Socket socket) throws IOException {
346+
try {
347+
BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
348+
Unpacker unpacker = msgpack.createUnpacker(in);
349+
while (true) {
350+
Event e = unpacker.read(Event.class);
351+
elist.add(e);
352+
}
353+
} catch (EOFException e) {
354+
// ignore
355+
} finally {
356+
socket.close();
357+
}
358+
}
359+
});
360+
361+
ExecutorService executor = Executors.newSingleThreadExecutor();
362+
executor.execute(new Runnable() {
363+
@Override
364+
public void run() {
365+
try {
366+
bufferFull.await(20, TimeUnit.SECONDS);
367+
fluentd.start();
368+
} catch (InterruptedException e) {
369+
e.printStackTrace();
370+
}
371+
}
372+
});
373+
374+
// start senders
375+
Sender sender = new RawSocketSender("localhost", port);
376+
String tag = "tag";
377+
int i;
378+
for (i = 0; i < 1000000; i++) { // Enough to fill the sender's buffer
379+
Map<String, Object> record = new HashMap<String, Object>();
380+
record.put("num", i);
381+
record.put("str", "name" + i);
382+
383+
if (bufferFull.getCount() > 0) {
384+
// Fill the sender's buffer
385+
if (!sender.emit(tag, record)) {
386+
// Buffer full. Need to recover the fluentd
387+
bufferFull.countDown();
388+
Thread.sleep(2000);
389+
}
390+
}
391+
else {
392+
// Flush the sender's buffer after the fluentd starts
393+
sender.emit(tag, record);
394+
break;
395+
}
396+
}
397+
398+
// close sender sockets
399+
sender.close();
400+
401+
// wait for unpacking event data on fluentd
402+
Thread.sleep(2000);
403+
404+
// close mock server sockets
405+
fluentd.close();
406+
407+
// check data
408+
assertEquals(0, bufferFull.getCount());
409+
assertEquals(i, elist.size());
410+
}
336411
}

src/test/java/org/fluentd/logger/util/MockFluentd.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,16 @@ private static Object toObject(Unpacker u, Value v) {
8484
}
8585
}
8686

87+
private final int port;
8788
private ServerSocket serverSocket;
8889

8990
private MockProcess process;
9091

9192
private AtomicBoolean started = new AtomicBoolean(false);
9293
private AtomicBoolean finished = new AtomicBoolean(false);
9394

94-
public MockFluentd(int port, MockProcess mockProcess) throws IOException {
95-
serverSocket = new ServerSocket(port);
95+
public MockFluentd(int port, MockProcess mockProcess) {
96+
this.port = port;
9697
process = mockProcess;
9798
}
9899

@@ -112,6 +113,13 @@ public static int randomPort() throws IOException {
112113

113114

114115
public void run() {
116+
try {
117+
serverSocket = new ServerSocket(port);
118+
} catch (IOException e) {
119+
e.printStackTrace();
120+
throw new RuntimeException("Failed to start MockFluentd process", e);
121+
}
122+
115123
_logger.debug("Started MockFluentd port:" + serverSocket.getLocalPort());
116124

117125
while (!finished.get()) {

0 commit comments

Comments
 (0)