1818package org .apache .spark .status
1919
2020import java .util .Date
21- import java .util .concurrent .ConcurrentHashMap
21+ import java .util .concurrent .{ ConcurrentHashMap , TimeUnit }
2222
2323import scala .collection .JavaConverters ._
2424import scala .collection .concurrent .TrieMap
2525import scala .collection .mutable
2626import scala .collection .mutable .ArrayBuffer
2727
28+ import com .google .common .cache .{CacheBuilder , RemovalListener , RemovalNotification }
29+
2830import org .apache .spark ._
2931import org .apache .spark .executor .{ExecutorMetrics , TaskMetrics }
3032import org .apache .spark .internal .Logging
@@ -73,11 +75,61 @@ private[spark] class AppStatusListener(
7375
7476 // Keep track of live entities, so that task metrics can be efficiently updated (without
7577 // causing too many writes to the underlying store, and other expensive operations).
76- private val liveStages = new ConcurrentHashMap [(Int , Int ), LiveStage ]()
77- private val liveJobs = new TrieMap [Int , LiveJob ]()
78+
79+ // In a long running thrift-server, events may be dropped due to limited event queue.
80+ // If JobEnd/StageCompleted/TaskEnd events missed, the LiveJob/LiveStage/LiveTask instances
81+ // will be kept in memory forever. To prevent this leak, use a map with expired time (TTL)
82+ // to remove these leak instances automatically.
83+ private val liveJobs =
84+ if (conf.get(LIVE_JOBS_EXPIRE_HOURS ) <= 0 ) {
85+ new ConcurrentHashMap [java.lang.Integer , LiveJob ]()
86+ } else {
87+ CacheBuilder .newBuilder()
88+ .expireAfterWrite(conf.get(LIVE_JOBS_EXPIRE_HOURS ), TimeUnit .HOURS )
89+ .removalListener(new RemovalListener [java.lang.Integer , LiveJob ] {
90+ override def onRemoval (rn : RemovalNotification [Integer , LiveJob ]): Unit = {
91+ // We set job SUCCEEDED, when the trigger of kvstore invokes cleanupJobs,
92+ // it will release from kvstore to avoid memory leak.
93+ val job = rn.getValue
94+ job.status = JobExecutionStatus .SUCCEEDED
95+ update(job, System .nanoTime(), last = true )
96+ logInfo(s " Try to clean live job ${rn.getKey} and update its status in kvstore " )
97+ }
98+ }).build[java.lang.Integer , LiveJob ]().asMap()
99+ }
100+ private val liveStages =
101+ if (conf.get(LIVE_STAGES_EXPIRE_HOURS ) <= 0 ) {
102+ new ConcurrentHashMap [(java.lang.Integer , java.lang.Integer ), LiveStage ]()
103+ } else {
104+ CacheBuilder .newBuilder()
105+ .expireAfterWrite(conf.get(LIVE_STAGES_EXPIRE_HOURS ), TimeUnit .HOURS )
106+ .removalListener(new RemovalListener [(java.lang.Integer , java.lang.Integer ), LiveStage ] {
107+ override def onRemoval (rn : RemovalNotification [(Integer , Integer ), LiveStage ]): Unit = {
108+ val stage = rn.getValue
109+ stage.cleaning = true
110+ // cleanupTasks is heavy, async
111+ kvstore.doAsync {
112+ cleanupTasks(stage, skipActive = false )
113+ // We set stage COMPLETE here, when the trigger of kvstore invokes cleanupStages,
114+ // it will release from kvstore to avoid memory leak.
115+ stage.status == v1.StageStatus .COMPLETE
116+ update(stage, System .nanoTime(), last = true )
117+ logInfo(s " Try to clean live stage ${rn.getKey._1}. ${rn.getKey._2} and " +
118+ s " update its status in kvstore " )
119+ }
120+ }
121+ }).build[(java.lang.Integer , java.lang.Integer ), LiveStage ]().asMap()
122+ }
123+ private val liveTasks =
124+ if (conf.get(LIVE_TASKS_EXPIRE_HOURS ) <= 0 ) {
125+ new ConcurrentHashMap [java.lang.Long , LiveTask ]()
126+ } else {
127+ CacheBuilder .newBuilder()
128+ .expireAfterWrite(conf.get(LIVE_TASKS_EXPIRE_HOURS ), TimeUnit .HOURS )
129+ .build[java.lang.Long , LiveTask ]().asMap()
130+ }
78131 private [spark] val liveExecutors = new TrieMap [String , LiveExecutor ]()
79132 private [spark] val deadExecutors = new TrieMap [String , LiveExecutor ]()
80- private val liveTasks = new TrieMap [Long , LiveTask ]()
81133 private val liveRDDs = new TrieMap [Int , LiveRDD ]()
82134 private val pools = new TrieMap [String , SchedulerPool ]()
83135 private val liveResourceProfiles = new TrieMap [Int , LiveResourceProfile ]()
@@ -494,7 +546,7 @@ private[spark] class AppStatusListener(
494546 }
495547
496548 override def onJobEnd (event : SparkListenerJobEnd ): Unit = {
497- liveJobs.remove(event.jobId).foreach { job =>
549+ Option ( liveJobs.remove(event.jobId) ).foreach { job =>
498550 val now = System .nanoTime()
499551
500552 // Check if there are any pending stages that match this job; mark those as skipped.
@@ -565,7 +617,7 @@ private[spark] class AppStatusListener(
565617 }.getOrElse(SparkUI .DEFAULT_POOL_NAME )
566618
567619 // Look at all active jobs to find the ones that mention this stage.
568- stage.jobs = liveJobs.values
620+ stage.jobs = liveJobs.values.asScala
569621 .filter(_.stageIds.contains(event.stageInfo.stageId))
570622 .toSeq
571623 stage.jobIds = stage.jobs.map(_.jobId).toSet
@@ -638,7 +690,7 @@ private[spark] class AppStatusListener(
638690 override def onTaskGettingResult (event : SparkListenerTaskGettingResult ): Unit = {
639691 // Call update on the task so that the "getting result" time is written to the store; the
640692 // value is part of the mutable TaskInfo state that the live entity already references.
641- liveTasks.get(event.taskInfo.taskId).foreach { task =>
693+ Option ( liveTasks.get(event.taskInfo.taskId) ).foreach { task =>
642694 maybeUpdate(task, System .nanoTime())
643695 }
644696 }
@@ -651,7 +703,7 @@ private[spark] class AppStatusListener(
651703
652704 val now = System .nanoTime()
653705
654- val metricsDelta = liveTasks.remove(event.taskInfo.taskId).map { task =>
706+ val metricsDelta = Option ( liveTasks.remove(event.taskInfo.taskId) ).map { task =>
655707 task.info = event.taskInfo
656708
657709 val errorMessage = event.reason match {
@@ -941,7 +993,7 @@ private[spark] class AppStatusListener(
941993 val now = System .nanoTime()
942994
943995 event.accumUpdates.foreach { case (taskId, sid, sAttempt, accumUpdates) =>
944- liveTasks.get(taskId).foreach { task =>
996+ Option ( liveTasks.get(taskId) ).foreach { task =>
945997 val metrics = TaskMetrics .fromAccumulatorInfos(accumUpdates)
946998 val delta = task.updateMetrics(metrics)
947999 maybeUpdate(task, now)
@@ -1032,9 +1084,9 @@ private[spark] class AppStatusListener(
10321084 entityFlushFunc(stage)
10331085 stage.executorSummaries.values.foreach(entityFlushFunc)
10341086 }
1035- liveJobs.values.foreach( entityFlushFunc)
1087+ liveJobs.values.forEach(e => entityFlushFunc(e) )
10361088 liveExecutors.values.foreach(entityFlushFunc)
1037- liveTasks.values.foreach( entityFlushFunc)
1089+ liveTasks.values.forEach(e => entityFlushFunc(e) )
10381090 liveRDDs.values.foreach(entityFlushFunc)
10391091 pools.values.foreach(entityFlushFunc)
10401092 }
@@ -1213,7 +1265,7 @@ private[spark] class AppStatusListener(
12131265
12141266 private def getOrCreateStage (info : StageInfo ): LiveStage = {
12151267 val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber),
1216- (_ : (Int , Int )) => new LiveStage (info))
1268+ (_ : (java.lang. Integer , java.lang. Integer )) => new LiveStage (info))
12171269 stage.info = info
12181270 stage
12191271 }
@@ -1274,7 +1326,12 @@ private[spark] class AppStatusListener(
12741326 val toDelete = KVUtils .viewToSeq(view, countToDelete.toInt) { j =>
12751327 j.info.status != JobExecutionStatus .RUNNING && j.info.status != JobExecutionStatus .UNKNOWN
12761328 }
1277- toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
1329+ toDelete.foreach { j =>
1330+ if (j.info.status == JobExecutionStatus .SUCCEEDED && j.info.completionTime.isEmpty) {
1331+ logInfo(s " Clean up leaked job ${j.info.jobId} from kvstore " )
1332+ }
1333+ kvstore.delete(j.getClass(), j.info.jobId)
1334+ }
12781335 }
12791336
12801337 private case class StageCompletionTime (
@@ -1331,6 +1388,9 @@ private[spark] class AppStatusListener(
13311388 s " including ${stages.map(_.info.stageId).mkString(" ," )}. " )
13321389
13331390 stages.map { s =>
1391+ if (s.info.status == v1.StageStatus .COMPLETE && s.info.completionTime.isEmpty) {
1392+ logInfo(s " Clean up leaked stage ${s.info.stageId}. ${s.info.attemptId} from kvstore " )
1393+ }
13341394 val key = Array (s.info.stageId, s.info.attemptId)
13351395 kvstore.delete(s.getClass(), key)
13361396
@@ -1381,7 +1441,7 @@ private[spark] class AppStatusListener(
13811441 kvstore.removeAllByIndexValues(classOf [TaskDataWrapper ], TaskIndexNames .STAGE , stageIds)
13821442 }
13831443
1384- private def cleanupTasks (stage : LiveStage ): Unit = {
1444+ private def cleanupTasks (stage : LiveStage , skipActive : Boolean = true ): Unit = {
13851445 val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt
13861446 if (countToDelete > 0 ) {
13871447 val stageKey = Array (stage.info.stageId, stage.info.attemptNumber)
@@ -1391,7 +1451,7 @@ private[spark] class AppStatusListener(
13911451
13921452 // Try to delete finished tasks only.
13931453 val toDelete = KVUtils .viewToSeq(view, countToDelete) { t =>
1394- ! live || t.status != TaskState .RUNNING .toString()
1454+ ! live || (skipActive && t.status != TaskState .RUNNING .toString() )
13951455 }
13961456 toDelete.foreach { t => kvstore.delete(t.getClass(), t.taskId) }
13971457 stage.savedTasks.addAndGet(- toDelete.size)
0 commit comments