Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive

import java.io.IOException
import java.lang.reflect.InvocationTargetException
import java.net.URI
import java.util
import java.util.Locale

Expand Down Expand Up @@ -842,10 +843,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// source tables. Here we set the table location to `locationUri` field and filter out the
// path option in storage properties, to avoid exposing this concept externally.
val storageWithLocation = {
val tableLocation = getLocationFromStorageProps(table)
val tableLocation = getLocationFromStorageProps(table).map { path =>
// Before SPARK-19257, created data source table does not use absolute uri.
// This makes Spark can't read these tables across HDFS clusters.
// Rewrite table path to absolute uri based on location uri (The location uri has been
// rewritten by HiveClientImpl.convertHiveTableToCatalogTable) to fix this issue.
toAbsoluteURI(CatalogUtils.stringToURI(path), table.storage.locationUri)
Copy link
Contributor

@cloud-fan cloud-fan May 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, let's explain the backward compatibility story with code comments.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

}
// We pass None as `newPath` here, to remove the path option in storage properties.
updateLocationInStorageProps(table, newPath = None).copy(
locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation)
}
val storageWithoutHiveGeneratedProperties = storageWithLocation.copy(properties =
storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
Expand Down Expand Up @@ -1432,4 +1438,19 @@ object HiveExternalCatalog {
isHiveCompatibleDataType(m.keyType) && isHiveCompatibleDataType(m.valueType)
case _ => true
}

/** Rewrite uri to absolute location. For example:
* uri: /user/hive/warehouse/test_table
* absoluteUri: viewfs://clusterA/user/hive/warehouse/
* The result is: viewfs://clusterA/user/hive/warehouse/test_table
*/
private[spark] def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = {
if (!uri.isAbsolute && absoluteUri.isDefined) {
val aUri = absoluteUri.get
new URI(aUri.getScheme, aUri.getUserInfo, aUri.getHost, aUri.getPort,
uri.getPath, uri.getQuery, uri.getFragment)
} else {
uri
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
import java.io.PrintStream
import java.lang.{Iterable => JIterable}
import java.lang.reflect.InvocationTargetException
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8
import java.util.{Locale, Map => JMap}
import java.util.concurrent.TimeUnit._
Expand Down Expand Up @@ -52,6 +53,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.CatalogUtils.stringToURI
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
Expand Down Expand Up @@ -518,7 +520,15 @@ private[hive] class HiveClientImpl(
createTime = h.getTTable.getCreateTime.toLong * 1000,
lastAccessTime = h.getLastAccessTime.toLong * 1000,
storage = CatalogStorageFormat(
locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI),
locationUri = shim.getDataLocation(h).map { loc =>
val tableUri = stringToURI(loc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some code comments to explain this backward compatibility story?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

// Before SPARK-19257, created data source table does not use absolute uri.
// This makes Spark can't read these tables across HDFS clusters.
// Rewrite table location to absolute uri based on database uri to fix this issue.
val absoluteUri = Option(tableUri).filterNot(_.isAbsolute)
.map(_ => stringToURI(client.getDatabase(h.getDbName).getLocationUri))
HiveExternalCatalog.toAbsoluteURI(tableUri, absoluteUri)
},
// To avoid ClassNotFound exception, we try our best to not get the format class, but get
// the class name directly. However, for non-native tables, there is no interface to get
// the format class name, so we may still throw ClassNotFound in this case.
Expand Down Expand Up @@ -731,7 +741,7 @@ private[hive] class HiveClientImpl(
spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
val hiveTable = toHiveTable(table, Some(userName))
val hivePartition = shim.getPartition(client, hiveTable, spec.asJava, false)
Option(hivePartition).map(fromHivePartition)
Option(hivePartition).map(fromHivePartition(_, table.storage.locationUri))
}

override def getPartitions(
Expand All @@ -753,7 +763,10 @@ private[hive] class HiveClientImpl(
assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
s
}
val parts = shim.getPartitions(client, hiveTable, partSpec.asJava).map(fromHivePartition)
val absoluteUri = shim.getDataLocation(hiveTable).map(stringToURI).filterNot(_.isAbsolute)
.map(_ => stringToURI(client.getDatabase(hiveTable.getDbName).getLocationUri))
val parts = shim.getPartitions(client, hiveTable, partSpec.asJava)
.map(fromHivePartition(_, absoluteUri))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean we always calculate the absoluteUri even if the partition uri is absolute?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. the absoluteUri is None if table location is absolute uri.

HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
parts.toSeq
}
Expand All @@ -763,7 +776,7 @@ private[hive] class HiveClientImpl(
predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
val hiveTable = toHiveTable(table, Some(userName))
val parts = shim.getPartitionsByFilter(client, hiveTable, predicates, table)
.map(fromHivePartition)
.map(fromHivePartition(_, table.storage.locationUri))
HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
parts
}
Expand Down Expand Up @@ -1144,7 +1157,7 @@ private[hive] object HiveClientImpl extends Logging {
/**
* Build the native partition metadata from Hive's Partition.
*/
def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
def fromHivePartition(hp: HivePartition, absoluteUri: Option[URI]): CatalogTablePartition = {
val apiPartition = hp.getTPartition
val properties: Map[String, String] = if (hp.getParameters != null) {
hp.getParameters.asScala.toMap
Expand All @@ -1154,7 +1167,8 @@ private[hive] object HiveClientImpl extends Logging {
CatalogTablePartition(
spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
storage = CatalogStorageFormat(
locationUri = Option(CatalogUtils.stringToURI(apiPartition.getSd.getLocation)),
locationUri = Option(HiveExternalCatalog.toAbsoluteURI(
stringToURI(apiPartition.getSd.getLocation), absoluteUri)),
inputFormat = Option(apiPartition.getSd.getInputFormat),
outputFormat = Option(apiPartition.getSd.getOutputFormat),
serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,19 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
assert(alteredTable.provider === Some("foo"))
})
}

test("SPARK-39203: Rewrite table location to absolute location based on database location") {
val tableLocation1 = CatalogUtils.stringToURI("/user/hive/warehouse/t1")
val tableLocation2 = CatalogUtils.stringToURI("viewfs://clusterB/user/hive/warehouse/t2")
val dbLocation = CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/")

assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, Some(dbLocation))
.equals(CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/t1")))

assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, None)
.equals(tableLocation1))

assert(HiveExternalCatalog.toAbsoluteURI(tableLocation2, Some(dbLocation))
.equals(tableLocation2))
}
}