queue;
+ private final AtomicBoolean draining;
+
+ SharedRequestExecutionQueue(final int maxConcurrent) {
+ if (maxConcurrent <= 0) {
+ throw new IllegalArgumentException("maxConcurrent must be > 0");
+ }
+ this.maxConcurrent = maxConcurrent;
+ this.inFlight = new AtomicInteger(0);
+ this.queue = new ConcurrentLinkedQueue<>();
+ this.draining = new AtomicBoolean(false);
+ }
+
+ Cancellable enqueue(final Runnable task, final Runnable onCancel) {
+ Args.notNull(task, "task");
+ Args.notNull(onCancel, "onCancel");
+
+ final Entry entry = new Entry(task, onCancel);
+ queue.add(entry);
+ drain();
+ return entry;
+ }
+
+ void completed() {
+ inFlight.decrementAndGet();
+ drain();
+ }
+
+ private void drain() {
+ for (;;) {
+ if (!draining.compareAndSet(false, true)) {
+ return;
+ }
+ try {
+ while (inFlight.get() < maxConcurrent) {
+ final Entry entry = queue.poll();
+ if (entry == null) {
+ break;
+ }
+ if (entry.isCancelled()) {
+ continue;
+ }
+ entry.markStarted();
+ inFlight.incrementAndGet();
+ entry.run();
+ }
+ } finally {
+ draining.set(false);
+ }
+ if (inFlight.get() >= maxConcurrent || queue.isEmpty()) {
+ return;
+ }
+ }
+ }
+
+
+ private static final class Entry implements Cancellable {
+
+ private final Runnable task;
+ private final Runnable onCancel;
+ private final AtomicBoolean started;
+ private final AtomicBoolean cancelled;
+
+ Entry(final Runnable task, final Runnable onCancel) {
+ this.task = task;
+ this.onCancel = onCancel;
+ this.started = new AtomicBoolean(false);
+ this.cancelled = new AtomicBoolean(false);
+ }
+
+ void run() {
+ task.run();
+ }
+
+ void markStarted() {
+ started.set(true);
+ }
+
+ boolean isCancelled() {
+ return cancelled.get();
+ }
+
+ @Override
+ public boolean cancel() {
+ if (started.get()) {
+ return false;
+ }
+ if (cancelled.compareAndSet(false, true)) {
+ onCancel.run();
+ return true;
+ }
+ return false;
+ }
+ }
+
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncSharedClientQueueLocalExample.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncSharedClientQueueLocalExample.java
new file mode 100644
index 0000000000..916fde5eec
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncSharedClientQueueLocalExample.java
@@ -0,0 +1,255 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.examples;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
+import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
+import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
+import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
+import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder;
+import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.reactor.ListenerEndpoint;
+
+/**
+ * Demonstrates client-level request throttling for the async client using a shared FIFO queue.
+ *
+ * The example configures {@code setMaxQueuedRequests(int)} to cap the number of concurrently executing requests per
+ * client instance. Requests submitted beyond that limit are queued in FIFO order and executed when the number of
+ * in-flight requests drops below the configured maximum.
+ *
+ * @since 5.7
+ */
+public final class AsyncSharedClientQueueLocalExample {
+
+ public static void main(final String[] args) throws Exception {
+ final int maxConcurrentPerClient = 2; // shared per-client cap
+ final int totalRequests = 50;
+
+ final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
+ .setIoThreadCount(1)
+ .build();
+
+ final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ final AtomicInteger serverInFlight = new AtomicInteger(0);
+ final AtomicInteger serverMaxInFlight = new AtomicInteger(0);
+
+ final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
+ .setIOReactorConfig(ioReactorConfig)
+ .setCanonicalHostName("127.0.0.1")
+ .register("*", new AsyncServerRequestHandler>() {
+
+ @Override
+ public AsyncRequestConsumer> prepare(
+ final HttpRequest request,
+ final EntityDetails entityDetails,
+ final HttpContext context) {
+ return new BasicRequestConsumer<>(
+ entityDetails != null ? new DiscardingEntityConsumer<>() : null);
+ }
+
+ @Override
+ public void handle(
+ final Message message,
+ final ResponseTrigger responseTrigger,
+ final HttpContext localContext) {
+
+ final HttpCoreContext context = HttpCoreContext.cast(localContext);
+
+ final int cur = serverInFlight.incrementAndGet();
+ serverMaxInFlight.updateAndGet(prev -> Math.max(prev, cur));
+
+ scheduler.schedule(() -> {
+ try {
+ responseTrigger.submitResponse(
+ AsyncResponseBuilder.create(200)
+ .setEntity("ok\n", ContentType.TEXT_PLAIN)
+ .build(),
+ context);
+ } catch (final Exception ex) {
+ try {
+ responseTrigger.submitResponse(
+ AsyncResponseBuilder.create(500)
+ .setEntity(ex.toString(), ContentType.TEXT_PLAIN)
+ .build(),
+ context);
+ } catch (final Exception ignore) {
+ // ignore
+ }
+ } finally {
+ serverInFlight.decrementAndGet();
+ }
+ }, 200, TimeUnit.MILLISECONDS);
+ }
+
+ })
+ .create();
+
+ server.start();
+ final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get();
+ final int port = ((InetSocketAddress) ep.getAddress()).getPort();
+ System.out.println("server on 127.0.0.1:" + port);
+
+ final PoolingAsyncClientConnectionManager cm = PoolingAsyncClientConnectionManagerBuilder.create()
+ .setMaxConnTotal(100)
+ .setMaxConnPerRoute(100)
+ .build();
+
+ final CloseableHttpAsyncClient client = HttpAsyncClients.custom()
+ .setIOReactorConfig(ioReactorConfig)
+ .setConnectionManager(cm)
+ // This is the knob you’re implementing as "max concurrent per client + shared FIFO queue".
+ .setMaxQueuedRequests(maxConcurrentPerClient)
+ .build();
+
+ client.start();
+
+ // warmup
+ final SimpleHttpRequest warmup = SimpleRequestBuilder.get("http://127.0.0.1:" + port + "/warmup").build();
+ final CountDownLatch warmupLatch = new CountDownLatch(1);
+ client.execute(
+ SimpleRequestProducer.create(warmup),
+ SimpleResponseConsumer.create(),
+ new FutureCallback() {
+ @Override
+ public void completed(final SimpleHttpResponse result) {
+ System.out.println("warmup -> " + new StatusLine(result));
+ warmupLatch.countDown();
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ System.out.println("warmup failed -> " + ex);
+ warmupLatch.countDown();
+ }
+
+ @Override
+ public void cancelled() {
+ System.out.println("warmup cancelled");
+ warmupLatch.countDown();
+ }
+ });
+ warmupLatch.await();
+
+ final AtomicInteger ok = new AtomicInteger(0);
+ final AtomicInteger failed = new AtomicInteger(0);
+ final CountDownLatch latch = new CountDownLatch(totalRequests);
+
+ final ExecutorService exec = Executors.newFixedThreadPool(Math.min(16, totalRequests));
+ final long t0 = System.nanoTime();
+
+ for (int i = 0; i < totalRequests; i++) {
+ final int id = i;
+ exec.execute(() -> {
+ final SimpleHttpRequest req = SimpleRequestBuilder
+ .get("http://127.0.0.1:" + port + "/slow?i=" + id)
+ .build();
+
+ client.execute(
+ SimpleRequestProducer.create(req),
+ SimpleResponseConsumer.create(),
+ new FutureCallback() {
+
+ @Override
+ public void completed(final SimpleHttpResponse result) {
+ if (result.getCode() == 200) {
+ ok.incrementAndGet();
+ } else {
+ failed.incrementAndGet();
+ System.out.println("FAILED i=" + id + " -> " + new StatusLine(result));
+ }
+ latch.countDown();
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ failed.incrementAndGet();
+ System.out.println("FAILED i=" + id + " -> " + ex);
+ latch.countDown();
+ }
+
+ @Override
+ public void cancelled() {
+ failed.incrementAndGet();
+ System.out.println("CANCELLED i=" + id);
+ latch.countDown();
+ }
+
+ });
+ });
+ }
+
+ final boolean done = latch.await(120, TimeUnit.SECONDS);
+ final long t1 = System.nanoTime();
+
+ exec.shutdownNow();
+
+ System.out.println("done=" + done
+ + " ok=" + ok.get()
+ + " failed=" + failed.get()
+ + " clientCap=" + maxConcurrentPerClient
+ + " serverMaxInFlightSeen=" + serverMaxInFlight.get()
+ + " elapsedMs=" + TimeUnit.NANOSECONDS.toMillis(t1 - t0));
+
+ client.close(CloseMode.GRACEFUL);
+ server.close(CloseMode.GRACEFUL);
+ scheduler.shutdownNow();
+ }
+
+ private AsyncSharedClientQueueLocalExample() {
+ }
+
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/H2SharedClientQueueLocalExample.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/H2SharedClientQueueLocalExample.java
new file mode 100644
index 0000000000..8ba14d0c4d
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/H2SharedClientQueueLocalExample.java
@@ -0,0 +1,260 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.examples;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
+import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
+import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
+import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
+import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder;
+import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.reactor.ListenerEndpoint;
+
+
+/**
+ * Demonstrates client-level request throttling for the HTTP/2 async client using a shared FIFO queue.
+ *
+ * The example configures {@link org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder#setMaxQueuedRequests(int)}
+ * to cap the number of concurrently executing requests per client instance. Requests submitted beyond that limit are
+ * queued in FIFO order and executed as soon as the number of in-flight requests drops below the configured maximum.
+ *
+ * A local HTTP/2 server is started with a permissive {@code maxConcurrentStreams} setting so that the client-side cap
+ * is the limiting factor. The example then submits multiple requests and prints summary statistics, including the
+ * maximum number of concurrent server requests observed.
+ *
+ * @since 5.7
+ */
+public final class H2SharedClientQueueLocalExample {
+
+
+ public static void main(final String[] args) throws Exception {
+ final int maxConcurrentPerClient = 2; // <-- the feature under test
+ final int totalRequests = 50;
+
+ final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
+ .setIoThreadCount(1)
+ .build();
+
+ final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ final AtomicInteger serverInFlight = new AtomicInteger(0);
+ final AtomicInteger serverMaxInFlight = new AtomicInteger(0);
+
+ final H2Config serverH2Config = H2Config.custom()
+ .setPushEnabled(false)
+ // Keep server permissive. We want the *client* cap to be the limiting factor.
+ .setMaxConcurrentStreams(256)
+ .build();
+
+ final HttpAsyncServer server = H2ServerBootstrap.bootstrap()
+ .setIOReactorConfig(ioReactorConfig)
+ .setH2Config(serverH2Config)
+ .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
+ .setCanonicalHostName("127.0.0.1")
+ .register("*", new AsyncServerRequestHandler>() {
+
+ @Override
+ public AsyncRequestConsumer> prepare(
+ final HttpRequest request,
+ final EntityDetails entityDetails,
+ final HttpContext context) {
+ return new BasicRequestConsumer<>(entityDetails != null ? new DiscardingEntityConsumer<>() : null);
+ }
+
+ @Override
+ public void handle(
+ final Message message,
+ final ResponseTrigger responseTrigger,
+ final HttpContext localContext) {
+
+ final HttpCoreContext context = HttpCoreContext.cast(localContext);
+
+ final int cur = serverInFlight.incrementAndGet();
+ serverMaxInFlight.updateAndGet(prev -> Math.max(prev, cur));
+
+ scheduler.schedule(() -> {
+ try {
+ responseTrigger.submitResponse(
+ AsyncResponseBuilder.create(200)
+ .setEntity("ok\n", ContentType.TEXT_PLAIN)
+ .build(),
+ context);
+ } catch (final Exception ex) {
+ try {
+ responseTrigger.submitResponse(
+ AsyncResponseBuilder.create(500)
+ .setEntity(ex.toString(), ContentType.TEXT_PLAIN)
+ .build(),
+ context);
+ } catch (final Exception ignore) {
+ // ignore
+ }
+ } finally {
+ serverInFlight.decrementAndGet();
+ }
+ }, 200, TimeUnit.MILLISECONDS);
+ }
+
+ })
+ .create();
+
+ server.start();
+ final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get();
+ final int port = ((InetSocketAddress) ep.getAddress()).getPort();
+ System.out.println("server on 127.0.0.1:" + port);
+
+ final CloseableHttpAsyncClient client = H2AsyncClientBuilder.create()
+ .setIOReactorConfig(ioReactorConfig)
+ .setH2Config(H2Config.DEFAULT)
+ // IMPORTANT: this is the knob you’re implementing as “max concurrent per client + shared FIFO queue”.
+ .setMaxQueuedRequests(maxConcurrentPerClient)
+ .build();
+
+ client.start();
+
+ // warmup
+ final SimpleHttpRequest warmup = SimpleRequestBuilder.get("http://127.0.0.1:" + port + "/warmup").build();
+ final CountDownLatch warmupLatch = new CountDownLatch(1);
+ client.execute(
+ SimpleRequestProducer.create(warmup),
+ SimpleResponseConsumer.create(),
+ new FutureCallback() {
+ @Override
+ public void completed(final SimpleHttpResponse result) {
+ System.out.println("warmup -> " + new StatusLine(result));
+ warmupLatch.countDown();
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ System.out.println("warmup failed -> " + ex);
+ warmupLatch.countDown();
+ }
+
+ @Override
+ public void cancelled() {
+ System.out.println("warmup cancelled");
+ warmupLatch.countDown();
+ }
+ });
+ warmupLatch.await();
+
+ final AtomicInteger ok = new AtomicInteger(0);
+ final AtomicInteger failed = new AtomicInteger(0);
+ final CountDownLatch latch = new CountDownLatch(totalRequests);
+
+ final ExecutorService exec = Executors.newFixedThreadPool(Math.min(16, totalRequests));
+ final long t0 = System.nanoTime();
+
+ for (int i = 0; i < totalRequests; i++) {
+ final int id = i;
+ exec.execute(() -> {
+ final SimpleHttpRequest req = SimpleRequestBuilder
+ .get("http://127.0.0.1:" + port + "/slow?i=" + id)
+ .build();
+
+ client.execute(
+ SimpleRequestProducer.create(req),
+ SimpleResponseConsumer.create(),
+ new FutureCallback() {
+ @Override
+ public void completed(final SimpleHttpResponse result) {
+ if (result.getCode() == 200) {
+ ok.incrementAndGet();
+ } else {
+ failed.incrementAndGet();
+ System.out.println("FAILED i=" + id + " -> " + new StatusLine(result));
+ }
+ latch.countDown();
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ failed.incrementAndGet();
+ System.out.println("FAILED i=" + id + " -> " + ex);
+ latch.countDown();
+ }
+
+ @Override
+ public void cancelled() {
+ failed.incrementAndGet();
+ System.out.println("CANCELLED i=" + id);
+ latch.countDown();
+ }
+ });
+ });
+ }
+
+ final boolean done = latch.await(120, TimeUnit.SECONDS);
+ final long t1 = System.nanoTime();
+
+ exec.shutdownNow();
+
+ System.out.println("done=" + done
+ + " ok=" + ok.get()
+ + " failed=" + failed.get()
+ + " maxInFlightSeen=" + serverMaxInFlight.get()
+ + " elapsedMs=" + TimeUnit.NANOSECONDS.toMillis(t1 - t0));
+
+ client.close(CloseMode.GRACEFUL);
+ server.close(CloseMode.GRACEFUL);
+ scheduler.shutdownNow();
+ }
+
+ private H2SharedClientQueueLocalExample() {
+ }
+
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java
index f37bd43c71..15848db9e4 100644
--- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java
@@ -29,13 +29,14 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.EndpointInfo;
@@ -120,8 +121,14 @@ public void cancelled() {
}
});
- assertTrue(latch.await(2, TimeUnit.SECONDS), "rejection should arrive quickly");
- assertInstanceOf(RejectedExecutionException.class, failure.get(), "Expected RejectedExecutionException, got: " + failure.get());
+ // With queueing semantics, r2 is queued and must not complete until r1 is finished.
+ assertTrue(!latch.await(200, TimeUnit.MILLISECONDS), "second request should be queued");
+
+ // Finish r1, release the slot -> r2 should now execute and fail (see BlockingEndpoint.execute()).
+ endpoint.failFirst(new IOException("release-slot"));
+
+ assertTrue(latch.await(2, TimeUnit.SECONDS), "second request should complete after slot released");
+ assertInstanceOf(IOException.class, failure.get(), "Expected IOException, got: " + failure.get());
}
}
@@ -179,11 +186,27 @@ public void close() {
private static final class BlockingEndpoint extends AsyncConnectionEndpoint {
volatile boolean connected = true;
+ private final AtomicInteger execCount = new AtomicInteger(0);
+ private final AtomicReference first = new AtomicReference<>();
+
@Override
public void execute(final String id,
final AsyncClientExchangeHandler handler,
final HandlerFactory pushHandlerFactory,
final HttpContext context) {
+ final int n = execCount.incrementAndGet();
+ if (n == 1) {
+ first.set(handler); // keep slot occupied
+ } else {
+ handler.failed(new IOException("queued request executed"));
+ }
+ }
+
+ void failFirst(final Exception ex) {
+ final AsyncClientExchangeHandler h = first.getAndSet(null);
+ if (h != null) {
+ h.failed(ex);
+ }
}
@Override
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java
index 0309d8710d..7a63e863f0 100644
--- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java
@@ -37,7 +37,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -68,16 +67,17 @@
public class InternalH2AsyncExecRuntimeQueueCapTest {
- private static InternalH2AsyncExecRuntime newRuntime(final int maxQueued) {
+ private static InternalH2AsyncExecRuntime newRuntime(final int maxConcurrent) {
final IOSession ioSession = newImmediateFailSession();
final FakeH2ConnPool connPool = new FakeH2ConnPool(ioSession);
- final AtomicInteger queued = maxQueued > 0 ? new AtomicInteger(0) : null;
+
+ final SharedRequestExecutionQueue queue = maxConcurrent > 0 ? new SharedRequestExecutionQueue(maxConcurrent) : null;
+
return new InternalH2AsyncExecRuntime(
LoggerFactory.getLogger("test"),
connPool,
new NoopPushFactory(),
- maxQueued,
- queued);
+ queue);
}
private static void acquireEndpoint(
@@ -109,19 +109,19 @@ public void cancelled() {
}
/**
- * With no cap (maxQueued <= 0) the recursive re-entry path should blow the stack.
- * This documents the pathological behaviour without queue protection.
+ * With no cap / no queue, the re-entrant execute path can recurse until SOE
+ * if failures are delivered synchronously.
*/
@Test
void testRecursiveReentryCausesSOEWithoutCap() throws Exception {
- final InternalH2AsyncExecRuntime runtime = newRuntime(-1);
+ final InternalH2AsyncExecRuntime runtime = newRuntime(0);
final HttpClientContext ctx = HttpClientContext.create();
ctx.setRequestConfig(RequestConfig.custom().build());
acquireEndpoint(runtime, ctx);
- final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
+ final ReentrantHandler loop = new ReentrantHandler(runtime, ctx, Integer.MAX_VALUE);
assertThrows(StackOverflowError.class, () -> {
runtime.execute("loop", loop, ctx);
@@ -129,8 +129,8 @@ void testRecursiveReentryCausesSOEWithoutCap() throws Exception {
}
/**
- * With a cap of 1, the second re-entrant execute call must be rejected and
- * the recursion broken.
+ * With cap=1 and QUEUE semantics, re-entrant submissions should be queued,
+ * not executed inline recursively. This test must NOT expect rejection.
*/
@Test
void testCapBreaksRecursiveReentry() throws Exception {
@@ -141,14 +141,13 @@ void testCapBreaksRecursiveReentry() throws Exception {
acquireEndpoint(runtime, ctx);
- final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
+ final int maxAttempts = 200;
+ final ReentrantHandler loop = new ReentrantHandler(runtime, ctx, maxAttempts);
runtime.execute("loop", loop, ctx);
- // immediate fail path runs synchronously; small wait is just defensive
- Thread.sleep(50);
- assertTrue(loop.lastException.get() instanceof RejectedExecutionException,
- "Expected queue rejection to break recursion");
+ assertTrue(loop.done.await(2, TimeUnit.SECONDS), "expected bounded re-entry loop to terminate");
+ assertTrue(loop.attempts.get() >= maxAttempts, "expected at least " + maxAttempts + " attempts, got " + loop.attempts.get());
}
/**
@@ -168,8 +167,7 @@ public Future getSession(
final HttpRoute route,
final Timeout timeout,
final FutureCallback callback) {
- final CompletableFuture cf = new CompletableFuture<>();
- cf.complete(session);
+ final CompletableFuture cf = CompletableFuture.completedFuture(session);
if (callback != null) {
callback.completed(session);
}
@@ -228,19 +226,33 @@ private static final class ReentrantHandler implements AsyncClientExchangeHandle
private final InternalH2AsyncExecRuntime runtime;
private final HttpClientContext context;
- final AtomicReference lastException = new AtomicReference<>();
+ private final int maxAttempts;
+
+ final AtomicInteger attempts;
+ final AtomicReference lastException;
+ final CountDownLatch done;
- ReentrantHandler(final InternalH2AsyncExecRuntime runtime, final HttpClientContext context) {
+ ReentrantHandler(final InternalH2AsyncExecRuntime runtime, final HttpClientContext context, final int maxAttempts) {
this.runtime = runtime;
this.context = context;
+ this.maxAttempts = maxAttempts;
+ this.attempts = new AtomicInteger(0);
+ this.lastException = new AtomicReference<>();
+ this.done = new CountDownLatch(1);
}
@Override
public void failed(final Exception cause) {
lastException.set(cause);
- if (!(cause instanceof RejectedExecutionException)) {
- runtime.execute("loop/reenter", this, context);
+
+ final int n = attempts.incrementAndGet();
+ if (n >= maxAttempts) {
+ done.countDown();
+ return;
}
+
+ // Re-enter. With QUEUE cap=1 this must not recurse inline until SOE.
+ runtime.execute("loop/reenter/" + n, this, context);
}
@Override
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java
index 6aae8413af..0a00c17fbf 100644
--- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java
@@ -27,6 +27,7 @@
package org.apache.hc.client5.http.impl.async;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -41,7 +42,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -79,112 +79,163 @@
public class InternalHttpAsyncExecRuntimeQueueCapTest {
@Test
- void testFailFastWhenQueueFull() throws Exception {
+ void testRequestsQueuedWhenOverCap() throws Exception {
final FakeEndpoint endpoint = new FakeEndpoint();
final FakeManager manager = new FakeManager(endpoint);
+
final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
LoggerFactory.getLogger("test"),
manager,
new NoopInitiator(),
new NoopPushFactory(),
TlsConfig.DEFAULT,
- 2,
- new AtomicInteger()
- );
+ new SharedRequestExecutionQueue(2));
final HttpClientContext ctx = HttpClientContext.create();
ctx.setRequestConfig(RequestConfig.custom().build());
- runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new FutureCallback() {
- @Override
- public void completed(final AsyncExecRuntime result) {
- }
-
- @Override
- public void failed(final Exception ex) {
- fail(ex);
- }
-
- @Override
- public void cancelled() {
- fail("cancelled");
- }
- });
-
- final CountDownLatch rejected = new CountDownLatch(1);
+ runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new NoopRuntimeCallback());
final LatchingHandler h1 = new LatchingHandler();
final LatchingHandler h2 = new LatchingHandler();
+ final LatchingHandler h3 = new LatchingHandler();
+
runtime.execute("r1", h1, ctx);
runtime.execute("r2", h2, ctx);
-
- final LatchingHandler h3 = new LatchingHandler() {
- @Override
- public void failed(final Exception cause) {
- super.failed(cause);
- rejected.countDown();
- }
- };
runtime.execute("r3", h3, ctx);
- assertTrue(rejected.await(2, TimeUnit.SECONDS), "r3 should be failed fast");
- assertTrue(h3.failedException.get() instanceof RejectedExecutionException);
+ assertTrue(waitFor(() -> endpoint.executedCount() == 2, 2, TimeUnit.SECONDS), "r1 and r2 should start");
+ assertTrue(endpoint.executedIds.contains("r1"));
+ assertTrue(endpoint.executedIds.contains("r2"));
+ assertTrue(!endpoint.executedIds.contains("r3"), "r3 should be queued, not started yet");
+
+ endpoint.completeOne(); // releases a slot and should trigger queued r3
+
+ assertTrue(waitFor(() -> endpoint.executedIds.contains("r3"), 2, TimeUnit.SECONDS), "r3 should start after slot release");
+
assertNull(h1.failedException.get());
assertNull(h2.failedException.get());
+ assertNull(h3.failedException.get());
}
@Test
void testSlotReleasedOnTerminalSignalAllowsNext() throws Exception {
final FakeEndpoint endpoint = new FakeEndpoint();
final FakeManager manager = new FakeManager(endpoint);
+
final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
LoggerFactory.getLogger("test"),
manager,
new NoopInitiator(),
new NoopPushFactory(),
TlsConfig.DEFAULT,
- 1,
- new AtomicInteger()
- );
+ new SharedRequestExecutionQueue(1));
final HttpClientContext ctx = HttpClientContext.create();
ctx.setRequestConfig(RequestConfig.custom().build());
- runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx,
- new FutureCallback() {
- @Override
- public void completed(final AsyncExecRuntime result) {
- }
-
- @Override
- public void failed(final Exception ex) {
- fail(ex);
- }
-
- @Override
- public void cancelled() {
- fail("cancelled");
- }
- });
+ runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new NoopRuntimeCallback());
final LatchingHandler h1 = new LatchingHandler();
- runtime.execute("r1", h1, ctx);
-
final LatchingHandler h2 = new LatchingHandler();
+
+ runtime.execute("r1", h1, ctx);
runtime.execute("r2", h2, ctx);
- assertTrue(h2.awaitFailed(2, TimeUnit.SECONDS));
- assertTrue(h2.failedException.get() instanceof RejectedExecutionException);
- // free the slot via releaseResources(), not failed()
+ assertTrue(waitFor(() -> endpoint.executedIds.contains("r1"), 2, TimeUnit.SECONDS));
+ assertTrue(!endpoint.executedIds.contains("r2"), "r2 should be queued until r1 completes");
+
endpoint.completeOne();
- final LatchingHandler h3 = new LatchingHandler();
- runtime.execute("r3", h3, ctx);
- Thread.sleep(150);
- assertNull(h3.failedException.get(), "r3 should not be rejected after slot released");
- h3.cancel();
+ assertTrue(waitFor(() -> endpoint.executedIds.contains("r2"), 2, TimeUnit.SECONDS), "r2 should start after r1 completes");
+
+ assertNull(h1.failedException.get());
+ assertNull(h2.failedException.get());
}
+ @Test
+ void testRecursiveReentryCausesSOEWithoutCap() {
+ final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint();
+ final FakeManager manager = new FakeManager(endpoint);
+
+ // no queue => old synchronous recursion behaviour remains
+ final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
+ LoggerFactory.getLogger("test"),
+ manager,
+ new NoopInitiator(),
+ new NoopPushFactory(),
+ TlsConfig.DEFAULT,
+ null);
+
+ final HttpClientContext ctx = HttpClientContext.create();
+ ctx.setRequestConfig(RequestConfig.custom().build());
+
+ runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new NoopRuntimeCallback());
+
+ final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
+
+ assertThrows(StackOverflowError.class, () -> runtime.execute("loop", loop, ctx));
+ }
+
+ @Test
+ void testCapBreaksRecursiveReentry() throws Exception {
+ final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint();
+ final FakeManager manager = new FakeManager(endpoint);
+
+ // queue => no synchronous recursion -> no SOE
+ final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
+ LoggerFactory.getLogger("test"),
+ manager,
+ new NoopInitiator(),
+ new NoopPushFactory(),
+ TlsConfig.DEFAULT,
+ new SharedRequestExecutionQueue(1));
+
+ final HttpClientContext ctx = HttpClientContext.create();
+ ctx.setRequestConfig(RequestConfig.custom().build());
+
+ runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new NoopRuntimeCallback());
+
+ final CountDownLatch done = new CountDownLatch(1);
+ final BoundedReentrantHandler loop = new BoundedReentrantHandler(runtime, ctx, 50, done);
+
+ assertDoesNotThrow(() -> runtime.execute("loop", loop, ctx));
+ assertTrue(done.await(2, TimeUnit.SECONDS), "Expected bounded re-entry loop to complete without SOE");
+ assertTrue(loop.invocations.get() >= 1);
+ assertTrue(loop.lastException.get() instanceof IOException);
+ }
+
+ private static boolean waitFor(final Condition condition, final long time, final TimeUnit unit) throws InterruptedException {
+ final long deadline = System.nanoTime() + unit.toNanos(time);
+ while (System.nanoTime() < deadline) {
+ if (condition.get()) {
+ return true;
+ }
+ Thread.sleep(10);
+ }
+ return condition.get();
+ }
+
+ @FunctionalInterface
+ private interface Condition {
+ boolean get();
+ }
+
+ private static final class NoopRuntimeCallback implements FutureCallback {
+ @Override
+ public void completed(final AsyncExecRuntime result) {
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ fail(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ fail("cancelled");
+ }
+ }
private static final class NoopInitiator implements ConnectionInitiator {
@Override
@@ -267,36 +318,37 @@ public void close() {
private static final class FakeEndpoint extends AsyncConnectionEndpoint {
volatile boolean connected = true;
- private final ConcurrentLinkedQueue inFlight = new ConcurrentLinkedQueue<>();
+
+ private static final class InFlightEntry {
+ final String id;
+ final AsyncClientExchangeHandler handler;
+
+ InFlightEntry(final String id, final AsyncClientExchangeHandler handler) {
+ this.id = id;
+ this.handler = handler;
+ }
+ }
+
+ final ConcurrentLinkedQueue executedIds = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue inFlight = new ConcurrentLinkedQueue<>();
@Override
public void execute(final String id,
final AsyncClientExchangeHandler handler,
final HandlerFactory pushHandlerFactory,
final HttpContext context) {
- // keep the guarded handler so tests can signal terminal events
- inFlight.add(handler);
- }
-
- // helpers for tests
- void failOne(final Exception ex) {
- final AsyncClientExchangeHandler h = inFlight.poll();
- if (h != null) {
- h.failed(ex);
- }
+ executedIds.add(id);
+ inFlight.add(new InFlightEntry(id, handler));
}
- void cancelOne() {
- final AsyncClientExchangeHandler h = inFlight.poll();
- if (h != null) {
- h.cancel();
- }
+ int executedCount() {
+ return executedIds.size();
}
void completeOne() {
- final AsyncClientExchangeHandler h = inFlight.poll();
- if (h != null) {
- h.releaseResources();
+ final InFlightEntry e = inFlight.poll();
+ if (e != null) {
+ e.handler.releaseResources();
}
}
@@ -320,8 +372,6 @@ public EndpointInfo getInfo() {
}
}
-
-
private static class LatchingHandler implements AsyncClientExchangeHandler {
final AtomicReference failedException = new AtomicReference<>();
final CountDownLatch failLatch = new CountDownLatch(1);
@@ -378,94 +428,6 @@ public void failed(final Exception cause) {
}
}
- @Test
- void testRecursiveReentryCausesSOEWithoutCap() {
- final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint();
- final FakeManager manager = new FakeManager(endpoint);
-
- final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
- LoggerFactory.getLogger("test"),
- manager,
- new NoopInitiator(),
- new NoopPushFactory(),
- TlsConfig.DEFAULT,
- -1,
- null // no cap, no counter
- );
-
- final HttpClientContext ctx = HttpClientContext.create();
- ctx.setRequestConfig(RequestConfig.custom().build());
-
- runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx,
- new FutureCallback() {
- @Override
- public void completed(final AsyncExecRuntime result) {
- }
-
- @Override
- public void failed(final Exception ex) {
- fail(ex);
- }
-
- @Override
- public void cancelled() {
- fail("cancelled");
- }
- });
-
- final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
-
- assertThrows(StackOverflowError.class, () -> {
- runtime.execute("loop", loop, ctx); // execute -> endpoint.execute -> failed() -> execute -> ...
- });
- }
-
- @Test
- void testCapBreaksRecursiveReentry() throws Exception {
- final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint();
- final FakeManager manager = new FakeManager(endpoint);
-
- final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
- LoggerFactory.getLogger("test"),
- manager,
- new NoopInitiator(),
- new NoopPushFactory(),
- TlsConfig.DEFAULT,
- 1,
- new AtomicInteger()
- );
-
- final HttpClientContext ctx = HttpClientContext.create();
- ctx.setRequestConfig(RequestConfig.custom().build());
-
- runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx,
- new FutureCallback() {
- @Override
- public void completed(final AsyncExecRuntime result) {
- }
-
- @Override
- public void failed(final Exception ex) {
- fail(ex);
- }
-
- @Override
- public void cancelled() {
- fail("cancelled");
- }
- });
-
- final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
-
- // Should NOT blow the stack; the re-entrant call should be rejected.
- runtime.execute("loop", loop, ctx);
- // allow the immediate fail+re-submit path to run
- Thread.sleep(50);
-
- assertTrue(loop.lastException.get() instanceof RejectedExecutionException,
- "Expected rejection to break the recursion");
- }
-
/**
* Endpoint that synchronously fails any handler passed to execute().
*/
@@ -513,9 +475,78 @@ private static final class ReentrantHandler implements AsyncClientExchangeHandle
@Override
public void failed(final Exception cause) {
lastException.set(cause);
- // Re-enter only if this was NOT the cap rejecting us
- if (!(cause instanceof RejectedExecutionException)) {
+ runtime.execute("loop/reenter", this, ctx);
+ }
+
+ @Override
+ public void produceRequest(final RequestChannel channel, final HttpContext context) {
+ }
+
+ @Override
+ public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) {
+ }
+
+ @Override
+ public void consumeInformation(final HttpResponse response, final HttpContext context) {
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ @Override
+ public int available() {
+ return 0;
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) {
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) {
+ }
+
+ @Override
+ public void consume(final ByteBuffer src) {
+ }
+
+ @Override
+ public void streamEnd(final List extends Header> trailers) {
+ }
+
+ @Override
+ public void releaseResources() {
+ }
+ }
+
+ private static final class BoundedReentrantHandler implements AsyncClientExchangeHandler {
+ private final InternalHttpAsyncExecRuntime runtime;
+ private final HttpClientContext ctx;
+ private final AtomicInteger remaining;
+ private final CountDownLatch done;
+
+ final AtomicInteger invocations = new AtomicInteger(0);
+ final AtomicReference lastException = new AtomicReference<>();
+
+ BoundedReentrantHandler(final InternalHttpAsyncExecRuntime runtime,
+ final HttpClientContext ctx,
+ final int maxReentries,
+ final CountDownLatch done) {
+ this.runtime = runtime;
+ this.ctx = ctx;
+ this.remaining = new AtomicInteger(maxReentries);
+ this.done = done;
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ invocations.incrementAndGet();
+ lastException.set(cause);
+ if (remaining.getAndDecrement() > 0) {
runtime.execute("loop/reenter", this, ctx);
+ } else {
+ done.countDown();
}
}