Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.catalog

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.util.Shell

Expand Down Expand Up @@ -258,6 +259,24 @@ object CatalogUtils {
new Path(str).toUri
}

def makeQualifiedNamespacePath(
locationUri: URI,
warehousePath: String,
hadoopConf: Configuration): URI = {
if (locationUri.isAbsolute) {
locationUri
} else {
val fullPath = new Path(warehousePath, CatalogUtils.URIToString(locationUri))
makeQualifiedPath(fullPath.toUri, hadoopConf)
}
}

def makeQualifiedPath(path: URI, hadoopConf: Configuration): URI = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(hadoopConf)
fs.makeQualified(hadoopPath).toUri
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are copied from SessionCatalog.scala

private def normalizeColumnName(
tableName: String,
tableCols: Seq[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,7 @@ class SessionCatalog(
* FileSystem is changed.
*/
private def makeQualifiedPath(path: URI): URI = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(hadoopConf)
fs.makeQualified(hadoopPath).toUri
CatalogUtils.makeQualifiedPath(path, hadoopConf)
}

private def requireDbExists(db: String): Unit = {
Expand Down Expand Up @@ -254,12 +252,7 @@ class SessionCatalog(
}

private def makeQualifiedDBPath(locationUri: URI): URI = {
if (locationUri.isAbsolute) {
locationUri
} else {
val fullPath = new Path(conf.warehousePath, CatalogUtils.URIToString(locationUri))
makeQualifiedPath(fullPath.toUri)
}
CatalogUtils.makeQualifiedNamespacePath(locationUri, conf.warehousePath, hadoopConf)
}

def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.{expressions, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.{ResolvedDBObjectName, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, EmptyRow, Expression, Literal, NamedExpression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
Expand All @@ -44,7 +45,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String

class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
class DataSourceV2Strategy(session: SparkSession) extends Strategy
with PredicateHelper with SQLConfHelper {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need SQLConfHelper. We can get the conf by session.sessionState.conf

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.


import DataSourceV2Implicits._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
Expand Down Expand Up @@ -311,10 +313,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
AlterNamespaceSetPropertiesExec(catalog.asNamespaceCatalog, ns, properties) :: Nil

case SetNamespaceLocation(ResolvedNamespace(catalog, ns), location) =>
val nsPath = CatalogUtils.makeQualifiedNamespacePath(
CatalogUtils.stringToURI(location), conf.warehousePath, session.sharedState.hadoopConf)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will now qualify the namespace location for v2 command.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

session.conf will contain the qualified warehouse path, but using SQLConf is fine since it will be qualified anyway (same as v1):

SharedState.setWarehousePathConf(confClone, hadoopConfClone, qualified)

AlterNamespaceSetPropertiesExec(
catalog.asNamespaceCatalog,
ns,
Map(SupportsNamespaces.PROP_LOCATION -> location)) :: Nil
Map(SupportsNamespaces.PROP_LOCATION -> CatalogUtils.URIToString(nsPath))) :: Nil

case CommentOnNamespace(ResolvedNamespace(catalog, ns), comment) =>
AlterNamespaceSetPropertiesExec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1294,13 +1294,25 @@ class DataSourceV2SQLSuite
assert(descriptionDf.collect() === Seq(
Row("Namespace Name", "ns2"),
Row(SupportsNamespaces.PROP_COMMENT.capitalize, "test namespace"),
Row(SupportsNamespaces.PROP_LOCATION.capitalize, "/tmp/ns_test_2"),
Row(SupportsNamespaces.PROP_LOCATION.capitalize, "file:/tmp/ns_test_2"),
Row(SupportsNamespaces.PROP_OWNER.capitalize, defaultUser),
Row("Properties", ""))
)
}
}

test("SPARK-37444: ALTER NAMESPACE .. SET LOCATION using v2 catalog with empty location") {
val ns = "testcat.ns1.ns2"
withNamespace(ns) {
sql(s"CREATE NAMESPACE IF NOT EXISTS $ns COMMENT " +
"'test namespace' LOCATION '/tmp/ns_test_1'")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we do the same thing for CREATE NAMESPACE? We can do it in a separated PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we should fix this. I will do it in a separate PR. Thanks!

val e = intercept[IllegalArgumentException] {
sql(s"ALTER DATABASE $ns SET LOCATION ''")
}
assert(e.getMessage.contains("Can not create a Path from an empty string"))
}
}

private def testShowNamespaces(
sqlText: String,
expected: Seq[String]): Unit = {
Expand Down