Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions lib/trino-hdfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,20 @@
<artifactId>jmxutils</artifactId>
</dependency>

<!-- used by tests but also needed transitively -->
<dependency>
<groupId>io.airlift</groupId>
<artifactId>concurrent</artifactId>
<scope>runtime</scope>
</dependency>

<!-- for testing -->
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing-services</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>testing</artifactId>
Expand All @@ -111,6 +124,18 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand Down
183 changes: 113 additions & 70 deletions lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoFileSystemCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.hdfs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -36,21 +35,20 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;

import javax.annotation.concurrent.GuardedBy;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
Expand All @@ -70,17 +68,20 @@ public class TrinoFileSystemCache

private final TrinoFileSystemCacheStats stats;

@GuardedBy("this")
private final Map<FileSystemKey, FileSystemHolder> map = new HashMap<>();
private final Map<FileSystemKey, FileSystemHolder> cache = new ConcurrentHashMap<>();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a comment saying why we need cacheSize separately.

/*
* ConcurrentHashMap has a lock per partitioned key-space bucket, and hence there is no consistent
* or 'serialized' view of current number of entries in the map from a thread that would like to
* add/delete/update an entry. As we have to limit the max size of the cache to 'fs.cache.max-size',
* an auxiliary variable `cacheSize` is used to track the 'serialized' view of entry count in the cache.
* cacheSize should only be updated after acquiring cache's partition lock (eg: from inside cache.compute())
*/
private final AtomicLong cacheSize = new AtomicLong();

@VisibleForTesting
TrinoFileSystemCache()
{
this.stats = new TrinoFileSystemCacheStats(() -> {
synchronized (this) {
return map.size();
}
});
this.stats = new TrinoFileSystemCacheStats(cache::size);
}

@Override
Expand All @@ -102,55 +103,46 @@ public FileSystem getUnique(URI uri, Configuration conf)
@VisibleForTesting
int getCacheSize()
{
return map.size();
return cache.size();
Comment thread
phd3 marked this conversation as resolved.
}

private synchronized FileSystem getInternal(URI uri, Configuration conf, long unique)
private FileSystem getInternal(URI uri, Configuration conf, long unique)
throws IOException
{
UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
FileSystemKey key = createFileSystemKey(uri, userGroupInformation, unique);
Set<?> privateCredentials = getPrivateCredentials(userGroupInformation);

FileSystemHolder fileSystemHolder = map.get(key);
if (fileSystemHolder == null) {
int maxSize = conf.getInt("fs.cache.max-size", 1000);
if (map.size() >= maxSize) {
stats.newGetCallFailed();
throw new IOException(format("FileSystem max cache size has been reached: %s", maxSize));
}
try {
FileSystem fileSystem = createFileSystem(uri, conf);
fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
map.put(key, fileSystemHolder);
}
catch (IOException e) {
stats.newGetCallFailed();
throw e;
}
int maxSize = conf.getInt("fs.cache.max-size", 1000);
FileSystemHolder fileSystemHolder;
try {
fileSystemHolder = cache.compute(key, (k, currentFileSystemHolder) -> {
if (currentFileSystemHolder == null) {
if (cacheSize.getAndUpdate(currentSize -> Math.min(currentSize + 1, maxSize)) >= maxSize) {
throw new RuntimeException(
new IOException(format("FileSystem max cache size has been reached: %s", maxSize)));
}
return new FileSystemHolder(conf, privateCredentials);
}
else {
// Update file system instance when credentials change.
if (currentFileSystemHolder.credentialsChanged(uri, conf, privateCredentials)) {
return new FileSystemHolder(conf, privateCredentials);
}
else {
return currentFileSystemHolder;
}
}
});

// Now create the filesystem object outside of cache's lock
fileSystemHolder.createFileSystemOnce(uri, conf);
}

// Update file system instance when credentials change.
// - Private credentials are only set when using Kerberos authentication.
// When the user is the same, but the private credentials are different,
// that means that Kerberos ticket has expired and re-login happened.
// To prevent cache leak in such situation, the privateCredentials are not
// a part of the FileSystemKey, but part of the FileSystemHolder. When a
// Kerberos re-login occurs, re-create the file system and cache it using
// the same key.
// - Extra credentials are used to authenticate with certain file systems.
if ((isHdfs(uri) && !fileSystemHolder.getPrivateCredentials().equals(privateCredentials)) ||
extraCredentialsChanged(fileSystemHolder.getFileSystem(), conf)) {
map.remove(key);
try {
FileSystem fileSystem = createFileSystem(uri, conf);
fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
map.put(key, fileSystemHolder);
}
catch (IOException e) {
stats.newGetCallFailed();
throw e;
}
catch (RuntimeException | IOException e) {
stats.newGetCallFailed();
throwIfInstanceOf(e, IOException.class);
throwIfInstanceOf(e.getCause(), IOException.class);
throw e;
}

return fileSystemHolder.getFileSystem();
Expand Down Expand Up @@ -178,20 +170,54 @@ private static FileSystem createFileSystem(URI uri, Configuration conf)
}

@Override
public synchronized void remove(FileSystem fileSystem)
public void remove(FileSystem fileSystem)
{
stats.newRemoveCall();
map.values().removeIf(holder -> holder.getFileSystem().equals(fileSystem));
cache.forEach((key, fileSystemHolder) -> {
if (fileSystem.equals(fileSystemHolder.getFileSystem())) {
// After acquiring the lock, decrement cacheSize only if
// (1) the key is still mapped to a FileSystemHolder
// (2) the filesystem object inside FileSystemHolder is the same
cache.compute(key, (k, currentFileSystemHolder) -> {
if (currentFileSystemHolder != null
&& fileSystem.equals(currentFileSystemHolder.getFileSystem())) {
cacheSize.decrementAndGet();
return null;
}
return currentFileSystemHolder;
});
}
});
}

@Override
public synchronized void closeAll()
public void closeAll()
throws IOException
{
for (FileSystemHolder fileSystemHolder : ImmutableList.copyOf(map.values())) {
closeFileSystem(fileSystemHolder.getFileSystem());
try {
cache.forEach((key, fileSystemHolder) -> {
try {
cache.compute(key, (k, currentFileSystemHolder) -> {
// decrement cacheSize only if the key is still mapped
if (currentFileSystemHolder != null) {
cacheSize.decrementAndGet();
}
return null;
});
FileSystem fs = fileSystemHolder.getFileSystem();
if (fs != null) {
closeFileSystem(fs);
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
});
}
catch (RuntimeException e) {
throwIfInstanceOf(e.getCause(), IOException.class);
throw e;
}
map.clear();
}

@SuppressModernizer
Expand Down Expand Up @@ -245,12 +271,6 @@ private static boolean isHdfs(URI uri)
return "hdfs".equals(scheme) || "viewfs".equals(scheme);
}

private static boolean extraCredentialsChanged(FileSystem fileSystem, Configuration configuration)
{
return !configuration.get(CACHE_KEY, "").equals(
fileSystem.getConf().get(CACHE_KEY, ""));
}

private static class FileSystemKey
{
private final String scheme;
Expand Down Expand Up @@ -306,23 +326,45 @@ public String toString()

private static class FileSystemHolder
{
private final FileSystem fileSystem;
private final Set<?> privateCredentials;
private final String cacheCredentials;
private volatile FileSystem fileSystem;

public FileSystemHolder(FileSystem fileSystem, Set<?> privateCredentials)
public FileSystemHolder(Configuration conf, Set<?> privateCredentials)
{
this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
this.privateCredentials = ImmutableSet.copyOf(requireNonNull(privateCredentials, "privateCredentials is null"));
this.cacheCredentials = conf.get(CACHE_KEY, "");
}

public FileSystem getFileSystem()
public void createFileSystemOnce(URI uri, Configuration conf)
throws IOException
{
return fileSystem;
if (fileSystem == null) {
synchronized (this) {
if (fileSystem == null) {
fileSystem = TrinoFileSystemCache.createFileSystem(uri, conf);
}
}
}
}

public Set<?> getPrivateCredentials()
public boolean credentialsChanged(URI newUri, Configuration newConf, Set<?> newPrivateCredentials)
{
return privateCredentials;
// - Private credentials are only set when using Kerberos authentication.
// When the user is the same, but the private credentials are different,
// that means that Kerberos ticket has expired and re-login happened.
// To prevent cache leak in such situation, the privateCredentials are not
// a part of the FileSystemKey, but part of the FileSystemHolder. When a
// Kerberos re-login occurs, re-create the file system and cache it using
// the same key.
// - Extra credentials are used to authenticate with certain file systems.
return (isHdfs(newUri) && !this.privateCredentials.equals(newPrivateCredentials))
|| !this.cacheCredentials.equals(newConf.get(CACHE_KEY, ""));
}

public FileSystem getFileSystem()
{
return fileSystem;
}

@Override
Expand All @@ -331,6 +373,7 @@ public String toString()
return toStringHelper(this)
.add("fileSystem", fileSystem)
.add("privateCredentials", privateCredentials)
.add("cacheCredentials", cacheCredentials)
.toString();
}
}
Expand Down
Loading