Skip to content

Commit 9dbeda0

Browse files
committed
sample Kafka + Avro producer & consumer
Signed-off-by: Emanuel Trandafir <emanueltrandafir1993@gmail.com>
1 parent 73fdcf9 commit 9dbeda0

15 files changed

Lines changed: 823 additions & 0 deletions

File tree

consumer_kafka_avro/pom.xml

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xmlns="http://maven.apache.org/POM/4.0.0"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>org.springframework.boot</groupId>
8+
<artifactId>spring-boot-starter-parent</artifactId>
9+
<version>4.0.1</version>
10+
<relativePath/> <!-- lookup parent from repository -->
11+
</parent>
12+
13+
<groupId>org.springframework.cloud</groupId>
14+
<artifactId>consumer_kafka_avro</artifactId>
15+
<version>5.0.2-SNAPSHOT</version>
16+
<packaging>jar</packaging>
17+
18+
<properties>
19+
<spring-boot.version>4.0.1</spring-boot.version>
20+
<kafka-avro-serializer.version>8.1.1</kafka-avro-serializer.version>
21+
<spring-cloud.version>2025.1.0</spring-cloud.version>
22+
<testcontainers.version>1.20.4</testcontainers.version>
23+
<avro.version>1.12.0</avro.version>
24+
</properties>
25+
26+
<repositories>
27+
<repository>
28+
<id>confluent</id>
29+
<name>Confluent Maven Repository</name>
30+
<url>https://packages.confluent.io/maven/</url>
31+
</repository>
32+
<repository>
33+
<id>spring-snapshots</id>
34+
<name>Spring Snapshots</name>
35+
<url>https://repo.spring.io/snapshot</url>
36+
<snapshots>
37+
<enabled>true</enabled>
38+
</snapshots>
39+
</repository>
40+
</repositories>
41+
42+
<pluginRepositories>
43+
<pluginRepository>
44+
<id>spring-snapshots</id>
45+
<name>Spring Snapshots</name>
46+
<url>https://repo.spring.io/snapshot</url>
47+
<snapshots>
48+
<enabled>true</enabled>
49+
</snapshots>
50+
</pluginRepository>
51+
<pluginRepository>
52+
<id>spring-plugin-snapshots</id>
53+
<name>Spring Snapshots</name>
54+
<url>https://repo.spring.io/snapshot</url>
55+
<snapshots>
56+
<enabled>true</enabled>
57+
</snapshots>
58+
</pluginRepository>
59+
</pluginRepositories>
60+
61+
<dependencyManagement>
62+
<dependencies>
63+
<dependency>
64+
<groupId>org.springframework.cloud</groupId>
65+
<artifactId>spring-cloud-dependencies</artifactId>
66+
<version>${spring-cloud.version}</version>
67+
<type>pom</type>
68+
<scope>import</scope>
69+
</dependency>
70+
<dependency>
71+
<groupId>org.testcontainers</groupId>
72+
<artifactId>testcontainers-bom</artifactId>
73+
<version>1.20.4</version>
74+
<type>pom</type>
75+
<scope>import</scope>
76+
</dependency>
77+
<dependency>
78+
<groupId>org.springframework.boot</groupId>
79+
<artifactId>spring-boot-starter-kafka</artifactId>
80+
<version>4.0.1</version>
81+
</dependency>
82+
<dependency>
83+
<groupId>io.confluent</groupId>
84+
<artifactId>kafka-avro-serializer</artifactId>
85+
<version>${kafka-avro-serializer.version}</version>
86+
</dependency>
87+
</dependencies>
88+
</dependencyManagement>
89+
90+
<dependencies>
91+
<dependency>
92+
<groupId>org.apache.avro</groupId>
93+
<artifactId>avro</artifactId>
94+
<version>${avro.version}</version>
95+
</dependency>
96+
<dependency>
97+
<groupId>org.springframework.boot</groupId>
98+
<artifactId>spring-boot-starter</artifactId>
99+
</dependency>
100+
<dependency>
101+
<groupId>org.springframework.boot</groupId>
102+
<artifactId>spring-boot-starter-kafka</artifactId>
103+
</dependency>
104+
<dependency>
105+
<groupId>io.confluent</groupId>
106+
<artifactId>kafka-avro-serializer</artifactId>
107+
</dependency>
108+
109+
<dependency>
110+
<groupId>org.springframework.boot</groupId>
111+
<artifactId>spring-boot-starter-test</artifactId>
112+
<scope>test</scope>
113+
</dependency>
114+
<dependency>
115+
<groupId>org.springframework.cloud</groupId>
116+
<artifactId>spring-cloud-starter-contract-stub-runner</artifactId>
117+
<scope>test</scope>
118+
</dependency>
119+
120+
<dependency>
121+
<groupId>org.testcontainers</groupId>
122+
<artifactId>testcontainers</artifactId>
123+
<scope>test</scope>
124+
</dependency>
125+
<dependency>
126+
<groupId>org.testcontainers</groupId>
127+
<artifactId>junit-jupiter</artifactId>
128+
<scope>test</scope>
129+
</dependency>
130+
<dependency>
131+
<groupId>org.testcontainers</groupId>
132+
<artifactId>kafka</artifactId>
133+
<scope>test</scope>
134+
</dependency>
135+
</dependencies>
136+
137+
<build>
138+
<pluginManagement>
139+
<plugins>
140+
<plugin>
141+
<groupId>org.springframework.boot</groupId>
142+
<artifactId>spring-boot-maven-plugin</artifactId>
143+
<version>4.0.1</version>
144+
</plugin>
145+
</plugins>
146+
</pluginManagement>
147+
<plugins>
148+
<plugin>
149+
<groupId>org.apache.avro</groupId>
150+
<artifactId>avro-maven-plugin</artifactId>
151+
<version>1.11.3</version>
152+
<executions>
153+
<execution>
154+
<phase>generate-sources</phase>
155+
<goals>
156+
<goal>schema</goal>
157+
</goals>
158+
<configuration>
159+
<sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
160+
<outputDirectory>${project.build.directory}/generated-sources/avro</outputDirectory>
161+
<stringType>String</stringType>
162+
</configuration>
163+
</execution>
164+
</executions>
165+
</plugin>
166+
<plugin>
167+
<groupId>org.springframework.boot</groupId>
168+
<artifactId>spring-boot-maven-plugin</artifactId>
169+
</plugin>
170+
</plugins>
171+
</build>
172+
</project>
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.example.kafka.consumer;
2+
3+
import com.example.kafka.avro.Book;
4+
5+
import org.springframework.kafka.annotation.KafkaListener;
6+
import org.springframework.stereotype.Component;
7+
8+
@Component
9+
class BooksReturnedListener {
10+
11+
private final EmailService emailService;
12+
13+
BooksReturnedListener(EmailService emailService) {
14+
this.emailService = emailService;
15+
}
16+
17+
@KafkaListener(topics = "book.returned")
18+
public void sendEmailOnBookReturned(Book book) {
19+
String emailBody = """
20+
Dear User,
21+
22+
The book you borrowed has been successfully returned:
23+
Title: %s, Author: %s, ISBN: %s
24+
25+
""".formatted(book.getTitle(), book.getAuthor(), book.getIsbn());
26+
27+
emailService.sendEmail(emailBody);
28+
}
29+
30+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.example.kafka.consumer;
2+
3+
import org.springframework.stereotype.Service;
4+
5+
@Service
6+
public class EmailService {
7+
8+
public void sendEmail(String emailBody) {
9+
// Simulate sending an email
10+
System.out.println("Sending email:\n" + emailBody);
11+
}
12+
13+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.example.kafka.consumer;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class KafkaAvroConsumerApplication {
8+
9+
public static void main(String[] args) {
10+
SpringApplication.run(KafkaAvroConsumerApplication.class, args);
11+
}
12+
13+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
spring:
2+
application-name: kafka-avro-consumer
3+
kafka:
4+
bootstrap-servers: localhost:9092
5+
producer:
6+
key-serializer: org.apache.kafka.common.serialization.StringSerializer
7+
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
8+
consumer:
9+
group-id: kafka-avro-consumer-group
10+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
11+
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
12+
properties:
13+
specific.avro.reader: true
14+
properties:
15+
schema.registry.url: mock://
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"type": "record",
3+
"name": "Book",
4+
"namespace": "com.example.kafka.avro",
5+
"fields": [
6+
{
7+
"name": "isbn",
8+
"type": "string"
9+
},
10+
{
11+
"name": "title",
12+
"type": "string"
13+
},
14+
{
15+
"name": "author",
16+
"type": "string"
17+
}
18+
]
19+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package com.example.kafka.consumer;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
6+
import com.example.kafka.avro.Book;
7+
8+
import org.jetbrains.annotations.Nullable;
9+
import org.junit.jupiter.api.Tag;
10+
import org.junit.jupiter.api.Test;
11+
import org.junit.jupiter.api.extension.ExtendWith;
12+
import org.testcontainers.junit.jupiter.Container;
13+
import org.testcontainers.junit.jupiter.Testcontainers;
14+
import org.testcontainers.kafka.ConfluentKafkaContainer;
15+
import org.testcontainers.utility.DockerImageName;
16+
17+
import wiremock.com.fasterxml.jackson.core.JsonProcessingException;
18+
import wiremock.com.fasterxml.jackson.databind.json.JsonMapper;
19+
20+
import org.springframework.beans.factory.annotation.Autowired;
21+
import org.springframework.boot.test.context.SpringBootTest;
22+
import org.springframework.boot.test.system.OutputCaptureExtension;
23+
import org.springframework.cloud.contract.stubrunner.StubTrigger;
24+
import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
25+
import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
26+
import org.springframework.cloud.contract.verifier.converter.YamlContract;
27+
import org.springframework.cloud.contract.verifier.messaging.MessageVerifierSender;
28+
import org.springframework.context.annotation.Bean;
29+
import org.springframework.context.annotation.Configuration;
30+
import org.springframework.kafka.core.KafkaTemplate;
31+
import org.springframework.kafka.support.KafkaHeaders;
32+
import org.springframework.messaging.Message;
33+
import org.springframework.messaging.MessageHeaders;
34+
import org.springframework.messaging.support.MessageBuilder;
35+
import org.springframework.test.context.DynamicPropertyRegistry;
36+
import org.springframework.test.context.DynamicPropertySource;
37+
import org.springframework.test.context.bean.override.mockito.MockitoBean;
38+
39+
import static java.util.Collections.emptyMap;
40+
import static org.awaitility.Awaitility.await;
41+
import static org.mockito.ArgumentMatchers.contains;
42+
import static org.mockito.Mockito.verify;
43+
44+
@Tag("kafka-avro")
45+
@Testcontainers
46+
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {
47+
CollaborationTest.TestConfig.class, KafkaAvroConsumerApplication.class })
48+
@AutoConfigureStubRunner(ids = "org.springframework.cloud:spring-cloud-contract-sample-kafka-avro-producer:+:stubs", stubsMode = StubRunnerProperties.StubsMode.LOCAL)
49+
@ExtendWith(OutputCaptureExtension.class)
50+
class CollaborationTest {
51+
52+
@Autowired
53+
StubTrigger trigger;
54+
55+
@MockitoBean
56+
EmailService emailService;
57+
58+
@Container
59+
static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(
60+
DockerImageName.parse("confluentinc/cp-kafka"));
61+
62+
@DynamicPropertySource
63+
static void kafkaProperties(DynamicPropertyRegistry registry) {
64+
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
65+
}
66+
67+
@Test
68+
void shouldSendEmail_onBookReturned() {
69+
trigger.trigger("book_returned");
70+
71+
// @formatter:off
72+
await().untilAsserted(() ->
73+
verify(emailService).sendEmail(
74+
contains("Title: Contract Testing for Dummies, Author: John Doe, ISBN: 978-1234567890")));
75+
// @formatter:om
76+
}
77+
78+
@Configuration
79+
static class TestConfig {
80+
81+
@Bean
82+
MessageVerifierSender<Message<?>> standaloneMessageVerifier(KafkaTemplate<String, Object> kafkaTemplate) {
83+
return new KafkaAvroMessageVerifierSender<>(kafkaTemplate);
84+
}
85+
86+
}
87+
88+
static class KafkaAvroMessageVerifierSender<M> implements MessageVerifierSender<M> {
89+
90+
private final KafkaTemplate<String, Object> kafkaTemplate;
91+
92+
// TODO: should this be the default?
93+
@Override
94+
public void send(M message, String destination, @Nullable YamlContract contract) {
95+
send(message, emptyMap(), destination, contract);
96+
}
97+
98+
@Override
99+
public <T> void send(T payload, Map<String, Object> headers, String destination,
100+
@Nullable YamlContract contract) {
101+
Map<String, Object> newHeaders = headers != null ? new HashMap<>(headers) : new HashMap<>();
102+
newHeaders.put(KafkaHeaders.TOPIC, destination);
103+
MessageHeaders msgHeaders = new MessageHeaders(newHeaders);
104+
105+
try {
106+
// TODO: remove this workaround after merging:
107+
// https://github.com/spring-cloud/spring-cloud-contract/issues/2404
108+
Book avroPayload = new JsonMapper().readValue(payload.toString(), Book.class);
109+
var message = MessageBuilder.createMessage(avroPayload, msgHeaders);
110+
kafkaTemplate.send(message);
111+
}
112+
catch (JsonProcessingException e) {
113+
throw new RuntimeException(e);
114+
}
115+
}
116+
117+
KafkaAvroMessageVerifierSender(KafkaTemplate<String, Object> kafkaTemplate) {
118+
this.kafkaTemplate = kafkaTemplate;
119+
}
120+
121+
}
122+
123+
}

0 commit comments

Comments
 (0)