
Commit a019e6e

Sital Kedia authored and srowen committed
[SPARK-14542][CORE] PipeRDD should allow configurable buffer size for…
## What changes were proposed in this pull request?

Currently PipedRDD internally uses PrintWriter to write data to the stdin of the piped process, which by default uses a BufferedWriter with an 8k buffer. In our experiments we have seen that an 8k buffer is too small and the job spends a significant amount of CPU time in system calls to copy the data. We should have a way to configure the buffer size for the writer.

## How was this patch tested?

Ran PipedRDDSuite tests.

Author: Sital Kedia <[email protected]>

Closes #12309 from sitalkedia/bufferedPipedRDD.
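For context, a minimal sketch of how the new parameter surfaces in the Scala API once this change is applied. The `wc -l` command, the sample data, and the 64 KiB size are illustrative choices, not part of the patch; the parameter defaults to 8192, matching the previous behavior.

```scala
// Assumes an existing SparkContext `sc`; "wc -l" and 64 * 1024 are illustrative only.
val lines = sc.parallelize(Seq("a", "b", "c"), numSlices = 2)
// Pipe each partition's elements through an external command, using a larger
// stdin buffer than the default 8192 characters.
val counted = lines.pipe(Seq("wc", "-l"), bufferSize = 64 * 1024)
counted.collect()  // one line count per partition
```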
1 parent 5706472 commit a019e6e

File tree

5 files changed (+54, -27 lines)

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 37 additions & 21 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.api.java

 import java.{lang => jl}
 import java.lang.{Iterable => JIterable}
-import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.util.{Comparator, Iterator => JIterator, List => JList, Map => JMap}

 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -80,7 +80,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   * This should ''not'' be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   */
-  def iterator(split: Partition, taskContext: TaskContext): java.util.Iterator[T] =
+  def iterator(split: Partition, taskContext: TaskContext): JIterator[T] =
    rdd.iterator(split, taskContext).asJava

  // Transformations (return a new RDD)
@@ -96,7 +96,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   * of the original partition.
   */
  def mapPartitionsWithIndex[R](
-      f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
+      f: JFunction2[jl.Integer, JIterator[T], JIterator[R]],
      preservesPartitioning: Boolean = false): JavaRDD[R] =
    new JavaRDD(rdd.mapPartitionsWithIndex((a, b) => f.call(a, b.asJava).asScala,
      preservesPartitioning)(fakeClassTag))(fakeClassTag)
@@ -147,7 +147,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
-  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
+  def mapPartitions[U](f: FlatMapFunction[JIterator[T], U]): JavaRDD[U] = {
    def fn: (Iterator[T]) => Iterator[U] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
    }
@@ -157,7 +157,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
-  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
+  def mapPartitions[U](f: FlatMapFunction[JIterator[T], U],
      preservesPartitioning: Boolean): JavaRDD[U] = {
    def fn: (Iterator[T]) => Iterator[U] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -169,7 +169,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
-  def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
+  def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]]): JavaDoubleRDD = {
    def fn: (Iterator[T]) => Iterator[jl.Double] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
    }
@@ -179,7 +179,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
-  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
+  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2]):
  JavaPairRDD[K2, V2] = {
    def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -190,7 +190,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
-  def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
+  def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]],
      preservesPartitioning: Boolean): JavaDoubleRDD = {
    def fn: (Iterator[T]) => Iterator[jl.Double] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -202,7 +202,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
-  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
+  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2],
      preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
    def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
      (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
  /**
   * Applies a function f to each partition of this RDD.
   */
-  def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
+  def foreachPartition(f: VoidFunction[JIterator[T]]): Unit = {
    rdd.foreachPartition(x => f.call(x.asJava))
  }

@@ -256,19 +256,33 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
  /**
   * Return an RDD created by piping elements to a forked external process.
   */
-  def pipe(command: String): JavaRDD[String] = rdd.pipe(command)
+  def pipe(command: String): JavaRDD[String] = {
+    rdd.pipe(command)
+  }

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
-  def pipe(command: JList[String]): JavaRDD[String] =
+  def pipe(command: JList[String]): JavaRDD[String] = {
    rdd.pipe(command.asScala)
+  }

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
-  def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
+  def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] = {
    rdd.pipe(command.asScala, env.asScala)
+  }
+
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
+  def pipe(command: JList[String],
+           env: JMap[String, String],
+           separateWorkingDir: Boolean,
+           bufferSize: Int): JavaRDD[String] = {
+    rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize)
+  }

  /**
   * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
@@ -288,7 +302,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   */
  def zipPartitions[U, V](
      other: JavaRDDLike[U, _],
-      f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
+      f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
    def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
      (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
    }
@@ -446,22 +460,22 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
   * combine step happens locally on the master, equivalent to running a single reduce task.
   */
-  def countByValue(): java.util.Map[T, jl.Long] =
-    mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[java.util.Map[T, jl.Long]]
+  def countByValue(): JMap[T, jl.Long] =
+    mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[JMap[T, jl.Long]]

  /**
   * (Experimental) Approximate version of countByValue().
   */
  def countByValueApprox(
    timeout: Long,
    confidence: Double
-  ): PartialResult[java.util.Map[T, BoundedDouble]] =
+  ): PartialResult[JMap[T, BoundedDouble]] =
    rdd.countByValueApprox(timeout, confidence).map(mapAsSerializableJavaMap)

  /**
   * (Experimental) Approximate version of countByValue().
   */
-  def countByValueApprox(timeout: Long): PartialResult[java.util.Map[T, BoundedDouble]] =
+  def countByValueApprox(timeout: Long): PartialResult[JMap[T, BoundedDouble]] =
    rdd.countByValueApprox(timeout).map(mapAsSerializableJavaMap)

  /**
@@ -596,19 +610,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
  /**
   * Returns the maximum element from this RDD as defined by the specified
   * Comparator[T].
+   *
   * @param comp the comparator that defines ordering
   * @return the maximum of the RDD
-   * */
+   */
  def max(comp: Comparator[T]): T = {
    rdd.max()(Ordering.comparatorToOrdering(comp))
  }

  /**
   * Returns the minimum element from this RDD as defined by the specified
   * Comparator[T].
+   *
   * @param comp the comparator that defines ordering
   * @return the minimum of the RDD
-   * */
+   */
  def min(comp: Comparator[T]): T = {
    rdd.min()(Ordering.comparatorToOrdering(comp))
  }
@@ -684,7 +700,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   * The asynchronous version of the `foreachPartition` action, which
   * applies a function f to each partition of this RDD.
   */
-  def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
+  def foreachPartitionAsync(f: VoidFunction[JIterator[T]]): JavaFutureAction[Void] = {
    new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x.asJava)),
      { x => null.asInstanceOf[Void] })
  }
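The new four-argument `pipe` overload above is the Java-facing entry point; internally it forwards to `RDD.pipe` with null `printPipeContext`/`printRDDElement` hooks. A hedged sketch of calling it, shown from Scala for brevity; the `tr` command, the environment map, and the 65536 size are illustrative assumptions, not from the patch.

```scala
import scala.collection.JavaConverters._

import org.apache.spark.api.java.JavaSparkContext

// Assumes an existing SparkContext `sc`.
val jsc = new JavaSparkContext(sc)
val jrdd = jsc.parallelize(Seq("a", "b", "c").asJava)

// New overload: command, env, separateWorkingDir, bufferSize.
val upper = jrdd.pipe(
  Seq("tr", "a-z", "A-Z").asJava,
  Map("LC_ALL" -> "C").asJava,
  false,  // separateWorkingDir
  65536)  // bufferSize
```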

core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala

Lines changed: 7 additions & 3 deletions
@@ -17,9 +17,11 @@

 package org.apache.spark.rdd

+import java.io.BufferedWriter
 import java.io.File
 import java.io.FilenameFilter
 import java.io.IOException
+import java.io.OutputStreamWriter
 import java.io.PrintWriter
 import java.util.StringTokenizer
 import java.util.concurrent.atomic.AtomicReference
@@ -45,7 +47,8 @@ private[spark] class PipedRDD[T: ClassTag](
    envVars: Map[String, String],
    printPipeContext: (String => Unit) => Unit,
    printRDDElement: (T, String => Unit) => Unit,
-    separateWorkingDir: Boolean)
+    separateWorkingDir: Boolean,
+    bufferSize: Int)
  extends RDD[String](prev) {

  // Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -58,7 +61,7 @@ private[spark] class PipedRDD[T: ClassTag](
      printRDDElement: (T, String => Unit) => Unit = null,
      separateWorkingDir: Boolean = false) =
    this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
-      separateWorkingDir)
+      separateWorkingDir, 8192)


  override def getPartitions: Array[Partition] = firstParent[T].partitions
@@ -144,7 +147,8 @@ private[spark] class PipedRDD[T: ClassTag](
    new Thread(s"stdin writer for $command") {
      override def run(): Unit = {
        TaskContext.setTaskContext(context)
-        val out = new PrintWriter(proc.getOutputStream)
+        val out = new PrintWriter(new BufferedWriter(
+          new OutputStreamWriter(proc.getOutputStream), bufferSize))
        try {
          // scalastyle:off println
          // input the pipe context firstly
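The wrapping on the changed lines is plain `java.io` plumbing: `BufferedWriter`'s default buffer is 8192 characters, and its two-argument constructor takes an explicit size, which is what lets `bufferSize` take effect. A self-contained sketch of the same pattern outside Spark, using a hypothetical `stdinWriter` helper and the `cat` command purely as an illustration.

```scala
import java.io.{BufferedWriter, OutputStream, OutputStreamWriter, PrintWriter}

// Hypothetical helper mirroring the pattern used in PipedRDD above.
def stdinWriter(stream: OutputStream, bufferSize: Int = 8192): PrintWriter =
  new PrintWriter(new BufferedWriter(new OutputStreamWriter(stream), bufferSize))

val proc = new ProcessBuilder("cat").start()
val out = stdinWriter(proc.getOutputStream, 64 * 1024)
out.println("hello")  // buffered in memory until the buffer fills or is flushed
out.close()           // flushes the buffer and closes the process's stdin
```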

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 5 additions & 2 deletions
@@ -724,18 +724,21 @@ abstract class RDD[T: ClassTag](
   *                        def printRDDElement(record:(String, Seq[String]), f:String=&gt;Unit) =
   *                          for (e &lt;- record._2) {f(e)}
   * @param separateWorkingDir Use separate working directories for each task.
+   * @param bufferSize Buffer size for the stdin writer for the piped process.
   * @return the result RDD
   */
  def pipe(
      command: Seq[String],
      env: Map[String, String] = Map(),
      printPipeContext: (String => Unit) => Unit = null,
      printRDDElement: (T, String => Unit) => Unit = null,
-      separateWorkingDir: Boolean = false): RDD[String] = withScope {
+      separateWorkingDir: Boolean = false,
+      bufferSize: Int = 8192): RDD[String] = withScope {
    new PipedRDD(this, command, env,
      if (printPipeContext ne null) sc.clean(printPipeContext) else null,
      if (printRDDElement ne null) sc.clean(printRDDElement) else null,
-      separateWorkingDir)
+      separateWorkingDir,
+      bufferSize)
  }

  /**
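A hedged sketch combining the `printRDDElement` hook documented in the scaladoc above with the new `bufferSize` parameter; the `sort` command, the sample data, and the 64 KiB size are illustrative only, not part of the patch.

```scala
// Assumes an existing SparkContext `sc`.
val records = sc.parallelize(Seq(("k1", Seq("b", "a")), ("k2", Seq("d", "c"))))

val piped = records.pipe(
  command = Seq("sort"),
  // Print only the values of each record to the external process's stdin.
  printRDDElement = (record: (String, Seq[String]), printFn: String => Unit) =>
    record._2.foreach(printFn),
  bufferSize = 64 * 1024)
```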

core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -171,7 +171,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
      val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
      val collectPwd = pipedPwd.collect()
      assert(collectPwd(0).contains("tasks/"))
-      val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect()
+      val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect()
      // make sure symlinks were created
      assert(pipedLs.length > 0)
      // clean up top level tasks directory
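Not part of this patch, but a sketch of an additional check one could add to PipedRDDSuite to exercise a non-default buffer size; it assumes the suite's existing `testCommandAvailable` helper and the `sc` provided by SharedSparkContext, and the 32 KiB size is arbitrary.

```scala
test("basic pipe with a non-default buffer size") {
  if (testCommandAvailable("cat")) {
    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
    // "cat" echoes each element back on its own line; the larger buffer should
    // not change the observable result, only the write granularity.
    val piped = nums.pipe(Seq("cat"), bufferSize = 32 * 1024)
    assert(piped.collect().toSet === Set("1", "2", "3", "4"))
  }
}
```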

project/MimaExcludes.scala

Lines changed: 4 additions & 0 deletions
@@ -685,6 +685,10 @@ object MimaExcludes {
        "org.apache.spark.sql.Dataset.this"),
      ProblemFilters.exclude[IncompatibleMethTypeProblem](
        "org.apache.spark.sql.DataFrameReader.this")
+    ) ++ Seq(
+      // SPARK-14542 configurable buffer size for pipe RDD
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.pipe"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.pipe")
    ) ++ Seq(
      // [SPARK-4452][Core]Shuffle data structures can starve others on the same thread for memory
      ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.Spillable")
