diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala index 0077f35d97543..47ad11e17bc7c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala @@ -19,16 +19,17 @@ package org.apache.spark.sql.hudi.command import org.apache.avro.Schema import org.apache.hudi.avro.HoodieAvroUtils -import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType} +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType, WriteOperationType} import org.apache.hudi.common.table.timeline.HoodieInstant.State import org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata -import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant} +import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.util.CommitUtils import org.apache.hudi.table.HoodieSparkTable import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils, SparkAdapterSupport} import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} +import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.hudi.HoodieOptionConfig import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} @@ -64,33 +65,23 @@ case class AlterHoodieTableAddColumnsCommand(tableId: TableIdentifier, // Commit with new schema to change the table schema AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, hoodieCatalogTable, sparkSession) - // Refresh the new schema to meta val newDataSchema = StructType(hoodieCatalogTable.dataSchema.fields ++ colsToAdd) - refreshSchemaInMeta(sparkSession, hoodieCatalogTable.table, newDataSchema) + validateSchema(newDataSchema) + // Refresh the new schema to meta + AlterHoodieTableAddColumnsCommand.refreshSchema(sparkSession, hoodieCatalogTable, newDataSchema) } Seq.empty[Row] } - private def refreshSchemaInMeta(sparkSession: SparkSession, table: CatalogTable, - newSqlDataSchema: StructType): Unit = { - try { - sparkSession.catalog.uncacheTable(tableId.quotedString) - } catch { - case NonFatal(e) => - log.warn(s"Exception when attempting to uncache table ${tableId.quotedString}", e) - } - sparkSession.catalog.refreshTable(table.identifier.unquotedString) - + private def validateSchema(dataSchema: StructType): Unit = { AlterHoodieTableAddColumnsCommand.checkColumnNameDuplication( - newSqlDataSchema.map(_.name), - "in the table definition of " + table.identifier, + dataSchema.map(_.name), + "in the table definition of " + tableId.identifier, conf.caseSensitiveAnalysis) - - sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlDataSchema) } } -object AlterHoodieTableAddColumnsCommand extends SparkAdapterSupport { +object AlterHoodieTableAddColumnsCommand extends SparkAdapterSupport with Logging { /** * Generate an empty commit with new schema to change the table's schema. * @@ -139,4 +130,33 @@ object AlterHoodieTableAddColumnsCommand extends SparkAdapterSupport { def checkColumnNameDuplication(columnNames: Seq[String], colType: String, caseSensitiveAnalysis: Boolean): Unit = { sparkAdapter.getSchemaUtils.checkColumnNameDuplication(columnNames, colType, caseSensitiveAnalysis) } + + def refreshSchema(session: SparkSession, catalogTable: HoodieCatalogTable, dataSchema: StructType): Unit = { + refreshSchemaInMeta(session, catalogTable.table.identifier, dataSchema) + if (catalogTable.tableType == HoodieTableType.MERGE_ON_READ) { + val tableId = catalogTable.table.identifier + val tableName = catalogTable.tableName + // refresh schema of rt table if exist + val rtTableId = tableId.copy(table = s"${tableName}_rt") + if (session.catalog.tableExists(rtTableId.unquotedString)) { + refreshSchemaInMeta(session, rtTableId, dataSchema) + } + // refresh schema of ro table if exist + val roTableId = tableId.copy(table = s"${tableName}_ro") + if (session.catalog.tableExists(roTableId.unquotedString)) { + refreshSchemaInMeta(session, roTableId, dataSchema) + } + } + } + + private def refreshSchemaInMeta(session: SparkSession, tableId: TableIdentifier, dataSchema: StructType): Unit = { + try { + session.catalog.uncacheTable(tableId.quotedString) + } catch { + case NonFatal(e) => + log.warn(s"Exception when attempting to uncache table ${tableId.quotedString}", e) + } + session.catalog.refreshTable(tableId.unquotedString) + session.sessionState.catalog.alterTableDataSchema(tableId, dataSchema) + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala index a6cbf1de48430..73bde280dde1f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala @@ -28,8 +28,6 @@ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ import org.apache.spark.sql.types.{StructField, StructType} -import scala.util.control.NonFatal - /** * Command for alter hudi table's column type. */ @@ -81,16 +79,8 @@ case class AlterHoodieTableChangeColumnCommand( // Commit new schema to change the table schema AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, hoodieCatalogTable, sparkSession) - try { - sparkSession.catalog.uncacheTable(tableIdentifier.quotedString) - } catch { - case NonFatal(e) => - log.warn(s"Exception when attempting to uncache table ${tableIdentifier.quotedString}", e) - } - sparkSession.catalog.refreshTable(tableIdentifier.unquotedString) - // Change the schema in the meta using new data schema. - sparkSession.sessionState.catalog.alterTableDataSchema(tableIdentifier, newDataSchema) - + // Refresh the new schema to meta + AlterHoodieTableAddColumnsCommand.refreshSchema(sparkSession, hoodieCatalogTable, newDataSchema) Seq.empty[Row] }