
Commit d40837a

wangyum authored and committed
[CARMEL-7340][CARMEL-3697] Task Summary Support in TaskScheduler (apache#100)
1 parent 433ff4c

File tree

9 files changed: +38 -14 lines changed


core/src/main/scala/org/apache/spark/scheduler/AnalyticsTaskSetManager.scala

Lines changed: 0 additions & 2 deletions
@@ -100,8 +100,6 @@ private[spark] class AnalyticsTaskSetManager(
 
   def userInfo(): Option[UserInfo] = _userInfo
 
-  def successfulTasks: Int = tasksSuccessful
-
   // Add all our tasks to the pending lists. We do this in reverse order
   // of task index so that tasks with low indices get launched first.
   addPendingTasks()

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala

Lines changed: 1 addition & 0 deletions
@@ -125,4 +125,5 @@ private[spark] trait TaskScheduler {
    */
   def applicationAttemptId(): Option[String]
 
+  def taskSummary(): (Int, Int, Int)
 }
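
The new trait method returns a bare (Int, Int, Int) tuple rather than a named type. Below is a minimal, self-contained sketch of the contract and how a caller might unpack it; the TaskSummaryProvider trait, the demo object, and the sample numbers are illustrative stand-ins, not part of the commit. The field order (total, running, successful) follows the TaskSchedulerImpl implementation later in this diff.

// Illustrative stand-in for the new method on org.apache.spark.scheduler.TaskScheduler.
trait TaskSummaryProvider {
  // Returns (totalTasks, runningTasks, successfulTasks), matching TaskSchedulerImpl below.
  def taskSummary(): (Int, Int, Int)
}

object TaskSummaryDemo extends App {
  val scheduler = new TaskSummaryProvider {
    override def taskSummary(): (Int, Int, Int) = (100, 8, 42) // made-up sample numbers
  }
  val (total, running, successful) = scheduler.taskSummary()
  println(s"tasks: $successful/$total finished, $running running")
}

A named case class would self-document the three fields, but the tuple keeps the trait change to a single line.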

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 14 additions & 0 deletions
@@ -1258,6 +1258,20 @@ private[spark] class TaskSchedulerImpl(
       manager
     }
   }
+
+  override def taskSummary(): (Int, Int, Int) = {
+    var totalTasks = 0
+    var runningTasks = 0
+    var successfulTasks = 0
+    for (taskSet <- rootPool.getTaskSetQueue) {
+      if (!taskSet.isZombie) {
+        totalTasks += taskSet.numTasks
+        runningTasks += taskSet.runningTasks
+        successfulTasks += taskSet.successfulTasks
+      }
+    }
+    (totalTasks, runningTasks, successfulTasks)
+  }
 }
 
 
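The implementation walks the scheduler's root pool and skips zombie TaskSetManagers, i.e. task sets that have finished or been aborted and will launch no more tasks, so the tuple reflects only live task sets. For reference, a standalone sketch of the same aggregation over a simplified model; the TaskSetLike case class and the summarize helper are inventions for illustration only:

object TaskSummarySketch {
  // Simplified stand-in for the TaskSetManager fields read by taskSummary().
  final case class TaskSetLike(
      numTasks: Int, runningTasks: Int, successfulTasks: Int, isZombie: Boolean)

  // The same aggregation as the loop above, written as a fold over non-zombie sets.
  def summarize(taskSets: Seq[TaskSetLike]): (Int, Int, Int) =
    taskSets.filterNot(_.isZombie).foldLeft((0, 0, 0)) {
      case ((total, running, ok), ts) =>
        (total + ts.numTasks, running + ts.runningTasks, ok + ts.successfulTasks)
    }

  def main(args: Array[String]): Unit =
    // The zombie entry is excluded, so only the first task set counts.
    println(summarize(Seq(
      TaskSetLike(10, 2, 5, isZombie = false),
      TaskSetLike(4, 0, 4, isZombie = true)))) // prints (10,2,5)
}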
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 2 additions & 0 deletions
@@ -172,6 +172,8 @@ private[spark] class TaskSetManager(
 
   override def runningTasks: Int = runningTasksSet.size
 
+  def successfulTasks: Int = tasksSuccessful
+
   def someAttemptSucceeded(tid: Long): Boolean = {
     successful(taskInfos(tid).index)
   }
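
This is the counterpart to the deletion in AnalyticsTaskSetManager above: the accessor moves onto the base TaskSetManager so TaskSchedulerImpl.taskSummary() can read it on any manager returned from the pool, whatever its concrete subclass. A toy sketch of why the member has to live on the base type; the class names here are hypothetical:

object BaseTypeDemo extends App {
  class BaseManager { protected var done = 0; def successfulTasks: Int = done }
  class AnalyticsManager extends BaseManager

  // A pool hands back base-typed managers; the accessor is visible on all of them.
  val pool: Seq[BaseManager] = Seq(new BaseManager, new AnalyticsManager)
  println(pool.map(_.successfulTasks).sum)
}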

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 11 additions & 6 deletions
@@ -593,16 +593,21 @@ private[spark] class AppStatusStore(
     constructTaskDataList(taskDataWrapperSeq)
   }
 
-  def resourceUsage(): v1.ClusterResourceSummary = {
+  def resourceUsage(sc: Option[SparkContext]): v1.ClusterResourceSummary = {
     val executors = store.view(classOf[ExecutorSummaryWrapper]).asScala
       .filter(d => d.info.isActive)
       .map(_.info.totalCores)
       .sum
-    val tasks = store.view(classOf[StageDataWrapper]).reverse().asScala
-      .filter(d => d.info.status.equals(v1.StageStatus.ACTIVE))
-    val numTasks = tasks.map(_.info.numTasks).sum
-    val numActiveTasks = tasks.map(_.info.numActiveTasks).sum
-    val numCompleteTasks = tasks.map(_.info.numCompleteTasks).sum
+    val (numTasks, numActiveTasks, numCompleteTasks) = if (sc.isDefined) {
+      sc.get.taskScheduler.taskSummary()
+    } else {
+      val tasks = store.view(classOf[StageDataWrapper]).reverse().asScala
+        .filter(d => d.info.status.equals(v1.StageStatus.ACTIVE))
+      val numTasks = tasks.map(_.info.numTasks).sum
+      val numActiveTasks = tasks.map(_.info.numActiveTasks).sum
+      val numCompleteTasks = tasks.map(_.info.numCompleteTasks).sum
+      (numTasks, numActiveTasks, numCompleteTasks)
+    }
 
     new v1.ClusterResourceSummary(executors, numTasks, numActiveTasks, numCompleteTasks)
   }
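
With a live SparkContext the summary now comes straight from the scheduler; without one (e.g. on the history server, which replays events into the store but has no running scheduler) the method falls back to the original scan over active stages in the KVStore. The same branch could be written with Option combinators; this rewrite is illustrative only and reuses the names in scope inside resourceUsage above, it is not part of the commit:

// Equivalent to the isDefined/get branch above, using Option#map/getOrElse.
val (numTasks, numActiveTasks, numCompleteTasks) =
  sc.map(_.taskScheduler.taskSummary()).getOrElse {
    val tasks = store.view(classOf[StageDataWrapper]).reverse().asScala
      .filter(d => d.info.status.equals(v1.StageStatus.ACTIVE))
    (tasks.map(_.info.numTasks).sum,
      tasks.map(_.info.numActiveTasks).sum,
      tasks.map(_.info.numCompleteTasks).sum)
  }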

core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala

Lines changed: 1 addition & 2 deletions
@@ -174,8 +174,7 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
 
   @GET
   @Path("cluster/usage")
-  def executorSimpleList(): ClusterResourceSummary = withUI(_.store.resourceUsage())
-
+  def executorSimpleList(): ClusterResourceSummary = withUI(ui => ui.store.resourceUsage(ui.sc))
 }
 
 private[v1] class OneApplicationResource extends AbstractApplicationResource {
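
Passing ui.sc works because a live SparkUI holds an Option[SparkContext] while the history server's rebuilt UI does not, so the KVStore fallback above applies there. A hypothetical probe of the endpoint; the URL shape is assumed from Spark's standard REST layout plus the @Path("cluster/usage") annotation above, and the host, port, and application id are placeholders:

import scala.io.Source

object ClusterUsageProbe extends App {
  // GET the cluster/usage resource from a (hypothetical) live driver UI.
  val url = "http://localhost:4040/api/v1/applications/app-00000000000000-0000/cluster/usage"
  println(Source.fromURL(url).mkString) // a ClusterResourceSummary serialized as JSON
}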

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 2 additions & 0 deletions
@@ -207,6 +207,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
     override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
     override def applicationAttemptId(): Option[String] = None
+    override def taskSummary(): (Int, Int, Int) = (0, 0, 0)
     override def executorDecommission(
       executorId: String,
       decommissionInfo: ExecutorDecommissionInfo): Unit = {
@@ -890,6 +891,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
     override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
     override def applicationAttemptId(): Option[String] = None
+    override def taskSummary(): (Int, Int, Int) = (0, 0, 0)
     override def executorDecommission(
       executorId: String,
       decommissionInfo: ExecutorDecommissionInfo): Unit = {}

core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -93,6 +93,7 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
   override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
   override def applicationAttemptId(): Option[String] = None
+  override def taskSummary(): (Int, Int, Int) = (0, 0, 0)
   def executorHeartbeatReceived(
     execId: String,
     accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],

core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala

Lines changed: 6 additions & 4 deletions
@@ -330,10 +330,12 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val execIdToBlocksMapping = storageStatus.map(
       status => (status.blockManagerId.executorId, status.blocks)).toMap
     // No cached blocks should be present on executor which was decommissioned
-    assert(
-      !execIdToBlocksMapping.contains(execToDecommission) ||
-        execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(),
-      "Cache blocks should be migrated")
+    eventually(timeout(2.seconds), interval(100.milliseconds)) {
+      assert(
+        !execIdToBlocksMapping.contains(execToDecommission) ||
+          execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(),
+        "Cache blocks should be migrated")
+    }
     if (persist) {
       // There should still be all the RDD blocks cached
       assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts)
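
Wrapping the assertion in ScalaTest's eventually gives block migration up to two seconds to complete, retrying every 100 ms, instead of asserting once and flaking on slow machines. A minimal self-contained illustration of the same retry pattern; the demo object and the flag are inventions for illustration:

import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallyDemo extends App {
  @volatile var flag = false
  new Thread(() => { Thread.sleep(500); flag = true }).start()

  // Retries the block every 100 ms until it passes or 2 s elapse.
  eventually(timeout(2.seconds), interval(100.milliseconds)) {
    assert(flag, "flag should eventually flip")
  }
}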
