diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/OffLockDisposalCallback.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/OffLockDisposalCallback.java
new file mode 100644
index 0000000000..4d079eaa12
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/OffLockDisposalCallback.java
@@ -0,0 +1,61 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.impl.io;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.io.ModalCloseable;
+import org.apache.hc.core5.pool.DisposalCallback;
+
+@Internal
+final class OffLockDisposalCallback implements DisposalCallback {
+
+ private final DisposalCallback delegate;
+ private final Queue gracefulQueue = new ConcurrentLinkedQueue<>();
+
+ OffLockDisposalCallback(final DisposalCallback delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void execute(final T closeable, final CloseMode mode) {
+ if (mode == CloseMode.IMMEDIATE) {
+ delegate.execute(closeable, CloseMode.IMMEDIATE);
+ } else {
+ gracefulQueue.offer(closeable);
+ }
+ }
+
+ void drain() {
+ for (T c; (c = gracefulQueue.poll()) != null; ) {
+ delegate.execute(c, CloseMode.GRACEFUL);
+ }
+ }
+}
\ No newline at end of file
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java
index 4d58957b8e..2977f5f196 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java
@@ -72,6 +72,7 @@
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.pool.ConnPoolControl;
import org.apache.hc.core5.pool.DefaultDisposalCallback;
+import org.apache.hc.core5.pool.DisposalCallback;
import org.apache.hc.core5.pool.LaxConnPool;
import org.apache.hc.core5.pool.ManagedConnPool;
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
@@ -107,10 +108,13 @@
*/
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public class PoolingHttpClientConnectionManager
- implements HttpClientConnectionManager, ConnPoolControl {
+ implements HttpClientConnectionManager, ConnPoolControl {
private static final Logger LOG = LoggerFactory.getLogger(PoolingHttpClientConnectionManager.class);
+ private final DisposalCallback defaultDisposal;
+ private final OffLockDisposalCallback offLockDisposer;
+
public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
@@ -216,8 +220,26 @@ public PoolingHttpClientConnectionManager(
final PoolReusePolicy poolReusePolicy,
final TimeValue timeToLive,
final HttpConnectionFactory connFactory) {
+ this(httpClientConnectionOperator,poolConcurrencyPolicy,poolReusePolicy,timeToLive,connFactory,false);
+
+ }
+
+ @Internal
+ public PoolingHttpClientConnectionManager(
+ final HttpClientConnectionOperator httpClientConnectionOperator,
+ final PoolConcurrencyPolicy poolConcurrencyPolicy,
+ final PoolReusePolicy poolReusePolicy,
+ final TimeValue timeToLive,
+ final HttpConnectionFactory connFactory,
+ final boolean offLockDisposalEnabled) {
super();
this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
+
+ this.defaultDisposal = new DefaultDisposalCallback<>();
+ this.offLockDisposer = offLockDisposalEnabled ? new OffLockDisposalCallback<>(this.defaultDisposal) : null;
+ final DisposalCallback callbackForPool = offLockDisposalEnabled ? this.offLockDisposer : this.defaultDisposal;
+
+
switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
case STRICT:
this.pool = new StrictConnPool(
@@ -225,7 +247,7 @@ public PoolingHttpClientConnectionManager(
DEFAULT_MAX_TOTAL_CONNECTIONS,
timeToLive,
poolReusePolicy,
- new DefaultDisposalCallback<>(),
+ callbackForPool,
null) {
@Override
@@ -240,6 +262,7 @@ public void closeExpired() {
DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
timeToLive,
poolReusePolicy,
+ callbackForPool,
null) {
@Override
@@ -266,6 +289,8 @@ protected PoolingHttpClientConnectionManager(
this.pool = Args.notNull(pool, "Connection pool");
this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
this.closed = new AtomicBoolean(false);
+ this.defaultDisposal = null;
+ this.offLockDisposer = null;
}
@Override
@@ -280,6 +305,7 @@ public void close(final CloseMode closeMode) {
LOG.debug("Shutdown connection pool {}", closeMode);
}
this.pool.close(closeMode);
+ drainDisposals();
LOG.debug("Connection pool shut down");
}
}
@@ -386,6 +412,10 @@ public ConnectionEndpoint get(
}
}
}
+
+ // Single drain point under the lease lock.
+ drainDisposals();
+
final ManagedHttpClientConnection conn = poolEntry.getConnection();
if (conn != null) {
conn.activate();
@@ -472,6 +502,7 @@ public void release(final ConnectionEndpoint endpoint, final Object state, final
if (LOG.isDebugEnabled()) {
LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
}
+ drainDisposals();
}
}
@@ -541,6 +572,7 @@ public void closeIdle(final TimeValue idleTime) {
return;
}
this.pool.closeIdle(idleTime);
+ drainDisposals();
}
@Override
@@ -550,6 +582,7 @@ public void closeExpired() {
}
LOG.debug("Closing expired connections");
this.pool.closeExpired();
+ drainDisposals();
}
@Override
@@ -825,16 +858,17 @@ public HttpConnection get() {
}
/**
- * Method that can be called to determine whether the connection manager has been shut down and
- * is closed or not.
+ * Returns whether this connection manager has been shut down.
*
- * @return {@code true} if the connection manager has been shut down and is closed, otherwise
- * return {@code false}.
* @since 5.4
*/
public boolean isClosed() {
return this.closed.get();
}
-
+ private void drainDisposals() {
+ if (offLockDisposer != null) {
+ offLockDisposer.drain();
+ }
+ }
}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManagerBuilder.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManagerBuilder.java
index d76c2720bc..2f6c4aff93 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManagerBuilder.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManagerBuilder.java
@@ -38,6 +38,7 @@
import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
import org.apache.hc.client5.http.ssl.TlsSocketStrategy;
+import org.apache.hc.core5.annotation.Experimental;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.function.Resolver;
import org.apache.hc.core5.http.HttpHost;
@@ -92,6 +93,8 @@ public class PoolingHttpClientConnectionManagerBuilder {
private int maxConnTotal;
private int maxConnPerRoute;
+ private boolean offLockDisposalEnabled;
+
public static PoolingHttpClientConnectionManagerBuilder create() {
return new PoolingHttpClientConnectionManagerBuilder();
}
@@ -304,6 +307,16 @@ public final PoolingHttpClientConnectionManagerBuilder useSystemProperties() {
return this;
}
+ /**
+ * Enable/disable off-lock disposal.
+ * @since 5.6
+ */
+ @Experimental
+ public final PoolingHttpClientConnectionManagerBuilder setOffLockDisposalEnabled(final boolean enabled) {
+ this.offLockDisposalEnabled = enabled;
+ return this;
+ }
+
@Internal
protected HttpClientConnectionOperator createConnectionOperator(
final SchemePortResolver schemePortResolver,
@@ -332,7 +345,8 @@ public PoolingHttpClientConnectionManager build() {
poolConcurrencyPolicy,
poolReusePolicy,
null,
- connectionFactory);
+ connectionFactory,
+ offLockDisposalEnabled);
poolingmgr.setSocketConfigResolver(socketConfigResolver);
poolingmgr.setConnectionConfigResolver(connectionConfigResolver);
poolingmgr.setTlsConfigResolver(tlsConfigResolver);
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/NoopOperator.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/NoopOperator.java
new file mode 100644
index 0000000000..ba541b54ce
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/NoopOperator.java
@@ -0,0 +1,59 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.impl.io;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
+import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.io.SocketConfig;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.TimeValue;
+
+public final class NoopOperator implements HttpClientConnectionOperator {
+
+ @Override
+ public void connect(
+ final ManagedHttpClientConnection conn,
+ final HttpHost host,
+ final InetSocketAddress localAddress,
+ final TimeValue connectTimeout,
+ final SocketConfig socketConfig,
+ final HttpContext context) throws IOException {
+ // no-op for tests
+ }
+
+ @Override
+ public void upgrade(
+ final ManagedHttpClientConnection conn,
+ final HttpHost host,
+ final HttpContext context) throws IOException {
+ // no-op for tests
+ }
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManagerOffLockDisposal.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManagerOffLockDisposal.java
new file mode 100644
index 0000000000..bae2851428
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManagerOffLockDisposal.java
@@ -0,0 +1,309 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.impl.io;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLSession;
+
+import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.io.ConnectionEndpoint;
+import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
+import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.hc.core5.http.ClassicHttpResponse;
+import org.apache.hc.core5.http.EndpointDetails;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.io.HttpConnectionFactory;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
+import org.apache.hc.core5.pool.PoolReusePolicy;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
+import org.junit.jupiter.api.Test;
+
+class TestPoolingHttpClientConnectionManagerOffLockDisposal {
+
+ // Simulates slow close only for GRACEFUL
+ static final class SleeperConnection implements ManagedHttpClientConnection {
+ private volatile boolean open = true;
+ private volatile Timeout soTimeout = Timeout.DISABLED;
+ private final long sleepMillis;
+
+ SleeperConnection(final long sleepMillis) {
+ this.sleepMillis = sleepMillis;
+ }
+
+ @Override
+ public void bind(final Socket socket) {
+ }
+
+ @Override
+ public void close(final CloseMode closeMode) {
+ try {
+ if (closeMode == CloseMode.GRACEFUL) {
+ Thread.sleep(sleepMillis);
+ }
+ } catch (final InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ } finally {
+ open = false;
+ }
+ }
+
+ @Override
+ public void close() {
+ close(CloseMode.GRACEFUL);
+ }
+
+ @Override
+ public EndpointDetails getEndpointDetails() {
+ return null;
+ }
+
+ @Override
+ public SocketAddress getLocalAddress() {
+ return null;
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return null;
+ }
+
+ @Override
+ public Socket getSocket() {
+ return null;
+ }
+
+ @Override
+ public SSLSession getSSLSession() {
+ return null;
+ }
+
+ @Override
+ public void passivate() {
+ }
+
+ @Override
+ public void activate() {
+ }
+
+ @Override
+ public boolean isOpen() {
+ return open;
+ }
+
+ @Override
+ public boolean isConsistent() {
+ return true;
+ }
+
+ @Override
+ public boolean isDataAvailable(final Timeout timeout) {
+ return false;
+ }
+
+ @Override
+ public boolean isStale() {
+ return false;
+ }
+
+ @Override
+ public void setSocketTimeout(final Timeout timeout) {
+ this.soTimeout = timeout;
+ }
+
+ @Override
+ public Timeout getSocketTimeout() {
+ return soTimeout;
+ }
+
+ @Override
+ public void sendRequestHeader(final ClassicHttpRequest request) {
+ }
+
+ @Override
+ public void sendRequestEntity(final ClassicHttpRequest request) {
+ }
+
+ @Override
+ public void flush() {
+ }
+
+ @Override
+ public ClassicHttpResponse receiveResponseHeader() {
+ return null;
+ }
+
+ @Override
+ public void receiveResponseEntity(final ClassicHttpResponse response) {
+ }
+
+ @Override
+ public void terminateRequest(final ClassicHttpRequest request) {
+ }
+
+ @Override
+ public ProtocolVersion getProtocolVersion() {
+ return null;
+ }
+ }
+
+ private static HttpConnectionFactory sleeperFactory(final long ms) {
+ return socket -> new SleeperConnection(ms);
+ }
+
+ private static ConnectionEndpoint lease(final PoolingHttpClientConnectionManager mgr,
+ final String id, final HttpRoute route, final Object state,
+ final long sec) throws Exception {
+ return mgr.lease(id, route, Timeout.ofSeconds((int) sec), state).get(Timeout.ofSeconds((int) sec));
+ }
+
+ // Measure only the lease latency; release happens outside this window
+ private static long leaseAndMeasure(final PoolingHttpClientConnectionManager mgr,
+ final String id, final HttpRoute route, final Object state,
+ final long sec) throws Exception {
+ final long start = System.nanoTime();
+ final ConnectionEndpoint ep = lease(mgr, id, route, state, sec);
+ final long elapsed = (System.nanoTime() - start) / 1_000_000L;
+ mgr.release(ep, state, TimeValue.ofSeconds(30)); // keep-alive, goes back to AVAILABLE
+ return elapsed;
+ }
+
+ private static PoolingHttpClientConnectionManager newMgrStrict(final long sleeperMs) {
+ return PoolingHttpClientConnectionManagerBuilder.create()
+ .setOffLockDisposalEnabled(true)
+ .setConnPoolPolicy(PoolReusePolicy.LIFO)
+ .setPoolConcurrencyPolicy(PoolConcurrencyPolicy.STRICT)
+ .setConnectionFactory(sleeperFactory(sleeperMs))
+ .build();
+ }
+
+ private static PoolingHttpClientConnectionManager newMgrLax(final long sleeperMs) {
+ return PoolingHttpClientConnectionManagerBuilder.create()
+ .setOffLockDisposalEnabled(true)
+ .setConnPoolPolicy(PoolReusePolicy.LIFO)
+ .setPoolConcurrencyPolicy(PoolConcurrencyPolicy.LAX)
+ .setConnectionFactory(sleeperFactory(sleeperMs))
+ .build();
+ }
+
+ @Test
+ void strictEviction_offLock_otherThreadLeasesFast() throws Exception {
+ final PoolingHttpClientConnectionManager mgr = newMgrStrict(1200);
+
+ final HttpRoute rA = new HttpRoute(new HttpHost(URIScheme.HTTP.id, "a.example", 80));
+ final HttpRoute rB = new HttpRoute(new HttpHost(URIScheme.HTTP.id, "b.example", 80));
+ final HttpRoute rC = new HttpRoute(new HttpHost(URIScheme.HTTP.id, "c.example", 80));
+
+ mgr.setMaxTotal(2);
+ mgr.setMaxPerRoute(rA, 1);
+ mgr.setMaxPerRoute(rB, 1);
+ mgr.setMaxPerRoute(rC, 1);
+
+ final ConnectionEndpoint epA0 = lease(mgr, "seedA", rA, null, 2);
+ mgr.release(epA0, null, TimeValue.ofSeconds(30));
+ final ConnectionEndpoint epB0 = lease(mgr, "seedB", rB, null, 2);
+ mgr.release(epB0, null, TimeValue.ofSeconds(30));
+
+ final ExecutorService es = Executors.newFixedThreadPool(2);
+ try {
+ final Callable t1Lease = () -> {
+ final long start = System.nanoTime();
+ final ConnectionEndpoint epC = lease(mgr, "t1", rC, null, 3);
+ mgr.release(epC, null, TimeValue.ofSeconds(5));
+ return (System.nanoTime() - start) / 1_000_000L;
+ };
+
+ final Callable t2Lease = () -> {
+ // small stagger so we overlap with t1’s drain window
+ Thread.sleep(200);
+ return leaseAndMeasure(mgr, "t2", rA, null, 2);
+ };
+
+ final long t1LeaseMs = es.submit(t1Lease).get(6, TimeUnit.SECONDS);
+ final long t2LeaseMs = es.submit(t2Lease).get(6, TimeUnit.SECONDS);
+
+ assertTrue(t1LeaseMs >= 900L, "T1 lease should include slow drain: " + t1LeaseMs + "ms");
+
+ assertTrue(t2LeaseMs < 1900L, "T2 lease should complete without timing out: " + t2LeaseMs + "ms");
+ } finally {
+ es.shutdownNow();
+ mgr.close(CloseMode.IMMEDIATE);
+ }
+ }
+
+
+ @Test
+ void leaseNotBlocked_LAX_stateMismatchDiscard_offLockDisposal() throws Exception {
+ final PoolingHttpClientConnectionManager mgr = newMgrLax(1200);
+
+ final HttpRoute route = new HttpRoute(new HttpHost(URIScheme.HTTP.id, "lax.example", 80));
+ mgr.setMaxTotal(2);
+ mgr.setMaxPerRoute(route, 2);
+
+ final ConnectionEndpoint epA = lease(mgr, "tA", route, "A", 2);
+ mgr.release(epA, "A", TimeValue.ofSeconds(30));
+
+ final ExecutorService es = Executors.newFixedThreadPool(2);
+ try {
+ final Callable t1Lease = () -> {
+ final long start = System.nanoTime();
+ final ConnectionEndpoint epB = lease(mgr, "tB", route, "B", 3); // state mismatch discard
+ // drainDisposals() runs in finally inside LeaseRequest#get → this lease measures the drain
+ mgr.release(epB, "B", TimeValue.ofSeconds(5));
+ return (System.nanoTime() - start) / 1_000_000L;
+ };
+
+ // T2: concurrent lease "B" should be fast
+ final Callable t2Lease = () -> {
+ Thread.sleep(50);
+ return leaseAndMeasure(mgr, "t2", route, "B", 2);
+ };
+
+ final long t1LeaseMs = es.submit(t1Lease).get(6, TimeUnit.SECONDS);
+ final long t2LeaseMs = es.submit(t2Lease).get(6, TimeUnit.SECONDS);
+
+ // With drain in LeaseRequest#get, T1 lease is slow; T2 remains fast.
+ assertTrue(t1LeaseMs >= 1000L, "T1 lease should include slow drain: " + t1LeaseMs + "ms");
+ assertTrue(t2LeaseMs < 300L, "Lease blocked by in-thread discard: " + t2LeaseMs + "ms");
+
+ } finally {
+ es.shutdownNow();
+ mgr.close(CloseMode.IMMEDIATE);
+ }
+ }
+}