diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java
index 697d295594c3..961225dc60a4 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java
@@ -22,6 +22,8 @@
 import io.airlift.event.client.EventClient;
 import io.trino.plugin.base.CatalogName;
 import io.trino.plugin.hive.fs.CachingDirectoryLister;
+import io.trino.plugin.hive.fs.TrinoFileSystemCache;
+import io.trino.plugin.hive.fs.TrinoFileSystemCacheStats;
 import io.trino.plugin.hive.metastore.HiveMetastoreConfig;
 import io.trino.plugin.hive.metastore.thrift.TranslateHiveViews;
 import io.trino.plugin.hive.orc.OrcFileWriterFactory;
@@ -101,6 +103,10 @@ public void configure(Binder binder)
         binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
         newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName();
 
+        binder.bind(TrinoFileSystemCacheStats.class).toInstance(TrinoFileSystemCache.INSTANCE.getFileSystemCacheStats());
+        newExporter(binder).export(TrinoFileSystemCacheStats.class)
+                .as(generator -> generator.generatedNameOf(TrinoFileSystemCache.class));
+
         Multibinder pageSourceFactoryBinder = newSetBinder(binder, HivePageSourceFactory.class);
         pageSourceFactoryBinder.addBinding().to(OrcPageSourceFactory.class).in(Scopes.SINGLETON);
         pageSourceFactoryBinder.addBinding().to(ParquetPageSourceFactory.class).in(Scopes.SINGLETON);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileSystemCache.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileSystemCache.java
index f36e1933e8c6..26a25fa2dd91 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileSystemCache.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileSystemCache.java
@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.hive.fs;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import io.airlift.log.Logger;
@@ -64,15 +65,26 @@ public class TrinoFileSystemCache
 
     private final AtomicLong unique = new AtomicLong();
 
+    private final TrinoFileSystemCacheStats stats;
+
     @GuardedBy("this")
     private final Map map = new HashMap<>();
 
-    private TrinoFileSystemCache() {}
+    @VisibleForTesting
+    TrinoFileSystemCache()
+    {
+        this.stats = new TrinoFileSystemCacheStats(() -> {
+            synchronized (this) {
+                return map.size();
+            }
+        });
+    }
 
     @Override
     public FileSystem get(URI uri, Configuration conf)
             throws IOException
     {
+        stats.newGetCall();
         return getInternal(uri, conf, 0);
     }
 
@@ -80,9 +92,17 @@ public FileSystem get(URI uri, Configuration conf)
     public FileSystem getUnique(URI uri, Configuration conf)
             throws IOException
     {
+        stats.newGetUniqueCall();
         return getInternal(uri, conf, unique.incrementAndGet());
     }
 
+    @VisibleForTesting
+    synchronized int getCacheSize()
+    {
+        // map is @GuardedBy("this"); take the same lock as getInternal/remove before reading it
+        return map.size();
+    }
+
     private synchronized FileSystem getInternal(URI uri, Configuration conf, long unique)
             throws IOException
     {
@@ -94,11 +114,18 @@ private synchronized FileSystem getInternal(URI uri, Configuration conf, long un
         if (fileSystemHolder == null) {
             int maxSize = conf.getInt("fs.cache.max-size", 1000);
             if (map.size() >= maxSize) {
+                stats.newGetCallFailed();
                 throw new IOException(format("FileSystem max cache size has been reached: %s", maxSize));
             }
-            FileSystem fileSystem = createFileSystem(uri, conf);
-            fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
-            map.put(key, fileSystemHolder);
+            try {
+                FileSystem fileSystem = createFileSystem(uri, conf);
+                fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
+                map.put(key, fileSystemHolder);
+            }
+            catch (IOException e) {
+                stats.newGetCallFailed();
+                throw e;
+            }
         }
 
         // Update file system instance when credentials change.
@@ -113,9 +140,15 @@ private synchronized FileSystem getInternal(URI uri, Configuration conf, long un
         if ((isHdfs(uri) && !fileSystemHolder.getPrivateCredentials().equals(privateCredentials)) ||
                 extraCredentialsChanged(fileSystemHolder.getFileSystem(), conf)) {
             map.remove(key);
-            FileSystem fileSystem = createFileSystem(uri, conf);
-            fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
-            map.put(key, fileSystemHolder);
+            try {
+                FileSystem fileSystem = createFileSystem(uri, conf);
+                fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
+                map.put(key, fileSystemHolder);
+            }
+            catch (IOException e) {
+                stats.newGetCallFailed();
+                throw e;
+            }
         }
 
         return fileSystemHolder.getFileSystem();
@@ -145,6 +178,7 @@ private static FileSystem createFileSystem(URI uri, Configuration conf)
     @Override
     public synchronized void remove(FileSystem fileSystem)
     {
+        stats.newRemoveCall();
         map.values().removeIf(holder -> holder.getFileSystem().equals(fileSystem));
     }
 
@@ -381,4 +415,9 @@ public InputStream getWrappedStream()
             return ((FSDataInputStream) super.getWrappedStream()).getWrappedStream();
         }
     }
+
+    public TrinoFileSystemCacheStats getFileSystemCacheStats()
+    {
+        return stats;
+    }
 }
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileSystemCacheStats.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileSystemCacheStats.java
new file mode 100644
index 000000000000..88db2588f092
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileSystemCacheStats.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.fs;
+
+import io.airlift.stats.CounterStat;
+import org.weakref.jmx.Managed;
+import org.weakref.jmx.Nested;
+
+import java.util.function.LongSupplier;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Counters and cache-size gauge for {@link TrinoFileSystemCache}, exposed through {@code @Managed} getters.
+ * <p>
+ * {@code getCallsFailed} is shared by {@code get} and {@code getUnique}: the cache bumps it when its
+ * size limit is reached or when creating a file system throws an {@code IOException}.
+ */
+public class TrinoFileSystemCacheStats
+{
+    private final CounterStat getCalls = new CounterStat();
+    private final CounterStat getUniqueCalls = new CounterStat();
+    private final CounterStat getCallsFailed = new CounterStat();
+    private final CounterStat removeCalls = new CounterStat();
+    private final LongSupplier cacheSize;
+
+    TrinoFileSystemCacheStats(LongSupplier cacheSize)
+    {
+        this.cacheSize = requireNonNull(cacheSize, "cacheSize is null");
+    }
+
+    @Managed
+    public long getCacheSize()
+    {
+        return cacheSize.getAsLong();
+    }
+
+    @Managed
+    @Nested
+    public CounterStat getGetCalls()
+    {
+        return getCalls;
+    }
+
+    @Managed
+    @Nested
+    public CounterStat getGetCallsFailed()
+    {
+        return getCallsFailed;
+    }
+
+    @Managed
+    @Nested
+    public CounterStat getGetUniqueCalls()
+    {
+        return getUniqueCalls;
+    }
+
+    @Managed
+    @Nested
+    public CounterStat getRemoveCalls()
+    {
+        return removeCalls;
+    }
+
+    public void newGetCall()
+    {
+        getCalls.update(1);
+    }
+
+    public void newGetUniqueCall()
+    {
+        getUniqueCalls.update(1);
+    }
+
+    public void newGetCallFailed()
+    {
+        getCallsFailed.update(1);
+    }
+
+    public void newRemoveCall()
+    {
+        removeCalls.update(1);
+    }
+}
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTrinoFileSystemCacheStats.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTrinoFileSystemCacheStats.java
new file mode 100644
index 000000000000..5f44f5f4b32e
--- /dev/null
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTrinoFileSystemCacheStats.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.fs;
+
+import io.trino.plugin.hive.s3.TrinoS3FileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.testng.Assert.assertEquals;
+
+public class TestTrinoFileSystemCacheStats
+{
+    @Test
+    public void testCacheSizeIsCorrect()
+            throws Exception
+    {
+        TrinoFileSystemCache trinoFileSystemCache = new TrinoFileSystemCache();
+        TrinoFileSystemCacheStats trinoFileSystemCacheStats = trinoFileSystemCache.getFileSystemCacheStats();
+        assertEquals(trinoFileSystemCacheStats.getCacheSize(), 0);
+        assertEquals(trinoFileSystemCache.getCacheSize(), 0);
+
+        Configuration configuration = new Configuration(true);
+        configuration.set("fs.s3.impl", TrinoS3FileSystem.class.getName());
+        trinoFileSystemCache.get(new URI("s3://bucket/path/"), configuration);
+        assertEquals(trinoFileSystemCacheStats.getGetCalls().getTotalCount(), 1);
+        assertEquals(trinoFileSystemCacheStats.getCacheSize(), 1);
+        assertEquals(trinoFileSystemCache.getCacheSize(), 1);
+
+        trinoFileSystemCache.get(new URI("s3://bucket/path1/"), configuration);
+        assertEquals(trinoFileSystemCacheStats.getGetCalls().getTotalCount(), 2);
+        assertEquals(trinoFileSystemCacheStats.getCacheSize(), 1);
+        assertEquals(trinoFileSystemCache.getCacheSize(), 1);
+
+        // use getUnique to ensure cache size is increased
+        FileSystem fileSystem = trinoFileSystemCache.getUnique(new URI("s3://bucket/path2/"), configuration);
+        assertEquals(trinoFileSystemCacheStats.getGetCalls().getTotalCount(), 2);
+        assertEquals(trinoFileSystemCacheStats.getGetUniqueCalls().getTotalCount(), 1);
+        assertEquals(trinoFileSystemCacheStats.getCacheSize(), 2);
+        assertEquals(trinoFileSystemCache.getCacheSize(), 2);
+
+        trinoFileSystemCache.remove(fileSystem);
+        assertEquals(trinoFileSystemCacheStats.getRemoveCalls().getTotalCount(), 1);
+        assertEquals(trinoFileSystemCacheStats.getCacheSize(), 1);
+        assertEquals(trinoFileSystemCache.getCacheSize(), 1);
+
+        trinoFileSystemCache.closeAll();
+        assertEquals(trinoFileSystemCacheStats.getCacheSize(), 0);
+        assertEquals(trinoFileSystemCache.getCacheSize(), 0);
+    }
+
+    @Test
+    public void testFailedCallsCountIsCorrect()
+    {
+        TrinoFileSystemCache trinoFileSystemCache = new TrinoFileSystemCache();
+        TrinoFileSystemCacheStats trinoFileSystemCacheStats = trinoFileSystemCache.getFileSystemCacheStats();
+        Configuration configuration = new Configuration(false);
+        configuration.setInt("fs.cache.max-size", 0);
+        assertThatThrownBy(() -> trinoFileSystemCache.get(new URI("s3://bucket/path/"), configuration))
+                .hasMessageMatching("FileSystem max cache size has been reached: 0");
+        assertEquals(trinoFileSystemCacheStats.getGetCallsFailed().getTotalCount(), 1);
+        assertEquals(trinoFileSystemCacheStats.getGetCalls().getTotalCount(), 1);
+        assertEquals(trinoFileSystemCacheStats.getCacheSize(), 0);
+        assertEquals(trinoFileSystemCache.getCacheSize(), 0);
+    }
+}