Conversation

@lyogavin (Contributor) commented Jun 2, 2013

At Yahoo we want to migrate some of our Hadoop-based pipelines to Spark. Our pipelines are mostly implemented in Hadoop Streaming. We hope we can reuse most of the Hadoop Streaming code so that we can reduce the migration effort and perhaps focus only on rewriting the performance bottlenecks. In our prototyping we saw that we can still get a significant performance improvement while reusing existing code. This should also be helpful for other users who currently use Hadoop and want to move to Spark.

Though Spark has rdd.pipe() to do work similar to Hadoop Streaming, we found that the following use cases still couldn't be supported.

  1. We have a problem reusing the reducer-side Hadoop Streaming code with pipe(). In our use case, after keyBy() or groupBy() the data for some keys can be very large. If we work around this with map() followed by pipe(), we need to build one big string that concatenates all the strings in a key's Seq[String], which consumes a large chunk of memory in some cases and does not seem like a scalable or performant solution (see the sketch after this list). Since Spark is such a memory-intensive system, optimizing memory usage is very important for us.
  2. Sometimes in Hadoop Streaming we need the distributed cache (-cacheFile, -cacheArchive) to distribute shared data to all TaskTrackers. We need a similar mechanism in Spark to do the same thing.
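
As a rough illustration of the first problem, a map()-then-pipe() workaround has to materialize every value of a key into a single string before anything reaches the external script (the names below are purely illustrative, not code from this pull request):

// Assumes an existing RDD such as lines: RDD[String] and a key function extractKey: String => String.
val grouped = lines.groupBy(extractKey)   // RDD[(String, Seq[String])]

// All values for a key are concatenated into one string so that pipe() can emit it;
// for a heavily skewed key this single string can be enormous.
val piped = grouped
  .map { case (key, values) => (key +: values).mkString("\t") }
  .pipe("my_streaming_reducer.py")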

In this pull request, we enhanced PipedRDD and rdd.pipe() to support the two features above. For the first, we allow the user of pipe() to provide a function that specifies how each element of the RDD is transformed and written to the child process. For the second, we allow the user to provide a Seq[String] that will also be piped to the shell process; users can use broadcast values to implement the equivalent of Hadoop Streaming's distributed cache.

@AmplabJenkins

Thank you for your pull request. An admin will review this request soon.

Member (review comment)

You can probably collapse the two (or even three) additional constructors by using default arguments.

Also, wrap lines at 100 char width.

@rxin (Member) commented Jun 3, 2013

Thanks, Gavin, for submitting this. This is pretty cool. I made some comments on the coding style (and asked for a little more documentation on the new parameters).

@JoshRosen (Member)

Is there anything that transform does that can't be done by just calling map() on your RDD prior to pipe()?

Similarly, why does transform have the side-effect of writing to an output stream? Why not make transform a pure function and perform the output writing in PipedRDD?

Contributor (review comment)

The delimiter should be configurable.

Contributor Author (review comment)

Good point. I'll add another parameter to allow the user to configure this.

@mridulm (Contributor) commented Jun 3, 2013

I agree with Josh; I'm not sure what I'm missing here: can't this just be shuffle, map, pipe?

@lyogavin (Contributor Author) commented Jun 3, 2013

@rxin Thanks, Reynold. I'll make the changes according to your comments later.

@lyogavin (Contributor Author) commented Jun 3, 2013

@JoshRosen Thanks, Josh. Sorry, I don't think I described the problem very clearly. The problem we had with directly using map() was that, in our use case, after keyBy() or groupBy() the data for some keys can be very large. If we solve it with map(), we need to build one big string that concatenates all the strings in a key's Seq[String]. I think this consumes a large chunk of memory in some cases and doesn't seem like a scalable or performant solution. Since Spark is such a memory-intensive system, optimizing memory usage is very important for us. I'm not sure whether there is a better way to solve the problem without constructing a very big string with map(); if you can think of one, I'm willing to stay with map().

Member (review comment)

Why is this called with out.println instead of out.print? Using out.print would give users the freedom to incrementally build up output on a single line, whereas out.println forces them to either convert the record into one huge string or to split it across multiple lines.

Member (review comment)

Actually, why not just pass the raw PrintWriter to transform? It would simplify transform's signature and provide a lot of flexibility without sacrificing performance.

This would efficiently support the case where you want to write a reducer's input as a key followed by one value per line (with appropriate delimiters between keys) because it wouldn't force you to append '\n' to the end of each value's string before printing it.
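
As a concrete sketch of this suggestion (illustrative only, not code from the PR; it assumes transform would be handed the PrintWriter wrapping the child process's stdin):

// Hypothetical transform for a grouped record: write the key on its own line, then one
// value per line, without ever concatenating the values into a single string.
val writeRecord: ((String, Seq[String]), java.io.PrintWriter) => Unit = {
  case ((key, values), out) =>
    out.println(key)                     // the key on its own line
    values.foreach(v => out.println(v))  // one line per value, streamed as we go
}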

Contributor Author (review comment)

I don't think support for customizing "\n" is urgently needed, and I think it's out of the scope of this pull request.

Also, I think it's actually a complicated topic. A good API should strike a good trade-off between flexibility and cohesion. If we expose PrintWriter to users, we totally lose control of how they use it: they might use any of the methods in the PrintWriter interface, some of which are not what we want them to use, and we would be tightly coupled to PrintWriter's implementation details. Exposing an abstract interface, as done here, is a better way to decouple those details. Hadoop abstracts this as org.apache.hadoop.streaming.io.InputWriter and allows almost anything to be customized by supplying a custom InputWriter implementation. So if we really want to support customization well, I think we can reuse org.apache.hadoop.streaming.io.InputWriter, as we do with InputFormat in hadoopFile. But again, I think that's out of the scope of this pull request.

@JoshRosen (Member)

In this implementation, the arguments are serialized and shipped to workers as part of each task. If the arguments are expected to be large, it might make sense to ship them using broadcast variables. This could be done by wrapping arguments in a broadcast variable inside the RDD.pipe() method, then changing PipedRDD's constructor to accept a Broadcast[Seq[String]].

I imagine that arguments will only be used for passing large values to the workers; for small values (e.g. input filenames), it makes more sense to pass them via environment variables or shell arguments. To avoid confusion about its purpose, is there a better name for this new mechanism than arguments?
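
A minimal sketch of that idea (assumed names only; a PipedRDD constructor taking a Broadcast is hypothetical here, not the PR's current code):

// Inside RDD[T]: wrap the possibly-large arguments in a broadcast variable so they are
// shipped to each node once instead of being serialized into every task closure.
def pipe(command: Seq[String], arguments: Seq[String]): RDD[String] = {
  val broadcastArgs = context.broadcast(arguments)
  new PipedRDD(this, command, broadcastArgs)   // hypothetical Broadcast[Seq[String]] constructor
}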

@mridulm (Contributor) commented Jun 3, 2013

What sort of key/value distribution are you targeting, specifically with respect to the memory-pressure comment?
Heavy skew of values to keys? A large number of unique keys? Something else?

@lyogavin (Contributor Author) commented Jun 3, 2013

@JoshRosen You are right. It was supposed to be Broadcast[Seq[String]]; I missed that and will add it.

@lyogavin (Contributor Author) commented Jun 3, 2013

@mridulm In our use case, some keys may have a huge number of values, so large that we don't even know whether they can all fit on one node even with this change. Hadoop has no problem processing this kind of case because its data is persisted. So the purpose of this patch is to make Spark more scalable for these kinds of use cases.

Member (review comment)

Since you have the 3rd pipe function with default arguments, I don't think you need the first two anymore?

Member (review comment)

I agree with this; we should consolidate these into one function.

Contributor Author (review comment)

@rxin @mateiz I agree with you. I actually tried to combine as many as I could. ;) But I ran into a weird issue when I defined default arguments for two overloaded versions of pipe(): "in class RDD, multiple overloaded alternatives of method pipe define default arguments." I don't know if you have run into this as well.

I tested it; it seems that when there is more than one overloaded version of a method, only one of them is allowed to have default parameters:

scala> class A() { def f(a:Int = 0, b:Int) =0;def f(a:Int, b:String) =0}
defined class A

scala> class A() { def f(a:Int = 0, b:Int) =0;def f(a:Int = 0, b:String) =0}
<console>:7: error: in class A, multiple overloaded alternatives of method f define default arguments.
       class A() { def f(a:Int = 0, b:Int) =0;def f(a:Int = 0, b:String) =0}

It looks like there are already complaints about this restriction: http://stackoverflow.com/questions/4652095/why-does-the-scala-compiler-disallow-overloaded-methods-with-default-arguments.

Do you know any better way to resolve this problem?

The reason I kept the first two versions of pipe() was that I wanted to keep pipe(String) and pipe(Seq[String]) available to maintain backward compatibility. I thought we'd better keep users' existing code working; otherwise, after this change they would need to modify their code and rebuild. Do you think we should preserve backward compatibility? What's your opinion?

Member (review comment)

For now, how about letting only the Seq[String] version of pipe have the "advanced" features?

Contributor Author (review comment)

This sounds good to me.

@rxin (Member) commented Jun 6, 2013

Hi Gavin,

Thanks for taking the effort to clean this up. I think the number of functions and constructors can each be reduced to one with what you have. You can just delete the first two def pipe definitions and move the default arguments from the second constructor to the first one.

Member (review comment)

Typo: trasnform should be transform.

@mateiz (Member) commented Jun 9, 2013

Hey Gavin, just a couple of comments on this:

  • Using default arguments would be good, as others pointed out
  • For the context, would it make sense to pass a sequence of broadcast variables instead of just one? I can imagine apps that would share part of the context across pipe() calls.

@lyogavin (Contributor Author)

@mateiz I want to make sure I understand your point correctly: did you mean Seq[Broadcast[U]]?

@mateiz (Member) commented Jun 13, 2013

Sorry, yes, that's what I meant. Do you think it would be useful?

@lyogavin (Contributor Author)

@mateiz I think it would be useful. The way we are using it now, we have some big arrays from collect() calls in other jobs. We combine them into one array, wrap it in a Broadcast, and then stream it into the pipe process. With Seq[Broadcast[U]] I think we can avoid the array merging. The only thing I'm not sure about is whether it makes things a little too complicated, but I think it's worth it. I'll change it accordingly.
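
To make the difference concrete (the parameter and variable names below are only illustrative): with a single broadcast the collected arrays must be merged first, whereas a Seq[Broadcast[U]] would let each collected result be passed along on its own.

// Current pattern: merge the collected arrays, then wrap the result in one broadcast.
val merged = sc.broadcast(resultA ++ resultB)   // resultA, resultB: Array[String] from collect()
rdd.pipe(cmd, pipeContext = merged)             // pipeContext is a hypothetical parameter name

// Proposed: no merging step; each result keeps its own broadcast variable.
val parts = Seq(sc.broadcast(resultA), sc.broadcast(resultB))
rdd.pipe(cmd, pipeContext = parts)              // pipeContext now a Seq[Broadcast[Array[String]]]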

@rxin (Member) commented Jun 14, 2013

Gavin, if you think this is getting too complicated, the other option is to put this in a Yahoo-internal class, YahooRddOps, and have RDDs implicitly converted into YahooRddOps (the way it is done for PairRDDFunctions).

@lyogavin (Contributor Author)

Hi Matei and Reynold, I do feel it's getting a little too complicated. I thought about it further and I think there's a better solution: we can remove the pipeContext and delimiter parameters and instead define another function parameter that lets the user pipe the context. So we'll have just two function parameters, one for the context data and one for each RDD element. This way the user can use the Broadcast and the delimiter in any way they want.

I have updated the code with this new solution and addressed the other feedback.

Sorry, I merged my local branch, which makes the history a little confusing to read. Please look at fb6d733 and 4508089.
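
For readers following the thread, a rough sketch of the shape this converged on (the parameter names printPipeContext and printRDDElement follow my reading of the merged commits; treat fb6d733, 4508089 and e6d1277 as authoritative):

// Sketch of the two-function design: one callback streams the shared context into the child
// process, the other streams each RDD element, both through a supplied line-printing function.
//   def pipe(command: Seq[String],
//            env: Map[String, String] = Map(),
//            printPipeContext: (String => Unit) => Unit = null,
//            printRDDElement: (T, String => Unit) => Unit = null): RDD[String]

// Example usage (sc, sideData: Seq[String], and grouped: RDD[(String, Seq[String])] are assumed
// to exist): stream a broadcast dictionary first, then each key followed by one value per line,
// never building a single concatenated string.
val dict = sc.broadcast(sideData)
val result = grouped.pipe(
  Seq("/path/to/reducer.sh"),
  printPipeContext = (printLine: String => Unit) => dict.value.foreach(printLine),
  printRDDElement = { case ((key, values), printLine) =>
    printLine(key)
    values.foreach(printLine)
  })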

@mateiz (Member) commented Jun 17, 2013

Hey Gavin,

That's a great idea! It really simplified things. I'm going to merge it in.

mateiz added a commit that referenced this pull request Jun 17, 2013
Enhance pipe to support more features we can do in hadoop streaming
@mateiz merged commit e6d1277 into mesos:master Jun 17, 2013
Member (review comment)

There is a typo here in the doc. Gavin, do you mind submitting a new pull request to fix this?

Contributor Author (review comment)

Sure. Thanks for pointing it out. I'll fix it.

mateiz added a commit that referenced this pull request Jun 19, 2013