diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 2a2ce0504dbb..956724b14bba 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -579,7 +579,9 @@ private[spark] object SparkConf extends Logging {
"are no longer accepted. To specify the equivalent now, one may use '64k'."),
DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
- "Please use the new blacklisting options, spark.blacklist.*")
+ "Please use the new blacklisting options, spark.blacklist.*"),
+ DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
+ DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more")
)
Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index f4a59f069a5f..3196c1ece15e 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -177,7 +177,7 @@ object SparkEnv extends Logging {
SparkContext.DRIVER_IDENTIFIER,
bindAddress,
advertiseAddress,
- port,
+ Option(port),
isLocal,
numCores,
ioEncryptionKey,
@@ -194,7 +194,6 @@ object SparkEnv extends Logging {
conf: SparkConf,
executorId: String,
hostname: String,
- port: Int,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
@@ -203,7 +202,7 @@ object SparkEnv extends Logging {
executorId,
hostname,
hostname,
- port,
+ None,
isLocal,
numCores,
ioEncryptionKey
@@ -220,7 +219,7 @@ object SparkEnv extends Logging {
executorId: String,
bindAddress: String,
advertiseAddress: String,
- port: Int,
+ port: Option[Int],
isLocal: Boolean,
numUsableCores: Int,
ioEncryptionKey: Option[Array[Byte]],
@@ -243,17 +242,12 @@ object SparkEnv extends Logging {
}
val systemName = if (isDriver) driverSystemName else executorSystemName
- val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
+ val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, clientMode = !isDriver)
// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
- // In the non-driver case, the RPC env's address may be null since it may not be listening
- // for incoming connections.
if (isDriver) {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
- } else if (rpcEnv.address != null) {
- conf.set("spark.executor.port", rpcEnv.address.port.toString)
- logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
}
// Create an instance of the class with the given name, possibly initializing it with our conf
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b2b26ee107c0..a2f1aa22b006 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -191,11 +191,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
// Bootstrap to fetch the driver's Spark properties.
val executorConf = new SparkConf
- val port = executorConf.getInt("spark.executor.port", 0)
val fetcher = RpcEnv.create(
"driverPropsFetcher",
hostname,
- port,
+ -1,
executorConf,
new SecurityManager(executorConf),
clientMode = true)
@@ -221,7 +220,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
val env = SparkEnv.createExecutorEnv(
- driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
+ driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 314a806edf39..c1344ad99a7d 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -209,7 +209,7 @@ provide such guarantees on the offer stream.
In this mode spark executors will honor port allocation if such is
provided from the user. Specifically if the user defines
-`spark.executor.port` or `spark.blockManager.port` in Spark configuration,
+`spark.blockManager.port` in Spark configuration,
the mesos scheduler will check the available offers for a valid port
range containing the port numbers. If no such range is available it will
not launch any task. If no restriction is imposed on port numbers by the
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index e9ddaa76a797..2d56123028f2 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -239,13 +239,6 @@ To use a custom metrics.properties for the application master and executors, upd
Same as spark.yarn.driver.memoryOverhead, but for the YARN Application Master in client mode.
-
- spark.yarn.am.port |
- (random) |
-
- Port for the YARN Application Master to listen on. In YARN client mode, this is used to communicate between the Spark driver running on a gateway and the YARN Application Master running on YARN. In YARN cluster mode, this is used for the dynamic executor feature, where it handles the kill from the scheduler backend.
- |
-
spark.yarn.queue |
default |
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index a086ec7ea2da..61bfa27a84fd 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -74,9 +74,8 @@ private[spark] class MesosExecutorBackend
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
val conf = new SparkConf(loadDefaults = true).setAll(properties)
- val port = conf.getInt("spark.executor.port", 0)
val env = SparkEnv.createExecutorEnv(
- conf, executorId, slaveInfo.getHostname, port, cpusPerTask, None, isLocal = false)
+ conf, executorId, slaveInfo.getHostname, cpusPerTask, None, isLocal = false)
executor = new Executor(
executorId,
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 9d81025a3016..062ed1f93fa5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -438,7 +438,7 @@ trait MesosSchedulerUtils extends Logging {
}
}
- val managedPortNames = List("spark.executor.port", BLOCK_MANAGER_PORT.key)
+ val managedPortNames = List(BLOCK_MANAGER_PORT.key)
/**
* The values of the non-zero ports to be used by the executor process.
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index ec47ab153177..5d4bf6d082c4 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -179,40 +179,25 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Port reservation is done correctly with user specified ports only") {
val conf = new SparkConf()
- conf.set("spark.executor.port", "3000" )
conf.set(BLOCK_MANAGER_PORT, 4000)
val portResource = createTestPortResource((3000, 5000), Some("my_role"))
val (resourcesLeft, resourcesToBeUsed) = utils
- .partitionPortResources(List(3000, 4000), List(portResource))
- resourcesToBeUsed.length shouldBe 2
+ .partitionPortResources(List(4000), List(portResource))
+ resourcesToBeUsed.length shouldBe 1
val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray
- portsToUse.length shouldBe 2
- arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
+ portsToUse.length shouldBe 1
+ arePortsEqual(portsToUse, Array(4000L)) shouldBe true
val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
- val expectedUSed = Array((3000L, 3000L), (4000L, 4000L))
+ val expectedUSed = Array((4000L, 4000L))
arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true
}
- test("Port reservation is done correctly with some user specified ports (spark.executor.port)") {
- val conf = new SparkConf()
- conf.set("spark.executor.port", "3100" )
- val portResource = createTestPortResource((3000, 5000), Some("my_role"))
-
- val (resourcesLeft, resourcesToBeUsed) = utils
- .partitionPortResources(List(3100), List(portResource))
-
- val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
- portsToUse.length shouldBe 1
- portsToUse.contains(3100) shouldBe true
- }
-
test("Port reservation is done correctly with all random ports") {
val conf = new SparkConf()
val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
@@ -226,21 +211,20 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Port reservation is done correctly with user specified ports only - multiple ranges") {
val conf = new SparkConf()
- conf.set("spark.executor.port", "2100" )
conf.set("spark.blockManager.port", "4000")
val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
createTestPortResource((2000, 2500), Some("other_role")))
val (resourcesLeft, resourcesToBeUsed) = utils
- .partitionPortResources(List(2100, 4000), portResourceList)
+ .partitionPortResources(List(4000), portResourceList)
val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
- portsToUse.length shouldBe 2
+ portsToUse.length shouldBe 1
val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
- val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
+ val expectedUsed = Array((4000L, 4000L))
- arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
+ arePortsEqual(portsToUse.toArray, Array(4000L)) shouldBe true
arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 864c834d110f..6da2c0b5f330 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -429,8 +429,7 @@ private[spark] class ApplicationMaster(
}
private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
- val port = sparkConf.get(AM_PORT)
- rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr,
+ rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, -1, sparkConf, securityMgr,
clientMode = true)
val driverRef = waitForSparkDriver()
addAmIpFilter()
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index d8c96c35ca71..d4108caab28c 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -40,11 +40,6 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
- private[spark] val AM_PORT =
- ConfigBuilder("spark.yarn.am.port")
- .intConf
- .createWithDefault(0)
-
private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
.doc("Interval after which Executor failures will be considered independent and not " +