20 changes: 16 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql

import java.lang.reflect.Method
import java.util.Properties

import scala.collection.immutable
@@ -1100,13 +1101,24 @@ object SQLContext {
attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
val extractors =
JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
(e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
val methodsToTypes = extractors.zip(attrs).map { case (e, attr) =>
(e, attr.dataType)
}
def invoke(element: Any)(tuple: (Method, DataType)): Any = tuple match {
Member

Can we create converters before data.map { ... } instead of calculating converters for each row?

I mean something like:

def converter(e: Method, dt: DataType): Any => Any = dt match {
  case StructType(fields) =>
    val nestedExtractors =
      JavaTypeInference.getJavaBeanReadableProperties(e.getReturnType).map(_.getReadMethod)
    val nestedConverters =
      nestedExtractors.zip(fields).map { case (extractor, field) =>
        converter(extractor, field.dataType)
      }

    element =>
      val value = e.invoke(element)
      new GenericInternalRow(nestedConverters.map(_(value)))
  case _ =>
    val convert = CatalystTypeConverters.createToCatalystConverter(dt)
    element => convert(e.invoke(element))
}

and then

val converters = extractors.zip(attrs).map { case (e, attr) =>
  converter(e, attr.dataType)
}
data.map { element =>
  new GenericInternalRow(converters.map(_(element))): InternalRow
}

Contributor Author

Good point, thank you. Changed it in the latest commit.

case (e, structType: StructType) =>
val value = e.invoke(element)
val nestedExtractors = JavaTypeInference.getJavaBeanReadableProperties(value.getClass)
.map(desc => desc.getName -> desc.getReadMethod)
.toMap
new GenericInternalRow(structType.map(nestedProperty =>
invoke(value)(nestedExtractors(nestedProperty.name) -> nestedProperty.dataType)
).toArray)
Member

Why do we need a map here when we don't need one for the root bean?

Contributor Author

Right, we don't have to. I just checked, and JavaTypeInference.inferDataType also uses JavaTypeInference.getJavaBeanReadableProperties, so the order should be the same. I also double-checked manually in the Spark shell with a more complex nested bean to be sure.
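
For reference, a manual check along these lines could be run in spark-shell with this change applied. The Inner/Outer bean names below are illustrative only (not part of the PR); the sketch assumes nothing beyond createDataFrame(java.util.List, Class) and JavaBean-style getters:

// Hypothetical spark-shell check (not from the PR): a bean with a nested bean property.
// Both schema inference and row conversion go through getJavaBeanReadableProperties,
// so the printed rows should line up with the inferred schema.
class Inner extends Serializable {
  private val x: Int = 1
  private val y: String = "nested"
  def getX: Int = x
  def getY: String = y
}

class Outer extends Serializable {
  private val a: Int = 42
  private val inner: Inner = new Inner
  def getA: Int = a
  def getInner: Inner = inner
}

val df = spark.createDataFrame(java.util.Arrays.asList(new Outer), classOf[Outer])
df.printSchema()  // the nested bean should appear as a struct column
df.show()         // row values should line up with the schema's field order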

case (e, dataType) =>
CatalystTypeConverters.createToCatalystConverter(dataType)(e.invoke(element))
}
data.map { element =>
new GenericInternalRow(
methodsToConverts.map { case (e, convert) => convert(e.invoke(element)) }
): InternalRow
methodsToTypes.map(invoke(element))): InternalRow
}
}

@@ -134,6 +134,7 @@ public static class Bean implements Serializable {
private Map<String, int[]> c = ImmutableMap.of("hello", new int[] { 1, 2 });
private List<String> d = Arrays.asList("floppy", "disk");
private BigInteger e = new BigInteger("1234567");
private NestedBean f = new NestedBean();

public double getA() {
return a;
@@ -152,6 +153,18 @@ public List<String> getD() {
}

public BigInteger getE() { return e; }

public NestedBean getF() {
return f;
}

public static class NestedBean implements Serializable {
private int a = 1;

public int getA() {
return a;
}
}
}

void validateDataFrameWithBeans(Bean bean, Dataset<Row> df) {
@@ -171,7 +184,12 @@ void validateDataFrameWithBeans(Bean bean, Dataset<Row> df) {
schema.apply("d"));
Assert.assertEquals(new StructField("e", DataTypes.createDecimalType(38,0), true,
Metadata.empty()), schema.apply("e"));
Row first = df.select("a", "b", "c", "d", "e").first();
Assert.assertEquals(new StructField("f",
DataTypes.createStructType(Collections.singletonList(new StructField(
"a", IntegerType$.MODULE$, false, Metadata.empty()))),
true, Metadata.empty()),
schema.apply("f"));
Member

should be double spaced.

Row first = df.select("a", "b", "c", "d", "e", "f").first();
Assert.assertEquals(bean.getA(), first.getDouble(0), 0.0);
// Now Java lists and maps are converted to Scala Seq's and Map's. Once we get a Seq below,
// verify that it has the expected length, and contains expected elements.
@@ -192,6 +210,8 @@ void validateDataFrameWithBeans(Bean bean, Dataset<Row> df) {
}
// Java.math.BigInteger is equivalent to Spark Decimal(38,0)
Assert.assertEquals(new BigDecimal(bean.getE()), first.getDecimal(4));
Row nested = first.getStruct(5);
Assert.assertEquals(bean.getF().getA(), nested.getInt(0));
}

@Test