Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,15 @@ class SparkContext(config: SparkConf) extends Logging {
}
}

/**
* Get the max number of tasks that can be concurrently launched at the time of the call.
Copy link
Member

@kiszk kiszk Aug 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about like this?

 * Get the max number of tasks that can be concurrently launched when the method is called.
 * Note that please don't cache the value returned by this method, because the number can be
 * changed by adding/removing executors. 

* Note: do not cache the value returned by this method, because the number can change as
* executors are added or removed.
*
* @return The max number of tasks that can be concurrently launched at the time of the call.
*/
private[spark] def maxNumConcurrentTasks(): Int = {
  // Ask the scheduler backend on every call; nothing is cached at this level.
  schedulerBackend.maxNumConcurrentTasks()
}

/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
Expand Down
27 changes: 27 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -577,4 +577,31 @@ package object config {
.timeConf(TimeUnit.SECONDS)
.checkValue(v => v > 0, "The value should be a positive time value.")
.createWithDefaultString("365d")

private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
.doc("Time in seconds to wait between a max concurrent tasks check failure and the next " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a max -> max?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"a ... failure"

"check. A max concurrent tasks check ensures the cluster can launch more concurrent " +
"tasks than required by a barrier stage on job submitted. The check can fail in case " +
"a cluster has just started and not enough executors have registered, so we wait for a " +
"little while and try to perform the check again. If the check fails more than a " +
"configured max failure times for a job then fail current job submission. Note this " +
"config only applies to jobs that contain one or more barrier stages, we won't perform " +
"the check on non-barrier jobs.")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("15s")

// Integer config "spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures": the validator
// below rejects non-positive values, and the default is 40.
private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES =
  ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures")
    .doc(
      "Number of max concurrent tasks check failures allowed before fail a job submission. " +
      "A max concurrent tasks check ensures the cluster can launch more concurrent tasks " +
      "than required by a barrier stage on job submitted. " +
      "The check can fail in case a cluster has just started and not enough executors have " +
      "registered, so we wait for a little while and try to perform the check again. " +
      "If the check fails more than a configured max failure times for a job then fail " +
      "current job submission. " +
      "Note this config only applies to jobs that contain one or more barrier stages, " +
      "we won't perform the check on non-barrier jobs.")
    .intConf
    .checkValue(_ > 0, "The max failures should be a positive value.")
    .createWithDefault(40)
}
72 changes: 70 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ package org.apache.spark.scheduler

import java.io.NotSerializableException
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.BiFunction

import scala.annotation.tailrec
import scala.collection.Map
Expand All @@ -39,7 +40,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.{PartitionPruningRDD, RDD, RDDCheckpointData}
import org.apache.spark.rdd.{RDD, RDDCheckpointData}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
Expand Down Expand Up @@ -203,6 +204,24 @@ class DAGScheduler(
sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)

/**
* Number of max concurrent tasks check failures for each job.
*/
private[scheduler] val jobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do entries in this map get cleaned?


/**
* Time in seconds to wait between a max concurrent tasks check failure and the next check.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a max -> max?

*/
private val timeIntervalNumTasksCheck = sc.getConf
.get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL)

/**
* Max number of max concurrent tasks check failures allowed for a job before failing the job
* submission.
*/
private val maxFailureNumTasksCheck = sc.getConf
.get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES)

private val messageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")

Expand Down Expand Up @@ -365,6 +384,7 @@ class DAGScheduler(
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
val numTasks = rdd.partitions.length
val parents = getOrCreateParentStages(rdd, jobId)
Expand Down Expand Up @@ -402,6 +422,19 @@ class DAGScheduler(
}
}

/**
* Check whether the barrier stage requires more slots (to be able to launch all tasks in the
* barrier stage together) than the total number of active slots currently. Fail current check
* if trying to submit a barrier stage that requires more slots than current total number. If
* the check fails more than the configured max number of times for a job, then fail current job
* submission.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems I do not find the code about "consecutively for three times", but only maxFailureNumTasksCheck ?

*/
private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
if (rdd.isBarrier() && rdd.getNumPartitions > sc.maxNumConcurrentTasks) {
throw new SparkException(
Copy link
Contributor

@mengxr mengxr Aug 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should tolerate temporary unavailability here by adding a wait or retry logic.

DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
}
}

/**
* Create a ResultStage associated with the provided jobId.
*/
Expand All @@ -412,6 +445,7 @@ class DAGScheduler(
jobId: Int,
callSite: CallSite): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
Expand Down Expand Up @@ -929,11 +963,38 @@ class DAGScheduler(
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception if e.getMessage.contains(
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) =>
logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
"than the total number of slots in the cluster currently.")
jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, Int, Int] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: Should have an inline comment that mentions the implicit conversion from null to 0: Int to handle new keys.

override def apply(key: Int, value: Int): Int = value + 1
})
val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: this is the return value from compute. we don't need get.

if (numCheckFailures <= maxFailureNumTasksCheck) {
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
partitions, callSite, listener, properties))
},
timeIntervalNumTasksCheck * 1000,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: how about removing 1000 and changing the time unit to SECONDS?

TimeUnit.MILLISECONDS
)
return
} else {
// Job failed, clear internal data.
jobIdToNumTasksCheckFailures.remove(jobId)
listener.jobFailed(e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you expect the same job submitted again? if not, we should remove the key from the hashmap.

return
}

case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
// Job submitted, clear internal data.
jobIdToNumTasksCheckFailures.remove(jobId)

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
Expand Down Expand Up @@ -2026,4 +2087,11 @@ private[spark] object DAGScheduler {
"[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " +
"now. You can disable dynamic resource allocation by setting Spark conf " +
"\"spark.dynamicAllocation.enabled\" to \"false\"."

// Message carried by the exception raised when a barrier stage needs more slots than the
// current total number of slots.
val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
  "[SPARK-24819]: Barrier execution mode does not allow run a barrier stage that " +
    "requires more slots than the total number of slots in the cluster currently. " +
    "Please init a new cluster with more CPU cores or repartition the input RDD(s) " +
    "to reduce the number of slots required to run this barrier stage."
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,13 @@ private[spark] trait SchedulerBackend {
*/
def getDriverLogUrls: Option[Map[String, String]] = None

/**
 * Get the max number of tasks that can be concurrently launched at the time of the call.
 * Note: do not cache the value returned by this method, because the number can change as
 * executors are added or removed.
 *
 * @return The max number of tasks that can be concurrently launched at the time of the call.
 */
def maxNumConcurrentTasks(): Int

}
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorDataMap.keySet.toSeq
}

/**
 * Adds up, over every value in `executorDataMap`, that executor's `totalCores` divided
 * (integer division) by `scheduler.CPUS_PER_TASK`.
 */
override def maxNumConcurrentTasks(): Int = {
  executorDataMap.values.foldLeft(0) { (slots, executor) =>
    slots + executor.totalCores / scheduler.CPUS_PER_TASK
  }
}

/**
* Request an additional number of executors from the cluster manager.
* @return whether the request is acknowledged.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ private[spark] class LocalSchedulerBackend(

override def applicationId(): String = appId

/** Returns `totalCores` divided (integer division) by `scheduler.CPUS_PER_TASK`. */
override def maxNumConcurrentTasks(): Int = {
  totalCores / scheduler.CPUS_PER_TASK
}

private def stop(finalState: SparkAppHandle.State): Unit = {
localEndpoint.ask(StopExecutor)
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,78 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
testSubmitJob(sc, rdd,
message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a test that verifies that if the total slots are sufficient but some are running other jobs, we don't fail the barrier job.

/**
 * Builds the SparkConf shared by the "requires more slots than current total" tests below.
 *
 * @param master master URL passed to `setMaster`
 * @param cpusPerTask when defined, the value stored under "spark.task.cpus"
 * @return a conf with app name "test" and shortened max-concurrent-tasks-check settings
 */
private def confForNumSlotsCheck(master: String, cpusPerTask: Option[Int] = None): SparkConf = {
  val base = new SparkConf()
    // Shorten the time interval between two failed checks to make the test fail faster.
    .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
    // Reduce max check failures allowed to make the test fail faster.
    .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
    .setMaster(master)
    .setAppName("test")
  // Only the local-cluster tests override the number of CPUs per task.
  cpusPerTask.fold(base)(cpus => base.set("spark.task.cpus", cpus.toString))
}

test("submit a barrier ResultStage that requires more slots than current total under local " +
  "mode") {
  sc = createSparkContext(Some(confForNumSlotsCheck("local[4]")))
  // Barrier RDD with 5 partitions; the job submission is expected to fail the slots check.
  val rdd = sc.parallelize(1 to 10, 5)
    .barrier()
    .mapPartitions(iter => iter)
  testSubmitJob(sc, rdd,
    message = DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
}

test("submit a barrier ShuffleMapStage that requires more slots than current total under " +
  "local mode") {
  sc = createSparkContext(Some(confForNumSlotsCheck("local[4]")))
  // The barrier stage (5 partitions) sits before the repartition, i.e. in a shuffle map stage.
  val rdd = sc.parallelize(1 to 10, 5)
    .barrier()
    .mapPartitions(iter => iter)
    .repartition(2)
    .map(x => x + 1)
  testSubmitJob(sc, rdd,
    message = DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
}

test("submit a barrier ResultStage that requires more slots than current total under " +
  "local-cluster mode") {
  sc = createSparkContext(
    Some(confForNumSlotsCheck("local-cluster[4, 3, 1024]", cpusPerTask = Some(2))))
  // Barrier RDD with 5 partitions; the job submission is expected to fail the slots check.
  val rdd = sc.parallelize(1 to 10, 5)
    .barrier()
    .mapPartitions(iter => iter)
  testSubmitJob(sc, rdd,
    message = DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
}

test("submit a barrier ShuffleMapStage that requires more slots than current total under " +
  "local-cluster mode") {
  sc = createSparkContext(
    Some(confForNumSlotsCheck("local-cluster[4, 3, 1024]", cpusPerTask = Some(2))))
  // The barrier stage (5 partitions) sits before the repartition, i.e. in a shuffle map stage.
  val rdd = sc.parallelize(1 to 10, 5)
    .barrier()
    .mapPartitions(iter => iter)
    .repartition(2)
    .map(x => x + 1)
  testSubmitJob(sc, rdd,
    message = DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,8 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend

override def defaultParallelism(): Int = sb.defaultParallelism()

override def maxNumConcurrentTasks(): Int = {
  // Forward to the wrapped backend `sb`.
  sb.maxNumConcurrentTasks()
}

override def killExecutorsOnHost(host: String): Boolean = {
false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
.setMaster("local-cluster[3, 1, 1024]")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a unit test for getNumSlots.

.setAppName("test-cluster")
sc = new SparkContext(conf)

val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2)
val rdd2 = rdd.barrier().mapPartitions { it =>
val context = BarrierTaskContext.get()
Expand Down
Loading