Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,54 +28,30 @@ import scala.reflect.ClassTag
* some keys to have very few elements.
*/
private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable {
// First two elements
private var element0: T = _
private var element1: T = _

// Number of elements, including our two in the main object
private var curSize = 0

// Array for extra elements
private var otherElements: Array[T] = null
private var otherElements: Array[T] = new Array[T](2)

def apply(position: Int): T = {
if (position < 0 || position >= curSize) {
throw new IndexOutOfBoundsException
}
if (position == 0) {
element0
} else if (position == 1) {
element1
} else {
otherElements(position - 2)
}
otherElements(position)
}

private def update(position: Int, value: T): Unit = {
if (position < 0 || position >= curSize) {
throw new IndexOutOfBoundsException
}
if (position == 0) {
element0 = value
} else if (position == 1) {
element1 = value
} else {
otherElements(position - 2) = value
}
otherElements(position) = value
}

def += (value: T): CompactBuffer[T] = {
val newIndex = curSize
if (newIndex == 0) {
element0 = value
curSize = 1
} else if (newIndex == 1) {
element1 = value
curSize = 2
} else {
growToSize(curSize + 1)
otherElements(newIndex - 2) = value
}
growToSize(curSize + 1)
otherElements(newIndex) = value
this
}

Expand All @@ -88,19 +64,7 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
val itsSize = compactBuf.curSize
val itsElements = compactBuf.otherElements
growToSize(curSize + itsSize)
if (itsSize == 1) {
this(oldSize) = compactBuf.element0
} else if (itsSize == 2) {
this(oldSize) = compactBuf.element0
this(oldSize + 1) = compactBuf.element1
} else if (itsSize > 2) {
this(oldSize) = compactBuf.element0
this(oldSize + 1) = compactBuf.element1
// At this point our size is also above 2, so just copy its array directly into ours.
// Note that since we added two elements above, the index in this.otherElements that we
// should copy to is oldSize.
System.arraycopy(itsElements, 0, otherElements, oldSize, itsSize - 2)
}
System.arraycopy(itsElements, 0, otherElements, oldSize, itsSize)

case _ =>
values.foreach(e => this += e)
Expand Down Expand Up @@ -129,22 +93,20 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
if (newSize < 0) {
throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements")
}
val capacity = if (otherElements != null) otherElements.length + 2 else 2
val capacity = otherElements.length
if (newSize > capacity) {
var newArrayLen = 8
while (newSize - 2 > newArrayLen) {
while (newSize > newArrayLen) {
newArrayLen *= 2
if (newArrayLen == Int.MinValue) {
// Prevent overflow if we double from 2^30 to 2^31, which will become Int.MinValue.
// Note that we set the new array length to Int.MaxValue - 2 so that our capacity
// calculation above still gives a positive integer.
newArrayLen = Int.MaxValue - 2
newArrayLen = Int.MaxValue
}
}
val newArray = new Array[T](newArrayLen)
if (otherElements != null) {
System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
}
System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
otherElements = newArray
}
curSize = newSize
Expand Down