diff --git a/.env.test b/.env.test index d71503a81..26e04f292 100644 --- a/.env.test +++ b/.env.test @@ -3,6 +3,7 @@ TEST_POSTGRES_URL="localhost:35432" TEST_CLICKHOUSE_URL="localhost:39000" # MQs TEST_RABBITMQ_URL="localhost:35672" +TEST_KAFKA_URL="localhost:39092" TEST_LOCALSTACK_URL="localhost:34566" TEST_GCP_URL="localhost:38085" TEST_AZURE_SB_CONNSTRING="Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;" diff --git a/build/test/compose.yml b/build/test/compose.yml index 442992832..d486ce321 100644 --- a/build/test/compose.yml +++ b/build/test/compose.yml @@ -1,18 +1,18 @@ name: "outpost-test" services: - mock: + test-mock: image: outpost-mock build: context: ../../ dockerfile: ./build/test/Dockerfile.mock ports: - 35555:5555 - clickhouse: + test-clickhouse: image: clickhouse/clickhouse-server:24-alpine ports: - 39000:9000 - postgres: + test-postgres: image: postgres:16-alpine environment: - POSTGRES_USER=outpost @@ -20,19 +20,26 @@ services: - POSTGRES_DB=default ports: - 35432:5432 - rabbitmq: + test-rabbitmq: image: rabbitmq:3-management ports: - 35672:5672 - 45672:15672 - aws: + test-aws: image: localstack/localstack:latest environment: - SERVICES=s3,sns,sts,sqs,kinesis ports: - 34566:4566 - 34571:4571 - gcp: + test-kafka: + image: confluentinc/confluent-local:7.4.0 + environment: + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://test-kafka:29092,PLAINTEXT_HOST://localhost:39092 + - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092 + ports: + - 39092:9092 + test-gcp: image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators command: [ @@ -46,3 +53,8 @@ services: ] ports: - "38085:8085" + +networks: + default: + name: outpost + external: true diff --git a/docs/apis/openapi.yaml b/docs/apis/openapi.yaml index 549ae48e6..9698ec3e6 100644 --- a/docs/apis/openapi.yaml +++ b/docs/apis/openapi.yaml @@ -2009,7 +2009,7 @@ components: properties: type: type: string - enum: [text, checkbox, key_value_map] + enum: [text, checkbox, key_value_map, select] example: "text" label: type: string @@ -2040,6 +2040,19 @@ components: type: string description: Regex pattern for validation (compatible with HTML5 pattern attribute). example: "^[a-zA-Z0-9_]+$" + options: + type: array + description: Available options for select fields. + items: + type: object + required: [label, value] + properties: + label: + type: string + example: "PLAIN" + value: + type: string + example: "plain" MetricsDataPoint: type: object @@ -3675,6 +3688,64 @@ paths: sensitive: true, # Added sensitive }, ] + - type: "kafka" + label: "Apache Kafka" + description: "Send events to Apache Kafka topics for real-time event streaming" + icon: "" + config_fields: + [ + { + type: "text", + label: "Brokers", + description: "Comma-separated list of Kafka broker addresses.", + required: true, + }, + { + type: "text", + label: "Topic", + description: "The Kafka topic to publish messages to.", + required: true, + }, + { + type: "checkbox", + label: "TLS", + description: "Enable TLS for the connection.", + default: "true", + }, + { + type: "text", + label: "Partition Key Template", + description: "JMESPath template to extract the partition key from the event payload.", + required: false, + }, + { + type: "select", + label: "SASL Mechanism", + description: "SASL authentication mechanism.", + required: true, + options: [ + { label: "PLAIN", value: "plain" }, + { label: "SCRAM-SHA-256", value: "scram-sha-256" }, + { label: "SCRAM-SHA-512", value: "scram-sha-512" }, + ], + }, + ] + credential_fields: + [ + { + type: "text", + label: "Username", + description: "SASL username for authentication.", + required: true, + }, + { + type: "text", + label: "Password", + description: "SASL password for authentication.", + required: true, + sensitive: true, + }, + ] - type: "aws_sqs" label: "AWS SQS" description: "Send event to an AWS SQS queue" @@ -3731,7 +3802,7 @@ paths: required: true schema: type: string - enum: [webhook, aws_sqs, rabbitmq, hookdeck, aws_kinesis, azure_servicebus, aws_s3, gcp_pubsub] + enum: [webhook, aws_sqs, rabbitmq, hookdeck, aws_kinesis, azure_servicebus, aws_s3, gcp_pubsub, kafka] description: The type of the destination. get: tags: [Schemas] diff --git a/go.mod b/go.mod index d9056f764..f498cf41b 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/rabbitmq/amqp091-go v1.10.0 github.com/redis/go-redis/extra/redisotel/v9 v9.5.3 github.com/redis/go-redis/v9 v9.6.1 + github.com/segmentio/kafka-go v0.4.50 github.com/spf13/viper v1.19.0 github.com/standard-webhooks/standard-webhooks/libraries v0.0.0-20250711233419-a173a6c0125c github.com/stretchr/testify v1.10.0 @@ -205,6 +206,9 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect github.com/uptrace/opentelemetry-go-extra/otelutil v0.3.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opencensus.io v0.24.0 // indirect @@ -219,7 +223,7 @@ require ( golang.org/x/crypto v0.37.0 // indirect golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/mod v0.21.0 // indirect - golang.org/x/net v0.36.0 // indirect + golang.org/x/net v0.38.0 // indirect golang.org/x/sys v0.32.0 // indirect golang.org/x/text v0.24.0 // indirect golang.org/x/time v0.6.0 // indirect diff --git a/go.sum b/go.sum index e0017210b..e7deb082b 100644 --- a/go.sum +++ b/go.sum @@ -1227,6 +1227,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= +github.com/segmentio/kafka-go v0.4.50 h1:mcyC3tT5WeyWzrFbd6O374t+hmcu1NKt2Pu1L3QaXmc= +github.com/segmentio/kafka-go v0.4.50/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E= github.com/shirou/gopsutil/v4 v4.25.1 h1:QSWkTc+fu9LTAWfkZwZ6j8MSUk4A2LV7rbH0ZqmLjXs= github.com/shirou/gopsutil/v4 v4.25.1/go.mod h1:RoUCUpndaJFtT+2zsZzzmhvbfGoDCJ7nFXKJf8GqJbI= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= @@ -1297,9 +1299,14 @@ github.com/uptrace/opentelemetry-go-extra/otelzap v0.3.1 h1:0iCp8hx3PFhGihubKHxy github.com/uptrace/opentelemetry-go-extra/otelzap v0.3.1/go.mod h1:FXrjpUJDqwqofvXWG3YNxQwhg2876tUpZASj8VvOMAM= github.com/urfave/cli/v3 v3.4.1 h1:1M9UOCy5bLmGnuu1yn3t3CB4rG79Rtoxuv1sPhnm6qM= github.com/urfave/cli/v3 v3.4.1/go.mod h1:FJSKtM/9AiiTOJL4fJ6TbMUkxBXn7GO9guZqoZtpYpo= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -1533,8 +1540,8 @@ golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= -golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA= -golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/internal/destregistry/metadata/providers/kafka/instructions.md b/internal/destregistry/metadata/providers/kafka/instructions.md new file mode 100644 index 000000000..08847c0d2 --- /dev/null +++ b/internal/destregistry/metadata/providers/kafka/instructions.md @@ -0,0 +1,23 @@ +# Apache Kafka Configuration Instructions + +Apache Kafka is a distributed event streaming platform used for building real-time data pipelines and streaming applications. It provides: + +- High-throughput, low-latency message delivery +- Durable message storage with configurable retention +- Horizontal scalability through partitioning +- Built-in support for SASL authentication and TLS encryption + +## How to configure Kafka as an event destination + +To configure Kafka as a destination you must provide: + +- **Brokers** — A comma-separated list of Kafka broker addresses (e.g., `broker1:9092,broker2:9092`) +- **Topic** — The Kafka topic to publish messages to +- **SASL Mechanism** — Authentication mechanism: `plain`, `scram-sha-256`, or `scram-sha-512` +- **Username** — SASL username for authentication +- **Password** — SASL password for authentication + +### Optional settings + +- **TLS** — Enable TLS encryption for the connection. Enabled by default. +- **Partition Key Template** — A JMESPath expression to extract the message key from the event payload. If not set, the event ID is used as the message key. diff --git a/internal/destregistry/metadata/providers/kafka/metadata.json b/internal/destregistry/metadata/providers/kafka/metadata.json new file mode 100644 index 000000000..b569ca013 --- /dev/null +++ b/internal/destregistry/metadata/providers/kafka/metadata.json @@ -0,0 +1,67 @@ +{ + "type": "kafka", + "label": "Apache Kafka", + "description": "Send events to Apache Kafka topics for real-time event streaming", + "link": "https://kafka.apache.org/", + "config_fields": [ + { + "key": "brokers", + "type": "text", + "label": "Brokers", + "description": "Comma-separated list of Kafka broker addresses (e.g., broker1:9092,broker2:9092)", + "required": true + }, + { + "key": "topic", + "type": "text", + "label": "Topic", + "description": "The Kafka topic to publish messages to", + "required": true + }, + { + "key": "tls", + "type": "checkbox", + "label": "TLS", + "description": "Enable TLS for the connection", + "default": "true" + }, + { + "key": "partition_key_template", + "type": "text", + "label": "Partition Key Template", + "description": "JMESPath template to extract the partition key from the event payload (e.g., metadata.\"event-id\"). Default is event ID, which is also used as fallback if template evaluation fails or returns empty.", + "required": false + }, + { + "key": "sasl_mechanism", + "type": "select", + "label": "SASL Mechanism", + "description": "SASL authentication mechanism", + "required": true, + "options": [ + { "label": "PLAIN", "value": "plain" }, + { "label": "SCRAM-SHA-256", "value": "scram-sha-256" }, + { "label": "SCRAM-SHA-512", "value": "scram-sha-512" } + ] + } + ], + "credential_fields": [ + { + "key": "username", + "type": "text", + "label": "Username", + "description": "SASL username for authentication", + "required": true, + "sensitive": false + }, + { + "key": "password", + "type": "text", + "label": "Password", + "description": "SASL password for authentication", + "required": true, + "sensitive": true + } + ], + "icon": "" +} diff --git a/internal/destregistry/metadata/types.go b/internal/destregistry/metadata/types.go index e8fc499e2..3ef41efb7 100644 --- a/internal/destregistry/metadata/types.go +++ b/internal/destregistry/metadata/types.go @@ -22,19 +22,25 @@ type SetupLink struct { Cta string `json:"cta"` } +type FieldOption struct { + Label string `json:"label"` + Value string `json:"value"` +} + type FieldSchema struct { - Type string `json:"type"` - Label string `json:"label"` - Description string `json:"description"` - Key string `json:"key"` - Required bool `json:"required"` - Default *string `json:"default,omitempty"` - Disabled bool `json:"disabled,omitempty"` - Sensitive bool `json:"sensitive,omitempty"` // Whether the field value should be obfuscated in API responses - Min *int `json:"min,omitempty"` // Minimum value for numeric fields - Max *int `json:"max,omitempty"` // Maximum value for numeric fields - Step *int `json:"step,omitempty"` // Step value for numeric fields - MinLength *int `json:"minlength,omitempty"` // Minimum length for text fields - MaxLength *int `json:"maxlength,omitempty"` // Maximum length for text fields - Pattern *string `json:"pattern,omitempty"` // Regular expression pattern for text fields + Type string `json:"type"` + Label string `json:"label"` + Description string `json:"description"` + Key string `json:"key"` + Required bool `json:"required"` + Default *string `json:"default,omitempty"` + Disabled bool `json:"disabled,omitempty"` + Sensitive bool `json:"sensitive,omitempty"` // Whether the field value should be obfuscated in API responses + Min *int `json:"min,omitempty"` // Minimum value for numeric fields + Max *int `json:"max,omitempty"` // Maximum value for numeric fields + Step *int `json:"step,omitempty"` // Step value for numeric fields + MinLength *int `json:"minlength,omitempty"` // Minimum length for text fields + MaxLength *int `json:"maxlength,omitempty"` // Maximum length for text fields + Pattern *string `json:"pattern,omitempty"` // Regular expression pattern for text fields + Options []FieldOption `json:"options,omitempty"` // Options for select fields } diff --git a/internal/destregistry/partitionkey/partitionkey.go b/internal/destregistry/partitionkey/partitionkey.go new file mode 100644 index 000000000..395829fc8 --- /dev/null +++ b/internal/destregistry/partitionkey/partitionkey.go @@ -0,0 +1,40 @@ +package partitionkey + +import ( + "fmt" + + "github.com/jmespath/go-jmespath" +) + +// Evaluate extracts a partition key from payload using a JMESPath template. +// If the template is empty or evaluation returns nil/empty, fallbackKey is returned. +func Evaluate(template string, payload map[string]interface{}, fallbackKey string) (string, error) { + if template == "" { + return fallbackKey, nil + } + + result, err := jmespath.Search(template, payload) + if err != nil { + return "", fmt.Errorf("error evaluating partition key template: %w", err) + } + + if result == nil { + return fallbackKey, nil + } + + switch v := result.(type) { + case string: + if v == "" { + return fallbackKey, nil + } + return v, nil + case float64: + return fmt.Sprintf("%g", v), nil + case int: + return fmt.Sprintf("%d", v), nil + case bool: + return fmt.Sprintf("%t", v), nil + default: + return fmt.Sprintf("%v", v), nil + } +} diff --git a/internal/destregistry/partitionkey/partitionkey_test.go b/internal/destregistry/partitionkey/partitionkey_test.go new file mode 100644 index 000000000..78d65e67a --- /dev/null +++ b/internal/destregistry/partitionkey/partitionkey_test.go @@ -0,0 +1,118 @@ +package partitionkey_test + +import ( + "testing" + + "github.com/hookdeck/outpost/internal/destregistry/partitionkey" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEvaluate(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + template string + payload map[string]interface{} + fallbackKey string + expected string + expectError bool + }{ + { + name: "empty template returns fallback", + template: "", + payload: map[string]interface{}{"key": "value"}, + fallbackKey: "fallback-123", + expected: "fallback-123", + }, + { + name: "simple field access", + template: "metadata.topic", + payload: map[string]interface{}{ + "metadata": map[string]interface{}{"topic": "test-topic"}, + }, + fallbackKey: "fallback", + expected: "test-topic", + }, + { + name: "nested field access", + template: "data.user.id", + payload: map[string]interface{}{ + "data": map[string]interface{}{ + "user": map[string]interface{}{"id": "user-456"}, + }, + }, + fallbackKey: "fallback", + expected: "user-456", + }, + { + name: "join expression", + template: "join('-', [metadata.topic, metadata.\"event-id\"])", + payload: map[string]interface{}{ + "metadata": map[string]interface{}{ + "topic": "test-topic", + "event-id": "event-123", + }, + }, + fallbackKey: "fallback", + expected: "test-topic-event-123", + }, + { + name: "non-existent field returns fallback", + template: "metadata.nonexistent", + payload: map[string]interface{}{ + "metadata": map[string]interface{}{"topic": "test"}, + }, + fallbackKey: "fallback-123", + expected: "fallback-123", + }, + { + name: "invalid template syntax returns error", + template: "metadata.topic[", + payload: map[string]interface{}{}, + fallbackKey: "fallback", + expectError: true, + }, + { + name: "numeric value", + template: "data.count", + payload: map[string]interface{}{ + "data": map[string]interface{}{"count": float64(123)}, + }, + fallbackKey: "fallback", + expected: "123", + }, + { + name: "boolean value", + template: "data.active", + payload: map[string]interface{}{ + "data": map[string]interface{}{"active": true}, + }, + fallbackKey: "fallback", + expected: "true", + }, + { + name: "empty string result returns fallback", + template: "data.empty", + payload: map[string]interface{}{ + "data": map[string]interface{}{"empty": ""}, + }, + fallbackKey: "fallback-123", + expected: "fallback-123", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result, err := partitionkey.Evaluate(tt.template, tt.payload, tt.fallbackKey) + if tt.expectError { + assert.Error(t, err) + return + } + require.NoError(t, err) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/internal/destregistry/providers/default.go b/internal/destregistry/providers/default.go index ae19b0dfb..c72c0e228 100644 --- a/internal/destregistry/providers/default.go +++ b/internal/destregistry/providers/default.go @@ -8,6 +8,7 @@ import ( "github.com/hookdeck/outpost/internal/destregistry/providers/destazureservicebus" "github.com/hookdeck/outpost/internal/destregistry/providers/destgcppubsub" "github.com/hookdeck/outpost/internal/destregistry/providers/desthookdeck" + "github.com/hookdeck/outpost/internal/destregistry/providers/destkafka" "github.com/hookdeck/outpost/internal/destregistry/providers/destrabbitmq" "github.com/hookdeck/outpost/internal/destregistry/providers/destwebhook" "github.com/hookdeck/outpost/internal/destregistry/providers/destwebhookstandard" @@ -138,5 +139,11 @@ func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestina } registry.RegisterProvider("rabbitmq", rabbitmq) + kafkaDest, err := destkafka.New(loader, basePublisherOpts) + if err != nil { + return err + } + registry.RegisterProvider("kafka", kafkaDest) + return nil } diff --git a/internal/destregistry/providers/destawskinesis/destawskinesis.go b/internal/destregistry/providers/destawskinesis/destawskinesis.go index e62e28099..7d5ec3e5a 100644 --- a/internal/destregistry/providers/destawskinesis/destawskinesis.go +++ b/internal/destregistry/providers/destawskinesis/destawskinesis.go @@ -14,8 +14,8 @@ import ( "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/hookdeck/outpost/internal/destregistry" "github.com/hookdeck/outpost/internal/destregistry/metadata" + "github.com/hookdeck/outpost/internal/destregistry/partitionkey" "github.com/hookdeck/outpost/internal/models" - "github.com/jmespath/go-jmespath" ) // Configuration types @@ -182,42 +182,6 @@ func (p *AWSKinesisPublisher) Close() error { return nil } -// evaluatePartitionKey extracts the partition key from the event using the JMESPath template -func (p *AWSKinesisPublisher) evaluatePartitionKey(payload map[string]interface{}, eventID string) (string, error) { - // If no template is specified or empty, use event ID - if p.partitionKeyTemplate == "" { - return eventID, nil - } - - // Evaluate the JMESPath template - result, err := jmespath.Search(p.partitionKeyTemplate, payload) - if err != nil { - return "", fmt.Errorf("error evaluating partition key template: %w", err) - } - - // Handle nil result - fall back to event ID - if result == nil { - return eventID, nil - } - - // Convert the result to string based on its type - switch v := result.(type) { - case string: - if v == "" { - return eventID, nil // Fall back to event ID if empty string - } - return v, nil - case float64: - return fmt.Sprintf("%g", v), nil - case int: - return fmt.Sprintf("%d", v), nil - case bool: - return fmt.Sprintf("%t", v), nil - default: - return fmt.Sprintf("%v", v), nil - } -} - // Format prepares the event for sending to Kinesis func (p *AWSKinesisPublisher) Format(ctx context.Context, event *models.Event) (*kinesis.PutRecordInput, error) { var payload map[string]interface{} @@ -270,7 +234,7 @@ func (p *AWSKinesisPublisher) Format(ctx context.Context, event *models.Event) ( } // Get partition key from template or use event ID as default - partitionKey, err := p.evaluatePartitionKey(payload, event.ID) + partitionKey, err := partitionkey.Evaluate(p.partitionKeyTemplate, payload, event.ID) if err != nil { // If template evaluation fails, log the error and fall back to event ID partitionKey = event.ID diff --git a/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go b/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go index 601e0fd97..1b323fad7 100644 --- a/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go +++ b/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go @@ -13,13 +13,13 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" + "github.com/hookdeck/outpost/internal/destregistry/partitionkey" "github.com/hookdeck/outpost/internal/destregistry/providers/destawskinesis" testsuite "github.com/hookdeck/outpost/internal/destregistry/testing" "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/util/testinfra" "github.com/hookdeck/outpost/internal/util/testutil" - "github.com/jmespath/go-jmespath" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -164,38 +164,7 @@ type KinesisAsserter struct { // evaluateTemplate is a test helper that evaluates a JMESPath template against payload data func (a *KinesisAsserter) evaluateTemplate(payload map[string]interface{}, eventID string) (string, error) { - // If no template is specified or empty, use event ID - if a.partitionKeyTemplate == "" { - return eventID, nil - } - - // Evaluate the JMESPath template - result, err := jmespath.Search(a.partitionKeyTemplate, payload) - if err != nil { - return "", fmt.Errorf("error evaluating partition key template: %w", err) - } - - // Handle nil result - fall back to event ID - if result == nil { - return eventID, nil - } - - // Convert the result to string based on its type - switch v := result.(type) { - case string: - if v == "" { - return eventID, nil // Fall back to event ID if empty string - } - return v, nil - case float64: - return fmt.Sprintf("%g", v), nil - case int: - return fmt.Sprintf("%d", v), nil - case bool: - return fmt.Sprintf("%t", v), nil - default: - return fmt.Sprintf("%v", v), nil - } + return partitionkey.Evaluate(a.partitionKeyTemplate, payload, eventID) } func (a *KinesisAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Message, event models.Event) { diff --git a/internal/destregistry/providers/destkafka/destkafka.go b/internal/destregistry/providers/destkafka/destkafka.go new file mode 100644 index 000000000..a75cdc637 --- /dev/null +++ b/internal/destregistry/providers/destkafka/destkafka.go @@ -0,0 +1,339 @@ +package destkafka + +import ( + "context" + "crypto/tls" + "fmt" + "strings" + "time" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/metadata" + "github.com/hookdeck/outpost/internal/destregistry/partitionkey" + "github.com/hookdeck/outpost/internal/models" + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl" + "github.com/segmentio/kafka-go/sasl/plain" + "github.com/segmentio/kafka-go/sasl/scram" +) + +// Configuration types + +type KafkaConfig struct { + Brokers []string + Topic string + SASLMechanism string + UseTLS bool + PartitionKeyTemplate string +} + +type KafkaCredentials struct { + Username string + Password string +} + +// Provider implementation + +type KafkaDestination struct { + *destregistry.BaseProvider +} + +var _ destregistry.Provider = (*KafkaDestination)(nil) + +func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption) (*KafkaDestination, error) { + base, err := destregistry.NewBaseProvider(loader, "kafka", basePublisherOpts...) + if err != nil { + return nil, err + } + return &KafkaDestination{BaseProvider: base}, nil +} + +func (d *KafkaDestination) Validate(ctx context.Context, destination *models.Destination) error { + if err := d.BaseProvider.Validate(ctx, destination); err != nil { + return err + } + + // Validate SASL mechanism value (field itself is required via metadata) + saslMechanism := destination.Config["sasl_mechanism"] + switch saslMechanism { + case "plain", "scram-sha-256", "scram-sha-512": + // valid + default: + return destregistry.NewErrDestinationValidation([]destregistry.ValidationErrorDetail{ + { + Field: "config.sasl_mechanism", + Type: "invalid", + }, + }) + } + + // Validate TLS config if provided — empty string is treated as "not configured" + if tlsStr, ok := destination.Config["tls"]; ok && tlsStr != "" { + if tlsStr != "on" && tlsStr != "true" && tlsStr != "false" { + return destregistry.NewErrDestinationValidation([]destregistry.ValidationErrorDetail{ + { + Field: "config.tls", + Type: "invalid", + }, + }) + } + } + + return nil +} + +func (d *KafkaDestination) CreatePublisher(ctx context.Context, destination *models.Destination) (destregistry.Publisher, error) { + config, credentials, err := d.resolveConfig(ctx, destination) + if err != nil { + return nil, err + } + + // Build SASL mechanism + var mechanism sasl.Mechanism + if config.SASLMechanism != "" { + mechanism, err = buildSASLMechanism(config.SASLMechanism, credentials) + if err != nil { + return nil, fmt.Errorf("failed to configure SASL: %w", err) + } + } + + // Build transport + transport := &kafka.Transport{} + if mechanism != nil { + transport.SASL = mechanism + } + if config.UseTLS { + transport.TLS = &tls.Config{} + } + + // Create writer with hash balancer for consistent partition key routing. + // No WriteTimeout — the caller's context deadline (registry DeliveryTimeout) + // controls the timeout, consistent with how other providers work. + writer := &kafka.Writer{ + Addr: kafka.TCP(config.Brokers...), + Topic: config.Topic, + Balancer: &kafka.Hash{}, + Transport: transport, + } + + return &KafkaPublisher{ + BasePublisher: d.BaseProvider.NewPublisher(destregistry.WithDeliveryMetadata(destination.DeliveryMetadata)), + writer: writer, + partitionKeyTemplate: config.PartitionKeyTemplate, + }, nil +} + +func (d *KafkaDestination) resolveConfig(ctx context.Context, destination *models.Destination) (*KafkaConfig, *KafkaCredentials, error) { + if err := d.Validate(ctx, destination); err != nil { + return nil, nil, err + } + + // Parse brokers + brokersStr := destination.Config["brokers"] + brokers := parseBrokers(brokersStr) + + useTLS := false + if tlsStr, ok := destination.Config["tls"]; ok { + useTLS = tlsStr == "true" || tlsStr == "on" + } + + return &KafkaConfig{ + Brokers: brokers, + Topic: destination.Config["topic"], + SASLMechanism: destination.Config["sasl_mechanism"], + UseTLS: useTLS, + PartitionKeyTemplate: destination.Config["partition_key_template"], + }, &KafkaCredentials{ + Username: destination.Credentials["username"], + Password: destination.Credentials["password"], + }, nil +} + +func (d *KafkaDestination) Preprocess(newDestination *models.Destination, originalDestination *models.Destination, opts *destregistry.PreprocessDestinationOpts) error { + if newDestination.Config == nil { + return nil + } + + // Normalize TLS value + if newDestination.Config["tls"] == "on" { + newDestination.Config["tls"] = "true" + } else if newDestination.Config["tls"] == "" { + newDestination.Config["tls"] = "false" + } + + // Trim whitespace from brokers + if brokers := newDestination.Config["brokers"]; brokers != "" { + parts := strings.Split(brokers, ",") + for i := range parts { + parts[i] = strings.TrimSpace(parts[i]) + } + newDestination.Config["brokers"] = strings.Join(parts, ",") + } + + if _, _, err := d.resolveConfig(context.Background(), newDestination); err != nil { + return err + } + return nil +} + +func (d *KafkaDestination) ComputeTarget(destination *models.Destination) destregistry.DestinationTarget { + brokers := parseBrokers(destination.Config["brokers"]) + topic := destination.Config["topic"] + + var target string + if len(brokers) > 0 { + target = fmt.Sprintf("%s / %s", brokers[0], topic) + } else { + target = topic + } + + return destregistry.DestinationTarget{ + Target: target, + TargetURL: "", + } +} + +// Publisher implementation + +type KafkaPublisher struct { + *destregistry.BasePublisher + writer *kafka.Writer + partitionKeyTemplate string +} + +func (p *KafkaPublisher) Close() error { + p.BasePublisher.StartClose() + return p.writer.Close() +} + +func (p *KafkaPublisher) Publish(ctx context.Context, event *models.Event) (*destregistry.Delivery, error) { + if err := p.BasePublisher.StartPublish(); err != nil { + return nil, err + } + defer p.BasePublisher.FinishPublish() + + // Build metadata for headers and partition key evaluation + meta := p.BasePublisher.MakeMetadata(event, time.Now()) + metadataMap := make(map[string]interface{}) + for k, v := range meta { + metadataMap[k] = v + } + + // Build parsed payload for partition key JMESPath evaluation + dataMap, err := event.ParsedData() + if err != nil { + return nil, destregistry.NewErrDestinationPublishAttempt( + err, "kafka", map[string]interface{}{ + "error": "format_failed", + "message": err.Error(), + }, + ) + } + if dataMap == nil { + dataMap = make(map[string]interface{}) + } + payload := map[string]interface{}{ + "metadata": metadataMap, + "data": dataMap, + } + + // Evaluate partition key + key, err := partitionkey.Evaluate(p.partitionKeyTemplate, payload, event.ID) + if err != nil { + key = event.ID + } + + // Build Kafka headers from metadata + headers := make([]kafka.Header, 0, len(meta)+1) + headers = append(headers, kafka.Header{Key: "content-type", Value: []byte("application/json")}) + for k, v := range meta { + headers = append(headers, kafka.Header{Key: k, Value: []byte(v)}) + } + + // Write message — send raw event data as value, metadata in headers only + msg := kafka.Message{ + Key: []byte(key), + Value: []byte(event.Data), + Headers: headers, + } + + if err := p.writer.WriteMessages(ctx, msg); err != nil { + return &destregistry.Delivery{ + Status: "failed", + Code: ClassifyKafkaError(err), + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, "kafka", map[string]interface{}{ + "error": ClassifyKafkaError(err), + "message": err.Error(), + }) + } + + return &destregistry.Delivery{ + Status: "success", + Code: "OK", + Response: map[string]interface{}{}, + }, nil +} + +// ClassifyKafkaError returns a descriptive error code based on the error type. +func ClassifyKafkaError(err error) string { + if err == nil { + return "unknown" + } + + errStr := err.Error() + + switch { + case strings.Contains(errStr, "SASL") || strings.Contains(errStr, "authentication") || strings.Contains(errStr, "Authentication"): + return "auth_failed" + case strings.Contains(errStr, "connection refused"): + return "connection_refused" + case strings.Contains(errStr, "no such host"): + return "dns_error" + case strings.Contains(errStr, "Unknown Topic") || strings.Contains(errStr, "UNKNOWN_TOPIC"): + return "topic_not_found" + case strings.Contains(errStr, "Message Size Too Large") || strings.Contains(errStr, "MESSAGE_TOO_LARGE"): + return "message_too_large" + case strings.Contains(errStr, "i/o timeout") || strings.Contains(errStr, "context deadline exceeded") || strings.Contains(errStr, "Timed Out"): + return "timeout" + case strings.Contains(errStr, "tls:") || strings.Contains(errStr, "x509:"): + return "tls_error" + default: + return "kafka_error" + } +} + +// Helper functions + +func parseBrokers(brokersStr string) []string { + if brokersStr == "" { + return nil + } + parts := strings.Split(brokersStr, ",") + brokers := make([]string, 0, len(parts)) + for _, p := range parts { + trimmed := strings.TrimSpace(p) + if trimmed != "" { + brokers = append(brokers, trimmed) + } + } + return brokers +} + +func buildSASLMechanism(mechanism string, creds *KafkaCredentials) (sasl.Mechanism, error) { + switch mechanism { + case "plain": + return &plain.Mechanism{ + Username: creds.Username, + Password: creds.Password, + }, nil + case "scram-sha-256": + return scram.Mechanism(scram.SHA256, creds.Username, creds.Password) + case "scram-sha-512": + return scram.Mechanism(scram.SHA512, creds.Username, creds.Password) + default: + return nil, fmt.Errorf("unsupported SASL mechanism: %s", mechanism) + } +} diff --git a/internal/destregistry/providers/destkafka/destkafka_publish_test.go b/internal/destregistry/providers/destkafka/destkafka_publish_test.go new file mode 100644 index 000000000..99629760b --- /dev/null +++ b/internal/destregistry/providers/destkafka/destkafka_publish_test.go @@ -0,0 +1,258 @@ +package destkafka_test + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/hookdeck/outpost/internal/destregistry/providers/destkafka" + testsuite "github.com/hookdeck/outpost/internal/destregistry/testing" + "github.com/hookdeck/outpost/internal/idgen" + "github.com/hookdeck/outpost/internal/models" + "github.com/hookdeck/outpost/internal/util/testinfra" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +// KafkaConsumer implements testsuite.MessageConsumer +type KafkaConsumer struct { + reader *kafka.Reader + msgChan chan testsuite.Message + done chan struct{} + shuttingDown atomic.Bool + wg sync.WaitGroup +} + +func NewKafkaConsumer(brokerAddr, topic string) (*KafkaConsumer, error) { + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{brokerAddr}, + Topic: topic, + StartOffset: kafka.FirstOffset, + MaxWait: 500 * time.Millisecond, + }) + + c := &KafkaConsumer{ + reader: reader, + msgChan: make(chan testsuite.Message, 100), + done: make(chan struct{}), + } + c.wg.Add(1) + go c.consume() + return c, nil +} + +func (c *KafkaConsumer) consume() { + defer c.wg.Done() + + for { + select { + case <-c.done: + return + default: + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + msg, err := c.reader.ReadMessage(ctx) + cancel() + if err != nil { + continue + } + + // Extract metadata from headers + metadata := make(map[string]string) + for _, h := range msg.Headers { + metadata[h.Key] = string(h.Value) + } + + if !c.shuttingDown.Load() { + c.msgChan <- testsuite.Message{ + Data: msg.Value, + Metadata: metadata, + Raw: msg, + } + } + } + } +} + +func (c *KafkaConsumer) Consume() <-chan testsuite.Message { + return c.msgChan +} + +func (c *KafkaConsumer) Close() error { + c.shuttingDown.Store(true) + close(c.done) + c.wg.Wait() + close(c.msgChan) + return c.reader.Close() +} + +// KafkaAsserter implements testsuite.MessageAsserter +type KafkaAsserter struct{} + +func (a *KafkaAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Message, event models.Event) { + kafkaMsg, ok := msg.Raw.(kafka.Message) + assert.True(t, ok, "raw message should be kafka.Message") + + // Verify content-type header + assert.Equal(t, "application/json", msg.Metadata["content-type"]) + + // Verify message key is set (event ID by default) + assert.Equal(t, event.ID, string(kafkaMsg.Key), "message key should be event ID") + + // Verify system metadata in headers + metadata := msg.Metadata + assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present") + testsuite.AssertTimestampIsUnixSeconds(t, metadata["timestamp"]) + assert.Equal(t, event.ID, metadata["event-id"], "event-id should match") + assert.Equal(t, event.Topic, metadata["topic"], "topic should match") + + // Verify custom metadata + for k, v := range event.Metadata { + assert.Equal(t, v, metadata[k], "metadata key %s should match expected value", k) + } +} + +// KafkaPublishSuite runs the shared publisher test suite for Kafka +type KafkaPublishSuite struct { + testsuite.PublisherSuite + consumer *KafkaConsumer +} + +func (s *KafkaPublishSuite) SetupSuite() { + t := s.T() + t.Cleanup(testinfra.Start(t)) + + brokerAddr := testinfra.EnsureKafka() + topic := "test-topic-" + idgen.String() + + // Ensure topic exists by creating it + ensureKafkaTopic(t, brokerAddr, topic) + + provider, err := destkafka.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + dest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": brokerAddr, + "topic": topic, + "sasl_mechanism": "plain", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "username": "user", + "password": "pass", + }), + ) + + consumer, err := NewKafkaConsumer(brokerAddr, topic) + require.NoError(t, err) + s.consumer = consumer + + s.InitSuite(testsuite.Config{ + Provider: provider, + Dest: &dest, + Consumer: consumer, + Asserter: &KafkaAsserter{}, + }) +} + +func (s *KafkaPublishSuite) TearDownSuite() { + if s.consumer != nil { + s.consumer.Close() + } +} + +func TestKafkaPublishIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + suite.Run(t, new(KafkaPublishSuite)) +} + +// TestKafkaPublisher_ConnectionErrors tests that connection errors return a Delivery object alongside the error. +func TestKafkaPublisher_ConnectionErrors(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + brokers string + description string + expectedCode string + }{ + { + name: "connection refused", + brokers: "127.0.0.1:1", + description: "simulates a server that is not running", + expectedCode: "connection_refused", + }, + { + name: "DNS failure", + brokers: "this-domain-does-not-exist-abc123xyz.invalid:9092", + description: "simulates an invalid/non-existent domain", + expectedCode: "dns_error", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + provider, err := destkafka.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": tt.brokers, + "topic": "test-topic", + "sasl_mechanism": "plain", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "username": "user", + "password": "pass", + }), + ) + + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + defer publisher.Close() + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithDataMap(map[string]interface{}{"key": "value"}), + ) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + delivery, err := publisher.Publish(ctx, &event) + + require.Error(t, err, "should return error for %s", tt.description) + require.NotNil(t, delivery, "delivery should NOT be nil for connection errors") + assert.Equal(t, "failed", delivery.Status, "delivery status should be 'failed'") + assert.Equal(t, tt.expectedCode, delivery.Code, "delivery code should indicate error type") + }) + } +} + +// Helper to ensure a Kafka topic exists +func ensureKafkaTopic(t *testing.T, brokerAddr, topic string) { + t.Helper() + + conn, err := kafka.Dial("tcp", brokerAddr) + require.NoError(t, err) + defer conn.Close() + + err = conn.CreateTopics(kafka.TopicConfig{ + Topic: topic, + NumPartitions: 1, + ReplicationFactor: 1, + }) + require.NoError(t, err) + + // Give Kafka a moment to fully create the topic + time.Sleep(500 * time.Millisecond) +} diff --git a/internal/destregistry/providers/destkafka/destkafka_validate_test.go b/internal/destregistry/providers/destkafka/destkafka_validate_test.go new file mode 100644 index 000000000..7d83a36b3 --- /dev/null +++ b/internal/destregistry/providers/destkafka/destkafka_validate_test.go @@ -0,0 +1,370 @@ +package destkafka_test + +import ( + "context" + "fmt" + "maps" + "testing" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/providers/destkafka" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestKafkaDestination_Validate(t *testing.T) { + t.Parallel() + + validDestination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": "localhost:9092", + "topic": "test-topic", + "sasl_mechanism": "plain", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "username": "user", + "password": "pass", + }), + ) + + kafkaDestination, err := destkafka.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + t.Run("should validate valid destination", func(t *testing.T) { + t.Parallel() + assert.NoError(t, kafkaDestination.Validate(context.Background(), &validDestination)) + }) + + t.Run("should validate invalid type", func(t *testing.T) { + t.Parallel() + dest := validDestination + dest.Config = maps.Clone(validDestination.Config) + dest.Credentials = maps.Clone(validDestination.Credentials) + dest.Type = "invalid" + err := kafkaDestination.Validate(context.Background(), &dest) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "type", validationErr.Errors[0].Field) + assert.Equal(t, "invalid_type", validationErr.Errors[0].Type) + }) + + t.Run("should validate missing brokers", func(t *testing.T) { + t.Parallel() + dest := validDestination + dest.Config = map[string]string{ + "topic": "test-topic", + "sasl_mechanism": "plain", + } + dest.Credentials = maps.Clone(validDestination.Credentials) + err := kafkaDestination.Validate(context.Background(), &dest) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "config.brokers", validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + }) + + t.Run("should validate missing topic", func(t *testing.T) { + t.Parallel() + dest := validDestination + dest.Config = map[string]string{ + "brokers": "localhost:9092", + "sasl_mechanism": "plain", + } + dest.Credentials = maps.Clone(validDestination.Credentials) + err := kafkaDestination.Validate(context.Background(), &dest) + var validationErr *destregistry.ErrDestinationValidation + assert.ErrorAs(t, err, &validationErr) + assert.Equal(t, "config.topic", validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + }) + + t.Run("should validate sasl_mechanism values", func(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + mechanism string + shouldError bool + }{ + {name: "valid plain", mechanism: "plain", shouldError: false}, + {name: "valid scram-sha-256", mechanism: "scram-sha-256", shouldError: false}, + {name: "valid scram-sha-512", mechanism: "scram-sha-512", shouldError: false}, + {name: "invalid mechanism", mechanism: "oauth", shouldError: true}, + {name: "empty is invalid (auth required)", mechanism: "", shouldError: true}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + dest := validDestination + dest.Config = maps.Clone(validDestination.Config) + dest.Credentials = map[string]string{"username": "user", "password": "pass"} + dest.Config["sasl_mechanism"] = tc.mechanism + err := kafkaDestination.Validate(context.Background(), &dest) + if tc.shouldError { + var validationErr *destregistry.ErrDestinationValidation + if !assert.ErrorAs(t, err, &validationErr) { + return + } + assert.Equal(t, "config.sasl_mechanism", validationErr.Errors[0].Field) + } else { + assert.NoError(t, err) + } + }) + } + }) + + t.Run("should validate tls config values", func(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + tlsValue string + shouldError bool + }{ + {name: "valid true", tlsValue: "true", shouldError: false}, + {name: "valid on", tlsValue: "on", shouldError: false}, + {name: "valid false", tlsValue: "false", shouldError: false}, + {name: "invalid value", tlsValue: "yes", shouldError: true}, + {name: "empty value is valid (not configured)", tlsValue: "", shouldError: false}, + {name: "case sensitive True", tlsValue: "True", shouldError: true}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + dest := validDestination + dest.Config = maps.Clone(validDestination.Config) + dest.Credentials = maps.Clone(validDestination.Credentials) + dest.Config["tls"] = tc.tlsValue + err := kafkaDestination.Validate(context.Background(), &dest) + if tc.shouldError { + var validationErr *destregistry.ErrDestinationValidation + if !assert.ErrorAs(t, err, &validationErr) { + return + } + assert.Equal(t, "config.tls", validationErr.Errors[0].Field) + assert.Equal(t, "invalid", validationErr.Errors[0].Type) + } else { + assert.NoError(t, err) + } + }) + } + }) + + t.Run("should require credentials", func(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + username string + password string + shouldError bool + expectedField string + }{ + {name: "with credentials", username: "user", password: "pass", shouldError: false}, + {name: "without username", username: "", password: "pass", shouldError: true, expectedField: "credentials.username"}, + {name: "without password", username: "user", password: "", shouldError: true, expectedField: "credentials.password"}, + {name: "without both", username: "", password: "", shouldError: true, expectedField: "credentials.username"}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + dest := validDestination + dest.Config = maps.Clone(validDestination.Config) + dest.Credentials = map[string]string{ + "username": tc.username, + "password": tc.password, + } + err := kafkaDestination.Validate(context.Background(), &dest) + if tc.shouldError { + var validationErr *destregistry.ErrDestinationValidation + if !assert.ErrorAs(t, err, &validationErr) { + return + } + assert.Equal(t, tc.expectedField, validationErr.Errors[0].Field) + assert.Equal(t, "required", validationErr.Errors[0].Type) + } else { + assert.NoError(t, err) + } + }) + } + }) + + t.Run("should allow tls to be omitted", func(t *testing.T) { + t.Parallel() + dest := validDestination + dest.Config = maps.Clone(validDestination.Config) + dest.Credentials = maps.Clone(validDestination.Credentials) + delete(dest.Config, "tls") + assert.NoError(t, kafkaDestination.Validate(context.Background(), &dest)) + }) +} + +func TestKafkaDestination_ComputeTarget(t *testing.T) { + t.Parallel() + + kafkaDestination, err := destkafka.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + t.Run("should return 'broker / topic' for single broker", func(t *testing.T) { + t.Parallel() + dest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": "broker1:9092", + "topic": "my-topic", + }), + ) + target := kafkaDestination.ComputeTarget(&dest) + assert.Equal(t, "broker1:9092 / my-topic", target.Target) + assert.Empty(t, target.TargetURL) + }) + + t.Run("should return first broker for multiple brokers", func(t *testing.T) { + t.Parallel() + dest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": "broker1:9092,broker2:9092,broker3:9092", + "topic": "my-topic", + }), + ) + target := kafkaDestination.ComputeTarget(&dest) + assert.Equal(t, "broker1:9092 / my-topic", target.Target) + }) +} + +func TestKafkaDestination_CreatePublisher_SASL(t *testing.T) { + t.Parallel() + + kafkaDestination, err := destkafka.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + testCases := []struct { + name string + mechanism string + }{ + {name: "plain", mechanism: "plain"}, + {name: "scram-sha-256", mechanism: "scram-sha-256"}, + {name: "scram-sha-512", mechanism: "scram-sha-512"}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + dest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": "localhost:9092", + "topic": "test-topic", + "sasl_mechanism": tc.mechanism, + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "username": "testuser", + "password": "testpass", + }), + ) + publisher, err := kafkaDestination.CreatePublisher(context.Background(), &dest) + require.NoError(t, err) + assert.NotNil(t, publisher) + publisher.Close() + }) + } +} + +func TestClassifyKafkaError(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + err error + expected string + }{ + {name: "nil error", err: nil, expected: "unknown"}, + {name: "SASL error", err: fmt.Errorf("SASL handshake failed"), expected: "auth_failed"}, + {name: "authentication error", err: fmt.Errorf("authentication failed"), expected: "auth_failed"}, + {name: "connection refused", err: fmt.Errorf("dial tcp: connection refused"), expected: "connection_refused"}, + {name: "dns error", err: fmt.Errorf("no such host"), expected: "dns_error"}, + {name: "unknown topic", err: fmt.Errorf("Unknown Topic Or Partition"), expected: "topic_not_found"}, + {name: "message too large", err: fmt.Errorf("MESSAGE_TOO_LARGE"), expected: "message_too_large"}, + {name: "timeout", err: fmt.Errorf("i/o timeout"), expected: "timeout"}, + {name: "context deadline", err: fmt.Errorf("context deadline exceeded"), expected: "timeout"}, + {name: "tls error", err: fmt.Errorf("tls: handshake failure"), expected: "tls_error"}, + {name: "x509 error", err: fmt.Errorf("x509: certificate signed by unknown authority"), expected: "tls_error"}, + {name: "generic error", err: fmt.Errorf("something went wrong"), expected: "kafka_error"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tt.expected, destkafka.ClassifyKafkaError(tt.err)) + }) + } +} + +func TestKafkaDestination_Preprocess(t *testing.T) { + t.Parallel() + + kafkaDestination, err := destkafka.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + t.Run("should normalize tls 'on' to 'true'", func(t *testing.T) { + t.Parallel() + dest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": "broker1:9092", + "topic": "my-topic", + "tls": "on", + "sasl_mechanism": "plain", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "username": "user", + "password": "pass", + }), + ) + err := kafkaDestination.Preprocess(&dest, nil, nil) + require.NoError(t, err) + assert.Equal(t, "true", dest.Config["tls"]) + }) + + t.Run("should normalize empty tls to 'false'", func(t *testing.T) { + t.Parallel() + dest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": "broker1:9092", + "topic": "my-topic", + "tls": "", + "sasl_mechanism": "plain", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "username": "user", + "password": "pass", + }), + ) + err := kafkaDestination.Preprocess(&dest, nil, nil) + require.NoError(t, err) + assert.Equal(t, "false", dest.Config["tls"]) + }) + + t.Run("should trim whitespace from brokers", func(t *testing.T) { + t.Parallel() + dest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("kafka"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "brokers": " broker1:9092 , broker2:9092 ", + "topic": "my-topic", + "sasl_mechanism": "plain", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "username": "user", + "password": "pass", + }), + ) + err := kafkaDestination.Preprocess(&dest, nil, nil) + require.NoError(t, err) + assert.Equal(t, "broker1:9092,broker2:9092", dest.Config["brokers"]) + }) +} diff --git a/internal/portal/src/app.scss b/internal/portal/src/app.scss index db99d322d..068150dd1 100644 --- a/internal/portal/src/app.scss +++ b/internal/portal/src/app.scss @@ -3,7 +3,7 @@ margin: 0 auto; display: flex; flex-direction: column; - max-height: 100vh; + min-height: 100vh; padding-left: calc(var(--base-grid-multiplier) * 24); padding-right: calc(var(--base-grid-multiplier) * 24); diff --git a/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.scss b/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.scss index bfcc6f393..b0f2c1e9e 100644 --- a/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.scss +++ b/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.scss @@ -51,6 +51,16 @@ } } + select { + width: 100%; + cursor: pointer; + appearance: none; + background-image: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='12' height='12' viewBox='0 0 24 24' fill='none' stroke='%23888' stroke-width='2' stroke-linecap='round' stroke-linejoin='round'%3E%3Cpath d='M6 9l6 6 6-6'/%3E%3C/svg%3E"); + background-repeat: no-repeat; + background-position: right var(--spacing-3) center; + padding-right: var(--spacing-8); + } + .description { font-size: var(--font-size-s); line-height: var(--line-height-s); diff --git a/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.tsx b/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.tsx index c817024e4..4d0c3d3fa 100644 --- a/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.tsx +++ b/internal/portal/src/common/DestinationConfigFields/DestinationConfigFields.tsx @@ -172,6 +172,27 @@ const DestinationConfigFields = ({ )} )} + {field.type === "select" && ( + + )} {field.type === "checkbox" && ( div { padding: 4px; @@ -87,15 +85,13 @@ } &__fields { - flex: 0 1 auto; - min-height: 0; + flex: 1 1 auto; } &__actions { margin-top: var(--spacing-8); display: flex; justify-content: flex-end; - flex-shrink: 0; } } } @@ -104,10 +100,8 @@ border: 1px solid var(--colors-outline-neutral); border-radius: var(--radius-m); overflow: hidden; - height: 100%; &__container { - overflow-y: auto; display: flex; flex-direction: column; } diff --git a/internal/portal/src/scenes/CreateDestination/CreateDestination.tsx b/internal/portal/src/scenes/CreateDestination/CreateDestination.tsx index eb4cce43d..e1b4ed3f0 100644 --- a/internal/portal/src/scenes/CreateDestination/CreateDestination.tsx +++ b/internal/portal/src/scenes/CreateDestination/CreateDestination.tsx @@ -34,6 +34,7 @@ type Step = { destinationTypes?: Record; }) => React.ReactNode; action: string; + autoAdvance?: boolean; }; const EVENT_TOPICS_STEP: Step = { @@ -137,6 +138,7 @@ const DESTINATION_TYPE_STEP: Step = { ), action: "Next", + autoAdvance: true, }; const CONFIGURATION_STEP: Step = { @@ -383,6 +385,12 @@ export default function CreateDestination() { const values = Object.fromEntries(formData.entries()); const allValues = { ...stepValues, ...values }; + if (currentStep.autoAdvance && nextStep && currentStep.isValid?.(allValues)) { + setStepValues(allValues); + setCurrentStepIndex(currentStepIndex + 1); + return; + } + if (currentStep.isValid) { setIsValid(currentStep.isValid(allValues)); } else { @@ -423,16 +431,18 @@ export default function CreateDestination() { )} -
- -
+ {!currentStep.autoAdvance && ( +
+ +
+ )} diff --git a/internal/portal/src/typings/Destination.ts b/internal/portal/src/typings/Destination.ts index 459910c14..3caee4c4b 100644 --- a/internal/portal/src/typings/Destination.ts +++ b/internal/portal/src/typings/Destination.ts @@ -1,5 +1,5 @@ interface ConfigField { - type: "text" | "checkbox" | "key_value_map"; + type: "text" | "checkbox" | "key_value_map" | "select"; label: string; description: string; key: string; @@ -12,6 +12,7 @@ interface ConfigField { minlength?: number; maxlength?: number; pattern?: string; + options?: Array<{ label: string; value: string }>; key_placeholder?: string; value_placeholder?: string; } diff --git a/internal/util/testinfra/kafka.go b/internal/util/testinfra/kafka.go new file mode 100644 index 000000000..9d72ed0cf --- /dev/null +++ b/internal/util/testinfra/kafka.go @@ -0,0 +1,64 @@ +package testinfra + +import ( + "context" + "log" + "sync" + + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" +) + +var kafkaOnce sync.Once + +func EnsureKafka() string { + cfg := ReadConfig() + if cfg.KafkaURL == "" { + kafkaOnce.Do(func() { + startKafkaTestContainer(cfg) + }) + } + return cfg.KafkaURL +} + +func startKafkaTestContainer(cfg *Config) { + ctx := context.Background() + + // Use a fixed host port so advertised listeners match what clients connect to. + // Kafka brokers redirect clients to the advertised address, so if testcontainers + // maps to a random port but the broker advertises 9092, clients would fail. + const hostPort = "19092" + + req := testcontainers.ContainerRequest{ + Image: "confluentinc/confluent-local:7.4.0", + ExposedPorts: []string{hostPort + ":9092/tcp"}, + Env: map[string]string{ + "KAFKA_ADVERTISED_LISTENERS": "PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:" + hostPort, + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT", + "KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093", + "KAFKA_INTER_BROKER_LISTENER_NAME": "PLAINTEXT", + "KAFKA_CONTROLLER_LISTENER_NAMES": "CONTROLLER", + "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1", + "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": "1", + "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1", + }, + WaitingFor: wait.ForListeningPort("9092/tcp"), + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + panic(err) + } + + endpoint := "localhost:" + hostPort + log.Printf("Kafka running at %s", endpoint) + cfg.KafkaURL = endpoint + cfg.cleanupFns = append(cfg.cleanupFns, func() { + if err := container.Terminate(ctx); err != nil { + log.Printf("failed to terminate kafka container: %s", err) + } + }) +} diff --git a/internal/util/testinfra/testinfra.go b/internal/util/testinfra/testinfra.go index 3c8cf2cc2..6b4ce3f04 100644 --- a/internal/util/testinfra/testinfra.go +++ b/internal/util/testinfra/testinfra.go @@ -26,6 +26,7 @@ type Config struct { PostgresURL string LocalStackURL string RabbitMQURL string + KafkaURL string MockServerURL string GCPURL string AzureSBConnString string @@ -75,6 +76,7 @@ func initConfig() { GCPURL: v.GetString("TEST_GCP_URL"), AzureSBConnString: v.GetString("TEST_AZURE_SB_CONNSTRING"), RabbitMQURL: rabbitmqURL, + KafkaURL: v.GetString("TEST_KAFKA_URL"), MockServerURL: mockServerURL, } return @@ -89,6 +91,7 @@ func initConfig() { GCPURL: "", AzureSBConnString: "", RabbitMQURL: "", + KafkaURL: "", MockServerURL: "", } }