Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ private[spark] class SparkUI private (

/** Initialize all components of the server. */
def initialize() {
attachTab(new JobsTab(this))
val jobsTab = new JobsTab(this)
attachTab(jobsTab)
attachHandler(
createRedirectHandler("/jobs/job/kill", "/jobs", jobsTab.handleKillRequest))
val stagesTab = new StagesTab(this)
attachTab(stagesTab)
attachTab(new StorageTab(this))
Expand Down
26 changes: 22 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
private val startTime: Option[Long] = parent.sc.map(_.startTime)
private val listener = parent.listener

private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
private def jobsTable(jobs: Seq[JobUIData], killEnabled: Boolean): Seq[Node] = {
val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)

val columns: Seq[Node] = {
Expand All @@ -42,6 +42,20 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
}

def makeRow(job: JobUIData): Seq[Node] = {
// scalastyle:off
val killLink = if (killEnabled) {
val killLinkUri = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job/kill?" +
s"id=${job.jobId}&terminate=true"
val confirm = "return window.confirm(" +
s"'Are you sure you want to kill job ${job.jobId} ?');"
<span class="kill-link">
(<a href={killLinkUri} onclick={confirm}>kill</a>)
</span>
} else {
Seq.empty
}
// scalastyle:on
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need to disable scalastyle here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, I'm not sure what the type of killLink is. It seems to me to be Any because there's no else case. Can you make it explicit by adding an else case that returns Seq.empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, if killEnabled = false, the type of killLink is Any.
i will add an else case that returns Seq.empty.thanks.


val lastStageInfo = Option(job.stageIds)
.filter(_.nonEmpty)
.flatMap { ids => listener.stageIdToInfo.get(ids.max) }
Expand All @@ -68,6 +82,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
<td>
<span class="description-input" title={lastStageDescription}>{lastStageDescription}</span>
<a href={detailUrl}>{lastStageName}</a>
{killLink}
</td>
<td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
{formattedSubmissionTime}
Expand Down Expand Up @@ -102,11 +117,14 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val now = System.currentTimeMillis

val activeJobsTable =
jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse)
jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse,
killEnabled = parent.killEnabled)
val completedJobsTable =
jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse,
killEnabled = false)
val failedJobsTable =
jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse,
killEnabled = false)

val shouldShowActiveJobs = activeJobs.nonEmpty
val shouldShowCompletedJobs = completedJobs.nonEmpty
Expand Down
19 changes: 19 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.ui.jobs

import javax.servlet.http.HttpServletRequest

import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.ui.{SparkUI, SparkUITab}

Expand All @@ -29,4 +31,21 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {

attachPage(new AllJobsPage(this))
attachPage(new JobPage(this))

def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) {
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
val jobIdOpt = Option(request.getParameter("id"))
if (jobIdOpt.isDefined && killFlag) {
val jobId = jobIdOpt.get.toInt
if (listener.activeJobs.contains(jobId)) {
sc.get.cancelJob(jobId)
}
}
// Do a quick pause here to give Spark time to kill the job so it shows up as
// killed after the refresh. Note that this will block the serving thread so the
// time should be limited in duration.
Thread.sleep(100)
}
}
}
13 changes: 8 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"

def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)

def handleKillRequest(request: HttpServletRequest) = {
if ((killEnabled) && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) {
def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) {
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
sc.get.cancelStage(stageId)
val stageIdOpt = Option(request.getParameter("id"))
if (stageIdOpt.isDefined && killFlag) {
val stageId = stageIdOpt.get.toInt
if (listener.activeStages.contains(stageId)) {
sc.get.cancelStage(stageId)
}
}
// Do a quick pause here to give Spark time to kill the stage so it shows up as
// killed after the refresh. Note that this will block the serving thread so the
Expand Down