Skip to content

Commit 1fffcba

Browse files
Wrapper for fault injector and mock proxy service + cluster maint notifications e2e tests for node slots migration and node fail over (#3882)
* Adding fault injector abstraction that would be able to work with re fault injector as well as with proxy server * Adding wrapper that will allow using the mock proxy for e2e tests execution. Updating standalone maint notifications tests. Adding cluster maint notifications test with slot migration and node fail over. Fix in the connection logic when applying relaxed_timeout. * Apply suggestions from code review --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent bc74454 commit 1fffcba

File tree

12 files changed

+1393
-359
lines changed

12 files changed

+1393
-359
lines changed

docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ services:
134134
API_PORT: "4000"
135135
ENABLE_LOGGING: true
136136
SIMULATE_CLUSTER: true
137+
DEFAULT_INTERCEPTORS: "cluster,hitless,logger"
138+
137139
ports:
138140
- "15379:15379"
139141
- "15380:15380"

redis/_parsers/socket.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def _read_from_socket(
6262
sock.settimeout(timeout)
6363
try:
6464
while True:
65-
data = self._sock.recv(socket_read_size)
65+
data = sock.recv(socket_read_size)
6666
# an empty string indicates the server shutdown the socket
6767
if isinstance(data, bytes) and len(data) == 0:
6868
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

redis/cluster.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2027,10 +2027,10 @@ def initialize(
20272027
# Make sure cluster mode is enabled on this node
20282028
try:
20292029
cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
2030+
# For some cases we might not want to disconnect current pool and
2031+
# lose in flight commands responses
20302032
if disconnect_startup_nodes_pools:
20312033
# Disconnect the connection pool to avoid keeping the connection open
2032-
# For some cases we might not want to disconnect current pool and
2033-
# lose in flight commands responses
20342034
r.connection_pool.disconnect()
20352035
except ResponseError:
20362036
raise RedisClusterException(

redis/connection.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -698,7 +698,13 @@ def update_current_socket_timeout(self, relaxed_timeout: Optional[float] = None)
698698
conn_socket = self._get_socket()
699699
if conn_socket:
700700
timeout = relaxed_timeout if relaxed_timeout != -1 else self.socket_timeout
701-
conn_socket.settimeout(timeout)
701+
# if the current timeout is 0 it means we are in the middle of a can_read call
702+
# in this case we don't want to change the timeout because the operation
703+
# is non-blocking and should return immediately
704+
# Changing the state from non-blocking to blocking in the middle of a read operation
705+
# will lead to a deadlock
706+
if conn_socket.gettimeout() != 0:
707+
conn_socket.settimeout(timeout)
702708
self.update_parser_timeout(timeout)
703709

704710
def update_parser_timeout(self, timeout: Optional[float] = None):

tests/conftest.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,13 @@ def pytest_addoption(parser):
163163
help="Name of the Redis endpoint the tests should be executed on",
164164
)
165165

166+
parser.addoption(
167+
"--cluster-endpoint-name",
168+
action="store",
169+
default=None,
170+
help="Name of the Redis endpoint with OSS API the tests should be executed on",
171+
)
172+
166173

167174
def _get_info(redis_url):
168175
client = redis.Redis.from_url(redis_url)

tests/maint_notifications/proxy_server_helpers.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import base64
22
from dataclasses import dataclass
33
import logging
4-
import re
54
from typing import Union
65

76
from redis.http.http_client import HttpClient, HttpError
@@ -10,6 +9,25 @@
109
class RespTranslator:
1110
"""Helper class to translate between RESP and other encodings."""
1211

12+
@staticmethod
13+
def re_cluster_maint_notification_to_resp(txt: str) -> str:
14+
"""Convert query to RESP format."""
15+
parts = txt.split()
16+
17+
match parts:
18+
case ["MOVING", seq_id, time, new_host]:
19+
return f">4\r\n+MOVING\r\n:{seq_id}\r\n:{time}\r\n+{new_host}\r\n"
20+
case ["MIGRATING", seq_id, time, shards]:
21+
return f">4\r\n+MIGRATING\r\n:{seq_id}\r\n:{time}\r\n+{shards}\r\n"
22+
case ["MIGRATED", seq_id, shards]:
23+
return f">3\r\n+MIGRATED\r\n:{seq_id}\r\n+{shards}\r\n"
24+
case ["FAILING_OVER", seq_id, time, shards]:
25+
return f">4\r\n+FAILING_OVER\r\n:{seq_id}\r\n:{time}\r\n+{shards}\r\n"
26+
case ["FAILED_OVER", seq_id, shards]:
27+
return f">3\r\n+FAILED_OVER\r\n:{seq_id}\r\n+{shards}\r\n"
28+
case _:
29+
raise NotImplementedError(f"Unknown notification: {txt}")
30+
1331
@staticmethod
1432
def oss_maint_notification_to_resp(txt: str) -> str:
1533
"""Convert query to RESP format."""
@@ -232,8 +250,7 @@ def send_notification(
232250

233251
if not conn_ids:
234252
raise RuntimeError(
235-
f"No connections found for node {node_port}. "
236-
f"Available nodes: {list(set(c.get('node') for c in stats.get('connections', {}).values()))}"
253+
f"No connections found for node {connected_to_port}. \nStats: {stats}"
237254
)
238255

239256
# Send notification to each connection

tests/test_asyncio/test_scenario/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from redis.event import AsyncEventListenerInterface, EventDispatcher
1818
from redis.multidb.failure_detector import DEFAULT_MIN_NUM_FAILURES
1919
from tests.test_scenario.conftest import get_endpoints_config, extract_cluster_fqdn
20-
from tests.test_scenario.fault_injector_client import FaultInjectorClient
20+
from tests.test_scenario.fault_injector_client import REFaultInjector
2121

2222

2323
class CheckActiveDatabaseChangedListener(AsyncEventListenerInterface):
@@ -31,7 +31,7 @@ async def listen(self, event: AsyncActiveDatabaseChanged):
3131
@pytest.fixture()
3232
def fault_injector_client():
3333
url = os.getenv("FAULT_INJECTION_API_URL", "http://127.0.0.1:20324")
34-
return FaultInjectorClient(url)
34+
return REFaultInjector(url)
3535

3636

3737
@pytest_asyncio.fixture()

tests/test_scenario/conftest.py

Lines changed: 111 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from urllib.parse import urlparse
77

88
import pytest
9+
from redis import RedisCluster
910

1011
from redis.backoff import NoBackoff, ExponentialBackoff
1112
from redis.event import EventDispatcher, EventListenerInterface
@@ -22,12 +23,16 @@
2223
from redis.client import Redis
2324
from redis.maint_notifications import EndpointType, MaintNotificationsConfig
2425
from redis.retry import Retry
25-
from tests.test_scenario.fault_injector_client import FaultInjectorClient
26+
from tests.test_scenario.fault_injector_client import (
27+
ProxyServerFaultInjector,
28+
REFaultInjector,
29+
)
2630

2731
RELAXED_TIMEOUT = 30
2832
CLIENT_TIMEOUT = 5
2933

3034
DEFAULT_ENDPOINT_NAME = "m-standard"
35+
DEFAULT_OSS_API_ENDPOINT_NAME = "oss-api"
3136

3237

3338
class CheckActiveDatabaseChangedListener(EventListenerInterface):
@@ -38,13 +43,24 @@ def listen(self, event: ActiveDatabaseChanged):
3843
self.is_changed_flag = True
3944

4045

46+
def use_mock_proxy():
47+
return os.getenv("REDIS_ENTERPRISE_TESTS", "true").lower() == "false"
48+
49+
4150
@pytest.fixture()
4251
def endpoint_name(request):
4352
return request.config.getoption("--endpoint-name") or os.getenv(
4453
"REDIS_ENDPOINT_NAME", DEFAULT_ENDPOINT_NAME
4554
)
4655

4756

57+
@pytest.fixture()
58+
def cluster_endpoint_name(request):
59+
return request.config.getoption("--cluster-endpoint-name") or os.getenv(
60+
"REDIS_CLUSTER_ENDPOINT_NAME", DEFAULT_OSS_API_ENDPOINT_NAME
61+
)
62+
63+
4864
def get_endpoints_config(endpoint_name: str):
4965
endpoints_config = os.getenv("REDIS_ENDPOINTS_CONFIG_PATH", None)
5066

@@ -67,10 +83,27 @@ def endpoints_config(endpoint_name: str):
6783
return get_endpoints_config(endpoint_name)
6884

6985

86+
@pytest.fixture()
87+
def cluster_endpoints_config(cluster_endpoint_name: str):
88+
return get_endpoints_config(cluster_endpoint_name)
89+
90+
7091
@pytest.fixture()
7192
def fault_injector_client():
72-
url = os.getenv("FAULT_INJECTION_API_URL", "http://127.0.0.1:20324")
73-
return FaultInjectorClient(url)
93+
if use_mock_proxy():
94+
return ProxyServerFaultInjector(oss_cluster=False)
95+
else:
96+
url = os.getenv("FAULT_INJECTION_API_URL", "http://127.0.0.1:20324")
97+
return REFaultInjector(url)
98+
99+
100+
@pytest.fixture()
101+
def fault_injector_client_oss_api():
102+
if use_mock_proxy():
103+
return ProxyServerFaultInjector(oss_cluster=True)
104+
else:
105+
url = os.getenv("FAULT_INJECTION_API_URL", "http://127.0.0.1:20324")
106+
return REFaultInjector(url)
74107

75108

76109
@pytest.fixture()
@@ -208,8 +241,6 @@ def _get_client_maint_notifications(
208241
endpoint_type=endpoint_type,
209242
)
210243

211-
# Create Redis client with maintenance notifications config
212-
# This will automatically create the MaintNotificationsPoolHandler
213244
if disable_retries:
214245
retry = Retry(NoBackoff(), 0)
215246
else:
@@ -218,6 +249,8 @@ def _get_client_maint_notifications(
218249
tls_enabled = True if parsed.scheme == "rediss" else False
219250
logging.info(f"TLS enabled: {tls_enabled}")
220251

252+
# Create Redis client with maintenance notifications config
253+
# This will automatically create the MaintNotificationsPoolHandler
221254
client = Redis(
222255
host=host,
223256
port=port,
@@ -235,3 +268,76 @@ def _get_client_maint_notifications(
235268
logging.info(f"Client uses Protocol: {client.connection_pool.get_protocol()}")
236269

237270
return client
271+
272+
273+
@pytest.fixture()
274+
def cluster_client_maint_notifications(cluster_endpoints_config):
275+
return _get_cluster_client_maint_notifications(cluster_endpoints_config)
276+
277+
278+
def _get_cluster_client_maint_notifications(
279+
endpoints_config,
280+
protocol: int = 3,
281+
enable_maintenance_notifications: bool = True,
282+
endpoint_type: Optional[EndpointType] = None,
283+
enable_relaxed_timeout: bool = True,
284+
enable_proactive_reconnect: bool = True,
285+
disable_retries: bool = False,
286+
socket_timeout: Optional[float] = None,
287+
host_config: Optional[str] = None,
288+
):
289+
"""Create Redis cluster client with maintenance notifications enabled."""
290+
# Get credentials from the configuration
291+
username = endpoints_config.get("username")
292+
password = endpoints_config.get("password")
293+
294+
# Parse host and port from endpoints URL
295+
endpoints = endpoints_config.get("endpoints", [])
296+
if not endpoints:
297+
raise ValueError("No endpoints found in configuration")
298+
299+
parsed = urlparse(endpoints[0])
300+
host = parsed.hostname
301+
port = parsed.port
302+
303+
if not host:
304+
raise ValueError(f"Could not parse host from endpoint URL: {endpoints[0]}")
305+
306+
logging.info(f"Connecting to Redis Enterprise: {host}:{port} with user: {username}")
307+
308+
if disable_retries:
309+
retry = Retry(NoBackoff(), 0)
310+
else:
311+
retry = Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3)
312+
313+
tls_enabled = True if parsed.scheme == "rediss" else False
314+
logging.info(f"TLS enabled: {tls_enabled}")
315+
316+
# Configure maintenance notifications
317+
maintenance_config = MaintNotificationsConfig(
318+
enabled=enable_maintenance_notifications,
319+
proactive_reconnect=enable_proactive_reconnect,
320+
relaxed_timeout=RELAXED_TIMEOUT if enable_relaxed_timeout else -1,
321+
endpoint_type=endpoint_type,
322+
)
323+
324+
# Create Redis cluster client with maintenance notifications config
325+
client = RedisCluster(
326+
host=host,
327+
port=port,
328+
socket_timeout=CLIENT_TIMEOUT if socket_timeout is None else socket_timeout,
329+
username=username,
330+
password=password,
331+
ssl=tls_enabled,
332+
ssl_cert_reqs="none",
333+
ssl_check_hostname=False,
334+
protocol=protocol, # RESP3 required for push notifications
335+
maint_notifications_config=maintenance_config,
336+
retry=retry,
337+
)
338+
logging.info("Redis cluster client created with maintenance notifications enabled")
339+
logging.info(
340+
f"Cluster working with the following nodes: {[(node.name, node.server_type) for node in client.get_nodes()]}"
341+
)
342+
343+
return client

0 commit comments

Comments
 (0)