@@ -39,7 +39,7 @@ public final class LongArray {
   private final long length;
 
   public LongArray(MemoryBlock memory) {
-    assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size > 4 billion elements";
+    assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
     this.memory = memory;
     this.baseObj = memory.getBaseObject();
     this.baseOffset = memory.getBaseOffset();
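
[Note] On the corrected message: each long occupies 8 bytes, so the guard memory.size() < (long) Integer.MAX_VALUE * 8 caps the element count at Integer.MAX_VALUE (about 2.1 billion), not 4 billion. A minimal sketch of that arithmetic (the class name is illustrative, not part of the patch):

    public class LongArrayCapSketch {
      public static void main(String[] args) {
        long maxBytes = (long) Integer.MAX_VALUE * 8; // byte bound used by the assert
        long maxElements = maxBytes / 8;              // 8 bytes per long element
        System.out.println(maxElements == Integer.MAX_VALUE); // true: cap is ~2^31 elements, not 4 billion
      }
    }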
@@ -30,11 +30,17 @@ public interface HashMapGrowthStrategy {
   HashMapGrowthStrategy DOUBLING = new Doubling();
 
   class Doubling implements HashMapGrowthStrategy {
+
+    // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+    // smaller. Be conservative and lower the cap a little.
+    private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+
     @Override
     public int nextCapacity(int currentCapacity) {
       assert (currentCapacity > 0);
+      int doubleCapacity = currentCapacity * 2;
       // Guard against overflow
-      return (currentCapacity * 2 > 0) ? (currentCapacity * 2) : Integer.MAX_VALUE;
+      return (doubleCapacity > 0 && doubleCapacity <= ARRAY_MAX) ? doubleCapacity : ARRAY_MAX;
     }
   }
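
[Note] A standalone sketch of the capped doubling above, showing the behavior near the cap (the harness class is illustrative; ARRAY_MAX mirrors the constant added by the patch):

    public class DoublingSketch {
      private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;

      static int nextCapacity(int currentCapacity) {
        assert currentCapacity > 0;
        int doubleCapacity = currentCapacity * 2;
        // Clamp to ARRAY_MAX if doubling overflows (goes negative) or exceeds the cap.
        return (doubleCapacity > 0 && doubleCapacity <= ARRAY_MAX) ? doubleCapacity : ARRAY_MAX;
      }

      public static void main(String[] args) {
        System.out.println(nextCapacity(64));        // 128
        System.out.println(nextCapacity(1 << 30));   // doubling overflows int -> ARRAY_MAX
        System.out.println(nextCapacity(ARRAY_MAX)); // stays clamped at ARRAY_MAX
      }
    }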

@@ -126,22 +126,22 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
 
   /** Increase our size to newSize and grow the backing array if needed. */
   private def growToSize(newSize: Int): Unit = {
-    if (newSize < 0) {
-      throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements")
+    // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+    // smaller. Be conservative and lower the cap a little.
+    val arrayMax = Int.MaxValue - 8
+    if (newSize < 0 || newSize - 2 > arrayMax) {
+      throw new UnsupportedOperationException(s"Can't grow buffer past $arrayMax elements")
     }
     val capacity = if (otherElements != null) otherElements.length + 2 else 2
     if (newSize > capacity) {
-      var newArrayLen = 8
+      var newArrayLen = 8L
       while (newSize - 2 > newArrayLen) {

[Review comment] I think we can remove - 2 now, since newArrayLen is a Long.

[@ascii766164696D, Sep 19, 2017] Ah, I see that it's reserved. It wasn't clear to me why - 2; it seems like a magic number to me, and I see it in other places too.

         newArrayLen *= 2
-        if (newArrayLen == Int.MinValue) {
-          // Prevent overflow if we double from 2^30 to 2^31, which will become Int.MinValue.
-          // Note that we set the new array length to Int.MaxValue - 2 so that our capacity
-          // calculation above still gives a positive integer.
-          newArrayLen = Int.MaxValue - 2
-        }
       }
-      val newArray = new Array[T](newArrayLen)
+      if (newArrayLen > arrayMax) {
+        newArrayLen = arrayMax
+      }
+      val newArray = new Array[T](newArrayLen.toInt)
       if (otherElements != null) {
         System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
       }
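
[Note] Why newArrayLen became a Long: with an Int, doubling from 2^30 lands exactly on Int.MinValue, which the old code had to special-case; as a Long, the loop can pass 2^31 harmlessly and a single clamp to arrayMax suffices. A minimal standalone sketch of the new growth calculation (class and method names are illustrative; the - 2 reflects the two elements CompactBuffer keeps in fields rather than in otherElements, per the thread above):

    public class GrowthSketch {
      private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;

      // newSize counts all elements; the first two live outside the backing array.
      static int newArrayLen(int newSize) {
        long len = 8L;                 // long: doubling past 2^31 cannot overflow
        while (newSize - 2 > len) {
          len *= 2;
        }
        if (len > ARRAY_MAX) {
          len = ARRAY_MAX;             // one clamp replaces the old Int.MinValue special case
        }
        return (int) len;
      }

      public static void main(String[] args) {
        System.out.println(newArrayLen(100));                   // 128
        System.out.println(newArrayLen(Integer.MAX_VALUE - 9)); // clamped to ARRAY_MAX
      }
    }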
@@ -25,7 +25,7 @@ import org.apache.spark.util.collection.WritablePartitionedPairCollection._
  * Append-only buffer of key-value pairs, each with a corresponding partition ID, that keeps track
  * of its estimated size in bytes.
  *
- * The buffer can support up to `1073741823 (2 ^ 30 - 1)` elements.
+ * The buffer can support up to 1073741819 elements.
  */
 private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
   extends WritablePartitionedPairCollection[K, V] with SizeTracker
@@ -59,7 +59,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
       throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")
     }
     val newCapacity =
-      if (capacity * 2 < 0 || capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
+      if (capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
         MAXIMUM_CAPACITY
       } else {
         capacity * 2
@@ -96,5 +96,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
 }
 
 private object PartitionedPairBuffer {
-  val MAXIMUM_CAPACITY = Int.MaxValue / 2 // 2 ^ 30 - 1
+  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+  // smaller. Be conservative and lower the cap a little.
+  val MAXIMUM_CAPACITY: Int = (Int.MaxValue - 8) / 2
 }
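
[Note] The new cap is consistent with the updated class doc above: each element takes two slots (key and value) in the single backing data array, so the element cap is half the safe array-length cap, and (Int.MaxValue - 8) / 2 = 1073741819, the figure now quoted in the doc comment. A small illustrative check (not from the patch):

    public class CapacityCheck {
      public static void main(String[] args) {
        int maximumCapacity = (Integer.MAX_VALUE - 8) / 2;
        System.out.println(maximumCapacity);      // 1073741819, matching the class doc
        System.out.println(2L * maximumCapacity); // 2147483638 slots, within Integer.MAX_VALUE - 8
      }
    }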
@@ -35,6 +35,11 @@
  * if the fields of row are all fixed-length, as the size of result row is also fixed.
  */
 public class BufferHolder {
+
+  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+  // smaller. Be conservative and lower the cap a little.
+  private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
[Member] Just curious how to get this value -8?

Also cc @liufengdb

[Member] Why not also fixing line 54?

[Review comment] @srowen I think we can use Integer.MAX_VALUE - 7 instead of Integer.MAX_VALUE - 8 to make the size align with words; otherwise, this check will fail: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L170.

This is the reason why all the size inputs to the methods are rounded; for example: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java#L216.

[Member Author] @gatorsmile have a look at the JIRA for some detail; you can see a similar limit in the JDK, for example: http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/ArrayList.java#l229

You are right; I think around line 54 it needs to be something straightforward like:

    long totalSize = initialSize + bitsetWidthInBytes + 8L * row.numFields();
    if (totalSize > ARRAY_MAX) { ...error... }
    this.buffer = new byte[(int) totalSize];

Yes, I agree with your new JIRA @liufengdb, though I think we'll need to go the other way, to Integer.MAX_VALUE - 15, where the value must be divisible by 8.
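
[Note] For concreteness on the alignment arithmetic: Integer.MAX_VALUE - 8 = 2147483639 is not a multiple of 8. Rounding up to the next multiple gives Integer.MAX_VALUE - 7 = 2147483640, which is word-aligned but sits above the conservative cap; rounding down gives Integer.MAX_VALUE - 15 = 2147483632, the largest multiple of 8 that stays under it, which is the direction the author suggests.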


   public byte[] buffer;
   public int cursor = Platform.BYTE_ARRAY_OFFSET;
   private final UnsafeRow row;
@@ -61,15 +66,15 @@ public BufferHolder(UnsafeRow row, int initialSize) {
    * Grows the buffer by at least neededSize and points the row to the buffer.
    */
   public void grow(int neededSize) {
-    if (neededSize > Integer.MAX_VALUE - totalSize()) {
+    if (neededSize > ARRAY_MAX - totalSize()) {
       throw new UnsupportedOperationException(
         "Cannot grow BufferHolder by size " + neededSize + " because the size after growing " +
-        "exceeds size limitation " + Integer.MAX_VALUE);
+        "exceeds size limitation " + ARRAY_MAX);
     }
     final int length = totalSize() + neededSize;
     if (buffer.length < length) {
       // This will not happen frequently, because the buffer is re-used.
-      int newLength = length < Integer.MAX_VALUE / 2 ? length * 2 : Integer.MAX_VALUE;
+      int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
       final byte[] tmp = new byte[newLength];
       Platform.copyMemory(
         buffer,
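
[Note] A condensed standalone sketch of the capped grow logic above (names simplified; used stands in for BufferHolder's totalSize()):

    public class GrowSketch {
      private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
      private byte[] buffer = new byte[64];
      private int used = 0; // stand-in for totalSize()

      void grow(int neededSize) {
        // Checking against ARRAY_MAX rather than Integer.MAX_VALUE keeps the target
        // length allocatable on JVMs that cap array lengths below MAX_VALUE.
        if (neededSize > ARRAY_MAX - used) {
          throw new UnsupportedOperationException(
            "Cannot grow by " + neededSize + ": would exceed " + ARRAY_MAX);
        }
        int length = used + neededSize;
        if (buffer.length < length) {
          // Double when safely possible, otherwise clamp at the cap.
          int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
          byte[] tmp = new byte[newLength];
          System.arraycopy(buffer, 0, tmp, 0, buffer.length);
          buffer = tmp;
        }
      }

      public static void main(String[] args) {
        GrowSketch b = new GrowSketch();
        b.grow(100);
        System.out.println(b.buffer.length); // 200: doubled target length
      }
    }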
@@ -559,7 +559,7 @@ public final int appendStruct(boolean isNull) {
    * Upper limit for the maximum capacity for this column.
    */
   @VisibleForTesting
-  protected int MAX_CAPACITY = Integer.MAX_VALUE;
+  protected int MAX_CAPACITY = Integer.MAX_VALUE - 8;
 
   /**
    * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.