File tree Expand file tree Collapse file tree 4 files changed +11
-7
lines changed
core/src/main/scala/org/apache/spark
yarn/src/main/scala/org/apache/spark/deploy/yarn Expand file tree Collapse file tree 4 files changed +11
-7
lines changed Original file line number Diff line number Diff line change @@ -372,5 +372,5 @@ private[spark] object SparkConf {
372372 /**
373373 * Return whether the given config is a Spark port config.
374374 */
375- def isSparkPortConf (name : String ): Boolean = name.startsWith(" spark." ) && name.endsWith (" .port" )
375+ def isSparkPortConf (name : String ): Boolean = name.startsWith(" spark." ) && name.contains (" .port" )
376376}
Original file line number Diff line number Diff line change @@ -176,6 +176,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
176176 logInfo(s " Running Spark version $SPARK_VERSION" )
177177
178178 private [spark] val conf = config.clone()
179+ val portRetriesConf = conf.getOption(" spark.port.maxRetries" )
180+ if (portRetriesConf.isDefined) {
181+ System .setProperty(" spark.port.maxRetries" , portRetriesConf.get)
182+ }
179183 conf.validateSettings()
180184
181185 /**
Original file line number Diff line number Diff line change @@ -1691,15 +1691,12 @@ private[spark] object Utils extends Logging {
16911691 /**
16921692 * Default maximum number of retries when binding to a port before giving up.
16931693 */
1694- val portMaxRetries : Int = {
1694+ lazy val portMaxRetries : Int = {
16951695 if (sys.props.contains(" spark.testing" )) {
16961696 // Set a higher number of retries for tests...
16971697 sys.props.get(" spark.port.maxRetries" ).map(_.toInt).getOrElse(100 )
16981698 } else {
1699- Option (SparkEnv .get)
1700- .flatMap(_.conf.getOption(" spark.port.maxRetries" ))
1701- .map(_.toInt)
1702- .getOrElse(16 )
1699+ sys.props.get(" spark.port.maxRetries" ).map(_.toInt).getOrElse(16 )
17031700 }
17041701 }
17051702
@@ -1719,6 +1716,7 @@ private[spark] object Utils extends Logging {
17191716 serviceName : String = " " ,
17201717 maxRetries : Int = portMaxRetries): (T , Int ) = {
17211718 val serviceString = if (serviceName.isEmpty) " " else s " ' $serviceName' "
1719+ logInfo(s " Starting service $serviceString on port $port with maximum $maxRetries retries. " )
17221720 for (offset <- 0 to maxRetries) {
17231721 // Do not increment port if startPort is 0, which is treated as a special port
17241722 val tryPort = if (startPort == 0 ) {
Original file line number Diff line number Diff line change @@ -76,7 +76,9 @@ trait ExecutorRunnableUtil extends Logging {
7676 // uses Akka to connect to the scheduler, the akka settings are needed as well as the
7777 // authentication settings.
7878 sparkConf.getAll.
79- filter { case (k, v) => k.startsWith(" spark.auth" ) || k.startsWith(" spark.akka" ) }.
79+ filter { case (k, v) =>
80+ k.startsWith(" spark.auth" ) || k.startsWith(" spark.akka" ) || k.equals(" spark.port.maxRetries" )
81+ }.
8082 foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil .escapeForShell(s " -D $k= $v" ) }
8183
8284 sparkConf.getAkkaConf.
You can’t perform that action at this time.
0 commit comments