Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.trino.plugin.hive.SystemTableProvider;
import io.trino.plugin.hive.TransactionalMetadata;
import io.trino.plugin.hive.TransactionalMetadataFactory;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.MetastoreConfig;
import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore;
Expand Down Expand Up @@ -175,6 +176,12 @@ public SemiTransactionalHiveMetastore getMetastore()
throw new RuntimeException("SemiTransactionalHiveMetastore is not used by Delta");
}

@Override
public DirectoryLister getDirectoryLister()
{
throw new RuntimeException("DirectoryLister is not used by Delta");
Comment thread
sopel39 marked this conversation as resolved.
Outdated
}

@Override
public void commit() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ public void testEmptyOrcFile()
// Alluxio metastore does not support create operations
}

// Overridden as a no-op to disable the inherited test: it requires table
// creation, which this metastore cannot perform (see comment below).
@Override
public void testPerTransactionDirectoryListerCache()
{
    // Alluxio metastore does not support create operations
}

// specifically disable so that expected exception on the superclass don't fail this test
@Override
@Test(enabled = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@
import io.trino.plugin.hive.HiveSplit.BucketConversion;
import io.trino.plugin.hive.HiveSplit.BucketValidation;
import io.trino.plugin.hive.acid.AcidTransaction;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.fs.HiveFileIterator;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.util.HiveBucketing.BucketingVersion;
import io.trino.plugin.hive.util.HiveBucketing.HiveBucketFilter;
import io.trino.plugin.hive.util.HiveFileIterator;
import io.trino.plugin.hive.util.InternalHiveSplitFactory;
import io.trino.plugin.hive.util.ResumableTask;
import io.trino.plugin.hive.util.ResumableTasks;
Expand Down Expand Up @@ -106,13 +107,13 @@
import static io.trino.plugin.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static io.trino.plugin.hive.HiveSessionProperties.isForceLocalScheduling;
import static io.trino.plugin.hive.HiveSessionProperties.isValidateBucketing;
import static io.trino.plugin.hive.fs.HiveFileIterator.NestedDirectoryPolicy.FAIL;
import static io.trino.plugin.hive.fs.HiveFileIterator.NestedDirectoryPolicy.IGNORED;
import static io.trino.plugin.hive.fs.HiveFileIterator.NestedDirectoryPolicy.RECURSE;
import static io.trino.plugin.hive.metastore.MetastoreUtil.getHiveSchema;
import static io.trino.plugin.hive.metastore.MetastoreUtil.getPartitionLocation;
import static io.trino.plugin.hive.s3select.S3SelectPushdown.shouldEnablePushdownForTable;
import static io.trino.plugin.hive.util.ConfigurationUtils.toJobConf;
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.FAIL;
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.IGNORED;
import static io.trino.plugin.hive.util.HiveFileIterator.NestedDirectoryPolicy.RECURSE;
import static io.trino.plugin.hive.util.HiveUtil.checkCondition;
import static io.trino.plugin.hive.util.HiveUtil.getFooterCount;
import static io.trino.plugin.hive.util.HiveUtil.getHeaderCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ public class HiveConfig
private Duration fileStatusCacheExpireAfterWrite = new Duration(1, MINUTES);
private long fileStatusCacheMaxSize = 1000 * 1000;
private List<String> fileStatusCacheTables = ImmutableList.of();
private long perTransactionFileStatusCacheMaximumSize = 1000 * 1000;

private boolean translateHiveViews;

private Optional<Duration> hiveTransactionHeartbeatInterval = Optional.empty();
Expand Down Expand Up @@ -746,6 +748,20 @@ public HiveConfig setFileStatusCacheTables(String fileStatusCacheTables)
return this;
}

@Min(1)
public long getPerTransactionFileStatusCacheMaximumSize()
{
return perTransactionFileStatusCacheMaximumSize;
}

@Config("hive.per-transaction-file-status-cache-maximum-size")
Comment thread
sopel39 marked this conversation as resolved.
Outdated
@ConfigDescription("Maximum number of file statuses cached by transactional file status cache")
public HiveConfig setPerTransactionFileStatusCacheMaximumSize(long perTransactionFileStatusCacheMaximumSize)
{
this.perTransactionFileStatusCacheMaximumSize = perTransactionFileStatusCacheMaximumSize;
return this;
}

public boolean isTranslateHiveViews()
{
return translateHiveViews;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.trino.plugin.hive.acid.AcidOperation;
import io.trino.plugin.hive.acid.AcidSchema;
import io.trino.plugin.hive.acid.AcidTransaction;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HiveColumnStatistics;
Expand Down Expand Up @@ -364,6 +365,7 @@ public class HiveMetadata
private final HiveMaterializedViewMetadata hiveMaterializedViewMetadata;
private final AccessControlMetadata accessControlMetadata;
private final HiveTableRedirectionsProvider tableRedirectionsProvider;
private final DirectoryLister directoryLister;

public HiveMetadata(
CatalogName catalogName,
Expand All @@ -385,7 +387,8 @@ public HiveMetadata(
Set<SystemTableProvider> systemTableProviders,
HiveMaterializedViewMetadata hiveMaterializedViewMetadata,
AccessControlMetadata accessControlMetadata,
HiveTableRedirectionsProvider tableRedirectionsProvider)
HiveTableRedirectionsProvider tableRedirectionsProvider,
DirectoryLister directoryLister)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.metastore = requireNonNull(metastore, "metastore is null");
Expand All @@ -407,6 +410,7 @@ public HiveMetadata(
this.hiveMaterializedViewMetadata = requireNonNull(hiveMaterializedViewMetadata, "hiveMaterializedViewMetadata is null");
this.accessControlMetadata = requireNonNull(accessControlMetadata, "accessControlMetadata is null");
this.tableRedirectionsProvider = requireNonNull(tableRedirectionsProvider, "tableRedirectionsProvider is null");
this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
}

@Override
Expand All @@ -415,6 +419,12 @@ public SemiTransactionalHiveMetastore getMetastore()
return metastore;
}

// Returns the DirectoryLister supplied at construction time (null-checked in
// the constructor). Presumably the transaction-scoped caching wrapper created
// by HiveMetadataFactory — confirm against the factory's create() method.
@Override
public DirectoryLister getDirectoryLister()
{
    return directoryLister;
}

@Override
public List<String> listSchemaNames(ConnectorSession session)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import io.airlift.json.JsonCodec;
import io.airlift.units.Duration;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryLister;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.MetastoreConfig;
import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore;
Expand Down Expand Up @@ -68,7 +70,8 @@ public class HiveMetadataFactory
private final Optional<Duration> hiveTransactionHeartbeatInterval;
private final HiveTableRedirectionsProvider tableRedirectionsProvider;
private final ScheduledExecutorService heartbeatService;
private final TableInvalidationCallback tableInvalidationCallback;
private final DirectoryLister directoryLister;
private final long perTransactionFileStatusCacheMaximumSize;

@Inject
public HiveMetadataFactory(
Expand All @@ -90,7 +93,7 @@ public HiveMetadataFactory(
HiveMaterializedViewMetadataFactory hiveMaterializedViewMetadataFactory,
AccessControlMetadataFactory accessControlMetadataFactory,
HiveTableRedirectionsProvider tableRedirectionsProvider,
TableInvalidationCallback tableInvalidationCallback)
DirectoryLister directoryLister)
{
this(
catalogName,
Expand Down Expand Up @@ -121,7 +124,8 @@ public HiveMetadataFactory(
hiveMaterializedViewMetadataFactory,
accessControlMetadataFactory,
tableRedirectionsProvider,
tableInvalidationCallback);
directoryLister,
hiveConfig.getPerTransactionFileStatusCacheMaximumSize());
}

public HiveMetadataFactory(
Expand Down Expand Up @@ -153,7 +157,8 @@ public HiveMetadataFactory(
HiveMaterializedViewMetadataFactory hiveMaterializedViewMetadataFactory,
AccessControlMetadataFactory accessControlMetadataFactory,
HiveTableRedirectionsProvider tableRedirectionsProvider,
TableInvalidationCallback tableInvalidationCallback)
DirectoryLister directoryLister,
long perTransactionFileStatusCacheMaximumSize)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.skipDeletionForAlter = skipDeletionForAlter;
Expand Down Expand Up @@ -190,7 +195,8 @@ public HiveMetadataFactory(
updateExecutor = new BoundedExecutor(executorService, maxConcurrentMetastoreUpdates);
}
this.heartbeatService = requireNonNull(heartbeatService, "heartbeatService is null");
this.tableInvalidationCallback = requireNonNull(tableInvalidationCallback, "tableInvalidationCallback is null");
this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
this.perTransactionFileStatusCacheMaximumSize = perTransactionFileStatusCacheMaximumSize;
}

@Override
Expand All @@ -199,6 +205,8 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm
HiveMetastoreClosure hiveMetastoreClosure = new HiveMetastoreClosure(
memoizeMetastore(metastoreFactory.createMetastore(Optional.of(identity)), perTransactionCacheMaximumSize)); // per-transaction cache

DirectoryLister directoryLister = new TransactionScopeCachingDirectoryLister(this.directoryLister, perTransactionFileStatusCacheMaximumSize);

SemiTransactionalHiveMetastore metastore = new SemiTransactionalHiveMetastore(
hdfsEnvironment,
hiveMetastoreClosure,
Expand All @@ -210,7 +218,7 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm
deleteSchemaLocationsFallback,
hiveTransactionHeartbeatInterval,
heartbeatService,
tableInvalidationCallback);
directoryLister);

return new HiveMetadata(
catalogName,
Expand All @@ -232,6 +240,7 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm
systemTableProviders,
hiveMaterializedViewMetadataFactory.create(hiveMetastoreClosure),
accessControlMetadataFactory.create(metastore),
tableRedirectionsProvider);
tableRedirectionsProvider,
directoryLister);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.inject.multibindings.Multibinder;
import io.airlift.event.client.EventClient;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.hive.fs.CachingDirectoryLister;
import io.trino.plugin.hive.metastore.MetastoreConfig;
import io.trino.plugin.hive.orc.OrcFileWriterFactory;
import io.trino.plugin.hive.orc.OrcPageSourceFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ public class HiveSplitManager
private final HivePartitionManager partitionManager;
private final NamenodeStats namenodeStats;
private final HdfsEnvironment hdfsEnvironment;
private final DirectoryLister directoryLister;
private final Executor executor;
private final int maxOutstandingSplits;
private final DataSize maxOutstandingSplitsSize;
Expand All @@ -117,7 +116,6 @@ public HiveSplitManager(
HivePartitionManager partitionManager,
NamenodeStats namenodeStats,
HdfsEnvironment hdfsEnvironment,
DirectoryLister directoryLister,
ExecutorService executorService,
VersionEmbedder versionEmbedder,
TypeManager typeManager)
Expand All @@ -127,7 +125,6 @@ public HiveSplitManager(
partitionManager,
namenodeStats,
hdfsEnvironment,
directoryLister,
versionEmbedder.embedVersion(new BoundedExecutor(executorService, hiveConfig.getMaxSplitIteratorThreads())),
new CounterStat(),
hiveConfig.getMaxOutstandingSplits(),
Expand All @@ -146,7 +143,6 @@ public HiveSplitManager(
HivePartitionManager partitionManager,
NamenodeStats namenodeStats,
HdfsEnvironment hdfsEnvironment,
DirectoryLister directoryLister,
Executor executor,
CounterStat highMemorySplitSourceCounter,
int maxOutstandingSplits,
Expand All @@ -163,7 +159,6 @@ public HiveSplitManager(
this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
this.executor = new ErrorCodedExecutor(executor);
this.highMemorySplitSourceCounter = requireNonNull(highMemorySplitSourceCounter, "highMemorySplitSourceCounter is null");
checkArgument(maxOutstandingSplits >= 1, "maxOutstandingSplits must be at least 1");
Expand All @@ -190,7 +185,8 @@ public ConnectorSplitSource getSplits(
SchemaTableName tableName = hiveTable.getSchemaTableName();

// get table metadata
SemiTransactionalHiveMetastore metastore = transactionManager.get(transaction, session.getIdentity()).getMetastore();
TransactionalMetadata transactionalMetadata = transactionManager.get(transaction, session.getIdentity());
SemiTransactionalHiveMetastore metastore = transactionalMetadata.getMetastore();
Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));

Expand Down Expand Up @@ -239,7 +235,7 @@ public ConnectorSplitSource getSplits(
session,
hdfsEnvironment,
namenodeStats,
directoryLister,
transactionalMetadata.getDirectoryLister(),
executor,
concurrency,
recursiveDfsWalkerEnabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.hive.authentication.HdfsAuthenticationModule;
import io.trino.plugin.hive.azure.HiveAzureModule;
import io.trino.plugin.hive.fs.CachingDirectoryListerModule;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.gcs.HiveGcsModule;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HiveMetastoreModule;
Expand Down Expand Up @@ -87,7 +89,7 @@ public static Connector createConnector(
ConnectorContext context,
Module module,
Optional<HiveMetastore> metastore,
Optional<CachingDirectoryLister> cachingDirectoryLister)
Optional<DirectoryLister> directoryLister)
{
requireNonNull(config, "config is null");

Expand All @@ -101,7 +103,7 @@ public static Connector createConnector(
new JsonModule(),
new TypeDeserializerModule(context.getTypeManager()),
new HiveModule(),
new CachingDirectoryListerModule(cachingDirectoryLister),
new CachingDirectoryListerModule(directoryLister),
new HiveHdfsModule(),
new HiveS3Module(),
new HiveGcsModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,6 @@

public interface TableInvalidationCallback
{
TableInvalidationCallback NOOP = new TableInvalidationCallback() {
@Override
public void invalidate(Partition partition)
{
}

@Override
public void invalidate(Table table)
{
}
};

void invalidate(Partition partition);

void invalidate(Table table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.hive;

import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore;
import io.trino.spi.connector.ConnectorMetadata;

Expand All @@ -21,6 +22,8 @@ public interface TransactionalMetadata
{
SemiTransactionalHiveMetastore getMetastore();

DirectoryLister getDirectoryLister();

void commit();

void rollback();
Expand Down
Loading