Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 106 additions & 6 deletions sentry-rails/lib/sentry/rails/active_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,59 @@
module Sentry
module Rails
module ActiveJobExtensions
SENTRY_PAYLOAD_KEY = "_sentry"

USER_FIELDS_WHITELIST = %w[id email username].freeze

def perform_now
if !Sentry.initialized? || already_supported_by_sentry_integration?
super
else
SentryReporter.record(self) do
super
SentryReporter.record(
self,
trace_headers: @_sentry_trace_headers,
user: @_sentry_user
) { super }
end
end

def serialize
payload = super
return payload if !Sentry.initialized? || already_supported_by_sentry_integration?

begin
sentry_data = {}
headers = Sentry.get_trace_propagation_headers
sentry_data["trace_propagation_headers"] = headers if headers && !headers.empty?

if Sentry.configuration.send_default_pii
user = Sentry.get_current_scope.user || {}
whitelisted = user.each_with_object({}) do |(k, v), acc|
acc[k.to_s] = v if USER_FIELDS_WHITELIST.include?(k.to_s)
end
sentry_data["user"] = whitelisted unless whitelisted.empty?
end

payload[SENTRY_PAYLOAD_KEY] = sentry_data unless sentry_data.empty?
rescue StandardError => e
Sentry.sdk_logger&.error("sentry-rails: failed to inject _sentry payload: #{e}")
end

payload
end

def deserialize(job_data)
super
return if !Sentry.initialized? || already_supported_by_sentry_integration?

begin
sentry_data = job_data[SENTRY_PAYLOAD_KEY]
return unless sentry_data

@_sentry_trace_headers = sentry_data["trace_propagation_headers"]
@_sentry_user = sentry_data["user"]
rescue StandardError => e
Sentry.sdk_logger&.error("sentry-rails: failed to extract _sentry payload: #{e}")
end
end

Expand All @@ -28,19 +74,53 @@ class SentryReporter
}

class << self
def record(job, &block)
def producer_callback_registered?
@producer_callback_registered ||= false
end

def producer_callback_registered!
@producer_callback_registered = true
end

def record_producer_span(job)
return yield if !Sentry.initialized? || job.already_supported_by_sentry_integration?

Sentry.with_child_span(op: "queue.publish", description: job.class.name) do |span|
if span
span.set_origin(SPAN_ORIGIN)
span.set_data(Sentry::Span::DataConventions::MESSAGING_MESSAGE_ID, job.job_id)
span.set_data(Sentry::Span::DataConventions::MESSAGING_DESTINATION_NAME, job.queue_name)
end
yield
end
end

def record(job, trace_headers: nil, user: nil, &block)
Sentry.clone_hub_to_current_thread if Thread.current != Thread.main

Sentry.with_scope do |scope|
begin
scope.set_user(user) if user && !user.empty?
scope.set_transaction_name(job.class.name, source: :task)

transaction = Sentry.start_transaction(
transaction_options = {
name: scope.transaction_name,
source: scope.transaction_source,
op: OP_NAME,
origin: SPAN_ORIGIN
)
}

transaction = if trace_headers && !trace_headers.empty?
continued = Sentry.continue_trace(trace_headers, **transaction_options)
Sentry.start_transaction(transaction: continued, **transaction_options)
else
Sentry.start_transaction(**transaction_options)
end

scope.set_span(transaction) if transaction
if transaction
set_messaging_data(transaction, job)
scope.set_span(transaction)
end

yield.tap do
finish_sentry_transaction(transaction, 200)
Expand All @@ -55,6 +135,26 @@ def record(job, &block)
end
end

def set_messaging_data(transaction, job)
transaction.set_data(Sentry::Span::DataConventions::MESSAGING_MESSAGE_ID, job.job_id)
transaction.set_data(Sentry::Span::DataConventions::MESSAGING_DESTINATION_NAME, job.queue_name)

if job.executions && job.executions > 1
transaction.set_data(Sentry::Span::DataConventions::MESSAGING_MESSAGE_RETRY_COUNT, job.executions - 1)
end

if (latency = compute_latency(job))
transaction.set_data(Sentry::Span::DataConventions::MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
end
end

def compute_latency(job)
return unless job.respond_to?(:enqueued_at) && job.enqueued_at

enqueued_time = job.enqueued_at.is_a?(String) ? Time.parse(job.enqueued_at) : job.enqueued_at
((Time.now.to_f - enqueued_time.to_f) * 1000).round
end

def capture_exception(job, e)
Sentry::Rails.capture_exception(
e,
Expand Down
7 changes: 7 additions & 0 deletions sentry-rails/lib/sentry/rails/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ class Railtie < ::Rails::Railtie
ActiveSupport.on_load(:active_job) do
require "sentry/rails/active_job"
prepend Sentry::Rails::ActiveJobExtensions

unless Sentry::Rails::ActiveJobExtensions::SentryReporter.producer_callback_registered?
around_enqueue do |job, block|
Sentry::Rails::ActiveJobExtensions::SentryReporter.record_producer_span(job, &block)
end
Sentry::Rails::ActiveJobExtensions::SentryReporter.producer_callback_registered!
end
end
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

RSpec.shared_examples "an ActiveJob backend that supports distributed tracing" do
it_behaves_like "an ActiveJob backend that emits a producer span on enqueue"
it_behaves_like "an ActiveJob backend that propagates trace context through the job payload"
it_behaves_like "an ActiveJob backend that records messaging span data on the consumer transaction"
it_behaves_like "an ActiveJob backend that propagates Sentry user context through job payloads"
it_behaves_like "an ActiveJob backend that isolates Sentry context per worker thread"
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# frozen_string_literal: true

RSpec.shared_examples "an ActiveJob backend that records messaging span data on the consumer transaction" do
include ActiveSupport::Testing::TimeHelpers

let(:successful_job) do
job_fixture do
def perform; end
end
end

let(:configure_sentry) { proc { |config| config.traces_sample_rate = 1.0 } }

it "records messaging.message.id and messaging.destination.name on the consumer transaction" do
successful_job.set(queue: "critical").perform_later
drain

data = consumer_transaction.contexts.dig(:trace, :data)
expect(data["messaging.message.id"]).to be_a(String).and(satisfy { |v| !v.empty? })
expect(data["messaging.destination.name"]).to eq("critical")
end

it "omits messaging.message.retry.count on the first execution" do
successful_job.perform_later
drain

data = consumer_transaction.contexts.dig(:trace, :data)
expect(data).not_to have_key("messaging.message.retry.count")
end

it "records messaging.message.retry.count = executions - 1 on retried executions" do
klass = job_fixture do
def perform; end
end

allow_any_instance_of(klass).to receive(:executions).and_return(3)

klass.perform_later
drain

data = consumer_transaction.contexts.dig(:trace, :data)
expect(data["messaging.message.retry.count"]).to eq(2)
end

it "records messaging.message.receive.latency in milliseconds", skip: RAILS_VERSION < 6.1 do
successful_job.perform_later

# Older Rails versions truncate Time.now to whole seconds inside `travel`
# (no `with_usec:` option until 7.0+), so the measured latency can be up
# to ~999ms below the travel delta. Widen the tolerance accordingly.
if RAILS_VERSION > 7.0
travel(5.seconds, with_usec: true) { drain }
tolerance = 50
else
travel(5.seconds) { drain }
tolerance = 1100
end

latency = consumer_transaction.contexts.dig(:trace, :data, "messaging.message.receive.latency")
expect(latency).to be_a(Integer)
expect(latency).to be_within(tolerance).of(5_000)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# frozen_string_literal: true

RSpec.shared_examples "an ActiveJob backend that emits a producer span on enqueue" do
let(:successful_job) do
job_fixture do
def perform; end
end
end

context "with traces_sample_rate = 1.0" do
let(:configure_sentry) { proc { |config| config.traces_sample_rate = 1.0 } }

it "adds a queue.publish child span to the active parent transaction" do
within_parent_transaction do
successful_job.set(queue: "events").perform_later
end

parent = transactions.find { |t| t.contexts.dig(:trace, :op) == "test" }
expect(parent).not_to be_nil

publish_span = parent.spans.find { |s| s[:op] == "queue.publish" }
expect(publish_span).not_to be_nil
expect(publish_span[:description]).to eq(successful_job.name)
expect(publish_span[:origin]).to eq("auto.queue.active_job")
expect(publish_span[:data]["messaging.message.id"]).to be_a(String).and(satisfy { |v| !v.empty? })
expect(publish_span[:data]["messaging.destination.name"]).to eq("events")
expect(publish_span[:timestamp]).not_to be_nil
end

it "does not raise or capture an orphan span when no parent transaction is active" do
expect { successful_job.perform_later }.not_to raise_error

orphan_publish = transactions.flat_map(&:spans).find { |s| s[:op] == "queue.publish" }
expect(orphan_publish).to be_nil
end
end

context "with traces_sample_rate = 0" do
let(:configure_sentry) { proc { |config| config.traces_sample_rate = 0 } }

it "does not capture a queue.publish span" do
within_parent_transaction do
successful_job.perform_later
end

publish_spans = transactions.flat_map(&:spans).select { |s| s[:op] == "queue.publish" }
expect(publish_spans).to be_empty
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# frozen_string_literal: true

RSpec.shared_examples "an ActiveJob backend that propagates trace context through the job payload" do
let(:successful_job) do
job_fixture do
def perform; end
end
end

let(:configure_sentry) { proc { |config| config.traces_sample_rate = 1.0 } }

it "produces a consumer transaction whose trace_id matches the parent transaction" do
parent_trace_id = nil
publish_span_id = nil

within_parent_transaction do |parent|
parent_trace_id = parent.trace_id
successful_job.perform_later
publish_span_id = parent.span_recorder.spans.find { |s| s.op == "queue.publish" }&.span_id
end

drain

expect(consumer_transaction).not_to be_nil
expect(consumer_transaction.contexts.dig(:trace, :trace_id)).to eq(parent_trace_id)
expect(consumer_transaction.contexts.dig(:trace, :parent_span_id)).to eq(publish_span_id)
end

it "captures a consumer transaction without raising when no parent transaction was active at enqueue" do
expect { successful_job.perform_later }.not_to raise_error
expect { drain }.not_to raise_error

expect(consumer_transaction).not_to be_nil
expect(consumer_transaction.contexts.dig(:trace, :trace_id)).to be_a(String)
end

it "survives a JSON round-trip of the serialized payload" do
parent_trace_id = nil

within_parent_transaction do |parent|
parent_trace_id = parent.trace_id
payload = successful_job.new.serialize
round_tripped = JSON.parse(JSON.generate(payload))
::ActiveJob::Base.execute(round_tripped)
end

expect(consumer_transaction).not_to be_nil
expect(consumer_transaction.contexts.dig(:trace, :trace_id)).to eq(parent_trace_id)
end
end
Loading
Loading