Skip to content

Commit 540eb99

Browse files
author
Mark
committed
changed velocystream message sending to async
1 parent 3ef5fba commit 540eb99

File tree

7 files changed

+251
-42
lines changed

7 files changed

+251
-42
lines changed

ChangeLog

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
v4.1.10 (2017-02-xx)
2+
---------------------------
3+
* changed velocystream message sending to async
4+
15
v4.1.9 (2017-02-10)
26
---------------------------
37
* added missing IndexType.edge
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* DISCLAIMER
3+
*
4+
* Copyright 2016 ArangoDB GmbH, Cologne, Germany
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
* Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
*/
20+
21+
package com.arangodb.internal.velocystream;
22+
23+
import java.nio.BufferUnderflowException;
24+
import java.nio.ByteBuffer;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
28+
/**
29+
* @author Mark - mark at arangodb.com
30+
*
31+
*/
32+
public class ChunkStore {
33+
34+
private final MessageStore messageStore;
35+
private final Map<Long, ByteBuffer> data;
36+
37+
public ChunkStore(final MessageStore messageStore) {
38+
super();
39+
this.messageStore = messageStore;
40+
data = new HashMap<Long, ByteBuffer>();
41+
}
42+
43+
public ByteBuffer storeChunk(final Chunk chunk) throws BufferUnderflowException, IndexOutOfBoundsException {
44+
final long messageId = chunk.getMessageId();
45+
ByteBuffer chunkBuffer = data.get(messageId);
46+
if (chunkBuffer == null) {
47+
if (!chunk.isFirstChunk()) {
48+
messageStore.cancel(messageId);
49+
return null;
50+
}
51+
final int length = (int) (chunk.getMessageLength() > 0 ? chunk.getMessageLength()
52+
: chunk.getContentLength());
53+
chunkBuffer = ByteBuffer.allocate(length);
54+
data.put(messageId, chunkBuffer);
55+
}
56+
return chunkBuffer;
57+
}
58+
59+
public void checkCompleteness(final long messageId) {
60+
checkCompleteness(messageId, data.get(messageId));
61+
}
62+
63+
private void checkCompleteness(final long messageId, final ByteBuffer chunkBuffer)
64+
throws BufferUnderflowException, IndexOutOfBoundsException {
65+
if (chunkBuffer.position() == chunkBuffer.limit()) {
66+
messageStore.consume(new Message(messageId, chunkBuffer.array()));
67+
data.remove(messageId);
68+
}
69+
}
70+
71+
}

src/main/java/com/arangodb/internal/velocystream/CommunicationSync.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ protected CommunicationSync(final String host, final Integer port, final Integer
105105
final String password, final Boolean useSsl, final SSLContext sslContext, final VPack vpack,
106106
final CollectionCache collectionCache, final Integer chunksize) {
107107
super(host, port, timeout, user, password, useSsl, sslContext, vpack, collectionCache, chunksize,
108-
new ConnectionSync.Builder().host(host).port(port).timeout(timeout).useSsl(useSsl)
108+
new ConnectionSync.Builder(new MessageStore()).host(host).port(port).timeout(timeout).useSsl(useSsl)
109109
.sslContext(sslContext).build());
110110
}
111111

src/main/java/com/arangodb/internal/velocystream/Connection.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
import java.nio.ByteBuffer;
3030
import java.nio.ByteOrder;
3131
import java.util.Collection;
32+
import java.util.concurrent.Callable;
33+
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.Executors;
3235

3336
import javax.net.SocketFactory;
3437
import javax.net.ssl.SSLContext;
@@ -51,6 +54,9 @@ public abstract class Connection {
5154
private static final Logger LOGGER = LoggerFactory.getLogger(Connection.class);
5255
private static final byte[] PROTOCOL_HEADER = "VST/1.0\r\n\r\n".getBytes();
5356

57+
private ExecutorService executor;
58+
protected final MessageStore messageStore;
59+
5460
private final String host;
5561
private final Integer port;
5662
private final Integer timeout;
@@ -62,13 +68,14 @@ public abstract class Connection {
6268
private InputStream inputStream;
6369

6470
protected Connection(final String host, final Integer port, final Integer timeout, final Boolean useSsl,
65-
final SSLContext sslContext) {
71+
final SSLContext sslContext, final MessageStore messageStore) {
6672
super();
6773
this.host = host;
6874
this.port = port;
6975
this.timeout = timeout;
7076
this.useSsl = useSsl;
7177
this.sslContext = sslContext;
78+
this.messageStore = messageStore;
7279
}
7380

7481
public boolean isOpen() {
@@ -111,12 +118,45 @@ public synchronized void open() throws IOException {
111118
((SSLSocket) socket).startHandshake();
112119
}
113120
sendProtocolHeader();
121+
executor = Executors.newSingleThreadExecutor();
122+
executor.submit(new Callable<Void>() {
123+
@Override
124+
public Void call() throws Exception {
125+
final ChunkStore chunkStore = new ChunkStore(messageStore);
126+
while (true) {
127+
if (!isOpen()) {
128+
messageStore.clear(new IOException("The socket is closed."));
129+
close();
130+
break;
131+
}
132+
try {
133+
final Chunk chunk = readChunk();
134+
final ByteBuffer chunkBuffer = chunkStore.storeChunk(chunk);
135+
if (chunkBuffer != null) {
136+
final byte[] buf = new byte[chunk.getContentLength()];
137+
readBytesIntoBuffer(buf, 0, buf.length);
138+
chunkBuffer.put(buf);
139+
chunkStore.checkCompleteness(chunk.getMessageId());
140+
}
141+
} catch (final Exception e) {
142+
messageStore.clear(e);
143+
close();
144+
break;
145+
}
146+
}
147+
return null;
148+
}
149+
});
114150
}
115151

116152
public synchronized void close() {
117153
if (LOGGER.isDebugEnabled()) {
118154
LOGGER.debug(String.format("Close connection %s", socket));
119155
}
156+
messageStore.clear();
157+
if (executor != null && !executor.isShutdown()) {
158+
executor.shutdown();
159+
}
120160
if (socket != null) {
121161
try {
122162
socket.close();

src/main/java/com/arangodb/internal/velocystream/ConnectionSync.java

Lines changed: 23 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,13 @@
2020

2121
package com.arangodb.internal.velocystream;
2222

23-
import java.io.IOException;
2423
import java.util.Collection;
24+
import java.util.concurrent.Callable;
25+
import java.util.concurrent.ExecutionException;
26+
import java.util.concurrent.FutureTask;
2527

2628
import javax.net.ssl.SSLContext;
2729

28-
import org.slf4j.Logger;
29-
import org.slf4j.LoggerFactory;
30-
3130
import com.arangodb.ArangoDBException;
3231

3332
/**
@@ -36,18 +35,18 @@
3635
*/
3736
public class ConnectionSync extends Connection {
3837

39-
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionSync.class);
40-
4138
public static class Builder {
4239

4340
private String host;
4441
private Integer port;
4542
private Integer timeout;
4643
private Boolean useSsl;
4744
private SSLContext sslContext;
45+
private final MessageStore messageStore;
4846

49-
public Builder() {
47+
public Builder(final MessageStore messageStore) {
5048
super();
49+
this.messageStore = messageStore;
5150
}
5251

5352
public Builder host(final String host) {
@@ -76,47 +75,31 @@ public Builder sslContext(final SSLContext sslContext) {
7675
}
7776

7877
public ConnectionSync build() {
79-
return new ConnectionSync(host, port, timeout, useSsl, sslContext);
78+
return new ConnectionSync(host, port, timeout, useSsl, sslContext, messageStore);
8079
}
8180
}
8281

8382
private ConnectionSync(final String host, final Integer port, final Integer timeout, final Boolean useSsl,
84-
final SSLContext sslContext) {
85-
super(host, port, timeout, useSsl, sslContext);
83+
final SSLContext sslContext, final MessageStore messageStore) {
84+
super(host, port, timeout, useSsl, sslContext, messageStore);
8685
}
8786

88-
public synchronized Message write(final Message message, final Collection<Chunk> chunks) throws ArangoDBException {
89-
super.writeIntern(message, chunks);
90-
byte[] chunkBuffer = null;
91-
int off = 0;
92-
while (chunkBuffer == null || off < chunkBuffer.length) {
93-
if (!isOpen()) {
94-
close();
95-
throw new ArangoDBException(new IOException("The socket is closed."));
87+
public Message write(final Message message, final Collection<Chunk> chunks) throws ArangoDBException {
88+
final FutureTask<Message> task = new FutureTask<Message>(new Callable<Message>() {
89+
@Override
90+
public Message call() throws Exception {
91+
return messageStore.get(message.getId());
9692
}
97-
try {
98-
final Chunk chunk = readChunk();
99-
final int contentLength = chunk.getContentLength();
100-
if (chunkBuffer == null) {
101-
if (!chunk.isFirstChunk()) {
102-
throw new ArangoDBException("Wrong Chunk recieved! Expected first Chunk.");
103-
}
104-
final int length = (int) (chunk.getMessageLength() > 0 ? chunk.getMessageLength() : contentLength);
105-
chunkBuffer = new byte[length];
106-
}
107-
readBytesIntoBuffer(chunkBuffer, off, contentLength);
108-
off += contentLength;
109-
} catch (final Exception e) {
110-
close();
111-
throw new ArangoDBException(e);
112-
}
113-
}
114-
final Message responseMessage = new Message(message.getId(), chunkBuffer);
115-
if (LOGGER.isDebugEnabled()) {
116-
LOGGER.debug(String.format("Received Message (id=%s, head=%s, body=%s)", responseMessage.getId(),
117-
responseMessage.getHead(), responseMessage.getBody() != null ? responseMessage.getBody() : "{}"));
93+
});
94+
messageStore.storeMessage(message.getId(), task);
95+
super.writeIntern(message, chunks);
96+
try {
97+
return task.get();
98+
} catch (final InterruptedException e) {
99+
throw new ArangoDBException(e);
100+
} catch (final ExecutionException e) {
101+
throw new ArangoDBException(e);
118102
}
119-
return responseMessage;
120103
}
121104

122105
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* DISCLAIMER
3+
*
4+
* Copyright 2016 ArangoDB GmbH, Cologne, Germany
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
* Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
*/
20+
21+
package com.arangodb.internal.velocystream;
22+
23+
import java.util.Map;
24+
import java.util.Map.Entry;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.FutureTask;
27+
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import com.arangodb.ArangoDBException;
32+
33+
/**
34+
* @author Mark - mark at arangodb.com
35+
*
36+
*/
37+
public class MessageStore {
38+
39+
private static final Logger LOGGER = LoggerFactory.getLogger(MessageStore.class);
40+
41+
private final Map<Long, FutureTask<Message>> task;
42+
private final Map<Long, Message> response;
43+
private final Map<Long, Exception> error;
44+
45+
public MessageStore() {
46+
super();
47+
task = new ConcurrentHashMap<Long, FutureTask<Message>>();
48+
response = new ConcurrentHashMap<Long, Message>();
49+
error = new ConcurrentHashMap<Long, Exception>();
50+
}
51+
52+
public void storeMessage(final long messageId, final FutureTask<Message> future) {
53+
task.put(messageId, future);
54+
}
55+
56+
public void consume(final Message message) {
57+
final FutureTask<Message> future = task.remove(message.getId());
58+
if (future != null) {
59+
if (LOGGER.isDebugEnabled()) {
60+
LOGGER.debug(String.format("Received Message (id=%s, head=%s, body=%s)", message.getId(),
61+
message.getHead(), message.getBody() != null ? message.getBody() : "{}"));
62+
}
63+
response.put(message.getId(), message);
64+
future.run();
65+
}
66+
}
67+
68+
public Message get(final long messageId) throws ArangoDBException {
69+
final Message result = response.remove(messageId);
70+
if (result == null) {
71+
final Exception e = error.remove(messageId);
72+
if (e != null) {
73+
throw new ArangoDBException(e);
74+
}
75+
}
76+
return result;
77+
}
78+
79+
public void cancel(final long messageId) {
80+
final FutureTask<Message> future = task.remove(messageId);
81+
if (future != null) {
82+
LOGGER.error(String.format("Cancel Message unexpected (id=%s).", messageId));
83+
future.cancel(true);
84+
}
85+
}
86+
87+
public void clear(final Exception e) {
88+
if (!task.isEmpty()) {
89+
LOGGER.error(e.getMessage(), e);
90+
}
91+
for (final Entry<Long, FutureTask<Message>> entry : task.entrySet()) {
92+
if (LOGGER.isDebugEnabled()) {
93+
LOGGER.debug(String.format("Exceptionally complete Message (id=%s).", entry.getKey()));
94+
}
95+
error.put(entry.getKey(), e);
96+
entry.getValue().run();
97+
}
98+
task.clear();
99+
}
100+
101+
public void clear() {
102+
for (final Entry<Long, FutureTask<Message>> entry : task.entrySet()) {
103+
if (LOGGER.isDebugEnabled()) {
104+
LOGGER.debug(String.format("Cancel Message (id=%s).", entry.getKey()));
105+
}
106+
entry.getValue().cancel(true);
107+
}
108+
task.clear();
109+
}
110+
111+
}

src/test/java/com/arangodb/internal/net/CommunicationTest.java renamed to src/test/java/com/arangodb/internal/velocystream/CommunicationTest.java

File renamed without changes.

0 commit comments

Comments
 (0)