Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,6 @@
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
Expand Down
13 changes: 6 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ package org.apache.spark.scheduler

import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.roaringbitmap.RoaringBitmap

import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -133,7 +132,7 @@ private[spark] class CompressedMapStatus(
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var emptyBlocks: BitSet,
private[this] var avgSize: Long)
extends MapStatus with Externalizable {

Expand All @@ -146,7 +145,7 @@ private[spark] class HighlyCompressedMapStatus private (
override def location: BlockManagerId = loc

override def getSizeForBlock(reduceId: Int): Long = {
if (emptyBlocks.contains(reduceId)) {
if (emptyBlocks.get(reduceId)) {
0
} else {
avgSize
Expand All @@ -161,7 +160,7 @@ private[spark] class HighlyCompressedMapStatus private (

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
emptyBlocks = new RoaringBitmap()
emptyBlocks = new BitSet(0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except that I'd make this a call to the no-arg constructor. It's almost a plus, in that it verifies it exists for exactly the purpose it's needed.

Any objections? i'll merge soon anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated it.

emptyBlocks.readExternal(in)
avgSize = in.readLong()
}
Expand All @@ -177,15 +176,15 @@ private[spark] object HighlyCompressedMapStatus {
// From a compression standpoint, it shouldn't matter whether we track empty or non-empty
// blocks. From a performance standpoint, we benefit from tracking empty blocks because
// we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
val emptyBlocks = new RoaringBitmap()
val totalNumBlocks = uncompressedSizes.length
val emptyBlocks = new BitSet(totalNumBlocks)
while (i < totalNumBlocks) {
var size = uncompressedSizes(i)
if (size > 0) {
numNonEmptyBlocks += 1
totalSize += size
} else {
emptyBlocks.add(i)
emptyBlocks.set(i)
}
i += 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}

import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
Expand All @@ -39,7 +38,7 @@ import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.util.collection.{BitSet, CompactBuffer}

/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
Expand Down Expand Up @@ -363,12 +362,7 @@ private[serializer] object KryoSerializer {
classOf[StorageLevel],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
classOf[RoaringBitmap],
classOf[RoaringArray],
classOf[RoaringArray.Element],
classOf[Array[RoaringArray.Element]],
classOf[ArrayContainer],
classOf[BitmapContainer],
classOf[BitSet],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Byte]],
Expand Down
28 changes: 25 additions & 3 deletions core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@

package org.apache.spark.util.collection

import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.apache.spark.util.{Utils => UUtils}


/**
* A simple, fixed-size bit set implementation. This implementation is fast because it avoids
* safety/bound checking.
*/
class BitSet(numBits: Int) extends Serializable {
class BitSet(private[this] var numBits: Int) extends Externalizable {

private val words = new Array[Long](bit2words(numBits))
private val numWords = words.length
private var words = new Array[Long](bit2words(numBits))
private def numWords = words.length

def this() = this(0)

/**
* Compute the capacity (number of bits) that can be represented
Expand Down Expand Up @@ -230,4 +237,19 @@ class BitSet(numBits: Int) extends Serializable {

/** Return the number of longs it would take to hold numBits. */
private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1

override def writeExternal(out: ObjectOutput): Unit = UUtils.tryOrIOException {
out.writeInt(numBits)
words.foreach(out.writeLong(_))
}

override def readExternal(in: ObjectInput): Unit = UUtils.tryOrIOException {
numBits = in.readInt()
words = new Array[Long](bit2words(numBits))
var index = 0
while (index < words.length) {
words(index) = in.readLong()
index += 1
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,6 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")

// these cases require knowing the internals of RoaringBitmap a little. Blocks span 2^16
// values, and they use a bitmap (dense) if they have more than 4096 values, and an
// array (sparse) if they use less. So we just create two cases, one sparse and one dense.
// and we use a roaring bitmap for the empty blocks, so we trigger the dense case w/ mostly
// empty blocks

val ser = new KryoSerializer(conf).newInstance()
val denseBlockSizes = new Array[Long](5000)
val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.spark.util.collection

import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.SparkFunSuite
import org.apache.spark.util.{Utils => UUtils}

class BitSetSuite extends SparkFunSuite {

Expand Down Expand Up @@ -152,4 +155,50 @@ class BitSetSuite extends SparkFunSuite {
assert(bitsetDiff.nextSetBit(85) === 85)
assert(bitsetDiff.nextSetBit(86) === -1)
}

test("read and write externally") {
val tempDir = UUtils.createTempDir()
val outputFile = File.createTempFile("bits", null, tempDir)

val fos = new FileOutputStream(outputFile)
val oos = new ObjectOutputStream(fos)

// Create BitSet
val setBits = Seq(0, 9, 1, 10, 90, 96)
val bitset = new BitSet(100)

for (i <- 0 until 100) {
assert(!bitset.get(i))
}

setBits.foreach(i => bitset.set(i))

for (i <- 0 until 100) {
if (setBits.contains(i)) {
assert(bitset.get(i))
} else {
assert(!bitset.get(i))
}
}
assert(bitset.cardinality() === setBits.size)

bitset.writeExternal(oos)
oos.close()

val fis = new FileInputStream(outputFile)
val ois = new ObjectInputStream(fis)

// Read BitSet from the file
val bitset2 = new BitSet(0)
bitset2.readExternal(ois)

for (i <- 0 until 100) {
if (setBits.contains(i)) {
assert(bitset2.get(i))
} else {
assert(!bitset2.get(i))
}
}
assert(bitset2.cardinality() === setBits.size)
}
}
5 changes: 0 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -623,11 +623,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.4.5</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
Expand Down