Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts #2225

Merged
merged 9 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* Added dynaminc_startup_nodes configuration to RedisCluster
* Fix reusing the old nodes' connections when cluster topology refresh is being done
* Fix RedisCluster to immediately raise AuthenticationError without a retry
* ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225)

* 4.1.3 (Feb 8, 2022)
* Fix flushdb and flushall (#1926)
* Add redis5 and redis4 dockers (#1871)
Expand Down
69 changes: 44 additions & 25 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1897,34 +1897,53 @@ def _send_cluster_commands(
# we figure out the slot number that command maps to, then from
# the slot determine the node.
for c in attempt:
# refer to our internal node -> slot table that
# tells us where a given
# command should route to.
passed_targets = c.options.pop("target_nodes", None)
if passed_targets and not self._is_nodes_flag(passed_targets):
target_nodes = self._parse_target_nodes(passed_targets)
else:
target_nodes = self._determine_nodes(*c.args, node_flag=passed_targets)
if not target_nodes:
connection_error_retry_counter = 0
while True:
# refer to our internal node -> slot table that
# tells us where a given command should route to.
# (it might be possible we have a cached node that no longer
# exists in the cluster, which is why we do this in a loop)
passed_targets = c.options.pop("target_nodes", None)
if passed_targets and not self._is_nodes_flag(passed_targets):
target_nodes = self._parse_target_nodes(passed_targets)
else:
target_nodes = self._determine_nodes(
*c.args, node_flag=passed_targets
)
if not target_nodes:
raise RedisClusterException(
f"No targets were found to execute {c.args} command on"
)
if len(target_nodes) > 1:
raise RedisClusterException(
f"No targets were found to execute {c.args} command on"
f"Too many targets for command {c.args}"
)
if len(target_nodes) > 1:
raise RedisClusterException(f"Too many targets for command {c.args}")

node = target_nodes[0]
# now that we know the name of the node
# ( it's just a string in the form of host:port )
# we can build a list of commands for each node.
node_name = node.name
if node_name not in nodes:
redis_node = self.get_redis_connection(node)
connection = get_connection(redis_node, c.args)
nodes[node_name] = NodeCommands(
redis_node.parse_response, redis_node.connection_pool, connection
)

nodes[node_name].append(c)
node = target_nodes[0]

# now that we know the name of the node
# ( it's just a string in the form of host:port )
# we can build a list of commands for each node.
node_name = node.name
if node_name not in nodes:
redis_node = self.get_redis_connection(node)
try:
connection = get_connection(redis_node, c.args)
except ConnectionError:
connection_error_retry_counter += 1
if connection_error_retry_counter < 5:
# reinitialize the node -> slot table
self.nodes_manager.initialize()
continue
else:
raise
nodes[node_name] = NodeCommands(
redis_node.parse_response,
redis_node.connection_pool,
connection,
)
nodes[node_name].append(c)
break

# send the commands in sequence.
# we write to all the open sockets for each node first,
Expand Down