@@ -19,6 +19,7 @@ package org.apache.spark.ui
1919
2020import java .net .InetSocketAddress
2121import java .net .URL
22+ import javax .servlet .DispatcherType
2223import javax .servlet .http .{HttpServlet , HttpServletResponse , HttpServletRequest }
2324
2425import scala .annotation .tailrec
@@ -28,7 +29,7 @@ import scala.xml.Node
2829import org .json4s .JValue
2930import org .json4s .jackson .JsonMethods .{pretty , render }
3031
31- import org .eclipse .jetty .server .{DispatcherType , Server }
32+ import org .eclipse .jetty .server .{NetworkConnector , Server }
3233import org .eclipse .jetty .server .handler .HandlerList
3334import org .eclipse .jetty .servlet .{DefaultServlet , FilterHolder , ServletContextHandler , ServletHolder }
3435import org .eclipse .jetty .util .thread .QueuedThreadPool
@@ -60,8 +61,7 @@ private[spark] object JettyUtils extends Logging {
6061 def createServlet [T <% AnyRef ](servletParams : ServletParams [T ],
6162 securityMgr : SecurityManager ): HttpServlet = {
6263 new HttpServlet {
63- override def doGet (request : HttpServletRequest ,
64- response : HttpServletResponse ) {
64+ override def doGet (request : HttpServletRequest , response : HttpServletResponse ) {
6565 if (securityMgr.checkUIViewPermissions(request.getRemoteUser())) {
6666 response.setContentType(" %s;charset=utf-8" .format(servletParams.contentType))
6767 response.setStatus(HttpServletResponse .SC_OK )
@@ -72,7 +72,7 @@ private[spark] object JettyUtils extends Logging {
7272 response.setStatus(HttpServletResponse .SC_UNAUTHORIZED )
7373 response.setHeader(" Cache-Control" , " no-cache, no-store, must-revalidate" )
7474 response.sendError(HttpServletResponse .SC_UNAUTHORIZED ,
75- " User is not authorized to access this page." );
75+ " User is not authorized to access this page." )
7676 }
7777 }
7878 }
@@ -120,26 +120,25 @@ private[spark] object JettyUtils extends Logging {
120120
121121 private def addFilters (handlers : Seq [ServletContextHandler ], conf : SparkConf ) {
122122 val filters : Array [String ] = conf.get(" spark.ui.filters" , " " ).split(',' ).map(_.trim())
123- filters.foreach {
124- case filter : String =>
125- if (! filter.isEmpty) {
126- logInfo(" Adding filter: " + filter)
127- val holder : FilterHolder = new FilterHolder ()
128- holder.setClassName(filter)
129- // get any parameters for each filter
130- val paramName = " spark." + filter + " .params"
131- val params = conf.get(paramName, " " ).split(',' ).map(_.trim()).toSet
132- params.foreach {
133- case param : String =>
134- if (! param.isEmpty) {
135- val parts = param.split(" =" )
136- if (parts.length == 2 ) holder.setInitParameter(parts(0 ), parts(1 ))
137- }
138- }
139- val enumDispatcher = java.util.EnumSet .of(DispatcherType .ASYNC , DispatcherType .ERROR ,
140- DispatcherType .FORWARD , DispatcherType .INCLUDE , DispatcherType .REQUEST )
141- handlers.foreach { case (handler) => handler.addFilter(holder, " /*" , enumDispatcher) }
123+ filters.foreach { filter =>
124+ if (! filter.isEmpty) {
125+ logInfo(" Adding filter: " + filter)
126+ val holder : FilterHolder = new FilterHolder ()
127+ holder.setClassName(filter)
128+ // get any parameters for each filter
129+ val paramName = " spark." + filter + " .params"
130+ val params = conf.get(paramName, " " ).split(',' ).map(_.trim()).toSet
131+ params.foreach {
132+ case param : String =>
133+ if (! param.isEmpty) {
134+ val parts = param.split(" =" )
135+ if (parts.length == 2 ) holder.setInitParameter(parts(0 ), parts(1 ))
136+ }
142137 }
138+ val enumDispatcher = java.util.EnumSet .of(DispatcherType .ASYNC , DispatcherType .ERROR ,
139+ DispatcherType .FORWARD , DispatcherType .INCLUDE , DispatcherType .REQUEST )
140+ handlers.foreach { handler => handler.addFilter(holder, " /*" , enumDispatcher) }
141+ }
143142 }
144143 }
145144
@@ -150,7 +149,10 @@ private[spark] object JettyUtils extends Logging {
150149 * If the desired port number is contented, continues incrementing ports until a free port is
151150 * found. Returns the chosen port and the jetty Server object.
152151 */
153- def startJettyServer (hostName : String , port : Int , handlers : Seq [ServletContextHandler ],
152+ def startJettyServer (
153+ hostName : String ,
154+ port : Int ,
155+ handlers : Seq [ServletContextHandler ],
154156 conf : SparkConf ): (Server , Int ) = {
155157
156158 addFilters(handlers, conf)
@@ -160,16 +162,18 @@ private[spark] object JettyUtils extends Logging {
160162 @ tailrec
161163 def connect (currentPort : Int ): (Server , Int ) = {
162164 val server = new Server (new InetSocketAddress (hostName, currentPort))
163- val pool = new QueuedThreadPool
165+ // Unfortunately Jetty 9 doesn't allow us to set both the thread pool and the port number in
166+ // constructor. But fortunately the pool allocated by Jetty is always a QueuedThreadPool.
167+ val pool = server.getThreadPool.asInstanceOf [QueuedThreadPool ]
164168 pool.setDaemon(true )
165- server.setThreadPool(pool)
169+
166170 server.setHandler(handlerList)
167171
168172 Try {
169173 server.start()
170174 } match {
171175 case s : Success [_] =>
172- (server, server.getConnectors.head.getLocalPort)
176+ (server, server.getConnectors.head.asInstanceOf [ NetworkConnector ]. getLocalPort)
173177 case f : Failure [_] =>
174178 server.stop()
175179 logInfo(" Failed to create UI at port, %s. Trying again." .format(currentPort))
0 commit comments