From 980e08ef0c4da375d13c4bfe1e919543f988b222 Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Sun, 19 Feb 2017 09:03:44 -0800
Subject: [PATCH 1/6] move

---
 .../sql/sources/BucketedWriteSuite.scala      | 24 ++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
 rename sql/{hive => core}/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala (92%)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
similarity index 92%
rename from sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 8528dfc4cea4..35ce97feb853 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -20,19 +20,19 @@ package org.apache.spark.sql.sources
 import java.io.File
 import java.net.URI
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.test.SharedSQLContext
 
-class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class BucketedWriteSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
 
+  val fileFormatsToTest = Seq("parquet", "json")
+
   test("bucketed by non-existing column") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
     intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt"))
@@ -72,11 +72,13 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     assert(e.getMessage == "'insertInto' does not support bucketing right now;")
   }
 
-  private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  private lazy val df = {
+    (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  }
 
   def tableDir: File = {
     val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table")
-    new File(URI.create(hiveContext.sessionState.catalog.hiveDefaultTableFilePath(identifier)))
+    new File(URI.create(spark.sessionState.catalog.defaultTablePath(identifier)))
   }
 
   /**
@@ -137,7 +139,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
      withTable("bucketed_table") {
        df.write
          .format(source)
@@ -153,7 +155,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data with sortBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
      withTable("bucketed_table") {
        df.write
          .format(source)
@@ -186,7 +188,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data without partitionBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
      withTable("bucketed_table") {
        df.write
          .format(source)
@@ -199,7 +201,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data without partitionBy with sortBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
      withTable("bucketed_table") {
        df.write
          .format(source)
@@ -215,7 +217,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   test("write bucketed data with bucketing disabled") {
     // The configuration BUCKETING_ENABLED does not affect the writing path
     withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
-      for (source <- Seq("parquet", "json", "orc")) {
+      for (source <- fileFormatsToTest) {
        withTable("bucketed_table") {
          df.write
            .format(source)

From 7a35f16e0f6f51cd2cf9db0171658d047c4a711b Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Sun, 19 Feb 2017 09:47:48 -0800
Subject: [PATCH 2/6] add a new test suite

---
 .../sql/sources/BucketedWriteSuite.scala    | 16 ++++++++--
 .../BucketedWriteWithHiveSupportSuite.scala | 30 +++++++++++++++++++
 2 files changed, 43 insertions(+), 3 deletions(-)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index e2b85ecba4bf..9082261af7b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -26,12 +26,22 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 
-class BucketedWriteSuite extends QueryTest with SharedSQLContext {
+class BucketedWriteWithoutHiveSupportSuite extends BucketedWriteSuite with SharedSQLContext {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+  }
+
+  override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "json")
+}
+
+abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
-  val fileFormatsToTest = Seq("parquet", "json")
+  protected def fileFormatsToTest: Seq[String]
 
   test("bucketed by non-existing column") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
new file mode 100644
index 000000000000..c6f32d13a2b9
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class BucketedWriteWithHiveSupportSuite extends BucketedWriteSuite with TestHiveSingleton {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+  }
+
+  override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "orc")
+}
\ No newline at end of file

From 4c94cef6516af85a8ecc18896e75c936c43e450d Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Mon, 20 Feb 2017 09:49:32 -0800
Subject: [PATCH 3/6] move BucketedRead

---
 .../spark/sql/sources/BucketedReadSuite.scala | 31 +++++++++++++------
 .../BucketedReadWithHiveSupportSuite.scala    | 28 +++++++++++++++++
 2 files changed, 49 insertions(+), 10 deletions(-)
 rename sql/{hive => core}/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala (95%)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
similarity index 95%
rename from sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 4fc72b9e4759..d55d05b37ce7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -29,17 +29,26 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.BitSet
 
-class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedSQLContext {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+  }
+}
+
+
+abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
-  private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
-  private val nullDF = (for {
+  private lazy val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  private lazy val nullDF = (for {
     i <- 0 to 50
     s <- Seq(null, "a", "b", "c", "d", "e", "f", null, "g")
   } yield (i % 5, s, i % 13)).toDF("i", "j", "k")
@@ -224,8 +233,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
     }
   }
 
-  private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
-  private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
+  private lazy val df1 =
+    (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
+  private lazy val df2 =
+    (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
 
   case class BucketedTableTestSpec(
       bucketSpec: Option[BucketSpec],
@@ -535,7 +546,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
   test("error if there exists any malformed bucket files") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
-      val warehouseFilePath = new URI(hiveContext.sparkSession.getWarehousePath).getPath
+      val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath
       val tableDir = new File(warehouseFilePath, "bucketed_table")
       Utils.deleteRecursively(tableDir)
       df1.write.parquet(tableDir.getAbsolutePath)
@@ -553,9 +564,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
 
-      checkAnswer(hiveContext.table("bucketed_table").select("j"), df1.select("j"))
+      checkAnswer(spark.table("bucketed_table").select("j"), df1.select("j"))
 
-      checkAnswer(hiveContext.table("bucketed_table").groupBy("j").agg(max("k")),
+      checkAnswer(spark.table("bucketed_table").groupBy("j").agg(max("k")),
         df1.groupBy("j").agg(max("k")))
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala
new file mode 100644
index 000000000000..edf131ede3bf
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class BucketedReadWithHiveSupportSuite extends BucketedReadSuite with TestHiveSingleton {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+  }
+}
\ No newline at end of file

From e8d0fc26eadf59a34c8362dcd5f191ec9935eda6 Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Mon, 20 Feb 2017 09:50:53 -0800
Subject: [PATCH 4/6] style fix.

---
 .../spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
index c6f32d13a2b9..454e2f65d5d8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
@@ -27,4 +27,4 @@ class BucketedWriteWithHiveSupportSuite extends BucketedWriteSuite with TestHive
   }
 
   override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "orc")
-}
\ No newline at end of file
+}

From 444aae8ee300f40b104abc0bdd192a3a4c7064b5 Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Mon, 20 Feb 2017 09:51:13 -0800
Subject: [PATCH 5/6] style fix.

---
 .../spark/sql/sources/BucketedReadWithHiveSupportSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala
index edf131ede3bf..f277f99805a4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala
@@ -25,4 +25,4 @@ class BucketedReadWithHiveSupportSuite extends BucketedReadSuite with TestHiveSi
     super.beforeAll()
     assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
   }
-}
\ No newline at end of file
+}

From 3ecf1872550b72d5eb21e607e23fb07f503ba2f2 Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Mon, 20 Feb 2017 09:55:50 -0800
Subject: [PATCH 6/6] style fix.

---
 .../scala/org/apache/spark/sql/sources/BucketedReadSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index d55d05b37ce7..9b65419dba23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.BitSet
 
-
 class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedSQLContext {
   protected override def beforeAll(): Unit = {
     super.beforeAll()