From ec1f3ce6f05fe35ed096d6e2ccb95c8d396766e9 Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Tue, 5 Dec 2023 12:00:27 -0800 Subject: [PATCH 1/7] CTAS --- .../sql/errors/QueryCompilationErrors.scala | 4 +- .../datasources/v2/TableCapabilityCheck.scala | 2 +- .../sql/connector/DataSourceV2Suite.scala | 153 ++++++++++++++++++ 3 files changed, 156 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index cdab854c004b..8940382f52aa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -926,8 +926,8 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat unsupportedTableOperationError(table.name(), "either micro-batch or continuous scan") } - def unsupportedAppendInBatchModeError(table: Table): Throwable = { - unsupportedTableOperationError(table.name(), "append in batch mode") + def unsupportedAppendInBatchModeError(name: String): Throwable = { + unsupportedTableOperationError(name, "append in batch mode") } def unsupportedDynamicOverwriteInBatchModeError(table: Table): Throwable = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala index b1a93addc80b..e332c6b8014a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala @@ -47,7 +47,7 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) { // TODO: check STREAMING_WRITE capability. It's not doable now because we don't have a // a logical plan for streaming write. 
case AppendData(r: DataSourceV2Relation, _, _, _, _, _) if !supportsBatchWrite(r.table) => - throw QueryCompilationErrors.unsupportedAppendInBatchModeError(r.table) + throw QueryCompilationErrors.unsupportedAppendInBatchModeError(r.name) case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _, _) if !r.table.supports(BATCH_WRITE) || !r.table.supports(OVERWRITE_DYNAMIC) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 6e365e1d6059..107f413f1989 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -26,6 +26,7 @@ import test.org.apache.spark.sql.connector._ import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.connector.catalog.{PartitionInternalRow, SupportsRead, Table, TableCapability, TableProvider} import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, Literal, NamedReference, NullOrdering, SortDirection, SortOrder, Transform} @@ -725,6 +726,158 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS } } + test("SPARK-46272: create table as select") { + val cls = classOf[SupportsExternalMetadataDataSource] + withTable("test") { + sql( + s""" + |CREATE TABLE test USING ${cls.getName} + |AS VALUES (0, 1), (1, 2) + |""".stripMargin) + checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(1, 2))) + sql( + s""" + |CREATE OR REPLACE TABLE test USING ${cls.getName} + |AS VALUES (2, 3), (4, 5) + |""".stripMargin) + checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3), Row(4, 5))) + sql( + s""" + |CREATE TABLE IF NOT EXISTS test USING ${cls.getName} + |AS VALUES (3, 4), (4, 5) + |""".stripMargin) + checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3), Row(4, 5))) + } + } + + test("SPARK-46272: create table as select - error cases") { + val cls = classOf[SupportsExternalMetadataDataSource] + // CTAS with too many columns + withTable("test") { + checkError( + exception = intercept[AnalysisException] { + sql( + s""" + |CREATE TABLE test USING ${cls.getName} + |AS VALUES (0, 1, 2), (1, 2, 3) + |""".stripMargin) + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`test`", + "tableColumns" -> "`i`, `j`", + "dataColumns" -> "`col1`, `col2`, `col3`")) + } + // CTAS with not enough columns + withTable("test") { + checkError( + exception = intercept[AnalysisException] { + sql( + s""" + |CREATE TABLE test USING ${cls.getName} + |AS VALUES (0), (1) + |""".stripMargin) + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`test`", + "tableColumns" -> "`i`, `j`", + "dataColumns" -> "`col1`")) + } + // CTAS with type mismatch + withTable("test") { + checkError( + exception = intercept[AnalysisException] { + sql( + s""" + |CREATE TABLE test USING ${cls.getName} + |AS VALUES ('a', 'b'), ('c', 'd') + |""".stripMargin) + }, + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", + parameters = Map( + "tableName" -> 
"`spark_catalog`.`default`.`test`", + "colName" -> "`i`", "srcType" -> "\"STRING\"", "targetType" -> "\"INT\"")) + } + // CTAS with schema specified + withTable("test") { + val error = intercept[ParseException] { + sql( + s""" + |CREATE TABLE test(x INT, y INT) USING ${cls.getName} + |AS VALUES (1, 2), (3, 4) + |""".stripMargin + ) + } + assert(error.getMessage.contains( + "Operation not allowed: Schema may not be specified in a " + + "Create Table As Select (CTAS) statement")) + } + } + + test("SPARK-46272: create or replace table as select with path options") { + val cls = classOf[SupportsExternalMetadataDataSource] + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/test" + Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path) + withTable("test") { + sql( + s""" + |CREATE TABLE test USING ${cls.getName} + |OPTIONS (PATH '$path') + |AS VALUES (0, 1) + |""".stripMargin) + checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1), Row(1, 2))) + // Check the data currently in the path location. + checkAnswer( + spark.read.format("csv").load(path), + Seq(Row("0", "1"), Row("0", "1"), Row("1", "2"))) + // Replace the table with new data. + sql( + s""" + |CREATE OR REPLACE TABLE test USING ${cls.getName} + |OPTIONS (PATH '$path') + |AS VALUES (2, 3) + |""".stripMargin) + checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1), Row(1, 2), Row(2, 3))) + // Replace the table without the path options. + sql( + s""" + |CREATE OR REPLACE TABLE test USING ${cls.getName} + |AS VALUES (3, 4) + |""".stripMargin) + checkAnswer(sql("SELECT * FROM test"), Seq(Row(3, 4))) + } + } + } + + test("SPARK-46272: create table as select with incompatible data sources") { + // CTAS with data sources that do not support external metadata. + withTable("test") { + val cls = classOf[SimpleDataSourceV2] + checkError( + exception = intercept[SparkUnsupportedOperationException] { + sql(s"CREATE TABLE test USING ${cls.getName} AS VALUES (0, 1)") + }, + errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED", + parameters = Map( + "tableName" -> "`default`.`test`", + "provider" -> "org.apache.spark.sql.connector.SimpleDataSourceV2")) + } + // CTAS with data sources that do not support batch write. 
+ withTable("test") { + val cls = classOf[SchemaRequiredDataSource] + checkError( + exception = intercept[AnalysisException] { + sql(s"CREATE TABLE test USING ${cls.getName} AS SELECT * FROM VALUES (0, 1)") + }, + errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`test`", + "operation" -> "append in batch mode")) + } + } + test("SPARK-46273: insert into") { val cls = classOf[SupportsExternalMetadataDataSource] withTable("test") { From 1568aec96cb3e1e8ee71d3084021b5bfaef3be10 Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Mon, 11 Dec 2023 16:03:19 +0800 Subject: [PATCH 2/7] address comments --- .../sql/connector/DataSourceV2Suite.scala | 34 ++++++------------- 1 file changed, 10 insertions(+), 24 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 107f413f1989..eb6811f2137d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -26,7 +26,6 @@ import test.org.apache.spark.sql.connector._ import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.connector.catalog.{PartitionInternalRow, SupportsRead, Table, TableCapability, TableProvider} import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, Literal, NamedReference, NullOrdering, SortDirection, SortOrder, Transform} @@ -750,17 +749,12 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS } } - test("SPARK-46272: create table as select - error cases") { + test("SPARK-46272: create table as select - too many columns") { val cls = classOf[SupportsExternalMetadataDataSource] - // CTAS with too many columns withTable("test") { checkError( exception = intercept[AnalysisException] { - sql( - s""" - |CREATE TABLE test USING ${cls.getName} - |AS VALUES (0, 1, 2), (1, 2, 3) - |""".stripMargin) + sql(s"CREATE TABLE test USING ${cls.getName} AS VALUES (0, 1, 2), (1, 2, 3)") }, errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", parameters = Map( @@ -768,7 +762,10 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS "tableColumns" -> "`i`, `j`", "dataColumns" -> "`col1`, `col2`, `col3`")) } - // CTAS with not enough columns + } + + test("SPARK-46272: create table as select - not enough columns") { + val cls = classOf[SupportsExternalMetadataDataSource] withTable("test") { checkError( exception = intercept[AnalysisException] { @@ -784,7 +781,10 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS "tableColumns" -> "`i`, `j`", "dataColumns" -> "`col1`")) } - // CTAS with type mismatch + } + + test("SPARK-46272: create table as select - column type mismatch") { + val cls = classOf[SupportsExternalMetadataDataSource] withTable("test") { checkError( exception = intercept[AnalysisException] { @@ -799,20 +799,6 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS "tableName" -> "`spark_catalog`.`default`.`test`", "colName" -> "`i`", "srcType" -> "\"STRING\"", "targetType" -> "\"INT\"")) } - // CTAS 
with schema specified
-    withTable("test") {
-      val error = intercept[ParseException] {
-        sql(
-          s"""
-             |CREATE TABLE test(x INT, y INT) USING ${cls.getName}
-             |AS VALUES (1, 2), (3, 4)
-             |""".stripMargin
-        )
-      }
-      assert(error.getMessage.contains(
-        "Operation not allowed: Schema may not be specified in a " +
-        "Create Table As Select (CTAS) statement"))
-    }
   }
 
   test("SPARK-46272: create or replace table as select with path options") {

From 05d16472737bfd354d10554660c88fc994bc809b Mon Sep 17 00:00:00 2001
From: allisonwang-db
Date: Tue, 12 Dec 2023 15:50:14 +0800
Subject: [PATCH 3/7] address comments

---
 .../main/resources/error/error-classes.json |  5 ++
 docs/sql-error-conditions.md                |  6 ++
 .../sql/errors/QueryCompilationErrors.scala |  9 +++
 .../v2/WriteToDataSourceV2Exec.scala        |  8 +-
 .../sql/connector/DataSourceV2Suite.scala   | 77 ++++++++-----------
 5 files changed, 60 insertions(+), 45 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 30aacc07d318..c3659219f417 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -893,6 +893,11 @@
     ],
     "sqlState" : "42K02"
   },
+  "DATA_SOURCE_TABLE_SCHEMA_MISMATCH" : {
+    "message" : [
+      "The schema of the data source table <tableSchema> does not match the actual schema <actualSchema>. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema."
+    ]
+  },
   "DATETIME_OVERFLOW" : {
     "message" : [
       "Datetime operation overflow: <operation>."
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index e4b04ce02fe2..b027f9b93c56 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -492,6 +492,12 @@ Data source '`<provider>`' not found. Please make sure the data source is regist
 
 Failed to find the data source: `<provider>`. Please find packages at `https://spark.apache.org/third-party-projects.html`.
 
+### DATA_SOURCE_TABLE_SCHEMA_MISMATCH
+
+SQLSTATE: none assigned
+
+The schema of the data source table `<tableSchema>` does not match the actual schema `<actualSchema>`. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema.
+ ### DATETIME_OVERFLOW [SQLSTATE: 22008](sql-error-conditions-sqlstates.html#class-22-data-exception) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 8940382f52aa..3fd1fe04aed6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3924,4 +3924,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat errorClass = "NESTED_EXECUTE_IMMEDIATE", messageParameters = Map("sqlString" -> toSQLStmt(queryString))) } + + def dataSourceTableSchemaMismatchError( + tableSchema: StructType, actualSchema: StructType): Throwable = { + new AnalysisException( + errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH", + messageParameters = Map( + "tableSchema" -> toSQLType(tableSchema), + "actualSchema" -> toSQLType(actualSchema))) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 2527f201f3a8..ad0ceed8452a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterF import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.{LongAccumulator, Utils} import org.apache.spark.util.ArrayImplicits._ @@ -76,6 +76,7 @@ case class CreateTableAsSelectExec( val properties = CatalogV2Util.convertTableProperties(tableSpec) override protected def run(): Seq[InternalRow] = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ if (catalog.tableExists(ident)) { if (ifNotExists) { return Nil @@ -85,6 +86,11 @@ case class CreateTableAsSelectExec( val table = catalog.createTable( ident, getV2Columns(query.schema, catalog.useNullableQuerySchema), partitioning.toArray, properties.asJava) + // Check if the table schema matches the schema of the query. 
+ if (!DataType.equalsIgnoreNullability(table.columns().asSchema, query.schema)) { + throw QueryCompilationErrors.dataSourceTableSchemaMismatchError( + table.columns().asSchema, query.schema) + } writeToTable(catalog, table, writeOptions, ident, query) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index eb6811f2137d..dc576bab2bf6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -698,7 +698,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS } test("SPARK-46043: create table in SQL with path option") { - val cls = classOf[SupportsExternalMetadataDataSource] + val cls = classOf[WritableDataSourceSupportsExternalMetadata] withTempDir { dir => val path = s"${dir.getCanonicalPath}/test" Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path) @@ -726,83 +726,63 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS } test("SPARK-46272: create table as select") { - val cls = classOf[SupportsExternalMetadataDataSource] + val cls = classOf[WritableDataSourceSupportsExternalMetadata] withTable("test") { sql( s""" |CREATE TABLE test USING ${cls.getName} - |AS VALUES (0, 1), (1, 2) + |AS VALUES (0, 1), (1, 2) t(i, j) |""".stripMargin) - checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(1, 2))) + checkAnswer(sql("SELECT * FROM test"), Seq((0, 1), (1, 2)).toDF("i", "j")) sql( s""" |CREATE OR REPLACE TABLE test USING ${cls.getName} - |AS VALUES (2, 3), (4, 5) + |AS VALUES (2, 3), (4, 5) t(i, j) |""".stripMargin) - checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3), Row(4, 5))) + checkAnswer(sql("SELECT * FROM test"), Seq((2, 3), (4, 5)).toDF("i", "j")) sql( s""" |CREATE TABLE IF NOT EXISTS test USING ${cls.getName} |AS VALUES (3, 4), (4, 5) |""".stripMargin) - checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3), Row(4, 5))) + checkAnswer(sql("SELECT * FROM test"), Seq((2, 3), (4, 5)).toDF("i", "j")) } } - test("SPARK-46272: create table as select - too many columns") { - val cls = classOf[SupportsExternalMetadataDataSource] + test("SPARK-46272: create table as select - schema name mismatch") { + val cls = classOf[WritableDataSourceSupportsExternalMetadata] withTable("test") { checkError( exception = intercept[AnalysisException] { - sql(s"CREATE TABLE test USING ${cls.getName} AS VALUES (0, 1, 2), (1, 2, 3)") + sql(s"CREATE TABLE test USING ${cls.getName} AS VALUES (0, 1), (1, 2)") }, - errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", - parameters = Map( - "tableName" -> "`spark_catalog`.`default`.`test`", - "tableColumns" -> "`i`, `j`", - "dataColumns" -> "`col1`, `col2`, `col3`")) - } - } - - test("SPARK-46272: create table as select - not enough columns") { - val cls = classOf[SupportsExternalMetadataDataSource] - withTable("test") { - checkError( - exception = intercept[AnalysisException] { - sql( - s""" - |CREATE TABLE test USING ${cls.getName} - |AS VALUES (0), (1) - |""".stripMargin) - }, - errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH", parameters = Map( - "tableName" -> "`spark_catalog`.`default`.`test`", - "tableColumns" -> "`i`, `j`", - "dataColumns" -> "`col1`")) + "tableSchema" -> "\"STRUCT\"", + "actualSchema" -> "\"STRUCT\"")) } } test("SPARK-46272: create table 
as select - column type mismatch") { - val cls = classOf[SupportsExternalMetadataDataSource] + val cls = classOf[WritableDataSourceSupportsExternalMetadata] withTable("test") { checkError( exception = intercept[AnalysisException] { sql( s""" |CREATE TABLE test USING ${cls.getName} - |AS VALUES ('a', 'b'), ('c', 'd') + |AS VALUES ('a', 'b'), ('c', 'd') t(i, j) |""".stripMargin) }, - errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", + errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH", parameters = Map( - "tableName" -> "`spark_catalog`.`default`.`test`", - "colName" -> "`i`", "srcType" -> "\"STRING\"", "targetType" -> "\"INT\"")) + "tableSchema" -> "\"STRUCT\"", + "actualSchema" -> "\"STRUCT\"")) } } test("SPARK-46272: create or replace table as select with path options") { - val cls = classOf[SupportsExternalMetadataDataSource] + val cls = classOf[CustomSchemaAndPartitioningDataSource] withTempDir { dir => val path = s"${dir.getCanonicalPath}/test" Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path) @@ -865,7 +845,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS } test("SPARK-46273: insert into") { - val cls = classOf[SupportsExternalMetadataDataSource] + val cls = classOf[CustomSchemaAndPartitioningDataSource] withTable("test") { sql( s""" @@ -905,7 +885,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS } test("SPARK-46273: insert overwrite") { - val cls = classOf[SupportsExternalMetadataDataSource] + val cls = classOf[CustomSchemaAndPartitioningDataSource] withTable("test") { sql( s""" @@ -927,7 +907,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS } test("SPARK-46273: insert into with partition") { - val cls = classOf[SupportsExternalMetadataDataSource] + val cls = classOf[CustomSchemaAndPartitioningDataSource] withTable("test") { sql(s"CREATE TABLE test(x INT, y INT) USING ${cls.getName} PARTITIONED BY (x, y)") sql("INSERT INTO test PARTITION(x = 1) VALUES (2)") @@ -957,7 +937,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS } test("SPARK-46273: insert overwrite with partition") { - val cls = classOf[SupportsExternalMetadataDataSource] + val cls = classOf[CustomSchemaAndPartitioningDataSource] withTable("test") { sql(s"CREATE TABLE test (x INT, y INT) USING ${cls.getName} PARTITIONED BY (x, y)") sql("INSERT INTO test PARTITION(x = 1) VALUES (2)") @@ -1473,9 +1453,18 @@ class SimpleWriteOnlyDataSource extends SimpleWritableDataSource { } } -class SupportsExternalMetadataDataSource extends SimpleWritableDataSource { +/** + * A writable data source that supports external metadata with a fixed schema (i int, j int). + */ +class WritableDataSourceSupportsExternalMetadata extends SimpleWritableDataSource { override def supportsExternalMetadata(): Boolean = true +} +/** + * A writable data source that supports external metadata with + * user-specified schema and partitioning. 
+ */
+class CustomSchemaAndPartitioningDataSource extends WritableDataSourceSupportsExternalMetadata {
   class TestTable(
       schema: StructType,
       partitioning: Array[Transform],

From 5f45a136737b383f50db62d6d19fa5575821a2ad Mon Sep 17 00:00:00 2001
From: allisonwang-db
Date: Wed, 13 Dec 2023 15:39:07 +0800
Subject: [PATCH 4/7] address comments

---
 .../src/main/resources/error/error-classes.json |  3 ++-
 docs/sql-error-conditions.md                    |  2 +-
 .../datasources/v2/V2SessionCatalog.scala       | 17 +++++++++++++++--
 .../v2/WriteToDataSourceV2Exec.scala            |  8 +-------
 .../spark/sql/connector/DataSourceV2Suite.scala | 14 ++++++++++++++
 5 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index c3659219f417..930042505379 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -896,7 +896,8 @@
   "DATA_SOURCE_TABLE_SCHEMA_MISMATCH" : {
     "message" : [
       "The schema of the data source table <tableSchema> does not match the actual schema <actualSchema>. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema."
-    ]
+    ],
+    "sqlState" : "42K03"
   },
   "DATETIME_OVERFLOW" : {
     "message" : [
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index b027f9b93c56..94c7c167e392 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -494,7 +494,7 @@ Failed to find the data source: `<provider>`. Please find packages at `https://s
 
 ### DATA_SOURCE_TABLE_SCHEMA_MISMATCH
 
-SQLSTATE: none assigned
+[SQLSTATE: 42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
 
 The schema of the data source table `<tableSchema>` does not match the actual schema `<actualSchema>`. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 5b1ff7c67b26..4b6eb484f0ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -27,6 +27,7 @@ import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog} +import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.catalyst.util.TypeUtils._ import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table} import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty @@ -36,7 +37,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.connector.V1Function -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.ArrayImplicits._ @@ -232,7 +233,19 @@ class V2SessionCatalog(catalog: SessionCatalog) throw QueryCompilationErrors.tableAlreadyExistsError(ident) } - loadTable(ident) + val table = loadTable(ident) + + // Check if the schema of the created table matches the given schema. 
+ if (schema.nonEmpty) { + val tableSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema( + table.columns().asSchema) + if (!DataType.equalsIgnoreNullability(tableSchema, schema)) { + throw QueryCompilationErrors.dataSourceTableSchemaMismatchError( + table.columns().asSchema, schema) + } + } + + table } private def toOptions(properties: Map[String, String]): Map[String, String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index ad0ceed8452a..2527f201f3a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterF import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics} -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.util.{LongAccumulator, Utils} import org.apache.spark.util.ArrayImplicits._ @@ -76,7 +76,6 @@ case class CreateTableAsSelectExec( val properties = CatalogV2Util.convertTableProperties(tableSpec) override protected def run(): Seq[InternalRow] = { - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ if (catalog.tableExists(ident)) { if (ifNotExists) { return Nil @@ -86,11 +85,6 @@ case class CreateTableAsSelectExec( val table = catalog.createTable( ident, getV2Columns(query.schema, catalog.useNullableQuerySchema), partitioning.toArray, properties.asJava) - // Check if the table schema matches the schema of the query. 
-    if (!DataType.equalsIgnoreNullability(table.columns().asSchema, query.schema)) {
-      throw QueryCompilationErrors.dataSourceTableSchemaMismatchError(
-        table.columns().asSchema, query.schema)
-    }
     writeToTable(catalog, table, writeOptions, ident, query)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index dc576bab2bf6..3f3dc82da5ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -725,6 +725,20 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
     }
   }
 
+  test("SPARK-46272: create table - schema mismatch") {
+    withTable("test") {
+      val cls = classOf[WritableDataSourceSupportsExternalMetadata]
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"CREATE TABLE test (x INT, y INT) USING ${cls.getName}")
+        },
+        errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
+        parameters = Map(
+          "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+          "actualSchema" -> "\"STRUCT<x: INT, y: INT>\""))
+    }
+  }
+
   test("SPARK-46272: create table as select") {
     val cls = classOf[WritableDataSourceSupportsExternalMetadata]
     withTable("test") {

From 725ecdac2af5c6a124627f56106eb20040d8181e Mon Sep 17 00:00:00 2001
From: allisonwang-db
Date: Wed, 13 Dec 2023 21:15:34 +0800
Subject: [PATCH 5/7] fix tests

---
 ...SourceV2DataFrameSessionCatalogSuite.scala |  2 +-
 .../spark/sql/connector/FakeV2Provider.scala  | 22 +++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index 835566238c9c..5d5ea6499c49 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -153,7 +153,7 @@ private [connector] trait SessionCatalogTest[T <: Table, Catalog <: TestV2Sessio
     spark.sessionState.catalogManager.catalog(name)
   }
 
-  protected val v2Format: String = classOf[FakeV2Provider].getName
+  protected val v2Format: String = classOf[FakeV2ProviderWithCustomSchema].getName
 
   protected val catalogClassName: String = classOf[InMemoryTableSessionCatalog].getName
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
index 174700e8d24f..25d2d5a67d44 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
@@ -61,3 +61,25 @@ class FakeV2Provider extends TableProvider {
 object FakeV2Provider {
   val schema: StructType = new StructType().add("i", "int").add("j", "int")
 }
+
+class FakeV2ProviderWithCustomSchema extends FakeV2Provider {
+  class FakeTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      options: CaseInsensitiveStringMap) extends SimpleBatchTable {
+    override def schema(): StructType = schema
+
+    override def partitioning(): Array[Transform] = partitioning
+
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+      new MyScanBuilder()
+    }
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: 
util.Map[String, String]): Table = { + new FakeTable(schema, partitioning, new CaseInsensitiveStringMap(properties)) + } +} From 564fbcff1639f2141aaf9fc0857407cdd4970800 Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Thu, 14 Dec 2023 16:01:17 +0800 Subject: [PATCH 6/7] fix tests --- .../spark/sql/execution/datasources/v2/V2SessionCatalog.scala | 2 ++ .../org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 2 +- .../spark/sql/connector/SupportsCatalogOptionsSuite.scala | 2 +- .../org/apache/spark/sql/connector/V1WriteFallbackSuite.scala | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 4b6eb484f0ec..a7694f5d829d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -236,6 +236,8 @@ class V2SessionCatalog(catalog: SessionCatalog) val table = loadTable(ident) // Check if the schema of the created table matches the given schema. + // TODO: move this check in loadTable to match the behavior with + // existing file data sources. if (schema.nonEmpty) { val tableSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema( table.columns().asSchema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 47e79e45b737..589283a29b85 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -54,7 +54,7 @@ abstract class DataSourceV2SQLSuite with DeleteFromTests with DatasourceV2SQLBase with StatsEstimationTestBase with AdaptiveSparkPlanHelper { - protected val v2Source = classOf[FakeV2Provider].getName + protected val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName override protected val v2Format = v2Source protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala index fd4f719417e4..b952270fc786 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala @@ -380,7 +380,7 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with } class CatalogSupportingInMemoryTableProvider - extends FakeV2Provider + extends FakeV2ProviderWithCustomSchema with SupportsCatalogOptions { override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala index 181dc0ea2074..ad31cf84eeb3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala @@ -249,7 +249,7 @@ private object InMemoryV1Provider { } class InMemoryV1Provider - extends FakeV2Provider + extends FakeV2ProviderWithCustomSchema with 
DataSourceRegister with CreatableRelationProvider { override def getTable(options: CaseInsensitiveStringMap): Table = { From 7ed9ad2673273d67df7b73cf913b111de25c9948 Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Fri, 15 Dec 2023 14:14:02 +0800 Subject: [PATCH 7/7] fix tests --- .../spark/sql/streaming/test/DataStreamTableAPISuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala index 7bf81fb98655..5a4f386f1d1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 -import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableSessionCatalog} +import org.apache.spark.sql.connector.{FakeV2Provider, FakeV2ProviderWithCustomSchema, InMemoryTableSessionCatalog} import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, V2TableWithV1Fallback} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.ScanBuilder @@ -204,7 +204,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter { } test("write: write to table with default session catalog") { - val v2Source = classOf[FakeV2Provider].getName + val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName spark.conf.set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[InMemoryTableSessionCatalog].getName)