Fix deadlock in join spilling#16293

Merged
arhimondr merged 3 commits into prestodb:master from arhimondr:fix-join-spill-with-limit
Jun 24, 2021

Conversation

@arhimondr
Member

@arhimondr arhimondr commented Jun 18, 2021

The problem occurs when a spillable join is followed by a limit
operator.

This is a very tricky bug to reproduce: it requires a task concurrency
of at least 2. The issue occurs when the first probe operator
(LookupJoinOperator) produces enough rows to fill the limit operator,
triggering early termination, while only a single build side partition
has been unspilled. During early termination the operator doesn't
release the currently open partition, and as long as the first
partition is not released the second one cannot be opened. Meanwhile,
the second instance of the LookupJoinOperator must not produce enough
rows for early termination from joining with the first unspilled
partition, so it requests the second partition to unspill. But since
the first operator never released the first partition, the second
partition can never be unspilled, and thus the second instance of the
LookupJoinOperator cannot produce any additional output.

The issue is also specific to Presto on Spark; it doesn't reproduce in
classic Presto, where all stages run in parallel. When any partial
limit operator produces enough rows for the downstream final limit to
finish, the signal is propagated from the downstream task to the
upstream task, and the upstream task is canceled, terminating all of
its operators, including the stuck LookupJoinOperators. In Presto on
Spark, all upstream stages have to finish before a stage can start
running.

The fix is to always close open partitions in close() to make sure
partitions are released even when the LookupJoinOperator terminates
early due to limit operator back pressure.
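A minimal sketch of the failure mode and of the fix. All class and method names here are illustrative stand-ins, not the actual Presto code: partition consumption is modeled as a single slot, so the next partition cannot be opened until the current one is released, and the fix is that close() always releases whatever partition the operator still holds.

```java
import java.util.OptionalInt;

// Toy model of sequential partition consumption: only one partition may
// be open at a time, and the next one cannot be unspilled until the
// current one is released.
class PartitionedConsumption {
    private boolean open;
    private int nextPartition;
    private final int partitionCount;

    PartitionedConsumption(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    OptionalInt tryOpenNext() {
        if (open || nextPartition >= partitionCount) {
            return OptionalInt.empty(); // blocked until release() is called
        }
        open = true;
        return OptionalInt.of(nextPartition++);
    }

    void release() {
        open = false;
    }
}

// Stand-in for a probe operator holding an unspilled build-side partition.
class JoinProbe implements AutoCloseable {
    private final PartitionedConsumption consumption;
    private boolean holdingPartition;

    JoinProbe(PartitionedConsumption consumption) {
        this.consumption = consumption;
    }

    boolean tryAcquirePartition() {
        if (!holdingPartition && consumption.tryOpenNext().isPresent()) {
            holdingPartition = true;
        }
        return holdingPartition;
    }

    // The fix: always release the open partition on close, even when the
    // operator was terminated early by limit back pressure.
    @Override
    public void close() {
        if (holdingPartition) {
            consumption.release();
            holdingPartition = false;
        }
    }
}

public class DeadlockSketch {
    public static boolean[] run() {
        PartitionedConsumption consumption = new PartitionedConsumption(2);
        JoinProbe first = new JoinProbe(consumption);
        JoinProbe second = new JoinProbe(consumption);

        first.tryAcquirePartition();                      // opens partition 0
        boolean blocked = !second.tryAcquirePartition();  // partition 1 cannot open yet
        first.close();                                    // early termination releases partition 0
        boolean unblocked = second.tryAcquirePartition(); // partition 1 can now be unspilled
        return new boolean[] {blocked, unblocked};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("blocked=" + result[0] + " unblocked=" + result[1]);
    }
}
```

Without the release in close(), the second probe would spin forever in the blocked state, which is the deadlock described above.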

Test plan:

  • Additional unit tests

== RELEASE NOTES ==

General Changes
* Fix a deadlock that caused join with limit queries to get stuck when spilling is enabled

@arhimondr arhimondr force-pushed the fix-join-spill-with-limit branch 2 times, most recently from 02df9f8 to aa7b2ee on June 18, 2021 09:09
@rschlussel
Contributor

Haven't reviewed yet. From the description it sounds like it fixes a real issue, but not the only issue, since the test still hangs:

2021-06-18T05:38:07.479-0500 WARN TestHangMonitor com.facebook.presto.testng.services.LogTestDurationListener No test started or completed in 8.00m. Running tests:
com.facebook.presto.spark.TestPrestoSparkSpilledJoinQueries::testLimitWithJoin running for 8.06m
com.facebook.presto.spark.TestPrestoSparkJoinQueries running for 55.85m
com.facebook.presto.spark.TestPrestoSparkSpilledJoinQueries running for 55.84m
com.facebook.presto.spark.TestPrestoSparkJoinQueries::testLimitWithJoin running for 8.09m.

@arhimondr
Member Author

arhimondr commented Jun 18, 2021

@rschlussel Yeah, looks like a Klondike =D I think I know what the second issue is. I just thought it should never happen in practice. Let me fix it and resubmit.

@arhimondr arhimondr force-pushed the fix-join-spill-with-limit branch 2 times, most recently from b30a50b to 4987e55 on June 18, 2021 14:57
@viczhang861
Contributor

task concurrency should be at least 2 -- this is because join spilling is only enabled for concurrency >=2

I did a quick read.

  • The work you put here will conflict with Improve memory for HashBuilderOperator unspill #16212
  • You were continuing the work based on our offline discussion and my initial investigation of the issue related to PartitionConsumption; it would be nice if you could sync up with me and keep me informed that you were planning to take over this task.

@arhimondr
Member Author

arhimondr commented Jun 18, 2021

@v-jizhang

#16293 (comment)

The work you put here will introduce conflict with #16212

You should feel free to land your patch first. I will rebase, no worries.

You were continuing the work based on our offline discussion and my initial investigation about issue related to PartitionConsumption, it would be nice if you could sync up with me and keep me informed that you were planning to take over this task.

While you correctly pointed out that the issue is likely related to PartitionConsumption, it wasn't clear to me how the issue is triggered, why it is triggered only for Presto on Spark, and what makes Presto on Spark special.

That was a little concerning. The biggest concern, though, was the non-spilling test failing. At the time it wasn't clear to me that the non-spilling test failure was just a fluke of two join limit tests executing in parallel at the same time; that seemed rather unlikely to me (though it ended up being true).

Thus I felt obliged to get to the bottom of it to avoid a potential regression in production. I had to redo the entire investigation end to end, as at the beginning it wasn't obvious why the problem in PartitionConsumption is only triggered on Presto on Spark and not on classic Presto.

@arhimondr
Member Author

I had to split the tests into 3 separate jobs and readjust the thread pools and the concurrency settings. Now the tests pass.

@rschlussel
Contributor

I had to split the tests into 3 separate jobs and readjust the thread pools and the concurrency settings. Now the tests pass.

why did this fix the timeouts? was it taking too long but doing real work and this fixed the fundamental issue, or did it just make some still existing deadlock issue less likely happen?

@arhimondr
Member Author

arhimondr commented Jun 21, 2021

@rschlussel

why did this fix the timeouts? was it taking too long but doing real work and this fixed the fundamental issue, or did it just make some still existing deadlock issue less likely happen?

The deadlock is fixed by the first commit, "Fix another deadlock in join spilling". After the fix the tests weren't getting stuck, but were simply taking too long to finish.

I tried rerunning the tests locally several times, and locally they seem to finish much faster (timing out at 1h+ on CI vs. <30m on my desktop with NVMe flash).

My guess is that it might be related to lower IOPS on the CI (maybe a spinning disk?). Decreasing task concurrency decreases IO pressure, as with lower task concurrency there are fewer tiny partitions to spill.

I haven't been able to confirm whether increasing the number of spiller threads actually improves throughput, but I thought it could be a good idea in general to have enough threads for each partition to spill in parallel.

However, even after the thread and concurrency adjustments the overall runtime was still over 30 minutes, so I decided to also split the tests into 3 separate jobs to improve the build time.
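For reference, the kinds of knobs involved are sketched below. The property names come from Presto's configuration, but the values are purely illustrative of the direction of the adjustments, not the ones actually used in the CI jobs:

```properties
# config.properties (illustrative values, not the actual CI settings)
# Lower concurrency => fewer, coarser spill partitions => less IO pressure
task.concurrency=2
# Enough spiller threads for all partitions to spill in parallel
experimental.spiller-threads=8
```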

Contributor

@rschlussel rschlussel left a comment


Generally looks good, but I'm confused why the second lookup join operator doesn't also get an early termination signal if the limit was reached by the first one? Shouldn't all operators below the limit get an early termination signal?


I would add a comment here that this code is needed for the case of early termination where the operator is closed before being finished.

@arhimondr
Member Author

arhimondr commented Jun 23, 2021

@rschlussel

Generally looks good, but I'm confused why the second lookup join operator doesn't also get an early termination signal if the limit was reached by the first one? Shouldn't all operators below the limit get an early termination signal?

That's actually something that also bothered me. I had to take a closer look.

So it looks like it actually depends on the plan and on the execution mode. Assuming the task concurrency is 2 and the plan allows pushing down the limit, there will be 2 drivers, each running a LookupJoinOperator instance and a partial LimitOperator instance:

Driver 1:

[... input operator ...] -> [LookupJoinOperator] -> [LimitOperator (partial)] -> [... output operator ...]

Driver 2:

[... input operator ...] -> [LookupJoinOperator] -> [LimitOperator (partial)] -> [... output operator ...]

Now, for the finish to get propagated to the LookupJoinOperator, one of 2 things has to happen:

  1. The partial LimitOperator must fill its buffer and signal "finish" to its direct upstream operator
  2. The global limit operator must signal finish to all the upstream drivers

We have an understanding of why the first doesn't happen. But why doesn't the second happen?

The second doesn't happen due to specifics of how we plan partial limits and the execution model of Presto on Spark.

Our planner doesn't try to plan the query in a way that merges partial limit outputs locally. Basically, if N threads are running a partial limit, each thread is going to send its output to the downstream stage. This is in theory suboptimal, as ideally there should be a local (merge) exchange with a single partial limit operator on top to reduce the number of rows sent over the network to the downstream stage:

presto:di> explain (type distributed) SELECT o1.orderkey, o2.orderkey FROM tpch.tiny.orders o1 JOIN tpch.tiny.orders o2 on o1.orderkey = o2.orderkey LIMIT 10;
                                                                                                            Query Plan                                                                                                             
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]                                                                                                                                                                                                               
     Output layout: [orderkey, orderkey]                                                                                                                                                                                           
     Output partitioning: SINGLE []                                                                                                                                                                                                
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                 
     - Output[orderkey, orderkey] => [orderkey:bigint, orderkey:bigint]                                                                                                                                                            
             Estimates: {rows: 10 (180B), cpu: 2025180.00, memory: 270000.00, network: 90.00}                                                                                                                                      
         - Limit[10] => [orderkey:bigint]                                                                                                                                                                                          
                 Estimates: {rows: 10 (90B), cpu: 2025180.00, memory: 270000.00, network: 90.00}                                                                                                                                   
             - LocalExchange[SINGLE] () => [orderkey:bigint]                                                                                                                                                                       
                     Estimates: {rows: 10 (90B), cpu: 2025090.00, memory: 270000.00, network: 90.00}                                                                                                                               
                 - RemoteSource[1] => [orderkey:bigint]                                                                                                                                                                            
                                                                                                                                                                                                                                   
 Fragment 1 [tpch:orders:15000]                                                                                                                                                                                                    
     Output layout: [orderkey]                                                                                                                                                                                                     
     Output partitioning: SINGLE []                                                                                                                                                                                                
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                 
     - LimitPartial[10] => [orderkey:bigint]                                                                                                                                                                                       
             Estimates: {rows: 10 (90B), cpu: 2025090.00, memory: 270000.00, network: 0.00}                                                                                                                                        
         - InnerJoin[("orderkey" = "orderkey_0")][$hashvalue, $hashvalue_20] => [orderkey:bigint]                                                                                                                                  
                 Estimates: {rows: 15000 (131.84kB), cpu: 2025000.00, memory: 270000.00, network: 0.00}                                                                                                                            
                 Distribution: PARTITIONED                                                                                                                                                                                         
             - LocalExchange[ROUND_ROBIN] () => [orderkey:bigint, $hashvalue:bigint]                                                                                                                                               
                     Estimates: {rows: 15000 (263.67kB), cpu: 675000.00, memory: 0.00, network: 0.00}                                                                                                                              
                 - ScanProject[table = TableHandle {connectorId='tpch', connectorHandle='orders:sf0.01', layout='Optional[orders:sf0.01]'}, grouped = false, projectLocality = LOCAL] => [orderkey:bigint, $hashvalue_19:bigint]   
                         Estimates: {rows: 15000 (263.67kB), cpu: 135000.00, memory: 0.00, network: 0.00}/{rows: 15000 (263.67kB), cpu: 405000.00, memory: 0.00, network: 0.00}                                                    
                         $hashvalue_19 := combine_hash(BIGINT 0, COALESCE($operator$hash_code(orderkey), BIGINT 0))                                                                                                                
                         orderkey := tpch:orderkey                                                                                                                                                                                 
                         tpch:orderstatus                                                                                                                                                                                          
                             :: [[F], [O], [P]]                                                                                                                                                                                    
             - LocalExchange[HASH][$hashvalue_20] (orderkey_0) => [orderkey_0:bigint, $hashvalue_20:bigint]                                                                                                                        
                     Estimates: {rows: 15000 (263.67kB), cpu: 675000.00, memory: 0.00, network: 0.00}                                                                                                                              
                 - ScanProject[table = TableHandle {connectorId='tpch', connectorHandle='orders:sf0.01', layout='Optional[orders:sf0.01]'}, grouped = false, projectLocality = LOCAL] => [orderkey_0:bigint, $hashvalue_21:bigint] 
                         Estimates: {rows: 15000 (263.67kB), cpu: 135000.00, memory: 0.00, network: 0.00}/{rows: 15000 (263.67kB), cpu: 405000.00, memory: 0.00, network: 0.00}                                                    
                         $hashvalue_21 := combine_hash(BIGINT 0, COALESCE($operator$hash_code(orderkey_0), BIGINT 0))                                                                                                              
                         orderkey_0 := tpch:orderkey                                                                                                                                                                               
                         tpch:orderstatus                                                                                                                                                                                          
                             :: [[F], [O], [P]]                                                                                                                                                                                    
                                                                                                                                                                                                                                   
                                                                                                                                                                                                                                   
(1 row)

Query 20210623_165957_22989_ajpzd, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]

Now, in classic Presto this is not a problem, since all stages run in parallel, so the downstream stage can signal back immediately once the limit is reached.

However, in Presto on Spark this is not the case, as an upstream stage has to write its full results before the downstream stage can start execution.
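The per-driver behavior above can be sketched with a toy model (illustrative code, not the actual LimitOperator): each driver owns an independent partial limit, so one driver filling its quota finishes only its own pipeline, never its siblings.

```java
// Each driver has its own partial limit; filling it finishes only that
// driver's pipeline, not the other drivers in the same task.
class PartialLimit {
    private final long limit;
    private long produced;

    PartialLimit(long limit) {
        this.limit = limit;
    }

    // Accept up to the remaining quota and report how many rows passed.
    long add(long rows) {
        long accepted = Math.min(rows, limit - produced);
        produced += accepted;
        return accepted;
    }

    boolean finished() {
        return produced >= limit;
    }
}

public class PartialLimitSketch {
    public static void main(String[] args) {
        PartialLimit driver1 = new PartialLimit(10);
        PartialLimit driver2 = new PartialLimit(10);

        driver1.add(10); // driver 1 fills its partial limit; its pipeline finishes
        driver2.add(3);  // driver 2 is unaware and keeps probing the join

        // In classic Presto the downstream final limit would cancel the whole
        // upstream task once it has 10 rows; in Presto on Spark the downstream
        // stage has not even started, so driver 2 keeps waiting on the next
        // build-side partition.
        System.out.println(driver1.finished()); // true
        System.out.println(driver2.finished()); // false
    }
}
```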

So it looks like the assumption I made in the PR description is actually incorrect:

Most likely it is due to a different page size and different partition distribution for Presto on Spark and classic Presto.

Let me know if it makes sense. If you think it does I will update the PR description with the latest findings.

@rschlussel
Contributor

that makes a lot of sense. Can you update the commit description also?

Contributor

@rschlussel rschlussel left a comment


Looks good pending update to pr/commit description and comment

@arhimondr arhimondr force-pushed the fix-join-spill-with-limit branch from 072851e to 93fdd78 on June 23, 2021 20:43
@arhimondr
Member Author

@rschlussel updated

Contributor

@viczhang861 viczhang861 left a comment


"Fix another deadlock in join spilling"

  • "another" is not necessary, as people may wonder what the first one was.
  • It feels like it is not a deadlock, but a future that is waiting for something that never happens; still looking.

@arhimondr
Copy link
Member Author

@viczhang861

Another deadlock was fixed by @rschlussel in #15975. This is the second one =D But yeah, I agree, the word "another" makes the description confusing. Let me reword it.

@arhimondr arhimondr force-pushed the fix-join-spill-with-limit branch from 93fdd78 to 494ed24 on June 23, 2021 22:27
The tests finish in less than 20m locally on a machine with NVMe flash.
It looks like on the CI the bottleneck is IOPS.

Decreasing task concurrency should result in less granular spills,
decreasing overall IO pressure.

Increasing the number of spiller threads is needed to avoid thread
starvation when spilling is slow.
@arhimondr arhimondr force-pushed the fix-join-spill-with-limit branch from 494ed24 to 3369d43 on June 24, 2021 15:10
@arhimondr arhimondr changed the title from "Fix another deadlock in join spilling" to "Fix deadlock in join spilling" on Jun 24, 2021
@arhimondr arhimondr merged commit 87d759c into prestodb:master Jun 24, 2021
@arhimondr arhimondr deleted the fix-join-spill-with-limit branch June 24, 2021 16:13
@ajaygeorge ajaygeorge mentioned this pull request Jul 7, 2021