Skip to content

Commit 271fb8b

Browse files
adinauerclaude
andcommitted
test(samples): Add Kafka queue system tests for Spring Boot 3
Add KafkaQueueSystemTest with e2e tests for: - Producer endpoint creates queue.publish span - Consumer creates queue.process transaction - Distributed tracing (producer and consumer share same trace) - Messaging attributes on publish span and process transaction Also add produceKafkaMessage to RestTestClient and enable sentry.enable-queue-tracing in the kafka profile properties. Requires a running Kafka broker at localhost:9092 and the sample app started with --spring.profiles.active=kafka. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 04a4689 commit 271fb8b

File tree

5 files changed

+143
-8
lines changed

5 files changed

+143
-8
lines changed

sentry-samples/sentry-samples-spring-boot-jakarta/src/main/resources/application-kafka.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
# Kafka — activate with: --spring.profiles.active=kafka
2+
sentry.enable-queue-tracing=true
3+
24
spring.autoconfigure.exclude=
35
spring.kafka.bootstrap-servers=localhost:9092
46
spring.kafka.consumer.group-id=sentry-sample-group
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package io.sentry.systemtest
2+
3+
import io.sentry.systemtest.util.TestHelper
4+
import kotlin.test.Test
5+
import kotlin.test.assertEquals
6+
import org.junit.Before
7+
8+
/**
9+
* System tests for Kafka queue instrumentation.
10+
*
11+
* Requires:
12+
* - The sample app running with `--spring.profiles.active=kafka`
13+
* - A Kafka broker at localhost:9092
14+
* - The mock Sentry server at localhost:8000
15+
*/
16+
class KafkaQueueSystemTest {
17+
lateinit var testHelper: TestHelper
18+
19+
@Before
20+
fun setup() {
21+
testHelper = TestHelper("http://localhost:8080")
22+
testHelper.reset()
23+
}
24+
25+
@Test
26+
fun `producer endpoint creates queue publish span`() {
27+
val restClient = testHelper.restClient
28+
29+
restClient.produceKafkaMessage("test-message")
30+
assertEquals(200, restClient.lastKnownStatusCode)
31+
32+
testHelper.ensureTransactionReceived { transaction, _ ->
33+
testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish")
34+
}
35+
}
36+
37+
@Test
38+
fun `consumer creates queue process transaction`() {
39+
val restClient = testHelper.restClient
40+
41+
restClient.produceKafkaMessage("test-consumer-message")
42+
assertEquals(200, restClient.lastKnownStatusCode)
43+
44+
// The consumer runs asynchronously, so wait for the queue.process transaction
45+
testHelper.ensureTransactionReceived { transaction, _ ->
46+
testHelper.doesTransactionHaveOp(transaction, "queue.process")
47+
}
48+
}
49+
50+
@Test
51+
fun `producer and consumer share same trace`() {
52+
val restClient = testHelper.restClient
53+
54+
restClient.produceKafkaMessage("trace-test-message")
55+
assertEquals(200, restClient.lastKnownStatusCode)
56+
57+
// Capture the trace ID from the producer transaction (has queue.publish span)
58+
var producerTraceId: String? = null
59+
testHelper.ensureTransactionReceived { transaction, _ ->
60+
if (testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish")) {
61+
producerTraceId = transaction.contexts.trace?.traceId?.toString()
62+
true
63+
} else {
64+
false
65+
}
66+
}
67+
68+
// Verify the consumer transaction has the same trace ID
69+
// Use retryCount=3 since the consumer may take a moment to process
70+
testHelper.ensureEnvelopeReceived(retryCount = 3) { envelopeString ->
71+
val envelope =
72+
testHelper.jsonSerializer.deserializeEnvelope(envelopeString.byteInputStream())
73+
?: return@ensureEnvelopeReceived false
74+
val txItem =
75+
envelope.items.firstOrNull { it.header.type == io.sentry.SentryItemType.Transaction }
76+
?: return@ensureEnvelopeReceived false
77+
val tx =
78+
txItem.getTransaction(testHelper.jsonSerializer) ?: return@ensureEnvelopeReceived false
79+
80+
tx.contexts.trace?.operation == "queue.process" &&
81+
tx.contexts.trace?.traceId?.toString() == producerTraceId
82+
}
83+
}
84+
85+
@Test
86+
fun `queue publish span has messaging attributes`() {
87+
val restClient = testHelper.restClient
88+
89+
restClient.produceKafkaMessage("attrs-test")
90+
assertEquals(200, restClient.lastKnownStatusCode)
91+
92+
testHelper.ensureTransactionReceived { transaction, _ ->
93+
val span = transaction.spans.firstOrNull { it.op == "queue.publish" }
94+
if (span == null) return@ensureTransactionReceived false
95+
96+
val data = span.data ?: return@ensureTransactionReceived false
97+
data["messaging.system"] == "kafka" && data["messaging.destination.name"] == "sentry-topic"
98+
}
99+
}
100+
101+
@Test
102+
fun `queue process transaction has messaging attributes`() {
103+
val restClient = testHelper.restClient
104+
105+
restClient.produceKafkaMessage("process-attrs-test")
106+
assertEquals(200, restClient.lastKnownStatusCode)
107+
108+
testHelper.ensureTransactionReceived { transaction, _ ->
109+
if (!testHelper.doesTransactionHaveOp(transaction, "queue.process")) {
110+
return@ensureTransactionReceived false
111+
}
112+
113+
val data = transaction.contexts.trace?.data ?: return@ensureTransactionReceived false
114+
data["messaging.system"] == "kafka" && data["messaging.destination.name"] == "sentry-topic"
115+
}
116+
}
117+
}

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,10 @@ public SentryKafkaRecordInterceptor(
5555
final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor");
5656
final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent();
5757

58-
continueTrace(forkedScopes, record);
58+
final @Nullable TransactionContext transactionContext = continueTrace(forkedScopes, record);
5959

60-
final @Nullable ITransaction transaction = startTransaction(forkedScopes, record);
60+
final @Nullable ITransaction transaction =
61+
startTransaction(forkedScopes, record, transactionContext);
6162
currentContext.set(new SentryRecordContext(lifecycleToken, transaction));
6263

6364
return delegateIntercept(record, consumer);
@@ -105,28 +106,35 @@ public void afterRecord(
105106
return record;
106107
}
107108

108-
private void continueTrace(
109+
private @Nullable TransactionContext continueTrace(
109110
final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
110111
final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER);
111112
final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER);
112113
final @Nullable List<String> baggageHeaders =
113114
baggage != null ? Collections.singletonList(baggage) : null;
114-
forkedScopes.continueTrace(sentryTrace, baggageHeaders);
115+
return forkedScopes.continueTrace(sentryTrace, baggageHeaders);
115116
}
116117

117118
private @Nullable ITransaction startTransaction(
118-
final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
119+
final @NotNull IScopes forkedScopes,
120+
final @NotNull ConsumerRecord<K, V> record,
121+
final @Nullable TransactionContext transactionContext) {
119122
if (!forkedScopes.getOptions().isTracingEnabled()) {
120123
return null;
121124
}
122125

126+
final @NotNull TransactionContext txContext =
127+
transactionContext != null
128+
? transactionContext
129+
: new TransactionContext("queue.process", "queue.process");
130+
txContext.setName("queue.process");
131+
txContext.setOperation("queue.process");
132+
123133
final @NotNull TransactionOptions txOptions = new TransactionOptions();
124134
txOptions.setOrigin(TRACE_ORIGIN);
125135
txOptions.setBindToScope(true);
126136

127-
final @NotNull ITransaction transaction =
128-
forkedScopes.startTransaction(
129-
new TransactionContext("queue.process", "queue.process"), txOptions);
137+
final @NotNull ITransaction transaction = forkedScopes.startTransaction(txContext, txOptions);
130138

131139
if (transaction.isNoOp()) {
132140
return null;

sentry-system-test-support/api/sentry-system-test-support.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,8 @@ public final class io/sentry/systemtest/util/RestTestClient : io/sentry/systemte
560560
public final fun getTodo (J)Lio/sentry/systemtest/Todo;
561561
public final fun getTodoRestClient (J)Lio/sentry/systemtest/Todo;
562562
public final fun getTodoWebclient (J)Lio/sentry/systemtest/Todo;
563+
public final fun produceKafkaMessage (Ljava/lang/String;)Ljava/lang/String;
564+
public static synthetic fun produceKafkaMessage$default (Lio/sentry/systemtest/util/RestTestClient;Ljava/lang/String;ILjava/lang/Object;)Ljava/lang/String;
563565
public final fun saveCachedTodo (Lio/sentry/systemtest/Todo;)Lio/sentry/systemtest/Todo;
564566
}
565567

sentry-system-test-support/src/main/kotlin/io/sentry/systemtest/util/RestTestClient.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ class RestTestClient(private val backendBaseUrl: String) : LoggingInsecureRestCl
8181
return response?.body?.string()
8282
}
8383

84+
fun produceKafkaMessage(message: String = "hello from sentry!"): String? {
85+
val request = Request.Builder().url("$backendBaseUrl/kafka/produce?message=$message")
86+
87+
return callTyped(request, true)
88+
}
89+
8490
fun getCountMetric(): String? {
8591
val request = Request.Builder().url("$backendBaseUrl/metric/count")
8692

0 commit comments

Comments
 (0)