
Commit 1ab49ca

Add support for running pipe tasks in separate directories
1 parent af7f2f1 commit 1ab49ca

6 files changed (+113, -12 lines)

core/pom.xml

Lines changed: 4 additions & 5 deletions
@@ -183,14 +183,13 @@
       <groupId>com.codahale.metrics</groupId>
       <artifactId>metrics-graphite</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.derby</groupId>
-      <artifactId>derby</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>

core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala

Lines changed: 59 additions & 3 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.rdd
 
+import java.io.File
+import java.io.FilenameFilter
 import java.io.PrintWriter
 import java.util.StringTokenizer
 
@@ -26,7 +28,9 @@ import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag
 
+import org.apache.commons.io.FileUtils
 import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.util.Utils
 
 
 /**
@@ -38,7 +42,8 @@ class PipedRDD[T: ClassTag](
     command: Seq[String],
     envVars: Map[String, String],
     printPipeContext: (String => Unit) => Unit,
-    printRDDElement: (T, String => Unit) => Unit)
+    printRDDElement: (T, String => Unit) => Unit,
+    separateWorkingDir: Boolean)
   extends RDD[String](prev) {
 
   // Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -48,12 +53,24 @@ class PipedRDD[T: ClassTag](
       command: String,
       envVars: Map[String, String] = Map(),
       printPipeContext: (String => Unit) => Unit = null,
-      printRDDElement: (T, String => Unit) => Unit = null) =
-    this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement)
+      printRDDElement: (T, String => Unit) => Unit = null,
+      separateWorkingDir: Boolean = false) =
+    this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
+      separateWorkingDir)
 
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
+  /**
+   * A FilenameFilter that accepts anything that isn't equal to the name passed in.
+   * @param filterName name of the file or directory to leave out
+   */
+  class NotEqualsFileNameFilter(filterName: String) extends FilenameFilter {
+    def accept(dir: File, name: String): Boolean = {
+      !name.equals(filterName)
+    }
+  }
+
   override def compute(split: Partition, context: TaskContext): Iterator[String] = {
     val pb = new ProcessBuilder(command)
     // Add the environmental variables to the process.
@@ -67,6 +84,38 @@ class PipedRDD[T: ClassTag](
       currentEnvVars.putAll(hadoopSplit.getPipeEnvVars())
     }
 
+    // When the separateWorkingDir option is turned on, each task is run in a separate
+    // directory. This resolves file access conflicts between tasks that would otherwise
+    // share one working directory.
+    val taskDirectory = "./tasks/" + java.util.UUID.randomUUID.toString
+    var workInTaskDirectory = false
+    logDebug("taskDirectory = " + taskDirectory)
+    if (separateWorkingDir) {
+      val currentDir = new File(".")
+      logDebug("currentDir = " + currentDir)
+      val taskDirFile = new File(taskDirectory)
+      taskDirFile.mkdirs()
+
+      try {
+        val tasksDirFilter = new NotEqualsFileNameFilter("tasks")
+
+        // Need to add symlinks to jars, files, and directories. On Yarn we could have
+        // directories and other files not known to the SparkContext that were added via the
+        // Hadoop distributed cache. We also don't want to symlink to the /tasks directories we
+        // are creating here.
+        for (file <- currentDir.list(tasksDirFilter)) {
+          val fileWithDir = new File(currentDir, file)
+          Utils.symlink(new File(fileWithDir.getAbsolutePath()),
+            new File(taskDirectory + "/" + fileWithDir.getName()))
+        }
+        pb.directory(taskDirFile)
+        workInTaskDirectory = true
+      } catch {
+        case e: Exception => logError("Unable to setup task working directory: " + e.getMessage +
+          " (" + taskDirectory + ")")
+      }
+    }
+
     val proc = pb.start()
     val env = SparkEnv.get
 
@@ -112,6 +161,13 @@ class PipedRDD[T: ClassTag](
       if (exitStatus != 0) {
         throw new Exception("Subprocess exited with status " + exitStatus)
       }
+
+      // Clean up the task working directory if one was used
+      if (workInTaskDirectory) {
+        FileUtils.deleteQuietly(new File(taskDirectory))
+        logDebug("Removed task working directory " + taskDirectory)
+      }
+
       false
     }
   }
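
Note: the compute() changes above combine two standard Java APIs: File.list with a FilenameFilter to enumerate everything in the worker's current directory except the tasks/ subtree, and ProcessBuilder.directory to start the piped command inside the freshly created per-task directory. A minimal sketch of that pattern outside of Spark (the directory name, filter, and "pwd" command below are illustrative only, not part of this commit):

import java.io.{File, FilenameFilter}
import java.util.UUID
import scala.io.Source

object TaskDirSketch {
  def main(args: Array[String]): Unit = {
    // Create a unique per-task directory, mirroring the "./tasks/<uuid>" layout above.
    val taskDir = new File("./tasks/" + UUID.randomUUID.toString)
    taskDir.mkdirs()

    // Enumerate everything in the current directory except the "tasks" tree itself.
    val notTasks = new FilenameFilter {
      def accept(dir: File, name: String): Boolean = name != "tasks"
    }
    new File(".").list(notTasks).foreach(f => println("would symlink: " + f))

    // Start a command with the per-task directory as its working directory.
    val pb = new ProcessBuilder("pwd")
    pb.directory(taskDir)
    val proc = pb.start()
    println("child ran in: " + Source.fromInputStream(proc.getInputStream).mkString.trim)
    proc.waitFor()
  }
}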

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 5 additions & 2 deletions
@@ -478,16 +478,19 @@ abstract class RDD[T: ClassTag](
    * instead of constructing a huge String to concat all the elements:
    * def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
    * for (e <- record._2){f(e)}
+   * @param separateWorkingDir Use separate working directories for each task.
    * @return the result RDD
    */
   def pipe(
       command: Seq[String],
       env: Map[String, String] = Map(),
       printPipeContext: (String => Unit) => Unit = null,
-      printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = {
+      printRDDElement: (T, String => Unit) => Unit = null,
+      separateWorkingDir: Boolean = false): RDD[String] = {
     new PipedRDD(this, command, env,
       if (printPipeContext ne null) sc.clean(printPipeContext) else null,
-      if (printRDDElement ne null) sc.clean(printRDDElement) else null)
+      if (printRDDElement ne null) sc.clean(printRDDElement) else null,
+      separateWorkingDir)
   }
 
   /**
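
With the new parameter, callers opt in per pipe() call. A minimal usage sketch (assumes a running SparkContext named sc and that the piped command, "cat" here, exists on the workers):

val nums = sc.makeRDD(Seq(1, 2, 3, 4), 2)

// Each task's subprocess runs in its own ./tasks/<uuid> working directory,
// populated with symlinks to the contents of the original working directory.
// The default (separateWorkingDir = false) keeps the old shared-directory behaviour.
val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
piped.collect().foreach(println)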

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 21 additions & 0 deletions
@@ -26,6 +26,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.SortedSet
 import scala.io.Source
 import scala.reflect.ClassTag
 
@@ -895,4 +896,24 @@ private[spark] object Utils extends Logging {
     }
     count
   }
+
+  /**
+   * Creates a symlink. Note that JDK 1.7 has Files.createSymbolicLink, but it is not used
+   * here in order to keep JDK 1.6 support. Does not support Windows or any other platform
+   * that lacks 'ln'.
+   * @param src absolute path to the source
+   * @param dst relative path for the destination
+   */
+  def symlink(src: File, dst: File) {
+    if (!src.isAbsolute()) {
+      throw new IOException("Source must be absolute")
+    }
+    if (dst.isAbsolute()) {
+      throw new IOException("Destination must be relative")
+    }
+    import scala.sys.process._
+    ("ln -sf " + src.getAbsolutePath() + " " + dst.getPath()) lines_! ProcessLogger(line =>
+      (logInfo(line)))
+  }
+
 }
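
As the scaladoc above notes, the shell-based implementation is only there to keep JDK 1.6 support. On JDK 1.7+ the same effect is available through java.nio.file; a minimal sketch of that alternative (not what this commit uses, and with illustrative paths):

import java.nio.file.{Files, Paths}

// First argument is the link to create (relative, inside the task directory),
// second is the existing target it should point at (absolute).
Files.createSymbolicLink(
  Paths.get("tasks/some-task/data.txt"),
  Paths.get("/absolute/path/to/data.txt"))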

core/src/test/scala/org/apache/spark/PipedRDDSuite.scala

Lines changed: 22 additions & 0 deletions
@@ -123,6 +123,28 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  test("basic pipe with separate working directory") {
+    if (testCommandAvailable("cat")) {
+      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+
+      val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
+
+      val c = piped.collect()
+      assert(c.size === 4)
+      assert(c(0) === "1")
+      assert(c(1) === "2")
+      assert(c(2) === "3")
+      assert(c(3) === "4")
+      val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
+      val collectPwd = pipedPwd.collect()
+      println("collect pwd is: " + collectPwd(0))
+      assert(collectPwd(0).contains("tasks/"))
+      assert(collectPwd(0).matches(".*tasks/.*"))
+    } else {
+      assert(true)
+    }
+  }
+
   test("test pipe exports map_input_file") {
     testExportInputFile("map_input_file")
   }

project/SparkBuild.scala

Lines changed: 2 additions & 2 deletions
@@ -238,12 +238,12 @@ object SparkBuild extends Build {
       "org.eclipse.jetty" % "jetty-security" % "7.6.8.v20121106",
       /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
       "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
+      "commons-io" % "commons-io" % "2.4",
       "org.scalatest" %% "scalatest" % "1.9.1" % "test",
       "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
       "com.novocode" % "junit-interface" % "0.10" % "test",
       "org.easymock" % "easymock" % "3.1" % "test",
-      "org.mockito" % "mockito-all" % "1.8.5" % "test",
-      "commons-io" % "commons-io" % "2.4" % "test"
+      "org.mockito" % "mockito-all" % "1.8.5" % "test"
     ),
 
     testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
