diff --git a/plugin/trino-hive/pom.xml b/plugin/trino-hive/pom.xml
index de7b9a8f5c22..6e92c3e008e0 100644
--- a/plugin/trino-hive/pom.xml
+++ b/plugin/trino-hive/pom.xml
@@ -31,6 +31,12 @@
io.trino
trino-parquet
+
+
+ org.apache.parquet
+ parquet-encoding
+
+
@@ -225,6 +231,12 @@
alluxio-shaded-client
+
+ org.apache.hudi
+ hudi-hadoop-mr
+ ${dep.hudi.version}
+
+
org.apache.thrift
libthrift
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
index 8ffc11d53a10..cd573064d5be 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
@@ -15,6 +15,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
@@ -48,6 +50,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
@@ -60,6 +63,9 @@
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.HoodieROTablePathFilter;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import java.io.BufferedReader;
import java.io.IOException;
@@ -181,6 +187,7 @@ public class BackgroundHiveSplitLoader
// * if you hold a read lock but not a write lock, you can do any of the above three operations, but you may
// see a series of operations involving two or more of the operations carried out half way.
private final ReadWriteLock taskExecutionLock = new ReentrantReadWriteLock();
+ private final Supplier hoodiePathFilterSupplier;
private HiveSplitSource hiveSplitSource;
private Stopwatch stopwatch;
@@ -226,6 +233,7 @@ public BackgroundHiveSplitLoader(
this.partitions = new ConcurrentLazyQueue<>(partitions);
this.hdfsContext = new HdfsContext(session);
this.validWriteIds = requireNonNull(validWriteIds, "validWriteIds is null");
+ this.hoodiePathFilterSupplier = Suppliers.memoize(HoodieROTablePathFilter::new);
}
@Override
@@ -378,6 +386,7 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition)
InputFormat, ?> inputFormat = getInputFormat(configuration, schema, false);
FileSystem fs = hdfsEnvironment.getFileSystem(hdfsContext, path);
boolean s3SelectPushdownEnabled = shouldEnablePushdownForTable(session, table, path.toString(), partition.getPartition());
+ PathFilter pathFilter = isHudiParquetInputFormat(inputFormat) ? hoodiePathFilterSupplier.get() : path1 -> true;
// S3 Select pushdown works at the granularity of individual S3 objects,
// therefore we must not split files when it is enabled.
@@ -408,7 +417,7 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition)
partition.getTableToPartitionMapping(),
getOnlyElement(parents),
targetPaths,
- splittable);
+ splittable, pathFilter);
if (manifestFileIterator.isPresent()) {
fileIterators.addLast(manifestFileIterator.get());
return COMPLETED_FUTURE;
@@ -469,7 +478,7 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition)
// To support custom input formats, we want to call getSplits()
// on the input format to obtain file splits.
- if (shouldUseFileSplitsFromInputFormat(inputFormat)) {
+ if (!isHudiParquetInputFormat(inputFormat) && shouldUseFileSplitsFromInputFormat(inputFormat)) {
if (tableBucketInfo.isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "Trino cannot read bucketed partition in an input format with UseFileSplitsFromInputFormat annotation: " + inputFormat.getClass().getSimpleName());
}
@@ -562,6 +571,7 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition)
acidInfoBuilder.setOrcAcidVersionValidated(true); // no ACID; no further validation needed
readPaths = ImmutableList.of(path);
}
+
// Bucketed partitions are fully loaded immediately since all files must be loaded to determine the file to bucket mapping
if (tableBucketInfo.isPresent()) {
ListenableFuture lastResult = immediateVoidFuture(); // TODO document in addToQueue() that it is sufficient to hold on to last returned future
@@ -569,7 +579,7 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition)
// list all files in the partition
List files = new ArrayList<>();
try {
- Iterators.addAll(files, new HiveFileIterator(table, readPath, fs, directoryLister, namenodeStats, FAIL, ignoreAbsentPartitions));
+ Iterators.addAll(files, new HiveFileIterator(table, readPath, fs, directoryLister, namenodeStats, FAIL, ignoreAbsentPartitions, pathFilter));
}
catch (HiveFileIterator.NestedDirectoryNotAllowedException e) {
// Fail here to be on the safe side. This seems to be the same as what Hive does
@@ -597,7 +607,7 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition)
for (Path readPath : readPaths) {
Optional acidInfo = isFullAcid ? acidInfoBuilder.build() : Optional.empty();
- fileIterators.addLast(createInternalHiveSplitIterator(readPath, fs, splitFactory, splittable, acidInfo));
+ fileIterators.addLast(createInternalHiveSplitIterator(readPath, fs, splitFactory, splittable, acidInfo, pathFilter));
}
if (!fileStatusOriginalFiles.isEmpty()) {
@@ -674,13 +684,15 @@ Optional> buildManifestFileIterator(
TableToPartitionMapping tableToPartitionMapping,
Path parent,
List paths,
- boolean splittable)
+ boolean splittable,
+ PathFilter pathFilter)
throws IOException
{
FileSystem targetFilesystem = hdfsEnvironment.getFileSystem(hdfsContext, parent);
Map fileStatuses = new HashMap<>();
- HiveFileIterator fileStatusIterator = new HiveFileIterator(table, parent, targetFilesystem, directoryLister, namenodeStats, IGNORED, false);
+ pathFilter = path1 -> true;
+ HiveFileIterator fileStatusIterator = new HiveFileIterator(table, parent, targetFilesystem, directoryLister, namenodeStats, IGNORED, false, pathFilter);
fileStatusIterator.forEachRemaining(status -> fileStatuses.put(getPathWithoutSchemeAndAuthority(status.getPath()), status));
List locatedFileStatuses = new ArrayList<>();
@@ -757,6 +769,14 @@ private ListenableFuture addSplitsToSource(InputSplit[] targetSplits, Inte
return lastResult;
}
+ // True only for HoodieParquetInputFormat instances that are not HoodieParquetRealtimeInputFormat instances.
+ private static boolean isHudiParquetInputFormat(InputFormat, ?> inputFormat)
+ {
+ boolean hudiParquet = inputFormat instanceof HoodieParquetInputFormat;
+ boolean hudiRealtime = inputFormat instanceof HoodieParquetRealtimeInputFormat;
+ return hudiParquet && !hudiRealtime;
+ }
+
private static boolean shouldUseFileSplitsFromInputFormat(InputFormat, ?> inputFormat)
{
return Arrays.stream(inputFormat.getClass().getAnnotations())
@@ -765,9 +785,9 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat, ?> inpu
.anyMatch(name -> name.equals("UseFileSplitsFromInputFormat"));
}
- private Iterator createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable, Optional acidInfo)
+ private Iterator createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable, Optional acidInfo, PathFilter pathFilter)
{
- return Streams.stream(new HiveFileIterator(table, path, fileSystem, directoryLister, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, ignoreAbsentPartitions))
+ return Streams.stream(new HiveFileIterator(table, path, fileSystem, directoryLister, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, ignoreAbsentPartitions, pathFilter))
.map(status -> splitFactory.createInternalHiveSplit(status, OptionalInt.empty(), splittable, acidInfo))
.filter(Optional::isPresent)
.map(Optional::get)
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/GenericHiveRecordCursorProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/GenericHiveRecordCursorProvider.java
index 65fc0c2795a1..9345503d2fb8 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/GenericHiveRecordCursorProvider.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/GenericHiveRecordCursorProvider.java
@@ -30,6 +30,7 @@
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -71,7 +72,8 @@ public Optional createRecordCursor(
List columns,
TupleDomain effectivePredicate,
TypeManager typeManager,
- boolean s3SelectPushdownEnabled)
+ boolean s3SelectPushdownEnabled,
+ final Map customSplitInfo)
{
configuration.setInt(LineRecordReader.MAX_LINE_LENGTH, textMaxLineLengthBytes);
@@ -98,7 +100,8 @@ public Optional createRecordCursor(
start,
length,
schema,
- readerColumns);
+ readerColumns,
+ customSplitInfo);
try {
return new GenericHiveRecordCursor<>(
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java
index 8666dfdb67e4..b4e1a2a2a5bf 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java
@@ -195,7 +195,8 @@ public ConnectorPageSource createPageSource(
hiveSplit.isS3SelectPushdownEnabled(),
hiveSplit.getAcidInfo(),
originalFile,
- hiveTable.getTransaction());
+ hiveTable.getTransaction(),
+ hiveSplit.getCustomSplitInfo());
if (pageSource.isPresent()) {
ConnectorPageSource source = pageSource.get();
@@ -259,7 +260,8 @@ public static Optional createHivePageSource(
boolean s3SelectPushdownEnabled,
Optional acidInfo,
boolean originalFile,
- AcidTransaction transaction)
+ AcidTransaction transaction,
+ Map customSplitInfo)
{
if (effectivePredicate.isNone()) {
return Optional.of(new EmptyPageSource());
@@ -333,7 +335,8 @@ public static Optional createHivePageSource(
desiredColumns,
effectivePredicate,
typeManager,
- s3SelectPushdownEnabled);
+ s3SelectPushdownEnabled,
+ customSplitInfo);
if (readerWithProjections.isPresent()) {
RecordCursor delegate = readerWithProjections.get().getRecordCursor();
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveRecordCursorProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveRecordCursorProvider.java
index ca924cb27e48..b13f9dae9987 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveRecordCursorProvider.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveRecordCursorProvider.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.fs.Path;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -39,7 +40,8 @@ Optional createRecordCursor(
List columns,
TupleDomain effectivePredicate,
TypeManager typeManager,
- boolean s3SelectPushdownEnabled);
+ boolean s3SelectPushdownEnabled,
+ Map customSplitInfo);
/**
* A wrapper class for
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java
index 5bf3ae0d17ea..bc22c21625b5 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java
@@ -22,6 +22,7 @@
import io.trino.spi.connector.ConnectorSplit;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
@@ -55,6 +56,7 @@ public class HiveSplit
private final boolean s3SelectPushdownEnabled;
private final Optional acidInfo;
private final long splitNumber;
+ private final Map customSplitInfo;
@JsonCreator
public HiveSplit(
@@ -77,7 +79,8 @@ public HiveSplit(
@JsonProperty("bucketValidation") Optional bucketValidation,
@JsonProperty("s3SelectPushdownEnabled") boolean s3SelectPushdownEnabled,
@JsonProperty("acidInfo") Optional acidInfo,
- @JsonProperty("splitNumber") long splitNumber)
+ @JsonProperty("splitNumber") long splitNumber,
+ @JsonProperty("customSplitInfo") Map customSplitInfo)
{
checkArgument(start >= 0, "start must be positive");
checkArgument(length >= 0, "length must be positive");
@@ -115,6 +118,7 @@ public HiveSplit(
this.s3SelectPushdownEnabled = s3SelectPushdownEnabled;
this.acidInfo = acidInfo;
this.splitNumber = splitNumber;
+ this.customSplitInfo = ImmutableMap.copyOf(requireNonNull(customSplitInfo, "customSplitInfo is null"));
}
@JsonProperty
@@ -244,6 +248,16 @@ public long getSplitNumber()
return splitNumber;
}
+ /**
+ * Returns the custom split info held by this split (stored by the constructor as an immutable copy).
+ * Exposed as a JSON property to pair with the {@code customSplitInfo} creator argument, which must be non-null.
+ */
+ @JsonProperty
+ public Map getCustomSplitInfo()
+ {
+ return customSplitInfo;
+ }
+
@Override
public Object getInfo()
{
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java
index 68c5758adcc8..683c2184178a 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java
@@ -384,7 +384,8 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
internalSplit.getBucketValidation(),
internalSplit.isS3SelectPushdownEnabled(),
internalSplit.getAcidInfo(),
- numberOfProcessedSplits.getAndIncrement()));
+ numberOfProcessedSplits.getAndIncrement(),
+ internalSplit.getCustomSplitInfo()));
internalSplit.increaseStart(splitBytes);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveSplit.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveSplit.java
index af4aa4509630..7607e5519db4 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveSplit.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveSplit.java
@@ -14,6 +14,7 @@
package io.trino.plugin.hive;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import io.trino.plugin.hive.HiveSplit.BucketConversion;
import io.trino.plugin.hive.HiveSplit.BucketValidation;
import io.trino.spi.HostAddress;
@@ -22,6 +23,7 @@
import javax.annotation.concurrent.NotThreadSafe;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
@@ -60,6 +62,7 @@ public class InternalHiveSplit
private final Optional bucketValidation;
private final boolean s3SelectPushdownEnabled;
private final Optional acidInfo;
+ private final Map customSplitInfo;
private long start;
private int currentBlockIndex;
@@ -82,7 +85,8 @@ public InternalHiveSplit(
Optional bucketConversion,
Optional bucketValidation,
boolean s3SelectPushdownEnabled,
- Optional acidInfo)
+ Optional acidInfo,
+ Map customSplitInfo)
{
checkArgument(start >= 0, "start must be positive");
checkArgument(end >= 0, "length must be positive");
@@ -116,6 +120,7 @@ public InternalHiveSplit(
this.bucketValidation = bucketValidation;
this.s3SelectPushdownEnabled = s3SelectPushdownEnabled;
this.acidInfo = acidInfo;
+ this.customSplitInfo = ImmutableMap.copyOf(requireNonNull(customSplitInfo, "customSplitInfo is null"));
}
public String getPath()
@@ -198,6 +203,11 @@ public Optional getBucketValidation()
return bucketValidation;
}
+ public Map getCustomSplitInfo()
+ {
+ return this.customSplitInfo; // stored by the constructor via ImmutableMap.copyOf
+ }
+
public InternalHiveBlock currentBlock()
{
checkState(!isDone(), "All blocks have been consumed");
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
index cf312dd95cc9..b164ef0e1356 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
@@ -87,6 +87,7 @@
import static io.trino.plugin.hive.HiveSessionProperties.isUseParquetColumnNames;
import static io.trino.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
import static io.trino.plugin.hive.util.HiveUtil.getDeserializerClassName;
+import static io.trino.plugin.hive.util.HiveUtil.shouldUseRecordReaderFromInputFormat;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
@@ -147,7 +148,7 @@ public Optional createPageSource(
boolean originalFile,
AcidTransaction transaction)
{
- if (!PARQUET_SERDE_CLASS_NAMES.contains(getDeserializerClassName(schema))) {
+ if (!PARQUET_SERDE_CLASS_NAMES.contains(getDeserializerClassName(schema)) || shouldUseRecordReaderFromInputFormat(configuration, schema)) {
return Optional.empty();
}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java
index 09bd05549373..fa5dbccf5836 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java
@@ -31,6 +31,7 @@
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -67,7 +68,8 @@ public Optional createRecordCursor(
List columns,
TupleDomain effectivePredicate,
TypeManager typeManager,
- boolean s3SelectPushdownEnabled)
+ boolean s3SelectPushdownEnabled,
+ Map customSplitInfo)
{
if (!s3SelectPushdownEnabled) {
return Optional.empty();
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CustomSplitConversionUtils.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CustomSplitConversionUtils.java
new file mode 100644
index 000000000000..cdcb43089421
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/CustomSplitConversionUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.trino.spi.TrinoException;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
+
+/**
+ * Utility class for both extracting customSplitInfo Map from a custom FileSplit and transforming the customSplitInfo back into a FileSplit.
+ */
+public class CustomSplitConversionUtils
+{
+ private static final List converters = ImmutableList.of(new HudiRealtimeSplitConverter());
+
+ private CustomSplitConversionUtils()
+ {
+ }
+
+ public static Map extractCustomSplitInfo(FileSplit split)
+ {
+ for (CustomSplitConverter converter : converters) {
+ Optional