diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 66812a54846c..1e1c27c47787 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} import org.apache.spark.storage._ -import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils} +import org.apache.spark.util.{BoundedPriorityQueue, ByteBufferInputStream, SerializableConfiguration, SerializableJobConf, Utils} import org.apache.spark.util.collection.CompactBuffer /** @@ -417,7 +417,12 @@ private[spark] class KryoSerializerInstance( override def deserialize[T: ClassTag](bytes: ByteBuffer): T = { val kryo = borrowKryo() try { - input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining()) + if (bytes.hasArray) { + input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining()) + } else { + input.setBuffer(new Array[Byte](4096)) + input.setInputStream(new ByteBufferInputStream(bytes)) + } kryo.readClassAndObject(input).asInstanceOf[T] } finally { releaseKryo(kryo) @@ -429,7 +434,12 @@ private[spark] class KryoSerializerInstance( val oldClassLoader = kryo.getClassLoader try { kryo.setClassLoader(loader) - input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining()) + if (bytes.hasArray) { + input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining()) + } else { + input.setBuffer(new Array[Byte](4096)) + input.setInputStream(new ByteBufferInputStream(bytes)) + } kryo.readClassAndObject(input).asInstanceOf[T] } finally { kryo.setClassLoader(oldClassLoader) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index e413fe3b774d..a7eed4b6a8b8 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.serializer import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileInputStream, FileOutputStream} +import java.nio.ByteBuffer import java.util.concurrent.Executors import scala.collection.JavaConverters._ @@ -551,6 +552,17 @@ class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSpar deserializationStream.close() assert(serInstance.deserialize[Any](helloHello) === ((hello, hello))) } + + test("SPARK-25786: ByteBuffer.array -- UnsupportedOperationException") { + val serInstance = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance] + val obj = "UnsupportedOperationException" + val serObj = serInstance.serialize(obj) + val byteBuffer = ByteBuffer.allocateDirect(serObj.array().length) + byteBuffer.put(serObj.array()) + byteBuffer.flip() + assert(serInstance.deserialize[Any](serObj) === (obj)) + assert(serInstance.deserialize[Any](byteBuffer) === (obj)) + } } class ClassLoaderTestingObject