diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index c01145f479d99..377d4db083d76 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import org.apache.hudi.AvroConversionUtils
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
@@ -30,7 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
+import org.apache.spark.sql.hudi.HoodieOptionConfig
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -62,7 +63,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
    * hoodie table's location.
    * if create managed hoodie table, use `catalog.defaultTablePath`.
    */
-  val tableLocation: String = HoodieSqlCommonUtils.getTableLocation(table, spark)
+  val tableLocation: String = getTableLocation(table, spark)
 
   /**
    * A flag to whether the hoodie table exists.
@@ -114,17 +115,27 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
 
   /**
    * The schema of table.
-   * Make StructField nullable.
+   * Make StructField nullable and fill the comments in.
    */
   lazy val tableSchema: StructType = {
+    val resolver = spark.sessionState.conf.resolver
     val originSchema = getTableSqlSchema(metaClient, includeMetadataFields = true).getOrElse(table.schema)
-    StructType(originSchema.map(_.copy(nullable = true)))
+    val fields = originSchema.fields.map { f =>
+      val nullableField: StructField = f.copy(nullable = true)
+      val catalogField = findColumnByName(table.schema, nullableField.name, resolver)
+      if (catalogField.isDefined) {
+        catalogField.get.getComment().map(nullableField.withComment).getOrElse(nullableField)
+      } else {
+        nullableField
+      }
+    }
+    StructType(fields)
   }
 
   /**
    * The schema without hoodie meta fields
   */
-  lazy val tableSchemaWithoutMetaFields: StructType = HoodieSqlCommonUtils.removeMetaFields(tableSchema)
+  lazy val tableSchemaWithoutMetaFields: StructType = removeMetaFields(tableSchema)
 
   /**
    * The schema of data fields
@@ -136,7 +147,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
   /**
    * The schema of data fields not including hoodie meta fields
   */
-  lazy val dataSchemaWithoutMetaFields: StructType = HoodieSqlCommonUtils.removeMetaFields(dataSchema)
+  lazy val dataSchemaWithoutMetaFields: StructType = removeMetaFields(dataSchema)
 
   /**
    * The schema of partition fields
@@ -146,7 +157,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
   /**
    * All the partition paths
   */
-  def getAllPartitionPaths: Seq[String] = HoodieSqlCommonUtils.getAllPartitionPaths(spark, table)
+  def getPartitionPaths: Seq[String] = getAllPartitionPaths(spark, table)
 
   /**
    * Check if table is a partitioned table
@@ -171,10 +182,12 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
         .setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
         .initTable(hadoopConf, tableLocation)
     } else {
+      val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
+      val schema = SchemaConverters.toAvroType(finalSchema, false, recordName, namespace)
       HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(properties)
         .setTableName(table.identifier.table)
-        .setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
+        .setTableCreateSchema(schema.toString())
         .setPartitionFields(table.partitionColumnNames.mkString(","))
         .initTable(hadoopConf, tableLocation)
     }
@@ -239,7 +252,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
       originTableConfig: Map[String, String] = Map.empty): Map[String, String] = {
     val extraConfig = mutable.Map.empty[String, String]
     if (isTableExists) {
-      val allPartitionPaths = getAllPartitionPaths
+      val allPartitionPaths = getPartitionPaths
      if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) {
        extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
          originTableConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
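
For illustration only (not part of the patch): a minimal, self-contained sketch of the comment copying that the new `tableSchema` performs. The local `resolver` and `findColumnByName` are stand-ins for `spark.sessionState.conf.resolver` and the helper added to HoodieSqlCommonUtils in this change; only Spark's StructType API is assumed.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CommentCopySketch {
  // Case-insensitive matching, standing in for spark.sessionState.conf.resolver.
  private val resolver: (String, String) => Boolean = (a, b) => a.equalsIgnoreCase(b)

  // Local stand-in for the findColumnByName helper added to HoodieSqlCommonUtils.
  private def findColumnByName(schema: StructType, name: String): Option[StructField] =
    schema.fields.collectFirst { case f if resolver(f.name, name) => f }

  def main(args: Array[String]): Unit = {
    // Schema resolved from the Hudi table (carries no comments).
    val originSchema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType)))
    // Catalog schema carrying the user-supplied column comments.
    val catalogSchema = StructType(Seq(
      StructField("id", IntegerType).withComment("primary id"),
      StructField("name", StringType).withComment("name column")))

    // Same shape as the new tableSchema: make every field nullable and copy the
    // comment from the matching catalog column, if one exists.
    val tableSchema = StructType(originSchema.fields.map { f =>
      val nullableField = f.copy(nullable = true)
      findColumnByName(catalogSchema, nullableField.name)
        .flatMap(_.getComment())
        .map(nullableField.withComment)
        .getOrElse(nullableField)
    })

    tableSchema.fields.foreach(f => println(s"${f.name} -> ${f.getComment().getOrElse("<no comment>")}"))
    // prints: id -> primary id, name -> name column (and both fields are now nullable)
  }
}
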
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index b5b75a6049beb..4901c0d39117d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig}
 import org.apache.hudi.common.fs.FSUtils
@@ -26,9 +27,10 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
+
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -40,6 +42,7 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Locale, Properties}
+
 import scala.collection.JavaConverters._
 import scala.collection.immutable.Map
 
@@ -301,4 +304,12 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
       true
     }
   }
+
+  // Find the origin column in the schema by column name using the given resolver,
+  // returning None if no column matches.
+  def findColumnByName(schema: StructType, name: String, resolver: Resolver): Option[StructField] = {
+    schema.fields.collectFirst {
+      case field if resolver(field.name, name) => field
+    }
+  }
 }
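
A quick usage sketch for the helper above (illustrative only, e.g. pasted into spark-shell with the Hudi Spark bundle on the classpath; `caseInsensitiveResolution` is Spark's built-in resolver, matching what `spark.sessionState.conf.resolver` returns when spark.sql.caseSensitive is false):

import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.findColumnByName
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(StructField("ID", IntegerType), StructField("name", StringType)))
// The resolver decides how names are matched; with the case-insensitive default,
// "id" resolves to the declared "ID" column.
assert(findColumnByName(schema, "id", caseInsensitiveResolution).exists(_.name == "ID"))
// A column that is not in the schema yields None rather than an exception.
assert(findColumnByName(schema, "missing", caseInsensitiveResolution).isEmpty)
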
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
index 9a965e2a99c8a..befda70680f85 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.types.{StructField, StructType}
 
 import scala.util.control.NonFatal
@@ -40,16 +41,19 @@ case class AlterHoodieTableChangeColumnCommand(
   extends HoodieLeafRunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val resolver = sparkSession.sessionState.conf.resolver
     val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
 
-    val resolver = sparkSession.sessionState.conf.resolver
-    if (!resolver(columnName, newColumn.name)) {
-      throw new AnalysisException(s"Can not support change column name for hudi table currently.")
-    }
+    // Find the origin column from dataSchema by column name.
+    val originColumn = findColumnByName(hoodieCatalogTable.dataSchema, columnName, resolver).getOrElse(
+      throw new AnalysisException(s"Can't find column `$columnName` given table data columns " +
+        s"${hoodieCatalogTable.dataSchema.fieldNames.mkString("[`", "`, `", "`]")}")
+    )
+
     // Get the new schema
     val newTableSchema = StructType(
       hoodieCatalogTable.tableSchema.fields.map { field =>
-        if (resolver(field.name, columnName)) {
+        if (field.name == originColumn.name) {
           newColumn
         } else {
           field
@@ -57,7 +61,7 @@ case class AlterHoodieTableChangeColumnCommand(
       })
     val newDataSchema = StructType(
       hoodieCatalogTable.dataSchema.fields.map { field =>
-        if (resolver(field.name, columnName)) {
+        if (field.name == originColumn.name) {
           newColumn
         } else {
           field
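
At the SQL level, the rewritten command behaves as exercised by the updated TestAlterTable at the end of this diff. A hypothetical session sketch (the table name `h0` and a SparkSession `spark` with the Hudi SQL extensions enabled are assumptions, not taken from the patch):

import scala.util.Try
// Changing only the comment keeps the column's name and type intact:
spark.sql("alter table h0 change column id id int comment 'primary id'")
// A column that is not in the table's data schema now fails fast with an
// AnalysisException listing the available data columns (from the lookup added above):
assert(Try(spark.sql("alter table h0 change column not_a_column not_a_column int")).isFailure)
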
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
index b2b8911e36573..7f3cedbd3986e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
@@ -148,7 +148,7 @@ case class AlterHoodieTableDropPartitionCommand(
       hoodieCatalogTable: HoodieCatalogTable,
       normalizedSpecs: Seq[Map[String, String]]): String = {
     val table = hoodieCatalogTable.table
-    val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths
+    val allPartitionPaths = hoodieCatalogTable.getPartitionPaths
     val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
     val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
     val partitionsToDrop = normalizedSpecs.map { spec =>
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala
index f7511f5b5542f..d896fecae0cd0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala
@@ -48,10 +48,10 @@ case class ShowHoodieTablePartitionsCommand(
     if (partitionColumnNamesOpt.isPresent && partitionColumnNamesOpt.get.nonEmpty
       && schemaOpt.nonEmpty) {
       if (specOpt.isEmpty) {
-        hoodieCatalogTable.getAllPartitionPaths.map(Row(_))
+        hoodieCatalogTable.getPartitionPaths.map(Row(_))
       } else {
         val spec = specOpt.get
-        hoodieCatalogTable.getAllPartitionPaths.filter { partitionPath =>
+        hoodieCatalogTable.getPartitionPaths.filter { partitionPath =>
           val part = PartitioningUtils.parsePathFragment(partitionPath)
           spec.forall { case (col, value) =>
             PartitionPathEncodeUtils.escapePartitionValue(value) == part.getOrElse(col, null)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
index 9a74d23c2f937..469b135959846 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hudi.common.table.HoodieTableMetaClient
+
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.types.{LongType, StructField, StructType}
 
@@ -44,7 +45,24 @@ class TestAlterTable extends TestHoodieSqlBase {
           | preCombineField = 'ts'
           | )
       """.stripMargin)
-      // Alter table name.
+
+      // change column comment
+      spark.sql(s"alter table $tableName change column id id int comment 'primary id'")
+      var catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
+      assertResult("primary id") (
+        catalogTable.schema(catalogTable.schema.fieldIndex("id")).getComment().get
+      )
+      spark.sql(s"alter table $tableName change column name name string comment 'name column'")
+      spark.sessionState.catalog.refreshTable(new TableIdentifier(tableName))
+      catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
+      assertResult("primary id") (
+        catalogTable.schema(catalogTable.schema.fieldIndex("id")).getComment().get
+      )
+      assertResult("name column") (
+        catalogTable.schema(catalogTable.schema.fieldIndex("name")).getComment().get
+      )
+
+      // alter table name.
       val newTableName = s"${tableName}_1"
       spark.sql(s"alter table $tableName rename to $newTableName")
       assertResult(false)(
@@ -53,24 +71,26 @@ class TestAlterTable extends TestHoodieSqlBase {
       assertResult(true) (
         spark.sessionState.catalog.tableExists(new TableIdentifier(newTableName))
       )
+
       val hadoopConf = spark.sessionState.newHadoopConf()
       val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath)
         .setConf(hadoopConf).build()
-      assertResult(newTableName) (
-        metaClient.getTableConfig.getTableName
-      )
+      assertResult(newTableName) (metaClient.getTableConfig.getTableName)
+
+      // insert some data
       spark.sql(s"insert into $newTableName values(1, 'a1', 10, 1000)")
 
-      // Add table column
+      // add column
       spark.sql(s"alter table $newTableName add columns(ext0 string)")
-      val table = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(newTableName))
+      catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(newTableName))
       assertResult(Seq("id", "name", "price", "ts", "ext0")) {
-        HoodieSqlCommonUtils.removeMetaFields(table.schema).fields.map(_.name)
+        HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name)
       }
       checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
         Seq(1, "a1", 10.0, 1000, null)
       )
-      // Alter table column type
+
+      // change column's data type
       spark.sql(s"alter table $newTableName change column id id bigint")
       assertResult(StructType(Seq(StructField("id", LongType, nullable = true))))(
         spark.sql(s"select id from $newTableName").schema)