2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -346,7 +346,7 @@ class SparkContext(config: SparkConf) extends Logging {
     override protected def childValue(parent: Properties): Properties = {
       // Note: make a clone such that changes in the parent properties aren't reflected in
       // those of the children threads, which has confusing semantics (SPARK-10563).
-      SerializationUtils.clone(parent)
+      Utils.cloneProperties(parent)
     }
     override protected def initialValue(): Properties = new Properties()
   }
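Reviewer note: the childValue override touched above is what hands each child thread a snapshot of its parent's job-local properties; cloning at inheritance time is what keeps later parent-side mutations out of already-spawned children (SPARK-10563). A minimal standalone sketch of that pattern, with the private[spark] helper inlined as a stand-in:

```scala
import java.util.Properties

// Standalone sketch (not part of the PR): InheritableThreadLocal + clone-on-inherit,
// the isolation pattern SparkContext uses above.
object InheritOnceDemo {
  // Stand-in for Utils.cloneProperties, mirroring the helper added in this PR.
  private def cloneProperties(props: Properties): Properties = {
    val result = new Properties()
    props.forEach((k, v) => result.put(k, v))
    result
  }

  private val localProps = new InheritableThreadLocal[Properties] {
    // Child threads receive a snapshot; later parent-side mutations stay invisible to them.
    override protected def childValue(parent: Properties): Properties = cloneProperties(parent)
    override protected def initialValue(): Properties = new Properties()
  }

  def main(args: Array[String]): Unit = {
    localProps.get().setProperty("spark.job.description", "first")
    val child = new Thread(() => {
      // Prints "first": the snapshot was taken when the Thread object was constructed.
      println(localProps.get().getProperty("spark.job.description"))
    })
    localProps.get().setProperty("spark.job.description", "second")
    child.start()
    child.join()
  }
}
```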
10 changes: 4 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -29,8 +29,6 @@ import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
-import org.apache.commons.lang3.SerializationUtils
-
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
@@ -698,7 +696,7 @@ private[spark] class DAGScheduler(
     if (partitions.isEmpty) {
       val time = clock.getTimeMillis()
       listenerBus.post(
-        SparkListenerJobStart(jobId, time, Seq[StageInfo](), SerializationUtils.clone(properties)))
+        SparkListenerJobStart(jobId, time, Seq[StageInfo](), Utils.cloneProperties(properties)))
       listenerBus.post(
         SparkListenerJobEnd(jobId, time, JobSucceeded))
       // Return immediately if the job is running 0 tasks
@@ -710,7 +708,7 @@ private[spark] class DAGScheduler(
     val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, partitions.toArray, callSite, waiter,
-      SerializationUtils.clone(properties)))
+      Utils.cloneProperties(properties)))
     waiter
   }
 
@@ -782,7 +780,7 @@ private[spark] class DAGScheduler(
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, rdd.partitions.indices.toArray, callSite, listener,
-      SerializationUtils.clone(properties)))
+      Utils.cloneProperties(properties)))
     listener.awaitResult() // Will throw an exception if the job fails
 
@@ -819,7 +817,7 @@ private[spark] class DAGScheduler(
       this, jobId, 1,
       (_: Int, r: MapOutputStatistics) => callback(r))
     eventProcessLoop.post(MapStageSubmitted(
-      jobId, dependency, callSite, waiter, SerializationUtils.clone(properties)))
+      jobId, dependency, callSite, waiter, Utils.cloneProperties(properties)))
     waiter
   }
 
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2950,6 +2950,13 @@ private[spark] object Utils extends Logging {
     val codec = codecFactory.getCodec(path)
     codec == null || codec.isInstanceOf[SplittableCompressionCodec]
   }
+
+  /** Create a new properties object with the same values as `props` */
+  def cloneProperties(props: Properties): Properties = {
+    val resultProps = new Properties()
+    props.forEach((k, v) => resultProps.put(k, v))
+    resultProps
+  }
 }
 
 private[util] object CallerContext extends Logging {
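Reviewer note: SerializationUtils.clone deep-copies by round-tripping the object through Java serialization, while the new helper simply copies the entries. For these flat String-to-String property sets the observable result is the same, minus the serialization overhead. A rough sketch of the equivalence; the assertions are illustrative, not part of the PR:

```scala
import java.util.Properties

import org.apache.commons.lang3.SerializationUtils

object CloneComparison {
  // Same entry-copying logic as the Utils.cloneProperties added above.
  def cloneProperties(props: Properties): Properties = {
    val resultProps = new Properties()
    props.forEach((k, v) => resultProps.put(k, v))
    resultProps
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("spark.scheduler.pool", "etl")

    // Both routes yield an independent copy with the same entries...
    val viaSerialization = SerializationUtils.clone(props)
    val viaEntryCopy = cloneProperties(props)
    props.setProperty("spark.scheduler.pool", "adhoc")
    assert(viaSerialization.getProperty("spark.scheduler.pool") == "etl")
    assert(viaEntryCopy.getProperty("spark.scheduler.pool") == "etl")

    // ...but SerializationUtils.clone gets there by serializing and deserializing the
    // whole object graph, paying a byte-stream round trip on every job submission.
  }
}
```

One behavioral difference worth flagging: the entry copy is shallow and drops any defaults chained into the source Properties. Both are harmless here, since Spark stores only top-level string values in its local properties.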
5 changes: 2 additions & 3 deletions core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
@@ -19,7 +19,6 @@ package org.apache.spark.util
 
 import java.util.Properties
 
-import org.apache.commons.lang3.SerializationUtils
 import org.scalatest.{BeforeAndAfterEach, Suite}
 
 /**
@@ -43,11 +42,11 @@ private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Suite =>
   var oldProperties: Properties = null
 
   override def beforeEach(): Unit = {
-    // we need SerializationUtils.clone instead of `new Properties(System.getProperties())` because
+    // we need Utils.cloneProperties instead of `new Properties(System.getProperties())` because
     // the latter way of creating a copy does not copy the properties but initializes a new
     // Properties object with the given properties as defaults. They are then not recognized
     // at all by the standard Scala wrapper over Java Properties.
-    oldProperties = SerializationUtils.clone(System.getProperties)
+    oldProperties = Utils.cloneProperties(System.getProperties)
     super.beforeEach()
   }
 
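Reviewer note: the comment kept in beforeEach describes a genuine Properties trap that is easy to miss: new Properties(defaults) links the argument as a fallback rather than copying it, so the entries never appear in keySet/entrySet and the Scala wrapper cannot see them. A quick hypothetical demonstration (the key name is made up):

```scala
import java.util.Properties
import scala.collection.JavaConverters._

object DefaultsPitfall {
  def main(args: Array[String]): Unit = {
    val source = new Properties()
    source.setProperty("user.role", "tester")

    // Wrong: "tester" is only reachable through the defaults chain.
    val viaDefaults = new Properties(source)
    println(viaDefaults.getProperty("user.role")) // tester (lookup falls through to defaults)
    println(viaDefaults.containsKey("user.role")) // false  (not a real entry)
    println(viaDefaults.asScala.get("user.role")) // None   (the Scala wrapper sees no entry)

    // Right: a real copy, which is what Utils.cloneProperties does.
    val viaCopy = new Properties()
    source.forEach((k, v) => viaCopy.put(k, v))
    println(viaCopy.containsKey("user.role"))     // true
    println(viaCopy.asScala.get("user.role"))     // Some(tester)
  }
}
```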
3 changes: 1 addition & 2 deletions streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable.Queue
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@@ -586,7 +585,7 @@ class StreamingContext private[streaming] (
           sparkContext.setCallSite(startSite.get)
           sparkContext.clearJobGroup()
           sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
-          savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
+          savedProperties.set(Utils.cloneProperties(sparkContext.localProperties.get()))
           scheduler.start()
         }
         state = StreamingContextState.ACTIVE
6 changes: 2 additions & 4 deletions streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -22,16 +22,14 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import scala.collection.JavaConverters._
 import scala.util.Failure
 
-import org.apache.commons.lang3.SerializationUtils
-
 import org.apache.spark.ExecutorAllocationClient
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.SparkHadoopWriterUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.api.python.PythonDStream
 import org.apache.spark.streaming.ui.UIUtils
-import org.apache.spark.util.{EventLoop, ThreadUtils}
+import org.apache.spark.util.{EventLoop, ThreadUtils, Utils}
 
 
 private[scheduler] sealed trait JobSchedulerEvent
@@ -231,7 +229,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       def run() {
         val oldProps = ssc.sparkContext.getLocalProperties
         try {
-          ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
+          ssc.sparkContext.setLocalProperties(Utils.cloneProperties(ssc.savedProperties.get()))
           val formattedTime = UIUtils.formatBatchTime(
             job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
           val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
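Reviewer note: the run() body above follows a save/clone/restore discipline because batch jobs execute on pooled, reused threads: it installs a snapshot of the properties captured at StreamingContext.start() and puts the old ones back in a finally block. A sketch of that discipline in isolation, using hypothetical stand-ins for SparkContext's accessors:

```scala
import java.util.Properties

// Sketch only: `localProps` and `cloneProperties` stand in for SparkContext's
// getLocalProperties/setLocalProperties and the private[spark] Utils.cloneProperties.
object PooledHandlerDemo {
  private val localProps = new ThreadLocal[Properties] {
    override protected def initialValue(): Properties = new Properties()
  }

  private def cloneProperties(props: Properties): Properties = {
    val result = new Properties()
    props.forEach((k, v) => result.put(k, v))
    result
  }

  /** Run `body` with a snapshot of `saved` installed, restoring the previous value after. */
  def withSavedProperties[T](saved: Properties)(body: => T): T = {
    val oldProps = localProps.get()
    try {
      // Clone before installing, so mutations made by `body` can never leak back
      // into `saved`, which every subsequent batch job is seeded from.
      localProps.set(cloneProperties(saved))
      body
    } finally {
      // The thread returns to the pool; it must not carry this job's properties.
      localProps.set(oldProps)
    }
  }
}
```

The real JobHandler does exactly this with a try/finally around the batch, for the same reason: handler threads are reused across jobs.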