From 60634f9f54fd2bee1a242544f992d19eb2756c48 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 26 Mar 2015 16:16:59 -0700 Subject: [PATCH 1/4] [SPARK-6405] Limiting the maximum Kryo buffer size to be 2GB. Kryo buffers are backed by byte arrays, but primitive arrays can only be up to 2GB in size. It is misleading to allow users to set buffers past this size. --- .../apache/spark/serializer/KryoSerializer.scala | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index f83bcaa5cc09..97a3fe5f46d5 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -49,10 +49,20 @@ class KryoSerializer(conf: SparkConf) with Logging with Serializable { - private val bufferSize = - (conf.getDouble("spark.kryoserializer.buffer.mb", 0.064) * 1024 * 1024).toInt + private val bufferSizeMb: Double = conf.getDouble("spark.kryoserializer.buffer.mb", 0.064) + if (bufferSizeMb > 2048) { + throw new IllegalArgumentException("spark.kryoserializer.buffer.mb must be less than " + + s"2048 megabytes, got: + $bufferSizeMb mb.") + } + private val bufferSize = (bufferSizeMb * 1024 * 1024).toInt + + val maxBufferSizeMb: Int = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) + if (maxBufferSizeMb > 2048) { + throw new IllegalArgumentException("spark.kryoserializer.buffer.max.mb must be less than " + + s"2048 megabytes, got: + $maxBufferSizeMb mb.") + } + private val maxBufferSize = maxBufferSizeMb * 1024 * 1024 - private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024 private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false) private val userRegistrator = conf.getOption("spark.kryo.registrator") From 09fd80b2782be1ad38c1df63cdc3b501063e98a7 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 26 Mar 2015 16:28:14 -0700 Subject: [PATCH 2/4] Should be >= not >. Slightly more consistent error message. --- .../org/apache/spark/serializer/KryoSerializer.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 97a3fe5f46d5..6b39d05c6621 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -50,16 +50,16 @@ class KryoSerializer(conf: SparkConf) with Serializable { private val bufferSizeMb: Double = conf.getDouble("spark.kryoserializer.buffer.mb", 0.064) - if (bufferSizeMb > 2048) { + if (bufferSizeMb >= 2048) { throw new IllegalArgumentException("spark.kryoserializer.buffer.mb must be less than " + - s"2048 megabytes, got: + $bufferSizeMb mb.") + s"2048 mb, got: + $bufferSizeMb mb.") } private val bufferSize = (bufferSizeMb * 1024 * 1024).toInt val maxBufferSizeMb: Int = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) - if (maxBufferSizeMb > 2048) { + if (maxBufferSizeMb >= 2048) { throw new IllegalArgumentException("spark.kryoserializer.buffer.max.mb must be less than " + - s"2048 megabytes, got: + $maxBufferSizeMb mb.") + s"2048 mb, got: + $maxBufferSizeMb mb.") } private val maxBufferSize = maxBufferSizeMb * 1024 * 1024 From e2e30ce2fa98d6aa8fcd05b12ca08de4064a05cd Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 26 Mar 2015 20:18:20 -0700 Subject: [PATCH 3/4] Removing explicit int and double type to match style --- .../scala/org/apache/spark/serializer/KryoSerializer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 6b39d05c6621..f52ba7d71517 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -49,19 +49,19 @@ class KryoSerializer(conf: SparkConf) with Logging with Serializable { - private val bufferSizeMb: Double = conf.getDouble("spark.kryoserializer.buffer.mb", 0.064) + private val bufferSizeMb = conf.getDouble("spark.kryoserializer.buffer.mb", 0.064) if (bufferSizeMb >= 2048) { throw new IllegalArgumentException("spark.kryoserializer.buffer.mb must be less than " + s"2048 mb, got: + $bufferSizeMb mb.") } private val bufferSize = (bufferSizeMb * 1024 * 1024).toInt - val maxBufferSizeMb: Int = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) + val maxBufferSizeMb = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) if (maxBufferSizeMb >= 2048) { throw new IllegalArgumentException("spark.kryoserializer.buffer.max.mb must be less than " + s"2048 mb, got: + $maxBufferSizeMb mb.") } - private val maxBufferSize = maxBufferSizeMb * 1024 * 1024 + private val maxBufferSize = maxBufferSizeMb * 1026 * 1024 private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false) From 1d6d1be9ef335c51b477d8c2f661a72cd1c82f73 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 26 Mar 2015 20:39:29 -0700 Subject: [PATCH 4/4] Fixing numeric typo --- .../main/scala/org/apache/spark/serializer/KryoSerializer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index f52ba7d71517..579fb6624e69 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -61,7 +61,7 @@ class KryoSerializer(conf: SparkConf) throw new IllegalArgumentException("spark.kryoserializer.buffer.max.mb must be less than " + s"2048 mb, got: + $maxBufferSizeMb mb.") } - private val maxBufferSize = maxBufferSizeMb * 1026 * 1024 + private val maxBufferSize = maxBufferSizeMb * 1024 * 1024 private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)