Skip to content

Commit 9a7e3e4

Browse files
committed
Refactored ReceivedBlockTracker API a bit to make things a little cleaner for users of the tracker.
1 parent af63655 commit 9a7e3e4

File tree

5 files changed

+98
-54
lines changed

5 files changed

+98
-54
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,22 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
5454

5555
def stop() {}
5656

57-
/** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
57+
/**
58+
* Generates RDDs with blocks received by the receiver of this stream. */
5859
override def compute(validTime: Time): Option[RDD[T]] = {
5960
val blockRDD = {
60-
if (validTime >= graph.startTime) {
61-
val blockStoreResults = getReceivedBlockInfo(validTime).map { _.blockStoreResult }
61+
62+
if (validTime < graph.startTime) {
63+
// If this is called for any time before the start time of the context,
64+
// then this returns an empty RDD. This may happen when recovering from a
65+
// driver failure without any write ahead log to recover pre-failure data.
66+
new BlockRDD[T](ssc.sc, Array.empty)
67+
} else {
68+
// Otherwise, ask the tracker for all the blocks that have been allocated to this stream
69+
// for this batch
70+
val blockInfos =
71+
ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
72+
val blockStoreResults = blockInfos.map { _.blockStoreResult }
6273
val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
6374
val isWriteAheadLogBased = blockStoreResults.forall {
6475
_.isInstanceOf[WriteAheadLogBasedStoreResult]
@@ -72,21 +83,11 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
7283
} else {
7384
new BlockRDD[T](ssc.sc, blockIds)
7485
}
75-
} else {
76-
// If this is called for any time before the start time of the context,
77-
// then this returns an empty RDD. This may happen when recovering from a
78-
// driver failure, a
79-
new BlockRDD[T](ssc.sc, Array.empty)
8086
}
8187
}
8288
Some(blockRDD)
8389
}
8490

85-
/** Get information on received blocks. */
86-
private[streaming] def getReceivedBlockInfo(time: Time): Seq[ReceivedBlockInfo] = {
87-
ssc.scheduler.receiverTracker.getReceivedBlocks(time, id)
88-
}
89-
9091
/**
9192
* Clear metadata that are older than `rememberDuration` of this DStream.
9293
* This is an internal method that should not be called directly. This

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -223,12 +223,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
223223
graph.generateJobs(time) // generate jobs using allocated block
224224
} match {
225225
case Success(jobs) =>
226-
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
227-
val streamId = stream.id
228-
val receivedBlockInfo = stream.getReceivedBlockInfo(time).toArray
229-
(streamId, receivedBlockInfo)
230-
}.toMap
231-
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo.toMap))
226+
val receivedBlockInfos =
227+
jobScheduler.receiverTracker.getBlocksOfBatch(time).mapValues { _.toArray }
228+
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfos))
232229
case Failure(e) =>
233230
jobScheduler.reportError("Error generating jobs for time " + time, e)
234231
}

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import scala.language.implicitConversions
2525
import org.apache.hadoop.conf.Configuration
2626
import org.apache.hadoop.fs.Path
2727

28-
import org.apache.spark.{Logging, SparkConf}
28+
import org.apache.spark.{SparkException, Logging, SparkConf}
2929
import org.apache.spark.streaming.Time
3030
import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
3131
import org.apache.spark.util.Utils
@@ -42,8 +42,11 @@ private[streaming] case class BatchCleanupEvent(times: Seq[Time])
4242

4343

4444
/** Class representing the blocks of all the streams allocated to a batch */
45-
private[streaming] case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
46-
def getBlockForStream(streamId: Int) = streamIdToAllocatedBlocks(streamId)
45+
private[streaming]
46+
case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
47+
def getBlocksOfStream(streamId: Int): Seq[ReceivedBlockInfo] = {
48+
streamIdToAllocatedBlocks.get(streamId).getOrElse(Seq.empty)
49+
}
4750
}
4851

4952
/**
@@ -80,6 +83,8 @@ private[streaming] class ReceivedBlockTracker(
8083
)
8184
}
8285

86+
private var lastAllocatedBatchTime: Time = null
87+
8388
// Recover block information from write ahead logs
8489
recoverFromWriteAheadLogs()
8590

@@ -103,22 +108,40 @@ private[streaming] class ReceivedBlockTracker(
103108
* This event will get written to the write ahead log (if enabled).
104109
*/
105110
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
106-
val allocatedBlocks = AllocatedBlocks(streamIds.map { streamId =>
107-
(streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
108-
}.toMap)
109-
writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
110-
timeToAllocatedBlocks(batchTime) = allocatedBlocks
111-
allocatedBlocks
111+
if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
112+
val allocatedBlocks = {
113+
val streamIdToBlocks = streamIds.map { streamId =>
114+
(streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
115+
}.toMap
116+
AllocatedBlocks(streamIdToBlocks)
117+
}
118+
writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
119+
timeToAllocatedBlocks(batchTime) = allocatedBlocks
120+
lastAllocatedBatchTime = batchTime
121+
allocatedBlocks
122+
} else {
123+
throw new SparkException(s"Unexpected allocation of blocks, " +
124+
s"last batch = $lastAllocatedBatchTime, batch time to allocate = $batchTime ")
125+
}
112126
}
113127

114128
/** Get blocks that have been added but not yet allocated to any batch. */
115129
def getUnallocatedBlocks(streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
116130
getReceivedBlockQueue(streamId).toSeq
117-
}
131+
}
132+
133+
/** Get the blocks allocated to the given batch. */
134+
def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
135+
timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty)
136+
}
118137

119-
/** Get the blocks allocated to a batch, or allocate blocks to the batch and then get them. */
120-
def getBlocksOfBatch(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
121-
timeToAllocatedBlocks.get(batchTime).map { _.getBlockForStream(streamId) }.getOrElse(Seq.empty)
138+
/** Get the blocks allocated to the given batch and stream. */
139+
def getBlocksOfBatchAndStream(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = {
140+
synchronized {
141+
timeToAllocatedBlocks.get(batchTime).map {
142+
_.getBlocksOfStream(streamId)
143+
}.getOrElse(Seq.empty)
144+
}
122145
}
123146

124147
/** Check if any blocks are left to be allocated to batches. */
@@ -155,11 +178,12 @@ private[streaming] class ReceivedBlockTracker(
155178

156179
// Insert the recovered block-to-batch allocations and clear the queue of received blocks
157180
// (when the blocks were originally allocated to the batch, the queue must have been cleared).
158-
def insertAllocatedBatch(time: Time, allocatedBlocks: AllocatedBlocks) {
159-
logTrace(s"Recovery: Inserting allocated batch for time $time to " +
181+
def insertAllocatedBatch(batchTime: Time, allocatedBlocks: AllocatedBlocks) {
182+
logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
160183
s"${allocatedBlocks.streamIdToAllocatedBlocks}")
161184
streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
162-
timeToAllocatedBlocks.put(time, allocatedBlocks)
185+
lastAllocatedBatchTime = batchTime
186+
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
163187
}
164188

165189
// Cleanup the batch allocations

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,13 @@ private[streaming]
5656
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
5757

5858
private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
59-
private val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*)
59+
private val receiverInputStreamIds = receiverInputStreams.map { _.id }
6060
private val receiverExecutor = new ReceiverLauncher()
6161
private val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, ReceiverInfo]
6262
private val receivedBlockTracker = new ReceivedBlockTracker(
6363
ssc.sparkContext.conf,
6464
ssc.sparkContext.hadoopConfiguration,
65-
receiverInputStreams.map { _.id },
65+
receiverInputStreamIds,
6666
ssc.scheduler.clock,
6767
Option(ssc.checkpointDir)
6868
)
@@ -107,12 +107,19 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
107107
}
108108
}
109109

110-
/** Get all the block for batch time . */
111-
def getReceivedBlocks(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = {
112-
receivedBlockTracker.getBlocksOfBatch(batchTime, streamId)
110+
/** Get the blocks for the given batch and all input streams. */
111+
def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = {
112+
receivedBlockTracker.getBlocksOfBatch(batchTime)
113113
}
114114

115-
/** Clean up metadata older than the given threshold time */
115+
/** Get the blocks allocated to the given batch and stream. */
116+
def getBlocksOfBatchAndStream(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = {
117+
synchronized {
118+
receivedBlockTracker.getBlocksOfBatchAndStream(batchTime, streamId)
119+
}
120+
}
121+
122+
/** Clean up metadata older than the given threshold time */
116123
def cleanupOldMetadata(cleanupThreshTime: Time) {
117124
receivedBlockTracker.cleanupOldBatches(cleanupThreshTime)
118125
}
@@ -125,8 +132,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
125132
receiverActor: ActorRef,
126133
sender: ActorRef
127134
) {
128-
if (!receiverInputStreamMap.contains(streamId)) {
129-
throw new Exception("Register received for unexpected id " + streamId)
135+
if (!receiverInputStreamIds.contains(streamId)) {
136+
throw new SparkException("Register received for unexpected id " + streamId)
130137
}
131138
receiverInfo(streamId) = ReceiverInfo(
132139
streamId, s"${typ}-${streamId}", receiverActor, true, host)

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import scala.util.Random
2828
import com.google.common.io.Files
2929
import org.apache.commons.io.FileUtils
3030
import org.apache.hadoop.conf.Configuration
31-
import org.apache.spark.{Logging, SparkConf}
31+
import org.apache.spark.{SparkException, Logging, SparkConf}
3232
import org.apache.spark.storage.StreamBlockId
3333
import org.apache.spark.streaming.scheduler._
3434
import org.apache.spark.streaming.util.WriteAheadLogSuite._
@@ -73,12 +73,27 @@ class ReceivedBlockTrackerSuite
7373
val blockInfos = generateBlockInfos()
7474
blockInfos.map(receivedBlockTracker.addBlock)
7575

76+
// Verify added blocks are unallocated blocks
7677
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
78+
79+
// Allocate the blocks to a batch and verify that all of them have been allocated
7780
receivedBlockTracker.allocateBlocksToBatch(1)
78-
receivedBlockTracker.getBlocksOfBatch(1, streamId) shouldEqual blockInfos
79-
receivedBlockTracker.getUnallocatedBlocks(streamId) should have size 0
81+
receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos
82+
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldBe empty
83+
84+
// Allocate no blocks to another batch
8085
receivedBlockTracker.allocateBlocksToBatch(2)
81-
receivedBlockTracker.getBlocksOfBatch(2, streamId) should have size 0
86+
receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty
87+
88+
// Verify that batch 2 cannot be allocated again
89+
intercept[SparkException] {
90+
receivedBlockTracker.allocateBlocksToBatch(2)
91+
}
92+
93+
// Verify that older batches cannot be allocated again
94+
intercept[SparkException] {
95+
receivedBlockTracker.allocateBlocksToBatch(1)
96+
}
8297
}
8398

8499
test("block addition, block to batch allocation and cleanup with write ahead log") {
@@ -126,14 +141,14 @@ class ReceivedBlockTrackerSuite
126141
// Allocate blocks to batch and verify whether the unallocated blocks got allocated
127142
val batchTime1 = manualClock.currentTime
128143
tracker2.allocateBlocksToBatch(batchTime1)
129-
tracker2.getBlocksOfBatch(batchTime1, streamId) shouldEqual blockInfos1
144+
tracker2.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
130145

131146
// Add more blocks and allocate to another batch
132147
incrementTime()
133148
val batchTime2 = manualClock.currentTime
134149
val blockInfos2 = addBlockInfos(tracker2)
135150
tracker2.allocateBlocksToBatch(batchTime2)
136-
tracker2.getBlocksOfBatch(batchTime2, streamId) shouldEqual blockInfos2
151+
tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
137152

138153
// Verify whether log has correct contents
139154
val expectedWrittenData2 = expectedWrittenData1 ++
@@ -145,8 +160,8 @@ class ReceivedBlockTrackerSuite
145160
// Restart tracker and verify recovered state
146161
incrementTime()
147162
val tracker3 = createTracker(enableCheckpoint = true, clock = manualClock)
148-
tracker3.getBlocksOfBatch(batchTime1, streamId) shouldEqual blockInfos1
149-
tracker3.getBlocksOfBatch(batchTime2, streamId) shouldEqual blockInfos2
163+
tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
164+
tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
150165
tracker3.getUnallocatedBlocks(streamId) shouldBe empty
151166

152167
// Cleanup first batch but not second batch
@@ -155,7 +170,7 @@ class ReceivedBlockTrackerSuite
155170
tracker3.cleanupOldBatches(batchTime2)
156171

157172
// Verify that the batch allocations have been cleaned, and the act has been written to log
158-
tracker3.getBlocksOfBatch(batchTime1, streamId) shouldEqual Seq.empty
173+
tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual Seq.empty
159174
getWrittenLogData(getWriteAheadLogFiles().last) should contain(createBatchCleanup(batchTime1))
160175

161176
// Verify that at least one log file gets deleted
@@ -169,8 +184,8 @@ class ReceivedBlockTrackerSuite
169184
incrementTime()
170185
val tracker4 = createTracker(enableCheckpoint = true, clock = manualClock)
171186
tracker4.getUnallocatedBlocks(streamId) shouldBe empty
172-
tracker4.getBlocksOfBatch(batchTime1, streamId) shouldBe empty // should be cleaned
173-
tracker4.getBlocksOfBatch(batchTime2, streamId) shouldEqual blockInfos2
187+
tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned
188+
tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
174189
}
175190

176191
/**

0 commit comments

Comments
 (0)