diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index a460634ad8a80..910b9178e4151 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command._
@@ -68,7 +68,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       throw QueryCompilationErrors.unsupportedTableOperationError(ident, "REPLACE COLUMNS")
 
     case a @ AlterColumn(ResolvedTable(catalog, ident, table: V1Table, _), _, _, _, _, _, _)
-        if isSessionCatalog(catalog) =>
+        if supportsV1Command(catalog) =>
       if (a.column.name.length > 1) {
         throw QueryCompilationErrors.unsupportedTableOperationError(
           catalog, ident, "ALTER COLUMN with qualified column")
@@ -102,7 +102,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn)
 
     case AlterTableClusterBy(ResolvedTable(catalog, _, table: V1Table, _), clusterBySpecOpt)
-        if isSessionCatalog(catalog) =>
+        if supportsV1Command(catalog) =>
       val prop = Map(ClusterBySpec.toProperty(table.schema,
         clusterBySpecOpt.getOrElse(ClusterBySpec(Nil)), conf.resolver))
       AlterTableSetPropertiesCommand(table.catalogTable.identifier, prop, isView = false)
@@ -125,13 +125,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case UnsetViewProperties(ResolvedViewIdentifier(ident), keys, ifExists) =>
       AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = true)
 
-    case DescribeNamespace(DatabaseInSessionCatalog(db), extended, output) if conf.useV1Command =>
+    case DescribeNamespace(ResolvedV1Database(db), extended, output) if conf.useV1Command =>
       DescribeDatabaseCommand(db, extended, output)
 
-    case SetNamespaceProperties(DatabaseInSessionCatalog(db), properties) if conf.useV1Command =>
+    case SetNamespaceProperties(ResolvedV1Database(db), properties) if conf.useV1Command =>
       AlterDatabasePropertiesCommand(db, properties)
 
-    case SetNamespaceLocation(DatabaseInSessionCatalog(db), location) if conf.useV1Command =>
+    case SetNamespaceLocation(ResolvedV1Database(db), location) if conf.useV1Command =>
       AlterDatabaseSetLocationCommand(db, location)
 
     case RenameTable(ResolvedV1TableOrViewIdentifier(oldIdent), newName, isView) =>
@@ -221,7 +221,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case DropTable(ResolvedIdentifier(FakeSystemCatalog, ident), _, _) =>
       DropTempViewCommand(ident)
 
-    case DropView(ResolvedV1Identifier(ident), ifExists) =>
+    case DropView(ResolvedIdentifierInSessionCatalog(ident), ifExists) =>
       DropTableCommand(ident, ifExists, isView = true, purge = false)
 
     case DropView(r @ ResolvedIdentifier(catalog, ident), _) =>
@@ -237,14 +237,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
       CreateDatabaseCommand(name, c.ifNotExists, location, comment, newProperties)
 
-    case d @ DropNamespace(DatabaseInSessionCatalog(db), _, _) if conf.useV1Command =>
+    case d @ DropNamespace(ResolvedV1Database(db), _, _) if conf.useV1Command =>
       DropDatabaseCommand(db, d.ifExists, d.cascade)
 
-    case ShowTables(DatabaseInSessionCatalog(db), pattern, output) if conf.useV1Command =>
+    case ShowTables(ResolvedV1Database(db), pattern, output) if conf.useV1Command =>
       ShowTablesCommand(Some(db), pattern, output)
 
     case ShowTablesExtended(
-        DatabaseInSessionCatalog(db),
+        ResolvedV1Database(db),
         pattern,
         output) =>
       val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) {
@@ -257,7 +257,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case ShowTablePartition(
         ResolvedTable(catalog, _, table: V1Table, _),
         partitionSpec,
-        output) if isSessionCatalog(catalog) =>
+        output) if supportsV1Command(catalog) =>
       val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) {
         output.head.withName("database") +: output.tail
       } else {
@@ -277,7 +277,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         AnalyzePartitionCommand(ident, partitionSpec, noScan)
       }
 
-    case AnalyzeTables(DatabaseInSessionCatalog(db), noScan) =>
+    case AnalyzeTables(ResolvedV1Database(db), noScan) =>
       AnalyzeTablesCommand(Some(db), noScan)
 
     case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
@@ -305,7 +305,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         if conf.useV1Command => ShowCreateTableCommand(ident, output)
 
     case ShowCreateTable(ResolvedTable(catalog, _, table: V1Table, _), _, output)
-        if isSessionCatalog(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
+        if supportsV1Command(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
       ShowCreateTableCommand(table.catalogTable.identifier, output)
 
     case TruncateTable(ResolvedV1TableIdentifier(ident)) =>
@@ -382,7 +382,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case AlterViewSchemaBinding(ResolvedViewIdentifier(ident), viewSchemaMode) =>
       AlterViewSchemaBindingCommand(ident, viewSchemaMode)
 
-    case CreateView(ResolvedV1Identifier(ident), userSpecifiedColumns, comment,
+    case CreateView(ResolvedIdentifierInSessionCatalog(ident), userSpecifiedColumns, comment,
         properties, originalText, child, allowExisting, replace, viewSchemaMode) =>
       CreateViewCommand(
         name = ident,
@@ -401,7 +401,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     case ShowViews(ns: ResolvedNamespace, pattern, output) =>
       ns match {
-        case DatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output)
+        case ResolvedDatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output)
         case _ =>
           throw QueryCompilationErrors.missingCatalogAbilityError(ns.catalog, "views")
       }
@@ -424,7 +424,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "functions")
       }
 
-    case ShowFunctions(DatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) =>
+    case ShowFunctions(
+        ResolvedDatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) =>
       ShowFunctionsCommand(db, pattern, userScope, systemScope, output)
 
     case DropFunction(ResolvedPersistentFunc(catalog, identifier, _), ifExists) =>
@@ -445,7 +446,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
           throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION")
       }
 
-    case CreateFunction(ResolvedV1Identifier(ident), className, resources, ifExists, replace) =>
+    case CreateFunction(
+        ResolvedIdentifierInSessionCatalog(ident), className, resources, ifExists, replace) =>
       CreateFunctionCommand(
         FunctionIdentifier(ident.table, ident.database, ident.catalog),
         className,
@@ -583,7 +585,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
   object ResolvedV1TableIdentifier {
     def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
-      case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) =>
+      case ResolvedTable(catalog, _, t: V1Table, _) if supportsV1Command(catalog) =>
         Some(t.catalogTable.identifier)
       case _ => None
     }
@@ -598,6 +600,19 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
   }
 
   object ResolvedV1Identifier {
+    def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
+      case ResolvedIdentifier(catalog, ident) if supportsV1Command(catalog) =>
+        if (ident.namespace().length != 1) {
+          throw QueryCompilationErrors
+            .requiresSinglePartNamespaceError(ident.namespace().toImmutableArraySeq)
+        }
+        Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name)))
+      case _ => None
+    }
+  }
+
+  // Use this object to help match commands that do not have a v2 implementation.
+  object ResolvedIdentifierInSessionCatalog {
     def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
       case ResolvedIdentifier(catalog, ident) if isSessionCatalog(catalog) =>
         if (ident.namespace().length != 1) {
@@ -622,7 +637,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     DataSourceV2Utils.getTableProvider(provider, conf).isDefined
   }
 
-  private object DatabaseInSessionCatalog {
+  private object ResolvedV1Database {
+    def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
+      case ResolvedNamespace(catalog, _, _) if !supportsV1Command(catalog) => None
+      case ResolvedNamespace(_, Seq(), _) =>
+        throw QueryCompilationErrors.databaseFromV1SessionCatalogNotSpecifiedError()
+      case ResolvedNamespace(_, Seq(dbName), _) => Some(dbName)
+      case _ =>
+        assert(resolved.namespace.length > 1)
+        throw QueryCompilationErrors.nestedDatabaseUnsupportedByV1SessionCatalogError(
+          resolved.namespace.map(quoteIfNeeded).mkString("."))
+    }
+  }
+
+  // Use this object to help match commands that do not have a v2 implementation.
+  private object ResolvedDatabaseInSessionCatalog {
     def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
       case ResolvedNamespace(catalog, _, _) if !isSessionCatalog(catalog) => None
       case ResolvedNamespace(_, Seq(), _) =>
@@ -637,11 +666,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
   private object DatabaseNameInSessionCatalog {
     def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
-      case ResolvedNamespace(catalog, _, _) if !isSessionCatalog(catalog) => None
+      case ResolvedNamespace(catalog, _, _) if !supportsV1Command(catalog) => None
      case ResolvedNamespace(_, Seq(dbName), _) => Some(dbName)
      case _ =>
        assert(resolved.namespace.length > 1)
        throw QueryCompilationErrors.requiresSinglePartNamespaceError(resolved.namespace)
     }
   }
+
+  private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
+    catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) &&
+      !SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined
+  }
 }
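The substantive change in this file: every place that used to fall back to a v1 command for the session catalog via `isSessionCatalog` now goes through the stricter `supportsV1Command`, which additionally requires that no custom v2 session catalog is registered. The split extractors (`ResolvedV1Identifier` vs. `ResolvedIdentifierInSessionCatalog`, `ResolvedV1Database` vs. `ResolvedDatabaseInSessionCatalog`) keep the old, looser matching only for commands that have no v2 implementation at all. A minimal sketch of the predicate's effect, assuming just a running `SparkSession` (the helper name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// spark_catalog keeps its v1 command fallbacks only while no custom v2
// implementation is registered under spark.sql.catalog.spark_catalog.
def sessionCatalogSupportsV1(catalogName: String): Boolean =
  catalogName.equalsIgnoreCase("spark_catalog") &&
    spark.conf.getOption("spark.sql.catalog.spark_catalog").isEmpty

println(sessionCatalogSupportsV1("spark_catalog")) // true on a vanilla session
```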
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index b0a89173060a5..693bd0cd06ee9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -53,6 +53,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
   import DataSourceV2Implicits._
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
+  private def hadoopConf = session.sessionState.newHadoopConf()
+
   private def refreshCache(r: DataSourceV2Relation)(): Unit = {
     session.sharedState.cacheManager.recacheByPlan(session, r)
   }
@@ -87,7 +89,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
   }
 
   private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
-    tableSpec.withNewLocation(tableSpec.location.map(makeQualifiedDBObjectPath(_)))
+    tableSpec.withNewLocation(tableSpec.location.map(loc => CatalogUtils.makeQualifiedPath(
+      CatalogUtils.stringToURI(loc), hadoopConf).toString))
   }
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
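`qualifyLocInTableSpec` previously went through `makeQualifiedDBObjectPath`; it now qualifies the location directly against the Hadoop configuration, so a bare path gains both a scheme and an explicit (empty) authority. A standalone sketch of the same call, assuming the default local filesystem:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.catalog.CatalogUtils

val hadoopConf = new Configuration()
val qualified = CatalogUtils.makeQualifiedPath(
  CatalogUtils.stringToURI("/tmp/foo"), hadoopConf).toString
// On a local default filesystem this yields "file:///tmp/foo", which is why
// the expected locations in the test suites below move from "file:/..."
// to "file:///...".
println(qualified)
```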
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
index dd678ac48c687..3757284d7d3e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAg
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.internal.{SqlApiConf, SQLConf}
+import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructField, StructType}
 
 class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
@@ -157,6 +158,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
   }
 
   test("disable bucketing on collated string column") {
+    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
     def createTable(bucketColumns: String*): Unit = {
       val tableName = "test_partition_tbl"
       withTable(tableName) {
@@ -758,6 +760,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
   }
 
   test("disable partition on collated string column") {
+    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
     def createTable(partitionColumns: String*): Unit = {
       val tableName = "test_partition_tbl"
       withTable(tableName) {
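Both collation tests assert restrictions enforced on the v1 path, so they first drop the custom v2 session catalog that the base suite presumably installs; leaving it set would, under the new `supportsV1Command` check, route the `CREATE TABLE` statements through v2 and bypass the checks under test. The unset call spelled out with the literal config key:

```scala
// V2_SESSION_CATALOG_IMPLEMENTATION.key is "spark.sql.catalog.spark_catalog"
spark.conf.unset("spark.sql.catalog.spark_catalog")
```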
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index 7bbb6485c273f..ef22fb71bb405 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -84,7 +84,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
       spark.range(20).write.format(v2Format).option("path", "/abc").saveAsTable(t1)
       val cat = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
       val tableInfo = cat.loadTable(Identifier.of(Array("default"), t1))
-      assert(tableInfo.properties().get("location") === "file:/abc")
+      assert(tableInfo.properties().get("location") === "file:///abc")
       assert(tableInfo.properties().get("provider") === v2Format)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index ec4b827c659f8..2672d0e768928 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -441,7 +441,7 @@ class DataSourceV2SQLSuiteV1Filter
         val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
           .filter("col_name = 'Location'")
           .select("data_type").head().getString(0)
-        assert(location === "file:/tmp/foo")
+        assert(location === "file:///tmp/foo")
       }
     }
   }
@@ -458,7 +458,7 @@ class DataSourceV2SQLSuiteV1Filter
         val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
           .filter("col_name = 'Location'")
           .select("data_type").head().getString(0)
-        assert(location === "file:/tmp/foo")
+        assert(location === "file:///tmp/foo")
       }
     }
   }
@@ -2104,15 +2104,10 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("REPLACE TABLE: v1 table") {
-    val e = intercept[AnalysisException] {
-      sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
-    }
-    checkError(
-      exception = e,
-      errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
-      sqlState = "0A000",
-      parameters = Map("tableName" -> "`spark_catalog`.`default`.`tbl`",
-        "operation" -> "REPLACE TABLE"))
+    sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
+    val v2Catalog = catalog("spark_catalog").asTableCatalog
+    val table = v2Catalog.loadTable(Identifier.of(Array("default"), "tbl"))
+    assert(table.properties().get(TableCatalog.PROP_PROVIDER) == classOf[SimpleScanSource].getName)
   }
 
   test("DeleteFrom: - delete with invalid predicate") {
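The rewritten `REPLACE TABLE: v1 table` test captures the behavior change directly: with a custom v2 session catalog installed (as in this suite), `CREATE OR REPLACE TABLE` on `spark_catalog` is no longer rejected with `UNSUPPORTED_FEATURE.TABLE_OPERATION` but is served by the v2 `REPLACE TABLE` path. A hedged sketch of the same round trip without the suite's helpers (the table name is illustrative, and `parquet` stands in for the suite's v1 `SimpleScanSource`):

```scala
// Assumes a SparkSession `spark` with a v2 implementation registered under
// spark.sql.catalog.spark_catalog.
spark.sql("CREATE OR REPLACE TABLE mytab (a INT) USING parquet")

// Previously this statement raised UNSUPPORTED_FEATURE.TABLE_OPERATION for a
// v1 provider; now the replaced table is inspectable like any other:
spark.sql("DESCRIBE TABLE EXTENDED mytab").show(truncate = false)
```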
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
index 3b2fc0379340b..ff944dbb805cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
@@ -71,18 +71,22 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating
       properties: java.util.Map[String, String]): Table = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
     val key = TestV2SessionCatalogBase.SIMULATE_ALLOW_EXTERNAL_PROPERTY
-    val propsWithLocation = if (properties.containsKey(key)) {
+    val newProps = new java.util.HashMap[String, String]()
+    newProps.putAll(properties)
+    if (properties.containsKey(TableCatalog.PROP_LOCATION)) {
+      newProps.put(TableCatalog.PROP_EXTERNAL, "true")
+    }
+
+    val propsWithLocation = if (newProps.containsKey(key)) {
       // Always set a location so that CREATE EXTERNAL TABLE won't fail with LOCATION not specified.
-      if (!properties.containsKey(TableCatalog.PROP_LOCATION)) {
-        val newProps = new java.util.HashMap[String, String]()
-        newProps.putAll(properties)
+      if (!newProps.containsKey(TableCatalog.PROP_LOCATION)) {
         newProps.put(TableCatalog.PROP_LOCATION, "file:/abc")
         newProps
       } else {
-        properties
+        newProps
       }
     } else {
-      properties
+      newProps
     }
     super.createTable(ident, columns, partitions, propsWithLocation)
     val schema = CatalogV2Util.v2ColumnsToStructType(columns)
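The `createTable` override now copies the caller's property map up front and tags any table created with an explicit `LOCATION` as external, mirroring how the session catalog classifies such tables; the copy also means the incoming (possibly immutable) map is never mutated. The copy-then-augment step in isolation, as a hypothetical helper using the literal values of `TableCatalog.PROP_LOCATION` and `PROP_EXTERNAL`:

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Hypothetical helper: defensively copy, then mark location-bearing tables
// as external ("location" / "external" are the TableCatalog property keys).
def withExternalFlag(props: JMap[String, String]): JMap[String, String] = {
  val copied = new JHashMap[String, String](props)
  if (copied.containsKey("location")) {
    copied.put("external", "true")
  }
  copied
}
```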
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala
index f72127cbd1de2..5b48faf5c8869 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala
@@ -106,7 +106,7 @@ class ShowCreateTableSuite extends command.ShowCreateTableSuiteBase with Command
       "'via' = '2')",
       "PARTITIONED BY (a)",
       "COMMENT 'This is a comment'",
-      "LOCATION 'file:/tmp'",
+      "LOCATION 'file:///tmp'",
       "TBLPROPERTIES (",
       "'password' = '*********(redacted)',",
       "'prop1' = '1',",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 7c929b5da872a..88b8506138315 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -824,7 +824,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
       assert(table.properties().get("comment").equals(description))
       assert(table.properties().get("path").equals(dir.getAbsolutePath))
       assert(table.properties().get("external").equals("true"))
-      assert(table.properties().get("location").equals("file:" + dir.getAbsolutePath))
+      assert(table.properties().get("location").equals("file://" + dir.getAbsolutePath))
     }
   }
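The new expected value in `CatalogSuite` follows from plain string concatenation: `dir.getAbsolutePath` always starts with a slash, so prefixing `"file://"` yields the same explicit-empty-authority form (`file:///...`) that the qualified paths now carry:

```scala
// Illustrative value; any absolute path from dir.getAbsolutePath begins with '/'.
val dirPath = "/tmp/spark-warehouse-test"
assert("file://" + dirPath == "file:///tmp/spark-warehouse-test")
```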