- 
                Notifications
    You must be signed in to change notification settings 
- Fork 3.9k
Retry with backoff on cluster connection failures #2358
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 39 commits
6e2f312
              5a4fdbd
              fc3728a
              cdf56b2
              d99ef7b
              8978ca5
              85fa21c
              f8d09c2
              9c7ef1d
              9bce8eb
              67a062a
              7e4abac
              eabf10b
              3223404
              fd17343
              bf56639
              c7ae6b5
              569afe7
              1638603
              68e8fdc
              c665dc1
              d9f2596
              a23d602
              6dd86db
              fed69b0
              791180c
              4bf345b
              62a619e
              f1c307d
              8a9e0a8
              25c63a4
              9b6242f
              73d74d3
              d14174d
              af5d1f7
              ddd4038
              7aa0b74
              0ef36d3
              25303b7
              9e3fbcc
              882dd49
              7430b9b
              9eb8d58
              27bce50
              b900a87
              4501b0d
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -1,5 +1,8 @@ | ||
| package redis.clients.jedis; | ||
|  | ||
| import java.time.Duration; | ||
| import java.time.Instant; | ||
| import java.util.concurrent.TimeUnit; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|  | ||
|  | @@ -18,10 +21,20 @@ public abstract class JedisClusterCommand<T> { | |
|  | ||
| private final JedisClusterConnectionHandler connectionHandler; | ||
| private final int maxAttempts; | ||
| private final Duration maxTotalRetriesDuration; | ||
|  | ||
| public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int maxAttempts) { | ||
| this(connectionHandler, maxAttempts, Duration.ofMillis((long) BinaryJedisCluster.DEFAULT_TIMEOUT * maxAttempts)); | ||
| } | ||
|  | ||
| /** | ||
| * @param maxTotalRetriesDuration No more attempts after we have been trying for this long. | ||
| */ | ||
| public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int maxAttempts, | ||
| Duration maxTotalRetriesDuration) { | ||
| this.connectionHandler = connectionHandler; | ||
| this.maxAttempts = maxAttempts; | ||
| this.maxTotalRetriesDuration = maxTotalRetriesDuration; | ||
| } | ||
|  | ||
| public abstract T execute(Jedis connection); | ||
|  | @@ -85,7 +98,10 @@ public T runWithAnyNode() { | |
| } | ||
|  | ||
| private T runWithRetries(final int slot) { | ||
| Instant deadline = Instant.now().plus(maxTotalRetriesDuration); | ||
|  | ||
| JedisRedirectionException redirect = null; | ||
| int consecutiveConnectionFailures = 0; | ||
| Exception lastException = null; | ||
| for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) { | ||
| Jedis connection = null; | ||
|  | @@ -106,15 +122,18 @@ private T runWithRetries(final int slot) { | |
| throw jnrcne; | ||
| } catch (JedisConnectionException jce) { | ||
| lastException = jce; | ||
| ++consecutiveConnectionFailures; | ||
| LOG.debug("Failed connecting to Redis: {}", connection, jce); | ||
| // "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet | ||
| handleConnectionProblem(attemptsLeft - 1); | ||
| } catch (JedisRedirectionException jre) { | ||
| // avoid updating lastException if it is a connection exception | ||
| if (lastException == null || lastException instanceof JedisRedirectionException) { | ||
| lastException = jre; | ||
| boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline); | ||
| if (reset) { | ||
| consecutiveConnectionFailures = 0; | ||
| redirect = null; | ||
| } | ||
| } catch (JedisRedirectionException jre) { | ||
| lastException = jre; | ||
| LOG.debug("Redirected by server to {}", jre.getTargetNode()); | ||
| consecutiveConnectionFailures = 0; | ||
| redirect = jre; | ||
| // if MOVED redirection occurred, | ||
| if (jre instanceof JedisMovedDataException) { | ||
|  | @@ -124,22 +143,69 @@ private T runWithRetries(final int slot) { | |
| } finally { | ||
| releaseConnection(connection); | ||
| } | ||
| if (Instant.now().isAfter(deadline)) { | ||
| throw new JedisClusterOperationException("Retry deadline exceeded"); | ||
| } | ||
| } | ||
|  | ||
| throw new JedisClusterMaxAttemptsException("No more cluster attempts left.", lastException); | ||
| } | ||
|  | ||
| /** | ||
| * Related values should be reset if <code>TRUE</code> is returned. | ||
| * | ||
| * @param attemptsLeft | ||
| * @param consecutiveConnectionFailures | ||
| * @param doneDeadline | ||
| * @return true - if some actions are taken | ||
| * <br /> false - if no actions are taken | ||
| */ | ||
| private boolean handleConnectionProblem(int attemptsLeft, int consecutiveConnectionFailures, Instant doneDeadline) { | ||
| if (this.maxAttempts < 3) { | ||
| // Since we only renew the slots cache after two consecutive connection | ||
| // failures (see consecutiveConnectionFailures above), we need to special | ||
| // case the situation where we max out after two or fewer attempts. | ||
| // Otherwise, on two or fewer max attempts, the slots cache would never be | ||
| // renewed. | ||
| if (attemptsLeft == 0) { | ||
| this.connectionHandler.renewSlotCache(); | ||
| return true; | ||
|         
                  walles marked this conversation as resolved.
              Show resolved
            Hide resolved | ||
| } | ||
| return false; | ||
| } | ||
|  | ||
| if (consecutiveConnectionFailures < 2) { | ||
| return false; | ||
| } | ||
|  | ||
| sleep(getBackoffSleepMillis(attemptsLeft, doneDeadline)); | ||
| //We need this because if a node is no longer reachable, we must eventually initiate slot | ||
| //renewal; otherwise we can get stuck with a cluster state that is missing one node. | ||
| //TODO make tracking of successful/unsuccessful operations for node - do renewing only | ||
| //if there were no successful responses from this node last few seconds | ||
| this.connectionHandler.renewSlotCache(); | ||
| return true; | ||
| } | ||
|  | ||
| private static long getBackoffSleepMillis(int attemptsLeft, Instant deadline) { | ||
| if (attemptsLeft <= 0) { | ||
| return 0; | ||
| } | ||
|  | ||
| JedisClusterMaxAttemptsException maxAttemptsException | ||
| = new JedisClusterMaxAttemptsException("No more cluster attempts left."); | ||
| maxAttemptsException.addSuppressed(lastException); | ||
| throw maxAttemptsException; | ||
| long millisLeft = Duration.between(Instant.now(), deadline).toMillis(); | ||
| if (millisLeft < 0) { | ||
| // TODO: change to JedisClusterOperationException or a new sub-class of it | ||
| throw new JedisClusterMaxAttemptsException("Deadline exceeded"); | ||
| } | ||
|  | ||
| return millisLeft / (attemptsLeft * (attemptsLeft + 1)); | ||
| } | ||
|  | ||
| private void handleConnectionProblem(int attemptsLeft) { | ||
| if (attemptsLeft <= 1) { | ||
| // We need this because if node is not reachable anymore - we need to finally initiate slots | ||
| // renewing, or we can stuck with cluster state without one node in opposite case. | ||
| // But now if maxAttempts = [1 or 2] we will do it too often. | ||
| // TODO make tracking of successful/unsuccessful operations for node - do renewing only | ||
| // if there were no successful responses from this node last few seconds | ||
| this.connectionHandler.renewSlotCache(); | ||
| protected void sleep(long sleepMillis) { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No,  | ||
| try { | ||
| TimeUnit.MILLISECONDS.sleep(sleepMillis); | ||
| } catch (InterruptedException e) { | ||
| throw new JedisClusterOperationException(e); | ||
| } | ||
| } | ||
|  | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taking the time on each successful call seems like a waste and might impact performance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gkorland According to https://www.alibabacloud.com/blog/performance-issues-related-to-localdatetime-and-instant-during-serialization-operations_595605
Throughput of Instant.now+atZone+format+DateTimeFormatter.ofPattern is 6816922.578 ops/sec.
Without any formatting, the throughput of Instant.now+plus should be much higher. Shouldn't that be enough?