
Commit 198892f

tgravescs authored and mateiz committed
[SPARK-1198] Allow pipes tasks to run in different sub-directories
This works as is on Linux/Mac/etc but doesn't cover working on Windows. In here I use ln -sf for symlinks. Putting this up for comments on that. Do we want to create perhaps some classes for doing shell commands - Linux vs Windows. Is there some other way we want to do this? I assume we are still supporting jdk1.6? Also should I update the Java API for pipes to allow this parameter? Author: Thomas Graves <[email protected]> Closes #128 from tgravescs/SPARK1198 and squashes the following commits: abc1289 [Thomas Graves] remove extra tag in pom file ba23fc0 [Thomas Graves] Add support for symlink on windows, remove commons-io usage da4b221 [Thomas Graves] Merge branch 'master' of https://github.com/tgravescs/spark into SPARK1198 61be271 [Thomas Graves] Fix file name filter 6b783bd [Thomas Graves] style fixes 1ab49ca [Thomas Graves] Add support for running pipe tasks is separate directories
1 parent a02b535 commit 198892f
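From user code, the new flag is passed straight through RDD.pipe. A minimal usage sketch against the Scala API added in this commit (assuming an existing SparkContext sc):

// Each task now runs its piped command in its own ./tasks/<UUID> working
// directory, so concurrent tasks that write scratch files into their
// current directory no longer collide.
val piped = sc.makeRDD(Array(1, 2, 3, 4), 2).pipe(Seq("cat"), separateWorkingDir = true)
piped.collect().foreach(println)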

File tree

4 files changed: +137, -7 lines

core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
core/src/main/scala/org/apache/spark/rdd/RDD.scala
core/src/main/scala/org/apache/spark/util/Utils.scala
core/src/test/scala/org/apache/spark/PipedRDDSuite.scala

core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala

Lines changed: 61 additions & 3 deletions
@@ -17,6 +17,9 @@
 
 package org.apache.spark.rdd
 
+import java.io.File
+import java.io.FilenameFilter
+import java.io.IOException
 import java.io.PrintWriter
 import java.util.StringTokenizer
 
@@ -27,6 +30,7 @@ import scala.io.Source
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.util.Utils
 
 
 /**
@@ -38,7 +42,8 @@ class PipedRDD[T: ClassTag](
     command: Seq[String],
     envVars: Map[String, String],
     printPipeContext: (String => Unit) => Unit,
-    printRDDElement: (T, String => Unit) => Unit)
+    printRDDElement: (T, String => Unit) => Unit,
+    separateWorkingDir: Boolean)
   extends RDD[String](prev) {
 
   // Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -48,12 +53,24 @@ class PipedRDD[T: ClassTag](
       command: String,
       envVars: Map[String, String] = Map(),
       printPipeContext: (String => Unit) => Unit = null,
-      printRDDElement: (T, String => Unit) => Unit = null) =
-    this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement)
+      printRDDElement: (T, String => Unit) => Unit = null,
+      separateWorkingDir: Boolean = false) =
+    this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
+      separateWorkingDir)
 
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
+  /**
+   * A FilenameFilter that accepts anything that isn't equal to the name passed in.
+   * @param filterName name of the file or directory to leave out
+   */
+  class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter {
+    def accept(dir: File, name: String): Boolean = {
+      !name.equals(filterName)
+    }
+  }
+
   override def compute(split: Partition, context: TaskContext): Iterator[String] = {
     val pb = new ProcessBuilder(command)
     // Add the environmental variables to the process.
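The filter's behavior is easy to check in isolation. A self-contained sketch of the same class outside PipedRDD:

import java.io.{File, FilenameFilter}

class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter {
  def accept(dir: File, name: String): Boolean = !name.equals(filterName)
}

// Lists every entry of the current directory except "tasks", which is
// exactly how compute() below avoids symlinking the ./tasks tree into itself.
new File(".").list(new NotEqualsFileNameFilter("tasks")).foreach(println)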
@@ -67,6 +84,38 @@ class PipedRDD[T: ClassTag](
       currentEnvVars.putAll(hadoopSplit.getPipeEnvVars())
     }
 
+    // When the separateWorkingDir option is turned on, each task runs in a
+    // separate directory. This resolves conflicts when multiple tasks try to
+    // access the same files.
+    val taskDirectory = "./tasks/" + java.util.UUID.randomUUID.toString
+    var workInTaskDirectory = false
+    logDebug("taskDirectory = " + taskDirectory)
+    if (separateWorkingDir) {
+      val currentDir = new File(".")
+      logDebug("currentDir = " + currentDir.getAbsolutePath())
+      val taskDirFile = new File(taskDirectory)
+      taskDirFile.mkdirs()
+
+      try {
+        val tasksDirFilter = new NotEqualsFileNameFilter("tasks")
+
+        // Need to add symlinks to jars, files, and directories. On Yarn we could have
+        // directories and other files not known to the SparkContext that were added via the
+        // Hadoop distributed cache. We also don't want to symlink to the /tasks directories we
+        // are creating here.
+        for (file <- currentDir.list(tasksDirFilter)) {
+          val fileWithDir = new File(currentDir, file)
+          Utils.symlink(new File(fileWithDir.getAbsolutePath()),
+            new File(taskDirectory + "/" + fileWithDir.getName()))
+        }
+        pb.directory(taskDirFile)
+        workInTaskDirectory = true
+      } catch {
+        case e: Exception => logError("Unable to setup task working directory: " + e.getMessage +
+          " (" + taskDirectory + ")")
+      }
+    }
+
     val proc = pb.start()
     val env = SparkEnv.get
 
@@ -112,6 +161,15 @@ class PipedRDD[T: ClassTag](
     if (exitStatus != 0) {
       throw new Exception("Subprocess exited with status " + exitStatus)
     }
+
+    // Clean up the task working directory if one was used.
+    if (workInTaskDirectory) {
+      scala.util.control.Exception.ignoring(classOf[IOException]) {
+        Utils.deleteRecursively(new File(taskDirectory))
+      }
+      logDebug("Removed task working directory " + taskDirectory)
+    }
+
     false
   }
 }
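The cleanup block relies on scala.util.control.Exception.ignoring, which runs a block and swallows only the listed exception classes. A standalone sketch of the idiom:

import java.io.{File, IOException}
import scala.util.control.Exception.ignoring

// Any IOException thrown while deleting is silently dropped, matching the
// best-effort cleanup above; other exception types still propagate.
ignoring(classOf[IOException]) {
  if (!new File("./tasks/example").delete()) {
    throw new IOException("delete failed")
  }
}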

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 5 additions & 2 deletions
@@ -481,16 +481,19 @@ abstract class RDD[T: ClassTag](
    * instead of constructing a huge String to concat all the elements:
    * def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
    *   for (e <- record._2) {f(e)}
+   * @param separateWorkingDir Use separate working directories for each task.
    * @return the result RDD
    */
   def pipe(
       command: Seq[String],
       env: Map[String, String] = Map(),
       printPipeContext: (String => Unit) => Unit = null,
-      printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = {
+      printRDDElement: (T, String => Unit) => Unit = null,
+      separateWorkingDir: Boolean = false): RDD[String] = {
     new PipedRDD(this, command, env,
       if (printPipeContext ne null) sc.clean(printPipeContext) else null,
       if (printRDDElement ne null) sc.clean(printRDDElement) else null,
-      if (printRDDElement ne null) sc.clean(printRDDElement) else null)
+      separateWorkingDir)
   }
 
   /**
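For a case where the flag matters, a sketch assuming a hypothetical script scratch.sh that writes a temp file into its current working directory before echoing its input:

// With separateWorkingDir = false (the default), all tasks on an executor
// share one working directory, so two concurrent scratch.sh invocations
// would race on the same temp file; with true, each task gets its own
// ./tasks/<UUID> directory populated with symlinks to the job's files.
val out = sc.textFile("input.txt").pipe(Seq("./scratch.sh"), separateWorkingDir = true)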

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 44 additions & 1 deletion
@@ -26,6 +26,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.SortedSet
 import scala.io.Source
 import scala.reflect.ClassTag
 
@@ -43,6 +44,8 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
  */
 private[spark] object Utils extends Logging {
 
+  val osName = System.getProperty("os.name")
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -521,9 +524,10 @@ private[spark] object Utils extends Logging {
 
   /**
    * Delete a file or directory and its contents recursively.
+   * Don't follow directories if they are symlinks.
    */
   def deleteRecursively(file: File) {
-    if (file.isDirectory) {
+    if (file.isDirectory && !isSymlink(file)) {
       for (child <- listFilesSafely(file)) {
         deleteRecursively(child)
       }
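The symlink guard matters for the pipe cleanup path: the per-task directory is full of symlinks to the job's jars and files, and deleting it must remove the links without recursing into their targets. A sketch of the intended behavior (the UUID path is illustrative):

import java.io.File
import org.apache.spark.util.Utils

// tasks/<uuid>/ contains only symlinks after compute() runs; this removes
// the links and the directory itself, leaving the linked files untouched.
Utils.deleteRecursively(new File("./tasks/some-uuid"))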
@@ -536,6 +540,25 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Check to see if the file is a symbolic link.
+   */
+  def isSymlink(file: File): Boolean = {
+    if (file == null) throw new NullPointerException("File must not be null")
+    if (osName.startsWith("Windows")) return false
+    val fileInCanonicalDir = if (file.getParent() == null) {
+      file
+    } else {
+      new File(file.getParentFile().getCanonicalFile(), file.getName())
+    }
+
+    if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) {
+      return false
+    } else {
+      return true
+    }
+  }
+
   /**
    * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
    */
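The canonical-vs-absolute comparison is the standard jdk1.6-era symlink test: with the parent directory already canonicalized, canonicalizing the full path changes it exactly when the last path element is a link. On jdk1.7+ the same check is a single NIO call; a sketch of the equivalent (not usable in this commit, which still targets jdk1.6):

import java.io.File
import java.nio.file.Files

def isSymlinkNio(file: File): Boolean = Files.isSymbolicLink(file.toPath)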
@@ -898,6 +921,26 @@ private[spark] object Utils extends Logging {
     count
   }
 
+  /**
+   * Creates a symlink. Note that jdk1.7 has Files.createSymbolicLink, but it is not used here
+   * in order to support jdk1.6. Supports Windows by doing a copy; everything else uses "ln -sf".
+   * @param src absolute path to the source
+   * @param dst relative path for the destination
+   */
+  def symlink(src: File, dst: File) {
+    if (!src.isAbsolute()) {
+      throw new IOException("Source must be absolute")
+    }
+    if (dst.isAbsolute()) {
+      throw new IOException("Destination must be relative")
+    }
+    val linkCmd = if (osName.startsWith("Windows")) "copy" else "ln -sf"
+    import scala.sys.process._
+    (linkCmd + " " + src.getAbsolutePath() + " " + dst.getPath()) lines_! ProcessLogger(line =>
+      logInfo(line))
+  }
+
+
   /** Return the class name of the given object, removing all dollar signs */
   def getFormattedClassName(obj: AnyRef) = {
     obj.getClass.getSimpleName.replace("$", "")
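Once jdk1.6 support is dropped, the Files.createSymbolicLink call the scaladoc mentions can replace the shell-out. A sketch (ignoring the Windows copy fallback):

import java.io.File
import java.nio.file.Files

// NIO takes the link first and the target second, the reverse of "ln -sf".
def symlinkNio(src: File, dst: File): Unit = {
  Files.createSymbolicLink(dst.toPath, src.toPath)
}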

core/src/test/scala/org/apache/spark/PipedRDDSuite.scala

Lines changed: 27 additions & 1 deletion
@@ -17,8 +17,11 @@
 
 package org.apache.spark
 
-import org.scalatest.FunSuite
+import java.io.File
+
+import com.google.common.io.Files
 
+import org.scalatest.FunSuite
 
 import org.apache.spark.rdd.{HadoopRDD, PipedRDD, HadoopPartition}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat, FileSplit}
@@ -126,6 +129,29 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  test("basic pipe with separate working directory") {
+    if (testCommandAvailable("cat")) {
+      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+      val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
+      val c = piped.collect()
+      assert(c.size === 4)
+      assert(c(0) === "1")
+      assert(c(1) === "2")
+      assert(c(2) === "3")
+      assert(c(3) === "4")
+      val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
+      val collectPwd = pipedPwd.collect()
+      assert(collectPwd(0).contains("tasks/"))
+      val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect()
+      // make sure symlinks were created
+      assert(pipedLs.length > 0)
+      // clean up top level tasks directory
+      new File("tasks").delete()
+    } else {
+      assert(true)
+    }
+  }
+
   test("test pipe exports map_input_file") {
     testExportInputFile("map_input_file")
   }
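The guard uses the suite's existing testCommandAvailable helper. A minimal version of such a check, shown here only as a sketch (the suite's actual implementation may differ):

import scala.sys.process.Process
import scala.util.Try

// Succeeds iff running the command exits with status 0.
def testCommandAvailable(command: String): Boolean = Try(Process(command).!!).isSuccess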
