Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -71,7 +72,7 @@ public class BackgroundHiveSplitLoader
public BackgroundHiveSplitLoader(
Table table,
Iterable<HivePartitionMetadata> partitions,
Optional<Domain> pathDomain,
Map<Integer, Domain> infoColumnConstraints,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

infoColumn -> metadataColumn

Optional<BucketSplitInfo> tableBucketInfo,
ConnectorSession session,
HdfsEnvironment hdfsEnvironment,
Expand All @@ -87,7 +88,7 @@ public BackgroundHiveSplitLoader(
checkArgument(loaderConcurrency > 0, "loaderConcurrency must be > 0, found: %s", loaderConcurrency);
this.executor = requireNonNull(executor, "executor is null");
this.partitions = new ConcurrentLazyQueue<>(requireNonNull(partitions, "partitions is null"));
this.delegatingPartitionLoader = new DelegatingPartitionLoader(table, pathDomain, tableBucketInfo, session, hdfsEnvironment, namenodeStats, directoryLister, fileIterators, recursiveDirWalkerEnabled, schedulerUsesHostAddresses, partialAggregationsPushedDown);
this.delegatingPartitionLoader = new DelegatingPartitionLoader(table, infoColumnConstraints, tableBucketInfo, session, hdfsEnvironment, namenodeStats, directoryLister, fileIterators, recursiveDirWalkerEnabled, schedulerUsesHostAddresses, partialAggregationsPushedDown);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class DelegatingPartitionLoader

public DelegatingPartitionLoader(
Table table,
Optional<Domain> pathDomain,
Map<Integer, Domain> infoColumnConstraints,
Optional<BucketSplitInfo> tableBucketInfo,
ConnectorSession session,
HdfsEnvironment hdfsEnvironment,
Expand All @@ -53,7 +53,7 @@ public DelegatingPartitionLoader(
this.session = requireNonNull(session, "session is null");
this.storagePartitionLoader = new StoragePartitionLoader(
table,
pathDomain,
infoColumnConstraints,
tableBucketInfo,
session,
hdfsEnvironment,
Expand All @@ -63,7 +63,7 @@ public DelegatingPartitionLoader(
recursiveDirWalkerEnabled,
schedulerUsesHostAddresses,
partialAggregationsPushedDown);
this.manifestPartitionLoader = new ManifestPartitionLoader(table, pathDomain, session, hdfsEnvironment, namenodeStats, directoryLister, recursiveDirWalkerEnabled, schedulerUsesHostAddresses);
this.manifestPartitionLoader = new ManifestPartitionLoader(table, infoColumnConstraints, session, hdfsEnvironment, namenodeStats, directoryLister, recursiveDirWalkerEnabled, schedulerUsesHostAddresses);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,4 +277,10 @@ public static boolean isFileModifiedTimeColumnHandle(HiveColumnHandle column)
{
return column.getHiveColumnIndex() == FILE_MODIFIED_TIME_COLUMN_INDEX;
}

/**
 * Returns true if the given column is one of the synthetic file-metadata "info"
 * columns ($path, $file_size, $file_modified_time), i.e. a column whose value is
 * derived from the file a row resides in rather than from the row data itself.
 */
public static boolean isInfoColumnHandle(HiveColumnHandle column)
{
    if (isPathColumnHandle(column)) {
        return true;
    }
    return isFileSizeColumnHandle(column) || isFileModifiedTimeColumnHandle(column);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.facebook.presto.common.type.Decimals.encodeScaledValue;
import static com.facebook.presto.common.type.Decimals.isShortDecimal;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveColumnHandle.isInfoColumnHandle;
import static com.facebook.presto.hive.HiveColumnHandle.isPathColumnHandle;
import static com.facebook.presto.hive.HiveCommonSessionProperties.isUseParquetColumnNames;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
Expand Down Expand Up @@ -303,7 +305,7 @@ public ConnectorSplitSource getSplits(
HiveSplitLoader hiveSplitLoader = new BackgroundHiveSplitLoader(
table,
hivePartitions,
getPathDomain(layout.getDomainPredicate(), layout.getPredicateColumns()),
getInfoColumnConstraints(layout.getDomainPredicate(), layout.getPredicateColumns()),
createBucketSplitInfo(bucketHandle, bucketFilter),
session,
hdfsEnvironment,
Expand Down Expand Up @@ -434,6 +436,21 @@ private static Optional<Domain> getPathDomain(TupleDomain<Subfield> domainPredic
.map(Map.Entry::getValue);
}

/**
 * Extracts the pushed-down constraints that apply to the synthetic file-metadata
 * "info" columns ($path, $file_size, $file_modified_time) from the layout's
 * domain predicate.
 *
 * @param domainPredicate the layout's tuple domain over subfields; must not be "none"
 * @param predicateColumns mapping from column root name to its Hive column handle
 * @return map from info-column index (e.g. PATH_COLUMN_INDEX) to its Domain;
 *         empty when no info-column filters are present
 * @throws IllegalArgumentException if {@code domainPredicate} is "none"
 */
private static Map<Integer, Domain> getInfoColumnConstraints(TupleDomain<Subfield> domainPredicate, Map<String, HiveColumnHandle> predicateColumns)
{
    checkArgument(!domainPredicate.isNone(), "Unexpected domain predicate: none");

    return domainPredicate.getDomains()
            .map(domains -> domains.entrySet()
                    .stream()
                    .filter(entry -> {
                        // Guard against root names with no registered predicate column;
                        // the original unconditional lookup would NPE inside isInfoColumnHandle.
                        HiveColumnHandle column = predicateColumns.get(entry.getKey().getRootName());
                        return column != null && isInfoColumnHandle(column);
                    })
                    .collect(Collectors.toMap(
                            entry -> predicateColumns.get(entry.getKey().getRootName()).getHiveColumnIndex(),
                            Map.Entry::getValue)))
            .orElse(ImmutableMap.of());
}

@Managed
@Nested
public CounterStat getHighMemorySplitSource()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class ManifestPartitionLoader
private static final String[] BLOCK_LOCATION_HOSTS = {"localhost"};

private final Table table;
private final Optional<Domain> pathDomain;
Map<Integer, Domain> infoColumnConstraints;
private final ConnectorSession session;
private final HdfsEnvironment hdfsEnvironment;
private final HdfsContext hdfsContext;
Expand All @@ -78,7 +78,7 @@ public class ManifestPartitionLoader

public ManifestPartitionLoader(
Table table,
Optional<Domain> pathDomain,
Map<Integer, Domain> infoColumnConstraints,
ConnectorSession session,
HdfsEnvironment hdfsEnvironment,
NamenodeStats namenodeStats,
Expand All @@ -87,7 +87,7 @@ public ManifestPartitionLoader(
boolean schedulerUsesHostAddresses)
{
this.table = requireNonNull(table, "table is null");
this.pathDomain = requireNonNull(pathDomain, "pathDomain is null");
this.infoColumnConstraints = requireNonNull(infoColumnConstraints, "pathDomain is null");
this.session = requireNonNull(session, "session is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName(), table.getStorage().getLocation(), false);
Expand Down Expand Up @@ -133,7 +133,7 @@ public ListenableFuture<?> loadPartition(HivePartitionMetadata partition, HiveSp
}
}

InternalHiveSplitFactory splitFactory = createInternalHiveSplitFactory(table, partition, session, pathDomain, hdfsEnvironment, hdfsContext, schedulerUsesHostAddresses);
InternalHiveSplitFactory splitFactory = createInternalHiveSplitFactory(table, partition, session, infoColumnConstraints, hdfsEnvironment, hdfsContext, schedulerUsesHostAddresses);

return hiveSplitSource.addToQueue(fileListBuilder.build().stream()
.map(status -> splitFactory.createInternalHiveSplit(status, true))
Expand All @@ -146,7 +146,7 @@ private InternalHiveSplitFactory createInternalHiveSplitFactory(
Table table,
HivePartitionMetadata partition,
ConnectorSession session,
Optional<Domain> pathDomain,
Map<Integer, Domain> infoColumnConstraints,
HdfsEnvironment hdfsEnvironment,
HdfsContext hdfsContext,
boolean schedulerUsesHostAddresses)
Expand All @@ -167,7 +167,7 @@ private InternalHiveSplitFactory createInternalHiveSplitFactory(
return new InternalHiveSplitFactory(
fileSystem,
inputFormat,
pathDomain,
infoColumnConstraints,
getNodeSelectionStrategy(session),
getMaxInitialSplitSize(session),
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
Expand Down Expand Up @@ -99,7 +100,7 @@ public class StoragePartitionLoader
private static final ListenableFuture<?> COMPLETED_FUTURE = immediateFuture(null);

private final Table table;
private final Optional<Domain> pathDomain;
Map<Integer, Domain> infoColumnConstraints;
private final Optional<BucketSplitInfo> tableBucketInfo;
private final HdfsEnvironment hdfsEnvironment;
private final HdfsContext hdfsContext;
Expand All @@ -113,7 +114,7 @@ public class StoragePartitionLoader

public StoragePartitionLoader(
Table table,
Optional<Domain> pathDomain,
Map<Integer, Domain> infoColumnConstraints,
Optional<BucketSplitInfo> tableBucketInfo,
ConnectorSession session,
HdfsEnvironment hdfsEnvironment,
Expand All @@ -125,7 +126,7 @@ public StoragePartitionLoader(
boolean partialAggregationsPushedDown)
{
this.table = requireNonNull(table, "table is null");
this.pathDomain = requireNonNull(pathDomain, "pathDomain is null");
this.infoColumnConstraints = requireNonNull(infoColumnConstraints, "infoColumnConstraints is null");
this.tableBucketInfo = requireNonNull(tableBucketInfo, "tableBucketInfo is null");
this.session = requireNonNull(session, "session is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
Expand Down Expand Up @@ -232,7 +233,7 @@ private InternalHiveSplitFactory getHiveSplitFactory(ExtendedFileSystem fs,
return new InternalHiveSplitFactory(
fs,
inputFormat,
pathDomain,
infoColumnConstraints,
getNodeSelectionStrategy(session),
getMaxInitialSplitSize(session),
s3SelectPushdownEnabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import java.util.OptionalInt;

import static com.facebook.presto.hive.BlockLocation.fromHiveBlockLocations;
import static com.facebook.presto.hive.HiveColumnHandle.FILE_MODIFIED_TIME_COLUMN_INDEX;
import static com.facebook.presto.hive.HiveColumnHandle.FILE_SIZE_COLUMN_INDEX;
import static com.facebook.presto.hive.HiveColumnHandle.PATH_COLUMN_INDEX;
import static com.facebook.presto.hive.HiveUtil.isSelectSplittable;
import static com.facebook.presto.hive.HiveUtil.isSplittable;
import static com.facebook.presto.hive.util.CustomSplitConversionUtils.extractCustomSplitInfo;
Expand All @@ -52,7 +55,7 @@ public class InternalHiveSplitFactory
{
private final FileSystem fileSystem;
private final InputFormat<?, ?> inputFormat;
private final Optional<Domain> pathDomain;
private final Map<Integer, Domain> infoColumnConstraints;
private final NodeSelectionStrategy nodeSelectionStrategy;
private final boolean s3SelectPushdownEnabled;
private final HiveSplitPartitionInfo partitionInfo;
Expand All @@ -63,7 +66,7 @@ public class InternalHiveSplitFactory
public InternalHiveSplitFactory(
FileSystem fileSystem,
InputFormat<?, ?> inputFormat,
Optional<Domain> pathDomain,
Map<Integer, Domain> infoColumnConstraints,
NodeSelectionStrategy nodeSelectionStrategy,
DataSize minimumTargetSplitSize,
boolean s3SelectPushdownEnabled,
Expand All @@ -73,7 +76,7 @@ public InternalHiveSplitFactory(
{
this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
this.inputFormat = requireNonNull(inputFormat, "inputFormat is null");
this.pathDomain = requireNonNull(pathDomain, "pathDomain is null");
this.infoColumnConstraints = requireNonNull(infoColumnConstraints, "infoColumnConstraints is null");
this.nodeSelectionStrategy = requireNonNull(nodeSelectionStrategy, "nodeSelectionStrategy is null");
this.s3SelectPushdownEnabled = s3SelectPushdownEnabled;
this.partitionInfo = partitionInfo;
Expand Down Expand Up @@ -147,7 +150,8 @@ private Optional<InternalHiveSplit> createInternalHiveSplit(
Map<String, String> customSplitInfo)
{
String pathString = path.toString();
if (!pathMatchesPredicate(pathDomain, pathString)) {

if (!infoColumnsMatchPredicates(infoColumnConstraints, pathString, fileSize, fileModificationTime)) {
return Optional.empty();
}

Expand Down Expand Up @@ -252,4 +256,32 @@ private static boolean pathMatchesPredicate(Optional<Domain> pathDomain, String

return pathDomain.get().includesNullableValue(utf8Slice(path));
}

private static boolean infoColumnsMatchPredicates(Map<Integer, Domain> constraints,
String path,
long fileSize,
long fileModificationTime)
{
if (constraints.isEmpty()) {
return true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be false instead of true ? I'm not super familiar with this code. But when would constraints be empty ? Would be great to have a test for this situation.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constraints would be empty when there are no filters on info columns in the query. We should return true, so the split will be scheduled to worker. Any simple SELECT query without filters or filters on non-info columns will already be testing this scenario.

}

boolean matches = true;

for (Map.Entry<Integer, Domain> constraint : constraints.entrySet()) {
switch (constraint.getKey()) {
case PATH_COLUMN_INDEX:
matches &= constraint.getValue().includesNullableValue(utf8Slice(path));
break;
case FILE_SIZE_COLUMN_INDEX:
matches &= constraint.getValue().includesNullableValue(fileSize);
break;
case FILE_MODIFIED_TIME_COLUMN_INDEX:
matches &= constraint.getValue().includesNullableValue(fileModificationTime);
break;
}
}

return matches;
}
}
Loading