diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryLister.java index 2457e626ea5f..82df0d87b3fb 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryLister.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryLister.java @@ -13,12 +13,16 @@ */ package io.trino.plugin.hive; -import com.google.common.cache.CacheBuilder; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; import com.google.common.cache.Weigher; import com.google.common.collect.ImmutableList; import io.airlift.units.Duration; -import io.trino.collect.cache.NonKeyEvictableCache; +import io.trino.collect.cache.EvictableCacheBuilder; +import io.trino.plugin.hive.metastore.Partition; +import io.trino.plugin.hive.metastore.Storage; import io.trino.plugin.hive.metastore.Table; +import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -32,17 +36,21 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.trino.collect.cache.SafeCaches.buildNonEvictableCacheWithWeakInvalidateAll; +import static java.util.Objects.requireNonNull; +import static org.apache.commons.lang3.StringUtils.isNotEmpty; public class CachingDirectoryLister - implements DirectoryLister + implements DirectoryLister, TableInvalidationCallback { - // TODO (https://github.com/trinodb/trino/issues/10621) this cache lacks invalidation - private final NonKeyEvictableCache> cache; + //TODO use a cache key based on Path & 
SchemaTableName and iterate over the cache keys + // to deal more efficiently with cache invalidation scenarios for partitioned tables. + private final Cache cache; private final List tablePrefixes; @Inject @@ -53,11 +61,12 @@ public CachingDirectoryLister(HiveConfig hiveClientConfig) public CachingDirectoryLister(Duration expireAfterWrite, long maxSize, List tables) { - this.cache = buildNonEvictableCacheWithWeakInvalidateAll(CacheBuilder.newBuilder() + this.cache = EvictableCacheBuilder.newBuilder() .maximumWeight(maxSize) - .weigher((Weigher>) (key, value) -> value.size()) + .weigher((Weigher) (key, value) -> value.files.map(List::size).orElse(1)) .expireAfterWrite(expireAfterWrite.toMillis(), TimeUnit.MILLISECONDS) - .recordStats()); + .recordStats() + .build(); this.tablePrefixes = tables.stream() .map(CachingDirectoryLister::parseTableName) .collect(toImmutableList()); @@ -82,19 +91,46 @@ private static SchemaTablePrefix parseTableName(String tableName) public RemoteIterator list(FileSystem fs, Table table, Path path) throws IOException { - List files = cache.getIfPresent(path); - if (files != null) { - return simpleRemoteIterator(files); + if (!isCacheEnabledFor(table.getSchemaTableName())) { + return fs.listLocatedStatus(path); } - RemoteIterator iterator = fs.listLocatedStatus(path); - if (tablePrefixes.stream().noneMatch(prefix -> prefix.matches(table.getSchemaTableName()))) { - return iterator; + ValueHolder cachedValueHolder; + try { + cachedValueHolder = cache.get(path, ValueHolder::new); + } + catch (ExecutionException e) { + throw new RuntimeException(e); // cannot happen + } + if (cachedValueHolder.getFiles().isPresent()) { + return simpleRemoteIterator(cachedValueHolder.getFiles().get()); + } + return cachingRemoteIterator(cachedValueHolder, fs.listLocatedStatus(path), path); + } + + @Override + public void invalidate(Table table) + { + if (isCacheEnabledFor(table.getSchemaTableName()) && isLocationPresent(table.getStorage())) { + if 
(table.getPartitionColumns().isEmpty()) { + cache.invalidate(new Path(table.getStorage().getLocation())); + } + else { + // a partitioned table can have multiple paths in cache + cache.invalidateAll(); + } } - return cachingRemoteIterator(iterator, path); } - private RemoteIterator cachingRemoteIterator(RemoteIterator iterator, Path path) + @Override + public void invalidate(Partition partition) + { + if (isCacheEnabledFor(partition.getSchemaTableName()) && isLocationPresent(partition.getStorage())) { + cache.invalidate(new Path(partition.getStorage().getLocation())); + } + } + + private RemoteIterator cachingRemoteIterator(ValueHolder cachedValueHolder, RemoteIterator iterator, Path path) { return new RemoteIterator<>() { @@ -106,7 +142,9 @@ public boolean hasNext() { boolean hasNext = iterator.hasNext(); if (!hasNext) { - cache.put(path, ImmutableList.copyOf(files)); + // The cachedValueHolder acts as an invalidation guard. If a cache invalidation happens while this iterator goes over + // the files from the specified path, the eventually outdated file listing will not be added anymore to the cache. + cache.asMap().replace(path, cachedValueHolder, new ValueHolder(files)); } return hasNext; } @@ -145,8 +183,6 @@ public LocatedFileStatus next() @Managed public void flushCache() { - // Note: this may not invalidate ongoing loads (https://github.com/trinodb/trino/issues/10512, https://github.com/google/guava/issues/1881) - // This is acceptable, since this operation is invoked manually, and not relied upon for correctness. 
cache.invalidateAll(); } @@ -179,4 +215,47 @@ public long getRequestCount() { return cache.stats().requestCount(); } + + @VisibleForTesting + boolean isCached(Path path) + { + ValueHolder cached = cache.getIfPresent(path); + return cached != null && cached.getFiles().isPresent(); + } + + private boolean isCacheEnabledFor(SchemaTableName schemaTableName) + { + return tablePrefixes.stream().anyMatch(prefix -> prefix.matches(schemaTableName)); + } + + private static boolean isLocationPresent(Storage storage) + { + // Some Hive table types (e.g.: views) do not have a storage location + return storage.getOptionalLocation().isPresent() && isNotEmpty(storage.getLocation()); + } + + /** + * The class enforces intentionally object identity semantics for the value holder, + * not value-based class semantics to correctly act as an invalidation guard in the + * cache. + */ + private static class ValueHolder + { + private final Optional> files; + + public ValueHolder() + { + files = Optional.empty(); + } + + public ValueHolder(List files) + { + this.files = Optional.of(ImmutableList.copyOf(requireNonNull(files, "files is null"))); + } + + public Optional> getFiles() + { + return files; + } + } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryListerModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryListerModule.java new file mode 100644 index 000000000000..4d91045fdf1e --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryListerModule.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Scopes; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class CachingDirectoryListerModule + implements Module +{ + private final Optional cachingDirectoryLister; + + public CachingDirectoryListerModule(Optional cachingDirectoryLister) + { + this.cachingDirectoryLister = requireNonNull(cachingDirectoryLister, "cachingDirectoryLister is null"); + } + + @Override + public void configure(Binder binder) + { + if (cachingDirectoryLister.isPresent()) { + CachingDirectoryLister directoryLister = cachingDirectoryLister.get(); + binder.bind(DirectoryLister.class).toInstance(directoryLister); + binder.bind(TableInvalidationCallback.class).toInstance(directoryLister); + } + else { + binder.bind(CachingDirectoryLister.class).in(Scopes.SINGLETON); + binder.bind(DirectoryLister.class).to(CachingDirectoryLister.class); + binder.bind(TableInvalidationCallback.class).to(CachingDirectoryLister.class); + } + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java index 9419a743d084..aea493116109 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java @@ -68,6 +68,7 @@ public class HiveMetadataFactory private final Optional hiveTransactionHeartbeatInterval; 
private final HiveTableRedirectionsProvider tableRedirectionsProvider; private final ScheduledExecutorService heartbeatService; + private final TableInvalidationCallback tableInvalidationCallback; @Inject public HiveMetadataFactory( @@ -88,7 +89,8 @@ public HiveMetadataFactory( Set systemTableProviders, HiveMaterializedViewMetadataFactory hiveMaterializedViewMetadataFactory, AccessControlMetadataFactory accessControlMetadataFactory, - HiveTableRedirectionsProvider tableRedirectionsProvider) + HiveTableRedirectionsProvider tableRedirectionsProvider, + TableInvalidationCallback tableInvalidationCallback) { this( catalogName, @@ -118,7 +120,8 @@ public HiveMetadataFactory( systemTableProviders, hiveMaterializedViewMetadataFactory, accessControlMetadataFactory, - tableRedirectionsProvider); + tableRedirectionsProvider, + tableInvalidationCallback); } public HiveMetadataFactory( @@ -149,7 +152,8 @@ public HiveMetadataFactory( Set systemTableProviders, HiveMaterializedViewMetadataFactory hiveMaterializedViewMetadataFactory, AccessControlMetadataFactory accessControlMetadataFactory, - HiveTableRedirectionsProvider tableRedirectionsProvider) + HiveTableRedirectionsProvider tableRedirectionsProvider, + TableInvalidationCallback tableInvalidationCallback) { this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.skipDeletionForAlter = skipDeletionForAlter; @@ -186,6 +190,7 @@ public HiveMetadataFactory( updateExecutor = new BoundedExecutor(executorService, maxConcurrentMetastoreUpdates); } this.heartbeatService = requireNonNull(heartbeatService, "heartbeatService is null"); + this.tableInvalidationCallback = requireNonNull(tableInvalidationCallback, "tableInvalidationCallback is null"); } @Override @@ -204,7 +209,8 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm skipTargetCleanupOnRollback, deleteSchemaLocationsFallback, hiveTransactionHeartbeatInterval, - heartbeatService); + heartbeatService, + 
tableInvalidationCallback); return new HiveMetadata( catalogName, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java index 35a7fab6e688..870b1eb687c5 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java @@ -58,7 +58,6 @@ public class HiveModule @Override public void configure(Binder binder) { - binder.bind(DirectoryLister.class).to(CachingDirectoryLister.class).in(Scopes.SINGLETON); configBinder(binder).bindConfig(HiveConfig.class); configBinder(binder).bindConfig(MetastoreConfig.class); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java index 6ac6dfa7dc66..4d4856bdb4d8 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java @@ -78,10 +78,10 @@ private InternalHiveConnectorFactory() {} public static Connector createConnector(String catalogName, Map config, ConnectorContext context, Module module) { - return createConnector(catalogName, config, context, module, Optional.empty()); + return createConnector(catalogName, config, context, module, Optional.empty(), Optional.empty()); } - public static Connector createConnector(String catalogName, Map config, ConnectorContext context, Module module, Optional metastore) + public static Connector createConnector(String catalogName, Map config, ConnectorContext context, Module module, Optional metastore, Optional cachingDirectoryLister) { requireNonNull(config, "config is null"); @@ -95,6 +95,7 @@ public static Connector createConnector(String catalogName, Map new JsonModule(), new TypeDeserializerModule(context.getTypeManager()), new HiveModule(), + new 
CachingDirectoryListerModule(cachingDirectoryLister), new HiveHdfsModule(), new HiveS3Module(), new HiveGcsModule(), diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/TableInvalidationCallback.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/TableInvalidationCallback.java new file mode 100644 index 000000000000..d648c962e6a9 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/TableInvalidationCallback.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive; + +import io.trino.plugin.hive.metastore.Partition; +import io.trino.plugin.hive.metastore.Table; + +public interface TableInvalidationCallback +{ + TableInvalidationCallback NOOP = new TableInvalidationCallback() { + @Override + public void invalidate(Partition partition) + { + } + + @Override + public void invalidate(Table table) + { + } + }; + + void invalidate(Partition partition); + + void invalidate(Table table); +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java index 40cabb1067a3..36741733c426 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java @@ -35,6 +35,7 @@ import io.trino.plugin.hive.PartitionNotFoundException; import io.trino.plugin.hive.PartitionStatistics; import io.trino.plugin.hive.TableAlreadyExistsException; +import io.trino.plugin.hive.TableInvalidationCallback; import io.trino.plugin.hive.acid.AcidOperation; import io.trino.plugin.hive.acid.AcidTransaction; import io.trino.plugin.hive.metastore.HivePrivilegeInfo.HivePrivilege; @@ -68,6 +69,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -140,6 +142,7 @@ public class SemiTransactionalHiveMetastore private final boolean deleteSchemaLocationsFallback; private final ScheduledExecutorService heartbeatExecutor; private final Optional configuredTransactionHeartbeatInterval; + private final TableInvalidationCallback tableInvalidationCallback; private boolean throwOnCleanupFailure; @@ -176,7 +179,8 @@ public SemiTransactionalHiveMetastore( boolean skipTargetCleanupOnRollback, boolean 
deleteSchemaLocationsFallback, Optional hiveTransactionHeartbeatInterval, - ScheduledExecutorService heartbeatService) + ScheduledExecutorService heartbeatService, + TableInvalidationCallback tableInvalidationCallback) { this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.delegate = requireNonNull(delegate, "delegate is null"); @@ -188,6 +192,7 @@ public SemiTransactionalHiveMetastore( this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback; this.heartbeatExecutor = heartbeatService; this.configuredTransactionHeartbeatInterval = requireNonNull(hiveTransactionHeartbeatInterval, "hiveTransactionHeartbeatInterval is null"); + this.tableInvalidationCallback = requireNonNull(tableInvalidationCallback, "tableInvalidationCallback is null"); } public synchronized List getAllDatabases() @@ -547,7 +552,16 @@ public synchronized void replaceTable(String databaseName, String tableName, Tab public synchronized void renameTable(String databaseName, String tableName, String newDatabaseName, String newTableName) { - setExclusive((delegate, hdfsEnvironment) -> delegate.renameTable(databaseName, tableName, newDatabaseName, newTableName)); + setExclusive((delegate, hdfsEnvironment) -> { + Optional oldTable = delegate.getTable(databaseName, tableName); + try { + delegate.renameTable(databaseName, tableName, newDatabaseName, newTableName); + } + finally { + // perform explicit invalidation for the table in exclusive metastore operations + oldTable.ifPresent(tableInvalidationCallback::invalidate); + } + }); } public synchronized void commentTable(String databaseName, String tableName, Optional comment) @@ -1504,6 +1518,9 @@ private void commitShared() throw t; } + finally { + committer.executeTableInvalidationCallback(); + } try { // After this line, operations are no longer reversible. 
@@ -1544,6 +1561,10 @@ private class Committer private final List cleanUpTasksForAbort = new ArrayList<>(); private final List renameTasksForAbort = new ArrayList<>(); + // Notify callback about changes on the schema tables / partitions + private final Set
tablesToInvalidate = new LinkedHashSet<>(); + private final Set partitionsToInvalidate = new LinkedHashSet<>(); + // Metastore private final List addTableOperations = new ArrayList<>(); private final List alterTableOperations = new ArrayList<>(); @@ -1566,7 +1587,16 @@ private void prepareDropTable(SchemaTableName schemaTableName) { metastoreDeleteOperations.add(new IrreversibleMetastoreOperation( format("drop table %s", schemaTableName), - () -> delegate.dropTable(schemaTableName.getSchemaName(), schemaTableName.getTableName(), true))); + () -> { + Optional
droppedTable = delegate.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName()); + try { + delegate.dropTable(schemaTableName.getSchemaName(), schemaTableName.getTableName(), true); + } + finally { + // perform explicit invalidation for the table in irreversible metastore operation + droppedTable.ifPresent(tableInvalidationCallback::invalidate); + } + })); } private void prepareAlterTable(HdfsContext hdfsContext, String queryId, TableAndMore tableAndMore) @@ -1579,6 +1609,7 @@ private void prepareAlterTable(HdfsContext hdfsContext, String queryId, TableAnd .orElseThrow(() -> new TrinoException(TRANSACTION_CONFLICT, "The table that this transaction modified was deleted in another transaction. " + table.getSchemaTableName())); String oldTableLocation = oldTable.getStorage().getLocation(); Path oldTablePath = new Path(oldTableLocation); + tablesToInvalidate.add(oldTable); cleanExtraOutputFiles(hdfsContext, queryId, tableAndMore); @@ -1695,6 +1726,7 @@ private void prepareInsertExistingTable(HdfsContext context, String queryId, Tab deleteOnly = false; Table table = tableAndMore.getTable(); Path targetPath = new Path(table.getStorage().getLocation()); + tablesToInvalidate.add(table); Path currentPath = tableAndMore.getCurrentLocation().get(); cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, false)); @@ -1726,6 +1758,7 @@ private void prepareDeleteRowsFromExistingTable(HdfsContext context, TableAndMor deleteOnly = false; Table table = deletionState.getTable(); + tablesToInvalidate.add(table); checkArgument(currentHiveTransaction.isPresent(), "currentHiveTransaction isn't present"); AcidTransaction transaction = currentHiveTransaction.get().getTransaction(); checkArgument(transaction.isDelete(), "transaction should be delete, but is %s", transaction); @@ -1809,6 +1842,7 @@ private void prepareUpdateExistingTable(HdfsContext context, TableAndMore tableA checkArgument(!partitionAndStatementIds.isEmpty(), "partitionAndStatementIds is 
empty"); Table table = updateState.getTable(); + tablesToInvalidate.add(table); checkArgument(currentHiveTransaction.isPresent(), "currentHiveTransaction isn't present"); AcidTransaction transaction = currentHiveTransaction.get().getTransaction(); checkArgument(transaction.isUpdate(), "transaction should be update, but is %s", transaction); @@ -1831,7 +1865,16 @@ private void prepareDropPartition(SchemaTableName schemaTableName, List { metastoreDeleteOperations.add(new IrreversibleMetastoreOperation( format("drop partition %s.%s %s", schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionValues), - () -> delegate.dropPartition(schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionValues, deleteData))); + () -> { + Optional droppedPartition = delegate.getPartition(schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionValues); + try { + delegate.dropPartition(schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionValues, deleteData); + } + finally { + // perform explicit invalidation for the partition in irreversible metastore operation + droppedPartition.ifPresent(tableInvalidationCallback::invalidate); + } + })); } private void prepareAlterPartition(HdfsContext hdfsContext, String queryId, PartitionAndMore partitionAndMore) @@ -1839,6 +1882,7 @@ private void prepareAlterPartition(HdfsContext hdfsContext, String queryId, Part deleteOnly = false; Partition partition = partitionAndMore.getPartition(); + partitionsToInvalidate.add(partition); String targetLocation = partition.getStorage().getLocation(); Optional oldPartition = delegate.getPartition(partition.getDatabaseName(), partition.getTableName(), partition.getValues()); if (oldPartition.isEmpty()) { @@ -2031,6 +2075,7 @@ private void prepareInsertExistingPartition(HdfsContext hdfsContext, String quer deleteOnly = false; Partition partition = partitionAndMore.getPartition(); + partitionsToInvalidate.add(partition); Path targetPath = new 
Path(partition.getStorage().getLocation()); Path currentPath = partitionAndMore.getCurrentLocation(); cleanUpTasksForAbort.add(new DirectoryCleanUpTask(hdfsContext, targetPath, false)); @@ -2188,6 +2233,12 @@ private void executeUpdateStatisticsOperations(AcidTransaction transaction) } } + private void executeTableInvalidationCallback() + { + tablesToInvalidate.forEach(tableInvalidationCallback::invalidate); + partitionsToInvalidate.forEach(tableInvalidationCallback::invalidate); + } + private void undoAddPartitionOperations() { for (PartitionAdder partitionAdder : partitionAdders.values()) { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java index 1bbae553875f..e6fff319cfad 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java @@ -865,7 +865,8 @@ public Optional getMaterializedView(Connect } }, SqlStandardAccessControlMetadata::new, - NO_REDIRECTIONS); + NO_REDIRECTIONS, + TableInvalidationCallback.NOOP); transactionManager = new HiveTransactionManager(metadataFactory); splitManager = new HiveSplitManager( transactionManager, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java index c5f549248ec5..750ab3de1eec 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java @@ -220,7 +220,8 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec new PropertiesSystemTableProvider()), new DefaultHiveMaterializedViewMetadataFactory(), SqlStandardAccessControlMetadata::new, - NO_REDIRECTIONS); + NO_REDIRECTIONS, + TableInvalidationCallback.NOOP); transactionManager 
= new HiveTransactionManager(metadataFactory); splitManager = new HiveSplitManager( transactionManager, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java index 59cbe6d1e78c..93d9288bd6e2 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java @@ -109,6 +109,7 @@ public static class Builder> .setMetastoreUser("test")); }; private Module module = EMPTY_MODULE; + private Optional cachingDirectoryLister = Optional.empty(); protected Builder() { @@ -175,6 +176,12 @@ public SELF setModule(Module module) return self(); } + public SELF setCachingDirectoryLister(CachingDirectoryLister cachingDirectoryLister) + { + this.cachingDirectoryLister = Optional.ofNullable(cachingDirectoryLister); + return self(); + } + @Override public DistributedQueryRunner build() throws Exception @@ -188,7 +195,7 @@ public DistributedQueryRunner build() queryRunner.createCatalog("tpch", "tpch"); HiveMetastore metastore = this.metastore.apply(queryRunner); - queryRunner.installPlugin(new TestingHivePlugin(metastore, module)); + queryRunner.installPlugin(new TestingHivePlugin(metastore, module, cachingDirectoryLister)); if (!exchangeManagerProperties.isEmpty()) { queryRunner.installPlugin(new FileSystemExchangePlugin()); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestCachingDirectoryLister.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestCachingDirectoryLister.java new file mode 100644 index 000000000000..08084f75fecb --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestCachingDirectoryLister.java @@ -0,0 +1,345 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.units.Duration; +import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.hive.metastore.file.FileHiveMetastore; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.MaterializedRow; +import io.trino.testing.QueryRunner; +import org.testng.annotations.Test; + +import java.nio.file.Path; +import java.util.List; +import java.util.NoSuchElementException; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.hive.HiveQueryRunner.TPCH_SCHEMA; +import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore; +import static java.lang.String.format; +import static java.nio.file.Files.createTempDirectory; +import static org.assertj.core.api.Assertions.assertThat; + +// some tests may invalidate the whole cache affecting therefore other concurrent tests +@Test(singleThreaded = true) +public class TestCachingDirectoryLister + extends AbstractTestQueryFramework +{ + private CachingDirectoryLister cachingDirectoryLister; + private FileHiveMetastore fileHiveMetastore; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + Path temporaryMetastoreDirectory = createTempDirectory(null); + closeAfterClass(() -> deleteRecursively(temporaryMetastoreDirectory, ALLOW_INSECURE)); + + cachingDirectoryLister 
= new CachingDirectoryLister(Duration.valueOf("5m"), 1_000_000L, List.of("tpch.*")); + + return HiveQueryRunner.builder() + .setHiveProperties(ImmutableMap.of( + "hive.allow-register-partition-procedure", "true")) + .setMetastore(distributedQueryRunner -> fileHiveMetastore = createTestingFileHiveMetastore(temporaryMetastoreDirectory.toFile())) + .setCachingDirectoryLister(cachingDirectoryLister) + .build(); + } + + @Test + public void testCacheInvalidationIsAppliedSpecificallyOnTheNonPartitionedTableBeingChanged() + { + assertUpdate("CREATE TABLE partial_cache_invalidation_table1 (col1 int) WITH (format = 'ORC')"); + assertUpdate("INSERT INTO partial_cache_invalidation_table1 VALUES (1), (2), (3)", 3); + // The listing for the invalidate_non_partitioned_table1 should be in the directory cache after this call + assertQuery("SELECT sum(col1) FROM partial_cache_invalidation_table1", "VALUES (6)"); + org.apache.hadoop.fs.Path cachedTable1Location = getTableLocation(TPCH_SCHEMA, "partial_cache_invalidation_table1"); + assertThat(cachingDirectoryLister.isCached(cachedTable1Location)).isTrue(); + + assertUpdate("CREATE TABLE partial_cache_invalidation_table2 (col1 int) WITH (format = 'ORC')"); + assertUpdate("INSERT INTO partial_cache_invalidation_table2 VALUES (11), (12)", 2); + // The listing for the invalidate_non_partitioned_table2 should be in the directory cache after this call + assertQuery("SELECT sum(col1) FROM partial_cache_invalidation_table2", "VALUES (23)"); + org.apache.hadoop.fs.Path cachedTable2Location = getTableLocation(TPCH_SCHEMA, "partial_cache_invalidation_table2"); + assertThat(cachingDirectoryLister.isCached(cachedTable2Location)).isTrue(); + + assertUpdate("INSERT INTO partial_cache_invalidation_table1 VALUES (4), (5)", 2); + // Inserting into the invalidate_non_partitioned_table1 should invalidate only the cached listing of the files belonging only to this table. 
+ assertThat(cachingDirectoryLister.isCached(cachedTable1Location)).isFalse(); + assertThat(cachingDirectoryLister.isCached(cachedTable2Location)).isTrue(); + + assertQuery("SELECT sum(col1) FROM partial_cache_invalidation_table1", "VALUES (15)"); + assertQuery("SELECT sum(col1) FROM partial_cache_invalidation_table2", "VALUES (23)"); + + assertUpdate("DROP TABLE partial_cache_invalidation_table1"); + assertUpdate("DROP TABLE partial_cache_invalidation_table2"); + } + + @Test + public void testCacheInvalidationIsAppliedOnTheEntireCacheOnPartitionedTableDrop() + { + assertUpdate("CREATE TABLE full_cache_invalidation_non_partitioned_table (col1 int) WITH (format = 'ORC')"); + assertUpdate("INSERT INTO full_cache_invalidation_non_partitioned_table VALUES (1), (2), (3)", 3); + // The listing for the invalidate_non_partitioned_table1 should be in the directory cache after this call + assertQuery("SELECT sum(col1) FROM full_cache_invalidation_non_partitioned_table", "VALUES (6)"); + org.apache.hadoop.fs.Path nonPartitionedTableLocation = getTableLocation(TPCH_SCHEMA, "full_cache_invalidation_non_partitioned_table"); + assertThat(cachingDirectoryLister.isCached(nonPartitionedTableLocation)).isTrue(); + + assertUpdate("CREATE TABLE full_cache_invalidation_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); + assertUpdate("INSERT INTO full_cache_invalidation_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); + assertQuery("SELECT col2, sum(col1) FROM full_cache_invalidation_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); + org.apache.hadoop.fs.Path partitionedTableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "full_cache_invalidation_partitioned_table", ImmutableList.of("group1")); + org.apache.hadoop.fs.Path partitionedTableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "full_cache_invalidation_partitioned_table", 
ImmutableList.of("group2")); + assertThat(cachingDirectoryLister.isCached(partitionedTableGroup1PartitionLocation)).isTrue(); + assertThat(cachingDirectoryLister.isCached(partitionedTableGroup2PartitionLocation)).isTrue(); + + assertUpdate("INSERT INTO full_cache_invalidation_non_partitioned_table VALUES (4), (5)", 2); + // Inserting into the full_cache_invalidation_non_partitioned_table should invalidate only the cached listing of the files belonging to this table. + assertThat(cachingDirectoryLister.isCached(nonPartitionedTableLocation)).isFalse(); + assertThat(cachingDirectoryLister.isCached(partitionedTableGroup1PartitionLocation)).isTrue(); + assertThat(cachingDirectoryLister.isCached(partitionedTableGroup2PartitionLocation)).isTrue(); + + assertUpdate("DROP TABLE full_cache_invalidation_partitioned_table"); + // Invalidation of the partitioned table causes the full invalidation of the cache + assertThat(cachingDirectoryLister.isCached(nonPartitionedTableLocation)).isFalse(); + assertThat(cachingDirectoryLister.isCached(partitionedTableGroup1PartitionLocation)).isFalse(); + assertThat(cachingDirectoryLister.isCached(partitionedTableGroup2PartitionLocation)).isFalse(); + + assertQuery("SELECT sum(col1) FROM full_cache_invalidation_non_partitioned_table", "VALUES (15)"); + + assertUpdate("DROP TABLE full_cache_invalidation_non_partitioned_table"); + } + + @Test + public void testCacheInvalidationIsAppliedSpecificallyOnPartitionDropped() + { + assertUpdate("CREATE TABLE partition_path_cache_invalidation_non_partitioned_table (col1 int) WITH (format = 'ORC')"); + assertUpdate("INSERT INTO partition_path_cache_invalidation_non_partitioned_table VALUES (1), (2), (3)", 3); + // The listing for the partition_path_cache_invalidation_non_partitioned_table should be in the directory cache after this call + assertQuery("SELECT sum(col1) FROM partition_path_cache_invalidation_non_partitioned_table", "VALUES (6)"); + org.apache.hadoop.fs.Path nonPartitionedTableLocation = getTableLocation(TPCH_SCHEMA, 
"partition_path_cache_invalidation_non_partitioned_table"); + assertThat(cachingDirectoryLister.isCached(nonPartitionedTableLocation)).isTrue(); + + assertUpdate("CREATE TABLE partition_path_cache_invalidation_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); + assertUpdate("INSERT INTO partition_path_cache_invalidation_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); + assertQuery("SELECT col2, sum(col1) FROM partition_path_cache_invalidation_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); + org.apache.hadoop.fs.Path partitionedTableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_partitioned_table", ImmutableList.of("group1")); + org.apache.hadoop.fs.Path partitionedTableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_partitioned_table", ImmutableList.of("group2")); + assertThat(cachingDirectoryLister.isCached(partitionedTableGroup1PartitionLocation)).isTrue(); + assertThat(cachingDirectoryLister.isCached(partitionedTableGroup2PartitionLocation)).isTrue(); + + assertUpdate("DELETE FROM partition_path_cache_invalidation_partitioned_table WHERE col2='group1'"); + assertThat(cachingDirectoryLister.isCached(nonPartitionedTableLocation)).isTrue(); + assertThat(cachingDirectoryLister.isCached(partitionedTableGroup1PartitionLocation)).isFalse(); + assertThat(cachingDirectoryLister.isCached(partitionedTableGroup2PartitionLocation)).isTrue(); + + assertQuery("SELECT sum(col1) FROM partition_path_cache_invalidation_non_partitioned_table", "VALUES (6)"); + assertQuery("SELECT col2, sum(col1) FROM partition_path_cache_invalidation_partitioned_table GROUP BY col2", "VALUES ('group2', 7)"); + + assertUpdate("DROP TABLE partition_path_cache_invalidation_non_partitioned_table"); + assertUpdate("DROP TABLE partition_path_cache_invalidation_partitioned_table"); + } + + @Test 
+ public void testInsertIntoNonPartitionedTable() + { + assertUpdate("CREATE TABLE insert_into_non_partitioned_table (col1 int) WITH (format = 'ORC')"); + assertUpdate("INSERT INTO insert_into_non_partitioned_table VALUES (1), (2), (3)", 3); + // The listing for the table should be in the directory cache after this call + assertQuery("SELECT sum(col1) FROM insert_into_non_partitioned_table", "VALUES (6)"); + assertThat(cachingDirectoryLister.isCached(getTableLocation(TPCH_SCHEMA, "insert_into_non_partitioned_table"))).isTrue(); + assertUpdate("INSERT INTO insert_into_non_partitioned_table VALUES (4), (5)", 2); + // Inserting into the table should invalidate the cached listing of the files belonging to the table. + assertThat(cachingDirectoryLister.isCached(getTableLocation(TPCH_SCHEMA, "insert_into_non_partitioned_table"))).isFalse(); + + assertQuery("SELECT sum(col1) FROM insert_into_non_partitioned_table", "VALUES (15)"); + + assertUpdate("DROP TABLE insert_into_non_partitioned_table"); + } + + @Test + public void testInsertIntoPartitionedTable() + { + assertUpdate("CREATE TABLE insert_into_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); + assertUpdate("INSERT INTO insert_into_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); + // The listing for the table partitions should be in the directory cache after this call + assertQuery("SELECT col2, sum(col1) FROM insert_into_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); + org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "insert_into_partitioned_table", ImmutableList.of("group1")); + org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "insert_into_partitioned_table", ImmutableList.of("group2")); + assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isTrue(); + 
assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isTrue(); + + assertUpdate("INSERT INTO insert_into_partitioned_table VALUES (5, 'group2'), (6, 'group3')", 2); + assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isTrue(); + // Inserting into the table should invalidate the cached listing of the partitions affected by the insert statement + assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isFalse(); + assertQuery("SELECT col2, sum(col1) FROM insert_into_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 12), ('group3', 6)"); + + assertUpdate("DROP TABLE insert_into_partitioned_table"); + } + + @Test + public void testDropPartition() + { + assertUpdate("CREATE TABLE delete_from_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); + assertUpdate("INSERT INTO delete_from_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2'), (5, 'group3')", 5); + // The listing for the table partitions should be in the directory cache after this call + assertQuery("SELECT col2, sum(col1) FROM delete_from_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7), ('group3', 5)"); + org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group1")); + org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group2")); + org.apache.hadoop.fs.Path tableGroup3PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group3")); + assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isTrue(); + assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isTrue(); + assertUpdate("DELETE FROM delete_from_partitioned_table WHERE col2 = 'group1' OR col2 = 'group2'"); + 
// Deleting from the table should invalidate the cached listing of the partitions dropped from the table. + assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isFalse(); + assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isFalse(); + assertThat(cachingDirectoryLister.isCached(tableGroup3PartitionLocation)).isTrue(); + assertQuery("SELECT col2, sum(col1) FROM delete_from_partitioned_table GROUP BY col2", "VALUES ('group3', 5)"); + + assertUpdate("DROP TABLE delete_from_partitioned_table"); + } + + @Test + public void testDropMultiLevelPartition() + { + assertUpdate("CREATE TABLE delete_from_partitioned_table (clicks bigint, day date, country varchar) WITH (format = 'ORC', partitioned_by = ARRAY['day', 'country'])"); + assertUpdate("INSERT INTO delete_from_partitioned_table VALUES (1000, DATE '2022-02-01', 'US'), (2000, DATE '2022-02-01', 'US'), (4000, DATE '2022-02-02', 'US'), (1500, DATE '2022-02-01', 'AT'), (2500, DATE '2022-02-02', 'AT')", 5); + // The listing for the table partitions should be in the directory cache after this call + assertQuery("SELECT day, country, sum(clicks) FROM delete_from_partitioned_table GROUP BY day, country", "VALUES (DATE '2022-02-01', 'US', 3000), (DATE '2022-02-02', 'US', 4000), (DATE '2022-02-01', 'AT', 1500), (DATE '2022-02-02', 'AT', 2500)"); + org.apache.hadoop.fs.Path table20220201UsPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-01", "US")); + org.apache.hadoop.fs.Path table20220202UsPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-02", "US")); + org.apache.hadoop.fs.Path table20220201AtPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-01", "AT")); + org.apache.hadoop.fs.Path table20220202AtPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", 
ImmutableList.of("2022-02-02", "AT")); + assertThat(cachingDirectoryLister.isCached(table20220201UsPartitionLocation)).isTrue(); + assertThat(cachingDirectoryLister.isCached(table20220202UsPartitionLocation)).isTrue(); + assertThat(cachingDirectoryLister.isCached(table20220201AtPartitionLocation)).isTrue(); + assertThat(cachingDirectoryLister.isCached(table20220202AtPartitionLocation)).isTrue(); + assertUpdate("DELETE FROM delete_from_partitioned_table WHERE day = DATE '2022-02-01'"); + // Deleting from the table should invalidate the cached listing of the partitions dropped from the table. + assertThat(cachingDirectoryLister.isCached(table20220201UsPartitionLocation)).isFalse(); + assertThat(cachingDirectoryLister.isCached(table20220202UsPartitionLocation)).isTrue(); + assertThat(cachingDirectoryLister.isCached(table20220201AtPartitionLocation)).isFalse(); + assertThat(cachingDirectoryLister.isCached(table20220202AtPartitionLocation)).isTrue(); + assertUpdate("DELETE FROM delete_from_partitioned_table WHERE country = 'US'"); + assertThat(cachingDirectoryLister.isCached(table20220202UsPartitionLocation)).isFalse(); + assertThat(cachingDirectoryLister.isCached(table20220202AtPartitionLocation)).isTrue(); + assertQuery("SELECT day, country, sum(clicks) FROM delete_from_partitioned_table GROUP BY day, country", "VALUES (DATE '2022-02-02', 'AT', 2500)"); + + assertUpdate("DROP TABLE delete_from_partitioned_table"); + } + + @Test + public void testUnregisterRegisterPartition() + { + assertUpdate("CREATE TABLE register_unregister_partition_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); + assertUpdate("INSERT INTO register_unregister_partition_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); + // The listing for the table partitions should be in the directory cache after this call + assertQuery("SELECT col2, sum(col1) FROM register_unregister_partition_table GROUP BY col2", "VALUES ('group1', 3), 
('group2', 7)"); + org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "register_unregister_partition_table", ImmutableList.of("group1")); + org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "register_unregister_partition_table", ImmutableList.of("group2")); + assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isTrue(); + assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isTrue(); + + List paths = getQueryRunner().execute(getSession(), "SELECT \"$path\" FROM register_unregister_partition_table WHERE col2 = 'group1' LIMIT 1").toTestTypes().getMaterializedRows(); + String group1PartitionPath = new org.apache.hadoop.fs.Path((String) paths.get(0).getField(0)).getParent().toString(); + + assertUpdate(format("CALL system.unregister_partition('%s', '%s', ARRAY['col2'], ARRAY['group1'])", TPCH_SCHEMA, "register_unregister_partition_table")); + // Unregistering the partition should invalidate only the cached listing of the unregistered partition; other partitions stay cached. + assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isFalse(); + assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isTrue(); + assertQuery("SELECT col2, sum(col1) FROM register_unregister_partition_table GROUP BY col2", "VALUES ('group2', 7)"); + assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isTrue(); + + assertUpdate(format("CALL system.register_partition('%s', '%s', ARRAY['col2'], ARRAY['group1'], '%s')", TPCH_SCHEMA, "register_unregister_partition_table", group1PartitionPath)); + // Registering the partition should invalidate only the cached listing of the registered partition's location; other partitions stay cached. 
+ assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isFalse(); + assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isTrue(); + + assertQuery("SELECT col2, sum(col1) FROM register_unregister_partition_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); + + assertUpdate("DROP TABLE register_unregister_partition_table"); + } + + @Test + public void testRenameTable() + { + assertUpdate("CREATE TABLE table_to_be_renamed (col1 int) WITH (format = 'ORC')"); + assertUpdate("INSERT INTO table_to_be_renamed VALUES (1), (2), (3)", 3); + // The listing for the table should be in the directory cache after this call + assertQuery("SELECT sum(col1) FROM table_to_be_renamed", "VALUES (6)"); + org.apache.hadoop.fs.Path tableLocation = getTableLocation(TPCH_SCHEMA, "table_to_be_renamed"); + assertThat(cachingDirectoryLister.isCached(tableLocation)).isTrue(); + assertUpdate("ALTER TABLE table_to_be_renamed RENAME TO table_renamed"); + // Altering the table should invalidate the cached listing of the files belonging to the table. + assertThat(cachingDirectoryLister.isCached(tableLocation)).isFalse(); + + assertUpdate("DROP TABLE table_renamed"); + } + + @Test + public void testDropTable() + { + assertUpdate("CREATE TABLE table_to_be_dropped (col1 int) WITH (format = 'ORC')"); + assertUpdate("INSERT INTO table_to_be_dropped VALUES (1), (2), (3)", 3); + // The listing for the table should be in the directory cache after this call + assertQuery("SELECT sum(col1) FROM table_to_be_dropped", "VALUES (6)"); + org.apache.hadoop.fs.Path tableLocation = getTableLocation(TPCH_SCHEMA, "table_to_be_dropped"); + assertThat(cachingDirectoryLister.isCached(tableLocation)).isTrue(); + assertUpdate("DROP TABLE table_to_be_dropped"); + // Dropping the table should invalidate the cached listing of the files belonging to the table. 
+ assertThat(cachingDirectoryLister.isCached(tableLocation)).isFalse(); + } + + @Test + public void testDropPartitionedTable() + { + assertUpdate("CREATE TABLE drop_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); + assertUpdate("INSERT INTO drop_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2'), (5, 'group3')", 5); + // The listing for the table partitions should be in the directory cache after this call + assertQuery("SELECT col2, sum(col1) FROM drop_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7), ('group3', 5)"); + org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group1")); + org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group2")); + org.apache.hadoop.fs.Path tableGroup3PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group3")); + assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isTrue(); + assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isTrue(); + assertThat(cachingDirectoryLister.isCached(tableGroup3PartitionLocation)).isTrue(); + assertUpdate("DROP TABLE drop_partitioned_table"); + assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isFalse(); + assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isFalse(); + assertThat(cachingDirectoryLister.isCached(tableGroup3PartitionLocation)).isFalse(); + } + + private org.apache.hadoop.fs.Path getTableLocation(String schemaName, String tableName) + { + return fileHiveMetastore.getTable(schemaName, tableName) + .map(table -> table.getStorage().getLocation()) + .map(tableLocation -> new org.apache.hadoop.fs.Path(tableLocation)) + .orElseThrow(() -> new NoSuchElementException(format("The table %s.%s 
could not be found", schemaName, tableName))); + } + + private org.apache.hadoop.fs.Path getPartitionLocation(String schemaName, String tableName, List partitionValues) + { + Table table = fileHiveMetastore.getTable(schemaName, tableName) + .orElseThrow(() -> new NoSuchElementException(format("The table %s.%s could not be found", schemaName, tableName))); + + return fileHiveMetastore.getPartition(table, partitionValues) + .map(partition -> partition.getStorage().getLocation()) + .map(partitionLocation -> new org.apache.hadoop.fs.Path(partitionLocation)) + .orElseThrow(() -> new NoSuchElementException(format("The partition %s from the table %s.%s could not be found", partitionValues, schemaName, tableName))); + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java index 37929b7b644d..f205fd40f906 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java @@ -31,16 +31,18 @@ public class TestingHiveConnectorFactory { private final HiveMetastore metastore; private final Module module; + private final Optional cachingDirectoryLister; public TestingHiveConnectorFactory(HiveMetastore metastore) { - this(metastore, EMPTY_MODULE); + this(metastore, EMPTY_MODULE, Optional.empty()); } - public TestingHiveConnectorFactory(HiveMetastore metastore, Module module) + public TestingHiveConnectorFactory(HiveMetastore metastore, Module module, Optional cachingDirectoryLister) { this.metastore = requireNonNull(metastore, "metastore is null"); this.module = requireNonNull(module, "module is null"); + this.cachingDirectoryLister = requireNonNull(cachingDirectoryLister, "cachingDirectoryLister is null"); } @Override @@ -52,6 +54,6 @@ public String getName() @Override public Connector create(String catalogName, Map config, 
ConnectorContext context) { - return createConnector(catalogName, config, context, module, Optional.of(metastore)); + return createConnector(catalogName, config, context, module, Optional.of(metastore), cachingDirectoryLister); } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java index b1e350d3e3bc..f5b27da4b4a3 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java @@ -19,6 +19,8 @@ import io.trino.spi.Plugin; import io.trino.spi.connector.ConnectorFactory; +import java.util.Optional; + import static com.google.inject.util.Modules.EMPTY_MODULE; import static java.util.Objects.requireNonNull; @@ -27,21 +29,23 @@ public class TestingHivePlugin { private final HiveMetastore metastore; private final Module module; + private final Optional cachingDirectoryLister; public TestingHivePlugin(HiveMetastore metastore) { - this(metastore, EMPTY_MODULE); + this(metastore, EMPTY_MODULE, Optional.empty()); } - public TestingHivePlugin(HiveMetastore metastore, Module module) + public TestingHivePlugin(HiveMetastore metastore, Module module, Optional cachingDirectoryLister) { this.metastore = requireNonNull(metastore, "metastore is null"); this.module = requireNonNull(module, "module is null"); + this.cachingDirectoryLister = requireNonNull(cachingDirectoryLister, "cachingDirectoryLister is null"); } @Override public Iterable getConnectorFactories() { - return ImmutableList.of(new TestingHiveConnectorFactory(metastore, module)); + return ImmutableList.of(new TestingHiveConnectorFactory(metastore, module, cachingDirectoryLister)); } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestSemiTransactionalHiveMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestSemiTransactionalHiveMetastore.java 
index 83565c49b798..5a6e50d72165 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestSemiTransactionalHiveMetastore.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestSemiTransactionalHiveMetastore.java @@ -19,6 +19,7 @@ import io.trino.plugin.hive.HiveMetastoreClosure; import io.trino.plugin.hive.HiveType; import io.trino.plugin.hive.PartitionStatistics; +import io.trino.plugin.hive.TableInvalidationCallback; import io.trino.plugin.hive.acid.AcidTransaction; import org.apache.hadoop.fs.Path; import org.testng.annotations.Test; @@ -86,7 +87,8 @@ private SemiTransactionalHiveMetastore getSemiTransactionalHiveMetastoreWithDrop false, true, Optional.empty(), - newScheduledThreadPool(1)); + newScheduledThreadPool(1), + TableInvalidationCallback.NOOP); } @Test @@ -125,7 +127,8 @@ private SemiTransactionalHiveMetastore getSemiTransactionalHiveMetastoreWithUpda false, true, Optional.empty(), - newScheduledThreadPool(1)); + newScheduledThreadPool(1), + TableInvalidationCallback.NOOP); } private class TestingHiveMetastore