Conversation

@srowen
Member

@srowen srowen commented Aug 10, 2018

What changes were proposed in this pull request?

First attempt to resolve the issue with inferring function types in Scala 2.12, by instead using info captured when the UDF is registered: which argument types are nullable (i.e. not primitive).
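A minimal sketch of the idea, not Spark's actual implementation: at registration time, the TypeTags already in scope can record, per argument type, whether it is a primitive (hence non-nullable) or a reference type (hence nullable), instead of trying to recover that from the closure later. `nullableFlags` is a hypothetical helper name.

```scala
import scala.reflect.runtime.universe._

// For each argument type, record nullability up front. Subtypes of AnyVal
// are the primitive (non-nullable) types; everything else is nullable.
// (Value classes also extend AnyVal; this sketch ignores that wrinkle.)
def nullableFlags(tags: Seq[TypeTag[_]]): Seq[Boolean] =
  tags.map(t => !(t.tpe <:< typeOf[AnyVal]))

val flags = nullableFlags(Seq(typeTag[Int], typeTag[String], typeTag[Double]))
// Int and Double are primitives, String is a nullable reference type
```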

How was this patch tested?

Existing tests.

@srowen srowen changed the title [SPARK-25044][SQL] Address translation of LMF closure primitive args to Object in Scala 2.12 [WIP][SPARK-25044][SQL] Address translation of LMF closure primitive args to Object in Scala 2.12 Aug 10, 2018
@srowen (Member Author) left a comment:

Just a WIP for now. I think this almost works, but not quite. Putting it out there for comments early.

@srowen (Member Author) commented:

This is probably the weak point: unless there is nullability info, don't do anything to the UDF plan, but, that's probably wrong in some cases

@srowen (Member Author) commented:

The approach here is to capture at registration time whether the arg types are primitive, or nullable. Not a great way to record this, but might be the least hack for now

@SparkQA

SparkQA commented Aug 10, 2018

Test build #94535 has finished for PR 22063 at commit 92598f0.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

A Contributor commented:

Instead of having 2 lists, shall we just keep a Seq[ScalaReflection.Schema] or Seq[(DataType, Boolean)]?
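An illustrative sketch of this suggestion, where `DataType` is a stand-in for Spark's `org.apache.spark.sql.types.DataType`: rather than carrying two parallel lists, zip type and nullability into one `Seq[(DataType, Boolean)]`.

```scala
// Stand-in for Spark's DataType, just to make the shape concrete.
case class DataType(name: String)

val dataTypes = Seq(DataType("IntegerType"), DataType("StringType"))
val nullables = Seq(false, true)

// One combined sequence instead of two parallel lists that must stay in sync.
val inputSchemas: Seq[(DataType, Boolean)] = dataTypes.zip(nullables)
```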

@srowen (Member Author) commented:

Yeah that can be optimized. I'll fix the MiMa issue too by restoring a constructor.

@cloud-fan
Contributor

the idea LGTM

@SparkQA

SparkQA commented Aug 10, 2018

Test build #94561 has finished for PR 22063 at commit a1850b0.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 10, 2018

Test build #94567 has finished for PR 22063 at commit 4a88da5.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 10, 2018

Test build #94569 has finished for PR 22063 at commit c3d1561.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 10, 2018

Test build #94571 has finished for PR 22063 at commit a803869.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 11, 2018

Test build #94592 has finished for PR 22063 at commit 3c67d9d.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 11, 2018

Test build #94610 has finished for PR 22063 at commit c285e93.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member Author

srowen commented Aug 11, 2018

For those following along, the problem right now is this: functions like udf() have always declared that their type params have TypeTags, meaning the caller has to have runtime info about the types. This supports schema inference.

Although the internals of the udf() methods changed in this PR, the signature did not, and ScalaReflection.schemaFor was called before too. However, with this change, suddenly the TypeTags seem to be needed by the compiler, and many calls to udf() now fail because the call site does not have one.

I worked around a few, but am having more trouble with others. I'm still not clear on why the TypeTags only seem to matter with this change, when they were always required.
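A minimal sketch of the signature shape under discussion (`describe` is a hypothetical stand-in for udf()): the context bound `[A1: TypeTag]` means the call site must supply an implicit `TypeTag[A1]`. For a concrete inferred type the compiler materializes one automatically; the failures described above arise when no concrete type, and hence no tag, is available at the call site.

```scala
import scala.reflect.runtime.universe._

// A context bound [A1: TypeTag] desugars to an implicit TypeTag[A1]
// parameter that the caller's compilation context must provide.
def describe[A1: TypeTag](f: A1 => Any): String = typeOf[A1].toString

// Here A1 is inferred as Int, so the compiler materializes TypeTag[Int].
val shown = describe((i: Int) => i + 1)
```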

A Contributor commented:

I'm really surprised that this worked before...


Nit: Wrong Scala version

@SparkQA

SparkQA commented Aug 21, 2018

Test build #95027 has finished for PR 22063 at commit f029f6d.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 21, 2018

Test build #95031 has finished for PR 22063 at commit a5cc5ec.

  • This patch fails from timeout after a configured wait of `400m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 22, 2018

Test build #95106 has finished for PR 22063 at commit e7abb67.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 22, 2018

Test build #95121 has finished for PR 22063 at commit 09c3a3b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor) commented Aug 23, 2018:

I looked into this, and now I understand why it worked before.

Scala 2.11 can somehow generate a type tag for Any; Spark then gets the input schema from the type tag via Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: Nil).toOption. That fails, so the input schema becomes None, and no type check is applied later.

I think it makes more sense to specify the type and ask Spark to do the type check.

+1
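The Try-swallowing behavior described above can be modeled with a toy sketch, where `schemaFor` stands in for `ScalaReflection.schemaFor`, which throws for unsupported types like Any. `Try(...).toOption` turns that failure into None, so downstream code sees "no input schema" and silently skips the type check.

```scala
import scala.util.Try

// Stand-in for ScalaReflection.schemaFor: throws for an unsupported type.
def schemaFor(tpe: String): String =
  if (tpe == "Any") throw new UnsupportedOperationException(s"Schema for type $tpe is not supported")
  else s"schema($tpe)"

// The failure is swallowed: callers just see None and skip the type check.
val unsupported = Try(schemaFor("Any") :: Nil).toOption
// A supported type yields Some(...) and the check proceeds.
val supported = Try(schemaFor("Int") :: Nil).toOption
```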

@srowen (Member Author) commented:

Thanks for your review @cloud-fan, I could really use your input here. That's a good find. It may be that we want to explicitly support UDFs where a schema isn't available; see below. But I agree I'd rather not; it gets kind of messy though.

@srowen (Member Author) commented:

@skonto @lrytz this might be of interest. Don't think it's a Scala issue per se but just checking if that behavior change makes sense.

@skonto (Contributor) commented Aug 29, 2018:

@adriaanm @lrytz any more info?


No idea, but in any case the new version seems nicer :-) Both 2.11 and 2.12 will happily generate a typeTag for Any, though, so that wouldn't immediately explain it. To see what was actually inferred, you could compile with -Xprint:typer (ideally after a full compile and then just making this file recompile incrementally).

@srowen (Member Author) commented:

I apologize, that's my mistake. In the end it isn't related to TypeTags for Any and that is not a difference. Thanks for your input, I think we are close.


No problem! Happy to help with the 2.12 upgrade.

A Contributor commented:

I'm fine with this workaround, but I think we should create an expression for this checkedCast. The current UDF doesn't work well with inputs of different types.

@srowen (Member Author) commented:

Now that I dig further, this doesn't even actually work. Using Number doesn't work either. Unfortunately there are a number of UDFs in MLlib that fail now.

A Contributor commented:

I'm not sure we want to keep doing this. cc @gatorsmile

@srowen (Member Author) commented:

Yeah, messy. The constructor and class are public, so I felt it was worth erring on the side of compatibility.

A Contributor commented:

By convention, everything under the catalyst package is private, so compatibility is not a concern here.

A Contributor commented:

I'm not sure this is safe to do. Maybe a == b is different from a.toString == b.toString.
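One concrete case of the concern raised above: value equality and string-form equality can disagree, so substituting `a.toString == b.toString` for `a == b` changes results.

```scala
// Int and Double compare numerically equal...
val valuesEqual = 1 == 1.0
// ...but their string forms "1" and "1.0" do not.
val stringsEqual = 1.toString == 1.0.toString
```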

A Contributor commented:

I feel a udf using Any as the input type is rare, but a valid use case. Sometimes they just want to accept any input type.

How about we create a few udfInternal methods that take Any as input? e.g.

def udfInternal[R: TypeTag](f: Function1[Any, R]): UserDefinedFunction = {
  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[R]
  val udf = UserDefinedFunction(f, dataType, None)
  if (nullable) udf else udf.asNonNullable()
}

@srowen (Member Author) commented:

You are right that the change I made here is not bulletproof. Unfortunately there are several more problems like this. Anywhere there's a UDF on Row it now fails, and workarounds are ugly.

I like your idea; let me work on that, because the alternative I've been working on is driving me nuts.

A Contributor commented:

why add this? UserDefinedFunction is public but its constructor is not.

@srowen (Member Author) commented:

It was just for MiMa, but I could also suppress the warning

A Contributor commented:

again, can we combine it with the data types?

@srowen (Member Author) commented:

I was hoping to minimize the change in the signature, but yeah it could be Option[Seq[(DataType, Boolean)]]?

A Contributor commented:

or Option[Seq[ScalaReflection.Schema]]

@srowen (Member Author) commented:

+1 to all your comments. I'm overhauling this whole PR and will force push with a rebase once it seems to basically work.

@SparkQA

SparkQA commented Aug 25, 2018

Test build #95234 has finished for PR 22063 at commit 721860d.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Diff context from the udf code-gen template (the old inputTypes line is replaced by the new one):

|def udf[$typeTags](f: Function$x[$types]): UserDefinedFunction = {
|  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-|  val inputTypes = Try($inputTypes).toOption
+|  val inputTypes = Try($argSchema).toOption
@srowen (Member Author) commented:

@cloud-fan might be worth another look now. After making this change, I realize that maybe the whole reason it started failing was that I had moved the schema inference outside the Try(); now it's back inside. Maybe that makes the whole problem go back to being silent. Did you mean you preferred tackling the problem directly rather than suppressing the failure to infer a schema? I added udfInternal above for that.

But maybe this isn't the best approach as user UDFs could fail for the same reason. Maybe I need to back this whole thing out after all, now that I understand what's happening after your comments.

@SparkQA

SparkQA commented Aug 25, 2018

Test build #4288 has finished for PR 22063 at commit 721860d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 26, 2018

Test build #4289 has finished for PR 22063 at commit 721860d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 27, 2018

Test build #4293 has finished for PR 22063 at commit 721860d.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@srowen
Member Author

srowen commented Aug 28, 2018

See also #22259

asfgit pushed a commit that referenced this pull request Aug 29, 2018
…primitive args to Object in Scala 2.12

## What changes were proposed in this pull request?

Alternative take on #22063 that does not introduce udfInternal.
Resolve issue with inferring func types in 2.12 by instead using info captured when UDF is registered -- capturing which types are nullable (i.e. not primitive)

## How was this patch tested?

Existing tests.

Closes #22259 from srowen/SPARK-25044.2.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@srowen srowen closed this Aug 29, 2018
Diff context from the UserDefinedFunction case class (the old inputTypes parameter is replaced by the new one):

    f: AnyRef,
    dataType: DataType,
-   inputTypes: Option[Seq[DataType]]) {
+   inputTypes: Option[Seq[ScalaReflection.Schema]]) {
A Member commented:

This is a stable API. Are we able to make this change in 2.4 instead of 3.0?

A Contributor commented:

But the constructor is protected[sql]

A Contributor commented:

Ah I see, it's a case class, so we would need to keep the def inputTypes(): Option[Seq[DataType]].

@srowen (Member Author) commented:

(BTW it was PR #22259 that was merged)

We can add back accessors and constructors if it would make life easier for callers. But if this is protected, who are the callers of this code we're accommodating? Maybe some hacky but important integration?

We'd have to rename inputTypes and then add back an accessor with the old type.
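A sketch of the compatibility shim described above, using illustrative stand-in types for Spark's: rename the combined field, then re-expose the old `inputTypes: Option[Seq[DataType]]` shape as a derived accessor.

```scala
// Stand-ins for Spark's DataType and ScalaReflection.Schema.
case class DataType(name: String)
case class Schema(dataType: DataType, nullable: Boolean)

// The renamed field carries the combined type + nullability info.
case class UdfSketch(inputSchemas: Option[Seq[Schema]]) {
  // Accessor preserving the pre-change shape for existing callers.
  def inputTypes: Option[Seq[DataType]] = inputSchemas.map(_.map(_.dataType))
}

val udf = UdfSketch(Some(Seq(Schema(DataType("IntegerType"), nullable = false))))
val legacyTypes = udf.inputTypes
```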

@srowen srowen deleted the SPARK-25044 branch March 10, 2019 19:08