diff --git a/pom.xml b/pom.xml
index a5cb00fb69a94..eb765a7615e45 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
0.9.3
0.19.0
2.3.1
- 0.10.1
+ 0.9.0
1.15.1
org.apache.hudi
- hudi-presto-bundle
+ hudi-common
${dep.hudi.version}
- com.google.protobuf
- protobuf-java
+ org.apache.hbase
+ hbase-server
+
+
+ org.apache.orc
+ orc-core
+
+
+ org.objenesis
+ objenesis
+
+
+ commons-logging
+ commons-logging
+
+
+ org.slf4j
+ jcl-over-slf4j
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ org.apache.httpcomponents
+ httpclient
- commons-lang
- commons-lang
+ org.apache.httpcomponents
+ fluent-hc
+
+
+ org.rocksdb
+ rocksdbjni
+
+
+ com.esotericsoftware
+ kryo-shaded
+
+
+
+
+
+ org.apache.hudi
+ hudi-hadoop-mr
+ ${dep.hudi.version}
+
+
+ org.apache.hbase
+ hbase-server
+
+
+ org.apache.orc
+ orc-core
+
+
+ org.objenesis
+ objenesis
+
+
+ commons-logging
+ commons-logging
+
+
+ org.slf4j
+ jcl-over-slf4j
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+ org.apache.httpcomponents
+ fluent-hc
- org.apache.hudi
- hudi-common
+ org.rocksdb
+ rocksdbjni
- org.apache.hudi
- hudi-hadoop-mr-bundle
+ com.esotericsoftware
+ kryo-shaded
-
net.sf.opencsv
diff --git a/presto-hive/pom.xml b/presto-hive/pom.xml
index 471ec8643ba2b..6b7460ca968bc 100644
--- a/presto-hive/pom.xml
+++ b/presto-hive/pom.xml
@@ -49,7 +49,12 @@
org.apache.hudi
- hudi-presto-bundle
+ hudi-common
+
+
+
+ org.apache.hudi
+ hudi-hadoop-mr
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/CachingDirectoryLister.java b/presto-hive/src/main/java/com/facebook/presto/hive/CachingDirectoryLister.java
index 40efb29e9fa07..9161441899e7e 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/CachingDirectoryLister.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/CachingDirectoryLister.java
@@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableSet;
import io.airlift.units.Duration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.weakref.jmx.Managed;
import javax.inject.Inject;
@@ -73,6 +74,7 @@ public Iterator list(
Table table,
Path path,
NamenodeStats namenodeStats,
+ PathFilter pathFilter,
HiveDirectoryContext hiveDirectoryContext)
{
List files = cache.getIfPresent(path);
@@ -80,7 +82,7 @@ public Iterator list(
return files.iterator();
}
- Iterator iterator = delegate.list(fileSystem, table, path, namenodeStats, hiveDirectoryContext);
+ Iterator iterator = delegate.list(fileSystem, table, path, namenodeStats, pathFilter, hiveDirectoryContext);
if (hiveDirectoryContext.isCacheable() && cachedTableChecker.isCachedTable(table.getSchemaTableName())) {
return cachingIterator(iterator, path);
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/DirectoryLister.java b/presto-hive/src/main/java/com/facebook/presto/hive/DirectoryLister.java
index e8583adb42e41..c61aa2c75ef60 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/DirectoryLister.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/DirectoryLister.java
@@ -16,6 +16,7 @@
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.Table;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import java.util.Iterator;
@@ -26,5 +27,6 @@ Iterator list(
Table table,
Path path,
NamenodeStats namenodeStats,
+ PathFilter pathFilter,
HiveDirectoryContext hiveDirectoryContext);
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HadoopDirectoryLister.java b/presto-hive/src/main/java/com/facebook/presto/hive/HadoopDirectoryLister.java
index 07ca5ebe235cc..b5f65f28941a7 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HadoopDirectoryLister.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HadoopDirectoryLister.java
@@ -18,6 +18,7 @@
import com.facebook.presto.hive.util.HiveFileIterator;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import java.io.IOException;
@@ -36,13 +37,15 @@ public Iterator list(
Table table,
Path path,
NamenodeStats namenodeStats,
+ PathFilter pathFilter,
HiveDirectoryContext hiveDirectoryContext)
{
return new HiveFileIterator(
path,
p -> new HadoopFileInfoIterator(fileSystem.listLocatedStatus(p)),
namenodeStats,
- hiveDirectoryContext.getNestedDirectoryPolicy());
+ hiveDirectoryContext.getNestedDirectoryPolicy(),
+ pathFilter);
}
public static class HadoopFileInfoIterator
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java
index 568850ae255a3..28b6abd0fdc52 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java
@@ -200,7 +200,6 @@ public class HiveClientConfig
private boolean verboseRuntimeStatsEnabled;
private boolean useRecordPageSourceForCustomSplit = true;
- private boolean hudiMetadataEnabled;
private boolean sizeBasedSplitWeightsEnabled = true;
private double minimumAssignedSplitWeight = 0.05;
@@ -1787,17 +1786,4 @@ public HiveClientConfig setFileSplittable(boolean fileSplittable)
this.fileSplittable = fileSplittable;
return this;
}
-
- @Config("hive.hudi-metadata-enabled")
- @ConfigDescription("For Hudi tables prefer to fetch the list of file names, sizes and other metadata from the internal metadata table rather than storage")
- public HiveClientConfig setHudiMetadataEnabled(boolean hudiMetadataEnabled)
- {
- this.hudiMetadataEnabled = hudiMetadataEnabled;
- return this;
- }
-
- public boolean isHudiMetadataEnabled()
- {
- return this.hudiMetadataEnabled;
- }
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java
index 4482a6791b9bc..160d2f1ce45f3 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java
@@ -140,7 +140,6 @@ public final class HiveSessionProperties
private static final String USE_RECORD_PAGE_SOURCE_FOR_CUSTOM_SPLIT = "use_record_page_source_for_custom_split";
public static final String MAX_INITIAL_SPLITS = "max_initial_splits";
public static final String FILE_SPLITTABLE = "file_splittable";
- private static final String HUDI_METADATA_ENABLED = "hudi_metadata_enabled";
private final List> sessionProperties;
@@ -684,12 +683,7 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
FILE_SPLITTABLE,
"If a hive file is splittable when coordinator schedules splits",
hiveClientConfig.isFileSplittable(),
- true),
- booleanProperty(
- HUDI_METADATA_ENABLED,
- "For Hudi tables prefer to fetch the list of file names, sizes and other metadata from the internal metadata table rather than storage",
- hiveClientConfig.isHudiMetadataEnabled(),
- false));
+ true));
}
public List> getSessionProperties()
@@ -1193,9 +1187,4 @@ public static boolean isFileSplittable(ConnectorSession session)
{
return session.getProperty(FILE_SPLITTABLE, Boolean.class);
}
-
- public static boolean isHudiMetadataEnabled(ConnectorSession session)
- {
- return session.getProperty(HUDI_METADATA_ENABLED, Boolean.class);
- }
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java
index 68f215edbea0b..15d7ee77ca47f 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java
@@ -385,19 +385,14 @@ static boolean shouldUseRecordReaderFromInputFormat(Configuration configuration,
.anyMatch(USE_RECORD_READER_FROM_INPUT_FORMAT_ANNOTATION::equals);
}
- static boolean shouldUseFileSplitsFromInputFormat(InputFormat, ?> inputFormat, DirectoryLister directoryLister)
+ static boolean shouldUseFileSplitsFromInputFormat(InputFormat, ?> inputFormat, Configuration conf, String tablePath)
{
- if (directoryLister instanceof HudiDirectoryLister) {
- boolean hasUseSplitsAnnotation = Arrays.stream(inputFormat.getClass().getAnnotations())
- .map(Annotation::annotationType)
- .map(Class::getSimpleName)
- .anyMatch(USE_FILE_SPLITS_FROM_INPUT_FORMAT_ANNOTATION::equals);
-
- return hasUseSplitsAnnotation &&
- (!isHudiParquetInputFormat(inputFormat) || shouldUseFileSplitsForHudi(inputFormat, ((HudiDirectoryLister) directoryLister).getMetaClient()));
- }
+ boolean hasUseSplitsAnnotation = Arrays.stream(inputFormat.getClass().getAnnotations())
+ .map(Annotation::annotationType)
+ .map(Class::getSimpleName)
+ .anyMatch(USE_FILE_SPLITS_FROM_INPUT_FORMAT_ANNOTATION::equals);
- return false;
+ return hasUseSplitsAnnotation && (!isHudiParquetInputFormat(inputFormat) || shouldUseFileSplitsForHudi(inputFormat, conf, tablePath));
}
static boolean isHudiParquetInputFormat(InputFormat, ?> inputFormat)
@@ -405,13 +400,14 @@ static boolean isHudiParquetInputFormat(InputFormat, ?> inputFormat)
return inputFormat instanceof HoodieParquetInputFormat;
}
- private static boolean shouldUseFileSplitsForHudi(InputFormat, ?> inputFormat, HoodieTableMetaClient metaClient)
+ private static boolean shouldUseFileSplitsForHudi(InputFormat, ?> inputFormat, Configuration conf, String tablePath)
{
if (inputFormat instanceof HoodieParquetRealtimeInputFormat) {
return true;
}
- return metaClient.getTableConfig().getBootstrapBasePath().isPresent();
+ HoodieTableMetaClient hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(tablePath).build();
+ return hoodieTableMetaClient.getTableConfig().getBootstrapBasePath().isPresent();
}
public static long parseHiveDate(String value)
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HudiDirectoryLister.java b/presto-hive/src/main/java/com/facebook/presto/hive/HudiDirectoryLister.java
deleted file mode 100644
index c323e6e6f0bf5..0000000000000
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HudiDirectoryLister.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.hive;
-
-import com.facebook.airlift.log.Logger;
-import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
-import com.facebook.presto.hive.metastore.Table;
-import com.facebook.presto.hive.util.HiveFileIterator;
-import com.facebook.presto.spi.ConnectorSession;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.engine.HoodieLocalEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.view.FileSystemViewManager;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Optional;
-
-import static com.facebook.presto.hive.HiveFileInfo.createHiveFileInfo;
-import static com.facebook.presto.hive.HiveSessionProperties.isHudiMetadataEnabled;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT;
-
-public class HudiDirectoryLister
- implements DirectoryLister
-{
- private static final Logger log = Logger.get(HudiDirectoryLister.class);
-
- private final HoodieTableFileSystemView fileSystemView;
- private final HoodieTableMetaClient metaClient;
- private final boolean metadataEnabled;
-
- public HudiDirectoryLister(Configuration conf, ConnectorSession session, Table table)
- {
- log.info("Using Hudi Directory Lister.");
- this.metadataEnabled = isHudiMetadataEnabled(session);
- this.metaClient = HoodieTableMetaClient.builder()
- .setConf(conf)
- .setBasePath(table.getStorage().getLocation())
- .build();
- HoodieEngineContext engineContext = new HoodieLocalEngineContext(conf);
- HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
- .enable(metadataEnabled)
- .build();
- this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient, metadataConfig);
- }
-
- public HoodieTableMetaClient getMetaClient()
- {
- return metaClient;
- }
-
- @Override
- public Iterator list(
- ExtendedFileSystem fileSystem,
- Table table,
- Path path,
- NamenodeStats namenodeStats,
- HiveDirectoryContext hiveDirectoryContext)
- {
- log.debug("Listing path using Hudi directory lister: %s", path.toString());
- return new HiveFileIterator(
- path,
- p -> new HudiFileInfoIterator(
- fileSystemView,
- metadataEnabled ? Optional.empty() : Optional.of(fileSystem.listStatus(p)),
- table.getStorage().getLocation(),
- p),
- namenodeStats,
- hiveDirectoryContext.getNestedDirectoryPolicy());
- }
-
- public static class HudiFileInfoIterator
- implements RemoteIterator
- {
- private final Iterator hoodieBaseFileIterator;
-
- public HudiFileInfoIterator(
- HoodieTableFileSystemView fileSystemView,
- Optional fileStatuses,
- String tablePath,
- Path directory)
- {
- String partition = FSUtils.getRelativePartitionPath(new Path(tablePath), directory);
- if (fileStatuses.isPresent()) {
- fileSystemView.addFilesToView(fileStatuses.get());
- this.hoodieBaseFileIterator = fileSystemView.fetchLatestBaseFiles(partition).iterator();
- }
- else {
- this.hoodieBaseFileIterator = fileSystemView.getLatestBaseFiles(partition).iterator();
- }
- }
-
- @Override
- public boolean hasNext()
- {
- return hoodieBaseFileIterator.hasNext();
- }
-
- @Override
- public HiveFileInfo next()
- throws IOException
- {
- FileStatus fileStatus = hoodieBaseFileIterator.next().getFileStatus();
- String[] name = new String[] {"localhost:" + DFS_DATANODE_DEFAULT_PORT};
- String[] host = new String[] {"localhost"};
- LocatedFileStatus hoodieFileStatus = new LocatedFileStatus(fileStatus,
- new BlockLocation[] {new BlockLocation(name, host, 0L, fileStatus.getLen())});
- return createHiveFileInfo(hoodieFileStatus, Optional.empty());
- }
- }
-}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/ManifestPartitionLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/ManifestPartitionLoader.java
index 8ec5f783f609b..9b7f126265664 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/ManifestPartitionLoader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/ManifestPartitionLoader.java
@@ -188,7 +188,7 @@ private void validateManifest(HivePartitionMetadata partition, Path path, List fileInfoIterator = directoryLister.list(fileSystem, table, path, namenodeStats, hiveDirectoryContext);
+ Iterator fileInfoIterator = directoryLister.list(fileSystem, table, path, namenodeStats, ignore -> true, hiveDirectoryContext);
int fileCount = 0;
while (fileInfoIterator.hasNext()) {
HiveFileInfo fileInfo = fileInfoIterator.next();
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java
index d3d10ffc19948..f7a0583dc4d53 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java
@@ -23,6 +23,10 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
+import com.google.common.base.Function;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
@@ -32,6 +36,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
@@ -39,6 +44,7 @@
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hudi.hadoop.HoodieROTablePathFilter;
import java.io.BufferedReader;
import java.io.IOException;
@@ -81,7 +87,6 @@
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Maps.fromProperties;
import static com.google.common.collect.Streams.stream;
@@ -107,6 +112,7 @@ public class StoragePartitionLoader
private final ConnectorSession session;
private final Deque> fileIterators;
private final boolean schedulerUsesHostAddresses;
+ private final LoadingCache hoodiePathFilterLoadingCache;
private final boolean partialAggregationsPushedDown;
public StoragePartitionLoader(
@@ -128,21 +134,15 @@ public StoragePartitionLoader(
this.session = requireNonNull(session, "session is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
+ this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
this.recursiveDirWalkerEnabled = recursiveDirWalkerEnabled;
this.hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName(), table.getStorage().getLocation(), false);
this.fileIterators = requireNonNull(fileIterators, "fileIterators is null");
this.schedulerUsesHostAddresses = schedulerUsesHostAddresses;
+ this.hoodiePathFilterLoadingCache = CacheBuilder.newBuilder()
+ .maximumSize(1000)
+ .build(CacheLoader.from((Function) HoodieROTablePathFilter::new));
this.partialAggregationsPushedDown = partialAggregationsPushedDown;
-
- Optional directoryListerOverride = Optional.empty();
- if (!isNullOrEmpty(table.getStorage().getLocation())) {
- Configuration configuration = hdfsEnvironment.getConfiguration(hdfsContext, new Path(table.getStorage().getLocation()));
- InputFormat, ?> inputFormat = getInputFormat(configuration, table.getStorage().getStorageFormat().getInputFormat(), false);
- if (isHudiParquetInputFormat(inputFormat)) {
- directoryListerOverride = Optional.of(new HudiDirectoryLister(configuration, session, table));
- }
- }
- this.directoryLister = directoryListerOverride.orElseGet(() -> requireNonNull(directoryLister, "directoryLister is null"));
}
@Override
@@ -249,7 +249,7 @@ public ListenableFuture> loadPartition(HivePartitionMetadata partition, HiveSp
schedulerUsesHostAddresses,
partition.getEncryptionInformation());
- if (shouldUseFileSplitsFromInputFormat(inputFormat, directoryLister)) {
+ if (shouldUseFileSplitsFromInputFormat(inputFormat, configuration, table.getStorage().getLocation())) {
if (tableBucketInfo.isPresent()) {
throw new PrestoException(NOT_SUPPORTED, "Presto cannot read bucketed partition in an input format with UseFileSplitsFromInputFormat annotation: " + inputFormat.getClass().getSimpleName());
}
@@ -261,6 +261,7 @@ public ListenableFuture> loadPartition(HivePartitionMetadata partition, HiveSp
return addSplitsToSource(splits, splitFactory, hiveSplitSource, stopped);
}
+ PathFilter pathFilter = isHudiParquetInputFormat(inputFormat) ? hoodiePathFilterLoadingCache.getUnchecked(configuration) : path1 -> true;
// Streaming aggregation works at the granularity of individual files
// S3 Select pushdown works at the granularity of individual S3 objects,
// Partial aggregation pushdown works at the granularity of individual files
@@ -280,12 +281,12 @@ public ListenableFuture> loadPartition(HivePartitionMetadata partition, HiveSp
checkState(
tableBucketInfo.get().getTableBucketCount() == tableBucketInfo.get().getReadBucketCount(),
"Table and read bucket count should be the same for virtual bucket");
- return hiveSplitSource.addToQueue(getVirtuallyBucketedSplits(path, fs, splitFactory, tableBucketInfo.get().getReadBucketCount(), splittable));
+ return hiveSplitSource.addToQueue(getVirtuallyBucketedSplits(path, fs, splitFactory, tableBucketInfo.get().getReadBucketCount(), splittable, pathFilter));
}
- return hiveSplitSource.addToQueue(getBucketedSplits(path, fs, splitFactory, tableBucketInfo.get(), bucketConversion, partitionName, splittable));
+ return hiveSplitSource.addToQueue(getBucketedSplits(path, fs, splitFactory, tableBucketInfo.get(), bucketConversion, partitionName, splittable, pathFilter));
}
- fileIterators.addLast(createInternalHiveSplitIterator(path, fs, splitFactory, splittable, partition.getPartition()));
+ fileIterators.addLast(createInternalHiveSplitIterator(path, fs, splitFactory, splittable, pathFilter, partition.getPartition()));
return COMPLETED_FUTURE;
}
@@ -305,7 +306,7 @@ private ListenableFuture> addSplitsToSource(InputSplit[] targetSplits, Interna
return lastResult;
}
- private Iterator createInternalHiveSplitIterator(Path path, ExtendedFileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable, Optional partition)
+ private Iterator createInternalHiveSplitIterator(Path path, ExtendedFileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable, PathFilter pathFilter, Optional partition)
{
boolean cacheable = isUseListDirectoryCache(session);
if (partition.isPresent()) {
@@ -314,7 +315,7 @@ private Iterator createInternalHiveSplitIterator(Path path, E
}
HiveDirectoryContext hiveDirectoryContext = new HiveDirectoryContext(recursiveDirWalkerEnabled ? RECURSE : IGNORED, cacheable);
- return stream(directoryLister.list(fileSystem, table, path, namenodeStats, hiveDirectoryContext))
+ return stream(directoryLister.list(fileSystem, table, path, namenodeStats, pathFilter, hiveDirectoryContext))
.map(status -> splitFactory.createInternalHiveSplit(status, splittable))
.filter(Optional::isPresent)
.map(Optional::get)
@@ -328,7 +329,8 @@ private List getBucketedSplits(
BucketSplitInfo bucketSplitInfo,
Optional bucketConversion,
String partitionName,
- boolean splittable)
+ boolean splittable,
+ PathFilter pathFilter)
{
int readBucketCount = bucketSplitInfo.getReadBucketCount();
int tableBucketCount = bucketSplitInfo.getTableBucketCount();
@@ -340,7 +342,7 @@ private List getBucketedSplits(
// list all files in the partition
List fileInfos = new ArrayList<>(partitionBucketCount);
try {
- Iterators.addAll(fileInfos, directoryLister.list(fileSystem, table, path, namenodeStats, new HiveDirectoryContext(FAIL, isUseListDirectoryCache(session))));
+ Iterators.addAll(fileInfos, directoryLister.list(fileSystem, table, path, namenodeStats, pathFilter, new HiveDirectoryContext(FAIL, isUseListDirectoryCache(session))));
}
catch (HiveFileIterator.NestedDirectoryNotAllowedException e) {
// Fail here to be on the safe side. This seems to be the same as what Hive does
@@ -459,11 +461,11 @@ private List getBucketedSplits(
return splitList;
}
- private List getVirtuallyBucketedSplits(Path path, ExtendedFileSystem fileSystem, InternalHiveSplitFactory splitFactory, int bucketCount, boolean splittable)
+ private List getVirtuallyBucketedSplits(Path path, ExtendedFileSystem fileSystem, InternalHiveSplitFactory splitFactory, int bucketCount, boolean splittable, PathFilter pathFilter)
{
// List all files recursively in the partition and assign virtual bucket number to each of them
HiveDirectoryContext hiveDirectoryContext = new HiveDirectoryContext(recursiveDirWalkerEnabled ? RECURSE : IGNORED, isUseListDirectoryCache(session));
- return stream(directoryLister.list(fileSystem, table, path, namenodeStats, hiveDirectoryContext))
+ return stream(directoryLister.list(fileSystem, table, path, namenodeStats, pathFilter, hiveDirectoryContext))
.map(fileInfo -> {
int virtualBucketNumber = getVirtualBucketNumber(bucketCount, fileInfo.getPath());
return splitFactory.createInternalHiveSplit(fileInfo, virtualBucketNumber, virtualBucketNumber, splittable);
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java b/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java
index 664ece31dcd64..7749eb78148af 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java
@@ -19,7 +19,9 @@
import com.facebook.presto.hive.NestedDirectoryPolicy;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.AccessControlException;
@@ -42,6 +44,7 @@ public class HiveFileIterator
private final ListDirectoryOperation listDirectoryOperation;
private final NamenodeStats namenodeStats;
private final NestedDirectoryPolicy nestedDirectoryPolicy;
+ private final PathFilter pathFilter;
private Iterator remoteIterator = Collections.emptyIterator();
@@ -49,12 +52,14 @@ public HiveFileIterator(
Path path,
ListDirectoryOperation listDirectoryOperation,
NamenodeStats namenodeStats,
- NestedDirectoryPolicy nestedDirectoryPolicy)
+ NestedDirectoryPolicy nestedDirectoryPolicy,
+ PathFilter pathFilter)
{
paths.addLast(requireNonNull(path, "path is null"));
this.listDirectoryOperation = requireNonNull(listDirectoryOperation, "listDirectoryOperation is null");
this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
this.nestedDirectoryPolicy = requireNonNull(nestedDirectoryPolicy, "nestedDirectoryPolicy is null");
+ this.pathFilter = requireNonNull(pathFilter, "pathFilter is null");
}
@Override
@@ -88,14 +93,14 @@ protected HiveFileInfo computeNext()
if (paths.isEmpty()) {
return endOfData();
}
- remoteIterator = getLocatedFileStatusRemoteIterator(paths.removeFirst());
+ remoteIterator = getLocatedFileStatusRemoteIterator(paths.removeFirst(), pathFilter);
}
}
- private Iterator getLocatedFileStatusRemoteIterator(Path path)
+ private Iterator getLocatedFileStatusRemoteIterator(Path path, PathFilter pathFilter)
{
try (TimeStat.BlockTimer ignored = namenodeStats.getListLocatedStatus().time()) {
- return new FileStatusIterator(path, listDirectoryOperation, namenodeStats);
+ return Iterators.filter(new FileStatusIterator(path, listDirectoryOperation, namenodeStats), input -> pathFilter.accept(input.getPath()));
}
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeBootstrapBaseFileSplitConverter.java b/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeBootstrapBaseFileSplitConverter.java
index 8c4f6119beacf..ba4f02ddf8b03 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeBootstrapBaseFileSplitConverter.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeBootstrapBaseFileSplitConverter.java
@@ -16,7 +16,6 @@
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit;
import java.io.IOException;
@@ -25,7 +24,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
import static com.facebook.presto.hive.HiveUtil.CUSTOM_FILE_SPLIT_CLASS_KEY;
import static com.google.common.base.Strings.isNullOrEmpty;
@@ -70,13 +68,12 @@ public Optional recreateFileSplitWithCustomInfo(FileSplit split, Map<
if (!isNullOrEmpty(customFileSplitClass) && RealtimeBootstrapBaseFileSplit.class.getName().equals(customFileSplitClass)) {
String deltaFilePaths = customSplitInfo.get(DELTA_FILE_PATHS_KEY);
List deltaLogPaths = isNullOrEmpty(deltaFilePaths) ? Collections.emptyList() : Arrays.asList(deltaFilePaths.split(","));
- List deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
FileSplit bootstrapFileSplit = new FileSplit(
new Path(customSplitInfo.get(BOOTSTRAP_FILE_SPLIT_PATH)),
parseLong(customSplitInfo.get(BOOTSTRAP_FILE_SPLIT_START)),
parseLong(customSplitInfo.get(BOOTSTRAP_FILE_SPLIT_LEN)),
(String[]) null);
- split = new RealtimeBootstrapBaseFileSplit(split, customSplitInfo.get(BASE_PATH_KEY), deltaLogFiles,
+ split = new RealtimeBootstrapBaseFileSplit(split, customSplitInfo.get(BASE_PATH_KEY), deltaLogPaths,
customSplitInfo.get(MAX_COMMIT_TIME_KEY), bootstrapFileSplit);
return Optional.of(split);
}
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeSplitConverter.java b/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeSplitConverter.java
index 5868ae1a9fbfd..5edcbc8811575 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeSplitConverter.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeSplitConverter.java
@@ -14,9 +14,7 @@
package com.facebook.presto.hive.util;
import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
@@ -25,7 +23,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
import static com.facebook.presto.hive.HiveUtil.CUSTOM_FILE_SPLIT_CLASS_KEY;
import static java.util.Objects.requireNonNull;
@@ -65,11 +62,10 @@ public Optional recreateFileSplitWithCustomInfo(FileSplit split, Map<
if (HoodieRealtimeFileSplit.class.getName().equals(customSplitClass)) {
requireNonNull(customSplitInfo.get(HUDI_DELTA_FILEPATHS_KEY), "HUDI_DELTA_FILEPATHS_KEY is missing");
List deltaLogPaths = Arrays.asList(customSplitInfo.get(HUDI_DELTA_FILEPATHS_KEY).split(","));
- List deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
return Optional.of(new HoodieRealtimeFileSplit(
split,
requireNonNull(customSplitInfo.get(HUDI_BASEPATH_KEY), "HUDI_BASEPATH_KEY is missing"),
- deltaLogFiles,
+ deltaLogPaths,
requireNonNull(customSplitInfo.get(HUDI_MAX_COMMIT_TIME_KEY), "HUDI_MAX_COMMIT_TIME_KEY is missing"),
Option.empty()));
}
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java
index 58da4895f968e..69f3e2b48bfa6 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java
@@ -163,8 +163,7 @@ public void testDefaults()
.setMinimumAssignedSplitWeight(0.05)
.setUserDefinedTypeEncodingEnabled(false)
.setUseRecordPageSourceForCustomSplit(true)
- .setFileSplittable(true)
- .setHudiMetadataEnabled(false));
+ .setFileSplittable(true));
}
@Test
@@ -289,7 +288,6 @@ public void testExplicitPropertyMappings()
.put("hive.minimum-assigned-split-weight", "1.0")
.put("hive.use-record-page-source-for-custom-split", "false")
.put("hive.file-splittable", "false")
- .put("hive.hudi-metadata-enabled", "true")
.build();
HiveClientConfig expected = new HiveClientConfig()
@@ -410,8 +408,7 @@ public void testExplicitPropertyMappings()
.setMinimumAssignedSplitWeight(1.0)
.setUserDefinedTypeEncodingEnabled(true)
.setUseRecordPageSourceForCustomSplit(false)
- .setFileSplittable(false)
- .setHudiMetadataEnabled(true);
+ .setFileSplittable(false);
ConfigAssertions.assertFullMapping(properties, expected);
}
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitManager.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitManager.java
index e6ba0c97e4448..9a65313223b63 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitManager.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitManager.java
@@ -50,6 +50,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.joda.time.DateTimeZone;
import org.testng.annotations.AfterClass;
@@ -629,7 +630,7 @@ private static class TestingDirectoryLister
implements DirectoryLister
{
@Override
- public Iterator list(ExtendedFileSystem fileSystem, Table table, Path path, NamenodeStats namenodeStats, HiveDirectoryContext hiveDirectoryContext)
+ public Iterator list(ExtendedFileSystem fileSystem, Table table, Path path, NamenodeStats namenodeStats, PathFilter pathFilter, HiveDirectoryContext hiveDirectoryContext)
{
try {
return ImmutableList.of(
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHudiDirectoryLister.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHudiDirectoryLister.java
deleted file mode 100644
index 46c2c3bc7606e..0000000000000
--- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHudiDirectoryLister.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.hive;
-
-import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
-import com.facebook.presto.hive.metastore.Storage;
-import com.facebook.presto.hive.metastore.Table;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.exception.TableNotFoundException;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Optional;
-
-import static com.facebook.presto.hive.BucketFunctionType.HIVE_COMPATIBLE;
-import static com.facebook.presto.hive.HiveStorageFormat.PARQUET;
-import static com.facebook.presto.hive.HiveTestUtils.SESSION;
-import static com.facebook.presto.hive.NestedDirectoryPolicy.IGNORED;
-import static com.facebook.presto.hive.metastore.PrestoTableType.EXTERNAL_TABLE;
-import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertThrows;
-import static org.testng.Assert.assertTrue;
-
-public class TestHudiDirectoryLister
-{
- private Configuration hadoopConf;
-
- @BeforeClass
- private void setup()
- {
- hadoopConf = new Configuration();
- hadoopConf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
- hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
- }
-
- @AfterClass(alwaysRun = true)
- private void tearDown()
- {
- hadoopConf = null;
- }
-
- @Test
- public void testDirectoryListerForHudiTable()
- throws IOException
- {
- Table mockTable = new Table(
- "schema",
- "hudi_non_part_cow",
- "user",
- EXTERNAL_TABLE,
- new Storage(fromHiveStorageFormat(PARQUET),
- getTableBasePath("hudi_non_part_cow"),
- Optional.of(new HiveBucketProperty(
- ImmutableList.of(),
- 1,
- ImmutableList.of(),
- HIVE_COMPATIBLE,
- Optional.empty())),
- false,
- ImmutableMap.of(),
- ImmutableMap.of()),
- ImmutableList.of(),
- ImmutableList.of(),
- ImmutableMap.of(),
- Optional.empty(),
- Optional.empty());
-
- HudiDirectoryLister directoryLister = new HudiDirectoryLister(hadoopConf, SESSION, mockTable);
- HoodieTableMetaClient metaClient = directoryLister.getMetaClient();
- assertEquals(metaClient.getBasePath(), mockTable.getStorage().getLocation());
- Path path = new Path(mockTable.getStorage().getLocation());
- ExtendedFileSystem fs = (ExtendedFileSystem) path.getFileSystem(hadoopConf);
- Iterator fileInfoIterator = directoryLister.list(fs, mockTable, path, new NamenodeStats(), new HiveDirectoryContext(IGNORED, false));
- assertTrue(fileInfoIterator.hasNext());
- HiveFileInfo fileInfo = fileInfoIterator.next();
- assertEquals(fileInfo.getPath().getName(), "d0875d00-483d-4e8b-bbbe-c520366c47a0-0_0-6-11_20211217110514527.parquet");
- }
-
- @Test
- public void testDirectoryListerForNonHudiTable()
- {
- Table mockTable = new Table(
- "schema",
- "non_hudi_table",
- "user",
- EXTERNAL_TABLE,
- new Storage(fromHiveStorageFormat(PARQUET),
- getTableBasePath("non_hudi_table"),
- Optional.of(new HiveBucketProperty(
- ImmutableList.of(),
- 1,
- ImmutableList.of(),
- HIVE_COMPATIBLE,
- Optional.empty())),
- false,
- ImmutableMap.of(),
- ImmutableMap.of()),
- ImmutableList.of(),
- ImmutableList.of(),
- ImmutableMap.of(),
- Optional.empty(),
- Optional.empty());
-
- assertThrows(TableNotFoundException.class, () -> new HudiDirectoryLister(hadoopConf, SESSION, mockTable));
- }
-
- private static String getTableBasePath(String tableName)
- {
- return TestHudiDirectoryLister.class.getClassLoader().getResource(tableName).toString();
- }
-}
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/util/TestCustomSplitConversionUtils.java b/presto-hive/src/test/java/com/facebook/presto/hive/util/TestCustomSplitConversionUtils.java
index fb65585e50b96..9fef07716578b 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/util/TestCustomSplitConversionUtils.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/util/TestCustomSplitConversionUtils.java
@@ -15,7 +15,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
@@ -26,7 +25,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static org.testng.Assert.assertEquals;
@@ -42,12 +40,11 @@ public class TestCustomSplitConversionUtils
public void testHudiRealtimeSplitConverterRoundTrip()
throws IOException
{
- List deltaLogPaths = Arrays.asList("test1", "test2", "test3");
- List deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
+ List expectedDeltaLogPaths = Arrays.asList("test1", "test2", "test3");
String expectedMaxCommitTime = "max_commit_time";
FileSplit baseSplit = new FileSplit(FILE_PATH, SPLIT_START_POS, SPLIT_LENGTH, SPLIT_HOSTS);
- FileSplit hudiSplit = new HoodieRealtimeFileSplit(baseSplit, BASE_PATH, deltaLogFiles, expectedMaxCommitTime, Option.empty());
+ FileSplit hudiSplit = new HoodieRealtimeFileSplit(baseSplit, BASE_PATH, expectedDeltaLogPaths, expectedMaxCommitTime, Option.empty());
// Test conversion of HudiSplit -> customSplitInfo
Map customSplitInfo = CustomSplitConversionUtils.extractCustomSplitInfo(hudiSplit);
@@ -60,7 +57,7 @@ public void testHudiRealtimeSplitConverterRoundTrip()
assertEquals(SPLIT_LENGTH, recreatedSplit.getLength());
assertEquals(SPLIT_HOSTS, recreatedSplit.getLocations());
assertEquals(BASE_PATH, recreatedSplit.getBasePath());
- assertEquals(deltaLogPaths, recreatedSplit.getDeltaLogPaths());
+ assertEquals(expectedDeltaLogPaths, recreatedSplit.getDeltaLogPaths());
assertEquals(expectedMaxCommitTime, recreatedSplit.getMaxCommitTime());
}
@@ -98,7 +95,6 @@ public void testHudiRealtimeBootstrapBaseFileSplitConverter()
throws IOException
{
List deltaLogPaths = Arrays.asList("test1", "test2", "test3");
- List deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
String maxCommitTime = "max_commit_time";
Path bootstrapSourceFilePath = new Path("/test/source/test.parquet");
@@ -108,7 +104,7 @@ public void testHudiRealtimeBootstrapBaseFileSplitConverter()
FileSplit baseSplit = new FileSplit(FILE_PATH, SPLIT_START_POS, SPLIT_LENGTH, SPLIT_HOSTS);
FileSplit bootstrapSourceSplit = new FileSplit(bootstrapSourceFilePath, bootstrapSourceSplitStartPos, bootstrapSourceSplitLength,
new String[0]);
- FileSplit hudiSplit = new RealtimeBootstrapBaseFileSplit(baseSplit, BASE_PATH, deltaLogFiles, maxCommitTime,
+ FileSplit hudiSplit = new RealtimeBootstrapBaseFileSplit(baseSplit, BASE_PATH, deltaLogPaths, maxCommitTime,
bootstrapSourceSplit);
// Test conversion of HudiSplit -> customSplitInfo
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/util/TestHiveFileIterator.java b/presto-hive/src/test/java/com/facebook/presto/hive/util/TestHiveFileIterator.java
new file mode 100644
index 0000000000000..b1e332b43a758
--- /dev/null
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/util/TestHiveFileIterator.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.hive.util;
+
+import com.facebook.presto.hive.HadoopDirectoryLister.HadoopFileInfoIterator;
+import com.facebook.presto.hive.NamenodeStats;
+import com.facebook.presto.hive.util.HiveFileIterator.ListDirectoryOperation;
+import com.google.common.collect.Iterators;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.facebook.presto.hive.NestedDirectoryPolicy.IGNORED;
+import static com.facebook.presto.hive.NestedDirectoryPolicy.RECURSE;
+import static com.google.common.io.Files.createTempDir;
+import static com.google.common.io.MoreFiles.deleteRecursively;
+import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
+import static org.testng.Assert.assertEquals;
+
+public class TestHiveFileIterator
+{
+ private static final String PATH_FILTER_MATCHED_PREFIX = "path_filter_test_file_";
+ private static final String PATH_FILTER_NOT_MATCHED_PREFIX = "path_filter_not_matched_";
+
+ private Configuration hadoopConf;
+ private ListDirectoryOperation listDirectoryOperation;
+
+ @BeforeClass
+ private void setup()
+ {
+ hadoopConf = new Configuration();
+ hadoopConf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+ hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+ listDirectoryOperation = path ->
+ {
+ FileSystem fs = path.getFileSystem(hadoopConf);
+ return new HadoopFileInfoIterator(fs.listLocatedStatus(path));
+ };
+ }
+
+ @AfterClass(alwaysRun = true)
+ private void tearDown()
+ {
+ hadoopConf = null;
+ listDirectoryOperation = null;
+ }
+
+ @Test
+ public void testDefaultPathFilterNoRecursion()
+ throws IOException
+ {
+ // set up
+ File rootDir = createTempDir();
+ String basePath = rootDir.getAbsolutePath();
+ // create 8 files in root directory - 3 pathFilter matched and 5 non matched files.
+ createFiles(basePath, 3, true);
+ createFiles(basePath, 5, false);
+ Path rootPath = new Path("file://" + basePath + File.separator);
+ PathFilter pathFilter = path -> true;
+ HiveFileIterator hiveFileIterator = new HiveFileIterator(rootPath, listDirectoryOperation, new NamenodeStats(), IGNORED, pathFilter);
+
+ int actualCount = Iterators.size(hiveFileIterator);
+ assertEquals(actualCount, 8);
+
+ // cleanup
+ deleteTestDir(rootDir);
+ }
+
+ @Test
+ public void testDefaultPathFilterWithRecursion()
+ throws IOException
+ {
+ // set up
+ File rootDir = createTempDir();
+ String basePath = rootDir.getAbsolutePath();
+ // create 8 files in root directory - 3 pathFilter matched and 5 non matched files.
+ createFiles(basePath, 3, true);
+ createFiles(basePath, 5, false);
+ // create two directories
+ List subDirs = createDirs(basePath, 2);
+ // create 5 files in dir1 - 3 pathFilter matched and 2 non matched files.
+ String dir1 = subDirs.get(0).getAbsolutePath();
+ createFiles(dir1, 3, true);
+ createFiles(dir1, 2, false);
+ // create 7 files in dir2 - 3 pathFilter matched and 4 non matched files.
+ String dir2 = subDirs.get(1).getAbsolutePath();
+ createFiles(dir2, 3, true);
+ createFiles(dir2, 4, false);
+ Path rootPath = new Path("file://" + basePath + File.separator);
+ PathFilter pathFilter = path -> true;
+ HiveFileIterator hiveFileIterator = new HiveFileIterator(rootPath, listDirectoryOperation, new NamenodeStats(), RECURSE, pathFilter);
+
+ int actualCount = Iterators.size(hiveFileIterator);
+ assertEquals(actualCount, 20);
+
+ // cleanup
+ deleteTestDir(rootDir);
+ }
+
+ @Test
+ public void testPathFilterWithNoRecursion()
+ throws IOException
+ {
+ // set up
+ File rootDir = createTempDir();
+ String basePath = rootDir.getAbsolutePath();
+ // create 8 files in root directory - 3 pathFilter matched and 5 non matched files.
+ createFiles(basePath, 3, true);
+ createFiles(basePath, 5, false);
+ Path rootPath = new Path("file://" + basePath + File.separator);
+ PathFilter pathFilter = path -> path.getName().contains(PATH_FILTER_MATCHED_PREFIX);
+ HiveFileIterator hiveFileIterator = new HiveFileIterator(rootPath, listDirectoryOperation, new NamenodeStats(), IGNORED, pathFilter);
+
+ int actualCount = Iterators.size(hiveFileIterator);
+ assertEquals(actualCount, 3);
+
+ // cleanup
+ deleteTestDir(rootDir);
+ }
+
+ @Test
+ public void testPathFilterWithRecursion()
+ throws IOException
+ {
+ // set up
+ File rootDir = createTempDir();
+ String basePath = rootDir.getAbsolutePath();
+ // create 8 files in root directory - 3 pathFilter matched and 5 non matched files.
+ createFiles(basePath, 3, true);
+ createFiles(basePath, 5, false);
+ // create two directories
+ List subDirs = createDirs(basePath, 2);
+ // create 5 files in dir1 - 3 pathFilter matched and 2 non matched files.
+ String dir1 = subDirs.get(0).getAbsolutePath();
+ createFiles(dir1, 3, true);
+ createFiles(dir1, 2, false);
+ // create 7 files in dir2 - 3 pathFilter matched and 4 non matched files.
+ String dir2 = subDirs.get(1).getAbsolutePath();
+ createFiles(dir2, 3, true);
+ createFiles(dir2, 4, false);
+ Path rootPath = new Path("file://" + basePath + File.separator);
+ PathFilter pathFilter = path -> path.getName().contains(PATH_FILTER_MATCHED_PREFIX);
+ HiveFileIterator hiveFileIterator = new HiveFileIterator(rootPath, listDirectoryOperation, new NamenodeStats(), RECURSE, pathFilter);
+
+ int actualCount = Iterators.size(hiveFileIterator);
+ assertEquals(actualCount, 9);
+
+ // cleanup
+ deleteTestDir(rootDir);
+ }
+
+ private void deleteTestDir(File rootDir)
+ throws IOException
+ {
+ if (rootDir.exists()) {
+ deleteRecursively(rootDir.toPath(), ALLOW_INSECURE);
+ }
+ }
+
+ private void createFiles(String basePath, int numFiles, boolean matchPathFilter)
+ throws IOException
+ {
+ new File(basePath).mkdirs();
+ for (int i = 0; i < numFiles; i++) {
+ String fileName;
+ if (matchPathFilter) {
+ fileName = PATH_FILTER_MATCHED_PREFIX + i;
+ }
+ else {
+ fileName = PATH_FILTER_NOT_MATCHED_PREFIX + i;
+ }
+ new File(basePath + File.separator + fileName).createNewFile();
+ }
+ }
+
+ private List createDirs(String basePath, int numDirectories)
+ {
+ List directories = new ArrayList<>();
+ for (int i = 0; i < numDirectories; i++) {
+ String dirName = basePath + File.separator + PATH_FILTER_MATCHED_PREFIX + "dir_" + i;
+ File file = new File(dirName);
+ file.mkdirs();
+ directories.add(file);
+ }
+ return directories;
+ }
+}
diff --git a/presto-hive/src/test/resources/hudi_non_part_cow/.hoodie/20211217110514527.commit b/presto-hive/src/test/resources/hudi_non_part_cow/.hoodie/20211217110514527.commit
deleted file mode 100644
index f77eeb137f026..0000000000000
--- a/presto-hive/src/test/resources/hudi_non_part_cow/.hoodie/20211217110514527.commit
+++ /dev/null
@@ -1,50 +0,0 @@
-{
- "partitionToWriteStats" : {
- "" : [ {
- "fileId" : "d0875d00-483d-4e8b-bbbe-c520366c47a0-0",
- "path" : "d0875d00-483d-4e8b-bbbe-c520366c47a0-0_0-6-11_20211217110514527.parquet",
- "prevCommit" : "null",
- "numWrites" : 3,
- "numDeletes" : 0,
- "numUpdateWrites" : 0,
- "numInserts" : 3,
- "totalWriteBytes" : 436273,
- "totalWriteErrors" : 0,
- "tempPath" : null,
- "partitionPath" : "",
- "totalLogRecords" : 0,
- "totalLogFilesCompacted" : 0,
- "totalLogSizeCompacted" : 0,
- "totalUpdatedRecordsCompacted" : 0,
- "totalLogBlocks" : 0,
- "totalCorruptLogBlock" : 0,
- "totalRollbackBlocks" : 0,
- "fileSizeInBytes" : 436273,
- "minEventTime" : null,
- "maxEventTime" : null
- } ]
- },
- "compacted" : false,
- "extraMetadata" : {
- "schema" : "{\"type\":\"record\",\"name\":\"hudi_non_part_cow_record\",\"namespace\":\"hoodie.hudi_non_part_cow\",\"fields\":[{\"name\":\"rowId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"partitionId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"preComb\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"versionId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"toBeDeletedStr\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"intToLong\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"longToInt\",\"type\":[\"null\",\"long\"],\"default\":null}]}"
- },
- "operationType" : "INSERT",
- "writePartitionPaths" : [ "" ],
- "fileIdAndRelativePaths" : {
- "d0875d00-483d-4e8b-bbbe-c520366c47a0-0" : "d0875d00-483d-4e8b-bbbe-c520366c47a0-0_0-6-11_20211217110514527.parquet"
- },
- "totalRecordsDeleted" : 0,
- "totalLogRecordsCompacted" : 0,
- "totalLogFilesCompacted" : 0,
- "totalCompactedRecordsUpdated" : 0,
- "totalLogFilesSize" : 0,
- "totalScanTime" : 0,
- "totalCreateTime" : 1743,
- "totalUpsertTime" : 0,
- "minAndMaxEventTime" : {
- "Optional.empty" : {
- "val" : null,
- "present" : false
- }
- }
-}
\ No newline at end of file
diff --git a/presto-hive/src/test/resources/hudi_non_part_cow/.hoodie/20211217110514527.commit.requested b/presto-hive/src/test/resources/hudi_non_part_cow/.hoodie/20211217110514527.commit.requested
deleted file mode 100644
index e69de29bb2d1d..0000000000000
diff --git a/presto-hive/src/test/resources/hudi_non_part_cow/.hoodie/20211217110514527.inflight b/presto-hive/src/test/resources/hudi_non_part_cow/.hoodie/20211217110514527.inflight
deleted file mode 100644
index 6605bcaf9b36c..0000000000000
--- a/presto-hive/src/test/resources/hudi_non_part_cow/.hoodie/20211217110514527.inflight
+++ /dev/null
@@ -1,48 +0,0 @@
-{
- "partitionToWriteStats" : {
- "" : [ {
- "fileId" : "",
- "path" : null,
- "prevCommit" : "null",
- "numWrites" : 0,
- "numDeletes" : 0,
- "numUpdateWrites" : 0,
- "numInserts" : 3,
- "totalWriteBytes" : 0,
- "totalWriteErrors" : 0,
- "tempPath" : null,
- "partitionPath" : null,
- "totalLogRecords" : 0,
- "totalLogFilesCompacted" : 0,
- "totalLogSizeCompacted" : 0,
- "totalUpdatedRecordsCompacted" : 0,
- "totalLogBlocks" : 0,
- "totalCorruptLogBlock" : 0,
- "totalRollbackBlocks" : 0,
- "fileSizeInBytes" : 0,
- "minEventTime" : null,
- "maxEventTime" : null
- } ]
- },
- "compacted" : false,
- "extraMetadata" : { },
- "operationType" : "INSERT",
- "writePartitionPaths" : [ "" ],
- "fileIdAndRelativePaths" : {
- "" : null
- },
- "totalRecordsDeleted" : 0,
- "totalLogRecordsCompacted" : 0,
- "totalLogFilesCompacted" : 0,
- "totalCompactedRecordsUpdated" : 0,
- "totalLogFilesSize" : 0,
- "totalScanTime" : 0,
- "totalCreateTime" : 0,
- "totalUpsertTime" : 0,
- "minAndMaxEventTime" : {
- "Optional.empty" : {
- "val" : null,
- "present" : false
- }
- }
-}
\ No newline at end of file
diff --git a/presto-hive/src/test/resources/hudi_non_part_cow/.hoodie/hoodie.properties b/presto-hive/src/test/resources/hudi_non_part_cow/.hoodie/hoodie.properties
deleted file mode 100644
index 3d03fa7915c39..0000000000000
--- a/presto-hive/src/test/resources/hudi_non_part_cow/.hoodie/hoodie.properties
+++ /dev/null
@@ -1,14 +0,0 @@
-#Properties saved on Fri Dec 17 11:05:14 UTC 2021
-#Fri Dec 17 11:05:14 UTC 2021
-hoodie.table.precombine.field=preComb
-hoodie.table.partition.fields=
-hoodie.table.type=COPY_ON_WRITE
-hoodie.archivelog.folder=archived
-hoodie.populate.meta.fields=true
-hoodie.timeline.layout.version=1
-hoodie.table.version=3
-hoodie.table.recordkey.fields=rowId
-hoodie.table.base.file.format=PARQUET
-hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
-hoodie.table.name=hudi_non_part_cow
-hoodie.datasource.write.hive_style_partitioning=false
diff --git a/presto-hive/src/test/resources/hudi_non_part_cow/.hoodie_partition_metadata b/presto-hive/src/test/resources/hudi_non_part_cow/.hoodie_partition_metadata
deleted file mode 100644
index f2149eb6cd5a3..0000000000000
--- a/presto-hive/src/test/resources/hudi_non_part_cow/.hoodie_partition_metadata
+++ /dev/null
@@ -1,4 +0,0 @@
-#partition metadata
-#Fri Dec 17 11:05:23 UTC 2021
-commitTime=20211217110514527
-partitionDepth=0
diff --git a/presto-hive/src/test/resources/hudi_non_part_cow/d0875d00-483d-4e8b-bbbe-c520366c47a0-0_0-6-11_20211217110514527.parquet b/presto-hive/src/test/resources/hudi_non_part_cow/d0875d00-483d-4e8b-bbbe-c520366c47a0-0_0-6-11_20211217110514527.parquet
deleted file mode 100644
index 52de8719bf62d..0000000000000
Binary files a/presto-hive/src/test/resources/hudi_non_part_cow/d0875d00-483d-4e8b-bbbe-c520366c47a0-0_0-6-11_20211217110514527.parquet and /dev/null differ
diff --git a/presto-hive/src/test/resources/non_hudi_table/d0875d00-483d-4e8b-bbbe-c520366c47a0-0_0-6-11_20211217110514527.parquet b/presto-hive/src/test/resources/non_hudi_table/d0875d00-483d-4e8b-bbbe-c520366c47a0-0_0-6-11_20211217110514527.parquet
deleted file mode 100644
index 52de8719bf62d..0000000000000
Binary files a/presto-hive/src/test/resources/non_hudi_table/d0875d00-483d-4e8b-bbbe-c520366c47a0-0_0-6-11_20211217110514527.parquet and /dev/null differ