From 61435e96a7bfbca459afa15e33adc0cc2e92a9bf Mon Sep 17 00:00:00 2001 From: Egbert van der Wal Date: Mon, 22 Dec 2025 18:29:23 +0100 Subject: [PATCH] fix: handle ResponseException correctly, honor throwWriteExceptions when using RetryConfiguration --- .../io/elasticsearch/ElasticsearchIOTest.java | 6 +++ .../io/elasticsearch/ElasticsearchIOTest.java | 6 +++ .../ElasticsearchIOTestCommon.java | 49 +++++++++++++++++++ .../ElasticsearchIOTestUtils.java | 43 ++++++++++++++++ .../sdk/io/elasticsearch/ElasticsearchIO.java | 20 ++++++-- 5 files changed, 119 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index 08bf6e3a2983..1e4531202ec9 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -316,4 +316,10 @@ public void testWriteWithClientResponseException() throws Exception { elasticsearchIOTestCommon.setPipeline(pipeline); elasticsearchIOTestCommon.testWriteWithElasticClientResponseException(); } + + @Test + public void testWriteWithClientResponseExceptionIsRetried() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithElasticClientResponseExceptionIsRetried(); + } } diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index 5f3861e69a99..9933f5d1cdcd 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -316,4 +316,10 @@ public void testWriteWithClientResponseException() throws Exception { elasticsearchIOTestCommon.setPipeline(pipeline); elasticsearchIOTestCommon.testWriteWithElasticClientResponseException(); } + + @Test + public void testWriteWithClientResponseExceptionIsRetried() throws Exception { + elasticsearchIOTestCommon.setPipeline(pipeline); + elasticsearchIOTestCommon.testWriteWithElasticClientResponseExceptionIsRetried(); + } } diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java index 768b3cfe8032..2c911b2014c3 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java @@ -506,6 +506,55 @@ void testWriteWithElasticClientResponseException() throws Exception { pipeline.run(); } + void testWriteWithElasticClientResponseExceptionIsRetried() throws Exception { + try (ElasticsearchIOTestUtils.AlwaysFailServer srv = + new ElasticsearchIOTestUtils.AlwaysFailServer(0, 500)) { + int port = srv.getPort(); + String[] hosts = {String.format("http://localhost:%d", port)}; + ConnectionConfiguration clientConfig = ConnectionConfiguration.create(hosts); + + Write write = + ElasticsearchIO.write() + .withConnectionConfiguration(clientConfig) + .withBackendVersion(8) // Mock server does not return proper version + .withMaxBatchSize(numDocs + 1) + .withMaxBatchSizeBytes( + Long.MAX_VALUE) // Max long number to make sure all docs are flushed in one batch. + .withThrowWriteErrors(false) + .withRetryConfiguration( + ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(35000)) + .withRetryPredicate(CUSTOM_RETRY_PREDICATE)) + .withIdFn(new ExtractValueFn("id")) + .withUseStatefulBatches(true); + + List data = + ElasticsearchIOTestUtils.createDocuments(1, InjectionMode.DO_NOT_INJECT_INVALID_DOCS); + + PCollectionTuple outputs = pipeline.apply(Create.of(data)).apply(write); + + // The whole batch should fail and direct to tag FAILED_WRITES because of one invalid doc. + PCollection success = + outputs + .get(Write.SUCCESSFUL_WRITES) + .apply("Convert success to input ID", MapElements.via(mapToInputIdString)); + + PCollection fail = + outputs + .get(Write.FAILED_WRITES) + .apply("Convert fails to input ID", MapElements.via(mapToInputIdString)); + + PAssert.that(success).empty(); + PAssert.that(fail).containsInAnyOrder("0"); // First and only document + + // Verify response item contains the corresponding error message. + String expectedError = + String.format(ElasticsearchIO.BulkIO.RETRY_FAILED_LOG, EXPECTED_RETRIES); + PAssert.that(outputs.get(Write.FAILED_WRITES)) + .satisfies(responseItemJsonSubstringValidator(expectedError)); + pipeline.run(); + } + } + void testWriteWithAllowedErrors() throws Exception { Set allowedErrors = new HashSet<>(); allowedErrors.add("json_parse_exception"); diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java index 7e3cd58fd202..9c5e230f3363 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java @@ -27,7 +27,12 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.LocalDateTime; import java.util.ArrayList; @@ -546,6 +551,7 @@ public Integer apply(Document document) { new SimpleFunction() { @Override public String apply(Document document) { + System.err.println("INPUT DOC: " + document.getResponseItemJson()); try { // Account for intentionally invalid input json docs String fixedJson = document.getInputDoc().replaceAll(";", ":"); @@ -555,4 +561,41 @@ public String apply(Document document) { } } }; + + /** + * Small server that always returns a specified HTTP error code. This is useful to simulate server + * errors in tests. + */ + static class AlwaysFailServer implements AutoCloseable { + private final HttpServer server; + private final int port; + + AlwaysFailServer(int port, int status) throws IOException { + HttpServer server = HttpServer.create(new InetSocketAddress(port), 0); + this.port = server.getAddress().getPort(); + server.createContext("/", exchange -> handle(exchange, status)); + server.start(); + + this.server = server; + } + + int getPort() { + return port; + } + + private static void handle(HttpExchange exchange, int status) throws IOException { + byte[] response = "Internal Server Error".getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(status, response.length); + try (OutputStream os = exchange.getResponseBody()) { + os.write(response); + } + } + + @Override + public void close() throws Exception { + if (server != null) { + server.stop(0); + } + } + } } diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index c634bb99e02e..e00faef54f80 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -2811,13 +2811,14 @@ protected void addAndMaybeFlush(Document doc, ProcessContext context) } private boolean isRetryableClientException(Throwable t) { - // RestClient#performRequest only throws wrapped IOException so we must inspect the + // RestClient#performRequest mainly throws wrapped IOException so we must inspect the // exception cause to determine if the exception is likely transient i.e. retryable or - // not. + // not. One exception is the ResponseException which is thrown when attempting to parse the + // response. This exception is not wrapped. // Retry for 500-range response code except for 501. - if (t.getCause() instanceof ResponseException) { - ResponseException ex = (ResponseException) t.getCause(); + if (t instanceof ResponseException) { + ResponseException ex = (ResponseException) t; int statusCode = ex.getResponse().getStatusLine().getStatusCode(); return statusCode >= 500 && statusCode != 501; } @@ -2893,7 +2894,16 @@ private List flushBatch() throws IOException, InterruptedException { && spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) { LOG.warn("ES Cluster is responding with HTP 429 - TOO_MANY_REQUESTS."); } - responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody); + try { + responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody); + } catch (java.io.IOException ex) { + // No more retry attempts, determine what to do using throwWriteErrors + if (spec.getThrowWriteErrors()) { + throw ex; + } else { + elasticResponseExceptionMessage = ex.getMessage(); + } + } } List responses;