From 392975d3b5c48bc61bfa6caff7bfd23b9e095cde Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 23 Oct 2015 15:59:17 +0800 Subject: [PATCH 1/5] Use Spark BitSet instead of RoaringBitmap to reduce memory usage. --- .../apache/spark/scheduler/MapStatus.scala | 13 +++-- .../apache/spark/util/collection/BitSet.scala | 26 ++++++++-- .../spark/util/collection/BitSetSuite.scala | 49 +++++++++++++++++++ 3 files changed, 78 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 1efce124c0a6..d252332c0601 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -19,9 +19,8 @@ package org.apache.spark.scheduler import java.io.{Externalizable, ObjectInput, ObjectOutput} -import org.roaringbitmap.RoaringBitmap - import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util.collection.BitSet import org.apache.spark.util.Utils /** @@ -133,7 +132,7 @@ private[spark] class CompressedMapStatus( private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, private[this] var numNonEmptyBlocks: Int, - private[this] var emptyBlocks: RoaringBitmap, + private[this] var emptyBlocks: BitSet, private[this] var avgSize: Long) extends MapStatus with Externalizable { @@ -146,7 +145,7 @@ private[spark] class HighlyCompressedMapStatus private ( override def location: BlockManagerId = loc override def getSizeForBlock(reduceId: Int): Long = { - if (emptyBlocks.contains(reduceId)) { + if (emptyBlocks.get(reduceId)) { 0 } else { avgSize @@ -161,7 +160,7 @@ private[spark] class HighlyCompressedMapStatus private ( override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) - emptyBlocks = new RoaringBitmap() + emptyBlocks = new BitSet(0) emptyBlocks.readExternal(in) avgSize = in.readLong() } @@ -177,15 +176,15 @@ private[spark] object HighlyCompressedMapStatus { // From a compression standpoint, it shouldn't matter whether we track empty or non-empty // blocks. From a performance standpoint, we benefit from tracking empty blocks because // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions. - val emptyBlocks = new RoaringBitmap() val totalNumBlocks = uncompressedSizes.length + val emptyBlocks = new BitSet(totalNumBlocks) while (i < totalNumBlocks) { var size = uncompressedSizes(i) if (size > 0) { numNonEmptyBlocks += 1 totalSize += size } else { - emptyBlocks.add(i) + emptyBlocks.set(i) } i += 1 } diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala index 7ab67fc3a2de..81fdcdaab9da 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala @@ -17,14 +17,19 @@ package org.apache.spark.util.collection +import java.io.{Externalizable, ObjectInput, ObjectOutput} + +import org.apache.spark.util.{Utils => UUtils} + + /** * A simple, fixed-size bit set implementation. This implementation is fast because it avoids * safety/bound checking. 
*/ -class BitSet(numBits: Int) extends Serializable { +class BitSet(private[this] var numBits: Int) extends Externalizable { - private val words = new Array[Long](bit2words(numBits)) - private val numWords = words.length + private var words = new Array[Long](bit2words(numBits)) + private def numWords = words.length /** * Compute the capacity (number of bits) that can be represented @@ -230,4 +235,19 @@ class BitSet(numBits: Int) extends Serializable { /** Return the number of longs it would take to hold numBits. */ private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1 + + override def writeExternal(out: ObjectOutput): Unit = UUtils.tryOrIOException { + out.writeInt(numBits) + words.foreach(out.writeLong(_)) + } + + override def readExternal(in: ObjectInput): Unit = UUtils.tryOrIOException { + numBits = in.readInt() + words = new Array[Long](bit2words(numBits)) + var index = 0 + while (index < words.length) { + words(index) = in.readLong() + index += 1 + } + } } diff --git a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala index 69dbfa9cd714..b0db0988eeaa 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala @@ -17,7 +17,10 @@ package org.apache.spark.util.collection +import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream} + import org.apache.spark.SparkFunSuite +import org.apache.spark.util.{Utils => UUtils} class BitSetSuite extends SparkFunSuite { @@ -152,4 +155,50 @@ class BitSetSuite extends SparkFunSuite { assert(bitsetDiff.nextSetBit(85) === 85) assert(bitsetDiff.nextSetBit(86) === -1) } + + test("read and write externally") { + val tempDir = UUtils.createTempDir() + val outputFile = File.createTempFile("bits", null, tempDir) + + val fos = new FileOutputStream(outputFile) + val oos = new ObjectOutputStream(fos) + + // Create BitSet + val setBits = Seq(0, 9, 1, 10, 90, 96) + val bitset = new BitSet(100) + + for (i <- 0 until 100) { + assert(!bitset.get(i)) + } + + setBits.foreach(i => bitset.set(i)) + + for (i <- 0 until 100) { + if (setBits.contains(i)) { + assert(bitset.get(i)) + } else { + assert(!bitset.get(i)) + } + } + assert(bitset.cardinality() === setBits.size) + + bitset.writeExternal(oos) + oos.close() + + val fis = new FileInputStream(outputFile) + val ois = new ObjectInputStream(fis) + + // Read BitSet from the file + val bitset2 = new BitSet(0) + bitset2.readExternal(ois) + + for (i <- 0 until 100) { + if (setBits.contains(i)) { + assert(bitset2.get(i)) + } else { + assert(!bitset2.get(i)) + } + } + assert(bitset2.cardinality() === setBits.size) + } } From d30ec974401bbf09ecea3f825faa5f55b3ccfcf3 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 23 Oct 2015 21:59:20 +0800 Subject: [PATCH 2/5] Remove RoaringBitmap from KryoSerializer and pom. 
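
RoaringBitmap and its container classes were on the Kryo registration list
so that runs with spark.kryo.registrationRequired=true could serialize
HighlyCompressedMapStatus; after the previous patch that status no longer
references them, so both the registrations and the RoaringBitmap entries in
core/pom.xml and the root pom.xml can be dropped. A minimal standalone
sketch of the behaviour the registration list guards against (illustrative
only, not part of this patch; Unregistered and RegistrationSketch are
made-up names):

    import java.io.ByteArrayOutputStream

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.Output

    class Unregistered(val x: Int)

    object RegistrationSketch {
      def main(args: Array[String]): Unit = {
        val kryo = new Kryo()
        // Mirrors spark.kryo.registrationRequired=true: every class that is
        // serialized must have been registered up front.
        kryo.setRegistrationRequired(true)
        val out = new Output(new ByteArrayOutputStream())
        try {
          kryo.writeObject(out, new Unregistered(1))
        } catch {
          // Kryo rejects classes that were never registered.
          case e: IllegalArgumentException => println(s"rejected: ${e.getMessage}")
        }
      }
    }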
--- core/pom.xml | 4 ---- .../scala/org/apache/spark/serializer/KryoSerializer.scala | 7 ------- pom.xml | 5 ----- 3 files changed, 16 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 319a50049a82..1b6b13517bd5 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -173,10 +173,6 @@ net.jpountz.lz4 lz4 - - org.roaringbitmap - RoaringBitmap - commons-net commons-net diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index c5195c1143a8..73f81e0fc772 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -30,7 +30,6 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} import org.apache.avro.generic.{GenericData, GenericRecord} -import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap} import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast @@ -363,12 +362,6 @@ private[serializer] object KryoSerializer { classOf[StorageLevel], classOf[CompressedMapStatus], classOf[HighlyCompressedMapStatus], - classOf[RoaringBitmap], - classOf[RoaringArray], - classOf[RoaringArray.Element], - classOf[Array[RoaringArray.Element]], - classOf[ArrayContainer], - classOf[BitmapContainer], classOf[CompactBuffer[_]], classOf[BlockManagerId], classOf[Array[Byte]], diff --git a/pom.xml b/pom.xml index 445e65c0459b..05981205a734 100644 --- a/pom.xml +++ b/pom.xml @@ -623,11 +623,6 @@ - - org.roaringbitmap - RoaringBitmap - 0.4.5 - commons-net commons-net From e981fc9a5de8bf14cad153995eabd5feeace3098 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 24 Oct 2015 19:00:31 +0800 Subject: [PATCH 3/5] Add BitSet to KryoSerializer. --- .../scala/org/apache/spark/serializer/KryoSerializer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 73f81e0fc772..bc51d4f2820c 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -38,7 +38,7 @@ import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} import org.apache.spark.storage._ import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf} -import org.apache.spark.util.collection.CompactBuffer +import org.apache.spark.util.collection.{BitSet, CompactBuffer} /** * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]]. @@ -362,6 +362,7 @@ private[serializer] object KryoSerializer { classOf[StorageLevel], classOf[CompressedMapStatus], classOf[HighlyCompressedMapStatus], + classOf[BitSet], classOf[CompactBuffer[_]], classOf[BlockManagerId], classOf[Array[Byte]], From 37f85ceca6d4c7b8f77a7dd7e4886243270ac660 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 31 Oct 2015 08:16:11 +0800 Subject: [PATCH 4/5] Fix the test getting stuck by adding default constructor. Remove comment mentioning roaring bitmaps in KryoSerializerSuite. 
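
The root cause is java.io.Externalizable's contract: deserialization
instantiates the class through its public no-arg constructor and only then
calls readExternal(), so a BitSet without a no-arg "def this()" cannot be
rebuilt on the receiving side. A standalone sketch of the contract under
plain Java serialization (illustrative only; Box and RoundTrip are made-up
names, and the Kryo failure mode seen in the stuck test is not reproduced
here):

    import java.io._

    class Box(private var n: Int) extends Externalizable {
      // Required by the Externalizable contract; without it, plain Java
      // deserialization fails with "java.io.InvalidClassException:
      // no valid constructor".
      def this() = this(0)
      def value: Int = n
      override def writeExternal(out: ObjectOutput): Unit = out.writeInt(n)
      override def readExternal(in: ObjectInput): Unit = { n = in.readInt() }
    }

    object RoundTrip {
      def main(args: Array[String]): Unit = {
        val bytes = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bytes)
        oos.writeObject(new Box(42))
        oos.close()
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        assert(ois.readObject().asInstanceOf[Box].value == 42)
        ois.close()
      }
    }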
--- .../scala/org/apache/spark/util/collection/BitSet.scala | 2 ++ .../org/apache/spark/serializer/KryoSerializerSuite.scala | 6 ------ 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala index 81fdcdaab9da..85c5bdbfcebc 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala @@ -31,6 +31,8 @@ class BitSet(private[this] var numBits: Int) extends Externalizable { private var words = new Array[Long](bit2words(numBits)) private def numWords = words.length + def this() = this(0) + /** * Compute the capacity (number of bits) that can be represented * by this bitset. diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index e428414cf6e8..afe2e80358ca 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -322,12 +322,6 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { val conf = new SparkConf(false) conf.set("spark.kryo.registrationRequired", "true") - // these cases require knowing the internals of RoaringBitmap a little. Blocks span 2^16 - // values, and they use a bitmap (dense) if they have more than 4096 values, and an - // array (sparse) if they use less. So we just create two cases, one sparse and one dense. - // and we use a roaring bitmap for the empty blocks, so we trigger the dense case w/ mostly - // empty blocks - val ser = new KryoSerializer(conf).newInstance() val denseBlockSizes = new Array[Long](5000) val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L) From a6ea5fcba2e9dd0aacdc9f0353c1512e612bc091 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 2 Nov 2015 12:50:40 +0800 Subject: [PATCH 5/5] Use default constructor. --- core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index d252332c0601..180c8d1827e1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -160,7 +160,7 @@ private[spark] class HighlyCompressedMapStatus private ( override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) - emptyBlocks = new BitSet(0) + emptyBlocks = new BitSet emptyBlocks.readExternal(in) avgSize = in.readLong() }
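
For reference, a condensed sketch of the bookkeeping this series ends up
with in HighlyCompressedMapStatus.apply (illustrative only, simplified from
the patched code above; MapStatusSketch is a made-up wrapper): empty blocks
are tracked in a fixed-size BitSet, a plain Array[Long] underneath instead
of RoaringBitmap's container tree, and every non-empty block is reported at
the average size of the non-empty blocks.

    import org.apache.spark.util.collection.BitSet

    object MapStatusSketch {
      // Returns the set of empty block ids plus the average size to report
      // for the non-empty ones, as in HighlyCompressedMapStatus.apply.
      def compress(uncompressedSizes: Array[Long]): (BitSet, Long) = {
        val totalNumBlocks = uncompressedSizes.length
        val emptyBlocks = new BitSet(totalNumBlocks)
        var numNonEmptyBlocks = 0
        var totalSize = 0L
        var i = 0
        while (i < totalNumBlocks) {
          val size = uncompressedSizes(i)
          if (size > 0) {
            numNonEmptyBlocks += 1
            totalSize += size
          } else {
            emptyBlocks.set(i)
          }
          i += 1
        }
        val avgSize = if (numNonEmptyBlocks > 0) totalSize / numNonEmptyBlocks else 0L
        (emptyBlocks, avgSize)
      }
    }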