@@ -66,7 +66,7 @@ object EmptyRow extends Row {
  */
 class GenericRow(protected[sql] val values: Array[Any]) extends Row {
   /** No-arg constructor for serialization. */
-  def this() = this(null)
+  protected def this() = this(null)

   def this(size: Int) = this(new Array[Any](size))

@@ -172,11 +172,14 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {

 class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
   extends GenericRow(values) {
+
+  /** No-arg constructor for serialization. */
+  protected def this() = this(null, null)
 }

 class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow {
   /** No-arg constructor for serialization. */
-  def this() = this(null)
+  protected def this() = this(null)

   def this(size: Int) = this(new Array[Any](size))

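The constructors above exist purely so that Kryo's default field serializer can instantiate these classes reflectively: it needs a zero-argument constructor, which Scala classes with only parameterized constructors do not have. A minimal standalone sketch of that idea, outside Spark and with illustrative names (`Point`, `KryoRoundTrip`), assuming only the Kryo library on the classpath:

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Illustrative class, not part of Spark; the serialization-only constructor
// mirrors the `protected def this() = this(null)` pattern in the diff above.
class Point(val x: Int, val y: Int) {
  /** No-arg constructor for kryo; never called by user code. */
  protected def this() = this(0, 0)
}

object KryoRoundTrip extends App {
  val kryo = new Kryo()
  kryo.setRegistrationRequired(false)   // rely on the default FieldSerializer

  val output = new Output(1024)
  kryo.writeObject(output, new Point(1, 2))
  output.close()

  // Deserialization instantiates Point through the zero-arg constructor
  // (Kryo makes it accessible even though it is protected), then fills the fields.
  val copy = kryo.readObject(new Input(output.toBytes), classOf[Point])
  assert(copy.x == 1 && copy.y == 2)
}
```

Without the zero-arg constructor, the `readObject` call fails at runtime because Kryo has no way to create the instance it is about to populate.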
@@ -41,6 +41,9 @@ import org.apache.spark.annotation.DeveloperApi
 sealed class Metadata private[types] (private[types] val map: Map[String, Any])
   extends Serializable {

+  /** No-arg constructor for kryo. */
+  protected def this() = this(null)
+
   /** Tests whether this Metadata contains a binding for a key. */
   def contains(key: String): Boolean = map.contains(key)

@@ -670,6 +670,10 @@ case class PrecisionInfo(precision: Int, scale: Int)
  */
 @DeveloperApi
 case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType {
+
+  /** No-arg constructor for kryo. */
+  protected def this() = this(null)
+
   private[sql] type JvmType = Decimal
   @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   private[sql] val numeric = Decimal.DecimalIsFractional

@@ -819,6 +823,10 @@ object ArrayType {
  */
 @DeveloperApi
 case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
+
+  /** No-arg constructor for kryo. */
+  protected def this() = this(null, false)
+
   private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
     builder.append(
       s"$prefix-- element: ${elementType.typeName} (containsNull = $containsNull)\n")

@@ -857,6 +865,9 @@ case class StructField(
     nullable: Boolean = true,
     metadata: Metadata = Metadata.empty) {

+  /** No-arg constructor for kryo. */
+  protected def this() = this(null, null)
+
   private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
     builder.append(s"$prefix-- $name: ${dataType.typeName} (nullable = $nullable)\n")
     DataType.buildFormattedString(dataType, s"$prefix |", builder)

@@ -1003,6 +1014,9 @@ object StructType {
 @DeveloperApi
 case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {

+  /** No-arg constructor for kryo. */
+  protected def this() = this(null)
+
   /** Returns all field names in an array. */
   def fieldNames: Array[String] = fields.map(_.name)

@@ -1121,6 +1135,10 @@ case class MapType(
     keyType: DataType,
     valueType: DataType,
     valueContainsNull: Boolean) extends DataType {
+
+  /** No-arg constructor for kryo. */
+  def this() = this(null, null, false)
+
   private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
     builder.append(s"$prefix-- key: ${keyType.typeName}\n")
     builder.append(s"$prefix-- value: ${valueType.typeName} " +
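One detail worth noting: `StructField` takes four parameters, yet its kryo constructor passes only two nulls. That works because a Scala auxiliary constructor may omit trailing parameters that carry default values. A standalone sketch, not Spark code, with illustrative names (`Field`, `name`, `size`, `tag`):

```scala
// Illustrative case class; the nullable/tag defaults stand in for
// StructField's `nullable: Boolean = true` and `metadata: Metadata = Metadata.empty`.
case class Field(
    name: String,
    size: Int,
    nullable: Boolean = true,   // defaulted, so the auxiliary constructor may omit it
    tag: String = "") {

  /** No-arg constructor for kryo, in the same style as the diff above. */
  protected def this() = this(null, 0)
}

object FieldDemo extends App {
  // The defaults also apply to ordinary calls of the primary constructor:
  println(Field("id", 4))   // Field(id,4,true,)
}
```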
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{IntegerHashSet, LongHa

 private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
   override def newKryo(): Kryo = {
-    val kryo = new Kryo()
+    val kryo = super.newKryo()
     kryo.setRegistrationRequired(false)
     kryo.register(classOf[MutablePair[_, _]])
     kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])

@@ -57,8 +57,6 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
     kryo.register(classOf[Decimal])

     kryo.setReferences(false)
-    kryo.setClassLoader(Utils.getSparkClassLoader)
-    new AllScalaRegistrar().apply(kryo)
     kryo
 }
 }
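Switching from `new Kryo()` to `super.newKryo()` means the SQL serializer no longer builds a bare Kryo instance; it starts from whatever the base `KryoSerializer` already configured, which is why the explicit `setClassLoader` and `AllScalaRegistrar` calls could be dropped. A sketch of the resulting delegation pattern, with an illustrative subclass name and assuming `newKryo()` is overridable as it is in this diff:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Illustrative subclass, not part of Spark: the point is delegating to super
// so the base class loader and Scala registrations are inherited, not rebuilt.
class MyDomainSerializer(conf: SparkConf) extends KryoSerializer(conf) {
  override def newKryo(): Kryo = {
    val kryo = super.newKryo()            // keep everything the base class set up
    kryo.register(classOf[Array[Byte]])   // add only domain-specific registrations here
    kryo
  }
}
```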
sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala (12 additions, 0 deletions)
@@ -17,9 +17,12 @@

 package org.apache.spark.sql

+import org.apache.spark.sql.execution.SparkSqlSerializer
 import org.scalatest.FunSuite

 import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.test.TestSQLContext.implicits._
 import org.apache.spark.sql.types._

 class RowSuite extends FunSuite {

@@ -50,4 +53,13 @@ class RowSuite extends FunSuite {
     row(0) = null
     assert(row.isNullAt(0))
   }
+
+  test("serialize w/ kryo") {
+    val row = Seq((1, Seq(1), Map(1 -> 1), BigDecimal(1))).toDF().first()
+    val serializer = new SparkSqlSerializer(TestSQLContext.sparkContext.getConf)
+    val instance = serializer.newInstance()
+    val ser = instance.serialize(row)
+    val de = instance.deserialize(ser).asInstanceOf[Row]
+    assert(de === row)
+  }
 }