
Commit c83dbcf

wankunde authored and committed
[SPARK-44239][SQL] Free memory allocated by large vectors when vectors are reset (apache#237)
Add a memory reserve policy for WritableColumnVector:

* If the vector capacity < VECTORIZED_HUGE_VECTOR_THRESHOLD, reserve requested capacity * 2 memory, as before.
* If the vector capacity >= VECTORIZED_HUGE_VECTOR_THRESHOLD, reserve only requested capacity * VECTORIZED_HUGE_VECTOR_RESERVE_RATIO memory.
* On reset(), free the WritableColumnVector memory if the vector capacity >= VECTORIZED_HUGE_VECTOR_THRESHOLD. This reuses the allocated array objects for small column vectors and frees the memory of huge column vectors.

When Spark reads a data file into a WritableColumnVector, the memory allocated by the WritableColumnVector is not freed until the VectorizedColumnReader completes. Reusing the allocated array objects saves allocation time, but it also holds on to too much unused memory after a large vector batch has been read. The new reserve policy covers this scenario: allocated array objects are still reused for small column vectors, while the memory of huge column vectors is released.

![image](https://github.com/apache/spark/assets/3626747/a7a487bd-f184-4b24-bea0-75e530702887)
![image](https://github.com/apache/spark/assets/3626747/01d0268f-68e7-416f-b9b3-6c9d60919596)

No user-facing change. Tested with an added unit test.

Closes apache#41782 from wankunde/vector.

Lead-authored-by: Kun Wan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Co-authored-by: Kun Wan <[email protected]>
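For illustration, a minimal self-contained Scala sketch of this reserve policy; `SketchVector` and its fields are hypothetical stand-ins for WritableColumnVector, but the sizing arithmetic mirrors the patch:

```scala
// A minimal sketch of the reserve policy, assuming the threshold/ratio
// semantics described above. SketchVector is a hypothetical stand-in for
// WritableColumnVector; no real Spark APIs are used here.
object ReservePolicySketch {
  // Upper bound on capacity, analogous to WritableColumnVector.MAX_CAPACITY.
  val MaxCapacity: Long = Int.MaxValue - 15

  final class SketchVector(
      val defaultCapacity: Int,
      hugeVectorThreshold: Int,
      hugeVectorReserveRatio: Double) {
    var capacity: Int = defaultCapacity

    // Small requests double the capacity; huge requests grow only by the ratio.
    def reserve(requiredCapacity: Int): Unit = {
      if (requiredCapacity > capacity) {
        capacity =
          if (hugeVectorThreshold < 0 || requiredCapacity < hugeVectorThreshold) {
            math.min(MaxCapacity, requiredCapacity * 2L).toInt
          } else {
            math.min(MaxCapacity, (requiredCapacity * hugeVectorReserveRatio).toLong).toInt
          }
      }
    }

    // On reset, a huge vector shrinks back to its default capacity (its large
    // backing arrays would be released); a small vector keeps its arrays for reuse.
    def reset(): Unit = {
      if (hugeVectorThreshold > 0 && capacity > hugeVectorThreshold) {
        capacity = defaultCapacity
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val v = new SketchVector(defaultCapacity = 80, hugeVectorThreshold = 300,
      hugeVectorReserveRatio = 1.2)
    v.reserve(100); println(v.capacity) // 200 (doubled)
    v.reset();      println(v.capacity) // 200 (kept for reuse)
    v.reserve(300); println(v.capacity) // 360 (300 * 1.2)
    v.reset();      println(v.capacity) // 80  (memory released)
  }
}
```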
1 parent 246172c commit c83dbcf

File tree

5 files changed: +100 -9 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 24 additions & 0 deletions
@@ -492,6 +492,26 @@ object SQLConf {
     .intConf
     .createWithDefault(10000)
 
+  val VECTORIZED_HUGE_VECTOR_RESERVE_RATIO =
+    buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio")
+      .doc("When spark.sql.inMemoryColumnarStorage.hugeVectorThreshold <= 0 or the required " +
+        "memory is smaller than spark.sql.inMemoryColumnarStorage.hugeVectorThreshold, spark " +
+        "reserves required memory * 2 memory; otherwise, spark reserves " +
+        "required memory * this ratio memory, and will release this column vector memory before " +
+        "reading the next batch of rows.")
+      .version("4.0.0")
+      .doubleConf
+      .createWithDefault(1.2)
+
+  val VECTORIZED_HUGE_VECTOR_THRESHOLD =
+    buildConf("spark.sql.inMemoryColumnarStorage.hugeVectorThreshold")
+      .doc("When the required memory is larger than this, spark reserves required memory * " +
+        s"${VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key} memory next time and releases this column " +
+        s"vector memory before reading the next batch of rows. -1 means disabling the optimization.")
+      .version("4.0.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(-1)
+
   val IN_MEMORY_PARTITION_PRUNING =
     buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning")
       .internal()
@@ -5127,6 +5147,10 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
+  def vectorizedHugeVectorThreshold: Int = getConf(VECTORIZED_HUGE_VECTOR_THRESHOLD).toInt
+
+  def vectorizedHugeVectorReserveRatio: Double = getConf(VECTORIZED_HUGE_VECTOR_RESERVE_RATIO)
+
   def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
 
   def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
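The two configs work in tandem: the threshold decides which vectors count as huge, and the ratio bounds how much extra memory a huge vector may reserve. A hedged usage sketch; the 16 MiB threshold is an illustrative assumption, not a recommendation:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("huge-vector-reserve-demo")
  .master("local[*]")
  .getOrCreate()

// Vectors that need more than 16 MiB count as huge: they reserve only
// requiredCapacity * 1.2 and their memory is freed before the next batch.
spark.conf.set("spark.sql.inMemoryColumnarStorage.hugeVectorThreshold",
  (16L * 1024 * 1024).toString)
spark.conf.set("spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio", "1.2")
```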

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java

Lines changed: 6 additions & 3 deletions
@@ -84,9 +84,7 @@ public long valuesNativeAddress() {
     return data;
   }
 
-  @Override
-  public void close() {
-    super.close();
+  protected void releaseMemory() {
     Platform.freeMemory(nulls);
     Platform.freeMemory(data);
     Platform.freeMemory(lengthData);
@@ -97,6 +95,11 @@ public void close() {
     offsetData = 0;
   }
 
+  @Override
+  public void close() {
+    super.close();
+  }
+
   //
   // APIs dealing with nulls
   //

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java

Lines changed: 6 additions & 3 deletions
@@ -80,9 +80,7 @@ public OnHeapColumnVector(int capacity, DataType type) {
     reset();
   }
 
-  @Override
-  public void close() {
-    super.close();
+  protected void releaseMemory() {
     nulls = null;
     byteData = null;
     shortData = null;
@@ -94,6 +92,11 @@ public void close() {
     arrayOffsets = null;
   }
 
+  @Override
+  public void close() {
+    super.close();
+  }
+
   //
   // APIs dealing with nulls
   //

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java

Lines changed: 33 additions & 2 deletions
@@ -53,6 +53,8 @@
 public abstract class WritableColumnVector extends ColumnVector {
   private final byte[] byte8 = new byte[8];
 
+  protected abstract void releaseMemory();
+
   /**
    * Resets this column for writing. The currently stored values are no longer accessible.
    */
@@ -69,6 +71,12 @@ public void reset() {
       putNotNulls(0, capacity);
       numNulls = 0;
     }
+
+    if (hugeVectorThreshold > 0 && capacity > hugeVectorThreshold) {
+      capacity = defaultCapacity;
+      releaseMemory();
+      reserveInternal(capacity);
+    }
   }
 
   @Override
@@ -85,6 +93,7 @@ public void close() {
       dictionaryIds = null;
     }
     dictionary = null;
+    releaseMemory();
   }
 
   public void reserveAdditional(int additionalCapacity) {
@@ -95,7 +104,10 @@ public void reserve(int requiredCapacity) {
     if (requiredCapacity < 0) {
      throwUnsupportedException(requiredCapacity, null);
     } else if (requiredCapacity > capacity) {
-      int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
+      int newCapacity =
+        hugeVectorThreshold < 0 || requiredCapacity < hugeVectorThreshold ?
+          (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L) :
+          (int) Math.min(MAX_CAPACITY, requiredCapacity * hugeVectorReserveRatio);
       if (requiredCapacity <= newCapacity) {
         try {
           reserveInternal(newCapacity);
@@ -846,7 +858,14 @@ public final void addElementsAppended(int num) {
   /**
    * Marks this column as being constant.
    */
-  public final void setIsConstant() { isConstant = true; }
+  public final void setIsConstant() {
+    if (childColumns != null) {
+      for (WritableColumnVector c : childColumns) {
+        c.setIsConstant();
+      }
+    }
+    isConstant = true;
+  }
 
   /**
    * Marks this column only contains null values.
@@ -867,12 +886,21 @@ public final boolean isAllNull() {
    */
   protected int capacity;
 
+  /**
+   * The default number of rows that can be stored in this column.
+   */
+  protected final int defaultCapacity;
+
   /**
    * Upper limit for the maximum capacity for this column.
    */
   @VisibleForTesting
   protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
+  protected int hugeVectorThreshold;
+
+  protected double hugeVectorReserveRatio;
+
   /**
    * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
    */
@@ -922,6 +950,9 @@ protected boolean isArray() {
   protected WritableColumnVector(int capacity, DataType dataType) {
     super(dataType);
     this.capacity = capacity;
+    this.defaultCapacity = capacity;
+    this.hugeVectorThreshold = SQLConf.get().vectorizedHugeVectorThreshold();
+    this.hugeVectorReserveRatio = SQLConf.get().vectorizedHugeVectorReserveRatio();
 
     if (isArray()) {
       DataType childType;
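To make the new branch in reserve() concrete, a small worked example using the settings from the test suite below (threshold = 300 bytes, ratio = 1.2); the newCapacity helper is hypothetical:

```scala
// Worked example of the reserve() sizing logic, assuming threshold = 300 and
// ratio = 1.2 (the values the test suite below uses).
val hugeVectorThreshold = 300
val hugeVectorReserveRatio = 1.2

def newCapacity(requiredCapacity: Int): Int =
  if (hugeVectorThreshold < 0 || requiredCapacity < hugeVectorThreshold) {
    requiredCapacity * 2 // small vector: double, and keep the arrays on reset()
  } else {
    (requiredCapacity * hugeVectorReserveRatio).toInt // huge vector: grow by the ratio only
  }

println(newCapacity(100)) // 200 -- survives reset()
println(newCapacity(300)) // 360 -- released by reset(), capacity returns to default
```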

sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala

Lines changed: 31 additions & 1 deletion
@@ -19,13 +19,15 @@ package org.apache.spark.sql.execution.vectorized
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.execution.columnar.{ColumnAccessor, ColumnDictionary}
 import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarArray
 import org.apache.spark.unsafe.types.UTF8String
 
-class ColumnVectorSuite extends SparkFunSuite {
+class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
   private def withVector(
       vector: WritableColumnVector)(
       block: WritableColumnVector => Unit): Unit = {
@@ -666,6 +668,34 @@ class ColumnVectorSuite extends SparkFunSuite {
     }
   }
 
+  test("SPARK-44239: Test column vector reserve policy") {
+    withSQLConf(
+      SQLConf.VECTORIZED_HUGE_VECTOR_THRESHOLD.key -> "300",
+      SQLConf.VECTORIZED_HUGE_VECTOR_RESERVE_RATIO.key -> "1.2") {
+      val dataType = ByteType
+
+      Array(new OnHeapColumnVector(80, dataType),
+        new OffHeapColumnVector(80, dataType)).foreach { vector =>
+        try {
+          // A small vector reserves requested capacity * 2 = 200 and keeps it across reset().
+          vector.appendBytes(100, 0)
+          assert(vector.capacity == 200)
+          vector.reset()
+          assert(vector.capacity == 200)
+
+          // A huge vector (>= HUGE_VECTOR_THRESHOLD) reserves requested capacity * 1.2 =
+          // 300 * 1.2 = 360 and is shrunk back to the default capacity by reset().
+          vector.appendBytes(300, 0)
+          assert(vector.capacity == 360)
+          vector.reset()
+          assert(vector.capacity == 80)
+        } finally {
+          vector.close()
+        }
+      }
+    }
+  }
+
   DataTypeTestUtils.yearMonthIntervalTypes.foreach { dt =>
     val structType = new StructType().add(dt.typeName, dt)
     testVectors("ColumnarRow " + dt.typeName, 10, structType) { v =>
