Skip to content

Commit c2048a5

Browse files
concretevitaminmarmbrus
authored andcommitted
[SPARK-2498] [SQL] Synchronize on a lock when using scala reflection inside data type objects.
JIRA ticket: https://issues.apache.org/jira/browse/SPARK-2498 Author: Zongheng Yang <[email protected]> Closes apache#1423 from concretevitamin/scala-ref-catalyst and squashes the following commits: 325a149 [Zongheng Yang] Synchronize on a lock when initializing data type objects in Catalyst.
1 parent 502f907 commit c2048a5

File tree

1 file changed

+19
-15
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types

1 file changed

+19
-15
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,20 @@ package org.apache.spark.sql.catalyst.types
1919

2020
import java.sql.Timestamp
2121

22-
import scala.util.parsing.combinator.RegexParsers
23-
2422
import scala.reflect.ClassTag
2523
import scala.reflect.runtime.universe.{typeTag, TypeTag, runtimeMirror}
24+
import scala.util.parsing.combinator.RegexParsers
2625

2726
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
2827
import org.apache.spark.util.Utils
2928

3029
/**
31-
*
30+
* A JVM-global lock that should be used to prevent thread safety issues when using things in
31+
* scala.reflect.*. Note that Scala Reflection API is made thread-safe in 2.11, but not yet for
32+
* 2.10.* builds. See SI-6240 for more details.
3233
*/
34+
protected[catalyst] object ScalaReflectionLock
35+
3336
object DataType extends RegexParsers {
3437
protected lazy val primitiveType: Parser[DataType] =
3538
"StringType" ^^^ StringType |
@@ -62,7 +65,6 @@ object DataType extends RegexParsers {
6265
"true" ^^^ true |
6366
"false" ^^^ false
6467

65-
6668
protected lazy val structType: Parser[DataType] =
6769
"StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
6870
case fields => new StructType(fields)
@@ -106,30 +108,32 @@ abstract class NativeType extends DataType {
106108
@transient val tag: TypeTag[JvmType]
107109
val ordering: Ordering[JvmType]
108110

109-
@transient val classTag = {
111+
@transient val classTag = ScalaReflectionLock.synchronized {
110112
val mirror = runtimeMirror(Utils.getSparkClassLoader)
111113
ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
112114
}
113115
}
114116

115117
case object StringType extends NativeType with PrimitiveType {
116118
type JvmType = String
117-
@transient lazy val tag = typeTag[JvmType]
119+
@transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
118120
val ordering = implicitly[Ordering[JvmType]]
119121
}
122+
120123
case object BinaryType extends DataType with PrimitiveType {
121124
type JvmType = Array[Byte]
122125
}
126+
123127
case object BooleanType extends NativeType with PrimitiveType {
124128
type JvmType = Boolean
125-
@transient lazy val tag = typeTag[JvmType]
129+
@transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
126130
val ordering = implicitly[Ordering[JvmType]]
127131
}
128132

129133
case object TimestampType extends NativeType {
130134
type JvmType = Timestamp
131135

132-
@transient lazy val tag = typeTag[JvmType]
136+
@transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
133137

134138
val ordering = new Ordering[JvmType] {
135139
def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
@@ -159,31 +163,31 @@ abstract class IntegralType extends NumericType {
159163

160164
case object LongType extends IntegralType {
161165
type JvmType = Long
162-
@transient lazy val tag = typeTag[JvmType]
166+
@transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
163167
val numeric = implicitly[Numeric[Long]]
164168
val integral = implicitly[Integral[Long]]
165169
val ordering = implicitly[Ordering[JvmType]]
166170
}
167171

168172
case object IntegerType extends IntegralType {
169173
type JvmType = Int
170-
@transient lazy val tag = typeTag[JvmType]
174+
@transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
171175
val numeric = implicitly[Numeric[Int]]
172176
val integral = implicitly[Integral[Int]]
173177
val ordering = implicitly[Ordering[JvmType]]
174178
}
175179

176180
case object ShortType extends IntegralType {
177181
type JvmType = Short
178-
@transient lazy val tag = typeTag[JvmType]
182+
@transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
179183
val numeric = implicitly[Numeric[Short]]
180184
val integral = implicitly[Integral[Short]]
181185
val ordering = implicitly[Ordering[JvmType]]
182186
}
183187

184188
case object ByteType extends IntegralType {
185189
type JvmType = Byte
186-
@transient lazy val tag = typeTag[JvmType]
190+
@transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
187191
val numeric = implicitly[Numeric[Byte]]
188192
val integral = implicitly[Integral[Byte]]
189193
val ordering = implicitly[Ordering[JvmType]]
@@ -202,23 +206,23 @@ abstract class FractionalType extends NumericType {
202206

203207
case object DecimalType extends FractionalType {
204208
type JvmType = BigDecimal
205-
@transient lazy val tag = typeTag[JvmType]
209+
@transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
206210
val numeric = implicitly[Numeric[BigDecimal]]
207211
val fractional = implicitly[Fractional[BigDecimal]]
208212
val ordering = implicitly[Ordering[JvmType]]
209213
}
210214

211215
case object DoubleType extends FractionalType {
212216
type JvmType = Double
213-
@transient lazy val tag = typeTag[JvmType]
217+
@transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
214218
val numeric = implicitly[Numeric[Double]]
215219
val fractional = implicitly[Fractional[Double]]
216220
val ordering = implicitly[Ordering[JvmType]]
217221
}
218222

219223
case object FloatType extends FractionalType {
220224
type JvmType = Float
221-
@transient lazy val tag = typeTag[JvmType]
225+
@transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
222226
val numeric = implicitly[Numeric[Float]]
223227
val fractional = implicitly[Fractional[Float]]
224228
val ordering = implicitly[Ordering[JvmType]]

0 commit comments

Comments
 (0)