
Commit 6a0857c

Updated testsuites.
1 parent: 222485d

4 files changed, with 27 additions and 30 deletions


streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConfPairs = ssc.conf.getAll
 
-  def sparkConf: SparkConf = {
+  def createSparkConf(): SparkConf = {
     val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")
       .remove("spark.driver.port")

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 1 addition & 1 deletion
@@ -116,7 +116,7 @@ class StreamingContext private[streaming] (
 
   private[streaming] val sc: SparkContext = {
     if (isCheckpointPresent) {
-      new SparkContext(cp_.sparkConf)
+      new SparkContext(cp_.createSparkConf())
     } else {
       sc_
     }
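
The call site above runs only when a checkpoint is present, i.e. during driver recovery. In user code that path is normally reached through StreamingContext.getOrCreate; a minimal usage sketch, where the checkpoint directory, app name, and batch interval are placeholders rather than anything from this commit:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical durable checkpoint location; substitute your own path.
val checkpointDir = "/tmp/streaming-checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("recovery-example")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)
  // ... define the DStream graph here ...
  ssc
}

// First run: builds a fresh context via createContext().
// Restart: rebuilds the context from the checkpoint, which is where the
// cp_.createSparkConf() call above is exercised.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()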

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 18 additions & 3 deletions
@@ -146,15 +146,15 @@ class CheckpointSuite extends TestSuiteBase {
 
   // This tests whether spark conf persists through checkpoints, and certain
   // configs get scrubbed
-  test("persistence of conf through checkpoints") {
+  test("recovery of conf through checkpoints") {
     val key = "spark.mykey"
     val value = "myvalue"
     System.setProperty(key, value)
     ssc = new StreamingContext(master, framework, batchDuration)
     val originalConf = ssc.conf
 
     val cp = new Checkpoint(ssc, Time(1000))
-    val cpConf = cp.sparkConf
+    val cpConf = cp.createSparkConf()
     assert(cpConf.get("spark.master") === originalConf.get("spark.master"))
     assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
     assert(cpConf.get(key) === value)

@@ -163,7 +163,8 @@ class CheckpointSuite extends TestSuiteBase {
     // Serialize/deserialize to simulate write to storage and reading it back
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
 
-    val newCpConf = newCp.sparkConf
+    // Verify new SparkConf has all the previous properties
+    val newCpConf = newCp.createSparkConf()
     assert(newCpConf.get("spark.master") === originalConf.get("spark.master"))
     assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
     assert(newCpConf.get(key) === value)

@@ -174,6 +175,20 @@ class CheckpointSuite extends TestSuiteBase {
     ssc = new StreamingContext(null, newCp, null)
     val restoredConf = ssc.conf
     assert(restoredConf.get(key) === value)
+    ssc.stop()
+
+    // Verify new SparkConf picks up new master url if it is set in the properties. See SPARK-6331.
+    try {
+      val newMaster = "local[100]"
+      System.setProperty("spark.master", newMaster)
+      val newCpConf = newCp.createSparkConf()
+      assert(newCpConf.get("spark.master") === newMaster)
+      assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+      ssc = new StreamingContext(null, newCp, null)
+      assert(ssc.sparkContext.master === newMaster)
+    } finally {
+      System.clearProperty("spark.master")
+    }
   }
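
A note on the added block: spark.master is read from JVM-global system properties, so the test wraps the override in try/finally to keep it from leaking into later tests. The same hygiene could be factored into a helper; a sketch with a hypothetical name, not part of TestSuiteBase:

// Hypothetical helper: run a block with a system property temporarily set,
// restoring the previous value (or clearing the key) afterwards.
def withSystemProperty[T](key: String, value: String)(body: => T): T = {
  val previous = Option(System.getProperty(key))
  System.setProperty(key, value)
  try body
  finally previous match {
    case Some(v) => System.setProperty(key, v)
    case None => System.clearProperty(key)
  }
}

// Usage mirroring the test above:
// withSystemProperty("spark.master", "local[100]") {
//   assert(newCp.createSparkConf().get("spark.master") === "local[100]")
// }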

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 7 additions & 25 deletions
@@ -91,36 +91,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   }
 
   test("from checkpoint") {
-    // Create Checkpoint and verify whether SparkConf configuration is captured
-    val myConf = SparkContext.updatedConf(new SparkConf(loadDefaults = false), master, appName)
+    val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
     myConf.set("spark.cleaner.ttl", "10")
-    ssc = new StreamingContext(myConf, batchDuration)
-    addInputStream(ssc).register()
-    ssc.start()
-    val cp = new Checkpoint(ssc, Time(1000))
-    ssc.stop()
+    val ssc1 = new StreamingContext(myConf, batchDuration)
+    addInputStream(ssc1).register()
+    ssc1.start()
+    val cp = new Checkpoint(ssc1, Time(1000))
     assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
-
-    // Verify SparkConf recreated from Checkpoint has the same configuration
+    ssc1.stop()
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10)
+    assert(newCp.createSparkConf().getInt("spark.cleaner.ttl", -1) === 10)
     ssc = new StreamingContext(null, newCp, null)
    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
-    assert(ssc.conf.get("spark.master") === master)
-    ssc.stop()
-
-    // Verify SparkConf recreated from Checkpoint picks up new master
-    try {
-      val newMaster = "local[100]"
-      System.setProperty("spark.master", newMaster)
-      val anotherNewCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-      ssc = new StreamingContext(null, anotherNewCp, null)
-      assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
-      assert(ssc.conf.get("spark.master") === newMaster)
-      ssc.stop()
-    } finally {
-      System.clearProperty("spark.master")
-    }
   }
 
   test("start and stop state check") {
