@@ -203,6 +203,8 @@ case class BucketSpec(
* sensitive schema was unable to be read from the table properties.
* Used to trigger case-sensitive schema inference at query time, when
* configured.
* @param ignoredProperties is a list of table properties that are used by the underlying table
* but ignored by Spark SQL for now.
*/
case class CatalogTable(
identifier: TableIdentifier,
@@ -221,7 +223,8 @@ case class CatalogTable(
comment: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty,
tracksPartitionsInCatalog: Boolean = false,
schemaPreservesCase: Boolean = true) {
schemaPreservesCase: Boolean = true,
ignoredProperties: Map[String, String] = Map.empty) {

import CatalogTable._

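For context on the new field, here is a minimal sketch of constructing a CatalogTable by hand with ignoredProperties populated; the table name and property values are hypothetical and only illustrate that the Hive-generated keys travel separately from the regular properties map:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.StructType

// Hypothetical metadata: Spark-visible properties stay in `properties`,
// while Hive-generated statistics keys are carried in `ignoredProperties`.
val table = CatalogTable(
  identifier = TableIdentifier("example_table", Some("example_db")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty,
  schema = new StructType().add("id", "int"),
  properties = Map("created.by" -> "docs-example"),
  ignoredProperties = Map("totalSize" -> "4096", "numFiles" -> "2"))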
@@ -492,7 +492,8 @@ class TreeNodeSuite extends SparkFunSuite {
"tracksPartitionsInCatalog" -> false,
"properties" -> JNull,
"unsupportedFeatures" -> List.empty[String],
"schemaPreservesCase" -> JBool(true)))
"schemaPreservesCase" -> JBool(true),
"ignoredProperties" -> JNull))

// For unknown case class, returns JNull.
val bigValue = new Array[Int](10000)
@@ -119,20 +119,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
case relation: CatalogRelation
if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
val table = relation.tableMeta
// TODO: check if this estimate is valid for tables after partition pruning.
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
// relatively cheap if parameters for the table are populated into the metastore.
// Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
// (see StatsSetupConst in Hive) that we can look at in the future.
// When table is external,`totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead.
val totalSize = table.properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
val rawDataSize = table.properties.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
val sizeInBytes = if (totalSize.isDefined && totalSize.get > 0) {
totalSize.get
} else if (rawDataSize.isDefined && rawDataSize.get > 0) {
rawDataSize.get
} else if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
try {
val hadoopConf = session.sessionState.newHadoopConf()
val tablePath = new Path(table.location)
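With the totalSize/rawDataSize handling moved into HiveClientImpl, the rule above only keeps the HDFS fallback. As a rough sketch of what that fallback amounts to, assuming a reachable Hadoop FileSystem (the helper name and the default argument are ours, not Spark's):

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper: sum the on-disk size of a table location, falling back
// to a caller-supplied default when the path cannot be inspected.
def estimateSizeFromHdfs(hadoopConf: Configuration, location: String, defaultSize: Long): Long = {
  try {
    val path = new Path(location)
    val fs = path.getFileSystem(hadoopConf)
    fs.getContentSummary(path).getLength
  } catch {
    case NonFatal(_) => defaultSize
  }
}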
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
@@ -414,6 +415,47 @@ private[hive] class HiveClientImpl(

val properties = Option(h.getParameters).map(_.asScala.toMap).orNull

// Hive-generated statistics are also recorded in ignoredProperties
val ignoredProperties = scala.collection.mutable.Map.empty[String, String]
for (key <- HiveStatisticsProperties; value <- properties.get(key)) {
ignoredProperties += key -> value
}

val excludedTableProperties = HiveStatisticsProperties ++ Set(
// The property value of "comment" is moved to the dedicated field "comment"
"comment",
// For EXTERNAL_TABLE, the table properties contain a particular field "EXTERNAL". This is
// added in the function toHiveTable.
"EXTERNAL"
)

val filteredProperties = properties.filterNot {
case (key, _) => excludedTableProperties.contains(key)
}
val comment = properties.get("comment")

val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_))
val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_))
val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_)).filter(_ >= 0)
// TODO: check if this estimate is valid for tables after partition pruning.
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
// relatively cheap if parameters for the table are populated into the metastore.
// Currently, only totalSize, rawDataSize, and rowCount are used to build the field `stats`
// TODO: stats should also include the other two fields (`numFiles` and `numPartitions`).
// (see StatsSetupConst in Hive)
val stats =
// When the table is external, `totalSize` is always zero, which would influence the join
// strategy, so when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is
// also zero, return None; other ways to estimate the statistics will be used later.
if (totalSize.isDefined && totalSize.get > 0L) {
Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount))
} else if (rawDataSize.isDefined && rawDataSize.get > 0) {
Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount))
} else {
// TODO: still fill the rowCount even if sizeInBytes is empty. Would that break anything?
None
}

CatalogTable(
identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
tableType = h.getTableType match {
@@ -451,13 +493,15 @@
),
// For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added
// in the function toHiveTable.
properties = properties.filter(kv => kv._1 != "comment" && kv._1 != "EXTERNAL"),
comment = properties.get("comment"),
properties = filteredProperties,
stats = stats,
comment = comment,
// In older versions of Spark (before 2.2.0), we expand the view original text and store
// that into `viewExpandedText`, and that should be used in view resolution. So we get
// `viewExpandedText` instead of `viewOriginalText` for viewText here.
viewText = Option(h.getViewExpandedText),
unsupportedFeatures = unsupportedFeatures)
unsupportedFeatures = unsupportedFeatures,
ignoredProperties = ignoredProperties.toMap)
}
}

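A worked example of the `stats` decision above, using hypothetical metastore values and written with Option.exists for brevity: an external table typically reports totalSize = 0, so the code falls through to rawDataSize and still attaches the row count.

import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

// Hypothetical values read from the Hive table parameters.
val totalSize = Some(BigInt(0))      // external tables usually report zero here
val rawDataSize = Some(BigInt(4096))
val rowCount = Some(BigInt(100))

val stats =
  if (totalSize.exists(_ > 0)) {
    Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount))
  } else if (rawDataSize.exists(_ > 0)) {
    Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount))
  } else {
    None
  }
// stats == Some(CatalogStatistics(sizeInBytes = 4096, rowCount = Some(100)))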
@@ -474,7 +518,12 @@
}

override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState {
val hiveTable = toHiveTable(table, Some(userName))
// getTableOption removes all the Hive-specific properties. Here, we fill them back to ensure
// these properties are still available to other clients that share the same Hive metastore.
// If users explicitly alter these Hive-specific properties through ALTER TABLE DDL, we respect
// the user-specified values.
val hiveTable = toHiveTable(
table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName))
// Do not use `table.qualifiedName` here because this may be a rename
val qualifiedTableName = s"${table.database}.$tableName"
shim.alterTable(client, qualifiedTableName, hiveTable)
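A small illustration of the merge order used in alterTable above, with hypothetical values: because the right-hand operand of ++ wins on duplicate keys, a value the user set via ALTER TABLE takes precedence over the restored Hive-generated one, while untouched statistics keys are still written back to the metastore.

val ignoredProperties = Map("totalSize" -> "100", "numFiles" -> "3") // restored Hive stats
val userProperties = Map("totalSize" -> "200")                       // set via ALTER TABLE DDL

val merged = ignoredProperties ++ userProperties
// merged == Map("totalSize" -> "200", "numFiles" -> "3")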
@@ -956,4 +1005,14 @@ private[hive] object HiveClientImpl {
parameters =
if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty)
}

// Below are the keys of the table properties used for storing Hive-generated statistics
private val HiveStatisticsProperties = Set(
StatsSetupConst.COLUMN_STATS_ACCURATE,
StatsSetupConst.NUM_FILES,
StatsSetupConst.NUM_PARTITIONS,
StatsSetupConst.ROW_COUNT,
StatsSetupConst.RAW_DATA_SIZE,
StatsSetupConst.TOTAL_SIZE
)
}
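Putting the pieces of getTableOption together, here is a compact sketch of how the raw Hive parameters are split, with hypothetical values and with literal strings standing in for the StatsSetupConst constants: statistics keys are captured into ignoredProperties, while those keys plus "comment" and "EXTERNAL" are dropped from the properties exposed on CatalogTable.

// Literal keys standing in for StatsSetupConst.* (all values below are hypothetical).
val hiveStatisticsProperties = Set(
  "COLUMN_STATS_ACCURATE", "numFiles", "numPartitions", "numRows", "rawDataSize", "totalSize")

val rawParameters = Map(
  "totalSize" -> "4096",
  "numRows" -> "100",
  "comment" -> "demo table",
  "EXTERNAL" -> "TRUE",
  "created.by" -> "docs-example")

val ignoredProperties = rawParameters.filter { case (k, _) => hiveStatisticsProperties.contains(k) }
val excluded = hiveStatisticsProperties ++ Set("comment", "EXTERNAL")
val filteredProperties = rawParameters.filterNot { case (k, _) => excluded.contains(k) }
// ignoredProperties == Map("totalSize" -> "4096", "numRows" -> "100")
// filteredProperties == Map("created.by" -> "docs-example")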
@@ -19,13 +19,17 @@ package org.apache.spark.sql.hive.test

import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveClient


trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll {
protected val spark: SparkSession = TestHive.sparkSession
protected val hiveContext: TestHiveContext = TestHive
protected val hiveClient: HiveClient =
spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client

protected override def afterAll(): Unit = {
try {
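With hiveClient now provided by TestHiveSingleton, individual suites no longer need their own lookup. A hedged sketch of a suite relying on the shared field (the suite name is hypothetical; the assertion mirrors the hive-site.xml test in the next file):

import org.apache.spark.sql.hive.test.TestHiveSingleton

class ExampleHiveClientSuite extends TestHiveSingleton {
  test("raw metadata is reachable through the shared hiveClient") {
    // hiveClient is inherited from TestHiveSingleton; no per-suite cast of the
    // external catalog is needed any more.
    assert(hiveClient.getConf("hive.in.test", "") == "true")
  }
}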
@@ -646,7 +646,6 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
}

test("SPARK-15887: hive-site.xml should be loaded") {
val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
assert(hiveClient.getConf("hive.in.test", "") == "true")
}

@@ -35,10 +35,6 @@ import org.apache.spark.util.Utils
class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {

// To test `HiveExternalCatalog`, we need to read/write the raw table meta from/to hive client.
val hiveClient: HiveClient =
spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client

val tempDir = Utils.createTempDir().getCanonicalFile
val tempDirUri = tempDir.toURI
val tempDirStr = tempDir.getAbsolutePath
@@ -52,11 +52,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
}

// To test `HiveExternalCatalog`, we need to read the raw table metadata(schema, partition
// columns and bucket specification are still in table properties) from hive client.
private def hiveClient: HiveClient =
sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client

test("persistent JSON table") {
withTable("jsonTable") {
sql(
@@ -325,26 +325,20 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
"last_modified_by",
"last_modified_time",
"Owner:",
"COLUMN_STATS_ACCURATE",
// The following are Hive-specific schema parameters which we do not need to match exactly.
"numFiles",
"numRows",
"rawDataSize",
"totalSize",
"totalNumberFiles",
"maxFileSize",
"minFileSize",
// EXTERNAL is not non-deterministic, but it is filtered out for external tables.
"EXTERNAL"
"minFileSize"
)

table.copy(
createTime = 0L,
lastAccessTime = 0L,
properties = table.properties.filterKeys(!nondeterministicProps.contains(_))
properties = table.properties.filterKeys(!nondeterministicProps.contains(_)),
stats = None,
ignoredProperties = Map.empty
)
}

assert(normalize(actual) == normalize(expected))
}
}