Skip to content

Commit

Permalink
feat: improve the performance of jedis batch commands. (#873)
Browse files Browse the repository at this point in the history
* feat: improve the performance of jedis batch commands.

* catch non-cluster exception

* new pipelineFirst param

* reduce biFunction scope

* add comment

* remove bifunction usage

* apply suggest
  • Loading branch information
Roiocam authored Mar 29, 2024
1 parent 4cdd4a7 commit 54cd818
Showing 1 changed file with 138 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import redis.clients.jedis.Response;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.commands.KeyBinaryCommands;
import redis.clients.jedis.commands.KeyPipelineBinaryCommands;
import redis.clients.jedis.commands.StringBinaryCommands;
import redis.clients.jedis.commands.StringPipelineBinaryCommands;
import redis.clients.jedis.params.SetParams;
Expand All @@ -30,13 +31,15 @@
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Created on 2016/10/7.
Expand Down Expand Up @@ -210,65 +213,62 @@ protected CacheGetResult<V> do_GET(K key) {

@Override
protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) {
StringBinaryCommands commands = null;
if (keys == null || keys.isEmpty()) {
return new MultiGetResult<K, V>(CacheResultCode.SUCCESS, null, Collections.emptyMap());
}
// define the result object early to gain statefulFunction feature.
Map<K, CacheGetResult<V>> resultMap = new HashMap<>();

try {
StringBinaryCommands readCommands = (StringBinaryCommands) readCommands();
ArrayList<K> keyList = new ArrayList<K>(keys);
byte[][] newKeys = keyList.stream().map((k) -> buildKey(k)).toArray(byte[][]::new);
byte[][] newKeys = keyList.stream().map(this::buildKey).toArray(byte[][]::new);

return this.<StringBinaryCommands, StringPipelineBinaryCommands, MultiGetResult<K, V>>doWithPipeline(readCommands, false, (pipeline) -> {
List<byte[]> results;
if (pipeline != null) {
List<Response<byte[]>> responseList = new ArrayList<>();

for (byte[] newKey : newKeys) {
Response<byte[]> response = pipeline.get(newKey);
responseList.add(response);
}

if (newKeys.length > 0) {
commands = (StringBinaryCommands) readCommands();
if (commands instanceof JedisCluster) {
jedisClusterGetAll((JedisCluster) commands, resultMap, keyList, newKeys);
sync(pipeline);

results = responseList.stream().map(Response::get).collect(Collectors.toList());
} else {
List mgetResults = commands.mget(newKeys);
for (int i = 0; i < mgetResults.size(); i++) {
Object value = mgetResults.get(i);
K key = keyList.get(i);
if (value != null) {
CacheValueHolder<V> holder = (CacheValueHolder<V>) valueDecoder.apply((byte[]) value);
if (System.currentTimeMillis() >= holder.getExpireTime()) {
resultMap.put(key, CacheGetResult.EXPIRED_WITHOUT_MSG);
} else {
CacheGetResult<V> r = new CacheGetResult<V>(CacheResultCode.SUCCESS, null, holder);
resultMap.put(key, r);
}
results = readCommands.mget(newKeys);
}

for (int i = 0; i < results.size(); i++) {
Object value = results.get(i);
K key = keyList.get(i);
if (value != null) {
CacheValueHolder<V> holder = (CacheValueHolder<V>) valueDecoder.apply((byte[]) value);
if (System.currentTimeMillis() >= holder.getExpireTime()) {
resultMap.put(key, CacheGetResult.EXPIRED_WITHOUT_MSG);
} else {
resultMap.put(key, CacheGetResult.NOT_EXISTS_WITHOUT_MSG);
CacheGetResult<V> r = new CacheGetResult<V>(CacheResultCode.SUCCESS, null, holder);
resultMap.put(key, r);
}
} else {
resultMap.put(key, CacheGetResult.NOT_EXISTS_WITHOUT_MSG);
}
}
}
return new MultiGetResult<K, V>(CacheResultCode.SUCCESS, null, resultMap);

return new MultiGetResult<K, V>(CacheResultCode.SUCCESS, null, resultMap);
});
} catch (Exception ex) {
logError("GET_ALL", "keys(" + keys.size() + ")", ex);
if (resultMap.size() > 0) {
if (!resultMap.isEmpty()) {
return new MultiGetResult<K, V>(CacheResultCode.PART_SUCCESS, ex.toString(), resultMap);
} else {
return new MultiGetResult<K, V>(ex);
}
} finally {
closeJedis(commands);
}
}

private void jedisClusterGetAll(JedisCluster commands, Map<K, CacheGetResult<V>> resultMap, ArrayList<K> keyList, byte[][] newKeys) {
for (int i = 0; i < newKeys.length; i++) {
byte[] bytes = commands.get(newKeys[i]);
if (bytes != null) {
CacheValueHolder<V> holder = (CacheValueHolder<V>) valueDecoder.apply(bytes);
if (System.currentTimeMillis() >= holder.getExpireTime()) {
resultMap.put(keyList.get(i), CacheGetResult.EXPIRED_WITHOUT_MSG);
} else {
resultMap.put(keyList.get(i), new CacheGetResult<V>(CacheResultCode.SUCCESS, null, holder));
}
} else {
resultMap.put(keyList.get(i), CacheGetResult.NOT_EXISTS_WITHOUT_MSG);
}
}
}


@Override
protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
StringBinaryCommands commands = null;
Expand All @@ -292,58 +292,36 @@ protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit tim

@Override
protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
StringBinaryCommands commands = null;
Closeable closeable = null;
if (map == null || map.isEmpty()) {
return CacheResult.SUCCESS_WITHOUT_MSG;
}
try {
commands = (StringBinaryCommands) writeCommands();
StringPipelineBinaryCommands pipeline;
// The connection from JedisPooled or JedisCluster needs to be returned to the pool.
if (commands instanceof JedisPooled) {
Connection connection = ((JedisPooled) commands).getPool().getResource();
closeable = connection;
pipeline = new Pipeline(connection);
} else if (commands instanceof JedisCluster) {
ClusterPipeline clusterPipeline = new ClusterPipeline(provider);
closeable = clusterPipeline;
pipeline = clusterPipeline;
} else if (commands instanceof Jedis) {
pipeline = new Pipeline((Jedis) commands);
} else {
throw new IllegalArgumentException(String.format("unknown jedis client type, <%s>", commands.getClass().getName()));
}
return executeWithPipeline(pipeline, map, expireAfterWrite, timeUnit);
StringBinaryCommands writeCommands = (StringBinaryCommands) writeCommands();
return this.<StringBinaryCommands, StringPipelineBinaryCommands, CacheResult>doWithPipeline(writeCommands, true, pipeline -> {
int failCount = 0;
List<Response<String>> responses = new ArrayList<>();
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
CacheValueHolder<V> holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
Response<String> resp = pipeline.psetex(buildKey(en.getKey()), timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder));
responses.add(resp);
}

sync(pipeline);

for (Response<String> resp : responses) {
if (!"OK".equals(resp.get())) {
failCount++;
}
}
return failCount == 0 ? CacheResult.SUCCESS_WITHOUT_MSG :
failCount == map.size() ? CacheResult.FAIL_WITHOUT_MSG : CacheResult.PART_SUCCESS_WITHOUT_MSG;
});
} catch (Exception ex) {
logError("PUT_ALL", "map(" + map.size() + ")", ex);
return new CacheResult(ex);
} finally {
closeJedis(commands);
close(closeable);
}
}

private CacheResult executeWithPipeline(StringPipelineBinaryCommands pipeline, Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
int failCount = 0;
List<Response<String>> responses = new ArrayList<>();
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
CacheValueHolder<V> holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
Response<String> resp = pipeline.psetex(buildKey(en.getKey()), timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder));
responses.add(resp);
}
if (pipeline instanceof Pipeline) {
((Pipeline) pipeline).sync();
} else if (pipeline instanceof ClusterPipeline) {
((ClusterPipeline) pipeline).sync();
} else {
throw new UnsupportedOperationException("unrecognized pipeline type");
}
for (Response<String> resp : responses) {
if (!"OK".equals(resp.get())) {
failCount++;
}
}
return failCount == 0 ? CacheResult.SUCCESS_WITHOUT_MSG :
failCount == map.size() ? CacheResult.FAIL_WITHOUT_MSG : CacheResult.PART_SUCCESS_WITHOUT_MSG;
}

@Override
protected CacheResult do_REMOVE(K key) {
Expand Down Expand Up @@ -374,29 +352,35 @@ private CacheResult REMOVE_impl(Object key, byte[] newKey) {

@Override
protected CacheResult do_REMOVE_ALL(Set<? extends K> keys) {
KeyBinaryCommands commands = null;
int x = 0;
if (keys == null || keys.isEmpty()) {
return CacheResult.SUCCESS_WITHOUT_MSG;
}
long[] count = new long[1];
try {
KeyBinaryCommands writeCommands = (KeyBinaryCommands) writeCommands();
byte[][] newKeys = keys.stream().map((k) -> buildKey(k)).toArray((len) -> new byte[keys.size()][]);
commands = (KeyBinaryCommands) writeCommands();
if (commands instanceof JedisCluster) {
for (byte[] newKey : newKeys) {
commands.del(newKey);
x++;
return this.<KeyBinaryCommands, KeyPipelineBinaryCommands, CacheResult>doWithPipeline(writeCommands, false, (pipeline) -> {

if (pipeline != null) {
for (byte[] newKey : newKeys) {
pipeline.del(newKey);
count[0]++;
}

sync(pipeline);
} else {
writeCommands.del(newKeys);
}
} else {
commands.del(newKeys);
}
return CacheResult.SUCCESS_WITHOUT_MSG;

return CacheResult.SUCCESS_WITHOUT_MSG;
});
} catch (Exception ex) {
logError("REMOVE_ALL", "keys(" + keys.size() + ")", ex);
if (commands instanceof JedisCluster && x > 0) {
if (count[0] > 0) {
return new CacheResult(CacheResultCode.PART_SUCCESS, ex.toString());
} else {
return new CacheResult(ex);
}
} finally {
closeJedis(commands);
}
}

Expand Down Expand Up @@ -426,4 +410,58 @@ protected CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, Ti
}
}

/**
* Useful function for redis interaction via Pipeline mode. Resources will be auto close. <br/>
* For batch command, such as {@link StringBinaryCommands#mget(byte[]...)}, {@link KeyBinaryCommands#del(byte[]...)}, the performance of pipeline will be worse,
* so choose them instead of pipeline as much as possible in non-cluster clients, See:
* - https://medium.com/@jychen7/redis-get-pipeline-vs-mget-6e41aeaecef
* - https://stackoverflow.com/questions/73992769/redis-del-many-keys-vs-pipeline-are-both-non-blocking
*
* @param client redisClient
* @param pipelineFirst set as false when only want to use the pipeline on cluster clients.
* @param function operator function
* @param <C> client type
* @param <P> pipeline type
* @param <R> result type
* @return result
*/
@SuppressWarnings("unchecked")
private <C, P, R> R doWithPipeline(C client, boolean pipelineFirst, Function<P, R> function) {
C commands = null;
Closeable closeable = null;
try {
commands = client;
P pipeline = null;
// The connection from JedisPooled or JedisCluster needs to be returned to the pool.
if (commands instanceof JedisCluster) {
ClusterPipeline clusterPipeline = new ClusterPipeline(provider);
closeable = clusterPipeline;
pipeline = (P) clusterPipeline;
} else if (pipelineFirst) {
if (commands instanceof JedisPooled) {
Connection connection = ((JedisPooled) commands).getPool().getResource();
closeable = connection;
pipeline = (P) new Pipeline(connection);
} else if (commands instanceof Jedis) {
pipeline = (P) new Pipeline((Jedis) commands);
}
}

return function.apply(pipeline);
} finally {
closeJedis(commands);
close(closeable);
}
}

private <T> void sync(T pipeline) {
if (pipeline instanceof Pipeline) {
((Pipeline) pipeline).sync();
} else if (pipeline instanceof ClusterPipeline) {
((ClusterPipeline) pipeline).sync();
} else {
throw new UnsupportedOperationException("unrecognized pipeline type");
}
}

}

0 comments on commit 54cd818

Please sign in to comment.