Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncExecRuntime;
Expand All @@ -55,11 +54,6 @@

/**
* Internal implementation of HTTP/2 only {@link CloseableHttpAsyncClient}.
* <p>
* Concurrent message exchanges with the same connection route executed by
* this client will get automatically multiplexed over a single physical HTTP/2
* connection.
* </p>
*
* @since 5.0
*/
Expand All @@ -70,8 +64,12 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient
private static final Logger LOG = LoggerFactory.getLogger(InternalH2AsyncClient.class);
private final HttpRoutePlanner routePlanner;
private final InternalH2ConnPool connPool;
private final int maxQueuedRequests;
private final AtomicInteger queuedRequests;

/**
* One shared FIFO queue per client instance (Oleg).
* null means "unlimited" / no throttling.
*/
private final SharedRequestExecutionQueue executionQueue;

InternalH2AsyncClient(
final DefaultConnectingIOReactor ioReactor,
Expand All @@ -87,25 +85,22 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient
final RequestConfig defaultConfig,
final List<Closeable> closeables,
final int maxQueuedRequests) {

super(ioReactor, pushConsumerRegistry, threadFactory, execChain,
cookieSpecRegistry, authSchemeRegistry, cookieStore, credentialsProvider, HttpClientContext::castOrCreate,
defaultConfig, closeables);
this.connPool = connPool;
this.routePlanner = routePlanner;
this.maxQueuedRequests = maxQueuedRequests;
this.queuedRequests = maxQueuedRequests > 0 ? new AtomicInteger(0) : null;
this.executionQueue = maxQueuedRequests > 0 ? new SharedRequestExecutionQueue(maxQueuedRequests) : null;
}

@Override
AsyncExecRuntime createAsyncExecRuntime(final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
return new InternalH2AsyncExecRuntime(LOG, connPool, pushHandlerFactory, maxQueuedRequests, queuedRequests);
return new InternalH2AsyncExecRuntime(LOG, connPool, pushHandlerFactory, executionQueue);
}

@Override
HttpRoute determineRoute(
final HttpHost httpHost,
final HttpRequest request,
final HttpClientContext clientContext) throws HttpException {
HttpRoute determineRoute(final HttpHost httpHost, final HttpRequest request, final HttpClientContext clientContext) throws HttpException {
final HttpRoute route = routePlanner.determineRoute(httpHost, request, clientContext);
if (route.isTunnelled()) {
throw new HttpException("HTTP/2 tunneling not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
package org.apache.hc.client5.http.impl.async;

import java.io.InterruptedIOException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hc.client5.http.EndpointInfo;
Expand Down Expand Up @@ -63,30 +61,27 @@ class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
private final InternalH2ConnPool connPool;
private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
private final AtomicReference<Endpoint> sessionRef;
private final int maxQueued;
private final AtomicInteger sharedQueued;
private final SharedRequestExecutionQueue executionQueue;
private volatile boolean reusable;

InternalH2AsyncExecRuntime(
final Logger log,
final InternalH2ConnPool connPool,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
this(log, connPool, pushHandlerFactory, -1, null);
this(log, connPool, pushHandlerFactory, null);
}

InternalH2AsyncExecRuntime(
final Logger log,
final InternalH2ConnPool connPool,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final int maxQueued,
final AtomicInteger sharedQueued) {
final SharedRequestExecutionQueue executionQueue) {
super();
this.log = log;
this.connPool = connPool;
this.pushHandlerFactory = pushHandlerFactory;
this.sessionRef = new AtomicReference<>();
this.maxQueued = maxQueued;
this.sharedQueued = sharedQueued;
this.executionQueue = executionQueue;
}

@Override
Expand Down Expand Up @@ -179,7 +174,6 @@ public boolean isEndpointConnected() {
return endpoint != null && endpoint.session.isOpen();
}


Endpoint ensureValid() {
final Endpoint endpoint = sessionRef.get();
if (endpoint == null) {
Expand Down Expand Up @@ -261,49 +255,55 @@ public EndpointInfo getEndpointInfo() {
return null;
}

private boolean tryAcquireSlot() {
if (sharedQueued == null || maxQueued <= 0) {
return true;
@Override
public Cancellable execute(
final String id,
final AsyncClientExchangeHandler exchangeHandler,
final HttpClientContext context) {

final Endpoint endpoint = ensureValid();
final ComplexCancellable complexCancellable = new ComplexCancellable();

if (executionQueue == null) {
startExecution(id, endpoint, exchangeHandler, context, complexCancellable);
return complexCancellable;
}
for (;;) {
final int q = sharedQueued.get();
if (q >= maxQueued) {
return false;
}
if (sharedQueued.compareAndSet(q, q + 1)) {

final Cancellable queued = executionQueue.enqueue(
() -> {
final AsyncClientExchangeHandler wrapped =
new ReleasingAsyncClientExchangeHandler(exchangeHandler, executionQueue::completed);
try {
startExecution(id, endpoint, wrapped, context, complexCancellable);
} catch (final RuntimeException ex) {
wrapped.failed(ex);
}
},
exchangeHandler::cancel);

return () -> {
if (queued.cancel()) {
return true;
}
}
}

private void releaseSlot() {
if (sharedQueued != null && maxQueued > 0) {
sharedQueued.decrementAndGet();
}
return complexCancellable.cancel();
};
}

@Override
public Cancellable execute(
private void startExecution(
final String id,
final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
final Endpoint endpoint = ensureValid();
if (!tryAcquireSlot()) {
exchangeHandler.failed(new RejectedExecutionException(
"Execution pipeline queue limit reached (max=" + maxQueued + ")"));
return Operations.nonCancellable();
}
final AsyncClientExchangeHandler actual = sharedQueued != null
? new ReleasingAsyncClientExchangeHandler(exchangeHandler, this::releaseSlot)
: exchangeHandler;
final ComplexCancellable complexCancellable = new ComplexCancellable();
final Endpoint endpoint,
final AsyncClientExchangeHandler exchangeHandler,
final HttpClientContext context,
final ComplexCancellable complexCancellable) {

final IOSession session = endpoint.session;
if (session.isOpen()) {
if (log.isDebugEnabled()) {
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
}
context.setProtocolVersion(HttpVersion.HTTP_2);
session.enqueue(
new RequestExecutionCommand(actual, pushHandlerFactory, complexCancellable, context),
new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
Command.Priority.NORMAL);
} else {
final HttpRoute route = endpoint.route;
Expand All @@ -321,23 +321,22 @@ public void completed(final IOSession ioSession) {
}
context.setProtocolVersion(HttpVersion.HTTP_2);
ioSession.enqueue(
new RequestExecutionCommand(actual, pushHandlerFactory, complexCancellable, context),
new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
Command.Priority.NORMAL);
}

@Override
public void failed(final Exception ex) {
actual.failed(ex);
exchangeHandler.failed(ex);
}

@Override
public void cancelled() {
actual.failed(new InterruptedIOException());
exchangeHandler.failed(new InterruptedIOException());
}

});
}
return complexCancellable;
}

@Override
Expand Down Expand Up @@ -369,7 +368,7 @@ public String getId() {

@Override
public AsyncExecRuntime fork() {
return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory, maxQueued, sharedQueued);
return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory, executionQueue);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.apache.hc.client5.http.HttpRoute;
Expand Down Expand Up @@ -76,8 +75,12 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie
private final AsyncClientConnectionManager manager;
private final HttpRoutePlanner routePlanner;
private final TlsConfig tlsConfig;
private final int maxQueuedRequests;
private final AtomicInteger queuedCounter;

/**
* One shared FIFO queue per client instance.
* null means "unlimited" / no throttling.
*/
private final SharedRequestExecutionQueue executionQueue;

InternalHttpAsyncClient(
final DefaultConnectingIOReactor ioReactor,
Expand All @@ -101,13 +104,12 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie
this.manager = manager;
this.routePlanner = routePlanner;
this.tlsConfig = tlsConfig;
this.maxQueuedRequests = maxQueuedRequests;
this.queuedCounter = maxQueuedRequests > 0 ? new AtomicInteger(0) : null;
this.executionQueue = maxQueuedRequests > 0 ? new SharedRequestExecutionQueue(maxQueuedRequests) : null;
}

@Override
AsyncExecRuntime createAsyncExecRuntime(final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
return new InternalHttpAsyncExecRuntime(LOG, manager, getConnectionInitiator(), pushHandlerFactory, tlsConfig, maxQueuedRequests, queuedCounter);
return new InternalHttpAsyncExecRuntime(LOG, manager, getConnectionInitiator(), pushHandlerFactory, tlsConfig, executionQueue);
}

@Override
Expand Down
Loading