diff --git a/.changes/next-release/bugfix-Apache5HTTPClient-9de458e.json b/.changes/next-release/bugfix-Apache5HTTPClient-9de458e.json new file mode 100644 index 000000000000..01eb808d9d22 --- /dev/null +++ b/.changes/next-release/bugfix-Apache5HTTPClient-9de458e.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "Apache 5 HTTP Client", + "contributor": "", + "description": "Fixed a connection leak that could occur when the thread waiting to acquire a connection from the pool is interrupted. Fixes [#6786](https://github.com/aws/aws-sdk-java-v2/issues/6786)." +} diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/Apache5HttpClient.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/Apache5HttpClient.java index ef4780866f36..6a4b738fdac7 100644 --- a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/Apache5HttpClient.java +++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/Apache5HttpClient.java @@ -50,7 +50,6 @@ import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.client5.http.impl.classic.HttpClients; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; -import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; import org.apache.hc.client5.http.impl.routing.DefaultRoutePlanner; import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.hc.client5.http.protocol.HttpClientContext; @@ -92,6 +91,7 @@ import software.amazon.awssdk.http.apache5.internal.SdkProxyRoutePlanner; import software.amazon.awssdk.http.apache5.internal.conn.ClientConnectionManagerFactory; import software.amazon.awssdk.http.apache5.internal.conn.IdleConnectionReaper; +import software.amazon.awssdk.http.apache5.internal.conn.SafePoolingHttpClientConnectionManagerBuilder; import software.amazon.awssdk.http.apache5.internal.conn.SdkConnectionKeepAliveStrategy; import software.amazon.awssdk.http.apache5.internal.conn.SdkTlsSocketFactory; import software.amazon.awssdk.http.apache5.internal.impl.Apache5HttpRequestFactory; @@ -755,8 +755,8 @@ public PoolingHttpClientConnectionManager create(Apache5HttpClient.DefaultBuilde TlsSocketStrategy tlsStrategy = getPreferredTlsStrategy(configuration, standardOptions); - PoolingHttpClientConnectionManagerBuilder builder = - PoolingHttpClientConnectionManagerBuilder.create() + SafePoolingHttpClientConnectionManagerBuilder builder = + SafePoolingHttpClientConnectionManagerBuilder.create() .setTlsSocketStrategy(tlsStrategy) .setSchemePortResolver(DefaultSchemePortResolver.INSTANCE) .setDnsResolver(configuration.dnsResolver); diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManager.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManager.java new file mode 100644 index 000000000000..7f2b52bde072 --- /dev/null +++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManager.java @@ -0,0 +1,44 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.apache5.internal.conn; + +import org.apache.hc.client5.http.impl.io.ManagedHttpClientConnectionFactory; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; +import org.apache.hc.client5.http.io.HttpClientConnectionOperator; +import org.apache.hc.core5.pool.DefaultDisposalCallback; +import org.apache.hc.core5.pool.PoolReusePolicy; +import software.amazon.awssdk.annotations.SdkInternalApi; + +/** + * Specialization of {@link PoolingHttpClientConnectionManager} to enable use of {@link SafeStrictConnPool} to prevent leaking + * connections when the thread waiting on the future is interrupted. + */ +@SdkInternalApi +public final class SafePoolingHttpClientConnectionManager extends PoolingHttpClientConnectionManager { + public SafePoolingHttpClientConnectionManager(HttpClientConnectionOperator connectionOperator) { + super(connectionOperator, + new SafeStrictConnPool( + DEFAULT_MAX_CONNECTIONS_PER_ROUTE, + DEFAULT_MAX_TOTAL_CONNECTIONS, + null, + PoolReusePolicy.LIFO, + new DefaultDisposalCallback<>(), + null + ), + ManagedHttpClientConnectionFactory.INSTANCE + ); + } +} diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerBuilder.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerBuilder.java new file mode 100644 index 000000000000..36373ce71290 --- /dev/null +++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerBuilder.java @@ -0,0 +1,235 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package software.amazon.awssdk.http.apache5.internal.conn; + +import org.apache.hc.client5.http.DnsResolver; +import org.apache.hc.client5.http.HttpRoute; +import org.apache.hc.client5.http.SchemePortResolver; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.config.TlsConfig; +import org.apache.hc.client5.http.impl.io.DefaultHttpClientConnectionOperator; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; +import org.apache.hc.client5.http.io.HttpClientConnectionOperator; +import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; +import org.apache.hc.client5.http.ssl.TlsSocketStrategy; +import org.apache.hc.core5.function.Resolver; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.config.RegistryBuilder; +import org.apache.hc.core5.http.io.SocketConfig; +import software.amazon.awssdk.annotations.SdkInternalApi; + +/** + * This is a fork of {@link PoolingHttpClientConnectionManagerBuilder} from Apache 5. The purpose of this forked class is to + * enable usage of the {@link SafePoolingHttpClientConnectionManager} to enable the workaround for + * https://github.com/aws/aws-sdk-java-v2/issues/6786. + */ +// This a direct copy of PoolingHttpClientConnectionManagerBuilder with minor changes to remove methods we don't use and +// updates to follow our style guide. +@SdkInternalApi +public final class SafePoolingHttpClientConnectionManagerBuilder { + + private TlsSocketStrategy tlsSocketStrategy; + private SchemePortResolver schemePortResolver; + private DnsResolver dnsResolver; + private Resolver socketConfigResolver; + private Resolver connectionConfigResolver; + private Resolver tlsConfigResolver; + + private int maxConnTotal; + private int maxConnPerRoute; + + public static SafePoolingHttpClientConnectionManagerBuilder create() { + return new SafePoolingHttpClientConnectionManagerBuilder(); + } + + /** + * Sets {@link TlsSocketStrategy} instance. + * + * @return this instance. + */ + public SafePoolingHttpClientConnectionManagerBuilder setTlsSocketStrategy(TlsSocketStrategy tlsSocketStrategy) { + this.tlsSocketStrategy = tlsSocketStrategy; + return this; + } + + /** + * Sets {@link DnsResolver} instance. + * + * @return this instance. + */ + public SafePoolingHttpClientConnectionManagerBuilder setDnsResolver(DnsResolver dnsResolver) { + this.dnsResolver = dnsResolver; + return this; + } + + /** + * Sets {@link SchemePortResolver} instance. + * + * @return this instance. + */ + public SafePoolingHttpClientConnectionManagerBuilder setSchemePortResolver(SchemePortResolver schemePortResolver) { + this.schemePortResolver = schemePortResolver; + return this; + } + + /** + * Sets maximum total connection value. + * + * @return this instance. + */ + public SafePoolingHttpClientConnectionManagerBuilder setMaxConnTotal(int maxConnTotal) { + this.maxConnTotal = maxConnTotal; + return this; + } + + /** + * Sets maximum connection per route value. + * + * @return this instance. + */ + public SafePoolingHttpClientConnectionManagerBuilder setMaxConnPerRoute(int maxConnPerRoute) { + this.maxConnPerRoute = maxConnPerRoute; + return this; + } + + /** + * Sets the same {@link SocketConfig} for all routes. + * + * @return this instance. + */ + public SafePoolingHttpClientConnectionManagerBuilder setDefaultSocketConfig(SocketConfig config) { + this.socketConfigResolver = route -> config; + return this; + } + + /** + * Sets {@link Resolver} of {@link SocketConfig} on a per route basis. + * + * @return this instance. + * @since 5.2 + */ + public SafePoolingHttpClientConnectionManagerBuilder setSocketConfigResolver( + Resolver socketConfigResolver) { + this.socketConfigResolver = socketConfigResolver; + return this; + } + + /** + * Sets the same {@link ConnectionConfig} for all routes. + * + * @return this instance. + * @since 5.2 + */ + public SafePoolingHttpClientConnectionManagerBuilder setDefaultConnectionConfig(ConnectionConfig config) { + this.connectionConfigResolver = route -> config; + return this; + } + + /** + * Sets {@link Resolver} of {@link ConnectionConfig} on a per route basis. + * + * @return this instance. + * @since 5.2 + */ + public SafePoolingHttpClientConnectionManagerBuilder setConnectionConfigResolver( + Resolver connectionConfigResolver) { + this.connectionConfigResolver = connectionConfigResolver; + return this; + } + + /** + * Sets the same {@link TlsConfig} for all hosts. + * + * @return this instance. + * @since 5.2 + */ + public SafePoolingHttpClientConnectionManagerBuilder setDefaultTlsConfig(TlsConfig config) { + this.tlsConfigResolver = host -> config; + return this; + } + + /** + * Sets {@link Resolver} of {@link TlsConfig} on a per host basis. + * + * @return this instance. + * @since 5.2 + */ + public SafePoolingHttpClientConnectionManagerBuilder setTlsConfigResolver( + Resolver tlsConfigResolver) { + this.tlsConfigResolver = tlsConfigResolver; + return this; + } + + protected HttpClientConnectionOperator createConnectionOperator(SchemePortResolver schemePortResolver, + DnsResolver dnsResolver, + TlsSocketStrategy tlsSocketStrategy) { + return new DefaultHttpClientConnectionOperator(schemePortResolver, dnsResolver, + RegistryBuilder.create() + .register(URIScheme.HTTPS.id, tlsSocketStrategy) + .build()); + } + + public SafePoolingHttpClientConnectionManager build() { + TlsSocketStrategy tlsSocketStrategyCopy; + if (tlsSocketStrategy != null) { + tlsSocketStrategyCopy = tlsSocketStrategy; + } else { + tlsSocketStrategyCopy = DefaultClientTlsStrategy.createDefault(); + } + + SafePoolingHttpClientConnectionManager poolingmgr = new SafePoolingHttpClientConnectionManager( + createConnectionOperator(schemePortResolver, dnsResolver, tlsSocketStrategyCopy)); + poolingmgr.setSocketConfigResolver(socketConfigResolver); + poolingmgr.setConnectionConfigResolver(connectionConfigResolver); + poolingmgr.setTlsConfigResolver(tlsConfigResolver); + if (maxConnTotal > 0) { + poolingmgr.setMaxTotal(maxConnTotal); + } + if (maxConnPerRoute > 0) { + poolingmgr.setDefaultMaxPerRoute(maxConnPerRoute); + } + return poolingmgr; + } + +} diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafeStrictConnPool.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafeStrictConnPool.java new file mode 100644 index 000000000000..5df86d1729b3 --- /dev/null +++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/conn/SafeStrictConnPool.java @@ -0,0 +1,59 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.apache5.internal.conn; + +import java.util.concurrent.Future; +import org.apache.hc.client5.http.HttpRoute; +import org.apache.hc.client5.http.io.ManagedHttpClientConnection; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.pool.ConnPoolListener; +import org.apache.hc.core5.pool.DisposalCallback; +import org.apache.hc.core5.pool.PoolEntry; +import org.apache.hc.core5.pool.PoolReusePolicy; +import org.apache.hc.core5.pool.StrictConnPool; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.http.apache5.internal.utils.CancelOnInterruptWrapper; + +/** + * Specialization of {@link StrictConnPool} that prevents leaking the connection when thread waiting on the future is + * interrupted. + */ +@SdkInternalApi +public final class SafeStrictConnPool extends StrictConnPool { + public SafeStrictConnPool(int defaultMaxPerRoute, + int maxTotal, + TimeValue timeToLive, + PoolReusePolicy policy, + DisposalCallback disposalCallback, + ConnPoolListener connPoolListener) { + super(defaultMaxPerRoute, maxTotal, timeToLive, policy, disposalCallback, connPoolListener); + } + + public Future> lease(HttpRoute route, + Object state, + Timeout requestTimeout, + FutureCallback> callback) { + return safeLease(super.lease(route, state, requestTimeout, callback)); + } + + private Future> safeLease( + Future> leaseFuture) { + return new CancelOnInterruptWrapper<>(leaseFuture); + } +} diff --git a/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java new file mode 100644 index 000000000000..19f5f29ae481 --- /dev/null +++ b/http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapper.java @@ -0,0 +1,96 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.apache5.internal.utils; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import software.amazon.awssdk.annotations.SdkInternalApi; + +/** + * Wrapper that attempts to cancel the delegate future if the thread is interrupted within get(). + */ +@SdkInternalApi +public final class CancelOnInterruptWrapper implements Future { + private final Future f; + + public CancelOnInterruptWrapper(Future f) { + this.f = f; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return f.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return f.isCancelled(); + } + + @Override + public boolean isDone() { + return f.isDone(); + } + + + // This method attempts to cancel the wrapped future if the thread is interrupted while blocked on get(). This is done by + // attempting to cancel() the future when InterruptedException is thrown. If the the cancel() is unsuccessful (i.e. + // the future is completed either successfully or exceptionally), then get the result if present and return it. + @Override + public ResultT get() throws InterruptedException, ExecutionException { + try { + return f.get(); + } catch (InterruptedException ie) { + if (!cancel(true)) { + try { + // We couldn't cancel so the result will be available or it failed + ResultT entry = f.get(); + Thread.currentThread().interrupt(); + return entry; + } catch (CancellationException | InterruptedException | ExecutionException e) { + // no-op, let it fall through to throwing the original interrupted exception + } + } + throw ie; + } + } + + + // This method attempts to cancel the wrapped future if the thread is interrupted while blocked on get(). This is done by + // attempting to cancel() the future when InterruptedException is thrown. If the the cancel() is unsuccessful (i.e. + // the future is completed either successfully or exceptionally), then get the result if present and return it. + @Override + public ResultT get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + try { + return f.get(timeout, unit); + } catch (InterruptedException ie) { + if (!cancel(true)) { + try { + // We couldn't cancel so the result will be available or it failed + ResultT entry = f.get(); + Thread.currentThread().interrupt(); + return entry; + } catch (CancellationException | InterruptedException | ExecutionException e) { + // no-op, let it fall through to throwing the original interrupted exception + } + } + throw ie; + } + } +} diff --git a/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerTest.java b/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerTest.java new file mode 100644 index 000000000000..8849bb3ba3ce --- /dev/null +++ b/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/conn/SafePoolingHttpClientConnectionManagerTest.java @@ -0,0 +1,127 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.apache5.internal.conn; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hc.client5.http.classic.methods.HttpGet; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.client5.http.impl.classic.HttpClients; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.pool.PoolStats; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class SafePoolingHttpClientConnectionManagerTest { + private static WireMockServer wm; + + @BeforeAll + static void setup() { + wm = new WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort()); + wm.start(); + } + + @AfterAll + static void teardown() { + wm.stop(); + } + + @Test + void leaseInterrupted_doesNotLeakConnections_probabilistic() throws IOException, InterruptedException, ParseException { + wm.stubFor(WireMock.get(WireMock.anyUrl()).willReturn(WireMock.aResponse().withStatus(204))); + int rounds = 16; + for (int i = 0; i < rounds; ++i) { + leaseInterrupted_doesNotLeakConnections(); + } + } + + private void leaseInterrupted_doesNotLeakConnections() throws InterruptedException, IOException, ParseException { + int numRequests = 10_000; + + SafePoolingHttpClientConnectionManager connectionManager = SafePoolingHttpClientConnectionManagerBuilder.create().build(); + connectionManager.setMaxTotal(100); + connectionManager.setDefaultMaxPerRoute(40); + + try (CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(connectionManager).build()) { + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger errorCount = new AtomicInteger(0); + ExecutorService executor = Executors.newFixedThreadPool(100); + + try { + List> futures = new ArrayList<>(numRequests); + CountDownLatch go = new CountDownLatch(numRequests); + + for (int i = 0; i < numRequests; i++) { + Future f = + executor.submit( + () -> { + go.countDown(); + try { + doRequest(httpClient); + successCount.incrementAndGet(); + } catch (Exception e) { + errorCount.incrementAndGet(); + } + }); + futures.add(f); + } + + go.await(); + + for (Future future : futures) { + future.cancel(true); + } + } finally { + executor.shutdown(); + executor.awaitTermination(100, TimeUnit.SECONDS); + } + connectionManager.closeExpired(); + assertNoLeases(connectionManager); + doRequest(httpClient); + } + } + + private static void doRequest(CloseableHttpClient httpClient) throws IOException, ParseException { + HttpGet httpGet = new HttpGet("http://localhost:" + wm.port()); + try (CloseableHttpResponse response = httpClient.execute(httpGet)) { + HttpEntity entity = response.getEntity(); + if (entity != null) { + EntityUtils.toString(entity); + } + } + } + + private static void assertNoLeases(SafePoolingHttpClientConnectionManager manager) { + PoolStats totalStats = manager.getTotalStats(); + assertThat(totalStats.getLeased()).as("Leased connection count").isEqualTo(0); + } +} diff --git a/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapperTest.java b/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapperTest.java new file mode 100644 index 000000000000..9d98bba5da1a --- /dev/null +++ b/http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/internal/utils/CancelOnInterruptWrapperTest.java @@ -0,0 +1,219 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.apache5.internal.utils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class CancelOnInterruptWrapperTest { + private Future mockDelegate; + + @BeforeEach + void setup() { + mockDelegate = mock(Future.class); + } + + @AfterEach + void teardown() { + Thread.interrupted(); // clear the flag if it was set by the last test + } + + @Test + void cancel_callsDelegate() { + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + wrapper.cancel(true); + verify(mockDelegate).cancel(eq(true)); + } + + @Test + void isCancelled_callsDelegate() { + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + wrapper.isCancelled(); + verify(mockDelegate).isCancelled(); + } + + @Test + void isDone_callsDelegate() { + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + wrapper.isDone(); + verify(mockDelegate).isDone(); + } + + @Test + void get_callsDelegate() throws ExecutionException, InterruptedException { + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + wrapper.get(); + verify(mockDelegate).get(); + } + + @Test + void getTimeout_callsDelegate() throws ExecutionException, InterruptedException, TimeoutException { + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + wrapper.get(42, TimeUnit.DAYS); + verify(mockDelegate).get(eq(42L), eq(TimeUnit.DAYS)); + } + + @Test + void getTimeout_interrupted_cancelSuccessful_throws() throws ExecutionException, InterruptedException, TimeoutException { + when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException("interrupt")); + when(mockDelegate.cancel(eq(true))).thenReturn(true); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + assertThatThrownBy(() -> wrapper.get(1, TimeUnit.SECONDS)) + .isInstanceOf(InterruptedException.class) + .hasMessage("interrupt"); + + verify(mockDelegate).get(eq(1L), eq(TimeUnit.SECONDS)); + verify(mockDelegate).cancel(eq(true)); + + verifyNoMoreInteractions(mockDelegate); + } + + @Test + void getTimeout_interrupted_cancelUnsuccessful_returnsEntry() throws ExecutionException, InterruptedException, + TimeoutException { + String result = "hello there"; + + when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException("interrupt")); + when(mockDelegate.cancel(eq(true))).thenReturn(false); + when(mockDelegate.get()).thenReturn(result); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + assertThat(wrapper.get(1, TimeUnit.SECONDS)).isEqualTo(result); + } + + @Test + void getTimeout_interrupted_cancelUnsuccessful_getUnsuccessful_rethrowsOriginalIe() throws ExecutionException, + InterruptedException, TimeoutException { + InterruptedException interrupt = new InterruptedException("interrupt"); + + when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(interrupt); + when(mockDelegate.cancel(eq(true))).thenReturn(false); + when(mockDelegate.get()).thenThrow(new CancellationException("cancelled")); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + assertThatThrownBy(() -> wrapper.get(1, TimeUnit.SECONDS)).isSameAs(interrupt); + } + + @Test + void get_interrupted_cancelSuccessful_throws() throws ExecutionException, InterruptedException, TimeoutException { + when(mockDelegate.get()).thenThrow(new InterruptedException("interrupt")); + when(mockDelegate.cancel(eq(true))).thenReturn(true); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + assertThatThrownBy(wrapper::get) + .isInstanceOf(InterruptedException.class) + .hasMessage("interrupt"); + + verify(mockDelegate).get(); + verify(mockDelegate).cancel(eq(true)); + + verifyNoMoreInteractions(mockDelegate); + } + + @Test + void get_interrupted_cancelUnsuccessful_returnsEntry() throws ExecutionException, InterruptedException, TimeoutException { + String result = "hello there"; + + AtomicBoolean first = new AtomicBoolean(true); + when(mockDelegate.get()).thenAnswer(i -> { + if (first.compareAndSet(true, false)) { + throw new InterruptedException("interrupt"); + } + return result; + }); + when(mockDelegate.cancel(eq(true))).thenReturn(false); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + assertThat(wrapper.get()).isEqualTo(result); + } + + @Test + void get_interrupted_cancelUnsuccessful_getUnsuccessful_rethrowsOriginalIe() throws ExecutionException, InterruptedException, TimeoutException { + InterruptedException interrupt = new InterruptedException("interrupt"); + + AtomicBoolean first = new AtomicBoolean(true); + when(mockDelegate.get()).thenAnswer(i -> { + if (first.compareAndSet(true, false)) { + throw interrupt; + } + throw new CancellationException("cancelled"); + }); + when(mockDelegate.cancel(eq(true))).thenReturn(false); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + assertThatThrownBy(wrapper::get).isSameAs(interrupt); + } + + @Test + void get_interrupted_cancelUnsuccessful_cancelUnsuccessful_preservesInterruptedFlag() throws ExecutionException, InterruptedException { + String result = "hello there"; + + AtomicBoolean first = new AtomicBoolean(true); + when(mockDelegate.get()).thenAnswer(i -> { + if (first.compareAndSet(true, false)) { + throw new InterruptedException("interrupt"); + } + return result; + }); + when(mockDelegate.cancel(eq(true))).thenReturn(false); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + wrapper.get(); + + assertThat(Thread.interrupted()).isTrue(); + } + + @Test + void getTimeout_interrupted_cancelUnsuccessful_preservesInterruptedFlag() throws ExecutionException, InterruptedException, + TimeoutException { + String result = "hello there"; + + when(mockDelegate.get(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException("interrupt")); + when(mockDelegate.cancel(eq(true))).thenReturn(false); + when(mockDelegate.get()).thenReturn(result); + + CancelOnInterruptWrapper wrapper = new CancelOnInterruptWrapper<>(mockDelegate); + + wrapper.get(1, TimeUnit.SECONDS); + + assertThat(Thread.interrupted()).isTrue(); + } +}