Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 58 additions & 52 deletions src/main/java/redis/clients/jedis/JedisClusterCommand.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package redis.clients.jedis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
Expand All @@ -11,6 +14,8 @@

public abstract class JedisClusterCommand<T> {

private static final Logger LOG = LoggerFactory.getLogger(JedisClusterCommand.class);

private final JedisClusterConnectionHandler connectionHandler;
private final int maxAttempts;

Expand All @@ -22,7 +27,7 @@ public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int
public abstract T execute(Jedis connection);

public T run(String key) {
return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
return runWithRetries(JedisClusterCRC16.getSlot(key));
}

public T run(int keyCount, String... keys) {
Expand All @@ -42,11 +47,11 @@ public T run(int keyCount, String... keys) {
}
}

return runWithRetries(slot, this.maxAttempts, false, null);
return runWithRetries(slot);
}

public T runBinary(byte[] key) {
return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
return runWithRetries(JedisClusterCRC16.getSlot(key));
}

public T runBinary(int keyCount, byte[]... keys) {
Expand All @@ -66,7 +71,7 @@ public T runBinary(int keyCount, byte[]... keys) {
}
}

return runWithRetries(slot, this.maxAttempts, false, null);
return runWithRetries(slot);
}

public T runWithAnyNode() {
Expand All @@ -79,61 +84,62 @@ public T runWithAnyNode() {
}
}

private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
if (attempts <= 0) {
throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");
}

Jedis connection = null;
try {

if (redirect != null) {
connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
if (redirect instanceof JedisAskDataException) {
// TODO: Pipeline asking with the original command to make it faster....
connection.asking();
}
} else {
if (tryRandomNode) {
connection = connectionHandler.getConnection();
private T runWithRetries(final int slot) {
JedisRedirectionException redirect = null;
Exception lastException = null;
for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) {
Jedis connection = null;
try {
if (redirect != null) {
connection = connectionHandler.getConnectionFromNode(redirect.getTargetNode());
if (redirect instanceof JedisAskDataException) {
// TODO: Pipeline asking with the original command to make it faster....
connection.asking();
}
} else {
connection = connectionHandler.getConnectionFromSlot(slot);
}
}

return execute(connection);

} catch (JedisNoReachableClusterNodeException jnrcne) {
throw jnrcne;
} catch (JedisConnectionException jce) {
// release current connection before recursion
releaseConnection(connection);
connection = null;

if (attempts <= 1) {
//We need this because if node is not reachable anymore - we need to finally initiate slots
//renewing, or we can stuck with cluster state without one node in opposite case.
//But now if maxAttempts = [1 or 2] we will do it too often.
//TODO make tracking of successful/unsuccessful operations for node - do renewing only
//if there were no successful responses from this node last few seconds
this.connectionHandler.renewSlotCache();
}

return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
} catch (JedisRedirectionException jre) {
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache recommended by Redis cluster specification
this.connectionHandler.renewSlotCache(connection);
return execute(connection);

} catch (JedisNoReachableClusterNodeException jnrcne) {
throw jnrcne;
} catch (JedisConnectionException jce) {
lastException = jce;
LOG.debug("Failed connecting to Redis: {}", connection, jce);
// "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet
handleConnectionProblem(attemptsLeft - 1);
} catch (JedisRedirectionException jre) {
// avoid updating lastException if it is a connection exception
if (lastException == null || lastException instanceof JedisRedirectionException) {
lastException = jre;
}
LOG.debug("Redirected by server to {}", jre.getTargetNode());
redirect = jre;
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache recommended by Redis cluster specification
this.connectionHandler.renewSlotCache(connection);
}
} finally {
releaseConnection(connection);
}
}

// release current connection before recursion
releaseConnection(connection);
connection = null;
JedisClusterMaxAttemptsException maxAttemptsException
= new JedisClusterMaxAttemptsException("No more cluster attempts left.");
maxAttemptsException.addSuppressed(lastException);
throw maxAttemptsException;
}

return runWithRetries(slot, attempts - 1, false, jre);
} finally {
releaseConnection(connection);
private void handleConnectionProblem(int attemptsLeft) {
if (attemptsLeft <= 1) {
//We need this because if node is not reachable anymore - we need to finally initiate slots
//renewing, or we can stuck with cluster state without one node in opposite case.
//But now if maxAttempts = [1 or 2] we will do it too often.
//TODO make tracking of successful/unsuccessful operations for node - do renewing only
//if there were no successful responses from this node last few seconds
this.connectionHandler.renewSlotCache();
}
}

Expand Down