From 97067f4826ec09466f28f94efd33343e5f0a7cb3 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Mon, 5 Jan 2026 19:44:16 +0100 Subject: [PATCH] Implement shared FIFO execution queue for H2 client Introduce one shared per-client queue to cap concurrently executing requests and enqueue overflow. Ensure queued starts release the slot on any terminal path, including synchronous start failures. --- .../impl/async/InternalH2AsyncClient.java | 25 +- .../async/InternalH2AsyncExecRuntime.java | 89 ++--- .../impl/async/InternalHttpAsyncClient.java | 14 +- .../async/InternalHttpAsyncExecRuntime.java | 111 +++--- .../ReleasingAsyncClientExchangeHandler.java | 23 +- .../async/SharedRequestExecutionQueue.java | 142 +++++++ .../AsyncSharedClientQueueLocalExample.java | 255 ++++++++++++ .../H2SharedClientQueueLocalExample.java | 260 ++++++++++++ ...yncClientBuilderMaxQueuedRequestsTest.java | 29 +- ...nternalH2AsyncExecRuntimeQueueCapTest.java | 56 ++- ...ernalHttpAsyncExecRuntimeQueueCapTest.java | 375 ++++++++++-------- 11 files changed, 1056 insertions(+), 323 deletions(-) create mode 100644 httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/SharedRequestExecutionQueue.java create mode 100644 httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncSharedClientQueueLocalExample.java create mode 100644 httpclient5/src/test/java/org/apache/hc/client5/http/examples/H2SharedClientQueueLocalExample.java diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java index 3c27051000..a6e6c3804e 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java @@ -29,7 +29,6 @@ import java.io.Closeable; import java.util.List; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.hc.client5.http.HttpRoute; import org.apache.hc.client5.http.async.AsyncExecRuntime; @@ -55,11 +54,6 @@ /** * Internal implementation of HTTP/2 only {@link CloseableHttpAsyncClient}. - *

- * Concurrent message exchanges with the same connection route executed by - * this client will get automatically multiplexed over a single physical HTTP/2 - * connection. - *

* * @since 5.0 */ @@ -70,8 +64,12 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient private static final Logger LOG = LoggerFactory.getLogger(InternalH2AsyncClient.class); private final HttpRoutePlanner routePlanner; private final InternalH2ConnPool connPool; - private final int maxQueuedRequests; - private final AtomicInteger queuedRequests; + + /** + * One shared FIFO queue per client instance (Oleg). + * null means "unlimited" / no throttling. + */ + private final SharedRequestExecutionQueue executionQueue; InternalH2AsyncClient( final DefaultConnectingIOReactor ioReactor, @@ -87,25 +85,22 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient final RequestConfig defaultConfig, final List closeables, final int maxQueuedRequests) { + super(ioReactor, pushConsumerRegistry, threadFactory, execChain, cookieSpecRegistry, authSchemeRegistry, cookieStore, credentialsProvider, HttpClientContext::castOrCreate, defaultConfig, closeables); this.connPool = connPool; this.routePlanner = routePlanner; - this.maxQueuedRequests = maxQueuedRequests; - this.queuedRequests = maxQueuedRequests > 0 ? new AtomicInteger(0) : null; + this.executionQueue = maxQueuedRequests > 0 ? new SharedRequestExecutionQueue(maxQueuedRequests) : null; } @Override AsyncExecRuntime createAsyncExecRuntime(final HandlerFactory pushHandlerFactory) { - return new InternalH2AsyncExecRuntime(LOG, connPool, pushHandlerFactory, maxQueuedRequests, queuedRequests); + return new InternalH2AsyncExecRuntime(LOG, connPool, pushHandlerFactory, executionQueue); } @Override - HttpRoute determineRoute( - final HttpHost httpHost, - final HttpRequest request, - final HttpClientContext clientContext) throws HttpException { + HttpRoute determineRoute(final HttpHost httpHost, final HttpRequest request, final HttpClientContext clientContext) throws HttpException { final HttpRoute route = routePlanner.determineRoute(httpHost, request, clientContext); if (route.isTunnelled()) { throw new HttpException("HTTP/2 tunneling not supported"); diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java index 89ce8833cf..9e81219141 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java @@ -28,8 +28,6 @@ package org.apache.hc.client5.http.impl.async; import java.io.InterruptedIOException; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.hc.client5.http.EndpointInfo; @@ -63,30 +61,27 @@ class InternalH2AsyncExecRuntime implements AsyncExecRuntime { private final InternalH2ConnPool connPool; private final HandlerFactory pushHandlerFactory; private final AtomicReference sessionRef; - private final int maxQueued; - private final AtomicInteger sharedQueued; + private final SharedRequestExecutionQueue executionQueue; private volatile boolean reusable; InternalH2AsyncExecRuntime( final Logger log, final InternalH2ConnPool connPool, final HandlerFactory pushHandlerFactory) { - this(log, connPool, pushHandlerFactory, -1, null); + this(log, connPool, pushHandlerFactory, null); } InternalH2AsyncExecRuntime( final Logger log, final InternalH2ConnPool connPool, final HandlerFactory pushHandlerFactory, - final int maxQueued, - final AtomicInteger sharedQueued) { + final SharedRequestExecutionQueue executionQueue) { super(); this.log = log; this.connPool = connPool; this.pushHandlerFactory = pushHandlerFactory; this.sessionRef = new AtomicReference<>(); - this.maxQueued = maxQueued; - this.sharedQueued = sharedQueued; + this.executionQueue = executionQueue; } @Override @@ -179,7 +174,6 @@ public boolean isEndpointConnected() { return endpoint != null && endpoint.session.isOpen(); } - Endpoint ensureValid() { final Endpoint endpoint = sessionRef.get(); if (endpoint == null) { @@ -261,41 +255,47 @@ public EndpointInfo getEndpointInfo() { return null; } - private boolean tryAcquireSlot() { - if (sharedQueued == null || maxQueued <= 0) { - return true; + @Override + public Cancellable execute( + final String id, + final AsyncClientExchangeHandler exchangeHandler, + final HttpClientContext context) { + + final Endpoint endpoint = ensureValid(); + final ComplexCancellable complexCancellable = new ComplexCancellable(); + + if (executionQueue == null) { + startExecution(id, endpoint, exchangeHandler, context, complexCancellable); + return complexCancellable; } - for (;;) { - final int q = sharedQueued.get(); - if (q >= maxQueued) { - return false; - } - if (sharedQueued.compareAndSet(q, q + 1)) { + + final Cancellable queued = executionQueue.enqueue( + () -> { + final AsyncClientExchangeHandler wrapped = + new ReleasingAsyncClientExchangeHandler(exchangeHandler, executionQueue::completed); + try { + startExecution(id, endpoint, wrapped, context, complexCancellable); + } catch (final RuntimeException ex) { + wrapped.failed(ex); + } + }, + exchangeHandler::cancel); + + return () -> { + if (queued.cancel()) { return true; } - } - } - - private void releaseSlot() { - if (sharedQueued != null && maxQueued > 0) { - sharedQueued.decrementAndGet(); - } + return complexCancellable.cancel(); + }; } - @Override - public Cancellable execute( + private void startExecution( final String id, - final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) { - final Endpoint endpoint = ensureValid(); - if (!tryAcquireSlot()) { - exchangeHandler.failed(new RejectedExecutionException( - "Execution pipeline queue limit reached (max=" + maxQueued + ")")); - return Operations.nonCancellable(); - } - final AsyncClientExchangeHandler actual = sharedQueued != null - ? new ReleasingAsyncClientExchangeHandler(exchangeHandler, this::releaseSlot) - : exchangeHandler; - final ComplexCancellable complexCancellable = new ComplexCancellable(); + final Endpoint endpoint, + final AsyncClientExchangeHandler exchangeHandler, + final HttpClientContext context, + final ComplexCancellable complexCancellable) { + final IOSession session = endpoint.session; if (session.isOpen()) { if (log.isDebugEnabled()) { @@ -303,7 +303,7 @@ public Cancellable execute( } context.setProtocolVersion(HttpVersion.HTTP_2); session.enqueue( - new RequestExecutionCommand(actual, pushHandlerFactory, complexCancellable, context), + new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context), Command.Priority.NORMAL); } else { final HttpRoute route = endpoint.route; @@ -321,23 +321,22 @@ public void completed(final IOSession ioSession) { } context.setProtocolVersion(HttpVersion.HTTP_2); ioSession.enqueue( - new RequestExecutionCommand(actual, pushHandlerFactory, complexCancellable, context), + new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context), Command.Priority.NORMAL); } @Override public void failed(final Exception ex) { - actual.failed(ex); + exchangeHandler.failed(ex); } @Override public void cancelled() { - actual.failed(new InterruptedIOException()); + exchangeHandler.failed(new InterruptedIOException()); } }); } - return complexCancellable; } @Override @@ -369,7 +368,7 @@ public String getId() { @Override public AsyncExecRuntime fork() { - return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory, maxQueued, sharedQueued); + return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory, executionQueue); } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java index 1604039f30..111bcc01da 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java @@ -29,7 +29,6 @@ import java.io.Closeable; import java.util.List; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.apache.hc.client5.http.HttpRoute; @@ -76,8 +75,12 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie private final AsyncClientConnectionManager manager; private final HttpRoutePlanner routePlanner; private final TlsConfig tlsConfig; - private final int maxQueuedRequests; - private final AtomicInteger queuedCounter; + + /** + * One shared FIFO queue per client instance. + * null means "unlimited" / no throttling. + */ + private final SharedRequestExecutionQueue executionQueue; InternalHttpAsyncClient( final DefaultConnectingIOReactor ioReactor, @@ -101,13 +104,12 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie this.manager = manager; this.routePlanner = routePlanner; this.tlsConfig = tlsConfig; - this.maxQueuedRequests = maxQueuedRequests; - this.queuedCounter = maxQueuedRequests > 0 ? new AtomicInteger(0) : null; + this.executionQueue = maxQueuedRequests > 0 ? new SharedRequestExecutionQueue(maxQueuedRequests) : null; } @Override AsyncExecRuntime createAsyncExecRuntime(final HandlerFactory pushHandlerFactory) { - return new InternalHttpAsyncExecRuntime(LOG, manager, getConnectionInitiator(), pushHandlerFactory, tlsConfig, maxQueuedRequests, queuedCounter); + return new InternalHttpAsyncExecRuntime(LOG, manager, getConnectionInitiator(), pushHandlerFactory, tlsConfig, executionQueue); } @Override diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java index 9a7820cb50..e83a5e52a2 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java @@ -28,8 +28,6 @@ package org.apache.hc.client5.http.impl.async; import java.io.InterruptedIOException; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.hc.client5.http.EndpointInfo; @@ -79,8 +77,7 @@ static class ReUseData { private final TlsConfig tlsConfig; private final AtomicReference endpointRef; private final AtomicReference reuseDataRef; - private final int maxQueued; - private final AtomicInteger sharedQueued; + private final SharedRequestExecutionQueue executionQueue; InternalHttpAsyncExecRuntime( final Logger log, @@ -88,7 +85,7 @@ static class ReUseData { final ConnectionInitiator connectionInitiator, final HandlerFactory pushHandlerFactory, final TlsConfig tlsConfig) { - this(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig, -1, null); + this(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig, null); } InternalHttpAsyncExecRuntime( @@ -97,8 +94,7 @@ static class ReUseData { final ConnectionInitiator connectionInitiator, final HandlerFactory pushHandlerFactory, final TlsConfig tlsConfig, - final int maxQueued, - final AtomicInteger sharedQueued) { + final SharedRequestExecutionQueue executionQueue) { super(); this.log = log; this.manager = manager; @@ -107,8 +103,7 @@ static class ReUseData { this.tlsConfig = tlsConfig; this.endpointRef = new AtomicReference<>(); this.reuseDataRef = new AtomicReference<>(); - this.maxQueued = maxQueued; - this.sharedQueued = sharedQueued; + this.executionQueue = executionQueue; } @Override @@ -299,44 +294,12 @@ public EndpointInfo getEndpointInfo() { return endpoint != null ? endpoint.getInfo() : null; } - private boolean tryAcquireSlot() { - if (sharedQueued == null || maxQueued <= 0) { - return true; - } - for (;;) { - final int q = sharedQueued.get(); - if (q >= maxQueued) { - return false; - } - if (sharedQueued.compareAndSet(q, q + 1)) { - return true; - } - } - } - - private void releaseSlot() { - if (sharedQueued != null && maxQueued > 0) { - sharedQueued.decrementAndGet(); - } - } - - private AsyncClientExchangeHandler guard(final AsyncClientExchangeHandler handler) { - if (sharedQueued == null) { - return handler; - } - return new ReleasingAsyncClientExchangeHandler(handler, this::releaseSlot); - } + private Cancellable startExecution( + final String id, + final AsyncClientExchangeHandler exchangeHandler, + final HttpClientContext context) { - @Override - public Cancellable execute( - final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) { final AsyncConnectionEndpoint endpoint = ensureValid(); - if (sharedQueued != null && !tryAcquireSlot()) { - exchangeHandler.failed(new RejectedExecutionException( - "Execution pipeline queue limit reached (max=" + maxQueued + ")")); - return Operations.nonCancellable(); - } - final AsyncClientExchangeHandler actual = guard(exchangeHandler); if (endpoint.isConnected()) { if (log.isDebugEnabled()) { log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id); @@ -346,15 +309,18 @@ public Cancellable execute( if (responseTimeout != null) { endpoint.setSocketTimeout(responseTimeout); } - endpoint.execute(id, actual, pushHandlerFactory, context); - if (context.getRequestConfigOrDefault().isHardCancellationEnabled()) { + endpoint.execute(id, exchangeHandler, pushHandlerFactory, context); + + if (requestConfig.isHardCancellationEnabled()) { return () -> { - actual.cancel(); + exchangeHandler.cancel(); return true; }; } - } else { - connectEndpoint(context, new FutureCallback() { + return Operations.nonCancellable(); + } + + connectEndpoint(context, new FutureCallback() { @Override public void completed(final AsyncExecRuntime runtime) { @@ -362,7 +328,7 @@ public void completed(final AsyncExecRuntime runtime) { log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id); } try { - endpoint.execute(id, actual, pushHandlerFactory, context); + endpoint.execute(id, exchangeHandler, pushHandlerFactory, context); } catch (final RuntimeException ex) { failed(ex); } @@ -370,19 +336,54 @@ public void completed(final AsyncExecRuntime runtime) { @Override public void failed(final Exception ex) { - actual.failed(ex); + exchangeHandler.failed(ex); } @Override public void cancelled() { - actual.failed(new InterruptedIOException()); + exchangeHandler.failed(new InterruptedIOException()); } }); - } return Operations.nonCancellable(); } + @Override + public Cancellable execute( + final String id, + final AsyncClientExchangeHandler exchangeHandler, + final HttpClientContext context) { + + if (executionQueue == null) { + return startExecution(id, exchangeHandler, context); + } + + final AtomicReference activeCancelRef = new AtomicReference<>(Operations.nonCancellable()); + + final Cancellable queued = executionQueue.enqueue( + () -> { + final AsyncClientExchangeHandler wrapped = + new ReleasingAsyncClientExchangeHandler(exchangeHandler, executionQueue::completed); + try { + final Cancellable cancellable = startExecution(id, wrapped, context); + activeCancelRef.set(cancellable); + } catch (final RuntimeException ex) { + wrapped.failed(ex); + } + }, + () -> { + activeCancelRef.get().cancel(); + exchangeHandler.cancel(); + }); + + return () -> { + if (queued.cancel()) { + return true; + } + return activeCancelRef.get().cancel(); + }; + } + @Override public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) { reuseDataRef.set(new ReUseData(newState, newValidDuration)); @@ -395,7 +396,7 @@ public void markConnectionNonReusable() { @Override public AsyncExecRuntime fork() { - return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig, maxQueued, sharedQueued); + return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig, executionQueue); } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ReleasingAsyncClientExchangeHandler.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ReleasingAsyncClientExchangeHandler.java index c8639ce420..9358a7962b 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ReleasingAsyncClientExchangeHandler.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ReleasingAsyncClientExchangeHandler.java @@ -24,6 +24,7 @@ * . * */ + package org.apache.hc.client5.http.impl.async; import java.io.IOException; @@ -55,6 +56,12 @@ final class ReleasingAsyncClientExchangeHandler implements AsyncClientExchangeHa this.released = new AtomicBoolean(false); } + private void releaseOnce() { + if (released.compareAndSet(false, true)) { + onRelease.run(); + } + } + @Override public void produceRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException { handler.produceRequest(channel, context); @@ -98,12 +105,20 @@ public void streamEnd(final List trailers) throws HttpExceptio @Override public void failed(final Exception cause) { - handler.failed(cause); + try { + handler.failed(cause); + } finally { + releaseOnce(); + } } @Override public void cancel() { - handler.cancel(); + try { + handler.cancel(); + } finally { + releaseOnce(); + } } @Override @@ -111,9 +126,7 @@ public void releaseResources() { try { handler.releaseResources(); } finally { - if (released.compareAndSet(false, true)) { - onRelease.run(); - } + releaseOnce(); } } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/SharedRequestExecutionQueue.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/SharedRequestExecutionQueue.java new file mode 100644 index 0000000000..aa80a8fb2b --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/SharedRequestExecutionQueue.java @@ -0,0 +1,142 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.impl.async; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.concurrent.Cancellable; +import org.apache.hc.core5.util.Args; + +/** + * Shared FIFO execution queue with a hard cap on concurrently executing tasks. + * Tasks beyond the cap are queued and executed when in-flight count drops. + */ +@Internal +final class SharedRequestExecutionQueue { + + private final int maxConcurrent; + private final AtomicInteger inFlight; + private final ConcurrentLinkedQueue queue; + private final AtomicBoolean draining; + + SharedRequestExecutionQueue(final int maxConcurrent) { + if (maxConcurrent <= 0) { + throw new IllegalArgumentException("maxConcurrent must be > 0"); + } + this.maxConcurrent = maxConcurrent; + this.inFlight = new AtomicInteger(0); + this.queue = new ConcurrentLinkedQueue<>(); + this.draining = new AtomicBoolean(false); + } + + Cancellable enqueue(final Runnable task, final Runnable onCancel) { + Args.notNull(task, "task"); + Args.notNull(onCancel, "onCancel"); + + final Entry entry = new Entry(task, onCancel); + queue.add(entry); + drain(); + return entry; + } + + void completed() { + inFlight.decrementAndGet(); + drain(); + } + + private void drain() { + for (;;) { + if (!draining.compareAndSet(false, true)) { + return; + } + try { + while (inFlight.get() < maxConcurrent) { + final Entry entry = queue.poll(); + if (entry == null) { + break; + } + if (entry.isCancelled()) { + continue; + } + entry.markStarted(); + inFlight.incrementAndGet(); + entry.run(); + } + } finally { + draining.set(false); + } + if (inFlight.get() >= maxConcurrent || queue.isEmpty()) { + return; + } + } + } + + + private static final class Entry implements Cancellable { + + private final Runnable task; + private final Runnable onCancel; + private final AtomicBoolean started; + private final AtomicBoolean cancelled; + + Entry(final Runnable task, final Runnable onCancel) { + this.task = task; + this.onCancel = onCancel; + this.started = new AtomicBoolean(false); + this.cancelled = new AtomicBoolean(false); + } + + void run() { + task.run(); + } + + void markStarted() { + started.set(true); + } + + boolean isCancelled() { + return cancelled.get(); + } + + @Override + public boolean cancel() { + if (started.get()) { + return false; + } + if (cancelled.compareAndSet(false, true)) { + onCancel.run(); + return true; + } + return false; + } + } + +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncSharedClientQueueLocalExample.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncSharedClientQueueLocalExample.java new file mode 100644 index 0000000000..916fde5eec --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncSharedClientQueueLocalExample.java @@ -0,0 +1,255 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.examples; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.message.StatusLine; +import org.apache.hc.core5.http.nio.AsyncRequestConsumer; +import org.apache.hc.core5.http.nio.AsyncServerRequestHandler; +import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder; +import org.apache.hc.core5.http.nio.support.BasicRequestConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; + +/** + * Demonstrates client-level request throttling for the async client using a shared FIFO queue. + *

+ * The example configures {@code setMaxQueuedRequests(int)} to cap the number of concurrently executing requests per + * client instance. Requests submitted beyond that limit are queued in FIFO order and executed when the number of + * in-flight requests drops below the configured maximum. + * + * @since 5.7 + */ +public final class AsyncSharedClientQueueLocalExample { + + public static void main(final String[] args) throws Exception { + final int maxConcurrentPerClient = 2; // shared per-client cap + final int totalRequests = 50; + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setIoThreadCount(1) + .build(); + + final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + final AtomicInteger serverInFlight = new AtomicInteger(0); + final AtomicInteger serverMaxInFlight = new AtomicInteger(0); + + final HttpAsyncServer server = AsyncServerBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setCanonicalHostName("127.0.0.1") + .register("*", new AsyncServerRequestHandler>() { + + @Override + public AsyncRequestConsumer> prepare( + final HttpRequest request, + final EntityDetails entityDetails, + final HttpContext context) { + return new BasicRequestConsumer<>( + entityDetails != null ? new DiscardingEntityConsumer<>() : null); + } + + @Override + public void handle( + final Message message, + final ResponseTrigger responseTrigger, + final HttpContext localContext) { + + final HttpCoreContext context = HttpCoreContext.cast(localContext); + + final int cur = serverInFlight.incrementAndGet(); + serverMaxInFlight.updateAndGet(prev -> Math.max(prev, cur)); + + scheduler.schedule(() -> { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(200) + .setEntity("ok\n", ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ex) { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(500) + .setEntity(ex.toString(), ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ignore) { + // ignore + } + } finally { + serverInFlight.decrementAndGet(); + } + }, 200, TimeUnit.MILLISECONDS); + } + + }) + .create(); + + server.start(); + final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get(); + final int port = ((InetSocketAddress) ep.getAddress()).getPort(); + System.out.println("server on 127.0.0.1:" + port); + + final PoolingAsyncClientConnectionManager cm = PoolingAsyncClientConnectionManagerBuilder.create() + .setMaxConnTotal(100) + .setMaxConnPerRoute(100) + .build(); + + final CloseableHttpAsyncClient client = HttpAsyncClients.custom() + .setIOReactorConfig(ioReactorConfig) + .setConnectionManager(cm) + // This is the knob you’re implementing as "max concurrent per client + shared FIFO queue". + .setMaxQueuedRequests(maxConcurrentPerClient) + .build(); + + client.start(); + + // warmup + final SimpleHttpRequest warmup = SimpleRequestBuilder.get("http://127.0.0.1:" + port + "/warmup").build(); + final CountDownLatch warmupLatch = new CountDownLatch(1); + client.execute( + SimpleRequestProducer.create(warmup), + SimpleResponseConsumer.create(), + new FutureCallback() { + @Override + public void completed(final SimpleHttpResponse result) { + System.out.println("warmup -> " + new StatusLine(result)); + warmupLatch.countDown(); + } + + @Override + public void failed(final Exception ex) { + System.out.println("warmup failed -> " + ex); + warmupLatch.countDown(); + } + + @Override + public void cancelled() { + System.out.println("warmup cancelled"); + warmupLatch.countDown(); + } + }); + warmupLatch.await(); + + final AtomicInteger ok = new AtomicInteger(0); + final AtomicInteger failed = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(totalRequests); + + final ExecutorService exec = Executors.newFixedThreadPool(Math.min(16, totalRequests)); + final long t0 = System.nanoTime(); + + for (int i = 0; i < totalRequests; i++) { + final int id = i; + exec.execute(() -> { + final SimpleHttpRequest req = SimpleRequestBuilder + .get("http://127.0.0.1:" + port + "/slow?i=" + id) + .build(); + + client.execute( + SimpleRequestProducer.create(req), + SimpleResponseConsumer.create(), + new FutureCallback() { + + @Override + public void completed(final SimpleHttpResponse result) { + if (result.getCode() == 200) { + ok.incrementAndGet(); + } else { + failed.incrementAndGet(); + System.out.println("FAILED i=" + id + " -> " + new StatusLine(result)); + } + latch.countDown(); + } + + @Override + public void failed(final Exception ex) { + failed.incrementAndGet(); + System.out.println("FAILED i=" + id + " -> " + ex); + latch.countDown(); + } + + @Override + public void cancelled() { + failed.incrementAndGet(); + System.out.println("CANCELLED i=" + id); + latch.countDown(); + } + + }); + }); + } + + final boolean done = latch.await(120, TimeUnit.SECONDS); + final long t1 = System.nanoTime(); + + exec.shutdownNow(); + + System.out.println("done=" + done + + " ok=" + ok.get() + + " failed=" + failed.get() + + " clientCap=" + maxConcurrentPerClient + + " serverMaxInFlightSeen=" + serverMaxInFlight.get() + + " elapsedMs=" + TimeUnit.NANOSECONDS.toMillis(t1 - t0)); + + client.close(CloseMode.GRACEFUL); + server.close(CloseMode.GRACEFUL); + scheduler.shutdownNow(); + } + + private AsyncSharedClientQueueLocalExample() { + } + +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/H2SharedClientQueueLocalExample.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/H2SharedClientQueueLocalExample.java new file mode 100644 index 0000000000..8ba14d0c4d --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/H2SharedClientQueueLocalExample.java @@ -0,0 +1,260 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.examples; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.message.StatusLine; +import org.apache.hc.core5.http.nio.AsyncRequestConsumer; +import org.apache.hc.core5.http.nio.AsyncServerRequestHandler; +import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder; +import org.apache.hc.core5.http.nio.support.BasicRequestConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; + + +/** + * Demonstrates client-level request throttling for the HTTP/2 async client using a shared FIFO queue. + *

+ * The example configures {@link org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder#setMaxQueuedRequests(int)} + * to cap the number of concurrently executing requests per client instance. Requests submitted beyond that limit are + * queued in FIFO order and executed as soon as the number of in-flight requests drops below the configured maximum. + *

+ * A local HTTP/2 server is started with a permissive {@code maxConcurrentStreams} setting so that the client-side cap + * is the limiting factor. The example then submits multiple requests and prints summary statistics, including the + * maximum number of concurrent server requests observed. + * + * @since 5.7 + */ +public final class H2SharedClientQueueLocalExample { + + + public static void main(final String[] args) throws Exception { + final int maxConcurrentPerClient = 2; // <-- the feature under test + final int totalRequests = 50; + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setIoThreadCount(1) + .build(); + + final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + final AtomicInteger serverInFlight = new AtomicInteger(0); + final AtomicInteger serverMaxInFlight = new AtomicInteger(0); + + final H2Config serverH2Config = H2Config.custom() + .setPushEnabled(false) + // Keep server permissive. We want the *client* cap to be the limiting factor. + .setMaxConcurrentStreams(256) + .build(); + + final HttpAsyncServer server = H2ServerBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(serverH2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setCanonicalHostName("127.0.0.1") + .register("*", new AsyncServerRequestHandler>() { + + @Override + public AsyncRequestConsumer> prepare( + final HttpRequest request, + final EntityDetails entityDetails, + final HttpContext context) { + return new BasicRequestConsumer<>(entityDetails != null ? new DiscardingEntityConsumer<>() : null); + } + + @Override + public void handle( + final Message message, + final ResponseTrigger responseTrigger, + final HttpContext localContext) { + + final HttpCoreContext context = HttpCoreContext.cast(localContext); + + final int cur = serverInFlight.incrementAndGet(); + serverMaxInFlight.updateAndGet(prev -> Math.max(prev, cur)); + + scheduler.schedule(() -> { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(200) + .setEntity("ok\n", ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ex) { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(500) + .setEntity(ex.toString(), ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ignore) { + // ignore + } + } finally { + serverInFlight.decrementAndGet(); + } + }, 200, TimeUnit.MILLISECONDS); + } + + }) + .create(); + + server.start(); + final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get(); + final int port = ((InetSocketAddress) ep.getAddress()).getPort(); + System.out.println("server on 127.0.0.1:" + port); + + final CloseableHttpAsyncClient client = H2AsyncClientBuilder.create() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(H2Config.DEFAULT) + // IMPORTANT: this is the knob you’re implementing as “max concurrent per client + shared FIFO queue”. + .setMaxQueuedRequests(maxConcurrentPerClient) + .build(); + + client.start(); + + // warmup + final SimpleHttpRequest warmup = SimpleRequestBuilder.get("http://127.0.0.1:" + port + "/warmup").build(); + final CountDownLatch warmupLatch = new CountDownLatch(1); + client.execute( + SimpleRequestProducer.create(warmup), + SimpleResponseConsumer.create(), + new FutureCallback() { + @Override + public void completed(final SimpleHttpResponse result) { + System.out.println("warmup -> " + new StatusLine(result)); + warmupLatch.countDown(); + } + + @Override + public void failed(final Exception ex) { + System.out.println("warmup failed -> " + ex); + warmupLatch.countDown(); + } + + @Override + public void cancelled() { + System.out.println("warmup cancelled"); + warmupLatch.countDown(); + } + }); + warmupLatch.await(); + + final AtomicInteger ok = new AtomicInteger(0); + final AtomicInteger failed = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(totalRequests); + + final ExecutorService exec = Executors.newFixedThreadPool(Math.min(16, totalRequests)); + final long t0 = System.nanoTime(); + + for (int i = 0; i < totalRequests; i++) { + final int id = i; + exec.execute(() -> { + final SimpleHttpRequest req = SimpleRequestBuilder + .get("http://127.0.0.1:" + port + "/slow?i=" + id) + .build(); + + client.execute( + SimpleRequestProducer.create(req), + SimpleResponseConsumer.create(), + new FutureCallback() { + @Override + public void completed(final SimpleHttpResponse result) { + if (result.getCode() == 200) { + ok.incrementAndGet(); + } else { + failed.incrementAndGet(); + System.out.println("FAILED i=" + id + " -> " + new StatusLine(result)); + } + latch.countDown(); + } + + @Override + public void failed(final Exception ex) { + failed.incrementAndGet(); + System.out.println("FAILED i=" + id + " -> " + ex); + latch.countDown(); + } + + @Override + public void cancelled() { + failed.incrementAndGet(); + System.out.println("CANCELLED i=" + id); + latch.countDown(); + } + }); + }); + } + + final boolean done = latch.await(120, TimeUnit.SECONDS); + final long t1 = System.nanoTime(); + + exec.shutdownNow(); + + System.out.println("done=" + done + + " ok=" + ok.get() + + " failed=" + failed.get() + + " maxInFlightSeen=" + serverMaxInFlight.get() + + " elapsedMs=" + TimeUnit.NANOSECONDS.toMillis(t1 - t0)); + + client.close(CloseMode.GRACEFUL); + server.close(CloseMode.GRACEFUL); + scheduler.shutdownNow(); + } + + private H2SharedClientQueueLocalExample() { + } + +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java index f37bd43c71..15848db9e4 100644 --- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java @@ -29,13 +29,14 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.IOException; import java.net.SocketAddress; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.hc.client5.http.EndpointInfo; @@ -120,8 +121,14 @@ public void cancelled() { } }); - assertTrue(latch.await(2, TimeUnit.SECONDS), "rejection should arrive quickly"); - assertInstanceOf(RejectedExecutionException.class, failure.get(), "Expected RejectedExecutionException, got: " + failure.get()); + // With queueing semantics, r2 is queued and must not complete until r1 is finished. + assertTrue(!latch.await(200, TimeUnit.MILLISECONDS), "second request should be queued"); + + // Finish r1, release the slot -> r2 should now execute and fail (see BlockingEndpoint.execute()). + endpoint.failFirst(new IOException("release-slot")); + + assertTrue(latch.await(2, TimeUnit.SECONDS), "second request should complete after slot released"); + assertInstanceOf(IOException.class, failure.get(), "Expected IOException, got: " + failure.get()); } } @@ -179,11 +186,27 @@ public void close() { private static final class BlockingEndpoint extends AsyncConnectionEndpoint { volatile boolean connected = true; + private final AtomicInteger execCount = new AtomicInteger(0); + private final AtomicReference first = new AtomicReference<>(); + @Override public void execute(final String id, final AsyncClientExchangeHandler handler, final HandlerFactory pushHandlerFactory, final HttpContext context) { + final int n = execCount.incrementAndGet(); + if (n == 1) { + first.set(handler); // keep slot occupied + } else { + handler.failed(new IOException("queued request executed")); + } + } + + void failFirst(final Exception ex) { + final AsyncClientExchangeHandler h = first.getAndSet(null); + if (h != null) { + h.failed(ex); + } } @Override diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java index 0309d8710d..7a63e863f0 100644 --- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java @@ -37,7 +37,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -68,16 +67,17 @@ public class InternalH2AsyncExecRuntimeQueueCapTest { - private static InternalH2AsyncExecRuntime newRuntime(final int maxQueued) { + private static InternalH2AsyncExecRuntime newRuntime(final int maxConcurrent) { final IOSession ioSession = newImmediateFailSession(); final FakeH2ConnPool connPool = new FakeH2ConnPool(ioSession); - final AtomicInteger queued = maxQueued > 0 ? new AtomicInteger(0) : null; + + final SharedRequestExecutionQueue queue = maxConcurrent > 0 ? new SharedRequestExecutionQueue(maxConcurrent) : null; + return new InternalH2AsyncExecRuntime( LoggerFactory.getLogger("test"), connPool, new NoopPushFactory(), - maxQueued, - queued); + queue); } private static void acquireEndpoint( @@ -109,19 +109,19 @@ public void cancelled() { } /** - * With no cap (maxQueued <= 0) the recursive re-entry path should blow the stack. - * This documents the pathological behaviour without queue protection. + * With no cap / no queue, the re-entrant execute path can recurse until SOE + * if failures are delivered synchronously. */ @Test void testRecursiveReentryCausesSOEWithoutCap() throws Exception { - final InternalH2AsyncExecRuntime runtime = newRuntime(-1); + final InternalH2AsyncExecRuntime runtime = newRuntime(0); final HttpClientContext ctx = HttpClientContext.create(); ctx.setRequestConfig(RequestConfig.custom().build()); acquireEndpoint(runtime, ctx); - final ReentrantHandler loop = new ReentrantHandler(runtime, ctx); + final ReentrantHandler loop = new ReentrantHandler(runtime, ctx, Integer.MAX_VALUE); assertThrows(StackOverflowError.class, () -> { runtime.execute("loop", loop, ctx); @@ -129,8 +129,8 @@ void testRecursiveReentryCausesSOEWithoutCap() throws Exception { } /** - * With a cap of 1, the second re-entrant execute call must be rejected and - * the recursion broken. + * With cap=1 and QUEUE semantics, re-entrant submissions should be queued, + * not executed inline recursively. This test must NOT expect rejection. */ @Test void testCapBreaksRecursiveReentry() throws Exception { @@ -141,14 +141,13 @@ void testCapBreaksRecursiveReentry() throws Exception { acquireEndpoint(runtime, ctx); - final ReentrantHandler loop = new ReentrantHandler(runtime, ctx); + final int maxAttempts = 200; + final ReentrantHandler loop = new ReentrantHandler(runtime, ctx, maxAttempts); runtime.execute("loop", loop, ctx); - // immediate fail path runs synchronously; small wait is just defensive - Thread.sleep(50); - assertTrue(loop.lastException.get() instanceof RejectedExecutionException, - "Expected queue rejection to break recursion"); + assertTrue(loop.done.await(2, TimeUnit.SECONDS), "expected bounded re-entry loop to terminate"); + assertTrue(loop.attempts.get() >= maxAttempts, "expected at least " + maxAttempts + " attempts, got " + loop.attempts.get()); } /** @@ -168,8 +167,7 @@ public Future getSession( final HttpRoute route, final Timeout timeout, final FutureCallback callback) { - final CompletableFuture cf = new CompletableFuture<>(); - cf.complete(session); + final CompletableFuture cf = CompletableFuture.completedFuture(session); if (callback != null) { callback.completed(session); } @@ -228,19 +226,33 @@ private static final class ReentrantHandler implements AsyncClientExchangeHandle private final InternalH2AsyncExecRuntime runtime; private final HttpClientContext context; - final AtomicReference lastException = new AtomicReference<>(); + private final int maxAttempts; + + final AtomicInteger attempts; + final AtomicReference lastException; + final CountDownLatch done; - ReentrantHandler(final InternalH2AsyncExecRuntime runtime, final HttpClientContext context) { + ReentrantHandler(final InternalH2AsyncExecRuntime runtime, final HttpClientContext context, final int maxAttempts) { this.runtime = runtime; this.context = context; + this.maxAttempts = maxAttempts; + this.attempts = new AtomicInteger(0); + this.lastException = new AtomicReference<>(); + this.done = new CountDownLatch(1); } @Override public void failed(final Exception cause) { lastException.set(cause); - if (!(cause instanceof RejectedExecutionException)) { - runtime.execute("loop/reenter", this, context); + + final int n = attempts.incrementAndGet(); + if (n >= maxAttempts) { + done.countDown(); + return; } + + // Re-enter. With QUEUE cap=1 this must not recurse inline until SOE. + runtime.execute("loop/reenter/" + n, this, context); } @Override diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java index 6aae8413af..0a00c17fbf 100644 --- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java @@ -27,6 +27,7 @@ package org.apache.hc.client5.http.impl.async; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -41,7 +42,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -79,112 +79,163 @@ public class InternalHttpAsyncExecRuntimeQueueCapTest { @Test - void testFailFastWhenQueueFull() throws Exception { + void testRequestsQueuedWhenOverCap() throws Exception { final FakeEndpoint endpoint = new FakeEndpoint(); final FakeManager manager = new FakeManager(endpoint); + final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime( LoggerFactory.getLogger("test"), manager, new NoopInitiator(), new NoopPushFactory(), TlsConfig.DEFAULT, - 2, - new AtomicInteger() - ); + new SharedRequestExecutionQueue(2)); final HttpClientContext ctx = HttpClientContext.create(); ctx.setRequestConfig(RequestConfig.custom().build()); - runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new FutureCallback() { - @Override - public void completed(final AsyncExecRuntime result) { - } - - @Override - public void failed(final Exception ex) { - fail(ex); - } - - @Override - public void cancelled() { - fail("cancelled"); - } - }); - - final CountDownLatch rejected = new CountDownLatch(1); + runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new NoopRuntimeCallback()); final LatchingHandler h1 = new LatchingHandler(); final LatchingHandler h2 = new LatchingHandler(); + final LatchingHandler h3 = new LatchingHandler(); + runtime.execute("r1", h1, ctx); runtime.execute("r2", h2, ctx); - - final LatchingHandler h3 = new LatchingHandler() { - @Override - public void failed(final Exception cause) { - super.failed(cause); - rejected.countDown(); - } - }; runtime.execute("r3", h3, ctx); - assertTrue(rejected.await(2, TimeUnit.SECONDS), "r3 should be failed fast"); - assertTrue(h3.failedException.get() instanceof RejectedExecutionException); + assertTrue(waitFor(() -> endpoint.executedCount() == 2, 2, TimeUnit.SECONDS), "r1 and r2 should start"); + assertTrue(endpoint.executedIds.contains("r1")); + assertTrue(endpoint.executedIds.contains("r2")); + assertTrue(!endpoint.executedIds.contains("r3"), "r3 should be queued, not started yet"); + + endpoint.completeOne(); // releases a slot and should trigger queued r3 + + assertTrue(waitFor(() -> endpoint.executedIds.contains("r3"), 2, TimeUnit.SECONDS), "r3 should start after slot release"); + assertNull(h1.failedException.get()); assertNull(h2.failedException.get()); + assertNull(h3.failedException.get()); } @Test void testSlotReleasedOnTerminalSignalAllowsNext() throws Exception { final FakeEndpoint endpoint = new FakeEndpoint(); final FakeManager manager = new FakeManager(endpoint); + final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime( LoggerFactory.getLogger("test"), manager, new NoopInitiator(), new NoopPushFactory(), TlsConfig.DEFAULT, - 1, - new AtomicInteger() - ); + new SharedRequestExecutionQueue(1)); final HttpClientContext ctx = HttpClientContext.create(); ctx.setRequestConfig(RequestConfig.custom().build()); - runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, - new FutureCallback() { - @Override - public void completed(final AsyncExecRuntime result) { - } - - @Override - public void failed(final Exception ex) { - fail(ex); - } - - @Override - public void cancelled() { - fail("cancelled"); - } - }); + runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new NoopRuntimeCallback()); final LatchingHandler h1 = new LatchingHandler(); - runtime.execute("r1", h1, ctx); - final LatchingHandler h2 = new LatchingHandler(); + + runtime.execute("r1", h1, ctx); runtime.execute("r2", h2, ctx); - assertTrue(h2.awaitFailed(2, TimeUnit.SECONDS)); - assertTrue(h2.failedException.get() instanceof RejectedExecutionException); - // free the slot via releaseResources(), not failed() + assertTrue(waitFor(() -> endpoint.executedIds.contains("r1"), 2, TimeUnit.SECONDS)); + assertTrue(!endpoint.executedIds.contains("r2"), "r2 should be queued until r1 completes"); + endpoint.completeOne(); - final LatchingHandler h3 = new LatchingHandler(); - runtime.execute("r3", h3, ctx); - Thread.sleep(150); - assertNull(h3.failedException.get(), "r3 should not be rejected after slot released"); - h3.cancel(); + assertTrue(waitFor(() -> endpoint.executedIds.contains("r2"), 2, TimeUnit.SECONDS), "r2 should start after r1 completes"); + + assertNull(h1.failedException.get()); + assertNull(h2.failedException.get()); } + @Test + void testRecursiveReentryCausesSOEWithoutCap() { + final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint(); + final FakeManager manager = new FakeManager(endpoint); + + // no queue => old synchronous recursion behaviour remains + final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime( + LoggerFactory.getLogger("test"), + manager, + new NoopInitiator(), + new NoopPushFactory(), + TlsConfig.DEFAULT, + null); + + final HttpClientContext ctx = HttpClientContext.create(); + ctx.setRequestConfig(RequestConfig.custom().build()); + + runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new NoopRuntimeCallback()); + + final ReentrantHandler loop = new ReentrantHandler(runtime, ctx); + + assertThrows(StackOverflowError.class, () -> runtime.execute("loop", loop, ctx)); + } + + @Test + void testCapBreaksRecursiveReentry() throws Exception { + final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint(); + final FakeManager manager = new FakeManager(endpoint); + + // queue => no synchronous recursion -> no SOE + final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime( + LoggerFactory.getLogger("test"), + manager, + new NoopInitiator(), + new NoopPushFactory(), + TlsConfig.DEFAULT, + new SharedRequestExecutionQueue(1)); + + final HttpClientContext ctx = HttpClientContext.create(); + ctx.setRequestConfig(RequestConfig.custom().build()); + + runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new NoopRuntimeCallback()); + + final CountDownLatch done = new CountDownLatch(1); + final BoundedReentrantHandler loop = new BoundedReentrantHandler(runtime, ctx, 50, done); + + assertDoesNotThrow(() -> runtime.execute("loop", loop, ctx)); + assertTrue(done.await(2, TimeUnit.SECONDS), "Expected bounded re-entry loop to complete without SOE"); + assertTrue(loop.invocations.get() >= 1); + assertTrue(loop.lastException.get() instanceof IOException); + } + + private static boolean waitFor(final Condition condition, final long time, final TimeUnit unit) throws InterruptedException { + final long deadline = System.nanoTime() + unit.toNanos(time); + while (System.nanoTime() < deadline) { + if (condition.get()) { + return true; + } + Thread.sleep(10); + } + return condition.get(); + } + + @FunctionalInterface + private interface Condition { + boolean get(); + } + + private static final class NoopRuntimeCallback implements FutureCallback { + @Override + public void completed(final AsyncExecRuntime result) { + } + + @Override + public void failed(final Exception ex) { + fail(ex); + } + + @Override + public void cancelled() { + fail("cancelled"); + } + } private static final class NoopInitiator implements ConnectionInitiator { @Override @@ -267,36 +318,37 @@ public void close() { private static final class FakeEndpoint extends AsyncConnectionEndpoint { volatile boolean connected = true; - private final ConcurrentLinkedQueue inFlight = new ConcurrentLinkedQueue<>(); + + private static final class InFlightEntry { + final String id; + final AsyncClientExchangeHandler handler; + + InFlightEntry(final String id, final AsyncClientExchangeHandler handler) { + this.id = id; + this.handler = handler; + } + } + + final ConcurrentLinkedQueue executedIds = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue inFlight = new ConcurrentLinkedQueue<>(); @Override public void execute(final String id, final AsyncClientExchangeHandler handler, final HandlerFactory pushHandlerFactory, final HttpContext context) { - // keep the guarded handler so tests can signal terminal events - inFlight.add(handler); - } - - // helpers for tests - void failOne(final Exception ex) { - final AsyncClientExchangeHandler h = inFlight.poll(); - if (h != null) { - h.failed(ex); - } + executedIds.add(id); + inFlight.add(new InFlightEntry(id, handler)); } - void cancelOne() { - final AsyncClientExchangeHandler h = inFlight.poll(); - if (h != null) { - h.cancel(); - } + int executedCount() { + return executedIds.size(); } void completeOne() { - final AsyncClientExchangeHandler h = inFlight.poll(); - if (h != null) { - h.releaseResources(); + final InFlightEntry e = inFlight.poll(); + if (e != null) { + e.handler.releaseResources(); } } @@ -320,8 +372,6 @@ public EndpointInfo getInfo() { } } - - private static class LatchingHandler implements AsyncClientExchangeHandler { final AtomicReference failedException = new AtomicReference<>(); final CountDownLatch failLatch = new CountDownLatch(1); @@ -378,94 +428,6 @@ public void failed(final Exception cause) { } } - @Test - void testRecursiveReentryCausesSOEWithoutCap() { - final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint(); - final FakeManager manager = new FakeManager(endpoint); - - final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime( - LoggerFactory.getLogger("test"), - manager, - new NoopInitiator(), - new NoopPushFactory(), - TlsConfig.DEFAULT, - -1, - null // no cap, no counter - ); - - final HttpClientContext ctx = HttpClientContext.create(); - ctx.setRequestConfig(RequestConfig.custom().build()); - - runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, - new FutureCallback() { - @Override - public void completed(final AsyncExecRuntime result) { - } - - @Override - public void failed(final Exception ex) { - fail(ex); - } - - @Override - public void cancelled() { - fail("cancelled"); - } - }); - - final ReentrantHandler loop = new ReentrantHandler(runtime, ctx); - - assertThrows(StackOverflowError.class, () -> { - runtime.execute("loop", loop, ctx); // execute -> endpoint.execute -> failed() -> execute -> ... - }); - } - - @Test - void testCapBreaksRecursiveReentry() throws Exception { - final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint(); - final FakeManager manager = new FakeManager(endpoint); - - final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime( - LoggerFactory.getLogger("test"), - manager, - new NoopInitiator(), - new NoopPushFactory(), - TlsConfig.DEFAULT, - 1, - new AtomicInteger() - ); - - final HttpClientContext ctx = HttpClientContext.create(); - ctx.setRequestConfig(RequestConfig.custom().build()); - - runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, - new FutureCallback() { - @Override - public void completed(final AsyncExecRuntime result) { - } - - @Override - public void failed(final Exception ex) { - fail(ex); - } - - @Override - public void cancelled() { - fail("cancelled"); - } - }); - - final ReentrantHandler loop = new ReentrantHandler(runtime, ctx); - - // Should NOT blow the stack; the re-entrant call should be rejected. - runtime.execute("loop", loop, ctx); - // allow the immediate fail+re-submit path to run - Thread.sleep(50); - - assertTrue(loop.lastException.get() instanceof RejectedExecutionException, - "Expected rejection to break the recursion"); - } - /** * Endpoint that synchronously fails any handler passed to execute(). */ @@ -513,9 +475,78 @@ private static final class ReentrantHandler implements AsyncClientExchangeHandle @Override public void failed(final Exception cause) { lastException.set(cause); - // Re-enter only if this was NOT the cap rejecting us - if (!(cause instanceof RejectedExecutionException)) { + runtime.execute("loop/reenter", this, ctx); + } + + @Override + public void produceRequest(final RequestChannel channel, final HttpContext context) { + } + + @Override + public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) { + } + + @Override + public void consumeInformation(final HttpResponse response, final HttpContext context) { + } + + @Override + public void cancel() { + } + + @Override + public int available() { + return 0; + } + + @Override + public void produce(final DataStreamChannel channel) { + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) { + } + + @Override + public void consume(final ByteBuffer src) { + } + + @Override + public void streamEnd(final List trailers) { + } + + @Override + public void releaseResources() { + } + } + + private static final class BoundedReentrantHandler implements AsyncClientExchangeHandler { + private final InternalHttpAsyncExecRuntime runtime; + private final HttpClientContext ctx; + private final AtomicInteger remaining; + private final CountDownLatch done; + + final AtomicInteger invocations = new AtomicInteger(0); + final AtomicReference lastException = new AtomicReference<>(); + + BoundedReentrantHandler(final InternalHttpAsyncExecRuntime runtime, + final HttpClientContext ctx, + final int maxReentries, + final CountDownLatch done) { + this.runtime = runtime; + this.ctx = ctx; + this.remaining = new AtomicInteger(maxReentries); + this.done = done; + } + + @Override + public void failed(final Exception cause) { + invocations.incrementAndGet(); + lastException.set(cause); + if (remaining.getAndDecrement() > 0) { runtime.execute("loop/reenter", this, ctx); + } else { + done.countDown(); } }