diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/ConfigUtil.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/ConfigUtil.java new file mode 100644 index 0000000..108bc7e --- /dev/null +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/ConfigUtil.java @@ -0,0 +1,42 @@ +// Copied from https://github.com/strimzi/client-examples/blob/main/java/common/src/main/java/io/strimzi/common/ConfigUtil.java +package com.github.streamshub.kafka.data.generator; + +import java.util.Locale; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.Map; + + +/** + * Provides utility methods for managing common configuration properties. + */ +public class ConfigUtil { + private static final String KAFKA_PREFIX = "KAFKA_"; + + /** + * Converts environment variables into a corresponding property key format. + * + * @param envVar Name of the environment variable to be converted to property key format. + * @return Returns a String which removes a prefix containing '_', converts to lower case and replaces '_' with '.'. + */ + public static String convertEnvVarToPropertyKey(String envVar) { + return envVar.substring(envVar.indexOf("_") + 1).toLowerCase(Locale.ENGLISH).replace("_", "."); + } + + /** + * Retrieves Kafka-related properties from environment variables. + * This method scans all environment variables, filters those that start with the prefix "KAFKA_", + * converts them to a property key format, and collects them into a Properties object. + * + * @return Properties object containing Kafka-related properties derived from environment variables. + */ + public static Properties getKafkaPropertiesFromEnv() { + Properties properties = new Properties(); + properties.putAll(System.getenv() + .entrySet() + .stream() + .filter(mapEntry -> mapEntry.getKey().startsWith(KAFKA_PREFIX)) + .collect(Collectors.toMap(mapEntry -> convertEnvVarToPropertyKey(mapEntry.getKey()), Map.Entry::getValue))); + return properties; + } +} diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaCommonProps.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaCommonProps.java index 4e1ab38..808b210 100644 --- a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaCommonProps.java +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaCommonProps.java @@ -2,35 +2,17 @@ import java.util.Properties; -import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; -import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; -import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; -import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG; -import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG; -import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG; public class KafkaCommonProps { static final int KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG = 5000; public static Properties get(String clientId) { - Properties props = new Properties(); - props.put(BOOTSTRAP_SERVERS_CONFIG, System.getenv("KAFKA_BOOTSTRAP_SERVERS")); + Properties props = ConfigUtil.getKafkaPropertiesFromEnv(); props.put(CLIENT_ID_CONFIG, clientId); - props.put(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG)); - - String securityProtocol = System.getenv("KAFKA_SECURITY_PROTOCOL"); - - if (securityProtocol != null && securityProtocol.equals("SSL")) { - props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol); - props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, System.getenv("KAFKA_SSL_TRUSTSTORE_LOCATION")); - props.put(SSL_TRUSTSTORE_TYPE_CONFIG, System.getenv("KAFKA_SSL_TRUSTSTORE_TYPE")); - props.put(SSL_KEYSTORE_LOCATION_CONFIG, System.getenv("KAFKA_SSL_KEYSTORE_LOCATION")); - props.put(SSL_KEYSTORE_PASSWORD_CONFIG, System.getenv("KAFKA_SSL_KEYSTORE_PASSWORD")); - } - - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.putIfAbsent(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG)); + props.putIfAbsent("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return props; }