
Commit 0167cc3

gatorsmile authored and Yun Ni committed
[SPARK-19670][SQL][TEST] Enable Bucketed Table Reading and Writing Testing Without Hive Support
### What changes were proposed in this pull request?

Bucketed table reading and writing does not need Hive support, so the test cases can move from `sql/hive` to `sql/core`. After this PR, test coverage improves: bucketed table reading and writing can be tested both with and without Hive support.

### How was this patch tested?

N/A

Author: Xiao Li <[email protected]>

Closes apache#17004 from gatorsmile/mvTestCaseForBuckets.
1 parent 0bd7ec5 commit 0167cc3

File tree: 4 files changed, +101 additions, −21 deletions

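Reading guide for the diffs below: each bucketed-table suite is split into an abstract base class that owns all the test cases and two thin concrete suites that only pick the catalog implementation. A condensed sketch of that shape, using the class and trait names from this commit (test bodies elided; in the real layout the two concrete suites live in different modules, since `sql/core` cannot see `TestHiveSingleton`):

```scala
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.hive.test.TestHiveSingleton  // available in sql/hive only
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}

// Shared tests live in the abstract class; it has no Hive dependency.
abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
  // ... all bucketed-read test cases (see the diff below) ...
}

// sql/core: runs the same tests against the in-memory catalog.
class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedSQLContext {
  protected override def beforeAll(): Unit = {
    super.beforeAll()
    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
  }
}

// sql/hive: runs the same tests against the Hive catalog.
class BucketedReadWithHiveSupportSuite extends BucketedReadSuite with TestHiveSingleton {
  protected override def beforeAll(): Unit = {
    super.beforeAll()
    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
  }
}
```

The write suites follow the same pattern, with a `fileFormatsToTest` hook that lets the Hive-backed suite exercise ORC while the in-memory suite covers JSON.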

sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala renamed to sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala

Lines changed: 20 additions & 10 deletions
@@ -29,17 +29,25 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.BitSet
 
-class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedSQLContext {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+  }
+}
+
+
+abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
-  private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
-  private val nullDF = (for {
+  private lazy val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  private lazy val nullDF = (for {
     i <- 0 to 50
     s <- Seq(null, "a", "b", "c", "d", "e", "f", null, "g")
   } yield (i % 5, s, i % 13)).toDF("i", "j", "k")

@@ -224,8 +232,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
-  private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
+  private lazy val df1 =
+    (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
+  private lazy val df2 =
+    (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
 
   case class BucketedTableTestSpec(
       bucketSpec: Option[BucketSpec],

@@ -535,7 +545,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   test("error if there exists any malformed bucket files") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
-      val warehouseFilePath = new URI(hiveContext.sparkSession.getWarehousePath).getPath
+      val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath
       val tableDir = new File(warehouseFilePath, "bucketed_table")
       Utils.deleteRecursively(tableDir)
       df1.write.parquet(tableDir.getAbsolutePath)

@@ -553,9 +563,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
 
-      checkAnswer(hiveContext.table("bucketed_table").select("j"), df1.select("j"))
+      checkAnswer(spark.table("bucketed_table").select("j"), df1.select("j"))
 
-      checkAnswer(hiveContext.table("bucketed_table").groupBy("j").agg(max("k")),
+      checkAnswer(spark.table("bucketed_table").groupBy("j").agg(max("k")),
         df1.groupBy("j").agg(max("k")))
     }
   }
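One detail worth noting: the shared DataFrames (`df`, `nullDF`, `df1`, `df2`) become `lazy val`s. A plausible reason (my reading, not stated in the commit) is that the abstract suite no longer owns its SparkSession; it comes from the concrete subclass and only exists after `beforeAll()` has run, so DataFrame fields must not be built at construction time. A minimal sketch of that reasoning, with a hypothetical suite name:

```scala
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SQLTestUtils

// ExampleSuite is a hypothetical name; the reasoning below is assumed, not from the commit.
abstract class ExampleSuite extends QueryTest with SQLTestUtils {
  import testImplicits._

  // Eager `val`: evaluated while the suite object is constructed -- before a
  // SharedSQLContext-backed subclass has created its session.
  // private val df = (0 until 50).map(i => (i, i.toString)).toDF("i", "k")

  // Lazy `val`: evaluated on first use inside a test, after the session exists.
  private lazy val df = (0 until 50).map(i => (i, i.toString)).toDF("i", "k")
}
```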

sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala renamed to sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala

Lines changed: 23 additions & 11 deletions
@@ -20,19 +20,29 @@ package org.apache.spark.sql.sources
 import java.io.File
 import java.net.URI
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 
-class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class BucketedWriteWithoutHiveSupportSuite extends BucketedWriteSuite with SharedSQLContext {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+  }
+
+  override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "json")
+}
+
+abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
+  protected def fileFormatsToTest: Seq[String]
+
   test("bucketed by non-existing column") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
     intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt"))

@@ -76,11 +86,13 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     assert(e.getMessage == "'insertInto' does not support bucketing right now;")
   }
 
-  private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  private lazy val df = {
+    (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  }
 
   def tableDir: File = {
     val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table")
-    new File(URI.create(hiveContext.sessionState.catalog.hiveDefaultTableFilePath(identifier)))
+    new File(URI.create(spark.sessionState.catalog.defaultTablePath(identifier)))
   }
 
   /**

@@ -141,7 +153,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("write bucketed data") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)

@@ -157,7 +169,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("write bucketed data with sortBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)

@@ -190,7 +202,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("write bucketed data without partitionBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)

@@ -203,7 +215,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("write bucketed data without partitionBy with sortBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)

@@ -219,7 +231,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   test("write bucketed data with bucketing disabled") {
     // The configuration BUCKETING_ENABLED does not affect the writing path
     withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
-      for (source <- Seq("parquet", "json", "orc")) {
+      for (source <- fileFormatsToTest) {
        withTable("bucketed_table") {
          df.write
            .format(source)
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala — new file (path inferred from the package and class name; the suite uses TestHiveSingleton, so it lives under sql/hive: sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala)

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class BucketedReadWithHiveSupportSuite extends BucketedReadSuite with TestHiveSingleton {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+  }
+}
BucketedWriteWithHiveSupportSuite.scala — new file (path inferred from the package and class name; the suite uses TestHiveSingleton, so it lives under sql/hive: sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala)

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class BucketedWriteWithHiveSupportSuite extends BucketedWriteSuite with TestHiveSingleton {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+  }
+
+  override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "orc")
+}
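With this split, a test case written once in the abstract suite is inherited by every concrete subclass, which is the coverage improvement the PR description refers to. A hypothetical illustration (the test name and body below are not part of this commit):

```scala
package org.apache.spark.sql.sources

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SQLTestUtils

// Hypothetical sketch: a test added here runs both in
// BucketedReadWithoutHiveSupportSuite (in-memory catalog, sql/core) and in
// BucketedReadWithHiveSupportSuite (Hive catalog, sql/hive).
abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
  import testImplicits._

  test("read back a bucketed table (illustrative only)") {
    withTable("bucketed_table") {
      (0 until 50).map(i => (i % 5, i.toString)).toDF("i", "k")
        .write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
      assert(spark.table("bucketed_table").count() == 50)
    }
  }
}
```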
