
Commit 95f8d1c

[SPARK-6331] Load new master URL if present when recovering streaming context from checkpoint
In streaming driver recovery, when the SparkConf is reconstructed from the checkpointed configuration, it recovers the old master URL. This is fine if the cluster on which the streaming application is relaunched is the same cluster it was running on before. But if that cluster changes, there is no way to inject the new master URL of the new cluster; as a result, the restarted app tries to connect to the non-existent old cluster and fails.

The solution is to check whether a master URL is set in the system properties (by spark-submit) before recreating the SparkConf. If a new master URL is set in the properties, use it, as it is obviously the most relevant one. Otherwise load the old one, to maintain existing behavior.

Author: Tathagata Das <[email protected]>

Closes #5024 from tdas/SPARK-6331 and squashes the following commits:

392fd44 [Tathagata Das] Fixed naming issue.
c7c0b99 [Tathagata Das] Addressed comments.
6a0857c [Tathagata Das] Updated testsuites.
222485d [Tathagata Das] Load new master URL if present when recovering streaming context from checkpoint

(cherry picked from commit c928796)
Signed-off-by: Tathagata Das <[email protected]>
1 parent 426816b · commit 95f8d1c
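
The precedence rule at the heart of this fix can be demonstrated in isolation. Below is a minimal sketch, not part of the commit; the object name and cluster URLs are hypothetical, but the two-SparkConf trick mirrors the createSparkConf() logic in the Checkpoint.scala diff below:

import org.apache.spark.SparkConf

object MasterOverrideSketch {
  def main(args: Array[String]): Unit = {
    // Pairs as recovered from a checkpoint; the old master URL is stale.
    val checkpointedPairs = Seq(
      "spark.master" -> "spark://old-cluster:7077",   // hypothetical old cluster
      "spark.app.name" -> "recovered-app")

    // Simulate spark-submit having set the new master as a system property.
    System.setProperty("spark.master", "spark://new-cluster:7077")

    // Rebuild the conf from the checkpointed pairs without loading defaults,
    // then let a defaults-loading conf's master (if any) override it.
    val recovered = new SparkConf(loadDefaults = false).setAll(checkpointedPairs)
    new SparkConf(loadDefaults = true).getOption("spark.master")
      .foreach(recovered.setMaster)

    assert(recovered.get("spark.master") == "spark://new-cluster:7077")
  }
}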

4 files changed: +25 −7 lines


streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 5 additions & 2 deletions
@@ -43,10 +43,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConfPairs = ssc.conf.getAll
 
-  def sparkConf = {
-    new SparkConf(false).setAll(sparkConfPairs)
+  def createSparkConf(): SparkConf = {
+    val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")
       .remove("spark.driver.port")
+    val newMasterOption = new SparkConf(loadDefaults = true).getOption("spark.master")
+    newMasterOption.foreach { newMaster => newSparkConf.setMaster(newMaster) }
+    newSparkConf
   }
 
   def validate() {

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 1 addition & 1 deletion
@@ -116,7 +116,7 @@ class StreamingContext private[streaming] (
 
   private[streaming] val sc: SparkContext = {
     if (isCheckpointPresent) {
-      new SparkContext(cp_.sparkConf)
+      new SparkContext(cp_.createSparkConf())
     } else {
       sc_
     }
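
In a real deployment, the override reaches this constructor through the standard recovery entry point: relaunching the driver with spark-submit --master <new-url> is what puts the new spark.master into the system properties. A hedged sketch of such a driver follows (the checkpoint path and app name are placeholders, not from the commit):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableDriver {
  val checkpointDir = "hdfs:///checkpoints/my-app"  // hypothetical path

  // Used only on first launch; on recovery the checkpointed conf is used
  // instead, now with any freshly supplied spark.master applied on top.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("recovered-app")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}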

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 18 additions & 3 deletions
@@ -146,15 +146,15 @@ class CheckpointSuite extends TestSuiteBase {
 
   // This tests whether spark conf persists through checkpoints, and certain
   // configs gets scrubbed
-  test("persistence of conf through checkpoints") {
+  test("recovery of conf through checkpoints") {
     val key = "spark.mykey"
     val value = "myvalue"
     System.setProperty(key, value)
     ssc = new StreamingContext(master, framework, batchDuration)
     val originalConf = ssc.conf
 
     val cp = new Checkpoint(ssc, Time(1000))
-    val cpConf = cp.sparkConf
+    val cpConf = cp.createSparkConf()
     assert(cpConf.get("spark.master") === originalConf.get("spark.master"))
     assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
     assert(cpConf.get(key) === value)
@@ -163,7 +163,8 @@ class CheckpointSuite extends TestSuiteBase {
     // Serialize/deserialize to simulate write to storage and reading it back
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
 
-    val newCpConf = newCp.sparkConf
+    // Verify new SparkConf has all the previous properties
+    val newCpConf = newCp.createSparkConf()
     assert(newCpConf.get("spark.master") === originalConf.get("spark.master"))
     assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
     assert(newCpConf.get(key) === value)
@@ -174,6 +175,20 @@ class CheckpointSuite extends TestSuiteBase {
     ssc = new StreamingContext(null, newCp, null)
     val restoredConf = ssc.conf
     assert(restoredConf.get(key) === value)
+    ssc.stop()
+
+    // Verify new SparkConf picks up new master url if it is set in the properties. See SPARK-6331.
+    try {
+      val newMaster = "local[100]"
+      System.setProperty("spark.master", newMaster)
+      val newCpConf = newCp.createSparkConf()
+      assert(newCpConf.get("spark.master") === newMaster)
+      assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+      ssc = new StreamingContext(null, newCp, null)
+      assert(ssc.sparkContext.master === newMaster)
+    } finally {
+      System.clearProperty("spark.master")
+    }
   }
 

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -100,7 +100,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
     ssc1.stop()
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10)
+    assert(newCp.createSparkConf().getInt("spark.cleaner.ttl", -1) === 10)
     ssc = new StreamingContext(null, newCp, null)
     assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
   }
