
Commit 72b7768

merge master

2 parents: 9f7fa1b + 60e18ce

File tree: 19 files changed, +239 / -48 lines


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -399,7 +399,7 @@ class SparkContext(
    *   (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are perferred, large file is also allowable, but may cause bad performance.
+   * @note Small files are preferred, as each file will be loaded fully in memory.
    */
   def wholeTextFiles(path: String): RDD[(String, String)] = {
     newAPIHadoopFile(
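For reference, a minimal usage sketch of the method being documented, assuming a local SparkContext; the application name and path below are illustrative:

    import org.apache.spark.SparkContext

    // Illustrative only: read every file under the directory as (path, content) pairs.
    // Each file is held fully in memory, which is why the note favours small files.
    val sc = new SparkContext("local", "wholeTextFilesExample")
    val files = sc.wholeTextFiles("hdfs://a-hdfs-path")
    files.collect().foreach { case (path, content) =>
      println(path + ": " + content.length + " characters")
    }
    sc.stop()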

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -177,7 +177,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    *   (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are perferred, large file is also allowable, but may cause bad performance.
+   * @note Small files are preferred, as each file will be loaded fully in memory.
    */
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 4 additions & 2 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.api.python
 
 import java.io._
 import java.net._
+import java.nio.charset.Charset
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
 import scala.collection.JavaConversions._
@@ -206,6 +207,7 @@ private object SpecialLengths {
 }
 
 private[spark] object PythonRDD {
+  val UTF8 = Charset.forName("UTF-8")
 
   def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
   JavaRDD[Array[Byte]] = {
@@ -266,7 +268,7 @@ private[spark] object PythonRDD {
   }
 
   def writeUTF(str: String, dataOut: DataOutputStream) {
-    val bytes = str.getBytes("UTF-8")
+    val bytes = str.getBytes(UTF8)
     dataOut.writeInt(bytes.length)
     dataOut.write(bytes)
   }
@@ -286,7 +288,7 @@ private[spark] object PythonRDD {
 
 private
 class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
-  override def call(arr: Array[Byte]) : String = new String(arr, "UTF-8")
+  override def call(arr: Array[Byte]) : String = new String(arr, PythonRDD.UTF8)
 }
 
 /**
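The charset change above resolves the UTF-8 Charset once and reuses it instead of looking it up by name on every getBytes call. A standalone sketch of the same pattern, with object and method names that are illustrative rather than Spark's own:

    import java.io.DataOutputStream
    import java.nio.charset.Charset

    object Utf8Framing {
      // Resolve the charset a single time rather than per call via getBytes("UTF-8").
      val UTF8: Charset = Charset.forName("UTF-8")

      // Write a length-prefixed UTF-8 string, mirroring the framing in the diff.
      def writeUTF(str: String, dataOut: DataOutputStream): Unit = {
        val bytes = str.getBytes(UTF8)
        dataOut.writeInt(bytes.length)
        dataOut.write(bytes)
      }

      // Decode bytes back to a String with the same shared charset.
      def readString(bytes: Array[Byte]): String = new String(bytes, UTF8)
    }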

core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala

Lines changed: 61 additions & 3 deletions
@@ -17,6 +17,9 @@
 
 package org.apache.spark.rdd
 
+import java.io.File
+import java.io.FilenameFilter
+import java.io.IOException
 import java.io.PrintWriter
 import java.util.StringTokenizer
 
@@ -27,6 +30,7 @@ import scala.io.Source
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.util.Utils
 
 
 /**
@@ -38,7 +42,8 @@ class PipedRDD[T: ClassTag](
     command: Seq[String],
     envVars: Map[String, String],
     printPipeContext: (String => Unit) => Unit,
-    printRDDElement: (T, String => Unit) => Unit)
+    printRDDElement: (T, String => Unit) => Unit,
+    separateWorkingDir: Boolean)
   extends RDD[String](prev) {
 
   // Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -48,12 +53,24 @@ class PipedRDD[T: ClassTag](
       command: String,
      envVars: Map[String, String] = Map(),
      printPipeContext: (String => Unit) => Unit = null,
-     printRDDElement: (T, String => Unit) => Unit = null) =
-    this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement)
+     printRDDElement: (T, String => Unit) => Unit = null,
+     separateWorkingDir: Boolean = false) =
+    this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
+      separateWorkingDir)
 
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
+  /**
+   * A FilenameFilter that accepts anything that isn't equal to the name passed in.
+   * @param name of file or directory to leave out
+   */
+  class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter {
+    def accept(dir: File, name: String): Boolean = {
+      !name.equals(filterName)
+    }
+  }
+
   override def compute(split: Partition, context: TaskContext): Iterator[String] = {
     val pb = new ProcessBuilder(command)
     // Add the environmental variables to the process.
@@ -67,6 +84,38 @@ class PipedRDD[T: ClassTag](
       currentEnvVars.putAll(hadoopSplit.getPipeEnvVars())
     }
 
+    // When spark.worker.separated.working.directory option is turned on, each
+    // task will be run in separate directory. This should be resolve file
+    // access conflict issue
+    val taskDirectory = "./tasks/" + java.util.UUID.randomUUID.toString
+    var workInTaskDirectory = false
+    logDebug("taskDirectory = " + taskDirectory)
+    if (separateWorkingDir == true) {
+      val currentDir = new File(".")
+      logDebug("currentDir = " + currentDir.getAbsolutePath())
+      val taskDirFile = new File(taskDirectory)
+      taskDirFile.mkdirs()
+
+      try {
+        val tasksDirFilter = new NotEqualsFileNameFilter("tasks")
+
+        // Need to add symlinks to jars, files, and directories. On Yarn we could have
+        // directories and other files not known to the SparkContext that were added via the
+        // Hadoop distributed cache. We also don't want to symlink to the /tasks directories we
+        // are creating here.
+        for (file <- currentDir.list(tasksDirFilter)) {
+          val fileWithDir = new File(currentDir, file)
+          Utils.symlink(new File(fileWithDir.getAbsolutePath()),
+            new File(taskDirectory + "/" + fileWithDir.getName()))
+        }
+        pb.directory(taskDirFile)
+        workInTaskDirectory = true
+      } catch {
+        case e: Exception => logError("Unable to setup task working directory: " + e.getMessage +
+          " (" + taskDirectory + ")")
+      }
+    }
+
     val proc = pb.start()
     val env = SparkEnv.get
 
@@ -112,6 +161,15 @@ class PipedRDD[T: ClassTag](
         if (exitStatus != 0) {
           throw new Exception("Subprocess exited with status " + exitStatus)
         }
+
+        // cleanup task working directory if used
+        if (workInTaskDirectory == true) {
+          scala.util.control.Exception.ignoring(classOf[IOException]) {
+            Utils.deleteRecursively(new File(taskDirectory))
+          }
+          logDebug("Removed task working directory " + taskDirectory)
+        }
+
         false
       }
     }
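To illustrate the isolation step added above, here is a self-contained sketch that creates a per-task directory and lists what would be symlinked into it while skipping the tasks/ tree itself. The filter mirrors the one in the diff; printing the candidates is illustrative, since the real code calls Utils.symlink instead:

    import java.io.{File, FilenameFilter}
    import java.util.UUID

    // Accepts every directory entry except the one whose name matches filterName.
    class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter {
      def accept(dir: File, name: String): Boolean = !name.equals(filterName)
    }

    object TaskDirSketch {
      def main(args: Array[String]): Unit = {
        // One fresh working directory per task, as in the diff.
        val taskDir = new File("./tasks/" + UUID.randomUUID.toString)
        taskDir.mkdirs()
        // Everything in the current directory except tasks/ would get a symlink.
        for (entry <- new File(".").list(new NotEqualsFileNameFilter("tasks"))) {
          println("would symlink: " + entry + " -> " + new File(taskDir, entry).getPath)
        }
      }
    }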

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 5 additions & 2 deletions
@@ -481,16 +481,19 @@ abstract class RDD[T: ClassTag](
    * instead of constructing a huge String to concat all the elements:
    * def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
    *   for (e <- record._2){f(e)}
+   * @param separateWorkingDir Use separate working directories for each task.
    * @return the result RDD
    */
   def pipe(
       command: Seq[String],
       env: Map[String, String] = Map(),
       printPipeContext: (String => Unit) => Unit = null,
-      printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = {
+      printRDDElement: (T, String => Unit) => Unit = null,
+      separateWorkingDir: Boolean = false): RDD[String] = {
     new PipedRDD(this, command, env,
       if (printPipeContext ne null) sc.clean(printPipeContext) else null,
-      if (printRDDElement ne null) sc.clean(printRDDElement) else null)
+      if (printRDDElement ne null) sc.clean(printRDDElement) else null,
+      separateWorkingDir)
   }
 
   /**
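A hedged usage sketch of the extended signature, assuming an existing SparkContext named sc; the command is illustrative, and the new test further down exercises the same path:

    // Each task runs the external command from its own ./tasks/<uuid> directory,
    // pre-populated with symlinks to the files in the original working directory.
    val nums = sc.parallelize(1 to 4, 2)
    val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
    piped.collect().foreach(println)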

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 44 additions & 1 deletion
@@ -26,6 +26,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.SortedSet
 import scala.io.Source
 import scala.reflect.ClassTag
 
@@ -45,6 +46,8 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
  */
 private[spark] object Utils extends Logging {
 
+  val osName = System.getProperty("os.name")
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -556,9 +559,10 @@
 
   /**
    * Delete a file or directory and its contents recursively.
+   * Don't follow directories if they are symlinks.
    */
   def deleteRecursively(file: File) {
-    if (file.isDirectory) {
+    if ((file.isDirectory) && !isSymlink(file)) {
       for (child <- listFilesSafely(file)) {
         deleteRecursively(child)
       }
@@ -580,6 +584,25 @@
     }
   }
 
+  /**
+   * Check to see if file is a symbolic link.
+   */
+  def isSymlink(file: File): Boolean = {
+    if (file == null) throw new NullPointerException("File must not be null")
+    if (osName.startsWith("Windows")) return false
+    val fileInCanonicalDir = if (file.getParent() == null) {
+      file
+    } else {
+      new File(file.getParentFile().getCanonicalFile(), file.getName())
+    }
+
+    if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) {
+      return false;
+    } else {
+      return true;
+    }
+  }
+
   /**
    * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
    */
@@ -942,6 +965,26 @@
     count
   }
 
+  /**
+   * Creates a symlink. Note jdk1.7 has Files.createSymbolicLink but not used here
+   * for jdk1.6 support. Supports windows by doing copy, everything else uses "ln -sf".
+   * @param src absolute path to the source
+   * @param dst relative path for the destination
+   */
+  def symlink(src: File, dst: File) {
+    if (!src.isAbsolute()) {
+      throw new IOException("Source must be absolute")
+    }
+    if (dst.isAbsolute()) {
+      throw new IOException("Destination must be relative")
+    }
+    val linkCmd = if (osName.startsWith("Windows")) "copy" else "ln -sf"
+    import scala.sys.process._
+    (linkCmd + " " + src.getAbsolutePath() + " " + dst.getPath()) lines_! ProcessLogger(line =>
+      (logInfo(line)))
+  }
+
+
   /** Return the class name of the given object, removing all dollar signs */
   def getFormattedClassName(obj: AnyRef) = {
     obj.getClass.getSimpleName.replace("$", "")
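A small sketch of how the new helpers fit together. Utils is private[spark], so this only compiles from inside that package, and the paths are illustrative:

    import java.io.File
    import org.apache.spark.util.Utils

    // symlink expects an absolute source and a relative destination.
    val source = new File("/tmp/shared-input.txt")
    val link = new File("tasks/shared-input.txt")
    Utils.symlink(source, link)

    // deleteRecursively no longer descends into symlinked directories, so removing
    // the task directory cannot wipe out the link targets.
    Utils.deleteRecursively(new File("tasks"))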

core/src/test/scala/org/apache/spark/PipedRDDSuite.scala

Lines changed: 27 additions & 1 deletion
@@ -17,8 +17,11 @@
 
 package org.apache.spark
 
-import org.scalatest.FunSuite
+import java.io.File
+
+import com.google.common.io.Files
 
+import org.scalatest.FunSuite
 
 import org.apache.spark.rdd.{HadoopRDD, PipedRDD, HadoopPartition}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat, FileSplit}
@@ -126,6 +129,29 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  test("basic pipe with separate working directory") {
+    if (testCommandAvailable("cat")) {
+      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+      val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
+      val c = piped.collect()
+      assert(c.size === 4)
+      assert(c(0) === "1")
+      assert(c(1) === "2")
+      assert(c(2) === "3")
+      assert(c(3) === "4")
+      val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
+      val collectPwd = pipedPwd.collect()
+      assert(collectPwd(0).contains("tasks/"))
+      val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect()
+      // make sure symlinks were created
+      assert(pipedLs.length > 0)
+      // clean up top level tasks directory
+      new File("tasks").delete()
+    } else {
+      assert(true)
+    }
+  }
+
   test("test pipe exports map_input_file") {
     testExportInputFile("map_input_file")
   }

python/pyspark/context.py

Lines changed: 42 additions & 2 deletions
@@ -28,7 +28,8 @@
 from pyspark.conf import SparkConf
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
-from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
+    PairDeserializer
 from pyspark.storagelevel import StorageLevel
 from pyspark import rdd
 from pyspark.rdd import RDD
@@ -257,6 +258,45 @@ def textFile(self, name, minSplits=None):
         return RDD(self._jsc.textFile(name, minSplits), self,
                    UTF8Deserializer())
 
+    def wholeTextFiles(self, path):
+        """
+        Read a directory of text files from HDFS, a local file system
+        (available on all nodes), or any Hadoop-supported file system
+        URI. Each file is read as a single record and returned in a
+        key-value pair, where the key is the path of each file, the
+        value is the content of each file.
+
+        For example, if you have the following files::
+
+          hdfs://a-hdfs-path/part-00000
+          hdfs://a-hdfs-path/part-00001
+          ...
+          hdfs://a-hdfs-path/part-nnnnn
+
+        Do C{rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")},
+        then C{rdd} contains::
+
+          (a-hdfs-path/part-00000, its content)
+          (a-hdfs-path/part-00001, its content)
+          ...
+          (a-hdfs-path/part-nnnnn, its content)
+
+        NOTE: Small files are preferred, as each file will be loaded
+        fully in memory.
+
+        >>> dirPath = os.path.join(tempdir, "files")
+        >>> os.mkdir(dirPath)
+        >>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
+        ...    file1.write("1")
+        >>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
+        ...    file2.write("2")
+        >>> textFiles = sc.wholeTextFiles(dirPath)
+        >>> sorted(textFiles.collect())
+        [(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]
+        """
+        return RDD(self._jsc.wholeTextFiles(path), self,
+                   PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
+
     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)
         return RDD(jrdd, self, input_deserializer)
@@ -428,7 +468,7 @@ def _test():
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     globs['tempdir'] = tempfile.mkdtemp()
     atexit.register(lambda: shutil.rmtree(globs['tempdir']))
-    (failure_count, test_count) = doctest.testmod(globs=globs)
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)

python/pyspark/serializers.py

Lines changed: 1 addition & 1 deletion
@@ -290,7 +290,7 @@ class MarshalSerializer(FramedSerializer):
 
 class UTF8Deserializer(Serializer):
     """
-    Deserializes streams written by getBytes.
+    Deserializes streams written by String.getBytes.
    """
 
     def loads(self, stream):

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 1 addition & 1 deletion
@@ -219,7 +219,7 @@ class SqlParser extends StandardTokenParsers {
 
   protected lazy val relationFactor: Parser[LogicalPlan] =
     ident ~ (opt(AS) ~> opt(ident)) ^^ {
-      case ident ~ alias => UnresolvedRelation(alias, ident)
+      case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
     } |
     "(" ~> query ~ ")" ~ opt(AS) ~ ident ^^ { case s ~ _ ~ _ ~ a => Subquery(a, s) }
 
