
@taosaildrone

What changes were proposed in this pull request?

If SortMergeJoinScanner doesn't consume the iterator from
UnsafeExternalRowSorter entirely, the memory that
UnsafeExternalSorter acquired from TaskMemoryManager is not
released. This leads to a memory leak, spills, and OOMEs: one
page is held per partition by the unused iterator.
This patch allows the SortMergeJoinScanner to explicitly close the iterators (for the non-generated-code path).

How was this patch tested?

Manual testing and profiling with scripts in SPARK-21492 comments.

Member

@srowen srowen left a comment

I think this is a good idea in principle; I don't know this code well.

@taosaildrone
Author

Thanks for reviewing @srowen

@kiszk @tejasapatil, I would appreciate any input on how to approach the code-generation version of the fix.

import java.io.Closeable;
import java.io.IOException;

// Iterator over sorted UnsafeRows; Closeable so that callers can release the
// sorter's memory before the task finishes.
public abstract class UnsafeExternalRowIterator extends AbstractIterator<UnsafeRow> implements Closeable {
Contributor

So we need an iterator API with a close method. Shall we reuse RowIterator and add a close method to it?

Author

I think that would require introducing a dependency on the spark-sql module from catalyst,
and more refactoring of RowIterator and its subclasses.
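For concreteness, here is a rough Java sketch of the kind of contract that suggestion implies; the class and method names are hypothetical and this is not the actual RowIterator API:

```
import java.io.Closeable;

import org.apache.spark.sql.catalyst.InternalRow;

// Hypothetical sketch only: a RowIterator-style contract extended with close(),
// so a join can release the sorter's pages as soon as it stops consuming rows.
abstract class CloseableRowIterator implements Closeable {
  // Advance to the next row; returns false once the iterator is exhausted.
  abstract boolean advanceNext();

  // The current row; only valid after advanceNext() has returned true.
  abstract InternalRow getRow();

  // Release any memory (e.g. sorter pages) still held by this iterator.
  @Override
  public void close() {}
}
```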

@hvanhovell
Contributor

@taosaildrone the sorted iterators consumed by SMJ should be cleaned up when the task finishes. Do you have any idea why this is not working?

@cloud-fan
Contributor

IIUC the sorted iterator will be cleaned up eventually; this PR just cleans it up earlier. I think this is not a memory leak fix, but an optimization.

@taosaildrone
Author

taosaildrone commented Feb 13, 2019

There's a task completion listener, but 1) we could hit an OOME before the task completes, or 2) we impact performance by holding unnecessary memory and causing a bunch of unneeded spills.
The iterator holds onto memory that is no longer needed, reducing available memory to the point that the application fails when it tries to allocate memory that is actually needed to make progress. I would classify that as a memory leak.
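To make the status quo concrete, here is a minimal sketch with made-up names (not the actual Spark classes): the only thing that frees the sorter's pages is a callback that runs when the task completes, so an abandoned iterator keeps its memory until the whole task ends.

```
import java.util.ArrayList;
import java.util.List;

// Made-up names; a sketch of task-scoped cleanup, not the real Spark classes.
class TaskScopedCleanupSketch {
  private final List<Runnable> completionCallbacks = new ArrayList<>();
  private boolean pagesHeld = true;

  TaskScopedCleanupSketch() {
    // The sorter registers its cleanup once, scoped to the whole task.
    completionCallbacks.add(() -> pagesHeld = false);
  }

  // The join may stop consuming the iterator here, but nothing frees the pages yet.
  void abandonIterator() { /* pagesHeld is still true */ }

  // Only when the task itself completes does the memory come back.
  void onTaskCompletion() { completionCallbacks.forEach(Runnable::run); }

  boolean pagesHeld() { return pagesHeld; }
}
```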

Something as simple as joining a DF with 1000 rows to another DF of 2 rows (with one overlapping row_id) will cause an OOME (both locally and on a cluster) and make the job fail:

An example locally: ./bin/pyspark --master local[10] (tested on 2.4.0 and 3.0.0-master)

from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()

@hvanhovell
Contributor

@taosaildrone I have two problems with this approach:

  1. This only works for an SMJ with Sorts as its direct input. That is the common case; however, this won't work in any situation where there are operators between the SMJ and the sort (for example when the sort is used between operators).
  2. I am not sure it is safe to assume that you can close an underlying child like this. You are toast as soon as something in a downstream operator still points to a sort buffer you are releasing. This should generally not be an issue, because it would probably have surfaced as a bug already, but that is a pretty low bar to clear since it also depends on usage patterns. I would like to see a more principled approach here, with both an improved API (close on iterators?) and well-defined semantics.

We should discuss this on the dev list.

@taosaildrone
Author

@srowen @hvanhovell @cloud-fan
Thanks for taking a look. I'm not tied to this approach; in fact, I hope someone with more knowledge of the Spark SQL internals can take this up. Here's the thread on the dev list:

http://apache-spark-developers-list.1001551.n3.nabble.com/Memory-leak-in-SortMergeJoin-td27152.html

@AmplabJenkins

Can one of the admins verify this patch?

cloud-fan pushed a commit that referenced this pull request Oct 22, 2019
### What changes were proposed in this pull request?
We add a new mechanism by which downstream operators can notify their parents that they may release the output data stream. In this PR the mechanism is implemented as follows (a rough sketch of the resulting contract appears after the list):
- Add a function named `cleanupResources` to SparkPlan, which by default calls the children's `cleanupResources`. An operator that needs resource cleanup should override it with its own cleanup and also call `super.cleanupResources`, as SortExec does in this PR.
- Add support on the trigger side, in this PR SortMergeJoinExec, which calls `cleanupResources` to perform the cleanup for all of its upstream (children) operators.
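A rough Java sketch of that contract, for illustration only; the real classes are Scala and the names below are simplified placeholders:

```
import java.util.Collections;
import java.util.List;

// Simplified placeholder names; not the actual Spark classes.
abstract class PlanNodeSketch {
  abstract List<PlanNodeSketch> children();

  // Default behaviour: just forward the cleanup request to all children.
  void cleanupResources() {
    for (PlanNodeSketch child : children()) {
      child.cleanupResources();
    }
  }
}

class SortNodeSketch extends PlanNodeSketch {
  private final Runnable freeSorterPages;  // stand-in for the external sorter's cleanup

  SortNodeSketch(Runnable freeSorterPages) {
    this.freeSorterPages = freeSorterPages;
  }

  @Override
  void cleanupResources() {
    freeSorterPages.run();     // release this node's own memory eagerly...
    super.cleanupResources();  // ...then let its children clean up as well
  }

  @Override
  List<PlanNodeSketch> children() { return Collections.emptyList(); }
}
```

The trigger side (the join) would call `cleanupResources()` once it no longer needs its inputs, and the call then propagates to the Sort operators among its children.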

### Why are the changes needed?
Bugfix for the SortMergeJoin memory leak, and a general framework for SparkPlan resource cleanup.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
UT: add a new test suite, JoinWithResourceCleanSuite, checking both the standard and code-generation scenarios.

Integration test: tested with driver/executor memory at the default of 1g, in local mode with 10 threads. The test below (thanks to taosaildrone for providing this test [here](#23762 (comment))) passes with this PR.

```
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# spark.conf.set("spark.sql.sortMergeJoinExec.eagerCleanupResources", "true")

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show()
```

Closes #26164 from xuanyuanking/SPARK-21492.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
xuanyuanking added a commit to xuanyuanking/spark that referenced this pull request Oct 22, 2019
cloud-fan pushed a commit that referenced this pull request Oct 23, 2019
Closes #26210 from xuanyuanking/SPARK-21492-backport.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
wangqia0309 pushed a commit to bigo-sg/spark that referenced this pull request Oct 30, 2019
@github-actions

github-actions bot commented Jan 1, 2020

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Jan 1, 2020
@github-actions github-actions bot closed this Jan 2, 2020
jstokes pushed a commit to amperity/spark that referenced this pull request Jul 31, 2020