Add support for a Flatten annotation #20
BenFradet
left a comment
Looks good to me, thanks a lot! 👍
Tell me when you're happy to merge.
import scala.collection.generic.IsTraversableOnce

final class Meta(val metadata: Metadata) extends StaticAnnotation
Could we have made this a case class?
Yeah I think we should do that, and remove the val keyword.
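A minimal sketch of what that change could look like. The `Metadata` type below is a hypothetical stand-in for Spark's `Metadata`, used only so the snippet compiles without dependencies; the point is that case-class parameters are already public vals, so the `val` keyword becomes redundant.

```scala
import scala.annotation.StaticAnnotation

object MetaAnnotationSketch {
  // Hypothetical stand-in for Spark's Metadata (assumption, keeps the sketch dependency-free).
  final case class Metadata(entries: Map[String, String])

  // As a case class, `metadata` is a public val without the `val` keyword,
  // and equality and toString come for free.
  final case class Meta(metadata: Metadata) extends StaticAnnotation

  def main(args: Array[String]): Unit = {
    val m = Meta(Metadata(Map("description" -> "an id")))
    println(m.metadata.entries("description"))
    println(m == Meta(Metadata(Map("description" -> "an id"))))
  }
}
```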
kmate
left a comment
Looks OK so far, but I think we should add a schema mapping function before we can consider it complete: we must convert the flattened schema back to a nested one using Dataset.select to make .as[T] work again with this.
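To illustrate the idea, here is a hand-written sketch of rebuilding a nested column with `Dataset.select` so that `.as[T]` resolves again. It is not the mapping function proposed for this PR: the dotted column names (`foo.a`, `foo.b`) are an assumption about how flattened fields are named, and `Foo`, `Asd` and `SelectNestedSketch` are illustrative only. It needs `spark-sql` on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}

object SelectNestedSketch {
  // Defined inside an object so Spark's encoders can resolve them.
  case class Foo(a: Int, b: String)
  case class Asd(foo: Foo, x: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("select-nested-sketch")
      .getOrCreate()
    import spark.implicits._

    // Flattened input; the "foo.a" / "foo.b" naming is an assumption.
    val flat = Seq((1, "one", 10)).toDF("foo.a", "foo.b", "x")

    // Column names containing dots must be quoted with backticks.
    val nested = flat.select(
      struct(col("`foo.a`").as("a"), col("`foo.b`").as("b")).as("foo"),
      col("x")
    )

    // With the nested shape restored, the typed view resolves by field name.
    nested.as[Asd].show()

    spark.stop()
  }
}
```

A schema-driven version would derive the `struct(...)` expressions from the target type instead of spelling them out by hand.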
StructTypeEncoder[Foo].encode shouldBe StructType(
  StructField("a", StringType, false) ::
  StructField("b", IntegerType, false, metadata) :: Nil
case class Bar(@Flatten(2) a: Seq[Foo], @Flatten(1, Seq("x", "y")) b: Map[Symbol, Foo], @Flatten c: Foo)
I think we should use Map[String, Foo] here as the keys in flatten are Strings anyway.
}

implicit def recordEncoder[A, H <: HList, HA <: HList](
private def flattenFields(fields: Seq[StructField], dt: DataType, prefix: String, flatten: Flatten) =
could we specify the return type?
@@ -0,0 +1,197 @@
package ste
could we add the license header in this file and the associated spec?
}

object DFUtils {
  implicit class EnhancedDF(df: DataFrame) {
I know that these are names of implicits, but I would rename at least the class. EnhancedDF should be something like FlattenedDataFrame. Also, I would split the method, and create a public wrapper around the select only, like selectNested: DataFrame. Then asNested can be implemented with selectNested and as.
BenFradet
left a comment
Some nits, but it looks great 👍. Also, could we, in a separate PR, document those new features (Metadata and Flatten) in the README?
  tSelector.select(dfNested, parentPrefixes, flatten.tail)
}

private def getChildPrefixes(prefixes: Seq[Prefix], flatten: Option[Flatten]) =
we're also missing the return type here
}

private def getChildPrefixes(prefixes: Seq[Prefix], flatten: Option[Flatten]) =
  flatten.map(_ match {
we can just do
flatten.map {
  case ...
}

  }
}(breakOut)

private def orderedSelect(df: DataFrame, nestedCols: Map[Prefix, Column], fields: Seq[Prefix]) = {
missing return type here too
private def orderedSelect(df: DataFrame, nestedCols: Map[Prefix, Column], fields: Seq[Prefix]) = {
  @tailrec
  def loop(nestedCols: Map[Prefix, Column], fields: Seq[Prefix], cols: Seq[Column]): Seq[Column] = fields match {
could we make fields and cols lists instead of seqs?
StructTypeEncoder[Foo].encode shouldBe StructType(
  StructField("a", StringType, false) ::
  StructField("b", IntegerType, false, metadata) :: Nil
case class Bar(@Flatten(2) a: Seq[Foo], @Flatten(1, Seq("x", "y")) b: collection.Map[String, Foo], @Flatten c: Foo)
can't we have Map instead of collection.Map?
Unfortunately not: the fix (apache/spark#16161) hasn't been backported to Spark 2.1.
ok, would you be against an upgrade to 2.2.1 in release 0.2.0?
I was wrong, they did backport it to 2.1.x, so I'm upgrading the patch version. I think we should do the minor version upgrade in a separate PR.
object StructSelectorSpec {
  case class Foo(a: Int, b: String)
  case class Bar(@Flatten(1, Seq("asd", "qwe")) foo: collection.Map[String, Foo], c: Int)
same remark for the collection.Map
@BenFradet I'm going to write some documentation for these new features today hopefully
}

object DFUtils {
  def selectNested[A](df: DataFrame)(implicit s: StructTypeSelector[A]): DataFrame = s.select(df, None)
I would move this into FlattenedDataFrame.
@tailrec
def loop(nestedCols: Map[Prefix, Column], fields: List[Prefix], cols: List[Column]): List[Column] = fields match {
  case Nil => cols.reverse
  case hd +: tail => nestedCols.find { case (p, _) => p.isParentOf(hd) } match {
since you have lists you can do hd :: tail, same thing below
good idea, unfortunately shapeless overrides the :: definition
ah, too bad :(
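A small sketch of the clash described above, assuming it comes from an import that brings a different `::` into scope. The `hlistlike` object is a hypothetical stand-in for shapeless, defined here only so the snippet needs no dependency. Because `+:` is the generic sequence extractor, it keeps working on lists while `::` is shadowed.

```scala
import scala.annotation.tailrec

object ConsPatternSketch {
  // Hypothetical stand-in for shapeless's `::`; it exists only to reproduce
  // the name clash without pulling in the library.
  object hlistlike {
    final case class ::[+H, +T](head: H, tail: T)
  }
  import hlistlike._ // from here on, `::` in patterns refers to hlistlike.::

  // `+:` still destructures a List into head and tail despite the import.
  @tailrec
  def reverse[A](xs: List[A], acc: List[A] = Nil): List[A] = xs match {
    case Nil        => acc
    case hd +: tail => reverse(tail, hd +: acc)
    // case hd :: tail => ...  // would not compile here: `::` is hlistlike.::
  }

  def main(args: Array[String]): Unit =
    println(reverse(List(1, 2, 3)))
}
```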
  case class Bar(@Flatten(1, Seq("asd", "qwe")) foo: Map[String, Foo], c: Int)
  case class Baz(@Flatten(2) bar: Seq[Bar], e: Int)
  case class Asd(@Flatten foo: Foo, x: Int)
}
could we move this to the top of the spec? It makes things easier to understand
I moved the companion object up. The case classes need to live in an object so that the Spark encoder can access the scope they were defined in.
yeah, I don't have an issue with encapsulating them in an object, it's just that moving them to the top makes the tests easier to understand 👍
BenFradet
left a comment
Looks great, two nits and I'll merge 👍 Thanks a lot for all your effort
Merging, thanks a lot! 👍