Skip to content

Commit 8ce2a68

Browse files
committed
1 parent 1270e71 commit 8ce2a68

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -517,6 +517,7 @@ public void loadBytes(ColumnVector.Array array) {
517517
protected void reserveInternal(int newCapacity) {
518518
int oldCapacity = (this.data == 0L) ? 0 : capacity;
519519
if (this.resultArray != null) {
520+
oldCapacity = (this.lengthData == 0L) ? 0 : capacity;
520521
this.lengthData =
521522
Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);
522523
this.offsetData =

sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -198,4 +198,22 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
198198
assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 456)
199199
assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 5.67)
200200
}
201+
202+
test("[SPARK-22092] off-heap column vector reallocation corrupts array data") {
  // Regression test: array entries written to an off-heap vector must still be
  // readable, unchanged, after the vector has been asked to reserve more room.
  val initialCapacity = 8
  val grownCapacity = 16
  val intArrayType = ArrayType(IntegerType, true)
  testVector = new OffHeapColumnVector(initialCapacity, intArrayType)

  // Fill the child element vector first, then register one single-element
  // array per row: putArray(row, row, 1) pairs row `row` with element `row`,
  // which is what the read-back assertion below expects.
  val elements = testVector.arrayData()
  for (row <- 0 until initialCapacity) {
    elements.putInt(row, row)
  }
  for (row <- 0 until initialCapacity) {
    testVector.putArray(row, row, 1)
  }

  // Grow the vector so that its contents are reallocated into larger buffers.
  testVector.reserve(grownCapacity)

  // Every row written before the reallocation must read back as Array(row);
  // a lost or overwritten value fails the assertion.
  val rows = new ColumnVector.Array(testVector)
  for (row <- 0 until initialCapacity) {
    val readBack = rows.get(row, intArrayType).asInstanceOf[ArrayData].toIntArray()
    assert(readBack === Array(row))
  }
}
201219
}

0 commit comments

Comments
 (0)