diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
index 8bbfe5352956..534e1b86eca6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector.catalog;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.catalyst.analysis.*;
@@ -52,6 +53,11 @@ public String name() {
   @Override
   public final void initialize(String name, CaseInsensitiveStringMap options) {}
 
+  @Override
+  public Set<TableCatalogCapability> capabilities() {
+    return asTableCatalog().capabilities();
+  }
+
   @Override
   public String[] defaultNamespace() {
     return delegate.defaultNamespace();
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
index 84a2a0f76481..5ccb15ff1f0a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
@@ -33,16 +33,31 @@
 public enum TableCatalogCapability {
 
   /**
-   * Signals that the TableCatalog supports defining generated columns upon table creation in SQL.
-   * <p>
-   * Without this capability, any create/replace table statements with a generated column defined
-   * in the table schema will throw an exception during analysis.
-   * <p>
-   * A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS (expr)}
-   * <p>
-   * Generation expression are included in the column definition for APIs like
-   * {@link TableCatalog#createTable}.
-   * See {@link Column#generationExpression()}.
-   */
-  SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS
+   * Signals that the TableCatalog supports defining generated columns upon table creation in SQL.
+   * <p>
+   * Without this capability, any create/replace table statements with a generated column defined
+   * in the table schema will throw an exception during analysis.
+   * <p>
+   * A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS (expr)}
+   * <p>
+   * Generation expressions are included in the column definition for APIs like
+   * {@link TableCatalog#createTable}.
+   * See {@link Column#generationExpression()}.
+   */
+  SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS,
+
+  /**
+   * Signals that the TableCatalog supports defining a column default value as an expression in
+   * CREATE/REPLACE/ALTER TABLE.
+   * <p>
+   * Without this capability, any CREATE/REPLACE/ALTER TABLE statement with a column default value
+   * defined in the table schema will throw an exception during analysis.
+   * <p>
+   * A column default value is defined with syntax: {@code colName colType DEFAULT expr}
+   * <p>
+   * The column default value expression is included in the column definition for APIs like
+   * {@link TableCatalog#createTable}.
+   * See {@link Column#defaultValue()}.
+   */
+  SUPPORT_COLUMN_DEFAULT_VALUE
 }
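
For connector authors, opting a catalog in to the new capability is a one-method change. A minimal sketch (MyCatalog is hypothetical; the remaining TableCatalog methods are elided):

    import java.util

    import scala.collection.JavaConverters._

    import org.apache.spark.sql.connector.catalog.{TableCatalog, TableCatalogCapability}

    // Hypothetical connector catalog: declare both capabilities so that CREATE/REPLACE/ALTER
    // TABLE statements with DEFAULT or GENERATED ALWAYS AS columns pass analysis.
    abstract class MyCatalog extends TableCatalog {
      override def capabilities(): util.Set[TableCatalogCapability] = {
        Set(
          TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE,
          TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS
        ).asJava
      }
    }

Catalogs built on DelegatingCatalogExtension need no change of their own: per the first hunk above, they now inherit capabilities() from the delegated session catalog.
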
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
index 6ff5df98d3cc..9a1ce5b0295f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
-import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog, TableCatalogCapability}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, TableCatalog, TableCatalogCapability}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
@@ -182,12 +182,13 @@ object GeneratedColumn {
   def validateGeneratedColumns(
       schema: StructType,
       catalog: TableCatalog,
-      ident: Seq[String],
+      ident: Identifier,
       statementType: String): Unit = {
     if (hasGeneratedColumns(schema)) {
       if (!catalog.capabilities().contains(
         TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS)) {
-        throw QueryCompilationErrors.generatedColumnsUnsupported(ident)
+        throw QueryCompilationErrors.unsupportedTableOperationError(
+          catalog, ident, "generated columns")
       }
       GeneratedColumn.verifyGeneratedColumns(schema, statementType)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index be7d74b07823..d0287cc602b1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
-import org.apache.spark.sql.connector.catalog.{CatalogManager, FunctionCatalog, Identifier}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, FunctionCatalog, Identifier, TableCatalog, TableCatalogCapability}
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -90,39 +90,15 @@ object ResolveDefaultColumns {
    * EXISTS_DEFAULT metadata for such columns where the value is not present in storage.
    *
    * @param tableSchema represents the names and types of the columns of the statement to process.
-   * @param tableProvider provider of the target table to store default values for, if any.
    * @param statementType name of the statement being processed, such as INSERT; useful for errors.
-   * @param addNewColumnToExistingTable true if the statement being processed adds a new column to
-   *                                    a table that already exists.
    * @return a copy of `tableSchema` with field metadata updated with the constant-folded values.
    */
   def constantFoldCurrentDefaultsToExistDefaults(
       tableSchema: StructType,
-      tableProvider: Option[String],
-      statementType: String,
-      addNewColumnToExistingTable: Boolean): StructType = {
+      statementType: String): StructType = {
     if (SQLConf.get.enableDefaultColumns) {
-      val keywords: Array[String] =
-        SQLConf.get.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)
-          .toLowerCase().split(",").map(_.trim)
-      val allowedTableProviders: Array[String] =
-        keywords.map(_.stripSuffix("*"))
-      val addColumnExistingTableBannedProviders: Array[String] =
-        keywords.filter(_.endsWith("*")).map(_.stripSuffix("*"))
-      val givenTableProvider: String = tableProvider.getOrElse("").toLowerCase()
       val newFields: Seq[StructField] = tableSchema.fields.map { field =>
         if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
-          // Make sure that the target table has a provider that supports default column values.
-          if (!allowedTableProviders.contains(givenTableProvider)) {
-            throw QueryCompilationErrors
-              .defaultReferencesNotAllowedInDataSource(statementType, givenTableProvider)
-          }
-          if (addNewColumnToExistingTable &&
-            givenTableProvider.nonEmpty &&
-            addColumnExistingTableBannedProviders.contains(givenTableProvider)) {
-            throw QueryCompilationErrors
-              .addNewDefaultColumnToExistingTableNotAllowed(statementType, givenTableProvider)
-          }
           val analyzed: Expression = analyze(field, statementType)
           val newMetadata: Metadata = new MetadataBuilder().withMetadata(field.metadata)
             .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, analyzed.sql).build()
@@ -137,6 +113,47 @@ object ResolveDefaultColumns {
     }
   }
 
+  // Fails if the given catalog does not support column default value.
+  def validateCatalogForDefaultValue(
+      schema: StructType,
+      catalog: TableCatalog,
+      ident: Identifier): Unit = {
+    if (SQLConf.get.enableDefaultColumns &&
+      schema.exists(_.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) &&
+      !catalog.capabilities().contains(TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE)) {
+      throw QueryCompilationErrors.unsupportedTableOperationError(
+        catalog, ident, "column default value")
+    }
+  }
+
+  // Fails if the given table provider of the session catalog does not support column default value.
+  def validateTableProviderForDefaultValue(
+      schema: StructType,
+      tableProvider: Option[String],
+      statementType: String,
+      addNewColumnToExistingTable: Boolean): Unit = {
+    if (SQLConf.get.enableDefaultColumns &&
+      schema.exists(_.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY))) {
+      val keywords: Array[String] = SQLConf.get.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)
+        .toLowerCase().split(",").map(_.trim)
+      val allowedTableProviders: Array[String] = keywords.map(_.stripSuffix("*"))
+      val addColumnExistingTableBannedProviders: Array[String] =
+        keywords.filter(_.endsWith("*")).map(_.stripSuffix("*"))
+      val givenTableProvider: String = tableProvider.getOrElse("").toLowerCase()
+      // Make sure that the target table has a provider that supports default column values.
+      if (!allowedTableProviders.contains(givenTableProvider)) {
+        throw QueryCompilationErrors.defaultReferencesNotAllowedInDataSource(
+          statementType, givenTableProvider)
+      }
+      if (addNewColumnToExistingTable &&
+        givenTableProvider.nonEmpty &&
+        addColumnExistingTableBannedProviders.contains(givenTableProvider)) {
+        throw QueryCompilationErrors.addNewDefaultColumnToExistingTableNotAllowed(
+          statementType, givenTableProvider)
+      }
+    }
+  }
+
   /**
    * Parses and analyzes the DEFAULT column text in `field`, returning an error upon failure.
    *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 12a8db923635..e5d9720bb022 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -248,8 +248,9 @@ private[sql] object CatalogV2Util {
       val (before, after) = schema.fields.splitAt(fieldIndex + 1)
       StructType(before ++ (field +: after))
     }
-    constantFoldCurrentDefaultsToExistDefaults(
+    validateTableProviderForDefaultValue(
       newSchema, tableProvider, statementType, addNewColumnToExistingTable)
+    constantFoldCurrentDefaultsToExistDefaults(newSchema, statementType)
   }
 
   private def replace(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index e4475980cf93..497a7be1420c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -749,13 +749,28 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map.empty)
   }
 
-  def operationOnlySupportedWithV2TableError(
-      nameParts: Seq[String],
+  def unsupportedTableOperationError(
+      catalog: CatalogPlugin,
+      ident: Identifier,
+      operation: String): Throwable = {
+    unsupportedTableOperationError(
+      catalog.name +: ident.namespace :+ ident.name, operation)
+  }
+
+  def unsupportedTableOperationError(
+      ident: TableIdentifier,
+      operation: String): Throwable = {
+    unsupportedTableOperationError(
+      Seq(ident.catalog.get, ident.database.get, ident.table), operation)
+  }
+
+  private def unsupportedTableOperationError(
+      qualifiedTableName: Seq[String],
       operation: String): Throwable = {
     new AnalysisException(
       errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
       messageParameters = Map(
-        "tableName" -> toSQLId(nameParts),
+        "tableName" -> toSQLId(qualifiedTableName),
         "operation" -> operation))
   }
 
@@ -3405,16 +3420,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
     }
   }
 
-  def generatedColumnsUnsupported(nameParts: Seq[String]): AnalysisException = {
-    new AnalysisException(
-      errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
-      messageParameters = Map(
-        "tableName" -> toSQLId(nameParts),
-        "operation" -> "generated columns"
-      )
-    )
-  }
-
   def ambiguousLateralColumnAliasError(name: String, numOfMatches: Int): Throwable = {
     new AnalysisException(
       errorClass = "AMBIGUOUS_LATERAL_COLUMN_ALIAS",
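
The provider gating preserved by validateTableProviderForDefaultValue above is driven by SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS: a trailing "*" on a keyword means the provider accepts DEFAULT when creating a table, but not when adding a new column to an existing one. A standalone sketch of those semantics, for illustration only (providerAllowsDefaults is hypothetical):

    // Mirrors the keyword handling in validateTableProviderForDefaultValue.
    def providerAllowsDefaults(
        allowedProvidersConf: String, // e.g. "csv,json,orc,parquet*"
        provider: String,
        addNewColumnToExistingTable: Boolean): Boolean = {
      val keywords = allowedProvidersConf.toLowerCase.split(",").map(_.trim)
      // "parquet*" allows parquet overall but bans ALTER TABLE ADD COLUMNS with DEFAULT.
      val allowed = keywords.map(_.stripSuffix("*"))
      val bannedForAddColumn = keywords.filter(_.endsWith("*")).map(_.stripSuffix("*"))
      val p = provider.toLowerCase
      allowed.contains(p) &&
        !(addNewColumnToExistingTable && p.nonEmpty && bannedForAddColumn.contains(p))
    }
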
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index a7254865c1ed..9959dbf65165 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -177,7 +177,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
     // disabled.
     withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
       val result: StructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-        db1tbl3.schema, db1tbl3.provider, "CREATE TABLE", false)
+        db1tbl3.schema, "CREATE TABLE")
       val columnEWithFeatureDisabled: StructField = findField("e", result)
       // No constant-folding has taken place to the EXISTS_DEFAULT metadata.
       assert(!columnEWithFeatureDisabled.metadata.contains("EXISTS_DEFAULT"))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index e82f203742b1..8a744c1c1981 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -172,7 +172,10 @@ class BasicInMemoryTableCatalog extends TableCatalog {
 
 class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamespaces {
   override def capabilities: java.util.Set[TableCatalogCapability] = {
-    Set(TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS).asJava
+    Set(
+      TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE,
+      TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS
+    ).asJava
   }
 
   protected def allNamespaces: Seq[Seq[String]] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 7b2d5015840c..b2b35b404928 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -53,9 +53,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case AddColumns(ResolvedV1TableIdentifier(ident), cols) =>
       cols.foreach { c =>
         if (c.name.length > 1) {
-          throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-            Seq(ident.catalog.get, ident.database.get, ident.table),
-            "ADD COLUMN with qualified column")
+          throw QueryCompilationErrors.unsupportedTableOperationError(
+            ident, "ADD COLUMN with qualified column")
         }
         if (!c.nullable) {
           throw QueryCompilationErrors.addColumnWithV1TableCannotSpecifyNotNullError
@@ -64,24 +63,20 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       AlterTableAddColumnsCommand(ident, cols.map(convertToStructField))
 
     case ReplaceColumns(ResolvedV1TableIdentifier(ident), _) =>
-      throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-        Seq(ident.catalog.get, ident.database.get, ident.table),
-        "REPLACE COLUMNS")
+      throw QueryCompilationErrors.unsupportedTableOperationError(ident, "REPLACE COLUMNS")
 
     case a @ AlterColumn(ResolvedTable(catalog, ident, table: V1Table, _), _, _, _, _, _, _)
         if isSessionCatalog(catalog) =>
       if (a.column.name.length > 1) {
-        throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-          Seq(catalog.name, ident.namespace()(0), ident.name),
-          "ALTER COLUMN with qualified column")
+        throw QueryCompilationErrors.unsupportedTableOperationError(
+          catalog, ident, "ALTER COLUMN with qualified column")
       }
       if (a.nullable.isDefined) {
        throw QueryCompilationErrors.alterColumnWithV1TableCannotSpecifyNotNullError
       }
       if (a.position.isDefined) {
-        throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-          Seq(catalog.name, ident.namespace()(0), ident.name),
-          "ALTER COLUMN ... FIRST | ALTER")
+        throw QueryCompilationErrors.unsupportedTableOperationError(
+          catalog, ident, "ALTER COLUMN ... FIRST | ALTER")
       }
       val builder = new MetadataBuilder
       // Add comment to metadata
@@ -105,14 +100,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn)
 
     case RenameColumn(ResolvedV1TableIdentifier(ident), _, _) =>
-      throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-        Seq(ident.catalog.get, ident.database.get, ident.table),
-        "RENAME COLUMN")
+      throw QueryCompilationErrors.unsupportedTableOperationError(ident, "RENAME COLUMN")
 
     case DropColumns(ResolvedV1TableIdentifier(ident), _, _) =>
-      throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-        Seq(ident.catalog.get, ident.database.get, ident.table),
-        "DROP COLUMN")
+      throw QueryCompilationErrors.unsupportedTableOperationError(ident, "DROP COLUMN")
 
     case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
       AlterTableSetPropertiesCommand(ident, props, isView = false)
@@ -204,9 +195,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) =>
       val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
-        throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-          Seq(ident.catalog.get, ident.database.get, ident.table),
-          "REPLACE TABLE")
+        throw QueryCompilationErrors.unsupportedTableOperationError(
+          ident, "REPLACE TABLE")
       } else {
         c
       }
@@ -214,9 +204,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) =>
       val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
-        throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-          Seq(ident.catalog.get, ident.database.get, ident.table),
-          "REPLACE TABLE AS SELECT")
+        throw QueryCompilationErrors.unsupportedTableOperationError(
+          ident, "REPLACE TABLE AS SELECT")
       } else {
         c
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f6266bcb33f6..351f6d5456d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -289,8 +289,11 @@ case class AlterTableAddColumnsCommand(
       sparkSession: SparkSession, tableProvider: Option[String]): Seq[StructField] = {
     colsToAdd.map { col: StructField =>
       if (col.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+        val schema = StructType(Array(col))
+        ResolveDefaultColumns.validateTableProviderForDefaultValue(
+          schema, tableProvider, "ALTER TABLE ADD COLUMNS", true)
         val foldedStructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          StructType(Array(col)), tableProvider, "ALTER TABLE ADD COLUMNS", true)
+          schema, "ALTER TABLE ADD COLUMNS")
         foldedStructType.fields(0)
       } else {
         col
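
Every touched call site now follows the same two-step shape: an explicit validation call, then a constant-folding call that is free of side checks. Roughly, for the session-catalog path (a sketch; foldWithProviderCheck and its parameter values are illustrative):

    import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
    import org.apache.spark.sql.types.StructType

    // Illustrative caller: check first, then fold. Before this patch the provider check
    // was buried inside the folding pass itself.
    def foldWithProviderCheck(
        schema: StructType,
        provider: Option[String],
        statementType: String): StructType = {
      ResolveDefaultColumns.validateTableProviderForDefaultValue(
        schema, provider, statementType, addNewColumnToExistingTable = false)
      ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(schema, statementType)
    }
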
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 74539b54117f..e3a1f6f6b684 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -133,14 +133,15 @@ case class DataSourceAnalysis(analyzer: Analyzer) extends Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
+      ResolveDefaultColumns.validateTableProviderForDefaultValue(
+        tableDesc.schema, tableDesc.provider, "CREATE TABLE", false)
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          tableDesc.schema, tableDesc.provider, "CREATE TABLE", false)
+          tableDesc.schema, "CREATE TABLE")
       if (GeneratedColumn.hasGeneratedColumns(newSchema)) {
-        throw QueryCompilationErrors.generatedColumnsUnsupported(
-          Seq(tableDesc.identifier.catalog.get, tableDesc.identifier.database.get,
-            tableDesc.identifier.table))
+        throw QueryCompilationErrors.unsupportedTableOperationError(
+          tableDesc.identifier, "generated columns")
       }
       val newTableDesc = tableDesc.copy(schema = newSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 4d84c42bc5be..55dc8b54e005 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
@@ -175,11 +176,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
 
     case CreateTable(ResolvedIdentifier(catalog, ident), schema, partitioning,
         tableSpec, ifNotExists) =>
+      ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          schema, tableSpec.provider, "CREATE TABLE", false)
+          schema, "CREATE TABLE")
       GeneratedColumn.validateGeneratedColumns(
-        newSchema, catalog.asTableCatalog, ident.asMultipartIdentifier, "CREATE TABLE")
+        newSchema, catalog.asTableCatalog, ident, "CREATE TABLE")
       CreateTableExec(catalog.asTableCatalog, ident, structTypeToV2Columns(newSchema),
         partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
 
@@ -201,11 +203,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
 
     case ReplaceTable(ResolvedIdentifier(catalog, ident), schema, parts, tableSpec, orCreate) =>
+      ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          schema, tableSpec.provider, "CREATE TABLE", false)
+          schema, "CREATE TABLE")
       GeneratedColumn.validateGeneratedColumns(
-        newSchema, catalog.asTableCatalog, ident.asMultipartIdentifier, "CREATE TABLE")
+        newSchema, catalog.asTableCatalog, ident, "CREATE TABLE")
       val v2Columns = structTypeToV2Columns(newSchema)
       catalog match {
 
@@ -308,12 +311,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         }
         case LogicalRelation(_, _, catalogTable, _) =>
           val tableIdentifier = catalogTable.get.identifier
-          throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-            Seq(tableIdentifier.catalog.get, tableIdentifier.database.get, tableIdentifier.table),
+          throw QueryCompilationErrors.unsupportedTableOperationError(
+            tableIdentifier,
             "DELETE")
-        case _ =>
-          throw QueryCompilationErrors.operationOnlySupportedWithV2TableError(
-            Seq(), "DELETE")
+        case other =>
+          throw SparkException.internalError("Unexpected table relation: " + other)
       }
 
     case ReplaceData(_: DataSourceV2Relation, _, query, r: DataSourceV2Relation, Some(write)) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 461e948b0297..b4789c98df95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table}
 import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Transform
@@ -51,6 +51,12 @@ class V2SessionCatalog(catalog: SessionCatalog)
   // This class is instantiated by Spark, so `initialize` method will not be called.
   override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}
 
+  override def capabilities(): util.Set[TableCatalogCapability] = {
+    Set(
+      TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE
+    ).asJava
+  }
+
   override def listTables(namespace: Array[String]): Array[Identifier] = {
     namespace match {
       case Array(db) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index f24a85aa6daa..bb3843e3fee8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1422,6 +1422,38 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("SPARK-42684: Column default value only allowed with TableCatalogs that " +
+    "SUPPORT_COLUMN_DEFAULT_VALUE") {
+    val tblName = "my_tab"
+    val tableDefinition =
+      s"$tblName(c1 INT, c2 INT DEFAULT 0)"
+    for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) {
+      // InMemoryTableCatalog.capabilities() contains SUPPORT_COLUMN_DEFAULT_VALUE
+      withTable(s"testcat.$tblName") {
+        if (statement == "REPLACE TABLE") {
+          sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo")
+        }
+        // Can create a table with a column default value
+        sql(s"$statement testcat.$tableDefinition")
+        assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName)))
+      }
+      // BasicInMemoryTableCatalog.capabilities() = {}
+      withSQLConf("spark.sql.catalog.dummy" -> classOf[BasicInMemoryTableCatalog].getName) {
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("USE dummy")
+            sql(s"$statement dummy.$tableDefinition")
+          },
+          errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+          parameters = Map(
+            "tableName" -> "`dummy`.`my_tab`",
+            "operation" -> "column default value"
+          )
+        )
+      }
+    }
+  }
+
   test("SPARK-41290: Generated columns only allowed with TableCatalogs that " +
     "SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS") {
     val tblName = "my_tab"
@@ -1446,7 +1478,7 @@ class DataSourceV2SQLSuiteV1Filter
         },
         errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
         parameters = Map(
-          "tableName" -> "`my_tab`",
+          "tableName" -> "`dummy`.`my_tab`",
           "operation" -> "generated columns"
         )
       )
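
One behavioral detail surfaces in the last hunk: the consolidated unsupportedTableOperationError always builds a fully qualified name from catalog.name +: ident.namespace :+ ident.name, which is why the expected tableName parameter changes from `my_tab` to `dummy`.`my_tab`. A sketch of that qualification with values mirroring the test (assuming toSQLId backtick-quotes each name part and joins them with dots):

    // How the error's tableName parameter is assembled (illustrative values).
    val nameParts = "dummy" +: Array.empty[String] :+ "my_tab" // Array("dummy", "my_tab")
    val tableName = nameParts.map(p => s"`$p`").mkString(".")  // `dummy`.`my_tab`
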