Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/flakey-test-repro.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# to maximize the chance of catching timing-sensitive flakes. Uploads the
# pytest log from any shard that catches a failure.
#
# Trigger from the Actions UI via "Run workflow" (workflow_dispatch only) —
# Trigger from the Actions UI via "Run workflow" (workflow_dispatch only) -
# does NOT run on PRs or pushes, to avoid burning runner minutes on every
# commit. Defaults target the test_heartbeat_thread rejoin hang, but any
# pytest node id can be supplied.
Expand Down
2 changes: 1 addition & 1 deletion AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* Enrico Canzonieri, [@ecanzonieri](https://github.com/ecanzonieri)
* haosdent, [@haosdent](https://github.com/haosdent)
* Arturo Filastò, [@hellais](https://github.com/hellais)
* Job EversMeltzer, [@jobevers](https://github.com/jobevers)
* Job Evers-Meltzer, [@jobevers](https://github.com/jobevers)
* Martin Olveyra, [@kalessin](https://github.com/kalessin)
* Kubilay Kocak, [@koobs](https://github.com/koobs)
* Matthew L Daniel <mdaniel@gmail.com>
Expand Down
6 changes: 3 additions & 3 deletions docs/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ python -m kafka.consumer

.. code:: bash

python -m kafka.consumer --help
> python -m kafka.consumer --help
usage: python -m kafka.consumer [-h] -b BOOTSTRAP_SERVERS -t TOPICS -g GROUP [-c EXTRA_CONFIG] [-l LOG_LEVEL] [-f FORMAT] [--encoding ENCODING]

Kafka console consumer
Expand All @@ -40,7 +40,7 @@ python -m kafka.producer

.. code:: bash

python -m kafka.producer --help
> python -m kafka.producer --help
usage: python -m kafka.producer [-h] -b BOOTSTRAP_SERVERS -t TOPIC [-c EXTRA_CONFIG] [-l LOG_LEVEL] [--encoding ENCODING]

Kafka console producer
Expand All @@ -63,7 +63,7 @@ python -m kafka.admin

.. code:: bash

python -m kafka.admin --help
> python -m kafka.admin --help
usage: python -m kafka.admin [-h] -b BOOTSTRAP_SERVERS [-c EXTRA_CONFIG] [-l LOG_LEVEL] [-f FORMAT] {cluster,configs,log-dirs,topics,consumer-groups} ...

Kafka admin client
Expand Down
2 changes: 1 addition & 1 deletion kafka/admin/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""KafkaAdminClient — high-level Kafka cluster administration."""
"""KafkaAdminClient - high-level Kafka cluster administration."""

import copy
import logging
Expand Down
8 changes: 4 additions & 4 deletions kafka/benchmarks/producer_encode_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""Benchmarks for the producer encode hot path.

Measures the cost of finalizing a record batch and encoding it into a
ProduceRequest — the pipeline that runs on every send to the broker.
ProduceRequest - the pipeline that runs on every send to the broker.

To compare two implementations (e.g. before/after a change) run this
script twice and diff the output:
Expand Down Expand Up @@ -34,7 +34,7 @@

DEFAULT_RECORDS_PER_BATCH = 100
DEFAULT_VALUE_SIZE = 128
DEFAULT_BATCH_SIZE = 1 << 20 # 1 MiB — big enough to hold the largest case
DEFAULT_BATCH_SIZE = 1 << 20 # 1 MiB - big enough to hold the largest case


def _build_unclosed_batch(num_records, value_size):
Expand All @@ -56,7 +56,7 @@ def _build_closed_batch(num_records, value_size):


def bench_build_and_close(loops, num_records, value_size):
"""Time build + close — isolates the batch-finalization path.
"""Time build + close - isolates the batch-finalization path.

close() is fast enough that pyperf would need many thousands of loops
to measure it directly, so we include the build cost and rely on the
Expand Down Expand Up @@ -139,7 +139,7 @@ def report_allocations(num_records, value_size):
)
req.with_header(correlation_id=1, client_id='bench')

# Warmup — populate the compiled-encoder cache so we don't count its
# Warmup - populate the compiled-encoder cache so we don't count its
# one-time allocations.
req.encode(framed=True, header=True)

Expand Down
2 changes: 1 addition & 1 deletion kafka/benchmarks/protocol_old_vs_new.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@


# ---------------------------------------------------------------------------
# Decode objects (Responses) — pre-encode bytes from old system
# Decode objects (Responses) - pre-encode bytes from old system
# ---------------------------------------------------------------------------

OLD_APIVER_RESP_V0 = OldApiVersionsResp_v0(
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def fetch_records(self, max_records=None, update_offsets=True, timeout_ms=None):
for in-flight responses if no records are immediately available.

Single-call replacement for the legacy
``fetched_records → send_fetches → client.poll → fetched_records``
``fetched_records -> send_fetches -> client.poll -> fetched_records``
loop in :meth:`KafkaConsumer._poll_once`. The caller no longer
drives the event loop; the wait happens inside this method via a
wakeup Future fired by any in-flight fetch's completion callback.
Expand Down
2 changes: 1 addition & 1 deletion kafka/net/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def _send_request(self, request, future=None, timeout_at=None):
# Write the current request's bytes before checking max_in_flight.
# Otherwise with max_in_flight=1, the first request would be added to
# in_flight_requests (len==1), trip the >= check, pause, and never be
# written to the transport — hanging forever.
# written to the transport - hanging forever.
if not self.paused:
self.transport.write(self.parser.send_bytes())
if len(self.in_flight_requests) >= self.config['max_in_flight_requests_per_connection']:
Expand Down
2 changes: 1 addition & 1 deletion kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ async def wait_for(self, future, timeout_ms):
"""Await `future` with a timeout in ms. Raises KafkaTimeoutError on timeout.

Must be awaited from a coroutine running on this loop. The underlying
future is not cancelled on timeout — it continues to run; the timeout
future is not cancelled on timeout - it continues to run; the timeout
only unblocks the awaiter.
"""
if timeout_ms is None:
Expand Down
6 changes: 3 additions & 3 deletions kafka/partitioner/sticky.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Records with a non-None key are hashed to a partition just like
:class:`~kafka.partitioner.default.DefaultPartitioner`. Records with a
None key go to a *sticky* partition — i.e. the same partition is reused
None key go to a *sticky* partition - i.e. the same partition is reused
for every null-key record on a topic until KafkaProducer signals that a
batch has been completed (via :meth:`StickyPartitioner.on_new_batch`),
at which point a different partition is picked.
Expand Down Expand Up @@ -77,7 +77,7 @@ def on_new_batch(self, topic, all_partitions, prev_partition):

The *first* event per sticky is absorbed silently: it
corresponds to the first batch ever being created on the
partition we just picked, which is expected — we want
partition we just picked, which is expected - we want
subsequent records to keep landing there. The *second* event
means the previous batch filled up and a new one was opened;
that's the signal to rotate to a different partition so the
Expand All @@ -100,7 +100,7 @@ def _pick_sticky(self, topic, all_partitions, available, avoid=None):
candidates = [p for p in pool if p != avoid] if avoid is not None else pool
if not candidates:
# Single-partition topic, or only the avoid-partition is
# available — no rotation possible.
# available - no rotation possible.
candidates = pool
partition = random.choice(candidates)
self._sticky[topic] = partition
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ def send_offsets_to_transaction(self, offsets, group_metadata):
offsets ({TopicPartition: OffsetAndMetadata}): map of topic-partition -> offsets to commit
as part of current transaction.
group_metadata (ConsumerGroupMetadata or str): full group metadata from
KafkaConsumer.group_metadata() (preferred — enables broker-side fencing
KafkaConsumer.group_metadata() (preferred - enables broker-side fencing
of stale consumer instances per KIP-447 against Kafka 2.5+ brokers), or
a bare consumer_group_id str for backwards compatibility.

Expand Down
8 changes: 4 additions & 4 deletions kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ def _complete_batch(self, batch, partition_response):
# arrive after the broker already committed the original.
# DUPLICATE_SEQUENCE_NUMBER means the records were already
# written successfully; treat as success.
log.debug("%s: Received DUPLICATE_SEQUENCE_NUMBER for %s — records already committed, treating as success",
log.debug("%s: Received DUPLICATE_SEQUENCE_NUMBER for %s - records already committed, treating as success",
str(self), batch.topic_partition)
error_code = 0

Expand Down Expand Up @@ -585,8 +585,8 @@ def _record_retries(self, batch):
def _can_retry(self, batch, error_cls):
"""
We can retry a send if the error is transient, the number of
attempts taken is fewer than the maximum allowed, and — for the
idempotent producer — the batch's producer id/epoch still matches
attempts taken is fewer than the maximum allowed, and - for the
idempotent producer - the batch's producer id/epoch still matches
ours. A mismatched producer id/epoch (e.g. after a reset or future
KIP-360 epoch bump) means retrying would violate idempotence.
"""
Expand Down Expand Up @@ -614,7 +614,7 @@ def _is_retention_based_unknown_producer_id(self, batch, error_cls, log_start_of
state was legitimately removed by retention, or because of actual
data loss. If the broker's log_start_offset is strictly greater than
the last offset we acknowledged for this partition, then the records
we previously wrote have been aged out — the producer can safely
we previously wrote have been aged out - the producer can safely
reset its sequence to 0 and resume.
"""
if error_cls is not Errors.UnknownProducerIdError:
Expand Down
20 changes: 10 additions & 10 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def __init__(self, transactional_id=None, transaction_timeout_ms=0, retry_backof
self._sequence_numbers = collections.defaultdict(lambda: 0)
# The offset of the last ack'd record for each partition. Used to
# distinguish retention-based UnknownProducerIdError (broker's
# log_start_offset > last_acked_offset → safe to reset and retry)
# log_start_offset > last_acked_offset -> safe to reset and retry)
# from actual data loss. See KAFKA-5793.
self._last_acked_offset = {}

Expand Down Expand Up @@ -216,9 +216,9 @@ def send_offsets_to_transaction(self, offsets, group_metadata):
Arguments:
offsets ({TopicPartition: OffsetAndMetadata}): offsets to commit.
group_metadata (ConsumerGroupMetadata or str): full group metadata
from KafkaConsumer.group_metadata() (preferred — enables
from KafkaConsumer.group_metadata() (preferred - enables
broker-side fencing per KIP-447), or a bare group_id string
for backwards compatibility (broker treats it as v0–v2).
for backwards compatibility (broker treats it as v0-v2).

Returns:
FutureRecordMetadata-style Future that completes once the offsets
Expand Down Expand Up @@ -350,7 +350,7 @@ def classify_batch_error(self, error, batch, log_start_offset=-1):
"""Categorize a batch-completion error into a recovery outcome.

Used by the Sender to decide what to do with a failed batch. This
method does not mutate any state — it is a pure classification
method does not mutate any state - it is a pure classification
helper. The caller is responsible for dispatching to the
appropriate recovery path.

Expand All @@ -362,15 +362,15 @@ def classify_batch_error(self, error, batch, log_start_offset=-1):
failure. Used for KAFKA-5793 retention detection.

Returns one of:
ERROR_CLASS_RETRIABLE — caller should retry the batch
ERROR_CLASS_ABORTABLE — transactional producer only;
ERROR_CLASS_RETRIABLE - caller should retry the batch
ERROR_CLASS_ABORTABLE - transactional producer only;
abort the transaction
ERROR_CLASS_FATAL — unrecoverable; transition to
ERROR_CLASS_FATAL - unrecoverable; transition to
fatal error and fail the batch
ERROR_CLASS_NEEDS_EPOCH_BUMP — recoverable via KIP-360 epoch
ERROR_CLASS_NEEDS_EPOCH_BUMP - recoverable via KIP-360 epoch
bump (only when broker supports
InitProducerIdRequest v3+)
ERROR_CLASS_NEEDS_PRODUCER_ID_RESET — non-transactional pre-KIP-360
ERROR_CLASS_NEEDS_PRODUCER_ID_RESET - non-transactional pre-KIP-360
fallback: reset the
producer id entirely

Expand Down Expand Up @@ -1211,7 +1211,7 @@ def __init__(self, transaction_manager, group_metadata, offsets, result):
def _build_request(self):
# KIP-447: v3+ carries member_id / generation_id / group_instance_id
# so the broker can fence stale consumer instances. We always set them
# — the protocol drops them when the connection negotiates v0-v2
# - the protocol drops them when the connection negotiates v0-v2
# against an older broker. max_version is the highest version this
# client knows how to drive: v4/v5 belong to KIP-890.
Topic = TxnOffsetCommitRequest.TxnOffsetCommitRequestTopic
Expand Down
6 changes: 3 additions & 3 deletions kafka/protocol/schemas/fields/codecs/encode_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def ensure(self, needed):

def result(self):
# Return a bytearray slice (one copy) rather than bytes(self.buf[:pos])
# (two copies — the slice creates a bytearray, then bytes() copies
# (two copies - the slice creates a bytearray, then bytes() copies
# again). Downstream consumers (protocol codec slice assignment,
# socket.send) accept bytearray transparently.
return self.buf[:self.pos]
Expand All @@ -32,7 +32,7 @@ class EncodeBufferPool:
"""Thread-local pool of reusable EncodeBuffer objects.

Each thread gets its own buffer that grows to match the largest message
encoded on that thread and stays that size — avoiding repeated allocation
encoded on that thread and stays that size - avoiding repeated allocation
of large bytearrays.

Usage:
Expand All @@ -58,7 +58,7 @@ def _get(cls):

@classmethod
def _release(cls, buf):
# Nothing to do — the buffer stays on the thread-local.
# Nothing to do - the buffer stays on the thread-local.
# Future: could cap max size to prevent memory leaks from
# one-off large messages.
pass
Expand Down
4 changes: 2 additions & 2 deletions kafka/protocol/schemas/fields/codecs/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ class FixedCodec:
types. This prefix is applied here so subclasses only specify the type
format character (e.g., 'i' for 32-bit signed int).
"""
fmt = None # e.g., 'i' — set by subclass
size = None # e.g., 4 — set by subclass
fmt = None # e.g., 'i' - set by subclass
size = None # e.g., 4 - set by subclass
batchable = True # Can be batched with adjacent FixedCodec fields

def __init_subclass__(cls, **kw):
Expand Down
2 changes: 1 addition & 1 deletion kafka/protocol/schemas/fields/codegen.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Generate flat encode/decode functions for a StructField + version.

Given a StructField and a protocol version, generates Python functions
that encode/decode directly with zero dispatch overhead — no intermediate
that encode/decode directly with zero dispatch overhead - no intermediate
SimpleField/ArrayField/StructField method calls.

Usage:
Expand Down
4 changes: 2 additions & 2 deletions kafka/protocol/schemas/fields/struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def emit_decode_from(self, ctx, var_name, indent, version=None, compact=False, t
break

if len(batch_fields) == 1:
# Single field — no batching benefit
# Single field - no batching benefit
f = batch_fields[0]
v = ctx.next_var('val')
f.emit_decode_from(ctx, v, indent, version=version,
Expand Down Expand Up @@ -278,7 +278,7 @@ def emit_decode_from(self, ctx, var_name, indent, version=None, compact=False, t
ctx.emit(indent, 'pos += 1 # empty tagged fields')

def _emit_tagged_decode(self, ctx, var_name, indent, version):
"""Emit tagged fields decode — falls back to method-based decode."""
"""Emit tagged fields decode - falls back to method-based decode."""
tf_var = ctx.next_var('tf')
ctx.globs[tf_var] = self.tagged_fields(version)
ctx.globs['_BytesIO'] = __import__('io').BytesIO
Expand Down
2 changes: 1 addition & 1 deletion test/admin/test_admin_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def test_close_unblocks_pending_callers(broker):

async def blocker():
blocked.set()
# Wait forever — only unblocks via manager.stop()
# Wait forever - only unblocks via manager.stop()
await manager._net.sleep(3600)

result = {}
Expand Down
2 changes: 1 addition & 1 deletion test/admin/test_admin_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def test_fills_in_other_modified_keys(self, broker, admin):
('bar', 'barval', _SRC_DYNAMIC_TOPIC, False),
('baz', None, _SRC_DEFAULT, False),
]))
# add_missing describe (modified filter) — same wire response, Python filters
# add_missing describe (modified filter) - same wire response, Python filters
broker.respond(DescribeConfigsRequest, _describe_configs_response(
_TOPIC, 'topic-a', [
('foo', 'old', _SRC_DYNAMIC_TOPIC, False),
Expand Down
2 changes: 1 addition & 1 deletion test/consumer/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test_context_manager_closes_on_exit():


def test_context_manager_suppresses_autocommit_on_exception():
# Verify the __exit__ → close(autocommit=...) wiring. We don't need a
# Verify the __exit__ -> close(autocommit=...) wiring. We don't need a
# real coordinator for this; just check that an exception propagates and
# that close() is reached.
consumer = KafkaConsumer(api_version=(0, 10, 0))
Expand Down
12 changes: 6 additions & 6 deletions test/consumer/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def test_group_metadata_after_join(coordinator):
# group_instance_id comes from config (None by default for this fixture).
assert gm.group_instance_id is None

# Still returns the snapshot even while rebalancing — the producer needs
# Still returns the snapshot even while rebalancing - the producer needs
# *something* to send and the broker handles fencing.
coordinator.state = MemberState.REBALANCING
assert coordinator.group_metadata().generation_id == 42
Expand Down Expand Up @@ -589,10 +589,10 @@ def test_send_offset_commit_request_fail(coordinator, offsets):
# selector. Default coordinator state has coordinator_id=None, so
# coordinator() returns None and the no-coordinator path fires.

# No offsets — coroutine returns None
# No offsets - coroutine returns None
assert coordinator._net.run(coordinator._send_offset_commit_request, {}) is None

# No coordinator — coroutine raises
# No coordinator - coroutine raises
with pytest.raises(Errors.CoordinatorNotAvailableError):
coordinator._net.run(coordinator._send_offset_commit_request, offsets)

Expand Down Expand Up @@ -731,10 +731,10 @@ def test_send_offset_fetch_request_fail(coordinator, partitions):
# _send_offset_fetch_request is an async coroutine; run it via the
# selector. Default coordinator state has coordinator_id=None.

# No partitions — coroutine returns {}
# No partitions - coroutine returns {}
assert coordinator._net.run(coordinator._send_offset_fetch_request, []) == {}

# No coordinator — coroutine raises
# No coordinator - coroutine raises
with pytest.raises(Errors.CoordinatorNotAvailableError):
coordinator._net.run(coordinator._send_offset_fetch_request, partitions)

Expand Down Expand Up @@ -1000,7 +1000,7 @@ def _sync_response_object(error_code=0, assignment=b''):

def test_do_join_and_sync_async_follower(request, broker, seeded_coord):
request.addfinalizer(lambda: setattr(seeded_coord, 'state', MemberState.UNJOINED))
# Default broker.broker_version=(4,2) → JoinGroup v9, SyncGroup v5.
# Default broker.broker_version=(4,2) -> JoinGroup v9, SyncGroup v5.
# Follower: leader != our member_id.
broker.respond(JoinGroupRequest, _join_response_object(
leader='leader-x', member_id='member-1', members=[]))
Expand Down
Loading
Loading