
Commit 0ba75e6

Add tests for serializer relocation property.
I verified that the Kryo tests will fail if we remove the auto-reset check in KryoSerializer. I also checked that this test fails if we mistakenly enable this flag for JavaSerializer. This demonstrates that the test case is actually capable of detecting the types of bugs that it's trying to prevent. Of course, it's possible that certain bugs will only surface when serializing specific data types, so we'll still have to be cautious when overriding `supportsRelocationOfSerializedObjects` for new serializers.
1 parent 450fa21 commit 0ba75e6
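
As a quick illustration of the property this commit tests, here is a minimal sketch (separate from the suite added below; the serializer choice, object name, and values are illustrative): a serializer supports relocation when the bytes it writes for individual objects can be concatenated in a different order and still deserialize to exactly those objects.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Minimal sketch of the relocation property (illustrative only). Each value is
// serialized into its own stream so that its byte range is self-contained, then
// the ranges are concatenated out of their original order.
object RelocationSketch {
  def main(args: Array[String]): Unit = {
    val ser = new KryoSerializer(new SparkConf())

    def bytesOf(value: Any): Array[Byte] = {
      val baos = new ByteArrayOutputStream()
      val out = ser.newInstance().serializeStream(baos)
      out.writeObject(value)
      out.close()
      baos.toByteArray
    }

    // "Relocate" by placing b's bytes before a's; a relocation-safe serializer
    // must still read back exactly b followed by a.
    val relocated = bytesOf("b") ++ bytesOf("a")
    val in = ser.newInstance().deserializeStream(new ByteArrayInputStream(relocated))
    assert(in.asIterator.toSeq == Seq("b", "a"))
    in.close()
  }
}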

File tree

2 files changed (+106, -1 lines)


core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 3 additions & 1 deletion
@@ -127,7 +127,9 @@ class KryoSerializer(conf: SparkConf)
   }
 
   override def supportsRelocationOfSerializedObjects: Boolean = {
-    // TODO: we should have a citation / explanatory comment here clarifying _why_ this is the case
+    // If auto-reset is disabled, then Kryo may store references to duplicate occurrences of objects
+    // in the stream rather than writing those objects' serialized bytes, breaking relocation. See
+    // https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details.
     newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()
   }
 }
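
For context, the auto-reset setting that this check inspects is typically controlled through a custom Kryo registrator; the suite below points spark.kryo.registrator at KryoTest.RegistratorWithoutAutoReset, which is not included in this diff. A rough sketch of what such a registrator might look like, assuming Spark's KryoRegistrator trait and Kryo's setReferences/setAutoReset methods (the class name here is hypothetical):

import com.esotericsoftware.kryo.Kryo

import org.apache.spark.serializer.KryoRegistrator

// Illustrative sketch only (not the actual KryoTest.RegistratorWithoutAutoReset).
// With reference tracking enabled and auto-reset disabled, Kryo keeps its reference
// table across writeObject calls, so a repeated object can be written as a
// back-reference to an earlier stream position, which is exactly the behavior
// that breaks relocation.
class RegistratorWithoutAutoResetSketch extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.setReferences(true)   // track object references across writes
    kryo.setAutoReset(false)   // keep the reference table between top-level objects
  }
}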
core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala

Lines changed: 103 additions & 0 deletions

@@ -0,0 +1,103 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.serializer

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import scala.util.Random

import org.scalatest.FunSuite

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset

private case class MyCaseClass(foo: Int, bar: String)

class SerializerPropertiesSuite extends FunSuite {

  test("JavaSerializer does not support relocation") {
    testSupportsRelocationOfSerializedObjects(new JavaSerializer(new SparkConf()))
  }

  test("KryoSerializer supports relocation when auto-reset is enabled") {
    val ser = new KryoSerializer(new SparkConf)
    assert(ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
    testSupportsRelocationOfSerializedObjects(ser)
  }

  test("KryoSerializer does not support relocation when auto-reset is disabled") {
    val conf = new SparkConf().set("spark.kryo.registrator",
      classOf[RegistratorWithoutAutoReset].getName)
    val ser = new KryoSerializer(conf)
    assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
    testSupportsRelocationOfSerializedObjects(ser)
  }

  def testSupportsRelocationOfSerializedObjects(serializer: Serializer): Unit = {
    val NUM_TRIALS = 100
    if (!serializer.supportsRelocationOfSerializedObjects) {
      return
    }
    val rand = new Random(42)
    val randomFunctions: Seq[() => Any] = Seq(
      () => rand.nextInt(),
      () => rand.nextString(rand.nextInt(10)),
      () => rand.nextDouble(),
      () => rand.nextBoolean(),
      () => (rand.nextInt(), rand.nextString(rand.nextInt(10))),
      () => MyCaseClass(rand.nextInt(), rand.nextString(rand.nextInt(10))),
      () => {
        val x = MyCaseClass(rand.nextInt(), rand.nextString(rand.nextInt(10)))
        (x, x)
      }
    )
    def generateRandomItem(): Any = {
      randomFunctions(rand.nextInt(randomFunctions.size)).apply()
    }

    for (_ <- 1 to NUM_TRIALS) {
      val items = {
        // Make sure that we have duplicate occurrences of the same object in the stream:
        val randomItems = Seq.fill(10)(generateRandomItem())
        randomItems ++ randomItems.take(5)
      }
      val baos = new ByteArrayOutputStream()
      val serStream = serializer.newInstance().serializeStream(baos)
      def serializeItem(item: Any): Array[Byte] = {
        val itemStartOffset = baos.toByteArray.length
        serStream.writeObject(item)
        serStream.flush()
        val itemEndOffset = baos.toByteArray.length
        baos.toByteArray.slice(itemStartOffset, itemEndOffset).clone()
      }
      val itemsAndSerializedItems: Seq[(Any, Array[Byte])] = {
        val serItems = items.map {
          item => (item, serializeItem(item))
        }
        serStream.close()
        rand.shuffle(serItems)
      }
      val reorderedSerializedData: Array[Byte] = itemsAndSerializedItems.flatMap(_._2).toArray
      val deserializedItemsStream = serializer.newInstance().deserializeStream(
        new ByteArrayInputStream(reorderedSerializedData))
      assert(deserializedItemsStream.asIterator.toSeq === itemsAndSerializedItems.map(_._1))
      deserializedItemsStream.close()
    }
  }

}
