Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.MethodNotSupportedException;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
Expand All @@ -53,8 +54,11 @@
import org.apache.hc.core5.http.nio.StreamChannel;
import org.apache.hc.core5.http.nio.entity.AbstractBinAsyncEntityProducer;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.net.WWWFormCodec;
import org.apache.hc.core5.util.Asserts;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* A handler that generates random data.
*/
Expand Down Expand Up @@ -93,6 +97,22 @@ public void handleRequest(
} catch (final URISyntaxException ex) {
throw new ProtocolException(ex.getMessage(), ex);
}
final String query = uri.getQuery();
int delayMs = 0;
boolean drip = false;
if (query != null) {
final List<NameValuePair> params = WWWFormCodec.parse(query, UTF_8);
for (final NameValuePair param : params) {
final String name = param.getName();
final String value = param.getValue();
if ("delay".equals(name)) {
delayMs = Integer.parseInt(value);
} else if ("drip".equals(name)) {
drip = "1".equals(value);
}
}
}

final String path = uri.getPath();
final int slash = path.lastIndexOf('/');
if (slash != -1) {
Expand All @@ -109,7 +129,7 @@ public void handleRequest(
n = 1 + (int)(Math.random() * 79.0);
}
final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
final AsyncEntityProducer entityProducer = new RandomBinAsyncEntityProducer(n);
final AsyncEntityProducer entityProducer = new RandomBinAsyncEntityProducer(n, delayMs, drip);
entityProducerRef.set(entityProducer);
responseChannel.sendResponse(response, entityProducer, context);
} else {
Expand Down Expand Up @@ -162,12 +182,22 @@ public static class RandomBinAsyncEntityProducer extends AbstractBinAsyncEntityP
private final long length;
private long remaining;
private final ByteBuffer buffer;
private final int delayMs;
private final boolean drip;
private volatile long deadline;

public RandomBinAsyncEntityProducer(final long len) {
this(len, 0, false);
}

public RandomBinAsyncEntityProducer(final long len, final int delayMs, final boolean drip) {
super(512, ContentType.DEFAULT_TEXT);
length = len;
remaining = len;
buffer = ByteBuffer.allocate(1024);
this.delayMs = delayMs;
this.deadline = System.currentTimeMillis() + (drip ? 0 : delayMs);
this.drip = drip;
}

@Override
Expand All @@ -192,6 +222,10 @@ public int availableData() {

@Override
protected void produceData(final StreamChannel<ByteBuffer> channel) throws IOException {
if (System.currentTimeMillis() < deadline) {
return;
}

final int chunk = Math.min((int) (remaining < Integer.MAX_VALUE ? remaining : Integer.MAX_VALUE), buffer.remaining());
for (int i = 0; i < chunk; i++) {
final byte b = RANGE[(int) (Math.random() * RANGE.length)];
Expand All @@ -205,6 +239,8 @@ protected void produceData(final StreamChannel<ByteBuffer> channel) throws IOExc

if (remaining <= 0 && buffer.position() == 0) {
channel.endStream();
} else if (drip) {
deadline = System.currentTimeMillis() + delayMs;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,27 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.MethodNotSupportedException;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.io.HttpRequestHandler;
import org.apache.hc.core5.http.io.entity.AbstractHttpEntity;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.net.WWWFormCodec;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* A handler that generates random data.
Expand Down Expand Up @@ -84,6 +90,22 @@ public void handle(final ClassicHttpRequest request,
} catch (final URISyntaxException ex) {
throw new ProtocolException(ex.getMessage(), ex);
}
final String query = uri.getQuery();
int delayMs = 0;
boolean drip = false;
if (query != null) {
final List<NameValuePair> params = WWWFormCodec.parse(query, UTF_8);
for (final NameValuePair param : params) {
final String name = param.getName();
final String value = param.getValue();
if ("delay".equals(name)) {
delayMs = Integer.parseInt(value);
} else if ("drip".equals(name)) {
drip = "1".equals(value);
}
}
}

final String path = uri.getPath();
final int slash = path.lastIndexOf('/');
if (slash != -1) {
Expand All @@ -100,7 +122,7 @@ public void handle(final ClassicHttpRequest request,
n = 1 + (int)(Math.random() * 79.0);
}
response.setCode(HttpStatus.SC_OK);
response.setEntity(new RandomEntity(n));
response.setEntity(new RandomEntity(n, delayMs, drip));
} else {
throw new ProtocolException("Invalid request path: " + path);
}
Expand All @@ -120,6 +142,12 @@ public static class RandomEntity extends AbstractHttpEntity {
/** The length of the random data to generate. */
protected final long length;

/** The duration of the delay before sending the response entity. */
protected final int delayMs;

/** Whether to delay after each chunk sent. If {@code false},
* will only delay at the start of the response entity. */
protected final boolean drip;

/**
* Creates a new entity generating the given amount of data.
Expand All @@ -128,8 +156,22 @@ public static class RandomEntity extends AbstractHttpEntity {
* 0 to maxint
*/
public RandomEntity(final long len) {
this(len, 0, false);
}

/**
* Creates a new entity generating the given amount of data.
*
* @param len the number of random bytes to generate,
* 0 to maxint
* @param delayMs how long to wait before sending the first byte
* @param drip whether to repeat the delay after each byte
*/
public RandomEntity(final long len, final int delayMs, final boolean drip) {
super((ContentType) null, null);
length = len;
this.length = len;
this.delayMs = delayMs;
this.drip = drip;
}

/**
Expand Down Expand Up @@ -185,31 +227,48 @@ public InputStream getContent() {
@Override
public void writeTo(final OutputStream out) throws IOException {

final int blocksize = 2048;
int remaining = (int) length; // range checked in constructor
final byte[] data = new byte[Math.min(remaining, blocksize)];
try {
final int blocksize = 2048;
int remaining = (int) length; // range checked in constructor
final byte[] data = new byte[Math.min(remaining, blocksize)];

while (remaining > 0) {
final int end = Math.min(remaining, data.length);
out.flush();
if (!drip) {
delay();
}
while (remaining > 0) {
final int end = Math.min(remaining, data.length);

double value = 0.0;
for (int i = 0; i < end; i++) {
// we get 5 random characters out of one random value
if (i % 5 == 0) {
value = Math.random();
double value = 0.0;
for (int i = 0; i < end; i++) {
// we get 5 random characters out of one random value
if (i % 5 == 0) {
value = Math.random();
}
value = value * RANGE.length;
final int d = (int) value;
value = value - d;
data[i] = RANGE[d];
}
value = value * RANGE.length;
final int d = (int) value;
value = value - d;
data[i] = RANGE[d];
}
out.write(data, 0, end);
out.flush();
out.write(data, 0, end);
out.flush();

remaining = remaining - end;
remaining = remaining - end;
if (drip) {
delay();
}
}
} finally {
out.close();
}
out.close();
}

private void delay() throws IOException {
try {
Thread.sleep(delayMs);
} catch (final InterruptedException ex) {
throw new InterruptedIOException();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package org.apache.hc.client5.testing.async;

import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.function.Consumer;

import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
Expand All @@ -48,9 +49,17 @@ abstract class AbstractIntegrationTestBase {

@RegisterExtension
private final TestAsyncResources testResources;
private final boolean useUnixDomainSocket;

protected AbstractIntegrationTestBase(final URIScheme scheme, final ClientProtocolLevel clientProtocolLevel, final ServerProtocolLevel serverProtocolLevel) {
this(scheme, clientProtocolLevel, serverProtocolLevel, false);
}

protected AbstractIntegrationTestBase(
final URIScheme scheme, final ClientProtocolLevel clientProtocolLevel,
final ServerProtocolLevel serverProtocolLevel, final boolean useUnixDomainSocket) {
this.testResources = new TestAsyncResources(scheme, clientProtocolLevel, serverProtocolLevel, TIMEOUT);
this.useUnixDomainSocket = useUnixDomainSocket;
}

public URIScheme scheme() {
Expand All @@ -72,6 +81,9 @@ public void configureServer(final Consumer<TestAsyncServerBootstrap> serverCusto
public HttpHost startServer() throws Exception {
final TestAsyncServer server = testResources.server();
final InetSocketAddress inetSocketAddress = server.start();
if (useUnixDomainSocket) {
testResources.udsProxy().start();
}
return new HttpHost(testResources.scheme().id, "localhost", inetSocketAddress.getPort());
}

Expand All @@ -80,9 +92,21 @@ public void configureClient(final Consumer<TestAsyncClientBuilder> clientCustomi
}

public TestAsyncClient startClient() throws Exception {
if (useUnixDomainSocket) {
final Path socketPath = getUnixDomainSocket();
testResources.configureClient(builder -> {
builder.setUnixDomainSocket(socketPath);
});
}
final TestAsyncClient client = testResources.client();
client.start();
return client;
}

public Path getUnixDomainSocket() throws Exception {
if (useUnixDomainSocket) {
return testResources.udsProxy().getSocketPath();
}
return null;
}
}
Loading