@@ -386,13 +386,15 @@ class DAGScheduler(
 def visit(rdd: RDD[_]) {
   if (!visited(rdd)) {
     visited += rdd
-    if (getCacheLocs(rdd).contains(Nil)) {
+    if (rdd.dependencies.size < 2 || getCacheLocs(rdd).contains(Nil)) {

Contributor

As a general aside, I find `getCacheLocs(rdd).contains(Nil)` hard to understand to begin with. I think this condition is meant to be read as "if at least one partition of this RDD is not cached anywhere...". Maybe this code would be easier to review and parse if we extracted the condition into a variable (perhaps a `lazy val`, if we want to keep the short-circuiting) named `rddHasUncachedPartitions`, or `!rddIsCached` if we don't mind the negation.
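
A minimal sketch of that extraction, kept outside Spark so it can run on its own. The `CacheCheckSketch` object, the `CacheLocs` alias, the `String` location type, and `shouldVisitParents` are all hypothetical stand-ins rather than `DAGScheduler` code; the only things taken from the diff are the `contains(Nil)` check, the `dependencies.size < 2` guard, and the suggested name `rddHasUncachedPartitions`.

```scala
// Illustrative stand-in, not Spark code: each inner Seq holds the cache
// locations of one partition, and Nil means "not cached anywhere".
object CacheCheckSketch {
  type CacheLocs = Seq[Seq[String]]

  // Reads as: "at least one partition of this RDD is not cached anywhere".
  def hasUncachedPartitions(cacheLocs: CacheLocs): Boolean =
    cacheLocs.contains(Nil)

  // Mirrors the shape of the new condition in the diff. The by-name
  // parameter plus the lazy val keep the short-circuit: the cache lookup
  // only runs when the cheaper dependency-count test is false.
  def shouldVisitParents(numDependencies: Int, cacheLocs: => CacheLocs): Boolean = {
    lazy val rddHasUncachedPartitions = hasUncachedPartitions(cacheLocs)
    numDependencies < 2 || rddHasUncachedPartitions
  }
}
```

With a plain `val` the lookup would run on every visit, so the `lazy val` matters only if the lookup is worth avoiding.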

       for (dep <- rdd.dependencies) {
         dep match {
           case shufDep: ShuffleDependency[_, _, _] =>
-            val mapStage = getShuffleMapStage(shufDep, stage.jobId)
-            if (!mapStage.isAvailable) {
-              missing += mapStage
+            if (getCacheLocs(rdd).contains(Nil)) {
+              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
+              if (!mapStage.isAvailable) {
+                missing += mapStage
+              }
             }
           case narrowDep: NarrowDependency[_] =>
             waitingForVisit.push(narrowDep.rdd)