diff --git a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala index 4d43d8d5cc8d..79e2818c9502 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala @@ -28,54 +28,30 @@ import scala.reflect.ClassTag * some keys to have very few elements. */ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable { - // First two elements - private var element0: T = _ - private var element1: T = _ - // Number of elements, including our two in the main object private var curSize = 0 // Array for extra elements - private var otherElements: Array[T] = null + private var otherElements: Array[T] = new Array[T](2) def apply(position: Int): T = { if (position < 0 || position >= curSize) { throw new IndexOutOfBoundsException } - if (position == 0) { - element0 - } else if (position == 1) { - element1 - } else { - otherElements(position - 2) - } + otherElements(position) } private def update(position: Int, value: T): Unit = { if (position < 0 || position >= curSize) { throw new IndexOutOfBoundsException } - if (position == 0) { - element0 = value - } else if (position == 1) { - element1 = value - } else { - otherElements(position - 2) = value - } + otherElements(position) = value } def += (value: T): CompactBuffer[T] = { val newIndex = curSize - if (newIndex == 0) { - element0 = value - curSize = 1 - } else if (newIndex == 1) { - element1 = value - curSize = 2 - } else { - growToSize(curSize + 1) - otherElements(newIndex - 2) = value - } + growToSize(curSize + 1) + otherElements(newIndex) = value this } @@ -88,19 +64,7 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable val itsSize = compactBuf.curSize val itsElements = compactBuf.otherElements growToSize(curSize + itsSize) - if (itsSize == 1) { - this(oldSize) = compactBuf.element0 - } else if (itsSize == 2) { - this(oldSize) = compactBuf.element0 - this(oldSize + 1) = compactBuf.element1 - } else if (itsSize > 2) { - this(oldSize) = compactBuf.element0 - this(oldSize + 1) = compactBuf.element1 - // At this point our size is also above 2, so just copy its array directly into ours. - // Note that since we added two elements above, the index in this.otherElements that we - // should copy to is oldSize. - System.arraycopy(itsElements, 0, otherElements, oldSize, itsSize - 2) - } + System.arraycopy(itsElements, 0, otherElements, oldSize, itsSize) case _ => values.foreach(e => this += e) @@ -129,22 +93,20 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable if (newSize < 0) { throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements") } - val capacity = if (otherElements != null) otherElements.length + 2 else 2 + val capacity = otherElements.length if (newSize > capacity) { var newArrayLen = 8 - while (newSize - 2 > newArrayLen) { + while (newSize > newArrayLen) { newArrayLen *= 2 if (newArrayLen == Int.MinValue) { // Prevent overflow if we double from 2^30 to 2^31, which will become Int.MinValue. // Note that we set the new array length to Int.MaxValue - 2 so that our capacity // calculation above still gives a positive integer. - newArrayLen = Int.MaxValue - 2 + newArrayLen = Int.MaxValue } } val newArray = new Array[T](newArrayLen) - if (otherElements != null) { - System.arraycopy(otherElements, 0, newArray, 0, otherElements.length) - } + System.arraycopy(otherElements, 0, newArray, 0, otherElements.length) otherElements = newArray } curSize = newSize