diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 5ad2caba07fc..2981e391c043 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -257,8 +257,20 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
     // The inferred schema may have different field names as the table schema, we should respect
     // it, but also respect the exprId in table relation output.
-    assert(result.output.length == relation.output.length &&
-      result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType })
+    if (result.output.length != relation.output.length) {
+      throw new AnalysisException(
+        s"Converted table has ${result.output.length} columns, " +
+          s"but source Hive table has ${relation.output.length} columns. " +
+          s"Set ${HiveUtils.CONVERT_METASTORE_PARQUET.key} to false, " +
+          s"or recreate table ${relation.tableMeta.identifier} to workaround.")
+    }
+    if (!result.output.zip(relation.output).forall {
+          case (a1, a2) => a1.dataType == a2.dataType }) {
+      throw new AnalysisException(
+        s"Column in converted table has different data type with source Hive table's. " +
+          s"Set ${HiveUtils.CONVERT_METASTORE_PARQUET.key} to false, " +
+          s"or recreate table ${relation.tableMeta.identifier} to workaround.")
+    }
     val newOutput = result.output.zip(relation.output).map {
       case (a1, a2) => a1.withExprId(a2.exprId)
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 0e45e18c4b17..9f2906df19bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.{QueryTest, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -358,4 +358,24 @@ class DataSourceWithHiveMetastoreCatalogSuite
         Seq(table("src").count().toString))
     }
   }
+
+  test("SPARK-29869: Fix convertToLogicalRelation throws unclear AssertionError") {
+    withTempPath(dir => {
+      val baseDir = s"${dir.getCanonicalFile.toURI.toString}/non_partition_table"
+      val partitionLikeDir = s"$baseDir/dt=20191113"
+      spark.range(3).selectExpr("id").write.parquet(partitionLikeDir)
+      withTable("non_partition_table") {
+        withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true") {
+          spark.sql(
+            s"""
+               |CREATE TABLE non_partition_table (id bigint)
+               |STORED AS PARQUET LOCATION '$baseDir'
+               |""".stripMargin)
+          val e = intercept[AnalysisException](
+            spark.table("non_partition_table")).getMessage
+          assert(e.contains("Converted table has 2 columns, but source Hive table has 1 columns."))
+        }
+      }
+    })
+  }
 }
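
For context, a minimal sketch of the scenario the new test exercises, outside the test harness. The `Spark29869Repro` object name, the local-mode session, and the `/tmp` paths are illustrative assumptions, not part of the patch; it requires `spark-hive` on the classpath. Parquet files written under a partition-style subdirectory make schema inference pick up an extra `dt` column, so reading the non-partitioned Hive table (with `spark.sql.hive.convertMetastoreParquet` left at its default of true) now surfaces the new, actionable AnalysisException instead of an opaque AssertionError.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object Spark29869Repro {
  def main(args: Array[String]): Unit = {
    // Hive-enabled session; enableHiveSupport() needs spark-hive on the classpath.
    val spark = SparkSession.builder()
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

    // Illustrative paths: parquet is written under a partition-style subdirectory,
    // while the Hive table is declared as non-partitioned over the parent directory.
    val baseDir = "/tmp/spark29869/non_partition_table"
    spark.range(3).selectExpr("id").write.mode("overwrite").parquet(s"$baseDir/dt=20191113")

    spark.sql(
      s"""
         |CREATE TABLE IF NOT EXISTS non_partition_table (id bigint)
         |STORED AS PARQUET LOCATION '$baseDir'
         |""".stripMargin)

    // With convertMetastoreParquet enabled (the default), the inferred schema has two
    // columns (id, dt) but the table schema has one, so the read fails with the new
    // AnalysisException that names the config and the table.
    try {
      spark.table("non_partition_table").show()
    } catch {
      case e: AnalysisException => println(e.getMessage)
    }
  }
}
```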