Skip to content

Commit f3c9a14

Browse files
author
Bharath Bhushan
committed
Merge remote-tracking branch 'upstream/master' into spark-1403
2 parents 42d3d6a + 7f32fd4 commit f3c9a14

File tree

4 files changed

+38
-20
lines changed

4 files changed

+38
-20
lines changed

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
8181
/** If stages is too large, remove and garbage collect old stages */
8282
private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
8383
if (stages.size > retainedStages) {
84-
val toRemove = retainedStages / 10
85-
stages.takeRight(toRemove).foreach( s => {
84+
val toRemove = math.max(retainedStages / 10, 1)
85+
stages.take(toRemove).foreach { s =>
8686
stageIdToTaskData.remove(s.stageId)
8787
stageIdToTime.remove(s.stageId)
8888
stageIdToShuffleRead.remove(s.stageId)
@@ -94,8 +94,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
9494
stageIdToTasksFailed.remove(s.stageId)
9595
stageIdToPool.remove(s.stageId)
9696
if (stageIdToDescription.contains(s.stageId)) {stageIdToDescription.remove(s.stageId)}
97-
})
98-
stages.trimEnd(toRemove)
97+
}
98+
stages.trimStart(toRemove)
9999
}
100100
}
101101

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,42 @@
1818
package org.apache.spark.ui.jobs
1919

2020
import org.scalatest.FunSuite
21+
import org.scalatest.matchers.ShouldMatchers
2122

22-
import org.apache.spark.{LocalSparkContext, SparkContext, Success}
23+
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, Success}
2324
import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
2425
import org.apache.spark.scheduler._
2526
import org.apache.spark.util.Utils
2627

27-
class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
28+
class JobProgressListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
29+
test("test LRU eviction of stages") {
30+
val conf = new SparkConf()
31+
conf.set("spark.ui.retainedStages", 5.toString)
32+
val listener = new JobProgressListener(conf)
33+
34+
def createStageStartEvent(stageId: Int) = {
35+
val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
36+
SparkListenerStageSubmitted(stageInfo)
37+
}
38+
39+
def createStageEndEvent(stageId: Int) = {
40+
val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
41+
SparkListenerStageCompleted(stageInfo)
42+
}
43+
44+
for (i <- 1 to 50) {
45+
listener.onStageSubmitted(createStageStartEvent(i))
46+
listener.onStageCompleted(createStageEndEvent(i))
47+
}
48+
49+
listener.completedStages.size should be (5)
50+
listener.completedStages.filter(_.stageId == 50).size should be (1)
51+
listener.completedStages.filter(_.stageId == 49).size should be (1)
52+
listener.completedStages.filter(_.stageId == 48).size should be (1)
53+
listener.completedStages.filter(_.stageId == 47).size should be (1)
54+
listener.completedStages.filter(_.stageId == 46).size should be (1)
55+
}
56+
2857
test("test executor id to summary") {
2958
val sc = new SparkContext("local", "test")
3059
val listener = new JobProgressListener(sc.conf)

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -345,14 +345,8 @@ trait ClientBase extends Logging {
345345
}
346346

347347
// Command for the ApplicationMaster
348-
var javaCommand = "java"
349-
val javaHome = System.getenv("JAVA_HOME")
350-
if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
351-
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
352-
}
353-
354348
val commands = List[String](
355-
javaCommand +
349+
Environment.JAVA_HOME.$() + "/bin/java" +
356350
" -server " +
357351
JAVA_OPTS +
358352
" " + args.amClass +

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,8 @@ trait ExecutorRunnableUtil extends Logging {
8888
}
8989
*/
9090

91-
var javaCommand = "java"
92-
val javaHome = System.getenv("JAVA_HOME")
93-
if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
94-
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
95-
}
96-
97-
val commands = List[String](javaCommand +
91+
val commands = List[String](
92+
Environment.JAVA_HOME.$() + "/bin/java" +
9893
" -server " +
9994
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
10095
// Not killing the task leaves various aspects of the executor and (to some extent) the jvm in

0 commit comments

Comments
 (0)