Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@
<version>2.3.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.7.7</version>
<scope>test</scope>
</dependency>
</dependencies>

<distributionManagement>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/JedisClusterCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, Je
this.connectionHandler.renewSlotCache();
}

return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
return runWithRetries(slot, attempts - 1, true, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would heavily affect current applications. Connection exceptions can happen for silliest of the reasons. So we can't backoff right after a connection exception. There should a balance between retry and backoff. For example, a slot based node or a redirected node should be given at least 2/3 consecutive tries before backing off.

In all of the cases, maxAttempts, remaining attempts, renewSlotCache() should be kept in mind.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll just try to rephrase what you said so I know I understand:

  • With this change, we'd start trying random nodes on the first connection exception
  • In your opinion, what we should do is:
    1. First, try the exact same operation a few times in a row
    2. If that doesn't help, start trying random nodes

I think this sounds reasonable, just want to ensure I change the right thing!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@walles Yes, you've got it right.

} catch (JedisRedirectionException jre) {
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package redis.clients.jedis.tests.commands;

import static org.junit.Assert.assertEquals;

import org.junit.Test;
import org.mockito.Mockito;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisClusterCommand;
import redis.clients.jedis.JedisClusterConnectionHandler;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException;

public class JedisClusterCommandTest {

@Test(expected = JedisClusterMaxAttemptsException.class)
public void runZeroAttempts() {
JedisClusterCommand<String> testMe = new JedisClusterCommand<String>(null, 0) {
@Override
public String execute(Jedis connection) {
return null;
}
};

testMe.run("");
}

@Test
public void runSuccessfulExecute() {
JedisClusterConnectionHandler connectionHandler = Mockito
.mock(JedisClusterConnectionHandler.class);
JedisClusterCommand<String> testMe = new JedisClusterCommand<String>(connectionHandler, 10) {
@Override
public String execute(Jedis connection) {
return "foo";
}
};
String actual = testMe.run("");
assertEquals("foo", actual);
}

@Test
public void runFailOnFirstExecSuccessOnSecondExec() {
JedisClusterConnectionHandler connectionHandler = Mockito
.mock(JedisClusterConnectionHandler.class);

JedisClusterCommand<String> testMe = new JedisClusterCommand<String>(connectionHandler, 10) {
boolean isFirstCall = true;

@Override
public String execute(Jedis connection) {
if (isFirstCall) {
isFirstCall = false;
throw new JedisConnectionException("Borkenz");
}

return "foo";
}
};

String actual = testMe.run("");
assertEquals("foo", actual);
}

@Test
public void runReconnectWithRandomConnection() {
JedisSlotBasedConnectionHandler connectionHandler = Mockito
.mock(JedisSlotBasedConnectionHandler.class);
// simulate failing connection
Mockito.when(connectionHandler.getConnectionFromSlot(Mockito.anyInt())).thenReturn(null);
// simulate good connection
Mockito.when(connectionHandler.getConnection()).thenReturn(Mockito.mock(Jedis.class));

JedisClusterCommand<String> testMe = new JedisClusterCommand<String>(connectionHandler, 10) {
@Override
public String execute(Jedis connection) {
if (connection == null) {
throw new JedisConnectionException("");
}
return "foo";
}
};

String actual = testMe.run("");
assertEquals("foo", actual);
}

@Test
public void runMovedSuccess() {
JedisClusterConnectionHandler connectionHandler = Mockito
.mock(JedisClusterConnectionHandler.class);

JedisClusterCommand<String> testMe = new JedisClusterCommand<String>(connectionHandler, 10) {
boolean isFirstCall = true;

@Override
public String execute(Jedis connection) {
if (isFirstCall) {
isFirstCall = false;

// Slot 0 moved
throw new JedisMovedDataException("", null, 0);
}

return "foo";
}
};

String actual = testMe.run("");
assertEquals("foo", actual);

Mockito.verify(connectionHandler).renewSlotCache(Mockito.<Jedis> any());
}

@Test
public void runAskSuccess() {
JedisSlotBasedConnectionHandler connectionHandler = Mockito
.mock(JedisSlotBasedConnectionHandler.class);
Jedis jedis = Mockito.mock(Jedis.class);
Mockito.when(connectionHandler.getConnectionFromNode(Mockito.<HostAndPort> any())).thenReturn(
jedis);

JedisClusterCommand<String> testMe = new JedisClusterCommand<String>(connectionHandler, 10) {
boolean isFirstCall = true;

@Override
public String execute(Jedis connection) {
if (isFirstCall) {
isFirstCall = false;

// Slot 0 moved
throw new JedisAskDataException("", null, 0);
}

return "foo";
}
};

String actual = testMe.run("");
assertEquals("foo", actual);
Mockito.verify(jedis).asking();
}

@Test
public void runMovedFailSuccess() {
// Test:
// First attempt is a JedisMovedDataException() move, because we asked the wrong node
// Second attempt is a JedisConnectionException, because this node is down
// In response to that, runWithTimeout() requests a random node using
// connectionHandler.getConnection()
// Third attempt works
JedisSlotBasedConnectionHandler connectionHandler = Mockito
.mock(JedisSlotBasedConnectionHandler.class);

Jedis fromGetConnectionFromSlot = Mockito.mock(Jedis.class);
Mockito.when(fromGetConnectionFromSlot.toString()).thenReturn("getConnectionFromSlot");
Mockito.when(connectionHandler.getConnectionFromSlot(Mockito.anyInt())).thenReturn(
fromGetConnectionFromSlot);

Jedis fromGetConnectionFromNode = Mockito.mock(Jedis.class);
Mockito.when(fromGetConnectionFromNode.toString()).thenReturn("getConnectionFromNode");
Mockito.when(connectionHandler.getConnectionFromNode(Mockito.<HostAndPort> any())).thenReturn(
fromGetConnectionFromNode);

Jedis fromGetConnection = Mockito.mock(Jedis.class);
Mockito.when(fromGetConnection.toString()).thenReturn("getConnection");
Mockito.when(connectionHandler.getConnection()).thenReturn(fromGetConnection);

JedisClusterCommand<String> testMe = new JedisClusterCommand<String>(connectionHandler, 10) {
@Override
public String execute(Jedis connection) {
String source = connection.toString();
if ("getConnectionFromSlot".equals(source)) {
// First attempt, report moved
throw new JedisMovedDataException("Moved", null, 0);
}

if ("getConnectionFromNode".equals(source)) {
// Second attempt in response to the move, report failure
throw new JedisConnectionException("Connection failed");
}

// This is the third and last case we handle
assert "getConnection".equals(source);
return "foo";
}
};

String actual = testMe.run("");
assertEquals("foo", actual);
}

@Test(expected = JedisNoReachableClusterNodeException.class)
public void runRethrowsJedisNoReachableClusterNodeException() {
JedisSlotBasedConnectionHandler connectionHandler = Mockito
.mock(JedisSlotBasedConnectionHandler.class);
Mockito.when(connectionHandler.getConnectionFromSlot(Mockito.anyInt())).thenThrow(
JedisNoReachableClusterNodeException.class);

JedisClusterCommand<String> testMe = new JedisClusterCommand<String>(connectionHandler, 10) {
@Override
public String execute(Jedis connection) {
return null;
}
};

testMe.run("");
}
}
33 changes: 33 additions & 0 deletions src/test/java/redis/clients/jedis/tests/demo/ClusterDemo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package redis.clients.jedis.tests.demo;

import java.util.HashSet;
import java.util.Set;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

// See: https://github.com/redis/jedis/issues/2347
public class ClusterDemo {
// Expect to find a Redis cluster on this and the five following ports
private final static int BASE_PORT = 6379;

// This program should survive any node being down for up to 30s, and keep printing. Having
// printouts pause while the node is down is fine.
public static void main(String[] args) throws InterruptedException {
JedisPoolConfig poolConfig = new JedisPoolConfig();

Set<HostAndPort> nodes = new HashSet<>();
for (int i = 0; i < 6; i++) {
nodes.add(new HostAndPort("127.0.0.1", BASE_PORT + i));
}
JedisCluster cluster = new JedisCluster(nodes, 10_000, 10, poolConfig);

// noinspection InfiniteLoopStatement
while (true) {
final Long foo = cluster.incr("foo");
System.out.printf("foo=%d%n", foo);
// noinspection BusyWait
Thread.sleep(1000);
}
}
}