Skip to content

Commit a576cd0

Browse files
committed
Fixed Undertow and added websocket support for it
1 parent 5fb52ac commit a576cd0

File tree

8 files changed

+168
-71
lines changed

8 files changed

+168
-71
lines changed

src/main/java/org/javawebstack/httpserver/adapter/simple/SimpleHTTPSocketServer.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import org.javawebstack.httpserver.adapter.IHTTPSocketHandler;
44
import org.javawebstack.httpserver.adapter.IHTTPSocketServer;
5+
import org.javawebstack.httpserver.util.websocket.WebSocketUtil;
56

67
import java.io.IOException;
78
import java.net.ServerSocket;
@@ -23,7 +24,17 @@ public SimpleHTTPSocketServer() {
2324
try {
2425
Socket socket = serverSocket.accept();
2526
SimpleHTTPSocket httpSocket = new SimpleHTTPSocket(socket);
26-
executorService.execute(() -> handler.handle(httpSocket));
27+
executorService.execute(() -> {
28+
if(httpSocket.getRequestHeaderNames().contains("sec-websocket-key")) {
29+
try {
30+
if(!WebSocketUtil.accept(httpSocket, null))
31+
return;
32+
} catch (IOException e) {
33+
return;
34+
}
35+
}
36+
handler.handle(httpSocket);
37+
});
2738
} catch (IOException exception) {}
2839
}
2940
});
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.javawebstack.httpserver.adapter.undertow;
2+
3+
import org.xnio.channels.StreamSinkChannel;
4+
5+
import java.io.IOException;
6+
import java.io.OutputStream;
7+
import java.nio.ByteBuffer;
8+
9+
public class StreamSinkOutputStream extends OutputStream {
10+
11+
private final StreamSinkChannel sink;
12+
private final ByteBuffer byteBuffer = ByteBuffer.allocate(1);
13+
14+
public StreamSinkOutputStream(StreamSinkChannel sink) {
15+
this.sink = sink;
16+
}
17+
18+
public void write(int i) throws IOException {
19+
byteBuffer.position(0);
20+
byteBuffer.put((byte) i);
21+
byteBuffer.position(0);
22+
sink.write(byteBuffer);
23+
}
24+
25+
public void close() throws IOException {
26+
sink.close();
27+
}
28+
29+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.javawebstack.httpserver.adapter.undertow;
2+
3+
import org.xnio.channels.StreamSourceChannel;
4+
5+
import java.io.IOException;
6+
import java.io.InputStream;
7+
import java.nio.ByteBuffer;
8+
9+
public class StreamSourceInputStream extends InputStream {
10+
11+
private final StreamSourceChannel source;
12+
private final ByteBuffer byteBuffer = ByteBuffer.allocate(1);
13+
14+
public StreamSourceInputStream(StreamSourceChannel source) {
15+
this.source = source;
16+
}
17+
18+
public synchronized int read() throws IOException {
19+
byteBuffer.position(0);
20+
int r;
21+
while ((r = source.read(byteBuffer)) == 0)
22+
Thread.yield();
23+
if(r == -1)
24+
return -1;
25+
int b = byteBuffer.position(0).get();
26+
return b;
27+
}
28+
29+
}

src/main/java/org/javawebstack/httpserver/adapter/untertow/UndertowHTTPSocket.java renamed to src/main/java/org/javawebstack/httpserver/adapter/undertow/UndertowHTTPSocket.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
package org.javawebstack.httpserver.adapter.untertow;
1+
package org.javawebstack.httpserver.adapter.undertow;
22

3-
import io.undertow.server.BlockingHttpExchange;
43
import io.undertow.server.HttpServerExchange;
54
import io.undertow.util.HeaderValues;
65
import io.undertow.util.HttpString;
@@ -17,24 +16,29 @@
1716
public class UndertowHTTPSocket implements IHTTPSocket {
1817

1918
private final HttpServerExchange exchange;
20-
private final BlockingHttpExchange blockingExchange;
19+
private final InputStream inputStream;
20+
private final OutputStream outputStream;
21+
private boolean closed;
2122

22-
public UndertowHTTPSocket(HttpServerExchange exchange) {
23+
public UndertowHTTPSocket(HttpServerExchange exchange, InputStream inputStream, OutputStream outputStream) {
2324
this.exchange = exchange;
24-
this.blockingExchange = exchange.startBlocking();
25+
this.inputStream = inputStream == null ? exchange.getInputStream() : inputStream;
26+
this.outputStream = outputStream == null ? exchange.getOutputStream() : outputStream;
2527
}
2628

2729
public InputStream getInputStream() throws IOException {
28-
return exchange.getInputStream();
30+
return inputStream;
2931
}
3032

3133
public OutputStream getOutputStream() throws IOException {
32-
return exchange.getOutputStream();
34+
return outputStream;
3335
}
3436

3537
public void close() throws IOException {
36-
blockingExchange.close();
37-
exchange.endExchange();
38+
if(closed)
39+
return;
40+
closed = true;
41+
exchange.getOutputStream().close();
3842
}
3943

4044
public boolean isClosed() {
@@ -102,7 +106,7 @@ public String getResponseStatusMessage() {
102106
}
103107

104108
public void writeHeaders() throws IOException {
105-
exchange.getResponseSender().send("");
109+
exchange.getOutputStream().write(new byte[0]);
106110
}
107111

108112
public String getRemoteAddress() {
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package org.javawebstack.httpserver.adapter.undertow;
2+
3+
import io.undertow.Undertow;
4+
import io.undertow.server.handlers.BlockingHandler;
5+
import io.undertow.websockets.core.WebSocketVersion;
6+
import org.javawebstack.httpserver.adapter.IHTTPSocketHandler;
7+
import org.javawebstack.httpserver.adapter.IHTTPSocketServer;
8+
import org.javawebstack.httpserver.util.websocket.WebSocketUtil;
9+
import org.xnio.Options;
10+
11+
import java.io.IOException;
12+
import java.io.InputStream;
13+
import java.io.OutputStream;
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
16+
17+
public class UndertowHTTPSocketServer implements IHTTPSocketServer {
18+
19+
private int port = 80;
20+
private Undertow server;
21+
private IHTTPSocketHandler handler;
22+
private ExecutorService executorService;
23+
24+
public void setPort(int port) {
25+
this.port = port;
26+
}
27+
28+
public int getPort() {
29+
return port;
30+
}
31+
32+
public void start() throws IOException {
33+
executorService = Executors.newCachedThreadPool();
34+
server = Undertow.builder()
35+
.addHttpListener(port, "0.0.0.0")
36+
.setServerOption(Options.KEEP_ALIVE, true)
37+
.setHandler(new BlockingHandler(httpServerExchange -> {
38+
if(httpServerExchange.getRequestHeaders().contains("sec-websocket-key")) {
39+
httpServerExchange.upgradeChannel((streamConnection, httpServerExchange1) -> {
40+
InputStream inputStream = new StreamSourceInputStream(streamConnection.getSourceChannel());
41+
OutputStream outputStream = new StreamSinkOutputStream(streamConnection.getSinkChannel());
42+
handler.handle(new UndertowHTTPSocket(httpServerExchange1, inputStream, outputStream));
43+
});
44+
httpServerExchange.putAttachment(WebSocketVersion.ATTACHMENT_KEY, WebSocketVersion.V13);
45+
if(!WebSocketUtil.accept(new UndertowHTTPSocket(httpServerExchange, null, null), null))
46+
return;
47+
httpServerExchange.endExchange();
48+
} else {
49+
handler.handle(new UndertowHTTPSocket(httpServerExchange, null, null));
50+
}
51+
}))
52+
.build();
53+
server.start();
54+
}
55+
56+
public void stop() {
57+
executorService.shutdown();
58+
server.stop();
59+
}
60+
61+
public void join() {
62+
try {
63+
server.getWorker().awaitTermination();
64+
} catch (InterruptedException e) {}
65+
}
66+
67+
public void setHandler(IHTTPSocketHandler handler) {
68+
this.handler = handler;
69+
}
70+
71+
public boolean isWebSocketSupported() {
72+
return true;
73+
}
74+
75+
}

src/main/java/org/javawebstack/httpserver/adapter/untertow/UndertowHTTPSocketServer.java

Lines changed: 0 additions & 49 deletions
This file was deleted.

src/main/java/org/javawebstack/httpserver/util/websocket/WebSocketFrame.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,11 @@ public void write(OutputStream stream) throws IOException {
9898

9999
public static WebSocketFrame read(InputStream stream) throws IOException {
100100
WebSocketFrame frame = new WebSocketFrame();
101-
int b = safeRead(stream);
101+
byte b = safeRead(stream);
102102
frame.flags = (byte) (b & 0xF0);
103103
frame.opcode = (byte) (b & 0x0F);
104104
b = safeRead(stream);
105-
frame.maskKey = (b >> 7) == 1 ? new byte[4] : null;
105+
frame.maskKey = ((b & 0xFF) >> 7) == 1 ? new byte[4] : null;
106106
int len = b & 0b0111_1111;
107107
if(len == 126) {
108108
len = safeRead(stream) << 8;
@@ -114,27 +114,27 @@ public static WebSocketFrame read(InputStream stream) throws IOException {
114114
len |= safeRead(stream);
115115
}
116116
if(frame.maskKey != null) {
117-
frame.maskKey[0] = (byte) safeRead(stream);
118-
frame.maskKey[1] = (byte) safeRead(stream);
119-
frame.maskKey[2] = (byte) safeRead(stream);
120-
frame.maskKey[3] = (byte) safeRead(stream);
117+
frame.maskKey[0] = safeRead(stream);
118+
frame.maskKey[1] = safeRead(stream);
119+
frame.maskKey[2] = safeRead(stream);
120+
frame.maskKey[3] = safeRead(stream);
121121
}
122122
frame.payload = new byte[len];
123123
if(frame.maskKey != null) {
124124
for(int i=0; i<len; i++)
125125
frame.payload[i] = (byte) (safeRead(stream) ^ frame.maskKey[i % 4]);
126126
} else {
127127
for(int i=0; i<len; i++)
128-
frame.payload[i] = (byte) safeRead(stream);
128+
frame.payload[i] = safeRead(stream);
129129
}
130130
return frame;
131131
}
132132

133-
private static int safeRead(InputStream stream) throws IOException {
133+
private static byte safeRead(InputStream stream) throws IOException {
134134
int b = stream.read();
135135
if(b == -1)
136136
throw new IOException("Unexpected end of stream");
137-
return b;
137+
return (byte) b;
138138
}
139139

140140
}

src/main/java/org/javawebstack/httpserver/websocket/InternalWebSocketRequestHandler.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ public InternalWebSocketRequestHandler(WebSocketHandler handler) {
2121
public Object handle(Exchange exchange) {
2222
IHTTPSocket socket = exchange.socket();
2323
try {
24-
if(!WebSocketUtil.accept(socket, null))
25-
return null;
2624
WebSocket webSocket = new WebSocket(exchange);
2725
handler.onConnect(webSocket);
2826
WebSocketFrame frame;

0 commit comments

Comments
 (0)