From 0305497e8d3c8f66340d3055e6a226bd6ee2922a Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 18 Oct 2018 15:32:42 +0200 Subject: [PATCH 1/4] [SPARK-25768][SQL] fix constant argument expecting UDFs --- .../org/apache/spark/sql/hive/hiveUDFs.scala | 49 +++++++++---------- .../sql/hive/execution/HiveUDFSuite.scala | 15 ++++++ 2 files changed, 39 insertions(+), 25 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 68af99ea272a..d1734c5e3b21 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -339,40 +339,38 @@ private[hive] case class HiveUDAFFunction( val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false) resolver.getEvaluator(parameterInfo) } + + case class Mode(evaluator: GenericUDAFEvaluator, objectInspector: ObjectInspector) // The UDAF evaluator used to consume raw input rows and produce partial aggregation results. - @transient - private lazy val partial1ModeEvaluator = newEvaluator() - // Hive `ObjectInspector` used to inspect partial aggregation results. @transient - private val partialResultInspector = partial1ModeEvaluator.init( - GenericUDAFEvaluator.Mode.PARTIAL1, - inputInspectors - ) + private lazy val partial1Mode = { + val evaluator = newEvaluator() + Mode(evaluator, evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors)) + } // The UDAF evaluator used to merge partial aggregation results. @transient private lazy val partial2ModeEvaluator = { val evaluator = newEvaluator() - evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partialResultInspector)) + evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partial1Mode.objectInspector)) evaluator } // Spark SQL data type of partial aggregation results @transient - private lazy val partialResultDataType = inspectorToDataType(partialResultInspector) + private lazy val partialResultDataType = inspectorToDataType(partial1Mode.objectInspector) // The UDAF evaluator used to compute the final result from a partial aggregation result objects. - @transient - private lazy val finalModeEvaluator = newEvaluator() - // Hive `ObjectInspector` used to inspect the final aggregation result object. @transient - private val returnInspector = finalModeEvaluator.init( - GenericUDAFEvaluator.Mode.FINAL, - Array(partialResultInspector) - ) + private lazy val finalMode = { + val evaluator = newEvaluator() + Mode( + evaluator, + evaluator.init(GenericUDAFEvaluator.Mode.FINAL, Array(partial1Mode.objectInspector))) + } // Wrapper functions used to wrap Spark SQL input arguments into Hive specific format. @transient @@ -381,7 +379,7 @@ private[hive] case class HiveUDAFFunction( // Unwrapper function used to unwrap final aggregation result objects returned by Hive UDAFs into // Spark SQL specific format. @transient - private lazy val resultUnwrapper = unwrapperFor(returnInspector) + private lazy val resultUnwrapper = unwrapperFor(finalMode.objectInspector) @transient private lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length) @@ -391,7 +389,7 @@ private[hive] case class HiveUDAFFunction( override def nullable: Boolean = true - override lazy val dataType: DataType = inspectorToDataType(returnInspector) + override lazy val dataType: DataType = inspectorToDataType(finalMode.objectInspector) override def prettyName: String = name @@ -401,13 +399,13 @@ private[hive] case class HiveUDAFFunction( } override def createAggregationBuffer(): AggregationBuffer = - partial1ModeEvaluator.getNewAggregationBuffer + partial1Mode.evaluator.getNewAggregationBuffer @transient private lazy val inputProjection = UnsafeProjection.create(children) override def update(buffer: AggregationBuffer, input: InternalRow): AggregationBuffer = { - partial1ModeEvaluator.iterate( + partial1Mode.evaluator.iterate( buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes)) buffer } @@ -417,12 +415,12 @@ private[hive] case class HiveUDAFFunction( // buffer in the 3rd format mentioned in the ScalaDoc of this class. Originally, Hive converts // this `AggregationBuffer`s into this format before shuffling partial aggregation results, and // calls `GenericUDAFEvaluator.terminatePartial()` to do the conversion. - partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input)) + partial2ModeEvaluator.merge(buffer, partial1Mode.evaluator.terminatePartial(input)) buffer } override def eval(buffer: AggregationBuffer): Any = { - resultUnwrapper(finalModeEvaluator.terminate(buffer)) + resultUnwrapper(finalMode.evaluator.terminate(buffer)) } override def serialize(buffer: AggregationBuffer): Array[Byte] = { @@ -439,9 +437,10 @@ private[hive] case class HiveUDAFFunction( // Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects private class AggregationBufferSerDe { - private val partialResultUnwrapper = unwrapperFor(partialResultInspector) + private val partialResultUnwrapper = unwrapperFor(partial1Mode.objectInspector) - private val partialResultWrapper = wrapperFor(partialResultInspector, partialResultDataType) + private val partialResultWrapper = + wrapperFor(partial1Mode.objectInspector, partialResultDataType) private val projection = UnsafeProjection.create(Array(partialResultDataType)) @@ -451,7 +450,7 @@ private[hive] case class HiveUDAFFunction( // `GenericUDAFEvaluator.terminatePartial()` converts an `AggregationBuffer` into an object // that can be inspected by the `ObjectInspector` returned by `GenericUDAFEvaluator.init()`. // Then we can unwrap it to a Spark SQL value. - mutableRow.update(0, partialResultUnwrapper(partial1ModeEvaluator.terminatePartial(buffer))) + mutableRow.update(0, partialResultUnwrapper(partial1Mode.evaluator.terminatePartial(buffer))) val unsafeRow = projection(mutableRow) val bytes = ByteBuffer.allocate(unsafeRow.getSizeInBytes) unsafeRow.writeTo(bytes) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 6198d4963df3..2f03b852ddf6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -638,6 +638,21 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { Row(3) :: Row(3) :: Nil) } } + + test("constant argument expecting Hive UDF") { + val testData = spark.range(10).toDF() + withTempView("inputTable") { + testData.createOrReplaceTempView("inputTable") + withUserDefinedFunction("testGenericUDAFPercentileApprox" -> false) { + val numFunc = spark.catalog.listFunctions().count() + sql(s"CREATE FUNCTION testGenericUDAFPercentileApprox AS '" + + s"${classOf[GenericUDAFPercentileApprox].getName}'") + checkAnswer( + sql("SELECT testGenericUDAFPercentileApprox(id, 0.5) FROM inputTable"), + Seq(Row(4.0))) + } + } + } } class TestPair(x: Int, y: Int) extends Writable with Serializable { From 00115fffdc09da6d4998415ae906d05f739b7ca2 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 18 Oct 2018 17:04:50 +0200 Subject: [PATCH 2/4] fix review findings Change-Id: If5a53df72c616d8b54662619426f5c8f34a9c0c0 --- .../src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala | 2 +- .../org/apache/spark/sql/hive/execution/HiveUDFSuite.scala | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index d1734c5e3b21..3673d09be0c3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -340,7 +340,7 @@ private[hive] case class HiveUDAFFunction( resolver.getEvaluator(parameterInfo) } - case class Mode(evaluator: GenericUDAFEvaluator, objectInspector: ObjectInspector) + private case class Mode(evaluator: GenericUDAFEvaluator, objectInspector: ObjectInspector) // The UDAF evaluator used to consume raw input rows and produce partial aggregation results. // Hive `ObjectInspector` used to inspect partial aggregation results. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 2f03b852ddf6..a6fc744cc8b5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -639,10 +639,9 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } } - test("constant argument expecting Hive UDF") { - val testData = spark.range(10).toDF() + test("SPARK-25768 constant argument expecting Hive UDF") { withTempView("inputTable") { - testData.createOrReplaceTempView("inputTable") + spark.range(10).createOrReplaceTempView("inputTable") withUserDefinedFunction("testGenericUDAFPercentileApprox" -> false) { val numFunc = spark.catalog.listFunctions().count() sql(s"CREATE FUNCTION testGenericUDAFPercentileApprox AS '" + From 844d0ba2aa6cc56efb2d36de09c7b08fd00b15df Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 18 Oct 2018 19:53:48 +0200 Subject: [PATCH 3/4] fix review findings 2 Change-Id: I7379b21c00336e389af9b5186fac71c566699c14 --- .../scala/org/apache/spark/sql/hive/hiveUDFs.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 3673d09be0c3..c47436afdfca 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -339,15 +339,17 @@ private[hive] case class HiveUDAFFunction( val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false) resolver.getEvaluator(parameterInfo) } - - private case class Mode(evaluator: GenericUDAFEvaluator, objectInspector: ObjectInspector) + + private case class PartialEvaluator( + evaluator: GenericUDAFEvaluator, + objectInspector: ObjectInspector) // The UDAF evaluator used to consume raw input rows and produce partial aggregation results. // Hive `ObjectInspector` used to inspect partial aggregation results. @transient private lazy val partial1Mode = { val evaluator = newEvaluator() - Mode(evaluator, evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors)) + PartialEvaluator(evaluator, evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors)) } // The UDAF evaluator used to merge partial aggregation results. @@ -367,7 +369,7 @@ private[hive] case class HiveUDAFFunction( @transient private lazy val finalMode = { val evaluator = newEvaluator() - Mode( + PartialEvaluator( evaluator, evaluator.init(GenericUDAFEvaluator.Mode.FINAL, Array(partial1Mode.objectInspector))) } From 6e6eca400ba4040d5001e26b20d7815ed2a0c2f4 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Fri, 19 Oct 2018 07:35:09 +0200 Subject: [PATCH 4/4] fix review findings 3 --- .../org/apache/spark/sql/hive/hiveUDFs.scala | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index c47436afdfca..4a8450901e3a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -340,38 +340,39 @@ private[hive] case class HiveUDAFFunction( resolver.getEvaluator(parameterInfo) } - private case class PartialEvaluator( - evaluator: GenericUDAFEvaluator, - objectInspector: ObjectInspector) + private case class HiveEvaluator( + evaluator: GenericUDAFEvaluator, + objectInspector: ObjectInspector) // The UDAF evaluator used to consume raw input rows and produce partial aggregation results. // Hive `ObjectInspector` used to inspect partial aggregation results. @transient - private lazy val partial1Mode = { + private lazy val partial1HiveEvaluator = { val evaluator = newEvaluator() - PartialEvaluator(evaluator, evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors)) + HiveEvaluator(evaluator, evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors)) } // The UDAF evaluator used to merge partial aggregation results. @transient private lazy val partial2ModeEvaluator = { val evaluator = newEvaluator() - evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partial1Mode.objectInspector)) + evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partial1HiveEvaluator.objectInspector)) evaluator } // Spark SQL data type of partial aggregation results @transient - private lazy val partialResultDataType = inspectorToDataType(partial1Mode.objectInspector) + private lazy val partialResultDataType = + inspectorToDataType(partial1HiveEvaluator.objectInspector) // The UDAF evaluator used to compute the final result from a partial aggregation result objects. // Hive `ObjectInspector` used to inspect the final aggregation result object. @transient - private lazy val finalMode = { + private lazy val finalHiveEvaluator = { val evaluator = newEvaluator() - PartialEvaluator( + HiveEvaluator( evaluator, - evaluator.init(GenericUDAFEvaluator.Mode.FINAL, Array(partial1Mode.objectInspector))) + evaluator.init(GenericUDAFEvaluator.Mode.FINAL, Array(partial1HiveEvaluator.objectInspector))) } // Wrapper functions used to wrap Spark SQL input arguments into Hive specific format. @@ -381,7 +382,7 @@ private[hive] case class HiveUDAFFunction( // Unwrapper function used to unwrap final aggregation result objects returned by Hive UDAFs into // Spark SQL specific format. @transient - private lazy val resultUnwrapper = unwrapperFor(finalMode.objectInspector) + private lazy val resultUnwrapper = unwrapperFor(finalHiveEvaluator.objectInspector) @transient private lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length) @@ -391,7 +392,7 @@ private[hive] case class HiveUDAFFunction( override def nullable: Boolean = true - override lazy val dataType: DataType = inspectorToDataType(finalMode.objectInspector) + override lazy val dataType: DataType = inspectorToDataType(finalHiveEvaluator.objectInspector) override def prettyName: String = name @@ -401,13 +402,13 @@ private[hive] case class HiveUDAFFunction( } override def createAggregationBuffer(): AggregationBuffer = - partial1Mode.evaluator.getNewAggregationBuffer + partial1HiveEvaluator.evaluator.getNewAggregationBuffer @transient private lazy val inputProjection = UnsafeProjection.create(children) override def update(buffer: AggregationBuffer, input: InternalRow): AggregationBuffer = { - partial1Mode.evaluator.iterate( + partial1HiveEvaluator.evaluator.iterate( buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes)) buffer } @@ -417,12 +418,12 @@ private[hive] case class HiveUDAFFunction( // buffer in the 3rd format mentioned in the ScalaDoc of this class. Originally, Hive converts // this `AggregationBuffer`s into this format before shuffling partial aggregation results, and // calls `GenericUDAFEvaluator.terminatePartial()` to do the conversion. - partial2ModeEvaluator.merge(buffer, partial1Mode.evaluator.terminatePartial(input)) + partial2ModeEvaluator.merge(buffer, partial1HiveEvaluator.evaluator.terminatePartial(input)) buffer } override def eval(buffer: AggregationBuffer): Any = { - resultUnwrapper(finalMode.evaluator.terminate(buffer)) + resultUnwrapper(finalHiveEvaluator.evaluator.terminate(buffer)) } override def serialize(buffer: AggregationBuffer): Array[Byte] = { @@ -439,10 +440,10 @@ private[hive] case class HiveUDAFFunction( // Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects private class AggregationBufferSerDe { - private val partialResultUnwrapper = unwrapperFor(partial1Mode.objectInspector) + private val partialResultUnwrapper = unwrapperFor(partial1HiveEvaluator.objectInspector) private val partialResultWrapper = - wrapperFor(partial1Mode.objectInspector, partialResultDataType) + wrapperFor(partial1HiveEvaluator.objectInspector, partialResultDataType) private val projection = UnsafeProjection.create(Array(partialResultDataType)) @@ -452,7 +453,8 @@ private[hive] case class HiveUDAFFunction( // `GenericUDAFEvaluator.terminatePartial()` converts an `AggregationBuffer` into an object // that can be inspected by the `ObjectInspector` returned by `GenericUDAFEvaluator.init()`. // Then we can unwrap it to a Spark SQL value. - mutableRow.update(0, partialResultUnwrapper(partial1Mode.evaluator.terminatePartial(buffer))) + mutableRow.update(0, partialResultUnwrapper( + partial1HiveEvaluator.evaluator.terminatePartial(buffer))) val unsafeRow = projection(mutableRow) val bytes = ByteBuffer.allocate(unsafeRow.getSizeInBytes) unsafeRow.writeTo(bytes)