Implement sequential execution policy #10350

Merged
sopel39 merged 5 commits into trinodb:master from starburstdata:ks/new_scheduler
Jan 3, 2022

Conversation

@sopel39
Member

@sopel39 sopel39 commented Dec 20, 2021

Currently, Trino uses the AllAtOnceExecutionSchedule execution policy by default.
This makes all stages start at the same time, which means join probe splits occupy
valuable driver slots even though they cannot make progress until the build-side stage completes.
This can throttle cluster resource utilization for more complex queries
or in high-concurrency scenarios.

The new sequential execution policy follows these principles:
* it schedules join build source stages before join probe source stages
* it immediately schedules stages for which it is known that their output will be consumed
  (e.g., there is a final aggregation downstream)
Benchmark results:

    label                   TPCH wall time  TPC-DS wall time  TPCH CPU time  TPC-DS CPU time  TPCH peak mem  TPC-DS peak mem
    before sequential part  937.515833      1184.331667       111597.4       135037.311333    2.144486e+09   1.284780e+09
    after sequential part   829.780167      1133.327333       111747.4       132606.885667    2.141816e+09   1.267524e+09

    label                   TPCH wall time  TPC-DS wall time  TPCH CPU time  TPC-DS CPU time  TPCH peak mem  TPC-DS peak mem
    before seq unpart       769.708500      1925.400000       98571.0        237859.442667    2.093233e+09   1.147734e+09
    after seq unpart        758.691167      1816.289500       99912.3        242806.413833    2.082893e+09   1.132478e+09
Improvements:
* partitioned TPC-H wall time ~11.5%
* partitioned TPC-DS wall time ~4.5%
* unpartitioned TPC-DS wall time ~5.7%
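To illustrate the first principle, here is a minimal, hypothetical sketch (not Trino's actual PhasedExecutionSchedule code; the class, method, and stage names are invented for illustration) of ordering stages so that build-side dependencies are always scheduled before the stages that probe them:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the first principle above (not Trino's
// actual scheduler code): order stages so that each stage's build-side
// dependencies are scheduled before the stage itself.
public class SequentialScheduleSketch {
    // buildDeps maps a probe-side stage to the build-side stages it waits on
    public static List<String> schedule(List<String> stages, Map<String, List<String>> buildDeps) {
        List<String> order = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        for (String stage : stages) {
            visit(stage, buildDeps, visited, order);
        }
        return order;
    }

    private static void visit(String stage, Map<String, List<String>> buildDeps, Set<String> visited, List<String> order) {
        if (!visited.add(stage)) {
            return; // already placed in the schedule
        }
        // schedule build-side source stages first, so probe splits don't
        // occupy driver slots while unable to make progress
        for (String dep : buildDeps.getOrDefault(stage, List.of())) {
            visit(dep, buildDeps, visited, order);
        }
        order.add(stage);
    }
}
```

This is just a depth-first topological ordering over the build dependency graph; the real policy additionally handles the second principle (eagerly scheduling stages whose output is known to be consumed).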

Fixes #10269

@cla-bot cla-bot bot added the cla-signed label Dec 20, 2021
@sopel39
Member Author

sopel39 commented Dec 20, 2021

cc @martint

@sopel39
Member Author

sopel39 commented Dec 20, 2021

Query example:
tpch/q18 partitioned with default filter factor enabled:

all-at-once
Query 20211218_202329_00007_nj9xz, FINISHED, 6 nodes
Splits: 10 023 total, 10 023 done (100,00%)
4:03 [13,7B rows, 62,9GB] [56,1M rows/s, 265MB/s]

sequential
Query 20211218_202054_00005_nj9xz, FINISHED, 6 nodes
Splits: 10 083 total, 10 083 done (100,00%)
1:31 [13,7B rows, 63GB] [150M rows/s, 708MB/s]

@findepi
Member

findepi commented Dec 20, 2021

cc @losipiuk

Member


Not sure we want to switch the default. You mentioned in the commit message that overall, sequential provides a perf improvement. But are there queries which degraded with the change?

Member Author


> Not sure we want to switch the default. You mentioned in the commit message that overall, sequential provides a perf improvement. But are there queries which degraded with the change?

I don't think there are. Some small things:

  • there might be a slight CPU increase for some queries due to more concurrency now at the source stage. Source stage
    concurrency is another topic to look at.
  • isAnyTaskBlocked is not propagated by Future yet, but it shouldn't matter much in practice because of
    broadcast join limits.

Overall this is much, much better than the current scheduler in high-concurrency workloads and complex queries.
Hence IMO the benefits greatly exceed the risks, which are small here.

Contributor


I guess low latency queries can degrade. For example if the build side is extremely small it might be better to start scheduling probe tasks right away.

Member Author


I haven't observed that. The current code starts the next stage when the parent stage enters the SCHEDULED state (all splits are scheduled, but they might still be running).

Note that with the current notification mechanism (via futures) we respond to task/stage state changes very fast.

Member Author


Note: this allows scheduling a dependent stage before the parent stage completes or enters the flushing state.
I've benchmarked code where we consider flushing and done stages to be completed, but the results were not better (there was even a slight duration regression for TPC-DS):

    label                      TPCH wall time  TPC-DS wall time  TPCH CPU time  TPC-DS CPU time  TPCH peak mem  TPC-DS peak mem
    before staged unpart       769.708500      1925.400000       98571.0        237859.442667    2.093233e+09   1.147734e+09
    after staged unpart        758.691167      1816.289500       99912.3        242806.413833    2.082893e+09   1.132478e+09
    after staged unpart compl  757.288833      1848.024833       99515.5        242440.454833    2.082462e+09   1.101463e+09

    label                      TPCH wall time  TPC-DS wall time  TPCH CPU time  TPC-DS CPU time  TPCH peak mem  TPC-DS peak mem
    before staged part         937.515833      1184.331667       111597.4       135037.311333    2.144486e+09   1.284780e+09
    after staged part          829.780167      1133.327333       111747.4       132606.885667    2.141816e+09   1.267524e+09
    after staged part compl    827.721667      1153.291667       110661.2       133056.365000    2.148532e+09   1.246323e+09

Member


This is a useful comment. Please put it in the code.

Contributor


Honestly I never really understood this condition. In theory as long as there's at least a single stage that is not blocked the scheduler should keep scheduling. Currently the logic is "as long as there's at least a single stage that is blocked wait for at least a single stage to get unblocked or wait a second".

@dain Do you remember why it is implemented this way?

Member Author


This is because not every signal is governed by futures. Examples are:

  • isAnyTaskBlocked
  • is task memory full
  • etc.

We might improve that as we go (propagate more signals with futures), but for now we have this loop.
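The wait step of the loop described above can be sketched as follows. This is an illustrative sketch only, assuming CompletableFuture-based blocked signals; it is not Trino's actual scheduler code, and the class and method names are invented:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative sketch of the scheduling loop's wait step (not Trino's actual
// code): if any stage is blocked, wait until one of the blocked futures fires,
// but at most one second, because some signals (e.g. task memory pressure)
// are not propagated through futures and must be re-checked by polling.
public class SchedulerWaitSketch {
    public static void waitForProgress(List<CompletableFuture<?>> blockedStages)
            throws InterruptedException {
        if (blockedStages.isEmpty()) {
            return; // nothing is blocked: continue scheduling immediately
        }
        try {
            CompletableFuture.anyOf(blockedStages.toArray(CompletableFuture[]::new))
                    .get(1, TimeUnit.SECONDS);
        }
        catch (ExecutionException | TimeoutException ignored) {
            // timeout (or a failed future): fall through and re-check all
            // non-future signals on the next loop iteration
        }
    }
}
```

The one-second cap is the safety valve: even if no future ever fires, the loop still re-evaluates signals like isAnyTaskBlocked once per second.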


@lhofhansl
Member

It's not "sequential" as such, right? More like a topological sort for stage ordering.
Not important, but perhaps a different name might be more helpful. Maybe "ordered", "sorted", "joinordered", "optimized", "dependent", etc...?

Member Author

@sopel39 sopel39 left a comment


ac


Member Author


That is the contract of the future. It can only finish once, and then it is to be disposed. Asking for isBlocked at a different time might yield different results (since the situation has changed).

We have similar contract in places like:

io.trino.operator.exchange.LocalExchangeSource#waitForReading
io.trino.operator.exchange.LocalExchangeMemoryManager#getNotFullFuture

and possibly others.
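As a minimal sketch of that one-shot contract (illustrative only; this is not the actual implementation of those classes, and the class and method names are invented): each returned future completes at most once, so a caller must re-request the blocked state rather than reuse an old future:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch of the one-shot future contract discussed above (not
// Trino's actual LocalExchangeMemoryManager): a returned future completes at
// most once; after it fires it is "used up", and the caller must ask again
// because the blocked state may have changed in the meantime.
public class BlockedState {
    private CompletableFuture<Void> notBlocked = CompletableFuture.completedFuture(null);

    public synchronized CompletableFuture<Void> isBlocked() {
        return notBlocked; // snapshot of the current state; a later call may return a different future
    }

    public synchronized void block() {
        if (notBlocked.isDone()) {
            notBlocked = new CompletableFuture<>(); // fresh future for the new blocked period
        }
    }

    public synchronized void unblock() {
        notBlocked.complete(null); // completes at most once
    }
}
```

A caller that caches the future returned before `block()` was called would wrongly conclude the state is unblocked, which is why the contract requires re-requesting it.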

Member Author


While visiting plan nodes, Fragments does not contain the currently visited stage. However, the processFragment result does contain the stage for which it was invoked. Maybe we could split Fragments into an intermediate structure (while visiting plan nodes) and a final one, but I don't think it would be simpler or easier to understand.

@sopel39 sopel39 force-pushed the ks/new_scheduler branch 2 times, most recently from 8a554eb to e5b18b6 on December 27, 2021 21:38
This makes it easier to write scheduler tests
@sopel39
Member Author

sopel39 commented Jan 3, 2022

Will enable the phased scheduler in a separate PR.

@sopel39 sopel39 merged commit 58f9acc into trinodb:master Jan 3, 2022
@sopel39 sopel39 deleted the ks/new_scheduler branch January 3, 2022 23:08
@github-actions github-actions bot added this to the 368 milestone Jan 3, 2022
import static java.util.function.Function.identity;

/**
* Schedules stages choosing the order to provide the best resource utilization.
Member


> provide the best resource utilization.

This is misleading. Resource utilization is governed by many other factors unrelated to the order in which stages are scheduled.

What this strategy avoids is scheduling stages that will immediately block waiting for another stage to start producing data. In a way, if I'm understanding the code correctly, it schedules stages that form full non-blocking pipelines in the order in which they are able to make progress.

Member Author


Yup. That's how it works. Do you want a suggestion for a better name? I would use phased execution policy, but it's already taken

Member


Hmm.. maybe we could just take the name "phased execution" and call the previous one "legacy phased execution". If this is better in every way, we'll eventually get rid of the old one, anyway.

Incidentally, back when we added phased execution, the original intent was to eventually take advantage of knowledge of which operators were "blocking" vs which ones could make progress in a pipelined manner for the purpose of scheduling stages. We could think of this PR as finally realizing that, but implementing it in a way that provides an escape hatch for the old behavior in case of bugs.

Member Author


renamed in #10455


Development

Successfully merging this pull request may close these issues.

Implement better scheduler for Trino

7 participants