@@ -1221,7 +1221,6 @@ private[spark] class DAGScheduler(
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})")
-  markMapStageJobsAsFinished(stage)
case stage : ResultStage =>
logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
}
@@ -1402,7 +1401,7 @@ private[spark] class DAGScheduler(
shuffleStage.pendingPartitions -= task.partitionId
}

-  if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
+  if (shuffleStage.pendingPartitions.isEmpty) {
markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import java.util.Properties
+import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.annotation.meta.param
@@ -2246,58 +2247,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assertDataStructuresEmpty()
}

test("Trigger mapstage's job listener in submitMissingTasks") {
Member:
Could you give some explanation for deleting this test?

@liutang123 (Contributor, Author), Aug 24, 2018:
Because that PR conflicts with this PR.
In that PR, the shuffleMapStage waits for the completion of its parent stages' rerun.
In this PR, the shuffleMapStage completes immediately when all of its partitions are ready.
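
To make that scenario concrete, the following is a minimal, self-contained sketch of just the completion guard this PR changes. It is not Spark's DAGScheduler: the ToyStage class, the runningStages set of stage ids, and the event ordering are invented stand-ins that mirror the situation described above (a FetchFailed removes the stage from runningStages, then a speculative task registers the last missing map output).

```scala
// A toy model (not Spark code) of the guard changed by this PR.
object CompletionGuardSketch {
  // Stand-in for a ShuffleMapStage: only the fields the guard looks at.
  final case class ToyStage(id: Int, var pendingPartitions: Set[Int])

  def main(args: Array[String]): Unit = {
    val runningStages = scala.collection.mutable.Set.empty[Int]
    val stage1 = ToyStage(id = 1, pendingPartitions = Set(0, 1))

    // stage1 starts, then a FetchFailed marks it as failed: it is removed
    // from runningStages while its parent stage is resubmitted.
    runningStages += stage1.id
    runningStages -= stage1.id

    // A speculative copy of the remaining task now succeeds, so every
    // partition of stage1 has a registered map output.
    stage1.pendingPartitions = Set.empty

    // Old guard: false here (stage1 is no longer "running"), so the stage was
    // never marked finished and the map-stage job listener waited forever.
    val oldGuard = runningStages.contains(stage1.id) && stage1.pendingPartitions.isEmpty

    // Guard after this PR: fires as soon as no partitions are pending.
    val newGuard = stage1.pendingPartitions.isEmpty

    println(s"old guard fires: $oldGuard, new guard fires: $newGuard")
  }
}
```

Run as-is this prints "old guard fires: false, new guard fires: true"; the SPARK-25211 test added further down drives the real scheduler through the same ordering.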

val rdd1 = new MyRDD(sc, 2, Nil)
val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))

val listener1 = new SimpleListener
val listener2 = new SimpleListener

submitMapStage(dep1, listener1)
submitMapStage(dep2, listener2)

// Complete the stage0.
assert(taskSets(0).stageId === 0)
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", rdd1.partitions.length)),
(Success, makeMapStatus("hostB", rdd1.partitions.length))))
assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
assert(listener1.results.size === 1)

// When attempting stage1, trigger a fetch failure.
assert(taskSets(1).stageId === 1)
complete(taskSets(1), Seq(
(Success, makeMapStatus("hostC", rdd2.partitions.length)),
(FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
scheduler.resubmitFailedStages()
// Stage1 listener should not have a result yet
assert(listener2.results.size === 0)

// Speculative task succeeded in stage1.
runEvent(makeCompletionEvent(
taskSets(1).tasks(1),
Success,
makeMapStatus("hostD", rdd2.partitions.length)))
// stage1 listener still should not have a result, though there's no missing partitions
// in it. Because stage1 has been failed and is not inside `runningStages` at this moment.
assert(listener2.results.size === 0)

// Stage0 should now be running as task set 2; make its task succeed
assert(taskSets(2).stageId === 0)
complete(taskSets(2), Seq(
(Success, makeMapStatus("hostC", rdd2.partitions.length))))
assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
Set(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))

// After stage0 is finished, stage1 will be submitted and found there is no missing
// partitions in it. Then listener got triggered.
assert(listener2.results.size === 1)
assertDataStructuresEmpty()
}

/**
* In this test, we run a map stage where one of the executors fails but we still receive a
* "zombie" complete message from that executor. We want to make sure the stage is not reported
@@ -2627,6 +2576,39 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assert(countSubmittedMapStageAttempts() === 2)
}

test("SPARK-25211 speculation and fetch failed result in hang of job") {
val shuffleMapRDD1 = new MyRDD(sc, 1, Nil)
val dep1 = new ShuffleDependency(shuffleMapRDD1, new HashPartitioner(2))
val shuffleMapRDD2 = new MyRDD(sc, 2, List(dep1))
val dep2 = new ShuffleDependency(shuffleMapRDD2, new HashPartitioner(2))

val jobId = scheduler.nextJobId.get()
val waiter = new JobWaiter(scheduler, jobId, 1, (_: Int, _: MapOutputStatistics) => {})
val realJobId = submitMapStage(dep2, waiter)
assert(waiter.jobId === realJobId)
assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
complete(taskSets(0),
(Success, makeMapStatus("hostA", dep1.partitioner.numPartitions)) :: Nil
)
assert(taskSets(1).stageId === 1 && taskSets(1).stageAttemptId === 0)
complete(taskSets(1), Seq(
(Success, makeMapStatus("hostB", dep2.partitioner.numPartitions)),
(FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))

// waiting for resubmitting of failed stages
TimeUnit.MILLISECONDS.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)

// A speculated task finished
runEvent(makeCompletionEvent(taskSets(1).tasks(1),
Success, makeMapStatus("hostC", dep2.partitioner.numPartitions)))
assert(waiter.jobFinished)

runEvent(makeCompletionEvent(taskSets(2).tasks(0),
Success, makeMapStatus("hostD", dep2.partitioner.numPartitions)))
assert(taskSets.size === 3)
}

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.