Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/trino-hdfs/src/main/java/io/trino/hdfs/HdfsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ public HdfsConfig setWireEncryptionEnabled(boolean wireEncryptionEnabled)
return this;
}

@Min(0)
public int getFileSystemMaxCacheSize()
{
return fileSystemMaxCacheSize;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.hdfs;

import org.apache.hadoop.conf.Configuration;

import javax.inject.Inject;

import java.net.URI;

/**
 * Disables Hadoop's JVM-wide {@code FileSystem} instance cache when the
 * configured maximum cache size is zero, by setting the per-scheme
 * {@code fs.<scheme>.impl.disable.cache} flag on each configuration it decorates.
 */
public class HdfsFileSystemCacheConfigurationProvider
        implements DynamicConfigurationProvider
{
    // Derived once at construction: a max cache size of 0 means "no caching".
    private final boolean disableCache;

    @Inject
    public HdfsFileSystemCacheConfigurationProvider(HdfsConfig hdfsConfig)
    {
        this.disableCache = (hdfsConfig.getFileSystemMaxCacheSize() == 0);
    }

    @Override
    public void updateConfiguration(Configuration configuration, HdfsContext context, URI uri)
    {
        if (!disableCache) {
            return;
        }
        // Hadoop honors this per-scheme property by bypassing FileSystem.CACHE.
        String property = "fs." + uri.getScheme() + ".impl.disable.cache";
        configuration.setBoolean(property, true);
    }
}
3 changes: 2 additions & 1 deletion lib/trino-hdfs/src/main/java/io/trino/hdfs/HdfsModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public void configure(Binder binder)

binder.bind(HdfsConfigurationInitializer.class).in(Scopes.SINGLETON);
newSetBinder(binder, ConfigurationInitializer.class);
newSetBinder(binder, DynamicConfigurationProvider.class);
newSetBinder(binder, DynamicConfigurationProvider.class)
.addBinding().to(HdfsFileSystemCacheConfigurationProvider.class).in(Scopes.SINGLETON);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.trino.hadoop.ConfigurationInstantiator.newEmptyConfiguration;
import static io.trino.plugin.base.security.UserNameProvider.SIMPLE_USER_NAME_PROVIDER;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -58,7 +57,7 @@ public void testFileSystemCache()
throws IOException
{
HdfsEnvironment environment = new HdfsEnvironment(
new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(new HdfsConfig()), ImmutableSet.of()),
new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(new HdfsConfig()), ImmutableSet.of(new HdfsFileSystemCacheConfigurationProvider(new HdfsConfig()))),
new HdfsConfig(),
new ImpersonatingHdfsAuthentication(new SimpleHadoopAuthentication(), SIMPLE_USER_NAME_PROVIDER));
ConnectorIdentity userId = ConnectorIdentity.ofUser("user");
Expand All @@ -83,7 +82,7 @@ public void testFileSystemCache()
public void testFileSystemCacheException() throws IOException
{
HdfsEnvironment environment = new HdfsEnvironment(
new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(new HdfsConfig()), ImmutableSet.of()),
new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(new HdfsConfig()), ImmutableSet.of(new HdfsFileSystemCacheConfigurationProvider(new HdfsConfig()))),
new HdfsConfig(),
new ImpersonatingHdfsAuthentication(new SimpleHadoopAuthentication(), SIMPLE_USER_NAME_PROVIDER));

Expand All @@ -98,6 +97,22 @@ public void testFileSystemCacheException() throws IOException
.hasMessage("FileSystem max cache size has been reached: " + maxCacheSize);
}

@Test
public void testDisablingFileSystemCache()
        throws IOException
{
    // A max cache size of 0 turns caching off entirely, so two lookups for
    // the same identity and path must yield distinct FileSystem instances.
    HdfsConfig config = new HdfsConfig().setFileSystemMaxCacheSize(0);
    HdfsEnvironment environment = new HdfsEnvironment(
            new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(config), ImmutableSet.of(new HdfsFileSystemCacheConfigurationProvider(config))),
            config,
            new ImpersonatingHdfsAuthentication(new SimpleHadoopAuthentication(), SIMPLE_USER_NAME_PROVIDER));

    ConnectorIdentity identity = ConnectorIdentity.ofUser("user");
    FileSystem first = getFileSystem(environment, identity);
    FileSystem second = getFileSystem(environment, identity);
    assertNotSame(first, second);
}

@Test
public void testFileSystemCacheConcurrency() throws InterruptedException, ExecutionException, IOException
{
Expand All @@ -122,7 +137,7 @@ public void testFileSystemCacheConcurrency() throws InterruptedException, Execut
private static FileSystem getFileSystem(HdfsEnvironment environment, ConnectorIdentity identity)
throws IOException
{
return environment.getFileSystem(identity, new Path("/"), newEmptyConfiguration());
return environment.getFileSystem(new HdfsContext(identity), new Path("file:///"));
}

@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.hdfs.HdfsConfig;
import io.trino.hdfs.HdfsConfiguration;
import io.trino.hdfs.HdfsConfigurationInitializer;
import io.trino.hdfs.HdfsFileSystemCacheConfigurationProvider;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.hdfs.authentication.NoHdfsAuthentication;
Expand Down Expand Up @@ -172,7 +173,7 @@ public class FileHiveMetastore
public static FileHiveMetastore createTestingFileHiveMetastore(File catalogDirectory)
{
HdfsConfig hdfsConfig = new HdfsConfig();
HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of());
HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of(new HdfsFileSystemCacheConfigurationProvider(hdfsConfig)));
HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication());
return new FileHiveMetastore(
new NodeVersion("testversion"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import io.trino.hdfs.HdfsConfig;
import io.trino.hdfs.HdfsConfiguration;
import io.trino.hdfs.HdfsConfigurationInitializer;
import io.trino.hdfs.HdfsFileSystemCacheConfigurationProvider;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.hdfs.authentication.NoHdfsAuthentication;
Expand Down Expand Up @@ -220,7 +221,7 @@ public GlueHiveMetastore(
public static GlueHiveMetastore createTestingGlueHiveMetastore(String defaultWarehouseDir)
{
HdfsConfig hdfsConfig = new HdfsConfig();
HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of());
HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of(new HdfsFileSystemCacheConfigurationProvider(hdfsConfig)));
HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication());
GlueMetastoreStats stats = new GlueMetastoreStats();
GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig()
Expand Down