Skip to content

Commit

Permalink
Fix dead weakref in sentinel connection causing ReferenceError (redis…
Browse files Browse the repository at this point in the history
…#2767) (redis#2771)

* Fix dead weakref in sentinel conn (redis#2767)

* Update CHANGES

---------

Co-authored-by: Igor Malinovskiy <[email protected]>
Co-authored-by: dvora-h <[email protected]>
  • Loading branch information
3 people authored and zach-iee committed Aug 21, 2023
1 parent 72e7716 commit 1a27e4d
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Fix dead weakref in sentinel connection causing ReferenceError (#2767)
* Fix #2768, Fix KeyError: 'first-entry' in parse_xinfo_stream.
* Fix #2749, remove unnecessary __del__ logic to close connections.
* Fix #2754, adding a missing argument to SentinelManagedConnection
Expand Down
88 changes: 63 additions & 25 deletions redis/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,54 @@ class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
pass


class SentinelConnectionPoolProxy:
def __init__(
self,
connection_pool,
is_master,
check_connection,
service_name,
sentinel_manager,
):
self.connection_pool_ref = weakref.ref(connection_pool)
self.is_master = is_master
self.check_connection = check_connection
self.service_name = service_name
self.sentinel_manager = sentinel_manager
self.reset()

def reset(self):
self.master_address = None
self.slave_rr_counter = None

def get_master_address(self):
master_address = self.sentinel_manager.discover_master(self.service_name)
if self.is_master and self.master_address != master_address:
self.master_address = master_address
# disconnect any idle connections so that they reconnect
# to the new master the next time that they are used.
connection_pool = self.connection_pool_ref()
if connection_pool is not None:
connection_pool.disconnect(inuse_connections=False)
return master_address

def rotate_slaves(self):
slaves = self.sentinel_manager.discover_slaves(self.service_name)
if slaves:
if self.slave_rr_counter is None:
self.slave_rr_counter = random.randint(0, len(slaves) - 1)
for _ in range(len(slaves)):
self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
slave = slaves[self.slave_rr_counter]
yield slave
# Fallback to the master connection
try:
yield self.get_master_address()
except MasterNotFoundError:
pass
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")


class SentinelConnectionPool(ConnectionPool):
"""
Sentinel backed connection pool.
Expand All @@ -95,8 +143,15 @@ def __init__(self, service_name, sentinel_manager, **kwargs):
)
self.is_master = kwargs.pop("is_master", True)
self.check_connection = kwargs.pop("check_connection", False)
self.proxy = SentinelConnectionPoolProxy(
connection_pool=self,
is_master=self.is_master,
check_connection=self.check_connection,
service_name=service_name,
sentinel_manager=sentinel_manager,
)
super().__init__(**kwargs)
self.connection_kwargs["connection_pool"] = weakref.proxy(self)
self.connection_kwargs["connection_pool"] = self.proxy
self.service_name = service_name
self.sentinel_manager = sentinel_manager

Expand All @@ -106,8 +161,11 @@ def __repr__(self):

def reset(self):
super().reset()
self.master_address = None
self.slave_rr_counter = None
self.proxy.reset()

@property
def master_address(self):
return self.proxy.master_address

def owns_connection(self, connection):
check = not self.is_master or (
Expand All @@ -117,31 +175,11 @@ def owns_connection(self, connection):
return check and parent.owns_connection(connection)

def get_master_address(self):
master_address = self.sentinel_manager.discover_master(self.service_name)
if self.is_master:
if self.master_address != master_address:
self.master_address = master_address
# disconnect any idle connections so that they reconnect
# to the new master the next time that they are used.
self.disconnect(inuse_connections=False)
return master_address
return self.proxy.get_master_address()

def rotate_slaves(self):
"Round-robin slave balancer"
slaves = self.sentinel_manager.discover_slaves(self.service_name)
if slaves:
if self.slave_rr_counter is None:
self.slave_rr_counter = random.randint(0, len(slaves) - 1)
for _ in range(len(slaves)):
self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
slave = slaves[self.slave_rr_counter]
yield slave
# Fallback to the master connection
try:
yield self.get_master_address()
except MasterNotFoundError:
pass
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
return self.proxy.rotate_slaves()


class Sentinel(SentinelCommands):
Expand Down
9 changes: 9 additions & 0 deletions tests/test_sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ def test_discover_master_error(sentinel):
sentinel.discover_master("xxx")


@pytest.mark.onlynoncluster
def test_dead_pool(sentinel):
master = sentinel.master_for("mymaster", db=9)
conn = master.connection_pool.get_connection("_")
conn.disconnect()
del master
conn.connect()


@pytest.mark.onlynoncluster
def test_discover_master_sentinel_down(cluster, sentinel, master_ip):
# Put first sentinel 'foo' down
Expand Down

0 comments on commit 1a27e4d

Please sign in to comment.