
Commit 222485d

Load new master URL if present when recovering streaming context from checkpoint
1 parent b943f5d commit 222485d
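
The scenario behind this change: a streaming driver that is resubmitted against a different master (for example with a new --master URL) rebuilds its StreamingContext from a checkpoint, and the recovered SparkConf should pick up the new master rather than the one stored in the checkpoint. The sketch below only illustrates that recovery path and is not code from this commit; the application name, checkpoint path, and stream setup are made up, and it assumes the usual StreamingContext.getOrCreate recovery pattern.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableApp {
  // Hypothetical checkpoint location; any HDFS or local path works.
  val checkpointDir = "hdfs:///tmp/streaming-checkpoint"

  // Used only when no checkpoint exists yet.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("RecoverableApp")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    // ... define input streams and output operations here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On restart, e.g. spark-submit --master spark://new-host:7077 ...,
    // the context recovered from the checkpoint should now run against
    // the new master URL instead of the checkpointed one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}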

2 files changed (+31, -8)

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 7 additions & 2 deletions
@@ -43,10 +43,15 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConfPairs = ssc.conf.getAll
 
-  def sparkConf = {
-    new SparkConf(false).setAll(sparkConfPairs)
+  def sparkConf: SparkConf = {
+    val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")
       .remove("spark.driver.port")
+    new SparkConf(loadDefaults = true).getOption("spark.master") match {
+      case Some(newMaster) => newSparkConf.setMaster(newMaster)
+      case _ => None
+    }
+    newSparkConf
   }
 
   def validate() {
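
The key idea in the hunk above: the conf rebuilt from the checkpointed pairs drops the stale driver host and port, and a second SparkConf created with loadDefaults = true (which reads spark.* JVM system properties supplied by the launcher) provides a replacement spark.master when one is present; otherwise the checkpointed master is kept. A rough standalone illustration of that override pattern, with made-up checkpointed values (not part of the commit):

import org.apache.spark.SparkConf

object MasterOverrideSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative stand-in for the key-value pairs captured in the checkpoint.
    val checkpointedPairs = Seq(
      "spark.master"   -> "spark://old-host:7077",
      "spark.app.name" -> "CheckpointedApp")

    // Rebuild the conf from the checkpointed pairs, as the patched sparkConf does.
    val restoredConf = new SparkConf(loadDefaults = false).setAll(checkpointedPairs)

    // loadDefaults = true reads spark.* JVM system properties, so a master URL
    // supplied at resubmission time shows up here and replaces the old one.
    new SparkConf(loadDefaults = true).getOption("spark.master") match {
      case Some(newMaster) => restoredConf.setMaster(newMaster) // prefer the new master
      case None            => ()                                // keep the checkpointed master
    }

    println(restoredConf.get("spark.master"))
  }
}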

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 24 additions & 6 deletions
@@ -91,18 +91,36 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   }
 
   test("from checkpoint") {
-    val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
+    // Create Checkpoint and verify whether SparkConf configuration is captured
+    val myConf = SparkContext.updatedConf(new SparkConf(loadDefaults = false), master, appName)
     myConf.set("spark.cleaner.ttl", "10")
-    val ssc1 = new StreamingContext(myConf, batchDuration)
-    addInputStream(ssc1).register()
-    ssc1.start()
-    val cp = new Checkpoint(ssc1, Time(1000))
+    ssc = new StreamingContext(myConf, batchDuration)
+    addInputStream(ssc).register()
+    ssc.start()
+    val cp = new Checkpoint(ssc, Time(1000))
+    ssc.stop()
     assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
-    ssc1.stop()
+
+    // Verify SparkConf recreated from Checkpoint has the same configuration
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
     assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10)
     ssc = new StreamingContext(null, newCp, null)
     assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+    assert(ssc.conf.get("spark.master") === master)
+    ssc.stop()
+
+    // Verify SparkConf recreated from Checkpoint picks up new master
+    try {
+      val newMaster = "local[100]"
+      System.setProperty("spark.master", newMaster)
+      val anotherNewCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
+      ssc = new StreamingContext(null, anotherNewCp, null)
+      assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+      assert(ssc.conf.get("spark.master") === newMaster)
+      ssc.stop()
+    } finally {
+      System.clearProperty("spark.master")
+    }
   }
 
   test("start and stop state check") {
