
Commit d07d94b

Add tests, support for creating parquet files and hive tables.
1 parent fa3fe81 commit d07d94b

13 files changed: 392 additions, 121 deletions

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 29 additions & 1 deletion
@@ -20,17 +20,25 @@ package org.apache.spark.sql
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{AlphaComponent, Experimental}
 import org.apache.spark.rdd.RDD
+
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.dsl
+import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
 import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.SparkStrategies
+
+import org.apache.spark.sql.parquet.ParquetRelation
 
 /**
  * :: AlphaComponent ::
@@ -88,6 +96,26 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def parquetFile(path: String): SchemaRDD =
     new SchemaRDD(this, parquet.ParquetRelation(path))
 
+  /**
+   * :: Experimental ::
+   *
+   * Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
+   * This registered table can be used as the target of future `insertInto` operations.
+   *
+   * @param path
+   * @param allowExisting
+   * @param conf
+   * @tparam A
+   */
+  @Experimental
+  def createParquetFile[A <: Product : TypeTag](
+      path: String,
+      allowExisting: Boolean = true,
+      conf: Configuration = new Configuration()): SchemaRDD = {
+    new SchemaRDD(
+      this,
+      ParquetRelation.createEmpty(path, ScalaReflection.attributesFor[A], allowExisting, conf))
+  }
 
   /**
    * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
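
For orientation, a rough usage sketch of the new API (the `Record` case class, the path, and the input RDD are hypothetical illustrations, not part of this commit):

  // Hypothetical usage of createParquetFile; assumes a SQLContext `sqlContext`
  // and a SparkContext `sc` are in scope.
  case class Record(key: Int, value: String)

  import sqlContext._

  // Create an empty parquet file with Record's schema and register it as a table.
  createParquetFile[Record]("/tmp/records.parquet").registerAsTable("records")

  // The registered table can now serve as the target of insertInto operations.
  val records: SchemaRDD = createSchemaRDD(sc.parallelize(Seq(Record(1, "a"))))
  records.insertInto("records")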

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 2 additions & 17 deletions
@@ -272,8 +272,8 @@ class SchemaRDD(
    * an `OUTER JOIN` in SQL. When no output rows are produced by the generator for a
    * given row, a single row will be output, with `NULL` values for each of the
    * generated columns.
-   * @param alias an optional alias that can be used as qualif for the attributes that are produced
-   *              by this generate operation.
+   * @param alias an optional alias that can be used as qualifier for the attributes that are
+   *              produced by this generate operation.
    *
    * @group Query
    */
@@ -285,21 +285,6 @@ class SchemaRDD(
       alias: Option[String] = None) =
     new SchemaRDD(sqlContext, Generate(generator, join, outer, None, logicalPlan))
 
-  /**
-   * :: Experimental ::
-   * Adds the rows from this RDD to the specified table. Note in a standard [[SQLContext]] there is
-   * no notion of persistent tables, and thus queries that contain this operator will fail to
-   * optimize. When working with an extension of a SQLContext that has a persistent catalog, such
-   * as a `HiveContext`, this operation will result in insertions to the table specified.
-   *
-   * @group schema
-   */
-  @Experimental
-  def insertInto(tableName: String, overwrite: Boolean = false) =
-    new SchemaRDD(
-      sqlContext,
-      InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite))
-
   /**
    * Returns this RDD as a SchemaRDD.
    * @group schema

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala

Lines changed: 9 additions & 6 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical._
 
@@ -66,27 +67,29 @@ trait SchemaRDDLike {
   }
 
   /**
-   * <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
+   * :: Experimental ::
    *
    * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
    *
    * @group schema
    */
+  @Experimental
   def insertInto(tableName: String, overwrite: Boolean): Unit =
     sqlContext.executePlan(
      InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)).toRdd
 
   /**
-   * <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
+   * :: Experimental ::
    *
    * Appends the rows from this RDD to the specified table.
    *
    * @group schema
    */
+  @Experimental
   def insertInto(tableName: String): Unit = insertInto(tableName, false)
 
   /**
-   * <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
+   * :: Experimental ::
    *
    * Creates a table from the the contents of this SchemaRDD. This will fail if the table already
    * exists.
@@ -98,7 +101,7 @@ trait SchemaRDDLike {
    *
    * @param tableName
    */
-  def createTableAs(tableName: String) =
-    sqlContext.executePlan(
-      InsertIntoCreatedTable(None, tableName, logicalPlan))
+  @Experimental
+  def createTableAs(tableName: String): Unit =
+    sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
 }
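
As a sketch of how the methods above compose (table names are hypothetical; note that `createTableAs` needs a catalog with persistent tables, such as a `HiveContext`, to be useful):

  // Hypothetical usage of the insertInto/createTableAs family on a SchemaRDD.
  val results = sql("SELECT key, value FROM records WHERE key < 10")

  results.insertInto("existingTable")                    // append rows
  results.insertInto("existingTable", overwrite = true)  // replace existing data
  results.createTableAs("newTable")                      // create the table, then populate it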

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 8 additions & 3 deletions
@@ -119,7 +119,7 @@ private[sql] object ParquetRelation {
         child,
         "Attempt to create Parquet table from unresolved child (when schema is not available)")
     }
-    createEmpty(pathString, child.output, conf)
+    createEmpty(pathString, child.output, false, conf)
   }
 
   /**
@@ -133,8 +133,9 @@
    */
   def createEmpty(pathString: String,
                   attributes: Seq[Attribute],
+                  allowExisting: Boolean,
                   conf: Configuration): ParquetRelation = {
-    val path = checkPath(pathString, conf)
+    val path = checkPath(pathString, allowExisting, conf)
     if (conf.get(ParquetOutputFormat.COMPRESSION) == null) {
       conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name())
     }
@@ -143,7 +144,7 @@
     new ParquetRelation(path.toString)
   }
 
-  private def checkPath(pathStr: String, conf: Configuration): Path = {
+  private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = {
     if (pathStr == null) {
       throw new IllegalArgumentException("Unable to create ParquetRelation: path is null")
     }
@@ -154,6 +155,10 @@
         s"Unable to create ParquetRelation: incorrectly formatted path $pathStr")
     }
     val path = origPath.makeQualified(fs)
+    if (!allowExisting && fs.exists(path)) {
+      sys.error(s"File $pathStr already exists.")
+    }
+
     if (fs.exists(path) &&
         !fs.getFileStatus(path)
         .getPermission
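
A minimal standalone sketch of the guard semantics added here, using Hadoop's FileSystem API (the helper name is ours, not the commit's):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path

  // Sketch: qualify the path and fail fast when it already exists and
  // allowExisting is false, mirroring the checkPath change above.
  def ensureCreatable(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = {
    val origPath = new Path(pathStr)
    val fs = origPath.getFileSystem(conf)
    val path = origPath.makeQualified(fs)
    if (!allowExisting && fs.exists(path)) {
      sys.error(s"File $pathStr already exists.")
    }
    path
  }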

sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala

Lines changed: 148 additions & 0 deletions
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+/* Implicits */
+import org.apache.spark.sql.test.TestSQLContext._
+
+class InsertIntoSuite extends QueryTest {
+  TestData // Initialize TestData
+  import TestData._
+
+  test("insertInto() created parquet file") {
+    val testFilePath = File.createTempFile("sparkSql", "pqt")
+    testFilePath.delete()
+    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
+    testFile.registerAsTable("createAndInsertTest")
+
+    // Add some data.
+    testData.insertInto("createAndInsertTest")
+
+    // Make sure its there for a new instance of parquet file.
+    checkAnswer(
+      parquetFile(testFilePath.getCanonicalPath),
+      testData.collect().toSeq
+    )
+
+    // Make sure the registered table has also been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertTest"),
+      testData.collect().toSeq
+    )
+
+    // Add more data.
+    testData.insertInto("createAndInsertTest")
+
+    // Make sure all data is there for a new instance of parquet file.
+    checkAnswer(
+      parquetFile(testFilePath.getCanonicalPath),
+      testData.collect().toSeq ++ testData.collect().toSeq
+    )
+
+    // Make sure the registered table has also been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertTest"),
+      testData.collect().toSeq ++ testData.collect().toSeq
+    )
+
+    // Now overwrite.
+    testData.insertInto("createAndInsertTest", overwrite = true)
+
+    // Make sure its there for a new instance of parquet file.
+    checkAnswer(
+      parquetFile(testFilePath.getCanonicalPath),
+      testData.collect().toSeq
+    )
+
+    // Make sure the registered table has also been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertTest"),
+      testData.collect().toSeq
+    )
+  }
+
+  test("INSERT INTO parquet table") {
+    val testFilePath = File.createTempFile("sparkSql", "pqt")
+    testFilePath.delete()
+    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
+    testFile.registerAsTable("createAndInsertSQLTest")
+
+    sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData")
+
+    // Make sure its there for a new instance of parquet file.
+    checkAnswer(
+      parquetFile(testFilePath.getCanonicalPath),
+      testData.collect().toSeq
+    )
+
+    // Make sure the registered table has also been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertSQLTest"),
+      testData.collect().toSeq
+    )
+
+    // Append more data.
+    sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData")
+
+    // Make sure all data is there for a new instance of parquet file.
+    checkAnswer(
+      parquetFile(testFilePath.getCanonicalPath),
+      testData.collect().toSeq ++ testData.collect().toSeq
+    )
+
+    // Make sure the registered table has also been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertSQLTest"),
+      testData.collect().toSeq ++ testData.collect().toSeq
+    )
+
+    sql("INSERT OVERWRITE INTO createAndInsertSQLTest SELECT * FROM testData")
+
+    // Make sure its there for a new instance of parquet file.
+    checkAnswer(
+      parquetFile(testFilePath.getCanonicalPath),
+      testData.collect().toSeq
+    )
+
+    // Make sure the registered table has also been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertSQLTest"),
+      testData.collect().toSeq
+    )
+  }
+
+  test("Double create fails when allowExisting = false") {
+    val testFilePath = File.createTempFile("sparkSql", "pqt")
+    testFilePath.delete()
+    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
+
+    intercept[RuntimeException] {
+      createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = false)
+    }
+  }
+
+  test("Double create does not fail when allowExisting = true") {
+    val testFilePath = File.createTempFile("sparkSql", "pqt")
+    testFilePath.delete()
+    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
+
+    createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = true)
+  }
+}

sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala

Lines changed: 7 additions & 4 deletions
@@ -49,18 +49,21 @@ class QueryTest extends FunSuite {
           |$e
         """.stripMargin)
     }
+
     if(prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) {
       fail(s"""
         |Results do not match for query:
        |${rdd.logicalPlan}
        |== Analyzed Plan ==
        |${rdd.queryExecution.analyzed}
-       |== RDD ==
-       |$rdd
+       |== Physical Plan ==
+       |${rdd.queryExecution.executedPlan}
        |== Results ==
        |${sideBySide(
-        prepareAnswer(convertedAnswer).map(_.toString),
-        prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
+        s"== Correct Answer - ${convertedAnswer.size} ==" +:
+          prepareAnswer(convertedAnswer).map(_.toString),
+        s"== Spark Answer - ${sparkAnswer.size} ==" +:
+          prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
       """.stripMargin)
     }
   }

sql/core/src/test/scala/org/apache/spark/sql/TestData.scala

Lines changed: 2 additions & 1 deletion
@@ -23,8 +23,9 @@ import org.apache.spark.sql.test._
 /* Implicits */
 import TestSQLContext._
 
+case class TestData(key: Int, value: String)
+
 object TestData {
-  case class TestData(key: Int, value: String)
   val testData: SchemaRDD = TestSQLContext.sparkContext.parallelize(
     (1 to 100).map(i => TestData(i, i.toString)))
   testData.registerAsTable("testData")
