feat(plugin-iceberg): Lazy load partitions to avoid unnecessary loading #23645
Conversation
ZacBlanco left a comment
I understand the motivation for this, but I am a little concerned about the other parts which implement the limiting of the partition loading. It feels a little convoluted, especially requiring checking the instance type of the iterable to set the max number of partitions to iterate. I feel like it could be simplified by just creating an iterable implementation which accepts the limit in the first place, rather than updating it after the fact. I may be missing something in my understanding if that isn't a feasible solution.
The current implementation of partition loading in IcebergPartitionLoader looks good though.
```java
import static java.util.Objects.requireNonNull;

public class LazyLoadedPartitions
```
I feel like this class itself should just implement Iterable. Is there a reason we need both LazyLoadedPartitions and LazyPartitionsIterable?
Good suggestion, after rechecking the code I think it's feasible and reasonable. Fixed!
```java
package com.facebook.presto.spi;

public interface LazyIterable<T>
```
I feel like the interface makes the implementation more complex. Is there a reason we need to have the max iteration count mutable? Having a mutable iteration count feels like a bit of an anti-pattern. What if instead we use something like `limit` from the stream API to prevent loading too many items? e.g. `Stream.of(new Iterable<>(...){}).limit(<N>)...`

Another option could be adding a new field inside of the `DiscretePredicates` class with the maximum number of predicates to iterate over. Then we don't have to rely on the casting + lazy iterable.

Another alternative could be putting a limit on the iterator we pass to the `DiscretePredicates` constructor, rather than even having to modify the `DiscretePredicates` class.
Thanks for the review and feedback. My full reasoning is described below; please let me know if anything seems unreasonable.

There are several points to consider when implementing a max threshold for lazy partition loading:

1. If any delete files are found during the file scanning process, the scanning and loading of partition values does not continue; instead, a single value representing "unpartitioned" is returned. So we cannot just scan a subset of the files to get a specified number of partition values, which means we cannot set a max threshold and terminate the scan early through something like `Stream.of(new Iterable<>(...){}).limit(<N>)...`.
2. The max threshold can be passed into `IcebergPartitionLoader` and used there to terminate the entire scanning process. During loading, even if no delete files are encountered, once the number of loaded partitions exceeds the max threshold we can terminate directly and return a value representing "unpartitioned". So we have to find a way to pass the max threshold into `IcebergPartitionLoader`.
3. The table layout, that is, the lazy partitions and `DiscretePredicates` information, is built earlier than the actual loading, while the value of the max threshold can only be determined when actual loading is required. For example, in `IcebergMetadataOptimizer`, if further reducible optimization is available we do not limit the max number of partitions loaded; otherwise, the max number of loaded partitions is set to 1000 by default. So we need to support delayed setting of the maximum threshold on the partition loader.
4. `LazyLoadedPartitions` and `PartitionLoader` are both defined in `presto-hive-common`, while `DiscretePredicates` is defined in `presto-spi`, which does not depend on `presto-hive-common`. As mentioned above, we need a way to set the max threshold on `PartitionLoader` through `DiscretePredicates` before actual loading. So we define an interface in `presto-spi` and let `LazyLoadedPartitions` in `presto-hive-common` implement it; this way, the max threshold can finally be passed into `PartitionLoader` through `LazyLoadedPartitions`.
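The four points above can be condensed into a small sketch. This is an illustration, not the actual Presto code: besides `LazyIterable`, `LazyLoadedPartitions`, and the "unpartitioned" sentinel idea, all names and bodies here are assumptions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class LazyPartitionSketch {
    static final String UNPARTITIONED = "<unpartitioned>";

    // "presto-spi" side: DiscretePredicates can hold and configure this
    // interface without presto-spi depending on presto-hive-common.
    interface LazyIterable<T> extends Iterable<T> {
        void setMaxIterableCount(int maxIterableCount);
    }

    // Hypothetical stand-in for an Iceberg data or delete file entry.
    record FileEntry(String partitionValue, boolean isDelete) {}

    // "presto-hive-common" side: the scan runs only on first iteration,
    // after the optimizer has had a chance to set the threshold.
    static class LazyLoadedPartitions implements LazyIterable<String> {
        private final List<FileEntry> files;
        private int maxCount = Integer.MAX_VALUE;
        private List<String> loaded;

        LazyLoadedPartitions(List<FileEntry> files) {
            this.files = files;
        }

        @Override
        public void setMaxIterableCount(int maxIterableCount) {
            this.maxCount = maxIterableCount;
        }

        @Override
        public Iterator<String> iterator() {
            if (loaded == null) {
                loaded = scan(files, maxCount); // lazy: first access only
            }
            return loaded.iterator();
        }
    }

    // The scan cannot simply be truncated with Stream.limit(): one delete
    // file, or exceeding the threshold mid-scan, invalidates any partial
    // result, so a single "unpartitioned" sentinel is returned instead.
    static List<String> scan(List<FileEntry> files, int maxPartitions) {
        Set<String> partitions = new LinkedHashSet<>();
        for (FileEntry file : files) {
            if (file.isDelete()) {
                return List.of(UNPARTITIONED);
            }
            partitions.add(file.partitionValue());
            if (partitions.size() > maxPartitions) {
                return List.of(UNPARTITIONED);
            }
        }
        return new ArrayList<>(partitions);
    }
}
```

Because the threshold setter lives on the `presto-spi` interface, the optimizer can set it after the layout (and the iterable inside it) has already been constructed, which is exactly the delayed configuration point 3 calls for.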
Thanks for the release note entry! Minor suggestion to help follow the Order of changes in the Release Note Guidelines.

@steveburnett Thanks for your suggestion, fixed! Please take a look when convenient.

@hantangwangd LGTM, thanks!
Hi, new release note guidelines. Please remove the manual PR link in the following format from the release note entries for this PR. I have updated the Release Notes Guidelines to remove the examples of manually adding the PR link.

Oh sorry, forgot that. Thanks for your reminder @steveburnett, fixed!
@hantangwangd, if you rebase this PR that should pick up fixes to some tests that were made in the last few weeks, and should help with the blocked tests.
@steveburnett Sure, thanks for the reminder, done!
I was playing around with this because I am not happy with the way the abstraction exposes a `setMaxCount` to change the iterable. It just doesn't sit right with me. I made a patch to slightly change the current approach. Could you take a look and tell me what you think?

@ZacBlanco Sure, thank you very much for your perspective and the effort you have put into this. I'll check the patch and run some benchmarks as soon as possible.
Hi @ZacBlanco, after checking the changes in the patch, I believe there is a slight misunderstanding about the use of the threshold limit in the following code: […] However, in this patch, the use of […]

But if you're still unsure about the max load limit change, I'm OK to split this PR into two separate PRs. We can move forward with the lazy loading part first, and then discuss the max load limit change in detail separately. What's your opinion?
PingLiuPing left a comment
Thank you @hantangwangd.

> For queries that cannot be optimized based on metadata, we do not use these partition values at all, so we shouldn't load them eagerly.

It would be helpful if you could elaborate more on the pattern of those queries, for example whether a query with aggregation on non-partition columns can benefit from this PR.
@PingLiuPing thanks for the review and suggestion. I've supplemented some content about the queries that can be optimized using partition information, along with a description of the selected benchmark queries.
```java
        return TupleDomain.fromFixedValues(hivePartition.getKeys());
    }
});
if (!isUnPartitioned.get()) {
```
Seems `isUnPartitioned` is used before setting the value, meaning `isUnPartitioned` is always false here. The `isUnPartitioned` flag is set inside a lazy `Iterables.transform()` lambda, but checked before the iterable is consumed.
Nice catch! Upon re-examining the relevant code, I think this logic can be removed entirely. Simply returning `TupleDomain.all()` is sufficient; later, it will be interpreted as unpartitioned. See the code here: https://github.com/prestodb/presto/blob/master/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergMetadataOptimizer.java#L239-L249
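The bug pattern above is easy to reproduce in isolation. This sketch uses `Stream.map` rather than Guava's `Iterables.transform` so it stays dependency-free, but both defer the lambda until iteration, so a side-effecting flag set inside the transform is not visible until the result is consumed:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class LazySideEffect {
    public static void main(String[] args) {
        AtomicBoolean isUnPartitioned = new AtomicBoolean(false);

        // The lambda body runs only when the pipeline is consumed,
        // just like a lambda passed to Guava's Iterables.transform().
        Stream<String> domains = Stream.of("p=1").map(partition -> {
            isUnPartitioned.set(true); // deferred side effect
            return partition;
        });

        // Checking here is too early: the flag has not been set yet.
        System.out.println("before consumption: " + isUnPartitioned.get()); // false

        domains.forEach(domain -> {}); // now the transform actually runs
        System.out.println("after consumption: " + isUnPartitioned.get()); // true
    }
}
```

This is why the flag-based check could never fire in the original code, and why returning `TupleDomain.all()` directly is the cleaner fix.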
```java
assertEquals(partitionsLazyLoadingCountMetric.getCount(), 2);

// Do not perform metadata optimization when the number of partitions exceeds the threshold
Session sessionWithThresholdSmaller = getSessionWithOptimizeMetadataQueriesAndThreshold(false, 3);
```
The boolean flag is intentionally set to `false` to disable `pushdown_filter_enabled`. This is because when the number of partitions exceeds the threshold, metadata optimization is skipped and the `TableScanNode` remains in the plan tree. Enabling `pushdown_filter_enabled` in this scenario would cause queries to fail directly with the error:

```
com.facebook.presto.spi.PrestoException: Filter Pushdown not supported for Iceberg Java Connector
```
```diff
 @JsonIgnore
-public Optional<List<HivePartition>> getPartitions()
+public Optional<? extends Iterable<HivePartition>> getPartitions()
```
I think it would be better to simply return `LazyLoadedPartitions` here, as the generics are a bit awkward. If we do this, I would rename `LazyLoadedPartitions` to something simpler, maybe `PartitionSet`, as there's an eager constructor on that class already.

Thanks for the suggestion, sounds great to me. I've refactored the code accordingly. Please take a look when you get a chance. Updates method `BaseHiveTableLayoutHandle.getPartitions()` to return `Optional<PartitionSet>`.
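A minimal sketch of the shape this refactor suggests. The class body below is an assumption based on the discussion, not the actual Presto code (strings stand in for `HivePartition`): one eager constructor for already-materialized values and one lazy constructor, so callers get the concrete type instead of the `Optional<? extends Iterable<HivePartition>>` wildcard.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of a PartitionSet with eager and lazy construction.
public class PartitionSet implements Iterable<String> {
    private final Supplier<List<String>> loader;
    private List<String> partitions;

    // Eager constructor: partition values are already materialized.
    public PartitionSet(List<String> partitions) {
        this.partitions = partitions;
        this.loader = null;
    }

    // Lazy constructor: defer the (potentially expensive) load.
    public PartitionSet(Supplier<List<String>> loader) {
        this.loader = loader;
    }

    @Override
    public Iterator<String> iterator() {
        if (partitions == null) {
            partitions = loader.get(); // load once, on first access
        }
        return partitions.iterator();
    }
}
```

Returning this concrete type keeps the JSON-ignored accessor signature simple while still hiding whether the values were loaded eagerly or lazily.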
## Description

Currently, when querying Iceberg tables, we always eagerly load all partition values of the table in `PickTableLayout` or `IcebergFilterPushdown` during the optimization phase. Since these eagerly loaded partition values are currently only used in metadata-based optimization rules, in many cases this information is not used at all. This can waste a lot of resources and hurt performance in the following cases:

- For queries that cannot be optimized based on metadata, we do not use these partition values at all, so we shouldn't load them eagerly.
- For tables with a huge number of partitions that are not suitable for metadata optimization, we need to limit the max number that can be loaded in the loading phase, rather than loading all of them first and then determining whether they exceed the threshold. (to be implemented in a subsequent PR)

For details on queries that can be optimized using partition information, see prestodb#22080.

This PR makes the partition loading behavior lazy and only executes the loading when necessary. In this way, we can avoid a lot of unnecessary loading in many scenarios, as well as the resulting resource consumption and performance loss.

The benchmark results also support this conclusion. We execute regular query statements, and query statements which are applicable for further reducible metadata optimization (which always load and use all partitions), on tables with two different partition counts: `300 * 2` and `1600 * 4`. The selected queries are as follows:

```sql
-- Table iceberg_partition is partitioned by columns a and b.
create table iceberg_partition(a int, b int, c double) with (partitioning = ARRAY['a', 'b']);

-- This query will always be optimized using the partitions information, regardless of the partition count.
select min(a), max(a), min(b), max(b) from iceberg_partition;

-- This query does not need partitions information at all.
select a, c from iceberg_partition where b >= xxx;
```

The benchmark result before this change is as follows:

```
----Before this change, always load partitions eagerly----
Benchmark                                        (recordCount)  Mode  Cnt    Score    Error  Units
BenchmarkIcebergLazyLoading.testFurtherOptimize        300 * 2  avgt   10   39.971 ±  1.133  ms/op
BenchmarkIcebergLazyLoading.testFurtherOptimize       1600 * 4  avgt   10   66.593 ±  2.253  ms/op
BenchmarkIcebergLazyLoading.testNormalQuery            300 * 2  avgt   10  166.274 ±  5.041  ms/op
BenchmarkIcebergLazyLoading.testNormalQuery           1600 * 4  avgt   10  659.756 ± 69.708  ms/op
```

While the benchmark result after this change is as follows:

```
----After this change, lazy load partitions and check the max threshold in loading phase----
Benchmark                                        (recordCount)  Mode  Cnt    Score    Error  Units
BenchmarkIcebergLazyLoading.testFurtherOptimize        300 * 2  avgt   10   50.806 ±  2.471  ms/op
BenchmarkIcebergLazyLoading.testFurtherOptimize       1600 * 4  avgt   10   77.579 ±  1.025  ms/op
BenchmarkIcebergLazyLoading.testNormalQuery            300 * 2  avgt   10  155.059 ±  4.522  ms/op
BenchmarkIcebergLazyLoading.testNormalQuery           1600 * 4  avgt   10  596.955 ± 41.619  ms/op
```

As shown above, this change significantly improves the performance of queries that are not suitable for metadata optimization. This aligns with expectations: it introduces a small fixed cost for workloads that load all partitions, while for the many queries that don't need to load the table partitions, the performance gain scales with the partition count.

## Motivation and Context

Make partition loading for Iceberg tables lazy to avoid unnecessary loading.

## Impact

N/A

## Test Plan

- Make sure the change does not affect existing tests
- Newly added test case in `TestIcebergLogicalPlanner` to show the behaviors with different max partition thresholds
- Newly added benchmark tests in `BenchmarkIcebergLazyLoading` to show the improvement in performance

## Contributor checklist

- [x] Please make sure your submission complies with our [development](https://github.com/prestodb/presto/wiki/Presto-Development-Guidelines#development), [formatting](https://github.com/prestodb/presto/wiki/Presto-Development-Guidelines#formatting), [commit message](https://github.com/prestodb/presto/wiki/Review-and-Commit-guidelines#commit-formatting-and-pull-requests), and [attribution guidelines](https://github.com/prestodb/presto/wiki/Review-and-Commit-guidelines#attribution).
- [x] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
- [x] Documented new properties (with their default values), SQL syntax, functions, or other functionality.
- [x] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines).
- [x] Adequate tests were added if applicable.
- [x] CI passed.

## Release Notes

```
== RELEASE NOTES ==

Iceberg Connector Changes
* Improve partition loading for Iceberg tables by making it lazy, preventing unnecessary loading.
```