
Conversation

@michalsenkyr
Contributor

What changes were proposed in this pull request?

Continuing from #22527, this PR seeks to add support for beans in array, list and map fields when creating DataFrames from Java beans.

How was this patch tested?

Appropriate unit tests were amended.

Also manually tested in Spark shell:

scala> import scala.beans.BeanProperty
import scala.beans.BeanProperty

scala> class Nested(@BeanProperty var i: Int) extends Serializable
defined class Nested

scala> class Test(@BeanProperty var array: Array[Nested], @BeanProperty var list: java.util.List[Nested], @BeanProperty var map: java.util.Map[Integer, Nested]) extends Serializable
defined class Test

scala> import scala.collection.JavaConverters._
import scala.collection.JavaConverters._

scala> val array = Array(new Nested(1))
array: Array[Nested] = Array(Nested@757ad227)

scala> val list = Seq(new Nested(2), new Nested(3)).asJava
list: java.util.List[Nested] = [Nested@633dce39, Nested@4dd28982]

scala> val map = Map(Int.box(1) -> new Nested(4), Int.box(2) -> new Nested(5)).asJava
map: java.util.Map[Integer,Nested] = {1=Nested@57421e4e, 2=Nested@5a75bad4}

scala> val df = spark.createDataFrame(Seq(new Test(array, list, map)).asJava, classOf[Test])
df: org.apache.spark.sql.DataFrame = [array: array<struct<i:int>>, list: array<struct<i:int>> ... 1 more field]

scala> df.show()
+-----+----------+--------------------+
|array|      list|                 map|
+-----+----------+--------------------+
|[[1]]|[[2], [3]]|[1 -> [4], 2 -> [5]]|
+-----+----------+--------------------+
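
The nested values in such a DataFrame can then be queried with ordinary column expressions. As a quick illustration (not part of the original test; written against the df above):

import org.apache.spark.sql.functions.col

df.select(col("array")(0)("i"), col("map")(1)("i")).show()
// with the data above, array(0).i is 1 and map(1).i is 4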

Previous behavior:

scala> val df = spark.createDataFrame(Seq(new Test(array, list, map)).asJava, classOf[Test])
java.lang.IllegalArgumentException: The value (Nested@3dedc8b8) of the type (Nested) cannot be converted to struct<i:int>
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$1.apply(CatalystTypeConverters.scala:162)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:162)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:154)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
  at org.apache.spark.sql.SQLContext$$anonfun$createStructConverter$1$1$$anonfun$apply$1.apply(SQLContext.scala:1114)
  at org.apache.spark.sql.SQLContext$$anonfun$createStructConverter$1$1$$anonfun$apply$1.apply(SQLContext.scala:1113)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.SQLContext$$anonfun$createStructConverter$1$1.apply(SQLContext.scala:1113)
  at org.apache.spark.sql.SQLContext$$anonfun$createStructConverter$1$1.apply(SQLContext.scala:1108)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
  at scala.collection.Iterator$class.toStream(Iterator.scala:1320)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1334)
  at scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
  at scala.collection.AbstractIterator.toSeq(Iterator.scala:1334)
  at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:423)
  ... 51 elided

@michalsenkyr michalsenkyr changed the title Support for nested JavaBean arrays, lists and maps in createDataFrame [SPARK-25654] Support for nested JavaBean arrays, lists and maps in createDataFrame Oct 5, 2018
@michalsenkyr michalsenkyr changed the title [SPARK-25654] Support for nested JavaBean arrays, lists and maps in createDataFrame [SPARK-25654][SQL] Support for nested JavaBean arrays, lists and maps in createDataFrame Oct 5, 2018
beanClass: Class[_],
attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
import scala.collection.JavaConverters._
import java.lang.reflect.{Type, ParameterizedType, Array => JavaArray}
Member

Why add the imports here? Can we move them to the top?

Contributor Author

I didn't want to needlessly add those to the whole file as the reflection stuff is needed only in this method. Ditto with collection converters. But if you think it is better at the top, I'll move it.

Member

It seems rare to see imports placed like this in the Spark codebase.

def interfaceParameters(t: Type, interface: Class[_]): Array[Type] = t match {
  case parType: ParameterizedType if parType.getRawType == interface =>
    parType.getActualTypeArguments
  case _ => throw new UnsupportedOperationException(s"$t is not an $interface")
Member

@viirya viirya Oct 6, 2018

This exception message looks a bit confusing. We could say that the given type is not supported and that only certain types (java.util.List and java.util.Map) are supported.
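
A minimal sketch of the suggested rewording, based on the interfaceParameters helper quoted above (the exact message text is only illustrative):

def interfaceParameters(t: Type, interface: Class[_]): Array[Type] = t match {
  case parType: ParameterizedType if parType.getRawType == interface =>
    parType.getActualTypeArguments
  case _ => throw new UnsupportedOperationException(
    s"Java bean field type $t is not supported; only java.util.List and java.util.Map are supported here")
}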

value => new GenericArrayData(
  (0 until JavaArray.getLength(value)).map(i =>
    converter(JavaArray.get(value, i))).toArray)
case (_, array: ArrayType) =>
Member

@viirya viirya Oct 6, 2018

Can you add a few code comments explaining why there are two cases for ArrayType?

Contributor Author

Sorry, I should have added a check for cls.isArray in the array case. That would make it clearer. I will also add a comment to each case with the actual type expected for that conversion.
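
A rough sketch of what the clarified array case could look like, assuming the createConverter helper from this PR; the cls.isArray guard, the comment, and the explicit casts are added for illustration only:

// Java array field, e.g. Nested[]
case (cls: Class[_], arrayType: ArrayType) if cls.isArray =>
  val elementConverter = createConverter(cls.getComponentType, arrayType.elementType)
  (value: Any) => new GenericArrayData(
    (0 until JavaArray.getLength(value.asInstanceOf[AnyRef])).map(i =>
      elementConverter(JavaArray.get(value.asInstanceOf[AnyRef], i))).toArray)

The second ArrayType case would then be reserved for java.util.List (or java.lang.Iterable, as discussed below) fields, with its own comment.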

@viirya
Member

viirya commented Oct 6, 2018

The createDataFrame API for Java beans doesn't clearly document which JavaBeans are supported. Can you also update it to document this explicitly?

@HyukjinKwon
Member

ok to test

@SparkQA

SparkQA commented Oct 6, 2018

Test build #97034 has finished for PR 22646 at commit b477d07.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 7, 2018

Test build #97086 has finished for PR 22646 at commit 095c923.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

converter(JavaArray.get(value, i))).toArray)
case (_, array: ArrayType) =>
// java.util.List type
val cls = classOf[java.util.List[_]]
Member

@ueshin ueshin Oct 12, 2018

It seems like JavaTypeInference.inferDataType() supports java.lang.Iterable, not only List, but the serializer/deserializer don't. I'm not sure whether we should change inferDataType(). That issue would belong in a separate PR anyway, though.

Contributor Author

I think you are right. It would be better to change it to avoid confusion. I also agree that should be done in a separate PR.

Member

On second thought, we should use java.lang.Iterable here. We can convert an Iterable to ArrayType, as ArrayConverter already tries to do. If we use java.util.List here, it leads to behavior changes for lists of primitives.
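
A minimal sketch of what an Iterable-based case could look like, reusing the interfaceParameters and createConverter helpers from this PR and the JavaConverters import already present in this method (the derivation of elementConverter is an assumption; also note that fields declared as java.util.List would need more work than the exact-match helper shown above to be resolved against Iterable's type parameter):

case (iterableType, arrayType: ArrayType) =>
  // field declared as java.lang.Iterable[...]
  val Array(elementType) = interfaceParameters(iterableType, classOf[java.lang.Iterable[_]])
  val elementConverter = createConverter(elementType, arrayType.elementType)
  (value: Any) => new GenericArrayData(
    value.asInstanceOf[java.lang.Iterable[Any]].asScala.map(elementConverter).toArray)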

def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match {
case struct: StructType => createStructConverter(cls, struct.map(_.dataType))
case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match {
Member

BTW, how about we put this method in CatalystTypeConverters? It looks like a Catalyst converter for beans. A few Java types such as java.lang.Iterable, java.math.BigDecimal and java.math.BigInteger are already handled there.

Member

I'm okay with moving this to CatalystTypeConverters, but note that unfortunately CatalystTypeConverters doesn't seem to work properly with nested beans, which is exactly what we are trying to support here.

Member

Yeah, I was just thinking of moving this function there; it looks ugly that this file is getting so long.

Contributor Author

I took a quick look at CatalystTypeConverters and I believe there would be a problem with not being able to reliably distinguish Java beans from other arbitrary classes. We might end up calling setters or setting fields directly on objects that are not prepared for such manipulation, potentially creating hard-to-find errors. This method already assumes a Java bean, so that problem is not present here. Isn't that so?

case struct: StructType => createStructConverter(cls, struct.map(_.dataType))
case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match {
case (cls: Class[_], struct: StructType) =>
Member

Wait, can we reuse JavaTypeInference.serializerFor and make a projection, rather than reimplementing the whole logic here?
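
For context, a hedged sketch of that idea: ExpressionEncoder.javaBean (the encoder behind Encoders.bean) wraps JavaTypeInference.serializerFor, so in principle the rows could come from the encoder rather than a hand-written converter. This assumes the Spark 2.x API, a data: Iterator[Any] of bean instances, and the beanClass parameter of this method; whether the encoder covers everything this PR targets is exactly the question raised below:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

val encoder = ExpressionEncoder.javaBean(beanClass.asInstanceOf[Class[Any]])
// toRow reuses its buffer, so each row is copied before being handed downstream
val rows: Iterator[InternalRow] = data.map(bean => encoder.toRow(bean).copy())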

Member

// TODO: we should only collect properties that have getter and setter. However, some tests
// pass in scala case class as java bean class which doesn't have getter and setter.

We should drop support for properties that have only a getter or only a setter. Adding @cloud-fan here as well.
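
For reference, a minimal sketch of collecting only the properties backed by both a getter and a setter, via java.beans.Introspector (this is the restriction being proposed, not what the current code does):

import java.beans.Introspector

def readWriteProperties(beanClass: Class[_]) =
  Introspector.getBeanInfo(beanClass).getPropertyDescriptors
    .filter(p => p.getReadMethod != null && p.getWriteMethod != null)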

Member

Reusing JavaTypeInference.serializerFor would be great, but currently it behaves a little differently. At least it doesn't support java.lang.Iterable[_], so we can't use it immediately. We need to extend it to support Iterable (and also deserializerFor).

Member

@HyukjinKwon HyukjinKwon Oct 17, 2018

Hm, how about we fix them together while we are here?

I also checked another difference, which is beans without a getter and/or setter, but I think this is something we should fix in 3.0.

Contributor Author

Frankly, I was not really sure about serializing sets as arrays, since the result stops behaving like a set, but I found a PR (#18416) where this seems to have been permitted, so I will go ahead and add that.

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

github-actions bot commented Jan 6, 2020

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Jan 6, 2020
@github-actions github-actions bot closed this Jan 7, 2020