@@ -19,7 +19,6 @@ package org.apache.spark.ui
1919
2020import java .net .InetSocketAddress
2121import java .net .URL
22- import javax .servlet .DispatcherType
2322import javax .servlet .http .{HttpServlet , HttpServletResponse , HttpServletRequest }
2423
2524import scala .annotation .tailrec
@@ -29,7 +28,7 @@ import scala.xml.Node
2928import org .json4s .JValue
3029import org .json4s .jackson .JsonMethods .{pretty , render }
3130
32- import org .eclipse .jetty .server .{NetworkConnector , Server }
31+ import org .eclipse .jetty .server .{DispatcherType , Server }
3332import org .eclipse .jetty .server .handler .HandlerList
3433import org .eclipse .jetty .servlet .{DefaultServlet , FilterHolder , ServletContextHandler , ServletHolder }
3534import org .eclipse .jetty .util .thread .QueuedThreadPool
@@ -61,7 +60,8 @@ private[spark] object JettyUtils extends Logging {
6160 def createServlet [T <% AnyRef ](servletParams : ServletParams [T ],
6261 securityMgr : SecurityManager ): HttpServlet = {
6362 new HttpServlet {
64- override def doGet (request : HttpServletRequest , response : HttpServletResponse ) {
63+ override def doGet (request : HttpServletRequest ,
64+ response : HttpServletResponse ) {
6565 if (securityMgr.checkUIViewPermissions(request.getRemoteUser())) {
6666 response.setContentType(" %s;charset=utf-8" .format(servletParams.contentType))
6767 response.setStatus(HttpServletResponse .SC_OK )
@@ -72,7 +72,7 @@ private[spark] object JettyUtils extends Logging {
7272 response.setStatus(HttpServletResponse .SC_UNAUTHORIZED )
7373 response.setHeader(" Cache-Control" , " no-cache, no-store, must-revalidate" )
7474 response.sendError(HttpServletResponse .SC_UNAUTHORIZED ,
75- " User is not authorized to access this page." )
75+ " User is not authorized to access this page." );
7676 }
7777 }
7878 }
@@ -120,25 +120,26 @@ private[spark] object JettyUtils extends Logging {
120120
121121 private def addFilters (handlers : Seq [ServletContextHandler ], conf : SparkConf ) {
122122 val filters : Array [String ] = conf.get(" spark.ui.filters" , " " ).split(',' ).map(_.trim())
123- filters.foreach { filter =>
124- if (! filter.isEmpty) {
125- logInfo(" Adding filter: " + filter)
126- val holder : FilterHolder = new FilterHolder ()
127- holder.setClassName(filter)
128- // get any parameters for each filter
129- val paramName = " spark." + filter + " .params"
130- val params = conf.get(paramName, " " ).split(',' ).map(_.trim()).toSet
131- params.foreach {
132- case param : String =>
133- if (! param.isEmpty) {
134- val parts = param.split(" =" )
135- if (parts.length == 2 ) holder.setInitParameter(parts(0 ), parts(1 ))
136- }
123+ filters.foreach {
124+ case filter : String =>
125+ if (! filter.isEmpty) {
126+ logInfo(" Adding filter: " + filter)
127+ val holder : FilterHolder = new FilterHolder ()
128+ holder.setClassName(filter)
129+ // get any parameters for each filter
130+ val paramName = " spark." + filter + " .params"
131+ val params = conf.get(paramName, " " ).split(',' ).map(_.trim()).toSet
132+ params.foreach {
133+ case param : String =>
134+ if (! param.isEmpty) {
135+ val parts = param.split(" =" )
136+ if (parts.length == 2 ) holder.setInitParameter(parts(0 ), parts(1 ))
137+ }
138+ }
139+ val enumDispatcher = java.util.EnumSet .of(DispatcherType .ASYNC , DispatcherType .ERROR ,
140+ DispatcherType .FORWARD , DispatcherType .INCLUDE , DispatcherType .REQUEST )
141+ handlers.foreach { case (handler) => handler.addFilter(holder, " /*" , enumDispatcher) }
137142 }
138- val enumDispatcher = java.util.EnumSet .of(DispatcherType .ASYNC , DispatcherType .ERROR ,
139- DispatcherType .FORWARD , DispatcherType .INCLUDE , DispatcherType .REQUEST )
140- handlers.foreach { handler => handler.addFilter(holder, " /*" , enumDispatcher) }
141- }
142143 }
143144 }
144145
@@ -149,10 +150,7 @@ private[spark] object JettyUtils extends Logging {
149150 * If the desired port number is contented, continues incrementing ports until a free port is
150151 * found. Returns the chosen port and the jetty Server object.
151152 */
152- def startJettyServer (
153- hostName : String ,
154- port : Int ,
155- handlers : Seq [ServletContextHandler ],
153+ def startJettyServer (hostName : String , port : Int , handlers : Seq [ServletContextHandler ],
156154 conf : SparkConf ): (Server , Int ) = {
157155
158156 addFilters(handlers, conf)
@@ -162,18 +160,16 @@ private[spark] object JettyUtils extends Logging {
162160 @ tailrec
163161 def connect (currentPort : Int ): (Server , Int ) = {
164162 val server = new Server (new InetSocketAddress (hostName, currentPort))
165- // Unfortunately Jetty 9 doesn't allow us to set both the thread pool and the port number in
166- // constructor. But fortunately the pool allocated by Jetty is always a QueuedThreadPool.
167- val pool = server.getThreadPool.asInstanceOf [QueuedThreadPool ]
163+ val pool = new QueuedThreadPool
168164 pool.setDaemon(true )
169-
165+ server.setThreadPool(pool)
170166 server.setHandler(handlerList)
171167
172168 Try {
173169 server.start()
174170 } match {
175171 case s : Success [_] =>
176- (server, server.getConnectors.head.asInstanceOf [ NetworkConnector ]. getLocalPort)
172+ (server, server.getConnectors.head.getLocalPort)
177173 case f : Failure [_] =>
178174 server.stop()
179175 logInfo(" Failed to create UI at port, %s. Trying again." .format(currentPort))
0 commit comments