diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index f13c21b9f..4f08b8c08 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -68,17 +68,19 @@ def try_append(self, timestamp_ms, key, value, headers):
                                       sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
         return future
 
-    def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None, global_error=None):
-        level = logging.DEBUG if exception is None else logging.WARNING
-        log.log(level, "Produced messages to topic-partition %s with base offset"
-                " %s log start offset %s and error %s.", self.topic_partition, base_offset,
-                log_start_offset, global_error)  # trace
+    def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None):
         if self.produce_future.is_done:
             log.warning('Batch is already closed -- ignoring batch.done()')
             return
         elif exception is None:
+            log.debug("Produced messages to topic-partition %s with base offset"
+                      " %s log start offset %s.", self.topic_partition, base_offset,
+                      log_start_offset)  # trace
             self.produce_future.success((base_offset, timestamp_ms, log_start_offset))
         else:
+            log.warning("Failed to produce messages to topic-partition %s with base offset"
+                        " %s log start offset %s and error %s.", self.topic_partition, base_offset,
+                        log_start_offset, exception)  # trace
             self.produce_future.failure(exception)
 
     def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full):
@@ -109,7 +111,7 @@ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full)
 
         if error:
             self.records.close()
-            self.done(-1, None, Errors.KafkaTimeoutError(
+            self.done(base_offset=-1, exception=Errors.KafkaTimeoutError(
                 "Batch for %s containing %s record(s) expired: %s" % (
                     self.topic_partition, self.records.next_offset(), error)))
             return True
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 3dd52ba76..0e2ea577e 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -182,7 +182,7 @@ def add_topic(self, topic):
     def _failed_produce(self, batches, node_id, error):
         log.error("Error sending produce request to node %d: %s", node_id, error)  # trace
         for batch in batches:
-            self._complete_batch(batch, error, -1, None)
+            self._complete_batch(batch, error, -1)
 
     def _handle_produce_response(self, node_id, send_time, batches, response):
         """Handle a produce response."""
@@ -194,7 +194,6 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
 
         for topic, partitions in response.topics:
             for partition_info in partitions:
-                global_error = None
                 log_start_offset = None
                 if response.API_VERSION < 2:
                     partition, error_code, offset = partition_info
@@ -204,19 +203,19 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
                 elif 2 <= response.API_VERSION <= 4:
                     partition, error_code, offset, ts = partition_info
                 elif 5 <= response.API_VERSION <= 7:
                     partition, error_code, offset, ts, log_start_offset = partition_info
                 else:
-                    # the ignored parameter is record_error of type list[(batch_index: int, error_message: str)]
-                    partition, error_code, offset, ts, log_start_offset, _, global_error = partition_info
+                    # Currently unused / TODO: KIP-467
+                    partition, error_code, offset, ts, log_start_offset, _record_errors, _global_error = partition_info
                 tp = TopicPartition(topic, partition)
                 error = Errors.for_code(error_code)
                 batch = batches_by_partition[tp]
-                self._complete_batch(batch, error, offset, ts, log_start_offset, global_error)
+                self._complete_batch(batch, error, offset, timestamp_ms=ts, log_start_offset=log_start_offset)
 
         else:
             # this is the acks = 0 case, just complete all requests
             for batch in batches:
-                self._complete_batch(batch, None, -1, None)
+                self._complete_batch(batch, None, -1)
 
-    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None, global_error=None):
+    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None):
         """Complete or retry the given batch of records.
 
         Arguments:
@@ -224,8 +223,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star
             error (Exception): The error (or None if none)
             base_offset (int): The base offset assigned to the records if successful
             timestamp_ms (int, optional): The timestamp returned by the broker for this batch
-            log_start_offset (int): The start offset of the log at the time this produce response was created
-            global_error (str): The summarising error message
+            log_start_offset (int, optional): The start offset of the log at the time this produce response was created
         """
         # Standardize no-error to None
         if error is Errors.NoError:
@@ -237,7 +235,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star
                         " retrying (%d attempts left). Error: %s",
                         batch.topic_partition,
                         self.config['retries'] - batch.attempts - 1,
-                        global_error or error)
+                        error)
             self._accumulator.reenqueue(batch)
             self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
         else:
@@ -245,7 +243,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star
                 error = error(batch.topic_partition.topic)
 
             # tell the user the result of their request
-            batch.done(base_offset, timestamp_ms, error, log_start_offset)
+            batch.done(base_offset, timestamp_ms, error, log_start_offset)
             self._accumulator.deallocate(batch)
             if error is not None:
                 self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
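
Note: a minimal standalone sketch (not kafka-python code; the bare done() below is
a hypothetical stand-in mirroring the parameter order of ProducerBatch.done()) of
why the expiry path in maybe_expire() now passes keyword arguments: positionally,
the exception has to land in the third slot behind a None filler for timestamp_ms,
and any future reordering of the signature would silently mis-bind it.

    def done(base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None):
        # Stand-in with the same parameter order as ProducerBatch.done()
        return (base_offset, timestamp_ms, exception, log_start_offset)

    err = RuntimeError("batch expired")

    # Old call style: positional, with a None filler just to reach the exception slot.
    old = done(-1, None, err)

    # New call style (as in the patch): explicit and order-proof.
    new = done(base_offset=-1, exception=err)

    assert old == new == (-1, None, err, None)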