diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/OffLockDisposalCallback.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/OffLockDisposalCallback.java new file mode 100644 index 0000000000..4d079eaa12 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/OffLockDisposalCallback.java @@ -0,0 +1,61 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.io; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.io.ModalCloseable; +import org.apache.hc.core5.pool.DisposalCallback; + +@Internal +final class OffLockDisposalCallback implements DisposalCallback { + + private final DisposalCallback delegate; + private final Queue gracefulQueue = new ConcurrentLinkedQueue<>(); + + OffLockDisposalCallback(final DisposalCallback delegate) { + this.delegate = delegate; + } + + @Override + public void execute(final T closeable, final CloseMode mode) { + if (mode == CloseMode.IMMEDIATE) { + delegate.execute(closeable, CloseMode.IMMEDIATE); + } else { + gracefulQueue.offer(closeable); + } + } + + void drain() { + for (T c; (c = gracefulQueue.poll()) != null; ) { + delegate.execute(c, CloseMode.GRACEFUL); + } + } +} \ No newline at end of file diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java index 4d58957b8e..2977f5f196 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java @@ -72,6 +72,7 @@ import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.pool.ConnPoolControl; import org.apache.hc.core5.pool.DefaultDisposalCallback; +import org.apache.hc.core5.pool.DisposalCallback; import org.apache.hc.core5.pool.LaxConnPool; import org.apache.hc.core5.pool.ManagedConnPool; import org.apache.hc.core5.pool.PoolConcurrencyPolicy; @@ -107,10 +108,13 @@ */ @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL) public class PoolingHttpClientConnectionManager - implements HttpClientConnectionManager, ConnPoolControl { + implements HttpClientConnectionManager, ConnPoolControl { private static final Logger LOG = LoggerFactory.getLogger(PoolingHttpClientConnectionManager.class); + private final DisposalCallback defaultDisposal; + private final OffLockDisposalCallback offLockDisposer; + public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25; public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5; @@ -216,8 +220,26 @@ public PoolingHttpClientConnectionManager( final PoolReusePolicy poolReusePolicy, final TimeValue timeToLive, final HttpConnectionFactory connFactory) { + this(httpClientConnectionOperator,poolConcurrencyPolicy,poolReusePolicy,timeToLive,connFactory,false); + + } + + @Internal + public PoolingHttpClientConnectionManager( + final HttpClientConnectionOperator httpClientConnectionOperator, + final PoolConcurrencyPolicy poolConcurrencyPolicy, + final PoolReusePolicy poolReusePolicy, + final TimeValue timeToLive, + final HttpConnectionFactory connFactory, + final boolean offLockDisposalEnabled) { super(); this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator"); + + this.defaultDisposal = new DefaultDisposalCallback<>(); + this.offLockDisposer = offLockDisposalEnabled ? new OffLockDisposalCallback<>(this.defaultDisposal) : null; + final DisposalCallback callbackForPool = offLockDisposalEnabled ? this.offLockDisposer : this.defaultDisposal; + + switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) { case STRICT: this.pool = new StrictConnPool( @@ -225,7 +247,7 @@ public PoolingHttpClientConnectionManager( DEFAULT_MAX_TOTAL_CONNECTIONS, timeToLive, poolReusePolicy, - new DefaultDisposalCallback<>(), + callbackForPool, null) { @Override @@ -240,6 +262,7 @@ public void closeExpired() { DEFAULT_MAX_CONNECTIONS_PER_ROUTE, timeToLive, poolReusePolicy, + callbackForPool, null) { @Override @@ -266,6 +289,8 @@ protected PoolingHttpClientConnectionManager( this.pool = Args.notNull(pool, "Connection pool"); this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE; this.closed = new AtomicBoolean(false); + this.defaultDisposal = null; + this.offLockDisposer = null; } @Override @@ -280,6 +305,7 @@ public void close(final CloseMode closeMode) { LOG.debug("Shutdown connection pool {}", closeMode); } this.pool.close(closeMode); + drainDisposals(); LOG.debug("Connection pool shut down"); } } @@ -386,6 +412,10 @@ public ConnectionEndpoint get( } } } + + // Single drain point under the lease lock. + drainDisposals(); + final ManagedHttpClientConnection conn = poolEntry.getConnection(); if (conn != null) { conn.activate(); @@ -472,6 +502,7 @@ public void release(final ConnectionEndpoint endpoint, final Object state, final if (LOG.isDebugEnabled()) { LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool)); } + drainDisposals(); } } @@ -541,6 +572,7 @@ public void closeIdle(final TimeValue idleTime) { return; } this.pool.closeIdle(idleTime); + drainDisposals(); } @Override @@ -550,6 +582,7 @@ public void closeExpired() { } LOG.debug("Closing expired connections"); this.pool.closeExpired(); + drainDisposals(); } @Override @@ -825,16 +858,17 @@ public HttpConnection get() { } /** - * Method that can be called to determine whether the connection manager has been shut down and - * is closed or not. + * Returns whether this connection manager has been shut down. * - * @return {@code true} if the connection manager has been shut down and is closed, otherwise - * return {@code false}. * @since 5.4 */ public boolean isClosed() { return this.closed.get(); } - + private void drainDisposals() { + if (offLockDisposer != null) { + offLockDisposer.drain(); + } + } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManagerBuilder.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManagerBuilder.java index d76c2720bc..2f6c4aff93 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManagerBuilder.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManagerBuilder.java @@ -38,6 +38,7 @@ import org.apache.hc.client5.http.io.ManagedHttpClientConnection; import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; import org.apache.hc.client5.http.ssl.TlsSocketStrategy; +import org.apache.hc.core5.annotation.Experimental; import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.function.Resolver; import org.apache.hc.core5.http.HttpHost; @@ -92,6 +93,8 @@ public class PoolingHttpClientConnectionManagerBuilder { private int maxConnTotal; private int maxConnPerRoute; + private boolean offLockDisposalEnabled; + public static PoolingHttpClientConnectionManagerBuilder create() { return new PoolingHttpClientConnectionManagerBuilder(); } @@ -304,6 +307,16 @@ public final PoolingHttpClientConnectionManagerBuilder useSystemProperties() { return this; } + /** + * Enable/disable off-lock disposal. + * @since 5.6 + */ + @Experimental + public final PoolingHttpClientConnectionManagerBuilder setOffLockDisposalEnabled(final boolean enabled) { + this.offLockDisposalEnabled = enabled; + return this; + } + @Internal protected HttpClientConnectionOperator createConnectionOperator( final SchemePortResolver schemePortResolver, @@ -332,7 +345,8 @@ public PoolingHttpClientConnectionManager build() { poolConcurrencyPolicy, poolReusePolicy, null, - connectionFactory); + connectionFactory, + offLockDisposalEnabled); poolingmgr.setSocketConfigResolver(socketConfigResolver); poolingmgr.setConnectionConfigResolver(connectionConfigResolver); poolingmgr.setTlsConfigResolver(tlsConfigResolver); diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/NoopOperator.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/NoopOperator.java new file mode 100644 index 0000000000..ba541b54ce --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/NoopOperator.java @@ -0,0 +1,59 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.io; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hc.client5.http.io.HttpClientConnectionOperator; +import org.apache.hc.client5.http.io.ManagedHttpClientConnection; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.io.SocketConfig; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.util.TimeValue; + +public final class NoopOperator implements HttpClientConnectionOperator { + + @Override + public void connect( + final ManagedHttpClientConnection conn, + final HttpHost host, + final InetSocketAddress localAddress, + final TimeValue connectTimeout, + final SocketConfig socketConfig, + final HttpContext context) throws IOException { + // no-op for tests + } + + @Override + public void upgrade( + final ManagedHttpClientConnection conn, + final HttpHost host, + final HttpContext context) throws IOException { + // no-op for tests + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManagerOffLockDisposal.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManagerOffLockDisposal.java new file mode 100644 index 0000000000..bae2851428 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManagerOffLockDisposal.java @@ -0,0 +1,309 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.io; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.Socket; +import java.net.SocketAddress; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLSession; + +import org.apache.hc.client5.http.HttpRoute; +import org.apache.hc.client5.http.io.ConnectionEndpoint; +import org.apache.hc.client5.http.io.ManagedHttpClientConnection; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.EndpointDetails; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.ProtocolVersion; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.io.HttpConnectionFactory; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.pool.PoolConcurrencyPolicy; +import org.apache.hc.core5.pool.PoolReusePolicy; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Test; + +class TestPoolingHttpClientConnectionManagerOffLockDisposal { + + // Simulates slow close only for GRACEFUL + static final class SleeperConnection implements ManagedHttpClientConnection { + private volatile boolean open = true; + private volatile Timeout soTimeout = Timeout.DISABLED; + private final long sleepMillis; + + SleeperConnection(final long sleepMillis) { + this.sleepMillis = sleepMillis; + } + + @Override + public void bind(final Socket socket) { + } + + @Override + public void close(final CloseMode closeMode) { + try { + if (closeMode == CloseMode.GRACEFUL) { + Thread.sleep(sleepMillis); + } + } catch (final InterruptedException ignore) { + Thread.currentThread().interrupt(); + } finally { + open = false; + } + } + + @Override + public void close() { + close(CloseMode.GRACEFUL); + } + + @Override + public EndpointDetails getEndpointDetails() { + return null; + } + + @Override + public SocketAddress getLocalAddress() { + return null; + } + + @Override + public SocketAddress getRemoteAddress() { + return null; + } + + @Override + public Socket getSocket() { + return null; + } + + @Override + public SSLSession getSSLSession() { + return null; + } + + @Override + public void passivate() { + } + + @Override + public void activate() { + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public boolean isConsistent() { + return true; + } + + @Override + public boolean isDataAvailable(final Timeout timeout) { + return false; + } + + @Override + public boolean isStale() { + return false; + } + + @Override + public void setSocketTimeout(final Timeout timeout) { + this.soTimeout = timeout; + } + + @Override + public Timeout getSocketTimeout() { + return soTimeout; + } + + @Override + public void sendRequestHeader(final ClassicHttpRequest request) { + } + + @Override + public void sendRequestEntity(final ClassicHttpRequest request) { + } + + @Override + public void flush() { + } + + @Override + public ClassicHttpResponse receiveResponseHeader() { + return null; + } + + @Override + public void receiveResponseEntity(final ClassicHttpResponse response) { + } + + @Override + public void terminateRequest(final ClassicHttpRequest request) { + } + + @Override + public ProtocolVersion getProtocolVersion() { + return null; + } + } + + private static HttpConnectionFactory sleeperFactory(final long ms) { + return socket -> new SleeperConnection(ms); + } + + private static ConnectionEndpoint lease(final PoolingHttpClientConnectionManager mgr, + final String id, final HttpRoute route, final Object state, + final long sec) throws Exception { + return mgr.lease(id, route, Timeout.ofSeconds((int) sec), state).get(Timeout.ofSeconds((int) sec)); + } + + // Measure only the lease latency; release happens outside this window + private static long leaseAndMeasure(final PoolingHttpClientConnectionManager mgr, + final String id, final HttpRoute route, final Object state, + final long sec) throws Exception { + final long start = System.nanoTime(); + final ConnectionEndpoint ep = lease(mgr, id, route, state, sec); + final long elapsed = (System.nanoTime() - start) / 1_000_000L; + mgr.release(ep, state, TimeValue.ofSeconds(30)); // keep-alive, goes back to AVAILABLE + return elapsed; + } + + private static PoolingHttpClientConnectionManager newMgrStrict(final long sleeperMs) { + return PoolingHttpClientConnectionManagerBuilder.create() + .setOffLockDisposalEnabled(true) + .setConnPoolPolicy(PoolReusePolicy.LIFO) + .setPoolConcurrencyPolicy(PoolConcurrencyPolicy.STRICT) + .setConnectionFactory(sleeperFactory(sleeperMs)) + .build(); + } + + private static PoolingHttpClientConnectionManager newMgrLax(final long sleeperMs) { + return PoolingHttpClientConnectionManagerBuilder.create() + .setOffLockDisposalEnabled(true) + .setConnPoolPolicy(PoolReusePolicy.LIFO) + .setPoolConcurrencyPolicy(PoolConcurrencyPolicy.LAX) + .setConnectionFactory(sleeperFactory(sleeperMs)) + .build(); + } + + @Test + void strictEviction_offLock_otherThreadLeasesFast() throws Exception { + final PoolingHttpClientConnectionManager mgr = newMgrStrict(1200); + + final HttpRoute rA = new HttpRoute(new HttpHost(URIScheme.HTTP.id, "a.example", 80)); + final HttpRoute rB = new HttpRoute(new HttpHost(URIScheme.HTTP.id, "b.example", 80)); + final HttpRoute rC = new HttpRoute(new HttpHost(URIScheme.HTTP.id, "c.example", 80)); + + mgr.setMaxTotal(2); + mgr.setMaxPerRoute(rA, 1); + mgr.setMaxPerRoute(rB, 1); + mgr.setMaxPerRoute(rC, 1); + + final ConnectionEndpoint epA0 = lease(mgr, "seedA", rA, null, 2); + mgr.release(epA0, null, TimeValue.ofSeconds(30)); + final ConnectionEndpoint epB0 = lease(mgr, "seedB", rB, null, 2); + mgr.release(epB0, null, TimeValue.ofSeconds(30)); + + final ExecutorService es = Executors.newFixedThreadPool(2); + try { + final Callable t1Lease = () -> { + final long start = System.nanoTime(); + final ConnectionEndpoint epC = lease(mgr, "t1", rC, null, 3); + mgr.release(epC, null, TimeValue.ofSeconds(5)); + return (System.nanoTime() - start) / 1_000_000L; + }; + + final Callable t2Lease = () -> { + // small stagger so we overlap with t1’s drain window + Thread.sleep(200); + return leaseAndMeasure(mgr, "t2", rA, null, 2); + }; + + final long t1LeaseMs = es.submit(t1Lease).get(6, TimeUnit.SECONDS); + final long t2LeaseMs = es.submit(t2Lease).get(6, TimeUnit.SECONDS); + + assertTrue(t1LeaseMs >= 900L, "T1 lease should include slow drain: " + t1LeaseMs + "ms"); + + assertTrue(t2LeaseMs < 1900L, "T2 lease should complete without timing out: " + t2LeaseMs + "ms"); + } finally { + es.shutdownNow(); + mgr.close(CloseMode.IMMEDIATE); + } + } + + + @Test + void leaseNotBlocked_LAX_stateMismatchDiscard_offLockDisposal() throws Exception { + final PoolingHttpClientConnectionManager mgr = newMgrLax(1200); + + final HttpRoute route = new HttpRoute(new HttpHost(URIScheme.HTTP.id, "lax.example", 80)); + mgr.setMaxTotal(2); + mgr.setMaxPerRoute(route, 2); + + final ConnectionEndpoint epA = lease(mgr, "tA", route, "A", 2); + mgr.release(epA, "A", TimeValue.ofSeconds(30)); + + final ExecutorService es = Executors.newFixedThreadPool(2); + try { + final Callable t1Lease = () -> { + final long start = System.nanoTime(); + final ConnectionEndpoint epB = lease(mgr, "tB", route, "B", 3); // state mismatch discard + // drainDisposals() runs in finally inside LeaseRequest#get → this lease measures the drain + mgr.release(epB, "B", TimeValue.ofSeconds(5)); + return (System.nanoTime() - start) / 1_000_000L; + }; + + // T2: concurrent lease "B" should be fast + final Callable t2Lease = () -> { + Thread.sleep(50); + return leaseAndMeasure(mgr, "t2", route, "B", 2); + }; + + final long t1LeaseMs = es.submit(t1Lease).get(6, TimeUnit.SECONDS); + final long t2LeaseMs = es.submit(t2Lease).get(6, TimeUnit.SECONDS); + + // With drain in LeaseRequest#get, T1 lease is slow; T2 remains fast. + assertTrue(t1LeaseMs >= 1000L, "T1 lease should include slow drain: " + t1LeaseMs + "ms"); + assertTrue(t2LeaseMs < 300L, "Lease blocked by in-thread discard: " + t2LeaseMs + "ms"); + + } finally { + es.shutdownNow(); + mgr.close(CloseMode.IMMEDIATE); + } + } +}