Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@
<artifactId>jetty-servlet</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-proxy</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These new dependencies need to be added to the copy-dependencies invocation later in this file.

<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
Expand Down Expand Up @@ -389,7 +399,7 @@
<overWriteIfNewer>true</overWriteIfNewer>
<useSubDirectoryPerType>true</useSubDirectoryPerType>
<includeArtifactIds>
guava,jetty-io,jetty-servlet,jetty-servlets,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security
guava,jetty-io,jetty-servlet,jetty-servlets,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security,jetty-proxy,jetty-client
</includeArtifactIds>
<silent>true</silent>
</configuration>
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
}
_ui.foreach(_.setAppId(_applicationId))
_env.blockManager.initialize(_applicationId)

Expand Down
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ private[deploy] class Master(

// Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
if (defaultCores < 1) {
throw new SparkException("spark.deploy.defaultCores must be positive")
}
Expand All @@ -129,6 +130,11 @@ private[deploy] class Master(
webUi = new MasterWebUI(this, webUiPort)
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
if (reverseProxy) {
masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
s"Applications UIs are available at $masterWebUiUrl")
}
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
Expand Down Expand Up @@ -755,6 +761,9 @@ private[deploy] class Master(
workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
if (reverseProxy) {
webUi.addProxyTargets(worker.id, worker.webUiAddress)
}
true
}

Expand All @@ -763,6 +772,9 @@ private[deploy] class Master(
worker.setState(WorkerState.DEAD)
idToWorker -= worker.id
addressToWorker -= worker.endpoint.address
if (reverseProxy) {
webUi.removeProxyTargets(worker.id)
}
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
exec.application.driver.send(ExecutorUpdated(
Expand Down Expand Up @@ -810,6 +822,9 @@ private[deploy] class Master(
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
waitingApps += app
if (reverseProxy) {
webUi.addProxyTargets(app.id, app.desc.appUiUrl)
}
}

private def finishApplication(app: ApplicationInfo) {
Expand All @@ -823,6 +838,9 @@ private[deploy] class Master(
idToApp -= app.id
endpointToApp -= app.driver
addressToApp -= app.driver.address
if (reverseProxy) {
webUi.removeProxyTargets(app.id)
}
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach { a =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
<li><strong>State:</strong> {app.state}</li>
{
if (!app.isFinished) {
<li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
<li><strong>
<a href={UIUtils.makeHref(parent.master.reverseProxy,
app.id, app.desc.appUiUrl)}>Application Detail UI</a>
</strong></li>
}
}
</ul>
Expand All @@ -100,19 +103,21 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
}

private def executorRow(executor: ExecutorDesc): Seq[Node] = {
val workerUrlRef = UIUtils.makeHref(parent.master.reverseProxy,
executor.worker.id, executor.worker.webUiAddress)
<tr>
<td>{executor.id}</td>
<td>
<a href={executor.worker.webUiAddress}>{executor.worker.id}</a>
<a href={workerUrlRef}>{executor.worker.id}</a>
</td>
<td>{executor.cores}</td>
<td>{executor.memory}</td>
<td>{executor.state}</td>
<td>
<a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
.format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
.format(workerUrlRef, executor.application.id, executor.id)}>stdout</a>
<a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
.format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
.format(workerUrlRef, executor.application.id, executor.id)}>stderr</a>
</td>
</tr>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
private def workerRow(worker: WorkerInfo): Seq[Node] = {
<tr>
<td>
<a href={worker.webUiAddress}>{worker.id}</a>
<a href={UIUtils.makeHref(parent.master.reverseProxy,
worker.id, worker.webUiAddress)}>{worker.id}</a>
</td>
<td>{worker.host}:{worker.port}</td>
<td>{worker.state}</td>
Expand Down Expand Up @@ -210,7 +211,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
if (app.isFinished) {
app.desc.name
} else {
<a href={app.desc.appUiUrl}>{app.desc.name}</a>
<a href={UIUtils.makeHref(parent.master.reverseProxy,
app.id, app.desc.appUiUrl)}>{app.desc.name}</a>
}
}
</td>
Expand Down Expand Up @@ -244,7 +246,11 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<tr>
<td>{driver.id} {killLink}</td>
<td>{driver.submitDate}</td>
<td>{driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse("None")}
<td>{driver.worker.map(w =>
<a href=
{UIUtils.makeHref(parent.master.reverseProxy, w.id, w.webUiAddress)}>
{w.id.toString}</a>
).getOrElse("None")}
</td>
<td>{driver.state}</td>
<td sorttable_customkey={driver.desc.cores.toString}>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.spark.deploy.master.ui

import scala.collection.mutable.HashMap

import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
import org.apache.spark.ui.{SparkUI, WebUI}
Expand All @@ -34,6 +38,7 @@ class MasterWebUI(

val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
private val proxyHandlers = new HashMap[String, ServletContextHandler]

initialize()

Expand All @@ -48,6 +53,17 @@ class MasterWebUI(
attachHandler(createRedirectHandler(
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
}

/**
 * Attach a proxy handler to this UI for the context path `/proxy/<id>` and remember it
 * under `id` so that it can later be detached by `removeProxyTargets`.
 *
 * @param id     identifier used both as the last segment of the `/proxy/<id>` path and as
 *               the key under which the handler is tracked
 * @param target base URL handed to the proxy handler; one trailing "/" is stripped first
 */
def addProxyTargets(id: String, target: String): Unit = {
  // If the id is registered again, detach the earlier handler first. Otherwise the map
  // entry would be overwritten and the old handler could never be detached.
  proxyHandlers.remove(id).foreach(detachHandler)

  val endTarget = target.stripSuffix("/")
  val handler = createProxyHandler("/proxy/" + id, endTarget)
  attachHandler(handler)
  proxyHandlers(id) = handler
}

/**
 * Stop tracking the proxy handler stored under `id` and detach it from this UI.
 * Does nothing when no handler is stored for `id`.
 */
def removeProxyTargets(id: String): Unit = {
  proxyHandlers.remove(id) match {
    case Some(registered) => detachHandler(registered)
    case None => // nothing was registered under this id
  }
}
}

private[master] object MasterWebUI {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ private[deploy] class ExecutorRunner(

// Add webUI log urls
val baseUrl =
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
if (conf.getBoolean("spark.ui.reverseProxy", false)) {
s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
} else {
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
}
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ private[deploy] class Worker(
activeMasterWebUiUrl = uiUrl
master = Some(masterRef)
connected = true
if (conf.getBoolean("spark.ui.reverseProxy", false)) {
logInfo(s"WorkerWebUI is available at $activeMasterWebUiUrl/proxy/$workerId")
}
// Cancel any outstanding re-registration attempts because we found a new master
cancelLastRegistrationRetry()
}
Expand Down
85 changes: 85 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.xml.Node

import org.eclipse.jetty.client.api.Response
import org.eclipse.jetty.proxy.ProxyServlet
import org.eclipse.jetty.server.{Request, Server, ServerConnector}
import org.eclipse.jetty.server.handler._
import org.eclipse.jetty.servlet._
Expand Down Expand Up @@ -186,6 +188,47 @@ private[spark] object JettyUtils extends Logging {
contextHandler
}

/** Create a handler for proxying requests to Workers and Application Drivers */
def createProxyHandler(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always a little scary to see string manipulation code with no tests.

Copy link
Author

@gurvindersingh gurvindersingh Jun 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might have overlooked the tests for the other handlers; can you point me to them if there are any? Then I can add a test for this handler too. Alternatively, I can move the string manipulation into a separate function and add a test for that. Let me know which is preferable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to targeted tests of string manipulation code.

Move the string manipulation part into its own method, and write targeted unit tests for it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a test under UISuite, let me know if that is not the right place for it.

prefix: String,
target: String): ServletContextHandler = {
val servlet = new ProxyServlet {
/**
 * Compute the URL this request should be forwarded to, or null when the request must
 * not be proxied: either `createProxyURI` produced no URI or `validateDestination`
 * rejected the resulting host and port.
 */
override def rewriteTarget(request: HttpServletRequest): String = {
  val proxied = createProxyURI(
    prefix, target, request.getRequestURI(), request.getQueryString())
  // The null check must come first: validateDestination is only consulted for a real URI.
  if (proxied != null && validateDestination(proxied.getHost(), proxied.getPort())) {
    proxied.toString()
  } else {
    null
  }
}

override def filterServerResponseHeader(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add empty line between methods

clientRequest: HttpServletRequest,
serverResponse: Response,
headerName: String,
headerValue: String): String = {
if (headerName.equalsIgnoreCase("location")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would also be nice to have targeted unit tests for this code.

val newHeader = createProxyLocationHeader(
prefix, headerValue, clientRequest, serverResponse.getRequest().getURI())
if (newHeader != null) {
return newHeader
}
}
super.filterServerResponseHeader(
clientRequest, serverResponse, headerName, headerValue)
}
}

val contextHandler = new ServletContextHandler
val holder = new ServletHolder(servlet)
contextHandler.setContextPath(prefix)
contextHandler.addServlet(holder, "/")
contextHandler
}

/** Add filters, if any, to the given list of ServletContextHandlers */
def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
Expand Down Expand Up @@ -332,6 +375,48 @@ private[spark] object JettyUtils extends Logging {
redirectHandler
}

/**
 * Build the URI on the proxied server that corresponds to a request received under
 * `prefix`.
 *
 * The part of `path` that follows `prefix` is appended to `target` (inserting a "/" when
 * that remainder does not already start with one), `query` is attached when it is not
 * null, and the result is normalized.
 *
 * @param prefix path prefix the proxy is mounted under
 * @param target base URL of the proxied server
 * @param path   request path, expected to start with `prefix`
 * @param query  raw query string of the request (already percent-encoded), or null
 * @return the rewritten URI, or null when `path` does not start with `prefix`
 */
def createProxyURI(prefix: String, target: String, path: String, query: String): URI = {
  if (!path.startsWith(prefix)) {
    // Callers interpret null as "do not forward this request".
    null
  } else {
    val rest = path.substring(prefix.length())
    val separator = if (rest.isEmpty() || rest.startsWith("/")) "" else "/"
    val base = target + separator + rest

    val rewrittenURI =
      if (query == null) {
        URI.create(base)
      } else {
        try {
          // Parse the query verbatim. The multi-argument URI constructors always quote
          // '%', which would turn an already-encoded "%2F" into "%252F", and going through
          // getPath() would decode escapes in the path as well.
          URI.create(base + "?" + query)
        } catch {
          case _: IllegalArgumentException =>
            // The query contains characters that are illegal in a URI. Fall back to the
            // quoting constructor so such requests are still rewritten rather than failing.
            val parsed = URI.create(base)
            new URI(
              parsed.getScheme(),
              parsed.getAuthority(),
              parsed.getPath(),
              query,
              parsed.getFragment())
        }
      }
    rewrittenURI.normalize()
  }
}

/**
 * Rewrite a `Location` header value returned by the proxied server so that it points back
 * at the proxy.
 *
 * When `headerValue` starts with the scheme and authority of `targetUri`, that leading part
 * is replaced by the client request's scheme, its `host` header and `prefix`. Otherwise
 * null is returned.
 *
 * @param prefix        path prefix the proxy is mounted under
 * @param headerValue   original value of the header
 * @param clientRequest request received by the proxy
 * @param targetUri     URI the proxied request was sent to
 * @return the rewritten header value, or null when `headerValue` does not start with the
 *         target's `scheme://authority`
 */
def createProxyLocationHeader(
    prefix: String,
    headerValue: String,
    clientRequest: HttpServletRequest,
    targetUri: URI): String = {
  val targetOrigin = targetUri.getScheme() + "://" + targetUri.getAuthority()
  if (!headerValue.startsWith(targetOrigin)) {
    null
  } else {
    val clientScheme = clientRequest.getScheme()
    val clientHost = clientRequest.getHeader("host")
    val remainder = headerValue.substring(targetOrigin.length())
    s"$clientScheme://$clientHost$prefix$remainder"
  }
}

// Create a new URI from the arguments, handling IPv6 host encoding and default ports.
private def createRedirectURI(
scheme: String, server: String, port: Int, path: String, query: String) = {
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -510,4 +510,16 @@ private[spark] object UIUtils extends Logging {

def getTimeZoneOffset() : Int =
TimeZone.getDefault().getOffset(System.currentTimeMillis()) / 1000 / 60

/**
* Return the href to use for a link: the proxy-relative path `/proxy/<id>` when the
* master is running in reverse proxy mode, otherwise the original href unchanged.
*/
def makeHref(proxy: Boolean, id: String, origHref: String): String = {
if (proxy) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to previous comment.

s"/proxy/$id"
} else {
origHref
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,33 @@ class MasterSuite extends SparkFunSuite
}
}

// Starts a two-worker local cluster with the reverse proxy enabled and checks, through the
// master's /proxy/<workerId> route, that every worker's JSON endpoint is reachable and
// reports the configured reverse proxy URL as its master web UI URL.
test("master/worker web ui available with reverseProxy") {
  implicit val formats = org.json4s.DefaultFormats
  val reverseProxyUrl = "http://localhost:8080"
  val conf = new SparkConf()
  conf.set("spark.ui.reverseProxy", "true")
  conf.set("spark.ui.reverseProxyUrl", reverseProxyUrl)
  val localCluster = new LocalSparkCluster(2, 2, 512, conf)

  // Read a whole response body and always close the Source, so the retries performed by
  // `eventually` do not leave a stream open per attempt.
  def fetch(url: String): String = {
    val source = Source.fromURL(url)
    try {
      source.getLines().mkString("\n")
    } finally {
      source.close()
    }
  }

  localCluster.start()
  try {
    eventually(timeout(5 seconds), interval(100 milliseconds)) {
      val json = fetch(s"http://localhost:${localCluster.masterWebUIPort}/json")
      val JArray(workers) = (parse(json) \ "workers")
      workers.size should be (2)
      workers.foreach { workerSummaryJson =>
        val JString(workerId) = workerSummaryJson \ "id"
        val url = s"http://localhost:${localCluster.masterWebUIPort}/proxy/${workerId}/json"
        val workerResponse = parse(fetch(url))
        (workerResponse \ "cores").extract[Int] should be (2)
        (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl)
      }
    }
  } finally {
    localCluster.stop()
  }
}

test("basic scheduling - spread out") {
basicScheduling(spreadOut = true)
}
Expand Down
Loading