diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 5434c36a2..99d11f274 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -464,6 +464,13 @@ def _unpack_message_set(self, tp, records): except AttributeError: pass + # Control messages are used to enable transactions in Kafka and are generated by the + # broker. Clients should not return control batches (ie. those with this bit set) to + # applications. (since 0.11.0.0) + if getattr(batch, "is_control_batch", False): + batch = records.next_batch() + continue + for record in batch: key_size = len(record.key) if record.key is not None else -1 value_size = len(record.value) if record.value is not None else -1 diff --git a/kafka/version.py b/kafka/version.py index 013f2b967..c6af931a1 100644 --- a/kafka/version.py +++ b/kafka/version.py @@ -1 +1 @@ -__version__ = '1.4.7.post2' +__version__ = '1.4.7.post3'