2 changes: 2 additions & 0 deletions src/cmap/connect.ts
@@ -224,6 +224,7 @@ export interface HandshakeDocument extends Document {
compression: string[];
saslSupportedMechs?: string;
loadBalanced?: boolean;
backpressure: true;
}

/**
@@ -241,6 +242,7 @@ export async function prepareHandshakeDocument(

const handshakeDoc: HandshakeDocument = {
[serverApi?.version || options.loadBalanced === true ? 'hello' : LEGACY_HELLO_COMMAND]: 1,
backpressure: true,
helloOk: true,
client: clientMetadata,
compression: compressors
3 changes: 3 additions & 0 deletions src/cmap/connection.ts
@@ -586,6 +586,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.throwIfAborted();
}
} catch (error) {
if (options.session != null && !(error instanceof MongoServerError)) {
updateSessionFromResponse(options.session, MongoDBResponse.empty);
Member

This will consequently transition the transaction into the TRANSACTION_IN_PROGRESS state (since .empty carries no error and no success labels).
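
A rough sketch of the transition being described, using hypothetical names rather than the driver's actual updateSessionFromResponse or transaction state machine:

// Illustrative model only: an "empty" response carries no error labels, so a
// transaction that was only just started is advanced to "in progress".
type IllustrativeTxnState = 'STARTING_TRANSACTION' | 'TRANSACTION_IN_PROGRESS';

function nextTxnState(
  current: IllustrativeTxnState,
  response: { hasErrorLabels: boolean }
): IllustrativeTxnState {
  if (current === 'STARTING_TRANSACTION' && !response.hasErrorLabels) {
    return 'TRANSACTION_IN_PROGRESS';
  }
  return current;
}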

Contributor

Do we have coverage for this through the spec tests or is there a bespoke node test to write for this?

Member
@tadjik1 Feb 20, 2026

This is not a new change; rather, this code preserves the same behaviour we have on main (where we transition the transaction inside applySession). After the refactoring, this condition becomes necessary so the state machine doesn't get stuck in STARTING_TRANSACTION after a non-server error.

The question is valid though; I will dig deeper to see whether we already cover this somehow.

UPD:
I couldn't find anything about this specific transition in our tests (neither in integration/unit, nor in spec tests). At the same time I would prefer to keep it out of the scope of this PR because this is pre-backpressure behaviour:

// src/sessions.ts
if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
    session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS); // this line is removed now
    command.startTransaction = true;
}
if (this.shouldEmitAndLogCommand) {
this.emitAndLogCommand(
this.monitorCommands,
1 change: 1 addition & 0 deletions src/index.ts
@@ -621,6 +621,7 @@ export type {
TimeoutContext,
TimeoutContextOptions
} from './timeout';
export type { TokenBucket } from './token_bucket';
export type { Transaction, TransactionOptions, TxnState } from './transactions';
export type {
BufferPool,
176 changes: 130 additions & 46 deletions src/operations/execute_operation.ts
@@ -1,3 +1,5 @@
import { setTimeout } from 'timers/promises';

import { MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION } from '../cmap/wire_protocol/constants';
import {
isRetryableReadError,
@@ -26,9 +28,17 @@ import {
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { TimeoutContext } from '../timeout';
import {
BASE_BACKOFF_MS,
MAX_BACKOFF_MS,
MAX_RETRIES,
RETRY_COST,
RETRY_TOKEN_RETURN_RATE
} from '../token_bucket';
import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils';
import { AggregateOperation } from './aggregate';
import { AbstractOperation, Aspect } from './operation';
import { RunCommandOperation } from './run_command';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
@@ -50,7 +60,7 @@ type ResultTypeFromOperation<TOperation extends AbstractOperation> = ReturnType<
* The expectation is that this function:
* - Connects the MongoClient if it has not already been connected, see {@link autoConnect}
* - Creates a session if none is provided and cleans up the session it creates
* - Tries an operation and retries under certain conditions, see {@link tryOperation}
* - Tries an operation and retries under certain conditions, see {@link executeOperationWithRetries}
*
* @typeParam T - The operation's type
* @typeParam TResult - The type of the operation's result, calculated from T
@@ -120,7 +130,7 @@ export async function executeOperation<
});

try {
return await tryOperation(operation, {
return await executeOperationWithRetries(operation, {
topology,
timeoutContext,
session,
@@ -183,8 +193,11 @@ type RetryOptions = {
* @typeParam TResult - The type of the operation's result, calculated from T
*
* @param operation - The operation to execute
* */
async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFromOperation<T>>(
*/
async function executeOperationWithRetries<
T extends AbstractOperation,
TResult = ResultTypeFromOperation<T>
>(
operation: T,
{ topology, timeoutContext, session, readPreference }: RetryOptions
): Promise<TResult> {
@@ -233,33 +246,94 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
session.incrementTransactionNumber();
}

const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
let previousOperationError: MongoError | undefined;
const deprioritizedServers = new DeprioritizedServers();

for (let tries = 0; tries < maxTries; tries++) {
if (previousOperationError) {
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
let maxAttempts =
typeof operation.maxAttempts === 'number'
? operation.maxAttempts
: willRetry
? timeoutContext.csotEnabled()
? Infinity
: 2
: 1;

let error: MongoError | null = null;

for (let attempt = 0; attempt < maxAttempts; attempt++) {
operation.attemptsMade = attempt + 1;
operation.server = server;

try {
try {
const result = await server.command(operation, timeoutContext);
topology.tokenBucket.deposit(
attempt > 0
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
: RETRY_TOKEN_RETURN_RATE // otherwise
);
return operation.handleOk(result);
} catch (error) {
return operation.handleError(error);
}
} catch (operationError) {
// Should never happen but if it does - propagate the error.
if (!(operationError instanceof MongoError)) throw operationError;

if (attempt > 0 && !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
// if a retry attempt fails with a non-overload error, deposit 1 token.
topology.tokenBucket.deposit(RETRY_COST);
}

// Preserve the original error once a write has been performed.
// Only update to the latest error if no writes were performed.
if (error == null) {
error = operationError;
} else {
if (!operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)) {
error = operationError;
}
}

// Reset timeouts
timeoutContext.clear();

if (hasWriteAspect && operationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
throw new MongoServerError({
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
originalError: previousOperationError
originalError: operationError
});
}

if (operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) {
throw previousOperationError;
if (!canRetry(operation, operationError)) {
throw error;
}

if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
maxAttempts = Math.min(MAX_RETRIES + 1, operation.maxAttempts ?? MAX_RETRIES + 1);
}

if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
throw previousOperationError;
if (attempt + 1 >= maxAttempts) {
throw error;
}

if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
if (!topology.tokenBucket.consume(RETRY_COST)) {
throw error;
}

const backoffMS = Math.random() * Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2 ** attempt);

// if the backoff would exhaust the CSOT timeout, short-circuit.
if (timeoutContext.csotEnabled() && backoffMS > timeoutContext.remainingTimeMS) {
throw error;
}

if (hasReadAspect && !isRetryableReadError(previousOperationError)) {
throw previousOperationError;
await setTimeout(backoffMS);
}

if (
previousOperationError instanceof MongoNetworkError &&
operationError instanceof MongoNetworkError &&
operation.hasAspect(Aspect.CURSOR_CREATING) &&
session != null &&
session.isPinned &&
@@ -268,52 +342,62 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
session.unpin({ force: true, forceClear: true });
}

deprioritizedServers.add(server.description);

server = await topology.selectServer(selector, {
session,
operationName: operation.commandName,
deprioritizedServers,
signal: operation.options.signal
});

if (hasWriteAspect && !supportsRetryableWrites(server)) {
if (
hasWriteAspect &&
!supportsRetryableWrites(server) &&
!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
) {
throw new MongoUnexpectedServerResponseError(
'Selected server does not support retryable writes'
);
}
}

operation.server = server;

try {
// If tries > 0 and we are command batching we need to reset the batch.
if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
// Batched operations must reset the batch before retry,
// otherwise building a command will build the _next_ batch, not the current batch.
if (operation.hasAspect(Aspect.COMMAND_BATCHING)) {
operation.resetBatch();
}

try {
const result = await server.command(operation, timeoutContext);
return operation.handleOk(result);
} catch (error) {
return operation.handleError(error);
}
} catch (operationError) {
if (!(operationError instanceof MongoError)) throw operationError;
if (
previousOperationError != null &&
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
) {
throw previousOperationError;
}
deprioritizedServers.add(server.description);
previousOperationError = operationError;

// Reset timeouts
timeoutContext.clear();
}
}

throw (
previousOperationError ??
new MongoRuntimeError('Tried to propagate retryability error, but no error was found.')
error ??
new MongoRuntimeError(
'Should never happen: operation execution loop terminated but no error was recorded.'
)
);

function canRetry(operation: AbstractOperation, error: MongoError) {
Member

This function centralizes retry eligibility, which was previously spread across the entire old loop. Key idea: an error with both the SystemOverloadedError and RetryableError labels is always retryable, regardless of retryReads/retryWrites settings or operation type (including runCommand).

// always retryable
if (
error.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
error.hasErrorLabel(MongoErrorLabel.RetryableError)
) {
return true;
}

// run command is only retryable if we get retryable overload errors
if (operation instanceof RunCommandOperation) {
return false;
}

// batch operations are only retryable if the batch is retryable
if (operation.hasAspect(Aspect.COMMAND_BATCHING)) {
return operation.canRetryWrite && isRetryableWriteError(error);
}

return (
(hasWriteAspect && willRetryWrite && isRetryableWriteError(error)) ||
(hasReadAspect && willRetryRead && isRetryableReadError(error))
);
}
}
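
For context, a minimal sketch (not part of this PR; the interface mirrors how the diff uses it, but the constant values are assumptions) of the token-bucket accounting and full-jitter backoff that the new retry loop relies on:

// Illustrative only: constant values are assumed for demonstration.
const BASE_BACKOFF_MS = 100;
const MAX_BACKOFF_MS = 10_000;
const RETRY_COST = 5;

class IllustrativeTokenBucket {
  constructor(private tokens: number) {}

  // Spend `cost` tokens if available; an empty bucket means the retry is refused.
  consume(cost: number): boolean {
    if (this.tokens < cost) return false;
    this.tokens -= cost;
    return true;
  }

  // Return tokens after successful attempts (a real bucket would also cap its size).
  deposit(amount: number): void {
    this.tokens += amount;
  }
}

// Full-jitter backoff, matching the formula in the loop above: sleep a uniform
// random duration below an exponentially growing, capped bound
// (attempt 0 -> up to BASE_BACKOFF_MS, attempt 1 -> up to 2x, ... capped at MAX_BACKOFF_MS).
function backoffMS(attempt: number): number {
  return Math.random() * Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2 ** attempt);
}
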
8 changes: 8 additions & 0 deletions src/operations/operation.ts
@@ -66,6 +66,12 @@ export abstract class AbstractOperation<TResult = any> {
/** Specifies the time an operation will run until it throws a timeout error. */
timeoutMS?: number;

/** Used by commitTransaction to share the retry budget across two executeOperation calls. */
maxAttempts?: number;

/** Tracks how many attempts were made in the last executeOperation call. */
attemptsMade: number;

private _session: ClientSession | undefined;

static aspects?: Set<symbol>;
@@ -82,6 +88,8 @@

this.options = options;
this.bypassPinningCheck = !!options.bypassPinningCheck;

this.attemptsMade = 0;
}

/** Must match the first key of the command object sent to the server.
19 changes: 12 additions & 7 deletions src/sdam/topology.ts
@@ -35,6 +35,7 @@ import { type Abortable, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { ClientSession } from '../sessions';
import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
import { INITIAL_TOKEN_BUCKET_SIZE, TokenBucket } from '../token_bucket';
import type { Transaction } from '../transactions';
import {
addAbortListener,
@@ -207,18 +208,16 @@ export type TopologyEvents = {
* @internal
*/
export class Topology extends TypedEventEmitter<TopologyEvents> {
/** @internal */
s: TopologyPrivate;
/** @internal */
waitQueue: List<ServerSelectionRequest>;
/** @internal */
hello?: Document;
/** @internal */
_type?: string;

/** @internal */
tokenBucket = new TokenBucket(INITIAL_TOKEN_BUCKET_SIZE);

client!: MongoClient;

/** @internal */
private connectionLock?: Promise<Topology>;

/** @event */
@@ -595,7 +594,11 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
)
);
}
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();

if (!options.timeoutContext || options.timeoutContext.clearServerSelectionTimeout) {
timeout?.clear();
}

return transaction.server;
}

@@ -666,7 +669,9 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
throw error;
} finally {
abortListener?.[kDispose]();
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
if (!options.timeoutContext || options.timeoutContext.clearServerSelectionTimeout) {
timeout?.clear();
}
}
}
/**