From c2b48601c6503ddb46a9d5b5b71c0281b3baf4f8 Mon Sep 17 00:00:00 2001 From: "Monhemius, B. (Bart)" Date: Wed, 13 Sep 2023 13:24:49 +0200 Subject: [PATCH] Recompute cached value when the Redis connection fails. If the Redis client cannot connect to Redis (and only in this case), recompute the value. The value is neither cached nor written in the cache, making the cache implementation more robust to transient network issues. A log message (warning) has been added to detect the issue. Co-authored-by: Monhemius, B. (Bart) Co-authored-by: Clement Escoffier --- .../cache/redis/runtime/RedisCacheImpl.java | 46 +++++++++++++------ .../redis/runtime/RedisCacheImplTest.java | 45 +++++++++++++++++- 2 files changed, 77 insertions(+), 14 deletions(-) diff --git a/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java b/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java index 7ba1c30da0eed..2a60256424809 100644 --- a/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java +++ b/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java @@ -1,5 +1,6 @@ package io.quarkus.cache.redis.runtime; +import java.net.ConnectException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; @@ -10,6 +11,8 @@ import java.util.function.Predicate; import java.util.function.Supplier; +import org.jboss.logging.Logger; + import io.quarkus.arc.Arc; import io.quarkus.arc.ArcContainer; import io.quarkus.cache.CacheException; @@ -35,6 +38,8 @@ */ public class RedisCacheImpl extends AbstractCache implements RedisCache { + private static final Logger log = Logger.getLogger(RedisCacheImpl.class); + private static final Map> PRIMITIVE_TO_CLASS_MAPPING = Map.of( "int", Integer.class, "byte", Byte.class, @@ -123,6 +128,19 @@ private String encodeKey(K key) { return new String(marshaller.encode(key), StandardCharsets.UTF_8); } + private Uni computeValue(K key, Function valueLoader, boolean isWorkerThread) { + if (isWorkerThread) { + return Uni.createFrom().item(new Supplier() { + @Override + public V get() { + return valueLoader.apply(key); + } + }).runSubscriptionOn(MutinyHelper.blockingExecutor(vertx.getDelegate())); + } else { + return Uni.createFrom().item(valueLoader.apply(key)); + } + } + @Override public Uni get(K key, Class clazz, Function valueLoader) { // With optimistic locking: @@ -148,17 +166,7 @@ public Uni apply(V cached) throws Exception { if (cached != null) { return Uni.createFrom().item(new StaticSupplier<>(cached)); } else { - Uni uni; - if (isWorkerThread) { - uni = Uni.createFrom().item(new Supplier() { - @Override - public V get() { - return valueLoader.apply(key); - } - }).runSubscriptionOn(MutinyHelper.blockingExecutor(vertx.getDelegate())); - } else { - uni = Uni.createFrom().item(valueLoader.apply(key)); - } + Uni uni = computeValue(key, valueLoader, isWorkerThread); return uni.onItem().call(new Function>() { @Override @@ -185,7 +193,15 @@ public Uni apply(V value) { } })); } - }); + }) + + .onFailure(ConnectException.class).recoverWithUni(new Function>() { + @Override + public Uni apply(Throwable e) { + log.warn("Unable to connect to Redis, recomputing cached value", e); + return computeValue(key, valueLoader, isWorkerThread); + } + }); } @Override @@ -215,7 +231,11 @@ public Uni apply(RedisConnection connection) { } }); } - }); + }) + .onFailure(ConnectException.class).recoverWithUni(e -> { + log.warn("Unable to connect to Redis, recomputing cached value", e); + return valueLoader.apply(key); + }); } @Override diff --git a/extensions/redis-cache/runtime/src/test/java/io/quarkus/cache/redis/runtime/RedisCacheImplTest.java b/extensions/redis-cache/runtime/src/test/java/io/quarkus/cache/redis/runtime/RedisCacheImplTest.java index c69a50ff13796..c1e7466a4ffbe 100644 --- a/extensions/redis-cache/runtime/src/test/java/io/quarkus/cache/redis/runtime/RedisCacheImplTest.java +++ b/extensions/redis-cache/runtime/src/test/java/io/quarkus/cache/redis/runtime/RedisCacheImplTest.java @@ -29,7 +29,12 @@ class RedisCacheImplTest extends RedisCacheTestBase { @AfterEach void clear() { - redis.send(Request.cmd(Command.FLUSHALL).arg("SYNC")).await().atMost(Duration.ofSeconds(10)); + try { + redis.send(Request.cmd(Command.FLUSHALL).arg("SYNC")).await() + .atMost(Duration.ofSeconds(10)); + } catch (Exception ignored) { + // ignored. + } } @Test @@ -45,6 +50,18 @@ public void testPutInTheCache() { assertThat(r).isNotNull(); } + @Test + public void testPutInTheCacheWithoutRedis() { + String k = UUID.randomUUID().toString(); + RedisCacheInfo info = new RedisCacheInfo(); + info.name = "foo"; + info.valueType = String.class.getName(); + info.expireAfterWrite = Optional.of(Duration.ofSeconds(2)); + RedisCacheImpl cache = new RedisCacheImpl(info, vertx, redis, BLOCKING_ALLOWED); + server.close(); + assertThat(cache.get(k, s -> "hello").await().indefinitely()).isEqualTo("hello"); + } + @Test public void testPutInTheCacheWithOptimisticLocking() { String k = UUID.randomUUID().toString(); @@ -355,6 +372,32 @@ void testAsyncGetWithDefaultType() { })); } + @Test + void testAsyncGetWithDefaultTypeWithoutRedis() { + RedisCacheInfo info = new RedisCacheInfo(); + info.name = "star-wars"; + info.expireAfterWrite = Optional.of(Duration.ofSeconds(2)); + info.valueType = Person.class.getName(); + RedisCacheImpl cache = new RedisCacheImpl(info, vertx, redis, BLOCKING_ALLOWED); + + server.close(); + + assertThat(cache + .getAsync("test", + x -> Uni.createFrom().item(new Person("luke", "skywalker")) + .runSubscriptionOn(Infrastructure.getDefaultExecutor())) + .await().indefinitely()).satisfies(p -> { + assertThat(p.firstName).isEqualTo("luke"); + assertThat(p.lastName).isEqualTo("skywalker"); + }); + + assertThat(cache.getAsync("test", x -> Uni.createFrom().item(new Person("leia", "organa"))) + .await().indefinitely()).satisfies(p -> { + assertThat(p.firstName).isEqualTo("leia"); + assertThat(p.lastName).isEqualTo("organa"); + }); + } + @Test void testAsyncGetWithDefaultTypeWithOptimisticLocking() { RedisCacheInfo info = new RedisCacheInfo();