diff --git a/package-lock.json b/package-lock.json index 298e2db..fe9822a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -28,7 +28,7 @@ "@typescript-eslint/eslint-plugin": "^8.35.1", "@typescript-eslint/parser": "^8.35.1", "@typescript-eslint/typescript-estree": "^8.35.1", - "chai": "^5.2.0", + "chai": "^4.5.0", "coveralls-next": "^4.2.1", "eslint": "^9.30.1", "glob": "^11.0.3", @@ -1477,13 +1477,13 @@ } }, "node_modules/assertion-error": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/assertion-error/-/assertion-error-2.0.1.tgz", - "integrity": "sha512-Izi8RQcffqCeNVgFigKli1ssklIbpHnCYc6AknXGYoB6grJqyeby7jv12JUQgmTAnIDnbck1uxksT4dzN3PWBA==", + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/assertion-error/-/assertion-error-1.1.0.tgz", + "integrity": "sha512-jgsaNduz+ndvGyFt3uSuWqvy4lCnIJiovtouQN5JZHOKCS2QuhEdbcQHFhVksz2N2U9hXJo8odG7ETyWlEeuDw==", "dev": true, "license": "MIT", "engines": { - "node": ">=12" + "node": "*" } }, "node_modules/asynckit": { @@ -1694,20 +1694,32 @@ "license": "CC-BY-4.0" }, "node_modules/chai": { - "version": "5.2.0", - "resolved": "https://registry.npmjs.org/chai/-/chai-5.2.0.tgz", - "integrity": "sha512-mCuXncKXk5iCLhfhwTc0izo0gtEmpz5CtG2y8GiOINBlMVS6v8TMRc5TaLWKS6692m9+dVVfzgeVxR5UxWHTYw==", + "version": "4.5.0", + "resolved": "https://registry.npmjs.org/chai/-/chai-4.5.0.tgz", + "integrity": "sha512-RITGBfijLkBddZvnn8jdqoTypxvqbOLYQkGGxXzeFjVHvudaPw0HNFD9x928/eUwYWd2dPCugVqspGALTZZQKw==", "dev": true, "license": "MIT", "dependencies": { - "assertion-error": "^2.0.1", - "check-error": "^2.1.1", - "deep-eql": "^5.0.1", - "loupe": "^3.1.0", - "pathval": "^2.0.0" + "assertion-error": "^1.1.0", + "check-error": "^1.0.3", + "deep-eql": "^4.1.3", + "get-func-name": "^2.0.2", + "loupe": "^2.3.6", + "pathval": "^1.1.1", + "type-detect": "^4.1.0" }, "engines": { - "node": ">=12" + "node": ">=4" + } + }, + "node_modules/chai/node_modules/type-detect": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/type-detect/-/type-detect-4.1.0.tgz", + "integrity": "sha512-Acylog8/luQ8L7il+geoSxhEkazvkslg7PSNKOX59mbB9cOveP5aq9h74Y7YU8yDpJwetzQQrfIwtf4Wp4LKcw==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=4" } }, "node_modules/chalk": { @@ -1727,13 +1739,16 @@ } }, "node_modules/check-error": { - "version": "2.1.1", - "resolved": "https://registry.npmjs.org/check-error/-/check-error-2.1.1.tgz", - "integrity": "sha512-OAlb+T7V4Op9OwdkjmguYRqncdlx5JiofwOAUkmTF+jNdHwzTaTs4sRAGpzLF3oOz5xAyDGrPgeIDFQmDOTiJw==", + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/check-error/-/check-error-1.0.3.tgz", + "integrity": "sha512-iKEoDYaRmd1mxM90a2OEfWhjsjPpYPuQ+lMYsoxB126+t8fw7ySEO48nmDg5COTjxDI65/Y2OWpeEHk3ZOe8zg==", "dev": true, "license": "MIT", + "dependencies": { + "get-func-name": "^2.0.2" + }, "engines": { - "node": ">= 16" + "node": "*" } }, "node_modules/chokidar": { @@ -1975,11 +1990,14 @@ } }, "node_modules/deep-eql": { - "version": "5.0.2", - "resolved": "https://registry.npmjs.org/deep-eql/-/deep-eql-5.0.2.tgz", - "integrity": "sha512-h5k/5U50IJJFpzfL6nO9jaaumfjO/f2NjK/oYB2Djzm4p9L+3T9qWpZqZ2hAbLPuuYq9wrU08WQyBTL5GbPk5Q==", + "version": "4.1.4", + "resolved": "https://registry.npmjs.org/deep-eql/-/deep-eql-4.1.4.tgz", + "integrity": "sha512-SUwdGfqdKOwxCPeVYjwSyRpJ7Z+fhpwIAtmCUdZIWZ/YP5R9WAsyuSgpLVDi9bjWoN2LXHNss/dk3urXtdQxGg==", "dev": true, "license": "MIT", + "dependencies": { + "type-detect": "^4.0.0" + }, "engines": { "node": ">=6" } @@ -2604,6 +2622,16 @@ "integrity": "sha512-3t6rVToeoZfYSGd8YoLFR2DJkiQrIiUrGcjvFX2mDw3bn6k2OtwHN0TNCLbBO+w8qTvimhDkv+LSscbJY1vE6w==", "dev": true }, + "node_modules/get-func-name": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/get-func-name/-/get-func-name-2.0.2.tgz", + "integrity": "sha512-8vXOvuE167CtIc3OyItco7N/dpRtBbYOsPsXCz7X/PMnlGjYjSGuZJgM1Y7mmew7BKf9BqvLX2tnOVy1BBUsxQ==", + "dev": true, + "license": "MIT", + "engines": { + "node": "*" + } + }, "node_modules/get-package-type": { "version": "0.1.0", "resolved": "https://registry.npmjs.org/get-package-type/-/get-package-type-0.1.0.tgz", @@ -3305,11 +3333,14 @@ } }, "node_modules/loupe": { - "version": "3.1.4", - "resolved": "https://registry.npmjs.org/loupe/-/loupe-3.1.4.tgz", - "integrity": "sha512-wJzkKwJrheKtknCOKNEtDK4iqg/MxmZheEMtSTYvnzRdEYaZzmgH976nenp8WdJRdx5Vc1X/9MO0Oszl6ezeXg==", + "version": "2.3.7", + "resolved": "https://registry.npmjs.org/loupe/-/loupe-2.3.7.tgz", + "integrity": "sha512-zSMINGVYkdpYSOBmLi0D1Uo7JU9nVdQKrHxC8eYlV+9YKK9WePqAlL7lSlorG/U2Fw1w0hTBmaa/jrQ3UbPHtA==", "dev": true, - "license": "MIT" + "license": "MIT", + "dependencies": { + "get-func-name": "^2.0.1" + } }, "node_modules/lru-cache": { "version": "5.1.1", @@ -4222,13 +4253,13 @@ } }, "node_modules/pathval": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/pathval/-/pathval-2.0.1.tgz", - "integrity": "sha512-//nshmD55c46FuFw26xV/xFAaB5HF9Xdap7HJBBnrKdAd6/GxDBaNA1870O79+9ueg61cZLSVc+OaFlfmObYVQ==", + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/pathval/-/pathval-1.1.1.tgz", + "integrity": "sha512-Dp6zGqpTdETdR63lehJYPeIOqpiNBNtc7BpWSLrOje7UaIsE5aY92r/AunQA7rsXvet3lrJ3JnZX29UPTKXyKQ==", "dev": true, "license": "MIT", "engines": { - "node": ">= 14.16" + "node": "*" } }, "node_modules/pg": { diff --git a/package.json b/package.json index da7fef1..bad8dbc 100644 --- a/package.json +++ b/package.json @@ -66,7 +66,7 @@ "@typescript-eslint/eslint-plugin": "^8.35.1", "@typescript-eslint/parser": "^8.35.1", "@typescript-eslint/typescript-estree": "^8.35.1", - "chai": "^5.2.0", + "chai": "^4.5.0", "coveralls-next": "^4.2.1", "eslint": "^9.30.1", "glob": "^11.0.3", diff --git a/src/PgPubSub.ts b/src/PgPubSub.ts index 92f43ea..befddf7 100644 --- a/src/PgPubSub.ts +++ b/src/PgPubSub.ts @@ -296,7 +296,7 @@ export declare interface PgPubSub { */ export class PgPubSub extends EventEmitter { - public readonly pgClient: PgClient; + public pgClient: PgClient; public readonly options: PgPubSubOptions; public readonly channels: PgChannelEmitter = new PgChannelEmitter(); @@ -319,9 +319,6 @@ export class PgPubSub extends EventEmitter { this.pgClient = (this.options.pgClient || new Client(this.options)) as PgClient; - this.pgClient.on('end', () => this.emit('end')); - this.pgClient.on('error', () => this.emit('error')); - this.onNotification = this.options.executionLock ? this.onNotificationLockExec.bind(this) : this.onNotification.bind(this) @@ -329,7 +326,27 @@ export class PgPubSub extends EventEmitter { this.reconnect = this.reconnect.bind(this); this.onReconnect = this.onReconnect.bind(this); - this.pgClient.on('notification', this.onNotification); + this.initClientListeners(this.pgClient); + } + + /** + * Sets up event listeners on the given pg client instance. + * + * @param {PgClient} client - pg client to attach listeners to + */ + private initClientListeners(client: PgClient): void { + client.on('end', () => this.emit('end')); + client.on('error', (err: Error) => this.emit('error', err)); + client.on('notification', this.onNotification); + } + + /** + * Creates a fresh pg.Client instance. + * + * @return {PgClient} + */ + private createClient(): PgClient { + return new Client(this.options) as PgClient; } /** @@ -645,12 +662,16 @@ export class PgPubSub extends EventEmitter { /** * Reconnect routine, used for implementation of auto-reconnecting db - * connection + * connection. Creates a fresh pg.Client on each attempt since pg.Client + * cannot be reused after disconnect. * * @access private * @return {number} */ private reconnect(): number { + this.pgClient.off('end', this.reconnect); + this.pgClient.off('error', this.reconnect); + return setTimeout(async () => { if (this.options.retryLimit <= ++this.retry) { this.emit('error', new Error( @@ -660,6 +681,10 @@ export class PgPubSub extends EventEmitter { return this.close(); } + this.pgClient.removeAllListeners(); + this.pgClient = this.createClient(); + this.initClientListeners(this.pgClient); + this.setOnceHandler(['connect'], this.onReconnect); try { await this.connect(); } catch (err) { /* ignore */ } diff --git a/test/mocks/pg.ts b/test/mocks/pg.ts index 89f9068..497d2e7 100644 --- a/test/mocks/pg.ts +++ b/test/mocks/pg.ts @@ -28,18 +28,44 @@ export interface ClientConfig { connectionString?: string; } +let _onConnect: ((client: Client) => void) | null = null; + +export function setOnConnect(fn: ((client: Client) => void) | null): void { + _onConnect = fn; +} + +export function getOnConnect(): ((client: Client) => void) | null { + return _onConnect; +} + // noinspection JSUnusedGlobalSymbols export class Client extends EventEmitter { + private _connected = false; + private _ended = false; + // noinspection JSUnusedGlobalSymbols,JSUnusedLocalSymbols - public constructor(options: ClientConfig) { + public constructor(_options?: ClientConfig) { super(); this.setMaxListeners(Infinity); } public connect() { - this.emit('connect'); + if (this._connected || this._ended) { + throw new Error( + 'Client has already been connected. ' + + 'You cannot reuse a client.', + ); + } + this._connected = true; + + if (_onConnect) { + _onConnect(this); + } else { + this.emit('connect'); + } } // noinspection JSUnusedGlobalSymbols public end() { + this._ended = true; this.emit('end'); } public async query(queryText: string) { diff --git a/test/src/PgPubSub.ts b/test/src/PgPubSub.ts index 34b1749..18dea22 100644 --- a/test/src/PgPubSub.ts +++ b/test/src/PgPubSub.ts @@ -24,7 +24,8 @@ import '../mocks'; import { expect } from 'chai'; import { Client } from 'pg'; import * as sinon from 'sinon'; -import { PgClient, PgIpLock, PgPubSub, RETRY_LIMIT } from '../../src'; +import * as pgMock from '../mocks/pg'; +import { PgIpLock, PgPubSub, RETRY_LIMIT } from '../../src'; describe('PgPubSub', () => { let pgClient: Client; @@ -37,7 +38,7 @@ describe('PgPubSub', () => { payload: 'true', }); }); - } + }; beforeEach(() => { pgClient = new Client(); @@ -89,15 +90,20 @@ describe('PgPubSub', () => { }); }); }); + + afterEach(() => { + pgMock.setOnConnect(null); + }); + describe('reconnect', () => { it('should support automatic reconnect', done => { let counter = 0; - // emulate termination - (pgClient as any).connect = () => { + // Make all new client connections emit 'end' + pgMock.setOnConnect((client: any) => { counter++; - pgClient.emit('end'); - }; + client.emit('end'); + }); pubSub.on('error', err => { expect(err.message).equals( @@ -111,18 +117,15 @@ describe('PgPubSub', () => { it('should fire connect event only once', done => { let connectCalls = 0; - // emulate termination - (pgClient as any).connect = () => { + // First connection attempt fails, second succeeds + pgMock.setOnConnect((client: any) => { if (connectCalls < 1) { - pgClient.emit('error'); + client.emit('error'); + } else { + client.emit('connect'); } - - else { - pgClient.emit('connect'); - } - connectCalls++; - }; + }); // test will fail if done is called more than once pubSub.on('connect', done); @@ -131,11 +134,11 @@ describe('PgPubSub', () => { it('should support automatic reconnect on errors', done => { let counter = 0; - // emulate termination - (pgClient as any).connect = () => { + // Make all new client connections emit 'error' + pgMock.setOnConnect((client: any) => { counter++; - pgClient.emit('error'); - }; + client.emit('error'); + }); pubSub.on('error', err => { if (err) { @@ -149,10 +152,10 @@ describe('PgPubSub', () => { pubSub.connect().catch(() => { /* ignore faking errors */ }); }); it('should emit error and end if retry limit reached', async () => { - // emulate connection failure - (pgClient as any).connect = async () => { - pgClient.emit('end'); - }; + // Make all new client connections fail + pgMock.setOnConnect((client: any) => { + client.emit('end'); + }); try { await pubSub.connect(); } catch (err) { expect(err).to.be.instanceOf(Error); @@ -161,13 +164,27 @@ describe('PgPubSub', () => { ); } }); + it('should create a fresh client on reconnect', done => { + pubSub.connect().then(() => { + const originalClient = pubSub.pgClient; + + pubSub.once('reconnect', () => { + // After reconnect, pgClient should be a new instance + expect(pubSub.pgClient).to.not.equal(originalClient); + done(); + }); + + // Simulate connection drop + originalClient.emit('end'); + }); + }); it('should re-subscribe all channels', done => { pubSub.listen('TestOne'); pubSub.listen('TestTwo'); const spy = sinon.spy(pubSub, 'listen'); - pubSub.connect().then(() => pgClient.emit('end')); + pubSub.connect().then(() => pubSub.pgClient.emit('end')); setTimeout(() => { expect(spy.calledTwice).to.be.true; @@ -323,7 +340,7 @@ describe('PgPubSub', () => { const spy = sinon.spy(pubSub.pgClient, 'query'); await pubSub.notify('Test', { a: 'b' }); const [{ args: [arg, ] }] = spy.getCalls(); - expect(arg.trim()).equals(`NOTIFY "Test", '{"a":"b"}'`); + expect(arg.trim()).equals('NOTIFY "Test", \'{"a":"b"}\''); }); }); describe('Channels API', () => { @@ -332,9 +349,7 @@ describe('PgPubSub', () => { let pubSub3: PgPubSub; beforeEach(async () => { - const pgClientShared = new Client() as PgClient; - - pubSub1 = new PgPubSub({ pgClient: pgClientShared }); + pubSub1 = new PgPubSub({ pgClient: new Client() }); await pubSub1.connect(); await pubSub1.listen('ChannelOne'); await pubSub1.listen('ChannelTwo'); @@ -344,7 +359,7 @@ describe('PgPubSub', () => { await pubSub2.listen('ChannelThree'); await pubSub2.listen('ChannelFour'); - pubSub3 = new PgPubSub({ pgClient: pgClientShared }); + pubSub3 = new PgPubSub({ pgClient: new Client() }); await pubSub3.connect(); await pubSub3.listen('ChannelFive'); await pubSub3.listen('ChannelSix');