Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1410,8 +1410,47 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
override def children: Seq[Expression] = beanInstance +: setters.values.toSeq
override def dataType: DataType = beanInstance.dataType

override def eval(input: InternalRow): Any =
throw new UnsupportedOperationException("Only code-generated evaluation is supported.")
private lazy val resolvedSetters = {
assert(beanInstance.dataType.isInstanceOf[ObjectType])

val ObjectType(beanClass) = beanInstance.dataType
setters.map {
case (name, expr) =>
// Looking for known type mapping.
// But also looking for general `Object`-type parameter for generic methods.
val paramTypes = ScalaReflection.expressionJavaClasses(Seq(expr)) ++ Seq(classOf[Object])
val methods = paramTypes.flatMap { fieldClass =>
try {
Some(beanClass.getDeclaredMethod(name, fieldClass))
} catch {
case e: NoSuchMethodException => None
}
}
if (methods.isEmpty) {
throw new NoSuchMethodException(s"""A method named "$name" is not declared """ +
"in any enclosing class nor any supertype")
}
methods.head -> expr
}
}

override def eval(input: InternalRow): Any = {
val instance = beanInstance.eval(input)
if (instance != null) {
val bean = instance.asInstanceOf[Object]
resolvedSetters.foreach {
case (setter, expr) =>
val paramVal = expr.eval(input)
if (paramVal == null) {
throw new NullPointerException("The parameter value for setters in " +
"`InitializeJavaBean` can not be null")
} else {
setter.invoke(bean, paramVal.asInstanceOf[AnyRef])
}
}
}
instance
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val instanceGen = beanInstance.genCode(ctx)
Expand All @@ -1424,6 +1463,10 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
val fieldGen = fieldValue.genCode(ctx)
s"""
|${fieldGen.code}
|if (${fieldGen.isNull}) {
| throw new NullPointerException("The parameter value for setters in " +
| "`InitializeJavaBean` can not be null");
|}
|$javaBeanInstance.$setterMethod(${fieldGen.value});
""".stripMargin
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {

protected def checkEvaluation(
expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
val expr = prepareEvaluation(expression)
// Make it as method to obtain fresh expression everytime.
def expr = prepareEvaluation(expression)
val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
checkEvaluationWithoutCodegen(expr, catalystValue, inputRow)
checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow)
Expand Down Expand Up @@ -111,12 +112,14 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
val errMsg = intercept[T] {
eval
}.getMessage
if (errMsg != expectedErrMsg) {
if (!errMsg.contains(expectedErrMsg)) {
Copy link
Member Author

@viirya viirya Mar 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For codegen error, it has very verbose message like:

Code generation of initializejavabean([], (nonexisting,1)) failed:
[info]   java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 41, Column 22
: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 41, Column 22: A method named "nonexisting
" is not declared in any enclosing class nor any supertype, nor through a static import
[info]   java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 41, Column 22
: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 41, Column 22: A method named "nonexisting
" is not declared in any enclosing class nor any supertype, nor through a static import
[info]          at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
[info]          at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)

So changes it to test if it contains the given error message.

fail(s"Expected error message is `$expectedErrMsg`, but `$errMsg` found")
}
}
}
val expr = prepareEvaluation(expression)

// Make it as method to obtain fresh expression everytime.
def expr = prepareEvaluation(expression)
checkException(evaluateWithoutCodegen(expr, inputRow), "non-codegen mode")
checkException(evaluateWithGeneratedMutableProjection(expr, inputRow), "codegen mode")
if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,50 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
Invoke(funcSubObj, "binOp", DoubleType, inputSum), 0.75, InternalRow.apply(1, 0.25))
}

test("SPARK-23593: InitializeJavaBean should support interpreted execution") {
val list = new java.util.LinkedList[Int]()
list.add(1)

val initializeBean = InitializeJavaBean(Literal.fromObject(new java.util.LinkedList[Int]),
Map("add" -> Literal(1)))
checkEvaluation(initializeBean, list, InternalRow.fromSeq(Seq()))

val initializeWithNonexistingMethod = InitializeJavaBean(
Literal.fromObject(new java.util.LinkedList[Int]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a test for when the parameter types do not match up?

Copy link
Member Author

@viirya viirya Mar 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added below.

Note that because for generic method, its parameter type is Object, so for LinkedList[Int], it doesn't make sense to test it with something like Map("add" -> Literal("a string")). So I add TestBean to test this case.

Map("nonexisting" -> Literal(1)))
checkExceptionInExpression[Exception](initializeWithNonexistingMethod,
InternalRow.fromSeq(Seq()),
"""A method named "nonexisting" is not declared in any enclosing class """ +
"nor any supertype")

val initializeWithWrongParamType = InitializeJavaBean(
Literal.fromObject(new TestBean),
Map("setX" -> Literal("1")))
intercept[Exception] {
evaluateWithoutCodegen(initializeWithWrongParamType, InternalRow.fromSeq(Seq()))
}.getMessage.contains(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For codegen the compile exception is like:

No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void org.apache.spark.sql.catalyst.expressions.TestBean.setX(int)"

I'm not sure if we want to exactly match this kind of exception message from interpreted execution. Might be a little overkill to do that by looking methods with same name. So currently I only test interpreted execution.

"""A method named "setX" is not declared in any enclosing class """ +
"nor any supertype")
}

test("Can not pass in null into setters in InitializeJavaBean") {
val initializeBean = InitializeJavaBean(
Literal.fromObject(new TestBean),
Map("setNonPrimitive" -> Literal(null)))
intercept[NullPointerException] {
evaluateWithoutCodegen(initializeBean, InternalRow.fromSeq(Seq()))
}.getMessage.contains("The parameter value for setters in `InitializeJavaBean` can not be null")
intercept[NullPointerException] {
evaluateWithGeneratedMutableProjection(initializeBean, InternalRow.fromSeq(Seq()))
}.getMessage.contains("The parameter value for setters in `InitializeJavaBean` can not be null")

val initializeBean2 = InitializeJavaBean(
Literal.fromObject(new TestBean),
Map("setNonPrimitive" -> Literal("string")))
evaluateWithoutCodegen(initializeBean2, InternalRow.fromSeq(Seq()))
evaluateWithGeneratedMutableProjection(initializeBean2, InternalRow.fromSeq(Seq()))
}

test("SPARK-23585: UnwrapOption should support interpreted execution") {
val cls = classOf[Option[Int]]
val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
Expand Down Expand Up @@ -278,3 +322,11 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}
}

class TestBean extends Serializable {
private var x: Int = 0

def setX(i: Int): Unit = x = i
def setNonPrimitive(i: AnyRef): Unit =
assert(i != null, "this setter should not be called with null.")
}