Skip to content

Commit 59496d3

Browse files
committed
Changed class names, made allocation more explicit and added cleanup
1 parent 19aec7d commit 59496d3

File tree

5 files changed

+95
-82
lines changed

5 files changed

+95
-82
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,6 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
9595
*/
9696
private[streaming] override def clearMetadata(time: Time) {
9797
super.clearMetadata(time)
98-
ssc.scheduler.receiverTracker.cleanupOldInfo(time - rememberDuration)
98+
ssc.scheduler.receiverTracker.cleanupOldMetadata(time - rememberDuration)
9999
}
100100
}

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
112112
// Wait until all the received blocks in the network input tracker has
113113
// been consumed by network input DStreams, and jobs have been generated with them
114114
logInfo("Waiting for all received blocks to be consumed for job generation")
115-
while(!hasTimedOut && jobScheduler.receiverTracker.hasMoreReceivedBlockIds) {
115+
while(!hasTimedOut && jobScheduler.receiverTracker.hasUnallocatedBlocks) {
116116
Thread.sleep(pollTime)
117117
}
118118
logInfo("Waited for all received blocks to be consumed for job generation")
@@ -218,7 +218,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
218218
/** Generate jobs and perform checkpoint for the given `time`. */
219219
private def generateJobs(time: Time) {
220220
SparkEnv.set(ssc.env)
221-
Try(graph.generateJobs(time)) match {
221+
Try {
222+
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
223+
graph.generateJobs(time) // generate jobs using allocated block
224+
} match {
222225
case Success(jobs) =>
223226
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
224227
val streamId = stream.id
@@ -235,6 +238,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
235238
/** Clear DStream metadata for the given `time`. */
236239
private def clearMetadata(time: Time) {
237240
ssc.graph.clearMetadata(time)
241+
jobScheduler.receiverTracker.cleanupOldMetadata(time - graph.batchDuration)
238242

239243
// If checkpointing is enabled, then checkpoint,
240244
// else mark batch to be fully processed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala

Lines changed: 52 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -26,32 +26,31 @@ import org.apache.hadoop.conf.Configuration
2626
import org.apache.hadoop.fs.Path
2727

2828
import org.apache.spark.{Logging, SparkConf}
29-
import org.apache.spark.storage.StreamBlockId
3029
import org.apache.spark.streaming.Time
3130
import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
3231
import org.apache.spark.util.Utils
3332

34-
/** Trait representing any action done in the ReceivedBlockTracker */
35-
private[streaming] sealed trait ReceivedBlockTrackerAction
33+
/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
34+
private[streaming] sealed trait ReceivedBlockTrackerLogEvent
3635

37-
private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
38-
extends ReceivedBlockTrackerAction
39-
private[streaming] case class BatchAllocations(time: Time, allocatedBlocks: AllocatedBlocks)
40-
extends ReceivedBlockTrackerAction
41-
private[streaming] case class BatchCleanup(times: Seq[Time])
42-
extends ReceivedBlockTrackerAction
36+
private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
37+
extends ReceivedBlockTrackerLogEvent
38+
private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
39+
extends ReceivedBlockTrackerLogEvent
40+
private[streaming] case class BatchCleanupEvent(times: Seq[Time])
41+
extends ReceivedBlockTrackerLogEvent
4342

4443

4544
/** Class representing the blocks of all the streams allocated to a batch */
4645
case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
47-
def apply(streamId: Int) = streamIdToAllocatedBlocks(streamId)
46+
def getBlockForStream(streamId: Int) = streamIdToAllocatedBlocks(streamId)
4847
}
4948

5049
/**
5150
* Class that keep track of all the received blocks, and allocate them to batches
52-
* when required. All actions taken by this class can be saved to a write ahead log,
53-
* so that the state of the tracker (received blocks and block-to-batch allocations)
54-
* can be recovered after driver failure.
51+
* when required. All actions taken by this class can be saved to a write ahead log
52+
* (if a checkpoint directory has been provided), so that the state of the tracker
53+
* (received blocks and block-to-batch allocations) can be recovered after driver failure.
5554
*/
5655
private[streaming]
5756
class ReceivedBlockTracker(
@@ -60,8 +59,8 @@ class ReceivedBlockTracker(
6059

6160
private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
6261

63-
private val streamIdToUnallocatedBlockInfo = new mutable.HashMap[Int, ReceivedBlockQueue]
64-
private val timeToAllocatedBlockInfo = new mutable.HashMap[Time, AllocatedBlocks]
62+
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
63+
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
6564

6665
private val logManagerRollingIntervalSecs = conf.getInt(
6766
"spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
@@ -78,70 +77,69 @@ class ReceivedBlockTracker(
7877
// Recover block information from write ahead logs
7978
recoverFromWriteAheadLogs()
8079

81-
/** Add received block */
80+
/** Add received block. This event will get written to the write ahead log (if enabled). */
8281
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
8382
try {
84-
writeToLog(BlockAddition(receivedBlockInfo))
83+
writeToLog(BlockAdditionEvent(receivedBlockInfo))
8584
getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
8685
logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
8786
s"block ${receivedBlockInfo.blockStoreResult.blockId}")
8887
true
8988
} catch {
9089
case e: Exception =>
91-
logError("Error adding block " + receivedBlockInfo, e)
90+
logError(s"Error adding block $receivedBlockInfo", e)
9291
false
9392
}
9493
}
9594

96-
/** Get blocks that have been added but not yet allocated to any batch */
95+
/**
96+
* Allocate all unallocated blocks to the given batch.
97+
* This event will get written to the write ahead log (if enabled).
98+
*/
99+
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
100+
val allocatedBlocks = AllocatedBlocks(streamIds.map { streamId =>
101+
(streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
102+
}.toMap)
103+
writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
104+
timeToAllocatedBlocks(batchTime) = allocatedBlocks
105+
allocatedBlocks
106+
}
107+
108+
/** Get blocks that have been added but not yet allocated to any batch. */
97109
def getUnallocatedBlocks(streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
98110
getReceivedBlockQueue(streamId).toSeq
99111
}
100112

101-
/** Get the blocks allocated to a batch, or allocate blocks to the batch and then get them */
102-
def getOrAllocateBlocksToBatch(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = {
103-
synchronized {
104-
if (!timeToAllocatedBlockInfo.contains(batchTime)) {
105-
allocateAllUnallocatedBlocksToBatch(batchTime)
106-
}
107-
timeToAllocatedBlockInfo(batchTime)(streamId)
108-
}
113+
/** Get the blocks allocated to a batch, or allocate blocks to the batch and then get them. */
114+
def getBlocksOfBatch(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
115+
timeToAllocatedBlocks.get(batchTime).map { _.getBlockForStream(streamId) }.getOrElse(Seq.empty)
109116
}
110117

111-
/** Check if any blocks are left to be allocated to batches */
112-
def hasUnallocatedReceivedBlocks(): Boolean = synchronized {
113-
!streamIdToUnallocatedBlockInfo.values.forall(_.isEmpty)
118+
/** Check if any blocks are left to be allocated to batches. */
119+
def hasUnallocatedReceivedBlocks: Boolean = synchronized {
120+
!streamIdToUnallocatedBlockQueues.values.forall(_.isEmpty)
114121
}
115122

116-
/** Clean up block information of old batches */
123+
/** Clean up block information of old batches. */
117124
def cleanupOldBatches(cleanupThreshTime: Time): Unit = synchronized {
118125
assert(cleanupThreshTime.milliseconds < clock.currentTime())
119-
val timesToCleanup = timeToAllocatedBlockInfo.keys.filter { _ < cleanupThreshTime }.toSeq
126+
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
120127
logInfo("Deleting batches " + timesToCleanup)
121-
writeToLog(BatchCleanup(timesToCleanup))
122-
timeToAllocatedBlockInfo --= timesToCleanup
128+
writeToLog(BatchCleanupEvent(timesToCleanup))
129+
timeToAllocatedBlocks --= timesToCleanup
123130
logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds))
124131
log
125132
}
126133

127-
/** Stop the block tracker */
134+
/** Stop the block tracker. */
128135
def stop() {
129136
logManagerOption.foreach { _.stop() }
130137
}
131138

132-
/** Allocate all unallocated blocks to the given batch */
133-
private def allocateAllUnallocatedBlocksToBatch(batchTime: Time): AllocatedBlocks = synchronized {
134-
val allocatedBlockInfos = AllocatedBlocks(streamIds.map { streamId =>
135-
(streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
136-
}.toMap)
137-
writeToLog(BatchAllocations(batchTime, allocatedBlockInfos))
138-
timeToAllocatedBlockInfo(batchTime) = allocatedBlockInfos
139-
allocatedBlockInfos
140-
}
141139

142140
/**
143141
* Recover all the tracker actions from the write ahead logs to recover the state (unallocated
144-
* and allocated block info) prior to failure
142+
* and allocated block info) prior to failure.
145143
*/
146144
private def recoverFromWriteAheadLogs(): Unit = synchronized {
147145
logInfo("Recovering from checkpoint")
@@ -160,34 +158,34 @@ class ReceivedBlockTracker(
160158
s"${allocatedBlocks.streamIdToAllocatedBlocks}")
161159
// println(s"Recovery: Inserting allocated batch for time $time to " +
162160
// s"${allocatedBlocks.streamIdToAllocatedBlocks}")
163-
streamIdToUnallocatedBlockInfo.values.foreach { _.clear() }
164-
timeToAllocatedBlockInfo.put(time, allocatedBlocks)
161+
streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
162+
timeToAllocatedBlocks.put(time, allocatedBlocks)
165163
}
166164

167165
// Cleanup the batch allocations
168166
def cleanupBatches(batchTimes: Seq[Time]) {
169167
logTrace(s"Recovery: Cleaning up batches $batchTimes")
170168
// println(s"Recovery: Cleaning up batches ${batchTimes}")
171-
timeToAllocatedBlockInfo --= batchTimes
169+
timeToAllocatedBlocks --= batchTimes
172170
}
173171

174172
logManagerOption.foreach { logManager =>
175173
logManager.readFromLog().foreach { byteBuffer =>
176174
logTrace("Recovering record " + byteBuffer)
177-
Utils.deserialize[ReceivedBlockTrackerAction](byteBuffer.array) match {
178-
case BlockAddition(receivedBlockInfo) =>
175+
Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) match {
176+
case BlockAdditionEvent(receivedBlockInfo) =>
179177
insertAddedBlock(receivedBlockInfo)
180-
case BatchAllocations(time, allocatedBlocks) =>
178+
case BatchAllocationEvent(time, allocatedBlocks) =>
181179
insertAllocatedBatch(time, allocatedBlocks)
182-
case BatchCleanup(batchTimes) =>
180+
case BatchCleanupEvent(batchTimes) =>
183181
cleanupBatches(batchTimes)
184182
}
185183
}
186184
}
187185
}
188186

189187
/** Write an update to the tracker to the write ahead log */
190-
private def writeToLog(record: ReceivedBlockTrackerAction) {
188+
private def writeToLog(record: ReceivedBlockTrackerLogEvent) {
191189
logDebug(s"Writing to log $record")
192190
logManagerOption.foreach { logManager =>
193191
logManager.writeToLog(ByteBuffer.wrap(Utils.serialize(record)))
@@ -196,7 +194,7 @@ class ReceivedBlockTracker(
196194

197195
/** Get the queue of received blocks belonging to a particular stream */
198196
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
199-
streamIdToUnallocatedBlockInfo.getOrElseUpdate(streamId, new ReceivedBlockQueue)
197+
streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
200198
}
201199
}
202200

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,20 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
100100
}
101101
}
102102

103-
/** Return all the blocks received from a receiver. */
103+
/** Allocate all unallocated blocks to the given batch. */
104+
def allocateBlocksToBatch(batchTime: Time): Unit = {
105+
if (receiverInputStreams.nonEmpty) {
106+
receivedBlockTracker.allocateBlocksToBatch(batchTime)
107+
}
108+
}
109+
110+
/** Get all the block for batch time . */
104111
def getReceivedBlocks(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = {
105-
receivedBlockTracker.getOrAllocateBlocksToBatch(batchTime, streamId)
112+
receivedBlockTracker.getBlocksOfBatch(batchTime, streamId)
106113
}
107114

108-
def cleanupOldInfo(cleanupThreshTime: Time) {
115+
/** Clean up metadata older than the given threshold time */
116+
def cleanupOldMetadata(cleanupThreshTime: Time) {
109117
receivedBlockTracker.cleanupOldBatches(cleanupThreshTime)
110118
}
111119

@@ -170,8 +178,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
170178
}
171179

172180
/** Check if any blocks are left to be processed */
173-
def hasMoreReceivedBlockIds: Boolean = {
174-
receivedBlockTracker.hasUnallocatedReceivedBlocks()
181+
def hasUnallocatedBlocks: Boolean = {
182+
receivedBlockTracker.hasUnallocatedReceivedBlocks
175183
}
176184

177185
/** Actor to receive messages from the receivers. */

0 commit comments

Comments
 (0)