Skip to content

Conversation

@ulysses-you
Copy link
Contributor

What changes were proposed in this pull request?

Add code in ScalaReflection to support scala enumeration and make enumeration type as string type in Spark.

Why are the changes needed?

We support java enum but failed with scala enum, it's better to keep the same behavior.

Here is a example.

package test

object TestEnum extends Enumeration {
  type TestEnum = Value
  val E1, E2, E3 = Value
}
import TestEnum._
case class TestClass(i: Int,  e: TestEnum) {
}

import test._
Seq(TestClass(1, TestEnum.E1)).toDS

Before this PR

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for test.TestEnum.TestEnum
- field (class: "scala.Enumeration.Value", name: "e")
- root class: "test.TestClass"
  at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:567)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:882)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:881)

After this PR
org.apache.spark.sql.Dataset[test.TestClass] = [i: int, e: string]

Does this PR introduce any user-facing change?

Yes, user can make case class which include scala enumeration field as dataset.

How was this patch tested?

Add test.

@SparkQA
Copy link

SparkQA commented Aug 11, 2020

Test build #127308 has finished for PR 29403 at commit 556b709.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 11, 2020

Test build #127319 has finished for PR 29403 at commit 0f98bef.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 12, 2020

Test build #127352 has finished for PR 29403 at commit 462b0f7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@srowen srowen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rednaxelafx would you have any thoughts? you might know of any potential issues in use of Scala enums

@ulysses-you
Copy link
Contributor Author

I don't know if there exists any issue or reason why we not support it.

@maropu @cloud-fan @dongjoon-hyun do you have any thought ?

Copy link
Contributor

@marmbrus marmbrus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems generally useful to me. I would use it and there are blog posts that talk about working around this. A few comments on nullability.

StructField(fieldName, dataType, nullable)
}), nullable = true)
case t if isSubtype(t, localTypeOf[Enumeration#Value]) =>
Schema(StringType, nullable = false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is nullable = false, I believe this should be reserved for primitive types that cannot be null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it should be true.

test("SPARK-32585: Support scala enumeration in ScalaReflection") {
checkDataset(
Seq(FooClassWithEnum(1, FooEnum.E1), FooClassWithEnum(2, FooEnum.E2)).toDS(),
Seq(FooClassWithEnum(1, FooEnum.E1), FooClassWithEnum(2, FooEnum.E2)): _*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to the above comment, I would add a test case where the enum value is null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the null test.

assert(deserializerFor[FooWithAnnotation].dataType == ObjectType(classOf[FooWithAnnotation]))
}

test("SPARK-32585: Support scala enumeration in ScalaReflection") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would consider putting a test case in ExpressionEncoderSuite as well as I think that checks various combinations of evaluation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, not realized this test.

// we can call example.Foo.withName to deserialize string to enumeration.
val className = t.asInstanceOf[TypeRef].pre.typeSymbol.asClass.fullName
// this check is for spark-shell which give a default package name like '$line1.$read$$iw'
if (className.startsWith("$")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't actually understand this limitation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually a jline issue that default package is something like $line32.$read.$iw.$iw.$iw.$iw.$iw. We can't find Enum class using reflect. I choose to forbid it, but please tell me if there exists a better way to go.

s"Enumeration class required package name, but found $className")
}

val clazz = Utils.classForName(className)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative:

val parent = t.asInstanceOf[TypeRef].pre.typeSymbol.asClass
val cls = scala.reflect.runtime.universe
  .runtimeMirror(getClass.getClassLoader)
  .runtimeClass(parent)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this pure scala reflection fix the issues with the REPL?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not fixed, exists the same issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, what is the error? Regardless, I'm not sure this limitation should hold up merging the basic support for enums.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me check this again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After deep check some code, I found the magic.

  1. we can get FooEnum class with default package using mirror. runtimeClass.
  2. we failed in StaticInvoke which will re-reflect the class if class name end with $.

The StaticInvoke related code and pr #20753:

  val objectName = staticObject.getName.stripSuffix("$")
  val cls = if (staticObject.getName == objectName) {
    staticObject
  } else {
    Utils.classForName(objectName)
  }

I cann't find more comment about this code, seems we can use staticObject directly instead of re-reflect it, is it ?
cc @kiszk

@SparkQA
Copy link

SparkQA commented Sep 27, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33762/

@SparkQA
Copy link

SparkQA commented Sep 27, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33762/

@SparkQA
Copy link

SparkQA commented Sep 27, 2020

Test build #129147 has finished for PR 29403 at commit 68c163a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • encodeDecodeTest(FooClassWithEnum(1, FooEnum.E1), \"case class with Int and scala Enum\")
  • encodeDecodeTest(FooEnum.E1, \"case class with Int and scala Enum\")

@SparkQA
Copy link

SparkQA commented Sep 28, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33773/

@SparkQA
Copy link

SparkQA commented Sep 28, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33773/

@SparkQA
Copy link

SparkQA commented Sep 28, 2020

Test build #129158 has finished for PR 29403 at commit ed81055.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ObjectType(getClassFromType(t)),
"withName",
createDeserializerForString(path, false) :: Nil,
returnNullable = false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should also be true?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if I'm correct, it would be good to include a test case for this case. What happens if you actually do operations on the null value stored as an enum? Is the nullability of the resulting schema correct. Do operations like .isNotNull work correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need returnNullable = false since Enumeration.withName will never return null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are correct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if you actually do operations on the null value stored as an enum

We will get an exception, and the behavior is same as Java enum what we supported.

        case other if other.isEnum =>
          createSerializerForString(
            Invoke(inputObject, "name", ObjectType(classOf[String]), returnNullable = false))

inputObject,
"toString",
ObjectType(classOf[java.lang.String]),
returnNullable = false))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as withName, toString also return a non-null value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. I agree.

@marmbrus
Copy link
Contributor

cc @tdas modulo the nullability concerns above, this LGTM.

StructField(fieldName, dataType, nullable)
}), nullable = true)
case t if isSubtype(t, localTypeOf[Enumeration#Value]) =>
Schema(StringType, nullable = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we are returning the schema generated from enum as nullable, but the serializer expression is not nullable (because it does not produce null). Why is that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two side of nullable.

  • For schema, nullable means the column can be null value.
  • For returnNullable which used in Invoke and StaticInvoke means the method promise will never produce a null value.

Some code get from Invoke

def invoke(
      obj: Any,
      method: Method,
      arguments: Seq[Expression],
      input: InternalRow,
      dataType: DataType): Any = {
    val args = arguments.map(e => e.eval(input).asInstanceOf[Object])
    if (needNullCheck && args.exists(_ == null)) {
      // return null if one of arguments is null
      null
    } else {
      val ret = method.invoke(obj, args: _*)
      val boxedClass = ScalaReflection.typeBoxedJavaMapping.get(dataType)
      if (boxedClass.isDefined) {
        boxedClass.get.cast(ret)
      } else {
        ret
      }
    }
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So effectively, in the serialized form, we are allowing nulls to be present in the column mapping to the enum field. But if there is indeed a row with a null in that column and we attempt to deserialize that rows, then it will cause a runtime failure...isnt it?

If this understanding is correct, then serializing will never produce null, and even if there is a null, serializing will fail. Then why keep this column nullable = true? I am not necessarily opposed to it, I am just trying to understand the rationale. cc @marmbrus

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if there is indeed a row with a null in that column and we attempt to deserialize that rows, then it will cause a runtime failure...isnt it

Actually we will get null back. It's a trick that if input value is null the serializing will return null.

@SparkQA
Copy link

SparkQA commented Sep 29, 2020

Test build #129216 has finished for PR 29403 at commit 2d675ca.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 29, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33831/

@SparkQA
Copy link

SparkQA commented Sep 29, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33831/

// the fullName of tpe is example.Foo.Foo, but we need example.Foo so that
// we can call example.Foo.withName to deserialize string to enumeration.
val parent = t.asInstanceOf[TypeRef].pre.typeSymbol.asClass
val cls = mirror.runtimeClass(parent)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

follow scala reflection.

@tdas
Copy link
Contributor

tdas commented Oct 1, 2020

LGTM. Merging it to master.

@asfgit asfgit closed this in e62d247 Oct 1, 2020
@ulysses-you
Copy link
Contributor Author

thanks for merging and review !

@ulysses-you ulysses-you deleted the SPARK-32585 branch March 10, 2021 06:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants