Skip to content

Commit

Permalink
perf: fix encode/decode performance issue with lettuce #908
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Aug 14, 2024
1 parent 07de031 commit 26b9139
Show file tree
Hide file tree
Showing 14 changed files with 274 additions and 184 deletions.
69 changes: 35 additions & 34 deletions jetcache-core/src/main/java/com/alicp/jetcache/AbstractCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.alicp.jetcache.event.CacheRemoveAllEvent;
import com.alicp.jetcache.event.CacheRemoveEvent;
import com.alicp.jetcache.external.AbstractExternalCache;
import com.alicp.jetcache.support.JetCacheExecutor;
import com.alicp.jetcache.support.SquashedLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -46,7 +48,7 @@ ConcurrentHashMap<Object, LoaderLock> initOrGetLoaderMap() {
if (loaderMap == null) {
loaderMap = new ConcurrentHashMap<>();
}
}finally {
} finally {
reentrantLock.unlock();
}
}
Expand All @@ -72,6 +74,19 @@ protected void logError(String oper, Object key, Throwable e) {
}

public void notify(CacheEvent e) {
notify0(e);
}

private void notify(CacheResult r, CacheEvent e) {
CompletionStage<?> f = r.future();
if (f.toCompletableFuture().isDone()) {
notify0(e);
} else {
f.thenRunAsync(() -> notify0(e), JetCacheExecutor.defaultExecutor());
}
}

private void notify0(CacheEvent e) {
List<CacheMonitor> monitors = config().getMonitors();
for (CacheMonitor m : monitors) {
m.afterOperation(e);
Expand All @@ -87,10 +102,8 @@ public final CacheGetResult<V> GET(K key) {
} else {
result = do_GET(key);
}
result.future().thenRun(() -> {
CacheGetEvent event = new CacheGetEvent(this, System.currentTimeMillis() - t, key, result);
notify(event);
});
CacheGetEvent event = new CacheGetEvent(this, System.currentTimeMillis() - t, key, result);
notify(result, event);
return result;
}

Expand All @@ -105,10 +118,8 @@ public final MultiGetResult<K, V> GET_ALL(Set<? extends K> keys) {
} else {
result = do_GET_ALL(keys);
}
result.future().thenRun(() -> {
CacheGetAllEvent event = new CacheGetAllEvent(this, System.currentTimeMillis() - t, keys, result);
notify(event);
});
CacheGetAllEvent event = new CacheGetAllEvent(this, System.currentTimeMillis() - t, keys, result);
notify(result, event);
return result;
}

Expand Down Expand Up @@ -138,7 +149,7 @@ private static <K, V> boolean needUpdate(V loadedValue, boolean cacheNullWhenLoa
}

static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
long expireAfterWrite, TimeUnit timeUnit, Cache<K, V> cache) {
long expireAfterWrite, TimeUnit timeUnit, Cache<K, V> cache) {
AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache);
CacheLoader<K, V> newLoader = CacheUtil.createProxyLoader(cache, loader, abstractCache::notify);
CacheGetResult<V> r;
Expand All @@ -153,7 +164,7 @@ static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheN
return r.getValue();
} else {
Consumer<V> cacheUpdater = (loadedValue) -> {
if(needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) {
if (needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) {
if (timeUnit != null) {
cache.PUT(key, loadedValue, expireAfterWrite, timeUnit).waitForResult();
} else {
Expand All @@ -174,7 +185,7 @@ static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheN
}
}

static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K,V> abstractCache,
static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K, V> abstractCache,
K key, Function<K, V> newLoader, Consumer<V> cacheUpdater) {
ConcurrentHashMap<Object, LoaderLock> loaderMap = abstractCache.initOrGetLoaderMap();
Object lockKey = buildLoaderLockKey(abstractCache, key);
Expand Down Expand Up @@ -214,7 +225,7 @@ static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K,V> abstract
ll.signal.await();
} else {
boolean ok = ll.signal.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
if(!ok) {
if (!ok) {
logger.info("loader wait timeout:" + timeout);
return newLoader.apply(key);
}
Expand Down Expand Up @@ -242,7 +253,7 @@ private static Object buildLoaderLockKey(Cache c, Object key) {
} else if (c instanceof MultiLevelCache) {
c = ((MultiLevelCache) c).caches()[0];
return buildLoaderLockKey(c, key);
} else if(c instanceof ProxyCache) {
} else if (c instanceof ProxyCache) {
c = ((ProxyCache) c).getTargetCache();
return buildLoaderLockKey(c, key);
} else {
Expand All @@ -259,10 +270,8 @@ public final CacheResult PUT(K key, V value, long expireAfterWrite, TimeUnit tim
} else {
result = do_PUT(key, value, expireAfterWrite, timeUnit);
}
result.future().thenRun(() -> {
CachePutEvent event = new CachePutEvent(this, System.currentTimeMillis() - t, key, value, result);
notify(event);
});
CachePutEvent event = new CachePutEvent(this, System.currentTimeMillis() - t, key, value, result);
notify(result, event);
return result;
}

Expand All @@ -277,10 +286,8 @@ public final CacheResult PUT_ALL(Map<? extends K, ? extends V> map, long expireA
} else {
result = do_PUT_ALL(map, expireAfterWrite, timeUnit);
}
result.future().thenRun(() -> {
CachePutAllEvent event = new CachePutAllEvent(this, System.currentTimeMillis() - t, map, result);
notify(event);
});
CachePutAllEvent event = new CachePutAllEvent(this, System.currentTimeMillis() - t, map, result);
notify(result, event);
return result;
}

Expand All @@ -295,10 +302,8 @@ public final CacheResult REMOVE(K key) {
} else {
result = do_REMOVE(key);
}
result.future().thenRun(() -> {
CacheRemoveEvent event = new CacheRemoveEvent(this, System.currentTimeMillis() - t, key, result);
notify(event);
});
CacheRemoveEvent event = new CacheRemoveEvent(this, System.currentTimeMillis() - t, key, result);
notify(result, event);
return result;
}

Expand All @@ -313,10 +318,8 @@ public final CacheResult REMOVE_ALL(Set<? extends K> keys) {
} else {
result = do_REMOVE_ALL(keys);
}
result.future().thenRun(() -> {
CacheRemoveAllEvent event = new CacheRemoveAllEvent(this, System.currentTimeMillis() - t, keys, result);
notify(event);
});
CacheRemoveAllEvent event = new CacheRemoveAllEvent(this, System.currentTimeMillis() - t, keys, result);
notify(result, event);
return result;
}

Expand All @@ -331,10 +334,8 @@ public final CacheResult PUT_IF_ABSENT(K key, V value, long expireAfterWrite, Ti
} else {
result = do_PUT_IF_ABSENT(key, value, expireAfterWrite, timeUnit);
}
result.future().thenRun(() -> {
CachePutEvent event = new CachePutEvent(this, System.currentTimeMillis() - t, key, value, result);
notify(event);
});
CachePutEvent event = new CachePutEvent(this, System.currentTimeMillis() - t, key, value, result);
notify(result, event);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.alicp.jetcache.event;

import com.alicp.jetcache.Cache;
import com.alicp.jetcache.support.Epoch;

/**
* The CacheEvent is used in single JVM while CacheMessage used for distributed message.
Expand All @@ -12,6 +13,8 @@
*/
public class CacheEvent {

private final long epoch = Epoch.get();

protected Cache cache;

public CacheEvent(Cache cache) {
Expand All @@ -22,4 +25,7 @@ public Cache getCache() {
return cache;
}

public long getEpoch() {
return epoch;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
*/
public class MockRemoteCacheBuilder<T extends ExternalCacheBuilder<T>> extends ExternalCacheBuilder<T> {

private static boolean subscribeStart;
private static CacheMessage lastPublishMessage;
private static volatile boolean subscribeStart;
private static volatile CacheMessage lastPublishMessage;

public static class MockRemoteCacheBuilderImpl extends MockRemoteCacheBuilder<MockRemoteCacheBuilderImpl> {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class DefaultCacheMonitor implements CacheMonitor {
private final ReentrantLock reentrantLock = new ReentrantLock();
protected CacheStat cacheStat;
private String cacheName;
private long epoch;

public DefaultCacheMonitor(String cacheName) {
if (cacheName == null) {
Expand All @@ -51,6 +52,8 @@ public void resetStat() {
cacheStat = new CacheStat();
cacheStat.setStatStartTime(System.currentTimeMillis());
cacheStat.setCacheName(cacheName);
Epoch.increment();
epoch = Epoch.get();
}finally {
reentrantLock.unlock();
}
Expand All @@ -71,6 +74,9 @@ public CacheStat getCacheStat() {
public void afterOperation(CacheEvent event) {
reentrantLock.lock();
try {
if (event.getEpoch() < epoch) {
return;
}
if (event instanceof CacheGetEvent) {
CacheGetEvent e = (CacheGetEvent) event;
afterGet(e.getMillis(), e.getKey(), e.getResult());
Expand Down
20 changes: 20 additions & 0 deletions jetcache-core/src/main/java/com/alicp/jetcache/support/Epoch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.alicp.jetcache.support;

import java.util.concurrent.atomic.AtomicLong;

/**
* Created on 2024/08/13.
*
* @author huangli
*/
public class Epoch {
private static final AtomicLong V = new AtomicLong();

public static long increment() {
return V.incrementAndGet();
}

public static long get() {
return V.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public static ScheduledExecutorService defaultExecutor() {
t.setDaemon(true);
return t;
};
defaultExecutor = new ScheduledThreadPoolExecutor(
1, tf, new ThreadPoolExecutor.DiscardPolicy());
int coreSize = Math.min(4, Runtime.getRuntime().availableProcessors());
defaultExecutor = new ScheduledThreadPoolExecutor(coreSize, tf);
}
}finally {
reentrantLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void startSubscribe() {
this.pubSubAdapter = new RedisPubSubAdapter<byte[], byte[]>() {
@Override
public void message(byte[] channel, byte[] message) {
processNotification(message, config.getValueDecoder());
JetCacheExecutor.defaultExecutor().execute(() -> processNotification(message, config.getValueDecoder()));
}
};
config.getPubSubConnection().addListener(this.pubSubAdapter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ protected CacheGetResult<V> do_GET(K key) {
try {
byte[] newKey = buildKey(key);
RedisFuture<byte[]> future = stringAsyncCommands.get(newKey);
CacheGetResult<V> result = new CacheGetResult<>(future.handle((valueBytes, ex) -> {
CacheGetResult<V> result = new CacheGetResult<>(future.handleAsync((valueBytes, ex) -> {
if (ex != null) {
JetCacheExecutor.defaultExecutor().execute(() -> logError("GET", key, ex));
logError("GET", key, ex);
return new ResultData(ex);
} else {
try {
Expand All @@ -183,7 +183,7 @@ protected CacheGetResult<V> do_GET(K key) {
return new ResultData(exception);
}
}
}));
}, JetCacheExecutor.defaultExecutor()));
setTimeout(result);
return result;
} catch (Exception ex) {
Expand All @@ -203,9 +203,9 @@ protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) {
return new MultiGetResult<K, V>(CacheResultCode.SUCCESS, null, resultMap);
}
RedisFuture<List<KeyValue<byte[],byte[]>>> mgetResults = stringAsyncCommands.mget(newKeys);
MultiGetResult<K, V> result = new MultiGetResult<>(mgetResults.handle((list, ex) -> {
MultiGetResult<K, V> result = new MultiGetResult<>(mgetResults.handleAsync((list, ex) -> {
if (ex != null) {
JetCacheExecutor.defaultExecutor().execute(() -> logError("GET_ALL", "keys(" + keys.size() + ")", ex));
logError("GET_ALL", "keys(" + keys.size() + ")", ex);
return new ResultData(ex);
} else {
try {
Expand All @@ -230,7 +230,7 @@ protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) {
return new ResultData(exception);
}
}
}));
}, JetCacheExecutor.defaultExecutor()));
setTimeout(result);
return result;
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static void waitUtil(Object expectValue, Supplier<? extends Object> actua
int waitCount = 0;
while (deadline - System.nanoTime() > 0) {
try {
Thread.sleep(5);
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

import static com.alicp.jetcache.test.support.Tick.tick;

/**
* Created on 2017/5/24.
*
Expand All @@ -34,19 +36,19 @@ public static void loadingCacheTest(Cache cache, long waitMillis) throws Excepti
AtomicInteger count = new AtomicInteger(0);
CacheLoader oldLoader = cache.config().getLoader();
cache.config().setLoader((key) -> key + "_V" + count.getAndIncrement());
loadingCacheTestImpl(cache, waitMillis);
vetoTest(cache, waitMillis);
nullValueTest(cache, waitMillis);
loadingCacheTestImpl(cache, tick(waitMillis));
vetoTest(cache, tick(waitMillis));
nullValueTest(cache, tick(waitMillis));
cache.config().setLoader(oldLoader);
}

public static void loadingCacheTest(AbstractCacheBuilder builder, long waitMillis) throws Exception {
AtomicInteger count = new AtomicInteger(0);
builder.loader((key) -> key + "_V" + count.getAndIncrement());
Cache cache = builder.buildCache();
loadingCacheTestImpl(cache, waitMillis);
vetoTest(cache, waitMillis);
nullValueTest(cache, waitMillis);
loadingCacheTestImpl(cache, tick(waitMillis));
vetoTest(cache, tick(waitMillis));
nullValueTest(cache, tick(waitMillis));
}

private static void vetoTest(Cache cache, long waitMillis) throws Exception {
Expand Down
Loading

0 comments on commit 26b9139

Please sign in to comment.