Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions presto-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,26 @@
</exclusions>
</dependency>

<!--
Apache Hudi Hadoop MapReduce integration. The split loader imports
org.apache.hudi.hadoop.HoodieParquetInputFormat and HoodieROTablePathFilter,
which presumably ship in this artifact (TODO confirm).
NOTE(review): the exclusions below look intended to keep Hudi's transitive
objenesis / commons-logging / jcl-over-slf4j out of the classpath; confirm
they match what the rest of the build already provides.
-->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hadoop-mr</artifactId>
<version>0.5.1-incubating</version>
<exclusions>
<exclusion>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-memory-context</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.predicate.Domain;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.io.CharStreams;
Expand All @@ -36,13 +38,16 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;

import java.io.BufferedReader;
import java.io.IOException;
Expand Down Expand Up @@ -111,6 +116,7 @@ public class BackgroundHiveSplitLoader
private final ConcurrentLazyQueue<HivePartitionMetadata> partitions;
private final Deque<Iterator<InternalHiveSplit>> fileIterators = new ConcurrentLinkedDeque<>();
private final boolean schedulerUsesHostAddresses;
private final Supplier<HoodieROTablePathFilter> hoodiePathFilterSupplier;

// Purpose of this lock:
// * Write lock: when you need a consistent view across partitions, fileIterators, and hiveSplitSource.
Expand Down Expand Up @@ -159,6 +165,7 @@ public BackgroundHiveSplitLoader(
this.partitions = new ConcurrentLazyQueue<>(requireNonNull(partitions, "partitions is null"));
this.hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName());
this.schedulerUsesHostAddresses = schedulerUsesHostAddresses;
this.hoodiePathFilterSupplier = Suppliers.memoize(HoodieROTablePathFilter::new);
}

@Override
Expand Down Expand Up @@ -280,14 +287,14 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
{
String partitionName = partition.getHivePartition().getPartitionId();
Storage storage = partition.getPartition().map(Partition::getStorage).orElse(table.getStorage());
String inputFormatName = storage.getStorageFormat().getInputFormat();
int partitionDataColumnCount = partition.getPartition()
.map(p -> p.getColumns().size())
.orElse(table.getDataColumns().size());
List<HivePartitionKey> partitionKeys = getPartitionKeys(table, partition.getPartition());

Path path = new Path(getPartitionLocation(table, partition.getPartition()));
Configuration configuration = hdfsEnvironment.getConfiguration(hdfsContext, path);
InputFormat<?, ?> inputFormat = getInputFormat(configuration, storage.getStorageFormat().getInputFormat(), false);
InputFormat<?, ?> inputFormat = getInputFormat(configuration, inputFormatName, false);
FileSystem fs = hdfsEnvironment.getFileSystem(hdfsContext, path);
boolean s3SelectPushdownEnabled = shouldEnablePushdownForTable(session, table, path.toString(), partition.getPartition());

Expand Down Expand Up @@ -359,9 +366,7 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
bucketConversionRequiresWorkerParticipation ? bucketConversion : Optional.empty()),
schedulerUsesHostAddresses);

// To support custom input formats, we want to call getSplits()
// on the input format to obtain file splits.
if (shouldUseFileSplitsFromInputFormat(inputFormat)) {
if (!isHudiInputFormat(inputFormat) && shouldUseFileSplitsFromInputFormat(inputFormat)) {
if (tableBucketInfo.isPresent()) {
throw new PrestoException(NOT_SUPPORTED, "Presto cannot read bucketed partition in an input format with UseFileSplitsFromInputFormat annotation: " + inputFormat.getClass().getSimpleName());
}
Expand All @@ -371,7 +376,7 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)

return addSplitsToSource(splits, splitFactory);
}

PathFilter pathFilter = isHudiInputFormat(inputFormat) ? hoodiePathFilterSupplier.get() : path1 -> true;
// S3 Select pushdown works at the granularity of individual S3 objects,
// therefore we must not split files when it is enabled.
Properties schema = getHiveSchema(storage.getSerdeParameters(), table.getParameters());
Expand All @@ -385,12 +390,12 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
checkState(
tableBucketInfo.get().getTableBucketCount() == tableBucketInfo.get().getReadBucketCount(),
"Table and read bucket count should be the same for virtual bucket");
return hiveSplitSource.addToQueue(getVirtuallyBucketedSplits(path, fs, splitFactory, tableBucketInfo.get().getReadBucketCount(), splittable));
return hiveSplitSource.addToQueue(getVirtuallyBucketedSplits(path, fs, splitFactory, tableBucketInfo.get().getReadBucketCount(), splittable, pathFilter));
}
return hiveSplitSource.addToQueue(getBucketedSplits(path, fs, splitFactory, tableBucketInfo.get(), bucketConversion, partitionName, splittable));
return hiveSplitSource.addToQueue(getBucketedSplits(path, fs, splitFactory, tableBucketInfo.get(), bucketConversion, partitionName, splittable, pathFilter));
}

fileIterators.addLast(createInternalHiveSplitIterator(path, fs, splitFactory, splittable));
fileIterators.addLast(createInternalHiveSplitIterator(path, fs, splitFactory, splittable, pathFilter));
return COMPLETED_FUTURE;
}

Expand All @@ -410,6 +415,11 @@ private ListenableFuture<?> addSplitsToSource(InputSplit[] targetSplits, Interna
return lastResult;
}

/**
 * Tells whether the supplied input format is a Hudi parquet input format.
 *
 * @param inputFormat the input format instance to inspect
 * @return {@code true} when {@code inputFormat} is an instance of
 *         {@link HoodieParquetInputFormat} (including subclasses), {@code false} otherwise
 */
private static boolean isHudiInputFormat(InputFormat<?, ?> inputFormat)
{
    // Class#isInstance is the reflective equivalent of the instanceof operator.
    return HoodieParquetInputFormat.class.isInstance(inputFormat);
}

private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inputFormat)
{
return Arrays.stream(inputFormat.getClass().getAnnotations())
Expand All @@ -418,9 +428,9 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inpu
.anyMatch(name -> name.equals("UseFileSplitsFromInputFormat"));
}

private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable)
private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable, PathFilter pathFilter)
{
return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED))
return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, pathFilter))
.map(status -> splitFactory.createInternalHiveSplit(status, splittable))
.filter(Optional::isPresent)
.map(Optional::get)
Expand All @@ -434,7 +444,8 @@ private List<InternalHiveSplit> getBucketedSplits(
BucketSplitInfo bucketSplitInfo,
Optional<BucketConversion> bucketConversion,
String partitionName,
boolean splittable)
boolean splittable,
PathFilter pathFilter)
{
int readBucketCount = bucketSplitInfo.getReadBucketCount();
int tableBucketCount = bucketSplitInfo.getTableBucketCount();
Expand All @@ -445,7 +456,7 @@ private List<InternalHiveSplit> getBucketedSplits(
// list all files in the partition
List<HiveFileInfo> fileInfos = new ArrayList<>(partitionBucketCount);
try {
Iterators.addAll(fileInfos, directoryLister.list(fileSystem, path, namenodeStats, FAIL));
Iterators.addAll(fileInfos, directoryLister.list(fileSystem, path, namenodeStats, FAIL, pathFilter));
}
catch (NestedDirectoryNotAllowedException e) {
// Fail here to be on the safe side. This seems to be the same as what Hive does
Expand Down Expand Up @@ -510,10 +521,10 @@ private List<InternalHiveSplit> getBucketedSplits(
return splitList;
}

private List<InternalHiveSplit> getVirtuallyBucketedSplits(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, int bucketCount, boolean splittable)
private List<InternalHiveSplit> getVirtuallyBucketedSplits(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, int bucketCount, boolean splittable, PathFilter pathFilter)
{
// List all files recursively in the partition and assign virtual bucket number to each of them
return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED))
return stream(directoryLister.list(fileSystem, path, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, pathFilter))
.map(fileInfo -> {
int virtualBucketNumber = getVirtualBucketNumber(bucketCount, fileInfo.getPath());
return splitFactory.createInternalHiveSplit(fileInfo, virtualBucketNumber, virtualBucketNumber, splittable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

import java.util.Iterator;

import static com.facebook.presto.hive.util.HiveFileIterator.NestedDirectoryPolicy;

/**
 * Abstraction over enumerating the files found under a location on a Hadoop
 * {@link FileSystem}; results are exposed as an iterator of {@link HiveFileInfo}.
 *
 * NOTE(review): both a four-argument and a five-argument {@code list} are shown
 * here side by side; this looks like the before/after of a signature change —
 * confirm which declaration(s) are meant to remain.
 */
public interface DirectoryLister
{
// Form without a path filter.
Iterator<HiveFileInfo> list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy);
// Form with a path filter; how the filter is applied is up to the implementation.
Iterator<HiveFileInfo> list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy, PathFilter pathFilter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.IOException;
Expand All @@ -31,9 +32,9 @@ public class HadoopDirectoryLister
implements DirectoryLister
{
/**
 * Lists {@code path} by wrapping {@code fileSystem.listLocatedStatus} in a
 * {@link HiveFileIterator}, handing it the stats collector, the nested-directory
 * policy and (in the five-argument form) the path filter.
 *
 * NOTE(review): two signature lines and two return lines appear back to back;
 * this reads as the old and new version of the same method — confirm.
 */
@Override
public Iterator<HiveFileInfo> list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy)
public Iterator<HiveFileInfo> list(FileSystem fileSystem, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy, PathFilter pathFilter)
{
return new HiveFileIterator(path, p -> new HadoopFileInfoIterator(fileSystem.listLocatedStatus(p)), namenodeStats, nestedDirectoryPolicy);
// The listing lambda is invoked by HiveFileIterator per directory path, not here.
return new HiveFileIterator(path, p -> new HadoopFileInfoIterator(fileSystem.listLocatedStatus(p)), namenodeStats, nestedDirectoryPolicy, pathFilter);
}

public static class HadoopFileInfoIterator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import com.facebook.presto.hive.NamenodeStats;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.FileNotFoundException;
Expand Down Expand Up @@ -46,19 +48,22 @@ public enum NestedDirectoryPolicy
private final ListDirectoryOperation listDirectoryOperation;
private final NamenodeStats namenodeStats;
private final NestedDirectoryPolicy nestedDirectoryPolicy;
private final PathFilter pathFilter;

private Iterator<HiveFileInfo> remoteIterator = Collections.emptyIterator();

/**
 * Creates an iterator whose pending-path queue starts with {@code path}.
 * Every argument is passed through {@code requireNonNull}, so none may be null.
 *
 * NOTE(review): the parameter list shows {@code nestedDirectoryPolicy} twice
 * (once closing the list, once followed by {@code pathFilter}); this looks like
 * the old and new form of the signature — confirm.
 */
public HiveFileIterator(
Path path,
ListDirectoryOperation listDirectoryOperation,
NamenodeStats namenodeStats,
NestedDirectoryPolicy nestedDirectoryPolicy)
NestedDirectoryPolicy nestedDirectoryPolicy,
PathFilter pathFilter)
{
// Seed the work queue with the root path to list.
paths.addLast(requireNonNull(path, "path is null"));
this.listDirectoryOperation = requireNonNull(listDirectoryOperation, "listDirectoryOperation is null");
this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
this.nestedDirectoryPolicy = requireNonNull(nestedDirectoryPolicy, "nestedDirectoryPolicy is null");
// Stored so it can be passed along each time a directory iterator is created.
this.pathFilter = requireNonNull(pathFilter, "pathFilter is null");
}

@Override
Expand Down Expand Up @@ -92,14 +97,14 @@ protected HiveFileInfo computeNext()
if (paths.isEmpty()) {
return endOfData();
}
remoteIterator = getLocatedFileStatusRemoteIterator(paths.removeFirst());
remoteIterator = getLocatedFileStatusRemoteIterator(paths.removeFirst(), pathFilter);
}
}

/**
 * Builds the iterator over the entries of a single directory {@code path}.
 * In the two-argument form the underlying {@code FileStatusIterator} is wrapped
 * with {@code Iterators.filter}, so only entries whose path is accepted by
 * {@code pathFilter} are returned.
 *
 * NOTE(review): both signature lines and both return lines are present; this
 * reads as the old and new version of the method — confirm.
 */
private Iterator<HiveFileInfo> getLocatedFileStatusRemoteIterator(Path path)
private Iterator<HiveFileInfo> getLocatedFileStatusRemoteIterator(Path path, PathFilter pathFilter)
{
// The timer from namenodeStats.getListLocatedStatus() encloses construction of the iterator.
try (TimeStat.BlockTimer ignored = namenodeStats.getListLocatedStatus().time()) {
return new FileStatusIterator(path, listDirectoryOperation, namenodeStats);
return Iterators.filter(new FileStatusIterator(path, listDirectoryOperation, namenodeStats), input -> pathFilter.accept(input.getPath()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -393,7 +394,7 @@ public TestingDirectoryLister(List<HiveFileInfo> files)
}

/**
 * Test stub: returns an iterator over {@code files} and ignores every argument,
 * so the supplied {@code pathFilter} is not applied here.
 *
 * NOTE(review): two signature lines appear back to back; this reads as the old
 * and new version of the same override — confirm.
 */
@Override
public Iterator<HiveFileInfo> list(FileSystem fs, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy)
public Iterator<HiveFileInfo> list(FileSystem fs, Path path, NamenodeStats namenodeStats, NestedDirectoryPolicy nestedDirectoryPolicy, PathFilter pathFilter)
{
return files.iterator();
}
Expand Down
Loading