Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ object KafkaWordCountProducer {

val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

// Zookeper connection properties
// Zookeeper connection properties
val props = new Properties()
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.spark.streaming.scheduler

import akka.actor.{ActorRef, ActorSystem, Props, Actor}
import org.apache.spark.{SparkException, SparkEnv, Logging}
import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
import scala.util.{Failure, Success, Try}

import akka.actor.{ActorRef, Props, Actor}

import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
import org.apache.spark.streaming.util.{Clock, ManualClock, RecurringTimer}

/** Event classes for JobGenerator */
private[scheduler] sealed trait JobGeneratorEvent
private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
Expand Down Expand Up @@ -206,9 +208,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach(time =>
timesToReschedule.foreach { time =>
// Allocate the related blocks when recovering from failure: some blocks that were
// added but never allocated are left dangling in the queue after recovery, so we
// have to allocate them to the next batch, which is the batch they were supposed to go to.
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
)
}

// Restart the timer
timer.start(restartTime.milliseconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private[streaming] class ReceivedBlockTracker(
extends Logging {

private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
private val logManagerOption = createLogManager()
Expand Down Expand Up @@ -107,8 +107,14 @@ private[streaming] class ReceivedBlockTracker(
lastAllocatedBatchTime = batchTime
allocatedBlocks
} else {
throw new SparkException(s"Unexpected allocation of blocks, " +
s"last batch = $lastAllocatedBatchTime, batch time to allocate = $batchTime ")
// This situation occurs when:
// 1. The WAL ended with a BatchAllocationEvent but without a BatchCleanupEvent,
// so a possibly processed or half-processed batch job needs to be processed again;
// the batchTime will then be equal to lastAllocatedBatchTime.
// 2. Slow checkpointing makes the recovered batch time older than the WAL-recovered
// lastAllocatedBatchTime.
// This situation only occurs during recovery.
logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ class ReceivedBlockTrackerSuite
receivedBlockTracker.allocateBlocksToBatch(2)
receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty

// Verify that batch 2 cannot be allocated again
intercept[SparkException] {
receivedBlockTracker.allocateBlocksToBatch(2)
}
// Verify that allocating blocks to an older batch is a no-op,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add test to verify that allocateBlocksToBatch(x) where x<=2 does nothing.

// will return the same blocks as previously allocated.
receivedBlockTracker.allocateBlocksToBatch(1)
receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos

// Verify that older batches cannot be allocated again
intercept[SparkException] {
receivedBlockTracker.allocateBlocksToBatch(1)
}
blockInfos.map(receivedBlockTracker.addBlock)
receivedBlockTracker.allocateBlocksToBatch(2)
receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also test that even if there are new unallocated blocks, calling receivedBlockTracker.allocateBlocksToBatch(2) does not allocate those blocks? I don't think that is covered in this test.

receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
}

test("block addition, block to batch allocation and cleanup with write ahead log") {
Expand Down Expand Up @@ -186,14 +186,14 @@ class ReceivedBlockTrackerSuite
tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned
tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
}

test("enabling write ahead log but not setting checkpoint dir") {
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
intercept[SparkException] {
createTracker(setCheckpointDir = false)
}
}

test("setting checkpoint dir but not enabling write ahead log") {
// When WAL config is not set, log manager should not be enabled
val tracker1 = createTracker(setCheckpointDir = true)
Expand Down