Skip to content

Commit

Permalink
Cache programmatic API - support asynchronous value loader
Browse files Browse the repository at this point in the history
- resolves #30311
  • Loading branch information
mkouba committed Jan 12, 2023
1 parent f8f7c49 commit 455ca8a
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.quarkus.cache.CaffeineCache;
import io.quarkus.cache.runtime.caffeine.CaffeineCacheImpl;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;

public class ProgrammaticApiTest {

Expand Down Expand Up @@ -225,6 +226,20 @@ public void testInvalidatePredicate() throws Exception {
assertNull(cache.as(CaffeineCache.class).getIfPresent(key));
}

@Test
public void testAsyncLoader() throws Exception {
// Action: value retrieval from the cache.
// Expected effect: asyncvalue loader function used.
// Verified by: same object reference between STEPS 1 and 2 results.
String key = "alpha";
String result = "foo";
Uni<String> uni = Uni.createFrom().item(result);
String value = cache.getAsync(key, k -> uni).await().indefinitely();
assertSame(value, result);
assertKeySetContains(key);
assertGetIfPresent(key, value);
}

private void assertKeySetContains(Object... expectedKeys) {
Set<Object> expectedKeySet = new HashSet<>(Arrays.asList(expectedKeys));
Set<Object> actualKeySet = cache.as(CaffeineCache.class).keySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public <K, V> Uni<V> get(K key, Function<K, V> valueLoader) {
throw new UnsupportedOperationException("This method is not tested here");
}

@Override
public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
throw new UnsupportedOperationException("This method is not tested here");
}

@Override
public Uni<Void> invalidate(Object key) {
throw new UnsupportedOperationException("This method is not tested here");
Expand Down
14 changes: 14 additions & 0 deletions extensions/cache/runtime/src/main/java/io/quarkus/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ public interface Cache {
*/
<K, V> Uni<V> get(K key, Function<K, V> valueLoader);

/**
* Returns a lazy asynchronous action that will emit the cache value identified by {@code key}, obtaining that value from
* {@code valueLoader} if necessary.
*
* @param <K>
* @param <V>
* @param key
* @param valueLoader
* @return a lazy asynchronous action that will emit a cache value
* @throws NullPointerException if the key is {@code null}
*/
<K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader);

/**
* Removes the cache entry identified by {@code key} from the cache. If the key does not identify any cache entry, nothing
* will happen.
Expand Down Expand Up @@ -69,4 +82,5 @@ public interface Cache {
* @throws IllegalStateException if this cache is not an instance of {@code type}
*/
<T extends Cache> T as(Class<T> type);

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class CaffeineCacheImpl extends AbstractCache implements CaffeineCache {

private final CaffeineCacheInfo cacheInfo;
private final StatsCounter statsCounter;
private final boolean recordStats;

public CaffeineCacheImpl(CaffeineCacheInfo cacheInfo, boolean recordStats) {
this.cacheInfo = cacheInfo;
Expand All @@ -56,6 +57,7 @@ public CaffeineCacheImpl(CaffeineCacheInfo cacheInfo, boolean recordStats) {
if (cacheInfo.expireAfterAccess != null) {
builder.expireAfterAccess(cacheInfo.expireAfterAccess);
}
this.recordStats = recordStats;
if (recordStats) {
LOGGER.tracef("Recording Caffeine stats for cache [%s]", cacheInfo.name);
statsCounter = new ConcurrentStatsCounter();
Expand Down Expand Up @@ -94,6 +96,12 @@ public CompletionStage<V> get() {
});
}

@Override
public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
Objects.requireNonNull(key, NULL_KEYS_NOT_SUPPORTED_MSG);
return recordStats ? computeWithStats(key, valueLoader) : computeWithoutStats(key, valueLoader);
}

@Override
public <V> CompletableFuture<V> getIfPresent(Object key) {
Objects.requireNonNull(key, NULL_KEYS_NOT_SUPPORTED_MSG);
Expand Down Expand Up @@ -293,4 +301,38 @@ private <T> T cast(Object value) {
"An existing cached value type does not match the type returned by the value loading function", e);
}
}

@SuppressWarnings("unchecked")
private <K, V> Uni<V> computeWithStats(K key, Function<K, Uni<V>> valueLoader) {
return (Uni<V>) Uni.createFrom()
.completionStage(cache.asMap().compute(key,
new BiFunction<Object, CompletableFuture<Object>, CompletableFuture<Object>>() {
@Override
public CompletableFuture<Object> apply(Object key, CompletableFuture<Object> value) {
if (value == null) {
statsCounter.recordMisses(1);
return (CompletableFuture<Object>) valueLoader.apply((K) key)
.map(i -> NullValueConverter.toCacheValue(i))
.subscribeAsCompletionStage();
} else {
LOGGER.tracef("Key [%s] found in cache [%s]", key, cacheInfo.name);
statsCounter.recordHits(1);
return value;
}
}
}));
}

@SuppressWarnings("unchecked")
private <K, V> Uni<V> computeWithoutStats(K key, Function<K, Uni<V>> valueLoader) {
return (Uni<V>) Uni.createFrom()
.completionStage(cache.asMap().computeIfAbsent(key, new Function<Object, CompletableFuture<Object>>() {
@Override
public CompletableFuture<Object> apply(Object key) {
return (CompletableFuture<Object>) valueLoader.apply((K) key)
.map(i -> NullValueConverter.toCacheValue(i))
.subscribeAsCompletionStage();
}
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public V get() {
});
}

@Override
public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
return valueLoader.apply(key);
}

@Override
public Uni<Void> invalidate(Object key) {
return Uni.createFrom().voidItem();
Expand Down

0 comments on commit 455ca8a

Please sign in to comment.