Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql

import org.apache.spark.storage.StorageLevel
import scala.collection.immutable
import scala.collection.JavaConversions._

Expand All @@ -26,6 +27,7 @@ import java.util.Properties
private[spark] object SQLConf {
val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
val STORAGE_LEVEL = "spark.sql.inMemoryColumnarStorage.level"
val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
Expand Down Expand Up @@ -85,6 +87,10 @@ trait SQLConf {
/** The number of rows that will be */
private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt

/** The Storage Level of cached RDD */
private[spark] def storageLevel: StorageLevel =
StorageLevel.fromString(getConf(STORAGE_LEVEL, "MEMORY_ONLY"))

/** Number of partitions to use for shuffle operators. */
private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt

Expand Down
5 changes: 3 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
currentTable.logicalPlan

case _ =>
InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)
InMemoryRelation(useCompression, columnBatchSize, storageLevel,
executePlan(currentTable).executedPlan)
}

catalog.registerTable(None, tableName, asInMemoryRelation)
Expand All @@ -286,7 +287,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
table(tableName).queryExecution.analyzed match {
// This is kind of a hack to make sure that if this was just an RDD registered as a table,
// we reregister the RDD as a table.
case inMem @ InMemoryRelation(_, _, _, e: ExistingRdd) =>
case inMem @ InMemoryRelation(_, _, _, _, e: ExistingRdd) =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,22 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{LeafNode, SparkPlan}
import org.apache.spark.storage.StorageLevel

object InMemoryRelation {
def apply(useCompression: Boolean, batchSize: Int, child: SparkPlan): InMemoryRelation =
new InMemoryRelation(child.output, useCompression, batchSize, child)()
def apply(
useCompression: Boolean,
batchSize: Int,
storageLevel: StorageLevel,
child: SparkPlan): InMemoryRelation =
new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child)()
}

private[sql] case class InMemoryRelation(
output: Seq[Attribute],
useCompression: Boolean,
batchSize: Int,
storageLevel: StorageLevel,
child: SparkPlan)
(private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
extends LogicalPlan with MultiInstanceRelation {
Expand Down Expand Up @@ -73,7 +79,7 @@ private[sql] case class InMemoryRelation(

def hasNext = baseIterator.hasNext
}
}.cache()
}.persist(storageLevel)

cached.setName(child.toString)
_cachedColumnBuffers = cached
Expand All @@ -87,6 +93,7 @@ private[sql] case class InMemoryRelation(
output.map(_.newInstance),
useCompression,
batchSize,
storageLevel,
child)(
_cachedColumnBuffers).asInstanceOf[this.type]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class CachedTableSuite extends QueryTest {

cacheTable("testData")
table("testData").queryExecution.analyzed match {
case InMemoryRelation(_, _, _, _: InMemoryColumnarTableScan) =>
case InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) =>
fail("cacheTable is not idempotent")

case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ package org.apache.spark.sql.columnar
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.{SQLConf, QueryTest, TestData}
import org.apache.spark.storage.StorageLevel

class InMemoryColumnarQuerySuite extends QueryTest {
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.test.TestSQLContext._

test("simple columnar query") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
val scan = InMemoryRelation(useCompression = true, 5, plan)
val scan = InMemoryRelation(useCompression = true, 5, StorageLevel.MEMORY_ONLY, plan)

checkAnswer(scan, testData.collect().toSeq)
}
Expand All @@ -42,7 +43,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {

test("projection") {
val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
val scan = InMemoryRelation(useCompression = true, 5, plan)
val scan = InMemoryRelation(useCompression = true, 5, StorageLevel.MEMORY_ONLY, plan)

checkAnswer(scan, testData.collect().map {
case Row(key: Int, value: String) => value -> key
Expand All @@ -51,7 +52,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {

test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
val scan = InMemoryRelation(useCompression = true, 5, plan)
val scan = InMemoryRelation(useCompression = true, 5, StorageLevel.MEMORY_ONLY, plan)

checkAnswer(scan, testData.collect().toSeq)
checkAnswer(scan, testData.collect().toSeq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
castChildOutput(p, table, child)

case p @ logical.InsertIntoTable(
InMemoryRelation(_, _, _,
InMemoryRelation(_, _, _, _,
HiveTableScan(_, table, _)), _, child, _) =>
castChildOutput(p, table, child)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private[hive] trait HiveStrategies {
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case logical.InsertIntoTable(
InMemoryRelation(_, _, _,
InMemoryRelation(_, _, _, _,
HiveTableScan(_, table, _)), partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case _ => Nil
Expand Down