Commit d06fa21

committed
Enabled ReceivedBlockTracker by default, stored block metadata and optimized block fetching in WALBackedBlockRDD
1 parent caf0136 commit d06fa21

File tree

9 files changed: 134 additions & 89 deletions


core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala

Lines changed: 11 additions & 4 deletions
@@ -31,8 +31,9 @@ private[spark]
 class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
   extends RDD[T](sc, Nil) {
 
-  @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
+  @transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
   @volatile private var _isValid = true
+  @volatile private var _setInvalid = true
 
   override def getPartitions: Array[Partition] = {
     assertValid()
@@ -54,7 +55,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
     assertValid()
-    locations_(split.asInstanceOf[BlockRDDPartition].blockId)
+    _locations(split.asInstanceOf[BlockRDDPartition].blockId)
   }
 
   /**
@@ -66,7 +67,9 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
     blockIds.foreach { blockId =>
       sc.env.blockManager.master.removeBlock(blockId)
     }
-    _isValid = false
+    if (_setInvalid) {
+      _isValid = false
+    }
   }
 
   /**
@@ -85,8 +88,12 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
     }
   }
 
+  protected def setInvalidIfBlocksRemoved(setInvalid: Boolean): Unit = {
+    _setInvalid = setInvalid
+  }
+
   protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
-    locations_
+    _locations
   }
 }
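
The guarded invalidation added here exists so that a subclass whose data also lives in a write-ahead log can call setInvalidIfBlocksRemoved(false) and stay usable after its in-memory blocks are removed. Below is a minimal, self-contained Scala sketch of that opt-out pattern; CachedData and LogBackedData are illustrative stand-ins, not Spark classes.

object InvalidationSketch extends App {
  // Illustrative stand-in for BlockRDD's validity bookkeeping.
  class CachedData(val blockIds: Seq[String]) {
    @volatile private var _isValid = true
    @volatile private var _setInvalid = true   // invalidate on block removal by default

    protected def setInvalidIfBlocksRemoved(setInvalid: Boolean): Unit = {
      _setInvalid = setInvalid
    }

    def removeBlocks(): Unit = {
      // ... remove the cached blocks from memory ...
      if (_setInvalid) {
        _isValid = false                       // a plain cache is unusable once its blocks are gone
      }
    }

    def isValid: Boolean = _isValid
  }

  // Data is also persisted in a log, so removing cached blocks must not invalidate it.
  class LogBackedData(blockIds: Seq[String]) extends CachedData(blockIds) {
    setInvalidIfBlocksRemoved(false)
  }

  val data = new LogBackedData(Seq("input-0-0"))
  data.removeBlocks()
  println(data.isValid)  // true: still readable from the log
}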

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

Lines changed: 17 additions & 18 deletions
@@ -67,27 +67,26 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     } else {
       // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
       // for this batch
-      val blockInfos =
-        ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
-      val blockStoreResults = blockInfos.map { _.blockStoreResult }
-      val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
+      val receiverTracker = ssc.scheduler.receiverTracker
+      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
+      val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
 
-      // Check whether all the results are of the same type
-      val resultTypes = blockStoreResults.map { _.getClass }.distinct
-      if (resultTypes.size > 1) {
-        logWarning("Multiple result types in block information, WAL information will be ignored.")
-      }
+      // Is WAL segment info present with all the blocks
+      val isWALSegmentInfoPresent = blockInfos.forall { _.writeAheadLogSegmentOption.nonEmpty }
 
-      // If all the results are of type WriteAheadLogBasedStoreResult, then create
-      // WriteAheadLogBackedBlockRDD else create simple BlockRDD.
-      if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
-        val logSegments = blockStoreResults.map {
-          _.asInstanceOf[WriteAheadLogBasedStoreResult].segment
-        }.toArray
-        // Since storeInBlockManager = false, the storage level does not matter.
-        new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
-          blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)
+      if (isWALSegmentInfoPresent) {
+        // If all the blocks have WAL segment info, then create a WALBackedBlockRDD
+        val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
+        val blockWALSegments = blockInfos.map { _.writeAheadLogSegmentOption.get }.toArray
+        new WriteAheadLogBackedBlockRDD[T](
+          ssc.sparkContext, blockIds, blockWALSegments, isBlockIdValid)
       } else {
+        // Else, create a BlockRDD. However, if there are some blocks with WAL info but not others
+        // then that is unexpected and log a warning accordingly.
+        if (blockInfos.find(_.writeAheadLogSegmentOption.nonEmpty).nonEmpty) {
+          logWarning("Could not find Write Ahead Log information on some of the blocks, " +
+            "data may not be recoverable after driver failures")
+        }
         new BlockRDD[T](ssc.sc, blockIds)
       }
     }
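
The new branching above comes down to two collection predicates over the per-block WAL segment options: forall decides whether a WAL-backed RDD is safe to build, and a second check warns when only a subset of the blocks carries segment info. A small, self-contained sketch of just that decision follows; Segment is a hypothetical stand-in for WriteAheadLogFileSegment.

object RddChoiceSketch extends App {
  // Hypothetical stand-in for WriteAheadLogFileSegment.
  case class Segment(path: String, offset: Long, length: Int)

  def chooseRdd(segmentOptions: Seq[Option[Segment]]): String = {
    if (segmentOptions.forall(_.nonEmpty)) {
      "WriteAheadLogBackedBlockRDD"   // every block can be re-read from the log
    } else {
      if (segmentOptions.exists(_.nonEmpty)) {
        println("WARN: Could not find Write Ahead Log information on some of the blocks, " +
          "data may not be recoverable after driver failures")
      }
      "BlockRDD"                      // plain BlockRDD when WAL info is absent
    }
  }

  println(chooseRdd(Seq(Some(Segment("log-1", 0L, 100)), Some(Segment("log-1", 100L, 50)))))
  println(chooseRdd(Seq(Some(Segment("log-1", 0L, 100)), None)))  // warns, falls back to BlockRDD
}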

streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala

Lines changed: 55 additions & 25 deletions
@@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark._
 import org.apache.spark.rdd.BlockRDD
-import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.storage.{StreamBlockId, BlockId, StorageLevel}
 import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, WriteAheadLogRandomReader}
 
 /**
@@ -37,6 +37,7 @@ private[streaming]
 class WriteAheadLogBackedBlockRDDPartition(
     val index: Int,
     val blockId: BlockId,
+    val isBlockIdValid: Boolean,
     val segment: WriteAheadLogFileSegment)
   extends Partition
 
@@ -45,11 +46,19 @@ class WriteAheadLogBackedBlockRDDPartition(
  * This class represents a special case of the BlockRDD where the data blocks in
  * the block manager are also backed by segments in write ahead logs. For reading
  * the data, this RDD first looks up the blocks by their ids in the block manager.
- * If it does not find them, it looks up the corresponding file segment.
+ * If it does not find them, it looks up the corresponding file segment. The finding
+ * of the blocks by their ids can be skipped by setting the corresponding element in
+ * isBlockIdValid to false. This is a performance optimization which does not affect
+ * correctness, and it can be used in situations where it is known that the block
+ * does not exist in the Spark executors (e.g. after a failed driver is restarted).
+ *
  *
  * @param sc SparkContext
 * @param blockIds Ids of the blocks that contains this RDD's data
 * @param segments Segments in write ahead logs that contain this RDD's data
+* @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
+*                       executors). If not, then block lookups by the block ids will be skipped.
+*                       By default, this is an empty array signifying true for all the blocks.
 * @param storeInBlockManager Whether to store in the block manager after reading from the segment
 * @param storageLevel storage level to store when storing in block manager
 *                     (applicable when storeInBlockManager = true)
@@ -59,23 +68,32 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     @transient sc: SparkContext,
     @transient blockIds: Array[BlockId],
     @transient segments: Array[WriteAheadLogFileSegment],
-    storeInBlockManager: Boolean,
-    storageLevel: StorageLevel)
+    @transient isBlockIdValid: Array[Boolean] = Array.empty,
+    storeInBlockManager: Boolean = false,
+    storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER)
   extends BlockRDD[T](sc, blockIds) {
 
   require(
     blockIds.length == segments.length,
-    s"Number of block ids (${blockIds.length}) must be " +
-      s"the same as number of segments (${segments.length}})!")
+    s"Number of block Ids (${blockIds.length}) must be " +
+      s" same as number of segments (${segments.length}})")
+
+  require(
+    isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length,
+    s"Number of elements in isBlockIdValid (${isBlockIdValid.length}) must be " +
+      s" same as number of block Ids (${blockIds.length})")
 
   // Hadoop configuration is not serializable, so broadcast it as a serializable.
   @transient private val hadoopConfig = sc.hadoopConfiguration
   private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)
 
+  setInvalidIfBlocksRemoved(false)
+
   override def getPartitions: Array[Partition] = {
     assertValid()
-    Array.tabulate(blockIds.size) { i =>
-      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i))
+    Array.tabulate(blockIds.length) { i =>
+      val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
+      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), isValid, segments(i))
     }
   }
 
@@ -90,22 +108,29 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     val blockManager = SparkEnv.get.blockManager
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
     val blockId = partition.blockId
-    blockManager.get(blockId) match {
-      case Some(block) => // Data is in Block Manager
-        val iterator = block.data.asInstanceOf[Iterator[T]]
-        logDebug(s"Read partition data of $this from block manager, block $blockId")
-        iterator
-      case None => // Data not found in Block Manager, grab it from write ahead log file
-        val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
-        val dataRead = reader.read(partition.segment)
-        reader.close()
-        logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}")
-        if (storeInBlockManager) {
-          blockManager.putBytes(blockId, dataRead, storageLevel)
-          logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
-          dataRead.rewind()
-        }
-        blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+    val segment = partition.segment
+
+    def getBlockFromBlockManager(): Option[Iterator[T]] = {
+      blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
+    }
+
+    def getBlockFromWriteAheadLog(): Iterator[T] = {
+      val reader = new WriteAheadLogRandomReader(segment.path, hadoopConf)
+      val dataRead = reader.read(segment)
+      reader.close()
+      logDebug(s"Read partition data of $this from write ahead log, segment ${partition.segment}")
+      if (storeInBlockManager) {
+        blockManager.putBytes(blockId, dataRead, storageLevel)
+        logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
+        dataRead.rewind()
+      }
+      blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+    }
+
+    if (partition.isBlockIdValid) {
+      getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() }
+    } else {
+      getBlockFromWriteAheadLog()
     }
   }
 
@@ -116,7 +141,12 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
    */
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
-    val blockLocations = getBlockIdLocations().get(partition.blockId)
+    val blockLocations = if (partition.isBlockIdValid) {
+      getBlockIdLocations().get(partition.blockId)
+    } else {
+      None
+    }
+
     blockLocations.getOrElse(
       HdfsUtils.getFileSegmentLocations(
         partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig))
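
The restructured compute() above is essentially a lazy fallback: consult the block manager only when the partition's block id is marked valid, and read from the WAL segment otherwise or on a cache miss. Since Option.getOrElse takes its argument by name, the log read happens only when it is actually needed. A self-contained sketch of that shape, with dummy functions standing in for the block manager and the WAL reader:

object ComputeFallbackSketch extends App {
  // Dummy stand-ins for blockManager.get and WriteAheadLogRandomReader.read.
  def getBlockFromBlockManager(blockId: String): Option[Iterator[Int]] = None  // simulate a miss
  def getBlockFromWriteAheadLog(blockId: String): Iterator[Int] = {
    println(s"reading $blockId from the write ahead log segment")
    Iterator(1, 2, 3)
  }

  def compute(blockId: String, isBlockIdValid: Boolean): Iterator[Int] = {
    if (isBlockIdValid) {
      // Fast path first; getOrElse is by-name, so the WAL read runs only on a miss.
      getBlockFromBlockManager(blockId).getOrElse(getBlockFromWriteAheadLog(blockId))
    } else {
      // Block is known to be absent from the executors (e.g. after a driver restart):
      // skip the block manager lookup entirely.
      getBlockFromWriteAheadLog(blockId)
    }
  }

  println(compute("input-0-0", isBlockIdValid = false).sum)  // 6, read straight from the log
}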

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 1 addition & 1 deletion
@@ -145,7 +145,7 @@ private[streaming] class ReceiverSupervisorImpl(
     val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
     logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
 
-    val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
+    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
     trackerEndpoint.askWithReply[Boolean](AddBlock(blockInfo))
     logDebug(s"Reported block $blockId")
   }

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala

Lines changed: 28 additions & 2 deletions
@@ -17,12 +17,38 @@
 
 package org.apache.spark.streaming.scheduler
 
-import org.apache.spark.streaming.receiver.ReceivedBlockStoreResult
+import org.apache.spark.storage.StreamBlockId
+import org.apache.spark.streaming.receiver.{WriteAheadLogBasedStoreResult, ReceivedBlockStoreResult}
+import org.apache.spark.streaming.util.WriteAheadLogFileSegment
 
 /** Information about blocks received by the receiver */
 private[streaming] case class ReceivedBlockInfo(
     streamId: Int,
     numRecords: Long,
+    metadataOption: Option[Any],
     blockStoreResult: ReceivedBlockStoreResult
-  )
+  ) {
+
+  @volatile private var _isBlockIdValid = true
+
+  def blockId: StreamBlockId = blockStoreResult.blockId
+
+  def writeAheadLogSegmentOption: Option[WriteAheadLogFileSegment] = {
+    blockStoreResult match {
+      case walStoreResult: WriteAheadLogBasedStoreResult => Some(walStoreResult.segment)
+      case _ => None
+    }
+  }
+
+  /** Is the block ID valid, that is, is the block present in the Spark executors. */
+  def isBlockIdValid(): Boolean = _isBlockIdValid
+
+  /**
+   * Set the block ID as invalid. This is useful when it is known that the block is not present
+   * in the Spark executors.
+   */
+  def setBlockIdInvalid(): Unit = {
+    _isBlockIdValid = false
+  }
+}
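
ReceivedBlockInfo now combines immutable metadata with a mutable block-id validity flag and derives the optional WAL segment from the store result via a pattern match. The sketch below mirrors that shape using hypothetical stand-in store-result types (not the actual Spark ones) to show how only WAL-backed results yield a segment:

object BlockInfoSketch extends App {
  // Hypothetical stand-ins for ReceivedBlockStoreResult and its two implementations.
  sealed trait StoreResult { def blockId: String }
  case class BlockManagerStoreResult(blockId: String) extends StoreResult
  case class WALStoreResult(blockId: String, segmentPath: String) extends StoreResult

  case class BlockInfo(streamId: Int, numRecords: Long, storeResult: StoreResult) {
    @volatile private var _isBlockIdValid = true

    def blockId: String = storeResult.blockId

    // Only WAL-backed store results carry segment information.
    def walSegmentOption: Option[String] = storeResult match {
      case wal: WALStoreResult => Some(wal.segmentPath)
      case _ => None
    }

    def isBlockIdValid: Boolean = _isBlockIdValid
    def setBlockIdInvalid(): Unit = { _isBlockIdValid = false }
  }

  val recovered = BlockInfo(0, 10L, WALStoreResult("input-0-0", "/tmp/checkpoint/log-1"))
  recovered.setBlockIdInvalid()        // recovered from the log, so executors no longer hold the block
  println(recovered.walSegmentOption)  // Some(/tmp/checkpoint/log-1)
  println(recovered.isBlockIdValid)    // false
}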

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala

Lines changed: 6 additions & 14 deletions
@@ -25,10 +25,10 @@ import scala.language.implicitConversions
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkException, Logging, SparkConf}
 import org.apache.spark.streaming.Time
 import org.apache.spark.streaming.util.WriteAheadLogManager
 import org.apache.spark.util.{Clock, Utils}
+import org.apache.spark.{Logging, SparkConf, SparkException}
 
 /** Trait representing any event in the ReceivedBlockTracker that updates its state. */
 private[streaming] sealed trait ReceivedBlockTrackerLogEvent
@@ -45,7 +45,7 @@ private[streaming] case class BatchCleanupEvent(times: Seq[Time])
 private[streaming]
 case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
   def getBlocksOfStream(streamId: Int): Seq[ReceivedBlockInfo] = {
-    streamIdToAllocatedBlocks.get(streamId).getOrElse(Seq.empty)
+    streamIdToAllocatedBlocks.getOrElse(streamId, Seq.empty)
   }
 }
 
@@ -171,6 +171,7 @@ private[streaming] class ReceivedBlockTracker(
     // Insert the recovered block information
     def insertAddedBlock(receivedBlockInfo: ReceivedBlockInfo) {
       logTrace(s"Recovery: Inserting added block $receivedBlockInfo")
+      receivedBlockInfo.setBlockIdInvalid()
       getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
     }
 
@@ -223,22 +224,13 @@ private[streaming] class ReceivedBlockTracker(
 
   /** Optionally create the write ahead log manager only if the feature is enabled */
   private def createLogManager(): Option[WriteAheadLogManager] = {
-    if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
-      if (checkpointDirOption.isEmpty) {
-        throw new SparkException(
-          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
-          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
-          "See documentation for more details.")
-      }
-      val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
+    checkpointDirOption.map { checkpointDir =>
+      val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir)
       val rollingIntervalSecs = conf.getInt(
        "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
-      val logManager = new WriteAheadLogManager(logDir, hadoopConf,
+      new WriteAheadLogManager(logDir, hadoopConf,
        rollingIntervalSecs = rollingIntervalSecs, clock = clock,
        callerName = "ReceivedBlockHandlerMaster")
-      Some(logManager)
-    } else {
-      None
     }
   }
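
Two things change in the tracker: recovered blocks are marked invalid before being requeued (their data is no longer in the executors), and createLogManager() now keys off the presence of a checkpoint directory alone, since the separate enable flag is gone. The if/else-returning-Some/None shape collapses into a single Option.map, as in this minimal sketch (LogManager is a hypothetical stand-in for WriteAheadLogManager):

object CreateLogManagerSketch extends App {
  // Hypothetical stand-in for WriteAheadLogManager.
  case class LogManager(logDir: String, rollingIntervalSecs: Int)

  // A log manager exists exactly when a checkpoint directory does; None falls out otherwise.
  def createLogManager(checkpointDirOption: Option[String],
                       rollingIntervalSecs: Int = 60): Option[LogManager] = {
    checkpointDirOption.map { checkpointDir =>
      val logDir = s"$checkpointDir/wal"   // illustrative; the real path comes from checkpointDirToLogDir
      LogManager(logDir, rollingIntervalSecs)
    }
  }

  println(createLogManager(Some("/tmp/checkpoint")))  // Some(LogManager(/tmp/checkpoint/wal,60))
  println(createLogManager(None))                     // None: no checkpoint dir, WAL tracking disabled
}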

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala

Lines changed: 10 additions & 19 deletions
@@ -88,7 +88,7 @@ class ReceivedBlockTrackerSuite
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
-  test("block addition, block to batch allocation and cleanup with write ahead log") {
+  test("recovery and cleanup with write ahead logs") {
     val manualClock = new ManualClock
     // Set the time increment level to twice the rotation interval so that every increment creates
     // a new log file
@@ -114,7 +114,6 @@ class ReceivedBlockTrackerSuite
     }
 
     // Start tracker and add blocks
-    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
     conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
     val tracker1 = createTracker(clock = manualClock)
     tracker1.isLogManagerEnabled should be (true)
@@ -130,7 +129,11 @@ class ReceivedBlockTrackerSuite
     // Restart tracker and verify recovered list of unallocated blocks
     incrementTime()
     val tracker2 = createTracker(clock = manualClock)
-    tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
+    val unallocatedBlocks = tracker2.getUnallocatedBlocks(streamId).toList
+    unallocatedBlocks shouldEqual blockInfos1
+    unallocatedBlocks.foreach { block =>
+      block.isBlockIdValid() should be (false)
+    }
 
     // Allocate blocks to batch and verify whether the unallocated blocks got allocated
     val batchTime1 = manualClock.getTimeMillis()
@@ -182,22 +185,10 @@ class ReceivedBlockTrackerSuite
     tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
   }
 
-  test("enabling write ahead log but not setting checkpoint dir") {
-    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
-    intercept[SparkException] {
-      createTracker(setCheckpointDir = false)
-    }
-  }
-
-  test("setting checkpoint dir but not enabling write ahead log") {
-    // When WAL config is not set, log manager should not be enabled
-    val tracker1 = createTracker(setCheckpointDir = true)
+  test("write ahead log disabled when no checkpoint directory is set") {
+    // When checkpoint is not enabled, then the write ahead log is also disabled
+    val tracker1 = createTracker(setCheckpointDir = false)
     tracker1.isLogManagerEnabled should be (false)
-
-    // When WAL is explicitly disabled, log manager should not be enabled
-    conf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
-    val tracker2 = createTracker(setCheckpointDir = true)
-    tracker2.isLogManagerEnabled should be(false)
   }
 
   /**
@@ -215,7 +206,7 @@ class ReceivedBlockTrackerSuite
 
   /** Generate blocks infos using random ids */
   def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
-    List.fill(5)(ReceivedBlockInfo(streamId, 0,
+    List.fill(5)(ReceivedBlockInfo(streamId, 0, None,
      BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
   }
