
Conversation

@HeartSaVioR
Contributor

What changes were proposed in this pull request?

This patch proposes refactoring the serializerFor method between ScalaReflection and JavaTypeInference, making it consistent with what we refactored for deserializerFor in #23854.

This patch also extracts the logic for recording the walked type path, since that logic is duplicated across serializerFor and deserializerFor in both ScalaReflection and JavaTypeInference.
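For reference, a minimal self-contained sketch of what such an extracted path-recording helper could look like. The class and method names here are illustrative assumptions, not necessarily the actual code in this PR:

```scala
// Hypothetical sketch of an immutable walked-type-path recorder.
// Names (recordRoot, recordField, recordArray, getPaths) are assumptions.
case class WalkedTypePath(walkedPaths: Seq[String] = Nil) {
  // Prepend the newest record so the most recent step comes first.
  private def newInstance(newRecord: String): WalkedTypePath =
    WalkedTypePath(newRecord +: walkedPaths)

  def recordRoot(className: String): WalkedTypePath =
    newInstance(s"""- root class: "$className"""")

  def recordField(className: String, fieldName: String): WalkedTypePath =
    newInstance(s"""- field (class: "$className", name: "$fieldName")""")

  def recordArray(elementClassName: String): WalkedTypePath =
    newInstance(s"""- array element class: "$elementClassName"""")

  def getPaths: Seq[String] = walkedPaths

  override def toString: String = walkedPaths.mkString("\n")
}
```

With a shared recorder like this, both serializerFor and deserializerFor in ScalaReflection and JavaTypeInference can build their error-message paths through one implementation instead of duplicating the string formatting.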

How was this patch tested?

Existing tests.

@SparkQA

SparkQA commented Feb 27, 2019

Test build #102825 has finished for PR 23908 at commit 9d62bc9.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor Author

I was thinking about making this a class (storing the built path in each instance), but soon realized it requires touching other things as well and feels a bit like overkill. I'm still open to making this an individual class, so please let me know if that sounds better.

Contributor

making it a class looks better, as it needs to accumulate the walked type path.

Contributor Author

Thanks for the support! Just addressed.

@fottey

fottey commented Feb 27, 2019

As an outside observer, would this refactoring allow the method ScalaReflection.serializerFor to handle arbitrary types that conform to the Java bean convention, and/or common Java-specific types, such as java.util.List?

I recently discovered that, because most of the common Scala implicit encoders reduce to ExpressionEncoder's apply method, it's very difficult to work with arbitrary Java bean types in the Dataset API.

Specifically, given a Java bean type, MyBean, and an implicit encoder for that bean type in scope, the existing Spark 2.4.0 machinery can't synthesize a valid encoder at runtime for hybrid Scala/Java types, like Seq[MyBean], or tuple types like (Int, MyBean), despite the fact that we have encoders for Seq[_], Tuple2[_, _], and MyBean available separately.

While it may be unreasonable to solve the problem generically across all potential classes, it would be really nice if ExpressionEncoder's apply method could somehow detect and support at least java beans and java.util.Lists at runtime...

See here on Stack Overflow for more details...

See the code examples below:

import com.example.MyBean

import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

object Example {
    case class Test()

    def main(args: Array[String]): Unit = {
        val spark: SparkSession = ???

        import spark.implicits._

        // Works today after the above implicit import
        val ds1: Dataset[Seq[Test]] = Seq(Seq(Test()), Seq(Test())).toDS

        // DOES NOT WORK:
        // ExpressionEncoder's apply method cannot handle type MyBean!
        // implicit def newMyBeanExpressionEncoder: Encoder[MyBean] = ExpressionEncoder()
        //
        // Need to do the following instead:
        implicit def newMyBeanBeanEncoder: Encoder[MyBean] = Encoders.bean(classOf[MyBean])

        // But this only allows expressing things like this:
        val ds2: Dataset[MyBean] = Seq(new MyBean(), new MyBean()).toDS

        // Due to the above limitation we CANNOT do the following, EVEN AFTER
        // newMyBeanBeanEncoder is brought into scope!
        // DOES NOT WORK:
        // val ds3: Dataset[Seq[MyBean]] = Seq(Seq(new MyBean()), Seq(new MyBean())).toDS

        // Finally, these do not work either:

        // DOES NOT WORK:
        // val ds4: Dataset[(Int, MyBean)] = Seq((0, new MyBean()), (0, new MyBean())).toDS

        // DOES NOT WORK:
        // implicit def newMyBeanSeqEncoder: Encoder[Seq[MyBean]] = ExpressionEncoder()

        // DOES NOT WORK:
        // implicit def newMyBeanListEncoder: Encoder[java.util.List[MyBean]] = ExpressionEncoder()

        // The above samples all rely on ExpressionEncoder being able to
        // handle every type in the expression. It currently seems to work for:
        // - case classes
        // - tuples
        // - scala.Product
        // - Scala "primitives"
        // - other common types with encoders... BUT NOT Java beans
        //   or java.util.List... :'(
    }
}

@SparkQA

SparkQA commented Feb 27, 2019

Test build #102828 has finished for PR 23908 at commit 3e17117.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 27, 2019

Test build #102829 has finished for PR 23908 at commit d7b8292.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 27, 2019

@fottey
Could you file a JIRA issue for your case? Refactoring is normally done without changing behavior, or at most with some minor changes, so I think it's out of scope for this PR.

Someone may want to take a look at it, or I may spend time on it myself. I'd just like to limit the scope of concerns.

@SparkQA

SparkQA commented Feb 27, 2019

Test build #102833 has finished for PR 23908 at commit 01b7c41.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

Some test failures occurred just because of one extra newline. Fixed.
Some other test failures complain that plans do not match, though the string representations of the two plans look the same. Need to investigate.

@HeartSaVioR
Contributor Author

The default implementation of equals in WalkedTypePath was affecting plan comparison. Fixed.

@SparkQA

SparkQA commented Feb 28, 2019

Test build #102841 has finished for PR 23908 at commit 0d42fb4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

This belongs to the previous PR: why not just let the caller side create the expression and pass it to deserializerForWithNullSafety?

Contributor Author

@HeartSaVioR HeartSaVioR Feb 28, 2019

Yeah, I might have overcomplicated it. Not a big deal, and it looks simpler. Will address. Thanks!

Contributor

why not use recordRoot here?

Contributor

shall we use a mutable list for better performance?

Contributor Author

It was to address diverged paths for map key and value, but we can also copy the instance by cloning the internal list if necessary. Will address.

Contributor Author

Addressed via 90df8a3. I found it a bit complicated to maintain the list without polluting it, so please take a look at the change and let me know if you would like to roll back to the immutable one, in case the performance gain doesn't seem worth the added complexity.
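To make the trade-off being discussed concrete, here is a hypothetical, self-contained sketch of the mutable variant (not the actual implementation in the commit). The hazard is that diverging branches, such as a map's key path versus its value path, must remember to call copy(), or they mutate shared state:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical mutable walked-type-path recorder, for illustration only.
class MutableWalkedTypePath {
  private val paths = ArrayBuffer.empty[String]

  // Mutates this instance in place and returns it for chaining.
  def record(entry: String): this.type = {
    paths += entry
    this
  }

  // Defensive copy needed whenever the type walk branches.
  def copy(): MutableWalkedTypePath = {
    val cloned = new MutableWalkedTypePath
    cloned.paths ++= paths
    cloned
  }

  def getPaths: Seq[String] = paths.toList
}
```

Forgetting a single copy() at a branch point silently corrupts every path derived from the shared buffer, which is exactly the maintenance burden mentioned above.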

@SparkQA

SparkQA commented Feb 28, 2019

Test build #102842 has finished for PR 23908 at commit 8e62f4c.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WalkedTypePath(walkedPaths: Seq[String] = Nil) extends Serializable

Contributor

So this is the same as expressionWithNullSafety?

Contributor Author

Yeah, now it's the same. I've left a comment on #23916. Btw, I feel we'd be better off focusing review on one channel, either this PR or #23916.

Member

I didn't notice you also made a lot of changes to DeserializerBuildHelper in this PR. There might be conflicts if #23916 continues, so I will close it.

Contributor Author

I think expressionWithNullSafety is the more general name, so it might be the preferred one, but deserializerForWithNullSafety is also a good name because we have the related method deserializerForWithNullSafetyAndUpcast.

So it's a matter of preference and either can be removed. Which method would we prefer to keep?

Contributor Author

Thanks @viirya, please feel free to comment even if it belongs to the previous PR. Thanks again!

Member

@viirya viirya Feb 28, 2019

Why did you remove funcForCreatingNewExpr from this and switch to passing in the created expression (deserializer)?

I think the previous deserializerForWithNullSafety is more consistent with deserializerForWithNullSafetyAndUpcast.

Contributor Author

I tend to follow suggestions (#23908 (comment)) unless I have a strong opinion, as I'm fairly new to contributing to the SQL area. For consistency I agree that having the func is better, but for simplicity we can remove it, effectively inlining it. Either is reasonable.

Contributor

I think we just need to keep expressionWithNullSafety. I don't see why we have to have two methods for deserializerFor. Leaving only deserializerForWithNullSafetyAndUpcast is fine.

Contributor Author

Left deserializerForWithNullSafetyAndUpcast and expressionWithNullSafety since both are used in multiple places. Please let me know if it doesn't work.

@viirya
Member

viirya commented Feb 28, 2019

So this is a pure refactoring PR and doesn't address a bug, right?

Member

Seems these helper methods don't reduce code and just add one more wrapper around calling Invoke. Are they needed?

Contributor Author

This is not for reducing code; this is for consistency. These methods ensure we serialize/deserialize things consistently between ScalaReflection and JavaTypeInference when the type is the same.

@HeartSaVioR
Contributor Author

So this is a pure refactoring PR and doesn't address a bug, right?

Yes, and it makes things consistent between deserializerFor and serializerFor.

Member

Doesn't this revert what you did in previous PR?

Contributor Author

@HeartSaVioR HeartSaVioR Feb 28, 2019

I got a review comment for the previous PR here as well - this reflects replacing the function with the applied expression, which ends up making deserializerForWithNullSafety and expressionWithNullSafety the same.

@SparkQA

SparkQA commented Feb 28, 2019

Test build #102850 has finished for PR 23908 at commit 90df8a3.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WalkedTypePath() extends Serializable
  • case class AssertNotNull(child: Expression, walkedTypePath: WalkedTypePath = WalkedTypePath())

@SparkQA

SparkQA commented Mar 1, 2019

Test build #102882 has finished for PR 23908 at commit ff7512b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Mar 1, 2019

Nice work, @HeartSaVioR! Btw, this PR consists of the two parts you described in the PR description. If so, how about splitting it into two PRs for easier review: refactoring the code for consistency between ScalaReflection and JavaTypeInference, and then adding WalkedTypePath?

expr
} else {
AssertNotNull(expr, walkedTypePath)
AssertNotNull(expr, walkedTypePath.copy())
Contributor

We can let AssertNotNull take a Seq[String], to force us to copy the WalkedTypePath when creating AssertNotNull

case _: ArrayType => expr
case _: MapType => expr
case _ => UpCast(expr, expected, walkedTypePath)
case _ => UpCast(expr, expected, walkedTypePath.copy())
Contributor

ditto

valueType.getType.getTypeName)

val newTypePathForKey = walkedTypePath.copy()
val newTypePathForValue = walkedTypePath.copy()
Contributor

Sorry for the back and forth, but it seems better to make WalkedTypePath immutable, as there are branches. It's hard to maintain, and we can easily mess it up if we forget to call copy somewhere.

Contributor Author

@HeartSaVioR HeartSaVioR Mar 1, 2019

Yeah, same understanding. No problem! I'll revert back to making WalkedTypePath immutable.
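A minimal, self-contained illustration (with hypothetical names, not the PR's actual code) of why the immutable design is safer when the type walk branches: diverging recorders share the prefix without any defensive copy.

```scala
// Hypothetical immutable path recorder: record() returns a new value,
// so branches can never corrupt each other.
case class Path(entries: Seq[String] = Nil) {
  def record(entry: String): Path = Path(entry +: entries)
}

object BranchingDemo {
  val base: Path = Path().record("- map type")
  val keyPath: Path = base.record("- map key")     // independent branch
  val valuePath: Path = base.record("- map value") // base stays untouched
}
```

There is nothing to forget at a branch point, which is exactly the failure mode the mutable version invites.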

@HeartSaVioR
Contributor Author

@maropu I like the idea of splitting the PR, but since @cloud-fan has already provided feedback on WalkedTypePath, it might be better to hear opinions and decide. Let me first address his feedback on WalkedTypePath - even if we decide to break the PR down, that work would still be needed.

This reverts commit c67826a.

NOTE: there's a conflict which keeps the revert commit from being a clean revert, but WalkedTypePath is clearly reverted.
@cloud-fan
Contributor

I'm OK both ways. Since the PR already contains the WalkedTypePath refactor, I think it's fine to include it as it's pretty intuitive.

@HeartSaVioR
Contributor Author

Thanks! I'll keep it as it is. How about applying DeserializerBuildHelper/SerializerBuildHelper to RowEncoder? Would it be better to have that as a separate PR?

"fromPrimitiveArray",
input :: Nil,
returnNullable = false)
createSerializerForPrimitiveArray(input, dt)
Contributor

Seems this branch is missing on the Java side. We can address it in a follow-up.

Contributor Author

Raised PR: #24015

case class UpCast(
child: Expression,
dataType: DataType,
walkedTypePath: WalkedTypePath = new WalkedTypePath())
Contributor

can we keep it Seq[String]? When we reach here, the walkedTypePath is only needed for logging/error message, and we don't need the WalkedTypePath class to help accumulate the paths.

* non-null `s`, `s.i` can't be null.
*/
case class AssertNotNull(child: Expression, walkedTypePath: Seq[String] = Nil)
case class AssertNotNull(child: Expression, walkedTypePath: WalkedTypePath = new WalkedTypePath())
Contributor

ditto

@SparkQA

SparkQA commented Mar 1, 2019

Test build #102899 has finished for PR 23908 at commit 852debd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 1, 2019

Test build #102902 has finished for PR 23908 at commit 578d8fe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 1, 2019

Test build #102906 has finished for PR 23908 at commit 20e8d5a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AssertNotNull(child: Expression, walkedTypePath: Seq[String] = Nil)

val inputObject = BoundReference(0, ObjectType(beanClass), nullable = true)
val nullSafeInput = AssertNotNull(inputObject, Seq("top level input bean"))
val nullSafeInput = AssertNotNull(inputObject,
WalkedTypePath().recordRoot("top level input bean").getPaths)
Contributor

nit: we can keep it unchanged.

Contributor Author

Ah yes, that's not even the same. Will revert.

// For input object of Product type, we can't encode it to row if it's null, as Spark SQL
// doesn't allow top-level row to be null, only its columns can be null.
AssertNotNull(r, Seq("top level Product or row object"))
AssertNotNull(r, WalkedTypePath().recordRoot("top level Product or row object").getPaths)
Contributor

nit: we can keep it unchanged.

case class UpCast(
child: Expression,
dataType: DataType,
walkedTypePath: Seq[String] = Nil)
Contributor

can we revert the code style change?

import org.apache.spark.serializer._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection, WalkedTypePath}
Contributor

unnecessary change

@SparkQA

SparkQA commented Mar 2, 2019

Test build #102936 has finished for PR 23908 at commit 50c2ddc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class UpCast(child: Expression, dataType: DataType, walkedTypePath: Seq[String] = Nil)

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 34f6066 Mar 4, 2019
@HeartSaVioR
Contributor Author

Thanks all for reviewing and merging!

@HeartSaVioR HeartSaVioR deleted the SPARK-27001 branch March 4, 2019 03:04
cloud-fan pushed a commit that referenced this pull request Mar 8, 2019
## What changes were proposed in this pull request?

This is a follow-up PR which addresses a review comment in the PR for SPARK-27001:
#23908 (comment)

This patch proposes handling the primitive array type in the serializer - instead of delegating it to the generic path, Spark now handles it efficiently as a primitive array.

## How was this patch tested?

UTs modified to include primitive arrays.

Closes #24015 from HeartSaVioR/SPARK-27001-FOLLOW-UP-java-primitive-array.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>