diff --git a/presto-hive/pom.xml b/presto-hive/pom.xml index 1663cbca21ee9..2a47c5ab32f22 100644 --- a/presto-hive/pom.xml +++ b/presto-hive/pom.xml @@ -75,6 +75,26 @@ + + org.apache.hudi + hudi-hadoop-mr + 0.5.1-incubating + + + org.objenesis + objenesis + + + commons-logging + commons-logging + + + org.slf4j + jcl-over-slf4j + + + + com.facebook.presto presto-memory-context diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java index 330fc4526b837..eec67ca811f68 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java @@ -28,6 +28,8 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.predicate.Domain; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.io.CharStreams; @@ -36,6 +38,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; @@ -43,6 +46,8 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hudi.hadoop.HoodieParquetInputFormat; +import org.apache.hudi.hadoop.HoodieROTablePathFilter; import java.io.BufferedReader; import java.io.IOException; @@ -111,6 +116,7 @@ public class BackgroundHiveSplitLoader private final ConcurrentLazyQueue partitions; private final Deque> fileIterators = new ConcurrentLinkedDeque<>(); private final boolean schedulerUsesHostAddresses; + private final Supplier hoodiePathFilterSupplier; // Purpose of this lock: // * Write lock: when you need a consistent view across partitions, fileIterators, and hiveSplitSource. @@ -159,6 +165,7 @@ public BackgroundHiveSplitLoader( this.partitions = new ConcurrentLazyQueue<>(requireNonNull(partitions, "partitions is null")); this.hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName()); this.schedulerUsesHostAddresses = schedulerUsesHostAddresses; + this.hoodiePathFilterSupplier = Suppliers.memoize(HoodieROTablePathFilter::new); } @Override @@ -280,14 +287,14 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) { String partitionName = partition.getHivePartition().getPartitionId(); Storage storage = partition.getPartition().map(Partition::getStorage).orElse(table.getStorage()); + String inputFormatName = storage.getStorageFormat().getInputFormat(); int partitionDataColumnCount = partition.getPartition() .map(p -> p.getColumns().size()) .orElse(table.getDataColumns().size()); List partitionKeys = getPartitionKeys(table, partition.getPartition()); - Path path = new Path(getPartitionLocation(table, partition.getPartition())); Configuration configuration = hdfsEnvironment.getConfiguration(hdfsContext, path); - InputFormat inputFormat = getInputFormat(configuration, storage.getStorageFormat().getInputFormat(), false); + InputFormat inputFormat = getInputFormat(configuration, inputFormatName, false); FileSystem fs = hdfsEnvironment.getFileSystem(hdfsContext, path); boolean s3SelectPushdownEnabled = shouldEnablePushdownForTable(session, table, path.toString(), partition.getPartition()); @@ -359,9 +366,7 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) bucketConversionRequiresWorkerParticipation ? bucketConversion : Optional.empty()), schedulerUsesHostAddresses); - // To support custom input formats, we want to call getSplits() - // on the input format to obtain file splits. - if (shouldUseFileSplitsFromInputFormat(inputFormat)) { + if (!isHudiInputFormat(inputFormat) && shouldUseFileSplitsFromInputFormat(inputFormat)) { if (tableBucketInfo.isPresent()) { throw new PrestoException(NOT_SUPPORTED, "Presto cannot read bucketed partition in an input format with UseFileSplitsFromInputFormat annotation: " + inputFormat.getClass().getSimpleName()); } @@ -371,7 +376,7 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) return addSplitsToSource(splits, splitFactory); } - + PathFilter pathFilter = isHudiInputFormat(inputFormat) ? hoodiePathFilterSupplier.get() : path1 -> true; // S3 Select pushdown works at the granularity of individual S3 objects, // therefore we must not split files when it is enabled. Properties schema = getHiveSchema(storage.getSerdeParameters(), table.getParameters()); @@ -385,12 +390,12 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) checkState( tableBucketInfo.get().getTableBucketCount() == tableBucketInfo.get().getReadBucketCount(), "Table and read bucket count should be the same for virtual bucket"); - return hiveSplitSource.addToQueue(getVirtuallyBucketedSplits(path, fs, splitFactory, tableBucketInfo.get().getReadBucketCount(), splittable)); + return hiveSplitSource.addToQueue(getVirtuallyBucketedSplits(path, fs, splitFactory, tableBucketInfo.get().getReadBucketCount(), splittable, pathFilter)); } - return hiveSplitSource.addToQueue(getBucketedSplits(path, fs, splitFactory, tableBucketInfo.get(), bucketConversion, partitionName, splittable)); + return hiveSplitSource.addToQueue(getBucketedSplits(path, fs, splitFactory, tableBucketInfo.get(), bucketConversion, partitionName, splittable, pathFilter)); } - fileIterators.addLast(createInternalHiveSplitIterator(path, fs, splitFactory, splittable)); + fileIterators.addLast(createInternalHiveSplitIterator(path, fs, splitFactory, splittable, pathFilter)); return COMPLETED_FUTURE; } @@ -410,6 +415,11 @@ private ListenableFuture addSplitsToSource(InputSplit[] targetSplits, Interna return lastResult; } + private static boolean isHudiInputFormat(InputFormat inputFormat) + { + return inputFormat instanceof HoodieParquetInputFormat; + } + private static boolean shouldUseFileSplitsFromInputFormat(InputFormat inputFormat) { return Arrays.stream(inputFormat.getClass().getAnnotations()) @@ -418,9 +428,9 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat inpu .anyMatch(name -> name.equals("UseFileSplitsFromInputFormat")); } - private Iterator createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable) + private Iterator createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable, PathFilter pathFilter) { - return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED)) + return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, pathFilter)) .map(status -> splitFactory.createInternalHiveSplit(status, splittable)) .filter(Optional::isPresent) .map(Optional::get) @@ -434,7 +444,8 @@ private List getBucketedSplits( BucketSplitInfo bucketSplitInfo, Optional bucketConversion, String partitionName, - boolean splittable) + boolean splittable, + PathFilter pathFilter) { int readBucketCount = bucketSplitInfo.getReadBucketCount(); int tableBucketCount = bucketSplitInfo.getTableBucketCount(); @@ -445,7 +456,7 @@ private List getBucketedSplits( // list all files in the partition List fileInfos = new ArrayList<>(partitionBucketCount); try { - Iterators.addAll(fileInfos, directoryLister.list(fileSystem, path, namenodeStats, FAIL)); + Iterators.addAll(fileInfos, directoryLister.list(fileSystem, path, namenodeStats, FAIL, pathFilter)); } catch (NestedDirectoryNotAllowedException e) { // Fail here to be on the safe side. This seems to be the same as what Hive does @@ -510,10 +521,10 @@ private List getBucketedSplits( return splitList; } - private List getVirtuallyBucketedSplits(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, int bucketCount, boolean splittable) + private List getVirtuallyBucketedSplits(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, int bucketCount, boolean splittable, PathFilter pathFilter) { // List all files recursively in the partition and assign virtual bucket number to each of them - return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED)) + return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, pathFilter)) .map(fileInfo -> { int virtualBucketNumber = getVirtualBucketNumber(bucketCount, fileInfo.getPath()); return splitFactory.createInternalHiveSplit(fileInfo, virtualBucketNumber, virtualBucketNumber, splittable); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/DirectoryLister.java b/presto-hive/src/main/java/com/facebook/presto/hive/DirectoryLister.java index 26124fe5578e1..542b57c75119a 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/DirectoryLister.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/DirectoryLister.java @@ -15,6 +15,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import java.util.Iterator; @@ -22,5 +23,5 @@ public interface DirectoryLister { - Iterator list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy); + Iterator list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy, PathFilter pathFilter); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HadoopDirectoryLister.java b/presto-hive/src/main/java/com/facebook/presto/hive/HadoopDirectoryLister.java index 011b5eb84235d..d076198371e62 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HadoopDirectoryLister.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HadoopDirectoryLister.java @@ -17,6 +17,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; import java.io.IOException; @@ -31,9 +32,9 @@ public class HadoopDirectoryLister implements DirectoryLister { @Override - public Iterator list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy) + public Iterator list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy, PathFilter pathFilter) { - return new HiveFileIterator(path, p -> new HadoopFileInfoIterator(fileSystem.listLocatedStatus(p)), namenodeStats, nestedDirectoryPolicy); + return new HiveFileIterator(path, p -> new HadoopFileInfoIterator(fileSystem.listLocatedStatus(p)), namenodeStats, nestedDirectoryPolicy, pathFilter); } public static class HadoopFileInfoIterator diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java b/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java index f80308ef6904f..30ab75d47c452 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java @@ -18,7 +18,9 @@ import com.facebook.presto.hive.NamenodeStats; import com.facebook.presto.spi.PrestoException; import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Iterators; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; import java.io.FileNotFoundException; @@ -46,6 +48,7 @@ public enum NestedDirectoryPolicy private final ListDirectoryOperation listDirectoryOperation; private final NamenodeStats namenodeStats; private final NestedDirectoryPolicy nestedDirectoryPolicy; + private final PathFilter pathFilter; private Iterator remoteIterator = Collections.emptyIterator(); @@ -53,12 +56,14 @@ public HiveFileIterator( Path path, ListDirectoryOperation listDirectoryOperation, NamenodeStats namenodeStats, - NestedDirectoryPolicy nestedDirectoryPolicy) + NestedDirectoryPolicy nestedDirectoryPolicy, + PathFilter pathFilter) { paths.addLast(requireNonNull(path, "path is null")); this.listDirectoryOperation = requireNonNull(listDirectoryOperation, "listDirectoryOperation is null"); this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null"); this.nestedDirectoryPolicy = requireNonNull(nestedDirectoryPolicy, "nestedDirectoryPolicy is null"); + this.pathFilter = requireNonNull(pathFilter, "pathFilter is null"); } @Override @@ -92,14 +97,14 @@ protected HiveFileInfo computeNext() if (paths.isEmpty()) { return endOfData(); } - remoteIterator = getLocatedFileStatusRemoteIterator(paths.removeFirst()); + remoteIterator = getLocatedFileStatusRemoteIterator(paths.removeFirst(), pathFilter); } } - private Iterator getLocatedFileStatusRemoteIterator(Path path) + private Iterator getLocatedFileStatusRemoteIterator(Path path, PathFilter pathFilter) { try (TimeStat.BlockTimer ignored = namenodeStats.getListLocatedStatus().time()) { - return new FileStatusIterator(path, listDirectoryOperation, namenodeStats); + return Iterators.filter(new FileStatusIterator(path, listDirectoryOperation, namenodeStats), input -> pathFilter.accept(input.getPath())); } } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestBackgroundHiveSplitLoader.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestBackgroundHiveSplitLoader.java index d97763c558f38..0027ba9ea9270 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestBackgroundHiveSplitLoader.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestBackgroundHiveSplitLoader.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Progressable; import org.testng.annotations.Test; @@ -393,7 +394,7 @@ public TestingDirectoryLister(List files) } @Override - public Iterator list(FileSystem fs, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy) + public Iterator list(FileSystem fs, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy, PathFilter pathFilter) { return files.iterator(); } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/util/TestHiveFileIterator.java b/presto-hive/src/test/java/com/facebook/presto/hive/util/TestHiveFileIterator.java new file mode 100644 index 0000000000000..8252153dc8435 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/util/TestHiveFileIterator.java @@ -0,0 +1,202 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive.util; + +import com.facebook.presto.hive.HadoopDirectoryLister.HadoopFileInfoIterator; +import com.facebook.presto.hive.NamenodeStats; +import com.facebook.presto.hive.util.HiveFileIterator.ListDirectoryOperation; +import com.google.common.collect.Iterators; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static com.facebook.presto.hive.util.HiveFileIterator.NestedDirectoryPolicy.IGNORED; +import static com.facebook.presto.hive.util.HiveFileIterator.NestedDirectoryPolicy.RECURSE; +import static com.google.common.io.Files.createTempDir; +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static org.testng.Assert.assertEquals; + +public class TestHiveFileIterator +{ + private static final String PATH_FILTER_MATCHED_PREFIX = "path_filter_test_file_"; + private static final String PATH_FILTER_NOT_MATCHED_PREFIX = "path_filter_not_matched_"; + + private Configuration hadoopConf; + private ListDirectoryOperation listDirectoryOperation; + + @BeforeClass + private void setup() + { + hadoopConf = new Configuration(); + hadoopConf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); + listDirectoryOperation = path -> + { + FileSystem fs = path.getFileSystem(hadoopConf); + return new HadoopFileInfoIterator(fs.listLocatedStatus(path)); + }; + } + + @AfterClass(alwaysRun = true) + private void tearDown() + { + hadoopConf = null; + listDirectoryOperation = null; + } + + @Test + public void testDefaultPathFilterNoRecursion() throws IOException + { + // set up + File rootDir = createTempDir(); + String basePath = rootDir.getAbsolutePath(); + // create 8 files in root directory - 3 pathFilter matched and 5 non matched files. + createFiles(basePath, 3, true); + createFiles(basePath, 5, false); + Path rootPath = new Path("file://" + basePath + File.separator); + PathFilter pathFilter = path -> true; + HiveFileIterator hiveFileIterator = new HiveFileIterator(rootPath, listDirectoryOperation, new NamenodeStats(), IGNORED, pathFilter); + + int actualCount = Iterators.size(hiveFileIterator); + assertEquals(actualCount, 8); + + // cleanup + deleteTestDir(rootDir); + } + + @Test + public void testDefaultPathFilterWithRecursion() throws IOException + { + // set up + File rootDir = createTempDir(); + String basePath = rootDir.getAbsolutePath(); + // create 8 files in root directory - 3 pathFilter matched and 5 non matched files. + createFiles(basePath, 3, true); + createFiles(basePath, 5, false); + // create two directories + List subDirs = createDirs(basePath, 2); + // create 5 files in dir1 - 3 pathFilter matched and 2 non matched files. + String dir1 = subDirs.get(0).getAbsolutePath(); + createFiles(dir1, 3, true); + createFiles(dir1, 2, false); + // create 7 files in dir2 - 3 pathFilter matched and 4 non matched files. + String dir2 = subDirs.get(1).getAbsolutePath(); + createFiles(dir2, 3, true); + createFiles(dir2, 4, false); + Path rootPath = new Path("file://" + basePath + File.separator); + PathFilter pathFilter = path -> true; + HiveFileIterator hiveFileIterator = new HiveFileIterator(rootPath, listDirectoryOperation, new NamenodeStats(), RECURSE, pathFilter); + + int actualCount = Iterators.size(hiveFileIterator); + assertEquals(actualCount, 20); + + // cleanup + deleteTestDir(rootDir); + } + + @Test + public void testPathFilterWithNoRecursion() throws IOException + { + // set up + File rootDir = createTempDir(); + String basePath = rootDir.getAbsolutePath(); + // create 8 files in root directory - 3 pathFilter matched and 5 non matched files. + createFiles(basePath, 3, true); + createFiles(basePath, 5, false); + Path rootPath = new Path("file://" + basePath + File.separator); + PathFilter pathFilter = path -> path.getName().contains(PATH_FILTER_MATCHED_PREFIX); + HiveFileIterator hiveFileIterator = new HiveFileIterator(rootPath, listDirectoryOperation, new NamenodeStats(), IGNORED, pathFilter); + + int actualCount = Iterators.size(hiveFileIterator); + assertEquals(actualCount, 3); + + // cleanup + deleteTestDir(rootDir); + } + + @Test + public void testPathFilterWithRecursion() throws IOException + { + // set up + File rootDir = createTempDir(); + String basePath = rootDir.getAbsolutePath(); + // create 8 files in root directory - 3 pathFilter matched and 5 non matched files. + createFiles(basePath, 3, true); + createFiles(basePath, 5, false); + // create two directories + List subDirs = createDirs(basePath, 2); + // create 5 files in dir1 - 3 pathFilter matched and 2 non matched files. + String dir1 = subDirs.get(0).getAbsolutePath(); + createFiles(dir1, 3, true); + createFiles(dir1, 2, false); + // create 7 files in dir2 - 3 pathFilter matched and 4 non matched files. + String dir2 = subDirs.get(1).getAbsolutePath(); + createFiles(dir2, 3, true); + createFiles(dir2, 4, false); + Path rootPath = new Path("file://" + basePath + File.separator); + PathFilter pathFilter = path -> path.getName().contains(PATH_FILTER_MATCHED_PREFIX); + HiveFileIterator hiveFileIterator = new HiveFileIterator(rootPath, listDirectoryOperation, new NamenodeStats(), RECURSE, pathFilter); + + int actualCount = Iterators.size(hiveFileIterator); + assertEquals(actualCount, 9); + + // cleanup + deleteTestDir(rootDir); + } + + private void deleteTestDir(File rootDir) + throws IOException + { + if (rootDir.exists()) { + deleteRecursively(rootDir.toPath(), ALLOW_INSECURE); + } + } + + private void createFiles(String basePath, int numFiles, boolean matchPathFilter) throws IOException + { + new File(basePath).mkdirs(); + for (int i = 0; i < numFiles; i++) { + String fileName; + if (matchPathFilter) { + fileName = PATH_FILTER_MATCHED_PREFIX + i; + } + else { + fileName = PATH_FILTER_NOT_MATCHED_PREFIX + i; + } + new File(basePath + File.separator + fileName).createNewFile(); + } + } + + private List createDirs(String basePath, int numDirectories) + { + List directories = new ArrayList<>(); + for (int i = 0; i < numDirectories; i++) { + String dirName = basePath + File.separator + PATH_FILTER_MATCHED_PREFIX + "dir_" + i; + File file = new File(dirName); + file.mkdirs(); + directories.add(file); + } + return directories; + } +}