diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java index f89b955cd2d97..f2747035f70d0 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; @@ -111,6 +112,7 @@ public class BackgroundHiveSplitLoader private final ConcurrentLazyQueue partitions; private final Deque> fileIterators = new ConcurrentLinkedDeque<>(); private final boolean schedulerUsesHostAddresses; + private Optional inputPathFilter; // Purpose of this lock: // * Write lock: when you need a consistent view across partitions, fileIterators, and hiveSplitSource. 
@@ -144,7 +146,8 @@ public BackgroundHiveSplitLoader( Executor executor, int loaderConcurrency, boolean recursiveDirWalkerEnabled, - boolean schedulerUsesHostAddresses) + boolean schedulerUsesHostAddresses, + Optional inputPathFilterClass) { this.table = requireNonNull(table, "table is null"); this.pathDomain = requireNonNull(pathDomain, "pathDomain is null"); @@ -159,6 +162,21 @@ public BackgroundHiveSplitLoader( this.partitions = new ConcurrentLazyQueue<>(requireNonNull(partitions, "partitions is null")); this.hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName()); this.schedulerUsesHostAddresses = schedulerUsesHostAddresses; + this.inputPathFilter = inputPathFilterClass.map(BackgroundHiveSplitLoader::createPathFilter); + } + + private static PathFilter createPathFilter(String inputPathFilterClass) + { + try { + Object instance = Class.forName(inputPathFilterClass).getConstructor().newInstance(); + if (!(instance instanceof PathFilter)) { + throw new RuntimeException("Invalid path filter class: " + instance.getClass().getName()); + } + return (PathFilter) instance; + } + catch (ReflectiveOperationException e) { + throw new RuntimeException("Unable to create path filter: " + inputPathFilterClass, e); + } } @Override @@ -359,9 +377,11 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) bucketConversionRequiresWorkerParticipation ? bucketConversion : Optional.empty()), schedulerUsesHostAddresses); - // To support custom input formats, we want to call getSplits() - // on the input format to obtain file splits. - if (shouldUseFileSplitsFromInputFormat(inputFormat)) { + // To support custom input formats, we want to call getSplits() on the input format to obtain file splits. + // Alternatively custom InputFormats can choose to provide a PathFilter implementation for getting splits. 
+ // In the presence of PathFilter (set using 'hive.input-path-filter-class' config), shouldUseFileSplitsFromInputFormat + // will be overridden. + if (shouldUseFileSplitsFromInputFormat(inputFormat) && !inputPathFilter.isPresent()) { if (tableBucketInfo.isPresent()) { throw new PrestoException(NOT_SUPPORTED, "Presto cannot read bucketed partition in an input format with UseFileSplitsFromInputFormat annotation: " + inputFormat.getClass().getSimpleName()); } @@ -420,7 +440,7 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat inpu private Iterator createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable) { - return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED)) + return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, inputPathFilter)) .map(status -> splitFactory.createInternalHiveSplit(status, splittable)) .filter(Optional::isPresent) .map(Optional::get) @@ -445,7 +465,7 @@ private List getBucketedSplits( // list all files in the partition List fileInfos = new ArrayList<>(partitionBucketCount); try { - Iterators.addAll(fileInfos, directoryLister.list(fileSystem, path, namenodeStats, FAIL)); + Iterators.addAll(fileInfos, directoryLister.list(fileSystem, path, namenodeStats, FAIL, inputPathFilter)); } catch (NestedDirectoryNotAllowedException e) { // Fail here to be on the safe side. This seems to be the same as what Hive does @@ -513,7 +533,7 @@ private List getBucketedSplits( private List getVirtuallyBucketedSplits(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, int bucketCount, boolean splittable) { // List all files recursively in the partition and assign virtual bucket number to each of them - return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? 
RECURSE : IGNORED)) + return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, inputPathFilter)) .map(fileInfo -> { int virtualBucketNumber = getVirtualBucketNumber(bucketCount, fileInfo.getPath()); return splitFactory.createInternalHiveSplit(fileInfo, virtualBucketNumber, virtualBucketNumber, splittable); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/DirectoryLister.java b/presto-hive/src/main/java/com/facebook/presto/hive/DirectoryLister.java index 26124fe5578e1..5b5e8095ed904 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/DirectoryLister.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/DirectoryLister.java @@ -15,12 +15,16 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import java.util.Iterator; +import java.util.Optional; import static com.facebook.presto.hive.util.HiveFileIterator.NestedDirectoryPolicy; public interface DirectoryLister { Iterator list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy); + + Iterator list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy, Optional pathFilter); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HadoopDirectoryLister.java b/presto-hive/src/main/java/com/facebook/presto/hive/HadoopDirectoryLister.java index 011b5eb84235d..ed2edec4e7f74 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HadoopDirectoryLister.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HadoopDirectoryLister.java @@ -17,6 +17,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; import java.io.IOException; @@ -33,7 +34,13 @@ public class HadoopDirectoryLister 
@Override public Iterator list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy) { - return new HiveFileIterator(path, p -> new HadoopFileInfoIterator(fileSystem.listLocatedStatus(p)), namenodeStats, nestedDirectoryPolicy); + return new HiveFileIterator(path, p -> new HadoopFileInfoIterator(fileSystem.listLocatedStatus(p)), namenodeStats, nestedDirectoryPolicy, Optional.empty()); + } + + @Override + public Iterator list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy, Optional pathFilter) + { + return new HiveFileIterator(path, p -> new HadoopFileInfoIterator(fileSystem.listLocatedStatus(p)), namenodeStats, nestedDirectoryPolicy, pathFilter); } public static class HadoopFileInfoIterator diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java index 1d6068789d514..fe5fdf4db952c 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java @@ -166,6 +166,7 @@ public class HiveClientConfig private boolean pushdownFilterEnabled; private boolean nestedColumnsFilterEnabled; + private String inputPathFilterClass; public int getMaxInitialSplits() { @@ -1394,4 +1395,17 @@ public HiveClientConfig setNestedColumnsFilterEnabled(boolean nestedColumnsFilte this.nestedColumnsFilterEnabled = nestedColumnsFilterEnabled; return this; } + + public String getInputPathFilterClass() + { + return inputPathFilterClass; + } + + @Config("hive.input-path-filter-class") + @ConfigDescription("Enable PathFilter on files listed by DirectoryLister instead of getting splits from custom InputFormats") + public HiveClientConfig setInputPathFilterClass(String inputPathFilterClass) + { + this.inputPathFilterClass = inputPathFilterClass; + return this; + } } diff --git 
a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java index df18d4aa695a5..d4d0b70f0cab2 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java @@ -95,6 +95,7 @@ public class HiveSplitManager private final int splitLoaderConcurrency; private final boolean recursiveDfsWalkerEnabled; private final CounterStat highMemorySplitSourceCounter; + private final Optional inputPathFilterClass; @Inject public HiveSplitManager( @@ -120,7 +121,8 @@ public HiveSplitManager( hiveClientConfig.getMaxPartitionBatchSize(), hiveClientConfig.getMaxInitialSplits(), hiveClientConfig.getSplitLoaderConcurrency(), - hiveClientConfig.getRecursiveDirWalkerEnabled()); + hiveClientConfig.getRecursiveDirWalkerEnabled(), + Optional.ofNullable(hiveClientConfig.getInputPathFilterClass())); } public HiveSplitManager( @@ -137,7 +139,8 @@ public HiveSplitManager( int maxPartitionBatchSize, int maxInitialSplits, int splitLoaderConcurrency, - boolean recursiveDfsWalkerEnabled) + boolean recursiveDfsWalkerEnabled, + Optional inputPathFilterClass) { this.hiveTransactionManager = requireNonNull(hiveTransactionManager, "hiveTransactionManager is null"); this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null"); @@ -154,6 +157,7 @@ public HiveSplitManager( this.maxInitialSplits = maxInitialSplits; this.splitLoaderConcurrency = splitLoaderConcurrency; this.recursiveDfsWalkerEnabled = recursiveDfsWalkerEnabled; + this.inputPathFilterClass = inputPathFilterClass; } @Override @@ -230,7 +234,8 @@ public ConnectorSplitSource getSplits( executor, splitLoaderConcurrency, recursiveDfsWalkerEnabled, - splitSchedulingContext.schedulerUsesHostAddresses()); + splitSchedulingContext.schedulerUsesHostAddresses(), + inputPathFilterClass); HiveSplitSource splitSource; switch 
(splitSchedulingContext.getSplitSchedulingStrategy()) { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java b/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java index 0bce6aa7d5e65..458a7668ab8e4 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java @@ -19,6 +19,7 @@ import com.facebook.presto.spi.PrestoException; import com.google.common.collect.AbstractIterator; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; import java.io.FileNotFoundException; @@ -27,6 +28,7 @@ import java.util.Collections; import java.util.Deque; import java.util.Iterator; +import java.util.Optional; import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND; @@ -46,6 +48,7 @@ public enum NestedDirectoryPolicy private final ListDirectoryOperation listDirectoryOperation; private final NamenodeStats namenodeStats; private final NestedDirectoryPolicy nestedDirectoryPolicy; + private final Optional pathFilter; private Iterator remoteIterator = Collections.emptyIterator(); @@ -53,12 +56,14 @@ public HiveFileIterator( Path path, ListDirectoryOperation listDirectoryOperation, NamenodeStats namenodeStats, - NestedDirectoryPolicy nestedDirectoryPolicy) + NestedDirectoryPolicy nestedDirectoryPolicy, + Optional pathFilter) { paths.addLast(requireNonNull(path, "path is null")); this.listDirectoryOperation = requireNonNull(listDirectoryOperation, "listDirectoryOperation is null"); this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null"); this.nestedDirectoryPolicy = requireNonNull(nestedDirectoryPolicy, "nestedDirectoryPolicy is null"); + this.pathFilter = requireNonNull(pathFilter, "path filter is null"); } @Override @@ -92,14 +97,14 @@ 
protected HiveFileInfo computeNext() if (paths.isEmpty()) { return endOfData(); } - remoteIterator = getLocatedFileStatusRemoteIterator(paths.removeFirst()); + remoteIterator = getLocatedFileStatusRemoteIterator(paths.removeFirst(), pathFilter); } } - private Iterator getLocatedFileStatusRemoteIterator(Path path) + private Iterator getLocatedFileStatusRemoteIterator(Path path, Optional pathFilter) { try (TimeStat.BlockTimer ignored = namenodeStats.getListLocatedStatus().time()) { - return new FileStatusIterator(path, listDirectoryOperation, namenodeStats); + return new FileStatusIterator(path, listDirectoryOperation, namenodeStats, pathFilter); } } @@ -116,11 +121,14 @@ private static class FileStatusIterator private final Path path; private final NamenodeStats namenodeStats; private final RemoteIterator fileStatusIterator; + private final Optional pathFilter; + private HiveFileInfo next; - private FileStatusIterator(Path path, ListDirectoryOperation listDirectoryOperation, NamenodeStats namenodeStats) + private FileStatusIterator(Path path, ListDirectoryOperation listDirectoryOperation, NamenodeStats namenodeStats, Optional pathFilter) { this.path = path; this.namenodeStats = namenodeStats; + this.pathFilter = pathFilter; try { this.fileStatusIterator = listDirectoryOperation.list(path); } @@ -129,26 +137,39 @@ private FileStatusIterator(Path path, ListDirectoryOperation listDirectoryOperat } } - @Override - public boolean hasNext() + private void computeNext() { + if (next != null) { + return; + } try { - return fileStatusIterator.hasNext(); + while (fileStatusIterator.hasNext()) { + HiveFileInfo hiveFileInfo = fileStatusIterator.next(); + if (!pathFilter.isPresent() || pathFilter.get().accept(hiveFileInfo.getPath())) { + next = hiveFileInfo; + return; + } + } } catch (IOException e) { throw processException(e); } } + @Override + public boolean hasNext() + { + computeNext(); + return (next != null); + } + @Override public HiveFileInfo next() { - try { - return 
fileStatusIterator.next(); - } - catch (IOException e) { - throw processException(e); - } + computeNext(); + HiveFileInfo result = next; + next = null; + return result; } private PrestoException processException(IOException exception) diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java index 89975bf507459..00992be1630c4 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java @@ -856,7 +856,8 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi hiveClientConfig.getMaxPartitionBatchSize(), hiveClientConfig.getMaxInitialSplits(), hiveClientConfig.getSplitLoaderConcurrency(), - false); + false, + Optional.ofNullable(hiveClientConfig.getInputPathFilterClass())); pageSinkProvider = new HivePageSinkProvider( getDefaultHiveFileWriterFactories(hiveClientConfig), hdfsEnvironment, diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java index d5f7186ba2d42..227b1081c7609 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java @@ -211,7 +211,8 @@ protected void setup(String host, int port, String databaseName, Function createPartitionMetadataWithOfflinePartitions() @@ -397,6 +400,12 @@ public Iterator list(FileSystem fs, Path path, NamenodeStats namen { return files.iterator(); } + + @Override + public Iterator list(FileSystem fs, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy, Optional pathFilter) + { + return files.iterator(); + } } private static class TestingHdfsEnvironment diff --git 
a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java index d1805b430e656..9f3bdfa6334f5 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java @@ -133,7 +133,8 @@ public void testDefaults() .setTemporaryTableStorageFormat(ORC) .setTemporaryTableCompressionCodec(SNAPPY) .setPushdownFilterEnabled(false) - .setNestedColumnsFilterEnabled(false)); + .setNestedColumnsFilterEnabled(false) + .setInputPathFilterClass(null)); } @Test @@ -231,6 +232,7 @@ public void testExplicitPropertyMappings() .put("hive.temporary-table-compression-codec", "NONE") .put("hive.pushdown-filter-enabled", "true") .put("hive.nested-columns-filter-enabled", "true") + .put("hive.input-path-filter-class", "TestPathFilterClass") .build(); HiveClientConfig expected = new HiveClientConfig() @@ -325,7 +327,8 @@ public void testExplicitPropertyMappings() .setTemporaryTableStorageFormat(DWRF) .setTemporaryTableCompressionCodec(NONE) .setPushdownFilterEnabled(true) - .setNestedColumnsFilterEnabled(true); + .setNestedColumnsFilterEnabled(true) + .setInputPathFilterClass("TestPathFilterClass"); ConfigAssertions.assertFullMapping(properties, expected); } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/util/TestHiveFileIterator.java b/presto-hive/src/test/java/com/facebook/presto/hive/util/TestHiveFileIterator.java new file mode 100644 index 0000000000000..94aa8ccce073b --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/util/TestHiveFileIterator.java @@ -0,0 +1,230 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive.util;

import com.facebook.presto.hive.NamenodeStats;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;

import static com.facebook.presto.hive.HadoopDirectoryLister.HadoopFileInfoIterator;
import static com.facebook.presto.hive.util.HiveFileIterator.NestedDirectoryPolicy.IGNORED;
import static com.facebook.presto.hive.util.HiveFileIterator.NestedDirectoryPolicy.RECURSE;
import static com.google.common.io.Files.createTempDir;
import static org.testng.Assert.assertEquals;

/**
 * Tests {@link HiveFileIterator}'s file listing with and without an input
 * {@link PathFilter}, under both the IGNORED and RECURSE nested-directory
 * policies. Expected counts are accumulated while the fixture tree is built,
 * so the assertions stay in sync with the generated layout.
 */
public class TestHiveFileIterator
{
    // Files whose names contain this marker are the ones the test PathFilter accepts.
    private static final String PATH_FILTER_MARKER = "path_filter_test_file_";
    private static final String RANDOM_FILE_NAME_SALT = "abcdefghijklmnopqrstuvwxyz";

    private final Random random = new Random();

    private NamenodeStats namenodeStats;
    private PathFilter pathFilter;
    private Path rootPath;
    private File rootDir;
    private HiveFileIterator.ListDirectoryOperation listDirectoryOperation;

    // Expected visible-file counts per (nested-directory policy, filter on/off) combination.
    private int fileCountForIgnoredPolicyPathFilterOn;
    private int fileCountForRecursePolicyPathFilterOn;
    private int fileCountForIgnoredPolicyPathFilterOff;
    private int fileCountForRecursePolicyPathFilterOff;

    // Public (not private) so TestNG reliably invokes the configuration method.
    @BeforeClass
    public void setup()
            throws IOException
    {
        namenodeStats = new NamenodeStats();
        pathFilter = path -> path.getName().contains(PATH_FILTER_MARKER);
        rootDir = createTempDir();
        String basePath = rootDir.getAbsolutePath();
        rootPath = new Path("file://" + basePath + File.separator);

        // Files matching the filter are visible in every combination.
        createFiles(basePath, 5, false, PATH_FILTER_MARKER);
        updateExpectedStats(IGNORED, true, 5);
        updateExpectedStats(IGNORED, false, 5);
        updateExpectedStats(RECURSE, true, 5);
        updateExpectedStats(RECURSE, false, 5);

        // Randomly named files fail the filter: counted only when the filter is off.
        createFiles(basePath, 5, true, null);
        updateExpectedStats(IGNORED, false, 5);
        updateExpectedStats(RECURSE, false, 5);

        // Sub-directories: their contents are visible only under the RECURSE policy.
        List<File> dirs = createDirs(basePath, 2);
        for (int i = 0; i < dirs.size(); i++) {
            if (i % 2 == 0) {
                // non-matching files directly in the dir, matching files one level deeper
                createFiles(dirs.get(i).getAbsolutePath(), 5, true, null);
                updateExpectedStats(RECURSE, false, 5);
                List<File> subDirs = createDirs(dirs.get(i).getAbsolutePath(), 1);
                createFiles(subDirs.get(0).getAbsolutePath(), 5, false, PATH_FILTER_MARKER);
                updateExpectedStats(RECURSE, true, 5);
                updateExpectedStats(RECURSE, false, 5);
            }
            else {
                // matching files directly in the dir, non-matching files one level deeper
                createFiles(dirs.get(i).getAbsolutePath(), 5, false, PATH_FILTER_MARKER);
                updateExpectedStats(RECURSE, true, 5);
                updateExpectedStats(RECURSE, false, 5);
                List<File> subDirs = createDirs(dirs.get(i).getAbsolutePath(), 1);
                createFiles(subDirs.get(0).getAbsolutePath(), 5, true, null);
                updateExpectedStats(RECURSE, false, 5);
            }
        }

        // Captured by the listing lambda; local because nothing else needs it.
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

        listDirectoryOperation = path -> {
            FileSystem fileSystem = path.getFileSystem(hadoopConf);
            return new HadoopFileInfoIterator(fileSystem.listLocatedStatus(path));
        };
    }

    @AfterClass(alwaysRun = true)
    public void cleanup()
    {
        if (rootDir != null) {
            deleteRecursively(rootDir);
        }
    }

    // Best-effort recursive delete of the temp fixture tree.
    private static void deleteRecursively(File file)
    {
        if (!file.exists()) {
            return;
        }
        // listFiles() returns null on I/O error; guard to avoid an NPE during cleanup
        File[] children = file.listFiles();
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        // ignore the result: failing to remove a temp file must not fail the suite
        file.delete();
    }

    /**
     * Creates {@code numFiles} empty files under {@code basePath}. Names either
     * start with {@code fileNamePrefix} (and therefore match the test PathFilter)
     * or are random (and therefore fail it).
     */
    private void createFiles(String basePath, int numFiles, boolean randomName, String fileNamePrefix)
            throws IOException
    {
        File baseDir = new File(basePath);
        // hoisted out of the loop: the directory only needs to be created once
        if (!baseDir.exists() && !baseDir.mkdirs()) {
            throw new IOException("failed to create directory: " + basePath);
        }
        for (int i = 0; i < numFiles; i++) {
            String fileName = randomName ? randomString(10) : fileNamePrefix + "_" + i;
            File file = new File(baseDir, fileName);
            if (!file.createNewFile()) {
                throw new IOException("failed to create file: " + file);
            }
        }
    }

    /**
     * Creates {@code numDirectories} directories under {@code basePath}. The names
     * intentionally contain the filter marker so they pass the filter if it is ever
     * applied to directory entries; directory traversal itself is governed by the
     * nested-directory policy.
     */
    private List<File> createDirs(String basePath, int numDirectories)
    {
        List<File> directories = new ArrayList<>(numDirectories);
        for (int i = 0; i < numDirectories; i++) {
            File directory = new File(basePath + File.separator + PATH_FILTER_MARKER + "_" + randomString(5));
            directory.mkdirs();
            directories.add(directory);
        }
        return directories;
    }

    private String randomString(int length)
    {
        StringBuilder builder = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            builder.append(RANDOM_FILE_NAME_SALT.charAt(random.nextInt(RANDOM_FILE_NAME_SALT.length())));
        }
        return builder.toString();
    }

    // Records how many newly created files should be visible for a given
    // (policy, filter) combination.
    private void updateExpectedStats(HiveFileIterator.NestedDirectoryPolicy policy, boolean hasPathFilter, int expectedCount)
    {
        switch (policy) {
            case IGNORED:
                if (hasPathFilter) {
                    fileCountForIgnoredPolicyPathFilterOn += expectedCount;
                }
                else {
                    fileCountForIgnoredPolicyPathFilterOff += expectedCount;
                }
                break;
            case RECURSE:
                if (hasPathFilter) {
                    fileCountForRecursePolicyPathFilterOn += expectedCount;
                }
                else {
                    fileCountForRecursePolicyPathFilterOff += expectedCount;
                }
                break;
        }
    }

    // Drains the iterator and returns the number of files it produced.
    public int getFileCount(HiveFileIterator hiveFileIterator)
    {
        int actualCount = 0;
        while (hiveFileIterator.hasNext()) {
            hiveFileIterator.next();
            actualCount++;
        }
        return actualCount;
    }

    @Test
    public void testNoPathFilterNoRecursion()
    {
        HiveFileIterator iterator = new HiveFileIterator(rootPath, listDirectoryOperation, namenodeStats, IGNORED, Optional.empty());
        assertEquals(getFileCount(iterator), fileCountForIgnoredPolicyPathFilterOff);
    }

    @Test
    public void testNoPathFilterWithRecursion()
    {
        HiveFileIterator iterator = new HiveFileIterator(rootPath, listDirectoryOperation, namenodeStats, RECURSE, Optional.empty());
        assertEquals(getFileCount(iterator), fileCountForRecursePolicyPathFilterOff);
    }

    @Test
    public void testPathFilterWithNoRecursion()
    {
        HiveFileIterator iterator = new HiveFileIterator(rootPath, listDirectoryOperation, namenodeStats, IGNORED, Optional.of(pathFilter));
        assertEquals(getFileCount(iterator), fileCountForIgnoredPolicyPathFilterOn);
    }

    @Test
    public void testPathFilterWithRecursion()
    {
        HiveFileIterator iterator = new HiveFileIterator(rootPath, listDirectoryOperation, namenodeStats, RECURSE, Optional.of(pathFilter));
        assertEquals(getFileCount(iterator), fileCountForRecursePolicyPathFilterOn);
    }
}