From 56ccfa9cc8f093c6092298e82b3cf66375faf4b9 Mon Sep 17 00:00:00 2001
From: praveenkrishna
Date: Fri, 10 Dec 2021 11:45:59 +0530
Subject: [PATCH 1/3] More coverage for TestHivePlans

* Test to ensure the filter on the build side table is derived from table properties
* Test to ensure the query fails if it scans too many partitions
---
 .../plugin/hive/optimizer/TestHivePlans.java  | 37 ++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/optimizer/TestHivePlans.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/optimizer/TestHivePlans.java
index 4ce0edc67003..ead277d3496e 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/optimizer/TestHivePlans.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/optimizer/TestHivePlans.java
@@ -31,6 +31,7 @@
 import io.trino.plugin.hive.metastore.MetastoreConfig;
 import io.trino.plugin.hive.metastore.file.FileHiveMetastore;
 import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
+import io.trino.spi.TrinoException;
 import io.trino.spi.security.PrincipalType;
 import io.trino.sql.planner.assertions.BasePlanTest;
 import io.trino.testing.LocalQueryRunner;
@@ -61,6 +62,7 @@
 import static io.trino.sql.planner.plan.ExchangeNode.Type.REPARTITION;
 import static io.trino.sql.planner.plan.JoinNode.Type.INNER;
 import static io.trino.testing.TestingSession.testSessionBuilder;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 public class TestHivePlans
         extends BasePlanTest
@@ -104,7 +106,7 @@ protected LocalQueryRunner createLocalQueryRunner()
     protected LocalQueryRunner createQueryRunner(Session session, HiveMetastore metastore)
     {
         LocalQueryRunner queryRunner = LocalQueryRunner.create(session);
-        queryRunner.createCatalog(HIVE_CATALOG_NAME, new TestingHiveConnectorFactory(metastore), Map.of());
+        queryRunner.createCatalog(HIVE_CATALOG_NAME, new TestingHiveConnectorFactory(metastore), Map.of("hive.max-partitions-per-scan", "5"));
         return queryRunner;
     }
 
@@ -122,6 +124,9 @@ public void setUp()
         // partitioned on varchar
         queryRunner.execute("CREATE TABLE table_str_partitioned WITH (partitioned_by = ARRAY['str_part']) AS SELECT int_col, str_part FROM (" + values + ") t(str_part, int_col)");
 
+        // with too many partitions
+        queryRunner.execute("CREATE TABLE table_int_with_too_many_partitions WITH (partitioned_by = ARRAY['int_part']) AS SELECT str_col, int_part FROM (" + values + ", ('six', 6)) t(str_col, int_part)");
+
         // unpartitioned
         queryRunner.execute("CREATE TABLE table_unpartitioned AS SELECT str_col, int_col FROM (" + values + ") t(str_col, int_col)");
     }
@@ -267,6 +272,36 @@ public void testSubsumePartitionFilterNotConvertibleToTupleDomain()
                                                         tableScan("table_unpartitioned", Map.of("R_STR_COL", "str_col", "R_INT_COL", "int_col"))))))))));
     }
 
+    @Test
+    public void testFilterDerivedFromTableProperties()
+    {
+        // Test that the filter on the build side table is derived from table properties
+        assertDistributedPlan(
+                "SELECT l.str_col, r.str_col FROM table_int_partitioned l JOIN table_unpartitioned r ON l.int_part = r.int_col",
+                noJoinReordering(),
+                output(
+                        exchange(REMOTE, GATHER,
+                                join(INNER, List.of(equiJoinClause("L_INT_PART", "R_INT_COL")),
+                                        exchange(REMOTE, REPARTITION,
+                                                project(
+                                                        filter("true", // dynamic filter
+                                                                tableScan("table_int_partitioned", Map.of("L_INT_PART", "int_part", "L_STR_COL", "str_col"))))),
+                                        exchange(LOCAL,
+                                                exchange(REMOTE, REPARTITION,
+                                                        project(
+                                                                filter("R_INT_COL IN (1, 2, 3, 4, 5)",
+                                                                        tableScan("table_unpartitioned", Map.of("R_STR_COL", "str_col", "R_INT_COL", "int_col"))))))))));
+    }
+
+    @Test
+    public void testQueryScanningForTooManyPartitions()
+    {
+        assertThatThrownBy(() -> plan("SELECT l.str_col, r.str_col FROM table_int_with_too_many_partitions l JOIN table_unpartitioned r ON l.int_part = r.int_col"))
+                .getCause()
+                .isInstanceOf(TrinoException.class)
+                .hasMessage("Query over table 'test_schema.table_int_with_too_many_partitions' can potentially read more than 5 partitions");
+    }
+
     // Disable join ordering so that expected plans are well defined.
     private Session noJoinReordering()
     {

From 3a6e1da912e78466ca87955b2584fef9ed158c55 Mon Sep 17 00:00:00 2001
From: praveenkrishna
Date: Tue, 7 Dec 2021 19:22:40 +0530
Subject: [PATCH 2/3] Defer loading hive partitions if number of partitions crosses limit

We defer the initial loading of hive partition information if the number
of partitions crosses a limit. This allows further invocations of
applyFilter, which could reduce the number of partitions to be scanned.
---
 .../io/trino/plugin/hive/HiveMetadata.java    | 68 +++++++++++++++----
 .../plugin/hive/HivePartitionManager.java     | 60 +++++++++++-----
 .../plugin/hive/HivePartitionResult.java      | 29 ++++----
 .../io/trino/plugin/hive/HiveTableHandle.java | 33 ++++++++-
 .../plugin/hive/BaseHiveConnectorTest.java    |  5 +-
 .../plugin/hive/optimizer/TestHivePlans.java  | 19 +++++-
 6 files changed, 163 insertions(+), 51 deletions(-)

diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
index 1bb6a337f349..944f2eb2dc9f 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
@@ -288,6 +288,7 @@
 import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
+import static io.trino.spi.connector.Constraint.alwaysTrue;
 import static io.trino.spi.connector.RetryMode.NO_RETRIES;
 import static io.trino.spi.predicate.TupleDomain.withColumnDomains;
 import static io.trino.spi.statistics.TableStatisticType.ROW_COUNT;
@@ -485,7 +486,7 @@ public ConnectorTableHandle getTableHandleForStatisticsCollection(ConnectorSessi
            List<List<String>> list = analyzePartitionValues.get();
             handle = handle.withAnalyzePartitionValues(list);
             HivePartitionResult partitions = partitionManager.getPartitions(handle, list);
-            handle = partitionManager.applyPartitionResult(handle, partitions, Optional.empty());
+            handle = partitionManager.applyPartitionResult(handle, partitions, alwaysTrue());
         }
 
         if (analyzeColumnNames.isPresent()) {
@@ -761,8 +762,14 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
         Map<String, Type> columnTypes = columns.entrySet().stream()
                 .collect(toImmutableMap(Map.Entry::getKey, entry -> getColumnMetadata(session, tableHandle, entry.getValue()).getType()));
         HivePartitionResult partitionResult = partitionManager.getPartitions(metastore, tableHandle, constraint);
-        List<HivePartition> partitions = partitionManager.getPartitionsAsList(partitionResult);
-        return hiveStatisticsProvider.getTableStatistics(session, ((HiveTableHandle) tableHandle).getSchemaTableName(), columns, columnTypes, partitions);
+        // If partitions are not loaded, then don't generate table statistics.
+        // Note that the computation is not persisted in the table handle, so it can be redone many times.
+        // TODO: https://github.com/trinodb/trino/issues/10980.
+        if (partitionManager.canPartitionsBeLoaded(partitionResult)) {
+            List<HivePartition> partitions = partitionManager.getPartitionsAsList(partitionResult);
+            return hiveStatisticsProvider.getTableStatistics(session, ((HiveTableHandle) tableHandle).getSchemaTableName(), columns, columnTypes, partitions);
+        }
+        return TableStatistics.empty();
     }
 
     private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix)
@@ -2536,17 +2543,40 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
         HiveTableHandle hiveTable = (HiveTableHandle) table;
 
         List<ColumnHandle> partitionColumns = ImmutableList.copyOf(hiveTable.getPartitionColumns());
-        List<HivePartition> partitions = partitionManager.getOrLoadPartitions(metastore, hiveTable);
-
-        TupleDomain<ColumnHandle> predicate = createPredicate(partitionColumns, partitions);
-
+        TupleDomain<ColumnHandle> predicate = TupleDomain.all();
         Optional<DiscretePredicates> discretePredicates = Optional.empty();
-        if (!partitionColumns.isEmpty()) {
-            // Do not create tuple domains for every partition at the same time!
-            // There can be a huge number of partitions so use an iterable so
-            // all domains do not need to be in memory at the same time.
-            Iterable<TupleDomain<ColumnHandle>> partitionDomains = Iterables.transform(partitions, hivePartition -> TupleDomain.fromFixedValues(hivePartition.getKeys()));
-            discretePredicates = Optional.of(new DiscretePredicates(partitionColumns, partitionDomains));
+
+        // If only partition names are loaded, then the predicates are partially enforced,
+        // so the computation of predicate and discretePredicates is not valid.
+        if (hiveTable.getPartitionNames().isEmpty()) {
+            Optional<List<HivePartition>> partitions = hiveTable.getPartitions()
+                    // If the partitions are not loaded, check whether they can be loaded.
+                    .or(() -> {
+                        // We load the partitions to compute the predicates enforced by the table.
+                        // Note that the computation is not persisted in the table handle, so it can be redone many times.
+                        // TODO: https://github.com/trinodb/trino/issues/10980.
+                        HivePartitionResult partitionResult = partitionManager.getPartitions(metastore, table, new Constraint(hiveTable.getEnforcedConstraint()));
+                        if (partitionManager.canPartitionsBeLoaded(partitionResult)) {
+                            return Optional.of(partitionManager.getPartitionsAsList(partitionResult));
+                        }
+                        return Optional.empty();
+                    });
+
+            if (partitions.isPresent()) {
+                List<HivePartition> hivePartitions = partitions.orElseThrow();
+                // Since the partitions are fully loaded now, we can compute the predicate enforced by the table
+                predicate = createPredicate(partitionColumns, hivePartitions);
+
+                // Un-partitioned tables can have a partition with the ID UNPARTITIONED;
+                // this check allows us to ensure that the table is partitioned
+                if (!partitionColumns.isEmpty()) {
+                    // Do not create tuple domains for every partition at the same time!
+                    // There can be a huge number of partitions so use an iterable so
+                    // all domains do not need to be in memory at the same time.
+                    Iterable<TupleDomain<ColumnHandle>> partitionDomains = Iterables.transform(hivePartitions, hivePartition -> TupleDomain.fromFixedValues(hivePartition.getKeys()));
+                    discretePredicates = Optional.of(new DiscretePredicates(partitionColumns, partitionDomains));
+                }
+            }
         }
 
         Optional<ConnectorTablePartitioning> tablePartitioning = Optional.empty();
@@ -2595,16 +2625,23 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
         HiveTableHandle handle = (HiveTableHandle) tableHandle;
         checkArgument(handle.getAnalyzePartitionValues().isEmpty() || constraint.getSummary().isAll(), "Analyze should not have a constraint");
 
         HivePartitionResult partitionResult = partitionManager.getPartitions(metastore, handle, constraint);
-        HiveTableHandle newHandle = partitionManager.applyPartitionResult(handle, partitionResult, constraint.getPredicateColumns());
+        HiveTableHandle newHandle = partitionManager.applyPartitionResult(handle, partitionResult, constraint);
 
         if (handle.getPartitions().equals(newHandle.getPartitions()) &&
+                handle.getPartitionNames().equals(newHandle.getPartitionNames()) &&
                 handle.getCompactEffectivePredicate().equals(newHandle.getCompactEffectivePredicate()) &&
                 handle.getBucketFilter().equals(newHandle.getBucketFilter()) &&
                 handle.getConstraintColumns().equals(newHandle.getConstraintColumns())) {
             return Optional.empty();
         }
 
-        return Optional.of(new ConstraintApplicationResult<>(newHandle, partitionResult.getUnenforcedConstraint(), false));
+        TupleDomain<ColumnHandle> unenforcedConstraint = partitionResult.getEffectivePredicate();
+        if (newHandle.getPartitions().isPresent()) {
+            List<HiveColumnHandle> partitionColumns = partitionResult.getPartitionColumns();
+            unenforcedConstraint = partitionResult.getEffectivePredicate().filter((column, domain) -> !partitionColumns.contains(column));
+        }
+
+        return Optional.of(new ConstraintApplicationResult<>(newHandle, unenforcedConstraint, false));
     }
 
     @Override
@@ -2838,6 +2875,7 @@ public ConnectorTableHandle makeCompatiblePartitioning(ConnectorSession session,
                 hiveTable.getTableParameters(),
                 hiveTable.getPartitionColumns(),
                 hiveTable.getDataColumns(),
+                hiveTable.getPartitionNames(),
                 hiveTable.getPartitions(),
                 hiveTable.getCompactEffectivePredicate(),
                 hiveTable.getEnforcedConstraint(),
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionManager.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionManager.java
index f96a9ae3c508..a29f258c69d6 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionManager.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionManager.java
@@ -38,7 +38,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.function.Predicate;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -86,7 +85,7 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastor
         List<HiveColumnHandle> partitionColumns = hiveTableHandle.getPartitionColumns();
 
         if (effectivePredicate.isNone()) {
-            return new HivePartitionResult(partitionColumns, ImmutableList.of(), none(), none(), none(), hiveBucketHandle, Optional.empty());
+            return new HivePartitionResult(partitionColumns, Optional.empty(), ImmutableList.of(), none(), none(), hiveBucketHandle, Optional.empty());
         }
 
         Optional<HiveBucketFilter> bucketFilter = getHiveBucketFilter(hiveTableHandle, effectivePredicate);
@@ -97,10 +96,10 @@
         if (partitionColumns.isEmpty()) {
             return new HivePartitionResult(
                     partitionColumns,
+                    Optional.empty(),
                     ImmutableList.of(new HivePartition(tableName)),
-                    compactEffectivePredicate,
                     effectivePredicate,
-                    TupleDomain.all(),
+                    compactEffectivePredicate,
                     hiveBucketHandle,
                     bucketFilter);
         }
@@ -109,6 +108,7 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastor
                 .map(HiveColumnHandle::getType)
                 .collect(toList());
 
+        Optional<List<String>> partitionNames = Optional.empty();
         Iterable<HivePartition> partitionsIterable;
         Predicate<Map<ColumnHandle, NullableValue>> predicate = constraint.predicate().orElse(value -> true);
         if (hiveTableHandle.getPartitions().isPresent()) {
@@ -117,19 +117,18 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastor
                     .collect(toImmutableList());
         }
         else {
-            List<String> partitionNames = getFilteredPartitionNames(metastore, tableName, partitionColumns, compactEffectivePredicate);
-            partitionsIterable = () -> partitionNames.stream()
+            List<String> partitionNamesList = hiveTableHandle.getPartitionNames()
+                    .orElseGet(() -> getFilteredPartitionNames(metastore, tableName, partitionColumns, compactEffectivePredicate));
+            partitionsIterable = () -> partitionNamesList.stream()
                     // Apply extra filters which could not be done by getFilteredPartitionNames
                     .map(partitionName -> parseValuesAndFilterPartition(tableName, partitionName, partitionColumns, partitionTypes, effectivePredicate, predicate))
                     .filter(Optional::isPresent)
                     .map(Optional::get)
                     .iterator();
+            partitionNames = Optional.of(partitionNamesList);
         }
 
-        // All partition key domains will be fully evaluated, so we don't need to include those
-        TupleDomain<ColumnHandle> remainingTupleDomain = effectivePredicate.filter((column, domain) -> !partitionColumns.contains(column));
-        TupleDomain<ColumnHandle> enforcedTupleDomain = effectivePredicate.filter((column, domain) -> partitionColumns.contains(column));
-        return new HivePartitionResult(partitionColumns, partitionsIterable, compactEffectivePredicate, remainingTupleDomain, enforcedTupleDomain, hiveBucketHandle, bucketFilter);
+        return new HivePartitionResult(partitionColumns, partitionNames, partitionsIterable, effectivePredicate, compactEffectivePredicate, hiveBucketHandle, bucketFilter);
     }
 
     public HivePartitionResult getPartitions(ConnectorTableHandle tableHandle, List<List<String>> partitionValuesList)
@@ -149,7 +148,7 @@ public HivePartitionResult getPartitions(ConnectorTableHandle tableHandle, List<
                 .map(partition -> partition.orElseThrow(() -> new VerifyException("partition must exist")))
                 .collect(toImmutableList());
 
-        return new HivePartitionResult(partitionColumns, partitionList, TupleDomain.all(), TupleDomain.all(), TupleDomain.all(), bucketHandle, Optional.empty());
+        return new HivePartitionResult(partitionColumns, Optional.empty(), partitionList, TupleDomain.all(), TupleDomain.all(), bucketHandle, Optional.empty());
     }
 
     public List<HivePartition> getPartitionsAsList(HivePartitionResult partitionResult)
@@ -175,22 +174,38 @@ public List<HivePartition> getPartitionsAsList(HivePartitionResult partitionResu
         return partitionList.build();
     }
 
-    public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitionResult partitions, Optional<Set<ColumnHandle>> columns)
+    public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitionResult partitions, Constraint constraint)
     {
+        Optional<List<String>> partitionNames = partitions.getPartitionNames();
+        Optional<List<HivePartition>> partitionList = Optional.empty();
+        TupleDomain<ColumnHandle> enforcedConstraint = handle.getEnforcedConstraint();
+
+        // Partitions will be loaded if
+        // 1. the number of partition names is less than or equal to the threshold value, thereby generating
+        //    additional filter criteria that can be applied on the other join side (if the join is based on
+        //    the partition column), or
+        // 2. an additional predicate is passed as a part of Constraint
+        //    (specified via Constraint#predicate()); this delays the partition checks
+        //    until we have additional filtering based on the Constraint.
+        if (canPartitionsBeLoaded(partitions) || constraint.predicate().isPresent()) {
+            partitionNames = Optional.empty();
+            partitionList = Optional.of(getPartitionsAsList(partitions));
+            List<HiveColumnHandle> partitionColumns = partitions.getPartitionColumns();
+            enforcedConstraint = partitions.getEffectivePredicate().filter((column, domain) -> partitionColumns.contains(column));
+        }
         return new HiveTableHandle(
                 handle.getSchemaName(),
                 handle.getTableName(),
                 handle.getTableParameters(),
                 ImmutableList.copyOf(partitions.getPartitionColumns()),
                 handle.getDataColumns(),
-                Optional.of(getPartitionsAsList(partitions)),
+                partitionNames,
+                partitionList,
                 partitions.getCompactEffectivePredicate(),
-                partitions.getEnforcedConstraint(),
+                enforcedConstraint,
                 partitions.getBucketHandle(),
                 partitions.getBucketFilter(),
                 handle.getAnalyzePartitionValues(),
                 handle.getAnalyzeColumnNames(),
-                Sets.union(handle.getConstraintColumns(), columns.orElseGet(ImmutableSet::of)),
+                Sets.union(handle.getConstraintColumns(), constraint.getPredicateColumns().orElseGet(ImmutableSet::of)),
                 handle.getProjectedColumns(),
                 handle.getTransaction(),
                 handle.isRecordScannedFiles(),
@@ -199,8 +214,21 @@ public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitio
 
     public List<HivePartition> getOrLoadPartitions(SemiTransactionalHiveMetastore metastore, HiveTableHandle table)
     {
+        // In case of partitions not being loaded, their permissible values are specified in `HiveTableHandle#getCompactEffectivePredicate`,
+        // so we do an intersection of getCompactEffectivePredicate and the table handle's enforced constraint
+        TupleDomain<ColumnHandle> summary = table.getEnforcedConstraint().intersect(
+                table.getCompactEffectivePredicate()
+                        .transformKeys(ColumnHandle.class::cast));
         return table.getPartitions().orElseGet(() ->
-                getPartitionsAsList(getPartitions(metastore, table, new Constraint(table.getEnforcedConstraint()))));
+                getPartitionsAsList(getPartitions(metastore, table, new Constraint(summary))));
+    }
+
+    public boolean canPartitionsBeLoaded(HivePartitionResult partitionResult)
+    {
+        if (partitionResult.getPartitionNames().isPresent()) {
+            return partitionResult.getPartitionNames().orElseThrow().size() <= maxPartitions;
+        }
+        return true;
     }
 
     private Optional<HivePartition> parseValuesAndFilterPartition(
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionResult.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionResult.java
index d2270b27a11a..77dedf2c09ec 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionResult.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionResult.java
@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.hive;
 
+import com.google.common.collect.ImmutableList;
 import io.trino.plugin.hive.util.HiveBucketing.HiveBucketFilter;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.predicate.TupleDomain;
@@ -35,26 +36,26 @@ public class HivePartitionResult
 {
     private final List<HiveColumnHandle> partitionColumns;
     private final Iterable<HivePartition> partitions;
+    private final TupleDomain<ColumnHandle> effectivePredicate;
     private final TupleDomain<HiveColumnHandle> compactEffectivePredicate;
-    private final TupleDomain<ColumnHandle> unenforcedConstraint;
-    private final TupleDomain<ColumnHandle> enforcedConstraint;
     private final Optional<HiveBucketHandle> bucketHandle;
     private final Optional<HiveBucketFilter> bucketFilter;
+    private final Optional<List<String>> partitionNames;
 
     public HivePartitionResult(
             List<HiveColumnHandle> partitionColumns,
+            Optional<List<String>> partitionNames,
             Iterable<HivePartition> partitions,
+            TupleDomain<ColumnHandle> effectivePredicate,
             TupleDomain<HiveColumnHandle> compactEffectivePredicate,
-            TupleDomain<ColumnHandle> unenforcedConstraint,
-            TupleDomain<ColumnHandle> enforcedConstraint,
             Optional<HiveBucketHandle> bucketHandle,
             Optional<HiveBucketFilter> bucketFilter)
     {
         this.partitionColumns = requireNonNull(partitionColumns, "partitionColumns is null");
+        this.partitionNames = requireNonNull(partitionNames, "partitionNames is null").map(ImmutableList::copyOf);
         this.partitions = requireNonNull(partitions, "partitions is null");
+        this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null");
         this.compactEffectivePredicate = requireNonNull(compactEffectivePredicate, "compactEffectivePredicate is null");
-        this.unenforcedConstraint = requireNonNull(unenforcedConstraint, "unenforcedConstraint is null");
-        this.enforcedConstraint = requireNonNull(enforcedConstraint, "enforcedConstraint is null");
         this.bucketHandle = requireNonNull(bucketHandle, "bucketHandle is null");
         this.bucketFilter = requireNonNull(bucketFilter, "bucketFilter is null");
     }
@@ -64,24 +65,24 @@ public List<HiveColumnHandle> getPartitionColumns()
         return partitionColumns;
     }
 
-    public Iterator<HivePartition> getPartitions()
+    public Optional<List<String>> getPartitionNames()
     {
-        return partitions.iterator();
+        return partitionNames;
     }
 
-    public TupleDomain<HiveColumnHandle> getCompactEffectivePredicate()
+    public Iterator<HivePartition> getPartitions()
     {
-        return compactEffectivePredicate;
+        return partitions.iterator();
     }
 
-    public TupleDomain<ColumnHandle> getUnenforcedConstraint()
+    public TupleDomain<ColumnHandle> getEffectivePredicate()
     {
-        return unenforcedConstraint;
+        return effectivePredicate;
     }
 
-    public TupleDomain<ColumnHandle> getEnforcedConstraint()
+    public TupleDomain<HiveColumnHandle> getCompactEffectivePredicate()
     {
-        return enforcedConstraint;
+        return compactEffectivePredicate;
     }
 
     public Optional<HiveBucketHandle> getBucketHandle()
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableHandle.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableHandle.java
index 153eb7d0325f..e8153b6ca957 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableHandle.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableHandle.java
@@ -46,6 +46,7 @@ public class HiveTableHandle
     private final Optional<Map<String, String>> tableParameters;
     private final List<HiveColumnHandle> partitionColumns;
     private final List<HiveColumnHandle> dataColumns;
+    private final Optional<List<String>> partitionNames;
     private final Optional<List<HivePartition>> partitions;
     private final TupleDomain<HiveColumnHandle> compactEffectivePredicate;
     private final TupleDomain<ColumnHandle> enforcedConstraint;
@@ -80,6 +81,7 @@ public HiveTableHandle(
                 partitionColumns,
                 dataColumns,
                 Optional.empty(),
+                Optional.empty(),
                 compactEffectivePredicate,
                 enforcedConstraint,
                 bucketHandle,
@@ -108,6 +110,7 @@ public HiveTableHandle(
                 partitionColumns,
                 dataColumns,
                 Optional.empty(),
+                Optional.empty(),
                 TupleDomain.all(),
                 TupleDomain.all(),
                 bucketHandle,
@@ -127,6 +130,7 @@ public HiveTableHandle(
             Optional<Map<String, String>> tableParameters,
             List<HiveColumnHandle> partitionColumns,
             List<HiveColumnHandle> dataColumns,
+            Optional<List<String>> partitionNames,
             Optional<List<HivePartition>> partitions,
             TupleDomain<HiveColumnHandle> compactEffectivePredicate,
             TupleDomain<ColumnHandle> enforcedConstraint,
@@ -140,11 +144,13 @@ public HiveTableHandle(
             boolean recordScannedFiles,
             Optional<Long> maxSplitFileSize)
     {
+        checkState(partitionNames.isEmpty() || partitions.isEmpty(), "partition names and partitions list cannot be present at the same time");
         this.schemaName = requireNonNull(schemaName, "schemaName is null");
         this.tableName = requireNonNull(tableName, "tableName is null");
         this.tableParameters = requireNonNull(tableParameters, "tableParameters is null").map(ImmutableMap::copyOf);
         this.partitionColumns = ImmutableList.copyOf(requireNonNull(partitionColumns, "partitionColumns is null"));
         this.dataColumns = ImmutableList.copyOf(requireNonNull(dataColumns, "dataColumns is null"));
+        this.partitionNames = requireNonNull(partitionNames, "partitionNames is null").map(ImmutableList::copyOf);
         this.partitions = requireNonNull(partitions, "partitions is null").map(ImmutableList::copyOf);
         this.compactEffectivePredicate = requireNonNull(compactEffectivePredicate, "compactEffectivePredicate is null");
         this.enforcedConstraint = requireNonNull(enforcedConstraint, "enforcedConstraint is null");
@@ -167,6 +173,7 @@ public HiveTableHandle withAnalyzePartitionValues(List<List<String>> analyzePart
                 tableParameters,
                 partitionColumns,
                 dataColumns,
+                partitionNames,
                 partitions,
                 compactEffectivePredicate,
                 enforcedConstraint,
@@ -189,6 +196,7 @@ public HiveTableHandle withAnalyzeColumnNames(Set<String> analyzeColumnNames)
                 tableParameters,
                 partitionColumns,
                 dataColumns,
+                partitionNames,
                 partitions,
                 compactEffectivePredicate,
                 enforcedConstraint,
@@ -211,6 +219,7 @@ public HiveTableHandle withTransaction(AcidTransaction transaction)
                 tableParameters,
                 partitionColumns,
                 dataColumns,
+                partitionNames,
                 partitions,
                 compactEffectivePredicate,
                 enforcedConstraint,
@@ -234,6 +243,7 @@ public HiveTableHandle withUpdateProcessor(AcidTransaction transaction, HiveUpda
                 tableParameters,
                 partitionColumns,
                 dataColumns,
+                partitionNames,
                 partitions,
                 compactEffectivePredicate,
                 enforcedConstraint,
@@ -256,6 +266,7 @@ public HiveTableHandle withProjectedColumns(Set<HiveColumnHandle> projectedColumns)
                 tableParameters,
                 partitionColumns,
                 dataColumns,
+                partitionNames,
                 partitions,
                 compactEffectivePredicate,
                 enforcedConstraint,
@@ -278,6 +289,7 @@ public HiveTableHandle withRecordScannedFiles(boolean recordScannedFiles)
                 tableParameters,
                 partitionColumns,
                 dataColumns,
+                partitionNames,
                 partitions,
                 compactEffectivePredicate,
                 enforcedConstraint,
@@ -300,6 +312,7 @@ public HiveTableHandle withMaxScannedFileSize(Optional<DataSize> maxScannedFileSize)
                 tableParameters,
                 partitionColumns,
                 dataColumns,
+                partitionNames,
                 partitions,
                 compactEffectivePredicate,
                 enforcedConstraint,
@@ -345,7 +358,23 @@ public List<HiveColumnHandle> getDataColumns()
         return dataColumns;
     }
 
-    // do not serialize partitions as they are not needed on workers
+    /**
+     * Represents raw partition information as strings.
+     * These are partially satisfied by the table filter criteria.
+     * This will be set to `Optional#empty` if the parsed partition information is loaded.
+     * Skip serialization as they are not needed on workers
+     */
+    @JsonIgnore
+    public Optional<List<String>> getPartitionNames()
+    {
+        return partitionNames;
+    }
+
+    /**
+     * Represents parsed partition information (which is derived from the raw partition string).
+     * These are fully satisfied by the table filter criteria.
+     * Skip serialization as they are not needed on workers
+     */
     @JsonIgnore
     public Optional<List<HivePartition>> getPartitions()
     {
         return partitions;
     }
@@ -477,6 +506,7 @@ public boolean equals(Object o)
                 Objects.equals(tableName, that.tableName) &&
                 Objects.equals(tableParameters, that.tableParameters) &&
                 Objects.equals(partitionColumns, that.partitionColumns) &&
+                Objects.equals(partitionNames, that.partitionNames) &&
                 Objects.equals(partitions, that.partitions) &&
                 Objects.equals(compactEffectivePredicate, that.compactEffectivePredicate) &&
                 Objects.equals(enforcedConstraint, that.enforcedConstraint) &&
@@ -495,6 +525,7 @@ public int hashCode()
                 tableName,
                 tableParameters,
                 partitionColumns,
+                partitionNames,
                 partitions,
                 compactEffectivePredicate,
                 enforcedConstraint,
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
index d77d078e5309..5b5c9f8eda31 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
@@ -3183,9 +3183,8 @@ public void testPartitionPerScanLimitWithMultiplePartitionColumns()
                 .hasMessage("Query over table 'tpch.%s' can potentially read more than 1000 partitions", tableName);
 
         // verify we can query with a predicate that is not representable as a TupleDomain
-        // TODO this shouldn't fail
-        assertThatThrownBy(() -> query("SELECT * FROM " + tableName + " WHERE part1 % 400 = 3")) // may be translated to Domain.all
-                .hasMessage("Query over table 'tpch.%s' can potentially read more than 1000 partitions", tableName);
+        assertThat(query("SELECT * FROM " + tableName + " WHERE part1 % 400 = 3")) // may be translated to Domain.all
+                .matches("VALUES (VARCHAR 'bar', BIGINT '3', BIGINT '3')");
         assertThat(query("SELECT * FROM " + tableName + " WHERE part1 % 400 = 3 AND part1 IS NOT NULL")) // may be translated to Domain.all except nulls
                 .matches("VALUES (VARCHAR 'bar', BIGINT '3', BIGINT '3')");
 
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/optimizer/TestHivePlans.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/optimizer/TestHivePlans.java
index ead277d3496e..30554769c677 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/optimizer/TestHivePlans.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/optimizer/TestHivePlans.java
@@ -60,6 +60,7 @@
 import static io.trino.sql.planner.plan.ExchangeNode.Scope.REMOTE;
 import static io.trino.sql.planner.plan.ExchangeNode.Type.GATHER;
 import static io.trino.sql.planner.plan.ExchangeNode.Type.REPARTITION;
+import static io.trino.sql.planner.plan.ExchangeNode.Type.REPLICATE;
 import static io.trino.sql.planner.plan.JoinNode.Type.INNER;
 import static io.trino.testing.TestingSession.testSessionBuilder;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -296,8 +297,22 @@ public void testFilterDerivedFromTableProperties()
     @Test
     public void testQueryScanningForTooManyPartitions()
     {
-        assertThatThrownBy(() -> plan("SELECT l.str_col, r.str_col FROM table_int_with_too_many_partitions l JOIN table_unpartitioned r ON l.int_part = r.int_col"))
-                .getCause()
+        String query = "SELECT l.str_col, r.str_col FROM table_int_with_too_many_partitions l JOIN table_unpartitioned r ON l.int_part = r.int_col";
+        assertDistributedPlan(
+                query,
+                output(
+                        exchange(REMOTE, GATHER,
+                                join(INNER, List.of(equiJoinClause("L_INT_PART", "R_INT_COL")),
+                                        project(
+                                                filter("true", // dynamic filter
+                                                        tableScan("table_int_with_too_many_partitions", Map.of("L_INT_PART", "int_part", "L_STR_COL", "str_col")))),
+                                        exchange(LOCAL,
+                                                exchange(REMOTE, REPLICATE,
+                                                        project(
+                                                                tableScan("table_unpartitioned", Map.of("R_STR_COL", "str_col", "R_INT_COL", "int_col"))))))))));
+
+        // The partitions will be loaded during split creation, so the query fails during execution.
+        assertThatThrownBy(() -> getQueryRunner().execute(query))
                 .isInstanceOf(TrinoException.class)
                 .hasMessage("Query over table 'test_schema.table_int_with_too_many_partitions' can potentially read more than 5 partitions");
     }

From d7a6b9cd53a8c185f22e5ef9fc3390a2051d3e97 Mon Sep 17 00:00:00 2001
From: praveenkrishna
Date: Fri, 18 Feb 2022 18:28:10 +0530
Subject: [PATCH 3/3] Avoid static import of TupleDomain#none

---
 .../main/java/io/trino/plugin/hive/HivePartitionManager.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionManager.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionManager.java
index a29f258c69d6..e2e9061c5700 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionManager.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionManager.java
@@ -47,7 +47,6 @@
 import static io.trino.plugin.hive.metastore.MetastoreUtil.toPartitionName;
 import static io.trino.plugin.hive.util.HiveBucketing.getHiveBucketFilter;
 import static io.trino.plugin.hive.util.HiveUtil.parsePartitionValue;
-import static io.trino.spi.predicate.TupleDomain.none;
 import static java.lang.String.format;
 import static java.util.stream.Collectors.toList;
 
@@ -85,7 +84,7 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastor
         List<HiveColumnHandle> partitionColumns = hiveTableHandle.getPartitionColumns();
 
         if (effectivePredicate.isNone()) {
-            return new HivePartitionResult(partitionColumns, Optional.empty(), ImmutableList.of(), none(), none(), hiveBucketHandle, Optional.empty());
+            return new HivePartitionResult(partitionColumns, Optional.empty(), ImmutableList.of(), TupleDomain.none(), TupleDomain.none(), hiveBucketHandle, Optional.empty());
         }
 
         Optional<HiveBucketFilter> bucketFilter = getHiveBucketFilter(hiveTableHandle, effectivePredicate);
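
Note on the user-visible behavior of patch 2: the SQL below is an illustrative
sketch, not part of the patches. It assumes the test schema set up above, where
table_int_with_too_many_partitions has six partitions and the catalog is created
with hive.max-partitions-per-scan set to 5.

    -- Should fail at execution time: no pushed-down filter ever narrows the
    -- partition list, so split creation still has to load all six partitions
    -- and exceeds the limit of five.
    SELECT * FROM table_int_with_too_many_partitions;

    -- Should succeed: the partition predicate prunes the candidate partition
    -- names to one before the limit is checked, so the partitions can be
    -- loaded eagerly in applyFilter.
    SELECT * FROM table_int_with_too_many_partitions WHERE int_part = 1;

The join test above fails for the same reason as the first query: the probe side
filter cannot be derived there, so the deferred partitions are only loaded at
split creation, where the limit check rejects the scan.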