@@ -343,16 +343,17 @@ public void testReconnectAfterBufferFull() throws Exception {
343343 final List <Event > elist = new ArrayList <Event >();
344344 final MockFluentd fluentd = new MockFluentd (port , new MockFluentd .MockProcess () {
345345 public void process (MessagePack msgpack , Socket socket ) throws IOException {
346- BufferedInputStream in = new BufferedInputStream (socket .getInputStream ());
347346 try {
347+ BufferedInputStream in = new BufferedInputStream (socket .getInputStream ());
348348 Unpacker unpacker = msgpack .createUnpacker (in );
349349 while (true ) {
350350 Event e = unpacker .read (Event .class );
351351 elist .add (e );
352352 }
353- //socket.close();
354353 } catch (EOFException e ) {
355354 // ignore
355+ } finally {
356+ socket .close ();
356357 }
357358 }
358359 });
@@ -379,20 +380,19 @@ public void run() {
379380 record .put ("num" , i );
380381 record .put ("str" , "name" + i );
381382
382- if (bufferFull .getCount () == 0 ) {
383- // After starting the fluentd
384- if (sender .emit (tag , record )) {
385- // Succeeded in flush buffer
386- break ;
387- }
388- }
389- else {
383+ if (bufferFull .getCount () > 0 ) {
384+ // Fill the sender's buffer
390385 if (!sender .emit (tag , record )) {
391386 // Buffer full. Need to recover the fluentd
392387 bufferFull .countDown ();
393388 Thread .sleep (2000 );
394389 }
395390 }
391+ else {
392+ // Flush the sender's buffer after the fluentd starts
393+ sender .emit (tag , record );
394+ break ;
395+ }
396396 }
397397
398398 // close sender sockets
0 commit comments