Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -316,4 +316,10 @@ public void testWriteWithClientResponseException() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithElasticClientResponseException();
}

@Test
public void testWriteWithClientResponseExceptionIsRetried() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithElasticClientResponseExceptionIsRetried();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,4 +316,10 @@ public void testWriteWithClientResponseException() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithElasticClientResponseException();
}

@Test
public void testWriteWithClientResponseExceptionIsRetried() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithElasticClientResponseExceptionIsRetried();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,55 @@ void testWriteWithElasticClientResponseException() throws Exception {
pipeline.run();
}

void testWriteWithElasticClientResponseExceptionIsRetried() throws Exception {
try (ElasticsearchIOTestUtils.AlwaysFailServer srv =
new ElasticsearchIOTestUtils.AlwaysFailServer(0, 500)) {
int port = srv.getPort();
String[] hosts = {String.format("http://localhost:%d", port)};
ConnectionConfiguration clientConfig = ConnectionConfiguration.create(hosts);

Write write =
ElasticsearchIO.write()
.withConnectionConfiguration(clientConfig)
.withBackendVersion(8) // Mock server does not return proper version
.withMaxBatchSize(numDocs + 1)
.withMaxBatchSizeBytes(
Long.MAX_VALUE) // Max long number to make sure all docs are flushed in one batch.
.withThrowWriteErrors(false)
.withRetryConfiguration(
ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(35000))
.withRetryPredicate(CUSTOM_RETRY_PREDICATE))
.withIdFn(new ExtractValueFn("id"))
.withUseStatefulBatches(true);

List<String> data =
ElasticsearchIOTestUtils.createDocuments(1, InjectionMode.DO_NOT_INJECT_INVALID_DOCS);

PCollectionTuple outputs = pipeline.apply(Create.of(data)).apply(write);

// The whole batch should fail and direct to tag FAILED_WRITES because of one invalid doc.
PCollection<String> success =
outputs
.get(Write.SUCCESSFUL_WRITES)
.apply("Convert success to input ID", MapElements.via(mapToInputIdString));

PCollection<String> fail =
outputs
.get(Write.FAILED_WRITES)
.apply("Convert fails to input ID", MapElements.via(mapToInputIdString));

PAssert.that(success).empty();
PAssert.that(fail).containsInAnyOrder("0"); // First and only document

// Verify response item contains the corresponding error message.
String expectedError =
String.format(ElasticsearchIO.BulkIO.RETRY_FAILED_LOG, EXPECTED_RETRIES);
PAssert.that(outputs.get(Write.FAILED_WRITES))
.satisfies(responseItemJsonSubstringValidator(expectedError));
pipeline.run();
}
}

void testWriteWithAllowedErrors() throws Exception {
Set<String> allowedErrors = new HashSet<>();
allowedErrors.add("json_parse_exception");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
Expand Down Expand Up @@ -546,6 +551,7 @@ public Integer apply(Document document) {
new SimpleFunction<Document, String>() {
@Override
public String apply(Document document) {
System.err.println("INPUT DOC: " + document.getResponseItemJson());
try {
// Account for intentionally invalid input json docs
String fixedJson = document.getInputDoc().replaceAll(";", ":");
Expand All @@ -555,4 +561,41 @@ public String apply(Document document) {
}
}
};

/**
* Small server that always returns a specified HTTP error code. This is useful to simulate server
* errors in tests.
*/
static class AlwaysFailServer implements AutoCloseable {
private final HttpServer server;
private final int port;

AlwaysFailServer(int port, int status) throws IOException {
HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
this.port = server.getAddress().getPort();
server.createContext("/", exchange -> handle(exchange, status));
server.start();

this.server = server;
}

int getPort() {
return port;
}

private static void handle(HttpExchange exchange, int status) throws IOException {
byte[] response = "Internal Server Error".getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(status, response.length);
try (OutputStream os = exchange.getResponseBody()) {
os.write(response);
}
}

@Override
public void close() throws Exception {
if (server != null) {
server.stop(0);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2811,13 +2811,14 @@ protected void addAndMaybeFlush(Document doc, ProcessContext context)
}

private boolean isRetryableClientException(Throwable t) {
// RestClient#performRequest only throws wrapped IOException so we must inspect the
// RestClient#performRequest mainly throws wrapped IOException so we must inspect the
// exception cause to determine if the exception is likely transient i.e. retryable or
// not.
// not. One exception is the ResponseException which is thrown when attempting to parse the
// response. This exception is not wrapped.

// Retry for 500-range response code except for 501.
if (t.getCause() instanceof ResponseException) {
ResponseException ex = (ResponseException) t.getCause();
if (t instanceof ResponseException) {
ResponseException ex = (ResponseException) t;
int statusCode = ex.getResponse().getStatusLine().getStatusCode();
return statusCode >= 500 && statusCode != 501;
}
Expand Down Expand Up @@ -2893,7 +2894,16 @@ private List<Document> flushBatch() throws IOException, InterruptedException {
&& spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
LOG.warn("ES Cluster is responding with HTP 429 - TOO_MANY_REQUESTS.");
}
responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
try {
responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
} catch (java.io.IOException ex) {
// No more retry attempts, determine what to do using throwWriteErrors
if (spec.getThrowWriteErrors()) {
throw ex;
} else {
elasticResponseExceptionMessage = ex.getMessage();
}
}
}

List<Document> responses;
Expand Down
Loading