From c4ee77636436ad51d88cfe3f8e7f28cd764a775b Mon Sep 17 00:00:00 2001 From: Georgios Kousouris Date: Fri, 18 Aug 2023 15:53:32 +0100 Subject: [PATCH 1/2] Fix control batch bug --- kafka/consumer/fetcher.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 99d11f274..cb355b20f 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -821,11 +821,12 @@ def _parse_fetched_data(self, completed_fetch): " offset %d to buffered record list", tp, position) unpacked = list(self._unpack_message_set(tp, records)) - parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked) - last_offset = unpacked[-1].offset - self._sensors.records_fetch_lag.record(highwater - last_offset) - num_bytes = records.valid_bytes() - records_count = len(unpacked) + if unpacked: + parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked) + last_offset = unpacked[-1].offset + self._sensors.records_fetch_lag.record(highwater - last_offset) + num_bytes = records.valid_bytes() + records_count = len(unpacked) elif records.size_in_bytes() > 0: # we did not read a single message from a non-empty # buffer because that message's size is larger than From 84f985b21bdb891913e1a896d82465c99cbb73c6 Mon Sep 17 00:00:00 2001 From: Georgios Kousouris Date: Mon, 21 Aug 2023 11:25:01 +0100 Subject: [PATCH 2/2] Bump version to post4 --- kafka/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/version.py b/kafka/version.py index c6af931a1..ccf7bda8c 100644 --- a/kafka/version.py +++ b/kafka/version.py @@ -1 +1 @@ -__version__ = '1.4.7.post3' +__version__ = '1.4.7.post4'