From 581bfa50846cd927a86719badc5ee2313b77bb33 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Sat, 10 Jan 2026 13:25:30 +0000 Subject: [PATCH 1/2] Howl --- README.md | 6 ++++ pyproject.toml | 3 +- src/saluki/howl.py | 82 ++++++++++++++++++++++++++++++++++++++++++++++ src/saluki/main.py | 36 ++++++++++++++++++-- 4 files changed, 124 insertions(+), 3 deletions(-) create mode 100644 src/saluki/howl.py diff --git a/README.md b/README.md index 04e27db..5df7a65 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,12 @@ INFO:saluki: 0 - low:7515, high:7551, num_messages:36 `saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992` - This will forward messages between the two given timestamps. +## `howl` - Produce fake `ev44` messages to a topic + +``` +saluki howl mybroker:9092/dest_topic --events-per-frame 200 --frames-per-second 50 --tof-peak 10000000 --tof-sigma 5000000 --det-min 0 --det-max 500 +``` + # Developer setup `pip install -e .[dev]` diff --git a/pyproject.toml b/pyproject.toml index a74c110..2326bf2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,8 @@ dependencies = [ "ess-streaming-data-types", "confluent-kafka>=2.12.1", # for produce_batch in play() "python-dateutil", - "tzdata" + "tzdata", + "numpy", ] readme = {file = "README.md", content-type = "text/markdown"} license-files = ["LICENSE"] diff --git a/src/saluki/howl.py b/src/saluki/howl.py new file mode 100644 index 0000000..d06774e --- /dev/null +++ b/src/saluki/howl.py @@ -0,0 +1,82 @@ +import logging +import time + +import numpy as np +from confluent_kafka import Producer +from streaming_data_types import serialise_ev44 + +logger = logging.getLogger("saluki") + +RNG = np.random.default_rng() + + +def generate_fake_ev44( + msg_id: int, + events_per_frame: int, + tof_peak: float, + tof_sigma: float, + det_min: int, + det_max: int, +) -> bytes: + detector_ids = np.random.randint(low=det_min, high=det_max, size=events_per_frame) + tofs = np.maximum(0.0, 
RNG.normal(loc=tof_peak, scale=tof_sigma, size=events_per_frame)) + + return serialise_ev44( + source_name="saluki", + reference_time=[time.time() * 1_000_000_000], + message_id=msg_id, + reference_time_index=[0], + time_of_flight=tofs, + pixel_id=detector_ids, + ) + + +def howl( + broker: str, + topic: str, + events_per_frame: int, + frames_per_second: int, + tof_peak: float, + tof_sigma: float, + det_min: int, + det_max: int, +) -> None: + """ + Prints the broker and topic metadata for a given broker. + If a topic is given, only this topic's partitions and watermarks will be printed. + :param broker: The broker address including port number. + :param topic: Optional topic to filter information to. + """ + + producer = Producer( + { + "bootstrap.servers": broker, + } + ) + + target_frame_time = 1 / frames_per_second + + msg_id = 0 + + ev44_size = len(generate_fake_ev44(0, events_per_frame, tof_peak, tof_sigma, det_min, det_max)) + rate_bytes_per_sec = ev44_size * frames_per_second + rate_mbit_per_sec = (rate_bytes_per_sec / 1024**2) * 8 + logger.info(f"Attempting to simulate data rate: {rate_mbit_per_sec:.3f} MBit/s") + + while True: + start_time = time.time() + target_end_time = start_time + target_frame_time + + producer.produce( + topic=topic, + key=None, + value=generate_fake_ev44( + msg_id, events_per_frame, tof_peak, tof_sigma, det_min, det_max + ), + ) + msg_id += 1 + + sleep_time = max(target_end_time - time.time(), 0) + if sleep_time == 0: + logger.warning("saluki-howl cannot keep up with target event/frame rate") + time.sleep(sleep_time) diff --git a/src/saluki/main.py b/src/saluki/main.py index 1bcf97b..a8604d6 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -6,6 +6,7 @@ from saluki.listen import listen from saluki.play import play from saluki.sniff import sniff +from saluki.howl import howl from saluki.utils import dateutil_parsable_or_unix_timestamp, parse_kafka_uri logger = logging.getLogger("saluki") @@ -15,6 +16,7 @@ _CONSUME = 
"consume" _PLAY = "play" _SNIFF = "sniff" +_HOWL = "howl" def main() -> None: @@ -52,7 +54,9 @@ def main() -> None: _SNIFF, help="sniff - broker metadata", parents=[common_options] ) sniff_parser.add_argument( - "broker", type=str, help="broker, optionally suffixed with a topic name to filter to" + "broker", + type=str, + help="broker, optionally suffixed with a topic name to filter to", ) consumer_parser = argparse.ArgumentParser(add_help=False) @@ -65,7 +69,9 @@ def main() -> None: ) consumer_mode_parser = sub_parsers.add_parser( - _CONSUME, help="consumer mode", parents=[topic_parser, consumer_parser, common_options] + _CONSUME, + help="consumer mode", + parents=[topic_parser, consumer_parser, common_options], ) consumer_mode_parser.add_argument( "-m", @@ -120,6 +126,19 @@ def main() -> None: nargs=2, ) + howl_parser = sub_parsers.add_parser( + _HOWL, + help="replay mode - replay data into another topic", + parents=[common_options], + ) + howl_parser.add_argument("topic", type=str, help="Destination topic") + howl_parser.add_argument("--events-per-frame", type=int, help="Events per frame to simulate") + howl_parser.add_argument("--frames-per-second", type=int, help="Frames per second to simulate") + howl_parser.add_argument("--tof-peak", type=float, help="Time-of-flight peak (ns)") + howl_parser.add_argument("--tof-sigma", type=float, help="Time-of-flight sigma (ns)") + howl_parser.add_argument("--det-min", type=int, help="Minimum detector ID") + howl_parser.add_argument("--det-max", type=int, help="Maximum detector ID") + if len(sys.argv) == 1: parser.print_help() sys.exit(1) @@ -169,6 +188,19 @@ def main() -> None: except RuntimeError: logger.debug(f"Sniffing whole broker {args.broker}") sniff(args.broker) + elif args.command == _HOWL: + broker, topic = parse_kafka_uri(args.topic) + logger.debug(f"Howling to topic {topic} on broker {broker}") + howl( + broker, + topic, + events_per_frame=args.events_per_frame, + frames_per_second=args.frames_per_second, + 
tof_peak=args.tof_peak, + tof_sigma=args.tof_sigma, + det_min=args.det_min, + det_max=args.det_max, + ) if __name__ == "__main__": From 215d8c390d40d0c421595a6c5cfc08dff93b6478 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Mon, 12 Jan 2026 18:34:42 +0000 Subject: [PATCH 2/2] howl MORE --- README.md | 7 ++-- src/saluki/howl.py | 81 ++++++++++++++++++++++++++++++++++++--------- src/saluki/main.py | 38 ++++++++++++++------- src/saluki/play.py | 3 +- src/saluki/utils.py | 4 ++- tests/test_howl.py | 0 tests/test_utils.py | 7 ++-- 7 files changed, 107 insertions(+), 33 deletions(-) create mode 100644 tests/test_howl.py diff --git a/README.md b/README.md index 5df7a65..beeddba 100644 --- a/README.md +++ b/README.md @@ -59,10 +59,13 @@ INFO:saluki: 0 - low:7515, high:7551, num_messages:36 `saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992` - This will forward messages between the two given timestamps. -## `howl` - Produce fake `ev44` messages to a topic +## `howl` - Produce fake run-like messages + +`saluki-howl` emits `ev44` events, `pl72` run starts, and `6s4t` run stops to Kafka, in a format which +looks somewhat like a real run. 
``` -saluki howl mybroker:9092/dest_topic --events-per-frame 200 --frames-per-second 50 --tof-peak 10000000 --tof-sigma 5000000 --det-min 0 --det-max 500 +saluki howl mybroker:9092 SOME_PREFIX ``` # Developer setup diff --git a/src/saluki/howl.py b/src/saluki/howl.py index d06774e..7dc3781 100644 --- a/src/saluki/howl.py +++ b/src/saluki/howl.py @@ -1,16 +1,19 @@ +import json import logging import time +import uuid import numpy as np from confluent_kafka import Producer -from streaming_data_types import serialise_ev44 +from streaming_data_types import serialise_6s4t, serialise_ev44, serialise_pl72 +from streaming_data_types.run_start_pl72 import DetectorSpectrumMap logger = logging.getLogger("saluki") RNG = np.random.default_rng() -def generate_fake_ev44( +def generate_fake_events( msg_id: int, events_per_frame: int, tof_peak: float, @@ -18,7 +21,7 @@ def generate_fake_ev44( det_min: int, det_max: int, ) -> bytes: - detector_ids = np.random.randint(low=det_min, high=det_max, size=events_per_frame) + detector_ids = RNG.integers(low=det_min, high=det_max, size=events_per_frame) tofs = np.maximum(0.0, RNG.normal(loc=tof_peak, scale=tof_sigma, size=events_per_frame)) return serialise_ev44( @@ -31,50 +34,98 @@ def generate_fake_ev44( ) +def generate_run_start(det_max: int) -> bytes: + det_spec_map = DetectorSpectrumMap( + detector_ids=np.arange(0, det_max, dtype=np.int32), + spectrum_numbers=np.arange(0, det_max, dtype=np.int32), + n_spectra=det_max, + ) + return serialise_pl72( + start_time=int(time.time() * 1000), + stop_time=None, + run_name=f"saluki-howl-{uuid.uuid4()}", + instrument_name="saluki-howl", + nexus_structure=json.dumps({}), + job_id=str(uuid.uuid4()), + filename=str(uuid.uuid4()), + detector_spectrum_map=det_spec_map, + ) + + +def generate_run_stop() -> bytes: + return serialise_6s4t( + stop_time=int(time.time() * 1000), + job_id=str(uuid.uuid4()), + ) + + def howl( broker: str, - topic: str, + topic_prefix: str, events_per_frame: int, 
frames_per_second: int, + frames_per_run: int, tof_peak: float, tof_sigma: float, det_min: int, det_max: int, ) -> None: """ - Prints the broker and topic metadata for a given broker. - If a topic is given, only this topic's partitions and watermarks will be printed. - :param broker: The broker address including port number. - :param topic: Optional topic to filter information to. + Send messages vaguely resembling a run to Kafka. """ producer = Producer( { "bootstrap.servers": broker, + "queue.buffering.max.messages": 100000, + "queue.buffering.max.ms": 20, } ) target_frame_time = 1 / frames_per_second - msg_id = 0 + frames = 0 - ev44_size = len(generate_fake_ev44(0, events_per_frame, tof_peak, tof_sigma, det_min, det_max)) + ev44_size = len( + generate_fake_events(0, events_per_frame, tof_peak, tof_sigma, det_min, det_max) + ) rate_bytes_per_sec = ev44_size * frames_per_second rate_mbit_per_sec = (rate_bytes_per_sec / 1024**2) * 8 - logger.info(f"Attempting to simulate data rate: {rate_mbit_per_sec:.3f} MBit/s") + logger.info(f"Attempting to simulate data rate: {rate_mbit_per_sec:.3f} Mbit/s") + logger.info(f"Each ev44 is {ev44_size} bytes") + + producer.produce( + topic=f"{topic_prefix}_runInfo", + key=None, + value=generate_run_start(det_max), + ) while True: start_time = time.time() target_end_time = start_time + target_frame_time producer.produce( - topic=topic, + topic=f"{topic_prefix}_events", key=None, - value=generate_fake_ev44( - msg_id, events_per_frame, tof_peak, tof_sigma, det_min, det_max + value=generate_fake_events( + frames, events_per_frame, tof_peak, tof_sigma, det_min, det_max ), ) - msg_id += 1 + producer.poll(0) + frames += 1 + + if frames_per_run != 0 and frames % frames_per_run == 0: + logger.info(f"Starting new run after {frames_per_run} simulated frames") + producer.produce( + topic=f"{topic_prefix}_runInfo", + key=None, + value=generate_run_stop(), + ) + producer.produce( + topic=f"{topic_prefix}_runInfo", + key=None, + 
value=generate_run_start(det_max), + ) sleep_time = max(target_end_time - time.time(), 0) if sleep_time == 0: diff --git a/src/saluki/main.py b/src/saluki/main.py index a8604d6..5cc793a 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -3,10 +3,10 @@ import sys from saluki.consume import consume +from saluki.howl import howl from saluki.listen import listen from saluki.play import play from saluki.sniff import sniff -from saluki.howl import howl from saluki.utils import dateutil_parsable_or_unix_timestamp, parse_kafka_uri logger = logging.getLogger("saluki") @@ -131,13 +131,28 @@ def main() -> None: help="replay mode - replay data into another topic", parents=[common_options], ) - howl_parser.add_argument("topic", type=str, help="Destination topic") - howl_parser.add_argument("--events-per-frame", type=int, help="Events per frame to simulate") - howl_parser.add_argument("--frames-per-second", type=int, help="Frames per second to simulate") - howl_parser.add_argument("--tof-peak", type=float, help="Time-of-flight peak (ns)") - howl_parser.add_argument("--tof-sigma", type=float, help="Time-of-flight sigma (ns)") - howl_parser.add_argument("--det-min", type=int, help="Minimum detector ID") - howl_parser.add_argument("--det-max", type=int, help="Maximum detector ID") + howl_parser.add_argument("broker", type=str, help="Kafka broker URL") + howl_parser.add_argument("topic_prefix", type=str, help="Topic prefix e.g. 
INSTNAME") + howl_parser.add_argument( + "--events-per-frame", type=int, help="Events per frame to simulate", default=100 + ) + howl_parser.add_argument( + "--frames-per-second", type=int, help="Frames per second to simulate", default=1 + ) + howl_parser.add_argument( + "--frames-per-run", + type=int, + help="Frames to take before beginning new run (0 to run forever)", + default=0, + ) + howl_parser.add_argument( + "--tof-peak", type=float, help="Time-of-flight peak (ns)", default=10_000_000 + ) + howl_parser.add_argument( + "--tof-sigma", type=float, help="Time-of-flight sigma (ns)", default=2_000_000 + ) + howl_parser.add_argument("--det-min", type=int, help="Minimum detector ID", default=0) + howl_parser.add_argument("--det-max", type=int, help="Maximum detector ID", default=1000) if len(sys.argv) == 1: parser.print_help() @@ -189,13 +204,12 @@ def main() -> None: logger.debug(f"Sniffing whole broker {args.broker}") sniff(args.broker) elif args.command == _HOWL: - broker, topic = parse_kafka_uri(args.topic) - logger.debug(f"Howling to topic {topic} on broker {broker}") howl( - broker, - topic, + args.broker, + args.topic_prefix, events_per_frame=args.events_per_frame, frames_per_second=args.frames_per_second, + frames_per_run=args.frames_per_run, tof_peak=args.tof_peak, tof_sigma=args.tof_sigma, det_min=args.det_min, diff --git a/src/saluki/play.py b/src/saluki/play.py index 1f058e8..1bb7119 100644 --- a/src/saluki/play.py +++ b/src/saluki/play.py @@ -70,7 +70,8 @@ def play( logger.debug(f"finished consuming {num_messages} messages") consumer.close() producer.produce_batch( - dest_topic, [{"key": message.key(), "value": message.value()} for message in msgs] + dest_topic, + [{"key": message.key(), "value": message.value()} for message in msgs], ) logger.debug(f"flushing producer. 
len(p): {len(producer)}") producer.flush(timeout=10) diff --git a/src/saluki/utils.py b/src/saluki/utils.py index d033a57..62845ef 100644 --- a/src/saluki/utils.py +++ b/src/saluki/utils.py @@ -34,7 +34,9 @@ def fallback_deserialiser(payload: bytes) -> str: def deserialise_and_print_messages( - msgs: List[Message], partition: int | None, schemas_to_filter_to: list[str] | None = None + msgs: List[Message], + partition: int | None, + schemas_to_filter_to: list[str] | None = None, ) -> None: for msg in msgs: try: diff --git a/tests/test_howl.py b/tests/test_howl.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_utils.py b/tests/test_utils.py index 263c280..cf19970 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -78,7 +78,9 @@ def test_deserialising_message_which_raises_does_not_stop_loop(mock_message): assert logger.info.call_count == 1 -def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list(mock_message): +def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list( + mock_message, +): with patch("saluki.utils.logger") as logger: ok_message = Mock(spec=Message) ok_message.value.return_value = serialise_fc00(config_change=1, streams=[]) # type: ignore @@ -179,7 +181,8 @@ def test_uri_with_no_topic(): @pytest.mark.parametrize( - "timestamp", ["2025-11-19T15:27:11", "2025-11-19T15:27:11Z", "2025-11-19T15:27:11+00:00"] + "timestamp", + ["2025-11-19T15:27:11", "2025-11-19T15:27:11Z", "2025-11-19T15:27:11+00:00"], ) def test_parses_datetime_properly_with_string(timestamp): assert dateutil_parsable_or_unix_timestamp(timestamp) == 1763566031000