diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 179b424fefb26..5fccce2678f86 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils} import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions} import org.apache.spark.sql.hive.client.HiveClient @@ -436,8 +436,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val properties = new mutable.HashMap[String, String] properties.put(CREATED_SPARK_VERSION, table.createVersion) + // This is for backward compatibility to Spark 2 to read tables with char/varchar created by + // Spark 3.1. At read side, we will restore a table schema from its properties. So, we need to + // clear the `varchar(n)` and `char(n)` and replace them with `string` as Spark 2 does not have + // a type mapping for them in `DataType.nameToType`. + // See `restoreHiveSerdeTable` for example. + val newSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema) CatalogTable.splitLargeTableProp( - DATASOURCE_SCHEMA, schema.json, properties.put, conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)) + DATASOURCE_SCHEMA, + newSchema.json, + properties.put, + conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)) if (partitionColumns.nonEmpty) { properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString) @@ -742,8 +751,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat case None if table.tableType == VIEW => // If this is a view created by Spark 2.2 or higher versions, we should restore its schema // from table properties. - CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA).foreach { schemaJson => - table = table.copy(schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]) + getSchemaFromTableProperties(table.properties).foreach { schemaFromTableProps => + table = table.copy(schema = schemaFromTableProps) } // No provider in table properties, which means this is a Hive serde table. @@ -793,9 +802,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its // schema from table properties. - val schemaJson = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA) - if (schemaJson.isDefined) { - val schemaFromTableProps = DataType.fromJson(schemaJson.get).asInstanceOf[StructType] + val maybeSchemaFromTableProps = getSchemaFromTableProperties(table.properties) + if (maybeSchemaFromTableProps.isDefined) { + val schemaFromTableProps = maybeSchemaFromTableProps.get val partColumnNames = getPartitionColumnsFromTableProperties(table) val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames) @@ -821,6 +830,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } } + private def getSchemaFromTableProperties( + tableProperties: Map[String, String]): Option[StructType] = { + CatalogTable.readLargeTableProp(tableProperties, DATASOURCE_SCHEMA).map { schemaJson => + val parsed = DataType.fromJson(schemaJson).asInstanceOf[StructType] + CharVarcharUtils.getRawSchema(parsed) + } + } + private def restoreDataSourceTable(table: CatalogTable, provider: String): CatalogTable = { // Internally we store the table location in storage properties with key "path" for data // source tables. Here we set the table location to `locationUri` field and filter out the @@ -835,8 +852,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap) val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER) - val schemaFromTableProps = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA) - .map(json => DataType.fromJson(json).asInstanceOf[StructType]).getOrElse(new StructType()) + val schemaFromTableProps = + getSchemaFromTableProperties(table.properties).getOrElse(new StructType()) val partColumnNames = getPartitionColumnsFromTableProperties(table) val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)