From 097eb9cc0b1930265889ca90fc69fc461aa7281e Mon Sep 17 00:00:00 2001 From: Femi Date: Sun, 21 Sep 2025 01:11:25 +0100 Subject: [PATCH 1/6] added package.json for kafka example Signed-off-by: Femi --- examples/kafka-ex/package.json | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 examples/kafka-ex/package.json diff --git a/examples/kafka-ex/package.json b/examples/kafka-ex/package.json new file mode 100644 index 00000000..e3e13547 --- /dev/null +++ b/examples/kafka-ex/package.json @@ -0,0 +1,25 @@ +{ + "name": "kafka-ex", + "version": "1.0.0", + "description": "kafka example using CloudEvents", + "repository": "https://github.com/cloudevents/sdk-javascript.git", + "main": "index.js", + "scripts": { + "build": "tsc", + "start:producer": "ts-node src/producer.ts", + "start:consumer": "ts-node src/consumer.ts" + }, + "keywords": [], + "author": "", + "license": "ISC", + "type": "commonjs", + "dependencies": { + "cloudevents": "^10.0.0", + "kafkajs": "^2.2.4", + "ts-node": "^10.9.2" + }, + "devDependencies": { + "@types/node": "^22.13.2", + "typescript": "^5.7.3" + } +} From 113cf3cafe04745a5663377dc7154a12bfc50873 Mon Sep 17 00:00:00 2001 From: Femi Date: Sun, 21 Sep 2025 01:13:59 +0100 Subject: [PATCH 2/6] feat: added client and admin for kafka topics Signed-off-by: Femi --- examples/kafka-ex/src/admin.ts | 15 +++++++++++++++ examples/kafka-ex/src/client.ts | 9 +++++++++ examples/kafka-ex/tsconfig.json | 19 +++++++++++++++++++ 3 files changed, 43 insertions(+) create mode 100644 examples/kafka-ex/src/admin.ts create mode 100644 examples/kafka-ex/src/client.ts create mode 100644 examples/kafka-ex/tsconfig.json diff --git a/examples/kafka-ex/src/admin.ts b/examples/kafka-ex/src/admin.ts new file mode 100644 index 00000000..143887e3 --- /dev/null +++ b/examples/kafka-ex/src/admin.ts @@ -0,0 +1,15 @@ +import kafka from "./client"; + +(async () => { + const admin = kafka.admin(); + await admin.connect(); + await admin.createTopics({ + 
topics: [ + { + topic: "events.cloudevents.test", + numPartitions: 2, + }, + ], + }); + await admin.disconnect(); +})(); diff --git a/examples/kafka-ex/src/client.ts b/examples/kafka-ex/src/client.ts new file mode 100644 index 00000000..e236ef56 --- /dev/null +++ b/examples/kafka-ex/src/client.ts @@ -0,0 +1,9 @@ +import "dotenv/config"; +import { Kafka } from "kafkajs"; + +const kafka = new Kafka({ + clientId: 'kafka-ex-client-id', + brokers: ['localhost:9092'], +}); + +export default kafka; \ No newline at end of file diff --git a/examples/kafka-ex/tsconfig.json b/examples/kafka-ex/tsconfig.json new file mode 100644 index 00000000..a3db8a3a --- /dev/null +++ b/examples/kafka-ex/tsconfig.json @@ -0,0 +1,19 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "commonjs", + "allowJs": true, + "checkJs": false, + "strict": true, + "noImplicitAny": true, + "moduleResolution": "node", + "esModuleInterop": true, + }, + "include": [ + "src/**/*.ts", + "src/**/*.js" + ], + "exclude": [ + "node_modules" + ] +} \ No newline at end of file From a22d0c071e85d175d9fd3b7fe9f69d866b79b496 Mon Sep 17 00:00:00 2001 From: Femi Date: Sun, 21 Sep 2025 01:17:26 +0100 Subject: [PATCH 3/6] feat: added producer for kafka sample Signed-off-by: Femi --- examples/kafka-ex/src/admin.ts | 1 + examples/kafka-ex/src/client.ts | 1 + examples/kafka-ex/src/producer.ts | 40 +++++++++++++++++++++++++++++++ examples/kafka-ex/tsconfig.json | 10 ++++---- 4 files changed, 46 insertions(+), 6 deletions(-) create mode 100644 examples/kafka-ex/src/producer.ts diff --git a/examples/kafka-ex/src/admin.ts b/examples/kafka-ex/src/admin.ts index 143887e3..73788dea 100644 --- a/examples/kafka-ex/src/admin.ts +++ b/examples/kafka-ex/src/admin.ts @@ -1,3 +1,4 @@ +/* eslint-disable */ import kafka from "./client"; (async () => { diff --git a/examples/kafka-ex/src/client.ts b/examples/kafka-ex/src/client.ts index e236ef56..cf53ab6c 100644 --- a/examples/kafka-ex/src/client.ts +++ 
b/examples/kafka-ex/src/client.ts @@ -1,3 +1,4 @@ +/* eslint-disable */ import "dotenv/config"; import { Kafka } from "kafkajs"; diff --git a/examples/kafka-ex/src/producer.ts b/examples/kafka-ex/src/producer.ts new file mode 100644 index 00000000..92fe9230 --- /dev/null +++ b/examples/kafka-ex/src/producer.ts @@ -0,0 +1,40 @@ +/* eslint-disable */ +import { CloudEvent, Kafka } from "cloudevents"; +import readline from "readline"; +import kafka from "./client"; + +const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, +}); + +(async () => { + const producer = kafka.producer(); + await producer.connect(); + + rl.setPrompt("> "); + rl.prompt(); + rl.on("line", async (line) => { + const event = new CloudEvent({ + source: "cloudevents-producer", + type: "events.cloudevents.test", + datacontenttype: "text/plain", + partitionkey: "1", + data: line, + }); + + const message = Kafka.structured(event); + + console.log("Sending CloudEvent:", message); + + await producer.send({ + topic: "events.cloudevents.test", + messages: [message], + }); + rl.prompt(); + }); + + rl.on("close", async () => { + await producer.disconnect(); + }); +})(); diff --git a/examples/kafka-ex/tsconfig.json b/examples/kafka-ex/tsconfig.json index a3db8a3a..9065c97c 100644 --- a/examples/kafka-ex/tsconfig.json +++ b/examples/kafka-ex/tsconfig.json @@ -7,13 +7,11 @@ "strict": true, "noImplicitAny": true, "moduleResolution": "node", - "esModuleInterop": true, + "esModuleInterop": true }, "include": [ "src/**/*.ts", - "src/**/*.js" + "src/**/*.js", ], - "exclude": [ - "node_modules" - ] -} \ No newline at end of file + "exclude": ["node_modules"] +} From e445a818058dc58c6a2e112c57214cd0ee5d5a69 Mon Sep 17 00:00:00 2001 From: Femi Date: Sun, 21 Sep 2025 01:18:59 +0100 Subject: [PATCH 4/6] feat: added consumer for kafka sample Signed-off-by: Femi --- examples/kafka-ex/package.json | 1 - examples/kafka-ex/src/consumer.ts | 46 +++++++++++++++++++++++++++++++ 2 files changed, 46 
insertions(+), 1 deletion(-) create mode 100644 examples/kafka-ex/src/consumer.ts diff --git a/examples/kafka-ex/package.json b/examples/kafka-ex/package.json index e3e13547..b4b837c9 100644 --- a/examples/kafka-ex/package.json +++ b/examples/kafka-ex/package.json @@ -3,7 +3,6 @@ "version": "1.0.0", "description": "kafka example using CloudEvents", "repository": "https://github.com/cloudevents/sdk-javascript.git", - "main": "index.js", "scripts": { "build": "tsc", "start:producer": "ts-node src/producer.ts", diff --git a/examples/kafka-ex/src/consumer.ts b/examples/kafka-ex/src/consumer.ts new file mode 100644 index 00000000..4229c7bb --- /dev/null +++ b/examples/kafka-ex/src/consumer.ts @@ -0,0 +1,46 @@ +/* eslint-disable */ +import { Headers, Kafka, Message } from "cloudevents"; +import kafka from "./client"; + +const groupId = process.argv[2]; + +(async () => { + const consumer = kafka.consumer({ groupId }); + await consumer.connect(); + + consumer.subscribe({ topic: "events.cloudevents.test" }); + + consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + console.log("Raw Kafka message:", { + topic, + partition, + offset: message.offset, + headers: message.headers, + value: message.value?.toString(), + }); + + try { + let newHeaders: Headers = {}; + Object.keys(message.headers as Headers).forEach((key) => { + // this is needed here because the headers are buffer values + // when it gets to the consumer which is invalid for the + // toEvent api from cloudevents, so this converts each key value to a string + // as expected by the toEvent api + newHeaders[key] = message!.headers![key]?.toString() ?? 
""; + }); + + message.headers = newHeaders; + const messageValue = Kafka.toEvent( + message as unknown as Message + ); + + console.log("Deserialized CloudEvent:", messageValue); + // message is automatically acknowledged when the callback is finished + } catch (error) { + console.error("Error deserializing CloudEvent:", error); + console.log("Raw message value:", message.value?.toString()); + } + }, + }); +})(); From 599c61ea47de9e4a115f6f9d384811e87f3065b8 Mon Sep 17 00:00:00 2001 From: Femi Date: Sun, 21 Sep 2025 01:20:02 +0100 Subject: [PATCH 5/6] chore: remove dotenv import in sample file for kafka client Signed-off-by: Femi --- examples/kafka-ex/src/client.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/kafka-ex/src/client.ts b/examples/kafka-ex/src/client.ts index cf53ab6c..1ac9db5a 100644 --- a/examples/kafka-ex/src/client.ts +++ b/examples/kafka-ex/src/client.ts @@ -1,5 +1,4 @@ /* eslint-disable */ -import "dotenv/config"; import { Kafka } from "kafkajs"; const kafka = new Kafka({ From bc33ca8154e0d891cdbb8f67087dead28c4b7e94 Mon Sep 17 00:00:00 2001 From: Femi Date: Mon, 22 Sep 2025 22:42:40 +0100 Subject: [PATCH 6/6] chore: added readme with usage and kafka installation guide for easy usage Signed-off-by: Femi --- examples/kafka-ex/README.md | 58 ++++++++++++++++++++++++++++ examples/kafka-ex/docker-compose.yml | 23 +++++++++++ examples/kafka-ex/src/consumer.ts | 5 ++- examples/kafka-ex/src/producer.ts | 3 +- 4 files changed, 86 insertions(+), 3 deletions(-) create mode 100644 examples/kafka-ex/README.md create mode 100644 examples/kafka-ex/docker-compose.yml diff --git a/examples/kafka-ex/README.md b/examples/kafka-ex/README.md new file mode 100644 index 00000000..1313756d --- /dev/null +++ b/examples/kafka-ex/README.md @@ -0,0 +1,58 @@ +# Kafka Example + +## Summary +This is an example on how to use the cloudevents javascript sdk with Kafka in NodeJs. 
+
+
+## Description
+A simple CLI application that sends user input as a CloudEvent message through a Kafka producer to a topic. The CloudEvent message is then handled and deserialized correctly by a consumer within a consumer group subscribed to the same topic.
+
+## Dependencies
+- NodeJS (>18)
+- Kafka running locally or remotely
+
+## Local Kafka Setup with Docker
+
+#### Option 1: Run Zookeeper and Kafka Docker Images sequentially with these commands
+
+```bash
+docker run -d \
+  --name zookeeper \
+  -e ZOOKEEPER_CLIENT_PORT=2181 \
+  -e ZOOKEEPER_TICK_TIME=2000 \
+  confluentinc/cp-zookeeper:7.3.2
+
+```
+```bash
+docker run -d \
+  --name kafka \
+  -p 9092:9092 \
+  -e KAFKA_BROKER_ID=1 \
+  -e KAFKA_ZOOKEEPER_CONNECT=localhost:2181 \
+  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
+  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
+  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
+  --link zookeeper:zookeeper \
+  confluentinc/cp-kafka:7.3.2
+
+```
+
+#### Option 2: Run both images using the docker compose file
+
+```bash
+  cd ${directory of the docker compose file}
+
+  docker compose up -d
+```
+
+## Then, run the producer (cli) and consumer
+
+#### To Start the Producer
+```bash
+npm run start:producer
+```
+
+#### To Start the Consumer
+```bash
+npm run start:consumer ${groupId}
+```
diff --git a/examples/kafka-ex/docker-compose.yml b/examples/kafka-ex/docker-compose.yml
new file mode 100644
index 00000000..8cd19562
--- /dev/null
+++ b/examples/kafka-ex/docker-compose.yml
@@ -0,0 +1,23 @@
+---
+version: '3'
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.3.2
+    container_name: zookeeper
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+
+  kafka:
+    image: confluentinc/cp-kafka:7.3.2
+    container_name: kafka
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 \ No newline at end of file diff --git a/examples/kafka-ex/src/consumer.ts b/examples/kafka-ex/src/consumer.ts index 4229c7bb..043b31d6 100644 --- a/examples/kafka-ex/src/consumer.ts +++ b/examples/kafka-ex/src/consumer.ts @@ -1,4 +1,5 @@ /* eslint-disable */ + import { Headers, Kafka, Message } from "cloudevents"; import kafka from "./client"; @@ -21,10 +22,10 @@ const groupId = process.argv[2]; }); try { - let newHeaders: Headers = {}; + const newHeaders: Headers = {}; Object.keys(message.headers as Headers).forEach((key) => { // this is needed here because the headers are buffer values - // when it gets to the consumer which is invalid for the + // when it gets to the consumer and the buffer headers are not valid for the // toEvent api from cloudevents, so this converts each key value to a string // as expected by the toEvent api newHeaders[key] = message!.headers![key]?.toString() ?? ""; diff --git a/examples/kafka-ex/src/producer.ts b/examples/kafka-ex/src/producer.ts index 92fe9230..e7ae18ba 100644 --- a/examples/kafka-ex/src/producer.ts +++ b/examples/kafka-ex/src/producer.ts @@ -1,4 +1,5 @@ /* eslint-disable */ + import { CloudEvent, Kafka } from "cloudevents"; import readline from "readline"; import kafka from "./client"; @@ -12,7 +13,7 @@ const rl = readline.createInterface({ const producer = kafka.producer(); await producer.connect(); - rl.setPrompt("> "); + rl.setPrompt("Enter message > "); rl.prompt(); rl.on("line", async (line) => { const event = new CloudEvent({