Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2825,7 +2825,11 @@ async def run(self) -> T:
if self._last_error is None:
self._last_error = exc

if self._server is not None:
if (
self._server is not None
and self._client.topology_description.topology_type_name == "Sharded"
or exc.has_error_label("SystemOverloadedError")
):
self._deprioritized_servers.append(self._server)

def _is_not_eligible_for_retry(self) -> bool:
Expand Down
6 changes: 5 additions & 1 deletion pymongo/synchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2815,7 +2815,11 @@ def run(self) -> T:
if self._last_error is None:
self._last_error = exc

if self._server is not None:
if (
self._server is not None
and self._client.topology_description.topology_type_name == "Sharded"
or exc.has_error_label("SystemOverloadedError")
):
self._deprioritized_servers.append(self._server)

def _is_not_eligible_for_retry(self) -> bool:
Expand Down
74 changes: 74 additions & 0 deletions test/asynchronous/test_retryable_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,80 @@ async def test_retryable_reads_are_retried_on_the_same_implicit_session(self):
self.assertEqual(command_docs[0]["lsid"], command_docs[1]["lsid"])
self.assertIsNot(command_docs[0], command_docs[1])

@async_client_context.require_replica_set
@async_client_context.require_failCommand_fail_point
async def test_03_01_retryable_reads_caused_by_overload_errors_are_retried_on_a_different_replicaset_server_when_one_is_available(
self
):
listener = OvertCommandListener()

# 1. Create a client `client` with `retryReads=true`, `readPreference=primaryPreferred`, and command event monitoring enabled.
client = await self.async_rs_or_single_client(
event_listeners=[listener], retryReads=True, readPreference="primaryPreferred"
)

# 2. Configure a fail point with the RetryableError and SystemOverloadedError error labels.
command_args = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["find"],
"errorLabels": ["RetryableError", "SystemOverloadedError"],
"errorCode": 6,
},
}
await async_set_fail_point(client, command_args)

# 3. Reset the command event monitor to clear the fail point command from its stored events.
listener.reset()

# 4. Execute a `find` command with `client`.
await client.t.t.find_one({})

# 5. Assert that one failed command event and one successful command event occurred.
self.assertEqual(len(listener.failed_events), 1)
self.assertEqual(len(listener.succeeded_events), 1)

# 6. Assert that both events occurred on different servers.
assert listener.failed_events[0].connection_id != listener.succeeded_events[0].connection_id

@async_client_context.require_replica_set
@async_client_context.require_failCommand_fail_point
async def test_03_02_retryable_reads_caused_by_non_overload_errors_are_retried_on_the_same_replicaset_server(
self
):
listener = OvertCommandListener()

# 1. Create a client `client` with `retryReads=true`, `readPreference=primaryPreferred`, and command event monitoring enabled.
client = await self.async_rs_or_single_client(
event_listeners=[listener], retryReads=True, readPreference="primaryPreferred"
)

# 2. Configure a fail point with the RetryableError error label.
command_args = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["find"],
"errorLabels": ["RetryableError"],
"errorCode": 6,
},
}
await async_set_fail_point(client, command_args)

# 3. Reset the command event monitor to clear the fail point command from its stored events.
listener.reset()

# 4. Execute a `find` command with `client`.
await client.t.t.find_one({})

# 5. Assert that one failed command event and one successful command event occurred.
self.assertEqual(len(listener.failed_events), 1)
self.assertEqual(len(listener.succeeded_events), 1)

# 6. Assert that both events occurred the same server.
assert listener.failed_events[0].connection_id == listener.succeeded_events[0].connection_id


if __name__ == "__main__":
unittest.main()
74 changes: 74 additions & 0 deletions test/test_retryable_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,80 @@ def test_retryable_reads_are_retried_on_the_same_implicit_session(self):
self.assertEqual(command_docs[0]["lsid"], command_docs[1]["lsid"])
self.assertIsNot(command_docs[0], command_docs[1])

@client_context.require_replica_set
@client_context.require_failCommand_fail_point
def test_03_01_retryable_reads_caused_by_overload_errors_are_retried_on_a_different_replicaset_server_when_one_is_available(
self
):
listener = OvertCommandListener()

# 1. Create a client `client` with `retryReads=true`, `readPreference=primaryPreferred`, and command event monitoring enabled.
client = self.rs_or_single_client(
event_listeners=[listener], retryReads=True, readPreference="primaryPreferred"
)

# 2. Configure a fail point with the RetryableError and SystemOverloadedError error labels.
command_args = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["find"],
"errorLabels": ["RetryableError", "SystemOverloadedError"],
"errorCode": 6,
},
}
set_fail_point(client, command_args)

# 3. Reset the command event monitor to clear the fail point command from its stored events.
listener.reset()

# 4. Execute a `find` command with `client`.
client.t.t.find_one({})

# 5. Assert that one failed command event and one successful command event occurred.
self.assertEqual(len(listener.failed_events), 1)
self.assertEqual(len(listener.succeeded_events), 1)

# 6. Assert that both events occurred on different servers.
assert listener.failed_events[0].connection_id != listener.succeeded_events[0].connection_id

@client_context.require_replica_set
@client_context.require_failCommand_fail_point
def test_03_02_retryable_reads_caused_by_non_overload_errors_are_retried_on_the_same_replicaset_server(
self
):
listener = OvertCommandListener()

# 1. Create a client `client` with `retryReads=true`, `readPreference=primaryPreferred`, and command event monitoring enabled.
client = self.rs_or_single_client(
event_listeners=[listener], retryReads=True, readPreference="primaryPreferred"
)

# 2. Configure a fail point with the RetryableError error label.
command_args = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["find"],
"errorLabels": ["RetryableError"],
"errorCode": 6,
},
}
set_fail_point(client, command_args)

# 3. Reset the command event monitor to clear the fail point command from its stored events.
listener.reset()

# 4. Execute a `find` command with `client`.
client.t.t.find_one({})

# 5. Assert that one failed command event and one successful command event occurred.
self.assertEqual(len(listener.failed_events), 1)
self.assertEqual(len(listener.succeeded_events), 1)

# 6. Assert that both events occurred the same server.
assert listener.failed_events[0].connection_id == listener.succeeded_events[0].connection_id


if __name__ == "__main__":
unittest.main()
Loading