diff --git a/lib/trino-hdfs/pom.xml b/lib/trino-hdfs/pom.xml
index a908ed991e2c..2eb528bdc235 100644
--- a/lib/trino-hdfs/pom.xml
+++ b/lib/trino-hdfs/pom.xml
@@ -98,7 +98,20 @@
            <artifactId>jmxutils</artifactId>
        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>concurrent</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-testing-services</artifactId>
+            <scope>test</scope>
+        </dependency>
+
        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>testing</artifactId>
@@ -111,6 +124,18 @@
            <scope>test</scope>
        </dependency>
+
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-generator-annprocess</artifactId>
+            <scope>test</scope>
+        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
diff --git a/lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoFileSystemCache.java b/lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoFileSystemCache.java
index b36936b117da..246364a91880 100644
--- a/lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoFileSystemCache.java
+++ b/lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoFileSystemCache.java
@@ -14,7 +14,6 @@
package io.trino.hdfs;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
import org.apache.hadoop.conf.Configuration;
@@ -36,21 +35,20 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;
-import javax.annotation.concurrent.GuardedBy;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Strings.nullToEmpty;
+import static com.google.common.base.Throwables.throwIfInstanceOf;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
@@ -70,17 +68,20 @@ public class TrinoFileSystemCache
private final TrinoFileSystemCacheStats stats;
- @GuardedBy("this")
- private final Map<FileSystemKey, FileSystemHolder> map = new HashMap<>();
+ private final Map<FileSystemKey, FileSystemHolder> cache = new ConcurrentHashMap<>();
+ /*
+ * ConcurrentHashMap locks individual buckets of its partitioned key space, so a thread that
+ * wants to add, delete, or update an entry never gets a consistent or 'serialized' view of the
+ * current number of entries in the map. Since the cache must be capped at 'fs.cache.max-size',
+ * the auxiliary counter `cacheSize` tracks that serialized view of the entry count. cacheSize
+ * must only be updated while holding the cache's bucket lock (e.g. from inside cache.compute()).
+ */
+ private final AtomicLong cacheSize = new AtomicLong();
@VisibleForTesting
TrinoFileSystemCache()
{
- this.stats = new TrinoFileSystemCacheStats(() -> {
- synchronized (this) {
- return map.size();
- }
- });
+ this.stats = new TrinoFileSystemCacheStats(cache::size);
}
@Override
@@ -102,55 +103,46 @@ public FileSystem getUnique(URI uri, Configuration conf)
@VisibleForTesting
int getCacheSize()
{
- return map.size();
+ return cache.size();
}
- private synchronized FileSystem getInternal(URI uri, Configuration conf, long unique)
+ private FileSystem getInternal(URI uri, Configuration conf, long unique)
throws IOException
{
UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
FileSystemKey key = createFileSystemKey(uri, userGroupInformation, unique);
Set<Object> privateCredentials = getPrivateCredentials(userGroupInformation);
- FileSystemHolder fileSystemHolder = map.get(key);
- if (fileSystemHolder == null) {
- int maxSize = conf.getInt("fs.cache.max-size", 1000);
- if (map.size() >= maxSize) {
- stats.newGetCallFailed();
- throw new IOException(format("FileSystem max cache size has been reached: %s", maxSize));
- }
- try {
- FileSystem fileSystem = createFileSystem(uri, conf);
- fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
- map.put(key, fileSystemHolder);
- }
- catch (IOException e) {
- stats.newGetCallFailed();
- throw e;
- }
+ int maxSize = conf.getInt("fs.cache.max-size", 1000);
+ FileSystemHolder fileSystemHolder;
+ try {
+ fileSystemHolder = cache.compute(key, (k, currentFileSystemHolder) -> {
+ if (currentFileSystemHolder == null) {
+ if (cacheSize.getAndUpdate(currentSize -> Math.min(currentSize + 1, maxSize)) >= maxSize) {
+ throw new RuntimeException(
+ new IOException(format("FileSystem max cache size has been reached: %s", maxSize)));
+ }
+ return new FileSystemHolder(conf, privateCredentials);
+ }
+ else {
+ // Update file system instance when credentials change.
+ if (currentFileSystemHolder.credentialsChanged(uri, conf, privateCredentials)) {
+ return new FileSystemHolder(conf, privateCredentials);
+ }
+ else {
+ return currentFileSystemHolder;
+ }
+ }
+ });
+
+ // Now create the file system object outside the cache's bucket lock
+ fileSystemHolder.createFileSystemOnce(uri, conf);
}
-
- // Update file system instance when credentials change.
- // - Private credentials are only set when using Kerberos authentication.
- // When the user is the same, but the private credentials are different,
- // that means that Kerberos ticket has expired and re-login happened.
- // To prevent cache leak in such situation, the privateCredentials are not
- // a part of the FileSystemKey, but part of the FileSystemHolder. When a
- // Kerberos re-login occurs, re-create the file system and cache it using
- // the same key.
- // - Extra credentials are used to authenticate with certain file systems.
- if ((isHdfs(uri) && !fileSystemHolder.getPrivateCredentials().equals(privateCredentials)) ||
- extraCredentialsChanged(fileSystemHolder.getFileSystem(), conf)) {
- map.remove(key);
- try {
- FileSystem fileSystem = createFileSystem(uri, conf);
- fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
- map.put(key, fileSystemHolder);
- }
- catch (IOException e) {
- stats.newGetCallFailed();
- throw e;
- }
+ catch (RuntimeException | IOException e) {
+ stats.newGetCallFailed();
+ throwIfInstanceOf(e, IOException.class);
+ throwIfInstanceOf(e.getCause(), IOException.class);
+ throw e;
}
return fileSystemHolder.getFileSystem();
@@ -178,20 +170,54 @@ private static FileSystem createFileSystem(URI uri, Configuration conf)
}
@Override
- public synchronized void remove(FileSystem fileSystem)
+ public void remove(FileSystem fileSystem)
{
stats.newRemoveCall();
- map.values().removeIf(holder -> holder.getFileSystem().equals(fileSystem));
+ cache.forEach((key, fileSystemHolder) -> {
+ if (fileSystem.equals(fileSystemHolder.getFileSystem())) {
+ // After acquiring the bucket lock, decrement cacheSize only if
+ // (1) the key is still mapped to a FileSystemHolder, and
+ // (2) that holder still wraps the same file system object
+ cache.compute(key, (k, currentFileSystemHolder) -> {
+ if (currentFileSystemHolder != null
+ && fileSystem.equals(currentFileSystemHolder.getFileSystem())) {
+ cacheSize.decrementAndGet();
+ return null;
+ }
+ return currentFileSystemHolder;
+ });
+ }
+ });
}
@Override
- public synchronized void closeAll()
+ public void closeAll()
throws IOException
{
- for (FileSystemHolder fileSystemHolder : ImmutableList.copyOf(map.values())) {
- closeFileSystem(fileSystemHolder.getFileSystem());
+ try {
+ cache.forEach((key, fileSystemHolder) -> {
+ try {
+ cache.compute(key, (k, currentFileSystemHolder) -> {
+ // decrement cacheSize only if the key is still mapped
+ if (currentFileSystemHolder != null) {
+ cacheSize.decrementAndGet();
+ }
+ return null;
+ });
+ FileSystem fs = fileSystemHolder.getFileSystem();
+ if (fs != null) {
+ closeFileSystem(fs);
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ catch (RuntimeException e) {
+ throwIfInstanceOf(e.getCause(), IOException.class);
+ throw e;
}
- map.clear();
}
@SuppressModernizer
@@ -245,12 +271,6 @@ private static boolean isHdfs(URI uri)
return "hdfs".equals(scheme) || "viewfs".equals(scheme);
}
- private static boolean extraCredentialsChanged(FileSystem fileSystem, Configuration configuration)
- {
- return !configuration.get(CACHE_KEY, "").equals(
- fileSystem.getConf().get(CACHE_KEY, ""));
- }
-
private static class FileSystemKey
{
private final String scheme;
@@ -306,23 +326,45 @@ public String toString()
private static class FileSystemHolder
{
- private final FileSystem fileSystem;
private final Set<Object> privateCredentials;
+ private final String cacheCredentials;
+ private volatile FileSystem fileSystem;
- public FileSystemHolder(FileSystem fileSystem, Set<Object> privateCredentials)
+ public FileSystemHolder(Configuration conf, Set<Object> privateCredentials)
{
- this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
this.privateCredentials = ImmutableSet.copyOf(requireNonNull(privateCredentials, "privateCredentials is null"));
+ this.cacheCredentials = conf.get(CACHE_KEY, "");
}
- public FileSystem getFileSystem()
+ public void createFileSystemOnce(URI uri, Configuration conf)
+ throws IOException
{
- return fileSystem;
+ if (fileSystem == null) {
+ synchronized (this) {
+ if (fileSystem == null) {
+ fileSystem = TrinoFileSystemCache.createFileSystem(uri, conf);
+ }
+ }
+ }
}
- public Set<Object> getPrivateCredentials()
+ public boolean credentialsChanged(URI newUri, Configuration newConf, Set<Object> newPrivateCredentials)
{
- return privateCredentials;
+ // - Private credentials are only set when using Kerberos authentication.
+ // When the user is the same, but the private credentials are different,
+ // that means that Kerberos ticket has expired and re-login happened.
+ // To prevent cache leak in such situation, the privateCredentials are not
+ // a part of the FileSystemKey, but part of the FileSystemHolder. When a
+ // Kerberos re-login occurs, re-create the file system and cache it using
+ // the same key.
+ // - Extra credentials are used to authenticate with certain file systems.
+ return (isHdfs(newUri) && !this.privateCredentials.equals(newPrivateCredentials))
+ || !this.cacheCredentials.equals(newConf.get(CACHE_KEY, ""));
+ }
+
+ public FileSystem getFileSystem()
+ {
+ return fileSystem;
}
@Override
@@ -331,6 +373,7 @@ public String toString()
return toStringHelper(this)
.add("fileSystem", fileSystem)
.add("privateCredentials", privateCredentials)
+ .add("cacheCredentials", cacheCredentials)
.toString();
}
}
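The heart of this change is the pattern visible above in getInternal() and remove(): a ConcurrentHashMap whose entry count is capped through an AtomicLong that is only mutated while the relevant bucket lock is held, i.e. from inside compute(). The pattern is easier to study stripped of the Hadoop machinery, so here is a minimal self-contained sketch; the class and method names are illustrative, not part of this change:

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the bounded-ConcurrentHashMap pattern used by TrinoFileSystemCache.
final class BoundedConcurrentCache<K, V>
{
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    // Serialized view of the entry count; only mutated under a bucket lock, inside compute().
    private final AtomicLong cacheSize = new AtomicLong();
    private final int maxSize;

    BoundedConcurrentCache(int maxSize)
    {
        this.maxSize = maxSize;
    }

    public V get(K key, ValueFactory<V> factory)
            throws IOException
    {
        try {
            return cache.compute(key, (k, current) -> {
                if (current != null) {
                    return current;
                }
                // Reserve a slot before inserting. The saturating Math.min keeps a failed
                // reservation from pushing the counter past the limit, so the counter stays
                // accurate for later removals.
                if (cacheSize.getAndUpdate(size -> Math.min(size + 1, maxSize)) >= maxSize) {
                    throw new RuntimeException(new IOException("max cache size reached: " + maxSize));
                }
                return factory.create();
            });
        }
        catch (RuntimeException e) {
            // Unwrap the checked exception smuggled out of the compute() lambda
            if (e.getCause() instanceof IOException cause) {
                throw cause;
            }
            throw e;
        }
    }

    public void remove(K key)
    {
        // Decrement only while holding the bucket lock, and only if the key is still mapped,
        // so that two concurrent removals of the same key cannot decrement twice.
        cache.compute(key, (k, current) -> {
            if (current != null) {
                cacheSize.decrementAndGet();
            }
            return null;
        });
    }

    @FunctionalInterface
    interface ValueFactory<V>
    {
        V create();
    }
}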
diff --git a/lib/trino-hdfs/src/test/java/io/trino/hdfs/BenchmarkGetFileSystem.java b/lib/trino-hdfs/src/test/java/io/trino/hdfs/BenchmarkGetFileSystem.java
new file mode 100644
index 000000000000..457a23c70b6a
--- /dev/null
+++ b/lib/trino-hdfs/src/test/java/io/trino/hdfs/BenchmarkGetFileSystem.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.hdfs;
+
+import io.trino.jmh.Benchmarks;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SplittableRandom;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static io.airlift.concurrent.MoreFutures.getFutureValue;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 5, time = 5000, timeUnit = MILLISECONDS)
+@Fork(2)
+@Measurement(iterations = 5, time = 5000, timeUnit = MILLISECONDS)
+@BenchmarkMode(Mode.AverageTime)
+public class BenchmarkGetFileSystem
+{
+ @State(Scope.Thread)
+ public static class BenchmarkData
+ {
+ @Param({"10", "100", "1000"})
+ private int userCount;
+
+ @Param({"1", "16"})
+ private int threadCount;
+
+ @Param("1000")
+ private int getCallsPerInvocation;
+
+ public List<Callable<Void>> callableTasks;
+ public ExecutorService executor;
+ public Blackhole blackhole;
+
+ @Setup(Level.Invocation)
+ public void setUp(Blackhole blackhole)
+ {
+ this.blackhole = blackhole;
+
+ this.callableTasks = new ArrayList<>();
+ for (int i = 0; i < threadCount; i++) {
+ this.callableTasks.add(new TestFileSystemCache.CreateFileSystemsAndConsume(
+ new SplittableRandom(i), userCount, getCallsPerInvocation, fs -> {}));
+ }
+
+ this.executor = Executors.newFixedThreadPool(threadCount);
+ }
+
+ @TearDown(Level.Invocation)
+ public void tearDown()
+ throws IOException
+ {
+ TrinoFileSystemCache.INSTANCE.closeAll();
+ executor.shutdownNow();
+ }
+ }
+
+ @Benchmark
+ public void benchmark(BenchmarkData data)
+ throws InterruptedException, ExecutionException
+ {
+ data.executor.invokeAll(data.callableTasks).forEach(f -> data.blackhole.consume(getFutureValue(f)));
+ }
+
+ public static void main(String[] args)
+ throws Exception
+ {
+ Benchmarks.benchmark(BenchmarkGetFileSystem.class).run();
+ }
+}
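The benchmark drives exactly the code path that motivated moving FileSystem construction out of compute(): FileSystemHolder.createFileSystemOnce() builds the file system only after the bucket lock has been released, using the standard double-checked locking idiom on a volatile field, so a slow createFileSystem() call blocks only callers that need the same holder rather than every thread hashing to the same bucket. A distilled version of the idiom, assuming a generic Supplier-based factory (the names are illustrative):

import java.util.function.Supplier;

// Illustrative sketch of the double-checked lazy initialization used by createFileSystemOnce.
final class LazyValue<T>
{
    // volatile is essential: it guarantees the unsynchronized fast-path read
    // observes a fully constructed value.
    private volatile T value;

    public T get(Supplier<T> factory)
    {
        T result = value;
        if (result == null) {
            synchronized (this) {
                result = value;
                if (result == null) {
                    result = factory.get();
                    value = result;
                }
            }
        }
        return result;
    }
}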
diff --git a/lib/trino-hdfs/src/test/java/io/trino/hdfs/TestFSDataInputStreamTail.java b/lib/trino-hdfs/src/test/java/io/trino/hdfs/TestFSDataInputStreamTail.java
index 0e1f7a054daa..fa1fbd14b596 100644
--- a/lib/trino-hdfs/src/test/java/io/trino/hdfs/TestFSDataInputStreamTail.java
+++ b/lib/trino-hdfs/src/test/java/io/trino/hdfs/TestFSDataInputStreamTail.java
@@ -70,6 +70,7 @@ public void tearDown()
closeAll(
() -> fs.delete(new Path(tempRoot.toURI()), true),
fs);
+ fs = null;
}
@Test
diff --git a/lib/trino-hdfs/src/test/java/io/trino/hdfs/TestFileSystemCache.java b/lib/trino-hdfs/src/test/java/io/trino/hdfs/TestFileSystemCache.java
index e8c003da8666..93d39462e48c 100644
--- a/lib/trino-hdfs/src/test/java/io/trino/hdfs/TestFileSystemCache.java
+++ b/lib/trino-hdfs/src/test/java/io/trino/hdfs/TestFileSystemCache.java
@@ -14,22 +14,45 @@
package io.trino.hdfs;
import com.google.common.collect.ImmutableSet;
+import io.airlift.concurrent.MoreFutures;
import io.trino.hdfs.authentication.ImpersonatingHdfsAuthentication;
import io.trino.hdfs.authentication.SimpleHadoopAuthentication;
import io.trino.hdfs.authentication.SimpleUserNameProvider;
import io.trino.spi.security.ConnectorIdentity;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.gaul.modernizer_maven_annotations.SuppressModernizer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SplittableRandom;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static io.trino.hadoop.ConfigurationInstantiator.newEmptyConfiguration;
+import static java.util.Objects.requireNonNull;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertSame;
+@Test(singleThreaded = true)
public class TestFileSystemCache
{
+ @BeforeMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
+ public void cleanup()
+ throws IOException
+ {
+ FileSystem.closeAll();
+ }
+
@Test
public void testFileSystemCache()
throws IOException
@@ -56,9 +79,98 @@ public void testFileSystemCache()
assertNotSame(fs5, fs1);
}
- private FileSystem getFileSystem(HdfsEnvironment environment, ConnectorIdentity identity)
+ @Test
+ public void testFileSystemCacheException() throws IOException
+ {
+ HdfsEnvironment environment = new HdfsEnvironment(
+ new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(new HdfsConfig()), ImmutableSet.of()),
+ new HdfsConfig(),
+ new ImpersonatingHdfsAuthentication(new SimpleHadoopAuthentication(), new SimpleUserNameProvider()));
+
+ int maxCacheSize = 1000;
+ for (int i = 0; i < maxCacheSize; i++) {
+ assertEquals(TrinoFileSystemCache.INSTANCE.getFileSystemCacheStats().getCacheSize(), i);
+ getFileSystem(environment, ConnectorIdentity.ofUser("user" + i));
+ }
+ assertEquals(TrinoFileSystemCache.INSTANCE.getFileSystemCacheStats().getCacheSize(), maxCacheSize);
+ assertThatThrownBy(() -> getFileSystem(environment, ConnectorIdentity.ofUser("user" + maxCacheSize)))
+ .isInstanceOf(IOException.class)
+ .hasMessage("FileSystem max cache size has been reached: " + maxCacheSize);
+ }
+
+ @Test
+ public void testFileSystemCacheConcurrency() throws InterruptedException, ExecutionException, IOException
+ {
+ int numThreads = 20;
+ List<Callable<Void>> callableTasks = new ArrayList<>();
+ for (int i = 0; i < numThreads; i++) {
+ callableTasks.add(
+ new CreateFileSystemsAndConsume(
+ new SplittableRandom(i),
+ 10,
+ 1000,
+ new FileSystemCloser()));
+ }
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+
+ assertEquals(TrinoFileSystemCache.INSTANCE.getFileSystemCacheStats().getCacheSize(), 0);
+ executor.invokeAll(callableTasks).forEach(MoreFutures::getFutureValue);
+ executor.shutdown();
+ assertEquals(TrinoFileSystemCache.INSTANCE.getFileSystemCacheStats().getCacheSize(), 0, "Cache size is non-zero");
+ }
+
+ private static FileSystem getFileSystem(HdfsEnvironment environment, ConnectorIdentity identity)
throws IOException
{
return environment.getFileSystem(identity, new Path("/"), newEmptyConfiguration());
}
+
+ @FunctionalInterface
+ public interface FileSystemConsumer
+ {
+ void consume(FileSystem fileSystem) throws IOException;
+ }
+
+ private static class FileSystemCloser
+ implements FileSystemConsumer
+ {
+ @Override
+ @SuppressModernizer
+ public void consume(FileSystem fileSystem) throws IOException
+ {
+ fileSystem.close(); /* triggers fscache.remove() */
+ }
+ }
+
+ public static class CreateFileSystemsAndConsume
+ implements Callable<Void>
+ {
+ private final SplittableRandom random;
+ private final int userCount;
+ private final int getCallsPerInvocation;
+ private final FileSystemConsumer consumer;
+
+ private static final HdfsEnvironment environment = new HdfsEnvironment(
+ new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(new HdfsConfig()), ImmutableSet.of()),
+ new HdfsConfig(),
+ new ImpersonatingHdfsAuthentication(new SimpleHadoopAuthentication(), new SimpleUserNameProvider()));
+
+ CreateFileSystemsAndConsume(SplittableRandom random, int numUsers, int numGetCallsPerInvocation, FileSystemConsumer consumer)
+ {
+ this.random = requireNonNull(random, "random is null");
+ this.userCount = numUsers;
+ this.getCallsPerInvocation = numGetCallsPerInvocation;
+ this.consumer = consumer;
+ }
+
+ @Override
+ public Void call() throws IOException
+ {
+ for (int i = 0; i < getCallsPerInvocation; i++) {
+ FileSystem fs = getFileSystem(environment, ConnectorIdentity.ofUser("user" + random.nextInt(userCount)));
+ consumer.consume(fs);
+ }
+ return null;
+ }
+ }
}
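As a footnote to testFileSystemCacheException() above: the same cap-and-release behavior can be reproduced against the standalone BoundedConcurrentCache sketch shown earlier, with no Hadoop involved. A hypothetical demo:

import java.io.IOException;

public class BoundedCacheDemo
{
    public static void main(String[] args)
            throws IOException
    {
        BoundedConcurrentCache<String, Object> cache = new BoundedConcurrentCache<>(2);
        cache.get("user1", Object::new);
        cache.get("user2", Object::new);
        try {
            cache.get("user3", Object::new); // third distinct key exceeds the cap
        }
        catch (IOException e) {
            System.out.println(e.getMessage()); // prints: max cache size reached: 2
        }
        cache.remove("user1"); // releasing a slot lets a new key in
        cache.get("user3", Object::new);
    }
}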