diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 0c24ad2760e08..13abebf305a97 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -47,7 +47,10 @@ private[spark] class SparkUI private ( /** Initialize all components of the server. */ def initialize() { - attachTab(new JobsTab(this)) + val jobsTab = new JobsTab(this) + attachTab(jobsTab) + attachHandler( + createRedirectHandler("/jobs/job/kill", "/jobs", jobsTab.handleKillRequest)) val stagesTab = new StagesTab(this) attachTab(stagesTab) attachTab(new StorageTab(this)) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index bd923d78a86ce..fcf282b03a8c3 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -29,7 +29,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { private val startTime: Option[Long] = parent.sc.map(_.startTime) private val listener = parent.listener - private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = { + private def jobsTable(jobs: Seq[JobUIData], killEnabled: Boolean): Seq[Node] = { val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined) val columns: Seq[Node] = { @@ -42,6 +42,20 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { } def makeRow(job: JobUIData): Seq[Node] = { + // scalastyle:off + val killLink = if (killEnabled) { + val killLinkUri = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job/kill?" + + s"id=${job.jobId}&terminate=true" + val confirm = "return window.confirm(" + + s"'Are you sure you want to kill job ${job.jobId} ?');" + + (kill) + + } else { + Seq.empty + } + // scalastyle:on + val lastStageInfo = Option(job.stageIds) .filter(_.nonEmpty) .flatMap { ids => listener.stageIdToInfo.get(ids.max) } @@ -68,6 +82,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { {lastStageDescription} {lastStageName} + {killLink} {formattedSubmissionTime} @@ -102,11 +117,14 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { val now = System.currentTimeMillis val activeJobsTable = - jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse) + jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse, + killEnabled = parent.killEnabled) val completedJobsTable = - jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse) + jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse, + killEnabled = false) val failedJobsTable = - jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse) + jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse, + killEnabled = false) val shouldShowActiveJobs = activeJobs.nonEmpty val shouldShowCompletedJobs = completedJobs.nonEmpty diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala index b2bbfdee56946..f7549036390a7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -17,6 +17,8 @@ package org.apache.spark.ui.jobs +import javax.servlet.http.HttpServletRequest + import org.apache.spark.scheduler.SchedulingMode import org.apache.spark.ui.{SparkUI, SparkUITab} @@ -29,4 +31,21 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { attachPage(new AllJobsPage(this)) attachPage(new JobPage(this)) + + def handleKillRequest(request: HttpServletRequest): Unit = { + if (killEnabled && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) { + val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean + val jobIdOpt = Option(request.getParameter("id")) + if (jobIdOpt.isDefined && killFlag) { + val jobId = jobIdOpt.get.toInt + if (listener.activeJobs.contains(jobId)) { + sc.get.cancelJob(jobId) + } + } + // Do a quick pause here to give Spark time to kill the job so it shows up as + // killed after the refresh. Note that this will block the serving thread so the + // time should be limited in duration. + Thread.sleep(100) + } + } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index 937261de00e3a..a960f386f1424 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -34,12 +34,15 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages" def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) - def handleKillRequest(request: HttpServletRequest) = { - if ((killEnabled) && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) { + def handleKillRequest(request: HttpServletRequest): Unit = { + if (killEnabled && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) { val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean - val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt - if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) { - sc.get.cancelStage(stageId) + val stageIdOpt = Option(request.getParameter("id")) + if (stageIdOpt.isDefined && killFlag) { + val stageId = stageIdOpt.get.toInt + if (listener.activeStages.contains(stageId)) { + sc.get.cancelStage(stageId) + } } // Do a quick pause here to give Spark time to kill the stage so it shows up as // killed after the refresh. Note that this will block the serving thread so the