feat(NODE-7142): Exponential backoff and jitter in retry loops #4871
Open
baileympearson wants to merge 20 commits into main from NODE-7142-backpressure-backoff
+12,792 −64
Commits (20)

7c08fb5  POC (baileympearson)
4100da7  deposit retry tokens only on retries, update maxAttempts property con… (tadjik1)
091e989  update to latest, small clean-ups (tadjik1)
680ee08  add timeoutContext to retry selectServer (tadjik1)
0c36556  revert non-compliant spec change (tadjik1)
a4caf38  always cleanup local timeout (also for sharded clusters) (tadjik1)
89eafde  do not leak the timeout also in normal path finally block (tadjik1)
50d5c2c  run all tests (tadjik1)
85f10c3  define constants (tadjik1)
b048983  export TokenBucket (tadjik1)
adcdc4f  retry batching commands only on retryable errors (tadjik1)
ea394cb  Apply suggestions from code review (tadjik1)
b7cf299  do not change package-lock.json (tadjik1)
6cf961f  initialize attemptsMade (tadjik1)
2bce2a4  move export of TokenBucket type into "internal" section (tadjik1)
412a779  implement prose test #2; re-arrange export of TokenBucket type (tadjik1)
3ac26b7  extend spec test for retry write transaction (tadjik1)
ac9e097  run updated transactions tests (tadjik1)
805ef19  run prev. version of the tests (tadjik1)
02f2b18  do not validate TokenBucket export (tadjik1)
@@ -1,3 +1,5 @@
+import { setTimeout } from 'timers/promises';
+
 import { MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION } from '../cmap/wire_protocol/constants';
 import {
   isRetryableReadError,

@@ -26,9 +28,17 @@ import {
 import type { Topology } from '../sdam/topology';
 import type { ClientSession } from '../sessions';
 import { TimeoutContext } from '../timeout';
+import {
+  BASE_BACKOFF_MS,
+  MAX_BACKOFF_MS,
+  MAX_RETRIES,
+  RETRY_COST,
+  RETRY_TOKEN_RETURN_RATE
+} from '../token_bucket';
 import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils';
 import { AggregateOperation } from './aggregate';
 import { AbstractOperation, Aspect } from './operation';
+import { RunCommandOperation } from './run_command';

 const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
 const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =

@@ -50,7 +60,7 @@ type ResultTypeFromOperation<TOperation extends AbstractOperation> = ReturnType<
  * The expectation is that this function:
  * - Connects the MongoClient if it has not already been connected, see {@link autoConnect}
  * - Creates a session if none is provided and cleans up the session it creates
- * - Tries an operation and retries under certain conditions, see {@link tryOperation}
+ * - Tries an operation and retries under certain conditions, see {@link executeOperationWithRetries}
  *
  * @typeParam T - The operation's type
  * @typeParam TResult - The type of the operation's result, calculated from T

@@ -120,7 +130,7 @@ export async function executeOperation<
   });

   try {
-    return await tryOperation(operation, {
+    return await executeOperationWithRetries(operation, {
       topology,
       timeoutContext,
       session,

@@ -183,8 +193,11 @@ type RetryOptions = {
  * @typeParam TResult - The type of the operation's result, calculated from T
  *
  * @param operation - The operation to execute
- * */
-async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFromOperation<T>>(
+ */
+async function executeOperationWithRetries<
+  T extends AbstractOperation,
+  TResult = ResultTypeFromOperation<T>
+>(
   operation: T,
   { topology, timeoutContext, session, readPreference }: RetryOptions
 ): Promise<TResult> {

@@ -233,33 +246,94 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
     session.incrementTransactionNumber();
   }

-  const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
-  let previousOperationError: MongoError | undefined;
   const deprioritizedServers = new DeprioritizedServers();

-  for (let tries = 0; tries < maxTries; tries++) {
-    if (previousOperationError) {
-      if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
+  let maxAttempts =
+    typeof operation.maxAttempts === 'number'
+      ? operation.maxAttempts
+      : willRetry
+        ? timeoutContext.csotEnabled()
+          ? Infinity
+          : 2
+        : 1;
+
+  let error: MongoError | null = null;
+
+  for (let attempt = 0; attempt < maxAttempts; attempt++) {
+    operation.attemptsMade = attempt + 1;
+    operation.server = server;
+
+    try {
+      try {
+        const result = await server.command(operation, timeoutContext);
+        topology.tokenBucket.deposit(
+          attempt > 0
+            ? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
+            : RETRY_TOKEN_RETURN_RATE // otherwise
+        );
+        return operation.handleOk(result);
+      } catch (error) {
+        return operation.handleError(error);
+      }
+    } catch (operationError) {
+      // Should never happen but if it does - propagate the error.
+      if (!(operationError instanceof MongoError)) throw operationError;
+
+      if (attempt > 0 && !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
+        // if a retry attempt fails with a non-overload error, deposit 1 token.
+        topology.tokenBucket.deposit(RETRY_COST);
+      }
+
+      // Preserve the original error once a write has been performed.
+      // Only update to the latest error if no writes were performed.
+      if (error == null) {
+        error = operationError;
+      } else {
+        if (!operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)) {
+          error = operationError;
+        }
+      }
+
+      // Reset timeouts
+      timeoutContext.clear();
+
+      if (hasWriteAspect && operationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
         throw new MongoServerError({
           message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
           errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
-          originalError: previousOperationError
+          originalError: operationError
         });
       }

-      if (operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) {
-        throw previousOperationError;
+      if (!canRetry(operation, operationError)) {
+        throw error;
       }

+      if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
+        maxAttempts = Math.min(MAX_RETRIES + 1, operation.maxAttempts ?? MAX_RETRIES + 1);
+      }
+
-      if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
-        throw previousOperationError;
+      if (attempt + 1 >= maxAttempts) {
+        throw error;
+      }

+      if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
+        if (!topology.tokenBucket.consume(RETRY_COST)) {
+          throw error;
+        }
+
+        const backoffMS = Math.random() * Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2 ** attempt);
+
+        // if the backoff would exhaust the CSOT timeout, short-circuit.
+        if (timeoutContext.csotEnabled() && backoffMS > timeoutContext.remainingTimeMS) {
+          throw error;
+        }
+
-      if (hasReadAspect && !isRetryableReadError(previousOperationError)) {
-        throw previousOperationError;
+        await setTimeout(backoffMS);
       }

       if (
-        previousOperationError instanceof MongoNetworkError &&
+        operationError instanceof MongoNetworkError &&
         operation.hasAspect(Aspect.CURSOR_CREATING) &&
         session != null &&
         session.isPinned &&

@@ -268,52 +342,62 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
         session.unpin({ force: true, forceClear: true });
       }

+      deprioritizedServers.add(server.description);
+
       server = await topology.selectServer(selector, {
         session,
         operationName: operation.commandName,
         deprioritizedServers,
         signal: operation.options.signal
       });

-      if (hasWriteAspect && !supportsRetryableWrites(server)) {
+      if (
+        hasWriteAspect &&
+        !supportsRetryableWrites(server) &&
+        !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
+      ) {
         throw new MongoUnexpectedServerResponseError(
           'Selected server does not support retryable writes'
         );
       }
-    }

       operation.server = server;

-    try {
-      // If tries > 0 and we are command batching we need to reset the batch.
-      if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
+      // Batched operations must reset the batch before retry,
+      // otherwise building a command will build the _next_ batch, not the current batch.
+      if (operation.hasAspect(Aspect.COMMAND_BATCHING)) {
         operation.resetBatch();
       }

-      try {
-        const result = await server.command(operation, timeoutContext);
-        return operation.handleOk(result);
-      } catch (error) {
-        return operation.handleError(error);
-      }
-    } catch (operationError) {
-      if (!(operationError instanceof MongoError)) throw operationError;
-      if (
-        previousOperationError != null &&
-        operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
-      ) {
-        throw previousOperationError;
-      }
-      deprioritizedServers.add(server.description);
-      previousOperationError = operationError;
-
-      // Reset timeouts
-      timeoutContext.clear();
     }
   }

   throw (
-    previousOperationError ??
-    new MongoRuntimeError('Tried to propagate retryability error, but no error was found.')
+    error ??
+    new MongoRuntimeError(
+      'Should never happen: operation execution loop terminated but no error was recorded.'
+    )
   );
+
+  function canRetry(operation: AbstractOperation, error: MongoError) {
+    // always retryable
+    if (
+      error.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
+      error.hasErrorLabel(MongoErrorLabel.RetryableError)
+    ) {
+      return true;
+    }
+
+    // run command is only retryable if we get retryable overload errors
+    if (operation instanceof RunCommandOperation) {
+      return false;
+    }
+
+    // batch operations are only retryable if the batch is retryable
+    if (operation.hasAspect(Aspect.COMMAND_BATCHING)) {
+      return operation.canRetryWrite && isRetryableWriteError(error);
+    }
+
+    return (
+      (hasWriteAspect && willRetryWrite && isRetryableWriteError(error)) ||
+      (hasReadAspect && willRetryRead && isRetryableReadError(error))
+    );
+  }
 }

Inline review comment on canRetry (Member): This function centralizes retry eligibility; before, it was split across the entire old loop. Key idea:
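For readers skimming the diff: the retry policy added above is "full jitter" exponential backoff gated by a token bucket. An overload-labelled failure must first buy a token from the bucket; if it can, the loop sleeps for a uniformly random duration below an exponentially growing, capped ceiling before retrying. The sketch below is a standalone illustration of that shape only; the constant values, the TokenBucket internals, and the backoffOrGiveUp/withBackoff helpers are assumptions for illustration rather than the driver's implementation (only the consume/deposit usage and the backoff formula mirror the diff).

```typescript
import { setTimeout } from 'timers/promises';

// Placeholder values: the real BASE_BACKOFF_MS, MAX_BACKOFF_MS, RETRY_COST and
// RETRY_TOKEN_RETURN_RATE live in src/token_bucket.ts and may differ.
const BASE_BACKOFF_MS = 100;
const MAX_BACKOFF_MS = 10_000;
const RETRY_COST = 5;
const RETRY_TOKEN_RETURN_RATE = 1;

// Minimal token bucket mirroring only the consume/deposit shape used in the diff.
class TokenBucket {
  constructor(private tokens = 500) {}

  // Deducts `cost` and returns true only if enough tokens remain.
  consume(cost: number): boolean {
    if (this.tokens < cost) return false;
    this.tokens -= cost;
    return true;
  }

  deposit(amount: number): void {
    this.tokens += amount;
  }
}

// Full jitter: sleep for a random duration in [0, min(cap, base * 2^attempt)),
// but only if the bucket still has capacity; otherwise signal the caller to stop.
async function backoffOrGiveUp(bucket: TokenBucket, attempt: number): Promise<boolean> {
  if (!bucket.consume(RETRY_COST)) return false;
  const backoffMS = Math.random() * Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2 ** attempt);
  await setTimeout(backoffMS);
  return true;
}

// Example driver-agnostic retry loop using the policy above.
async function withBackoff<T>(fn: () => Promise<T>, maxAttempts = 5): Promise<T> {
  const bucket = new TokenBucket();
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      const result = await fn();
      // Refund tokens on success; a successful retry also returns the retry cost.
      bucket.deposit(attempt > 0 ? RETRY_TOKEN_RETURN_RATE + RETRY_COST : RETRY_TOKEN_RETURN_RATE);
      return result;
    } catch (err) {
      lastError = err;
      if (attempt + 1 >= maxAttempts || !(await backoffOrGiveUp(bucket, attempt))) break;
    }
  }
  throw lastError;
}
```

Multiplying the whole capped ceiling by Math.random() (full jitter) keeps concurrent retries from synchronizing, which is the point of adding jitter on top of plain exponential backoff.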
Conversations

This will consequently transition the transaction into the TRANSACTION_IN_PROGRESS state (as .empty has no error and no success labels).

Do we have coverage for this through the spec tests, or is there a bespoke node test to write for this?

This is not a new change; rather, this code preserves the same behaviour we have on main (where we transition the transaction inside applySession). After the refactoring this condition becomes necessary, so the state machine doesn't get stuck in STARTING_TRANSACTION after a non-server error.

The question is valid though; I will dig deeper to see whether we cover this somewhere.

UPD:
I couldn't find anything about this specific transition in our tests (neither in integration/unit nor in spec tests). At the same time, I would prefer to keep it out of the scope of this PR because this is pre-backpressure behaviour:
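As a side note on the transition discussed in this thread, and purely for illustration (this is not driver code, and not the missing test): once the first command of a transaction has been attempted, the session leaves STARTING_TRANSACTION even if that attempt failed with an error carrying no error or success labels. A toy model, with the state names taken from the comments above and everything else assumed:

```typescript
// Toy model of the transition described above; the state names follow the review
// thread, the rest (types, function name) is invented for illustration only.
type TxnState = 'STARTING_TRANSACTION' | 'TRANSACTION_IN_PROGRESS';

interface AttemptOutcome {
  ok: boolean;
  errorLabels: readonly string[]; // empty for a non-server error
}

function afterFirstCommandAttempt(state: TxnState, outcome: AttemptOutcome): TxnState {
  // The transition does not depend on outcome.ok or on any error/success labels:
  // even a failed attempt with an empty label set leaves STARTING_TRANSACTION,
  // so the state machine cannot get stuck there after a non-server error.
  void outcome;
  if (state === 'STARTING_TRANSACTION') return 'TRANSACTION_IN_PROGRESS';
  return state;
}

// A non-server error with no labels still advances the state machine:
console.log(afterFirstCommandAttempt('STARTING_TRANSACTION', { ok: false, errorLabels: [] }));
// -> 'TRANSACTION_IN_PROGRESS'
```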