From 52ce399c01d3d70ffb9cfaa53f442aa3d9384af1 Mon Sep 17 00:00:00 2001 From: c00887447 Date: Fri, 28 Mar 2025 02:04:27 +0800 Subject: [PATCH 1/3] Improves the performance of obtaining write connections. Signed-off-by: c00887447 --- .../PooledClusterConnectionProvider.java | 63 +++++++++---------- 1 file changed, 30 insertions(+), 33 deletions(-) diff --git a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java index 56ece5d733..806bd12a1e 100644 --- a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java +++ b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java @@ -157,47 +157,44 @@ public CompletableFuture> getConnectionAsync(Conne } private CompletableFuture> getWriteConnection(int slot) { + if (writers[slot] == null) { + stateLock.lock(); + try { + if (writers[slot] == null) { + RedisClusterNode master = partitions.getMasterBySlot(slot); + if (master == null) { + clusterEventListener.onUncoveredSlot(slot); + return Futures.failed(new PartitionSelectorException( + "Cannot determine a partition for slot " + slot + ".", partitions.clone())); + } - CompletableFuture> writer;// avoid races when reconfiguring partitions. + // Use always host and port for slot-oriented operations. We don't want to get reconnected on a different + // host because the nodeId can be handled by a different host. + RedisURI uri = master.getUri(); + ConnectionKey key = new ConnectionKey(ConnectionIntent.WRITE, uri.getHost(), uri.getPort()); - stateLock.lock(); - try { - writer = writers[slot]; - } finally { - stateLock.unlock(); - } + ConnectionFuture> future = getConnectionAsync(key); - if (writer == null) { - RedisClusterNode master = partitions.getMasterBySlot(slot); - if (master == null) { - clusterEventListener.onUncoveredSlot(slot); - return Futures.failed(new PartitionSelectorException("Cannot determine a partition for slot " + slot + ".", - partitions.clone())); - } + return future.thenApply(connection -> { - // Use always host and port for slot-oriented operations. We don't want to get reconnected on a different - // host because the nodeId can be handled by a different host. - RedisURI uri = master.getUri(); - ConnectionKey key = new ConnectionKey(ConnectionIntent.WRITE, uri.getHost(), uri.getPort()); - - ConnectionFuture> future = getConnectionAsync(key); - - return future.thenApply(connection -> { + stateLock.lock(); + try { + if (writers[slot] == null) { + writers[slot] = CompletableFuture.completedFuture(connection); + } + } finally { + stateLock.unlock(); + } - stateLock.lock(); - try { - if (writers[slot] == null) { - writers[slot] = CompletableFuture.completedFuture(connection); - } - } finally { - stateLock.unlock(); + return connection; + }).toCompletableFuture(); } - - return connection; - }).toCompletableFuture(); + } finally { + stateLock.unlock(); + } } - return writer; + return writers[slot]; } private CompletableFuture> getReadConnection(int slot) { From 206dc5b31fbffe95f0c925875be0d64085cee007 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Fri, 9 May 2025 18:09:58 +0300 Subject: [PATCH 2/3] Polishing --- .../PooledClusterConnectionProvider.java | 57 ++++++++----------- 1 file changed, 25 insertions(+), 32 deletions(-) diff --git a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java index 806bd12a1e..c38872ce70 100644 --- a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java +++ b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java @@ -74,7 +74,6 @@ class PooledClusterConnectionProvider private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledClusterConnectionProvider.class); - // Contains NodeId-identified and HostAndPort-identified connections. private final Lock stateLock = new ReentrantLock(); private final boolean debugEnabled = logger.isDebugEnabled(); @@ -157,44 +156,39 @@ public CompletableFuture> getConnectionAsync(Conne } private CompletableFuture> getWriteConnection(int slot) { - if (writers[slot] == null) { - stateLock.lock(); - try { - if (writers[slot] == null) { - RedisClusterNode master = partitions.getMasterBySlot(slot); - if (master == null) { - clusterEventListener.onUncoveredSlot(slot); - return Futures.failed(new PartitionSelectorException( - "Cannot determine a partition for slot " + slot + ".", partitions.clone())); - } - // Use always host and port for slot-oriented operations. We don't want to get reconnected on a different - // host because the nodeId can be handled by a different host. - RedisURI uri = master.getUri(); - ConnectionKey key = new ConnectionKey(ConnectionIntent.WRITE, uri.getHost(), uri.getPort()); + CompletableFuture> writer = writers[slot]; + if (writer != null) { + return writer; + } - ConnectionFuture> future = getConnectionAsync(key); + RedisClusterNode master = partitions.getMasterBySlot(slot); + if (master == null) { + clusterEventListener.onUncoveredSlot(slot); + return Futures.failed( + new PartitionSelectorException("Cannot determine a partition for slot " + slot + ".", partitions.clone())); + } - return future.thenApply(connection -> { + // Use always host and port for slot-oriented operations. We don't want to get reconnected on a different + // host because the nodeId can be handled by a different host. + RedisURI uri = master.getUri(); + ConnectionKey key = new ConnectionKey(ConnectionIntent.WRITE, uri.getHost(), uri.getPort()); - stateLock.lock(); - try { - if (writers[slot] == null) { - writers[slot] = CompletableFuture.completedFuture(connection); - } - } finally { - stateLock.unlock(); - } + ConnectionFuture> future = getConnectionAsync(key); + + return future.thenApply(connection -> { - return connection; - }).toCompletableFuture(); + stateLock.lock(); + try { + if (writers[slot] == null) { + writers[slot] = CompletableFuture.completedFuture(connection); } } finally { stateLock.unlock(); } - } - return writers[slot]; + return connection; + }).toCompletableFuture(); } private CompletableFuture> getReadConnection(int slot) { @@ -651,7 +645,6 @@ public ReadFrom getReadFrom() { } /** - * * @return number of connections. */ long getConnectionCount() { @@ -682,8 +675,8 @@ private static RuntimeException connectionAttemptRejected(String message) { } private boolean validateClusterNodeMembership() { - return redisClusterClient.getClusterClientOptions() == null - || redisClusterClient.getClusterClientOptions().isValidateClusterNodeMembership(); + return redisClusterClient.getClusterClientOptions() == null || redisClusterClient.getClusterClientOptions() + .isValidateClusterNodeMembership(); } /** From 6fd5e440c5766a5881f710ba23055df500e5c8f1 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Fri, 9 May 2025 18:11:26 +0300 Subject: [PATCH 3/3] Formatter carelessly forgotten --- .../lettuce/core/cluster/PooledClusterConnectionProvider.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java index c38872ce70..a82dee16c3 100644 --- a/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java +++ b/src/main/java/io/lettuce/core/cluster/PooledClusterConnectionProvider.java @@ -675,8 +675,8 @@ private static RuntimeException connectionAttemptRejected(String message) { } private boolean validateClusterNodeMembership() { - return redisClusterClient.getClusterClientOptions() == null || redisClusterClient.getClusterClientOptions() - .isValidateClusterNodeMembership(); + return redisClusterClient.getClusterClientOptions() == null + || redisClusterClient.getClusterClientOptions().isValidateClusterNodeMembership(); } /**