diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestClassicOverAsync.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestClassicOverAsync.java index ac06bdf527..08736af528 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestClassicOverAsync.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestClassicOverAsync.java @@ -27,6 +27,7 @@ package org.apache.hc.client5.testing.async; import java.io.IOException; +import java.io.InputStream; import java.util.Queue; import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue; @@ -34,17 +35,23 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.input.ProxyInputStream; +import org.apache.hc.client5.http.ClientProtocolException; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.testing.Result; import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel; import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel; import org.apache.hc.core5.http.ClassicHttpRequest; import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.URIScheme; import org.apache.hc.core5.http.io.entity.ByteArrayEntity; import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.io.entity.HttpEntityWrapper; import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; @@ -52,6 +59,58 @@ public abstract class TestClassicOverAsync extends AbstractClassicOverAsyncIntegrationTestBase { + /** + * Mocks an InputStream that records whether it has been closed. + */ + private static final class MockInputStream extends ProxyInputStream { + + boolean closed; + + public MockInputStream(final InputStream proxy) { + super(proxy); + } + + @Override + public synchronized void close() throws IOException { + super.close(); + closed = true; + } + + } + + /** + * Mocks an HttpEntity that records whether it has been closed. + */ + private static final class MockHttpEntityWrapper extends HttpEntityWrapper { + + boolean closed; + final boolean streaming; + final InputStream content; + + public MockHttpEntityWrapper(final HttpEntity wrappedEntity, final boolean streaming) throws UnsupportedOperationException, IOException { + super(wrappedEntity); + this.streaming = streaming; + this.content = new MockInputStream(wrappedEntity.getContent()); + } + + @Override + public synchronized void close() throws IOException { + super.close(); + closed = true; + } + + @Override + public synchronized InputStream getContent() throws IOException { + return content; + } + + @Override + public boolean isStreaming() { + return streaming; + } + + } + public TestClassicOverAsync(final URIScheme scheme, final ClientProtocolLevel clientProtocolLevel, final ServerProtocolLevel serverProtocolLevel) { @@ -84,6 +143,113 @@ void testSequentialGetRequests(final int contentSize) throws Exception { } } + /** + * Tests that the response entity is the client when an AssertionError is thrown from the response handler. + * The content stream is not closed because the entity is non-streaming. + */ + @ValueSource(ints = {0, 2048, 10240}) + @ParameterizedTest(name = "{displayName}; content length: {0}") + void testSequentialGetRequestsAssertionError(final int contentSize) throws Exception { + configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new)); + final HttpHost target = startServer(); + final CloseableHttpClient client = startClient(); + final int n = 10; + final String detailMessage = "Simulated unit test failure"; + for (int i = 0; i < n; i++) { + final AtomicReference entityRef = new AtomicReference<>(); + final AtomicReference contentRef = new AtomicReference<>(); + try { + entityRef.set(null); + client.execute( + ClassicRequestBuilder.get() + .setHttpHost(target) + .setPath("/random/" + contentSize) + .build(), + response -> { + final MockHttpEntityWrapper entity = new MockHttpEntityWrapper(response.getEntity(), false); + entityRef.set(entity); + contentRef.set((MockInputStream) entity.getContent()); + response.setEntity(entityRef.get()); + throw new AssertionError(detailMessage); + }); + Assertions.fail("AssertionError expected from execute()"); + } catch (final AssertionError e) { + // Note that we can't use Assertions.assertThrows() because it doesn't catch AssertionError. + Assertions.assertEquals(detailMessage, e.getMessage()); + } + Assertions.assertNotNull(entityRef.get()); + Assertions.assertTrue(entityRef.get().closed); + Assertions.assertFalse(contentRef.get().closed); + } + } + + /** + * Tests that the response entity is the client when an HttpException is thrown from the response handler. + * The content stream is not closed because the entity is non-streaming. + */ + @ValueSource(ints = {0, 2048, 10240}) + @ParameterizedTest(name = "{displayName}; content length: {0}") + void testSequentialGetRequestsHttpException(final int contentSize) throws Exception { + configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new)); + final HttpHost target = startServer(); + final CloseableHttpClient client = startClient(); + final int n = 10; + final String detailMessage = "Simulated HttpException failure"; + for (int i = 0; i < n; i++) { + final AtomicReference entityRef = new AtomicReference<>(); + final AtomicReference contentRef = new AtomicReference<>(); + final ClientProtocolException e = Assertions.assertThrows(ClientProtocolException.class, () -> { + client.execute( + ClassicRequestBuilder.get() + .setHttpHost(target) + .setPath("/random/" + contentSize) + .build(), + response -> { + final MockHttpEntityWrapper entity = new MockHttpEntityWrapper(response.getEntity(), false); + entityRef.set(entity); + contentRef.set((MockInputStream) entity.getContent()); + response.setEntity(entityRef.get()); + throw new HttpException(detailMessage); + }); + }); + // If an HttpException is thrown from the handler, and the entity is non-streaming, the stream is left open. + Assertions.assertEquals(detailMessage, e.getCause().getMessage()); + Assertions.assertTrue(entityRef.get().closed); + Assertions.assertFalse(contentRef.get().closed); + } + } + + /** + * Tests that the response entity and its content are closed by the client. + */ + @ValueSource(ints = {0, 2048, 10240}) + @ParameterizedTest(name = "{displayName}; content length: {0}") + void testSequentialGetRequestsStreaming(final int contentSize) throws Exception { + configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new)); + final HttpHost target = startServer(); + final CloseableHttpClient client = startClient(); + final int n = 10; + for (int i = 0; i < n; i++) { + final AtomicReference contentRef = new AtomicReference<>(); + final MockHttpEntityWrapper result = (MockHttpEntityWrapper) client.execute( + ClassicRequestBuilder.get() + .setHttpHost(target) + .setPath("/random/" + contentSize) + .build(), + response -> { + Assertions.assertEquals(200, response.getCode()); + final MockHttpEntityWrapper entity = new MockHttpEntityWrapper(response.getEntity(), true); + contentRef.set((MockInputStream) entity.getContent()); + response.setEntity(entity); + return response.getEntity(); + }); + // The plain use case where the entity and its input stream are closed. + Assertions.assertTrue(result.closed); + Assertions.assertTrue(contentRef.get().closed); + } + } + + @ValueSource(ints = {0, 2048, 10240}) @ParameterizedTest(name = "{displayName}; content length: {0}") void testConcurrentGetRequests(final int contentSize) throws Exception {