Skip to content

Commit bb4a8e8

Browse files
committed
Add repeatable HTTP/3 ping-pong test flow
Signed-off-by: jeroen.veltman <jeroen.veltman@nextend.nl>
1 parent dff7e2a commit bb4a8e8

File tree

6 files changed

+438
-11
lines changed

6 files changed

+438
-11
lines changed

rsocket-transport-netty/QUIC_HTTP3_TRANSPORTS.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,38 @@ In practice, that means a future `CONNECT`-based RSocket transport likely needs
4242
- keep `POST` as the default transport contract until the `CONNECT` path is validated end-to-end
4343
- if `CONNECT` is revisited, start with a dedicated experiment around response-header timing and remote settings negotiation
4444
- add a focused handshake-level test before changing the public default again
45+
46+
## Repeatable local checks
47+
48+
The repository now includes a small helper script for the HTTP/3 ping/pong fixtures:
49+
50+
- [scripts/http3-pingpong.sh](../scripts/http3-pingpong.sh)
51+
52+
It creates the temporary Gradle init scripts needed to forward `RSOCKET_*` properties into both the
53+
`Test` workers and the `JavaExec`-based pong server task, which makes the local loop repeatable.
54+
55+
Examples:
56+
57+
```bash
58+
./scripts/http3-pingpong.sh integration
59+
./scripts/http3-pingpong.sh pong-server
60+
./scripts/http3-pingpong.sh request-response 1000
61+
./scripts/http3-pingpong.sh request-stream 100
62+
./scripts/http3-pingpong.sh all
63+
```
64+
65+
The `all` command starts the HTTP/3 pong server in the background, waits for the UDP port to bind,
66+
then runs:
67+
68+
- `Http3TransportIntegrationTest`
69+
- `Http3Ping.requestResponseTest`
70+
- `Http3Ping.requestStreamTest`
71+
72+
Useful environment overrides:
73+
74+
- `RSOCKET_HTTP3_PORT`
75+
- `RSOCKET_HTTP3_BIND_HOST`
76+
- `RSOCKET_HTTP3_CLIENT_HOST`
77+
- `RSOCKET_HTTP3_PATH`
78+
- `RSOCKET_HTTP3_REQUEST_RESPONSE_INTERACTIONS`
79+
- `RSOCKET_HTTP3_REQUEST_STREAM_INTERACTIONS`

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/Http3DuplexConnection.java

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import io.rsocket.DuplexConnection;
2929
import io.rsocket.RSocketErrorException;
3030
import io.rsocket.frame.ErrorFrameCodec;
31+
import io.rsocket.frame.FrameLengthCodec;
3132
import io.rsocket.internal.BaseDuplexConnection;
3233
import io.rsocket.internal.UnboundedProcessor;
33-
import java.nio.channels.ClosedChannelException;
3434
import java.net.SocketAddress;
3535
import java.util.Objects;
3636
import reactor.core.publisher.Flux;
@@ -41,6 +41,7 @@ public final class Http3DuplexConnection extends BaseDuplexConnection {
4141
private final String side;
4242
private final Channel connection;
4343
private final UnboundedProcessor inbound;
44+
private ByteBuf inboundBuffer;
4445

4546
public Http3DuplexConnection(Channel connection) {
4647
this("unknown", connection);
@@ -50,6 +51,16 @@ public Http3DuplexConnection(String side, Channel connection) {
5051
this.connection = Objects.requireNonNull(connection, "connection must not be null");
5152
this.side = side;
5253
this.inbound = new UnboundedProcessor();
54+
this.connection
55+
.closeFuture()
56+
.addListener(
57+
future -> {
58+
if (future.isSuccess()) {
59+
onClose.tryEmitEmpty();
60+
} else {
61+
onClose.tryEmitError(future.cause());
62+
}
63+
});
5364

5465
Flux.from(sender)
5566
.concatMap(
@@ -117,16 +128,50 @@ public Flux<ByteBuf> receive() {
117128
@Override
118129
public void sendErrorAndClose(RSocketErrorException e) {
119130
final ByteBuf errorFrame = ErrorFrameCodec.encode(alloc(), 0, e);
120-
sender.tryEmitFinal(errorFrame);
131+
sender.tryEmitFinal(FrameLengthCodec.encode(alloc(), errorFrame.readableBytes(), errorFrame));
132+
}
133+
134+
@Override
135+
public void sendFrame(int streamId, ByteBuf frame) {
136+
super.sendFrame(streamId, FrameLengthCodec.encode(alloc(), frame.readableBytes(), frame));
121137
}
122138

123139
public void handleHeaders(Http3HeadersFrame frame) {
124140
}
125141

126142
public void handleData(Http3DataFrame frame) {
127-
ByteBuf byteBuf = frame.content().retain();
128-
frame.release();
129-
inbound.onNext(byteBuf);
143+
try {
144+
ByteBuf content = frame.content();
145+
if (!content.isReadable()) {
146+
return;
147+
}
148+
149+
if (inboundBuffer == null) {
150+
inboundBuffer = alloc().buffer(content.readableBytes());
151+
}
152+
inboundBuffer.writeBytes(content, content.readerIndex(), content.readableBytes());
153+
154+
while (inboundBuffer.readableBytes() >= FrameLengthCodec.FRAME_LENGTH_SIZE) {
155+
inboundBuffer.markReaderIndex();
156+
int frameLength = FrameLengthCodec.length(inboundBuffer);
157+
if (inboundBuffer.readableBytes() < FrameLengthCodec.FRAME_LENGTH_SIZE + frameLength) {
158+
inboundBuffer.resetReaderIndex();
159+
break;
160+
}
161+
162+
inboundBuffer.skipBytes(FrameLengthCodec.FRAME_LENGTH_SIZE);
163+
inbound.onNext(inboundBuffer.readRetainedSlice(frameLength));
164+
}
165+
166+
if (inboundBuffer.isReadable()) {
167+
inboundBuffer.discardReadBytes();
168+
} else {
169+
inboundBuffer.release();
170+
inboundBuffer = null;
171+
}
172+
} finally {
173+
frame.release();
174+
}
130175
}
131176

132177
public void handleError(Throwable cause) {
@@ -135,8 +180,8 @@ public void handleError(Throwable cause) {
135180
}
136181

137182
public void handleInputClosed() {
138-
inbound.onError(new ClosedChannelException());
139-
onClose.tryEmitEmpty();
183+
// HTTP/3 request streams can observe half-close on the inbound side while the
184+
// opposite direction is still expected to deliver response frames.
140185
}
141186

142187
@Override

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/internal/Http3TransportBootstrap.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,11 @@ protected void channelRead(
147147
protected void channelInputClosed(ChannelHandlerContext ctx) {
148148
if (holder[0] != null) {
149149
holder[0].handleInputClosed();
150+
} else {
151+
established.tryEmitError(
152+
new IllegalStateException(
153+
"HTTP/3 transport closed before handshake completed"));
150154
}
151-
established.tryEmitError(
152-
new IllegalStateException(
153-
"HTTP/3 transport closed before handshake completed"));
154155
}
155156

156157
@Override
@@ -363,7 +364,8 @@ protected void channelRead(ChannelHandlerContext ctx, Http3HeadersFrame frame) {
363364
response.headers().status("200").add("content-type", config.contentType());
364365
ctx.writeAndFlush(response);
365366
established = true;
366-
acceptor.apply(connection).doFinally(__ -> ctx.close()).subscribe(null, t -> ctx.close());
367+
acceptor.apply(connection).subscribe(null, t -> ctx.close());
368+
connection.onClose().doFinally(__ -> ctx.close()).subscribe();
367369
return;
368370
}
369371

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.transport.netty;
18+
19+
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
20+
import io.netty.incubator.codec.http3.Http3;
21+
import io.netty.incubator.codec.quic.QuicSslContext;
22+
import io.netty.incubator.codec.quic.QuicSslContextBuilder;
23+
import io.rsocket.RSocket;
24+
import io.rsocket.core.RSocketConnector;
25+
import io.rsocket.frame.decoder.PayloadDecoder;
26+
import io.rsocket.test.PerfTest;
27+
import io.rsocket.test.PingClient;
28+
import io.rsocket.transport.netty.client.Http3ClientTransport;
29+
import java.time.Duration;
30+
import org.HdrHistogram.Recorder;
31+
import org.junit.jupiter.api.BeforeEach;
32+
import org.junit.jupiter.api.Test;
33+
import reactor.core.publisher.Mono;
34+
35+
@PerfTest
36+
public final class Http3Ping {
37+
private static final int INTERACTIONS_COUNT =
38+
Integer.valueOf(System.getProperty("RSOCKET_HTTP3_INTERACTIONS", "1000000000"));
39+
private static final String host = System.getProperty("RSOCKET_TEST_HOST", "127.0.0.1");
40+
private static final int port =
41+
Integer.valueOf(
42+
System.getProperty(
43+
"RSOCKET_TEST_PORT", String.valueOf(Http3TransportConfig.DEFAULT_PORT)));
44+
private static final String path =
45+
System.getProperty("RSOCKET_TEST_PATH", Http3TransportConfig.DEFAULT_PATH);
46+
47+
@BeforeEach
48+
void setUp() {
49+
System.out.println("Starting ping-pong test (HTTP/3 transport)");
50+
System.out.println("host: " + host);
51+
System.out.println("port: " + port);
52+
System.out.println("path: " + path);
53+
}
54+
55+
@Test
56+
void requestResponseTest() throws Exception {
57+
PingClient pingClient = newPingClient();
58+
Recorder recorder = pingClient.startTracker(Duration.ofSeconds(1));
59+
60+
pingClient
61+
.requestResponsePingPong(INTERACTIONS_COUNT, recorder)
62+
.doOnTerminate(() -> System.out.println("Sent " + INTERACTIONS_COUNT + " messages."))
63+
.blockLast();
64+
}
65+
66+
@Test
67+
void requestStreamTest() throws Exception {
68+
PingClient pingClient = newPingClient();
69+
Recorder recorder = pingClient.startTracker(Duration.ofSeconds(1));
70+
71+
pingClient
72+
.requestStreamPingPong(INTERACTIONS_COUNT, recorder)
73+
.doOnTerminate(() -> System.out.println("Sent " + INTERACTIONS_COUNT + " messages."))
74+
.blockLast();
75+
}
76+
77+
private static PingClient newPingClient() throws Exception {
78+
QuicSslContext clientSslContext =
79+
QuicSslContextBuilder.forClient()
80+
.trustManager(InsecureTrustManagerFactory.INSTANCE)
81+
.applicationProtocols(Http3.supportedApplicationProtocols())
82+
.build();
83+
84+
Http3TransportConfig clientConfig =
85+
Http3TransportConfig.builder()
86+
.path(path)
87+
.quicConfig(
88+
QuicTransportConfig.builder()
89+
.sslContext(clientSslContext)
90+
.validateCertificates(false)
91+
.build())
92+
.build();
93+
94+
Mono<RSocket> rSocket =
95+
RSocketConnector.create()
96+
.payloadDecoder(PayloadDecoder.ZERO_COPY)
97+
.keepAlive(Duration.ofMinutes(1), Duration.ofMinutes(30))
98+
.connect(Http3ClientTransport.create(host, port).config(clientConfig));
99+
100+
return new PingClient(rSocket);
101+
}
102+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.transport.netty;
18+
19+
import io.netty.handler.ssl.util.SelfSignedCertificate;
20+
import io.netty.incubator.codec.http3.Http3;
21+
import io.netty.incubator.codec.quic.QuicSslContext;
22+
import io.netty.incubator.codec.quic.QuicSslContextBuilder;
23+
import io.rsocket.core.RSocketServer;
24+
import io.rsocket.frame.decoder.PayloadDecoder;
25+
import io.rsocket.test.PingHandler;
26+
import io.rsocket.transport.netty.server.Http3ServerTransport;
27+
28+
public final class Http3PongServer {
29+
private static final String bindAddress = System.getProperty("RSOCKET_TEST_HOST", "127.0.0.1");
30+
private static final int port =
31+
Integer.valueOf(
32+
System.getProperty(
33+
"RSOCKET_TEST_PORT", String.valueOf(Http3TransportConfig.DEFAULT_PORT)));
34+
private static final String path =
35+
System.getProperty("RSOCKET_TEST_PATH", Http3TransportConfig.DEFAULT_PATH);
36+
37+
public static void main(String... args) throws Exception {
38+
System.out.println("Starting HTTP/3 ping-pong server");
39+
System.out.println("bind address: " + bindAddress);
40+
System.out.println("port: " + port);
41+
System.out.println("path: " + path);
42+
43+
SelfSignedCertificate certificate = new SelfSignedCertificate();
44+
try {
45+
QuicSslContext serverSslContext =
46+
QuicSslContextBuilder.forServer(
47+
certificate.privateKey(), null, certificate.certificate())
48+
.applicationProtocols(Http3.supportedApplicationProtocols())
49+
.build();
50+
51+
Http3TransportConfig serverConfig =
52+
Http3TransportConfig.builder()
53+
.path(path)
54+
.quicConfig(QuicTransportConfig.builder().sslContext(serverSslContext).build())
55+
.build();
56+
57+
RSocketServer.create(new PingHandler())
58+
.payloadDecoder(PayloadDecoder.ZERO_COPY)
59+
.bind(Http3ServerTransport.create(bindAddress, port).config(serverConfig))
60+
.block()
61+
.onClose()
62+
.block();
63+
} finally {
64+
certificate.delete();
65+
}
66+
}
67+
}

0 commit comments

Comments
 (0)