Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed per-record error isolation in Kafka sink transformer to honor DLQ and tolerance settings, instead of failing the entire batch when a single record fails during transformation. - See [PR 49286](https://github.com/Azure/azure-sdk-for-java/pull/49286)
* Guarded `ErrantRecordReporter.report()` in `CosmosWriterBase` against secondary failures so DLQ errors do not mask original write failures. - See [PR 49286](https://github.com/Azure/azure-sdk-for-java/pull/49286)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ public void start(Map<String, String> props) {
this.sinkTaskConfig.getClientMetadataCachesSnapshot());
LOGGER.info("The taskId is " + this.sinkTaskConfig.getTaskId());
this.throughputControlClientItem = this.getThroughputControlCosmosClient();
this.sinkRecordTransformer = new SinkRecordTransformer(this.sinkTaskConfig);

this.sinkRecordTransformer = new SinkRecordTransformer(
this.sinkTaskConfig,
this.context.errantRecordReporter(),
this.sinkTaskConfig.getWriteConfig().getToleranceOnErrorLevel());
Comment thread
xinlian12 marked this conversation as resolved.

if (this.sinkTaskConfig.getWriteConfig().isBulkEnabled()) {
this.cosmosWriter =
Expand Down Expand Up @@ -129,7 +133,7 @@ record -> this.sinkTaskConfig
List<SinkRecord> transformedRecords = sinkRecordTransformer.transform(containerName, entry.getValue());
this.cosmosWriter.write(container, transformedRecords);

totalWrittenRecordsPerContainer.merge(containerName, (long) entry.getValue().size(), Long::sum);
totalWrittenRecordsPerContainer.merge(containerName, (long) transformedRecords.size(), Long::sum);
}

logWrittenRecordCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ protected boolean shouldRetry(Throwable exception, int attemptedCount, int maxRe
}

protected void sendToDlqIfConfigured(SinkOperation sinkOperationContext) {
if (this.errantRecordReporter != null) {
errantRecordReporter.report(sinkOperationContext.getSinkRecord(), sinkOperationContext.getException());
}
DlqReportHelper.reportToDlqIfConfigured(
this.errantRecordReporter,
sinkOperationContext.getSinkRecord(),
sinkOperationContext.getException(),
LOGGER);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.implementation.sink;

import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;

/**
 * Utility for forwarding failed sink records to the DLQ (Dead Letter Queue).
 *
 * <p>{@link CosmosWriterBase} and {@link SinkRecordTransformer} both report errant records
 * and both need that reporting to be shielded from failures of the reporter itself, so the
 * logic lives here in one place. The class is stateless and cannot be instantiated.
 */
final class DlqReportHelper {

    private DlqReportHelper() {
        // Static utility holder; no instances.
    }

    /**
     * Hands a failed record to the errant record reporter when one is available.
     *
     * <p>When {@code reporter} is {@code null} this method does nothing. Otherwise the record
     * and its error are passed to {@code reporter.report(...)}; whatever that call returns is
     * not inspected. Any {@link Exception} thrown by the call is logged through {@code logger}
     * and then dropped, so a reporting failure never replaces the original processing error.
     * Throwables that are not {@link Exception}s are not caught here.
     *
     * @param reporter the errant record reporter, may be {@code null}
     * @param record the sink record that failed processing
     * @param error the original processing error
     * @param logger the caller's logger, used only when reporting itself fails
     */
    static void reportToDlqIfConfigured(
        ErrantRecordReporter reporter,
        SinkRecord record,
        Throwable error,
        Logger logger) {

        if (reporter != null) {
            try {
                reporter.report(record, error);
            } catch (Exception dlqFailure) {
                logReportFailure(record, logger, dlqFailure);
            }
        }
    }

    // Logs a reporter failure together with the coordinates of the record that could not be reported.
    private static void logReportFailure(SinkRecord record, Logger logger, Exception dlqFailure) {
        logger.error(
            "Failed to report errant record to DLQ for topic {}, partition {}, offset {}.",
            record.topic(),
            record.kafkaPartition(),
            record.kafkaOffset(),
            dlqFailure);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.ProvidedInValueStrategy;
import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.TemplateStrategy;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,9 +25,24 @@ public class SinkRecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(SinkRecordTransformer.class);

private final IdStrategy idStrategy;
private final ErrantRecordReporter errantRecordReporter;
private final ToleranceOnErrorLevel toleranceOnErrorLevel;

public SinkRecordTransformer(
CosmosSinkTaskConfig sinkTaskConfig,
ErrantRecordReporter errantRecordReporter,
ToleranceOnErrorLevel toleranceOnErrorLevel) {
this(createIdStrategy(sinkTaskConfig), errantRecordReporter, toleranceOnErrorLevel);
}

public SinkRecordTransformer(CosmosSinkTaskConfig sinkTaskConfig) {
this.idStrategy = this.createIdStrategy(sinkTaskConfig);
// Package-private constructor for unit testing without requiring CosmosSinkTaskConfig.
SinkRecordTransformer(
IdStrategy idStrategy,
ErrantRecordReporter errantRecordReporter,
ToleranceOnErrorLevel toleranceOnErrorLevel) {
this.idStrategy = idStrategy;
this.errantRecordReporter = errantRecordReporter;
this.toleranceOnErrorLevel = toleranceOnErrorLevel;
}

@SuppressWarnings("unchecked")
Expand All @@ -44,30 +60,55 @@ public List<SinkRecord> transform(String containerName, List<SinkRecord> sinkRec
record.value() == null ? null : record.value().getClass().getName(),
record.value() == null ? null : record.valueSchema());

Object recordValue;
if (record.value() instanceof Struct) {
recordValue = StructToJsonMap.toJsonMap((Struct) record.value());
} else if (record.value() instanceof Map) {
recordValue = StructToJsonMap.handleMap((Map<String, Object>) record.value());
} else {
recordValue = record.value();
try {
Object recordValue;
if (record.value() instanceof Struct) {
recordValue = StructToJsonMap.toJsonMap((Struct) record.value());
} else if (record.value() instanceof Map) {
recordValue = StructToJsonMap.handleMap((Map<String, Object>) record.value());
} else {
recordValue = record.value();
}

maybeInsertId(recordValue, record);

final SinkRecord updatedRecord = new SinkRecord(record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
record.valueSchema(),
recordValue,
record.kafkaOffset(),
record.timestamp(),
record.timestampType(),
record.headers());

toBeWrittenRecordList.add(updatedRecord);
} catch (RuntimeException e) {
// Report to DLQ if configured (fire-and-forget, guarded against reporter failures).
DlqReportHelper.reportToDlqIfConfigured(this.errantRecordReporter, record, e, LOGGER);

// Use tolerance level to decide continue-vs-throw.
if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) {
LOGGER.warn(
"Skipping record from topic {}, partition {}, offset {}, container {} due to transform error.",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset(),
containerName,
e);
} else {
LOGGER.error(
"Failing task due to transform error for record from topic {}, partition {}, offset {}, "
+ "container {}.",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset(),
containerName,
e);
throw e;
}
}

maybeInsertId(recordValue, record);

// Create an updated record from the current record and the updated record value
final SinkRecord updatedRecord = new SinkRecord(record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
record.valueSchema(),
recordValue,
record.kafkaOffset(),
record.timestamp(),
record.timestampType(),
record.headers());

toBeWrittenRecordList.add(updatedRecord);
}

return toBeWrittenRecordList;
Expand All @@ -82,7 +123,7 @@ private void maybeInsertId(Object recordValue, SinkRecord sinkRecord) {
recordMap.put("id", this.idStrategy.generateId(sinkRecord));
}

private IdStrategy createIdStrategy(CosmosSinkTaskConfig sinkTaskConfig) {
private static IdStrategy createIdStrategy(CosmosSinkTaskConfig sinkTaskConfig) {
IdStrategy idStrategyClass;
switch (sinkTaskConfig.getIdStrategy()) {
case FULL_KEY_STRATEGY:
Expand Down
Loading
Loading