From 494696e20382c65871cbb3c54d5155263a581018 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 13 Nov 2015 09:47:14 -0800 Subject: [PATCH 1/3] Fix the race condition of two checkpoints in a batch --- python/pyspark/streaming/tests.py | 9 ++++----- .../org/apache/spark/streaming/Checkpoint.scala | 15 +++++++++++++-- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 6ee864d8d3da6..c5952eb3d6dbd 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -630,7 +630,6 @@ def tearDown(self): if self.cpd is not None: shutil.rmtree(self.cpd) - @unittest.skip("Enable it when we fix the checkpoint bug") def test_get_or_create_and_get_active_or_create(self): inputd = tempfile.mkdtemp() outputd = tempfile.mkdtemp() + "/" @@ -699,11 +698,11 @@ def check_output(n): # Verify that getOrCreate() uses existing SparkContext self.ssc.stop(True, True) time.sleep(1) - sc = SparkContext(SparkConf()) + self.sc = SparkContext(conf=SparkConf()) self.setupCalled = False self.ssc = StreamingContext.getOrCreate(self.cpd, setup) self.assertFalse(self.setupCalled) - self.assertTrue(self.ssc.sparkContext == sc) + self.assertTrue(self.ssc.sparkContext == self.sc) # Verify the getActiveOrCreate() recovers from checkpoint files self.ssc.stop(True, True) @@ -722,11 +721,11 @@ def check_output(n): # Verify that getActiveOrCreate() uses existing SparkContext self.ssc.stop(True, True) time.sleep(1) - self.sc = SparkContext(SparkConf()) + self.sc = SparkContext(conf=SparkConf()) self.setupCalled = False self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup) self.assertFalse(self.setupCalled) - self.assertTrue(self.ssc.sparkContext == sc) + self.assertTrue(self.ssc.sparkContext == self.sc) # Verify that getActiveOrCreate() calls setup() in absence of checkpoint files self.ssc.stop(True, True) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 0cd55d9aec2cd..96b9301d63133 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -187,16 +187,27 @@ class CheckpointWriter( private var stopped = false private var fs_ : FileSystem = _ + @volatile private var latestCheckpointTime: Time = null + class CheckpointWriteHandler( checkpointTime: Time, bytes: Array[Byte], clearCheckpointDataLater: Boolean) extends Runnable { def run() { + if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) { + latestCheckpointTime = checkpointTime + } var attempts = 0 val startTime = System.currentTimeMillis() val tempFile = new Path(checkpointDir, "temp") - val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime) - val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime) + // We will do checkpoint when generating a batch and completing a batch. When the processing + // time of a batch is greater than the batch interval, checkpointing for completing an old + // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old + // batch actually has the latest information, so we want to recovery from it. Therefore, we + // also use the latest checkpoint time as the file name, so that we can recovery from the + // latest checkpoint file. + val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime) + val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, latestCheckpointTime) while (attempts < MAX_ATTEMPTS && !stopped) { attempts += 1 From 84ee813188d5007efd2ec72fe6db0c7e49cb141d Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 17 Nov 2015 13:25:08 -0800 Subject: [PATCH 2/3] Add a unit test --- .../spark/streaming/CheckpointSuite.scala | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 84f5294aa39cc..b1cbc7163bee3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.streaming import java.io.{ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream, File} -import org.apache.spark.TestUtils import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.reflect.ClassTag @@ -30,11 +29,13 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.{IntWritable, Text} import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} +import org.mockito.Mockito.mock import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ +import org.apache.spark.TestUtils import org.apache.spark.streaming.dstream.{DStream, FileInputDStream} -import org.apache.spark.streaming.scheduler.{ConstantEstimator, RateTestInputDStream, RateTestReceiver} +import org.apache.spark.streaming.scheduler._ import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils} /** @@ -611,6 +612,28 @@ class CheckpointSuite extends TestSuiteBase { assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;") } + test("SPARK-11267: the race condition of two checkpoints in a batch") { + val jobGenerator = mock(classOf[JobGenerator]) + val checkpointDir = Utils.createTempDir().toString + val checkpointWriter = + new CheckpointWriter(jobGenerator, conf, checkpointDir, new Configuration()) + val bytes1 = Array.fill[Byte](10)(1) + new checkpointWriter.CheckpointWriteHandler( + Time(2000), bytes1, clearCheckpointDataLater = false).run() + val bytes2 = Array.fill[Byte](10)(2) + new checkpointWriter.CheckpointWriteHandler( + Time(1000), bytes2, clearCheckpointDataLater = true).run() + val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir).reverse.map { path => + new File(path.toUri) + } + assert(checkpointFiles.size === 2) + // Although bytes2 was written with an old time, it contains the latest status, so we should + // try to read from it at first. + assert(Files.toByteArray(checkpointFiles(0)) === bytes2) + assert(Files.toByteArray(checkpointFiles(1)) === bytes1) + checkpointWriter.stop() + } + /** * Tests a streaming operation under checkpointing, by restarting the operation * from checkpoint file and verifying whether the final output is correct. From fa580b76b1f176a843ca797e6dba08d1caabedf9 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 17 Nov 2015 13:26:27 -0800 Subject: [PATCH 3/3] Add comment --- .../src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 96b9301d63133..fd0e8d5d690b6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -206,6 +206,9 @@ class CheckpointWriter( // batch actually has the latest information, so we want to recovery from it. Therefore, we // also use the latest checkpoint time as the file name, so that we can recovery from the // latest checkpoint file. + // + // Note: there is only one thread writting the checkpoint files, so we don't need to worry + // about thread-safety. val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime) val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, latestCheckpointTime)