@@ -688,10 +688,11 @@ private[spark] class TaskSchedulerImpl(
       val errorMsg =
         s"Fail resource offers for barrier stage ${taskSet.stageId} because only " +
         s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks}" +
-        s" tasks got resource offers. This happens because barrier execution currently " +
-        s"does not work gracefully with delay scheduling. We highly recommend you to " +
-        s"disable delay scheduling by setting spark.locality.wait=0 as a workaround if " +
-        s"you see this error frequently."
+        s" tasks got resource offers. This could happen if delay scheduling or " +
+        s"blacklisting is enabled, as barrier execution currently does not work " +
+        s"gracefully with them. We highly recommend you to disable delay scheduling " +
+        s"by setting spark.locality.wait=0 or disable blacklisting by setting " +
Contributor:

I'm not sure we should recommend disabling blacklisting. If something was blacklisted, the assumption is that something was wrong with it, so the job would probably fail anyway. Maybe we should just say that it may have been blacklisted, as before.
@Ngone51 (Member, Author), May 8, 2020:

Thanks for the review.

I realized that blacklisting actually does not work for a barrier task set. As you may know, blacklisting only takes effect when there is a failed task. But a barrier task set is marked as zombie as soon as any of its tasks fails, and we don't consider blacklisting for a zombie task set; see:

    if (tasks(index).isBarrier) {
      isZombie = true
    }
    sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, metricPeaks, info)
    if (!isZombie && reason.countTowardsTaskFailures) {
      assert (null != failureReason)
      taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
        info.host, info.executorId, index, failureReason))

So I think blacklisting actually won't cause partial task launching. I will close this PR.

Contributor:

+1

s"spark.blacklist.enabled=false as a workaround if you see this error frequently."
Contributor:

You might also want to check whether blacklisting is actually enabled here.
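For illustration, a minimal sketch of how that check might look. This is an editor's sketch rather than code from the PR, and it assumes TaskSchedulerImpl's blacklistTrackerOpt field, which is defined only when blacklisting is enabled:

    // Editor's sketch (not in the PR): mention the blacklist workaround only
    // when blacklisting is actually on. blacklistTrackerOpt is assumed to be
    // Some(_) exactly when spark.blacklist.enabled=true.
    val blacklistHint =
      if (blacklistTrackerOpt.isDefined) {
        " You may also disable blacklisting by setting spark.blacklist.enabled=false."
      } else {
        ""
      }
    // errorMsg could then append blacklistHint instead of unconditionally
    // recommending spark.blacklist.enabled=false.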
       logWarning(errorMsg)
       taskSet.abort(errorMsg)
       throw new SparkException(errorMsg)
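For reference, a minimal sketch of the user-side workaround the message recommends, assuming a standard SparkSession setup (the app name and master are placeholders):

    import org.apache.spark.sql.SparkSession

    // Editor's sketch of applying the recommended workaround; not part of the PR.
    val spark = SparkSession.builder()
      .appName("barrier-stage-workaround")         // placeholder name
      .master("local[4]")                          // placeholder master
      .config("spark.locality.wait", "0")          // disable delay scheduling
      .config("spark.blacklist.enabled", "false")  // disable blacklisting
      .getOrCreate()

Note that the thread above concludes the blacklist half of this workaround is unnecessary, since a barrier task set becomes a zombie on its first task failure and the blacklist is never updated for it.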