Skip to content

Commit

Permalink
feat: support jedis cluster pipeline (#866)
Browse files Browse the repository at this point in the history
* feat: support jedis cluster pipeline

* feat: support jedis cluster pipeline on 4.3.2

* fix unit test

* fix: deadlock fix

* chore: update code, re-trigger ci run

* feat: use reflect rather than wrapper

* avoid public methods in config classes
  • Loading branch information
Roiocam authored Mar 27, 2024
1 parent 94c5a19 commit 0b6889b
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.alicp.jetcache.external.AbstractExternalCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.ClusterPipeline;
import redis.clients.jedis.Connection;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
Expand All @@ -20,10 +21,13 @@
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.commands.KeyBinaryCommands;
import redis.clients.jedis.commands.StringBinaryCommands;
import redis.clients.jedis.commands.StringPipelineBinaryCommands;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.Pool;

import java.io.Closeable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -47,6 +51,7 @@ public class RedisCache<K, V> extends AbstractExternalCache<K, V> {

Function<Object, byte[]> valueEncoder;
Function<byte[], Object> valueDecoder;
ClusterConnectionProvider provider = null;

private static ThreadLocalRandom random = ThreadLocalRandom.current();

Expand Down Expand Up @@ -82,6 +87,18 @@ public RedisCache(RedisCacheConfig<K, V> config) {
if (config.isExpireAfterAccess()) {
throw new CacheConfigException("expireAfterAccess is not supported");
}
UnifiedJedis jedis = config.getJedis();
if (jedis != null && jedis instanceof JedisCluster) {
try {
Field field = UnifiedJedis.class.getDeclaredField("provider");
boolean accessible = field.isAccessible();
field.setAccessible(true);
provider = (ClusterConnectionProvider) field.get(jedis);
field.setAccessible(accessible);
} catch (Exception ex) {
throw new IllegalStateException("can not get ConnectionProvider from JedisClient", ex);
}
}
}

private int slaveCount() {
Expand Down Expand Up @@ -143,7 +160,7 @@ static int randomIndex(int[] weights) {
int x = 0;
for (int i = 0; i < weights.length; i++) {
x += weights[i];
if(r < x){
if (r < x) {
return i;
}
}
Expand Down Expand Up @@ -276,47 +293,56 @@ protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit tim
@Override
protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
StringBinaryCommands commands = null;
Connection connection = null;
Closeable closeable = null;
try {
commands = (StringBinaryCommands) writeCommands();
int failCount = 0;
if(commands instanceof Jedis || commands instanceof JedisPooled) {
List<Response<String>> responses = new ArrayList<>();
Pipeline pipeline = null;
if(commands instanceof JedisPooled) {
connection = ((JedisPooled) commands).getPool().getResource();
pipeline = new Pipeline(connection);
} else {
pipeline = new Pipeline((Jedis) commands);
}
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
CacheValueHolder<V> holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
Response<String> resp = pipeline.psetex(buildKey(en.getKey()), timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder));
responses.add(resp);
}
pipeline.sync();
for (Response<String> resp : responses) {
if (!"OK".equals(resp.get())) {
failCount++;
}
}
StringPipelineBinaryCommands pipeline;
// The connection from JedisPooled or JedisCluster needs to be returned to the pool.
if (commands instanceof JedisPooled) {
Connection connection = ((JedisPooled) commands).getPool().getResource();
closeable = connection;
pipeline = new Pipeline(connection);
} else if (commands instanceof JedisCluster) {
ClusterPipeline clusterPipeline = new ClusterPipeline(provider);
closeable = clusterPipeline;
pipeline = clusterPipeline;
} else if (commands instanceof Jedis) {
pipeline = new Pipeline((Jedis) commands);
} else {
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
CacheResult r = do_PUT(en.getKey(), en.getValue(), expireAfterWrite, timeUnit);
if (!r.isSuccess()) {
failCount++;
}
}
throw new IllegalArgumentException(String.format("unknown jedis client type, <%s>", commands.getClass().getName()));
}
return failCount == 0 ? CacheResult.SUCCESS_WITHOUT_MSG :
failCount == map.size() ? CacheResult.FAIL_WITHOUT_MSG : CacheResult.PART_SUCCESS_WITHOUT_MSG;
return executeWithPipeline(pipeline, map, expireAfterWrite, timeUnit);
} catch (Exception ex) {
logError("PUT_ALL", "map(" + map.size() + ")", ex);
return new CacheResult(ex);
} finally {
closeJedis(commands);
close(connection);
close(closeable);
}
}

private CacheResult executeWithPipeline(StringPipelineBinaryCommands pipeline, Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
int failCount = 0;
List<Response<String>> responses = new ArrayList<>();
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
CacheValueHolder<V> holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
Response<String> resp = pipeline.psetex(buildKey(en.getKey()), timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder));
responses.add(resp);
}
if (pipeline instanceof Pipeline) {
((Pipeline) pipeline).sync();
} else if (pipeline instanceof ClusterPipeline) {
((ClusterPipeline) pipeline).sync();
} else {
throw new UnsupportedOperationException("unrecognized pipeline type");
}
for (Response<String> resp : responses) {
if (!"OK".equals(resp.get())) {
failCount++;
}
}
return failCount == 0 ? CacheResult.SUCCESS_WITHOUT_MSG :
failCount == map.size() ? CacheResult.FAIL_WITHOUT_MSG : CacheResult.PART_SUCCESS_WITHOUT_MSG;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,6 @@ protected void lockTest() throws Exception {
try (AutoReleaseLock lock = cache.tryLock("LockKeyAndRunKey", 1, TimeUnit.SECONDS)) {
Assert.assertNotNull(lock);
}
;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.lettuce.core.masterslave.MasterSlave;
import io.lettuce.core.masterslave.StatefulRedisMasterSlaveConnection;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import java.util.Arrays;
Expand Down

0 comments on commit 0b6889b

Please sign in to comment.