Skip to content

Commit fe67a1c

Browse files
committed
When closing MockFluentd, wait 3 seconds until running threads for writing logs are terminated
1 parent 03b250d commit fe67a1c

File tree

4 files changed

+81
-18
lines changed

4 files changed

+81
-18
lines changed

src/main/java/org/fluentd/logger/sender/ExponentialDelayReconnector.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
* Calcurate exponential delay for reconnecting
1010
*/
1111
public class ExponentialDelayReconnector implements Reconnector {
12-
private static final Logger LOG = LoggerFactory.getLogger(ExponentialDelayReconnector.class);
1312

1413
private double wait = 0.5;
1514

src/test/java/org/fluentd/logger/TestFluentLogger.java

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,47 @@
33
import org.fluentd.logger.sender.Event;
44
import org.fluentd.logger.sender.NullSender;
55
import org.fluentd.logger.util.MockFluentd;
6+
import org.junit.Ignore;
67
import org.junit.Test;
78
import org.msgpack.MessagePack;
89
import org.msgpack.unpacker.Unpacker;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
912

1013
import java.io.BufferedInputStream;
1114
import java.io.EOFException;
1215
import java.io.IOException;
1316
import java.net.Socket;
1417
import java.util.*;
15-
import java.util.concurrent.Executor;
1618
import java.util.concurrent.ExecutorService;
1719
import java.util.concurrent.Executors;
1820
import java.util.concurrent.TimeUnit;
1921

2022
import static org.junit.Assert.assertEquals;
2123

2224
public class TestFluentLogger {
25+
private Logger _logger = LoggerFactory.getLogger(TestFluentLogger.class);
26+
27+
class FixedThreadManager {
28+
private final ExecutorService service;
29+
30+
public FixedThreadManager(int numThreads) {
31+
service = Executors.newFixedThreadPool(numThreads);
32+
}
33+
34+
public void submit(Runnable r) {
35+
service.submit(r);
36+
}
37+
38+
public void join() throws InterruptedException {
39+
service.shutdown();
40+
while(!service.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
41+
_logger.debug("waiting ...");
42+
}
43+
_logger.trace("Terminating FixedThreadManager");
44+
}
45+
}
46+
2347

2448
@Test
2549
public void testNormal01() throws Exception {
@@ -42,7 +66,9 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
4266
}
4367
}
4468
});
45-
fluentd.start();
69+
70+
FixedThreadManager threadManager = new FixedThreadManager(1);
71+
threadManager.submit(fluentd);
4672

4773
// start loggers
4874
FluentLogger logger = FluentLogger.getLogger("testtag", host, port);
@@ -66,14 +92,15 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
6692
fluentd.close();
6793

6894
// wait for unpacking event data on fluentd
69-
Thread.sleep(1000);
95+
threadManager.join();
7096

7197
// check data
7298
assertEquals(2, elist.size());
7399
assertEquals("testtag.test01", elist.get(0).tag);
74100
assertEquals("testtag.test01", elist.get(1).tag);
75101
}
76102

103+
@SuppressWarnings("unchecked")
77104
@Test
78105
public void testNormal02() throws Exception {
79106
int loggerCount = 3;
@@ -106,8 +133,8 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
106133
}
107134
}
108135
});
109-
ExecutorService executor = Executors.newFixedThreadPool(1);
110-
executor.submit(fluentd);
136+
FixedThreadManager threadManager = new FixedThreadManager(1);
137+
threadManager.submit(fluentd);
111138

112139
// start loggers
113140
FluentLogger[] loggers = new FluentLogger[loggerCount];
@@ -147,8 +174,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
147174
fluentd.close();
148175

149176
// wait for unpacking event data on fluentd
150-
while(executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
151-
}
177+
threadManager.join();
152178

153179
// check data
154180
assertEquals(counts[0], elists[0].size());
@@ -174,6 +200,9 @@ public void testReconnection() throws Exception {
174200
int port = MockFluentd.randomPort();
175201
String host = "localhost";
176202
final List<Event> elist1 = new ArrayList<Event>();
203+
204+
FixedThreadManager threadManager = new FixedThreadManager(2);
205+
177206
MockFluentd fluentd1 = new MockFluentd(port, new MockFluentd.MockProcess() {
178207
public void process(MessagePack msgpack, Socket socket) throws IOException {
179208
BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
@@ -192,7 +221,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
192221
}
193222
}
194223
});
195-
fluentd1.start();
224+
threadManager.submit(fluentd1);
196225

197226
// start loggers
198227
FluentLogger logger = FluentLogger.getLogger("testtag", host, port);
@@ -204,6 +233,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
204233
}
205234

206235
TimeUnit.MILLISECONDS.sleep(500);
236+
_logger.info("Closing the current fluentd instance");
207237
fluentd1.closeClientSockets();
208238
fluentd1.close();
209239

@@ -232,7 +262,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
232262
}
233263
}
234264
});
235-
fluentd2.start();
265+
threadManager.submit(fluentd2);
236266

237267
TimeUnit.MILLISECONDS.sleep(500);
238268

@@ -245,11 +275,15 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
245275

246276
// close loggers
247277
FluentLogger.closeAll();
278+
Thread.sleep(2000);
248279

249280
fluentd2.close();
250281

282+
251283
// wait for unpacking event data on fluentd
252-
TimeUnit.MILLISECONDS.sleep(500);
284+
TimeUnit.MILLISECONDS.sleep(2000);
285+
threadManager.join();
286+
253287

254288
// check data
255289
assertEquals(1, elist1.size());

src/test/java/org/fluentd/logger/util/MockFluentd.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,24 @@
66
import org.msgpack.template.Templates;
77
import org.msgpack.type.Value;
88
import org.msgpack.unpacker.Unpacker;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
911

1012
import java.io.IOException;
1113
import java.net.ServerSocket;
1214
import java.net.Socket;
1315
import java.util.HashMap;
16+
import java.util.List;
1417
import java.util.concurrent.ConcurrentLinkedQueue;
18+
import java.util.concurrent.ExecutorService;
19+
import java.util.concurrent.Executors;
20+
import java.util.concurrent.TimeUnit;
1521
import java.util.concurrent.atomic.AtomicBoolean;
1622

1723
public class MockFluentd extends Thread {
24+
25+
private static final Logger _logger = LoggerFactory.getLogger(MockFluentd.class);
26+
1827
private ConcurrentLinkedQueue<Socket> clientSockets = new ConcurrentLinkedQueue<Socket>();
1928

2029
public static interface MockProcess {
@@ -79,6 +88,7 @@ private static Object toObject(Unpacker u, Value v) {
7988

8089
private MockProcess process;
8190

91+
private AtomicBoolean started = new AtomicBoolean(false);
8292
private AtomicBoolean finished = new AtomicBoolean(false);
8393

8494
public MockFluentd(int port, MockProcess mockProcess) throws IOException {
@@ -98,35 +108,55 @@ public static int randomPort() throws IOException {
98108
return port;
99109
}
100110

111+
private ExecutorService service = Executors.newCachedThreadPool();
112+
113+
101114
public void run() {
115+
_logger.debug("Started MockFluentd port:" + serverSocket.getLocalPort());
116+
102117
while (!finished.get()) {
103118
try {
119+
started.set(true);
104120
final Socket socket = serverSocket.accept();
105121
socket.setSoLinger(true, 0);
106122
clientSockets.add(socket);
107-
Runnable r = new Runnable() {
123+
service.submit(new Runnable() {
108124
public void run() {
109125
try {
126+
_logger.trace("received log");
110127
MessagePack msgpack = new MessagePack();
111128
msgpack.register(Event.class, MockEventTemplate.INSTANCE);
112129
process.process(msgpack, socket);
130+
_logger.trace("wrote log");
113131
} catch (IOException e) {
114132
// ignore
115133
}
116134
}
117-
};
118-
new Thread(r).start();
135+
});
119136
} catch (IOException e) {
120137
// ignore
121138
}
122139
}
140+
_logger.debug("Terminated MockFluentd port:" + serverSocket.getLocalPort());
123141
}
124142

125143
public void close() throws IOException {
126144
finished.set(true);
127-
if (serverSocket != null) {
145+
service.shutdown();
146+
try {
147+
// We need to wait until all log writing threads are finished.
148+
int numTrial = 0;
149+
while(numTrial < 3 && !service.awaitTermination(1, TimeUnit.SECONDS)) {
150+
numTrial++;
151+
}
152+
}
153+
catch(InterruptedException e) {
154+
_logger.error("interrupted", e);
155+
}
156+
if (serverSocket != null) {
128157
serverSocket.close();
129158
}
159+
130160
}
131161

132162
public void closeClientSockets() {

src/test/resources/logback-test.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
66
<!-- Meanings of the symbols are defined in http://logback.qos.ch/manual/layouts.html -->
77
<encoder>
8-
<pattern>%d %highlight(%-5level) [%thread] - %msg %ex{0}</pattern>
8+
<pattern>%d %highlight(%-5level) [%thread] %msg%n%ex{0}</pattern>
99
</encoder>
1010
</appender>
11-
11+
1212
<root level="debug">
1313
<appender-ref ref="STDOUT"/>
1414
</root>
15-
</configuration>
15+
</configuration>

0 commit comments

Comments
 (0)