From 222485d3f93143ff46cc8473ab6f1b1a3a23ecb0 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 13 Mar 2015 18:58:15 -0700
Subject: [PATCH 1/4] Load new master URL if present when recovering streaming
 context from checkpoint

---
 .../apache/spark/streaming/Checkpoint.scala   |  9 ++++--
 .../streaming/StreamingContextSuite.scala     | 30 +++++++++++++++----
 2 files changed, 31 insertions(+), 8 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index f88a8a015155..dced04cb0186 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -43,10 +43,15 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConfPairs = ssc.conf.getAll

-  def sparkConf = {
-    new SparkConf(false).setAll(sparkConfPairs)
+  def sparkConf: SparkConf = {
+    val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")
       .remove("spark.driver.port")
+    new SparkConf(loadDefaults = true).getOption("spark.master") match {
+      case Some(newMaster) => newSparkConf.setMaster(newMaster)
+      case _ => None
+    }
+    newSparkConf
   }

   def validate() {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 6a7cd97aa322..55864a99627b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -91,18 +91,36 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   }

   test("from checkpoint") {
-    val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
+    // Create Checkpoint and verify whether SparkConf configuration is captured
+    val myConf = SparkContext.updatedConf(new SparkConf(loadDefaults = false), master, appName)
     myConf.set("spark.cleaner.ttl", "10")
-    val ssc1 = new StreamingContext(myConf, batchDuration)
-    addInputStream(ssc1).register()
-    ssc1.start()
-    val cp = new Checkpoint(ssc1, Time(1000))
+    ssc = new StreamingContext(myConf, batchDuration)
+    addInputStream(ssc).register()
+    ssc.start()
+    val cp = new Checkpoint(ssc, Time(1000))
+    ssc.stop()
     assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
-    ssc1.stop()
+
+    // Verify SparkConf recreated from Checkpoint has the same configuration
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
     assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10)
     ssc = new StreamingContext(null, newCp, null)
     assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+    assert(ssc.conf.get("spark.master") === master)
+    ssc.stop()
+
+    // Verify SparkConf recreated from Checkpoint picks up new master
+    try {
+      val newMaster = "local[100]"
+      System.setProperty("spark.master", newMaster)
+      val anotherNewCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
+      ssc = new StreamingContext(null, anotherNewCp, null)
+      assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+      assert(ssc.conf.get("spark.master") === newMaster)
+      ssc.stop()
+    } finally {
+      System.clearProperty("spark.master")
+    }
   }

   test("start and stop state check") {

From 6a0857c25fe10d6dd42031484ed5c3ff5319d08d Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 13 Mar 2015 19:15:27 -0700
Subject: [PATCH 2/4] Updated testsuites.

---
 .../apache/spark/streaming/Checkpoint.scala   |  2 +-
 .../spark/streaming/StreamingContext.scala    |  2 +-
 .../spark/streaming/CheckpointSuite.scala     | 21 ++++++++++--
 .../streaming/StreamingContextSuite.scala     | 32 ++++++----------
 4 files changed, 27 insertions(+), 30 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index dced04cb0186..dfd9655112e9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -43,7 +43,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConfPairs = ssc.conf.getAll

-  def sparkConf: SparkConf = {
+  def createSparkConf(): SparkConf = {
     val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")
       .remove("spark.driver.port")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ba3f23434f24..54b98ce3ec8d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -116,7 +116,7 @@ class StreamingContext private[streaming] (
   private[streaming] val sc: SparkContext = {
     if (isCheckpointPresent) {
-      new SparkContext(cp_.sparkConf)
+      new SparkContext(cp_.createSparkConf())
     } else {
       sc_
     }
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 03c448f1df5f..8ea91eca683c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -146,7 +146,7 @@ class CheckpointSuite extends TestSuiteBase {

   // This tests whether spark conf persists through checkpoints, and certain
   // configs gets scrubbed
-  test("persistence of conf through checkpoints") {
+  test("recovery of conf through checkpoints") {
     val key = "spark.mykey"
     val value = "myvalue"
     System.setProperty(key, value)
@@ -154,7 +154,7 @@ class CheckpointSuite extends TestSuiteBase {
     val originalConf = ssc.conf

     val cp = new Checkpoint(ssc, Time(1000))
-    val cpConf = cp.sparkConf
+    val cpConf = cp.createSparkConf()
     assert(cpConf.get("spark.master") === originalConf.get("spark.master"))
     assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
     assert(cpConf.get(key) === value)
@@ -163,7 +163,8 @@ class CheckpointSuite extends TestSuiteBase {

     // Serialize/deserialize to simulate write to storage and reading it back
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    val newCpConf = newCp.sparkConf
+    // Verify new SparkConf has all the previous properties
+    val newCpConf = newCp.createSparkConf()
     assert(newCpConf.get("spark.master") === originalConf.get("spark.master"))
     assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
     assert(newCpConf.get(key) === value)
@@ -174,6 +175,20 @@ class CheckpointSuite extends TestSuiteBase {
     ssc = new StreamingContext(null, newCp, null)
     val restoredConf = ssc.conf
     assert(restoredConf.get(key) === value)
+    ssc.stop()
+
+    // Verify new SparkConf picks up new master url if it is set in the properties. See SPARK-6331.
+    try {
+      val newMaster = "local[100]"
+      System.setProperty("spark.master", newMaster)
+      val newCpConf = newCp.createSparkConf()
+      assert(newCpConf.get("spark.master") === newMaster)
+      assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+      ssc = new StreamingContext(null, newCp, null)
+      assert(ssc.sparkContext.master === newMaster)
+    } finally {
+      System.clearProperty("spark.master")
+    }
   }

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 55864a99627b..2e5005ef6ff1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -91,36 +91,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   }

   test("from checkpoint") {
-    // Create Checkpoint and verify whether SparkConf configuration is captured
-    val myConf = SparkContext.updatedConf(new SparkConf(loadDefaults = false), master, appName)
+    val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
     myConf.set("spark.cleaner.ttl", "10")
-    ssc = new StreamingContext(myConf, batchDuration)
-    addInputStream(ssc).register()
-    ssc.start()
-    val cp = new Checkpoint(ssc, Time(1000))
-    ssc.stop()
+    val ssc1 = new StreamingContext(myConf, batchDuration)
+    addInputStream(ssc1).register()
+    ssc1.start()
+    val cp = new Checkpoint(ssc1, Time(1000))
     assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
-
-    // Verify SparkConf recreated from Checkpoint has the same configuration
+    ssc1.stop()
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10)
+    assert(newCp.createSparkConf().getInt("spark.cleaner.ttl", -1) === 10)
     ssc = new StreamingContext(null, newCp, null)
     assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
-    assert(ssc.conf.get("spark.master") === master)
-    ssc.stop()
-
-    // Verify SparkConf recreated from Checkpoint picks up new master
-    try {
-      val newMaster = "local[100]"
-      System.setProperty("spark.master", newMaster)
-      val anotherNewCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-      ssc = new StreamingContext(null, anotherNewCp, null)
-      assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
-      assert(ssc.conf.get("spark.master") === newMaster)
-      ssc.stop()
-    } finally {
-      System.clearProperty("spark.master")
-    }
   }

   test("start and stop state check") {

From c7c0b99a2373c72ac4855b23f46c623672e82fa3 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 16 Mar 2015 12:16:23 -0700
Subject: [PATCH 3/4] Addressed comments.

---
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index dfd9655112e9..7e744d995268 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -47,9 +47,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
     val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")
       .remove("spark.driver.port")
-    new SparkConf(loadDefaults = true).getOption("spark.master") match {
-      case Some(newMaster) => newSparkConf.setMaster(newMaster)
-      case _ => None
+    val oldMaster = new SparkConf(loadDefaults = true).getOption("spark.master")
+    oldMaster.foreach { newMaster =>
+      newSparkConf.setMaster(newMaster)
     }
     newSparkConf
   }

From 392fd441ec606a2b32f575f5ed2d89005d619b1d Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 16 Mar 2015 14:49:50 -0700
Subject: [PATCH 4/4] Fixed naming issue.

---
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 7e744d995268..cb4c94fb9d5a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -47,10 +47,8 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
     val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")
       .remove("spark.driver.port")
-    val oldMaster = new SparkConf(loadDefaults = true).getOption("spark.master")
-    oldMaster.foreach { newMaster =>
-      newSparkConf.setMaster(newMaster)
-    }
+    val newMasterOption = new SparkConf(loadDefaults = true).getOption("spark.master")
+    newMasterOption.foreach { newMaster => newSparkConf.setMaster(newMaster) }
     newSparkConf
   }
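
Usage note (not part of the patch series): a minimal driver sketch of the recovery scenario this series fixes (SPARK-6331), i.e. restarting a checkpointed streaming application against a different master. The checkpoint path, app name, and batch interval below are made-up placeholders and the DStream setup is elided; only `StreamingContext.getOrCreate` and the `spark.master` override behavior come from Spark itself.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableApp {
  // Hypothetical checkpoint location; any fault-tolerant path (e.g. on HDFS) works.
  val checkpointDir = "/tmp/streaming-checkpoint"

  // Invoked only when no checkpoint exists. On restart, the context is instead
  // rebuilt from the checkpoint, and its SparkConf comes from
  // Checkpoint.createSparkConf().
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("RecoverableApp")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    // ... define DStream operations (including at least one output) here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    // With this patch, restarting with a different master, e.g.
    //   spark-submit --master local[100] ...
    // overrides the master stored in the checkpoint, because
    // createSparkConf() consults the restarted driver's system properties
    // (a SparkConf built with loadDefaults = true) for spark.master.
    ssc.start()
    ssc.awaitTermination()
  }
}
```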