@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger
2424
2525import scala .collection .Map
2626import scala .collection .generic .Growable
27- import scala .collection .JavaConverters ._
2827import scala .collection .mutable .ArrayBuffer
2928import scala .collection .mutable .HashMap
3029
@@ -81,7 +80,7 @@ class SparkContext(
8180 val sparkHome : String = null ,
8281 val jars : Seq [String ] = Nil ,
8382 val environment : Map [String , String ] = Map (),
84- // This is used only by yarn for now, but should be relevant to other cluster types (mesos , etc)
83+ // This is used only by YARN for now, but should be relevant to other cluster types (Mesos , etc)
8584 // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
8685 // of data-local splits on host
8786 val preferredNodeLocationData : scala.collection.Map [String , scala.collection.Set [SplitInfo ]] =
@@ -153,110 +152,11 @@ class SparkContext(
153152 executorEnvs(" SPARK_USER" ) = sparkUser
154153
155154 // Create and start the scheduler
156- private [spark] var taskScheduler : TaskScheduler = {
157- // Regular expression used for local[N] master format
158- val LOCAL_N_REGEX = """ local\[([0-9]+)\]""" .r
159- // Regular expression for local[N, maxRetries], used in tests with failing tasks
160- val LOCAL_N_FAILURES_REGEX = """ local\[([0-9]+)\s*,\s*([0-9]+)\]""" .r
161- // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
162- val LOCAL_CLUSTER_REGEX = """ local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""" .r
163- // Regular expression for connecting to Spark deploy clusters
164- val SPARK_REGEX = """ spark://(.*)""" .r
165- // Regular expression for connection to Mesos cluster
166- val MESOS_REGEX = """ mesos://(.*)""" .r
167- // Regular expression for connection to Simr cluster
168- val SIMR_REGEX = """ simr://(.*)""" .r
169-
170- // When running locally, don't try to re-execute tasks on failure.
171- val MAX_LOCAL_TASK_FAILURES = 0
172-
173- master match {
174- case " local" =>
175- val scheduler = new ClusterScheduler (this , MAX_LOCAL_TASK_FAILURES , isLocal = true )
176- val backend = new LocalBackend (scheduler, 1 )
177- scheduler.initialize(backend)
178- scheduler
179-
180- case LOCAL_N_REGEX (threads) =>
181- val scheduler = new ClusterScheduler (this , MAX_LOCAL_TASK_FAILURES , isLocal = true )
182- val backend = new LocalBackend (scheduler, threads.toInt)
183- scheduler.initialize(backend)
184- scheduler
185-
186- case LOCAL_N_FAILURES_REGEX (threads, maxFailures) =>
187- val scheduler = new ClusterScheduler (this , maxFailures.toInt, isLocal = true )
188- val backend = new LocalBackend (scheduler, threads.toInt)
189- scheduler.initialize(backend)
190- scheduler
191-
192- case SPARK_REGEX (sparkUrl) =>
193- val scheduler = new ClusterScheduler (this )
194- val masterUrls = sparkUrl.split(" ," ).map(" spark://" + _)
195- val backend = new SparkDeploySchedulerBackend (scheduler, this , masterUrls, appName)
196- scheduler.initialize(backend)
197- scheduler
198-
199- case SIMR_REGEX (simrUrl) =>
200- val scheduler = new ClusterScheduler (this )
201- val backend = new SimrSchedulerBackend (scheduler, this , simrUrl)
202- scheduler.initialize(backend)
203- scheduler
204-
205- case LOCAL_CLUSTER_REGEX (numSlaves, coresPerSlave, memoryPerSlave) =>
206- // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
207- val memoryPerSlaveInt = memoryPerSlave.toInt
208- if (SparkContext .executorMemoryRequested > memoryPerSlaveInt) {
209- throw new SparkException (
210- " Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker" .format(
211- memoryPerSlaveInt, SparkContext .executorMemoryRequested))
212- }
213-
214- val scheduler = new ClusterScheduler (this )
215- val localCluster = new LocalSparkCluster (
216- numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
217- val masterUrls = localCluster.start()
218- val backend = new SparkDeploySchedulerBackend (scheduler, this , masterUrls, appName)
219- scheduler.initialize(backend)
220- backend.shutdownCallback = (backend : SparkDeploySchedulerBackend ) => {
221- localCluster.stop()
222- }
223- scheduler
224-
225- case " yarn-standalone" =>
226- val scheduler = try {
227- val clazz = Class .forName(" org.apache.spark.scheduler.cluster.YarnClusterScheduler" )
228- val cons = clazz.getConstructor(classOf [SparkContext ])
229- cons.newInstance(this ).asInstanceOf [ClusterScheduler ]
230- } catch {
231- // TODO: Enumerate the exact reasons why it can fail
232- // But irrespective of it, it means we cannot proceed !
233- case th : Throwable => {
234- throw new SparkException (" YARN mode not available ?" , th)
235- }
236- }
237- val backend = new CoarseGrainedSchedulerBackend (scheduler, this .env.actorSystem)
238- scheduler.initialize(backend)
239- scheduler
240-
241- case MESOS_REGEX (mesosUrl) =>
242- MesosNativeLibrary .load()
243- val scheduler = new ClusterScheduler (this )
244- val coarseGrained = System .getProperty(" spark.mesos.coarse" , " false" ).toBoolean
245- val backend = if (coarseGrained) {
246- new CoarseMesosSchedulerBackend (scheduler, this , mesosUrl, appName)
247- } else {
248- new MesosSchedulerBackend (scheduler, this , mesosUrl, appName)
249- }
250- scheduler.initialize(backend)
251- scheduler
252-
253- case _ =>
254- throw new SparkException (" Could not parse Master URL: '" + master + " '" )
255- }
256- }
155+ private [spark] var taskScheduler = SparkContext .createTaskScheduler(this , master, appName)
257156 taskScheduler.start()
258157
259158 @ volatile private [spark] var dagScheduler = new DAGScheduler (taskScheduler)
159+ dagScheduler.start()
260160
261161 ui.start()
262162
@@ -1121,6 +1021,136 @@ object SparkContext {
11211021 .map(Utils .memoryStringToMb)
11221022 .getOrElse(512 )
11231023 }
1024+
1025+ // Creates a task scheduler based on a given master URL. Extracted for testing.
1026+ private
1027+ def createTaskScheduler (sc : SparkContext , master : String , appName : String ): TaskScheduler = {
1028+ // Regular expression used for local[N] master format
1029+ val LOCAL_N_REGEX = """ local\[([0-9]+)\]""" .r
1030+ // Regular expression for local[N, maxRetries], used in tests with failing tasks
1031+ val LOCAL_N_FAILURES_REGEX = """ local\[([0-9]+)\s*,\s*([0-9]+)\]""" .r
1032+ // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
1033+ val LOCAL_CLUSTER_REGEX = """ local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""" .r
1034+ // Regular expression for connecting to Spark deploy clusters
1035+ val SPARK_REGEX = """ spark://(.*)""" .r
1036+ // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
1037+ val MESOS_REGEX = """ (mesos|zk)://.*""" .r
1038+ // Regular expression for connection to Simr cluster
1039+ val SIMR_REGEX = """ simr://(.*)""" .r
1040+
1041+ // When running locally, don't try to re-execute tasks on failure.
1042+ val MAX_LOCAL_TASK_FAILURES = 0
1043+
1044+ master match {
1045+ case " local" =>
1046+ val scheduler = new ClusterScheduler (sc, MAX_LOCAL_TASK_FAILURES , isLocal = true )
1047+ val backend = new LocalBackend (scheduler, 1 )
1048+ scheduler.initialize(backend)
1049+ scheduler
1050+
1051+ case LOCAL_N_REGEX (threads) =>
1052+ val scheduler = new ClusterScheduler (sc, MAX_LOCAL_TASK_FAILURES , isLocal = true )
1053+ val backend = new LocalBackend (scheduler, threads.toInt)
1054+ scheduler.initialize(backend)
1055+ scheduler
1056+
1057+ case LOCAL_N_FAILURES_REGEX (threads, maxFailures) =>
1058+ val scheduler = new ClusterScheduler (sc, maxFailures.toInt, isLocal = true )
1059+ val backend = new LocalBackend (scheduler, threads.toInt)
1060+ scheduler.initialize(backend)
1061+ scheduler
1062+
1063+ case SPARK_REGEX (sparkUrl) =>
1064+ val scheduler = new ClusterScheduler (sc)
1065+ val masterUrls = sparkUrl.split(" ," ).map(" spark://" + _)
1066+ val backend = new SparkDeploySchedulerBackend (scheduler, sc, masterUrls, appName)
1067+ scheduler.initialize(backend)
1068+ scheduler
1069+
1070+ case LOCAL_CLUSTER_REGEX (numSlaves, coresPerSlave, memoryPerSlave) =>
1071+ // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
1072+ val memoryPerSlaveInt = memoryPerSlave.toInt
1073+ if (SparkContext .executorMemoryRequested > memoryPerSlaveInt) {
1074+ throw new SparkException (
1075+ " Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker" .format(
1076+ memoryPerSlaveInt, SparkContext .executorMemoryRequested))
1077+ }
1078+
1079+ val scheduler = new ClusterScheduler (sc)
1080+ val localCluster = new LocalSparkCluster (
1081+ numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
1082+ val masterUrls = localCluster.start()
1083+ val backend = new SparkDeploySchedulerBackend (scheduler, sc, masterUrls, appName)
1084+ scheduler.initialize(backend)
1085+ backend.shutdownCallback = (backend : SparkDeploySchedulerBackend ) => {
1086+ localCluster.stop()
1087+ }
1088+ scheduler
1089+
1090+ case " yarn-standalone" =>
1091+ val scheduler = try {
1092+ val clazz = Class .forName(" org.apache.spark.scheduler.cluster.YarnClusterScheduler" )
1093+ val cons = clazz.getConstructor(classOf [SparkContext ])
1094+ cons.newInstance(sc).asInstanceOf [ClusterScheduler ]
1095+ } catch {
1096+ // TODO: Enumerate the exact reasons why it can fail
1097+ // But irrespective of it, it means we cannot proceed !
1098+ case th : Throwable => {
1099+ throw new SparkException (" YARN mode not available ?" , th)
1100+ }
1101+ }
1102+ val backend = new CoarseGrainedSchedulerBackend (scheduler, sc.env.actorSystem)
1103+ scheduler.initialize(backend)
1104+ scheduler
1105+
1106+ case " yarn-client" =>
1107+ val scheduler = try {
1108+ val clazz = Class .forName(" org.apache.spark.scheduler.cluster.YarnClientClusterScheduler" )
1109+ val cons = clazz.getConstructor(classOf [SparkContext ])
1110+ cons.newInstance(sc).asInstanceOf [ClusterScheduler ]
1111+
1112+ } catch {
1113+ case th : Throwable => {
1114+ throw new SparkException (" YARN mode not available ?" , th)
1115+ }
1116+ }
1117+
1118+ val backend = try {
1119+ val clazz = Class .forName(" org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend" )
1120+ val cons = clazz.getConstructor(classOf [ClusterScheduler ], classOf [SparkContext ])
1121+ cons.newInstance(scheduler, sc).asInstanceOf [CoarseGrainedSchedulerBackend ]
1122+ } catch {
1123+ case th : Throwable => {
1124+ throw new SparkException (" YARN mode not available ?" , th)
1125+ }
1126+ }
1127+
1128+ scheduler.initialize(backend)
1129+ scheduler
1130+
1131+ case mesosUrl @ MESOS_REGEX (_) =>
1132+ MesosNativeLibrary .load()
1133+ val scheduler = new ClusterScheduler (sc)
1134+ val coarseGrained = System .getProperty(" spark.mesos.coarse" , " false" ).toBoolean
1135+ val url = mesosUrl.stripPrefix(" mesos://" ) // strip scheme from raw Mesos URLs
1136+ val backend = if (coarseGrained) {
1137+ new CoarseMesosSchedulerBackend (scheduler, sc, url, appName)
1138+ } else {
1139+ new MesosSchedulerBackend (scheduler, sc, url, appName)
1140+ }
1141+ scheduler.initialize(backend)
1142+ scheduler
1143+
1144+ case SIMR_REGEX (simrUrl) =>
1145+ val scheduler = new ClusterScheduler (sc)
1146+ val backend = new SimrSchedulerBackend (scheduler, sc, simrUrl)
1147+ scheduler.initialize(backend)
1148+ scheduler
1149+
1150+ case _ =>
1151+ throw new SparkException (" Could not parse Master URL: '" + master + " '" )
1152+ }
1153+ }
11241154}
11251155
11261156/**
0 commit comments