From 62250bad5dd77bf8a73d012cfece0414a25ced96 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Tue, 1 Nov 2022 14:49:52 +0000 Subject: [PATCH 01/11] refactor: make `Connection#makeRequest` more readable --- src/connection.ts | 94 ++++++++++++++++++++++++----------------------- 1 file changed, 48 insertions(+), 46 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index 20d35fd6c..72d7f0447 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -2978,68 +2978,70 @@ class Connection extends EventEmitter { if (this.state !== this.STATE.LOGGED_IN) { const message = 'Requests can only be made in the ' + this.STATE.LOGGED_IN.name + ' state, not the ' + this.state.name + ' state'; this.debug.log(message); - request.callback(new RequestError(message, 'EINVALIDSTATE')); - } else if (request.canceled) { - process.nextTick(() => { + return request.callback(new RequestError(message, 'EINVALIDSTATE')); + } + + if (request.canceled) { + return process.nextTick(() => { request.callback(new RequestError('Canceled.', 'ECANCEL')); }); + } + + if (packetType === TYPE.SQL_BATCH) { + this.isSqlBatch = true; } else { - if (packetType === TYPE.SQL_BATCH) { - this.isSqlBatch = true; - } else { - this.isSqlBatch = false; - } + this.isSqlBatch = false; + } - this.request = request; - request.connection! = this; - request.rowCount! = 0; - request.rows! = []; - request.rst! = []; + this.request = request; + request.connection! = this; + request.rowCount! = 0; + request.rows! = []; + request.rst! = []; - const onCancel = () => { - payloadStream.unpipe(message); - payloadStream.destroy(new RequestError('Canceled.', 'ECANCEL')); + const onCancel = () => { + payloadStream.unpipe(message); + payloadStream.destroy(new RequestError('Canceled.', 'ECANCEL')); - // set the ignore bit and end the message. - message.ignore = true; - message.end(); + // set the ignore bit and end the message. + message.ignore = true; + message.end(); - if (request instanceof Request && request.paused) { - // resume the request if it was paused so we can read the remaining tokens - request.resume(); - } - }; + if (request instanceof Request && request.paused) { + // resume the request if it was paused so we can read the remaining tokens + request.resume(); + } + }; - request.once('cancel', onCancel); + request.once('cancel', onCancel); - this.createRequestTimer(); + this.createRequestTimer(); - const message = new Message({ type: packetType, resetConnection: this.resetConnectionOnNextRequest }); - this.messageIo.outgoingMessageStream.write(message); - this.transitionTo(this.STATE.SENT_CLIENT_REQUEST); + const message = new Message({ type: packetType, resetConnection: this.resetConnectionOnNextRequest }); + this.messageIo.outgoingMessageStream.write(message); + this.transitionTo(this.STATE.SENT_CLIENT_REQUEST); - message.once('finish', () => { - request.removeListener('cancel', onCancel); - request.once('cancel', this._cancelAfterRequestSent); + message.once('finish', () => { + request.removeListener('cancel', onCancel); + request.once('cancel', this._cancelAfterRequestSent); - this.resetConnectionOnNextRequest = false; - this.debug.payload(function() { - return payload!.toString(' '); - }); + this.resetConnectionOnNextRequest = false; + this.debug.payload(function() { + return payload!.toString(' '); }); + }); - const payloadStream = Readable.from(payload); - payloadStream.once('error', (error) => { - payloadStream.unpipe(message); + const payloadStream = Readable.from(payload); + payloadStream.once('error', (error) => { + payloadStream.unpipe(message); - // Only set a request error if no error was set yet. - request.error ??= error; + // Only set a request error if no error was set yet. + request.error ??= error; - message.ignore = true; - message.end(); - }); - payloadStream.pipe(message); - } + message.ignore = true; + message.end(); + }); + payloadStream.pipe(message); } /** From 13f6e8bd371550cb3ac40a38c8b42cf22d398110 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Tue, 1 Nov 2022 14:57:05 +0000 Subject: [PATCH 02/11] refactor: move logic from event handler to `Connection#makeRequest` --- src/connection.ts | 155 +++++++++++++++++++++++----------------------- 1 file changed, 77 insertions(+), 78 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index 72d7f0447..648e5e4ff 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -3042,6 +3042,82 @@ class Connection extends EventEmitter { message.end(); }); payloadStream.pipe(message); + + (async () => { + let message; + try { + message = await this.messageIo.readMessage(); + } catch (err: any) { + return this.socketError(err); + } + // request timer is stopped on first data package + this.clearRequestTimer(); + + const tokenStreamParser = this.createTokenStreamParser(message, new RequestTokenHandler(this, this.request!)); + + // If the request was canceled and we have a `cancelTimer` + // defined, we send a attention message after the + // request message was fully sent off. + // + // We already started consuming the current message + // (but all the token handlers should be no-ops), and + // need to ensure the next message is handled by the + // `SENT_ATTENTION` state. + if (this.request?.canceled && this.cancelTimer) { + return this.transitionTo(this.STATE.SENT_ATTENTION); + } + + const onResume = () => { + tokenStreamParser.resume(); + }; + const onPause = () => { + tokenStreamParser.pause(); + + this.request?.once('resume', onResume); + }; + + this.request?.on('pause', onPause); + + if (this.request instanceof Request && this.request.paused) { + onPause(); + } + + const onCancel = () => { + tokenStreamParser.removeListener('end', onEndOfMessage); + + if (this.request instanceof Request && this.request.paused) { + // resume the request if it was paused so we can read the remaining tokens + this.request.resume(); + } + + this.request?.removeListener('pause', onPause); + this.request?.removeListener('resume', onResume); + + // The `_cancelAfterRequestSent` callback will have sent a + // attention message, so now we need to also switch to + // the `SENT_ATTENTION` state to make sure the attention ack + // message is processed correctly. + this.transitionTo(this.STATE.SENT_ATTENTION); + }; + + const onEndOfMessage = () => { + this.request?.removeListener('cancel', this._cancelAfterRequestSent); + this.request?.removeListener('cancel', onCancel); + this.request?.removeListener('pause', onPause); + this.request?.removeListener('resume', onResume); + + this.transitionTo(this.STATE.LOGGED_IN); + const sqlRequest = this.request as Request; + this.request = undefined; + if (this.config.options.tdsVersion < '7_2' && sqlRequest.error && this.isSqlBatch) { + this.inTransaction = false; + } + sqlRequest.callback(sqlRequest.error, sqlRequest.rowCount, sqlRequest.rows); + }; + + tokenStreamParser.once('end', onEndOfMessage); + this.request?.once('cancel', onCancel); + })(); } /** @@ -3520,84 +3596,7 @@ Connection.prototype.STATE = { }, SENT_CLIENT_REQUEST: { name: 'SentClientRequest', - enter: function() { - (async () => { - let message; - try { - message = await this.messageIo.readMessage(); - } catch (err: any) { - return this.socketError(err); - } - // request timer is stopped on first data package - this.clearRequestTimer(); - - const tokenStreamParser = this.createTokenStreamParser(message, new RequestTokenHandler(this, this.request!)); - - // If the request was canceled and we have a `cancelTimer` - // defined, we send a attention message after the - // request message was fully sent off. - // - // We already started consuming the current message - // (but all the token handlers should be no-ops), and - // need to ensure the next message is handled by the - // `SENT_ATTENTION` state. - if (this.request?.canceled && this.cancelTimer) { - return this.transitionTo(this.STATE.SENT_ATTENTION); - } - - const onResume = () => { - tokenStreamParser.resume(); - }; - const onPause = () => { - tokenStreamParser.pause(); - - this.request?.once('resume', onResume); - }; - - this.request?.on('pause', onPause); - - if (this.request instanceof Request && this.request.paused) { - onPause(); - } - - const onCancel = () => { - tokenStreamParser.removeListener('end', onEndOfMessage); - - if (this.request instanceof Request && this.request.paused) { - // resume the request if it was paused so we can read the remaining tokens - this.request.resume(); - } - - this.request?.removeListener('pause', onPause); - this.request?.removeListener('resume', onResume); - - // The `_cancelAfterRequestSent` callback will have sent a - // attention message, so now we need to also switch to - // the `SENT_ATTENTION` state to make sure the attention ack - // message is processed correctly. - this.transitionTo(this.STATE.SENT_ATTENTION); - }; - - const onEndOfMessage = () => { - this.request?.removeListener('cancel', this._cancelAfterRequestSent); - this.request?.removeListener('cancel', onCancel); - this.request?.removeListener('pause', onPause); - this.request?.removeListener('resume', onResume); - - this.transitionTo(this.STATE.LOGGED_IN); - const sqlRequest = this.request as Request; - this.request = undefined; - if (this.config.options.tdsVersion < '7_2' && sqlRequest.error && this.isSqlBatch) { - this.inTransaction = false; - } - sqlRequest.callback(sqlRequest.error, sqlRequest.rowCount, sqlRequest.rows); - }; - - tokenStreamParser.once('end', onEndOfMessage); - this.request?.once('cancel', onCancel); - })(); - - }, + enter: function() { }, exit: function(nextState) { this.clearRequestTimer(); }, From c0ad29be3df28f884fdecbac2c964b7c24dcd6e9 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Tue, 1 Nov 2022 15:00:34 +0000 Subject: [PATCH 03/11] refactor: use local variable instead of instance property --- src/connection.ts | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index 648e5e4ff..07c101db2 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -3053,7 +3053,7 @@ class Connection extends EventEmitter { // request timer is stopped on first data package this.clearRequestTimer(); - const tokenStreamParser = this.createTokenStreamParser(message, new RequestTokenHandler(this, this.request!)); + const tokenStreamParser = this.createTokenStreamParser(message, new RequestTokenHandler(this, request)); // If the request was canceled and we have a `cancelTimer` // defined, we send a attention message after the @@ -3063,7 +3063,7 @@ class Connection extends EventEmitter { // (but all the token handlers should be no-ops), and // need to ensure the next message is handled by the // `SENT_ATTENTION` state. - if (this.request?.canceled && this.cancelTimer) { + if (request.canceled && this.cancelTimer) { return this.transitionTo(this.STATE.SENT_ATTENTION); } @@ -3073,25 +3073,25 @@ class Connection extends EventEmitter { const onPause = () => { tokenStreamParser.pause(); - this.request?.once('resume', onResume); + request.once('resume', onResume); }; - this.request?.on('pause', onPause); + request.on('pause', onPause); - if (this.request instanceof Request && this.request.paused) { + if (request instanceof Request && request.paused) { onPause(); } const onCancel = () => { tokenStreamParser.removeListener('end', onEndOfMessage); - if (this.request instanceof Request && this.request.paused) { + if (request instanceof Request && request.paused) { // resume the request if it was paused so we can read the remaining tokens - this.request.resume(); + request.resume(); } - this.request?.removeListener('pause', onPause); - this.request?.removeListener('resume', onResume); + request.removeListener('pause', onPause); + request.removeListener('resume', onResume); // The `_cancelAfterRequestSent` callback will have sent a // attention message, so now we need to also switch to @@ -3101,13 +3101,13 @@ class Connection extends EventEmitter { }; const onEndOfMessage = () => { - this.request?.removeListener('cancel', this._cancelAfterRequestSent); - this.request?.removeListener('cancel', onCancel); - this.request?.removeListener('pause', onPause); - this.request?.removeListener('resume', onResume); + request.removeListener('cancel', this._cancelAfterRequestSent); + request.removeListener('cancel', onCancel); + request.removeListener('pause', onPause); + request.removeListener('resume', onResume); this.transitionTo(this.STATE.LOGGED_IN); - const sqlRequest = this.request as Request; + const sqlRequest = request as Request; this.request = undefined; if (this.config.options.tdsVersion < '7_2' && sqlRequest.error && this.isSqlBatch) { this.inTransaction = false; @@ -3116,7 +3116,7 @@ class Connection extends EventEmitter { }; tokenStreamParser.once('end', onEndOfMessage); - this.request?.once('cancel', onCancel); + request.once('cancel', onCancel); })(); } From dd443e1f2e83b89ba2903628ed9783da3d4bfb3c Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Tue, 1 Nov 2022 22:07:59 +0000 Subject: [PATCH 04/11] refactor: pull up callback handling --- src/connection.ts | 97 +++++++++++++++++++++++++----- test/integration/bulk-load-test.js | 14 ++--- 2 files changed, 89 insertions(+), 22 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index 07c101db2..eb4a7827c 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -2454,7 +2454,13 @@ class Connection extends EventEmitter { * @param request A [[Request]] object representing the request. */ execSqlBatch(request: Request) { - this.makeRequest(request, TYPE.SQL_BATCH, new SqlBatchPayload(request.sqlTextOrProcedure!, this.currentTransactionDescriptor(), this.config.options)); + this.makeRequest(request, TYPE.SQL_BATCH, new SqlBatchPayload(request.sqlTextOrProcedure!, this.currentTransactionDescriptor(), this.config.options), (err) => { + if (err) { + request.callback(err); + } else { + request.callback(undefined, request.rowCount, request.rows); + } + }); } /** @@ -2512,7 +2518,13 @@ class Connection extends EventEmitter { parameters.push(...request.parameters); } - this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_executesql', parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation)); + this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_executesql', parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation), (err) => { + if (err) { + request.callback(err); + } else { + request.callback(undefined, request.rowCount, request.rows); + } + }); } /** @@ -2628,7 +2640,13 @@ class Connection extends EventEmitter { return; } - this.makeRequest(bulkLoad, TYPE.BULK_LOAD, payload); + this.makeRequest(bulkLoad, TYPE.BULK_LOAD, payload, (err) => { + if (err) { + bulkLoad.callback(err); + } else { + bulkLoad.callback(undefined, bulkLoad.rowCount); + } + }); }); bulkLoad.once('cancel', onCancel); @@ -2688,7 +2706,13 @@ class Connection extends EventEmitter { } }); - this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_prepare', parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation)); + this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_prepare', parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation), (err) => { + if (err) { + request.callback(err); + } else { + request.callback(undefined, request.rowCount, request.rows); + } + }); } /** @@ -2712,7 +2736,13 @@ class Connection extends EventEmitter { scale: undefined }); - this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_unprepare', parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation)); + this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_unprepare', parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation), (err) => { + if (err) { + request.callback(err); + } else { + request.callback(undefined, request.rowCount, request.rows); + } + }); } /** @@ -2758,7 +2788,13 @@ class Connection extends EventEmitter { return; } - this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_execute', executeParameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation)); + this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_execute', executeParameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation), (err) => { + if (err) { + request.callback(err); + } else { + request.callback(undefined, request.rowCount, request.rows); + } + }); } /** @@ -2780,7 +2816,13 @@ class Connection extends EventEmitter { return; } - this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload(request.sqlTextOrProcedure!, request.parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation)); + this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload(request.sqlTextOrProcedure!, request.parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation), (err) => { + if (err) { + request.callback(err); + } else { + request.callback(undefined, request.rowCount, request.rows); + } + }); } /** @@ -2819,7 +2861,13 @@ class Connection extends EventEmitter { const request = new Request(undefined, (err) => { return callback(err, this.currentTransactionDescriptor()); }); - return this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.beginPayload(this.currentTransactionDescriptor())); + return this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.beginPayload(this.currentTransactionDescriptor()), (err) => { + if (err) { + request.callback(err); + } else { + request.callback(undefined, request.rowCount, request.rows); + } + }); } /** @@ -2845,7 +2893,13 @@ class Connection extends EventEmitter { })); } const request = new Request(undefined, callback); - return this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.commitPayload(this.currentTransactionDescriptor())); + return this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.commitPayload(this.currentTransactionDescriptor()), (err) => { + if (err) { + request.callback(err); + } else { + request.callback(undefined, request.rowCount, request.rows); + } + }); } /** @@ -2871,7 +2925,13 @@ class Connection extends EventEmitter { })); } const request = new Request(undefined, callback); - return this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.rollbackPayload(this.currentTransactionDescriptor())); + return this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.rollbackPayload(this.currentTransactionDescriptor()), (err) => { + if (err) { + request.callback(err); + } else { + request.callback(undefined, request.rowCount, request.rows); + } + }); } /** @@ -2894,7 +2954,13 @@ class Connection extends EventEmitter { })); } const request = new Request(undefined, callback); - return this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.savePayload(this.currentTransactionDescriptor())); + return this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.savePayload(this.currentTransactionDescriptor()), (err) => { + if (err) { + request.callback(err); + } else { + request.callback(undefined, request.rowCount, request.rows); + } + }); } /** @@ -2974,16 +3040,16 @@ class Connection extends EventEmitter { /** * @private */ - makeRequest(request: Request | BulkLoad, packetType: number, payload: (Iterable | AsyncIterable) & { toString: (indent?: string) => string }) { + makeRequest(request: Request | BulkLoad, packetType: number, payload: (Iterable | AsyncIterable) & { toString: (indent?: string) => string }, callback: (err: Error | undefined | null) => void) { if (this.state !== this.STATE.LOGGED_IN) { const message = 'Requests can only be made in the ' + this.STATE.LOGGED_IN.name + ' state, not the ' + this.state.name + ' state'; this.debug.log(message); - return request.callback(new RequestError(message, 'EINVALIDSTATE')); + return callback(new RequestError(message, 'EINVALIDSTATE')); } if (request.canceled) { return process.nextTick(() => { - request.callback(new RequestError('Canceled.', 'ECANCEL')); + callback(new RequestError('Canceled.', 'ECANCEL')); }); } @@ -3112,7 +3178,8 @@ class Connection extends EventEmitter { if (this.config.options.tdsVersion < '7_2' && sqlRequest.error && this.isSqlBatch) { this.inTransaction = false; } - sqlRequest.callback(sqlRequest.error, sqlRequest.rowCount, sqlRequest.rows); + + callback(sqlRequest.error); }; tokenStreamParser.once('end', onEndOfMessage); diff --git a/test/integration/bulk-load-test.js b/test/integration/bulk-load-test.js index b14ec7892..196b928fc 100644 --- a/test/integration/bulk-load-test.js +++ b/test/integration/bulk-load-test.js @@ -156,7 +156,7 @@ describe('BulkLoad', function() { const bulkLoad = connection.newBulkLoad('#tmpTestTable3', { checkConstraints: true }, (err, rowCount) => { assert.ok(err, 'An error should have been thrown to indicate the conflict with the CHECK constraint.'); - assert.strictEqual(rowCount, 0); + assert.isUndefined(rowCount); done(); }); @@ -378,7 +378,7 @@ describe('BulkLoad', function() { assert.instanceOf(err, RequestError); assert.strictEqual(/** @type {RequestError} */(err).message, expectedMessage); - assert.strictEqual(rowCount, 0); + assert.isUndefined(rowCount); done(); }); @@ -863,7 +863,7 @@ describe('BulkLoad', function() { const bulkLoad = connection.newBulkLoad('#tmpTestTable', (err, rowCount) => { assert.strictEqual(err, expectedError); - assert.strictEqual(rowCount, 0); + assert.isUndefined(rowCount); /** @type {unknown[]} */ const results = []; @@ -1156,7 +1156,7 @@ describe('BulkLoad', function() { assert.instanceOf(err, RequestError); assert.strictEqual(/** @type {RequestError} */(err).message, 'Canceled.'); - assert.strictEqual(rowCount, 0); + assert.isUndefined(rowCount); startVerifyTableContent(); } @@ -1201,7 +1201,7 @@ describe('BulkLoad', function() { assert.instanceOf(err, RequestError); assert.strictEqual(/** @type {RequestError} */(err).message, 'Canceled.'); - assert.strictEqual(rowCount, 0); + assert.isUndefined(rowCount); }); bulkLoad.addColumn('i', TYPES.Int, { nullable: false }); @@ -1341,7 +1341,7 @@ describe('BulkLoad', function() { assert.instanceOf(err, RequestError); assert.strictEqual(/** @type {RequestError} */(err).message, 'Timeout: Request failed to complete in 200ms'); - assert.strictEqual(rowCount, 0); + assert.isUndefined(rowCount); done(); } @@ -1548,7 +1548,7 @@ describe('BulkLoad', function() { assert.instanceOf(err, TypeError); assert.strictEqual(/** @type {TypeError} */(err).message, 'Invalid date.'); - assert.strictEqual(rowCount, 0); + assert.isUndefined(rowCount); /** * @type {unknown[]} From af32656a892adbce8cbeb141840a4fab8390d219 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Thu, 3 Nov 2022 10:03:53 +0000 Subject: [PATCH 05/11] refactor: remove token stream parser wrapper --- src/connection.ts | 123 +++++++++----------- src/token/token-stream-parser.ts | 46 -------- test/unit/token/stream-parser-test.js | 68 +++++++++++ test/unit/token/token-stream-parser-test.js | 52 --------- 4 files changed, 120 insertions(+), 169 deletions(-) delete mode 100644 src/token/token-stream-parser.ts create mode 100644 test/unit/token/stream-parser-test.js delete mode 100644 test/unit/token/token-stream-parser-test.js diff --git a/src/connection.ts b/src/connection.ts index eb4a7827c..ff23dfd10 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -28,7 +28,7 @@ import Request from './request'; import RpcRequestPayload from './rpcrequest-payload'; import SqlBatchPayload from './sqlbatch-payload'; import MessageIO from './message-io'; -import { Parser as TokenStreamParser } from './token/token-stream-parser'; +import StreamParser from './token/stream-parser'; import { Transaction, ISOLATION_LEVEL, assertValidIsolationLevel } from './transaction'; import { ConnectionError, RequestError } from './errors'; import { connectInParallel, connectInSequence } from './connector'; @@ -47,7 +47,7 @@ import { Collation } from './collation'; import AggregateError from 'es-aggregate-error'; import { version } from '../package.json'; import { URL } from 'url'; -import { AttentionTokenHandler, InitialSqlTokenHandler, Login7TokenHandler, RequestTokenHandler, TokenHandler } from './token/handler'; +import { AttentionTokenHandler, InitialSqlTokenHandler, Login7TokenHandler, RequestTokenHandler } from './token/handler'; type BeginTransactionCallback = /** @@ -1949,13 +1949,6 @@ class Connection extends EventEmitter { return debug; } - /** - * @private - */ - createTokenStreamParser(message: Message, handler: TokenHandler) { - return new TokenStreamParser(message, this.debug, handler, this.config.options); - } - connectOnPort(port: number, multiSubnetFailover: boolean, signal: AbortSignal) { const connectOpts = { host: this.routingData ? this.routingData.server : this.config.server, @@ -3119,7 +3112,32 @@ class Connection extends EventEmitter { // request timer is stopped on first data package this.clearRequestTimer(); - const tokenStreamParser = this.createTokenStreamParser(message, new RequestTokenHandler(this, request)); + const onCancel = () => { + if (request instanceof Request && request.paused) { + // resume the request if it was paused so we can read the remaining tokens + request.resume(); + } + }; + + request.once('cancel', onCancel); + + if (request instanceof Request && request.paused) { + await once(request, 'resume'); + } + + const handler = new RequestTokenHandler(this, request); + for await (const token of StreamParser.parseTokens(message, this.debug, this.config.options)) { + if (request instanceof Request && request.paused) { + await once(request, 'resume'); + } + + if (!request.canceled) { + handler[token.handlerName](token as any); + } + } + + request.removeListener('cancel', this._cancelAfterRequestSent); + request.removeListener('cancel', onCancel); // If the request was canceled and we have a `cancelTimer` // defined, we send a attention message after the @@ -3133,57 +3151,14 @@ class Connection extends EventEmitter { return this.transitionTo(this.STATE.SENT_ATTENTION); } - const onResume = () => { - tokenStreamParser.resume(); - }; - const onPause = () => { - tokenStreamParser.pause(); - - request.once('resume', onResume); - }; - - request.on('pause', onPause); - - if (request instanceof Request && request.paused) { - onPause(); + this.transitionTo(this.STATE.LOGGED_IN); + const sqlRequest = request as Request; + this.request = undefined; + if (this.config.options.tdsVersion < '7_2' && sqlRequest.error && this.isSqlBatch) { + this.inTransaction = false; } - const onCancel = () => { - tokenStreamParser.removeListener('end', onEndOfMessage); - - if (request instanceof Request && request.paused) { - // resume the request if it was paused so we can read the remaining tokens - request.resume(); - } - - request.removeListener('pause', onPause); - request.removeListener('resume', onResume); - - // The `_cancelAfterRequestSent` callback will have sent a - // attention message, so now we need to also switch to - // the `SENT_ATTENTION` state to make sure the attention ack - // message is processed correctly. - this.transitionTo(this.STATE.SENT_ATTENTION); - }; - - const onEndOfMessage = () => { - request.removeListener('cancel', this._cancelAfterRequestSent); - request.removeListener('cancel', onCancel); - request.removeListener('pause', onPause); - request.removeListener('resume', onResume); - - this.transitionTo(this.STATE.LOGGED_IN); - const sqlRequest = request as Request; - this.request = undefined; - if (this.config.options.tdsVersion < '7_2' && sqlRequest.error && this.isSqlBatch) { - this.inTransaction = false; - } - - callback(sqlRequest.error); - }; - - tokenStreamParser.once('end', onEndOfMessage); - request.once('cancel', onCancel); + callback(sqlRequest.error); })(); } @@ -3411,9 +3386,9 @@ Connection.prototype.STATE = { } const handler = new Login7TokenHandler(this); - const tokenStreamParser = this.createTokenStreamParser(message, handler); - - await once(tokenStreamParser, 'end'); + for await (const token of StreamParser.parseTokens(message, this.debug, this.config.options)) { + handler[token.handlerName](token as any); + } if (handler.loginAckReceived) { if (handler.routingData) { @@ -3462,9 +3437,9 @@ Connection.prototype.STATE = { } const handler = new Login7TokenHandler(this); - const tokenStreamParser = this.createTokenStreamParser(message, handler); - - await once(tokenStreamParser, 'end'); + for await (const token of StreamParser.parseTokens(message, this.debug, this.config.options)) { + handler[token.handlerName](token as any); + } if (handler.loginAckReceived) { if (handler.routingData) { @@ -3530,8 +3505,10 @@ Connection.prototype.STATE = { } const handler = new Login7TokenHandler(this); - const tokenStreamParser = this.createTokenStreamParser(message, handler); - await once(tokenStreamParser, 'end'); + for await (const token of StreamParser.parseTokens(message, this.debug, this.config.options)) { + handler[token.handlerName](token as any); + } + if (handler.loginAckReceived) { if (handler.routingData) { this.routingData = handler.routingData; @@ -3632,8 +3609,11 @@ Connection.prototype.STATE = { } catch (err: any) { return this.socketError(err); } - const tokenStreamParser = this.createTokenStreamParser(message, new InitialSqlTokenHandler(this)); - await once(tokenStreamParser, 'end'); + + const handler = new InitialSqlTokenHandler(this); + for await (const token of StreamParser.parseTokens(message, this.debug, this.config.options)) { + handler[token.handlerName](token as any); + } this.transitionTo(this.STATE.LOGGED_IN); this.processedInitialSql(); @@ -3689,9 +3669,10 @@ Connection.prototype.STATE = { } const handler = new AttentionTokenHandler(this, this.request!); - const tokenStreamParser = this.createTokenStreamParser(message, handler); + for await (const token of StreamParser.parseTokens(message, this.debug, this.config.options)) { + handler[token.handlerName](token as any); + } - await once(tokenStreamParser, 'end'); // 3.2.5.7 Sent Attention State // Discard any data contained in the response, until we receive the attention response if (handler.attentionReceived) { diff --git a/src/token/token-stream-parser.ts b/src/token/token-stream-parser.ts deleted file mode 100644 index a95258f8d..000000000 --- a/src/token/token-stream-parser.ts +++ /dev/null @@ -1,46 +0,0 @@ -import { EventEmitter } from 'events'; -import StreamParser, { ParserOptions } from './stream-parser'; -import Debug from '../debug'; -import { Token } from './token'; -import { Readable } from 'stream'; -import Message from '../message'; -import { TokenHandler } from './handler'; - -export class Parser extends EventEmitter { - debug: Debug; - options: ParserOptions; - parser: Readable; - - constructor(message: Message, debug: Debug, handler: TokenHandler, options: ParserOptions) { - super(); - - this.debug = debug; - this.options = options; - - this.parser = Readable.from(StreamParser.parseTokens(message, this.debug, this.options)); - this.parser.on('data', (token: Token) => { - handler[token.handlerName as keyof TokenHandler](token as any); - }); - - this.parser.on('drain', () => { - this.emit('drain'); - }); - - this.parser.on('end', () => { - this.emit('end'); - }); - } - - declare on: ( - ((event: 'end', listener: () => void) => this) & - ((event: string | symbol, listener: (...args: any[]) => void) => this) - ); - - pause() { - return this.parser.pause(); - } - - resume() { - return this.parser.resume(); - } -} diff --git a/test/unit/token/stream-parser-test.js b/test/unit/token/stream-parser-test.js new file mode 100644 index 000000000..01da854bd --- /dev/null +++ b/test/unit/token/stream-parser-test.js @@ -0,0 +1,68 @@ +// @ts-check + +import Debug from '../../../src/debug'; +import Parser from '../../../src/token/stream-parser'; +import { TYPE } from '../../../src/token/token'; +import WritableTrackingBuffer from '../../../src/tracking-buffer/writable-tracking-buffer'; +import { assert } from 'chai'; + +const debug = new Debug({ token: true }); + +function createDbChangeBuffer() { + var oldDb = 'old'; + var newDb = 'new'; + var buffer = new WritableTrackingBuffer(50, 'ucs2'); + + buffer.writeUInt8(TYPE.ENVCHANGE); + buffer.writeUInt16LE(0); // Length written later + buffer.writeUInt8(0x01); // Database + buffer.writeUInt8(newDb.length); + buffer.writeString(newDb); + buffer.writeUInt8(oldDb.length); + buffer.writeString(oldDb); + + buffer.data.writeUInt16LE(buffer.data.length - (1 + 2), 1); + // console.log(buffer) + + return buffer.data; +} + +describe('Token Stream Parser', () => { + it('should envChange', async function() { + const buffer = createDbChangeBuffer(); + const options = { + useUTC: false, + lowerCaseGuids: true, + tdsVersion: '7_4', + useColumnNames: false, + columnNameReplacer: undefined, + camelCaseColumns: false + }; + + const tokens = []; + for await (const token of Parser.parseTokens([buffer], debug, options)) { + tokens.push(token); + } + + assert.lengthOf(tokens, 1); + }); + + it('should split token across buffers', async function() { + const buffer = createDbChangeBuffer(); + const options = { + useUTC: false, + lowerCaseGuids: true, + tdsVersion: '7_4', + useColumnNames: false, + columnNameReplacer: undefined, + camelCaseColumns: false + }; + + const tokens = []; + for await (const token of Parser.parseTokens([buffer.slice(0, 6), buffer.slice(6)], debug, options)) { + tokens.push(token); + } + + assert.lengthOf(tokens, 1); + }); +}); diff --git a/test/unit/token/token-stream-parser-test.js b/test/unit/token/token-stream-parser-test.js deleted file mode 100644 index 4aa9dc664..000000000 --- a/test/unit/token/token-stream-parser-test.js +++ /dev/null @@ -1,52 +0,0 @@ -var Debug = require('../../../src/debug'); -var Parser = require('../../../src/token/token-stream-parser').Parser; -var TYPE = require('../../../src/token/token').TYPE; -var WritableTrackingBuffer = require('../../../src/tracking-buffer/writable-tracking-buffer'); -const assert = require('chai').assert; - -var debug = new Debug({ token: true }); - -function createDbChangeBuffer() { - var oldDb = 'old'; - var newDb = 'new'; - var buffer = new WritableTrackingBuffer(50, 'ucs2'); - - buffer.writeUInt8(TYPE.ENVCHANGE); - buffer.writeUInt16LE(0); // Length written later - buffer.writeUInt8(0x01); // Database - buffer.writeUInt8(newDb.length); - buffer.writeString(newDb); - buffer.writeUInt8(oldDb.length); - buffer.writeString(oldDb); - - buffer.data.writeUInt16LE(buffer.data.length - (1 + 2), 1); - // console.log(buffer) - - return buffer.data; -} - -describe('Token Stream Parser', () => { - it('should envChange', (done) => { - const buffer = createDbChangeBuffer(); - - const parser = new Parser([buffer], debug, { - onDatabaseChange: function(token) { - assert.isOk(token); - } - }); - - parser.on('end', done); - }); - - it('should split token across buffers', (done) => { - const buffer = createDbChangeBuffer(); - - const parser = new Parser([buffer.slice(0, 6), buffer.slice(6)], debug, { - onDatabaseChange: function(token) { - assert.isOk(token); - } - }); - - parser.on('end', done); - }); -}); From cefabbba5ad83df0e8c32c5d2005ede1d9e1d21b Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Thu, 3 Nov 2022 10:06:35 +0000 Subject: [PATCH 06/11] refactor: remove no longer required instance property --- src/connection.ts | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index ff23dfd10..557c86311 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -1006,11 +1006,6 @@ class Connection extends EventEmitter { */ retryTimer: undefined | NodeJS.Timeout; - /** - * @private - */ - _cancelAfterRequestSent: () => void; - /** * @private */ @@ -1707,11 +1702,6 @@ class Connection extends EventEmitter { this.transientErrorLookup = new TransientErrorLookup(); this.state = this.STATE.INITIALIZED; - - this._cancelAfterRequestSent = () => { - this.messageIo.sendMessage(TYPE.ATTENTION); - this.createCancelTimer(); - }; } connect(connectListener?: (err?: Error) => void) { @@ -3080,9 +3070,14 @@ class Connection extends EventEmitter { this.messageIo.outgoingMessageStream.write(message); this.transitionTo(this.STATE.SENT_CLIENT_REQUEST); + const onCancelAfterRequestSent = () => { + this.messageIo.sendMessage(TYPE.ATTENTION); + this.createCancelTimer(); + }; + message.once('finish', () => { request.removeListener('cancel', onCancel); - request.once('cancel', this._cancelAfterRequestSent); + request.once('cancel', onCancelAfterRequestSent); this.resetConnectionOnNextRequest = false; this.debug.payload(function() { @@ -3136,7 +3131,7 @@ class Connection extends EventEmitter { } } - request.removeListener('cancel', this._cancelAfterRequestSent); + request.removeListener('cancel', onCancelAfterRequestSent); request.removeListener('cancel', onCancel); // If the request was canceled and we have a `cancelTimer` From 2d9e7fc6e3879d2ac22ca37d0b8404561d842c04 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Thu, 3 Nov 2022 11:33:06 +0000 Subject: [PATCH 07/11] refactor: move request cancellation logic to `Connection#makeRequest` --- src/connection.ts | 160 ++++++++++++++++++++++------------------------ 1 file changed, 77 insertions(+), 83 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index 557c86311..c89b7879d 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -3070,14 +3070,8 @@ class Connection extends EventEmitter { this.messageIo.outgoingMessageStream.write(message); this.transitionTo(this.STATE.SENT_CLIENT_REQUEST); - const onCancelAfterRequestSent = () => { - this.messageIo.sendMessage(TYPE.ATTENTION); - this.createCancelTimer(); - }; - message.once('finish', () => { request.removeListener('cancel', onCancel); - request.once('cancel', onCancelAfterRequestSent); this.resetConnectionOnNextRequest = false; this.debug.payload(function() { @@ -3098,63 +3092,98 @@ class Connection extends EventEmitter { payloadStream.pipe(message); (async () => { - let message; - try { - message = await this.messageIo.readMessage(); - } catch (err: any) { - return this.socketError(err); - } - // request timer is stopped on first data package - this.clearRequestTimer(); + const onCancelAfterRequestSent = () => { + this.messageIo.sendMessage(TYPE.ATTENTION); + this.createCancelTimer(); + this.transitionTo(this.STATE.SENT_ATTENTION); + }; - const onCancel = () => { - if (request instanceof Request && request.paused) { - // resume the request if it was paused so we can read the remaining tokens - request.resume(); + request.once('cancel', onCancelAfterRequestSent); + try { + let message; + try { + message = await this.messageIo.readMessage(); + } catch (err: any) { + return this.socketError(err); } - }; + // request timer is stopped on first data package + this.clearRequestTimer(); + + const onCancel = () => { + if (request instanceof Request && request.paused) { + // resume the request if it was paused so we can read the remaining tokens + request.resume(); + } + }; - request.once('cancel', onCancel); + request.once('cancel', onCancel); + try { + if (request instanceof Request && request.paused) { + await once(request, 'resume'); + } - if (request instanceof Request && request.paused) { - await once(request, 'resume'); - } + const handler = new RequestTokenHandler(this, request); + for await (const token of StreamParser.parseTokens(message, this.debug, this.config.options)) { + // If the request was canceled, we discard any data contained in the response. + if (!request.canceled) { + if (request instanceof Request && request.paused) { + await once(request, 'resume'); + } - const handler = new RequestTokenHandler(this, request); - for await (const token of StreamParser.parseTokens(message, this.debug, this.config.options)) { - if (request instanceof Request && request.paused) { - await once(request, 'resume'); + handler[token.handlerName](token as any); + } + } + } finally { + request.removeListener('cancel', onCancel); } - if (!request.canceled) { - handler[token.handlerName](token as any); - } - } + if (request.canceled) { + // 3.2.5.7 Sent Attention State + while (true) { + try { + message = await this.messageIo.readMessage(); + } catch (err: any) { + return this.socketError(err); + } - request.removeListener('cancel', onCancelAfterRequestSent); - request.removeListener('cancel', onCancel); + const handler = new AttentionTokenHandler(this, request); + for await (const token of StreamParser.parseTokens(message, this.debug, this.config.options)) { + try { + handler[token.handlerName](token as any); + } catch (err) { + console.log(err); + throw err; + } + } - // If the request was canceled and we have a `cancelTimer` - // defined, we send a attention message after the - // request message was fully sent off. - // - // We already started consuming the current message - // (but all the token handlers should be no-ops), and - // need to ensure the next message is handled by the - // `SENT_ATTENTION` state. - if (request.canceled && this.cancelTimer) { - return this.transitionTo(this.STATE.SENT_ATTENTION); + if (handler.attentionReceived) { + this.clearCancelTimer(); + + if (!request.error || !(request.error instanceof RequestError) || request.error.code !== 'ETIMEOUT') { + request.error = new RequestError('Canceled.', 'ECANCEL'); + } + + break; + } + } + } + } finally { + request.removeListener('cancel', onCancelAfterRequestSent); } this.transitionTo(this.STATE.LOGGED_IN); - const sqlRequest = request as Request; this.request = undefined; - if (this.config.options.tdsVersion < '7_2' && sqlRequest.error && this.isSqlBatch) { + + if (this.config.options.tdsVersion < '7_2' && request.error && this.isSqlBatch) { this.inTransaction = false; } - callback(sqlRequest.error); - })(); + callback(request.error); + })().catch((err) => { + process.nextTick(() => { + throw err; + }); + }); } /** @@ -3654,42 +3683,7 @@ Connection.prototype.STATE = { }, SENT_ATTENTION: { name: 'SentAttention', - enter: function() { - (async () => { - let message; - try { - message = await this.messageIo.readMessage(); - } catch (err: any) { - return this.socketError(err); - } - - const handler = new AttentionTokenHandler(this, this.request!); - for await (const token of StreamParser.parseTokens(message, this.debug, this.config.options)) { - handler[token.handlerName](token as any); - } - - // 3.2.5.7 Sent Attention State - // Discard any data contained in the response, until we receive the attention response - if (handler.attentionReceived) { - this.clearCancelTimer(); - - const sqlRequest = this.request!; - this.request = undefined; - this.transitionTo(this.STATE.LOGGED_IN); - - if (sqlRequest.error && sqlRequest.error instanceof RequestError && sqlRequest.error.code === 'ETIMEOUT') { - sqlRequest.callback(sqlRequest.error); - } else { - sqlRequest.callback(new RequestError('Canceled.', 'ECANCEL')); - } - } - - })().catch((err) => { - process.nextTick(() => { - throw err; - }); - }); - }, + enter: function() { }, events: { socketError: function(err) { const sqlRequest = this.request!; From 20bfb255b93458b8bf5215dd83079fe0ba93ea08 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Thu, 3 Nov 2022 13:05:22 +0000 Subject: [PATCH 08/11] refactor: make request sending in `Connection#makeRequest` use `async`/`await` as well --- src/connection.ts | 69 ++++++++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 34 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index c89b7879d..79d4c2e48 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -3048,52 +3048,53 @@ class Connection extends EventEmitter { request.rows! = []; request.rst! = []; - const onCancel = () => { - payloadStream.unpipe(message); - payloadStream.destroy(new RequestError('Canceled.', 'ECANCEL')); - - // set the ignore bit and end the message. - message.ignore = true; - message.end(); - - if (request instanceof Request && request.paused) { - // resume the request if it was paused so we can read the remaining tokens - request.resume(); - } - }; - - request.once('cancel', onCancel); - this.createRequestTimer(); const message = new Message({ type: packetType, resetConnection: this.resetConnectionOnNextRequest }); this.messageIo.outgoingMessageStream.write(message); this.transitionTo(this.STATE.SENT_CLIENT_REQUEST); - message.once('finish', () => { - request.removeListener('cancel', onCancel); + (async () => { + { + const payloadStream = Readable.from(payload); - this.resetConnectionOnNextRequest = false; - this.debug.payload(function() { - return payload!.toString(' '); - }); - }); + const onCancel = () => { + payloadStream.destroy(new RequestError('Canceled.', 'ECANCEL')); + }; + + request.once('cancel', onCancel); + + try { + for await (const chunk of payloadStream) { + if (message.write(chunk) === false) { + // TODO: Handle request cancellation while waiting for 'drain' event + await once(message, 'drain'); + } + } + } catch (error) { + request.error ??= error as Error; + message.ignore = true; - const payloadStream = Readable.from(payload); - payloadStream.once('error', (error) => { - payloadStream.unpipe(message); + if (request instanceof Request && request.paused) { + // resume the request if it was paused so we can read the remaining tokens + request.resume(); + } + } finally { + request.removeListener('cancel', onCancel); + } - // Only set a request error if no error was set yet. - request.error ??= error; + message.end(); - message.ignore = true; - message.end(); - }); - payloadStream.pipe(message); + this.resetConnectionOnNextRequest = false; + this.debug.payload(function() { + return payload!.toString(' '); + }); + } - (async () => { + let waitForAttentionResponse = false; const onCancelAfterRequestSent = () => { this.messageIo.sendMessage(TYPE.ATTENTION); + waitForAttentionResponse = true; this.createCancelTimer(); this.transitionTo(this.STATE.SENT_ATTENTION); }; @@ -3137,7 +3138,7 @@ class Connection extends EventEmitter { request.removeListener('cancel', onCancel); } - if (request.canceled) { + if (waitForAttentionResponse) { // 3.2.5.7 Sent Attention State while (true) { try { From 0f2b2698f22611fe28e933f3269187c710ea7218 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Thu, 3 Nov 2022 15:20:32 +0000 Subject: [PATCH 09/11] refactor: convert `Connection#makeRequest` to an async function --- src/connection.ts | 376 ++++++++++++++++++++++++++++------------------ 1 file changed, 226 insertions(+), 150 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index 79d4c2e48..f0a73cad6 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -2437,12 +2437,20 @@ class Connection extends EventEmitter { * @param request A [[Request]] object representing the request. */ execSqlBatch(request: Request) { - this.makeRequest(request, TYPE.SQL_BATCH, new SqlBatchPayload(request.sqlTextOrProcedure!, this.currentTransactionDescriptor(), this.config.options), (err) => { - if (err) { - request.callback(err); + this.makeRequest(request, TYPE.SQL_BATCH, new SqlBatchPayload(request.sqlTextOrProcedure!, this.currentTransactionDescriptor(), this.config.options)).then(() => { + if (request.error) { + process.nextTick(() => { + request.callback(request.error); + }); } else { - request.callback(undefined, request.rowCount, request.rows); + process.nextTick(() => { + request.callback(undefined, request.rowCount, request.rows); + }); } + }, (err) => { + process.nextTick(() => { + throw err; + }); }); } @@ -2501,12 +2509,20 @@ class Connection extends EventEmitter { parameters.push(...request.parameters); } - this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_executesql', parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation), (err) => { - if (err) { - request.callback(err); + this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_executesql', parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation)).then(() => { + if (request.error) { + process.nextTick(() => { + request.callback(request.error); + }); } else { - request.callback(undefined, request.rowCount, request.rows); + process.nextTick(() => { + request.callback(undefined, request.rowCount, request.rows); + }); } + }, (err) => { + process.nextTick(() => { + throw err; + }); }); } @@ -2623,12 +2639,20 @@ class Connection extends EventEmitter { return; } - this.makeRequest(bulkLoad, TYPE.BULK_LOAD, payload, (err) => { - if (err) { - bulkLoad.callback(err); + this.makeRequest(bulkLoad, TYPE.BULK_LOAD, payload).then(() => { + if (bulkLoad.error) { + process.nextTick(() => { + bulkLoad.callback(bulkLoad.error); + }); } else { - bulkLoad.callback(undefined, bulkLoad.rowCount); + process.nextTick(() => { + bulkLoad.callback(undefined, bulkLoad.rowCount); + }); } + }, (err) => { + process.nextTick(() => { + throw err; + }); }); }); @@ -2689,12 +2713,20 @@ class Connection extends EventEmitter { } }); - this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_prepare', parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation), (err) => { - if (err) { - request.callback(err); + this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_prepare', parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation)).then(() => { + if (request.error) { + process.nextTick(() => { + request.callback(request.error); + }); } else { - request.callback(undefined, request.rowCount, request.rows); + process.nextTick(() => { + request.callback(undefined, request.rowCount, request.rows); + }); } + }, (err) => { + process.nextTick(() => { + throw err; + }); }); } @@ -2719,12 +2751,20 @@ class Connection extends EventEmitter { scale: undefined }); - this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_unprepare', parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation), (err) => { - if (err) { - request.callback(err); + this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_unprepare', parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation)).then(() => { + if (request.error) { + process.nextTick(() => { + request.callback(request.error); + }); } else { - request.callback(undefined, request.rowCount, request.rows); + process.nextTick(() => { + request.callback(undefined, request.rowCount, request.rows); + }); } + }, (err) => { + process.nextTick(() => { + throw err; + }); }); } @@ -2771,12 +2811,20 @@ class Connection extends EventEmitter { return; } - this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_execute', executeParameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation), (err) => { - if (err) { - request.callback(err); + this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload('sp_execute', executeParameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation)).then(() => { + if (request.error) { + process.nextTick(() => { + request.callback(request.error); + }); } else { - request.callback(undefined, request.rowCount, request.rows); + process.nextTick(() => { + request.callback(undefined, request.rowCount, request.rows); + }); } + }, (err) => { + process.nextTick(() => { + throw err; + }); }); } @@ -2799,12 +2847,20 @@ class Connection extends EventEmitter { return; } - this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload(request.sqlTextOrProcedure!, request.parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation), (err) => { - if (err) { - request.callback(err); + this.makeRequest(request, TYPE.RPC_REQUEST, new RpcRequestPayload(request.sqlTextOrProcedure!, request.parameters, this.currentTransactionDescriptor(), this.config.options, this.databaseCollation)).then(() => { + if (request.error) { + process.nextTick(() => { + request.callback(request.error); + }); } else { - request.callback(undefined, request.rowCount, request.rows); + process.nextTick(() => { + request.callback(undefined, request.rowCount, request.rows); + }); } + }, (err) => { + process.nextTick(() => { + throw err; + }); }); } @@ -2844,12 +2900,20 @@ class Connection extends EventEmitter { const request = new Request(undefined, (err) => { return callback(err, this.currentTransactionDescriptor()); }); - return this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.beginPayload(this.currentTransactionDescriptor()), (err) => { - if (err) { - request.callback(err); + this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.beginPayload(this.currentTransactionDescriptor())).then(() => { + if (request.error) { + process.nextTick(() => { + request.callback(request.error); + }); } else { - request.callback(undefined, request.rowCount, request.rows); + process.nextTick(() => { + request.callback(undefined, request.rowCount, request.rows); + }); } + }, (err) => { + process.nextTick(() => { + throw err; + }); }); } @@ -2876,12 +2940,20 @@ class Connection extends EventEmitter { })); } const request = new Request(undefined, callback); - return this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.commitPayload(this.currentTransactionDescriptor()), (err) => { - if (err) { - request.callback(err); + this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.commitPayload(this.currentTransactionDescriptor())).then(() => { + if (request.error) { + process.nextTick(() => { + request.callback(request.error); + }); } else { - request.callback(undefined, request.rowCount, request.rows); + process.nextTick(() => { + request.callback(undefined, request.rowCount, request.rows); + }); } + }, (err) => { + process.nextTick(() => { + throw err; + }); }); } @@ -2908,12 +2980,20 @@ class Connection extends EventEmitter { })); } const request = new Request(undefined, callback); - return this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.rollbackPayload(this.currentTransactionDescriptor()), (err) => { - if (err) { - request.callback(err); + this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.rollbackPayload(this.currentTransactionDescriptor())).then(() => { + if (request.error) { + process.nextTick(() => { + request.callback(request.error); + }); } else { - request.callback(undefined, request.rowCount, request.rows); + process.nextTick(() => { + request.callback(undefined, request.rowCount, request.rows); + }); } + }, (err) => { + process.nextTick(() => { + throw err; + }); }); } @@ -2937,12 +3017,21 @@ class Connection extends EventEmitter { })); } const request = new Request(undefined, callback); - return this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.savePayload(this.currentTransactionDescriptor()), (err) => { - if (err) { - request.callback(err); + + this.makeRequest(request, TYPE.TRANSACTION_MANAGER, transaction.savePayload(this.currentTransactionDescriptor())).then(() => { + if (request.error) { + process.nextTick(() => { + request.callback(request.error); + }); } else { - request.callback(undefined, request.rowCount, request.rows); + process.nextTick(() => { + request.callback(undefined, request.rowCount, request.rows); + }); } + }, (err) => { + process.nextTick(() => { + throw err; + }); }); } @@ -3023,17 +3112,17 @@ class Connection extends EventEmitter { /** * @private */ - makeRequest(request: Request | BulkLoad, packetType: number, payload: (Iterable | AsyncIterable) & { toString: (indent?: string) => string }, callback: (err: Error | undefined | null) => void) { + async makeRequest(request: Request | BulkLoad, packetType: number, payload: (Iterable | AsyncIterable) & { toString: (indent?: string) => string }) { if (this.state !== this.STATE.LOGGED_IN) { const message = 'Requests can only be made in the ' + this.STATE.LOGGED_IN.name + ' state, not the ' + this.state.name + ' state'; this.debug.log(message); - return callback(new RequestError(message, 'EINVALIDSTATE')); + request.error = new RequestError(message, 'EINVALIDSTATE'); + return; } if (request.canceled) { - return process.nextTick(() => { - callback(new RequestError('Canceled.', 'ECANCEL')); - }); + request.error = new RequestError('Canceled.', 'ECANCEL'); + return; } if (packetType === TYPE.SQL_BATCH) { @@ -3054,137 +3143,124 @@ class Connection extends EventEmitter { this.messageIo.outgoingMessageStream.write(message); this.transitionTo(this.STATE.SENT_CLIENT_REQUEST); - (async () => { - { - const payloadStream = Readable.from(payload); + { + const payloadStream = Readable.from(payload); - const onCancel = () => { - payloadStream.destroy(new RequestError('Canceled.', 'ECANCEL')); - }; + const onCancel = () => { + payloadStream.destroy(new RequestError('Canceled.', 'ECANCEL')); + }; - request.once('cancel', onCancel); + request.once('cancel', onCancel); - try { - for await (const chunk of payloadStream) { - if (message.write(chunk) === false) { - // TODO: Handle request cancellation while waiting for 'drain' event - await once(message, 'drain'); - } + try { + for await (const chunk of payloadStream) { + if (message.write(chunk) === false) { + // TODO: Handle request cancellation while waiting for 'drain' event + await once(message, 'drain'); } - } catch (error) { - request.error ??= error as Error; - message.ignore = true; + } + } catch (error) { + request.error ??= error as Error; + message.ignore = true; - if (request instanceof Request && request.paused) { - // resume the request if it was paused so we can read the remaining tokens - request.resume(); - } - } finally { - request.removeListener('cancel', onCancel); + if (request instanceof Request && request.paused) { + // resume the request if it was paused so we can read the remaining tokens + request.resume(); } + } finally { + request.removeListener('cancel', onCancel); + } - message.end(); + message.end(); - this.resetConnectionOnNextRequest = false; - this.debug.payload(function() { - return payload!.toString(' '); - }); + this.resetConnectionOnNextRequest = false; + this.debug.payload(function() { + return payload!.toString(' '); + }); + } + + let waitForAttentionResponse = false; + const onCancelAfterRequestSent = () => { + this.messageIo.sendMessage(TYPE.ATTENTION); + waitForAttentionResponse = true; + this.createCancelTimer(); + this.transitionTo(this.STATE.SENT_ATTENTION); + }; + + request.once('cancel', onCancelAfterRequestSent); + try { + let message; + try { + message = await this.messageIo.readMessage(); + } catch (err: any) { + return this.socketError(err); } + // request timer is stopped on first data package + this.clearRequestTimer(); - let waitForAttentionResponse = false; - const onCancelAfterRequestSent = () => { - this.messageIo.sendMessage(TYPE.ATTENTION); - waitForAttentionResponse = true; - this.createCancelTimer(); - this.transitionTo(this.STATE.SENT_ATTENTION); + const onCancel = () => { + if (request instanceof Request && request.paused) { + // resume the request if it was paused so we can read the remaining tokens + request.resume(); + } }; - request.once('cancel', onCancelAfterRequestSent); + request.once('cancel', onCancel); try { - let message; - try { - message = await this.messageIo.readMessage(); - } catch (err: any) { - return this.socketError(err); + if (request instanceof Request && request.paused) { + await once(request, 'resume'); } - // request timer is stopped on first data package - this.clearRequestTimer(); - const onCancel = () => { - if (request instanceof Request && request.paused) { - // resume the request if it was paused so we can read the remaining tokens - request.resume(); + const handler = new RequestTokenHandler(this, request); + for await (const token of StreamParser.parseTokens(message, this.debug, this.config.options)) { + // If the request was canceled, we discard any data contained in the response. + if (!request.canceled) { + if (request instanceof Request && request.paused) { + await once(request, 'resume'); + } + + handler[token.handlerName](token as any); } - }; + } + } finally { + request.removeListener('cancel', onCancel); + } - request.once('cancel', onCancel); - try { - if (request instanceof Request && request.paused) { - await once(request, 'resume'); + if (waitForAttentionResponse) { + // 3.2.5.7 Sent Attention State + while (true) { + try { + message = await this.messageIo.readMessage(); + } catch (err: any) { + return this.socketError(err); } - const handler = new RequestTokenHandler(this, request); + const handler = new AttentionTokenHandler(this, request); for await (const token of StreamParser.parseTokens(message, this.debug, this.config.options)) { - // If the request was canceled, we discard any data contained in the response. - if (!request.canceled) { - if (request instanceof Request && request.paused) { - await once(request, 'resume'); - } - - handler[token.handlerName](token as any); - } + handler[token.handlerName](token as any); } - } finally { - request.removeListener('cancel', onCancel); - } - if (waitForAttentionResponse) { - // 3.2.5.7 Sent Attention State - while (true) { - try { - message = await this.messageIo.readMessage(); - } catch (err: any) { - return this.socketError(err); - } + if (handler.attentionReceived) { + this.clearCancelTimer(); - const handler = new AttentionTokenHandler(this, request); - for await (const token of StreamParser.parseTokens(message, this.debug, this.config.options)) { - try { - handler[token.handlerName](token as any); - } catch (err) { - console.log(err); - throw err; - } + if (!request.error || !(request.error instanceof RequestError) || request.error.code !== 'ETIMEOUT') { + request.error = new RequestError('Canceled.', 'ECANCEL'); } - if (handler.attentionReceived) { - this.clearCancelTimer(); - - if (!request.error || !(request.error instanceof RequestError) || request.error.code !== 'ETIMEOUT') { - request.error = new RequestError('Canceled.', 'ECANCEL'); - } - - break; - } + break; } } - } finally { - request.removeListener('cancel', onCancelAfterRequestSent); } + } finally { + request.removeListener('cancel', onCancelAfterRequestSent); + } - this.transitionTo(this.STATE.LOGGED_IN); - this.request = undefined; - - if (this.config.options.tdsVersion < '7_2' && request.error && this.isSqlBatch) { - this.inTransaction = false; - } + this.transitionTo(this.STATE.LOGGED_IN); + this.request = undefined; - callback(request.error); - })().catch((err) => { - process.nextTick(() => { - throw err; - }); - }); + if (this.config.options.tdsVersion < '7_2' && request.error && this.isSqlBatch) { + this.inTransaction = false; + } } /** From 19253f8d05910f0d4bdfa28bf53131496019b03d Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Sat, 15 Jul 2023 16:33:23 +0000 Subject: [PATCH 10/11] refactor: improve cancellation handling in `makeRequest` --- src/connection.ts | 55 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 48 insertions(+), 7 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index 7e2acf5e8..19adbca30 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -3176,8 +3176,29 @@ class Connection extends EventEmitter { try { for await (const chunk of payloadStream) { if (message.write(chunk) === false) { - // TODO: Handle request cancellation while waiting for 'drain' event - await once(message, 'drain'); + // Wait for the message to drain, or the request to be cancelled. + await new Promise((resolve) => { + const onDrain = () => { + request.removeListener('cancel', onCancel); + message.removeListener('drain', onDrain); + + resolve(); + }; + + const onCancel = () => { + request.removeListener('cancel', onCancel); + message.removeListener('drain', onDrain); + + resolve(); + }; + + message.once('drain', onDrain); + request.once('cancel', onCancel); + }); + + if (request.canceled) { + break; + } } } } catch (error) { @@ -3235,13 +3256,33 @@ class Connection extends EventEmitter { const handler = new RequestTokenHandler(this, request); for await (const token of StreamParser.parseTokens(message, this.debug, this.config.options)) { // If the request was canceled, we discard any data contained in the response. - if (!request.canceled) { - if (request instanceof Request && request.paused) { - await once(request, 'resume'); - } + if (request.canceled) { + continue; + } - handler[token.handlerName](token as any); + if (request instanceof Request && request.paused) { + // Wait for the request to be unpaused or canceled + await new Promise((resolve) => { + const onResume = () => { + request.removeListener('resume', onResume); + request.removeListener('cancel', onCancel); + + resolve(); + }; + + const onCancel = () => { + request.removeListener('resume', onResume); + request.removeListener('cancel', onCancel); + + resolve(); + }; + + request.on('resume', onResume); + request.on('cancel', onCancel); + }); } + + handler[token.handlerName](token as any); } } finally { request.removeListener('cancel', onCancel); From 903c1c5c40910cff977dda0d1ec4ac93f2b63bd6 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Sat, 22 Jul 2023 12:01:05 +0000 Subject: [PATCH 11/11] Fix incorrect merge conflict resolution. --- src/connection.ts | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index 5cfa414c2..23b098ebb 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -1973,13 +1973,6 @@ class Connection extends EventEmitter { return debug; } - /** - * @private - */ - createTokenStreamParser(message: Message, handler: TokenHandler) { - return new TokenStreamParser(message, this.debug, handler, this.config.options); - } - socketHandlingForSendPreLogin(socket: net.Socket) { socket.on('error', (error) => { this.socketError(error); }); socket.on('close', () => { this.socketClose(); });