diff --git a/package.json b/package.json index 4c99672..2d61fa6 100644 --- a/package.json +++ b/package.json @@ -46,6 +46,7 @@ "ioredis": "^5.9.0", "mqtt": "^5.14.1", "prettier": "^3.7.4", + "redis": "^5.10.0", "release-it": "^19.2.3", "testcontainers": "^11.11.0", "ts-node-maintained": "^10.9.6", @@ -58,11 +59,15 @@ "object-hash": "^3.0.0" }, "peerDependencies": { - "ioredis": "^5.0.0" + "ioredis": "^5.0.0", + "redis": "^5.0.0" }, "peerDependenciesMeta": { "ioredis": { "optional": true + }, + "redis": { + "optional": true } }, "author": "Romain Lanz ", diff --git a/src/transports/node_redis.ts b/src/transports/node_redis.ts new file mode 100644 index 0000000..a584f51 --- /dev/null +++ b/src/transports/node_redis.ts @@ -0,0 +1,194 @@ +/** + * @boringnode/bus + * + * @license MIT + * @copyright BoringNode + */ + +import { createClient, createCluster } from 'redis' +import { assert } from '@poppinss/utils/assert' + +import debug from '../debug.js' +import { JsonEncoder } from '../encoders/json_encoder.js' +import type { + Transport, + TransportEncoder, + TransportMessage, + Serializable, + SubscribeHandler, + NodeRedisTransportConfig, + NodeRedisTransportConnection, + NodeRedisTransportOptions, + NodeRedisClusterTransportConfig, +} from '../types/main.js' + +export function nodeRedis(config: NodeRedisTransportConfig | string, encoder?: TransportEncoder) { + return () => new NodeRedisTransport(config, encoder) +} + +function isNodeRedisConnection(value: unknown): value is NodeRedisTransportConnection { + const candidate = value as NodeRedisTransportConnection + + return ( + typeof candidate?.duplicate === 'function' && + typeof candidate?.publish === 'function' && + typeof candidate?.subscribe === 'function' + ) +} + +function isClusterConfig( + value: NodeRedisTransportConfig +): value is NodeRedisClusterTransportConfig { + const candidate = value as NodeRedisClusterTransportConfig + return typeof candidate?.rootNodes !== 'undefined' +} + +export class NodeRedisTransport implements Transport { + readonly #publisher: NodeRedisTransportConnection + readonly #subscriber: NodeRedisTransportConnection + readonly #encoder: TransportEncoder + readonly #useMessageBuffer: boolean = false + + #id: string | undefined + #connected: boolean = false + #connecting: Promise | null = null + + constructor(options: NodeRedisTransportConfig | string, encoder?: TransportEncoder) + constructor( + connection: NodeRedisTransportConnection, + encoder?: TransportEncoder, + options?: NodeRedisTransportOptions + ) + constructor( + options: NodeRedisTransportConfig | string | NodeRedisTransportConnection, + encoder?: TransportEncoder, + transportOptions?: NodeRedisTransportOptions + ) { + this.#encoder = encoder ?? new JsonEncoder() + + if (isNodeRedisConnection(options)) { + this.#publisher = options.duplicate() + this.#subscriber = options.duplicate() + this.#useMessageBuffer = transportOptions?.useMessageBuffer ?? false + this.#bindErrorHandlers() + return + } + + if (typeof options === 'string') { + this.#publisher = createClient({ url: options }) + this.#subscriber = createClient({ url: options }) + this.#bindErrorHandlers() + return + } + + this.#useMessageBuffer = options.useMessageBuffer ?? false + + if (isClusterConfig(options)) { + const { useMessageBuffer, ...clusterOptions } = options + this.#publisher = createCluster(clusterOptions) as unknown as NodeRedisTransportConnection + this.#subscriber = createCluster(clusterOptions) as unknown as NodeRedisTransportConnection + this.#bindErrorHandlers() + return + } + + const { useMessageBuffer, ...clientOptions } = options + this.#publisher = createClient(clientOptions) as unknown as NodeRedisTransportConnection + this.#subscriber = createClient(clientOptions) as unknown as NodeRedisTransportConnection + this.#bindErrorHandlers() + } + + setId(id: string): Transport { + this.#id = id + + return this + } + + async disconnect(): Promise { + await Promise.all([ + this.#publisher.isOpen ? this.#publisher.close() : Promise.resolve(), + this.#subscriber.isOpen ? this.#subscriber.close() : Promise.resolve(), + ]) + + this.#connected = false + this.#connecting = null + } + + async publish(channel: string, message: Serializable): Promise { + assert(this.#id, 'You must set an id before publishing a message') + await this.#ensureConnected() + + const encoded = this.#encoder.encode({ payload: message, busId: this.#id }) + + await this.#publisher.publish(channel, encoded) + } + + async subscribe( + channel: string, + handler: SubscribeHandler + ): Promise { + await this.#ensureConnected() + + await this.#subscriber.subscribe( + channel, + (message: string | Buffer) => { + debug('received message for channel "%s"', channel) + + const data = this.#encoder.decode>(message) + + /** + * Ignore messages published by this bus instance + */ + if (data.busId === this.#id) { + debug('ignoring message published by the same bus instance') + return + } + + // @ts-expect-error - TODO: Weird typing issue + handler(data.payload) + }, + this.#useMessageBuffer + ) + } + + onReconnect(callback: () => void): void { + this.#subscriber.on('reconnecting', callback) + } + + async unsubscribe(channel: string): Promise { + await this.#ensureConnected() + await this.#subscriber.unsubscribe(channel) + } + + async #ensureConnected() { + if (this.#connected) { + return + } + + if (this.#connecting) { + await this.#connecting + return + } + + this.#connecting = Promise.all([ + this.#publisher.isOpen ? Promise.resolve() : this.#publisher.connect(), + this.#subscriber.isOpen ? Promise.resolve() : this.#subscriber.connect(), + ]).then(() => undefined) + + try { + await this.#connecting + this.#connected = true + } finally { + this.#connecting = null + } + } + + #bindErrorHandlers() { + this.#publisher.on('error', (error) => { + debug('node-redis publisher error: %O', error) + }) + + this.#subscriber.on('error', (error) => { + debug('node-redis subscriber error: %O', error) + }) + } +} diff --git a/src/types/main.ts b/src/types/main.ts index 5ca8db1..fcf2128 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -7,8 +7,15 @@ import type { RedisOptions } from 'ioredis' import type { IClientOptions } from 'mqtt' +import type { + RedisClientOptions, + RedisClusterOptions, + createClient, + createCluster +} from 'redis' export type { Redis, Cluster } from 'ioredis' +export type { RedisClientType, RedisClusterType } from 'redis' export type TransportFactory = () => Transport /** @@ -48,6 +55,33 @@ export interface RedisTransportOptions { useMessageBuffer?: boolean } +export interface NodeRedisClientTransportConfig extends RedisClientOptions { + /** + * If true, we will subscribe in buffer mode. + */ + useMessageBuffer?: boolean +} + +export interface NodeRedisClusterTransportConfig extends RedisClusterOptions { + /** + * If true, we will subscribe in buffer mode. + */ + useMessageBuffer?: boolean +} + +export type NodeRedisTransportConnection = ReturnType | ReturnType + +export type NodeRedisTransportConfig = + | NodeRedisClientTransportConfig + | NodeRedisClusterTransportConfig + +export interface NodeRedisTransportOptions { + /** + * If true, we will subscribe in buffer mode. + */ + useMessageBuffer?: boolean +} + export enum MqttProtocol { MQTT = 'mqtt', MQTTS = 'mqtts', diff --git a/tests/drivers/node_redis_transport.spec.ts b/tests/drivers/node_redis_transport.spec.ts new file mode 100644 index 0000000..57a025a --- /dev/null +++ b/tests/drivers/node_redis_transport.spec.ts @@ -0,0 +1,274 @@ +/** + * @boringnode/bus + * + * @license MIT + * @copyright BoringNode + */ + +import { setTimeout } from 'node:timers/promises' +import { test } from '@japa/runner' +import { RedisContainer, StartedRedisContainer } from '@testcontainers/redis' +import { createClient, createCluster } from 'redis' +import { NodeRedisTransport } from '../../src/transports/node_redis.js' +import { JsonEncoder } from '../../src/encoders/json_encoder.js' +import { TransportEncoder, TransportMessage } from '../../src/types/main.js' + +test.group('Node Redis Transport', (group) => { + let container: StartedRedisContainer + + group.setup(async () => { + container = await new RedisContainer('redis:7.2').start() + + return async () => { + await container.stop() + } + }) + + test('transport should not receive message emitted by itself', async ({ assert, cleanup }) => { + const transport = new NodeRedisTransport(container.getConnectionUrl()).setId('bus') + cleanup(() => transport.disconnect()) + + await transport.subscribe('testing-channel', () => { + assert.fail('Bus should not receive message emitted by itself') + }) + + await transport.publish('testing-channel', 'test') + await setTimeout(1000) + }).disableTimeout() + + test('transport should receive message emitted by another bus', async ({ + assert, + cleanup, + }, done) => { + assert.plan(1) + + const transport1 = new NodeRedisTransport(container.getConnectionUrl()).setId('bus1') + const transport2 = new NodeRedisTransport(container.getConnectionUrl()).setId('bus2') + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + await transport1.subscribe('testing-channel', (payload) => { + assert.equal(payload, 'test') + done() + }) + + await setTimeout(200) + + await transport2.publish('testing-channel', 'test') + }).waitForDone() + + test('transport should trigger onReconnect when the client reconnects', async ({ + assert, + cleanup, + }) => { + const transport = new NodeRedisTransport(container.getConnectionUrl()).setId('bus') + cleanup(() => transport.disconnect()) + + let onReconnectTriggered = false + transport.onReconnect(() => { + onReconnectTriggered = true + }) + + await transport.subscribe('testing-channel', () => {}) + + await container.restart() + await setTimeout(200) + + assert.isTrue(onReconnectTriggered) + }) + + test('message should be encoded and decoded correctly when using JSON encoder', async ({ + assert, + cleanup, + }, done) => { + assert.plan(1) + + const transport1 = new NodeRedisTransport(container.getConnectionUrl(), new JsonEncoder()).setId( + 'bus1' + ) + const transport2 = new NodeRedisTransport(container.getConnectionUrl(), new JsonEncoder()).setId( + 'bus2' + ) + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + const data = { test: 'test' } + + await transport1.subscribe('testing-channel', (payload) => { + assert.deepEqual(payload, data) + done() + }) + + await setTimeout(200) + + await transport2.publish('testing-channel', data) + }).waitForDone() + + test('send binary data using useMessageBuffer', async ({ assert, cleanup }, done) => { + assert.plan(1) + + class BinaryEncoder implements TransportEncoder { + encode(message: TransportMessage) { + return Buffer.from(JSON.stringify(message)) + } + + decode(data: string | Buffer) { + const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data, 'binary') + return JSON.parse(buffer.toString()) + } + } + + const transport1 = new NodeRedisTransport( + { url: container.getConnectionUrl(), useMessageBuffer: true }, + new BinaryEncoder() + ).setId('bus1') + + const transport2 = new NodeRedisTransport( + { url: container.getConnectionUrl(), useMessageBuffer: true }, + new BinaryEncoder() + ).setId('bus2') + + cleanup(() => { + transport1.disconnect() + transport2.disconnect() + }) + + const data = ['foo', '👍'] + + await transport1.subscribe('testing-channel', (payload) => { + assert.deepEqual(payload, data) + done() + }) + + await setTimeout(200) + await transport2.publish('testing-channel', data) + }).waitForDone() + + test('should work with an existing redis instance', async ({ assert, cleanup }, done) => { + assert.plan(1) + + const redisInstance = createClient({ + socket: { + host: container.getHost(), + port: container.getMappedPort(6379), + }, + }) + redisInstance.on('error', () => {}) + await redisInstance.connect() + + cleanup(async () => { + if (redisInstance.isOpen) { + await redisInstance.close() + } + }) + + const transport1 = new NodeRedisTransport(redisInstance).setId('bus1') + const transport2 = new NodeRedisTransport(redisInstance).setId('bus2') + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + await transport1.subscribe('testing-channel', (payload) => { + assert.equal(payload, 'test') + done() + }) + + await setTimeout(200) + + await transport2.publish('testing-channel', 'test') + }).waitForDone() + + test('should work with an existing cluster instance', async ({ assert, cleanup }, done) => { + assert.plan(1) + + const cluster = createCluster({ + rootNodes: [{ url: 'redis://127.0.0.1:7000' }], + }) + cluster.on('error', () => {}) + await cluster.connect() + + cleanup(async () => { + if (cluster.isOpen) { + await cluster.close() + } + }) + + const transport1 = new NodeRedisTransport(cluster).setId('bus1') + const transport2 = new NodeRedisTransport(cluster).setId('bus2') + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + await transport1.subscribe('testing-channel', (payload) => { + assert.equal(payload, 'test') + done() + }) + + await setTimeout(200) + + await transport2.publish('testing-channel', 'test') + }) + .waitForDone() + .skip(!!process.env.CI, 'Skipping cluster test on CI') + + test('send binary data using useMessageBuffer with existing redis instance', async ({ + assert, + cleanup, + }, done) => { + assert.plan(1) + + class BinaryEncoder implements TransportEncoder { + encode(message: TransportMessage) { + return Buffer.from(JSON.stringify(message)) + } + + decode(data: string | Buffer) { + const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data, 'binary') + return JSON.parse(buffer.toString()) + } + } + + const redisInstance = createClient({ url: container.getConnectionUrl() }) + redisInstance.on('error', () => {}) + await redisInstance.connect() + + cleanup(async () => { + if (redisInstance.isOpen) { + await redisInstance.close() + } + }) + + const transport1 = new NodeRedisTransport(redisInstance, new BinaryEncoder(), { + useMessageBuffer: true, + }).setId('bus1') + + const transport2 = new NodeRedisTransport(redisInstance, new BinaryEncoder(), { + useMessageBuffer: true, + }).setId('bus2') + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + const data = ['foo', '👍'] + + await transport1.subscribe('testing-channel', (payload) => { + assert.deepEqual(payload, data) + done() + }) + + await setTimeout(200) + await transport2.publish('testing-channel', data) + }).waitForDone() +})