From fc1249d40a247cf6583086b36caf60621ffff58b Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Tue, 21 Nov 2023 19:39:15 -0800 Subject: [PATCH 01/10] create table --- .../analysis/ResolveSessionCatalog.scala | 24 +++-------- .../datasources/v2/DataSourceV2Utils.scala | 23 +++++++++++ .../datasources/v2/V2SessionCatalog.scala | 40 ++++++++++++++++++- .../sql/connector/DataSourceV2Suite.scala | 32 +++++++++++++++ 4 files changed, 100 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index b4c549a90191..78454c6771d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -31,8 +31,8 @@ import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Lo import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1, DataSource} -import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 +import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.internal.connector.V1Function import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType} @@ -163,7 +163,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) val (storageFormat, provider) = getStorageFormatAndProvider( c.tableSpec.provider, tableSpec.options, c.tableSpec.location, c.tableSpec.serde, ctas = false) - if (!isV2Provider(provider)) { + if (!DataSourceV2Utils.isV2Provider(provider, conf)) { constructV1TableCmd(None, c.tableSpec, ident, c.tableSchema, c.partitioning, c.ignoreIfExists, storageFormat, provider) } else { @@ -179,7 +179,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) c.tableSpec.serde, ctas = true) - if (!isV2Provider(provider)) { + if (!DataSourceV2Utils.isV2Provider(provider, conf)) { constructV1TableCmd(Some(c.query), c.tableSpec, ident, new StructType, c.partitioning, c.ignoreIfExists, storageFormat, provider) } else { @@ -193,7 +193,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // session catalog and the table provider is not v2. 
case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) => val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName) - if (!isV2Provider(provider)) { + if (!DataSourceV2Utils.isV2Provider(provider, conf)) { throw QueryCompilationErrors.unsupportedTableOperationError( ident, "REPLACE TABLE") } else { @@ -202,7 +202,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) => val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName) - if (!isV2Provider(provider)) { + if (!DataSourceV2Utils.isV2Provider(provider, conf)) { throw QueryCompilationErrors.unsupportedTableOperationError( ident, "REPLACE TABLE AS SELECT") } else { @@ -611,18 +611,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) StructField(col.name.head, col.dataType, nullable = true, builder.build()) } - private def isV2Provider(provider: String): Boolean = { - // Return earlier since `lookupDataSourceV2` may fail to resolve provider "hive" to - // `HiveFileFormat`, when running tests in sql/core. - if (DDLUtils.isHiveTable(Some(provider))) return false - DataSource.lookupDataSourceV2(provider, conf) match { - // TODO(SPARK-28396): Currently file source v2 can't work with tables. - case Some(_: FileDataSourceV2) => false - case Some(_) => true - case _ => false - } - } - private object DatabaseInSessionCatalog { def unapply(resolved: ResolvedNamespace): Option[String] = resolved match { case ResolvedNamespace(catalog, _, _) if !isSessionCatalog(catalog) => None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala index 3dde20ac44e7..8f9859592fba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala @@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SessionConfigSupport, SupportsCatalogOptions, SupportsRead, Table, TableProvider} import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{LongType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -151,6 +153,27 @@ private[sql] object DataSourceV2Utils extends Logging { } } + /** + * Returns the table provider for the given format, or None if it cannot be found. + */ + def getTableProvider(provider: String, conf: SQLConf): Option[TableProvider] = { + // Return earlier since `lookupDataSourceV2` may fail to resolve provider "hive" to + // `HiveFileFormat`, when running tests in sql/core. + if (DDLUtils.isHiveTable(Some(provider))) return None + DataSource.lookupDataSourceV2(provider, conf) match { + // TODO(SPARK-28396): Currently file source v2 can't work with tables. + case Some(_: FileDataSourceV2) => None + case o => o + } + } + + /** + * Check if the provider is a v2 provider. 
+ */ + def isV2Provider(provider: String, conf: SQLConf): Boolean = { + getTableProvider(provider, conf).isDefined + } + private lazy val objectMapper = new ObjectMapper() def getOptionsWithPaths( extraOptions: CaseInsensitiveMap[String], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 6cd7ec403be3..dfee55911b7a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -73,7 +73,45 @@ class V2SessionCatalog(catalog: SessionCatalog) override def loadTable(ident: Identifier): Table = { try { - V1Table(catalog.getTableMetadata(ident.asTableIdentifier)) + val catalogTable = catalog.getTableMetadata(ident.asTableIdentifier) + if (catalogTable.provider.isDefined) { + DataSourceV2Utils.getTableProvider(catalogTable.provider.get, conf) match { + case Some(tableProvider) => + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + // TODO: should we use the session property? + // val sessionOptions = DataSourceV2Utils.extractSessionConfigs(tableProvider, conf) + val dsOptions = catalogTable.properties + val pathOption = catalogTable.storage.locationUri.map(CatalogUtils.URIToString) + val optionsWithPath = if (pathOption.isDefined) { + dsOptions ++ Map("path" -> pathOption.get) + } else { + dsOptions + } + // If the source accepts external table metadata, we can pass the schema and + // partitioning information stored in Hive to `getTable` to avoid schema/partitioning + // inference, which can be very expensive. + if (tableProvider.supportsExternalMetadata()) { + val v2Partitioning = catalogTable.partitionColumnNames.asTransforms + val v2Bucketing = catalogTable.bucketSpec.map( + spec => Array(spec.asTransform)).getOrElse(Array.empty) + val v2Clustering = catalogTable.clusterBySpec.map( + spec => Array(spec.asTransform)).getOrElse(Array.empty) + val partitioning = v2Partitioning ++ v2Bucketing ++ v2Clustering + tableProvider.getTable( + catalogTable.schema, + partitioning, + optionsWithPath.asJava) + } else { + val options = new CaseInsensitiveStringMap(optionsWithPath.asJava) + DataSourceV2Utils.getTableFromProvider( + tableProvider, options, userSpecifiedSchema = None) + } + case _ => + V1Table(catalogTable) + } + } else { + V1Table(catalogTable) + } } catch { case _: NoSuchDatabaseException => throw QueryCompilationErrors.noSuchTableError(ident) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index e61f8cb0bf06..43e3a7450a22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -633,6 +633,38 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS } } } + + test("SPARK-46043: Support create table using DSv2 sources") { + Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls => + withClue(cls.getName) { + withTable("test") { + sql(s"CREATE TABLE test USING ${cls.getName}") + checkAnswer( + sql(s"SELECT * FROM test WHERE i < 3"), + Seq(Row(0, 0), Row(1, -1), Row(2, -2))) + } + } + } + withTable("test") { + val cls = classOf[SchemaRequiredDataSource] + withClue(cls.getName) { + 
sql(s"CREATE TABLE test USING ${cls.getName}") + checkAnswer(sql(s"SELECT * FROM test"), Nil) + } + } + withTable("test") { + val cls = classOf[SupportsExternalMetadataWritableDataSource] + withClue(cls.getName) { + withTempDir { dir => + sql(s"CREATE TABLE test USING ${cls.getName} OPTIONS (path '${dir.getCanonicalPath}')") + checkAnswer(sql(s"SELECT * FROM test"), Nil) + sql(s"CREATE OR REPLACE TABLE test USING ${cls.getName} " + + s"LOCATION '${dir.getCanonicalPath}'") + checkAnswer(sql(s"SELECT * FROM test"), Nil) + } + } + } + } } From b0126b863fef0d1b8dccd50b3f8639a36e92fc26 Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Tue, 21 Nov 2023 21:10:30 -0800 Subject: [PATCH 02/10] update --- .../spark/sql/execution/datasources/v2/V2SessionCatalog.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index dfee55911b7a..0445a71f9bef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -78,8 +78,6 @@ class V2SessionCatalog(catalog: SessionCatalog) DataSourceV2Utils.getTableProvider(catalogTable.provider.get, conf) match { case Some(tableProvider) => import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - // TODO: should we use the session property? - // val sessionOptions = DataSourceV2Utils.extractSessionConfigs(tableProvider, conf) val dsOptions = catalogTable.properties val pathOption = catalogTable.storage.locationUri.map(CatalogUtils.URIToString) val optionsWithPath = if (pathOption.isDefined) { From 9b28f92b7ec1eb49c04f412dda272f1746a7225b Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Mon, 27 Nov 2023 23:29:05 -0800 Subject: [PATCH 03/10] address comments --- .../main/resources/error/error-classes.json | 18 +++ ...create-data-source-v2-table-error-class.md | 36 ++++++ docs/sql-error-conditions.md | 8 ++ .../datasources/v2/V2SessionCatalog.scala | 115 ++++++++++++------ .../JavaSchemaRequiredDataSource.java | 4 +- .../sql/connector/DataSourceV2Suite.scala | 89 +++++++++++--- .../SupportsCatalogOptionsSuite.scala | 12 +- 7 files changed, 226 insertions(+), 56 deletions(-) create mode 100644 docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 5b70edf249d1..291ab870d125 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -376,6 +376,24 @@ }, "sqlState" : "58030" }, + "CANNOT_CREATE_DATA_SOURCE_V2_TABLE" : { + "message" : [ + "Failed to create data source V2 table:" + ], + "subClass" : { + "CATALOG_OPTIONS_UNSUPPORTED" : { + "message": [ + "provider '' implements 'SupportsCatalogOptions' which is not supported. Please use a compatible provider." + ] + }, + "EXTERNAL_METADATA_UNSUPPORTED" : { + "message": [ + "provider '' does not support external metadata but schema is provided. Please remove the schema and try again." + ] + } + }, + "sqlState": "42KDE" + }, "CAST_INVALID_INPUT" : { "message" : [ "The value of the type cannot be cast to because it is malformed. Correct the value as per the syntax, or change its target type. 
Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set to \"false\" to bypass this error." diff --git a/docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md b/docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md new file mode 100644 index 000000000000..950fd7d11278 --- /dev/null +++ b/docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md @@ -0,0 +1,36 @@ +--- +layout: global +title: CANNOT_CREATE_DATA_SOURCE_V2_TABLE error class +displayTitle: CANNOT_CREATE_DATA_SOURCE_V2_TABLE error class +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +[SQLSTATE: 42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +Failed to create data source V2 table: + +This error class has the following derived error classes: + +## CATALOG_OPTIONS_UNSUPPORTED + +provider '``' implements 'SupportsCatalogOptions' which is not supported. Please use a compatible provider. + +## EXTERNAL_METADATA_UNSUPPORTED + +provider '``' does not support external metadata but schema is provided. Please remove the schema and try again. + + diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 71abf10da328..8697b3a37e39 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -163,6 +163,14 @@ Cannot convert SQL `` to Protobuf `` because schema i Cannot convert SQL `` to Protobuf `` because `` is not in defined values for enum: ``. 
+### [CANNOT_CREATE_DATA_SOURCE_V2_TABLE](sql-error-conditions-cannot-create-data-source-v2-table-error-class.html) + +[SQLSTATE: 42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +Failed to create data source V2 table: + +For more details see [CANNOT_CREATE_DATA_SOURCE_V2_TABLE](sql-error-conditions-cannot-create-data-source-v2-table-error-class.html) + ### CANNOT_DECODE_URL [SQLSTATE: 22546](sql-error-conditions-sqlstates.html#class-22-data-exception) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 0445a71f9bef..cc51f776caf7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -23,11 +23,12 @@ import java.util import scala.collection.mutable import scala.jdk.CollectionConverters._ +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog} import org.apache.spark.sql.catalyst.util.TypeUtils._ -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsCatalogOptions, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table} import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.expressions.Transform @@ -73,42 +74,43 @@ class V2SessionCatalog(catalog: SessionCatalog) override def loadTable(ident: Identifier): Table = { try { - val catalogTable = catalog.getTableMetadata(ident.asTableIdentifier) - if (catalogTable.provider.isDefined) { - DataSourceV2Utils.getTableProvider(catalogTable.provider.get, conf) match { - case Some(tableProvider) => - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - val dsOptions = catalogTable.properties - val pathOption = catalogTable.storage.locationUri.map(CatalogUtils.URIToString) - val optionsWithPath = if (pathOption.isDefined) { - dsOptions ++ Map("path" -> pathOption.get) - } else { - dsOptions - } - // If the source accepts external table metadata, we can pass the schema and - // partitioning information stored in Hive to `getTable` to avoid schema/partitioning - // inference, which can be very expensive. 
- if (tableProvider.supportsExternalMetadata()) { - val v2Partitioning = catalogTable.partitionColumnNames.asTransforms - val v2Bucketing = catalogTable.bucketSpec.map( - spec => Array(spec.asTransform)).getOrElse(Array.empty) - val v2Clustering = catalogTable.clusterBySpec.map( - spec => Array(spec.asTransform)).getOrElse(Array.empty) - val partitioning = v2Partitioning ++ v2Bucketing ++ v2Clustering - tableProvider.getTable( - catalogTable.schema, - partitioning, - optionsWithPath.asJava) - } else { - val options = new CaseInsensitiveStringMap(optionsWithPath.asJava) - DataSourceV2Utils.getTableFromProvider( - tableProvider, options, userSpecifiedSchema = None) + val table = catalog.getTableMetadata(ident.asTableIdentifier) + if (table.provider.isDefined) { + DataSourceV2Utils.getTableProvider(table.provider.get, conf) match { + case Some(provider) => + // Get the table properties during creation and append the path option + // to the properties. + val tableProperties = table.properties + val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_)) + val properties = tableProperties ++ pathOption + val dsOptions = new CaseInsensitiveStringMap(properties.asJava) + + provider match { + case p: SupportsCatalogOptions => + throw new IllegalArgumentException( + f"provider $p should not implement SupportsCatalogOptions") + + case _ => + // If the source accepts external table metadata, we can pass the schema and + // partitioning information stored in Hive to `getTable` to avoid expensive + // schema/partitioning inference. + if (provider.supportsExternalMetadata()) { + provider.getTable( + table.schema, + getV2Partitioning(table), + dsOptions.asCaseSensitiveMap()) + } else { + provider.getTable( + provider.inferSchema(dsOptions), + provider.inferPartitioning(dsOptions), + dsOptions.asCaseSensitiveMap()) + } } case _ => - V1Table(catalogTable) + V1Table(table) } } else { - V1Table(catalogTable) + V1Table(table) } } catch { case _: NoSuchDatabaseException => @@ -132,6 +134,16 @@ class V2SessionCatalog(catalog: SessionCatalog) throw QueryCompilationErrors.timeTravelUnsupportedError(toSQLId(nameParts)) } + private def getV2Partitioning(table: CatalogTable): Array[Transform] = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val v2Partitioning = table.partitionColumnNames.asTransforms + val v2Bucketing = table.bucketSpec.map( + spec => Array(spec.asTransform)).getOrElse(Array.empty) + val v2Clustering = table.clusterBySpec.map( + spec => Array(spec.asTransform)).getOrElse(Array.empty) + v2Partitioning ++ v2Bucketing ++ v2Clustering + } + override def invalidateTable(ident: Identifier): Unit = { catalog.refreshTable(ident.asTableIdentifier) } @@ -151,9 +163,40 @@ class V2SessionCatalog(catalog: SessionCatalog) partitions: Array[Transform], properties: util.Map[String, String]): Table = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper + val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName) + + val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match { + case Some(_: SupportsCatalogOptions) => + throw new SparkUnsupportedOperationException( + errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.CATALOG_OPTIONS_UNSUPPORTED", + messageParameters = Map("provider" -> provider)) + + case Some(p) if !p.supportsExternalMetadata() => + // Partitions cannot be specified when schema is empty. 
+ if (schema.nonEmpty) { + throw new SparkUnsupportedOperationException( + errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.EXTERNAL_METADATA_UNSUPPORTED", + messageParameters = Map("provider" -> provider)) + } + (schema, partitions) + + case Some(tableProvider) => + assert(tableProvider.supportsExternalMetadata()) + if (schema.isEmpty) { + // Infer the schema and partitions and store them in the catalog. + val dsOptions = new CaseInsensitiveStringMap(properties) + (tableProvider.inferSchema(dsOptions), tableProvider.inferPartitioning(dsOptions)) + } else { + // TODO: when schema is defined but partitioning is empty, should we infer it? + (schema, partitions) + } + + case _ => + (schema, partitions) + } + val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) = partitions.toImmutableArraySeq.convertTransforms - val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName) val tableProperties = properties.asScala val location = Option(properties.get(TableCatalog.PROP_LOCATION)) val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap)) @@ -169,13 +212,13 @@ class V2SessionCatalog(catalog: SessionCatalog) identifier = ident.asTableIdentifier, tableType = tableType, storage = storage, - schema = schema, + schema = newSchema, provider = Some(provider), partitionColumnNames = partitionColumns, bucketSpec = maybeBucketSpec, properties = tableProperties.toMap ++ maybeClusterBySpec.map( - clusterBySpec => ClusterBySpec.toProperty(schema, clusterBySpec, conf.resolver)), + clusterBySpec => ClusterBySpec.toProperty(newSchema, clusterBySpec, conf.resolver)), tracksPartitionsInCatalog = conf.manageFilesourcePartitions, comment = Option(properties.get(TableCatalog.PROP_COMMENT))) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java index e7d728fc5a08..91b6621e7767 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java @@ -44,7 +44,9 @@ public StructType readSchema() { @Override public InputPartition[] planInputPartitions() { - return new InputPartition[0]; + InputPartition[] partitions = new InputPartition[1]; + partitions[0] = new JavaRangeInputPartition(0, 2); + return partitions; } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 43e3a7450a22..202e9132c7ad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -22,6 +22,7 @@ import java.util.OptionalLong import test.org.apache.spark.sql.connector._ +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.catalog.{PartitionInternalRow, SupportsRead, Table, TableCapability, TableProvider} @@ -231,11 +232,11 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS val e = intercept[IllegalArgumentException](spark.read.format(cls.getName).load()) assert(e.getMessage.contains("requires a user-supplied schema")) - val schema = new StructType().add("i", 
"int").add("s", "string") + val schema = new StructType().add("i", "int").add("j", "int") val df = spark.read.format(cls.getName).schema(schema).load() assert(df.schema == schema) - assert(df.collect().isEmpty) + checkAnswer(df, Seq(Row(0, 0), Row(1, -1))) } } } @@ -634,34 +635,80 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS } } - test("SPARK-46043: Support create table using DSv2 sources") { + test("SPARK-46043: create table in SQL using a DSv2 source") { Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls => withClue(cls.getName) { + // Create a table with empty schema. withTable("test") { sql(s"CREATE TABLE test USING ${cls.getName}") checkAnswer( sql(s"SELECT * FROM test WHERE i < 3"), Seq(Row(0, 0), Row(1, -1), Row(2, -2))) } + // Create a table with non-empty schema is not allowed. + checkError( + exception = intercept[SparkUnsupportedOperationException] { + sql(s"CREATE TABLE test(a INT, b INT) USING ${cls.getName}") + }, + errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.EXTERNAL_METADATA_UNSUPPORTED", + parameters = Map("provider" -> cls.getName) + ) } } + } + + test("SPARK-46043: create table in SQL with schema required data source") { + val cls = classOf[SchemaRequiredDataSource] + val e = intercept[IllegalArgumentException] { + sql(s"CREATE TABLE test USING ${cls.getName}") + } + assert(e.getMessage.contains("requires a user-supplied schema")) withTable("test") { - val cls = classOf[SchemaRequiredDataSource] - withClue(cls.getName) { - sql(s"CREATE TABLE test USING ${cls.getName}") - checkAnswer(sql(s"SELECT * FROM test"), Nil) - } + sql(s"CREATE TABLE test(i INT, j INT) USING ${cls.getName}") + checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0, 0), Row(1, -1))) } withTable("test") { - val cls = classOf[SupportsExternalMetadataWritableDataSource] - withClue(cls.getName) { - withTempDir { dir => - sql(s"CREATE TABLE test USING ${cls.getName} OPTIONS (path '${dir.getCanonicalPath}')") - checkAnswer(sql(s"SELECT * FROM test"), Nil) - sql(s"CREATE OR REPLACE TABLE test USING ${cls.getName} " + - s"LOCATION '${dir.getCanonicalPath}'") - checkAnswer(sql(s"SELECT * FROM test"), Nil) - } + sql(s"CREATE TABLE test(i INT) USING ${cls.getName}") + checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0), Row(1))) + } + withTable("test") { + // Test the behavior when there is a mismatch between the schema defined in the + // CREATE TABLE command and the actual schema produced by the data source. The + // resulting behavior is not guaranteed and may vary based on the data source's + // implementation. 
+ sql(s"CREATE TABLE test(i INT, j INT, k INT) USING ${cls.getName}") + val e = intercept[Exception] { + sql("SELECT * FROM test").collect() + } + assert(e.getMessage.contains( + "java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for length 2")) + } + } + + test("SPARK-46043: create table in SQL with path option") { + val cls = classOf[SupportsExternalMetadataDataSource] + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/test" + Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path) + withTable("test") { + sql( + s""" + |CREATE TABLE test USING ${cls.getName} + |OPTIONS (PATH '$path') + |""".stripMargin) + checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(1, 2))) + sql( + s""" + |CREATE OR REPLACE TABLE test USING ${cls.getName} + |OPTIONS (PATH '${dir.getCanonicalPath}/non-existing') + |""".stripMargin) + checkAnswer(sql("SELECT * FROM test"), Nil) + sql( + s""" + |CREATE OR REPLACE TABLE test USING ${cls.getName} + |LOCATION '$path' + |""".stripMargin) + checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(1, 2))) } } } @@ -942,7 +989,9 @@ class AdvancedReaderFactory(requiredSchema: StructType) extends PartitionReaderF class SchemaRequiredDataSource extends TableProvider { class MyScanBuilder(schema: StructType) extends SimpleScanBuilder { - override def planInputPartitions(): Array[InputPartition] = Array.empty + override def planInputPartitions(): Array[InputPartition] = { + Array(RangeInputPartition(0, 2)) + } override def readSchema(): StructType = schema } @@ -1159,6 +1208,10 @@ class SimpleWriteOnlyDataSource extends SimpleWritableDataSource { } } +class SupportsExternalMetadataDataSource extends SimpleWritableDataSource { + override def supportsExternalMetadata(): Boolean = true +} + class SupportsExternalMetadataWritableDataSource extends SimpleWritableDataSource { override def supportsExternalMetadata(): Boolean = true diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala index fd4f719417e4..b07257a07496 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala @@ -25,7 +25,7 @@ import scala.util.Try import org.scalatest.BeforeAndAfter -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode} import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression} @@ -163,6 +163,16 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with assert(load("t1", Some(catalogName)).count() === 0) } + test("SPARK-46043: create table in SQL with catalog options supported source") { + checkError( + exception = intercept[SparkUnsupportedOperationException] { + sql(s"CREATE TABLE test USING $format") + }, + errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.CATALOG_OPTIONS_UNSUPPORTED", + parameters = Map("provider" -> format) + ) + } + test("append and overwrite modes - session catalog") { sql(s"create table t1 (id bigint) using $format") val df = spark.range(10) From ac74a3683297ab5916c4400deac8a2d00953c42b Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Tue, 28 Nov 2023 13:23:20 
-0800 Subject: [PATCH 04/10] address comments --- .../main/resources/error/error-classes.json | 36 +++++++++--------- ...create-data-source-v2-table-error-class.md | 2 +- .../connector/SimpleTableProvider.scala | 1 + .../analysis/ResolveSessionCatalog.scala | 12 ++++-- .../datasources/v2/DataSourceV2Utils.scala | 11 +----- .../datasources/v2/V2SessionCatalog.scala | 17 +++++++-- .../sql/connector/DataSourceV2SQLSuite.scala | 38 +++++++++++++++---- 7 files changed, 74 insertions(+), 43 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 291ab870d125..753b9c9b1a56 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -149,6 +149,24 @@ ], "sqlState" : "42846" }, + "CANNOT_CREATE_DATA_SOURCE_V2_TABLE" : { + "message" : [ + "Failed to create data source V2 table:" + ], + "subClass" : { + "CATALOG_OPTIONS_UNSUPPORTED" : { + "message" : [ + "provider '' implements 'SupportsCatalogOptions' which is not supported. Please use a compatible provider." + ] + }, + "EXTERNAL_METADATA_UNSUPPORTED" : { + "message" : [ + "provider '' does not support external metadata but a schema is provided. Please remove the schema when creating the table." + ] + } + }, + "sqlState" : "42KDE" + }, "CANNOT_DECODE_URL" : { "message" : [ "The provided URL cannot be decoded: . Please ensure that the URL is properly formatted and try again." @@ -376,24 +394,6 @@ }, "sqlState" : "58030" }, - "CANNOT_CREATE_DATA_SOURCE_V2_TABLE" : { - "message" : [ - "Failed to create data source V2 table:" - ], - "subClass" : { - "CATALOG_OPTIONS_UNSUPPORTED" : { - "message": [ - "provider '' implements 'SupportsCatalogOptions' which is not supported. Please use a compatible provider." - ] - }, - "EXTERNAL_METADATA_UNSUPPORTED" : { - "message": [ - "provider '' does not support external metadata but schema is provided. Please remove the schema and try again." - ] - } - }, - "sqlState": "42KDE" - }, "CAST_INVALID_INPUT" : { "message" : [ "The value of the type cannot be cast to because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set to \"false\" to bypass this error." diff --git a/docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md b/docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md index 950fd7d11278..746c5c3f0716 100644 --- a/docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md +++ b/docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md @@ -31,6 +31,6 @@ provider '``' implements 'SupportsCatalogOptions' which is not support ## EXTERNAL_METADATA_UNSUPPORTED -provider '``' does not support external metadata but schema is provided. Please remove the schema and try again. +provider '``' does not support external metadata but a schema is provided. Please remove the schema when creating the table. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala index f8b237195fa8..24ac2c77a682 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala @@ -31,6 +31,7 @@ trait SimpleTableProvider extends TableProvider { def getTable(options: CaseInsensitiveStringMap): Table private[this] var loadedTable: Table = _ + private def getOrLoadTable(options: CaseInsensitiveStringMap): Table = { if (loadedTable == null) loadedTable = getTable(options) loadedTable diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 78454c6771d9..d44de0b260b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -163,7 +163,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) val (storageFormat, provider) = getStorageFormatAndProvider( c.tableSpec.provider, tableSpec.options, c.tableSpec.location, c.tableSpec.serde, ctas = false) - if (!DataSourceV2Utils.isV2Provider(provider, conf)) { + if (!isV2Provider(provider)) { constructV1TableCmd(None, c.tableSpec, ident, c.tableSchema, c.partitioning, c.ignoreIfExists, storageFormat, provider) } else { @@ -179,7 +179,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) c.tableSpec.serde, ctas = true) - if (!DataSourceV2Utils.isV2Provider(provider, conf)) { + if (!isV2Provider(provider)) { constructV1TableCmd(Some(c.query), c.tableSpec, ident, new StructType, c.partitioning, c.ignoreIfExists, storageFormat, provider) } else { @@ -193,7 +193,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // session catalog and the table provider is not v2. 
case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) => val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName) - if (!DataSourceV2Utils.isV2Provider(provider, conf)) { + if (!isV2Provider(provider)) { throw QueryCompilationErrors.unsupportedTableOperationError( ident, "REPLACE TABLE") } else { @@ -202,7 +202,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) => val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName) - if (!DataSourceV2Utils.isV2Provider(provider, conf)) { + if (!isV2Provider(provider)) { throw QueryCompilationErrors.unsupportedTableOperationError( ident, "REPLACE TABLE AS SELECT") } else { @@ -611,6 +611,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) StructField(col.name.head, col.dataType, nullable = true, builder.build()) } + private def isV2Provider(provider: String): Boolean = { + DataSourceV2Utils.getTableProvider(provider, conf).isDefined + } + private object DatabaseInSessionCatalog { def unapply(resolved: ResolvedNamespace): Option[String] = resolved match { case ResolvedNamespace(catalog, _, _) if !isSessionCatalog(catalog) => None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala index 8f9859592fba..80d8fe787c24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala @@ -162,18 +162,11 @@ private[sql] object DataSourceV2Utils extends Logging { if (DDLUtils.isHiveTable(Some(provider))) return None DataSource.lookupDataSourceV2(provider, conf) match { // TODO(SPARK-28396): Currently file source v2 can't work with tables. - case Some(_: FileDataSourceV2) => None - case o => o + case Some(p) if !p.isInstanceOf[FileDataSourceV2] => Some(p) + case _ => None } } - /** - * Check if the provider is a v2 provider. 
- */ - def isV2Provider(provider: String, conf: SQLConf): Boolean = { - getTableProvider(provider, conf).isDefined - } - private lazy val objectMapper = new ObjectMapper() def getOptionsWithPaths( extraOptions: CaseInsensitiveMap[String], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index cc51f776caf7..ec7613dabc4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -88,7 +88,7 @@ class V2SessionCatalog(catalog: SessionCatalog) provider match { case p: SupportsCatalogOptions => throw new IllegalArgumentException( - f"provider $p should not implement SupportsCatalogOptions") + f"provider $p should not implement SupportsCatalogOptions.") case _ => // If the source accepts external table metadata, we can pass the schema and @@ -171,23 +171,32 @@ class V2SessionCatalog(catalog: SessionCatalog) errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.CATALOG_OPTIONS_UNSUPPORTED", messageParameters = Map("provider" -> provider)) + // If the provider does not support external metadata, users should not be allowed to + // specify custom schema when creating the data source table, since the schema will not + // be used when loading the table. case Some(p) if !p.supportsExternalMetadata() => - // Partitions cannot be specified when schema is empty. if (schema.nonEmpty) { throw new SparkUnsupportedOperationException( errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.EXTERNAL_METADATA_UNSUPPORTED", messageParameters = Map("provider" -> provider)) } + // V2CreateTablePlan does not allow non-empty partitions when schema is empty. This + // is checked in `PreProcessTableCreation` rule. + assert(partitions.isEmpty, + s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}") (schema, partitions) case Some(tableProvider) => assert(tableProvider.supportsExternalMetadata()) + lazy val dsOptions = new CaseInsensitiveStringMap(properties) if (schema.isEmpty) { + assert(partitions.isEmpty, + s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}") // Infer the schema and partitions and store them in the catalog. - val dsOptions = new CaseInsensitiveStringMap(properties) (tableProvider.inferSchema(dsOptions), tableProvider.inferPartitioning(dsOptions)) + } else if (partitions.isEmpty) { + (schema, tableProvider.inferPartitioning(dsOptions)) } else { - // TODO: when schema is defined but partitioning is empty, should we infer it? 
(schema, partitions) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index c2e759efe402..c69155370541 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.connector import java.sql.Timestamp import java.time.{Duration, LocalDate, Period} +import java.util import java.util.Locale import scala.concurrent.duration.MICROSECONDS @@ -36,7 +37,8 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, _} import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership -import org.apache.spark.sql.connector.expressions.LiteralValue +import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform} +import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder} import org.apache.spark.sql.errors.QueryErrorsBase import org.apache.spark.sql.execution.FilterExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -191,7 +193,7 @@ class DataSourceV2SQLSuiteV1Filter val testCatalog = catalog(SESSION_CATALOG_NAME).asTableCatalog val table = testCatalog.loadTable(Identifier.of(Array("default"), "table_name")) - assert(table.name == "default.table_name") + assert(table.name == "org.apache.spark.sql.connector.FakeV2Provider") assert(table.partitioning.isEmpty) assert(table.properties == withDefaultOwnership(Map("provider" -> v2Source)).asJava) assert(table.schema == new StructType().add("id", LongType).add("data", StringType)) @@ -666,7 +668,7 @@ class DataSourceV2SQLSuiteV1Filter val testCatalog = catalog(SESSION_CATALOG_NAME).asTableCatalog val table = testCatalog.loadTable(Identifier.of(Array("default"), "table_name")) - assert(table.name == "default.table_name") + assert(table.name == "org.apache.spark.sql.connector.FakeV2Provider") assert(table.partitioning.isEmpty) assert(table.properties == withDefaultOwnership(Map("provider" -> v2Source)).asJava) assert(table.schema == new StructType() @@ -1120,7 +1122,7 @@ class DataSourceV2SQLSuiteV1Filter }, errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", parameters = Map( - "tableName" -> "`default`.`tbl`", + "tableName" -> "`org`.`apache`.`spark`.`sql`.`connector`.`FakeV2Provider`", "tableColumns" -> "`id`, `data`", "dataColumns" -> "`col1`" ) @@ -1155,7 +1157,7 @@ class DataSourceV2SQLSuiteV1Filter }, errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", parameters = Map( - "tableName" -> "`default`.`tbl`", + "tableName" -> "`org`.`apache`.`spark`.`sql`.`connector`.`FakeV2Provider`", "tableColumns" -> "`id`, `data`", "dataColumns" -> "`col1`" ) @@ -1191,7 +1193,7 @@ class DataSourceV2SQLSuiteV1Filter }, errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", parameters = Map( - "tableName" -> "`default`.`tbl`", + "tableName" -> "`org`.`apache`.`spark`.`sql`.`connector`.`FakeV2Provider`", "tableColumns" -> "`id`, `data`, `data2`", "dataColumns" -> "`col1`, `col2`" ) @@ -3366,8 +3368,30 @@ class DataSourceV2SQLSuiteV2Filter extends DataSourceV2SQLSuite { /** Used as a V2 DataSource for V2SessionCatalog DDL */ class FakeV2Provider extends SimpleTableProvider { + + override def supportsExternalMetadata(): 
Boolean = true + + class MyScanBuilder extends SimpleScanBuilder { + override def planInputPartitions(): Array[InputPartition] = { + Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) + } + } + + override def getTable( + schema: StructType, + partitioning: Array[Transform], + properties: util.Map[String, String]): Table = { + getTable(new CaseInsensitiveStringMap(properties)) + } + override def getTable(options: CaseInsensitiveStringMap): Table = { - throw new UnsupportedOperationException("Unnecessary for DDL tests") + new SimpleBatchTable { + override def name(): String = options.get("provider") + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new MyScanBuilder() + } + } } } From 572fea00abe85a6dfcdb0e9ff22e1be77e97ff10 Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Wed, 29 Nov 2023 15:04:57 -0800 Subject: [PATCH 05/10] fix tests --- .../main/resources/error/error-classes.json | 5 ----- ...create-data-source-v2-table-error-class.md | 4 ---- .../datasources/v2/DataSourceV2Relation.scala | 13 ++++++++--- .../connector/SimpleTableProvider.scala | 2 -- .../datasources/v2/V2SessionCatalog.scala | 16 +++++--------- .../sql/connector/DataSourceV2SQLSuite.scala | 22 +++++-------------- .../spark/sql/connector/InsertIntoTests.scala | 12 ++++++++-- .../SupportsCatalogOptionsSuite.scala | 10 --------- .../connector/TestV2SessionCatalogBase.scala | 5 +++-- .../sql/connector/V1WriteFallbackSuite.scala | 2 ++ 10 files changed, 37 insertions(+), 54 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 753b9c9b1a56..f987a309c4a3 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -154,11 +154,6 @@ "Failed to create data source V2 table:" ], "subClass" : { - "CATALOG_OPTIONS_UNSUPPORTED" : { - "message" : [ - "provider '' implements 'SupportsCatalogOptions' which is not supported. Please use a compatible provider." - ] - }, "EXTERNAL_METADATA_UNSUPPORTED" : { "message" : [ "provider '' does not support external metadata but a schema is provided. Please remove the schema when creating the table." diff --git a/docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md b/docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md index 746c5c3f0716..b240efd717ef 100644 --- a/docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md +++ b/docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md @@ -25,10 +25,6 @@ Failed to create data source V2 table: This error class has the following derived error classes: -## CATALOG_OPTIONS_UNSUPPORTED - -provider '``' implements 'SupportsCatalogOptions' which is not supported. Please use a compatible provider. - ## EXTERNAL_METADATA_UNSUPPORTED provider '``' does not support external metadata but a schema is provided. Please remove the schema when creating the table. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 92638a152871..0d44211610bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes -import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils} +import org.apache.spark.sql.catalyst.util.{quoteIdentifier, truncatedString, CharVarcharUtils} import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability} import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics} import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream} @@ -61,7 +61,14 @@ case class DataSourceV2Relation( Nil } - override def name: String = table.name() + override def name: String = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + (catalog, identifier) match { + case (Some(cat), Some(ident)) => s"${quoteIdentifier(cat.name())}.${ident.quoted}" + case (None, Some(ident)) => ident.quoted + case _ => table.name() + } + } override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA) @@ -127,7 +134,7 @@ case class DataSourceV2ScanRelation( keyGroupedPartitioning: Option[Seq[Expression]] = None, ordering: Option[Seq[SortOrder]] = None) extends LeafNode with NamedRelation { - override def name: String = relation.table.name() + override def name: String = relation.name override def simpleString(maxFields: Int): String = { s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala index 24ac2c77a682..0e9072ddf510 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala @@ -31,7 +31,6 @@ trait SimpleTableProvider extends TableProvider { def getTable(options: CaseInsensitiveStringMap): Table private[this] var loadedTable: Table = _ - private def getOrLoadTable(options: CaseInsensitiveStringMap): Table = { if (loadedTable == null) loadedTable = getTable(options) loadedTable @@ -46,7 +45,6 @@ trait SimpleTableProvider extends TableProvider { schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table = { - assert(partitioning.isEmpty) getOrLoadTable(new CaseInsensitiveStringMap(properties)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index ec7613dabc4d..8019a5395690 
100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -81,14 +81,15 @@ class V2SessionCatalog(catalog: SessionCatalog) // Get the table properties during creation and append the path option // to the properties. val tableProperties = table.properties - val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_)) - val properties = tableProperties ++ pathOption + val properties = tableProperties ++ + table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_)) val dsOptions = new CaseInsensitiveStringMap(properties.asJava) provider match { - case p: SupportsCatalogOptions => - throw new IllegalArgumentException( - f"provider $p should not implement SupportsCatalogOptions.") + case _: SupportsCatalogOptions => + // Currently, loadTable cannot support V2 provider that implements the + // SupportsCatalogOptions. Keep the behavior as before. + V1Table(table) case _ => // If the source accepts external table metadata, we can pass the schema and @@ -166,11 +167,6 @@ class V2SessionCatalog(catalog: SessionCatalog) val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName) val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match { - case Some(_: SupportsCatalogOptions) => - throw new SparkUnsupportedOperationException( - errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.CATALOG_OPTIONS_UNSUPPORTED", - messageParameters = Map("provider" -> provider)) - // If the provider does not support external metadata, users should not be allowed to // specify custom schema when creating the data source table, since the schema will not // be used when loading the table. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index c69155370541..4d0d90fe6523 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.connector import java.sql.Timestamp import java.time.{Duration, LocalDate, Period} -import java.util import java.util.Locale import scala.concurrent.duration.MICROSECONDS @@ -37,7 +36,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, _} import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership -import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform} +import org.apache.spark.sql.connector.expressions.LiteralValue import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder} import org.apache.spark.sql.errors.QueryErrorsBase import org.apache.spark.sql.execution.FilterExec @@ -193,7 +192,7 @@ class DataSourceV2SQLSuiteV1Filter val testCatalog = catalog(SESSION_CATALOG_NAME).asTableCatalog val table = testCatalog.loadTable(Identifier.of(Array("default"), "table_name")) - assert(table.name == "org.apache.spark.sql.connector.FakeV2Provider") + assert(table.name == "default.table_name") assert(table.partitioning.isEmpty) assert(table.properties == withDefaultOwnership(Map("provider" -> v2Source)).asJava) assert(table.schema == new StructType().add("id", LongType).add("data", StringType)) @@ -668,7 +667,7 @@ class DataSourceV2SQLSuiteV1Filter val testCatalog = catalog(SESSION_CATALOG_NAME).asTableCatalog val table = testCatalog.loadTable(Identifier.of(Array("default"), "table_name")) - assert(table.name == "org.apache.spark.sql.connector.FakeV2Provider") + assert(table.name == "default.table_name") assert(table.partitioning.isEmpty) assert(table.properties == withDefaultOwnership(Map("provider" -> v2Source)).asJava) assert(table.schema == new StructType() @@ -1122,7 +1121,7 @@ class DataSourceV2SQLSuiteV1Filter }, errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", parameters = Map( - "tableName" -> "`org`.`apache`.`spark`.`sql`.`connector`.`FakeV2Provider`", + "tableName" -> "`spark_catalog`.`default`.`tbl`", "tableColumns" -> "`id`, `data`", "dataColumns" -> "`col1`" ) @@ -1157,7 +1156,7 @@ class DataSourceV2SQLSuiteV1Filter }, errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", parameters = Map( - "tableName" -> "`org`.`apache`.`spark`.`sql`.`connector`.`FakeV2Provider`", + "tableName" -> "`spark_catalog`.`default`.`tbl`", "tableColumns" -> "`id`, `data`", "dataColumns" -> "`col1`" ) @@ -1193,7 +1192,7 @@ class DataSourceV2SQLSuiteV1Filter }, errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", parameters = Map( - "tableName" -> "`org`.`apache`.`spark`.`sql`.`connector`.`FakeV2Provider`", + "tableName" -> "`spark_catalog`.`default`.`tbl`", "tableColumns" -> "`id`, `data`, `data2`", "dataColumns" -> "`col1`, `col2`" ) @@ -3377,17 +3376,8 @@ class FakeV2Provider extends SimpleTableProvider { } } - override def getTable( - schema: StructType, - partitioning: Array[Transform], - properties: util.Map[String, String]): Table = { - getTable(new CaseInsensitiveStringMap(properties)) - } - override def 
getTable(options: CaseInsensitiveStringMap): Table = { new SimpleBatchTable { - override def name(): String = options.get("provider") - override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new MyScanBuilder() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala index 63bb0148972c..fa30969d65c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala @@ -126,7 +126,11 @@ abstract class InsertIntoTests( val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data", "missing")) - val tableName = if (catalogAndNamespace.isEmpty) toSQLId(s"default.$t1") else toSQLId(t1) + val tableName = if (catalogAndNamespace.isEmpty) { + toSQLId(s"spark_catalog.default.$t1") + } else { + toSQLId(t1) + } checkError( exception = intercept[AnalysisException] { doInsert(t1, df) @@ -145,7 +149,11 @@ abstract class InsertIntoTests( sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") val df = Seq((1L, "a", "mango")).toDF("id", "data", "fruit") verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data")) - val tableName = if (catalogAndNamespace.isEmpty) toSQLId(s"default.$t1") else toSQLId(t1) + val tableName = if (catalogAndNamespace.isEmpty) { + toSQLId(s"spark_catalog.default.$t1") + } else { + toSQLId(t1) + } checkError( exception = intercept[AnalysisException] { doInsert(t1, df) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala index b07257a07496..f0b9890fdb64 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala @@ -163,16 +163,6 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with assert(load("t1", Some(catalogName)).count() === 0) } - test("SPARK-46043: create table in SQL with catalog options supported source") { - checkError( - exception = intercept[SparkUnsupportedOperationException] { - sql(s"CREATE TABLE test USING $format") - }, - errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.CATALOG_OPTIONS_UNSUPPORTED", - parameters = Map("provider" -> format) - ) - } - test("append and overwrite modes - session catalog") { sql(s"create table t1 (id bigint) using $format") val df = spark.range(10) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala index 1396ef82925a..719b006c1460 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala @@ -78,6 +78,7 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating schema: StructType, partitions: Array[Transform], properties: java.util.Map[String, String]): Table = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper val key = TestV2SessionCatalogBase.SIMULATE_ALLOW_EXTERNAL_PROPERTY val propsWithLocation = if (properties.containsKey(key)) { // Always set a location so that CREATE EXTERNAL TABLE 
won't fail with LOCATION not specified. @@ -92,8 +93,8 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating } else { properties } - val created = super.createTable(ident, schema, partitions, propsWithLocation) - val t = newTable(created.name(), schema, partitions, propsWithLocation) + super.createTable(ident, schema, partitions, propsWithLocation) + val t = newTable(ident.quoted, schema, partitions, propsWithLocation) addTable(ident, t) t } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala index ec62739b9cf2..a150298534b9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala @@ -253,6 +253,8 @@ class InMemoryV1Provider extends SimpleTableProvider with DataSourceRegister with CreatableRelationProvider { + override def supportsExternalMetadata(): Boolean = true + override def getTable(options: CaseInsensitiveStringMap): Table = { InMemoryV1Provider.tables.getOrElse(options.get("name"), { From fa5243b317a60d3d77d9236ffed604590885709d Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Wed, 29 Nov 2023 15:10:15 -0800 Subject: [PATCH 06/10] fix --- .../spark/sql/execution/datasources/v2/V2SessionCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 8019a5395690..9ecb07edd499 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -201,7 +201,7 @@ class V2SessionCatalog(catalog: SessionCatalog) } val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) = - partitions.toImmutableArraySeq.convertTransforms + newPartitions.toImmutableArraySeq.convertTransforms val tableProperties = properties.asScala val location = Option(properties.get(TableCatalog.PROP_LOCATION)) val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap)) From 8575ce037605b81a0302d27438a54f9c22de13e2 Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Thu, 30 Nov 2023 13:22:01 -0800 Subject: [PATCH 07/10] fix style --- .../datasources/v2/V2SessionCatalog.scala | 37 +++++++------------ .../SupportsCatalogOptionsSuite.scala | 4 +- 2 files changed, 16 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 9ecb07edd499..399929d34d26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableId import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog} import org.apache.spark.sql.catalyst.util.TypeUtils._ -import 
org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsCatalogOptions, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table} import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.expressions.Transform @@ -84,28 +84,19 @@ class V2SessionCatalog(catalog: SessionCatalog) val properties = tableProperties ++ table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_)) val dsOptions = new CaseInsensitiveStringMap(properties.asJava) - - provider match { - case _: SupportsCatalogOptions => - // Currently, loadTable cannot support V2 provider that implements the - // SupportsCatalogOptions. Keep the behavior as before. - V1Table(table) - - case _ => - // If the source accepts external table metadata, we can pass the schema and - // partitioning information stored in Hive to `getTable` to avoid expensive - // schema/partitioning inference. - if (provider.supportsExternalMetadata()) { - provider.getTable( - table.schema, - getV2Partitioning(table), - dsOptions.asCaseSensitiveMap()) - } else { - provider.getTable( - provider.inferSchema(dsOptions), - provider.inferPartitioning(dsOptions), - dsOptions.asCaseSensitiveMap()) - } + // If the source accepts external table metadata, we can pass the schema and + // partitioning information stored in Hive to `getTable` to avoid expensive + // schema/partitioning inference. 
+ if (provider.supportsExternalMetadata()) { + provider.getTable( + table.schema, + getV2Partitioning(table), + dsOptions.asCaseSensitiveMap()) + } else { + provider.getTable( + provider.inferSchema(dsOptions), + provider.inferPartitioning(dsOptions), + dsOptions.asCaseSensitiveMap()) } case _ => V1Table(table) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala index f0b9890fdb64..891147bd6f6a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala @@ -25,7 +25,7 @@ import scala.util.Try import org.scalatest.BeforeAndAfter -import org.apache.spark.{SparkException, SparkUnsupportedOperationException} +import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode} import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression} @@ -192,7 +192,7 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with } test("fail on user specified schema when reading - session catalog") { - sql(s"create table t1 (id bigint) using $format") + sql(s"create table t1 (id bigint) using $format options ('name'='t1')") val e = intercept[IllegalArgumentException] { spark.read.format(format).option("name", "t1").schema("id bigint").load() } From 23177fc4f5e7a9be4f7364c32fa28b8401e6d854 Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Thu, 30 Nov 2023 15:34:52 -0800 Subject: [PATCH 08/10] address comments --- .../main/resources/error/error-classes.json | 4 ++-- ...t-create-data-source-table-error-class.md} | 6 +++--- docs/sql-error-conditions.md | 6 +++--- .../datasources/v2/DataSourceV2Relation.scala | 13 ++++++++---- .../datasources/v2/V2SessionCatalog.scala | 7 ++++--- .../sql/connector/DataSourceV2Suite.scala | 21 +++++++++++++++++-- .../SupportsCatalogOptionsSuite.scala | 2 +- .../command/PlanResolutionSuite.scala | 2 +- 8 files changed, 42 insertions(+), 19 deletions(-) rename docs/{sql-error-conditions-cannot-create-data-source-v2-table-error-class.md => sql-error-conditions-cannot-create-data-source-table-error-class.md} (87%) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index f987a309c4a3..286985b7c76b 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -149,9 +149,9 @@ ], "sqlState" : "42846" }, - "CANNOT_CREATE_DATA_SOURCE_V2_TABLE" : { + "CANNOT_CREATE_DATA_SOURCE_TABLE" : { "message" : [ - "Failed to create data source V2 table:" + "Failed to create data source table :" ], "subClass" : { "EXTERNAL_METADATA_UNSUPPORTED" : { diff --git a/docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md b/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md similarity index 87% rename from docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md rename to docs/sql-error-conditions-cannot-create-data-source-table-error-class.md index b240efd717ef..f52e4f462bc9 100644 --- a/docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md +++ 
b/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md @@ -1,7 +1,7 @@ --- layout: global -title: CANNOT_CREATE_DATA_SOURCE_V2_TABLE error class -displayTitle: CANNOT_CREATE_DATA_SOURCE_V2_TABLE error class +title: CANNOT_CREATE_DATA_SOURCE_TABLE error class +displayTitle: CANNOT_CREATE_DATA_SOURCE_TABLE error class license: | Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with @@ -21,7 +21,7 @@ license: | [SQLSTATE: 42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) -Failed to create data source V2 table: +Failed to create data source table ``: This error class has the following derived error classes: diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 8697b3a37e39..5d9f41d15720 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -163,13 +163,13 @@ Cannot convert SQL `` to Protobuf `` because schema i Cannot convert SQL `` to Protobuf `` because `` is not in defined values for enum: ``. -### [CANNOT_CREATE_DATA_SOURCE_V2_TABLE](sql-error-conditions-cannot-create-data-source-v2-table-error-class.html) +### [CANNOT_CREATE_DATA_SOURCE_TABLE](sql-error-conditions-cannot-create-data-source-table-error-class.html) [SQLSTATE: 42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) -Failed to create data source V2 table: +Failed to create data source table ``: -For more details see [CANNOT_CREATE_DATA_SOURCE_V2_TABLE](sql-error-conditions-cannot-create-data-source-v2-table-error-class.html) +For more details see [CANNOT_CREATE_DATA_SOURCE_TABLE](sql-error-conditions-cannot-create-data-source-table-error-class.html) ### CANNOT_DECODE_URL diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 0d44211610bd..a84722785dff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes -import org.apache.spark.sql.catalyst.util.{quoteIdentifier, truncatedString, CharVarcharUtils} +import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, truncatedString, CharVarcharUtils} import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability} import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics} import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream} @@ -64,9 +64,14 @@ case class DataSourceV2Relation( override def name: String = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ (catalog, identifier) match { - case (Some(cat), Some(ident)) => s"${quoteIdentifier(cat.name())}.${ident.quoted}" - case (None, Some(ident)) => ident.quoted - case _ => table.name() + case 
(Some(cat), Some(ident)) => s"${quoteIfNeeded(cat.name())}.${ident.quoted}" + case (None, None) => table.name() + case _ => + throw new IllegalArgumentException( + "Invalid catalog and identifier pair. Both 'catalog' and 'identifier' must be " + + s"specified or leave as None. Current input - " + + s"catalog: '${catalog.map(_.name()).getOrElse(None)}', " + + s"identifier: ${identifier.map(_.quoted).getOrElse(None)}.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 399929d34d26..3d9972deb769 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -154,7 +154,7 @@ class V2SessionCatalog(catalog: SessionCatalog) schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table = { - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName) val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match { @@ -164,8 +164,8 @@ class V2SessionCatalog(catalog: SessionCatalog) case Some(p) if !p.supportsExternalMetadata() => if (schema.nonEmpty) { throw new SparkUnsupportedOperationException( - errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.EXTERNAL_METADATA_UNSUPPORTED", - messageParameters = Map("provider" -> provider)) + errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED", + messageParameters = Map("tableName" -> ident.quoted, "provider" -> provider)) } // V2CreateTablePlan does not allow non-empty partitions when schema is empty. This // is checked in `PreProcessTableCreation` rule. @@ -188,6 +188,7 @@ class V2SessionCatalog(catalog: SessionCatalog) } case _ => + // The provider is not a V2 provider so we return the schema and partitions as is. 
(schema, partitions) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 202e9132c7ad..648cada654ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -650,8 +650,8 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS exception = intercept[SparkUnsupportedOperationException] { sql(s"CREATE TABLE test(a INT, b INT) USING ${cls.getName}") }, - errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.EXTERNAL_METADATA_UNSUPPORTED", - parameters = Map("provider" -> cls.getName) + errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED", + parameters = Map("tableName" -> "default.test", "provider" -> cls.getName) ) } } @@ -685,6 +685,17 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS } } + test("SPARK-46043: create table in SQL with partitioning required data source") { + val cls = classOf[PartitionsRequiredDataSource] + val e = intercept[IllegalArgumentException]( + sql(s"CREATE TABLE test(a INT) USING ${cls.getName}")) + assert(e.getMessage.contains("user-supplied partitioning")) + withTable("test") { + sql(s"CREATE TABLE test(i INT, j INT) USING ${cls.getName} PARTITIONED BY (i)") + checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0, 0), Row(1, -1))) + } + } + test("SPARK-46043: create table in SQL with path option") { val cls = classOf[SupportsExternalMetadataDataSource] withTempDir { dir => @@ -1017,6 +1028,12 @@ class SchemaRequiredDataSource extends TableProvider { } } +class PartitionsRequiredDataSource extends SchemaRequiredDataSource { + override def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = { + throw new IllegalArgumentException("requires user-supplied partitioning") + } +} + class ColumnarDataSourceV2 extends TestingV2Source { class MyScanBuilder extends SimpleScanBuilder { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala index 891147bd6f6a..fd4f719417e4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala @@ -192,7 +192,7 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with } test("fail on user specified schema when reading - session catalog") { - sql(s"create table t1 (id bigint) using $format options ('name'='t1')") + sql(s"create table t1 (id bigint) using $format") val e = intercept[IllegalArgumentException] { spark.read.format(format).option("name", "t1").schema("id bigint").load() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index e39cc91d5f04..69b3285fc7f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -1275,7 +1275,7 @@ class PlanResolutionSuite extends AnalysisTest { }, errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", parameters = Map( - "tableName" -> "`tab2`", + "tableName" -> 
"`testcat`.`tab2`", "tableColumns" -> "`i`, `x`", "dataColumns" -> "`col1`") ) From 75e368b0b6fcfb1576ea4a3401e7f8522b032cfc Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Fri, 1 Dec 2023 22:27:11 -0800 Subject: [PATCH 09/10] address comments --- .../sql/catalyst/util/QuotingUtils.scala | 8 +++ .../catalog/CatalogV2Implicits.scala | 6 ++ .../connector/SimpleTableProvider.scala | 1 + .../datasources/v2/V2SessionCatalog.scala | 42 +++++++------ .../sql/connector/DataSourceV2SQLSuite.scala | 23 ------- .../sql/connector/DataSourceV2Suite.scala | 2 +- .../spark/sql/connector/FakeV2Provider.scala | 63 +++++++++++++++++++ .../sql/connector/V1WriteFallbackSuite.scala | 5 +- 8 files changed, 104 insertions(+), 46 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala index 43af533b85a4..97d110f7ffc5 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala @@ -61,6 +61,14 @@ object QuotingUtils { } } + def fullyQuoted(ident: Identifier): String = { + if (ident.namespace.nonEmpty) { + ident.namespace.map(quoteIdentifier).mkString(".") + "." + quoteIdentifier(ident.name) + } else { + quoteIdentifier(ident.name) + } + } + def escapeSingleQuotedString(str: String): String = { val builder = new StringBuilder diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala index 133011ad9fac..2b712241633b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala @@ -144,10 +144,16 @@ private[sql] object CatalogV2Implicits { } implicit class IdentifierHelper(ident: Identifier) { + /* Quote the identifier if needed. */ def quoted: String = { QuotingUtils.quoted(ident) } + /* Always quote the identifier. */ + def fullyQuoted: String = { + QuotingUtils.fullyQuoted(ident) + } + def original: String = ident.namespace() :+ ident.name() mkString "." 
def asMultipartIdentifier: Seq[String] = (ident.namespace :+ ident.name).toImmutableArraySeq diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala index 0e9072ddf510..f8b237195fa8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala @@ -45,6 +45,7 @@ trait SimpleTableProvider extends TableProvider { schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table = { + assert(partitioning.isEmpty) getOrLoadTable(new CaseInsensitiveStringMap(properties)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 3d9972deb769..3b34d6a8f8c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -26,7 +26,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException} -import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog} import org.apache.spark.sql.catalyst.util.TypeUtils._ import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table} import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty @@ -72,6 +72,15 @@ class V2SessionCatalog(catalog: SessionCatalog) } } + // Get data source options from the catalog table properties with the path option. + private def getDataSourceOptions( + properties: Map[String, String], + storage: CatalogStorageFormat): CaseInsensitiveStringMap = { + val propertiesWithPath = properties ++ + storage.locationUri.map("path" -> CatalogUtils.URIToString(_)) + new CaseInsensitiveStringMap(propertiesWithPath.asJava) + } + override def loadTable(ident: Identifier): Table = { try { val table = catalog.getTableMetadata(ident.asTableIdentifier) @@ -80,10 +89,7 @@ class V2SessionCatalog(catalog: SessionCatalog) case Some(provider) => // Get the table properties during creation and append the path option // to the properties. - val tableProperties = table.properties - val properties = tableProperties ++ - table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_)) - val dsOptions = new CaseInsensitiveStringMap(properties.asJava) + val dsOptions = getDataSourceOptions(table.properties, table.storage) // If the source accepts external table metadata, we can pass the schema and // partitioning information stored in Hive to `getTable` to avoid expensive // schema/partitioning inference. 
@@ -156,6 +162,16 @@ class V2SessionCatalog(catalog: SessionCatalog) properties: util.Map[String, String]): Table = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName) + val tableProperties = properties.asScala.toMap + val location = Option(properties.get(TableCatalog.PROP_LOCATION)) + val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties)) + .copy(locationUri = location.map(CatalogUtils.stringToURI)) + val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL) + val tableType = if (isExternal || location.isDefined) { + CatalogTableType.EXTERNAL + } else { + CatalogTableType.MANAGED + } val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match { // If the provider does not support external metadata, users should not be allowed to @@ -165,7 +181,7 @@ class V2SessionCatalog(catalog: SessionCatalog) if (schema.nonEmpty) { throw new SparkUnsupportedOperationException( errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED", - messageParameters = Map("tableName" -> ident.quoted, "provider" -> provider)) + messageParameters = Map("tableName" -> ident.fullyQuoted, "provider" -> provider)) } // V2CreateTablePlan does not allow non-empty partitions when schema is empty. This // is checked in `PreProcessTableCreation` rule. @@ -175,7 +191,7 @@ class V2SessionCatalog(catalog: SessionCatalog) case Some(tableProvider) => assert(tableProvider.supportsExternalMetadata()) - lazy val dsOptions = new CaseInsensitiveStringMap(properties) + lazy val dsOptions = getDataSourceOptions(tableProperties, storage) if (schema.isEmpty) { assert(partitions.isEmpty, s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}") @@ -194,16 +210,6 @@ class V2SessionCatalog(catalog: SessionCatalog) val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) = newPartitions.toImmutableArraySeq.convertTransforms - val tableProperties = properties.asScala - val location = Option(properties.get(TableCatalog.PROP_LOCATION)) - val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap)) - .copy(locationUri = location.map(CatalogUtils.stringToURI)) - val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL) - val tableType = if (isExternal || location.isDefined) { - CatalogTableType.EXTERNAL - } else { - CatalogTableType.MANAGED - } val tableDesc = CatalogTable( identifier = ident.asTableIdentifier, @@ -213,7 +219,7 @@ class V2SessionCatalog(catalog: SessionCatalog) provider = Some(provider), partitionColumnNames = partitionColumns, bucketSpec = maybeBucketSpec, - properties = tableProperties.toMap ++ + properties = tableProperties ++ maybeClusterBySpec.map( clusterBySpec => ClusterBySpec.toProperty(newSchema, clusterBySpec, conf.resolver)), tracksPartitionsInCatalog = conf.manageFilesourcePartitions, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 4d0d90fe6523..adc2039b8ce5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -37,7 +37,6 @@ import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, _} import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import 
org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership import org.apache.spark.sql.connector.expressions.LiteralValue -import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder} import org.apache.spark.sql.errors.QueryErrorsBase import org.apache.spark.sql.execution.FilterExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -46,10 +45,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION} -import org.apache.spark.sql.internal.connector.SimpleTableProvider import org.apache.spark.sql.sources.SimpleScanSource import org.apache.spark.sql.types.{LongType, StringType, StructType} -import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.unsafe.types.UTF8String abstract class DataSourceV2SQLSuite @@ -3365,26 +3362,6 @@ class DataSourceV2SQLSuiteV2Filter extends DataSourceV2SQLSuite { override protected val catalogAndNamespace = "testv2filter.ns1.ns2." } -/** Used as a V2 DataSource for V2SessionCatalog DDL */ -class FakeV2Provider extends SimpleTableProvider { - - override def supportsExternalMetadata(): Boolean = true - - class MyScanBuilder extends SimpleScanBuilder { - override def planInputPartitions(): Array[InputPartition] = { - Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) - } - } - - override def getTable(options: CaseInsensitiveStringMap): Table = { - new SimpleBatchTable { - override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { - new MyScanBuilder() - } - } - } -} - class ReserveSchemaNullabilityCatalog extends InMemoryCatalog { override def useNullableQuerySchema(): Boolean = false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 648cada654ba..f2e518e8acc4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -651,7 +651,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS sql(s"CREATE TABLE test(a INT, b INT) USING ${cls.getName}") }, errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED", - parameters = Map("tableName" -> "default.test", "provider" -> cls.getName) + parameters = Map("tableName" -> "`default`.`test`", "provider" -> cls.getName) ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala new file mode 100644 index 000000000000..174700e8d24f --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import java.util + +import org.apache.spark.sql.connector.catalog.{Table, TableProvider} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** Used as a V2 DataSource for V2SessionCatalog DDL */ +class FakeV2Provider extends TableProvider { + + // Supports external metadata so that users can specify schema and partitions + // when creating the table. + override def supportsExternalMetadata(): Boolean = true + + class MyScanBuilder extends SimpleScanBuilder { + override def planInputPartitions(): Array[InputPartition] = { + Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) + } + } + + override def inferSchema(options: CaseInsensitiveStringMap): StructType = { + FakeV2Provider.schema + } + + def getTable(options: CaseInsensitiveStringMap): Table = { + new SimpleBatchTable { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new MyScanBuilder() + } + } + } + + override def getTable( + schema: StructType, + partitioning: Array[Transform], + properties: util.Map[String, String]): Table = { + getTable(new CaseInsensitiveStringMap(properties)) + } +} + +object FakeV2Provider { + val schema: StructType = new StructType().add("i", "int").add("j", "int") +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala index a150298534b9..181dc0ea2074 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala @@ -36,7 +36,6 @@ import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoI import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.functions.lit import org.apache.spark.sql.internal.SQLConf.{OPTIMIZER_MAX_ITERATIONS, V2_SESSION_CATALOG_IMPLEMENTATION} -import org.apache.spark.sql.internal.connector.SimpleTableProvider import org.apache.spark.sql.sources._ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType @@ -250,11 +249,9 @@ private object InMemoryV1Provider { } class InMemoryV1Provider - extends SimpleTableProvider + extends FakeV2Provider with DataSourceRegister with CreatableRelationProvider { - override def supportsExternalMetadata(): Boolean = true - override def getTable(options: CaseInsensitiveStringMap): Table = { InMemoryV1Provider.tables.getOrElse(options.get("name"), { From 422d65efe7b541557085373396371ff2f0a9890c Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Mon, 4 Dec 2023 09:17:33 -0800 Subject: [PATCH 10/10] address comments --- .../sql/execution/datasources/v2/DataSourceV2Relation.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index a84722785dff..573b0274e958 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.v2 +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics} @@ -67,7 +68,7 @@ case class DataSourceV2Relation( case (Some(cat), Some(ident)) => s"${quoteIfNeeded(cat.name())}.${ident.quoted}" case (None, None) => table.name() case _ => - throw new IllegalArgumentException( + throw SparkException.internalError( "Invalid catalog and identifier pair. Both 'catalog' and 'identifier' must be " + s"specified or leave as None. Current input - " + s"catalog: '${catalog.map(_.name()).getOrElse(None)}', " +
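End-of-series illustration of the user-visible behavior the updated suites assert: a V2-provider table created through the session catalog now reports its qualified identifier from Table.name() instead of the provider class name. This is a sketch only, not part of the patches; it reuses the sql(...) and catalog(...) helpers that appear in the DataSourceV2SQLSuite hunks above, with FakeV2Provider being the test provider moved into its own file in patch 09.

// Sketch: mirrors the assertions updated in DataSourceV2SQLSuite above.
sql(s"CREATE TABLE table_name (id bigint, data string) USING ${classOf[FakeV2Provider].getName}")
val testCatalog = catalog(SESSION_CATALOG_NAME).asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array("default"), "table_name"))
assert(table.name == "default.table_name")  // was the FakeV2Provider class name before this series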