From 10f20410e589a39ce38a2bc9eb8cb3ad98e96b2d Mon Sep 17 00:00:00 2001 From: Dongie Agnir Date: Tue, 31 Mar 2026 13:26:22 -0700 Subject: [PATCH 1/3] Prevent conn leak on thread interrupt If a thread blocked waiting on a connection pool is interrupted, it's possible for the connection to become leaked if the pool then completes the future from its side. This commit prevents that from happening by intercepting the interrupt and attempting to cancel the lease future. If that is unsuccessful, the connection is returned to the upper, and the thread is reinterrupted. Adresses #6786 --- .../bugfix-Apache5HTTPClient-9de458e.json | 6 + .../http/apache5/Apache5HttpClient.java | 6 +- ...afePoolingHttpClientConnectionManager.java | 44 ++++ ...ingHttpClientConnectionManagerBuilder.java | 235 ++++++++++++++++++ .../internal/conn/SafeStrictConnPool.java | 59 +++++ .../utils/CancelOnInterruptWrapper.java | 78 ++++++ ...oolingHttpClientConnectionManagerTest.java | 127 ++++++++++ .../utils/CancelOnInterruptFutureTest.java | 121 +++++++++ 8 files changed, 673 insertions(+), 3 deletions(-) create mode 100644 .changes/next-release/bugfix-Apache5HTTPClient-9de458e.json create mode 100644 http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManager.java create mode 100644 http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerBuilder.java create mode 100644 http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafeStrictConnPool.java create mode 100644 http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java create mode 100644 http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerTest.java create mode 100644 http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptFutureTest.java diff --git a/.changes/next-release/bugfix-Apache5HTTPClient-9de458e.json b/.changes/next-release/bugfix-Apache5HTTPClient-9de458e.json new file mode 100644 index 000000000000..01eb808d9d22 --- /dev/null +++ b/.changes/next-release/bugfix-Apache5HTTPClient-9de458e.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "Apache 5 HTTP Client", + "contributor": "", + "description": "Fixed a connection leak that could occur when the thread waiting to acquire a connection from the pool is interrupted. Fixes [#6786](https://github.com/aws/aws-sdk-java-v2/issues/6786)." +} diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/Apache5HttpClient.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/Apache5HttpClient.java index a07eb5f107c0..34b2a2a583a7 100644 --- a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/Apache5HttpClient.java +++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/Apache5HttpClient.java @@ -50,7 +50,6 @@ import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.client5.http.impl.classic.HttpClients; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; -import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; import org.apache.hc.client5.http.impl.routing.DefaultRoutePlanner; import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.hc.client5.http.protocol.HttpClientContext; @@ -92,6 +91,7 @@ import software.amazon.awssdk.http.apache5.internal.SdkProxyRoutePlanner; import software.amazon.awssdk.http.apache5.internal.conn.ClientConnectionManagerFactory; import software.amazon.awssdk.http.apache5.internal.conn.IdleConnectionReaper; +import software.amazon.awssdk.http.apache5.internal.conn.SafePoolingHttpClientConnectionManagerBuilder; import software.amazon.awssdk.http.apache5.internal.conn.SdkConnectionKeepAliveStrategy; import software.amazon.awssdk.http.apache5.internal.conn.SdkTlsSocketFactory; import software.amazon.awssdk.http.apache5.internal.impl.Apache5HttpRequestFactory; @@ -754,8 +754,8 @@ public PoolingHttpClientConnectionManager create(Apache5HttpClient.DefaultBuilde TlsSocketStrategy tlsStrategy = getPreferredTlsStrategy(configuration, standardOptions); - PoolingHttpClientConnectionManagerBuilder builder = - PoolingHttpClientConnectionManagerBuilder.create() + SafePoolingHttpClientConnectionManagerBuilder builder = + SafePoolingHttpClientConnectionManagerBuilder.create() .setTlsSocketStrategy(tlsStrategy) .setSchemePortResolver(DefaultSchemePortResolver.INSTANCE) .setDnsResolver(configuration.dnsResolver); diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManager.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManager.java new file mode 100644 index 000000000000..7f2b52bde072 --- /dev/null +++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManager.java @@ -0,0 +1,44 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.apache5.internal.conn; + +import org.apache.hc.client5.http.impl.io.ManagedHttpClientConnectionFactory; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; +import org.apache.hc.client5.http.io.HttpClientConnectionOperator; +import org.apache.hc.core5.pool.DefaultDisposalCallback; +import org.apache.hc.core5.pool.PoolReusePolicy; +import software.amazon.awssdk.annotations.SdkInternalApi; + +/** + * Specialization of {@link PoolingHttpClientConnectionManager} to enable use of {@link SafeStrictConnPool} to prevent leaking + * connections when the thread waiting on the future is interrupted. + */ +@SdkInternalApi +public final class SafePoolingHttpClientConnectionManager extends PoolingHttpClientConnectionManager { + public SafePoolingHttpClientConnectionManager(HttpClientConnectionOperator connectionOperator) { + super(connectionOperator, + new SafeStrictConnPool( + DEFAULT_MAX_CONNECTIONS_PER_ROUTE, + DEFAULT_MAX_TOTAL_CONNECTIONS, + null, + PoolReusePolicy.LIFO, + new DefaultDisposalCallback<>(), + null + ), + ManagedHttpClientConnectionFactory.INSTANCE + ); + } +} diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerBuilder.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerBuilder.java new file mode 100644 index 000000000000..36373ce71290 --- /dev/null +++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerBuilder.java @@ -0,0 +1,235 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package software.amazon.awssdk.http.apache5.internal.conn; + +import org.apache.hc.client5.http.DnsResolver; +import org.apache.hc.client5.http.HttpRoute; +import org.apache.hc.client5.http.SchemePortResolver; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.config.TlsConfig; +import org.apache.hc.client5.http.impl.io.DefaultHttpClientConnectionOperator; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; +import org.apache.hc.client5.http.io.HttpClientConnectionOperator; +import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; +import org.apache.hc.client5.http.ssl.TlsSocketStrategy; +import org.apache.hc.core5.function.Resolver; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.config.RegistryBuilder; +import org.apache.hc.core5.http.io.SocketConfig; +import software.amazon.awssdk.annotations.SdkInternalApi; + +/** + * This is a fork of {@link PoolingHttpClientConnectionManagerBuilder} from Apache 5. The purpose of this forked class is to + * enable usage of the {@link SafePoolingHttpClientConnectionManager} to enable the workaround for + * https://github.com/aws/aws-sdk-java-v2/issues/6786. + */ +// This a direct copy of PoolingHttpClientConnectionManagerBuilder with minor changes to remove methods we don't use and +// updates to follow our style guide. +@SdkInternalApi +public final class SafePoolingHttpClientConnectionManagerBuilder { + + private TlsSocketStrategy tlsSocketStrategy; + private SchemePortResolver schemePortResolver; + private DnsResolver dnsResolver; + private Resolver socketConfigResolver; + private Resolver connectionConfigResolver; + private Resolver tlsConfigResolver; + + private int maxConnTotal; + private int maxConnPerRoute; + + public static SafePoolingHttpClientConnectionManagerBuilder create() { + return new SafePoolingHttpClientConnectionManagerBuilder(); + } + + /** + * Sets {@link TlsSocketStrategy} instance. + * + * @return this instance. + */ + public SafePoolingHttpClientConnectionManagerBuilder setTlsSocketStrategy(TlsSocketStrategy tlsSocketStrategy) { + this.tlsSocketStrategy = tlsSocketStrategy; + return this; + } + + /** + * Sets {@link DnsResolver} instance. + * + * @return this instance. + */ + public SafePoolingHttpClientConnectionManagerBuilder setDnsResolver(DnsResolver dnsResolver) { + this.dnsResolver = dnsResolver; + return this; + } + + /** + * Sets {@link SchemePortResolver} instance. + * + * @return this instance. + */ + public SafePoolingHttpClientConnectionManagerBuilder setSchemePortResolver(SchemePortResolver schemePortResolver) { + this.schemePortResolver = schemePortResolver; + return this; + } + + /** + * Sets maximum total connection value. + * + * @return this instance. + */ + public SafePoolingHttpClientConnectionManagerBuilder setMaxConnTotal(int maxConnTotal) { + this.maxConnTotal = maxConnTotal; + return this; + } + + /** + * Sets maximum connection per route value. + * + * @return this instance. + */ + public SafePoolingHttpClientConnectionManagerBuilder setMaxConnPerRoute(int maxConnPerRoute) { + this.maxConnPerRoute = maxConnPerRoute; + return this; + } + + /** + * Sets the same {@link SocketConfig} for all routes. + * + * @return this instance. + */ + public SafePoolingHttpClientConnectionManagerBuilder setDefaultSocketConfig(SocketConfig config) { + this.socketConfigResolver = route -> config; + return this; + } + + /** + * Sets {@link Resolver} of {@link SocketConfig} on a per route basis. + * + * @return this instance. + * @since 5.2 + */ + public SafePoolingHttpClientConnectionManagerBuilder setSocketConfigResolver( + Resolver socketConfigResolver) { + this.socketConfigResolver = socketConfigResolver; + return this; + } + + /** + * Sets the same {@link ConnectionConfig} for all routes. + * + * @return this instance. + * @since 5.2 + */ + public SafePoolingHttpClientConnectionManagerBuilder setDefaultConnectionConfig(ConnectionConfig config) { + this.connectionConfigResolver = route -> config; + return this; + } + + /** + * Sets {@link Resolver} of {@link ConnectionConfig} on a per route basis. + * + * @return this instance. + * @since 5.2 + */ + public SafePoolingHttpClientConnectionManagerBuilder setConnectionConfigResolver( + Resolver connectionConfigResolver) { + this.connectionConfigResolver = connectionConfigResolver; + return this; + } + + /** + * Sets the same {@link TlsConfig} for all hosts. + * + * @return this instance. + * @since 5.2 + */ + public SafePoolingHttpClientConnectionManagerBuilder setDefaultTlsConfig(TlsConfig config) { + this.tlsConfigResolver = host -> config; + return this; + } + + /** + * Sets {@link Resolver} of {@link TlsConfig} on a per host basis. + * + * @return this instance. + * @since 5.2 + */ + public SafePoolingHttpClientConnectionManagerBuilder setTlsConfigResolver( + Resolver tlsConfigResolver) { + this.tlsConfigResolver = tlsConfigResolver; + return this; + } + + protected HttpClientConnectionOperator createConnectionOperator(SchemePortResolver schemePortResolver, + DnsResolver dnsResolver, + TlsSocketStrategy tlsSocketStrategy) { + return new DefaultHttpClientConnectionOperator(schemePortResolver, dnsResolver, + RegistryBuilder.create() + .register(URIScheme.HTTPS.id, tlsSocketStrategy) + .build()); + } + + public SafePoolingHttpClientConnectionManager build() { + TlsSocketStrategy tlsSocketStrategyCopy; + if (tlsSocketStrategy != null) { + tlsSocketStrategyCopy = tlsSocketStrategy; + } else { + tlsSocketStrategyCopy = DefaultClientTlsStrategy.createDefault(); + } + + SafePoolingHttpClientConnectionManager poolingmgr = new SafePoolingHttpClientConnectionManager( + createConnectionOperator(schemePortResolver, dnsResolver, tlsSocketStrategyCopy)); + poolingmgr.setSocketConfigResolver(socketConfigResolver); + poolingmgr.setConnectionConfigResolver(connectionConfigResolver); + poolingmgr.setTlsConfigResolver(tlsConfigResolver); + if (maxConnTotal > 0) { + poolingmgr.setMaxTotal(maxConnTotal); + } + if (maxConnPerRoute > 0) { + poolingmgr.setDefaultMaxPerRoute(maxConnPerRoute); + } + return poolingmgr; + } + +} diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafeStrictConnPool.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafeStrictConnPool.java new file mode 100644 index 000000000000..5df86d1729b3 --- /dev/null +++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafeStrictConnPool.java @@ -0,0 +1,59 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.apache5.internal.conn; + +import java.util.concurrent.Future; +import org.apache.hc.client5.http.HttpRoute; +import org.apache.hc.client5.http.io.ManagedHttpClientConnection; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.pool.ConnPoolListener; +import org.apache.hc.core5.pool.DisposalCallback; +import org.apache.hc.core5.pool.PoolEntry; +import org.apache.hc.core5.pool.PoolReusePolicy; +import org.apache.hc.core5.pool.StrictConnPool; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.http.apache5.internal.utils.CancelOnInterruptWrapper; + +/** + * Specialization of {@link StrictConnPool} that prevents leaking the connection when thread waiting on the future is + * interrupted. + */ +@SdkInternalApi +public final class SafeStrictConnPool extends StrictConnPool { + public SafeStrictConnPool(int defaultMaxPerRoute, + int maxTotal, + TimeValue timeToLive, + PoolReusePolicy policy, + DisposalCallback disposalCallback, + ConnPoolListener connPoolListener) { + super(defaultMaxPerRoute, maxTotal, timeToLive, policy, disposalCallback, connPoolListener); + } + + public Future> lease(HttpRoute route, + Object state, + Timeout requestTimeout, + FutureCallback> callback) { + return safeLease(super.lease(route, state, requestTimeout, callback)); + } + + private Future> safeLease( + Future> leaseFuture) { + return new CancelOnInterruptWrapper<>(leaseFuture); + } +} diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java new file mode 100644 index 000000000000..acc429b757f9 --- /dev/null +++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java @@ -0,0 +1,78 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.apache5.internal.utils; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import software.amazon.awssdk.annotations.SdkInternalApi; + +/** + * Wrapper that attempts to cancel the delegate future if the thread is interrupted within get(). + */ +@SdkInternalApi +public final class CancelOnInterruptWrapper implements Future { + private final Future f; + + public CancelOnInterruptWrapper(Future f) { + this.f = f; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return f.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return f.isCancelled(); + } + + @Override + public boolean isDone() { + return f.isDone(); + } + + @Override + public ResultT get() throws InterruptedException, ExecutionException { + return f.get(); + } + + // This method attempts to cancel the wrapped future if the thread is interrupted while blocked on get(). This is done by + // attempting to cancel() the future when InterruptedException is thrown. If the the cancel() is unsuccessful (i.e. + // the future is completed either successfully or exceptionally), then get the result if present and return it. + @Override + public ResultT get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + try { + return f.get(timeout, unit); + } catch (InterruptedException ie) { + if (!cancel(true)) { + try { + // We couldn't cancel so the result will be available or it failed + ResultT entry = f.get(); + Thread.currentThread().interrupt(); + return entry; + } catch (CancellationException | InterruptedException | ExecutionException e) { + // no-op, let it fall through to throwing the original interrupted exception + } + } + throw ie; + } + } +} \ No newline at end of file diff --git a/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerTest.java b/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerTest.java new file mode 100644 index 000000000000..8849bb3ba3ce --- /dev/null +++ b/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerTest.java @@ -0,0 +1,127 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.apache5.internal.conn; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hc.client5.http.classic.methods.HttpGet; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.client5.http.impl.classic.HttpClients; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.pool.PoolStats; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class SafePoolingHttpClientConnectionManagerTest { + private static WireMockServer wm; + + @BeforeAll + static void setup() { + wm = new WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort()); + wm.start(); + } + + @AfterAll + static void teardown() { + wm.stop(); + } + + @Test + void leaseInterrupted_doesNotLeakConnections_probabilistic() throws IOException, InterruptedException, ParseException { + wm.stubFor(WireMock.get(WireMock.anyUrl()).willReturn(WireMock.aResponse().withStatus(204))); + int rounds = 16; + for (int i = 0; i < rounds; ++i) { + leaseInterrupted_doesNotLeakConnections(); + } + } + + private void leaseInterrupted_doesNotLeakConnections() throws InterruptedException, IOException, ParseException { + int numRequests = 10_000; + + SafePoolingHttpClientConnectionManager connectionManager = SafePoolingHttpClientConnectionManagerBuilder.create().build(); + connectionManager.setMaxTotal(100); + connectionManager.setDefaultMaxPerRoute(40); + + try (CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(connectionManager).build()) { + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger errorCount = new AtomicInteger(0); + ExecutorService executor = Executors.newFixedThreadPool(100); + + try { + List> futures = new ArrayList<>(numRequests); + CountDownLatch go = new CountDownLatch(numRequests); + + for (int i = 0; i < numRequests; i++) { + Future f = + executor.submit( + () -> { + go.countDown(); + try { + doRequest(httpClient); + successCount.incrementAndGet(); + } catch (Exception e) { + errorCount.incrementAndGet(); + } + }); + futures.add(f); + } + + go.await(); + + for (Future future : futures) { + future.cancel(true); + } + } finally { + executor.shutdown(); + executor.awaitTermination(100, TimeUnit.SECONDS); + } + connectionManager.closeExpired(); + assertNoLeases(connectionManager); + doRequest(httpClient); + } + } + + private static void doRequest(CloseableHttpClient httpClient) throws IOException, ParseException { + HttpGet httpGet = new HttpGet("http://localhost:" + wm.port()); + try (CloseableHttpResponse response = httpClient.execute(httpGet)) { + HttpEntity entity = response.getEntity(); + if (entity != null) { + EntityUtils.toString(entity); + } + } + } + + private static void assertNoLeases(SafePoolingHttpClientConnectionManager manager) { + PoolStats totalStats = manager.getTotalStats(); + assertThat(totalStats.getLeased()).as("Leased connection count").isEqualTo(0); + } +} diff --git a/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptFutureTest.java b/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptFutureTest.java new file mode 100644 index 000000000000..72df8a3926cd --- /dev/null +++ b/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptFutureTest.java @@ -0,0 +1,121 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.apache5.internal.utils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class CancelOnInterruptFutureTest { + private Future mockDelegate; + + @BeforeEach + void setup() { + mockDelegate = mock(Future.class); + } + + @Test + void cancel_callsDelegate() { + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + wrapper.cancel(true); + verify(mockDelegate).cancel(eq(true)); + } + + @Test + void isCancelled_callsDelegate() { + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + wrapper.isCancelled(); + verify(mockDelegate).isCancelled(); + } + + @Test + void isDone_callsDelegate() { + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + wrapper.isDone(); + verify(mockDelegate).isDone(); + } + + @Test + void get_callsDelegate() throws ExecutionException, InterruptedException { + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + wrapper.get(); + verify(mockDelegate).get(); + } + + @Test + void getTimeout_callsDelegate() throws ExecutionException, InterruptedException, TimeoutException { + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + wrapper.get(42, TimeUnit.DAYS); + verify(mockDelegate).get(eq(42L), eq(TimeUnit.DAYS)); + } + + @Test + void get_interrupted_cancelSuccessful_throws() throws ExecutionException, InterruptedException, TimeoutException { + when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException("interrupt")); + when(mockDelegate.cancel(eq(true))).thenReturn(true); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + assertThatThrownBy(() -> wrapper.get(1, TimeUnit.SECONDS)) + .isInstanceOf(InterruptedException.class) + .hasMessage("interrupt"); + + verify(mockDelegate).get(eq(1L), eq(TimeUnit.SECONDS)); + verify(mockDelegate).cancel(eq(true)); + + verifyNoMoreInteractions(mockDelegate); + } + + @Test + void get_interrupted_cancelUnsuccessful_returnsEntry() throws ExecutionException, InterruptedException, TimeoutException { + String result = "hello there"; + + when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException("interrupt")); + when(mockDelegate.cancel(eq(true))).thenReturn(false); + when(mockDelegate.get()).thenReturn(result); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + assertThat(wrapper.get(1, TimeUnit.SECONDS)).isEqualTo(result); + } + + @Test + void get_interrupted_cancelUnsuccessful_getUnsuccessful_rethrowsOriginalIe() throws ExecutionException, InterruptedException, TimeoutException { + InterruptedException interrupt = new InterruptedException("interrupt"); + + when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(interrupt); + when(mockDelegate.cancel(eq(true))).thenReturn(false); + when(mockDelegate.get()).thenThrow(new CancellationException("cancelled")); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + assertThatThrownBy(() -> wrapper.get(1, TimeUnit.SECONDS)).isSameAs(interrupt); + } +} From 3fb551fb6ed6258a5efd7c1042f40f0a079f917f Mon Sep 17 00:00:00 2001 From: Dongie Agnir Date: Thu, 2 Apr 2026 15:05:18 -0700 Subject: [PATCH 2/3] Review comments --- .../utils/CancelOnInterruptWrapper.java | 25 ++++- ...java => CancelOnInterruptWrapperTest.java} | 106 +++++++++++++++++- 2 files changed, 124 insertions(+), 7 deletions(-) rename http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/{CancelOnInterruptFutureTest.java => CancelOnInterruptWrapperTest.java} (54%) diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java index acc429b757f9..aefb9fd7eb2d 100644 --- a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java +++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java @@ -48,17 +48,36 @@ public boolean isDone() { return f.isDone(); } + + // This method attempts to cancel the wrapped future if the thread is interrupted while blocked on get(). This is done by + // attempting to cancel() the future when InterruptedException is thrown. If the the cancel() is unsuccessful (i.e. + // the future is completed either successfully or exceptionally), then get the result if present and return it. @Override public ResultT get() throws InterruptedException, ExecutionException { - return f.get(); + try { + return f.get(); + } catch (InterruptedException ie) { + if (!cancel(true)) { + try { + // We couldn't cancel so the result will be available or it failed + ResultT entry = f.get(); + Thread.currentThread().interrupt(); + return entry; + } catch (CancellationException | InterruptedException | ExecutionException e) { + // no-op, let it fall through to throwing the original interrupted exception + } + } + throw ie; + } } + // This method attempts to cancel the wrapped future if the thread is interrupted while blocked on get(). This is done by // attempting to cancel() the future when InterruptedException is thrown. If the the cancel() is unsuccessful (i.e. // the future is completed either successfully or exceptionally), then get the result if present and return it. @Override public ResultT get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + throws InterruptedException, ExecutionException, TimeoutException { try { return f.get(timeout, unit); } catch (InterruptedException ie) { @@ -75,4 +94,4 @@ public ResultT get(long timeout, TimeUnit unit) throw ie; } } -} \ No newline at end of file +} diff --git a/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptFutureTest.java b/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapperTest.java similarity index 54% rename from http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptFutureTest.java rename to http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapperTest.java index 72df8a3926cd..9d98bba5da1a 100644 --- a/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptFutureTest.java +++ b/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapperTest.java @@ -30,10 +30,12 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -public class CancelOnInterruptFutureTest { +public class CancelOnInterruptWrapperTest { private Future mockDelegate; @BeforeEach @@ -41,6 +43,11 @@ void setup() { mockDelegate = mock(Future.class); } + @AfterEach + void teardown() { + Thread.interrupted(); // clear the flag if it was set by the last test + } + @Test void cancel_callsDelegate() { CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); @@ -77,7 +84,7 @@ void getTimeout_callsDelegate() throws ExecutionException, InterruptedException, } @Test - void get_interrupted_cancelSuccessful_throws() throws ExecutionException, InterruptedException, TimeoutException { + void getTimeout_interrupted_cancelSuccessful_throws() throws ExecutionException, InterruptedException, TimeoutException { when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException("interrupt")); when(mockDelegate.cancel(eq(true))).thenReturn(true); @@ -94,7 +101,8 @@ void get_interrupted_cancelSuccessful_throws() throws ExecutionException, Interr } @Test - void get_interrupted_cancelUnsuccessful_returnsEntry() throws ExecutionException, InterruptedException, TimeoutException { + void getTimeout_interrupted_cancelUnsuccessful_returnsEntry() throws ExecutionException, InterruptedException, + TimeoutException { String result = "hello there"; when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException("interrupt")); @@ -107,7 +115,8 @@ void get_interrupted_cancelUnsuccessful_returnsEntry() throws ExecutionException } @Test - void get_interrupted_cancelUnsuccessful_getUnsuccessful_rethrowsOriginalIe() throws ExecutionException, InterruptedException, TimeoutException { + void getTimeout_interrupted_cancelUnsuccessful_getUnsuccessful_rethrowsOriginalIe() throws ExecutionException, + InterruptedException, TimeoutException { InterruptedException interrupt = new InterruptedException("interrupt"); when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(interrupt); @@ -118,4 +127,93 @@ void get_interrupted_cancelUnsuccessful_getUnsuccessful_rethrowsOriginalIe() thr assertThatThrownBy(() -> wrapper.get(1, TimeUnit.SECONDS)).isSameAs(interrupt); } + + @Test + void get_interrupted_cancelSuccessful_throws() throws ExecutionException, InterruptedException, TimeoutException { + when(mockDelegate.get()).thenThrow(new InterruptedException("interrupt")); + when(mockDelegate.cancel(eq(true))).thenReturn(true); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + assertThatThrownBy(wrapper::get) + .isInstanceOf(InterruptedException.class) + .hasMessage("interrupt"); + + verify(mockDelegate).get(); + verify(mockDelegate).cancel(eq(true)); + + verifyNoMoreInteractions(mockDelegate); + } + + @Test + void get_interrupted_cancelUnsuccessful_returnsEntry() throws ExecutionException, InterruptedException, TimeoutException { + String result = "hello there"; + + AtomicBoolean first = new AtomicBoolean(true); + when(mockDelegate.get()).thenAnswer(i -> { + if (first.compareAndSet(true, false)) { + throw new InterruptedException("interrupt"); + } + return result; + }); + when(mockDelegate.cancel(eq(true))).thenReturn(false); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + assertThat(wrapper.get()).isEqualTo(result); + } + + @Test + void get_interrupted_cancelUnsuccessful_getUnsuccessful_rethrowsOriginalIe() throws ExecutionException, InterruptedException, TimeoutException { + InterruptedException interrupt = new InterruptedException("interrupt"); + + AtomicBoolean first = new AtomicBoolean(true); + when(mockDelegate.get()).thenAnswer(i -> { + if (first.compareAndSet(true, false)) { + throw interrupt; + } + throw new CancellationException("cancelled"); + }); + when(mockDelegate.cancel(eq(true))).thenReturn(false); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + assertThatThrownBy(wrapper::get).isSameAs(interrupt); + } + + @Test + void get_interrupted_cancelUnsuccessful_cancelUnsuccessful_preservesInterruptedFlag() throws ExecutionException, InterruptedException { + String result = "hello there"; + + AtomicBoolean first = new AtomicBoolean(true); + when(mockDelegate.get()).thenAnswer(i -> { + if (first.compareAndSet(true, false)) { + throw new InterruptedException("interrupt"); + } + return result; + }); + when(mockDelegate.cancel(eq(true))).thenReturn(false); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + wrapper.get(); + + assertThat(Thread.interrupted()).isTrue(); + } + + @Test + void getTimeout_interrupted_cancelUnsuccessful_preservesInterruptedFlag() throws ExecutionException, InterruptedException, + TimeoutException { + String result = "hello there"; + + when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException("interrupt")); + when(mockDelegate.cancel(eq(true))).thenReturn(false); + when(mockDelegate.get()).thenReturn(result); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + wrapper.get(1, TimeUnit.SECONDS); + + assertThat(Thread.interrupted()).isTrue(); + } } From fcdc139d5023c25f719a836235bada01801f5842 Mon Sep 17 00:00:00 2001 From: Dongie Agnir Date: Fri, 3 Apr 2026 14:01:31 -0700 Subject: [PATCH 3/3] Checkstyle fix --- .../http/apache5/internal/utils/CancelOnInterruptWrapper.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java index aefb9fd7eb2d..19f5f29ae481 100644 --- a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java +++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java @@ -76,8 +76,7 @@ public ResultT get() throws InterruptedException, ExecutionException { // attempting to cancel() the future when InterruptedException is thrown. If the the cancel() is unsuccessful (i.e. // the future is completed either successfully or exceptionally), then get the result if present and return it. @Override - public ResultT get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + public ResultT get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { try { return f.get(timeout, unit); } catch (InterruptedException ie) {