Skip to content

Commit 22d2ea5

Browse files
committed
Cap pending HTTP/2 request commands per connection
Restore per-session command count and use it in H2MultiplexingRequester to fail fast when the per-connection command queue exceeds the configured limit.
1 parent 712b33c commit 22d2ea5

11 files changed

Lines changed: 348 additions & 8 deletions

File tree

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,10 @@ public H2AsyncRequester(
7777
final TlsStrategy tlsStrategy,
7878
final Timeout handshakeTimeout,
7979
final IOReactorMetricsListener threadPoolListener,
80-
final IOWorkerSelector workerSelector) {
80+
final IOWorkerSelector workerSelector,
81+
final int maxPendingCommandsPerConnection) {
8182
super(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool,
82-
tlsStrategy, handshakeTimeout, threadPoolListener, workerSelector);
83+
tlsStrategy, handshakeTimeout, threadPoolListener, workerSelector, maxPendingCommandsPerConnection);
8384
this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
8485
}
8586

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.List;
3434
import java.util.Set;
3535
import java.util.concurrent.Future;
36+
import java.util.concurrent.RejectedExecutionException;
3637

3738
import org.apache.hc.core5.annotation.Internal;
3839
import org.apache.hc.core5.concurrent.Cancellable;
@@ -87,6 +88,12 @@ public class H2MultiplexingRequester extends AsyncRequester {
8788

8889
private final H2ConnPool connPool;
8990

91+
/**
92+
* Hard cap on per-connection queued / in-flight commands.
93+
* {@code <= 0} disables the cap.
94+
*/
95+
private final int maxCommandsPerConnection;
96+
9097
/**
9198
* Use {@link H2MultiplexingRequesterBootstrap} to create instances of this class.
9299
*/
@@ -100,11 +107,13 @@ public H2MultiplexingRequester(
100107
final Resolver<HttpHost, InetSocketAddress> addressResolver,
101108
final TlsStrategy tlsStrategy,
102109
final IOReactorMetricsListener threadPoolListener,
103-
final IOWorkerSelector workerSelector) {
110+
final IOWorkerSelector workerSelector,
111+
final int maxCommandsPerConnection) {
104112
super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
105113
ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE,
106114
threadPoolListener, workerSelector);
107115
this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy);
116+
this.maxCommandsPerConnection = maxCommandsPerConnection;
108117
}
109118

110119
public void closeIdle(final TimeValue idleTime) {
@@ -245,6 +254,16 @@ public void failed(final Exception cause) {
245254
}
246255

247256
};
257+
final int max = maxCommandsPerConnection;
258+
if (max > 0) {
259+
final int current = ioSession.getPendingCommandCount();
260+
if (current >= 0 && current >= max) {
261+
exchangeHandler.failed(new RejectedExecutionException(
262+
"Maximum number of pending commands per connection reached (max=" + max + ")"));
263+
exchangeHandler.releaseResources();
264+
return;
265+
}
266+
}
248267
final Timeout socketTimeout = ioSession.getSocketTimeout();
249268
ioSession.enqueue(new RequestExecutionCommand(
250269
handlerProxy,
@@ -349,5 +368,4 @@ public final <T> Future<T> execute(
349368
public H2ConnPool getConnPool() {
350369
return connPool;
351370
}
352-
353371
}

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.ArrayList;
3030
import java.util.List;
3131

32+
import org.apache.hc.core5.annotation.Experimental;
3233
import org.apache.hc.core5.function.Callback;
3334
import org.apache.hc.core5.function.Decorator;
3435
import org.apache.hc.core5.function.Supplier;
@@ -76,6 +77,8 @@ public class H2MultiplexingRequesterBootstrap {
7677

7778
private IOReactorMetricsListener threadPoolListener;
7879

80+
private int maxCommandsPerConnection;
81+
7982
private H2MultiplexingRequesterBootstrap() {
8083
this.routeEntries = new ArrayList<>();
8184
}
@@ -180,6 +183,23 @@ public final H2MultiplexingRequesterBootstrap setIOReactorMetricsListener(final
180183
return this;
181184
}
182185

186+
/**
187+
* Sets a hard limit on the number of pending commands execution commands that can be queued per connection.
188+
* When the limit is reached, new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}.
189+
* A value {@code <= 0} disables the limit (default).
190+
* Note: this limit applies to commands waiting in the connection's internal queue (backlog). HTTP/2 in-flight
191+
* concurrency is governed separately by protocol settings (e.g. MAX_CONCURRENT_STREAMS).
192+
*
193+
* @param max maximum number of pending commands per connection; {@code <= 0} to disable the limit.
194+
* @return this instance.
195+
* @since 5.5
196+
*/
197+
@Experimental
198+
public final H2MultiplexingRequesterBootstrap setMaxCommandsPerConnection(final int max) {
199+
this.maxCommandsPerConnection = max;
200+
return this;
201+
}
202+
183203
/**
184204
* Sets {@link H2StreamListener} instance.
185205
*
@@ -274,7 +294,8 @@ public H2MultiplexingRequester create() {
274294
DefaultAddressResolver.INSTANCE,
275295
tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(),
276296
threadPoolListener,
277-
null);
297+
null,
298+
maxCommandsPerConnection);
278299
}
279300

280301
}

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public class H2RequesterBootstrap {
104104
private ConnPoolListener<HttpHost> connPoolListener;
105105
private IOReactorMetricsListener threadPoolListener;
106106
private FrameFactory frameFactory;
107+
private int maxPendingCommandsPerConnection;
107108

108109

109110
private H2RequesterBootstrap() {
@@ -210,6 +211,11 @@ public final H2RequesterBootstrap setPoolConcurrencyPolicy(final PoolConcurrency
210211
return this;
211212
}
212213

214+
public final H2RequesterBootstrap setMaxPendingCommandsPerConnection(final int maxPendingCommandsPerConnection) {
215+
this.maxPendingCommandsPerConnection = maxPendingCommandsPerConnection;
216+
return this;
217+
}
218+
213219
/**
214220
* Sets {@link TlsStrategy} instance.
215221
*
@@ -433,7 +439,8 @@ public H2AsyncRequester create() {
433439
actualTlsStrategy,
434440
handshakeTimeout,
435441
threadPoolListener,
436-
null);
442+
null,
443+
maxPendingCommandsPerConnection);
437444
}
438445

439446
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.core5.http2.impl.nio.bootstrap;
28+
29+
import java.net.InetSocketAddress;
30+
import java.util.concurrent.CountDownLatch;
31+
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.Executors;
33+
import java.util.concurrent.RejectedExecutionException;
34+
import java.util.concurrent.ScheduledExecutorService;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.atomic.AtomicInteger;
37+
38+
import org.apache.hc.core5.concurrent.FutureCallback;
39+
import org.apache.hc.core5.http.ContentType;
40+
import org.apache.hc.core5.http.EntityDetails;
41+
import org.apache.hc.core5.http.HttpHost;
42+
import org.apache.hc.core5.http.HttpRequest;
43+
import org.apache.hc.core5.http.HttpResponse;
44+
import org.apache.hc.core5.http.Message;
45+
import org.apache.hc.core5.http.URIScheme;
46+
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
47+
import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
48+
import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
49+
import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
50+
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
51+
import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
52+
import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder;
53+
import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
54+
import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
55+
import org.apache.hc.core5.http.protocol.HttpContext;
56+
import org.apache.hc.core5.http.protocol.HttpCoreContext;
57+
import org.apache.hc.core5.http2.HttpVersionPolicy;
58+
import org.apache.hc.core5.http2.config.H2Config;
59+
import org.apache.hc.core5.io.CloseMode;
60+
import org.apache.hc.core5.reactor.IOReactorConfig;
61+
import org.apache.hc.core5.reactor.ListenerEndpoint;
62+
import org.apache.hc.core5.util.Timeout;
63+
import org.junit.jupiter.api.Assertions;
64+
import org.junit.jupiter.api.Test;
65+
66+
class TestH2MultiplexingRequesterMaxRequestsPerConnection {
67+
68+
@Test
69+
@org.junit.jupiter.api.Timeout(value = 60, unit = TimeUnit.SECONDS)
70+
void testRejectsWhenLimitReached() throws Exception {
71+
final int maxPerConn = 2;
72+
final int totalRequests = 30;
73+
final Timeout timeout = Timeout.ofSeconds(30);
74+
final long serverDelayMillis = 5000;
75+
76+
final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
77+
.setIoThreadCount(1)
78+
.build();
79+
80+
final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
81+
82+
final H2Config serverH2Config = H2Config.custom()
83+
.setPushEnabled(false)
84+
.setMaxConcurrentStreams(1) // force backlog
85+
.build();
86+
87+
final HttpAsyncServer server = H2ServerBootstrap.bootstrap()
88+
.setIOReactorConfig(ioReactorConfig)
89+
.setH2Config(serverH2Config)
90+
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
91+
.setCanonicalHostName("127.0.0.1") // avoid 421
92+
.register("*", new AsyncServerRequestHandler<Message<HttpRequest, Void>>() {
93+
94+
@Override
95+
public AsyncRequestConsumer<Message<HttpRequest, Void>> prepare(
96+
final HttpRequest request,
97+
final EntityDetails entityDetails,
98+
final HttpContext context) {
99+
return new BasicRequestConsumer<>(
100+
entityDetails != null ? new DiscardingEntityConsumer<>() : null);
101+
}
102+
103+
@Override
104+
public void handle(
105+
final Message<HttpRequest, Void> message,
106+
final ResponseTrigger responseTrigger,
107+
final HttpContext localContext) {
108+
109+
final HttpCoreContext context = HttpCoreContext.cast(localContext);
110+
final String path = message.getHead().getPath();
111+
112+
final Runnable task = () -> {
113+
try {
114+
responseTrigger.submitResponse(
115+
AsyncResponseBuilder.create(200)
116+
.setEntity("ok\n", ContentType.TEXT_PLAIN)
117+
.build(),
118+
context);
119+
} catch (final Exception ignore) {
120+
// ignore
121+
}
122+
};
123+
124+
if ("/warmup".equals(path)) {
125+
task.run();
126+
} else {
127+
scheduler.schedule(task, serverDelayMillis, TimeUnit.MILLISECONDS);
128+
}
129+
}
130+
131+
})
132+
.create();
133+
134+
server.start();
135+
final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get();
136+
final int port = ((InetSocketAddress) ep.getAddress()).getPort();
137+
138+
final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap()
139+
.setIOReactorConfig(ioReactorConfig)
140+
.setH2Config(H2Config.custom().setPushEnabled(false).build())
141+
.setMaxCommandsPerConnection(maxPerConn)
142+
.create();
143+
144+
145+
requester.start();
146+
147+
try {
148+
final HttpHost target = new HttpHost("http", "127.0.0.1", port);
149+
150+
// Warmup
151+
requester.execute(
152+
target,
153+
AsyncRequestBuilder.get().setHttpHost(target).setPath("/warmup").build(),
154+
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
155+
timeout,
156+
HttpCoreContext.create(),
157+
null).get();
158+
159+
final AtomicInteger ok = new AtomicInteger(0);
160+
final AtomicInteger rejected = new AtomicInteger(0);
161+
final AtomicInteger failed = new AtomicInteger(0);
162+
163+
final CountDownLatch done = new CountDownLatch(totalRequests);
164+
final CountDownLatch start = new CountDownLatch(1);
165+
final ExecutorService exec = Executors.newFixedThreadPool(16);
166+
167+
for (int i = 0; i < totalRequests; i++) {
168+
final int id = i;
169+
exec.execute(new Runnable() {
170+
@Override
171+
public void run() {
172+
try {
173+
start.await();
174+
} catch (final InterruptedException e) {
175+
Thread.currentThread().interrupt();
176+
done.countDown();
177+
return;
178+
}
179+
180+
requester.execute(
181+
target,
182+
AsyncRequestBuilder.get().setHttpHost(target).setPath("/slow?i=" + id).build(),
183+
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
184+
timeout,
185+
HttpCoreContext.create(),
186+
new FutureCallback<Message<HttpResponse, String>>() {
187+
188+
@Override
189+
public void completed(final Message<HttpResponse, String> message) {
190+
ok.incrementAndGet();
191+
done.countDown();
192+
}
193+
194+
@Override
195+
public void failed(final Exception ex) {
196+
if (ex instanceof RejectedExecutionException) {
197+
rejected.incrementAndGet();
198+
} else {
199+
failed.incrementAndGet();
200+
}
201+
done.countDown();
202+
}
203+
204+
@Override
205+
public void cancelled() {
206+
failed.incrementAndGet();
207+
done.countDown();
208+
}
209+
});
210+
}
211+
});
212+
}
213+
214+
start.countDown();
215+
216+
final boolean allDone = done.await(60, TimeUnit.SECONDS);
217+
exec.shutdownNow();
218+
219+
Assertions.assertTrue(allDone, "Timed out");
220+
Assertions.assertEquals(totalRequests, ok.get() + rejected.get() + failed.get());
221+
Assertions.assertTrue(rejected.get() > 0, "Expected at least one RejectedExecutionException");
222+
Assertions.assertEquals(0, failed.get(), "Unexpected non-rejection failures: " + failed.get());
223+
} finally {
224+
requester.close(CloseMode.GRACEFUL);
225+
server.close(CloseMode.GRACEFUL);
226+
scheduler.shutdownNow();
227+
}
228+
}
229+
}

0 commit comments

Comments
 (0)