Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.catalog

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.util.Shell

Expand Down Expand Up @@ -258,6 +259,24 @@ object CatalogUtils {
new Path(str).toUri
}

def makeQualifiedNamespacePath(
locationUri: URI,
warehousePath: String,
hadoopConf: Configuration): URI = {
if (locationUri.isAbsolute) {
locationUri
} else {
val fullPath = new Path(warehousePath, CatalogUtils.URIToString(locationUri))
makeQualifiedPath(fullPath.toUri, hadoopConf)
}
}

def makeQualifiedPath(path: URI, hadoopConf: Configuration): URI = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(hadoopConf)
fs.makeQualified(hadoopPath).toUri
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are copied from SessionCatalog.scala

private def normalizeColumnName(
tableName: String,
tableCols: Seq[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,7 @@ class SessionCatalog(
* FileSystem is changed.
*/
private def makeQualifiedPath(path: URI): URI = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(hadoopConf)
fs.makeQualified(hadoopPath).toUri
CatalogUtils.makeQualifiedPath(path, hadoopConf)
}

private def requireDbExists(db: String): Unit = {
Expand Down Expand Up @@ -254,12 +252,7 @@ class SessionCatalog(
}

private def makeQualifiedDBPath(locationUri: URI): URI = {
if (locationUri.isAbsolute) {
locationUri
} else {
val fullPath = new Path(conf.warehousePath, CatalogUtils.URIToString(locationUri))
makeQualifiedPath(fullPath.toUri)
}
CatalogUtils.makeQualifiedNamespacePath(locationUri, conf.warehousePath, hadoopConf)
}

def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.collection.mutable

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.analysis.{ResolvedDBObjectName, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, EmptyRow, Expression, Literal, NamedExpression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
Expand All @@ -38,6 +39,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors
import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PushableColumn, PushableColumnBase}
import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{BooleanType, StringType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
Expand Down Expand Up @@ -311,10 +313,13 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
AlterNamespaceSetPropertiesExec(catalog.asNamespaceCatalog, ns, properties) :: Nil

case SetNamespaceLocation(ResolvedNamespace(catalog, ns), location) =>
val warehousePath = session.sharedState.conf.get(WAREHOUSE_PATH)
val nsPath = CatalogUtils.makeQualifiedNamespacePath(
CatalogUtils.stringToURI(location), warehousePath, session.sharedState.hadoopConf)
AlterNamespaceSetPropertiesExec(
catalog.asNamespaceCatalog,
ns,
Map(SupportsNamespaces.PROP_LOCATION -> location)) :: Nil
Map(SupportsNamespaces.PROP_LOCATION -> CatalogUtils.URIToString(nsPath))) :: Nil

case CommentOnNamespace(ResolvedNamespace(catalog, ns), comment) =>
AlterNamespaceSetPropertiesExec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1294,13 +1294,25 @@ class DataSourceV2SQLSuite
assert(descriptionDf.collect() === Seq(
Row("Namespace Name", "ns2"),
Row(SupportsNamespaces.PROP_COMMENT.capitalize, "test namespace"),
Row(SupportsNamespaces.PROP_LOCATION.capitalize, "/tmp/ns_test_2"),
Row(SupportsNamespaces.PROP_LOCATION.capitalize, "file:/tmp/ns_test_2"),
Row(SupportsNamespaces.PROP_OWNER.capitalize, defaultUser),
Row("Properties", ""))
)
}
}

test("SPARK-37444: ALTER NAMESPACE .. SET LOCATION using v2 catalog with empty location") {
val ns = "testcat.ns1.ns2"
withNamespace(ns) {
sql(s"CREATE NAMESPACE IF NOT EXISTS $ns COMMENT " +
"'test namespace' LOCATION '/tmp/ns_test_1'")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we do the same thing for CREATE NAMESPACE? We can do it in a separated PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we should fix this. I will do it in a separate PR. Thanks!

val e = intercept[IllegalArgumentException] {
sql(s"ALTER DATABASE $ns SET LOCATION ''")
}
assert(e.getMessage.contains("Can not create a Path from an empty string"))
}
}

private def testShowNamespaces(
sqlText: String,
expected: Seq[String]): Unit = {
Expand Down