diff --git a/lib/trino-collect/pom.xml b/lib/trino-collect/pom.xml index bdcca0a2ce3a..f4d2953a5a09 100644 --- a/lib/trino-collect/pom.xml +++ b/lib/trino-collect/pom.xml @@ -45,6 +45,12 @@ test + + io.airlift + testing + test + + org.assertj assertj-core diff --git a/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java index ed28c28dbc5c..14895eaf6787 100644 --- a/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java +++ b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java @@ -13,6 +13,7 @@ */ package io.trino.collect.cache; +import com.google.common.base.Ticker; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -45,6 +46,7 @@ public static EvictableCacheBuilder newBuilder() return new EvictableCacheBuilder<>(); } + private Optional ticker = Optional.empty(); private Optional expireAfterWrite = Optional.empty(); private Optional refreshAfterWrite = Optional.empty(); private Optional maximumSize = Optional.empty(); @@ -55,6 +57,15 @@ public static EvictableCacheBuilder newBuilder() private EvictableCacheBuilder() {} + /** + * Pass-through for {@link CacheBuilder#ticker(Ticker)}. + */ + public EvictableCacheBuilder ticker(Ticker ticker) + { + this.ticker = Optional.of(ticker); + return this; + } + public EvictableCacheBuilder expireAfterWrite(long duration, TimeUnit unit) { return expireAfterWrite(toDuration(duration, unit)); @@ -172,6 +183,7 @@ public LoadingCache build(CacheLoader cacheBuilder = CacheBuilder.newBuilder(); + ticker.ifPresent(cacheBuilder::ticker); expireAfterWrite.ifPresent(cacheBuilder::expireAfterWrite); refreshAfterWrite.ifPresent(cacheBuilder::refreshAfterWrite); maximumSize.ifPresent(cacheBuilder::maximumSize); diff --git a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java index 79f03ac6f853..da04bd1881dd 100644 --- a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java +++ b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java @@ -19,6 +19,7 @@ import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import io.airlift.testing.TestingTicker; import org.gaul.modernizer_maven_annotations.SuppressModernizer; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -32,6 +33,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -45,6 +47,7 @@ import static java.lang.String.format; import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.concurrent.TimeUnit.DAYS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -72,8 +75,9 @@ public void testLoad() public void testEvictBySize() throws Exception { + int maximumSize = 10; Cache cache = EvictableCacheBuilder.newBuilder() - .maximumSize(10) + .maximumSize(maximumSize) .build(); for (int i = 0; i < 10_000; i++) { @@ -81,8 +85,14 @@ public void testEvictBySize() assertEquals((Object) cache.get(i, () -> value), value); } cache.cleanUp(); - assertEquals(cache.size(), 10); - assertEquals(((EvictableCache) cache).tokensCount(), 10); + assertEquals(cache.size(), maximumSize); + assertEquals(((EvictableCache) cache).tokensCount(), maximumSize); + + // Ensure cache is effective, i.e. some entries preserved + int lastKey = 10_000 - 1; + assertEquals((Object) cache.get(lastKey, () -> { + throw new UnsupportedOperationException(); + }), lastKey * 10); } @Test(timeOut = TEST_TIMEOUT_MILLIS) @@ -106,6 +116,75 @@ public void testEvictByWeight() assertThat(cache.asMap().keySet().stream().mapToInt(i -> i).sum()).as("key sum").isLessThanOrEqualTo(20); assertThat(cache.asMap().values()).as("values").hasSize(cacheSize); assertThat(cache.asMap().values().stream().mapToInt(String::length).sum()).as("values length sum").isLessThanOrEqualTo(20); + + // Ensure cache is effective, i.e. some entries preserved + int lastKey = 10 - 1; + assertEquals(cache.get(lastKey, () -> { + throw new UnsupportedOperationException(); + }), Strings.repeat("a", lastKey)); + } + + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testEvictByTime() + throws Exception + { + TestingTicker ticker = new TestingTicker(); + int ttl = 100; + Cache cache = EvictableCacheBuilder.newBuilder() + .ticker(ticker) + .expireAfterWrite(ttl, TimeUnit.MILLISECONDS) + .build(); + + assertEquals(cache.get(1, () -> "1 ala ma kota"), "1 ala ma kota"); + ticker.increment(ttl, MILLISECONDS); + assertEquals(cache.get(2, () -> "2 ala ma kota"), "2 ala ma kota"); + cache.cleanUp(); + + // First entry should be expired and its token removed + int cacheSize = toIntExact(cache.size()); + assertThat(cacheSize).as("cacheSize").isEqualTo(1); + assertThat(((EvictableCache) cache).tokensCount()).as("tokensCount").isEqualTo(cacheSize); + assertThat(cache.asMap().keySet()).as("keySet").hasSize(cacheSize); + assertThat(cache.asMap().values()).as("values").hasSize(cacheSize); + } + + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testPreserveValueLoadedAfterTimeExpiration() + throws Exception + { + TestingTicker ticker = new TestingTicker(); + int ttl = 100; + Cache cache = EvictableCacheBuilder.newBuilder() + .ticker(ticker) + .expireAfterWrite(ttl, TimeUnit.MILLISECONDS) + .build(); + int key = 11; + + assertEquals(cache.get(key, () -> "11 ala ma kota"), "11 ala ma kota"); + assertThat(((EvictableCache) cache).tokensCount()).as("tokensCount").isEqualTo(1); + + // Should be served from the cache + assertEquals(cache.get(key, () -> "something else"), "11 ala ma kota"); + assertThat(((EvictableCache) cache).tokensCount()).as("tokensCount").isEqualTo(1); + + ticker.increment(ttl, MILLISECONDS); + // Should be reloaded + assertEquals(cache.get(key, () -> "new value"), "new value"); + // TODO (https://github.com/trinodb/trino/issues/14545) tokensCount should be 1; 0 means we lost the token for a live entry + assertThat(((EvictableCache) cache).tokensCount()).as("tokensCount").isEqualTo(0); + + // Should be served from the cache + // TODO (https://github.com/trinodb/trino/issues/14545) this should return "new value" inserted into the cache above; it's not doing that due to the token being lost + assertEquals(cache.get(key, () -> "something yet different"), "something yet different"); + // TODO (https://github.com/trinodb/trino/issues/14545) loads count should be 2; it got incremented because we lost the token for a live entry + assertThat(((EvictableCache) cache).tokensCount()).as("tokensCount").isEqualTo(1); + + // TODO (https://github.com/trinodb/trino/issues/14545) cache size should be 1; it is misreported, because there are two live entries for the same key, due to first token being lost + assertThat(cache.size()).as("cacheSize").isEqualTo(2); + assertThat(((EvictableCache) cache).tokensCount()).as("tokensCount").isEqualTo(1); + assertThat(cache.asMap().keySet()).as("keySet").hasSize(1); + // TODO (https://github.com/trinodb/trino/issues/14545) values size should be 1; it is misreported, because there are two live entries for the same key, due to first token being lost + assertThat(cache.asMap().values()).as("values").hasSize(2); } @Test(timeOut = TEST_TIMEOUT_MILLIS) diff --git a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java index 7563455a13f1..7dd45e5e6545 100644 --- a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java +++ b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airlift.testing.TestingTicker; import org.gaul.modernizer_maven_annotations.SuppressModernizer; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -36,6 +37,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; @@ -48,6 +50,7 @@ import static java.lang.Math.toIntExact; import static java.lang.String.format; import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -78,26 +81,41 @@ public void testLoad() public void testEvictBySize() throws Exception { + int maximumSize = 10; + AtomicInteger loads = new AtomicInteger(); LoadingCache cache = EvictableCacheBuilder.newBuilder() - .maximumSize(10) - .build(CacheLoader.from(key -> "abc" + key)); + .maximumSize(maximumSize) + .build(CacheLoader.from(key -> { + loads.incrementAndGet(); + return "abc" + key; + })); for (int i = 0; i < 10_000; i++) { assertEquals((Object) cache.get(i), "abc" + i); } cache.cleanUp(); - assertEquals(cache.size(), 10); - assertEquals(((EvictableCache) cache).tokensCount(), 10); + assertEquals(cache.size(), maximumSize); + assertEquals(((EvictableCache) cache).tokensCount(), maximumSize); + assertEquals(loads.get(), 10_000); + + // Ensure cache is effective, i.e. no new load + int lastKey = 10_000 - 1; + assertEquals((Object) cache.get(lastKey), "abc" + lastKey); + assertEquals(loads.get(), 10_000); } @Test(timeOut = TEST_TIMEOUT_MILLIS) public void testEvictByWeight() throws Exception { + AtomicInteger loads = new AtomicInteger(); LoadingCache cache = EvictableCacheBuilder.newBuilder() .maximumWeight(20) .weigher((Integer key, String value) -> value.length()) - .build(CacheLoader.from(key -> Strings.repeat("a", key))); + .build(CacheLoader.from(key -> { + loads.incrementAndGet(); + return Strings.repeat("a", key); + })); for (int i = 0; i < 10; i++) { assertEquals((Object) cache.get(i), Strings.repeat("a", i)); @@ -110,6 +128,85 @@ public void testEvictByWeight() assertThat(cache.asMap().keySet().stream().mapToInt(i -> i).sum()).as("key sum").isLessThanOrEqualTo(20); assertThat(cache.asMap().values()).as("values").hasSize(cacheSize); assertThat(cache.asMap().values().stream().mapToInt(String::length).sum()).as("values length sum").isLessThanOrEqualTo(20); + assertEquals(loads.get(), 10); + + // Ensure cache is effective, i.e. no new load + int lastKey = 10 - 1; + assertEquals((Object) cache.get(lastKey), Strings.repeat("a", lastKey)); + assertEquals(loads.get(), 10); + } + + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testEvictByTime() + { + TestingTicker ticker = new TestingTicker(); + int ttl = 100; + AtomicInteger loads = new AtomicInteger(); + LoadingCache cache = EvictableCacheBuilder.newBuilder() + .ticker(ticker) + .expireAfterWrite(ttl, TimeUnit.MILLISECONDS) + .build(CacheLoader.from(k -> { + loads.incrementAndGet(); + return k + " ala ma kota"; + })); + + assertEquals(cache.getUnchecked(1), "1 ala ma kota"); + ticker.increment(ttl, MILLISECONDS); + assertEquals(cache.getUnchecked(2), "2 ala ma kota"); + cache.cleanUp(); + + // First entry should be expired and its token removed + int cacheSize = toIntExact(cache.size()); + assertThat(cacheSize).as("cacheSize").isEqualTo(1); + assertThat(((EvictableCache) cache).tokensCount()).as("tokensCount").isEqualTo(cacheSize); + assertThat(cache.asMap().keySet()).as("keySet").hasSize(cacheSize); + assertThat(cache.asMap().values()).as("values").hasSize(cacheSize); + assertEquals(loads.get(), 2); + } + + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testPreserveValueLoadedAfterTimeExpiration() + { + TestingTicker ticker = new TestingTicker(); + int ttl = 100; + AtomicInteger loads = new AtomicInteger(); + LoadingCache cache = EvictableCacheBuilder.newBuilder() + .ticker(ticker) + .expireAfterWrite(ttl, TimeUnit.MILLISECONDS) + .build(CacheLoader.from(k -> { + loads.incrementAndGet(); + return k + " ala ma kota"; + })); + int key = 11; + + assertEquals(cache.getUnchecked(key), "11 ala ma kota"); + assertThat(loads.get()).as("initial load count").isEqualTo(1); + assertThat(((EvictableCache) cache).tokensCount()).as("tokensCount").isEqualTo(1); + + // Should be served from the cache + assertEquals(cache.getUnchecked(key), "11 ala ma kota"); + assertThat(loads.get()).as("loads count should not change before value expires").isEqualTo(1); + assertThat(((EvictableCache) cache).tokensCount()).as("tokensCount").isEqualTo(1); + + ticker.increment(ttl, MILLISECONDS); + // Should be reloaded + assertEquals(cache.getUnchecked(key), "11 ala ma kota"); + assertThat(loads.get()).as("loads count should reflect reloading of value after expiration").isEqualTo(2); + // TODO (https://github.com/trinodb/trino/issues/14545) tokensCount should be 1; 0 means we lost the token for a live entry + assertThat(((EvictableCache) cache).tokensCount()).as("tokensCount").isEqualTo(0); + + // Should be served from the cache + assertEquals(cache.getUnchecked(key), "11 ala ma kota"); + // TODO (https://github.com/trinodb/trino/issues/14545) loads count should be 2; it got incremented because we lost the token for a live entry + assertThat(loads.get()).as("loads count should not change before value expires again").isEqualTo(3); + assertThat(((EvictableCache) cache).tokensCount()).as("tokensCount").isEqualTo(1); + + // TODO (https://github.com/trinodb/trino/issues/14545) cache size should be 1; it is misreported, because there are two live entries for the same key, due to first token being lost + assertThat(cache.size()).as("cacheSize").isEqualTo(2); + assertThat(((EvictableCache) cache).tokensCount()).as("tokensCount").isEqualTo(1); + assertThat(cache.asMap().keySet()).as("keySet").hasSize(1); + // TODO (https://github.com/trinodb/trino/issues/14545) values size should be 1; it is misreported, because there are two live entries for the same key, due to first token being lost + assertThat(cache.asMap().values()).as("values").hasSize(2); } @Test(timeOut = TEST_TIMEOUT_MILLIS, dataProvider = "testDisabledCacheDataProvider")