@@ -17,14 +17,25 @@

package org.apache.spark.sql.catalyst.expressions

import java.lang.{Boolean => JavaBoolean}
Contributor:
I personally think it is more clear to just say java.lang.Boolean in the code, but this is fine.

Member:
We have conventionally used JLong etc. rather than JavaLong when doing this kind of rename.
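For reference, a minimal, hypothetical sketch of the shorter alias convention mentioned above (illustrative only, not part of this diff):

import java.lang.{Boolean => JBoolean, Long => JLong}

// the corresponding match arms would then read, e.g.
case JLong.TYPE => LongType
case _ if clz == classOf[JBoolean] => BooleanType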

Member Author:
I just named these to match other classes such as https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L20. Nitpicking, though: should we fix those names too (e.g., JavaDecimal => JDecimal)?

import java.lang.{Byte => JavaByte}
import java.lang.{Double => JavaDouble}
import java.lang.{Float => JavaFloat}
import java.lang.{Integer => JavaInteger}
import java.lang.{Long => JavaLong}
import java.lang.{Short => JavaShort}
import java.math.{BigDecimal => JavaBigDecimal}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.util
import java.util.Objects
import javax.xml.bind.DatatypeConverter

import scala.math.{BigDecimal, BigInt}

import org.json4s.JsonAST._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -46,19 +57,63 @@ object Literal {
case s: String => Literal(UTF8String.fromString(s), StringType)
case b: Boolean => Literal(b, BooleanType)
case d: BigDecimal => Literal(Decimal(d), DecimalType(Math.max(d.precision, d.scale), d.scale))
case d: java.math.BigDecimal =>
case d: JavaBigDecimal =>
Literal(Decimal(d), DecimalType(Math.max(d.precision, d.scale), d.scale()))
case d: Decimal => Literal(d, DecimalType(Math.max(d.precision, d.scale), d.scale))
case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
case a: Array[Byte] => Literal(a, BinaryType)
case a: Array[_] =>
Member:
Can we use schemaFor for this? Looks like we can remove the type inference code added in this PR.

Member Author:
Yeah, I tried, but I couldn't use it here because Scala array types are mapped to Java ones at runtime, e.g., Array[Int] => int[].

Member:
I may not understand your point. Where will it be mapped to a Java type? You use Array[_] here, right?

Member Author:
Sorry for my bad explanation. For example, I added check code like this:

case a: Array[_] =>
   println("Array[] class in runtime:" + a.getClass.getSimpleName())
   ...

And then ran this:

scala> Literal(scala.Array(1, 2, 3))
  Array[] class in runtime:int[]

IIUC, int[] is a Java native array type and there is no type signature here. Since schemaFor needs type signatures, I'm afraid we cannot use that reflection function here. There might be other workarounds, but I couldn't find them. Any better idea?

Member:
This seems to work:

// imports needed if run outside an already-imported session
import scala.reflect.runtime.universe.runtimeMirror
import org.apache.spark.sql.catalyst.ScalaReflection

val t = scala.Array(1, 2, 3)
val classSymbol = runtimeMirror(getClass.getClassLoader).classSymbol(t.getClass)
val tpe = classSymbol.selfType
ScalaReflection.schemaFor(tpe).dataType

Member Author:
If it holds a TypeTag, it seems we can write this:

  def create[T : TypeTag](v: T): Literal = {
    val ScalaReflection.Schema(dataType, _) = ScalaReflection.schemaFor[T]
    val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
    Literal(convert(v), dataType)
  }

This certainly works.
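For illustration, a hypothetical usage of the TypeTag-based create factory sketched above (assuming it resolves the element type via ScalaReflection.schemaFor):

// the element type is taken from the compile-time type Array[Int],
// not from the runtime class of the array
val lit = Literal.create(Array(1, 2, 3))
// expected to carry an ArrayType(IntegerType, ...) dataType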

Member:
@maropu Yeah, we will get a generic type back, and schemaFor needs to know what the element type is.

Member (@jodersky, Sep 30, 2016):
@maropu, I tested my solution in a shell and it worked. To fix the compilation error, simply make the overloaded method ScalaReflection.dataTypeFor public. I don't see any reason for it being private in the first place, apart from it just being a default.
Basically, just remove the "private" from the following in ScalaReflection.scala:

private def dataTypeFor(tpe: Type): DataType = // ...

Member:
@jodersky Can you check the returned DataType? From the code, looks like it will return ObjectType instead of ArrayType for an array type.
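A quick, hypothetical way to check the returned DataType, reusing the tpe from the earlier shell snippet (the actual output is not asserted here):

val dt = ScalaReflection.schemaFor(tpe).dataType
// if this prints an ObjectType rather than ArrayType(IntegerType, ...),
// the array element type was not resolved
println(dt)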

Member:
Oh, you're right. I didn't realize you specifically wanted an ArrayType.

val elementType = componentTypeToDataType(a.getClass.getComponentType())
val dataType = ArrayType(elementType)
val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
Literal(convert(a), dataType)
case i: CalendarInterval => Literal(i, CalendarIntervalType)
case null => Literal(null, NullType)
case v: Literal => v
case _ =>
throw new RuntimeException("Unsupported literal type " + v.getClass + " " + v)
}

/**
* Returns the Spark SQL DataType for a given class object. Since this type needs to be resolved
* at runtime, we use match-case idioms for class objects here. However, there are similar
* functions in other files (e.g., HiveInspectors), so these functions need to be merged into one.
*/
private[this] def componentTypeToDataType(clz: Class[_]): DataType = clz match {
// primitive types
case JavaShort.TYPE => ShortType
case JavaInteger.TYPE => IntegerType
case JavaLong.TYPE => LongType
case JavaDouble.TYPE => DoubleType
case JavaByte.TYPE => ByteType
case JavaFloat.TYPE => FloatType
case JavaBoolean.TYPE => BooleanType

// java classes
case _ if clz == classOf[Date] => DateType
case _ if clz == classOf[Timestamp] => TimestampType
case _ if clz == classOf[JavaBigDecimal] => DecimalType.SYSTEM_DEFAULT
case _ if clz == classOf[Array[Byte]] => BinaryType
case _ if clz == classOf[JavaShort] => ShortType
case _ if clz == classOf[JavaInteger] => IntegerType
case _ if clz == classOf[JavaLong] => LongType
case _ if clz == classOf[JavaDouble] => DoubleType
case _ if clz == classOf[JavaByte] => ByteType
case _ if clz == classOf[JavaFloat] => FloatType
case _ if clz == classOf[JavaBoolean] => BooleanType

// other scala classes
case _ if clz == classOf[String] => StringType
case _ if clz == classOf[BigInt] => DecimalType.SYSTEM_DEFAULT
case _ if clz == classOf[BigDecimal] => DecimalType.SYSTEM_DEFAULT
case _ if clz == classOf[CalendarInterval] => CalendarIntervalType

case _ if clz.isArray => ArrayType(componentTypeToDataType(clz.getComponentType))

case _ => throw new AnalysisException(s"Unsupported component type $clz in arrays")
}

/**
* Constructs a [[Literal]] of [[ObjectType]], for example when you need to pass an object
* into code generation.
@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
@@ -43,6 +44,7 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Literal.create(null, TimestampType), null)
checkEvaluation(Literal.create(null, CalendarIntervalType), null)
checkEvaluation(Literal.create(null, ArrayType(ByteType, true)), null)
checkEvaluation(Literal.create(null, ArrayType(StringType, true)), null)
checkEvaluation(Literal.create(null, MapType(StringType, IntegerType)), null)
checkEvaluation(Literal.create(null, StructType(Seq.empty)), null)
}
@@ -122,5 +124,28 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}

// TODO(davies): add tests for ArrayType, MapType and StructType
test("array") {
def checkArrayLiteral(a: Array[_], elementType: DataType): Unit = {
val toCatalyst = (a: Array[_], elementType: DataType) => {
CatalystTypeConverters.createToCatalystConverter(ArrayType(elementType))(a)
}
checkEvaluation(Literal(a), toCatalyst(a, elementType))
}
checkArrayLiteral(Array(1, 2, 3), IntegerType)
checkArrayLiteral(Array("a", "b", "c"), StringType)
checkArrayLiteral(Array(1.0, 4.0), DoubleType)
checkArrayLiteral(Array(CalendarInterval.MICROS_PER_DAY, CalendarInterval.MICROS_PER_HOUR),
CalendarIntervalType)
}

test("unsupported types (map and struct) in literals") {
def checkUnsupportedTypeInLiteral(v: Any): Unit = {
val errMsgMap = intercept[RuntimeException] {
Literal(v)
}
assert(errMsgMap.getMessage.startsWith("Unsupported literal type"))
}
checkUnsupportedTypeInLiteral(Map("key1" -> 1, "key2" -> 2))
checkUnsupportedTypeInLiteral(("mike", 29, 1.0))
}
}