Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions connector/kafka-0-10-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<groupId>org.eclipse.jetty.ee10</groupId>
<artifactId>jetty-ee10-servlet</artifactId>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
Expand Down
16 changes: 8 additions & 8 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@
<!-- Jetty dependencies promoted to compile here so they are shaded
and inlined into spark-core jar -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-plus</artifactId>
<groupId>org.eclipse.jetty.ee10</groupId>
<artifactId>jetty-ee10-plus</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
Expand All @@ -147,13 +147,13 @@
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<groupId>org.eclipse.jetty.ee10</groupId>
<artifactId>jetty-ee10-servlet</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-proxy</artifactId>
<groupId>org.eclipse.jetty.ee10</groupId>
<artifactId>jetty-ee10-proxy</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
Expand All @@ -162,8 +162,8 @@
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
<groupId>org.eclipse.jetty.ee10</groupId>
<artifactId>jetty-ee10-servlets</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,16 @@ import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFact
import org.eclipse.jetty.server.Handler
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.handler.DefaultHandler
import org.eclipse.jetty.server.handler.HandlerList
import org.eclipse.jetty.server.handler.ResourceHandler
import org.eclipse.jetty.util.resource.ResourceFactory
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods.{compact, render}

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.util.{SparkTestUtils, Utils}


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this kind of empty lines to minimize your PR.

/**
* Utilities for tests. Included in main codebase since it's used by multiple
* projects.
Expand Down Expand Up @@ -335,9 +336,9 @@ private[spark] object TestUtils extends SparkTestUtils {
// 0 as port means choosing randomly from the available ports
val server = new Server(new InetSocketAddress(Utils.localCanonicalHostName(), 0))
val resHandler = new ResourceHandler()
resHandler.setResourceBase(resBaseDir)
val handlers = new HandlerList()
handlers.setHandlers(Array[Handler](resHandler, new DefaultHandler()))
resHandler.setBaseResource(ResourceFactory.of(resHandler).newResource(resBaseDir))
val handlers = new Handler.Sequence;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please follow the Scala style. We don't need ; at the end.

handlers.setHandlers(resHandler, new DefaultHandler())
server.setHandler(handlers)
server.start()
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, Removal
import com.google.common.util.concurrent.UncheckedExecutionException
import jakarta.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse}
import jakarta.servlet.http.{HttpServletRequest, HttpServletResponse}
import org.eclipse.jetty.servlet.FilterHolder
import org.eclipse.jetty.ee10.servlet.FilterHolder

import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.Source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.util.control.NonFatal
import scala.xml.Node

import jakarta.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.eclipse.jetty.ee10.servlet.{ServletContextHandler, ServletHolder}

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import scala.io.Source

import com.fasterxml.jackson.core.JsonProcessingException
import jakarta.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import org.eclipse.jetty.ee10.servlet.{ServletContextHandler, ServletHolder}
import org.eclipse.jetty.server.{HttpConnectionFactory, Server, ServerConnector}
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler}
import org.json4s._
import org.json4s.jackson.JsonMethods._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable

import com.codahale.metrics.{Metric, MetricRegistry}
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.ee10.servlet.ServletContextHandler

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.json.MetricsModule
import com.fasterxml.jackson.databind.ObjectMapper
import jakarta.servlet.http.HttpServletRequest
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.ee10.servlet.ServletContextHandler

import org.apache.spark.SparkConf
import org.apache.spark.ui.JettyUtils._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.Properties

import com.codahale.metrics.MetricRegistry
import jakarta.servlet.http.HttpServletRequest
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.ee10.servlet.ServletContextHandler

import org.apache.spark.SparkConf
import org.apache.spark.ui.JettyUtils._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import jakarta.servlet.ServletContext
import jakarta.servlet.http.HttpServletRequest
import jakarta.ws.rs._
import jakarta.ws.rs.core.{Context, Response}
import org.eclipse.jetty.ee10.servlet.{ServletContextHandler, ServletHolder}
import org.eclipse.jetty.server.handler.ContextHandler
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.glassfish.jersey.server.ServerProperties
import org.glassfish.jersey.servlet.ServletContainer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.status.api.v1

import jakarta.ws.rs._
import jakarta.ws.rs.core.MediaType
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.eclipse.jetty.ee10.servlet.{ServletContextHandler, ServletHolder}
import org.glassfish.jersey.server.ServerProperties
import org.glassfish.jersey.servlet.ServletContainer

Expand Down
125 changes: 64 additions & 61 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,25 @@
package org.apache.spark.ui

import java.net.{URI, URL, URLDecoder}
import java.nio.charset.Charset
import java.util.EnumSet

import scala.jdk.CollectionConverters._
import scala.language.implicitConversions
import scala.util.Try
import scala.xml.Node

import jakarta.servlet.DispatcherType
import jakarta.servlet.http._
import org.eclipse.jetty.client.HttpClient
import org.eclipse.jetty.client.api.Response
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP
import org.eclipse.jetty.proxy.ProxyServlet
import org.eclipse.jetty.client.transport.HttpClientTransportOverHTTP
import org.eclipse.jetty.ee10.proxy.ProxyServlet
import org.eclipse.jetty.ee10.servlet.{DefaultServlet, FilterHolder, ServletContextHandler, ServletHolder}
import org.eclipse.jetty.http.HttpHeader
import org.eclipse.jetty.server._
import org.eclipse.jetty.server.handler._
import org.eclipse.jetty.server.handler.gzip.GzipHandler
import org.eclipse.jetty.servlet._
import org.eclipse.jetty.util.{Callback, UrlEncoded}
import org.eclipse.jetty.util.component.LifeCycle
import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler}
import org.json4s.JValue
Expand Down Expand Up @@ -149,6 +152,7 @@ private[spark] object JettyUtils extends Logging {
// Make sure we don't end up with "//" in the middle
val newUrl = new URL(new URL(request.getRequestURL.toString), prefixedDestPath).toString
response.sendRedirect(newUrl)
// Response.sendRedirect(request, response, callback, location)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please clean up this leftover.

}
// SPARK-5983 ensure TRACE is not supported
protected override def doTrace(req: HttpServletRequest, res: HttpServletResponse): Unit = {
Expand Down Expand Up @@ -209,12 +213,12 @@ private[spark] object JettyUtils extends Logging {

override def filterServerResponseHeader(
clientRequest: HttpServletRequest,
serverResponse: Response,
serverResponse: org.eclipse.jetty.client.Response,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use import statement. If there is a conflict, you can use Scala's import renaming feature like JMap usage.

headerName: String,
headerValue: String): String = {
if (headerName.equalsIgnoreCase("location")) {
val newHeader = createProxyLocationHeader(headerValue, clientRequest,
serverResponse.getRequest().getURI())
serverResponse.getRequest.getURI)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this change really? Otherwise, please revert all this kind of style change.

if (newHeader != null) {
return newHeader
}
Expand Down Expand Up @@ -259,7 +263,7 @@ private[spark] object JettyUtils extends Logging {

val errorHandler = new ErrorHandler()
errorHandler.setShowStacks(true)
errorHandler.setServer(server)
server.setErrorHandler(errorHandler);
server.addBean(errorHandler)

val collection = new ContextHandlerCollection
Expand Down Expand Up @@ -387,20 +391,19 @@ private[spark] object JettyUtils extends Logging {
private def createRedirectHttpsHandler(securePort: Int, scheme: String): ContextHandler = {
val redirectHandler: ContextHandler = new ContextHandler
redirectHandler.setContextPath("/")
redirectHandler.setVirtualHosts(toVirtualHosts(REDIRECT_CONNECTOR_NAME))
redirectHandler.setHandler(new AbstractHandler {
redirectHandler.setVirtualHosts(toVirtualHosts(REDIRECT_CONNECTOR_NAME).asJava)
redirectHandler.setHandler(new Handler.Abstract {
override def handle(
target: String,
baseRequest: Request,
request: HttpServletRequest,
response: HttpServletResponse): Unit = {
if (baseRequest.isSecure) {
return
}
val httpsURI = createRedirectURI(scheme, securePort, baseRequest)
response.setContentLength(0)
response.sendRedirect(response.encodeRedirectURL(httpsURI))
baseRequest.setHandled(true)
request: Request,
response: Response,
callback: Callback): Boolean = {
if (request.isSecure) return false
val httpsURI = createRedirectURI(scheme, securePort, request)
val responseHeaders = response.getHeaders
responseHeaders.put(HttpHeader.CONTENT_LENGTH, 0L)
val location = Response.toRedirectURI(request, httpsURI);
Response.sendRedirect(request, response, callback, location)
true
}
})
redirectHandler
Expand Down Expand Up @@ -455,7 +458,7 @@ private[spark] object JettyUtils extends Logging {
handler.addFilter(holder, "/*", EnumSet.allOf(classOf[DispatcherType]))
}

private def decodeURL(url: String, encoding: String): String = {
private def decodeURL(url: String, encoding: Charset): String = {
if (url == null) {
null
} else {
Expand All @@ -465,27 +468,20 @@ private[spark] object JettyUtils extends Logging {

// Create a new URI from the arguments, handling IPv6 host encoding and default ports.
private def createRedirectURI(scheme: String, port: Int, request: Request): String = {
val server = request.getServerName
val server = Request.getServerName(request)
val redirectServer = if (server.contains(":") && !server.startsWith("[")) {
s"[${server}]"
} else {
server
}
request.getHttpURI.getDecodedPath
val authority = s"$redirectServer:$port"
val queryEncoding = if (request.getQueryEncoding != null) {
request.getQueryEncoding
} else {
// By default decoding the URI as "UTF-8" should be enough for SparkUI
"UTF-8"
}
// The request URL can be raw or encoded here. To avoid the request URL being
// encoded twice, let's decode it here.
val requestURI = decodeURL(request.getRequestURI, queryEncoding)
val queryString = decodeURL(request.getQueryString, queryEncoding)
val requestURI = request.getHttpURI.getDecodedPath
val queryString = decodeURL(request.getHttpURI.getQuery, UrlEncoded.ENCODING)
new URI(scheme, authority, requestURI, queryString, null).toString
}

def toVirtualHosts(connectors: String*): Array[String] = connectors.map("@" + _).toArray
def toVirtualHosts(connectors: String*): List[String] = connectors.map("@" + _).toList

}

Expand All @@ -499,7 +495,7 @@ private[spark] case class ServerInfo(
def addHandler(
handler: ServletContextHandler,
securityMgr: SecurityManager): Unit = synchronized {
handler.setVirtualHosts(JettyUtils.toVirtualHosts(JettyUtils.SPARK_CONNECTOR_NAME))
handler.setVirtualHosts(JettyUtils.toVirtualHosts(JettyUtils.SPARK_CONNECTOR_NAME).asJava)
addFilters(handler, securityMgr)

val gzipHandler = new GzipHandler()
Expand All @@ -515,7 +511,7 @@ private[spark] case class ServerInfo(
def removeHandler(handler: ServletContextHandler): Unit = synchronized {
// Since addHandler() always adds a wrapping gzip handler, find the container handler
// and remove it.
rootHandler.getHandlers()
rootHandler.getHandlers.asScala
.find { h =>
h.isInstanceOf[GzipHandler] && h.asInstanceOf[GzipHandler].getHandler() == handler
}
Expand Down Expand Up @@ -579,6 +575,12 @@ private[spark] case class ServerInfo(

}

// private def getRedirectUrl(location: String): Unit = {
//
// val proxyUri = _proxyUri.stripSuffix("/")
//
// }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please clean up this left-over.

/**
* A Jetty handler to handle redirects to a proxy server. It intercepts redirects and rewrites the
* location to point to the proxy server.
Expand All @@ -588,36 +590,37 @@ private[spark] case class ServerInfo(
* a servlet context without the trailing slash (e.g. "/jobs") - Jetty will send a redirect to the
* same URL, but with a trailing slash.
*/
private class ProxyRedirectHandler(_proxyUri: String) extends HandlerWrapper {
private class ProxyRedirectHandler(_proxyUri: String) extends Handler.Wrapper {

private val proxyUri = _proxyUri.stripSuffix("/")

override def handle(
target: String,
baseRequest: Request,
request: HttpServletRequest,
response: HttpServletResponse): Unit = {
super.handle(target, baseRequest, request, new ResponseWrapper(request, response))
}

private class ResponseWrapper(
req: HttpServletRequest,
res: HttpServletResponse)
extends HttpServletResponseWrapper(res) {

override def sendRedirect(location: String): Unit = {
val newTarget = if (location != null) {
val target = new URI(location)
// The target path should already be encoded, so don't re-encode it, just the
// proxy address part.
val proxyBase = UIUtils.uiRoot(req)
val proxyPrefix = if (proxyBase.nonEmpty) s"$proxyUri$proxyBase" else proxyUri
s"${res.encodeURL(proxyPrefix)}${target.getPath()}"
} else {
null
}
super.sendRedirect(newTarget)
}
request: Request,
response: org.eclipse.jetty.server.Response,
callback: Callback): Boolean = {
// Todo: Fix the proxy redirect behaviour.
// super.handle(request, new ResponseWrapper(request, response), callback)
super.handle(request, response, callback)
}
//
// private class ResponseWrapper(
// req: Request,
// res: Response)
// extends Response.Wrapper(req, res) {
//
// override def sendRedirect(location: String): Unit = {
// val newTarget = if (location != null) {
// val target = new URI(location)
// // The target path should already be encoded, so don't re-encode it, just the
// // proxy address part.
// val proxyBase = UIUtils.uiRoot(req)
// val proxyPrefix = if (proxyBase.nonEmpty) s"$proxyUri$proxyBase" else proxyUri
// s"${res.encodeURL(proxyPrefix)}${target.getPath()}"
// } else {
// null
// }
// super.sendRedirect(newTarget)
// }
// }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this leftover.


}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.ui
import java.util.Date

import jakarta.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.ee10.servlet.ServletContextHandler

import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
import org.apache.spark.internal.Logging
Expand Down
Loading