diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index b294a4e6c..3d327d7d6 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -231,6 +231,11 @@ def send_offsets_to_transaction(self, offsets, group_metadata): "send_offsets_to_transaction expects group_metadata to be a " "ConsumerGroupMetadata or a group_id str, got %r" % (type(group_metadata),)) + if group_metadata.generation_id > 0 and not group_metadata.member_id: + raise ValueError( + "Invalid ConsumerGroupMetadata: generation_id=%s implies a" + " joined group but member_id is empty" % (group_metadata.generation_id,)) + with self._lock: self._ensure_transactional() self._maybe_fail_with_error() @@ -1191,6 +1196,11 @@ def handle_response(self, response): # Java client normalizes INVALID_PRODUCER_EPOCH to PRODUCER_FENCED # on the txn-coordinator RPC paths (KIP-360). self.fatal_error(Errors.ProducerFencedError()) + elif error_type in (Errors.UnknownProducerIdError, Errors.InvalidProducerIdMappingError): + if self.transaction_manager._supports_epoch_bump(): + self.abortable_error(error_type()) + else: + self.fatal_error(error_type()) elif error_type is Errors.TransactionalIdAuthorizationFailedError: self.fatal_error(error_type()) elif error_type is Errors.GroupAuthorizationFailedError: @@ -1275,6 +1285,24 @@ def handle_response(self, response): # on the txn-coordinator RPC paths (KIP-360). self.fatal_error(Errors.ProducerFencedError()) return + elif error_type is Errors.FencedInstanceIdError: + # KIP-447: static-membership fencing - another consumer + # instance with this group_instance_id displaced ours. The + # transaction must be aborted, but the producer can be + # reused for a fresh transaction. + self.abortable_error(error_type()) + return + elif error_type in (Errors.IllegalGenerationError, + Errors.UnknownMemberIdError): + # KIP-447: the consumer generation / member_id we passed + # in are stale (the consumer rebalanced between when we + # snapshotted group_metadata and when the broker checked + # it). Abort the txn so the application can re-snapshot + # and retry. + self.abortable_error(Errors.CommitFailedError( + "Transaction offset commit failed due to consumer group" + " metadata mismatch: %s" % (error_type.__name__,))) + return elif error_type in (Errors.TransactionalIdAuthorizationFailedError, Errors.UnsupportedForMessageFormatError): self.fatal_error(error_type()) diff --git a/test/producer/test_transaction_manager_mock_broker.py b/test/producer/test_transaction_manager_mock_broker.py index 0d43de5a8..72c1941bd 100644 --- a/test/producer/test_transaction_manager_mock_broker.py +++ b/test/producer/test_transaction_manager_mock_broker.py @@ -931,6 +931,48 @@ def test_group_authz_failed_is_abortable(self, broker, client): assert isinstance(tm.last_error, Errors.GroupAuthorizationFailedError) assert handler._result.failed + @pytest.mark.parametrize("error", [ + Errors.UnknownProducerIdError, + Errors.InvalidProducerIdMappingError, + ]) + def test_unknown_producer_id_abortable_on_modern_broker( + self, broker, client, error): + """KIP-360: UNKNOWN_PRODUCER_ID / INVALID_PRODUCER_ID_MAPPING on a + broker that supports epoch bumping (>= 2.5) is abortable - the + application aborts the txn and retries; the producer epoch is + bumped under the hood. Matches Java's abortableErrorIfPossible.""" + tm = _make_manager(client, api_version=(2, 5)) + handler, _, _ = self._enqueue_add_offsets(tm) + + broker.respond(AddOffsetsToTxnResponse, + self._response(error_code=error.errno)) + _, future = _dispatch_next(client, tm) + _poll_for_future(client, future) + + assert tm._current_state == TransactionState.ABORTABLE_ERROR + assert isinstance(tm.last_error, error) + assert handler._result.failed + + @pytest.mark.parametrize("error", [ + Errors.UnknownProducerIdError, + Errors.InvalidProducerIdMappingError, + ]) + def test_unknown_producer_id_fatal_on_old_broker( + self, broker, client, error): + """On a broker < 2.5 there's no epoch-bump path, so these errors + remain fatal.""" + tm = _make_manager(client, api_version=(2, 4)) + handler, _, _ = self._enqueue_add_offsets(tm) + + broker.respond(AddOffsetsToTxnResponse, + self._response(error_code=error.errno)) + _, future = _dispatch_next(client, tm) + _poll_for_future(client, future) + + assert tm._current_state == TransactionState.FATAL_ERROR + assert isinstance(tm.last_error, error) + assert handler._result.failed + def test_unknown_error_is_fatal(self, broker, client): tm = _make_manager(client) handler, _, _ = self._enqueue_add_offsets(tm) @@ -1077,6 +1119,44 @@ def test_group_authz_failed_is_abortable(self, broker, client): assert isinstance(tm.last_error, Errors.GroupAuthorizationFailedError) assert handler._result.failed + def test_fenced_instance_id_is_abortable(self, broker, client): + """KIP-447: FENCED_INSTANCE_ID means another static-membership + instance displaced us - the transaction must abort, but the producer + can be reused (matches Java).""" + tm = _make_manager(client) + handler, tp = self._enqueue_offset_commit(tm) + broker.respond( + TxnOffsetCommitResponse, + self._response({(tp.topic, tp.partition): + Errors.FencedInstanceIdError.errno})) + _, future = _dispatch_next(client, tm) + _poll_for_future(client, future) + assert tm._current_state == TransactionState.ABORTABLE_ERROR + assert isinstance(tm.last_error, Errors.FencedInstanceIdError) + assert handler._result.failed + + @pytest.mark.parametrize("error", [ + Errors.IllegalGenerationError, + Errors.UnknownMemberIdError, + ]) + def test_consumer_group_metadata_mismatch_is_abortable( + self, broker, client, error): + """KIP-447: ILLEGAL_GENERATION / UNKNOWN_MEMBER_ID indicate the + consumer rebalanced between snapshot and commit. Abort the txn so + the app can re-snapshot and retry (matches Java's + CommitFailedException).""" + tm = _make_manager(client) + handler, tp = self._enqueue_offset_commit(tm) + broker.respond( + TxnOffsetCommitResponse, + self._response({(tp.topic, tp.partition): error.errno})) + _, future = _dispatch_next(client, tm) + _poll_for_future(client, future) + assert tm._current_state == TransactionState.ABORTABLE_ERROR + assert isinstance(tm.last_error, Errors.CommitFailedError) + assert error.__name__ in str(tm.last_error) + assert handler._result.failed + def test_unknown_partition_error_is_fatal(self, broker, client): tm = _make_manager(client) handler, tp = self._enqueue_offset_commit(tm) @@ -1211,6 +1291,18 @@ def test_send_offsets_to_transaction_rejects_garbage(self, broker, client): with pytest.raises(TypeError): tm.send_offsets_to_transaction(self._offsets(), 42) + def test_send_offsets_to_transaction_rejects_incoherent_metadata( + self, broker, client): + """Mirror Java's throwIfInvalidGroupMetadata: a generation_id > 0 + with an empty member_id is incoherent and should be rejected at + the API boundary rather than sent to the broker.""" + tm = _make_manager(client) + tm._current_state = TransactionState.IN_TRANSACTION + bad = ConsumerGroupMetadata(group_id='g', generation_id=5, + member_id='', group_instance_id=None) + with pytest.raises(ValueError, match='generation_id'): + tm.send_offsets_to_transaction(self._offsets(), bad) + def test_group_metadata_propagates_through_add_offsets_to_commit_handler( self, broker, client): """The AddOffsetsToTxn -> TxnOffsetCommit chain must preserve the