Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,15 @@ class SparkContext(config: SparkConf) extends Logging {
}
}

/**
* Get the max number of tasks that can be concurrent launched currently.
Copy link
Member

@kiszk kiszk Aug 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about like this?

 * Get the max number of tasks that can be concurrently launched when the method is called.
 * Note that please don't cache the value returned by this method, because the number can be
 * changed by adding/removing executors. 

* Note that please don't cache the value returned by this method, because the number can change
* due to add/remove executors.
*
* @return The max number of tasks that can be concurrent launched currently.
*/
private[spark] def maxNumConcurrentTasks(): Int = schedulerBackend.maxNumConcurrentTasks()

/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
Expand Down
27 changes: 27 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -577,4 +577,31 @@ package object config {
.timeConf(TimeUnit.SECONDS)
.checkValue(v => v > 0, "The value should be a positive time value.")
.createWithDefaultString("365d")

private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
.doc("Time in seconds to wait between a max concurrent tasks check failure and the next " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a max -> max?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"a ... failure"

"check. A max concurrent tasks check ensures the cluster can launch more concurrent " +
"tasks than required by a barrier stage on job submitted. The check can fail in case " +
"a cluster has just started and not enough executors have registered, so we wait for a " +
"little while and try to perform the check again. If the check fails more than a " +
"configured max failure times for a job then fail current job submission. Note this " +
"config only applies to jobs that contain one or more barrier stages, we won't perform " +
"the check on non-barrier jobs.")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("15s")

private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES =
ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures")
.doc("Number of max concurrent tasks check failures allowed before fail a job submission. " +
"A max concurrent tasks check ensures the cluster can launch more concurrent tasks than " +
"required by a barrier stage on job submitted. The check can fail in case a cluster " +
"has just started and not enough executors have registered, so we wait for a little " +
"while and try to perform the check again. If the check fails more than a configured " +
"max failure times for a job then fail current job submission. Note this config only " +
"applies to jobs that contain one or more barrier stages, we won't perform the check on " +
"non-barrier jobs.")
.intConf
.checkValue(v => v > 0, "The max failures should be a positive value.")
.createWithDefault(40)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.SparkException

/**
* Exception thrown when submit a job with barrier stage(s) failing a required check.
*/
private[spark] class BarrierJobAllocationFailed(message: String) extends SparkException(message)

private[spark] class BarrierJobUnsupportedRDDChainException
extends BarrierJobAllocationFailed(
BarrierJobAllocationFailed.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)

private[spark] class BarrierJobRunWithDynamicAllocationException
extends BarrierJobAllocationFailed(
BarrierJobAllocationFailed.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)

private[spark] class BarrierJobSlotsNumberCheckFailed
extends BarrierJobAllocationFailed(
BarrierJobAllocationFailed.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)

private[spark] object BarrierJobAllocationFailed {

// Error message when running a barrier stage that have unsupported RDD chain pattern.
val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN =
"[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of " +
"RDD chain within a barrier stage:\n1. Ancestor RDDs that have different number of " +
"partitions from the resulting RDD (eg. union()/coalesce()/first()/take()/" +
"PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " +
"(scala) or barrierRdd.collect()[0] (python).\n" +
"2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))."

// Error message when running a barrier stage with dynamic resource allocation enabled.
val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
"[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " +
"now. You can disable dynamic resource allocation by setting Spark conf " +
"\"spark.dynamicAllocation.enabled\" to \"false\"."

// Error message when running a barrier stage that requires more slots than current total number.
val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
"[SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires " +
"more slots than the total number of slots in the cluster currently. Please init a new " +
"cluster with more CPU cores or repartition the input RDD(s) to reduce the number of " +
"slots required to run this barrier stage."
}
88 changes: 66 additions & 22 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ package org.apache.spark.scheduler

import java.io.NotSerializableException
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.BiFunction

import scala.annotation.tailrec
import scala.collection.Map
Expand All @@ -39,7 +40,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.{PartitionPruningRDD, RDD, RDDCheckpointData}
import org.apache.spark.rdd.{RDD, RDDCheckpointData}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
Expand Down Expand Up @@ -111,8 +112,7 @@ import org.apache.spark.util._
* - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
* include the new structure. This will help to catch memory leaks.
*/
private[spark]
class DAGScheduler(
private[spark] class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
Expand Down Expand Up @@ -203,6 +203,24 @@ class DAGScheduler(
sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)

/**
* Number of max concurrent tasks check failures for each barrier job.
*/
private[scheduler] val barrierJobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]

/**
* Time in seconds to wait between a max concurrent tasks check failure and the next check.
*/
private val timeIntervalNumTasksCheck = sc.getConf
.get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL)

/**
* Max number of max concurrent tasks check failures allowed for a job before fail the job
* submission.
*/
private val maxFailureNumTasksCheck = sc.getConf
.get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES)

private val messageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")

Expand Down Expand Up @@ -351,8 +369,7 @@ class DAGScheduler(
val predicate: RDD[_] => Boolean = (r =>
r.getNumPartitions == numTasksInStage && r.dependencies.filter(_.rdd.isBarrier()).size <= 1)
if (rdd.isBarrier() && !traverseParentRDDsWithinStage(rdd, predicate)) {
throw new SparkException(
DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
throw new BarrierJobUnsupportedRDDChainException
}
}

Expand All @@ -365,6 +382,7 @@ class DAGScheduler(
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
val numTasks = rdd.partitions.length
val parents = getOrCreateParentStages(rdd, jobId)
Expand Down Expand Up @@ -398,7 +416,20 @@ class DAGScheduler(
*/
private def checkBarrierStageWithDynamicAllocation(rdd: RDD[_]): Unit = {
if (rdd.isBarrier() && Utils.isDynamicAllocationEnabled(sc.getConf)) {
throw new SparkException(DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
throw new BarrierJobRunWithDynamicAllocationException
}
}

/**
* Check whether the barrier stage requires more slots (to be able to launch all tasks in the
* barrier stage together) than the total number of active slots currently. Fail current check
* if trying to submit a barrier stage that requires more slots than current total number. If
* the check fails consecutively beyond a configured number for a job, then fail current job
* submission.
*/
private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
if (rdd.isBarrier() && rdd.getNumPartitions > sc.maxNumConcurrentTasks) {
throw new BarrierJobSlotsNumberCheckFailed
}
}

Expand All @@ -412,6 +443,7 @@ class DAGScheduler(
jobId: Int,
callSite: CallSite): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
Expand Down Expand Up @@ -929,11 +961,38 @@ class DAGScheduler(
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: BarrierJobSlotsNumberCheckFailed =>
logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
"than the total number of slots in the cluster currently.")
// If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
new BiFunction[Int, Int, Int] {
override def apply(key: Int, value: Int): Int = value + 1
})
if (numCheckFailures <= maxFailureNumTasksCheck) {
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
partitions, callSite, listener, properties))
},
timeIntervalNumTasksCheck,
TimeUnit.SECONDS
)
return
} else {
// Job failed, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
listener.jobFailed(e)
return
}

case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
// Job submitted, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
Expand Down Expand Up @@ -2011,19 +2070,4 @@ private[spark] object DAGScheduler {

// Number of consecutive stage attempts allowed before a stage is aborted
val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4

// Error message when running a barrier stage that have unsupported RDD chain pattern.
val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN =
"[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of " +
"RDD chain within a barrier stage:\n1. Ancestor RDDs that have different number of " +
"partitions from the resulting RDD (eg. union()/coalesce()/first()/take()/" +
"PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " +
"(scala) or barrierRdd.collect()[0] (python).\n" +
"2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))."

// Error message when running a barrier stage with dynamic resource allocation enabled.
val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
"[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " +
"now. You can disable dynamic resource allocation by setting Spark conf " +
"\"spark.dynamicAllocation.enabled\" to \"false\"."
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,13 @@ private[spark] trait SchedulerBackend {
*/
def getDriverLogUrls: Option[Map[String, String]] = None

/**
* Get the max number of tasks that can be concurrent launched currently.
* Note that please don't cache the value returned by this method, because the number can change
* due to add/remove executors.
*
* @return The max number of tasks that can be concurrent launched currently.
*/
def maxNumConcurrentTasks(): Int

}
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorDataMap.keySet.toSeq
}

override def maxNumConcurrentTasks(): Int = {
executorDataMap.values.map { executor =>
executor.totalCores / scheduler.CPUS_PER_TASK
}.sum
}

/**
* Request an additional number of executors from the cluster manager.
* @return whether the request is acknowledged.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ private[spark] class LocalSchedulerBackend(

override def applicationId(): String = appId

override def maxNumConcurrentTasks(): Int = totalCores / scheduler.CPUS_PER_TASK

private def stop(finalState: SparkAppHandle.State): Unit = {
localEndpoint.ask(StopExecutor)
try {
Expand Down
Loading