Support dynamic filtering with task retries #12152

Merged
raunaqmorarka merged 6 commits into trinodb:master from raunaqmorarka:df-batch
Jul 27, 2022

Conversation

@raunaqmorarka
Member

@raunaqmorarka raunaqmorarka commented Apr 27, 2022

Description

High level changes:
Add a new plan node DynamicFilterSourceNode for DF collection in build source stage.
Add a new optimizer rule AddDynamicFilterSource near the end of PlanOptimizers. It matches a join with DFs whose right child is a remote exchange and adds the new plan node below the exchange.
Changes to LocalExecutionPlanner to add DynamicFilterSourceOperator based on DynamicFilterSourceNode.
Change to DynamicFilterSourceOperator to short-circuit dynamic filter collection in source stage for subsequent splits if the initial splits already exceeded collection thresholds.
Changes to dynamic filter collection on the worker node to keep the "final" dynamic filter available for collection after the task completes successfully.
Changes to DynamicFiltersCollector to track whether the currently collected version of the dynamic filter is the "final" one. This allows HttpRemoteTask on the coordinator to determine whether DynamicFiltersFetcher needs to fetch the collected DF from the worker after task completion.
Changes to DynamicFilterService to make it aware of task retry mode.
All the changes implement DF collection for task level retry mode while keeping existing pipelined mode execution as-is.
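
The worker-side behaviour listed above (short-circuiting once thresholds are exceeded, and marking the collected filter as "final" after the task completes) can be illustrated with a minimal Java sketch. This is not the code from this PR: the class `VersionedFilterCollector`, all of its methods, the `Set<Long>` standing in for a real filter domain, and the single distinct-value threshold are hypothetical and chosen only to show the idea.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for a worker-side collector; NOT Trino's DynamicFiltersCollector.
final class VersionedFilterCollector
{
    private final int maxDistinctValues;      // illustrative collection threshold (assumption)
    private final Set<Long> values = new HashSet<>();
    private boolean overflowed;               // true once the threshold has been exceeded
    private long version;                     // bumped whenever the visible state changes
    private boolean finalVersion;             // true once the task has completed successfully

    VersionedFilterCollector(int maxDistinctValues)
    {
        this.maxDistinctValues = maxDistinctValues;
    }

    // Called with build-side values from each processed split.
    synchronized void add(long value)
    {
        if (finalVersion) {
            throw new IllegalStateException("task already completed");
        }
        if (overflowed) {
            return; // short-circuit: earlier splits already exceeded the threshold
        }
        if (values.add(value)) {
            if (values.size() > maxDistinctValues) {
                values.clear();
                overflowed = true;
            }
            version++;
        }
    }

    // Called once when the task finishes successfully; the current state becomes "final".
    synchronized void taskCompleted()
    {
        if (!finalVersion) {
            finalVersion = true;
            version++;
        }
    }

    synchronized boolean isFinal()
    {
        return finalVersion;
    }

    synchronized long getVersion()
    {
        return version;
    }

    // Empty means the threshold was exceeded and no filter can be derived.
    synchronized Optional<Set<Long>> getFilter()
    {
        return overflowed ? Optional.empty() : Optional.of(new HashSet<>(values));
    }

    // Coordinator-side check (assumed policy): fetch only a final filter
    // that is newer than the version the caller already holds.
    synchronized boolean needsFetch(long knownVersion)
    {
        return finalVersion && version > knownVersion;
    }
}
```

In this sketch a caller would poll `needsFetch` with the last version it saw and read `getFilter` only once the collector reports a final version, which mirrors the description of fetching the DF after task completion in task retry mode.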

Is this change a fix, improvement, new feature, refactoring, or other?

new feature

Is this a change to the core query engine, a connector, client library, or the SPI interfaces? (be specific)

core query engine

How would you describe this change to a non-technical end user or system administrator?

Implements support for dynamic filtering in task retry mode of execution

Related issues, pull requests, and links

fixes #9935

Documentation

(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.

Release notes

( ) No release notes entries required.
(x) Release notes entries required with the following suggested text:

# General
* Support dynamic filtering with task retries. ({issue}`9935`)

@sopel39
Member

sopel39 commented Apr 27, 2022

@arhimondr could you do first pass?

@findepi findepi removed the WIP label Apr 27, 2022
@arhimondr arhimondr requested a review from losipiuk April 27, 2022 14:36
@raunaqmorarka raunaqmorarka force-pushed the df-batch branch 3 times, most recently from a90224b to 1c8b8a8 Compare April 28, 2022 08:15
@raunaqmorarka raunaqmorarka force-pushed the df-batch branch 8 times, most recently from 48aa5bd to 16ac46c Compare April 29, 2022 07:45
Member

@losipiuk losipiuk left a comment

@raunaqmorarka It is super hard to comprehend (for me). Can you put up a short writeup explaining in simple English what is the data passing flow for DF with task retries; and how does it differ from pipelined case.

We can also chat but without whiteboard it will not be easy to discuss I think :/

@raunaqmorarka raunaqmorarka force-pushed the df-batch branch 8 times, most recently from 407e39d to e674b97 Compare May 6, 2022 09:50
@raunaqmorarka raunaqmorarka force-pushed the df-batch branch 2 times, most recently from 1471587 to b9bf25b Compare May 10, 2022 11:12
@raunaqmorarka raunaqmorarka force-pushed the df-batch branch 2 times, most recently from 6b5f803 to 6e4aac7 Compare July 16, 2022 08:57
@raunaqmorarka raunaqmorarka requested a review from arhimondr July 16, 2022 08:58
@raunaqmorarka raunaqmorarka force-pushed the df-batch branch 3 times, most recently from 728532b to fc70f11 Compare July 20, 2022 07:07
@raunaqmorarka raunaqmorarka force-pushed the df-batch branch 4 times, most recently from 87378b0 to a05618f Compare July 23, 2022 05:49
@raunaqmorarka
Member Author

TPC benchmark results
Tardigrade ORC sf1000 partitioned.pdf
Tardigrade ORC sf1000 unpartitioned.pdf

Summary:
Big improvements to partitioned TPCDS, no major change to unpartitioned TPCDS
No major change to partitioned TPCH, slightly worse on unpartitioned TPCH

Using partitioning from the tpch connector can result in repartitioned joins
on pre-partitioned columns like orders.orderkey, avoiding a remote
exchange on the build side. Skipping this optimization in tests
allows for easier testing of fault tolerant mode and doesn't affect testing
of pipelined mode of execution.

For fault tolerant execution, dynamic filter collection
from the worker will take place after completion of the task.

By default the number of splits is based on
system CPU count, which leads to variation between local
and CI setups.

Development

Successfully merging this pull request may close these issues.

Support dynamic filtering for fault tolerant execution

5 participants