13 changes: 13 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -149,6 +149,19 @@
],
"sqlState" : "42846"
},
"CANNOT_CREATE_DATA_SOURCE_V2_TABLE" : {
Contributor: I can't find other errors that mention data source v2. I think it's a developer thing and we should not expose it to end users via error message. How about just CANNOT_CREATE_TABLE?

"message" : [
"Failed to create data source V2 table:"
Contributor: shall we include the table name?

],
"subClass" : {
"EXTERNAL_METADATA_UNSUPPORTED" : {
"message" : [
"provider '<provider>' does not support external metadata but a schema is provided. Please remove the schema when creating the table."
]
}
},
"sqlState" : "42KDE"
},
"CANNOT_DECODE_URL" : {
"message" : [
"The provided URL cannot be decoded: <url>. Please ensure that the URL is properly formatted and try again."
32 changes: 32 additions & 0 deletions docs/sql-error-conditions-cannot-create-data-source-v2-table-error-class.md
@@ -0,0 +1,32 @@
---
layout: global
title: CANNOT_CREATE_DATA_SOURCE_V2_TABLE error class
displayTitle: CANNOT_CREATE_DATA_SOURCE_V2_TABLE error class
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

[SQLSTATE: 42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Failed to create data source V2 table:

This error class has the following derived error classes:

## EXTERNAL_METADATA_UNSUPPORTED

provider '`<provider>`' does not support external metadata but a schema is provided. Please remove the schema when creating the table.
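
For illustration, a minimal sketch of how this error condition could surface, assuming the built-in `rate` source (which keeps `TableProvider`'s default `supportsExternalMetadata() == false`); the table name and column list are arbitrary:

```scala
// Sketch only: a provider without external-metadata support rejects a user-supplied schema.
spark.sql("CREATE TABLE rates (timestamp TIMESTAMP, value BIGINT) USING rate")
// => roughly: [CANNOT_CREATE_DATA_SOURCE_V2_TABLE.EXTERNAL_METADATA_UNSUPPORTED] ... provider 'rate' ...

// Omitting the schema lets the provider supply its own metadata, so this succeeds:
spark.sql("CREATE TABLE rates USING rate")
```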


8 changes: 8 additions & 0 deletions docs/sql-error-conditions.md
@@ -163,6 +163,14 @@ Cannot convert SQL `<sqlColumn>` to Protobuf `<protobufColumn>` because schema i

Cannot convert SQL `<sqlColumn>` to Protobuf `<protobufColumn>` because `<data>` is not in defined values for enum: `<enumString>`.

### [CANNOT_CREATE_DATA_SOURCE_V2_TABLE](sql-error-conditions-cannot-create-data-source-v2-table-error-class.html)

[SQLSTATE: 42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Failed to create data source V2 table:

For more details see [CANNOT_CREATE_DATA_SOURCE_V2_TABLE](sql-error-conditions-cannot-create-data-source-v2-table-error-class.html)

### CANNOT_DECODE_URL

[SQLSTATE: 22546](sql-error-conditions-sqlstates.html#class-22-data-exception)
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
import org.apache.spark.sql.catalyst.util.{quoteIdentifier, truncatedString, CharVarcharUtils}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
@@ -61,7 +61,14 @@ case class DataSourceV2Relation(
Nil
}

override def name: String = table.name()
override def name: String = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
(catalog, identifier) match {
case (Some(cat), Some(ident)) => s"${quoteIdentifier(cat.name())}.${ident.quoted}"
case (None, Some(ident)) => ident.quoted
Contributor: I don't think this can happen. We can add an assert.

case _ => table.name()
}
}

override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA)

@@ -127,7 +134,7 @@ case class DataSourceV2ScanRelation(
keyGroupedPartitioning: Option[Seq[Expression]] = None,
ordering: Option[Seq[SortOrder]] = None) extends LeafNode with NamedRelation {

override def name: String = relation.table.name()
override def name: String = relation.name

override def simpleString(maxFields: Int): String = {
s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
@@ -45,7 +45,6 @@ trait SimpleTableProvider extends TableProvider {
schema: StructType,
partitioning: Array[Transform],
properties: util.Map[String, String]): Table = {
assert(partitioning.isEmpty)
getOrLoadTable(new CaseInsensitiveStringMap(properties))
}
}
@@ -31,8 +31,8 @@ import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Lo
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1, DataSource}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.connector.V1Function
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
@@ -612,15 +612,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}

private def isV2Provider(provider: String): Boolean = {
// Return earlier since `lookupDataSourceV2` may fail to resolve provider "hive" to
// `HiveFileFormat`, when running tests in sql/core.
if (DDLUtils.isHiveTable(Some(provider))) return false
DataSource.lookupDataSourceV2(provider, conf) match {
// TODO(SPARK-28396): Currently file source v2 can't work with tables.
case Some(_: FileDataSourceV2) => false
case Some(_) => true
case _ => false
}
DataSourceV2Utils.getTableProvider(provider, conf).isDefined
}

private object DatabaseInSessionCatalog {
@@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SessionConfigSupport, SupportsCatalogOptions, SupportsRead, Table, TableProvider}
import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -151,6 +153,20 @@
}
}

/**
* Returns the table provider for the given format, or None if it cannot be found.
*/
def getTableProvider(provider: String, conf: SQLConf): Option[TableProvider] = {
// Return earlier since `lookupDataSourceV2` may fail to resolve provider "hive" to
// `HiveFileFormat`, when running tests in sql/core.
if (DDLUtils.isHiveTable(Some(provider))) return None
DataSource.lookupDataSourceV2(provider, conf) match {
// TODO(SPARK-28396): Currently file source v2 can't work with tables.
case Some(p) if !p.isInstanceOf[FileDataSourceV2] => Some(p)
case _ => None
}
}

private lazy val objectMapper = new ObjectMapper()
def getOptionsWithPaths(
extraOptions: CaseInsensitiveMap[String],
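
A rough usage sketch of the new helper, mirroring what `ResolveSessionCatalog.isV2Provider` now does; the concrete provider names and the default-configuration behavior are assumptions for illustration:

```scala
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.internal.SQLConf

val conf = SQLConf.get
// "hive" is special-cased and file sources are still excluded (SPARK-28396),
// so neither is treated as a V2 table provider here (under default settings).
assert(DataSourceV2Utils.getTableProvider("hive", conf).isEmpty)
assert(DataSourceV2Utils.getTableProvider("parquet", conf).isEmpty)
// A non-file V2 source, e.g. the built-in "rate" source, should resolve to a TableProvider:
assert(DataSourceV2Utils.getTableProvider("rate", conf).isDefined)
```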
@@ -23,11 +23,12 @@ import java.util
import scala.collection.mutable
import scala.jdk.CollectionConverters._

import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog}
import org.apache.spark.sql.catalyst.util.TypeUtils._
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsCatalogOptions, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Transform
@@ -73,7 +74,45 @@ class V2SessionCatalog(catalog: SessionCatalog)

override def loadTable(ident: Identifier): Table = {
try {
V1Table(catalog.getTableMetadata(ident.asTableIdentifier))
val table = catalog.getTableMetadata(ident.asTableIdentifier)
if (table.provider.isDefined) {
DataSourceV2Utils.getTableProvider(table.provider.get, conf) match {
case Some(provider) =>
// Get the table properties during creation and append the path option
// to the properties.
val tableProperties = table.properties
val properties = tableProperties ++
table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
val dsOptions = new CaseInsensitiveStringMap(properties.asJava)

provider match {
case _: SupportsCatalogOptions =>
// Currently, loadTable cannot support V2 provider that implements the
// SupportsCatalogOptions. Keep the behavior as before.
V1Table(table)

case _ =>
// If the source accepts external table metadata, we can pass the schema and
// partitioning information stored in Hive to `getTable` to avoid expensive
// schema/partitioning inference.
if (provider.supportsExternalMetadata()) {
provider.getTable(
table.schema,
getV2Partitioning(table),
dsOptions.asCaseSensitiveMap())
} else {
provider.getTable(
provider.inferSchema(dsOptions),
provider.inferPartitioning(dsOptions),
dsOptions.asCaseSensitiveMap())
}
}
case _ =>
V1Table(table)
}
} else {
V1Table(table)
}
} catch {
case _: NoSuchDatabaseException =>
throw QueryCompilationErrors.noSuchTableError(ident)
@@ -96,6 +135,16 @@
throw QueryCompilationErrors.timeTravelUnsupportedError(toSQLId(nameParts))
}

private def getV2Partitioning(table: CatalogTable): Array[Transform] = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val v2Partitioning = table.partitionColumnNames.asTransforms
val v2Bucketing = table.bucketSpec.map(
spec => Array(spec.asTransform)).getOrElse(Array.empty)
val v2Clustering = table.clusterBySpec.map(
spec => Array(spec.asTransform)).getOrElse(Array.empty)
v2Partitioning ++ v2Bucketing ++ v2Clustering
}

override def invalidateTable(ident: Identifier): Unit = {
catalog.refreshTable(ident.asTableIdentifier)
}
@@ -115,9 +164,44 @@
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
partitions.toImmutableArraySeq.convertTransforms
val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)

val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match {
// If the provider does not support external metadata, users should not be allowed to
// specify custom schema when creating the data source table, since the schema will not
// be used when loading the table.
case Some(p) if !p.supportsExternalMetadata() =>
if (schema.nonEmpty) {
throw new SparkUnsupportedOperationException(
errorClass = "CANNOT_CREATE_DATA_SOURCE_V2_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
messageParameters = Map("provider" -> provider))
}
// V2CreateTablePlan does not allow non-empty partitions when schema is empty. This
// is checked in `PreProcessTableCreation` rule.
assert(partitions.isEmpty,
s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}")
(schema, partitions)

case Some(tableProvider) =>
assert(tableProvider.supportsExternalMetadata())
lazy val dsOptions = new CaseInsensitiveStringMap(properties)
Contributor: do we need to put the path option?

Contributor: I think we can add a new method to create ds options from a CatalogTable, to save duplicated code.

if (schema.isEmpty) {
assert(partitions.isEmpty,
s"Partitions should be empty when the schema is empty: ${partitions.mkString(", ")}")
// Infer the schema and partitions and store them in the catalog.
(tableProvider.inferSchema(dsOptions), tableProvider.inferPartitioning(dsOptions))
} else if (partitions.isEmpty) {
(schema, tableProvider.inferPartitioning(dsOptions))
} else {
(schema, partitions)
}

case _ =>
(schema, partitions)
Contributor: shall we fail here if it's not a valid data source?

Contributor: maybe we can do it later. It's the current behavior that allows any table provider.

}

val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
newPartitions.toImmutableArraySeq.convertTransforms
val tableProperties = properties.asScala
val location = Option(properties.get(TableCatalog.PROP_LOCATION))
val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap))
Expand All @@ -133,13 +217,13 @@ class V2SessionCatalog(catalog: SessionCatalog)
identifier = ident.asTableIdentifier,
tableType = tableType,
storage = storage,
schema = schema,
schema = newSchema,
provider = Some(provider),
partitionColumnNames = partitionColumns,
bucketSpec = maybeBucketSpec,
properties = tableProperties.toMap ++
maybeClusterBySpec.map(
clusterBySpec => ClusterBySpec.toProperty(schema, clusterBySpec, conf.resolver)),
clusterBySpec => ClusterBySpec.toProperty(newSchema, clusterBySpec, conf.resolver)),
tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
comment = Option(properties.get(TableCatalog.PROP_COMMENT)))

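
Related to the review comments above about the path option and the duplicated code in `loadTable`/`createTable`: a rough sketch of the kind of helper the reviewer suggests, building the data source options from a `CatalogTable`. The method name and placement are hypothetical, not part of this change:

```scala
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical helper: collects the table properties and appends the table location
// as the "path" option, so loadTable and createTable can share the same logic.
private def toDsOptions(table: CatalogTable): CaseInsensitiveStringMap = {
  val properties = table.properties ++
    table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
  new CaseInsensitiveStringMap(properties.asJava)
}
```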
@@ -44,7 +44,9 @@ public StructType readSchema() {

@Override
public InputPartition[] planInputPartitions() {
return new InputPartition[0];
InputPartition[] partitions = new InputPartition[1];
partitions[0] = new JavaRangeInputPartition(0, 2);
return partitions;
}
}

@@ -37,6 +37,7 @@ import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, _}
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder}
import org.apache.spark.sql.errors.QueryErrorsBase
import org.apache.spark.sql.execution.FilterExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -1120,7 +1121,7 @@ class DataSourceV2SQLSuiteV1Filter
},
errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
parameters = Map(
"tableName" -> "`default`.`tbl`",
"tableName" -> "`spark_catalog`.`default`.`tbl`",
"tableColumns" -> "`id`, `data`",
"dataColumns" -> "`col1`"
)
@@ -1155,7 +1156,7 @@
},
errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
parameters = Map(
"tableName" -> "`default`.`tbl`",
"tableName" -> "`spark_catalog`.`default`.`tbl`",
"tableColumns" -> "`id`, `data`",
"dataColumns" -> "`col1`"
)
@@ -1191,7 +1192,7 @@
},
errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
parameters = Map(
"tableName" -> "`default`.`tbl`",
"tableName" -> "`spark_catalog`.`default`.`tbl`",
"tableColumns" -> "`id`, `data`, `data2`",
"dataColumns" -> "`col1`, `col2`"
)
@@ -3366,8 +3367,21 @@ class DataSourceV2SQLSuiteV2Filter extends DataSourceV2SQLSuite {

/** Used as a V2 DataSource for V2SessionCatalog DDL */
class FakeV2Provider extends SimpleTableProvider {
Contributor: Can we avoid extending SimpleTableProvider here? I think it's not meant to support external metadata.

override def supportsExternalMetadata(): Boolean = true

class MyScanBuilder extends SimpleScanBuilder {
override def planInputPartitions(): Array[InputPartition] = {
Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
}
}

override def getTable(options: CaseInsensitiveStringMap): Table = {
throw new UnsupportedOperationException("Unnecessary for DDL tests")
new SimpleBatchTable {
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new MyScanBuilder()
}
}
}
}
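
For context on why the fake provider now returns a real table: with `loadTable` going through the V2 provider, DDL-created tables backed by `FakeV2Provider` become scannable. A rough sketch, assuming the suite's shared `SimpleBatchTable`/`SimpleScanBuilder` helpers emit `(i, -i)` rows for each `RangeInputPartition` and that the provider is referenced by its fully qualified class name:

```scala
// Sketch only; column names and row values depend on the shared test helpers.
sql("CREATE TABLE t (i INT, j INT) USING org.apache.spark.sql.connector.FakeV2Provider")
checkAnswer(sql("SELECT * FROM t"), (0 until 10).map(i => Row(i, -i)))
```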
