Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ object CatalogUtils {
new Path(str).toUri
}

def makeQualifiedNamespacePath(
def makeQualifiedDBObjectPath(
locationUri: URI,
warehousePath: String,
hadoopConf: Configuration): URI = {
Expand All @@ -271,6 +271,14 @@ object CatalogUtils {
}
}

def makeQualifiedDBObjectPath(
warehouse: String,
location: String,
hadoopConf: Configuration): String = {
val nsPath = makeQualifiedDBObjectPath(stringToURI(location), warehouse, hadoopConf)
URIToString(nsPath)
}

def makeQualifiedPath(path: URI, hadoopConf: Configuration): URI = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(hadoopConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ class SessionCatalog(
}

private def makeQualifiedDBPath(locationUri: URI): URI = {
CatalogUtils.makeQualifiedNamespacePath(locationUri, conf.warehousePath, hadoopConf)
CatalogUtils.makeQualifiedDBObjectPath(locationUri, conf.warehousePath, hadoopConf)
}

def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
}

private def makeQualifiedNamespacePath(location: String): String = {
val warehousePath = session.sharedState.conf.get(WAREHOUSE_PATH)
val nsPath = CatalogUtils.makeQualifiedNamespacePath(
CatalogUtils.stringToURI(location), warehousePath, session.sharedState.hadoopConf)
CatalogUtils.URIToString(nsPath)
private def makeQualifiedDBObjectPath(location: String): String = {
CatalogUtils.makeQualifiedDBObjectPath(session.sharedState.conf.get(WAREHOUSE_PATH),
location, session.sharedState.hadoopConf)
}

override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
Expand Down Expand Up @@ -167,8 +165,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat

case CreateTable(ResolvedDBObjectName(catalog, ident), schema, partitioning,
tableSpec, ifNotExists) =>
val qualifiedLocation = tableSpec.location.map(makeQualifiedDBObjectPath(_))
CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema,
partitioning, tableSpec, ifNotExists) :: Nil
partitioning, tableSpec.copy(location = qualifiedLocation), ifNotExists) :: Nil

case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
Expand All @@ -186,7 +185,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil

case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
val newProps = props.get(TableCatalog.PROP_LOCATION).map { loc =>
props + (TableCatalog.PROP_LOCATION -> makeQualifiedDBObjectPath(loc))
}.getOrElse(props)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As #PR not merged

val propsWithOwner = CatalogV2Util.withDefaultOwnership(newProps)
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableExec(
Expand Down Expand Up @@ -324,7 +326,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
AlterNamespaceSetPropertiesExec(
catalog.asNamespaceCatalog,
ns,
Map(SupportsNamespaces.PROP_LOCATION -> makeQualifiedNamespacePath(location))) :: Nil
Map(SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(location))) :: Nil

case CommentOnNamespace(ResolvedNamespace(catalog, ns), comment) =>
AlterNamespaceSetPropertiesExec(
Expand All @@ -334,7 +336,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat

case CreateNamespace(ResolvedDBObjectName(catalog, name), ifNotExists, properties) =>
val finalProperties = properties.get(SupportsNamespaces.PROP_LOCATION).map { loc =>
properties + (SupportsNamespaces.PROP_LOCATION -> makeQualifiedNamespacePath(loc))
properties + (SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(loc))
}.getOrElse(properties)
CreateNamespaceExec(catalog.asNamespaceCatalog, name, ifNotExists, finalProperties) :: Nil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class DataSourceV2SQLSuite
" PARTITIONED BY (id)" +
" TBLPROPERTIES ('bar'='baz')" +
" COMMENT 'this is a test table'" +
" LOCATION '/tmp/testcat/table_name'")
" LOCATION 'file:/tmp/testcat/table_name'")
val descriptionDf = spark.sql("DESCRIBE TABLE EXTENDED testcat.table_name")
assert(descriptionDf.schema.map(field => (field.name, field.dataType))
=== Seq(
Expand All @@ -149,7 +149,7 @@ class DataSourceV2SQLSuite
Array("# Detailed Table Information", "", ""),
Array("Name", "testcat.table_name", ""),
Array("Comment", "this is a test table", ""),
Array("Location", "/tmp/testcat/table_name", ""),
Array("Location", "file:/tmp/testcat/table_name", ""),
Array("Provider", "foo", ""),
Array(TableCatalog.PROP_OWNER.capitalize, defaultUser, ""),
Array("Table Properties", "[bar=baz]", "")))
Expand Down Expand Up @@ -1179,8 +1179,9 @@ class DataSourceV2SQLSuite
s" ('path'='bar', 'Path'='noop')")
val tableCatalog = catalog("testcat").asTableCatalog
val identifier = Identifier.of(Array(), "reservedTest")
assert(tableCatalog.loadTable(identifier).properties()
.get(TableCatalog.PROP_LOCATION) == "foo",
val location = tableCatalog.loadTable(identifier).properties()
.get(TableCatalog.PROP_LOCATION)
assert(location.startsWith("file:") && location.endsWith("foo"),
"path as a table property should not have side effects")
assert(tableCatalog.loadTable(identifier).properties().get("path") == "bar",
"path as a table property should not have side effects")
Expand Down Expand Up @@ -2012,7 +2013,7 @@ class DataSourceV2SQLSuite
|COMMENT 'This is a comment'
|TBLPROPERTIES ('prop1' = '1', 'prop2' = '2', 'prop3' = 3, 'prop4' = 4)
|PARTITIONED BY (a)
|LOCATION '/tmp'
|LOCATION 'file:/tmp'
""".stripMargin)
val showDDL = getShowCreateDDL(s"SHOW CREATE TABLE $t")
assert(showDDL === Array(
Expand All @@ -2029,7 +2030,7 @@ class DataSourceV2SQLSuite
"'via' = '2')",
"PARTITIONED BY (a)",
"COMMENT 'This is a comment'",
"LOCATION '/tmp'",
"LOCATION 'file:/tmp'",
"TBLPROPERTIES(",
"'prop1' = '1',",
"'prop2' = '2',",
Expand Down