Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.airlift.event.client.EventClient;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.hive.fs.CachingDirectoryLister;
import io.trino.plugin.hive.fs.TrinoFileSystemCache;
import io.trino.plugin.hive.fs.TrinoFileSystemCacheStats;
import io.trino.plugin.hive.metastore.HiveMetastoreConfig;
import io.trino.plugin.hive.metastore.thrift.TranslateHiveViews;
import io.trino.plugin.hive.orc.OrcFileWriterFactory;
Expand Down Expand Up @@ -101,6 +103,10 @@ public void configure(Binder binder)
binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName();

binder.bind(TrinoFileSystemCacheStats.class).toInstance(TrinoFileSystemCache.INSTANCE.getFileSystemCacheStats());
newExporter(binder).export(TrinoFileSystemCacheStats.class)
.as(generator -> generator.generatedNameOf(TrinoFileSystemCache.class));
Comment thread
posulliv marked this conversation as resolved.
Outdated

Multibinder<HivePageSourceFactory> pageSourceFactoryBinder = newSetBinder(binder, HivePageSourceFactory.class);
pageSourceFactoryBinder.addBinding().to(OrcPageSourceFactory.class).in(Scopes.SINGLETON);
pageSourceFactoryBinder.addBinding().to(ParquetPageSourceFactory.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.hive.fs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
Expand Down Expand Up @@ -64,25 +65,43 @@ public class TrinoFileSystemCache

private final AtomicLong unique = new AtomicLong();
Comment thread
findepi marked this conversation as resolved.
Outdated

private final TrinoFileSystemCacheStats stats;

@GuardedBy("this")
private final Map<FileSystemKey, FileSystemHolder> map = new HashMap<>();

private TrinoFileSystemCache() {}
// Package-visible so tests can create isolated cache instances instead of
// sharing the JVM-wide INSTANCE singleton.
@VisibleForTesting
TrinoFileSystemCache()
{
// The stats object reports the live cache size on demand; the supplier must
// synchronize on this cache because "map" is @GuardedBy("this").
this.stats = new TrinoFileSystemCacheStats(() -> {
synchronized (this) {
return map.size();
}
});
}

/**
 * Returns a cached (shared) file system for the given URI, counting the call
 * in the cache statistics. Unique id 0 means "use the shared cache entry".
 */
@Override
public FileSystem get(URI uri, Configuration conf)
throws IOException
{
stats.newGetCall();
return getInternal(uri, conf, 0);
}

/**
 * Returns a file system that is never shared with other callers: a fresh
 * unique id forces a distinct cache key per invocation. Counted separately
 * from {@code get} calls in the statistics.
 */
@Override
public FileSystem getUnique(URI uri, Configuration conf)
throws IOException
{
stats.newGetUniqueCall();
return getInternal(uri, conf, unique.incrementAndGet());
}

/**
 * Current number of cached file system instances, for test assertions.
 */
@VisibleForTesting
synchronized int getCacheSize()
{
// "map" is @GuardedBy("this"); without the lock a concurrent reader could see
// an inconsistent size. The JMX stats supplier in the constructor already
// synchronizes for the same reason — this makes the test accessor consistent.
return map.size();
}

private synchronized FileSystem getInternal(URI uri, Configuration conf, long unique)
throws IOException
{
Expand All @@ -94,11 +113,18 @@ private synchronized FileSystem getInternal(URI uri, Configuration conf, long un
if (fileSystemHolder == null) {
int maxSize = conf.getInt("fs.cache.max-size", 1000);
if (map.size() >= maxSize) {
stats.newGetCallFailed();
Comment thread
posulliv marked this conversation as resolved.
Outdated
throw new IOException(format("FileSystem max cache size has been reached: %s", maxSize));
}
FileSystem fileSystem = createFileSystem(uri, conf);
fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
map.put(key, fileSystemHolder);
try {
FileSystem fileSystem = createFileSystem(uri, conf);
fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
map.put(key, fileSystemHolder);
}
catch (IOException e) {
stats.newGetCallFailed();
throw e;
}
}

// Update file system instance when credentials change.
Expand All @@ -113,9 +139,15 @@ private synchronized FileSystem getInternal(URI uri, Configuration conf, long un
if ((isHdfs(uri) && !fileSystemHolder.getPrivateCredentials().equals(privateCredentials)) ||
extraCredentialsChanged(fileSystemHolder.getFileSystem(), conf)) {
map.remove(key);
Comment thread
posulliv marked this conversation as resolved.
Outdated
FileSystem fileSystem = createFileSystem(uri, conf);
fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
map.put(key, fileSystemHolder);
try {
FileSystem fileSystem = createFileSystem(uri, conf);
fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials);
map.put(key, fileSystemHolder);
Comment thread
posulliv marked this conversation as resolved.
Outdated
}
catch (IOException e) {
stats.newGetCallFailed();
throw e;
}
}

return fileSystemHolder.getFileSystem();
Expand Down Expand Up @@ -145,6 +177,7 @@ private static FileSystem createFileSystem(URI uri, Configuration conf)
/**
 * Evicts every cache entry holding the given file system instance.
 * Counted as a remove call even when no matching entry exists.
 */
@Override
public synchronized void remove(FileSystem fileSystem)
{
stats.newRemoveCall();
map.values().removeIf(holder -> holder.getFileSystem().equals(fileSystem));
}

Expand Down Expand Up @@ -381,4 +414,9 @@ public InputStream getWrappedStream()
return ((FSDataInputStream) super.getWrappedStream()).getWrappedStream();
}
}

// Exposes the statistics holder so the Guice module can bind and JMX-export it
// (and so tests can assert on counters).
public TrinoFileSystemCacheStats getFileSystemCacheStats()
{
return stats;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.fs;

import io.airlift.stats.CounterStat;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import java.util.function.LongSupplier;

import static java.util.Objects.requireNonNull;

public class TrinoFileSystemCacheStats
{
private final CounterStat getCalls = new CounterStat();
Comment thread
findepi marked this conversation as resolved.
Outdated
private final CounterStat getUniqueCalls = new CounterStat();
private final CounterStat getCallsFailed = new CounterStat();
private final CounterStat removeCalls = new CounterStat();
private final LongSupplier cacheSize;

TrinoFileSystemCacheStats(LongSupplier cacheSize)
{
this.cacheSize = requireNonNull(cacheSize, "cacheSize is null");
}

@Managed
public long getCacheSize()
{
return cacheSize.getAsLong();
}

@Managed
@Nested
public CounterStat getGetCalls()
Comment thread
posulliv marked this conversation as resolved.
Outdated
{
return getCalls;
}

@Managed
@Nested
public CounterStat getGetCallsFailed()
{
return getCallsFailed;
}

@Managed
@Nested
public CounterStat getGetUniqueCalls()
{
return getUniqueCalls;
}

@Managed
@Nested
public CounterStat getRemoveCalls()
{
return removeCalls;
}

public void newGetCall()
{
getCalls.update(1);
}

public void newGetUniqueCall()
{
getUniqueCalls.update(1);
}

public void newGetCallFailed()
{
getCallsFailed.update(1);
}

public void newRemoveCall()
{
removeCalls.update(1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.fs;

import io.trino.plugin.hive.s3.TrinoS3FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.testng.annotations.Test;

import java.net.URI;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;

public class TestTrinoFileSystemCacheStats
{
    @Test
    public void testCacheSizeIsCorrect()
            throws Exception
    {
        TrinoFileSystemCache cache = new TrinoFileSystemCache();
        TrinoFileSystemCacheStats stats = cache.getFileSystemCacheStats();
        assertEquals(stats.getCacheSize(), 0);
        assertEquals(cache.getCacheSize(), 0);

        Configuration config = new Configuration(true);
        config.set("fs.s3.impl", TrinoS3FileSystem.class.getName());
        cache.get(new URI("s3://bucket/path/"), config);
        assertEquals(stats.getGetCalls().getTotalCount(), 1);
        assertEquals(stats.getCacheSize(), 1);
        assertEquals(cache.getCacheSize(), 1);

        // a second path on the same bucket reuses the existing entry, so size stays at 1
        cache.get(new URI("s3://bucket/path1/"), config);
        assertEquals(stats.getGetCalls().getTotalCount(), 2);
        assertEquals(stats.getCacheSize(), 1);
        assertEquals(cache.getCacheSize(), 1);

        // getUnique always produces a distinct entry, so the cache grows
        FileSystem uniqueFileSystem = cache.getUnique(new URI("s3://bucket/path2/"), config);
        assertEquals(stats.getGetCalls().getTotalCount(), 2);
        assertEquals(stats.getGetUniqueCalls().getTotalCount(), 1);
        assertEquals(stats.getCacheSize(), 2);
        assertEquals(cache.getCacheSize(), 2);

        cache.remove(uniqueFileSystem);
        assertEquals(stats.getRemoveCalls().getTotalCount(), 1);
        assertEquals(stats.getCacheSize(), 1);
        assertEquals(cache.getCacheSize(), 1);

        cache.closeAll();
        assertEquals(stats.getCacheSize(), 0);
        assertEquals(cache.getCacheSize(), 0);
    }

    @Test
    public void testFailedCallsCountIsCorrect()
    {
        TrinoFileSystemCache cache = new TrinoFileSystemCache();
        TrinoFileSystemCacheStats stats = cache.getFileSystemCacheStats();
        Configuration config = new Configuration(false);
        config.setInt("fs.cache.max-size", 0);
        assertThatThrownBy(() -> cache.get(new URI("s3://bucket/path/"), config))
                .hasMessageMatching("FileSystem max cache size has been reached: 0");
        assertEquals(stats.getGetCallsFailed().getTotalCount(), 1);
        assertEquals(stats.getGetCalls().getTotalCount(), 1);
        assertEquals(stats.getCacheSize(), 0);
        assertEquals(cache.getCacheSize(), 0);
    }
}