@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions.objects

import java.lang.reflect.Modifier

import scala.collection.JavaConverters._
import scala.collection.mutable.Builder
import scala.language.existentials
import scala.reflect.ClassTag
@@ -501,12 +502,22 @@ case class LambdaVariable(
value: String,
isNull: String,
dataType: DataType,
nullable: Boolean = true) extends LeafExpression
with Unevaluable with NonSQLExpression {
nullable: Boolean = true) extends LeafExpression with NonSQLExpression {

// Interpreted execution of `LambdaVariable` always gets the 0-index element from the input row.
override def eval(input: InternalRow): Any = {
assert(input.numFields == 1,
"The input row of interpreted LambdaVariable should have only 1 field.")
input.get(0, dataType)
Contributor:
Not a change for this PR. Maybe we should use accessors here? This uses pattern matching under the hood and is slower than virtual function dispatch. Implementing this would also be useful for BoundReference, for example.

Member Author (@viirya, Mar 9, 2018):
You mean something like this?

lazy val accessor:  InternalRow => Any = dataType match {
  case IntegerType => (inputRow) => inputRow.getInt(0)
  case LongType => (inputRow) => inputRow.getLong(0)
  ...
}

override def eval(input: InternalRow): Any = accessor(input)

Contributor:
Yeah I do.

Contributor:
Let's spin that off into a different ticket if we want to work on it.

Member Author:
Ok. After this is merged, I will create another PR for it.

}

override def genCode(ctx: CodegenContext): ExprCode = {
ExprCode(code = "", value = value, isNull = if (nullable) isNull else "false")
}

// This won't be called as `genCode` is overridden; it is implemented only to make
// `LambdaVariable` non-abstract.
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev
}
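
To make the accessor idea from the thread above concrete, here is a minimal sketch. It assumes Spark's usual InternalRow getters, keeps the generic get(0, dataType) call as the fallback, and is written as it would sit inside LambdaVariable; it is an illustration, not the final shape of a follow-up PR.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

// Build the accessor once per LambdaVariable instead of re-matching on
// dataType for every call to eval().
lazy val accessor: InternalRow => Any = dataType match {
  case IntegerType => row => if (row.isNullAt(0)) null else row.getInt(0)
  case LongType    => row => if (row.isNullAt(0)) null else row.getLong(0)
  case DoubleType  => row => if (row.isNullAt(0)) null else row.getDouble(0)
  // Everything else falls back to the generic, DataType-driven accessor.
  case _           => row => row.get(0, dataType)
}

override def eval(input: InternalRow): Any = accessor(input)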

/**
@@ -599,8 +610,79 @@ case class MapObjects private(

override def children: Seq[Expression] = lambdaFunction :: inputData :: Nil

override def eval(input: InternalRow): Any =
throw new UnsupportedOperationException("Only code-generated evaluation is supported")
// Data with a UserDefinedType is actually stored with the data type of its sqlType.
// When we want to apply MapObjects to it, we have to use that type.
lazy private val inputDataType = inputData.dataType match {
case u: UserDefinedType[_] => u.sqlType
case _ => inputData.dataType
}

private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
inputCollection.map { element =>
val row = InternalRow.fromSeq(Seq(element))
Contributor:
NIT: reuse the row object.

lambdaFunction.eval(row)
}
}

// Executes lambda function on input collection.
private lazy val executeFunc: Any => Seq[_] = inputDataType match {
Contributor:
I am wondering if we shouldn't just return an Iterator instead of a Seq? That seems a bit more flexible and allows us to avoid materializing an intermediate sequence. WDYT?

Member Author:
Good idea!

case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
case ObjectType(cls) if cls.isArray =>
x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
x => executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
case ObjectType(cls) if cls == classOf[Object] =>
Contributor:
Ugghh... I now understand why this is needed. RowEncoder does not pass the needed type information down: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala#L146

This obviously needs to be done during evaluation. You got it right in the previous commit. I am sorry for misunderstanding this and making you move it. Next time please call me out on this!

(inputCollection) => {
if (inputCollection.getClass.isArray) {
Contributor:
(I am sorry for sounding like a broken record.) But can we move this check out of the function closure?

Member Author:
Sorry...

executeFuncOnCollection(inputCollection.asInstanceOf[Array[_]].toSeq)
} else {
executeFuncOnCollection(inputCollection.asInstanceOf[Seq[_]])
}
}
case ArrayType(et, _) =>
x => executeFuncOnCollection(x.asInstanceOf[ArrayData].array)
Contributor:
This will blow up with UnsafeArrayData :(... It would be nice if we could avoid copying the entire array. We could implement an ArrayData wrapper that implements Seq or Iterable (I slightly prefer the latter).

Member Author:
Shall we implement this wrapper here, or in a follow-up?

}
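
One way the two suggestions above (reusing the row object and returning an Iterator rather than a materialized Seq) might be combined is sketched below. It is only an illustration written as it would sit inside MapObjects, assuming GenericInternalRow's mutable update; switching the return type to Iterator would also ripple through executeFunc and getResults.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

// Traverse lazily and reuse a single mutable row for every element.
private def executeFuncOnCollection(inputCollection: Seq[_]): Iterator[_] = {
  val row = new GenericInternalRow(1)
  inputCollection.toIterator.map { element =>
    row.update(0, element)
    lambdaFunction.eval(row)
  }
}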

// Converts the processed collection to custom collection class if any.
private lazy val getResults: Seq[_] => Any = customCollectionCls match {
Contributor:
Can you add a catch-all clause that throws a nice exception to this match statement?

case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
// Scala sequence
_.toSeq
Contributor:
This is identity, right?

Member Author:
Yap.

case Some(cls) if classOf[scala.collection.Set[_]].isAssignableFrom(cls) =>
// Scala set
_.toSet
case Some(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
// Java list
if (cls == classOf[java.util.List[_]] || cls == classOf[java.util.AbstractList[_]] ||
Contributor (@hvanhovell, Mar 9, 2018):
IIUC you are matching against non-concrete implementations of java.util.List? Maybe add this as documentation.

Member Author:
Added.

cls == classOf[java.util.AbstractSequentialList[_]]) {
_.asJava
} else {
(results) => {
val builder = Try(cls.getConstructor(Integer.TYPE)).map { constructor =>
Contributor:
Can you try to do the constructor lookup only once? The duplication that will cause is ok.

Member Author:
Not sure if I understand correctly. Please check the update again.

constructor.newInstance(results.length.asInstanceOf[Object])
}.getOrElse {
cls.getConstructor().newInstance()
}.asInstanceOf[java.util.List[Any]]

results.foreach(builder.add(_))
builder
}
}
case None =>
// array
x => new GenericArrayData(x.toArray)
}
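
On the "constructor lookup only once" comment above, one possible shape hoists the reflective lookup out of the per-row closure. This is only a sketch, written against the cls bound in the java.util.List case of this match, not the final wording of the PR:

import scala.util.Try

// Resolve the sizing constructor once; the returned closure only instantiates.
val sizedCtor = Try(cls.getConstructor(Integer.TYPE)).toOption
(results: Seq[_]) => {
  val builder = sizedCtor
    .map(_.newInstance(results.length.asInstanceOf[Object]))
    .getOrElse(cls.getConstructor().newInstance())
    .asInstanceOf[java.util.List[Any]]
  results.foreach(r => builder.add(r))
  builder
}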

override def eval(input: InternalRow): Any = {
val inputCollection = inputData.eval(input)

if (inputCollection == null) {
return null
}

getResults(executeFunc(inputCollection))
}
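
Returning to the UnsafeArrayData concern raised against the ArrayType case above: since ArrayData.array copies (and, as noted, can fail for UnsafeArrayData), a small read-only view over ArrayData could feed executeFuncOnCollection without materializing the whole array. A hypothetical sketch (the ArrayDataSeq name is made up), not part of this PR:

import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types.DataType

// Read-only Seq view over ArrayData; elements are fetched on demand via the
// DataType-driven getter instead of copying the whole array up front.
class ArrayDataSeq(data: ArrayData, elementType: DataType) extends IndexedSeq[Any] {
  override def length: Int = data.numElements()
  override def apply(i: Int): Any =
    if (data.isNullAt(i)) null else data.get(i, elementType)
}

The ArrayType case could then read:

case ArrayType(et, _) =>
  x => executeFuncOnCollection(new ArrayDataSeq(x.asInstanceOf[ArrayData], et))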

override def dataType: DataType =
customCollectionCls.map(ObjectType.apply).getOrElse(
@@ -647,13 +729,6 @@ case class MapObjects private(
case _ => ""
}

// The data with PythonUserDefinedType are actually stored with the data type of its sqlType.
// When we want to apply MapObjects on it, we have to use it.
val inputDataType = inputData.dataType match {
case p: PythonUserDefinedType => p.sqlType
case _ => inputData.dataType
}

// `MapObjects` generates a while loop to traverse the elements of the input collection. We
// need to take care of Seq and List because they may have O(n) complexity for indexed accessing
// like `list.get(1)`. Here we use Iterator to traverse Seq and List.
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.expressions

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -25,7 +26,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.objects._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
import org.apache.spark.sql.types._


@@ -126,6 +127,57 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}

test("SPARK-23587: MapObjects should support interpreted execution") {
val customCollectionClasses = Seq(classOf[Seq[Int]], classOf[scala.collection.Set[Int]],
classOf[java.util.List[Int]], classOf[java.util.AbstractList[Int]],
classOf[java.util.AbstractSequentialList[Int]], classOf[java.util.Vector[Int]],
classOf[java.util.Stack[Int]], null)
val function = (lambda: Expression) => Add(lambda, Literal(1))
val elementType = IntegerType
val expected = Seq(2, 3, 4)

val list = new java.util.ArrayList[Int]()
list.add(1)
list.add(2)
list.add(3)
val arrayData = new GenericArrayData(Array(1, 2, 3))
val vector = new java.util.Vector[Int]()
vector.add(1)
vector.add(2)
vector.add(3)
val stack = new java.util.Stack[Int]()
stack.add(1)
stack.add(2)
stack.add(3)

Seq(
(Seq(1, 2, 3), ObjectType(classOf[Seq[Int]])),
(list, ObjectType(classOf[java.util.List[Int]])),
(vector, ObjectType(classOf[java.util.Vector[Int]])),
(arrayData, ArrayType(IntegerType))
).foreach { case (collection, inputType) =>
val inputObject = BoundReference(0, inputType, nullable = true)

customCollectionClasses.foreach { customCollectionCls =>
val optClass = Option(customCollectionCls)
val mapObj = MapObjects(function, inputObject, elementType, true, optClass)
val row = InternalRow.fromSeq(Seq(collection))
val result = mapObj.eval(row)

customCollectionCls match {
case null =>
assert(result.asInstanceOf[ArrayData].array.toSeq == expected)
case l if classOf[java.util.List[_]].isAssignableFrom(l) =>
assert(result.asInstanceOf[java.util.List[_]].asScala.toSeq == expected)
case s if classOf[Seq[_]].isAssignableFrom(s) =>
assert(result.asInstanceOf[Seq[_]].toSeq == expected)
case s if classOf[scala.collection.Set[_]].isAssignableFrom(s) =>
assert(result.asInstanceOf[scala.collection.Set[_]] == expected.toSet)
}
}
}
}

test("SPARK-23592: DecodeUsingSerializer should support interpreted execution") {
val cls = classOf[java.lang.Integer]
val inputObject = BoundReference(0, ObjectType(classOf[Array[Byte]]), nullable = true)