@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
@@ -436,8 +436,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val properties = new mutable.HashMap[String, String]
 
     properties.put(CREATED_SPARK_VERSION, table.createVersion)
+    // This is for backward compatibility to Spark 2 to read tables with char/varchar created by
+    // Spark 3.1. At read side, we will restore a table schema from its properties. So, we need to
+    // clear the `varchar(n)` and `char(n)` and replace them with `string` as Spark 2 does not have
+    // a type mapping for them in `DataType.nameToType`.
+    // See `restoreHiveSerdeTable` for example.
+    val newSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
     CatalogTable.splitLargeTableProp(
-      DATASOURCE_SCHEMA, schema.json, properties.put, conf.get(SCHEMA_STRING_LENGTH_THRESHOLD))
+      DATASOURCE_SCHEMA,
+      newSchema.json,
+      properties.put,
+      conf.get(SCHEMA_STRING_LENGTH_THRESHOLD))
 
     if (partitionColumns.nonEmpty) {
       properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
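The comment added in the hunk above describes the write side: before the schema is serialized into the DATASOURCE_SCHEMA table property, char(n)/varchar(n) columns are rewritten to string so that Spark 2.x, which has no mapping for these names in `DataType.nameToType`, can still parse the JSON. The following is a minimal sketch of that transformation and is not part of the patch; the schema and column names are invented, and only the `CharVarcharUtils.replaceCharVarcharWithStringInSchema` call is taken from the diff.

import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types._

// A schema with a varchar column, as a user might declare it in Spark 3.1.
val schema = StructType(Seq(
  StructField("name", VarcharType(10)),   // would not parse back in Spark 2.x
  StructField("id", IntegerType)))

// char(n)/varchar(n) are rewritten to plain string (the original type stays available in the
// field metadata), so the JSON stored under DATASOURCE_SCHEMA stays readable by Spark 2.x.
val newSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
assert(newSchema("name").dataType == StringType)
val json = newSchema.json   // this is what splitLargeTableProp writes into the table properties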
@@ -742,8 +751,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       case None if table.tableType == VIEW =>
         // If this is a view created by Spark 2.2 or higher versions, we should restore its schema
         // from table properties.
-        CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA).foreach { schemaJson =>
-          table = table.copy(schema = DataType.fromJson(schemaJson).asInstanceOf[StructType])
+        getSchemaFromTableProperties(table.properties).foreach { schemaFromTableProps =>
+          table = table.copy(schema = schemaFromTableProps)
         }
 
       // No provider in table properties, which means this is a Hive serde table.
@@ -793,9 +802,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
     // schema from table properties.
-    val schemaJson = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
-    if (schemaJson.isDefined) {
-      val schemaFromTableProps = DataType.fromJson(schemaJson.get).asInstanceOf[StructType]
+    val maybeSchemaFromTableProps = getSchemaFromTableProperties(table.properties)
+    if (maybeSchemaFromTableProps.isDefined) {
+      val schemaFromTableProps = maybeSchemaFromTableProps.get
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
       val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
 
@@ -821,6 +830,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
+  private def getSchemaFromTableProperties(
+      tableProperties: Map[String, String]): Option[StructType] = {
+    CatalogTable.readLargeTableProp(tableProperties, DATASOURCE_SCHEMA).map { schemaJson =>
+      val parsed = DataType.fromJson(schemaJson).asInstanceOf[StructType]
+      CharVarcharUtils.getRawSchema(parsed)
+    }
+  }
+
   private def restoreDataSourceTable(table: CatalogTable, provider: String): CatalogTable = {
     // Internally we store the table location in storage properties with key "path" for data
     // source tables. Here we set the table location to `locationUri` field and filter out the
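The new `getSchemaFromTableProperties` helper above is the matching read side: it parses the schema JSON stored in the table properties and calls `CharVarcharUtils.getRawSchema` to restore the original char/varchar types from the field metadata recorded on the write side (an assumption about Spark 3.1's CharVarcharUtils behaviour, consistent with how the two hunks pair up). Below is a minimal round-trip sketch, not part of the patch, assuming the JSON was produced by `replaceCharVarcharWithStringInSchema` as in the hunk at line 436.

import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types._

// JSON as it would sit under DATASOURCE_SCHEMA, produced by the write path sketched earlier.
val json = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
  StructType(Seq(StructField("name", VarcharType(10))))).json

val parsed = DataType.fromJson(json).asInstanceOf[StructType]  // columns parse as plain string
val restored = CharVarcharUtils.getRawSchema(parsed)
// restored("name").dataType should be varchar(10) again, recovered from the field metadata.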
@@ -835,8 +852,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
-    val schemaFromTableProps = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
-      .map(json => DataType.fromJson(json).asInstanceOf[StructType]).getOrElse(new StructType())
+    val schemaFromTableProps =
+      getSchemaFromTableProperties(table.properties).getOrElse(new StructType())
     val partColumnNames = getPartitionColumnsFromTableProperties(table)
     val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
 