58 changes: 37 additions & 21 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,7 @@ package org.apache.spark.api.java

import java.{lang => jl}
import java.lang.{Iterable => JIterable}
import java.util.{Comparator, Iterator => JIterator, List => JList}
import java.util.{Comparator, Iterator => JIterator, List => JList, Map => JMap}

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
@@ -80,7 +80,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/
def iterator(split: Partition, taskContext: TaskContext): java.util.Iterator[T] =
def iterator(split: Partition, taskContext: TaskContext): JIterator[T] =
rdd.iterator(split, taskContext).asJava

// Transformations (return a new RDD)
@@ -96,7 +96,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* of the original partition.
*/
def mapPartitionsWithIndex[R](
f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
f: JFunction2[jl.Integer, JIterator[T], JIterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
new JavaRDD(rdd.mapPartitionsWithIndex((a, b) => f.call(a, b.asJava).asScala,
preservesPartitioning)(fakeClassTag))(fakeClassTag)
@@ -147,7 +147,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
def mapPartitions[U](f: FlatMapFunction[JIterator[T], U]): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
}
@@ -157,7 +157,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
def mapPartitions[U](f: FlatMapFunction[JIterator[T], U],
preservesPartitioning: Boolean): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
@@ -169,7 +169,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]]): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
}
@@ -179,7 +179,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
@@ -190,7 +190,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]],
preservesPartitioning: Boolean): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
@@ -202,7 +202,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2],
preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
@@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Applies a function f to each partition of this RDD.
*/
def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
def foreachPartition(f: VoidFunction[JIterator[T]]): Unit = {
rdd.foreachPartition(x => f.call(x.asJava))
}

@@ -256,19 +256,33 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: String): JavaRDD[String] = rdd.pipe(command)
def pipe(command: String): JavaRDD[String] = {
rdd.pipe(command)
}

/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: JList[String]): JavaRDD[String] =
def pipe(command: JList[String]): JavaRDD[String] = {
rdd.pipe(command.asScala)
}

/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] = {
rdd.pipe(command.asScala, env.asScala)
}

/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: JList[String],
env: JMap[String, String],
separateWorkingDir: Boolean,
bufferSize: Int): JavaRDD[String] = {
rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize)
}

/**
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
@@ -288,7 +302,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def zipPartitions[U, V](
other: JavaRDDLike[U, _],
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
(x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
}
@@ -446,22 +460,22 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
* combine step happens locally on the master, equivalent to running a single reduce task.
*/
def countByValue(): java.util.Map[T, jl.Long] =
mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[java.util.Map[T, jl.Long]]
def countByValue(): JMap[T, jl.Long] =
mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[JMap[T, jl.Long]]

/**
* (Experimental) Approximate version of countByValue().
*/
def countByValueApprox(
timeout: Long,
confidence: Double
): PartialResult[java.util.Map[T, BoundedDouble]] =
): PartialResult[JMap[T, BoundedDouble]] =
rdd.countByValueApprox(timeout, confidence).map(mapAsSerializableJavaMap)

/**
* (Experimental) Approximate version of countByValue().
*/
def countByValueApprox(timeout: Long): PartialResult[java.util.Map[T, BoundedDouble]] =
def countByValueApprox(timeout: Long): PartialResult[JMap[T, BoundedDouble]] =
rdd.countByValueApprox(timeout).map(mapAsSerializableJavaMap)

/**
Expand Down Expand Up @@ -596,19 +610,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the maximum element from this RDD as defined by the specified
* Comparator[T].
*
* @param comp the comparator that defines ordering
* @return the maximum of the RDD
* */
*/
def max(comp: Comparator[T]): T = {
rdd.max()(Ordering.comparatorToOrdering(comp))
}

/**
* Returns the minimum element from this RDD as defined by the specified
* Comparator[T].
*
* @param comp the comparator that defines ordering
* @return the minimum of the RDD
* */
*/
def min(comp: Comparator[T]): T = {
rdd.min()(Ordering.comparatorToOrdering(comp))
}
@@ -684,7 +700,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* The asynchronous version of the `foreachPartition` action, which
* applies a function f to each partition of this RDD.
*/
def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
def foreachPartitionAsync(f: VoidFunction[JIterator[T]]): JavaFutureAction[Void] = {
new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x.asJava)),
{ x => null.asInstanceOf[Void] })
}
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -17,9 +17,11 @@

package org.apache.spark.rdd

import java.io.BufferedWriter
import java.io.File
import java.io.FilenameFilter
import java.io.IOException
import java.io.OutputStreamWriter
import java.io.PrintWriter
import java.util.StringTokenizer
import java.util.concurrent.atomic.AtomicReference
@@ -45,7 +47,8 @@ private[spark] class PipedRDD[T: ClassTag](
envVars: Map[String, String],
printPipeContext: (String => Unit) => Unit,
printRDDElement: (T, String => Unit) => Unit,
separateWorkingDir: Boolean)
separateWorkingDir: Boolean,
bufferSize: Int)
Member:
This causes a MiMa failure. It could be resolved with a default value for this arg; normally that would be essential, although we could also just exclude the failure on the missing old method signature. I don't have a strong feeling, but I suppose it makes sense to have a default value?

[error]  * method pipe(scala.collection.Seq,scala.collection.Map,scala.Function1,scala.Function2,Boolean)org.apache.spark.rdd.RDD in class org.apache.spark.rdd.RDD does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.pipe")
[error]  * method pipe(java.util.List,java.util.Map,Boolean,Int)org.apache.spark.api.java.JavaRDD in trait org.apache.spark.api.java.JavaRDDLike is present only in current version
[error]    filter with: ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.pipe")
[info] spark-mllib: found 0 potential binary incompatibilities while checking against org.apache.spark:spark-mllib_2.11:1.6.0  (filtered 498)

The other failure in JavaRDDLike can be excluded safely.

Author:
I have excluded the missing old method signature.

extends RDD[String](prev) {

// Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -58,7 +61,7 @@ private[spark] class PipedRDD[T: ClassTag](
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false) =
this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
separateWorkingDir)
separateWorkingDir, 8192)


override def getPartitions: Array[Partition] = firstParent[T].partitions
@@ -144,7 +147,8 @@ private[spark] class PipedRDD[T: ClassTag](
new Thread(s"stdin writer for $command") {
override def run(): Unit = {
TaskContext.setTaskContext(context)
val out = new PrintWriter(proc.getOutputStream)
val out = new PrintWriter(new BufferedWriter(
Member:
Buffering here is probably a decent idea, with a small buffer. Is it even necessary to make it configurable? 8K is pretty standard; you've found that a larger buffer (32K?) is better. Would you ever want to turn it off or make it much larger than that? The reason I ask is just that this requires changing a public API, and that's going to require additional steps.

Separately, this needs to specify UTF-8 encoding. Actually, we have this same problem in the stderr and stdout readers above, that they rely on platform encoding. I can sort of see an argument that using platform encoding makes sense when dealing with platform binaries, but, there's still no particular reason to expect the JVM default more often matches whatever some binary is using.
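For reference, here is a minimal sketch (not part of this patch) of what a buffered stdin writer with an explicit charset could look like; StandardCharsets.UTF_8 and the helper name are assumptions chosen for illustration:

    import java.io.{BufferedWriter, OutputStreamWriter, PrintWriter}
    import java.nio.charset.StandardCharsets

    // Wrap the child process's stdin in a buffered writer of the given size,
    // using an explicit charset instead of the platform default.
    def stdinWriter(proc: Process, bufferSize: Int): PrintWriter =
      new PrintWriter(new BufferedWriter(
        new OutputStreamWriter(proc.getOutputStream, StandardCharsets.UTF_8), bufferSize))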

Author:
@srowen - Thanks for taking a look. In our testing we found that using a large buffer (1 MB) gives us a CPU savings of around 15%. It makes sense to be able to increase the buffer size when we are piping a large amount of data. If changing a public API is not too much trouble, it would be pretty useful for us to have a configurable buffer size.

Regarding your second point, I am not sure if I understand you. My change is not going to change the behavior of the PrintWriter at all. Do you mean to say the issue with UTF-8 encoding already exists and I should fix it in this diff?
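As a rough illustration of the intended use (the command, RDD contents, and sizes below are hypothetical, not from the benchmark described above), a caller could request a 1 MB stdin buffer for a single pipe() call:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("pipe-buffer-demo").setMaster("local[2]"))
    val nums = sc.parallelize(1 to 1000000)
    // Ask for a 1 MB stdin buffer instead of the 8 KB default; "cat" stands in for the real binary.
    val piped = nums.pipe(Seq("cat"), bufferSize = 1024 * 1024)
    println(piped.count())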

Member:
Hm, is it crazy to just set a large buffer size? 1M isn't that much, since an executor isn't generally going to run lots of processes. I was thinking some conf parameter might be nicer than an API param, but I suppose the right buffer size depends on what you're doing.

Maybe an API method argument isn't so bad. I was going to say that this needs to be plumbed through to the Java API, but its pipe method already lacks most of the args of the core Scala version. Maybe it's sensible if we do this for 2.0.0 and fix up the Java API? (The Python API works entirely differently here anyway.)

@tejasapatil and erm, @andrewor14 or @tgravescs any particular opinions on adding a bufferSize arg to the pipe method?

Yes, if there's an encoding issue it's not introduced by this change of course. In general we never want to depend on the platform encoding. I wonder ... if this is one exception, since it's communicating with platform binaries. It's still not so great, but maybe on second thought that can be left alone for now.

Contributor:
Introducing a new configuration has a limitation that it will force the entire pipeline (which might have several pipe() operations) to use the same buffer size globally. I prefer this to be in the API itself and preferably should be backward compatible so that existing jobs are not affected.
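A short sketch of that point: two pipe() stages in the same job can pick different buffer sizes per call, which a single global conf could not express (sc is assumed to be an existing SparkContext; the scripts and sizes are illustrative):

    // Small buffer for a lightweight filter, larger buffer for a throughput-heavy stage.
    val lines = sc.parallelize(Seq("a b c", "d e f"))
    val tokenized = lines.pipe(Seq("./tokenizer.sh"), bufferSize = 8192)
    val scored = tokenized.pipe(Seq("./scorer.sh"), bufferSize = 1024 * 1024)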

Member:
OK, I personally think this is good to merge if we also update the Java API to include an overload of this method that exposes all of these args, including the new one.

Author:
That makes sense. I will add the overloaded method to the Java API as well.

Author:
@srowen - I added a Java API to specify separateWorkingDir and bufferSize. For printPipeContext and printRDDElement, these are functions that take another function as an argument, and there is no straightforward way to specify them in Java (which is probably the reason they were left out in the first place). Let me know what you think about the change.
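A rough sketch of how the new Java-facing overload could be invoked, written in Scala against the Java API for brevity (sc, the command, and the 32 KB buffer are illustrative assumptions, not part of the patch):

    import java.util.{Arrays => JArrays, Collections}
    import org.apache.spark.api.java.JavaSparkContext

    val jsc = new JavaSparkContext(sc)
    val jrdd = jsc.parallelize(JArrays.asList("a", "b", "c"))
    // Arguments mirror pipe(command, env, separateWorkingDir, bufferSize).
    val piped = jrdd.pipe(
      JArrays.asList("cat"),
      Collections.emptyMap[String, String](),
      false,
      32768)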

Contributor (@andrewor14, May 3, 2016):
It's probably OK to expose the buffer size, though in other places the buffer size is usually a config. If there's a real use case (and it seems like there is) then maybe it's fine. It just seems like a pretty low-level thing for the application to think about.

Member:
Yeah I agree with that; the problem is that it may quite reasonably vary from one RDD to another, so it didn't seem right as a global conf. Unless I've missed a third way, an optional param seemed most reasonable.

new OutputStreamWriter(proc.getOutputStream), bufferSize))
try {
// scalastyle:off println
// input the pipe context firstly
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -724,18 +724,21 @@ abstract class RDD[T: ClassTag](
* def printRDDElement(record:(String, Seq[String]), f:String=&gt;Unit) =
* for (e &lt;- record._2) {f(e)}
* @param separateWorkingDir Use separate working directories for each task.
* @param bufferSize Buffer size for the stdin writer for the piped process.
* @return the result RDD
*/
def pipe(
command: Seq[String],
env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false): RDD[String] = withScope {
separateWorkingDir: Boolean = false,
bufferSize: Int = 8192): RDD[String] = withScope {
new PipedRDD(this, command, env,
if (printPipeContext ne null) sc.clean(printPipeContext) else null,
if (printRDDElement ne null) sc.clean(printRDDElement) else null,
separateWorkingDir)
separateWorkingDir,
bufferSize)
}

/**
core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -171,7 +171,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
val collectPwd = pipedPwd.collect()
assert(collectPwd(0).contains("tasks/"))
val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect()
val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect()
// make sure symlinks were created
assert(pipedLs.length > 0)
// clean up top level tasks directory
4 changes: 4 additions & 0 deletions project/MimaExcludes.scala
@@ -685,6 +685,10 @@ object MimaExcludes {
"org.apache.spark.sql.Dataset.this"),
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"org.apache.spark.sql.DataFrameReader.this")
) ++ Seq(
// SPARK-14542 configurable buffer size for pipe RDD
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.pipe"),
Member:
Is this one still needed? I'd think MiMa is fine with the Scala API change because there isn't now a method invocation that no longer works.

Author:
Yes, that's needed. Without it the MiMa tests failed.

[error]  * method pipe(scala.collection.Seq,scala.collection.Map,scala.Function1,scala.Function2,Boolean)org.apache.spark.rdd.RDD in class org.apache.spark.rdd.RDD does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.pipe")

ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.pipe")
) ++ Seq(
// [SPARK-4452][Core]Shuffle data structures can starve others on the same thread for memory
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.Spillable")