fix: isolate per-record failures in Kafka sink transformer and guard DLQ reporter#49286
Merged
xinlian12 merged 13 commits intoMay 29, 2026
Merged
Conversation
…mer (Azure#49268) When a single record's ID strategy fails (e.g., ProvidedInStrategy JsonPath parse error), only that record should be routed to DLQ — not the entire batch. Previously, SinkRecordTransformer.transform() had no per-record error handling, so one malformed record would abort transformation of all records in the batch. Changes: - SinkRecordTransformer: Add per-record try-catch in transform(). Accept ErrantRecordReporter and ToleranceOnErrorLevel. Report failing records to DLQ when available, skip when tolerance is ALL, rethrow when tolerance is NONE. - CosmosSinkTask: Pass reporter and tolerance to SinkRecordTransformer. Fix written-record bookkeeping to count only successfully transformed records. Fixes Azure#49268 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Covers: DLQ reporting, tolerance ALL skip, tolerance NONE rethrow, all-valid regression, all-bad with reporter, reporter precedence over tolerance. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Wrap errantRecordReporter.report() in its own try/catch to prevent DLQ reporter failures from collapsing the entire batch. When the reporter throws: - ToleranceOnErrorLevel.ALL: log and continue (skip the bad record) - ToleranceOnErrorLevel.NONE: rethrow the original transform exception Add T6/T7 tests covering both scenarios. Renumber T6→T8. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Address review findings: - Align DLQ/tolerance precedence with writer pattern: DLQ report is fire-and-forget side-effect, tolerance level controls continue-vs-throw. With tolerance=NONE + reporter, record is reported AND task fails. - Guard context.errantRecordReporter() against older Kafka Connect runtimes that lack the API (catch NoClassDefFoundError/NoSuchMethodError). - Add package-private constructor for testability (eliminates reflection). - Consolidate double-logging: one log entry per failed record. - Rewrite tests to use package-private constructor and align with new semantics. T8 now tests tolerance=NONE+reporter → DLQ+throw. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Change catch clause from Exception (checked) to RuntimeException (unchecked) since transform() doesn't declare throws Exception. ConnectException and all other exceptions from ID strategies are RuntimeException subclasses. This fixes the CI build failure. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…est compilation - Remove try-catch guard around errantRecordReporter() in CosmosSinkTask, passing it directly like the writer pattern (fixes compilation error from missing ErrantRecordReporter import) - Fix Future<?> to Future<Void> in test mocks to match ErrantRecordReporter.report() return type - Add missing @SuppressWarnings("unchecked") annotations on test methods Addresses review comment from xinlian12 and fixes CI build failure. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Contributor
There was a problem hiding this comment.
Pull request overview
This PR updates the Cosmos DB Kafka Sink connector to isolate per-record ID-generation/transform failures so that a single malformed SinkRecord no longer aborts the entire batch and routes all records to the DLQ.
Changes:
- Added per-record exception isolation in
SinkRecordTransformer.transform()with DLQ reporting (when available) and tolerance-driven continue vs fail-fast behavior. - Wired
ErrantRecordReporterandToleranceOnErrorLevelintoCosmosSinkTaskand fixed written-record bookkeeping to count post-transform records. - Added unit tests covering mixed/invalid batches, reporter failures, and tolerance behaviors.
Show a summary per file
| File | Description |
|---|---|
| sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java | Adds per-record try/catch, optional DLQ reporting, and tolerance-based control flow during record transformation/ID injection. |
| sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java | Passes reporter/tolerance into the transformer and fixes per-container written record counting to use transformed output size. |
| sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java | Introduces unit tests validating per-record isolation, DLQ reporting behavior, and tolerance handling. |
| .gitignore | Ignores .coding-harness/ directory. |
Copilot's findings
- Files reviewed: 3/3 changed files
- Comments generated: 2
Member
Author
|
@sdkReviewAgent |
Member
Author
|
✅ Review complete (37:40) Posted 1 inline comment(s). Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage |
…dd conversion tests - SinkRecordTransformer: Replace inaccurate 'consistent with CosmosWriterBase' comment with accurate rationale referencing Kafka Connect best practices (DLQ is fire-and-forget side-effect, reporter failures swallowed) - CosmosWriterBase.sendToDlqIfConfigured: Add try-catch guard around errantRecordReporter.report() to align with industry standard — DLQ failures should not mask original errors or crash the connector - SinkRecordTransformerTest: Add T9/T10 tests exercising Struct value conversion failure path (StructToJsonMap.toJsonMap()), verifying the catch scope covers all record processing, not just ID generation Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Member
Author
|
/azp run java - cosmos - kafka |
|
Azure Pipelines successfully started running 1 pipeline(s). |
tvaron3
approved these changes
May 29, 2026
kushagraThapar
approved these changes
May 29, 2026
Member
kushagraThapar
left a comment
There was a problem hiding this comment.
LGTM, thanks @xinlian12
Address review feedback from tvaron3: extract the duplicated fire-and-forget DLQ reporter logic from CosmosWriterBase and SinkRecordTransformer into a shared DlqReportHelper utility. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Member
Author
|
/azp run java - cosmos - kafka |
|
Azure Pipelines successfully started running 1 pipeline(s). |
tvaron3
approved these changes
May 29, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
When a single
SinkRecordin a batch fails during transformation (ID generation or value conversion), the entire batch fails and all records are routed to the DLQ — not just the malformed record.Root cause:
SinkRecordTransformer.transform()lacked per-record error isolation. An exception fromidStrategy.generateId()orStructToJsonMap.toJsonMap()would abort the entiretransform()call before records reached the writer-level DLQ handling.Additionally,
CosmosWriterBase.sendToDlqIfConfigured()did not guard againstErrantRecordReporter.report()failures, meaning a DLQ error could mask the original write failure.Solution
ErrantRecordReporterif available, guarded against reporter failures per Kafka Connect best practices.ALLskips and continues,NONEthrows (regardless of whether DLQ reporter is present).CosmosWriterBase.sendToDlqIfConfigured()now guards against reporter failures too.ErrantRecordReporterToleranceOnErrorLevelALLNONEnullALLnullNONEALLNONEChanges
SinkRecordTransformer.javaErrantRecordReporterandToleranceOnErrorLevelfieldsErrantRecordReporter.report()against secondary failurescreateIdStrategy()static for constructor chainCosmosWriterBase.javaerrantRecordReporter.report()insendToDlqIfConfigured()so DLQ failures do not mask original errorsCosmosSinkTask.javaerrantRecordReporterandtoleranceOnErrorLeveltoSinkRecordTransformertransformedRecords.size()(post-filter) instead ofentry.getValue().size()(pre-filter)SinkRecordTransformerTest.java— 10 unit tests:Fixes #49268