From 672a8fe8f04b63d1fc335663dc968ca478a9058b Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Tue, 26 May 2026 14:29:25 -0700 Subject: [PATCH 01/12] fix: isolate per-record ID-generation failures in Kafka sink transformer (#49268) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a single record's ID strategy fails (e.g., ProvidedInStrategy JsonPath parse error), only that record should be routed to DLQ — not the entire batch. Previously, SinkRecordTransformer.transform() had no per-record error handling, so one malformed record would abort transformation of all records in the batch. Changes: - SinkRecordTransformer: Add per-record try-catch in transform(). Accept ErrantRecordReporter and ToleranceOnErrorLevel. Report failing records to DLQ when available, skip when tolerance is ALL, rethrow when tolerance is NONE. - CosmosSinkTask: Pass reporter and tolerance to SinkRecordTransformer. Fix written-record bookkeeping to count only successfully transformed records. Fixes Azure/azure-sdk-for-java#49268 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .gitignore | 1 + .../implementation/sink/CosmosSinkTask.java | 7 +- .../sink/SinkRecordTransformer.java | 85 +++++++++++++------ 3 files changed, 67 insertions(+), 26 deletions(-) diff --git a/.gitignore b/.gitignore index 787fe08c9a86..ca7abdd4bb4d 100644 --- a/.gitignore +++ b/.gitignore @@ -129,4 +129,5 @@ TempTypeSpecFiles/ # Azure Artifacts Credential Provider runtime .azure-artifacts/ +.coding-harness/ diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java index 5c5d0bd0150b..2db4aede492f 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java @@ -50,7 +50,10 @@ public void start(Map props) { this.sinkTaskConfig.getClientMetadataCachesSnapshot()); LOGGER.info("The taskId is " + this.sinkTaskConfig.getTaskId()); this.throughputControlClientItem = this.getThroughputControlCosmosClient(); - this.sinkRecordTransformer = new SinkRecordTransformer(this.sinkTaskConfig); + this.sinkRecordTransformer = new SinkRecordTransformer( + this.sinkTaskConfig, + this.context.errantRecordReporter(), + this.sinkTaskConfig.getWriteConfig().getToleranceOnErrorLevel()); if (this.sinkTaskConfig.getWriteConfig().isBulkEnabled()) { this.cosmosWriter = @@ -129,7 +132,7 @@ record -> this.sinkTaskConfig List transformedRecords = sinkRecordTransformer.transform(containerName, entry.getValue()); this.cosmosWriter.write(container, transformedRecords); - totalWrittenRecordsPerContainer.merge(containerName, (long) entry.getValue().size(), Long::sum); + totalWrittenRecordsPerContainer.merge(containerName, (long) transformedRecords.size(), Long::sum); } logWrittenRecordCount(); diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java index e600b95041b8..970215c04a71 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java @@ -11,6 +11,7 @@ import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.ProvidedInValueStrategy; import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.TemplateStrategy; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,9 +25,16 @@ public class SinkRecordTransformer { private static final Logger LOGGER = LoggerFactory.getLogger(SinkRecordTransformer.class); private final IdStrategy idStrategy; + private final ErrantRecordReporter errantRecordReporter; + private final ToleranceOnErrorLevel toleranceOnErrorLevel; - public SinkRecordTransformer(CosmosSinkTaskConfig sinkTaskConfig) { + public SinkRecordTransformer( + CosmosSinkTaskConfig sinkTaskConfig, + ErrantRecordReporter errantRecordReporter, + ToleranceOnErrorLevel toleranceOnErrorLevel) { this.idStrategy = this.createIdStrategy(sinkTaskConfig); + this.errantRecordReporter = errantRecordReporter; + this.toleranceOnErrorLevel = toleranceOnErrorLevel; } @SuppressWarnings("unchecked") @@ -44,30 +52,59 @@ public List transform(String containerName, List sinkRec record.value() == null ? null : record.value().getClass().getName(), record.value() == null ? null : record.valueSchema()); - Object recordValue; - if (record.value() instanceof Struct) { - recordValue = StructToJsonMap.toJsonMap((Struct) record.value()); - } else if (record.value() instanceof Map) { - recordValue = StructToJsonMap.handleMap((Map) record.value()); - } else { - recordValue = record.value(); + try { + Object recordValue; + if (record.value() instanceof Struct) { + recordValue = StructToJsonMap.toJsonMap((Struct) record.value()); + } else if (record.value() instanceof Map) { + recordValue = StructToJsonMap.handleMap((Map) record.value()); + } else { + recordValue = record.value(); + } + + maybeInsertId(recordValue, record); + + final SinkRecord updatedRecord = new SinkRecord(record.topic(), + record.kafkaPartition(), + record.keySchema(), + record.key(), + record.valueSchema(), + recordValue, + record.kafkaOffset(), + record.timestamp(), + record.timestampType(), + record.headers()); + + toBeWrittenRecordList.add(updatedRecord); + } catch (Exception e) { + LOGGER.warn( + "Failed to transform record from topic {}, partition {}, offset {}, container {}.", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset(), + containerName, + e); + if (this.errantRecordReporter != null) { + this.errantRecordReporter.report(record, e); + } else if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) { + LOGGER.warn( + "Skipping record from topic {}, partition {}, offset {}, container {} due to transform error " + + "and ToleranceOnErrorLevel is ALL.", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset(), + containerName); + } else { + LOGGER.error( + "Failing record from topic {}, partition {}, offset {}, container {} because no DLQ reporter " + + "is configured and ToleranceOnErrorLevel is NONE.", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset(), + containerName); + throw e; + } } - - maybeInsertId(recordValue, record); - - // Create an updated record with from the current record and the updated record value - final SinkRecord updatedRecord = new SinkRecord(record.topic(), - record.kafkaPartition(), - record.keySchema(), - record.key(), - record.valueSchema(), - recordValue, - record.kafkaOffset(), - record.timestamp(), - record.timestampType(), - record.headers()); - - toBeWrittenRecordList.add(updatedRecord); } return toBeWrittenRecordList; From cc8e58a51fb5c6c9ef4cfb243e2b84b8019089b7 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Tue, 26 May 2026 14:34:34 -0700 Subject: [PATCH 02/12] test: add SinkRecordTransformer per-record error isolation tests Covers: DLQ reporting, tolerance ALL skip, tolerance NONE rethrow, all-valid regression, all-bad with reporter, reporter precedence over tolerance. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../sink/SinkRecordTransformerTest.java | 287 ++++++++++++++++++ 1 file changed, 287 insertions(+) create mode 100644 sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java new file mode 100644 index 000000000000..52d8e04aba79 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java @@ -0,0 +1,287 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.kafka.connect.implementation.sink; + +import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.IdStrategy; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.ErrantRecordReporter; +import org.apache.kafka.connect.sink.SinkRecord; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +public class SinkRecordTransformerTest { + private static final int TIMEOUT = 60000; + + /** + * Creates a SinkRecordTransformer bypassing the constructor that needs CosmosSinkTaskConfig. + * Uses Mockito with CALLS_REAL_METHODS so the transform() method executes real code, + * then injects the fields via reflection. + */ + private SinkRecordTransformer createTransformer( + IdStrategy idStrategy, + ErrantRecordReporter reporter, + ToleranceOnErrorLevel toleranceLevel) throws Exception { + + SinkRecordTransformer transformer = Mockito.mock( + SinkRecordTransformer.class, + withSettings().defaultAnswer(CALLS_REAL_METHODS)); + FieldUtils.writeField(transformer, "idStrategy", idStrategy, true); + FieldUtils.writeField(transformer, "errantRecordReporter", reporter, true); + FieldUtils.writeField(transformer, "toleranceOnErrorLevel", toleranceLevel, true); + return transformer; + } + + /** + * Creates a SinkRecord with a Map value containing the given fields. + */ + private SinkRecord createMapRecord(String topic, int partition, long offset, Map value) { + return new SinkRecord(topic, partition, null, "key-" + offset, null, value, offset); + } + + /** + * Creates an IdStrategy that fails (throws ConnectException) when generating an ID + * for records whose value map contains a field "fail" set to true, + * and returns a valid ID otherwise. + */ + private IdStrategy createSelectivelyFailingIdStrategy() { + IdStrategy idStrategy = Mockito.mock(IdStrategy.class); + when(idStrategy.generateId(any(SinkRecord.class))).thenAnswer(invocation -> { + SinkRecord record = invocation.getArgument(0); + Object value = record.value(); + if (value instanceof Map) { + @SuppressWarnings("unchecked") + Map map = (Map) value; + if (Boolean.TRUE.equals(map.get("fail"))) { + throw new ConnectException("Cannot generate ID: missing required field"); + } + } + return "generated-id-" + record.kafkaOffset(); + }); + return idStrategy; + } + + // ============================================================ + // T1: Mixed batch with reporter — bad record goes to DLQ, valid records in output + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + @SuppressWarnings("unchecked") + public void mixedBatchWithReporter_badRecordReportedValidRecordsInOutput() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); + Future mockFuture = Mockito.mock(Future.class); + when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); + + SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + + Map goodValue1 = new HashMap<>(); + goodValue1.put("data", "hello"); + + Map badValue = new HashMap<>(); + badValue.put("fail", true); + + Map goodValue2 = new HashMap<>(); + goodValue2.put("data", "world"); + + List batch = Arrays.asList( + createMapRecord("topic1", 0, 0L, goodValue1), + createMapRecord("topic1", 0, 1L, badValue), + createMapRecord("topic1", 0, 2L, goodValue2) + ); + + // Act + List result = transformer.transform("container1", batch); + + // Assert — only 2 valid records in output + assertThat(result.size()).isEqualTo(2); + assertThat(((Map) result.get(0).value()).get("id")).isEqualTo("generated-id-0"); + assertThat(((Map) result.get(1).value()).get("id")).isEqualTo("generated-id-2"); + + // Assert — reporter called exactly once with the bad record + ArgumentCaptor recordCaptor = ArgumentCaptor.forClass(SinkRecord.class); + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(reporter, times(1)).report(recordCaptor.capture(), errorCaptor.capture()); + assertThat(recordCaptor.getValue().kafkaOffset()).isEqualTo(1L); + assertThat(errorCaptor.getValue()).isInstanceOf(ConnectException.class); + } + + // ============================================================ + // T2: Mixed batch with tolerance ALL, no reporter — bad record skipped + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + @SuppressWarnings("unchecked") + public void mixedBatchToleranceAll_noReporter_badRecordSkipped() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + SinkRecordTransformer transformer = createTransformer(idStrategy, null, ToleranceOnErrorLevel.ALL); + + Map goodValue = new HashMap<>(); + goodValue.put("data", "hello"); + + Map badValue = new HashMap<>(); + badValue.put("fail", true); + + List batch = Arrays.asList( + createMapRecord("topicA", 1, 10L, goodValue), + createMapRecord("topicA", 1, 11L, badValue) + ); + + // Act — should NOT throw + List result = transformer.transform("container2", batch); + + // Assert — only 1 valid record + assertThat(result.size()).isEqualTo(1); + assertThat(((Map) result.get(0).value()).get("id")).isEqualTo("generated-id-10"); + } + + // ============================================================ + // T3: Mixed batch with tolerance NONE, no reporter — exception thrown + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void mixedBatchToleranceNone_noReporter_exceptionThrown() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + SinkRecordTransformer transformer = createTransformer(idStrategy, null, ToleranceOnErrorLevel.NONE); + + Map goodValue = new HashMap<>(); + goodValue.put("data", "hello"); + + Map badValue = new HashMap<>(); + badValue.put("fail", true); + + List batch = Arrays.asList( + createMapRecord("topicB", 2, 20L, goodValue), + createMapRecord("topicB", 2, 21L, badValue) + ); + + // Act + Throwable thrown = catchThrowable(() -> transformer.transform("container3", batch)); + + // Assert — exception is thrown (fail-fast preserved) + assertThat(thrown).isInstanceOf(ConnectException.class); + assertThat(thrown.getMessage()).contains("Cannot generate ID"); + } + + // ============================================================ + // T4: All records valid — no errors, all records in output (regression) + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + @SuppressWarnings("unchecked") + public void allValidRecords_allInOutput() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); + SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + + Map value1 = new HashMap<>(); + value1.put("data", "a"); + Map value2 = new HashMap<>(); + value2.put("data", "b"); + Map value3 = new HashMap<>(); + value3.put("data", "c"); + + List batch = Arrays.asList( + createMapRecord("topicC", 0, 100L, value1), + createMapRecord("topicC", 0, 101L, value2), + createMapRecord("topicC", 0, 102L, value3) + ); + + // Act + List result = transformer.transform("container4", batch); + + // Assert — all 3 records in output + assertThat(result.size()).isEqualTo(3); + for (int i = 0; i < 3; i++) { + assertThat(((Map) result.get(i).value()).get("id")) + .isEqualTo("generated-id-" + (100 + i)); + } + + // Assert — reporter never called + verify(reporter, never()).report(any(), any()); + } + + // ============================================================ + // T5: All records bad with reporter — all reported to DLQ, empty output + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void allBadRecordsWithReporter_allReportedEmptyOutput() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); + Future mockFuture = Mockito.mock(Future.class); + when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); + + SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + + Map bad1 = new HashMap<>(); + bad1.put("fail", true); + Map bad2 = new HashMap<>(); + bad2.put("fail", true); + Map bad3 = new HashMap<>(); + bad3.put("fail", true); + + List batch = Arrays.asList( + createMapRecord("topicD", 0, 50L, bad1), + createMapRecord("topicD", 0, 51L, bad2), + createMapRecord("topicD", 0, 52L, bad3) + ); + + // Act + List result = transformer.transform("container5", batch); + + // Assert — empty output + assertThat(result.size()).isEqualTo(0); + + // Assert — reporter called 3 times + verify(reporter, times(3)).report(any(SinkRecord.class), any(ConnectException.class)); + } + + // ============================================================ + // T6: Reporter takes precedence over tolerance NONE — when reporter is present, + // report instead of throwing even when tolerance is NONE + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void reporterTakesPrecedenceOverToleranceNone() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); + Future mockFuture = Mockito.mock(Future.class); + when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); + + // Tolerance is NONE but reporter is available + SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + + Map badValue = new HashMap<>(); + badValue.put("fail", true); + + List batch = Arrays.asList( + createMapRecord("topicE", 0, 0L, badValue) + ); + + // Act — should NOT throw because reporter handles it + Throwable thrown = catchThrowable(() -> transformer.transform("container6", batch)); + + // Assert + assertThat(thrown).isNull(); + verify(reporter, times(1)).report(any(SinkRecord.class), any(ConnectException.class)); + } +} From d49548cda4409ec52125eb2cce202a03b451b500 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Tue, 26 May 2026 14:47:19 -0700 Subject: [PATCH 03/12] Guard ErrantRecordReporter.report() against exceptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wrap errantRecordReporter.report() in its own try/catch to prevent DLQ reporter failures from collapsing the entire batch. When the reporter throws: - ToleranceOnErrorLevel.ALL: log and continue (skip the bad record) - ToleranceOnErrorLevel.NONE: rethrow the original transform exception Add T6/T7 tests covering both scenarios. Renumber T6→T8. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../sink/SinkRecordTransformer.java | 15 ++++- .../sink/SinkRecordTransformerTest.java | 65 ++++++++++++++++++- 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java index 970215c04a71..13d5d6dcffff 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java @@ -85,7 +85,20 @@ public List transform(String containerName, List sinkRec containerName, e); if (this.errantRecordReporter != null) { - this.errantRecordReporter.report(record, e); + try { + this.errantRecordReporter.report(record, e); + } catch (Exception reportException) { + LOGGER.error( + "Failed to report errant record to DLQ for topic {}, partition {}, offset {}, container {}.", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset(), + containerName, + reportException); + if (this.toleranceOnErrorLevel != ToleranceOnErrorLevel.ALL) { + throw e; + } + } } else if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) { LOGGER.warn( "Skipping record from topic {}, partition {}, offset {}, container {} due to transform error " diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java index 52d8e04aba79..54dcd50ced42 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java @@ -256,7 +256,70 @@ public void allBadRecordsWithReporter_allReportedEmptyOutput() throws Exception } // ============================================================ - // T6: Reporter takes precedence over tolerance NONE — when reporter is present, + // T6: Reporter itself throws — with tolerance NONE, original exception rethrown + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void reporterThrows_toleranceNone_originalExceptionRethrown() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); + when(reporter.report(any(SinkRecord.class), any(Throwable.class))) + .thenThrow(new ConnectException("DLQ write failed")); + + SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + + Map badValue = new HashMap<>(); + badValue.put("fail", true); + Map goodValue = new HashMap<>(); + goodValue.put("data", "after-bad"); + + List batch = Arrays.asList( + createMapRecord("topicF", 0, 0L, badValue), + createMapRecord("topicF", 0, 1L, goodValue) + ); + + // Act + Throwable thrown = catchThrowable(() -> transformer.transform("container7", batch)); + + // Assert — original transform exception, NOT the DLQ exception + assertThat(thrown).isInstanceOf(ConnectException.class); + assertThat(thrown.getMessage()).contains("Cannot generate ID"); + } + + // ============================================================ + // T7: Reporter itself throws — with tolerance ALL, record skipped and processing continues + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + @SuppressWarnings("unchecked") + public void reporterThrows_toleranceAll_recordSkippedProcessingContinues() throws Exception { + // Arrange + IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); + ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); + when(reporter.report(any(SinkRecord.class), any(Throwable.class))) + .thenThrow(new ConnectException("DLQ write failed")); + + SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.ALL); + + Map badValue = new HashMap<>(); + badValue.put("fail", true); + Map goodValue = new HashMap<>(); + goodValue.put("data", "survives"); + + List batch = Arrays.asList( + createMapRecord("topicG", 0, 0L, badValue), + createMapRecord("topicG", 0, 1L, goodValue) + ); + + // Act — should NOT throw + List result = transformer.transform("container8", batch); + + // Assert — only the good record survives + assertThat(result.size()).isEqualTo(1); + assertThat(((Map) result.get(0).value()).get("id")).isEqualTo("generated-id-1"); + } + + // ============================================================ + // T8: Reporter takes precedence over tolerance NONE — when reporter is present, // report instead of throwing even when tolerance is NONE // ============================================================ @Test(groups = {"unit"}, timeOut = TIMEOUT) From 82946c14385c66f22624404a1c03143ae92311a9 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Tue, 26 May 2026 14:51:49 -0700 Subject: [PATCH 04/12] Align transformer error handling with writer pattern; harden MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review findings: - Align DLQ/tolerance precedence with writer pattern: DLQ report is fire-and-forget side-effect, tolerance level controls continue-vs-throw. With tolerance=NONE + reporter, record is reported AND task fails. - Guard context.errantRecordReporter() against older Kafka Connect runtimes that lack the API (catch NoClassDefFoundError/NoSuchMethodError). - Add package-private constructor for testability (eliminates reflection). - Consolidate double-logging: one log entry per failed record. - Rewrite tests to use package-private constructor and align with new semantics. T8 now tests tolerance=NONE+reporter → DLQ+throw. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../implementation/sink/CosmosSinkTask.java | 12 +++- .../sink/SinkRecordTransformer.java | 42 ++++++----- .../sink/SinkRecordTransformerTest.java | 69 +++++++------------ 3 files changed, 59 insertions(+), 64 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java index 2db4aede492f..7a47678ba3a7 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java @@ -50,9 +50,19 @@ public void start(Map props) { this.sinkTaskConfig.getClientMetadataCachesSnapshot()); LOGGER.info("The taskId is " + this.sinkTaskConfig.getTaskId()); this.throughputControlClientItem = this.getThroughputControlCosmosClient(); + + ErrantRecordReporter errantRecordReporter = null; + try { + errantRecordReporter = this.context.errantRecordReporter(); + } catch (NoClassDefFoundError | NoSuchMethodError e) { + LOGGER.info( + "ErrantRecordReporter not available in this Kafka Connect runtime; " + + "DLQ will not be used for transform errors."); + } + this.sinkRecordTransformer = new SinkRecordTransformer( this.sinkTaskConfig, - this.context.errantRecordReporter(), + errantRecordReporter, this.sinkTaskConfig.getWriteConfig().getToleranceOnErrorLevel()); if (this.sinkTaskConfig.getWriteConfig().isBulkEnabled()) { diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java index 13d5d6dcffff..7cc13b2cf445 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java @@ -32,7 +32,15 @@ public SinkRecordTransformer( CosmosSinkTaskConfig sinkTaskConfig, ErrantRecordReporter errantRecordReporter, ToleranceOnErrorLevel toleranceOnErrorLevel) { - this.idStrategy = this.createIdStrategy(sinkTaskConfig); + this(createIdStrategy(sinkTaskConfig), errantRecordReporter, toleranceOnErrorLevel); + } + + // Package-private constructor for unit testing without requiring CosmosSinkTaskConfig. + SinkRecordTransformer( + IdStrategy idStrategy, + ErrantRecordReporter errantRecordReporter, + ToleranceOnErrorLevel toleranceOnErrorLevel) { + this.idStrategy = idStrategy; this.errantRecordReporter = errantRecordReporter; this.toleranceOnErrorLevel = toleranceOnErrorLevel; } @@ -77,13 +85,8 @@ public List transform(String containerName, List sinkRec toBeWrittenRecordList.add(updatedRecord); } catch (Exception e) { - LOGGER.warn( - "Failed to transform record from topic {}, partition {}, offset {}, container {}.", - record.topic(), - record.kafkaPartition(), - record.kafkaOffset(), - containerName, - e); + // Report to DLQ if configured (fire-and-forget, guarded against reporter failures). + // This is consistent with the writer-level pattern in CosmosWriterBase.sendToDlqIfConfigured(). if (this.errantRecordReporter != null) { try { this.errantRecordReporter.report(record, e); @@ -95,26 +98,27 @@ public List transform(String containerName, List sinkRec record.kafkaOffset(), containerName, reportException); - if (this.toleranceOnErrorLevel != ToleranceOnErrorLevel.ALL) { - throw e; - } } - } else if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) { + } + + // Use tolerance level to decide continue-vs-throw (consistent with writer pattern). + if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) { LOGGER.warn( - "Skipping record from topic {}, partition {}, offset {}, container {} due to transform error " - + "and ToleranceOnErrorLevel is ALL.", + "Skipping record from topic {}, partition {}, offset {}, container {} due to transform error.", record.topic(), record.kafkaPartition(), record.kafkaOffset(), - containerName); + containerName, + e); } else { LOGGER.error( - "Failing record from topic {}, partition {}, offset {}, container {} because no DLQ reporter " - + "is configured and ToleranceOnErrorLevel is NONE.", + "Failing task due to transform error for record from topic {}, partition {}, offset {}, " + + "container {}.", record.topic(), record.kafkaPartition(), record.kafkaOffset(), - containerName); + containerName, + e); throw e; } } @@ -132,7 +136,7 @@ private void maybeInsertId(Object recordValue, SinkRecord sinkRecord) { recordMap.put("id", this.idStrategy.generateId(sinkRecord)); } - private IdStrategy createIdStrategy(CosmosSinkTaskConfig sinkTaskConfig) { + private static IdStrategy createIdStrategy(CosmosSinkTaskConfig sinkTaskConfig) { IdStrategy idStrategyClass; switch (sinkTaskConfig.getIdStrategy()) { case FULL_KEY_STRATEGY: diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java index 54dcd50ced42..e2fe731972c4 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java @@ -4,7 +4,6 @@ package com.azure.cosmos.kafka.connect.implementation.sink; import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.IdStrategy; -import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; @@ -21,35 +20,14 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.withSettings; public class SinkRecordTransformerTest { private static final int TIMEOUT = 60000; - /** - * Creates a SinkRecordTransformer bypassing the constructor that needs CosmosSinkTaskConfig. - * Uses Mockito with CALLS_REAL_METHODS so the transform() method executes real code, - * then injects the fields via reflection. - */ - private SinkRecordTransformer createTransformer( - IdStrategy idStrategy, - ErrantRecordReporter reporter, - ToleranceOnErrorLevel toleranceLevel) throws Exception { - - SinkRecordTransformer transformer = Mockito.mock( - SinkRecordTransformer.class, - withSettings().defaultAnswer(CALLS_REAL_METHODS)); - FieldUtils.writeField(transformer, "idStrategy", idStrategy, true); - FieldUtils.writeField(transformer, "errantRecordReporter", reporter, true); - FieldUtils.writeField(transformer, "toleranceOnErrorLevel", toleranceLevel, true); - return transformer; - } - /** * Creates a SinkRecord with a Map value containing the given fields. */ @@ -80,18 +58,18 @@ private IdStrategy createSelectivelyFailingIdStrategy() { } // ============================================================ - // T1: Mixed batch with reporter — bad record goes to DLQ, valid records in output + // T1: Mixed batch with reporter + tolerance ALL — bad record goes to DLQ, valid records in output // ============================================================ @Test(groups = {"unit"}, timeOut = TIMEOUT) @SuppressWarnings("unchecked") - public void mixedBatchWithReporter_badRecordReportedValidRecordsInOutput() throws Exception { + public void mixedBatchWithReporterToleranceAll_badRecordReportedValidRecordsInOutput() throws Exception { // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); Future mockFuture = Mockito.mock(Future.class); when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); - SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.ALL); Map goodValue1 = new HashMap<>(); goodValue1.put("data", "hello"); @@ -132,7 +110,7 @@ public void mixedBatchWithReporter_badRecordReportedValidRecordsInOutput() throw public void mixedBatchToleranceAll_noReporter_badRecordSkipped() throws Exception { // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); - SinkRecordTransformer transformer = createTransformer(idStrategy, null, ToleranceOnErrorLevel.ALL); + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, null, ToleranceOnErrorLevel.ALL); Map goodValue = new HashMap<>(); goodValue.put("data", "hello"); @@ -154,13 +132,13 @@ public void mixedBatchToleranceAll_noReporter_badRecordSkipped() throws Exceptio } // ============================================================ - // T3: Mixed batch with tolerance NONE, no reporter — exception thrown + // T3: Mixed batch with tolerance NONE, no reporter — exception thrown (fail-fast) // ============================================================ @Test(groups = {"unit"}, timeOut = TIMEOUT) public void mixedBatchToleranceNone_noReporter_exceptionThrown() throws Exception { // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); - SinkRecordTransformer transformer = createTransformer(idStrategy, null, ToleranceOnErrorLevel.NONE); + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, null, ToleranceOnErrorLevel.NONE); Map goodValue = new HashMap<>(); goodValue.put("data", "hello"); @@ -190,7 +168,7 @@ public void allValidRecords_allInOutput() throws Exception { // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); - SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); Map value1 = new HashMap<>(); value1.put("data", "a"); @@ -220,17 +198,17 @@ public void allValidRecords_allInOutput() throws Exception { } // ============================================================ - // T5: All records bad with reporter — all reported to DLQ, empty output + // T5: All records bad with reporter + tolerance ALL — all reported to DLQ, empty output // ============================================================ @Test(groups = {"unit"}, timeOut = TIMEOUT) - public void allBadRecordsWithReporter_allReportedEmptyOutput() throws Exception { + public void allBadRecordsWithReporterToleranceAll_allReportedEmptyOutput() throws Exception { // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); Future mockFuture = Mockito.mock(Future.class); when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); - SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.ALL); Map bad1 = new HashMap<>(); bad1.put("fail", true); @@ -266,7 +244,7 @@ public void reporterThrows_toleranceNone_originalExceptionRethrown() throws Exce when(reporter.report(any(SinkRecord.class), any(Throwable.class))) .thenThrow(new ConnectException("DLQ write failed")); - SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); Map badValue = new HashMap<>(); badValue.put("fail", true); @@ -298,7 +276,7 @@ public void reporterThrows_toleranceAll_recordSkippedProcessingContinues() throw when(reporter.report(any(SinkRecord.class), any(Throwable.class))) .thenThrow(new ConnectException("DLQ write failed")); - SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.ALL); + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.ALL); Map badValue = new HashMap<>(); badValue.put("fail", true); @@ -319,32 +297,35 @@ public void reporterThrows_toleranceAll_recordSkippedProcessingContinues() throw } // ============================================================ - // T8: Reporter takes precedence over tolerance NONE — when reporter is present, - // report instead of throwing even when tolerance is NONE + // T8: Tolerance NONE with reporter — record reported to DLQ AND exception thrown + // (consistent with writer-level pattern: DLQ is side-effect, tolerance controls flow) // ============================================================ @Test(groups = {"unit"}, timeOut = TIMEOUT) - public void reporterTakesPrecedenceOverToleranceNone() throws Exception { + public void toleranceNoneWithReporter_reportedToDlqAndExceptionThrown() throws Exception { // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); Future mockFuture = Mockito.mock(Future.class); when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); - // Tolerance is NONE but reporter is available - SinkRecordTransformer transformer = createTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); + // Tolerance is NONE — task should fail even though reporter is available + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.NONE); Map badValue = new HashMap<>(); badValue.put("fail", true); List batch = Arrays.asList( - createMapRecord("topicE", 0, 0L, badValue) + createMapRecord("topicH", 0, 0L, badValue) ); - // Act — should NOT throw because reporter handles it - Throwable thrown = catchThrowable(() -> transformer.transform("container6", batch)); + // Act + Throwable thrown = catchThrowable(() -> transformer.transform("container9", batch)); + + // Assert — exception IS thrown (tolerance NONE means fail) + assertThat(thrown).isInstanceOf(ConnectException.class); + assertThat(thrown.getMessage()).contains("Cannot generate ID"); - // Assert - assertThat(thrown).isNull(); + // Assert — reporter WAS called (DLQ is side-effect for observability) verify(reporter, times(1)).report(any(SinkRecord.class), any(ConnectException.class)); } } From 4ac387bf70f4e482224f03e6afd08b26bf834961 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Tue, 26 May 2026 15:41:58 -0700 Subject: [PATCH 05/12] fix: catch RuntimeException instead of Exception to fix compilation Change catch clause from Exception (checked) to RuntimeException (unchecked) since transform() doesn't declare throws Exception. ConnectException and all other exceptions from ID strategies are RuntimeException subclasses. This fixes the CI build failure. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../connect/implementation/sink/SinkRecordTransformer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java index 7cc13b2cf445..159c8364ff78 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java @@ -84,7 +84,7 @@ public List transform(String containerName, List sinkRec record.headers()); toBeWrittenRecordList.add(updatedRecord); - } catch (Exception e) { + } catch (RuntimeException e) { // Report to DLQ if configured (fire-and-forget, guarded against reporter failures). // This is consistent with the writer-level pattern in CosmosWriterBase.sendToDlqIfConfigured(). if (this.errantRecordReporter != null) { From e5f697da3688429238dfa269574fdc2f049f5c34 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Wed, 27 May 2026 13:28:34 -0700 Subject: [PATCH 06/12] fix: remove errantRecordReporter guard to match writer pattern; fix test compilation - Remove try-catch guard around errantRecordReporter() in CosmosSinkTask, passing it directly like the writer pattern (fixes compilation error from missing ErrantRecordReporter import) - Fix Future to Future in test mocks to match ErrantRecordReporter.report() return type - Add missing @SuppressWarnings("unchecked") annotations on test methods Addresses review comment from xinlian12 and fixes CI build failure. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../connect/implementation/sink/CosmosSinkTask.java | 11 +---------- .../sink/SinkRecordTransformerTest.java | 8 +++++--- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java index 7a47678ba3a7..b66e40699c46 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java @@ -51,18 +51,9 @@ public void start(Map props) { LOGGER.info("The taskId is " + this.sinkTaskConfig.getTaskId()); this.throughputControlClientItem = this.getThroughputControlCosmosClient(); - ErrantRecordReporter errantRecordReporter = null; - try { - errantRecordReporter = this.context.errantRecordReporter(); - } catch (NoClassDefFoundError | NoSuchMethodError e) { - LOGGER.info( - "ErrantRecordReporter not available in this Kafka Connect runtime; " - + "DLQ will not be used for transform errors."); - } - this.sinkRecordTransformer = new SinkRecordTransformer( this.sinkTaskConfig, - errantRecordReporter, + this.context.errantRecordReporter(), this.sinkTaskConfig.getWriteConfig().getToleranceOnErrorLevel()); if (this.sinkTaskConfig.getWriteConfig().isBulkEnabled()) { diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java index e2fe731972c4..112f36914853 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java @@ -66,7 +66,7 @@ public void mixedBatchWithReporterToleranceAll_badRecordReportedValidRecordsInOu // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); - Future mockFuture = Mockito.mock(Future.class); + Future mockFuture = Mockito.mock(Future.class); when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.ALL); @@ -201,11 +201,12 @@ public void allValidRecords_allInOutput() throws Exception { // T5: All records bad with reporter + tolerance ALL — all reported to DLQ, empty output // ============================================================ @Test(groups = {"unit"}, timeOut = TIMEOUT) + @SuppressWarnings("unchecked") public void allBadRecordsWithReporterToleranceAll_allReportedEmptyOutput() throws Exception { // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); - Future mockFuture = Mockito.mock(Future.class); + Future mockFuture = Mockito.mock(Future.class); when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.ALL); @@ -301,11 +302,12 @@ public void reporterThrows_toleranceAll_recordSkippedProcessingContinues() throw // (consistent with writer-level pattern: DLQ is side-effect, tolerance controls flow) // ============================================================ @Test(groups = {"unit"}, timeOut = TIMEOUT) + @SuppressWarnings("unchecked") public void toleranceNoneWithReporter_reportedToDlqAndExceptionThrown() throws Exception { // Arrange IdStrategy idStrategy = createSelectivelyFailingIdStrategy(); ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); - Future mockFuture = Mockito.mock(Future.class); + Future mockFuture = Mockito.mock(Future.class); when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); // Tolerance is NONE — task should fail even though reporter is available From d5927e75a2d11ffb14e0939abff99e70d04f7afc Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Wed, 27 May 2026 13:37:51 -0700 Subject: [PATCH 07/12] revert gitignore change --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index ca7abdd4bb4d..787fe08c9a86 100644 --- a/.gitignore +++ b/.gitignore @@ -129,5 +129,4 @@ TempTypeSpecFiles/ # Azure Artifacts Credential Provider runtime .azure-artifacts/ -.coding-harness/ From e0407211eb69a2785b9a06093d6647f5bedb3d13 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Wed, 27 May 2026 13:39:21 -0700 Subject: [PATCH 08/12] add changelog --- sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md b/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md index 95d5b6501bc3..44ad339f8f03 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixed per-record ID-generation failures in Kafka sink transformer to isolate bad records instead of failing the entire batch. - See [PR 49286](https://github.com/Azure/azure-sdk-for-java/pull/49286) #### Other Changes From 3f89a6a0c9786888156d8882945158ac6f0b758b Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Wed, 27 May 2026 13:44:17 -0700 Subject: [PATCH 09/12] update changelog --- sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md b/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md index 44ad339f8f03..8099c01f8488 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md @@ -7,7 +7,7 @@ #### Breaking Changes #### Bugs Fixed -* Fixed per-record ID-generation failures in Kafka sink transformer to isolate bad records instead of failing the entire batch. - See [PR 49286](https://github.com/Azure/azure-sdk-for-java/pull/49286) +* Fixed per-record error isolation in Kafka sink connector to honor DLQ and tolerance settings, instead of failing the entire batch when a single record fails. - See [PR 49286](https://github.com/Azure/azure-sdk-for-java/pull/49286) #### Other Changes From 023af38405f5ea6e1302b5e1417eb31677854036 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Wed, 27 May 2026 15:25:31 -0700 Subject: [PATCH 10/12] =?UTF-8?q?fix:=20resolve=20PR=20review=20comments?= =?UTF-8?q?=20=E2=80=94=20guard=20DLQ=20reporter,=20fix=20comments,=20add?= =?UTF-8?q?=20conversion=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - SinkRecordTransformer: Replace inaccurate 'consistent with CosmosWriterBase' comment with accurate rationale referencing Kafka Connect best practices (DLQ is fire-and-forget side-effect, reporter failures swallowed) - CosmosWriterBase.sendToDlqIfConfigured: Add try-catch guard around errantRecordReporter.report() to align with industry standard — DLQ failures should not mask original errors or crash the connector - SinkRecordTransformerTest: Add T9/T10 tests exercising Struct value conversion failure path (StructToJsonMap.toJsonMap()), verifying the catch scope covers all record processing, not just ID generation Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../implementation/sink/CosmosWriterBase.java | 11 ++- .../sink/SinkRecordTransformer.java | 5 +- .../sink/SinkRecordTransformerTest.java | 79 +++++++++++++++++++ 3 files changed, 92 insertions(+), 3 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosWriterBase.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosWriterBase.java index 5381e2836835..60bc0d9525f4 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosWriterBase.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosWriterBase.java @@ -84,7 +84,16 @@ protected boolean shouldRetry(Throwable exception, int attemptedCount, int maxRe protected void sendToDlqIfConfigured(SinkOperation sinkOperationContext) { if (this.errantRecordReporter != null) { - errantRecordReporter.report(sinkOperationContext.getSinkRecord(), sinkOperationContext.getException()); + try { + errantRecordReporter.report(sinkOperationContext.getSinkRecord(), sinkOperationContext.getException()); + } catch (Exception reportException) { + LOGGER.error( + "Failed to report errant record to DLQ for topic {}, partition {}, offset {}.", + sinkOperationContext.getTopic(), + sinkOperationContext.getKafkaPartition(), + sinkOperationContext.getKafkaOffset(), + reportException); + } } } } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java index 159c8364ff78..70fdf2f2d33e 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java @@ -86,7 +86,8 @@ public List transform(String containerName, List sinkRec toBeWrittenRecordList.add(updatedRecord); } catch (RuntimeException e) { // Report to DLQ if configured (fire-and-forget, guarded against reporter failures). - // This is consistent with the writer-level pattern in CosmosWriterBase.sendToDlqIfConfigured(). + // Per Kafka Connect best practices, DLQ reporting is a side-effect for observability — + // reporter failures are swallowed so they do not mask the original processing error. if (this.errantRecordReporter != null) { try { this.errantRecordReporter.report(record, e); @@ -101,7 +102,7 @@ public List transform(String containerName, List sinkRec } } - // Use tolerance level to decide continue-vs-throw (consistent with writer pattern). + // Use tolerance level to decide continue-vs-throw. if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) { LOGGER.warn( "Skipping record from topic {}, partition {}, offset {}, container {} due to transform error.", diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java index 112f36914853..2c9813f89b26 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java @@ -4,6 +4,9 @@ package com.azure.cosmos.kafka.connect.implementation.sink; import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.IdStrategy; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; @@ -12,6 +15,7 @@ import org.testng.annotations.Test; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -330,4 +334,79 @@ public void toleranceNoneWithReporter_reportedToDlqAndExceptionThrown() throws E // Assert — reporter WAS called (DLQ is side-effect for observability) verify(reporter, times(1)).report(any(SinkRecord.class), any(ConnectException.class)); } + + // ============================================================ + // T9: Value conversion failure (Struct → JSON) with reporter + tolerance ALL + // — exercises the broader catch scope beyond just ID generation + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + @SuppressWarnings("unchecked") + public void structConversionFailure_toleranceAll_reportedToDlqAndSkipped() throws Exception { + // Arrange — ID strategy that always succeeds; the failure comes from Struct conversion + IdStrategy idStrategy = Mockito.mock(IdStrategy.class); + when(idStrategy.generateId(any(SinkRecord.class))).thenReturn("any-id"); + + ErrantRecordReporter reporter = Mockito.mock(ErrantRecordReporter.class); + Future mockFuture = Mockito.mock(Future.class); + when(reporter.report(any(SinkRecord.class), any(Throwable.class))).thenReturn(mockFuture); + + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, reporter, ToleranceOnErrorLevel.ALL); + + // A Struct whose schema has a field, but accessing that field throws + // (simulates malformed Struct data that fails during StructToJsonMap.toJsonMap()) + Schema schema = SchemaBuilder.struct().field("data", Schema.STRING_SCHEMA).build(); + Struct malformedStruct = Mockito.mock(Struct.class); + when(malformedStruct.schema()).thenReturn(schema); + when(malformedStruct.getString("data")).thenThrow(new org.apache.kafka.connect.errors.DataException("Schema mismatch")); + + SinkRecord badStructRecord = new SinkRecord("topicI", 0, null, "key-bad", schema, malformedStruct, 0L); + + Map goodValue = new HashMap<>(); + goodValue.put("data", "ok"); + SinkRecord goodRecord = createMapRecord("topicI", 0, 1L, goodValue); + + List batch = Arrays.asList(badStructRecord, goodRecord); + + // Act — should NOT throw + List result = transformer.transform("container10", batch); + + // Assert — only the good Map record survives + assertThat(result.size()).isEqualTo(1); + assertThat(((Map) result.get(0).value()).get("id")).isEqualTo("any-id"); + + // Assert — reporter called once for the malformed Struct record + ArgumentCaptor recordCaptor = ArgumentCaptor.forClass(SinkRecord.class); + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(reporter, times(1)).report(recordCaptor.capture(), errorCaptor.capture()); + assertThat(recordCaptor.getValue().kafkaOffset()).isEqualTo(0L); + assertThat(errorCaptor.getValue()).isInstanceOf(org.apache.kafka.connect.errors.DataException.class); + } + + // ============================================================ + // T10: Value conversion failure with tolerance NONE — exception thrown (fail-fast) + // ============================================================ + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void structConversionFailure_toleranceNone_exceptionThrown() throws Exception { + // Arrange + IdStrategy idStrategy = Mockito.mock(IdStrategy.class); + when(idStrategy.generateId(any(SinkRecord.class))).thenReturn("any-id"); + + SinkRecordTransformer transformer = new SinkRecordTransformer(idStrategy, null, ToleranceOnErrorLevel.NONE); + + Schema schema = SchemaBuilder.struct().field("data", Schema.STRING_SCHEMA).build(); + Struct malformedStruct = Mockito.mock(Struct.class); + when(malformedStruct.schema()).thenReturn(schema); + when(malformedStruct.getString("data")).thenThrow(new org.apache.kafka.connect.errors.DataException("Schema mismatch")); + + SinkRecord badStructRecord = new SinkRecord("topicJ", 0, null, "key-bad", schema, malformedStruct, 0L); + + List batch = Collections.singletonList(badStructRecord); + + // Act + Throwable thrown = catchThrowable(() -> transformer.transform("container11", batch)); + + // Assert — DataException is thrown (fail-fast preserved) + assertThat(thrown).isInstanceOf(org.apache.kafka.connect.errors.DataException.class); + assertThat(thrown.getMessage()).contains("Schema mismatch"); + } } From f23cbadd5c8c2a6884a83c133c9828bdea1886ea Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Wed, 27 May 2026 15:51:58 -0700 Subject: [PATCH 11/12] docs: update changelog to reflect DLQ reporter guard in CosmosWriterBase Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md b/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md index 8099c01f8488..10a5469c9d80 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md @@ -7,7 +7,8 @@ #### Breaking Changes #### Bugs Fixed -* Fixed per-record error isolation in Kafka sink connector to honor DLQ and tolerance settings, instead of failing the entire batch when a single record fails. - See [PR 49286](https://github.com/Azure/azure-sdk-for-java/pull/49286) +* Fixed per-record error isolation in Kafka sink transformer to honor DLQ and tolerance settings, instead of failing the entire batch when a single record fails during transformation. - See [PR 49286](https://github.com/Azure/azure-sdk-for-java/pull/49286) +* Guarded `ErrantRecordReporter.report()` in `CosmosWriterBase` against secondary failures so DLQ errors do not mask original write failures. - See [PR 49286](https://github.com/Azure/azure-sdk-for-java/pull/49286) #### Other Changes From 433fec12a7ac142cd1f46080988124a06fec2c18 Mon Sep 17 00:00:00 2001 From: Annie Liang Date: Fri, 29 May 2026 09:32:02 -0700 Subject: [PATCH 12/12] refactor: extract shared DLQ reporting into DlqReportHelper Address review feedback from tvaron3: extract the duplicated fire-and-forget DLQ reporter logic from CosmosWriterBase and SinkRecordTransformer into a shared DlqReportHelper utility. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../implementation/sink/CosmosWriterBase.java | 17 ++---- .../implementation/sink/DlqReportHelper.java | 52 +++++++++++++++++++ .../sink/SinkRecordTransformer.java | 16 +----- 3 files changed, 58 insertions(+), 27 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/DlqReportHelper.java diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosWriterBase.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosWriterBase.java index 60bc0d9525f4..89b9d00a1041 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosWriterBase.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosWriterBase.java @@ -83,17 +83,10 @@ protected boolean shouldRetry(Throwable exception, int attemptedCount, int maxRe } protected void sendToDlqIfConfigured(SinkOperation sinkOperationContext) { - if (this.errantRecordReporter != null) { - try { - errantRecordReporter.report(sinkOperationContext.getSinkRecord(), sinkOperationContext.getException()); - } catch (Exception reportException) { - LOGGER.error( - "Failed to report errant record to DLQ for topic {}, partition {}, offset {}.", - sinkOperationContext.getTopic(), - sinkOperationContext.getKafkaPartition(), - sinkOperationContext.getKafkaOffset(), - reportException); - } - } + DlqReportHelper.reportToDlqIfConfigured( + this.errantRecordReporter, + sinkOperationContext.getSinkRecord(), + sinkOperationContext.getException(), + LOGGER); } } diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/DlqReportHelper.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/DlqReportHelper.java new file mode 100644 index 000000000000..8c49febde534 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/DlqReportHelper.java @@ -0,0 +1,52 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.kafka.connect.implementation.sink; + +import org.apache.kafka.connect.sink.ErrantRecordReporter; +import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; + +/** + * Shared helper for DLQ (Dead Letter Queue) reporting. + * + *

Both {@link CosmosWriterBase} and {@link SinkRecordTransformer} need fire-and-forget + * DLQ reporting that guards against reporter failures. This helper centralises that logic. + */ +final class DlqReportHelper { + + private DlqReportHelper() { + } + + /** + * Reports a failed record to the DLQ if a reporter is configured. + * + *

Per Kafka Connect best practices, DLQ reporting is a side-effect for observability — + * reporter failures are swallowed so they do not mask the original processing error. + * + * @param reporter the errant record reporter, may be {@code null} + * @param record the sink record that failed processing + * @param error the original processing error + * @param logger the caller's logger for error reporting + */ + static void reportToDlqIfConfigured( + ErrantRecordReporter reporter, + SinkRecord record, + Throwable error, + Logger logger) { + + if (reporter == null) { + return; + } + try { + reporter.report(record, error); + } catch (Exception reportException) { + logger.error( + "Failed to report errant record to DLQ for topic {}, partition {}, offset {}.", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset(), + reportException); + } + } +} diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java index 70fdf2f2d33e..4f0841980880 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java @@ -86,21 +86,7 @@ public List transform(String containerName, List sinkRec toBeWrittenRecordList.add(updatedRecord); } catch (RuntimeException e) { // Report to DLQ if configured (fire-and-forget, guarded against reporter failures). - // Per Kafka Connect best practices, DLQ reporting is a side-effect for observability — - // reporter failures are swallowed so they do not mask the original processing error. - if (this.errantRecordReporter != null) { - try { - this.errantRecordReporter.report(record, e); - } catch (Exception reportException) { - LOGGER.error( - "Failed to report errant record to DLQ for topic {}, partition {}, offset {}, container {}.", - record.topic(), - record.kafkaPartition(), - record.kafkaOffset(), - containerName, - reportException); - } - } + DlqReportHelper.reportToDlqIfConfigured(this.errantRecordReporter, record, e, LOGGER); // Use tolerance level to decide continue-vs-throw. if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) {