@JoshRosen
Contributor

This fixes a non-deterministic bug introduced in #4021 that could cause tasks' accumulator updates to be lost. The problem is that localAccums should not hold weak references: after the task finishes running there won't be any strong references to these local accumulators, so they can get garbage-collected before the executor reads the localAccums map. We don't need weak references here anyway, since this map is cleared at the end of each task.
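To make the failure mode described above concrete, here is a minimal Java sketch. It is not Spark's code: `LocalAccumulator`, `LocalAccumsSketch`, and every method name in it are hypothetical stand-ins, and garbage collection is simulated deterministically with `Reference.clear()` instead of relying on a real GC cycle. The sketch assumes a registry that is read once after the task body has returned.

```java
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a task-local accumulator; not Spark's actual class.
final class LocalAccumulator {
    long value;

    void add(long delta) {
        value += delta;
    }
}

final class LocalAccumsSketch {
    // Variant A: the registry holds only weak references (the problematic pattern).
    static final Map<Long, WeakReference<LocalAccumulator>> weakAccums = new HashMap<>();
    // Variant B: the registry holds strong references and is cleared when the task ends.
    static final Map<Long, LocalAccumulator> strongAccums = new HashMap<>();

    /** Runs a "task" that registers accumulator `id` in both registries and adds 5 to each. */
    static void runTask(long id) {
        LocalAccumulator a = new LocalAccumulator();
        LocalAccumulator b = new LocalAccumulator();
        weakAccums.put(id, new WeakReference<>(a));
        strongAccums.put(id, b);
        a.add(5);
        b.add(5);
        // Once this method returns, `a` is reachable only through the weak reference.
    }

    /** Stands in for a GC cycle: clearing a reference is what the collector does to it. */
    static void simulateGc() {
        for (WeakReference<LocalAccumulator> ref : weakAccums.values()) {
            ref.clear();
        }
    }

    /** Collects updates from the weak registry; cleared entries are silently skipped. */
    static Map<Long, Long> readWeak() {
        Map<Long, Long> updates = new HashMap<>();
        for (Map.Entry<Long, WeakReference<LocalAccumulator>> e : weakAccums.entrySet()) {
            LocalAccumulator acc = e.getValue().get();
            if (acc != null) {
                updates.put(e.getKey(), acc.value);
            }
        }
        return updates;
    }

    /** Collects updates from the strong registry; nothing can disappear before this read. */
    static Map<Long, Long> readStrong() {
        Map<Long, Long> updates = new HashMap<>();
        for (Map.Entry<Long, LocalAccumulator> e : strongAccums.entrySet()) {
            updates.put(e.getKey(), e.getValue().value);
        }
        return updates;
    }

    /** Clears both registries, mirroring "this map is cleared at the end of each task". */
    static void endTask() {
        weakAccums.clear();
        strongAccums.clear();
    }

    public static void main(String[] args) {
        runTask(1L);
        simulateGc();
        System.out.println("weak registry reports:   " + readWeak());
        System.out.println("strong registry reports: " + readStrong());
        endTask();
    }
}
```

In this sketch the weak registry reports no update for accumulator 1 once its reference has been cleared, while the strong registry still reports it. Because `endTask` empties the strong map anyway, holding strong references does not leak entries across tasks.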

@JoshRosen
Contributor Author

/cc @andrewor14, it turns out that the "flaky" accumulator test was actually a real bug (fixed by this patch).

@JoshRosen
Contributor Author

By the way, this only affects master, and only for the last 6 days or so.

@SparkQA

SparkQA commented Feb 28, 2015

Test build #28131 has started for PR 4835 at commit 120c7b0.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 28, 2015

Test build #28131 has finished for PR 4835 at commit 120c7b0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28131/
Test FAILed.

@JoshRosen
Contributor Author

That's a legitimate test failure caused by a new assertion that I added here (being extra defensive about avoiding duplicate accumulator registration, since this might cause silent data loss):

Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 4, localhost): java.io.IOException: java.lang.IllegalArgumentException: requirement failed: Accumulator 2 has already been registered
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1165)
  at org.apache.spark.Accumulable.readObject(Accumulators.scala:133)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
  at

@JoshRosen
Contributor Author

I still think that we should investigate / add that assertion, since correctness of accumulators relies on the assumption that a deserialized task has only one instance of an accumulator with a given id. However, it looks like this issue existed prior to the recent WeakReference change, so I'll roll back my new assertion and follow up on that issue in a separate patch.
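As a rough illustration of why a second instance with the same id matters, the following Java sketch contrasts an unchecked registration with a guarded one. It is an assumption-laden stand-in, not Spark's implementation: `AccumulatorRegistrySketch`, `Cell`, and the method names are hypothetical, and the exception message only mimics the wording seen in the failure above.

```java
import java.util.HashMap;
import java.util.Map;

final class AccumulatorRegistrySketch {
    // Hypothetical stand-in for one deserialized accumulator instance.
    static final class Cell {
        long value;
    }

    // One slot per accumulator id, as in a per-task registry keyed by id.
    private final Map<Long, Cell> localAccums = new HashMap<>();

    /** Guarded registration: a second instance with the same id is rejected loudly. */
    void register(long id, Cell cell) {
        if (localAccums.containsKey(id)) {
            throw new IllegalArgumentException(
                "requirement failed: Accumulator " + id + " has already been registered");
        }
        localAccums.put(id, cell);
    }

    /** Unguarded registration: the last instance silently replaces the earlier one. */
    void registerUnchecked(long id, Cell cell) {
        localAccums.put(id, cell);
    }

    /** The value that would be reported for `id` when the registry is read. */
    long reportedValue(long id) {
        return localAccums.get(id).value;
    }

    public static void main(String[] args) {
        AccumulatorRegistrySketch registry = new AccumulatorRegistrySketch();
        Cell first = new Cell();
        Cell second = new Cell();
        registry.registerUnchecked(2L, first);
        registry.registerUnchecked(2L, second); // replaces `first` without any error
        first.value += 3;  // this update lands on an instance the registry no longer tracks
        second.value += 4;
        System.out.println("reported for id 2: " + registry.reportedValue(2L));
    }
}
```

With the unguarded path, the update applied to the first instance never reaches the reported value, which is the kind of silent loss the assertion is meant to surface. The guarded path turns the same situation into an immediate failure.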

@SparkQA

SparkQA commented Feb 28, 2015

Test build #28133 has started for PR 4835 at commit 4f4b5b2.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 28, 2015

Test build #28133 has finished for PR 4835 at commit 4f4b5b2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28133/
Test PASSed.

@JoshRosen
Contributor Author

I'm going to commit this to master (1.4.0) in order to fix the bug and failing tests.

@asfgit closed this in 2df5f1f on Mar 1, 2015
@JoshRosen deleted the SPARK-6075 branch on March 1, 2015 23:59

@liancheng
Contributor

I think this PR also fixes SPARK-6020, since InMemoryColumnarTableScan uses an accumulator to generate debugging information dedicated to PartitionBatchPruningSuite. The test failures shown in SPARK-6020 suggest that accumulator updates may get lost nondeterministically.
