From 7178ccfbd726149354ad2f9544ed978537efe70f Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 7 Sep 2017 23:21:49 -0700
Subject: [PATCH] [SPARK-21936][SQL] backward compatibility test framework for HiveExternalCatalog

`HiveExternalCatalog` is a semi-public interface. When creating tables,
`HiveExternalCatalog` converts the table metadata to the Hive table format and
saves it into the Hive metastore. It's very important to guarantee backward
compatibility here, i.e., tables created by previous Spark versions should
still be readable in newer Spark versions.

Previously we found backward compatibility issues manually, which made it easy
to miss bugs. This PR introduces a test framework that automatically tests
`HiveExternalCatalog` backward compatibility: it downloads Spark binaries of
different versions, creates tables with those Spark versions, and reads the
tables back with the current Spark version.

test-only change

Author: Wenchen Fan

Closes #19148 from cloud-fan/test.
---
 sql/hive/pom.xml                              |   4 +
 ...nalCatalogBackwardCompatibilitySuite.scala | 264 ------------------
 .../HiveExternalCatalogVersionsSuite.scala    | 194 +++++++++++++
 .../spark/sql/hive/HiveSparkSubmitSuite.scala |  77 +----
 .../sql/hive/MetastoreDataSourcesSuite.scala  |  27 --
 .../spark/sql/hive/SparkSubmitTestUtils.scala | 101 +++++++
 6 files changed, 301 insertions(+), 366 deletions(-)
 delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala

diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 3dca866307232..616f7cd2bc490 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -162,6 +162,10 @@
       <groupId>org.apache.thrift</groupId>
       <artifactId>libfb303</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
deleted file mode 100644
index 705d43f1f3aba..0000000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.spark.sql.hive - -import java.net.URI - -import org.apache.hadoop.fs.Path -import org.scalatest.BeforeAndAfterEach - -import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} -import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.Utils - - -class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest - with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach { - - // To test `HiveExternalCatalog`, we need to read/write the raw table meta from/to hive client. - val hiveClient: HiveClient = - spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client - - val tempDir = Utils.createTempDir().getCanonicalFile - val tempDirUri = tempDir.toURI - val tempDirStr = tempDir.getAbsolutePath - - override def beforeEach(): Unit = { - sql("CREATE DATABASE test_db") - for ((tbl, _) <- rawTablesAndExpectations) { - hiveClient.createTable(tbl, ignoreIfExists = false) - } - } - - override def afterEach(): Unit = { - Utils.deleteRecursively(tempDir) - hiveClient.dropDatabase("test_db", ignoreIfNotExists = false, cascade = true) - } - - private def getTableMetadata(tableName: String): CatalogTable = { - spark.sharedState.externalCatalog.getTable("test_db", tableName) - } - - private def defaultTableURI(tableName: String): URI = { - spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName, Some("test_db"))) - } - - // Raw table metadata that are dumped from tables created by Spark 2.0. Note that, all spark - // versions prior to 2.1 would generate almost same raw table metadata for a specific table. 
- val simpleSchema = new StructType().add("i", "int") - val partitionedSchema = new StructType().add("i", "int").add("j", "int") - - lazy val hiveTable = CatalogTable( - identifier = TableIdentifier("tbl1", Some("test_db")), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty.copy( - inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"), - outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), - schema = simpleSchema) - - lazy val externalHiveTable = CatalogTable( - identifier = TableIdentifier("tbl2", Some("test_db")), - tableType = CatalogTableType.EXTERNAL, - storage = CatalogStorageFormat.empty.copy( - locationUri = Some(tempDirUri), - inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"), - outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), - schema = simpleSchema) - - lazy val partitionedHiveTable = CatalogTable( - identifier = TableIdentifier("tbl3", Some("test_db")), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty.copy( - inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"), - outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), - schema = partitionedSchema, - partitionColumnNames = Seq("j")) - - - val simpleSchemaJson = - """ - |{ - | "type": "struct", - | "fields": [{ - | "name": "i", - | "type": "integer", - | "nullable": true, - | "metadata": {} - | }] - |} - """.stripMargin - - val partitionedSchemaJson = - """ - |{ - | "type": "struct", - | "fields": [{ - | "name": "i", - | "type": "integer", - | "nullable": true, - | "metadata": {} - | }, - | { - | "name": "j", - | "type": "integer", - | "nullable": true, - | "metadata": {} - | }] - |} - """.stripMargin - - lazy val dataSourceTable = CatalogTable( - identifier = TableIdentifier("tbl4", Some("test_db")), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty.copy( - properties = Map("path" -> defaultTableURI("tbl4").toString)), - schema = new StructType(), - provider = Some("json"), - properties = Map( - "spark.sql.sources.provider" -> "json", - "spark.sql.sources.schema.numParts" -> "1", - "spark.sql.sources.schema.part.0" -> simpleSchemaJson)) - - lazy val hiveCompatibleDataSourceTable = CatalogTable( - identifier = TableIdentifier("tbl5", Some("test_db")), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty.copy( - properties = Map("path" -> defaultTableURI("tbl5").toString)), - schema = simpleSchema, - provider = Some("parquet"), - properties = Map( - "spark.sql.sources.provider" -> "parquet", - "spark.sql.sources.schema.numParts" -> "1", - "spark.sql.sources.schema.part.0" -> simpleSchemaJson)) - - lazy val partitionedDataSourceTable = CatalogTable( - identifier = TableIdentifier("tbl6", Some("test_db")), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty.copy( - properties = Map("path" -> defaultTableURI("tbl6").toString)), - schema = new StructType(), - provider = Some("json"), - properties = Map( - "spark.sql.sources.provider" -> "json", - "spark.sql.sources.schema.numParts" -> "1", - "spark.sql.sources.schema.part.0" -> partitionedSchemaJson, - "spark.sql.sources.schema.numPartCols" -> "1", - "spark.sql.sources.schema.partCol.0" -> "j")) - - lazy val externalDataSourceTable = CatalogTable( - identifier = TableIdentifier("tbl7", Some("test_db")), - tableType = CatalogTableType.EXTERNAL, - storage = CatalogStorageFormat.empty.copy( - locationUri = Some(new 
URI(defaultTableURI("tbl7") + "-__PLACEHOLDER__")), - properties = Map("path" -> tempDirStr)), - schema = new StructType(), - provider = Some("json"), - properties = Map( - "spark.sql.sources.provider" -> "json", - "spark.sql.sources.schema.numParts" -> "1", - "spark.sql.sources.schema.part.0" -> simpleSchemaJson)) - - lazy val hiveCompatibleExternalDataSourceTable = CatalogTable( - identifier = TableIdentifier("tbl8", Some("test_db")), - tableType = CatalogTableType.EXTERNAL, - storage = CatalogStorageFormat.empty.copy( - locationUri = Some(tempDirUri), - properties = Map("path" -> tempDirStr)), - schema = simpleSchema, - properties = Map( - "spark.sql.sources.provider" -> "parquet", - "spark.sql.sources.schema.numParts" -> "1", - "spark.sql.sources.schema.part.0" -> simpleSchemaJson)) - - lazy val dataSourceTableWithoutSchema = CatalogTable( - identifier = TableIdentifier("tbl9", Some("test_db")), - tableType = CatalogTableType.EXTERNAL, - storage = CatalogStorageFormat.empty.copy( - locationUri = Some(new URI(defaultTableURI("tbl9") + "-__PLACEHOLDER__")), - properties = Map("path" -> tempDirStr)), - schema = new StructType(), - provider = Some("json"), - properties = Map("spark.sql.sources.provider" -> "json")) - - // A list of all raw tables we want to test, with their expected schema. - lazy val rawTablesAndExpectations = Seq( - hiveTable -> simpleSchema, - externalHiveTable -> simpleSchema, - partitionedHiveTable -> partitionedSchema, - dataSourceTable -> simpleSchema, - hiveCompatibleDataSourceTable -> simpleSchema, - partitionedDataSourceTable -> partitionedSchema, - externalDataSourceTable -> simpleSchema, - hiveCompatibleExternalDataSourceTable -> simpleSchema, - dataSourceTableWithoutSchema -> new StructType()) - - test("make sure we can read table created by old version of Spark") { - for ((tbl, expectedSchema) <- rawTablesAndExpectations) { - val readBack = getTableMetadata(tbl.identifier.table) - assert(readBack.schema.sameType(expectedSchema)) - - if (tbl.tableType == CatalogTableType.EXTERNAL) { - // trim the URI prefix - val tableLocation = readBack.storage.locationUri.get.getPath - val expectedLocation = tempDir.toURI.getPath.stripSuffix("/") - assert(tableLocation == expectedLocation) - } - } - } - - test("make sure we can alter table location created by old version of Spark") { - withTempDir { dir => - for ((tbl, _) <- rawTablesAndExpectations if tbl.tableType == CatalogTableType.EXTERNAL) { - val path = dir.toURI.toString.stripSuffix("/") - sql(s"ALTER TABLE ${tbl.identifier} SET LOCATION '$path'") - - val readBack = getTableMetadata(tbl.identifier.table) - - // trim the URI prefix - val actualTableLocation = readBack.storage.locationUri.get.getPath - val expected = dir.toURI.getPath.stripSuffix("/") - assert(actualTableLocation == expected) - } - } - } - - test("make sure we can rename table created by old version of Spark") { - for ((tbl, expectedSchema) <- rawTablesAndExpectations) { - val newName = tbl.identifier.table + "_renamed" - sql(s"ALTER TABLE ${tbl.identifier} RENAME TO $newName") - - val readBack = getTableMetadata(newName) - assert(readBack.schema.sameType(expectedSchema)) - - // trim the URI prefix - val actualTableLocation = readBack.storage.locationUri.get.getPath - val expectedLocation = if (tbl.tableType == CatalogTableType.EXTERNAL) { - tempDir.toURI.getPath.stripSuffix("/") - } else { - // trim the URI prefix - defaultTableURI(newName).getPath - } - assert(actualTableLocation == expectedLocation) - } - } -} diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
new file mode 100644
index 0000000000000..2928a734a7e36
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.nio.file.Files
+
+import org.apache.spark.TestUtils
+import org.apache.spark.sql.{QueryTest, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+/**
+ * Test HiveExternalCatalog backward compatibility.
+ *
+ * Note that this test suite automatically downloads Spark binary packages of different
+ * versions to a local directory `/tmp/spark-test`. If a Spark folder of the expected version
+ * already exists under this directory, e.g. `/tmp/spark-test/spark-2.0.3`, the download is
+ * skipped for that Spark version.
+ */
+class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
+  private val wareHousePath = Utils.createTempDir(namePrefix = "warehouse")
+  private val tmpDataDir = Utils.createTempDir(namePrefix = "test-data")
+  private val sparkTestingDir = "/tmp/spark-test"
+  private val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+
+  override def afterAll(): Unit = {
+    Utils.deleteRecursively(wareHousePath)
+    Utils.deleteRecursively(tmpDataDir)
+    super.afterAll()
+  }
+
+  private def downloadSpark(version: String): Unit = {
+    import scala.sys.process._
+
+    val url = s"https://d3kbcqa49mib13.cloudfront.net/spark-$version-bin-hadoop2.7.tgz"
+
+    Seq("wget", url, "-q", "-P", sparkTestingDir).!
+
+    val downloaded = new File(sparkTestingDir, s"spark-$version-bin-hadoop2.7.tgz").getCanonicalPath
+    val targetDir = new File(sparkTestingDir, s"spark-$version").getCanonicalPath
+
+    Seq("mkdir", targetDir).!
+
+    Seq("tar", "-xzf", downloaded, "-C", targetDir, "--strip-components=1").!
+
+    Seq("rm", downloaded).!
+ } + + private def genDataDir(name: String): String = { + new File(tmpDataDir, name).getCanonicalPath + } + + override def beforeAll(): Unit = { + super.beforeAll() + + val tempPyFile = File.createTempFile("test", ".py") + Files.write(tempPyFile.toPath, + s""" + |from pyspark.sql import SparkSession + | + |spark = SparkSession.builder.enableHiveSupport().getOrCreate() + |version_index = spark.conf.get("spark.sql.test.version.index", None) + | + |spark.sql("create table data_source_tbl_{} using json as select 1 i".format(version_index)) + | + |spark.sql("create table hive_compatible_data_source_tbl_" + version_index + \\ + | " using parquet as select 1 i") + | + |json_file = "${genDataDir("json_")}" + str(version_index) + |spark.range(1, 2).selectExpr("cast(id as int) as i").write.json(json_file) + |spark.sql("create table external_data_source_tbl_" + version_index + \\ + | "(i int) using json options (path '{}')".format(json_file)) + | + |parquet_file = "${genDataDir("parquet_")}" + str(version_index) + |spark.range(1, 2).selectExpr("cast(id as int) as i").write.parquet(parquet_file) + |spark.sql("create table hive_compatible_external_data_source_tbl_" + version_index + \\ + | "(i int) using parquet options (path '{}')".format(parquet_file)) + | + |json_file2 = "${genDataDir("json2_")}" + str(version_index) + |spark.range(1, 2).selectExpr("cast(id as int) as i").write.json(json_file2) + |spark.sql("create table external_table_without_schema_" + version_index + \\ + | " using json options (path '{}')".format(json_file2)) + | + |spark.sql("create view v_{} as select 1 i".format(version_index)) + """.stripMargin.getBytes("utf8")) + + PROCESS_TABLES.testingVersions.zipWithIndex.foreach { case (version, index) => + val sparkHome = new File(sparkTestingDir, s"spark-$version") + if (!sparkHome.exists()) { + downloadSpark(version) + } + + val args = Seq( + "--name", "prepare testing tables", + "--master", "local[2]", + "--conf", "spark.ui.enabled=false", + "--conf", "spark.master.rest.enabled=false", + "--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}", + "--conf", s"spark.sql.test.version.index=$index", + "--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}", + tempPyFile.getCanonicalPath) + runSparkSubmit(args, Some(sparkHome.getCanonicalPath)) + } + + tempPyFile.delete() + } + + test("backward compatibility") { + val args = Seq( + "--class", PROCESS_TABLES.getClass.getName.stripSuffix("$"), + "--name", "HiveExternalCatalog backward compatibility test", + "--master", "local[2]", + "--conf", "spark.ui.enabled=false", + "--conf", "spark.master.rest.enabled=false", + "--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}", + "--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}", + unusedJar.toString) + runSparkSubmit(args) + } +} + +object PROCESS_TABLES extends QueryTest with SQLTestUtils { + // Tests the latest version of every release line. 
+ val testingVersions = Seq("2.0.2", "2.1.1", "2.2.0") + + protected var spark: SparkSession = _ + + def main(args: Array[String]): Unit = { + val session = SparkSession.builder() + .enableHiveSupport() + .getOrCreate() + spark = session + + testingVersions.indices.foreach { index => + Seq( + s"data_source_tbl_$index", + s"hive_compatible_data_source_tbl_$index", + s"external_data_source_tbl_$index", + s"hive_compatible_external_data_source_tbl_$index", + s"external_table_without_schema_$index").foreach { tbl => + val tableMeta = spark.sharedState.externalCatalog.getTable("default", tbl) + + // make sure we can insert and query these tables. + session.sql(s"insert into $tbl select 2") + checkAnswer(session.sql(s"select * from $tbl"), Row(1) :: Row(2) :: Nil) + checkAnswer(session.sql(s"select i from $tbl where i > 1"), Row(2)) + + // make sure we can rename table. + val newName = tbl + "_renamed" + sql(s"ALTER TABLE $tbl RENAME TO $newName") + val readBack = spark.sharedState.externalCatalog.getTable("default", newName) + + val actualTableLocation = readBack.storage.locationUri.get.getPath + val expectedLocation = if (tableMeta.tableType == CatalogTableType.EXTERNAL) { + tableMeta.storage.locationUri.get.getPath + } else { + spark.sessionState.catalog.defaultTablePath(TableIdentifier(newName, None)).getPath + } + assert(actualTableLocation == expectedLocation) + + // make sure we can alter table location. + withTempDir { dir => + val path = dir.toURI.toString.stripSuffix("/") + sql(s"ALTER TABLE ${tbl}_renamed SET LOCATION '$path'") + val readBack = spark.sharedState.externalCatalog.getTable("default", tbl + "_renamed") + val actualTableLocation = readBack.storage.locationUri.get.getPath + val expected = dir.toURI.getPath.stripSuffix("/") + assert(actualTableLocation == expected) + } + } + + // test permanent view + checkAnswer(sql(s"select i from v_$index"), Row(1)) + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 5f15a705a2e99..cf145c845eef0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -18,17 +18,11 @@ package org.apache.spark.sql.hive import java.io.{BufferedWriter, File, FileWriter} -import java.sql.Timestamp -import java.util.Date -import scala.collection.mutable.ArrayBuffer import scala.tools.nsc.Properties import org.apache.hadoop.fs.Path import org.scalatest.{BeforeAndAfterEach, Matchers} -import org.scalatest.concurrent.Timeouts -import org.scalatest.exceptions.TestFailedDueToTimeoutException -import org.scalatest.time.SpanSugar._ import org.apache.spark._ import org.apache.spark.internal.Logging @@ -38,7 +32,6 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext} -import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer import org.apache.spark.sql.types.{DecimalType, StructType} import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -46,11 +39,10 @@ import org.apache.spark.util.{ResetSystemProperties, Utils} * This suite tests spark-submit with applications using HiveContext. 
*/ class HiveSparkSubmitSuite - extends SparkFunSuite + extends SparkSubmitTestUtils with Matchers with BeforeAndAfterEach - with ResetSystemProperties - with Timeouts { + with ResetSystemProperties { // TODO: rewrite these or mark them as slow tests to be run sparingly @@ -333,71 +325,6 @@ class HiveSparkSubmitSuite unusedJar.toString) runSparkSubmit(argsForShowTables) } - - // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. - // This is copied from org.apache.spark.deploy.SparkSubmitSuite - private def runSparkSubmit(args: Seq[String]): Unit = { - val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) - val history = ArrayBuffer.empty[String] - val sparkSubmit = if (Utils.isWindows) { - // On Windows, `ProcessBuilder.directory` does not change the current working directory. - new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath - } else { - "./bin/spark-submit" - } - val commands = Seq(sparkSubmit) ++ args - val commandLine = commands.mkString("'", "' '", "'") - - val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome)) - val env = builder.environment() - env.put("SPARK_TESTING", "1") - env.put("SPARK_HOME", sparkHome) - - def captureOutput(source: String)(line: String): Unit = { - // This test suite has some weird behaviors when executed on Jenkins: - // - // 1. Sometimes it gets extremely slow out of unknown reason on Jenkins. Here we add a - // timestamp to provide more diagnosis information. - // 2. Log lines are not correctly redirected to unit-tests.log as expected, so here we print - // them out for debugging purposes. - val logLine = s"${new Timestamp(new Date().getTime)} - $source> $line" - // scalastyle:off println - println(logLine) - // scalastyle:on println - history += logLine - } - - val process = builder.start() - new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start() - new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start() - - try { - val exitCode = failAfter(300.seconds) { process.waitFor() } - if (exitCode != 0) { - // include logs in output. Note that logging is async and may not have completed - // at the time this exception is raised - Thread.sleep(1000) - val historyLog = history.mkString("\n") - fail { - s"""spark-submit returned with exit code $exitCode. - |Command line: $commandLine - | - |$historyLog - """.stripMargin - } - } - } catch { - case to: TestFailedDueToTimeoutException => - val historyLog = history.mkString("\n") - fail(s"Timeout of $commandLine" + - s" See the log4j logs for more detail." 
+ - s"\n$historyLog", to) - case t: Throwable => throw t - } finally { - // Ensure we still kill the process in case it timed out - process.destroy() - } - } } object SetMetastoreURLTest extends Logging { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 06a30b726549e..07d641d72e709 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -1359,31 +1359,4 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv sparkSession.sparkContext.conf.set(DEBUG_MODE, previousValue) } } - - test("SPARK-18464: support old table which doesn't store schema in table properties") { - withTable("old") { - withTempPath { path => - Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath) - val tableDesc = CatalogTable( - identifier = TableIdentifier("old", Some("default")), - tableType = CatalogTableType.EXTERNAL, - storage = CatalogStorageFormat.empty.copy( - properties = Map("path" -> path.getAbsolutePath) - ), - schema = new StructType(), - provider = Some("parquet"), - properties = Map( - HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet")) - hiveClient.createTable(tableDesc, ignoreIfExists = false) - - checkAnswer(spark.table("old"), Row(1, "a")) - checkAnswer(sql("select * from old"), Row(1, "a")) - - val expectedSchema = StructType(Seq( - StructField("i", IntegerType, nullable = true), - StructField("j", StringType, nullable = true))) - assert(table("old").schema === expectedSchema) - } - } - } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala new file mode 100644 index 0000000000000..4b28d4f362b80 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File +import java.sql.Timestamp +import java.util.Date + +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.concurrent.Timeouts +import org.scalatest.exceptions.TestFailedDueToTimeoutException +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer +import org.apache.spark.util.Utils + +trait SparkSubmitTestUtils extends SparkFunSuite with Timeouts { + + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. 
+ // This is copied from org.apache.spark.deploy.SparkSubmitSuite + protected def runSparkSubmit(args: Seq[String], sparkHomeOpt: Option[String] = None): Unit = { + val sparkHome = sparkHomeOpt.getOrElse( + sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))) + val history = ArrayBuffer.empty[String] + val sparkSubmit = if (Utils.isWindows) { + // On Windows, `ProcessBuilder.directory` does not change the current working directory. + new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath + } else { + "./bin/spark-submit" + } + val commands = Seq(sparkSubmit) ++ args + val commandLine = commands.mkString("'", "' '", "'") + + val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome)) + val env = builder.environment() + env.put("SPARK_TESTING", "1") + env.put("SPARK_HOME", sparkHome) + + def captureOutput(source: String)(line: String): Unit = { + // This test suite has some weird behaviors when executed on Jenkins: + // + // 1. Sometimes it gets extremely slow out of unknown reason on Jenkins. Here we add a + // timestamp to provide more diagnosis information. + // 2. Log lines are not correctly redirected to unit-tests.log as expected, so here we print + // them out for debugging purposes. + val logLine = s"${new Timestamp(new Date().getTime)} - $source> $line" + // scalastyle:off println + println(logLine) + // scalastyle:on println + history += logLine + } + + val process = builder.start() + new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start() + new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start() + + try { + val exitCode = failAfter(300.seconds) { process.waitFor() } + if (exitCode != 0) { + // include logs in output. Note that logging is async and may not have completed + // at the time this exception is raised + Thread.sleep(1000) + val historyLog = history.mkString("\n") + fail { + s"""spark-submit returned with exit code $exitCode. + |Command line: $commandLine + | + |$historyLog + """.stripMargin + } + } + } catch { + case to: TestFailedDueToTimeoutException => + val historyLog = history.mkString("\n") + fail(s"Timeout of $commandLine" + + s" See the log4j logs for more detail." + + s"\n$historyLog", to) + case t: Throwable => throw t + } finally { + // Ensure we still kill the process in case it timed out + process.destroy() + } + } +}
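For reference, a minimal sketch of how another test suite could reuse the new SparkSubmitTestUtils trait. The suite name SparkSubmitSmokeSuite and the SMOKE_TEST object below are hypothetical and not part of this patch; the sketch simply mirrors the spark-submit invocation pattern of the "backward compatibility" test in HiveExternalCatalogVersionsSuite above.

package org.apache.spark.sql.hive

import org.apache.spark.TestUtils

// Hypothetical suite reusing SparkSubmitTestUtils; follows the same pattern
// as HiveExternalCatalogVersionsSuite in this patch.
class SparkSubmitSmokeSuite extends SparkSubmitTestUtils {
  test("run a driver program via spark-submit") {
    // The submitted jar is empty; the real work happens in the --class main
    // object below, which is already on the test classpath.
    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
    val args = Seq(
      "--class", SMOKE_TEST.getClass.getName.stripSuffix("$"),
      "--name", "spark-submit smoke test",
      "--master", "local[2]",
      "--conf", "spark.ui.enabled=false",
      unusedJar.toString)
    // By default runSparkSubmit uses the Spark build under the spark.test.home
    // system property; pass e.g. Some("/tmp/spark-test/spark-2.2.0") as the
    // second argument to run against one of the downloaded releases instead.
    runSparkSubmit(args)
  }
}

// Hypothetical driver program; a failure here makes spark-submit exit with a
// non-zero code, which in turn fails runSparkSubmit.
object SMOKE_TEST {
  def main(args: Array[String]): Unit = {
    assert(args.isEmpty)
  }
}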