@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
import org.apache.spark.sql.catalyst.expressions.Expression


/**
@@ -196,6 +197,18 @@ abstract class ExternalCatalog {
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]

/**
* List the metadata of the partitions that match the given partition predicates.
*
* @param db database name
* @param table table name
* @param predicates partition-pruning predicates
*/
def listPartitionsByFilter(
db: String,
table: String,
predicates: Seq[Expression]): Seq[CatalogTablePartition]
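A minimal sketch of how a caller might exercise this new API (the db/table names are hypothetical, and the predicate is built with the Catalyst expression DSL, the same way the HiveExternalCatalogSuite test further down in this diff does):

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Expression

// Keep only the partitions of db2.tbl2 whose int partition column `a` equals 1.
def prunedPartitions(catalog: ExternalCatalog): Seq[CatalogTablePartition] = {
  val predicates: Seq[Expression] = Seq('a.int === 1)
  catalog.listPartitionsByFilter("db2", "tbl2", predicates)
}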

// --------------------------------------------------------------------------
// Functions
// --------------------------------------------------------------------------
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.util.StringUtils

/**
@@ -477,6 +478,13 @@ class InMemoryCatalog(
catalog(db).tables(table).partitions.values.toSeq
}

override def listPartitionsByFilter(
db: String,
table: String,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
throw new UnsupportedOperationException("listPartitionsByFilter is not implemented.")
}

// --------------------------------------------------------------------------
// Functions
// --------------------------------------------------------------------------
@@ -32,6 +32,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
@@ -636,6 +637,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.getPartitions(db, table, partialSpec)
}

override def listPartitionsByFilter(
db: String,
table: String,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
client.getPartitionsByFilter(db, table, predicates)
}

[Member] Need a test case in HiveExternalCatalogSuite, just like the other APIs?

[Contributor Author] added

// --------------------------------------------------------------------------
// Functions
// --------------------------------------------------------------------------
@@ -44,8 +44,6 @@ import org.apache.spark.sql.types._
*/
private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
private val client =
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client

/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String)
@@ -104,7 +102,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
// Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString
val dbLocation = sparkSession.sharedState.externalCatalog.getDatabase(dbName).locationUri
new Path(new Path(dbLocation), tblName).toString
}

def lookupRelation(
@@ -129,7 +128,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
} else {
val qualifiedTable =
MetastoreRelation(
qualifiedTableName.database, qualifiedTableName.name)(table, client, sparkSession)
qualifiedTableName.database, qualifiedTableName.name)(table, sparkSession)
alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
}
}
@@ -43,7 +43,6 @@ private[hive] case class MetastoreRelation(
databaseName: String,
tableName: String)
(val catalogTable: CatalogTable,
@transient private val client: HiveClient,
@transient private val sparkSession: SparkSession)
extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {

@@ -59,7 +58,7 @@ private[hive] case class MetastoreRelation(
Objects.hashCode(databaseName, tableName, output)
}

override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: client :: sparkSession :: Nil
override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil

private def toHiveColumn(c: StructField): FieldSchema = {
new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
@@ -146,11 +145,18 @@ private[hive] case class MetastoreRelation(

// When metastore partition pruning is turned off, we cache the list of all partitions to
// mimic the behavior of Spark < 1.5
private lazy val allPartitions: Seq[CatalogTablePartition] = client.getPartitions(catalogTable)
private lazy val allPartitions: Seq[CatalogTablePartition] = {
sparkSession.sharedState.externalCatalog.listPartitions(
catalogTable.database,
catalogTable.identifier.table)
}

def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
client.getPartitionsByFilter(catalogTable, predicates)
sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
catalogTable.database,
catalogTable.identifier.table,
predicates)
} else {
allPartitions
}
@@ -234,8 +240,7 @@ private[hive] case class MetastoreRelation(
val columnOrdinals = AttributeMap(attributes.zipWithIndex)

override def inputFiles: Array[String] = {
val partLocations = client
.getPartitionsByFilter(catalogTable, Nil)
val partLocations = allPartitions
.flatMap(_.storage.locationUri)
.toArray
if (partLocations.nonEmpty) {
@@ -248,6 +253,6 @@ private[hive] case class MetastoreRelation(
}

override def newInstance(): MetastoreRelation = {
MetastoreRelation(databaseName, tableName)(catalogTable, client, sparkSession)
MetastoreRelation(databaseName, tableName)(catalogTable, sparkSession)
}
}
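The branch in getHiveQlPartitions above is driven by the SQL conf read as conf.metastorePartitionPruning. A hedged illustration of toggling it with the standard SparkSession builder: with pruning enabled, partition predicates are pushed to the metastore via listPartitionsByFilter; with it disabled, all partitions are listed once and cached in allPartitions.

import org.apache.spark.sql.SparkSession

// Enable metastore-side partition pruning for Hive tables.
val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.metastorePartitionPruning", "true")
  .getOrCreate()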
@@ -149,8 +149,7 @@ class HadoopTableReader(
* subdirectory of each partition being read. If None, then all files are accepted.
*/
def makeRDDForPartitionedTable(
partitionToDeserializer: Map[HivePartition,
Class[_ <: Deserializer]],
partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]],
filterOpt: Option[PathFilter]): RDD[InternalRow] = {

// SPARK-5068: get FileStatus and do the filtering locally when the path does not exist
@@ -172,24 +172,15 @@ private[hive] trait HiveClient {
* Returns the partitions for the given table that match the supplied partition spec.
* If no partition spec is specified, all partitions are returned.
*/
final def getPartitions(
def getPartitions(
db: String,
table: String,
partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
getPartitions(getTable(db, table), partialSpec)
}

/**
* Returns the partitions for the given table that match the supplied partition spec.
* If no partition spec is specified, all partitions are returned.
*/
def getPartitions(
table: CatalogTable,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]

/** Returns partitions filtered by predicates for the given table. */
def getPartitionsByFilter(
table: CatalogTable,
db: String,
table: String,
predicates: Seq[Expression]): Seq[CatalogTablePartition]
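A hedged sketch of calling the reworked trait methods (the client value is hypothetical; the "default"/"src_part" names simply mirror the VersionsSuite fixture at the end of this diff):

def listSrcPartPartitions(client: HiveClient): Unit = {
  // All partitions of default.src_part (no partial spec supplied).
  val all = client.getPartitions("default", "src_part", None)
  // Only partitions whose spec includes key2=1, via a partial spec (Map[String, String]).
  val some = client.getPartitions("default", "src_part", Some(Map("key2" -> "1")))
}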

/** Loads a static partition into an existing table. */
@@ -525,19 +525,21 @@ private[hive] class HiveClientImpl(
* If no partition spec is specified, all partitions are returned.
*/
override def getPartitions(
table: CatalogTable,
db: String,
table: String,
spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
val hiveTable = toHiveTable(table)
val hiveTable = toHiveTable(getTable(db, table))
spec match {
case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
case Some(s) => client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
}
}

override def getPartitionsByFilter(
table: CatalogTable,
db: String,
table: String,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
val hiveTable = toHiveTable(table)
val hiveTable = toHiveTable(getTable(db, table))
shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition)
}

@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.dsl.expressions._

/**
* Test suite for the [[HiveExternalCatalog]].
@@ -43,4 +44,12 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
externalCatalog.client.reset()
}

import utils._

test("list partitions by filter") {
val catalog = newBasicCatalog()
val selectedPartitions = catalog.listPartitionsByFilter("db2", "tbl2", Seq('a.int === 1))
assert(selectedPartitions.length == 1)
assert(selectedPartitions.head.spec == part1.spec)
}
}
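For reference, the DSL predicate used in the test above is equivalent to constructing the Catalyst expression by hand, which is how the VersionsSuite test below writes it:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types.IntegerType

// 'a.int === 1 desugars to an equality on an int attribute named "a".
val predicate = EqualTo(AttributeReference("a", IntegerType)(), Literal(1))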
@@ -29,7 +29,7 @@ class MetastoreRelationSuite extends SparkFunSuite {
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = StructType(StructField("a", IntegerType, true) :: Nil))
val relation = MetastoreRelation("db", "test")(table, null, null)
val relation = MetastoreRelation("db", "test")(table, null)

// No exception should be thrown
relation.makeCopy(Array("db", "test"))
@@ -295,12 +295,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
}

test(s"$version: getPartitions(catalogTable)") {
assert(2 == client.getPartitions(client.getTable("default", "src_part")).size)
assert(2 == client.getPartitions("default", "src_part").size)
}

test(s"$version: getPartitionsByFilter") {
// Only one partition [1, 1] for key2 == 1
val result = client.getPartitionsByFilter(client.getTable("default", "src_part"),
val result = client.getPartitionsByFilter("default", "src_part",
Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1))))

// Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition.