diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/ForMetastoreHdfsEnvironment.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/ForMetastoreHdfsEnvironment.java new file mode 100644 index 0000000000000..54be368395ee5 --- /dev/null +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/ForMetastoreHdfsEnvironment.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import javax.inject.Qualifier; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@Qualifier +public @interface ForMetastoreHdfsEnvironment +{ +} diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HdfsEnvironment.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HdfsEnvironment.java index 9ca2aaf9546dc..18890b228a384 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HdfsEnvironment.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HdfsEnvironment.java @@ -44,7 +44,7 @@ public class HdfsEnvironment @Inject public HdfsEnvironment( - HdfsConfiguration hdfsConfiguration, + @ForMetastoreHdfsEnvironment HdfsConfiguration hdfsConfiguration, MetastoreClientConfig config, HdfsAuthentication hdfsAuthentication) { diff --git a/presto-hive/pom.xml b/presto-hive/pom.xml index 1ad88f9a04f8f..46560a52d85bb 100644 --- a/presto-hive/pom.xml +++ b/presto-hive/pom.xml @@ -42,6 +42,11 @@ presto-expressions + + com.facebook.presto + presto-cache + + org.apache.parquet parquet-column diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveCachingHdfsConfiguration.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveCachingHdfsConfiguration.java new file mode 100644 index 0000000000000..f92391efebe89 --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveCachingHdfsConfiguration.java @@ -0,0 +1,117 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.presto.cache.CacheConfig; +import com.facebook.presto.cache.CacheManager; +import com.facebook.presto.cache.CacheStats; +import com.facebook.presto.cache.CachingFileSystem; +import com.facebook.presto.cache.ForCachingFileSystem; +import com.facebook.presto.cache.LocalRangeCacheManager; +import com.facebook.presto.cache.NoOpCacheManager; +import com.facebook.presto.hadoop.FileSystemFactory; +import com.facebook.presto.hive.HdfsEnvironment.HdfsContext; +import com.facebook.presto.spi.PrestoException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; + +import javax.inject.Inject; + +import java.io.IOException; +import java.net.URI; +import java.util.Map; +import java.util.function.BiFunction; + +import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; +import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newScheduledThreadPool; + +public class HiveCachingHdfsConfiguration + implements HdfsConfiguration +{ + private final HdfsConfiguration hiveHdfsConfiguration; + private final CacheManager cacheManager; + private final boolean cacheValidationEnabled; + + @Inject + public HiveCachingHdfsConfiguration( + @ForCachingFileSystem HdfsConfiguration hdfsConfiguration, + CacheConfig cacheConfig, + CacheStats cacheStats) + { + this.hiveHdfsConfiguration = requireNonNull(hdfsConfiguration, "hiveHdfsConfiguration is null"); + + CacheConfig config = requireNonNull(cacheConfig, "cacheConfig is null"); + this.cacheManager = config.getBaseDirectory() == null ? + new NoOpCacheManager() : + new LocalRangeCacheManager( + cacheConfig, + cacheStats, + newScheduledThreadPool(5, daemonThreadsNamed("hive-cache-flusher-%s")), + newScheduledThreadPool(1, daemonThreadsNamed("hive-cache-remover-%s"))); + this.cacheValidationEnabled = cacheConfig.isValidationEnabled(); + } + + @Override + public Configuration getConfiguration(HdfsContext context, URI uri) + { + @SuppressWarnings("resource") + Configuration config = new CachingJobConf((factoryConfig, factoryUri) -> { + try { + return new CachingFileSystem( + factoryUri, + factoryConfig, + cacheManager, + (new Path(uri)).getFileSystem(hiveHdfsConfiguration.getConfiguration(context, uri)), + cacheValidationEnabled); + } + catch (IOException e) { + throw new PrestoException(GENERIC_INTERNAL_ERROR, "cannot create caching file system", e); + } + }); + Configuration defaultConfig = hiveHdfsConfiguration.getConfiguration(context, uri); + + copy(defaultConfig, config); + return config; + } + + private static void copy(Configuration from, Configuration to) + { + for (Map.Entry entry : from) { + to.set(entry.getKey(), entry.getValue()); + } + } + + private static class CachingJobConf + extends JobConf + implements FileSystemFactory + { + private final BiFunction factory; + + private CachingJobConf(BiFunction factory) + { + super(false); + this.factory = requireNonNull(factory, "factory is null"); + } + + @Override + public FileSystem createFileSystem(URI uri) + { + return factory.apply(this, uri); + } + } +}