Schedule dynamic filtering collecting task immediately #10868
sopel39 merged 2 commits into trinodb:master
dain left a comment:
The scheduler stuff and the tests look good to me. I don't understand the implications of the change to DynamicFilterService.
ac @raunaqmorarka @dain. I've narrowed down the cases in which the phased scheduler won't start the join stage immediately to only non-fixed source stages. I've also improved the tests with regard to source stage partitioning.
arhimondr left a comment:
I don't feel like I really understand the problem. Added some questions.
It would be great if you could extract the unrelated refactoring and improvements, so that one commit does only what's needed to address the problem.
Additionally, it would be great if you could elaborate on the problem in the commit message. Maybe you can provide an example of a query that triggers a deadlock, with a description of what the join distributions are, where the stage boundaries are, and how the dynamic filters interact with each other.
@arhimondr I've simplified this PR (removed the refactoring), keeping just the needed parts. I cannot answer some of the outdated comments.
I've changed the logic to create a collecting task if there is any lazy DF produced by the stage. This is needed because there might be consumers of that lazy DF outside of the stage.
There won't be an extra collecting task for partitioned stages. Hence, if there are any lazy DFs produced by such a stage, then the stage needs to be scheduled without delay.
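The rule described in this comment can be sketched roughly as below. This is only an illustration of the decision as stated here, not the code from the PR: the class name, the `Decision` enum, and the `decide` method are all hypothetical, and the real logic lives in the Trino scheduler classes touched by this change.

```java
import java.util.Set;

// Hypothetical sketch; none of these names come from Trino itself.
class LazyDynamicFilterSchedulingSketch {
    enum Decision {
        DELAY_ALLOWED,             // stage produces no lazy DFs, phased scheduling may delay it
        START_COLLECTING_TASK_NOW, // non-partitioned stage: create the DF collecting task right away
        SCHEDULE_STAGE_NOW         // partitioned stage: no extra collecting task, so start the stage itself
    }

    static Decision decide(boolean partitionedStage, Set<String> lazyDynamicFiltersProduced) {
        if (lazyDynamicFiltersProduced.isEmpty()) {
            return Decision.DELAY_ALLOWED;
        }
        // Any lazy DF may have consumers outside of the stage, so collection must not be delayed.
        return partitionedStage ? Decision.SCHEDULE_STAGE_NOW : Decision.START_COLLECTING_TASK_NOW;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, Set.of("df_1")));
        System.out.println(decide(true, Set.of("df_1")));
        System.out.println(decide(true, Set.of()));
    }
}
```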
In case of plan

        J1
       /  \
     J2    S3
    /  \
  S1    S2

it might happen that the dynamic filtering evaluation order is:
S3 => S2 => S1
With the phased scheduler, the source stage consisting of
(J1, J2, S1) won't be scheduled until the stages running S3 and S2
have finished split enumeration. However, it might happen
that S2 is waiting for dynamic filters produced for S3.
In that case, S2 will never complete because DFs for S3
are collected in stage (J1, J2, S1), which won't be scheduled
until all S2 splits are enumerated.
This commit schedules the DF collecting task immediately,
which prevents queries from deadlocking.
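The deadlock described above is a cycle in a "waits for" graph. The following is a minimal sketch of that reasoning, assuming a simplified model in which each stage is a node and an edge means "cannot make progress until". The class, the `JOIN_STAGE` label for stage (J1, J2, S1), and both methods are hypothetical and do not correspond to Trino's scheduler code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the scenario from the commit message; not Trino code.
class DynamicFilterDeadlockSketch {
    // Builds the wait-for graph: an edge a -> b means "a waits for b".
    static Map<String, Set<String>> waitGraph(boolean collectingTaskScheduledImmediately) {
        Map<String, Set<String>> graph = new HashMap<>();
        // S2 split enumeration waits for DFs for S3, which are collected in stage (J1, J2, S1).
        graph.put("S2", Set.of("JOIN_STAGE"));
        graph.put("S3", Set.of());
        // With phased scheduling, the join stage waits for S2 and S3 to finish split enumeration.
        // If DF collection starts immediately, it no longer waits for anything.
        graph.put("JOIN_STAGE", collectingTaskScheduledImmediately ? Set.of() : Set.of("S2", "S3"));
        return graph;
    }

    // Depth-first search; a node reached again while still on the current path means a cycle.
    static boolean hasCycle(Map<String, Set<String>> waitsFor) {
        Set<String> visited = new HashSet<>();
        Set<String> onPath = new HashSet<>();
        for (String node : waitsFor.keySet()) {
            if (visit(node, waitsFor, visited, onPath)) {
                return true;
            }
        }
        return false;
    }

    private static boolean visit(String node, Map<String, Set<String>> waitsFor,
            Set<String> visited, Set<String> onPath) {
        if (onPath.contains(node)) {
            return true;
        }
        if (!visited.add(node)) {
            return false; // already fully explored without finding a cycle
        }
        onPath.add(node);
        for (String next : waitsFor.getOrDefault(node, Set.of())) {
            if (visit(next, waitsFor, visited, onPath)) {
                return true;
            }
        }
        onPath.remove(node);
        return false;
    }

    public static void main(String[] args) {
        System.out.println("delayed collection deadlocks: " + hasCycle(waitGraph(false)));
        System.out.println("immediate collection deadlocks: " + hasCycle(waitGraph(true)));
    }
}
```

In this model, delaying the join stage yields the cycle S2 -> JOIN_STAGE -> S2, while starting DF collection immediately removes the edge that closes it.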
Thanks for the review.
In case of plan

        J1
       /  \
     J2    S3
    /  \
  S1    S2

it might happen that the dynamic filtering dependencies are:
S3 => S2 => S1
With the phased scheduler, the source stage consisting of
(J1, J2, S1) won't be scheduled until the stages with S3 and S2
have finished split enumeration. However, it might happen
that S2 is waiting for dynamic filters for S3. In that case,
S2 will never complete because DFs for S3 are collected in
stage (J1, J2, S1).
This commit schedules the DF collecting task immediately,
which prevents queries from deadlocking.