Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ private[deploy] class Master(
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
if (reverseProxy) {
masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
webUi.addProxy()
logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
s"Applications UIs are available at $masterWebUiUrl")
}
Expand Down Expand Up @@ -769,9 +770,6 @@ private[deploy] class Master(
workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
if (reverseProxy) {
webUi.addProxyTargets(worker.id, worker.webUiAddress)
}
true
}

Expand All @@ -780,9 +778,7 @@ private[deploy] class Master(
worker.setState(WorkerState.DEAD)
idToWorker -= worker.id
addressToWorker -= worker.endpoint.address
if (reverseProxy) {
webUi.removeProxyTargets(worker.id)
}

for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
exec.application.driver.send(ExecutorUpdated(
Expand Down Expand Up @@ -844,9 +840,6 @@ private[deploy] class Master(
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
waitingApps += app
if (reverseProxy) {
webUi.addProxyTargets(app.id, app.desc.appUiUrl)
}
}

private def finishApplication(app: ApplicationInfo) {
Expand All @@ -860,9 +853,7 @@ private[deploy] class Master(
idToApp -= app.id
endpointToApp -= app.driver
addressToApp -= app.driver.address
if (reverseProxy) {
webUi.removeProxyTargets(app.id)
}

if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach { a =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

package org.apache.spark.deploy.master.ui

import scala.collection.mutable.HashMap

import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
import org.apache.spark.ui.{SparkUI, WebUI}
Expand All @@ -38,7 +35,6 @@ class MasterWebUI(

val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
private val proxyHandlers = new HashMap[String, ServletContextHandler]

initialize()

Expand All @@ -54,16 +50,19 @@ class MasterWebUI(
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
}

def addProxyTargets(id: String, target: String): Unit = {
val endTarget = target.stripSuffix("/")
val handler = createProxyHandler("/proxy/" + id, endTarget)
def addProxy(): Unit = {
val handler = createProxyHandler(idToUiAddress)
attachHandler(handler)
proxyHandlers(id) = handler
}

def removeProxyTargets(id: String): Unit = {
proxyHandlers.remove(id).foreach(detachHandler)
def idToUiAddress(id: String): Option[String] = {
val state = masterEndpointRef.askSync[MasterStateResponse](RequestMasterState)
val maybeWorkerUiAddress = state.workers.find(_.id == id).map(_.webUiAddress)
val maybeAppUiAddress = state.activeApps.find(_.id == id).map(_.desc.appUiUrl)

maybeWorkerUiAddress.orElse(maybeAppUiAddress)
}

}

private[master] object MasterWebUI {
Expand Down
45 changes: 24 additions & 21 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -194,28 +194,32 @@ private[spark] object JettyUtils extends Logging {
}

/** Create a handler for proxying request to Workers and Application Drivers */
def createProxyHandler(
prefix: String,
target: String): ServletContextHandler = {
def createProxyHandler(idToUiAddress: String => Option[String]): ServletContextHandler = {
val servlet = new ProxyServlet {
override def rewriteTarget(request: HttpServletRequest): String = {
val rewrittenURI = createProxyURI(
prefix, target, request.getRequestURI(), request.getQueryString())
if (rewrittenURI == null) {
return null
}
if (!validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort())) {
return null
val path = request.getPathInfo
if (path == null) return null

val prefixTrailingSlashIndex = path.indexOf('/', 1)
val prefix = if (prefixTrailingSlashIndex == -1) {
path
} else {
path.substring(0, prefixTrailingSlashIndex)
}
rewrittenURI.toString()
val id = prefix.drop(1)

// Query master state for id's corresponding UI address
// If that address exists, turn it into a valid, target URI string or return null
idToUiAddress(id)
.map(createProxyURI(prefix, _, path, request.getQueryString))
.filter(uri => uri != null && validateDestination(uri.getHost, uri.getPort))
.map(_.toString)
.orNull
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add comments for the above code?

}

override def newHttpClient(): HttpClient = {
// SPARK-21176: Use the Jetty logic to calculate the number of selector threads (#CPUs/2),
// but limit it to 8 max.
// Otherwise, it might happen that we exhaust the threadpool since in reverse proxy mode
// a proxy is instantiated for each executor. If the head node has many processors, this
// can quickly add up to an unreasonably high number of threads.
val numSelectors = math.max(1, math.min(8, Runtime.getRuntime().availableProcessors() / 2))
new HttpClient(new HttpClientTransportOverHTTP(numSelectors), null)
}
Expand All @@ -226,8 +230,8 @@ private[spark] object JettyUtils extends Logging {
headerName: String,
headerValue: String): String = {
if (headerName.equalsIgnoreCase("location")) {
val newHeader = createProxyLocationHeader(
prefix, headerValue, clientRequest, serverResponse.getRequest().getURI())
val newHeader = createProxyLocationHeader(headerValue, clientRequest,
serverResponse.getRequest().getURI())
if (newHeader != null) {
return newHeader
}
Expand All @@ -239,8 +243,8 @@ private[spark] object JettyUtils extends Logging {

val contextHandler = new ServletContextHandler
val holder = new ServletHolder(servlet)
contextHandler.setContextPath(prefix)
contextHandler.addServlet(holder, "/")
contextHandler.setContextPath("/proxy")
contextHandler.addServlet(holder, "/*")
contextHandler
}

Expand Down Expand Up @@ -438,7 +442,7 @@ private[spark] object JettyUtils extends Logging {
val rest = path.substring(prefix.length())

if (!rest.isEmpty()) {
if (!rest.startsWith("/")) {
if (!rest.startsWith("/") && !uri.endsWith("/")) {
uri.append("/")
}
uri.append(rest)
Expand All @@ -458,14 +462,13 @@ private[spark] object JettyUtils extends Logging {
}

def createProxyLocationHeader(
prefix: String,
headerValue: String,
clientRequest: HttpServletRequest,
targetUri: URI): String = {
val toReplace = targetUri.getScheme() + "://" + targetUri.getAuthority()
if (headerValue.startsWith(toReplace)) {
clientRequest.getScheme() + "://" + clientRequest.getHeader("host") +
prefix + headerValue.substring(toReplace.length())
clientRequest.getPathInfo() + headerValue.substring(toReplace.length())
} else {
null
}
Expand Down
20 changes: 9 additions & 11 deletions core/src/test/scala/org/apache/spark/ui/UISuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -200,36 +200,34 @@ class UISuite extends SparkFunSuite {
}

test("verify proxy rewrittenURI") {
val prefix = "/proxy/worker-id"
val prefix = "/worker-id"
val target = "http://localhost:8081"
val path = "/proxy/worker-id/json"
val path = "/worker-id/json"
var rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, null)
assert(rewrittenURI.toString() === "http://localhost:8081/json")
rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, "test=done")
assert(rewrittenURI.toString() === "http://localhost:8081/json?test=done")
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id", null)
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id", null)
assert(rewrittenURI.toString() === "http://localhost:8081")
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/test%2F", null)
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id/test%2F", null)
assert(rewrittenURI.toString() === "http://localhost:8081/test%2F")
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/%F0%9F%98%84", null)
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id/%F0%9F%98%84", null)
assert(rewrittenURI.toString() === "http://localhost:8081/%F0%9F%98%84")
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-noid/json", null)
rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-noid/json", null)
assert(rewrittenURI === null)
}

test("verify rewriting location header for reverse proxy") {
val clientRequest = mock(classOf[HttpServletRequest])
var headerValue = "http://localhost:4040/jobs"
val prefix = "/proxy/worker-id"
val targetUri = URI.create("http://localhost:4040")
when(clientRequest.getScheme()).thenReturn("http")
when(clientRequest.getHeader("host")).thenReturn("localhost:8080")
var newHeader = JettyUtils.createProxyLocationHeader(
prefix, headerValue, clientRequest, targetUri)
when(clientRequest.getPathInfo()).thenReturn("/proxy/worker-id")
var newHeader = JettyUtils.createProxyLocationHeader(headerValue, clientRequest, targetUri)
assert(newHeader.toString() === "http://localhost:8080/proxy/worker-id/jobs")
headerValue = "http://localhost:4041/jobs"
newHeader = JettyUtils.createProxyLocationHeader(
prefix, headerValue, clientRequest, targetUri)
newHeader = JettyUtils.createProxyLocationHeader(headerValue, clientRequest, targetUri)
assert(newHeader === null)
}

Expand Down