Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copied from https://github.com/strimzi/client-examples/blob/main/java/common/src/main/java/io/strimzi/common/ConfigUtil.java
package com.github.streamshub.kafka.data.generator;

import java.util.Locale;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.Map;


/**
* Provides utility methods for managing common configuration properties.
*/
public class ConfigUtil {
private static final String KAFKA_PREFIX = "KAFKA_";

/**
* Converts environment variables into a corresponding property key format.
*
* @param envVar Name of the environment variable to be converted to property key format.
* @return Returns a String which removes a prefix containing '_', converts to lower case and replaces '_' with '.'.
*/
public static String convertEnvVarToPropertyKey(String envVar) {
return envVar.substring(envVar.indexOf("_") + 1).toLowerCase(Locale.ENGLISH).replace("_", ".");
}

/**
* Retrieves Kafka-related properties from environment variables.
* This method scans all environment variables, filters those that start with the prefix "KAFKA_",
* converts them to a property key format, and collects them into a Properties object.
*
* @return Properties object containing Kafka-related properties derived from environment variables.
*/
public static Properties getKafkaPropertiesFromEnv() {
Properties properties = new Properties();
properties.putAll(System.getenv()
.entrySet()
.stream()
.filter(mapEntry -> mapEntry.getKey().startsWith(KAFKA_PREFIX))
.collect(Collectors.toMap(mapEntry -> convertEnvVarToPropertyKey(mapEntry.getKey()), Map.Entry::getValue)));
return properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,17 @@

import java.util.Properties;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;

public class KafkaCommonProps {
static final int KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG = 5000;

public static Properties get(String clientId) {
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, System.getenv("KAFKA_BOOTSTRAP_SERVERS"));
Properties props = ConfigUtil.getKafkaPropertiesFromEnv();
props.put(CLIENT_ID_CONFIG, clientId);
props.put(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG));

String securityProtocol = System.getenv("KAFKA_SECURITY_PROTOCOL");

if (securityProtocol != null && securityProtocol.equals("SSL")) {
props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, System.getenv("KAFKA_SSL_TRUSTSTORE_LOCATION"));
props.put(SSL_TRUSTSTORE_TYPE_CONFIG, System.getenv("KAFKA_SSL_TRUSTSTORE_TYPE"));
props.put(SSL_KEYSTORE_LOCATION_CONFIG, System.getenv("KAFKA_SSL_KEYSTORE_LOCATION"));
props.put(SSL_KEYSTORE_PASSWORD_CONFIG, System.getenv("KAFKA_SSL_KEYSTORE_PASSWORD"));
}

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.putIfAbsent(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG));
props.putIfAbsent("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

return props;
}
Expand Down