diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 5b70edf249d1..286985b7c76b 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -149,6 +149,19 @@ ], "sqlState" : "42846" }, + "CANNOT_CREATE_DATA_SOURCE_TABLE" : { + "message" : [ + "Failed to create data source table <tableName>:" + ], + "subClass" : { + "EXTERNAL_METADATA_UNSUPPORTED" : { + "message" : [ + "provider '<provider>' does not support external metadata but a schema is provided. Please remove the schema when creating the table." + ] + } + }, + "sqlState" : "42KDE" + }, "CANNOT_DECODE_URL" : { "message" : [ "The provided URL cannot be decoded: <url>. Please ensure that the URL is properly formatted and try again." diff --git a/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md b/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md new file mode 100644 index 000000000000..f52e4f462bc9 --- /dev/null +++ b/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md @@ -0,0 +1,32 @@ +--- +layout: global +title: CANNOT_CREATE_DATA_SOURCE_TABLE error class +displayTitle: CANNOT_CREATE_DATA_SOURCE_TABLE error class +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +[SQLSTATE: 42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +Failed to create data source table `<tableName>`: + +This error class has the following derived error classes: + +## EXTERNAL_METADATA_UNSUPPORTED + +provider '`<provider>`' does not support external metadata but a schema is provided. Please remove the schema when creating the table. + + diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 71abf10da328..5d9f41d15720 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -163,6 +163,14 @@ Cannot convert SQL `<sqlColumn>` to Protobuf `<protobufColumn>` because schema i Cannot convert SQL `<sqlColumn>` to Protobuf `<protobufColumn>` because `<data>` is not in defined values for enum: `<enumString>`.
+### [CANNOT_CREATE_DATA_SOURCE_TABLE](sql-error-conditions-cannot-create-data-source-table-error-class.html) + +[SQLSTATE: 42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +Failed to create data source table `<tableName>`: + +For more details see [CANNOT_CREATE_DATA_SOURCE_TABLE](sql-error-conditions-cannot-create-data-source-table-error-class.html) + ### CANNOT_DECODE_URL [SQLSTATE: 22546](sql-error-conditions-sqlstates.html#class-22-data-exception) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala index 43af533b85a4..97d110f7ffc5 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala @@ -61,6 +61,14 @@ object QuotingUtils { } } + def fullyQuoted(ident: Identifier): String = { + if (ident.namespace.nonEmpty) { + ident.namespace.map(quoteIdentifier).mkString(".") + "." + quoteIdentifier(ident.name) + } else { + quoteIdentifier(ident.name) + } + } + def escapeSingleQuotedString(str: String): String = { val builder = new StringBuilder diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala index 133011ad9fac..2b712241633b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala @@ -144,10 +144,16 @@ private[sql] object CatalogV2Implicits { } implicit class IdentifierHelper(ident: Identifier) { + /* Quote the identifier if needed. */ def quoted: String = { QuotingUtils.quoted(ident) } + /* Always quote the identifier. */ + def fullyQuoted: String = { + QuotingUtils.fullyQuoted(ident) + } + def original: String = ident.namespace() :+ ident.name() mkString "."
def asMultipartIdentifier: Seq[String] = (ident.namespace :+ ident.name).toImmutableArraySeq diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 92638a152871..573b0274e958 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -17,11 +17,12 @@ package org.apache.spark.sql.execution.datasources.v2 +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes -import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils} +import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, truncatedString, CharVarcharUtils} import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability} import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics} import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream} @@ -61,7 +62,19 @@ case class DataSourceV2Relation( Nil } - override def name: String = table.name() + override def name: String = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + (catalog, identifier) match { + case (Some(cat), Some(ident)) => s"${quoteIfNeeded(cat.name())}.${ident.quoted}" + case (None, None) => table.name() + case _ => + throw SparkException.internalError( + "Invalid catalog and identifier pair. Both 'catalog' and 'identifier' must be " + + s"specified or left as None.
Current input - " + + s"catalog: '${catalog.map(_.name()).getOrElse(None)}', " + + s"identifier: ${identifier.map(_.quoted).getOrElse(None)}.") + } + } override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA) @@ -127,7 +140,7 @@ case class DataSourceV2ScanRelation( keyGroupedPartitioning: Option[Seq[Expression]] = None, ordering: Option[Seq[SortOrder]] = None) extends LeafNode with NamedRelation { - override def name: String = relation.table.name() + override def name: String = relation.name override def simpleString(maxFields: Int): String = { s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index b4c549a90191..d44de0b260b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -31,8 +31,8 @@ import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Lo import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1, DataSource} -import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 +import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.internal.connector.V1Function import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType} @@ -612,15 +612,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } private def isV2Provider(provider: String): Boolean = { - // Return earlier since `lookupDataSourceV2` may fail to resolve provider "hive" to - // `HiveFileFormat`, when running tests in sql/core. - if (DDLUtils.isHiveTable(Some(provider))) return false - DataSource.lookupDataSourceV2(provider, conf) match { - // TODO(SPARK-28396): Currently file source v2 can't work with tables. 
- case Some(_: FileDataSourceV2) => false - case Some(_) => true - case _ => false - } + DataSourceV2Utils.getTableProvider(provider, conf).isDefined } private object DatabaseInSessionCatalog { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala index 3dde20ac44e7..80d8fe787c24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala @@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SessionConfigSupport, SupportsCatalogOptions, SupportsRead, Table, TableProvider} import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{LongType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -151,6 +153,20 @@ private[sql] object DataSourceV2Utils extends Logging { } } + /** + * Returns the DataSourceV2 table provider for the given format, or None if it cannot be + * found or is not a usable v2 provider (e.g. Hive or file sources). + */ + def getTableProvider(provider: String, conf: SQLConf): Option[TableProvider] = { + // Return early since `lookupDataSourceV2` may fail to resolve provider "hive" to + // `HiveFileFormat` when running tests in sql/core. + if (DDLUtils.isHiveTable(Some(provider))) return None + DataSource.lookupDataSourceV2(provider, conf) match { + // TODO(SPARK-28396): Currently file source v2 can't work with tables.
+ case Some(p) if !p.isInstanceOf[FileDataSourceV2] => Some(p) + case _ => None + } + } + private lazy val objectMapper = new ObjectMapper() def getOptionsWithPaths( extraOptions: CaseInsensitiveMap[String], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 6cd7ec403be3..3b34d6a8f8c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -23,9 +23,10 @@ import java.util import scala.collection.mutable import scala.jdk.CollectionConverters._ +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException} -import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog} import org.apache.spark.sql.catalyst.util.TypeUtils._ import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table} import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty @@ -71,9 +72,44 @@ class V2SessionCatalog(catalog: SessionCatalog) } } + // Get data source options from the catalog table properties with the path option. + private def getDataSourceOptions( + properties: Map[String, String], + storage: CatalogStorageFormat): CaseInsensitiveStringMap = { + val propertiesWithPath = properties ++ + storage.locationUri.map("path" -> CatalogUtils.URIToString(_)) + new CaseInsensitiveStringMap(propertiesWithPath.asJava) + } + override def loadTable(ident: Identifier): Table = { try { - V1Table(catalog.getTableMetadata(ident.asTableIdentifier)) + val table = catalog.getTableMetadata(ident.asTableIdentifier) + if (table.provider.isDefined) { + DataSourceV2Utils.getTableProvider(table.provider.get, conf) match { + case Some(provider) => + // Get the table properties during creation and append the path option + // to the properties. + val dsOptions = getDataSourceOptions(table.properties, table.storage) + // If the source accepts external table metadata, we can pass the schema and + // partitioning information stored in Hive to `getTable` to avoid expensive + // schema/partitioning inference. 
+ if (provider.supportsExternalMetadata()) { + provider.getTable( + table.schema, + getV2Partitioning(table), + dsOptions.asCaseSensitiveMap()) + } else { + provider.getTable( + provider.inferSchema(dsOptions), + provider.inferPartitioning(dsOptions), + dsOptions.asCaseSensitiveMap()) + } + case _ => + V1Table(table) + } + } else { + V1Table(table) + } } catch { case _: NoSuchDatabaseException => throw QueryCompilationErrors.noSuchTableError(ident) @@ -96,6 +132,16 @@ class V2SessionCatalog(catalog: SessionCatalog) throw QueryCompilationErrors.timeTravelUnsupportedError(toSQLId(nameParts)) } + private def getV2Partitioning(table: CatalogTable): Array[Transform] = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val v2Partitioning = table.partitionColumnNames.asTransforms + val v2Bucketing = table.bucketSpec.map( + spec => Array(spec.asTransform)).getOrElse(Array.empty) + val v2Clustering = table.clusterBySpec.map( + spec => Array(spec.asTransform)).getOrElse(Array.empty) + v2Partitioning ++ v2Bucketing ++ v2Clustering + } + override def invalidateTable(ident: Identifier): Unit = { catalog.refreshTable(ident.asTableIdentifier) } @@ -114,14 +160,12 @@ class V2SessionCatalog(catalog: SessionCatalog) schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table = { - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper - val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) = - partitions.toImmutableArraySeq.convertTransforms + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName) - val tableProperties = properties.asScala + val tableProperties = properties.asScala.toMap val location = Option(properties.get(TableCatalog.PROP_LOCATION)) - val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap)) - .copy(locationUri = location.map(CatalogUtils.stringToURI)) + val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties)) + .copy(locationUri = location.map(CatalogUtils.stringToURI)) val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL) val tableType = if (isExternal || location.isDefined) { CatalogTableType.EXTERNAL @@ -129,17 +173,55 @@ class V2SessionCatalog(catalog: SessionCatalog) CatalogTableType.MANAGED } + val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match { + // If the provider does not support external metadata, users should not be allowed to + // specify a custom schema when creating the data source table, since the schema will not + // be used when loading the table. + case Some(p) if !p.supportsExternalMetadata() => + if (schema.nonEmpty) { + throw new SparkUnsupportedOperationException( + errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED", + messageParameters = Map("tableName" -> ident.fullyQuoted, "provider" -> provider)) + } + // V2CreateTablePlan does not allow non-empty partitions when the schema is empty. This + // is checked in the `PreprocessTableCreation` rule.
+ assert(partitions.isEmpty, + s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}") + (schema, partitions) + + case Some(tableProvider) => + assert(tableProvider.supportsExternalMetadata()) + lazy val dsOptions = getDataSourceOptions(tableProperties, storage) + if (schema.isEmpty) { + assert(partitions.isEmpty, + s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}") + // Infer the schema and partitions and store them in the catalog. + (tableProvider.inferSchema(dsOptions), tableProvider.inferPartitioning(dsOptions)) + } else if (partitions.isEmpty) { + (schema, tableProvider.inferPartitioning(dsOptions)) + } else { + (schema, partitions) + } + + case _ => + // The provider is not a V2 provider so we return the schema and partitions as is. + (schema, partitions) + } + + val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) = + newPartitions.toImmutableArraySeq.convertTransforms + val tableDesc = CatalogTable( identifier = ident.asTableIdentifier, tableType = tableType, storage = storage, - schema = schema, + schema = newSchema, provider = Some(provider), partitionColumnNames = partitionColumns, bucketSpec = maybeBucketSpec, - properties = tableProperties.toMap ++ + properties = tableProperties ++ maybeClusterBySpec.map( - clusterBySpec => ClusterBySpec.toProperty(schema, clusterBySpec, conf.resolver)), + clusterBySpec => ClusterBySpec.toProperty(newSchema, clusterBySpec, conf.resolver)), tracksPartitionsInCatalog = conf.manageFilesourcePartitions, comment = Option(properties.get(TableCatalog.PROP_COMMENT))) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java index e7d728fc5a08..91b6621e7767 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java @@ -44,7 +44,9 @@ public StructType readSchema() { @Override public InputPartition[] planInputPartitions() { - return new InputPartition[0]; + InputPartition[] partitions = new InputPartition[1]; + partitions[0] = new JavaRangeInputPartition(0, 2); + return partitions; } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index c2e759efe402..adc2039b8ce5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -45,10 +45,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION} -import org.apache.spark.sql.internal.connector.SimpleTableProvider import org.apache.spark.sql.sources.SimpleScanSource import org.apache.spark.sql.types.{LongType, StringType, StructType} -import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.unsafe.types.UTF8String abstract class DataSourceV2SQLSuite @@ -1120,7 +1118,7 @@ class DataSourceV2SQLSuiteV1Filter }, errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", parameters = Map( - 
"tableName" -> "`default`.`tbl`", + "tableName" -> "`spark_catalog`.`default`.`tbl`", "tableColumns" -> "`id`, `data`", "dataColumns" -> "`col1`" ) @@ -1155,7 +1153,7 @@ class DataSourceV2SQLSuiteV1Filter }, errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", parameters = Map( - "tableName" -> "`default`.`tbl`", + "tableName" -> "`spark_catalog`.`default`.`tbl`", "tableColumns" -> "`id`, `data`", "dataColumns" -> "`col1`" ) @@ -1191,7 +1189,7 @@ class DataSourceV2SQLSuiteV1Filter }, errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", parameters = Map( - "tableName" -> "`default`.`tbl`", + "tableName" -> "`spark_catalog`.`default`.`tbl`", "tableColumns" -> "`id`, `data`, `data2`", "dataColumns" -> "`col1`, `col2`" ) @@ -3364,13 +3362,6 @@ class DataSourceV2SQLSuiteV2Filter extends DataSourceV2SQLSuite { override protected val catalogAndNamespace = "testv2filter.ns1.ns2." } -/** Used as a V2 DataSource for V2SessionCatalog DDL */ -class FakeV2Provider extends SimpleTableProvider { - override def getTable(options: CaseInsensitiveStringMap): Table = { - throw new UnsupportedOperationException("Unnecessary for DDL tests") - } -} - class ReserveSchemaNullabilityCatalog extends InMemoryCatalog { override def useNullableQuerySchema(): Boolean = false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index e61f8cb0bf06..f2e518e8acc4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -22,6 +22,7 @@ import java.util.OptionalLong import test.org.apache.spark.sql.connector._ +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.catalog.{PartitionInternalRow, SupportsRead, Table, TableCapability, TableProvider} @@ -231,11 +232,11 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS val e = intercept[IllegalArgumentException](spark.read.format(cls.getName).load()) assert(e.getMessage.contains("requires a user-supplied schema")) - val schema = new StructType().add("i", "int").add("s", "string") + val schema = new StructType().add("i", "int").add("j", "int") val df = spark.read.format(cls.getName).schema(schema).load() assert(df.schema == schema) - assert(df.collect().isEmpty) + checkAnswer(df, Seq(Row(0, 0), Row(1, -1))) } } } @@ -633,6 +634,95 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS } } } + + test("SPARK-46043: create table in SQL using a DSv2 source") { + Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls => + withClue(cls.getName) { + // Create a table with empty schema. + withTable("test") { + sql(s"CREATE TABLE test USING ${cls.getName}") + checkAnswer( + sql(s"SELECT * FROM test WHERE i < 3"), + Seq(Row(0, 0), Row(1, -1), Row(2, -2))) + } + // Create a table with non-empty schema is not allowed. 
+ checkError( + exception = intercept[SparkUnsupportedOperationException] { + sql(s"CREATE TABLE test(a INT, b INT) USING ${cls.getName}") + }, + errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED", + parameters = Map("tableName" -> "`default`.`test`", "provider" -> cls.getName) + ) + } + } + } + + test("SPARK-46043: create table in SQL with schema required data source") { + val cls = classOf[SchemaRequiredDataSource] + val e = intercept[IllegalArgumentException] { + sql(s"CREATE TABLE test USING ${cls.getName}") + } + assert(e.getMessage.contains("requires a user-supplied schema")) + withTable("test") { + sql(s"CREATE TABLE test(i INT, j INT) USING ${cls.getName}") + checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0, 0), Row(1, -1))) + } + withTable("test") { + sql(s"CREATE TABLE test(i INT) USING ${cls.getName}") + checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0), Row(1))) + } + withTable("test") { + // Test the behavior when there is a mismatch between the schema defined in the + // CREATE TABLE command and the actual schema produced by the data source. The + // resulting behavior is not guaranteed and may vary based on the data source's + // implementation. + sql(s"CREATE TABLE test(i INT, j INT, k INT) USING ${cls.getName}") + val e = intercept[Exception] { + sql("SELECT * FROM test").collect() + } + assert(e.getMessage.contains( + "java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for length 2")) + } + } + + test("SPARK-46043: create table in SQL with partitioning required data source") { + val cls = classOf[PartitionsRequiredDataSource] + val e = intercept[IllegalArgumentException]( + sql(s"CREATE TABLE test(a INT) USING ${cls.getName}")) + assert(e.getMessage.contains("user-supplied partitioning")) + withTable("test") { + sql(s"CREATE TABLE test(i INT, j INT) USING ${cls.getName} PARTITIONED BY (i)") + checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0, 0), Row(1, -1))) + } + } + + test("SPARK-46043: create table in SQL with path option") { + val cls = classOf[SupportsExternalMetadataDataSource] + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/test" + Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path) + withTable("test") { + sql( + s""" + |CREATE TABLE test USING ${cls.getName} + |OPTIONS (PATH '$path') + |""".stripMargin) + checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(1, 2))) + sql( + s""" + |CREATE OR REPLACE TABLE test USING ${cls.getName} + |OPTIONS (PATH '${dir.getCanonicalPath}/non-existing') + |""".stripMargin) + checkAnswer(sql("SELECT * FROM test"), Nil) + sql( + s""" + |CREATE OR REPLACE TABLE test USING ${cls.getName} + |LOCATION '$path' + |""".stripMargin) + checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(1, 2))) + } + } + } } @@ -910,7 +1000,9 @@ class AdvancedReaderFactory(requiredSchema: StructType) extends PartitionReaderF class SchemaRequiredDataSource extends TableProvider { class MyScanBuilder(schema: StructType) extends SimpleScanBuilder { - override def planInputPartitions(): Array[InputPartition] = Array.empty + override def planInputPartitions(): Array[InputPartition] = { + Array(RangeInputPartition(0, 2)) + } override def readSchema(): StructType = schema } @@ -936,6 +1028,12 @@ class SchemaRequiredDataSource extends TableProvider { } } +class PartitionsRequiredDataSource extends SchemaRequiredDataSource { + override def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = { + throw new IllegalArgumentException("requires user-supplied partitioning") 
+ } +} + class ColumnarDataSourceV2 extends TestingV2Source { class MyScanBuilder extends SimpleScanBuilder { @@ -1127,6 +1225,10 @@ class SimpleWriteOnlyDataSource extends SimpleWritableDataSource { } } +class SupportsExternalMetadataDataSource extends SimpleWritableDataSource { + override def supportsExternalMetadata(): Boolean = true +} + class SupportsExternalMetadataWritableDataSource extends SimpleWritableDataSource { override def supportsExternalMetadata(): Boolean = true diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala new file mode 100644 index 000000000000..174700e8d24f --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import java.util + +import org.apache.spark.sql.connector.catalog.{Table, TableProvider} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** Used as a V2 DataSource for V2SessionCatalog DDL */ +class FakeV2Provider extends TableProvider { + + // Supports external metadata so that users can specify schema and partitions + // when creating the table. 
+ override def supportsExternalMetadata(): Boolean = true + + class MyScanBuilder extends SimpleScanBuilder { + override def planInputPartitions(): Array[InputPartition] = { + Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) + } + } + + override def inferSchema(options: CaseInsensitiveStringMap): StructType = { + FakeV2Provider.schema + } + + def getTable(options: CaseInsensitiveStringMap): Table = { + new SimpleBatchTable { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new MyScanBuilder() + } + } + } + + override def getTable( + schema: StructType, + partitioning: Array[Transform], + properties: util.Map[String, String]): Table = { + getTable(new CaseInsensitiveStringMap(properties)) + } +} + +object FakeV2Provider { + val schema: StructType = new StructType().add("i", "int").add("j", "int") +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala index 63bb0148972c..fa30969d65c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala @@ -126,7 +126,11 @@ abstract class InsertIntoTests( val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data", "missing")) - val tableName = if (catalogAndNamespace.isEmpty) toSQLId(s"default.$t1") else toSQLId(t1) + val tableName = if (catalogAndNamespace.isEmpty) { + toSQLId(s"spark_catalog.default.$t1") + } else { + toSQLId(t1) + } checkError( exception = intercept[AnalysisException] { doInsert(t1, df) @@ -145,7 +149,11 @@ abstract class InsertIntoTests( sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") val df = Seq((1L, "a", "mango")).toDF("id", "data", "fruit") verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data")) - val tableName = if (catalogAndNamespace.isEmpty) toSQLId(s"default.$t1") else toSQLId(t1) + val tableName = if (catalogAndNamespace.isEmpty) { + toSQLId(s"spark_catalog.default.$t1") + } else { + toSQLId(t1) + } checkError( exception = intercept[AnalysisException] { doInsert(t1, df) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala index 1396ef82925a..719b006c1460 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala @@ -78,6 +78,7 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating schema: StructType, partitions: Array[Transform], properties: java.util.Map[String, String]): Table = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper val key = TestV2SessionCatalogBase.SIMULATE_ALLOW_EXTERNAL_PROPERTY val propsWithLocation = if (properties.containsKey(key)) { // Always set a location so that CREATE EXTERNAL TABLE won't fail with LOCATION not specified. 
@@ -92,8 +93,8 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating } else { properties } - val created = super.createTable(ident, schema, partitions, propsWithLocation) - val t = newTable(created.name(), schema, partitions, propsWithLocation) + super.createTable(ident, schema, partitions, propsWithLocation) + val t = newTable(ident.quoted, schema, partitions, propsWithLocation) addTable(ident, t) t } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala index ec62739b9cf2..181dc0ea2074 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala @@ -36,7 +36,6 @@ import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoI import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.functions.lit import org.apache.spark.sql.internal.SQLConf.{OPTIMIZER_MAX_ITERATIONS, V2_SESSION_CATALOG_IMPLEMENTATION} -import org.apache.spark.sql.internal.connector.SimpleTableProvider import org.apache.spark.sql.sources._ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType @@ -250,7 +249,7 @@ private object InMemoryV1Provider { } class InMemoryV1Provider - extends SimpleTableProvider + extends FakeV2Provider with DataSourceRegister with CreatableRelationProvider { override def getTable(options: CaseInsensitiveStringMap): Table = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index e39cc91d5f04..69b3285fc7f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -1275,7 +1275,7 @@ class PlanResolutionSuite extends AnalysisTest { }, errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", parameters = Map( - "tableName" -> "`tab2`", + "tableName" -> "`testcat`.`tab2`", "tableColumns" -> "`i`, `x`", "dataColumns" -> "`col1`") )