@@ -544,6 +544,15 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ENABLED =
    buildConf("spark.sql.hive.metastorePartitionPruning.fallback.enabled")
      .doc("When true, enable fallback to fetch all partitions if Hive metastore partition " +
        "push down fails. This is applicable only if partition pruning is enabled (see " +
        s"${HIVE_METASTORE_PARTITION_PRUNING.key}). Enabling this may degrade performance " +
        "if there is a large number of partitions.")
      .booleanConf
      .createWithDefault(false)

  val HIVE_MANAGE_FILESOURCE_PARTITIONS =
    buildConf("spark.sql.hive.manageFilesourcePartitions")
      .doc("When true, enable metastore partition management for file source tables as well. " +
@@ -1697,6 +1706,9 @@ class SQLConf extends Serializable with Logging {

  def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)

  def metastorePartitionPruningFallback: Boolean =
    getConf(HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ENABLED)

  def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)

  def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
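A minimal sketch of how an application could opt in to the new flag, assuming a Hive-enabled SparkSession (the app name is illustrative and not part of this PR; note the fallback defaults to false):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: enable metastore partition pruning plus the new fallback flag.
val spark = SparkSession.builder()
  .appName("pruning-fallback-example") // hypothetical app name
  .enableHiveSupport()
  .config("spark.sql.hive.metastorePartitionPruning", "true")
  .config("spark.sql.hive.metastorePartitionPruning.fallback.enabled", "true")
  .getOrCreate()
```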
@@ -746,34 +746,43 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
      getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
    } else {
      logDebug(s"Hive metastore filter is '$filter'.")
      val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
      // We should get this config value from the metastore; otherwise we hit SPARK-18681.
      // To be compatible with hive-0.12 and hive-0.13, in the future we can achieve this by:
      // val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean
      val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname,
        tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean
Member: Ping @wangyum, too.

      val shouldFallback = SQLConf.get.metastorePartitionPruningFallback
      try {
        // Hive may throw an exception when calling this method in some circumstances, such as
        // when filtering on a non-string partition column when the hive config key
        // hive.metastore.try.direct.sql is false
        getPartitionsByFilterMethod.invoke(hive, table, filter)
          .asInstanceOf[JArrayList[Partition]]
      } catch {
        case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
            !tryDirectSql =>
          logWarning("Caught Hive MetaException attempting to get partition metadata by " +
            "filter from Hive. Falling back to fetching all partition metadata, which will " +
            "degrade performance. Modifying your Hive metastore configuration to set " +
            s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
          // HiveShim clients are expected to handle a superset of the requested partitions
          getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
        case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
            tryDirectSql =>
          throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
            "metadata by filter from Hive. You can set the Spark configuration setting " +
            s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to false to work around this " +
            "problem, however this will result in degraded performance. Please report a bug: " +
            "https://issues.apache.org/jira/browse/SPARK", ex)
        case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
Member: We should not blindly call getAllPartitions. This will be super slow. We should do some retries. It depends on the errors we got.

Author: @gatorsmile From the HMS side, the error is always the same "MetaException", and there is no way to tell a direct SQL error apart from a "not supported" error (unfortunately!). How do you propose we address this?

Contributor: Also, it's not blindly calling that API, right? It was already being called before if direct SQL was disabled. In the other case, it was just throwing an exception. So now, instead of erroring out, it will work, just more slowly than expected.

Unless there's some retry at a higher layer that I'm not aware of.

Member: cc @sameeragarwal @tejasapatil Could you share what FB does for the retry?

Contributor: @gatorsmile: Sorry for the late reply. We had seen issues with this in the past and resorted to exponential backoff with retries. Fetching all the partitions is going to be bad in a prod setting; even if it makes it through, the underlying problem, if left unnoticed, is bad for system health.

Author: Could you review the newer changes I have made? Basically, yes, I agree that fetching all partitions is going to be bad, and hence we'll leave it up to the user. They can disable fetching all the partitions by setting "spark.sql.hive.metastorePartitionPruning.fallback.enabled" to false. In that case, we'll never retry. If it is set to "true", then we'll retry. As simple as that.

I don't completely understand "exponential backoff with retries". Do you do this at the HMS level or at the query level? If HMS filter pushdown fails once, does that mean it will succeed in the future? Maybe a future improvement would be to replace the boolean fallback-enabled/fallback-disabled flag with multiple levels of fallback retries with timing, etc. Thoughts, @tejasapatil?

Contributor: @kmanamcheri: Let's do this:

  • We should prefer doing getPartitionsByFilterMethod(). If it fails, we retry with increasing delay across retries.
  • If retries are exhausted, we could fetch all the partitions of the table. Some people might not want this, so let's control it using a conf flag. For those who don't want it, the query can fail at that point.

What do you think?
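For illustration only (none of this is in the change set), the "retry with increasing delay" idea above could be sketched as follows; `withBackoff`, `maxRetries`, and `baseDelayMs` are hypothetical names:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical sketch of retry-with-exponential-backoff around a metastore call.
def withBackoff[T](maxRetries: Int, baseDelayMs: Long)(op: => T): T = {
  def attempt(n: Int): T = Try(op) match {
    case Success(result) => result
    case Failure(_) if n < maxRetries =>
      Thread.sleep(baseDelayMs << n) // delay doubles on each failed attempt
      attempt(n + 1)
    case Failure(e) => throw e // retries exhausted; caller decides on fallback
  }
  attempt(0)
}

// Usage sketch: retry the pushed-down call, then fall back if configured.
// withBackoff(maxRetries = 3, baseDelayMs = 1000) {
//   getPartitionsByFilterMethod.invoke(hive, table, filter)
// }
```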

          if (shouldFallback) {
            val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
            // We should get this config value from the metastore; otherwise we hit SPARK-18681.
            // To be compatible with hive-0.12 and hive-0.13, in the future we can achieve this by:
            // val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean
            val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname,
              tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean
            if (!tryDirectSql) {
              logWarning("Caught Hive MetaException attempting to get partition metadata by " +
                "filter from Hive. Falling back to fetching all partition metadata, which will " +
                "degrade performance. Modifying your Hive metastore configuration to set " +
                s"${tryDirectSqlConfVar.varname} to true may resolve this problem.")
            } else {
              logWarning("Caught Hive MetaException attempting to get partition metadata " +
                "by filter from Hive. Hive metastore's direct SQL feature has been enabled, " +
                "but it is an optimistic optimization and not guaranteed to work. Falling back " +
                "to fetching all partition metadata, which will degrade performance (for the " +
                "current query). If you see this error consistently, you can set the Spark " +
                s"configuration setting ${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to " +
                "false as a workaround, however this will result in degraded performance.")
            }
            // HiveShim clients are expected to handle a superset of the requested partitions
            getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
          } else {
            // Fallback mode has been disabled. Rethrow the exception.
            throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
              "metadata from Hive. The fallback mechanism is not enabled. You can set " +
              s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ENABLED.key} to true to " +
              "fetch all partition metadata as a fallback mechanism, however this may result " +
              "in degraded performance.", ex)
          }
      }
    }
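As a rough end-to-end illustration of the path above (the table and column names are hypothetical, and `spark` is assumed to be a Hive-enabled SparkSession):

```scala
// Hypothetical repro: `default.events` is a Hive table partitioned by an
// integer column `ds`, and hive.metastore.try.direct.sql=false on the HMS.
spark.sql("SET spark.sql.hive.metastorePartitionPruning.fallback.enabled=true")

// An integer partition predicate cannot be pushed down in this setup; with
// the fallback enabled, the shim logs a warning, fetches all partitions, and
// the query still succeeds (slowly) instead of failing.
spark.sql("SELECT count(*) FROM default.events WHERE ds = 20170101").show()
```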

@@ -24,13 +24,17 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType}

// TODO: Refactor this to `HivePartitionFilteringSuite`
class HiveClientSuite(version: String)
  extends HiveVersionSuite(version) with BeforeAndAfterAll {
  extends HiveVersionSuite(version) with BeforeAndAfterAll with SQLHelper {

  private val tryDirectSqlKey = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname
  private val partPruningKey = SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key
  private val partPruningFallbackKey =
    SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ENABLED.key

  private val testPartitionCount = 3 * 24 * 4

@@ -79,12 +83,30 @@ class HiveClientSuite(version: String)
    client = init(true)
  }

  test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") {
    val client = init(false)
    val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
      Seq(attr("ds") === 20170101))
  test("getPartitionsByFilter should return all partitions if the underlying call to the " +
      s"metastore fails and $partPruningFallbackKey=true") {
    withSQLConf(partPruningFallbackKey -> "true", partPruningKey -> "true") {
      val client = init(false)
      // tryDirectSql = false and a non-string partition filter will always fail. This condition
      // is used to test that the fallback works.
      val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
        Seq(attr("ds") === 20170101))

    assert(filteredPartitions.size == testPartitionCount)
      assert(filteredPartitions.size == testPartitionCount)
    }
  }

test(s"getPartitionsByFilter should fail if the underlying call to the metastore fails and " +
s"$partPruningFallbackKey=false") {
withSQLConf(partPruningFallbackKey -> "false", partPruningKey -> "true") {
val client = init(false)
// tryDirectSql = false and a non-string partition filter will always fail. This condition
// is used to test if the fallback works
assertThrows[RuntimeException](
client.getPartitionsByFilter(client.getTable("default", "test"),
Seq(attr("ds") === 20170101))
)
}
}
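An aside on the predicate these tests build: `attr("ds") === 20170101` produces a Catalyst `EqualTo` expression. A standalone sketch using the Catalyst DSL that the suite already imports (constructing the `ds` attribute by hand here is purely illustrative):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

// Build the same shape of predicate the tests pass to getPartitionsByFilter:
// an IntegerType attribute "ds" compared against a literal partition value.
val ds = 'ds.int                 // AttributeReference("ds", IntegerType)
val predicate = ds === 20170101  // EqualTo(ds, Literal(20170101))
```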

test("getPartitionsByFilter: ds<=>20170101") {
Expand Down