Skip to content

Commit b597423

Browse files
committed
Remove Sender#getBuffer and make RawSocketSender#getBuffer thread-safe
1 parent dbb9404 commit b597423

File tree

4 files changed

+10
-12
lines changed

4 files changed

+10
-12
lines changed

src/main/java/org/fluentd/logger/sender/NullSender.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,6 @@ public boolean emit(String tag, long timestamp, Map<String, Object> data) {
3838
public void flush() {
3939
}
4040

41-
@Override
42-
public byte[] getBuffer() {
43-
return new byte[0];
44-
}
45-
4641
@Override
4742
public void close() {
4843
}

src/main/java/org/fluentd/logger/sender/RawSocketSender.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ private void reconnect(boolean forceReconnection) throws IOException {
9191
}
9292
}
9393

94+
@Override
9495
public void close() {
9596
// close output stream
9697
if (out != null) {
@@ -113,10 +114,12 @@ public void close() {
113114
}
114115
}
115116

117+
@Override
116118
public boolean emit(String tag, Map<String, Object> data) {
117119
return emit(tag, System.currentTimeMillis() / 1000, data);
118120
}
119121

122+
@Override
120123
public boolean emit(String tag, long timestamp, Map<String, Object> data) {
121124
return emit(new Event(tag, timestamp, data));
122125
}
@@ -171,6 +174,7 @@ private synchronized boolean send(byte[] bytes) {
171174
return true;
172175
}
173176

177+
@Override
174178
public synchronized void flush() {
175179
try {
176180
// check whether connection is established or not
@@ -186,7 +190,7 @@ public synchronized void flush() {
186190
}
187191
}
188192

189-
public byte[] getBuffer() {
193+
synchronized byte[] getBuffer() {
190194
int len = pendings.position();
191195
pendings.position(0);
192196
byte[] ret = new byte[len];
@@ -198,6 +202,7 @@ private void clearBuffer() {
198202
pendings.clear();
199203
}
200204

205+
@Override
201206
public String getName() {
202207
return name;
203208
}

src/main/java/org/fluentd/logger/sender/Sender.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@ public interface Sender {
2626

2727
void flush();
2828

29-
public byte[] getBuffer();
29+
void close();
3030

31-
public void close();
32-
33-
public String getName();
31+
String getName();
3432
}

src/test/java/org/fluentd/logger/sender/TestSenderFluentdDownOperation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public void testFluentdDownOperation01() throws Exception {
3030
long timestamp = System.currentTimeMillis() / 1000;
3131

3232
// start senders
33-
Sender sender = new RawSocketSender("localhost", port);
33+
RawSocketSender sender = new RawSocketSender("localhost", port);
3434
Map<String, Object> data = new HashMap<String, Object>();
3535
data.put("t1k1", "t1v1");
3636
data.put("t1k2", "t1v2");
@@ -75,7 +75,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
7575
server.start();
7676

7777
// start senders
78-
Sender sender = new RawSocketSender("localhost", port);
78+
RawSocketSender sender = new RawSocketSender("localhost", port);
7979

8080
// server close
8181
server.close();

0 commit comments

Comments
 (0)