From f0522074018c2fc9e6efbac2f505ca347c155d21 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 16 Feb 2022 14:41:54 +0800 Subject: [PATCH 1/4] fix --- .../scala/org/apache/spark/deploy/master/Master.scala | 9 +-------- .../spark/deploy/rest/SubmitRestProtocolMessage.scala | 3 --- .../spark/executor/CoarseGrainedExecutorBackend.scala | 5 ----- .../main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 1 - .../scala/org/apache/spark/resource/ResourceUtils.scala | 2 -- .../org/apache/spark/graphx/impl/EdgePartition.scala | 3 --- .../scala/org/apache/spark/graphx/impl/GraphImpl.scala | 9 --------- 7 files changed, 1 insertion(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7dbf6b92b4088..775b27bcbf279 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -25,7 +25,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.util.Random import org.apache.spark.{SecurityManager, SparkConf, SparkException} -import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState, SparkHadoopUtil} +import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.deploy.master.MasterMessages._ @@ -53,8 +53,6 @@ private[deploy] class Master( private val forwardMessageThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread") - private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) - // For application IDs private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) @@ -95,11 +93,6 @@ private[deploy] class Master( // After onStart, webUi will be set private var webUi: MasterWebUI = null - private val masterPublicAddress = { - val envVar = conf.getenv("SPARK_PUBLIC_DNS") - if (envVar != null) envVar else address.host - } - private val masterUrl = address.toSparkURL private var masterWebUiUrl: String = _ diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala index 97b689cdadd5f..6340f1a6ab2e8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala @@ -46,9 +46,6 @@ private[rest] abstract class SubmitRestProtocolMessage { val action: String = messageType var message: String = null - // For JSON deserialization - private def setAction(a: String): Unit = { } - /** * Serialize the message to JSON. * This also ensures that the message is valid and its fields are in the expected format. diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index fb7b4e62150db..a94e63656e1a1 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -42,7 +42,6 @@ import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.rpc._ import org.apache.spark.scheduler.{ExecutorLossMessage, ExecutorLossReason, TaskDescription} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.serializer.SerializerInstance import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils} private[spark] class CoarseGrainedExecutorBackend( @@ -65,10 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend( var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None - // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need - // to be changed so that we don't share the serializer instance across threads - private[this] val ser: SerializerInstance = env.closureSerializer.newInstance() - private var _resources = Map.empty[String, ResourceInformation] /** diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index c6959a5a4dafa..596298b222e05 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -244,7 +244,6 @@ class NewHadoopRDD[K, V]( } private var havePair = false - private var recordsSinceMetricsUpdate = 0 override def hasNext: Boolean = { if (!finished && !havePair) { diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index 837b2d80aace6..e711317af2f53 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -385,8 +385,6 @@ private[spark] object ResourceUtils extends Logging { val pluginClasses = sparkConf.get(RESOURCES_DISCOVERY_PLUGIN) :+ discoveryScriptPlugin val resourcePlugins = Utils.loadExtensions(classOf[ResourceDiscoveryPlugin], pluginClasses, sparkConf) - // apply each plugin until one of them returns the information for this resource - var riOption: Optional[ResourceInformation] = Optional.empty() resourcePlugins.foreach { plugin => val riOption = plugin.discoverResource(resourceRequest, sparkConf) if (riOption.isPresent()) { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index c0a2ba67d2942..1bb802550acdc 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -64,9 +64,6 @@ class EdgePartition[ activeSet: Option[VertexSet]) extends Serializable { - /** No-arg constructor for serialization. */ - private def this() = this(null, null, null, null, null, null, null, null) - /** Return a new `EdgePartition` with the specified edge data. */ def withData[ED2: ClassTag](data: Array[ED2]): EdgePartition[ED2, VD] = { new EdgePartition( diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 8564597f4f135..4a790878cf9dc 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -21,7 +21,6 @@ import scala.reflect.{classTag, ClassTag} import org.apache.spark.HashPartitioner import org.apache.spark.graphx._ -import org.apache.spark.graphx.util.BytecodeUtils import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -265,14 +264,6 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } } - /** Test whether the closure accesses the attribute with name `attrName`. */ - private def accessesVertexAttr(closure: AnyRef, attrName: String): Boolean = { - try { - BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName) - } catch { - case _: ClassNotFoundException => true // if we don't know, be conservative - } - } } // end of class GraphImpl From 740edc66e5901c7618169ed16e9e0d4ddc6e6706 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 16 Feb 2022 17:25:31 +0800 Subject: [PATCH 2/4] revert --- .../scala/org/apache/spark/graphx/impl/EdgePartition.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 1bb802550acdc..c0a2ba67d2942 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -64,6 +64,9 @@ class EdgePartition[ activeSet: Option[VertexSet]) extends Serializable { + /** No-arg constructor for serialization. */ + private def this() = this(null, null, null, null, null, null, null, null) + /** Return a new `EdgePartition` with the specified edge data. */ def withData[ED2: ClassTag](data: Array[ED2]): EdgePartition[ED2, VD] = { new EdgePartition( From eff79e2d798a15f7f7c57cd2f3e359984b04e4b7 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Fri, 18 Feb 2022 15:13:14 +0800 Subject: [PATCH 3/4] restore the comments in ResourceUtils --- .../src/main/scala/org/apache/spark/resource/ResourceUtils.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index e711317af2f53..3f0a0d36dff6e 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -385,6 +385,7 @@ private[spark] object ResourceUtils extends Logging { val pluginClasses = sparkConf.get(RESOURCES_DISCOVERY_PLUGIN) :+ discoveryScriptPlugin val resourcePlugins = Utils.loadExtensions(classOf[ResourceDiscoveryPlugin], pluginClasses, sparkConf) + // apply each plugin until one of them returns the information for this resource resourcePlugins.foreach { plugin => val riOption = plugin.discoverResource(resourceRequest, sparkConf) if (riOption.isPresent()) { From b2f2f35973dfcdb736f943d72b1756f7c99352db Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Fri, 18 Feb 2022 21:02:06 +0800 Subject: [PATCH 4/4] revert change of SubmitRestProtocolMessage --- .../apache/spark/deploy/rest/SubmitRestProtocolMessage.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala index 6340f1a6ab2e8..97b689cdadd5f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala @@ -46,6 +46,9 @@ private[rest] abstract class SubmitRestProtocolMessage { val action: String = messageType var message: String = null + // For JSON deserialization + private def setAction(a: String): Unit = { } + /** * Serialize the message to JSON. * This also ensures that the message is valid and its fields are in the expected format.