diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b699707d85235..1c42948748a3e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -544,6 +544,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ENABLED =
+    buildConf("spark.sql.hive.metastorePartitionPruning.fallback.enabled")
+      .doc("When true, enable fallback to fetch all partitions if Hive metastore partition " +
+        "push down fails. This is applicable only if partition pruning is enabled (see " +
+        s"${HIVE_METASTORE_PARTITION_PRUNING.key}). Enabling this may degrade performance " +
+        "if there is a large number of partitions.")
+      .booleanConf
+      .createWithDefault(false)
+
   val HIVE_MANAGE_FILESOURCE_PARTITIONS =
     buildConf("spark.sql.hive.manageFilesourcePartitions")
       .doc("When true, enable metastore partition management for file source tables as well. " +
@@ -1697,6 +1706,9 @@ class SQLConf extends Serializable with Logging {
 
   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
+  def metastorePartitionPruningFallback: Boolean =
+    getConf(HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ENABLED)
+
   def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)
 
   def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index bc9d4cd7f4181..83adaa1fd3ca0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -746,34 +746,46 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
         getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
       } else {
         logDebug(s"Hive metastore filter is '$filter'.")
-        val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
-        // We should get this config value from the metaStore. otherwise hit SPARK-18681.
-        // To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by:
-        // val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean
-        val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname,
-          tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean
+        val shouldFallback = SQLConf.get.metastorePartitionPruningFallback
         try {
           // Hive may throw an exception when calling this method in some circumstances, such as
           // when filtering on a non-string partition column when the hive config key
           // hive.metastore.try.direct.sql is false
           getPartitionsByFilterMethod.invoke(hive, table, filter)
             .asInstanceOf[JArrayList[Partition]]
         } catch {
-          case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
-              !tryDirectSql =>
-            logWarning("Caught Hive MetaException attempting to get partition metadata by " +
-              "filter from Hive. Falling back to fetching all partition metadata, which will " +
-              "degrade performance. Modifying your Hive metastore configuration to set " +
-              s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
-            // HiveShim clients are expected to handle a superset of the requested partitions
-            getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
-          case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
-              tryDirectSql =>
-            throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
-              "metadata by filter from Hive. You can set the Spark configuration setting " +
-              s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to false to work around this " +
-              "problem, however this will result in degraded performance. Please report a bug: " +
-              "https://issues.apache.org/jira/browse/SPARK", ex)
+          case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
+            if (shouldFallback) {
+              val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
+              // We should get this config value from the metaStore. otherwise hit SPARK-18681.
+              // To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by
+              // val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean
+              val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname,
+                tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean
+              if (!tryDirectSql) {
+                logWarning("Caught Hive MetaException attempting to get partition metadata by " +
+                  "filter from Hive. Falling back to fetching all partition metadata, which will " +
+                  "degrade performance. Modifying your Hive metastore configuration to set " +
+                  s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
+              } else {
+                logWarning("Caught Hive MetaException attempting to get partition metadata " +
+                  "by filter from Hive. Hive metastore's direct SQL feature has been enabled, " +
+                  "but it is an optimistic optimization and not guaranteed to work. Falling back " +
+                  "to fetching all partition metadata, which will degrade performance (for the " +
+                  "current query). If you see this error consistently, you can set the Spark " +
+                  s"configuration setting ${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to " +
+                  "false as a work around, however this will result in degraded performance.", ex)
+              }
+              // HiveShim clients are expected to handle a superset of the requested partitions
+              getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
+            } else {
+              // Fallback mode has been disabled. Rethrow exception.
+              throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
+                "metadata from Hive. Fallback mechanism is not enabled. You can set " +
+                s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ENABLED.key} to true to " +
+                "fetch all partition metadata as a fallback mechanism, however this may " +
+                "result in degraded performance.", ex)
+            }
         }
       }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index fa9f753795f65..581af68cdc959 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -24,13 +24,17 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType}
 
 // TODO: Refactor this to `HivePartitionFilteringSuite`
 class HiveClientSuite(version: String)
-    extends HiveVersionSuite(version) with BeforeAndAfterAll {
+    extends HiveVersionSuite(version) with BeforeAndAfterAll with SQLHelper {
 
   private val tryDirectSqlKey = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname
+  private val partPruningKey = SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key
+  private val partPruningFallbackKey = SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ENABLED.key
 
   private val testPartitionCount = 3 * 24 * 4
 
@@ -79,12 +83,30 @@ class HiveClientSuite(version: String)
     client = init(true)
   }
 
-  test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") {
-    val client = init(false)
-    val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
-      Seq(attr("ds") === 20170101))
+  test(s"getPartitionsByFilter should return all partitions if the underlying call to the " +
+    s"metastore fails and $partPruningFallbackKey=true") {
+    withSQLConf(partPruningFallbackKey -> "true", partPruningKey -> "true") {
+      val client = init(false)
+      // tryDirectSql = false and a non-string partition filter will always fail. This condition
+      // is used to test if the fallback works
+      val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
+        Seq(attr("ds") === 20170101))
 
-    assert(filteredPartitions.size == testPartitionCount)
+      assert(filteredPartitions.size == testPartitionCount)
+    }
+  }
+
+  test(s"getPartitionsByFilter should fail if the underlying call to the metastore fails and " +
+    s"$partPruningFallbackKey=false") {
+    withSQLConf(partPruningFallbackKey -> "false", partPruningKey -> "true") {
+      val client = init(false)
+      // tryDirectSql = false and a non-string partition filter will always fail. This condition
+      // is used to test that the failure is propagated when the fallback is disabled
+      assertThrows[RuntimeException](
+        client.getPartitionsByFilter(client.getTable("default", "test"),
+          Seq(attr("ds") === 20170101))
+      )
+    }
   }
 
   test("getPartitionsByFilter: ds<=>20170101") {