Commit c3a8246

JoshRosen authored and jeanlyn committed
[SPARK-7311] Introduce internal Serializer API for determining if serializers support object relocation
This patch extends the `Serializer` interface with a new `Private` API which allows serializers to indicate whether they support relocation of serialized objects in serializer stream output.

This relocatability property is described in more detail in `Serializer.scala`, but in a nutshell a serializer supports relocation if reordering the bytes of serialized objects in serialization stream output is equivalent to having re-ordered those elements prior to serializing them. The optimized shuffle paths introduced in apache#4450 and apache#5868 both rely on serializers having this property; this patch just centralizes the logic for determining whether a serializer has it. I also added tests and comments clarifying when this works for KryoSerializer.

This change allows the optimizations in apache#4450 to be applied for shuffles that use `SqlSerializer2`.

Author: Josh Rosen <[email protected]>

Closes apache#5924 from JoshRosen/SPARK-7311 and squashes the following commits:

50a68ca [Josh Rosen] Address minor nits
0a7ebd7 [Josh Rosen] Clarify reason why SqlSerializer2 supports this serializer
123b992 [Josh Rosen] Cleanup for submitting as standalone patch.
4aa61b2 [Josh Rosen] Add missing newline
2c1233a [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use:
0ba75e6 [Josh Rosen] Add tests for serializer relocation property.
450fa21 [Josh Rosen] Back out accidental log4j.properties change
86d4dcd [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation
b9624ee [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used.
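In concrete terms, the relocation property reads as follows. This is a minimal, self-contained sketch using `KryoSerializer` with default settings, mirroring the pseudocode in `Serializer.scala` and the new `SerializerPropertiesSuite` below; the demo object and the string payloads are illustrative only, not part of the patch:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

object RelocationDemo {
  def main(args: Array[String]): Unit = {
    val serializer = new KryoSerializer(new SparkConf())

    // Write two objects and record the byte boundary between them.
    val baos = new ByteArrayOutputStream()
    val serOut = serializer.newInstance().serializeStream(baos)
    serOut.writeObject("first")
    serOut.flush()
    val boundary = baos.size() // end of obj1's bytes
    serOut.writeObject("second")
    serOut.close()

    val bytes = baos.toByteArray
    val obj1Bytes = bytes.slice(0, boundary)
    val obj2Bytes = bytes.slice(boundary, bytes.length)

    // Relocation: feeding the byte ranges back in swapped order must yield
    // the objects in swapped order.
    val serIn = serializer.newInstance().deserializeStream(
      new ByteArrayInputStream(obj2Bytes ++ obj1Bytes))
    assert(serIn.readObject[String]() == "second")
    assert(serIn.readObject[String]() == "first")
    serIn.close()
  }
}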
1 parent 9548543

5 files changed (+166, -3 lines)


core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 7 additions & 0 deletions
@@ -125,6 +125,13 @@ class KryoSerializer(conf: SparkConf)
   override def newInstance(): SerializerInstance = {
     new KryoSerializerInstance(this)
   }
+
+  private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = {
+    // If auto-reset is disabled, then Kryo may store references to duplicate occurrences of objects
+    // in the stream rather than writing those objects' serialized bytes, breaking relocation. See
+    // https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details.
+    newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()
+  }
 }

 private[spark]
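Auto-reset is typically disabled through a user-supplied registrator, which is exactly the situation this check detects. A hedged sketch of that configuration follows; the registrator class name is hypothetical, while the test suite added in this patch uses a similar `KryoTest.RegistratorWithoutAutoReset`:

import com.esotericsoftware.kryo.Kryo

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}

// Hypothetical registrator that turns off auto-reset, causing Kryo to emit
// back-references to earlier stream positions for repeated objects.
class NoAutoResetRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.setAutoReset(false)
  }
}

object AutoResetDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("spark.kryo.registrator",
      classOf[NoAutoResetRegistrator].getName)
    // With auto-reset off, getAutoReset() returns false, so the lazy val
    // above reports that serialized objects cannot be safely relocated.
    // (Both members are private[spark], so the check happens internally.)
    val ser = new KryoSerializer(conf)
  }
}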

core/src/main/scala/org/apache/spark/serializer/Serializer.scala

Lines changed: 34 additions & 1 deletion
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
 import scala.reflect.ClassTag

 import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{DeveloperApi, Private}
 import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator}

 /**
@@ -63,6 +63,39 @@ abstract class Serializer {

   /** Creates a new [[SerializerInstance]]. */
   def newInstance(): SerializerInstance
+
+  /**
+   * :: Private ::
+   * Returns true if this serializer supports relocation of its serialized objects and false
+   * otherwise. This should return true if and only if reordering the bytes of serialized objects
+   * in serialization stream output is equivalent to having re-ordered those elements prior to
+   * serializing them. More specifically, the following should hold if a serializer supports
+   * relocation:
+   *
+   * {{{
+   * serOut.open()
+   * position = 0
+   * serOut.write(obj1)
+   * serOut.flush()
+   * position = # of bytes written to stream so far
+   * obj1Bytes = output[0:position-1]
+   * serOut.write(obj2)
+   * serOut.flush()
+   * position2 = # of bytes written to stream so far
+   * obj2Bytes = output[position:position2-1]
+   * serIn.open([obj2Bytes] concatenate [obj1Bytes]) should return (obj2, obj1)
+   * }}}
+   *
+   * In general, this property should hold for serializers that are stateless and that do not
+   * write special metadata at the beginning or end of the serialization stream.
+   *
+   * This API is private to Spark; this method should not be overridden in third-party subclasses
+   * or called in user code and is subject to removal in future Spark releases.
+   *
+   * See SPARK-7311 for more details.
+   */
+  @Private
+  private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
 }

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 1 addition & 2 deletions
@@ -131,8 +131,7 @@ private[spark] class ExternalSorter[K, V, C](
   private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB
   private val useSerializedPairBuffer =
     !ordering.isDefined && conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
-    ser.isInstanceOf[KryoSerializer] &&
-    serInstance.asInstanceOf[KryoSerializerInstance].getAutoReset
+    ser.supportsRelocationOfSerializedObjects

   // Data structures to store in-memory objects before we spill. Depending on whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala

Lines changed: 119 additions & 0 deletions
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.util.Random
+
+import org.scalatest.{Assertions, FunSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset
+
+/**
+ * Tests to ensure that [[Serializer]] implementations obey the API contracts for methods that
+ * describe properties of the serialized stream, such as
+ * [[Serializer.supportsRelocationOfSerializedObjects]].
+ */
+class SerializerPropertiesSuite extends FunSuite {
+
+  import SerializerPropertiesSuite._
+
+  test("JavaSerializer does not support relocation") {
+    // Per a comment on the SPARK-4550 JIRA ticket, Java serialization appears to write out the
+    // full class name the first time an object is written to an output stream, but subsequent
+    // references to the class write a more compact identifier; this prevents relocation.
+    val ser = new JavaSerializer(new SparkConf())
+    testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
+  }
+
+  test("KryoSerializer supports relocation when auto-reset is enabled") {
+    val ser = new KryoSerializer(new SparkConf)
+    assert(ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
+    testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
+  }
+
+  test("KryoSerializer does not support relocation when auto-reset is disabled") {
+    val conf = new SparkConf().set("spark.kryo.registrator",
+      classOf[RegistratorWithoutAutoReset].getName)
+    val ser = new KryoSerializer(conf)
+    assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
+    testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
+  }
+
+}
+
+object SerializerPropertiesSuite extends Assertions {
+
+  def generateRandomItem(rand: Random): Any = {
+    val randomFunctions: Seq[() => Any] = Seq(
+      () => rand.nextInt(),
+      () => rand.nextString(rand.nextInt(10)),
+      () => rand.nextDouble(),
+      () => rand.nextBoolean(),
+      () => (rand.nextInt(), rand.nextString(rand.nextInt(10))),
+      () => MyCaseClass(rand.nextInt(), rand.nextString(rand.nextInt(10))),
+      () => {
+        val x = MyCaseClass(rand.nextInt(), rand.nextString(rand.nextInt(10)))
+        (x, x)
+      }
+    )
+    randomFunctions(rand.nextInt(randomFunctions.size)).apply()
+  }
+
+  def testSupportsRelocationOfSerializedObjects(
+      serializer: Serializer,
+      generateRandomItem: Random => Any): Unit = {
+    if (!serializer.supportsRelocationOfSerializedObjects) {
+      return
+    }
+    val NUM_TRIALS = 5
+    val rand = new Random(42)
+    for (_ <- 1 to NUM_TRIALS) {
+      val items = {
+        // Make sure that we have duplicate occurrences of the same object in the stream:
+        val randomItems = Seq.fill(10)(generateRandomItem(rand))
+        randomItems ++ randomItems.take(5)
+      }
+      val baos = new ByteArrayOutputStream()
+      val serStream = serializer.newInstance().serializeStream(baos)
+      def serializeItem(item: Any): Array[Byte] = {
+        val itemStartOffset = baos.toByteArray.length
+        serStream.writeObject(item)
+        serStream.flush()
+        val itemEndOffset = baos.toByteArray.length
+        baos.toByteArray.slice(itemStartOffset, itemEndOffset).clone()
+      }
+      val itemsAndSerializedItems: Seq[(Any, Array[Byte])] = {
+        val serItems = items.map {
+          item => (item, serializeItem(item))
+        }
+        serStream.close()
+        rand.shuffle(serItems)
+      }
+      val reorderedSerializedData: Array[Byte] = itemsAndSerializedItems.flatMap(_._2).toArray
+      val deserializedItemsStream = serializer.newInstance().deserializeStream(
+        new ByteArrayInputStream(reorderedSerializedData))
+      assert(deserializedItemsStream.asIterator.toSeq === itemsAndSerializedItems.map(_._1))
+      deserializedItemsStream.close()
+    }
+  }
+}
+
+private case class MyCaseClass(foo: Int, bar: String)
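The JavaSerializer test above hinges on a detail of java.io.ObjectOutputStream noted on SPARK-4550: class descriptors (and repeated objects) are written in full only once and back-referenced afterwards, so an individual record's bytes are not self-contained. A standalone sketch of that effect, not part of the patch; `Point` and the demo object are illustrative:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

case class Point(x: Int, y: Int)

object JavaBackReferenceDemo {
  def main(args: Array[String]): Unit = {
    val baos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(baos)

    oos.writeObject(Point(1, 2))
    oos.flush()
    val firstEnd = baos.size()

    oos.writeObject(Point(3, 4))
    oos.flush()
    val secondEnd = baos.size()
    oos.close()

    // The second record is much smaller: it back-references the class
    // descriptor emitted with the first record, so its bytes cannot be
    // deserialized on their own or reordered ahead of the first record.
    println(s"first record: $firstEnd bytes, second record: ${secondEnd - firstEnd} bytes")
  }
}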

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala

Lines changed: 5 additions & 0 deletions
@@ -154,6 +154,11 @@ private[sql] class SparkSqlSerializer2(keySchema: Array[DataType], valueSchema:
   with Serializable{

   def newInstance(): SerializerInstance = new ShuffleSerializerInstance(keySchema, valueSchema)
+
+  override def supportsRelocationOfSerializedObjects: Boolean = {
+    // SparkSqlSerializer2 is stateless and writes no stream headers
+    true
+  }
 }

 private[sql] object SparkSqlSerializer2 {
