Skip to content

Commit 9c19fc0

Browse files
committed
Add configuration options for heap vs. off-heap
1 parent 6ffdaa1 commit 9c19fc0

File tree

5 files changed

+26
-6
lines changed

5 files changed

+26
-6
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ private[spark] object SQLConf {
3030
val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
3131
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
3232
val CODEGEN_ENABLED = "spark.sql.codegen"
33-
val UNSAFE_ENABLED = "spark.sql.unsafe"
33+
val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
34+
val UNSAFE_USE_OFF_HEAP = "spark.sql.unsafe.offHeap"
3435
val DIALECT = "spark.sql.dialect"
3536

3637
val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
@@ -150,8 +151,21 @@ private[sql] class SQLConf extends Serializable {
150151
*/
151152
private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "false").toBoolean
152153

154+
/**
155+
* When set to true, Spark SQL will use managed memory for certain operations. This option only
156+
* takes effect if codegen is enabled.
157+
*
158+
* Defaults to false as this feature is currently experimental.
159+
*/
153160
private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED, "false").toBoolean
154161

162+
/**
163+
* When set to true, Spark SQL will use off-heap memory allocation for managed memory operations.
164+
*
165+
* Defaults to false.
166+
*/
167+
private[spark] def unsafeUseOffHeap: Boolean = getConf(UNSAFE_USE_OFF_HEAP, "false").toBoolean
168+
155169
private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2, "true").toBoolean
156170

157171
/**

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,6 +1013,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
10131013

10141014
def unsafeEnabled: Boolean = self.conf.unsafeEnabled
10151015

1016+
def unsafeUseOffHeap: Boolean = self.conf.unsafeUseOffHeap
1017+
10161018
def numPartitions: Int = self.conf.numShufflePartitions
10171019

10181020
def strategies: Seq[Strategy] =

sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,16 @@ case class AggregateEvaluation(
4343
* @param aggregateExpressions expressions that are computed for each group.
4444
* @param child the input data source.
4545
* @param unsafeEnabled whether to allow Unsafe-based aggregation buffers to be used.
46+
* @param useOffHeap whether to use off-heap allocation (only takes effect if unsafeEnabled=true)
4647
*/
4748
@DeveloperApi
4849
case class GeneratedAggregate(
4950
partial: Boolean,
5051
groupingExpressions: Seq[Expression],
5152
aggregateExpressions: Seq[NamedExpression],
5253
child: SparkPlan,
53-
unsafeEnabled: Boolean)
54+
unsafeEnabled: Boolean,
55+
useOffHeap: Boolean)
5456
extends UnaryNode {
5557

5658
override def requiredChildDistribution: Seq[Distribution] =
@@ -289,7 +291,7 @@ case class GeneratedAggregate(
289291
newAggregationBuffer(EmptyRow),
290292
aggregationBufferSchema,
291293
groupKeySchema,
292-
MemoryAllocator.UNSAFE,
294+
if (useOffHeap) MemoryAllocator.UNSAFE else MemoryAllocator.HEAP,
293295
1024 * 16,
294296
false
295297
)

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
141141
groupingExpressions,
142142
partialComputation,
143143
planLater(child),
144-
unsafeEnabled),
145-
unsafeEnabled) :: Nil
144+
unsafeEnabled,
145+
unsafeUseOffHeap),
146+
unsafeEnabled,
147+
unsafeUseOffHeap) :: Nil
146148

147149
// Cases where some aggregate can not be codegened
148150
case PartialAggregation(

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql
1919

2020
import org.scalatest.BeforeAndAfterAll
2121

22-
import org.apache.spark.sql.execution.{GeneratedAggregate}
22+
import org.apache.spark.sql.execution.GeneratedAggregate
2323
import org.apache.spark.sql.functions._
2424
import org.apache.spark.sql.TestData._
2525
import org.apache.spark.sql.test.TestSQLContext

0 commit comments

Comments (0)