Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ statement
(WITH (DBPROPERTIES | PROPERTIES) tablePropertyList))* #createNamespace
| ALTER (database | NAMESPACE) multipartIdentifier
SET (DBPROPERTIES | PROPERTIES) tablePropertyList #setNamespaceProperties
| ALTER database db=errorCapturingIdentifier
SET locationSpec #setDatabaseLocation
| ALTER (database | NAMESPACE) multipartIdentifier
SET locationSpec #setNamespaceLocation
| DROP (database | NAMESPACE) (IF EXISTS)? multipartIdentifier
(RESTRICT | CASCADE)? #dropNamespace
| SHOW (DATABASES | NAMESPACES) ((FROM | IN) multipartIdentifier)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,11 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
s"because view support in catalog has not been implemented yet")

case AlterNamespaceSetPropertiesStatement(NonSessionCatalog(catalog, nameParts), properties) =>
AlterNamespaceSetProperties(catalog, nameParts, properties)
AlterNamespaceSetProperties(catalog.asNamespaceCatalog, nameParts, properties)

case AlterNamespaceSetLocationStatement(NonSessionCatalog(catalog, nameParts), location) =>
AlterNamespaceSetProperties(
catalog.asNamespaceCatalog, nameParts, Map("location" -> location))

case DescribeTableStatement(
nameParts @ NonSessionCatalog(catalog, tableName), partitionSpec, isExtended) =>
Expand Down Expand Up @@ -176,7 +180,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
DropNamespace(catalog, nameParts, ifExists, cascade)

case DescribeNamespaceStatement(NonSessionCatalog(catalog, nameParts), extended) =>
DescribeNamespace(catalog, nameParts, extended)
DescribeNamespace(catalog.asNamespaceCatalog, nameParts, extended)

case ShowNamespacesStatement(Some(CatalogAndNamespace(catalog, namespace)), pattern) =>
ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2545,6 +2545,22 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
}

/**
* Create an [[AlterNamespaceSetLocationStatement]] logical plan.
*
* For example:
* {{{
* ALTER (DATABASE|SCHEMA|NAMESPACE) namespace SET LOCATION path;
* }}}
*/
override def visitSetNamespaceLocation(ctx: SetNamespaceLocationContext): LogicalPlan = {
withOrigin(ctx) {
AlterNamespaceSetLocationStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
visitLocationSpec(ctx.locationSpec))
}
}

/**
* Create a [[ShowNamespacesStatement]] command.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,13 @@ case class AlterNamespaceSetPropertiesStatement(
namespace: Seq[String],
properties: Map[String, String]) extends ParsedStatement

/**
* ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET LOCATION command, as parsed from SQL.
*/
case class AlterNamespaceSetLocationStatement(
namespace: Seq[String],
location: String) extends ParsedStatement

/**
* A SHOW NAMESPACES statement, as parsed from SQL.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ case class DropNamespace(
* The logical plan of the DESCRIBE NAMESPACE command that works for v2 catalogs.
*/
case class DescribeNamespace(
catalog: CatalogPlugin,
catalog: SupportsNamespaces,
namespace: Seq[String],
extended: Boolean) extends Command {

Expand All @@ -275,7 +275,7 @@ case class DescribeNamespace(
* command that works for v2 catalogs.
*/
case class AlterNamespaceSetProperties(
catalog: CatalogPlugin,
catalog: SupportsNamespaces,
namespace: Seq[String],
properties: Map[String, String]) extends Command

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,20 @@ class DDLParserSuite extends AnalysisTest {
Seq("a", "b", "c"), Map("b" -> "b")))
}

test("set namespace location") {
comparePlans(
parsePlan("ALTER DATABASE a.b.c SET LOCATION '/home/user/db'"),
AlterNamespaceSetLocationStatement(Seq("a", "b", "c"), "/home/user/db"))

comparePlans(
parsePlan("ALTER SCHEMA a.b.c SET LOCATION '/home/user/db'"),
AlterNamespaceSetLocationStatement(Seq("a", "b", "c"), "/home/user/db"))

comparePlans(
parsePlan("ALTER NAMESPACE a.b.c SET LOCATION '/home/user/db'"),
AlterNamespaceSetLocationStatement(Seq("a", "b", "c"), "/home/user/db"))
}

test("show databases: basic") {
comparePlans(
parsePlan("SHOW DATABASES"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ class ResolveSessionCatalog(
}
AlterDatabasePropertiesCommand(nameParts.head, properties)

case AlterNamespaceSetLocationStatement(SessionCatalog(_, nameParts), location) =>
if (nameParts.length != 1) {
throw new AnalysisException(
s"The database name is not valid: ${nameParts.quoted}")
}
AlterDatabaseSetLocationCommand(nameParts.head, location)

case DescribeTableStatement(
nameParts @ SessionCatalog(catalog, tableName), partitionSpec, isExtended) =>
loadTable(catalog, tableName.asIdentifier).collect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,22 +227,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
}

/**
* Create an [[AlterDatabaseSetLocationCommand]] command.
*
* For example:
* {{{
* ALTER (DATABASE|SCHEMA) database SET LOCATION path;
* }}}
*/
override def visitSetDatabaseLocation(
ctx: SetDatabaseLocationContext): LogicalPlan = withOrigin(ctx) {
AlterDatabaseSetLocationCommand(
ctx.db.getText,
visitLocationSpec(ctx.locationSpec)
)
}

/**
* Create a plan for a DESCRIBE FUNCTION command.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,20 @@ package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, NamespaceChange}
import org.apache.spark.sql.connector.catalog.{NamespaceChange, SupportsNamespaces}

/**
* Physical plan node for setting properties of namespace.
*/
case class AlterNamespaceSetPropertiesExec(
catalog: CatalogPlugin,
catalog: SupportsNamespaces,
namespace: Seq[String],
props: Map[String, String])
extends V2CommandExec {
props: Map[String, String]) extends V2CommandExec {
override protected def run(): Seq[InternalRow] = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val changes = props.map{ case (k, v) =>
NamespaceChange.setProperty(k, v)
}.toSeq
catalog.asNamespaceCatalog.alterNamespace(namespace.toArray, changes: _*)
catalog.alterNamespace(namespace.toArray, changes: _*)
Seq.empty
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.connector.catalog.SupportsNamespaces
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.COMMENT_TABLE_PROP
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.LOCATION_TABLE_PROP
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.RESERVED_PROPERTIES
Expand All @@ -34,19 +34,15 @@ import org.apache.spark.sql.types.StructType
*/
case class DescribeNamespaceExec(
output: Seq[Attribute],
catalog: CatalogPlugin,
catalog: SupportsNamespaces,
namespace: Seq[String],
isExtended: Boolean) extends V2CommandExec {

private val encoder = RowEncoder(StructType.fromAttributes(output)).resolveAndBind()

override protected def run(): Seq[InternalRow] = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val rows = new ArrayBuffer[InternalRow]()
val nsCatalog = catalog.asNamespaceCatalog
val ns = namespace.toArray
val metadata = nsCatalog.loadNamespaceMetadata(ns)
val metadata = catalog.loadNamespaceMetadata(ns)

rows += toCatalystRow("Namespace Name", ns.last)
rows += toCatalystRow("Description", metadata.get(COMMENT_TABLE_PROP))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,20 @@ class DataSourceV2SQLSuite
}
}

test("AlterNamespaceSetLocation using v2 catalog") {
withNamespace("testcat.ns1.ns2") {
sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1.ns2 COMMENT " +
"'test namespace' LOCATION '/tmp/ns_test_1'")
sql("ALTER NAMESPACE testcat.ns1.ns2 SET LOCATION '/tmp/ns_test_2'")
val descriptionDf = sql("DESCRIBE NAMESPACE EXTENDED testcat.ns1.ns2")
assert(descriptionDf.collect() === Seq(
Row("Namespace Name", "ns2"),
Row("Description", "test namespace"),
Row("Location", "/tmp/ns_test_2")
))
}
}

test("ShowNamespaces: show root namespaces with default v2 catalog") {
spark.conf.set(SQLConf.DEFAULT_CATALOG.key, "testcat")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
containsThesePhrases = Seq("key_without_value"))
}

test("alter database set location") {
// ALTER (DATABASE|SCHEMA) database_name SET LOCATION
val sql1 = "ALTER DATABASE database_name SET LOCATION '/home/user/db'"
val parsed1 = parser.parsePlan(sql1)

val expected1 = AlterDatabaseSetLocationCommand("database_name", "/home/user/db")
comparePlans(parsed1, expected1)
}

test("create function") {
val sql1 =
"""
Expand Down