@jeanlyn jeanlyn commented Mar 29, 2016

What changes were proposed in this pull request?

We have a streaming job using FlumePollInputStream whose driver always runs out of memory (OOM) after a few days. Here is part of a driver heap dump taken before the OOM:

 num     #instances         #bytes  class name
----------------------------------------------
   1:      13845916      553836640  org.apache.spark.storage.BlockStatus
   2:      14020324      336487776  org.apache.spark.storage.StreamBlockId
   3:      13883881      333213144  scala.collection.mutable.DefaultEntry
   4:          8907       89043952  [Lscala.collection.mutable.HashEntry;
   5:         62360       65107352  [B
   6:        163368       24453904  [Ljava.lang.Object;
   7:        293651       20342664  [C
...
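
As a quick sanity check on the figures above, the following sketch parses the top three rows and derives the average size per instance and their combined footprint. The `parse_histogram` helper and the `HISTOGRAM` constant are hypothetical names introduced only for this illustration; the numbers are copied verbatim from the dump.

```python
import re

# Top three rows copied from the heap dump above.
HISTOGRAM = """\
   1:      13845916      553836640  org.apache.spark.storage.BlockStatus
   2:      14020324      336487776  org.apache.spark.storage.StreamBlockId
   3:      13883881      333213144  scala.collection.mutable.DefaultEntry
"""


def parse_histogram(text):
    """Return a list of (class_name, instances, total_bytes) tuples."""
    rows = []
    for line in text.splitlines():
        # Columns: rank, instance count, total bytes, class name.
        match = re.match(r"\s*\d+:\s+(\d+)\s+(\d+)\s+(\S+)", line)
        if match:
            instances, total_bytes, name = match.groups()
            rows.append((name, int(instances), int(total_bytes)))
    return rows


rows = parse_histogram(HISTOGRAM)
for name, instances, total_bytes in rows:
    print(f"{name}: {total_bytes // instances} bytes per instance")

top3_bytes = sum(total for _, _, total in rows)
print(f"top three classes combined: {top3_bytes} bytes")
```

The three classes work out to 40, 24, and 24 bytes per instance, and together they account for about 1.2 GB of driver heap, almost all of it per-block bookkeeping rather than data.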

The number of BlockStatus and StreamBlockId instances keeps growing until the driver eventually runs out of memory.
After investigating, I found that executorIdToStorageStatus in StorageStatusListener never seems to remove blocks from StorageStatus.
To fix the issue, I replaced onTaskEnd with onBlockUpdated, so that block information is updated promptly (adding blocks, dropping blocks from memory to disk, and deleting blocks).
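
The difference between the two approaches can be illustrated with a toy model. This is a sketch under stated assumptions, not Spark's code: `BlockUpdate`, `TaskEndOnlyTracker`, `BlockUpdateTracker`, and `simulate` are hypothetical names, and the model assumes that block deletions happen outside of any task, so a tracker fed only at task end never hears about them.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BlockUpdate:
    """Hypothetical event: the latest state of one block on one executor."""
    executor_id: str
    block_id: str
    stored: bool        # False means the block is no longer stored anywhere
    mem_size: int = 0
    disk_size: int = 0


class TaskEndOnlyTracker:
    """Learns about blocks only from task results, so it never forgets one."""

    def __init__(self):
        self.blocks = {}  # executor_id -> {block_id: (mem_size, disk_size)}

    def on_task_end(self, executor_id, updated_blocks):
        for block_id, sizes in updated_blocks:
            self.blocks.setdefault(executor_id, {})[block_id] = sizes


class BlockUpdateTracker:
    """Handles every block event: add, move from memory to disk, delete."""

    def __init__(self):
        self.blocks = {}  # executor_id -> {block_id: (mem_size, disk_size)}

    def on_block_updated(self, event):
        per_executor = self.blocks.setdefault(event.executor_id, {})
        if event.stored:
            # Covers both new blocks and blocks dropped from memory to disk.
            per_executor[event.block_id] = (event.mem_size, event.disk_size)
        else:
            per_executor.pop(event.block_id, None)


def simulate(num_blocks):
    """Feed both trackers a stream of short-lived blocks; return tracked counts."""
    leaky, fixed = TaskEndOnlyTracker(), BlockUpdateTracker()
    for i in range(num_blocks):
        block_id = f"input-0-{i}"
        leaky.on_task_end("exec-1", [(block_id, (1024, 0))])
        fixed.on_block_updated(BlockUpdate("exec-1", block_id, True, mem_size=1024))
        # Assumption: cleanup happens outside any task, so only the
        # block-update path is notified of the deletion.
        fixed.on_block_updated(BlockUpdate("exec-1", block_id, False))
    return (len(leaky.blocks.get("exec-1", {})),
            len(fixed.blocks.get("exec-1", {})))


leaked, remaining = simulate(1000)
print(f"task-end tracker holds {leaked} blocks, block-update tracker holds {remaining}")
```

In this model the task-end tracker grows by one entry per block indefinitely, while the block-update tracker returns to empty once each block is deleted, which mirrors the unbounded growth of BlockStatus and StreamBlockId described above.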

How was this patch tested?

Existing unit tests and manual tests

SparkQA commented Mar 29, 2016

Test build #54416 has finished for PR 12028 at commit f37f359.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

jeanlyn commented Mar 29, 2016

/cc @andrewor14

@andrewor14

Merged into 1.6

asfgit pushed a commit that referenced this pull request Mar 29, 2016
…askEnd avoiding driver OOM

Author: jeanlyn

Closes #12028 from jeanlyn/fixoom1.6.
@andrewor14

Can you close this PR now?

jeanlyn commented Mar 30, 2016

OK.

@jeanlyn jeanlyn closed this Mar 30, 2016