From 6a89b5376c42b26b9359b69407dcb12f70faa369 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Wed, 26 Feb 2025 15:00:36 -0800
Subject: [PATCH] Decode and skip transactional control records in consumer
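
Message format v2 writes transactional commit/abort markers as control
batches: the batch attributes carry an isControl bit, and the batch holds
a single record whose 4-byte key encodes a version and a type (0 = abort,
1 = commit); see https://kafka.apache.org/documentation/#controlbatch.
Consumers are expected to filter these records out rather than deliver
them to applications. A rough illustration of the key layout this patch
decodes (illustrative snippet, not part of the diff):

    import struct

    # Control record key schema: Version => Int16, Type => Int16
    # Type 0 marks an aborted transaction, Type 1 a committed one.
    version, marker_type = struct.unpack('>hh', b'\x00\x00\x00\x00')
    assert (version, marker_type) == (0, 0)  # v0 abort marker
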
"ControlRecord(offset={!r}, timestamp={!r}, timestamp_type={!r}," + " version={!r}, type={!r} <{!s}>)".format( + self._offset, self._timestamp, self._timestamp_type, + self._version, self._type, "abort" if self.abort else "commit") + ) + + class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder): # excluding key, value and headers: diff --git a/test/record/test_records.py b/test/record/test_records.py index 5ed22d816..cab95922d 100644 --- a/test/record/test_records.py +++ b/test/record/test_records.py @@ -60,6 +60,15 @@ b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123' ] +# Single record control batch (abort) +control_batch_data_v2 = [ + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00R\x00\x00\x00\x00' + b'\x02e\x97\xff\xd0\x00\x20\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x98\x96\x7f\x00\x00\x00\x00\x00\x98\x96' + b'\x7f\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff' + b'\x00\x00\x00\x01@\x00\x00\x00\x08\x00\x00\x00\x00,opaque-control-message\x00' +] + def test_memory_records_v2(): data_bytes = b"".join(record_batch_data_v2) + b"\x00" * 4 @@ -230,3 +239,18 @@ def test_memory_records_builder_full(magic, compression_type): key=None, timestamp=None, value=b"M") assert metadata is None assert builder.next_offset() == 1 + + +def test_control_record_v2(): + data_bytes = b"".join(control_batch_data_v2) + records = MemoryRecords(data_bytes) + + assert records.has_next() is True + batch = records.next_batch() + assert batch.is_control_batch is True + recs = list(batch) + assert len(recs) == 1 + assert recs[0].version == 0 + assert recs[0].type == 0 + assert recs[0].abort is True + assert recs[0].commit is False