@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command._
@@ -68,7 +68,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
throw QueryCompilationErrors.unsupportedTableOperationError(ident, "REPLACE COLUMNS")

case a @ AlterColumn(ResolvedTable(catalog, ident, table: V1Table, _), _, _, _, _, _, _)
-        if isSessionCatalog(catalog) =>
+        if supportsV1Command(catalog) =>
if (a.column.name.length > 1) {
throw QueryCompilationErrors.unsupportedTableOperationError(
catalog, ident, "ALTER COLUMN with qualified column")
@@ -102,7 +102,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn)

case AlterTableClusterBy(ResolvedTable(catalog, _, table: V1Table, _), clusterBySpecOpt)
-        if isSessionCatalog(catalog) =>
+        if supportsV1Command(catalog) =>
val prop = Map(ClusterBySpec.toProperty(table.schema,
clusterBySpecOpt.getOrElse(ClusterBySpec(Nil)), conf.resolver))
AlterTableSetPropertiesCommand(table.catalogTable.identifier, prop, isView = false)
@@ -125,13 +125,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case UnsetViewProperties(ResolvedViewIdentifier(ident), keys, ifExists) =>
AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = true)

-    case DescribeNamespace(DatabaseInSessionCatalog(db), extended, output) if conf.useV1Command =>
+    case DescribeNamespace(ResolvedV1Database(db), extended, output) if conf.useV1Command =>
DescribeDatabaseCommand(db, extended, output)

-    case SetNamespaceProperties(DatabaseInSessionCatalog(db), properties) if conf.useV1Command =>
+    case SetNamespaceProperties(ResolvedV1Database(db), properties) if conf.useV1Command =>
AlterDatabasePropertiesCommand(db, properties)

-    case SetNamespaceLocation(DatabaseInSessionCatalog(db), location) if conf.useV1Command =>
+    case SetNamespaceLocation(ResolvedV1Database(db), location) if conf.useV1Command =>
AlterDatabaseSetLocationCommand(db, location)

case RenameTable(ResolvedV1TableOrViewIdentifier(oldIdent), newName, isView) =>
@@ -221,7 +221,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case DropTable(ResolvedIdentifier(FakeSystemCatalog, ident), _, _) =>
DropTempViewCommand(ident)

-    case DropView(ResolvedV1Identifier(ident), ifExists) =>
+    case DropView(ResolvedIdentifierInSessionCatalog(ident), ifExists) =>
DropTableCommand(ident, ifExists, isView = true, purge = false)

case DropView(r @ ResolvedIdentifier(catalog, ident), _) =>
@@ -237,14 +237,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
CreateDatabaseCommand(name, c.ifNotExists, location, comment, newProperties)

-    case d @ DropNamespace(DatabaseInSessionCatalog(db), _, _) if conf.useV1Command =>
+    case d @ DropNamespace(ResolvedV1Database(db), _, _) if conf.useV1Command =>
DropDatabaseCommand(db, d.ifExists, d.cascade)

-    case ShowTables(DatabaseInSessionCatalog(db), pattern, output) if conf.useV1Command =>
+    case ShowTables(ResolvedV1Database(db), pattern, output) if conf.useV1Command =>
ShowTablesCommand(Some(db), pattern, output)

case ShowTablesExtended(
-        DatabaseInSessionCatalog(db),
+        ResolvedV1Database(db),
pattern,
output) =>
val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) {
@@ -257,7 +257,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case ShowTablePartition(
ResolvedTable(catalog, _, table: V1Table, _),
partitionSpec,
-        output) if isSessionCatalog(catalog) =>
+        output) if supportsV1Command(catalog) =>
val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) {
output.head.withName("database") +: output.tail
} else {
@@ -277,7 +277,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
AnalyzePartitionCommand(ident, partitionSpec, noScan)
}

-    case AnalyzeTables(DatabaseInSessionCatalog(db), noScan) =>
+    case AnalyzeTables(ResolvedV1Database(db), noScan) =>
AnalyzeTablesCommand(Some(db), noScan)

case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
@@ -305,7 +305,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
if conf.useV1Command => ShowCreateTableCommand(ident, output)

case ShowCreateTable(ResolvedTable(catalog, _, table: V1Table, _), _, output)
-        if isSessionCatalog(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
+        if supportsV1Command(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
ShowCreateTableCommand(table.catalogTable.identifier, output)

case TruncateTable(ResolvedV1TableIdentifier(ident)) =>
@@ -382,7 +382,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case AlterViewSchemaBinding(ResolvedViewIdentifier(ident), viewSchemaMode) =>
AlterViewSchemaBindingCommand(ident, viewSchemaMode)

-    case CreateView(ResolvedV1Identifier(ident), userSpecifiedColumns, comment,
+    case CreateView(ResolvedIdentifierInSessionCatalog(ident), userSpecifiedColumns, comment,
properties, originalText, child, allowExisting, replace, viewSchemaMode) =>
CreateViewCommand(
name = ident,
@@ -401,7 +401,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

case ShowViews(ns: ResolvedNamespace, pattern, output) =>
ns match {
-        case DatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output)
+        case ResolvedDatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output)
case _ =>
throw QueryCompilationErrors.missingCatalogAbilityError(ns.catalog, "views")
}
@@ -424,7 +424,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "functions")
}

-    case ShowFunctions(DatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) =>
+    case ShowFunctions(
+        ResolvedDatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) =>
ShowFunctionsCommand(db, pattern, userScope, systemScope, output)

case DropFunction(ResolvedPersistentFunc(catalog, identifier, _), ifExists) =>
@@ -445,7 +446,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION")
}

-    case CreateFunction(ResolvedV1Identifier(ident), className, resources, ifExists, replace) =>
+    case CreateFunction(
+        ResolvedIdentifierInSessionCatalog(ident), className, resources, ifExists, replace) =>
CreateFunctionCommand(
FunctionIdentifier(ident.table, ident.database, ident.catalog),
className,
@@ -583,7 +585,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

object ResolvedV1TableIdentifier {
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
-      case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) =>
+      case ResolvedTable(catalog, _, t: V1Table, _) if supportsV1Command(catalog) =>
Some(t.catalogTable.identifier)
case _ => None
}
@@ -598,6 +600,19 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}

object ResolvedV1Identifier {
+    def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
+      case ResolvedIdentifier(catalog, ident) if supportsV1Command(catalog) =>
+        if (ident.namespace().length != 1) {
+          throw QueryCompilationErrors
+            .requiresSinglePartNamespaceError(ident.namespace().toImmutableArraySeq)
+        }
+        Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name)))
+      case _ => None
+    }
+  }
+
+  // Use this object to help match commands that do not have a v2 implementation.
+  object ResolvedIdentifierInSessionCatalog {
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
case ResolvedIdentifier(catalog, ident) if isSessionCatalog(catalog) =>
if (ident.namespace().length != 1) {
@@ -622,7 +637,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
DataSourceV2Utils.getTableProvider(provider, conf).isDefined
}

-  private object DatabaseInSessionCatalog {
+  private object ResolvedV1Database {
+    def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
+      case ResolvedNamespace(catalog, _, _) if !supportsV1Command(catalog) => None
+      case ResolvedNamespace(_, Seq(), _) =>
+        throw QueryCompilationErrors.databaseFromV1SessionCatalogNotSpecifiedError()
+      case ResolvedNamespace(_, Seq(dbName), _) => Some(dbName)
+      case _ =>
+        assert(resolved.namespace.length > 1)
+        throw QueryCompilationErrors.nestedDatabaseUnsupportedByV1SessionCatalogError(
+          resolved.namespace.map(quoteIfNeeded).mkString("."))
+    }
+  }
+
+  // Use this object to help match commands that do not have a v2 implementation.
+  private object ResolvedDatabaseInSessionCatalog {
def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
case ResolvedNamespace(catalog, _, _) if !isSessionCatalog(catalog) => None
case ResolvedNamespace(_, Seq(), _) =>
@@ -637,11 +666,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

private object DatabaseNameInSessionCatalog {
def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
-      case ResolvedNamespace(catalog, _, _) if !isSessionCatalog(catalog) => None
+      case ResolvedNamespace(catalog, _, _) if !supportsV1Command(catalog) => None
case ResolvedNamespace(_, Seq(dbName), _) => Some(dbName)
case _ =>
assert(resolved.namespace.length > 1)
throw QueryCompilationErrors.requiresSinglePartNamespaceError(resolved.namespace)
}
}

+  private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
+    catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) &&
+      !SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined
+  }
}
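Note: the heart of this change is the new supportsV1Command guard above, which replaces isSessionCatalog at the match sites in this file. A minimal sketch of the difference, assuming a hypothetical custom catalog class (the class name below is illustrative, not from this PR):

// isSessionCatalog(catalog) is true whenever the catalog is named "spark_catalog",
// even when spark.sql.catalog.spark_catalog points at a custom V2 implementation.
// supportsV1Command(catalog) additionally requires that no custom implementation
// is configured, so an overridden session catalog now stays on the v2 command path.
spark.conf.set("spark.sql.catalog.spark_catalog",
  "org.example.CustomV2SessionCatalog") // hypothetical implementation class
// With this setting, e.g. DESCRIBE NAMESPACE or ALTER COLUMN on a V1 table is no
// longer rewritten to its v1 command; the v2 plan is kept and sent to the catalog.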
@@ -53,6 +53,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper
import DataSourceV2Implicits._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

+  private def hadoopConf = session.sessionState.newHadoopConf()

private def refreshCache(r: DataSourceV2Relation)(): Unit = {
session.sharedState.cacheManager.recacheByPlan(session, r)
}
@@ -87,7 +89,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper
}

private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
-    tableSpec.withNewLocation(tableSpec.location.map(makeQualifiedDBObjectPath(_)))
+    tableSpec.withNewLocation(tableSpec.location.map(loc => CatalogUtils.makeQualifiedPath(
+      CatalogUtils.stringToURI(loc), hadoopConf).toString))
Contributor (review comment): we should follow the v1 command code path and call CatalogUtils.URIToString to get the path string.

Contributor (reply): fixing at #47759
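Note: a minimal sketch of the two path renderings the thread refers to (CatalogUtils here is org.apache.spark.sql.catalyst.catalog.CatalogUtils; the /tmp/foo value mirrors the tests below):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.catalog.CatalogUtils

val hadoopConf = new Configuration() // assumes a local default filesystem
val uri = CatalogUtils.makeQualifiedPath(CatalogUtils.stringToURI("/tmp/foo"), hadoopConf)
uri.toString                  // "file:///tmp/foo" -- what this patch stores
CatalogUtils.URIToString(uri) // "file:/tmp/foo"  -- what the v1 code path renders

This is why the expected locations in the test suites below change from file:/... to file:///....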

}

override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.internal.{SqlApiConf, SQLConf}
+import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructField, StructType}

class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
@@ -157,6 +158,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
}

test("disable bucketing on collated string column") {
+    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
def createTable(bucketColumns: String*): Unit = {
val tableName = "test_partition_tbl"
withTable(tableName) {
@@ -758,6 +760,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
}

test("disable partition on collated string column") {
+    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
def createTable(partitionColumns: String*): Unit = {
val tableName = "test_partition_tbl"
withTable(tableName) {
@@ -84,7 +84,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
spark.range(20).write.format(v2Format).option("path", "/abc").saveAsTable(t1)
val cat = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
val tableInfo = cat.loadTable(Identifier.of(Array("default"), t1))
assert(tableInfo.properties().get("location") === "file:/abc")
assert(tableInfo.properties().get("location") === "file:///abc")
assert(tableInfo.properties().get("provider") === v2Format)
}
}
@@ -441,7 +441,7 @@ class DataSourceV2SQLSuiteV1Filter
val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
.filter("col_name = 'Location'")
.select("data_type").head().getString(0)
assert(location === "file:/tmp/foo")
assert(location === "file:///tmp/foo")
}
}
}
Expand All @@ -458,7 +458,7 @@ class DataSourceV2SQLSuiteV1Filter
val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
.filter("col_name = 'Location'")
.select("data_type").head().getString(0)
assert(location === "file:/tmp/foo")
assert(location === "file:///tmp/foo")
}
}
}
@@ -2104,15 +2104,10 @@ class DataSourceV2SQLSuiteV1Filter
}

test("REPLACE TABLE: v1 table") {
-    val e = intercept[AnalysisException] {
-      sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
-    }
-    checkError(
-      exception = e,
-      errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
-      sqlState = "0A000",
-      parameters = Map("tableName" -> "`spark_catalog`.`default`.`tbl`",
-        "operation" -> "REPLACE TABLE"))
+    sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
+    val v2Catalog = catalog("spark_catalog").asTableCatalog
+    val table = v2Catalog.loadTable(Identifier.of(Array("default"), "tbl"))
+    assert(table.properties().get(TableCatalog.PROP_PROVIDER) == classOf[SimpleScanSource].getName)
}

test("DeleteFrom: - delete with invalid predicate") {
@@ -71,18 +71,22 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends DelegatingCatalogExtension
properties: java.util.Map[String, String]): Table = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
val key = TestV2SessionCatalogBase.SIMULATE_ALLOW_EXTERNAL_PROPERTY
-    val propsWithLocation = if (properties.containsKey(key)) {
+    val newProps = new java.util.HashMap[String, String]()
+    newProps.putAll(properties)
+    if (properties.containsKey(TableCatalog.PROP_LOCATION)) {
+      newProps.put(TableCatalog.PROP_EXTERNAL, "true")
+    }
+
+    val propsWithLocation = if (newProps.containsKey(key)) {
// Always set a location so that CREATE EXTERNAL TABLE won't fail with LOCATION not specified.
-      if (!properties.containsKey(TableCatalog.PROP_LOCATION)) {
-        val newProps = new java.util.HashMap[String, String]()
-        newProps.putAll(properties)
+      if (!newProps.containsKey(TableCatalog.PROP_LOCATION)) {
newProps.put(TableCatalog.PROP_LOCATION, "file:/abc")
newProps
} else {
-        properties
+        newProps
}
} else {
-      properties
+      newProps
}
super.createTable(ident, columns, partitions, propsWithLocation)
val schema = CatalogV2Util.v2ColumnsToStructType(columns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class ShowCreateTableSuite extends command.ShowCreateTableSuiteBase with Command
"'via' = '2')",
"PARTITIONED BY (a)",
"COMMENT 'This is a comment'",
"LOCATION 'file:/tmp'",
"LOCATION 'file:///tmp'",
"TBLPROPERTIES (",
"'password' = '*********(redacted)',",
"'prop1' = '1',",
@@ -824,7 +824,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAfterEach
assert(table.properties().get("comment").equals(description))
assert(table.properties().get("path").equals(dir.getAbsolutePath))
assert(table.properties().get("external").equals("true"))
assert(table.properties().get("location").equals("file:" + dir.getAbsolutePath))
assert(table.properties().get("location").equals("file://" + dir.getAbsolutePath))
}
}
