@@ -41,41 +41,47 @@ class WriteAheadLogBackedBlockRDDPartition(
 
 
 /**
- * This class represents a special case of the BlockRDD where the data blocks in the block manager are also
- * backed by segments in write ahead logs. For reading the data, this RDD first looks up the blocks by their ids
- * in the block manager. If it does not find them, it looks up the corresponding file segment.
+ * This class represents a special case of the BlockRDD where the data blocks in
+ * the block manager are also backed by segments in write ahead logs. For reading
+ * the data, this RDD first looks up the blocks by their ids in the block manager.
+ * If it does not find them, it looks up the corresponding file segment.
  *
  * @param sc SparkContext
- * @param hadoopConfiguration Hadoop configuration
+ * @param hadoopConfig Hadoop configuration
  * @param blockIds Ids of the blocks that contain this RDD's data
  * @param segments Segments in write ahead logs that contain this RDD's data
- * @param storeInBlockManager Whether to store in the block manager after reading from the log segment
- * @param storageLevel storage level to store when storing in block manager (applicable when storeInBlockManager = true)
+ * @param storeInBlockManager Whether to store in the block manager after reading from the segment
+ * @param storageLevel storage level to store when storing in block manager
+ *                     (applicable when storeInBlockManager = true)
  */
 private[streaming]
 class WriteAheadLogBackedBlockRDD[T: ClassTag](
     @transient sc: SparkContext,
-    @transient hadoopConfiguration: Configuration,
+    @transient hadoopConfig: Configuration,
     @transient override val blockIds: Array[BlockId],
     @transient val segments: Array[WriteAheadLogFileSegment],
     val storeInBlockManager: Boolean,
     val storageLevel: StorageLevel
   ) extends BlockRDD[T](sc, blockIds) {
 
-  require(blockIds.length == segments.length,
-    s"Number of block ids (${blockIds.length}) must be the same as number of segments (${segments.length}})!")
+  require(
+    blockIds.length == segments.length,
+    s"Number of block ids (${blockIds.length}) must be " +
+      s"the same as number of segments (${segments.length})!")
 
   // Hadoop configuration is not serializable, so wrap it in a SerializableWritable before shipping it to tasks.
-  private val broadcastedHadoopConf = new SerializableWritable(hadoopConfiguration)
+  private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)
 
   override def getPartitions: Array[Partition] = {
     assertValid()
-    Array.tabulate(blockIds.size){ i => new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i)) }
+    Array.tabulate(blockIds.size) { i =>
+      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i)) }
   }
 
   /**
-   * Gets the partition data by getting the corresponding block from the block manager. If the block does not
-   * exist, then the data is read from the corresponding segment in write ahead log files.
+   * Gets the partition data by getting the corresponding block from the block manager.
+   * If the block does not exist, then the data is read from the corresponding segment
+   * in write ahead log files.
    */
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
     assertValid()
@@ -86,31 +92,32 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     blockManager.get(blockId) match {
       case Some(block) => // Data is in Block Manager
         val iterator = block.data.asInstanceOf[Iterator[T]]
-        logDebug(s"Read partition data of RDD $this from block manager, block $blockId")
+        logDebug(s"Read partition data of $this from block manager, block $blockId")
         iterator
       case None => // Data not found in Block Manager, grab it from write ahead log file
         val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
         val dataRead = reader.read(partition.segment)
         reader.close()
-        logInfo(s"Read partition data of RDD $this from write ahead log, segment ${partition.segment}")
+        logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}")
         if (storeInBlockManager) {
           blockManager.putBytes(blockId, dataRead, storageLevel)
-          logDebug(s"Stored partition data of RDD $this into block manager with level $storageLevel")
+          logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
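+          // putBytes() may advance the buffer's position, so rewind before deserializing it below.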
           dataRead.rewind()
         }
         blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
     }
   }
 
   /**
-   * Get the preferred location of the partition. This returns the locations of the block if it is present in the
-   * block manager, else it returns the location of the corresponding segment in HDFS.
+   * Get the preferred location of the partition. This returns the locations of the block
+   * if it is present in the block manager, else it returns the location of the
+   * corresponding segment in HDFS.
    */
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
     val blockLocations = getBlockIdLocations().get(partition.blockId)
     lazy val segmentLocations = HdfsUtils.getBlockLocations(
-      partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfiguration)
+      partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig)
     blockLocations.orElse(segmentLocations).getOrElse(Seq.empty)
   }
 }
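
A note on the preference chain in `getPreferredLocations`: because `segmentLocations` is a `lazy val` and `Option.orElse` takes its argument by name, the HDFS block-location lookup only runs when the block manager lookup misses. A minimal self-contained sketch of that behavior, with plain `Option`s standing in for the real lookups (all names below are illustrative, not part of the patch):

```scala
object LocationPreferenceSketch extends App {
  // Stand-in for getBlockIdLocations().get(partition.blockId): a block-manager miss.
  val blockLocations: Option[Seq[String]] = None

  // Stand-in for HdfsUtils.getBlockLocations(...); lazy, so it runs only when forced.
  lazy val segmentLocations: Option[Seq[String]] = {
    println("HDFS lookup triggered by block-manager miss")
    Some(Seq("datanode-1", "datanode-2")) // hypothetical segment replica hosts
  }

  // Same chain as getPreferredLocations: block manager first, then HDFS, else none.
  val preferred = blockLocations.orElse(segmentLocations).getOrElse(Seq.empty)
  println(preferred) // List(datanode-1, datanode-2)
}
```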
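For completeness, a hedged usage sketch under the renamed constructor parameter. This is not part of the patch: the class is `private[streaming]`, so a real caller lives inside Spark Streaming, and `sc`, `blockIds`, and `segments` are assumed to be supplied by a receiver that already wrote its blocks to the write ahead log:

```scala
// Hypothetical call site inside the org.apache.spark.streaming package.
import org.apache.spark.storage.StorageLevel

val rdd = new WriteAheadLogBackedBlockRDD[Array[Byte]](
  sc,                              // SparkContext of the streaming application
  sc.hadoopConfiguration,          // wrapped in a SerializableWritable internally
  blockIds,                        // Array[BlockId], one id per received block
  segments,                        // Array[WriteAheadLogFileSegment], parallel to blockIds
  storeInBlockManager = true,      // cache blocks that had to be replayed from the log
  storageLevel = StorageLevel.MEMORY_ONLY_SER)

rdd.count() // each partition hits the block manager first, then its WAL segment
```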