Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 0 additions & 66 deletions benchmarks/load_example.py

This file was deleted.

File renamed without changes.
Empty file added kafka/benchmarks/__init__.py
Empty file.
79 changes: 20 additions & 59 deletions benchmarks/consumer_performance.py → kafka/benchmarks/consumer_performance.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,16 @@
from __future__ import absolute_import, print_function

import argparse
import logging
import pprint
import sys
import threading
import time
import traceback

from kafka.vendor.six.moves import range

from kafka import KafkaConsumer, KafkaProducer
from test.fixtures import KafkaFixture, ZookeeperFixture

logging.basicConfig(level=logging.ERROR)


def start_brokers(n):
print('Starting {0} {1}-node cluster...'.format(KafkaFixture.kafka_version, n))
print('-> 1 Zookeeper')
zk = ZookeeperFixture.instance()
print('---> {0}:{1}'.format(zk.host, zk.port))
print()

partitions = min(n, 3)
replicas = min(n, 3)
print('-> {0} Brokers [{1} partitions / {2} replicas]'.format(n, partitions, replicas))
brokers = [
KafkaFixture.instance(i, zk, zk_chroot='',
partitions=partitions, replicas=replicas)
for i in range(n)
]
for broker in brokers:
print('---> {0}:{1}'.format(broker.host, broker.port))
print()
return brokers
from kafka import KafkaConsumer


class ConsumerPerformance(object):

@staticmethod
def run(args):
try:
Expand All @@ -53,28 +26,17 @@ def run(args):
pass
if v == 'None':
v = None
elif v == 'False':
v = False
elif v == 'True':
v = True
props[k] = v

if args.brokers:
brokers = start_brokers(args.brokers)
props['bootstrap_servers'] = ['{0}:{1}'.format(broker.host, broker.port)
for broker in brokers]
print('---> bootstrap_servers={0}'.format(props['bootstrap_servers']))
print()

print('-> Producing records')
record = bytes(bytearray(args.record_size))
producer = KafkaProducer(compression_type=args.fixture_compression,
**props)
for i in range(args.num_records):
producer.send(topic=args.topic, value=record)
producer.flush()
producer.close()
print('-> OK!')
print()

print('Initializing Consumer...')
props['bootstrap_servers'] = args.bootstrap_servers
props['auto_offset_reset'] = 'earliest'
if 'group_id' not in props:
props['group_id'] = 'kafka-consumer-benchmark'
if 'consumer_timeout_ms' not in props:
props['consumer_timeout_ms'] = 10000
props['metrics_sample_window_ms'] = args.stats_interval * 1000
Expand All @@ -92,14 +54,18 @@ def run(args):
print('-> OK!')
print()

start_time = time.time()
records = 0
for msg in consumer:
records += 1
if records >= args.num_records:
break
print('Consumed {0} records'.format(records))

end_time = time.time()
timer_stop.set()
timer.join()
print('Consumed {0} records'.format(records))
print('Execution time:', end_time - start_time, 'secs')

except Exception:
exc_info = sys.exc_info()
Expand Down Expand Up @@ -143,32 +109,27 @@ def get_args_parser():
parser = argparse.ArgumentParser(
description='This tool is used to verify the consumer performance.')

parser.add_argument(
'--bootstrap-servers', type=str, nargs='+', default=(),
help='host:port for cluster bootstrap servers')
parser.add_argument(
'--topic', type=str,
help='Topic for consumer test',
help='Topic for consumer test (default: kafka-python-benchmark-test)',
default='kafka-python-benchmark-test')
parser.add_argument(
'--num-records', type=int,
help='number of messages to consume',
help='number of messages to consume (default: 1000000)',
default=1000000)
parser.add_argument(
'--record-size', type=int,
help='message size in bytes',
default=100)
parser.add_argument(
'--consumer-config', type=str, nargs='+', default=(),
help='kafka consumer related configuration properties like '
'bootstrap_servers,client_id etc..')
parser.add_argument(
'--fixture-compression', type=str,
help='specify a compression type for use with broker fixtures / producer')
parser.add_argument(
'--brokers', type=int,
help='Number of kafka brokers to start',
default=0)
parser.add_argument(
'--stats-interval', type=int,
help='Interval in seconds for stats reporting to console',
help='Interval in seconds for stats reporting to console (default: 5)',
default=5)
parser.add_argument(
'--raw-metrics', action='store_true',
Expand Down
110 changes: 110 additions & 0 deletions kafka/benchmarks/load_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#!/usr/bin/env python
from __future__ import print_function

import argparse
import logging
import threading
import time

from kafka import KafkaConsumer, KafkaProducer


class Producer(threading.Thread):
    """Background thread that publishes fixed-size messages until stopped.

    Attributes:
        sent: number of messages handed to the producer so far.
    """

    def __init__(self, bootstrap_servers, topic, stop_event, msg_size):
        """
        Arguments:
            bootstrap_servers: host:port string (or list of them) for the cluster.
            topic: topic name to publish to.
            stop_event: threading.Event that signals the send loop to stop.
            msg_size: payload size in bytes for every message.
        """
        super(Producer, self).__init__()
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.stop_event = stop_event
        self.big_msg = b'1' * msg_size
        # Initialize eagerly so readers of the counter (e.g. main()'s summary)
        # never hit AttributeError if they look before run() has started.
        self.sent = 0

    def run(self):
        producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)

        while not self.stop_event.is_set():
            producer.send(self.topic, self.big_msg)
            self.sent += 1
        # Drain any buffered sends before shutting the client down.
        producer.flush()
        producer.close()


class Consumer(threading.Thread):
    """Background thread that consumes messages and checks their payload size.

    Attributes:
        valid: count of messages whose payload length matched msg_size.
        invalid: count of messages with any other payload length.
    """

    def __init__(self, bootstrap_servers, topic, stop_event, msg_size):
        """
        Arguments:
            bootstrap_servers: host:port string (or list of them) for the cluster.
            topic: topic name to subscribe to.
            stop_event: threading.Event that signals the consume loop to stop.
            msg_size: expected payload size in bytes of each message.
        """
        super(Consumer, self).__init__()
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.stop_event = stop_event
        self.msg_size = msg_size
        # Initialize eagerly so readers of the counters (e.g. main()'s
        # summary) never hit AttributeError before run() has started.
        self.valid = 0
        self.invalid = 0

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers,
                                 auto_offset_reset='earliest')
        consumer.subscribe([self.topic])

        # NOTE(review): no consumer_timeout_ms is set, so this iterator blocks
        # while the topic is idle and the stop_event is only observed after
        # the *next* message arrives — the thread may never exit on a quiet
        # topic. Confirm whether a timeout should be added.
        for message in consumer:
            if len(message.value) == self.msg_size:
                self.valid += 1
            else:
                print('Invalid message:', len(message.value), self.msg_size)
                self.invalid += 1

            if self.stop_event.is_set():
                break
        consumer.close()


def get_args_parser():
    """Build the command-line parser for the load example.

    Returns:
        argparse.ArgumentParser configured with all load-test options.
    """
    parser = argparse.ArgumentParser(
        description='This tool is used to demonstrate consumer and producer load.')

    parser.add_argument(
        '--bootstrap-servers', type=str, nargs='+',
        # Must be a list: the old default ('localhost:9092') was just a
        # parenthesized *string*, not a tuple, and with nargs='+' the
        # parsed value is a list — keep default and parsed types consistent.
        default=['localhost:9092'],
        help='host:port for cluster bootstrap servers (default: localhost:9092)')
    parser.add_argument(
        '--topic', type=str,
        help='Topic for load test (default: kafka-python-benchmark-load-example)',
        default='kafka-python-benchmark-load-example')
    parser.add_argument(
        '--msg-size', type=int,
        help='Message size, in bytes, for load test (default: 524288)',
        default=524288)
    parser.add_argument(
        '--load-time', type=int,
        help='number of seconds to run load test (default: 10)',
        default=10)
    parser.add_argument(
        '--log-level', type=str,
        help='Optional logging level for load test: ERROR|INFO|DEBUG etc',
        default=None)
    return parser


def main(args):
    """Run a producer and a consumer concurrently for args.load_time seconds,
    then print send/receive counters.

    Arguments:
        args: parsed namespace from get_args_parser().
    """
    if args.log_level:
        logging.basicConfig(
            format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
            level=getattr(logging, args.log_level))
    producer_stop = threading.Event()
    consumer_stop = threading.Event()
    threads = [
        Producer(args.bootstrap_servers, args.topic, producer_stop, args.msg_size),
        Consumer(args.bootstrap_servers, args.topic, consumer_stop, args.msg_size)
    ]

    for t in threads:
        t.start()

    time.sleep(args.load_time)
    producer_stop.set()
    consumer_stop.set()
    # Join the producer so its 'sent' counter is final before we print it
    # (previously the counter was read while the thread could still be
    # sending, so the summary raced with the send loop).
    threads[0].join()
    # The consumer's iterator can block indefinitely on an idle topic, so
    # use a bounded join — the summary must print even if the consumer
    # never observes its stop event.
    threads[1].join(timeout=5)
    print('Messages sent: %d' % threads[0].sent)
    print('Messages recvd: %d' % threads[1].valid)
    print('Messages invalid: %d' % threads[1].invalid)


if __name__ == "__main__":
    args = get_args_parser().parse_args()
    main(args)
Loading
Loading