From a97960b3ace28132e027ea32166c2cde6fbc22bd Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 3 Jul 2025 08:41:28 -0700 Subject: [PATCH] KIP-654: Abort transaction with pending data with TransactionAbortedError --- kafka/errors.py | 4 ++++ kafka/producer/sender.py | 6 +++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/kafka/errors.py b/kafka/errors.py index 351e07375..dffa35f35 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -102,6 +102,10 @@ class UnsupportedCodecError(KafkaError): pass +class TransactionAbortedError(KafkaError): + pass + + class BrokerResponseError(KafkaError): errno = None message = None diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 09b9a0f10..d7855b03d 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -166,7 +166,11 @@ def run_once(self): self._client.poll(timeout_ms=self.config['retry_backoff_ms']) return elif self._transaction_manager.has_abortable_error(): - self._accumulator.abort_undrained_batches(self._transaction_manager.last_error) + # Attempt to get the last error that caused this abort. + # If there was no error, but we are still aborting, + # then this is most likely a case where there was no fatal error. + exception = self._transaction_manager.last_error or Errors.TransactionAbortedError() + self._accumulator.abort_undrained_batches(exception) except Errors.SaslAuthenticationFailedError as e: # This is already logged as error, but propagated here to perform any clean ups.