Skip to content

Conversation

@mbautin
Copy link

@mbautin mbautin commented Feb 29, 2016

markhamstra added a commit that referenced this pull request Feb 29, 2016
Make broadcast future package-private and add a flag allowing to check if it is initialized
@markhamstra markhamstra merged commit 13c88b0 into alteryx:csd-1.6 Feb 29, 2016
markhamstra pushed a commit to markhamstra/spark that referenced this pull request Apr 27, 2016
CheckpointWriter.stop() is prone to a race condition: if one thread calls `stop()` right as a checkpoint write task begins to execute, that write task may become blocked when trying to access `fs`, the shared Hadoop FileSystem, since both the `fs` getter and `stop` method synchronize on the same lock. Here's a thread-dump excerpt which illustrates the problem:

```java
"pool-31-thread-1" alteryx#156 prio=5 os_prio=31 tid=0x00007fea02cd2000 nid=0x5c0b waiting for monitor entry [0x000000013bc4c000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.spark.streaming.CheckpointWriter.org$apache$spark$streaming$CheckpointWriter$$fs(Checkpoint.scala:302)
    - waiting to lock <0x00000007bf53ee78> (a org.apache.spark.streaming.CheckpointWriter)
    at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:224)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"pool-1-thread-1-ScalaTest-running-MapWithStateSuite" alteryx#11 prio=5 os_prio=31 tid=0x00007fe9ff879800 nid=0x5703 waiting on condition [0x000000012e54c000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007bf564568> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
    at org.apache.spark.streaming.CheckpointWriter.stop(Checkpoint.scala:291)
    - locked <0x00000007bf53ee78> (a org.apache.spark.streaming.CheckpointWriter)
    at org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:159)
    - locked <0x00000007bf53ea90> (a org.apache.spark.streaming.scheduler.JobGenerator)
    at org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:115)
    - locked <0x00000007bf53d3f0> (a org.apache.spark.streaming.scheduler.JobScheduler)
    at org.apache.spark.streaming.StreamingContext$$anonfun$stop$1.apply$mcV$sp(StreamingContext.scala:680)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1219)
    at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:679)
    - locked <0x00000007bf516a70> (a org.apache.spark.streaming.StreamingContext)
    at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:644)
    - locked <0x00000007bf516a70> (a org.apache.spark.streaming.StreamingContext)
[...]
```

We can fix this problem by having `stop` and `fs` be synchronized on different locks: the synchronization on `stop` only needs to guard against multiple threads calling `stop` at the same time, whereas the synchronization on `fs` is only necessary for cross-thread visibility. There's only ever a single active checkpoint writer thread at a time, so we don't need to guard against concurrent access to `fs`. Thus, `fs` can simply become a `volatile` var, similar to `lastCheckpointTime`.

This change should fix [SPARK-13693](https://issues.apache.org/jira/browse/SPARK-13693), a flaky `MapWithStateSuite` test suite which has recently been failing several times per day. It also results in a huge test speedup: prior to this patch, `MapWithStateSuite` took about 80 seconds to run, whereas it now runs in less than 10 seconds. For the `streaming` project's tests as a whole, they now run in ~220 seconds vs. ~354 before.

/cc zsxwing and tdas for review.

Author: Josh Rosen <[email protected]>

Closes apache#12712 from JoshRosen/fix-checkpoint-writer-race.
markhamstra pushed a commit to markhamstra/spark that referenced this pull request Nov 7, 2017
* Exclude SparkSubmitSuite from Travis unit test build

* Remove SortShuffleSuite

* Exclude Java tests
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants