Add ability to run AQE with optimizations end-to-end #19563

Merged
rschlussel merged 8 commits into prestodb:master from rschlussel:aqe-stats-2
Jun 12, 2023

@rschlussel rschlussel commented May 4, 2023

Test plan - added unit tests. TODO: do a verifier run

This PR adds the remaining features needed to incorporate the PickJoinSides optimization into adaptive query execution (AQE):

  1. Adds a new stats calculator for Presto on Spark that chooses runtime stats over historical stats if they are different
  2. Supports size-based joins for remote source nodes in PickJoinSides
  3. Fixes PickJoinSides to correctly set local exchanges
  4. Adds a re-optimization step into the PrestoSparkAdaptiveExecutor
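The preference described in item 1 can be pictured with a small sketch. This is only an illustration under stated assumptions: `RuntimeStatsPreference` and `chooseOutputSizeInBytes` are hypothetical names, and the stats are reduced to a single optional size value rather than the classes Presto actually uses.

```java
import java.util.OptionalDouble;

// Hypothetical sketch, not the actual Presto on Spark stats calculator.
final class RuntimeStatsPreference {
    private RuntimeStatsPreference() {}

    // Returns the size estimate to plan with: the runtime value when it is
    // known and differs from the historical one, otherwise the historical value.
    static OptionalDouble chooseOutputSizeInBytes(OptionalDouble historical, OptionalDouble runtime) {
        if (runtime.isPresent() && !runtime.equals(historical)) {
            return runtime;
        }
        return historical;
    }
}
```

When no runtime value is available yet, the sketch falls back to whatever the historical stats provide, which may itself be empty.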
== RELEASE NOTES ==

Presto on Spark Changes
* Add optimization to switch the build and probe sides of a join at runtime when a query runs with adaptive execution. This optimizer can be enabled by setting the session property ``adaptive_join_side_switching_enabled = true`` or configuration property ``optimizer.adaptive-join-side-switching-enabled = true``.

@rschlussel rschlussel force-pushed the aqe-stats-2 branch 6 times, most recently from 0ae884e to 9e86cf1 Compare May 4, 2023 20:37
@rschlussel rschlussel marked this pull request as ready for review May 5, 2023 15:23
@rschlussel rschlussel requested a review from a team as a code owner May 5, 2023 15:23
Contributor

@pranjalssh pranjalssh left a comment

Reviewed till commit 4

Contributor Author

The test now has a long chain of UNION ALLs because PrestoSparkRowOutputOperator has a minimum row batch target size of about 1MB, which is also roughly the size of a hash table for a single orders table, so I needed to make that side even bigger to be able to validate the memory used by the join operator. Additionally, with optimize_hash_generation disabled, the row batch ends up with a retained size a bit over 5MB (I haven't gotten to the bottom of exactly why), so I needed to make the other side of the join big enough that it would fail on the build side with a 6MB memory limit.
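The UNION ALL padding described in this comment could be generated rather than typed out. A minimal sketch, assuming a hypothetical helper (`UnionAllQueryBuilder` is not part of this PR) that repeats the `orders` scan a given number of times:

```java
import java.util.Collections;

// Hypothetical test helper; the name and shape are assumptions for illustration.
final class UnionAllQueryBuilder {
    private UnionAllQueryBuilder() {}

    // Builds a join of nation against `copies` UNION ALL-ed scans of orders,
    // so the orders side of the join can be grown to a chosen multiple.
    static String repeatedOrdersJoin(int copies) {
        if (copies < 1) {
            throw new IllegalArgumentException("copies must be at least 1");
        }
        String unioned = String.join(
                " UNION ALL ",
                Collections.nCopies(copies, "SELECT * FROM orders"));
        return "SELECT * FROM nation n JOIN (" + unioned + ") o ON n.nationkey = o.orderkey";
    }
}
```

Calling `repeatedOrdersJoin(7)` yields a query with seven copies of `orders`, the same shape as the one in the failing test output.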

@rschlussel rschlussel force-pushed the aqe-stats-2 branch 2 times, most recently from 1e8977b to b8eb408 Compare May 12, 2023 18:33
@rschlussel
Contributor Author

Gah... PrestoSparkRowOutputOperator memory usage is so temperamental and so close to the memory needed for the bigger table on the build side. It's making this pair of tests impossible to write.

com.facebook.presto.spark.adaptive.execution.TestPrestoSparkAdaptiveJoinQueries.testQuerySucceedsWithAQE  Time elapsed: 3.554 s  <<< FAILURE!
2023-05-12T19:17:58.5260588Z java.lang.AssertionError: Execution of 'actual' query failed: SELECT * FROM nation n JOIN (SELECT * from orders UNION ALL SELECT * FROM orders  UNION ALL SELECT * FROM orders UNION ALL SELECT * FROM orders UNION ALL SELECT * FROM orders UNION ALL SELECT * FROM orders UNION ALL SELECT * FROM orders) o ON n.nationkey = o.orderkey
2023-05-12T19:17:58.5261456Z 	at org.testng.Assert.fail(Assert.java:98)
2023-05-12T19:17:58.5262073Z 	at com.facebook.presto.tests.QueryAssertions.assertQuery(QueryAssertions.java:178)
2023-05-12T19:17:58.5262801Z 	at com.facebook.presto.tests.QueryAssertions.assertQuery(QueryAssertions.java:106)
2023-05-12T19:17:58.5263632Z 	at com.facebook.presto.tests.AbstractTestQueryFramework.assertQuery(AbstractTestQueryFramework.java:149)
2023-05-12T19:17:58.5264831Z 	at com.facebook.presto.spark.adaptive.execution.TestPrestoSparkAdaptiveJoinQueries.testQuerySucceedsWithAQE(TestPrestoSparkAdaptiveJoinQueries.java:55)
2023-05-12T19:17:58.5265863Z 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2023-05-12T19:17:58.5266492Z 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2023-05-12T19:17:58.5267246Z 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2023-05-12T19:17:58.5267871Z 	at java.lang.reflect.Method.invoke(Method.java:498)
2023-05-12T19:17:58.5268592Z 	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:135)
2023-05-12T19:17:58.5269414Z 	at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:673)
2023-05-12T19:17:58.5270146Z 	at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:220)
2023-05-12T19:17:58.5270992Z 	at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
2023-05-12T19:17:58.5272259Z 	at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:945)
2023-05-12T19:17:58.5273331Z 	at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:193)
2023-05-12T19:17:58.5274514Z 	at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
2023-05-12T19:17:58.5275578Z 	at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
2023-05-12T19:17:58.5276602Z 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2023-05-12T19:17:58.5277776Z 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2023-05-12T19:17:58.5278344Z 	at java.lang.Thread.run(Thread.java:750)
2023-05-12T19:17:58.5288315Z Caused by: com.facebook.presto.ExceededMemoryLimitException: Query exceeded per-node total memory limit of 3MB [Allocated: 2.84MB, Delta: 1.28MB (PrestoSparkRowOutputOperator), Top Consumers: {PrestoSparkRowOutputOperator=2.55MB, ScanFilterAndProjectOperator=294.38kB}, Details: [{"taskId":"2.0.2","reservation":"2.84MB","topConsumers":[{"type":"PrestoSparkRowOutputOperator","planNodeId":"758","reservations":["1.28MB","1.28MB","0B","0B"],"total":"2.55MB"},{"type":"ScanFilterAndProjectOperator","planNodeId":"871","reservations":["294.38kB"],"total":"294.38kB"}]}]]
2023-05-12T19:17:58.5290108Z 	at com.facebook.presto.spark.util.PrestoSparkFailureUtils.toPrestoSparkFailure(PrestoSparkFailureUtils.java:61)
2023-05-12T19:17:58.5291154Z 	at com.facebook.presto.spark.execution.AbstractPrestoSparkQueryExecution.execute(AbstractPrestoSparkQueryExecution.java:388)
2023-05-12T19:17:58.5292120Z 	at com.facebook.presto.spark.PrestoSparkQueryRunner.execute(PrestoSparkQueryRunner.java:514)
2023-05-12T19:17:58.5292915Z 	at com.facebook.presto.spark.PrestoSparkQueryRunner.execute(PrestoSparkQueryRunner.java:497)
2023-05-12T19:17:58.5293684Z 	at com.facebook.presto.tests.QueryAssertions.assertQuery(QueryAssertions.java:175)
2023-05-12T19:17:58.5294200Z 	... 18 more
2023-05-12T19:17:58.5294432Z 
2023-05-12T19:17:58.5295179Z [ERROR] com.facebook.presto.spark.adaptive.execution.TestPrestoSparkAdaptiveSpilledJoinQueries.testQuerySucceedsWithAQE  Time elapsed: 2.291 s  <<< FAILURE!
2023-05-12T19:17:58.5299983Z java.lang.AssertionError: Execution of 'actual' query failed: SELECT * FROM nation n JOIN (SELECT * from orders UNION ALL SELECT * FROM orders  UNION ALL SELECT * FROM orders UNION ALL SELECT * FROM orders UNION ALL SELECT * FROM orders UNION ALL SELECT * FROM orders UNION ALL SELECT * FROM orders) o ON n.nationkey = o.orderkey
2023-05-12T19:17:58.5300829Z 	at org.testng.Assert.fail(Assert.java:98)
2023-05-12T19:17:58.5301419Z 	at com.facebook.presto.tests.QueryAssertions.assertQuery(QueryAssertions.java:178)
2023-05-12T19:17:58.5302159Z 	at com.facebook.presto.tests.QueryAssertions.assertQuery(QueryAssertions.java:106)
2023-05-12T19:17:58.5303016Z 	at com.facebook.presto.tests.AbstractTestQueryFramework.assertQuery(AbstractTestQueryFramework.java:149)
2023-05-12T19:17:58.5305334Z 	at com.facebook.presto.spark.adaptive.execution.TestPrestoSparkAdaptiveJoinQueries.testQuerySucceedsWithAQE(TestPrestoSparkAdaptiveJoinQueries.java:55)
2023-05-12T19:17:58.5306357Z 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2023-05-12T19:17:58.5307030Z 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2023-05-12T19:17:58.5307774Z 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2023-05-12T19:17:58.5311391Z 	at java.lang.reflect.Method.invoke(Method.java:498)
2023-05-12T19:17:58.5315269Z 	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:135)
2023-05-12T19:17:58.5315984Z 	at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:673)
2023-05-12T19:17:58.5316617Z 	at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:220)
2023-05-12T19:17:58.5317257Z 	at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
2023-05-12T19:17:58.5317900Z 	at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:945)
2023-05-12T19:17:58.5318546Z 	at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:193)
2023-05-12T19:17:58.5321846Z 	at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
2023-05-12T19:17:58.5322503Z 	at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
2023-05-12T19:17:58.5323124Z 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2023-05-12T19:17:58.5324002Z 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2023-05-12T19:17:58.5324460Z 	at java.lang.Thread.run(Thread.java:750)
2023-05-12T19:17:58.5329980Z Caused by: com.facebook.presto.ExceededMemoryLimitException: Query exceeded per-node total memory limit of 3MB [Allocated: 2.55MB, Delta: 1.28MB (PrestoSparkRowOutputOperator), Top Consumers: {PrestoSparkRowOutputOperator=2.55MB, ScanFilterAndProjectOperator=1.63kB}, Details: [{"taskId":"2.0.0","reservation":"2.55MB","topConsumers":[{"type":"PrestoSparkRowOutputOperator","planNodeId":"758","reservations":["1.28MB","1.28MB","0B","0B"],"total":"2.55MB"},{"type":"ScanFilterAndProjectOperator","planNodeId":"871","reservations":["1.63kB"],"total":"1.63kB"}]}]]
2023-05-12T19:17:58.5331766Z 	at com.facebook.presto.spark.util.PrestoSparkFailureUtils.toPrestoSparkFailure(PrestoSparkFailureUtils.java:61)
2023-05-12T19:17:58.5333554Z 	at com.facebook.presto.spark.execution.AbstractPrestoSparkQueryExecution.execute(AbstractPrestoSparkQueryExecution.java:388)
2023-05-12T19:17:58.5334352Z 	at com.facebook.presto.spark.PrestoSparkQueryRunner.execute(PrestoSparkQueryRunner.java:514)
2023-05-12T19:17:58.5335013Z 	at com.facebook.presto.spark.PrestoSparkQueryRunner.execute(PrestoSparkQueryRunner.java:497)
2023-05-12T19:17:58.5335825Z 	at com.facebook.presto.tests.QueryAssertions.assertQuery(QueryAssertions.java:175)
2023-05-12T19:17:58.5336250Z 	... 18 more
2023-05-12T19:17:58.5336675Z 
2023-05-12T19:17:58.8218219Z [INFO] 
2023-05-12T19:17:58.8218805Z [INFO] Results:
2023-05-12T19:17:58.8219140Z [INFO] 
2023-05-12T19:17:58.8219472Z [ERROR] Failures: 
2023-05-12T19:17:58.8226290Z [ERROR]   TestPrestoSparkAdaptiveJoinQueries.testQuerySucceedsWithAQE:55-
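In both failures above the allocated amount alone (2.84MB and 2.55MB) is under the 3MB limit, so the failure appears to come from the pending 1.28MB delta being counted on top of it. A minimal sketch of that reading, assuming the check is `allocated + delta > limit`; the class and method names are hypothetical and this is not Presto's memory accounting code:

```java
// Hypothetical sketch of the limit check implied by the error message.
final class MemoryLimitSketch {
    private MemoryLimitSketch() {}

    // Converts a size in megabytes (1MB = 1024 * 1024 bytes) to bytes.
    static long megabytes(double mb) {
        return Math.round(mb * 1024 * 1024);
    }

    // Assumed rule: a reservation fails when the memory already allocated
    // plus the newly requested delta would go over the limit.
    static boolean exceedsLimit(long allocatedBytes, long deltaBytes, long limitBytes) {
        return allocatedBytes + deltaBytes > limitBytes;
    }
}
```

With the numbers from the first failure, 2.84MB + 1.28MB = 4.12MB, which is over the 3MB limit even though the ScanFilterAndProjectOperator share is small.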

Contributor

@pranjalssh pranjalssh left a comment

LGTM till commit 6

Contributor

Have you checked whether HBO stats come up correctly for a RemoteSourceNode? It may not have a statsEquivalentPlanNode attached.

Contributor Author

Great catch. It wasn't getting populated correctly. I didn't notice since I added it explicitly in my test.

Contributor Author

I put the fix for this earlier in the stack, so commit 6 is now commit 7

Contributor

Would a test like this work?

SELECT * FROM nation n JOIN (SELECT * FROM orders CROSS JOIN UNNEST(sequence(1, 50)) AS t(i)) o ON n.nationkey = o.orderkey

Contributor Author

no, wouldn't help.

The PrestoSparkStatsCalculator uses runtime stats if they differ from
historical stats.

Allow using stats from RemoteSourceNodes for size-based join. This
is necessary for runtime optimizations.

Join build and probe sides have different requirements for the local
distribution they need. However, at runtime local exchanges have already
been added, so when we flip join sides, we also need to adjust local
exchanges on the left and right accordingly.

Extract the code that handles this from RuntimeReorderJoinSides to a
common utility class so we can reuse it for this optimizer.

Without this change, queries will have wrong results. The test for
this is in the commit that adds the reoptimization step for AQE.

Add re-optimization step for Presto on Spark adaptive execution

Move methods shared between multiple join swapping optimizers to
JoinSwappingUtils.
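The side switching these commit messages describe can be sketched in isolation. The model below is a deliberately simplified assumption: `JoinSketch` is a hypothetical class, not Presto's JoinNode or JoinSwappingUtils, sizes are plain byte counts, and the local exchange adjustment the commit message calls out is not modeled at all.

```java
// Hypothetical, simplified model of a join with a probe (left) and build (right) side.
final class JoinSketch {
    enum Type { INNER, LEFT, RIGHT, FULL }

    final Type type;
    final String left;          // probe side
    final String right;         // build side
    final long leftSizeBytes;
    final long rightSizeBytes;

    JoinSketch(Type type, String left, String right, long leftSizeBytes, long rightSizeBytes) {
        this.type = type;
        this.left = left;
        this.right = right;
        this.leftSizeBytes = leftSizeBytes;
        this.rightSizeBytes = rightSizeBytes;
    }

    // Size-based rule assumed here: flip when the build side is the larger one.
    boolean shouldFlip() {
        return rightSizeBytes > leftSizeBytes;
    }

    // Swaps the two inputs; LEFT and RIGHT joins trade places so the
    // preserved side stays the same, INNER and FULL are unchanged.
    JoinSketch flip() {
        Type flipped = type == Type.LEFT ? Type.RIGHT
                : type == Type.RIGHT ? Type.LEFT
                : type;
        return new JoinSketch(flipped, right, left, rightSizeBytes, leftSizeBytes);
    }
}
```

In the test query from this PR, `nation` starts on the left and the padded `orders` subquery on the right, so a rule like this would move `nation` to the build side once runtime sizes are known.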
Contributor

@pranjalssh pranjalssh left a comment

Looks good

@rschlussel rschlussel merged commit 8ec1756 into prestodb:master Jun 12, 2023
@wanglinsong wanglinsong mentioned this pull request Jul 27, 2023
28 tasks