Conversation

@ilganeli

I've replaced the strong references to accumulators with weak references and updated any code that uses these accumulators to check whether the reference still resolves before using it. A weak reference is cleared as soon as there are no remaining strong references to the variable, whereas a soft reference would only be cleared when the GC is actually running low on memory.
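
To illustrate the approach, here's a minimal, self-contained sketch of a weak-reference registry (names are illustrative, not the actual Spark code):

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

// Illustrative sketch only: values are held via WeakReference, so an entry whose
// accumulator has no remaining strong references becomes eligible for GC.
class WeakRegistry[T <: AnyRef] {
  private val entries = mutable.Map[Long, WeakReference[T]]()

  def register(id: Long, value: T): Unit = synchronized {
    entries(id) = new WeakReference(value)
  }

  // Callers must check whether the reference still resolves before using it.
  def lookup(id: Long): Option[T] = synchronized {
    entries.get(id).flatMap(ref => Option(ref.get()))
  }
}
```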

@SparkQA

SparkQA commented Jan 13, 2015

Test build #25475 has started for PR 4021 at commit 28f705c.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 13, 2015

Test build #25475 has finished for PR 4021 at commit 28f705c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25475/
Test PASSed.

@JoshRosen
Contributor

It looks like a number of file permissions changes got mixed into this PR; mind reverting those changes?

Contributor

Guava's MapMaker supports weakValues; not sure if we want to use that here, since it's not super Scala-friendly (e.g. it returns nulls): http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/collect/MapMaker.html
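
Roughly what that would look like (just a sketch, not code from this PR):

```scala
import java.util.concurrent.ConcurrentMap
import com.google.common.collect.MapMaker

// Illustrative sketch: MapMaker.weakValues() drops an entry once its value is no
// longer strongly reachable, but get() returns null rather than an Option, hence
// "not super Scala-friendly".
object GuavaBackedRegistry {
  private val accums: ConcurrentMap[java.lang.Long, AnyRef] =
    new MapMaker().weakValues().makeMap[java.lang.Long, AnyRef]()

  def register(id: Long, acc: AnyRef): Unit = accums.put(id, acc)

  def lookup(id: Long): Option[AnyRef] = Option(accums.get(id))
}
```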

Author

Hi Josh - are you suggesting replacing this snippet with a MapMaker just to simplify the initialization code? I believe the usage of either object would be the same - do you see a specific advantage to using the MapMaker?

Contributor

Let's just leave this as-is; I don't think MapMaker will buy us much now that I think about it.

@SparkQA

SparkQA commented Jan 15, 2015

Test build #25606 has started for PR 4021 at commit 18d62ec.

  • This patch merges cleanly.

@ilganeli
Author

I've updated the code to throw an exception in the error case you mentioned and I've reverted the file permission change. Thanks!

@SparkQA

SparkQA commented Jan 15, 2015

Test build #25606 has finished for PR 4021 at commit 18d62ec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25606/
Test PASSed.

@JoshRosen
Contributor

This looks pretty nice. I wonder if there's a good way to add a unit test for this behavior (maybe in AccumulatorSuite or something) to ensure that we don't accidentally break the garbage-collection. This could be tricky, though, since we don't want to introduce a test that's flaky due to nondeterminism. Maybe a good test would be something like creating a job that uses an accumulator, storing its id, letting all references to it fall out of scope, then calling System.gc() and asserting that dereferencing the WeakReference returns null. This would help to avoid memory leaks if we accidentally write some new code in the future that stores strong references to accumulators.
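
As a rough sketch of what I mean (assumes a running `sc`; `Accumulators.get` stands in for however the weak-reference registry is exposed, and a real test would need to guard against GC nondeterminism, since System.gc() is only a hint):

```scala
test("accumulator can be garbage collected after its last reference is dropped") {
  def runJobAndGetId(): Long = {
    val acc = sc.accumulator(0L)
    sc.parallelize(1 to 100).foreach(x => acc += x)
    acc.id   // only the id escapes; the accumulator itself goes out of scope here
  }

  val accId = runJobAndGetId()
  System.gc()

  // The weak reference in the registry should no longer resolve.
  assert(Accumulators.get(accId).isEmpty)
}
```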

@ilganeli
Author

Thanks for the suggestion Josh - that sounds quite reasonable. I was having trouble coming up with an effective test for this. I'll add a test based on this idea to the accumulator suite.

@ilganeli
Author

ilganeli commented Feb 7, 2015

Hi @JoshRosen, I've added a test for this patch to the Accumulator Suite. Please let me know if you think it's sufficient. Thanks!

@SparkQA

SparkQA commented Feb 7, 2015

Test build #27005 has started for PR 4021 at commit c8e0f2b.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 7, 2015

Test build #27005 has finished for PR 4021 at commit c8e0f2b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27005/
Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27800/
Test FAILed.

@SparkQA

SparkQA commented Feb 21, 2015

Test build #27801 has started for PR 4021 at commit 8510943.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 21, 2015

Test build #27801 has finished for PR 4021 at commit 8510943.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27801/
Test FAILed.

@SparkQA

SparkQA commented Feb 21, 2015

Test build #27803 has started for PR 4021 at commit 4ba9575.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 21, 2015

Test build #27803 has finished for PR 4021 at commit 4ba9575.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27803/
Test PASSed.

@JoshRosen
Contributor

I just realized that ContextCleaner won't manage cleanup of accumulators in the thread-local accumulator registries in executors, but I don't think this is a big deal: we're more concerned about the driver OOMing in a long-running app than the executors, and it would be prohibitively expensive to add a bunch of driver -> executor RPCs just to reclaim one map entry.

Therefore, this looks good to me. There's one minor import-ordering nit, but I'll just fix that up myself on merge. Thanks for working on this, esp. since this code is a bit tricky to test.

@JoshRosen
Contributor

Actually, we won't leak accumulators on executors because we clear the thread-local between tasks. So this is great!

@asfgit closed this in 95cd643 on Feb 23, 2015
@JoshRosen
Contributor

I've merged this into master (1.4.0). Thanks!

@ilganeli, it looks like your text editor might be leaving trailing whitespace on lines, which makes the diffs look kind of messy:

[screenshot: diff view with trailing whitespace highlighted]

I fixed this myself when committing, so it's not a huge deal, but you might consider changing your editor settings to prevent this in the future (I have a special vim setting that complains about trailing whitespace, and I think there's a way to configure IntelliJ to do the same).

@ilganeli
Author

Hi Josh - thanks for the review and suggestions. With regards to the thread-local cleanup, I didn't add anything since, as you pointed out, it already gets cleaned up. Cheers!

Contributor

If the Accumulator is garbage collected, should we log and continue?

Contributor

The exception thrown here is caught at higher levels of the stack. For example, DAGScheduler wraps calls to accumulator methods in a try block and logs any uncaught exceptions. Have you run into a case where the current behavior causes a problem?

@tedyu
Contributor

tedyu commented Feb 23, 2015

Not yet.

@JoshRosen
Contributor

I think this patch may have introduced a bug that can cause accumulator updates to be lost:
https://issues.apache.org/jira/browse/SPARK-6075

I'm still trying to see if I can spot the problem, but my hunch is that maybe the localAccums thread-local maps should not hold weak references. When deserializing an accumulator in an executor and registering it with localAccums, is there ever a moment in which the accumulator has no strong references pointing to it? Does some other object hold a strong reference to the accumulator while it's being deserialized? If not, this could lead to it being dropped from the localAccums map, causing that task's accumulator updates to be lost.

@JoshRosen
Contributor

Another thought: if register() is somehow called twice for the same accumulator, then it looks like we'll silently overwrite the existing value in localAccums. We should probably throw an exception instead, since that scenario could lead to lost updates.
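
Roughly, something like this (illustrative sketch, not the actual Accumulators code):

```scala
import scala.collection.mutable

// Refuse to silently overwrite an accumulator that is already registered,
// so a double registration fails loudly instead of losing updates.
object LocalAccumRegistry {
  private val localAccums = mutable.Map[Long, AnyRef]()

  def register(id: Long, acc: AnyRef): Unit = synchronized {
    if (localAccums.contains(id)) {
      throw new IllegalStateException(s"Accumulator $id is already registered for this task")
    }
    localAccums(id) = acc
  }
}
```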

@JoshRosen
Contributor

I think I've figured it out: consider the lifecycle of an accumulator in a task, say a ShuffleMapTask: on the executor, each task deserializes its own copy of the RDD inside of its runTask method, so the strong reference to the RDD disappears at the end of runTask. In Executor.run(), we call Accumulators.values after runTask has exited, so there's a small window in which the task's RDD can be GC'd, and the accumulators along with it, because there are no longer any strong references to them.

The fix is to keep strong references in localAccums, since we clear this at the end of each task anyways. I'm glad that I was able to figure out precisely why this was necessary and sorry that I missed this during review; I'll submit a fix shortly. In terms of preventative measures, it might be a good idea to write up the lifetime / lifecycle of objects' strong references whenever we're using WeakReferences, since the process of explicitly writing that out would prevent these sorts of mistakes in the future.
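
To spell out the window and the fix as a sketch (structure and names are illustrative, not the actual Executor code):

```scala
import scala.collection.mutable

// Timeline of the race (as comments, since the real code lives in Executor/Task):
//   1. task.runTask(context) deserializes the RDD (and the accumulators it
//      references) as locals inside runTask.
//   2. runTask returns; nothing strongly references that RDD any more.
//   3. Executor.run() reads the per-task accumulator map; if that map only held
//      WeakReferences, a GC between steps 2 and 3 could have cleared them.
//
// The fix: hold strong references in the per-task map, which is safe because
// the map is explicitly cleared when the task finishes.
object LocalAccums {
  private val perTask = new ThreadLocal[mutable.Map[Long, AnyRef]] {
    override def initialValue(): mutable.Map[Long, AnyRef] = mutable.Map.empty
  }

  def register(id: Long, acc: AnyRef): Unit = perTask.get().update(id, acc)

  def values: Map[Long, AnyRef] = perTask.get().toMap

  def clear(): Unit = perTask.get().clear()  // called at the end of every task
}
```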

@JoshRosen
Contributor

Actually, we won't leak accumulators on executors because we clear the thread-local between tasks.

At this point, I should have spotted that it doesn't make sense to store WeakReferences if those references are going to be destroyed at the end of tasks anyways.

Contributor

This was dumb of me to overlook, too: if the data structure is invalid, this would just silently ignore it; even if this is a rare error condition, there should at least have been a warning here.

@JoshRosen
Contributor

I have a fix at #4835.

@ilganeli
Author

ilganeli commented Mar 1, 2015

Thanks for the detailed write up and the fix.

asfgit pushed a commit that referenced this pull request Mar 1, 2015
…store WeakReferences in localAccums map

This fixes a non-deterministic bug introduced in #4021 that could cause tasks' accumulator updates to be lost.  The problem is that `localAccums` should not hold weak references: after the task finishes running there won't be any strong references to these local accumulators, so they can get garbage-collected before the executor reads the `localAccums` map.  We don't need weak references here anyways, since this map is cleared at the end of each task.

Author: Josh Rosen <[email protected]>

Closes #4835 from JoshRosen/SPARK-6075 and squashes the following commits:

4f4b5b2 [Josh Rosen] Remove defensive assertions that caused test failures in code unrelated to this change
120c7b0 [Josh Rosen] [SPARK-6075] Do not store WeakReferences in localAccums map