@@ -43,21 +43,26 @@ import org.apache.spark.rpc.RpcAddress
 import scala.collection.mutable

 object CoarseCookSchedulerBackend {
-  def fetchUri(uri: String): String =
-    Option(URI.create(uri).getScheme).map(_.toLowerCase) match {
-      case Some("http") => s"curl -O $uri"
-      case Some("spark-rsync") =>
-        val regex = "^spark-rsync://".r
-        val cleanURI = regex.replaceFirstIn(uri, "")
-        "RSYNC_CONNECT_PROG=" + "\"" + "knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT" + "\"" +
-          s" rsync $$SPARK_DRIVER_PULL_HOST::spark/${cleanURI} ./"
-      case Some("hdfs") => s"$$HADOOP_COMMAND fs -copyToLocal $uri ."
-      case Some("rover") =>
-        val storage = "/opt/ts/services/storage-client.ts_storage_client/bin/storage"
-        s"$storage -X-Dtwosigma.logdir=$${MESOS_SANDBOX} cp $uri ."
-      case None | Some("file") => s"cp $uri ."
-      case Some(x) => sys.error(s"$x not supported yet")
-    }
+
+  // A collection of regexes for extracting information from an URI
+  private val HTTP_URI_REGEX = """http://(.*)""".r
+  private val RSYNC_URI_REGEX = """rsync://(.*)""".r
+  private val SPARK_RSYNC_URI_REGEX = """spark-rsync://(.*)""".r
+  private val HDFS_URI_REGEX = """hdfs://(.*)""".r
+
+  private[spark] def fetchURI(uri: String): String = uri.toLowerCase match {
+    case HTTP_URI_REGEX(httpURI) =>
+      s"curl -O http://$httpURI"
+    case RSYNC_URI_REGEX(file) =>
+      s"rsync $file ./"
+    case SPARK_RSYNC_URI_REGEX(file) =>
+      "RSYNC_CONNECT_PROG=\"knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT\" " +
+        s"rsync $$SPARK_DRIVER_PULL_HOST::spark/$file ./"
+    case HDFS_URI_REGEX(file) =>
+      s"$$HADOOP_COMMAND fs -copyToLocal hdfs://$file ."
+    case _ =>
+      sys.error(s"$uri not supported yet")
+  }

   def apply(scheduler: TaskSchedulerImpl, sc: SparkContext, cookHost: String,
     cookPort: Int): CoarseGrainedSchedulerBackend = {
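For reference, a minimal sketch (not part of the commit) of the shell commands the new fetchURI is expected to emit; the URIs below are hypothetical examples, and the expected strings follow directly from the cases above:

    import CoarseCookSchedulerBackend.fetchURI

    // Hypothetical example URIs; expected results shown in the comments.
    fetchURI("http://host/spark/app.tgz")
    // => "curl -O http://host/spark/app.tgz"
    fetchURI("hdfs://namenode/user/demo/app.tgz")
    // => "$HADOOP_COMMAND fs -copyToLocal hdfs://namenode/user/demo/app.tgz ."
    fetchURI("rsync://host/module/app.tgz")
    // => "rsync host/module/app.tgz ./"
    // Schemes the old fetchUri handled but that are not matched above
    // (file, rover, scheme-less paths) now fall through to sys.error.

Note that the new implementation lowercases the entire URI before matching, not just the scheme as the old code did.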
@@ -176,7 +181,7 @@ class CoarseCookSchedulerBackend(
   override def applicationAttemptId(): Option[String] = Some(applicationId())

   def createJob(numCores: Double): Job = {
-    import CoarseCookSchedulerBackend.fetchUri
+    import CoarseCookSchedulerBackend.fetchURI

     val jobId = UUID.randomUUID()
     executorUUIDWriter(jobId)
@@ -215,20 +220,20 @@ class CoarseCookSchedulerBackend(

     val keystoreUri = conf.getOption("spark.executor.keyStoreFilename")
     val keystorePull = keystoreUri.map { uri =>
-      s"${fetchUri(uri)} && mv $$(basename $uri) spark-executor-keystore"
+      s"${fetchURI(uri)} && mv $$(basename $uri) spark-executor-keystore"
     }

     val urisCommand =
       uriValues.map { uri =>
-        s"[ ! -e $$(basename $uri) ] && ${fetchUri(uri)} && tar -xvzf $$(basename $uri)" +
+        s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)" +
           " || (echo \"ERROR FETCHING\" && exit 1)"
       }

     val shippedTarballs: Seq[String] = conf.getOption("spark.cook.shippedTarballs")
       .fold(Seq[String]()){ tgz => tgz.split(",").map(_.trim).toList }

     val shippedTarballsCommand = shippedTarballs.map { uri =>
-      s"[ ! -e $$(basename $uri) ] && ${fetchUri(uri)} && tar -xvzf $$(basename $uri)"
+      s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)"
     }

     logDebug(s"command: $commandString")
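For readability, a sketch (not part of the commit) of what a single uriValues entry expands to after interpolation, assuming a hypothetical URI:

    import CoarseCookSchedulerBackend.fetchURI

    // Hypothetical URI; the guard pattern is the same for every entry.
    val uri = "http://host/spark/app.tgz"
    val entry =
      s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)" +
        " || (echo \"ERROR FETCHING\" && exit 1)"
    // entry is the single shell line (wrapped here for readability):
    //   [ ! -e $(basename http://host/spark/app.tgz) ] && curl -O http://host/spark/app.tgz
    //     && tar -xvzf $(basename http://host/spark/app.tgz) || (echo "ERROR FETCHING" && exit 1)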
@@ -244,7 +249,7 @@ class CoarseCookSchedulerBackend(
     val remoteConfFetch = if (remoteHdfsConf.nonEmpty) {
       val name = Paths.get(remoteHdfsConf).getFileName
       Seq(
-        fetchUri(remoteHdfsConf),
+        fetchURI(remoteHdfsConf),
         "mkdir HADOOP_CONF_DIR",
         s"tar --strip-components=1 -xvzf $name -C HADOOP_CONF_DIR",
         // This must be absolute because we cd into the spark directory