Skip to content

Commit cad16da

Browse files
committed
Add fallover increment logic for HttpServer
1 parent c5a0568 commit cad16da

File tree

1 file changed

+52
-29
lines changed

1 file changed

+52
-29
lines changed

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -47,40 +47,63 @@ private[spark] class HttpServer(resourceBase: File,
4747
private var server: Server = null
4848
private var port: Int = localPort
4949

50+
private def startOnPort(startPort: Int) {
51+
val server = new Server()
52+
val connector = new SocketConnector
53+
connector.setMaxIdleTime(60*1000)
54+
connector.setSoLingerTime(-1)
55+
connector.setPort(startPort)
56+
server.addConnector(connector)
57+
58+
val threadPool = new QueuedThreadPool
59+
threadPool.setDaemon(true)
60+
server.setThreadPool(threadPool)
61+
val resHandler = new ResourceHandler
62+
resHandler.setResourceBase(resourceBase.getAbsolutePath)
63+
64+
val handlerList = new HandlerList
65+
handlerList.setHandlers(Array(resHandler, new DefaultHandler))
66+
67+
if (securityManager.isAuthenticationEnabled()) {
68+
logDebug("HttpServer is using security")
69+
val sh = setupSecurityHandler(securityManager)
70+
// make sure we go through security handler to get resources
71+
sh.setHandler(handlerList)
72+
server.setHandler(sh)
73+
} else {
74+
logDebug("HttpServer is not using security")
75+
server.setHandler(handlerList)
76+
}
77+
78+
server.start()
79+
val actualPort = server.getConnectors()(0).getLocalPort()
80+
81+
return (server, actualPort)
82+
}
83+
84+
private def startWithIncrements(startPort: Int, maxTries: Int) {
85+
for( tryPort <- startPort until (startPort+maxTries)) {
86+
try {
87+
val (server, actualPort) = startOnPort(startPort)
88+
return (server, actualPort)
89+
} catch {
90+
case e: java.net.BindException => {
91+
if (!e.getMessage.contains("Address already in use")) {
92+
throw e
93+
}
94+
logInfo("Could not bind on port: " + (tryPort))
95+
}
96+
case e: Exception => throw e
97+
}
98+
}
99+
}
100+
50101
def start() {
51102
if (server != null) {
52103
throw new ServerStateException("Server is already started")
53104
} else {
54105
logInfo("Starting HTTP Server")
55-
server = new Server()
56-
val connector = new SocketConnector
57-
connector.setMaxIdleTime(60*1000)
58-
connector.setSoLingerTime(-1)
59-
connector.setPort(localPort)
60-
server.addConnector(connector)
61-
62-
val threadPool = new QueuedThreadPool
63-
threadPool.setDaemon(true)
64-
server.setThreadPool(threadPool)
65-
val resHandler = new ResourceHandler
66-
resHandler.setResourceBase(resourceBase.getAbsolutePath)
67-
68-
val handlerList = new HandlerList
69-
handlerList.setHandlers(Array(resHandler, new DefaultHandler))
70-
71-
if (securityManager.isAuthenticationEnabled()) {
72-
logDebug("HttpServer is using security")
73-
val sh = setupSecurityHandler(securityManager)
74-
// make sure we go through security handler to get resources
75-
sh.setHandler(handlerList)
76-
server.setHandler(sh)
77-
} else {
78-
logDebug("HttpServer is not using security")
79-
server.setHandler(handlerList)
80-
}
81-
82-
server.start()
83-
port = server.getConnectors()(0).getLocalPort()
106+
(server, port) = startWithIncrements(localPort, 3)
84107
}
85108
}
86109

0 commit comments

Comments
 (0)