Skip to content

Commit 5924368

Browse files
committed
Adding SMIGRATED handling
1 parent da185bc commit 5924368

File tree

8 files changed

+1067
-152
lines changed

8 files changed

+1067
-152
lines changed

redis/_parsers/base.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
OSSNodeMigratedNotification,
1515
OSSNodeMigratingNotification,
1616
)
17+
from redis.utils import safe_str
1718

1819
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
1920
from asyncio import timeout as async_timeout
@@ -194,8 +195,9 @@ def parse_oss_maintenance_completed_msg(response):
194195
# Expected message format is:
195196
# SMIGRATED <seq_number> <host:port> <slot, range1-range2,...>
196197
id = response[1]
197-
node_address = response[2]
198+
node_address = safe_str(response[2])
198199
slots = response[3]
200+
199201
return OSSNodeMigratedNotification(id, node_address, slots)
200202

201203
@staticmethod
@@ -225,9 +227,7 @@ def parse_moving_msg(response):
225227
if response[3] is None:
226228
host, port = None, None
227229
else:
228-
value = response[3]
229-
if isinstance(value, bytes):
230-
value = value.decode()
230+
value = safe_str(response[3])
231231
host, port = value.split(":")
232232
port = int(port) if port is not None else None
233233

redis/client.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
from redis.lock import Lock
5959
from redis.maint_notifications import (
6060
MaintNotificationsConfig,
61+
OSSMaintNotificationsHandler,
6162
)
6263
from redis.retry import Retry
6364
from redis.utils import (
@@ -250,6 +251,9 @@ def __init__(
250251
cache_config: Optional[CacheConfig] = None,
251252
event_dispatcher: Optional[EventDispatcher] = None,
252253
maint_notifications_config: Optional[MaintNotificationsConfig] = None,
254+
oss_cluster_maint_notifications_handler: Optional[
255+
OSSMaintNotificationsHandler
256+
] = None,
253257
) -> None:
254258
"""
255259
Initialize a new Redis client.
@@ -288,6 +292,11 @@ def __init__(
288292
will be enabled by default (logic is included in the connection pool
289293
initialization).
290294
Argument is ignored when connection_pool is provided.
295+
oss_cluster_maint_notifications_handler:
296+
handler for OSS cluster notifications - see
297+
`redis.maint_notifications.OSSMaintNotificationsHandler` for details.
298+
Only supported with RESP3
299+
Argument is ignored when connection_pool is provided.
291300
"""
292301
if event_dispatcher is None:
293302
self._event_dispatcher = EventDispatcher()
@@ -380,6 +389,12 @@ def __init__(
380389
"maint_notifications_config": maint_notifications_config,
381390
}
382391
)
392+
if oss_cluster_maint_notifications_handler:
393+
kwargs.update(
394+
{
395+
"oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
396+
}
397+
)
383398
connection_pool = ConnectionPool(**kwargs)
384399
self._event_dispatcher.dispatch(
385400
AfterPooledConnectionsInstantiationEvent(

redis/cluster.py

Lines changed: 118 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@
5252
WatchError,
5353
)
5454
from redis.lock import Lock
55-
from redis.maint_notifications import MaintNotificationsConfig
55+
from redis.maint_notifications import (
56+
MaintNotificationsConfig,
57+
OSSMaintNotificationsHandler,
58+
)
5659
from redis.retry import Retry
5760
from redis.utils import (
5861
deprecated_args,
@@ -214,6 +217,62 @@ def cleanup_kwargs(**kwargs):
214217
return connection_kwargs
215218

216219

220+
class MaintNotificationsAbstractRedisCluster:
221+
"""
222+
Abstract class for handling maintenance notifications logic.
223+
This class is expected to be used as base class together with RedisCluster.
224+
225+
This class is intended to be used with multiple inheritance!
226+
227+
All logic related to maintenance notifications is encapsulated in this class.
228+
"""
229+
230+
def __init__(
231+
self,
232+
maint_notifications_config: Optional[MaintNotificationsConfig],
233+
**kwargs,
234+
):
235+
# Initialize maintenance notifications
236+
is_protocol_supported = kwargs.get("protocol") in [3, "3"]
237+
if maint_notifications_config is None and is_protocol_supported:
238+
maint_notifications_config = MaintNotificationsConfig()
239+
240+
self.maint_notifications_config = maint_notifications_config
241+
242+
if maint_notifications_config and maint_notifications_config.enabled:
243+
if not is_protocol_supported:
244+
raise RedisError(
245+
"Maintenance notifications handlers on connection are only supported with RESP version 3"
246+
)
247+
self._oss_cluster_maint_notifications_handler = (
248+
OSSMaintNotificationsHandler(self, maint_notifications_config)
249+
)
250+
# Update connection kwargs for all future nodes connections
251+
self._update_connection_kwargs_for_maint_notifications(
252+
self._oss_cluster_maint_notifications_handler
253+
)
254+
# Update existing nodes connections - they are created as part of the RedsiCluster constructor
255+
for node in self.get_nodes():
256+
node.redis_connection.connection_pool.update_maint_notifications_config(
257+
self.maint_notifications_config,
258+
oss_cluster_maint_notifications_handler=self._oss_cluster_maint_notifications_handler,
259+
)
260+
else:
261+
self._oss_cluster_maint_notifications_handler = None
262+
263+
def _update_connection_kwargs_for_maint_notifications(
264+
self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
265+
):
266+
"""
267+
Update the connection kwargs for all future connections.
268+
"""
269+
self.nodes_manager.connection_kwargs.update(
270+
{
271+
"oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
272+
}
273+
)
274+
275+
217276
class AbstractRedisCluster:
218277
RedisClusterRequestTTL = 16
219278

@@ -461,7 +520,9 @@ def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
461520
self.nodes_manager.default_node = random.choice(replicas)
462521

463522

464-
class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
523+
class RedisCluster(
524+
AbstractRedisCluster, MaintNotificationsAbstractRedisCluster, RedisClusterCommands
525+
):
465526
@classmethod
466527
def from_url(cls, url, **kwargs):
467528
"""
@@ -612,8 +673,7 @@ def __init__(
612673
`redis.maint_notifications.MaintNotificationsConfig` for details.
613674
Only supported with RESP3.
614675
If not provided and protocol is RESP3, the maintenance notifications
615-
will be enabled by default (logic is included in the NodesManager
616-
initialization).
676+
will be enabled by default.
617677
:**kwargs:
618678
Extra arguments that will be sent into Redis instance when created
619679
(See Official redis-py doc for supported kwargs - the only limitation
@@ -698,6 +758,13 @@ def __init__(
698758
if (cache_config or cache) and protocol not in [3, "3"]:
699759
raise RedisError("Client caching is only supported with RESP version 3")
700760

761+
if maint_notifications_config and protocol not in [3, "3"]:
762+
raise RedisError(
763+
"Maintenance notifications are only supported with RESP version 3"
764+
)
765+
if protocol in [3, "3"] and maint_notifications_config is None:
766+
maint_notifications_config = MaintNotificationsConfig()
767+
701768
self.command_flags = self.__class__.COMMAND_FLAGS.copy()
702769
self.node_flags = self.__class__.NODE_FLAGS.copy()
703770
self.read_from_replicas = read_from_replicas
@@ -709,6 +776,7 @@ def __init__(
709776
else:
710777
self._event_dispatcher = event_dispatcher
711778
self.startup_nodes = startup_nodes
779+
712780
self.nodes_manager = NodesManager(
713781
startup_nodes=startup_nodes,
714782
from_url=from_url,
@@ -763,6 +831,10 @@ def __init__(
763831
self._aggregate_nodes = None
764832
self._lock = threading.RLock()
765833

834+
MaintNotificationsAbstractRedisCluster.__init__(
835+
self, maint_notifications_config, **kwargs
836+
)
837+
766838
def __enter__(self):
767839
return self
768840

@@ -1632,9 +1704,7 @@ def __init__(
16321704
cache_config: Optional[CacheConfig] = None,
16331705
cache_factory: Optional[CacheFactoryInterface] = None,
16341706
event_dispatcher: Optional[EventDispatcher] = None,
1635-
maint_notifications_config: Optional[
1636-
MaintNotificationsConfig
1637-
] = MaintNotificationsConfig(),
1707+
maint_notifications_config: Optional[MaintNotificationsConfig] = None,
16381708
**kwargs,
16391709
):
16401710
self.nodes_cache: Dict[str, Redis] = {}
@@ -1879,11 +1949,29 @@ def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
18791949

18801950
return target_node
18811951

1882-
def initialize(self):
1952+
def initialize(
1953+
self,
1954+
additional_startup_nodes_info: List[Tuple[str, int]] = [],
1955+
disconect_startup_nodes_pools: bool = True,
1956+
):
18831957
"""
18841958
Initializes the nodes cache, slots cache and redis connections.
18851959
:startup_nodes:
18861960
Responsible for discovering other nodes in the cluster
1961+
:disconect_startup_nodes_pools:
1962+
Whether to disconnect the connection pool of the startup nodes
1963+
after the initialization is complete. This is useful when the
1964+
startup nodes are not part of the cluster and we want to avoid
1965+
keeping the connection open.
1966+
:additional_startup_nodes_info:
1967+
Additional nodes to add temporarily to the startup nodes.
1968+
The additional nodes will be used just in the process of extraction of the slots
1969+
and nodes information from the cluster.
1970+
This is useful when we want to add new nodes to the cluster
1971+
and initialize the client
1972+
with them.
1973+
The format of the list is a list of tuples, where each tuple contains
1974+
the host and port of the node.
18871975
"""
18881976
self.reset()
18891977
tmp_nodes_cache = {}
@@ -1893,9 +1981,25 @@ def initialize(self):
18931981
fully_covered = False
18941982
kwargs = self.connection_kwargs
18951983
exception = None
1984+
1985+
# Create cache if it's not provided and cache config is set
1986+
# should be done before initializing the first connection
1987+
# so that it will be applied to all connections
1988+
if self._cache is None and self._cache_config is not None:
1989+
if self._cache_factory is None:
1990+
self._cache = CacheFactory(self._cache_config).get_cache()
1991+
else:
1992+
self._cache = self._cache_factory.get_cache()
1993+
1994+
additional_startup_nodes = [
1995+
ClusterNode(host, port) for host, port in additional_startup_nodes_info
1996+
]
18961997
# Convert to tuple to prevent RuntimeError if self.startup_nodes
18971998
# is modified during iteration
1898-
for startup_node in tuple(self.startup_nodes.values()):
1999+
for startup_node in (
2000+
*self.startup_nodes.values(),
2001+
*additional_startup_nodes,
2002+
):
18992003
try:
19002004
if startup_node.redis_connection:
19012005
r = startup_node.redis_connection
@@ -1911,7 +2015,11 @@ def initialize(self):
19112015
# Make sure cluster mode is enabled on this node
19122016
try:
19132017
cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
1914-
r.connection_pool.disconnect()
2018+
if disconect_startup_nodes_pools:
2019+
# Disconnect the connection pool to avoid keeping the connection open
2020+
# For some cases we might not want to disconnect current pool and
2021+
# lose in flight commands responses
2022+
r.connection_pool.disconnect()
19152023
except ResponseError:
19162024
raise RedisClusterException(
19172025
"Cluster mode is not enabled on this node"
@@ -1992,12 +2100,6 @@ def initialize(self):
19922100
f"one reachable node: {str(exception)}"
19932101
) from exception
19942102

1995-
if self._cache is None and self._cache_config is not None:
1996-
if self._cache_factory is None:
1997-
self._cache = CacheFactory(self._cache_config).get_cache()
1998-
else:
1999-
self._cache = self._cache_factory.get_cache()
2000-
20012103
# Create Redis connections to all nodes
20022104
self.create_redis_connections(list(tmp_nodes_cache.values()))
20032105

0 commit comments

Comments
 (0)