HoodieAnalysis.scala
@@ -29,11 +29,11 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal, Na
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, DeleteFromTable, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.CreateDataSourceTableCommand
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, AlterTableRenameCommand, CreateDataSourceTableCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.hudi.HoodieSqlUtils._
-import org.apache.spark.sql.hudi.command.{CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, UpdateHoodieTableCommand}
+import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, UpdateHoodieTableCommand}
import org.apache.spark.sql.types.StringType

object HoodieAnalysis {
@@ -86,6 +86,7 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
case CreateTable(table, mode, Some(query))
if query.resolved && isHoodieTable(table) =>
CreateHoodieTableAsSelectCommand(table, mode, query)
+
case _ => plan
}
}
@@ -307,6 +308,18 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan]
case CreateDataSourceTableCommand(table, ignoreIfExists)
if isHoodieTable(table) =>
CreateHoodieTableCommand(table, ignoreIfExists)
+// Rewrite the AlterTableAddColumnsCommand to AlterHoodieTableAddColumnsCommand
+case AlterTableAddColumnsCommand(tableId, colsToAdd)
+if isHoodieTable(tableId, sparkSession) =>
+AlterHoodieTableAddColumnsCommand(tableId, colsToAdd)
+// Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand
+case AlterTableRenameCommand(oldName, newName, isView)
+if !isView && isHoodieTable(oldName, sparkSession) =>
+new AlterHoodieTableRenameCommand(oldName, newName, isView)
+// Rewrite the AlterTableChangeColumnCommand to AlterHoodieTableChangeColumnCommand
+case AlterTableChangeColumnCommand(tableName, columnName, newColumn)
+if isHoodieTable(tableName, sparkSession) =>
+AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn)
case _ => plan
}
}
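
For context, these rules only take effect because Hudi registers them with Spark's SparkSessionExtensions. Below is a minimal sketch of that wiring, assuming the standard injection points; the extension class name is hypothetical, and Hudi's own extension class performs the equivalent registration when set via spark.sql.extensions:

import org.apache.spark.sql.SparkSessionExtensions

// Sketch only: the class name is hypothetical. HoodieAnalysis and
// HoodiePostAnalysisRule are the rules defined in the file above.
class ExampleHudiExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Resolution rules run during analysis; HoodieAnalysis rewrites
    // INSERT / MERGE / CTAS plans into the hoodie commands.
    extensions.injectResolutionRule { session => HoodieAnalysis(session) }
    // Post-hoc resolution rules run after analysis; HoodiePostAnalysisRule is
    // where the ALTER TABLE commands above are swapped for the hoodie versions.
    extensions.injectPostHocResolutionRule { session => HoodiePostAnalysisRule(session) }
  }
}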
AlterHoodieTableAddColumnsCommand.scala (new file)
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command

import java.nio.charset.StandardCharsets

import org.apache.avro.Schema
import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.util.{CommitUtils, Option}
import org.apache.hudi.table.HoodieSparkTable

import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

/**
* Command to add new columns to a hudi table.
*/
case class AlterHoodieTableAddColumnsCommand(
tableId: TableIdentifier,
colsToAdd: Seq[StructField])
extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
if (colsToAdd.nonEmpty) {
val table = sparkSession.sessionState.catalog.getTableMetadata(tableId)
// Get the new schema
val newSqlSchema = StructType(table.schema.fields ++ colsToAdd)
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableId.table)
val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)

// Commit with new schema to change the table schema
AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession)

// Propagate the new schema to the session catalog
refreshSchemaInMeta(sparkSession, table, newSqlSchema)
}
Seq.empty[Row]
}

private def refreshSchemaInMeta(sparkSession: SparkSession, table: CatalogTable,
newSqlSchema: StructType): Unit = {
try {
sparkSession.catalog.uncacheTable(tableId.quotedString)
} catch {
case NonFatal(e) =>
log.warn(s"Exception when attempting to uncache table ${tableId.quotedString}", e)
}
sparkSession.catalog.refreshTable(table.identifier.unquotedString)

SchemaUtils.checkColumnNameDuplication(
newSqlSchema.map(_.name),
"in the table definition of " + table.identifier,
conf.caseSensitiveAnalysis)
DDLUtils.checkDataColNames(table, colsToAdd.map(_.name))

sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlSchema)
}
}

object AlterHoodieTableAddColumnsCommand {
/**
* Generate an empty commit with new schema to change the table's schema.
* @param schema The new schema to commit.
* @param table The hoodie table.
* @param sparkSession The spark session.
*/
def commitWithSchema(schema: Schema, table: CatalogTable, sparkSession: SparkSession): Unit = {
val path = getTableLocation(table, sparkSession)
.getOrElse(throw new IllegalArgumentException(s"missing location for ${table.identifier}"))

val jsc = new JavaSparkContext(sparkSession.sparkContext)
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
path, table.identifier.table, HoodieWriterUtils.parametersWithWriteDefaults(table.storage.properties).asJava)

val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()

val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.INSERT, metaClient.getTableType)
val instantTime = HoodieActiveTimeline.createNewInstantTime
client.startCommitWithTime(instantTime, commitActionType)

val hoodieTable = HoodieSparkTable.create(client.getConfig, client.getEngineContext)
val timeLine = hoodieTable.getActiveTimeline
val requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime)
val metadata = new HoodieCommitMetadata
metadata.setOperationType(WriteOperationType.INSERT)
timeLine.transitionRequestedToInflight(requested, Option.of(metadata.toJsonString.getBytes(StandardCharsets.UTF_8)))

client.commit(instantTime, jsc.emptyRDD)
}
}
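
A hypothetical usage sketch for the command above; the table name h0 and the column list are made up, and a SparkSession named spark with Hudi's SQL extension enabled is assumed:

// Hypothetical usage: the statement is routed to AlterHoodieTableAddColumnsCommand
// by HoodiePostAnalysisRule.
spark.sql("ALTER TABLE h0 ADD COLUMNS (ext0 string, ext1 int)")
// The new columns are visible right away because the command refreshes the catalog:
spark.sql("DESCRIBE TABLE h0").show(100, truncate = false)

The empty commit written by commitWithSchema is what publishes the new schema on the hoodie timeline; updating the Spark catalog alone would leave the table's commit metadata on the old schema.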
AlterHoodieTableChangeColumnCommand.scala (new file)
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command

import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
import org.apache.spark.sql.types.{StructField, StructType}

import scala.util.control.NonFatal

/**
* Command to alter the column type of a hudi table.
*/
case class AlterHoodieTableChangeColumnCommand(
tableName: TableIdentifier,
columnName: String,
newColumn: StructField)
extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
val resolver = sparkSession.sessionState.conf.resolver

if (!resolver(columnName, newColumn.name)) {
throw new AnalysisException("Changing the column name of a hudi table is not supported yet.")
}
// Get the new schema
val newSqlSchema = StructType(
table.dataSchema.fields.map { field =>
if (resolver(field.name, columnName)) {
newColumn
} else {
field
}
})
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName.table)
val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)

val path = getTableLocation(table, sparkSession)
.getOrElse(throw new IllegalArgumentException(s"missing location for ${table.identifier}"))
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
.setConf(hadoopConf).build()
// Validate the compatibility between new schema and origin schema.
validateSchema(newSchema, metaClient)
// Commit new schema to change the table schema
AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession)

try {
sparkSession.catalog.uncacheTable(tableName.quotedString)
} catch {
case NonFatal(e) =>
log.warn(s"Exception when attempting to uncache table ${tableName.quotedString}", e)
}
sparkSession.catalog.refreshTable(tableName.unquotedString)
// Persist the new schema to the session catalog
catalog.alterTableDataSchema(tableName, newSqlSchema)

Seq.empty[Row]
}

private def validateSchema(newSchema: Schema, metaClient: HoodieTableMetaClient): Unit = {
val schemaUtil = new TableSchemaResolver(metaClient)
val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
if (!TableSchemaResolver.isSchemaCompatible(tableSchema, newSchema)) {
throw new HoodieException("Failed schema compatibility check for new schema: " + newSchema +
", origin table schema: " + tableSchema + ", base path: " + metaClient.getBasePath)
}
}
}
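
A similar hypothetical sketch for the type change (names made up). The column name must stay fixed, and the new schema has to pass validateSchema, so Avro-compatible widenings are the typical use:

// Hypothetical usage: only the type may change, never the name.
spark.sql("ALTER TABLE h0 CHANGE COLUMN id id bigint") // ok: int -> bigint widens
// A rename like "CHANGE COLUMN id id2 bigint" would hit the AnalysisException
// above, and an incompatible change (e.g. string -> int) fails the
// schema compatibility check against the origin table schema.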
AlterHoodieTableRenameCommand.scala (new file)
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command

import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.AlterTableRenameCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation

/**
* Command to rename a hudi table.
*/
class AlterHoodieTableRenameCommand(
oldName: TableIdentifier,
newName: TableIdentifier,
isView: Boolean)
extends AlterTableRenameCommand(oldName, newName, isView) {

override def run(sparkSession: SparkSession): Seq[Row] = {
if (newName != oldName) {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(oldName)
val path = getTableLocation(table, sparkSession)
.getOrElse(throw new IllegalArgumentException(s"missing location for ${table.identifier}"))

val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
.setConf(hadoopConf).build()
// Init table with new name.
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(metaClient.getTableConfig.getProperties)
.setTableName(newName.table)
.initTable(hadoopConf, path)
// Call AlterTableRenameCommand#run to rename table in meta.
super.run(sparkSession)
}
Seq.empty[Row]
}
}
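
And a hypothetical sketch for the rename (names made up). The override first rewrites the table name in hoodie.properties under the unchanged base path, then delegates to AlterTableRenameCommand for the metastore entry:

// Hypothetical usage: the hoodie table config is updated first, then the catalog.
spark.sql("ALTER TABLE h0 RENAME TO h0_renamed")
spark.sql("SELECT count(*) FROM h0_renamed").show()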