AlterHoodieTableAddColumnsCommand.scala

@@ -19,16 +19,17 @@ package org.apache.spark.sql.hudi.command

 import org.apache.avro.Schema
 import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
+import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.util.CommitUtils
 import org.apache.hudi.table.HoodieSparkTable
 import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils, SparkAdapterSupport}
 import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.hudi.HoodieOptionConfig
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -64,33 +65,23 @@ case class AlterHoodieTableAddColumnsCommand(tableId: TableIdentifier,
       // Commit with new schema to change the table schema
       AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, hoodieCatalogTable, sparkSession)
 
-      // Refresh the new schema to meta
       val newDataSchema = StructType(hoodieCatalogTable.dataSchema.fields ++ colsToAdd)
-      refreshSchemaInMeta(sparkSession, hoodieCatalogTable.table, newDataSchema)
+      validateSchema(newDataSchema)
+      // Refresh the new schema to meta
+      AlterHoodieTableAddColumnsCommand.refreshSchema(sparkSession, hoodieCatalogTable, newDataSchema)
     }
     Seq.empty[Row]
   }
 
-  private def refreshSchemaInMeta(sparkSession: SparkSession, table: CatalogTable,
-      newSqlDataSchema: StructType): Unit = {
-    try {
-      sparkSession.catalog.uncacheTable(tableId.quotedString)
-    } catch {
-      case NonFatal(e) =>
-        log.warn(s"Exception when attempting to uncache table ${tableId.quotedString}", e)
-    }
-    sparkSession.catalog.refreshTable(table.identifier.unquotedString)
-
+  private def validateSchema(dataSchema: StructType): Unit = {
     AlterHoodieTableAddColumnsCommand.checkColumnNameDuplication(
-      newSqlDataSchema.map(_.name),
-      "in the table definition of " + table.identifier,
+      dataSchema.map(_.name),
+      "in the table definition of " + tableId.identifier,
       conf.caseSensitiveAnalysis)
-
-    sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlDataSchema)
   }
 }

-object AlterHoodieTableAddColumnsCommand extends SparkAdapterSupport {
+object AlterHoodieTableAddColumnsCommand extends SparkAdapterSupport with Logging {
   /**
    * Generate an empty commit with new schema to change the table's schema.
    *
Expand Down Expand Up @@ -139,4 +130,33 @@ object AlterHoodieTableAddColumnsCommand extends SparkAdapterSupport {
   def checkColumnNameDuplication(columnNames: Seq[String], colType: String, caseSensitiveAnalysis: Boolean): Unit = {
     sparkAdapter.getSchemaUtils.checkColumnNameDuplication(columnNames, colType, caseSensitiveAnalysis)
   }
+
+  def refreshSchema(session: SparkSession, catalogTable: HoodieCatalogTable, dataSchema: StructType): Unit = {
+    refreshSchemaInMeta(session, catalogTable.table.identifier, dataSchema)
+    if (catalogTable.tableType == HoodieTableType.MERGE_ON_READ) {
+      val tableId = catalogTable.table.identifier
+      val tableName = catalogTable.tableName
+      // Refresh the schema of the _rt table if it exists
+      val rtTableId = tableId.copy(table = s"${tableName}_rt")
+      if (session.catalog.tableExists(rtTableId.unquotedString)) {
+        refreshSchemaInMeta(session, rtTableId, dataSchema)
+      }
+      // Refresh the schema of the _ro table if it exists
+      val roTableId = tableId.copy(table = s"${tableName}_ro")
+      if (session.catalog.tableExists(roTableId.unquotedString)) {
+        refreshSchemaInMeta(session, roTableId, dataSchema)
+      }
+    }
+  }
+
+  private def refreshSchemaInMeta(session: SparkSession, tableId: TableIdentifier, dataSchema: StructType): Unit = {
+    try {
+      session.catalog.uncacheTable(tableId.quotedString)
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table ${tableId.quotedString}", e)
+    }
+    session.catalog.refreshTable(tableId.unquotedString)
+    session.sessionState.catalog.alterTableDataSchema(tableId, dataSchema)
+  }
 }
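
For context, a minimal sketch of how the new refreshSchema path is exercised end to end. This is not part of the patch: the table and column names are hypothetical, and it assumes a SparkSession built with the Hudi Spark bundle and spark.sql.extensions set to org.apache.spark.sql.hudi.HoodieSparkSessionExtension.

// Hypothetical walkthrough: add a column to a MOR table and observe that the
// snapshot/read-optimized views pick up the new schema.
spark.sql(
  """CREATE TABLE h0 (id INT, name STRING, price DOUBLE, ts BIGINT)
    |USING hudi
    |TBLPROPERTIES (type = 'mor', primaryKey = 'id', preCombineField = 'ts')""".stripMargin)

spark.sql("ALTER TABLE h0 ADD COLUMNS (discount DOUBLE)")

// With this change, if h0_rt / h0_ro are registered in the catalog (as Hive
// sync does for MOR tables), their catalog schemas are refreshed too, so the
// following no longer shows a stale schema:
spark.sql("DESCRIBE TABLE h0_rt").show(truncate = false)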
AlterHoodieTableChangeColumnCommand.scala

@@ -28,8 +28,6 @@ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.types.{StructField, StructType}
 
-import scala.util.control.NonFatal
-
 /**
  * Command for alter hudi table's column type.
  */
@@ -81,16 +79,8 @@ case class AlterHoodieTableChangeColumnCommand(
     // Commit new schema to change the table schema
     AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, hoodieCatalogTable, sparkSession)
 
-    try {
-      sparkSession.catalog.uncacheTable(tableIdentifier.quotedString)
-    } catch {
-      case NonFatal(e) =>
-        log.warn(s"Exception when attempting to uncache table ${tableIdentifier.quotedString}", e)
-    }
-    sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
-    // Change the schema in the meta using new data schema.
-    sparkSession.sessionState.catalog.alterTableDataSchema(tableIdentifier, newDataSchema)
-
+    // Refresh the new schema to meta
+    AlterHoodieTableAddColumnsCommand.refreshSchema(sparkSession, hoodieCatalogTable, newDataSchema)
     Seq.empty[Row]
   }

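
The same shared helper now backs column-type changes as well. A hypothetical follow-on to the earlier sketch (widening only, since Avro permits promotions such as int to long but not lossy narrowing):

// Hypothetical usage, assuming the h0 table from the previous sketch:
// widen id from INT to BIGINT; an int -> long promotion is Avro-compatible.
spark.sql("ALTER TABLE h0 CHANGE COLUMN id id BIGINT")
// Both commands now funnel through AlterHoodieTableAddColumnsCommand.refreshSchema,
// which uncaches the table, refreshes the cached relation, and persists the new
// data schema via SessionCatalog.alterTableDataSchema -- for the base table and,
// on MOR tables, for the _rt/_ro views as well.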