Skip to content

Commit eb18029

Browse files
committed
- netty: code cleaup + minor fixes
1 parent ec8a153 commit eb18029

File tree

7 files changed

+157
-35
lines changed

7 files changed

+157
-35
lines changed

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyContext.java

Lines changed: 72 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737

3838
import org.slf4j.Logger;
3939

40-
import com.typesafe.config.Config;
4140
import edu.umd.cs.findbugs.annotations.NonNull;
4241
import edu.umd.cs.findbugs.annotations.Nullable;
4342
import io.jooby.*;
@@ -51,7 +50,6 @@
5150
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
5251
import io.netty.handler.codec.http.multipart.*;
5352
import io.netty.handler.codec.http.websocketx.WebSocketDecoderConfig;
54-
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
5553
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
5654
import io.netty.handler.ssl.SslHandler;
5755
import io.netty.handler.stream.ChunkedNioFile;
@@ -412,25 +410,76 @@ public Context onComplete(@NonNull Route.Complete task) {
412410
return this;
413411
}
414412

413+
// @NonNull @Override
414+
// public Context upgrade(WebSocket.Initializer handler) {
415+
// try {
416+
// responseStarted = true;
417+
// Config conf = getRouter().getConfig();
418+
// int maxSize =
419+
// conf.hasPath("websocket.maxSize")
420+
// ? conf.getBytes("websocket.maxSize").intValue()
421+
// : WebSocket.MAX_BUFFER_SIZE;
422+
// String webSocketURL = getProtocol() + "://" + req.headers().get(HttpHeaderNames.HOST) +
423+
// path;
424+
// var config =
425+
// WebSocketDecoderConfig.newBuilder()
426+
// .allowExtensions(true)
427+
// .allowMaskMismatch(false)
428+
// .withUTF8Validator(false)
429+
// .maxFramePayloadLength(maxSize)
430+
// .build();
431+
// webSocket = new NettyWebSocket(this);
432+
// handler.init(Context.readOnly(this), webSocket);
433+
// var webSocketRequest =
434+
// new DefaultFullHttpRequest(
435+
// HTTP_1_1,
436+
// req.method(),
437+
// req.uri(),
438+
// Unpooled.EMPTY_BUFFER,
439+
// req.headers(),
440+
// EmptyHttpHeaders.INSTANCE);
441+
// var codec = ctx.pipeline().get(NettyServerCodec.class);
442+
// codec.webSocketHandshake(ctx);
443+
// WebSocketServerHandshakerFactory factory =
444+
// new WebSocketServerHandshakerFactory(webSocketURL, null, config);
445+
// WebSocketServerHandshaker handshaker = factory.newHandshaker(webSocketRequest);
446+
// handshaker.handshake(ctx.channel(), webSocketRequest);
447+
// webSocket.fireConnect();
448+
// long timeout =
449+
// conf.hasPath("websocket.idleTimeout")
450+
// ? conf.getDuration("websocket.idleTimeout", MILLISECONDS)
451+
// : MINUTES.toMillis(5);
452+
// if (timeout > 0) {
453+
// IdleStateHandler idle = new IdleStateHandler(timeout, 0, 0, MILLISECONDS);
454+
// ctx.pipeline().addBefore("handler", "idle", idle);
455+
// }
456+
// } catch (Throwable x) {
457+
// sendError(x);
458+
// }
459+
// return this;
460+
// }
415461
@NonNull @Override
416462
public Context upgrade(WebSocket.Initializer handler) {
417463
try {
418464
responseStarted = true;
419-
Config conf = getRouter().getConfig();
465+
var conf = getRouter().getConfig();
420466
int maxSize =
421467
conf.hasPath("websocket.maxSize")
422468
? conf.getBytes("websocket.maxSize").intValue()
423469
: WebSocket.MAX_BUFFER_SIZE;
424-
String webSocketURL = getProtocol() + "://" + req.headers().get(HttpHeaderNames.HOST) + path;
470+
var webSocketURL = getProtocol() + "://" + req.headers().get(HttpHeaderNames.HOST) + path;
471+
425472
var config =
426473
WebSocketDecoderConfig.newBuilder()
427474
.allowExtensions(true)
428475
.allowMaskMismatch(false)
429476
.withUTF8Validator(false)
430477
.maxFramePayloadLength(maxSize)
431478
.build();
479+
432480
webSocket = new NettyWebSocket(this);
433481
handler.init(Context.readOnly(this), webSocket);
482+
434483
var webSocketRequest =
435484
new DefaultFullHttpRequest(
436485
HTTP_1_1,
@@ -439,21 +488,36 @@ public Context upgrade(WebSocket.Initializer handler) {
439488
Unpooled.EMPTY_BUFFER,
440489
req.headers(),
441490
EmptyHttpHeaders.INSTANCE);
491+
442492
var codec = ctx.pipeline().get(NettyServerCodec.class);
443493
codec.webSocketHandshake(ctx);
444-
WebSocketServerHandshakerFactory factory =
445-
new WebSocketServerHandshakerFactory(webSocketURL, null, config);
446-
WebSocketServerHandshaker handshaker = factory.newHandshaker(webSocketRequest);
494+
495+
var factory = new WebSocketServerHandshakerFactory(webSocketURL, null, config);
496+
var handshaker = factory.newHandshaker(webSocketRequest);
497+
498+
if (handshaker == null) {
499+
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
500+
return this;
501+
}
502+
447503
handshaker.handshake(ctx.channel(), webSocketRequest);
448504
webSocket.fireConnect();
505+
449506
long timeout =
450507
conf.hasPath("websocket.idleTimeout")
451508
? conf.getDuration("websocket.idleTimeout", MILLISECONDS)
452509
: MINUTES.toMillis(5);
510+
453511
if (timeout > 0) {
454512
IdleStateHandler idle = new IdleStateHandler(timeout, 0, 0, MILLISECONDS);
455-
ctx.pipeline().addBefore("handler", "idle", idle);
513+
// If the global server timeout is already there, replace it. Otherwise, add it.
514+
if (ctx.pipeline().get(IdleStateHandler.class) != null) {
515+
ctx.pipeline().replace(IdleStateHandler.class, "idle", idle);
516+
} else {
517+
ctx.pipeline().addBefore("handler", "idle", idle);
518+
}
456519
}
520+
457521
} catch (Throwable x) {
458522
sendError(x);
459523
}

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcExchange.java

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@
55
*/
66
package io.jooby.internal.netty;
77

8+
import java.net.URLEncoder;
89
import java.nio.ByteBuffer;
10+
import java.nio.charset.StandardCharsets;
911
import java.util.HashMap;
1012
import java.util.Map;
13+
import java.util.concurrent.atomic.AtomicBoolean;
1114
import java.util.function.Consumer;
1215

1316
import io.jooby.rpc.grpc.GrpcExchange;
@@ -17,7 +20,6 @@
1720
import io.netty.handler.codec.http.DefaultHttpContent;
1821
import io.netty.handler.codec.http.DefaultHttpResponse;
1922
import io.netty.handler.codec.http.DefaultLastHttpContent;
20-
import io.netty.handler.codec.http.HttpContent;
2123
import io.netty.handler.codec.http.HttpHeaderNames;
2224
import io.netty.handler.codec.http.HttpRequest;
2325
import io.netty.handler.codec.http.HttpResponse;
@@ -29,7 +31,8 @@ public class NettyGrpcExchange implements GrpcExchange {
2931

3032
private final ChannelHandlerContext ctx;
3133
private final HttpRequest request;
32-
private boolean headersSent = false;
34+
35+
private final AtomicBoolean headersSent = new AtomicBoolean(false);
3336

3437
public NettyGrpcExchange(ChannelHandlerContext ctx, HttpRequest request) {
3538
this.ctx = ctx;
@@ -58,12 +61,11 @@ public Map<String, String> getHeaders() {
5861
}
5962

6063
private void sendHeadersIfNecessary() {
61-
if (!headersSent) {
64+
if (headersSent.compareAndSet(false, true)) {
6265
// Send the initial HTTP/2 HEADERS frame (Status 200)
6366
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
6467
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/grpc");
6568
ctx.write(response);
66-
headersSent = true;
6769
}
6870
}
6971

@@ -72,7 +74,7 @@ public void send(ByteBuffer payload, Consumer<Throwable> callback) {
7274
sendHeadersIfNecessary();
7375

7476
// Wrap the NIO ByteBuffer in a Netty ByteBuf without copying
75-
HttpContent chunk = new DefaultHttpContent(Unpooled.wrappedBuffer(payload));
77+
var chunk = new DefaultHttpContent(Unpooled.wrappedBuffer(payload));
7678

7779
// Write and flush, then map Netty's Future to your single-lambda callback
7880
ctx.writeAndFlush(chunk)
@@ -88,28 +90,41 @@ public void send(ByteBuffer payload, Consumer<Throwable> callback) {
8890

8991
@Override
9092
public void close(int statusCode, String description) {
91-
if (headersSent) {
92-
// Trailers-Appended: Send the final HTTP/2 HEADERS frame with END_STREAM flag
93+
var encodedDescription = encodeGrpcMessage(description);
94+
95+
// If headers were already sent, we just need to send the final trailers
96+
if (headersSent.get()) {
9397
LastHttpContent lastContent = new DefaultLastHttpContent();
9498
lastContent.trailingHeaders().set("grpc-status", String.valueOf(statusCode));
95-
if (description != null) {
96-
lastContent.trailingHeaders().set("grpc-message", description);
99+
100+
if (encodedDescription != null) {
101+
lastContent.trailingHeaders().set("grpc-message", encodedDescription);
97102
}
98-
// writeAndFlush the LastHttpContent, then close the Netty stream channel
103+
99104
ctx.writeAndFlush(lastContent).addListener(ChannelFutureListener.CLOSE);
105+
100106
} else {
101-
// Trailers-Only: No body was sent, so standard headers become the trailers
102-
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
107+
// Trailers-Only fast path: Headers and trailers combined
108+
var response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
103109
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/grpc");
104110
response.headers().set("grpc-status", String.valueOf(statusCode));
105-
if (description != null) {
106-
response.headers().set("grpc-message", description);
111+
112+
if (encodedDescription != null) {
113+
response.headers().set("grpc-message", encodedDescription);
107114
}
108-
ctx.write(response);
109115

110-
// Close out the stream with an empty DATA frame possessing the END_STREAM flag
116+
ctx.write(response);
111117
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
112118
.addListener(ChannelFutureListener.CLOSE);
113119
}
114120
}
121+
122+
/** gRPC specification requires the grpc-message trailer to be percent-encoded. */
123+
private static String encodeGrpcMessage(String description) {
124+
if (description == null) {
125+
return null;
126+
}
127+
// URLEncoder uses '+' for spaces, but gRPC strictly expects '%20'
128+
return URLEncoder.encode(description, StandardCharsets.UTF_8).replace("+", "%20");
129+
}
115130
}

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
*/
66
package io.jooby.internal.netty;
77

8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
811
import io.jooby.rpc.grpc.GrpcProcessor;
912
import io.netty.channel.ChannelFutureListener;
1013
import io.netty.channel.ChannelHandlerContext;
@@ -14,6 +17,8 @@
1417

1518
public class NettyGrpcHandler extends ChannelInboundHandlerAdapter {
1619

20+
private static final Logger log = LoggerFactory.getLogger(NettyGrpcHandler.class);
21+
1722
private final GrpcProcessor processor;
1823
private final boolean isHttp2;
1924

@@ -39,7 +44,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
3944
if (processor.isGrpcMethod(path)
4045
&& contentType != null
4146
&& contentType.startsWith("application/grpc")) {
42-
isGrpc = true;
4347

4448
if (!isHttp2) {
4549
// gRPC requires HTTP/2. Reject HTTP/1.1 calls immediately.
@@ -53,6 +57,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
5357
return;
5458
}
5559

60+
// This prevents leaking state on rejected HTTP/1.1 connections.
61+
isGrpc = true;
62+
5663
// We will implement NettyGrpcExchange in the next step
5764
var exchange = new NettyGrpcExchange(ctx, req);
5865
var subscriber = processor.process(exchange);
@@ -92,6 +99,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
9299

93100
@Override
94101
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
102+
log.debug("gRPC stream exception caught", cause);
95103
if (isGrpc) {
96104
ctx.close();
97105
} else {

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcInputBridge.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,20 +50,27 @@ public void cancel() {
5050
ctx.close(); // Abort the connection
5151
}
5252

53-
/** Called by the NettyGrpcHandler when a new chunk arrives from the network. */
5453
public void onChunk(HttpContent chunk) {
5554
try {
5655
ByteBuf content = chunk.content();
5756
if (content.isReadable()) {
58-
// Convert Netty ByteBuf to standard Java ByteBuffer
59-
ByteBuffer buffer = content.nioBuffer();
6057

61-
// Pass to the gRPC deframer
62-
subscriber.onNext(buffer);
58+
// Copy the bytes. content.nioBuffer() shares memory with the ByteBuf,
59+
// which gets released in the finally block of NettyGrpcHandler.
60+
// We must copy it to an unmanaged heap buffer to prevent use-after-free corruption.
61+
byte[] bytes = new byte[content.readableBytes()];
62+
content.readBytes(bytes);
63+
ByteBuffer buffer = ByteBuffer.wrap(bytes);
6364

65+
// If onNext synchronously calls request(1), that request() will see demand transition
66+
// from 0 to 1 and safely trigger ctx.read().
6467
long currentDemand = demand.decrementAndGet();
68+
69+
// Pass the isolated buffer to the gRPC deframer
70+
subscriber.onNext(buffer);
71+
6572
if (currentDemand > 0) {
66-
// Still have demand, ask Netty for the next chunk
73+
// Still have demand from previous requests, ask Netty for the next chunk
6774
ctx.read();
6875
}
6976
}

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyHandler.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.handler.codec.http.multipart.*;
2222
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
2323
import io.netty.handler.timeout.IdleStateEvent;
24+
import io.netty.util.ReferenceCountUtil;
2425

2526
public class NettyHandler extends ChannelInboundHandlerAdapter {
2627
private final Logger log = LoggerFactory.getLogger(NettyServer.class);
@@ -88,6 +89,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
8889
if (req.getClass() == DefaultFullHttpRequest.class) {
8990
// HTTP2 aggregates all into a full http request.
9091
if (((DefaultFullHttpRequest) req).content().readableBytes() > maxRequestSize) {
92+
release(req);
9193
router.match(context).execute(context, Route.REQUEST_ENTITY_TOO_LARGE);
9294
return;
9395
}
@@ -127,6 +129,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
127129
} else if (isWebSocketFrame(msg)) {
128130
if (context.webSocket != null) {
129131
context.webSocket.handleFrame((WebSocketFrame) msg);
132+
} else {
133+
release(msg);
130134
}
131135
}
132136
}
@@ -189,9 +193,9 @@ public void writeChunks(Object header, Object body, ChannelPromise promise) {
189193
}
190194
}
191195

192-
private void release(HttpContent ref) {
193-
if (ref.refCnt() > 0) {
194-
ref.release();
196+
private void release(Object ref) {
197+
if (ReferenceCountUtil.refCnt(ref) > 0) {
198+
ReferenceCountUtil.release(ref);
195199
}
196200
}
197201

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyPipeline.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,29 @@ private void setupHttp11Upgrade(ChannelPipeline pipeline) {
112112
protocol -> "h2c".equals(protocol.toString()) ? createH2CUpgradeCodec() : null,
113113
(int) maxRequestSize));
114114

115+
pipeline.addLast(
116+
"h2upgrade-cleaner",
117+
new ChannelInboundHandlerAdapter() {
118+
@Override
119+
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
120+
if (evt instanceof HttpServerUpgradeHandler.UpgradeEvent) {
121+
ChannelPipeline p = ctx.pipeline();
122+
123+
// Strip out HTTP/1.1 handlers that are no longer needed on the parent channel.
124+
// The Http2StreamInitializer will attach new ones for the multiplexed child channels.
125+
if (p.context("grpc") != null) p.remove("grpc");
126+
if (p.context("handler") != null) p.remove("handler");
127+
if (p.context("expect-continue") != null) p.remove("expect-continue");
128+
if (p.context("compressor") != null) p.remove("compressor");
129+
if (p.context("ws-compressor") != null) p.remove("ws-compressor");
130+
131+
// Remove this cleaner itself so it doesn't linger
132+
p.remove(this);
133+
}
134+
super.userEventTriggered(ctx, evt);
135+
}
136+
});
137+
115138
addCommonHandlers(pipeline);
116139

117140
// Inject gRPC handler (isHttp2 = false to trigger 426 Upgrade Required)

0 commit comments

Comments
 (0)