From 558bdc30f8d3134deea436165944704b5a3c31db Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 14 Jan 2015 13:24:21 +0800 Subject: [PATCH 1/4] Correctly replay the WAL log when recovering from failure --- .../examples/streaming/KafkaWordCount.scala | 2 +- .../streaming/scheduler/JobGenerator.scala | 18 ++++++++++++------ .../scheduler/ReceivedBlockTracker.scala | 6 ++++++ 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala index 2adc63f7ff30..387c0e421334 100644 --- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala +++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala @@ -76,7 +76,7 @@ object KafkaWordCountProducer { val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args - // Zookeper connection properties + // Zookeeper connection properties val props = new Properties() props.put("metadata.broker.list", brokers) props.put("serializer.class", "kafka.serializer.StringEncoder") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index d86f852aba97..b3e2bc703b94 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -17,12 +17,14 @@ package org.apache.spark.streaming.scheduler -import akka.actor.{ActorRef, ActorSystem, Props, Actor} -import org.apache.spark.{SparkException, SparkEnv, Logging} -import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter} -import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock} import scala.util.{Failure, Success, Try} +import akka.actor.{ActorRef, Props, Actor} + +import org.apache.spark.{SparkEnv, Logging} +import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time} +import org.apache.spark.streaming.util.{Clock, ManualClock, RecurringTimer} + /** Event classes for JobGenerator */ private[scheduler] sealed trait JobGeneratorEvent private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent @@ -206,9 +208,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", ")) - timesToReschedule.foreach(time => + timesToReschedule.foreach { time => + // Allocate the related blocks when recovering from failure, because some added but not + // allocated block is dangled in the queue after recovering, we have to insert some block + // allocation event to group up them and get the right behavior. + jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time))) - ) + } // Restart the timer timer.start(restartTime.milliseconds) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index ef23b5c79f2e..b2fa2d8b510a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -106,6 +106,12 @@ private[streaming] class ReceivedBlockTracker( timeToAllocatedBlocks(batchTime) = allocatedBlocks lastAllocatedBatchTime = batchTime allocatedBlocks + } else if (batchTime == lastAllocatedBatchTime) { + // This situation occurs when WAL is ended with BatchAllocationEvent, + // but without BatchCleanupEvent, possibly processed batch job or half-processed batch + // job need to process again, so the batchTime will be equal to lastAllocatedBatchTime. + // This situation will only occurs in recovery time. + logWarning(s"Possibly processed batch $batchTime need to be processed again in WAL recovery") } else { throw new SparkException(s"Unexpected allocation of blocks, " + s"last batch = $lastAllocatedBatchTime, batch time to allocate = $batchTime ") From e356258b309abfe39ca221d55a93f6505db459a4 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 14 Jan 2015 13:41:20 +0800 Subject: [PATCH 2/4] Fix bug in unit test --- .../apache/spark/streaming/ReceivedBlockTrackerSuite.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index de7e9d624bf6..ba422e7da7af 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -82,11 +82,6 @@ class ReceivedBlockTrackerSuite receivedBlockTracker.allocateBlocksToBatch(2) receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty - // Verify that batch 2 cannot be allocated again - intercept[SparkException] { - receivedBlockTracker.allocateBlocksToBatch(2) - } - // Verify that older batches cannot be allocated again intercept[SparkException] { receivedBlockTracker.allocateBlocksToBatch(1) From a237c75f87518b26245fd688de9bbae4bc151ae2 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 22 Jan 2015 17:46:58 +0800 Subject: [PATCH 3/4] Address the comments --- .../streaming/scheduler/ReceivedBlockTracker.scala | 14 +++++++------- .../streaming/ReceivedBlockTrackerSuite.scala | 11 +++++++---- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index b2fa2d8b510a..476d3ed6ec5f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -106,15 +106,15 @@ private[streaming] class ReceivedBlockTracker( timeToAllocatedBlocks(batchTime) = allocatedBlocks lastAllocatedBatchTime = batchTime allocatedBlocks - } else if (batchTime == lastAllocatedBatchTime) { - // This situation occurs when WAL is ended with BatchAllocationEvent, - // but without BatchCleanupEvent, possibly processed batch job or half-processed batch - // job need to process again, so the batchTime will be equal to lastAllocatedBatchTime. + } else { + // This situation occurs when: + // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent, + // possibly processed batch job or half-processed batch job need to be processed again, + // so the batchTime will be equal to lastAllocatedBatchTime. + // 2. Slow checkpointing makes recovered batch time older than WAL recovered + // lastAllocatedBatchTime. // This situation will only occurs in recovery time. logWarning(s"Possibly processed batch $batchTime need to be processed again in WAL recovery") - } else { - throw new SparkException(s"Unexpected allocation of blocks, " + - s"last batch = $lastAllocatedBatchTime, batch time to allocate = $batchTime ") } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index ba422e7da7af..bca83d68dbb8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -82,10 +82,13 @@ class ReceivedBlockTrackerSuite receivedBlockTracker.allocateBlocksToBatch(2) receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty - // Verify that older batches cannot be allocated again - intercept[SparkException] { - receivedBlockTracker.allocateBlocksToBatch(1) - } + // Verify that older batches have no operation on batch allocation, + // will return the same blocks as previously allocated. + receivedBlockTracker.allocateBlocksToBatch(1) + receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos + + receivedBlockTracker.allocateBlocksToBatch(2) + receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty } test("block addition, block to batch allocation and cleanup with write ahead log") { From f0b0c0bcc8db551c57c1590c8a12b01f5f0ae2d0 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 22 Jan 2015 21:35:20 +0800 Subject: [PATCH 4/4] Further address the comments --- .../org/apache/spark/streaming/scheduler/JobGenerator.scala | 6 +++--- .../spark/streaming/scheduler/ReceivedBlockTracker.scala | 4 ++-- .../apache/spark/streaming/ReceivedBlockTrackerSuite.scala | 6 ++++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index b3e2bc703b94..8632c94349bf 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -209,9 +209,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", ")) timesToReschedule.foreach { time => - // Allocate the related blocks when recovering from failure, because some added but not - // allocated block is dangled in the queue after recovering, we have to insert some block - // allocation event to group up them and get the right behavior. + // Allocate the related blocks when recovering from failure, because some blocks that were + // added but not allocated, are dangling in the queue after recovering, we have to allocate + // those blocks to the next batch, which is the batch they were supposed to go. jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time))) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index 476d3ed6ec5f..e19ac939f9ac 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -67,7 +67,7 @@ private[streaming] class ReceivedBlockTracker( extends Logging { private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo] - + private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue] private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks] private val logManagerOption = createLogManager() @@ -114,7 +114,7 @@ private[streaming] class ReceivedBlockTracker( // 2. Slow checkpointing makes recovered batch time older than WAL recovered // lastAllocatedBatchTime. // This situation will only occurs in recovery time. - logWarning(s"Possibly processed batch $batchTime need to be processed again in WAL recovery") + logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery") } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index bca83d68dbb8..fbb7b0bfebaf 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -87,8 +87,10 @@ class ReceivedBlockTrackerSuite receivedBlockTracker.allocateBlocksToBatch(1) receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos + blockInfos.map(receivedBlockTracker.addBlock) receivedBlockTracker.allocateBlocksToBatch(2) receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty + receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos } test("block addition, block to batch allocation and cleanup with write ahead log") { @@ -184,14 +186,14 @@ class ReceivedBlockTrackerSuite tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 } - + test("enabling write ahead log but not setting checkpoint dir") { conf.set("spark.streaming.receiver.writeAheadLog.enable", "true") intercept[SparkException] { createTracker(setCheckpointDir = false) } } - + test("setting checkpoint dir but not enabling write ahead log") { // When WAL config is not set, log manager should not be enabled val tracker1 = createTracker(setCheckpointDir = true)