Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 12 additions & 18 deletions kafka/partitioner/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,18 @@ class DefaultPartitioner:
If key is None, selects partition randomly from available,
or from all partitions if none are currently available
"""
@classmethod
def __call__(cls, key, all_partitions, available):
    """Return the partition that a record with ``key`` should go to.

    A non-None key is hashed with murmur2 and mapped onto
    ``all_partitions`` by index. A None key gets a random partition,
    taken from ``available`` when that list is non-empty and from
    ``all_partitions`` otherwise.

    :param key: partitioning key
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """
    if key is not None:
        # Mask to 31 bits so the value used for indexing is non-negative.
        hashed = murmur2(key) & 0x7fffffff
        return all_partitions[hashed % len(all_partitions)]

    # Null key: an empty ``available`` falls back to the full list.
    return random.choice(available or all_partitions)
def partition(self, topic, serialized_key, cluster):
    """Choose the partition for a record on ``topic``.

    Keyed records are hashed with murmur2 and mapped by index onto the
    sorted list of all partition IDs for the topic. Null-key records go
    to a random available partition, or to a random partition from the
    full set when none is currently reported as available.

    Arguments:
        topic (str): topic to partition on.
        serialized_key (bytes or None): serialized partitioning key.
        cluster: metadata object providing ``topics()``,
            ``partitions_for_topic(topic)`` and
            ``available_partitions_for_topic(topic)``.

    Raises:
        ValueError: if topic is not in ``cluster.topics()``.

    Returns:
        int: chosen partition ID.
    """
    if topic not in cluster.topics():
        raise ValueError("Topic %s not found in ClusterMetadata" % (topic,))
    all_partitions = sorted(cluster.partitions_for_topic(topic))
    if serialized_key is not None:
        # Mask to 31 bits so the value used for indexing is non-negative.
        idx = murmur2(serialized_key) & 0x7fffffff
        return all_partitions[idx % len(all_partitions)]
    # Only null-key records need the availability view, so look it up
    # here rather than on every call. A None result is treated as
    # "nothing available" instead of raising TypeError from list(None).
    available = list(cluster.available_partitions_for_topic(topic) or ())
    return random.choice(available or all_partitions)


# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
Expand Down
133 changes: 59 additions & 74 deletions kafka/partitioner/sticky.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,106 +16,91 @@
"""

import random
import threading

from kafka.partitioner.default import murmur2
from kafka.partitioner.default import DefaultPartitioner


class StickyPartitioner:
class StickyPartitioner(DefaultPartitioner):
"""Partitioner that sticks null-key records to one partition per
topic until ``on_new_batch`` rotates it.

Thread-safety: the underlying ``_sticky`` dict is mutated only by
individually-atomic Python ops (get / setitem / contains). Two
concurrent partitioners may pick different sticky partitions; the
last write wins and both choices are valid, so no lock is needed.
Thread-safety: ``_sticky`` mutations are protected by ``_lock`` so
concurrent ``send()`` callers can't observe a torn read-modify-write.
"""

def __init__(self):
self._sticky = {} # topic -> partition_id
# Java's accumulator distinguishes "first batch created on a
# partition" (no rotation) from "existing batch filled, new one
# being created" (rotate). Our accumulator collapses both into
# ``new_batch_created=True``, so the partitioner absorbs the
# *first* on_new_batch event per sticky and only rotates on the
# subsequent one. Without this, we'd rotate on every record
# whose partition has no existing batch, defeating stickiness.
self._sticky_seen_batch = set() # topics whose current sticky has had >=1 batch event
self._lock = threading.Lock()

def partition(self, topic, key, all_partitions, available):
def partition(self, topic, key, cluster):
"""Choose a partition for the next record.

Arguments:
topic (str): topic to partition on.
key (bytes or None): partitioning key.
all_partitions (list[int]): every partition ID for the topic,
sorted ascending.
available (list[int]): partitions whose leader is currently
known (may be empty when metadata is stale).
cluster (ClusterMetadata): metadata for cluster; provides
all and available partitions for topic.

Raises:
ValueError: if topic is not in ClusterMetadata

Returns:
int: chosen partition ID.
"""
if topic not in cluster.topics():
raise ValueError("Topic %s not found in ClusterMetadata" % (topic,))
if key is not None:
idx = murmur2(key)
idx &= 0x7fffffff
idx %= len(all_partitions)
return all_partitions[idx]
return super().partition(topic, key, cluster)
# Null key: reuse the sticky partition if still valid.
partition = self._sticky.get(topic)
if partition is not None:
if available:
if partition in available:
with self._lock:
partition = self._sticky.get(topic)
if partition is not None:
all_partitions = sorted(cluster.partitions_for_topic(topic))
available = list(cluster.available_partitions_for_topic(topic))
if available:
if partition in available:
return partition
elif partition in all_partitions:
return partition
elif partition in all_partitions:
return partition
# Stale (leader unavailable, topic shrunk); fall through to re-pick.
return self._pick_sticky(topic, all_partitions, available)
# Stale (leader unavailable, topic shrunk); fall through to re-pick.
return self._pick_sticky_locked(topic, cluster)

def on_new_batch(self, topic, all_partitions, prev_partition):
"""Hook called by ``KafkaProducer`` when the accumulator just
opened a new batch for ``topic`` on ``prev_partition``.
def on_new_batch(self, topic, cluster, prev_partition):
"""Hook called by ``KafkaProducer`` on the abort-for-new-batch
retry path: rotate the sticky for ``topic`` so the next
null-key record lands on a different partition.

The *first* event per sticky is absorbed silently: it
corresponds to the first batch ever being created on the
partition we just picked, which is expected - we want
subsequent records to keep landing there. The *second* event
means the previous batch filled up and a new one was opened;
that's the signal to rotate to a different partition so the
next records build up a fresh dense batch elsewhere.
Stale events (where another thread already rotated us off
``prev_partition``) are no-ops.
"""
if self._sticky.get(topic) != prev_partition:
# Someone else (or a key-routed send) already moved us off
# this partition; don't override their choice.
return
if topic not in self._sticky_seen_batch:
self._sticky_seen_batch.add(topic)
return
# Existing batch filled; rotate.
self._sticky_seen_batch.discard(topic)
self._pick_sticky(topic, all_partitions, None,
avoid=prev_partition)
with self._lock:
if self._sticky.get(topic) != prev_partition:
# Another caller already rotated us; don't override.
return
self._pick_sticky_locked(topic, cluster, avoid=prev_partition)

def _pick_sticky(self, topic, all_partitions, available, avoid=None):
pool = available if available else all_partitions
candidates = [p for p in pool if p != avoid] if avoid is not None else pool
if not candidates:
# Single-partition topic, or only the avoid-partition is
# available - no rotation possible.
candidates = pool
partition = random.choice(candidates)
def _pick_sticky_locked(self, topic, cluster, avoid=None):
"""Pick a new sticky partition for ``topic``. Must be called with
``self._lock`` held. Returns None when the topic is no longer in
cluster metadata (caller is expected to no-op in that case)."""
all_partitions = cluster.partitions_for_topic(topic)
if not all_partitions:
return None
all_partitions = sorted(all_partitions)
available = list(cluster.available_partitions_for_topic(topic) or ())
if available:
if len(available) == 1:
partition = available[0]
else:
# >= 2 available: pick uniformly, avoiding ``avoid`` if set.
candidates = [p for p in available if p != avoid] if avoid is not None else available
if not candidates:
candidates = available
partition = random.choice(candidates)
else:
# No partitions are currently available - pick from the full
# set without enforcing ``!= avoid``
partition = random.choice(all_partitions)
self._sticky[topic] = partition
# Reset the seen-batch flag; the new sticky has had no batches yet.
self._sticky_seen_batch.discard(topic)
return partition

# Compatibility shim: legacy code paths that treat partitioners as
# bare callables (key, all_partitions, available) still work, though
# they lose the per-topic stickiness.
def __call__(self, key, all_partitions, available):
    """Legacy callable interface taking (key, all_partitions, available).

    A non-None key is hashed with murmur2 and mapped onto
    ``all_partitions`` by index. A None key gets a random partition from
    ``available`` when it is non-empty, otherwise from
    ``all_partitions``. No per-topic sticky state is read or written.
    """
    if key is None:
        return random.choice(available or all_partitions)
    # Mask to 31 bits so the value used for indexing is non-negative.
    hashed = murmur2(key) & 0x7fffffff
    return all_partitions[hashed % len(all_partitions)]
71 changes: 36 additions & 35 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ class KafkaProducer:
would have the effect of reducing the number of requests sent but
would add up to 5ms of latency to records sent in the absence of
load. Default: 0.
partitioner (callable): Callable used to determine which partition
partitioner (Partitioner): Used to determine which partition
each message is assigned to. Called (after key serialization):
partitioner(key_bytes, all_partitions, available_partitions).
partitioner.partition(topic, key_bytes, cluster_metadata).
The default partitioner implementation hashes each non-None key
using the same murmur2 algorithm as the java client so that
messages with the same key are assigned to the same partition.
Expand Down Expand Up @@ -391,7 +391,7 @@ class KafkaProducer:
'retries': float('inf'),
'batch_size': 16384,
'linger_ms': 0,
'partitioner': StickyPartitioner(),
'partitioner': DefaultPartitioner(),
'connections_max_idle_ms': 9 * 60 * 1000,
'max_block_ms': 60000,
'max_request_size': 1048576,
Expand Down Expand Up @@ -857,6 +857,8 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1,
).failure(e)

# Track whether the caller passed an explicit partition, because sticky rotation does not apply to explicitly partitioned records
explicit_partition = partition is not None
partition = self._partition(topic, partition, key, value, key_bytes, value_bytes)
assert partition is not None, f'Partitioner did not assign a partition for topic {topic}!'

Expand All @@ -874,24 +876,34 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
if self._transaction_manager and self._transaction_manager.is_transactional():
self._transaction_manager.maybe_add_partition_to_transaction(tp)

result = self._accumulator.append(tp, timestamp_ms, key_bytes, value_bytes, headers)
future, batch_is_full, new_batch_created = result
# KIP-480: when sticky-aware partitioning is in play (no explicit
# partition, no key), try once with abort_on_new_batch=True. If the
# accumulator would have to allocate a fresh batch for this partition,
# rotate the sticky partition first and re-pick. The record that
# *triggers* the new batch then lands on the rotated partition, not
# the next one.
sticky_eligible = not explicit_partition and key_bytes is None
result = self._accumulator.append(tp, timestamp_ms, key_bytes, value_bytes, headers,
abort_on_new_batch=sticky_eligible)
future, batch_is_full, new_batch_created, abort_for_new_batch = result
if abort_for_new_batch:
prev_partition = partition
on_new_batch = getattr(self.config['partitioner'], 'on_new_batch', None)
if on_new_batch is not None:
on_new_batch(topic, self._metadata, prev_partition)
# Re-pick - sticky cache may now point at a different partition.
partition = self._partition(topic, None, key, value, key_bytes, value_bytes)
tp = TopicPartition(topic, partition)
if self._transaction_manager and self._transaction_manager.is_transactional():
self._transaction_manager.maybe_add_partition_to_transaction(tp)
result = self._accumulator.append(tp, timestamp_ms, key_bytes, value_bytes, headers,
abort_on_new_batch=False)
future, batch_is_full, new_batch_created, _ = result

if batch_is_full or new_batch_created:
log.debug("%s: Waking up the sender since %s is either full or"
" getting a new batch", str(self), tp)
self._sender.wakeup()
# KIP-480: notify a sticky-aware partitioner that this null-key
# record opened a new batch on `partition`, so the next null-key
# send for `topic` rotates to a different partition. Keyed
# records hash deterministically and don't participate in sticky
# rotation, so skip the hook for them.
if new_batch_created and key_bytes is None:
partitioner = self.config['partitioner']
on_new_batch = getattr(partitioner, 'on_new_batch', None)
if on_new_batch is not None:
all_partitions = self._metadata.partitions_for_topic(topic)
if all_partitions is not None:
on_new_batch(topic, sorted(all_partitions), partition)
return future

def flush(self, timeout=None):
Expand Down Expand Up @@ -984,28 +996,17 @@ def _serialize(self, f, topic, data):

def _partition(self, topic, partition, key, value,
serialized_key, serialized_value):
all_partitions = self._metadata.partitions_for_topic(topic)
available = self._metadata.available_partitions_for_topic(topic)
if all_partitions is None or available is None:
if topic not in self._metadata.topics():
return None
if partition is not None:
assert partition >= 0
assert partition in all_partitions, 'Unrecognized partition'
all_partitions = self._metadata.partitions_for_topic(topic)
assert all_partitions is not None and partition in all_partitions, (
'Unrecognized partition %s for topic %s' % (partition, topic))
return partition

# Prefer the topic-aware partition() method (KIP-480 sticky
# partitioner needs the topic for its per-topic stickiness).
# Fall back to the legacy callable interface so user-supplied
# custom partitioners written against pre-KIP-480 kafka-python
# continue to work unchanged.
partitioner = self.config['partitioner']
if hasattr(partitioner, 'partition'):
return partitioner.partition(topic, serialized_key,
sorted(all_partitions),
list(available))
return partitioner(serialized_key,
sorted(all_partitions),
list(available))
return self.config['partitioner'].partition(
topic, serialized_key, self._metadata)

def metrics(self, raw=False):
"""Get metrics on producer performance.
Expand All @@ -1032,4 +1033,4 @@ def metrics(self, raw=False):
return metrics

def __str__(self):
return "<KafkaProducer client_id=%s transactional_id=%s>" % (self.config['client_id'], self.config['transactional_id'])
return "<KafkaProducer client_id=%s transactional_id=%s>" % (self.config.get('client_id', None), self.config.get('transactional_id', None))
21 changes: 16 additions & 5 deletions kafka/producer/record_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ def _tp_lock(self, tp):
self._tp_locks[tp] = threading.Lock()
return self._tp_locks[tp]

def append(self, tp, timestamp_ms, key, value, headers, now=None):
def append(self, tp, timestamp_ms, key, value, headers, now=None,
abort_on_new_batch=False):
"""Add a record to the accumulator, return the append result.

The append result will contain the future metadata, and flag for
Expand All @@ -123,9 +124,14 @@ def append(self, tp, timestamp_ms, key, value, headers, now=None):
key (bytes): The key for the record
value (bytes): The value for the record
headers (List[Tuple[str, bytes]]): The header fields for the record
abort_on_new_batch (bool): KIP-480. When True, return early with
``abort_for_new_batch=True`` instead of allocating a new
batch when no in-progress batch has room. Caller is expected
to consult the partitioner's ``on_new_batch`` hook, re-pick
the partition, and retry with ``abort_on_new_batch=False``.

Returns:
tuple: (future, batch_is_full, new_batch_created)
tuple: (future, batch_is_full, new_batch_created, abort_for_new_batch)
"""
assert isinstance(tp, TopicPartition), 'not TopicPartition'
assert not self._closed, 'RecordAccumulator is closed'
Expand All @@ -142,7 +148,12 @@ def append(self, tp, timestamp_ms, key, value, headers, now=None):
future = last.try_append(timestamp_ms, key, value, headers, now=now)
if future is not None:
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
return future, batch_is_full, False, False

if abort_on_new_batch:
# KIP-480: don't allocate a new batch yet. Caller will
# rotate the sticky partition and retry.
return None, False, False, True

with self._tp_lock(tp):
# Need to check if producer is closed again after grabbing the
Expand All @@ -156,7 +167,7 @@ def append(self, tp, timestamp_ms, key, value, headers, now=None):
# Somebody else found us a batch, return the one we
# waited for! Hopefully this doesn't happen often...
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
return future, batch_is_full, False, False

if self._transaction_manager and self.config['message_version'] < 2:
raise Errors.UnsupportedVersionError("Attempting to use idempotence with a broker which"
Expand All @@ -176,7 +187,7 @@ def append(self, tp, timestamp_ms, key, value, headers, now=None):
dq.append(batch)
self._incomplete.add(batch)
batch_is_full = len(dq) > 1 or batch.records.is_full()
return future, batch_is_full, True
return future, batch_is_full, True, False
finally:
self._appends_in_progress.decrement()

Expand Down
Loading
Loading