-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Retry with backoff on cluster connection failures #2358
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
6e2f312
5a4fdbd
fc3728a
cdf56b2
d99ef7b
8978ca5
85fa21c
f8d09c2
9c7ef1d
9bce8eb
67a062a
7e4abac
eabf10b
3223404
fd17343
bf56639
c7ae6b5
569afe7
1638603
68e8fdc
c665dc1
d9f2596
a23d602
6dd86db
fed69b0
791180c
4bf345b
62a619e
f1c307d
8a9e0a8
25c63a4
9b6242f
73d74d3
d14174d
af5d1f7
ddd4038
7aa0b74
0ef36d3
25303b7
9e3fbcc
882dd49
7430b9b
9eb8d58
27bce50
b900a87
4501b0d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,11 @@ | ||
| package redis.clients.jedis; | ||
|
|
||
| import java.time.Duration; | ||
| import java.time.Instant; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.function.Supplier; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
| import redis.clients.jedis.exceptions.JedisAskDataException; | ||
sazzad16 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException; | ||
| import redis.clients.jedis.exceptions.JedisClusterOperationException; | ||
|
|
@@ -11,18 +17,33 @@ | |
|
|
||
| public abstract class JedisClusterCommand<T> { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(JedisClusterCommand.class); | ||
|
|
||
| private static final Duration DEFAULT_MAX_TOTAL_RETRIES_DURATION = Duration.ofMillis( | ||
| BinaryJedisCluster.DEFAULT_TIMEOUT * BinaryJedisCluster.DEFAULT_MAX_ATTEMPTS); | ||
|
|
||
sazzad16 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| private final JedisClusterConnectionHandler connectionHandler; | ||
| private final int maxAttempts; | ||
| private final Duration maxTotalRetriesDuration; | ||
|
|
||
| public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int maxAttempts) { | ||
| this(connectionHandler, maxAttempts, DEFAULT_MAX_TOTAL_RETRIES_DURATION); | ||
sazzad16 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| /** | ||
| * @param maxTotalRetriesDuration No more attempts after we have been trying for this long. | ||
| */ | ||
| public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int maxAttempts, | ||
| Duration maxTotalRetriesDuration) { | ||
| this.connectionHandler = connectionHandler; | ||
| this.maxAttempts = maxAttempts; | ||
| this.maxTotalRetriesDuration = maxTotalRetriesDuration; | ||
| } | ||
|
|
||
| public abstract T execute(Jedis connection); | ||
|
|
||
| public T run(String key) { | ||
| return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null); | ||
| return runWithRetries(JedisClusterCRC16.getSlot(key)); | ||
| } | ||
|
|
||
| public T run(int keyCount, String... keys) { | ||
|
|
@@ -42,11 +63,11 @@ public T run(int keyCount, String... keys) { | |
| } | ||
| } | ||
|
|
||
| return runWithRetries(slot, this.maxAttempts, false, null); | ||
| return runWithRetries(slot); | ||
| } | ||
|
|
||
| public T runBinary(byte[] key) { | ||
| return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null); | ||
| return runWithRetries(JedisClusterCRC16.getSlot(key)); | ||
| } | ||
|
|
||
| public T runBinary(int keyCount, byte[]... keys) { | ||
|
|
@@ -66,7 +87,7 @@ public T runBinary(int keyCount, byte[]... keys) { | |
| } | ||
| } | ||
|
|
||
| return runWithRetries(slot, this.maxAttempts, false, null); | ||
| return runWithRetries(slot); | ||
| } | ||
|
|
||
| public T runWithAnyNode() { | ||
|
|
@@ -79,62 +100,131 @@ public T runWithAnyNode() { | |
| } | ||
| } | ||
|
|
||
| private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) { | ||
| if (attempts <= 0) { | ||
| throw new JedisClusterMaxAttemptsException("No more cluster attempts left."); | ||
| private boolean shouldBackOff(int attemptsLeft) { | ||
| int attemptsDone = maxAttempts - attemptsLeft; | ||
| return attemptsDone >= maxAttempts / 3; | ||
| } | ||
sazzad16 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| private static long getBackoffSleepMillis(int attemptsLeft, Instant deadline) { | ||
| if (attemptsLeft <= 0) { | ||
| return 0; | ||
| } | ||
|
|
||
| Jedis connection = null; | ||
| try { | ||
| long millisLeft = Duration.between(Instant.now(), deadline).toMillis(); | ||
| if (millisLeft < 0) { | ||
| throw new JedisClusterMaxAttemptsException("Deadline exceeded"); | ||
| } | ||
|
|
||
| if (redirect != null) { | ||
| connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode()); | ||
| if (redirect instanceof JedisAskDataException) { | ||
| // TODO: Pipeline asking with the original command to make it faster.... | ||
| connection.asking(); | ||
| } | ||
| } else { | ||
| if (tryRandomNode) { | ||
| connection = connectionHandler.getConnection(); | ||
| return millisLeft / (attemptsLeft * attemptsLeft); | ||
| } | ||
|
|
||
| private T runWithRetries(final int slot) { | ||
| Instant deadline = Instant.now().plus(maxTotalRetriesDuration); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taking the the time on each successful call seems like a waste and might impact the performance. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gkorland According to https://www.alibabacloud.com/blog/performance-issues-related-to-localdatetime-and-instant-during-serialization-operations_595605 Throughput of Instant.now+atZone+format+DateTimeFormatter.ofPattern is 6816922.578 ops/sec. |
||
| Supplier<Jedis> connectionSupplier = () -> connectionHandler.getConnectionFromSlot(slot); | ||
|
|
||
| // If we got one redirection, stick with that and don't try anything else | ||
| Supplier<Jedis> redirectionSupplier = null; | ||
|
|
||
| for (int currentAttempt = 0; currentAttempt < this.maxAttempts; currentAttempt++) { | ||
| Jedis connection = null; | ||
| try { | ||
| if (redirectionSupplier != null) { | ||
| connection = redirectionSupplier.get(); | ||
| } else { | ||
| connection = connectionHandler.getConnectionFromSlot(slot); | ||
| connection = connectionSupplier.get(); | ||
| } | ||
| } | ||
|
|
||
| return execute(connection); | ||
| if (shouldBackOff(maxAttempts - currentAttempt)) { | ||
| // Don't just stick to this any more, start asking around | ||
| redirectionSupplier = null; | ||
| } | ||
|
|
||
| } catch (JedisNoReachableClusterNodeException jnrcne) { | ||
| throw jnrcne; | ||
| } catch (JedisConnectionException jce) { | ||
| // release current connection before recursion | ||
| releaseConnection(connection); | ||
| connection = null; | ||
|
|
||
| if (attempts <= 1) { | ||
| //We need this because if node is not reachable anymore - we need to finally initiate slots | ||
| //renewing, or we can stuck with cluster state without one node in opposite case. | ||
| //But now if maxAttempts = [1 or 2] we will do it too often. | ||
| //TODO make tracking of successful/unsuccessful operations for node - do renewing only | ||
| //if there were no successful responses from this node last few seconds | ||
| this.connectionHandler.renewSlotCache(); | ||
| final T result = execute(connection); | ||
| if (currentAttempt > 0) { | ||
| LOG.info("Success after {} attempts", currentAttempt + 1); | ||
| } | ||
| return result; | ||
| } catch (JedisNoReachableClusterNodeException e) { | ||
| throw e; | ||
| } catch (JedisConnectionException e) { | ||
| LOG.warn("Failed connecting to Redis: {}", connection, e); | ||
| // "- 1" because we just did one, but the currentAttempt counter hasn't increased yet | ||
| int attemptsLeft = maxAttempts - currentAttempt - 1; | ||
| connectionSupplier = handleConnectionProblem(connection, slot, attemptsLeft, deadline); | ||
| } catch (JedisRedirectionException e) { | ||
| redirectionSupplier = handleRedirection(connection, e); | ||
| } finally { | ||
| releaseConnection(connection); | ||
| } | ||
|
|
||
| return runWithRetries(slot, attempts - 1, tryRandomNode, redirect); | ||
| } catch (JedisRedirectionException jre) { | ||
| // if MOVED redirection occurred, | ||
| if (jre instanceof JedisMovedDataException) { | ||
| // it rebuilds cluster's slot cache recommended by Redis cluster specification | ||
| this.connectionHandler.renewSlotCache(connection); | ||
| } | ||
| LOG.info("{} retries left...", maxAttempts - currentAttempt - 1); | ||
| } | ||
|
|
||
| // release current connection before recursion | ||
| releaseConnection(connection); | ||
| connection = null; | ||
| throw new JedisClusterMaxAttemptsException("No more cluster attempts left."); | ||
| } | ||
|
|
||
| return runWithRetries(slot, attempts - 1, false, jre); | ||
| } finally { | ||
| releaseConnection(connection); | ||
| protected void sleep(long sleepMillis) { | ||
| try { | ||
| LOG.info("Backing off, sleeping {}ms before trying again...", sleepMillis); | ||
| TimeUnit.MILLISECONDS.sleep(sleepMillis); | ||
| } catch (InterruptedException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| } | ||
|
|
||
| private Supplier<Jedis> handleConnectionProblem(Jedis failedConnection, final int slot, int attemptsLeft, | ||
| Instant doneDeadline) { | ||
| if (!shouldBackOff(attemptsLeft)) { | ||
| return () -> { | ||
| Jedis connection = connectionHandler.getConnectionFromSlot(slot); | ||
| LOG.info("Retrying with {}", connection); | ||
| return connection; | ||
| }; | ||
| } | ||
|
|
||
| // Must release current connection before renewing the slot cache below. If we fail to do this, | ||
| // then JedisClusterTest.testReturnConnectionOnJedisClusterConnection will start failing | ||
| // intermittently. | ||
| releaseConnection(failedConnection); | ||
|
|
||
| //We need this because if node is not reachable anymore - we need to finally initiate slots | ||
| //renewing, or we can stuck with cluster state without one node in opposite case. | ||
| //TODO make tracking of successful/unsuccessful operations for node - do renewing only | ||
| //if there were no successful responses from this node last few seconds | ||
| this.connectionHandler.renewSlotCache(); | ||
|
|
||
| return () -> { | ||
| sleep(getBackoffSleepMillis(attemptsLeft, doneDeadline)); | ||
| // Get a random connection, it will redirect us if it's not the right one | ||
| LOG.info("Retrying with a random node..."); | ||
| Jedis connection = connectionHandler.getConnection(); | ||
| LOG.info("Retrying with random pick: {}", connection); | ||
| return connection; | ||
| }; | ||
| } | ||
|
|
||
| private Supplier<Jedis> handleRedirection(Jedis connection, final JedisRedirectionException jre) { | ||
| LOG.debug("Redirected by server to {}", jre.getTargetNode()); | ||
|
|
||
| // if MOVED redirection occurred, | ||
| if (jre instanceof JedisMovedDataException) { | ||
| // it rebuilds cluster's slot cache recommended by Redis cluster specification | ||
| this.connectionHandler.renewSlotCache(connection); | ||
| } | ||
|
|
||
| // release current connection before iteration | ||
| releaseConnection(connection); | ||
|
|
||
| return () -> { | ||
| Jedis redirectedConnection = connectionHandler.getConnectionFromNode(jre.getTargetNode()); | ||
| LOG.info("Retrying with redirection target {}", connection); | ||
| if (jre instanceof JedisAskDataException) { | ||
| // TODO: Pipeline asking with the original command to make it faster.... | ||
| redirectedConnection.asking(); | ||
| } | ||
|
|
||
| return redirectedConnection; | ||
| }; | ||
| } | ||
|
|
||
| private void releaseConnection(Jedis connection) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.