Conversation

@michalsenkyr
Contributor

What changes were proposed in this pull request?

When constructing a DataFrame from a Java bean, using nested beans throws an error despite the [documentation](http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection) stating otherwise. This PR aims to add that support.

This PR does not yet add nested bean support in array or List fields. This can be added later or in another PR.

How was this patch tested?

Nested bean was added to the appropriate unit test.

Also manually tested in Spark shell on code emulating the referenced JIRA:

```
scala> import scala.beans.BeanProperty
import scala.beans.BeanProperty

scala> class SubCategory(@BeanProperty var id: String, @BeanProperty var name: String) extends Serializable
defined class SubCategory

scala> class Category(@BeanProperty var id: String, @BeanProperty var subCategory: SubCategory) extends Serializable
defined class Category

scala> import scala.collection.JavaConverters._
import scala.collection.JavaConverters._

scala> spark.createDataFrame(Seq(new Category("s-111", new SubCategory("sc-111", "Sub-1"))).asJava, classOf[Category])
java.lang.IllegalArgumentException: The value (SubCategory@65130cf2) of the type (SubCategory) cannot be converted to struct<id:string,name:string>
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1108)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1108)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1108)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1106)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
  at scala.collection.Iterator$class.toStream(Iterator.scala:1320)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1334)
  at scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
  at scala.collection.AbstractIterator.toSeq(Iterator.scala:1334)
  at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:423)
  ... 51 elided
```

New behavior:

```
scala> spark.createDataFrame(Seq(new Category("s-111", new SubCategory("sc-111", "Sub-1"))).asJava, classOf[Category])
res0: org.apache.spark.sql.DataFrame = [id: string, subCategory: struct<id: string, name: string>]

scala> res0.show()
+-----+---------------+
|   id|    subCategory|
+-----+---------------+
|s-111|[sc-111, Sub-1]|
+-----+---------------+
```

```
.toMap
new GenericInternalRow(structType.map(nestedProperty =>
  invoke(value)(nestedExtractors(nestedProperty.name) -> nestedProperty.dataType)
).toArray)
```
Member

Why should we use a map here while we don't need it for the root bean?

Contributor Author

Right, we don't have to. I just checked: JavaTypeInference.inferDataType also uses JavaTypeInference.getJavaBeanReadableProperties, so the order should be the same. I also double-checked manually in the Spark shell with a more complex nested bean to be sure.
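
For illustration, a standalone sketch (plain java.beans, not Spark's internal API, and the bean here is illustrative) of why the two call sites agree on ordering: Introspector returns property descriptors in a stable, name-sorted order, so any two enumerations of the same bean class line up positionally.

```
import java.beans.Introspector
import scala.beans.BeanProperty

class SubCategory(@BeanProperty var id: String, @BeanProperty var name: String)

// Introspector sorts descriptors by property name, so repeated calls
// (e.g. one in schema inference, one in row conversion) see the same order.
val props = Introspector.getBeanInfo(classOf[SubCategory]).getPropertyDescriptors
  .filter(_.getReadMethod != null)
println(props.map(_.getName).mkString(", "))  // class, id, name -- identical every call
```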

```
DataTypes.createStructType(Collections.singletonList(new StructField(
  "a", IntegerType$.MODULE$, false, Metadata.empty()))),
  true, Metadata.empty()),
schema.apply("f"));
```
Member

should be double spaced.

@felixcheung
Member

Jenkins, ok to test

@SparkQA

SparkQA commented Oct 1, 2018

Test build #96809 has finished for PR 22527 at commit d8083cf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
val methodsToTypes = extractors.zip(attrs).map { case (e, attr) =>
  (e, attr.dataType)
}
def invoke(element: Any)(tuple: (Method, DataType)): Any = tuple match {
```
Member

Can we create converters before data.map { ... } instead of calculating converters for each row?

I mean something like:

```
def converter(e: Method, dt: DataType): Any => Any = dt match {
  case StructType(fields) =>
    val nestedExtractors =
      JavaTypeInference.getJavaBeanReadableProperties(e.getReturnType).map(_.getReadMethod)
    val nestedConverters =
      nestedExtractors.zip(fields).map { case (extractor, field) =>
        converter(extractor, field.dataType)
      }

    element => {
      val value = e.invoke(element)
      new GenericInternalRow(nestedConverters.map(_(value)))
    }
  case _ =>
    val convert = CatalystTypeConverters.createToCatalystConverter(dt)
    element => convert(e.invoke(element))
}
```

and then

```
val converters = extractors.zip(attrs).map { case (e, attr) =>
  converter(e, attr.dataType)
}
data.map { element =>
  new GenericInternalRow(converters.map(_(element))): InternalRow
}
```

Contributor Author

Good point, thank you. Changed it in the latest commit.

@michalsenkyr
Contributor Author

I restructured the code in this commit to allow easier addition of array/list support in the future.

@SparkQA

SparkQA commented Oct 2, 2018

Test build #96869 has finished for PR 22527 at commit 3fe63c8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@ueshin ueshin left a comment


LGTM for the current changes except for some comments.
I guess we need to support array/list of beans as you suggested, and map of beans as well.
@michalsenkyr Do you want to address them? We can do that here or in separate PRs.

```
JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
  (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
def createStructConverter(cls: Class[_], fieldTypes: Iterator[DataType]): Any => InternalRow = {
```
Member

nit: Seq[DataType] instead of Iterator[DataType]?

Contributor Author

I used Iterators instead of Seqs in order to avoid creating intermediate collections. However, I agree it's more concise without that.
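
As an aside, a minimal generic illustration of that trade-off (plain Scala, not the PR's code): each Seq.map materializes a full intermediate collection, while Iterator stages are lazy and fuse, materializing nothing until a terminal call.

```
val xs = Seq(1, 2, 3)

// Strict: the first map builds a complete intermediate Seq before the second runs.
val strict = xs.map(_ + 1).map(_ * 2)                  // Seq(4, 6, 8)

// Lazy: elements flow through both stages one at a time;
// nothing is materialized until toArray.
val fused = xs.iterator.map(_ + 1).map(_ * 2).toArray  // Array(4, 6, 8)
```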

```
val method = property.getReadMethod
method -> createConverter(method.getReturnType, fieldType)
}.toArray
value => new GenericInternalRow(
```
Member

We should check whether the value is null or not. Also, could you add a test for that case?

Contributor Author

You're right. Added. Thanks.
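
For context, a hypothetical, simplified sketch of the shape of that check (methodConverters here is an illustrative stand-in for the getter/converter pairs built by createStructConverter):

```
import java.lang.reflect.Method
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

// Wrap per-field conversion in a null check so a null nested bean becomes
// a null struct instead of a NullPointerException on getter invocation.
def structConverter(methodConverters: Array[(Method, Any => Any)]): Any => Any =
  value =>
    if (value == null) {
      null
    } else {
      new GenericInternalRow(methodConverters.map { case (method, convert) =>
        convert(method.invoke(value))
      })
    }
```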

@michalsenkyr
Copy link
Contributor Author

@ueshin Yes. I am already working on array/list support and will add maps as well. It shouldn't require a rewrite now that the code is restructured, just new cases in the pattern match. So I think it's OK to do in another PR.
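
Purely as a hypothetical sketch of what such a new case could look like (not the follow-up PR's actual code; createConverter is the factory discussed above, and the element-class handling is simplified):

```
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.types.{ArrayType, DataType}

// Hypothetical extra branch: convert a java.util.List field by recursing
// on the element type and collecting the results into a Catalyst array.
def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match {
  case ArrayType(elementType, _) =>
    val elementConverter = createConverter(classOf[AnyRef], elementType)
    value => new GenericArrayData(
      value.asInstanceOf[java.util.List[AnyRef]].asScala.map(elementConverter).toArray)
  case _ =>
    CatalystTypeConverters.createToCatalystConverter(dataType)
}
```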

@SparkQA

SparkQA commented Oct 3, 2018

Test build #96901 has finished for PR 22527 at commit 3b2a431.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@ueshin ueshin left a comment


LGTM except for a nit.
@HyukjinKwon Could you take a look again, please?

```
else new GenericInternalRow(
  methodConverters.map { case (method, converter) =>
    converter(method.invoke(value))
  })
```
Member

nit: please use braces for multi-line if-else.

Contributor Author

Done

@SparkQA

SparkQA commented Oct 5, 2018

Test build #96954 has finished for PR 22527 at commit e9b5a98.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member

ueshin commented Oct 5, 2018

I'd merge this so as not to block the follow-up PRs that will add support for array/list and map of beans.

@ueshin
Member

ueshin commented Oct 5, 2018

Thanks! merging to master.

@ueshin
Member

ueshin commented Oct 5, 2018

Sorry, the merge script failed. Let me try again in a while.

@ueshin
Member

ueshin commented Oct 5, 2018

It seems the merge commit is in the Apache git repository (https://git-wip-us.apache.org/repos/asf?p=spark.git) but not on GitHub yet.

@michalsenkyr
Contributor Author

Thanks! I created a new PR with array, list and map support.

@HyukjinKwon
Member

Hey @ueshin, sorry I was late. I'm a bit busy these couple of weeks, so please don't be blocked by me. Thank you for asking me.

@HyukjinKwon
Member

Looks good to me too!

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019

Closes apache#22527 from michalsenkyr/SPARK-17952.

Authored-by: Michal Senkyr <[email protected]>
Signed-off-by: Takuya UESHIN <[email protected]>