[SPARK-33704][SQL] Support latest version of initialize() in HiveGenericUDTF#30665

Closed
southernriver wants to merge 3 commits into apache:branch-2.4 from southernriverchen:SPARK-33704

@southernriver
Contributor

What changes were proposed in this pull request?

Hive's GenericUDTF, which Spark wraps in HiveGenericUDTF, has two initialization methods:

  public StructObjectInspector initialize(StructObjectInspector argOIs)
      throws UDFArgumentException {
    List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
    ObjectInspector[] udtfInputOIs = new ObjectInspector[inputFields.size()];
    for (int i = 0; i < inputFields.size(); i++) {
      udtfInputOIs[i] = inputFields.get(i).getFieldObjectInspector();
    }
    return initialize(udtfInputOIs);
  }
  @Deprecated
  public StructObjectInspector initialize(ObjectInspector[] argOIs)
      throws UDFArgumentException {
    throw new IllegalStateException("Should not be called directly");
  }

As https://issues.apache.org/jira/browse/HIVE-5737 describes, Hive now passes a StructObjectInspector to UDTFs rather than an ObjectInspector[], but Spark SQL still supports only the deprecated method.

Before this fix, a UDTF that overrides only the new method fails with the following exception:

Error in query: No handler for UDF/UDAF/UDTF 'FeatureParseUDTF1': java.lang.IllegalStateException: Should not be called directly
Please make sure your function overrides public StructObjectInspector initialize(ObjectInspector[] args).; line 1 pos 7

This PR resolves the exception.
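
To make the failure mode concrete, here is a minimal, self-contained Java sketch of the overload pattern quoted above. It does not use Hive: `Inspector`, `StructInspector`, `BaseUdtf`, and `NewStyleUdtf` are hypothetical stand-ins, and `initialize` returns a `String` instead of a `StructObjectInspector` purely to keep the sketch short.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for Hive's inspector types; not the real API.
interface Inspector { String typeName(); }

final class StructInspector {
    private final List<Inspector> fields;
    StructInspector(List<Inspector> fields) { this.fields = fields; }
    List<Inspector> fields() { return fields; }
}

// Mirrors the two overloads quoted above: the struct overload forwards to the
// array overload, whose default body throws.
abstract class BaseUdtf {
    public String initialize(StructInspector argOIs) {
        return initialize(argOIs.fields().toArray(new Inspector[0]));
    }

    @Deprecated
    public String initialize(Inspector[] argOIs) {
        throw new IllegalStateException("Should not be called directly");
    }
}

// A UDTF written against the newer API overrides only the struct overload.
final class NewStyleUdtf extends BaseUdtf {
    @Override
    public String initialize(StructInspector argOIs) {
        return "struct<" + argOIs.fields().size() + " cols>";
    }
}

class UdtfInitDemo {
    static StructInspector sampleArgs() {
        Inspector stringOI = () -> "string";
        Inspector intOI = () -> "int";
        return new StructInspector(Arrays.asList(stringOI, intOI));
    }

    // What a caller limited to the deprecated array overload runs into.
    static String callDeprecated(BaseUdtf udtf) {
        try {
            return udtf.initialize(sampleArgs().fields().toArray(new Inspector[0]));
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }

    // Calling the struct overload reaches the subclass override.
    static String callStruct(BaseUdtf udtf) {
        return udtf.initialize(sampleArgs());
    }

    public static void main(String[] args) {
        BaseUdtf udtf = new NewStyleUdtf();
        System.out.println(callDeprecated(udtf));
        System.out.println(callStruct(udtf));
    }
}
```

In this model the array-overload call hits the throwing default body, which corresponds to the "Should not be called directly" message in the error above, while the struct-overload call succeeds.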

Why are the changes needed?

While migrating from Hive to Spark SQL, we found that many UDTFs throw an exception because of this issue. Fixing it is a real improvement for attracting users to Spark SQL.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Tested manually.

@HyukjinKwon
Member

cc @wangyum

@HyukjinKwon
Member

ok to test

@SparkQA

SparkQA commented Dec 9, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37082/

@SparkQA

SparkQA commented Dec 9, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37082/

@SparkQA

SparkQA commented Dec 9, 2020

Test build #132480 has finished for PR 30665 at commit 0e3dca1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member

wangyum commented Dec 9, 2020

@southernriver Could you open this PR against the master branch?

Member

@wangyum wangyum left a comment

Remove build/._scala-2.12.10 and build/._zinc-0.3.15.

Comment on lines +94 to +104
try {
  udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input))
  // Force it to check input data types.
  udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema
} catch {
  case exception: Exception =>
    logInfo(s"HiveGenericUDTF initialize(ObjectInspector[] args) is deprecated, and" +
      s" we will suit the latest version of initialize(StructObjectInspector argOIs).")
    udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName),
      input, false))
    udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema
Member

How about?

val funcWrapper = new HiveFunctionWrapper(clazz.getName)
try {
  udfExpr = Some(HiveGenericUDTF(name, funcWrapper, input, true))
  // Force it to check data types.
  udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema
} catch {
  case e: IllegalStateException if e.getMessage.equals("Should not be called directly") =>
    logInfo("Fallback to use the non deprecated UDTF constructor.")
    udfExpr = Some(HiveGenericUDTF(name, funcWrapper, input, false))
    // Force it to check data types.
    udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema
}
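
The narrow catch in this suggestion can be illustrated without Spark or Hive. The following Java sketch is a hypothetical model: `FallbackDemo`, `initWithFallback`, and `SENTINEL` are names invented here, and the two suppliers stand in for constructing the expression with the deprecated and the non-deprecated initializer. It falls back only when the first attempt fails with the sentinel message and rethrows anything else.

```java
import java.util.function.Supplier;

class FallbackDemo {
    // Message thrown by the default body of the deprecated overload.
    static final String SENTINEL = "Should not be called directly";

    // Try the primary path; fall back only when it fails with the sentinel error.
    static <T> T initWithFallback(Supplier<T> primary, Supplier<T> fallback) {
        try {
            return primary.get();
        } catch (IllegalStateException e) {
            if (!SENTINEL.equals(e.getMessage())) {
                throw e; // unrelated failure: do not mask it
            }
            return fallback.get();
        }
    }

    public static void main(String[] args) {
        String result = FallbackDemo.<String>initWithFallback(
            () -> { throw new IllegalStateException(SENTINEL); },
            () -> "initialized via the struct overload");
        System.out.println(result);
    }
}
```

Comparing as `SENTINEL.equals(e.getMessage())` rather than the other way around is a small design choice in this sketch: it stays safe when the exception carries no message.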

    funcWrapper: HiveFunctionWrapper,
-   children: Seq[Expression])
+   children: Seq[Expression],
+   deprecated: Boolean = true)
Member

deprecated: Boolean = true -> isDeprecatedConstructor: Boolean?

Comment on lines +223 to +228
protected lazy val outputInspector =
  if (deprecated) {
    function.initialize(inputInspectors.toArray)
  } else {
    function.initialize(rowOI)
  }
Member

How about?

protected lazy val outputInspector = {
  if (isDeprecatedConstructor) {
    function.initialize(inputInspectors.toArray)
  } else {
    val rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
      children.zipWithIndex.map(e => s"_col${e._2}").asJava, inputInspectors.asJava)
    function.initialize(rowOI)
  }
}
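
The struct inspector in this suggestion gets one positional field name per input expression. As a small illustration of just that naming step, the Java sketch below generates the same `_col<index>` names; `ColumnNames` and `positional` are hypothetical helpers, not part of Spark or Hive.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ColumnNames {
    // Positional field names for n inputs: _col0, _col1, ...
    static List<String> positional(int n) {
        return IntStream.range(0, n)
            .mapToObj(i -> "_col" + i)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(positional(3)); // prints [_col0, _col1, _col2]
    }
}
```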

Comment on lines +2135 to +2137
val num =
  sql("SELECT udtf_stack2(2, 'A', 10, date '2015-01-01', 'B', 20, date '2016-01-01')").count()
assert(num === 2)
Member

How about?

checkAnswer(
  sql("SELECT udtf_stack2(2, 'A', 10, date '2015-01-01', 'B', 20, date '2016-01-01')"),
  Seq(Row("A", 10, java.sql.Date.valueOf("2015-01-01")),
    Row("B", 20, java.sql.Date.valueOf("2016-01-01"))))

Member

Could you update the test name?

@southernriver
Contributor Author

@wangyum Thanks a lot, that's a good catch. I'll make the fixes you suggested.

@wangyum
Member

wangyum commented Dec 30, 2020

@southernriver Any update?

Member

@sunchao sunchao left a comment

I think this is trying to do the same thing as #29490

  // Force it to check input data types.
  udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema
} catch {
  case exception: Exception =>
Member

Why do we need this fallback mechanism? Can we just switch to the new API without the deprecated flag?
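
One way to read this question, assuming the delegation shown in the base-class code quoted in the PR description: because the struct overload forwards to the array overload by default, a caller that always uses the struct overload reaches both legacy and new-style UDTFs. The Java sketch below models that with hypothetical stand-ins (`Udtf`, `LegacyUdtf`, `ModernUdtf`, `SingleEntryPoint`), using plain strings in place of Hive's inspector types.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins; strings replace Hive's inspector types.
abstract class Udtf {
    // Newer entry point: forwards to the array overload unless overridden.
    public String initialize(List<String> structArgs) {
        return initialize(structArgs.toArray(new String[0]));
    }

    @Deprecated
    public String initialize(String[] arrayArgs) {
        throw new IllegalStateException("Should not be called directly");
    }
}

// Legacy UDTF: overrides only the deprecated array overload.
final class LegacyUdtf extends Udtf {
    @Override
    public String initialize(String[] arrayArgs) {
        return "legacy:" + arrayArgs.length;
    }
}

// New-style UDTF: overrides only the struct overload.
final class ModernUdtf extends Udtf {
    @Override
    public String initialize(List<String> structArgs) {
        return "modern:" + structArgs.size();
    }
}

class SingleEntryPoint {
    // The caller always goes through the struct overload; no flag is needed.
    static String init(Udtf udtf, List<String> args) {
        return udtf.initialize(args);
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("string", "int");
        System.out.println(init(new LegacyUdtf(), inputs));
        System.out.println(init(new ModernUdtf(), inputs));
    }
}
```

Whether the real classes behave the same way in every Hive version Spark supports is not established in this thread, so this is only a model of the argument.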

@wangyum
Member

wangyum commented Jan 1, 2021

@southernriver is busy these days. Let's close this.

@wangyum wangyum closed this Jan 1, 2021