From da7bd464f4f238d5b07499d02b26c6b06c92370d Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 22 Mar 2026 20:58:19 +0700 Subject: [PATCH 01/16] feat: add Apache Kafka destination provider Implement Kafka as a new destination type with SASL auth (plain, scram-sha-256, scram-sha-512), TLS support, and configurable partition keys via JMESPath templates. Extract shared partition key evaluator from Kinesis into reusable partitionkey package. Co-Authored-By: Claude Opus 4.6 --- go.mod | 6 +- go.sum | 11 +- .../metadata/providers/kafka/instructions.md | 21 ++ .../metadata/providers/kafka/metadata.json | 62 ++++ .../destregistry/partitionkey/partitionkey.go | 40 +++ .../partitionkey/partitionkey_test.go | 118 ++++++ internal/destregistry/providers/default.go | 7 + .../destawskinesis/destawskinesis.go | 40 +-- .../destawskinesis_publish_test.go | 35 +- .../providers/destkafka/destkafka.go | 338 ++++++++++++++++++ .../destkafka/destkafka_publish_test.go | 249 +++++++++++++ .../destkafka/destkafka_validate_test.go | 247 +++++++++++++ 12 files changed, 1100 insertions(+), 74 deletions(-) create mode 100644 internal/destregistry/metadata/providers/kafka/instructions.md create mode 100644 internal/destregistry/metadata/providers/kafka/metadata.json create mode 100644 internal/destregistry/partitionkey/partitionkey.go create mode 100644 internal/destregistry/partitionkey/partitionkey_test.go create mode 100644 internal/destregistry/providers/destkafka/destkafka.go create mode 100644 internal/destregistry/providers/destkafka/destkafka_publish_test.go create mode 100644 internal/destregistry/providers/destkafka/destkafka_validate_test.go diff --git a/go.mod b/go.mod index d9056f764..f498cf41b 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/rabbitmq/amqp091-go v1.10.0 github.com/redis/go-redis/extra/redisotel/v9 v9.5.3 github.com/redis/go-redis/v9 v9.6.1 + github.com/segmentio/kafka-go v0.4.50 github.com/spf13/viper v1.19.0 github.com/standard-webhooks/standard-webhooks/libraries v0.0.0-20250711233419-a173a6c0125c github.com/stretchr/testify v1.10.0 @@ -205,6 +206,9 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect github.com/uptrace/opentelemetry-go-extra/otelutil v0.3.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opencensus.io v0.24.0 // indirect @@ -219,7 +223,7 @@ require ( golang.org/x/crypto v0.37.0 // indirect golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/mod v0.21.0 // indirect - golang.org/x/net v0.36.0 // indirect + golang.org/x/net v0.38.0 // indirect golang.org/x/sys v0.32.0 // indirect golang.org/x/text v0.24.0 // indirect golang.org/x/time v0.6.0 // indirect diff --git a/go.sum b/go.sum index e0017210b..e7deb082b 100644 --- a/go.sum +++ b/go.sum @@ -1227,6 +1227,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= +github.com/segmentio/kafka-go v0.4.50 h1:mcyC3tT5WeyWzrFbd6O374t+hmcu1NKt2Pu1L3QaXmc= +github.com/segmentio/kafka-go v0.4.50/go.mod 
h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E= github.com/shirou/gopsutil/v4 v4.25.1 h1:QSWkTc+fu9LTAWfkZwZ6j8MSUk4A2LV7rbH0ZqmLjXs= github.com/shirou/gopsutil/v4 v4.25.1/go.mod h1:RoUCUpndaJFtT+2zsZzzmhvbfGoDCJ7nFXKJf8GqJbI= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= @@ -1297,9 +1299,14 @@ github.com/uptrace/opentelemetry-go-extra/otelzap v0.3.1 h1:0iCp8hx3PFhGihubKHxy github.com/uptrace/opentelemetry-go-extra/otelzap v0.3.1/go.mod h1:FXrjpUJDqwqofvXWG3YNxQwhg2876tUpZASj8VvOMAM= github.com/urfave/cli/v3 v3.4.1 h1:1M9UOCy5bLmGnuu1yn3t3CB4rG79Rtoxuv1sPhnm6qM= github.com/urfave/cli/v3 v3.4.1/go.mod h1:FJSKtM/9AiiTOJL4fJ6TbMUkxBXn7GO9guZqoZtpYpo= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -1533,8 +1540,8 @@ golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= -golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA= -golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/internal/destregistry/metadata/providers/kafka/instructions.md b/internal/destregistry/metadata/providers/kafka/instructions.md new file mode 100644 index 000000000..b4c07d24e --- /dev/null +++ b/internal/destregistry/metadata/providers/kafka/instructions.md @@ -0,0 +1,21 @@ +# Apache Kafka Configuration Instructions + +Apache Kafka is a distributed event streaming platform used for building real-time data pipelines and streaming applications. 
It provides: + +- High-throughput, low-latency message delivery +- Durable message storage with configurable retention +- Horizontal scalability through partitioning +- Built-in support for SASL authentication and TLS encryption + +## How to configure Kafka as an event destination + +To configure Kafka as a destination you must provide: + +- **Brokers** — A comma-separated list of Kafka broker addresses (e.g., `broker1:9092,broker2:9092`) +- **Topic** — The Kafka topic to publish messages to + +### Optional settings + +- **SASL Mechanism** — Authentication mechanism: `plain`, `scram-sha-256`, or `scram-sha-512`. If set, you must also provide **Username** and **Password**. +- **TLS** — Enable TLS encryption for the connection. +- **Partition Key Template** — A JMESPath expression to extract the message key from the event payload. If not set, the event ID is used as the message key. diff --git a/internal/destregistry/metadata/providers/kafka/metadata.json b/internal/destregistry/metadata/providers/kafka/metadata.json new file mode 100644 index 000000000..48bfe5bb4 --- /dev/null +++ b/internal/destregistry/metadata/providers/kafka/metadata.json @@ -0,0 +1,62 @@ +{ + "type": "kafka", + "label": "Apache Kafka", + "description": "Send events to Apache Kafka topics for real-time event streaming", + "link": "https://kafka.apache.org/", + "config_fields": [ + { + "key": "brokers", + "type": "text", + "label": "Brokers", + "description": "Comma-separated list of Kafka broker addresses (e.g., broker1:9092,broker2:9092)", + "required": true + }, + { + "key": "topic", + "type": "text", + "label": "Topic", + "description": "The Kafka topic to publish messages to", + "required": true + }, + { + "key": "sasl_mechanism", + "type": "text", + "label": "SASL Mechanism", + "description": "SASL authentication mechanism (plain, scram-sha-256, scram-sha-512). Leave empty for no authentication.", + "required": false + }, + { + "key": "tls", + "type": "checkbox", + "label": "TLS", + "description": "Enable TLS for the connection", + "default": "false" + }, + { + "key": "partition_key_template", + "type": "text", + "label": "Partition Key Template", + "description": "JMESPath template to extract the partition key from the event payload (e.g., metadata.\"event-id\"). Default is event ID, which is also used as fallback if template evaluation fails or returns empty.", + "required": false + } + ], + "credential_fields": [ + { + "key": "username", + "type": "text", + "label": "Username", + "description": "SASL username for authentication", + "required": false, + "sensitive": false + }, + { + "key": "password", + "type": "text", + "label": "Password", + "description": "SASL password for authentication", + "required": false, + "sensitive": true + } + ], + "icon": "" +} diff --git a/internal/destregistry/partitionkey/partitionkey.go b/internal/destregistry/partitionkey/partitionkey.go new file mode 100644 index 000000000..395829fc8 --- /dev/null +++ b/internal/destregistry/partitionkey/partitionkey.go @@ -0,0 +1,40 @@ +package partitionkey + +import ( + "fmt" + + "github.com/jmespath/go-jmespath" +) + +// Evaluate extracts a partition key from payload using a JMESPath template. +// If the template is empty or evaluation returns nil/empty, fallbackKey is returned. 
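+//
+// A minimal usage sketch (the payload shape and template below are
+// illustrative; callers such as the Kafka and Kinesis publishers pass a
+// payload with top-level "metadata" and "data" keys):
+//
+//	payload := map[string]interface{}{
+//		"metadata": map[string]interface{}{"event-id": "evt_123"},
+//	}
+//	key, err := Evaluate(`metadata."event-id"`, payload, "fallback")
+//	// key == "evt_123", err == nil; an empty template or a
+//	// non-matching path falls back to "fallback".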
+func Evaluate(template string, payload map[string]interface{}, fallbackKey string) (string, error) { + if template == "" { + return fallbackKey, nil + } + + result, err := jmespath.Search(template, payload) + if err != nil { + return "", fmt.Errorf("error evaluating partition key template: %w", err) + } + + if result == nil { + return fallbackKey, nil + } + + switch v := result.(type) { + case string: + if v == "" { + return fallbackKey, nil + } + return v, nil + case float64: + return fmt.Sprintf("%g", v), nil + case int: + return fmt.Sprintf("%d", v), nil + case bool: + return fmt.Sprintf("%t", v), nil + default: + return fmt.Sprintf("%v", v), nil + } +} diff --git a/internal/destregistry/partitionkey/partitionkey_test.go b/internal/destregistry/partitionkey/partitionkey_test.go new file mode 100644 index 000000000..78d65e67a --- /dev/null +++ b/internal/destregistry/partitionkey/partitionkey_test.go @@ -0,0 +1,118 @@ +package partitionkey_test + +import ( + "testing" + + "github.com/hookdeck/outpost/internal/destregistry/partitionkey" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEvaluate(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + template string + payload map[string]interface{} + fallbackKey string + expected string + expectError bool + }{ + { + name: "empty template returns fallback", + template: "", + payload: map[string]interface{}{"key": "value"}, + fallbackKey: "fallback-123", + expected: "fallback-123", + }, + { + name: "simple field access", + template: "metadata.topic", + payload: map[string]interface{}{ + "metadata": map[string]interface{}{"topic": "test-topic"}, + }, + fallbackKey: "fallback", + expected: "test-topic", + }, + { + name: "nested field access", + template: "data.user.id", + payload: map[string]interface{}{ + "data": map[string]interface{}{ + "user": map[string]interface{}{"id": "user-456"}, + }, + }, + fallbackKey: "fallback", + expected: "user-456", + }, + { + name: "join expression", + template: "join('-', [metadata.topic, metadata.\"event-id\"])", + payload: map[string]interface{}{ + "metadata": map[string]interface{}{ + "topic": "test-topic", + "event-id": "event-123", + }, + }, + fallbackKey: "fallback", + expected: "test-topic-event-123", + }, + { + name: "non-existent field returns fallback", + template: "metadata.nonexistent", + payload: map[string]interface{}{ + "metadata": map[string]interface{}{"topic": "test"}, + }, + fallbackKey: "fallback-123", + expected: "fallback-123", + }, + { + name: "invalid template syntax returns error", + template: "metadata.topic[", + payload: map[string]interface{}{}, + fallbackKey: "fallback", + expectError: true, + }, + { + name: "numeric value", + template: "data.count", + payload: map[string]interface{}{ + "data": map[string]interface{}{"count": float64(123)}, + }, + fallbackKey: "fallback", + expected: "123", + }, + { + name: "boolean value", + template: "data.active", + payload: map[string]interface{}{ + "data": map[string]interface{}{"active": true}, + }, + fallbackKey: "fallback", + expected: "true", + }, + { + name: "empty string result returns fallback", + template: "data.empty", + payload: map[string]interface{}{ + "data": map[string]interface{}{"empty": ""}, + }, + fallbackKey: "fallback-123", + expected: "fallback-123", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result, err := partitionkey.Evaluate(tt.template, tt.payload, tt.fallbackKey) + if tt.expectError { + assert.Error(t, 
err) + return + } + require.NoError(t, err) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/internal/destregistry/providers/default.go b/internal/destregistry/providers/default.go index ae19b0dfb..c72c0e228 100644 --- a/internal/destregistry/providers/default.go +++ b/internal/destregistry/providers/default.go @@ -8,6 +8,7 @@ import ( "github.com/hookdeck/outpost/internal/destregistry/providers/destazureservicebus" "github.com/hookdeck/outpost/internal/destregistry/providers/destgcppubsub" "github.com/hookdeck/outpost/internal/destregistry/providers/desthookdeck" + "github.com/hookdeck/outpost/internal/destregistry/providers/destkafka" "github.com/hookdeck/outpost/internal/destregistry/providers/destrabbitmq" "github.com/hookdeck/outpost/internal/destregistry/providers/destwebhook" "github.com/hookdeck/outpost/internal/destregistry/providers/destwebhookstandard" @@ -138,5 +139,11 @@ func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestina } registry.RegisterProvider("rabbitmq", rabbitmq) + kafkaDest, err := destkafka.New(loader, basePublisherOpts) + if err != nil { + return err + } + registry.RegisterProvider("kafka", kafkaDest) + return nil } diff --git a/internal/destregistry/providers/destawskinesis/destawskinesis.go b/internal/destregistry/providers/destawskinesis/destawskinesis.go index e62e28099..7d5ec3e5a 100644 --- a/internal/destregistry/providers/destawskinesis/destawskinesis.go +++ b/internal/destregistry/providers/destawskinesis/destawskinesis.go @@ -14,8 +14,8 @@ import ( "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/hookdeck/outpost/internal/destregistry" "github.com/hookdeck/outpost/internal/destregistry/metadata" + "github.com/hookdeck/outpost/internal/destregistry/partitionkey" "github.com/hookdeck/outpost/internal/models" - "github.com/jmespath/go-jmespath" ) // Configuration types @@ -182,42 +182,6 @@ func (p *AWSKinesisPublisher) Close() error { return nil } -// evaluatePartitionKey extracts the partition key from the event using the JMESPath template -func (p *AWSKinesisPublisher) evaluatePartitionKey(payload map[string]interface{}, eventID string) (string, error) { - // If no template is specified or empty, use event ID - if p.partitionKeyTemplate == "" { - return eventID, nil - } - - // Evaluate the JMESPath template - result, err := jmespath.Search(p.partitionKeyTemplate, payload) - if err != nil { - return "", fmt.Errorf("error evaluating partition key template: %w", err) - } - - // Handle nil result - fall back to event ID - if result == nil { - return eventID, nil - } - - // Convert the result to string based on its type - switch v := result.(type) { - case string: - if v == "" { - return eventID, nil // Fall back to event ID if empty string - } - return v, nil - case float64: - return fmt.Sprintf("%g", v), nil - case int: - return fmt.Sprintf("%d", v), nil - case bool: - return fmt.Sprintf("%t", v), nil - default: - return fmt.Sprintf("%v", v), nil - } -} - // Format prepares the event for sending to Kinesis func (p *AWSKinesisPublisher) Format(ctx context.Context, event *models.Event) (*kinesis.PutRecordInput, error) { var payload map[string]interface{} @@ -270,7 +234,7 @@ func (p *AWSKinesisPublisher) Format(ctx context.Context, event *models.Event) ( } // Get partition key from template or use event ID as default - partitionKey, err := p.evaluatePartitionKey(payload, event.ID) + partitionKey, err := partitionkey.Evaluate(p.partitionKeyTemplate, payload, event.ID) if err != nil { // If template evaluation 
fails, log the error and fall back to event ID partitionKey = event.ID diff --git a/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go b/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go index 601e0fd97..1b323fad7 100644 --- a/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go +++ b/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go @@ -13,13 +13,13 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" + "github.com/hookdeck/outpost/internal/destregistry/partitionkey" "github.com/hookdeck/outpost/internal/destregistry/providers/destawskinesis" testsuite "github.com/hookdeck/outpost/internal/destregistry/testing" "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/util/testinfra" "github.com/hookdeck/outpost/internal/util/testutil" - "github.com/jmespath/go-jmespath" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -164,38 +164,7 @@ type KinesisAsserter struct { // evaluateTemplate is a test helper that evaluates a JMESPath template against payload data func (a *KinesisAsserter) evaluateTemplate(payload map[string]interface{}, eventID string) (string, error) { - // If no template is specified or empty, use event ID - if a.partitionKeyTemplate == "" { - return eventID, nil - } - - // Evaluate the JMESPath template - result, err := jmespath.Search(a.partitionKeyTemplate, payload) - if err != nil { - return "", fmt.Errorf("error evaluating partition key template: %w", err) - } - - // Handle nil result - fall back to event ID - if result == nil { - return eventID, nil - } - - // Convert the result to string based on its type - switch v := result.(type) { - case string: - if v == "" { - return eventID, nil // Fall back to event ID if empty string - } - return v, nil - case float64: - return fmt.Sprintf("%g", v), nil - case int: - return fmt.Sprintf("%d", v), nil - case bool: - return fmt.Sprintf("%t", v), nil - default: - return fmt.Sprintf("%v", v), nil - } + return partitionkey.Evaluate(a.partitionKeyTemplate, payload, eventID) } func (a *KinesisAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Message, event models.Event) { diff --git a/internal/destregistry/providers/destkafka/destkafka.go b/internal/destregistry/providers/destkafka/destkafka.go new file mode 100644 index 000000000..506528a97 --- /dev/null +++ b/internal/destregistry/providers/destkafka/destkafka.go @@ -0,0 +1,338 @@ +package destkafka + +import ( + "context" + "crypto/tls" + "fmt" + "strings" + "time" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/metadata" + "github.com/hookdeck/outpost/internal/destregistry/partitionkey" + "github.com/hookdeck/outpost/internal/models" + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl" + "github.com/segmentio/kafka-go/sasl/plain" + "github.com/segmentio/kafka-go/sasl/scram" +) + +// Configuration types + +type KafkaConfig struct { + Brokers []string + Topic string + SASLMechanism string + UseTLS bool + PartitionKeyTemplate string +} + +type KafkaCredentials struct { + Username string + Password string +} + +// Provider implementation + +type KafkaDestination struct { + *destregistry.BaseProvider +} + +var _ destregistry.Provider = 
(*KafkaDestination)(nil) + +func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption) (*KafkaDestination, error) { + base, err := destregistry.NewBaseProvider(loader, "kafka", basePublisherOpts...) + if err != nil { + return nil, err + } + return &KafkaDestination{BaseProvider: base}, nil +} + +func (d *KafkaDestination) Validate(ctx context.Context, destination *models.Destination) error { + if err := d.BaseProvider.Validate(ctx, destination); err != nil { + return err + } + + // Validate SASL mechanism if provided + if mechanism := destination.Config["sasl_mechanism"]; mechanism != "" { + switch mechanism { + case "plain", "scram-sha-256", "scram-sha-512": + // valid + default: + return destregistry.NewErrDestinationValidation([]destregistry.ValidationErrorDetail{ + { + Field: "config.sasl_mechanism", + Type: "invalid", + }, + }) + } + } + + // Validate TLS config if provided + if tlsStr, ok := destination.Config["tls"]; ok { + if tlsStr != "on" && tlsStr != "true" && tlsStr != "false" { + return destregistry.NewErrDestinationValidation([]destregistry.ValidationErrorDetail{ + { + Field: "config.tls", + Type: "invalid", + }, + }) + } + } + + return nil +} + +func (d *KafkaDestination) CreatePublisher(ctx context.Context, destination *models.Destination) (destregistry.Publisher, error) { + config, credentials, err := d.resolveConfig(ctx, destination) + if err != nil { + return nil, err + } + + // Build SASL mechanism + var mechanism sasl.Mechanism + if config.SASLMechanism != "" { + mechanism, err = buildSASLMechanism(config.SASLMechanism, credentials) + if err != nil { + return nil, fmt.Errorf("failed to configure SASL: %w", err) + } + } + + // Build transport + transport := &kafka.Transport{} + if mechanism != nil { + transport.SASL = mechanism + } + if config.UseTLS { + transport.TLS = &tls.Config{} + } + + // Create writer with hash balancer for consistent partition key routing + writer := &kafka.Writer{ + Addr: kafka.TCP(config.Brokers...), + Topic: config.Topic, + Balancer: &kafka.Hash{}, + Transport: transport, + } + + return &KafkaPublisher{ + BasePublisher: d.BaseProvider.NewPublisher(destregistry.WithDeliveryMetadata(destination.DeliveryMetadata)), + writer: writer, + partitionKeyTemplate: config.PartitionKeyTemplate, + }, nil +} + +func (d *KafkaDestination) resolveConfig(ctx context.Context, destination *models.Destination) (*KafkaConfig, *KafkaCredentials, error) { + if err := d.Validate(ctx, destination); err != nil { + return nil, nil, err + } + + // Parse brokers + brokersStr := destination.Config["brokers"] + brokers := parseBrokers(brokersStr) + + useTLS := false + if tlsStr, ok := destination.Config["tls"]; ok { + useTLS = tlsStr == "true" || tlsStr == "on" + } + + return &KafkaConfig{ + Brokers: brokers, + Topic: destination.Config["topic"], + SASLMechanism: destination.Config["sasl_mechanism"], + UseTLS: useTLS, + PartitionKeyTemplate: destination.Config["partition_key_template"], + }, &KafkaCredentials{ + Username: destination.Credentials["username"], + Password: destination.Credentials["password"], + }, nil +} + +func (d *KafkaDestination) Preprocess(newDestination *models.Destination, originalDestination *models.Destination, opts *destregistry.PreprocessDestinationOpts) error { + if newDestination.Config == nil { + return nil + } + + // Normalize TLS value + if newDestination.Config["tls"] == "on" { + newDestination.Config["tls"] = "true" + } else if newDestination.Config["tls"] == "" { + newDestination.Config["tls"] = "false" + 
} + + // Trim whitespace from brokers + if brokers := newDestination.Config["brokers"]; brokers != "" { + parts := strings.Split(brokers, ",") + for i := range parts { + parts[i] = strings.TrimSpace(parts[i]) + } + newDestination.Config["brokers"] = strings.Join(parts, ",") + } + + if _, _, err := d.resolveConfig(context.Background(), newDestination); err != nil { + return err + } + return nil +} + +func (d *KafkaDestination) ComputeTarget(destination *models.Destination) destregistry.DestinationTarget { + brokers := parseBrokers(destination.Config["brokers"]) + topic := destination.Config["topic"] + + var target string + if len(brokers) > 0 { + target = fmt.Sprintf("%s / %s", brokers[0], topic) + } else { + target = topic + } + + return destregistry.DestinationTarget{ + Target: target, + TargetURL: "", + } +} + +// Publisher implementation + +type KafkaPublisher struct { + *destregistry.BasePublisher + writer *kafka.Writer + partitionKeyTemplate string +} + +func (p *KafkaPublisher) Close() error { + p.BasePublisher.StartClose() + return p.writer.Close() +} + +func (p *KafkaPublisher) Publish(ctx context.Context, event *models.Event) (*destregistry.Delivery, error) { + if err := p.BasePublisher.StartPublish(); err != nil { + return nil, err + } + defer p.BasePublisher.FinishPublish() + + // Build metadata for headers and partition key evaluation + meta := p.BasePublisher.MakeMetadata(event, time.Now()) + metadataMap := make(map[string]interface{}) + for k, v := range meta { + metadataMap[k] = v + } + + // Build parsed payload for partition key JMESPath evaluation + dataMap, err := event.ParsedData() + if err != nil { + return nil, destregistry.NewErrDestinationPublishAttempt( + err, "kafka", map[string]interface{}{ + "error": "format_failed", + "message": err.Error(), + }, + ) + } + if dataMap == nil { + dataMap = make(map[string]interface{}) + } + payload := map[string]interface{}{ + "metadata": metadataMap, + "data": dataMap, + } + + // Evaluate partition key + key, err := partitionkey.Evaluate(p.partitionKeyTemplate, payload, event.ID) + if err != nil { + key = event.ID + } + + // Build Kafka headers from metadata + headers := make([]kafka.Header, 0, len(meta)+1) + headers = append(headers, kafka.Header{Key: "content-type", Value: []byte("application/json")}) + for k, v := range meta { + headers = append(headers, kafka.Header{Key: k, Value: []byte(v)}) + } + + // Write message — send raw event data as value, metadata in headers only + msg := kafka.Message{ + Key: []byte(key), + Value: []byte(event.Data), + Headers: headers, + } + + if err := p.writer.WriteMessages(ctx, msg); err != nil { + return &destregistry.Delivery{ + Status: "failed", + Code: ClassifyKafkaError(err), + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, "kafka", map[string]interface{}{ + "error": ClassifyKafkaError(err), + "message": err.Error(), + }) + } + + return &destregistry.Delivery{ + Status: "success", + Code: "OK", + Response: map[string]interface{}{}, + }, nil +} + +// ClassifyKafkaError returns a descriptive error code based on the error type. 
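+// Classification is substring-based and therefore best-effort: for example,
+// a refused broker connection maps to "connection_refused", an unresolvable
+// host to "dns_error", a SASL or authentication failure to "auth_failed",
+// and anything unrecognized falls through to the generic "kafka_error" code.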
+func ClassifyKafkaError(err error) string { + if err == nil { + return "unknown" + } + + errStr := err.Error() + + switch { + case strings.Contains(errStr, "SASL") || strings.Contains(errStr, "auth") || strings.Contains(errStr, "Authentication"): + return "auth_failed" + case strings.Contains(errStr, "connection refused"): + return "connection_refused" + case strings.Contains(errStr, "no such host"): + return "dns_error" + case strings.Contains(errStr, "Unknown Topic") || strings.Contains(errStr, "UNKNOWN_TOPIC"): + return "topic_not_found" + case strings.Contains(errStr, "Message Size Too Large") || strings.Contains(errStr, "MESSAGE_TOO_LARGE"): + return "message_too_large" + case strings.Contains(errStr, "i/o timeout") || strings.Contains(errStr, "context deadline exceeded"): + return "timeout" + case strings.Contains(errStr, "tls:") || strings.Contains(errStr, "x509:"): + return "tls_error" + default: + return "kafka_error" + } +} + +// Helper functions + +func parseBrokers(brokersStr string) []string { + if brokersStr == "" { + return nil + } + parts := strings.Split(brokersStr, ",") + brokers := make([]string, 0, len(parts)) + for _, p := range parts { + trimmed := strings.TrimSpace(p) + if trimmed != "" { + brokers = append(brokers, trimmed) + } + } + return brokers +} + +func buildSASLMechanism(mechanism string, creds *KafkaCredentials) (sasl.Mechanism, error) { + switch mechanism { + case "plain": + return &plain.Mechanism{ + Username: creds.Username, + Password: creds.Password, + }, nil + case "scram-sha-256": + return scram.Mechanism(scram.SHA256, creds.Username, creds.Password) + case "scram-sha-512": + return scram.Mechanism(scram.SHA512, creds.Username, creds.Password) + default: + return nil, fmt.Errorf("unsupported SASL mechanism: %s", mechanism) + } +} diff --git a/internal/destregistry/providers/destkafka/destkafka_publish_test.go b/internal/destregistry/providers/destkafka/destkafka_publish_test.go new file mode 100644 index 000000000..2b1d55128 --- /dev/null +++ b/internal/destregistry/providers/destkafka/destkafka_publish_test.go @@ -0,0 +1,249 @@ +package destkafka_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/hookdeck/outpost/internal/destregistry/providers/destkafka" + testsuite "github.com/hookdeck/outpost/internal/destregistry/testing" + "github.com/hookdeck/outpost/internal/idgen" + "github.com/hookdeck/outpost/internal/models" + "github.com/hookdeck/outpost/internal/util/testinfra" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +// KafkaConsumer implements testsuite.MessageConsumer +type KafkaConsumer struct { + reader *kafka.Reader + msgChan chan testsuite.Message + done chan struct{} + shuttingDown bool + wg sync.WaitGroup +} + +func NewKafkaConsumer(brokerAddr, topic string) (*KafkaConsumer, error) { + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{brokerAddr}, + Topic: topic, + StartOffset: kafka.FirstOffset, + MaxWait: 500 * time.Millisecond, + }) + + c := &KafkaConsumer{ + reader: reader, + msgChan: make(chan testsuite.Message, 100), + done: make(chan struct{}), + } + c.wg.Add(1) + go c.consume() + return c, nil +} + +func (c *KafkaConsumer) consume() { + defer c.wg.Done() + + for { + select { + case <-c.done: + return + default: + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + msg, err := c.reader.ReadMessage(ctx) + 
cancel() + if err != nil { + continue + } + + // Extract metadata from headers + metadata := make(map[string]string) + for _, h := range msg.Headers { + metadata[h.Key] = string(h.Value) + } + + if !c.shuttingDown { + c.msgChan <- testsuite.Message{ + Data: msg.Value, + Metadata: metadata, + Raw: msg, + } + } + } + } +} + +func (c *KafkaConsumer) Consume() <-chan testsuite.Message { + return c.msgChan +} + +func (c *KafkaConsumer) Close() error { + c.shuttingDown = true + close(c.done) + c.wg.Wait() + close(c.msgChan) + return c.reader.Close() +} + +// KafkaAsserter implements testsuite.MessageAsserter +type KafkaAsserter struct{} + +func (a *KafkaAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Message, event models.Event) { + kafkaMsg, ok := msg.Raw.(kafka.Message) + assert.True(t, ok, "raw message should be kafka.Message") + + // Verify content-type header + assert.Equal(t, "application/json", msg.Metadata["content-type"]) + + // Verify message key is set (event ID by default) + assert.Equal(t, event.ID, string(kafkaMsg.Key), "message key should be event ID") + + // Verify system metadata in headers + metadata := msg.Metadata + assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present") + testsuite.AssertTimestampIsUnixSeconds(t, metadata["timestamp"]) + assert.Equal(t, event.ID, metadata["event-id"], "event-id should match") + assert.Equal(t, event.Topic, metadata["topic"], "topic should match") + + // Verify custom metadata + for k, v := range event.Metadata { + assert.Equal(t, v, metadata[k], "metadata key %s should match expected value", k) + } +} + +// KafkaPublishSuite runs the shared publisher test suite for Kafka +type KafkaPublishSuite struct { + testsuite.PublisherSuite + consumer *KafkaConsumer +} + +func (s *KafkaPublishSuite) SetupSuite() { + t := s.T() + t.Cleanup(testinfra.Start(t)) + + brokerAddr := testinfra.EnsureKafka() + topic := "test-topic-" + idgen.String() + + // Ensure topic exists by creating it + ensureKafkaTopic(t, brokerAddr, topic) + + provider, err := destkafka.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + dest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": brokerAddr, + "topic": topic, + }), + testutil.DestinationFactory.WithCredentials(map[string]string{}), + ) + + consumer, err := NewKafkaConsumer(brokerAddr, topic) + require.NoError(t, err) + s.consumer = consumer + + s.InitSuite(testsuite.Config{ + Provider: provider, + Dest: &dest, + Consumer: consumer, + Asserter: &KafkaAsserter{}, + }) +} + +func (s *KafkaPublishSuite) TearDownSuite() { + if s.consumer != nil { + s.consumer.Close() + } +} + +func TestKafkaPublishIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + suite.Run(t, new(KafkaPublishSuite)) +} + +// TestKafkaPublisher_ConnectionErrors tests that connection errors return a Delivery object alongside the error. 
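+// The broker addresses used below are intentionally unreachable (a closed
+// local port and a reserved .invalid hostname), so no running Kafka broker
+// is required; each case asserts both the returned error and the classified
+// Delivery code.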
+func TestKafkaPublisher_ConnectionErrors(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + brokers string + description string + expectedCode string + }{ + { + name: "connection refused", + brokers: "127.0.0.1:1", + description: "simulates a server that is not running", + expectedCode: "connection_refused", + }, + { + name: "DNS failure", + brokers: "this-domain-does-not-exist-abc123xyz.invalid:9092", + description: "simulates an invalid/non-existent domain", + expectedCode: "dns_error", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + provider, err := destkafka.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": tt.brokers, + "topic": "test-topic", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{}), + ) + + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + defer publisher.Close() + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithDataMap(map[string]interface{}{"key": "value"}), + ) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + delivery, err := publisher.Publish(ctx, &event) + + require.Error(t, err, "should return error for %s", tt.description) + require.NotNil(t, delivery, "delivery should NOT be nil for connection errors") + assert.Equal(t, "failed", delivery.Status, "delivery status should be 'failed'") + assert.Equal(t, tt.expectedCode, delivery.Code, "delivery code should indicate error type") + }) + } +} + +// Helper to ensure a Kafka topic exists +func ensureKafkaTopic(t *testing.T, brokerAddr, topic string) { + t.Helper() + + conn, err := kafka.Dial("tcp", brokerAddr) + require.NoError(t, err) + defer conn.Close() + + err = conn.CreateTopics(kafka.TopicConfig{ + Topic: topic, + NumPartitions: 1, + ReplicationFactor: 1, + }) + require.NoError(t, err) + + // Give Kafka a moment to fully create the topic + time.Sleep(500 * time.Millisecond) +} diff --git a/internal/destregistry/providers/destkafka/destkafka_validate_test.go b/internal/destregistry/providers/destkafka/destkafka_validate_test.go new file mode 100644 index 000000000..2f6c7faab --- /dev/null +++ b/internal/destregistry/providers/destkafka/destkafka_validate_test.go @@ -0,0 +1,247 @@ +package destkafka_test + +import ( + "context" + "maps" + "testing" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/providers/destkafka" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestKafkaDestination_Validate(t *testing.T) { + t.Parallel() + + validDestination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": "localhost:9092", + "topic": "test-topic", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{}), + ) + + kafkaDestination, err := destkafka.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + t.Run("should validate valid destination", func(t *testing.T) { + t.Parallel() + assert.NoError(t, kafkaDestination.Validate(context.Background(), &validDestination)) + }) + + t.Run("should validate invalid type", func(t *testing.T) { + 
t.Parallel() + dest := validDestination + dest.Config = maps.Clone(validDestination.Config) + dest.Credentials = maps.Clone(validDestination.Credentials) + dest.Type = "invalid" + err := kafkaDestination.Validate(context.Background(), &dest) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "type", validationErr.Errors[0].Field) + assert.Equal(t, "invalid_type", validationErr.Errors[0].Type) + }) + + t.Run("should validate missing brokers", func(t *testing.T) { + t.Parallel() + dest := validDestination + dest.Config = map[string]string{ + "topic": "test-topic", + } + dest.Credentials = maps.Clone(validDestination.Credentials) + err := kafkaDestination.Validate(context.Background(), &dest) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "config.brokers", validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + }) + + t.Run("should validate missing topic", func(t *testing.T) { + t.Parallel() + dest := validDestination + dest.Config = map[string]string{ + "brokers": "localhost:9092", + } + dest.Credentials = maps.Clone(validDestination.Credentials) + err := kafkaDestination.Validate(context.Background(), &dest) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "config.topic", validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + }) + + t.Run("should validate sasl_mechanism values", func(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + mechanism string + shouldError bool + }{ + {name: "valid plain", mechanism: "plain", shouldError: false}, + {name: "valid scram-sha-256", mechanism: "scram-sha-256", shouldError: false}, + {name: "valid scram-sha-512", mechanism: "scram-sha-512", shouldError: false}, + {name: "invalid mechanism", mechanism: "oauth", shouldError: true}, + {name: "empty is valid (no auth)", mechanism: "", shouldError: false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + dest := validDestination + dest.Config = maps.Clone(validDestination.Config) + dest.Credentials = maps.Clone(validDestination.Credentials) + if tc.mechanism != "" { + dest.Config["sasl_mechanism"] = tc.mechanism + } + err := kafkaDestination.Validate(context.Background(), &dest) + if tc.shouldError { + var validationErr *destregistry.ErrDestinationValidation + if !assert.ErrorAs(t, err, &validationErr) { + return + } + assert.Equal(t, "config.sasl_mechanism", validationErr.Errors[0].Field) + assert.Equal(t, "invalid", validationErr.Errors[0].Type) + } else { + assert.NoError(t, err) + } + }) + } + }) + + t.Run("should validate tls config values", func(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + tlsValue string + shouldError bool + }{ + {name: "valid true", tlsValue: "true", shouldError: false}, + {name: "valid on", tlsValue: "on", shouldError: false}, + {name: "valid false", tlsValue: "false", shouldError: false}, + {name: "invalid value", tlsValue: "yes", shouldError: true}, + {name: "empty value", tlsValue: "", shouldError: true}, + {name: "case sensitive True", tlsValue: "True", shouldError: true}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + dest := validDestination + dest.Config = maps.Clone(validDestination.Config) + dest.Credentials = maps.Clone(validDestination.Credentials) + 
dest.Config["tls"] = tc.tlsValue + err := kafkaDestination.Validate(context.Background(), &dest) + if tc.shouldError { + var validationErr *destregistry.ErrDestinationValidation + if !assert.ErrorAs(t, err, &validationErr) { + return + } + assert.Equal(t, "config.tls", validationErr.Errors[0].Field) + assert.Equal(t, "invalid", validationErr.Errors[0].Type) + } else { + assert.NoError(t, err) + } + }) + } + }) + + t.Run("should allow tls to be omitted", func(t *testing.T) { + t.Parallel() + dest := validDestination + dest.Config = maps.Clone(validDestination.Config) + dest.Credentials = maps.Clone(validDestination.Credentials) + delete(dest.Config, "tls") + assert.NoError(t, kafkaDestination.Validate(context.Background(), &dest)) + }) +} + +func TestKafkaDestination_ComputeTarget(t *testing.T) { + t.Parallel() + + kafkaDestination, err := destkafka.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + t.Run("should return 'broker / topic' for single broker", func(t *testing.T) { + t.Parallel() + dest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": "broker1:9092", + "topic": "my-topic", + }), + ) + target := kafkaDestination.ComputeTarget(&dest) + assert.Equal(t, "broker1:9092 / my-topic", target.Target) + assert.Empty(t, target.TargetURL) + }) + + t.Run("should return first broker for multiple brokers", func(t *testing.T) { + t.Parallel() + dest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": "broker1:9092,broker2:9092,broker3:9092", + "topic": "my-topic", + }), + ) + target := kafkaDestination.ComputeTarget(&dest) + assert.Equal(t, "broker1:9092 / my-topic", target.Target) + }) +} + +func TestKafkaDestination_Preprocess(t *testing.T) { + t.Parallel() + + kafkaDestination, err := destkafka.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + t.Run("should normalize tls 'on' to 'true'", func(t *testing.T) { + t.Parallel() + dest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": "broker1:9092", + "topic": "my-topic", + "tls": "on", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{}), + ) + err := kafkaDestination.Preprocess(&dest, nil, nil) + require.NoError(t, err) + assert.Equal(t, "true", dest.Config["tls"]) + }) + + t.Run("should normalize empty tls to 'false'", func(t *testing.T) { + t.Parallel() + dest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": "broker1:9092", + "topic": "my-topic", + "tls": "", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{}), + ) + err := kafkaDestination.Preprocess(&dest, nil, nil) + require.NoError(t, err) + assert.Equal(t, "false", dest.Config["tls"]) + }) + + t.Run("should trim whitespace from brokers", func(t *testing.T) { + t.Parallel() + dest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": " broker1:9092 , broker2:9092 ", + "topic": "my-topic", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{}), + ) + err := kafkaDestination.Preprocess(&dest, nil, nil) + require.NoError(t, err) + assert.Equal(t, 
"broker1:9092,broker2:9092", dest.Config["brokers"]) + }) +} From 37ce06400909c3384c110dc49876e2d131c4bcfe Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 22 Mar 2026 20:58:30 +0700 Subject: [PATCH 02/16] chore: add Kafka test infrastructure Add Kafka to test compose (confluent-local), testinfra helper, and rename test services with test- prefix to avoid name collisions when sharing the outpost Docker network. Co-Authored-By: Claude Opus 4.6 --- .env.test | 1 + build/test/compose.yml | 24 ++++++++--- internal/util/testinfra/kafka.go | 63 ++++++++++++++++++++++++++++ internal/util/testinfra/testinfra.go | 2 + 4 files changed, 84 insertions(+), 6 deletions(-) create mode 100644 internal/util/testinfra/kafka.go diff --git a/.env.test b/.env.test index d71503a81..26e04f292 100644 --- a/.env.test +++ b/.env.test @@ -3,6 +3,7 @@ TEST_POSTGRES_URL="localhost:35432" TEST_CLICKHOUSE_URL="localhost:39000" # MQs TEST_RABBITMQ_URL="localhost:35672" +TEST_KAFKA_URL="localhost:39092" TEST_LOCALSTACK_URL="localhost:34566" TEST_GCP_URL="localhost:38085" TEST_AZURE_SB_CONNSTRING="Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;" diff --git a/build/test/compose.yml b/build/test/compose.yml index 442992832..d486ce321 100644 --- a/build/test/compose.yml +++ b/build/test/compose.yml @@ -1,18 +1,18 @@ name: "outpost-test" services: - mock: + test-mock: image: outpost-mock build: context: ../../ dockerfile: ./build/test/Dockerfile.mock ports: - 35555:5555 - clickhouse: + test-clickhouse: image: clickhouse/clickhouse-server:24-alpine ports: - 39000:9000 - postgres: + test-postgres: image: postgres:16-alpine environment: - POSTGRES_USER=outpost @@ -20,19 +20,26 @@ services: - POSTGRES_DB=default ports: - 35432:5432 - rabbitmq: + test-rabbitmq: image: rabbitmq:3-management ports: - 35672:5672 - 45672:15672 - aws: + test-aws: image: localstack/localstack:latest environment: - SERVICES=s3,sns,sts,sqs,kinesis ports: - 34566:4566 - 34571:4571 - gcp: + test-kafka: + image: confluentinc/confluent-local:7.4.0 + environment: + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://test-kafka:29092,PLAINTEXT_HOST://localhost:39092 + - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092 + ports: + - 39092:9092 + test-gcp: image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators command: [ @@ -46,3 +53,8 @@ services: ] ports: - "38085:8085" + +networks: + default: + name: outpost + external: true diff --git a/internal/util/testinfra/kafka.go b/internal/util/testinfra/kafka.go new file mode 100644 index 000000000..ba7df6036 --- /dev/null +++ b/internal/util/testinfra/kafka.go @@ -0,0 +1,63 @@ +package testinfra + +import ( + "context" + "log" + "sync" + + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" +) + +var kafkaOnce sync.Once + +func EnsureKafka() string { + cfg := ReadConfig() + if cfg.KafkaURL == "" { + kafkaOnce.Do(func() { + startKafkaTestContainer(cfg) + }) + } + return cfg.KafkaURL +} + +func startKafkaTestContainer(cfg *Config) { + ctx := context.Background() + + req := testcontainers.ContainerRequest{ + Image: "confluentinc/confluent-local:7.4.0", + ExposedPorts: []string{"9092/tcp"}, + Env: map[string]string{ + "KAFKA_ADVERTISED_LISTENERS": "PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092", + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT", + "KAFKA_LISTENERS": 
"PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093", + "KAFKA_INTER_BROKER_LISTENER_NAME": "PLAINTEXT", + "KAFKA_CONTROLLER_LISTENER_NAMES": "CONTROLLER", + "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1", + "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": "1", + "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1", + }, + WaitingFor: wait.ForListeningPort("9092/tcp"), + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + panic(err) + } + + endpoint, err := container.PortEndpoint(ctx, "9092/tcp", "") + if err != nil { + panic(err) + } + + log.Printf("Kafka running at %s", endpoint) + cfg.KafkaURL = endpoint + cfg.cleanupFns = append(cfg.cleanupFns, func() { + if err := container.Terminate(ctx); err != nil { + log.Printf("failed to terminate kafka container: %s", err) + } + }) +} diff --git a/internal/util/testinfra/testinfra.go b/internal/util/testinfra/testinfra.go index 3c8cf2cc2..1b7166c21 100644 --- a/internal/util/testinfra/testinfra.go +++ b/internal/util/testinfra/testinfra.go @@ -26,6 +26,7 @@ type Config struct { PostgresURL string LocalStackURL string RabbitMQURL string + KafkaURL string MockServerURL string GCPURL string AzureSBConnString string @@ -75,6 +76,7 @@ func initConfig() { GCPURL: v.GetString("TEST_GCP_URL"), AzureSBConnString: v.GetString("TEST_AZURE_SB_CONNSTRING"), RabbitMQURL: rabbitmqURL, + KafkaURL: v.GetString("TEST_KAFKA_URL"), MockServerURL: mockServerURL, } return From 5774ff85474c66d163bd8722bd9300b8cc0cfb7a Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 22 Mar 2026 21:40:00 +0700 Subject: [PATCH 03/16] fix(portal): fix destination type list scroll and config form overlap Make destination type list scrollable and fix Create Destination button overlapping form fields when there are many config fields (e.g. Kafka). Co-Authored-By: Claude Opus 4.6 --- .../src/scenes/CreateDestination/CreateDestination.scss | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/portal/src/scenes/CreateDestination/CreateDestination.scss b/internal/portal/src/scenes/CreateDestination/CreateDestination.scss index 7a3a59eac..a784db00b 100644 --- a/internal/portal/src/scenes/CreateDestination/CreateDestination.scss +++ b/internal/portal/src/scenes/CreateDestination/CreateDestination.scss @@ -87,8 +87,9 @@ } &__fields { - flex: 0 1 auto; + flex: 1 1 auto; min-height: 0; + overflow-y: auto; } &__actions { @@ -110,6 +111,7 @@ overflow-y: auto; display: flex; flex-direction: column; + height: 100%; } } From baa9e517542a6c2da3ac4d6950908a1c2772d53f Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 22 Mar 2026 22:46:28 +0700 Subject: [PATCH 04/16] feat(portal): add select field type for destination config Add generic select field type to the portal destination config system and use it for Kafka SASL Mechanism, replacing the free-text input with a dropdown (None, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512). 
Closes #141 Co-Authored-By: Claude Opus 4.6 --- .../metadata/providers/kafka/metadata.json | 12 +++++-- internal/destregistry/metadata/types.go | 34 +++++++++++-------- .../DestinationConfigFields.scss | 10 ++++++ .../DestinationConfigFields.tsx | 20 +++++++++++ internal/portal/src/global.scss | 3 +- internal/portal/src/typings/Destination.ts | 3 +- 6 files changed, 63 insertions(+), 19 deletions(-) diff --git a/internal/destregistry/metadata/providers/kafka/metadata.json b/internal/destregistry/metadata/providers/kafka/metadata.json index 48bfe5bb4..383ca06b3 100644 --- a/internal/destregistry/metadata/providers/kafka/metadata.json +++ b/internal/destregistry/metadata/providers/kafka/metadata.json @@ -20,10 +20,16 @@ }, { "key": "sasl_mechanism", - "type": "text", + "type": "select", "label": "SASL Mechanism", - "description": "SASL authentication mechanism (plain, scram-sha-256, scram-sha-512). Leave empty for no authentication.", - "required": false + "description": "SASL authentication mechanism. Leave empty for no authentication.", + "required": false, + "options": [ + { "label": "None", "value": "" }, + { "label": "PLAIN", "value": "plain" }, + { "label": "SCRAM-SHA-256", "value": "scram-sha-256" }, + { "label": "SCRAM-SHA-512", "value": "scram-sha-512" } + ] }, { "key": "tls", diff --git a/internal/destregistry/metadata/types.go b/internal/destregistry/metadata/types.go index e8fc499e2..3ef41efb7 100644 --- a/internal/destregistry/metadata/types.go +++ b/internal/destregistry/metadata/types.go @@ -22,19 +22,25 @@ type SetupLink struct { Cta string `json:"cta"` } +type FieldOption struct { + Label string `json:"label"` + Value string `json:"value"` +} + type FieldSchema struct { - Type string `json:"type"` - Label string `json:"label"` - Description string `json:"description"` - Key string `json:"key"` - Required bool `json:"required"` - Default *string `json:"default,omitempty"` - Disabled bool `json:"disabled,omitempty"` - Sensitive bool `json:"sensitive,omitempty"` // Whether the field value should be obfuscated in API responses - Min *int `json:"min,omitempty"` // Minimum value for numeric fields - Max *int `json:"max,omitempty"` // Maximum value for numeric fields - Step *int `json:"step,omitempty"` // Step value for numeric fields - MinLength *int `json:"minlength,omitempty"` // Minimum length for text fields - MaxLength *int `json:"maxlength,omitempty"` // Maximum length for text fields - Pattern *string `json:"pattern,omitempty"` // Regular expression pattern for text fields + Type string `json:"type"` + Label string `json:"label"` + Description string `json:"description"` + Key string `json:"key"` + Required bool `json:"required"` + Default *string `json:"default,omitempty"` + Disabled bool `json:"disabled,omitempty"` + Sensitive bool `json:"sensitive,omitempty"` // Whether the field value should be obfuscated in API responses + Min *int `json:"min,omitempty"` // Minimum value for numeric fields + Max *int `json:"max,omitempty"` // Maximum value for numeric fields + Step *int `json:"step,omitempty"` // Step value for numeric fields + MinLength *int `json:"minlength,omitempty"` // Minimum length for text fields + MaxLength *int `json:"maxlength,omitempty"` // Maximum length for text fields + Pattern *string `json:"pattern,omitempty"` // Regular expression pattern for text fields + Options []FieldOption `json:"options,omitempty"` // Options for select fields } diff --git a/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.scss 
b/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.scss index bfcc6f393..b0f2c1e9e 100644 --- a/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.scss +++ b/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.scss @@ -51,6 +51,16 @@ } } + select { + width: 100%; + cursor: pointer; + appearance: none; + background-image: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='12' height='12' viewBox='0 0 24 24' fill='none' stroke='%23888' stroke-width='2' stroke-linecap='round' stroke-linejoin='round'%3E%3Cpath d='M6 9l6 6 6-6'/%3E%3C/svg%3E"); + background-repeat: no-repeat; + background-position: right var(--spacing-3) center; + padding-right: var(--spacing-8); + } + .description { font-size: var(--font-size-s); line-height: var(--line-height-s); diff --git a/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.tsx b/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.tsx index c817024e4..0e6d0cefc 100644 --- a/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.tsx +++ b/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.tsx @@ -172,6 +172,26 @@ const DestinationConfigFields = ({ )} )} + {field.type === "select" && ( + + )} {field.type === "checkbox" && ( ; key_placeholder?: string; value_placeholder?: string; } From 3d84504f81a0b648647950c8ee9d5dc6bd9e45b1 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 22 Mar 2026 22:53:24 +0700 Subject: [PATCH 05/16] chore: gofmt --- internal/logstore/driver/metrics.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/logstore/driver/metrics.go b/internal/logstore/driver/metrics.go index 1750f611a..a58ccba83 100644 --- a/internal/logstore/driver/metrics.go +++ b/internal/logstore/driver/metrics.go @@ -84,10 +84,10 @@ type EventMetricsResponse struct { type AttemptMetricsDataPoint struct { TimeBucket *time.Time // Measures - Count *int - SuccessfulCount *int - FailedCount *int - ErrorRate *float64 + Count *int + SuccessfulCount *int + FailedCount *int + ErrorRate *float64 // NOTE: The following three measures are equivalent to using "count" with // the corresponding filters (attempt_number=1 AND manual=false, attempt_number>1, // manual=true). They exist for composability — callers can request multiple From ab3dac5e1524858d763b0b46474f7814c65c4733 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 22 Mar 2026 23:09:06 +0700 Subject: [PATCH 06/16] fix(kafka): catch Kafka protocol timeout errors in error classification Add "Timed Out" pattern to also match kafka-go's RequestTimedOut protocol error, not just Go-level i/o timeouts. 
Co-Authored-By: Claude Opus 4.6 --- internal/destregistry/providers/destkafka/destkafka.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/destregistry/providers/destkafka/destkafka.go b/internal/destregistry/providers/destkafka/destkafka.go index 506528a97..9b7923fd4 100644 --- a/internal/destregistry/providers/destkafka/destkafka.go +++ b/internal/destregistry/providers/destkafka/destkafka.go @@ -295,7 +295,7 @@ func ClassifyKafkaError(err error) string { return "topic_not_found" case strings.Contains(errStr, "Message Size Too Large") || strings.Contains(errStr, "MESSAGE_TOO_LARGE"): return "message_too_large" - case strings.Contains(errStr, "i/o timeout") || strings.Contains(errStr, "context deadline exceeded"): + case strings.Contains(errStr, "i/o timeout") || strings.Contains(errStr, "context deadline exceeded") || strings.Contains(errStr, "Timed Out"): return "timeout" case strings.Contains(errStr, "tls:") || strings.Contains(errStr, "x509:"): return "tls_error" From 4e5aa35df1b9d4770ade5777cf982350132a83d8 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 23 Mar 2026 00:05:29 +0700 Subject: [PATCH 07/16] docs: add select field type and kafka to openapi schema Add select to DestinationSchemaField type enum with options property. Add kafka to destination type path enum and list example. Co-Authored-By: Claude Opus 4.6 --- docs/apis/openapi.yaml | 70 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/docs/apis/openapi.yaml b/docs/apis/openapi.yaml index 81325efa6..2685a95c4 100644 --- a/docs/apis/openapi.yaml +++ b/docs/apis/openapi.yaml @@ -2003,7 +2003,7 @@ components: properties: type: type: string - enum: [text, checkbox, key_value_map] + enum: [text, checkbox, key_value_map, select] example: "text" label: type: string @@ -2034,6 +2034,19 @@ components: type: string description: Regex pattern for validation (compatible with HTML5 pattern attribute). example: "^[a-zA-Z0-9_]+$" + options: + type: array + description: Available options for select fields. 
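+          # For example, the kafka sasl_mechanism field defined later in this
+          # file supplies entries like { label: "PLAIN", value: "plain" }.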
+ items: + type: object + required: [label, value] + properties: + label: + type: string + example: "PLAIN" + value: + type: string + example: "plain" MetricsDataPoint: type: object @@ -3664,6 +3677,59 @@ paths: sensitive: true, # Added sensitive }, ] + - type: "kafka" + label: "Apache Kafka" + description: "Send events to Apache Kafka topics for real-time event streaming" + icon: "" + config_fields: + [ + { + type: "text", + label: "Brokers", + description: "Comma-separated list of Kafka broker addresses.", + required: true, + }, + { + type: "text", + label: "Topic", + description: "The Kafka topic to publish messages to.", + required: true, + }, + { + type: "select", + label: "SASL Mechanism", + description: "SASL authentication mechanism.", + required: false, + options: [ + { label: "None", value: "" }, + { label: "PLAIN", value: "plain" }, + { label: "SCRAM-SHA-256", value: "scram-sha-256" }, + { label: "SCRAM-SHA-512", value: "scram-sha-512" }, + ], + }, + { + type: "checkbox", + label: "TLS", + description: "Enable TLS for the connection.", + default: "false", + }, + ] + credential_fields: + [ + { + type: "text", + label: "Username", + description: "SASL username for authentication.", + required: false, + }, + { + type: "text", + label: "Password", + description: "SASL password for authentication.", + required: false, + sensitive: true, + }, + ] - type: "aws_sqs" label: "AWS SQS" description: "Send event to an AWS SQS queue" @@ -3720,7 +3786,7 @@ paths: required: true schema: type: string - enum: [webhook, aws_sqs, rabbitmq, hookdeck, aws_kinesis, azure_servicebus, aws_s3, gcp_pubsub] + enum: [webhook, aws_sqs, rabbitmq, hookdeck, aws_kinesis, azure_servicebus, aws_s3, gcp_pubsub, kafka] description: The type of the destination. get: tags: [Schemas] From 16e2968111c85b423b443933533bc9b204080bfb Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 23 Mar 2026 00:19:45 +0700 Subject: [PATCH 08/16] fix: harden kafka validation, writer config, and error classification - Require username/password credentials when SASL mechanism is configured - Accept empty string for TLS config in Validate (treat as "not configured") - Add WriteTimeout (10s) and MaxAttempts (3) to kafka.Writer - Use "authentication" instead of "auth" in ClassifyKafkaError to avoid overly broad string matching Co-Authored-By: Claude Opus 4.6 (1M context) --- .../providers/destkafka/destkafka.go | 31 +++++++---- .../destkafka/destkafka_validate_test.go | 53 ++++++++++++++++++- 2 files changed, 72 insertions(+), 12 deletions(-) diff --git a/internal/destregistry/providers/destkafka/destkafka.go b/internal/destregistry/providers/destkafka/destkafka.go index 9b7923fd4..f062ebb49 100644 --- a/internal/destregistry/providers/destkafka/destkafka.go +++ b/internal/destregistry/providers/destkafka/destkafka.go @@ -54,10 +54,19 @@ func (d *KafkaDestination) Validate(ctx context.Context, destination *models.Des } // Validate SASL mechanism if provided - if mechanism := destination.Config["sasl_mechanism"]; mechanism != "" { - switch mechanism { + saslMechanism := destination.Config["sasl_mechanism"] + if saslMechanism != "" { + switch saslMechanism { case "plain", "scram-sha-256", "scram-sha-512": - // valid + // valid — require credentials when SASL is configured + if destination.Credentials["username"] == "" || destination.Credentials["password"] == "" { + return destregistry.NewErrDestinationValidation([]destregistry.ValidationErrorDetail{ + { + Field: "credentials", + Type: "required", + }, + }) + } default: return 
destregistry.NewErrDestinationValidation([]destregistry.ValidationErrorDetail{ { @@ -68,8 +77,8 @@ func (d *KafkaDestination) Validate(ctx context.Context, destination *models.Des } } - // Validate TLS config if provided - if tlsStr, ok := destination.Config["tls"]; ok { + // Validate TLS config if provided — empty string is treated as "not configured" + if tlsStr, ok := destination.Config["tls"]; ok && tlsStr != "" { if tlsStr != "on" && tlsStr != "true" && tlsStr != "false" { return destregistry.NewErrDestinationValidation([]destregistry.ValidationErrorDetail{ { @@ -109,10 +118,12 @@ func (d *KafkaDestination) CreatePublisher(ctx context.Context, destination *mod // Create writer with hash balancer for consistent partition key routing writer := &kafka.Writer{ - Addr: kafka.TCP(config.Brokers...), - Topic: config.Topic, - Balancer: &kafka.Hash{}, - Transport: transport, + Addr: kafka.TCP(config.Brokers...), + Topic: config.Topic, + Balancer: &kafka.Hash{}, + Transport: transport, + WriteTimeout: 10 * time.Second, + MaxAttempts: 3, } return &KafkaPublisher{ @@ -285,7 +296,7 @@ func ClassifyKafkaError(err error) string { errStr := err.Error() switch { - case strings.Contains(errStr, "SASL") || strings.Contains(errStr, "auth") || strings.Contains(errStr, "Authentication"): + case strings.Contains(errStr, "SASL") || strings.Contains(errStr, "authentication") || strings.Contains(errStr, "Authentication"): return "auth_failed" case strings.Contains(errStr, "connection refused"): return "connection_refused" diff --git a/internal/destregistry/providers/destkafka/destkafka_validate_test.go b/internal/destregistry/providers/destkafka/destkafka_validate_test.go index 2f6c7faab..388351050 100644 --- a/internal/destregistry/providers/destkafka/destkafka_validate_test.go +++ b/internal/destregistry/providers/destkafka/destkafka_validate_test.go @@ -92,7 +92,12 @@ func TestKafkaDestination_Validate(t *testing.T) { t.Parallel() dest := validDestination dest.Config = maps.Clone(validDestination.Config) - dest.Credentials = maps.Clone(validDestination.Credentials) + // Provide credentials when mechanism is set so we only test mechanism validation + if tc.mechanism != "" { + dest.Credentials = map[string]string{"username": "user", "password": "pass"} + } else { + dest.Credentials = maps.Clone(validDestination.Credentials) + } if tc.mechanism != "" { dest.Config["sasl_mechanism"] = tc.mechanism } @@ -122,7 +127,7 @@ func TestKafkaDestination_Validate(t *testing.T) { {name: "valid on", tlsValue: "on", shouldError: false}, {name: "valid false", tlsValue: "false", shouldError: false}, {name: "invalid value", tlsValue: "yes", shouldError: true}, - {name: "empty value", tlsValue: "", shouldError: true}, + {name: "empty value is valid (not configured)", tlsValue: "", shouldError: false}, {name: "case sensitive True", tlsValue: "True", shouldError: true}, } @@ -148,6 +153,50 @@ func TestKafkaDestination_Validate(t *testing.T) { } }) + t.Run("should require credentials when sasl_mechanism is set", func(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + mechanism string + username string + password string + shouldError bool + }{ + {name: "plain with credentials", mechanism: "plain", username: "user", password: "pass", shouldError: false}, + {name: "plain without username", mechanism: "plain", username: "", password: "pass", shouldError: true}, + {name: "plain without password", mechanism: "plain", username: "user", password: "", shouldError: true}, + {name: "scram-sha-256 with credentials", 
mechanism: "scram-sha-256", username: "user", password: "pass", shouldError: false}, + {name: "scram-sha-256 without credentials", mechanism: "scram-sha-256", username: "", password: "", shouldError: true}, + {name: "no mechanism no credentials", mechanism: "", username: "", password: "", shouldError: false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + dest := validDestination + dest.Config = maps.Clone(validDestination.Config) + dest.Credentials = map[string]string{ + "username": tc.username, + "password": tc.password, + } + if tc.mechanism != "" { + dest.Config["sasl_mechanism"] = tc.mechanism + } + err := kafkaDestination.Validate(context.Background(), &dest) + if tc.shouldError { + var validationErr *destregistry.ErrDestinationValidation + if !assert.ErrorAs(t, err, &validationErr) { + return + } + assert.Equal(t, "credentials", validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + } else { + assert.NoError(t, err) + } + }) + } + }) + t.Run("should allow tls to be omitted", func(t *testing.T) { t.Parallel() dest := validDestination From 2dcfafdb7bc0fc8c3d6dee913502d3afb4ec4258 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 23 Mar 2026 00:20:40 +0700 Subject: [PATCH 09/16] fix: address kafka test infrastructure issues - Use fixed host port (19092) for kafka test container so advertised listeners match the actual port clients connect to - Add KafkaURL to non-TESTINFRA config path for consistency - Use atomic.Bool for KafkaConsumer.shuttingDown to fix data race Co-Authored-By: Claude Opus 4.6 (1M context) --- .../providers/destkafka/destkafka_publish_test.go | 7 ++++--- internal/util/testinfra/kafka.go | 15 ++++++++------- internal/util/testinfra/testinfra.go | 1 + 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/internal/destregistry/providers/destkafka/destkafka_publish_test.go b/internal/destregistry/providers/destkafka/destkafka_publish_test.go index 2b1d55128..e26e4bc8f 100644 --- a/internal/destregistry/providers/destkafka/destkafka_publish_test.go +++ b/internal/destregistry/providers/destkafka/destkafka_publish_test.go @@ -3,6 +3,7 @@ package destkafka_test import ( "context" "sync" + "sync/atomic" "testing" "time" @@ -23,7 +24,7 @@ type KafkaConsumer struct { reader *kafka.Reader msgChan chan testsuite.Message done chan struct{} - shuttingDown bool + shuttingDown atomic.Bool wg sync.WaitGroup } @@ -66,7 +67,7 @@ func (c *KafkaConsumer) consume() { metadata[h.Key] = string(h.Value) } - if !c.shuttingDown { + if !c.shuttingDown.Load() { c.msgChan <- testsuite.Message{ Data: msg.Value, Metadata: metadata, @@ -82,7 +83,7 @@ func (c *KafkaConsumer) Consume() <-chan testsuite.Message { } func (c *KafkaConsumer) Close() error { - c.shuttingDown = true + c.shuttingDown.Store(true) close(c.done) c.wg.Wait() close(c.msgChan) diff --git a/internal/util/testinfra/kafka.go b/internal/util/testinfra/kafka.go index ba7df6036..9d72ed0cf 100644 --- a/internal/util/testinfra/kafka.go +++ b/internal/util/testinfra/kafka.go @@ -24,11 +24,16 @@ func EnsureKafka() string { func startKafkaTestContainer(cfg *Config) { ctx := context.Background() + // Use a fixed host port so advertised listeners match what clients connect to. + // Kafka brokers redirect clients to the advertised address, so if testcontainers + // maps to a random port but the broker advertises 9092, clients would fail. 
+	const hostPort = "19092"
+
 	req := testcontainers.ContainerRequest{
 		Image:        "confluentinc/confluent-local:7.4.0",
-		ExposedPorts: []string{"9092/tcp"},
+		ExposedPorts: []string{hostPort + ":9092/tcp"},
 		Env: map[string]string{
-			"KAFKA_ADVERTISED_LISTENERS":           "PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092",
+			"KAFKA_ADVERTISED_LISTENERS":           "PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:" + hostPort,
 			"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT",
 			"KAFKA_LISTENERS":                      "PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093",
 			"KAFKA_INTER_BROKER_LISTENER_NAME":     "PLAINTEXT",
@@ -48,11 +53,7 @@ func startKafkaTestContainer(cfg *Config) {
 		panic(err)
 	}
 
-	endpoint, err := container.PortEndpoint(ctx, "9092/tcp", "")
-	if err != nil {
-		panic(err)
-	}
-
+	endpoint := "localhost:" + hostPort
 	log.Printf("Kafka running at %s", endpoint)
 	cfg.KafkaURL = endpoint
 	cfg.cleanupFns = append(cfg.cleanupFns, func() {
diff --git a/internal/util/testinfra/testinfra.go b/internal/util/testinfra/testinfra.go
index 1b7166c21..6b4ce3f04 100644
--- a/internal/util/testinfra/testinfra.go
+++ b/internal/util/testinfra/testinfra.go
@@ -91,6 +91,7 @@ func initConfig() {
 		GCPURL:            "",
 		AzureSBConnString: "",
 		RabbitMQURL:       "",
+		KafkaURL:          "",
 		MockServerURL:     "",
 	}
 }

From 0becbbae5a2a8dacbad2f9c81555102a39e13824 Mon Sep 17 00:00:00 2001
From: Alex Luong
Date: Mon, 23 Mar 2026 00:20:53 +0700
Subject: [PATCH 10/16] fix: add onChange handler to portal select field

Ensures the parent component is notified when the select value changes,
consistent with how other field types propagate changes.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 .../common/DestinationConfigFields/DestinationConfigFields.tsx | 1 +
 1 file changed, 1 insertion(+)

diff --git a/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.tsx b/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.tsx
index 0e6d0cefc..4d0c3d3fa 100644
--- a/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.tsx
+++ b/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.tsx
@@ -184,6 +184,7 @@ const DestinationConfigFields = ({
             }
             disabled={field.disabled}
             required={field.required}
+            onChange={onChange}
           >
             {field.options?.map((option) => (