From 9ebda06012c98766b486ee525e87f085fd580998 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 14 Mar 2025 14:03:21 -0700 Subject: [PATCH] KafkaProducer: Flush pending records before close() --- kafka/producer/kafka.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index d5620075a..b97983a78 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -506,6 +506,7 @@ def __getattr__(self, name): assert timeout >= 0 log.info("Closing the Kafka producer with %s secs timeout.", timeout) + self.flush(timeout) invoked_from_callback = bool(threading.current_thread() is self._sender) if timeout > 0: if invoked_from_callback: