-
Notifications
You must be signed in to change notification settings - Fork 114
ref(kafka): Deduplicate kafka error logging #5811
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9d18102
ac38441
12a82b3
9b68c19
bec955e
8c29180
8fc7b3f
fa94546
5a5cfdc
323cc92
bfa5e14
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,7 +53,7 @@ pub enum ClientError { | |
|
|
||
| /// Failed to serialize the json message using serde. | ||
| #[error("failed to serialize json message")] | ||
| InvalidJson(#[source] serde_json::Error), | ||
| InvalidJson(#[from] serde_json::Error), | ||
|
|
||
| /// Failed to run schema validation on message. | ||
| #[cfg(debug_assertions)] | ||
|
|
@@ -214,6 +214,8 @@ impl Producer { | |
| return Err(ClientError::MissingTopic); | ||
| }; | ||
|
|
||
| relay_log::configure_scope(|s| s.set_tag("topic", topic_name)); | ||
|
|
||
|
Comment on lines
+217
to
+218
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Bug: [comment body missing from extraction]. Suggested Fix: Move the [details missing from extraction]. Prompt for AI Agent |
||
| let producer_name = producer.context().producer_name(); | ||
|
|
||
| metric!( | ||
|
|
@@ -272,12 +274,6 @@ impl Producer { | |
| }); | ||
|
|
||
| producer.send(record).map_err(|(error, _message)| { | ||
| relay_log::error!( | ||
| error = &error as &dyn std::error::Error, | ||
| tags.variant = variant, | ||
| tags.topic = topic_name, | ||
| "error sending kafka message", | ||
| ); | ||
| metric!( | ||
| counter(KafkaCounters::ProducerEnqueueError) += 1, | ||
| variant = variant, | ||
|
|
@@ -362,20 +358,18 @@ impl KafkaClient { | |
| /// Sends the payload to the correct producer for the current topic. | ||
| /// | ||
| /// Returns the name of the Kafka topic to which the message was produced. | ||
| pub fn send( | ||
| fn send( | ||
| &self, | ||
| topic: KafkaTopic, | ||
| key: Option<Key>, | ||
| headers: Option<&BTreeMap<String, String>>, | ||
| variant: &str, | ||
| payload: &[u8], | ||
| ) -> Result<&str, ClientError> { | ||
| let producer = self.producers.get(&topic).ok_or_else(|| { | ||
| relay_log::error!( | ||
| "attempted to send message to {topic:?} using an unconfigured kafka producer", | ||
| ); | ||
| ClientError::InvalidTopicName | ||
| })?; | ||
| let producer = self | ||
| .producers | ||
| .get(&topic) | ||
| .ok_or_else(|| ClientError::InvalidTopicName)?; | ||
|
|
||
| producer.send(key, headers, variant, payload) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,7 +28,7 @@ use relay_dynamic_config::Feature; | |
| use relay_event_schema::protocol::{ClientReport, DiscardedEvent, EventId}; | ||
| use relay_filter::FilterStatKey; | ||
| #[cfg(feature = "processing")] | ||
| use relay_kafka::{ClientError, KafkaClient, KafkaTopic}; | ||
| use relay_kafka::{ClientError, KafkaClient, KafkaTopic, SerializationOutput}; | ||
| use relay_quotas::{DataCategory, ReasonCode, Scoping}; | ||
| use relay_sampling::config::RuleId; | ||
| use relay_sampling::evaluation::MatchedRuleIds; | ||
|
|
@@ -904,16 +904,6 @@ impl FromMessage<Self> for TrackRawOutcome { | |
| } | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| #[cfg(feature = "processing")] | ||
| #[cfg_attr(feature = "processing", derive(thiserror::Error))] | ||
| pub enum OutcomeError { | ||
| #[error("failed to send kafka message")] | ||
| SendFailed(ClientError), | ||
| #[error("json serialization error")] | ||
| SerializationError(serde_json::Error), | ||
| } | ||
|
|
||
| /// Outcome producer backend via HTTP as [`TrackRawOutcome`]. | ||
| #[derive(Debug)] | ||
| struct HttpOutcomeProducer { | ||
|
|
@@ -1191,46 +1181,28 @@ enum OutcomeBroker { | |
|
|
||
| impl OutcomeBroker { | ||
| fn handle_message(&self, message: OutcomeProducer, config: &Config) { | ||
| match message { | ||
| OutcomeProducer::TrackOutcome(msg) => self.handle_track_outcome(msg, config), | ||
| OutcomeProducer::TrackRawOutcome(msg) => self.handle_track_raw_outcome(msg), | ||
| } | ||
| relay_log::with_scope( | ||
| |_| {}, | ||
|
Comment on lines
+1184
to
+1185
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What's the point of the empty scope config here — does it reset the scope somehow, or is that just a leftover?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wanted to keep the topic name on the error, but it is not known at this point. This call creates a new scope without any configuration.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh I see, you need a new scope, to not "dirty" the surrounding one with the inner |
||
| || match message { | ||
| OutcomeProducer::TrackOutcome(msg) => self.handle_track_outcome(msg, config), | ||
| OutcomeProducer::TrackRawOutcome(msg) => self.handle_track_raw_outcome(msg), | ||
| }, | ||
| ) | ||
| } | ||
|
|
||
| #[cfg(feature = "processing")] | ||
| fn send_kafka_message( | ||
| &self, | ||
| producer: &KafkaOutcomesProducer, | ||
| message: TrackRawOutcome, | ||
| ) -> Result<(), OutcomeError> { | ||
| fn send_kafka_message(&self, producer: &KafkaOutcomesProducer, message: TrackRawOutcome) { | ||
| relay_log::trace!("Tracking kafka outcome: {message:?}"); | ||
|
|
||
| let payload = serde_json::to_string(&message).map_err(OutcomeError::SerializationError)?; | ||
|
|
||
| // At the moment, we support outcomes with optional EventId. | ||
| // Here we create a fake EventId, when we don't have the real one, so that we can | ||
| // create a kafka message key that spreads the events nicely over all the | ||
| // kafka consumer groups. | ||
| let key = message.event_id.unwrap_or_default().0; | ||
|
|
||
| // Dispatch to the correct topic and cluster based on the kind of outcome. | ||
| let topic = if message.is_billing() { | ||
| KafkaTopic::OutcomesBilling | ||
| } else { | ||
| KafkaTopic::Outcomes | ||
| }; | ||
|
|
||
| let result = producer.client.send( | ||
| topic, | ||
| Some(key.as_u128()), | ||
| None, | ||
| "outcome", | ||
| payload.as_bytes(), | ||
| ); | ||
|
|
||
| match result { | ||
| Ok(_) => Ok(()), | ||
| Err(kafka_error) => Err(OutcomeError::SendFailed(kafka_error)), | ||
| if let Err(error) = producer.client.send_message(topic, &message) { | ||
| relay_log::error!(error = &error as &dyn Error, "failed to produce outcome"); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1240,9 +1212,7 @@ impl OutcomeBroker { | |
| Self::Kafka(kafka_producer) => { | ||
| send_outcome_metric(&message, "kafka"); | ||
| let raw_message = TrackRawOutcome::from_outcome(message, config); | ||
| if let Err(error) = self.send_kafka_message(kafka_producer, raw_message) { | ||
| relay_log::error!(error = &error as &dyn Error, "failed to produce outcome"); | ||
| } | ||
| self.send_kafka_message(kafka_producer, raw_message); | ||
| } | ||
| Self::ClientReport(producer) => { | ||
| send_outcome_metric(&message, "client_report"); | ||
|
|
@@ -1261,9 +1231,7 @@ impl OutcomeBroker { | |
| #[cfg(feature = "processing")] | ||
| Self::Kafka(kafka_producer) => { | ||
| send_outcome_metric(&message, "kafka"); | ||
| if let Err(error) = self.send_kafka_message(kafka_producer, message) { | ||
| relay_log::error!(error = &error as &dyn Error, "failed to produce outcome"); | ||
| } | ||
| self.send_kafka_message(kafka_producer, message); | ||
| } | ||
| Self::Http(producer) => { | ||
| send_outcome_metric(&message, "http"); | ||
|
|
@@ -1275,6 +1243,31 @@ impl OutcomeBroker { | |
| } | ||
| } | ||
|
|
||
| #[cfg(feature = "processing")] | ||
| impl relay_kafka::Message for TrackRawOutcome { | ||
| fn key(&self) -> Option<relay_kafka::Key> { | ||
| // At the moment, we support outcomes with optional EventId. | ||
| // Here we create a fake EventId, when we don't have the real one, so that we can | ||
| // create a kafka message key that spreads the events nicely over all the | ||
| // kafka consumer groups. | ||
| let key = self.event_id.unwrap_or_default().0; | ||
| Some(key.as_u128()) | ||
| } | ||
|
|
||
| fn variant(&self) -> &'static str { | ||
| "outcome" | ||
| } | ||
|
|
||
| fn headers(&self) -> Option<&BTreeMap<String, String>> { | ||
| None | ||
| } | ||
|
|
||
| fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> { | ||
| let serialized = serde_json::to_vec(self)?; | ||
| Ok(SerializationOutput::Json(Cow::Owned(serialized))) | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| enum ProducerInner { | ||
| #[cfg(feature = "processing")] | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
The error is now logged higher up in the call stack, so we set the topic tag here as soon as it is known.