Skip to content

Commit 882c538

Browse files
committed
Remove attributes field from InMemoryColumnarTableScan
1 parent 32cc9ce commit 882c538

File tree

6 files changed: 14 additions (+14) and 17 deletions (-17)

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -121,7 +121,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
121121
def cacheTable(tableName: String): Unit = {
122122
val currentTable = catalog.lookupRelation(None, tableName)
123123
val asInMemoryRelation =
124-
InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)
124+
InMemoryColumnarTableScan(executePlan(currentTable).executedPlan)
125125

126126
catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
127127
}
@@ -131,7 +131,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
131131
EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
132132
// This is kind of a hack to make sure that if this was just an RDD registered as a table,
133133
// we reregister the RDD as a table.
134-
case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd)) =>
134+
case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(e: ExistingRdd)) =>
135135
inMem.cachedColumnBuffers.unpersist()
136136
catalog.unregisterTable(None, tableName)
137137
catalog.registerTable(None, tableName, SparkLogicalPlan(e))

sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,24 @@ import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
2121
import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
2222
import org.apache.spark.sql.Row
2323

24-
private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
24+
private[sql] case class InMemoryColumnarTableScan(child: SparkPlan)
2525
extends LeafNode {
2626

27-
override def output: Seq[Attribute] = attributes
27+
override def output: Seq[Attribute] = child.output
2828

2929
lazy val cachedColumnBuffers = {
30-
val ordinals = attributes.map(a => child.output.indexWhere(_.name == a.name))
31-
val output = child.output
30+
val childOutput = child.output
3231
val cached = child.execute().mapPartitions { iterator =>
33-
val columnBuilders = ordinals.map { i =>
34-
val attribute = output(i)
32+
val columnBuilders = childOutput.map { attribute =>
3533
ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
3634
}.toArray
3735

3836
var row: Row = null
3937
while (iterator.hasNext) {
4038
row = iterator.next()
4139
var i = 0
42-
while (i < ordinals.length) {
43-
columnBuilders(i).appendFrom(row, ordinals(i))
40+
while (i < childOutput.length) {
41+
columnBuilders(i).appendFrom(row, i)
4442
i += 1
4543
}
4644
}

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
7070
SparkLogicalPlan(
7171
alreadyPlanned match {
7272
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
73-
case InMemoryColumnarTableScan(output, child) =>
74-
InMemoryColumnarTableScan(output.map(_.newInstance), child)
73+
case scan @ InMemoryColumnarTableScan(child) => scan
7574
case _ => sys.error("Multiple instance of the same relation detected.")
7675
}).asInstanceOf[this.type]
7776
}

sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
2828

2929
test("simple columnar query") {
3030
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
31-
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
31+
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan))
3232

3333
checkAnswer(scan, testData.collect().toSeq)
3434
}
3535

3636
test("projection") {
3737
val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
38-
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
38+
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan))
3939

4040
checkAnswer(scan, testData.collect().map {
4141
case Row(key: Int, value: String) => value -> key
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
4444

4545
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
4646
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
47-
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
47+
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan))
4848

4949
checkAnswer(scan, testData.collect().toSeq)
5050
checkAnswer(scan, testData.collect().toSeq)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
120120
castChildOutput(p, table, child)
121121

122122
case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
123-
_, HiveTableScan(_, table, _))), _, child, _) =>
123+
HiveTableScan(_, table, _))), _, child, _) =>
124124
castChildOutput(p, table, child)
125125
}
126126

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ trait HiveStrategies {
4444
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
4545
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
4646
case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
47-
_, HiveTableScan(_, table, _))), partition, child, overwrite) =>
47+
HiveTableScan(_, table, _))), partition, child, overwrite) =>
4848
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
4949
case _ => Nil
5050
}

0 commit comments

Comments
 (0)