Skip to content
Closed
62 changes: 60 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.spark.ui

import java.text.SimpleDateFormat
import java.util.{Locale, Date}
import java.util.{Date, Locale}

import scala.xml.{Node, Text, Unparsed}
import scala.util.control.NonFatal
import scala.xml._
import scala.xml.transform.{RewriteRule, RuleTransformer}

import org.apache.spark.Logging
import org.apache.spark.ui.scope.RDDOperationGraph
Expand Down Expand Up @@ -395,4 +397,60 @@ private[spark] object UIUtils extends Logging {
</script>
}

/**
* Returns HTML rendering of a job or stage description. It will try to parse the string as HTML
* and make sure that it only contains anchors with root-relative links. Otherwise,
* the whole string will rendered as a simple escaped text.
*
* Note: In terms of security, only anchor tags with root relative links are supported. So any
* attempts to embed links outside Spark UI, or other tags like <script> will cause in the whole
* description to be treated as plain text.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a paragraph on the security implications here? (What happens if I put in a <script> tag or something, or if I reference a malicious link)

def makeDescription(desc: String, basePathUri: String): NodeSeq = {
import scala.language.postfixOps

// If the description can be parsed as HTML and has only relative links, then render
// as HTML, otherwise render as escaped string
try {
// Try to load the description as unescaped HTML
val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")

// Verify that this has only anchors and span (we are wrapping in span)
val allowedNodeLabels = Set("a", "span")
val illegalNodes = xml \\ "_" filterNot { case node: Node =>
allowedNodeLabels.contains(node.label)
}
if (illegalNodes.nonEmpty) {
throw new IllegalArgumentException(
"Only HTML anchors allowed in job descriptions\n" +
illegalNodes.map { n => s"${n.label} in $n"}.mkString("\n\t"))
}

// Verify that all links are relative links starting with "/"
val allLinks =
xml \\ "a" flatMap { _.attributes } filter { _.key == "href" } map { _.value.toString }
if (allLinks.exists { ! _.startsWith ("/") }) {
throw new IllegalArgumentException(
"Links in job descriptions must be root-relative:\n" + allLinks.mkString("\n\t"))
}

// Prepend the relative links with basePathUri
val rule = new RewriteRule() {
override def transform(n: Node): Seq[Node] = {
n match {
case e: Elem if e \ "@href" nonEmpty =>
val relativePath = e.attribute("href").get.toString
val fullUri = s"${basePathUri.stripSuffix("/")}/${relativePath.stripPrefix("/")}"
e % Attribute(null, "href", fullUri, Null)
case _ => n
}
}
}
new RuleTransformer(rule).transform(xml)
} catch {
case NonFatal(e) =>
logWarning(s"Invalid job description: $desc ", e)
<span class="description-input">{desc}</span>
}
}
}
14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

package org.apache.spark.ui.jobs

import scala.collection.mutable.{HashMap, ListBuffer}
import scala.xml.{Node, NodeSeq, Unparsed, Utility}

import java.util.Date
import javax.servlet.http.HttpServletRequest

import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData}
import scala.collection.mutable.{HashMap, ListBuffer}
import scala.xml._

import org.apache.spark.JobExecutionStatus
import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData}
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}

/** Page showing list of all ongoing and recently finished jobs */
private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
Expand Down Expand Up @@ -224,14 +224,16 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
}
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
val formattedSubmissionTime = job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
val jobDescription = UIUtils.makeDescription(lastStageDescription, parent.basePath)

val detailUrl =
"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
<tr id={"job-" + job.jobId}>
<td sorttable_customkey={job.jobId.toString}>
{job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
</td>
<td>
<span class="description-input" title={lastStageDescription}>{lastStageDescription}</span>
{jobDescription}
<a href={detailUrl} class="name-link">{lastStageName}</a>
</td>
<td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@

package org.apache.spark.ui.jobs

import scala.xml.Node
import scala.xml.Text

import java.util.Date

import scala.xml.{Node, Text}

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.scheduler.StageInfo
Expand Down Expand Up @@ -116,7 +115,7 @@ private[ui] class StageTableBase(
stageData <- listener.stageIdToData.get((s.stageId, s.attemptId))
desc <- stageData.description
} yield {
<span class="description-input" title={desc}>{desc}</span>
UIUtils.makeDescription(desc, basePathUri)
}
<div>{stageDesc.getOrElse("")} {killLink} {nameLink} {details}</div>
}
Expand Down
66 changes: 66 additions & 0 deletions core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ui

import scala.xml.Elem

import org.apache.spark.SparkFunSuite

class UIUtilsSuite extends SparkFunSuite {
import UIUtils._

test("makeDescription") {
verify(
"""test <a href="/link"> text </a>""",
<span class="description-input">test <a href="/link"> text </a></span>,
"Correctly formatted text with only anchors and relative links should generate HTML"
)

verify(
"""test <a href="/link" text </a>""",
<span class="description-input">{"""test <a href="/link" text </a>"""}</span>,
"Badly formatted text should make the description be treated as a streaming instead of HTML"
)

verify(
"""test <a href="link"> text </a>""",
<span class="description-input">{"""test <a href="link"> text </a>"""}</span>,
"Non-relative links should make the description be treated as a string instead of HTML"
)

verify(
"""test<a><img></img></a>""",
<span class="description-input">{"""test<a><img></img></a>"""}</span>,
"Non-anchor elements should make the description be treated as a string instead of HTML"
)

verify(
"""test <a href="/link"> text </a>""",
<span class="description-input">test <a href="base/link"> text </a></span>,
baseUrl = "base",
errorMsg = "Base URL should be prepended to html links"
)
}

private def verify(
desc: String, expected: Elem, errorMsg: String = "", baseUrl: String = ""): Unit = {
val generated = makeDescription(desc, baseUrl)
assert(generated.sameElements(expected),
s"\n$errorMsg\n\nExpected:\n$expected\nGenerated:\n$generated")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ class StreamingContext private[streaming] (

private val startSite = new AtomicReference[CallSite](null)

private[streaming] def getStartSite(): CallSite = startSite.get()

private var shutdownHookRef: AnyRef = _

conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint)
Expand Down Expand Up @@ -744,7 +746,7 @@ object StreamingContext extends Logging {
throw new IllegalStateException(
"Only one StreamingContext may be started in this JVM. " +
"Currently running StreamingContext was started at" +
activeContext.get.startSite.get.longForm)
activeContext.get.getStartSite().longForm)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.util.{Failure, Success}
import org.apache.spark.Logging
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.streaming._
import org.apache.spark.streaming.ui.UIUtils
import org.apache.spark.util.{EventLoop, ThreadUtils}


Expand Down Expand Up @@ -190,10 +191,20 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}

private class JobHandler(job: Job) extends Runnable with Logging {
import JobScheduler._

def run() {
ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
try {
val formattedTime = UIUtils.formatBatchTime(
job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

ssc.sc.setJobDescription(
s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is assuming the UI is the only consumer of this? I suppose that's OK since no one uses JobLogger anymore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoever uses it will get the data as text. Only the UI would actually process it as a link. Nothing breaks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this caused a cosmetic issue in the tooltips though: #11845
EDIT: not quite tooltip, but the event timeline display. Also could have been subsequent to this change. Worth a look at the new issue in any event

ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

// We need to assign `eventLoop` to a temp variable. Otherwise, because
// `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
// it's possible that when `post` is called, `eventLoop` happens to null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.rpc._
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver._
import org.apache.spark.util.{ThreadUtils, SerializableConfiguration}
import org.apache.spark.util.{Utils, ThreadUtils, SerializableConfiguration}


/** Enumeration to identify current state of a Receiver */
Expand Down Expand Up @@ -554,6 +554,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors))
}
receiverRDD.setName(s"Receiver $receiverId")
ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ package org.apache.spark.streaming.ui

import javax.servlet.http.HttpServletRequest

import scala.xml.{NodeSeq, Node, Text, Unparsed}
import scala.xml._

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.streaming.Time
import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{OutputOpId, SparkJobId}
import org.apache.spark.ui.jobs.UIData.JobUIData
import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}

private[ui] case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])

Expand Down Expand Up @@ -207,16 +207,25 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
}
}
val lastStageData = lastStageInfo.flatMap { s =>
sparkListener.stageIdToData.get((s.stageId, s.attemptId))
}

val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
lastStageInfo match {
case Some(stageInfo) =>
val details = if (stageInfo.details.nonEmpty) {
<span
onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
class="expand-details">
+details
</span> ++
<div class="stage-details collapsed">
<pre>{stageInfo.details}</pre>
</div>
} else {
NodeSeq.Empty
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else is missing.


<span class="description-input" title={lastStageDescription}>
{lastStageDescription}
</span> ++ Text(lastStageName)
<div> {stageInfo.name} {details} </div>
case None =>
Text("(Unknown)")
}
}

private def failureReasonCell(failureReason: String): Seq[Node] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo

// Verify streaming jobs have expected thread-local properties
assert(jobGroupFound === null)
assert(jobDescFound === null)
assert(jobDescFound.contains("Streaming job from"))
assert(jobInterruptFound === "false")

// Verify current thread's thread-local properties have not changed
Expand Down