11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -58,22 +58,23 @@ private[spark] class SparkUI private (

val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)


val stagesTab = new StagesTab(this)

var appId: String = _

/** Initialize all components of the server. */
def initialize() {
attachTab(new JobsTab(this))
val jobsTab = new JobsTab(this)
attachTab(jobsTab)
Member:
See #15603 which reminds me that these two tabs don't need to be members. They can be locals. You can make the change for both here.

val stagesTab = new StagesTab(this)
attachTab(stagesTab)
attachTab(new StorageTab(this))
attachTab(new EnvironmentTab(this))
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
attachHandler(ApiRootResource.getServletHandler(this))
// This should be POST only, but, the YARN AM proxy won't proxy POSTs
// These should be POST only, but, the YARN AM proxy won't proxy POSTs
attachHandler(createRedirectHandler(
"/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
attachHandler(createRedirectHandler(
"/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
httpMethods = Set("GET", "POST")))
34 changes: 29 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -218,7 +218,8 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
request: HttpServletRequest,
tableHeaderId: String,
jobTag: String,
jobs: Seq[JobUIData]): Seq[Node] = {
jobs: Seq[JobUIData],
killEnabled: Boolean): Seq[Node] = {
val allParameters = request.getParameterMap.asScala.toMap
val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
.map(para => para._1 + "=" + para._2(0))
@@ -264,6 +265,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
parameterOtherTable,
parent.jobProgresslistener.stageIdToInfo,
parent.jobProgresslistener.stageIdToData,
killEnabled,
currentTime,
jobIdTitle,
pageSize = jobPageSize,
@@ -290,9 +292,12 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val completedJobs = listener.completedJobs.reverse.toSeq
val failedJobs = listener.failedJobs.reverse.toSeq

val activeJobsTable = jobsTable(request, "active", "activeJob", activeJobs)
val completedJobsTable = jobsTable(request, "completed", "completedJob", completedJobs)
val failedJobsTable = jobsTable(request, "failed", "failedJob", failedJobs)
val activeJobsTable =
jobsTable(request, "active", "activeJob", activeJobs, killEnabled = parent.killEnabled)
val completedJobsTable =
jobsTable(request, "completed", "completedJob", completedJobs, killEnabled = false)
val failedJobsTable =
jobsTable(request, "failed", "failedJob", failedJobs, killEnabled = false)

val shouldShowActiveJobs = activeJobs.nonEmpty
val shouldShowCompletedJobs = completedJobs.nonEmpty
@@ -483,6 +488,7 @@ private[ui] class JobPagedTable(
parameterOtherTable: Iterable[String],
stageIdToInfo: HashMap[Int, StageInfo],
stageIdToData: HashMap[(Int, Int), StageUIData],
killEnabled: Boolean,
currentTime: Long,
jobIdTitle: String,
pageSize: Int,
@@ -586,12 +592,30 @@
override def row(jobTableRow: JobTableRowData): Seq[Node] = {
val job = jobTableRow.jobData

val killLink = if (killEnabled) {
Member:
Hm, does this work? There's no 'else', so this becomes an Any, and I don't know what happens below when it's rendered as a string. Should it be an empty string otherwise?

Member Author:
Nice catch. Since it's supposed to be HTML, I'll add an else Seq.empty to each instance of this.
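
For context, here is a small standalone sketch (not part of the PR; names are made up and it assumes scala-xml on the classpath) of why the missing else matters when the result is spliced into an XML literal:

import scala.xml.Node

object KillLinkRenderingSketch extends App {
  val killEnabled = false

  // Without an else branch the if-expression is completed with Unit, so its static
  // type widens to Any and its value is () whenever killEnabled is false.
  val withoutElse: Any = if (killEnabled) <a class="kill-link">(kill)</a>

  // With else Seq.empty the expression stays a Seq[Node] and renders as nothing.
  val withElse: Seq[Node] = if (killEnabled) <a class="kill-link">(kill)</a> else Seq.empty

  println(<td>{withoutElse}</td>) // <td>()</td> -- the stray "()" the reviewer is worried about
  println(<td>{withElse}</td>)    // <td></td>
}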

val confirm =
s"if (window.confirm('Are you sure you want to kill job ${job.jobId} ?')) " +
"{ this.parentNode.submit(); return true; } else { return false; }"
// SPARK-6846 this should be POST-only but YARN AM won't proxy POST
/*
val killLinkUri = s"$basePathUri/jobs/job/kill/"
<form action={killLinkUri} method="POST" style="display:inline">
<input type="hidden" name="id" value={job.jobId.toString}/>
<a href="#" onclick={confirm} class="kill-link">(kill)</a>
</form>
*/
val killLinkUri = s"$basePath/jobs/job/kill/?id=${job.jobId}"
<a href={killLinkUri} onclick={confirm} class="kill-link">(kill)</a>
} else {
Seq.empty
}

<tr id={"job-" + job.jobId}>
<td>
{job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
</td>
<td>
{jobTableRow.jobDescription}
{jobTableRow.jobDescription} {killLink}
<a href={jobTableRow.detailUrl} class="name-link">{jobTableRow.lastStageName}</a>
</td>
<td>
17 changes: 17 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -17,6 +17,8 @@

package org.apache.spark.ui.jobs

import javax.servlet.http.HttpServletRequest

import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.ui.{SparkUI, SparkUITab}

@@ -35,4 +37,19 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {

attachPage(new AllJobsPage(this))
attachPage(new JobPage(this))

def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
val jobId = Option(request.getParameter("id")).map(_.toInt)
jobId.foreach { id =>
if (jobProgresslistener.activeJobs.contains(id)) {
sc.foreach(_.cancelJob(id))
// Do a quick pause here to give Spark time to kill the job so it shows up as
// killed after the refresh. Note that this will block the serving thread so the
// time should be limited in duration.
Thread.sleep(100)
Contributor (@markhamstra, Oct 12, 2016):
You're doing the sleep here even if sc is None, but I wouldn't expect that that is even possible or that it is handled completely correctly elsewhere in the Web UI if it actually is possible, so this is likely fine in practice.

Member Author:
That was my thinking: in the case of sc being None, this code shouldn't be reachable at all. Plus, by moving the sleep into the if statement, it at least only runs when an actual kill is attempted.

Member:
Hm, in both this and the kill-stage endpoint I wonder why we have a 'terminate' flag at all. What would it mean to call this with terminate=false? It seems like this can be checked once, if at all, at the start of the method. Or just removed, if I'm not missing something.

Member Author:
I just did some looking into the git history, and that's left over from when there wasn't a separate /kill endpoint, so I'll remove that logic in my next commit.

}
}
Contributor:
Creating an Option only to immediately get the value out of it is poor style, and unnecessary.

val jobId = Option(request.getParameter("id"))
jobId.foreach { id =>
  if (killFlag && jobProgresslistener.activeJobs.contains(id)) {
    sc.get.cancelJob(id)
  }
}

Contributor:
Similarly, there is no need for sc.get. In fact, that's a bug if parent.sc really should be an Option and thus could be None.

sc.foreach(_.cancelJob(id))

Contributor (@markhamstra, Oct 12, 2016):
And if the job isn't actually going to be canceled, then there is no need to delay the page refresh (which I'm not entirely happy with, but I'm not going to try to resolve that issue right now). So...

sc.foreach { sparkContext =>
  sparkContext.cancelJob(id)
  Thread.sleep(100)
}

And we better make that jobId an Option[Int] instead of an Option[String], so...

val jobId = Option(request.getParameter("id")).map(_.toInt)

Member Author:
I copied this code from StagesTab.scala so I'll fix these issues for both in the morning
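
Putting those suggestions together, a small standalone sketch of the parsing pattern (illustrative, made-up names only, not the committed handler; the real code also checks killEnabled, modify permissions, and the active-job set, as in the diff above):

object KillIdParsingSketch extends App {
  // Option-based parsing replaces the old getOrElse("-1").toInt sentinel dance.
  def parseId(param: String): Option[Int] = Option(param).map(_.toInt)

  println(parseId(null)) // None: foreach over it does nothing, so no kill is attempted
  println(parseId("7"))  // Some(7)

  // sc is an Option[SparkContext] in the UI; foreach-ing it avoids the sc.get bug,
  // and the brief sleep only happens when a cancel was actually issued.
  val sc: Option[String] = Some("fake-context") // hypothetical stand-in for Option[SparkContext]
  parseId("7").foreach { id =>
    sc.foreach { context =>
      println(s"cancelJob($id) on $context") // placeholder for sparkContext.cancelJob(id)
      Thread.sleep(100)
    }
  }
}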

}
}
}
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -353,12 +353,13 @@ private[ui] class StagePagedTable(
val killLinkUri = s"$basePathUri/stages/stage/kill/"
<form action={killLinkUri} method="POST" style="display:inline">
<input type="hidden" name="id" value={s.stageId.toString}/>
<input type="hidden" name="terminate" value="true"/>
<a href="#" onclick={confirm} class="kill-link">(kill)</a>
</form>
*/
val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}&terminate=true"
val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}"
<a href={killLinkUri} onclick={confirm} class="kill-link">(kill)</a>
} else {
Seq.empty
}

val nameLinkUri = s"$basePathUri/stages/stage?id=${s.stageId}&attempt=${s.attemptId}"
17 changes: 9 additions & 8 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -39,15 +39,16 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"

def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
if (stageId >= 0 && killFlag && progressListener.activeStages.contains(stageId)) {
sc.get.cancelStage(stageId)
val stageId = Option(request.getParameter("id")).map(_.toInt)
stageId.foreach { id =>
if (progressListener.activeStages.contains(id)) {
sc.foreach(_.cancelStage(id))
// Do a quick pause here to give Spark time to kill the stage so it shows up as
// killed after the refresh. Note that this will block the serving thread so the
// time should be limited in duration.
Thread.sleep(100)
}
}
// Do a quick pause here to give Spark time to kill the stage so it shows up as
// killed after the refresh. Note that this will block the serving thread so the
// time should be limited in duration.
Thread.sleep(100)
}
}

47 changes: 39 additions & 8 deletions core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -194,6 +194,22 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
}

withSpark(newSparkContext(killEnabled = true)) { sc =>
runSlowJob(sc)
eventually(timeout(5 seconds), interval(50 milliseconds)) {
goToUi(sc, "/jobs")
assert(hasKillLink)
}
}

withSpark(newSparkContext(killEnabled = false)) { sc =>
runSlowJob(sc)
eventually(timeout(5 seconds), interval(50 milliseconds)) {
goToUi(sc, "/jobs")
assert(!hasKillLink)
}
}

withSpark(newSparkContext(killEnabled = true)) { sc =>
runSlowJob(sc)
eventually(timeout(5 seconds), interval(50 milliseconds)) {
@@ -453,20 +469,24 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}

test("kill stage POST/GET response is correct") {
def getResponseCode(url: URL, method: String): Int = {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestMethod(method)
connection.connect()
val code = connection.getResponseCode()
connection.disconnect()
code
withSpark(newSparkContext(killEnabled = true)) { sc =>
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
val url = new URL(
sc.ui.get.appUIAddress.stripSuffix("/") + "/stages/stage/kill/?id=0")
// SPARK-6846: should be POST only but YARN AM doesn't proxy POST
getResponseCode(url, "GET") should be (200)
getResponseCode(url, "POST") should be (200)
}
}
}

test("kill job POST/GET response is correct") {
withSpark(newSparkContext(killEnabled = true)) { sc =>
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
val url = new URL(
sc.ui.get.appUIAddress.stripSuffix("/") + "/stages/stage/kill/?id=0&terminate=true")
sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/kill/?id=0")
// SPARK-6846: should be POST only but YARN AM doesn't proxy POST
getResponseCode(url, "GET") should be (200)
getResponseCode(url, "POST") should be (200)
@@ -651,6 +671,17 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
}

def getResponseCode(url: URL, method: String): Int = {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestMethod(method)
try {
connection.connect()
connection.getResponseCode()
} finally {
connection.disconnect()
}
}

def goToUi(sc: SparkContext, path: String): Unit = {
goToUi(sc.ui.get, path)
}
2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -624,7 +624,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.ui.killEnabled</code></td>
<td>true</td>
<td>
Allows stages and corresponding jobs to be killed from the web ui.
Allows jobs and stages to be killed from the web UI.
</td>
</tr>
<tr>