
Commit ed5fbf0

Minor updates.

1 parent b0a18b1 commit ed5fbf0

2 files changed: +12 -11 lines changed

streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala

Lines changed: 11 additions & 10 deletions
@@ -37,8 +37,8 @@ private[streaming]
 class WriteAheadLogBackedBlockRDDPartition(
     val index: Int,
     val blockId: BlockId,
-    val segment: WriteAheadLogFileSegment
-  ) extends Partition
+    val segment: WriteAheadLogFileSegment)
+  extends Partition
 
 
 /**
@@ -59,11 +59,11 @@ private[streaming]
 class WriteAheadLogBackedBlockRDD[T: ClassTag](
     @transient sc: SparkContext,
     @transient hadoopConfig: Configuration,
-    @transient override val blockIds: Array[BlockId],
-    @transient val segments: Array[WriteAheadLogFileSegment],
-    val storeInBlockManager: Boolean,
-    val storageLevel: StorageLevel
-  ) extends BlockRDD[T](sc, blockIds) {
+    @transient blockIds: Array[BlockId],
+    @transient segments: Array[WriteAheadLogFileSegment],
+    storeInBlockManager: Boolean,
+    storageLevel: StorageLevel)
+  extends BlockRDD[T](sc, blockIds) {
 
   require(
     blockIds.length == segments.length,
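
As context for the hunk above: dropping `val` (and `override val`) turns these back into plain constructor arguments, so they no longer add public fields or shadow the `blockIds` accessor already exposed by `BlockRDD`. A minimal sketch of the same pattern, with made-up names rather than the Spark classes:

class Parent(val ids: Array[Int])

// `ids` is now just a constructor argument forwarded to Parent; only
// Parent's `ids` remains a public accessor, and `flag` never becomes a
// field unless a method in the class body uses it.
class Child(ids: Array[Int], flag: Boolean) extends Parent(ids) {
  require(ids.nonEmpty, "ids must not be empty") // params still usable here
}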
@@ -76,7 +76,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
   override def getPartitions: Array[Partition] = {
     assertValid()
     Array.tabulate(blockIds.size) { i =>
-      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i)) }
+      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i))
+    }
   }
 
   /**
@@ -116,8 +117,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
    */
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
-    val blockLocations = getBlockIdLocations().get(partition.blockId)
-    lazy val segmentLocations = HdfsUtils.getFileSegmentLocations(
+    def blockLocations = getBlockIdLocations().get(partition.blockId)
+    def segmentLocations = HdfsUtils.getFileSegmentLocations(
       partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig)
     blockLocations.orElse(segmentLocations).getOrElse(Seq.empty)
   }
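
Worth noting for the `getPreferredLocations` change: `Option.orElse` takes its alternative by name, so turning `segmentLocations` into a `def` still defers the HDFS lookup until the block-manager lookup comes up empty, without the extra field and synchronization a `lazy val` carries. A small self-contained sketch of that evaluation order (my own example, not the Spark code):

object LazyAlternativeDemo {
  def cheapLookup: Option[Seq[String]] = Some(Seq("host1"))

  def expensiveLookup: Option[Seq[String]] = {
    println("expensive lookup ran") // stands in for the HDFS call
    Some(Seq("host2"))
  }

  def main(args: Array[String]): Unit = {
    // orElse's parameter is by-name, so expensiveLookup is never
    // evaluated here: cheapLookup already returned Some.
    val locations = cheapLookup.orElse(expensiveLookup).getOrElse(Seq.empty)
    println(locations) // List(host1); "expensive lookup ran" never prints
  }
}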

streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -86,7 +86,7 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
       testStoreInBM: Boolean = false
     ) {
     val numBlocks = numPartitionssInBM + numPartitionsInWAL
-    val data = Seq.tabulate(numBlocks) { _ => Seq.fill(10) { scala.util.Random.nextString(50) } }
+    val data = Seq.fill(numBlocks, 10)(scala.util.Random.nextString(50))
 
     // Put the necessary blocks in the block manager
     val blockIds = Array.fill(numBlocks)(StreamBlockId(Random.nextInt(), Random.nextInt()))
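
The test change leans on the two-dimensional overload of `Seq.fill`: `Seq.fill(n1, n2)(elem)` builds `n1` inner sequences of `n2` elements each, evaluating the by-name `elem` once per slot, which is exactly what the old `Seq.tabulate`/`Seq.fill` nesting did. A quick sketch of the equivalence (sizes are assumed, not taken from the suite):

object FillEquivalenceDemo {
  def main(args: Array[String]): Unit = {
    val before = Seq.tabulate(3) { _ => Seq.fill(4)(scala.util.Random.nextString(5)) }
    val after  = Seq.fill(3, 4)(scala.util.Random.nextString(5))

    // Both shapes are identical: 3 outer Seqs, each with 4 random strings.
    assert(before.length == 3 && before.forall(_.length == 4))
    assert(after.length == 3 && after.forall(_.length == 4))
  }
}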
