-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-23593][SQL] Add interpreted execution for InitializeJavaBean expression #20756
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
978080b
0c48a9b
b8f171e
b51303a
6372f04
970ed6c
e20c207
f3fdf57
1a78334
e7640e1
0d6b3d3
2954e8d
96a8fe6
573db59
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1261,8 +1261,42 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp | |
| override def children: Seq[Expression] = beanInstance +: setters.values.toSeq | ||
| override def dataType: DataType = beanInstance.dataType | ||
|
|
||
| override def eval(input: InternalRow): Any = | ||
| throw new UnsupportedOperationException("Only code-generated evaluation is supported.") | ||
| private lazy val resolvedSetters = { | ||
| assert(beanInstance.dataType.isInstanceOf[ObjectType]) | ||
|
|
||
| val ObjectType(beanClass) = beanInstance.dataType | ||
| setters.map { | ||
| case (name, expr) => | ||
| // Looking for known type mapping first, then using Class attached in `ObjectType`. | ||
| // Finally also looking for general `Object`-type parameter for generic methods. | ||
| val paramTypes = CallMethodViaReflection.typeMapping.getOrElse(expr.dataType, | ||
|
||
| Seq(expr.dataType.asInstanceOf[ObjectType].cls)) ++ Seq(classOf[Object]) | ||
| val methods = paramTypes.flatMap { fieldClass => | ||
| try { | ||
| Some(beanClass.getDeclaredMethod(name, fieldClass)) | ||
| } catch { | ||
| case e: NoSuchMethodException => None | ||
| } | ||
| } | ||
| if (methods.isEmpty) { | ||
| throw new NoSuchMethodException(s"""A method named "$name" is not declared """ + | ||
| "in any enclosing class nor any supertype, nor through a static import") | ||
|
||
| } | ||
| methods.head -> expr | ||
| } | ||
| } | ||
|
|
||
| override def eval(input: InternalRow): Any = { | ||
| val instance = beanInstance.eval(input) | ||
| if (instance != null) { | ||
| val bean = instance.asInstanceOf[Object] | ||
| resolvedSetters.foreach { | ||
| case (setter, expr) => | ||
| setter.invoke(bean, expr.eval(input).asInstanceOf[Object]) | ||
|
||
| } | ||
| } | ||
| instance | ||
| } | ||
|
|
||
| override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { | ||
| val instanceGen = beanInstance.genCode(ctx) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -55,7 +55,8 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { | |
|
|
||
| protected def checkEvaluation( | ||
| expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = { | ||
| val expr = prepareEvaluation(expression) | ||
| // Make it as method to obtain fresh expression everytime. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why this change?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The content of bean instance will be changed after first evaluation of interpreted execution. For example, in the added unit test, the input bean of the later evaluation will become
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we using a literal? Ok, makes sense. |
||
| def expr = prepareEvaluation(expression) | ||
| val catalystValue = CatalystTypeConverters.convertToCatalyst(expected) | ||
| checkEvaluationWithoutCodegen(expr, catalystValue, inputRow) | ||
| checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow) | ||
|
|
@@ -111,12 +112,14 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { | |
| val errMsg = intercept[T] { | ||
| eval | ||
| }.getMessage | ||
| if (errMsg != expectedErrMsg) { | ||
| if (!errMsg.contains(expectedErrMsg)) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For codegen error, it has very verbose message like: So changes it to test if it contains the given error message. |
||
| fail(s"Expected error message is `$expectedErrMsg`, but `$errMsg` found") | ||
| } | ||
| } | ||
| } | ||
| val expr = prepareEvaluation(expression) | ||
|
|
||
| // Make it as method to obtain fresh expression everytime. | ||
| def expr = prepareEvaluation(expression) | ||
| checkException(evaluateWithoutCodegen(expr, inputRow), "non-codegen mode") | ||
| checkException(evaluateWithGeneratedMutableProjection(expr, inputRow), "codegen mode") | ||
| if (GenerateUnsafeProjection.canSupport(expr.dataType)) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -68,6 +68,23 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { | |
| mapEncoder.serializer.head, mapExpected, mapInputRow) | ||
| } | ||
|
|
||
| test("SPARK-23593: InitializeJavaBean should support interpreted execution") { | ||
| val list = new java.util.LinkedList[Int]() | ||
| list.add(1) | ||
|
|
||
| val initializeBean = InitializeJavaBean(Literal.fromObject(new java.util.LinkedList[Int]), | ||
| Map("add" -> Literal(1))) | ||
| checkEvaluation(initializeBean, list, InternalRow.fromSeq(Seq())) | ||
|
|
||
| val initializeWithNonexistingMethod = InitializeJavaBean( | ||
| Literal.fromObject(new java.util.LinkedList[Int]), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you also add a test for when the parameter types do not match up?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added below. Note that because for generic method, its parameter type is |
||
| Map("nonexisting" -> Literal(1))) | ||
| checkExceptionInExpression[Exception](initializeWithNonexistingMethod, | ||
| InternalRow.fromSeq(Seq()), | ||
| """A method named "nonexisting" is not declared in any enclosing class """ + | ||
| "nor any supertype, nor through a static import") | ||
| } | ||
|
|
||
| test("SPARK-23585: UnwrapOption should support interpreted execution") { | ||
| val cls = classOf[Option[Int]] | ||
| val inputObject = BoundReference(0, ObjectType(cls), nullable = true) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to put
assert(beanInstance.dataType.isInstanceOf[ObjectType])in the constructor?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.