[SPARK-20384][SQL] Support value class in schema of Dataset #22309
Conversation
@cloud-fan @liancheng @marmbrus could you please take a look at this and start the tests?

Hello. I've just found this PR while trying to debug serialization errors for a case class containing value classes. This would be very helpful!
```scala
object TestingValueClass {
  case class IntWrapper(i: Int) extends AnyVal
```
must a value class be a case class?
It doesn't have to be, but Spark only supports case classes (not plain classes) as schema types, so I kept it that way.
Child columns can be plain classes, though. I think adding that on top of this in the future would not be difficult.
somehow I lost track of this PR. ok to test

ok to test

Test build #97232 has finished for PR 22309 at commit
```scala
    intField: Int,
    wrappedInt: IntWrapper,
    strField: String,
    wrappedStr: StrWrapper)
```
We might need a comment describing what this class looks like in Java.
It seems to have two int fields (intField, wrappedInt) and two String fields (strField, wrappedStr). I'm not sure it is the same in Scala 2.12, though.
Right, I added some comments for this.
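For reference, the erased shape the reviewer describes can be checked directly with plain Java reflection. This is a sketch outside Spark; the class names mirror the test fixture quoted above:

```scala
// Mirrors the test fixture above. After compilation, fields of value-class
// type erase to their underlying types, so from Java's point of view this
// class has two int fields and two String fields.
case class IntWrapper(i: Int) extends AnyVal
case class StrWrapper(s: String) extends AnyVal
case class ValueClassData(
    intField: Int,
    wrappedInt: IntWrapper,
    strField: String,
    wrappedStr: StrWrapper)

object ErasureDemo {
  def main(args: Array[String]): Unit = {
    val paramTypes =
      classOf[ValueClassData].getConstructors.head.getParameterTypes.toList
    // Prints: List(int, int, class java.lang.String, class java.lang.String)
    println(paramTypes)
  }
}
```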
Test build #97331 has finished for PR 22309 at commit

@cloud-fan may I ask what is …

retest this please

Test build #97360 has finished for PR 22309 at commit
why can we skip the NewInstance?
Take the class User above for example. After compilation, the field id of type Id becomes Int, so when constructing User we need id to be an Int.
As for why we need NewInstance when Id itself is the schema: Id may remain an Id if it is treated as another type (following the allocation rules). For example, in the method encodeDecodeTest, if we pass an instance of Id as input, it will not be converted to Int. In the other case, when the required type is explicitly Id, both the input and the result returned from deserialization will become Int.
I have the same question. When we deserialize a value class, don't we always have NewInstance to construct it back? When constructing User, can't we pass in an Id?
@viirya we cannot, because after compilation the field id: Id becomes id: Int
So after Scala compilation, a value class becomes its underlying type?
Oh, sorry, I didn't see the identity case.
If T is User, will the Id class be instantiated?
No, in that case Id will be an Int
So for Dataset, can we conclude that if a value class is top-level it will be instantiated, but if it is a nested field it will just be its underlying type?
yes @viirya
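This conclusion can be observed in plain Scala, without Spark. `Id` and `User` here are the hypothetical classes from the discussion above:

```scala
case class Id(value: Int) extends AnyVal
case class User(id: Id, name: String)

object AllocationDemo {
  def main(args: Array[String]): Unit = {
    // As a type argument (e.g. the element of Seq[Id], analogous to a
    // top-level Dataset[Id]), the wrapper really is allocated at runtime:
    val ids: Seq[Id] = Seq(Id(1))
    println(ids.head.getClass.getSimpleName) // Id

    // As a plain field, it is erased to the underlying type:
    println(classOf[User].getDeclaredField("id").getType) // int
  }
}
```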
how about getClassNameFromType(t)?
I cannot use that because it returns the class name after erasure (e.g. it returns Int for IntWrapper). I'll create a separate method to make this clearer.
why must a value class be a Product? Is it because there is no way to get the value class field name and type?
No; now that you ask this, I realize the Product constraint can be completely removed.
Also, after scanning through the type API, I found a built-in way to check for value classes... I don't know why I never thought about this 🤦♂️
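Presumably the built-in check referred to here is `isDerivedValueClass` on `ClassSymbol`. A small sketch with plain scala-reflect, where `Id` and `Plain` are hypothetical examples:

```scala
import scala.reflect.runtime.universe._

case class Id(value: Int) extends AnyVal
case class Plain(value: Int)

object ValueClassCheckDemo {
  // True for user-defined subclasses of AnyVal (not for an ordinary class).
  def isValueClass(t: Type): Boolean =
    t.typeSymbol.isClass && t.typeSymbol.asClass.isDerivedValueClass

  def main(args: Array[String]): Unit = {
    println(isValueClass(typeOf[Id]))    // true
    println(isValueClass(typeOf[Plain])) // false
  }
}
```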
This is not true; a top-level value class can be a single primitive-type column
By "treated as a product" here I mean that we must create a new instance for it in case we have, for example, Dataset[Id] (as in my comment above). Does that seem confusing? Should I reword it?
do you have an example of this message?
Sure, here is the message when running with the value class Id above:

```
- Scala value class: org.apache.spark.sql.catalyst.encoders.Id(scala.Int)
```
shall we update dataTypeFor to handle value classes, instead of special-casing them here?
updating dataTypeFor is not enough, since trueFieldType is also used in the code below. We could move this into getConstructorParameters, but the special handling would still be there, so it would not be cleaner.
what's still missing to support top level value class?

Test build #97511 has finished for PR 22309 at commit
Force-pushed from c14b5f9 to ca98663 (compare)
Test build #97950 has finished for PR 22309 at commit
nit: it is usual to name it tpe for a Type in ScalaReflection.
agree, it's better to use tpe
Can we use getUnderlyingTypeOf consistently? Let getUnderlyingTypeOf return both parameter name and type.
Or just use getConstructorParameters and get rid of getUnderlyingTypeOf?
you're right, I should return both name and type from getUnderlyingTypeOf
Why do we need such special handling? There is new serialization handling for value classes added above; can't we simply get the object of the value class here and let the recursive call of serializerFor handle it?
I tried moving the special logic into the value class case, but I have a concern I don't know how to resolve yet. I need to change dataTypeFor to return ObjectType for a top-level value class and dataTypeFor(underlyingType) otherwise (see my comment). I'm going with something like:

```scala
private def dataTypeFor(tpe: `Type`, isTopLevelValueClass: Boolean = true)
```

but this isn't right because:

- the default value `true` doesn't make sense for other types
- if the default is `false`, or there is no default value, many call sites of this method need to change
- it also feels clunky because `dataTypeFor` now has to be aware of the context of its parameter

Do you have any suggestions on this?
Thanks for explaining! In order to cover both value class instantiated and not instantiated cases, I think we may need this special handling.
Test build #98128 has finished for PR 22309 at commit

@cloud-fan It works now. Actually, top-level value class is supported from SPARK-17368. I try to maintain that and add support for nested value classes in this patch.
to confirm: a Scala value class over a primitive type can't be null?
yup, a value class in general cannot be null, since it is a subtype of AnyVal
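A small sketch of both halves of this point in plain Scala (`StrWrapper` is a hypothetical wrapper): the wrapper itself can never be null, but a wrapped reference can be.

```scala
case class StrWrapper(s: String) extends AnyVal

object NullDemo {
  def main(args: Array[String]): Unit = {
    // val w: StrWrapper = null  // does not compile: AnyVal subtypes are not nullable
    val w = StrWrapper(null)     // legal: the *underlying* String may be null
    println(w.s == null)         // true
  }
}
```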
can we also test with null values?
sure, I added a test with StringWrapper(null)
did you rebase? I think path is not Option anymore.
No, I haven't rebased since last week.
Hmm, now that path is not an Option anymore, I think I have to use if (walkedTypePath.length > 1) to get the same logic. But this seems a little hacky. Do you have any suggestions?
Test build #98170 has finished for PR 22309 at commit
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala (outdated review thread)
```scala
// for us automatically.
val arg = deserializerFor(underlyingType, path, newTypePath, Some(t))
val isCollectionElement = lastType.exists { lt =>
  lt <:< localTypeOf[Array[_]] || lt <:< localTypeOf[Seq[_]]
```
how about Map?
I added the support for Map
```scala
val isCollectionElement = lastType.exists { lt =>
  lt <:< localTypeOf[Array[_]] || lt <:< localTypeOf[Seq[_]]
}
if (lastType.isEmpty || isCollectionElement) {
```
it looks to me that we don't need lastType, but just a boolean parameter "needInstantiateValueClass".
Yea, I chose the lastType approach because it may be useful for other use cases too. But what you said is also true; I changed it to a boolean parameter.
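For reference, the subtype checks discussed above can be sketched with plain scala-reflect, using `typeOf` in place of Spark's `localTypeOf` and including the `Map` case added in this thread:

```scala
import scala.reflect.runtime.universe._

object CollectionElementCheck {
  // A value class used as a collection element stays boxed at runtime,
  // so these are the container types for which it must be instantiated.
  def isCollectionType(lt: Type): Boolean =
    lt <:< typeOf[Array[_]] || lt <:< typeOf[Seq[_]] || lt <:< typeOf[Map[_, _]]

  def main(args: Array[String]): Unit = {
    println(isCollectionType(typeOf[Seq[Int]]))         // true
    println(isCollectionType(typeOf[Map[String, Int]])) // true
    println(isCollectionType(typeOf[String]))           // false
  }
}
```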
```scala
 * @param tpe The `Type` of deserialized object.
 * @param path The expression which can be used to extract serialized value.
 * @param walkedTypePath The paths from top to bottom to access current field when deserializing.
 * @param instantiateValueClass If `true`, create an instance for Scala value class
```
it will be good to explain when we need to instantiate value class and why
ok, I updated the comment
```scala
/** Returns the name and type of the underlying parameter of value class `tpe`. */
def getUnderlyingParameterOf(tpe: `Type`): (String, Type) = {
  getConstructorParameters(tpe).head
```
is there a more official way to get the value class field name?
not sure, I can't find any
My only concern is, the value class handling is kind of spread out in …
Test build #98203 has finished for PR 22309 at commit

Test build #98200 has finished for PR 22309 at commit
@cloud-fan while what you said is true, I think it's not that bad. Overall, the only major special logic is inside the case class …
Test build #98248 has finished for PR 22309 at commit

adding @liancheng BTW. IIRC, he took a look at this one before and abandoned the change (correct me if I'm misremembering).

I would love to have this change in; if anyone can review this and merge it, that would be great.

Hi, is there any update on whether this patch is ok to merge?

Can one of the admins verify this patch?

We're closing this PR because it hasn't been updated in a while. If you'd like to revive this PR, please reopen it!
### What changes were proposed in this pull request?

- This PR revisits #22309 and [SPARK-20384](https://issues.apache.org/jira/browse/SPARK-20384), solving the original problem, but additionally prevents a backward-compat break in the schema of top-level `AnyVal` value classes.
- Why did the previous version break? We currently support top-level value classes just like any other case class: a field of the underlying type is present in the schema. This means any dataframe SQL filtering on this expects the field name to be present. The previous PR changed this schema and would break current usage. See the test `"schema for case class that is a value class"`. This PR keeps the schema.
- We actually already support collections of value classes prior to this change, but not a case class with a nested value class. The schema of these classes shouldn't change either, to prevent breaking.
- However, what we can change without breaking is the schema of a nested value class, which currently fails due to the compile problem, so its schema isn't actually valid today. After the change, the schema of this nested value class is flattened.
- With this PR, there's flattening only for nested value classes (new), but not for top-level and collection classes (existing behavior).
- This PR also revisits #27153 by handling tuples: `Tuple2[AnyVal, AnyVal]` is a constructor ("nested class") but is a generic type, so it should not be flattened, behaving similarly to `Seq[AnyVal]`.

### Why are the changes needed?

Currently, nested value classes aren't supported. This is because the generated code treats the `AnyVal` class in its unwrapped form, but we encode the type as the wrapped case class. This makes the generated code fail to compile. For example, for a given `AnyVal` wrapper and its root-level container class

```scala
case class IntWrapper(i: Int) extends AnyVal
case class ComplexValueClassContainer(c: IntWrapper)
```

the problematic part of the generated code is:

```java
private InternalRow If_1(InternalRow i) {
    boolean isNull_42 = i.isNullAt(0);
    // 1) ******** The root-level case class we care about
    org.apache.spark.sql.catalyst.encoders.ComplexValueClassContainer value_46 = isNull_42 ?
        null : ((org.apache.spark.sql.catalyst.encoders.ComplexValueClassContainer) i.get(0, null));
    if (isNull_42) {
        throw new NullPointerException(((java.lang.String) references[5] /* errMsg */));
    }
    boolean isNull_39 = true;
    // 2) ******** We specify its member to be the unwrapped case class extending `AnyVal`
    org.apache.spark.sql.catalyst.encoders.IntWrapper value_43 = null;
    if (!false) {
        isNull_39 = false;
        if (!isNull_39) {
            // 3) ******** ERROR: `c()` compiles to type `int`, and thus we see the error
            value_43 = value_46.c();
        }
    }
```

We get this error: assignment conversion is not possible from type "int" to type "org.apache.spark.sql.catalyst.encoders.IntWrapper":

```
java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 159, Column 1: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 159, Column 1: Assignment conversion not possible from type "int" to type "org.apache.spark.sql.catalyst.encoders.IntWrapper"
```

From the [doc](https://docs.scala-lang.org/overviews/core/value-classes.html) on value classes, given `class Wrapper(val underlying: Int) extends AnyVal`:

1) "The type at compile time is `Wrapper`, but at runtime, the representation is an `Int`". This implies that when our struct has a field of a value class, the generated code should work with the underlying type during runtime execution.
2) `Wrapper` "must be instantiated... when a value class is used as a type argument". This implies that `scala.Tuple[Wrapper, ...]`, `Seq[Wrapper]`, `Map[String, Wrapper]`, and `Option[Wrapper]` will still contain `Wrapper` as-is during runtime instead of `Int`.

### Does this PR introduce _any_ user-facing change?

Yes, this allows support for nested value classes.

### How was this patch tested?

Added unit tests to illustrate:
- raw schema
- projection
- round-trip encode/decode

Closes #33205 from mickjermsurawong-stripe/SPARK-20384-2.

Lead-authored-by: Mick Jermsurawong <[email protected]>
Co-authored-by: Emil Ejbyfeldt <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
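The two rules quoted from the value-classes documentation can be observed directly in plain Scala (`Wrapper` is the documentation's example and `Holder` is a hypothetical container, not code from this PR):

```scala
class Wrapper(val underlying: Int) extends AnyVal
case class Holder(w: Wrapper)

object TypeArgDemo {
  def main(args: Array[String]): Unit = {
    // Rule 1: as a plain field, the runtime representation is the underlying Int.
    println(classOf[Holder].getDeclaredField("w").getType) // int

    // Rule 2: as a type argument, a real Wrapper instance is allocated.
    val opt: Option[Wrapper] = Some(new Wrapper(1))
    println(opt.get.getClass.getSimpleName) // Wrapper
  }
}
```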
### What changes were proposed in this pull request?
This PR adds support for Scala value classes in the schema of Datasets (as both a top-level class and a nested field).
The idea is to treat a value class as its underlying type at run time: for example, a field `id: Id` (where `Id` wraps an `Int`) is serialized and deserialized as an `Int`.
However, if the value class is top-level (e.g. `Dataset[Id]`), then it must be treated like a boxed type and instantiated. I'm not sure why it behaves this way, but I suspect it is related to the expansion of value classes when casting (e.g. `asInstanceOf[T]`).
This feature was actually addressed before in SPARK-17368, but that patch only supports the top-level case; hence the error when a value class is nested, as in SPARK-19741 and SPARK-20384.
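The boxing behaviour around casts mentioned above can be seen in plain Scala (`Id` is the hypothetical wrapper from the discussion):

```scala
case class Id(value: Int) extends AnyVal

object CastDemo {
  def main(args: Array[String]): Unit = {
    // Upcasting to Any forces allocation of a real Id instance...
    val boxed: Any = Id(7)
    println(boxed.getClass.getSimpleName) // Id

    // ...and asInstanceOf[Id] gives us back an Id with the original value.
    val back = boxed.asInstanceOf[Id]
    println(back.value) // 7
  }
}
```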
### How was this patch tested?
I added unit tests for the top-level and nested cases.