Skip to content

Commit d517a69

Browse files
Mike Kobyakovcosmo0920
authored andcommitted
add getLogger(..., Sender) method to the factory; add an asynchronous wrapper around RawSocketSender
1 parent 8e70b5c commit d517a69

File tree

1 file changed

+44
-28
lines changed

1 file changed

+44
-28
lines changed

src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java

Lines changed: 44 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,46 @@
1414
import org.slf4j.LoggerFactory;
1515

1616
/**
17-
*
18-
* @author mkobyakov
17+
* An asynchronous wrapper around RawSocketSender
18+
*
19+
* @author mxk
1920
*
2021
*/
2122
public class AsyncRawSocketSender implements Sender {
2223

24+
private final class EmitRunnable implements Runnable {
25+
private final String tag;
26+
private final Map<String, Object> data;
27+
private final RawSocketSender sender;
28+
private final long timestamp;
29+
30+
private EmitRunnable(String tag, Map<String, Object> data,
31+
RawSocketSender sender, long timestamp) {
32+
this.tag = tag;
33+
this.data = data;
34+
this.sender = sender;
35+
this.timestamp = timestamp;
36+
}
37+
38+
@Override
39+
public void run() {
40+
sender.emit(tag, timestamp, data);
41+
}
42+
}
43+
44+
private final class FlushRunnable implements Runnable {
45+
private final RawSocketSender sender;
46+
47+
private FlushRunnable(RawSocketSender sender) {
48+
this.sender = sender;
49+
}
50+
51+
@Override
52+
public void run() {
53+
sender.flush();
54+
}
55+
}
56+
2357
private RawSocketSender sender;
2458
private Reconnector reconnector;
2559

@@ -28,10 +62,6 @@ public class AsyncRawSocketSender implements Sender {
2862

2963
private static final ExecutorService flusher = Executors.newSingleThreadExecutor();
3064

31-
private static final ErrorHandler DEFAULT_ERROR_HANLDER = new ErrorHandler() {};
32-
33-
private ErrorHandler errorHandler = DEFAULT_ERROR_HANLDER;
34-
3565
public AsyncRawSocketSender() {
3666
this("localhost", 24224);
3767
}
@@ -41,27 +71,22 @@ public AsyncRawSocketSender(String host, int port) {
4171
}
4272

4373
public AsyncRawSocketSender(String host, int port, int timeout,
44-
int bufferCapacity) {
74+
int bufferCapacity) {
4575
this(host, port, timeout, bufferCapacity,
4676
new ExponentialDelayReconnector());
4777
}
4878

4979
public AsyncRawSocketSender(String host, int port, int timeout,
50-
int bufferCapacity, Reconnector reconnector) {
80+
int bufferCapacity, Reconnector reconnector) {
5181
this.reconnector = reconnector;
5282
this.sender = new RawSocketSender(host, port, timeout, bufferCapacity,
53-
reconnector);
83+
reconnector);
5484
}
5585

5686
@Override
5787
public synchronized void flush() {
5888
final RawSocketSender sender = this.sender;
59-
flusher.execute(new Runnable() {
60-
@Override
61-
public void run() {
62-
sender.flush();
63-
}
64-
});
89+
flusher.execute(new FlushRunnable(sender));
6590
}
6691

6792
@Override
@@ -77,12 +102,7 @@ public boolean emit(String tag, Map<String, Object> data) {
77102
@Override
78103
public boolean emit(final String tag, final long timestamp, final Map<String, Object> data) {
79104
final RawSocketSender sender = this.sender;
80-
flusher.execute(new Runnable() {
81-
@Override
82-
public void run() {
83-
sender.emit(tag, timestamp, data);
84-
}
85-
});
105+
flusher.execute(new EmitRunnable(tag, data, sender, timestamp));
86106

87107
return sender.isConnected() || reconnector.enableReconnection(System.currentTimeMillis());
88108
}
@@ -99,15 +119,11 @@ public boolean isConnected() {
99119

100120
@Override
101121
public void setErrorHandler(ErrorHandler errorHandler) {
102-
if (errorHandler == null) {
103-
throw new IllegalArgumentException("errorHandler is null");
104-
}
105-
106-
this.errorHandler = errorHandler;
122+
sender.setErrorHandler(errorHandler);
107123
}
108124

109125
@Override
110126
public void removeErrorHandler() {
111-
this.errorHandler = DEFAULT_ERROR_HANLDER;
127+
sender.removeErrorHandler();
112128
}
113-
}
129+
}

0 commit comments

Comments
 (0)