diff --git a/core/benchmarks/PropertiesCloneBenchmark-results.txt b/core/benchmarks/PropertiesCloneBenchmark-results.txt
new file mode 100644
index 000000000000..00c9561648be
--- /dev/null
+++ b/core/benchmarks/PropertiesCloneBenchmark-results.txt
@@ -0,0 +1,40 @@
+================================================================================================
+Properties Cloning
+================================================================================================
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
+Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
+Empty Properties:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+SerializationUtils.clone                              0              0           0          0.2        4184.0       1.0X
+Utils.cloneProperties                                 0              0           0         55.6          18.0     232.4X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
+Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
+System Properties:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+SerializationUtils.clone                              0              0           0          0.0      107612.0       1.0X
+Utils.cloneProperties                                 0              0           0          1.0         962.0     111.9X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
+Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
+Small Properties:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+SerializationUtils.clone                              0              0           0          0.0      330210.0       1.0X
+Utils.cloneProperties                                 0              0           0          0.9        1082.0     305.2X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
+Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
+Medium Properties:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+SerializationUtils.clone                              1              2           0          0.0     1336301.0       1.0X
+Utils.cloneProperties                                 0              0           0          0.2        5456.0     244.9X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
+Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
+Large Properties:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+SerializationUtils.clone                              3              3           0          0.0     2634336.0       1.0X
+Utils.cloneProperties                                 0              0           0          0.1       10822.0     243.4X
+
+
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 396d712bd739..44c59e2a56c2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -31,7 +31,6 @@ import scala.reflect.{classTag, ClassTag}
 import scala.util.control.NonFatal
 
 import com.google.common.collect.MapMaker
-import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
@@ -346,7 +345,7 @@ class SparkContext(config: SparkConf) extends Logging {
     override protected def childValue(parent: Properties): Properties = {
       // Note: make a clone such that changes in the parent properties aren't reflected in
       // those of the children threads, which would have confusing semantics (SPARK-10563).
-      SerializationUtils.clone(parent)
+      Utils.cloneProperties(parent)
     }
     override protected def initialValue(): Properties = new Properties()
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9df59459ca79..894234f70e05 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -29,8 +29,6 @@ import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
-import org.apache.commons.lang3.SerializationUtils
-
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
@@ -698,7 +696,7 @@ private[spark] class DAGScheduler(
     if (partitions.isEmpty) {
       val time = clock.getTimeMillis()
       listenerBus.post(
-        SparkListenerJobStart(jobId, time, Seq[StageInfo](), SerializationUtils.clone(properties)))
+        SparkListenerJobStart(jobId, time, Seq[StageInfo](), Utils.cloneProperties(properties)))
       listenerBus.post(
         SparkListenerJobEnd(jobId, time, JobSucceeded))
       // Return immediately if the job is running 0 tasks
@@ -710,7 +708,7 @@ private[spark] class DAGScheduler(
     val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, partitions.toArray, callSite, waiter,
-      SerializationUtils.clone(properties)))
+      Utils.cloneProperties(properties)))
     waiter
   }
 
@@ -782,7 +780,7 @@ private[spark] class DAGScheduler(
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, rdd.partitions.indices.toArray, callSite, listener,
-      SerializationUtils.clone(properties)))
+      Utils.cloneProperties(properties)))
     listener.awaitResult() // Will throw an exception if the job fails
   }
 
@@ -819,7 +817,7 @@ private[spark] class DAGScheduler(
       this, jobId, 1,
       (_: Int, r: MapOutputStatistics) => callback(r))
     eventProcessLoop.post(MapStageSubmitted(
-      jobId, dependency, callSite, waiter, SerializationUtils.clone(properties)))
+      jobId, dependency, callSite, waiter, Utils.cloneProperties(properties)))
     waiter
   }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5cd937aeb8ea..c47a23edde72 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2950,6 +2950,13 @@ private[spark] object Utils extends Logging {
     val codec = codecFactory.getCodec(path)
     codec == null || codec.isInstanceOf[SplittableCompressionCodec]
   }
+
+  /** Create a new properties object with the same values as `props` */
+  def cloneProperties(props: Properties): Properties = {
+    val resultProps = new Properties()
+    props.forEach((k, v) => resultProps.put(k, v))
+    resultProps
+  }
 }
 
 private[util] object CallerContext extends Logging {
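Note: the helper added to Utils above is a flat, entry-by-entry copy, which is exactly what the childValue override in SparkContext needs: the child thread gets an independent snapshot of the parent's local properties, so later mutations on either side stay invisible to the other. A minimal standalone sketch of that requirement follows (the object name and the use of spark.jobGroup.id as the example key are illustrative only):

    import java.util.Properties

    object ClonePropertiesSketch {
      // Same copy strategy as Utils.cloneProperties in the patch above.
      def cloneProperties(props: Properties): Properties = {
        val resultProps = new Properties()
        props.forEach((k, v) => resultProps.put(k, v))
        resultProps
      }

      def main(args: Array[String]): Unit = {
        val parent = new Properties()
        parent.setProperty("spark.jobGroup.id", "group-1")
        val child = cloneProperties(parent) // snapshot taken here
        parent.setProperty("spark.jobGroup.id", "group-2")
        // The child's view is unaffected by the later change in the parent (SPARK-10563).
        assert(child.getProperty("spark.jobGroup.id") == "group-1")
      }
    }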
diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
index 73f9d0e2bc0e..022fcbb25b0a 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
@@ -141,12 +141,14 @@ private[spark] class Benchmark(
     val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
     val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos
     val runTimes = ArrayBuffer[Long]()
+    var totalTime = 0L
     var i = 0
-    while (i < minIters || runTimes.sum < minDuration) {
+    while (i < minIters || totalTime < minDuration) {
       val timer = new Benchmark.Timer(i)
       f(timer)
       val runTime = timer.totalTime()
       runTimes += runTime
+      totalTime += runTime
 
       if (outputPerIteration) {
         // scalastyle:off
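Note: the Benchmark change above removes runTimes.sum from the loop condition, which walked the whole buffer on every iteration and made the loop quadratic in the number of iterations; that overhead matters for very fast cases like these, where the minimum-duration rule produces many iterations. A reduced sketch of the running-total pattern, with illustrative names:

    import scala.collection.mutable.ArrayBuffer

    // Run `body` until at least `minIters` iterations have executed AND at least
    // `minDuration` nanoseconds of measured time have accumulated, keeping the
    // total incrementally instead of recomputing runTimes.sum on each check.
    def runUntil(minIters: Int, minDuration: Long)(body: () => Long): ArrayBuffer[Long] = {
      val runTimes = ArrayBuffer[Long]()
      var totalTime = 0L
      var i = 0
      while (i < minIters || totalTime < minDuration) {
        val runTime = body() // elapsed nanoseconds for one iteration
        runTimes += runTime
        totalTime += runTime
        i += 1
      }
      runTimes
    }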
diff --git a/core/src/test/scala/org/apache/spark/util/PropertiesCloneBenchmark.scala b/core/src/test/scala/org/apache/spark/util/PropertiesCloneBenchmark.scala
new file mode 100644
index 000000000000..0726886c70fe
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/PropertiesCloneBenchmark.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.util.Properties
+
+import scala.util.Random
+
+import org.apache.commons.lang3.SerializationUtils
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+
+
+/**
+ * Benchmark for Properties cloning.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar>
+ *   2. build/sbt "core/test:runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
+ *      Results will be written to "benchmarks/PropertiesCloneBenchmark-results.txt".
+ * }}}
+ */
+object PropertiesCloneBenchmark extends BenchmarkBase {
+  /**
+   * Benchmark various cases of cloning properties objects
+   */
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("Properties Cloning") {
+      def compareSerialization(name: String, props: Properties): Unit = {
+        val benchmark = new Benchmark(name, 1, output = output)
+        benchmark.addCase("SerializationUtils.clone") { _ =>
+          SerializationUtils.clone(props)
+        }
+        benchmark.addCase("Utils.cloneProperties") { _ =>
+          Utils.cloneProperties(props)
+        }
+        benchmark.run()
+      }
+      compareSerialization("Empty Properties", new Properties)
+      compareSerialization("System Properties", System.getProperties)
+      compareSerialization("Small Properties", makeRandomProps(10, 40, 100))
+      compareSerialization("Medium Properties", makeRandomProps(50, 40, 100))
+      compareSerialization("Large Properties", makeRandomProps(100, 40, 100))
+    }
+  }
+
+  def makeRandomProps(numProperties: Int, keySize: Int, valueSize: Int): Properties = {
+    val props = new Properties
+    for (_ <- 1 to numProperties) {
+      props.put(
+        Random.alphanumeric.take(keySize).mkString,
+        Random.alphanumeric.take(valueSize).mkString
+      )
+    }
+    props
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
index 75e450485067..0b1796540abb 100644
--- a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
+++ b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
@@ -19,7 +19,6 @@ package org.apache.spark.util
 
 import java.util.Properties
 
-import org.apache.commons.lang3.SerializationUtils
 import org.scalatest.{BeforeAndAfterEach, Suite}
 
 /**
@@ -43,11 +42,11 @@ private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Su
   var oldProperties: Properties = null
 
   override def beforeEach(): Unit = {
-    // we need SerializationUtils.clone instead of `new Properties(System.getProperties())` because
+    // we need Utils.cloneProperties instead of `new Properties(System.getProperties())` because
    // the latter way of creating a copy does not copy the properties but it initializes a new
    // Properties object with the given properties as defaults. They are not recognized at all
    // by the standard Scala wrapper over Java Properties then.
-    oldProperties = SerializationUtils.clone(System.getProperties)
+    oldProperties = Utils.cloneProperties(System.getProperties)
     super.beforeEach()
   }
 
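Note: to make the comment in ResetSystemProperties above concrete, `new Properties(defaults)` stores no entries of its own; `defaults` is only a fallback table consulted by getProperty, so the Scala wrapper (which is backed by the underlying Hashtable) sees an empty map. A small demonstration sketch, with illustrative object and key names:

    import java.util.Properties
    import scala.collection.JavaConverters._

    object DefaultsVsCopySketch {
      def main(args: Array[String]): Unit = {
        val original = new Properties()
        original.setProperty("spark.test.key", "value")

        val viaDefaults = new Properties(original)    // NOT a copy: original is a fallback table
        val viaCopy = new Properties()
        original.forEach((k, v) => viaCopy.put(k, v)) // what Utils.cloneProperties does

        assert(viaDefaults.getProperty("spark.test.key") == "value") // lookup falls through
        assert(viaDefaults.asScala.isEmpty) // but the Scala wrapper never sees the defaults
        assert(viaCopy.asScala.size == 1)   // the flat copy holds a real entry
      }
    }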
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 589dd877c8c9..21ffa78297ee 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable.Queue
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@@ -586,7 +585,7 @@ class StreamingContext private[streaming] (
             sparkContext.setCallSite(startSite.get)
             sparkContext.clearJobGroup()
             sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
-            savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
+            savedProperties.set(Utils.cloneProperties(sparkContext.localProperties.get()))
             scheduler.start()
           }
           state = StreamingContextState.ACTIVE
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 68594e8977cf..2388ca805980 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -22,8 +22,6 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import scala.collection.JavaConverters._
 import scala.util.Failure
 
-import org.apache.commons.lang3.SerializationUtils
-
 import org.apache.spark.ExecutorAllocationClient
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.SparkHadoopWriterUtils
@@ -31,7 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.api.python.PythonDStream
 import org.apache.spark.streaming.ui.UIUtils
-import org.apache.spark.util.{EventLoop, ThreadUtils}
+import org.apache.spark.util.{EventLoop, ThreadUtils, Utils}
 
 private[scheduler] sealed trait JobSchedulerEvent
 
@@ -231,7 +229,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     def run() {
       val oldProps = ssc.sparkContext.getLocalProperties
       try {
-        ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
+        ssc.sparkContext.setLocalProperties(Utils.cloneProperties(ssc.savedProperties.get()))
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
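Note: the two streaming changes follow one discipline: StreamingContext.start() snapshots the starting thread's local properties once, and each batch job re-clones that snapshot before running, so concurrent job handler threads never share a mutable Properties instance. A condensed, hypothetical sketch of the pattern (the real code lives in StreamingContext.start() and JobScheduler.JobHandler.run(); all names below are illustrative):

    import java.util.Properties
    import java.util.concurrent.atomic.AtomicReference

    object SavedPropertiesSketch {
      private val savedProperties = new AtomicReference[Properties](new Properties)

      private def cloneProperties(props: Properties): Properties = {
        val copy = new Properties()
        props.forEach((k, v) => copy.put(k, v))
        copy
      }

      // At StreamingContext.start(): capture the caller's local properties once.
      def onStart(callerLocalProps: Properties): Unit =
        savedProperties.set(cloneProperties(callerLocalProps))

      // In each JobHandler.run(): install a private copy for this job's thread.
      def onJobRun(setLocalProperties: Properties => Unit): Unit =
        setLocalProperties(cloneProperties(savedProperties.get()))
    }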