@@ -115,6 +115,8 @@ statement
(identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)? #analyze
| ALTER TABLE tableIdentifier
ADD COLUMNS '(' columns=colTypeList ')' #addTableColumns
+ | ALTER TABLE tableIdentifier
+     REPLACE COLUMNS '(' columns=colTypeList ')' #replaceTableColumns
Member:
I am fine to add this syntax, but this only supports changing column comments, right?

Member:
Dropping a column is still risky to us. When you add the columns back, we might see unexpected values.

Author:
@gatorsmile the idea is actually to implement ALTER TABLE REPLACE COLUMNS fully:

  • drop a column if it is not present in the new schema
  • add a column if it is not present in the table's schema
  • keep a column if its column_name and column_type match between the two schemas
  • raise an exception when trying to replace an existing column with a different data type (see the sketch below)
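For illustration, a sketch of those intended semantics (table and column names are hypothetical, not taken from this PR's tests):

spark.sql("CREATE TABLE t (a INT, b STRING, c DOUBLE) STORED AS PARQUET")
// keep `a` and `b` (names and types match), drop `c` (absent from the new
// column list), and add `d` (absent from the old schema)
spark.sql("ALTER TABLE t REPLACE COLUMNS (a INT, b STRING, d STRING)")
// raises AnalysisException: `a` cannot be replaced with a different data type
spark.sql("ALTER TABLE t REPLACE COLUMNS (a STRING, b STRING)")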

I believe this is a much-needed feature for managing metastores fully from Spark; however, I understand the complexity and risks of this operation, so let's take the time to ensure it's the right thing to do and to consider all possible ramifications.

I tried to make the commit history as clean and descriptive as possible, and I added some more details in the known-issues section of the PR description.

I've also added an extra validation step to prevent columns from being replaced when the data type doesn't match (8b6da23).

Let me know if anything isn't clear and I'll be happy to add more details.

Thanks!

Member:
Let me give a typical scenario. For an existing table, if we drop the column col1, its data is no longer visible to the end users. This is expected. However, if we add col1 back, the previous values of col1 should not become visible to the end users either. This is how the other database systems work. What is your thought?

Author:
I understand your point; it makes total sense, and I would completely agree if Hive behaved like the other DBMSs. Following your scenario, the example below should not be possible either, but it is:

>>> spark.sql("CREATE EXTERNAL TABLE test_table (c1 string, c2 int ) STORED AS PARQUET LOCATION '/tmp/test_table'")
DataFrame[]

>>> spark.sql("INSERT INTO test_table VALUES ('c1', 2)")
DataFrame[]

>>> spark.sql("SELECT * FROM test_table").show()
+---+---+
| c1| c2|
+---+---+
| c1|  2|
+---+---+

>>> spark.sql("DROP TABLE test_table")
DataFrame[]

>>> spark.catalog.listTables()
[]

>>> spark.sql("CREATE EXTERNAL TABLE test_table (c1 string, c2 int, c3 boolean) STORED AS PARQUET LOCATION '/tmp/test_table'")
DataFrame[]

>>> spark.sql("SELECT * FROM test_table").show()
+---+---+----+
| c1| c2|  c3|
+---+---+----+
| c1|  2|null|
+---+---+----+

Do you agree this is the same behaviour you described in the "drop / re-create column" example?

Author:
@gatorsmile any thoughts on my previous comment?

I was wondering whether this might just be a miscommunication: you are referring to the behaviour of ALTER TABLE REPLACE COLUMNS with "normal" (managed) tables, while I'm mainly talking about external tables.

Maybe REPLACE COLUMNS could be enabled only for external tables? Would that be acceptable / less risky in your opinion?


| ALTER (TABLE | VIEW) from=tableIdentifier
RENAME TO to=tableIdentifier #renameTable
| ALTER (TABLE | VIEW) tableIdentifier
@@ -232,7 +234,6 @@ unsupportedHiveNativeCommands
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT
- | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS
| kw1=START kw2=TRANSACTION
| kw1=COMMIT
| kw1=ROLLBACK
@@ -113,9 +113,9 @@ trait ExternalCatalog {
def alterTable(tableDefinition: CatalogTable): Unit

/**
- * Alter the data schema of a table identified by the provided database and table name. The new
- * data schema should not have conflict column names with the existing partition columns, and
- * should still contain all the existing data columns.
+ * Alter the data schema of a table identified by the provided database and table name.
+ * The new data schema should not have conflicting column names with existing partition columns.
+ * The existing data schema will be overwritten with the new data schema provided.
*
* @param db Database that table to alter schema for exists in
* @param table Name of table to alter schema for
@@ -359,9 +359,9 @@ class SessionCatalog(
}

/**
- * Alter the data schema of a table identified by the provided table identifier. The new data
- * schema should not have conflict column names with the existing partition columns, and should
- * still contain all the existing data columns.
+ * Alter the data schema of a table identified by the provided table identifier.
+ * The new data schema should not have conflicting column names with existing partition columns.
+ * The existing data schema will be overwritten with the new data schema provided.
*
* @param identifier TableIdentifier
* @param newDataSchema Updated data schema to be used for the table
@@ -375,19 +375,6 @@
requireDbExists(db)
requireTableExists(tableIdentifier)

- val catalogTable = externalCatalog.getTable(db, table)
- val oldDataSchema = catalogTable.dataSchema
- // not supporting dropping columns yet
- val nonExistentColumnNames =
-   oldDataSchema.map(_.name).filterNot(columnNameResolved(newDataSchema, _))
- if (nonExistentColumnNames.nonEmpty) {
-   throw new AnalysisException(
-     s"""
-       |Some existing schema fields (${nonExistentColumnNames.mkString("[", ",", "]")}) are
-       |not present in the new schema. We don't support dropping columns yet.
-     """.stripMargin)
- }

externalCatalog.alterTableDataSchema(db, table, newDataSchema)
}

@@ -88,4 +88,33 @@ private[spark] object SchemaUtils {
s"Found duplicate column(s) $colType: ${duplicateColumns.mkString(", ")}")
}
}

+  /**
+   * Checks whether the two provided schemas share any column names. For every column name
+   * that appears in both schemas, the data types must also match; otherwise an
+   * AnalysisException is raised.
+   *
+   * @param schemaOne first schema to compare
+   * @param schemaTwo second schema to compare
+   * @param resolver resolver used to determine if two identifiers are equal
+   */
+  def checkDataTypeMatchesForSameColumnName(
+      schemaOne: StructType, schemaTwo: StructType, resolver: Resolver): Unit = {
+    checkDataTypeMatchesForSameColumnName(schemaOne, schemaTwo, isCaseSensitiveAnalysis(resolver))
+  }
+
+  def checkDataTypeMatchesForSameColumnName(
+      schemaOne: StructType, schemaTwo: StructType, caseSensitiveAnalysis: Boolean): Unit = {
+    for (s1 <- schemaOne; s2 <- schemaTwo) {
+      // scalastyle:off caselocale
+      val schemaOneColumnName = if (caseSensitiveAnalysis) s1.name else s1.name.toLowerCase
+      val schemaTwoColumnName = if (caseSensitiveAnalysis) s2.name else s2.name.toLowerCase
+      // scalastyle:on caselocale
+
+      if (schemaOneColumnName == schemaTwoColumnName && s1.dataType != s2.dataType) {
+        throw new AnalysisException(
+          s"Column `$schemaOneColumnName` type doesn't match between schemas ($s1 <> $s2)")
+      }
+    }
+  }

}
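To illustrate the new helper (a quick sketch; the schemas here are hypothetical):

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.SchemaUtils

val oldSchema = StructType.fromDDL("a INT, b STRING")
val newSchema = StructType.fromDDL("a STRING, c INT")
// throws AnalysisException: `a` appears in both schemas with different data types
SchemaUtils.checkDataTypeMatchesForSameColumnName(oldSchema, newSchema, caseSensitiveAnalysis = false)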
@@ -479,11 +479,8 @@ abstract class SessionCatalogSuite extends AnalysisTest {
withBasicCatalog { sessionCatalog =>
sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
-      val e = intercept[AnalysisException] {
-        sessionCatalog.alterTableDataSchema(
-          TableIdentifier("t1", Some("default")), StructType(oldTab.dataSchema.drop(1)))
-      }.getMessage
-      assert(e.contains("We don't support dropping columns yet."))
+      sessionCatalog.alterTableDataSchema(
+        TableIdentifier("t1", Some("default")), StructType(oldTab.dataSchema.drop(1)))
}
}

@@ -82,4 +82,48 @@ class SchemaUtilsSuite extends SparkFunSuite {

checkNoExceptionCases("a INT, b INT, c INT", caseSensitive = false)
}

test(s"Test checkDataTypeMatchesForSameColumnName") {
def compareSchemas(schema1_str: String, schema2_str: String,
caseSensitive: Boolean, shouldRaiseException: Boolean): Unit = {
val schema1 = StructType.fromDDL(schema1_str)
val schema2 = StructType.fromDDL(schema2_str)

if (shouldRaiseException) {
val msg = intercept[AnalysisException] {
SchemaUtils.checkDataTypeMatchesForSameColumnName(schema1, schema2, caseSensitive)
}.getMessage
assert(msg.contains("type doesn't match between schemas"))
}
else SchemaUtils.checkDataTypeMatchesForSameColumnName(schema1, schema2, caseSensitive)

}
// pass when datatype is the same
compareSchemas("a int, b string", "a int, B string",
caseSensitive = false, shouldRaiseException = false)
compareSchemas("a int, b string, B int", "a int, b string, B int",
caseSensitive = true, shouldRaiseException = false)

// fail when there's at least one mismatch
compareSchemas("a int, b string", "a string, b string",
caseSensitive = false, shouldRaiseException = true)
compareSchemas("a int, b string", "a int, b string, B int",
caseSensitive = false, shouldRaiseException = true)

// work as expected when schemas structures differ
compareSchemas("a int, b string", "c string, D int, A int",
caseSensitive = true, shouldRaiseException = false)
compareSchemas("a int, b string", "b string",
caseSensitive = false, shouldRaiseException = false)
compareSchemas("a int, b string", "B string",
caseSensitive = false, shouldRaiseException = false)
compareSchemas("a int, b string", "a string",
caseSensitive = true, shouldRaiseException = true)
compareSchemas("a int, b string", "A string",
caseSensitive = false, shouldRaiseException = true)
compareSchemas("a int", "a int, A string",
caseSensitive = true, shouldRaiseException = false)
compareSchemas("b string", "b string, B int",
caseSensitive = false, shouldRaiseException = true)
}
}
@@ -677,6 +677,23 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
)
}

+  /**
+   * Create an [[AlterTableReplaceColumnsCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table1
+   *   REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...);
+   * }}}
+   */
+  override def visitReplaceTableColumns(
+      ctx: ReplaceTableColumnsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableReplaceColumnsCommand(
+      visitTableIdentifier(ctx.tableIdentifier),
+      visitColTypeList(ctx.columns)
+    )
+  }

/**
* Create an [[AlterTableSetPropertiesCommand]] command.
*
@@ -37,16 +37,9 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier}
import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
-import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
-import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2
-import org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2
-import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
-import org.apache.spark.util.Utils

/**
* A command to create a table with the same definition as the given existing table.
@@ -181,20 +174,106 @@ case class AlterTableRenameCommand(

}

+abstract class AlterTableAddReplaceColumnsCommandsBase extends RunnableCommand {
+  /**
+   * Ensure the columns to add/replace meet the requirements:
+   * - columns to add must not have names conflicting with existing columns
+   * - columns to replace must have distinct names
+   * - if a column to replace already exists in the table, its data type has to match
+   * - column names have to match the given datasource format's specifications
+   */
+  protected def verifyColumnsToAddReplace(
+      table: TableIdentifier,
+      catalogTable: CatalogTable,
+      colsToVerify: Seq[StructField]): Unit = {
+
+    SchemaUtils.checkColumnNameDuplication(
+      colsToVerify.map(_.name),
+      "in the table definition of " + table.identifier,
+      conf.caseSensitiveAnalysis)
+
+    SchemaUtils.checkDataTypeMatchesForSameColumnName(
+      StructType(colsToVerify),
+      catalogTable.dataSchema,
+      conf.caseSensitiveAnalysis)
+
+    DDLUtils.checkDataColNames(catalogTable, colsToVerify.map(_.name))
+  }
+
+  /**
+   * ALTER TABLE [ADD|REPLACE] COLUMNS commands do not support temporary views/tables, views,
+   * or external providers. For datasource tables, the currently supported formats are
+   * parquet, json, and orc; ADD COLUMNS also supports csv, and REPLACE COLUMNS also
+   * supports text.
+   */
+  protected def verifyAlterTableAddReplaceColumn(
+      conf: SQLConf,
+      catalog: SessionCatalog,
+      table: TableIdentifier): CatalogTable = {
+    val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
+
+    if (catalogTable.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"""
+           |ALTER TABLE [ADD|REPLACE] COLUMNS does not support views.
+           |You must drop and re-create the view to change its columns. View: $table
+         """.stripMargin)
+    }
+
+    if (DDLUtils.isDatasourceTable(catalogTable)) {
+      DataSource.lookupDataSource(catalogTable.provider.get, conf).
+        getConstructor().newInstance() match {
+        // Hive serde tables are matched as such earlier, so they never reach this branch.
+        case s if isDatasourceFormatSupported(s.getClass.getCanonicalName) =>
+        case s =>
+          throw new AnalysisException(
+            s"""
+               |ALTER TABLE [ADD|REPLACE] COLUMNS does not support datasource tables of
+               |type $s. You must drop and re-create the table to change its columns.
+               |Table: $table
+             """.stripMargin)
+      }
+    }
+    catalogTable
+  }
+
+  /**
+   * Class-name suffixes of the datasource formats supported by the concrete command.
+   */
+  val supportedDatasourceFormats: Seq[String]
+
+  /**
+   * Checks whether the provided datasource class canonical name ends with one of the
+   * formats listed in `supportedDatasourceFormats`.
+   */
+  protected def isDatasourceFormatSupported(
+      datasourceClassCanonicalName: String): Boolean = {
+    supportedDatasourceFormats.exists(format => datasourceClassCanonicalName.endsWith(format))
+  }
+}

/**
 * A command that adds columns to a table.
 * The syntax of using this command in SQL is:
 * {{{
 *   ALTER TABLE table_identifier
 *   ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
 * }}}
 */
case class AlterTableAddColumnsCommand(
    table: TableIdentifier,
-    colsToAdd: Seq[StructField]) extends RunnableCommand {
+    colsToAdd: Seq[StructField]) extends AlterTableAddReplaceColumnsCommandsBase {

+  // The text format doesn't need ADD COLUMNS, as a text datasource can only have one column.
+  override val supportedDatasourceFormats = Seq("ParquetFileFormat", "OrcFileFormat",
+    "OrcDataSourceV2", "JsonFileFormat", "JsonDataSourceV2", "CSVFileFormat", "CSVDataSourceV2")

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
-    val catalogTable = verifyAlterTableAddColumn(sparkSession.sessionState.conf, catalog, table)
+    val catalogTable = verifyAlterTableAddReplaceColumn(
+      sparkSession.sessionState.conf, catalog, table)

try {
sparkSession.catalog.uncacheTable(table.quotedString)
@@ -204,54 +283,47 @@ case class AlterTableAddColumnsCommand(
}
catalog.refreshTable(table)

-    SchemaUtils.checkColumnNameDuplication(
-      (colsToAdd ++ catalogTable.schema).map(_.name),
-      "in the table definition of " + table.identifier,
-      conf.caseSensitiveAnalysis)
-    DDLUtils.checkDataColNames(catalogTable, colsToAdd.map(_.name))
+    verifyColumnsToAddReplace(table, catalogTable, colsToAdd ++ catalogTable.schema)

catalog.alterTableDataSchema(table, StructType(catalogTable.dataSchema ++ colsToAdd))
Seq.empty[Row]
}
}

-  /**
-   * ALTER TABLE ADD COLUMNS command does not support temporary view/table,
-   * view, or datasource table with text, orc formats or external provider.
-   * For datasource table, it currently only supports parquet, json, csv, orc.
-   */
-  private def verifyAlterTableAddColumn(
-      conf: SQLConf,
-      catalog: SessionCatalog,
-      table: TableIdentifier): CatalogTable = {
-    val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
+/**
+ * A command that replaces the columns of a table.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   ALTER TABLE table_identifier
+ *   REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...);
+ * }}}
+ */
+case class AlterTableReplaceColumnsCommand(
+    table: TableIdentifier,
+    colsToReplace: Seq[StructField]) extends AlterTableAddReplaceColumnsCommandsBase {

-    if (catalogTable.tableType == CatalogTableType.VIEW) {
-      throw new AnalysisException(
-        s"""
-          |ALTER ADD COLUMNS does not support views.
-          |You must drop and re-create the views for adding the new columns. Views: $table
-        """.stripMargin)
-    }
+  // The csv format is not supported by REPLACE COLUMNS: csv data is read positionally,
+  // so a replacement column would pick up the data of the column it replaced.
+  override val supportedDatasourceFormats = Seq("ParquetFileFormat", "OrcFileFormat",
+    "OrcDataSourceV2", "JsonFileFormat", "JsonDataSourceV2", "TextFileFormat", "TextDataSourceV2")

-    if (DDLUtils.isDatasourceTable(catalogTable)) {
-      DataSource.lookupDataSource(catalogTable.provider.get, conf).
-        getConstructor().newInstance() match {
-        // For datasource table, this command can only support the following File format.
-        // TextFileFormat only default to one column "value"
-        // Hive type is already considered as hive serde table, so the logic will not
-        // come in here.
-        case _: CSVFileFormat | _: JsonFileFormat | _: ParquetFileFormat =>
-        case _: JsonDataSourceV2 | _: CSVDataSourceV2 | _: OrcDataSourceV2 =>
-        case s if s.getClass.getCanonicalName.endsWith("OrcFileFormat") =>
-        case s =>
-          throw new AnalysisException(
-            s"""
-              |ALTER ADD COLUMNS does not support datasource table with type $s.
-              |You must drop and re-create the table for adding the new columns. Tables: $table
-            """.stripMargin)
-      }
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val catalogTable = verifyAlterTableAddReplaceColumn(
+      sparkSession.sessionState.conf, catalog, table)

+    try {
+      sparkSession.catalog.uncacheTable(table.quotedString)
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e)
+    }
-    catalogTable
+    catalog.refreshTable(table)
+
+    verifyColumnsToAddReplace(table, catalogTable, colsToReplace)
+
+    catalog.alterTableDataSchema(table, StructType(colsToReplace))
+    Seq.empty[Row]
}
}
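As a usage sketch of the new command against an external table (the path and names are hypothetical, not from the PR's tests):

spark.sql("CREATE EXTERNAL TABLE logs (id INT, msg STRING) STORED AS PARQUET LOCATION '/tmp/logs'")
spark.sql("ALTER TABLE logs REPLACE COLUMNS (id INT, level STRING)")
spark.table("logs").printSchema()
// root
//  |-- id: integer (nullable = true)
//  |-- level: string (nullable = true)
// Only the metastore schema changes: `msg` is dropped and `level` is added,
// while the underlying parquet files are left untouched.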
