From 3aa1ffe109284c67513c792df1680b6dca19c77c Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sat, 21 May 2022 18:01:32 +0800
Subject: [PATCH 1/7] SPARK-39203: Rewrite table location to absolute location
 based on database location

---
 .../spark/sql/hive/HiveExternalCatalog.scala  | 24 ++++++++++++++++---
 .../sql/hive/client/HiveClientImpl.scala      | 22 ++++++++++++-----
 .../sql/hive/HiveExternalCatalogSuite.scala   | 16 +++++++++++++
 3 files changed, 53 insertions(+), 9 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index fefa032d3510..6c38dfa9e70e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -842,10 +842,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // source tables. Here we set the table location to `locationUri` field and filter out the
     // path option in storage properties, to avoid exposing this concept externally.
     val storageWithLocation = {
-      val tableLocation = getLocationFromStorageProps(table)
+      val tableLocation = getLocationFromStorageProps(table).map { loc =>
+        HiveExternalCatalog.toAbsoluteURI(
+          CatalogUtils.stringToURI(loc), table.storage.locationUri)
+      }
       // We pass None as `newPath` here, to remove the path option in storage properties.
-      updateLocationInStorageProps(table, newPath = None).copy(
-        locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
+      updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation)
     }
     val storageWithoutHiveGeneratedProperties = storageWithLocation.copy(properties =
       storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
@@ -1432,4 +1434,20 @@ object HiveExternalCatalog {
       isHiveCompatibleDataType(m.keyType) && isHiveCompatibleDataType(m.valueType)
     case _ => true
   }
+
+  /**
+   * Rewrite uri to absolute location. For example:
+   * uri: /user/hive/warehouse/test_table
+   * parentUri: viewfs://clusterA/user/hive/warehouse/
+   * The result is: viewfs://clusterA/user/hive/warehouse/test_table
+   */
+  private[spark] def toAbsoluteURI(uri: URI, parentUri: Option[URI]): URI = {
+    if (!uri.isAbsolute && parentUri.exists(_.isAbsolute)) {
+      val pUri = parentUri.get
+      new URI(pUri.getScheme, pUri.getUserInfo, pUri.getHost, pUri.getPort, uri.getPath,
+        uri.getQuery, uri.getFragment)
+    } else {
+      uri
+    }
+  }
 }

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index d70ac781c039..b9d0931a414c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -518,7 +518,15 @@ private[hive] class HiveClientImpl(
       createTime = h.getTTable.getCreateTime.toLong * 1000,
       lastAccessTime = h.getLastAccessTime.toLong * 1000,
       storage = CatalogStorageFormat(
-        locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI),
+        locationUri = shim.getDataLocation(h).map { location =>
+          val tableUri = CatalogUtils.stringToURI(location)
+          val dbUri = if (!tableUri.isAbsolute) {
+            Some(CatalogUtils.stringToURI(client.getDatabase(h.getDbName).getLocationUri))
+          } else {
+            None
+          }
+          HiveExternalCatalog.toAbsoluteURI(tableUri, dbUri)
+        },
         // To avoid ClassNotFound exception, we try our best to not get the format class, but get
         // the class name directly. However, for non-native tables, there is no interface to get
         // the format class name, so we may still throw ClassNotFound in this case.
@@ -731,7 +739,7 @@ private[hive] class HiveClientImpl(
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
     val hiveTable = toHiveTable(table, Some(userName))
     val hivePartition = shim.getPartition(client, hiveTable, spec.asJava, false)
-    Option(hivePartition).map(fromHivePartition)
+    Option(hivePartition).map(fromHivePartition(_, table))
   }
 
   override def getPartitions(
@@ -753,7 +761,8 @@ private[hive] class HiveClientImpl(
         assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
         s
       }
-    val parts = shim.getPartitions(client, hiveTable, partSpec.asJava).map(fromHivePartition)
+    val parts = shim.getPartitions(client, hiveTable, partSpec.asJava)
+      .map(fromHivePartition(_, convertHiveTableToCatalogTable(hiveTable)))
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts.toSeq
   }
@@ -763,7 +772,7 @@ private[hive] class HiveClientImpl(
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
     val hiveTable = toHiveTable(table, Some(userName))
     val parts = shim.getPartitionsByFilter(client, hiveTable, predicates, table)
-      .map(fromHivePartition)
+      .map(fromHivePartition(_, table))
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
   }
@@ -1144,7 +1153,7 @@ private[hive] object HiveClientImpl extends Logging {
   /**
    * Build the native partition metadata from Hive's Partition.
    */
-  def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
+  def fromHivePartition(hp: HivePartition, table: CatalogTable): CatalogTablePartition = {
     val apiPartition = hp.getTPartition
     val properties: Map[String, String] = if (hp.getParameters != null) {
       hp.getParameters.asScala.toMap
@@ -1154,7 +1163,8 @@ private[hive] object HiveClientImpl extends Logging {
     CatalogTablePartition(
       spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
       storage = CatalogStorageFormat(
-        locationUri = Option(CatalogUtils.stringToURI(apiPartition.getSd.getLocation)),
+        locationUri = Option(HiveExternalCatalog.toAbsoluteURI(
+          CatalogUtils.stringToURI(apiPartition.getSd.getLocation), Some(table.location))),
         inputFormat = Option(apiPartition.getSd.getInputFormat),
         outputFormat = Option(apiPartition.getSd.getOutputFormat),
         serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index e413e0ee73cb..2d53aa6d98b3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -200,4 +200,20 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       assert(alteredTable.provider === Some("foo"))
     })
   }
+
+  test("SPARK-39203: Rewrite table location to absolute location based on database location") {
+    val tableLocation1 = CatalogUtils.stringToURI("/user/hive/warehouse/t1")
+    val tableLocation2 = CatalogUtils.stringToURI("viewfs://clusterB/user/hive/warehouse/t2")
+
+    val dbLocation = CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/")
+
+    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, Some(dbLocation))
+      .equals(CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/t1")))
+
+    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, None)
+      .equals(tableLocation1))
+
+    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation2, Some(dbLocation))
+      .equals(tableLocation2))
+  }
 }

From ef049b9e2c06a11354622405270d9a1d76ddcdc6 Mon Sep 17 00:00:00 2001
From: yumwang
Date: Thu, 10 Sep 2020 11:07:17 +0800
Subject: [PATCH 2/7] fix

---
 .../scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 6c38dfa9e70e..8e4d64edccf6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 import java.lang.reflect.InvocationTargetException
+import java.net.URI
 import java.util
 import java.util.Locale
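Patches 1 and 2 together introduce the `toAbsoluteURI` helper (and the `java.net.URI` import it needs). A standalone sketch of what the helper computes, runnable on its own in a Scala REPL — the values are borrowed from the test above, and nothing in this snippet is part of the patch itself:

```scala
import java.net.URI

// Mirror of the helper added in patch 1: copy scheme/user-info/host/port from
// the parent URI onto a scheme-less child URI; leave absolute URIs untouched.
def toAbsoluteURI(uri: URI, parentUri: Option[URI]): URI = {
  if (!uri.isAbsolute && parentUri.exists(_.isAbsolute)) {
    val pUri = parentUri.get
    new URI(pUri.getScheme, pUri.getUserInfo, pUri.getHost, pUri.getPort, uri.getPath,
      uri.getQuery, uri.getFragment)
  } else {
    uri
  }
}

val db = new URI("viewfs://clusterA/user/hive/warehouse/")
toAbsoluteURI(new URI("/user/hive/warehouse/t1"), Some(db))
// => viewfs://clusterA/user/hive/warehouse/t1
toAbsoluteURI(new URI("viewfs://clusterB/user/hive/warehouse/t2"), Some(db))
// => viewfs://clusterB/user/hive/warehouse/t2  (already absolute: unchanged)
toAbsoluteURI(new URI("/user/hive/warehouse/t1"), None)
// => /user/hive/warehouse/t1                   (no parent to borrow from)
```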
From ea7e42c8e97d77d25fec735c7a74fdd5be4a84e8 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Tue, 24 May 2022 14:51:41 +0800
Subject: [PATCH 3/7] Simplify

---
 .../apache/spark/sql/hive/HiveExternalCatalog.scala | 13 ++++++-------
 .../spark/sql/hive/client/HiveClientImpl.scala      | 10 +++++-----
 .../spark/sql/hive/HiveExternalCatalogSuite.scala   |  9 ++-------
 3 files changed, 13 insertions(+), 19 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8e4d64edccf6..e869b50793aa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -844,8 +844,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // path option in storage properties, to avoid exposing this concept externally.
     val storageWithLocation = {
       val tableLocation = getLocationFromStorageProps(table).map { loc =>
-        HiveExternalCatalog.toAbsoluteURI(
-          CatalogUtils.stringToURI(loc), table.storage.locationUri)
+        val uri = CatalogUtils.stringToURI(loc)
+        table.storage.locationUri.map(HiveExternalCatalog.toAbsoluteURI(uri, _)).getOrElse(uri)
       }
       // We pass None as `newPath` here, to remove the path option in storage properties.
       updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation)
@@ -1442,11 +1442,10 @@ object HiveExternalCatalog {
    * parentUri: viewfs://clusterA/user/hive/warehouse/
    * The result is: viewfs://clusterA/user/hive/warehouse/test_table
    */
-  private[spark] def toAbsoluteURI(uri: URI, parentUri: Option[URI]): URI = {
-    if (!uri.isAbsolute && parentUri.exists(_.isAbsolute)) {
-      val pUri = parentUri.get
-      new URI(pUri.getScheme, pUri.getUserInfo, pUri.getHost, pUri.getPort, uri.getPath,
-        uri.getQuery, uri.getFragment)
+  private[spark] def toAbsoluteURI(uri: URI, parentUri: URI): URI = {
+    if (!uri.isAbsolute && parentUri.isAbsolute) {
+      new URI(parentUri.getScheme, parentUri.getUserInfo, parentUri.getHost, parentUri.getPort,
+        uri.getPath, uri.getQuery, uri.getFragment)
     } else {
       uri
     }

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index b9d0931a414c..03ee1651f702 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -520,12 +520,12 @@ private[hive] class HiveClientImpl(
       storage = CatalogStorageFormat(
         locationUri = shim.getDataLocation(h).map { location =>
           val tableUri = CatalogUtils.stringToURI(location)
-          val dbUri = if (!tableUri.isAbsolute) {
-            Some(CatalogUtils.stringToURI(client.getDatabase(h.getDbName).getLocationUri))
+          if (!tableUri.isAbsolute) {
+            val dbUri = CatalogUtils.stringToURI(client.getDatabase(h.getDbName).getLocationUri)
+            HiveExternalCatalog.toAbsoluteURI(tableUri, dbUri)
           } else {
-            None
+            tableUri
           }
-          HiveExternalCatalog.toAbsoluteURI(tableUri, dbUri)
         },
         // To avoid ClassNotFound exception, we try our best to not get the format class, but get
         // the class name directly. However, for non-native tables, there is no interface to get
@@ -1164,7 +1164,7 @@ private[hive] object HiveClientImpl extends Logging {
     spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
     storage = CatalogStorageFormat(
       locationUri = Option(HiveExternalCatalog.toAbsoluteURI(
-        CatalogUtils.stringToURI(apiPartition.getSd.getLocation), Some(table.location))),
+        CatalogUtils.stringToURI(apiPartition.getSd.getLocation), table.location)),
       inputFormat = Option(apiPartition.getSd.getInputFormat),
       outputFormat = Option(apiPartition.getSd.getOutputFormat),
       serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 2d53aa6d98b3..5858a8b40ffa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -204,16 +204,11 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
   test("SPARK-39203: Rewrite table location to absolute location based on database location") {
     val tableLocation1 = CatalogUtils.stringToURI("/user/hive/warehouse/t1")
     val tableLocation2 = CatalogUtils.stringToURI("viewfs://clusterB/user/hive/warehouse/t2")
-
     val dbLocation = CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/")
 
-    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, Some(dbLocation))
+    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, dbLocation)
       .equals(CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/t1")))
-
-    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, None)
-      .equals(tableLocation1))
-
-    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation2, Some(dbLocation))
+    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation2, dbLocation)
      .equals(tableLocation2))
   }
 }
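A side note on patch 3's simplified signature, not something the patch itself relies on: for the inputs involved here — a parent with scheme and authority, a child that is a bare absolute path — `java.net.URI#resolve` computes the same result as the explicit seven-argument constructor, per RFC 3986 reference resolution. The constructor form simply makes the component copying explicit:

```scala
import java.net.URI

// Assumed-equivalent alternative (observation only, not used by the patch):
// resolving an absolute-path reference keeps the base URI's scheme and
// authority and takes the path from the reference.
val db = new URI("viewfs://clusterA/user/hive/warehouse/")
val resolved = db.resolve(new URI("/user/hive/warehouse/t1"))
// resolved == new URI("viewfs://clusterA/user/hive/warehouse/t1")
```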
From 2a3e5111edd34a644ae09fad0df1df0cc0e90b07 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Fri, 27 May 2022 14:11:50 +0800
Subject: [PATCH 4/7] Fix

---
 .../spark/sql/hive/HiveExternalCatalog.scala  | 20 +++++++------
 .../sql/hive/client/HiveClientImpl.scala      | 28 +++++++++++--------
 .../sql/hive/HiveExternalCatalogSuite.scala   |  8 ++++--
 3 files changed, 35 insertions(+), 21 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index e869b50793aa..a84262b1c04f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -843,9 +843,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // source tables. Here we set the table location to `locationUri` field and filter out the
     // path option in storage properties, to avoid exposing this concept externally.
     val storageWithLocation = {
-      val tableLocation = getLocationFromStorageProps(table).map { loc =>
-        val uri = CatalogUtils.stringToURI(loc)
-        table.storage.locationUri.map(HiveExternalCatalog.toAbsoluteURI(uri, _)).getOrElse(uri)
+      val tableLocation = getLocationFromStorageProps(table).map { path =>
+        toAbsoluteURI(CatalogUtils.stringToURI(path), table.storage.locationUri)
       }
       // We pass None as `newPath` here, to remove the path option in storage properties.
       updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation)
@@ -1442,12 +1442,17 @@ object HiveExternalCatalog {
    * parentUri: viewfs://clusterA/user/hive/warehouse/
    * The result is: viewfs://clusterA/user/hive/warehouse/test_table
    */
-  private[spark] def toAbsoluteURI(uri: URI, parentUri: URI): URI = {
-    if (!uri.isAbsolute && parentUri.isAbsolute) {
-      new URI(parentUri.getScheme, parentUri.getUserInfo, parentUri.getHost, parentUri.getPort,
-        uri.getPath, uri.getQuery, uri.getFragment)
-    } else {
+  private[spark] def toAbsoluteURI(uri: URI, parentUri: Option[URI]): URI = {
+    if (uri.isAbsolute) {
       uri
+    } else {
+      parentUri match {
+        case Some(pUri) if pUri.isAbsolute =>
+          new URI(pUri.getScheme, pUri.getUserInfo, pUri.getHost, pUri.getPort,
+            uri.getPath, uri.getQuery, uri.getFragment)
+        case _ =>
+          uri
+      }
     }
   }
 }

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 03ee1651f702..396d730d0a64 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
 import java.io.PrintStream
 import java.lang.{Iterable => JIterable}
 import java.lang.reflect.InvocationTargetException
+import java.net.URI
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.{Locale, Map => JMap}
 import java.util.concurrent.TimeUnit._
@@ -518,14 +519,14 @@ private[hive] class HiveClientImpl(
       createTime = h.getTTable.getCreateTime.toLong * 1000,
       lastAccessTime = h.getLastAccessTime.toLong * 1000,
       storage = CatalogStorageFormat(
-        locationUri = shim.getDataLocation(h).map { location =>
-          val tableUri = CatalogUtils.stringToURI(location)
-          if (!tableUri.isAbsolute) {
-            val dbUri = CatalogUtils.stringToURI(client.getDatabase(h.getDbName).getLocationUri)
-            HiveExternalCatalog.toAbsoluteURI(tableUri, dbUri)
+        locationUri = shim.getDataLocation(h).map { loc =>
+          val tableUri = CatalogUtils.stringToURI(loc)
+          val parentUri = if (tableUri.isAbsolute) {
+            None
           } else {
-            tableUri
+            Some(CatalogUtils.stringToURI(client.getDatabase(h.getDbName).getLocationUri))
           }
+          HiveExternalCatalog.toAbsoluteURI(tableUri, parentUri)
         },
         // To avoid ClassNotFound exception, we try our best to not get the format class, but get
         // the class name directly. However, for non-native tables, there is no interface to get
@@ -739,7 +740,7 @@ private[hive] class HiveClientImpl(
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
     val hiveTable = toHiveTable(table, Some(userName))
     val hivePartition = shim.getPartition(client, hiveTable, spec.asJava, false)
-    Option(hivePartition).map(fromHivePartition(_, table))
+    Option(hivePartition).map(fromHivePartition(_, table.storage.locationUri))
   }
 
   override def getPartitions(
@@ -761,8 +762,13 @@ private[hive] class HiveClientImpl(
         assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
         s
       }
+    val parentUri = if (hiveTable.getDataLocation.toUri.isAbsolute) {
+      None
+    } else {
+      convertHiveTableToCatalogTable(hiveTable).storage.locationUri
+    }
     val parts = shim.getPartitions(client, hiveTable, partSpec.asJava)
-      .map(fromHivePartition(_, convertHiveTableToCatalogTable(hiveTable)))
+      .map(fromHivePartition(_, parentUri))
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts.toSeq
   }
@@ -772,7 +778,7 @@ private[hive] class HiveClientImpl(
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
     val hiveTable = toHiveTable(table, Some(userName))
     val parts = shim.getPartitionsByFilter(client, hiveTable, predicates, table)
-      .map(fromHivePartition(_, table))
+      .map(fromHivePartition(_, table.storage.locationUri))
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
   }
@@ -1153,7 +1159,7 @@ private[hive] object HiveClientImpl extends Logging {
   /**
    * Build the native partition metadata from Hive's Partition.
    */
-  def fromHivePartition(hp: HivePartition, table: CatalogTable): CatalogTablePartition = {
+  def fromHivePartition(hp: HivePartition, parentUri: Option[URI]): CatalogTablePartition = {
     val apiPartition = hp.getTPartition
     val properties: Map[String, String] = if (hp.getParameters != null) {
       hp.getParameters.asScala.toMap
@@ -1164,7 +1170,7 @@ private[hive] object HiveClientImpl extends Logging {
     spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
     storage = CatalogStorageFormat(
       locationUri = Option(HiveExternalCatalog.toAbsoluteURI(
-        CatalogUtils.stringToURI(apiPartition.getSd.getLocation), table.location)),
+        CatalogUtils.stringToURI(apiPartition.getSd.getLocation), parentUri)),
       inputFormat = Option(apiPartition.getSd.getInputFormat),
       outputFormat = Option(apiPartition.getSd.getOutputFormat),
       serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 5858a8b40ffa..7c36198c326b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -206,9 +206,13 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     val tableLocation2 = CatalogUtils.stringToURI("viewfs://clusterB/user/hive/warehouse/t2")
     val dbLocation = CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/")
 
-    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, dbLocation)
+    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, Some(dbLocation))
       .equals(CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/t1")))
-    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation2, dbLocation)
+
+    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, None)
+      .equals(tableLocation1))
+
+    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation2, Some(dbLocation))
       .equals(tableLocation2))
   }
 }
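With patch 4 the partition call sites pass the table's location (an `Option[URI]`) down to `fromHivePartition`, so partition locations get the same treatment as table locations. A schematic example with invented paths, assuming the table location has already been made absolute:

```scala
import java.net.URI

// Invented paths: a partition whose metastore location is scheme-less
// inherits scheme and authority from the table location it belongs to.
val tableLocation = new URI("viewfs://clusterA/user/hive/warehouse/t1")
val partLocation = new URI("/user/hive/warehouse/t1/dt=20220527")
val resolved = HiveExternalCatalog.toAbsoluteURI(partLocation, Some(tableLocation))
// resolved == new URI("viewfs://clusterA/user/hive/warehouse/t1/dt=20220527")
```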
From 23c7e68e45b61b22693348c1332d5d47ddc7c2eb Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Fri, 27 May 2022 15:36:05 +0800
Subject: [PATCH 5/7] Address comments

---
 .../spark/sql/hive/HiveExternalCatalog.scala    | 16 ++++++----------
 .../spark/sql/hive/client/HiveClientImpl.scala  | 14 +++++++-------
 2 files changed, 13 insertions(+), 17 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index a84262b1c04f..cf27d779e21f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1441,17 +1441,13 @@ object HiveExternalCatalog {
    * parentUri: viewfs://clusterA/user/hive/warehouse/
    * The result is: viewfs://clusterA/user/hive/warehouse/test_table
    */
-  private[spark] def toAbsoluteURI(uri: URI, parentUri: Option[URI]): URI = {
-    if (uri.isAbsolute) {
-      uri
+  private[spark] def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = {
+    if (!uri.isAbsolute && absoluteUri.isDefined) {
+      val aUri = absoluteUri.get
+      new URI(aUri.getScheme, aUri.getUserInfo, aUri.getHost, aUri.getPort,
+        uri.getPath, uri.getQuery, uri.getFragment)
     } else {
-      parentUri match {
-        case Some(pUri) if pUri.isAbsolute =>
-          new URI(pUri.getScheme, pUri.getUserInfo, pUri.getHost, pUri.getPort,
-            uri.getPath, uri.getQuery, uri.getFragment)
-        case _ =>
-          uri
-      }
+      uri
     }
   }
 }

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 396d730d0a64..e6c06c85b867 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -521,12 +521,12 @@ private[hive] class HiveClientImpl(
       storage = CatalogStorageFormat(
         locationUri = shim.getDataLocation(h).map { loc =>
           val tableUri = CatalogUtils.stringToURI(loc)
-          val parentUri = if (tableUri.isAbsolute) {
+          val absoluteUri = if (tableUri.isAbsolute) {
             None
           } else {
             Some(CatalogUtils.stringToURI(client.getDatabase(h.getDbName).getLocationUri))
           }
-          HiveExternalCatalog.toAbsoluteURI(tableUri, parentUri)
+          HiveExternalCatalog.toAbsoluteURI(tableUri, absoluteUri)
         },
         // To avoid ClassNotFound exception, we try our best to not get the format class, but get
         // the class name directly. However, for non-native tables, there is no interface to get
@@ -762,13 +762,13 @@ private[hive] class HiveClientImpl(
         assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
         s
       }
-    val parentUri = if (hiveTable.getDataLocation.toUri.isAbsolute) {
+    val absoluteUri = if (hiveTable.getDataLocation.toUri.isAbsolute) {
       None
     } else {
-      convertHiveTableToCatalogTable(hiveTable).storage.locationUri
+      Some(CatalogUtils.stringToURI(client.getDatabase(hiveTable.getDbName).getLocationUri))
     }
     val parts = shim.getPartitions(client, hiveTable, partSpec.asJava)
-      .map(fromHivePartition(_, parentUri))
+      .map(fromHivePartition(_, absoluteUri))
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts.toSeq
   }
@@ -1159,7 +1159,7 @@ private[hive] object HiveClientImpl extends Logging {
   /**
    * Build the native partition metadata from Hive's Partition.
    */
-  def fromHivePartition(hp: HivePartition, parentUri: Option[URI]): CatalogTablePartition = {
+  def fromHivePartition(hp: HivePartition, absoluteUri: Option[URI]): CatalogTablePartition = {
     val apiPartition = hp.getTPartition
     val properties: Map[String, String] = if (hp.getParameters != null) {
       hp.getParameters.asScala.toMap
@@ -1170,7 +1170,7 @@ private[hive] object HiveClientImpl extends Logging {
     spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
     storage = CatalogStorageFormat(
       locationUri = Option(HiveExternalCatalog.toAbsoluteURI(
-        CatalogUtils.stringToURI(apiPartition.getSd.getLocation), parentUri)),
+        CatalogUtils.stringToURI(apiPartition.getSd.getLocation), absoluteUri)),
       inputFormat = Option(apiPartition.getSd.getInputFormat),
       outputFormat = Option(apiPartition.getSd.getOutputFormat),
       serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
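Patch 5 drops the `isAbsolute` check on the second argument, so `toAbsoluteURI` now trusts any `Some(...)` it is given; the invariant moves to the callers, which only build a fallback URI when the child is relative. A hypothetical helper — `parentFor` is not in the patch — spelling out that caller-side contract:

```scala
import java.net.URI

// Hypothetical helper, not part of the patch: supply a fallback only when
// the child URI is relative, and only if the fallback itself is absolute.
// The by-name parameter avoids the database lookup when it is not needed.
def parentFor(childUri: URI, fallback: => URI): Option[URI] =
  if (childUri.isAbsolute) None else Some(fallback).filter(_.isAbsolute)
```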
From 6b944193dcf1b9a33123a3e959ebbe5cf4e49c90 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sat, 28 May 2022 11:06:11 +0800
Subject: [PATCH 6/7] fix:

[info] - 0.12: getPartitionNames(catalogTable) (74 milliseconds)
[info] org.apache.spark.sql.hive.client.HiveClientSuites *** ABORTED *** (13 seconds, 434 milliseconds)
[info]   java.lang.NoSuchMethodError: org.apache.hadoop.hive.ql.metadata.Table.getDataLocation()Lorg/apache/hadoop/fs/Path;
[info]   at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitions$3(HiveClientImpl.scala:765)
---
 .../spark/sql/hive/HiveExternalCatalog.scala  | 10 ++++------
 .../sql/hive/client/HiveClientImpl.scala      | 19 +++++++------------
 2 files changed, 11 insertions(+), 18 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index cf27d779e21f..5a3eb420fb9f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1435,12 +1435,10 @@ object HiveExternalCatalog {
       isHiveCompatibleDataType(m.keyType) && isHiveCompatibleDataType(m.valueType)
     case _ => true
   }
 
-  /**
-   * Rewrite uri to absolute location. For example:
-   * uri: /user/hive/warehouse/test_table
-   * parentUri: viewfs://clusterA/user/hive/warehouse/
-   * The result is: viewfs://clusterA/user/hive/warehouse/test_table
-   */
+  // Rewrite uri to absolute location. For example:
+  // uri: /user/hive/warehouse/test_table
+  // absoluteUri: viewfs://clusterA/user/hive/warehouse/
+  // The result is: viewfs://clusterA/user/hive/warehouse/test_table
   private[spark] def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = {
     if (!uri.isAbsolute && absoluteUri.isDefined) {

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index e6c06c85b867..62f2c13d8ce8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -53,6 +53,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils.stringToURI
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -520,12 +521,9 @@ private[hive] class HiveClientImpl(
       lastAccessTime = h.getLastAccessTime.toLong * 1000,
       storage = CatalogStorageFormat(
         locationUri = shim.getDataLocation(h).map { loc =>
-          val tableUri = CatalogUtils.stringToURI(loc)
-          val absoluteUri = if (tableUri.isAbsolute) {
-            None
-          } else {
-            Some(CatalogUtils.stringToURI(client.getDatabase(h.getDbName).getLocationUri))
-          }
+          val tableUri = stringToURI(loc)
+          val absoluteUri = Option(tableUri).filter(!_.isAbsolute)
+            .map(_ => stringToURI(client.getDatabase(h.getDbName).getLocationUri))
           HiveExternalCatalog.toAbsoluteURI(tableUri, absoluteUri)
         },
         // To avoid ClassNotFound exception, we try our best to not get the format class, but get
@@ -762,11 +760,8 @@ private[hive] class HiveClientImpl(
         assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
         s
       }
-    val absoluteUri = if (hiveTable.getDataLocation.toUri.isAbsolute) {
-      None
-    } else {
-      Some(CatalogUtils.stringToURI(client.getDatabase(hiveTable.getDbName).getLocationUri))
-    }
+    val absoluteUri = shim.getDataLocation(hiveTable).map(stringToURI).filter(!_.isAbsolute)
+      .map(_ => stringToURI(client.getDatabase(hiveTable.getDbName).getLocationUri))
     val parts = shim.getPartitions(client, hiveTable, partSpec.asJava)
       .map(fromHivePartition(_, absoluteUri))
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
@@ -1170,7 +1165,7 @@ private[hive] object HiveClientImpl extends Logging {
     spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
     storage = CatalogStorageFormat(
       locationUri = Option(HiveExternalCatalog.toAbsoluteURI(
-        CatalogUtils.stringToURI(apiPartition.getSd.getLocation), absoluteUri)),
+        stringToURI(apiPartition.getSd.getLocation), absoluteUri)),
      inputFormat = Option(apiPartition.getSd.getInputFormat),
      outputFormat = Option(apiPartition.getSd.getOutputFormat),
      serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
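Patch 6 replaces the direct `Table.getDataLocation` call — the source of the `NoSuchMethodError` quoted in the commit message, since its return type differs across Hive versions — with the version-agnostic `shim.getDataLocation`, and rewrites the branching as an `Option` pipeline. The pipeline, spelled out with invented values:

```scala
import java.net.URI

// The Option pipeline from patch 6, with invented values: keep the table URI
// only while it is relative, then swap it for the database URI, so the
// database location is only looked up when a rewrite is actually needed.
def stringToURI(s: String): URI = new URI(s) // stand-in for CatalogUtils.stringToURI
val tableUri = stringToURI("/user/hive/warehouse/t1")
val absoluteUri = Option(tableUri)
  .filter(!_.isAbsolute) // becomes None when the location is already absolute
  .map(_ => stringToURI("viewfs://clusterA/user/hive/warehouse/")) // db location
// absoluteUri == Some(viewfs://clusterA/user/hive/warehouse/)
```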
From facef7d6f683bf7d008594333e0718739bd207b6 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Tue, 31 May 2022 12:30:53 +0800
Subject: [PATCH 7/7] fix

---
 .../apache/spark/sql/hive/HiveExternalCatalog.scala | 13 +++++++++----
 .../spark/sql/hive/client/HiveClientImpl.scala      |  7 +++++--
 2 files changed, 14 insertions(+), 6 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 5a3eb420fb9f..32e453911799 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -844,6 +844,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // path option in storage properties, to avoid exposing this concept externally.
     val storageWithLocation = {
       val tableLocation = getLocationFromStorageProps(table).map { path =>
+        // Before SPARK-19257, data source tables were created with non-absolute URIs,
+        // so Spark cannot read them across HDFS clusters. Rewrite the table path to an
+        // absolute URI based on the location URI, which has already been rewritten by
+        // HiveClientImpl.convertHiveTableToCatalogTable, to fix this issue.
         toAbsoluteURI(CatalogUtils.stringToURI(path), table.storage.locationUri)
       }
       // We pass None as `newPath` here, to remove the path option in storage properties.
@@ -1439,10 +1439,11 @@ object HiveExternalCatalog {
     case _ => true
   }
 
-  // Rewrite uri to absolute location. For example:
-  // uri: /user/hive/warehouse/test_table
-  // absoluteUri: viewfs://clusterA/user/hive/warehouse/
-  // The result is: viewfs://clusterA/user/hive/warehouse/test_table
+  /** Rewrite uri to absolute location. For example:
+   * uri: /user/hive/warehouse/test_table
+   * absoluteUri: viewfs://clusterA/user/hive/warehouse/
+   * The result is: viewfs://clusterA/user/hive/warehouse/test_table
+   */
   private[spark] def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = {
     if (!uri.isAbsolute && absoluteUri.isDefined) {
       val aUri = absoluteUri.get

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 62f2c13d8ce8..7047166413f7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -522,7 +522,10 @@ private[hive] class HiveClientImpl(
       storage = CatalogStorageFormat(
         locationUri = shim.getDataLocation(h).map { loc =>
           val tableUri = stringToURI(loc)
+          // Before SPARK-19257, data source tables were created with non-absolute URIs,
+          // so Spark cannot read them across HDFS clusters. Rewrite the table location
+          // to an absolute URI based on the database URI to fix this issue.
-          val absoluteUri = Option(tableUri).filter(!_.isAbsolute)
+          val absoluteUri = Option(tableUri).filterNot(_.isAbsolute)
             .map(_ => stringToURI(client.getDatabase(h.getDbName).getLocationUri))
           HiveExternalCatalog.toAbsoluteURI(tableUri, absoluteUri)
         },
@@ -760,7 +763,7 @@ private[hive] class HiveClientImpl(
         assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
         s
       }
-    val absoluteUri = shim.getDataLocation(hiveTable).map(stringToURI).filter(!_.isAbsolute)
+    val absoluteUri = shim.getDataLocation(hiveTable).map(stringToURI).filterNot(_.isAbsolute)
       .map(_ => stringToURI(client.getDatabase(hiveTable.getDbName).getLocationUri))
     val parts = shim.getPartitions(client, hiveTable, partSpec.asJava)
       .map(fromHivePartition(_, absoluteUri))
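Taken together, the series makes a legacy metastore entry that stored only a path surface as a fully qualified URI, so a reader on another cluster resolves it against the owning cluster rather than its own fs.defaultFS. A schematic of the end-to-end effect, with invented cluster names and paths:

```scala
// Invented values; HiveExternalCatalog.toAbsoluteURI is the helper added in
// this series. A pre-SPARK-19257 metastore value that stored only a path now
// round-trips to a fully qualified location.
val storedPath = "/user/hive/warehouse/t1" // legacy metastore value
val dbLocation = "viewfs://clusterA/user/hive/warehouse/" // database location
val visible = HiveExternalCatalog.toAbsoluteURI(
  CatalogUtils.stringToURI(storedPath),
  Some(CatalogUtils.stringToURI(dbLocation)))
// visible == CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/t1")
```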