diff --git a/lib/trino-collect/src/main/java/io/trino/collect/cache/EmptyCache.java b/lib/trino-collect/src/main/java/io/trino/collect/cache/EmptyCache.java new file mode 100644 index 000000000000..8ebfe3fe934d --- /dev/null +++ b/lib/trino-collect/src/main/java/io/trino/collect/cache/EmptyCache.java @@ -0,0 +1,238 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.collect.cache; + +import com.google.common.cache.AbstractLoadingCache; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.CacheStats; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import javax.annotation.CheckForNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; + +import static java.util.Objects.requireNonNull; + +class EmptyCache + extends AbstractLoadingCache +{ + private final CacheLoader loader; + private final StatsCounter statsCounter; + + EmptyCache(CacheLoader loader, boolean recordStats) + { + this.loader = requireNonNull(loader, "loader is null"); + this.statsCounter = recordStats ? new SimpleStatsCounter() : new NoopStatsCounter(); + } + + @CheckForNull + @Override + public V getIfPresent(Object key) + { + statsCounter.recordMisses(1); + return null; + } + + @Override + public V get(K key) + throws ExecutionException + { + return get(key, () -> loader.load(key)); + } + + @Override + public V get(K key, Callable valueLoader) + throws ExecutionException + { + statsCounter.recordMisses(1); + try { + V value = valueLoader.call(); + statsCounter.recordLoadSuccess(1); + return value; + } + catch (RuntimeException e) { + statsCounter.recordLoadException(1); + throw new UncheckedExecutionException(e); + } + catch (Exception e) { + statsCounter.recordLoadException(1); + throw new ExecutionException(e); + } + } + + @Override + public void put(K key, V value) + { + // Cache, even if configured to evict everything immediately, should allow writes. + } + + @Override + public void refresh(K key) {} + + @Override + public void invalidate(Object key) {} + + @Override + public void invalidateAll(Iterable keys) {} + + @Override + public void invalidateAll() {} + + @Override + public long size() + { + return 0; + } + + @Override + public CacheStats stats() + { + return statsCounter.snapshot(); + } + + @Override + public ConcurrentMap asMap() + { + return new ConcurrentMap() + { + @Override + public V putIfAbsent(K key, V value) + { + // Cache, even if configured to evict everything immediately, should allow writes. + return value; + } + + @Override + public boolean remove(Object key, Object value) + { + return false; + } + + @Override + public boolean replace(K key, V oldValue, V newValue) + { + return false; + } + + @Override + public V replace(K key, V value) + { + return null; + } + + @Override + public int size() + { + return 0; + } + + @Override + public boolean isEmpty() + { + return true; + } + + @Override + public boolean containsKey(Object key) + { + return false; + } + + @Override + public boolean containsValue(Object value) + { + return false; + } + + @Override + public V get(Object key) + { + return null; + } + + @Override + public V put(K key, V value) + { + // Cache, even if configured to evict everything immediately, should allow writes. + return null; + } + + @Override + public V remove(Object key) + { + return null; + } + + @Override + public void putAll(Map m) + { + // Cache, even if configured to evict everything immediately, should allow writes. + } + + @Override + public void clear() {} + + @Override + public Set keySet() + { + return ImmutableSet.of(); + } + + @Override + public Collection values() + { + return ImmutableSet.of(); + } + + @Override + public Set> entrySet() + { + return ImmutableSet.of(); + } + }; + } + + private static class NoopStatsCounter + implements StatsCounter + { + private static final CacheStats EMPTY_STATS = new SimpleStatsCounter().snapshot(); + + @Override + public void recordHits(int count) {} + + @Override + public void recordMisses(int count) {} + + @Override + public void recordLoadSuccess(long loadTime) {} + + @Override + public void recordLoadException(long loadTime) {} + + @Override + public void recordEviction() {} + + @Override + public CacheStats snapshot() + { + return EMPTY_STATS; + } + } +} diff --git a/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java index fbc4dc39be21..ed28c28dbc5c 100644 --- a/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java +++ b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java @@ -51,6 +51,7 @@ public static EvictableCacheBuilder newBuilder() private Optional maximumWeight = Optional.empty(); private Optional, ? super V>> weigher = Optional.empty(); private boolean recordStats; + private Optional disabledCacheImplementation = Optional.empty(); private EvictableCacheBuilder() {} @@ -109,6 +110,27 @@ public EvictableCacheBuilder recordStats() return this; } + /** + * Choose a behavior for case when caching is disabled that may allow data and failure sharing between concurrent callers. + */ + public EvictableCacheBuilder shareResultsAndFailuresEvenIfDisabled() + { + checkState(!disabledCacheImplementation.isPresent(), "disabledCacheImplementation already set"); + disabledCacheImplementation = Optional.of(DisabledCacheImplementation.GUAVA); + return this; + } + + /** + * Choose a behavior for case when caching is disabled that prevents data and failure sharing between concurrent callers. + * Note: disabled cache won't report any statistics. + */ + public EvictableCacheBuilder shareNothingWhenDisabled() + { + checkState(!disabledCacheImplementation.isPresent(), "disabledCacheImplementation already set"); + disabledCacheImplementation = Optional.of(DisabledCacheImplementation.NOOP); + return this; + } + @CheckReturnValue public Cache build() { @@ -119,15 +141,26 @@ public Cache build() public LoadingCache build(CacheLoader loader) { if (cacheDisabled()) { - // Disabled cache is always empty, so doesn't exhibit invalidation problems. - // Avoid overhead of EvictableCache wrapper. - CacheBuilder cacheBuilder = CacheBuilder.newBuilder() - .maximumSize(0) - .expireAfterWrite(0, SECONDS); - if (recordStats) { - cacheBuilder.recordStats(); + // Silently providing a behavior different from Guava's could be surprising, so require explicit choice. + DisabledCacheImplementation disabledCacheImplementation = this.disabledCacheImplementation.orElseThrow(() -> new IllegalStateException( + "Even when cache is disabled, the loads are synchronized and both load results and failures are shared between threads. " + + "This is rarely desired, thus builder caller is expected to either opt-in into this behavior with shareResultsAndFailuresEvenIfDisabled(), " + + "or choose not to share results (and failures) between concurrent invocations with shareNothingWhenDisabled().")); + switch (disabledCacheImplementation) { + case NOOP: + return new EmptyCache<>(loader, recordStats); + case GUAVA: + // Disabled cache is always empty, so doesn't exhibit invalidation problems. + // Avoid overhead of EvictableCache wrapper. + CacheBuilder cacheBuilder = CacheBuilder.newBuilder() + .maximumSize(0) + .expireAfterWrite(0, SECONDS); + if (recordStats) { + cacheBuilder.recordStats(); + } + return buildUnsafeCache(cacheBuilder, loader); } - return buildUnsafeCache(cacheBuilder, loader); + throw new UnsupportedOperationException("Unsupported option: " + disabledCacheImplementation); } if (!(maximumSize.isPresent() || maximumWeight.isPresent() || expireAfterWrite.isPresent())) { @@ -191,4 +224,10 @@ private static Duration toDuration(long duration, TimeUnit unit) // Saturated conversion, as in com.google.common.cache.CacheBuilder.toNanosSaturated return Duration.ofNanos(unit.toNanos(duration)); } + + private enum DisabledCacheImplementation + { + NOOP, + GUAVA, + } } diff --git a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEmptyCache.java b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEmptyCache.java new file mode 100644 index 000000000000..22d273769b7c --- /dev/null +++ b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEmptyCache.java @@ -0,0 +1,86 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.collect.cache; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheLoader; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestEmptyCache +{ + private static final int TEST_TIMEOUT_MILLIS = 10_000; + + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testLoadFailure() + throws Exception + { + Cache cache = new EmptyCache<>( + CacheLoader.from(() -> { + throw new UnsupportedOperationException(); + }), + false); + int key = 10; + + ExecutorService executor = newFixedThreadPool(2); + try { + AtomicBoolean first = new AtomicBoolean(true); + CyclicBarrier barrier = new CyclicBarrier(2); + + List> futures = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + futures.add(executor.submit(() -> { + barrier.await(10, SECONDS); + return cache.get(key, () -> { + if (first.compareAndSet(true, false)) { + // first + Thread.sleep(1); // increase chances that second thread calls cache.get before we return + throw new RuntimeException("first attempt is poised to fail"); + } + return "success"; + }); + })); + } + + List results = new ArrayList<>(); + for (Future future : futures) { + try { + results.add(future.get()); + } + catch (ExecutionException e) { + results.add(e.getCause().toString()); + } + } + + assertThat(results).containsExactlyInAnyOrder( + "success", + "com.google.common.util.concurrent.UncheckedExecutionException: java.lang.RuntimeException: first attempt is poised to fail"); + } + finally { + executor.shutdownNow(); + executor.awaitTermination(10, SECONDS); + } + } +} diff --git a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java index 76e68ce3249c..0124c5c317bd 100644 --- a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java +++ b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java @@ -19,14 +19,18 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.gaul.modernizer_maven_annotations.SuppressModernizer; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; @@ -36,8 +40,10 @@ import static java.lang.Math.toIntExact; import static java.lang.String.format; import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotSame; @@ -131,13 +137,32 @@ public void testReplace() assertEquals(cache.asMap().keySet(), ImmutableSet.of(key)); } - @Test(timeOut = TEST_TIMEOUT_MILLIS) - public void testDisabledCache() + @Test(timeOut = TEST_TIMEOUT_MILLIS, dataProvider = "testDisabledCacheDataProvider") + public void testDisabledCache(String behavior) throws Exception { - Cache cache = EvictableCacheBuilder.newBuilder() - .maximumSize(0) - .build(); + EvictableCacheBuilder builder = EvictableCacheBuilder.newBuilder() + .maximumSize(0); + + switch (behavior) { + case "share-nothing": + builder.shareNothingWhenDisabled(); + break; + case "guava": + builder.shareResultsAndFailuresEvenIfDisabled(); + break; + case "none": + assertThatThrownBy(builder::build) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Even when cache is disabled, the loads are synchronized and both load results and failures are shared between threads. " + + "This is rarely desired, thus builder caller is expected to either opt-in into this behavior with shareResultsAndFailuresEvenIfDisabled(), " + + "or choose not to share results (and failures) between concurrent invocations with shareNothingWhenDisabled()."); + return; + default: + throw new UnsupportedOperationException("Unsupported: " + behavior); + } + + Cache cache = builder.build(); for (int i = 0; i < 10; i++) { int value = i * 10; @@ -149,6 +174,16 @@ public void testDisabledCache() assertThat(cache.asMap().values()).as("values").isEmpty(); } + @DataProvider + public static Object[][] testDisabledCacheDataProvider() + { + return new Object[][] { + {"share-nothing"}, + {"guava"}, + {"none"}, + }; + } + @Test(timeOut = TEST_TIMEOUT_MILLIS) public void testLoadStats() throws Exception @@ -178,6 +213,58 @@ public void testLoadStats() assertEquals(value, "abc"); } + @Test(timeOut = TEST_TIMEOUT_MILLIS, invocationCount = 10, successPercentage = 50) + public void testLoadFailure() + throws Exception + { + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .expireAfterWrite(0, DAYS) + .shareResultsAndFailuresEvenIfDisabled() + .build(); + int key = 10; + + ExecutorService executor = newFixedThreadPool(2); + try { + AtomicBoolean first = new AtomicBoolean(true); + CyclicBarrier barrier = new CyclicBarrier(2); + + List> futures = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + futures.add(executor.submit(() -> { + barrier.await(10, SECONDS); + return cache.get(key, () -> { + if (first.compareAndSet(true, false)) { + // first + Thread.sleep(1); // increase chances that second thread calls cache.get before we return + throw new RuntimeException("first attempt is poised to fail"); + } + return "success"; + }); + })); + } + + List results = new ArrayList<>(); + for (Future future : futures) { + try { + results.add(future.get()); + } + catch (ExecutionException e) { + results.add(e.getCause().toString()); + } + } + + // Note: if this starts to fail, that suggests that Guava implementation changed and NoopCache may be redundant now. + assertThat(results).containsExactly( + "com.google.common.util.concurrent.UncheckedExecutionException: java.lang.RuntimeException: first attempt is poised to fail", + "com.google.common.util.concurrent.UncheckedExecutionException: java.lang.RuntimeException: first attempt is poised to fail"); + } + finally { + executor.shutdownNow(); + executor.awaitTermination(10, SECONDS); + } + } + @SuppressModernizer private static Integer newInteger(int value) { diff --git a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java index 66d029816a43..06f3d87144c6 100644 --- a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java +++ b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.gaul.modernizer_maven_annotations.SuppressModernizer; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.util.List; @@ -44,6 +45,7 @@ import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotSame; import static org.testng.Assert.assertSame; @@ -104,13 +106,33 @@ public void testEvictByWeight() assertThat(cache.asMap().values().stream().mapToInt(String::length).sum()).as("values length sum").isLessThanOrEqualTo(20); } - @Test(timeOut = TEST_TIMEOUT_MILLIS) - public void testDisabledCache() + @Test(timeOut = TEST_TIMEOUT_MILLIS, dataProvider = "testDisabledCacheDataProvider") + public void testDisabledCache(String behavior) throws Exception { - LoadingCache cache = EvictableCacheBuilder.newBuilder() - .maximumSize(0) - .build(CacheLoader.from(key -> key * 10)); + CacheLoader loader = CacheLoader.from(key -> key * 10); + EvictableCacheBuilder builder = EvictableCacheBuilder.newBuilder() + .maximumSize(0); + + switch (behavior) { + case "share-nothing": + builder.shareNothingWhenDisabled(); + break; + case "guava": + builder.shareResultsAndFailuresEvenIfDisabled(); + break; + case "none": + assertThatThrownBy(() -> builder.build(loader)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Even when cache is disabled, the loads are synchronized and both load results and failures are shared between threads. " + + "This is rarely desired, thus builder caller is expected to either opt-in into this behavior with shareResultsAndFailuresEvenIfDisabled(), " + + "or choose not to share results (and failures) between concurrent invocations with shareNothingWhenDisabled()."); + return; + default: + throw new UnsupportedOperationException("Unsupported: " + behavior); + } + + LoadingCache cache = builder.build(loader); for (int i = 0; i < 10; i++) { assertEquals((Object) cache.get(i), i * 10); @@ -121,6 +143,16 @@ public void testDisabledCache() assertThat(cache.asMap().values()).as("values").isEmpty(); } + @DataProvider + public static Object[][] testDisabledCacheDataProvider() + { + return new Object[][] { + {"share-nothing"}, + {"guava"}, + {"none"}, + }; + } + @Test(timeOut = TEST_TIMEOUT_MILLIS) public void testLoadStats() throws Exception diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java index fd48e6499af3..8ab0522a60e3 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java @@ -111,6 +111,7 @@ public CachingJdbcClient( EvictableCacheBuilder cacheBuilder = EvictableCacheBuilder.newBuilder() .expireAfterWrite(metadataCachingTtl.toMillis(), MILLISECONDS) + .shareNothingWhenDisabled() .recordStats(); if (metadataCachingTtl.equals(CACHING_DISABLED)) { diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java index 16776e6c3ca2..5965bce97e0f 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java @@ -42,8 +42,11 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.trino.collect.cache.CacheStatsAssertions.assertCacheStats; @@ -58,6 +61,7 @@ import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -621,6 +625,76 @@ public void testConcurrentSchemaCreateAndDrop() futures.forEach(Futures::getUnchecked); } + @Test(timeOut = 60_000) + public void testLoadFailureNotSharedWhenDisabled() + throws Exception + { + AtomicBoolean first = new AtomicBoolean(true); + CyclicBarrier barrier = new CyclicBarrier(2); + + CachingJdbcClient cachingJdbcClient = new CachingJdbcClient( + new ForwardingJdbcClient() + { + private final JdbcClient delegate = database.getJdbcClient(); + + @Override + protected JdbcClient delegate() + { + return delegate; + } + + @Override + public Optional getTableHandle(ConnectorSession session, SchemaTableName schemaTableName) + { + if (first.compareAndSet(true, false)) { + // first + try { + // increase chances that second thread calls cache.get before we return + Thread.sleep(5); + } + catch (InterruptedException e1) { + throw new RuntimeException(e1); + } + throw new RuntimeException("first attempt is poised to fail"); + } + return super.getTableHandle(session, schemaTableName); + } + }, + SESSION_PROPERTIES_PROVIDERS, + new SingletonIdentityCacheMapping(), + // ttl is 0, cache is disabled + new Duration(0, DAYS), + true, + 10); + + SchemaTableName tableName = new SchemaTableName(schema, "test_load_failure_not_shared"); + createTable(tableName); + ConnectorSession session = createSession("session"); + + List> futures = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + futures.add(executor.submit(() -> { + barrier.await(10, SECONDS); + return cachingJdbcClient.getTableHandle(session, tableName).orElseThrow(); + })); + } + + List results = new ArrayList<>(); + for (Future future : futures) { + try { + results.add(future.get().toString()); + } + catch (ExecutionException e) { + results.add(e.getCause().toString()); + } + } + + assertThat(results) + .containsExactlyInAnyOrder( + "example.test_load_failure_not_shared " + database.getDatabaseName() + ".EXAMPLE.TEST_LOAD_FAILURE_NOT_SHARED", + "com.google.common.util.concurrent.UncheckedExecutionException: java.lang.RuntimeException: first attempt is poised to fail"); + } + private JdbcTableHandle getAnyTable(String schema) { SchemaTableName tableName = jdbcClient.getTableNames(SESSION, Optional.of(schema))