Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dynamic_startup_nodes parameter to async RedisCluster #3447

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Add dynamic_startup_nodes parameter to async RedisCluster (#2472)
* Move doctests (doc code examples) to main branch
* Update `ResponseT` type hint
* Allow to control the minimum SSL version
Expand Down
19 changes: 17 additions & 2 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
| Enable read from replicas in READONLY mode. You can read possibly stale data.
When set to true, read commands will be assigned between the primary and
its replications in a Round-Robin manner.
:param dynamic_startup_nodes:
| Set the RedisCluster's startup nodes to all the discovered nodes.
If true (default value), the cluster's discovered nodes will be used to
determine the cluster nodes-slots mapping in the next topology refresh.
It will remove the initial passed startup nodes if their endpoints aren't
listed in the CLUSTER SLOTS output.
If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
specific IP addresses, it is best to set it to false.
:param reinitialize_steps:
| Specifies the number of MOVED errors that need to occur before reinitializing
the whole cluster topology. If a MOVED error occurs and the cluster does not
Expand Down Expand Up @@ -233,6 +241,7 @@ def __init__(
startup_nodes: Optional[List["ClusterNode"]] = None,
require_full_coverage: bool = True,
read_from_replicas: bool = False,
dynamic_startup_nodes: bool = True,
reinitialize_steps: int = 5,
cluster_error_retry_attempts: int = 3,
connection_error_retry_attempts: int = 3,
Expand Down Expand Up @@ -370,6 +379,7 @@ def __init__(
startup_nodes,
require_full_coverage,
kwargs,
dynamic_startup_nodes=dynamic_startup_nodes,
address_remap=address_remap,
)
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
Expand Down Expand Up @@ -1093,6 +1103,7 @@ class NodesManager:
"require_full_coverage",
"slots_cache",
"startup_nodes",
"_dynamic_startup_nodes",
"address_remap",
)

Expand All @@ -1101,11 +1112,13 @@ def __init__(
startup_nodes: List["ClusterNode"],
require_full_coverage: bool,
connection_kwargs: Dict[str, Any],
dynamic_startup_nodes: bool = True,
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
) -> None:
self.startup_nodes = {node.name: node for node in startup_nodes}
self.require_full_coverage = require_full_coverage
self.connection_kwargs = connection_kwargs
self._dynamic_startup_nodes = dynamic_startup_nodes
self.address_remap = address_remap

self.default_node: "ClusterNode" = None
Expand Down Expand Up @@ -1338,8 +1351,10 @@ async def initialize(self) -> None:
# Set the tmp variables to the real variables
self.slots_cache = tmp_slots
self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)
# Populate the startup nodes with all discovered nodes
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)

if self._dynamic_startup_nodes:
# Populate the startup nodes with all discovered nodes
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)

# Set the default node
self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
Expand Down
21 changes: 21 additions & 0 deletions tests/test_asyncio/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2620,6 +2620,27 @@ def cmd_init_mock(self, r: ClusterNode) -> None:
assert rc.get_node(host=default_host, port=7001) is not None
assert rc.get_node(host=default_host, port=7002) is not None

@pytest.mark.parametrize("dynamic_startup_nodes", [True, False])
async def test_init_slots_dynamic_startup_nodes(self, dynamic_startup_nodes):
rc = await get_mocked_redis_client(
host="[email protected]",
port=7000,
cluster_slots=default_cluster_slots,
dynamic_startup_nodes=dynamic_startup_nodes,
)
# Nodes are taken from default_cluster_slots
discovered_nodes = [
"127.0.0.1:7000",
"127.0.0.1:7001",
"127.0.0.1:7002",
"127.0.0.1:7003",
]
startup_nodes = list(rc.nodes_manager.startup_nodes.keys())
if dynamic_startup_nodes is True:
assert startup_nodes.sort() == discovered_nodes.sort()
else:
assert startup_nodes == ["[email protected]:7000"]


class TestClusterPipeline:
"""Tests for the ClusterPipeline class."""
Expand Down