;
}) => React.ReactNode;
action: string;
+ autoAdvance?: boolean;
};
const EVENT_TOPICS_STEP: Step = {
@@ -137,6 +138,7 @@ const DESTINATION_TYPE_STEP: Step = {
),
action: "Next",
+ autoAdvance: true,
};
const CONFIGURATION_STEP: Step = {
@@ -383,6 +385,12 @@ export default function CreateDestination() {
const values = Object.fromEntries(formData.entries());
const allValues = { ...stepValues, ...values };
+ if (currentStep.autoAdvance && nextStep && currentStep.isValid?.(allValues)) {
+ setStepValues(allValues);
+ setCurrentStepIndex(currentStepIndex + 1);
+ return;
+ }
+
if (currentStep.isValid) {
setIsValid(currentStep.isValid(allValues));
} else {
@@ -423,16 +431,18 @@ export default function CreateDestination() {
)}
-
-
-
+ {!currentStep.autoAdvance && (
+
+
+
+ )}
diff --git a/internal/portal/src/typings/Destination.ts b/internal/portal/src/typings/Destination.ts
index 459910c14..3caee4c4b 100644
--- a/internal/portal/src/typings/Destination.ts
+++ b/internal/portal/src/typings/Destination.ts
@@ -1,5 +1,5 @@
interface ConfigField {
- type: "text" | "checkbox" | "key_value_map";
+ type: "text" | "checkbox" | "key_value_map" | "select";
label: string;
description: string;
key: string;
@@ -12,6 +12,7 @@ interface ConfigField {
minlength?: number;
maxlength?: number;
pattern?: string;
+ options?: Array<{ label: string; value: string }>;
key_placeholder?: string;
value_placeholder?: string;
}
diff --git a/internal/util/testinfra/kafka.go b/internal/util/testinfra/kafka.go
new file mode 100644
index 000000000..9d72ed0cf
--- /dev/null
+++ b/internal/util/testinfra/kafka.go
@@ -0,0 +1,64 @@
+package testinfra
+
+import (
+ "context"
+ "log"
+ "sync"
+
+ "github.com/testcontainers/testcontainers-go"
+ "github.com/testcontainers/testcontainers-go/wait"
+)
+
+var kafkaOnce sync.Once
+
+func EnsureKafka() string {
+ cfg := ReadConfig()
+ if cfg.KafkaURL == "" {
+ kafkaOnce.Do(func() {
+ startKafkaTestContainer(cfg)
+ })
+ }
+ return cfg.KafkaURL
+}
+
+func startKafkaTestContainer(cfg *Config) {
+ ctx := context.Background()
+
+ // Use a fixed host port so advertised listeners match what clients connect to.
+ // Kafka brokers redirect clients to the advertised address, so if testcontainers
+ // maps to a random port but the broker advertises 9092, clients would fail.
+ const hostPort = "19092"
+
+ req := testcontainers.ContainerRequest{
+ Image: "confluentinc/confluent-local:7.4.0",
+ ExposedPorts: []string{hostPort + ":9092/tcp"},
+ Env: map[string]string{
+ "KAFKA_ADVERTISED_LISTENERS": "PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:" + hostPort,
+ "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT",
+ "KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093",
+ "KAFKA_INTER_BROKER_LISTENER_NAME": "PLAINTEXT",
+ "KAFKA_CONTROLLER_LISTENER_NAMES": "CONTROLLER",
+ "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
+ "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": "1",
+ "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1",
+ },
+ WaitingFor: wait.ForListeningPort("9092/tcp"),
+ }
+
+ container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
+ ContainerRequest: req,
+ Started: true,
+ })
+ if err != nil {
+ panic(err)
+ }
+
+ endpoint := "localhost:" + hostPort
+ log.Printf("Kafka running at %s", endpoint)
+ cfg.KafkaURL = endpoint
+ cfg.cleanupFns = append(cfg.cleanupFns, func() {
+ if err := container.Terminate(ctx); err != nil {
+ log.Printf("failed to terminate kafka container: %s", err)
+ }
+ })
+}
diff --git a/internal/util/testinfra/testinfra.go b/internal/util/testinfra/testinfra.go
index 3c8cf2cc2..6b4ce3f04 100644
--- a/internal/util/testinfra/testinfra.go
+++ b/internal/util/testinfra/testinfra.go
@@ -26,6 +26,7 @@ type Config struct {
PostgresURL string
LocalStackURL string
RabbitMQURL string
+ KafkaURL string
MockServerURL string
GCPURL string
AzureSBConnString string
@@ -75,6 +76,7 @@ func initConfig() {
GCPURL: v.GetString("TEST_GCP_URL"),
AzureSBConnString: v.GetString("TEST_AZURE_SB_CONNSTRING"),
RabbitMQURL: rabbitmqURL,
+ KafkaURL: v.GetString("TEST_KAFKA_URL"),
MockServerURL: mockServerURL,
}
return
@@ -89,6 +91,7 @@ func initConfig() {
GCPURL: "",
AzureSBConnString: "",
RabbitMQURL: "",
+ KafkaURL: "",
MockServerURL: "",
}
}