Skip to content

Commit b0b2f30

Browse files
committed
Merge pull request #1 from zhuguangbin/v1.1.0+
make SparkSQL in-memory columnar storage level configurable
2 parents 12ce731 + cbd4093 commit b0b2f30

File tree

7 files changed

+26
-11
lines changed

7 files changed

+26
-11
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.sql
1919

20+
import org.apache.spark.storage.StorageLevel
2021
import scala.collection.immutable
2122
import scala.collection.JavaConversions._
2223

@@ -26,6 +27,7 @@ import java.util.Properties
2627
private[spark] object SQLConf {
2728
val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
2829
val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
30+
val STORAGE_LEVEL = "spark.sql.inMemoryColumnarStorage.level"
2931
val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
3032
val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
3133
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
@@ -85,6 +87,10 @@ trait SQLConf {
8587
/** The number of rows that will be */
8688
private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt
8789

90+
/** The Storage Level of cached RDD */
91+
private[spark] def storageLevel: StorageLevel =
92+
StorageLevel.fromString(getConf(STORAGE_LEVEL, "MEMORY_ONLY"))
93+
8894
/** Number of partitions to use for shuffle operators. */
8995
private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
9096

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
275275
currentTable.logicalPlan
276276

277277
case _ =>
278-
InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)
278+
InMemoryRelation(useCompression, columnBatchSize, storageLevel,
279+
executePlan(currentTable).executedPlan)
279280
}
280281

281282
catalog.registerTable(None, tableName, asInMemoryRelation)
@@ -286,7 +287,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
286287
table(tableName).queryExecution.analyzed match {
287288
// This is kind of a hack to make sure that if this was just an RDD registered as a table,
288289
// we reregister the RDD as a table.
289-
case inMem @ InMemoryRelation(_, _, _, e: ExistingRdd) =>
290+
case inMem @ InMemoryRelation(_, _, _, _, e: ExistingRdd) =>
290291
inMem.cachedColumnBuffers.unpersist()
291292
catalog.unregisterTable(None, tableName)
292293
catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))

sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,22 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
2525
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
2626
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2727
import org.apache.spark.sql.execution.{LeafNode, SparkPlan}
28+
import org.apache.spark.storage.StorageLevel
2829

2930
object InMemoryRelation {
30-
def apply(useCompression: Boolean, batchSize: Int, child: SparkPlan): InMemoryRelation =
31-
new InMemoryRelation(child.output, useCompression, batchSize, child)()
31+
def apply(
32+
useCompression: Boolean,
33+
batchSize: Int,
34+
storageLevel: StorageLevel,
35+
child: SparkPlan): InMemoryRelation =
36+
new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child)()
3237
}
3338

3439
private[sql] case class InMemoryRelation(
3540
output: Seq[Attribute],
3641
useCompression: Boolean,
3742
batchSize: Int,
43+
storageLevel: StorageLevel,
3844
child: SparkPlan)
3945
(private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
4046
extends LogicalPlan with MultiInstanceRelation {
@@ -73,7 +79,7 @@ private[sql] case class InMemoryRelation(
7379

7480
def hasNext = baseIterator.hasNext
7581
}
76-
}.cache()
82+
}.persist(storageLevel)
7783

7884
cached.setName(child.toString)
7985
_cachedColumnBuffers = cached
@@ -87,6 +93,7 @@ private[sql] case class InMemoryRelation(
8793
output.map(_.newInstance),
8894
useCompression,
8995
batchSize,
96+
storageLevel,
9097
child)(
9198
_cachedColumnBuffers).asInstanceOf[this.type]
9299
}

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class CachedTableSuite extends QueryTest {
4747

4848
cacheTable("testData")
4949
table("testData").queryExecution.analyzed match {
50-
case InMemoryRelation(_, _, _, _: InMemoryColumnarTableScan) =>
50+
case InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) =>
5151
fail("cacheTable is not idempotent")
5252

5353
case _ =>

sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ package org.apache.spark.sql.columnar
2020
import org.apache.spark.sql.catalyst.expressions.Row
2121
import org.apache.spark.sql.test.TestSQLContext
2222
import org.apache.spark.sql.{SQLConf, QueryTest, TestData}
23+
import org.apache.spark.storage.StorageLevel
2324

2425
class InMemoryColumnarQuerySuite extends QueryTest {
2526
import org.apache.spark.sql.TestData._
2627
import org.apache.spark.sql.test.TestSQLContext._
2728

2829
test("simple columnar query") {
2930
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
30-
val scan = InMemoryRelation(useCompression = true, 5, plan)
31+
val scan = InMemoryRelation(useCompression = true, 5, StorageLevel.MEMORY_ONLY, plan)
3132

3233
checkAnswer(scan, testData.collect().toSeq)
3334
}
@@ -42,7 +43,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
4243

4344
test("projection") {
4445
val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
45-
val scan = InMemoryRelation(useCompression = true, 5, plan)
46+
val scan = InMemoryRelation(useCompression = true, 5, StorageLevel.MEMORY_ONLY, plan)
4647

4748
checkAnswer(scan, testData.collect().map {
4849
case Row(key: Int, value: String) => value -> key
@@ -51,7 +52,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
5152

5253
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
5354
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
54-
val scan = InMemoryRelation(useCompression = true, 5, plan)
55+
val scan = InMemoryRelation(useCompression = true, 5, StorageLevel.MEMORY_ONLY, plan)
5556

5657
checkAnswer(scan, testData.collect().toSeq)
5758
checkAnswer(scan, testData.collect().toSeq)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
137137
castChildOutput(p, table, child)
138138

139139
case p @ logical.InsertIntoTable(
140-
InMemoryRelation(_, _, _,
140+
InMemoryRelation(_, _, _, _,
141141
HiveTableScan(_, table, _)), _, child, _) =>
142142
castChildOutput(p, table, child)
143143
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ private[hive] trait HiveStrategies {
160160
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
161161
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
162162
case logical.InsertIntoTable(
163-
InMemoryRelation(_, _, _,
163+
InMemoryRelation(_, _, _, _,
164164
HiveTableScan(_, table, _)), partition, child, overwrite) =>
165165
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
166166
case _ => Nil

0 commit comments

Comments
 (0)