Conversation

@maryannxue (Contributor) commented Oct 15, 2018

What changes were proposed in this pull request?

This is a follow-up PR for #22259. The extra field added in `ScalaUDF` with the original PR was declared optional, but should in fact be required: otherwise callers of `ScalaUDF`'s constructor could ignore the new field and cause incorrect results. This PR makes the new field required and renames it to `handleNullForInputs`.

This PR also restores the original null-handling behavior for primitive-type input parameters, which was broken by #22259. For example, for `val f = udf({(x: Int, y: Any) => x})`, `f(null, "str")` should return `null` but would return `0` after #22259.

In addition, now that we have this extra field indicating whether a null check should be applied to the corresponding input value, we can also use this flag to keep the rule `HandleNullInputsForUDF` from being applied indefinitely.
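
A minimal sketch of the restored behavior (assuming a live `spark` session; an illustration of the example above, not the exact test added in UDFSuite):

    import org.apache.spark.sql.functions.{lit, udf}

    // First parameter is a primitive Int, second is a reference type.
    val f = udf((x: Int, y: Any) => x)

    // With the input null check in place, a null fed to the primitive Int parameter
    // makes the whole expression evaluate to null instead of the Int default value 0.
    spark.range(1)
      .select(f(lit(null).cast("int"), lit("str")))
      .show()
    // Expected: a single row containing null, not 0.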

How was this patch tested?

Added UT in UDFSuite

Passed affected existing UTs:
AnalysisSuite
UDFSuite

Member

We don't want to change this constructor, I believe. It's preserved for backwards-compatibility. It should pass an empty Seq or Nil for the new param.

Contributor Author

Just realized this is different from your original PR already; @cloud-fan did a follow-up PR adding `nullableTypes`. I'll revert the change here.

Contributor

I think we should just remove this constructor. It's weird to keep backward compatibility for a private class, and I don't think it can work anymore. It's not OK to omit the nullable info.

Member

Maybe give a quick example to clarify? like "For example, primitive types can't take on the value null, and need special handling."

@maryannxue (Contributor Author) commented Oct 16, 2018

Yes, it needs a little explanation here regarding the primitive types. Since those types are not nullable, when the values are null they usually end up being represented as zero values. I should add this to the Javadoc here.
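
A plain-Scala illustration of that point (not Spark code): a primitive slot has no way to represent null, so a null value degenerates to the type's default.

    // A boxed null forced into a primitive Int slot silently becomes 0, which is why
    // a separate null check (or a nullability flag) is needed before calling the UDF.
    val boxed: java.lang.Integer = null
    val primitive: Int = boxed.asInstanceOf[Int]  // unboxing null yields the default value
    println(primitive)                            // prints 0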

Member

Just checking my understanding -- string gets false because it's a reference type, and a double known to not be null also doesn't need null checking, so gets false. OK that sounds right.

Contributor Author

Yes, as explained in #22732 (comment)

Member

If not specified, assume nothing needs special null handling. OK, check.

Contributor Author

I am actually not so sure about this part, but this is just to be consistent with the behavior in your previous check-in. Can you give an example of such end-user cases where these flags are unavailable/not specified?

Member

I don't know of a case. I suppose that's fine default behavior, and matches what happened in the previous change.

Contributor Author

I actually assume the default behavior should be the other way around. If we don't know, we just do the if-else null handling and it wouldn't do us any harm correctness-wise, right? Anyway I'm not gonna change that in this PR but hope we can get an idea if and when this default behavior will happen.

Member

Passing !nullable instead of nullable, yep, looks right. The meaning is flipped here, correct.

Member

How about `if (handleNullForInputs.exists(identity))`? If I understand the meaning right, that could be simpler: if anything is true you have to trigger the handling.

Contributor Author

Good point.

Member

The TODO above seems to apply to the line that was removed? I'm not totally clear. Is it OK not to check `!expr.isInstanceOf[KnownNotNull]` now? You know this better than I do. If the tests pass, I think that proves it.

Contributor Author

This should answer/confirm a couple of other questions above:
Since we already have this flag handleNullForInputs in ScalaUDF, we can take advantage of it in this rule as well. Say the first time a ScalaUDF hits this rule its handleNullForInputs is "false, true, false": we add null handling (an if clause) for the second input, which is flagged "true"; from that point on all inputs are covered, so we can flag the new ScalaUDF's handleNullForInputs as all "false". Even if the new ScalaUDF hits this rule a second time, nothing will be done.
It works the same way for the "TODO" above: if handleNullForInputs has a "true" flag but the corresponding expression is NOT nullable, we can skip the null handling and flag it as "false" in the new ScalaUDF.
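
A toy model of that fixed-point argument (illustrative only, not the actual HandleNullInputsForUDF rule or Catalyst types):

    // Toy stand-in for ScalaUDF: string "inputs" plus the per-input flags.
    case class ToyUdf(inputs: Seq[String], handleNullForInputs: Seq[Boolean])

    def applyHandleNullInputs(udf: ToyUdf): ToyUdf =
      if (!udf.handleNullForInputs.exists(identity)) {
        udf  // nothing flagged: the rule is a no-op, so repeated application terminates
      } else {
        // Guard every input flagged true with a null check, then clear all flags so the
        // rewritten node is never rewritten again.
        val guarded = udf.inputs.zip(udf.handleNullForInputs).map {
          case (in, true)  => s"if (isnull($in)) null else $in"
          case (in, false) => in
        }
        ToyUdf(guarded, Seq.fill(udf.inputs.size)(false))
      }

    // Flags "false, true, false" as in the comment above:
    val first  = applyHandleNullInputs(ToyUdf(Seq("a", "b", "c"), Seq(false, true, false)))
    val second = applyHandleNullInputs(first)  // identical to `first`: fixed point reached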

@SparkQA commented Oct 15, 2018

Test build #97409 has finished for PR 22732 at commit 5cf3c89.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

Maybe I missed something but:

  1. Why don't we just merge handleNullForInputs and inputTypes?
  2. Why is handleNullForInputs required whereas inputTypes's default is Nil?

Member

Adding handleNullForInputs doesn't look like it reduces confusion much to me. Since this PR mainly targets refactoring to reduce confusion and improve ease of use, this concern should be addressed.

Member

It could be merged. I guess the preference is to minimize the change required, so we just have a new field.

That said, UDFs now really require nullability information to work correctly in Scala 2.12. This is the reason the new field is required and not optional. It already caused a new test to fail, so I'm more persuaded that it's OK to make a less backwards-compatible change to make it clear to any future callers that the new info is needed. It's an internal class, so it's reasonable to do so.

Contributor Author

Among all other reasons, one argument for not merging the flags with the types is #22732 (comment).
As to your second question, it would be for the same reason as above, plus #22259 (comment).

Contributor

how about acceptNullInputs?

Contributor Author

Makes sense. Since I'm also using this flag to replace KnownNotNull, I think inputsNullSafe would be a better name?

Member

Suggested change:
- Seq(false, false))
+ handleNullForInputs = Seq(false, false))

Contributor Author

I don't think we need to do that for required arguments, or do we?

Member

We need to be more conservative here. The default behavior needs to handle NULLs. We should fix it.

// Need to handle NULL when the fields can't accept NULL. 
val handleNullForInputs = nullableTypes.map(_.map(!_)).getOrElse(Seq.fill(exprs.size)(true))

Member

@srowen @cloud-fan This sounds like a new behavior added in the Spark 2.4 release by PR #22259, right?

Member

It's new in the sense that Scala 2.12 forced us to change where we get type information for UDFs, or specifically whether the type is allowed to be null. Before, we could infer it directly from the closure, but not in 2.12. This change is supposed to just get the info from elsewhere -- from TypeTags when the UDFs are declared. The change ought to be invisible to users and, if done correctly, should not result in a behavior change. This follow-up change is more about being conservative and lessening the chance of a mistake (see the test case @maryannxue found) internally, or of a mistake for a library that uses ScalaUDF directly anyway.

For that reason, yeah I agree with your comment here. Default to assuming null checks are needed.
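
A rough sketch of the two approaches being contrasted (hypothetical helper names; the real logic lives in ScalaReflection and is more involved):

    import java.lang.reflect.Method
    import scala.reflect.runtime.universe._

    // Scala 2.11-era idea: inspect the compiled closure class with Java reflection
    // (roughly what ScalaReflection.getParameterTypes relied on). Under 2.12, lambdas
    // are compiled differently and the parameter classes can't be recovered this way.
    def paramClassesFromClosure(f: AnyRef): Seq[Class[_]] = {
      val apply: Method = f.getClass.getMethods
        .filter(m => m.getName == "apply" && !m.isBridge)
        .head
      apply.getParameterTypes.toSeq
    }

    // Replacement idea: capture the information at the declaration site via TypeTags,
    // and derive "does this input need a null check" from the declared type.
    def needsNullCheck[A: TypeTag]: Boolean =
      typeOf[A] <:< typeOf[AnyVal]  // Int, Long, Double, ... cannot hold null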

Contributor Author

Looks like the only place where we'd get an unspecified inputSchemas is when ScalaReflection.schemaFor doesn't recognize a type and throws an exception (`throw new UnsupportedOperationException(s"Schema for type $other is not supported")`). The caller seems to be doing a bad job by calling it this way, for example:

    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: Nil).toOption
    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)

This means that if the type of even one parameter is unrecognizable by ScalaReflection, we end up with the entire Seq as None. I think it's fine not to null-check user-defined types that we don't know, since they can't be primitive types anyway, but I do think we should make the type inference of each parameter independent, so we do handle the nulls that need to be taken care of.
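
A sketch of that per-parameter independence, reusing the calls quoted above (two concrete parameter types stand in for A1/A2):

    import scala.util.Try
    import scala.reflect.runtime.universe.typeTag
    import org.apache.spark.sql.catalyst.ScalaReflection

    // Current shape: one Try around the whole list, so the unsupported Any discards
    // the schema (and nullability) information of the Int parameter as well.
    val combined = Try(ScalaReflection.schemaFor(typeTag[Any]) ::
                       ScalaReflection.schemaFor(typeTag[Int]) :: Nil).toOption
    // combined == None

    // Per-parameter shape: one Try per parameter, so the Int parameter still reports
    // that it is a non-nullable primitive and therefore needs a null check.
    val perParameter = Try(ScalaReflection.schemaFor(typeTag[Any])).toOption ::
                       Try(ScalaReflection.schemaFor(typeTag[Int])).toOption :: Nil
    // perParameter == List(None, Some(Schema(IntegerType, nullable = false)))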

Member

Yes, that's right. There are a number of UDFs in MLlib, etc. that have inputs of type Any, which isn't great, but I wanted to work around them rather than change them for now.

Contributor Author

In addition to what I just pointed out, i.e., when we try to get inputSchemas through ScalaReflection.schemaFor and get an exception for unrecognized types, there's another case where we could get an unspecified nullableTypes: when UserDefinedFunction is instantiated by calling the constructor rather than the create method.
In that case I assume it's created by an earlier version, and we should use the old logic, i.e., ScalaReflection.getParameterTypes (https://github.com/apache/spark/pull/22259/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2L2153), to get the correct information for nullableTypes. Is that right, @cloud-fan @srowen?

Member

Hm, but we can't use getParameterTypes anymore. It won't work in Scala 2.12. Where the nullability info is definitely not available, be conservative and assume it all needs null handling?

Contributor

If we can't get the type information at compile time (e.g. (a: Any) => xxx), I don't think ScalaReflection.getParameterTypes can get anything useful at runtime. For this case we won't add a null check anyway, so I think there is no behavior change.

@SparkQA commented Oct 16, 2018

Test build #97464 has finished for PR 22732 at commit 19ed5e8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

inputsNullSafe SGTM, let's also rename UserDefinedFunction.nullableTypes

@cloud-fan (Contributor)

There is an argument about whether #22259 introduced behavior changes. Here is my analysis.

Before #22259, the type and null checks were done as follows:

  1. user registers UDFs, like (a: Int) => xxx, (b: Any) => xxx
  2. at compile time, get the type info and set the input types, so that the analyzer can add a cast or fail the query if the real input data type doesn't match. Note that UDFs like (b: Any) => xxx have no type info, so we won't do the type check.
  3. at runtime, use reflection to get the type info again, and add a null check if an input is of a primitive type.

After #22259, the type and null checks are done as follows:

  1. user registers UDFs, like (a: Int) => xxx, (b: Any) => xxx
  2. at compile time, get the type info, set the input types and input nullability, so that the analyzer can add a cast or fail the query if the real input data type doesn't match, and add a null check if necessary. Note that UDFs like (b: Any) => xxx have no type info, so we won't do the type and null checks.

So we may have a behavior change if users register UDFs in a weird way, e.g. they define (a: Int) => xxx but cast it to Any => xxx during registration. Then we can't get the real type info at compile time.

I'd say this is an invalid use case, because the data type check is also lost. I think we don't need to treat it as a behavior change.
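
A sketch of that "weird" registration (hypothetical, shown only to illustrate why the compile-time type info is lost):

    import org.apache.spark.sql.functions.udf

    // Defined with a concrete primitive parameter...
    val g = (a: Int) => a + 1

    // ...but exposed as Any => Int before registration, so the TypeTag seen by udf()
    // is Any and neither the type check nor the null check can be derived from it.
    val erased = g.asInstanceOf[Any => Int]
    val weirdUdf = udf(erased)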

@cloud-fan (Contributor)

After more thought, there is one case we don't handle well. For UDFs like (a: Any, i: Int) => xxx, previously we could still get the Int type info at runtime and add a null check for it. Now we can't get the type info at compile time, because we do something like Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: Nil).toOption: when we fail to get the type info of Any, we just give up and don't try the next input.

@srowen (Member) commented Oct 17, 2018

That's a good point, but that was already an issue, right? It isn't introduced by this change, at least?

@maryannxue (Contributor Author)

@srowen What @cloud-fan described is a change introduced in #22259. We can fix it by keeping each call to ScalaReflection.schemaFor in its own Try block.
As to UserDefinedFunction, after offline discussions with @cloud-fan, we decided that there should be no occurrences of calling the constructor without setting the nullableTypes, so we'll just assert nullableTypes.length == exprs.length.
I'll go ahead and fix these two items and update the PR.

@srowen (Member) commented Oct 17, 2018

At this point I think you both know more than I do about this, so go ahead. To the limits of my understanding it sounds reasonable.

Contributor

this constructor is here for backward compatibility. If we have to change it, I don't think we need to keep it anymore. cc @gatorsmile

Member

We should not break it.

Contributor Author

We'll do the Scala 2.11 approach for the places where nullableTypes info is unavailable, so as to at least keep legacy usage of ScalaUDF working.

Member

I like the idea. We should not break our end users if they just use Scala 2.11. In Spark 3.0, we can be more aggressive and break the binary compatibility.

Contributor

SGTM

@SparkQA commented Oct 18, 2018

Test build #97508 has finished for PR 22732 at commit 968ed26.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

since we are here, shall we also check exprs.length == numOfArgs?

Contributor Author

Actually I think the assert would better live in ScalaUDF, but then it would be slightly different from the current behavior. I'm keeping other things as they are.

Member

nit: `udf((x: Long, y: Any) => x * 2 + (if (y == null) 1 else 0))`

Member

nit: upper case for keywords

Member

Is this targeted for a backport to 2.4?

Contributor

yes, this fixes a bug introduced by a commit in 2.4

Contributor

Shall we explicitly return a Seq of true if it's not Scala 2.11? Then the behavior is more predictable, rather than possibly inaccurate.

Contributor Author

I think it simply returns when going through the code path below. I should probably make the Javadoc clearer.

Contributor Author

The argument here is that it's not necessarily wrong when using Scala 2.12: if all inputs are of boxed types, then it can still be correct. I think it's enough to say "we don't support it; switch to the new interface, otherwise we can't guarantee correctness."
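
A small illustration of the boxed-type case (assuming a live `spark` session): with only reference-type inputs, skipping the automatic null check is harmless because the closure itself sees the null.

    import org.apache.spark.sql.functions.{lit, udf}

    // String is a reference type, so no silent zero substitution can happen; the closure
    // receives the null directly and can handle it explicitly.
    val len = udf((s: String) => if (s == null) -1 else s.length)
    spark.range(1).select(len(lit(null).cast("string"))).show()  // prints -1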

@maryannxue force-pushed the spark-25044-followup branch from 7a6b2e1 to 84cb456 on October 19, 2018
@SparkQA commented Oct 19, 2018

Test build #97578 has finished for PR 22732 at commit 84cb456.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 19, 2018

Test build #97574 has finished for PR 22732 at commit 7a6b2e1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 19, 2018

Test build #97573 has finished for PR 22732 at commit cf8c457.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • trait ScalaReflection extends Logging
  • // TODO: make sure this class is only instantiated through `SparkUserDefinedFunction.create()`

@gatorsmile (Member) commented Oct 19, 2018

For Scala 2.11, we should not introduce any behavior change and also keep binary and source compatibility.

scala> import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.DataTypes

scala> val f2 = udf({(x: Int) => x}, DataTypes.IntegerType)
f2: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,None)

scala> spark.range(3).select(f2('id + null)).show()
+----------------+
|UDF((id + null))|
+----------------+
|            null|
|            null|
|            null|
+----------------+

For Scala 2.12, since we are unable to know the type nullability in a few APIs, we issue a warning message in those cases. Below is an example that generates a wrong answer due to the primitive data types:

scala> import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.DataTypes

scala> val f2 = udf({(x: Int) => x}, DataTypes.IntegerType)
f2: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction($Lambda$2801/26868055@5eb35a26,IntegerType,None)

scala> spark.range(3).select(f2('id + null)).show()
18/10/18 23:07:02 WARN ScalaReflection: Scala version 2.12.7 cannot get type nullability correctly via reflection, thus Spark cannot add proper input null check for UDF. To avoid this problem, use the typed UDF interfaces instead.
+----------------+
|UDF((id + null))|
+----------------+
|               0|
|               0|
|               0|
+----------------+

In Spark 3.0, we should have a follow-up PR to block these APIs. In Spark 2.4, Scala 2.12 support is beta.

@SparkQA commented Oct 19, 2018

Test build #97587 has finished for PR 22732 at commit cb7e97a.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 19, 2018

Test build #97581 has finished for PR 22732 at commit e848ec7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

retest this please

@gatorsmile (Member)

LGTM pending Jenkins

@SparkQA commented Oct 19, 2018

Test build #97601 has finished for PR 22732 at commit cb7e97a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

thanks, merging to master/2.4!

asfgit pushed a commit that referenced this pull request Oct 19, 2018
## What changes were proposed in this pull request?

This is a follow-up PR for #22259. The extra field added in `ScalaUDF` with the original PR was declared optional, but should be indeed required, otherwise callers of `ScalaUDF`'s constructor could ignore this new field and cause the result to be incorrect. This PR makes the new field required and changes its name to `handleNullForInputs`.

#22259 breaks the previous behavior for null-handling of primitive-type input parameters. For example, for `val f = udf({(x: Int, y: Any) => x})`, `f(null, "str")` should return `null` but would return `0` after #22259. In this PR, all UDF methods except `def udf(f: AnyRef, dataType: DataType): UserDefinedFunction` have been restored with the original behavior. The only exception is documented in the Spark SQL migration guide.

In addition, now that we have this extra field indicating if a null-test should be applied on the corresponding input value, we can also make use of this flag to avoid the rule `HandleNullInputsForUDF` being applied infinitely.

## How was this patch tested?
Added UT in UDFSuite

Passed affected existing UTs:
AnalysisSuite
UDFSuite

Closes #22732 from maryannxue/spark-25044-followup.

Lead-authored-by: maryannxue <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e816776)
Signed-off-by: Wenchen Fan <[email protected]>
@asfgit asfgit closed this in e816776 Oct 19, 2018
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

In apache#22732 , we tried our best to keep the behavior of Scala UDF unchanged in Spark 2.4.

However, since Spark 3.0, Scala 2.12 is the default. The trick that was used to keep the behavior unchanged doesn't work with Scala 2.12.

This PR proposes to remove the Scala 2.11 hack, as it's not useful.

## How was this patch tested?

existing tests.

Closes apache#23498 from cloud-fan/udf.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>