diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1 index da874026d7d10..56f6c0a69efd0 100644 --- a/dev/deps/spark-deps-hadoop-3.1 +++ b/dev/deps/spark-deps-hadoop-3.1 @@ -93,8 +93,6 @@ jackson-core-2.6.7.jar jackson-core-asl-1.9.13.jar jackson-databind-2.6.7.1.jar jackson-dataformat-yaml-2.6.7.jar -jackson-jaxrs-base-2.7.8.jar -jackson-jaxrs-json-provider-2.7.8.jar jackson-mapper-asl-1.9.13.jar jackson-module-jaxb-annotations-2.6.7.jar jackson-module-paranamer-2.7.9.jar diff --git a/dev/run-tests.py b/dev/run-tests.py index cd4590864b7d7..c76a67d37d6cd 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -305,8 +305,8 @@ def get_hadoop_profiles(hadoop_version): """ sbt_maven_hadoop_profiles = { - "hadoop2.6": ["-Phadoop-2.6"], - "hadoop2.7": ["-Phadoop-2.7"], + "hadoop2.6": ["-Phadoop-3.1"], + "hadoop2.7": ["-Phadoop-3.1"], } if hadoop_version in sbt_maven_hadoop_profiles: diff --git a/pom.xml b/pom.xml index 4b4e6c13ea8fd..19019f4c5d87f 100644 --- a/pom.xml +++ b/pom.xml @@ -123,7 +123,7 @@ 1.6.0 3.4.6 2.6.0 - org.spark-project.hive + com.github.hyukjinkwon 1.2.1.spark2 @@ -1080,6 +1080,10 @@ com.sun.jersey.contribs * + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index b606f9355e03b..e8529455aae4c 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -317,7 +317,7 @@ object SparkBuild extends PomBuild { // Note ordering of these settings matter. /* Enable shared settings on all projects */ (allProjects ++ optionallyEnabledProjects ++ assemblyProjects ++ copyJarsProjects ++ Seq(spark, tools)) - .foreach(enable(sharedSettings ++ DependencyOverrides.settings ++ + .foreach(enable(sharedSettings ++ DependencyOverrides.settings ++ ExcludeDependencies.settings ++ ExcludedDependencies.settings ++ Checkstyle.settings)) /* Enable tests settings for all projects except examples, assembly and tools */ @@ -464,7 +464,20 @@ object DockerIntegrationTests { */ object DependencyOverrides { lazy val settings = Seq( - dependencyOverrides += "com.google.guava" % "guava" % "14.0.1") + dependencyOverrides += "com.google.guava" % "guava" % "14.0.1", + dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-annotations" % "2.6.7", + dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7", + dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-module-jaxb-annotations" % "2.6.7", + dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7") +} + +/** + * Exclusions to work around sbt's dependency resolution being different from Maven's. + */ +object ExcludeDependencies { + lazy val settings = Seq( + excludeDependencies += "com.fasterxml.jackson.jaxrs" % "jackson-jaxrs-json-provider", + excludeDependencies += "javax.ws.rs" % "jsr311-api") } /** diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index c55ba32fa458c..4289c30ce9c95 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -121,7 +121,7 @@ avro + hadoop2 classifier for hadoop 2 API. 
avro-mapred is a dependency of com.github.hyukjinkwon:hive-serde --> org.apache.avro avro-mapred diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 2f34f69b5cf48..3063c18a4efee 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -177,7 +177,10 @@ private[hive] class IsolatedClientLoader( protected def isSharedClass(name: String): Boolean = { val isHadoopClass = - name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.") + name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.") || + // Also, include configuration2 as a minimal fix for Hadoop 3+ for now; otherwise it fails + // during class resolution. It is fine when 'sharesHadoopClasses' is disabled. + name.startsWith("org.apache.commons.configuration2.") name.startsWith("org.slf4j") || name.startsWith("org.apache.log4j") || // log4j1.x diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index aa5b531992613..a7273820b3749 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -22,6 +22,7 @@ import java.io.{BufferedWriter, File, FileWriter} import scala.tools.nsc.Properties import org.apache.hadoop.fs.Path +import org.apache.hadoop.util.VersionInfo import org.scalatest.{BeforeAndAfterEach, Matchers} import org.apache.spark._ @@ -123,6 +124,7 @@ class HiveSparkSubmitSuite } test("SPARK-8020: set sql conf in spark conf") { + assume(VersionInfo.getVersion < "3.0.0", "Only Hive 2.3+ supports Hadoop 3+. See HIVE-16081.") val unusedJar = TestUtils.createJarWithClasses(Seq.empty) val args = Seq( "--class", SparkSQLConfTest.getClass.getName.stripSuffix("$"), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala index e5963d03f6b52..9751105c0523b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.client import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.util.VersionInfo import org.scalactic.source.Position import org.scalatest.Tag @@ -49,6 +50,9 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu override protected def test(testName: String, testTags: Tag*)(testFun: => Any) (implicit pos: Position): Unit = { + assume( + VersionInfo.getVersion < "3.0.0" || version >= "2.3", + "Hive 2.3+ supports Hadoop 3+. 
See HIVE-16081.") super.test(s"$version: $testName", testTags: _*)(testFun) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index dc96ec416afd8..c02b1cecdc348 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.util.VersionInfo import org.apache.spark.SparkFunSuite import org.apache.spark.internal.Logging @@ -75,19 +76,29 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test("success sanity check") { + private val testBuiltinVersion = testHiveVersion(HiveUtils.builtinHiveVersion, _: String) _ + testBuiltinVersion("success sanity check") { val badClient = buildClient(HiveUtils.builtinHiveVersion, new Configuration()) val db = new CatalogDatabase("default", "desc", new URI("loc"), Map()) badClient.createDatabase(db, ignoreIfExists = true) } - test("hadoop configuration preserved") { + testBuiltinVersion("hadoop configuration preserved") { val hadoopConf = new Configuration() hadoopConf.set("test", "success") val client = buildClient(HiveUtils.builtinHiveVersion, hadoopConf) assert("success" === client.getConf("test", null)) } + private def testHiveVersion(hiveVersion: String, title: String)(func: => Unit): Unit = { + test(title) { + assume( + VersionInfo.getVersion < "3.0.0" || hiveVersion >= "2.3", + "Hive 2.3+ supports Hadoop 3+. See HIVE-16081.") + func + } + } + private def getNestedMessages(e: Throwable): String = { var causes = "" var lastException = e @@ -119,7 +130,8 @@ class VersionsSuite extends SparkFunSuite with Logging { private var versionSpark: TestHiveVersion = null versions.foreach { version => - test(s"$version: create client") { + val testVersion = testHiveVersion(version, _: String) _ + testVersion(s"$version: create client") { client = null System.gc() // Hack to avoid SEGV on some JVM versions. 
val hadoopConf = new Configuration() @@ -159,7 +171,7 @@ class VersionsSuite extends SparkFunSuite with Logging { val tempDatabasePath = Utils.createTempDir().toURI - test(s"$version: createDatabase") { + testVersion(s"$version: createDatabase") { val defaultDB = CatalogDatabase("default", "desc", new URI("loc"), Map()) client.createDatabase(defaultDB, ignoreIfExists = true) val tempDB = CatalogDatabase( @@ -167,7 +179,7 @@ class VersionsSuite extends SparkFunSuite with Logging { client.createDatabase(tempDB, ignoreIfExists = true) } - test(s"$version: createDatabase with null description") { + testVersion(s"$version: createDatabase with null description") { withTempDir { tmpDir => val dbWithNullDesc = CatalogDatabase("dbWithNullDesc", description = null, tmpDir.toURI, Map()) @@ -176,32 +188,32 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: setCurrentDatabase") { + testVersion(s"$version: setCurrentDatabase") { client.setCurrentDatabase("default") } - test(s"$version: getDatabase") { + testVersion(s"$version: getDatabase") { // No exception should be thrown client.getDatabase("default") intercept[NoSuchDatabaseException](client.getDatabase("nonexist")) } - test(s"$version: databaseExists") { + testVersion(s"$version: databaseExists") { assert(client.databaseExists("default") == true) assert(client.databaseExists("nonexist") == false) } - test(s"$version: listDatabases") { + testVersion(s"$version: listDatabases") { assert(client.listDatabases("defau.*") == Seq("default")) } - test(s"$version: alterDatabase") { + testVersion(s"$version: alterDatabase") { val database = client.getDatabase("temporary").copy(properties = Map("flag" -> "true")) client.alterDatabase(database) assert(client.getDatabase("temporary").properties.contains("flag")) } - test(s"$version: dropDatabase") { + testVersion(s"$version: dropDatabase") { assert(client.databaseExists("temporary") == true) client.dropDatabase("temporary", ignoreIfNotExists = false, cascade = true) assert(client.databaseExists("temporary") == false) @@ -211,12 +223,12 @@ class VersionsSuite extends SparkFunSuite with Logging { // Table related API /////////////////////////////////////////////////////////////////////////// - test(s"$version: createTable") { + testVersion(s"$version: createTable") { client.createTable(table("default", tableName = "src"), ignoreIfExists = false) client.createTable(table("default", "temporary"), ignoreIfExists = false) } - test(s"$version: loadTable") { + testVersion(s"$version: loadTable") { client.loadTable( emptyDir, tableName = "src", @@ -224,34 +236,34 @@ class VersionsSuite extends SparkFunSuite with Logging { isSrcLocal = false) } - test(s"$version: tableExists") { + testVersion(s"$version: tableExists") { // No exception should be thrown assert(client.tableExists("default", "src")) assert(!client.tableExists("default", "nonexistent")) } - test(s"$version: getTable") { + testVersion(s"$version: getTable") { // No exception should be thrown client.getTable("default", "src") } - test(s"$version: getTableOption") { + testVersion(s"$version: getTableOption") { assert(client.getTableOption("default", "src").isDefined) } - test(s"$version: alterTable(table: CatalogTable)") { + testVersion(s"$version: alterTable(table: CatalogTable)") { val newTable = client.getTable("default", "src").copy(properties = Map("changed" -> "")) client.alterTable(newTable) assert(client.getTable("default", "src").properties.contains("changed")) } - test(s"$version: alterTable(dbName: String, tableName: 
String, table: CatalogTable)") { + testVersion(s"$version: alterTable(dbName: String, tableName: String, table: CatalogTable)") { val newTable = client.getTable("default", "src").copy(properties = Map("changedAgain" -> "")) client.alterTable("default", "src", newTable) assert(client.getTable("default", "src").properties.contains("changedAgain")) } - test(s"$version: alterTable - rename") { + testVersion(s"$version: alterTable - rename") { val newTable = client.getTable("default", "src") .copy(identifier = TableIdentifier("tgt", database = Some("default"))) assert(!client.tableExists("default", "tgt")) @@ -262,7 +274,7 @@ class VersionsSuite extends SparkFunSuite with Logging { assert(!client.tableExists("default", "src")) } - test(s"$version: alterTable - change database") { + testVersion(s"$version: alterTable - change database") { val tempDB = CatalogDatabase( "temporary", description = "test create", tempDatabasePath, Map()) client.createDatabase(tempDB, ignoreIfExists = true) @@ -277,7 +289,7 @@ class VersionsSuite extends SparkFunSuite with Logging { assert(!client.tableExists("default", "tgt")) } - test(s"$version: alterTable - change database and table names") { + testVersion(s"$version: alterTable - change database and table names") { val newTable = client.getTable("temporary", "tgt") .copy(identifier = TableIdentifier("src", database = Some("default"))) assert(!client.tableExists("default", "src")) @@ -288,16 +300,16 @@ class VersionsSuite extends SparkFunSuite with Logging { assert(!client.tableExists("temporary", "tgt")) } - test(s"$version: listTables(database)") { + testVersion(s"$version: listTables(database)") { assert(client.listTables("default") === Seq("src", "temporary")) } - test(s"$version: listTables(database, pattern)") { + testVersion(s"$version: listTables(database, pattern)") { assert(client.listTables("default", pattern = "src") === Seq("src")) assert(client.listTables("default", pattern = "nonexist").isEmpty) } - test(s"$version: dropTable") { + testVersion(s"$version: dropTable") { val versionsWithoutPurge = versions.takeWhile(_ != "0.14") // First try with the purge option set. This should fail if the version is < 0.14, in which // case we check the version and try without it. 
@@ -326,13 +338,13 @@ class VersionsSuite extends SparkFunSuite with Logging { compressed = false, properties = Map.empty) - test(s"$version: sql create partitioned table") { + testVersion(s"$version: sql create partitioned table") { client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)") } val testPartitionCount = 2 - test(s"$version: createPartitions") { + testVersion(s"$version: createPartitions") { val partitions = (1 to testPartitionCount).map { key2 => CatalogTablePartition(Map("key1" -> "1", "key2" -> key2.toString), storageFormat) } @@ -340,17 +352,17 @@ class VersionsSuite extends SparkFunSuite with Logging { "default", "src_part", partitions, ignoreIfExists = true) } - test(s"$version: getPartitionNames(catalogTable)") { + testVersion(s"$version: getPartitionNames(catalogTable)") { val partitionNames = (1 to testPartitionCount).map(key2 => s"key1=1/key2=$key2") assert(partitionNames == client.getPartitionNames(client.getTable("default", "src_part"))) } - test(s"$version: getPartitions(catalogTable)") { + testVersion(s"$version: getPartitions(catalogTable)") { assert(testPartitionCount == client.getPartitions(client.getTable("default", "src_part")).size) } - test(s"$version: getPartitionsByFilter") { + testVersion(s"$version: getPartitionsByFilter") { // Only one partition [1, 1] for key2 == 1 val result = client.getPartitionsByFilter(client.getTable("default", "src_part"), Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1)))) @@ -363,28 +375,29 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: getPartition") { + testVersion(s"$version: getPartition") { // No exception should be thrown client.getPartition("default", "src_part", Map("key1" -> "1", "key2" -> "2")) } - test(s"$version: getPartitionOption(db: String, table: String, spec: TablePartitionSpec)") { + testVersion( + s"$version: getPartitionOption(db: String, table: String, spec: TablePartitionSpec)") { val partition = client.getPartitionOption( "default", "src_part", Map("key1" -> "1", "key2" -> "2")) assert(partition.isDefined) } - test(s"$version: getPartitionOption(table: CatalogTable, spec: TablePartitionSpec)") { + testVersion(s"$version: getPartitionOption(table: CatalogTable, spec: TablePartitionSpec)") { val partition = client.getPartitionOption( client.getTable("default", "src_part"), Map("key1" -> "1", "key2" -> "2")) assert(partition.isDefined) } - test(s"$version: getPartitions(db: String, table: String)") { + testVersion(s"$version: getPartitions(db: String, table: String)") { assert(testPartitionCount == client.getPartitions("default", "src_part", None).size) } - test(s"$version: loadPartition") { + testVersion(s"$version: loadPartition") { val partSpec = new java.util.LinkedHashMap[String, String] partSpec.put("key1", "1") partSpec.put("key2", "2") @@ -399,7 +412,7 @@ class VersionsSuite extends SparkFunSuite with Logging { isSrcLocal = false) } - test(s"$version: loadDynamicPartitions") { + testVersion(s"$version: loadDynamicPartitions") { val partSpec = new java.util.LinkedHashMap[String, String] partSpec.put("key1", "1") partSpec.put("key2", "") // Dynamic partition @@ -413,7 +426,7 @@ class VersionsSuite extends SparkFunSuite with Logging { numDP = 1) } - test(s"$version: renamePartitions") { + testVersion(s"$version: renamePartitions") { val oldSpec = Map("key1" -> "1", "key2" -> "1") val newSpec = Map("key1" -> "1", "key2" -> "3") client.renamePartitions("default", "src_part", Seq(oldSpec), Seq(newSpec)) @@ -422,7 +435,7 
@@ class VersionsSuite extends SparkFunSuite with Logging { assert(client.getPartitionOption("default", "src_part", newSpec).isDefined) } - test(s"$version: alterPartitions") { + testVersion(s"$version: alterPartitions") { val spec = Map("key1" -> "1", "key2" -> "2") val parameters = Map(StatsSetupConst.TOTAL_SIZE -> "0", StatsSetupConst.NUM_FILES -> "1") val newLocation = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/")) @@ -438,7 +451,7 @@ class VersionsSuite extends SparkFunSuite with Logging { .parameters.get(StatsSetupConst.TOTAL_SIZE) == Some("0")) } - test(s"$version: dropPartitions") { + testVersion(s"$version: dropPartitions") { val spec = Map("key1" -> "1", "key2" -> "3") val versionsWithoutPurge = versions.takeWhile(_ != "1.2") // Similar to dropTable; try with purge set, and if it fails, make sure we're running @@ -466,7 +479,7 @@ class VersionsSuite extends SparkFunSuite with Logging { FunctionIdentifier(name, Some("default")), className, Seq.empty[FunctionResource]) } - test(s"$version: createFunction") { + testVersion(s"$version: createFunction") { val functionClass = "org.apache.spark.MyFunc1" if (version == "0.12") { // Hive 0.12 doesn't support creating permanent functions @@ -478,7 +491,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: functionExists") { + testVersion(s"$version: functionExists") { if (version == "0.12") { // Hive 0.12 doesn't allow customized permanent functions assert(client.functionExists("default", "func1") == false) @@ -487,7 +500,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: renameFunction") { + testVersion(s"$version: renameFunction") { if (version == "0.12") { // Hive 0.12 doesn't allow customized permanent functions intercept[NoSuchPermanentFunctionException] { @@ -499,7 +512,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: alterFunction") { + testVersion(s"$version: alterFunction") { val functionClass = "org.apache.spark.MyFunc2" if (version == "0.12") { // Hive 0.12 doesn't allow customized permanent functions @@ -511,7 +524,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: getFunction") { + testVersion(s"$version: getFunction") { if (version == "0.12") { // Hive 0.12 doesn't allow customized permanent functions intercept[NoSuchPermanentFunctionException] { @@ -524,7 +537,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: getFunctionOption") { + testVersion(s"$version: getFunctionOption") { if (version == "0.12") { // Hive 0.12 doesn't allow customized permanent functions assert(client.getFunctionOption("default", "func2").isEmpty) @@ -534,7 +547,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: listFunctions") { + testVersion(s"$version: listFunctions") { if (version == "0.12") { // Hive 0.12 doesn't allow customized permanent functions assert(client.listFunctions("default", "fun.*").isEmpty) @@ -543,7 +556,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: dropFunction") { + testVersion(s"$version: dropFunction") { if (version == "0.12") { // Hive 0.12 doesn't support creating permanent functions intercept[NoSuchPermanentFunctionException] { @@ -560,11 +573,11 @@ class VersionsSuite extends SparkFunSuite with Logging { // SQL related API /////////////////////////////////////////////////////////////////////////// - test(s"$version: sql set command") { + testVersion(s"$version: sql set 
command") { client.runSqlHive("SET spark.sql.test.key=1") } - test(s"$version: sql create index and reset") { + testVersion(s"$version: sql create index and reset") { client.runSqlHive("CREATE TABLE indexed_table (key INT)") client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " + "as 'COMPACT' WITH DEFERRED REBUILD") @@ -574,32 +587,32 @@ class VersionsSuite extends SparkFunSuite with Logging { // Miscellaneous API /////////////////////////////////////////////////////////////////////////// - test(s"$version: version") { + testVersion(s"$version: version") { assert(client.version.fullVersion.startsWith(version)) } - test(s"$version: getConf") { + testVersion(s"$version: getConf") { assert("success" === client.getConf("test", null)) } - test(s"$version: setOut") { + testVersion(s"$version: setOut") { client.setOut(new PrintStream(new ByteArrayOutputStream())) } - test(s"$version: setInfo") { + testVersion(s"$version: setInfo") { client.setInfo(new PrintStream(new ByteArrayOutputStream())) } - test(s"$version: setError") { + testVersion(s"$version: setError") { client.setError(new PrintStream(new ByteArrayOutputStream())) } - test(s"$version: newSession") { + testVersion(s"$version: newSession") { val newClient = client.newSession() assert(newClient != null) } - test(s"$version: withHiveState and addJar") { + testVersion(s"$version: withHiveState and addJar") { val newClassPath = "." client.addJar(newClassPath) client.withHiveState { @@ -613,7 +626,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: reset") { + testVersion(s"$version: reset") { // Clears all database, tables, functions... client.reset() assert(client.listTables("default").isEmpty) @@ -623,7 +636,7 @@ class VersionsSuite extends SparkFunSuite with Logging { // End-To-End tests /////////////////////////////////////////////////////////////////////////// - test(s"$version: CREATE TABLE AS SELECT") { + testVersion(s"$version: CREATE TABLE AS SELECT") { withTable("tbl") { versionSpark.sql("CREATE TABLE tbl AS SELECT 1 AS a") assert(versionSpark.table("tbl").collect().toSeq == Seq(Row(1))) @@ -638,7 +651,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: CREATE Partitioned TABLE AS SELECT") { + testVersion(s"$version: CREATE Partitioned TABLE AS SELECT") { withTable("tbl") { versionSpark.sql( """ @@ -678,7 +691,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: Delete the temporary staging directory and files after each insert") { + testVersion(s"$version: Delete the temporary staging directory and files after each insert") { withTempDir { tmpDir => withTable("tab") { versionSpark.sql( @@ -704,7 +717,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: SPARK-13709: reading partitioned Avro table with nested schema") { + testVersion(s"$version: SPARK-13709: reading partitioned Avro table with nested schema") { withTempDir { dir => val path = dir.toURI.toString val tableName = "spark_13709" @@ -781,7 +794,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: CTAS for managed data source tables") { + testVersion(s"$version: CTAS for managed data source tables") { withTable("t", "t1") { versionSpark.range(1).write.saveAsTable("t") assert(versionSpark.table("t").collect() === Array(Row(0))) @@ -790,7 +803,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: Decimal support of Avro Hive serde") { + testVersion(s"$version: Decimal 
support of Avro Hive serde") { val tableName = "tab1" // TODO: add the other logical types. For details, see the link: // https://avro.apache.org/docs/1.8.1/spec.html#Logical+Types @@ -856,7 +869,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: read avro file containing decimal") { + testVersion(s"$version: read avro file containing decimal") { val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal") val location = new File(url.getFile).toURI.toString @@ -897,7 +910,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } } - test(s"$version: SPARK-17920: Insert into/overwrite avro table") { + testVersion(s"$version: SPARK-17920: Insert into/overwrite avro table") { // skipped because it's failed in the condition on Windows assume(!(Utils.isWindows && version == "0.12")) withTempDir { dir =>