docs/running-on-yarn.md (15 additions, 0 deletions)
@@ -604,3 +604,18 @@ spark.yarn.am.extraJavaOptions -Dsun.security.krb5.debug=true -Dsun.security.spn

Finally, if the log level for `org.apache.spark.deploy.yarn.Client` is set to `DEBUG`, the log
will include a list of all tokens obtained and their expiry details.
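
For example, with the default log4j-based logging this can be enabled by adding one line to
`conf/log4j.properties` (the standard template location; adjust for your deployment):

```
log4j.logger.org.apache.spark.deploy.yarn.Client=DEBUG
```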

## Using the Spark History Server to replace the Spark Web UI

It is possible to use the Spark History Server application page as the tracking URL for running
applications when the application UI is disabled. This may be desirable on secure clusters, or to
reduce the memory usage of the Spark driver. To set up tracking through the Spark History Server,
do the following:

- On the application side, set <code>spark.yarn.historyServer.allowTracking=true</code> in Spark's
configuration. This will tell Spark to use the history server's URL as the tracking URL if
the application's UI is disabled.
- On the Spark History Server, add <code>org.apache.spark.deploy.yarn.YarnProxyRedirectFilter</code>
to the list of filters in the <code>spark.ui.filters</code> configuration.

Be aware that the history server information may not be up-to-date with the application's state.
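
For example, a minimal setup might look like the following (`spark.ui.enabled` is the standard
option for disabling the application UI; file locations assume the usual `spark-defaults.conf`
mechanism on each side):

```
# Application side: disable the app UI and allow SHS-based tracking.
spark.ui.enabled                        false
spark.yarn.historyServer.allowTracking  true

# History server side: install the redirect filter.
spark.ui.filters  org.apache.spark.deploy.yarn.YarnProxyRedirectFilter
```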
ApplicationMaster.scala
@@ -332,7 +332,7 @@ private[spark] class ApplicationMaster(
       _sparkConf: SparkConf,
       _rpcEnv: RpcEnv,
       driverRef: RpcEndpointRef,
-      uiAddress: String,
+      uiAddress: Option[String],
       securityMgr: SecurityManager) = {
     val appId = client.getAttemptId().getApplicationId().toString()
     val attemptId = client.getAttemptId().getAttemptId().toString()
@@ -408,8 +408,7 @@ private[spark] class ApplicationMaster(
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"),
isClusterMode = true)
registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl).getOrElse(""),
securityMgr)
registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
} else {
// Sanity check; should never happen in normal operation, since sc should only be null
// if the user app did not create a SparkContext.
@@ -435,7 +434,7 @@
       clientMode = true)
     val driverRef = waitForSparkDriver()
     addAmIpFilter()
-    registerAM(sparkConf, rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""),
+    registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),
       securityMgr)

     // In client mode the actor will stop the reporter thread.
YarnProxyRedirectFilter.scala (new file)
@@ -0,0 +1,81 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.yarn

import javax.servlet._
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

import org.apache.spark.internal.Logging

/**
 * A filter to be used in the Spark History Server for redirecting YARN proxy requests to the
 * main SHS address. This is useful for applications that are using the history server as the
 * tracking URL, since the SHS-generated pages cannot be rendered in that case without extra
 * configuration to set up a proxy base URI (meaning the SHS can never be used directly).
 */
class YarnProxyRedirectFilter extends Filter with Logging {

  import YarnProxyRedirectFilter._

  override def destroy(): Unit = { }

  override def init(config: FilterConfig): Unit = { }

  override def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain): Unit = {
    val hreq = req.asInstanceOf[HttpServletRequest]

    // The YARN proxy will send a request with the "proxy-user" cookie set to the YARN client's
    // user name. We don't expect any other clients to set this cookie, since the SHS does not
    // use cookies for anything.
    Option(hreq.getCookies()).flatMap(_.find(_.getName() == COOKIE_NAME)) match {
      case Some(_) =>
        doRedirect(hreq, res.asInstanceOf[HttpServletResponse])

      case _ =>
        chain.doFilter(req, res)
    }
  }

  private def doRedirect(req: HttpServletRequest, res: HttpServletResponse): Unit = {
    val redirect = req.getRequestURL().toString()

    // Need a client-side redirect instead of an HTTP one, otherwise the YARN proxy itself
    // will handle the redirect and get into an infinite loop.
    val content = s"""
      |<html xmlns="http://www.w3.org/1999/xhtml">
      |<head>
      |  <title>Spark History Server Redirect</title>
      |  <meta http-equiv="refresh" content="0;URL='$redirect'" />
      |</head>
      |<body>
      |  <p>The requested page can be found at: <a href="$redirect">$redirect</a>.</p>
      |</body>
      |</html>
      """.stripMargin

    logDebug(s"Redirecting YARN proxy request to $redirect.")
    res.setStatus(HttpServletResponse.SC_OK)
    res.setContentType("text/html")
    res.getWriter().write(content)
  }

}

private[spark] object YarnProxyRedirectFilter {
  val COOKIE_NAME = "proxy-user"
}
YarnRMClient.scala
@@ -55,7 +55,7 @@ private[spark] class YarnRMClient extends Logging {
       driverRef: RpcEndpointRef,
       conf: YarnConfiguration,
       sparkConf: SparkConf,
-      uiAddress: String,
+      uiAddress: Option[String],
       uiHistoryAddress: String,
       securityMgr: SecurityManager,
       localResources: Map[String, LocalResource]
@@ -65,9 +65,13 @@
     amClient.start()
     this.uiHistoryAddress = uiHistoryAddress

+    val trackingUrl = uiAddress.getOrElse {
+      if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""
+    }
+
     logInfo("Registering the ApplicationMaster")
     synchronized {
-      amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+      amClient.registerApplicationMaster(Utils.localHostName(), 0, trackingUrl)
       registered = true
     }
     new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
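
The tracking URL chosen above follows a simple precedence: the live UI address if one exists,
otherwise the history server address (only when tracking through it is allowed), otherwise an
empty string. A standalone sketch of that decision, with illustrative names only (not Spark API):

```scala
// Mirrors the trackingUrl precedence above; names are illustrative.
def chooseTrackingUrl(
    uiAddress: Option[String],
    allowHistoryTracking: Boolean,
    uiHistoryAddress: String): String = {
  // Prefer the live UI; fall back to the SHS only when explicitly allowed.
  uiAddress.getOrElse(if (allowHistoryTracking) uiHistoryAddress else "")
}

assert(chooseTrackingUrl(Some("http://driver:4040"), true, "http://shs:18080") == "http://driver:4040")
assert(chooseTrackingUrl(None, true, "http://shs:18080") == "http://shs:18080")
assert(chooseTrackingUrl(None, false, "http://shs:18080") == "")
```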
config.scala
@@ -82,6 +82,13 @@ package object config {
     .stringConf
     .createOptional

+  private[spark] val ALLOW_HISTORY_SERVER_TRACKING_URL =
+    ConfigBuilder("spark.yarn.historyServer.allowTracking")
+      .doc("Allow using the History Server URL for the application as the tracking URL for the " +
+        "application when the Web UI is not enabled.")
+      .booleanConf
+      .createWithDefault(false)
+
   /* File distribution. */

   private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive")
YarnProxyRedirectFilterSuite.scala (new file)
@@ -0,0 +1,55 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.yarn

import java.io.{PrintWriter, StringWriter}
import javax.servlet.FilterChain
import javax.servlet.http.{Cookie, HttpServletRequest, HttpServletResponse}

import org.mockito.Mockito._

import org.apache.spark.SparkFunSuite

class YarnProxyRedirectFilterSuite extends SparkFunSuite {

  test("redirect proxied requests, pass-through others") {
    val requestURL = "http://example.com:1234/foo?"
    val filter = new YarnProxyRedirectFilter()
    val cookies = Array(new Cookie(YarnProxyRedirectFilter.COOKIE_NAME, "dr.who"))

    val req = mock(classOf[HttpServletRequest])

    // The first request mocks a YARN proxy request (with the cookie set); the second one has
    // no cookies.
    when(req.getCookies()).thenReturn(cookies, null)
    when(req.getRequestURL()).thenReturn(new StringBuffer(requestURL))

    val res = mock(classOf[HttpServletResponse])
    when(res.getWriter()).thenReturn(new PrintWriter(new StringWriter()))

    val chain = mock(classOf[FilterChain])

    // The first request is proxied, so it should be redirected instead of hitting the chain.
    filter.doFilter(req, res, chain)
    verify(chain, never()).doFilter(req, res)

    // The second request is not proxied, so it should invoke the filter chain.
    filter.doFilter(req, res, chain)
    verify(chain, times(1)).doFilter(req, res)
  }

}