@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.catalog

import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.common.config.DFSPropertiesConfiguration
import org.apache.hudi.common.model.HoodieTableType
@@ -30,7 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
import org.apache.spark.sql.hudi.HoodieOptionConfig
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.types.{StructField, StructType}

@@ -62,7 +63,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
* hoodie table's location.
* If creating a managed hoodie table, use `catalog.defaultTablePath`.
*/
val tableLocation: String = HoodieSqlCommonUtils.getTableLocation(table, spark)
val tableLocation: String = getTableLocation(table, spark)

/**
* A flag indicating whether the hoodie table exists.
@@ -114,17 +115,27 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten

/**
* The schema of table.
* Make StructField nullable.
* Make StructField nullable and fill the comments in.
*/
lazy val tableSchema: StructType = {
val resolver = spark.sessionState.conf.resolver
val originSchema = getTableSqlSchema(metaClient, includeMetadataFields = true).getOrElse(table.schema)
StructType(originSchema.map(_.copy(nullable = true)))
val fields = originSchema.fields.map { f =>
val nullableField: StructField = f.copy(nullable = true)
val catalogField = findColumnByName(table.schema, nullableField.name, resolver)
if (catalogField.isDefined) {
catalogField.get.getComment().map(nullableField.withComment).getOrElse(nullableField)
} else {
nullableField
}
}
StructType(fields)
}

/**
* The schema without hoodie meta fields
*/
lazy val tableSchemaWithoutMetaFields: StructType = HoodieSqlCommonUtils.removeMetaFields(tableSchema)
lazy val tableSchemaWithoutMetaFields: StructType = removeMetaFields(tableSchema)

/**
* The schema of data fields
@@ -136,7 +147,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
/**
* The schema of data fields not including hoodie meta fields
*/
lazy val dataSchemaWithoutMetaFields: StructType = HoodieSqlCommonUtils.removeMetaFields(dataSchema)
lazy val dataSchemaWithoutMetaFields: StructType = removeMetaFields(dataSchema)

/**
* The schema of partition fields
@@ -146,7 +157,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
/**
* All the partition paths
*/
def getAllPartitionPaths: Seq[String] = HoodieSqlCommonUtils.getAllPartitionPaths(spark, table)
def getPartitionPaths: Seq[String] = getAllPartitionPaths(spark, table)

/**
* Check if table is a partitioned table
@@ -171,10 +182,12 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
.setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
.initTable(hadoopConf, tableLocation)
} else {
val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
val schema = SchemaConverters.toAvroType(finalSchema, false, recordName, namespace)
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(properties)
.setTableName(table.identifier.table)
.setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
.setTableCreateSchema(schema.toString())
.setPartitionFields(table.partitionColumnNames.mkString(","))
.initTable(hadoopConf, tableLocation)
}
@@ -239,7 +252,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
originTableConfig: Map[String, String] = Map.empty): Map[String, String] = {
val extraConfig = mutable.Map.empty[String, String]
if (isTableExists) {
val allPartitionPaths = getAllPartitionPaths
val allPartitionPaths = getPartitionPaths
if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) {
extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
originTableConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
@@ -19,16 +19,18 @@ package org.apache.spark.sql.hudi

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -40,6 +42,7 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Locale, Properties}

import scala.collection.JavaConverters._
import scala.collection.immutable.Map

@@ -301,4 +304,12 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
true
}
}

// Find the original column in the schema by its name; return None if no field matches the
// name under the given resolver (callers can turn a None into an AnalysisException).
def findColumnByName(schema: StructType, name: String, resolver: Resolver): Option[StructField] = {
schema.fields.collectFirst {
case field if resolver(field.name, name) => field
}
}
}
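A minimal usage sketch of the new findColumnByName helper (illustrative only; the schema, the
looked-up name, and the resolver below are assumptions, not part of this change):

import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType)))
// A Resolver is just (String, String) => Boolean; here we assume the default
// case-insensitive comparison used when spark.sql.caseSensitive is false.
val resolver: Resolver = (a: String, b: String) => a.equalsIgnoreCase(b)

// Resolves "ID" to the "id" field; returns None when no field matches.
val column: Option[StructField] = HoodieSqlCommonUtils.findColumnByName(schema, "ID", resolver)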
@@ -26,6 +26,7 @@ import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.types.{StructField, StructType}

import scala.util.control.NonFatal
@@ -40,24 +41,27 @@ case class AlterHoodieTableChangeColumnCommand(
extends HoodieLeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val resolver = sparkSession.sessionState.conf.resolver
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)

val resolver = sparkSession.sessionState.conf.resolver
if (!resolver(columnName, newColumn.name)) {
throw new AnalysisException(s"Can not support change column name for hudi table currently.")
}
// Find the origin column from dataSchema by column name.
val originColumn = findColumnByName(hoodieCatalogTable.dataSchema, columnName, resolver).getOrElse(
throw new AnalysisException(s"Can't find column `$columnName` given table data columns " +
s"${hoodieCatalogTable.dataSchema.fieldNames.mkString("[`", "`, `", "`]")}")
)

// Get the new schema
val newTableSchema = StructType(
hoodieCatalogTable.tableSchema.fields.map { field =>
if (resolver(field.name, columnName)) {
if (field.name == originColumn.name) {
newColumn
} else {
field
}
})
val newDataSchema = StructType(
hoodieCatalogTable.dataSchema.fields.map { field =>
if (resolver(field.name, columnName)) {
if (field.name == columnName) {
newColumn
} else {
field
@@ -148,7 +148,7 @@ case class AlterHoodieTableDropPartitionCommand(
hoodieCatalogTable: HoodieCatalogTable,
normalizedSpecs: Seq[Map[String, String]]): String = {
val table = hoodieCatalogTable.table
val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths
val allPartitionPaths = hoodieCatalogTable.getPartitionPaths
val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
val partitionsToDrop = normalizedSpecs.map { spec =>
@@ -48,10 +48,10 @@ case class ShowHoodieTablePartitionsCommand(

if (partitionColumnNamesOpt.isPresent && partitionColumnNamesOpt.get.nonEmpty && schemaOpt.nonEmpty) {
if (specOpt.isEmpty) {
hoodieCatalogTable.getAllPartitionPaths.map(Row(_))
hoodieCatalogTable.getPartitionPaths.map(Row(_))
} else {
val spec = specOpt.get
hoodieCatalogTable.getAllPartitionPaths.filter { partitionPath =>
hoodieCatalogTable.getPartitionPaths.filter { partitionPath =>
val part = PartitioningUtils.parsePathFragment(partitionPath)
spec.forall { case (col, value) =>
PartitionPathEncodeUtils.escapePartitionValue(value) == part.getOrElse(col, null)
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi

import org.apache.hudi.common.table.HoodieTableMetaClient

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.{LongType, StructField, StructType}

@@ -44,7 +45,24 @@ class TestAlterTable extends TestHoodieSqlBase {
| preCombineField = 'ts'
| )
""".stripMargin)
// Alter table name.

// change column comment
spark.sql(s"alter table $tableName change column id id int comment 'primary id'")
var catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
assertResult("primary id") (
catalogTable.schema(catalogTable.schema.fieldIndex("id")).getComment().get
)
spark.sql(s"alter table $tableName change column name name string comment 'name column'")
spark.sessionState.catalog.refreshTable(new TableIdentifier(tableName))
catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
assertResult("primary id") (
catalogTable.schema(catalogTable.schema.fieldIndex("id")).getComment().get
)
assertResult("name column") (
catalogTable.schema(catalogTable.schema.fieldIndex("name")).getComment().get
)

// alter table name.
val newTableName = s"${tableName}_1"
spark.sql(s"alter table $tableName rename to $newTableName")
assertResult(false)(
@@ -53,24 +71,26 @@ class TestAlterTable extends TestHoodieSqlBase {
assertResult(true) (
spark.sessionState.catalog.tableExists(new TableIdentifier(newTableName))
)

val hadoopConf = spark.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath)
.setConf(hadoopConf).build()
assertResult(newTableName) (
metaClient.getTableConfig.getTableName
)
assertResult(newTableName) (metaClient.getTableConfig.getTableName)

// insert some data
spark.sql(s"insert into $newTableName values(1, 'a1', 10, 1000)")

// Add table column
// add column
spark.sql(s"alter table $newTableName add columns(ext0 string)")
val table = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(newTableName))
catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(newTableName))
assertResult(Seq("id", "name", "price", "ts", "ext0")) {
HoodieSqlCommonUtils.removeMetaFields(table.schema).fields.map(_.name)
HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name)
}
checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
Seq(1, "a1", 10.0, 1000, null)
)
// Alter table column type

// change column's data type
spark.sql(s"alter table $newTableName change column id id bigint")
Contributor

Currently, Hudi on Spark cannot support data type changes. Hudi uses Spark's ParquetFileFormat to read parquet files, but that reader hardly supports type changes; see the original code of the Spark project, ParquetVectorUpdaterFactory.getUpdater.
This test is actually wrong: if you add spark.sql(s"select id from $newTableName").show(false) at line 95, this test will fail.

Contributor Author

I know that. See the details in https://issues.apache.org/jira/browse/HUDI-3237.
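
A minimal sketch of the check the reviewer suggests (an assumed addition for illustration, not
part of this PR): forcing a read of the altered column goes through Spark's Parquet reader and
would surface the int-to-bigint limitation described above (tracked in HUDI-3237).

// Hypothetical addition after the "change column id id bigint" statement:
// reading the column back exercises Spark's parquet reader, which is expected
// to fail on the int -> bigint type change.
spark.sql(s"select id from $newTableName").show(false)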

assertResult(StructType(Seq(StructField("id", LongType, nullable = true))))(
spark.sql(s"select id from $newTableName").schema)