From c9be173306cfb47f81b73fb58e080f3ea45c1dcb Mon Sep 17 00:00:00 2001 From: Karol Sobczak Date: Tue, 22 Mar 2022 17:16:35 +0100 Subject: [PATCH 1/3] Move CachingDirectoryLister to fs package --- .../io/trino/plugin/hive/BackgroundHiveSplitLoader.java | 9 +++++---- .../src/main/java/io/trino/plugin/hive/HiveModule.java | 1 + .../main/java/io/trino/plugin/hive/HiveSplitManager.java | 1 + .../trino/plugin/hive/InternalHiveConnectorFactory.java | 2 ++ .../plugin/hive/{ => fs}/CachingDirectoryLister.java | 4 +++- .../hive/{ => fs}/CachingDirectoryListerModule.java | 3 ++- .../io/trino/plugin/hive/{ => fs}/DirectoryLister.java | 2 +- .../trino/plugin/hive/{util => fs}/HiveFileIterator.java | 3 +-- .../test/java/io/trino/plugin/hive/AbstractTestHive.java | 1 + .../io/trino/plugin/hive/AbstractTestHiveFileSystem.java | 1 + .../test/java/io/trino/plugin/hive/HiveQueryRunner.java | 1 + .../trino/plugin/hive/TestBackgroundHiveSplitLoader.java | 2 ++ .../trino/plugin/hive/TestingHiveConnectorFactory.java | 1 + .../java/io/trino/plugin/hive/TestingHivePlugin.java | 1 + .../plugin/hive/{ => fs}/TestCachingDirectoryLister.java | 3 ++- 15 files changed, 25 insertions(+), 10 deletions(-) rename plugin/trino-hive/src/main/java/io/trino/plugin/hive/{ => fs}/CachingDirectoryLister.java (98%) rename plugin/trino-hive/src/main/java/io/trino/plugin/hive/{ => fs}/CachingDirectoryListerModule.java (95%) rename plugin/trino-hive/src/main/java/io/trino/plugin/hive/{ => fs}/DirectoryLister.java (96%) rename plugin/trino-hive/src/main/java/io/trino/plugin/hive/{util => fs}/HiveFileIterator.java (98%) rename plugin/trino-hive/src/test/java/io/trino/plugin/hive/{ => fs}/TestCachingDirectoryLister.java (99%) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java index cd5ce0d29bbb..b98b4510333d 100644 --- 
a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java @@ -28,12 +28,13 @@ import io.trino.plugin.hive.HiveSplit.BucketConversion; import io.trino.plugin.hive.HiveSplit.BucketValidation; import io.trino.plugin.hive.acid.AcidTransaction; +import io.trino.plugin.hive.fs.DirectoryLister; +import io.trino.plugin.hive.fs.HiveFileIterator; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.Table; import io.trino.plugin.hive.util.HiveBucketing.BucketingVersion; import io.trino.plugin.hive.util.HiveBucketing.HiveBucketFilter; -import io.trino.plugin.hive.util.HiveFileIterator; import io.trino.plugin.hive.util.InternalHiveSplitFactory; import io.trino.plugin.hive.util.ResumableTask; import io.trino.plugin.hive.util.ResumableTasks; @@ -106,13 +107,13 @@ import static io.trino.plugin.hive.HiveSessionProperties.getMaxInitialSplitSize; import static io.trino.plugin.hive.HiveSessionProperties.isForceLocalScheduling; import static io.trino.plugin.hive.HiveSessionProperties.isValidateBucketing; +import static io.trino.plugin.hive.fs.HiveFileIterator.NestedDirectoryPolicy.FAIL; +import static io.trino.plugin.hive.fs.HiveFileIterator.NestedDirectoryPolicy.IGNORED; +import static io.trino.plugin.hive.fs.HiveFileIterator.NestedDirectoryPolicy.RECURSE; import static io.trino.plugin.hive.metastore.MetastoreUtil.getHiveSchema; import static io.trino.plugin.hive.metastore.MetastoreUtil.getPartitionLocation; import static io.trino.plugin.hive.s3select.S3SelectPushdown.shouldEnablePushdownForTable; import static io.trino.plugin.hive.util.ConfigurationUtils.toJobConf; -import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.FAIL; -import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.IGNORED; -import static 
io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.RECURSE; import static io.trino.plugin.hive.util.HiveUtil.checkCondition; import static io.trino.plugin.hive.util.HiveUtil.getFooterCount; import static io.trino.plugin.hive.util.HiveUtil.getHeaderCount; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java index 870b1eb687c5..3680832a1234 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java @@ -21,6 +21,7 @@ import com.google.inject.multibindings.Multibinder; import io.airlift.event.client.EventClient; import io.trino.plugin.base.CatalogName; +import io.trino.plugin.hive.fs.CachingDirectoryLister; import io.trino.plugin.hive.metastore.MetastoreConfig; import io.trino.plugin.hive.orc.OrcFileWriterFactory; import io.trino.plugin.hive.orc.OrcPageSourceFactory; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java index d3dca08a59a1..1e5f938b2a8a 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java @@ -21,6 +21,7 @@ import io.airlift.concurrent.BoundedExecutor; import io.airlift.stats.CounterStat; import io.airlift.units.DataSize; +import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java index e5484aa0d788..d71c1aeb9684 100644 --- 
a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java @@ -36,6 +36,8 @@ import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.plugin.hive.authentication.HdfsAuthenticationModule; import io.trino.plugin.hive.azure.HiveAzureModule; +import io.trino.plugin.hive.fs.CachingDirectoryLister; +import io.trino.plugin.hive.fs.CachingDirectoryListerModule; import io.trino.plugin.hive.gcs.HiveGcsModule; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.HiveMetastoreModule; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java similarity index 98% rename from plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryLister.java rename to plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java index 66f2ddd7754b..88493457ee0d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryLister.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package io.trino.plugin.hive; +package io.trino.plugin.hive.fs; import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; @@ -19,6 +19,8 @@ import com.google.common.collect.ImmutableList; import io.airlift.units.Duration; import io.trino.collect.cache.EvictableCacheBuilder; +import io.trino.plugin.hive.HiveConfig; +import io.trino.plugin.hive.TableInvalidationCallback; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.Storage; import io.trino.plugin.hive.metastore.Table; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryListerModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryListerModule.java similarity index 95% rename from plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryListerModule.java rename to plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryListerModule.java index 4d91045fdf1e..d9720bedd7b2 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/CachingDirectoryListerModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryListerModule.java @@ -11,11 +11,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package io.trino.plugin.hive; +package io.trino.plugin.hive.fs; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Scopes; +import io.trino.plugin.hive.TableInvalidationCallback; import java.util.Optional; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/DirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java similarity index 96% rename from plugin/trino-hive/src/main/java/io/trino/plugin/hive/DirectoryLister.java rename to plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java index e9ed48f2b069..da90ea35c83e 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/DirectoryLister.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.hive; +package io.trino.plugin.hive.fs; import io.trino.plugin.hive.metastore.Table; import org.apache.hadoop.fs.FileSystem; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveFileIterator.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java similarity index 98% rename from plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveFileIterator.java rename to plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java index de577035be09..27e876ed12b9 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveFileIterator.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java @@ -11,11 +11,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package io.trino.plugin.hive.util; +package io.trino.plugin.hive.fs; import com.google.common.collect.AbstractIterator; import io.airlift.stats.TimeStat; -import io.trino.plugin.hive.DirectoryLister; import io.trino.plugin.hive.NamenodeStats; import io.trino.plugin.hive.metastore.Table; import io.trino.spi.TrinoException; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java index 7648be46b967..840961189d2d 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java @@ -32,6 +32,7 @@ import io.trino.plugin.hive.authentication.NoHdfsAuthentication; import io.trino.plugin.hive.azure.HiveAzureConfig; import io.trino.plugin.hive.azure.TrinoAzureConfigurationInitializer; +import io.trino.plugin.hive.fs.CachingDirectoryLister; import io.trino.plugin.hive.gcs.GoogleGcsConfigurationInitializer; import io.trino.plugin.hive.gcs.HiveGcsConfig; import io.trino.plugin.hive.metastore.Column; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java index 750ab3de1eec..f723d65be082 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java @@ -27,6 +27,7 @@ import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; import io.trino.plugin.hive.authentication.HiveIdentity; import io.trino.plugin.hive.authentication.NoHdfsAuthentication; +import io.trino.plugin.hive.fs.CachingDirectoryLister; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Database; import io.trino.plugin.hive.metastore.ForwardingHiveMetastore; diff --git 
a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java index bdd901e74c03..8b56d8836d35 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java @@ -20,6 +20,7 @@ import io.airlift.log.Logging; import io.trino.Session; import io.trino.metadata.QualifiedObjectName; +import io.trino.plugin.hive.fs.CachingDirectoryLister; import io.trino.plugin.hive.metastore.Database; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.MetastoreConfig; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java index f579f342a706..18e5c163e45a 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java @@ -25,6 +25,8 @@ import io.airlift.units.Duration; import io.trino.plugin.hive.HiveColumnHandle.ColumnType; import io.trino.plugin.hive.authentication.NoHdfsAuthentication; +import io.trino.plugin.hive.fs.CachingDirectoryLister; +import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.StorageFormat; import io.trino.plugin.hive.metastore.Table; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java index da0ca0f1c5e9..1424253e7bc7 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java @@ -14,6 +14,7 @@ package io.trino.plugin.hive; import 
com.google.inject.Module; +import io.trino.plugin.hive.fs.CachingDirectoryLister; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java index 0c51272c22e6..2770369572b1 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Module; +import io.trino.plugin.hive.fs.CachingDirectoryLister; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.spi.Plugin; import io.trino.spi.connector.ConnectorFactory; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestCachingDirectoryLister.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java similarity index 99% rename from plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestCachingDirectoryLister.java rename to plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java index 08084f75fecb..c50a8feef728 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestCachingDirectoryLister.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java @@ -11,11 +11,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package io.trino.plugin.hive; +package io.trino.plugin.hive.fs; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.units.Duration; +import io.trino.plugin.hive.HiveQueryRunner; import io.trino.plugin.hive.metastore.Table; import io.trino.plugin.hive.metastore.file.FileHiveMetastore; import io.trino.testing.AbstractTestQueryFramework; From 0c5b67c5ce0d7f0a3f24f52d42cbc7db6e017857 Mon Sep 17 00:00:00 2001 From: Karol Sobczak Date: Thu, 24 Mar 2022 10:57:17 +0100 Subject: [PATCH 2/3] Fix formatting --- .../test/java/io/trino/plugin/hive/AbstractTestHive.java | 4 ++-- .../test/java/io/trino/plugin/hive/HiveQueryRunner.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java index 840961189d2d..36d7270d06c9 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java @@ -5542,8 +5542,8 @@ protected void doTestTransactionDeleteInsert(HiveStorageFormat storageFormat, bo .row(620L, "f", "insert3") .build(); Domain domainToDrop = Domain.create(ValueSet.of( - createUnboundedVarcharType(), - utf8Slice("alter1"), utf8Slice("alter2"), utf8Slice("alter3"), utf8Slice("drop1"), utf8Slice("drop2"), utf8Slice("drop3")), + createUnboundedVarcharType(), + utf8Slice("alter1"), utf8Slice("alter2"), utf8Slice("alter3"), utf8Slice("drop1"), utf8Slice("drop2"), utf8Slice("drop3")), false); List extraRowsForInsertExisting = ImmutableList.of(); if (allowInsertExisting) { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java index 8b56d8836d35..383b8a712763 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java 
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java @@ -300,8 +300,8 @@ private static Session createSession(Optional role) return testSessionBuilder() .setIdentity(Identity.forUser("hive") .withConnectorRoles(role.map(selectedRole -> ImmutableMap.of( - HIVE_CATALOG, selectedRole, - HIVE_BUCKETED_CATALOG, selectedRole)) + HIVE_CATALOG, selectedRole, + HIVE_BUCKETED_CATALOG, selectedRole)) .orElse(ImmutableMap.of())) .build()) .setCatalog(HIVE_CATALOG) @@ -314,8 +314,8 @@ public static Session createBucketedSession(Optional role) return testSessionBuilder() .setIdentity(Identity.forUser("hive") .withConnectorRoles(role.map(selectedRole -> ImmutableMap.of( - HIVE_CATALOG, selectedRole, - HIVE_BUCKETED_CATALOG, selectedRole)) + HIVE_CATALOG, selectedRole, + HIVE_BUCKETED_CATALOG, selectedRole)) .orElse(ImmutableMap.of())) .build()) .setCatalog(HIVE_BUCKETED_CATALOG) From 79cc1b52927bc11b276d44f6155c0821655e18d4 Mon Sep 17 00:00:00 2001 From: Karol Sobczak Date: Fri, 1 Apr 2022 14:16:46 +0200 Subject: [PATCH 3/3] Cache file listings per transaction Cache concurrent directory listings when the same table is scanned multiple times during a single query --- .../plugin/deltalake/DeltaLakeModule.java | 7 + .../plugin/hive/TestHiveAlluxioMetastore.java | 6 + .../java/io/trino/plugin/hive/HiveConfig.java | 16 + .../io/trino/plugin/hive/HiveMetadata.java | 12 +- .../plugin/hive/HiveMetadataFactory.java | 23 +- .../trino/plugin/hive/HiveSplitManager.java | 11 +- .../hive/InternalHiveConnectorFactory.java | 6 +- .../hive/TableInvalidationCallback.java | 12 - .../plugin/hive/TransactionalMetadata.java | 3 + .../hive/fs/CachingDirectoryLister.java | 26 +- .../hive/fs/CachingDirectoryListerModule.java | 19 +- .../trino/plugin/hive/fs/DirectoryLister.java | 2 + .../plugin/hive/fs/SimpleRemoteIterator.java | 47 +++ ...ransactionScopeCachingDirectoryLister.java | 239 ++++++++++++ .../trino/plugin/hive/AbstractTestHive.java | 82 +++- .../hive/AbstractTestHiveFileSystem.java 
| 5 +- .../io/trino/plugin/hive/HiveQueryRunner.java | 10 +- .../io/trino/plugin/hive/TestHiveConfig.java | 3 + .../hive/TestingHiveConnectorFactory.java | 10 +- .../trino/plugin/hive/TestingHivePlugin.java | 10 +- .../fs/BaseCachingDirectoryListerTest.java | 350 ++++++++++++++++++ .../hive/fs/FileSystemDirectoryLister.java | 44 +++ .../hive/fs/TestCachingDirectoryLister.java | 322 +--------------- ...ransactionScopeCachingDirectoryLister.java | 234 ++++++++++++ .../TestSemiTransactionalHiveMetastore.java | 6 +- 25 files changed, 1104 insertions(+), 401 deletions(-) create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java create mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/BaseCachingDirectoryListerTest.java create mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java create mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java index dc5170f615f1..9f68c7f0f6d1 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java @@ -49,6 +49,7 @@ import io.trino.plugin.hive.SystemTableProvider; import io.trino.plugin.hive.TransactionalMetadata; import io.trino.plugin.hive.TransactionalMetadataFactory; +import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.MetastoreConfig; import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore; @@ -175,6 +176,12 @@ public 
SemiTransactionalHiveMetastore getMetastore() throw new RuntimeException("SemiTransactionalHiveMetastore is not used by Delta"); } + @Override + public DirectoryLister getDirectoryLister() + { + throw new RuntimeException("DirectoryLister is not used by Delta"); + } + @Override public void commit() {} diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveAlluxioMetastore.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveAlluxioMetastore.java index ecd0326f00f3..9a0552778002 100644 --- a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveAlluxioMetastore.java +++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveAlluxioMetastore.java @@ -101,6 +101,12 @@ public void testEmptyOrcFile() // Alluxio metastore does not support create operations } + @Override + public void testPerTransactionDirectoryListerCache() + { + // Alluxio metastore does not support create operations + } + // specifically disable so that expected exception on the superclass don't fail this test @Override @Test(enabled = false) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java index b9febc8be3d6..e37811ce1240 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java @@ -141,6 +141,8 @@ public class HiveConfig private Duration fileStatusCacheExpireAfterWrite = new Duration(1, MINUTES); private long fileStatusCacheMaxSize = 1000 * 1000; private List fileStatusCacheTables = ImmutableList.of(); + private long perTransactionFileStatusCacheMaximumSize = 1000 * 1000; + private boolean translateHiveViews; private Optional hiveTransactionHeartbeatInterval = Optional.empty(); @@ -746,6 +748,20 @@ public HiveConfig setFileStatusCacheTables(String fileStatusCacheTables) return this; } + @Min(1) + public long 
getPerTransactionFileStatusCacheMaximumSize() + { + return perTransactionFileStatusCacheMaximumSize; + } + + @Config("hive.per-transaction-file-status-cache-maximum-size") + @ConfigDescription("Maximum number of file statuses cached by transactional file status cache") + public HiveConfig setPerTransactionFileStatusCacheMaximumSize(long perTransactionFileStatusCacheMaximumSize) + { + this.perTransactionFileStatusCacheMaximumSize = perTransactionFileStatusCacheMaximumSize; + return this; + } + public boolean isTranslateHiveViews() { return translateHiveViews; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index 6e4ccb1d8328..0807fef1e443 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -36,6 +36,7 @@ import io.trino.plugin.hive.acid.AcidOperation; import io.trino.plugin.hive.acid.AcidSchema; import io.trino.plugin.hive.acid.AcidTransaction; +import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Database; import io.trino.plugin.hive.metastore.HiveColumnStatistics; @@ -364,6 +365,7 @@ public class HiveMetadata private final HiveMaterializedViewMetadata hiveMaterializedViewMetadata; private final AccessControlMetadata accessControlMetadata; private final HiveTableRedirectionsProvider tableRedirectionsProvider; + private final DirectoryLister directoryLister; public HiveMetadata( CatalogName catalogName, @@ -385,7 +387,8 @@ public HiveMetadata( Set systemTableProviders, HiveMaterializedViewMetadata hiveMaterializedViewMetadata, AccessControlMetadata accessControlMetadata, - HiveTableRedirectionsProvider tableRedirectionsProvider) + HiveTableRedirectionsProvider tableRedirectionsProvider, + DirectoryLister directoryLister) { this.catalogName = requireNonNull(catalogName, 
"catalogName is null"); this.metastore = requireNonNull(metastore, "metastore is null"); @@ -407,6 +410,7 @@ public HiveMetadata( this.hiveMaterializedViewMetadata = requireNonNull(hiveMaterializedViewMetadata, "hiveMaterializedViewMetadata is null"); this.accessControlMetadata = requireNonNull(accessControlMetadata, "accessControlMetadata is null"); this.tableRedirectionsProvider = requireNonNull(tableRedirectionsProvider, "tableRedirectionsProvider is null"); + this.directoryLister = requireNonNull(directoryLister, "directoryLister is null"); } @Override @@ -415,6 +419,12 @@ public SemiTransactionalHiveMetastore getMetastore() return metastore; } + @Override + public DirectoryLister getDirectoryLister() + { + return directoryLister; + } + @Override public List listSchemaNames(ConnectorSession session) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java index aea493116109..da120c183a8a 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java @@ -17,6 +17,8 @@ import io.airlift.json.JsonCodec; import io.airlift.units.Duration; import io.trino.plugin.base.CatalogName; +import io.trino.plugin.hive.fs.DirectoryLister; +import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryLister; import io.trino.plugin.hive.metastore.HiveMetastoreFactory; import io.trino.plugin.hive.metastore.MetastoreConfig; import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore; @@ -68,7 +70,8 @@ public class HiveMetadataFactory private final Optional hiveTransactionHeartbeatInterval; private final HiveTableRedirectionsProvider tableRedirectionsProvider; private final ScheduledExecutorService heartbeatService; - private final TableInvalidationCallback tableInvalidationCallback; + private final DirectoryLister directoryLister; + private final 
long perTransactionFileStatusCacheMaximumSize; @Inject public HiveMetadataFactory( @@ -90,7 +93,7 @@ public HiveMetadataFactory( HiveMaterializedViewMetadataFactory hiveMaterializedViewMetadataFactory, AccessControlMetadataFactory accessControlMetadataFactory, HiveTableRedirectionsProvider tableRedirectionsProvider, - TableInvalidationCallback tableInvalidationCallback) + DirectoryLister directoryLister) { this( catalogName, @@ -121,7 +124,8 @@ public HiveMetadataFactory( hiveMaterializedViewMetadataFactory, accessControlMetadataFactory, tableRedirectionsProvider, - tableInvalidationCallback); + directoryLister, + hiveConfig.getPerTransactionFileStatusCacheMaximumSize()); } public HiveMetadataFactory( @@ -153,7 +157,8 @@ public HiveMetadataFactory( HiveMaterializedViewMetadataFactory hiveMaterializedViewMetadataFactory, AccessControlMetadataFactory accessControlMetadataFactory, HiveTableRedirectionsProvider tableRedirectionsProvider, - TableInvalidationCallback tableInvalidationCallback) + DirectoryLister directoryLister, + long perTransactionFileStatusCacheMaximumSize) { this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.skipDeletionForAlter = skipDeletionForAlter; @@ -190,7 +195,8 @@ public HiveMetadataFactory( updateExecutor = new BoundedExecutor(executorService, maxConcurrentMetastoreUpdates); } this.heartbeatService = requireNonNull(heartbeatService, "heartbeatService is null"); - this.tableInvalidationCallback = requireNonNull(tableInvalidationCallback, "tableInvalidationCallback is null"); + this.directoryLister = requireNonNull(directoryLister, "directoryLister is null"); + this.perTransactionFileStatusCacheMaximumSize = perTransactionFileStatusCacheMaximumSize; } @Override @@ -199,6 +205,8 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm HiveMetastoreClosure hiveMetastoreClosure = new HiveMetastoreClosure( memoizeMetastore(metastoreFactory.createMetastore(Optional.of(identity)), 
perTransactionCacheMaximumSize)); // per-transaction cache + DirectoryLister directoryLister = new TransactionScopeCachingDirectoryLister(this.directoryLister, perTransactionFileStatusCacheMaximumSize); + SemiTransactionalHiveMetastore metastore = new SemiTransactionalHiveMetastore( hdfsEnvironment, hiveMetastoreClosure, @@ -210,7 +218,7 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm deleteSchemaLocationsFallback, hiveTransactionHeartbeatInterval, heartbeatService, - tableInvalidationCallback); + directoryLister); return new HiveMetadata( catalogName, @@ -232,6 +240,7 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm systemTableProviders, hiveMaterializedViewMetadataFactory.create(hiveMetastoreClosure), accessControlMetadataFactory.create(metastore), - tableRedirectionsProvider); + tableRedirectionsProvider, + directoryLister); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java index 1e5f938b2a8a..e0950b21917e 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java @@ -21,7 +21,6 @@ import io.airlift.concurrent.BoundedExecutor; import io.airlift.stats.CounterStat; import io.airlift.units.DataSize; -import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore; @@ -98,7 +97,6 @@ public class HiveSplitManager private final HivePartitionManager partitionManager; private final NamenodeStats namenodeStats; private final HdfsEnvironment hdfsEnvironment; - private final DirectoryLister directoryLister; private final Executor executor; private final int maxOutstandingSplits; private final DataSize maxOutstandingSplitsSize; @@ 
-118,7 +116,6 @@ public HiveSplitManager( HivePartitionManager partitionManager, NamenodeStats namenodeStats, HdfsEnvironment hdfsEnvironment, - DirectoryLister directoryLister, ExecutorService executorService, VersionEmbedder versionEmbedder, TypeManager typeManager) @@ -128,7 +125,6 @@ public HiveSplitManager( partitionManager, namenodeStats, hdfsEnvironment, - directoryLister, versionEmbedder.embedVersion(new BoundedExecutor(executorService, hiveConfig.getMaxSplitIteratorThreads())), new CounterStat(), hiveConfig.getMaxOutstandingSplits(), @@ -147,7 +143,6 @@ public HiveSplitManager( HivePartitionManager partitionManager, NamenodeStats namenodeStats, HdfsEnvironment hdfsEnvironment, - DirectoryLister directoryLister, Executor executor, CounterStat highMemorySplitSourceCounter, int maxOutstandingSplits, @@ -164,7 +159,6 @@ public HiveSplitManager( this.partitionManager = requireNonNull(partitionManager, "partitionManager is null"); this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); - this.directoryLister = requireNonNull(directoryLister, "directoryLister is null"); this.executor = new ErrorCodedExecutor(executor); this.highMemorySplitSourceCounter = requireNonNull(highMemorySplitSourceCounter, "highMemorySplitSourceCounter is null"); checkArgument(maxOutstandingSplits >= 1, "maxOutstandingSplits must be at least 1"); @@ -191,7 +185,8 @@ public ConnectorSplitSource getSplits( SchemaTableName tableName = hiveTable.getSchemaTableName(); // get table metadata - SemiTransactionalHiveMetastore metastore = transactionManager.get(transaction, session.getIdentity()).getMetastore(); + TransactionalMetadata transactionalMetadata = transactionManager.get(transaction, session.getIdentity()); + SemiTransactionalHiveMetastore metastore = transactionalMetadata.getMetastore(); Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName()) 
.orElseThrow(() -> new TableNotFoundException(tableName)); @@ -240,7 +235,7 @@ public ConnectorSplitSource getSplits( session, hdfsEnvironment, namenodeStats, - directoryLister, + transactionalMetadata.getDirectoryLister(), executor, concurrency, recursiveDfsWalkerEnabled, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java index d71c1aeb9684..e099668fa024 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java @@ -36,8 +36,8 @@ import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.plugin.hive.authentication.HdfsAuthenticationModule; import io.trino.plugin.hive.azure.HiveAzureModule; -import io.trino.plugin.hive.fs.CachingDirectoryLister; import io.trino.plugin.hive.fs.CachingDirectoryListerModule; +import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.gcs.HiveGcsModule; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.HiveMetastoreModule; @@ -89,7 +89,7 @@ public static Connector createConnector( ConnectorContext context, Module module, Optional metastore, - Optional cachingDirectoryLister) + Optional directoryLister) { requireNonNull(config, "config is null"); @@ -103,7 +103,7 @@ public static Connector createConnector( new JsonModule(), new TypeDeserializerModule(context.getTypeManager()), new HiveModule(), - new CachingDirectoryListerModule(cachingDirectoryLister), + new CachingDirectoryListerModule(directoryLister), new HiveHdfsModule(), new HiveS3Module(), new HiveGcsModule(), diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/TableInvalidationCallback.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/TableInvalidationCallback.java index d648c962e6a9..d444eb45eab5 100644 --- 
a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/TableInvalidationCallback.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/TableInvalidationCallback.java @@ -18,18 +18,6 @@ public interface TableInvalidationCallback { - TableInvalidationCallback NOOP = new TableInvalidationCallback() { - @Override - public void invalidate(Partition partition) - { - } - - @Override - public void invalidate(Table table) - { - } - }; - void invalidate(Partition partition); void invalidate(Table table); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/TransactionalMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/TransactionalMetadata.java index 8a1020b66cec..c6e107027ffd 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/TransactionalMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/TransactionalMetadata.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.hive; +import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore; import io.trino.spi.connector.ConnectorMetadata; @@ -21,6 +22,8 @@ public interface TransactionalMetadata { SemiTransactionalHiveMetastore getMetastore(); + DirectoryLister getDirectoryLister(); + void commit(); void rollback(); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java index 88493457ee0d..1503d50643b4 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java @@ -20,7 +20,6 @@ import io.airlift.units.Duration; import io.trino.collect.cache.EvictableCacheBuilder; import io.trino.plugin.hive.HiveConfig; -import io.trino.plugin.hive.TableInvalidationCallback; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.Storage; 
import io.trino.plugin.hive.metastore.Table; @@ -36,7 +35,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -48,7 +46,7 @@ import static org.apache.commons.lang3.StringUtils.isNotEmpty; public class CachingDirectoryLister - implements DirectoryLister, TableInvalidationCallback + implements DirectoryLister { //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys // to deal more efficiently with cache invalidation scenarios for partitioned tables. @@ -100,7 +98,7 @@ public RemoteIterator list(FileSystem fs, Table table, Path p ValueHolder cachedValueHolder = uncheckedCacheGet(cache, path, ValueHolder::new); if (cachedValueHolder.getFiles().isPresent()) { - return simpleRemoteIterator(cachedValueHolder.getFiles().get()); + return new SimpleRemoteIterator(cachedValueHolder.getFiles().get().iterator()); } return cachingRemoteIterator(cachedValueHolder, fs.listLocatedStatus(path), path); } @@ -157,26 +155,6 @@ public LocatedFileStatus next() }; } - private static RemoteIterator simpleRemoteIterator(List files) - { - return new RemoteIterator<>() - { - private final Iterator iterator = ImmutableList.copyOf(files).iterator(); - - @Override - public boolean hasNext() - { - return iterator.hasNext(); - } - - @Override - public LocatedFileStatus next() - { - return iterator.next(); - } - }; - } - @Managed public void flushCache() { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryListerModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryListerModule.java index d9720bedd7b2..c2e1240bfb2b 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryListerModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryListerModule.java @@ -13,10 +13,10 @@ */ package io.trino.plugin.hive.fs; +import 
com.google.common.annotations.VisibleForTesting; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Scopes; -import io.trino.plugin.hive.TableInvalidationCallback; import java.util.Optional; @@ -25,25 +25,22 @@ public class CachingDirectoryListerModule implements Module { - private final Optional cachingDirectoryLister; + private final Optional directoryLister; - public CachingDirectoryListerModule(Optional cachingDirectoryLister) + @VisibleForTesting + public CachingDirectoryListerModule(Optional directoryLister) { - this.cachingDirectoryLister = requireNonNull(cachingDirectoryLister, "cachingDirectoryLister is null"); + this.directoryLister = requireNonNull(directoryLister, "directoryLister is null"); } @Override public void configure(Binder binder) { - if (cachingDirectoryLister.isPresent()) { - CachingDirectoryLister directoryLister = cachingDirectoryLister.get(); - binder.bind(DirectoryLister.class).toInstance(directoryLister); - binder.bind(TableInvalidationCallback.class).toInstance(directoryLister); + if (directoryLister.isPresent()) { + binder.bind(DirectoryLister.class).toInstance(directoryLister.get()); } else { - binder.bind(CachingDirectoryLister.class).in(Scopes.SINGLETON); - binder.bind(DirectoryLister.class).to(CachingDirectoryLister.class); - binder.bind(TableInvalidationCallback.class).to(CachingDirectoryLister.class); + binder.bind(DirectoryLister.class).to(CachingDirectoryLister.class).in(Scopes.SINGLETON); } } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java index da90ea35c83e..327125c1c7ee 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.hive.fs; +import io.trino.plugin.hive.TableInvalidationCallback; import 
io.trino.plugin.hive.metastore.Table; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -22,6 +23,7 @@ import java.io.IOException; public interface DirectoryLister + extends TableInvalidationCallback { RemoteIterator list(FileSystem fs, Table table, Path path) throws IOException; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java new file mode 100644 index 000000000000..4ab952ca7aca --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.fs; + +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; + +import java.io.IOException; +import java.util.Iterator; + +import static java.util.Objects.requireNonNull; + +class SimpleRemoteIterator + implements RemoteIterator +{ + private final Iterator iterator; + + public SimpleRemoteIterator(Iterator iterator) + { + this.iterator = requireNonNull(iterator, "iterator is null"); + } + + @Override + public boolean hasNext() + throws IOException + { + return iterator.hasNext(); + } + + @Override + public LocatedFileStatus next() + throws IOException + { + return iterator.next(); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java new file mode 100644 index 000000000000..fad1be7fa299 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java @@ -0,0 +1,239 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.fs; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.util.concurrent.UncheckedExecutionException; +import io.trino.collect.cache.EvictableCacheBuilder; +import io.trino.plugin.hive.metastore.Partition; +import io.trino.plugin.hive.metastore.Storage; +import io.trino.plugin.hive.metastore.Table; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Throwables.throwIfInstanceOf; +import static com.google.common.base.Throwables.throwIfUnchecked; +import static java.util.Collections.synchronizedList; +import static java.util.Objects.requireNonNull; +import static org.apache.commons.lang3.StringUtils.isNotEmpty; + +/** + * Caches directory content (including listings that were started concurrently). + * {@link TransactionScopeCachingDirectoryLister} assumes that all listings + * are performed by same user within single transaction, therefore any failure can + * be shared between concurrent listings. + */ +public class TransactionScopeCachingDirectoryLister + implements DirectoryLister +{ + //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys + // to deal more efficiently with cache invalidation scenarios for partitioned tables. 
+ private final Cache cache; + private final DirectoryLister delegate; + + public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, long maxFileStatuses) + { + EvictableCacheBuilder cacheBuilder = EvictableCacheBuilder.newBuilder() + .maximumWeight(maxFileStatuses) + .weigher((key, value) -> value.getCachedFilesSize()); + this.cache = cacheBuilder.build(); + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public RemoteIterator list(FileSystem fs, Table table, Path path) + throws IOException + { + FetchingValueHolder cachedValueHolder; + try { + cachedValueHolder = cache.get(path, () -> new FetchingValueHolder(delegate.list(fs, table, path))); + } + catch (ExecutionException | UncheckedExecutionException e) { + Throwable throwable = e.getCause(); + throwIfInstanceOf(throwable, IOException.class); + throwIfUnchecked(throwable); + throw new RuntimeException("Failed to list directory: " + path, throwable); + } + + if (cachedValueHolder.isFullyCached()) { + return new SimpleRemoteIterator(cachedValueHolder.getCachedFiles()); + } + + return cachingRemoteIterator(cachedValueHolder, path); + } + + @Override + public void invalidate(Table table) + { + if (isLocationPresent(table.getStorage())) { + if (table.getPartitionColumns().isEmpty()) { + cache.invalidate(new Path(table.getStorage().getLocation())); + } + else { + // a partitioned table can have multiple paths in cache + cache.invalidateAll(); + } + } + delegate.invalidate(table); + } + + @Override + public void invalidate(Partition partition) + { + if (isLocationPresent(partition.getStorage())) { + cache.invalidate(new Path(partition.getStorage().getLocation())); + } + delegate.invalidate(partition); + } + + private RemoteIterator cachingRemoteIterator(FetchingValueHolder cachedValueHolder, Path path) + { + return new RemoteIterator<>() + { + private int fileIndex; + + @Override + public boolean hasNext() + throws IOException + { + try { + boolean hasNext = 
cachedValueHolder.getCachedFile(fileIndex).isPresent(); + // Update cache weight of cachedValueHolder for a given path. + // The cachedValueHolder acts as an invalidation guard. If a cache invalidation happens while this iterator goes over + // the files from the specified path, the eventually outdated file listing will not be added anymore to the cache. + cache.asMap().replace(path, cachedValueHolder, cachedValueHolder); + return hasNext; + } + catch (Exception exception) { + // invalidate cached value to force retry of directory listing + cache.invalidate(path); + throw exception; + } + } + + @Override + public LocatedFileStatus next() + throws IOException + { + // force cache entry weight update in case next file is cached + checkState(hasNext()); + return cachedValueHolder.getCachedFile(fileIndex++).orElseThrow(); + } + }; + } + + @VisibleForTesting + boolean isCached(Path path) + { + FetchingValueHolder cached = cache.getIfPresent(path); + return cached != null && cached.isFullyCached(); + } + + private static boolean isLocationPresent(Storage storage) + { + // Some Hive table types (e.g.: views) do not have a storage location + return storage.getOptionalLocation().isPresent() && isNotEmpty(storage.getLocation()); + } + + private static class FetchingValueHolder + { + private final List cachedFiles = synchronizedList(new ArrayList<>()); + @GuardedBy("this") + @Nullable + private RemoteIterator fileIterator; + @GuardedBy("this") + @Nullable + private Exception exception; + + public FetchingValueHolder(RemoteIterator fileIterator) + { + this.fileIterator = requireNonNull(fileIterator, "fileIterator is null"); + } + + public synchronized boolean isFullyCached() + { + return fileIterator == null && exception == null; + } + + public int getCachedFilesSize() + { + return cachedFiles.size(); + } + + public Iterator getCachedFiles() + { + checkState(isFullyCached()); + return cachedFiles.iterator(); + } + + public Optional getCachedFile(int index) + throws IOException 
+ { + int filesSize = cachedFiles.size(); + checkArgument(index >= 0 && index <= filesSize, "File index (%s) out of bounds [0, %s]", index, filesSize); + + // avoid fileIterator synchronization (and thus blocking) for already cached files + if (index < filesSize) { + return Optional.of(cachedFiles.get(index)); + } + + return fetchNextCachedFile(index); + } + + private synchronized Optional fetchNextCachedFile(int index) + throws IOException + { + if (exception != null) { + throw new IOException("Exception while listing directory", exception); + } + + if (index < cachedFiles.size()) { + // file was fetched concurrently + return Optional.of(cachedFiles.get(index)); + } + + try { + if (fileIterator == null || !fileIterator.hasNext()) { + // no more files + fileIterator = null; + return Optional.empty(); + } + + LocatedFileStatus fileStatus = fileIterator.next(); + cachedFiles.add(fileStatus); + return Optional.of(fileStatus); + } + catch (Exception exception) { + fileIterator = null; + this.exception = exception; + throw exception; + } + } + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java index 36d7270d06c9..61f6e8a1933f 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java @@ -32,7 +32,7 @@ import io.trino.plugin.hive.authentication.NoHdfsAuthentication; import io.trino.plugin.hive.azure.HiveAzureConfig; import io.trino.plugin.hive.azure.TrinoAzureConfigurationInitializer; -import io.trino.plugin.hive.fs.CachingDirectoryLister; +import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.gcs.GoogleGcsConfigurationInitializer; import io.trino.plugin.hive.gcs.HiveGcsConfig; import io.trino.plugin.hive.metastore.Column; @@ -136,7 +136,9 @@ import io.trino.type.BlockTypeOperators; import org.apache.hadoop.fs.FileStatus; 
import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.metastore.TableType; import org.joda.time.DateTime; import org.testng.annotations.AfterClass; @@ -162,6 +164,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; @@ -644,6 +647,7 @@ private static RowType toRowType(List columns) protected HdfsEnvironment hdfsEnvironment; protected LocationService locationService; + protected CountingDirectoryLister countingDirectoryLister; protected HiveMetadataFactory metadataFactory; protected HiveTransactionManager transactionManager; protected HiveMetastore metastoreClient; @@ -810,6 +814,7 @@ protected final void setup(String databaseName, HiveConfig hiveConfig, HiveMetas HivePartitionManager partitionManager = new HivePartitionManager(hiveConfig); locationService = new HiveLocationService(hdfsEnvironment); JsonCodec partitionUpdateCodec = JsonCodec.jsonCodec(PartitionUpdate.class); + countingDirectoryLister = new CountingDirectoryLister(); metadataFactory = new HiveMetadataFactory( new CatalogName("hive"), HiveMetastoreFactory.ofInstance(metastoreClient), @@ -867,14 +872,14 @@ public Optional getMaterializedView(Connect }, SqlStandardAccessControlMetadata::new, NO_REDIRECTIONS, - TableInvalidationCallback.NOOP); + countingDirectoryLister, + 1000); transactionManager = new HiveTransactionManager(metadataFactory); splitManager = new HiveSplitManager( transactionManager, partitionManager, new NamenodeStats(), hdfsEnvironment, - new CachingDirectoryLister(hiveConfig), directExecutor(), new CounterStat(), 100, @@ -1496,6 +1501,48 @@ public void 
testGetPartitionSplitsBatchUnpartitioned() } } + @Test + public void testPerTransactionDirectoryListerCache() + throws Exception + { + long initListCount = countingDirectoryLister.getListCount(); + SchemaTableName tableName = temporaryTable("per_transaction_listing_cache_test"); + List columns = ImmutableList.of(new Column("test", HIVE_STRING, Optional.empty())); + createEmptyTable(tableName, ORC, columns, ImmutableList.of()); + try { + try (Transaction transaction = newTransaction()) { + ConnectorMetadata metadata = transaction.getMetadata(); + ConnectorSession session = newSession(); + metadata.beginQuery(session); + + ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName); + getSplits(splitManager, transaction, session, tableHandle); + + // directory should be listed initially + assertEquals(countingDirectoryLister.getListCount(), initListCount + 1); + + // directory content should be cached + getSplits(splitManager, transaction, session, tableHandle); + assertEquals(countingDirectoryLister.getListCount(), initListCount + 1); + } + + try (Transaction transaction = newTransaction()) { + ConnectorMetadata metadata = transaction.getMetadata(); + ConnectorSession session = newSession(); + metadata.beginQuery(session); + + ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName); + getSplits(splitManager, transaction, session, tableHandle); + + // directory should be listed again in new transaction + assertEquals(countingDirectoryLister.getListCount(), initListCount + 2); + } + } + finally { + dropTable(tableName); + } + } + @Test(expectedExceptions = TableNotFoundException.class) public void testGetPartitionSplitsBatchInvalidTable() { @@ -5909,4 +5956,33 @@ public void verifyAndCleanup(ConnectorSession session, SchemaTableName tableName assertFalse(hdfsEnvironment.getFileSystem(context, path).exists(path)); } } + + private static class CountingDirectoryLister + implements DirectoryLister + { + private final AtomicInteger listCount = new 
AtomicInteger(); + + @Override + public RemoteIterator list(FileSystem fs, Table table, Path path) + throws IOException + { + listCount.incrementAndGet(); + return fs.listLocatedStatus(path); + } + + public int getListCount() + { + return listCount.get(); + } + + @Override + public void invalidate(Partition partition) + { + } + + @Override + public void invalidate(Table table) + { + } + } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java index f723d65be082..39dbd8fe07ac 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java @@ -27,7 +27,7 @@ import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; import io.trino.plugin.hive.authentication.HiveIdentity; import io.trino.plugin.hive.authentication.NoHdfsAuthentication; -import io.trino.plugin.hive.fs.CachingDirectoryLister; +import io.trino.plugin.hive.fs.FileSystemDirectoryLister; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Database; import io.trino.plugin.hive.metastore.ForwardingHiveMetastore; @@ -222,14 +222,13 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec new DefaultHiveMaterializedViewMetadataFactory(), SqlStandardAccessControlMetadata::new, NO_REDIRECTIONS, - TableInvalidationCallback.NOOP); + new FileSystemDirectoryLister()); transactionManager = new HiveTransactionManager(metadataFactory); splitManager = new HiveSplitManager( transactionManager, hivePartitionManager, new NamenodeStats(), hdfsEnvironment, - new CachingDirectoryLister(new HiveConfig()), new BoundedExecutor(executor, config.getMaxSplitIteratorThreads()), new CounterStat(), config.getMaxOutstandingSplits(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java 
b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java index 383b8a712763..ad6f919af633 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java @@ -20,7 +20,7 @@ import io.airlift.log.Logging; import io.trino.Session; import io.trino.metadata.QualifiedObjectName; -import io.trino.plugin.hive.fs.CachingDirectoryLister; +import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.metastore.Database; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.MetastoreConfig; @@ -114,7 +114,7 @@ public static class Builder> .setMetastoreUser("test")); }; private Module module = EMPTY_MODULE; - private Optional cachingDirectoryLister = Optional.empty(); + private Optional directoryLister = Optional.empty(); private boolean tpcdsCatalogEnabled; private String security = SQL_STANDARD; private ColumnNaming tpchColumnNaming = SIMPLIFIED; @@ -179,9 +179,9 @@ public SELF setModule(Module module) return self(); } - public SELF setCachingDirectoryLister(CachingDirectoryLister cachingDirectoryLister) + public SELF setDirectoryLister(DirectoryLister directoryLister) { - this.cachingDirectoryLister = Optional.ofNullable(cachingDirectoryLister); + this.directoryLister = Optional.ofNullable(directoryLister); return self(); } @@ -231,7 +231,7 @@ public DistributedQueryRunner build() } HiveMetastore metastore = this.metastore.apply(queryRunner); - queryRunner.installPlugin(new TestingHivePlugin(Optional.of(metastore), module, cachingDirectoryLister)); + queryRunner.installPlugin(new TestingHivePlugin(Optional.of(metastore), module, directoryLister)); Map hiveProperties = new HashMap<>(); if (!skipTimezoneSetup) { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java index 8d3cb0cd33da..a780ab94dadf 
100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java @@ -96,6 +96,7 @@ public void testDefaults() .setFileStatusCacheExpireAfterWrite(new Duration(1, TimeUnit.MINUTES)) .setFileStatusCacheMaxSize(1000 * 1000) .setFileStatusCacheTables("") + .setPerTransactionFileStatusCacheMaximumSize(1000 * 1000) .setTranslateHiveViews(false) .setHiveTransactionHeartbeatInterval(null) .setHiveTransactionHeartbeatThreads(5) @@ -176,6 +177,7 @@ public void testExplicitPropertyMappings() .put("hive.file-status-cache-tables", "foo.bar1, foo.bar2") .put("hive.file-status-cache-size", "1000") .put("hive.file-status-cache-expire-time", "30m") + .put("hive.per-transaction-file-status-cache-maximum-size", "42") .put("hive.translate-hive-views", "true") .put("hive.transaction-heartbeat-interval", "10s") .put("hive.transaction-heartbeat-threads", "10") @@ -253,6 +255,7 @@ public void testExplicitPropertyMappings() .setFileStatusCacheTables("foo.bar1,foo.bar2") .setFileStatusCacheMaxSize(1000) .setFileStatusCacheExpireAfterWrite(new Duration(30, TimeUnit.MINUTES)) + .setPerTransactionFileStatusCacheMaximumSize(42) .setTranslateHiveViews(true) .setHiveTransactionHeartbeatInterval(new Duration(10, TimeUnit.SECONDS)) .setHiveTransactionHeartbeatThreads(10) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java index 1424253e7bc7..9974fd40b682 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java @@ -14,7 +14,7 @@ package io.trino.plugin.hive; import com.google.inject.Module; -import io.trino.plugin.hive.fs.CachingDirectoryLister; +import io.trino.plugin.hive.fs.DirectoryLister; import 
io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; @@ -32,18 +32,18 @@ public class TestingHiveConnectorFactory { private final Optional metastore; private final Module module; - private final Optional cachingDirectoryLister; + private final Optional directoryLister; public TestingHiveConnectorFactory(HiveMetastore metastore) { this(Optional.of(metastore), EMPTY_MODULE, Optional.empty()); } - public TestingHiveConnectorFactory(Optional metastore, Module module, Optional cachingDirectoryLister) + public TestingHiveConnectorFactory(Optional metastore, Module module, Optional directoryLister) { this.metastore = requireNonNull(metastore, "metastore is null"); this.module = requireNonNull(module, "module is null"); - this.cachingDirectoryLister = requireNonNull(cachingDirectoryLister, "cachingDirectoryLister is null"); + this.directoryLister = requireNonNull(directoryLister, "directoryLister is null"); } @Override @@ -55,6 +55,6 @@ public String getName() @Override public Connector create(String catalogName, Map config, ConnectorContext context) { - return createConnector(catalogName, config, context, module, metastore, cachingDirectoryLister); + return createConnector(catalogName, config, context, module, metastore, directoryLister); } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java index 2770369572b1..adc6ad833f5e 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java @@ -15,7 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Module; -import io.trino.plugin.hive.fs.CachingDirectoryLister; +import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.spi.Plugin; 
import io.trino.spi.connector.ConnectorFactory; @@ -30,7 +30,7 @@ public class TestingHivePlugin { private final Optional metastore; private final Module module; - private final Optional cachingDirectoryLister; + private final Optional directoryLister; public TestingHivePlugin() { @@ -42,16 +42,16 @@ public TestingHivePlugin(HiveMetastore metastore) this(Optional.of(metastore), EMPTY_MODULE, Optional.empty()); } - public TestingHivePlugin(Optional metastore, Module module, Optional cachingDirectoryLister) + public TestingHivePlugin(Optional metastore, Module module, Optional directoryLister) { this.metastore = requireNonNull(metastore, "metastore is null"); this.module = requireNonNull(module, "module is null"); - this.cachingDirectoryLister = requireNonNull(cachingDirectoryLister, "cachingDirectoryLister is null"); + this.directoryLister = requireNonNull(directoryLister, "directoryLister is null"); } @Override public Iterable getConnectorFactories() { - return ImmutableList.of(new TestingHiveConnectorFactory(metastore, module, cachingDirectoryLister)); + return ImmutableList.of(new TestingHiveConnectorFactory(metastore, module, directoryLister)); } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/BaseCachingDirectoryListerTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/BaseCachingDirectoryListerTest.java new file mode 100644 index 000000000000..e3029633daab --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/BaseCachingDirectoryListerTest.java @@ -0,0 +1,350 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.fs; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.hive.HiveQueryRunner; +import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.hive.metastore.file.FileHiveMetastore; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.MaterializedRow; +import io.trino.testing.QueryRunner; +import org.testng.annotations.Test; + +import java.nio.file.Path; +import java.util.List; +import java.util.NoSuchElementException; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.hive.HiveQueryRunner.TPCH_SCHEMA; +import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore; +import static java.lang.String.format; +import static java.nio.file.Files.createTempDirectory; +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class BaseCachingDirectoryListerTest + extends AbstractTestQueryFramework +{ + private C directoryLister; + private FileHiveMetastore fileHiveMetastore; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + Path temporaryMetastoreDirectory = createTempDirectory(null); + closeAfterClass(() -> deleteRecursively(temporaryMetastoreDirectory, ALLOW_INSECURE)); + directoryLister = createDirectoryLister(); + return HiveQueryRunner.builder() + .setHiveProperties(ImmutableMap.of( + 
"hive.allow-register-partition-procedure", "true")) + .setMetastore(distributedQueryRunner -> fileHiveMetastore = createTestingFileHiveMetastore(temporaryMetastoreDirectory.toFile())) + .setDirectoryLister(directoryLister) + .build(); + } + + protected abstract C createDirectoryLister(); + + protected abstract boolean isCached(C directoryLister, org.apache.hadoop.fs.Path path); + + @Test + public void testCacheInvalidationIsAppliedSpecificallyOnTheNonPartitionedTableBeingChanged() + { + assertUpdate("CREATE TABLE partial_cache_invalidation_table1 (col1 int) WITH (format = 'ORC')"); + assertUpdate("INSERT INTO partial_cache_invalidation_table1 VALUES (1), (2), (3)", 3); + // The listing for the partial_cache_invalidation_table1 should be in the directory cache after this call + assertQuery("SELECT sum(col1) FROM partial_cache_invalidation_table1", "VALUES (6)"); + org.apache.hadoop.fs.Path cachedTable1Location = getTableLocation(TPCH_SCHEMA, "partial_cache_invalidation_table1"); + assertThat(isCached(cachedTable1Location)).isTrue(); + + assertUpdate("CREATE TABLE partial_cache_invalidation_table2 (col1 int) WITH (format = 'ORC')"); + assertUpdate("INSERT INTO partial_cache_invalidation_table2 VALUES (11), (12)", 2); + // The listing for the partial_cache_invalidation_table2 should be in the directory cache after this call + assertQuery("SELECT sum(col1) FROM partial_cache_invalidation_table2", "VALUES (23)"); + org.apache.hadoop.fs.Path cachedTable2Location = getTableLocation(TPCH_SCHEMA, "partial_cache_invalidation_table2"); + assertThat(isCached(cachedTable2Location)).isTrue(); + + assertUpdate("INSERT INTO partial_cache_invalidation_table1 VALUES (4), (5)", 2); + // Inserting into the partial_cache_invalidation_table1 should invalidate only the cached listing of the files belonging to this table. 
+ assertThat(isCached(cachedTable1Location)).isFalse(); + assertThat(isCached(cachedTable2Location)).isTrue(); + + assertQuery("SELECT sum(col1) FROM partial_cache_invalidation_table1", "VALUES (15)"); + assertQuery("SELECT sum(col1) FROM partial_cache_invalidation_table2", "VALUES (23)"); + + assertUpdate("DROP TABLE partial_cache_invalidation_table1"); + assertUpdate("DROP TABLE partial_cache_invalidation_table2"); + } + + @Test + public void testCacheInvalidationIsAppliedOnTheEntireCacheOnPartitionedTableDrop() + { + assertUpdate("CREATE TABLE full_cache_invalidation_non_partitioned_table (col1 int) WITH (format = 'ORC')"); + assertUpdate("INSERT INTO full_cache_invalidation_non_partitioned_table VALUES (1), (2), (3)", 3); + // The listing for the full_cache_invalidation_non_partitioned_table should be in the directory cache after this call + assertQuery("SELECT sum(col1) FROM full_cache_invalidation_non_partitioned_table", "VALUES (6)"); + org.apache.hadoop.fs.Path nonPartitionedTableLocation = getTableLocation(TPCH_SCHEMA, "full_cache_invalidation_non_partitioned_table"); + assertThat(isCached(nonPartitionedTableLocation)).isTrue(); + + assertUpdate("CREATE TABLE full_cache_invalidation_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); + assertUpdate("INSERT INTO full_cache_invalidation_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); + assertQuery("SELECT col2, sum(col1) FROM full_cache_invalidation_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); + org.apache.hadoop.fs.Path partitionedTableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "full_cache_invalidation_partitioned_table", ImmutableList.of("group1")); + org.apache.hadoop.fs.Path partitionedTableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "full_cache_invalidation_partitioned_table", ImmutableList.of("group2")); + 
assertThat(isCached(partitionedTableGroup1PartitionLocation)).isTrue(); + assertThat(isCached(partitionedTableGroup2PartitionLocation)).isTrue(); + + assertUpdate("INSERT INTO full_cache_invalidation_non_partitioned_table VALUES (4), (5)", 2); + // Inserting into the full_cache_invalidation_non_partitioned_table should invalidate only the cached listing of the files belonging to this table. + assertThat(isCached(nonPartitionedTableLocation)).isFalse(); + assertThat(isCached(partitionedTableGroup1PartitionLocation)).isTrue(); + assertThat(isCached(partitionedTableGroup2PartitionLocation)).isTrue(); + + assertUpdate("DROP TABLE full_cache_invalidation_partitioned_table"); + // Invalidation of the partitioned table causes the full invalidation of the cache + assertThat(isCached(nonPartitionedTableLocation)).isFalse(); + assertThat(isCached(partitionedTableGroup1PartitionLocation)).isFalse(); + assertThat(isCached(partitionedTableGroup2PartitionLocation)).isFalse(); + + assertQuery("SELECT sum(col1) FROM full_cache_invalidation_non_partitioned_table", "VALUES (15)"); + + assertUpdate("DROP TABLE full_cache_invalidation_non_partitioned_table"); + } + + @Test + public void testCacheInvalidationIsAppliedSpecificallyOnPartitionDropped() + { + assertUpdate("CREATE TABLE partition_path_cache_invalidation_non_partitioned_table (col1 int) WITH (format = 'ORC')"); + assertUpdate("INSERT INTO partition_path_cache_invalidation_non_partitioned_table VALUES (1), (2), (3)", 3); + // The listing for the partition_path_cache_invalidation_non_partitioned_table should be in the directory cache after this call + assertQuery("SELECT sum(col1) FROM partition_path_cache_invalidation_non_partitioned_table", "VALUES (6)"); + org.apache.hadoop.fs.Path nonPartitionedTableLocation = getTableLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_non_partitioned_table"); + assertThat(isCached(nonPartitionedTableLocation)).isTrue(); + + assertUpdate("CREATE TABLE partition_path_cache_invalidation_partitioned_table (col1 int, 
col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); + assertUpdate("INSERT INTO partition_path_cache_invalidation_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); + assertQuery("SELECT col2, sum(col1) FROM partition_path_cache_invalidation_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); + org.apache.hadoop.fs.Path partitionedTableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_partitioned_table", ImmutableList.of("group1")); + org.apache.hadoop.fs.Path partitionedTableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_partitioned_table", ImmutableList.of("group2")); + assertThat(isCached(partitionedTableGroup1PartitionLocation)).isTrue(); + assertThat(isCached(partitionedTableGroup2PartitionLocation)).isTrue(); + + assertUpdate("DELETE FROM partition_path_cache_invalidation_partitioned_table WHERE col2='group1'"); + assertThat(isCached(nonPartitionedTableLocation)).isTrue(); + assertThat(isCached(partitionedTableGroup1PartitionLocation)).isFalse(); + assertThat(isCached(partitionedTableGroup2PartitionLocation)).isTrue(); + + assertQuery("SELECT sum(col1) FROM partition_path_cache_invalidation_non_partitioned_table", "VALUES (6)"); + assertQuery("SELECT col2, sum(col1) FROM partition_path_cache_invalidation_partitioned_table GROUP BY col2", "VALUES ('group2', 7)"); + + assertUpdate("DROP TABLE partition_path_cache_invalidation_non_partitioned_table"); + assertUpdate("DROP TABLE partition_path_cache_invalidation_partitioned_table"); + } + + @Test + public void testInsertIntoNonPartitionedTable() + { + assertUpdate("CREATE TABLE insert_into_non_partitioned_table (col1 int) WITH (format = 'ORC')"); + assertUpdate("INSERT INTO insert_into_non_partitioned_table VALUES (1), (2), (3)", 3); + // The listing for the table should be in the directory cache after this call + assertQuery("SELECT sum(col1) 
FROM insert_into_non_partitioned_table", "VALUES (6)"); + assertThat(isCached(getTableLocation(TPCH_SCHEMA, "insert_into_non_partitioned_table"))).isTrue(); + assertUpdate("INSERT INTO insert_into_non_partitioned_table VALUES (4), (5)", 2); + // Inserting into the table should invalidate the cached listing of the files belonging to the table. + assertThat(isCached(getTableLocation(TPCH_SCHEMA, "insert_into_non_partitioned_table"))).isFalse(); + + assertQuery("SELECT sum(col1) FROM insert_into_non_partitioned_table", "VALUES (15)"); + + assertUpdate("DROP TABLE insert_into_non_partitioned_table"); + } + + @Test + public void testInsertIntoPartitionedTable() + { + assertUpdate("CREATE TABLE insert_into_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); + assertUpdate("INSERT INTO insert_into_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); + // The listing for the table partitions should be in the directory cache after this call + assertQuery("SELECT col2, sum(col1) FROM insert_into_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); + org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "insert_into_partitioned_table", ImmutableList.of("group1")); + org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "insert_into_partitioned_table", ImmutableList.of("group2")); + assertThat(isCached(tableGroup1PartitionLocation)).isTrue(); + assertThat(isCached(tableGroup2PartitionLocation)).isTrue(); + + assertUpdate("INSERT INTO insert_into_partitioned_table VALUES (5, 'group2'), (6, 'group3')", 2); + assertThat(isCached(tableGroup1PartitionLocation)).isTrue(); + // Inserting into the table should invalidate the cached listing of the partitions affected by the insert statement + assertThat(isCached(tableGroup2PartitionLocation)).isFalse(); + assertQuery("SELECT col2, sum(col1) FROM 
insert_into_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 12), ('group3', 6)"); + + assertUpdate("DROP TABLE insert_into_partitioned_table"); + } + + @Test + public void testDropPartition() + { + assertUpdate("CREATE TABLE delete_from_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); + assertUpdate("INSERT INTO delete_from_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2'), (5, 'group3')", 5); + // The listing for the table partitions should be in the directory cache after this call + assertQuery("SELECT col2, sum(col1) FROM delete_from_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7), ('group3', 5)"); + org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group1")); + org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group2")); + org.apache.hadoop.fs.Path tableGroup3PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group3")); + assertThat(isCached(tableGroup1PartitionLocation)).isTrue(); + assertThat(isCached(tableGroup2PartitionLocation)).isTrue(); + assertUpdate("DELETE FROM delete_from_partitioned_table WHERE col2 = 'group1' OR col2 = 'group2'"); + // Deleting from the table should invalidate the cached listing of the partitions dropped from the table. 
+ assertThat(isCached(tableGroup1PartitionLocation)).isFalse(); + assertThat(isCached(tableGroup2PartitionLocation)).isFalse(); + assertThat(isCached(tableGroup3PartitionLocation)).isTrue(); + assertQuery("SELECT col2, sum(col1) FROM delete_from_partitioned_table GROUP BY col2", "VALUES ('group3', 5)"); + + assertUpdate("DROP TABLE delete_from_partitioned_table"); + } + + @Test + public void testDropMultiLevelPartition() + { + assertUpdate("CREATE TABLE delete_from_partitioned_table (clicks bigint, day date, country varchar) WITH (format = 'ORC', partitioned_by = ARRAY['day', 'country'])"); + assertUpdate("INSERT INTO delete_from_partitioned_table VALUES (1000, DATE '2022-02-01', 'US'), (2000, DATE '2022-02-01', 'US'), (4000, DATE '2022-02-02', 'US'), (1500, DATE '2022-02-01', 'AT'), (2500, DATE '2022-02-02', 'AT')", 5); + // The listing for the table partitions should be in the directory cache after this call + assertQuery("SELECT day, country, sum(clicks) FROM delete_from_partitioned_table GROUP BY day, country", "VALUES (DATE '2022-02-01', 'US', 3000), (DATE '2022-02-02', 'US', 4000), (DATE '2022-02-01', 'AT', 1500), (DATE '2022-02-02', 'AT', 2500)"); + org.apache.hadoop.fs.Path table20220201UsPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-01", "US")); + org.apache.hadoop.fs.Path table20220202UsPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-02", "US")); + org.apache.hadoop.fs.Path table20220201AtPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-01", "AT")); + org.apache.hadoop.fs.Path table20220202AtPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-02", "AT")); + assertThat(isCached(table20220201UsPartitionLocation)).isTrue(); + assertThat(isCached(table20220202UsPartitionLocation)).isTrue(); + 
assertThat(isCached(table20220201AtPartitionLocation)).isTrue(); + assertThat(isCached(table20220202AtPartitionLocation)).isTrue(); + assertUpdate("DELETE FROM delete_from_partitioned_table WHERE day = DATE '2022-02-01'"); + // Deleting from the table should invalidate the cached listing of the partitions dropped from the table. + assertThat(isCached(table20220201UsPartitionLocation)).isFalse(); + assertThat(isCached(table20220202UsPartitionLocation)).isTrue(); + assertThat(isCached(table20220201AtPartitionLocation)).isFalse(); + assertThat(isCached(table20220202AtPartitionLocation)).isTrue(); + assertUpdate("DELETE FROM delete_from_partitioned_table WHERE country = 'US'"); + assertThat(isCached(table20220202UsPartitionLocation)).isFalse(); + assertThat(isCached(table20220202AtPartitionLocation)).isTrue(); + assertQuery("SELECT day, country, sum(clicks) FROM delete_from_partitioned_table GROUP BY day, country", "VALUES (DATE '2022-02-02', 'AT', 2500)"); + + assertUpdate("DROP TABLE delete_from_partitioned_table"); + } + + @Test + public void testUnregisterRegisterPartition() + { + assertUpdate("CREATE TABLE register_unregister_partition_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); + assertUpdate("INSERT INTO register_unregister_partition_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); + // The listing for the table partitions should be in the directory cache after this call + assertQuery("SELECT col2, sum(col1) FROM register_unregister_partition_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); + org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "register_unregister_partition_table", ImmutableList.of("group1")); + org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "register_unregister_partition_table", ImmutableList.of("group2")); + assertThat(isCached(tableGroup1PartitionLocation)).isTrue(); + 
assertThat(isCached(tableGroup2PartitionLocation)).isTrue(); + + List paths = getQueryRunner().execute(getSession(), "SELECT \"$path\" FROM register_unregister_partition_table WHERE col2 = 'group1' LIMIT 1").toTestTypes().getMaterializedRows(); + String group1PartitionPath = new org.apache.hadoop.fs.Path((String) paths.get(0).getField(0)).getParent().toString(); + + assertUpdate(format("CALL system.unregister_partition('%s', '%s', ARRAY['col2'], ARRAY['group1'])", TPCH_SCHEMA, "register_unregister_partition_table")); + // Unregistering the partition in the table should invalidate only the cached listing of the unregistered partition. + assertThat(isCached(tableGroup1PartitionLocation)).isFalse(); + assertThat(isCached(tableGroup2PartitionLocation)).isTrue(); + assertQuery("SELECT col2, sum(col1) FROM register_unregister_partition_table GROUP BY col2", "VALUES ('group2', 7)"); + assertThat(isCached(tableGroup2PartitionLocation)).isTrue(); + + assertUpdate(format("CALL system.register_partition('%s', '%s', ARRAY['col2'], ARRAY['group1'], '%s')", TPCH_SCHEMA, "register_unregister_partition_table", group1PartitionPath)); + // Registering the partition in the table should invalidate only the cached listing of the registered partition. 
+ assertThat(isCached(tableGroup1PartitionLocation)).isFalse(); + assertThat(isCached(tableGroup2PartitionLocation)).isTrue(); + + assertQuery("SELECT col2, sum(col1) FROM register_unregister_partition_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); + + assertUpdate("DROP TABLE register_unregister_partition_table"); + } + + @Test + public void testRenameTable() + { + assertUpdate("CREATE TABLE table_to_be_renamed (col1 int) WITH (format = 'ORC')"); + assertUpdate("INSERT INTO table_to_be_renamed VALUES (1), (2), (3)", 3); + // The listing for the table should be in the directory cache after this call + assertQuery("SELECT sum(col1) FROM table_to_be_renamed", "VALUES (6)"); + org.apache.hadoop.fs.Path tableLocation = getTableLocation(TPCH_SCHEMA, "table_to_be_renamed"); + assertThat(isCached(tableLocation)).isTrue(); + assertUpdate("ALTER TABLE table_to_be_renamed RENAME TO table_renamed"); + // Altering the table should invalidate the cached listing of the files belonging to the table. + assertThat(isCached(tableLocation)).isFalse(); + + assertUpdate("DROP TABLE table_renamed"); + } + + @Test + public void testDropTable() + { + assertUpdate("CREATE TABLE table_to_be_dropped (col1 int) WITH (format = 'ORC')"); + assertUpdate("INSERT INTO table_to_be_dropped VALUES (1), (2), (3)", 3); + // The listing for the table should be in the directory cache after this call + assertQuery("SELECT sum(col1) FROM table_to_be_dropped", "VALUES (6)"); + org.apache.hadoop.fs.Path tableLocation = getTableLocation(TPCH_SCHEMA, "table_to_be_dropped"); + assertThat(isCached(tableLocation)).isTrue(); + assertUpdate("DROP TABLE table_to_be_dropped"); + // Dropping the table should invalidate the cached listing of the files belonging to the table. 
+ assertThat(isCached(tableLocation)).isFalse(); + } + + @Test + public void testDropPartitionedTable() + { + assertUpdate("CREATE TABLE drop_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); + assertUpdate("INSERT INTO drop_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2'), (5, 'group3')", 5); + // The listing for the table partitions should be in the directory cache after this call + assertQuery("SELECT col2, sum(col1) FROM drop_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7), ('group3', 5)"); + org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group1")); + org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group2")); + org.apache.hadoop.fs.Path tableGroup3PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group3")); + assertThat(isCached(tableGroup1PartitionLocation)).isTrue(); + assertThat(isCached(tableGroup2PartitionLocation)).isTrue(); + assertThat(isCached(tableGroup3PartitionLocation)).isTrue(); + assertUpdate("DROP TABLE drop_partitioned_table"); + assertThat(isCached(tableGroup1PartitionLocation)).isFalse(); + assertThat(isCached(tableGroup2PartitionLocation)).isFalse(); + assertThat(isCached(tableGroup3PartitionLocation)).isFalse(); + } + + private org.apache.hadoop.fs.Path getTableLocation(String schemaName, String tableName) + { + return fileHiveMetastore.getTable(schemaName, tableName) + .map(table -> table.getStorage().getLocation()) + .map(tableLocation -> new org.apache.hadoop.fs.Path(tableLocation)) + .orElseThrow(() -> new NoSuchElementException(format("The table %s.%s could not be found", schemaName, tableName))); + } + + private org.apache.hadoop.fs.Path getPartitionLocation(String schemaName, String tableName, List 
partitionValues) + { + Table table = fileHiveMetastore.getTable(schemaName, tableName) + .orElseThrow(() -> new NoSuchElementException(format("The table %s.%s could not be found", schemaName, tableName))); + + return fileHiveMetastore.getPartition(table, partitionValues) + .map(partition -> partition.getStorage().getLocation()) + .map(partitionLocation -> new org.apache.hadoop.fs.Path(partitionLocation)) + .orElseThrow(() -> new NoSuchElementException(format("The partition %s from the table %s.%s could not be found", partitionValues, schemaName, tableName))); + } + + private boolean isCached(org.apache.hadoop.fs.Path path) + { + return isCached(directoryLister, path); + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java new file mode 100644 index 000000000000..2a78d8dd4224 --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java @@ -0,0 +1,44 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.fs; + +import io.trino.plugin.hive.metastore.Partition; +import io.trino.plugin.hive.metastore.Table; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +import java.io.IOException; + +public class FileSystemDirectoryLister + implements DirectoryLister +{ + @Override + public RemoteIterator list(FileSystem fs, Table table, Path path) + throws IOException + { + return fs.listLocatedStatus(path); + } + + @Override + public void invalidate(Partition partition) + { + } + + @Override + public void invalidate(Table table) + { + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java index c50a8feef728..7e54d253a06a 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java @@ -13,334 +13,34 @@ */ package io.trino.plugin.hive.fs; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import io.airlift.units.Duration; -import io.trino.plugin.hive.HiveQueryRunner; -import io.trino.plugin.hive.metastore.Table; -import io.trino.plugin.hive.metastore.file.FileHiveMetastore; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.MaterializedRow; -import io.trino.testing.QueryRunner; +import org.apache.hadoop.fs.Path; import org.testng.annotations.Test; -import java.nio.file.Path; import java.util.List; -import java.util.NoSuchElementException; - -import static com.google.common.io.MoreFiles.deleteRecursively; -import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; -import static io.trino.plugin.hive.HiveQueryRunner.TPCH_SCHEMA; -import static 
io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore; -import static java.lang.String.format; -import static java.nio.file.Files.createTempDirectory; -import static org.assertj.core.api.Assertions.assertThat; // some tests may invalidate the whole cache affecting therefore other concurrent tests @Test(singleThreaded = true) public class TestCachingDirectoryLister - extends AbstractTestQueryFramework + extends BaseCachingDirectoryListerTest { - private CachingDirectoryLister cachingDirectoryLister; - private FileHiveMetastore fileHiveMetastore; - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - Path temporaryMetastoreDirectory = createTempDirectory(null); - closeAfterClass(() -> deleteRecursively(temporaryMetastoreDirectory, ALLOW_INSECURE)); - - cachingDirectoryLister = new CachingDirectoryLister(Duration.valueOf("5m"), 1_000_000L, List.of("tpch.*")); - - return HiveQueryRunner.builder() - .setHiveProperties(ImmutableMap.of( - "hive.allow-register-partition-procedure", "true")) - .setMetastore(distributedQueryRunner -> fileHiveMetastore = createTestingFileHiveMetastore(temporaryMetastoreDirectory.toFile())) - .setCachingDirectoryLister(cachingDirectoryLister) - .build(); - } - - @Test - public void testCacheInvalidationIsAppliedSpecificallyOnTheNonPartitionedTableBeingChanged() - { - assertUpdate("CREATE TABLE partial_cache_invalidation_table1 (col1 int) WITH (format = 'ORC')"); - assertUpdate("INSERT INTO partial_cache_invalidation_table1 VALUES (1), (2), (3)", 3); - // The listing for the invalidate_non_partitioned_table1 should be in the directory cache after this call - assertQuery("SELECT sum(col1) FROM partial_cache_invalidation_table1", "VALUES (6)"); - org.apache.hadoop.fs.Path cachedTable1Location = getTableLocation(TPCH_SCHEMA, "partial_cache_invalidation_table1"); - assertThat(cachingDirectoryLister.isCached(cachedTable1Location)).isTrue(); - - assertUpdate("CREATE TABLE 
partial_cache_invalidation_table2 (col1 int) WITH (format = 'ORC')"); - assertUpdate("INSERT INTO partial_cache_invalidation_table2 VALUES (11), (12)", 2); - // The listing for the invalidate_non_partitioned_table2 should be in the directory cache after this call - assertQuery("SELECT sum(col1) FROM partial_cache_invalidation_table2", "VALUES (23)"); - org.apache.hadoop.fs.Path cachedTable2Location = getTableLocation(TPCH_SCHEMA, "partial_cache_invalidation_table2"); - assertThat(cachingDirectoryLister.isCached(cachedTable2Location)).isTrue(); - - assertUpdate("INSERT INTO partial_cache_invalidation_table1 VALUES (4), (5)", 2); - // Inserting into the invalidate_non_partitioned_table1 should invalidate only the cached listing of the files belonging only to this table. - assertThat(cachingDirectoryLister.isCached(cachedTable1Location)).isFalse(); - assertThat(cachingDirectoryLister.isCached(cachedTable2Location)).isTrue(); - - assertQuery("SELECT sum(col1) FROM partial_cache_invalidation_table1", "VALUES (15)"); - assertQuery("SELECT sum(col1) FROM partial_cache_invalidation_table2", "VALUES (23)"); - - assertUpdate("DROP TABLE partial_cache_invalidation_table1"); - assertUpdate("DROP TABLE partial_cache_invalidation_table2"); - } - - @Test - public void testCacheInvalidationIsAppliedOnTheEntireCacheOnPartitionedTableDrop() - { - assertUpdate("CREATE TABLE full_cache_invalidation_non_partitioned_table (col1 int) WITH (format = 'ORC')"); - assertUpdate("INSERT INTO full_cache_invalidation_non_partitioned_table VALUES (1), (2), (3)", 3); - // The listing for the invalidate_non_partitioned_table1 should be in the directory cache after this call - assertQuery("SELECT sum(col1) FROM full_cache_invalidation_non_partitioned_table", "VALUES (6)"); - org.apache.hadoop.fs.Path nonPartitionedTableLocation = getTableLocation(TPCH_SCHEMA, "full_cache_invalidation_non_partitioned_table"); - assertThat(cachingDirectoryLister.isCached(nonPartitionedTableLocation)).isTrue(); - - 
assertUpdate("CREATE TABLE full_cache_invalidation_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); - assertUpdate("INSERT INTO full_cache_invalidation_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); - assertQuery("SELECT col2, sum(col1) FROM full_cache_invalidation_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); - org.apache.hadoop.fs.Path partitionedTableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "full_cache_invalidation_partitioned_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path partitionedTableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "full_cache_invalidation_partitioned_table", ImmutableList.of("group2")); - assertThat(cachingDirectoryLister.isCached(partitionedTableGroup1PartitionLocation)).isTrue(); - assertThat(cachingDirectoryLister.isCached(partitionedTableGroup2PartitionLocation)).isTrue(); - - assertUpdate("INSERT INTO full_cache_invalidation_non_partitioned_table VALUES (4), (5)", 2); - // Inserting into the invalidate_non_partitioned_table1 should invalidate only the cached listing of the files belonging only to this table. 
- assertThat(cachingDirectoryLister.isCached(nonPartitionedTableLocation)).isFalse(); - assertThat(cachingDirectoryLister.isCached(partitionedTableGroup1PartitionLocation)).isTrue(); - assertThat(cachingDirectoryLister.isCached(partitionedTableGroup2PartitionLocation)).isTrue(); - - assertUpdate("DROP TABLE full_cache_invalidation_partitioned_table"); - // Invalidation of the partitioned table causes the full invalidation of the cache - assertThat(cachingDirectoryLister.isCached(nonPartitionedTableLocation)).isFalse(); - assertThat(cachingDirectoryLister.isCached(partitionedTableGroup1PartitionLocation)).isFalse(); - assertThat(cachingDirectoryLister.isCached(partitionedTableGroup2PartitionLocation)).isFalse(); - - assertQuery("SELECT sum(col1) FROM full_cache_invalidation_non_partitioned_table", "VALUES (15)"); - - assertUpdate("DROP TABLE full_cache_invalidation_non_partitioned_table"); - } - - @Test - public void testCacheInvalidationIsAppliedSpecificallyOnPartitionDropped() - { - assertUpdate("CREATE TABLE partition_path_cache_invalidation_non_partitioned_table (col1 int) WITH (format = 'ORC')"); - assertUpdate("INSERT INTO partition_path_cache_invalidation_non_partitioned_table VALUES (1), (2), (3)", 3); - // The listing for the invalidate_non_partitioned_table1 should be in the directory cache after this call - assertQuery("SELECT sum(col1) FROM partition_path_cache_invalidation_non_partitioned_table", "VALUES (6)"); - org.apache.hadoop.fs.Path nonPartitionedTableLocation = getTableLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_non_partitioned_table"); - assertThat(cachingDirectoryLister.isCached(nonPartitionedTableLocation)).isTrue(); - - assertUpdate("CREATE TABLE partition_path_cache_invalidation_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); - assertUpdate("INSERT INTO partition_path_cache_invalidation_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); - 
assertQuery("SELECT col2, sum(col1) FROM partition_path_cache_invalidation_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); - org.apache.hadoop.fs.Path partitionedTableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_partitioned_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path partitionedTableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_partitioned_table", ImmutableList.of("group2")); - assertThat(cachingDirectoryLister.isCached(partitionedTableGroup1PartitionLocation)).isTrue(); - assertThat(cachingDirectoryLister.isCached(partitionedTableGroup2PartitionLocation)).isTrue(); - - assertUpdate("DELETE FROM partition_path_cache_invalidation_partitioned_table WHERE col2='group1'"); - assertThat(cachingDirectoryLister.isCached(nonPartitionedTableLocation)).isTrue(); - assertThat(cachingDirectoryLister.isCached(partitionedTableGroup1PartitionLocation)).isFalse(); - assertThat(cachingDirectoryLister.isCached(partitionedTableGroup2PartitionLocation)).isTrue(); - - assertQuery("SELECT sum(col1) FROM partition_path_cache_invalidation_non_partitioned_table", "VALUES (6)"); - assertQuery("SELECT col2, sum(col1) FROM partition_path_cache_invalidation_partitioned_table GROUP BY col2", "VALUES ('group2', 7)"); - - assertUpdate("DROP TABLE partition_path_cache_invalidation_non_partitioned_table"); - assertUpdate("DROP TABLE partition_path_cache_invalidation_partitioned_table"); - } - - @Test - public void testInsertIntoNonPartitionedTable() - { - assertUpdate("CREATE TABLE insert_into_non_partitioned_table (col1 int) WITH (format = 'ORC')"); - assertUpdate("INSERT INTO insert_into_non_partitioned_table VALUES (1), (2), (3)", 3); - // The listing for the table should be in the directory cache after this call - assertQuery("SELECT sum(col1) FROM insert_into_non_partitioned_table", "VALUES (6)"); - 
assertThat(cachingDirectoryLister.isCached(getTableLocation(TPCH_SCHEMA, "insert_into_non_partitioned_table"))).isTrue(); - assertUpdate("INSERT INTO insert_into_non_partitioned_table VALUES (4), (5)", 2); - // Inserting into the table should invalidate the cached listing of the files belonging to the table. - assertThat(cachingDirectoryLister.isCached(getTableLocation(TPCH_SCHEMA, "insert_into_non_partitioned_table"))).isFalse(); - - assertQuery("SELECT sum(col1) FROM insert_into_non_partitioned_table", "VALUES (15)"); - - assertUpdate("DROP TABLE insert_into_non_partitioned_table"); - } - - @Test - public void testInsertIntoPartitionedTable() - { - assertUpdate("CREATE TABLE insert_into_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); - assertUpdate("INSERT INTO insert_into_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); - // The listing for the table partitions should be in the directory cache after this call - assertQuery("SELECT col2, sum(col1) FROM insert_into_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); - org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "insert_into_partitioned_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "insert_into_partitioned_table", ImmutableList.of("group2")); - assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isTrue(); - assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isTrue(); - - assertUpdate("INSERT INTO insert_into_partitioned_table VALUES (5, 'group2'), (6, 'group3')", 2); - assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isTrue(); - // Inserting into the table should invalidate the cached listing of the partitions affected by the insert statement - 
assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isFalse(); - assertQuery("SELECT col2, sum(col1) FROM insert_into_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 12), ('group3', 6)"); - - assertUpdate("DROP TABLE insert_into_partitioned_table"); - } - - @Test - public void testDropPartition() + protected CachingDirectoryLister createDirectoryLister() { - assertUpdate("CREATE TABLE delete_from_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); - assertUpdate("INSERT INTO delete_from_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2'), (5, 'group3')", 5); - // The listing for the table partitions should be in the directory cache after this call - assertQuery("SELECT col2, sum(col1) FROM delete_from_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7), ('group3', 5)"); - org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group2")); - org.apache.hadoop.fs.Path tableGroup3PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group3")); - assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isTrue(); - assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isTrue(); - assertUpdate("DELETE FROM delete_from_partitioned_table WHERE col2 = 'group1' OR col2 = 'group2'"); - // Deleting from the table should invalidate the cached listing of the partitions dropped from the table. 
- assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isFalse(); - assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isFalse(); - assertThat(cachingDirectoryLister.isCached(tableGroup3PartitionLocation)).isTrue(); - assertQuery("SELECT col2, sum(col1) FROM delete_from_partitioned_table GROUP BY col2", "VALUES ('group3', 5)"); - - assertUpdate("DROP TABLE delete_from_partitioned_table"); + return new CachingDirectoryLister(Duration.valueOf("5m"), 1_000_000L, List.of("tpch.*")); } - @Test - public void testDropMultiLevelPartition() - { - assertUpdate("CREATE TABLE delete_from_partitioned_table (clicks bigint, day date, country varchar) WITH (format = 'ORC', partitioned_by = ARRAY['day', 'country'])"); - assertUpdate("INSERT INTO delete_from_partitioned_table VALUES (1000, DATE '2022-02-01', 'US'), (2000, DATE '2022-02-01', 'US'), (4000, DATE '2022-02-02', 'US'), (1500, DATE '2022-02-01', 'AT'), (2500, DATE '2022-02-02', 'AT')", 5); - // The listing for the table partitions should be in the directory cache after this call - assertQuery("SELECT day, country, sum(clicks) FROM delete_from_partitioned_table GROUP BY day, country", "VALUES (DATE '2022-02-01', 'US', 3000), (DATE '2022-02-02', 'US', 4000), (DATE '2022-02-01', 'AT', 1500), (DATE '2022-02-02', 'AT', 2500)"); - org.apache.hadoop.fs.Path table20220201UsPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-01", "US")); - org.apache.hadoop.fs.Path table20220202UsPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-02", "US")); - org.apache.hadoop.fs.Path table20220201AtPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-01", "AT")); - org.apache.hadoop.fs.Path table20220202AtPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-02", 
"AT")); - assertThat(cachingDirectoryLister.isCached(table20220201UsPartitionLocation)).isTrue(); - assertThat(cachingDirectoryLister.isCached(table20220202UsPartitionLocation)).isTrue(); - assertThat(cachingDirectoryLister.isCached(table20220201AtPartitionLocation)).isTrue(); - assertThat(cachingDirectoryLister.isCached(table20220202AtPartitionLocation)).isTrue(); - assertUpdate("DELETE FROM delete_from_partitioned_table WHERE day = DATE '2022-02-01'"); - // Deleting from the table should invalidate the cached listing of the partitions dropped from the table. - assertThat(cachingDirectoryLister.isCached(table20220201UsPartitionLocation)).isFalse(); - assertThat(cachingDirectoryLister.isCached(table20220202UsPartitionLocation)).isTrue(); - assertThat(cachingDirectoryLister.isCached(table20220201AtPartitionLocation)).isFalse(); - assertThat(cachingDirectoryLister.isCached(table20220202AtPartitionLocation)).isTrue(); - assertUpdate("DELETE FROM delete_from_partitioned_table WHERE country = 'US'"); - assertThat(cachingDirectoryLister.isCached(table20220202UsPartitionLocation)).isFalse(); - assertThat(cachingDirectoryLister.isCached(table20220202AtPartitionLocation)).isTrue(); - assertQuery("SELECT day, country, sum(clicks) FROM delete_from_partitioned_table GROUP BY day, country", "VALUES (DATE '2022-02-02', 'AT', 2500)"); - - assertUpdate("DROP TABLE delete_from_partitioned_table"); - } - - @Test - public void testUnregisterRegisterPartition() - { - assertUpdate("CREATE TABLE register_unregister_partition_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); - assertUpdate("INSERT INTO register_unregister_partition_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); - // The listing for the table partitions should be in the directory cache after this call - assertQuery("SELECT col2, sum(col1) FROM register_unregister_partition_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); - 
org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "register_unregister_partition_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "register_unregister_partition_table", ImmutableList.of("group2")); - assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isTrue(); - assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isTrue(); - - List paths = getQueryRunner().execute(getSession(), "SELECT \"$path\" FROM register_unregister_partition_table WHERE col2 = 'group1' LIMIT 1").toTestTypes().getMaterializedRows(); - String group1PartitionPath = new org.apache.hadoop.fs.Path((String) paths.get(0).getField(0)).getParent().toString(); - - assertUpdate(format("CALL system.unregister_partition('%s', '%s', ARRAY['col2'], ARRAY['group1'])", TPCH_SCHEMA, "register_unregister_partition_table")); - // Unregistering the partition in the table should invalidate the cached listing of all the partitions belonging to the table. - assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isFalse(); - assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isTrue(); - assertQuery("SELECT col2, sum(col1) FROM register_unregister_partition_table GROUP BY col2", "VALUES ('group2', 7)"); - assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isTrue(); - - assertUpdate(format("CALL system.register_partition('%s', '%s', ARRAY['col2'], ARRAY['group1'], '%s')", TPCH_SCHEMA, "register_unregister_partition_table", group1PartitionPath)); - // Registering the partition in the table should invalidate the cached listing of all the partitions belonging to the table. 
- assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isFalse(); - assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isTrue(); - - assertQuery("SELECT col2, sum(col1) FROM register_unregister_partition_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); - - assertUpdate("DROP TABLE register_unregister_partition_table"); - } - - @Test - public void testRenameTable() - { - assertUpdate("CREATE TABLE table_to_be_renamed (col1 int) WITH (format = 'ORC')"); - assertUpdate("INSERT INTO table_to_be_renamed VALUES (1), (2), (3)", 3); - // The listing for the table should be in the directory cache after this call - assertQuery("SELECT sum(col1) FROM table_to_be_renamed", "VALUES (6)"); - org.apache.hadoop.fs.Path tableLocation = getTableLocation(TPCH_SCHEMA, "table_to_be_renamed"); - assertThat(cachingDirectoryLister.isCached(tableLocation)).isTrue(); - assertUpdate("ALTER TABLE table_to_be_renamed RENAME TO table_renamed"); - // Altering the table should invalidate the cached listing of the files belonging to the table. - assertThat(cachingDirectoryLister.isCached(tableLocation)).isFalse(); - - assertUpdate("DROP TABLE table_renamed"); - } - - @Test - public void testDropTable() + @Override + protected boolean isCached(CachingDirectoryLister directoryLister, Path path) { - assertUpdate("CREATE TABLE table_to_be_dropped (col1 int) WITH (format = 'ORC')"); - assertUpdate("INSERT INTO table_to_be_dropped VALUES (1), (2), (3)", 3); - // The listing for the table should be in the directory cache after this call - assertQuery("SELECT sum(col1) FROM table_to_be_dropped", "VALUES (6)"); - org.apache.hadoop.fs.Path tableLocation = getTableLocation(TPCH_SCHEMA, "table_to_be_dropped"); - assertThat(cachingDirectoryLister.isCached(tableLocation)).isTrue(); - assertUpdate("DROP TABLE table_to_be_dropped"); - // Dropping the table should invalidate the cached listing of the files belonging to the table. 
- assertThat(cachingDirectoryLister.isCached(tableLocation)).isFalse(); + return directoryLister.isCached(path); } @Test - public void testDropPartitionedTable() - { - assertUpdate("CREATE TABLE drop_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); - assertUpdate("INSERT INTO drop_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2'), (5, 'group3')", 5); - // The listing for the table partitions should be in the directory cache after this call - assertQuery("SELECT col2, sum(col1) FROM drop_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7), ('group3', 5)"); - org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group2")); - org.apache.hadoop.fs.Path tableGroup3PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group3")); - assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isTrue(); - assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isTrue(); - assertThat(cachingDirectoryLister.isCached(tableGroup3PartitionLocation)).isTrue(); - assertUpdate("DROP TABLE drop_partitioned_table"); - assertThat(cachingDirectoryLister.isCached(tableGroup1PartitionLocation)).isFalse(); - assertThat(cachingDirectoryLister.isCached(tableGroup2PartitionLocation)).isFalse(); - assertThat(cachingDirectoryLister.isCached(tableGroup3PartitionLocation)).isFalse(); - } - - private org.apache.hadoop.fs.Path getTableLocation(String schemaName, String tableName) + public void forceTestNgToRespectSingleThreaded() { - return fileHiveMetastore.getTable(schemaName, tableName) - .map(table -> table.getStorage().getLocation()) - .map(tableLocation -> new 
org.apache.hadoop.fs.Path(tableLocation)) - .orElseThrow(() -> new NoSuchElementException(format("The table %s.%s could not be found", schemaName, tableName))); - } - - private org.apache.hadoop.fs.Path getPartitionLocation(String schemaName, String tableName, List partitionValues) - { - Table table = fileHiveMetastore.getTable(schemaName, tableName) - .orElseThrow(() -> new NoSuchElementException(format("The table %s.%s could not be found", schemaName, tableName))); - - return fileHiveMetastore.getPartition(table, partitionValues) - .map(partition -> partition.getStorage().getLocation()) - .map(partitionLocation -> new org.apache.hadoop.fs.Path(partitionLocation)) - .orElseThrow(() -> new NoSuchElementException(format("The partition %s from the table %s.%s could not be found", partitionValues, schemaName, tableName))); + // TODO: Remove after updating TestNG to 7.4.0+ (https://github.com/trinodb/trino/issues/8571) + // TestNG doesn't enforce @Test(singleThreaded = true) when tests are defined in base class. According to + // https://github.com/cbeust/testng/issues/2361#issuecomment-688393166 a workaround is to add a dummy test to the leaf test class. } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java new file mode 100644 index 000000000000..adf413c35f6f --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java @@ -0,0 +1,234 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.fs; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.hive.HiveBucketProperty; +import io.trino.plugin.hive.HiveType; +import io.trino.plugin.hive.metastore.Column; +import io.trino.plugin.hive.metastore.Partition; +import io.trino.plugin.hive.metastore.SortingColumn; +import io.trino.plugin.hive.metastore.Storage; +import io.trino.plugin.hive.metastore.StorageFormat; +import io.trino.plugin.hive.metastore.Table; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; + +import static io.trino.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V1; +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.permission.FsPermission.getFileDefault; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +// some tests may invalidate the whole cache affecting therefore other concurrent tests +@Test(singleThreaded = true) +public class TestTransactionScopeCachingDirectoryLister + extends BaseCachingDirectoryListerTest +{ + private static final Column TABLE_COLUMN = new Column( + "column", + 
HiveType.HIVE_INT, + Optional.of("comment")); + private static final Storage TABLE_STORAGE = new Storage( + StorageFormat.create("serde", "input", "output"), + Optional.of("location"), + Optional.of(new HiveBucketProperty(ImmutableList.of("column"), BUCKETING_V1, 10, ImmutableList.of(new SortingColumn("column", SortingColumn.Order.ASCENDING)))), + true, + ImmutableMap.of("param", "value2")); + private static final Table TABLE = new Table( + "database", + "table", + Optional.of("owner"), + "table_type", + TABLE_STORAGE, + ImmutableList.of(TABLE_COLUMN), + ImmutableList.of(TABLE_COLUMN), + ImmutableMap.of("param", "value3"), + Optional.of("original_text"), + Optional.of("expanded_text"), + OptionalLong.empty()); + + @Override + protected TransactionScopeCachingDirectoryLister createDirectoryLister() + { + return new TransactionScopeCachingDirectoryLister(new FileSystemDirectoryLister(), 1_000_000L); + } + + @Override + protected boolean isCached(TransactionScopeCachingDirectoryLister directoryLister, Path path) + { + return directoryLister.isCached(path); + } + + @Test + public void testConcurrentDirectoryListing() + throws IOException + { + LocatedFileStatus firstFile = new LocatedFileStatus(1, false, 1, 1, 1, 1, getFileDefault(), "x", "x", new org.apache.hadoop.fs.Path("x"), new org.apache.hadoop.fs.Path("x"), false, false, false, new BlockLocation[] {}); + LocatedFileStatus secondFile = new LocatedFileStatus(1, false, 1, 1, 1, 1, getFileDefault(), "y", "y", new org.apache.hadoop.fs.Path("y"), new org.apache.hadoop.fs.Path("y"), false, false, false, new BlockLocation[] {}); + LocatedFileStatus thirdFile = new LocatedFileStatus(1, false, 1, 1, 1, 1, getFileDefault(), "z", "z", new org.apache.hadoop.fs.Path("z"), new org.apache.hadoop.fs.Path("z"), false, false, false, new BlockLocation[] {}); + + org.apache.hadoop.fs.Path path1 = new org.apache.hadoop.fs.Path("x"); + org.apache.hadoop.fs.Path path2 = new org.apache.hadoop.fs.Path("y"); + + CountingDirectoryLister 
countingLister = new CountingDirectoryLister( + ImmutableMap.of( + path1, ImmutableList.of(firstFile, secondFile), + path2, ImmutableList.of(thirdFile))); + + TransactionScopeCachingDirectoryLister cachingLister = new TransactionScopeCachingDirectoryLister(countingLister, 2); + + assertFiles(cachingLister.list(null, TABLE, path2), ImmutableList.of(thirdFile)); + assertThat(countingLister.getListCount()).isEqualTo(1); + + // listing path2 again shouldn't increase listing count + assertThat(cachingLister.isCached(path2)).isTrue(); + assertFiles(cachingLister.list(null, TABLE, path2), ImmutableList.of(thirdFile)); + assertThat(countingLister.getListCount()).isEqualTo(1); + + // start listing path1 concurrently + RemoteIterator path1FilesA = cachingLister.list(null, TABLE, path1); + RemoteIterator path1FilesB = cachingLister.list(null, TABLE, path1); + assertThat(countingLister.getListCount()).isEqualTo(2); + + // list path1 files using both iterators concurrently + assertThat(path1FilesA.next()).isEqualTo(firstFile); + assertThat(path1FilesB.next()).isEqualTo(firstFile); + assertThat(path1FilesB.next()).isEqualTo(secondFile); + assertThat(path1FilesA.next()).isEqualTo(secondFile); + assertThat(path1FilesA.hasNext()).isFalse(); + assertThat(path1FilesB.hasNext()).isFalse(); + assertThat(countingLister.getListCount()).isEqualTo(2); + + // listing path2 again should increase listing count because 2 files were cached for path1 + assertThat(cachingLister.isCached(path2)).isFalse(); + assertFiles(cachingLister.list(null, TABLE, path2), ImmutableList.of(thirdFile)); + assertThat(countingLister.getListCount()).isEqualTo(3); + } + + @Test + public void testConcurrentDirectoryListingException() + throws IOException + { + LocatedFileStatus file = new LocatedFileStatus(1, false, 1, 1, 1, 1, getFileDefault(), "x", "x", new org.apache.hadoop.fs.Path("x"), new org.apache.hadoop.fs.Path("x"), false, false, false, new BlockLocation[] {}); + org.apache.hadoop.fs.Path path = new 
org.apache.hadoop.fs.Path("x"); + + CountingDirectoryLister countingLister = new CountingDirectoryLister(ImmutableMap.of(path, ImmutableList.of(file))); + DirectoryLister cachingLister = new TransactionScopeCachingDirectoryLister(countingLister, 1); + + // start listing path concurrently + countingLister.setThrowException(true); + RemoteIterator filesA = cachingLister.list(null, TABLE, path); + RemoteIterator filesB = cachingLister.list(null, TABLE, path); + assertThat(countingLister.getListCount()).isEqualTo(1); + + // listing should throw an exception + assertThatThrownBy(filesA::hasNext).isInstanceOf(IOException.class); + + // listing again should succeed + countingLister.setThrowException(false); + assertFiles(cachingLister.list(null, TABLE, path), ImmutableList.of(file)); + assertThat(countingLister.getListCount()).isEqualTo(2); + + // listing using second concurrently initialized DirectoryLister should fail + assertThatThrownBy(filesB::hasNext).isInstanceOf(IOException.class); + } + + private void assertFiles(RemoteIterator iterator, List expectedFiles) + throws IOException + { + ImmutableList.Builder actualFiles = ImmutableList.builder(); + while (iterator.hasNext()) { + actualFiles.add(iterator.next()); + } + assertThat(actualFiles.build()).isEqualTo(expectedFiles); + } + + private static class CountingDirectoryLister + implements DirectoryLister + { + private final Map> fileStatuses; + private int listCount; + private boolean throwException; + + public CountingDirectoryLister(Map> fileStatuses) + { + this.fileStatuses = requireNonNull(fileStatuses, "fileStatuses is null"); + } + + @Override + public RemoteIterator list(FileSystem fs, Table table, org.apache.hadoop.fs.Path path) + throws IOException + { + listCount++; + return throwingRemoteIterator(requireNonNull(fileStatuses.get(path)), throwException); + } + + public void setThrowException(boolean throwException) + { + this.throwException = throwException; + } + + public int getListCount() + { + return 
listCount; + } + + @Override + public void invalidate(Partition partition) + { + } + + @Override + public void invalidate(Table table) + { + } + } + + static RemoteIterator throwingRemoteIterator(List files, boolean throwException) + { + return new RemoteIterator<>() + { + private final Iterator iterator = ImmutableList.copyOf(files).iterator(); + + @Override + public boolean hasNext() + throws IOException + { + if (throwException) { + throw new IOException(); + } + return iterator.hasNext(); + } + + @Override + public LocatedFileStatus next() + { + return iterator.next(); + } + }; + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestSemiTransactionalHiveMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestSemiTransactionalHiveMetastore.java index 5a6e50d72165..fb1edd768ac6 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestSemiTransactionalHiveMetastore.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestSemiTransactionalHiveMetastore.java @@ -19,8 +19,8 @@ import io.trino.plugin.hive.HiveMetastoreClosure; import io.trino.plugin.hive.HiveType; import io.trino.plugin.hive.PartitionStatistics; -import io.trino.plugin.hive.TableInvalidationCallback; import io.trino.plugin.hive.acid.AcidTransaction; +import io.trino.plugin.hive.fs.FileSystemDirectoryLister; import org.apache.hadoop.fs.Path; import org.testng.annotations.Test; @@ -88,7 +88,7 @@ private SemiTransactionalHiveMetastore getSemiTransactionalHiveMetastoreWithDrop true, Optional.empty(), newScheduledThreadPool(1), - TableInvalidationCallback.NOOP); + new FileSystemDirectoryLister()); } @Test @@ -128,7 +128,7 @@ private SemiTransactionalHiveMetastore getSemiTransactionalHiveMetastoreWithUpda true, Optional.empty(), newScheduledThreadPool(1), - TableInvalidationCallback.NOOP); + new FileSystemDirectoryLister()); } private class TestingHiveMetastore