Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
*/
package io.trino.plugin.hive;

import com.google.common.cache.CacheBuilder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.Weigher;
import com.google.common.collect.ImmutableList;
import io.airlift.units.Duration;
import io.trino.collect.cache.NonKeyEvictableCache;
import io.trino.collect.cache.EvictableCacheBuilder;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Storage;
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
Expand All @@ -32,17 +36,21 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.collect.cache.SafeCaches.buildNonEvictableCacheWithWeakInvalidateAll;
import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;

public class CachingDirectoryLister
implements DirectoryLister
implements DirectoryLister, TableInvalidationCallback
{
// TODO (https://github.com/trinodb/trino/issues/10621) this cache lacks invalidation
private final NonKeyEvictableCache<Path, List<LocatedFileStatus>> cache;
//TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
// to deal more efficiently with cache invalidation scenarios for partitioned tables.
private final Cache<Path, ValueHolder> cache;
private final List<SchemaTablePrefix> tablePrefixes;

@Inject
Expand All @@ -53,11 +61,12 @@ public CachingDirectoryLister(HiveConfig hiveClientConfig)

public CachingDirectoryLister(Duration expireAfterWrite, long maxSize, List<String> tables)
{
this.cache = buildNonEvictableCacheWithWeakInvalidateAll(CacheBuilder.newBuilder()
this.cache = EvictableCacheBuilder.newBuilder()
.maximumWeight(maxSize)
.weigher((Weigher<Path, List<LocatedFileStatus>>) (key, value) -> value.size())
.weigher((Weigher<Path, ValueHolder>) (key, value) -> value.files.map(List::size).orElse(1))
.expireAfterWrite(expireAfterWrite.toMillis(), TimeUnit.MILLISECONDS)
.recordStats());
.recordStats()
.build();
this.tablePrefixes = tables.stream()
.map(CachingDirectoryLister::parseTableName)
.collect(toImmutableList());
Expand All @@ -82,19 +91,46 @@ private static SchemaTablePrefix parseTableName(String tableName)
public RemoteIterator<LocatedFileStatus> list(FileSystem fs, Table table, Path path)
throws IOException
{
List<LocatedFileStatus> files = cache.getIfPresent(path);
if (files != null) {
return simpleRemoteIterator(files);
if (!isCacheEnabledFor(table.getSchemaTableName())) {
return fs.listLocatedStatus(path);
}
RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(path);

if (tablePrefixes.stream().noneMatch(prefix -> prefix.matches(table.getSchemaTableName()))) {
return iterator;
ValueHolder cachedValueHolder;
try {
cachedValueHolder = cache.get(path, ValueHolder::new);
}
catch (ExecutionException e) {
throw new RuntimeException(e); // cannot happen
}
if (cachedValueHolder.getFiles().isPresent()) {
return simpleRemoteIterator(cachedValueHolder.getFiles().get());
}
return cachingRemoteIterator(cachedValueHolder, fs.listLocatedStatus(path), path);
}

@Override
public void invalidate(Table table)
{
if (isCacheEnabledFor(table.getSchemaTableName()) && isLocationPresent(table.getStorage())) {
if (table.getPartitionColumns().isEmpty()) {
cache.invalidate(new Path(table.getStorage().getLocation()));
Copy link
Copy Markdown
Contributor Author

@findinpath findinpath Feb 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the usage of the constructor with an unexpected path cause us problems?
I could use String instead of Path for the cache keys to be on the safe side.

}
else {
// a partitioned table can have multiple paths in cache
cache.invalidateAll();
}
}
return cachingRemoteIterator(iterator, path);
}

private RemoteIterator<LocatedFileStatus> cachingRemoteIterator(RemoteIterator<LocatedFileStatus> iterator, Path path)
/**
 * Invalidates the cache entry keyed by the partition's storage location.
 * <p>
 * Does nothing when caching is not enabled for the partition's table or when
 * the partition storage has no non-empty location.
 */
@Override
public void invalidate(Partition partition)
{
    boolean cachingEnabled = isCacheEnabledFor(partition.getSchemaTableName());
    if (!cachingEnabled || !isLocationPresent(partition.getStorage())) {
        return;
    }
    Path partitionLocation = new Path(partition.getStorage().getLocation());
    cache.invalidate(partitionLocation);
}

private RemoteIterator<LocatedFileStatus> cachingRemoteIterator(ValueHolder cachedValueHolder, RemoteIterator<LocatedFileStatus> iterator, Path path)
{
return new RemoteIterator<>()
{
Expand All @@ -106,7 +142,9 @@ public boolean hasNext()
{
boolean hasNext = iterator.hasNext();
if (!hasNext) {
cache.put(path, ImmutableList.copyOf(files));
// The cachedValueHolder acts as an invalidation guard. If the cache entry is invalidated while this iterator is still going over
// the files of the specified path, the possibly outdated file listing will not be added to the cache.
cache.asMap().replace(path, cachedValueHolder, new ValueHolder(files));
}
return hasNext;
}
Expand Down Expand Up @@ -145,8 +183,6 @@ public LocatedFileStatus next()
/**
 * Invalidates every entry currently held in the directory listing cache.
 * Exposed as a managed operation so it can be triggered manually.
 */
@Managed
public void flushCache()
{
// Note: this may not invalidate ongoing loads (https://github.com/trinodb/trino/issues/10512, https://github.com/google/guava/issues/1881)
// This is acceptable, since this operation is invoked manually, and not relied upon for correctness.
cache.invalidateAll();
}

Expand Down Expand Up @@ -179,4 +215,47 @@ public long getRequestCount()
{
return cache.stats().requestCount();
}

/**
 * Tells whether a file listing is currently cached for the given path.
 * An entry whose holder carries no files does not count as cached.
 */
@VisibleForTesting
boolean isCached(Path path)
{
    ValueHolder holder = cache.getIfPresent(path);
    if (holder == null) {
        return false;
    }
    return holder.getFiles().isPresent();
}

/**
 * Returns true when at least one of the configured table prefixes matches the given table name.
 */
private boolean isCacheEnabledFor(SchemaTableName schemaTableName)
{
    for (SchemaTablePrefix candidate : tablePrefixes) {
        if (candidate.matches(schemaTableName)) {
            return true;
        }
    }
    return false;
}

/**
 * Returns true when the storage declares a location and that location is a non-empty string.
 */
private static boolean isLocationPresent(Storage storage)
{
    // Some Hive table types (e.g.: views) do not have a storage location
    if (!storage.getOptionalLocation().isPresent()) {
        return false;
    }
    return isNotEmpty(storage.getLocation());
}

/**
 * Cache value wrapping an optional, immutable file listing.
 * <p>
 * This class intentionally keeps object identity semantics (it does not define
 * value-based equality), so that a specific instance can act as an
 * invalidation guard in the cache.
 */
private static class ValueHolder
{
    private final Optional<List<LocatedFileStatus>> files;

    /**
     * Creates a holder that carries no file listing yet.
     */
    public ValueHolder()
    {
        this.files = Optional.empty();
    }

    /**
     * Creates a holder carrying an immutable copy of the given listing.
     *
     * @param files the listed files; must not be null
     */
    public ValueHolder(List<LocatedFileStatus> files)
    {
        requireNonNull(files, "files is null");
        List<LocatedFileStatus> snapshot = ImmutableList.copyOf(files);
        this.files = Optional.of(snapshot);
    }

    /**
     * @return the held file listing, or empty when this holder carries none
     */
    public Optional<List<LocatedFileStatus>> getFiles()
    {
        return files;
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

/**
 * Guice module binding {@link DirectoryLister} and {@link TableInvalidationCallback}
 * to a {@link CachingDirectoryLister}: either the instance handed to the constructor,
 * or a singleton created by the injector when none was supplied.
 */
public class CachingDirectoryListerModule
        implements Module
{
    private final Optional<CachingDirectoryLister> cachingDirectoryLister;

    /**
     * @param cachingDirectoryLister an optional pre-built lister to bind; must not be null
     */
    public CachingDirectoryListerModule(Optional<CachingDirectoryLister> cachingDirectoryLister)
    {
        this.cachingDirectoryLister = requireNonNull(cachingDirectoryLister, "cachingDirectoryLister is null");
    }

    @Override
    public void configure(Binder binder)
    {
        if (!cachingDirectoryLister.isPresent()) {
            // No instance supplied: bind the implementation as a singleton and point both interfaces at it.
            binder.bind(CachingDirectoryLister.class).in(Scopes.SINGLETON);
            binder.bind(DirectoryLister.class).to(CachingDirectoryLister.class);
            binder.bind(TableInvalidationCallback.class).to(CachingDirectoryLister.class);
            return;
        }

        // An instance was supplied: bind both interfaces to that same object.
        CachingDirectoryLister suppliedLister = cachingDirectoryLister.get();
        binder.bind(DirectoryLister.class).toInstance(suppliedLister);
        binder.bind(TableInvalidationCallback.class).toInstance(suppliedLister);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class HiveMetadataFactory
private final Optional<Duration> hiveTransactionHeartbeatInterval;
private final HiveTableRedirectionsProvider tableRedirectionsProvider;
private final ScheduledExecutorService heartbeatService;
private final TableInvalidationCallback tableInvalidationCallback;

@Inject
public HiveMetadataFactory(
Expand All @@ -88,7 +89,8 @@ public HiveMetadataFactory(
Set<SystemTableProvider> systemTableProviders,
HiveMaterializedViewMetadataFactory hiveMaterializedViewMetadataFactory,
AccessControlMetadataFactory accessControlMetadataFactory,
HiveTableRedirectionsProvider tableRedirectionsProvider)
HiveTableRedirectionsProvider tableRedirectionsProvider,
TableInvalidationCallback tableInvalidationCallback)
{
this(
catalogName,
Expand Down Expand Up @@ -118,7 +120,8 @@ public HiveMetadataFactory(
systemTableProviders,
hiveMaterializedViewMetadataFactory,
accessControlMetadataFactory,
tableRedirectionsProvider);
tableRedirectionsProvider,
tableInvalidationCallback);
}

public HiveMetadataFactory(
Expand Down Expand Up @@ -149,7 +152,8 @@ public HiveMetadataFactory(
Set<SystemTableProvider> systemTableProviders,
HiveMaterializedViewMetadataFactory hiveMaterializedViewMetadataFactory,
AccessControlMetadataFactory accessControlMetadataFactory,
HiveTableRedirectionsProvider tableRedirectionsProvider)
HiveTableRedirectionsProvider tableRedirectionsProvider,
TableInvalidationCallback tableInvalidationCallback)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.skipDeletionForAlter = skipDeletionForAlter;
Expand Down Expand Up @@ -186,6 +190,7 @@ public HiveMetadataFactory(
updateExecutor = new BoundedExecutor(executorService, maxConcurrentMetastoreUpdates);
}
this.heartbeatService = requireNonNull(heartbeatService, "heartbeatService is null");
this.tableInvalidationCallback = requireNonNull(tableInvalidationCallback, "tableInvalidationCallback is null");
}

@Override
Expand All @@ -204,7 +209,8 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm
skipTargetCleanupOnRollback,
deleteSchemaLocationsFallback,
hiveTransactionHeartbeatInterval,
heartbeatService);
heartbeatService,
tableInvalidationCallback);

return new HiveMetadata(
catalogName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public class HiveModule
@Override
public void configure(Binder binder)
{
binder.bind(DirectoryLister.class).to(CachingDirectoryLister.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(HiveConfig.class);
configBinder(binder).bindConfig(MetastoreConfig.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ private InternalHiveConnectorFactory() {}

public static Connector createConnector(String catalogName, Map<String, String> config, ConnectorContext context, Module module)
{
return createConnector(catalogName, config, context, module, Optional.empty());
return createConnector(catalogName, config, context, module, Optional.empty(), Optional.empty());
}

public static Connector createConnector(String catalogName, Map<String, String> config, ConnectorContext context, Module module, Optional<HiveMetastore> metastore)
public static Connector createConnector(String catalogName, Map<String, String> config, ConnectorContext context, Module module, Optional<HiveMetastore> metastore, Optional<CachingDirectoryLister> cachingDirectoryLister)
{
requireNonNull(config, "config is null");

Expand All @@ -95,6 +95,7 @@ public static Connector createConnector(String catalogName, Map<String, String>
new JsonModule(),
new TypeDeserializerModule(context.getTypeManager()),
new HiveModule(),
new CachingDirectoryListerModule(cachingDirectoryLister),
new HiveHdfsModule(),
new HiveS3Module(),
new HiveGcsModule(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Table;

/**
 * Callback through which implementations are told to invalidate whatever
 * they hold for a given table or partition.
 */
public interface TableInvalidationCallback
{
    /**
     * Invalidates whatever the implementation holds for the given partition.
     */
    void invalidate(Partition partition);

    /**
     * Invalidates whatever the implementation holds for the given table.
     */
    void invalidate(Table table);

    /**
     * Callback whose methods do nothing.
     */
    TableInvalidationCallback NOOP = new TableInvalidationCallback() {
        @Override
        public void invalidate(Partition partition)
        {
            // intentionally empty
        }

        @Override
        public void invalidate(Table table)
        {
            // intentionally empty
        }
    };
}
Loading