@@ -109,39 +109,59 @@ private[hive] case class MetastoreRelation(
   }
 
   @transient override lazy val statistics: Statistics = {
-    catalogTable.stats.getOrElse(Statistics(
-      sizeInBytes = {
-        val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
-        val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
-        // TODO: check if this estimate is valid for tables after partition pruning.
-        // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
-        // relatively cheap if parameters for the table are populated into the metastore.
-        // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
-        // (see StatsSetupConst in Hive) that we can look at in the future.
-        BigInt(
-          // When table is external,`totalSize` is always zero, which will influence join strategy
-          // so when `totalSize` is zero, use `rawDataSize` instead
-          // when `rawDataSize` is also zero, use `HiveExternalCatalog.STATISTICS_TOTAL_SIZE`,
-          // which is generated by analyze command.
-          if (totalSize != null && totalSize.toLong > 0L) {
-            totalSize.toLong
-          } else if (rawDataSize != null && rawDataSize.toLong > 0) {
-            rawDataSize.toLong
-          } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-            try {
-              val hadoopConf = sparkSession.sessionState.newHadoopConf()
-              val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
-              fs.getContentSummary(hiveQlTable.getPath).getLength
-            } catch {
-              case e: IOException =>
-                logWarning("Failed to get table size from hdfs.", e)
-                sparkSession.sessionState.conf.defaultSizeInBytes
-            }
-          } else {
-            sparkSession.sessionState.conf.defaultSizeInBytes
-          })
-      }
-    ))
+    catalogTable.stats.getOrElse {
+      // For non-partitioned tables, Hive-generated statistics are stored in table properties.
+      // For partitioned tables, Hive-generated statistics are stored in partition properties.
+      val (totalSize, rawDataSize) = if (catalogTable.partitionColumnNames.isEmpty) {
+        val properties = Option(hiveQlTable.getParameters).map(_.asScala.toMap).orNull
+        (properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong),
+          properties.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong))
+      } else {
+        (getTotalTableSize(StatsSetupConst.TOTAL_SIZE),
+          getTotalTableSize(StatsSetupConst.RAW_DATA_SIZE))
+      }
+      Statistics(
+        sizeInBytes = {
+          BigInt(
+            // When the table is external, `totalSize` is always zero, which will influence the
+            // join strategy, so when `totalSize` is zero, use `rawDataSize` instead. When
+            // `rawDataSize` is also zero, fall back to the filesystem to get file sizes, if
+            // enabled.
+            if (totalSize.isDefined && totalSize.get > 0L) {
+              totalSize.get
+            } else if (rawDataSize.isDefined && rawDataSize.get > 0L) {
+              rawDataSize.get
+            } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+              try {
+                val hadoopConf = sparkSession.sessionState.newHadoopConf()
+                val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+                fs.getContentSummary(hiveQlTable.getPath).getLength
+              } catch {
+                case e: IOException =>
+                  logWarning("Failed to get table size from hdfs.", e)
+                  sparkSession.sessionState.conf.defaultSizeInBytes
+              }
+            } else {
+              sparkSession.sessionState.conf.defaultSizeInBytes
+            })
+        }
+      )
+    }
   }
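For orientation, the size-estimation precedence in the new code is unchanged from the old: Hive's `totalSize` if positive, else `rawDataSize` if positive, else an optional filesystem probe, else the session default. A minimal standalone sketch of that order (hypothetical helper, not part of the patch):

// Hypothetical sketch of the sizeInBytes precedence used above (not part of the patch).
def estimateSizeInBytes(
    totalSize: Option[Long],
    rawDataSize: Option[Long],
    fallBackToHdfs: Boolean,
    hdfsSize: => Option[Long], // lazy filesystem probe, only evaluated when needed
    defaultSizeInBytes: Long): BigInt = {
  BigInt(
    totalSize.filter(_ > 0L) // Hive's totalSize, when positive
      .orElse(rawDataSize.filter(_ > 0L)) // otherwise Hive's rawDataSize
      .orElse(if (fallBackToHdfs) hdfsSize else None) // otherwise an HDFS content summary
      .getOrElse(defaultSizeInBytes)) // otherwise the configured default
}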

+  // For partitioned tables, get the size of all the partitions.
+  // Note: the statistics might not be gathered for all the partitions.
+  // For partial collection, we will not utilize the Hive-generated statistics.
+  private def getTotalTableSize(statType: String): Option[Long] = {
Contributor commented:

What if a query only reads some partitions? Looks like the table statistics depend on partition pruning.

Member Author replied:

:) We are facing the same issue in both data source tables and Hive tables. That requires another PR to change how we use the statistics of leaf nodes. Partition filtering on statistics should be considered by the Filter nodes. Is my understanding right?

+    allPartitions.foldLeft[Option[Long]](Some(0L)) {
+      (totalSize: Option[Long], cur: CatalogTablePartition) =>
+        val curSize = cur.parameters.get(statType).map(_.toLong)
+        if (totalSize.isEmpty || curSize.isEmpty || curSize.get < 0L) {
+          // return None when hitting the first negative or empty stats.
+          return None
+        } else {
+          Some(totalSize.get + curSize.get)
+        }
+    }
+  }
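The fold above encodes the partial-collection rule: a single missing or negative partition stat collapses the whole sum to None. A hypothetical standalone rendering of the same rule (plain Scala, no early return, not part of the patch):

// Hypothetical sketch: sums per-partition stats, or yields None on any gap.
def totalSizeAcross(partitionStats: Seq[Option[Long]]): Option[Long] =
  partitionStats.foldLeft[Option[Long]](Some(0L)) { (acc, cur) =>
    acc.flatMap(sum => cur.filter(_ >= 0L).map(sum + _))
  }

totalSizeAcross(Seq(Some(10L), Some(20L))) // Some(30): stats exist for every partition
totalSizeAcross(Seq(Some(10L), None))      // None: one partition has no stats
totalSizeAcross(Seq(Some(10L), Some(-1L))) // None: negative stats are rejected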

// When metastore partition pruning is turned off, we cache the list of all partitions to
@@ -119,9 +119,6 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
}

test("analyze MetastoreRelations") {
-    def queryTotalSize(tableName: String): BigInt =
-      spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
-
// Non-partitioned table
sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
@@ -154,11 +151,15 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
|SELECT * FROM src
""".stripMargin).collect()

-    assert(queryTotalSize("analyzeTable_part") === spark.sessionState.conf.defaultSizeInBytes)
+    // This is from Hive-generated statistics
+    val totalSizeFromHive = queryTotalSize("analyzeTable_part")

sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")

-    assert(queryTotalSize("analyzeTable_part") === BigInt(17436))
+    // This is from Spark-generated statistics
+    val totalSizeFromSpark = queryTotalSize("analyzeTable_part")
+
+    assert(totalSizeFromHive == totalSizeFromSpark)

sql("DROP TABLE analyzeTable_part").collect()

@@ -171,6 +172,9 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false)
}

+  private def queryTotalSize(tableName: String): BigInt =
+    spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes

private def checkStats(
stats: Option[Statistics],
hasSizeInBytes: Boolean,
@@ -235,6 +239,79 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
}
}

test("gather table-level statistics of managed partitioned tables from Hive") {
val managedTable = "partitionedTable"
withTable(managedTable) {
sql(
s"""
|CREATE TABLE $managedTable (key INT, value STRING)
|PARTITIONED BY (ds STRING, hr STRING)
""".stripMargin)

for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
sql(
s"""
|INSERT OVERWRITE TABLE $managedTable
|partition (ds='$ds',hr='$hr')
|SELECT 1, 'a'
""".stripMargin)
}

checkStats(
managedTable, isDataSourceTable = false, hasSizeInBytes = false, expectedRowCounts = None)

// This is from Hive-generated statistics
val totalSizeFromHive = queryTotalSize(managedTable)

sql(s"ANALYZE TABLE $managedTable COMPUTE STATISTICS noscan")

// This is from Spark-generated statistics
val totalSizeFromSpark = queryTotalSize(managedTable)

assert(totalSizeFromHive == totalSizeFromSpark)
}
}

test("gather statistics for external partitioned table from Hive") {
val catalog = spark.sessionState.catalog
val externalPartitionedTable = "partitionedTable"
withTempDir { tempDir =>
val basePath = tempDir.getCanonicalPath
withTable(externalPartitionedTable) {
sql(
s"""
|CREATE EXTERNAL TABLE $externalPartitionedTable (key INT, value STRING)
|PARTITIONED BY (ds STRING, hr STRING)
|LOCATION '$basePath'
""".stripMargin)
for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
sql(
s"""
|INSERT OVERWRITE TABLE $externalPartitionedTable
|partition (ds='$ds',hr='$hr')
|SELECT 1, 'a'
""".stripMargin)
}
val totalSizeFromHive1 = queryTotalSize(externalPartitionedTable)

sql(
s"""
|ALTER TABLE $externalPartitionedTable DROP PARTITION (ds='2008-04-08'),
|PARTITION (hr='12')
""".stripMargin)
assert(
catalog.listPartitions(TableIdentifier(externalPartitionedTable)).map(_.spec).toSet ==
Set(Map("ds" -> "2008-04-09", "hr" -> "11")))
val totalSizeFromHive2 = queryTotalSize(externalPartitionedTable)

sql(s"ALTER TABLE $externalPartitionedTable ADD PARTITION (ds='2008-04-08', hr='12')")
val totalSizeFromHive3 = queryTotalSize(externalPartitionedTable)

assert(totalSizeFromHive1 > totalSizeFromHive3 && totalSizeFromHive3 > totalSizeFromHive2)
}
}
}
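For orientation: the loop creates four partitions, each holding one identical row; the DROP removes every partition matching ds='2008-04-08' or hr='12', leaving one; the ADD then restores one of them, leaving two. The three measurements therefore cover four, one, and two partitions' worth of data, which is exactly the ordering asserted above.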

test("test elimination of the influences of the old stats") {
val textTable = "textTable"
withTable(textTable) {