15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -31,8 +31,9 @@ private[spark]
class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {

@transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
@transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
@volatile private var _isValid = true
@volatile private var _setInvalid = true

override def getPartitions: Array[Partition] = {
assertValid()
@@ -54,7 +55,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds

override def getPreferredLocations(split: Partition): Seq[String] = {
assertValid()
locations_(split.asInstanceOf[BlockRDDPartition].blockId)
_locations(split.asInstanceOf[BlockRDDPartition].blockId)
}

/**
@@ -66,7 +67,9 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
blockIds.foreach { blockId =>
sc.env.blockManager.master.removeBlock(blockId)
}
_isValid = false
if (_setInvalid) {
Contributor: So rather than having this, maybe the subclass of this could override isValid. It's a bit confusing as it stands.

Contributor Author: Will do. That is a more intuitive design.

Contributor Author: Also, I will add unit tests to test this behavior for BlockRDD and WALBackedBlockRDD.

_isValid = false
}
}

/**
Expand All @@ -85,8 +88,12 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
}
}

protected def setInvalidIfBlocksRemoved(setInvalid: Boolean): Unit = {
_setInvalid = setInvalid
}

protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
locations_
_locations
}
}
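
As a rough illustration of the two designs discussed in the comments above, here is a minimal, self-contained sketch. The class names are hypothetical stand-ins, not the actual Spark classes: one version uses a mutable flag that a subclass can clear before blocks are removed, the other has the subclass override the validity check directly.

```scala
// Design 1: base class exposes a protected flag that subclasses can clear,
// so removing the blocks does not mark the RDD invalid.
class FlagBasedRDD {
  @volatile protected var invalidateOnRemove: Boolean = true
  @volatile private var valid: Boolean = true
  def isValid: Boolean = valid
  def removeBlocks(): Unit = {
    // ... remove blocks from the block manager ...
    if (invalidateOnRemove) valid = false
  }
}

// Design 2: base class keeps a single "removed" flag and the subclass
// overrides isValid instead of toggling a setter.
class OverrideBasedRDD {
  @volatile private var removed: Boolean = false
  def removeBlocks(): Unit = { removed = true }
  def isValid: Boolean = !removed
}

class WalBackedOverrideRDD extends OverrideBasedRDD {
  // Blocks may be gone from the executors, but the WAL copy still makes
  // the RDD readable, so it stays valid.
  override def isValid: Boolean = true
}
```

The second design is what the reviewer suggests: the subclass states its own validity rule rather than flipping a flag in the base class.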

@@ -67,27 +67,26 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
} else {
// Otherwise, ask the tracker for all the blocks that have been allocated to this stream
// for this batch
val blockInfos =
ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
val blockStoreResults = blockInfos.map { _.blockStoreResult }
val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
val receiverTracker = ssc.scheduler.receiverTracker
val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

// Check whether all the results are of the same type
val resultTypes = blockStoreResults.map { _.getClass }.distinct
if (resultTypes.size > 1) {
logWarning("Multiple result types in block information, WAL information will be ignored.")
}
// Are WAL record handles present with all the blocks
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
Contributor: I do wonder if this should just throw an exception if any have missing WAL records... that just means there is an exception, right?

Contributor: Or rather, can we tell whether the WAL is in use with this receiver and then just throw an exception if we see any blocks that do not have a record handle?

Contributor Author: We can do that, since it is a conf property.


// If all the results are of type WriteAheadLogBasedStoreResult, then create
// WriteAheadLogBackedBlockRDD else create simple BlockRDD.
if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
val logSegments = blockStoreResults.map {
_.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
}.toArray
// Since storeInBlockManager = false, the storage level does not matter.
new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)
if (areWALRecordHandlesPresent) {
// If all the blocks have WAL record handle, then create a WALBackedBlockRDD
val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
new WriteAheadLogBackedBlockRDD[T](
ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
} else {
// Else, create a BlockRDD. However, if there are some blocks with WAL info but not others
// then that is unexpected and log a warning accordingly.
if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
logWarning("Could not find Write Ahead Log information on some of the blocks, " +
Contributor: This seems like at least an ERROR level log, right? Is there a code path that is expected where this branch is realized?

Contributor Author: I can't imagine a current code path where this can happen, so I am not sure what to do. logError is probably a better idea.

"data may not be recoverable after driver failures")
}
new BlockRDD[T](ssc.sc, blockIds)
}
}
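
The dispatch above can be summarized with a small standalone sketch, using simplified stand-in types rather than the real Spark classes: build a WAL-backed RDD only when every block carries a record handle, otherwise fall back to a plain block RDD and warn if handles are only partially present.

```scala
// Simplified stand-ins for ReceivedBlockInfo and the two RDD types, for illustration only.
final case class BlockInfo(blockId: String, walHandle: Option[String], isBlockIdValid: Boolean)

sealed trait BatchRDD
final case class WalBackedRDD(blockIds: Seq[String], handles: Seq[String], valid: Seq[Boolean]) extends BatchRDD
final case class PlainBlockRDD(blockIds: Seq[String]) extends BatchRDD

def createRDDForBatch(blockInfos: Seq[BlockInfo], log: String => Unit): BatchRDD = {
  val blockIds = blockInfos.map(_.blockId)
  if (blockInfos.forall(_.walHandle.nonEmpty)) {
    // Every block is backed by the WAL, so the batch can be recovered after a driver failure.
    WalBackedRDD(blockIds, blockInfos.map(_.walHandle.get), blockInfos.map(_.isBlockIdValid))
  } else {
    // Mixed or missing handles: fall back to a plain block RDD and warn about the mismatch.
    if (blockInfos.exists(_.walHandle.nonEmpty)) {
      log("Some blocks have WAL info and some do not; data may not be recoverable after driver failure")
    }
    PlainBlockRDD(blockIds)
  }
}
```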
@@ -31,7 +31,7 @@ import org.apache.spark.streaming.util._
/**
* Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
* It contains information about the id of the blocks having this partition's data and
* the segment of the write ahead log that backs the partition.
* the corresponding record handle in the write ahead log that backs the partition.
* @param index index of the partition
* @param blockId id of the block having the partition data
* @param walRecordHandle Handle of the record in a write ahead log having the partition data
@@ -40,21 +40,30 @@ private[streaming]
class WriteAheadLogBackedBlockRDDPartition(
val index: Int,
val blockId: BlockId,
val walRecordHandle: WriteAheadLogRecordHandle)
extends Partition
val isBlockIdValid: Boolean,
Contributor: Can you add this param to the scaladoc?
Contributor Author: Good catch.

val walRecordHandle: WriteAheadLogRecordHandle
) extends Partition


/**
* This class represents a special case of the BlockRDD where the data blocks in
* the block manager are also backed by data in write ahead logs. For reading
* the data, this RDD first looks up the blocks by their ids in the block manager.
* If it does not find them, it looks up the corresponding data in the write ahead log.
* If it does not find them, it looks up the WAL using the corresponding record handle.
* The lookup of the blocks from the block manager can be skipped by setting the corresponding
* element in isBlockIdValid to false. This is a performance optimization which does not affect
* correctness, and it can be used in situations where it is known that the block
* does not exist in the Spark executors (e.g. after a failed driver is restarted).
*
*
* @param sc SparkContext
* @param blockIds Ids of the blocks that contains this RDD's data
* @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
* @param storeInBlockManager Whether to store in the block manager after reading
* from the WAL record
* @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
* executors). If not, then block lookups by the block ids will be skipped.
* By default, this is an empty array signifying true for all the blocks.
* @param storeInBlockManager Whether to store a block in the block manager
* after reading it from the WAL
* @param storageLevel storage level to store when storing in block manager
* (applicable when storeInBlockManager = true)
*/
@@ -63,23 +72,32 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
@transient sc: SparkContext,
@transient blockIds: Array[BlockId],
@transient walRecordHandles: Array[WriteAheadLogRecordHandle],
storeInBlockManager: Boolean,
storageLevel: StorageLevel)
@transient isBlockIdValid: Array[Boolean] = Array.empty,
storeInBlockManager: Boolean = false,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER)
extends BlockRDD[T](sc, blockIds) {

require(
blockIds.length == walRecordHandles.length,
s"Number of block ids (${blockIds.length}) must be " +
s"the same as number of WAL record handles (${walRecordHandles.length}})!")
s"Number of block Ids (${blockIds.length}) must be " +
s" same as number of WAL record handles (${walRecordHandles.length}})")

require(
isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length,
s"Number of elements in isBlockIdValid (${isBlockIdValid.length}) must be " +
s" same as number of block Ids (${blockIds.length})")

// Hadoop configuration is not serializable, so broadcast it as a serializable.
@transient private val hadoopConfig = sc.hadoopConfiguration
private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)

setInvalidIfBlocksRemoved(false)

override def getPartitions: Array[Partition] = {
assertValid()
Array.tabulate(blockIds.size) { i =>
new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), walRecordHandles(i))
Array.tabulate(blockIds.length) { i =>
val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), isValid, walRecordHandles(i))
}
}
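
The empty-array default for isBlockIdValid means "assume every block is still present". A tiny sketch of the interpretation used in getPartitions above (the function name is illustrative, not part of the Spark API):

```scala
// An empty validity array is read as "all blocks valid"; otherwise the flag
// for the given partition index decides whether the block manager lookup is attempted.
def blockIdValidAt(isBlockIdValid: Array[Boolean], index: Int): Boolean =
  if (isBlockIdValid.isEmpty) true else isBlockIdValid(index)

// blockIdValidAt(Array.empty, 3)              // true: default, all blocks assumed present
// blockIdValidAt(Array(true, false, true), 1) // false: skip the block manager for partition 1
```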

@@ -94,47 +112,53 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
val blockManager = SparkEnv.get.blockManager
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockId = partition.blockId
blockManager.get(blockId) match {
case Some(block) => // Data is in Block Manager
val iterator = block.data.asInstanceOf[Iterator[T]]
logDebug(s"Read partition data of $this from block manager, block $blockId")
iterator
case None => // Data not found in Block Manager, grab it from write ahead log file
var dataRead: ByteBuffer = null
var writeAheadLog: WriteAheadLog = null
try {
// The WriteAheadLogUtils.createLog*** method needs a directory to create a
// WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
// writing log data. However, the directory is not needed if data needs to be read, hence
// a dummy path is provided to satisfy the method parameter requirements.
// FileBasedWriteAheadLog will not create any file or directory at that path.
val dummyDirectory = FileUtils.getTempDirectoryPath()
writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
SparkEnv.get.conf, dummyDirectory, hadoopConf)
dataRead = writeAheadLog.read(partition.walRecordHandle)
} catch {
case NonFatal(e) =>
throw new SparkException(
s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
} finally {
if (writeAheadLog != null) {
writeAheadLog.close()
writeAheadLog = null
}
}
if (dataRead == null) {

def getBlockFromBlockManager(): Option[Iterator[T]] = {
Contributor Author: This whole thing just puts the old code within inner functions, not really a big change. The real change is that these functions are called selectively in lines 157..160.

blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
}

def getBlockFromWriteAheadLog(): Iterator[T] = {
var dataRead: ByteBuffer = null
var writeAheadLog: WriteAheadLog = null
try {
// The WriteAheadLogUtils.createLog*** method needs a directory to create a
// WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
// writing log data. However, the directory is not needed if data needs to be read, hence
// a dummy path is provided to satisfy the method parameter requirements.
// FileBasedWriteAheadLog will not create any file or directory at that path.
val dummyDirectory = FileUtils.getTempDirectoryPath()
writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
SparkEnv.get.conf, dummyDirectory, hadoopConf)
dataRead = writeAheadLog.read(partition.walRecordHandle)
} catch {
case NonFatal(e) =>
throw new SparkException(
s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
s"read returned null")
s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
} finally {
if (writeAheadLog != null) {
writeAheadLog.close()
writeAheadLog = null
}
logInfo(s"Read partition data of $this from write ahead log, record handle " +
partition.walRecordHandle)
if (storeInBlockManager) {
blockManager.putBytes(blockId, dataRead, storageLevel)
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
dataRead.rewind()
}
blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
}
if (dataRead == null) {
throw new SparkException(
s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
s"read returned null")
}
logInfo(s"Read partition data of $this from write ahead log, record handle " +
partition.walRecordHandle)
if (storeInBlockManager) {
blockManager.putBytes(blockId, dataRead, storageLevel)
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
dataRead.rewind()
}
blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
}

if (partition.isBlockIdValid) {
getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() }
} else {
getBlockFromWriteAheadLog()
}
}
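
The read path above boils down to "try the fast local source, fall back to the durable one, and skip the fast path entirely when the caller already knows the local copy cannot exist". A generic, self-contained sketch of that pattern (illustrative names, not the Spark API):

```scala
// Try the block manager only when the block id is believed valid; otherwise go
// straight to the write ahead log. getOrElse keeps the fallback lazy.
def readPartition[T](
    fastPathKnownValid: Boolean,
    fromBlockManager: () => Option[Iterator[T]],
    fromWriteAheadLog: () => Iterator[T]): Iterator[T] = {
  if (fastPathKnownValid) {
    fromBlockManager().getOrElse(fromWriteAheadLog())
  } else {
    fromWriteAheadLog()
  }
}
```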

@@ -145,7 +169,12 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
*/
override def getPreferredLocations(split: Partition): Seq[String] = {
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockLocations = getBlockIdLocations().get(partition.blockId)
val blockLocations = if (partition.isBlockIdValid) {
getBlockIdLocations().get(partition.blockId)
} else {
None
}

blockLocations.getOrElse {
partition.walRecordHandle match {
case fileSegment: FileBasedWriteAheadLogSegment =>
@@ -146,7 +146,7 @@ private[streaming] class ReceiverSupervisorImpl(
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")

val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
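
The change above threads the receiver's optional block metadata through to the tracker. A simplified sketch of what the extended record looks like (stub types, not the real Spark case classes):

```scala
// Illustrative stubs: the block info now carries an Option[Any] metadata field
// alongside the store result reported to the tracker.
final case class StoreResultStub(blockId: String)
final case class BlockInfoStub(
    streamId: Int,
    numRecords: Long,
    metadataOption: Option[Any],
    storeResult: StoreResultStub)

val withMetadata    = BlockInfoStub(0, 100L, Some(Map("offsets" -> (0L, 100L))), StoreResultStub("input-0-1"))
val withoutMetadata = BlockInfoStub(0, 100L, None, StoreResultStub("input-0-2"))
```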
@@ -17,12 +17,38 @@

package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming.receiver.ReceivedBlockStoreResult
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.{ReceivedBlockStoreResult, WriteAheadLogBasedStoreResult}
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle

/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
streamId: Int,
numRecords: Long,
metadataOption: Option[Any],
blockStoreResult: ReceivedBlockStoreResult
)
) {

@volatile private var _isBlockIdValid = true

def blockId: StreamBlockId = blockStoreResult.blockId

def walRecordHandleOption: Option[WriteAheadLogRecordHandle] = {
blockStoreResult match {
case walStoreResult: WriteAheadLogBasedStoreResult => Some(walStoreResult.walRecordHandle)
case _ => None
}
}

/** Is the block ID valid, that is, is the block present in the Spark executors. */
def isBlockIdValid(): Boolean = _isBlockIdValid

/**
* Set the block ID as invalid. This is useful when it is known that the block is not present
* in the Spark executors.
*/
def setBlockIdInvalid(): Unit = {
_isBlockIdValid = false
}
}
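
The walRecordHandleOption accessor is a straightforward pattern match on the store result type. A self-contained sketch of the same shape, with simplified stand-in types:

```scala
// Simplified stand-ins for ReceivedBlockStoreResult and its WAL-backed variant.
sealed trait StoreResult { def blockId: String }
final case class BlockManagerStoreResult(blockId: String) extends StoreResult
final case class WalStoreResult(blockId: String, walRecordHandle: String) extends StoreResult

def walRecordHandleOption(result: StoreResult): Option[String] = result match {
  case wal: WalStoreResult => Some(wal.walRecordHandle)
  case _                   => None
}

// walRecordHandleOption(WalStoreResult("input-0-1", "log-7:0+128")) == Some("log-7:0+128")
// walRecordHandleOption(BlockManagerStoreResult("input-0-2"))       == None
```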

@@ -45,7 +45,7 @@ private[streaming] case class BatchCleanupEvent(times: Seq[Time])
private[streaming]
case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
def getBlocksOfStream(streamId: Int): Seq[ReceivedBlockInfo] = {
streamIdToAllocatedBlocks.get(streamId).getOrElse(Seq.empty)
streamIdToAllocatedBlocks.getOrElse(streamId, Seq.empty)
}
}
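
The getBlocksOfStream change is a pure idiom cleanup: Map.getOrElse gives the same result as get followed by getOrElse, without the intermediate Option.

```scala
// The two lookups below are equivalent; the second avoids the intermediate Option.
val allocated = Map(0 -> Seq("blk-0"), 1 -> Seq("blk-1"))
val a = allocated.get(2).getOrElse(Seq.empty)   // old style
val b = allocated.getOrElse(2, Seq.empty)       // preferred
assert(a == b)
```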

@@ -63,6 +63,7 @@ private[streaming] class ReceivedBlockTracker(
hadoopConf: Configuration,
streamIds: Seq[Int],
clock: Clock,
recoverFromWriteAheadLog: Boolean,
checkpointDirOption: Option[String])
extends Logging {

@@ -75,7 +76,9 @@ private[streaming] class ReceivedBlockTracker(
private var lastAllocatedBatchTime: Time = null

// Recover block information from write ahead logs
recoverFromWriteAheadLogs()
if (recoverFromWriteAheadLog) {
recoverPastEvents()
}

/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
@@ -167,10 +170,11 @@ private[streaming] class ReceivedBlockTracker(
* Recover all the tracker actions from the write ahead logs to recover the state (unallocated
* and allocated block info) prior to failure.
*/
private def recoverFromWriteAheadLogs(): Unit = synchronized {
private def recoverPastEvents(): Unit = synchronized {
// Insert the recovered block information
def insertAddedBlock(receivedBlockInfo: ReceivedBlockInfo) {
logTrace(s"Recovery: Inserting added block $receivedBlockInfo")
receivedBlockInfo.setBlockIdInvalid()
getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
}

@@ -224,19 +228,9 @@ private[streaming] class ReceivedBlockTracker(

/** Optionally create the write ahead log manager only if the feature is enabled */
private def createWriteAheadLog(): Option[WriteAheadLog] = {
if (WriteAheadLogUtils.enableReceiverLog(conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
checkpointDirOption.map { checkpointDir =>
val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)

val log = WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
Some(log)
} else {
None
WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
}
}
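
The simplified createWriteAheadLog leans on Option.map to create the driver-side log only when a checkpoint directory is configured. A sketch of the same pattern, with illustrative names rather than the Spark API (the log-directory suffix here is an assumption):

```scala
// Create the resource only when the optional configuration is present;
// None simply propagates when no checkpoint directory was set.
def createLog(checkpointDirOption: Option[String], create: String => String): Option[String] =
  checkpointDirOption.map { checkpointDir =>
    val logDir = s"$checkpointDir/receivedBlockMetadata" // assumed layout, for illustration
    create(logDir)
  }

// createLog(Some("/ckpt"), dir => s"WAL at $dir") == Some("WAL at /ckpt/receivedBlockMetadata")
// createLog(None, dir => s"WAL at $dir")          == None
```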
