diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 30aacc07d318..930042505379 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -893,6 +893,12 @@
     ],
     "sqlState" : "42K02"
   },
+  "DATA_SOURCE_TABLE_SCHEMA_MISMATCH" : {
+    "message" : [
+      "The schema of the data source table <tableSchema> does not match the actual schema <actualSchema>. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema."
+    ],
+    "sqlState" : "42K03"
+  },
   "DATETIME_OVERFLOW" : {
     "message" : [
       "Datetime operation overflow: <operation>."
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index e4b04ce02fe2..94c7c167e392 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -492,6 +492,12 @@ Data source '`<provider>`' not found. Please make sure the data source is regist
 
 Failed to find the data source: `<provider>`. Please find packages at `https://spark.apache.org/third-party-projects.html`.
 
+### DATA_SOURCE_TABLE_SCHEMA_MISMATCH
+
+[SQLSTATE: 42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+The schema of the data source table `<tableSchema>` does not match the actual schema `<actualSchema>`. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema.
+
 ### DATETIME_OVERFLOW
 
 [SQLSTATE: 22008](sql-error-conditions-sqlstates.html#class-22-data-exception)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index cdab854c004b..3fd1fe04aed6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -926,8 +926,8 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
     unsupportedTableOperationError(table.name(), "either micro-batch or continuous scan")
   }
 
-  def unsupportedAppendInBatchModeError(table: Table): Throwable = {
-    unsupportedTableOperationError(table.name(), "append in batch mode")
+  def unsupportedAppendInBatchModeError(name: String): Throwable = {
+    unsupportedTableOperationError(name, "append in batch mode")
   }
 
   def unsupportedDynamicOverwriteInBatchModeError(table: Table): Throwable = {
@@ -3924,4 +3924,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       errorClass = "NESTED_EXECUTE_IMMEDIATE",
       messageParameters = Map("sqlString" -> toSQLStmt(queryString)))
   }
+
+  def dataSourceTableSchemaMismatchError(
+      tableSchema: StructType, actualSchema: StructType): Throwable = {
+    new AnalysisException(
+      errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
+      messageParameters = Map(
+        "tableSchema" -> toSQLType(tableSchema),
+        "actualSchema" -> toSQLType(actualSchema)))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
index b1a93addc80b..e332c6b8014a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
@@ -47,7 +47,7 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
       // TODO: check STREAMING_WRITE capability. It's not doable now because we don't have a
       // a logical plan for streaming write.
       case AppendData(r: DataSourceV2Relation, _, _, _, _, _) if !supportsBatchWrite(r.table) =>
-        throw QueryCompilationErrors.unsupportedAppendInBatchModeError(r.table)
+        throw QueryCompilationErrors.unsupportedAppendInBatchModeError(r.name)
 
       case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _, _)
           if !r.table.supports(BATCH_WRITE) || !r.table.supports(OVERWRITE_DYNAMIC) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 5b1ff7c67b26..a7694f5d829d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -27,6 +27,7 @@ import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog}
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.catalyst.util.TypeUtils._
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table}
 import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
@@ -36,7 +37,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.V1Function
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
 
@@ -232,7 +233,21 @@ class V2SessionCatalog(catalog: SessionCatalog)
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
 
-    loadTable(ident)
+    val table = loadTable(ident)
+
+    // Check if the schema of the created table matches the given schema.
+    // TODO: move this check in loadTable to match the behavior with
+    // existing file data sources.
+    if (schema.nonEmpty) {
+      val tableSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
+        table.columns().asSchema)
+      if (!DataType.equalsIgnoreNullability(tableSchema, schema)) {
+        throw QueryCompilationErrors.dataSourceTableSchemaMismatchError(
+          table.columns().asSchema, schema)
+      }
+    }
+
+    table
   }
 
   private def toOptions(properties: Map[String, String]): Map[String, String] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index 835566238c9c..5d5ea6499c49 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -153,7 +153,7 @@ private [connector] trait SessionCatalogTest[T <: Table, Catalog <: TestV2Sessio
     spark.sessionState.catalogManager.catalog(name)
   }
 
-  protected val v2Format: String = classOf[FakeV2Provider].getName
+  protected val v2Format: String = classOf[FakeV2ProviderWithCustomSchema].getName
 
   protected val catalogClassName: String = classOf[InMemoryTableSessionCatalog].getName
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 47e79e45b737..589283a29b85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -54,7 +54,7 @@ abstract class DataSourceV2SQLSuite
   with DeleteFromTests with DatasourceV2SQLBase with StatsEstimationTestBase
   with AdaptiveSparkPlanHelper {
 
-  protected val v2Source = classOf[FakeV2Provider].getName
+  protected val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName
   override protected val v2Format = v2Source
 
   protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index 6e365e1d6059..3f3dc82da5ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -698,7 +698,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
   }
 
   test("SPARK-46043: create table in SQL with path option") {
-    val cls = classOf[SupportsExternalMetadataDataSource]
+    val cls = classOf[WritableDataSourceSupportsExternalMetadata]
     withTempDir { dir =>
       val path = s"${dir.getCanonicalPath}/test"
       Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path)
@@ -725,8 +725,141 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
     }
   }
 
+  test("SPARK-46272: create table - schema mismatch") {
+    withTable("test") {
+      val cls = classOf[WritableDataSourceSupportsExternalMetadata]
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"CREATE TABLE test (x INT, y INT) USING ${cls.getName}")
+        },
+        errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
+        parameters = Map(
+          "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+          "actualSchema" -> "\"STRUCT<x: INT, y: INT>\""))
+    }
+  }
+
+  test("SPARK-46272: create table as select") {
+    val cls = classOf[WritableDataSourceSupportsExternalMetadata]
+    withTable("test") {
+      sql(
+        s"""
+           |CREATE TABLE test USING ${cls.getName}
+           |AS VALUES (0, 1), (1, 2) t(i, j)
+           |""".stripMargin)
+      checkAnswer(sql("SELECT * FROM test"), Seq((0, 1), (1, 2)).toDF("i", "j"))
+      sql(
+        s"""
+           |CREATE OR REPLACE TABLE test USING ${cls.getName}
+           |AS VALUES (2, 3), (4, 5) t(i, j)
+           |""".stripMargin)
+      checkAnswer(sql("SELECT * FROM test"), Seq((2, 3), (4, 5)).toDF("i", "j"))
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS test USING ${cls.getName}
+           |AS VALUES (3, 4), (4, 5)
+           |""".stripMargin)
+      checkAnswer(sql("SELECT * FROM test"), Seq((2, 3), (4, 5)).toDF("i", "j"))
+    }
+  }
+
+  test("SPARK-46272: create table as select - schema name mismatch") {
+    val cls = classOf[WritableDataSourceSupportsExternalMetadata]
+    withTable("test") {
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"CREATE TABLE test USING ${cls.getName} AS VALUES (0, 1), (1, 2)")
+        },
+        errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
+        parameters = Map(
+          "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+          "actualSchema" -> "\"STRUCT<col1: INT, col2: INT>\""))
+    }
+  }
+
+  test("SPARK-46272: create table as select - column type mismatch") {
+    val cls = classOf[WritableDataSourceSupportsExternalMetadata]
+    withTable("test") {
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(
+            s"""
+               |CREATE TABLE test USING ${cls.getName}
+               |AS VALUES ('a', 'b'), ('c', 'd') t(i, j)
+               |""".stripMargin)
+        },
+        errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
+        parameters = Map(
+          "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+          "actualSchema" -> "\"STRUCT<i: STRING, j: STRING>\""))
+    }
+  }
+
+  test("SPARK-46272: create or replace table as select with path options") {
+    val cls = classOf[CustomSchemaAndPartitioningDataSource]
+    withTempDir { dir =>
+      val path = s"${dir.getCanonicalPath}/test"
+      Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path)
+      withTable("test") {
+        sql(
+          s"""
+             |CREATE TABLE test USING ${cls.getName}
+             |OPTIONS (PATH '$path')
+             |AS VALUES (0, 1)
+             |""".stripMargin)
+        checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1), Row(1, 2)))
+        // Check the data currently in the path location.
+        checkAnswer(
+          spark.read.format("csv").load(path),
+          Seq(Row("0", "1"), Row("0", "1"), Row("1", "2")))
+        // Replace the table with new data.
+        sql(
+          s"""
+             |CREATE OR REPLACE TABLE test USING ${cls.getName}
+             |OPTIONS (PATH '$path')
+             |AS VALUES (2, 3)
+             |""".stripMargin)
+        checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1), Row(1, 2), Row(2, 3)))
+        // Replace the table without the path options.
+        sql(
+          s"""
+             |CREATE OR REPLACE TABLE test USING ${cls.getName}
+             |AS VALUES (3, 4)
+             |""".stripMargin)
+        checkAnswer(sql("SELECT * FROM test"), Seq(Row(3, 4)))
+      }
+    }
+  }
+
+  test("SPARK-46272: create table as select with incompatible data sources") {
+    // CTAS with data sources that do not support external metadata.
+    withTable("test") {
+      val cls = classOf[SimpleDataSourceV2]
+      checkError(
+        exception = intercept[SparkUnsupportedOperationException] {
+          sql(s"CREATE TABLE test USING ${cls.getName} AS VALUES (0, 1)")
+        },
+        errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+        parameters = Map(
+          "tableName" -> "`default`.`test`",
+          "provider" -> "org.apache.spark.sql.connector.SimpleDataSourceV2"))
+    }
+    // CTAS with data sources that do not support batch write.
+    withTable("test") {
+      val cls = classOf[SchemaRequiredDataSource]
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"CREATE TABLE test USING ${cls.getName} AS SELECT * FROM VALUES (0, 1)")
+        },
+        errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`test`",
+          "operation" -> "append in batch mode"))
+    }
+  }
+
   test("SPARK-46273: insert into") {
-    val cls = classOf[SupportsExternalMetadataDataSource]
+    val cls = classOf[CustomSchemaAndPartitioningDataSource]
     withTable("test") {
       sql(
         s"""
@@ -766,7 +899,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
   }
 
   test("SPARK-46273: insert overwrite") {
-    val cls = classOf[SupportsExternalMetadataDataSource]
+    val cls = classOf[CustomSchemaAndPartitioningDataSource]
     withTable("test") {
       sql(
         s"""
@@ -788,7 +921,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
   }
 
   test("SPARK-46273: insert into with partition") {
-    val cls = classOf[SupportsExternalMetadataDataSource]
+    val cls = classOf[CustomSchemaAndPartitioningDataSource]
     withTable("test") {
       sql(s"CREATE TABLE test(x INT, y INT) USING ${cls.getName} PARTITIONED BY (x, y)")
       sql("INSERT INTO test PARTITION(x = 1) VALUES (2)")
@@ -818,7 +951,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
   }
 
   test("SPARK-46273: insert overwrite with partition") {
-    val cls = classOf[SupportsExternalMetadataDataSource]
+    val cls = classOf[CustomSchemaAndPartitioningDataSource]
    withTable("test") {
       sql(s"CREATE TABLE test (x INT, y INT) USING ${cls.getName} PARTITIONED BY (x, y)")
       sql("INSERT INTO test PARTITION(x = 1) VALUES (2)")
@@ -1334,9 +1467,18 @@ class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
   }
 }
 
-class SupportsExternalMetadataDataSource extends SimpleWritableDataSource {
+/**
+ * A writable data source that supports external metadata with a fixed schema (i int, j int).
+ */
+class WritableDataSourceSupportsExternalMetadata extends SimpleWritableDataSource {
   override def supportsExternalMetadata(): Boolean = true
+}
+
+/**
+ * A writable data source that supports external metadata with
+ * user-specified schema and partitioning.
+ */
+class CustomSchemaAndPartitioningDataSource extends WritableDataSourceSupportsExternalMetadata {
   class TestTable(
       schema: StructType,
       partitioning: Array[Transform],
       options: CaseInsensitiveStringMap) extends SimpleBatchTable {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
index 174700e8d24f..25d2d5a67d44 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
@@ -61,3 +61,25 @@ class FakeV2Provider extends TableProvider {
 object FakeV2Provider {
   val schema: StructType = new StructType().add("i", "int").add("j", "int")
 }
+
+class FakeV2ProviderWithCustomSchema extends FakeV2Provider {
+  class FakeTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      options: CaseInsensitiveStringMap) extends SimpleBatchTable {
+    override def schema(): StructType = schema
+
+    override def partitioning(): Array[Transform] = partitioning
+
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+      new MyScanBuilder()
+    }
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    new FakeTable(schema, partitioning, new CaseInsensitiveStringMap(properties))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index fd4f719417e4..b952270fc786 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -380,7 +380,7 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
 }
 
 class CatalogSupportingInMemoryTableProvider
-  extends FakeV2Provider
+  extends FakeV2ProviderWithCustomSchema
   with SupportsCatalogOptions {
 
   override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
index 181dc0ea2074..ad31cf84eeb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
@@ -249,7 +249,7 @@ private object InMemoryV1Provider {
 }
 
 class InMemoryV1Provider
-  extends FakeV2Provider
+  extends FakeV2ProviderWithCustomSchema
   with DataSourceRegister
   with CreatableRelationProvider {
   override def getTable(options: CaseInsensitiveStringMap): Table = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 7bf81fb98655..5a4f386f1d1d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
-import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableSessionCatalog}
+import org.apache.spark.sql.connector.{FakeV2Provider, FakeV2ProviderWithCustomSchema, InMemoryTableSessionCatalog}
 import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, V2TableWithV1Fallback}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.ScanBuilder
@@ -204,7 +204,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
   }
 
   test("write: write to table with default session catalog") {
-    val v2Source = classOf[FakeV2Provider].getName
+    val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName
     spark.conf.set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key,
       classOf[InMemoryTableSessionCatalog].getName)
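
Illustration (not part of the patch): with this change, a user-specified schema that disagrees with what a DSv2 source reports now fails at CREATE TABLE time in V2SessionCatalog.createTable. A minimal spark-shell sketch, assuming the test-only WritableDataSourceSupportsExternalMetadata provider introduced above (fixed schema: i int, j int) is on the classpath; it mirrors the new "create table - schema mismatch" test in DataSourceV2Suite:

import org.apache.spark.sql.AnalysisException

val provider =
  classOf[org.apache.spark.sql.connector.WritableDataSourceSupportsExternalMetadata].getName

try {
  // The declared columns (x INT, y INT) do not match the schema the source reports (i INT, j INT).
  spark.sql(s"CREATE TABLE test (x INT, y INT) USING $provider")
} catch {
  case e: AnalysisException =>
    // Raised by the schema check added to V2SessionCatalog.createTable.
    assert(e.getErrorClass == "DATA_SOURCE_TABLE_SCHEMA_MISMATCH")
}

Omitting the column list (or the DataFrameReader.schema call) and letting Spark take the schema from the source avoids the error, as the new message suggests.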