From 0a479a3e9656138269b68aab3fb5370695276009 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Wed, 24 Nov 2021 18:22:03 +0800
Subject: [PATCH 1/6] [SPARK-37452][SQL] Char and Varchar break backward compatibility between v3 and v2

---
 .../spark/sql/hive/HiveExternalCatalog.scala | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 179b424fefb26..ebd87ad419ee8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 import java.lang.reflect.InvocationTargetException
+
 import java.util
 import java.util.Locale
 
@@ -31,8 +32,8 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.DDL_TIME
 import org.apache.hadoop.hive.ql.metadata.HiveException
 import org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT
 import org.apache.thrift.TException
-
 import org.apache.spark.{SparkConf, SparkException}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -40,7 +41,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
@@ -436,8 +437,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val properties = new mutable.HashMap[String, String]
     properties.put(CREATED_SPARK_VERSION, table.createVersion)
 
+    // This is for backward compatibility to Spark 2 to read tables with char/varchar created by
+    // Spark 3.1. At read side, we will restore a table schema from its properties. So, we meed to
+    // clear the `varchar(n)` and char(n)` and replace them with `string` as Spark 2 does not have
+    // a type mapping for them in `DataType.nameToType`.
+    // See `restoreHiveSerdeTable` for example.
+    val newSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
     CatalogTable.splitLargeTableProp(
-      DATASOURCE_SCHEMA, schema.json, properties.put, conf.get(SCHEMA_STRING_LENGTH_THRESHOLD))
+      DATASOURCE_SCHEMA,
+      newSchema.json,
+      properties.put,
+      conf.get(SCHEMA_STRING_LENGTH_THRESHOLD))
 
     if (partitionColumns.nonEmpty) {
       properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)

From 19342859cbbb8257d269478fb7572aa66390edcb Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Wed, 24 Nov 2021 18:28:58 +0800
Subject: [PATCH 2/6] [SPARK-37452][SQL] Char and Varchar break backward compatibility between v3 and v2

---
 .../scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ebd87ad419ee8..f2107122e028b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 import java.lang.reflect.InvocationTargetException
-
 import java.util
 import java.util.Locale
 
@@ -32,8 +31,8 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.DDL_TIME
 import org.apache.hadoop.hive.ql.metadata.HiveException
 import org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT
 import org.apache.thrift.TException
-import org.apache.spark.{SparkConf, SparkException}
 
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier

From 6e29c558d39ac9c5e52b5e4eb24dab477daf80c3 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Wed, 24 Nov 2021 20:13:20 +0800
Subject: [PATCH 3/6] read path: restore raw schema from json

---
 .../apache/spark/sql/hive/HiveExternalCatalog.scala | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index f2107122e028b..19b6c8390ec9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -752,7 +752,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // If this is a view created by Spark 2.2 or higher versions, we should restore its schema
       // from table properties.
       CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA).foreach { schemaJson =>
-        table = table.copy(schema = DataType.fromJson(schemaJson).asInstanceOf[StructType])
+        val rawSchema = CharVarcharUtils.getRawSchema(
+          DataType.fromJson(schemaJson).asInstanceOf[StructType])
+        table = table.copy(schema = rawSchema)
       }
 
       // No provider in table properties, which means this is a Hive serde table.
@@ -805,8 +807,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val schemaJson = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
     if (schemaJson.isDefined) {
       val schemaFromTableProps = DataType.fromJson(schemaJson.get).asInstanceOf[StructType]
+      val rawSchema = CharVarcharUtils.getRawSchema(schemaFromTableProps)
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
-      val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+      val reorderedSchema = reorderSchema(schema = rawSchema, partColumnNames)
 
       if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema) ||
           options.respectSparkSchema) {
@@ -845,7 +848,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
     val schemaFromTableProps = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
-      .map(json => DataType.fromJson(json).asInstanceOf[StructType]).getOrElse(new StructType())
+      .map { json =>
+        CharVarcharUtils.getRawSchema(DataType.fromJson(json).asInstanceOf[StructType])
+      }.getOrElse(new StructType())
     val partColumnNames = getPartitionColumnsFromTableProperties(table)
     val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)

From 799a761b8a48673508d838a8b0d3247dcebfd032 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Wed, 24 Nov 2021 20:14:17 +0800
Subject: [PATCH 4/6] nit

---
 .../scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 19b6c8390ec9b..2d8015257262b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -438,7 +438,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     properties.put(CREATED_SPARK_VERSION, table.createVersion)
     // This is for backward compatibility to Spark 2 to read tables with char/varchar created by
     // Spark 3.1. At read side, we will restore a table schema from its properties. So, we meed to
-    // clear the `varchar(n)` and char(n)` and replace them with `string` as Spark 2 does not have
+    // clear the `varchar(n)` and `char(n)` and replace them with `string` as Spark 2 does not have
     // a type mapping for them in `DataType.nameToType`.
     // See `restoreHiveSerdeTable` for example.
     val newSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)

From 03343099d1b8f78dd49fdc0c101411d33b37754b Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 25 Nov 2021 10:12:23 +0800
Subject: [PATCH 5/6] nit

---
 .../spark/sql/hive/HiveExternalCatalog.scala | 30 ++++++++++---------
 1 file changed, 16 insertions(+), 14 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2d8015257262b..da003229c6f03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -437,7 +437,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     properties.put(CREATED_SPARK_VERSION, table.createVersion)
     // This is for backward compatibility to Spark 2 to read tables with char/varchar created by
-    // Spark 3.1. At read side, we will restore a table schema from its properties. So, we meed to
+    // Spark 3.1. At read side, we will restore a table schema from its properties. So, we need to
     // clear the `varchar(n)` and `char(n)` and replace them with `string` as Spark 2 does not have
     // a type mapping for them in `DataType.nameToType`.
     // See `restoreHiveSerdeTable` for example.
@@ -751,10 +751,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       case None if table.tableType == VIEW =>
         // If this is a view created by Spark 2.2 or higher versions, we should restore its schema
         // from table properties.
-        CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA).foreach { schemaJson =>
-          val rawSchema = CharVarcharUtils.getRawSchema(
-            DataType.fromJson(schemaJson).asInstanceOf[StructType])
-          table = table.copy(schema = rawSchema)
+        getSchemaFromTableProperties(table.properties).foreach { schemaFromTableProps =>
+          table = table.copy(schema = schemaFromTableProps)
         }
 
       // No provider in table properties, which means this is a Hive serde table.
@@ -804,12 +802,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
     // schema from table properties.
-    val schemaJson = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
-    if (schemaJson.isDefined) {
-      val schemaFromTableProps = DataType.fromJson(schemaJson.get).asInstanceOf[StructType]
-      val rawSchema = CharVarcharUtils.getRawSchema(schemaFromTableProps)
+    val maybeSchemaFromTableProps = getSchemaFromTableProperties(table.properties)
+    if (maybeSchemaFromTableProps.isDefined) {
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
-      val reorderedSchema = reorderSchema(schema = rawSchema, partColumnNames)
+      val reorderedSchema = reorderSchema(schema = maybeSchemaFromTableProps.get, partColumnNames)
 
       if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema) ||
           options.respectSparkSchema) {
@@ -833,6 +829,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
+  private def getSchemaFromTableProperties(
+      tableProperties: Map[String, String]): Option[StructType] = {
+    CatalogTable.readLargeTableProp(tableProperties, DATASOURCE_SCHEMA).map { schemaJson =>
+      val parsed = DataType.fromJson(schemaJson).asInstanceOf[StructType]
+      CharVarcharUtils.getRawSchema(parsed)
+    }
+  }
+
   private def restoreDataSourceTable(table: CatalogTable, provider: String): CatalogTable = {
     // Internally we store the table location in storage properties with key "path" for data
     // source tables. Here we set the table location to `locationUri` field and filter out the
@@ -847,10 +851,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
-    val schemaFromTableProps = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
-      .map { json =>
-        CharVarcharUtils.getRawSchema(DataType.fromJson(json).asInstanceOf[StructType])
-      }.getOrElse(new StructType())
+    val schemaFromTableProps =
+      getSchemaFromTableProperties(table.properties).getOrElse(new StructType())
     val partColumnNames = getPartitionColumnsFromTableProperties(table)
     val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)

From d3a284fc82bdda95b46970976f07b5144e599017 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 25 Nov 2021 10:58:54 +0800
Subject: [PATCH 6/6] fix build

---
 .../scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index da003229c6f03..5fccce2678f86 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -804,8 +804,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // schema from table properties.
     val maybeSchemaFromTableProps = getSchemaFromTableProperties(table.properties)
     if (maybeSchemaFromTableProps.isDefined) {
+      val schemaFromTableProps = maybeSchemaFromTableProps.get
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
-      val reorderedSchema = reorderSchema(schema = maybeSchemaFromTableProps.get, partColumnNames)
+      val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
 
       if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema) ||
           options.respectSparkSchema) {
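
Note: the following is an illustrative sketch, not part of the patch series. It shows the round-trip these patches rely on, assuming a Spark 3.x classpath where `CharVarcharUtils.replaceCharVarcharWithStringInSchema` and `CharVarcharUtils.getRawSchema` have the signatures used in the diffs above; the sketch is declared under the `org.apache.spark.sql` package in case the helper object has restricted visibility. On the write side the catalog stores a schema whose char/varchar fields are downgraded to `string` (with the raw type kept in field metadata), so Spark 2's `DataType.nameToType` can still parse the JSON; on the read side the raw char/varchar types are restored from that metadata, as `getSchemaFromTableProperties` does.

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types._

object CharVarcharCompatSketch {
  def main(args: Array[String]): Unit = {
    // A schema as Spark 3.1 sees it at CREATE TABLE time.
    val raw = new StructType()
      .add("c", CharType(10))
      .add("v", VarcharType(20))

    // Write side (patch 1): downgrade char/varchar to string before the
    // schema JSON goes into table properties; the original type string is
    // recorded in each field's metadata.
    val forTableProps = CharVarcharUtils.replaceCharVarcharWithStringInSchema(raw)
    val json = forTableProps.json // contains only types Spark 2 can parse

    // Read side (patches 3 and 5): parse the JSON back and restore the raw
    // char/varchar types from the field metadata.
    val parsed = DataType.fromJson(json).asInstanceOf[StructType]
    val restored = CharVarcharUtils.getRawSchema(parsed)

    assert(restored("c").dataType == CharType(10))
    assert(restored("v").dataType == VarcharType(20))
  }
}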