
Commit 9c86a61

Merge pull request #22 from harishreedharan/driver-ha-rdd
Shutdown spark context after tests. Formatting/minor fixes
2 parents c709f2f + 2878c38 commit 9c86a61

File tree: 2 files changed (+36, -30 lines)


streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala

Lines changed: 7 additions & 8 deletions
@@ -20,17 +20,17 @@ import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, HdfsUtils, WriteAheadLogRandomReader}
 import org.apache.spark._
 
 private[streaming]
 class HDFSBackedBlockRDDPartition(
-    val blockId: BlockId, idx: Int, val segment: WriteAheadLogFileSegment) extends Partition {
-  val index = idx
-}
+    val blockId: BlockId,
+    val index: Int,
+    val segment: WriteAheadLogFileSegment
+  ) extends Partition
 
 private[streaming]
 class HDFSBackedBlockRDD[T: ClassTag](
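
The partition rewrite above works because Spark's Partition trait only asks for an index: promoting index to a constructor val satisfies the trait directly, so the class body and the separate val index = idx assignment can go away. A minimal sketch of the idea, with an illustrative class name that is not part of this commit:

    import org.apache.spark.Partition

    // Illustrative only: declaring `index` as a constructor val is enough to
    // implement the Partition trait, so no class body is needed.
    class ExamplePartition(val index: Int) extends Partition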
@@ -42,13 +42,12 @@ class HDFSBackedBlockRDD[T: ClassTag](
     val storageLevel: StorageLevel
   ) extends BlockRDD[T](sc, blockIds) {
 
-  if (blockIds.length != segments.length) {
-    throw new IllegalStateException("Number of block ids must be the same as number of segments!")
-  }
+  require(blockIds.length == segments.length,
+    "Number of block ids must be the same as number of segments!")
 
   // Hadoop Configuration is not serializable, so broadcast it as a serializable.
   val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration))
-    .asInstanceOf[Broadcast[SerializableWritable[Configuration]]]
+
   override def getPartitions: Array[Partition] = {
     assertValid()
     (0 until blockIds.size).map { i =>
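
This hunk also drops the asInstanceOf cast on the broadcast: with the explicit Broadcast import gone, sc.broadcast already infers Broadcast[SerializableWritable[Configuration]] on its own. A minimal sketch of the wrap-then-broadcast pattern the comment describes, assuming a Spark 1.x-era API where SerializableWritable is public (as the file's own import of org.apache.spark._ implies); the helper name below is made up:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.{SerializableWritable, SparkContext}
    import org.apache.spark.broadcast.Broadcast

    // Hadoop's Configuration is not Serializable, so wrap it before broadcasting.
    def broadcastHadoopConf(
        sc: SparkContext,
        hadoopConf: Configuration): Broadcast[SerializableWritable[Configuration]] = {
      sc.broadcast(new SerializableWritable(hadoopConf))
    }

    // On the executor side, the wrapped Configuration is recovered with
    // broadcastedConf.value.value.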

streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala

Lines changed: 29 additions & 22 deletions
@@ -19,17 +19,18 @@ package org.apache.spark.streaming.rdd
 import java.io.File
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.apache.spark.{SparkConf, SparkContext}
+
 import scala.collection.mutable.ArrayBuffer
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}
 
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
-import org.apache.spark.{SparkConf, SparkContext}
 
-class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
+class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
   val conf = new SparkConf()
     .setMaster("local[2]")
     .setAppName(this.getClass.getSimpleName)
@@ -51,6 +52,13 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter
     dir.delete()
   }
 
+  override def afterAll(): Unit = {
+    // Copied from LocalSparkContext which can't be imported since spark-core test-jar does not
+    // get imported properly by sbt even if it is created.
+    sparkContext.stop()
+    System.clearProperty("spark.driver.port")
+  }
+
   test("Data available in BM and HDFS") {
     doTestHDFSBackedRDD(5, 5, 20, 5)
   }
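
This new hook is the substance of the commit message: stop the shared SparkContext once the whole suite has run and clear spark.driver.port so a later suite in the same JVM can start a fresh driver. A self-contained sketch of the same pattern follows; the suite name and test are illustrative, only the shutdown logic mirrors this commit:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.scalatest.{BeforeAndAfterAll, FunSuite}

    class SparkContextShutdownExampleSuite extends FunSuite with BeforeAndAfterAll {
      private val conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName(this.getClass.getSimpleName)
      private val sparkContext = new SparkContext(conf)

      test("shared context is usable across tests") {
        assert(sparkContext.parallelize(1 to 10).count() == 10)
      }

      override def afterAll(): Unit = {
        // Stop the context once, after all tests, and clear the driver port so
        // the next suite in this JVM can bind its own driver.
        sparkContext.stop()
        System.clearProperty("spark.driver.port")
      }
    }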
@@ -70,8 +78,8 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
   /**
    * Write a bunch of events into the HDFS Block RDD. Put a part of all of them to the
    * BlockManager, so all reads need not happen from HDFS.
-   * @param total - Total number of Strings to write
-   * @param blockCount - Number of blocks to write (therefore, total # of events per block =
+   * @param total Total number of Strings to write
+   * @param blockCount Number of blocks to write (therefore, total # of events per block =
    *                   total/blockCount
    */
   private def doTestHDFSBackedRDD(
@@ -81,8 +89,7 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
       blockCount: Int
     ) {
     val countPerBlock = total / blockCount
-    val blockIds = (0 until blockCount).map {
-      i =>
+    val blockIds = (0 until blockCount).map { i =>
        StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet())
     }
 
@@ -95,16 +102,17 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
       }
     }
 
-    val segments = new ArrayBuffer[WriteAheadLogFileSegment]
-    if (writeToHDFSCount != 0) {
-      // Generate some fake segments for the blocks in BM so the RDD does not complain
-      segments ++= generateFakeSegments(writeToBMCount)
-      segments ++= writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount),
-        blockIds.slice(writeToBMCount, blockCount))
-
-    } else {
-      segments ++= generateFakeSegments(blockCount)
+    val segments = {
+      if (writeToHDFSCount != 0) {
+        // Generate some fake segments for the blocks in BM so the RDD does not complain
+        generateFakeSegments(writeToBMCount) ++
+          writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount),
+            blockIds.slice(writeToBMCount, blockCount))
+      } else {
+        generateFakeSegments(blockCount)
+      }
     }
+
     val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray,
       segments.toArray, false, StorageLevel.MEMORY_ONLY)
 
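
The segments rewrite above swaps a mutable ArrayBuffer that both branches appended to for a single if/else used as an expression, so the val is initialized in one place. The same pattern in miniature, with made-up names:

    import scala.collection.mutable.ArrayBuffer

    // Before: build up a buffer from whichever branch runs.
    def buildMutable(flag: Boolean): Seq[Int] = {
      val buf = new ArrayBuffer[Int]
      if (flag) buf ++= Seq(1, 2) else buf ++= Seq(3)
      buf
    }

    // After: let the if/else itself produce the value.
    def buildAsExpression(flag: Boolean): Seq[Int] =
      if (flag) Seq(1, 2) else Seq(3)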

@@ -116,10 +124,9 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
   /**
    * Write data to HDFS and get a list of Seq of Seqs in which each Seq represents the data that
    * went into one block.
-   * @param count - Number of Strings to write
-   * @param countPerBlock - Number of Strings per block
-   * @return - Tuple of (Seq of Seqs, each of these Seqs is one block, Seq of WriteAheadLogFileSegments,
-   *           each representing the block being written to HDFS.
+   * @param count Number of Strings to write
+   * @param countPerBlock Number of Strings per block
+   * @return Seq of Seqs, each of these Seqs is one block
    */
   private def generateData(
     count: Int,
@@ -130,8 +137,8 @@ class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
   }
 
   private def writeDataToHDFS(
-      blockData: Seq[Seq[String]],
-      blockIds: Seq[BlockId]
+    blockData: Seq[Seq[String]],
+    blockIds: Seq[BlockId]
   ): Seq[WriteAheadLogFileSegment] = {
     assert(blockData.size === blockIds.size)
     val segments = new ArrayBuffer[WriteAheadLogFileSegment]()
