Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Failover handling improvements for RedisCluster and Async RedisCluster #2377

Merged
merged 17 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
* ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225)
* Remove compatibility code for old versions of Hiredis, drop Packaging dependency
* The `deprecated` library is no longer a dependency
* Failover handling improvements for RedisCluster and Async RedisCluster (#2377)
* Fixed "cannot pickle '_thread.lock' object" bug (#2354, #2297)

* 4.1.3 (Feb 8, 2022)
* Fix flushdb and flushall (#1926)
Expand Down
2 changes: 2 additions & 0 deletions redis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys

from redis.backoff import default_backoff
from redis.client import Redis, StrictRedis
from redis.cluster import RedisCluster
from redis.connection import (
Expand Down Expand Up @@ -64,6 +65,7 @@ def int_or_str(value):
"ConnectionPool",
"DataError",
"from_url",
"default_backoff",
"InvalidResponse",
"PubSubError",
"ReadOnlyError",
Expand Down
2 changes: 2 additions & 0 deletions redis/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
SentinelManagedSSLConnection,
)
from redis.asyncio.utils import from_url
from redis.backoff import default_backoff
from redis.exceptions import (
AuthenticationError,
AuthenticationWrongNumberOfArgsError,
Expand Down Expand Up @@ -43,6 +44,7 @@
"ConnectionPool",
"DataError",
"from_url",
"default_backoff",
"InvalidResponse",
"PubSubError",
"ReadOnlyError",
Expand Down
7 changes: 7 additions & 0 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,13 @@ def get_connection_kwargs(self):
"""Get the connection's key-word arguments"""
return self.connection_pool.connection_kwargs

def get_retry(self) -> Optional["Retry"]:
    """Return the Retry instance stored in the connection kwargs, or None."""
    connection_kwargs = self.get_connection_kwargs()
    return connection_kwargs.get("retry")

def set_retry(self, retry: "Retry") -> None:
    """Record *retry* in the connection kwargs and propagate it to the pool."""
    connection_kwargs = self.get_connection_kwargs()
    connection_kwargs["retry"] = retry
    self.connection_pool.set_retry(retry)

def load_external_module(self, funcname, func):
"""
This function can be used to add externally defined redis modules,
Expand Down
121 changes: 63 additions & 58 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
parse_url,
)
from redis.asyncio.parser import CommandsParser
from redis.asyncio.retry import Retry
from redis.backoff import default_backoff
from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis
from redis.cluster import (
PIPELINE_BLOCKED_COMMANDS,
Expand Down Expand Up @@ -108,10 +110,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
:param startup_nodes:
| :class:`~.ClusterNode` to be used as a startup node
:param require_full_coverage:
| When set to ``False``: the client will not require a full coverage of the
slots. However, if not all slots are covered, and at least one node has
``cluster-require-full-coverage`` set to ``yes``, the server will throw a
:class:`~.ClusterDownError` for some key-based commands.
| When set to ``False``: the client will not require a full coverage of
the slots. However, if not all slots are covered, and at least one node
has ``cluster-require-full-coverage`` set to ``yes``, the server will throw
a :class:`~.ClusterDownError` for some key-based commands.
| When set to ``True``: all slots must be covered to construct the cluster
client. If not all slots are covered, :class:`~.RedisClusterException` will be
thrown.
Expand All @@ -134,7 +136,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered
:param connection_error_retry_attempts:
| Number of times to retry before reinitializing when :class:`~.TimeoutError`
or :class:`~.ConnectionError` are encountered
or :class:`~.ConnectionError` are encountered.
The default backoff strategy will be set if Retry object is not passed (see
default_backoff in backoff.py). To change it, pass a custom Retry object
using the "retry" keyword.
:param max_connections:
| Maximum number of connections per node. If there are no free connections & the
maximum number of connections are already created, a
Expand Down Expand Up @@ -212,9 +217,9 @@ def __init__(
startup_nodes: Optional[List["ClusterNode"]] = None,
require_full_coverage: bool = True,
read_from_replicas: bool = False,
reinitialize_steps: int = 10,
reinitialize_steps: int = 5,
cluster_error_retry_attempts: int = 3,
connection_error_retry_attempts: int = 5,
connection_error_retry_attempts: int = 3,
max_connections: int = 2**31,
# Client related kwargs
db: Union[str, int] = 0,
Expand All @@ -232,6 +237,8 @@ def __init__(
socket_keepalive: bool = False,
socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
socket_timeout: Optional[float] = None,
retry: Optional["Retry"] = None,
chayim marked this conversation as resolved.
Show resolved Hide resolved
retry_on_error: Optional[List[Exception]] = None,
# SSL related kwargs
ssl: bool = False,
ssl_ca_certs: Optional[str] = None,
Expand Down Expand Up @@ -278,6 +285,7 @@ def __init__(
"socket_keepalive": socket_keepalive,
"socket_keepalive_options": socket_keepalive_options,
"socket_timeout": socket_timeout,
"retry": retry,
}

if ssl:
Expand All @@ -298,6 +306,18 @@ def __init__(
# Call our on_connect function to configure READONLY mode
kwargs["redis_connect_func"] = self.on_connect

self.retry = retry
if retry or retry_on_error or connection_error_retry_attempts > 0:
chayim marked this conversation as resolved.
Show resolved Hide resolved
# Set a retry object for all cluster nodes
self.retry = retry or Retry(
default_backoff(), connection_error_retry_attempts
)
if not retry_on_error:
# Default errors for retrying
retry_on_error = [ConnectionError, TimeoutError]
self.retry.update_supported_errors(retry_on_error)
kwargs.update({"retry": self.retry})

kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy()
self.connection_kwargs = kwargs

Expand All @@ -319,7 +339,6 @@ def __init__(
self.reinitialize_steps = reinitialize_steps
self.cluster_error_retry_attempts = cluster_error_retry_attempts
self.connection_error_retry_attempts = connection_error_retry_attempts

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: Don't delete this line.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't delete it, just moved it to line 309

self.reinitialize_counter = 0
self.commands_parser = CommandsParser()
self.node_flags = self.__class__.NODE_FLAGS.copy()
Expand Down Expand Up @@ -477,6 +496,16 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
"""Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`."""
return self.connection_kwargs

def get_retry(self) -> Optional["Retry"]:
    """Return the Retry object shared by the cluster's nodes, if one was set."""
    return self.retry

def set_retry(self, retry: "Retry") -> None:
self.retry = retry
for node in self.get_nodes():
node.connection_kwargs.update({"retry": retry})
barshaul marked this conversation as resolved.
Show resolved Hide resolved
for conn in node._connections:
conn.retry = retry

def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
"""Set a custom response callback."""
self.response_callbacks[command] = callback
Expand Down Expand Up @@ -614,9 +643,11 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
if passed_targets and not self._is_node_flag(passed_targets):
target_nodes = self._parse_target_nodes(passed_targets)
target_nodes_specified = True
retry_attempts = 1
retry_attempts = 0

for _ in range(retry_attempts):
# Add one for the first execution
execute_attempts = 1 + retry_attempts
for _ in range(execute_attempts):
if self._initialize:
await self.initialize()
try:
Expand Down Expand Up @@ -654,25 +685,21 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
)
return dict(zip(keys, values))
except Exception as e:
if type(e) in self.__class__.ERRORS_ALLOW_RETRY:
# The nodes and slots cache were reinitialized.
if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
# The nodes and slots cache should be reinitialized.
# Try again with the new cluster setup.
exception = e
retry_attempts -= 1
continue
else:
# All other errors should be raised.
raise

# If it fails the configured number of times then raise exception back
# to caller of this method
raise exception
# raise the exception
raise e

async def _execute_command(
self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any
) -> Any:
asking = moved = False
redirect_addr = None
ttl = self.RedisClusterRequestTTL
connection_error_retry_counter = 0

while ttl > 0:
ttl -= 1
Expand All @@ -691,25 +718,18 @@ async def _execute_command(
moved = False

return await target_node.execute_command(*args, **kwargs)
except BusyLoadingError:
except (BusyLoadingError, MaxConnectionsError):
raise
except (ConnectionError, TimeoutError):
# Connection retries are being handled in the node's
# Retry object.
# Remove the failed node from the startup nodes before we try
# to reinitialize the cluster
self.nodes_manager.startup_nodes.pop(target_node.name, None)
# Hard force of reinitialize of the node/slots setup
# and try again with the new setup
chayim marked this conversation as resolved.
Show resolved Hide resolved
await self.close()
raise
except (ConnectionError, TimeoutError) as e:
# Give the node 0.25 seconds to get back up and retry again with the
# same node and configuration. After the defined number of attempts, try
# to reinitialize the cluster and try again.
connection_error_retry_counter += 1
if (
connection_error_retry_counter
< self.connection_error_retry_attempts
):
await asyncio.sleep(0.25)
else:
if isinstance(e, MaxConnectionsError):
raise
# Hard force of reinitialize of the node/slots setup
# and try again with the new setup
await self.close()
raise
except ClusterDownError:
# ClusterDownError can occur during a failover and to get
# self-healed, we will try to reinitialize the cluster layout
Expand Down Expand Up @@ -1071,26 +1091,11 @@ async def initialize(self) -> None:
)
cluster_slots = await startup_node.execute_command("CLUSTER SLOTS")
startup_nodes_reachable = True
except (ConnectionError, TimeoutError) as e:
except Exception as e:
# Try the next startup node.
# The exception is saved and raised only if we have no more nodes.
exception = e
continue
except ResponseError as e:
# Isn't a cluster connection, so it won't parse these
# exceptions automatically
message = e.__str__()
if "CLUSTERDOWN" in message or "MASTERDOWN" in message:
continue
else:
raise RedisClusterException(
'ERROR sending "cluster slots" command to redis '
f"server: {startup_node}. error: {message}"
)
except Exception as e:
message = e.__str__()
raise RedisClusterException(
'ERROR sending "cluster slots" command to redis '
f"server {startup_node.name}. error: {message}"
)

# CLUSTER SLOTS command results in the following output:
# [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
Expand Down Expand Up @@ -1171,8 +1176,8 @@ async def initialize(self) -> None:

if not startup_nodes_reachable:
raise RedisClusterException(
"Redis Cluster cannot be connected. Please provide at least "
"one reachable node. "
f"Redis Cluster cannot be connected. Please provide at least "
f"one reachable node: {str(exception)}"
) from exception

# Check if the slots are not fully covered
Expand Down
8 changes: 7 additions & 1 deletion redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ def __init__(
retry_on_error.append(socket.timeout)
retry_on_error.append(asyncio.TimeoutError)
self.retry_on_error = retry_on_error
if retry_on_error:
if retry or retry_on_error:
chayim marked this conversation as resolved.
Show resolved Hide resolved
if not retry:
self.retry = Retry(NoBackoff(), 1)
else:
Expand Down Expand Up @@ -1426,6 +1426,12 @@ async def disconnect(self, inuse_connections: bool = True):
if exc:
raise exc

def set_retry(self, retry: "Retry") -> None:
    """Assign *retry* to every connection owned by this pool, free or in use."""
    all_connections = list(self._available_connections) + list(
        self._in_use_connections
    )
    for connection in all_connections:
        connection.retry = retry


class BlockingConnectionPool(ConnectionPool):
"""
Expand Down
17 changes: 13 additions & 4 deletions redis/backoff.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import random
from abc import ABC, abstractmethod

# Maximum backoff between each retry in seconds
DEFAULT_CAP = 0.512
barshaul marked this conversation as resolved.
Show resolved Hide resolved
# Minimum backoff between each retry in seconds
DEFAULT_BASE = 0.008


class AbstractBackoff(ABC):
"""Backoff interface"""
Expand Down Expand Up @@ -40,7 +45,7 @@ def __init__(self):
class ExponentialBackoff(AbstractBackoff):
"""Exponential backoff upon failure"""

def __init__(self, cap, base):
def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
Expand All @@ -55,7 +60,7 @@ def compute(self, failures):
class FullJitterBackoff(AbstractBackoff):
"""Full jitter backoff upon failure"""

def __init__(self, cap, base):
def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
Expand All @@ -70,7 +75,7 @@ def compute(self, failures):
class EqualJitterBackoff(AbstractBackoff):
"""Equal jitter backoff upon failure"""

def __init__(self, cap, base):
def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
Expand All @@ -86,7 +91,7 @@ def compute(self, failures):
class DecorrelatedJitterBackoff(AbstractBackoff):
"""Decorrelated jitter backoff upon failure"""

def __init__(self, cap, base):
def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
Expand All @@ -103,3 +108,7 @@ def compute(self, failures):
temp = random.uniform(self._base, max_backoff)
self._previous_backoff = min(self._cap, temp)
return self._previous_backoff


def default_backoff():
    """Build the library's default backoff strategy.

    Returns an :class:`EqualJitterBackoff` configured with the module-level
    DEFAULT_CAP / DEFAULT_BASE values.
    """
    return EqualJitterBackoff()
9 changes: 9 additions & 0 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import warnings
from itertools import chain
from typing import Optional

from redis.commands import (
CoreCommands,
Expand All @@ -24,6 +25,7 @@
WatchError,
)
from redis.lock import Lock
from redis.retry import Retry
from redis.utils import safe_str, str_if_bytes

SYM_EMPTY = b""
Expand Down Expand Up @@ -1043,6 +1045,13 @@ def get_connection_kwargs(self):
"""Get the connection's key-word arguments"""
return self.connection_pool.connection_kwargs

def get_retry(self) -> Optional["Retry"]:
    """Fetch the configured Retry object from the connection kwargs (None if unset)."""
    return self.get_connection_kwargs().get("retry", None)

def set_retry(self, retry: "Retry") -> None:
    """Install *retry* on the connection kwargs and on the pool's connections."""
    self.get_connection_kwargs().update({"retry": retry})
    pool = self.connection_pool
    pool.set_retry(retry)

def set_response_callback(self, command, callback):
"""Set a custom Response Callback"""
self.response_callbacks[command] = callback
Expand Down
Loading