Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions plugin/trino-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-parquet</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-encoding</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down Expand Up @@ -225,6 +231,12 @@
<artifactId>alluxio-shaded-client</artifactId>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hadoop-mr</artifactId>
<version>${dep.hudi.version}</version>
</dependency>

<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
Expand Down Expand Up @@ -48,6 +50,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
Expand All @@ -60,6 +63,9 @@
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;

import java.io.BufferedReader;
import java.io.IOException;
Expand Down Expand Up @@ -181,6 +187,7 @@ public class BackgroundHiveSplitLoader
// * if you hold a read lock but not a write lock, you can do any of the above three operations, but you may
// see a series of operations involving two or more of the operations carried out half way.
private final ReadWriteLock taskExecutionLock = new ReentrantReadWriteLock();
private final Supplier<HoodieROTablePathFilter> hoodiePathFilterSupplier;

private HiveSplitSource hiveSplitSource;
private Stopwatch stopwatch;
Expand Down Expand Up @@ -226,6 +233,7 @@ public BackgroundHiveSplitLoader(
this.partitions = new ConcurrentLazyQueue<>(partitions);
this.hdfsContext = new HdfsContext(session);
this.validWriteIds = requireNonNull(validWriteIds, "validWriteIds is null");
this.hoodiePathFilterSupplier = Suppliers.memoize(HoodieROTablePathFilter::new);
}

@Override
Expand Down Expand Up @@ -378,6 +386,7 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
InputFormat<?, ?> inputFormat = getInputFormat(configuration, schema, false);
FileSystem fs = hdfsEnvironment.getFileSystem(hdfsContext, path);
boolean s3SelectPushdownEnabled = shouldEnablePushdownForTable(session, table, path.toString(), partition.getPartition());
PathFilter pathFilter = isHudiParquetInputFormat(inputFormat) ? hoodiePathFilterSupplier.get() : path1 -> true;

// S3 Select pushdown works at the granularity of individual S3 objects,
// therefore we must not split files when it is enabled.
Expand Down Expand Up @@ -408,7 +417,7 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
partition.getTableToPartitionMapping(),
getOnlyElement(parents),
targetPaths,
splittable);
splittable, pathFilter);
if (manifestFileIterator.isPresent()) {
fileIterators.addLast(manifestFileIterator.get());
return COMPLETED_FUTURE;
Expand Down Expand Up @@ -469,7 +478,7 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)

// To support custom input formats, we want to call getSplits()
// on the input format to obtain file splits.
if (shouldUseFileSplitsFromInputFormat(inputFormat)) {
if (!isHudiParquetInputFormat(inputFormat) && shouldUseFileSplitsFromInputFormat(inputFormat)) {
if (tableBucketInfo.isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "Trino cannot read bucketed partition in an input format with UseFileSplitsFromInputFormat annotation: " + inputFormat.getClass().getSimpleName());
}
Expand Down Expand Up @@ -562,14 +571,15 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
acidInfoBuilder.setOrcAcidVersionValidated(true); // no ACID; no further validation needed
readPaths = ImmutableList.of(path);
}

// Bucketed partitions are fully loaded immediately since all files must be loaded to determine the file to bucket mapping
if (tableBucketInfo.isPresent()) {
ListenableFuture<Void> lastResult = immediateVoidFuture(); // TODO document in addToQueue() that it is sufficient to hold on to last returned future
for (Path readPath : readPaths) {
// list all files in the partition
List<LocatedFileStatus> files = new ArrayList<>();
try {
Iterators.addAll(files, new HiveFileIterator(table, readPath, fs, directoryLister, namenodeStats, FAIL, ignoreAbsentPartitions));
Iterators.addAll(files, new HiveFileIterator(table, readPath, fs, directoryLister, namenodeStats, FAIL, ignoreAbsentPartitions, pathFilter));
}
catch (HiveFileIterator.NestedDirectoryNotAllowedException e) {
// Fail here to be on the safe side. This seems to be the same as what Hive does
Expand Down Expand Up @@ -597,7 +607,7 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)

for (Path readPath : readPaths) {
Optional<AcidInfo> acidInfo = isFullAcid ? acidInfoBuilder.build() : Optional.empty();
fileIterators.addLast(createInternalHiveSplitIterator(readPath, fs, splitFactory, splittable, acidInfo));
fileIterators.addLast(createInternalHiveSplitIterator(readPath, fs, splitFactory, splittable, acidInfo, pathFilter));
}

if (!fileStatusOriginalFiles.isEmpty()) {
Expand Down Expand Up @@ -674,13 +684,15 @@ Optional<Iterator<InternalHiveSplit>> buildManifestFileIterator(
TableToPartitionMapping tableToPartitionMapping,
Path parent,
List<Path> paths,
boolean splittable)
boolean splittable,
PathFilter pathFilter)
throws IOException
{
FileSystem targetFilesystem = hdfsEnvironment.getFileSystem(hdfsContext, parent);

Map<Path, LocatedFileStatus> fileStatuses = new HashMap<>();
HiveFileIterator fileStatusIterator = new HiveFileIterator(table, parent, targetFilesystem, directoryLister, namenodeStats, IGNORED, false);
pathFilter = path1 -> true;
HiveFileIterator fileStatusIterator = new HiveFileIterator(table, parent, targetFilesystem, directoryLister, namenodeStats, IGNORED, false, pathFilter);
fileStatusIterator.forEachRemaining(status -> fileStatuses.put(getPathWithoutSchemeAndAuthority(status.getPath()), status));

List<LocatedFileStatus> locatedFileStatuses = new ArrayList<>();
Expand Down Expand Up @@ -757,6 +769,14 @@ private ListenableFuture<Void> addSplitsToSource(InputSplit[] targetSplits, Inte
return lastResult;
}

private static boolean isHudiParquetInputFormat(InputFormat<?, ?> inputFormat)
{
if (inputFormat instanceof HoodieParquetRealtimeInputFormat) {
return false;
}
return inputFormat instanceof HoodieParquetInputFormat;
}

private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inputFormat)
{
return Arrays.stream(inputFormat.getClass().getAnnotations())
Expand All @@ -765,9 +785,9 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inpu
.anyMatch(name -> name.equals("UseFileSplitsFromInputFormat"));
}

private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo)
private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo, PathFilter pathFilter)
{
return Streams.stream(new HiveFileIterator(table, path, fileSystem, directoryLister, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, ignoreAbsentPartitions))
return Streams.stream(new HiveFileIterator(table, path, fileSystem, directoryLister, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, ignoreAbsentPartitions, pathFilter))
.map(status -> splitFactory.createInternalHiveSplit(status, OptionalInt.empty(), splittable, acidInfo))
.filter(Optional::isPresent)
.map(Optional::get)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

Expand Down Expand Up @@ -71,7 +72,8 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
List<HiveColumnHandle> columns,
TupleDomain<HiveColumnHandle> effectivePredicate,
TypeManager typeManager,
boolean s3SelectPushdownEnabled)
boolean s3SelectPushdownEnabled,
final Map<String, String> customSplitInfo)
{
configuration.setInt(LineRecordReader.MAX_LINE_LENGTH, textMaxLineLengthBytes);

Expand All @@ -98,7 +100,8 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
start,
length,
schema,
readerColumns);
readerColumns,
customSplitInfo);

try {
return new GenericHiveRecordCursor<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ public ConnectorPageSource createPageSource(
hiveSplit.isS3SelectPushdownEnabled(),
hiveSplit.getAcidInfo(),
originalFile,
hiveTable.getTransaction());
hiveTable.getTransaction(),
hiveSplit.getCustomSplitInfo());

if (pageSource.isPresent()) {
ConnectorPageSource source = pageSource.get();
Expand Down Expand Up @@ -259,7 +260,8 @@ public static Optional<ConnectorPageSource> createHivePageSource(
boolean s3SelectPushdownEnabled,
Optional<AcidInfo> acidInfo,
boolean originalFile,
AcidTransaction transaction)
AcidTransaction transaction,
Map<String, String> customSplitInfo)
{
if (effectivePredicate.isNone()) {
return Optional.of(new EmptyPageSource());
Expand Down Expand Up @@ -333,7 +335,8 @@ public static Optional<ConnectorPageSource> createHivePageSource(
desiredColumns,
effectivePredicate,
typeManager,
s3SelectPushdownEnabled);
s3SelectPushdownEnabled,
customSplitInfo);

if (readerWithProjections.isPresent()) {
RecordCursor delegate = readerWithProjections.get().getRecordCursor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.fs.Path;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

Expand All @@ -39,7 +40,8 @@ Optional<ReaderRecordCursorWithProjections> createRecordCursor(
List<HiveColumnHandle> columns,
TupleDomain<HiveColumnHandle> effectivePredicate,
TypeManager typeManager,
boolean s3SelectPushdownEnabled);
boolean s3SelectPushdownEnabled,
Map<String, String> customSplitInfo);

/**
* A wrapper class for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.spi.connector.ConnectorSplit;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class HiveSplit
private final boolean s3SelectPushdownEnabled;
private final Optional<AcidInfo> acidInfo;
private final long splitNumber;
private final Map<String, String> customSplitInfo;

@JsonCreator
public HiveSplit(
Expand All @@ -77,7 +79,8 @@ public HiveSplit(
@JsonProperty("bucketValidation") Optional<BucketValidation> bucketValidation,
@JsonProperty("s3SelectPushdownEnabled") boolean s3SelectPushdownEnabled,
@JsonProperty("acidInfo") Optional<AcidInfo> acidInfo,
@JsonProperty("splitNumber") long splitNumber)
@JsonProperty("splitNumber") long splitNumber,
@JsonProperty("customSplitInfo") Map<String, String> customSplitInfo)
{
checkArgument(start >= 0, "start must be positive");
checkArgument(length >= 0, "length must be positive");
Expand Down Expand Up @@ -115,6 +118,7 @@ public HiveSplit(
this.s3SelectPushdownEnabled = s3SelectPushdownEnabled;
this.acidInfo = acidInfo;
this.splitNumber = splitNumber;
this.customSplitInfo = ImmutableMap.copyOf(requireNonNull(customSplitInfo, "customSplitInfo is null"));
}

@JsonProperty
Expand Down Expand Up @@ -244,6 +248,11 @@ public long getSplitNumber()
return splitNumber;
}

public Map<String, String> getCustomSplitInfo()
{
return customSplitInfo;
}

@Override
public Object getInfo()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,8 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
internalSplit.getBucketValidation(),
internalSplit.isS3SelectPushdownEnabled(),
internalSplit.getAcidInfo(),
numberOfProcessedSplits.getAndIncrement()));
numberOfProcessedSplits.getAndIncrement(),
internalSplit.getCustomSplitInfo()));

internalSplit.increaseStart(splitBytes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.hive;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.plugin.hive.HiveSplit.BucketConversion;
import io.trino.plugin.hive.HiveSplit.BucketValidation;
import io.trino.spi.HostAddress;
Expand All @@ -22,6 +23,7 @@
import javax.annotation.concurrent.NotThreadSafe;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
Expand Down Expand Up @@ -60,6 +62,7 @@ public class InternalHiveSplit
private final Optional<BucketValidation> bucketValidation;
private final boolean s3SelectPushdownEnabled;
private final Optional<AcidInfo> acidInfo;
private final Map<String, String> customSplitInfo;

private long start;
private int currentBlockIndex;
Expand All @@ -82,7 +85,8 @@ public InternalHiveSplit(
Optional<BucketConversion> bucketConversion,
Optional<BucketValidation> bucketValidation,
boolean s3SelectPushdownEnabled,
Optional<AcidInfo> acidInfo)
Optional<AcidInfo> acidInfo,
Map<String, String> customSplitInfo)
{
checkArgument(start >= 0, "start must be positive");
checkArgument(end >= 0, "length must be positive");
Expand Down Expand Up @@ -116,6 +120,7 @@ public InternalHiveSplit(
this.bucketValidation = bucketValidation;
this.s3SelectPushdownEnabled = s3SelectPushdownEnabled;
this.acidInfo = acidInfo;
this.customSplitInfo = ImmutableMap.copyOf(requireNonNull(customSplitInfo, "customSplitInfo is null"));
}

public String getPath()
Expand Down Expand Up @@ -198,6 +203,11 @@ public Optional<BucketValidation> getBucketValidation()
return bucketValidation;
}

    // Immutable extra split metadata (defensively copied in the constructor via
    // ImmutableMap.copyOf); propagated to HiveSplit when this internal split is
    // converted during split generation.
    public Map<String, String> getCustomSplitInfo()
    {
        return customSplitInfo;
    }

public InternalHiveBlock currentBlock()
{
checkState(!isDone(), "All blocks have been consumed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import static io.trino.plugin.hive.HiveSessionProperties.isUseParquetColumnNames;
import static io.trino.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
import static io.trino.plugin.hive.util.HiveUtil.getDeserializerClassName;
import static io.trino.plugin.hive.util.HiveUtil.shouldUseRecordReaderFromInputFormat;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -147,7 +148,7 @@ public Optional<ReaderPageSource> createPageSource(
boolean originalFile,
AcidTransaction transaction)
{
if (!PARQUET_SERDE_CLASS_NAMES.contains(getDeserializerClassName(schema))) {
if (!PARQUET_SERDE_CLASS_NAMES.contains(getDeserializerClassName(schema)) || shouldUseRecordReaderFromInputFormat(configuration, schema)) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -67,7 +68,8 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
List<HiveColumnHandle> columns,
TupleDomain<HiveColumnHandle> effectivePredicate,
TypeManager typeManager,
boolean s3SelectPushdownEnabled)
boolean s3SelectPushdownEnabled,
Map<String, String> customSplitInfo)
{
if (!s3SelectPushdownEnabled) {
return Optional.empty();
Expand Down
Loading