@@ -42,7 +42,7 @@ private[streaming] case class BatchCleanupEvent(times: Seq[Time])
4242
4343
4444/** Class representing the blocks of all the streams allocated to a batch */
45- case class AllocatedBlocks (streamIdToAllocatedBlocks : Map [Int , Seq [ReceivedBlockInfo ]]) {
45+ private [streaming] case class AllocatedBlocks (streamIdToAllocatedBlocks : Map [Int , Seq [ReceivedBlockInfo ]]) {
4646 def getBlockForStream (streamId : Int ) = streamIdToAllocatedBlocks(streamId)
4747}
4848
@@ -51,11 +51,17 @@ case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlock
5151 * when required. All actions taken by this class can be saved to a write ahead log
5252 * (if a checkpoint directory has been provided), so that the state of the tracker
5353 * (received blocks and block-to-batch allocations) can be recovered after driver failure.
54+ *
55+ * Note that when any instance of this class is created with a checkpoint directory,
56+ * it will try reading events from logs in the directory.
5457 */
55- private [streaming]
56- class ReceivedBlockTracker (
57- conf : SparkConf , hadoopConf : Configuration , streamIds : Seq [Int ], clock : Clock ,
58- checkpointDirOption : Option [String ]) extends Logging {
58+ private [streaming] class ReceivedBlockTracker (
59+ conf : SparkConf ,
60+ hadoopConf : Configuration ,
61+ streamIds : Seq [Int ],
62+ clock : Clock ,
63+ checkpointDirOption : Option [String ])
64+ extends Logging {
5965
6066 private type ReceivedBlockQueue = mutable.Queue [ReceivedBlockInfo ]
6167
@@ -136,14 +142,11 @@ class ReceivedBlockTracker(
136142 logManagerOption.foreach { _.stop() }
137143 }
138144
139-
140145 /**
141146 * Recover all the tracker actions from the write ahead logs to recover the state (unallocated
142147 * and allocated block info) prior to failure.
143148 */
144149 private def recoverFromWriteAheadLogs (): Unit = synchronized {
145- logInfo(" Recovering from checkpoint" )
146-
147150 // Insert the recovered block information
148151 def insertAddedBlock (receivedBlockInfo : ReceivedBlockInfo ) {
149152 logTrace(s " Recovery: Inserting added block $receivedBlockInfo" )
@@ -166,6 +169,7 @@ class ReceivedBlockTracker(
166169 }
167170
168171 logManagerOption.foreach { logManager =>
172+ logInfo(s " Recovering from write ahead logs in ${checkpointDirOption.get}" )
169173 logManager.readFromLog().foreach { byteBuffer =>
170174 logTrace(" Recovering record " + byteBuffer)
171175 Utils .deserialize[ReceivedBlockTrackerLogEvent ](byteBuffer.array) match {
0 commit comments