Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,11 +636,14 @@ def poll(self, timeout_ms=None, future=None):
Returns:
list: responses received (can be empty)
"""
if timeout_ms is None:
timeout_ms = self.config['request_timeout_ms']
elif not isinstance(timeout_ms, (int, float)):
if not isinstance(timeout_ms, (int, float, type(None))):
raise TypeError('Invalid type for timeout: %s' % type(timeout_ms))

begin = time.time()
if timeout_ms is not None:
timeout_at = begin + (timeout_ms / 1000)
else:
timeout_at = begin + (self.config['request_timeout_ms'] / 1000)
# Loop for futures, break after first loop if None
responses = []
while True:
Expand All @@ -665,11 +668,12 @@ def poll(self, timeout_ms=None, future=None):
if future is not None and future.is_done:
timeout = 0
else:
user_timeout_ms = 1000 * max(0, timeout_at - time.time())
idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms()
request_timeout_ms = self._next_ifr_request_timeout_ms()
log.debug("Timeouts: user %f, metadata %f, idle connection %f, request %f", timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms)
log.debug("Timeouts: user %f, metadata %f, idle connection %f, request %f", user_timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms)
timeout = min(
timeout_ms,
user_timeout_ms,
metadata_timeout_ms,
idle_connection_timeout_ms,
request_timeout_ms)
Expand All @@ -683,7 +687,11 @@ def poll(self, timeout_ms=None, future=None):

# If all we had was a timeout (future is None) - only do one poll
# If we do have a future, we keep looping until it is done
if future is None or future.is_done:
if future is None:
break
elif future.is_done:
break
elif timeout_ms is not None and time.time() >= timeout_at:
break

return responses
Expand Down
19 changes: 6 additions & 13 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from kafka.record import MemoryRecords
from kafka.serializer import Deserializer
from kafka.structs import TopicPartition, OffsetAndMetadata, OffsetAndTimestamp
from kafka.util import timeout_ms_fn

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -266,19 +267,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=None):
if not timestamps:
return {}

elapsed = 0.0 # noqa: F841
begin = time.time()
def inner_timeout_ms(fallback=None):
if timeout_ms is None:
return fallback
elapsed = (time.time() - begin) * 1000
if elapsed >= timeout_ms:
raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator')
ret = max(0, timeout_ms - elapsed)
if fallback is not None:
return min(ret, fallback)
return ret

inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout attempting to find coordinator')
timestamps = copy.copy(timestamps)
while True:
if not timestamps:
Expand All @@ -287,6 +276,10 @@ def inner_timeout_ms(fallback=None):
future = self._send_list_offsets_requests(timestamps)
self._client.poll(future=future, timeout_ms=inner_timeout_ms())

# Timeout w/o future completion
if not future.is_done:
break

if future.succeeded():
return future.value
if not future.retriable():
Expand Down
6 changes: 4 additions & 2 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,8 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
dict: Map of topic to list of records (may be empty).
"""
begin = time.time()
self._coordinator.poll(timeout_ms=timeout_ms)
if not self._coordinator.poll(timeout_ms=timeout_ms):
return {}

# Fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
Expand Down Expand Up @@ -1162,7 +1163,8 @@ def inner_poll_ms():

while time.time() < self._consumer_timeout:

self._coordinator.poll(timeout_ms=inner_poll_ms())
if not self._coordinator.poll(timeout_ms=inner_poll_ms()):
continue

# Fetch offsets for any subscribed partitions that we arent tracking yet
if not self._subscription.has_all_fetch_positions():
Expand Down
39 changes: 12 additions & 27 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.find_coordinator import FindCoordinatorRequest
from kafka.protocol.group import HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID
from kafka.util import timeout_ms_fn

log = logging.getLogger('kafka.coordinator')

Expand Down Expand Up @@ -243,19 +244,7 @@ def ensure_coordinator_ready(self, timeout_ms=None):

Raises: KafkaTimeoutError if timeout_ms is not None
"""
elapsed = 0.0 # noqa: F841
begin = time.time()
def inner_timeout_ms(fallback=None):
if timeout_ms is None:
return fallback
elapsed = (time.time() - begin) * 1000
if elapsed >= timeout_ms:
raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator')
ret = max(0, timeout_ms - elapsed)
if fallback is not None:
return min(ret, fallback)
return ret

inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout attempting to find group coordinator')
with self._client._lock, self._lock:
while self.coordinator_unknown():

Expand All @@ -271,12 +260,17 @@ def inner_timeout_ms(fallback=None):
future = self.lookup_coordinator()
self._client.poll(future=future, timeout_ms=inner_timeout_ms())

if not future.is_done:
raise Errors.KafkaTimeoutError()

if future.failed():
if future.retriable():
if getattr(future.exception, 'invalid_metadata', False):
log.debug('Requesting metadata for group coordinator request: %s', future.exception)
metadata_update = self._client.cluster.request_update()
self._client.poll(future=metadata_update, timeout_ms=inner_timeout_ms())
if not metadata_update.is_done:
raise Errors.KafkaTimeoutError()
else:
time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000)
else:
Expand Down Expand Up @@ -349,7 +343,6 @@ def _handle_join_success(self, member_assignment_bytes):
log.info("Successfully joined group %s with generation %s",
self.group_id, self._generation.generation_id)
self.state = MemberState.STABLE
self.rejoin_needed = False
if self._heartbeat_thread:
self._heartbeat_thread.enable()

Expand All @@ -366,23 +359,11 @@ def ensure_active_group(self, timeout_ms=None):

Raises: KafkaTimeoutError if timeout_ms is not None
"""
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout attempting to join consumer group')
with self._client._lock, self._lock:
if self._heartbeat_thread is None:
self._start_heartbeat_thread()

elapsed = 0.0 # noqa: F841
begin = time.time()
def inner_timeout_ms(fallback=None):
if timeout_ms is None:
return fallback
elapsed = (time.time() - begin) * 1000
if elapsed >= timeout_ms:
raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator')
ret = max(0, timeout_ms - elapsed)
if fallback is not None:
return min(ret, fallback)
return ret

while self.need_rejoin() or self._rejoin_incomplete():
self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())

Expand Down Expand Up @@ -440,13 +421,17 @@ def inner_timeout_ms(fallback=None):

self._client.poll(future=future, timeout_ms=inner_timeout_ms())

if not future.is_done:
raise Errors.KafkaTimeoutError()

if future.succeeded():
self._on_join_complete(self._generation.generation_id,
self._generation.member_id,
self._generation.protocol,
future.value)
self.join_future = None
self.rejoining = False
self.rejoin_needed = False

else:
self.join_future = None
Expand Down
18 changes: 5 additions & 13 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from kafka.structs import OffsetAndMetadata, TopicPartition
from kafka.util import WeakMethod
from kafka.util import timeout_ms_fn, WeakMethod


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -267,18 +267,9 @@ def poll(self, timeout_ms=None):
periodic offset commits if they are enabled.
"""
if self.group_id is None:
return

elapsed = 0.0 # noqa: F841
begin = time.time()
def inner_timeout_ms():
if timeout_ms is None:
return None
elapsed = (time.time() - begin) * 1000
if elapsed >= timeout_ms:
raise Errors.KafkaTimeoutError()
return max(0, timeout_ms - elapsed)
return True

inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout in coordinator.poll')
try:
self._invoke_completed_offset_commit_callbacks()
self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())
Expand All @@ -305,9 +296,10 @@ def inner_timeout_ms():
self.poll_heartbeat()

self._maybe_auto_commit_offsets_async()
return True

except Errors.KafkaTimeoutError:
return
return False

def time_to_next_poll(self):
"""Return seconds (float) remaining until :meth:`.poll` should be called again"""
Expand Down
18 changes: 18 additions & 0 deletions kafka/util.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import absolute_import

import binascii
import time
import weakref

from kafka.errors import KafkaTimeoutError
from kafka.vendor import six


Expand All @@ -22,6 +24,22 @@ def crc32(data):
from binascii import crc32


def timeout_ms_fn(timeout_ms, error_message):
    """Return a callable that tracks time remaining against an overall deadline.

    Arguments:
        timeout_ms (int/float or None): total time budget in milliseconds,
            measured from the moment this factory is called. If None, no
            deadline is enforced and the returned callable simply echoes
            its ``fallback`` argument.
        error_message (str): message used for the KafkaTimeoutError raised
            once the budget is exhausted.

    Returns:
        callable: ``inner_timeout_ms(fallback=None)`` -> float or None.
            Returns the milliseconds remaining before the deadline, capped
            at ``fallback`` when ``fallback`` is not None. Raises
            KafkaTimeoutError(error_message) once the deadline has passed.
    """
    begin = time.time()

    def inner_timeout_ms(fallback=None):
        if timeout_ms is None:
            # No deadline configured: defer entirely to the caller's fallback
            return fallback
        elapsed = (time.time() - begin) * 1000
        if elapsed >= timeout_ms:
            raise KafkaTimeoutError(error_message)
        ret = max(0, timeout_ms - elapsed)
        if fallback is not None:
            # Never report more time remaining than the caller's own cap
            return min(ret, fallback)
        return ret
    return inner_timeout_ms


class WeakMethod(object):
"""
Callable that weakly references a method and the object it is bound to. It
Expand Down
4 changes: 3 additions & 1 deletion test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401

from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
from kafka.errors import InvalidReplicationFactorError
from kafka.errors import InvalidReplicationFactorError, KafkaTimeoutError
from kafka.protocol.admin import CreateTopicsRequest
from kafka.protocol.metadata import MetadataRequest
from test.testutil import env_kafka_version, random_string
Expand Down Expand Up @@ -555,6 +555,8 @@ def _failure(error):
future.error_on_callbacks = True
future.add_errback(_failure)
self._client.poll(future=future, timeout_ms=timeout)
if not future.is_done:
raise KafkaTimeoutError()
return future.value
except Exception as exc:
time.sleep(1)
Expand Down
12 changes: 7 additions & 5 deletions test/test_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ def test_poll(mocker):
ifr_request_timeout = mocker.patch.object(KafkaClient, '_next_ifr_request_timeout_ms')
_poll = mocker.patch.object(KafkaClient, '_poll')
cli = KafkaClient(api_version=(0, 9))
now = time.time()
t = mocker.patch('time.time')
t.return_value = now

# metadata timeout wins
ifr_request_timeout.return_value = float('inf')
Expand Down Expand Up @@ -346,17 +349,16 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
t.return_value = now

# first poll attempts connection
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(12345.678)
client.poll()
client._poll.assert_called()
client._init_connect.assert_called_once_with('foobar')

# poll while connecting should not attempt a new connection
client._connecting.add('foobar')
client._can_connect.reset_mock()
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(12345.678)
client.poll()
client._poll.assert_called()
assert not client._can_connect.called

assert not client._metadata_refresh_in_progress


Expand Down
11 changes: 6 additions & 5 deletions test/test_consumer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def consumer_thread(i):
bootstrap_servers=connect_str,
group_id=group_id,
client_id="consumer_thread-%s" % i,
api_version_auto_timeout_ms=30000,
api_version_auto_timeout_ms=5000,
heartbeat_interval_ms=500)
while not stop[i].is_set():
for tp, records in six.itervalues(consumers[i].poll(timeout_ms=200)):
Expand All @@ -73,16 +73,18 @@ def consumer_thread(i):
threads[i] = t

try:
timeout = time.time() + 35
timeout = time.time() + 15
while True:
for c in range(num_consumers):

# Verify all consumers have been created
if c not in consumers:
logging.info('%s not in consumers list yet...', c)
break

# Verify all consumers have an assignment
elif not consumers[c].assignment():
logging.info('Consumer %s does not have assignment yet...', c)
break

# If all consumers exist and have an assignment
Expand All @@ -96,8 +98,7 @@ def consumer_thread(i):

# New generation assignment is not complete until
# coordinator.rejoining = False
rejoining = any([consumer._coordinator.rejoining
for consumer in list(consumers.values())])
rejoining = set([c for c, consumer in list(consumers.items()) if consumer._coordinator.rejoining])

if not rejoining and len(generations) == 1:
for c, consumer in list(consumers.items()):
Expand All @@ -110,6 +111,7 @@ def consumer_thread(i):
logging.info('Rejoining: %s, generations: %s', rejoining, generations)
time.sleep(1)
assert time.time() < timeout, "timeout waiting for assignments"
time.sleep(1)

logging.info('Group stabilized; verifying assignment')
group_assignment = set()
Expand Down Expand Up @@ -157,7 +159,6 @@ def test_heartbeat_thread(kafka_broker, topic):
consumer = KafkaConsumer(topic,
bootstrap_servers=get_connect_str(kafka_broker),
group_id=group_id,
api_version_auto_timeout_ms=30000,
heartbeat_interval_ms=500)

# poll until we have joined group / have assignment
Expand Down
Loading