
Conversation

@sitalkedia

What changes were proposed in this pull request?

Currently PipedRDD internally uses PrintWriter to write data to the stdin of the piped process, which by default uses a BufferedWriter with a buffer size of 8k. In our experiments, we have seen that an 8k buffer is too small and the job spends a significant amount of CPU time in system calls copying the data. We should have a way to configure the buffer size for the writer.
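For context, the mechanics involved can be sketched without Spark: a PrintWriter writing through a BufferedWriter whose size is passed in, rather than the 8k default. The names below are illustrative, not the actual PipedRDD code.

```scala
import java.io.{BufferedWriter, ByteArrayOutputStream, OutputStream, OutputStreamWriter, PrintWriter}
import java.nio.charset.StandardCharsets

object BufferedPipeSketch {
  // Wrap an output stream (e.g. the piped process's stdin) in a PrintWriter
  // backed by a BufferedWriter of the given size, instead of relying on the
  // default 8192-char buffer.
  def writerFor(out: OutputStream, bufferSize: Int): PrintWriter =
    new PrintWriter(new BufferedWriter(
      new OutputStreamWriter(out, StandardCharsets.UTF_8), bufferSize))

  def main(args: Array[String]): Unit = {
    val sink = new ByteArrayOutputStream()
    val writer = writerFor(sink, 1 << 20) // 1 MB, the size tried in our testing
    writer.println("hello")
    writer.flush()
    print(sink.toString("UTF-8"))
  }
}
```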

How was this patch tested?

Ran PipedRDDSuite tests.

@sitalkedia
Author

cc @srowen

Member

Buffering here is probably a decent idea, with a small buffer. Is it even necessary to make it configurable? 8K is pretty standard; you've found a larger buffer (32K?) is better. Would you ever want to turn it off or make it much larger than that? The reason I ask is that this requires changing a public API, and that's going to require additional steps.

Separately, this needs to specify UTF-8 encoding. Actually, we have this same problem in the stderr and stdout readers above, that they rely on platform encoding. I can sort of see an argument that using platform encoding makes sense when dealing with platform binaries, but, there's still no particular reason to expect the JVM default more often matches whatever some binary is using.

Author

@srowen - Thanks for taking a look. In our testing we found that using a large buffer (1 MB) gives us a CPU savings of around 15%. It makes sense to be able to increase the buffer size when we are piping a large amount of data. If changing a public API is not too much trouble, it would be pretty useful for us to have a configurable buffer size.

Regarding your second point, I am not sure I understand. My change does not alter the behavior of the PrintWriter at all. Do you mean the issue with UTF-8 encoding already exists and I should fix it in this diff?

Member

Hm, is it crazy to just set a large buffer size? 1M isn't that much, since an executor generally isn't going to run lots of processes. I was thinking some conf parameter might be nicer than an API param, but I suppose the right buffer size depends on what you're doing.

Maybe an API method argument isn't so bad. I was going to say this needs to be plumbed through to the Java API, but its pipe method already lacks most of the args of the core Scala version. Maybe it's sensible if we do this for 2.0.0 and fix up the Java API? (The Python API works entirely differently here anyway.)

@tejasapatil and erm, @andrewor14 or @tgravescs any particular opinions on adding a bufferSize arg to the pipe method?

Yes, if there's an encoding issue, it's not introduced by this change, of course. In general we never want to depend on the platform encoding. I wonder if this is one exception, since it's communicating with platform binaries. It's still not so great, but on second thought maybe that can be left alone for now.

Contributor

Introducing a new configuration has the limitation that it would force the entire pipeline (which might have several pipe() operations) to use the same buffer size globally. I prefer this to be in the API itself, and preferably backward compatible, so that existing jobs are not affected.

Member

OK, I personally think this is good to merge if we also update the Java API to include an overload of this method that exposes all of these args, including the new one.

Author

That makes sense. I will add the overloaded method to the Java API as well.

Author

@srowen - I added a Java API to specify separateWorkingDir and bufferSize. As for printPipeContext and printRDDElement, these are functions which take another function as an argument, and there is no straightforward way to specify these in Java (probably that's the reason they were left out in the first place). Let me know what you think about the change.

Contributor

@andrewor14 andrewor14 May 3, 2016

It's probably OK to expose the buffer size, though in other places the buffer size is usually a config. If there's a real use case (and it seems like there is) then maybe it's fine. It just seems like a pretty low-level thing for the application to think about.

Member

Yeah I agree with that; the problem is that it may quite reasonably vary from one RDD to another, so it didn't seem right as a global conf. Unless I've missed a third way, an optional param seemed most reasonable.

@sitalkedia sitalkedia force-pushed the bufferedPipedRDD branch 3 times, most recently from 38d2ab9 to 3524a22 on April 25, 2016 23:51
Member

@srowen srowen Apr 26, 2016

Optional: wouldn't mind importing this as JMap and using it consistently in this file while we're here.

This can also take arguments for printPipeContext and printRDDElement. That's not part of your change of course, but it makes sense to fix this omission while adding the all-args API method to support the new arg. EDIT: to answer your question, the argument type is simply VoidFunction[String] and you just pass the Scala function s => f.call(s) to the Scala API. Something mostly like that.

Author

Okay, I changed to use JMap and JIterator across the file.

Regarding the arguments for printPipeContext and printRDDElement, simply passing VoidFunction[String] is not sufficient because printPipeContext is of type (String => Unit) => Unit. Any idea how to deal with that?

@sitalkedia sitalkedia force-pushed the bufferedPipedRDD branch 2 times, most recently from 1bc15c9 to 6ecd927 on May 1, 2016 06:06
@srowen
Member

srowen commented May 2, 2016

I'm still supportive of this change, but I think fleshing out the new Java API method would really complete it, as we'd fix up an inconsistency while making this change. It ought to be one line. It'd also be nice to change or add the existing pipe tests to at least try setting a different buffer size.

@sitalkedia
Author

@srowen - I think you missed my comment earlier. I totally agree with you that the new Java API should be in sync with the Scala API.

Repeating my comment below -

Regarding the arguments for printPipeContext and printRDDElement, simply passing VoidFunction[String] is not sufficient because printPipeContext is of type (String => Unit) => Unit. Any idea how to deal with that?

@srowen
Member

srowen commented May 2, 2016

Oops, right, that got collapsed. Hm, so it's actually a VoidFunction[VoidFunction[String]] then? A little trickier, but it should still be fairly easy to support. Give it a shot and see if it works out easily.

@sitalkedia
Author

So I tried the following, but it does not work.

  def pipe(command: JList[String],
           env: JMap[String, String],
           printPipeContext: VoidFunction[VoidFunction[String]],
           separateWorkingDir: Boolean,
           bufferSize: Int): JavaRDD[String] =
    rdd.pipe(command.asScala, env.asScala, s => printPipeContext.call(s),
      null, separateWorkingDir, bufferSize)

@srowen
Member

srowen commented May 2, 2016

Yeah, I think it has to be something like (a: String => Unit) => printPipeContext.call(new VoidFunction[String]() { override def call(s: String): Unit = a(s) }). I'm happy to punt on this aspect since it's non-trivial and not the intent of your change. If you can create any test code to cover this path I think it's GTG.
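Spelled out as a runnable sketch, the adapter converts the Java-friendly VoidFunction[VoidFunction[String]] into the (String => Unit) => Unit that the Scala pipe signature expects. The VoidFunction trait below is a minimal stand-in for Spark's org.apache.spark.api.java.function.VoidFunction, so no Spark dependency is needed:

```scala
// Minimal stand-in for org.apache.spark.api.java.function.VoidFunction.
trait VoidFunction[T] {
  def call(t: T): Unit
}

object PipeContextAdapter {
  // Turn the Java-style nested callback into the Scala-style
  // (String => Unit) => Unit expected by RDD.pipe's printPipeContext.
  def adapt(f: VoidFunction[VoidFunction[String]]): (String => Unit) => Unit =
    (a: String => Unit) =>
      f.call(new VoidFunction[String] {
        override def call(s: String): Unit = a(s)
      })

  def main(args: Array[String]): Unit = {
    val lines = scala.collection.mutable.ArrayBuffer[String]()
    // "Java side": emits two header lines through whatever sink it is given.
    val javaCallback = new VoidFunction[VoidFunction[String]] {
      override def call(sink: VoidFunction[String]): Unit = {
        sink.call("header-1")
        sink.call("header-2")
      }
    }
    adapt(javaCallback)(line => lines += line)
    println(lines.mkString(","))
  }
}
```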

@sitalkedia
Author

Thanks @srowen. I changed one of the tests to cover the code path. Let me know what you think.

@srowen
Member

srowen commented May 3, 2016

Jenkins retest this please

@SparkQA

SparkQA commented May 3, 2016

Test build #57634 has finished for PR 12309 at commit a193a47.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia
Author

Fixed checkstyle.

@andrewor14
Contributor

retest this please

Contributor

style:

def pipe(
    command: JList[String],
    ...
    bufferSize: Int): JavaRDD[String] = {
  rdd.pipe(...)
}

Author

changed, thanks!

@SparkQA

SparkQA commented May 3, 2016

Test build #57687 has finished for PR 12309 at commit a674b3c.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

This causes a MiMa failure. It could be resolved with a default value for this arg; normally that would be essential, although we could also just exclude the failure on the missing old method signature. I don't have a strong feeling, but I suppose it makes sense to have a default value?

[error]  * method pipe(scala.collection.Seq,scala.collection.Map,scala.Function1,scala.Function2,Boolean)org.apache.spark.rdd.RDD in class org.apache.spark.rdd.RDD does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.pipe")
[error]  * method pipe(java.util.List,java.util.Map,Boolean,Int)org.apache.spark.api.java.JavaRDD in trait org.apache.spark.api.java.JavaRDDLike is present only in current version
[error]    filter with: ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.pipe")
[info] spark-mllib: found 0 potential binary incompatibilities while checking against org.apache.spark:spark-mllib_2.11:1.6.0  (filtered 498)

The other failure in JavaRDDLike can be excluded safely.

Author

I have excluded the missing old method signature.

Member

Nit: this added extra parens which aren't needed. If you wouldn't mind, while changing this, end the line above with ): Unit = { to standardize

Author

will fix.

@srowen
Member

srowen commented May 5, 2016

Jenkins retest this please

@SparkQA

SparkQA commented May 5, 2016

Test build #57874 has finished for PR 12309 at commit bd252b7.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia
Author

I don't understand why the MiMa failure is still there. I have added exclusions for them. Any idea?

[error]  * method pipe(scala.collection.Seq,scala.collection.Map,scala.Function1,scala.Function2,Boolean)org.apache.spark.rdd.RDD in class org.apache.spark.rdd.RDD does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.pipe")
[error]  * method pipe(java.util.List,java.util.Map,Boolean,Int)org.apache.spark.api.java.JavaRDD in trait org.apache.spark.api.java.JavaRDDLike is present only in current version
[error]    filter with: ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.pipe")

@sitalkedia
Author

Ah, I had made the exclusion for v1.6 and we are building v2.0. Moved the exclusion to v2.0 instead; hopefully that will fix the MiMa issue.
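For reference, the shape of the exclusion entry, as a sketch reconstructed from the filter strings MiMa printed above (placed in MimaExcludes.scala under the 2.0 section; a build-config fragment, not standalone code):

```scala
// MimaExcludes.scala, v2.0 exclusion list
Seq(
  // SPARK-14542 configurable buffer size for pipe RDD
  ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.pipe"),
  ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.pipe")
)
```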

@srowen
Member

srowen commented May 6, 2016

Jenkins retest this please

@SparkQA

SparkQA commented May 6, 2016

Test build #57979 has finished for PR 12309 at commit 9efb2cf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia
Author

@srowen - Do you know how I can run the MiMa test locally?

@srowen
Member

srowen commented May 6, 2016

Sure, just ./dev/mima.

@sitalkedia
Author

I don't understand why ./dev/mima passes on my laptop. I also verified that ./dev/mima fails without my changes in MimaExcludes.scala. Something weird with the Jenkins build?

@sitalkedia
Author

Ah, this time it's not the MiMa failure; it seems like some flaky test failed - https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/57979/

@srowen
Member

srowen commented May 6, 2016

Jenkins retest this please

@SparkQA

SparkQA commented May 6, 2016

Test build #58016 has finished for PR 12309 at commit 9efb2cf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  ProblemFilters.exclude[IncompatibleMethTypeProblem](
    "org.apache.spark.sql.DataFrameReader.this")
  ) ++ Seq(
    // SPARK-14542 configurable buffer size for pipe RDD
Member

Is this one still needed? I'd think MiMa is fine with the Scala API change because there isn't now a method invocation that no longer works.

Author

Yes, that's needed. Without it the MiMa tests failed.

[error]  * method pipe(scala.collection.Seq,scala.collection.Map,scala.Function1,scala.Function2,Boolean)org.apache.spark.rdd.RDD in class org.apache.spark.rdd.RDD does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.pipe")

@srowen
Member

srowen commented May 8, 2016

Aside from one final question I think this is OK.

@srowen
Member

srowen commented May 10, 2016

Merged to master/2.0

asfgit pushed a commit that referenced this pull request May 10, 2016

Author: Sital Kedia <[email protected]>

Closes #12309 from sitalkedia/bufferedPipedRDD.

(cherry picked from commit a019e6e)
Signed-off-by: Sean Owen <[email protected]>
@asfgit asfgit closed this in a019e6e May 10, 2016
@sitalkedia
Author

Thanks for the review @srowen.

superbobry pushed a commit to criteo-forks/spark that referenced this pull request Mar 1, 2017