Merged
22 changes: 8 additions & 14 deletions relay-kafka/src/producer/mod.rs
@@ -53,7 +53,7 @@ pub enum ClientError {

/// Failed to serialize the json message using serde.
#[error("failed to serialize json message")]
InvalidJson(#[source] serde_json::Error),
InvalidJson(#[from] serde_json::Error),

/// Failed to run schema validation on message.
#[cfg(debug_assertions)]
@@ -214,6 +214,8 @@ impl Producer {
return Err(ClientError::MissingTopic);
};

relay_log::configure_scope(|s| s.set_tag("topic", topic_name));
Member Author
The error is now logged higher up in the call stack, so we set the topic tag here as soon as it is known.


Comment on lines +217 to +218

Bug: The configure_scope call for Kafka producers now pollutes the Sentry scope with a topic tag on every message, leading to misleading context in unrelated error logs.
Severity: LOW

Suggested Fix

Move the relay_log::configure_scope call so that it only executes on error paths, similar to how it was handled previously. This ensures the topic tag is only added to the scope when a Kafka-related error is actually being logged, preventing scope pollution for successful operations.

Prompt for AI Agent

Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.

Location: relay-kafka/src/producer/mod.rs#L217-L218

Potential issue: The `configure_scope` function is now called unconditionally for every
Kafka message being produced, setting a `topic` tag on the thread-local Sentry scope.
This happens even for successful messages. If a subsequent, unrelated error occurs on
the same thread, it will be logged with this potentially misleading `topic` tag from the
last successful Kafka operation. This pollutes the scope and can make debugging more
difficult by providing incorrect context in Sentry error reports.

let producer_name = producer.context().producer_name();

metric!(
@@ -272,12 +274,6 @@ impl Producer {
});

producer.send(record).map_err(|(error, _message)| {
relay_log::error!(
error = &error as &dyn std::error::Error,
tags.variant = variant,
tags.topic = topic_name,
"error sending kafka message",
);
metric!(
counter(KafkaCounters::ProducerEnqueueError) += 1,
variant = variant,
@@ -362,20 +358,18 @@ impl KafkaClient {
/// Sends the payload to the correct producer for the current topic.
///
/// Returns the name of the Kafka topic to which the message was produced.
pub fn send(
fn send(
&self,
topic: KafkaTopic,
key: Option<Key>,
headers: Option<&BTreeMap<String, String>>,
variant: &str,
payload: &[u8],
) -> Result<&str, ClientError> {
let producer = self.producers.get(&topic).ok_or_else(|| {
relay_log::error!(
"attempted to send message to {topic:?} using an unconfigured kafka producer",
);
ClientError::InvalidTopicName
})?;
let producer = self
.producers
.get(&topic)
.ok_or_else(|| ClientError::InvalidTopicName)?;

producer.send(key, headers, variant, payload)
}
83 changes: 38 additions & 45 deletions relay-server/src/services/outcome.rs
@@ -28,7 +28,7 @@ use relay_dynamic_config::Feature;
use relay_event_schema::protocol::{ClientReport, DiscardedEvent, EventId};
use relay_filter::FilterStatKey;
#[cfg(feature = "processing")]
use relay_kafka::{ClientError, KafkaClient, KafkaTopic};
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, SerializationOutput};
use relay_quotas::{DataCategory, ReasonCode, Scoping};
use relay_sampling::config::RuleId;
use relay_sampling::evaluation::MatchedRuleIds;
@@ -904,16 +904,6 @@ impl FromMessage<Self> for TrackRawOutcome {
}
}

#[derive(Debug)]
#[cfg(feature = "processing")]
#[cfg_attr(feature = "processing", derive(thiserror::Error))]
pub enum OutcomeError {
#[error("failed to send kafka message")]
SendFailed(ClientError),
#[error("json serialization error")]
SerializationError(serde_json::Error),
}

/// Outcome producer backend via HTTP as [`TrackRawOutcome`].
#[derive(Debug)]
struct HttpOutcomeProducer {
@@ -1191,46 +1181,28 @@ enum OutcomeBroker {

impl OutcomeBroker {
fn handle_message(&self, message: OutcomeProducer, config: &Config) {
match message {
OutcomeProducer::TrackOutcome(msg) => self.handle_track_outcome(msg, config),
OutcomeProducer::TrackRawOutcome(msg) => self.handle_track_raw_outcome(msg),
}
relay_log::with_scope(
|_| {},
Comment on lines +1184 to +1185
Member
What's the point of the empty scope config here? Does it reset the scope somehow, or is it just a leftover?

Member Author
I wanted to keep the topic name on the error, but it is not known at this point. This call creates a new scope without any configuration. configure_scope is then called in produce() further down the call stack to set the topic name.

Member
Oh I see, you need a new scope so the inner configure_scope doesn't "dirty" the surrounding one.

|| match message {
OutcomeProducer::TrackOutcome(msg) => self.handle_track_outcome(msg, config),
OutcomeProducer::TrackRawOutcome(msg) => self.handle_track_raw_outcome(msg),
},
)
}

#[cfg(feature = "processing")]
fn send_kafka_message(
&self,
producer: &KafkaOutcomesProducer,
message: TrackRawOutcome,
) -> Result<(), OutcomeError> {
fn send_kafka_message(&self, producer: &KafkaOutcomesProducer, message: TrackRawOutcome) {
relay_log::trace!("Tracking kafka outcome: {message:?}");

let payload = serde_json::to_string(&message).map_err(OutcomeError::SerializationError)?;

// At the moment, we support outcomes with optional EventId.
// Here we create a fake EventId, when we don't have the real one, so that we can
// create a kafka message key that spreads the events nicely over all the
// kafka consumer groups.
let key = message.event_id.unwrap_or_default().0;

// Dispatch to the correct topic and cluster based on the kind of outcome.
let topic = if message.is_billing() {
KafkaTopic::OutcomesBilling
} else {
KafkaTopic::Outcomes
};

let result = producer.client.send(
topic,
Some(key.as_u128()),
None,
"outcome",
payload.as_bytes(),
);

match result {
Ok(_) => Ok(()),
Err(kafka_error) => Err(OutcomeError::SendFailed(kafka_error)),
if let Err(error) = producer.client.send_message(topic, &message) {
relay_log::error!(error = &error as &dyn Error, "failed to produce outcome");
}
}

@@ -1240,9 +1212,7 @@ impl OutcomeBroker {
Self::Kafka(kafka_producer) => {
send_outcome_metric(&message, "kafka");
let raw_message = TrackRawOutcome::from_outcome(message, config);
if let Err(error) = self.send_kafka_message(kafka_producer, raw_message) {
relay_log::error!(error = &error as &dyn Error, "failed to produce outcome");
}
self.send_kafka_message(kafka_producer, raw_message);
}
Self::ClientReport(producer) => {
send_outcome_metric(&message, "client_report");
@@ -1261,9 +1231,7 @@
#[cfg(feature = "processing")]
Self::Kafka(kafka_producer) => {
send_outcome_metric(&message, "kafka");
if let Err(error) = self.send_kafka_message(kafka_producer, message) {
relay_log::error!(error = &error as &dyn Error, "failed to produce outcome");
}
self.send_kafka_message(kafka_producer, message);
}
Self::Http(producer) => {
send_outcome_metric(&message, "http");
Expand All @@ -1275,6 +1243,31 @@ impl OutcomeBroker {
}
}

#[cfg(feature = "processing")]
impl relay_kafka::Message for TrackRawOutcome {
fn key(&self) -> Option<relay_kafka::Key> {
// At the moment, we support outcomes with optional EventId.
// Here we create a fake EventId, when we don't have the real one, so that we can
// create a kafka message key that spreads the events nicely over all the
// kafka consumer groups.
let key = self.event_id.unwrap_or_default().0;
Some(key.as_u128())
}

fn variant(&self) -> &'static str {
"outcome"
}

fn headers(&self) -> Option<&BTreeMap<String, String>> {
None
}

fn serialize(&self) -> Result<SerializationOutput<'_>, ClientError> {
let serialized = serde_json::to_vec(self)?;
Ok(SerializationOutput::Json(Cow::Owned(serialized)))
}
}

#[derive(Debug)]
enum ProducerInner {
#[cfg(feature = "processing")]