Commit 27918c7

Add failing unit tests for standalone log URL viewing
1 parent c250fbe commit 27918c7

1 file changed: +45 -10 lines changed

core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala

@@ -17,35 +17,70 @@
 
 package org.apache.spark.deploy
 
+import java.net.URL
+
 import scala.collection.mutable
+import scala.io.Source
 
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.FunSuite
 
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
-import org.apache.spark.{SparkContext, LocalSparkContext}
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
 
-class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
+class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {
 
   /** Length of time to wait while draining listener events. */
-  val WAIT_TIMEOUT_MILLIS = 10000
+  private val WAIT_TIMEOUT_MILLIS = 10000
 
-  before {
+  test("verify that correct log urls get propagated from workers") {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
+
+    val listener = new SaveExecutorInfo
+    sc.addSparkListener(listener)
+
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+      // Browse to each URL to check that it's valid
+      info.logUrlMap.foreach { case (logType, logUrl) =>
+        println(logUrl)
+        val html = Source.fromURL(logUrl).mkString
+        assert(html.contains(s"$logType log page"))
+      }
+    }
   }
 
-  test("verify log urls get propagated from workers") {
+  test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
+    val SPARK_PUBLIC_DNS = "public_dns"
+    class MySparkConf extends SparkConf(false) {
+      override def getenv(name: String) = {
+        if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
+        else super.getenv(name)
+      }
+
+      override def clone: SparkConf = {
+        new MySparkConf().setAll(getAll)
+      }
+    }
+    val conf = new MySparkConf()
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)
 
-    val rdd1 = sc.parallelize(1 to 100, 4)
-    val rdd2 = rdd1.map(_.toString)
-    rdd2.setName("Target RDD")
-    rdd2.count()
+    // Trigger a job so that executors get added
+    sc.parallelize(1 to 100, 4).map(_.toString).count()
 
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.addedExecutorInfos.values.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
+      info.logUrlMap.values.foreach { logUrl =>
+        assert(new URL(logUrl).getHost === SPARK_PUBLIC_DNS)
+      }
     }
   }
 }
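Note: both tests register a SaveExecutorInfo listener that is defined elsewhere in this file and does not appear in the hunk above. A minimal sketch of what such a listener could look like, assuming it only records each added executor's ExecutorInfo keyed by executor ID (the imports match those at the top of the diff; the exact body is not part of this commit):

import scala.collection.mutable

import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}

// Sketch only: captures the ExecutorInfo of every executor the listener
// bus reports as added, so the tests can inspect each logUrlMap.
private class SaveExecutorInfo extends SparkListener {
  val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()

  override def onExecutorAdded(executor: SparkListenerExecutorAdded): Unit = {
    addedExecutorInfos(executor.executorId) = executor.executorInfo
  }
}

With a listener of this shape, each test runs a small job against a local-cluster master so that executors register, drains the listener bus via waitUntilEmpty, and then asserts on the captured logUrlMap. The second test fakes the SPARK_PUBLIC_DNS environment variable by overriding SparkConf.getenv (and clone, since SparkContext clones its conf) so the assertion on the URL host is deterministic.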