Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import com.google.common.cache.CacheStats;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalCause;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -32,6 +30,8 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -41,7 +41,6 @@
import java.util.concurrent.ExecutionException;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -144,42 +143,50 @@ public ImmutableMap<K, V> getAll(Iterable<? extends K> keys)
throws ExecutionException
{
List<Token<K>> newTokens = new ArrayList<>();
List<Token<K>> temporaryTokens = new ArrayList<>();
try {
BiMap<K, Token<K>> keyToToken = HashBiMap.create();
Map<K, V> result = new LinkedHashMap<>();
for (K key : keys) {
if (result.containsKey(key)) {
continue;
}
// This is not bulk, but is fast local operation
Token<K> newToken = new Token<>(key);
Token<K> token = tokens.computeIfAbsent(key, ignored -> newToken);
keyToToken.put(key, token);
if (token == newToken) {
newTokens.add(newToken);
Token<K> oldToken = tokens.putIfAbsent(key, newToken);
if (oldToken != null) {
// Token exists but a data may not exist (e.g. due to concurrent eviction)
V value = dataCache.getIfPresent(oldToken);
if (value != null) {
result.put(key, value);
continue;
}
// Old token exists but value wasn't found. This can happen when there is concurrent eviction/invalidation
// or when the value is still being loaded. The new token is not registered in tokens, so won't be used
// by subsequent invocations.
temporaryTokens.add(newToken);
}
newTokens.add(newToken);
}

Map<Token<K>, V> values = dataCache.getAll(keyToToken.values());

BiMap<Token<K>, K> tokenToKey = keyToToken.inverse();
ImmutableMap.Builder<K, V> result = ImmutableMap.builder();
Map<Token<K>, V> values = dataCache.getAll(newTokens);
for (Map.Entry<Token<K>, V> entry : values.entrySet()) {
Token<K> token = entry.getKey();

// While token.getKey() returns equal key, a caller may expect us to maintain key identity, in case equal keys are still distinguishable.
K key = tokenToKey.get(token);
checkState(key != null, "No key found for %s in %s when loading %s", token, tokenToKey, keys);

result.put(key, entry.getValue());
Token<K> newToken = entry.getKey();
result.put(newToken.getKey(), entry.getValue());
}
return result.buildOrThrow();
return ImmutableMap.copyOf(result);
}
catch (Throwable e) {
for (Token<K> token : newTokens) {
// Failed to load and it was our new token persisted in tokens map.
// Failed to load and it was our new token (potentially) persisted in tokens map.
// No cache entry exists for the token (unless concurrent load happened),
// so we need to remove it.
tokens.remove(token.getKey(), token);
}
throw e;
}
finally {
dataCache.invalidateAll(temporaryTokens);
}
}

@Override
Expand Down Expand Up @@ -226,6 +233,16 @@ public void invalidateAll()
tokens.clear();
}

// Not thread safe, test only.
@VisibleForTesting
void clearDataCacheOnly()
{
Map<K, Token<K>> tokensCopy = new HashMap<>(tokens);
dataCache.asMap().clear();
verify(tokens.isEmpty(), "Clearing dataCache should trigger tokens eviction");
tokens.putAll(tokensCopy);
}

@Override
public CacheStats stats()
{
Expand Down Expand Up @@ -406,7 +423,17 @@ public Map<Token<K>, V> loadAll(Iterable<? extends Token<K>> tokens)
for (Token<K> token : tokenList) {
keys.add(token.getKey());
}
Map<? super K, V> values = delegate.loadAll(keys);
Map<? super K, V> values;
try {
values = delegate.loadAll(keys);
}
catch (UnsupportedLoadingOperationException e) {
// Guava uses UnsupportedLoadingOperationException in LoadingCache.loadAll to fall back from bulk loading (without load sharing)
// to loading individual values (with load sharing). EvictableCache implementation does not currently support the fallback mechanism,
// so the individual values would be loaded without load sharing. This would be an unintentional and non-obvious behavioral
// discrepancy between EvictableCache and Guava Caches, so the mechanism is disabled.
throw new UnsupportedOperationException("LoadingCache.getAll() is not supported by EvictableCache when CacheLoader.loadAll is not implemented", e);
}

ImmutableMap.Builder<Token<K>, V> result = ImmutableMap.builder();
for (int i = 0; i < tokenList.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheStats;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;
Expand All @@ -25,15 +26,18 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.collect.cache.CacheStatsAssertions.assertCacheStats;
Expand Down Expand Up @@ -275,6 +279,60 @@ private static Integer newInteger(int value)
return newInteger;
}

/**
* Test that the loader is invoked only once for concurrent invocations of {{@link LoadingCache#get(Object, Callable)} with equal keys.
* This is a behavior of Guava Cache as well. While this is necessarily desirable behavior (see
* <a href="https://github.com/trinodb/trino/issues/11067">https://github.com/trinodb/trino/issues/11067</a>),
* the test exists primarily to document current state and support discussion, should the current state change.
*/
@Test(timeOut = TEST_TIMEOUT_MILLIS)
public void testConcurrentGetWithCallableShareLoad()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should look into jcstress (https://github.com/openjdk/jcstress) for testing concurrency (for later, not as part of this PR).

throws Exception
{
AtomicInteger loads = new AtomicInteger();
AtomicInteger concurrentInvocations = new AtomicInteger();

Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder()
.maximumSize(10_000)
.build();

int threads = 2;
int invocationsPerThread = 100;
ExecutorService executor = newFixedThreadPool(threads);
try {
CyclicBarrier barrier = new CyclicBarrier(threads);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < threads; i++) {
futures.add(executor.submit(() -> {
for (int invocation = 0; invocation < invocationsPerThread; invocation++) {
int key = invocation;
barrier.await(10, SECONDS);
int value = cache.get(key, () -> {
loads.incrementAndGet();
int invocations = concurrentInvocations.incrementAndGet();
checkState(invocations == 1, "There should be no concurrent invocations, cache should do load sharing when get() invoked for same key");
Thread.sleep(1);
concurrentInvocations.decrementAndGet();
return -key;
});
assertEquals(value, -invocation);
}
return null;
}));
}

for (Future<?> future : futures) {
future.get(10, SECONDS);
}
assertThat(loads).as("loads")
.hasValueBetween(invocationsPerThread, threads * invocationsPerThread - 1 /* inclusive */);
}
finally {
executor.shutdownNow();
executor.awaitTermination(10, SECONDS);
}
}

/**
* Covers https://github.com/google/guava/issues/1881
*/
Expand Down
Loading