diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 7d22346d0..c175e142c 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -75,43 +75,42 @@ def consumer_thread(i): try: timeout = time.time() + 15 while True: - for c in range(num_consumers): - - # Verify all consumers have been created - if c not in consumers: - logging.info('%s not in consumers list yet...', c) - break - - # Verify all consumers have an assignment - elif not consumers[c].assignment(): - logging.info('Consumer %s does not have assignment yet...', c) - break + assert time.time() < timeout, "timeout waiting for assignments" + # Verify all consumers have been created + missing_consumers = set(consumers.keys()) - set(range(num_consumers)) + if missing_consumers: + logging.info('Waiting on consumer threads: %s', missing_consumers) + time.sleep(1) + continue + + unassigned_consumers = {c for c, consumer in six.iteritems(consumers) if not consumer.assignment()} + if unassigned_consumers: + logging.info('Waiting for consumer assignments: %s', unassigned_consumers) + time.sleep(1) + continue # If all consumers exist and have an assignment + logging.info('All consumers have assignment... checking for stable group') + # Verify all consumers are in the same generation + # then log state and break while loop + generations = set([consumer._coordinator._generation.generation_id + for consumer in six.itervalues(consumers)]) + + # New generation assignment is not complete until + # coordinator.rejoining = False + rejoining = set([c for c, consumer in six.iteritems(consumers) if consumer._coordinator.rejoining]) + + if not rejoining and len(generations) == 1: + for c, consumer in six.iteritems(consumers): + logging.info("[%s] %s %s: %s", c, + consumer._coordinator._generation.generation_id, + consumer._coordinator._generation.member_id, + consumer.assignment()) + break else: - - logging.info('All consumers have assignment... checking for stable group') - # Verify all consumers are in the same generation - # then log state and break while loop - generations = set([consumer._coordinator._generation.generation_id - for consumer in list(consumers.values())]) - - # New generation assignment is not complete until - # coordinator.rejoining = False - rejoining = set([c for c, consumer in list(consumers.items()) if consumer._coordinator.rejoining]) - - if not rejoining and len(generations) == 1: - for c, consumer in list(consumers.items()): - logging.info("[%s] %s %s: %s", c, - consumer._coordinator._generation.generation_id, - consumer._coordinator._generation.member_id, - consumer.assignment()) - break - else: - logging.info('Rejoining: %s, generations: %s', rejoining, generations) - time.sleep(1) - assert time.time() < timeout, "timeout waiting for assignments" - time.sleep(1) + logging.info('Rejoining: %s, generations: %s', rejoining, generations) + time.sleep(1) + continue logging.info('Group stabilized; verifying assignment') group_assignment = set()