diff --git a/pom.xml b/pom.xml
index 6f5bdd83b94ec..939fbcb0c89c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,9 +71,9 @@
0.11.0
30.0.1
2.3.1
- 0.14.0
- 1.20.5
- 3.4.1
+ 0.15.0
+ 1.18.3
+ 3.3.0
2.9.0
3.1.3
2.36.0
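
Note: the version bumps above track the Hudi 0.15.0 release, which moved the filesystem layer out of hudi-common into a storage abstraction (org.apache.hudi.storage). Most of the Java changes below are mechanical migrations to the new types. A minimal sketch of the central change, assuming the 0.15.x API (the wrapper class and method here are illustrative, not part of this PR):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

public class StorageConfSketch
{
    // Hudi 0.15 builders (e.g. HoodieTableMetaClient.builder().setConf(...))
    // take a StorageConfiguration instead of a raw Hadoop Configuration;
    // HadoopStorageConfiguration is the Hadoop-backed implementation.
    public static StorageConfiguration<Configuration> wrap(Configuration hadoopConf)
    {
        return new HadoopStorageConfiguration(hadoopConf);
    }
}
```
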
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HudiDirectoryLister.java b/presto-hive/src/main/java/com/facebook/presto/hive/HudiDirectoryLister.java
index f3eed3a9fe7a1..a77c49fc027fc 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HudiDirectoryLister.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HudiDirectoryLister.java
@@ -11,6 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.facebook.presto.hive;
import com.facebook.airlift.log.Logger;
@@ -30,7 +31,6 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieTableType;
@@ -40,10 +40,14 @@
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.facebook.presto.hive.HiveFileInfo.createHiveFileInfo;
@@ -51,6 +55,7 @@
import static com.facebook.presto.hive.HiveSessionProperties.isHudiMetadataEnabled;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT;
import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToHadoopFileStatus;
public class HudiDirectoryLister
implements DirectoryLister
@@ -78,7 +83,7 @@ public HudiDirectoryLister(Configuration conf, ConnectorSession session, Table t
actualConfig = ((CopyOnFirstWriteConfiguration) actualConfig).getConfig();
}
this.metaClient = HoodieTableMetaClient.builder()
- .setConf(actualConfig)
+ .setConf(new HadoopStorageConfiguration(actualConfig))
.setBasePath(table.getStorage().getLocation())
.build();
this.latestInstant = metaClient.getActiveTimeline()
@@ -91,7 +96,7 @@ public HudiDirectoryLister(Configuration conf, ConnectorSession session, Table t
.filterCompletedInstants()
.lastInstant()
.map(HoodieInstant::getTimestamp).orElseThrow(() -> new RuntimeException("No active instant found")));
- HoodieEngineContext engineContext = new HoodieLocalEngineContext(actualConfig);
+ HoodieEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf());
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
.enable(metadataEnabled)
.build();
@@ -142,9 +147,11 @@ public HudiFileInfoIterator(
String latestInstant,
boolean shouldUseMergedView)
{
- String partition = FSUtils.getRelativePartitionPath(new Path(tablePath), directory);
+ String partition = HadoopFSUtils.getRelativePartitionPath(new Path(tablePath), directory);
if (fileStatuses.isPresent()) {
- fileSystemView.addFilesToView(fileStatuses.get());
+ fileSystemView.addFilesToView(Arrays.stream(fileStatuses.get())
+ .map(HadoopFSUtils::convertToStoragePathInfo)
+ .collect(Collectors.toList()));
this.hoodieBaseFileIterator = fileSystemView.fetchLatestBaseFiles(partition).iterator();
}
else {
@@ -170,7 +177,7 @@ public boolean hasNext()
public HiveFileInfo next()
throws IOException
{
- FileStatus fileStatus = hoodieBaseFileIterator.next().getFileStatus();
+ FileStatus fileStatus = convertToHadoopFileStatus(hoodieBaseFileIterator.next().getPathInfo());
String[] name = {"localhost:" + DFS_DATANODE_DEFAULT_PORT};
String[] host = {"localhost"};
LocatedFileStatus hoodieFileStatus = new LocatedFileStatus(fileStatus,
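
Note: Presto's Hive code paths still traffic in Hadoop FileStatus, while the 0.15 file system view speaks StoragePathInfo, so the lister converts at both boundaries. A minimal round-trip sketch using the two HadoopFSUtils helpers referenced above (the wrapper class and method names are illustrative):

```java
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.StoragePathInfo;

public class FileStatusConversionSketch
{
    // Hadoop -> Hudi: used when seeding the view via addFilesToView(...)
    static StoragePathInfo toStoragePathInfo(FileStatus status)
    {
        return HadoopFSUtils.convertToStoragePathInfo(status);
    }

    // Hudi -> Hadoop: used when handing base files back to Presto in next()
    static FileStatus toFileStatus(StoragePathInfo pathInfo)
    {
        return HadoopFSUtils.convertToHadoopFileStatus(pathInfo);
    }
}
```
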
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeBootstrapBaseFileSplitConverter.java b/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeBootstrapBaseFileSplitConverter.java
index 6b3f9493d78d9..c1976283baac2 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeBootstrapBaseFileSplitConverter.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeBootstrapBaseFileSplitConverter.java
@@ -11,6 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.facebook.presto.hive.util;
import com.google.common.collect.ImmutableMap;
@@ -19,6 +20,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit;
+import org.apache.hudi.storage.StoragePath;
import java.io.IOException;
import java.util.Arrays;
@@ -71,7 +73,7 @@ public Optional<FileSplit> recreateFileSplitWithCustomInfo(FileSplit split, Map<String, String> customSplitInfo)
if (!isNullOrEmpty(customFileSplitClass) && HoodieRealtimeBootstrapBaseFileSplit.class.getName().equals(customFileSplitClass)) {
String deltaFilePaths = customSplitInfo.get(DELTA_FILE_PATHS_KEY);
List<String> deltaLogPaths = isNullOrEmpty(deltaFilePaths) ? Collections.emptyList() : Arrays.asList(deltaFilePaths.split(","));
- List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
+ List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new StoragePath(p))).collect(Collectors.toList());
FileSplit bootstrapFileSplit = new FileSplit(
new Path(customSplitInfo.get(BOOTSTRAP_FILE_SPLIT_PATH)),
parseLong(customSplitInfo.get(BOOTSTRAP_FILE_SPLIT_START)),
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeSplitConverter.java b/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeSplitConverter.java
index 31e0e337c3e80..62c67decb9882 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeSplitConverter.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/util/HudiRealtimeSplitConverter.java
@@ -11,15 +11,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.facebook.presto.hive.util;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
+import org.apache.hudi.storage.StoragePath;
import java.io.IOException;
import java.util.List;
@@ -67,7 +68,7 @@ public Optional<FileSplit> recreateFileSplitWithCustomInfo(FileSplit split, Map<String, String> customSplitInfo)
if (HoodieRealtimeFileSplit.class.getName().equals(customSplitClass)) {
requireNonNull(customSplitInfo.get(HUDI_DELTA_FILEPATHS_KEY), "HUDI_DELTA_FILEPATHS_KEY is missing");
List<String> deltaLogPaths = SPLITTER.splitToList(customSplitInfo.get(HUDI_DELTA_FILEPATHS_KEY));
- List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
+ List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new StoragePath(p))).collect(Collectors.toList());
return Optional.of(new HoodieRealtimeFileSplit(
split,
requireNonNull(customSplitInfo.get(HUDI_BASEPATH_KEY), "HUDI_BASEPATH_KEY is missing"),
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/util/TestCustomSplitConversionUtils.java b/presto-hive/src/test/java/com/facebook/presto/hive/util/TestCustomSplitConversionUtils.java
index 2f333ac805a97..d186b4ca33914 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/util/TestCustomSplitConversionUtils.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/util/TestCustomSplitConversionUtils.java
@@ -11,6 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.facebook.presto.hive.util;
import com.google.common.collect.ImmutableList;
@@ -21,6 +22,7 @@
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
+import org.apache.hudi.storage.StoragePath;
import org.testng.annotations.Test;
import java.io.IOException;
@@ -44,7 +46,7 @@ public void testHudiRealtimeSplitConverterRoundTrip()
throws IOException
{
List<String> deltaLogPaths = Arrays.asList("test1", "test2", "test3");
- List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
+ List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new StoragePath(p))).collect(Collectors.toList());
String expectedMaxCommitTime = "max_commit_time";
FileSplit baseSplit = new FileSplit(FILE_PATH, SPLIT_START_POS, SPLIT_LENGTH, SPLIT_HOSTS);
@@ -125,7 +127,7 @@ public void testHudiRealtimeBootstrapBaseFileSplitConverter()
throws IOException
{
List<String> deltaLogPaths = Arrays.asList("test1", "test2", "test3");
- List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
+ List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new StoragePath(p))).collect(Collectors.toList());
String maxCommitTime = "max_commit_time";
Path bootstrapSourceFilePath = new Path("/test/source/test.parquet");
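
Note: the three files above carry the same mechanical change — HoodieLogFile is now constructed from Hudi's own StoragePath rather than org.apache.hadoop.fs.Path. The string form of the path is unchanged, so the serialized custom split info should stay compatible. A small sketch (the log-file name is made up):

```java
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.storage.StoragePath;

public class StoragePathSketch
{
    public static void main(String[] args)
    {
        // Same string path as before, different path type.
        StoragePath logPath = new StoragePath("/warehouse/t/dt=2024-01-01/.fg1_20240101.log.1_0-1-0");
        HoodieLogFile logFile = new HoodieLogFile(logPath);
        System.out.println(logFile.getPath());
    }
}
```
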
diff --git a/presto-hudi/pom.xml b/presto-hudi/pom.xml
index 6d8efe8128f0b..454b081bf65a8 100644
--- a/presto-hudi/pom.xml
+++ b/presto-hudi/pom.xml
@@ -237,6 +237,12 @@
             <artifactId>units</artifactId>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.lz4</groupId>
+            <artifactId>lz4-java</artifactId>
+            <version>1.8.0</version>
+        </dependency>
diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiFileSkippingManager.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiFileSkippingManager.java
new file mode 100644
index 0000000000000..be946fde7c0c6
--- /dev/null
+++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiFileSkippingManager.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.hudi;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.common.predicate.Domain;
+import com.facebook.presto.common.predicate.Range;
+import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.common.predicate.ValueSet;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.parquet.predicate.TupleDomainParquetPredicate;
+import com.facebook.presto.spi.ColumnHandle;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.ColumnIndexID;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.facebook.presto.common.type.BigintType.BIGINT;
+import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.common.type.DateType.DATE;
+import static com.facebook.presto.common.type.DoubleType.DOUBLE;
+import static com.facebook.presto.common.type.IntegerType.INTEGER;
+import static com.facebook.presto.common.type.RealType.REAL;
+import static com.facebook.presto.common.type.SmallintType.SMALLINT;
+import static com.facebook.presto.common.type.TinyintType.TINYINT;
+import static com.facebook.presto.common.type.Varchars.isVarcharType;
+import static com.facebook.presto.parquet.predicate.PredicateUtils.isStatisticsOverflow;
+import static java.lang.Float.floatToRawIntBits;
+import static java.util.Objects.requireNonNull;
+
+public class HudiFileSkippingManager
+{
+ private static final Logger log = Logger.get(HudiFileSkippingManager.class);
+
+ private final Optional<String> specifiedQueryInstant;
+ private final HoodieTableMetaClient metaClient;
+ private final HoodieTableMetadata metadataTable;
+
+ private final Map<String, List<FileSlice>> allInputFileSlices;
+
+ public HudiFileSkippingManager(
+ List<String> partitions,
+ String spillableDir,
+ HoodieEngineContext engineContext,
+ HoodieTableMetaClient metaClient,
+ Optional<String> specifiedQueryInstant)
+ {
+ requireNonNull(partitions, "partitions is null");
+ requireNonNull(spillableDir, "spillableDir is null");
+ requireNonNull(engineContext, "engineContext is null");
+ this.specifiedQueryInstant = requireNonNull(specifiedQueryInstant, "specifiedQueryInstant is null");
+ this.metaClient = requireNonNull(metaClient, "metaClient is null");
+
+ HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build();
+ this.metadataTable = HoodieTableMetadata.create(engineContext, metaClient.getStorage(), metadataConfig, metaClient.getBasePathV2().toString(), true);
+ this.allInputFileSlices = prepareAllInputFileSlices(partitions, engineContext, metadataConfig, spillableDir);
+ }
+
+ private Map<String, List<FileSlice>> prepareAllInputFileSlices(
+ List<String> partitions,
+ HoodieEngineContext engineContext,
+ HoodieMetadataConfig metadataConfig,
+ String spillableDir)
+ {
+ long startTime = System.currentTimeMillis();
+ HoodieTimeline activeTimeline = metaClient.reloadActiveTimeline();
+ Optional<HoodieInstant> latestInstant = activeTimeline.lastInstant().toJavaOptional();
+ // build system view.
+ SyncableFileSystemView fileSystemView = FileSystemViewManager.createViewManager(
+ engineContext,
+ FileSystemViewStorageConfig.newBuilder().withBaseStoreDir(spillableDir).build(),
+ HoodieCommonConfig.newBuilder().build(),
+ (e) -> metadataTable)
+ .getFileSystemView(metaClient);
+ Optional<String> queryInstant = specifiedQueryInstant.isPresent() ?
+ specifiedQueryInstant : latestInstant.map(HoodieInstant::getTimestamp);
+
+ Map<String, List<FileSlice>> allInputFileSlices = engineContext
+ .mapToPair(
+ partitions,
+ partitionPath -> Pair.of(
+ partitionPath,
+ getLatestFileSlices(partitionPath, fileSystemView, queryInstant)),
+ partitions.size());
+
+ long duration = System.currentTimeMillis() - startTime;
+ log.debug("prepare query files for table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration);
+ return allInputFileSlices;
+ }
+
+ private List<FileSlice> getLatestFileSlices(
+ String partitionPath,
+ SyncableFileSystemView fileSystemView,
+ Optional<String> queryInstant)
+ {
+ return queryInstant
+ .map(instant ->
+ fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, instant))
+ .orElse(fileSystemView.getLatestFileSlices(partitionPath))
+ .collect(Collectors.toList());
+ }
+
+ public Map<String, List<FileSlice>> listQueryFiles(TupleDomain<ColumnHandle> tupleDomain)
+ {
+ // do file skipping by MetadataTable
+ Map<String, List<FileSlice>> candidateFileSlices = allInputFileSlices;
+ try {
+ if (!tupleDomain.isAll()) {
+ candidateFileSlices = lookupCandidateFilesInMetadataTable(candidateFileSlices, tupleDomain);
+ }
+ }
+ catch (Exception e) {
+ // Don't fail the query on data-skipping errors; log and fall back to scanning all files.
+ log.warn(e, "Failed to apply data skipping for table %s; falling back to scanning all files", metaClient.getBasePathV2());
+ }
+ if (log.isDebugEnabled()) {
+ int candidateFileSize = candidateFileSlices.values().stream().mapToInt(List::size).sum();
+ int totalFiles = allInputFileSlices.values().stream().mapToInt(List::size).sum();
+ double skippingPercent = totalFiles == 0 ? 0.0d : (totalFiles - candidateFileSize) / (totalFiles + 0.0d);
+ log.debug("Total files: %s; candidate files after data skipping: %s; skipping percent %s",
+ totalFiles,
+ candidateFileSize,
+ skippingPercent);
+ }
+ return candidateFileSlices;
+ }
+
+ private Map<String, List<FileSlice>> lookupCandidateFilesInMetadataTable(
+ Map<String, List<FileSlice>> inputFileSlices,
+ TupleDomain<ColumnHandle> tupleDomain)
+ {
+ // split regular column predicates
+ TupleDomain<HudiColumnHandle> regularTupleDomain = HudiPredicates.from(tupleDomain).getRegularColumnPredicates();
+ TupleDomain<String> regularColumnPredicates = regularTupleDomain.transform(HudiColumnHandle::getName);
+ if (regularColumnPredicates.isAll() || !regularColumnPredicates.getDomains().isPresent()) {
+ return inputFileSlices;
+ }
+ List<String> regularColumns = new ArrayList<>(regularColumnPredicates.getDomains().get().keySet());
+ // get filter columns
+ List<String> encodedTargetColumnNames = regularColumns
+ .stream()
+ .map(col -> new ColumnIndexID(col).asBase64EncodedString()).collect(Collectors.toList());
+ Map<String, List<HoodieMetadataColumnStats>> statsByFileName = metadataTable.getRecordsByKeyPrefixes(
+ encodedTargetColumnNames,
+ HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, true)
+ .collectAsList()
+ .stream()
+ .filter(f -> f.getData().getColumnStatMetadata().isPresent())
+ .map(f -> f.getData().getColumnStatMetadata().get())
+ .collect(Collectors.groupingBy(HoodieMetadataColumnStats::getFileName));
+
+ // prune files.
+ return inputFileSlices
+ .entrySet()
+ .stream()
+ .collect(Collectors
+ .toMap(Map.Entry::getKey, entry -> entry
+ .getValue()
+ .stream()
+ .filter(fileSlice -> pruneFiles(fileSlice, statsByFileName, regularColumnPredicates, regularColumns))
+ .collect(Collectors.toList())));
+ }
+
+ private boolean pruneFiles(
+ FileSlice fileSlice,
+ Map<String, List<HoodieMetadataColumnStats>> statsByFileName,
+ TupleDomain<String> regularColumnPredicates,
+ List<String> regularColumns)
+ {
+ String fileSliceName = fileSlice.getBaseFile().map(BaseFile::getFileName).orElse("");
+ // no stats found
+ if (!statsByFileName.containsKey(fileSliceName)) {
+ return true;
+ }
+ List<HoodieMetadataColumnStats> stats = statsByFileName.get(fileSliceName);
+ return evaluateStatisticPredicate(regularColumnPredicates, stats, regularColumns);
+ }
+
+ private boolean evaluateStatisticPredicate(
+ TupleDomain<String> regularColumnPredicates,
+ List<HoodieMetadataColumnStats> stats,
+ List<String> regularColumns)
+ {
+ if (regularColumnPredicates.isNone() || !regularColumnPredicates.getDomains().isPresent()) {
+ return true;
+ }
+ for (String regularColumn : regularColumns) {
+ Domain columnPredicate = regularColumnPredicates.getDomains().get().get(regularColumn);
+ Optional<HoodieMetadataColumnStats> currentColumnStats = stats
+ .stream().filter(s -> s.getColumnName().equals(regularColumn)).findFirst();
+ if (currentColumnStats.isPresent()) {
+ Domain domain = getDomain(regularColumn, columnPredicate.getType(), currentColumnStats.get());
+ if (columnPredicate.intersect(domain).isNone()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private static Domain getDomain(String colName, Type type, HoodieMetadataColumnStats statistics)
+ {
+ if (statistics == null) {
+ return Domain.all(type);
+ }
+ boolean hasNullValue = statistics.getNullCount() != 0L;
+ boolean hasNonNullValue = statistics.getValueCount() - statistics.getNullCount() > 0;
+ if (!hasNonNullValue || statistics.getMaxValue() == null || statistics.getMinValue() == null) {
+ return Domain.create(ValueSet.all(type), hasNullValue);
+ }
+ if (!(statistics.getMinValue() instanceof org.apache.hudi.org.apache.avro.generic.GenericRecord) ||
+ !(statistics.getMaxValue() instanceof org.apache.hudi.org.apache.avro.generic.GenericRecord)) {
+ return Domain.all(type);
+ }
+ return getDomain(colName, type, ((org.apache.hudi.org.apache.avro.generic.GenericRecord) statistics.getMinValue()).get(0),
+ ((org.apache.hudi.org.apache.avro.generic.GenericRecord) statistics.getMaxValue()).get(0), hasNullValue);
+ }
+
+ /**
+ * Get a domain for the range defined by {@code minimum} and {@code maximum}.
+ */
+ private static Domain getDomain(String colName, Type type, Object minimum, Object maximum, boolean hasNullValue)
+ {
+ try {
+ if (type.equals(BOOLEAN)) {
+ boolean hasTrueValue = (boolean) minimum || (boolean) maximum;
+ boolean hasFalseValue = !(boolean) minimum || !(boolean) maximum;
+ if (hasTrueValue && hasFalseValue) {
+ return Domain.all(type);
+ }
+ if (hasTrueValue) {
+ return Domain.create(ValueSet.of(type, true), hasNullValue);
+ }
+ if (hasFalseValue) {
+ return Domain.create(ValueSet.of(type, false), hasNullValue);
+ }
+ // No other case, since all null case is handled earlier.
+ }
+
+ if ((type.equals(BIGINT) || type.equals(TINYINT) || type.equals(SMALLINT) || type.equals(INTEGER) || type.equals(DATE))) {
+ long minValue = TupleDomainParquetPredicate.asLong(minimum);
+ long maxValue = TupleDomainParquetPredicate.asLong(maximum);
+ if (isStatisticsOverflow(type, minValue, maxValue)) {
+ return Domain.create(ValueSet.all(type), hasNullValue);
+ }
+ return ofMinMax(type, minValue, maxValue, hasNullValue);
+ }
+
+ if (type.equals(REAL)) {
+ Float minValue = (Float) minimum;
+ Float maxValue = (Float) maximum;
+ if (minValue.isNaN() || maxValue.isNaN()) {
+ return Domain.create(ValueSet.all(type), hasNullValue);
+ }
+ return ofMinMax(type, (long) floatToRawIntBits(minValue), (long) floatToRawIntBits(maxValue), hasNullValue);
+ }
+
+ if (type.equals(DOUBLE)) {
+ Double minValue = (Double) minimum;
+ Double maxValue = (Double) maximum;
+ if (minValue.isNaN() || maxValue.isNaN()) {
+ return Domain.create(ValueSet.all(type), hasNullValue);
+ }
+ return ofMinMax(type, minValue, maxValue, hasNullValue);
+ }
+
+ if (isVarcharType(type)) {
+ Slice min = Slices.utf8Slice((String) minimum);
+ Slice max = Slices.utf8Slice((String) maximum);
+ return ofMinMax(type, min, max, hasNullValue);
+ }
+ return Domain.create(ValueSet.all(type), hasNullValue);
+ }
+ catch (Exception e) {
+ log.warn("failed to create Domain for column: %s which type is: %s", colName, type.toString());
+ return Domain.create(ValueSet.all(type), hasNullValue);
+ }
+ }
+
+ private static Domain ofMinMax(Type type, Object min, Object max, boolean hasNullValue)
+ {
+ Range range = Range.range(type, min, true, max, true);
+ ValueSet vs = ValueSet.ofRanges(ImmutableList.of(range));
+ return Domain.create(vs, hasNullValue);
+ }
+}
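
Note: HudiFileSkippingManager prunes file slices with column statistics from the metadata table — it keys the column-stats partition by Base64-encoded column-name IDs, groups the stats by file name, and drops any file slice whose min/max range cannot intersect the query predicate. A minimal usage sketch, assuming it lives alongside the class in com.facebook.presto.hudi; the paths and partition values are hypothetical:

```java
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.spi.ColumnHandle;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

import java.util.List;
import java.util.Map;
import java.util.Optional;

public class FileSkippingSketch
{
    static Map<String, List<FileSlice>> candidateFiles(Configuration conf, TupleDomain<ColumnHandle> predicate)
    {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(new HadoopStorageConfiguration(conf))
                .setBasePath("/warehouse/hudi_table") // hypothetical table location
                .build();
        HoodieEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf());
        HudiFileSkippingManager skippingManager = new HudiFileSkippingManager(
                ImmutableList.of("dt=2024-01-01"),    // hypothetical partition paths
                "/tmp/hudi-view-spill",               // hypothetical spillable dir for the view
                engineContext,
                metaClient,
                Optional.empty());                    // empty => use the latest completed instant
        return skippingManager.listQueryFiles(predicate);
    }
}
```
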
diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPageSourceProvider.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPageSourceProvider.java
index a67174d8419b3..9da37f8b21c2b 100644
--- a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPageSourceProvider.java
+++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPageSourceProvider.java
@@ -15,7 +15,6 @@
package com.facebook.presto.hudi;
import com.facebook.presto.common.RuntimeStats;
-import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.FileFormatDataSourceStats;
@@ -106,7 +105,7 @@ public ConnectorPageSource createPageSource(
baseFile.getStart(),
baseFile.getLength(),
dataColumns,
- TupleDomain.all(), // TODO: predicates
+ HudiPredicates.from(layout.getTupleDomain()).getRegularColumnPredicates(),
fileFormatDataSourceStats);
}
else if (tableType == HudiTableType.MOR) {
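
Note: with the layout's summary threaded through, COW base-file reads now push regular-column predicates into the Parquet reader instead of TupleDomain.all(); partition-key predicates are already consumed during partition pruning. A one-method sketch of the split, assuming the HudiPredicates helper added in this PR (the wrapper method is illustrative):

```java
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.spi.ColumnHandle;

public class PredicateSplitSketch
{
    // Only regular (non-partition) column predicates reach the reader.
    static TupleDomain<HudiColumnHandle> readerPredicate(TupleDomain<ColumnHandle> layoutSummary)
    {
        return HudiPredicates.from(layoutSummary).getRegularColumnPredicates();
    }
}
```
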
diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPartitionManager.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPartitionManager.java
index b7f5b6ff3127a..90bdbad1a3fd4 100644
--- a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPartitionManager.java
+++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPartitionManager.java
@@ -14,9 +14,12 @@
package com.facebook.presto.hudi;
+import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.NullableValue;
import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.common.predicate.ValueSet;
+import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.PartitionNameWithVersion;
@@ -26,30 +29,50 @@
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import io.airlift.slice.Slice;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import javax.inject.Inject;
+import java.sql.Timestamp;
+import java.time.LocalDate;
import java.time.ZoneId;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import static com.facebook.presto.common.Utils.checkArgument;
import static com.facebook.presto.hive.HiveUtil.parsePartitionValue;
import static com.facebook.presto.hive.metastore.MetastoreUtil.extractPartitionValues;
+import static com.facebook.presto.hudi.HudiErrorCode.HUDI_INVALID_PARTITION_VALUE;
import static com.facebook.presto.hudi.HudiMetadata.fromPartitionColumns;
import static com.facebook.presto.hudi.HudiMetadata.toMetastoreContext;
+import static com.facebook.presto.hudi.HudiSessionProperties.isHudiMetadataTableEnabled;
+import static io.airlift.slice.Slices.utf8Slice;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
public class HudiPartitionManager
{
+ private static final Logger log = Logger.get(HudiPartitionManager.class);
+ private static final Pattern HIVE_PARTITION_NAME_PATTERN = Pattern.compile("([^/]+)=([^/]+)");
+
private final TypeManager typeManager;
@Inject
@@ -61,6 +84,7 @@ public HudiPartitionManager(TypeManager typeManager)
public List<String> getEffectivePartitions(
ConnectorSession connectorSession,
ExtendedHiveMetastore metastore,
+ HoodieTableMetaClient metaClient,
SchemaTableName schemaTableName,
TupleDomain<ColumnHandle> constraintSummary)
{
@@ -72,6 +96,226 @@ public List getEffectivePartitions(
return ImmutableList.of("");
}
+ return isHudiMetadataTableEnabled(connectorSession) ?
+ prunePartitionByMetaDataTable(metaClient, partitionColumns, constraintSummary) :
+ prunePartitionByMetaStore(metastore, schemaTableName, constraintSummary, partitionColumns, metastoreContext);
+ }
+
+ private List<String> prunePartitionByMetaDataTable(
+ HoodieTableMetaClient metaClient,
+ List<Column> partitionColumns,
+ TupleDomain<ColumnHandle> tupleDomain)
+ {
+ // non-partition table
+ if (partitionColumns.isEmpty()) {
+ return ImmutableList.of("");
+ }
+
+ HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf());
+ HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build();
+
+ // Load all the partition path from the basePath
+ List<String> allPartitions = FSUtils.getAllPartitionPaths(
+ engineContext,
+ metaClient.getStorage(),
+ metadataConfig,
+ metaClient.getBasePathV2().toString());
+
+ // Extract partition columns predicate
+ TupleDomain<String> partitionPredicate = tupleDomain.transform(hudiColumnHandle -> {
+ if (((HudiColumnHandle) hudiColumnHandle).getColumnType() != HudiColumnHandle.ColumnType.PARTITION_KEY) {
+ return null;
+ }
+ return ((HudiColumnHandle) hudiColumnHandle).getName();
+ });
+
+ if (partitionPredicate.isAll()) {
+ return allPartitions;
+ }
+
+ if (partitionPredicate.isNone()) {
+ return ImmutableList.of("");
+ }
+
+ List<HudiColumnHandle> partitionColumnHandles = fromPartitionColumns(partitionColumns);
+
+ List<String> matchedPartitionPaths = prunePartitions(
+ partitionPredicate,
+ partitionColumnHandles,
+ getPartitions(
+ partitionColumns.stream().map(Column::getName).collect(Collectors.toList()),
+ allPartitions));
+ log.debug(String.format("Total partition size is %s, after partition prune size is %s.",
+ allPartitions.size(), matchedPartitionPaths.size()));
+ return matchedPartitionPaths;
+ }
+
+ /**
+ * Returns, for each partition path, a mapping from partition key to partition value.
+ * For example, with partition keys [p1, p2, p3] and hive-style partition paths
+ * p1=val1/p2=val2/p3=val3 and p1=val4/p2=val5/p3=val6, the result is
+ * {p1=val1/p2=val2/p3=val3 -> {p1 -> val1, p2 -> val2, p3 -> val3}},
+ * {p1=val4/p2=val5/p3=val6 -> {p1 -> val4, p2 -> val5, p3 -> val6}}
+ *
+ * @param partitionKey the partition key list
+ * @param partitionPaths the partition path list
+ */
+ public static Map<String, Map<String, String>> getPartitions(List<String> partitionKey, List<String> partitionPaths)
+ {
+ Map<String, Map<String, String>> result = new HashMap<>();
+ if (partitionPaths.isEmpty() || partitionKey.isEmpty()) {
+ return result;
+ }
+ // try to infer hive style
+ boolean hiveStylePartition = HIVE_PARTITION_NAME_PATTERN.matcher(partitionPaths.get(0).split(Path.SEPARATOR)[0]).matches();
+ for (String partitionPath : partitionPaths) {
+ String[] pathParts = partitionPath.split(Path.SEPARATOR);
+ Map<String, String> partitionMapping = new LinkedHashMap<>();
+ if (hiveStylePartition) {
+ Arrays.stream(pathParts).forEach(p -> {
+ String[] keyValue = p.split("=");
+ if (keyValue.length == 2) {
+ partitionMapping.put(keyValue[0], keyValue[1]);
+ }
+ });
+ }
+ else {
+ for (int i = 0; i < partitionKey.size(); i++) {
+ partitionMapping.put(partitionKey.get(i), pathParts[i]);
+ }
+ }
+ result.put(partitionPath, partitionMapping);
+ }
+ return result;
+ }
+
+ private List<String> prunePartitions(
+ TupleDomain<String> partitionPredicate,
+ List<HudiColumnHandle> partitionColumnHandles,
+ Map<String, Map<String, String>> candidatePartitionPaths)
+ {
+ return candidatePartitionPaths.entrySet().stream().filter(f -> {
+ Map<String, String> partitionMapping = f.getValue();
+ return partitionMapping
+ .entrySet()
+ .stream()
+ .allMatch(p -> evaluatePartitionPredicate(partitionPredicate, partitionColumnHandles, p.getValue(), p.getKey()));
+ }).map(Map.Entry::getKey).collect(Collectors.toList());
+ }
+
+ private boolean evaluatePartitionPredicate(
+ TupleDomain<String> partitionPredicate,
+ List<HudiColumnHandle> partitionColumnHandles,
+ String partitionPathValue,
+ String partitionName)
+ {
+ Optional<HudiColumnHandle> columnHandleOpt =
+ partitionColumnHandles.stream().filter(f -> f.getName().equals(partitionName)).findFirst();
+ if (columnHandleOpt.isPresent()) {
+ if (!partitionPredicate.getDomains().isPresent()) {
+ return true;
+ }
+ Domain columnPredicate = partitionPredicate.getDomains().get().get(partitionName);
+ // no predicate on the current partition column
+ if (columnPredicate == null) {
+ return true;
+ }
+
+ // For a null partition value, hive substitutes a default partition name; keep such partitions.
+ if (partitionPathValue.equals("default")) {
+ return true;
+ }
+ // Compute the domain only when a predicate exists, so unsupported partition types without predicates don't fail.
+ Domain domain = getDomain(columnHandleOpt.get(), partitionPathValue);
+ return !columnPredicate.intersect(domain).isNone();
+ }
+ else {
+ // Should not happen
+ throw new IllegalArgumentException(String.format("Mismatched partition information found,"
+ + " partition: %s from Hudi metadataTable is not included by the partitions from HMS: %s",
+ partitionName, partitionColumnHandles.stream().map(HudiColumnHandle::getName).collect(Collectors.joining(","))));
+ }
+ }
+
+ private Domain getDomain(HudiColumnHandle columnHandle, String partitionValue)
+ {
+ Type type = columnHandle.getHiveType().getType(typeManager);
+ if (partitionValue == null) {
+ return Domain.onlyNull(type);
+ }
+ try {
+ switch (columnHandle.getHiveType().getTypeSignature().getBase()) {
+ case StandardTypes.TINYINT:
+ case StandardTypes.SMALLINT:
+ case StandardTypes.INTEGER:
+ case StandardTypes.BIGINT:
+ Long intValue = Long.parseLong(partitionValue);
+ return Domain.create(ValueSet.of(type, intValue), false);
+ case StandardTypes.REAL:
+ Long realValue = (long) Float.floatToRawIntBits(Float.parseFloat(partitionValue));
+ return Domain.create(ValueSet.of(type, realValue), false);
+ case StandardTypes.DOUBLE:
+ Long doubleValue = Double.doubleToRawLongBits(Double.parseDouble(partitionValue));
+ return Domain.create(ValueSet.of(type, doubleValue), false);
+ case StandardTypes.VARCHAR:
+ case StandardTypes.VARBINARY:
+ Slice sliceValue = utf8Slice(partitionValue);
+ return Domain.create(ValueSet.of(type, sliceValue), false);
+ case StandardTypes.DATE:
+ Long dateValue = LocalDate.parse(partitionValue, java.time.format.DateTimeFormatter.ISO_LOCAL_DATE).toEpochDay();
+ return Domain.create(ValueSet.of(type, dateValue), false);
+ case StandardTypes.TIMESTAMP:
+ Long timestampValue = Timestamp.valueOf(partitionValue).getTime();
+ return Domain.create(ValueSet.of(type, timestampValue), false);
+ case StandardTypes.BOOLEAN:
+ Boolean booleanValue = Boolean.valueOf(partitionValue);
+ return Domain.create(ValueSet.of(type, booleanValue), false);
+ default:
+ throw new PrestoException(HUDI_INVALID_PARTITION_VALUE, String.format(
+ "partition data type '%s' is unsupported for partition key: %s",
+ columnHandle.getHiveType(),
+ columnHandle.getName()));
+ }
+ }
+ catch (IllegalArgumentException e) {
+ throw new PrestoException(HUDI_INVALID_PARTITION_VALUE, String.format(
+ "Invalid partition value '%s' for %s partition key: %s",
+ partitionValue,
+ type.getDisplayName(),
+ columnHandle.getName()));
+ }
+ }
+
+ public static List<String> parsePartitionValues(String partitionName, Optional<List<String>> partitionColumnNames)
+ {
+ boolean hiveStylePartition = HIVE_PARTITION_NAME_PATTERN.matcher(partitionName).matches();
+ if (!hiveStylePartition) {
+ if (!partitionColumnNames.isPresent() || partitionColumnNames.get().size() == 1) {
+ return ImmutableList.of(partitionName);
+ }
+ else {
+ String[] partitionValues = partitionName.split(Path.SEPARATOR);
+ checkArgument(
+ partitionValues.length == partitionColumnNames.get().size(),
+ "Invalid partition spec: {partitionName: %s, partitionColumnNames: %s}",
+ partitionName,
+ partitionColumnNames.get());
+ return Arrays.asList(partitionValues);
+ }
+ }
+
+ return extractPartitionValues(partitionName, partitionColumnNames);
+ }
+
+ private List<String> prunePartitionByMetaStore(
+ ExtendedHiveMetastore metastore,
+ SchemaTableName schemaTableName,
+ TupleDomain<ColumnHandle> constraintSummary,
+ List<Column> partitionColumns,
+ MetastoreContext metastoreContext)
+ {
Map<Column, Domain> partitionPredicate = new HashMap<>();
Map<ColumnHandle, Domain> domains = constraintSummary.getDomains().orElseGet(ImmutableMap::of);
List<HudiColumnHandle> hudiColumnHandles = fromPartitionColumns(partitionColumns);
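
Note: getPartitions above has two parsing modes, inferred from the first path segment. A small worked example (all values illustrative):

```java
import com.google.common.collect.ImmutableList;

import java.util.Map;

public class GetPartitionsSketch
{
    public static void main(String[] args)
    {
        // Hive-style paths: key=value segments are parsed by name.
        Map<String, Map<String, String>> hiveStyle = HudiPartitionManager.getPartitions(
                ImmutableList.of("dt", "hh"),
                ImmutableList.of("dt=2024-01-01/hh=00"));
        // -> {"dt=2024-01-01/hh=00" -> {dt=2024-01-01, hh=00}}

        // Flat paths: segments are mapped to partition keys by position.
        Map<String, Map<String, String>> flat = HudiPartitionManager.getPartitions(
                ImmutableList.of("dt", "hh"),
                ImmutableList.of("2024-01-01/00"));
        // -> {"2024-01-01/00" -> {dt=2024-01-01, hh=00}}
    }
}
```
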
diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPredicates.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPredicates.java
new file mode 100644
index 0000000000000..5c80422fff709
--- /dev/null
+++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPredicates.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.hudi;
+
+import com.facebook.presto.common.predicate.Domain;
+import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.spi.ColumnHandle;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class HudiPredicates
+{
+ private final TupleDomain<HudiColumnHandle> regularColumnPredicates;
+
+ public static HudiPredicates from(TupleDomain<ColumnHandle> predicate)
+ {
+ Map<HudiColumnHandle, Domain> partitionColumnPredicates = new HashMap<>();
+ Map<HudiColumnHandle, Domain> regularColumnPredicates = new HashMap<>();
+
+ Optional