diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java index e995bdca2a2bb..58f26df518d5f 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.Deque; import java.util.Iterator; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Executor; @@ -71,7 +72,7 @@ public class BackgroundHiveSplitLoader public BackgroundHiveSplitLoader( Table table, Iterable partitions, - Optional pathDomain, + Map infoColumnConstraints, Optional tableBucketInfo, ConnectorSession session, HdfsEnvironment hdfsEnvironment, @@ -87,7 +88,7 @@ public BackgroundHiveSplitLoader( checkArgument(loaderConcurrency > 0, "loaderConcurrency must be > 0, found: %s", loaderConcurrency); this.executor = requireNonNull(executor, "executor is null"); this.partitions = new ConcurrentLazyQueue<>(requireNonNull(partitions, "partitions is null")); - this.delegatingPartitionLoader = new DelegatingPartitionLoader(table, pathDomain, tableBucketInfo, session, hdfsEnvironment, namenodeStats, directoryLister, fileIterators, recursiveDirWalkerEnabled, schedulerUsesHostAddresses, partialAggregationsPushedDown); + this.delegatingPartitionLoader = new DelegatingPartitionLoader(table, infoColumnConstraints, tableBucketInfo, session, hdfsEnvironment, namenodeStats, directoryLister, fileIterators, recursiveDirWalkerEnabled, schedulerUsesHostAddresses, partialAggregationsPushedDown); } @Override diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/DelegatingPartitionLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/DelegatingPartitionLoader.java index 28938eb2c337d..a0ab89cccf074 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/DelegatingPartitionLoader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/DelegatingPartitionLoader.java @@ -39,7 +39,7 @@ public class DelegatingPartitionLoader public DelegatingPartitionLoader( Table table, - Optional pathDomain, + Map infoColumnConstraints, Optional tableBucketInfo, ConnectorSession session, HdfsEnvironment hdfsEnvironment, @@ -53,7 +53,7 @@ public DelegatingPartitionLoader( this.session = requireNonNull(session, "session is null"); this.storagePartitionLoader = new StoragePartitionLoader( table, - pathDomain, + infoColumnConstraints, tableBucketInfo, session, hdfsEnvironment, @@ -63,7 +63,7 @@ public DelegatingPartitionLoader( recursiveDirWalkerEnabled, schedulerUsesHostAddresses, partialAggregationsPushedDown); - this.manifestPartitionLoader = new ManifestPartitionLoader(table, pathDomain, session, hdfsEnvironment, namenodeStats, directoryLister, recursiveDirWalkerEnabled, schedulerUsesHostAddresses); + this.manifestPartitionLoader = new ManifestPartitionLoader(table, infoColumnConstraints, session, hdfsEnvironment, namenodeStats, directoryLister, recursiveDirWalkerEnabled, schedulerUsesHostAddresses); } @Override diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveColumnHandle.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveColumnHandle.java index 83b4f0bec23df..a33c49b07e540 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveColumnHandle.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveColumnHandle.java @@ -277,4 +277,10 @@ public static boolean isFileModifiedTimeColumnHandle(HiveColumnHandle column) { return column.getHiveColumnIndex() == FILE_MODIFIED_TIME_COLUMN_INDEX; } + + public static boolean isInfoColumnHandle(HiveColumnHandle column) + { + return isPathColumnHandle(column) || isFileSizeColumnHandle(column) + || isFileModifiedTimeColumnHandle(column); + } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java index 85154eaf156ca..18adab6976cd6 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java @@ -78,11 +78,13 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import java.util.stream.Collectors; import java.util.stream.Stream; import static com.facebook.presto.common.type.Decimals.encodeScaledValue; import static com.facebook.presto.common.type.Decimals.isShortDecimal; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; +import static com.facebook.presto.hive.HiveColumnHandle.isInfoColumnHandle; import static com.facebook.presto.hive.HiveColumnHandle.isPathColumnHandle; import static com.facebook.presto.hive.HiveCommonSessionProperties.isUseParquetColumnNames; import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA; @@ -303,7 +305,7 @@ public ConnectorSplitSource getSplits( HiveSplitLoader hiveSplitLoader = new BackgroundHiveSplitLoader( table, hivePartitions, - getPathDomain(layout.getDomainPredicate(), layout.getPredicateColumns()), + getInfoColumnConstraints(layout.getDomainPredicate(), layout.getPredicateColumns()), createBucketSplitInfo(bucketHandle, bucketFilter), session, hdfsEnvironment, @@ -434,6 +436,21 @@ private static Optional getPathDomain(TupleDomain domainPredic .map(Map.Entry::getValue); } + private static Map getInfoColumnConstraints(TupleDomain domainPredicate, Map predicateColumns) + { + checkArgument(!domainPredicate.isNone(), "Unexpected domain predicate: none"); + + if (domainPredicate.getDomains().isPresent()) { + return domainPredicate.getDomains().get() + .entrySet() + .stream() + .filter(kv -> isInfoColumnHandle(predicateColumns.get(kv.getKey().getRootName()))) + .collect(Collectors.toMap(e -> predicateColumns.get(e.getKey().getRootName()).getHiveColumnIndex(), e -> e.getValue())); + } + + return ImmutableMap.of(); + } + @Managed @Nested public CounterStat getHighMemorySplitSource() diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/ManifestPartitionLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/ManifestPartitionLoader.java index 1ac12adc267b5..67a1e24cc84a7 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/ManifestPartitionLoader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/ManifestPartitionLoader.java @@ -67,7 +67,7 @@ public class ManifestPartitionLoader private static final String[] BLOCK_LOCATION_HOSTS = {"localhost"}; private final Table table; - private final Optional pathDomain; + Map infoColumnConstraints; private final ConnectorSession session; private final HdfsEnvironment hdfsEnvironment; private final HdfsContext hdfsContext; @@ -78,7 +78,7 @@ public class ManifestPartitionLoader public ManifestPartitionLoader( Table table, - Optional pathDomain, + Map infoColumnConstraints, ConnectorSession session, HdfsEnvironment hdfsEnvironment, NamenodeStats namenodeStats, @@ -87,7 +87,7 @@ public ManifestPartitionLoader( boolean schedulerUsesHostAddresses) { this.table = requireNonNull(table, "table is null"); - this.pathDomain = requireNonNull(pathDomain, "pathDomain is null"); + this.infoColumnConstraints = requireNonNull(infoColumnConstraints, "pathDomain is null"); this.session = requireNonNull(session, "session is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName(), table.getStorage().getLocation(), false); @@ -133,7 +133,7 @@ public ListenableFuture loadPartition(HivePartitionMetadata partition, HiveSp } } - InternalHiveSplitFactory splitFactory = createInternalHiveSplitFactory(table, partition, session, pathDomain, hdfsEnvironment, hdfsContext, schedulerUsesHostAddresses); + InternalHiveSplitFactory splitFactory = createInternalHiveSplitFactory(table, partition, session, infoColumnConstraints, hdfsEnvironment, hdfsContext, schedulerUsesHostAddresses); return hiveSplitSource.addToQueue(fileListBuilder.build().stream() .map(status -> splitFactory.createInternalHiveSplit(status, true)) @@ -146,7 +146,7 @@ private InternalHiveSplitFactory createInternalHiveSplitFactory( Table table, HivePartitionMetadata partition, ConnectorSession session, - Optional pathDomain, + Map infoColumnConstraints, HdfsEnvironment hdfsEnvironment, HdfsContext hdfsContext, boolean schedulerUsesHostAddresses) @@ -167,7 +167,7 @@ private InternalHiveSplitFactory createInternalHiveSplitFactory( return new InternalHiveSplitFactory( fileSystem, inputFormat, - pathDomain, + infoColumnConstraints, getNodeSelectionStrategy(session), getMaxInitialSplitSize(session), false, diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java index 453174aa93da5..3800fb9a2fdfc 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java @@ -49,6 +49,7 @@ import java.util.Deque; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalInt; import java.util.Properties; @@ -99,7 +100,7 @@ public class StoragePartitionLoader private static final ListenableFuture COMPLETED_FUTURE = immediateFuture(null); private final Table table; - private final Optional pathDomain; + Map infoColumnConstraints; private final Optional tableBucketInfo; private final HdfsEnvironment hdfsEnvironment; private final HdfsContext hdfsContext; @@ -113,7 +114,7 @@ public class StoragePartitionLoader public StoragePartitionLoader( Table table, - Optional pathDomain, + Map infoColumnConstraints, Optional tableBucketInfo, ConnectorSession session, HdfsEnvironment hdfsEnvironment, @@ -125,7 +126,7 @@ public StoragePartitionLoader( boolean partialAggregationsPushedDown) { this.table = requireNonNull(table, "table is null"); - this.pathDomain = requireNonNull(pathDomain, "pathDomain is null"); + this.infoColumnConstraints = requireNonNull(infoColumnConstraints, "infoColumnConstraints is null"); this.tableBucketInfo = requireNonNull(tableBucketInfo, "tableBucketInfo is null"); this.session = requireNonNull(session, "session is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); @@ -232,7 +233,7 @@ private InternalHiveSplitFactory getHiveSplitFactory(ExtendedFileSystem fs, return new InternalHiveSplitFactory( fs, inputFormat, - pathDomain, + infoColumnConstraints, getNodeSelectionStrategy(session), getMaxInitialSplitSize(session), s3SelectPushdownEnabled, diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/util/InternalHiveSplitFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/util/InternalHiveSplitFactory.java index 4e3430b748cd7..2568b956e4049 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/util/InternalHiveSplitFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/util/InternalHiveSplitFactory.java @@ -39,6 +39,9 @@ import java.util.OptionalInt; import static com.facebook.presto.hive.BlockLocation.fromHiveBlockLocations; +import static com.facebook.presto.hive.HiveColumnHandle.FILE_MODIFIED_TIME_COLUMN_INDEX; +import static com.facebook.presto.hive.HiveColumnHandle.FILE_SIZE_COLUMN_INDEX; +import static com.facebook.presto.hive.HiveColumnHandle.PATH_COLUMN_INDEX; import static com.facebook.presto.hive.HiveUtil.isSelectSplittable; import static com.facebook.presto.hive.HiveUtil.isSplittable; import static com.facebook.presto.hive.util.CustomSplitConversionUtils.extractCustomSplitInfo; @@ -52,7 +55,7 @@ public class InternalHiveSplitFactory { private final FileSystem fileSystem; private final InputFormat inputFormat; - private final Optional pathDomain; + private final Map infoColumnConstraints; private final NodeSelectionStrategy nodeSelectionStrategy; private final boolean s3SelectPushdownEnabled; private final HiveSplitPartitionInfo partitionInfo; @@ -63,7 +66,7 @@ public class InternalHiveSplitFactory public InternalHiveSplitFactory( FileSystem fileSystem, InputFormat inputFormat, - Optional pathDomain, + Map infoColumnConstraints, NodeSelectionStrategy nodeSelectionStrategy, DataSize minimumTargetSplitSize, boolean s3SelectPushdownEnabled, @@ -73,7 +76,7 @@ public InternalHiveSplitFactory( { this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); this.inputFormat = requireNonNull(inputFormat, "inputFormat is null"); - this.pathDomain = requireNonNull(pathDomain, "pathDomain is null"); + this.infoColumnConstraints = requireNonNull(infoColumnConstraints, "infoColumnConstraints is null"); this.nodeSelectionStrategy = requireNonNull(nodeSelectionStrategy, "nodeSelectionStrategy is null"); this.s3SelectPushdownEnabled = s3SelectPushdownEnabled; this.partitionInfo = partitionInfo; @@ -147,7 +150,8 @@ private Optional createInternalHiveSplit( Map customSplitInfo) { String pathString = path.toString(); - if (!pathMatchesPredicate(pathDomain, pathString)) { + + if (!infoColumnsMatchPredicates(infoColumnConstraints, pathString, fileSize, fileModificationTime)) { return Optional.empty(); } @@ -252,4 +256,32 @@ private static boolean pathMatchesPredicate(Optional pathDomain, String return pathDomain.get().includesNullableValue(utf8Slice(path)); } + + private static boolean infoColumnsMatchPredicates(Map constraints, + String path, + long fileSize, + long fileModificationTime) + { + if (constraints.isEmpty()) { + return true; + } + + boolean matches = true; + + for (Map.Entry constraint : constraints.entrySet()) { + switch (constraint.getKey()) { + case PATH_COLUMN_INDEX: + matches &= constraint.getValue().includesNullableValue(utf8Slice(path)); + break; + case FILE_SIZE_COLUMN_INDEX: + matches &= constraint.getValue().includesNullableValue(fileSize); + break; + case FILE_MODIFIED_TIME_COLUMN_INDEX: + matches &= constraint.getValue().includesNullableValue(fileModificationTime); + break; + } + } + + return matches; + } } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestBackgroundHiveSplitLoader.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestBackgroundHiveSplitLoader.java index ed09968f7e881..8f769c349ab91 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestBackgroundHiveSplitLoader.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestBackgroundHiveSplitLoader.java @@ -71,6 +71,7 @@ import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.hive.BucketFunctionType.HIVE_COMPATIBLE; import static com.facebook.presto.hive.CacheQuotaScope.GLOBAL; +import static com.facebook.presto.hive.HiveColumnHandle.PATH_COLUMN_INDEX; import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize; import static com.facebook.presto.hive.HiveStorageFormat.ORC; import static com.facebook.presto.hive.HiveTestUtils.SESSION; @@ -106,6 +107,7 @@ public class TestBackgroundHiveSplitLoader private static final ExecutorService EXECUTOR = newCachedThreadPool(daemonThreadsNamed("test-%s")); private static final Domain RETURNED_PATH_DOMAIN = Domain.singleValue(VARCHAR, utf8Slice(RETURNED_PATH.toString())); + private static final Map RETURNED_PATH_CONSTRAINT = ImmutableMap.of(PATH_COLUMN_INDEX, RETURNED_PATH_DOMAIN); private static final List TEST_FILES = ImmutableList.of( locatedFileStatus(RETURNED_PATH, 0L), @@ -129,7 +131,7 @@ public void testNoPathFilter() { BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( TEST_FILES, - Optional.empty()); + ImmutableMap.of()); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); @@ -143,7 +145,7 @@ public void testPathFilter() { BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( TEST_FILES, - Optional.of(RETURNED_PATH_DOMAIN)); + RETURNED_PATH_CONSTRAINT); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); @@ -158,7 +160,7 @@ public void testPathFilterOneBucketMatchPartitionedTable() { BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( TEST_FILES, - Optional.of(RETURNED_PATH_DOMAIN), + RETURNED_PATH_CONSTRAINT, Optional.of(new HiveBucketFilter(ImmutableSet.of(0, 1))), PARTITIONED_TABLE, Optional.of(new HiveBucketHandle(BUCKET_COLUMN_HANDLES, BUCKET_COUNT, BUCKET_COUNT))); @@ -176,7 +178,7 @@ public void testPathFilterBucketedPartitionedTable() { BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( TEST_FILES, - Optional.of(RETURNED_PATH_DOMAIN), + RETURNED_PATH_CONSTRAINT, Optional.empty(), PARTITIONED_TABLE, Optional.of( @@ -198,7 +200,7 @@ public void testEmptyFileWithNoBlocks() { BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( ImmutableList.of(locatedFileStatusWithNoBlocks(RETURNED_PATH)), - Optional.empty()); + ImmutableMap.of()); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); @@ -241,7 +243,7 @@ public void testUnsupportedTableFormat() BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( SESSION, ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), getMaxInitialSplitSize(SESSION).toBytes())), - Optional.empty(), + ImmutableMap.of(), Optional.empty(), unsupportedTable, Optional.empty(), @@ -328,7 +330,7 @@ public void testSplittableNotCheckedOnSmallFiles() BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( SESSION, ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), initialSplitSize.toBytes())), - Optional.empty(), + ImmutableMap.of(), Optional.empty(), table, Optional.empty(), @@ -343,7 +345,7 @@ public void testSplittableNotCheckedOnSmallFiles() backgroundHiveSplitLoader = backgroundHiveSplitLoader( SESSION, ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), initialSplitSize.toBytes() + 1)), - Optional.empty(), + ImmutableMap.of(), Optional.empty(), table, Optional.empty(), @@ -402,7 +404,7 @@ private void assertTextFileSplitCount(DataSize fileSize, Map tab BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), fileSize.toBytes())), - Optional.empty(), + ImmutableMap.of(), Optional.empty(), table, Optional.empty()); @@ -490,7 +492,7 @@ private static List drainSplits(HiveSplitSource source) private static BackgroundHiveSplitLoader backgroundHiveSplitLoader( List files, - Optional pathDomain) + Map pathDomain) { return backgroundHiveSplitLoader( files, @@ -502,7 +504,7 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader( private static BackgroundHiveSplitLoader backgroundHiveSplitLoader( List files, - Optional pathDomain, + Map constraints, Optional hiveBucketFilter, Table table, Optional bucketHandle) @@ -510,13 +512,13 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader( ConnectorSession connectorSession = new TestingConnectorSession(getAllSessionProperties( new HiveClientConfig().setMaxSplitSize(new DataSize(1.0, GIGABYTE)), new HiveCommonClientConfig())); - return backgroundHiveSplitLoader(connectorSession, files, pathDomain, hiveBucketFilter, table, bucketHandle, samplePartitionMetadatas()); + return backgroundHiveSplitLoader(connectorSession, files, constraints, hiveBucketFilter, table, bucketHandle, samplePartitionMetadatas()); } private static BackgroundHiveSplitLoader backgroundHiveSplitLoader( ConnectorSession connectorSession, List files, - Optional pathDomain, + Map constraints, Optional hiveBucketFilter, Table table, Optional bucketHandle, @@ -525,7 +527,7 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader( return new BackgroundHiveSplitLoader( table, hivePartitionMetadatas, - pathDomain, + constraints, createBucketSplitInfo(bucketHandle, hiveBucketFilter), connectorSession, new TestingHdfsEnvironment(files), @@ -560,7 +562,7 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(List