From 675f82b4ef617042ebc24a0569039695c51f4b2c Mon Sep 17 00:00:00 2001
From: Chris Golden
Date: Wed, 24 Mar 2021 15:08:38 -0700
Subject: [PATCH 1/4] Testing race condition in rdkafka statistics

---
 test/testdrive/avro-sources.td | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/test/testdrive/avro-sources.td b/test/testdrive/avro-sources.td
index 490287df4f19b..43e93dfd18b35 100644
--- a/test/testdrive/avro-sources.td
+++ b/test/testdrive/avro-sources.td
@@ -299,7 +299,7 @@ $ kafka-ingest format=avro topic=non-dbz-data-varying-partition schema=${non-dbz
 # Erroneously adds start_offsets for non-existent partitions.
 > CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_2
   FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}'
-  WITH (start_offset=[0,1])
+  WITH (start_offset=[0,1], statistics_interval_ms = 1000)
   FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
   ENVELOPE NONE

@@ -335,7 +335,7 @@ a b

 > CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_3
   FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}'
-  WITH (start_offset=[1,1])
+  WITH (start_offset=[1,1], statistics_interval_ms = 1000)
   FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
   ENVELOPE NONE

From f0f756075ba68ea9b4994333dc4cee8cd4072033 Mon Sep 17 00:00:00 2001
From: Chris Golden
Date: Wed, 24 Mar 2021 14:40:02 -0700
Subject: [PATCH 2/4] Another debugging commit for CI

---
 src/dataflow/src/source/kafka.rs | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs
index 81d9f0060f552..f669c2a1280ee 100644
--- a/src/dataflow/src/source/kafka.rs
+++ b/src/dataflow/src/source/kafka.rs
@@ -142,6 +142,7 @@ impl SourceReader> for KafkaSourceReader {
         // Read any statistics objects generated via the GlueConsumerContext::stats callback
         while let Ok(statistics) = self.stats_rx.try_recv() {
             if let Some(logger) = self.logger.as_mut() {
+                info!("Client stats: {:#?}", statistics);
                 for mut part in self.partition_consumers.iter_mut() {
                     // If this is the first callback, initialize our consumer name
                     // so that we can later retract this when the source is dropped
@@ -150,6 +151,12 @@ impl SourceReader> for KafkaSourceReader {
                         _ => (),
                     }

+                    info!(
+                        "Topic: {}, Partition: {}",
+                        self.topic_name.as_str(),
+                        part.pid.to_string()
+                    );
+
                     let partition_stats =
                         &statistics.topics[self.topic_name.as_str()].partitions[&part.pid];

From 5004c8784b7d022e3707399b980a55f0efdf0938 Mon Sep 17 00:00:00 2001
From: Chris Golden
Date: Wed, 24 Mar 2021 16:08:45 -0700
Subject: [PATCH 3/4] Continue processing stats on key errors

If metrics for a topic or partition are missing, don't abort. Continue
processing the rest of the metrics instead. Eventually the metrics should
show up.

Add a test case to verify that the metrics are synchronized eventually.

Fixes #6204
---
 src/dataflow/src/source/kafka.rs | 11 +++++++++--
 test/testdrive/avro-sources.td   | 13 +++++++++++++
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs
index f669c2a1280ee..eef050c84bcba 100644
--- a/src/dataflow/src/source/kafka.rs
+++ b/src/dataflow/src/source/kafka.rs
@@ -157,8 +157,15 @@ impl SourceReader> for KafkaSourceReader {
                         part.pid.to_string()
                     );

-                    let partition_stats =
-                        &statistics.topics[self.topic_name.as_str()].partitions[&part.pid];
+                    let topic_stats = match statistics.topics.get(self.topic_name.as_str()) {
+                        Some(t) => t,
+                        None => continue,
+                    };
+
+                    let partition_stats = match topic_stats.partitions.get(&part.pid) {
+                        Some(p) => p,
+                        None => continue,
+                    };

                     logger.log(MaterializedEvent::KafkaConsumerInfo {
                         consumer_name: statistics.name.to_string(),
diff --git a/test/testdrive/avro-sources.td b/test/testdrive/avro-sources.td
index 43e93dfd18b35..222c10a4c3558 100644
--- a/test/testdrive/avro-sources.td
+++ b/test/testdrive/avro-sources.td
@@ -333,6 +333,12 @@ a b
 5 6
 9 10

+# There should be two partitions per consumer
+> SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
+count
+-----
+2
+
 > CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_3
   FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}'
   WITH (start_offset=[1,1], statistics_interval_ms = 1000)
@@ -352,6 +358,13 @@ a b
 9 10
 11 12

+# There should be metrics for 3 partitions per consumer
+> SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
+count
+-----
+3
+3
+
 $ set-sql-timeout duration=12.7s

 # Source with new-style three-valued "snapshot".

From 211086cd0a12a1106538dcab808acecf9a651fef Mon Sep 17 00:00:00 2001
From: Chris Golden
Date: Wed, 24 Mar 2021 16:19:53 -0700
Subject: [PATCH 4/4] Remove debugging commit

---
 src/dataflow/src/source/kafka.rs | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs
index eef050c84bcba..57314bd1c3a6e 100644
--- a/src/dataflow/src/source/kafka.rs
+++ b/src/dataflow/src/source/kafka.rs
@@ -142,7 +142,6 @@ impl SourceReader> for KafkaSourceReader {
         // Read any statistics objects generated via the GlueConsumerContext::stats callback
         while let Ok(statistics) = self.stats_rx.try_recv() {
             if let Some(logger) = self.logger.as_mut() {
-                info!("Client stats: {:#?}", statistics);
                 for mut part in self.partition_consumers.iter_mut() {
                     // If this is the first callback, initialize our consumer name
                     // so that we can later retract this when the source is dropped
@@ -151,12 +150,6 @@ impl SourceReader> for KafkaSourceReader {
                         _ => (),
                     }

-                    info!(
-                        "Topic: {}, Partition: {}",
-                        self.topic_name.as_str(),
-                        part.pid.to_string()
-                    );
-
                     let topic_stats = match statistics.topics.get(self.topic_name.as_str()) {
                         Some(t) => t,
                         None => continue,
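
The substance of PATCH 3 above is the switch from panicking index lookups
(statistics.topics[self.topic_name.as_str()] and .partitions[&part.pid]) to
non-panicking HashMap::get lookups that skip a topic or partition until
rdkafka has reported statistics for it, which is exactly the race the commit
message describes. What follows is a minimal, self-contained sketch of that
pattern; the Statistics, TopicStats, and PartitionStats types and the
log_partition_stats function are simplified stand-ins for illustration, not
the actual rdkafka statistics structs or the Materialize source code.

use std::collections::HashMap;

// Simplified stand-ins for the rdkafka statistics payload; the real structs
// carry many more fields.
struct PartitionStats {
    msgs: u64,
}

struct TopicStats {
    partitions: HashMap<i32, PartitionStats>,
}

struct Statistics {
    topics: HashMap<String, TopicStats>,
}

// Logs per-partition message counts, skipping any topic or partition the
// statistics callback has not reported yet. Indexing with [] would panic on
// a missing key; get + continue tolerates the race, and the metrics are
// picked up on a later callback.
fn log_partition_stats(stats: &Statistics, topic_name: &str, partition_ids: &[i32]) {
    for pid in partition_ids {
        let topic_stats = match stats.topics.get(topic_name) {
            Some(t) => t,
            None => continue,
        };

        let partition_stats = match topic_stats.partitions.get(pid) {
            Some(p) => p,
            None => continue,
        };

        println!(
            "topic={} partition={} msgs={}",
            topic_name, pid, partition_stats.msgs
        );
    }
}

fn main() {
    let mut partitions = HashMap::new();
    partitions.insert(0, PartitionStats { msgs: 42 });

    let mut topics = HashMap::new();
    topics.insert("testdrive-topic".to_string(), TopicStats { partitions });

    let stats = Statistics { topics };

    // Partition 1 has no stats yet; it is skipped instead of causing a panic.
    log_partition_stats(&stats, "testdrive-topic", &[0, 1]);
}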