diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/AbstractCache.java b/jetcache-core/src/main/java/com/alicp/jetcache/AbstractCache.java index 26daa253..04deec3f 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/AbstractCache.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/AbstractCache.java @@ -9,6 +9,7 @@ import com.alicp.jetcache.event.CacheRemoveAllEvent; import com.alicp.jetcache.event.CacheRemoveEvent; import com.alicp.jetcache.external.AbstractExternalCache; +import com.alicp.jetcache.support.JetCacheExecutor; import com.alicp.jetcache.support.SquashedLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,6 +19,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -46,7 +48,7 @@ ConcurrentHashMap initOrGetLoaderMap() { if (loaderMap == null) { loaderMap = new ConcurrentHashMap<>(); } - }finally { + } finally { reentrantLock.unlock(); } } @@ -72,6 +74,19 @@ protected void logError(String oper, Object key, Throwable e) { } public void notify(CacheEvent e) { + notify0(e); + } + + private void notify(CacheResult r, CacheEvent e) { + CompletionStage f = r.future(); + if (f.toCompletableFuture().isDone()) { + notify0(e); + } else { + f.thenRunAsync(() -> notify0(e), JetCacheExecutor.defaultExecutor()); + } + } + + private void notify0(CacheEvent e) { List monitors = config().getMonitors(); for (CacheMonitor m : monitors) { m.afterOperation(e); @@ -87,10 +102,8 @@ public final CacheGetResult GET(K key) { } else { result = do_GET(key); } - result.future().thenRun(() -> { - CacheGetEvent event = new CacheGetEvent(this, System.currentTimeMillis() - t, key, result); - notify(event); - }); + CacheGetEvent event = new CacheGetEvent(this, System.currentTimeMillis() - t, key, result); + notify(result, event); return result; } @@ -105,10 +118,8 @@ public final MultiGetResult GET_ALL(Set keys) { } else { result = do_GET_ALL(keys); } - result.future().thenRun(() -> { - CacheGetAllEvent event = new CacheGetAllEvent(this, System.currentTimeMillis() - t, keys, result); - notify(event); - }); + CacheGetAllEvent event = new CacheGetAllEvent(this, System.currentTimeMillis() - t, keys, result); + notify(result, event); return result; } @@ -138,7 +149,7 @@ private static boolean needUpdate(V loadedValue, boolean cacheNullWhenLoa } static V computeIfAbsentImpl(K key, Function loader, boolean cacheNullWhenLoaderReturnNull, - long expireAfterWrite, TimeUnit timeUnit, Cache cache) { + long expireAfterWrite, TimeUnit timeUnit, Cache cache) { AbstractCache abstractCache = CacheUtil.getAbstractCache(cache); CacheLoader newLoader = CacheUtil.createProxyLoader(cache, loader, abstractCache::notify); CacheGetResult r; @@ -153,7 +164,7 @@ static V computeIfAbsentImpl(K key, Function loader, boolean cacheN return r.getValue(); } else { Consumer cacheUpdater = (loadedValue) -> { - if(needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) { + if (needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) { if (timeUnit != null) { cache.PUT(key, loadedValue, expireAfterWrite, timeUnit).waitForResult(); } else { @@ -174,7 +185,7 @@ static V computeIfAbsentImpl(K key, Function loader, boolean cacheN } } - static V synchronizedLoad(CacheConfig config, AbstractCache abstractCache, + static V synchronizedLoad(CacheConfig config, AbstractCache abstractCache, K key, Function newLoader, Consumer cacheUpdater) { ConcurrentHashMap loaderMap = abstractCache.initOrGetLoaderMap(); Object lockKey = buildLoaderLockKey(abstractCache, key); @@ -214,7 +225,7 @@ static V synchronizedLoad(CacheConfig config, AbstractCache abstract ll.signal.await(); } else { boolean ok = ll.signal.await(timeout.toMillis(), TimeUnit.MILLISECONDS); - if(!ok) { + if (!ok) { logger.info("loader wait timeout:" + timeout); return newLoader.apply(key); } @@ -242,7 +253,7 @@ private static Object buildLoaderLockKey(Cache c, Object key) { } else if (c instanceof MultiLevelCache) { c = ((MultiLevelCache) c).caches()[0]; return buildLoaderLockKey(c, key); - } else if(c instanceof ProxyCache) { + } else if (c instanceof ProxyCache) { c = ((ProxyCache) c).getTargetCache(); return buildLoaderLockKey(c, key); } else { @@ -259,10 +270,8 @@ public final CacheResult PUT(K key, V value, long expireAfterWrite, TimeUnit tim } else { result = do_PUT(key, value, expireAfterWrite, timeUnit); } - result.future().thenRun(() -> { - CachePutEvent event = new CachePutEvent(this, System.currentTimeMillis() - t, key, value, result); - notify(event); - }); + CachePutEvent event = new CachePutEvent(this, System.currentTimeMillis() - t, key, value, result); + notify(result, event); return result; } @@ -277,10 +286,8 @@ public final CacheResult PUT_ALL(Map map, long expireA } else { result = do_PUT_ALL(map, expireAfterWrite, timeUnit); } - result.future().thenRun(() -> { - CachePutAllEvent event = new CachePutAllEvent(this, System.currentTimeMillis() - t, map, result); - notify(event); - }); + CachePutAllEvent event = new CachePutAllEvent(this, System.currentTimeMillis() - t, map, result); + notify(result, event); return result; } @@ -295,10 +302,8 @@ public final CacheResult REMOVE(K key) { } else { result = do_REMOVE(key); } - result.future().thenRun(() -> { - CacheRemoveEvent event = new CacheRemoveEvent(this, System.currentTimeMillis() - t, key, result); - notify(event); - }); + CacheRemoveEvent event = new CacheRemoveEvent(this, System.currentTimeMillis() - t, key, result); + notify(result, event); return result; } @@ -313,10 +318,8 @@ public final CacheResult REMOVE_ALL(Set keys) { } else { result = do_REMOVE_ALL(keys); } - result.future().thenRun(() -> { - CacheRemoveAllEvent event = new CacheRemoveAllEvent(this, System.currentTimeMillis() - t, keys, result); - notify(event); - }); + CacheRemoveAllEvent event = new CacheRemoveAllEvent(this, System.currentTimeMillis() - t, keys, result); + notify(result, event); return result; } @@ -331,10 +334,8 @@ public final CacheResult PUT_IF_ABSENT(K key, V value, long expireAfterWrite, Ti } else { result = do_PUT_IF_ABSENT(key, value, expireAfterWrite, timeUnit); } - result.future().thenRun(() -> { - CachePutEvent event = new CachePutEvent(this, System.currentTimeMillis() - t, key, value, result); - notify(event); - }); + CachePutEvent event = new CachePutEvent(this, System.currentTimeMillis() - t, key, value, result); + notify(result, event); return result; } diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/event/CacheEvent.java b/jetcache-core/src/main/java/com/alicp/jetcache/event/CacheEvent.java index 9b269cb4..d8c9fef4 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/event/CacheEvent.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/event/CacheEvent.java @@ -4,6 +4,7 @@ package com.alicp.jetcache.event; import com.alicp.jetcache.Cache; +import com.alicp.jetcache.support.Epoch; /** * The CacheEvent is used in single JVM while CacheMessage used for distributed message. @@ -12,6 +13,8 @@ */ public class CacheEvent { + private final long epoch = Epoch.get(); + protected Cache cache; public CacheEvent(Cache cache) { @@ -22,4 +25,7 @@ public Cache getCache() { return cache; } + public long getEpoch() { + return epoch; + } } diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/external/MockRemoteCacheBuilder.java b/jetcache-core/src/main/java/com/alicp/jetcache/external/MockRemoteCacheBuilder.java index 280e2be6..c9d6cf72 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/external/MockRemoteCacheBuilder.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/external/MockRemoteCacheBuilder.java @@ -12,8 +12,8 @@ */ public class MockRemoteCacheBuilder> extends ExternalCacheBuilder { - private static boolean subscribeStart; - private static CacheMessage lastPublishMessage; + private static volatile boolean subscribeStart; + private static volatile CacheMessage lastPublishMessage; public static class MockRemoteCacheBuilderImpl extends MockRemoteCacheBuilder { } diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/DefaultCacheMonitor.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/DefaultCacheMonitor.java index aba1b0ed..2315f9aa 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/DefaultCacheMonitor.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/DefaultCacheMonitor.java @@ -32,6 +32,7 @@ public class DefaultCacheMonitor implements CacheMonitor { private final ReentrantLock reentrantLock = new ReentrantLock(); protected CacheStat cacheStat; private String cacheName; + private long epoch; public DefaultCacheMonitor(String cacheName) { if (cacheName == null) { @@ -51,6 +52,8 @@ public void resetStat() { cacheStat = new CacheStat(); cacheStat.setStatStartTime(System.currentTimeMillis()); cacheStat.setCacheName(cacheName); + Epoch.increment(); + epoch = Epoch.get(); }finally { reentrantLock.unlock(); } @@ -71,6 +74,9 @@ public CacheStat getCacheStat() { public void afterOperation(CacheEvent event) { reentrantLock.lock(); try { + if (event.getEpoch() < epoch) { + return; + } if (event instanceof CacheGetEvent) { CacheGetEvent e = (CacheGetEvent) event; afterGet(e.getMillis(), e.getKey(), e.getResult()); diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/Epoch.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/Epoch.java new file mode 100644 index 00000000..72c7432c --- /dev/null +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/Epoch.java @@ -0,0 +1,20 @@ +package com.alicp.jetcache.support; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Created on 2024/08/13. + * + * @author huangli + */ +public class Epoch { + private static final AtomicLong V = new AtomicLong(); + + public static long increment() { + return V.incrementAndGet(); + } + + public static long get() { + return V.get(); + } +} diff --git a/jetcache-core/src/main/java/com/alicp/jetcache/support/JetCacheExecutor.java b/jetcache-core/src/main/java/com/alicp/jetcache/support/JetCacheExecutor.java index 324807be..a5b4ea55 100644 --- a/jetcache-core/src/main/java/com/alicp/jetcache/support/JetCacheExecutor.java +++ b/jetcache-core/src/main/java/com/alicp/jetcache/support/JetCacheExecutor.java @@ -45,8 +45,8 @@ public static ScheduledExecutorService defaultExecutor() { t.setDaemon(true); return t; }; - defaultExecutor = new ScheduledThreadPoolExecutor( - 1, tf, new ThreadPoolExecutor.DiscardPolicy()); + int coreSize = Math.min(4, Runtime.getRuntime().availableProcessors()); + defaultExecutor = new ScheduledThreadPoolExecutor(coreSize, tf); } }finally { reentrantLock.unlock(); diff --git a/jetcache-support/jetcache-redis-lettuce/src/main/java/com/alicp/jetcache/redis/lettuce/LettuceBroadcastManager.java b/jetcache-support/jetcache-redis-lettuce/src/main/java/com/alicp/jetcache/redis/lettuce/LettuceBroadcastManager.java index 1e12d6a4..30079a7a 100644 --- a/jetcache-support/jetcache-redis-lettuce/src/main/java/com/alicp/jetcache/redis/lettuce/LettuceBroadcastManager.java +++ b/jetcache-support/jetcache-redis-lettuce/src/main/java/com/alicp/jetcache/redis/lettuce/LettuceBroadcastManager.java @@ -85,7 +85,7 @@ public void startSubscribe() { this.pubSubAdapter = new RedisPubSubAdapter() { @Override public void message(byte[] channel, byte[] message) { - processNotification(message, config.getValueDecoder()); + JetCacheExecutor.defaultExecutor().execute(() -> processNotification(message, config.getValueDecoder())); } }; config.getPubSubConnection().addListener(this.pubSubAdapter); diff --git a/jetcache-support/jetcache-redis-lettuce/src/main/java/com/alicp/jetcache/redis/lettuce/RedisLettuceCache.java b/jetcache-support/jetcache-redis-lettuce/src/main/java/com/alicp/jetcache/redis/lettuce/RedisLettuceCache.java index 0b6368cd..99909be9 100644 --- a/jetcache-support/jetcache-redis-lettuce/src/main/java/com/alicp/jetcache/redis/lettuce/RedisLettuceCache.java +++ b/jetcache-support/jetcache-redis-lettuce/src/main/java/com/alicp/jetcache/redis/lettuce/RedisLettuceCache.java @@ -162,9 +162,9 @@ protected CacheGetResult do_GET(K key) { try { byte[] newKey = buildKey(key); RedisFuture future = stringAsyncCommands.get(newKey); - CacheGetResult result = new CacheGetResult<>(future.handle((valueBytes, ex) -> { + CacheGetResult result = new CacheGetResult<>(future.handleAsync((valueBytes, ex) -> { if (ex != null) { - JetCacheExecutor.defaultExecutor().execute(() -> logError("GET", key, ex)); + logError("GET", key, ex); return new ResultData(ex); } else { try { @@ -183,7 +183,7 @@ protected CacheGetResult do_GET(K key) { return new ResultData(exception); } } - })); + }, JetCacheExecutor.defaultExecutor())); setTimeout(result); return result; } catch (Exception ex) { @@ -203,9 +203,9 @@ protected MultiGetResult do_GET_ALL(Set keys) { return new MultiGetResult(CacheResultCode.SUCCESS, null, resultMap); } RedisFuture>> mgetResults = stringAsyncCommands.mget(newKeys); - MultiGetResult result = new MultiGetResult<>(mgetResults.handle((list, ex) -> { + MultiGetResult result = new MultiGetResult<>(mgetResults.handleAsync((list, ex) -> { if (ex != null) { - JetCacheExecutor.defaultExecutor().execute(() -> logError("GET_ALL", "keys(" + keys.size() + ")", ex)); + logError("GET_ALL", "keys(" + keys.size() + ")", ex); return new ResultData(ex); } else { try { @@ -230,7 +230,7 @@ protected MultiGetResult do_GET_ALL(Set keys) { return new ResultData(exception); } } - })); + }, JetCacheExecutor.defaultExecutor())); setTimeout(result); return result; } catch (Exception ex) { diff --git a/jetcache-test/src/main/java/com/alicp/jetcache/test/anno/TestUtil.java b/jetcache-test/src/main/java/com/alicp/jetcache/test/anno/TestUtil.java index 98ef6f75..65a35615 100644 --- a/jetcache-test/src/main/java/com/alicp/jetcache/test/anno/TestUtil.java +++ b/jetcache-test/src/main/java/com/alicp/jetcache/test/anno/TestUtil.java @@ -80,7 +80,7 @@ public static void waitUtil(Object expectValue, Supplier actua int waitCount = 0; while (deadline - System.nanoTime() > 0) { try { - Thread.sleep(5); + Thread.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/LoadingCacheTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/LoadingCacheTest.java index 11b7581c..e8035ae6 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/LoadingCacheTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/LoadingCacheTest.java @@ -13,6 +13,8 @@ import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; +import static com.alicp.jetcache.test.support.Tick.tick; + /** * Created on 2017/5/24. * @@ -34,9 +36,9 @@ public static void loadingCacheTest(Cache cache, long waitMillis) throws Excepti AtomicInteger count = new AtomicInteger(0); CacheLoader oldLoader = cache.config().getLoader(); cache.config().setLoader((key) -> key + "_V" + count.getAndIncrement()); - loadingCacheTestImpl(cache, waitMillis); - vetoTest(cache, waitMillis); - nullValueTest(cache, waitMillis); + loadingCacheTestImpl(cache, tick(waitMillis)); + vetoTest(cache, tick(waitMillis)); + nullValueTest(cache, tick(waitMillis)); cache.config().setLoader(oldLoader); } @@ -44,9 +46,9 @@ public static void loadingCacheTest(AbstractCacheBuilder builder, long waitMilli AtomicInteger count = new AtomicInteger(0); builder.loader((key) -> key + "_V" + count.getAndIncrement()); Cache cache = builder.buildCache(); - loadingCacheTestImpl(cache, waitMillis); - vetoTest(cache, waitMillis); - nullValueTest(cache, waitMillis); + loadingCacheTestImpl(cache, tick(waitMillis)); + vetoTest(cache, tick(waitMillis)); + nullValueTest(cache, tick(waitMillis)); } private static void vetoTest(Cache cache, long waitMillis) throws Exception { diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/RefreshCacheTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/RefreshCacheTest.java index 51b61446..964ec156 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/RefreshCacheTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/RefreshCacheTest.java @@ -4,9 +4,11 @@ import com.alicp.jetcache.embedded.LinkedHashMapCacheBuilder; import com.alicp.jetcache.external.AbstractExternalCache; import com.alicp.jetcache.external.MockRemoteCacheBuilder; +import com.alicp.jetcache.support.CacheStat; import com.alicp.jetcache.support.DefaultCacheMonitor; import com.alicp.jetcache.support.FastjsonKeyConvertor; import com.alicp.jetcache.test.AbstractCacheTest; +import com.alicp.jetcache.test.anno.TestUtil; import com.alicp.jetcache.testsupport.Sleeper; import org.junit.Assert; import org.junit.Before; @@ -181,35 +183,29 @@ public boolean vetoCacheUpdate() { cache.config().setLoader(oldLoader); } + private static void waitUtil(DefaultCacheMonitor monitor, long getCount, long getHitCount, long getMissCount, + long loadCount, long getPutCount) { + TestUtil.waitUtil(() -> { + CacheStat s = monitor.getCacheStat(); + return s.getGetCount() == getCount && s.getGetHitCount() == getHitCount && + s.getGetMissCount() == getMissCount && s.getLoadCount() == loadCount && + s.getPutCount() == getPutCount; + }); + } + private static void refreshCacheTest1(Cache cache) throws Exception { DefaultCacheMonitor monitor = new DefaultCacheMonitor("test"); cache.config().getMonitors().add(monitor); long refreshMillis = cache.config().getRefreshPolicy().getRefreshMillis(); Assert.assertEquals("refreshCacheTest1_K1_V0", cache.get("refreshCacheTest1_K1")); - Assert.assertEquals(1, monitor.getCacheStat().getGetCount()); - Assert.assertEquals(0, monitor.getCacheStat().getGetHitCount()); - Assert.assertEquals(1, monitor.getCacheStat().getGetMissCount()); - Assert.assertEquals(1, monitor.getCacheStat().getLoadCount()); - Assert.assertEquals(1, monitor.getCacheStat().getPutCount()); + waitUtil(monitor, 1, 0, 1, 1, 1); Assert.assertEquals("refreshCacheTest1_K2_V1", cache.get("refreshCacheTest1_K2")); - Assert.assertEquals(2, monitor.getCacheStat().getGetCount()); - Assert.assertEquals(0, monitor.getCacheStat().getGetHitCount()); - Assert.assertEquals(2, monitor.getCacheStat().getGetMissCount()); - Assert.assertEquals(2, monitor.getCacheStat().getLoadCount()); - Assert.assertEquals(2, monitor.getCacheStat().getPutCount()); + waitUtil(monitor, 2, 0, 2, 2, 2); Assert.assertEquals("refreshCacheTest1_K1_V0", cache.get("refreshCacheTest1_K1")); - Assert.assertEquals(3, monitor.getCacheStat().getGetCount()); - Assert.assertEquals(1, monitor.getCacheStat().getGetHitCount()); - Assert.assertEquals(2, monitor.getCacheStat().getGetMissCount()); - Assert.assertEquals(2, monitor.getCacheStat().getLoadCount()); - Assert.assertEquals(2, monitor.getCacheStat().getPutCount()); + waitUtil(monitor, 3, 1, 2, 2, 2); Assert.assertEquals("refreshCacheTest1_K2_V1", cache.get("refreshCacheTest1_K2")); - Assert.assertEquals(4, monitor.getCacheStat().getGetCount()); - Assert.assertEquals(2, monitor.getCacheStat().getGetHitCount()); - Assert.assertEquals(2, monitor.getCacheStat().getGetMissCount()); - Assert.assertEquals(2, monitor.getCacheStat().getLoadCount()); - Assert.assertEquals(2, monitor.getCacheStat().getPutCount()); + waitUtil(monitor, 4, 2, 2, 2, 2); Thread.sleep((long) (1.5 * refreshMillis)); @@ -219,31 +215,20 @@ private static void refreshCacheTest1(Cache cache) throws Exception { Assert.assertEquals(4, monitor.getCacheStat().getLoadCount()); Assert.assertNotEquals("refreshCacheTest1_K1_V0", cache.get("refreshCacheTest1_K1")); if (external && !multiLevel) { - Assert.assertEquals(5 + 2/*timestamp*/, monitor.getCacheStat().getGetCount()); - Assert.assertEquals(3, monitor.getCacheStat().getGetHitCount()); - Assert.assertEquals(2 + 2/*timestamp*/, monitor.getCacheStat().getGetMissCount()); - Assert.assertEquals(4, monitor.getCacheStat().getLoadCount()); - Assert.assertEquals(4 + 2/*timestamp*/ + 2/*tryLock -> putIfAbsent*/, monitor.getCacheStat().getPutCount()); + long getCount = 5 + 2/*timestamp*/; + long getHitCount = 3; + long getMissCount = 2 + 2/*timestamp*/; + long loadCount = 4; + long putCount = 4 + 2/*timestamp*/ + 2/*tryLock -> putIfAbsent*/; + waitUtil(monitor, getCount, getHitCount, getMissCount, loadCount, putCount); } else { - Assert.assertEquals(5, monitor.getCacheStat().getGetCount()); - Assert.assertEquals(3, monitor.getCacheStat().getGetHitCount()); - Assert.assertEquals(2, monitor.getCacheStat().getGetMissCount()); - Assert.assertEquals(4, monitor.getCacheStat().getLoadCount()); - Assert.assertEquals(4, monitor.getCacheStat().getPutCount()); + waitUtil(monitor, 5, 3, 2, 4, 4); } Assert.assertNotEquals("refreshCacheTest1_K2_V1", cache.get("refreshCacheTest1_K2")); if (external && !multiLevel) { - Assert.assertEquals(6 + 2, monitor.getCacheStat().getGetCount()); - Assert.assertEquals(4, monitor.getCacheStat().getGetHitCount()); - Assert.assertEquals(2 + 2, monitor.getCacheStat().getGetMissCount()); - Assert.assertEquals(4, monitor.getCacheStat().getLoadCount()); - Assert.assertEquals(4 + 2 + 2, monitor.getCacheStat().getPutCount()); + waitUtil(monitor, 6 + 2, 4, 2 + 2, 4, 4 + 2 + 2); } else { - Assert.assertEquals(6, monitor.getCacheStat().getGetCount()); - Assert.assertEquals(4, monitor.getCacheStat().getGetHitCount()); - Assert.assertEquals(2, monitor.getCacheStat().getGetMissCount()); - Assert.assertEquals(4, monitor.getCacheStat().getLoadCount()); - Assert.assertEquals(4, monitor.getCacheStat().getPutCount()); + waitUtil(monitor, 6, 4, 2, 4, 4); } cache.config().getMonitors().remove(monitor); @@ -261,11 +246,7 @@ private static void refreshCacheTest2(Cache cache) throws Exception { Map values = cache.getAll(s); long key1StartRefreshTime = System.currentTimeMillis(); - Assert.assertEquals(2, monitor.getCacheStat().getGetCount()); - Assert.assertEquals(0, monitor.getCacheStat().getGetHitCount()); - Assert.assertEquals(2, monitor.getCacheStat().getGetMissCount()); - Assert.assertEquals(2, monitor.getCacheStat().getLoadCount()); - Assert.assertEquals(2, monitor.getCacheStat().getPutCount()); + waitUtil(monitor, 2, 0, 2, 2, 2); while (true) { long sleepTime = stopRefresh / 5; diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/redis/lettuce/RedisLettuceCacheFailTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/redis/lettuce/RedisLettuceCacheFailTest.java index c346c7d6..27132fa9 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/redis/lettuce/RedisLettuceCacheFailTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/redis/lettuce/RedisLettuceCacheFailTest.java @@ -20,6 +20,7 @@ import java.util.HashSet; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; @@ -70,6 +71,16 @@ private RedisFuture mockFuture(Object value, Throwable ex) { Object resultData = function.apply(value, ex); return CompletableFuture.completedFuture(resultData); }); + when(redisFuture.handleAsync(any(), any())).thenAnswer((invoke) -> { + BiFunction function = invoke.getArgument(0); + Executor executor = invoke.getArgument(1); + CompletableFuture f = new CompletableFuture(); + executor.execute(() -> { + Object resultData = function.apply(value, ex); + f.complete(resultData); + }); + return f; + }); return redisFuture; } diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/redis/lettuce/RedisLettuceCacheTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/redis/lettuce/RedisLettuceCacheTest.java index ddae0379..184031b5 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/redis/lettuce/RedisLettuceCacheTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/redis/lettuce/RedisLettuceCacheTest.java @@ -140,7 +140,7 @@ public void testWithMultiLevelCache() throws Exception { LoadingCacheTest.loadingCacheTest(MultiLevelCacheBuilder.createMultiLevelCacheBuilder() .expireAfterWrite(5000, TimeUnit.MILLISECONDS) - .addCache(l1Cache, l2Cache), 50); + .addCache(l1Cache, l2Cache), 20); LettuceConnectionManager.defaultManager().removeAndClose(client); } @@ -165,7 +165,7 @@ private void test(AbstractRedisClient client, StatefulConnection connection) thr .keyConvertor(FastjsonKeyConvertor.INSTANCE) .valueEncoder(JavaValueEncoder.INSTANCE) .valueDecoder(JavaValueDecoder.INSTANCE) - .keyPrefix(new Random().nextInt() + ""), 50); + .keyPrefix(new Random().nextInt() + ""), 20); cache = RedisLettuceCacheBuilder.createRedisLettuceCacheBuilder() .redisClient(client) diff --git a/jetcache-test/src/test/java/com/alicp/jetcache/support/DefaultCacheMonitorTest.java b/jetcache-test/src/test/java/com/alicp/jetcache/support/DefaultCacheMonitorTest.java index d5204488..7d265348 100644 --- a/jetcache-test/src/test/java/com/alicp/jetcache/support/DefaultCacheMonitorTest.java +++ b/jetcache-test/src/test/java/com/alicp/jetcache/support/DefaultCacheMonitorTest.java @@ -3,7 +3,7 @@ import com.alicp.jetcache.Cache; import com.alicp.jetcache.MultiLevelCache; import com.alicp.jetcache.embedded.LinkedHashMapCacheBuilder; -import org.junit.Assert; +import com.alicp.jetcache.test.anno.TestUtil; import org.junit.Test; import java.util.HashMap; @@ -12,6 +12,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import static com.alicp.jetcache.test.anno.TestUtil.waitUtil; + /** * Created on 2016/11/1. * @@ -27,93 +29,154 @@ public Cache createCache() { } private static void basetest(Cache cache, DefaultCacheMonitor m) { - CacheStat oldStat = m.getCacheStat().clone(); - cache.get("MONITOR_TEST_K1"); - Assert.assertEquals(oldStat.getGetCount() + 1, m.getCacheStat().getGetCount()); - Assert.assertEquals(oldStat.getGetMissCount() + 1, m.getCacheStat().getGetMissCount()); - - oldStat = m.getCacheStat().clone(); - cache.put("MONITOR_TEST_K1", "V1"); - Assert.assertEquals(oldStat.getPutCount() + 1, m.getCacheStat().getPutCount()); - - oldStat = m.getCacheStat().clone(); - cache.get("MONITOR_TEST_K1"); - Assert.assertEquals(oldStat.getGetCount() + 1, m.getCacheStat().getGetCount()); - Assert.assertEquals(oldStat.getGetHitCount() + 1, m.getCacheStat().getGetHitCount()); - - oldStat = m.getCacheStat().clone(); - cache.remove("MONITOR_TEST_K1"); - Assert.assertEquals(oldStat.getRemoveCount() + 1, m.getCacheStat().getRemoveCount()); - - oldStat = m.getCacheStat().clone(); - cache.computeIfAbsent("MONITOR_TEST_K1", (k) -> null); - Assert.assertEquals(oldStat.getGetCount() + 1, m.getCacheStat().getGetCount()); - Assert.assertEquals(oldStat.getGetMissCount() + 1, m.getCacheStat().getGetMissCount()); - Assert.assertEquals(oldStat.getPutCount(), m.getCacheStat().getPutCount()); - Assert.assertEquals(oldStat.getLoadCount() + 1, m.getCacheStat().getLoadCount()); - Assert.assertEquals(oldStat.getLoadSuccessCount() + 1, m.getCacheStat().getLoadSuccessCount()); - - oldStat = m.getCacheStat().clone(); - cache.computeIfAbsent("MONITOR_TEST_K1", (k) -> null, true); - Assert.assertEquals(oldStat.getGetCount() + 1, m.getCacheStat().getGetCount()); - Assert.assertEquals(oldStat.getGetMissCount() + 1, m.getCacheStat().getGetMissCount()); - Assert.assertEquals(oldStat.getPutCount() + 1, m.getCacheStat().getPutCount()); - Assert.assertEquals(oldStat.getLoadCount() + 1, m.getCacheStat().getLoadCount()); - Assert.assertEquals(oldStat.getLoadSuccessCount() + 1, m.getCacheStat().getLoadSuccessCount()); - - oldStat = m.getCacheStat().clone(); - cache.get("MONITOR_TEST_K1"); - Assert.assertEquals(oldStat.getGetCount() + 1, m.getCacheStat().getGetCount()); - Assert.assertEquals(oldStat.getGetHitCount() + 1, m.getCacheStat().getGetHitCount()); - - oldStat = m.getCacheStat().clone(); - cache.remove("MONITOR_TEST_K1"); - Assert.assertEquals(oldStat.getRemoveCount() + 1, m.getCacheStat().getRemoveCount()); - - oldStat = m.getCacheStat().clone(); - cache.computeIfAbsent("MONITOR_TEST_K2", (k) -> null, false, 10, TimeUnit.SECONDS); - Assert.assertEquals(oldStat.getGetCount() + 1, m.getCacheStat().getGetCount()); - Assert.assertEquals(oldStat.getGetMissCount() + 1, m.getCacheStat().getGetMissCount()); - Assert.assertEquals(oldStat.getPutCount(), m.getCacheStat().getPutCount()); - Assert.assertEquals(oldStat.getLoadCount() + 1, m.getCacheStat().getLoadCount()); - Assert.assertEquals(oldStat.getLoadSuccessCount() + 1, m.getCacheStat().getLoadSuccessCount()); - - oldStat = m.getCacheStat().clone(); - cache.computeIfAbsent("MONITOR_TEST_K2", (k) -> null, true, 10, TimeUnit.SECONDS); - Assert.assertEquals(oldStat.getGetCount() + 1, m.getCacheStat().getGetCount()); - Assert.assertEquals(oldStat.getGetMissCount() + 1, m.getCacheStat().getGetMissCount()); - Assert.assertEquals(oldStat.getPutCount() + 1, m.getCacheStat().getPutCount()); - Assert.assertEquals(oldStat.getLoadCount() + 1, m.getCacheStat().getLoadCount()); - Assert.assertEquals(oldStat.getLoadSuccessCount() + 1, m.getCacheStat().getLoadSuccessCount()); - - oldStat = m.getCacheStat().clone(); - cache.get("MONITOR_TEST_K2"); - Assert.assertEquals(oldStat.getGetCount() + 1, m.getCacheStat().getGetCount()); - Assert.assertEquals(oldStat.getGetHitCount() + 1, m.getCacheStat().getGetHitCount()); - - oldStat = m.getCacheStat().clone(); - cache.remove("MONITOR_TEST_K2"); - Assert.assertEquals(oldStat.getRemoveCount() + 1, m.getCacheStat().getRemoveCount()); - - oldStat = m.getCacheStat().clone(); + { + CacheStat oldStat = m.getCacheStat(); + cache.get("MONITOR_TEST_K1"); + waitUtil(() -> { + CacheStat s = m.getCacheStat(); + return oldStat.getGetCount() + 1 == s.getGetCount() + && oldStat.getGetMissCount() + 1 == s.getGetMissCount(); + }); + } + + { + CacheStat oldStat = m.getCacheStat(); + cache.put("MONITOR_TEST_K1", "V1"); + waitUtil(oldStat.getPutCount() + 1, () -> m.getCacheStat().getPutCount()); + } + + { + CacheStat oldStat = m.getCacheStat(); + cache.get("MONITOR_TEST_K1"); + waitUtil(() -> { + CacheStat s = m.getCacheStat(); + return oldStat.getGetCount() + 1 == s.getGetCount() + && oldStat.getGetHitCount() + 1 == s.getGetHitCount(); + }); + } + + { + CacheStat oldStat = m.getCacheStat(); + cache.remove("MONITOR_TEST_K1"); + waitUtil(oldStat.getRemoveCount() + 1, () -> m.getCacheStat().getRemoveCount()); + } + + { + CacheStat oldStat = m.getCacheStat(); + cache.computeIfAbsent("MONITOR_TEST_K1", (k) -> null); + waitUtil(() -> { + CacheStat s = m.getCacheStat(); + return oldStat.getGetCount() + 1 == s.getGetCount() + && oldStat.getGetMissCount() + 1 == s.getGetMissCount() + && oldStat.getPutCount() == s.getPutCount() + && oldStat.getLoadCount() + 1 == s.getLoadCount() + && oldStat.getLoadSuccessCount() + 1 == s.getLoadSuccessCount(); + }); + } + + { + CacheStat oldStat = m.getCacheStat(); + cache.computeIfAbsent("MONITOR_TEST_K1", (k) -> null, true); + waitUtil(() -> { + CacheStat s = m.getCacheStat(); + return oldStat.getGetCount() + 1 == s.getGetCount() + && oldStat.getGetMissCount() + 1 == s.getGetMissCount() + && oldStat.getPutCount() + 1 == s.getPutCount() + && oldStat.getLoadCount() + 1 == s.getLoadCount() + && oldStat.getLoadSuccessCount() + 1 == s.getLoadSuccessCount(); + }); + } + + { + CacheStat oldStat = m.getCacheStat(); + cache.get("MONITOR_TEST_K1"); + waitUtil(() -> { + CacheStat s = m.getCacheStat(); + return oldStat.getGetCount() + 1 == s.getGetCount() + && oldStat.getGetHitCount() + 1 == s.getGetHitCount(); + }); + } + + { + CacheStat oldStat = m.getCacheStat(); + cache.remove("MONITOR_TEST_K1"); + waitUtil(oldStat.getRemoveCount() + 1, () -> m.getCacheStat().getRemoveCount()); + } + + { + CacheStat oldStat = m.getCacheStat(); + cache.computeIfAbsent("MONITOR_TEST_K2", (k) -> null, false, 10, TimeUnit.SECONDS); + waitUtil(() -> { + CacheStat s = m.getCacheStat(); + return oldStat.getGetCount() + 1 == s.getGetCount() + && oldStat.getGetMissCount() + 1 == s.getGetMissCount() + && oldStat.getPutCount() == s.getPutCount() + && oldStat.getLoadCount() + 1 == s.getLoadCount() + && oldStat.getLoadSuccessCount() + 1 == s.getLoadSuccessCount(); + }); + } + + { + CacheStat oldStat = m.getCacheStat(); + cache.computeIfAbsent("MONITOR_TEST_K2", (k) -> null, true, 10, TimeUnit.SECONDS); + waitUtil(() -> { + CacheStat s = m.getCacheStat(); + return oldStat.getGetCount() + 1 == s.getGetCount() + && oldStat.getGetMissCount() + 1 == s.getGetMissCount() + && oldStat.getPutCount() + 1 == s.getPutCount() + && oldStat.getLoadCount() + 1 == s.getLoadCount() + && oldStat.getLoadSuccessCount() + 1 == s.getLoadSuccessCount(); + }); + } + + { + CacheStat oldStat = m.getCacheStat(); + cache.get("MONITOR_TEST_K2"); + waitUtil(() -> { + CacheStat s = m.getCacheStat(); + return oldStat.getGetCount() + 1 == s.getGetCount() + && oldStat.getGetHitCount() + 1 == s.getGetHitCount(); + }); + } + + { + CacheStat oldStat = m.getCacheStat(); + cache.remove("MONITOR_TEST_K2"); + waitUtil(oldStat.getRemoveCount() + 1, () -> m.getCacheStat().getRemoveCount()); + } + Map map = new HashMap(); map.put("MONITOR_TEST_multi_k1", "V1"); map.put("MONITOR_TEST_multi_k2", "V2"); - cache.putAll(map); - Assert.assertEquals(oldStat.getPutCount() + 2, m.getCacheStat().getPutCount()); - - oldStat = m.getCacheStat().clone(); - HashSet keys = new HashSet(map.keySet()); - keys.add("MONITOR_TEST_multi_k3"); - cache.getAll(keys); - Assert.assertEquals(oldStat.getGetCount() + 3, m.getCacheStat().getGetCount()); - Assert.assertEquals(oldStat.getGetHitCount() + 2, m.getCacheStat().getGetHitCount()); - Assert.assertEquals(oldStat.getGetMissCount() + 1, m.getCacheStat().getGetMissCount()); - - oldStat = m.getCacheStat().clone(); - cache.removeAll(keys); - Assert.assertEquals(oldStat.getRemoveCount() + 3, m.getCacheStat().getRemoveCount()); - Assert.assertEquals(oldStat.getRemoveCount() + 3, m.getCacheStat().getRemoveSuccessCount()); + { + CacheStat oldStat = m.getCacheStat(); + cache.putAll(map); + waitUtil(oldStat.getPutCount() + 2, () -> m.getCacheStat().getPutCount()); + } + + { + CacheStat oldStat = m.getCacheStat(); + HashSet keys = new HashSet(map.keySet()); + keys.add("MONITOR_TEST_multi_k3"); + cache.getAll(keys); + waitUtil(() -> { + CacheStat s = m.getCacheStat(); + return oldStat.getGetCount() + 3 == s.getGetCount() + && oldStat.getGetHitCount() + 2 == s.getGetHitCount() + && oldStat.getGetMissCount() + 1 == s.getGetMissCount(); + }); + } + + { + CacheStat oldStat = m.getCacheStat(); + HashSet keys = new HashSet(map.keySet()); + cache.removeAll(keys); + waitUtil(() -> { + CacheStat s = m.getCacheStat(); + return oldStat.getRemoveCount() + 2 == s.getRemoveCount() + && oldStat.getRemoveSuccessCount() + 2 == s.getRemoveSuccessCount(); + }); + } } public static void testMonitor(Cache cache) {