From 465fc8e18147b9e8cf34e0f5bcbc338d03ad4f95 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Thu, 8 Oct 2015 11:34:14 -0700
Subject: [PATCH 1/2] [SPARK-10914] UnsafeRow serialization breaks when two machines have different Oops size.

The problem is that UnsafeRow contains 3 pieces of information when pointing
to some data in memory (an object, a base offset, and length). When the row
is serialized with Java/Kryo serialization, the object layout in memory can
change if two machines have different pointer width (Oops in JVM).

To reproduce, launch Spark using

MASTER=local-cluster[2,1,1024] bin/spark-shell --conf "spark.executor.extraJavaOptions=-XX:-UseCompressedOops"

And then run the following

scala> sql("select 1 xx").collect()

(cherry picked from commit 157b2a818d3993b1321cc41fb7b30407bd13490b)
Signed-off-by: Reynold Xin
---
 .../sql/catalyst/expressions/UnsafeRow.java        | 47 +++++++++++++++++++--
 .../execution/joins/BroadcastHashJoin.scala        |  7 ++-
 .../org/apache/spark/sql/UnsafeRowSuite.scala      | 29 +++++++++++-
 3 files changed, 78 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index e8ac2999c2d2..5af7ed5d6eb6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -17,8 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions;
 
-import java.io.IOException;
-import java.io.OutputStream;
+import java.io.*;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Arrays;
@@ -26,6 +25,11 @@
 import java.util.HashSet;
 import java.util.Set;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
@@ -35,6 +39,7 @@
 import org.apache.spark.unsafe.types.UTF8String;
 
 import static org.apache.spark.sql.types.DataTypes.*;
+import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
 
 /**
  * An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
@@ -52,7 +57,7 @@
  *
  * Instances of `UnsafeRow` act as pointers to row data stored in this format.
  */
-public final class UnsafeRow extends MutableRow {
+public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {
 
   //////////////////////////////////////////////////////////////////////////////
   // Static methods
@@ -596,4 +601,40 @@ public boolean anyNull() {
   public void writeToMemory(Object target, long targetOffset) {
     Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
   }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    byte[] bytes = getBytes();
+    out.writeInt(bytes.length);
+    out.writeInt(this.numFields);
+    out.write(bytes);
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    this.baseOffset = BYTE_ARRAY_OFFSET;
+    this.sizeInBytes = in.readInt();
+    this.numFields = in.readInt();
+    this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
+    this.baseObject = new byte[sizeInBytes];
+    in.readFully((byte[]) baseObject);
+  }
+
+  @Override
+  public void write(Kryo kryo, Output out) {
+    byte[] bytes = getBytes();
+    out.writeInt(bytes.length);
+    out.writeInt(this.numFields);
+    out.write(bytes);
+  }
+
+  @Override
+  public void read(Kryo kryo, Input in) {
+    this.baseOffset = BYTE_ARRAY_OFFSET;
+    this.sizeInBytes = in.readInt();
+    this.numFields = in.readInt();
+    this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
+    this.baseObject = new byte[sizeInBytes];
+    in.read((byte[]) baseObject);
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 2e108cb81451..d0a890c2e9e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -89,8 +89,13 @@ case class BroadcastHashJoin(
       // The following line doesn't run in a job so we cannot track the metric value. However, we
       // have already tracked it in the above lines. So here we can use
       // `SQLMetrics.nullLongMetric` to ignore it.
+
+      input.foreach { row =>
+        println(row)
+      }
+
       val hashed = HashedRelation(
-        input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size)
+        input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.length)
       sparkContext.broadcast(hashed)
     }
   }(BroadcastHashJoin.broadcastHashJoinExecutionContext)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
index 944d4e11348c..7d1ee39d4b53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql
 
 import java.io.ByteArrayOutputStream
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.{KryoSerializer, JavaSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
 import org.apache.spark.sql.types._
@@ -29,6 +30,32 @@ import org.apache.spark.unsafe.types.UTF8String
 
 class UnsafeRowSuite extends SparkFunSuite {
 
+  test("UnsafeRow Java serialization") {
+    // serializing an UnsafeRow pointing to a large buffer should only serialize the relevant data
+    val data = new Array[Byte](1024)
+    val row = new UnsafeRow
+    row.pointTo(data, 1, 16)
+    row.setLong(0, 19285)
+
+    val ser = new JavaSerializer(new SparkConf).newInstance()
+    val row1 = ser.deserialize[UnsafeRow](ser.serialize(row))
+    assert(row1.getLong(0) == 19285)
+    assert(row1.getBaseObject().asInstanceOf[Array[Byte]].length == 16)
+  }
+
+  test("UnsafeRow Kryo serialization") {
+    // serializing an UnsafeRow pointing to a large buffer should only serialize the relevant data
+    val data = new Array[Byte](1024)
+    val row = new UnsafeRow
+    row.pointTo(data, 1, 16)
+    row.setLong(0, 19285)
+
+    val ser = new KryoSerializer(new SparkConf).newInstance()
+    val row1 = ser.deserialize[UnsafeRow](ser.serialize(row))
+    assert(row1.getLong(0) == 19285)
+    assert(row1.getBaseObject().asInstanceOf[Array[Byte]].length == 16)
+  }
+
   test("bitset width calculation") {
     assert(UnsafeRow.calculateBitSetWidthInBytes(0) === 0)
     assert(UnsafeRow.calculateBitSetWidthInBytes(1) === 8)

From 9b79e6f1de4114d7a9a1dc693c079409b6846ffb Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Thu, 8 Oct 2015 11:38:00 -0700
Subject: [PATCH 2/2] Revert BroadcastJoin change.

---
 .../spark/sql/execution/joins/BroadcastHashJoin.scala | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index d0a890c2e9e8..2e108cb81451 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -89,13 +89,8 @@ case class BroadcastHashJoin(
       // The following line doesn't run in a job so we cannot track the metric value. However, we
       // have already tracked it in the above lines. So here we can use
       // `SQLMetrics.nullLongMetric` to ignore it.
-
-      input.foreach { row =>
-        println(row)
-      }
-
       val hashed = HashedRelation(
-        input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.length)
+        input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size)
       sparkContext.broadcast(hashed)
     }
   }(BroadcastHashJoin.broadcastHashJoinExecutionContext)
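
For reviewers who want to exercise the fix outside the test suite, here is a minimal standalone sketch that mirrors the new UnsafeRowSuite cases: it round-trips an UnsafeRow backed by an oversized buffer through the JavaSerializer and checks that the deserialized copy is backed by a fresh 16-byte array, so the sender's object layout (Oops size) no longer matters. The object name and the final println are illustrative only; the calls themselves follow the ones used in the patch.

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Hypothetical driver mirroring the new UnsafeRowSuite tests; not part of the patch.
object UnsafeRowRoundTrip {
  def main(args: Array[String]): Unit = {
    // A single-field row pointing into a deliberately oversized 1 KB buffer.
    val data = new Array[Byte](1024)
    val row = new UnsafeRow
    row.pointTo(data, 1, 16)
    row.setLong(0, 19285)

    // Round-trip through Java serialization, as a broadcast of the row would.
    val ser = new JavaSerializer(new SparkConf).newInstance()
    val copy = ser.deserialize[UnsafeRow](ser.serialize(row))

    // Only the 16 relevant bytes travel; the copy is backed by a fresh byte[16].
    assert(copy.getLong(0) == 19285)
    assert(copy.getBaseObject().asInstanceOf[Array[Byte]].length == 16)
    println(s"value=${copy.getLong(0)}, " +
      s"backing array length=${copy.getBaseObject().asInstanceOf[Array[Byte]].length}")
  }
}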