Skip to content

Commit 8e70b5c

Browse files
Mike Kobyakovcosmo0920
authored andcommitted
add getLogger(..., Sender) method to the factory; add an asynchronous wrapper around RawSocketSender
1 parent ea176c2 commit 8e70b5c

File tree

2 files changed

+140
-2
lines changed

2 files changed

+140
-2
lines changed

src/main/java/org/fluentd/logger/FluentLoggerFactory.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.lang.reflect.Constructor;
2121
import java.lang.reflect.InvocationTargetException;
22+
import java.util.Collections;
2223
import java.util.Map;
2324
import java.util.Properties;
2425
import java.util.WeakHashMap;
@@ -33,7 +34,7 @@ public class FluentLoggerFactory {
3334
private final Map<FluentLogger, String> loggers;
3435

3536
public FluentLoggerFactory() {
36-
loggers = new WeakHashMap<FluentLogger, String>();
37+
loggers = Collections.synchronizedMap(new WeakHashMap<FluentLogger, String>());
3738
}
3839

3940
public FluentLogger getLogger(String tagPrefix) {
@@ -48,7 +49,7 @@ public FluentLogger getLogger(String tagPrefix, String host, int port, int timeo
4849
return getLogger(tagPrefix, host, port, timeout, bufferCapacity, new ExponentialDelayReconnector());
4950
}
5051

51-
public synchronized FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity,
52+
public FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity,
5253
Reconnector reconnector) {
5354
String key = String.format("%s_%s_%d_%d_%d", new Object[] { tagPrefix, host, port, timeout, bufferCapacity });
5455

@@ -80,6 +81,30 @@ public synchronized FluentLogger getLogger(String tagPrefix, String host, int po
8081
return logger;
8182
}
8283

84+
public FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity,
85+
Sender sender) {
86+
if (sender == null) {
87+
return getLogger(tagPrefix, host, port, timeout, bufferCapacity);
88+
}
89+
String key = String.format("%s_%s_%d_%d_%d_%s", new Object[] { tagPrefix, host, port, timeout, bufferCapacity, sender == null ? "null" : sender .getName() });
90+
if (loggers.containsKey(key)) {
91+
for (Map.Entry<FluentLogger, String> entry : loggers.entrySet()) {
92+
if (entry.getValue().equals(key)) {
93+
FluentLogger found = entry.getKey();
94+
if(found != null) {
95+
return found;
96+
}
97+
break;
98+
}
99+
}
100+
return getLogger(tagPrefix, host, port, timeout, bufferCapacity);
101+
} else {
102+
FluentLogger logger = new FluentLogger(tagPrefix, sender);
103+
loggers.put(logger, key);
104+
return logger;
105+
}
106+
}
107+
83108
@SuppressWarnings("unchecked")
84109
private Sender createSenderInstance(final String className, final Object[] params) throws ClassNotFoundException,
85110
SecurityException, NoSuchMethodException, IllegalArgumentException, InstantiationException,
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
2+
package org.fluentd.logger.sender;
3+
4+
import java.util.Map;
5+
import java.util.concurrent.ExecutorService;
6+
import java.util.concurrent.Executors;
7+
8+
import org.fluentd.logger.errorhandler.ErrorHandler;
9+
import org.fluentd.logger.sender.ExponentialDelayReconnector;
10+
import org.fluentd.logger.sender.RawSocketSender;
11+
import org.fluentd.logger.sender.Reconnector;
12+
import org.fluentd.logger.sender.Sender;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
/**
17+
*
18+
* @author mkobyakov
19+
*
20+
*/
21+
public class AsyncRawSocketSender implements Sender {
22+
23+
private RawSocketSender sender;
24+
private Reconnector reconnector;
25+
26+
@SuppressWarnings("unused")
27+
private static final Logger logger = LoggerFactory.getLogger(AsyncRawSocketSender.class);
28+
29+
private static final ExecutorService flusher = Executors.newSingleThreadExecutor();
30+
31+
private static final ErrorHandler DEFAULT_ERROR_HANLDER = new ErrorHandler() {};
32+
33+
private ErrorHandler errorHandler = DEFAULT_ERROR_HANLDER;
34+
35+
public AsyncRawSocketSender() {
36+
this("localhost", 24224);
37+
}
38+
39+
public AsyncRawSocketSender(String host, int port) {
40+
this(host, port, 3 * 1000, 8 * 1024 * 1024);
41+
}
42+
43+
public AsyncRawSocketSender(String host, int port, int timeout,
44+
int bufferCapacity) {
45+
this(host, port, timeout, bufferCapacity,
46+
new ExponentialDelayReconnector());
47+
}
48+
49+
public AsyncRawSocketSender(String host, int port, int timeout,
50+
int bufferCapacity, Reconnector reconnector) {
51+
this.reconnector = reconnector;
52+
this.sender = new RawSocketSender(host, port, timeout, bufferCapacity,
53+
reconnector);
54+
}
55+
56+
@Override
57+
public synchronized void flush() {
58+
final RawSocketSender sender = this.sender;
59+
flusher.execute(new Runnable() {
60+
@Override
61+
public void run() {
62+
sender.flush();
63+
}
64+
});
65+
}
66+
67+
@Override
68+
public void close() {
69+
sender.close();
70+
}
71+
72+
@Override
73+
public boolean emit(String tag, Map<String, Object> data) {
74+
return emit(tag, System.currentTimeMillis() / 1000, data);
75+
}
76+
77+
@Override
78+
public boolean emit(final String tag, final long timestamp, final Map<String, Object> data) {
79+
final RawSocketSender sender = this.sender;
80+
flusher.execute(new Runnable() {
81+
@Override
82+
public void run() {
83+
sender.emit(tag, timestamp, data);
84+
}
85+
});
86+
87+
return sender.isConnected() || reconnector.enableReconnection(System.currentTimeMillis());
88+
}
89+
90+
@Override
91+
public String getName() {
92+
return sender.getName();
93+
}
94+
95+
@Override
96+
public boolean isConnected() {
97+
return sender.isConnected();
98+
}
99+
100+
@Override
101+
public void setErrorHandler(ErrorHandler errorHandler) {
102+
if (errorHandler == null) {
103+
throw new IllegalArgumentException("errorHandler is null");
104+
}
105+
106+
this.errorHandler = errorHandler;
107+
}
108+
109+
@Override
110+
public void removeErrorHandler() {
111+
this.errorHandler = DEFAULT_ERROR_HANLDER;
112+
}
113+
}

0 commit comments

Comments
 (0)