Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,17 @@ save ""
appendonly no
endef

# UNAVAILABLE REDIS NODES
define REDIS_UNAVAILABLE_CONF
daemonize yes
protected-mode no
port 6400
pidfile /tmp/redis_unavailable.pid
logfile /tmp/redis_unavailable.log
save ""
appendonly no
endef

#STUNNEL
define STUNNEL_CONF
cert = src/test/resources/private.pem
Expand Down Expand Up @@ -278,6 +289,7 @@ export REDIS_CLUSTER_NODE3_CONF
export REDIS_CLUSTER_NODE4_CONF
export REDIS_CLUSTER_NODE5_CONF
export REDIS_UDS
export REDIS_UNAVAILABLE_CONF
export STUNNEL_CONF
export STUNNEL_BIN

Expand Down Expand Up @@ -309,6 +321,7 @@ start: stunnel cleanup
echo "$$REDIS_CLUSTER_NODE4_CONF" | redis-server -
echo "$$REDIS_CLUSTER_NODE5_CONF" | redis-server -
echo "$$REDIS_UDS" | redis-server -
echo "$$REDIS_UNAVAILABLE_CONF" | redis-server -

cleanup:
- rm -vf /tmp/redis_cluster_node*.conf 2>/dev/null
Expand Down Expand Up @@ -338,6 +351,7 @@ stop:
kill `cat /tmp/redis_cluster_node5.pid` || true
kill `cat /tmp/redis_uds.pid` || true
kill `cat /tmp/stunnel.pid` || true
[ -f /tmp/redis_unavailable.pid ] && kill `cat /tmp/redis_unavailable.pid` || true
rm -f /tmp/sentinel1.conf
rm -f /tmp/sentinel2.conf
rm -f /tmp/sentinel3.conf
Expand Down
14 changes: 9 additions & 5 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ private void initializeClientFromURI(URI uri, final SSLSocketFactory sslSocketFa
}
}

public boolean isConnected() {
return client.isConnected();
}

public boolean isBroken() {
return client.isBroken();
}

@Override
public String ping() {
checkIsInMultiOrPipeline();
Expand Down Expand Up @@ -2044,7 +2052,7 @@ public void disconnect() {
}

public void resetState() {
if (client.isConnected()) {
if (isConnected()) {
if (transaction != null) {
transaction.close();
}
Expand Down Expand Up @@ -3298,10 +3306,6 @@ public byte[] configSet(final byte[] parameter, final byte[] value) {
return client.getBinaryBulkReply();
}

public boolean isConnected() {
return client.isConnected();
}

@Override
public Long strlen(final byte[] key) {
checkIsInMultiOrPipeline();
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/redis/clients/jedis/BinaryShardedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.commands.BinaryJedisCommands;
import redis.clients.jedis.commands.ProtocolCommand;
Expand All @@ -19,6 +21,8 @@
public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo> implements
BinaryJedisCommands {

private static final Logger logger = LoggerFactory.getLogger(BinaryShardedJedis.class);

private final byte[][] dummyArray = new byte[0][];

public BinaryShardedJedis(List<JedisShardInfo> shards) {
Expand All @@ -41,14 +45,19 @@ public void disconnect() {
for (Jedis jedis : getAllShards()) {
if (jedis.isConnected()) {
try {
jedis.quit();
// need a proper test, probably with mock
if (!jedis.isBroken()) {
jedis.quit();
}
} catch (JedisConnectionException e) {
// ignore the exception node, so that all other normal nodes can release all connections.
logger.warn("Error while QUIT", e);
}
try {
jedis.disconnect();
} catch (JedisConnectionException e) {
// ignore the exception node, so that all other normal nodes can release all connections.
logger.warn("Error while disconnect", e);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3608,7 +3608,7 @@ public void close() {
if (dataSource != null) {
JedisPoolAbstract pool = this.dataSource;
this.dataSource = null;
if (client.isBroken()) {
if (isBroken()) {
pool.returnBrokenResource(this);
} else {
pool.returnResource(this);
Expand Down
16 changes: 13 additions & 3 deletions src/main/java/redis/clients/jedis/JedisFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.exceptions.InvalidURIException;
import redis.clients.jedis.exceptions.JedisException;
Expand All @@ -19,6 +21,9 @@
* PoolableObjectFactory custom impl.
*/
class JedisFactory implements PooledObjectFactory<Jedis> {

private static final Logger logger = LoggerFactory.getLogger(JedisFactory.class);

private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<>();
private final int connectionTimeout;
private final int soTimeout;
Expand Down Expand Up @@ -129,12 +134,17 @@ public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
final BinaryJedis jedis = pooledJedis.getObject();
if (jedis.isConnected()) {
try {
try {
// need a proper test, probably with mock
if (!jedis.isBroken()) {
jedis.quit();
} catch (Exception e) {
}
} catch (Exception e) {
logger.warn("Error while QUIT", e);
}
try {
jedis.disconnect();
} catch (Exception e) {
logger.warn("Error while disconnect", e);
}
}
}
Expand Down Expand Up @@ -187,4 +197,4 @@ public boolean validateObject(PooledObject<Jedis> pooledJedis) {
return false;
}
}
}
}
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/ShardedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ public void close() {
boolean broken = false;

for (Jedis jedis : getAllShards()) {
if (jedis.getClient().isBroken()) {
if (jedis.isBroken()) {
broken = true;
break;
}
Expand Down
25 changes: 17 additions & 8 deletions src/main/java/redis/clients/jedis/ShardedJedisPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.util.Hashing;
import redis.clients.jedis.util.Pool;

public class ShardedJedisPool extends Pool<ShardedJedis> {

private static final Logger logger = LoggerFactory.getLogger(ShardedJedisPool.class);

public ShardedJedisPool(final GenericObjectPoolConfig poolConfig, List<JedisShardInfo> shards) {
this(poolConfig, shards, Hashing.MURMUR_HASH);
}
Expand Down Expand Up @@ -50,9 +55,10 @@ public void returnResource(final ShardedJedis resource) {
* PoolableObjectFactory custom impl.
*/
private static class ShardedJedisFactory implements PooledObjectFactory<ShardedJedis> {
private List<JedisShardInfo> shards;
private Hashing algo;
private Pattern keyTagPattern;

private final List<JedisShardInfo> shards;
private final Hashing algo;
private final Pattern keyTagPattern;

public ShardedJedisFactory(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
this.shards = shards;
Expand All @@ -72,14 +78,17 @@ public void destroyObject(PooledObject<ShardedJedis> pooledShardedJedis) throws
for (Jedis jedis : shardedJedis.getAllShards()) {
if (jedis.isConnected()) {
try {
try {
// need a proper test, probably with mock
if (!jedis.isBroken()) {
jedis.quit();
} catch (Exception e) {

}
} catch (Exception e) {
logger.warn("Error while QUIT", e);
}
try {
jedis.disconnect();
} catch (Exception e) {

logger.warn("Error while disconnect", e);
}
}
}
Expand Down Expand Up @@ -110,4 +119,4 @@ public void passivateObject(PooledObject<ShardedJedis> p) throws Exception {

}
}
}
}
2 changes: 1 addition & 1 deletion src/test/java/redis/clients/jedis/tests/JedisPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public void closeBrokenResourceTwice() {
fail();
} catch (Exception e) {
}
assertTrue(j.getClient().isBroken());
assertTrue(j.isBroken());
j.close();
j.close();
}
Expand Down
29 changes: 14 additions & 15 deletions src/test/java/redis/clients/jedis/tests/JedisTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,17 @@ public void timeoutConnectionWithURI() throws Exception {

@Test
public void infiniteTimeout() throws Exception {
Jedis jedis = new Jedis("localhost", 6379, 350, 350, 350);
jedis.auth("foobared");
try {
jedis.blpop(0, "foo");
fail("SocketTimeoutException should occur");
} catch(JedisConnectionException jce) {
assertEquals(java.net.SocketTimeoutException.class, jce.getCause().getClass());
assertEquals("Read timed out", jce.getCause().getMessage());
assertTrue(jedis.getClient().isBroken());
try (Jedis timeoutJedis = new Jedis("localhost", 6379, 350, 350, 350)) {
timeoutJedis.auth("foobared");
try {
timeoutJedis.blpop(0, "foo");
fail("SocketTimeoutException should occur");
} catch(JedisConnectionException jce) {
assertEquals(java.net.SocketTimeoutException.class, jce.getCause().getClass());
assertEquals("Read timed out", jce.getCause().getMessage());
assertTrue(timeoutJedis.isBroken());
}
}
jedis.close();
}

@Test(expected = JedisDataException.class)
Expand Down Expand Up @@ -192,16 +192,15 @@ public void allowUrlWithNoDBAndNoPassword() {

@Test
public void checkCloseable() {
jedis.close();
BinaryJedis bj = new BinaryJedis("localhost");
bj.connect();
bj.close();
try (BinaryJedis bj = new BinaryJedis("localhost")) {
bj.connect();
}
}

@Test
public void checkDisconnectOnQuit() {
jedis.quit();
assertFalse(jedis.getClient().isConnected());
assertFalse(jedis.isConnected());
}

}
20 changes: 10 additions & 10 deletions src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package redis.clients.jedis.tests;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
Expand Down Expand Up @@ -64,20 +64,20 @@ public void testAvoidLeaksUponDisconnect() throws InterruptedException {

ClientKillerUtil.killClient(deadClient, "DEAD");

assertEquals(true, deadClient.isConnected());
assertEquals(false, deadClient.getClient().getSocket().isClosed());
assertEquals(false, deadClient.getClient().isBroken()); // normal - not found
assertTrue(deadClient.isConnected());
assertFalse(deadClient.getClient().getSocket().isClosed());
assertFalse(deadClient.isBroken()); // normal - not found

shardedJedis.disconnect();

assertEquals(false, deadClient.isConnected());
assertEquals(true, deadClient.getClient().getSocket().isClosed());
assertEquals(true, deadClient.getClient().isBroken());
assertFalse(deadClient.isConnected());
assertTrue(deadClient.getClient().getSocket().isClosed());
assertTrue(deadClient.isBroken());

Jedis jedis2 = it.next();
assertEquals(false, jedis2.isConnected());
assertEquals(true, jedis2.getClient().getSocket().isClosed());
assertEquals(false, jedis2.getClient().isBroken());
assertFalse(jedis2.isConnected());
assertTrue(jedis2.getClient().getSocket().isClosed());
assertFalse(jedis2.isBroken());

}

Expand Down
Loading