-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Lazily load hive partition information #10215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -288,6 +288,7 @@ | |
| import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; | ||
| import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; | ||
| import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; | ||
| import static io.trino.spi.connector.Constraint.alwaysTrue; | ||
| import static io.trino.spi.connector.RetryMode.NO_RETRIES; | ||
| import static io.trino.spi.predicate.TupleDomain.withColumnDomains; | ||
| import static io.trino.spi.statistics.TableStatisticType.ROW_COUNT; | ||
|
|
@@ -485,7 +486,7 @@ public ConnectorTableHandle getTableHandleForStatisticsCollection(ConnectorSessi | |
|
|
||
| handle = handle.withAnalyzePartitionValues(list); | ||
| HivePartitionResult partitions = partitionManager.getPartitions(handle, list); | ||
| handle = partitionManager.applyPartitionResult(handle, partitions, Optional.empty()); | ||
| handle = partitionManager.applyPartitionResult(handle, partitions, alwaysTrue()); | ||
| } | ||
|
|
||
| if (analyzeColumnNames.isPresent()) { | ||
|
|
@@ -761,8 +762,14 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab | |
| Map<String, Type> columnTypes = columns.entrySet().stream() | ||
| .collect(toImmutableMap(Map.Entry::getKey, entry -> getColumnMetadata(session, tableHandle, entry.getValue()).getType())); | ||
| HivePartitionResult partitionResult = partitionManager.getPartitions(metastore, tableHandle, constraint); | ||
| List<HivePartition> partitions = partitionManager.getPartitionsAsList(partitionResult); | ||
| return hiveStatisticsProvider.getTableStatistics(session, ((HiveTableHandle) tableHandle).getSchemaTableName(), columns, columnTypes, partitions); | ||
| // If partitions are not loaded, then don't generate table statistics. | ||
| // Note that the computation is not persisted in the table handle, so can be redone many times | ||
| // TODO: https://github.com/trinodb/trino/issues/10980. | ||
| if (partitionManager.canPartitionsBeLoaded(partitionResult)) { | ||
| List<HivePartition> partitions = partitionManager.getPartitionsAsList(partitionResult); | ||
| return hiveStatisticsProvider.getTableStatistics(session, ((HiveTableHandle) tableHandle).getSchemaTableName(), columns, columnTypes, partitions); | ||
| } | ||
| return TableStatistics.empty(); | ||
| } | ||
|
|
||
| private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix) | ||
|
|
@@ -2536,17 +2543,40 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con | |
| HiveTableHandle hiveTable = (HiveTableHandle) table; | ||
|
|
||
| List<ColumnHandle> partitionColumns = ImmutableList.copyOf(hiveTable.getPartitionColumns()); | ||
| List<HivePartition> partitions = partitionManager.getOrLoadPartitions(metastore, hiveTable); | ||
|
|
||
| TupleDomain<ColumnHandle> predicate = createPredicate(partitionColumns, partitions); | ||
|
|
||
| TupleDomain<ColumnHandle> predicate = TupleDomain.all(); | ||
| Optional<DiscretePredicates> discretePredicates = Optional.empty(); | ||
| if (!partitionColumns.isEmpty()) { | ||
| // Do not create tuple domains for every partition at the same time! | ||
| // There can be a huge number of partitions so use an iterable so | ||
| // all domains do not need to be in memory at the same time. | ||
| Iterable<TupleDomain<ColumnHandle>> partitionDomains = Iterables.transform(partitions, hivePartition -> TupleDomain.fromFixedValues(hivePartition.getKeys())); | ||
| discretePredicates = Optional.of(new DiscretePredicates(partitionColumns, partitionDomains)); | ||
|
|
||
| // If only partition names are loaded, then the predicates are partially enforced. | ||
| // So computation of predicate and discretePredicates are not valid. | ||
| if (hiveTable.getPartitionNames().isEmpty()) { | ||
|
Praveen2112 marked this conversation as resolved.
Outdated
|
||
| Optional<List<HivePartition>> partitions = hiveTable.getPartitions() | ||
| // If the partitions are not loaded, try out if they can be loaded. | ||
| .or(() -> { | ||
| // We load the partitions to compute the predicates enforced by the table. | ||
| // Note that the computation is not persisted in the table handle, so can be redone many times | ||
| // TODO: https://github.com/trinodb/trino/issues/10980. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this change worth doing until we have that done? Seems like loading the partition information once eagerly is better than lazily doing it multiple times
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @alexjo2144 not sure what's your suggestion here?
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm just asking if this change to lazily load the partition information is actually an improvement until that linked issue is completed.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Or if that issue is a blocker for this to be merged.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We lazily evaluate so that it will be loaded after all the filters are pushed to the Hive. |
||
| HivePartitionResult partitionResult = partitionManager.getPartitions(metastore, table, new Constraint(hiveTable.getEnforcedConstraint())); | ||
| if (partitionManager.canPartitionsBeLoaded(partitionResult)) { | ||
| return Optional.of(partitionManager.getPartitionsAsList(partitionResult)); | ||
| } | ||
| return Optional.empty(); | ||
| }); | ||
|
|
||
| if (partitions.isPresent()) { | ||
|
findepi marked this conversation as resolved.
Outdated
|
||
| List<HivePartition> hivePartitions = partitions.orElseThrow(); | ||
| // Since the partitions are fully loaded now, we need to compute | ||
| predicate = createPredicate(partitionColumns, hivePartitions); | ||
|
|
||
| // Un-partitioned tables can have a partition with ID - UNPARTITIONED, | ||
| // this check allows us to ensure that table is partitioned | ||
| if (!partitionColumns.isEmpty()) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. can
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In case of
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add a comment
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks for adding a comment. |
||
| // Do not create tuple domains for every partition at the same time! | ||
| // There can be a huge number of partitions so use an iterable so | ||
| // all domains do not need to be in memory at the same time. | ||
| Iterable<TupleDomain<ColumnHandle>> partitionDomains = Iterables.transform(hivePartitions, hivePartition -> TupleDomain.fromFixedValues(hivePartition.getKeys())); | ||
| discretePredicates = Optional.of(new DiscretePredicates(partitionColumns, partitionDomains)); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Optional<ConnectorTablePartitioning> tablePartitioning = Optional.empty(); | ||
|
|
@@ -2595,16 +2625,23 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C | |
| checkArgument(handle.getAnalyzePartitionValues().isEmpty() || constraint.getSummary().isAll(), "Analyze should not have a constraint"); | ||
|
|
||
| HivePartitionResult partitionResult = partitionManager.getPartitions(metastore, handle, constraint); | ||
| HiveTableHandle newHandle = partitionManager.applyPartitionResult(handle, partitionResult, constraint.getPredicateColumns()); | ||
| HiveTableHandle newHandle = partitionManager.applyPartitionResult(handle, partitionResult, constraint); | ||
|
|
||
| if (handle.getPartitions().equals(newHandle.getPartitions()) && | ||
|
findepi marked this conversation as resolved.
Outdated
|
||
| handle.getPartitionNames().equals(newHandle.getPartitionNames()) && | ||
| handle.getCompactEffectivePredicate().equals(newHandle.getCompactEffectivePredicate()) && | ||
| handle.getBucketFilter().equals(newHandle.getBucketFilter()) && | ||
| handle.getConstraintColumns().equals(newHandle.getConstraintColumns())) { | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| return Optional.of(new ConstraintApplicationResult<>(newHandle, partitionResult.getUnenforcedConstraint(), false)); | ||
| TupleDomain<ColumnHandle> unenforcedConstraint = partitionResult.getEffectivePredicate(); | ||
| if (newHandle.getPartitions().isPresent()) { | ||
| List<HiveColumnHandle> partitionColumns = partitionResult.getPartitionColumns(); | ||
| unenforcedConstraint = partitionResult.getEffectivePredicate().filter((column, domain) -> !partitionColumns.contains(column)); | ||
| } | ||
|
|
||
| return Optional.of(new ConstraintApplicationResult<>(newHandle, unenforcedConstraint, false)); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -2838,6 +2875,7 @@ public ConnectorTableHandle makeCompatiblePartitioning(ConnectorSession session, | |
| hiveTable.getTableParameters(), | ||
| hiveTable.getPartitionColumns(), | ||
| hiveTable.getDataColumns(), | ||
| hiveTable.getPartitionNames(), | ||
| hiveTable.getPartitions(), | ||
| hiveTable.getCompactEffectivePredicate(), | ||
| hiveTable.getEnforcedConstraint(), | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,7 +38,6 @@ | |
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.function.Predicate; | ||
|
|
||
| import static com.google.common.base.Preconditions.checkArgument; | ||
|
|
@@ -48,7 +47,6 @@ | |
| import static io.trino.plugin.hive.metastore.MetastoreUtil.toPartitionName; | ||
| import static io.trino.plugin.hive.util.HiveBucketing.getHiveBucketFilter; | ||
| import static io.trino.plugin.hive.util.HiveUtil.parsePartitionValue; | ||
| import static io.trino.spi.predicate.TupleDomain.none; | ||
| import static java.lang.String.format; | ||
| import static java.util.stream.Collectors.toList; | ||
|
|
||
|
|
@@ -86,7 +84,7 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore metastor | |
| List<HiveColumnHandle> partitionColumns = hiveTableHandle.getPartitionColumns(); | ||
|
|
||
| if (effectivePredicate.isNone()) { | ||
| return new HivePartitionResult(partitionColumns, ImmutableList.of(), none(), none(), none(), hiveBucketHandle, Optional.empty()); | ||
| return new HivePartitionResult(partitionColumns, Optional.empty(), ImmutableList.of(), TupleDomain.none(), TupleDomain.none(), hiveBucketHandle, Optional.empty()); | ||
| } | ||
|
|
||
| Optional<HiveBucketFilter> bucketFilter = getHiveBucketFilter(hiveTableHandle, effectivePredicate); | ||
|
|
@@ -97,10 +95,10 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore metastor | |
| if (partitionColumns.isEmpty()) { | ||
| return new HivePartitionResult( | ||
| partitionColumns, | ||
| Optional.empty(), | ||
| ImmutableList.of(new HivePartition(tableName)), | ||
| compactEffectivePredicate, | ||
| effectivePredicate, | ||
| TupleDomain.all(), | ||
| compactEffectivePredicate, | ||
| hiveBucketHandle, | ||
| bucketFilter); | ||
| } | ||
|
|
@@ -109,6 +107,7 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore metastor | |
| .map(HiveColumnHandle::getType) | ||
| .collect(toList()); | ||
|
|
||
| Optional<List<String>> partitionNames = Optional.empty(); | ||
| Iterable<HivePartition> partitionsIterable; | ||
| Predicate<Map<ColumnHandle, NullableValue>> predicate = constraint.predicate().orElse(value -> true); | ||
| if (hiveTableHandle.getPartitions().isPresent()) { | ||
|
|
@@ -117,19 +116,18 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore metastor | |
| .collect(toImmutableList()); | ||
| } | ||
| else { | ||
| List<String> partitionNames = getFilteredPartitionNames(metastore, tableName, partitionColumns, compactEffectivePredicate); | ||
| partitionsIterable = () -> partitionNames.stream() | ||
| List<String> partitionNamesList = hiveTableHandle.getPartitionNames() | ||
| .orElseGet(() -> getFilteredPartitionNames(metastore, tableName, partitionColumns, compactEffectivePredicate)); | ||
| partitionsIterable = () -> partitionNamesList.stream() | ||
| // Apply extra filters which could not be done by getFilteredPartitionNames | ||
| .map(partitionName -> parseValuesAndFilterPartition(tableName, partitionName, partitionColumns, partitionTypes, effectivePredicate, predicate)) | ||
| .filter(Optional::isPresent) | ||
| .map(Optional::get) | ||
| .iterator(); | ||
| partitionNames = Optional.of(partitionNamesList); | ||
| } | ||
|
|
||
| // All partition key domains will be fully evaluated, so we don't need to include those | ||
| TupleDomain<ColumnHandle> remainingTupleDomain = effectivePredicate.filter((column, domain) -> !partitionColumns.contains(column)); | ||
| TupleDomain<ColumnHandle> enforcedTupleDomain = effectivePredicate.filter((column, domain) -> partitionColumns.contains(column)); | ||
| return new HivePartitionResult(partitionColumns, partitionsIterable, compactEffectivePredicate, remainingTupleDomain, enforcedTupleDomain, hiveBucketHandle, bucketFilter); | ||
| return new HivePartitionResult(partitionColumns, partitionNames, partitionsIterable, effectivePredicate, compactEffectivePredicate, hiveBucketHandle, bucketFilter); | ||
| } | ||
|
|
||
| public HivePartitionResult getPartitions(ConnectorTableHandle tableHandle, List<List<String>> partitionValuesList) | ||
|
|
@@ -153,7 +151,7 @@ public HivePartitionResult getPartitions(ConnectorTableHandle tableHandle, List< | |
| .map(partition -> partition.orElseThrow(() -> new VerifyException("partition must exist"))) | ||
| .collect(toImmutableList()); | ||
|
|
||
| return new HivePartitionResult(partitionColumns, partitionList, TupleDomain.all(), TupleDomain.all(), TupleDomain.all(), bucketHandle, Optional.empty()); | ||
| return new HivePartitionResult(partitionColumns, Optional.empty(), partitionList, TupleDomain.all(), TupleDomain.all(), bucketHandle, Optional.empty()); | ||
| } | ||
|
|
||
| public List<HivePartition> getPartitionsAsList(HivePartitionResult partitionResult) | ||
|
|
@@ -175,22 +173,38 @@ public List<HivePartition> getPartitionsAsList(HivePartitionResult partitionResu | |
| return partitionList.build(); | ||
| } | ||
|
|
||
| public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitionResult partitions, Optional<Set<ColumnHandle>> columns) | ||
| public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitionResult partitions, Constraint constraint) | ||
| { | ||
| Optional<List<String>> partitionNames = partitions.getPartitionNames(); | ||
|
Praveen2112 marked this conversation as resolved.
Outdated
|
||
| Optional<List<HivePartition>> partitionList = Optional.empty(); | ||
| TupleDomain<ColumnHandle> enforcedConstraint = handle.getEnforcedConstraint(); | ||
|
|
||
| // Partitions will be loaded if | ||
| // 1. Number of partitionNames is less than or equal to threshold value. Thereby generating additional filter criteria | ||
| // that can be applied on other join side (if the join is based on partition column), | ||
| // 2. If additional predicate is passed as a part of Constraint. (specified via loadPartition). This delays the partition checks | ||
| // until we have additional filtering based on Constraint | ||
| if (canPartitionsBeLoaded(partitions) || constraint.predicate().isPresent()) { | ||
| partitionNames = Optional.empty(); | ||
| partitionList = Optional.of(getPartitionsAsList(partitions)); | ||
| List<HiveColumnHandle> partitionColumns = partitions.getPartitionColumns(); | ||
| enforcedConstraint = partitions.getEffectivePredicate().filter((column, domain) -> partitionColumns.contains(column)); | ||
| } | ||
| return new HiveTableHandle( | ||
| handle.getSchemaName(), | ||
| handle.getTableName(), | ||
| handle.getTableParameters(), | ||
| ImmutableList.copyOf(partitions.getPartitionColumns()), | ||
| handle.getDataColumns(), | ||
| Optional.of(getPartitionsAsList(partitions)), | ||
| partitionNames, | ||
| partitionList, | ||
| partitions.getCompactEffectivePredicate(), | ||
| partitions.getEnforcedConstraint(), | ||
| enforcedConstraint, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. if we skip the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Actually it would contain |
||
| partitions.getBucketHandle(), | ||
| partitions.getBucketFilter(), | ||
| handle.getAnalyzePartitionValues(), | ||
| handle.getAnalyzeColumnNames(), | ||
| Sets.union(handle.getConstraintColumns(), columns.orElseGet(ImmutableSet::of)), | ||
| Sets.union(handle.getConstraintColumns(), constraint.getPredicateColumns().orElseGet(ImmutableSet::of)), | ||
| handle.getProjectedColumns(), | ||
| handle.getTransaction(), | ||
| handle.isRecordScannedFiles(), | ||
|
|
@@ -199,8 +213,21 @@ public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitio | |
|
|
||
| public List<HivePartition> getOrLoadPartitions(SemiTransactionalHiveMetastore metastore, HiveTableHandle table) | ||
| { | ||
| // In case of partitions not being loaded, their permissible values are specified in `HiveTableHandle#getCompactEffectivePredicate, | ||
| // so we do an intersection of getCompactEffectivePredicate and HiveTable's enforced constraint | ||
| TupleDomain<ColumnHandle> summary = table.getEnforcedConstraint().intersect( | ||
| table.getCompactEffectivePredicate() | ||
| .transformKeys(ColumnHandle.class::cast)); | ||
|
findepi marked this conversation as resolved.
Outdated
|
||
| return table.getPartitions().orElseGet(() -> | ||
| getPartitionsAsList(getPartitions(metastore, table, new Constraint(table.getEnforcedConstraint())))); | ||
| getPartitionsAsList(getPartitions(metastore, table, new Constraint(summary)))); | ||
| } | ||
|
|
||
| public boolean canPartitionsBeLoaded(HivePartitionResult partitionResult) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There are some callers of |
||
| { | ||
| if (partitionResult.getPartitionNames().isPresent()) { | ||
| return partitionResult.getPartitionNames().orElseThrow().size() <= maxPartitions; | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| private Optional<HivePartition> parseValuesAndFilterPartition( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.