diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index fefa032d3510..32e453911799 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 import java.lang.reflect.InvocationTargetException
+import java.net.URI
 import java.util
 import java.util.Locale
 
@@ -842,10 +843,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // source tables. Here we set the table location to `locationUri` field and filter out the
     // path option in storage properties, to avoid exposing this concept externally.
     val storageWithLocation = {
-      val tableLocation = getLocationFromStorageProps(table)
+      val tableLocation = getLocationFromStorageProps(table).map { path =>
+        // Before SPARK-19257, data source tables were created without an absolute URI,
+        // which prevents Spark from reading them across HDFS clusters.
+        // Rewrite the table path to an absolute URI based on the location URI (which has
+        // already been rewritten by HiveClientImpl.convertHiveTableToCatalogTable).
+        toAbsoluteURI(CatalogUtils.stringToURI(path), table.storage.locationUri)
+      }
       // We pass None as `newPath` here, to remove the path option in storage properties.
-      updateLocationInStorageProps(table, newPath = None).copy(
-        locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
+      updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation)
     }
     val storageWithoutHiveGeneratedProperties = storageWithLocation.copy(properties =
       storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
@@ -1432,4 +1438,19 @@ object HiveExternalCatalog {
       isHiveCompatibleDataType(m.keyType) && isHiveCompatibleDataType(m.valueType)
     case _ => true
   }
+
+  /** Rewrite a URI to an absolute location. For example:
+   * uri: /user/hive/warehouse/test_table
+   * absoluteUri: viewfs://clusterA/user/hive/warehouse/
+   * The result is: viewfs://clusterA/user/hive/warehouse/test_table
+   */
+  private[spark] def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = {
+    if (!uri.isAbsolute && absoluteUri.isDefined) {
+      val aUri = absoluteUri.get
+      new URI(aUri.getScheme, aUri.getUserInfo, aUri.getHost, aUri.getPort,
+        uri.getPath, uri.getQuery, uri.getFragment)
+    } else {
+      uri
+    }
+  }
 }
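
The new helper only fills in a scheme and authority when the input URI lacks one; everything else (path, query, fragment) is kept from the table's own location. For experimenting with this rewrite outside a Spark build, here is a self-contained sketch of the same logic; the object name and `main` wrapper are illustrative, not part of the patch:

```scala
import java.net.URI

object ToAbsoluteUriSketch {
  // Same logic as HiveExternalCatalog.toAbsoluteURI above.
  def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = {
    if (!uri.isAbsolute && absoluteUri.isDefined) {
      val aUri = absoluteUri.get
      // Borrow scheme and authority from the database location; keep the
      // table's own path, query and fragment.
      new URI(aUri.getScheme, aUri.getUserInfo, aUri.getHost, aUri.getPort,
        uri.getPath, uri.getQuery, uri.getFragment)
    } else {
      uri
    }
  }

  def main(args: Array[String]): Unit = {
    val db = new URI("viewfs://clusterA/user/hive/warehouse/")
    // A relative table path picks up the scheme and cluster from the database:
    // prints viewfs://clusterA/user/hive/warehouse/test_table
    println(toAbsoluteURI(new URI("/user/hive/warehouse/test_table"), Some(db)))
    // An already-absolute location passes through untouched:
    // prints hdfs://other/user/hive/warehouse/t
    println(toAbsoluteURI(new URI("hdfs://other/user/hive/warehouse/t"), Some(db)))
  }
}
```
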
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index d70ac781c039..7047166413f7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
 import java.io.PrintStream
 import java.lang.{Iterable => JIterable}
 import java.lang.reflect.InvocationTargetException
+import java.net.URI
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.{Locale, Map => JMap}
 import java.util.concurrent.TimeUnit._
@@ -52,6 +53,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils.stringToURI
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -518,7 +520,15 @@ private[hive] class HiveClientImpl(
       createTime = h.getTTable.getCreateTime.toLong * 1000,
       lastAccessTime = h.getLastAccessTime.toLong * 1000,
       storage = CatalogStorageFormat(
-        locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI),
+        locationUri = shim.getDataLocation(h).map { loc =>
+          val tableUri = stringToURI(loc)
+          // Before SPARK-19257, data source tables were created without an absolute URI,
+          // which prevents Spark from reading them across HDFS clusters. Rewrite the
+          // table location to an absolute URI based on the database URI to fix this issue.
+          val absoluteUri = Option(tableUri).filterNot(_.isAbsolute)
+            .map(_ => stringToURI(client.getDatabase(h.getDbName).getLocationUri))
+          HiveExternalCatalog.toAbsoluteURI(tableUri, absoluteUri)
+        },
         // To avoid ClassNotFound exception, we try our best to not get the format class, but get
         // the class name directly. However, for non-native tables, there is no interface to get
         // the format class name, so we may still throw ClassNotFound in this case.
@@ -731,7 +741,7 @@ private[hive] class HiveClientImpl(
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
     val hiveTable = toHiveTable(table, Some(userName))
     val hivePartition = shim.getPartition(client, hiveTable, spec.asJava, false)
-    Option(hivePartition).map(fromHivePartition)
+    Option(hivePartition).map(fromHivePartition(_, table.storage.locationUri))
   }
 
   override def getPartitions(
@@ -753,7 +763,10 @@ private[hive] class HiveClientImpl(
       assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
       s
     }
-    val parts = shim.getPartitions(client, hiveTable, partSpec.asJava).map(fromHivePartition)
+    val absoluteUri = shim.getDataLocation(hiveTable).map(stringToURI).filterNot(_.isAbsolute)
+      .map(_ => stringToURI(client.getDatabase(hiveTable.getDbName).getLocationUri))
+    val parts = shim.getPartitions(client, hiveTable, partSpec.asJava)
+      .map(fromHivePartition(_, absoluteUri))
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts.toSeq
   }
@@ -763,7 +776,7 @@ private[hive] class HiveClientImpl(
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
     val hiveTable = toHiveTable(table, Some(userName))
     val parts = shim.getPartitionsByFilter(client, hiveTable, predicates, table)
-      .map(fromHivePartition)
+      .map(fromHivePartition(_, table.storage.locationUri))
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
   }
@@ -1144,7 +1157,7 @@ private[hive] object HiveClientImpl extends Logging {
   /**
    * Build the native partition metadata from Hive's Partition.
   */
-  def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
+  def fromHivePartition(hp: HivePartition, absoluteUri: Option[URI]): CatalogTablePartition = {
     val apiPartition = hp.getTPartition
     val properties: Map[String, String] = if (hp.getParameters != null) {
       hp.getParameters.asScala.toMap
@@ -1154,7 +1167,8 @@ private[hive] object HiveClientImpl extends Logging {
     CatalogTablePartition(
       spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
       storage = CatalogStorageFormat(
-        locationUri = Option(CatalogUtils.stringToURI(apiPartition.getSd.getLocation)),
+        locationUri = Option(HiveExternalCatalog.toAbsoluteURI(
+          stringToURI(apiPartition.getSd.getLocation), absoluteUri)),
         inputFormat = Option(apiPartition.getSd.getInputFormat),
         outputFormat = Option(apiPartition.getSd.getOutputFormat),
         serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
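
One design point in the `getPartitions` change above: the database location is resolved once per call, and only when the table location is relative, so tables that already carry an absolute URI pay no extra metastore round trip. A self-contained sketch of that `Option` pipeline, where `tableLocation` and `fetchDatabaseLocation` are hypothetical stand-ins for the `shim.getDataLocation` and `client.getDatabase` calls:

```scala
import java.net.URI

object FallbackUriSketch {
  def tableLocation: Option[String] = Some("viewfs://clusterA/user/hive/warehouse/t1")

  def fetchDatabaseLocation(): URI = {
    println("metastore round trip")
    new URI("viewfs://clusterA/user/hive/warehouse/")
  }

  def main(args: Array[String]): Unit = {
    // filterNot(_.isAbsolute) short-circuits the pipeline for absolute
    // locations, so fetchDatabaseLocation() is never invoked here.
    val absoluteUri = tableLocation.map(new URI(_))
      .filterNot(_.isAbsolute)
      .map(_ => fetchDatabaseLocation())
    // Prints None without ever hitting the "metastore".
    println(absoluteUri)
  }
}
```
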
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index e413e0ee73cb..7c36198c326b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -200,4 +200,19 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       assert(alteredTable.provider === Some("foo"))
     })
   }
+
+  test("SPARK-39203: Rewrite table location to absolute location based on database location") {
+    val tableLocation1 = CatalogUtils.stringToURI("/user/hive/warehouse/t1")
+    val tableLocation2 = CatalogUtils.stringToURI("viewfs://clusterB/user/hive/warehouse/t2")
+    val dbLocation = CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/")
+
+    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, Some(dbLocation))
+      .equals(CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/t1")))
+
+    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, None)
+      .equals(tableLocation1))
+
+    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation2, Some(dbLocation))
+      .equals(tableLocation2))
+  }
 }
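
The three assertions hinge on `java.net.URI.isAbsolute`: a bare path has no scheme and gets rewritten, while a scheme-qualified location is returned unchanged even when a database location is supplied. A quick standalone check of that property, assuming `CatalogUtils.stringToURI` keeps bare paths scheme-less (it builds the URI via Hadoop's `Path`), so plain `java.net.URI` exhibits the same distinction:

```scala
import java.net.URI

object IsAbsoluteCheck {
  def main(args: Array[String]): Unit = {
    // No scheme: relative, so eligible for rewriting against the database URI.
    println(new URI("/user/hive/warehouse/t1").isAbsolute)                  // false
    // Scheme and authority present: absolute, so left untouched.
    println(new URI("viewfs://clusterB/user/hive/warehouse/t2").isAbsolute) // true
  }
}
```
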