Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
6e2f312
Split JedisClusterCommand into multiple methods
Jan 25, 2021
5a4fdbd
Drop redundant null check
Jan 25, 2021
fc3728a
Bump JDK version to 1.8
Jan 25, 2021
cdf56b2
Replace ConnectionGetters with lambdas
Jan 25, 2021
d99ef7b
Retrigger CI
Jan 25, 2021
8978ca5
Add backoff to Redis connections
Jan 28, 2021
85fa21c
Add unit tests for backoff logic
Jan 22, 2021
f8d09c2
Add retries logging
Jan 29, 2021
9c7ef1d
Always use the user requested timeout
Jan 28, 2021
9bce8eb
Remedy review feedback
Feb 2, 2021
67a062a
Consider connection exceptions and disregard random nodes
sazzad16 Feb 11, 2021
7e4abac
Revert "Consider connection exceptions and disregard random nodes"
Feb 11, 2021
eabf10b
Add another backoff test case
Feb 11, 2021
3223404
consider connection exceptions and disregard random nodes
sazzad16 Feb 10, 2021
fd17343
reset redirection
sazzad16 Feb 11, 2021
bf56639
Fix test failure
walles Feb 12, 2021
c7ae6b5
Merge pull request #3 from sazzad16/backoff-walles
Feb 12, 2021
569afe7
Merge branch 'master' into j/backoff
walles Feb 12, 2021
1638603
Apply suggestions from code review
Feb 12, 2021
68e8fdc
update documentation
sazzad16 Feb 12, 2021
c665dc1
Improve a comment
Feb 14, 2021
d9f2596
Update src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java
Feb 17, 2021
a23d602
Add change from another branch
Feb 25, 2021
6dd86db
Merge branch 'master' into j/backoff
walles Feb 25, 2021
fed69b0
Merge remote-tracking branch 'origin/master' into j/backoff
Feb 26, 2021
791180c
Move JedisClusterCommandTest out of commands package
sazzad16 Feb 26, 2021
4bf345b
Use JedisClusterOperationException
sazzad16 Feb 26, 2021
62a619e
Reduce sleep time, especially when few attempts left
sazzad16 Feb 26, 2021
f1c307d
Update src/main/java/redis/clients/jedis/JedisClusterCommand.java
sazzad16 Feb 26, 2021
8a9e0a8
Merge remote-tracking branch 'origin/master' into j/backoff
Mar 3, 2021
25c63a4
Merge branch 'master' into j/backoff
sazzad16 Mar 10, 2021
9b6242f
merge fix
sazzad16 Mar 10, 2021
73d74d3
Merge branch 'master' into j/backoff
gkorland Mar 18, 2021
d14174d
merge fix
sazzad16 Mar 20, 2021
af5d1f7
Merge remote-tracking branch 'redis/master' into j/backoff
sazzad16 Mar 20, 2021
ddd4038
Merge branch 'master' into j/backoff
sazzad16 Mar 25, 2021
7aa0b74
Merge branch 'master' into j/backoff
sazzad16 Mar 29, 2021
0ef36d3
Use maxAttempts
sazzad16 Mar 29, 2021
25303b7
format import
sazzad16 Mar 29, 2021
9e3fbcc
Re-add missing codes due to merge
sazzad16 Mar 29, 2021
882dd49
avoid NPE while zero max attempts
sazzad16 Mar 29, 2021
7430b9b
Remove zero attempts test
sazzad16 Mar 29, 2021
9eb8d58
More cluster constructors and customizability
sazzad16 Mar 29, 2021
27bce50
Use maxTotalRetriesDuration everywhere
sazzad16 Mar 29, 2021
b900a87
Merge remote-tracking branch 'redis/master' into j/backoff
sazzad16 Mar 31, 2021
4501b0d
more missing maxTotalRetriesDuration after merge
sazzad16 Mar 31, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@
<version>2.3.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.7.7</version>
<scope>test</scope>
</dependency>
</dependencies>

<distributionManagement>
Expand Down Expand Up @@ -129,8 +135,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ public BinaryJedis(final JedisSocketFactory jedisSocketFactory) {
client = new Client(jedisSocketFactory);
}

/**
 * Renders this instance as {@code BinaryJedis{...}}, embedding the string form of the
 * {@code client} field between the braces.
 */
@Override
public String toString() {
  final String clientText = String.valueOf(client);
  return "BinaryJedis{".concat(clientText).concat("}");
}

private void initializeClientFromURI(URI uri) {
initializeClientFromURI(uri, null, null, null);
}
Expand Down
448 changes: 228 additions & 220 deletions src/main/java/redis/clients/jedis/BinaryJedisCluster.java

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public Connection(final JedisSocketFactory jedisSocketFactory) {
this.jedisSocketFactory = jedisSocketFactory;
}

/**
 * Renders this connection as {@code Connection{...}}, embedding the string form of the
 * socket factory it was created with.
 */
@Override
public String toString() {
  final StringBuilder text = new StringBuilder("Connection{");
  text.append(String.valueOf(jedisSocketFactory));
  text.append('}');
  return text.toString();
}

public Socket getSocket() {
return socket;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,9 @@ public int getSoTimeout() {
public void setSoTimeout(int soTimeout) {
this.soTimeout = soTimeout;
}

/**
 * Renders this factory as {@code DefaultJedisSocketFactory{host:port}}.
 */
@Override
public String toString() {
  // The previous expression read `":" + +port`; the second '+' was an accidental unary plus.
  return "DefaultJedisSocketFactory{" + host + ":" + port + "}";
}
}
441 changes: 221 additions & 220 deletions src/main/java/redis/clients/jedis/JedisCluster.java

Large diffs are not rendered by default.

178 changes: 129 additions & 49 deletions src/main/java/redis/clients/jedis/JedisClusterCommand.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package redis.clients.jedis;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
Expand All @@ -11,18 +17,23 @@

public abstract class JedisClusterCommand<T> {

private static final Logger LOG = LoggerFactory.getLogger(JedisClusterCommand.class);

private final JedisClusterConnectionHandler connectionHandler;
private final int maxAttempts;
private final Duration timeout;

public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int maxAttempts) {
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int maxAttempts,
Duration timeout) {
this.connectionHandler = connectionHandler;
this.maxAttempts = maxAttempts;
this.timeout = timeout;
}

public abstract T execute(Jedis connection);

public T run(String key) {
return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
return runWithRetries(JedisClusterCRC16.getSlot(key));
}

public T run(int keyCount, String... keys) {
Expand All @@ -42,11 +53,11 @@ public T run(int keyCount, String... keys) {
}
}

return runWithRetries(slot, this.maxAttempts, false, null);
return runWithRetries(slot);
}

public T runBinary(byte[] key) {
return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
return runWithRetries(JedisClusterCRC16.getSlot(key));
}

public T runBinary(int keyCount, byte[]... keys) {
Expand All @@ -66,7 +77,7 @@ public T runBinary(int keyCount, byte[]... keys) {
}
}

return runWithRetries(slot, this.maxAttempts, false, null);
return runWithRetries(slot);
}

public T runWithAnyNode() {
Expand All @@ -79,64 +90,133 @@ public T runWithAnyNode() {
}
}

private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
if (attempts <= 0) {
throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");
/**
 * Tells whether enough attempts have been used up that retries should start backing off.
 *
 * @param attemptsLeft number of attempts still available
 * @return {@code true} once the attempts already made reach one third (integer division) of
 *         {@code maxAttempts}
 */
private boolean shouldBackOff(int attemptsLeft) {
  final int backOffThreshold = maxAttempts / 3;
  final int attemptsUsed = maxAttempts - attemptsLeft;
  return attemptsUsed >= backOffThreshold;
}

private static long getBackoffSleepMillis(int attemptsLeft, Instant deadline) {
if (attemptsLeft <= 0) {
return 0;
}

Jedis connection = null;
try {
long millisLeft = Duration.between(Instant.now(), deadline).toMillis();
if (millisLeft < 0) {
throw new JedisClusterMaxAttemptsException("Deadline exceeded");
}

if (redirect != null) {
connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
if (redirect instanceof JedisAskDataException) {
// TODO: Pipeline asking with the original command to make it faster....
connection.asking();
}
} else {
if (tryRandomNode) {
connection = connectionHandler.getConnection();
return millisLeft / (attemptsLeft * attemptsLeft);
}

private T runWithRetries(final int slot) {
Instant deadline = Instant.now().plus(timeout);
Supplier<Jedis> connectionSupplier = () -> connectionHandler.getConnectionFromSlot(slot);

// If we got one redirection, stick with that and don't try anything else
Supplier<Jedis> redirectionSupplier = null;

for (int currentAttempt = 0; currentAttempt < this.maxAttempts; currentAttempt++) {
Jedis connection = null;
try {
if (redirectionSupplier != null) {
connection = redirectionSupplier.get();
} else {
connection = connectionHandler.getConnectionFromSlot(slot);
connection = connectionSupplier.get();
}
}

return execute(connection);
if (shouldBackOff(maxAttempts - currentAttempt)) {
// Don't just stick to this any more, start asking around
redirectionSupplier = null;
}

} catch (JedisNoReachableClusterNodeException jnrcne) {
throw jnrcne;
} catch (JedisConnectionException jce) {
// release current connection before recursion
releaseConnection(connection);
connection = null;

if (attempts <= 1) {
//We need this because if node is not reachable anymore - we need to finally initiate slots
//renewing, or we can stuck with cluster state without one node in opposite case.
//But now if maxAttempts = [1 or 2] we will do it too often.
//TODO make tracking of successful/unsuccessful operations for node - do renewing only
//if there were no successful responses from this node last few seconds
this.connectionHandler.renewSlotCache();
final T result = execute(connection);
if (currentAttempt > 0) {
LOG.info("Success after {} attempts", currentAttempt + 1);
}
return result;
} catch (JedisNoReachableClusterNodeException e) {
throw e;
} catch (JedisConnectionException e) {
LOG.warn("Failed connecting to Redis: {}", connection, e);
// "- 1" because we just did one, but the currentAttempt counter hasn't increased yet
int attemptsLeft = maxAttempts - currentAttempt - 1;
connectionSupplier = handleConnectionProblem(connection, slot, attemptsLeft, deadline);
} catch (JedisRedirectionException e) {
redirectionSupplier = handleRedirection(connection, e);
} finally {
releaseConnection(connection);
}

return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
} catch (JedisRedirectionException jre) {
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache recommended by Redis cluster specification
this.connectionHandler.renewSlotCache(connection);
}
LOG.info("{} retries left...", maxAttempts - currentAttempt - 1);
}

// release current connection before recursion
releaseConnection(connection);
connection = null;
throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");
}

return runWithRetries(slot, attempts - 1, false, jre);
} finally {
releaseConnection(connection);
/**
 * Pauses the calling thread for the given back-off time before the next retry.
 * NOTE(review): presumably {@code protected} so tests can override it -- confirm.
 *
 * @param sleepMillis how long to sleep, in milliseconds
 * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
 *         interrupted while sleeping; the thread's interrupt flag is restored first
 */
protected void sleep(long sleepMillis) {
  try {
    LOG.info("Backing off, sleeping {}ms before trying again...", sleepMillis);
    TimeUnit.MILLISECONDS.sleep(sleepMillis);
  } catch (InterruptedException e) {
    // Catching InterruptedException clears the interrupt flag; set it again so that code
    // further up the stack can still see that this thread was asked to stop.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }
}

/**
 * Decides where the next attempt gets its connection from after a connection failure.
 *
 * <p>While {@link #shouldBackOff(int)} is false, the next attempt simply asks for the slot's
 * connection again. Once backing off, the failed connection is released, the slot cache is
 * renewed, and the next attempt sleeps and then picks a random node.
 *
 * @param failedConnection the connection that just failed; may be {@code null}
 * @param slot the slot the command is addressed to
 * @param attemptsLeft number of attempts remaining after the failed one
 * @param doneDeadline point in time by which all retries must be finished
 * @return supplier that yields the connection to use for the next attempt
 */
private Supplier<Jedis> handleConnectionProblem(Jedis failedConnection, final int slot, int attemptsLeft,
    Instant doneDeadline) {
  if (shouldBackOff(attemptsLeft)) {
    // The failed connection has to go back before the slot cache is renewed; otherwise
    // JedisClusterTest.testReturnConnectionOnJedisClusterConnection fails intermittently.
    releaseConnection(failedConnection);

    // An unreachable node must eventually trigger a slot renewal, or the cached cluster
    // state would keep pointing at the missing node.
    // TODO make tracking of successful/unsuccessful operations for node - do renewing only
    // if there were no successful responses from this node last few seconds
    this.connectionHandler.renewSlotCache();

    return () -> {
      final long backoffMillis = getBackoffSleepMillis(attemptsLeft, doneDeadline);
      sleep(backoffMillis);
      // Any node will do; it redirects us if it does not own the slot
      LOG.info("Retrying with a random node...");
      Jedis randomPick = connectionHandler.getConnection();
      LOG.info("Retrying with random pick: {}", randomPick);
      return randomPick;
    };
  }

  // Not backing off yet: just ask for the slot's connection once more
  return () -> {
    Jedis slotConnection = connectionHandler.getConnectionFromSlot(slot);
    LOG.info("Retrying with {}", slotConnection);
    return slotConnection;
  };
}

/**
 * Reacts to a redirection reported by the server and prepares the connection source for the
 * next attempt.
 *
 * <p>On a MOVED redirection the slot cache is renewed through the redirected connection. The
 * redirected connection is then released, and the returned supplier connects to the
 * redirection target (sending ASKING first for an ASK redirection).
 *
 * @param connection the connection on which the redirection was received
 * @param jre the redirection reported by the server
 * @return supplier that yields a connection to the redirection target
 */
private Supplier<Jedis> handleRedirection(Jedis connection, final JedisRedirectionException jre) {
  LOG.debug("Redirected by server to {}", jre.getTargetNode());

  // if MOVED redirection occurred,
  if (jre instanceof JedisMovedDataException) {
    // it rebuilds cluster's slot cache recommended by Redis cluster specification
    this.connectionHandler.renewSlotCache(connection);
  }

  // release current connection before iteration
  releaseConnection(connection);

  return () -> {
    Jedis redirectedConnection = connectionHandler.getConnectionFromNode(jre.getTargetNode());
    // Log the connection about to be used, not the already released one that was redirected
    LOG.info("Retrying with redirection target {}", redirectedConnection);
    if (jre instanceof JedisAskDataException) {
      // TODO: Pipeline asking with the original command to make it faster....
      redirectedConnection.asking();
    }

    return redirectedConnection;
  };
}

private void releaseConnection(Jedis connection) {
if (connection != null) {
connection.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,11 @@ public void testReturnConnectionOnJedisConnectionException() throws InterruptedE
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7379));
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(1);
JedisCluster jc = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT,

// Otherwise the test can time out before we are done
int shorterThanTheTestTimeoutMs = DEFAULT_TIMEOUT / 2;

JedisCluster jc = new JedisCluster(jedisClusterNode, shorterThanTheTestTimeoutMs, shorterThanTheTestTimeoutMs,
DEFAULT_REDIRECTIONS, "cluster", config);

Jedis j = jc.getClusterNodes().get("127.0.0.1:7380").getResource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void connectToNodesFailsWithSSLParametersAndNoHostMapping() {
null, sslParameters, null, portMap)){
jc.get("foo");
Assert.fail("The code did not throw the expected JedisClusterMaxAttemptsException.");
} catch (JedisClusterMaxAttemptsException e) {
} catch (JedisNoReachableClusterNodeException e) {
// initial connection to localhost works, but subsequent connections to nodes use 127.0.0.1
// and fail hostname verification
}
Expand Down Expand Up @@ -159,7 +159,7 @@ public void connectWithCustomHostNameVerifier() {
null, null, hostnameVerifier, portMap)){
jc.get("foo");
Assert.fail("The code did not throw the expected JedisClusterMaxAttemptsException.");
} catch (JedisClusterMaxAttemptsException e) {
} catch (JedisNoReachableClusterNodeException e) {
// initial connection made with 'localhost' but subsequent connections to nodes use 127.0.0.1
// which causes custom hostname verification to fail
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException;
import redis.clients.jedis.tests.SSLJedisTest.BasicHostnameVerifier;
import redis.clients.jedis.tests.utils.RedisVersionUtil;

Expand Down Expand Up @@ -103,7 +104,7 @@ public void connectToNodesFailsWithSSLParametersAndNoHostMapping() {
null, sslParameters, null, portMap)){
jc.get("foo");
Assert.fail("The code did not throw the expected JedisClusterMaxAttemptsException.");
} catch (JedisClusterMaxAttemptsException e) {
} catch (JedisNoReachableClusterNodeException e) {
// initial connection to localhost works, but subsequent connections to nodes use 127.0.0.1
// and fail hostname verification
}
Expand Down Expand Up @@ -147,7 +148,7 @@ public void connectWithCustomHostNameVerifier() {
null, null, hostnameVerifier, portMap)){
jc.get("foo");
Assert.fail("The code did not throw the expected JedisClusterMaxAttemptsException.");
} catch (JedisClusterMaxAttemptsException e) {
} catch (JedisNoReachableClusterNodeException e) {
// initial connection made with 'localhost' but subsequent connections to nodes use 127.0.0.1
// which causes custom hostname verification to fail
}
Expand Down
Loading