Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
Expand Down Expand Up @@ -111,6 +112,7 @@ public class BackgroundHiveSplitLoader
private final ConcurrentLazyQueue<HivePartitionMetadata> partitions;
private final Deque<Iterator<InternalHiveSplit>> fileIterators = new ConcurrentLinkedDeque<>();
private final boolean schedulerUsesHostAddresses;
private Optional<PathFilter> inputPathFilter;

// Purpose of this lock:
// * Write lock: when you need a consistent view across partitions, fileIterators, and hiveSplitSource.
Expand Down Expand Up @@ -144,7 +146,8 @@ public BackgroundHiveSplitLoader(
Executor executor,
int loaderConcurrency,
boolean recursiveDirWalkerEnabled,
boolean schedulerUsesHostAddresses)
boolean schedulerUsesHostAddresses,
Optional<String> inputPathFilterClass)
{
this.table = requireNonNull(table, "table is null");
this.pathDomain = requireNonNull(pathDomain, "pathDomain is null");
Expand All @@ -159,6 +162,21 @@ public BackgroundHiveSplitLoader(
this.partitions = new ConcurrentLazyQueue<>(requireNonNull(partitions, "partitions is null"));
this.hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName());
this.schedulerUsesHostAddresses = schedulerUsesHostAddresses;
this.inputPathFilter = inputPathFilterClass.map(BackgroundHiveSplitLoader::createPathFilter);
}

private static PathFilter createPathFilter(String inputPathFilterClass)
{
try {
Object instance = Class.forName(inputPathFilterClass).getConstructor().newInstance();
if (!(instance instanceof PathFilter)) {
throw new RuntimeException("Invalid path filter class: " + instance.getClass().getName());
}
return (PathFilter) instance;
}
catch (ReflectiveOperationException e) {
throw new RuntimeException("Unable to create path filter: " + inputPathFilterClass, e);
}
}

@Override
Expand Down Expand Up @@ -359,9 +377,11 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
bucketConversionRequiresWorkerParticipation ? bucketConversion : Optional.empty()),
schedulerUsesHostAddresses);

// To support custom input formats, we want to call getSplits()
// on the input format to obtain file splits.
if (shouldUseFileSplitsFromInputFormat(inputFormat)) {
// To support custom input formats, we want to call getSplits() on the input format to obtain file splits.
// Alternatively custom InputFormats can choose to provide a PathFilter implementation for getting splits.
// In the presence of PathFilter (set using 'hive.input-path-filter-class' config), shouldUseFileSplitsFromInputFormat
// will be overridden.
if (shouldUseFileSplitsFromInputFormat(inputFormat) && !inputPathFilter.isPresent()) {
Comment thread
bhasudha marked this conversation as resolved.
Outdated
if (tableBucketInfo.isPresent()) {
throw new PrestoException(NOT_SUPPORTED, "Presto cannot read bucketed partition in an input format with UseFileSplitsFromInputFormat annotation: " + inputFormat.getClass().getSimpleName());
}
Expand Down Expand Up @@ -420,7 +440,7 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inpu

private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable)
{
return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED))
return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, inputPathFilter))
.map(status -> splitFactory.createInternalHiveSplit(status, splittable))
.filter(Optional::isPresent)
.map(Optional::get)
Expand All @@ -445,7 +465,7 @@ private List<InternalHiveSplit> getBucketedSplits(
// list all files in the partition
List<HiveFileInfo> fileInfos = new ArrayList<>(partitionBucketCount);
try {
Iterators.addAll(fileInfos, directoryLister.list(fileSystem, path, namenodeStats, FAIL));
Iterators.addAll(fileInfos, directoryLister.list(fileSystem, path, namenodeStats, FAIL, inputPathFilter));
}
catch (NestedDirectoryNotAllowedException e) {
// Fail here to be on the safe side. This seems to be the same as what Hive does
Expand Down Expand Up @@ -513,7 +533,7 @@ private List<InternalHiveSplit> getBucketedSplits(
private List<InternalHiveSplit> getVirtuallyBucketedSplits(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, int bucketCount, boolean splittable)
{
// List all files recursively in the partition and assign virtual bucket number to each of them
return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED))
return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, inputPathFilter))
.map(fileInfo -> {
int virtualBucketNumber = getVirtualBucketNumber(bucketCount, fileInfo.getPath());
return splitFactory.createInternalHiveSplit(fileInfo, virtualBucketNumber, virtualBucketNumber, splittable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

import java.util.Iterator;
import java.util.Optional;

import static com.facebook.presto.hive.util.HiveFileIterator.NestedDirectoryPolicy;

public interface DirectoryLister
{
Iterator<HiveFileInfo> list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy);

Iterator<HiveFileInfo> list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy, Optional<PathFilter> pathFilter);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave only a single method in the interface (remove the old one)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.IOException;
Expand All @@ -33,7 +34,13 @@ public class HadoopDirectoryLister
@Override
public Iterator<HiveFileInfo> list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy)
{
return new HiveFileIterator(path, p -> new HadoopFileInfoIterator(fileSystem.listLocatedStatus(p)), namenodeStats, nestedDirectoryPolicy);
return new HiveFileIterator(path, p -> new HadoopFileInfoIterator(fileSystem.listLocatedStatus(p)), namenodeStats, nestedDirectoryPolicy, Optional.empty());
}

// Filter-aware listing: wraps the filesystem listing in a HiveFileIterator that applies
// the optional PathFilter to each listed file.
@Override
public Iterator<HiveFileInfo> list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy, Optional<PathFilter> pathFilter)
{
return new HiveFileIterator(path, p -> new HadoopFileInfoIterator(fileSystem.listLocatedStatus(p)), namenodeStats, nestedDirectoryPolicy, pathFilter);
}

public static class HadoopFileInfoIterator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public class HiveClientConfig

private boolean pushdownFilterEnabled;
private boolean nestedColumnsFilterEnabled;
private String inputPathFilterClass;

public int getMaxInitialSplits()
{
Expand Down Expand Up @@ -1394,4 +1395,17 @@ public HiveClientConfig setNestedColumnsFilterEnabled(boolean nestedColumnsFilte
this.nestedColumnsFilterEnabled = nestedColumnsFilterEnabled;
return this;
}

// Fully qualified name of the PathFilter implementation configured via
// 'hive.input-path-filter-class'; null (the field's default) means no filter is applied.
public String getInputPathFilterClass()
{
return inputPathFilterClass;
}

@Config("hive.input-path-filter-class")
Comment thread
bhasudha marked this conversation as resolved.
Outdated
@ConfigDescription("Enable PathFilter on files listed by DirectoryLister instead of getting splits from custom InputFormats")
public HiveClientConfig setInputPathFilterClass(String inputPathFilterClass)
{
this.inputPathFilterClass = inputPathFilterClass;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class HiveSplitManager
private final int splitLoaderConcurrency;
private final boolean recursiveDfsWalkerEnabled;
private final CounterStat highMemorySplitSourceCounter;
private final Optional<String> inputPathFilterClass;

@Inject
public HiveSplitManager(
Expand All @@ -120,7 +121,8 @@ public HiveSplitManager(
hiveClientConfig.getMaxPartitionBatchSize(),
hiveClientConfig.getMaxInitialSplits(),
hiveClientConfig.getSplitLoaderConcurrency(),
hiveClientConfig.getRecursiveDirWalkerEnabled());
hiveClientConfig.getRecursiveDirWalkerEnabled(),
Optional.ofNullable(hiveClientConfig.getInputPathFilterClass()));
}

public HiveSplitManager(
Expand All @@ -137,7 +139,8 @@ public HiveSplitManager(
int maxPartitionBatchSize,
int maxInitialSplits,
int splitLoaderConcurrency,
boolean recursiveDfsWalkerEnabled)
boolean recursiveDfsWalkerEnabled,
Optional<String> inputPathFilterClass)
{
this.hiveTransactionManager = requireNonNull(hiveTransactionManager, "hiveTransactionManager is null");
this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
Expand All @@ -154,6 +157,7 @@ public HiveSplitManager(
this.maxInitialSplits = maxInitialSplits;
this.splitLoaderConcurrency = splitLoaderConcurrency;
this.recursiveDfsWalkerEnabled = recursiveDfsWalkerEnabled;
this.inputPathFilterClass = inputPathFilterClass;
}

@Override
Expand Down Expand Up @@ -230,7 +234,8 @@ public ConnectorSplitSource getSplits(
executor,
splitLoaderConcurrency,
recursiveDfsWalkerEnabled,
splitSchedulingContext.schedulerUsesHostAddresses());
splitSchedulingContext.schedulerUsesHostAddresses(),
inputPathFilterClass);

HiveSplitSource splitSource;
switch (splitSchedulingContext.getSplitSchedulingStrategy()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.AbstractIterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.FileNotFoundException;
Expand All @@ -27,6 +28,7 @@
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;

import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND;
Expand All @@ -46,19 +48,22 @@ public enum NestedDirectoryPolicy
private final ListDirectoryOperation listDirectoryOperation;
private final NamenodeStats namenodeStats;
private final NestedDirectoryPolicy nestedDirectoryPolicy;
private final Optional<PathFilter> pathFilter;

private Iterator<HiveFileInfo> remoteIterator = Collections.emptyIterator();

public HiveFileIterator(
Path path,
ListDirectoryOperation listDirectoryOperation,
NamenodeStats namenodeStats,
NestedDirectoryPolicy nestedDirectoryPolicy)
NestedDirectoryPolicy nestedDirectoryPolicy,
Optional<PathFilter> pathFilter)
{
paths.addLast(requireNonNull(path, "path is null"));
this.listDirectoryOperation = requireNonNull(listDirectoryOperation, "listDirectoryOperation is null");
this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
this.nestedDirectoryPolicy = requireNonNull(nestedDirectoryPolicy, "nestedDirectoryPolicy is null");
this.pathFilter = requireNonNull(pathFilter, "path filter is null");
Comment thread
bhasudha marked this conversation as resolved.
}

@Override
Expand Down Expand Up @@ -92,14 +97,14 @@ protected HiveFileInfo computeNext()
if (paths.isEmpty()) {
return endOfData();
}
remoteIterator = getLocatedFileStatusRemoteIterator(paths.removeFirst());
remoteIterator = getLocatedFileStatusRemoteIterator(paths.removeFirst(), pathFilter);
}
}

private Iterator<HiveFileInfo> getLocatedFileStatusRemoteIterator(Path path)
private Iterator<HiveFileInfo> getLocatedFileStatusRemoteIterator(Path path, Optional<PathFilter> pathFilter)
{
try (TimeStat.BlockTimer ignored = namenodeStats.getListLocatedStatus().time()) {
return new FileStatusIterator(path, listDirectoryOperation, namenodeStats);
return new FileStatusIterator(path, listDirectoryOperation, namenodeStats, pathFilter);
}
}

Expand All @@ -116,11 +121,14 @@ private static class FileStatusIterator
private final Path path;
private final NamenodeStats namenodeStats;
private final RemoteIterator<HiveFileInfo> fileStatusIterator;
private final Optional<PathFilter> pathFilter;
private HiveFileInfo next;

private FileStatusIterator(Path path, ListDirectoryOperation listDirectoryOperation, NamenodeStats namenodeStats)
private FileStatusIterator(Path path, ListDirectoryOperation listDirectoryOperation, NamenodeStats namenodeStats, Optional<PathFilter> pathFilter)
{
this.path = path;
this.namenodeStats = namenodeStats;
this.pathFilter = pathFilter;
try {
this.fileStatusIterator = listDirectoryOperation.list(path);
}
Expand All @@ -129,26 +137,39 @@ private FileStatusIterator(Path path, ListDirectoryOperation listDirectoryOperat
}
}

@Override
public boolean hasNext()
private void computeNext()
{
if (next != null) {
return;
}
try {
return fileStatusIterator.hasNext();
while (fileStatusIterator.hasNext()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about wrapping the FileStatusIterator in Iterators#filter from guava? So we don't have to implement filtering logic here?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm. I tried doing this. It looks like this accepts only Iterator as the input parameter - https://github.com/google/guava/blob/49f5a6332a63737dff70cf77472f9867bc7ca6eb/guava/src/com/google/common/collect/Iterators.java#L629 However the ListDirectoryOperation.list(path) return RemoteIterator.

HiveFileInfo hiveFileInfo = fileStatusIterator.next();
if (!pathFilter.isPresent() || pathFilter.get().accept(hiveFileInfo.getPath())) {
next = hiveFileInfo;
return;
}
}
}
catch (IOException e) {
throw processException(e);
}
}

@Override
public boolean hasNext()
{
computeNext();
return (next != null);
}

@Override
public HiveFileInfo next()
{
try {
return fileStatusIterator.next();
}
catch (IOException e) {
throw processException(e);
}
computeNext();
HiveFileInfo result = next;
next = null;
return result;
}

private PrestoException processException(IOException exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,8 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi
hiveClientConfig.getMaxPartitionBatchSize(),
hiveClientConfig.getMaxInitialSplits(),
hiveClientConfig.getSplitLoaderConcurrency(),
false);
false,
Optional.ofNullable(hiveClientConfig.getInputPathFilterClass()));
pageSinkProvider = new HivePageSinkProvider(
getDefaultHiveFileWriterFactories(hiveClientConfig),
hdfsEnvironment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ protected void setup(String host, int port, String databaseName, Function<HiveCl
config.getMaxPartitionBatchSize(),
config.getMaxInitialSplits(),
config.getSplitLoaderConcurrency(),
config.getRecursiveDirWalkerEnabled());
config.getRecursiveDirWalkerEnabled(),
Optional.ofNullable(config.getInputPathFilterClass()));
pageSinkProvider = new HivePageSinkProvider(
getDefaultHiveFileWriterFactories(config),
hdfsEnvironment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -257,7 +258,8 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
EXECUTOR,
2,
false,
false);
false,
Optional.empty());
}

private static BackgroundHiveSplitLoader backgroundHiveSplitLoaderOfflinePartitions()
Expand All @@ -277,7 +279,8 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoaderOfflinePartiti
directExecutor(),
2,
false,
false);
false,
Optional.empty());
}

private static Iterable<HivePartitionMetadata> createPartitionMetadataWithOfflinePartitions()
Expand Down Expand Up @@ -397,6 +400,12 @@ public Iterator<HiveFileInfo> list(FileSystem fs, Path path, NamenodeStats namen
{
return files.iterator();
}

// Test stub: ignores the path filter and returns the canned file list unchanged.
@Override
public Iterator<HiveFileInfo> list(FileSystem fs, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy, Optional<PathFilter> pathFilter)
{
return files.iterator();
}
}

private static class TestingHdfsEnvironment
Expand Down
Loading