Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ private[spark] class HttpServer(
} else {
logInfo("Starting HTTP Server")
val (actualServer, actualPort) =
Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
Utils.startServiceOnPort[Server](
requestedPort, doStart, securityManager.sparkConf, serverName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems a bit odd here to be grabbing the spark conf out of the securityManager. Generally it would be better to pass into HttpServer itself. I'll leave that up to one of the core committers though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both ok. If we pass into HttpServer a sparkConf, actually it is same with securityManager.sparkConf

server = actualServer
port = actualPort
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* can take place.
*/

private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {
private[spark] class SecurityManager(
val sparkConf: SparkConf) extends Logging with SecretKeyHolder {

// key used to store the spark secret in the Hadoop UGI
private val sparkSecretLookupKey = "sparkCookie"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private[nio] class ConnectionManager(
serverChannel.socket.bind(new InetSocketAddress(port))
(serverChannel, serverChannel.socket.getLocalPort)
}
Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
Utils.startServiceOnPort[ServerSocketChannel](port, startService, conf, name)
serverChannel.register(selector, SelectionKey.OP_ACCEPT)

val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ private[spark] object JettyUtils extends Logging {
}
}

val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName)
val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName)
ServerInfo(server, boundPort, collection)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private[spark] object AkkaUtils extends Logging {
val startService: Int => (ActorSystem, Int) = { actualPort =>
doCreateActorSystem(name, host, actualPort, conf, securityManager)
}
Utils.startServiceOnPort(port, startService, name)
Utils.startServiceOnPort(port, startService, conf, name)
}

private def doCreateActorSystem(
Expand Down
17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1652,25 +1652,28 @@ private[spark] object Utils extends Logging {
* Attempt to start a service on the given port, or fail after a number of attempts.
* Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
*
* @param startPort The initial port to start the service on.
* @param port The initial port to start the service on.
* @param maxRetries Maximum number of retries to attempt.
* A value of 3 means attempting ports n, n+1, n+2, and n+3, for example.
* @param startService Function to start service on a given port.
* This is expected to throw java.net.BindException on port collision.
*/
def startServiceOnPort[T](
startPort: Int,
port: Int,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this api, startPort means it would try from this port. I think startPort is a better name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have used startPort and endPort for range, Using port here also make sense

startService: Int => (T, Int),
conf: SparkConf = new SparkConf(),
serviceName: String = "",
maxRetries: Int = portMaxRetries): (T, Int) = {
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
val startPort = conf.getInt("spark.port.min", 1024)
val endPort = conf.getInt("spark.port.max", 65536)
for (offset <- 0 to maxRetries) {
// Do not increment port if startPort is 0, which is treated as a special port
val tryPort = if (startPort == 0) {
startPort
// Do not increment port if port is 0, which is treated as a special port
val tryPort = if (port == 0) {
port
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps I'm missing something, why aren't we applying the restriction when using port 0 (ephemeral port), most of the things default to 0 which is pick a port, we want those to end up in this range.

It seems like this would be more clear if the range is specified, just to ignore port passed in and iterate over that range.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I think your last comment/question hits on this issue. that seems better. As long as all the services default to port 0 (other then web ui) this seems fine. That way if user does specify a port explicitly it will still use that.

} else {
// If the new port wraps around, do not try a privilege port
((startPort + offset - 1024) % (65536 - 1024)) + 1024
// If the new port wraps around, ensure it is in range(startPort, endPort)
((port + offset) % (endPort - startPort + 1)) + startPort
}
try {
val (service, port) = startService(tryPort)
Expand Down
14 changes: 14 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,20 @@ of the most common options to set are:
Logs the effective SparkConf as INFO when a SparkContext is started.
</td>
</tr>
<tr>
<td><code>spark.port.min</code></td>
<td>1024</td>
<td>
Min port for spark(UI, HttpServer. ConnectionManager, Akka)
</td>
</tr>
<tr>
<td><code>spark.port.max</code></td>
<td>65536</td>
<td>
Max port for spark(UI, HttpServer. ConnectionManager, Akka)
</td>
</tr>
</table>

Apart from these, the following properties are also available, and may be useful in some situations:
Expand Down