HiveShim.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, NoSuchObjectException, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegralType, StringType}
import org.apache.spark.util.Utils

@@ -586,17 +587,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
} else {
logDebug(s"Hive metastore filter is '$filter'.")
val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
val tryDirectSql =
hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal)
try {
// Hive may throw an exception when calling this method in some circumstances, such as
// when filtering on a non-string partition column when the hive config key
// hive.metastore.try.direct.sql is false
getPartitionsByFilterMethod.invoke(hive, table, filter)
.asInstanceOf[JArrayList[Partition]]
} catch {
case e: InvocationTargetException =>
// SPARK-18167 retry to investigate the flaky test. This should be reverted before
// the release is cut.
val retry = Try(getPartitionsByFilterMethod.invoke(hive, table, filter))
logError("getPartitionsByFilter failed, retry success = " + retry.isSuccess)
logError("all partitions: " + getAllPartitions(hive, table))
throw e
case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
!tryDirectSql =>
logWarning("Caught Hive MetaException attempting to get partition metadata by " +
"filter from Hive. Falling back to fetching all partition metadata, which will " +
"degrade performance. Modifying your Hive metastore configuration to set " +
s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
// HiveShim clients are expected to handle a superset of the requested partitions
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
tryDirectSql =>
throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
"metadata by filter from Hive. You can set the Spark configuration setting " +
s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to false to work around this " +
"problem, however this will result in degraded performance. Please report a bug: " +
"https://issues.apache.org/jira/browse/SPARK", ex)
}
}

Review discussion on this hunk:

rezasafi (Contributor): @mallman Sorry to disturb you here, but why is only a warning logged when direct SQL is disabled, and why is a runtime exception raised when direct SQL is enabled, instead of just a warning as in the disabled case?

mallman (Contributor, Author): Hi @rezasafi. I believe the reasoning is that if the user has disabled direct SQL, we try to fetch the partitions for the requested partition predicate anyway. However, since we don't expect that call to succeed, we just log a warning and fall back to the legacy behavior. On the other hand, if the user has enabled direct SQL, then we expect the call to Hive to succeed. If it fails, we consider that an error and throw an exception. I hope that helps clarify things.

rezasafi (Contributor): Thank you very much for the explanation @mallman. I appreciate it.

Later comment: @mallman Your assumption is incorrect. If Hive's direct SQL path fails, it retries with ORM. I am able to reproduce an issue with Postgres where direct SQL fails and, when Hive retries with ORM, Spark fails. Hive has its own fallback behavior for direct SQL. Filed SPARK-25561.
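To make the "superset" contract above concrete: because the fallback path returns every partition when direct SQL is disabled, callers of the shim are expected to re-apply the partition predicate themselves. The helper below is a minimal illustrative sketch, not code from this patch; pruneBySpec is a hypothetical name and it only handles predicates expressible as an exact partition-spec match.

import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition

// Hypothetical caller-side pruning: getPartitionsByFilter may return a superset
// of the requested partitions (all of them after the MetaException fallback), so
// re-filter by the partition spec values the predicate asked for.
def pruneBySpec(
    partitions: Seq[CatalogTablePartition],
    wanted: Map[String, String]): Seq[CatalogTablePartition] = {
  partitions.filter { p =>
    wanted.forall { case (column, value) => p.spec.get(column).contains(value) }
  }
}

// e.g. pruneBySpec(allPartitions, Map("part" -> "3")) keeps only the part=3 partition.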

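Both error messages in the hunk above point at configuration workarounds. As a rough illustration only (not part of this change; the key names are taken from the messages above), a user hitting the MetaException with direct SQL enabled could either disable Spark's metastore partition management at session creation or enable hive.metastore.try.direct.sql on the metastore side:

import org.apache.spark.sql.SparkSession

// Illustrative workaround sketch: trades away metastore partition pruning
// (degraded performance) in exchange for avoiding getPartitionsByFilter.
// Alternatively, set hive.metastore.try.direct.sql=true in the metastore's
// hive-site.xml, as the logWarning above suggests.
val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.manageFilesourcePartitions", "false") // SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS
  .getOrCreate()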
HiveClientBuilder.scala (new file)
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.client

import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.VersionInfo

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

private[client] class HiveClientBuilder {
private val sparkConf = new SparkConf()

// In order to speed up test execution during development or in Jenkins, you can specify the path
// of an existing Ivy cache:
private val ivyPath: Option[String] = {
sys.env.get("SPARK_VERSIONS_SUITE_IVY_PATH").orElse(
Some(new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath))
}

private def buildConf() = {
lazy val warehousePath = Utils.createTempDir()
lazy val metastorePath = Utils.createTempDir()
metastorePath.delete()
Map(
"javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
"hive.metastore.warehouse.dir" -> warehousePath.toString)
}

def buildClient(version: String, hadoopConf: Configuration): HiveClient = {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = version,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
hadoopConf = hadoopConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
}
}
HiveClientSuite.scala (new file)
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.client

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.types.IntegerType

class HiveClientSuite extends SparkFunSuite {
private val clientBuilder = new HiveClientBuilder

private val tryDirectSqlKey = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname

test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") {
val testPartitionCount = 5

val storageFormat = CatalogStorageFormat(
locationUri = None,
inputFormat = None,
outputFormat = None,
serde = None,
compressed = false,
properties = Map.empty)

val hadoopConf = new Configuration()
hadoopConf.setBoolean(tryDirectSqlKey, false)
val client = clientBuilder.buildClient(HiveUtils.hiveExecutionVersion, hadoopConf)
client.runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (part INT)")

val partitions = (1 to testPartitionCount).map { part =>
CatalogTablePartition(Map("part" -> part.toString), storageFormat)
}
client.createPartitions(
"default", "test", partitions, ignoreIfExists = false)

val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
Seq(EqualTo(AttributeReference("part", IntegerType)(), Literal(3))))

assert(filteredPartitions.size == testPartitionCount)
}
}
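For contrast with the fallback test above, a hypothetical companion test (not part of this patch) could pin down the pruned behaviour when hive.metastore.try.direct.sql keeps its default value of true, mirroring the getPartitionsByFilter assertion in VersionsSuite below. The table name test2 and the single-partition expectation are assumptions; this is a sketch that would live inside HiveClientSuite, not a test from the change.

test("getPartitionsByFilter prunes partitions when direct SQL is enabled") {
  // Sketch only: assumes the embedded metastore honours the filter when
  // hive.metastore.try.direct.sql is left at its default of true.
  val storageFormat = CatalogStorageFormat(
    locationUri = None, inputFormat = None, outputFormat = None,
    serde = None, compressed = false, properties = Map.empty)
  val client = clientBuilder.buildClient(HiveUtils.hiveExecutionVersion, new Configuration())
  client.runSqlHive("CREATE TABLE test2 (value INT) PARTITIONED BY (part INT)")
  client.createPartitions(
    "default", "test2",
    (1 to 5).map(p => CatalogTablePartition(Map("part" -> p.toString), storageFormat)),
    ignoreIfExists = false)
  val filtered = client.getPartitionsByFilter(client.getTable("default", "test2"),
    Seq(EqualTo(AttributeReference("part", IntegerType)(), Literal(3))))
  assert(filtered.size == 1)
}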
VersionsSuite.scala
@@ -23,9 +23,8 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.util.VersionInfo

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -48,46 +47,19 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
@ExtendedHiveTest
class VersionsSuite extends SparkFunSuite with Logging {

private val sparkConf = new SparkConf()

// In order to speed up test execution during development or in Jenkins, you can specify the path
// of an existing Ivy cache:
private val ivyPath: Option[String] = {
sys.env.get("SPARK_VERSIONS_SUITE_IVY_PATH").orElse(
Some(new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath))
}

private def buildConf() = {
lazy val warehousePath = Utils.createTempDir()
lazy val metastorePath = Utils.createTempDir()
metastorePath.delete()
Map(
"javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
"hive.metastore.warehouse.dir" -> warehousePath.toString)
}
private val clientBuilder = new HiveClientBuilder
import clientBuilder.buildClient

test("success sanity check") {
val badClient = IsolatedClientLoader.forVersion(
hiveMetastoreVersion = HiveUtils.hiveExecutionVersion,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
hadoopConf = new Configuration(),
config = buildConf(),
ivyPath = ivyPath).createClient()
val badClient = buildClient(HiveUtils.hiveExecutionVersion, new Configuration())
val db = new CatalogDatabase("default", "desc", "loc", Map())
badClient.createDatabase(db, ignoreIfExists = true)
}

test("hadoop configuration preserved") {
val hadoopConf = new Configuration();
val hadoopConf = new Configuration()
hadoopConf.set("test", "success")
val client = IsolatedClientLoader.forVersion(
hiveMetastoreVersion = HiveUtils.hiveExecutionVersion,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
hadoopConf = hadoopConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
val client = buildClient(HiveUtils.hiveExecutionVersion, hadoopConf)
assert("success" === client.getConf("test", null))
}

@@ -109,15 +81,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
// TODO: currently only works on mysql where we manually create the schema...
ignore("failure sanity check") {
val e = intercept[Throwable] {
val badClient = quietly {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = "13",
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
hadoopConf = new Configuration(),
config = buildConf(),
ivyPath = ivyPath).createClient()
}
val badClient = quietly { buildClient("13", new Configuration()) }
}
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
}
@@ -130,16 +94,9 @@ class VersionsSuite extends SparkFunSuite with Logging {
test(s"$version: create client") {
client = null
System.gc() // Hack to avoid SEGV on some JVM versions.
val hadoopConf = new Configuration();
val hadoopConf = new Configuration()
hadoopConf.set("test", "success")
client =
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = version,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
hadoopConf = hadoopConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
client = buildClient(version, hadoopConf)
}

def table(database: String, tableName: String): CatalogTable = {
@@ -287,15 +244,19 @@ class VersionsSuite extends SparkFunSuite with Logging {
client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)")
}

val testPartitionCount = 2

test(s"$version: createPartitions") {
val partition1 = CatalogTablePartition(Map("key1" -> "1", "key2" -> "1"), storageFormat)
val partition2 = CatalogTablePartition(Map("key1" -> "1", "key2" -> "2"), storageFormat)
val partitions = (1 to testPartitionCount).map { key2 =>
CatalogTablePartition(Map("key1" -> "1", "key2" -> key2.toString), storageFormat)
}
client.createPartitions(
"default", "src_part", Seq(partition1, partition2), ignoreIfExists = true)
"default", "src_part", partitions, ignoreIfExists = true)
}

test(s"$version: getPartitions(catalogTable)") {
assert(2 == client.getPartitions(client.getTable("default", "src_part")).size)
assert(testPartitionCount ==
client.getPartitions(client.getTable("default", "src_part")).size)
}

test(s"$version: getPartitionsByFilter") {
@@ -306,6 +267,8 @@
// Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition.
if (version != "0.12") {
assert(result.size == 1)
} else {
assert(result.size == testPartitionCount)
}
}

@@ -327,7 +290,7 @@
}

test(s"$version: getPartitions(db: String, table: String)") {
assert(2 == client.getPartitions("default", "src_part", None).size)
assert(testPartitionCount == client.getPartitions("default", "src_part", None).size)
}

test(s"$version: loadPartition") {