Commit 450136e
[SPARK-14930][SPARK-13693] Fix race condition in CheckpointWriter.stop()
CheckpointWriter.stop() is prone to a race condition: if one thread calls `stop()` right as a checkpoint write task begins to execute, that write task may become blocked when trying to access `fs`, the shared Hadoop FileSystem, since both the `fs` getter and `stop` method synchronize on the same lock. Here's a thread-dump excerpt which illustrates the problem:
```java
"pool-31-thread-1" apache#156 prio=5 os_prio=31 tid=0x00007fea02cd2000 nid=0x5c0b waiting for monitor entry [0x000000013bc4c000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.spark.streaming.CheckpointWriter.org$apache$spark$streaming$CheckpointWriter$$fs(Checkpoint.scala:302)
- waiting to lock <0x00000007bf53ee78> (a org.apache.spark.streaming.CheckpointWriter)
at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:224)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"pool-1-thread-1-ScalaTest-running-MapWithStateSuite" #11 prio=5 os_prio=31 tid=0x00007fe9ff879800 nid=0x5703 waiting on condition [0x000000012e54c000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007bf564568> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
at org.apache.spark.streaming.CheckpointWriter.stop(Checkpoint.scala:291)
- locked <0x00000007bf53ee78> (a org.apache.spark.streaming.CheckpointWriter)
at org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:159)
- locked <0x00000007bf53ea90> (a org.apache.spark.streaming.scheduler.JobGenerator)
at org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:115)
- locked <0x00000007bf53d3f0> (a org.apache.spark.streaming.scheduler.JobScheduler)
at org.apache.spark.streaming.StreamingContext$$anonfun$stop$1.apply$mcV$sp(StreamingContext.scala:680)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1219)
at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:679)
- locked <0x00000007bf516a70> (a org.apache.spark.streaming.StreamingContext)
at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:644)
- locked <0x00000007bf516a70> (a org.apache.spark.streaming.StreamingContext)
[...]
```
We can fix this problem by having `stop` and `fs` be synchronized on different locks: the synchronization on `stop` only needs to guard against multiple threads calling `stop` at the same time, whereas the synchronization on `fs` is only necessary for cross-thread visibility. There's only ever a single active checkpoint writer thread at a time, so we don't need to guard against concurrent access to `fs`. Thus, `fs` can simply become a `volatile` var, similar to `lastCheckpointTime`.
This change should fix [SPARK-13693](https://issues.apache.org/jira/browse/SPARK-13693), a flaky `MapWithStateSuite` test suite which has recently been failing several times per day. It also results in a huge test speedup: prior to this patch, `MapWithStateSuite` took about 80 seconds to run, whereas it now runs in less than 10 seconds. For the `streaming` project's tests as a whole, they now run in ~220 seconds vs. ~354 before.
/cc zsxwing and tdas for review.
Author: Josh Rosen <[email protected]>
Closes apache#12712 from JoshRosen/fix-checkpoint-writer-race.1 parent e4d439c commit 450136e
File tree
1 file changed
+5
-12
lines changed- streaming/src/main/scala/org/apache/spark/streaming
1 file changed
+5
-12
lines changedLines changed: 5 additions & 12 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
184 | 184 | | |
185 | 185 | | |
186 | 186 | | |
187 | | - | |
188 | | - | |
| 187 | + | |
189 | 188 | | |
190 | 189 | | |
191 | 190 | | |
| |||
196 | 195 | | |
197 | 196 | | |
198 | 197 | | |
| 198 | + | |
| 199 | + | |
| 200 | + | |
199 | 201 | | |
200 | 202 | | |
201 | 203 | | |
| |||
263 | 265 | | |
264 | 266 | | |
265 | 267 | | |
266 | | - | |
| 268 | + | |
267 | 269 | | |
268 | 270 | | |
269 | 271 | | |
| |||
297 | 299 | | |
298 | 300 | | |
299 | 301 | | |
300 | | - | |
301 | | - | |
302 | | - | |
303 | | - | |
304 | | - | |
305 | | - | |
306 | | - | |
307 | | - | |
308 | | - | |
309 | 302 | | |
310 | 303 | | |
311 | 304 | | |
| |||
0 commit comments