-
Notifications
You must be signed in to change notification settings - Fork 29.1k
[SPARK-33704][SQL] Support latest version of initialize() in HiveGenericUDTF #30665
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -91,8 +91,18 @@ private[sql] class HiveSessionCatalog( | |
| isUDAFBridgeRequired = true)) | ||
| udfExpr.get.dataType // Force it to check input data types. | ||
| } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) { | ||
| udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input)) | ||
| udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types. | ||
| try { | ||
| udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input)) | ||
| // Force it to check input data types. | ||
| udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema | ||
| } catch { | ||
| case exception: Exception => | ||
| logInfo(s"HiveGenericUDTF initialize(ObjectInspector[] args) is deprecated, and" + | ||
| s" we will suit the latest version of initialize(StructObjectInspector argOIs).") | ||
| udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), | ||
| input, false)) | ||
| udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema | ||
|
Comment on lines
+94
to
+104
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about? val funcWrapper = new HiveFunctionWrapper(clazz.getName)
try {
udfExpr = Some(HiveGenericUDTF(name, funcWrapper, input, true))
// Force it to check data types.
udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema
} catch {
case e: IllegalStateException if e.getMessage.equals("Should not be called directly") =>
logInfo("Fallback to use the non deprecated UDTF constructor.")
udfExpr = Some(HiveGenericUDTF(name, funcWrapper, input, false))
// Force it to check data types.
udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema
} |
||
| } | ||
| } | ||
| } catch { | ||
| case NonFatal(e) => | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -198,7 +198,8 @@ private[hive] case class HiveGenericUDF( | |
| private[hive] case class HiveGenericUDTF( | ||
| name: String, | ||
| funcWrapper: HiveFunctionWrapper, | ||
| children: Seq[Expression]) | ||
| children: Seq[Expression], | ||
| deprecated: Boolean = true) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| extends Generator with HiveInspectors with CodegenFallback with UserDefinedExpression { | ||
|
|
||
| @transient | ||
|
|
@@ -212,7 +213,19 @@ private[hive] case class HiveGenericUDTF( | |
| protected lazy val inputInspectors = children.map(toInspector) | ||
|
|
||
| @transient | ||
| protected lazy val outputInspector = function.initialize(inputInspectors.toArray) | ||
| protected lazy val inpuColNames = children.map(_ => "field_name").asJava | ||
|
|
||
| @transient | ||
| protected lazy val rowOI = ObjectInspectorFactory.getStandardStructObjectInspector( | ||
| inpuColNames, inputInspectors.asJava) | ||
|
|
||
| @transient | ||
| protected lazy val outputInspector = | ||
| if (deprecated) { | ||
| function.initialize(inputInspectors.toArray) | ||
| } else { | ||
| function.initialize(rowOI) | ||
| } | ||
|
Comment on lines
+223
to
+228
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about? protected lazy val outputInspector = {
if (isDeprecatedConstructor) {
function.initialize(inputInspectors.toArray)
} else {
val rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
children.zipWithIndex.map(e => s"_col${e._2}").asJava, inputInspectors.asJava)
function.initialize(rowOI)
}
} |
||
|
|
||
| @transient | ||
| protected lazy val udtInput = new Array[AnyRef](children.length) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2132,11 +2132,9 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { | |
| |AS 'org.apache.spark.sql.hive.execution.UDTFStack2' | ||
| |USING JAR '${hiveContext.getHiveFile("SPARK-21101-1.0.jar").toURI}' | ||
| """.stripMargin) | ||
| val e = intercept[org.apache.spark.sql.AnalysisException] { | ||
| sql("SELECT udtf_stack2(2, 'A', 10, date '2015-01-01', 'B', 20, date '2016-01-01')") | ||
| } | ||
| assert( | ||
| e.getMessage.contains("public StructObjectInspector initialize(ObjectInspector[] args)")) | ||
| val num = | ||
| sql("SELECT udtf_stack2(2, 'A', 10, date '2015-01-01', 'B', 20, date '2016-01-01')").count() | ||
| assert(num === 2) | ||
|
Comment on lines
+2135
to
+2137
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about? checkAnswer(
sql("SELECT udtf_stack2(2, 'A', 10, date '2015-01-01', 'B', 20, date '2016-01-01')"),
Seq(Row("A", 10, java.sql.Date.valueOf("2015-01-01")),
Row("B", 20, java.sql.Date.valueOf("2016-01-01"))))
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you update test name? |
||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this fallback mechanism? can we just switch to the new API without the
deprecatedflag?