diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 29c2a7182..508e35a0b 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -56,12 +56,13 @@ class Fetcher(six.Iterator):
         'max_partition_fetch_bytes': 1048576,
         'max_poll_records': sys.maxsize,
         'check_crcs': True,
+        'metrics': None,
         'metric_group_prefix': 'consumer',
         'retry_backoff_ms': 100,
         'enable_incremental_fetch_sessions': True,
     }
 
-    def __init__(self, client, subscriptions, metrics, **configs):
+    def __init__(self, client, subscriptions, **configs):
         """Initialize a Kafka Message Fetcher.
 
         Keyword Arguments:
@@ -111,7 +112,10 @@ def __init__(self, client, subscriptions, metrics, **configs):
         self._next_partition_records = None # Holds a single PartitionRecords until fully consumed
         self._iterator = None
         self._fetch_futures = collections.deque()
-        self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
+        if self.config['metrics']:
+            self._sensors = FetchManagerMetrics(self.config['metrics'], self.config['metric_group_prefix'])
+        else:
+            self._sensors = None
         self._isolation_level = READ_UNCOMMITTED
         self._session_handlers = {}
         self._nodes_with_pending_fetch_requests = set()
@@ -391,7 +395,7 @@ def _append(self, drained, part, max_records, update_offsets):
             # when each message is yielded). There may be edge cases where we re-fetch records
             # that we'll end up skipping, but for now we'll live with that.
             highwater = self._subscriptions.assignment[tp].highwater
-            if highwater is not None:
+            if highwater is not None and self._sensors:
                 self._sensors.records_fetch_lag.record(highwater - part.next_fetch_offset)
             if update_offsets or not part_records:
                 # TODO: save leader_epoch
@@ -705,7 +709,10 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
         partitions = set([TopicPartition(topic, partition_data[0])
                           for topic, partitions in response.topics
                           for partition_data in partitions])
-        metric_aggregator = FetchResponseMetricAggregator(self._sensors, partitions)
+        if self._sensors:
+            metric_aggregator = FetchResponseMetricAggregator(self._sensors, partitions)
+        else:
+            metric_aggregator = None
 
         for topic, partitions in response.topics:
             for partition_data in partitions:
@@ -719,7 +726,8 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
                 )
                 self._completed_fetches.append(completed_fetch)
 
-        self._sensors.fetch_latency.record((time.time() - send_time) * 1000)
+        if self._sensors:
+            self._sensors.fetch_latency.record((time.time() - send_time) * 1000)
         self._nodes_with_pending_fetch_requests.remove(node_id)
 
     def _handle_fetch_error(self, node_id, exception):
@@ -816,7 +824,7 @@ def _parse_fetched_data(self, completed_fetch):
                 raise error_type('Unexpected error while fetching data')
 
         finally:
-            if parsed_records is None:
+            if parsed_records is None and completed_fetch.metric_aggregator:
                 completed_fetch.metric_aggregator.record(tp, 0, 0)
 
             if error_type is not Errors.NoError:
@@ -873,7 +881,8 @@ def __bool__(self):
     def drain(self):
         if self.record_iterator is not None:
             self.record_iterator = None
-            self.metric_aggregator.record(self.topic_partition, self.bytes_read, self.records_read)
+            if self.metric_aggregator:
+                self.metric_aggregator.record(self.topic_partition, self.bytes_read, self.records_read)
             self.on_drain(self)
 
     def take(self, n=None):
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 6e6a88724..4a39dc135 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -234,6 +234,7 @@ class KafkaConsumer(six.Iterator):
         metric_reporters (list): A list of classes to use as metrics reporters.
             Implementing the AbstractMetricsReporter interface allows plugging
             in classes that will be notified of new metric creation. Default: []
+        metrics_enabled (bool): Whether to track metrics on this instance. Default: True.
         metrics_num_samples (int): The number of samples maintained to compute
             metrics. Default: 2
         metrics_sample_window_ms (int): The maximum age in milliseconds of
@@ -315,6 +316,7 @@ class KafkaConsumer(six.Iterator):
         'api_version_auto_timeout_ms': 2000,
         'connections_max_idle_ms': 9 * 60 * 1000,
         'metric_reporters': [],
+        'metrics_enabled': True,
         'metrics_num_samples': 2,
         'metrics_sample_window_ms': 30000,
         'metric_group_prefix': 'consumer',
@@ -358,13 +360,15 @@ def __init__(self, *topics, **configs):
                 "fetch_max_wait_ms ({})."
                 .format(connections_max_idle_ms, request_timeout_ms, fetch_max_wait_ms))
 
-        metrics_tags = {'client-id': self.config['client_id']}
-        metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
-                                     time_window_ms=self.config['metrics_sample_window_ms'],
-                                     tags=metrics_tags)
-        reporters = [reporter() for reporter in self.config['metric_reporters']]
-        self._metrics = Metrics(metric_config, reporters)
-        # TODO _metrics likely needs to be passed to KafkaClient, etc.
+        if self.config['metrics_enabled']:
+            metrics_tags = {'client-id': self.config['client_id']}
+            metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+                                         time_window_ms=self.config['metrics_sample_window_ms'],
+                                         tags=metrics_tags)
+            reporters = [reporter() for reporter in self.config['metric_reporters']]
+            self._metrics = Metrics(metric_config, reporters)
+        else:
+            self._metrics = None
 
         # api_version was previously a str. Accept old format for now
         if isinstance(self.config['api_version'], str):
@@ -402,9 +406,9 @@ def __init__(self, *topics, **configs):
 
         self._subscription = SubscriptionState(self.config['auto_offset_reset'])
         self._fetcher = Fetcher(
-            self._client, self._subscription, self._metrics, **self.config)
+            self._client, self._subscription, metrics=self._metrics, **self.config)
         self._coordinator = ConsumerCoordinator(
-            self._client, self._subscription, self._metrics,
+            self._client, self._subscription, metrics=self._metrics,
             assignors=self.config['partition_assignment_strategy'],
             **self.config)
         self._closed = False
@@ -485,7 +489,8 @@ def close(self, autocommit=True, timeout_ms=None):
         log.debug("Closing the KafkaConsumer.")
         self._closed = True
         self._coordinator.close(autocommit=autocommit, timeout_ms=timeout_ms)
-        self._metrics.close()
+        if self._metrics:
+            self._metrics.close()
         self._client.close()
         try:
             self.config['key_deserializer'].close()
@@ -989,6 +994,8 @@ def metrics(self, raw=False):
         This is an unstable interface. It may change in future releases
         without warning.
""" + if not self._metrics: + return if raw: return self._metrics.metrics.copy() diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 410e92fc9..0c238fde8 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -84,10 +84,11 @@ class BaseCoordinator(object): 'max_poll_interval_ms': 300000, 'retry_backoff_ms': 100, 'api_version': (0, 10, 1), + 'metrics': None, 'metric_group_prefix': '', } - def __init__(self, client, metrics, **configs): + def __init__(self, client, **configs): """ Keyword Arguments: group_id (str): name of the consumer group to join for dynamic @@ -130,8 +131,11 @@ def __init__(self, client, metrics, **configs): self.coordinator_id = None self._find_coordinator_future = None self._generation = Generation.NO_GENERATION - self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics, - self.config['metric_group_prefix']) + if self.config['metrics']: + self._sensors = GroupCoordinatorMetrics(self.heartbeat, self.config['metrics'], + self.config['metric_group_prefix']) + else: + self._sensors = None @abc.abstractmethod def protocol_type(self): @@ -531,7 +535,8 @@ def _handle_join_group_response(self, future, send_time, response): if error_type is Errors.NoError: log.debug("Received successful JoinGroup response for group %s: %s", self.group_id, response) - self.sensors.join_latency.record((time.time() - send_time) * 1000) + if self._sensors: + self._sensors.join_latency.record((time.time() - send_time) * 1000) with self._lock: if self.state is not MemberState.REBALANCING: # if the consumer was woken up before a rebalance completes, @@ -650,7 +655,8 @@ def _send_sync_group_request(self, request): def _handle_sync_group_response(self, future, send_time, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - self.sensors.sync_latency.record((time.time() - send_time) * 1000) + if self._sensors: + self._sensors.sync_latency.record((time.time() - send_time) * 1000) future.success(response.member_assignment) return @@ -856,7 +862,8 @@ def _send_heartbeat_request(self): return future def _handle_heartbeat_response(self, future, send_time, response): - self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000) + if self._sensors: + self._sensors.heartbeat_latency.record((time.time() - send_time) * 1000) error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: log.debug("Received successful heartbeat response for group %s", diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 773df38bd..4bc7ba9cb 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -39,10 +39,11 @@ class ConsumerCoordinator(BaseCoordinator): 'retry_backoff_ms': 100, 'api_version': (0, 10, 1), 'exclude_internal_topics': True, + 'metrics': None, 'metric_group_prefix': 'consumer' } - def __init__(self, client, subscription, metrics, **configs): + def __init__(self, client, subscription, **configs): """Initialize the coordination manager. Keyword Arguments: @@ -78,7 +79,7 @@ def __init__(self, client, subscription, metrics, **configs): True the only way to receive records from an internal topic is subscribing to it. Requires 0.10+. 
Default: True """ - super(ConsumerCoordinator, self).__init__(client, metrics, **configs) + super(ConsumerCoordinator, self).__init__(client, **configs) self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -120,8 +121,11 @@ def __init__(self, client, subscription, metrics, **configs): else: self.next_auto_commit_deadline = time.time() + self.auto_commit_interval - self.consumer_sensors = ConsumerCoordinatorMetrics( - metrics, self.config['metric_group_prefix'], self._subscription) + if self.config['metrics']: + self._consumer_sensors = ConsumerCoordinatorMetrics( + self.config['metrics'], self.config['metric_group_prefix'], self._subscription) + else: + self._consumer_sensors = None self._cluster.request_update() self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) @@ -686,7 +690,8 @@ def _send_offset_commit_request(self, offsets): def _handle_offset_commit_response(self, offsets, future, send_time, response): # TODO look at adding request_latency_ms to response (like java kafka) - self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000) + if self._consumer_sensors: + self._consumer_sensors.commit_latency.record((time.time() - send_time) * 1000) unauthorized_topics = set() for topic, partitions in response.topics: diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index df86e907e..f0eb37a8f 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -267,6 +267,7 @@ class KafkaProducer(object): metric_reporters (list): A list of classes to use as metrics reporters. Implementing the AbstractMetricsReporter interface allows plugging in classes that will be notified of new metric creation. Default: [] + metrics_enabled (bool): Whether to track metrics on this instance. Default True. metrics_num_samples (int): The number of samples maintained to compute metrics. 
             metrics. Default: 2
         metrics_sample_window_ms (int): The maximum age in milliseconds of
@@ -336,6 +337,7 @@ class KafkaProducer(object):
         'api_version': None,
         'api_version_auto_timeout_ms': 2000,
         'metric_reporters': [],
+        'metrics_enabled': True,
         'metrics_num_samples': 2,
         'metrics_sample_window_ms': 30000,
         'selector': selectors.DefaultSelector,
@@ -393,12 +395,15 @@ def __init__(self, **configs):
                         str(self.config['api_version']), deprecated)
 
         # Configure metrics
-        metrics_tags = {'client-id': self.config['client_id']}
-        metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
-                                     time_window_ms=self.config['metrics_sample_window_ms'],
-                                     tags=metrics_tags)
-        reporters = [reporter() for reporter in self.config['metric_reporters']]
-        self._metrics = Metrics(metric_config, reporters)
+        if self.config['metrics_enabled']:
+            metrics_tags = {'client-id': self.config['client_id']}
+            metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+                                         time_window_ms=self.config['metrics_sample_window_ms'],
+                                         tags=metrics_tags)
+            reporters = [reporter() for reporter in self.config['metric_reporters']]
+            self._metrics = Metrics(metric_config, reporters)
+        else:
+            self._metrics = None
 
         client = self.config['kafka_client'](
             metrics=self._metrics, metric_group_prefix='producer',
@@ -424,11 +429,12 @@ def __init__(self, **configs):
         self.config['compression_attrs'] = compression_attrs
 
         message_version = self._max_usable_produce_magic()
-        self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
+        self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
         self._metadata = client.cluster
         guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
         self._sender = Sender(client, self._metadata,
-                              self._accumulator, self._metrics,
+                              self._accumulator,
+                              metrics=self._metrics,
                               guarantee_message_order=guarantee_message_order,
                               **self.config)
         self._sender.daemon = True
@@ -524,7 +530,8 @@ def __getattr__(self, name):
                           timeout)
             self._sender.force_close()
 
-        self._metrics.close()
+        if self._metrics:
+            self._metrics.close()
         try:
             self.config['key_serializer'].close()
         except AttributeError:
@@ -773,6 +780,8 @@ def metrics(self, raw=False):
         This is an unstable interface. It may change in future releases
         without warning.
""" + if not self._metrics: + return if raw: return self._metrics.metrics.copy() diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 6e7fa60f7..ba823500d 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -162,8 +162,6 @@ class RecordAccumulator(object): 'linger_ms': 0, 'retry_backoff_ms': 100, 'message_version': 0, - 'metrics': None, - 'metric_group_prefix': 'producer-metrics', } def __init__(self, **configs): diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 0e2ea577e..20af28d07 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -29,11 +29,12 @@ class Sender(threading.Thread): 'acks': 1, 'retries': 0, 'request_timeout_ms': 30000, + 'metrics': None, 'guarantee_message_order': False, 'client_id': 'kafka-python-' + __version__, } - def __init__(self, client, metadata, accumulator, metrics, **configs): + def __init__(self, client, metadata, accumulator, **configs): super(Sender, self).__init__() self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -47,7 +48,10 @@ def __init__(self, client, metadata, accumulator, metrics, **configs): self._running = True self._force_close = False self._topics_to_add = set() - self._sensors = SenderMetrics(metrics, self._client, self._metadata) + if self.config['metrics']: + self._sensors = SenderMetrics(self.config['metrics'], self._client, self._metadata) + else: + self._sensors = None def run(self): """The main run loop for the sender thread.""" @@ -123,10 +127,12 @@ def run_once(self): expired_batches = self._accumulator.abort_expired_batches( self.config['request_timeout_ms'], self._metadata) - for expired_batch in expired_batches: - self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count) - self._sensors.update_produce_request_metrics(batches_by_node) + if self._sensors: + for expired_batch in expired_batches: + self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count) + self._sensors.update_produce_request_metrics(batches_by_node) + requests = self._create_produce_requests(batches_by_node) # If we have any nodes that are ready to send + have sendable data, # poll with 0 timeout so this can immediately loop and try sending more @@ -237,7 +243,8 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star self.config['retries'] - batch.attempts - 1, error) self._accumulator.reenqueue(batch) - self._sensors.record_retries(batch.topic_partition.topic, batch.record_count) + if self._sensors: + self._sensors.record_retries(batch.topic_partition.topic, batch.record_count) else: if error is Errors.TopicAuthorizationFailedError: error = error(batch.topic_partition.topic) @@ -245,7 +252,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star # tell the user the result of their request batch.done(base_offset, timestamp_ms, error, log_start_offset) self._accumulator.deallocate(batch) - if error is not None: + if error is not None and self._sensors: self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) if getattr(error, 'invalid_metadata', False): diff --git a/test/conftest.py b/test/conftest.py index 4c4c503e7..ba76d6cc5 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -195,3 +195,26 @@ def _send_messages(number_range, partition=0, topic=topic, producer=kafka_produc return [msg for (msg, f) in messages_and_futures] return _send_messages + + +@pytest.fixture +def 
metrics(): + from kafka.metrics import Metrics + + metrics = Metrics() + try: + yield metrics + finally: + metrics.close() + + +@pytest.fixture +def client(conn, mocker): + from kafka import KafkaClient + + cli = KafkaClient(api_version=(0, 9)) + mocker.patch.object(cli, '_init_connect', return_value=True) + try: + yield cli + finally: + cli._close() diff --git a/test/test_coordinator.py b/test/test_coordinator.py index eac1a1e62..1d1a6df50 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -16,7 +16,6 @@ ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment) import kafka.errors as Errors from kafka.future import Future -from kafka.metrics import Metrics from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.commit import ( OffsetCommitRequest, OffsetCommitResponse, @@ -35,15 +34,13 @@ def client(conn, mocker): cli._close() @pytest.fixture -def coordinator(client, mocker): - metrics = Metrics() - coord = ConsumerCoordinator(client, SubscriptionState(), metrics) +def coordinator(client, metrics, mocker): + coord = ConsumerCoordinator(client, SubscriptionState(), metrics=metrics) try: yield coord finally: mocker.patch.object(coord, 'coordinator_unknown', return_value=True) # avoid attempting to leave group during close() coord.close(timeout_ms=0) - metrics.close() def test_init(client, coordinator): @@ -53,10 +50,10 @@ def test_init(client, coordinator): @pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) -def test_autocommit_enable_api_version(conn, api_version): +def test_autocommit_enable_api_version(conn, metrics, api_version): coordinator = ConsumerCoordinator(KafkaClient(api_version=api_version), SubscriptionState(), - Metrics(), + metrics=metrics, enable_auto_commit=True, session_timeout_ms=30000, # session_timeout_ms and max_poll_interval_ms max_poll_interval_ms=30000, # should be the same to avoid KafkaConfigurationError @@ -100,10 +97,10 @@ def test_group_protocols(coordinator): @pytest.mark.parametrize('api_version', [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) -def test_pattern_subscription(conn, api_version): +def test_pattern_subscription(conn, metrics, api_version): coordinator = ConsumerCoordinator(KafkaClient(api_version=api_version), SubscriptionState(), - Metrics(), + metrics=metrics, api_version=api_version, session_timeout_ms=10000, max_poll_interval_ms=10000) @@ -390,7 +387,6 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable, mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception') client = KafkaClient(api_version=api_version) coordinator = ConsumerCoordinator(client, SubscriptionState(), - Metrics(), api_version=api_version, session_timeout_ms=30000, max_poll_interval_ms=30000, diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 854f1fa98..184acc9e1 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -15,7 +15,6 @@ from kafka.consumer.subscription_state import SubscriptionState import kafka.errors as Errors from kafka.future import Future -from kafka.metrics import Metrics from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.fetch import FetchRequest, FetchResponse from kafka.protocol.list_offsets import ListOffsetsResponse @@ -43,13 +42,13 @@ def topic(): @pytest.fixture -def fetcher(client, subscription_state, topic): +def fetcher(client, metrics, subscription_state, topic): subscription_state.subscribe(topics=[topic]) assignment = [TopicPartition(topic, i) for i in range(3)] 
     subscription_state.assign_from_subscribed(assignment)
     for tp in assignment:
         subscription_state.seek(tp, 0)
-    return Fetcher(client, subscription_state, Metrics())
+    return Fetcher(client, subscription_state, metrics=metrics)
 
 
 def _build_record_batch(msgs, compression=0, offset=0, magic=2):
diff --git a/test/test_sender.py b/test/test_sender.py
index 1656bbfe9..b037d2b48 100644
--- a/test/test_sender.py
+++ b/test/test_sender.py
@@ -5,7 +5,6 @@
 import io
 
 from kafka.client_async import KafkaClient
-from kafka.metrics import Metrics
 from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
 from kafka.protocol.produce import ProduceRequest
 from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch
@@ -14,24 +13,14 @@
 from kafka.structs import TopicPartition
 
 
-@pytest.fixture
-def client():
-    return KafkaClient(bootstrap_servers=(), api_version=(0, 9))
-
-
 @pytest.fixture
 def accumulator():
     return RecordAccumulator()
 
 
-@pytest.fixture
-def metrics():
-    return Metrics()
-
-
 @pytest.fixture
 def sender(client, accumulator, metrics, mocker):
-    return Sender(client, client.cluster, accumulator, metrics)
+    return Sender(client, client.cluster, accumulator, metrics=metrics)
 
 
 @pytest.mark.parametrize(("api_version", "produce_version"), [
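
The net effect for applications is that metric bookkeeping can be switched off wholesale. A minimal usage sketch, assuming the patch above is applied (the bootstrap server and topic name below are placeholders):

    from kafka import KafkaConsumer, KafkaProducer

    # metrics_enabled=True (the default) preserves the old behavior; with False,
    # no Metrics registry or sensors are created, so every guarded record()
    # call in the fetch/produce/coordinator paths is skipped.
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             metrics_enabled=False)
    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             metrics_enabled=False)

    # With metrics disabled, the public metrics() accessors return None.
    assert producer.metrics() is None
    assert consumer.metrics() is None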