| {driver.id} {killLink} |
{driver.submitDate} |
- {driver.worker.map(w => {w.id.toString}).getOrElse("None")}
+ | {driver.worker.map(w =>
+
+ {w.id.toString}
+ ).getOrElse("None")}
|
{driver.state} |
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index a0727ad83fb6..8cfd0f682932 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,6 +17,10 @@
package org.apache.spark.deploy.master.ui
+import scala.collection.mutable.HashMap
+
+import org.eclipse.jetty.servlet.ServletContextHandler
+
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
import org.apache.spark.ui.{SparkUI, WebUI}
@@ -34,6 +38,7 @@ class MasterWebUI(
val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
+ private val proxyHandlers = new HashMap[String, ServletContextHandler]
initialize()
@@ -48,6 +53,17 @@ class MasterWebUI(
attachHandler(createRedirectHandler(
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
}
+
+ def addProxyTargets(id: String, target: String): Unit = {
+ var endTarget = target.stripSuffix("/")
+ val handler = createProxyHandler("/proxy/" + id, endTarget)
+ attachHandler(handler)
+ proxyHandlers(id) = handler
+ }
+
+ def removeProxyTargets(id: String): Unit = {
+ proxyHandlers.remove(id).foreach(detachHandler)
+ }
}
private[master] object MasterWebUI {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 06066248ea5d..d4d8521cc820 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -156,7 +156,11 @@ private[deploy] class ExecutorRunner(
// Add webUI log urls
val baseUrl =
- s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+ if (conf.getBoolean("spark.ui.reverseProxy", false)) {
+ s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
+ } else {
+ s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+ }
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 724206bf94c6..0bedd9a20a96 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -203,6 +203,9 @@ private[deploy] class Worker(
activeMasterWebUiUrl = uiUrl
master = Some(masterRef)
connected = true
+ if (conf.getBoolean("spark.ui.reverseProxy", false)) {
+ logInfo(s"WorkerWebUI is available at $activeMasterWebUiUrl/proxy/$workerId")
+ }
// Cancel any outstanding re-registration attempts because we found a new master
cancelLastRegistrationRetry()
}
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 50283f2b74a4..24f3f757157f 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.xml.Node
+import org.eclipse.jetty.client.api.Response
+import org.eclipse.jetty.proxy.ProxyServlet
import org.eclipse.jetty.server.{Request, Server, ServerConnector}
import org.eclipse.jetty.server.handler._
import org.eclipse.jetty.servlet._
@@ -186,6 +188,47 @@ private[spark] object JettyUtils extends Logging {
contextHandler
}
+ /** Create a handler for proxying request to Workers and Application Drivers */
+ def createProxyHandler(
+ prefix: String,
+ target: String): ServletContextHandler = {
+ val servlet = new ProxyServlet {
+ override def rewriteTarget(request: HttpServletRequest): String = {
+ val rewrittenURI = createProxyURI(
+ prefix, target, request.getRequestURI(), request.getQueryString())
+ if (rewrittenURI == null) {
+ return null
+ }
+ if (!validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort())) {
+ return null
+ }
+ rewrittenURI.toString()
+ }
+
+ override def filterServerResponseHeader(
+ clientRequest: HttpServletRequest,
+ serverResponse: Response,
+ headerName: String,
+ headerValue: String): String = {
+ if (headerName.equalsIgnoreCase("location")) {
+ val newHeader = createProxyLocationHeader(
+ prefix, headerValue, clientRequest, serverResponse.getRequest().getURI())
+ if (newHeader != null) {
+ return newHeader
+ }
+ }
+ super.filterServerResponseHeader(
+ clientRequest, serverResponse, headerName, headerValue)
+ }
+ }
+
+ val contextHandler = new ServletContextHandler
+ val holder = new ServletHolder(servlet)
+ contextHandler.setContextPath(prefix)
+ contextHandler.addServlet(holder, "/")
+ contextHandler
+ }
+
/** Add filters, if any, to the given list of ServletContextHandlers */
def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
@@ -332,6 +375,48 @@ private[spark] object JettyUtils extends Logging {
redirectHandler
}
+ def createProxyURI(prefix: String, target: String, path: String, query: String): URI = {
+ if (!path.startsWith(prefix)) {
+ return null
+ }
+
+ val uri = new StringBuilder(target)
+ val rest = path.substring(prefix.length())
+
+ if (!rest.isEmpty()) {
+ if (!rest.startsWith("/")) {
+ uri.append("/")
+ }
+ uri.append(rest)
+ }
+
+ val rewrittenURI = URI.create(uri.toString())
+ if (query != null) {
+ return new URI(
+ rewrittenURI.getScheme(),
+ rewrittenURI.getAuthority(),
+ rewrittenURI.getPath(),
+ query,
+ rewrittenURI.getFragment()
+ ).normalize()
+ }
+ rewrittenURI.normalize()
+ }
+
+ def createProxyLocationHeader(
+ prefix: String,
+ headerValue: String,
+ clientRequest: HttpServletRequest,
+ targetUri: URI): String = {
+ val toReplace = targetUri.getScheme() + "://" + targetUri.getAuthority()
+ if (headerValue.startsWith(toReplace)) {
+ clientRequest.getScheme() + "://" + clientRequest.getHeader("host") +
+ prefix + headerValue.substring(toReplace.length())
+ } else {
+ null
+ }
+ }
+
// Create a new URI from the arguments, handling IPv6 host encoding and default ports.
private def createRedirectURI(
scheme: String, server: String, port: Int, path: String, query: String) = {
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 2b6c538485c5..c0d1a2220f62 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -510,4 +510,16 @@ private[spark] object UIUtils extends Logging {
def getTimeZoneOffset() : Int =
TimeZone.getDefault().getOffset(System.currentTimeMillis()) / 1000 / 60
+
+ /**
+ * Return the correct Href after checking if master is running in the
+ * reverse proxy mode or not.
+ */
+ def makeHref(proxy: Boolean, id: String, origHref: String): String = {
+ if (proxy) {
+ s"/proxy/$id"
+ } else {
+ origHref
+ }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 7cbe4e342eaa..831a7bcb1274 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -157,6 +157,33 @@ class MasterSuite extends SparkFunSuite
}
}
+ test("master/worker web ui available with reverseProxy") {
+ implicit val formats = org.json4s.DefaultFormats
+ val reverseProxyUrl = "http://localhost:8080"
+ val conf = new SparkConf()
+ conf.set("spark.ui.reverseProxy", "true")
+ conf.set("spark.ui.reverseProxyUrl", reverseProxyUrl)
+ val localCluster = new LocalSparkCluster(2, 2, 512, conf)
+ localCluster.start()
+ try {
+ eventually(timeout(5 seconds), interval(100 milliseconds)) {
+ val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json")
+ .getLines().mkString("\n")
+ val JArray(workers) = (parse(json) \ "workers")
+ workers.size should be (2)
+ workers.foreach { workerSummaryJson =>
+ val JString(workerId) = workerSummaryJson \ "id"
+ val url = s"http://localhost:${localCluster.masterWebUIPort}/proxy/${workerId}/json"
+ val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n"))
+ (workerResponse \ "cores").extract[Int] should be (2)
+ (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl)
+ }
+ }
+ } finally {
+ localCluster.stop()
+ }
+ }
+
test("basic scheduling - spread out") {
basicScheduling(spreadOut = true)
}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 2b59b48d8bc9..dbb8dca4c8da 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -18,10 +18,13 @@
package org.apache.spark.ui
import java.net.{BindException, ServerSocket}
+import java.net.URI
+import javax.servlet.http.HttpServletRequest
import scala.io.Source
import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Mockito.{mock, when}
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
@@ -190,6 +193,40 @@ class UISuite extends SparkFunSuite {
}
}
+ test("verify proxy rewrittenURI") {
+ val prefix = "/proxy/worker-id"
+ val target = "http://localhost:8081"
+ val path = "/proxy/worker-id/json"
+ var rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, null)
+ assert(rewrittenURI.toString() === "http://localhost:8081/json")
+ rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, "test=done")
+ assert(rewrittenURI.toString() === "http://localhost:8081/json?test=done")
+ rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id", null)
+ assert(rewrittenURI.toString() === "http://localhost:8081")
+ rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/test%2F", null)
+ assert(rewrittenURI.toString() === "http://localhost:8081/test%2F")
+ rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/%F0%9F%98%84", null)
+ assert(rewrittenURI.toString() === "http://localhost:8081/%F0%9F%98%84")
+ rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-noid/json", null)
+ assert(rewrittenURI === null)
+ }
+
+ test("verify rewriting location header for reverse proxy") {
+ val clientRequest = mock(classOf[HttpServletRequest])
+ var headerValue = "http://localhost:4040/jobs"
+ val prefix = "/proxy/worker-id"
+ val targetUri = URI.create("http://localhost:4040")
+ when(clientRequest.getScheme()).thenReturn("http")
+ when(clientRequest.getHeader("host")).thenReturn("localhost:8080")
+ var newHeader = JettyUtils.createProxyLocationHeader(
+ prefix, headerValue, clientRequest, targetUri)
+ assert(newHeader.toString() === "http://localhost:8080/proxy/worker-id/jobs")
+ headerValue = "http://localhost:4041/jobs"
+ newHeader = JettyUtils.createProxyLocationHeader(
+ prefix, headerValue, clientRequest, targetUri)
+ assert(newHeader === null)
+ }
+
def stopServer(info: ServerInfo): Unit = {
if (info != null && info.server != null) info.server.stop
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 2f801961050e..182b533ccaec 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -634,6 +634,20 @@ Apart from these, the following properties are also available, and may be useful
collecting.
|
+