diff --git a/.github/workflows/flakey-test-repro.yml b/.github/workflows/flakey-test-repro.yml index 820169ded..9ca875caf 100644 --- a/.github/workflows/flakey-test-repro.yml +++ b/.github/workflows/flakey-test-repro.yml @@ -4,7 +4,7 @@ # to maximize the chance of catching timing-sensitive flakes. Uploads the # pytest log from any shard that catches a failure. # -# Trigger from the Actions UI via "Run workflow" (workflow_dispatch only) — +# Trigger from the Actions UI via "Run workflow" (workflow_dispatch only) - # does NOT run on PRs or pushes, to avoid burning runner minutes on every # commit. Defaults target the test_heartbeat_thread rejoin hang, but any # pytest node id can be supplied. diff --git a/AUTHORS.md b/AUTHORS.md index 7d44efd6e..f5ed3d19a 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -14,7 +14,7 @@ * Enrico Canzonieri, [@ecanzonieri](https://github.com/ecanzonieri) * haosdent, [@haosdent](https://github.com/haosdent) * Arturo Filastò, [@hellais](https://github.com/hellais) -* Job Evers‐Meltzer, [@jobevers](https://github.com/jobevers) +* Job Evers-Meltzer, [@jobevers](https://github.com/jobevers) * Martin Olveyra, [@kalessin](https://github.com/kalessin) * Kubilay Kocak, [@koobs](https://github.com/koobs) * Matthew L Daniel diff --git a/docs/usage.rst b/docs/usage.rst index ee1ac7371..f493ce6bf 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -13,7 +13,7 @@ python -m kafka.consumer .. code:: bash - ❯ python -m kafka.consumer --help + > python -m kafka.consumer --help usage: python -m kafka.consumer [-h] -b BOOTSTRAP_SERVERS -t TOPICS -g GROUP [-c EXTRA_CONFIG] [-l LOG_LEVEL] [-f FORMAT] [--encoding ENCODING] Kafka console consumer @@ -40,7 +40,7 @@ python -m kafka.producer .. code:: bash - ❯ python -m kafka.producer --help + > python -m kafka.producer --help usage: python -m kafka.producer [-h] -b BOOTSTRAP_SERVERS -t TOPIC [-c EXTRA_CONFIG] [-l LOG_LEVEL] [--encoding ENCODING] Kafka console producer @@ -63,7 +63,7 @@ python -m kafka.admin .. code:: bash - ❯ python -m kafka.admin --help + > python -m kafka.admin --help usage: python -m kafka.admin [-h] -b BOOTSTRAP_SERVERS [-c EXTRA_CONFIG] [-l LOG_LEVEL] [-f FORMAT] {cluster,configs,log-dirs,topics,consumer-groups} ... Kafka admin client diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 02e713c10..4b26e22a1 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -1,4 +1,4 @@ -"""KafkaAdminClient — high-level Kafka cluster administration.""" +"""KafkaAdminClient - high-level Kafka cluster administration.""" import copy import logging diff --git a/kafka/benchmarks/producer_encode_path.py b/kafka/benchmarks/producer_encode_path.py index 61d9f87f7..bdf50851f 100644 --- a/kafka/benchmarks/producer_encode_path.py +++ b/kafka/benchmarks/producer_encode_path.py @@ -2,7 +2,7 @@ """Benchmarks for the producer encode hot path. Measures the cost of finalizing a record batch and encoding it into a -ProduceRequest — the pipeline that runs on every send to the broker. +ProduceRequest - the pipeline that runs on every send to the broker. To compare two implementations (e.g. before/after a change) run this script twice and diff the output: @@ -34,7 +34,7 @@ DEFAULT_RECORDS_PER_BATCH = 100 DEFAULT_VALUE_SIZE = 128 -DEFAULT_BATCH_SIZE = 1 << 20 # 1 MiB — big enough to hold the largest case +DEFAULT_BATCH_SIZE = 1 << 20 # 1 MiB - big enough to hold the largest case def _build_unclosed_batch(num_records, value_size): @@ -56,7 +56,7 @@ def _build_closed_batch(num_records, value_size): def bench_build_and_close(loops, num_records, value_size): - """Time build + close — isolates the batch-finalization path. + """Time build + close - isolates the batch-finalization path. close() is fast enough that pyperf would need many thousands of loops to measure it directly, so we include the build cost and rely on the @@ -139,7 +139,7 @@ def report_allocations(num_records, value_size): ) req.with_header(correlation_id=1, client_id='bench') - # Warmup — populate the compiled-encoder cache so we don't count its + # Warmup - populate the compiled-encoder cache so we don't count its # one-time allocations. req.encode(framed=True, header=True) diff --git a/kafka/benchmarks/protocol_old_vs_new.py b/kafka/benchmarks/protocol_old_vs_new.py index c4d451b83..58a7ef0a1 100644 --- a/kafka/benchmarks/protocol_old_vs_new.py +++ b/kafka/benchmarks/protocol_old_vs_new.py @@ -204,7 +204,7 @@ # --------------------------------------------------------------------------- -# Decode objects (Responses) — pre-encode bytes from old system +# Decode objects (Responses) - pre-encode bytes from old system # --------------------------------------------------------------------------- OLD_APIVER_RESP_V0 = OldApiVersionsResp_v0( diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 2a7d27879..947b0a7e0 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -160,7 +160,7 @@ def fetch_records(self, max_records=None, update_offsets=True, timeout_ms=None): for in-flight responses if no records are immediately available. Single-call replacement for the legacy - ``fetched_records → send_fetches → client.poll → fetched_records`` + ``fetched_records -> send_fetches -> client.poll -> fetched_records`` loop in :meth:`KafkaConsumer._poll_once`. The caller no longer drives the event loop; the wait happens inside this method via a wakeup Future fired by any in-flight fetch's completion callback. diff --git a/kafka/net/connection.py b/kafka/net/connection.py index 838a6ffb2..71434adb0 100644 --- a/kafka/net/connection.py +++ b/kafka/net/connection.py @@ -158,7 +158,7 @@ def _send_request(self, request, future=None, timeout_at=None): # Write the current request's bytes before checking max_in_flight. # Otherwise with max_in_flight=1, the first request would be added to # in_flight_requests (len==1), trip the >= check, pause, and never be - # written to the transport — hanging forever. + # written to the transport - hanging forever. if not self.paused: self.transport.write(self.parser.send_bytes()) if len(self.in_flight_requests) >= self.config['max_in_flight_requests_per_connection']: diff --git a/kafka/net/manager.py b/kafka/net/manager.py index faf8a7881..0b4bdc741 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -349,7 +349,7 @@ async def wait_for(self, future, timeout_ms): """Await `future` with a timeout in ms. Raises KafkaTimeoutError on timeout. Must be awaited from a coroutine running on this loop. The underlying - future is not cancelled on timeout — it continues to run; the timeout + future is not cancelled on timeout - it continues to run; the timeout only unblocks the awaiter. """ if timeout_ms is None: diff --git a/kafka/partitioner/sticky.py b/kafka/partitioner/sticky.py index a9e4cec18..cdda61101 100644 --- a/kafka/partitioner/sticky.py +++ b/kafka/partitioner/sticky.py @@ -2,7 +2,7 @@ Records with a non-None key are hashed to a partition just like :class:`~kafka.partitioner.default.DefaultPartitioner`. Records with a -None key go to a *sticky* partition — i.e. the same partition is reused +None key go to a *sticky* partition - i.e. the same partition is reused for every null-key record on a topic until KafkaProducer signals that a batch has been completed (via :meth:`StickyPartitioner.on_new_batch`), at which point a different partition is picked. @@ -77,7 +77,7 @@ def on_new_batch(self, topic, all_partitions, prev_partition): The *first* event per sticky is absorbed silently: it corresponds to the first batch ever being created on the - partition we just picked, which is expected — we want + partition we just picked, which is expected - we want subsequent records to keep landing there. The *second* event means the previous batch filled up and a new one was opened; that's the signal to rotate to a different partition so the @@ -100,7 +100,7 @@ def _pick_sticky(self, topic, all_partitions, available, avoid=None): candidates = [p for p in pool if p != avoid] if avoid is not None else pool if not candidates: # Single-partition topic, or only the avoid-partition is - # available — no rotation possible. + # available - no rotation possible. candidates = pool partition = random.choice(candidates) self._sticky[topic] = partition diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 441dd2317..e74d061bf 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -751,7 +751,7 @@ def send_offsets_to_transaction(self, offsets, group_metadata): offsets ({TopicPartition: OffsetAndMetadata}): map of topic-partition -> offsets to commit as part of current transaction. group_metadata (ConsumerGroupMetadata or str): full group metadata from - KafkaConsumer.group_metadata() (preferred — enables broker-side fencing + KafkaConsumer.group_metadata() (preferred - enables broker-side fencing of stale consumer instances per KIP-447 against Kafka 2.5+ brokers), or a bare consumer_group_id str for backwards compatibility. diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index e13682dfa..71872208b 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -432,7 +432,7 @@ def _complete_batch(self, batch, partition_response): # arrive after the broker already committed the original. # DUPLICATE_SEQUENCE_NUMBER means the records were already # written successfully; treat as success. - log.debug("%s: Received DUPLICATE_SEQUENCE_NUMBER for %s — records already committed, treating as success", + log.debug("%s: Received DUPLICATE_SEQUENCE_NUMBER for %s - records already committed, treating as success", str(self), batch.topic_partition) error_code = 0 @@ -585,8 +585,8 @@ def _record_retries(self, batch): def _can_retry(self, batch, error_cls): """ We can retry a send if the error is transient, the number of - attempts taken is fewer than the maximum allowed, and — for the - idempotent producer — the batch's producer id/epoch still matches + attempts taken is fewer than the maximum allowed, and - for the + idempotent producer - the batch's producer id/epoch still matches ours. A mismatched producer id/epoch (e.g. after a reset or future KIP-360 epoch bump) means retrying would violate idempotence. """ @@ -614,7 +614,7 @@ def _is_retention_based_unknown_producer_id(self, batch, error_cls, log_start_of state was legitimately removed by retention, or because of actual data loss. If the broker's log_start_offset is strictly greater than the last offset we acknowledged for this partition, then the records - we previously wrote have been aged out — the producer can safely + we previously wrote have been aged out - the producer can safely reset its sequence to 0 and resume. """ if error_cls is not Errors.UnknownProducerIdError: diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 40f4df9e1..b294a4e6c 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -120,7 +120,7 @@ def __init__(self, transactional_id=None, transaction_timeout_ms=0, retry_backof self._sequence_numbers = collections.defaultdict(lambda: 0) # The offset of the last ack'd record for each partition. Used to # distinguish retention-based UnknownProducerIdError (broker's - # log_start_offset > last_acked_offset → safe to reset and retry) + # log_start_offset > last_acked_offset -> safe to reset and retry) # from actual data loss. See KAFKA-5793. self._last_acked_offset = {} @@ -216,9 +216,9 @@ def send_offsets_to_transaction(self, offsets, group_metadata): Arguments: offsets ({TopicPartition: OffsetAndMetadata}): offsets to commit. group_metadata (ConsumerGroupMetadata or str): full group metadata - from KafkaConsumer.group_metadata() (preferred — enables + from KafkaConsumer.group_metadata() (preferred - enables broker-side fencing per KIP-447), or a bare group_id string - for backwards compatibility (broker treats it as v0–v2). + for backwards compatibility (broker treats it as v0-v2). Returns: FutureRecordMetadata-style Future that completes once the offsets @@ -350,7 +350,7 @@ def classify_batch_error(self, error, batch, log_start_offset=-1): """Categorize a batch-completion error into a recovery outcome. Used by the Sender to decide what to do with a failed batch. This - method does not mutate any state — it is a pure classification + method does not mutate any state - it is a pure classification helper. The caller is responsible for dispatching to the appropriate recovery path. @@ -362,15 +362,15 @@ def classify_batch_error(self, error, batch, log_start_offset=-1): failure. Used for KAFKA-5793 retention detection. Returns one of: - ERROR_CLASS_RETRIABLE — caller should retry the batch - ERROR_CLASS_ABORTABLE — transactional producer only; + ERROR_CLASS_RETRIABLE - caller should retry the batch + ERROR_CLASS_ABORTABLE - transactional producer only; abort the transaction - ERROR_CLASS_FATAL — unrecoverable; transition to + ERROR_CLASS_FATAL - unrecoverable; transition to fatal error and fail the batch - ERROR_CLASS_NEEDS_EPOCH_BUMP — recoverable via KIP-360 epoch + ERROR_CLASS_NEEDS_EPOCH_BUMP - recoverable via KIP-360 epoch bump (only when broker supports InitProducerIdRequest v3+) - ERROR_CLASS_NEEDS_PRODUCER_ID_RESET — non-transactional pre-KIP-360 + ERROR_CLASS_NEEDS_PRODUCER_ID_RESET - non-transactional pre-KIP-360 fallback: reset the producer id entirely @@ -1211,7 +1211,7 @@ def __init__(self, transaction_manager, group_metadata, offsets, result): def _build_request(self): # KIP-447: v3+ carries member_id / generation_id / group_instance_id # so the broker can fence stale consumer instances. We always set them - # — the protocol drops them when the connection negotiates v0-v2 + # - the protocol drops them when the connection negotiates v0-v2 # against an older broker. max_version is the highest version this # client knows how to drive: v4/v5 belong to KIP-890. Topic = TxnOffsetCommitRequest.TxnOffsetCommitRequestTopic diff --git a/kafka/protocol/schemas/fields/codecs/encode_buffer.py b/kafka/protocol/schemas/fields/codecs/encode_buffer.py index 5db2b12ea..3230c4025 100644 --- a/kafka/protocol/schemas/fields/codecs/encode_buffer.py +++ b/kafka/protocol/schemas/fields/codecs/encode_buffer.py @@ -22,7 +22,7 @@ def ensure(self, needed): def result(self): # Return a bytearray slice (one copy) rather than bytes(self.buf[:pos]) - # (two copies — the slice creates a bytearray, then bytes() copies + # (two copies - the slice creates a bytearray, then bytes() copies # again). Downstream consumers (protocol codec slice assignment, # socket.send) accept bytearray transparently. return self.buf[:self.pos] @@ -32,7 +32,7 @@ class EncodeBufferPool: """Thread-local pool of reusable EncodeBuffer objects. Each thread gets its own buffer that grows to match the largest message - encoded on that thread and stays that size — avoiding repeated allocation + encoded on that thread and stays that size - avoiding repeated allocation of large bytearrays. Usage: @@ -58,7 +58,7 @@ def _get(cls): @classmethod def _release(cls, buf): - # Nothing to do — the buffer stays on the thread-local. + # Nothing to do - the buffer stays on the thread-local. # Future: could cap max size to prevent memory leaks from # one-off large messages. pass diff --git a/kafka/protocol/schemas/fields/codecs/types.py b/kafka/protocol/schemas/fields/codecs/types.py index 8deca2e6f..e423cd02a 100644 --- a/kafka/protocol/schemas/fields/codecs/types.py +++ b/kafka/protocol/schemas/fields/codecs/types.py @@ -11,8 +11,8 @@ class FixedCodec: types. This prefix is applied here so subclasses only specify the type format character (e.g., 'i' for 32-bit signed int). """ - fmt = None # e.g., 'i' — set by subclass - size = None # e.g., 4 — set by subclass + fmt = None # e.g., 'i' - set by subclass + size = None # e.g., 4 - set by subclass batchable = True # Can be batched with adjacent FixedCodec fields def __init_subclass__(cls, **kw): diff --git a/kafka/protocol/schemas/fields/codegen.py b/kafka/protocol/schemas/fields/codegen.py index e6eaf1f09..911869e0c 100644 --- a/kafka/protocol/schemas/fields/codegen.py +++ b/kafka/protocol/schemas/fields/codegen.py @@ -1,7 +1,7 @@ """Generate flat encode/decode functions for a StructField + version. Given a StructField and a protocol version, generates Python functions -that encode/decode directly with zero dispatch overhead — no intermediate +that encode/decode directly with zero dispatch overhead - no intermediate SimpleField/ArrayField/StructField method calls. Usage: diff --git a/kafka/protocol/schemas/fields/struct.py b/kafka/protocol/schemas/fields/struct.py index 5e9264692..84e5db6b8 100644 --- a/kafka/protocol/schemas/fields/struct.py +++ b/kafka/protocol/schemas/fields/struct.py @@ -241,7 +241,7 @@ def emit_decode_from(self, ctx, var_name, indent, version=None, compact=False, t break if len(batch_fields) == 1: - # Single field — no batching benefit + # Single field - no batching benefit f = batch_fields[0] v = ctx.next_var('val') f.emit_decode_from(ctx, v, indent, version=version, @@ -278,7 +278,7 @@ def emit_decode_from(self, ctx, var_name, indent, version=None, compact=False, t ctx.emit(indent, 'pos += 1 # empty tagged fields') def _emit_tagged_decode(self, ctx, var_name, indent, version): - """Emit tagged fields decode — falls back to method-based decode.""" + """Emit tagged fields decode - falls back to method-based decode.""" tf_var = ctx.next_var('tf') ctx.globs[tf_var] = self.tagged_fields(version) ctx.globs['_BytesIO'] = __import__('io').BytesIO diff --git a/test/admin/test_admin_concurrent.py b/test/admin/test_admin_concurrent.py index 377a5b230..1a5a5d1ef 100644 --- a/test/admin/test_admin_concurrent.py +++ b/test/admin/test_admin_concurrent.py @@ -74,7 +74,7 @@ def test_close_unblocks_pending_callers(broker): async def blocker(): blocked.set() - # Wait forever — only unblocks via manager.stop() + # Wait forever - only unblocks via manager.stop() await manager._net.sleep(3600) result = {} diff --git a/test/admin/test_admin_configs.py b/test/admin/test_admin_configs.py index 0fd059454..cbd1806aa 100644 --- a/test/admin/test_admin_configs.py +++ b/test/admin/test_admin_configs.py @@ -124,7 +124,7 @@ def test_fills_in_other_modified_keys(self, broker, admin): ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), ('baz', None, _SRC_DEFAULT, False), ])) - # add_missing describe (modified filter) — same wire response, Python filters + # add_missing describe (modified filter) - same wire response, Python filters broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'old', _SRC_DYNAMIC_TOPIC, False), diff --git a/test/consumer/test_consumer.py b/test/consumer/test_consumer.py index b96f6b75a..7f11ace91 100644 --- a/test/consumer/test_consumer.py +++ b/test/consumer/test_consumer.py @@ -60,7 +60,7 @@ def test_context_manager_closes_on_exit(): def test_context_manager_suppresses_autocommit_on_exception(): - # Verify the __exit__ → close(autocommit=...) wiring. We don't need a + # Verify the __exit__ -> close(autocommit=...) wiring. We don't need a # real coordinator for this; just check that an exception propagates and # that close() is reached. consumer = KafkaConsumer(api_version=(0, 10, 0)) diff --git a/test/consumer/test_coordinator.py b/test/consumer/test_coordinator.py index 0c9acb5bc..431d163f4 100644 --- a/test/consumer/test_coordinator.py +++ b/test/consumer/test_coordinator.py @@ -85,7 +85,7 @@ def test_group_metadata_after_join(coordinator): # group_instance_id comes from config (None by default for this fixture). assert gm.group_instance_id is None - # Still returns the snapshot even while rebalancing — the producer needs + # Still returns the snapshot even while rebalancing - the producer needs # *something* to send and the broker handles fencing. coordinator.state = MemberState.REBALANCING assert coordinator.group_metadata().generation_id == 42 @@ -589,10 +589,10 @@ def test_send_offset_commit_request_fail(coordinator, offsets): # selector. Default coordinator state has coordinator_id=None, so # coordinator() returns None and the no-coordinator path fires. - # No offsets — coroutine returns None + # No offsets - coroutine returns None assert coordinator._net.run(coordinator._send_offset_commit_request, {}) is None - # No coordinator — coroutine raises + # No coordinator - coroutine raises with pytest.raises(Errors.CoordinatorNotAvailableError): coordinator._net.run(coordinator._send_offset_commit_request, offsets) @@ -731,10 +731,10 @@ def test_send_offset_fetch_request_fail(coordinator, partitions): # _send_offset_fetch_request is an async coroutine; run it via the # selector. Default coordinator state has coordinator_id=None. - # No partitions — coroutine returns {} + # No partitions - coroutine returns {} assert coordinator._net.run(coordinator._send_offset_fetch_request, []) == {} - # No coordinator — coroutine raises + # No coordinator - coroutine raises with pytest.raises(Errors.CoordinatorNotAvailableError): coordinator._net.run(coordinator._send_offset_fetch_request, partitions) @@ -1000,7 +1000,7 @@ def _sync_response_object(error_code=0, assignment=b''): def test_do_join_and_sync_async_follower(request, broker, seeded_coord): request.addfinalizer(lambda: setattr(seeded_coord, 'state', MemberState.UNJOINED)) - # Default broker.broker_version=(4,2) → JoinGroup v9, SyncGroup v5. + # Default broker.broker_version=(4,2) -> JoinGroup v9, SyncGroup v5. # Follower: leader != our member_id. broker.respond(JoinGroupRequest, _join_response_object( leader='leader-x', member_id='member-1', members=[])) diff --git a/test/consumer/test_fetcher.py b/test/consumer/test_fetcher.py index 2b3e5e76f..3e6ad9e8a 100644 --- a/test/consumer/test_fetcher.py +++ b/test/consumer/test_fetcher.py @@ -709,7 +709,7 @@ def test_fetched_records_parks_parsed_records_on_pause_between_calls(fetcher, to assert fetcher._next_partition_records is None assert tp in fetcher._paused_partition_records - # Resume — remainder comes back on the next call. + # Resume - remainder comes back on the next call. fetcher._subscriptions.resume(tp) records, _ = fetcher.fetched_records() assert tp in records @@ -739,7 +739,7 @@ def test_fetched_records_other_partitions_progress_while_one_paused(fetcher, top def test_fetchable_partitions_excludes_parked_pause_data(fetcher, topic, mocker): """A partition with parked (paused) data must not be picked up by the fetch-request side after resume() but before the next fetched_records() - drains the parked entry — otherwise we'd issue a redundant fetch.""" + drains the parked entry - otherwise we'd issue a redundant fetch.""" fetcher.config['check_crcs'] = False tp = TopicPartition(topic, 0) @@ -747,7 +747,7 @@ def test_fetchable_partitions_excludes_parked_pause_data(fetcher, topic, mocker) fetcher._paused_completed_fetches[tp] = _build_completed_fetch(tp, _paused_msgs(5)) fetcher._subscriptions.resume(tp) - # Even though the partition is now resumed, parked data is pending — + # Even though the partition is now resumed, parked data is pending - # exclude from the fetchable set so we don't refetch. assert tp not in fetcher._fetchable_partitions() @@ -1621,7 +1621,7 @@ def test_offset_out_of_range_with_follower_retries_against_leader( fetcher._parse_fetched_data(completed) # Cache cleared so next fetch goes to leader. assert fetcher._subscriptions.assignment[tp].preferred_read_replica() is None - # No reset requested — we want to re-confirm against the leader first. + # No reset requested - we want to re-confirm against the leader first. assert fetcher._subscriptions.assignment[tp].awaiting_reset is False def test_offset_out_of_range_without_follower_resets( @@ -1637,7 +1637,7 @@ def test_offset_out_of_range_without_follower_resets( def test_leader_epoch_advance_clears_preferred_replica(self, topic): """maybe_validate_position must drop the cache when the cluster - epoch advances past our position's epoch — same partition's leader + epoch advances past our position's epoch - same partition's leader almost certainly changed.""" from kafka.consumer.subscription_state import TopicPartitionState state = TopicPartitionState() @@ -1651,7 +1651,7 @@ def test_leader_epoch_advance_clears_preferred_replica(self, topic): def test_update_does_not_refresh_ttl_on_same_replica(self, topic): """A steady stream of fetches from the same follower must NOT keep - refreshing the lease — the TTL counts down regardless. Matches Java.""" + refreshing the lease - the TTL counts down regardless. Matches Java.""" from kafka.consumer.subscription_state import TopicPartitionState state = TopicPartitionState() expiration = time.monotonic() + 60 diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 24899dbfe..635dbc5ef 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -575,7 +575,7 @@ def test_describe_topic_partitions_pagination(kafka_admin_client, topic): result = kafka_admin_client.describe_topic_partitions( [topic], response_partition_limit=1) # Small clusters may still satisfy the request without a cursor if the - # broker ignores the partition limit — tolerate either outcome but verify + # broker ignores the partition limit - tolerate either outcome but verify # the cursor round-trips when present. if result['next_cursor'] is not None: cursor = result['next_cursor'] diff --git a/test/net/test_selector.py b/test/net/test_selector.py index 666e530dc..34042cdd6 100644 --- a/test/net/test_selector.py +++ b/test/net/test_selector.py @@ -521,7 +521,7 @@ def test_slow_task_warns_with_default_threshold(self, caplog): done = Future() async def hog(): - time.sleep(0.05) # synchronous sleep — does not yield to loop + time.sleep(0.05) # synchronous sleep - does not yield to loop done.success(True) net.call_soon(hog) diff --git a/test/net/test_transport.py b/test/net/test_transport.py index 72dfd9d72..dbd7794d5 100644 --- a/test/net/test_transport.py +++ b/test/net/test_transport.py @@ -157,7 +157,7 @@ def test_sock_send_error_closes_transport(self, net, socketpair): err = Errors.KafkaConnectionError('write failed') # Stub _sock_send: drain one chunk (real _sock_send drops the chunk - # that errored — no appendleft on BaseException) and return the + # that errored - no appendleft on BaseException) and return the # error so the while-loop's buffer check terminates. def fake_sock_send(): if t._write_buffer: diff --git a/test/net/test_wakeup_notifier.py b/test/net/test_wakeup_notifier.py index fbf6da7d1..c687ef9b1 100644 --- a/test/net/test_wakeup_notifier.py +++ b/test/net/test_wakeup_notifier.py @@ -63,7 +63,7 @@ def test_notify_before_await_is_latched(self, net, notifier): latched so the next __call__ returns immediately. This is the scenario that the WakeupNotifier docstring describes.""" async def task(): - # notify() before any awaiter — simulates the race in + # notify() before any awaiter - simulates the race in # cluster._refresh_loop where request_update() lands between # ttl() and await self._wakeup(...). notifier.notify() diff --git a/test/producer/test_partitioner.py b/test/producer/test_partitioner.py index 7c094ebf7..a4b2b0cbf 100644 --- a/test/producer/test_partitioner.py +++ b/test/producer/test_partitioner.py @@ -51,7 +51,7 @@ def test_keyed_records_hash_like_default(self): def test_null_key_sticks_until_second_on_new_batch(self): """The *first* on_new_batch event is absorbed (it's the first - batch being opened on the newly-picked sticky — exactly what we + batch being opened on the newly-picked sticky - exactly what we want). Rotation only happens on the *second* event, which signals that the previous batch filled up. Without this, the partitioner would rotate on every record whose partition had @@ -62,11 +62,11 @@ def test_null_key_sticks_until_second_on_new_batch(self): for _ in range(50): assert sticky.partition('t', None, all_partitions, available) == p1 - # First on_new_batch: opens the first batch on p1 — no rotation. + # First on_new_batch: opens the first batch on p1 - no rotation. sticky.on_new_batch('t', all_partitions, p1) assert sticky.partition('t', None, all_partitions, available) == p1 - # Second on_new_batch: previous batch filled — rotate. + # Second on_new_batch: previous batch filled - rotate. sticky.on_new_batch('t', all_partitions, p1) p2 = sticky.partition('t', None, all_partitions, available) assert p2 != p1, 'second on_new_batch should rotate' @@ -99,7 +99,7 @@ def test_unavailable_sticky_partition_repicks(self): def test_single_partition_topic_cannot_rotate(self): """on_new_batch on a single-partition topic just keeps the same - partition — there's nothing else to rotate to.""" + partition - there's nothing else to rotate to.""" sticky = StickyPartitioner() all_partitions = available = [0] assert sticky.partition('t', None, all_partitions, available) == 0 diff --git a/test/producer/test_producer.py b/test/producer/test_producer.py index a9227ad43..46e1a5bc2 100644 --- a/test/producer/test_producer.py +++ b/test/producer/test_producer.py @@ -52,7 +52,7 @@ def test_partition_falls_back_to_legacy_callable(): producer._metadata.partitions_for_topic.return_value = {0, 1, 2} producer._metadata.available_partitions_for_topic.return_value = {0, 1, 2} - # A plain function — no .partition attribute — must still work. + # A plain function - no .partition attribute - must still work. calls = [] def legacy_partitioner(key, all_partitions, available): calls.append((key, all_partitions, available)) diff --git a/test/producer/test_sender.py b/test/producer/test_sender.py index 137e1da46..0ff86d124 100644 --- a/test/producer/test_sender.py +++ b/test/producer/test_sender.py @@ -121,7 +121,7 @@ def test_produce_request_negotiates_wire_version(sender, broker, manager, produc batch = producer_batch(magic=magic) produce_request = sender._produce_request(0, 1, 0, [batch]) # acks=1 assert isinstance(produce_request, ProduceRequest) - # Version is not pinned at construction — that's the whole point. + # Version is not pinned at construction - that's the whole point. assert produce_request.API_VERSION is None captured = {} diff --git a/test/producer/test_transaction_manager_mock_broker.py b/test/producer/test_transaction_manager_mock_broker.py index 258c06f22..0d43de5a8 100644 --- a/test/producer/test_transaction_manager_mock_broker.py +++ b/test/producer/test_transaction_manager_mock_broker.py @@ -413,7 +413,7 @@ def test_epoch_bump_negotiates_to_v3_or_higher(self, broker, client, expected_version): """KIP-360 epoch-bump path sets ``min_version=3``; older brokers (<2.5) lack v3 of InitProducerId and the request fails before reach- - ing the wire — covered separately in + ing the wire - covered separately in ``TestBumpProducerIdAndEpoch._supports_epoch_bump`` in test_transaction_manager.py.""" from kafka.producer.transaction_manager import InitProducerIdHandler @@ -1139,7 +1139,7 @@ class TestKip447ConsumerGroupMetadata: The TxnOffsetCommit request uses the modern style (max_version=3 cap + per-connection api_versions negotiation), so we assert the v3 fields - are *populated* on the request object — the wire encoding drops them + are *populated* on the request object - the wire encoding drops them automatically when the negotiated version is v0-v2. """ @@ -1188,7 +1188,7 @@ def test_request_negotiates_down_to_v2_against_old_broker(self, broker, client): def test_send_offsets_to_transaction_accepts_bare_string(self, broker, client): """Back-compat: legacy callers pass a group_id str; wrap into metadata with no-generation defaults so broker treats it as a non-fenced - commit (which is the v0–v2 behavior on older brokers anyway).""" + commit (which is the v0-v2 behavior on older brokers anyway).""" tm = _make_manager(client) tm._current_state = TransactionState.IN_TRANSACTION tm._consumer_group_coordinator = 0 @@ -1320,7 +1320,7 @@ def test_retry_does_not_reorder_against_later_batches(self): # in-flight window contains multiple batches; then return NotLeader # (transient retryable). All subsequent calls: success. # - # The hold is what surfaces the bug — without partition muting, the + # The hold is what surfaces the bug - without partition muting, the # sender drains additional batches while the first is still in flight, # and the first batch's retry (reenqueued via appendleft) is # sequenced *after* them. @@ -1397,7 +1397,7 @@ def produce_response(api_key, api_version, correlation_id, request_bytes): # The first ProduceRequest had base_sequence 0 and was rejected with # NotLeader. With partition muting (the fix), the second # ProduceRequest the broker sees must be the *retry* of that batch - # — same base_sequence. Without muting, a later batch with a higher + # - same base_sequence. Without muting, a later batch with a higher # base_sequence would have drained while the first was in flight, # arriving here ahead of the retry. assert len(received_sequences) >= 2, ( @@ -1407,5 +1407,5 @@ def produce_response(api_key, api_version, correlation_id, request_bytes): % received_sequences) assert received_sequences[1] == 0, ( 'second ProduceRequest must be the retry of base_sequence 0; ' - 'got %r — partition was not muted, later batch drained ahead of ' + 'got %r - partition was not muted, later batch drained ahead of ' 'retry' % received_sequences) diff --git a/test/protocol/consumer/test_group.py b/test/protocol/consumer/test_group.py index 258320832..e5de266e2 100644 --- a/test/protocol/consumer/test_group.py +++ b/test/protocol/consumer/test_group.py @@ -402,7 +402,7 @@ def test_joingroup_request_with_subscription_object_metadata(): protocols=[('consumer', sub)], ) - # This should work — the Bytes codec should call sub.encode() automatically + # This should work - the Bytes codec should call sub.encode() automatically encoded = req.encode() # Now create the same request but with pre-encoded bytes diff --git a/test/protocol/test_broker_version_data.py b/test/protocol/test_broker_version_data.py index afcb49601..dcbf2ab57 100644 --- a/test/protocol/test_broker_version_data.py +++ b/test/protocol/test_broker_version_data.py @@ -1,4 +1,4 @@ -"""Tests for BrokerVersionData — version resolution, construction, and edge cases.""" +"""Tests for BrokerVersionData - version resolution, construction, and edge cases.""" import pytest diff --git a/test/protocol/test_stubs.py b/test/protocol/test_stubs.py index f3c8e83a3..c3f97a5c5 100644 --- a/test/protocol/test_stubs.py +++ b/test/protocol/test_stubs.py @@ -134,7 +134,7 @@ def test_stubs_up_to_date(self): for mod_file, (mod, exports) in modules.items(): pyi_path = mod_file.replace('.py', '.pyi') if not os.path.exists(pyi_path): - pytest.skip('Stubs not generated yet — run: python -m kafka.protocol.generate_stubs') + pytest.skip('Stubs not generated yet - run: python -m kafka.protocol.generate_stubs') with open(pyi_path, 'r') as f: existing = f.read() # Stubs are version-dependent; skip freshness check if generated on a different Python