Support partition based metadata query optimization for Iceberg #22080
Conversation
Force-pushed a2cf6a9 to a922048
tdcmeehan left a comment
One thing I'm not sure about is: does the Iceberg spec guarantee reliability of the min/max values for all types, and will this code work in the presence of them being absent?
presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
I think it would work well even in the absence of min/max values on some or all columns. Filters on columns other than identity partition columns only help reduce the set of files that can be safely filtered out; they are an enhancement rather than a requirement. If the min/max values are present and work well, great; if no files get filtered out, that is fine too. The only troublesome situation would be incorrect min/max values in a data file causing us to wrongly filter out valid partitions, and if that ever happened, I think it would be a bug in the Iceberg implementation that needs to be fixed.
Sorry, the previous response may have missed the point. The strict requirement for applying this optimization is that all the columns (including group-by keys, aggregation keys, and filter keys) used in the table scan below the aggregation must be identity partition columns, so it works well in this scenario.
Are there any scenarios where the min/max may be off slightly or corrupted? For example, if I have a huge string column, is it guaranteed that the max will not be truncated, and will be faithfully represented in the manifest files?
The Iceberg spec describes it as follows (referring to: https://iceberg.apache.org/spec/#scan-planning). So I think file filtering through min/max values should not make mistakes, that is, it should never filter out valid files. I will try reading the Iceberg code to confirm. But I don't think the scenario you mentioned can occur, because Iceberg writes a data file and its corresponding metadata in an atomic transaction. And if it does need to truncate the max value of a huge string column (I'm not sure whether it does), truncating the string to a shorter but larger bound would be fine.
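To illustrate why a truncated upper bound stays safe for file filtering, here is a minimal sketch (the function name is made up and this is not Iceberg's actual implementation): a truncated string upper bound must be "rounded up" so it still compares greater than or equal to every value it summarizes.

```python
def truncated_upper_bound(value: str, width: int) -> str:
    """Sketch: truncate a string upper bound to `width` characters while
    keeping it a valid upper bound (>= the original value)."""
    if len(value) <= width:
        return value
    prefix = value[:width]
    # Increment the last character so the shorter bound still dominates
    # the original value; a real implementation must also handle the
    # case where the last character is already the maximum code point.
    return prefix[:-1] + chr(ord(prefix[-1]) + 1)

# A file whose true max is "iceberg" can safely advertise "icf" as its
# upper bound: any predicate value greater than "icf" cannot match it.
```

Under this scheme, pruning with the truncated bound can only be conservative; it never filters out a file that could contain matching rows.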
@hantangwangd would you be able to double check this with the Iceberg community? If so, I'd actually be supportive of somehow ensuring metadata optimizations are enabled by default just for the Iceberg connector.
@tdcmeehan OK, I will try to figure it out.
Force-pushed 5b6ea5e to 38633b4
Hi @tdcmeehan, I found that the Iceberg community has supported aggregate push-down for Spark, referring to issue #6622. It relies on the min/max/count stats of files to push down the specified aggregations (min/max/count). It's a little different from this PR, and is more similar to issue #21885 that you raised (described later). Referring to https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java#L202
That PR demonstrates two things:
It differs from this PR in several aspects:
To summarize, I think we can draw the following conclusions:
If I've misunderstood anything, please let me know; any information would be greatly appreciated! @nastra @rdblue
@hantangwangd thanks for the pointers. I also came across apache/iceberg#113 which confirms your findings. I think the above optimizations could be written as a connector-specific optimizer in Iceberg. What do you think? My worry about
@tdcmeehan Makes sense, it would be more flexible and safe to write it as a connector-specific optimizer in Iceberg. I will do it.
Force-pushed 3d14925 to bd9465f
Hi @tdcmeehan, I have written a connector-specific metadata optimizer in Iceberg. Please take a look, thanks!
ZacBlanco left a comment
Took a first pass! Really neat optimizations. Had a few minor things
```java
RowExpression result = evaluateMinMax(
        functionMetadataManager.getFunctionMetadata(node.getAggregations().get(outputVariable).getFunctionHandle()),
        inputColumnValues.get(inputVariable));
```
I'm curious about the benefit of calculating the aggregation result here vs just replacing the table scan with VALUES and letting the engine calculate the aggregation. I just worry about the complexity of maintaining this code.
Intuitively, I think it could be beneficial because it further optimizes min/max into greatest/least, which can be calculated in batch.
Have you done any benchmarks comparing running the aggregations here in the connector vs. only replacing the table scans?
I didn't, but it's worth a try. I will figure it out.
I wrote a benchmark test to compare min/max against least/greatest on a ValuesNode with different row counts. The results are as follows:

| Benchmark | (recordCount) | Mode | Cnt | Score | Error | Units |
|---|---|---|---|---|---|---|
| BenchmarkMinMaxWithLeastGreatest.baseline | 2 * 5 | avgt | 10 | 24.250 | ± 0.304 | ms/op |
| BenchmarkMinMaxWithLeastGreatest.baseline | 2 * 50 | avgt | 10 | 24.429 | ± 0.383 | ms/op |
| BenchmarkMinMaxWithLeastGreatest.baseline | 20 * 50 | avgt | 10 | 24.463 | ± 0.445 | ms/op |
| BenchmarkMinMaxWithLeastGreatest.baseline | 200 * 50 | avgt | 10 | 24.233 | ± 0.382 | ms/op |
| BenchmarkMinMaxWithLeastGreatest.testWithFurtherFlag | 2 * 5 | avgt | 10 | 36.249 | ± 1.835 | ms/op |
| BenchmarkMinMaxWithLeastGreatest.testWithFurtherFlag | 2 * 50 | avgt | 10 | 36.407 | ± 1.333 | ms/op |
| BenchmarkMinMaxWithLeastGreatest.testWithFurtherFlag | 20 * 50 | avgt | 10 | 43.463 | ± 0.607 | ms/op |
| BenchmarkMinMaxWithLeastGreatest.testWithFurtherFlag | 200 * 50 | avgt | 10 | 191.785 | ± 2.027 | ms/op |
| BenchmarkMinMaxWithLeastGreatest.testWithoutFurtherFlag | 2 * 5 | avgt | 10 | 34.020 | ± 0.798 | ms/op |
| BenchmarkMinMaxWithLeastGreatest.testWithoutFurtherFlag | 2 * 50 | avgt | 10 | 34.072 | ± 0.894 | ms/op |
| BenchmarkMinMaxWithLeastGreatest.testWithoutFurtherFlag | 20 * 50 | avgt | 10 | 49.647 | ± 1.118 | ms/op |
| BenchmarkMinMaxWithLeastGreatest.testWithoutFurtherFlag | 200 * 50 | avgt | 10 | 286.988 | ± 72.066 | ms/op |
The benchmark test code could be found here: hantangwangd@9943a6a
It seems that when the row count is less than 1000, the two are roughly similar, with least/greatest having slightly higher overhead. But when the row count is larger, least/greatest is clearly faster. The reason might be that it has an effective time complexity of about n/100.
By the way, the problems with huge ValuesNodes make it all the more worthwhile to do further optimization by pre-calculating with least/greatest.
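The pre-calculation idea discussed here can be sketched as follows (a Python stand-in for illustration, not the actual Presto code): instead of emitting a large ValuesNode and letting the engine aggregate it, the optimizer can pre-fold min/max over the known partition values with a least/greatest-style reduction at plan time.

```python
from functools import reduce

def least(a, b):
    return a if a <= b else b

def greatest(a, b):
    return a if a >= b else b

def prefold_min_max(partition_values):
    # Fold the aggregation over the partition values at plan time, so the
    # rewritten plan carries a single-row ValuesNode instead of one row
    # per partition.
    return (reduce(least, partition_values), reduce(greatest, partition_values))
```

The design tradeoff is exactly the one debated above: folding at plan time shrinks the ValuesNode to a single row, at the cost of keeping aggregation-evaluation logic inside the connector.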
I think we need a way to disable these optimizations in this PR, in case they cause issues on the coordinator due to memory pressure/GC. Either through a flag which sets a max row count for the ValuesNode, or a boolean flag to disable it altogether.
Sure, makes sense. I think a limit on row count for this optimization would be better than a boolean flag, since queries over a modest number of partitions would still benefit when memory pressure is low, and we can explicitly define the value 0 as disabling this optimization completely and document that.
I think the default value should be larger than optimizer.optimize-metadata-queries-call-threshold's default of 100 used in MetadataQueryOptimizer, since Iceberg has no need to call the metastore to confirm whether each partition is valid, as Hive does. Maybe set it to 1000, any thoughts?
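Such a threshold could look something like the following configuration fragment (the property name and default shown here are illustrative, not necessarily the final names merged in this PR):

```properties
# Max number of rows materialized for metadata-based optimization;
# 0 disables the optimization entirely (illustrative name and default).
iceberg.rows-for-metadata-optimization-threshold=1000
```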
I have thought about limiting the number of partitions in this optimization, and I think we should apply the limit when fetching the partitions. When we get the table layout for an Iceberg table, we should not eagerly load all the partitions; instead we should return a lazily loaded data structure, something like LazyBlock. We can load the data by calling assureLoaded() when necessary, and if it returns false or throws an exception, we simply skip this optimization. When all the partitions are already loaded, we can just do the optimization. If that's a feasible solution, maybe a subsequent separate PR would be better, what's your opinion?
I think that sounds like a reasonable solution. I am okay with waiting for a PR as long as we have a config flag added.
> I think a limit on row count for this optimization would be better than a boolean flag, as the queries on not so large number partitions would still get benefits when memory pressure is not so high, and we can explicitly define the value 0 as completely disable this optimization and document that.
I agree with this.
> I think the default value should be larger than optimizer.optimize-metadata-queries-call-threshold's default value 100 used in MetadataQueryOptimizer, for Iceberg have no need to call metastore to confirm whether each partition is valid like hive does. Maybe set to 1000, any thoughts
I think the concern about metadata calls is an adjacent issue, but one that can't be avoided on the Iceberg side. Getting the list of partitions requires generating a scan plan, which means loading the manifests. As long as those manifests are cached and don't require additional metadata lookups, I think we don't need to worry, because during actual scan planning those manifests would have needed to be loaded anyway without this optimization.
The real issue is the memory-performance tradeoff. From the benchmarks we know that there is a benefit at ~1k rows, but I'm not sure how much a few ms is going to save in the grand scheme of things (at least according to your benchmark). I don't know the point at which memory usage could become an issue. I am happy to leave it at 1k for now. If you end up using this in prod and find a better value, we can always tweak the default later.
Related: If we are adding configuration, we should add documentation for it as well.
Sure, I will do this.
...ceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizerProvider.java
presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java
...to-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergMetadataOptimizer.java
Force-pushed bd9465f to 688df0d
Force-pushed 688df0d to b8c0bb2
Force-pushed 17cec68 to 1c57fc3
Force-pushed 1c57fc3 to e0155f1
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
...to-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergMetadataOptimizer.java
...to-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergMetadataOptimizer.java
Force-pushed e0155f1 to 4f76ba5
@aaneja Thanks for your review and message.
I think the answer to this question is no. We get partition information by planning all the valid files in the current snapshot of the table. Iceberg is based on immutable files; we can only delete data/rows by deleting whole data files or writing delete files, so there cannot be a valid file that contains no data. And if we find that any partition contains delete files, we return empty partition information directly and skip this optimization, since we might hit the situation where a partition contains both valid data files and delete files but ultimately contains no data. After these checks, there is no need to check the validity of partitions again.
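The guard described above can be sketched like this (a simplified Python model for illustration; the real check lives in the Java connector, and the field names here are invented):

```python
def partitions_for_optimization(data_files):
    """Collect partition tuples from the snapshot's files, bailing out
    (returning None) if any file has associated delete files, since a
    partition might then hold no live rows."""
    partitions = set()
    for f in data_files:
        if f.get("delete_files"):
            return None  # skip the metadata optimization entirely
        partitions.add(f["partition"])
    return partitions
```

Returning None rather than a partial answer is the key point: the optimization is only applied when the partition set is provably complete and live.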
I think this optimization is very closely linked to the implementation details of the underlying storage connector. One example is the question discussed above. Furthermore, for Iceberg, a series of upcoming changes could affect this optimization, for example:
To sum up, extracting common code is something we should always consider, and there is indeed some room for extraction. But I think we can hold off for a while and consider it once the evolution of the Iceberg connector stabilizes a bit. What's your opinion?
aaneja left a comment
The common pattern in the optimization is that if the Agg column(s) has a constrained TupleDomain, we can rewrite this Agg as a Values node
If we can extract this into a connector agnostic implementation, we reduce the test area significantly.
The Iceberg (or other connector) specific features that supply this TupleDomain can then evolve independently (with a much easier-to-cover test area)
I don't have strong opinions on doing this right away, but that's how I see this evolving.
Yes, as you said, there is currently a common pattern. I think this optimization is mainly about using metadata to optimize cardinality-insensitive aggregations. That is, if we can enumerate through metadata all the distinct tuples of the columns appearing in aggregation keys and group-by keys, and the total number of these tuples is not too large, we can apply this optimization and translate the TableScanNode into a ValuesNode to gain some benefit. So the connector needs to determine whether it can exactly retrieve all the distinct tuples of the specified columns, under any existing filter predicate, through metadata alone. This common pattern is somewhat coarse-grained. I think we can wait for it to stabilize and make sure no further optimizations will evolve the pattern; then we can re-check which parts are connector-independent and invariant at a finer granularity, and extract a common pattern that may not be exactly the same as the current one.
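The coarse-grained pattern described here could be modeled roughly as follows (an illustrative Python sketch, not the connector SPI):

```python
def maybe_rewrite_scan(enumerate_distinct_tuples, threshold):
    """If the connector can enumerate all distinct tuples of the referenced
    columns from metadata, and there are few enough of them, return the
    rows for a replacement ValuesNode; otherwise return None (keep the
    original TableScanNode)."""
    tuples = enumerate_distinct_tuples()
    if tuples is None or len(tuples) > threshold:
        return None
    return sorted(tuples)
```

A connector-agnostic rule would own this shell; each connector would only supply the `enumerate_distinct_tuples` part, which is the piece that depends on storage internals.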
ZacBlanco left a comment
Two unrelated questions, otherwise LGTM
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
```java
if (type.equals(DATE) || type.equals(TIME) || type.equals(TIMESTAMP)) {
    if (type.equals(TIMESTAMP) || type.equals(TIME)) {
        return MICROSECONDS.toMillis(parseLong(valueString));
    }
```
I noticed this earlier when authoring the histogram PR. Iceberg specifies TIME and TIMESTAMP as microsecond precision. However, while we might be able to read/write partition metadata with the correct values here, I think there are inter-engine interop issues.
If you write a TIME[STAMP] field in Spark to an Iceberg table, I believe it should be microsecond precision. But if you write it with Presto, won't it write with millisecond precision? I haven't done much investigation into this, but it seems like we could be inadvertently writing incompatible data with Presto if we don't resolve the data to microseconds.
I don't think it's something we need to fix in this PR, but it may be worth updating the supported types to only include TIMESTAMP_MICROSECONDS and similar, instead of standard TIMESTAMP, for Iceberg tables.
Agreed, the current TimestampType/TimeType handling indeed brings a lot of inconvenience. We really need to start considering updating the supported types in Iceberg to be consistent with the spec.
Force-pushed 4f76ba5 to 2bd2d5d
Force-pushed 2bd2d5d to 1c764d9
```java
// @@ -24,6 +24,8 @@ public interface RowExpressionService
ExpressionOptimizer getExpressionOptimizer();
```
Did you consider just using the existing getExpressionOptimizer and using the optimize overload that accepts a variable resolver? We can use an identity resolver for this case.
Thanks for the reminder, I had overlooked the method in ExpressionOptimizer that accepts the variableResolver parameter. Will fix the code to use this method!
Force-pushed 1c764d9 to 92fc1bd
@tdcmeehan I have changed the logic in
Nice work! |
…ng (#23645)

## Description

Currently, when querying Iceberg tables, we always eagerly load all partition values of the table in `PickTableLayout` or `IcebergFilterPushdown` during the optimization phase. Since these eagerly loaded partition values are currently only used in metadata-based optimization rules, in many cases the information is not used at all. This can waste resources and hurt performance in the following cases:

- For queries that cannot be optimized based on metadata, we do not use these partition values at all, so we shouldn't load them eagerly.
- For tables with a huge number of partitions that are not suitable for metadata optimization, we need to limit the max number that can be loaded in the loading phase, rather than loading all of them first and then determining whether they exceed the threshold. (to be implemented in a subsequent PR)

For details on queries that can be optimized using partition information, see: #22080.

This PR makes the partition loading behavior lazy and only executes the loading when necessary. In this way, we avoid a lot of unnecessary loading in many scenarios, as well as the resulting resource consumption and performance loss.

The benchmark results also support this conclusion. We execute regular query statements, and query statements which are applicable for further reducible metadata optimization (which always load and use all partitions), on tables with two different partition counts: `300 * 2` and `1600 * 4`. The selected queries are as follows:

```
// Table `iceberg_partition` is partitioned by columns `a` and `b`.
create table iceberg_partition(a int, b int, c double) with (partitioning = ARRAY['a', 'b']);

// This query will always be optimized using the partitions information, regardless of the partitions count.
select min(a), max(a), min(b), max(b) from iceberg_partition;

// This query does not need partitions information at all.
select a, c from iceberg_partition where b >= xxx;
```

The benchmark result before this change:

```
----Before this change, always load partitions eagerly----
Benchmark                                        (recordCount)  Mode  Cnt    Score    Error  Units
BenchmarkIcebergLazyLoading.testFurtherOptimize        300 * 2  avgt   10   39.971 ±  1.133  ms/op
BenchmarkIcebergLazyLoading.testFurtherOptimize       1600 * 4  avgt   10   66.593 ±  2.253  ms/op
BenchmarkIcebergLazyLoading.testNormalQuery            300 * 2  avgt   10  166.274 ±  5.041  ms/op
BenchmarkIcebergLazyLoading.testNormalQuery           1600 * 4  avgt   10  659.756 ± 69.708  ms/op
```

The benchmark result after this change:

```
----After this change, lazy load partitions and check the max threshold in loading phase----
Benchmark                                        (recordCount)  Mode  Cnt    Score    Error  Units
BenchmarkIcebergLazyLoading.testFurtherOptimize        300 * 2  avgt   10   50.806 ±  2.471  ms/op
BenchmarkIcebergLazyLoading.testFurtherOptimize       1600 * 4  avgt   10   77.579 ±  1.025  ms/op
BenchmarkIcebergLazyLoading.testNormalQuery            300 * 2  avgt   10  155.059 ±  4.522  ms/op
BenchmarkIcebergLazyLoading.testNormalQuery           1600 * 4  avgt   10  596.955 ± 41.619  ms/op
```

As shown, this change significantly improves the performance of queries that are not suitable for metadata optimization. This aligns with expectations: it introduces a minimal fixed cost for workloads that load all partitions, but for most queries that don't need to load the table partitions, the performance gain scales significantly with the partition count.

## Motivation and Context

Make partition loading for Iceberg tables lazy to avoid unnecessary loading

## Impact

N/A

## Test Plan

- Make sure the change does not affect existing tests
- Newly added test case in `TestIcebergLogicalPlanner` to show the behaviors with different max partition thresholds
- Newly added benchmark tests in `BenchmarkIcebergLazyLoading` to show the improvement in performance

## Contributor checklist

- [x] Please make sure your submission complies with our [development](https://github.com/prestodb/presto/wiki/Presto-Development-Guidelines#development), [formatting](https://github.com/prestodb/presto/wiki/Presto-Development-Guidelines#formatting), [commit message](https://github.com/prestodb/presto/wiki/Review-and-Commit-guidelines#commit-formatting-and-pull-requests), and [attribution guidelines](https://github.com/prestodb/presto/wiki/Review-and-Commit-guidelines#attribution).
- [x] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
- [x] Documented new properties (with their default values), SQL syntax, functions, or other functionality.
- [x] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines).
- [x] Adequate tests were added if applicable.
- [x] CI passed.

## Release Notes

```
== RELEASE NOTES ==

Iceberg Connector Changes
* Improve partition loading for Iceberg tables by making it lazy, preventing unnecessary loading.
```
Description
We can take advantage of metadata such as partition values in Iceberg to do some metadata optimizations. For example, we can optimize min/max/distinct aggregations on an Iceberg table's partition columns to a `ValuesNode`, with or without an `AggregationNode` above it. Or we can optimize a query with filters that do not match any partition to an empty `ValuesNode`. This PR tries to take advantage of Iceberg partition values to do metadata query optimizations, fixes various problems encountered when dealing with partition values, and ensures proper behavior in cases of partition evolution.
This optimization mainly uses metadata to optimize cardinality-insensitive aggregations. That is, if we can enumerate through metadata the distinct tuples consisting of all the columns appearing in aggregation keys and group-by keys, and the total number of these tuples is not too large, we can apply this optimization, translating the `TableScanNode` to a `ValuesNode` to gain some benefits. For example, for an Iceberg table partitioned by `ARRAY['a', 'b']`, the following queries could take advantage of this optimization. Note: the supported aggregation functions include the following: min, max, distinct, approx, approx_distinct. Additionally, all columns appearing in the aggregation keys and group-by keys must be identity partition columns.
Test Plan
Contributor checklist
Release Notes