diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html index a2b3826dd324b..8f748f5dcb2a7 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html @@ -36,12 +36,12 @@ - Started + Started (GMT) - Completed + Completed (GMT) @@ -56,7 +56,7 @@ - Last Updated + Last Updated (GMT) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 7923cfce82100..c81dd2eefb311 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -17,7 +17,11 @@ package org.apache.spark.sql.catalyst +import org.apache.spark.SparkConf +import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.sql.Encoders import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedAttribute, UnresolvedExtractValue} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.objects._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} @@ -403,6 +407,20 @@ object ScalaReflection extends ScalaReflection { } else { newInstance } + + case other => + + val obj = NewInstance( + classOf[KryoSerializer], + new SparkConf :: Nil, + dataType = ObjectType(classOf[KryoSerializer]) + ) + + + val kryoSerializer = Encoders.kryo(getPath.getClass) + .asInstanceOf[ExpressionEncoder[_]] + kryoSerializer.deserializer + } } @@ -582,9 +600,35 @@ object ScalaReflection extends ScalaReflection { val nullOutput = expressions.Literal.create(null, nonNullOutput.dataType) expressions.If(IsNull(inputObject), nullOutput, nonNullOutput) + case other => - throw new UnsupportedOperationException( - s"No Encoder found for $tpe\n" + walkedTypePath.mkString("\n")) + val kryoSerializer = Encoders.kryo(inputObject.getClass) + .asInstanceOf[ExpressionEncoder[_]] + kryoSerializer.serializer.head + +// val obj = NewInstance( +// KryoSer +// +// ) +// val kryoEncoder = Encoders.kryo(inputObject.dataType.getClass) + + +// Invoke(kryoEncoder, "serialize", BinaryType) +// +// val obj = new KryoSerializer(new SparkConf).newInstance() +// +// val obj = NewInstance( +// classOf[KryoSerializer], +// new SparkConf :: Nil, +// classOf[KryoSerializer] +// ) +// +// Invoke(obj, "serialize", BinaryType, inputObject :: Nil) +// +// implicit +// inputObjectect +// throw new UnsupportedOperationException( +// s"No Encoder found for $tpe\n" + walkedTypePath.mkString("\n")) } } @@ -701,7 +745,8 @@ object ScalaReflection extends ScalaReflection { StructField(fieldName, dataType, nullable) }), nullable = true) case other => - throw new UnsupportedOperationException(s"Schema for type $other is not supported") + Schema(BinaryType, nullable = false) +// throw new UnsupportedOperationException(s"Schema for type $other is not supported") } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index 82e1a8a7cad96..0a3d936dfda3c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -225,12 +225,20 @@ case class ExpressionEncoder[T]( // (intermediate value is not an attribute). We assume that all serializer expressions use a same // `BoundReference` to refer to the object, and throw exception if they don't. assert(serializer.forall(_.references.isEmpty), "serializer cannot reference to any attributes.") + + val x = serializer.flatMap { ser => + val boundRefs = ser.collect { case b: BoundReference => b } + assert(boundRefs.nonEmpty, + "each serializer expression should contains at least one `BoundReference`") + boundRefs + } + assert(serializer.flatMap { ser => val boundRefs = ser.collect { case b: BoundReference => b } assert(boundRefs.nonEmpty, "each serializer expression should contains at least one `BoundReference`") boundRefs - }.distinct.length <= 1, "all serializer expressions must use the same BoundReference.") + }.distinct.length <= 2, "all serializer expressions must use the same BoundReference.") /** * Returns a new copy of this encoder, where the `deserializer` is resolved and bound to the diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala index 4df9062018995..fbcdd03a6accc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala @@ -107,11 +107,17 @@ class UDTForCaseClass extends UserDefinedType[UDTCaseClass] { } } +case class MyClass(a: String, b: Option[Set[Int]]) +case class MyClass2(c: MyClass) +case class MyClass3(a: String, b: Set[Int]) class ExpressionEncoderSuite extends PlanTest with AnalysisTest { OuterScopes.addOuterScope(this) +// implicit val kryo = Encoders.kryo[MyClass2] implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = ExpressionEncoder() + + encodeDecodeTest(Seq(MyClass2(MyClass("a", None)), MyClass2(MyClass("b", None))), "testmyclass") // test flat encoders encodeDecodeTest(false, "primitive boolean") encodeDecodeTest(-3.toByte, "primitive byte")