diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestClassicOverAsync.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestClassicOverAsync.java
index ac06bdf527..08736af528 100644
--- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestClassicOverAsync.java
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestClassicOverAsync.java
@@ -27,6 +27,7 @@
package org.apache.hc.client5.testing.async;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -34,17 +35,23 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.input.ProxyInputStream;
+import org.apache.hc.client5.http.ClientProtocolException;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.testing.Result;
import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpEntity;
+import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.hc.core5.http.io.entity.HttpEntityWrapper;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
@@ -52,6 +59,58 @@
public abstract class TestClassicOverAsync extends AbstractClassicOverAsyncIntegrationTestBase {
+ /**
+ * Mocks an InputStream that records whether it has been closed.
+ */
+ private static final class MockInputStream extends ProxyInputStream {
+
+ boolean closed;
+
+ public MockInputStream(final InputStream proxy) {
+ super(proxy);
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ super.close();
+ closed = true;
+ }
+
+ }
+
+ /**
+ * Mocks an HttpEntity that records whether it has been closed.
+ */
+ private static final class MockHttpEntityWrapper extends HttpEntityWrapper {
+
+ boolean closed;
+ final boolean streaming;
+ final InputStream content;
+
+ public MockHttpEntityWrapper(final HttpEntity wrappedEntity, final boolean streaming) throws UnsupportedOperationException, IOException {
+ super(wrappedEntity);
+ this.streaming = streaming;
+ this.content = new MockInputStream(wrappedEntity.getContent());
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ super.close();
+ closed = true;
+ }
+
+ @Override
+ public synchronized InputStream getContent() throws IOException {
+ return content;
+ }
+
+ @Override
+ public boolean isStreaming() {
+ return streaming;
+ }
+
+ }
+
public TestClassicOverAsync(final URIScheme scheme,
final ClientProtocolLevel clientProtocolLevel,
final ServerProtocolLevel serverProtocolLevel) {
@@ -84,6 +143,113 @@ void testSequentialGetRequests(final int contentSize) throws Exception {
}
}
+ /**
+ * Tests that the response entity is the client when an AssertionError is thrown from the response handler.
+ * The content stream is not closed because the entity is non-streaming.
+ */
+ @ValueSource(ints = {0, 2048, 10240})
+ @ParameterizedTest(name = "{displayName}; content length: {0}")
+ void testSequentialGetRequestsAssertionError(final int contentSize) throws Exception {
+ configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
+ final HttpHost target = startServer();
+ final CloseableHttpClient client = startClient();
+ final int n = 10;
+ final String detailMessage = "Simulated unit test failure";
+ for (int i = 0; i < n; i++) {
+ final AtomicReference entityRef = new AtomicReference<>();
+ final AtomicReference contentRef = new AtomicReference<>();
+ try {
+ entityRef.set(null);
+ client.execute(
+ ClassicRequestBuilder.get()
+ .setHttpHost(target)
+ .setPath("/random/" + contentSize)
+ .build(),
+ response -> {
+ final MockHttpEntityWrapper entity = new MockHttpEntityWrapper(response.getEntity(), false);
+ entityRef.set(entity);
+ contentRef.set((MockInputStream) entity.getContent());
+ response.setEntity(entityRef.get());
+ throw new AssertionError(detailMessage);
+ });
+ Assertions.fail("AssertionError expected from execute()");
+ } catch (final AssertionError e) {
+ // Note that we can't use Assertions.assertThrows() because it doesn't catch AssertionError.
+ Assertions.assertEquals(detailMessage, e.getMessage());
+ }
+ Assertions.assertNotNull(entityRef.get());
+ Assertions.assertTrue(entityRef.get().closed);
+ Assertions.assertFalse(contentRef.get().closed);
+ }
+ }
+
+ /**
+ * Tests that the response entity is the client when an HttpException is thrown from the response handler.
+ * The content stream is not closed because the entity is non-streaming.
+ */
+ @ValueSource(ints = {0, 2048, 10240})
+ @ParameterizedTest(name = "{displayName}; content length: {0}")
+ void testSequentialGetRequestsHttpException(final int contentSize) throws Exception {
+ configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
+ final HttpHost target = startServer();
+ final CloseableHttpClient client = startClient();
+ final int n = 10;
+ final String detailMessage = "Simulated HttpException failure";
+ for (int i = 0; i < n; i++) {
+ final AtomicReference entityRef = new AtomicReference<>();
+ final AtomicReference contentRef = new AtomicReference<>();
+ final ClientProtocolException e = Assertions.assertThrows(ClientProtocolException.class, () -> {
+ client.execute(
+ ClassicRequestBuilder.get()
+ .setHttpHost(target)
+ .setPath("/random/" + contentSize)
+ .build(),
+ response -> {
+ final MockHttpEntityWrapper entity = new MockHttpEntityWrapper(response.getEntity(), false);
+ entityRef.set(entity);
+ contentRef.set((MockInputStream) entity.getContent());
+ response.setEntity(entityRef.get());
+ throw new HttpException(detailMessage);
+ });
+ });
+ // If an HttpException is thrown from the handler, and the entity is non-streaming, the stream is left open.
+ Assertions.assertEquals(detailMessage, e.getCause().getMessage());
+ Assertions.assertTrue(entityRef.get().closed);
+ Assertions.assertFalse(contentRef.get().closed);
+ }
+ }
+
+ /**
+ * Tests that the response entity and its content are closed by the client.
+ */
+ @ValueSource(ints = {0, 2048, 10240})
+ @ParameterizedTest(name = "{displayName}; content length: {0}")
+ void testSequentialGetRequestsStreaming(final int contentSize) throws Exception {
+ configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
+ final HttpHost target = startServer();
+ final CloseableHttpClient client = startClient();
+ final int n = 10;
+ for (int i = 0; i < n; i++) {
+ final AtomicReference contentRef = new AtomicReference<>();
+ final MockHttpEntityWrapper result = (MockHttpEntityWrapper) client.execute(
+ ClassicRequestBuilder.get()
+ .setHttpHost(target)
+ .setPath("/random/" + contentSize)
+ .build(),
+ response -> {
+ Assertions.assertEquals(200, response.getCode());
+ final MockHttpEntityWrapper entity = new MockHttpEntityWrapper(response.getEntity(), true);
+ contentRef.set((MockInputStream) entity.getContent());
+ response.setEntity(entity);
+ return response.getEntity();
+ });
+ // The plain use case where the entity and its input stream are closed.
+ Assertions.assertTrue(result.closed);
+ Assertions.assertTrue(contentRef.get().closed);
+ }
+ }
+
+
@ValueSource(ints = {0, 2048, 10240})
@ParameterizedTest(name = "{displayName}; content length: {0}")
void testConcurrentGetRequests(final int contentSize) throws Exception {