diff --git a/.changes/next-release/bugfix-Apache5HTTPClient-9de458e.json b/.changes/next-release/bugfix-Apache5HTTPClient-9de458e.json
new file mode 100644
index 000000000000..01eb808d9d22
--- /dev/null
+++ b/.changes/next-release/bugfix-Apache5HTTPClient-9de458e.json
@@ -0,0 +1,6 @@
+{
+ "type": "bugfix",
+ "category": "Apache 5 HTTP Client",
+ "contributor": "",
+ "description": "Fixed a connection leak that could occur when the thread waiting to acquire a connection from the pool is interrupted. Fixes [#6786](https://github.com/aws/aws-sdk-java-v2/issues/6786)."
+}
diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/Apache5HttpClient.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/Apache5HttpClient.java
index ef4780866f36..6a4b738fdac7 100644
--- a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/Apache5HttpClient.java
+++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/Apache5HttpClient.java
@@ -50,7 +50,6 @@
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
-import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.impl.routing.DefaultRoutePlanner;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.client5.http.protocol.HttpClientContext;
@@ -92,6 +91,7 @@
import software.amazon.awssdk.http.apache5.internal.SdkProxyRoutePlanner;
import software.amazon.awssdk.http.apache5.internal.conn.ClientConnectionManagerFactory;
import software.amazon.awssdk.http.apache5.internal.conn.IdleConnectionReaper;
+import software.amazon.awssdk.http.apache5.internal.conn.SafePoolingHttpClientConnectionManagerBuilder;
import software.amazon.awssdk.http.apache5.internal.conn.SdkConnectionKeepAliveStrategy;
import software.amazon.awssdk.http.apache5.internal.conn.SdkTlsSocketFactory;
import software.amazon.awssdk.http.apache5.internal.impl.Apache5HttpRequestFactory;
@@ -755,8 +755,8 @@ public PoolingHttpClientConnectionManager create(Apache5HttpClient.DefaultBuilde
TlsSocketStrategy tlsStrategy = getPreferredTlsStrategy(configuration, standardOptions);
- PoolingHttpClientConnectionManagerBuilder builder =
- PoolingHttpClientConnectionManagerBuilder.create()
+ SafePoolingHttpClientConnectionManagerBuilder builder =
+ SafePoolingHttpClientConnectionManagerBuilder.create()
.setTlsSocketStrategy(tlsStrategy)
.setSchemePortResolver(DefaultSchemePortResolver.INSTANCE)
.setDnsResolver(configuration.dnsResolver);
diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManager.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManager.java
new file mode 100644
index 000000000000..7f2b52bde072
--- /dev/null
+++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManager.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.apache5.internal.conn;
+
+import org.apache.hc.client5.http.impl.io.ManagedHttpClientConnectionFactory;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
+import org.apache.hc.core5.pool.DefaultDisposalCallback;
+import org.apache.hc.core5.pool.PoolReusePolicy;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+
+/**
+ * Specialization of {@link PoolingHttpClientConnectionManager} to enable use of {@link SafeStrictConnPool} to prevent leaking
+ * connections when the thread waiting on the future is interrupted.
+ */
+@SdkInternalApi
+public final class SafePoolingHttpClientConnectionManager extends PoolingHttpClientConnectionManager {
+ public SafePoolingHttpClientConnectionManager(HttpClientConnectionOperator connectionOperator) {
+ super(connectionOperator,
+ new SafeStrictConnPool(
+ DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
+ DEFAULT_MAX_TOTAL_CONNECTIONS,
+ null,
+ PoolReusePolicy.LIFO,
+ new DefaultDisposalCallback<>(),
+ null
+ ),
+ ManagedHttpClientConnectionFactory.INSTANCE
+ );
+ }
+}
diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerBuilder.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerBuilder.java
new file mode 100644
index 000000000000..36373ce71290
--- /dev/null
+++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerBuilder.java
@@ -0,0 +1,235 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+package software.amazon.awssdk.http.apache5.internal.conn;
+
+import org.apache.hc.client5.http.DnsResolver;
+import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.SchemePortResolver;
+import org.apache.hc.client5.http.config.ConnectionConfig;
+import org.apache.hc.client5.http.config.TlsConfig;
+import org.apache.hc.client5.http.impl.io.DefaultHttpClientConnectionOperator;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
+import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
+import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
+import org.apache.hc.client5.http.ssl.TlsSocketStrategy;
+import org.apache.hc.core5.function.Resolver;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.config.RegistryBuilder;
+import org.apache.hc.core5.http.io.SocketConfig;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+
+/**
+ * This is a fork of {@link PoolingHttpClientConnectionManagerBuilder} from Apache 5. The purpose of this forked class is to
+ * enable usage of the {@link SafePoolingHttpClientConnectionManager} to enable the workaround for
+ * https://github.com/aws/aws-sdk-java-v2/issues/6786.
+ */
+// This a direct copy of PoolingHttpClientConnectionManagerBuilder with minor changes to remove methods we don't use and
+// updates to follow our style guide.
+@SdkInternalApi
+public final class SafePoolingHttpClientConnectionManagerBuilder {
+
+ private TlsSocketStrategy tlsSocketStrategy;
+ private SchemePortResolver schemePortResolver;
+ private DnsResolver dnsResolver;
+ private Resolver socketConfigResolver;
+ private Resolver connectionConfigResolver;
+ private Resolver tlsConfigResolver;
+
+ private int maxConnTotal;
+ private int maxConnPerRoute;
+
+ public static SafePoolingHttpClientConnectionManagerBuilder create() {
+ return new SafePoolingHttpClientConnectionManagerBuilder();
+ }
+
+ /**
+ * Sets {@link TlsSocketStrategy} instance.
+ *
+ * @return this instance.
+ */
+ public SafePoolingHttpClientConnectionManagerBuilder setTlsSocketStrategy(TlsSocketStrategy tlsSocketStrategy) {
+ this.tlsSocketStrategy = tlsSocketStrategy;
+ return this;
+ }
+
+ /**
+ * Sets {@link DnsResolver} instance.
+ *
+ * @return this instance.
+ */
+ public SafePoolingHttpClientConnectionManagerBuilder setDnsResolver(DnsResolver dnsResolver) {
+ this.dnsResolver = dnsResolver;
+ return this;
+ }
+
+ /**
+ * Sets {@link SchemePortResolver} instance.
+ *
+ * @return this instance.
+ */
+ public SafePoolingHttpClientConnectionManagerBuilder setSchemePortResolver(SchemePortResolver schemePortResolver) {
+ this.schemePortResolver = schemePortResolver;
+ return this;
+ }
+
+ /**
+ * Sets maximum total connection value.
+ *
+ * @return this instance.
+ */
+ public SafePoolingHttpClientConnectionManagerBuilder setMaxConnTotal(int maxConnTotal) {
+ this.maxConnTotal = maxConnTotal;
+ return this;
+ }
+
+ /**
+ * Sets maximum connection per route value.
+ *
+ * @return this instance.
+ */
+ public SafePoolingHttpClientConnectionManagerBuilder setMaxConnPerRoute(int maxConnPerRoute) {
+ this.maxConnPerRoute = maxConnPerRoute;
+ return this;
+ }
+
+ /**
+ * Sets the same {@link SocketConfig} for all routes.
+ *
+ * @return this instance.
+ */
+ public SafePoolingHttpClientConnectionManagerBuilder setDefaultSocketConfig(SocketConfig config) {
+ this.socketConfigResolver = route -> config;
+ return this;
+ }
+
+ /**
+ * Sets {@link Resolver} of {@link SocketConfig} on a per route basis.
+ *
+ * @return this instance.
+ * @since 5.2
+ */
+ public SafePoolingHttpClientConnectionManagerBuilder setSocketConfigResolver(
+ Resolver socketConfigResolver) {
+ this.socketConfigResolver = socketConfigResolver;
+ return this;
+ }
+
+ /**
+ * Sets the same {@link ConnectionConfig} for all routes.
+ *
+ * @return this instance.
+ * @since 5.2
+ */
+ public SafePoolingHttpClientConnectionManagerBuilder setDefaultConnectionConfig(ConnectionConfig config) {
+ this.connectionConfigResolver = route -> config;
+ return this;
+ }
+
+ /**
+ * Sets {@link Resolver} of {@link ConnectionConfig} on a per route basis.
+ *
+ * @return this instance.
+ * @since 5.2
+ */
+ public SafePoolingHttpClientConnectionManagerBuilder setConnectionConfigResolver(
+ Resolver connectionConfigResolver) {
+ this.connectionConfigResolver = connectionConfigResolver;
+ return this;
+ }
+
+ /**
+ * Sets the same {@link TlsConfig} for all hosts.
+ *
+ * @return this instance.
+ * @since 5.2
+ */
+ public SafePoolingHttpClientConnectionManagerBuilder setDefaultTlsConfig(TlsConfig config) {
+ this.tlsConfigResolver = host -> config;
+ return this;
+ }
+
+ /**
+ * Sets {@link Resolver} of {@link TlsConfig} on a per host basis.
+ *
+ * @return this instance.
+ * @since 5.2
+ */
+ public SafePoolingHttpClientConnectionManagerBuilder setTlsConfigResolver(
+ Resolver tlsConfigResolver) {
+ this.tlsConfigResolver = tlsConfigResolver;
+ return this;
+ }
+
+ protected HttpClientConnectionOperator createConnectionOperator(SchemePortResolver schemePortResolver,
+ DnsResolver dnsResolver,
+ TlsSocketStrategy tlsSocketStrategy) {
+ return new DefaultHttpClientConnectionOperator(schemePortResolver, dnsResolver,
+ RegistryBuilder.create()
+ .register(URIScheme.HTTPS.id, tlsSocketStrategy)
+ .build());
+ }
+
+ public SafePoolingHttpClientConnectionManager build() {
+ TlsSocketStrategy tlsSocketStrategyCopy;
+ if (tlsSocketStrategy != null) {
+ tlsSocketStrategyCopy = tlsSocketStrategy;
+ } else {
+ tlsSocketStrategyCopy = DefaultClientTlsStrategy.createDefault();
+ }
+
+ SafePoolingHttpClientConnectionManager poolingmgr = new SafePoolingHttpClientConnectionManager(
+ createConnectionOperator(schemePortResolver, dnsResolver, tlsSocketStrategyCopy));
+ poolingmgr.setSocketConfigResolver(socketConfigResolver);
+ poolingmgr.setConnectionConfigResolver(connectionConfigResolver);
+ poolingmgr.setTlsConfigResolver(tlsConfigResolver);
+ if (maxConnTotal > 0) {
+ poolingmgr.setMaxTotal(maxConnTotal);
+ }
+ if (maxConnPerRoute > 0) {
+ poolingmgr.setDefaultMaxPerRoute(maxConnPerRoute);
+ }
+ return poolingmgr;
+ }
+
+}
diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafeStrictConnPool.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafeStrictConnPool.java
new file mode 100644
index 000000000000..5df86d1729b3
--- /dev/null
+++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafeStrictConnPool.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.apache5.internal.conn;
+
+import java.util.concurrent.Future;
+import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.pool.ConnPoolListener;
+import org.apache.hc.core5.pool.DisposalCallback;
+import org.apache.hc.core5.pool.PoolEntry;
+import org.apache.hc.core5.pool.PoolReusePolicy;
+import org.apache.hc.core5.pool.StrictConnPool;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.http.apache5.internal.utils.CancelOnInterruptWrapper;
+
+/**
+ * Specialization of {@link StrictConnPool} that prevents leaking the connection when thread waiting on the future is
+ * interrupted.
+ */
+@SdkInternalApi
+public final class SafeStrictConnPool extends StrictConnPool {
+ public SafeStrictConnPool(int defaultMaxPerRoute,
+ int maxTotal,
+ TimeValue timeToLive,
+ PoolReusePolicy policy,
+ DisposalCallback disposalCallback,
+ ConnPoolListener connPoolListener) {
+ super(defaultMaxPerRoute, maxTotal, timeToLive, policy, disposalCallback, connPoolListener);
+ }
+
+ public Future> lease(HttpRoute route,
+ Object state,
+ Timeout requestTimeout,
+ FutureCallback> callback) {
+ return safeLease(super.lease(route, state, requestTimeout, callback));
+ }
+
+ private Future> safeLease(
+ Future> leaseFuture) {
+ return new CancelOnInterruptWrapper<>(leaseFuture);
+ }
+}
diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java
new file mode 100644
index 000000000000..19f5f29ae481
--- /dev/null
+++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.apache5.internal.utils;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+
+/**
+ * Wrapper that attempts to cancel the delegate future if the thread is interrupted within get().
+ */
+@SdkInternalApi
+public final class CancelOnInterruptWrapper implements Future {
+ private final Future extends ResultT> f;
+
+ public CancelOnInterruptWrapper(Future extends ResultT> f) {
+ this.f = f;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return f.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return f.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return f.isDone();
+ }
+
+
+ // This method attempts to cancel the wrapped future if the thread is interrupted while blocked on get(). This is done by
+ // attempting to cancel() the future when InterruptedException is thrown. If the the cancel() is unsuccessful (i.e.
+ // the future is completed either successfully or exceptionally), then get the result if present and return it.
+ @Override
+ public ResultT get() throws InterruptedException, ExecutionException {
+ try {
+ return f.get();
+ } catch (InterruptedException ie) {
+ if (!cancel(true)) {
+ try {
+ // We couldn't cancel so the result will be available or it failed
+ ResultT entry = f.get();
+ Thread.currentThread().interrupt();
+ return entry;
+ } catch (CancellationException | InterruptedException | ExecutionException e) {
+ // no-op, let it fall through to throwing the original interrupted exception
+ }
+ }
+ throw ie;
+ }
+ }
+
+
+ // This method attempts to cancel the wrapped future if the thread is interrupted while blocked on get(). This is done by
+ // attempting to cancel() the future when InterruptedException is thrown. If the the cancel() is unsuccessful (i.e.
+ // the future is completed either successfully or exceptionally), then get the result if present and return it.
+ @Override
+ public ResultT get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ try {
+ return f.get(timeout, unit);
+ } catch (InterruptedException ie) {
+ if (!cancel(true)) {
+ try {
+ // We couldn't cancel so the result will be available or it failed
+ ResultT entry = f.get();
+ Thread.currentThread().interrupt();
+ return entry;
+ } catch (CancellationException | InterruptedException | ExecutionException e) {
+ // no-op, let it fall through to throwing the original interrupted exception
+ }
+ }
+ throw ie;
+ }
+ }
+}
diff --git a/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerTest.java b/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerTest.java
new file mode 100644
index 000000000000..8849bb3ba3ce
--- /dev/null
+++ b/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerTest.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.apache5.internal.conn;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+import org.apache.hc.core5.http.HttpEntity;
+import org.apache.hc.core5.http.ParseException;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.hc.core5.pool.PoolStats;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class SafePoolingHttpClientConnectionManagerTest {
+ private static WireMockServer wm;
+
+ @BeforeAll
+ static void setup() {
+ wm = new WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort());
+ wm.start();
+ }
+
+ @AfterAll
+ static void teardown() {
+ wm.stop();
+ }
+
+ @Test
+ void leaseInterrupted_doesNotLeakConnections_probabilistic() throws IOException, InterruptedException, ParseException {
+ wm.stubFor(WireMock.get(WireMock.anyUrl()).willReturn(WireMock.aResponse().withStatus(204)));
+ int rounds = 16;
+ for (int i = 0; i < rounds; ++i) {
+ leaseInterrupted_doesNotLeakConnections();
+ }
+ }
+
+ private void leaseInterrupted_doesNotLeakConnections() throws InterruptedException, IOException, ParseException {
+ int numRequests = 10_000;
+
+ SafePoolingHttpClientConnectionManager connectionManager = SafePoolingHttpClientConnectionManagerBuilder.create().build();
+ connectionManager.setMaxTotal(100);
+ connectionManager.setDefaultMaxPerRoute(40);
+
+ try (CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(connectionManager).build()) {
+ AtomicInteger successCount = new AtomicInteger(0);
+ AtomicInteger errorCount = new AtomicInteger(0);
+ ExecutorService executor = Executors.newFixedThreadPool(100);
+
+ try {
+ List> futures = new ArrayList<>(numRequests);
+ CountDownLatch go = new CountDownLatch(numRequests);
+
+ for (int i = 0; i < numRequests; i++) {
+ Future> f =
+ executor.submit(
+ () -> {
+ go.countDown();
+ try {
+ doRequest(httpClient);
+ successCount.incrementAndGet();
+ } catch (Exception e) {
+ errorCount.incrementAndGet();
+ }
+ });
+ futures.add(f);
+ }
+
+ go.await();
+
+ for (Future> future : futures) {
+ future.cancel(true);
+ }
+ } finally {
+ executor.shutdown();
+ executor.awaitTermination(100, TimeUnit.SECONDS);
+ }
+ connectionManager.closeExpired();
+ assertNoLeases(connectionManager);
+ doRequest(httpClient);
+ }
+ }
+
+ private static void doRequest(CloseableHttpClient httpClient) throws IOException, ParseException {
+ HttpGet httpGet = new HttpGet("http://localhost:" + wm.port());
+ try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
+ HttpEntity entity = response.getEntity();
+ if (entity != null) {
+ EntityUtils.toString(entity);
+ }
+ }
+ }
+
+ private static void assertNoLeases(SafePoolingHttpClientConnectionManager manager) {
+ PoolStats totalStats = manager.getTotalStats();
+ assertThat(totalStats.getLeased()).as("Leased connection count").isEqualTo(0);
+ }
+}
diff --git a/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapperTest.java b/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapperTest.java
new file mode 100644
index 000000000000..9d98bba5da1a
--- /dev/null
+++ b/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapperTest.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.apache5.internal.utils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class CancelOnInterruptWrapperTest {
+ private Future mockDelegate;
+
+ @BeforeEach
+ void setup() {
+ mockDelegate = mock(Future.class);
+ }
+
+ @AfterEach
+ void teardown() {
+ Thread.interrupted(); // clear the flag if it was set by the last test
+ }
+
+ @Test
+ void cancel_callsDelegate() {
+ CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate);
+ wrapper.cancel(true);
+ verify(mockDelegate).cancel(eq(true));
+ }
+
+ @Test
+ void isCancelled_callsDelegate() {
+ CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate);
+ wrapper.isCancelled();
+ verify(mockDelegate).isCancelled();
+ }
+
+ @Test
+ void isDone_callsDelegate() {
+ CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate);
+ wrapper.isDone();
+ verify(mockDelegate).isDone();
+ }
+
+ @Test
+ void get_callsDelegate() throws ExecutionException, InterruptedException {
+ CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate);
+ wrapper.get();
+ verify(mockDelegate).get();
+ }
+
+ @Test
+ void getTimeout_callsDelegate() throws ExecutionException, InterruptedException, TimeoutException {
+ CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate);
+ wrapper.get(42, TimeUnit.DAYS);
+ verify(mockDelegate).get(eq(42L), eq(TimeUnit.DAYS));
+ }
+
+ @Test
+ void getTimeout_interrupted_cancelSuccessful_throws() throws ExecutionException, InterruptedException, TimeoutException {
+ when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException("interrupt"));
+ when(mockDelegate.cancel(eq(true))).thenReturn(true);
+
+ CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate);
+
+ assertThatThrownBy(() -> wrapper.get(1, TimeUnit.SECONDS))
+ .isInstanceOf(InterruptedException.class)
+ .hasMessage("interrupt");
+
+ verify(mockDelegate).get(eq(1L), eq(TimeUnit.SECONDS));
+ verify(mockDelegate).cancel(eq(true));
+
+ verifyNoMoreInteractions(mockDelegate);
+ }
+
+ @Test
+ void getTimeout_interrupted_cancelUnsuccessful_returnsEntry() throws ExecutionException, InterruptedException,
+ TimeoutException {
+ String result = "hello there";
+
+ when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException("interrupt"));
+ when(mockDelegate.cancel(eq(true))).thenReturn(false);
+ when(mockDelegate.get()).thenReturn(result);
+
+ CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate);
+
+ assertThat(wrapper.get(1, TimeUnit.SECONDS)).isEqualTo(result);
+ }
+
+ @Test
+ void getTimeout_interrupted_cancelUnsuccessful_getUnsuccessful_rethrowsOriginalIe() throws ExecutionException,
+ InterruptedException, TimeoutException {
+ InterruptedException interrupt = new InterruptedException("interrupt");
+
+ when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(interrupt);
+ when(mockDelegate.cancel(eq(true))).thenReturn(false);
+ when(mockDelegate.get()).thenThrow(new CancellationException("cancelled"));
+
+ CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate);
+
+ assertThatThrownBy(() -> wrapper.get(1, TimeUnit.SECONDS)).isSameAs(interrupt);
+ }
+
+ @Test
+ void get_interrupted_cancelSuccessful_throws() throws ExecutionException, InterruptedException, TimeoutException {
+ when(mockDelegate.get()).thenThrow(new InterruptedException("interrupt"));
+ when(mockDelegate.cancel(eq(true))).thenReturn(true);
+
+ CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate);
+
+ assertThatThrownBy(wrapper::get)
+ .isInstanceOf(InterruptedException.class)
+ .hasMessage("interrupt");
+
+ verify(mockDelegate).get();
+ verify(mockDelegate).cancel(eq(true));
+
+ verifyNoMoreInteractions(mockDelegate);
+ }
+
+ @Test
+ void get_interrupted_cancelUnsuccessful_returnsEntry() throws ExecutionException, InterruptedException, TimeoutException {
+ String result = "hello there";
+
+ AtomicBoolean first = new AtomicBoolean(true);
+ when(mockDelegate.get()).thenAnswer(i -> {
+ if (first.compareAndSet(true, false)) {
+ throw new InterruptedException("interrupt");
+ }
+ return result;
+ });
+ when(mockDelegate.cancel(eq(true))).thenReturn(false);
+
+ CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate);
+
+ assertThat(wrapper.get()).isEqualTo(result);
+ }
+
+ @Test
+ void get_interrupted_cancelUnsuccessful_getUnsuccessful_rethrowsOriginalIe() throws ExecutionException, InterruptedException, TimeoutException {
+ InterruptedException interrupt = new InterruptedException("interrupt");
+
+ AtomicBoolean first = new AtomicBoolean(true);
+ when(mockDelegate.get()).thenAnswer(i -> {
+ if (first.compareAndSet(true, false)) {
+ throw interrupt;
+ }
+ throw new CancellationException("cancelled");
+ });
+ when(mockDelegate.cancel(eq(true))).thenReturn(false);
+
+ CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate);
+
+ assertThatThrownBy(wrapper::get).isSameAs(interrupt);
+ }
+
+ @Test
+ void get_interrupted_cancelUnsuccessful_cancelUnsuccessful_preservesInterruptedFlag() throws ExecutionException, InterruptedException {
+ String result = "hello there";
+
+ AtomicBoolean first = new AtomicBoolean(true);
+ when(mockDelegate.get()).thenAnswer(i -> {
+ if (first.compareAndSet(true, false)) {
+ throw new InterruptedException("interrupt");
+ }
+ return result;
+ });
+ when(mockDelegate.cancel(eq(true))).thenReturn(false);
+
+ CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate);
+
+ wrapper.get();
+
+ assertThat(Thread.interrupted()).isTrue();
+ }
+
+ @Test
+ void getTimeout_interrupted_cancelUnsuccessful_preservesInterruptedFlag() throws ExecutionException, InterruptedException,
+ TimeoutException {
+ String result = "hello there";
+
+ when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException("interrupt"));
+ when(mockDelegate.cancel(eq(true))).thenReturn(false);
+ when(mockDelegate.get()).thenReturn(result);
+
+ CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate);
+
+ wrapper.get(1, TimeUnit.SECONDS);
+
+ assertThat(Thread.interrupted()).isTrue();
+ }
+}