@@ -19,14 +19,13 @@ package org.apache.spark
 
 import java.io._
 import java.net.URI
-import java.util.{Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
-
+import java.util.{Properties, UUID}
+import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
 import scala.collection.generic.Growable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.reflect.{ClassTag, classTag}
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
@@ -37,6 +36,7 @@ import org.apache.mesos.MesosNativeLibrary
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import org.apache.spark.input.WholeTextFileInputFormat
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -129,6 +129,11 @@ class SparkContext(
   val master = conf.get("spark.master")
   val appName = conf.get("spark.app.name")
 
+  // Generate a random name for a temp folder in Tachyon
+  // Append a random UUID as the suffix so the folder name is unique across applications
+  val tachyonFolderName = "spark-" + randomUUID.toString()
+  conf.set("spark.tachyonStore.folderName", tachyonFolderName)
+
   val isLocal = (master == "local" || master.startsWith("local["))
 
   if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
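
A quick way to see the effect of this hunk, sketched here rather than taken from the patch: once a SparkContext is constructed, the generated Tachyon folder name can be read back from its configuration. The `sc` value and the printed suffix are illustrative only.

// Illustrative sketch: assumes an existing SparkContext `sc` built with this patch applied.
val tachyonFolder = sc.getConf.get("spark.tachyonStore.folderName")
println(tachyonFolder)  // e.g. "spark-<random UUID>"; the suffix differs on every run
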
@@ -378,6 +383,39 @@ class SparkContext(
       minSplits).map(pair => pair._2.toString)
   }
 
+  /**
+   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file and the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do `val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
+   *
+   * @note Small files are preferred, as each file will be loaded fully in memory.
+   */
+  def wholeTextFiles(path: String): RDD[(String, String)] = {
+    newAPIHadoopFile(
+      path,
+      classOf[WholeTextFileInputFormat],
+      classOf[String],
+      classOf[String])
+  }
+
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
    * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
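
For context, a minimal usage sketch of the new API, not part of the patch: it assumes an existing SparkContext `sc` and a hypothetical directory of small text files; `wholeTextFiles` and `mapValues` are the APIs shown above and in PairRDDFunctions, while the path and the line-count idea are made up for illustration.

// Assumes an existing SparkContext `sc`; the path below is hypothetical.
val files = sc.wholeTextFiles("hdfs://a-hdfs-path")

// Each record is (filePath, fileContent); count the lines in every file.
val linesPerFile = files.mapValues(content => content.split("\n").length)
linesPerFile.collect().foreach { case (file, n) => println(s"$file: $n lines") }
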
@@ -704,10 +742,6 @@ class SparkContext(
    */
   def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
 
-  def getStageInfo: Map[Stage, StageInfo] = {
-    dagScheduler.stageToInfos
-  }
-
   /**
    * Return information about blocks stored in all of the slaves
    */
@@ -1262,8 +1296,8 @@ object SparkContext extends Logging {
 
   /** Creates a task scheduler based on a given master URL. Extracted for testing. */
   private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
-    // Regular expression used for local[N] master format
-    val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
+    // Regular expression used for local[N] and local[*] master formats
+    val LOCAL_N_REGEX = """local\[([0-9\*]+)\]""".r
     // Regular expression for local[N, maxRetries], used in tests with failing tasks
     val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
     // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
@@ -1286,8 +1320,11 @@ object SparkContext extends Logging {
         scheduler
 
       case LOCAL_N_REGEX(threads) =>
+        def localCpuCount = Runtime.getRuntime.availableProcessors()
+        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
+        val threadCount = if (threads == "*") localCpuCount else threads.toInt
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-        val backend = new LocalBackend(scheduler, threads.toInt)
+        val backend = new LocalBackend(scheduler, threadCount)
         scheduler.initialize(backend)
         scheduler
 
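
A short sketch of the user-facing effect of this scheduler change, again not part of the patch: setting the master to "local[*]" now runs one worker thread per core reported by Runtime.getRuntime.availableProcessors(), while "local[N]" keeps using exactly N threads. The application name below is made up.

import org.apache.spark.{SparkConf, SparkContext}

// "local[*]" -> one thread per available core; "local[2]" would use exactly two threads.
val conf = new SparkConf().setMaster("local[*]").setAppName("local-star-demo")
val sc = new SparkContext(conf)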