Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions src/main/java/redis/clients/jedis/JedisClusterCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;
import redis.clients.util.SafeEncoder;

public abstract class JedisClusterCommand<T> {

private static final String NO_DISPATCH_MESSAGE = "No way to dispatch this command to Redis Cluster.";

private JedisClusterConnectionHandler connectionHandler;
private int maxAttempts;
private ThreadLocal<Jedis> askConnection = new ThreadLocal<Jedis>();
private final JedisClusterConnectionHandler connectionHandler;
private final int maxAttempts;
private final ThreadLocal<Jedis> askConnection = new ThreadLocal<Jedis>();

public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int maxAttempts) {
this.connectionHandler = connectionHandler;
Expand All @@ -30,18 +29,17 @@ public T run(String key) {
throw new JedisClusterException(NO_DISPATCH_MESSAGE);
}

return runWithRetries(SafeEncoder.encode(key), this.maxAttempts, false, false);
return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, false);
}

public T run(int keyCount, String... keys) {
if (keys == null || keys.length == 0) {
throw new JedisClusterException(NO_DISPATCH_MESSAGE);
}

// For multiple keys, only execute if they all share the
// same connection slot.
if (keys.length > 1) {
// For multiple keys, only execute if they all share the same connection slot.
int slot = JedisClusterCRC16.getSlot(keys[0]);
if (keys.length > 1) {
for (int i = 1; i < keyCount; i++) {
int nextSlot = JedisClusterCRC16.getSlot(keys[i]);
if (slot != nextSlot) {
Expand All @@ -51,26 +49,25 @@ public T run(int keyCount, String... keys) {
}
}

return runWithRetries(SafeEncoder.encode(keys[0]), this.maxAttempts, false, false);
return runWithRetries(slot, this.maxAttempts, false, false);
}

public T runBinary(byte[] key) {
if (key == null) {
throw new JedisClusterException(NO_DISPATCH_MESSAGE);
}

return runWithRetries(key, this.maxAttempts, false, false);
return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, false);
}

public T runBinary(int keyCount, byte[]... keys) {
if (keys == null || keys.length == 0) {
throw new JedisClusterException(NO_DISPATCH_MESSAGE);
}

// For multiple keys, only execute if they all share the
// same connection slot.
// For multiple keys, only execute if they all share the same connection slot.
int slot = JedisClusterCRC16.getSlot(keys[0]);
if (keys.length > 1) {
int slot = JedisClusterCRC16.getSlot(keys[0]);
for (int i = 1; i < keyCount; i++) {
int nextSlot = JedisClusterCRC16.getSlot(keys[i]);
if (slot != nextSlot) {
Expand All @@ -80,7 +77,7 @@ public T runBinary(int keyCount, byte[]... keys) {
}
}

return runWithRetries(keys[0], this.maxAttempts, false, false);
return runWithRetries(slot, this.maxAttempts, false, false);
}

public T runWithAnyNode() {
Expand All @@ -95,7 +92,7 @@ public T runWithAnyNode() {
}
}

private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, boolean asking) {
if (attempts <= 0) {
throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
}
Expand All @@ -115,7 +112,7 @@ private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolea
if (tryRandomNode) {
connection = connectionHandler.getConnection();
} else {
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
connection = connectionHandler.getConnectionFromSlot(slot);
}
}

Expand All @@ -140,7 +137,7 @@ private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolea
throw jce;
}

return runWithRetries(key, attempts - 1, tryRandomNode, asking);
return runWithRetries(slot, attempts - 1, tryRandomNode, asking);
} catch (JedisRedirectionException jre) {
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
Expand All @@ -161,7 +158,7 @@ private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolea
throw new JedisClusterException(jre);
}

return runWithRetries(key, attempts - 1, false, asking);
return runWithRetries(slot, attempts - 1, false, asking);
} finally {
releaseConnection(connection);
}
Expand Down