From 119d039ebb164e32573a82ffdb3fd004a642263b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 4 Apr 2025 09:43:32 -0700 Subject: [PATCH 1/7] Rely on external brokers for benchmarks; add arg defaults to help text --- benchmarks/consumer_performance.py | 74 +++++++-------------------- benchmarks/producer_performance.py | 81 ++++++++++++++---------------- 2 files changed, 56 insertions(+), 99 deletions(-) diff --git a/benchmarks/consumer_performance.py b/benchmarks/consumer_performance.py index 9e3b6a919..d1587d061 100755 --- a/benchmarks/consumer_performance.py +++ b/benchmarks/consumer_performance.py @@ -8,39 +8,15 @@ import pprint import sys import threading +import time import traceback from kafka.vendor.six.moves import range from kafka import KafkaConsumer, KafkaProducer -from test.fixtures import KafkaFixture, ZookeeperFixture - -logging.basicConfig(level=logging.ERROR) - - -def start_brokers(n): - print('Starting {0} {1}-node cluster...'.format(KafkaFixture.kafka_version, n)) - print('-> 1 Zookeeper') - zk = ZookeeperFixture.instance() - print('---> {0}:{1}'.format(zk.host, zk.port)) - print() - - partitions = min(n, 3) - replicas = min(n, 3) - print('-> {0} Brokers [{1} partitions / {2} replicas]'.format(n, partitions, replicas)) - brokers = [ - KafkaFixture.instance(i, zk, zk_chroot='', - partitions=partitions, replicas=replicas) - for i in range(n) - ] - for broker in brokers: - print('---> {0}:{1}'.format(broker.host, broker.port)) - print() - return brokers class ConsumerPerformance(object): - @staticmethod def run(args): try: @@ -53,28 +29,17 @@ def run(args): pass if v == 'None': v = None + elif v == 'False': + v = False + elif v == 'True': + v = True props[k] = v - if args.brokers: - brokers = start_brokers(args.brokers) - props['bootstrap_servers'] = ['{0}:{1}'.format(broker.host, broker.port) - for broker in brokers] - print('---> bootstrap_servers={0}'.format(props['bootstrap_servers'])) - print() - - print('-> Producing records') - record = bytes(bytearray(args.record_size)) - producer = KafkaProducer(compression_type=args.fixture_compression, - **props) - for i in range(args.num_records): - producer.send(topic=args.topic, value=record) - producer.flush() - producer.close() - print('-> OK!') - print() - print('Initializing Consumer...') + props['bootstrap_servers'] = args.bootstrap_servers props['auto_offset_reset'] = 'earliest' + if 'group_id' not in props: + props['group_id'] = 'kafka-consumer-benchmark' if 'consumer_timeout_ms' not in props: props['consumer_timeout_ms'] = 10000 props['metrics_sample_window_ms'] = args.stats_interval * 1000 @@ -92,14 +57,18 @@ def run(args): print('-> OK!') print() + start_time = time.time() records = 0 for msg in consumer: records += 1 if records >= args.num_records: break - print('Consumed {0} records'.format(records)) + end_time = time.time() timer_stop.set() + timer.join() + print('Consumed {0} records'.format(records)) + print('Execution time:', end_time - start_time, 'secs') except Exception: exc_info = sys.exc_info() @@ -143,18 +112,17 @@ def get_args_parser(): parser = argparse.ArgumentParser( description='This tool is used to verify the consumer performance.') + parser.add_argument( + '--bootstrap-servers', type=str, nargs='+', default=(), + help='host:port for cluster bootstrap servers') parser.add_argument( '--topic', type=str, - help='Topic for consumer test', + help='Topic for consumer test (default: kafka-python-benchmark-test)', default='kafka-python-benchmark-test') parser.add_argument( '--num-records', type=int, - 
help='number of messages to consume', + help='number of messages to consume (default: 1000000)', default=1000000) - parser.add_argument( - '--record-size', type=int, - help='message size in bytes', - default=100) parser.add_argument( '--consumer-config', type=str, nargs='+', default=(), help='kafka consumer related configuration properties like ' @@ -162,13 +130,9 @@ def get_args_parser(): parser.add_argument( '--fixture-compression', type=str, help='specify a compression type for use with broker fixtures / producer') - parser.add_argument( - '--brokers', type=int, - help='Number of kafka brokers to start', - default=0) parser.add_argument( '--stats-interval', type=int, - help='Interval in seconds for stats reporting to console', + help='Interval in seconds for stats reporting to console (default: 5)', default=5) parser.add_argument( '--raw-metrics', action='store_true', diff --git a/benchmarks/producer_performance.py b/benchmarks/producer_performance.py index c0de6fd23..1a1092960 100755 --- a/benchmarks/producer_performance.py +++ b/benchmarks/producer_performance.py @@ -7,37 +7,15 @@ import pprint import sys import threading +import time import traceback from kafka.vendor.six.moves import range from kafka import KafkaProducer -from test.fixtures import KafkaFixture, ZookeeperFixture - - -def start_brokers(n): - print('Starting {0} {1}-node cluster...'.format(KafkaFixture.kafka_version, n)) - print('-> 1 Zookeeper') - zk = ZookeeperFixture.instance() - print('---> {0}:{1}'.format(zk.host, zk.port)) - print() - - partitions = min(n, 3) - replicas = min(n, 3) - print('-> {0} Brokers [{1} partitions / {2} replicas]'.format(n, partitions, replicas)) - brokers = [ - KafkaFixture.instance(i, zk, zk_chroot='', - partitions=partitions, replicas=replicas) - for i in range(n) - ] - for broker in brokers: - print('---> {0}:{1}'.format(broker.host, broker.port)) - print() - return brokers class ProducerPerformance(object): - @staticmethod def run(args): try: @@ -50,18 +28,14 @@ def run(args): pass if v == 'None': v = None + elif v == 'False': + v = False + elif v == 'True': + v = True props[k] = v - if args.brokers: - brokers = start_brokers(args.brokers) - props['bootstrap_servers'] = ['{0}:{1}'.format(broker.host, broker.port) - for broker in brokers] - print("---> bootstrap_servers={0}".format(props['bootstrap_servers'])) - print() - print('-> OK!') - print() - print('Initializing producer...') + props['bootstrap_servers'] = args.bootstrap_servers record = bytes(bytearray(args.record_size)) props['metrics_sample_window_ms'] = args.stats_interval * 1000 @@ -79,11 +53,29 @@ def run(args): print('-> OK!') print() - for i in range(args.num_records): - producer.send(topic=args.topic, value=record) - producer.flush() - + def _benchmark(): + results = [] + for i in range(args.num_records): + results.append(producer.send(topic=args.topic, value=record)) + print("Send complete...") + producer.flush() + producer.close() + count_success, count_failure = 0, 0 + for r in results: + if r.succeeded(): + count_success += 1 + elif r.failed(): + count_failure += 1 + else: + raise ValueError(r) + print("%d suceeded, %d failed" % (count_success, count_failure)) + + start_time = time.time() + _benchmark() + end_time = time.time() timer_stop.set() + timer.join() + print('Execution time:', end_time - start_time, 'secs') except Exception: exc_info = sys.exc_info() @@ -101,6 +93,8 @@ def __init__(self, interval, producer, event=None, raw_metrics=False): def print_stats(self): metrics = self.producer.metrics() + if not 
metrics: + return if self.raw_metrics: pprint.pprint(metrics) else: @@ -125,29 +119,28 @@ def get_args_parser(): parser = argparse.ArgumentParser( description='This tool is used to verify the producer performance.') + parser.add_argument( + '--bootstrap-servers', type=str, nargs='+', default=(), + help='host:port for cluster bootstrap server') parser.add_argument( '--topic', type=str, - help='Topic name for test', + help='Topic name for test (default: kafka-python-benchmark-test)', default='kafka-python-benchmark-test') parser.add_argument( '--num-records', type=int, - help='number of messages to produce', + help='number of messages to produce (default: 1000000)', default=1000000) parser.add_argument( '--record-size', type=int, - help='message size in bytes', + help='message size in bytes (default: 100)', default=100) parser.add_argument( '--producer-config', type=str, nargs='+', default=(), help='kafka producer related configuaration properties like ' 'bootstrap_servers,client_id etc..') - parser.add_argument( - '--brokers', type=int, - help='Number of kafka brokers to start', - default=0) parser.add_argument( '--stats-interval', type=int, - help='Interval in seconds for stats reporting to console', + help='Interval in seconds for stats reporting to console (default: 5)', default=5) parser.add_argument( '--raw-metrics', action='store_true', From 6cb22f08641d722ef3a9010f90285d98813e0c58 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 4 Apr 2025 09:44:50 -0700 Subject: [PATCH 2/7] Move to kafka.benchmarks --- {benchmarks => kafka/benchmarks}/README.md | 0 {benchmarks => kafka/benchmarks}/consumer_performance.py | 0 {benchmarks => kafka/benchmarks}/load_example.py | 0 {benchmarks => kafka/benchmarks}/producer_performance.py | 0 {benchmarks => kafka/benchmarks}/record_batch_compose.py | 0 {benchmarks => kafka/benchmarks}/record_batch_read.py | 0 {benchmarks => kafka/benchmarks}/varint_speed.py | 0 7 files changed, 0 insertions(+), 0 deletions(-) rename {benchmarks => kafka/benchmarks}/README.md (100%) rename {benchmarks => kafka/benchmarks}/consumer_performance.py (100%) rename {benchmarks => kafka/benchmarks}/load_example.py (100%) rename {benchmarks => kafka/benchmarks}/producer_performance.py (100%) rename {benchmarks => kafka/benchmarks}/record_batch_compose.py (100%) rename {benchmarks => kafka/benchmarks}/record_batch_read.py (100%) rename {benchmarks => kafka/benchmarks}/varint_speed.py (100%) diff --git a/benchmarks/README.md b/kafka/benchmarks/README.md similarity index 100% rename from benchmarks/README.md rename to kafka/benchmarks/README.md diff --git a/benchmarks/consumer_performance.py b/kafka/benchmarks/consumer_performance.py similarity index 100% rename from benchmarks/consumer_performance.py rename to kafka/benchmarks/consumer_performance.py diff --git a/benchmarks/load_example.py b/kafka/benchmarks/load_example.py similarity index 100% rename from benchmarks/load_example.py rename to kafka/benchmarks/load_example.py diff --git a/benchmarks/producer_performance.py b/kafka/benchmarks/producer_performance.py similarity index 100% rename from benchmarks/producer_performance.py rename to kafka/benchmarks/producer_performance.py diff --git a/benchmarks/record_batch_compose.py b/kafka/benchmarks/record_batch_compose.py similarity index 100% rename from benchmarks/record_batch_compose.py rename to kafka/benchmarks/record_batch_compose.py diff --git a/benchmarks/record_batch_read.py b/kafka/benchmarks/record_batch_read.py similarity index 100% rename from 
benchmarks/record_batch_read.py rename to kafka/benchmarks/record_batch_read.py diff --git a/benchmarks/varint_speed.py b/kafka/benchmarks/varint_speed.py similarity index 100% rename from benchmarks/varint_speed.py rename to kafka/benchmarks/varint_speed.py From 0d3072171f48cf32bef39c45c06fcd303ae08287 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 4 Apr 2025 09:53:32 -0700 Subject: [PATCH 3/7] Wrap all benchmarks in __main__ --- kafka/benchmarks/consumer_performance.py | 0 kafka/benchmarks/load_example.py | 1 + kafka/benchmarks/producer_performance.py | 0 kafka/benchmarks/record_batch_compose.py | 9 +- kafka/benchmarks/record_batch_read.py | 9 +- kafka/benchmarks/varint_speed.py | 161 +++++++++++------------ 6 files changed, 87 insertions(+), 93 deletions(-) mode change 100755 => 100644 kafka/benchmarks/consumer_performance.py mode change 100755 => 100644 kafka/benchmarks/load_example.py mode change 100755 => 100644 kafka/benchmarks/producer_performance.py diff --git a/kafka/benchmarks/consumer_performance.py b/kafka/benchmarks/consumer_performance.py old mode 100755 new mode 100644 diff --git a/kafka/benchmarks/load_example.py b/kafka/benchmarks/load_example.py old mode 100755 new mode 100644 index eef113e9a..25d571671 --- a/kafka/benchmarks/load_example.py +++ b/kafka/benchmarks/load_example.py @@ -20,6 +20,7 @@ def run(self): producer.send('my-topic', self.big_msg) self.sent += 1 producer.flush() + producer.close() class Consumer(threading.Thread): diff --git a/kafka/benchmarks/producer_performance.py b/kafka/benchmarks/producer_performance.py old mode 100755 new mode 100644 diff --git a/kafka/benchmarks/record_batch_compose.py b/kafka/benchmarks/record_batch_compose.py index 5bdefa7af..5b07fd59a 100644 --- a/kafka/benchmarks/record_batch_compose.py +++ b/kafka/benchmarks/record_batch_compose.py @@ -71,7 +71,8 @@ def func(loops, magic): return res -runner = pyperf.Runner() -runner.bench_time_func('batch_append_v0', func, 0) -runner.bench_time_func('batch_append_v1', func, 1) -runner.bench_time_func('batch_append_v2', func, 2) +if __name__ == '__main__': + runner = pyperf.Runner() + runner.bench_time_func('batch_append_v0', func, 0) + runner.bench_time_func('batch_append_v1', func, 1) + runner.bench_time_func('batch_append_v2', func, 2) diff --git a/kafka/benchmarks/record_batch_read.py b/kafka/benchmarks/record_batch_read.py index aa5e9c1e5..2ef32298d 100644 --- a/kafka/benchmarks/record_batch_read.py +++ b/kafka/benchmarks/record_batch_read.py @@ -76,7 +76,8 @@ def func(loops, magic): return res -runner = pyperf.Runner() -runner.bench_time_func('batch_read_v0', func, 0) -runner.bench_time_func('batch_read_v1', func, 1) -runner.bench_time_func('batch_read_v2', func, 2) +if __name__ == '__main__': + runner = pyperf.Runner() + runner.bench_time_func('batch_read_v0', func, 0) + runner.bench_time_func('batch_read_v1', func, 1) + runner.bench_time_func('batch_read_v2', func, 2) diff --git a/kafka/benchmarks/varint_speed.py b/kafka/benchmarks/varint_speed.py index fd63d0ac1..b2628a1b5 100644 --- a/kafka/benchmarks/varint_speed.py +++ b/kafka/benchmarks/varint_speed.py @@ -113,8 +113,6 @@ def encode_varint_1(num): raise ValueError("Out of double range") return buf[:i + 1] -_assert_valid_enc(encode_varint_1) - def encode_varint_2(value, int2byte=six.int2byte): value = (value << 1) ^ (value >> 63) @@ -128,8 +126,6 @@ def encode_varint_2(value, int2byte=six.int2byte): value >>= 7 return res + int2byte(bits) -_assert_valid_enc(encode_varint_2) - def encode_varint_3(value, buf): append = 
buf.append @@ -145,12 +141,6 @@ def encode_varint_3(value, buf): return value -for encoded, decoded in test_data: - res = bytearray() - encode_varint_3(decoded, res) - assert res == encoded - - def encode_varint_4(value, int2byte=six.int2byte): value = (value << 1) ^ (value >> 63) @@ -185,12 +175,6 @@ def encode_varint_4(value, int2byte=six.int2byte): return res + int2byte(bits) -_assert_valid_enc(encode_varint_4) - -# import dis -# dis.dis(encode_varint_4) - - def encode_varint_5(value, buf, pos=0): value = (value << 1) ^ (value >> 63) @@ -204,12 +188,6 @@ def encode_varint_5(value, buf, pos=0): buf[pos] = bits return pos + 1 -for encoded, decoded in test_data: - res = bytearray(10) - written = encode_varint_5(decoded, res) - assert res[:written] == encoded - - def encode_varint_6(value, buf): append = buf.append value = (value << 1) ^ (value >> 63) @@ -253,12 +231,6 @@ def encode_varint_6(value, buf): return i -for encoded, decoded in test_data: - res = bytearray() - encode_varint_6(decoded, res) - assert res == encoded - - def size_of_varint_1(value): """ Number of bytes needed to encode an integer in variable-length format. """ @@ -271,8 +243,6 @@ def size_of_varint_1(value): break return res -_assert_valid_size(size_of_varint_1) - def size_of_varint_2(value): """ Number of bytes needed to encode an integer in variable-length format. @@ -298,8 +268,6 @@ def size_of_varint_2(value): return 9 return 10 -_assert_valid_size(size_of_varint_2) - if six.PY3: def _read_byte(memview, pos): @@ -351,8 +319,6 @@ def decode_varint_1(buffer, pos=0): # Normalize sign return (value >> 1) ^ -(value & 1), i + 1 -_assert_valid_dec(decode_varint_1) - def decode_varint_2(buffer, pos=0): result = 0 @@ -369,9 +335,6 @@ def decode_varint_2(buffer, pos=0): raise ValueError("Out of int64 range") -_assert_valid_dec(decode_varint_2) - - def decode_varint_3(buffer, pos=0): result = buffer[pos] if not (result & 0x81): @@ -393,51 +356,79 @@ def decode_varint_3(buffer, pos=0): raise ValueError("Out of int64 range") -_assert_valid_dec(decode_varint_3) - -# import dis -# dis.dis(decode_varint_3) - -runner = pyperf.Runner() -# Encode algorithms returning a bytes result -for bench_func in [ - encode_varint_1, - encode_varint_2, - encode_varint_4]: - for i, value in enumerate(BENCH_VALUES_ENC): - runner.bench_func( - '{}_{}byte'.format(bench_func.__name__, i + 1), - bench_func, value) - -# Encode algorithms writing to the buffer -for bench_func in [ - encode_varint_3, - encode_varint_5, - encode_varint_6]: - for i, value in enumerate(BENCH_VALUES_ENC): - fname = bench_func.__name__ - runner.timeit( - '{}_{}byte'.format(fname, i + 1), - stmt="{}({}, buffer)".format(fname, value), - setup="from __main__ import {}; buffer = bytearray(10)".format( - fname) - ) - -# Size algorithms -for bench_func in [ - size_of_varint_1, - size_of_varint_2]: - for i, value in enumerate(BENCH_VALUES_ENC): - runner.bench_func( - '{}_{}byte'.format(bench_func.__name__, i + 1), - bench_func, value) - -# Decode algorithms -for bench_func in [ - decode_varint_1, - decode_varint_2, - decode_varint_3]: - for i, value in enumerate(BENCH_VALUES_DEC): - runner.bench_func( - '{}_{}byte'.format(bench_func.__name__, i + 1), - bench_func, value) +if __name__ == '__main__': + _assert_valid_enc(encode_varint_1) + _assert_valid_enc(encode_varint_2) + + for encoded, decoded in test_data: + res = bytearray() + encode_varint_3(decoded, res) + assert res == encoded + + _assert_valid_enc(encode_varint_4) + + # import dis + # dis.dis(encode_varint_4) + + for 
encoded, decoded in test_data: + res = bytearray(10) + written = encode_varint_5(decoded, res) + assert res[:written] == encoded + + for encoded, decoded in test_data: + res = bytearray() + encode_varint_6(decoded, res) + assert res == encoded + + _assert_valid_size(size_of_varint_1) + _assert_valid_size(size_of_varint_2) + _assert_valid_dec(decode_varint_1) + _assert_valid_dec(decode_varint_2) + _assert_valid_dec(decode_varint_3) + + # import dis + # dis.dis(decode_varint_3) + + runner = pyperf.Runner() + # Encode algorithms returning a bytes result + for bench_func in [ + encode_varint_1, + encode_varint_2, + encode_varint_4]: + for i, value in enumerate(BENCH_VALUES_ENC): + runner.bench_func( + '{}_{}byte'.format(bench_func.__name__, i + 1), + bench_func, value) + + # Encode algorithms writing to the buffer + for bench_func in [ + encode_varint_3, + encode_varint_5, + encode_varint_6]: + for i, value in enumerate(BENCH_VALUES_ENC): + fname = bench_func.__name__ + runner.timeit( + '{}_{}byte'.format(fname, i + 1), + stmt="{}({}, buffer)".format(fname, value), + setup="from __main__ import {}; buffer = bytearray(10)".format( + fname) + ) + + # Size algorithms + for bench_func in [ + size_of_varint_1, + size_of_varint_2]: + for i, value in enumerate(BENCH_VALUES_ENC): + runner.bench_func( + '{}_{}byte'.format(bench_func.__name__, i + 1), + bench_func, value) + + # Decode algorithms + for bench_func in [ + decode_varint_1, + decode_varint_2, + decode_varint_3]: + for i, value in enumerate(BENCH_VALUES_DEC): + runner.bench_func( + '{}_{}byte'.format(bench_func.__name__, i + 1), + bench_func, value) From d52dffe39220be668e9085de9ebd2df78148fc9e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 4 Apr 2025 09:56:32 -0700 Subject: [PATCH 4/7] Add pyperf to requirements-dev and [benchmarks] optional deps --- pyproject.toml | 1 + requirements-dev.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 2a675c111..d575a8959 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ lz4 = ["lz4"] snappy = ["python-snappy"] zstd = ["zstandard"] testing = ["pytest", "mock; python_version < '3.3'", "pytest-mock", "pytest-timeout"] +benchmarks = ["pyperf"] [tool.setuptools] include-package-data = false diff --git a/requirements-dev.txt b/requirements-dev.txt index 3bc51fd78..8de5e28d4 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -6,6 +6,7 @@ lz4 mock; python_version < '3.3' py pylint +pyperf pytest pytest-cov pytest-mock From 120a7753d9841967ce6b2ff74e367b064ed193dd Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 4 Apr 2025 11:18:19 -0700 Subject: [PATCH 5/7] __init__.py --- kafka/benchmarks/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 kafka/benchmarks/__init__.py diff --git a/kafka/benchmarks/__init__.py b/kafka/benchmarks/__init__.py new file mode 100644 index 000000000..e69de29bb From 75c0fa104e1d400f12856999891800695240919b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 4 Apr 2025 12:08:02 -0700 Subject: [PATCH 6/7] Use argparse for load_example --- kafka/benchmarks/load_example.py | 89 +++++++++++++++++++++++--------- 1 file changed, 66 insertions(+), 23 deletions(-) diff --git a/kafka/benchmarks/load_example.py b/kafka/benchmarks/load_example.py index 25d571671..29796a74c 100644 --- a/kafka/benchmarks/load_example.py +++ b/kafka/benchmarks/load_example.py @@ -1,67 +1,110 @@ #!/usr/bin/env python from __future__ import print_function -import threading, logging, time -from kafka 
import KafkaConsumer, KafkaProducer +import argparse +import logging +import threading +import time -msg_size = 524288 +from kafka import KafkaConsumer, KafkaProducer -producer_stop = threading.Event() -consumer_stop = threading.Event() class Producer(threading.Thread): - big_msg = b'1' * msg_size + + def __init__(self, bootstrap_servers, topic, stop_event, msg_size): + super(Producer, self).__init__() + self.bootstrap_servers = bootstrap_servers + self.topic = topic + self.stop_event = stop_event + self.big_msg = b'1' * msg_size def run(self): - producer = KafkaProducer(bootstrap_servers='localhost:9092') + producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers) self.sent = 0 - while not producer_stop.is_set(): - producer.send('my-topic', self.big_msg) + while not self.stop_event.is_set(): + producer.send(self.topic, self.big_msg) self.sent += 1 producer.flush() producer.close() class Consumer(threading.Thread): + def __init__(self, bootstrap_servers, topic, stop_event, msg_size): + super(Consumer, self).__init__() + self.bootstrap_servers = bootstrap_servers + self.topic = topic + self.stop_event = stop_event + self.msg_size = msg_size def run(self): - consumer = KafkaConsumer(bootstrap_servers='localhost:9092', + consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers, auto_offset_reset='earliest') - consumer.subscribe(['my-topic']) + consumer.subscribe([self.topic]) self.valid = 0 self.invalid = 0 for message in consumer: - if len(message.value) == msg_size: + if len(message.value) == self.msg_size: self.valid += 1 else: + print('Invalid message:', len(message.value), self.msg_size) self.invalid += 1 - if consumer_stop.is_set(): + if self.stop_event.is_set(): break - consumer.close() -def main(): + +def get_args_parser(): + parser = argparse.ArgumentParser( + description='This tool is used to demonstrate consumer and producer load.') + + parser.add_argument( + '--bootstrap-servers', type=str, nargs='+', default=('localhost:9092'), + help='host:port for cluster bootstrap servers (default: localhost:9092)') + parser.add_argument( + '--topic', type=str, + help='Topic for load test (default: kafka-python-benchmark-load-example)', + default='kafka-python-benchmark-load-example') + parser.add_argument( + '--msg-size', type=int, + help='Message size, in bytes, for load test (default: 524288)', + default=524288) + parser.add_argument( + '--load-time', type=int, + help='number of seconds to run load test (default: 10)', + default=10) + parser.add_argument( + '--log-level', type=str, + help='Optional logging level for load test: ERROR|INFO|DEBUG etc', + default=None) + return parser + + +def main(args): + if args.log_level: + logging.basicConfig( + format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', + level=getattr(logging, args.log_level)) + producer_stop = threading.Event() + consumer_stop = threading.Event() threads = [ - Producer(), - Consumer() + Producer(args.bootstrap_servers, args.topic, producer_stop, args.msg_size), + Consumer(args.bootstrap_servers, args.topic, consumer_stop, args.msg_size) ] for t in threads: t.start() - time.sleep(10) + time.sleep(args.load_time) producer_stop.set() consumer_stop.set() print('Messages sent: %d' % threads[0].sent) print('Messages recvd: %d' % threads[1].valid) print('Messages invalid: %d' % threads[1].invalid) + if __name__ == "__main__": - logging.basicConfig( - format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', - level=logging.INFO - ) - main() + args 
= get_args_parser().parse_args() + main(args) From 2d8500e3f7582794f66089497dea69e7b906baae Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 4 Apr 2025 12:15:49 -0700 Subject: [PATCH 7/7] lint fixups --- kafka/benchmarks/consumer_performance.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/kafka/benchmarks/consumer_performance.py b/kafka/benchmarks/consumer_performance.py index d1587d061..c35a164c2 100644 --- a/kafka/benchmarks/consumer_performance.py +++ b/kafka/benchmarks/consumer_performance.py @@ -4,16 +4,13 @@ from __future__ import absolute_import, print_function import argparse -import logging import pprint import sys import threading import time import traceback -from kafka.vendor.six.moves import range - -from kafka import KafkaConsumer, KafkaProducer +from kafka import KafkaConsumer class ConsumerPerformance(object):
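
A minimal usage sketch for the relocated benchmarks after this series is applied (illustrative, not part of the patch). It assumes kafka-python is installed so the scripts are importable as kafka.benchmarks.*, and that an externally managed broker is reachable at localhost:9092 -- the broker address and record count are assumptions; the class names, parser helpers, and option names come from the diffs above:

    # Run the producer benchmark, then read the records back with the
    # consumer benchmark, both against an external broker (assumed here
    # to be listening on localhost:9092 -- substitute any reachable cluster).
    from kafka.benchmarks.producer_performance import (
        ProducerPerformance, get_args_parser as producer_args)
    from kafka.benchmarks.consumer_performance import (
        ConsumerPerformance, get_args_parser as consumer_args)

    # Produce 10,000 records of 100 bytes each to the default benchmark topic.
    ProducerPerformance.run(producer_args().parse_args([
        '--bootstrap-servers', 'localhost:9092',
        '--topic', 'kafka-python-benchmark-test',
        '--num-records', '10000',
        '--record-size', '100',
    ]))

    # Consume the same number of records back from the same topic.
    ConsumerPerformance.run(consumer_args().parse_args([
        '--bootstrap-servers', 'localhost:9092',
        '--topic', 'kafka-python-benchmark-test',
        '--num-records', '10000',
    ]))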