From 9ad47895aee72b7b6614281b9f45bb952ac19cb0 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 21 Feb 2017 17:50:13 +0900
Subject: [PATCH 1/7] Improve error message for invalid JavaBean

---
 .../spark/sql/catalyst/JavaTypeInference.scala     |  6 +++++-
 .../org/apache/spark/sql/JavaDataFrameSuite.java   | 15 +++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 8b53d988cbc5..2a8e0093153d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -123,7 +123,11 @@ object JavaTypeInference {
         val beanInfo = Introspector.getBeanInfo(typeToken.getRawType)
         val properties = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
         val fields = properties.map { property =>
-          val returnType = typeToken.method(property.getReadMethod).getReturnType
+          val readMethod = Option(property.getReadMethod).getOrElse {
+            throw new UnsupportedOperationException(
+              s"Cannot read the property ${property.getName} because it does not have the getter")
+          }
+          val returnType = typeToken.method(readMethod).getReturnType
           val (dataType, nullable) = inferDataType(returnType)
           new StructField(property.getName, dataType, nullable)
         }
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index c3b94a44c2e9..fc4751e34a88 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -153,6 +153,14 @@ public List<String> getD() {
     public BigInteger getE() { return e; }
   }
 
+  public static class BeanWithoutGetter implements Serializable {
+    private String a;
+
+    public void setA(String a) {
+      this.a = a;
+    }
+  }
+
   void validateDataFrameWithBeans(Bean bean, Dataset<Row> df) {
     StructType schema = df.schema();
     Assert.assertEquals(new StructField("a", DoubleType$.MODULE$, false, Metadata.empty()),
@@ -397,4 +405,11 @@ public void testBloomFilter() {
       Assert.assertTrue(filter4.mightContain(i * 3));
     }
   }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testBeanWithoutGetter() {
+    BeanWithoutGetter bean = new BeanWithoutGetter();
+    List<BeanWithoutGetter> data = Arrays.asList(bean);
+    spark.createDataFrame(data, BeanWithoutGetter.class);
+  }
 }
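Note: the guard above exists because java.beans.Introspector pairs up getFoo/setFoo methods into PropertyDescriptors, and a setter-only property still yields a descriptor, just with a null read method; before this patch that null Method reached typeToken.method(...) and failed with an unhelpful low-level error. A minimal standalone sketch of the Introspector behavior (class names here are illustrative, not part of the patch):

import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;

public class IntrospectorDemo {
  // A setter-only "bean": Introspector still reports a property "a",
  // but its PropertyDescriptor has no read method.
  public static class SetterOnly {
    private String a;
    public void setA(String a) { this.a = a; }
  }

  public static void main(String[] args) throws IntrospectionException {
    for (PropertyDescriptor pd :
        Introspector.getBeanInfo(SetterOnly.class).getPropertyDescriptors()) {
      // Typically prints the implicit "class" property (which the Spark code
      // filters out) and then: a read=null write=...setA(java.lang.String)
      System.out.println(pd.getName()
          + " read=" + pd.getReadMethod()
          + " write=" + pd.getWriteMethod());
    }
  }
}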
From ae4c9aa16b94d4997a69daabc6572a54db124241 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 21 Feb 2017 20:09:51 +0900
Subject: [PATCH 2/7] Address comments

---
 .../org/apache/spark/sql/catalyst/JavaTypeInference.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 2a8e0093153d..343505f9c150 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -117,15 +117,16 @@ object JavaTypeInference {
         val (valueDataType, nullable) = inferDataType(valueType)
         (MapType(keyDataType, valueDataType, nullable), true)
 
-      case _ =>
+      case c =>
         // TODO: we should only collect properties that have getter and setter. However, some tests
         // pass in scala case class as java bean class which doesn't have getter and setter.
         val beanInfo = Introspector.getBeanInfo(typeToken.getRawType)
         val properties = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
         val fields = properties.map { property =>
           val readMethod = Option(property.getReadMethod).getOrElse {
             throw new UnsupportedOperationException(
-              s"Cannot read the property ${property.getName} because it does not have the getter")
+              s"Cannot infer type for class ${c.getName} " +
+                s"because property ${property.getName} does not have the getter")
           }
           val returnType = typeToken.method(readMethod).getReturnType
           val (dataType, nullable) = inferDataType(returnType)
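Note: with the class name folded in, the failure at this point in the series surfaces roughly as below. The bean and session setup are hypothetical, the printed text is only the expected shape of the message the patch builds, and patch 3 later changes this path so the call stops throwing altogether.

import java.util.Arrays;
import org.apache.spark.sql.SparkSession;

public class GetterErrorDemo {
  // Hypothetical setter-only bean, named for this example only.
  public static class SetterOnlyBean implements java.io.Serializable {
    private String a;
    public void setA(String a) { this.a = a; }
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]").appName("getter-error-demo").getOrCreate();
    try {
      spark.createDataFrame(Arrays.asList(new SetterOnlyBean()), SetterOnlyBean.class);
    } catch (UnsupportedOperationException e) {
      // Expected shape per the patch, e.g.:
      //   Cannot infer type for class GetterErrorDemo$SetterOnlyBean
      //   because property a does not have the getter
      System.out.println(e.getMessage());
    } finally {
      spark.stop();
    }
  }
}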
From 91cee264e936421e33ecfb0cd5d3c3b4a474d4f2 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 22 Feb 2017 12:49:58 +0900
Subject: [PATCH 3/7] Allow setter only bean in java schema inference and
 empty bean in encoder creation

---
 .../sql/catalyst/JavaTypeInference.scala      | 46 ++++++++-----------
 .../org/apache/spark/sql/SQLContext.scala     |  6 +--
 .../org/apache/spark/sql/SparkSession.scala   |  7 +--
 .../apache/spark/sql/JavaDataFrameSuite.java  |  6 ++-
 .../apache/spark/sql/JavaDatasetSuite.java    | 11 +++++
 5 files changed, 39 insertions(+), 37 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 343505f9c150..b1e73c7fb602 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -117,18 +117,10 @@ object JavaTypeInference {
         val (valueDataType, nullable) = inferDataType(valueType)
         (MapType(keyDataType, valueDataType, nullable), true)
 
-      case c =>
-        // TODO: we should only collect properties that have getter and setter. However, some tests
-        // pass in scala case class as java bean class which doesn't have getter and setter.
-        val beanInfo = Introspector.getBeanInfo(typeToken.getRawType)
-        val properties = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
+      case _ =>
+        val properties = getJavaBeanPropertiesWithGetters(typeToken.getRawType)
         val fields = properties.map { property =>
-          val readMethod = Option(property.getReadMethod).getOrElse {
-            throw new UnsupportedOperationException(
-              s"Cannot infer type for class ${c.getName} " +
-                s"because property ${property.getName} does not have the getter")
-          }
-          val returnType = typeToken.method(readMethod).getReturnType
+          val returnType = typeToken.method(property.getReadMethod).getReturnType
           val (dataType, nullable) = inferDataType(returnType)
           new StructField(property.getName, dataType, nullable)
         }
@@ -136,10 +128,17 @@ object JavaTypeInference {
     }
   }
 
+  def getJavaBeanPropertiesWithGetters(beanClass: Class[_]): Array[PropertyDescriptor] = {
+    val beanInfo = Introspector.getBeanInfo(beanClass)
+    beanInfo.getPropertyDescriptors
+      .filterNot(_.getName == "class").filter(p => p.getReadMethod != null)
+  }
+
   private def getJavaBeanProperties(beanClass: Class[_]): Array[PropertyDescriptor] = {
     val beanInfo = Introspector.getBeanInfo(beanClass)
     beanInfo.getPropertyDescriptors
+      .filterNot(_.getName == "class")
       .filter(p => p.getReadMethod != null && p.getWriteMethod != null)
   }
 
   private def elementType(typeToken: TypeToken[_]): TypeToken[_] = {
@@ -304,8 +303,6 @@ object JavaTypeInference {
 
       case other =>
         val properties = getJavaBeanProperties(other)
-        assert(properties.length > 0)
-
         val setters = properties.map { p =>
           val fieldName = p.getName
           val fieldType = typeToken.method(p.getReadMethod).getReturnType
@@ -423,20 +420,15 @@ object JavaTypeInference {
 
       case other =>
         val properties = getJavaBeanProperties(other)
-        if (properties.length > 0) {
-          CreateNamedStruct(properties.flatMap { p =>
-            val fieldName = p.getName
-            val fieldType = typeToken.method(p.getReadMethod).getReturnType
-            val fieldValue = Invoke(
-              inputObject,
-              p.getReadMethod.getName,
-              inferExternalType(fieldType.getRawType))
-            expressions.Literal(fieldName) :: serializerFor(fieldValue, fieldType) :: Nil
-          })
-        } else {
-          throw new UnsupportedOperationException(
-            s"Cannot infer type for class ${other.getName} because it is not bean-compliant")
-        }
+        CreateNamedStruct(properties.flatMap { p =>
+          val fieldName = p.getName
+          val fieldType = typeToken.method(p.getReadMethod).getReturnType
+          val fieldValue = Invoke(
+            inputObject,
+            p.getReadMethod.getName,
+            inferExternalType(fieldType.getRawType))
+          expressions.Literal(fieldName) :: serializerFor(fieldValue, fieldType) :: Nil
+        })
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index dbe55090ea11..e667018b6661 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1090,14 +1090,14 @@ object SQLContext {
    */
   private[sql] def beansToRows(
       data: Iterator[_],
-      beanInfo: BeanInfo,
+      beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     val extractors =
-      beanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)
+      JavaTypeInference.getJavaBeanPropertiesWithGetters(beanClass).map(_.getReadMethod)
     val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
       (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
     }
-    data.map{ element =>
+    data.map { element =>
       new GenericInternalRow(
         methodsToConverts.map { case (e, convert) => convert(e.invoke(element)) }
       ): InternalRow
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 72af55c1fa14..afc1827e7eec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql
 
-import java.beans.Introspector
 import java.io.Closeable
 import java.util.concurrent.atomic.AtomicReference
 
@@ -347,8 +346,7 @@ class SparkSession private(
     val className = beanClass.getName
     val rowRdd = rdd.mapPartitions { iter =>
       // BeanInfo is not serializable so we must rediscover it remotely for each partition.
-      val localBeanInfo = Introspector.getBeanInfo(Utils.classForName(className))
-      SQLContext.beansToRows(iter, localBeanInfo, attributeSeq)
+      SQLContext.beansToRows(iter, Utils.classForName(className), attributeSeq)
     }
     Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd)(self))
   }
@@ -374,8 +372,7 @@ class SparkSession private(
    */
   def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = {
     val attrSeq = getSchema(beanClass)
-    val beanInfo = Introspector.getBeanInfo(beanClass)
-    val rows = SQLContext.beansToRows(data.asScala.iterator, beanInfo, attrSeq)
+    val rows = SQLContext.beansToRows(data.asScala.iterator, beanClass, attrSeq)
     Dataset.ofRows(self, LocalRelation(attrSeq, rows.toSeq))
   }
 
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index fc4751e34a88..01edb1575719 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -406,10 +406,12 @@ public void testBloomFilter() {
     }
   }
 
-  @Test(expected = UnsupportedOperationException.class)
+  @Test
   public void testBeanWithoutGetter() {
     BeanWithoutGetter bean = new BeanWithoutGetter();
     List<BeanWithoutGetter> data = Arrays.asList(bean);
-    spark.createDataFrame(data, BeanWithoutGetter.class);
+    Dataset<Row> df = spark.createDataFrame(data, BeanWithoutGetter.class);
+    Assert.assertEquals(df.schema().length(), 0);
+    Assert.assertEquals(df.count(), 1);
   }
 }
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 577672ca8e08..f7bc79357441 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -1276,4 +1276,15 @@ public void test() {
       spark.createDataset(data, Encoders.bean(NestedComplicatedJavaBean.class));
     ds.collectAsList();
   }
+
+  public static class EmptyBean implements Serializable {}
+
+  @Test
+  public void testEmptyBean() {
+    EmptyBean bean = new EmptyBean();
+    List<EmptyBean> data = Arrays.asList(bean);
+    Dataset<EmptyBean> df = spark.createDataset(data, Encoders.bean(EmptyBean.class));
+    Assert.assertEquals(df.schema().length(), 0);
+    Assert.assertEquals(df.count(), 1);
+  }
 }
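Note: two details of this patch are easy to miss. Schema inference (inferDataType) now keys off getters alone, while the encoder paths (serializerFor/deserializerFor) still demand getter-and-setter pairs, so a setter-only bean now yields an empty schema instead of an error. And the SparkSession change exists because java.beans.BeanInfo is not Serializable: the partition closure captures only the class name and re-runs introspection on each executor. A sketch of that second idea in plain Java; all names here are hypothetical, the real logic being SQLContext.beansToRows:

import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Capture only the (serializable) class name, rediscover the read methods on
// the remote side, then invoke them per bean to build untyped rows.
public class BeansToValuesSketch implements Serializable {
  private final String beanClassName;  // serializable stand-in for BeanInfo

  public BeansToValuesSketch(String beanClassName) {
    this.beanClassName = beanClassName;
  }

  public List<Object[]> apply(Iterator<?> beans) throws Exception {
    Class<?> beanClass = Class.forName(beanClassName);
    // Re-run introspection locally; keep readable, non-"class" properties,
    // mirroring getJavaBeanPropertiesWithGetters from this patch.
    List<Method> getters = new ArrayList<>();
    for (PropertyDescriptor pd :
        Introspector.getBeanInfo(beanClass).getPropertyDescriptors()) {
      if (!pd.getName().equals("class") && pd.getReadMethod() != null) {
        getters.add(pd.getReadMethod());
      }
    }
    List<Object[]> rows = new ArrayList<>();
    while (beans.hasNext()) {
      Object bean = beans.next();
      Object[] row = new Object[getters.size()];
      for (int i = 0; i < getters.size(); i++) {
        row[i] = getters.get(i).invoke(bean);  // call the getter reflectively
      }
      rows.add(row);
    }
    return rows;
  }
}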
From 5808d71c5284ce9fcaaaffb7ada6d91e34e0b29e Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 22 Feb 2017 12:51:57 +0900
Subject: [PATCH 4/7] Cleaner and comments

---
 .../org/apache/spark/sql/catalyst/JavaTypeInference.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index b1e73c7fb602..0145df271ea9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -117,8 +117,10 @@ object JavaTypeInference {
         val (valueDataType, nullable) = inferDataType(valueType)
         (MapType(keyDataType, valueDataType, nullable), true)
 
-      case _ =>
-        val properties = getJavaBeanPropertiesWithGetters(typeToken.getRawType)
+      case other =>
+        // TODO: we should only collect properties that have getter and setter. However, some tests
+        // pass in scala case class as java bean class which doesn't have getter and setter.
+        val properties = getJavaBeanPropertiesWithGetters(other)
         val fields = properties.map { property =>
           val returnType = typeToken.method(property.getReadMethod).getReturnType
           val (dataType, nullable) = inferDataType(returnType)
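Note: the restored TODO marks a deliberate asymmetry. createDataFrame(..., Class) needs only readable properties, while Encoders.bean keeps a property only if it can also be written back during deserialization. A small demonstration of the asymmetry as this series leaves it; the bean is invented for illustration, and the empty encoder schema follows from the readable-and-writable filter rather than from any documented guarantee:

import java.io.Serializable;
import java.util.Arrays;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class ReadableVsWritableDemo {
  // Getter-only property: readable but not writable. Name is illustrative.
  public static class GetterOnlyBean implements Serializable {
    public String getA() { return "a"; }
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]").appName("readable-vs-writable").getOrCreate();
    // Schema inference collects readable properties, so field "a" appears here...
    spark.createDataFrame(Arrays.asList(new GetterOnlyBean()), GetterOnlyBean.class)
        .printSchema();
    // ...but the bean encoder keeps only readable-and-writable properties, so
    // (as this series leaves it) the encoder schema should come out empty.
    System.out.println(Encoders.bean(GetterOnlyBean.class).schema());
    spark.stop();
  }
}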
From ed686fae82fcd8984817615955ff5b2caf24ea08 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 22 Feb 2017 13:05:48 +0900
Subject: [PATCH 5/7] Cleaner tests

---
 .../apache/spark/sql/JavaDataFrameSuite.java   | 18 +++++++++---------
 .../org/apache/spark/sql/JavaDatasetSuite.java |  2 +-
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 01edb1575719..a8f814bfae53 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -153,14 +153,6 @@ public List<String> getD() {
     public BigInteger getE() { return e; }
   }
 
-  public static class BeanWithoutGetter implements Serializable {
-    private String a;
-
-    public void setA(String a) {
-      this.a = a;
-    }
-  }
-
   void validateDataFrameWithBeans(Bean bean, Dataset<Row> df) {
     StructType schema = df.schema();
     Assert.assertEquals(new StructField("a", DoubleType$.MODULE$, false, Metadata.empty()),
@@ -406,12 +398,20 @@ public void testBloomFilter() {
     }
   }
 
+  public static class BeanWithoutGetter implements Serializable {
+    private String a;
+
+    public void setA(String a) {
+      this.a = a;
+    }
+  }
+
   @Test
   public void testBeanWithoutGetter() {
     BeanWithoutGetter bean = new BeanWithoutGetter();
     List<BeanWithoutGetter> data = Arrays.asList(bean);
     Dataset<Row> df = spark.createDataFrame(data, BeanWithoutGetter.class);
     Assert.assertEquals(df.schema().length(), 0);
-    Assert.assertEquals(df.count(), 1);
+    Assert.assertEquals(df.collectAsList().size(), 1);
   }
 }
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index f7bc79357441..4581c6ebe9ef 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -1285,6 +1285,6 @@ public void testEmptyBean() {
     List<EmptyBean> data = Arrays.asList(bean);
     Dataset<EmptyBean> df = spark.createDataset(data, Encoders.bean(EmptyBean.class));
     Assert.assertEquals(df.schema().length(), 0);
-    Assert.assertEquals(df.count(), 1);
+    Assert.assertEquals(df.collectAsList().size(), 1);
   }
 }
From 36048555a620b77e896ba57b6d5b5e3d655cc793 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 23 Feb 2017 01:15:57 +0900
Subject: [PATCH 6/7] Address comments

---
 .../sql/catalyst/JavaTypeInference.scala      | 20 +++++++++-----------
 .../org/apache/spark/sql/SQLContext.scala     |  2 +-
 2 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 0145df271ea9..80706f6871ce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -120,7 +120,7 @@ object JavaTypeInference {
       case other =>
         // TODO: we should only collect properties that have getter and setter. However, some tests
         // pass in scala case class as java bean class which doesn't have getter and setter.
-        val properties = getJavaBeanPropertiesWithGetters(other)
+        val properties = getJavaBeanReadableProperties(other)
         val fields = properties.map { property =>
           val returnType = typeToken.method(property.getReadMethod).getReturnType
           val (dataType, nullable) = inferDataType(returnType)
@@ -130,17 +130,15 @@ object JavaTypeInference {
     }
   }
 
-  def getJavaBeanPropertiesWithGetters(beanClass: Class[_]): Array[PropertyDescriptor] = {
+  def getJavaBeanReadableProperties(beanClass: Class[_]): Array[PropertyDescriptor] = {
     val beanInfo = Introspector.getBeanInfo(beanClass)
-    beanInfo.getPropertyDescriptors
-      .filterNot(_.getName == "class").filter(p => p.getReadMethod != null)
+    beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
+      .filter(_.getReadMethod != null)
   }
 
-  private def getJavaBeanProperties(beanClass: Class[_]): Array[PropertyDescriptor] = {
-    val beanInfo = Introspector.getBeanInfo(beanClass)
-    beanInfo.getPropertyDescriptors
-      .filterNot(_.getName == "class")
-      .filter(p => p.getReadMethod != null && p.getWriteMethod != null)
+  private def getJavaBeanReadableWritableProperties(
+      beanClass: Class[_]): Array[PropertyDescriptor] = {
+    getJavaBeanReadableProperties(beanClass).filter(_.getWriteMethod != null)
   }
 
   private def elementType(typeToken: TypeToken[_]): TypeToken[_] = {
@@ -302,7 +302,7 @@ object JavaTypeInference {
           keyData :: valueData :: Nil)
 
       case other =>
-        val properties = getJavaBeanProperties(other)
+        val properties = getJavaBeanReadableWritableProperties(other)
         val setters = properties.map { p =>
           val fieldName = p.getName
           val fieldType = typeToken.method(p.getReadMethod).getReturnType
@@ -419,7 +419,7 @@ object JavaTypeInference {
       )
 
       case other =>
-        val properties = getJavaBeanProperties(other)
+        val properties = getJavaBeanReadableWritableProperties(other)
         CreateNamedStruct(properties.flatMap { p =>
           val fieldName = p.getName
           val fieldType = typeToken.method(p.getReadMethod).getReturnType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e667018b6661..234ef2dffc6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1093,7 +1093,7 @@ object SQLContext {
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     val extractors =
-      JavaTypeInference.getJavaBeanPropertiesWithGetters(beanClass).map(_.getReadMethod)
+      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
     val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
       (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
     }
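Note: besides the rename, this refactor layers the two helpers, so the readable-and-writable set is derived from the readable set and the implicit "class" pseudo-property (every object inherits getClass) is excluded in exactly one place. The same shape in plain Java, as a non-authoritative sketch mirroring the two Scala helpers:

import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Arrays;

public final class BeanProperties {
  public static PropertyDescriptor[] readableProperties(Class<?> beanClass)
      throws IntrospectionException {
    return Arrays.stream(Introspector.getBeanInfo(beanClass).getPropertyDescriptors())
        .filter(p -> !p.getName().equals("class"))  // drop Object#getClass
        .filter(p -> p.getReadMethod() != null)     // keep getters
        .toArray(PropertyDescriptor[]::new);
  }

  public static PropertyDescriptor[] readableWritableProperties(Class<?> beanClass)
      throws IntrospectionException {
    // Built on top of the readable set, exactly like the Scala version.
    return Arrays.stream(readableProperties(beanClass))
        .filter(p -> p.getWriteMethod() != null)    // additionally require setters
        .toArray(PropertyDescriptor[]::new);
  }
}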
From ac5cc7d0e4606beec9ce3c811107e17f0c9f2259 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 23 Feb 2017 01:23:33 +0900
Subject: [PATCH 7/7] Cleaner

---
 .../org/apache/spark/sql/catalyst/JavaTypeInference.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 80706f6871ce..e9d9508e5adf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -136,7 +136,7 @@ object JavaTypeInference {
       .filter(_.getReadMethod != null)
   }
 
-  private def getJavaBeanReadableWritableProperties(
+  private def getJavaBeanReadableAndWritableProperties(
       beanClass: Class[_]): Array[PropertyDescriptor] = {
     getJavaBeanReadableProperties(beanClass).filter(_.getWriteMethod != null)
   }
@@ -302,7 +302,7 @@ object JavaTypeInference {
           keyData :: valueData :: Nil)
 
       case other =>
-        val properties = getJavaBeanReadableWritableProperties(other)
+        val properties = getJavaBeanReadableAndWritableProperties(other)
         val setters = properties.map { p =>
           val fieldName = p.getName
           val fieldType = typeToken.method(p.getReadMethod).getReturnType
@@ -419,7 +419,7 @@ object JavaTypeInference {
       )
 
       case other =>
-        val properties = getJavaBeanReadableWritableProperties(other)
+        val properties = getJavaBeanReadableAndWritableProperties(other)
         CreateNamedStruct(properties.flatMap { p =>
           val fieldName = p.getName
           val fieldType = typeToken.method(p.getReadMethod).getReturnType