Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,90 @@
package org.apache.hc.client5.testing.async;

import java.io.IOException;
import java.io.InputStream;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.io.input.ProxyInputStream;
import org.apache.hc.client5.http.ClientProtocolException;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.testing.Result;
import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.HttpEntityWrapper;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public abstract class TestClassicOverAsync extends AbstractClassicOverAsyncIntegrationTestBase {

/**
* Mocks an InputStream that records whether it has been closed.
*/
private static final class MockInputStream extends ProxyInputStream {

boolean closed;

public MockInputStream(final InputStream proxy) {
super(proxy);
}

@Override
public synchronized void close() throws IOException {
super.close();
closed = true;
}

}

/**
* Mocks an HttpEntity that records whether it has been closed.
*/
private static final class MockHttpEntityWrapper extends HttpEntityWrapper {

boolean closed;
final boolean streaming;
final InputStream content;

public MockHttpEntityWrapper(final HttpEntity wrappedEntity, final boolean streaming) throws UnsupportedOperationException, IOException {
super(wrappedEntity);
this.streaming = streaming;
this.content = new MockInputStream(wrappedEntity.getContent());
}

@Override
public synchronized void close() throws IOException {
super.close();
closed = true;
}

@Override
public synchronized InputStream getContent() throws IOException {
return content;
}

@Override
public boolean isStreaming() {
return streaming;
}

}

public TestClassicOverAsync(final URIScheme scheme,
final ClientProtocolLevel clientProtocolLevel,
final ServerProtocolLevel serverProtocolLevel) {
Expand Down Expand Up @@ -84,6 +143,113 @@ void testSequentialGetRequests(final int contentSize) throws Exception {
}
}

/**
* Tests that the response entity is the client when an AssertionError is thrown from the response handler.
* The content stream is <strong>not closed</strong> because the entity is non-streaming.
*/
@ValueSource(ints = {0, 2048, 10240})
@ParameterizedTest(name = "{displayName}; content length: {0}")
void testSequentialGetRequestsAssertionError(final int contentSize) throws Exception {
configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
final HttpHost target = startServer();
final CloseableHttpClient client = startClient();
final int n = 10;
final String detailMessage = "Simulated unit test failure";
for (int i = 0; i < n; i++) {
final AtomicReference<MockHttpEntityWrapper> entityRef = new AtomicReference<>();
final AtomicReference<MockInputStream> contentRef = new AtomicReference<>();
try {
entityRef.set(null);
client.execute(
ClassicRequestBuilder.get()
.setHttpHost(target)
.setPath("/random/" + contentSize)
.build(),
response -> {
final MockHttpEntityWrapper entity = new MockHttpEntityWrapper(response.getEntity(), false);
entityRef.set(entity);
contentRef.set((MockInputStream) entity.getContent());
response.setEntity(entityRef.get());
throw new AssertionError(detailMessage);
});
Assertions.fail("AssertionError expected from execute()");
} catch (final AssertionError e) {
// Note that we can't use Assertions.assertThrows() because it doesn't catch AssertionError.
Assertions.assertEquals(detailMessage, e.getMessage());
}
Assertions.assertNotNull(entityRef.get());
Assertions.assertTrue(entityRef.get().closed);
Assertions.assertFalse(contentRef.get().closed);
}
}

/**
* Tests that the response entity is the client when an HttpException is thrown from the response handler.
* The content stream is <strong>not closed</strong> because the entity is non-streaming.
*/
@ValueSource(ints = {0, 2048, 10240})
@ParameterizedTest(name = "{displayName}; content length: {0}")
void testSequentialGetRequestsHttpException(final int contentSize) throws Exception {
configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
final HttpHost target = startServer();
final CloseableHttpClient client = startClient();
final int n = 10;
final String detailMessage = "Simulated HttpException failure";
for (int i = 0; i < n; i++) {
final AtomicReference<MockHttpEntityWrapper> entityRef = new AtomicReference<>();
final AtomicReference<MockInputStream> contentRef = new AtomicReference<>();
final ClientProtocolException e = Assertions.assertThrows(ClientProtocolException.class, () -> {
client.execute(
ClassicRequestBuilder.get()
.setHttpHost(target)
.setPath("/random/" + contentSize)
.build(),
response -> {
final MockHttpEntityWrapper entity = new MockHttpEntityWrapper(response.getEntity(), false);
entityRef.set(entity);
contentRef.set((MockInputStream) entity.getContent());
response.setEntity(entityRef.get());
throw new HttpException(detailMessage);
});
});
// If an HttpException is thrown from the handler, and the entity is non-streaming, the stream is left open.
Assertions.assertEquals(detailMessage, e.getCause().getMessage());
Assertions.assertTrue(entityRef.get().closed);
Assertions.assertFalse(contentRef.get().closed);
}
}

/**
* Tests that the response entity and its content are closed by the client.
*/
@ValueSource(ints = {0, 2048, 10240})
@ParameterizedTest(name = "{displayName}; content length: {0}")
void testSequentialGetRequestsStreaming(final int contentSize) throws Exception {
configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
final HttpHost target = startServer();
final CloseableHttpClient client = startClient();
final int n = 10;
for (int i = 0; i < n; i++) {
final AtomicReference<MockInputStream> contentRef = new AtomicReference<>();
final MockHttpEntityWrapper result = (MockHttpEntityWrapper) client.execute(
ClassicRequestBuilder.get()
.setHttpHost(target)
.setPath("/random/" + contentSize)
.build(),
response -> {
Assertions.assertEquals(200, response.getCode());
final MockHttpEntityWrapper entity = new MockHttpEntityWrapper(response.getEntity(), true);
contentRef.set((MockInputStream) entity.getContent());
response.setEntity(entity);
return response.getEntity();
});
// The plain use case where the entity and its input stream are closed.
Assertions.assertTrue(result.closed);
Assertions.assertTrue(contentRef.get().closed);
}
}


@ValueSource(ints = {0, 2048, 10240})
@ParameterizedTest(name = "{displayName}; content length: {0}")
void testConcurrentGetRequests(final int contentSize) throws Exception {
Expand Down