diff --git a/tutorials/data-generator/pom.xml b/tutorials/data-generator/pom.xml
index 1504fd1..01a0d2f 100644
--- a/tutorials/data-generator/pom.xml
+++ b/tutorials/data-generator/pom.xml
@@ -14,6 +14,7 @@
UTF-8
17
+ 2.0.17
@@ -27,6 +28,16 @@
apicurio-registry-serdes-avro-serde
${apicurio-registry.version}
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+
+
+ org.slf4j
+ slf4j-simple
+ ${slf4j.version}
+
diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java
index 008eb9f..fca3649 100644
--- a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java
+++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java
@@ -16,27 +16,17 @@
import java.util.function.Function;
import java.util.function.Supplier;
-import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
-import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG;
public class DataGenerator implements Runnable {
- final static Map RETENTION_CONFIG = Collections.singletonMap(RETENTION_MS_CONFIG, String.valueOf(60 * 60 * 1000)); // 1 hour
- final static int KAFKA_ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG = 5000;
- final static String KAFKA_ADMIN_CLIENT_ID_CONFIG = "data-generator-admin-client";
- final String bootstrapServers;
+ static final Map RETENTION_CONFIG = Collections.singletonMap(RETENTION_MS_CONFIG, String.valueOf(60 * 60 * 1000)); // 1 hour
final List dataTypes;
final Properties kafkaAdminProps;
- public DataGenerator(String bootstrapServers, List dataTypes) {
- this.bootstrapServers = bootstrapServers;
+ public DataGenerator(List dataTypes) {
this.dataTypes = dataTypes;
- kafkaAdminProps = new Properties();
- kafkaAdminProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- kafkaAdminProps.put(CLIENT_ID_CONFIG, KAFKA_ADMIN_CLIENT_ID_CONFIG);
- kafkaAdminProps.put(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG));
+ kafkaAdminProps = KafkaAdminProps.get();
}
@Override
@@ -45,10 +35,10 @@ public void run() {
if (Boolean.parseBoolean(System.getenv("USE_APICURIO_REGISTRY"))) {
String registryUrl = System.getenv("REGISTRY_URL");
- Producer producer = new KafkaProducer<>(KafkaClientProps.avro(bootstrapServers, registryUrl));
+ Producer producer = new KafkaProducer<>(KafkaClientProps.avro(registryUrl));
send(producer, () -> generateTopicRecords(this::generateAvroRecord));
} else {
- Producer producer = new KafkaProducer<>(KafkaClientProps.csv(bootstrapServers));
+ Producer producer = new KafkaProducer<>(KafkaClientProps.csv());
send(producer, () -> generateTopicRecords(this::generateCsvRecord));
}
}
diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaAdminProps.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaAdminProps.java
new file mode 100644
index 0000000..1917bfc
--- /dev/null
+++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaAdminProps.java
@@ -0,0 +1,11 @@
+package com.github.streamshub.kafka.data.generator;
+
+import java.util.Properties;
+
+public class KafkaAdminProps {
+ static final String KAFKA_CLIENT_ID_CONFIG = "data-generator-admin-client";
+
+ public static Properties get() {
+ return KafkaCommonProps.get(KAFKA_CLIENT_ID_CONFIG);
+ }
+}
diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaClientProps.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaClientProps.java
index 9b5bd8d..43c4f22 100644
--- a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaClientProps.java
+++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaClientProps.java
@@ -8,15 +8,17 @@
import java.util.Properties;
public class KafkaClientProps {
- public static Properties csv(String bootstrapServers) {
- Properties props = init(bootstrapServers);
+ static final String KAFKA_CLIENT_ID_CONFIG = "data-generator-client";
+
+ public static Properties csv() {
+ Properties props = KafkaCommonProps.get(KAFKA_CLIENT_ID_CONFIG);
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
return props;
}
- public static Properties avro(String bootstrapServers, String registryUrl) {
- Properties props = init(bootstrapServers);
+ public static Properties avro(String registryUrl) {
+ Properties props = KafkaCommonProps.get(KAFKA_CLIENT_ID_CONFIG);
props.put("value.serializer", AvroKafkaSerializer.class.getName());
props.put(SerdeConfig.REGISTRY_URL, registryUrl);
@@ -28,13 +30,4 @@ public static Properties avro(String bootstrapServers, String registryUrl) {
return props;
}
-
- private static Properties init(String bootstrapServer) {
- Properties props = new Properties();
- props.put("bootstrap.servers", bootstrapServer);
-
- props.put("key.serializer",
- "org.apache.kafka.common.serialization.StringSerializer");
- return props;
- }
}
diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaCommonProps.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaCommonProps.java
new file mode 100644
index 0000000..4e1ab38
--- /dev/null
+++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaCommonProps.java
@@ -0,0 +1,37 @@
+package com.github.streamshub.kafka.data.generator;
+
+import java.util.Properties;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
+import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
+
+public class KafkaCommonProps {
+ static final int KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG = 5000;
+
+ public static Properties get(String clientId) {
+ Properties props = new Properties();
+ props.put(BOOTSTRAP_SERVERS_CONFIG, System.getenv("KAFKA_BOOTSTRAP_SERVERS"));
+ props.put(CLIENT_ID_CONFIG, clientId);
+ props.put(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG));
+
+ String securityProtocol = System.getenv("KAFKA_SECURITY_PROTOCOL");
+
+ if (securityProtocol != null && securityProtocol.equals("SSL")) {
+ props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
+ props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, System.getenv("KAFKA_SSL_TRUSTSTORE_LOCATION"));
+ props.put(SSL_TRUSTSTORE_TYPE_CONFIG, System.getenv("KAFKA_SSL_TRUSTSTORE_TYPE"));
+ props.put(SSL_KEYSTORE_LOCATION_CONFIG, System.getenv("KAFKA_SSL_KEYSTORE_LOCATION"));
+ props.put(SSL_KEYSTORE_PASSWORD_CONFIG, System.getenv("KAFKA_SSL_KEYSTORE_PASSWORD"));
+ }
+
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+ return props;
+ }
+}
diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Main.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Main.java
index 03cc566..85081a4 100644
--- a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Main.java
+++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Main.java
@@ -9,11 +9,10 @@
public class Main {
public static void main(String[] args) {
- String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
List dataTypes = Arrays.stream(System.getenv("DATA").split(","))
.map(Main::getDataClass)
.toList();
- Thread dgThread = new Thread(new DataGenerator(bootstrapServers, dataTypes));
+ Thread dgThread = new Thread(new DataGenerator(dataTypes));
dgThread.start();
}