
Fix no value present scheduling failure with grouped execution#22538

Merged
rschlussel merged 2 commits into prestodb:master from rschlussel:debug-scheduler-failure
Apr 17, 2024

Conversation

@rschlussel (Contributor) commented Apr 16, 2024

Description

Only use dynamic scheduling if all scans in a stage are using grouped execution

Motivation and Context

Previously queries with a mix of ungrouped and grouped scans in the same fragment could fail with the following stack trace:

java.util.NoSuchElementException: No value present
	at java.base/java.util.Optional.get(Optional.java:148)
	at com.facebook.presto.execution.scheduler.NodeScheduler.selectDistributionNodes(NodeScheduler.java:414)
	at com.facebook.presto.execution.scheduler.nodeSelection.SimpleNodeSelector.computeAssignments(SimpleNodeSelector.java:231)
	at com.facebook.presto.execution.scheduler.FixedSourcePartitionedScheduler$BucketedSplitPlacementPolicy.computeAssignments(FixedSourcePartitionedScheduler.java:316)
	at com.facebook.presto.execution.scheduler.SourcePartitionedScheduler.schedule(SourcePartitionedScheduler.java:273)
	at com.facebook.presto.execution.scheduler.FixedSourcePartitionedScheduler$AsGroupedSourceScheduler.schedule(FixedSourcePartitionedScheduler.java:353)
	at com.facebook.presto.execution.scheduler.FixedSourcePartitionedScheduler.schedule(FixedSourcePartitionedScheduler.java:240)
	at com.facebook.presto.execution.scheduler.SqlQueryScheduler.schedule(SqlQueryScheduler.java:434)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Example query shape:
SELECT count(*) FROM bucketed_table1 t1 JOIN (SELECT * FROM bucketed_table2 WHERE some_column IN (SELECT key FROM my_small_broadcast_table)) t2 ON t1.key = t2.key GROUP BY t1.key;

The plan for the relevant fragment would look as follows:

          AGGREGATION
              |
           JOIN
           /    \
          /      \
TableScan(grouped) SemiJoin
                    /   \
 TableScan(ungrouped)   RemoteSource (replicated exchange)
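The essence of the fix can be sketched as follows. This is a minimal, simplified illustration with hypothetical types, not Presto's actual scheduler classes: dynamic lifespan scheduling is chosen only when every scan in the fragment uses grouped execution, so a fragment mixing grouped and ungrouped scans (as in the SemiJoin branch above) falls back to the static bucket-to-node mapping.

```java
import java.util.Map;

// Simplified sketch (hypothetical class and method names, not Presto's actual
// code) of the check this PR tightens: dynamic scheduling is only safe when
// *all* scans in the stage use grouped execution. A single ungrouped scan must
// force the non-dynamic bucket-to-node mapping, or node selection fails with
// "No value present" when it looks up a bucket it never assigned.
public class GroupedExecutionCheck
{
    // scanGrouped maps each table-scan plan-node id to whether that scan
    // runs with grouped execution in this fragment
    public static boolean useDynamicScheduling(Map<String, Boolean> scanGrouped)
    {
        return scanGrouped.values().stream().allMatch(grouped -> grouped);
    }

    public static void main(String[] args)
    {
        // Mixed fragment from the example plan:
        // grouped join scan plus ungrouped semi-join scan
        Map<String, Boolean> mixed = Map.of("scan_t1", true, "scan_t2", false);
        Map<String, Boolean> allGrouped = Map.of("scan_t1", true, "scan_t2", true);
        System.out.println("mixed -> " + useDynamicScheduling(mixed));       // false
        System.out.println("allGrouped -> " + useDynamicScheduling(allGrouped)); // true
    }
}
```

With the previous behavior, the presence of any grouped scan could steer the stage onto the dynamic path, and the ungrouped scan then hit the `Optional.get` failure shown in the stack trace above.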

Impact

Fixes a bug affecting queries that mix joins and semi-joins under grouped execution

Test Plan

Added new test

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* Fix an error for some queries using a mix of joins and semi-joins when grouped execution is enabled.

@rschlussel rschlussel requested review from a team and jaystarshot as code owners April 16, 2024 21:13
@rschlussel rschlussel requested a review from presto-oss April 16, 2024 21:13
@rschlussel rschlussel force-pushed the debug-scheduler-failure branch from 85c8225 to 043ec4b on April 16, 2024 21:15
@rschlussel rschlussel requested a review from arhimondr April 16, 2024 21:16
@rschlussel rschlussel force-pushed the debug-scheduler-failure branch 4 times, most recently from dca7f13 to 9d7620f on April 17, 2024 00:48
@hantangwangd (Member) left a comment

Very nice catch, overall looks good to me. One small question for discussion.

Comment on lines +345 to +347
if (plan.getFragment().getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)
&& schedulingOrder.stream().allMatch(id -> plan.getFragment().getStageExecutionDescriptor().isScanGroupedExecution(id))) {
// no non-replicated remote source and all scans are grouped

A small question: do we need this change? Should a fragment that has only broadcast remote sources but can't support dynamic lifespan scheduling still go through this path?

@rschlussel (Contributor, Author) commented Apr 17, 2024

oh, I think you are right that we don't need this! The case you describe does go through here, but the stage execution descriptor won't be dynamicLifespanSchedule, and so we'll get a non-dynamic bucket node map in the code two lines below. Now I see that the else path is for stages that need a nodePartitionMap, which I believe is only needed for a partitioned remote source.

@rschlussel (Contributor, Author):

fixed

Dynamic lifespan scheduling for grouped execution assumes all scans are using
grouped execution. Queries like the following can produce a plan where
some scans are grouped and some are ungrouped:

SELECT count(*) FROM bucketed_table1 t1 JOIN (SELECT * FROM bucketed_table2
WHERE some_column IN (SELECT key FROM my_small_broadcast_table)) t2 ON
t1.key = t2.key GROUP BY t1.key;

The relevant fragment would look as follows:

          AGGREGATION
              |
           JOIN
           /    \
          /      \
TableScan(grouped) SemiJoin
                    /   \
 TableScan(ungrouped)   RemoteSource (replicated exchange)

This fixes an error where such queries could fail with a "no value
present" error during node scheduling.
@rschlussel rschlussel force-pushed the debug-scheduler-failure branch from 9d7620f to 2c4c693 on April 17, 2024 13:43
@hantangwangd (Member) left a comment

LGTM!

@rschlussel rschlussel merged commit b398569 into prestodb:master Apr 17, 2024
@rschlussel rschlussel deleted the debug-scheduler-failure branch April 17, 2024 19:17
@wanglinsong wanglinsong mentioned this pull request Jun 25, 2024