streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -43,10 +43,15 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConfPairs = ssc.conf.getAll

-  def sparkConf = {
-    new SparkConf(false).setAll(sparkConfPairs)
+  def createSparkConf(): SparkConf = {
+    val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
+      .remove("spark.driver.host")
+      .remove("spark.driver.port")
+    new SparkConf(loadDefaults = true).getOption("spark.master") match {
+      case Some(newMaster) => newSparkConf.setMaster(newMaster)
+      case _ => None
Hari Shreedharan (Contributor) commented:

`case _ =>` should be enough, no? Is there a need to return `None`? (Since `None` is a singleton object, I am not sure what the real cost of returning it is here, so this can be ignored if the cost is ~zero.)

Author (Contributor) replied:

Right, I can make it better by using `foreach` on the `Option`. That was stupid and too hurried.
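For reference, a minimal sketch of the `foreach`-on-`Option` cleanup being suggested (it reuses `newSparkConf` from the diff above; an illustration, not the committed code):

```scala
// Option.foreach runs the side effect only when a value is present,
// so there is no need for a default case that returns a placeholder None.
new SparkConf(loadDefaults = true).getOption("spark.master").foreach { newMaster =>
  newSparkConf.setMaster(newMaster)
}
```

Both forms are semantically equivalent here; `foreach` just avoids the unused `None` result and reads as a pure side effect.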
+    }
+    newSparkConf
   }

   def validate() {
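A note on why the `loadDefaults = true` lookup above works: a `SparkConf` constructed with defaults loaded copies every `spark.*` JVM system property into itself, which is how a master URL supplied at restart time (for example via `spark-submit --master`, which is expected to surface as the `spark.master` property) can override the one serialized into the checkpoint. A small self-contained sketch of that mechanism, assuming stock `SparkConf` behavior rather than code from this PR:

```scala
import org.apache.spark.SparkConf

object MasterOverrideSketch {
  def main(args: Array[String]): Unit = {
    // Simulate a new master URL being supplied at resubmission time.
    System.setProperty("spark.master", "local[4]")

    // loadDefaults = true copies spark.* system properties into the conf;
    // this is the lookup createSparkConf() relies on.
    val conf = new SparkConf(loadDefaults = true)
    assert(conf.getOption("spark.master") == Some("local[4]"))
    println(conf.get("spark.master")) // prints local[4]
  }
}
```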
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -116,7 +116,7 @@ class StreamingContext private[streaming] (

   private[streaming] val sc: SparkContext = {
     if (isCheckpointPresent) {
-      new SparkContext(cp_.sparkConf)
+      new SparkContext(cp_.createSparkConf())
     } else {
       sc_
     }
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -146,15 +146,15 @@ class CheckpointSuite extends TestSuiteBase {

   // This tests whether spark conf persists through checkpoints, and certain
   // configs gets scrubbed
-  test("persistence of conf through checkpoints") {
+  test("recovery of conf through checkpoints") {
     val key = "spark.mykey"
     val value = "myvalue"
     System.setProperty(key, value)
     ssc = new StreamingContext(master, framework, batchDuration)
     val originalConf = ssc.conf

     val cp = new Checkpoint(ssc, Time(1000))
-    val cpConf = cp.sparkConf
+    val cpConf = cp.createSparkConf()
     assert(cpConf.get("spark.master") === originalConf.get("spark.master"))
     assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
     assert(cpConf.get(key) === value)
@@ -163,7 +163,8 @@ class CheckpointSuite extends TestSuiteBase {
     // Serialize/deserialize to simulate write to storage and reading it back
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))

-    val newCpConf = newCp.sparkConf
+    // Verify new SparkConf has all the previous properties
+    val newCpConf = newCp.createSparkConf()
     assert(newCpConf.get("spark.master") === originalConf.get("spark.master"))
     assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
     assert(newCpConf.get(key) === value)
@@ -174,6 +175,20 @@ class CheckpointSuite extends TestSuiteBase {
     ssc = new StreamingContext(null, newCp, null)
     val restoredConf = ssc.conf
     assert(restoredConf.get(key) === value)
+    ssc.stop()
+
+    // Verify new SparkConf picks up new master url if it is set in the properties. See SPARK-6331.
+    try {
+      val newMaster = "local[100]"
+      System.setProperty("spark.master", newMaster)
+      val newCpConf = newCp.createSparkConf()
+      assert(newCpConf.get("spark.master") === newMaster)
+      assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+      ssc = new StreamingContext(null, newCp, null)
+      assert(ssc.sparkContext.master === newMaster)
+    } finally {
+      System.clearProperty("spark.master")
+    }
   }


streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -100,7 +100,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
     ssc1.stop()
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10)
+    assert(newCp.createSparkConf().getInt("spark.cleaner.ttl", -1) === 10)
     ssc = new StreamingContext(null, newCp, null)
     assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
   }