diff --git a/benchmarks/load_example.py b/benchmarks/load_example.py deleted file mode 100755 index eef113e9a..000000000 --- a/benchmarks/load_example.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python -from __future__ import print_function -import threading, logging, time - -from kafka import KafkaConsumer, KafkaProducer - -msg_size = 524288 - -producer_stop = threading.Event() -consumer_stop = threading.Event() - -class Producer(threading.Thread): - big_msg = b'1' * msg_size - - def run(self): - producer = KafkaProducer(bootstrap_servers='localhost:9092') - self.sent = 0 - - while not producer_stop.is_set(): - producer.send('my-topic', self.big_msg) - self.sent += 1 - producer.flush() - - -class Consumer(threading.Thread): - - def run(self): - consumer = KafkaConsumer(bootstrap_servers='localhost:9092', - auto_offset_reset='earliest') - consumer.subscribe(['my-topic']) - self.valid = 0 - self.invalid = 0 - - for message in consumer: - if len(message.value) == msg_size: - self.valid += 1 - else: - self.invalid += 1 - - if consumer_stop.is_set(): - break - - consumer.close() - -def main(): - threads = [ - Producer(), - Consumer() - ] - - for t in threads: - t.start() - - time.sleep(10) - producer_stop.set() - consumer_stop.set() - print('Messages sent: %d' % threads[0].sent) - print('Messages recvd: %d' % threads[1].valid) - print('Messages invalid: %d' % threads[1].invalid) - -if __name__ == "__main__": - logging.basicConfig( - format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', - level=logging.INFO - ) - main() diff --git a/benchmarks/README.md b/kafka/benchmarks/README.md similarity index 100% rename from benchmarks/README.md rename to kafka/benchmarks/README.md diff --git a/kafka/benchmarks/__init__.py b/kafka/benchmarks/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/benchmarks/consumer_performance.py b/kafka/benchmarks/consumer_performance.py old mode 100755 new mode 100644 similarity index 67% rename from benchmarks/consumer_performance.py rename to kafka/benchmarks/consumer_performance.py index 9e3b6a919..c35a164c2 --- a/benchmarks/consumer_performance.py +++ b/kafka/benchmarks/consumer_performance.py @@ -4,43 +4,16 @@ from __future__ import absolute_import, print_function import argparse -import logging import pprint import sys import threading +import time import traceback -from kafka.vendor.six.moves import range - -from kafka import KafkaConsumer, KafkaProducer -from test.fixtures import KafkaFixture, ZookeeperFixture - -logging.basicConfig(level=logging.ERROR) - - -def start_brokers(n): - print('Starting {0} {1}-node cluster...'.format(KafkaFixture.kafka_version, n)) - print('-> 1 Zookeeper') - zk = ZookeeperFixture.instance() - print('---> {0}:{1}'.format(zk.host, zk.port)) - print() - - partitions = min(n, 3) - replicas = min(n, 3) - print('-> {0} Brokers [{1} partitions / {2} replicas]'.format(n, partitions, replicas)) - brokers = [ - KafkaFixture.instance(i, zk, zk_chroot='', - partitions=partitions, replicas=replicas) - for i in range(n) - ] - for broker in brokers: - print('---> {0}:{1}'.format(broker.host, broker.port)) - print() - return brokers +from kafka import KafkaConsumer class ConsumerPerformance(object): - @staticmethod def run(args): try: @@ -53,28 +26,17 @@ def run(args): pass if v == 'None': v = None + elif v == 'False': + v = False + elif v == 'True': + v = True props[k] = v - if args.brokers: - brokers = start_brokers(args.brokers) - props['bootstrap_servers'] = ['{0}:{1}'.format(broker.host, broker.port) - for broker in brokers] - print('---> bootstrap_servers={0}'.format(props['bootstrap_servers'])) - print() - - print('-> Producing records') - record = bytes(bytearray(args.record_size)) - producer = KafkaProducer(compression_type=args.fixture_compression, - **props) - for i in range(args.num_records): - producer.send(topic=args.topic, value=record) - producer.flush() - producer.close() - print('-> OK!') - print() - print('Initializing Consumer...') + props['bootstrap_servers'] = args.bootstrap_servers props['auto_offset_reset'] = 'earliest' + if 'group_id' not in props: + props['group_id'] = 'kafka-consumer-benchmark' if 'consumer_timeout_ms' not in props: props['consumer_timeout_ms'] = 10000 props['metrics_sample_window_ms'] = args.stats_interval * 1000 @@ -92,14 +54,18 @@ def run(args): print('-> OK!') print() + start_time = time.time() records = 0 for msg in consumer: records += 1 if records >= args.num_records: break - print('Consumed {0} records'.format(records)) + end_time = time.time() timer_stop.set() + timer.join() + print('Consumed {0} records'.format(records)) + print('Execution time:', end_time - start_time, 'secs') except Exception: exc_info = sys.exc_info() @@ -143,18 +109,17 @@ def get_args_parser(): parser = argparse.ArgumentParser( description='This tool is used to verify the consumer performance.') + parser.add_argument( + '--bootstrap-servers', type=str, nargs='+', default=(), + help='host:port for cluster bootstrap servers') parser.add_argument( '--topic', type=str, - help='Topic for consumer test', + help='Topic for consumer test (default: kafka-python-benchmark-test)', default='kafka-python-benchmark-test') parser.add_argument( '--num-records', type=int, - help='number of messages to consume', + help='number of messages to consume (default: 1000000)', default=1000000) - parser.add_argument( - '--record-size', type=int, - help='message size in bytes', - default=100) parser.add_argument( '--consumer-config', type=str, nargs='+', default=(), help='kafka consumer related configuration properties like ' @@ -162,13 +127,9 @@ def get_args_parser(): parser.add_argument( '--fixture-compression', type=str, help='specify a compression type for use with broker fixtures / producer') - parser.add_argument( - '--brokers', type=int, - help='Number of kafka brokers to start', - default=0) parser.add_argument( '--stats-interval', type=int, - help='Interval in seconds for stats reporting to console', + help='Interval in seconds for stats reporting to console (default: 5)', default=5) parser.add_argument( '--raw-metrics', action='store_true', diff --git a/kafka/benchmarks/load_example.py b/kafka/benchmarks/load_example.py new file mode 100644 index 000000000..29796a74c --- /dev/null +++ b/kafka/benchmarks/load_example.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python +from __future__ import print_function + +import argparse +import logging +import threading +import time + +from kafka import KafkaConsumer, KafkaProducer + + +class Producer(threading.Thread): + + def __init__(self, bootstrap_servers, topic, stop_event, msg_size): + super(Producer, self).__init__() + self.bootstrap_servers = bootstrap_servers + self.topic = topic + self.stop_event = stop_event + self.big_msg = b'1' * msg_size + + def run(self): + producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers) + self.sent = 0 + + while not self.stop_event.is_set(): + producer.send(self.topic, self.big_msg) + self.sent += 1 + producer.flush() + producer.close() + + +class Consumer(threading.Thread): + def __init__(self, bootstrap_servers, topic, stop_event, msg_size): + super(Consumer, self).__init__() + self.bootstrap_servers = bootstrap_servers + self.topic = topic + self.stop_event = stop_event + self.msg_size = msg_size + + def run(self): + consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers, + auto_offset_reset='earliest') + consumer.subscribe([self.topic]) + self.valid = 0 + self.invalid = 0 + + for message in consumer: + if len(message.value) == self.msg_size: + self.valid += 1 + else: + print('Invalid message:', len(message.value), self.msg_size) + self.invalid += 1 + + if self.stop_event.is_set(): + break + consumer.close() + + +def get_args_parser(): + parser = argparse.ArgumentParser( + description='This tool is used to demonstrate consumer and producer load.') + + parser.add_argument( + '--bootstrap-servers', type=str, nargs='+', default=('localhost:9092'), + help='host:port for cluster bootstrap servers (default: localhost:9092)') + parser.add_argument( + '--topic', type=str, + help='Topic for load test (default: kafka-python-benchmark-load-example)', + default='kafka-python-benchmark-load-example') + parser.add_argument( + '--msg-size', type=int, + help='Message size, in bytes, for load test (default: 524288)', + default=524288) + parser.add_argument( + '--load-time', type=int, + help='number of seconds to run load test (default: 10)', + default=10) + parser.add_argument( + '--log-level', type=str, + help='Optional logging level for load test: ERROR|INFO|DEBUG etc', + default=None) + return parser + + +def main(args): + if args.log_level: + logging.basicConfig( + format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', + level=getattr(logging, args.log_level)) + producer_stop = threading.Event() + consumer_stop = threading.Event() + threads = [ + Producer(args.bootstrap_servers, args.topic, producer_stop, args.msg_size), + Consumer(args.bootstrap_servers, args.topic, consumer_stop, args.msg_size) + ] + + for t in threads: + t.start() + + time.sleep(args.load_time) + producer_stop.set() + consumer_stop.set() + print('Messages sent: %d' % threads[0].sent) + print('Messages recvd: %d' % threads[1].valid) + print('Messages invalid: %d' % threads[1].invalid) + + +if __name__ == "__main__": + args = get_args_parser().parse_args() + main(args) diff --git a/benchmarks/producer_performance.py b/kafka/benchmarks/producer_performance.py old mode 100755 new mode 100644 similarity index 71% rename from benchmarks/producer_performance.py rename to kafka/benchmarks/producer_performance.py index c0de6fd23..1a1092960 --- a/benchmarks/producer_performance.py +++ b/kafka/benchmarks/producer_performance.py @@ -7,37 +7,15 @@ import pprint import sys import threading +import time import traceback from kafka.vendor.six.moves import range from kafka import KafkaProducer -from test.fixtures import KafkaFixture, ZookeeperFixture - - -def start_brokers(n): - print('Starting {0} {1}-node cluster...'.format(KafkaFixture.kafka_version, n)) - print('-> 1 Zookeeper') - zk = ZookeeperFixture.instance() - print('---> {0}:{1}'.format(zk.host, zk.port)) - print() - - partitions = min(n, 3) - replicas = min(n, 3) - print('-> {0} Brokers [{1} partitions / {2} replicas]'.format(n, partitions, replicas)) - brokers = [ - KafkaFixture.instance(i, zk, zk_chroot='', - partitions=partitions, replicas=replicas) - for i in range(n) - ] - for broker in brokers: - print('---> {0}:{1}'.format(broker.host, broker.port)) - print() - return brokers class ProducerPerformance(object): - @staticmethod def run(args): try: @@ -50,18 +28,14 @@ def run(args): pass if v == 'None': v = None + elif v == 'False': + v = False + elif v == 'True': + v = True props[k] = v - if args.brokers: - brokers = start_brokers(args.brokers) - props['bootstrap_servers'] = ['{0}:{1}'.format(broker.host, broker.port) - for broker in brokers] - print("---> bootstrap_servers={0}".format(props['bootstrap_servers'])) - print() - print('-> OK!') - print() - print('Initializing producer...') + props['bootstrap_servers'] = args.bootstrap_servers record = bytes(bytearray(args.record_size)) props['metrics_sample_window_ms'] = args.stats_interval * 1000 @@ -79,11 +53,29 @@ def run(args): print('-> OK!') print() - for i in range(args.num_records): - producer.send(topic=args.topic, value=record) - producer.flush() - + def _benchmark(): + results = [] + for i in range(args.num_records): + results.append(producer.send(topic=args.topic, value=record)) + print("Send complete...") + producer.flush() + producer.close() + count_success, count_failure = 0, 0 + for r in results: + if r.succeeded(): + count_success += 1 + elif r.failed(): + count_failure += 1 + else: + raise ValueError(r) + print("%d suceeded, %d failed" % (count_success, count_failure)) + + start_time = time.time() + _benchmark() + end_time = time.time() timer_stop.set() + timer.join() + print('Execution time:', end_time - start_time, 'secs') except Exception: exc_info = sys.exc_info() @@ -101,6 +93,8 @@ def __init__(self, interval, producer, event=None, raw_metrics=False): def print_stats(self): metrics = self.producer.metrics() + if not metrics: + return if self.raw_metrics: pprint.pprint(metrics) else: @@ -125,29 +119,28 @@ def get_args_parser(): parser = argparse.ArgumentParser( description='This tool is used to verify the producer performance.') + parser.add_argument( + '--bootstrap-servers', type=str, nargs='+', default=(), + help='host:port for cluster bootstrap server') parser.add_argument( '--topic', type=str, - help='Topic name for test', + help='Topic name for test (default: kafka-python-benchmark-test)', default='kafka-python-benchmark-test') parser.add_argument( '--num-records', type=int, - help='number of messages to produce', + help='number of messages to produce (default: 1000000)', default=1000000) parser.add_argument( '--record-size', type=int, - help='message size in bytes', + help='message size in bytes (default: 100)', default=100) parser.add_argument( '--producer-config', type=str, nargs='+', default=(), help='kafka producer related configuaration properties like ' 'bootstrap_servers,client_id etc..') - parser.add_argument( - '--brokers', type=int, - help='Number of kafka brokers to start', - default=0) parser.add_argument( '--stats-interval', type=int, - help='Interval in seconds for stats reporting to console', + help='Interval in seconds for stats reporting to console (default: 5)', default=5) parser.add_argument( '--raw-metrics', action='store_true', diff --git a/benchmarks/record_batch_compose.py b/kafka/benchmarks/record_batch_compose.py similarity index 89% rename from benchmarks/record_batch_compose.py rename to kafka/benchmarks/record_batch_compose.py index 5bdefa7af..5b07fd59a 100644 --- a/benchmarks/record_batch_compose.py +++ b/kafka/benchmarks/record_batch_compose.py @@ -71,7 +71,8 @@ def func(loops, magic): return res -runner = pyperf.Runner() -runner.bench_time_func('batch_append_v0', func, 0) -runner.bench_time_func('batch_append_v1', func, 1) -runner.bench_time_func('batch_append_v2', func, 2) +if __name__ == '__main__': + runner = pyperf.Runner() + runner.bench_time_func('batch_append_v0', func, 0) + runner.bench_time_func('batch_append_v1', func, 1) + runner.bench_time_func('batch_append_v2', func, 2) diff --git a/benchmarks/record_batch_read.py b/kafka/benchmarks/record_batch_read.py similarity index 90% rename from benchmarks/record_batch_read.py rename to kafka/benchmarks/record_batch_read.py index aa5e9c1e5..2ef32298d 100644 --- a/benchmarks/record_batch_read.py +++ b/kafka/benchmarks/record_batch_read.py @@ -76,7 +76,8 @@ def func(loops, magic): return res -runner = pyperf.Runner() -runner.bench_time_func('batch_read_v0', func, 0) -runner.bench_time_func('batch_read_v1', func, 1) -runner.bench_time_func('batch_read_v2', func, 2) +if __name__ == '__main__': + runner = pyperf.Runner() + runner.bench_time_func('batch_read_v0', func, 0) + runner.bench_time_func('batch_read_v1', func, 1) + runner.bench_time_func('batch_read_v2', func, 2) diff --git a/benchmarks/varint_speed.py b/kafka/benchmarks/varint_speed.py similarity index 81% rename from benchmarks/varint_speed.py rename to kafka/benchmarks/varint_speed.py index fd63d0ac1..b2628a1b5 100644 --- a/benchmarks/varint_speed.py +++ b/kafka/benchmarks/varint_speed.py @@ -113,8 +113,6 @@ def encode_varint_1(num): raise ValueError("Out of double range") return buf[:i + 1] -_assert_valid_enc(encode_varint_1) - def encode_varint_2(value, int2byte=six.int2byte): value = (value << 1) ^ (value >> 63) @@ -128,8 +126,6 @@ def encode_varint_2(value, int2byte=six.int2byte): value >>= 7 return res + int2byte(bits) -_assert_valid_enc(encode_varint_2) - def encode_varint_3(value, buf): append = buf.append @@ -145,12 +141,6 @@ def encode_varint_3(value, buf): return value -for encoded, decoded in test_data: - res = bytearray() - encode_varint_3(decoded, res) - assert res == encoded - - def encode_varint_4(value, int2byte=six.int2byte): value = (value << 1) ^ (value >> 63) @@ -185,12 +175,6 @@ def encode_varint_4(value, int2byte=six.int2byte): return res + int2byte(bits) -_assert_valid_enc(encode_varint_4) - -# import dis -# dis.dis(encode_varint_4) - - def encode_varint_5(value, buf, pos=0): value = (value << 1) ^ (value >> 63) @@ -204,12 +188,6 @@ def encode_varint_5(value, buf, pos=0): buf[pos] = bits return pos + 1 -for encoded, decoded in test_data: - res = bytearray(10) - written = encode_varint_5(decoded, res) - assert res[:written] == encoded - - def encode_varint_6(value, buf): append = buf.append value = (value << 1) ^ (value >> 63) @@ -253,12 +231,6 @@ def encode_varint_6(value, buf): return i -for encoded, decoded in test_data: - res = bytearray() - encode_varint_6(decoded, res) - assert res == encoded - - def size_of_varint_1(value): """ Number of bytes needed to encode an integer in variable-length format. """ @@ -271,8 +243,6 @@ def size_of_varint_1(value): break return res -_assert_valid_size(size_of_varint_1) - def size_of_varint_2(value): """ Number of bytes needed to encode an integer in variable-length format. @@ -298,8 +268,6 @@ def size_of_varint_2(value): return 9 return 10 -_assert_valid_size(size_of_varint_2) - if six.PY3: def _read_byte(memview, pos): @@ -351,8 +319,6 @@ def decode_varint_1(buffer, pos=0): # Normalize sign return (value >> 1) ^ -(value & 1), i + 1 -_assert_valid_dec(decode_varint_1) - def decode_varint_2(buffer, pos=0): result = 0 @@ -369,9 +335,6 @@ def decode_varint_2(buffer, pos=0): raise ValueError("Out of int64 range") -_assert_valid_dec(decode_varint_2) - - def decode_varint_3(buffer, pos=0): result = buffer[pos] if not (result & 0x81): @@ -393,51 +356,79 @@ def decode_varint_3(buffer, pos=0): raise ValueError("Out of int64 range") -_assert_valid_dec(decode_varint_3) - -# import dis -# dis.dis(decode_varint_3) - -runner = pyperf.Runner() -# Encode algorithms returning a bytes result -for bench_func in [ - encode_varint_1, - encode_varint_2, - encode_varint_4]: - for i, value in enumerate(BENCH_VALUES_ENC): - runner.bench_func( - '{}_{}byte'.format(bench_func.__name__, i + 1), - bench_func, value) - -# Encode algorithms writing to the buffer -for bench_func in [ - encode_varint_3, - encode_varint_5, - encode_varint_6]: - for i, value in enumerate(BENCH_VALUES_ENC): - fname = bench_func.__name__ - runner.timeit( - '{}_{}byte'.format(fname, i + 1), - stmt="{}({}, buffer)".format(fname, value), - setup="from __main__ import {}; buffer = bytearray(10)".format( - fname) - ) - -# Size algorithms -for bench_func in [ - size_of_varint_1, - size_of_varint_2]: - for i, value in enumerate(BENCH_VALUES_ENC): - runner.bench_func( - '{}_{}byte'.format(bench_func.__name__, i + 1), - bench_func, value) - -# Decode algorithms -for bench_func in [ - decode_varint_1, - decode_varint_2, - decode_varint_3]: - for i, value in enumerate(BENCH_VALUES_DEC): - runner.bench_func( - '{}_{}byte'.format(bench_func.__name__, i + 1), - bench_func, value) +if __name__ == '__main__': + _assert_valid_enc(encode_varint_1) + _assert_valid_enc(encode_varint_2) + + for encoded, decoded in test_data: + res = bytearray() + encode_varint_3(decoded, res) + assert res == encoded + + _assert_valid_enc(encode_varint_4) + + # import dis + # dis.dis(encode_varint_4) + + for encoded, decoded in test_data: + res = bytearray(10) + written = encode_varint_5(decoded, res) + assert res[:written] == encoded + + for encoded, decoded in test_data: + res = bytearray() + encode_varint_6(decoded, res) + assert res == encoded + + _assert_valid_size(size_of_varint_1) + _assert_valid_size(size_of_varint_2) + _assert_valid_dec(decode_varint_1) + _assert_valid_dec(decode_varint_2) + _assert_valid_dec(decode_varint_3) + + # import dis + # dis.dis(decode_varint_3) + + runner = pyperf.Runner() + # Encode algorithms returning a bytes result + for bench_func in [ + encode_varint_1, + encode_varint_2, + encode_varint_4]: + for i, value in enumerate(BENCH_VALUES_ENC): + runner.bench_func( + '{}_{}byte'.format(bench_func.__name__, i + 1), + bench_func, value) + + # Encode algorithms writing to the buffer + for bench_func in [ + encode_varint_3, + encode_varint_5, + encode_varint_6]: + for i, value in enumerate(BENCH_VALUES_ENC): + fname = bench_func.__name__ + runner.timeit( + '{}_{}byte'.format(fname, i + 1), + stmt="{}({}, buffer)".format(fname, value), + setup="from __main__ import {}; buffer = bytearray(10)".format( + fname) + ) + + # Size algorithms + for bench_func in [ + size_of_varint_1, + size_of_varint_2]: + for i, value in enumerate(BENCH_VALUES_ENC): + runner.bench_func( + '{}_{}byte'.format(bench_func.__name__, i + 1), + bench_func, value) + + # Decode algorithms + for bench_func in [ + decode_varint_1, + decode_varint_2, + decode_varint_3]: + for i, value in enumerate(BENCH_VALUES_DEC): + runner.bench_func( + '{}_{}byte'.format(bench_func.__name__, i + 1), + bench_func, value) diff --git a/pyproject.toml b/pyproject.toml index 2a675c111..d575a8959 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ lz4 = ["lz4"] snappy = ["python-snappy"] zstd = ["zstandard"] testing = ["pytest", "mock; python_version < '3.3'", "pytest-mock", "pytest-timeout"] +benchmarks = ["pyperf"] [tool.setuptools] include-package-data = false diff --git a/requirements-dev.txt b/requirements-dev.txt index 3bc51fd78..8de5e28d4 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -6,6 +6,7 @@ lz4 mock; python_version < '3.3' py pylint +pyperf pytest pytest-cov pytest-mock