diff --git a/lib/trino-plugin-toolkit/pom.xml b/lib/trino-plugin-toolkit/pom.xml
index e848f83e5db5..ebb2211fb649 100644
--- a/lib/trino-plugin-toolkit/pom.xml
+++ b/lib/trino-plugin-toolkit/pom.xml
@@ -146,6 +146,13 @@
test
+        <dependency>
+            <groupId>org.gaul</groupId>
+            <artifactId>modernizer-maven-annotations</artifactId>
+            <version>2.1.0</version>
+            <scope>test</scope>
+        </dependency>
+
org.testng
testng
diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableCache.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableCache.java
index 97cedec90f9e..43e03d6e05b2 100644
--- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableCache.java
+++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableCache.java
@@ -36,10 +36,10 @@
import static java.util.Objects.requireNonNull;
/**
- * A Cache implementation similar to ones produced by {@code com.google.common.cache.CacheBuilder},
- * but one that does not exhibits Guava issue #1881, i.e.
- * a {@link #getIfPresent(Object)} after {@link #invalidate(Object)} is guaranteed to return {@code null} and
- * {@link #get(Object, Callable)} after {@link #invalidate(Object)} is guaranteed to load a fresh value.
+ * A {@link Cache} implementation similar to ones produced by {@link CacheBuilder#build()}, but one that does not exhibit
+ * Guava issue #1881: a cache inspection with
+ * {@link #getIfPresent(Object)} or {@link #get(Object, Callable)} is guaranteed to return fresh state after
+ * {@link #invalidate(Object)}, {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called.
*/
public class EvictableCache<K, V>
        extends AbstractCache<K, V>
diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableLoadingCache.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableLoadingCache.java
new file mode 100644
index 000000000000..a9b532638b14
--- /dev/null
+++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableLoadingCache.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.base.cache;
+
+import com.google.common.cache.AbstractLoadingCache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.CacheStats;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkState;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+/**
+ * A {@link LoadingCache} implementation similar to ones produced by {@link CacheBuilder#build(CacheLoader)},
+ * but one that does not exhibit Guava issue #1881:
+ * a cache inspection with {@link #getIfPresent(Object)} or {@link #get(Object)} is guaranteed to return fresh
+ * state after {@link #invalidate(Object)}, {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called.
+ */
+public class EvictableLoadingCache<K, V>
+        extends AbstractLoadingCache<K, V>
+{
+    public static <K, V> LoadingCache<K, V> build(
+            OptionalLong expiresAfterWriteMillis,
+            OptionalLong refreshMillis,
+            long maximumSize,
+            boolean recordStats,
+            CacheLoader<? super K, V> cacheLoader)
+    {
+        requireNonNull(cacheLoader, "cacheLoader is null");
+
+ CacheBuilder
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-plugin-toolkit</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
io.trino
trino-spi
diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java
index 4e49c8cd5d4b..b3e219805946 100644
--- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java
+++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java
@@ -13,11 +13,11 @@
*/
package io.trino.plugin.jdbc;
-import com.google.common.cache.CacheStats;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import io.airlift.units.Duration;
+import io.trino.plugin.base.cache.CacheStatsAssertions;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.jdbc.credential.ExtraCredentialConfig;
import io.trino.spi.connector.ColumnHandle;
@@ -44,9 +44,9 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.function.Supplier;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
+import static io.trino.plugin.base.cache.CacheStatsAssertions.assertCacheStats;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static io.trino.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden;
import static io.trino.spi.type.IntegerType.INTEGER;
@@ -55,7 +55,6 @@
import static java.lang.Math.abs;
import static java.lang.Math.min;
import static java.util.Collections.emptyList;
-import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -673,70 +672,17 @@ public void testEverythingImplemented()
private static CacheStatsAssertions assertTableNamesCache(CachingJdbcClient cachingJdbcClient)
{
- return new CacheStatsAssertions(cachingJdbcClient::getTableNamesCacheStats);
+ return assertCacheStats(cachingJdbcClient::getTableNamesCacheStats);
}
private static CacheStatsAssertions assertColumnCacheStats(CachingJdbcClient client)
{
- return new CacheStatsAssertions(client::getColumnsCacheStats);
+ return assertCacheStats(client::getColumnsCacheStats);
}
private static CacheStatsAssertions assertStatisticsCacheStats(CachingJdbcClient client)
{
- return new CacheStatsAssertions(client::getStatisticsCacheStats);
- }
-
- private static final class CacheStatsAssertions
- {
- private final Supplier stats;
-
- private long loads;
- private long hits;
- private long misses;
-
- private CacheStatsAssertions(Supplier stats)
- {
- this.stats = requireNonNull(stats, "stats is null");
- }
-
- public CacheStatsAssertions loads(long value)
- {
- this.loads = value;
- return this;
- }
-
- public CacheStatsAssertions hits(long value)
- {
- this.hits = value;
- return this;
- }
-
- public CacheStatsAssertions misses(long value)
- {
- this.misses = value;
- return this;
- }
-
- public void afterRunning(Runnable runnable)
- {
- CacheStats beforeStats = stats.get();
- runnable.run();
- CacheStats afterStats = stats.get();
-
- long expectedLoad = beforeStats.loadCount() + loads;
- long expectedMisses = beforeStats.missCount() + misses;
- long expectedHits = beforeStats.hitCount() + hits;
-
- assertThat(afterStats.loadCount())
- .withFailMessage("Expected load count is %d but actual is %d", expectedLoad, afterStats.loadCount())
- .isEqualTo(expectedLoad);
- assertThat(afterStats.hitCount())
- .withFailMessage("Expected hit count is %d but actual is %d", expectedHits, afterStats.hitCount())
- .isEqualTo(expectedHits);
- assertThat(afterStats.missCount())
- .withFailMessage("Expected miss count is %d but actual is %d", expectedMisses, afterStats.missCount())
- .isEqualTo(expectedMisses);
- }
+ return assertCacheStats(client::getStatisticsCacheStats);
}
private static String randomSuffix()
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java
index 528cf56adaef..095138b9ceb5 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java
@@ -13,7 +13,6 @@
*/
package io.trino.plugin.hive.metastore.cache;
-import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
@@ -23,6 +22,7 @@
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airlift.jmx.CacheStatsMBean;
import io.airlift.units.Duration;
+import io.trino.plugin.base.cache.EvictableLoadingCache;
import io.trino.plugin.hive.HivePartition;
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.PartitionNotFoundException;
@@ -70,6 +70,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
+import static com.google.common.base.Functions.identity;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@@ -82,7 +83,6 @@
import static com.google.common.collect.ImmutableSetMultimap.toImmutableSetMultimap;
import static com.google.common.collect.Maps.immutableEntry;
import static com.google.common.collect.Streams.stream;
-import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
import static io.trino.plugin.hive.metastore.HivePartitionName.hivePartitionName;
import static io.trino.plugin.hive.metastore.HiveTableName.hiveTableName;
@@ -91,7 +91,6 @@
import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastoreConfig.isCacheEnabled;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.hadoop.hive.common.FileUtils.makePartName;
/**
@@ -141,12 +140,12 @@ public static CachingHiveMetastore cachingHiveMetastore(HiveMetastore delegate,
format("Invalid cache parameters (cacheTtl: %s, maxSize: %s)", cacheTtl, maximumSize));
return new CachingHiveMetastore(
delegate,
- executor,
OptionalLong.of(cacheTtl.toMillis()),
refreshInterval
.map(Duration::toMillis)
.map(OptionalLong::of)
.orElseGet(OptionalLong::empty),
+ Optional.of(executor),
maximumSize,
StatsRecording.ENABLED);
}
@@ -155,90 +154,50 @@ public static CachingHiveMetastore memoizeMetastore(HiveMetastore delegate, long
{
return new CachingHiveMetastore(
delegate,
- newDirectExecutorService(),
OptionalLong.empty(),
OptionalLong.empty(),
+ Optional.empty(),
maximumSize,
StatsRecording.DISABLED);
}
- protected CachingHiveMetastore(HiveMetastore delegate, Executor executor, OptionalLong expiresAfterWriteMillis, OptionalLong refreshMills, long maximumSize, StatsRecording statsRecording)
+    protected CachingHiveMetastore(HiveMetastore delegate, OptionalLong expiresAfterWriteMillis, OptionalLong refreshMills, Optional<Executor> executor, long maximumSize, StatsRecording statsRecording)
{
this.delegate = requireNonNull(delegate, "delegate is null");
requireNonNull(executor, "executor is null");
- databaseNamesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording)
- .build(asyncReloading(CacheLoader.from(this::loadAllDatabases), executor));
+ databaseNamesCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, ignored -> loadAllDatabases());
- databaseCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording)
- .build(asyncReloading(CacheLoader.from(this::loadDatabase), executor));
+ databaseCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadDatabase);
- tableNamesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording)
- .build(asyncReloading(CacheLoader.from(this::loadAllTables), executor));
+ tableNamesCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadAllTables);
- tablesWithParameterCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording)
- .build(asyncReloading(CacheLoader.from(this::loadTablesMatchingParameter), executor));
+ tablesWithParameterCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadTablesMatchingParameter);
- tableStatisticsCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording)
- .build(asyncReloading(CacheLoader.from(this::loadTableColumnStatistics), executor));
+ tableStatisticsCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadTableColumnStatistics);
// disable refresh since it can't use the bulk loading and causes too many requests
- partitionStatisticsCache = newCacheBuilder(expiresAfterWriteMillis, OptionalLong.empty(), maximumSize, statsRecording)
- .build(asyncReloading(new CacheLoader<>()
- {
- @Override
- public PartitionStatistics load(WithIdentity key)
- {
- return loadPartitionColumnStatistics(key);
- }
-
- @Override
- public Map, PartitionStatistics> loadAll(Iterable extends WithIdentity> keys)
- {
- return loadPartitionColumnStatistics(keys);
- }
- }, executor));
-
- tableCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording)
- .build(asyncReloading(CacheLoader.from(this::loadTable), executor));
-
- viewNamesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording)
- .build(asyncReloading(CacheLoader.from(this::loadAllViews), executor));
-
- partitionFilterCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording)
- .build(asyncReloading(CacheLoader.from(this::loadPartitionNamesByFilter), executor));
+ partitionStatisticsCache = buildCache(expiresAfterWriteMillis, maximumSize, statsRecording, this::loadPartitionColumnStatistics, this::loadPartitionsColumnStatistics);
- // disable refresh since it can't use the bulk loading and causes too many requests
- partitionCache = newCacheBuilder(expiresAfterWriteMillis, OptionalLong.empty(), maximumSize, statsRecording)
- .build(asyncReloading(new CacheLoader<>()
- {
- @Override
- public Optional load(WithIdentity partitionName)
- {
- return loadPartitionByName(partitionName);
- }
+ tableCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadTable);
+
+ viewNamesCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadAllViews);
- @Override
- public Map, Optional> loadAll(Iterable extends WithIdentity> partitionNames)
- {
- return loadPartitionsByNames(partitionNames);
- }
- }, executor));
+ partitionFilterCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadPartitionNamesByFilter);
+
+ // disable refresh since it can't use the bulk loading and causes too many requests
+ partitionCache = buildCache(expiresAfterWriteMillis, maximumSize, statsRecording, this::loadPartitionByName, this::loadPartitionsByNames);
- tablePrivilegesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording)
- .build(asyncReloading(CacheLoader.from(key -> loadTablePrivileges(key.getDatabase(), key.getTable(), key.getOwner(), key.getPrincipal())), executor));
+ tablePrivilegesCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, key ->
+ loadTablePrivileges(key.getDatabase(), key.getTable(), key.getOwner(), key.getPrincipal()));
- rolesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording)
- .build(asyncReloading(CacheLoader.from(this::loadRoles), executor));
+ rolesCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, ignored -> loadRoles());
- roleGrantsCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording)
- .build(asyncReloading(CacheLoader.from(this::loadRoleGrants), executor));
+ roleGrantsCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadRoleGrants);
- grantedPrincipalsCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording)
- .build(asyncReloading(CacheLoader.from(this::loadPrincipals), executor));
+ grantedPrincipalsCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadPrincipals);
- configValuesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording)
- .build(asyncReloading(CacheLoader.from(this::loadConfigValue), executor));
+ configValuesCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadConfigValue);
}
@Managed
@@ -375,7 +334,7 @@ private PartitionStatistics loadPartitionColumnStatistics(WithIdentity, PartitionStatistics> loadPartitionColumnStatistics(Iterable extends WithIdentity> keys)
+ private Map, PartitionStatistics> loadPartitionsColumnStatistics(Iterable extends WithIdentity> keys)
{
SetMultimap, WithIdentity> tablePartitions = stream(keys)
.collect(toImmutableSetMultimap(value -> new WithIdentity<>(value.getIdentity(), value.getKey().getHiveTableName()), Function.identity()));
@@ -1056,20 +1015,63 @@ public void alterTransactionalTable(HiveIdentity identity, Table table, long tra
}
}
- private static CacheBuilder newCacheBuilder(OptionalLong expiresAfterWriteMillis, OptionalLong refreshMillis, long maximumSize, StatsRecording statsRecording)
+    private static <K, V> LoadingCache<K, V> buildCache(
+            OptionalLong expiresAfterWriteMillis,
+            OptionalLong refreshMillis,
+            Optional<Executor> refreshExecutor,
+            long maximumSize,
+            StatsRecording statsRecording,
+            com.google.common.base.Function<K, V> loader)
{
- CacheBuilder cacheBuilder = CacheBuilder.newBuilder();
- if (expiresAfterWriteMillis.isPresent()) {
- cacheBuilder = cacheBuilder.expireAfterWrite(expiresAfterWriteMillis.getAsLong(), MILLISECONDS);
- }
+        CacheLoader<K, V> cacheLoader = CacheLoader.from(loader);
+
+ checkArgument(refreshMillis.isEmpty() || refreshExecutor.isPresent(), "refreshMillis is provided but refreshExecutor is not");
if (refreshMillis.isPresent() && (expiresAfterWriteMillis.isEmpty() || expiresAfterWriteMillis.getAsLong() > refreshMillis.getAsLong())) {
- cacheBuilder = cacheBuilder.refreshAfterWrite(refreshMillis.getAsLong(), MILLISECONDS);
+ cacheLoader = asyncReloading(cacheLoader, refreshExecutor.orElseThrow(() -> new IllegalArgumentException("Executor not provided")));
}
- cacheBuilder = cacheBuilder.maximumSize(maximumSize);
- if (statsRecording == StatsRecording.ENABLED) {
- cacheBuilder = cacheBuilder.recordStats();
+ else {
+ refreshMillis = OptionalLong.empty();
}
- return cacheBuilder;
+
+ return EvictableLoadingCache.build(
+ expiresAfterWriteMillis,
+ refreshMillis,
+ maximumSize,
+ statsRecording == StatsRecording.ENABLED,
+ cacheLoader);
+ }
+
+    private static <K, V> LoadingCache<K, V> buildCache(
+            OptionalLong expiresAfterWriteMillis,
+            long maximumSize,
+            StatsRecording statsRecording,
+            Function<K, V> loader,
+            Function<Iterable<K>, Map<K, V>> bulkLoader)
+    {
+        requireNonNull(loader, "loader is null");
+        requireNonNull(bulkLoader, "bulkLoader is null");
+        CacheLoader<K, V> cacheLoader = new CacheLoader<>()
+        {
+            @Override
+            public V load(K key)
+            {
+                return loader.apply(key);
+            }
+
+            @Override
+            public Map<K, V> loadAll(Iterable<? extends K> keys)
+            {
+                return bulkLoader.apply(Iterables.transform(keys, identity()));
+            }
+        };
+
+ return EvictableLoadingCache.build(
+ expiresAfterWriteMillis,
+ // cannot use refreshAfterWrite since it can't use the bulk loading and causes too many requests
+ OptionalLong.empty(),
+ maximumSize,
+ statsRecording == StatsRecording.ENABLED,
+ cacheLoader);
}
private static class WithIdentity
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
index 41f2bf5edada..1ed0520dd6f6 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
@@ -79,7 +79,6 @@
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.IntStream;
@@ -196,7 +195,6 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec
new ThriftMetastoreConfig(),
hdfsEnvironment,
false)),
- executor,
getBasePath(),
hdfsEnvironment);
locationService = new HiveLocationService(hdfsEnvironment);
@@ -543,9 +541,9 @@ protected static class TestingHiveMetastore
private final Path basePath;
private final HdfsEnvironment hdfsEnvironment;
- public TestingHiveMetastore(HiveMetastore delegate, Executor executor, Path basePath, HdfsEnvironment hdfsEnvironment)
+ public TestingHiveMetastore(HiveMetastore delegate, Path basePath, HdfsEnvironment hdfsEnvironment)
{
- super(delegate, executor, OptionalLong.empty(), OptionalLong.empty(), 0, StatsRecording.ENABLED);
+ super(delegate, OptionalLong.empty(), OptionalLong.empty(), Optional.empty(), 0, StatsRecording.ENABLED);
this.basePath = basePath;
this.hdfsEnvironment = hdfsEnvironment;
}
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/UnimplementedHiveMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/UnimplementedHiveMetastore.java
index f63437d14a18..d2e17253559c 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/UnimplementedHiveMetastore.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/UnimplementedHiveMetastore.java
@@ -29,7 +29,7 @@
import java.util.Set;
import java.util.function.Function;
-class UnimplementedHiveMetastore
+public class UnimplementedHiveMetastore
implements HiveMetastore
{
@Override
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastore.java
index 11a11a3cdd65..ea764a4f86fb 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastore.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastore.java
@@ -17,16 +17,20 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListeningExecutorService;
+import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.HiveConfig;
import io.trino.plugin.hive.HiveMetastoreClosure;
import io.trino.plugin.hive.PartitionStatistics;
import io.trino.plugin.hive.authentication.HiveIdentity;
+import io.trino.plugin.hive.metastore.Column;
+import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HivePrincipal;
import io.trino.plugin.hive.metastore.MetastoreConfig;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Table;
+import io.trino.plugin.hive.metastore.UnimplementedHiveMetastore;
import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore;
import io.trino.plugin.hive.metastore.thrift.MetastoreLocator;
import io.trino.plugin.hive.metastore.thrift.MockThriftMetastoreClient;
@@ -38,26 +42,40 @@
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
+import io.trino.testing.DataProviders;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
+import static io.trino.plugin.hive.HiveStorageFormat.TEXTFILE;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveType.HIVE_STRING;
+import static io.trino.plugin.hive.HiveType.toHiveType;
import static io.trino.plugin.hive.metastore.HiveColumnStatistics.createIntegerColumnStatistics;
import static io.trino.plugin.hive.metastore.MetastoreUtil.computePartitionKeyFilter;
+import static io.trino.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.cachingHiveMetastore;
import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore;
import static io.trino.plugin.hive.metastore.thrift.MockThriftMetastoreClient.BAD_DATABASE;
@@ -75,7 +93,9 @@
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.testing.TestingConnectorSession.SESSION;
import static java.util.concurrent.Executors.newCachedThreadPool;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.function.Function.identity;
+import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -85,6 +105,8 @@
@Test(singleThreaded = true)
public class TestCachingHiveMetastore
{
+ private static final Logger log = Logger.get(TestCachingHiveMetastore.class);
+
private static final HiveIdentity IDENTITY = new HiveIdentity(SESSION);
private static final PartitionStatistics TEST_STATS = PartitionStatistics.builder()
.setColumnStatistics(ImmutableMap.of(TEST_COLUMN, createIntegerColumnStatistics(OptionalLong.empty(), OptionalLong.empty(), OptionalLong.empty(), OptionalLong.empty())))
@@ -468,6 +490,182 @@ public void testCachingHiveMetastoreCreationViaMemoize()
assertEquals(metastore.getDatabaseNamesStats().getRequestCount(), 0);
}
+ @Test(timeOut = 60_000, dataProviderClass = DataProviders.class, dataProvider = "trueFalse")
+ public void testLoadAfterInvalidate(boolean invalidateAll)
+ throws Exception
+ {
+ // State
+ CopyOnWriteArrayList tableColumns = new CopyOnWriteArrayList<>();
+ ConcurrentMap tablePartitionsByName = new ConcurrentHashMap<>();
+ Map tableParameters = new ConcurrentHashMap<>();
+ tableParameters.put("frequent-changing-table-parameter", "parameter initial value");
+
+ // Initialize data
+ String databaseName = "my_database";
+ String tableName = "my_table_name";
+
+ tableColumns.add(new Column("value", toHiveType(VARCHAR), Optional.empty() /* comment */));
+ tableColumns.add(new Column("pk", toHiveType(VARCHAR), Optional.empty() /* comment */));
+
+ List partitionNames = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String partitionName = "pk=" + i;
+ tablePartitionsByName.put(
+ partitionName,
+ Partition.builder()
+ .setDatabaseName(databaseName)
+ .setTableName(tableName)
+ .setColumns(ImmutableList.copyOf(tableColumns))
+ .setValues(List.of(Integer.toString(i)))
+ .withStorage(storage -> storage.setStorageFormat(fromHiveStorageFormat(TEXTFILE)))
+ .setParameters(Map.of("frequent-changing-partition-parameter", "parameter initial value"))
+ .build());
+ partitionNames.add(partitionName);
+ }
+
+ // Mock metastore
+ CountDownLatch getTableEnteredLatch = new CountDownLatch(1);
+ CountDownLatch getTableReturnLatch = new CountDownLatch(1);
+ CountDownLatch getTableFinishedLatch = new CountDownLatch(1);
+ CountDownLatch getPartitionsByNamesEnteredLatch = new CountDownLatch(1);
+ CountDownLatch getPartitionsByNamesReturnLatch = new CountDownLatch(1);
+ CountDownLatch getPartitionsByNamesFinishedLatch = new CountDownLatch(1);
+
+ HiveMetastore mockMetastore = new UnimplementedHiveMetastore()
+ {
+ @Override
+ public Optional getTable(HiveIdentity identity, String databaseName, String tableName)
+ {
+ Optional table = Optional.of(Table.builder()
+ .setDatabaseName(databaseName)
+ .setTableName(tableName)
+ .setTableType(EXTERNAL_TABLE.name())
+ .setDataColumns(tableColumns)
+ .setParameters(ImmutableMap.copyOf(tableParameters))
+ // Required by 'Table', but not used by view translation.
+ .withStorage(storage -> storage.setStorageFormat(fromHiveStorageFormat(TEXTFILE)))
+ .setOwner(Optional.empty())
+ .build());
+
+ getTableEnteredLatch.countDown(); // 1
+ await(getTableReturnLatch, 10, SECONDS); // 2
+
+ return table;
+ }
+
+ @Override
+ public Map> getPartitionsByNames(HiveIdentity identity, Table table, List partitionNames)
+ {
+ Map> result = new HashMap<>();
+ for (String partitionName : partitionNames) {
+ result.put(partitionName, Optional.ofNullable(tablePartitionsByName.get(partitionName)));
+ }
+
+ getPartitionsByNamesEnteredLatch.countDown(); // loader#1
+ await(getPartitionsByNamesReturnLatch, 10, SECONDS); // loader#2
+
+ return result;
+ }
+
+ @Override
+ public boolean isImpersonationEnabled()
+ {
+ return false;
+ }
+ };
+
+ // Caching metastore
+ metastore = cachingHiveMetastore(
+ mockMetastore,
+ executor,
+ new Duration(5, TimeUnit.MINUTES),
+ Optional.of(new Duration(1, TimeUnit.MINUTES)),
+ 1000);
+
+ // The test. Main thread does modifications and verifies subsequent load sees them. Background thread loads the state into the cache.
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+ try {
+ Future future = executor.submit(() -> {
+ try {
+ Table table;
+
+ table = metastore.getTable(IDENTITY, databaseName, tableName).orElseThrow();
+ getTableFinishedLatch.countDown(); // 3
+
+ metastore.getPartitionsByNames(IDENTITY, table, partitionNames);
+ getPartitionsByNamesFinishedLatch.countDown(); // 6
+
+ return (Void) null;
+ }
+ catch (Throwable e) {
+ log.error(e);
+ throw e;
+ }
+ });
+
+ await(getTableEnteredLatch, 10, SECONDS); // 21
+ tableParameters.put("frequent-changing-table-parameter", "main-thread-put-xyz");
+ if (invalidateAll) {
+ metastore.flushCache();
+ }
+ else {
+ metastore.invalidateTable(databaseName, tableName);
+ }
+ getTableReturnLatch.countDown(); // 2
+ await(getTableFinishedLatch, 10, SECONDS); // 3
+ Table table = metastore.getTable(IDENTITY, databaseName, tableName).orElseThrow();
+ assertThat(table.getParameters())
+ .isEqualTo(Map.of("frequent-changing-table-parameter", "main-thread-put-xyz"));
+
+ await(getPartitionsByNamesEnteredLatch, 10, SECONDS); // 4
+ String partitionName = partitionNames.get(2);
+ Map newPartitionParameters = Map.of("frequent-changing-partition-parameter", "main-thread-put-alice");
+ tablePartitionsByName.put(partitionName,
+ Partition.builder(tablePartitionsByName.get(partitionName))
+ .setParameters(newPartitionParameters)
+ .build());
+ if (invalidateAll) {
+ metastore.flushCache();
+ }
+ else {
+ metastore.invalidateTable(databaseName, tableName);
+ }
+ getPartitionsByNamesReturnLatch.countDown(); // 5
+ await(getPartitionsByNamesFinishedLatch, 10, SECONDS); // 6
+ Map> loadedPartitions = metastore.getPartitionsByNames(IDENTITY, table, partitionNames);
+ assertThat(loadedPartitions.get(partitionName))
+ .isNotNull()
+ .isPresent()
+ .hasValueSatisfying(partition -> assertThat(partition.getParameters()).isEqualTo(newPartitionParameters));
+
+ // verify no failure in the background thread
+ future.get(10, SECONDS);
+ }
+ finally {
+ getTableEnteredLatch.countDown();
+ getTableReturnLatch.countDown();
+ getTableFinishedLatch.countDown();
+ getPartitionsByNamesEnteredLatch.countDown();
+ getPartitionsByNamesReturnLatch.countDown();
+ getPartitionsByNamesFinishedLatch.countDown();
+
+ executor.shutdownNow();
+ executor.awaitTermination(10, SECONDS);
+ }
+ }
+
+ private static void await(CountDownLatch latch, long timeout, TimeUnit unit)
+ {
+ try {
+ boolean awaited = latch.await(timeout, unit);
+ checkState(awaited, "wait timed out");
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+ }
+ }
+
private CachingHiveMetastore createMetastoreWithDirectExecutor(CachingHiveMetastoreConfig config)
{
return (CachingHiveMetastore) cachingHiveMetastore(
diff --git a/pom.xml b/pom.xml
index 436873a83e3c..4f9400a79d32 100644
--- a/pom.xml
+++ b/pom.xml
@@ -388,6 +388,13 @@
${project.version}
+            <dependency>
+                <groupId>io.trino</groupId>
+                <artifactId>trino-plugin-toolkit</artifactId>
+                <type>test-jar</type>
+                <version>${project.version}</version>
+            </dependency>
+
io.trino
trino-product-tests