From f3c35bf1540eeb90bd0e75aa34139333ede280c4 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Fri, 9 Mar 2018 16:37:57 +0900 Subject: [PATCH 1/4] ExternalMapToCatalyst should support interpreted execution --- .../expressions/objects/objects.scala | 24 +++++++++++++++++-- .../expressions/ObjectExpressionsSuite.scala | 11 ++++++++- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index f1ffcaec8a48..b36594ca6780 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -1255,8 +1255,28 @@ case class ExternalMapToCatalyst private( override def dataType: MapType = MapType( keyConverter.dataType, valueConverter.dataType, valueContainsNull = valueConverter.nullable) - override def eval(input: InternalRow): Any = - throw new UnsupportedOperationException("Only code-generated evaluation is supported") + private lazy val keyCatalystConverter = + CatalystTypeConverters.createToCatalystConverter(dataType.keyType) + private lazy val valueCatalystConverter = + CatalystTypeConverters.createToCatalystConverter(dataType.valueType) + + override def eval(input: InternalRow): Any = { + val result = child.eval(input) + if (result != null) { + val mapValue = result.asInstanceOf[Map[Any, Any]] + val keys = new Array[Any](mapValue.size) + val values = new Array[Any](mapValue.size) + var i = 0 + for ((k, v) <- mapValue) { + keys(i) = if (k != null) keyCatalystConverter(k) else null + values(i) = if (v != null) valueCatalystConverter(v) else null + i += 1 + } + new ArrayBasedMapData(ArrayData.toArrayData(keys), ArrayData.toArrayData(values)) + } else { + null + } + } override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val inputMap = child.genCode(ctx) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index 7136af893448..0c0c6926c222 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -26,7 +26,7 @@ import scala.util.Random import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.sql.{RandomDataGenerator, Row} -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection} import org.apache.spark.sql.catalyst.analysis.{ResolveTimeZone, SimpleAnalyzer, UnresolvedDeserializer} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.encoders._ @@ -473,6 +473,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkObjectExprEvaluation(deserializer, expected = data) } +<<<<<<< c48085aa91c60615a4de3b391f019f46f3fcdbe3 test("SPARK-23595 ValidateExternalType should support interpreted execution") { val inputObject = BoundReference(0, ObjectType(classOf[Row]), nullable = true) Seq( @@ -501,6 +502,14 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { 
InternalRow.fromSeq(Seq(Row(1))), "java.lang.Integer is not a valid external type for schema of double") } + + test("SPARK-23589 ExternalMapToCatalyst should support interpreted execution") { + val data = Map[Int, String](0 -> "v0", 1 -> "v1", 2 -> null, 3 -> "v3") + val serializer = GetStructField( + ScalaReflection.serializerFor[Map[Int, String]](Literal.fromObject(data)), 0) + val catalystValue = CatalystTypeConverters.convertToCatalyst(data) + checkEvaluation(serializer, catalystValue) + } } class TestBean extends Serializable { From ce0004024ab96a517c40adda8325f53d10b920d5 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Fri, 6 Apr 2018 18:23:47 +0900 Subject: [PATCH 2/4] Fix --- .../sql/catalyst/JavaTypeInference.scala | 3 +- .../expressions/objects/objects.scala | 62 +++++++++++++++---- .../expressions/ObjectExpressionsSuite.scala | 39 +++++++++--- 3 files changed, 83 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala index 3ecc137c8cd7..e4e66416dd9c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -357,7 +357,8 @@ object JavaTypeInference { } } - private def serializerFor(inputObject: Expression, typeToken: TypeToken[_]): Expression = { + private[catalyst] def serializerFor( + inputObject: Expression, typeToken: TypeToken[_]): Expression = { def toCatalystArray(input: Expression, elementType: TypeToken[_]): Expression = { val (dataType, nullable) = inferDataType(elementType) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index b36594ca6780..904709bd846a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -1255,23 +1255,59 @@ case class ExternalMapToCatalyst private( override def dataType: MapType = MapType( keyConverter.dataType, valueConverter.dataType, valueContainsNull = valueConverter.nullable) - private lazy val keyCatalystConverter = - CatalystTypeConverters.createToCatalystConverter(dataType.keyType) - private lazy val valueCatalystConverter = - CatalystTypeConverters.createToCatalystConverter(dataType.valueType) + private lazy val mapCatalystConverter: Any => (Array[Any], Array[Any]) = child.dataType match { + case ObjectType(cls) if classOf[java.util.Map[_, _]].isAssignableFrom(cls) => + (input: Any) => { + val data = input.asInstanceOf[java.util.Map[Any, Any]] + val keys = new Array[Any](data.size) + val values = new Array[Any](data.size) + val iter = data.entrySet().iterator() + var i = 0 + while (iter.hasNext) { + val entry = iter.next() + val (key, value) = (entry.getKey, entry.getValue) + keys(i) = if (key != null) { + keyConverter.eval(InternalRow.fromSeq(key :: Nil)) + } else { + throw new RuntimeException("Cannot use null as map key!") + } + values(i) = if (value != null) { + valueConverter.eval(InternalRow.fromSeq(value :: Nil)) + } else { + null + } + i += 1 + } + (keys, values) + } + + case ObjectType(cls) if classOf[scala.collection.Map[_, _]].isAssignableFrom(cls) => + (input: Any) => { + val data = input.asInstanceOf[scala.collection.Map[Any, Any]] + val keys = 
new Array[Any](data.size) + val values = new Array[Any](data.size) + var i = 0 + for ((key, value) <- data) { + keys(i) = if (key != null) { + keyConverter.eval(InternalRow.fromSeq(key :: Nil)) + } else { + throw new RuntimeException("Cannot use null as map key!") + } + values(i) = if (value != null) { + valueConverter.eval(InternalRow.fromSeq(value :: Nil)) + } else { + null + } + i += 1 + } + (keys, values) + } + } override def eval(input: InternalRow): Any = { val result = child.eval(input) if (result != null) { - val mapValue = result.asInstanceOf[Map[Any, Any]] - val keys = new Array[Any](mapValue.size) - val values = new Array[Any](mapValue.size) - var i = 0 - for ((k, v) <- mapValue) { - keys(i) = if (k != null) keyCatalystConverter(k) else null - values(i) = if (v != null) valueCatalystConverter(v) else null - i += 1 - } + val (keys, values) = mapCatalystConverter(result) new ArrayBasedMapData(ArrayData.toArrayData(keys), ArrayData.toArrayData(values)) } else { null diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index 0c0c6926c222..59aa71cafead 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -23,11 +23,13 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.util.Random +import com.google.common.reflect.TypeToken + import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.sql.{RandomDataGenerator, Row} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection} -import org.apache.spark.sql.catalyst.analysis.{ResolveTimeZone, SimpleAnalyzer, UnresolvedDeserializer} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, JavaTypeInference, ScalaReflection} +import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedDeserializer} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection @@ -473,6 +475,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkObjectExprEvaluation(deserializer, expected = data) } +<<<<<<< f3c35bf1540eeb90bd0e75aa34139333ede280c4 <<<<<<< c48085aa91c60615a4de3b391f019f46f3fcdbe3 test("SPARK-23595 ValidateExternalType should support interpreted execution") { val inputObject = BoundReference(0, ObjectType(classOf[Row]), nullable = true) @@ -503,12 +506,34 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { "java.lang.Integer is not a valid external type for schema of double") } + private def javaSerializerFor(beanClass: Class[_])(inputObject: Expression): CreateNamedStruct = { + JavaTypeInference.serializerFor(inputObject, TypeToken.of(beanClass)) match { + case e => CreateNamedStruct(Literal("value") :: e :: Nil) + } + } + test("SPARK-23589 ExternalMapToCatalyst should support interpreted execution") { - val data = Map[Int, String](0 -> "v0", 1 -> "v1", 2 -> null, 3 -> "v3") - val serializer = GetStructField( - ScalaReflection.serializerFor[Map[Int, String]](Literal.fromObject(data)), 0) - val catalystValue = CatalystTypeConverters.convertToCatalyst(data) - 
checkEvaluation(serializer, catalystValue) + val scalaMap = scala.collection.Map[Int, String](0 -> "v0", 1 -> "v1", 2 -> null, 3 -> "v3") + val javaMap = new java.util.HashMap[java.lang.Integer, java.lang.String]() { + { + put(0, "v0") + put(1, "v1") + put(2, null) + put(3, "v3") + } + } + val expected = CatalystTypeConverters.convertToCatalyst(scalaMap) + + // Java Map + val serializer1 = GetStructField( + javaSerializerFor(javaMap.getClass)(Literal.fromObject(javaMap)), 0) + checkEvaluation(serializer1, expected) + + // Scala Map + val serializer2 = GetStructField( + ScalaReflection.serializerFor[scala.collection.Map[Int, String]]( + Literal.fromObject(scalaMap)), 0) + checkEvaluation(serializer2, expected) } } From 10d89ff24a44831f0748bf542c28c3145e059800 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Fri, 20 Apr 2018 13:50:32 +0900 Subject: [PATCH 3/4] Fix --- .../sql/catalyst/JavaTypeInference.scala | 3 ++- .../expressions/objects/objects.scala | 2 +- .../expressions/ObjectExpressionsSuite.scala | 27 ++++++++++++++++++- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala index e4e66416dd9c..f7649d85b773 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -358,7 +358,8 @@ object JavaTypeInference { } private[catalyst] def serializerFor( - inputObject: Expression, typeToken: TypeToken[_]): Expression = { + inputObject: Expression, + typeToken: TypeToken[_]): Expression = { def toCatalystArray(input: Expression, elementType: TypeToken[_]): Expression = { val (dataType, nullable) = inferDataType(elementType) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 904709bd846a..9c7e76467d15 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -1308,7 +1308,7 @@ case class ExternalMapToCatalyst private( val result = child.eval(input) if (result != null) { val (keys, values) = mapCatalystConverter(result) - new ArrayBasedMapData(ArrayData.toArrayData(keys), ArrayData.toArrayData(values)) + new ArrayBasedMapData(new GenericArrayData(keys), new GenericArrayData(values)) } else { null } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index 59aa71cafead..d1c99f533325 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, JavaTypeInference, ScalaReflection} -import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedDeserializer} +import 
org.apache.spark.sql.catalyst.analysis.{ResolveTimeZone, SimpleAnalyzer, UnresolvedDeserializer} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection @@ -513,6 +513,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("SPARK-23589 ExternalMapToCatalyst should support interpreted execution") { + // Simple test val scalaMap = scala.collection.Map[Int, String](0 -> "v0", 1 -> "v1", 2 -> null, 3 -> "v3") val javaMap = new java.util.HashMap[java.lang.Integer, java.lang.String]() { { @@ -534,6 +535,30 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { ScalaReflection.serializerFor[scala.collection.Map[Int, String]]( Literal.fromObject(scalaMap)), 0) checkEvaluation(serializer2, expected) + + // NULL key test + val scalaMapHasNullKey = scala.collection.Map[java.lang.Integer, String]( + null.asInstanceOf[java.lang.Integer] -> "v0", new java.lang.Integer(1) -> "v1") + val javaMapHasNullKey = new java.util.HashMap[java.lang.Integer, java.lang.String]() { + { + put(null, "v0") + put(1, "v1") + } + } + + // Java Map + val serializer3 = GetStructField( + javaSerializerFor(javaMap.getClass)(Literal.fromObject(javaMapHasNullKey)), 0) + checkExceptionInExpression[RuntimeException]( + serializer3, EmptyRow, "Cannot use null as map key!") + + // Scala Map + val serializer4 = GetStructField( + ScalaReflection.serializerFor[scala.collection.Map[java.lang.Integer, String]]( + Literal.fromObject(scalaMapHasNullKey)), 0) + + checkExceptionInExpression[RuntimeException]( + serializer4, EmptyRow, "Cannot use null as map key!") } } From eaef6b374f86835bb08b9abf6d09d28aec1da9a8 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 23 Apr 2018 14:29:25 +0900 Subject: [PATCH 4/4] Fix --- .../sql/catalyst/JavaTypeInference.scala | 4 +- .../expressions/ObjectExpressionsSuite.scala | 81 +++++++++++++++---- 2 files changed, 65 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala index f7649d85b773..3ecc137c8cd7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -357,9 +357,7 @@ object JavaTypeInference { } } - private[catalyst] def serializerFor( - inputObject: Expression, - typeToken: TypeToken[_]): Expression = { + private def serializerFor(inputObject: Expression, typeToken: TypeToken[_]): Expression = { def toCatalystArray(input: Expression, elementType: TypeToken[_]): Expression = { val (dataType, nullable) = inferDataType(elementType) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index d1c99f533325..730b36c32333 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -21,10 +21,9 @@ import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag import scala.util.Random -import com.google.common.reflect.TypeToken - 
import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.sql.{RandomDataGenerator, Row} @@ -475,8 +474,6 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkObjectExprEvaluation(deserializer, expected = data) } -<<<<<<< f3c35bf1540eeb90bd0e75aa34139333ede280c4 -<<<<<<< c48085aa91c60615a4de3b391f019f46f3fcdbe3 test("SPARK-23595 ValidateExternalType should support interpreted execution") { val inputObject = BoundReference(0, ObjectType(classOf[Row]), nullable = true) Seq( @@ -506,10 +503,62 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { "java.lang.Integer is not a valid external type for schema of double") } - private def javaSerializerFor(beanClass: Class[_])(inputObject: Expression): CreateNamedStruct = { - JavaTypeInference.serializerFor(inputObject, TypeToken.of(beanClass)) match { - case e => CreateNamedStruct(Literal("value") :: e :: Nil) + private def javaMapSerializerFor( + keyClazz: Class[_], + valueClazz: Class[_])(inputObject: Expression): Expression = { + + def kvSerializerFor(inputObject: Expression, clazz: Class[_]): Expression = clazz match { + case c if c == classOf[java.lang.Integer] => + Invoke(inputObject, "intValue", IntegerType) + case c if c == classOf[java.lang.String] => + StaticInvoke( + classOf[UTF8String], + StringType, + "fromString", + inputObject :: Nil, + returnNullable = false) + } + + ExternalMapToCatalyst( + inputObject, + ObjectType(keyClazz), + kvSerializerFor(_, keyClazz), + keyNullable = true, + ObjectType(valueClazz), + kvSerializerFor(_, valueClazz), + valueNullable = true + ) + } + + private def scalaMapSerializerFor[T: TypeTag, U: TypeTag](inputObject: Expression): Expression = { + import org.apache.spark.sql.catalyst.ScalaReflection._ + + val curId = new java.util.concurrent.atomic.AtomicInteger() + + def kvSerializerFor[V: TypeTag](inputObject: Expression): Expression = + localTypeOf[V].dealias match { + case t if t <:< localTypeOf[java.lang.Integer] => + Invoke(inputObject, "intValue", IntegerType) + case t if t <:< localTypeOf[String] => + StaticInvoke( + classOf[UTF8String], + StringType, + "fromString", + inputObject :: Nil, + returnNullable = false) + case _ => + inputObject } + + ExternalMapToCatalyst( + inputObject, + dataTypeFor[T], + kvSerializerFor[T], + keyNullable = !localTypeOf[T].typeSymbol.asClass.isPrimitive, + dataTypeFor[U], + kvSerializerFor[U], + valueNullable = !localTypeOf[U].typeSymbol.asClass.isPrimitive + ) } test("SPARK-23589 ExternalMapToCatalyst should support interpreted execution") { @@ -526,14 +575,12 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val expected = CatalystTypeConverters.convertToCatalyst(scalaMap) // Java Map - val serializer1 = GetStructField( - javaSerializerFor(javaMap.getClass)(Literal.fromObject(javaMap)), 0) + val serializer1 = javaMapSerializerFor(classOf[java.lang.Integer], classOf[java.lang.String])( + Literal.fromObject(javaMap)) checkEvaluation(serializer1, expected) // Scala Map - val serializer2 = GetStructField( - ScalaReflection.serializerFor[scala.collection.Map[Int, String]]( - Literal.fromObject(scalaMap)), 0) + val serializer2 = scalaMapSerializerFor[Int, String](Literal.fromObject(scalaMap)) checkEvaluation(serializer2, expected) // NULL key test @@ -547,15 +594,15 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } // Java Map - val serializer3 = GetStructField( - 
javaSerializerFor(javaMap.getClass)(Literal.fromObject(javaMapHasNullKey)), 0) + val serializer3 = + javaMapSerializerFor(classOf[java.lang.Integer], classOf[java.lang.String])( + Literal.fromObject(javaMapHasNullKey)) checkExceptionInExpression[RuntimeException]( serializer3, EmptyRow, "Cannot use null as map key!") // Scala Map - val serializer4 = GetStructField( - ScalaReflection.serializerFor[scala.collection.Map[java.lang.Integer, String]]( - Literal.fromObject(scalaMapHasNullKey)), 0) + val serializer4 = scalaMapSerializerFor[java.lang.Integer, String]( + Literal.fromObject(scalaMapHasNullKey)) checkExceptionInExpression[RuntimeException]( serializer4, EmptyRow, "Cannot use null as map key!")
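
The interpreted path added in PATCH 1/4 goes through CatalystTypeConverters rather than the per-entry converter expressions that the generated code uses. A minimal standalone sketch of that first-cut shape, assuming Spark 2.3-era catalyst internals (CatalystTypeConverters, ArrayBasedMapData, and ArrayData.toArrayData are the classes the patch itself uses; the concrete map and element types here are illustrative):

    import org.apache.spark.sql.catalyst.CatalystTypeConverters
    import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData}
    import org.apache.spark.sql.types.{IntegerType, StringType}

    // One converter per side, derived from the map's catalyst key/value types.
    val keyConverter = CatalystTypeConverters.createToCatalystConverter(IntegerType)
    val valueConverter = CatalystTypeConverters.createToCatalystConverter(StringType)

    val external: Map[Any, Any] = Map(0 -> "v0", 1 -> "v1", 2 -> null)
    val keys = new Array[Any](external.size)
    val values = new Array[Any](external.size)
    var i = 0
    for ((k, v) <- external) {
      keys(i) = if (k != null) keyConverter(k) else null      // Int stays Int
      values(i) = if (v != null) valueConverter(v) else null  // String -> UTF8String
      i += 1
    }
    val catalystMap =
      new ArrayBasedMapData(ArrayData.toArrayData(keys), ArrayData.toArrayData(values))

Note that this first cut quietly turns a null key into a null entry, while the generated code fails on null keys; that mismatch is one reason PATCH 2/4 replaces it.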
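
PATCH 2/4 reworks eval to dispatch on child.dataType, so both java.util.Map and scala.collection.Map inputs are handled, and to run the actual key/value converter expressions per entry (keyConverter.eval(InternalRow.fromSeq(key :: Nil))). The sketch below keeps just the dispatch-and-copy shape as plain, dependency-free Scala; the Any => Any parameters stand in for those converter expressions — an assumption made to keep the example self-contained — while the null-key message is the patch's own:

    // Build parallel key/value arrays from either map flavor, failing fast on
    // null keys exactly like the generated code does.
    def toKeyValueArrays(
        input: Any,
        convertKey: Any => Any,
        convertValue: Any => Any): (Array[Any], Array[Any]) = input match {
      case jmap: java.util.Map[_, _] =>
        val data = jmap.asInstanceOf[java.util.Map[Any, Any]]
        val keys = new Array[Any](data.size)
        val values = new Array[Any](data.size)
        val iter = data.entrySet().iterator()
        var i = 0
        while (iter.hasNext) {
          val entry = iter.next()
          if (entry.getKey == null) throw new RuntimeException("Cannot use null as map key!")
          keys(i) = convertKey(entry.getKey)
          values(i) = if (entry.getValue != null) convertValue(entry.getValue) else null
          i += 1
        }
        (keys, values)
      case smap: scala.collection.Map[_, _] =>
        val data = smap.asInstanceOf[scala.collection.Map[Any, Any]]
        val keys = new Array[Any](data.size)
        val values = new Array[Any](data.size)
        var i = 0
        for ((key, value) <- data) {
          if (key == null) throw new RuntimeException("Cannot use null as map key!")
          keys(i) = convertKey(key)
          values(i) = if (value != null) convertValue(value) else null
          i += 1
        }
        (keys, values)
      case other =>
        throw new IllegalArgumentException(s"Unsupported map type: ${other.getClass}")
    }

    // e.g. toKeyValueArrays(Map(1 -> "a", 2 -> null), identity, identity)
    //      returns (Array(1, 2), Array("a", null)); a null key throws instead.

In the patch itself the dispatch happens once, lazily, on the static child.dataType rather than per row, so the per-entry work is only the converter evals and the copy.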
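
The only eval change in PATCH 3/4 swaps ArrayData.toArrayData(keys) for new GenericArrayData(keys). For these arrays that should be behavior-preserving: toArrayData only special-cases primitive arrays (Array[Int], Array[Boolean], and so on, which become UnsafeArrayData), and keys/values here are Array[Any], so both forms end in a GenericArrayData; the direct constructor just skips the dispatch. A two-line illustration, assuming the same catalyst util classes:

    import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}

    val boxed = Array[Any](0, "v0")
    val viaHelper = ArrayData.toArrayData(boxed) // falls through to GenericArrayData
    val direct = new GenericArrayData(boxed)     // same result, no pattern match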
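
PATCH 4/4 reverts the JavaTypeInference.serializerFor visibility change and instead has the test construct ExternalMapToCatalyst directly, wiring in hand-written key/value serializer expressions. Condensed from the patch's own javaMapSerializerFor helper — the constructor call and the Invoke/StaticInvoke converters are taken verbatim from the patch, only the surrounding plumbing is trimmed:

    import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
    import org.apache.spark.sql.catalyst.expressions.objects.{ExternalMapToCatalyst, Invoke, StaticInvoke}
    import org.apache.spark.sql.types.{IntegerType, ObjectType, StringType}
    import org.apache.spark.unsafe.types.UTF8String

    val javaMap = new java.util.HashMap[java.lang.Integer, java.lang.String]()
    javaMap.put(0, "v0")
    javaMap.put(1, null)

    val serializer: Expression = ExternalMapToCatalyst(
      Literal.fromObject(javaMap),
      ObjectType(classOf[java.lang.Integer]),
      (key: Expression) => Invoke(key, "intValue", IntegerType),   // unbox the key
      keyNullable = true,
      ObjectType(classOf[java.lang.String]),
      (value: Expression) => StaticInvoke(                         // String -> UTF8String
        classOf[UTF8String], StringType, "fromString", value :: Nil, returnNullable = false),
      valueNullable = true)

checkEvaluation then exercises this expression on both the generated and the interpreted path, and the null-key variants assert the shared "Cannot use null as map key!" failure against EmptyRow. Keeping serializerFor private and building the expression by hand also means the test no longer needs GetStructField to peel a wrapper struct apart.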
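
The Scala-side counterpart in the same patch does its converter dispatch with runtime reflection, matching on TypeTag-derived types instead of Class objects. A trimmed sketch of just that dispatch; plain typeOf is substituted for the patch's ScalaReflection.localTypeOf (an assumption, made to avoid catalyst's reflection helpers), and the fall-through case is the patch's own:

    import scala.reflect.runtime.universe._

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
    import org.apache.spark.sql.types.{IntegerType, StringType}
    import org.apache.spark.unsafe.types.UTF8String

    def kvSerializerFor[V: TypeTag](input: Expression): Expression =
      typeOf[V].dealias match {
        case t if t <:< typeOf[java.lang.Integer] =>
          Invoke(input, "intValue", IntegerType)        // unbox boxed integers
        case t if t <:< typeOf[String] =>
          StaticInvoke(classOf[UTF8String], StringType, "fromString",
            input :: Nil, returnNullable = false)       // String -> UTF8String
        case _ =>
          input                                         // pass anything else through
      }

scalaMapSerializerFor then feeds these converters into the same ExternalMapToCatalyst call, deriving keyNullable/valueNullable from whether each type parameter is a primitive.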