Conversation

@hensg
Contributor

@hensg hensg commented Jan 2, 2020

What changes were proposed in this pull request?

Ensure that all StreamStates are removed from the OneForOneStreamManager memory map even if an error occurs while trying to release their buffers.

Why are the changes needed?

OneForOneStreamManager may not remove all StreamStates from its memory map when a connection is terminated. A RuntimeException might be thrown in StreamState$buffers.next() by one of the ExternalShuffleBlockResolver$getBlockData... calls, breaking the loop through streams.entrySet() and keeping the remaining StreamStates in memory forever, leaking memory.
That may happen when an application is terminated abruptly and its executors are removed before the connection is terminated, or if shuffleIndexCache fails to get the ShuffleIndexInformation.

References:
https://github.com/apache/spark/blob/ee050ddbc6eb6bc08c7751a0eb00e7a05b011b52/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java#L319

https://github.com/apache/spark/blob/ee050ddbc6eb6bc08c7751a0eb00e7a05b011b52/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java#L357

https://github.com/apache/spark/blob/ee050ddbc6eb6bc08c7751a0eb00e7a05b011b52/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java#L195

https://github.com/apache/spark/blob/ee050ddbc6eb6bc08c7751a0eb00e7a05b011b52/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java#L208

https://github.com/apache/spark/blob/ee050ddbc6eb6bc08c7751a0eb00e7a05b011b52/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java#L330

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit test added

OneForOneStreamManager may not remove all StreamStates from its memory map when a connection is terminated.

A RuntimeException might be thrown in StreamState$buffers.next() by one of ExternalShuffleBlockResolver$getBlockData/getRddBlock..., breaking the loop through streams.entrySet().

This commit first removes all StreamStates from the memory map and, after that, releases all buffers from each removed StreamState.
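
In code terms, that ordering could look roughly like the sketch below inside OneForOneStreamManager.connectionTerminated. This is an illustration rather than the actual diff: the streams map, StreamState, ManagedBuffer, logger, and associatedChannel names come from the existing class, while the two-pass structure and the statesToRelease local are just one way to express "remove first, release afterwards" (java.util.List/ArrayList imports assumed).

  // Sketch only; not the exact patch in this PR.
  @Override
  public void connectionTerminated(Channel channel) {
    // First pass: take every StreamState associated with this channel out of the map,
    // so a later failure while releasing buffers can no longer leave entries behind.
    List<StreamState> statesToRelease = new ArrayList<>();
    for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
      if (entry.getValue().associatedChannel == channel) {
        streams.remove(entry.getKey());
        statesToRelease.add(entry.getValue());
      }
    }
    // Second pass: release the remaining buffers of each removed StreamState.
    for (StreamState state : statesToRelease) {
      while (state.buffers.hasNext()) {
        ManagedBuffer buffer = state.buffers.next();
        if (buffer != null) {
          buffer.release();
        }
      }
    }
  }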
@hensg hensg changed the title [SPARK-30246]OneForOneStreamManager might leak memory in connectionTerminated [SPARK-30246]OneForOneStreamManager may leak memory in connectionTerminated Jan 2, 2020
@hensg hensg changed the title [SPARK-30246]OneForOneStreamManager may leak memory in connectionTerminated [SPARK-30246]OneForOneStreamManager might leak memory in connectionTerminated Jan 2, 2020
@HeartSaVioR
Contributor

cc. @viirya

    // Release all remaining buffers.
    try {
      while (state.buffers.hasNext()) {
        ManagedBuffer buffer = state.buffers.next();
Member

Once we get a RuntimeException, don't we just fail? Is there a memory leak?

Contributor

@Override
public void channelInactive() {
  if (streamManager != null) {
    try {
      streamManager.connectionTerminated(channel);
    } catch (RuntimeException e) {
      logger.error("StreamManager connectionTerminated() callback failed.", e);
    }
  }
  rpcHandler.channelInactive(reverseClient);
}

TransportRequestHandler.channelInactive will log an error message and swallow the exception, so no, we don't fail because of an exception thrown here.

Member

Oh, I see. Then this change may also swallow the RuntimeException, so channelInactive cannot log it as before? Should we rethrow the same exception?

Contributor

So I think there are two options here to achieve the goal:

  1. Store one of the exceptions (as we'll try to release buffers from all the streams) and rethrow it, so channelInactive will log it. The other exceptions should be logged as well, but maybe here (or build a new exception containing all exceptions and throw that instead).

  2. Catch them, and log and swallow them here.

Technically the only change on the logging side would be the logger name, which may not be a big deal, but it's also a valid concern if we think channelInactive should be the one to decide how to handle the exception.

Member

Yeah, I think it might make the caller (channelInactive or others, if any) think everything is fine inside connectionTerminated. Maybe not a big deal for now, but it sounds like a potential concern.

We might have multiple exceptions while releasing buffers. We can rethrow a RuntimeException if any exception happens during buffer release.
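
A minimal sketch of that idea, assuming the loop shape shown in the diff above (the firstException local is an illustrative name, not from the actual patch): remember the first RuntimeException, log any later ones, and rethrow after the loop so channelInactive can still log the failure as before.

  RuntimeException firstException = null;  // illustrative name, not from the patch
  for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
    StreamState state = entry.getValue();
    if (state.associatedChannel == channel) {
      // Remove the entry first so the StreamState never stays in the map.
      streams.remove(entry.getKey());
      try {
        // Release all remaining buffers.
        while (state.buffers.hasNext()) {
          ManagedBuffer buffer = state.buffers.next();
          if (buffer != null) {
            buffer.release();
          }
        }
      } catch (RuntimeException e) {
        if (firstException == null) {
          firstException = e;  // keep the first failure to rethrow
        } else {
          logger.error("Error releasing buffers for stream " + entry.getKey(), e);
        }
      }
    }
  }
  if (firstException != null) {
    throw firstException;  // the caller (channelInactive) logs it, as it did before
  }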

Contributor

OK, thanks for the input. @hensg could you follow up on the comment above? Thanks!

@viirya
Member

viirya commented Jan 2, 2020

ok to test.

@SparkQA

SparkQA commented Jan 3, 2020

Test build #116058 has finished for PR 27064 at commit a9ebe4f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@012huang 012huang left a comment

There is a case where the application has already finished (failed) and the app's executor info has been cleaned up, but connectionTerminated is called with a delay. When this code executes, it looks up the buffer by appId and execId, so the buffer cannot be found and released. This happened in Spark 2.4.3 as I reported, and I also made a PR; please help review (#27060), thanks.
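
For reference, the lookup that fails in that scenario is the executor-registration check at the start of ExternalShuffleBlockResolver.getBlockData (the #L195 reference above). Roughly, from memory, with details possibly differing slightly between versions:

  ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
  if (executor == null) {
    // Thrown while StreamState$buffers.next() resolves the block, which is what
    // breaks the connectionTerminated loop described in this PR.
    throw new RuntimeException(
      String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
  }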

@HeartSaVioR
Contributor

@012huang I feel it's orthogonal to this. Could you please cc some committers who have modified the code around the bug addressed in your PR?

Contributor

@HeartSaVioR HeartSaVioR left a comment

LGTM. cc. @viirya Could you please take a look again? Thanks in advance.

@SparkQA

SparkQA commented Jan 9, 2020

Test build #116383 has finished for PR 27064 at commit 410651d.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

[ERROR] src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java:[123] (sizes) LineLength: Line is longer than 100 characters (found 106).
@SparkQA

SparkQA commented Jan 9, 2020

Test build #116384 has finished for PR 27064 at commit ac58798.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor

retest this, please

@SparkQA

SparkQA commented Jan 9, 2020

Test build #116398 has finished for PR 27064 at commit ac58798.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Jan 9, 2020

retest this please

@SparkQA

SparkQA commented Jan 9, 2020

Test build #116414 has finished for PR 27064 at commit ac58798.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Jan 9, 2020

Looks like a valid failure in test_memory_limit (pyspark.tests.test_worker.WorkerMemoryTest)? It continues to fail here.

@HeartSaVioR
Contributor

@viirya The test fails on other PRs as well - please refer to #26201 (comment)

It looks like other PRs build successfully because the PySpark test seems to run only conditionally.

@viirya
Member

viirya commented Jan 9, 2020

@HeartSaVioR hmm, I just ran the test locally and it passed.

@HeartSaVioR
Contributor

HeartSaVioR commented Jan 9, 2020

Yeah, same for me as well, but I don't have pypy. Installing now.

EDIT: no luck. It works locally for me. Maybe there's something wrong with the Jenkins worker machine?

@HeartSaVioR
Contributor

HeartSaVioR commented Jan 9, 2020

EDIT: It seems to fail intermittently, so I'm not able to reproduce the original failure.

Oh wait, I'm seeing a failure "after" installing "resource", but the test fails elsewhere, which is odd.

Running PySpark tests. Output is in /Users/jlim/WorkArea/ScalaProjects/spark/python/unit-tests.log
Will test against the following Python executables: ['/usr/local/bin/python3', 'python2.7', 'pypy']
Will test the following Python tests: ['pyspark.tests.test_worker']
Starting test(/usr/local/bin/python3): pyspark.tests.test_worker
Starting test(pypy): pyspark.tests.test_worker
Starting test(python2.7): pyspark.tests.test_worker
test_memory_limit (pyspark.tests.test_worker.WorkerMemoryTest) ... Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
/Users/jlim/WorkArea/ScalaProjects/spark/python/pyspark/context.py:219: DeprecationWarning: Support for Python 2 and Python 3 prior to version 3.6 is deprecated as of Spark 3.0. See also the plan for dropping Python 2 support at https://spark.apache.org/news/plan-for-dropping-python-2-support.html.
  DeprecationWarning)
[Stage 0:>                                                          (0 + 1) / 1]Current mem limits: 9223372036854775807 of max 9223372036854775807

Setting mem limits to 1048576 of max 1048576

/Users/jlim/WorkArea/ScalaProjects/spark/python/pyspark/tests/test_worker.py:190: RuntimeWarning: Parent module 'pyspark.tests' not found while handling absolute import
  import resource
ok
test_reuse_worker_of_parallelize_xrange (pyspark.tests.test_worker.WorkerReuseTest) ... FAIL
test_accumulator_when_reuse_worker (pyspark.tests.test_worker.WorkerTests) ... ok
test_after_exception (pyspark.tests.test_worker.WorkerTests) ... ok
test_after_jvm_exception (pyspark.tests.test_worker.WorkerTests) ... ok
test_cancel_task (pyspark.tests.test_worker.WorkerTests) ... /Users/jlim/WorkArea/ScalaProjects/spark/python/pyspark/tests/test_worker.py:45: RuntimeWarning: Parent module 'pyspark.tests' not found while handling absolute import
  import os
/Users/jlim/WorkArea/ScalaProjects/spark/python/pyspark/tests/test_worker.py:46: RuntimeWarning: Parent module 'pyspark.tests' not found while handling absolute import
  import time
ok
test_python_exception_non_hanging (pyspark.tests.test_worker.WorkerTests) ... ok
test_reuse_worker_after_take (pyspark.tests.test_worker.WorkerTests) ... ok
test_with_different_versions_of_python (pyspark.tests.test_worker.WorkerTests) ... ok

======================================================================
Finished test(/usr/local/bin/python3): pyspark.tests.test_worker (15s)
FAIL: test_reuse_worker_of_parallelize_xrange (pyspark.tests.test_worker.WorkerReuseTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/jlim/WorkArea/ScalaProjects/spark/python/pyspark/tests/test_worker.py", line 176, in test_reuse_worker_of_parallelize_xrange
    self.assertTrue(pid in previous_pids)
AssertionError: False is not true

----------------------------------------------------------------------
Ran 9 tests in 15.360s

FAILED (failures=1)

dbtsai pushed a commit to dbtsai/spark that referenced this pull request Jan 10, 2020
### What changes were proposed in this pull request?

This patch increases the memory limit in the test 'test_memory_limit' from 1m to 8m.
Credit to srowen and HyukjinKwon for providing the idea of what to suspect and guidance on how to fix it.

### Why are the changes needed?

We observed consistent Pyspark test failures on multiple PRs (apache#26955, apache#26201, apache#27064) which block the PR builds whenever the test is included.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Jenkins builds passed in WIP PR (apache#27159)

Closes apache#27162 from HeartSaVioR/SPARK-30480.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Jan 10, 2020

Test build #116453 has finished for PR 27064 at commit ac58798.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor

retest this, please

@SparkQA

SparkQA commented Jan 10, 2020

Test build #116472 has finished for PR 27064 at commit ac58798.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 12, 2020

Test build #116556 has finished for PR 27064 at commit d76483f.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 14, 2020

Test build #116663 has finished for PR 27064 at commit b02e48a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@viirya viirya left a comment

looks good. cc @cloud-fan

          buffer.release();
        }
      }
    } catch (RuntimeException e) {
Member

Is this the only exception type we need to deal with here?

Contributor

@HeartSaVioR HeartSaVioR Jan 14, 2020

The other kind of exception would be an Error, which TransportRequestHandler.channelInactive also doesn't catch, letting the process fail.

@SparkQA

SparkQA commented Jan 14, 2020

Test build #116727 has finished for PR 27064 at commit 0fde4c9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hensg hensg requested a review from cloud-fan January 14, 2020 23:32
@dongjoon-hyun dongjoon-hyun changed the title [SPARK-30246]OneForOneStreamManager might leak memory in connectionTerminated [SPARK-30246][CORE] OneForOneStreamManager might leak memory in connectionTerminated Jan 15, 2020
@dongjoon-hyun
Member

cc @vanzin, too.

@vanzin
Contributor

vanzin commented Jan 15, 2020

Merging to master / 2.4.

@vanzin vanzin closed this in d42cf45 Jan 15, 2020
vanzin pushed a commit that referenced this pull request Jan 15, 2020
…ctionTerminated

Ensure that all StreamStates are removed from OneForOneStreamManager memory map even if there's an error trying to release buffers

OneForOneStreamManager may not remove all StreamStates from its memory map when a connection is terminated. A RuntimeException might be thrown in StreamState$buffers.next() by one of the ExternalShuffleBlockResolver$getBlockData... calls, **breaking the loop through streams.entrySet() and keeping StreamStates in memory forever, leaking memory.**
That may happen when an application is terminated abruptly and its executors are removed before the connection is terminated, or if shuffleIndexCache fails to get the ShuffleIndexInformation.

References:
https://github.com/apache/spark/blob/ee050ddbc6eb6bc08c7751a0eb00e7a05b011b52/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java#L319

https://github.com/apache/spark/blob/ee050ddbc6eb6bc08c7751a0eb00e7a05b011b52/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java#L357

https://github.com/apache/spark/blob/ee050ddbc6eb6bc08c7751a0eb00e7a05b011b52/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java#L195

https://github.com/apache/spark/blob/ee050ddbc6eb6bc08c7751a0eb00e7a05b011b52/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java#L208

https://github.com/apache/spark/blob/ee050ddbc6eb6bc08c7751a0eb00e7a05b011b52/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java#L330

No

Unit test added

Closes #27064 from hensg/SPARK-30246.

Lead-authored-by: Henrique Goulart <[email protected]>
Co-authored-by: Henrique Goulart <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
(cherry picked from commit d42cf45)
Signed-off-by: Marcelo Vanzin <[email protected]>
@vanzin
Contributor

vanzin commented Jan 15, 2020

FYI: fixed a trivial conflict in 2.4, and had to add a couple of things to the test:

  • import the Assert class
  • add a couple of @SuppressWarnings otherwise compilation was failing. Not sure why master did not complain...

@dongjoon-hyun
Member

Thank you all!
