@@ -33,7 +33,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
3333 val taskScheduler = new TaskSchedulerImpl (sc)
3434 taskScheduler.initialize(new FakeSchedulerBackend )
3535 // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
36- val dagScheduler = new DAGScheduler (sc, taskScheduler) {
36+ new DAGScheduler (sc, taskScheduler) {
3737 override def taskStarted (task : Task [_], taskInfo : TaskInfo ) {}
3838 override def executorAdded (execId : String , host : String ) {}
3939 }
@@ -67,7 +67,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
6767 val taskScheduler = new TaskSchedulerImpl (sc)
6868 taskScheduler.initialize(new FakeSchedulerBackend )
6969 // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
70- val dagScheduler = new DAGScheduler (sc, taskScheduler) {
70+ new DAGScheduler (sc, taskScheduler) {
7171 override def taskStarted (task : Task [_], taskInfo : TaskInfo ) {}
7272 override def executorAdded (execId : String , host : String ) {}
7373 }
@@ -138,18 +138,103 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
138138 override def executorAdded (execId : String , host : String ) {}
139139 }
140140 taskScheduler.setDAGScheduler(dagScheduler)
141- val attempt1 = new TaskSet ( Array ( new FakeTask ( 0 )) , 0 , 0 , 0 , null )
142- val attempt2 = new TaskSet ( Array ( new FakeTask ( 0 )), 0 , 1 , 0 , null )
141+ val attempt1 = FakeTask .createTaskSet( 1 , 0 )
142+ val attempt2 = FakeTask .createTaskSet( 1 , 1 )
143143 taskScheduler.submitTasks(attempt1)
144144 intercept[IllegalStateException ] { taskScheduler.submitTasks(attempt2) }
145145
146146 // OK to submit multiple if previous attempts are all zombie
147- taskScheduler.activeTaskSets(attempt1.id).isZombie = true
147+ taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId)
148+ .get.isZombie = true
148149 taskScheduler.submitTasks(attempt2)
149- val attempt3 = new TaskSet ( Array ( new FakeTask ( 0 )), 0 , 2 , 0 , null )
150+ val attempt3 = FakeTask .createTaskSet( 1 , 2 )
150151 intercept[IllegalStateException ] { taskScheduler.submitTasks(attempt3) }
151- taskScheduler.activeTaskSets(attempt2.id).isZombie = true
152+ taskScheduler.taskSetManagerForAttempt(attempt2.stageId, attempt2.stageAttemptId)
153+ .get.isZombie = true
152154 taskScheduler.submitTasks(attempt3)
153155 }
154156
157+ test(" don't schedule more tasks after a taskset is zombie" ) {
158+ sc = new SparkContext (" local" , " TaskSchedulerImplSuite" )
159+ val taskScheduler = new TaskSchedulerImpl (sc)
160+ taskScheduler.initialize(new FakeSchedulerBackend )
161+ // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
162+ new DAGScheduler (sc, taskScheduler) {
163+ override def taskStarted (task : Task [_], taskInfo : TaskInfo ) {}
164+ override def executorAdded (execId : String , host : String ) {}
165+ }
166+
167+ val numFreeCores = 1
168+ val workerOffers = Seq (new WorkerOffer (" executor0" , " host0" , numFreeCores))
169+ val attempt1 = FakeTask .createTaskSet(10 )
170+
171+ // submit attempt 1, offer some resources, some tasks get scheduled
172+ taskScheduler.submitTasks(attempt1)
173+ val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
174+ assert(1 === taskDescriptions.length)
175+
176+ // now mark attempt 1 as a zombie
177+ taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId)
178+ .get.isZombie = true
179+
180+ // don't schedule anything on another resource offer
181+ val taskDescriptions2 = taskScheduler.resourceOffers(workerOffers).flatten
182+ assert(0 === taskDescriptions2.length)
183+
184+ // if we schedule another attempt for the same stage, it should get scheduled
185+ val attempt2 = FakeTask .createTaskSet(10 , 1 )
186+
187+ // submit attempt 2, offer some resources, some tasks get scheduled
188+ taskScheduler.submitTasks(attempt2)
189+ val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten
190+ assert(1 === taskDescriptions3.length)
191+ val mgr = taskScheduler.taskSetManagerForTask(taskDescriptions3(0 ).taskId).get
192+ assert(mgr.taskSet.stageAttemptId === 1 )
193+ }
194+
195+ test(" if a zombie attempt finishes, continue scheduling tasks for non-zombie attempts" ) {
196+ sc = new SparkContext (" local" , " TaskSchedulerImplSuite" )
197+ val taskScheduler = new TaskSchedulerImpl (sc)
198+ taskScheduler.initialize(new FakeSchedulerBackend )
199+ // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
200+ new DAGScheduler (sc, taskScheduler) {
201+ override def taskStarted (task : Task [_], taskInfo : TaskInfo ) {}
202+ override def executorAdded (execId : String , host : String ) {}
203+ }
204+
205+ val numFreeCores = 10
206+ val workerOffers = Seq (new WorkerOffer (" executor0" , " host0" , numFreeCores))
207+ val attempt1 = FakeTask .createTaskSet(10 )
208+
209+ // submit attempt 1, offer some resources, some tasks get scheduled
210+ taskScheduler.submitTasks(attempt1)
211+ val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
212+ assert(10 === taskDescriptions.length)
213+
214+ // now mark attempt 1 as a zombie
215+ val mgr1 = taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId).get
216+ mgr1.isZombie = true
217+
218+ // don't schedule anything on another resource offer
219+ val taskDescriptions2 = taskScheduler.resourceOffers(workerOffers).flatten
220+ assert(0 === taskDescriptions2.length)
221+
222+ // submit attempt 2
223+ val attempt2 = FakeTask .createTaskSet(10 , 1 )
224+ taskScheduler.submitTasks(attempt2)
225+
226+ // attempt 1 finished (this can happen even if it was marked zombie earlier -- all tasks were
227+ // already submitted, and then they finish)
228+ taskScheduler.taskSetFinished(mgr1)
229+
230+ // now with another resource offer, we should still schedule all the tasks in attempt2
231+ val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten
232+ assert(10 === taskDescriptions3.length)
233+
234+ taskDescriptions3.foreach{ task =>
235+ val mgr = taskScheduler.taskSetManagerForTask(task.taskId).get
236+ assert(mgr.taskSet.stageAttemptId === 1 )
237+ }
238+ }
239+
155240}
0 commit comments