Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
da7bd46
feat: add Apache Kafka destination provider
alexluong Mar 22, 2026
37ce064
chore: add Kafka test infrastructure
alexluong Mar 22, 2026
5774ff8
fix(portal): fix destination type list scroll and config form overlap
alexluong Mar 22, 2026
baa9e51
feat(portal): add select field type for destination config
alexluong Mar 22, 2026
3d84504
chore: gofmt
alexluong Mar 22, 2026
ab3dac5
fix(kafka): catch Kafka protocol timeout errors in error classification
alexluong Mar 22, 2026
4e5aa35
docs: add select field type and kafka to openapi schema
alexluong Mar 22, 2026
16e2968
fix: harden kafka validation, writer config, and error classification
alexluong Mar 22, 2026
2dcfafd
fix: address kafka test infrastructure issues
alexluong Mar 22, 2026
0becbba
fix: add onChange handler to portal select field
alexluong Mar 22, 2026
0de4f7b
test: add SASL publisher and ClassifyKafkaError unit tests
alexluong Mar 22, 2026
b9d9b17
fix: remove redundant WriteTimeout from kafka writer
alexluong Mar 22, 2026
c80f566
docs: add partition_key_template to kafka OpenAPI spec example
alexluong Mar 22, 2026
a351fad
feat: require SASL auth and default TLS to on for Kafka destination
alexluong Mar 23, 2026
7234107
fix: use page-level scroll for create destination flow
alexluong Mar 23, 2026
e634f3f
fix(portal): sticky sidebar and auto-advance on destination type select
alexluong Mar 23, 2026
8853ad2
Merge remote-tracking branch 'origin/main' into feat/kafka-destination
alexluong Mar 23, 2026
e84b397
Merge remote-tracking branch 'origin/main' into feat/kafka-destination
alexluong Mar 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ TEST_POSTGRES_URL="localhost:35432"
TEST_CLICKHOUSE_URL="localhost:39000"
# MQs
TEST_RABBITMQ_URL="localhost:35672"
TEST_KAFKA_URL="localhost:39092"
TEST_LOCALSTACK_URL="localhost:34566"
TEST_GCP_URL="localhost:38085"
TEST_AZURE_SB_CONNSTRING="Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
Expand Down
24 changes: 18 additions & 6 deletions build/test/compose.yml
Original file line number Diff line number Diff line change
@@ -1,38 +1,45 @@
name: "outpost-test"

services:
mock:
test-mock:
image: outpost-mock
build:
context: ../../
dockerfile: ./build/test/Dockerfile.mock
ports:
- 35555:5555
clickhouse:
test-clickhouse:
image: clickhouse/clickhouse-server:24-alpine
ports:
- 39000:9000
postgres:
test-postgres:
image: postgres:16-alpine
environment:
- POSTGRES_USER=outpost
- POSTGRES_PASSWORD=outpost
- POSTGRES_DB=default
ports:
- 35432:5432
rabbitmq:
test-rabbitmq:
image: rabbitmq:3-management
ports:
- 35672:5672
- 45672:15672
aws:
test-aws:
image: localstack/localstack:latest
environment:
- SERVICES=s3,sns,sts,sqs,kinesis
ports:
- 34566:4566
- 34571:4571
gcp:
test-kafka:
image: confluentinc/confluent-local:7.4.0
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://test-kafka:29092,PLAINTEXT_HOST://localhost:39092
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092
ports:
- 39092:9092
test-gcp:
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
command:
[
Expand All @@ -46,3 +53,8 @@ services:
]
ports:
- "38085:8085"

networks:
default:
name: outpost
external: true
75 changes: 73 additions & 2 deletions docs/apis/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2009,7 +2009,7 @@ components:
properties:
type:
type: string
enum: [text, checkbox, key_value_map]
enum: [text, checkbox, key_value_map, select]
example: "text"
label:
type: string
Expand Down Expand Up @@ -2040,6 +2040,19 @@ components:
type: string
description: Regex pattern for validation (compatible with HTML5 pattern attribute).
example: "^[a-zA-Z0-9_]+$"
options:
type: array
description: Available options for select fields.
items:
type: object
required: [label, value]
properties:
label:
type: string
example: "PLAIN"
value:
type: string
example: "plain"

MetricsDataPoint:
type: object
Expand Down Expand Up @@ -3675,6 +3688,64 @@ paths:
sensitive: true, # Added sensitive
},
]
- type: "kafka"
label: "Apache Kafka"
description: "Send events to Apache Kafka topics for real-time event streaming"
icon: "<svg />"
config_fields:
[
{
type: "text",
label: "Brokers",
description: "Comma-separated list of Kafka broker addresses.",
required: true,
},
{
type: "text",
label: "Topic",
description: "The Kafka topic to publish messages to.",
required: true,
},
{
type: "checkbox",
label: "TLS",
description: "Enable TLS for the connection.",
default: "true",
},
{
type: "text",
label: "Partition Key Template",
description: "JMESPath template to extract the partition key from the event payload.",
required: false,
},
{
type: "select",
label: "SASL Mechanism",
description: "SASL authentication mechanism.",
required: true,
options: [
{ label: "PLAIN", value: "plain" },
{ label: "SCRAM-SHA-256", value: "scram-sha-256" },
{ label: "SCRAM-SHA-512", value: "scram-sha-512" },
],
},
]
credential_fields:
[
{
type: "text",
label: "Username",
description: "SASL username for authentication.",
required: true,
},
{
type: "text",
label: "Password",
description: "SASL password for authentication.",
required: true,
sensitive: true,
},
]
- type: "aws_sqs"
label: "AWS SQS"
description: "Send event to an AWS SQS queue"
Expand Down Expand Up @@ -3731,7 +3802,7 @@ paths:
required: true
schema:
type: string
enum: [webhook, aws_sqs, rabbitmq, hookdeck, aws_kinesis, azure_servicebus, aws_s3, gcp_pubsub]
enum: [webhook, aws_sqs, rabbitmq, hookdeck, aws_kinesis, azure_servicebus, aws_s3, gcp_pubsub, kafka]
description: The type of the destination.
get:
tags: [Schemas]
Expand Down
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
github.com/rabbitmq/amqp091-go v1.10.0
github.com/redis/go-redis/extra/redisotel/v9 v9.5.3
github.com/redis/go-redis/v9 v9.6.1
github.com/segmentio/kafka-go v0.4.50
github.com/spf13/viper v1.19.0
github.com/standard-webhooks/standard-webhooks/libraries v0.0.0-20250711233419-a173a6c0125c
github.com/stretchr/testify v1.10.0
Expand Down Expand Up @@ -205,6 +206,9 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/uptrace/opentelemetry-go-extra/otelutil v0.3.1 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opencensus.io v0.24.0 // indirect
Expand All @@ -219,7 +223,7 @@ require (
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.36.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect
golang.org/x/time v0.6.0 // indirect
Expand Down
11 changes: 9 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/segmentio/kafka-go v0.4.50 h1:mcyC3tT5WeyWzrFbd6O374t+hmcu1NKt2Pu1L3QaXmc=
github.com/segmentio/kafka-go v0.4.50/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E=
github.com/shirou/gopsutil/v4 v4.25.1 h1:QSWkTc+fu9LTAWfkZwZ6j8MSUk4A2LV7rbH0ZqmLjXs=
github.com/shirou/gopsutil/v4 v4.25.1/go.mod h1:RoUCUpndaJFtT+2zsZzzmhvbfGoDCJ7nFXKJf8GqJbI=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
Expand Down Expand Up @@ -1297,9 +1299,14 @@ github.com/uptrace/opentelemetry-go-extra/otelzap v0.3.1 h1:0iCp8hx3PFhGihubKHxy
github.com/uptrace/opentelemetry-go-extra/otelzap v0.3.1/go.mod h1:FXrjpUJDqwqofvXWG3YNxQwhg2876tUpZASj8VvOMAM=
github.com/urfave/cli/v3 v3.4.1 h1:1M9UOCy5bLmGnuu1yn3t3CB4rG79Rtoxuv1sPhnm6qM=
github.com/urfave/cli/v3 v3.4.1/go.mod h1:FJSKtM/9AiiTOJL4fJ6TbMUkxBXn7GO9guZqoZtpYpo=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -1533,8 +1540,8 @@ golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA=
golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down
23 changes: 23 additions & 0 deletions internal/destregistry/metadata/providers/kafka/instructions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Apache Kafka Configuration Instructions

Apache Kafka is a distributed event streaming platform used for building real-time data pipelines and streaming applications. It provides:

- High-throughput, low-latency message delivery
- Durable message storage with configurable retention
- Horizontal scalability through partitioning
- Built-in support for SASL authentication and TLS encryption

## How to configure Kafka as an event destination

To configure Kafka as a destination you must provide:

- **Brokers** — A comma-separated list of Kafka broker addresses (e.g., `broker1:9092,broker2:9092`)
- **Topic** — The Kafka topic to publish messages to
- **SASL Mechanism** — Authentication mechanism: `plain`, `scram-sha-256`, or `scram-sha-512`
- **Username** — SASL username for authentication
- **Password** — SASL password for authentication

### Optional settings

- **TLS** — Enable TLS encryption for the connection. Enabled by default.
- **Partition Key Template** — A JMESPath expression to extract the message key from the event payload. If not set, the event ID is used as the message key.
67 changes: 67 additions & 0 deletions internal/destregistry/metadata/providers/kafka/metadata.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{
"type": "kafka",
"label": "Apache Kafka",
"description": "Send events to Apache Kafka topics for real-time event streaming",
"link": "https://kafka.apache.org/",
"config_fields": [
{
"key": "brokers",
"type": "text",
"label": "Brokers",
"description": "Comma-separated list of Kafka broker addresses (e.g., broker1:9092,broker2:9092)",
"required": true
},
{
"key": "topic",
"type": "text",
"label": "Topic",
"description": "The Kafka topic to publish messages to",
"required": true
},
{
"key": "tls",
"type": "checkbox",
"label": "TLS",
"description": "Enable TLS for the connection",
"default": "true"
},
{
"key": "partition_key_template",
"type": "text",
"label": "Partition Key Template",
"description": "JMESPath template to extract the partition key from the event payload (e.g., metadata.\"event-id\"). Default is event ID, which is also used as fallback if template evaluation fails or returns empty.",
"required": false
},
{
"key": "sasl_mechanism",
"type": "select",
"label": "SASL Mechanism",
"description": "SASL authentication mechanism",
"required": true,
"options": [
{ "label": "PLAIN", "value": "plain" },
{ "label": "SCRAM-SHA-256", "value": "scram-sha-256" },
{ "label": "SCRAM-SHA-512", "value": "scram-sha-512" }
]
}
],
"credential_fields": [
{
"key": "username",
"type": "text",
"label": "Username",
"description": "SASL username for authentication",
"required": true,
"sensitive": false
},
{
"key": "password",
"type": "text",
"label": "Password",
"description": "SASL password for authentication",
"required": true,
"sensitive": true
}
],
"icon": "<svg xmlns=\"http://www.w3.org/2000/svg\" viewBox=\"0 0 128 128\"><path d=\"M86.758 70.89c-4.992 0-9.465 2.208-12.528 5.68l-7.851-5.547a21.275 21.275 0 001.312-7.32c0-2.531-.46-4.95-1.27-7.203l7.837-5.488c3.062 3.457 7.523 5.652 12.5 5.652 9.207 0 16.703-7.48 16.703-16.672 0-9.195-7.496-16.672-16.703-16.672-9.211 0-16.707 7.477-16.707 16.672 0 1.645.25 3.23.699 4.735l-7.84 5.488a21.578 21.578 0 00-13.36-7.746v-9.43c7.567-1.586 13.27-8.293 13.27-16.312C62.82 7.53 55.324.055 46.117.055c-9.21 0-16.707 7.476-16.707 16.672 0 7.91 5.555 14.539 12.969 16.238v9.547c-10.117 1.773-17.84 10.59-17.84 21.191 0 10.652 7.797 19.5 17.992 21.211V95c-7.492 1.64-13.12 8.309-13.12 16.273 0 9.196 7.495 16.672 16.706 16.672 9.207 0 16.703-7.476 16.703-16.672 0-7.964-5.629-14.632-13.117-16.273V84.914a21.592 21.592 0 0013.133-7.625l7.902 5.586a16.45 16.45 0 00-.687 4.688c0 9.195 7.496 16.671 16.707 16.671 9.207 0 16.703-7.476 16.703-16.671 0-9.196-7.496-16.672-16.703-16.672zm0-38.984c4.465 0 8.097 3.63 8.097 8.086 0 4.453-3.632 8.082-8.097 8.082-4.469 0-8.102-3.629-8.102-8.082 0-4.457 3.633-8.086 8.102-8.086zm-48.742-15.18c0-4.456 3.632-8.081 8.101-8.081 4.465 0 8.098 3.625 8.098 8.082 0 4.457-3.633 8.082-8.098 8.082-4.469 0-8.101-3.625-8.101-8.082zm16.199 94.547c0 4.457-3.633 8.082-8.098 8.082-4.469 0-8.101-3.625-8.101-8.082 0-4.457 3.632-8.082 8.101-8.082 4.465 0 8.098 3.625 8.098 8.082zm-8.102-36.296c-6.226 0-11.293-5.059-11.293-11.274 0-6.219 5.067-11.277 11.293-11.277 6.23 0 11.297 5.058 11.297 11.277 0 6.215-5.066 11.274-11.297 11.274zm40.645 20.668c-4.469 0-8.102-3.625-8.102-8.082 0-4.458 3.633-8.083 8.102-8.083 4.465 0 8.097 3.625 8.097 8.082 0 4.458-3.632 8.083-8.097 8.083zm0 0\" fill=\"currentColor\"/></svg>"
}
34 changes: 20 additions & 14 deletions internal/destregistry/metadata/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,25 @@ type SetupLink struct {
Cta string `json:"cta"`
}

type FieldOption struct {
Label string `json:"label"`
Value string `json:"value"`
}

type FieldSchema struct {
Type string `json:"type"`
Label string `json:"label"`
Description string `json:"description"`
Key string `json:"key"`
Required bool `json:"required"`
Default *string `json:"default,omitempty"`
Disabled bool `json:"disabled,omitempty"`
Sensitive bool `json:"sensitive,omitempty"` // Whether the field value should be obfuscated in API responses
Min *int `json:"min,omitempty"` // Minimum value for numeric fields
Max *int `json:"max,omitempty"` // Maximum value for numeric fields
Step *int `json:"step,omitempty"` // Step value for numeric fields
MinLength *int `json:"minlength,omitempty"` // Minimum length for text fields
MaxLength *int `json:"maxlength,omitempty"` // Maximum length for text fields
Pattern *string `json:"pattern,omitempty"` // Regular expression pattern for text fields
Type string `json:"type"`
Label string `json:"label"`
Description string `json:"description"`
Key string `json:"key"`
Required bool `json:"required"`
Default *string `json:"default,omitempty"`
Disabled bool `json:"disabled,omitempty"`
Sensitive bool `json:"sensitive,omitempty"` // Whether the field value should be obfuscated in API responses
Min *int `json:"min,omitempty"` // Minimum value for numeric fields
Max *int `json:"max,omitempty"` // Maximum value for numeric fields
Step *int `json:"step,omitempty"` // Step value for numeric fields
MinLength *int `json:"minlength,omitempty"` // Minimum length for text fields
MaxLength *int `json:"maxlength,omitempty"` // Maximum length for text fields
Pattern *string `json:"pattern,omitempty"` // Regular expression pattern for text fields
Options []FieldOption `json:"options,omitempty"` // Options for select fields
}
Loading
Loading