Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/trino-collect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.collect.cache;

import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
Expand Down Expand Up @@ -45,6 +46,7 @@ public static EvictableCacheBuilder<Object, Object> newBuilder()
return new EvictableCacheBuilder<>();
}

private Optional<Ticker> ticker = Optional.empty();
private Optional<Duration> expireAfterWrite = Optional.empty();
private Optional<Duration> refreshAfterWrite = Optional.empty();
private Optional<Long> maximumSize = Optional.empty();
Expand All @@ -55,6 +57,15 @@ public static EvictableCacheBuilder<Object, Object> newBuilder()

private EvictableCacheBuilder() {}

/**
* Pass-through for {@link CacheBuilder#ticker(Ticker)}.
*/
public EvictableCacheBuilder<K, V> ticker(Ticker ticker)
{
this.ticker = Optional.of(ticker);
return this;
}

public EvictableCacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit)
{
return expireAfterWrite(toDuration(duration, unit));
Expand Down Expand Up @@ -172,6 +183,7 @@ public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(CacheLoader<? sup

// CacheBuilder is further modified in EvictableCache::new, so cannot be shared between build() calls.
CacheBuilder<Object, ? super V> cacheBuilder = CacheBuilder.newBuilder();
ticker.ifPresent(cacheBuilder::ticker);
expireAfterWrite.ifPresent(cacheBuilder::expireAfterWrite);
refreshAfterWrite.ifPresent(cacheBuilder::refreshAfterWrite);
maximumSize.ifPresent(cacheBuilder::maximumSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.testing.TestingTicker;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand All @@ -32,6 +33,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -45,6 +47,7 @@
import static java.lang.String.format;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -72,17 +75,24 @@ public void testLoad()
public void testEvictBySize()
throws Exception
{
int maximumSize = 10;
Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder()
.maximumSize(10)
.maximumSize(maximumSize)
.build();

for (int i = 0; i < 10_000; i++) {
int value = i * 10;
assertEquals((Object) cache.get(i, () -> value), value);
}
cache.cleanUp();
assertEquals(cache.size(), 10);
assertEquals(((EvictableCache<?, ?>) cache).tokensCount(), 10);
assertEquals(cache.size(), maximumSize);
assertEquals(((EvictableCache<?, ?>) cache).tokensCount(), maximumSize);

// Ensure cache is effective, i.e. some entries preserved
int lastKey = 10_000 - 1;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extract 10_000 to variable

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i chose not to apply this one, hope it's fine

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah :)

assertEquals((Object) cache.get(lastKey, () -> {
throw new UnsupportedOperationException();
}), lastKey * 10);
}

@Test(timeOut = TEST_TIMEOUT_MILLIS)
Expand All @@ -106,6 +116,75 @@ public void testEvictByWeight()
assertThat(cache.asMap().keySet().stream().mapToInt(i -> i).sum()).as("key sum").isLessThanOrEqualTo(20);
assertThat(cache.asMap().values()).as("values").hasSize(cacheSize);
assertThat(cache.asMap().values().stream().mapToInt(String::length).sum()).as("values length sum").isLessThanOrEqualTo(20);

// Ensure cache is effective, i.e. some entries preserved
int lastKey = 10 - 1;
assertEquals(cache.get(lastKey, () -> {
throw new UnsupportedOperationException();
}), Strings.repeat("a", lastKey));
}

@Test(timeOut = TEST_TIMEOUT_MILLIS)
Comment thread
findepi marked this conversation as resolved.
Outdated
public void testEvictByTime()
throws Exception
{
TestingTicker ticker = new TestingTicker();
int ttl = 100;
Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
.ticker(ticker)
.expireAfterWrite(ttl, TimeUnit.MILLISECONDS)
.build();

assertEquals(cache.get(1, () -> "1 ala ma kota"), "1 ala ma kota");
ticker.increment(ttl, MILLISECONDS);
assertEquals(cache.get(2, () -> "2 ala ma kota"), "2 ala ma kota");
cache.cleanUp();

// First entry should be expired and its token removed
int cacheSize = toIntExact(cache.size());
assertThat(cacheSize).as("cacheSize").isEqualTo(1);
assertThat(((EvictableCache<?, ?>) cache).tokensCount()).as("tokensCount").isEqualTo(cacheSize);
assertThat(cache.asMap().keySet()).as("keySet").hasSize(cacheSize);
assertThat(cache.asMap().values()).as("values").hasSize(cacheSize);
}

@Test(timeOut = TEST_TIMEOUT_MILLIS)
public void testPreserveValueLoadedAfterTimeExpiration()
throws Exception
{
TestingTicker ticker = new TestingTicker();
int ttl = 100;
Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
.ticker(ticker)
.expireAfterWrite(ttl, TimeUnit.MILLISECONDS)
.build();
int key = 11;

assertEquals(cache.get(key, () -> "11 ala ma kota"), "11 ala ma kota");
assertThat(((EvictableCache<?, ?>) cache).tokensCount()).as("tokensCount").isEqualTo(1);

// Should be served from the cache
assertEquals(cache.get(key, () -> "something else"), "11 ala ma kota");
assertThat(((EvictableCache<?, ?>) cache).tokensCount()).as("tokensCount").isEqualTo(1);

ticker.increment(ttl, MILLISECONDS);
// Should be reloaded
assertEquals(cache.get(key, () -> "new value"), "new value");
// TODO (https://github.com/trinodb/trino/issues/14545) tokensCount should be 1; 0 means we lost the token for a live entry
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #14545

assertThat(((EvictableCache<?, ?>) cache).tokensCount()).as("tokensCount").isEqualTo(0);

// Should be served from the cache
// TODO (https://github.com/trinodb/trino/issues/14545) this should return "new value" inserted into the cache above; it's not doing that due to the token being lost
assertEquals(cache.get(key, () -> "something yet different"), "something yet different");
// TODO (https://github.com/trinodb/trino/issues/14545) loads count should be 2; it got incremented because we lost the token for a live entry
assertThat(((EvictableCache<?, ?>) cache).tokensCount()).as("tokensCount").isEqualTo(1);

// TODO (https://github.com/trinodb/trino/issues/14545) cache size should be 1; it is misreported, because there are two live entries for the same key, due to first token being lost
assertThat(cache.size()).as("cacheSize").isEqualTo(2);
assertThat(((EvictableCache<?, ?>) cache).tokensCount()).as("tokensCount").isEqualTo(1);
assertThat(cache.asMap().keySet()).as("keySet").hasSize(1);
// TODO (https://github.com/trinodb/trino/issues/14545) values size should be 1; it is misreported, because there are two live entries for the same key, due to first token being lost
assertThat(cache.asMap().values()).as("values").hasSize(2);
}

@Test(timeOut = TEST_TIMEOUT_MILLIS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.testing.TestingTicker;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand All @@ -36,6 +37,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
Expand All @@ -48,6 +50,7 @@
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -78,26 +81,41 @@ public void testLoad()
public void testEvictBySize()
throws Exception
{
int maximumSize = 10;
AtomicInteger loads = new AtomicInteger();
LoadingCache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
.maximumSize(10)
.build(CacheLoader.from(key -> "abc" + key));
.maximumSize(maximumSize)
.build(CacheLoader.from(key -> {
loads.incrementAndGet();
return "abc" + key;
}));

for (int i = 0; i < 10_000; i++) {
assertEquals((Object) cache.get(i), "abc" + i);
}
cache.cleanUp();
assertEquals(cache.size(), 10);
assertEquals(((EvictableCache<?, ?>) cache).tokensCount(), 10);
assertEquals(cache.size(), maximumSize);
assertEquals(((EvictableCache<?, ?>) cache).tokensCount(), maximumSize);
assertEquals(loads.get(), 10_000);

// Ensure cache is effective, i.e. no new load
int lastKey = 10_000 - 1;
assertEquals((Object) cache.get(lastKey), "abc" + lastKey);
assertEquals(loads.get(), 10_000);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract 10_000

}

@Test(timeOut = TEST_TIMEOUT_MILLIS)
public void testEvictByWeight()
throws Exception
{
AtomicInteger loads = new AtomicInteger();
LoadingCache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
.maximumWeight(20)
.weigher((Integer key, String value) -> value.length())
.build(CacheLoader.from(key -> Strings.repeat("a", key)));
.build(CacheLoader.from(key -> {
loads.incrementAndGet();
return Strings.repeat("a", key);
}));

for (int i = 0; i < 10; i++) {
assertEquals((Object) cache.get(i), Strings.repeat("a", i));
Expand All @@ -110,6 +128,85 @@ public void testEvictByWeight()
assertThat(cache.asMap().keySet().stream().mapToInt(i -> i).sum()).as("key sum").isLessThanOrEqualTo(20);
assertThat(cache.asMap().values()).as("values").hasSize(cacheSize);
assertThat(cache.asMap().values().stream().mapToInt(String::length).sum()).as("values length sum").isLessThanOrEqualTo(20);
assertEquals(loads.get(), 10);

// Ensure cache is effective, i.e. no new load
int lastKey = 10 - 1;
assertEquals((Object) cache.get(lastKey), Strings.repeat("a", lastKey));
assertEquals(loads.get(), 10);
}

@Test(timeOut = TEST_TIMEOUT_MILLIS)
public void testEvictByTime()
{
TestingTicker ticker = new TestingTicker();
int ttl = 100;
AtomicInteger loads = new AtomicInteger();
LoadingCache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
.ticker(ticker)
.expireAfterWrite(ttl, TimeUnit.MILLISECONDS)
.build(CacheLoader.from(k -> {
loads.incrementAndGet();
return k + " ala ma kota";
}));

assertEquals(cache.getUnchecked(1), "1 ala ma kota");
ticker.increment(ttl, MILLISECONDS);
assertEquals(cache.getUnchecked(2), "2 ala ma kota");
cache.cleanUp();

// First entry should be expired and its token removed
int cacheSize = toIntExact(cache.size());
assertThat(cacheSize).as("cacheSize").isEqualTo(1);
assertThat(((EvictableCache<?, ?>) cache).tokensCount()).as("tokensCount").isEqualTo(cacheSize);
assertThat(cache.asMap().keySet()).as("keySet").hasSize(cacheSize);
assertThat(cache.asMap().values()).as("values").hasSize(cacheSize);
assertEquals(loads.get(), 2);
}

@Test(timeOut = TEST_TIMEOUT_MILLIS)
public void testPreserveValueLoadedAfterTimeExpiration()
{
TestingTicker ticker = new TestingTicker();
int ttl = 100;
AtomicInteger loads = new AtomicInteger();
LoadingCache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
.ticker(ticker)
.expireAfterWrite(ttl, TimeUnit.MILLISECONDS)
.build(CacheLoader.from(k -> {
loads.incrementAndGet();
return k + " ala ma kota";
}));
int key = 11;

assertEquals(cache.getUnchecked(key), "11 ala ma kota");
assertThat(loads.get()).as("initial load count").isEqualTo(1);
assertThat(((EvictableCache<?, ?>) cache).tokensCount()).as("tokensCount").isEqualTo(1);

// Should be served from the cache
assertEquals(cache.getUnchecked(key), "11 ala ma kota");
assertThat(loads.get()).as("loads count should not change before value expires").isEqualTo(1);
assertThat(((EvictableCache<?, ?>) cache).tokensCount()).as("tokensCount").isEqualTo(1);

ticker.increment(ttl, MILLISECONDS);
// Should be reloaded
assertEquals(cache.getUnchecked(key), "11 ala ma kota");
assertThat(loads.get()).as("loads count should reflect reloading of value after expiration").isEqualTo(2);
// TODO (https://github.com/trinodb/trino/issues/14545) tokensCount should be 1; 0 means we lost the token for a live entry
assertThat(((EvictableCache<?, ?>) cache).tokensCount()).as("tokensCount").isEqualTo(0);

// Should be served from the cache
assertEquals(cache.getUnchecked(key), "11 ala ma kota");
// TODO (https://github.com/trinodb/trino/issues/14545) loads count should be 2; it got incremented because we lost the token for a live entry
assertThat(loads.get()).as("loads count should not change before value expires again").isEqualTo(3);
assertThat(((EvictableCache<?, ?>) cache).tokensCount()).as("tokensCount").isEqualTo(1);

// TODO (https://github.com/trinodb/trino/issues/14545) cache size should be 1; it is misreported, because there are two live entries for the same key, due to first token being lost
assertThat(cache.size()).as("cacheSize").isEqualTo(2);
assertThat(((EvictableCache<?, ?>) cache).tokensCount()).as("tokensCount").isEqualTo(1);
assertThat(cache.asMap().keySet()).as("keySet").hasSize(1);
// TODO (https://github.com/trinodb/trino/issues/14545) values size should be 1; it is misreported, because there are two live entries for the same key, due to first token being lost
assertThat(cache.asMap().values()).as("values").hasSize(2);
}

@Test(timeOut = TEST_TIMEOUT_MILLIS, dataProvider = "testDisabledCacheDataProvider")
Expand Down