diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 90e8f9b9d0ee..242756ff205d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 import java.lang.reflect.InvocationTargetException
-import java.net.URI
 import java.util
 import java.util.Locale
 
@@ -851,15 +850,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // source tables. Here we set the table location to `locationUri` field and filter out the
     // path option in storage properties, to avoid exposing this concept externally.
     val storageWithLocation = {
-      val tableLocation = getLocationFromStorageProps(table).map { path =>
-        // Before SPARK-19257, created data source table does not use absolute uri.
-        // This makes Spark can't read these tables across HDFS clusters.
-        // Rewrite table path to absolute uri based on location uri (The location uri has been
-        // rewritten by HiveClientImpl.convertHiveTableToCatalogTable) to fix this issue.
-        toAbsoluteURI(CatalogUtils.stringToURI(path), table.storage.locationUri)
-      }
+      val tableLocation = getLocationFromStorageProps(table)
       // We pass None as `newPath` here, to remove the path option in storage properties.
-      updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation)
+      updateLocationInStorageProps(table, newPath = None).copy(
+        locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
     }
     val storageWithoutHiveGeneratedProperties = storageWithLocation.copy(properties =
       storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
@@ -1446,19 +1440,4 @@
       isHiveCompatibleDataType(m.keyType) && isHiveCompatibleDataType(m.valueType)
     case _ => true
   }
-
-  /** Rewrite uri to absolute location. For example:
-   * uri: /user/hive/warehouse/test_table
-   * absoluteUri: viewfs://clusterA/user/hive/warehouse/
-   * The result is: viewfs://clusterA/user/hive/warehouse/test_table
-   */
-  private[spark] def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = {
-    if (!uri.isAbsolute && absoluteUri.isDefined) {
-      val aUri = absoluteUri.get
-      new URI(aUri.getScheme, aUri.getUserInfo, aUri.getHost, aUri.getPort,
-        uri.getPath, uri.getQuery, uri.getFragment)
-    } else {
-      uri
-    }
-  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 92c3ca0ab3ed..becca8eae5ef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.client
 import java.io.PrintStream
 import java.lang.{Iterable => JIterable}
 import java.lang.reflect.InvocationTargetException
-import java.net.URI
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.{HashMap => JHashMap, Locale, Map => JMap}
 import java.util.concurrent.TimeUnit._
@@ -54,7 +53,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.catalog.CatalogUtils.stringToURI
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -539,21 +537,7 @@ private[hive] class HiveClientImpl(
       createTime = h.getTTable.getCreateTime.toLong * 1000,
       lastAccessTime = h.getLastAccessTime.toLong * 1000,
       storage = CatalogStorageFormat(
-        locationUri = shim.getDataLocation(h).map { loc =>
-          val tableUri = stringToURI(loc)
-          if (h.getTableType == HiveTableType.VIRTUAL_VIEW) {
-            // Data location of SQL view is useless. Do not qualify it even if it's present, as
-            // it can be an invalid path.
-            tableUri
-          } else {
-            // Before SPARK-19257, created data source table does not use absolute uri.
-            // This makes Spark can't read these tables across HDFS clusters.
-            // Rewrite table location to absolute uri based on database uri to fix this issue.
-            val absoluteUri = Option(tableUri).filterNot(_.isAbsolute)
-              .map(_ => stringToURI(client.getDatabase(h.getDbName).getLocationUri))
-            HiveExternalCatalog.toAbsoluteURI(tableUri, absoluteUri)
-          }
-        },
+        locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI),
         // To avoid ClassNotFound exception, we try our best to not get the format class, but get
         // the class name directly. However, for non-native tables, there is no interface to get
         // the format class name, so we may still throw ClassNotFound in this case.
@@ -793,8 +777,7 @@ private[hive] class HiveClientImpl(
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
     val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable]
     val hivePartition = shim.getPartition(client, hiveTable, spec.asJava, false)
-    Option(hivePartition)
-      .map(fromHivePartition(_, rawHiveTable.toCatalogTable.storage.locationUri))
+    Option(hivePartition).map(fromHivePartition)
   }
 
   override def getPartitions(
@@ -816,10 +799,7 @@ private[hive] class HiveClientImpl(
         assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
         s
     }
-    val absoluteUri = shim.getDataLocation(hiveTable).map(stringToURI).filterNot(_.isAbsolute)
-      .map(_ => stringToURI(client.getDatabase(hiveTable.getDbName).getLocationUri))
-    val parts = shim.getPartitions(client, hiveTable, partSpec.asJava)
-      .map(fromHivePartition(_, absoluteUri))
+    val parts = shim.getPartitions(client, hiveTable, partSpec.asJava).map(fromHivePartition)
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
   }
@@ -829,9 +809,8 @@ private[hive] class HiveClientImpl(
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
     val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable]
     hiveTable.setOwner(userName)
-    val parts =
-      shim.getPartitionsByFilter(client, hiveTable, predicates, rawHiveTable.toCatalogTable)
-        .map(fromHivePartition(_, rawHiveTable.toCatalogTable.storage.locationUri))
+    val parts = shim.getPartitionsByFilter(
+      client, hiveTable, predicates, rawHiveTable.toCatalogTable).map(fromHivePartition)
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
   }
@@ -1212,7 +1191,7 @@ private[hive] object HiveClientImpl extends Logging {
   /**
    * Build the native partition metadata from Hive's Partition.
    */
-  def fromHivePartition(hp: HivePartition, absoluteUri: Option[URI]): CatalogTablePartition = {
+  def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
     val apiPartition = hp.getTPartition
     val properties: Map[String, String] = if (hp.getParameters != null) {
       hp.getParameters.asScala.toMap
@@ -1222,8 +1201,7 @@
     CatalogTablePartition(
       spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
       storage = CatalogStorageFormat(
-        locationUri = Option(HiveExternalCatalog.toAbsoluteURI(
-          stringToURI(apiPartition.getSd.getLocation), absoluteUri)),
+        locationUri = Option(CatalogUtils.stringToURI(apiPartition.getSd.getLocation)),
         inputFormat = Option(apiPartition.getSd.getInputFormat),
         outputFormat = Option(apiPartition.getSd.getOutputFormat),
         serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 7c36198c326b..e413e0ee73cb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -200,19 +200,4 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       assert(alteredTable.provider === Some("foo"))
     })
   }
-
-  test("SPARK-39203: Rewrite table location to absolute location based on database location") {
-    val tableLocation1 = CatalogUtils.stringToURI("/user/hive/warehouse/t1")
-    val tableLocation2 = CatalogUtils.stringToURI("viewfs://clusterB/user/hive/warehouse/t2")
-    val dbLocation = CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/")
-
-    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, Some(dbLocation))
-      .equals(CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/t1")))
-
-    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, None)
-      .equals(tableLocation1))
-
-    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation2, Some(dbLocation))
-      .equals(tableLocation2))
-  }
 }
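
For reference, a minimal standalone sketch of the semantics this patch removes, assembled from the deleted `toAbsoluteURI` helper and its deleted test above. The script form and the `dbLocation` value name are illustrative, not part of the patch:

```scala
import java.net.URI

// Reproduction of the deleted helper: qualify a relative URI with the
// scheme/authority of `absoluteUri`; leave already-absolute URIs unchanged.
def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = {
  if (!uri.isAbsolute && absoluteUri.isDefined) {
    val aUri = absoluteUri.get
    new URI(aUri.getScheme, aUri.getUserInfo, aUri.getHost, aUri.getPort,
      uri.getPath, uri.getQuery, uri.getFragment)
  } else {
    uri
  }
}

val dbLocation = new URI("viewfs://clusterA/user/hive/warehouse/")

// A relative table location was qualified against the database location...
assert(toAbsoluteURI(new URI("/user/hive/warehouse/t1"), Some(dbLocation)) ==
  new URI("viewfs://clusterA/user/hive/warehouse/t1"))
// ...while an absolute location, or a missing database URI, was left as-is.
assert(toAbsoluteURI(new URI("viewfs://clusterB/user/hive/warehouse/t2"), Some(dbLocation)) ==
  new URI("viewfs://clusterB/user/hive/warehouse/t2"))
assert(toAbsoluteURI(new URI("/user/hive/warehouse/t1"), None) ==
  new URI("/user/hive/warehouse/t1"))
```

After this change, the restore paths in `HiveExternalCatalog` and `HiveClientImpl` simply pass the stored location through `CatalogUtils.stringToURI`, so relative locations are no longer qualified against the database URI at read time.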