diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index 81d9f0060f552..57314bd1c3a6e 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -150,8 +150,15 @@ impl SourceReader> for KafkaSourceReader { _ => (), } - let partition_stats = - &statistics.topics[self.topic_name.as_str()].partitions[&part.pid]; + let topic_stats = match statistics.topics.get(self.topic_name.as_str()) { + Some(t) => t, + None => continue, + }; + + let partition_stats = match topic_stats.partitions.get(&part.pid) { + Some(p) => p, + None => continue, + }; logger.log(MaterializedEvent::KafkaConsumerInfo { consumer_name: statistics.name.to_string(), diff --git a/test/testdrive/avro-sources.td b/test/testdrive/avro-sources.td index 490287df4f19b..222c10a4c3558 100644 --- a/test/testdrive/avro-sources.td +++ b/test/testdrive/avro-sources.td @@ -299,7 +299,7 @@ $ kafka-ingest format=avro topic=non-dbz-data-varying-partition schema=${non-dbz # Erroneously adds start_offsets for non-existent partitions. 
> CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_2 FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}' - WITH (start_offset=[0,1]) + WITH (start_offset=[0,1], statistics_interval_ms = 1000) FORMAT AVRO USING SCHEMA '${non-dbz-schema}' ENVELOPE NONE @@ -333,9 +333,15 @@ a b 5 6 9 10 +# There should be metrics for 2 partitions per consumer +> SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name; +count +----- +2 + > CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_3 FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}' - WITH (start_offset=[1,1]) + WITH (start_offset=[1,1], statistics_interval_ms = 1000) FORMAT AVRO USING SCHEMA '${non-dbz-schema}' ENVELOPE NONE @@ -352,6 +358,13 @@ a b 9 10 11 12 +# There should be metrics for 3 partitions per consumer +> SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name; +count +----- +3 +3 + $ set-sql-timeout duration=12.7s # Source with new-style three-valued "snapshot".