Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions tutorials/data-generator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.release>17</maven.compiler.release>
<slf4j.version>2.0.17</slf4j.version>
</properties>

<dependencies>
Expand All @@ -27,6 +28,16 @@
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
<version>${apicurio-registry.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,17 @@
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG;

public class DataGenerator implements Runnable {
final static Map<String, String> RETENTION_CONFIG = Collections.singletonMap(RETENTION_MS_CONFIG, String.valueOf(60 * 60 * 1000)); // 1 hour
final static int KAFKA_ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG = 5000;
final static String KAFKA_ADMIN_CLIENT_ID_CONFIG = "data-generator-admin-client";
final String bootstrapServers;
static final Map<String, String> RETENTION_CONFIG = Collections.singletonMap(RETENTION_MS_CONFIG, String.valueOf(60 * 60 * 1000)); // 1 hour
final List<Data> dataTypes;
final Properties kafkaAdminProps;

public DataGenerator(String bootstrapServers, List<Data> dataTypes) {
this.bootstrapServers = bootstrapServers;
public DataGenerator(List<Data> dataTypes) {
this.dataTypes = dataTypes;

kafkaAdminProps = new Properties();
kafkaAdminProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
kafkaAdminProps.put(CLIENT_ID_CONFIG, KAFKA_ADMIN_CLIENT_ID_CONFIG);
kafkaAdminProps.put(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG));
kafkaAdminProps = KafkaAdminProps.get();
}

@Override
Expand All @@ -45,10 +35,10 @@ public void run() {

if (Boolean.parseBoolean(System.getenv("USE_APICURIO_REGISTRY"))) {
String registryUrl = System.getenv("REGISTRY_URL");
Producer<String, Object> producer = new KafkaProducer<>(KafkaClientProps.avro(bootstrapServers, registryUrl));
Producer<String, Object> producer = new KafkaProducer<>(KafkaClientProps.avro(registryUrl));
send(producer, () -> generateTopicRecords(this::generateAvroRecord));
} else {
Producer<String, String> producer = new KafkaProducer<>(KafkaClientProps.csv(bootstrapServers));
Producer<String, String> producer = new KafkaProducer<>(KafkaClientProps.csv());
send(producer, () -> generateTopicRecords(this::generateCsvRecord));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.github.streamshub.kafka.data.generator;

import java.util.Properties;

public class KafkaAdminProps {
static final String KAFKA_CLIENT_ID_CONFIG = "data-generator-admin-client";

public static Properties get() {
return KafkaCommonProps.get(KAFKA_CLIENT_ID_CONFIG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@
import java.util.Properties;

public class KafkaClientProps {
public static Properties csv(String bootstrapServers) {
Properties props = init(bootstrapServers);
static final String KAFKA_CLIENT_ID_CONFIG = "data-generator-client";

public static Properties csv() {
Properties props = KafkaCommonProps.get(KAFKA_CLIENT_ID_CONFIG);
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
return props;
}

public static Properties avro(String bootstrapServers, String registryUrl) {
Properties props = init(bootstrapServers);
public static Properties avro(String registryUrl) {
Properties props = KafkaCommonProps.get(KAFKA_CLIENT_ID_CONFIG);
props.put("value.serializer", AvroKafkaSerializer.class.getName());

props.put(SerdeConfig.REGISTRY_URL, registryUrl);
Expand All @@ -28,13 +30,4 @@ public static Properties avro(String bootstrapServers, String registryUrl) {

return props;
}

private static Properties init(String bootstrapServer) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServer);

props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
return props;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.github.streamshub.kafka.data.generator;

import java.util.Properties;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;

public class KafkaCommonProps {
static final int KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG = 5000;

public static Properties get(String clientId) {
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, System.getenv("KAFKA_BOOTSTRAP_SERVERS"));
props.put(CLIENT_ID_CONFIG, clientId);
props.put(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG));

String securityProtocol = System.getenv("KAFKA_SECURITY_PROTOCOL");

if (securityProtocol != null && securityProtocol.equals("SSL")) {
props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, System.getenv("KAFKA_SSL_TRUSTSTORE_LOCATION"));
props.put(SSL_TRUSTSTORE_TYPE_CONFIG, System.getenv("KAFKA_SSL_TRUSTSTORE_TYPE"));
props.put(SSL_KEYSTORE_LOCATION_CONFIG, System.getenv("KAFKA_SSL_KEYSTORE_LOCATION"));
props.put(SSL_KEYSTORE_PASSWORD_CONFIG, System.getenv("KAFKA_SSL_KEYSTORE_PASSWORD"));
}

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

return props;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@

public class Main {
public static void main(String[] args) {
String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
List<Data> dataTypes = Arrays.stream(System.getenv("DATA").split(","))
.map(Main::getDataClass)
.toList();
Thread dgThread = new Thread(new DataGenerator(bootstrapServers, dataTypes));
Thread dgThread = new Thread(new DataGenerator(dataTypes));
dgThread.start();
}

Expand Down