Skip to content
118 changes: 92 additions & 26 deletions src/sessions.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { setTimeout } from 'timers/promises';

import { Binary, type Document, Long, type Timestamp } from './bson';
import type { CommandOptions, Connection } from './cmap/connection';
import { ConnectionPoolMetrics } from './cmap/metrics';
Expand Down Expand Up @@ -714,6 +716,8 @@ export class ClientSession
timeoutMS?: number;
}
): Promise<T> {
// 1.iii Set TIMEOUT_MS to be timeoutMS if given, otherwise 120-seconds
// DP: We do this indirectly as a combination of timeoutMS->timeoutContext and this MAX_TIMEOUT
const MAX_TIMEOUT = 120000;

const timeoutMS = options?.timeoutMS ?? this.timeoutMS ?? null;
Expand All @@ -724,18 +728,25 @@ export class ClientSession
serverSelectionTimeoutMS: this.clientOptions.serverSelectionTimeoutMS,
socketTimeoutMS: this.clientOptions.socketTimeoutMS
})
: null;
: null; // DP: this is always a CSOT context or null

// 1.i Record the current monotonic time, which will be used to enforce the 120-second / CSOT timeout before later retry attempts.
const startTime = this.timeoutContext?.csotEnabled() ? this.timeoutContext.start : now();
// DP: the CSOT check is redundant, because of the definition in L725
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Durran brought this up a while ago - the timeoutContext abstraction needs a bit of work. start is only defined on the CSOT timeout context, so we need to narrow. Otherwise, we'd get a TS error:

Image


let committed = false;
let result: any;
let result: T;

try {
while (!committed) {
this.startTransaction(options); // may throw on error
// 1.ii Set retry to 0. This will be used for backoff later in step 7.
for (let retry = 0; !committed; ++retry) {
// 2. Invoke startTransaction on the session
// 3. If startTransaction reported an error, propagate that error to the caller of withTransaction and return immediately.
this.startTransaction(options); // may throw on error (satisfies the spec requirement 3. above)

try {
// 4. Invoke the callback.
// 5. Control returns to withTransaction. (continued below)
const promise = fn(this);
if (!isPromiseLike(promise)) {
throw new MongoInvalidArgumentError(
Expand All @@ -745,34 +756,48 @@ export class ClientSession

result = await promise;

// 5. (cont.) Determine the current state of the ClientSession (continued below)
if (
this.transaction.state === TxnState.NO_TRANSACTION ||
this.transaction.state === TxnState.TRANSACTION_COMMITTED ||
this.transaction.state === TxnState.TRANSACTION_ABORTED
) {
// Assume callback intentionally ended the transaction
// 7. If the ClientSession is in the "no transaction", "transaction aborted", or "transaction committed" state,
// assume the callback intentionally aborted or committed the transaction and return immediately.
return result;
}
// 5. (cont.) and whether the callback reported an error
// 6. If the callback reported an error:
} catch (fnError) {
// DP: The preemptive abort isn't spec; this !MongoError would be an error thrown by the callback.
// DP: Is it safe to assume that the callback hasn't committed the transaction before throwing?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

     // DP: The preemptive abort isn't spec; this !MongoError would be an error thrown by the callback.

No context here

// DP: Is it safe to assume that the callback hasn't committed the transaction before throwing?

Yes - a withTransaction callback should never commit a transaction (we commit the transaction for users).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I say the preemptive abort is not spec, I mean the spec says to go through steps 6.i-6.iii before throwing

if (!(fnError instanceof MongoError) || fnError instanceof MongoInvalidArgumentError) {
await this.abortTransaction();
throw fnError;
}

// 6.i If the ClientSession is in the "starting transaction" or "transaction in progress" state, invoke abortTransaction on the session
if (
this.transaction.state === TxnState.STARTING_TRANSACTION ||
this.transaction.state === TxnState.TRANSACTION_IN_PROGRESS
) {
await this.abortTransaction();
}

// 6.ii If the callback's error includes a "TransientTransactionError" label and the elapsed time of withTransaction is less than TIMEOUT_MS, calculate the backoffMS
if (
fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
(this.timeoutContext != null || now() - startTime < MAX_TIMEOUT)
(this.timeoutContext?.csotEnabled() || now() - startTime < MAX_TIMEOUT)
) {
// 6.ii (cont.) If elapsed time + backoffMS > TIMEOUT_MS, then raise last known error. Otherwise, sleep for backoffMS, increment retry, and jump back to step two.
// DP: The spec would have us apply backoff and jitter here instead of immediately retrying the startTransaction, which is where this continue is sending us
continue;
}

// 6.iii If the callback's error includes a "UnknownTransactionCommitResult" label, the callback must have manually committed a transaction,
// propagate the callback's error to the caller of withTransaction and return immediately.
// DP: The 6.iii check is redundant with 6.iv, so we don't write code for it
// 6.iv Otherwise, propagate the callback's error to the caller of withTransaction and return immediately.
throw fnError;
}

Expand All @@ -783,35 +808,76 @@ export class ClientSession
* apply a majority write concern if commitTransaction is
* being retried (see: DRIVERS-601)
*/
// 8. Invoke commitTransaction on the session.
await this.commitTransaction();
committed = true;
} catch (commitError) {
/*
* Note: a maxTimeMS error will have the MaxTimeMSExpired
* code (50) and can be reported as a top-level error or
* inside writeConcernError, ex.
* { ok:0, code: 50, codeName: 'MaxTimeMSExpired' }
* { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } }
*/
if (
!isMaxTimeMSExpiredError(commitError) &&
commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult) &&
(this.timeoutContext != null || now() - startTime < MAX_TIMEOUT)
) {
continue;
}

if (
commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
(this.timeoutContext != null || now() - startTime < MAX_TIMEOUT)
) {
break;
// 9. If commitTransaction reported an error:
} catch (commitError) {
// If CSOT is enabled, we repeatedly retry until timeoutMS expires. This is enforced by providing a
// timeoutContext to each async API, which know how to cancel themselves (i.e., the next retry will
// abort the withTransaction call).
// If CSOT is not enabled, do we still have time remaining or have we timed out?
const hasTimedOut =
!this.timeoutContext?.csotEnabled() && now() - startTime >= MAX_TIMEOUT;

if (!hasTimedOut) {
// 9.i If the commitTransaction error includes a "UnknownTransactionCommitResult" label
// and the error is not MaxTimeMSExpired
// and the elapsed time of withTransaction is less than TIMEOUT_MS, jump back to step eight
if (
!isMaxTimeMSExpiredError(commitError) &&
commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)
) {
/*
* Note: a maxTimeMS error will have the MaxTimeMSExpired
* code (50) and can be reported as a top-level error or
* inside writeConcernError, ex.
* { ok:0, code: 50, codeName: 'MaxTimeMSExpired' }
* { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } }
*/
continue;
}

// 9.ii If the commitTransaction error includes a "TransientTransactionError" label
// and the elapsed time of withTransaction is less than TIMEOUT_MS, jump back to step two.
// DP: Step two makes no sense here, perhaps the intent is to perform the instructions in 6.ii and then jump back to 8?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if my comment is correct, then this is ok here, but needs to also be in 6.ii

if (commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
const BACKOFF_INITIAL_MS = 5; // 6.ii.c BACKOFF_INITIAL is 5ms
const BACKOFF_MAX_MS = 500; // 6.ii.d BACKOFF_MAX is 500ms
const BACKOFF_GROWTH = 1.5; // DP: convenience var for the formula below
const jitter = Math.random(); // 6.ii.a jitter is a random float between [0, 1)
// 6.ii (cont.) calculate the backoffMS to bejitter * min(BACKOFF_INITIAL * (1.5**retry), BACKOFF_MAX)
const backoffMS =
jitter * Math.min(BACKOFF_INITIAL_MS * BACKOFF_GROWTH ** retry, BACKOFF_MAX_MS);

const willExceedTransactionDeadline =
(this.timeoutContext?.csotEnabled() &&
backoffMS > this.timeoutContext.remainingTimeMS) ||
now() + backoffMS > startTime + MAX_TIMEOUT;

// 6.ii (cont.) If elapsed time + backoffMS > TIMEOUT_MS, then raise last known error
// DP: the lines above do this math slightly indirectly, but this break does NOT raise the last known error
// because breaking the while loop takes us to the return result statement outside the while(!committed)
Copy link
Contributor Author

@dariakp dariakp Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, to the top of the for retry loop (but with the same effect re: last known error)

if (willExceedTransactionDeadline) {
break;
}

// 6.ii (cont.) Otherwise, sleep for backoffMS, increment retry, and jump back to step two.
// DP: Assuming the spec means step 8
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the spec means 2, this is correct

await setTimeout(backoffMS);

break;
}
}

// 9.iii Otherwise, propagate the commitTransaction error to the caller of withTransaction and return immediately.
throw commitError;
}
}
}

// @ts-expect-error Result is always defined if we reach here, the for-loop above convinces TS it is not.
return result;
} finally {
this.timeoutContext = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { expect } from 'chai';
import { test } from 'mocha';
import * as sinon from 'sinon';

import { type MongoClient } from '../../../src';
import { configureFailPoint, type FailCommandFailPoint, measureDuration } from '../../tools/utils';

const failCommand: FailCommandFailPoint = {
configureFailPoint: 'failCommand',
mode: {
times: 13
},
data: {
failCommands: ['commitTransaction'],
errorCode: 251
}
};

describe('Retry Backoff is Enforced', function () {
let client: MongoClient;

beforeEach(async function () {
client = this.configuration.newClient();
});

afterEach(async function () {
sinon.restore();
await client?.close();
});

test(
'works',
{
requires: {
mongodb: '>=4.4', // failCommand
topology: '!single' // transactions can't run on standalone servers
}
},
async function () {
const randomStub = sinon.stub(Math, 'random');

randomStub.returns(0);

await configureFailPoint(this.configuration, failCommand);

const { duration: noBackoffTime } = await measureDuration(() => {
return client.withSession(async s => {
await s.withTransaction(async s => {
await client.db('foo').collection('bar').insertOne({ name: 'bailey' }, { session: s });
});
});
});

randomStub.returns(1);

await configureFailPoint(this.configuration, failCommand);

const { duration: fullBackoffDuration } = await measureDuration(() => {
return client.withSession(async s => {
await s.withTransaction(async s => {
await client.db('foo').collection('bar').insertOne({ name: 'bailey' }, { session: s });
});
});
});

expect(fullBackoffDuration).to.be.within(
noBackoffTime + 2200 - 1000,
noBackoffTime + 2200 + 1000
);
}
);
});