Merged
Changes from 3 commits
89 changes: 85 additions & 4 deletions core/src/main/scala/spark/RDD.scala
@@ -16,6 +16,7 @@ import org.apache.hadoop.mapred.TextOutputFormat

import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}

import spark.broadcast.Broadcast
import spark.Partitioner._
import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
@@ -351,13 +352,93 @@ abstract class RDD[T: ClassManifest](
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command)
def pipe(command: String, env: Map[String, String]): RDD[String] =
new PipedRDD(this, command, env)

/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: Seq[String], env: Map[String, String]): RDD[String] =
new PipedRDD(this, command, env)
* How each record in RDD is outputed to the process can be controled by providing a
* function trasnform(T, outputFunction: String => Unit). transform() will be called with
Member: Typo: trasnform should be transform.

* the currnet record in RDD as the 1st parameter, and the function to output the record to
* the external process (like out.println()) as the 2nd parameter.
* Here's an example on how to pipe the RDD data of groupBy() in a streaming way,
Member: Maybe add blank lines inside the comment so that this isn't a wall of text? And maybe enclose parameter names in backticks to make them more apparent?

Member: It might be cleaner to use the Scaladoc @param convention to describe each parameter.

Contributor Author: Will change.

* instead of constructing a huge String to concat all the records:
* def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
* pipeContext can be used to transfer additional context data to the external process
* besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to
* external process with "^A" as the delimiter in the end of context data. Delimiter can also
* be customized by the last parameter delimiter.
*/
def pipe[U<: Seq[String]](
command: String,
env: Map[String, String],
transform: (T,String => Unit) => Any,
pipeContext: Broadcast[U],
delimiter: String): RDD[String] =
Member: Maybe delimiter should be renamed to pipeContextDelimiter to clarify that it only delimits items passed in pipeContext.

Contributor Author: Will change.

new PipedRDD(this, command, env, transform, pipeContext, delimiter)

/**
* Return an RDD created by piping elements to a forked external process.
* How each record in RDD is outputed to the process can be controled by providing a
* function trasnform(T, outputFunction: String => Unit). transform() will be called with
* the currnet record in RDD as the 1st parameter, and the function to output the record to
* the external process (like out.println()) as the 2nd parameter.
* Here's an example on how to pipe the RDD data of groupBy() in a streaming way,
* instead of constructing a huge String to concat all the records:
* def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
* pipeContext can be used to transfer additional context data to the external process
* besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to
* external process with "^A" as the delimiter in the end of context data. Delimiter can also
* be customized by the last parameter delimiter.
*/
def pipe[U<: Seq[String]](
command: String,
transform: (T,String => Unit) => Any,
pipeContext: Broadcast[U]): RDD[String] =
new PipedRDD(this, command, Map[String, String](), transform, pipeContext, "\u0001")

/**
* Return an RDD created by piping elements to a forked external process.
* How each record in RDD is outputed to the process can be controled by providing a
* function trasnform(T, outputFunction: String => Unit). transform() will be called with
* the currnet record in RDD as the 1st parameter, and the function to output the record to
* the external process (like out.println()) as the 2nd parameter.
* Here's an example on how to pipe the RDD data of groupBy() in a streaming way,
* instead of constructing a huge String to concat all the records:
* def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
* pipeContext can be used to transfer additional context data to the external process
* besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to
* external process with "^A" as the delimiter in the end of context data. Delimiter can also
* be customized by the last parameter delimiter.
*/
def pipe[U<: Seq[String]](
Member: Since you have the 3rd pipe function with default arguments, I don't think you need the first two anymore?

Member: I agree with this; we should consolidate these into one function.

Contributor Author: @rxin @mateiz, I agree with you. I actually tried to combine as much as I could. ;) But I got a weird issue when I defined default arguments for two overloaded versions of pipe(): "in class RDD, multiple overloaded alternatives of method pipe define default arguments." I don't know if you have run into this as well.

I tested it, and it seems that when a method has more than one overloaded version, only one of them is allowed to declare default parameters:

scala> class A() { def f(a:Int = 0, b:Int) =0;def f(a:Int, b:String) =0}
defined class A

scala> class A() { def f(a:Int = 0, b:Int) =0;def f(a:Int = 0, b:String) =0}
<console>:7: error: in class A, multiple overloaded alternatives of method f define default arguments.
class A() { def f(a:Int = 0, b:Int) =0;def f(a:Int = 0, b:String) =0}

There are already complaints about this policy: http://stackoverflow.com/questions/4652095/why-does-the-scala-compiler-disallow-overloaded-methods-with-default-arguments.

Do you know a better way to resolve this problem?

The reason I kept the first two versions of pipe() is that I wanted pipe(String) and pipe(Seq[String]) to stay available for backward compatibility. I thought we'd better keep users' existing code working; otherwise, after this change they would have to modify their code and rebuild. Do you think we should preserve backward compatibility here? What's your opinion?

Member: For now, how about only the Seq[String] version of pipe gets the "advanced" features?

Contributor Author: This sounds good to me.

command: String,
env: Map[String, String],
transform: (T,String => Unit) => Any,
pipeContext: Broadcast[U]): RDD[String] =
new PipedRDD(this, command, env, transform, pipeContext, "\u0001")
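To make the compromise above concrete, here is a self-contained sketch that is not part of this PR (the object name Piper and method name run are made up): only one alternative of an overloaded method may declare default arguments, so the Seq[String] variant carries them while the String variants keep fixed parameter lists and forward explicitly.

object Piper {
  // The only overload allowed to declare default arguments.
  def run(command: Seq[String],
          env: Map[String, String] = Map(),
          delimiter: String = "\u0001"): String =
    "cmd=" + command.mkString(" ") + " env=" + env + " delimiter length=" + delimiter.length

  // Backward-compatible String overloads: no defaults, explicit forwarding.
  def run(command: String): String =
    run(command.split(" ").toSeq, Map[String, String](), "\u0001")

  def run(command: String, env: Map[String, String]): String =
    run(command.split(" ").toSeq, env, "\u0001")

  def main(args: Array[String]): Unit = {
    println(run("wc -l"))                          // old-style String call keeps working
    println(run(Seq("wc", "-l"), Map("K" -> "V"))) // delimiter falls back to "\u0001"
  }
}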

/**
* Return an RDD created by piping elements to a forked external process.
* How each record in RDD is outputed to the process can be controled by providing a
* function trasnform(T, outputFunction: String => Unit). transform() will be called with
* the currnet record in RDD as the 1st parameter, and the function to output the record to
* the external process (like out.println()) as the 2nd parameter.
* Here's an example on how to pipe the RDD data of groupBy() in a streaming way,
* instead of constructing a huge String to concat all the records:
* def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
* pipeContext can be used to transfer additional context data to the external process
* besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to
* external process with "^A" as the delimiter in the end of context data. Delimiter can also
* be customized by the last parameter delimiter.
*/
def pipe[U<: Seq[String]](
command: Seq[String],
env: Map[String, String] = Map(),
transform: (T,String => Unit) => Any = null,
pipeContext: Broadcast[U] = null,
delimiter: String = "\u0001"): RDD[String] =
new PipedRDD(this, command, env, transform, pipeContext, delimiter)

/**
* Return a new RDD by applying a function to each partition of this RDD.
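End-to-end, the new RDD.pipe surface can be exercised roughly as follows. This is a minimal sketch modeled on the "advanced pipe" test added below, so the local master, the cat command, and the broadcast List("0") are just the illustrative values used there.

import spark.SparkContext

val sc = new SparkContext("local", "pipe-example")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)

val piped = nums.pipe(
  Seq("cat"),                                 // external command forked per partition
  Map[String, String](),                      // extra environment variables (none here)
  (i: Int, f: String => Unit) => f(i + "_"),  // transform: how each record is written to the process
  sc.broadcast(List("0")))                    // pipeContext: broadcast lines sent before the records

// Each partition's process first receives the context lines, then the "\u0001"
// delimiter line, then the transformed records; its stdout becomes the new RDD[String].
piped.collect().foreach(println)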
36 changes: 30 additions & 6 deletions core/src/main/scala/spark/rdd/PipedRDD.scala
@@ -9,23 +9,34 @@ import scala.collection.mutable.ArrayBuffer
import scala.io.Source

import spark.{RDD, SparkEnv, Partition, TaskContext}
import spark.broadcast.Broadcast


/**
* An RDD that pipes the contents of each parent partition through an external command
* (printing them one per line) and returns the output as a collection of strings.
*/
class PipedRDD[T: ClassManifest](
class PipedRDD[T: ClassManifest, U <: Seq[String]](
prev: RDD[T],
command: Seq[String],
envVars: Map[String, String])
envVars: Map[String, String],
transform: (T, String => Unit) => Any,
pipeContext: Broadcast[U],
delimiter: String
)
extends RDD[String](prev) {

def this(prev: RDD[T], command: Seq[String]) = this(prev, command, Map())

// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
def this(prev: RDD[T], command: String) = this(prev, PipedRDD.tokenize(command))
def this(
Member: You can put the default arguments in the default constructor (i.e. the one right after "class PipedRDD").

Contributor Author: Ehm. You are right. I'll change this.

prev: RDD[T],
command: String,
envVars: Map[String, String] = Map(),
transform: (T, String => Unit) => Any = null,
pipeContext: Broadcast[U] = null,
delimiter: String = "\u0001") =
this(prev, PipedRDD.tokenize(command), envVars, transform, pipeContext, delimiter)
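A tiny, self-contained sketch of the suggestion above (generic class name Pipe, not the real PipedRDD): with the defaults moved onto the primary constructor, the String auxiliary constructor only needs to tokenize the command and forward.

class Pipe(command: Seq[String],
           envVars: Map[String, String] = Map(),
           delimiter: String = "\u0001") {

  // The String form tokenizes and reuses the primary constructor's defaults.
  def this(command: String) = this(command.split(" ").toSeq)

  override def toString =
    "Pipe(" + command.mkString(" ") + ", " + envVars + ", delimiter length " + delimiter.length + ")"
}

object PipeDemo {
  def main(args: Array[String]): Unit = {
    println(new Pipe("wc -l"))                          // String constructor, all defaults
    println(new Pipe(Seq("wc", "-l"), Map("K" -> "V"))) // Seq constructor, delimiter defaulted
  }
}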


override def getPartitions: Array[Partition] = firstParent[T].partitions

@@ -52,8 +63,21 @@ class PipedRDD[T: ClassManifest](
override def run() {
SparkEnv.set(env)
val out = new PrintWriter(proc.getOutputStream)

// input the pipeContext firstly
if ( pipeContext != null) {
for (elem <- pipeContext.value) {
out.println(elem)
}
// delimiter\n as the marker of the end of the pipeContext
out.println(delimiter)
}
for (elem <- firstParent[T].iterator(split, context)) {
out.println(elem)
if (transform != null) {
transform(elem, out.println(_))
Member: Why is this called with out.println instead of out.print? Using out.print would give users the freedom to incrementally build up output on a single line, whereas out.println forces them to either convert the record into one huge string or to split it across multiple lines.

Member: Actually, why not just pass the raw PrintWriter to transform? It would simplify transform's signature and provide a lot of flexibility without sacrificing performance.

This would efficiently support the case where you want to write a reducer's input as a key followed by one value per line (with appropriate delimiters between keys), because it wouldn't force you to append '\n' to the end of each value's string before printing it.

Contributor Author: I don't think support for customizing "\n" is urgently needed, and I think it's out of the scope of this pull request.

Plus, it's actually a complicated topic. A good API should strike a trade-off between flexibility and cohesion. If we expose PrintWriter to users, we totally lose control of how they use it: they might call any method of the PrintWriter interface, including some we don't want them to use, and we would become tightly coupled to PrintWriter's implementation details. Exposing an abstract interface, as done here, is a better way to decouple from those details. Hadoop abstracts this as org.apache.hadoop.streaming.io.InputWriter and allows customization of almost anything by supplying a custom InputWriter implementation. So if we really want to support customization well, I think we can reuse org.apache.hadoop.streaming.io.InputWriter, as we do with InputFormat in hadoopFile. But again, I think that's out of the scope of this pull request.

} else {
out.println(elem)
}
}
out.close()
}
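For completeness, this is roughly what the forked process sees under the protocol implemented in run() above. A hypothetical stand-alone consumer, not part of the PR, assuming the default "\u0001" delimiter:

object PipeConsumer {
  def main(args: Array[String]): Unit = {
    val delimiter = "\u0001"
    // Read everything piped in by PipedRDD: optional context lines, a line holding
    // only the delimiter, then one line per record produced by transform().
    val lines = scala.io.Source.stdin.getLines().toList

    val (context, records) =
      if (lines.contains(delimiter)) {
        val (ctx, rest) = lines.span(_ != delimiter)
        (ctx, rest.drop(1))     // drop the delimiter marker itself
      } else {
        (Nil, lines)            // no pipeContext was broadcast
      }

    // Whatever is printed here flows back into the resulting RDD[String].
    records.foreach(record => println("context=" + context.size + " record=" + record))
  }
}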
34 changes: 34 additions & 0 deletions core/src/test/scala/spark/PipedRDDSuite.scala
@@ -19,6 +19,40 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
assert(c(3) === "4")
}

test("advanced pipe") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)

val piped = nums.pipe(Seq("cat"), Map[String, String](),
(i:Int, f: String=> Unit) => f(i + "_"), sc.broadcast(List("0")))

val c = piped.collect()

assert(c.size === 8)
assert(c(0) === "0")
assert(c(1) === "\u0001")
assert(c(2) === "1_")
assert(c(3) === "2_")
assert(c(4) === "0")
assert(c(5) === "\u0001")
assert(c(6) === "3_")
assert(c(7) === "4_")

val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
val d = nums1.groupBy(str=>str.split("\t")(0)).
pipe(Seq("cat"), Map[String, String](), (i:Tuple2[String, Seq[String]], f: String=> Unit) =>
{for (e <- i._2){ f(e + "_")}}, sc.broadcast(List("0"))).collect()
assert(d.size === 8)
assert(d(0) === "0")
assert(d(1) === "\u0001")
assert(d(2) === "b\t2_")
assert(d(3) === "b\t4_")
assert(d(4) === "0")
assert(d(5) === "\u0001")
assert(d(6) === "a\t1_")
assert(d(7) === "a\t3_")
}

test("pipe with env variable") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)