diff --git a/tutorials/data-generator/pom.xml b/tutorials/data-generator/pom.xml index 1504fd1..01a0d2f 100644 --- a/tutorials/data-generator/pom.xml +++ b/tutorials/data-generator/pom.xml @@ -14,6 +14,7 @@ UTF-8 17 + 2.0.17 @@ -27,6 +28,16 @@ apicurio-registry-serdes-avro-serde ${apicurio-registry.version} + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.slf4j + slf4j-simple + ${slf4j.version} + diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java index 008eb9f..fca3649 100644 --- a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java @@ -16,27 +16,17 @@ import java.util.function.Function; import java.util.function.Supplier; -import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; -import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; -import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG; public class DataGenerator implements Runnable { - final static Map RETENTION_CONFIG = Collections.singletonMap(RETENTION_MS_CONFIG, String.valueOf(60 * 60 * 1000)); // 1 hour - final static int KAFKA_ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG = 5000; - final static String KAFKA_ADMIN_CLIENT_ID_CONFIG = "data-generator-admin-client"; - final String bootstrapServers; + static final Map RETENTION_CONFIG = Collections.singletonMap(RETENTION_MS_CONFIG, String.valueOf(60 * 60 * 1000)); // 1 hour final List dataTypes; final Properties kafkaAdminProps; - public DataGenerator(String bootstrapServers, List dataTypes) { - this.bootstrapServers = bootstrapServers; + public DataGenerator(List dataTypes) { this.dataTypes = dataTypes; - kafkaAdminProps = new Properties(); - kafkaAdminProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - kafkaAdminProps.put(CLIENT_ID_CONFIG, KAFKA_ADMIN_CLIENT_ID_CONFIG); - kafkaAdminProps.put(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG)); + kafkaAdminProps = KafkaAdminProps.get(); } @Override @@ -45,10 +35,10 @@ public void run() { if (Boolean.parseBoolean(System.getenv("USE_APICURIO_REGISTRY"))) { String registryUrl = System.getenv("REGISTRY_URL"); - Producer producer = new KafkaProducer<>(KafkaClientProps.avro(bootstrapServers, registryUrl)); + Producer producer = new KafkaProducer<>(KafkaClientProps.avro(registryUrl)); send(producer, () -> generateTopicRecords(this::generateAvroRecord)); } else { - Producer producer = new KafkaProducer<>(KafkaClientProps.csv(bootstrapServers)); + Producer producer = new KafkaProducer<>(KafkaClientProps.csv()); send(producer, () -> generateTopicRecords(this::generateCsvRecord)); } } diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaAdminProps.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaAdminProps.java new file mode 100644 index 0000000..1917bfc --- /dev/null +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaAdminProps.java @@ -0,0 +1,11 @@ +package com.github.streamshub.kafka.data.generator; + +import java.util.Properties; + +public class KafkaAdminProps { + static final String KAFKA_CLIENT_ID_CONFIG = "data-generator-admin-client"; + + public static Properties get() { + return KafkaCommonProps.get(KAFKA_CLIENT_ID_CONFIG); + } +} diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaClientProps.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaClientProps.java index 9b5bd8d..43c4f22 100644 --- a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaClientProps.java +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaClientProps.java @@ -8,15 +8,17 @@ import java.util.Properties; public class KafkaClientProps { - public static Properties csv(String bootstrapServers) { - Properties props = init(bootstrapServers); + static final String KAFKA_CLIENT_ID_CONFIG = "data-generator-client"; + + public static Properties csv() { + Properties props = KafkaCommonProps.get(KAFKA_CLIENT_ID_CONFIG); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return props; } - public static Properties avro(String bootstrapServers, String registryUrl) { - Properties props = init(bootstrapServers); + public static Properties avro(String registryUrl) { + Properties props = KafkaCommonProps.get(KAFKA_CLIENT_ID_CONFIG); props.put("value.serializer", AvroKafkaSerializer.class.getName()); props.put(SerdeConfig.REGISTRY_URL, registryUrl); @@ -28,13 +30,4 @@ public static Properties avro(String bootstrapServers, String registryUrl) { return props; } - - private static Properties init(String bootstrapServer) { - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServer); - - props.put("key.serializer", - "org.apache.kafka.common.serialization.StringSerializer"); - return props; - } } diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaCommonProps.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaCommonProps.java new file mode 100644 index 0000000..4e1ab38 --- /dev/null +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaCommonProps.java @@ -0,0 +1,37 @@ +package com.github.streamshub.kafka.data.generator; + +import java.util.Properties; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG; + +public class KafkaCommonProps { + static final int KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG = 5000; + + public static Properties get(String clientId) { + Properties props = new Properties(); + props.put(BOOTSTRAP_SERVERS_CONFIG, System.getenv("KAFKA_BOOTSTRAP_SERVERS")); + props.put(CLIENT_ID_CONFIG, clientId); + props.put(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG)); + + String securityProtocol = System.getenv("KAFKA_SECURITY_PROTOCOL"); + + if (securityProtocol != null && securityProtocol.equals("SSL")) { + props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol); + props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, System.getenv("KAFKA_SSL_TRUSTSTORE_LOCATION")); + props.put(SSL_TRUSTSTORE_TYPE_CONFIG, System.getenv("KAFKA_SSL_TRUSTSTORE_TYPE")); + props.put(SSL_KEYSTORE_LOCATION_CONFIG, System.getenv("KAFKA_SSL_KEYSTORE_LOCATION")); + props.put(SSL_KEYSTORE_PASSWORD_CONFIG, System.getenv("KAFKA_SSL_KEYSTORE_PASSWORD")); + } + + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + return props; + } +} diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Main.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Main.java index 03cc566..85081a4 100644 --- a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Main.java +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Main.java @@ -9,11 +9,10 @@ public class Main { public static void main(String[] args) { - String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS"); List dataTypes = Arrays.stream(System.getenv("DATA").split(",")) .map(Main::getDataClass) .toList(); - Thread dgThread = new Thread(new DataGenerator(bootstrapServers, dataTypes)); + Thread dgThread = new Thread(new DataGenerator(dataTypes)); dgThread.start(); }