diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 33f0ecff6352..0a86809a3439 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -109,39 +109,59 @@ private[hive] case class MetastoreRelation(
   }
 
   @transient override lazy val statistics: Statistics = {
-    catalogTable.stats.getOrElse(Statistics(
-      sizeInBytes = {
-        val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
-        val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
-        // TODO: check if this estimate is valid for tables after partition pruning.
-        // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
-        // relatively cheap if parameters for the table are populated into the metastore.
-        // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
-        // (see StatsSetupConst in Hive) that we can look at in the future.
-        BigInt(
-          // When table is external,`totalSize` is always zero, which will influence join strategy
-          // so when `totalSize` is zero, use `rawDataSize` instead
-          // when `rawDataSize` is also zero, use `HiveExternalCatalog.STATISTICS_TOTAL_SIZE`,
-          // which is generated by analyze command.
-          if (totalSize != null && totalSize.toLong > 0L) {
-            totalSize.toLong
-          } else if (rawDataSize != null && rawDataSize.toLong > 0) {
-            rawDataSize.toLong
-          } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-            try {
-              val hadoopConf = sparkSession.sessionState.newHadoopConf()
-              val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
-              fs.getContentSummary(hiveQlTable.getPath).getLength
-            } catch {
-              case e: IOException =>
-                logWarning("Failed to get table size from hdfs.", e)
-                sparkSession.sessionState.conf.defaultSizeInBytes
-            }
-          } else {
-            sparkSession.sessionState.conf.defaultSizeInBytes
-          })
+    catalogTable.stats.getOrElse {
+      // For non-partitioned tables, Hive-generated statistics are stored in the table properties.
+      // For partitioned tables, they are stored in the properties of each partition.
+      val (totalSize, rawDataSize) = if (catalogTable.partitionColumnNames.isEmpty) {
+        val properties = Option(hiveQlTable.getParameters).map(_.asScala.toMap).getOrElse(Map.empty)
+        (properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong),
+          properties.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong))
+      } else {
+        (getTotalTableSize(StatsSetupConst.TOTAL_SIZE),
+          getTotalTableSize(StatsSetupConst.RAW_DATA_SIZE))
       }
-    ))
+      Statistics(
+        sizeInBytes = {
+          BigInt(
+            // When the table is external, `totalSize` is always zero, which would skew the
+            // join strategy. So when `totalSize` is zero, use `rawDataSize` instead; when
+            // `rawDataSize` is also zero, fall back to the filesystem for file sizes, if enabled.
+            if (totalSize.isDefined && totalSize.get > 0L) {
+              totalSize.get
+            } else if (rawDataSize.isDefined && rawDataSize.get > 0L) {
+              rawDataSize.get
+            } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+              try {
+                val hadoopConf = sparkSession.sessionState.newHadoopConf()
+                val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+                fs.getContentSummary(hiveQlTable.getPath).getLength
+              } catch {
+                case e: IOException =>
+                  logWarning("Failed to get table size from hdfs.", e)
+                  sparkSession.sessionState.conf.defaultSizeInBytes
+              }
+            } else {
+              sparkSession.sessionState.conf.defaultSizeInBytes
+            })
+        }
+      )
+    }
   }
+
+  // For partitioned tables, compute the total size across all partitions.
+  // Note: statistics might not have been gathered for every partition. If the
+  // collection is partial, the Hive-generated statistics are not used at all.
+  private def getTotalTableSize(statType: String): Option[Long] = {
+    allPartitions.foldLeft[Option[Long]](Some(0L)) {
+      (totalSize: Option[Long], cur: CatalogTablePartition) =>
+        val curSize = cur.parameters.get(statType).map(_.toLong)
+        // Collapse to None at the first missing or negative partition statistic.
+        if (totalSize.isEmpty || curSize.isEmpty || curSize.get < 0L) {
+          None
+        } else {
+          Some(totalSize.get + curSize.get)
+        }
+    }
+  }
 
   // When metastore partition pruning is turned off, we cache the list of all partitions to
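Review note on `getTotalTableSize`: the all-or-nothing rule (any partition with a missing or negative statistic invalidates the whole total) is the part most worth double-checking. Below is a minimal, self-contained sketch of the same aggregation over plain maps; `PartitionParams` and `totalStat` are illustrative stand-ins for `CatalogTablePartition.parameters` and the private method above, not Spark APIs.

object PartitionSizeSketch {
  // Stand-in for CatalogTablePartition.parameters.
  type PartitionParams = Map[String, String]

  // Sum a statistic (e.g. "totalSize") across partitions. A missing or
  // negative value in any partition invalidates the whole total, mirroring
  // getTotalTableSize in the patch. (Like the patch, an unparsable value
  // would throw a NumberFormatException.)
  def totalStat(partitions: Seq[PartitionParams], statType: String): Option[Long] = {
    val sizes = partitions.map(_.get(statType).map(_.toLong))
    if (sizes.forall(_.exists(_ >= 0L))) Some(sizes.flatten.sum) else None
  }

  def main(args: Array[String]): Unit = {
    val complete = Seq(Map("totalSize" -> "10"), Map("totalSize" -> "32"))
    val partial = Seq(Map("totalSize" -> "10"), Map.empty[String, String])
    assert(totalStat(complete, "totalSize") == Some(42L))
    assert(totalStat(partial, "totalSize").isEmpty) // partial stats are discarded
  }
}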
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 9956706929cd..ba8c5e5a49c4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -119,9 +119,6 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
   }
 
   test("analyze MetastoreRelations") {
-    def queryTotalSize(tableName: String): BigInt =
-      spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
-
     // Non-partitioned table
     sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
     sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
@@ -154,11 +151,15 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
         |SELECT * FROM src
       """.stripMargin).collect()
 
-    assert(queryTotalSize("analyzeTable_part") === spark.sessionState.conf.defaultSizeInBytes)
+    // This is from Hive-generated statistics
+    val totalSizeFromHive = queryTotalSize("analyzeTable_part")
 
     sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")
 
-    assert(queryTotalSize("analyzeTable_part") === BigInt(17436))
+    // This is from Spark-generated statistics
+    val totalSizeFromSpark = queryTotalSize("analyzeTable_part")
+
+    assert(totalSizeFromHive == totalSizeFromSpark)
 
     sql("DROP TABLE analyzeTable_part").collect()
 
@@ -171,6 +172,9 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
       TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false)
   }
 
+  private def queryTotalSize(tableName: String): BigInt =
+    spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
+
   private def checkStats(
       stats: Option[Statistics],
       hasSizeInBytes: Boolean,
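Review note on the test changes above: the assertions now compare the Hive-populated total (available right after the inserts) against the total produced by ANALYZE, and both values flow through the `sizeInBytes` fallback chain in `MetastoreRelation`. A compact sketch of that chain is below, with the filesystem lookup and session default passed in as parameters so it stays self-contained; `estimateSize`, `fetchFsSize`, and `defaultSize` are illustrative names, not Spark APIs.

import java.io.IOException

object SizeFallbackSketch {
  // totalSize wins if positive, then rawDataSize, then an optional
  // filesystem scan, and finally the configured default size.
  def estimateSize(
      totalSize: Option[Long],
      rawDataSize: Option[Long],
      fallBackToFs: Boolean,
      fetchFsSize: () => Long,
      defaultSize: Long): BigInt = {
    val size = totalSize.filter(_ > 0L)
      .orElse(rawDataSize.filter(_ > 0L))
      .getOrElse {
        if (fallBackToFs) {
          try fetchFsSize() catch { case _: IOException => defaultSize }
        } else {
          defaultSize
        }
      }
    BigInt(size)
  }

  def main(args: Array[String]): Unit = {
    // A zero totalSize (typical for external tables) falls through to rawDataSize.
    assert(estimateSize(Some(0L), Some(128L), fallBackToFs = false, () => 0L, 8L) == BigInt(128))
    // With no usable stats and no filesystem fallback, the default wins.
    assert(estimateSize(None, None, fallBackToFs = false, () => 0L, 8L) == BigInt(8))
  }
}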
s""" + |CREATE TABLE $managedTable (key INT, value STRING) + |PARTITIONED BY (ds STRING, hr STRING) + """.stripMargin) + + for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) { + sql( + s""" + |INSERT OVERWRITE TABLE $managedTable + |partition (ds='$ds',hr='$hr') + |SELECT 1, 'a' + """.stripMargin) + } + + checkStats( + managedTable, isDataSourceTable = false, hasSizeInBytes = false, expectedRowCounts = None) + + // This is from Hive-generated statistics + val totalSizeFromHive = queryTotalSize(managedTable) + + sql(s"ANALYZE TABLE $managedTable COMPUTE STATISTICS noscan") + + // This is from Spark-generated statistics + val totalSizeFromSpark = queryTotalSize(managedTable) + + assert(totalSizeFromHive == totalSizeFromSpark) + } + } + + test("gather statistics for external partitioned table from Hive") { + val catalog = spark.sessionState.catalog + val externalPartitionedTable = "partitionedTable" + withTempDir { tempDir => + val basePath = tempDir.getCanonicalPath + withTable(externalPartitionedTable) { + sql( + s""" + |CREATE EXTERNAL TABLE $externalPartitionedTable (key INT, value STRING) + |PARTITIONED BY (ds STRING, hr STRING) + |LOCATION '$basePath' + """.stripMargin) + for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) { + sql( + s""" + |INSERT OVERWRITE TABLE $externalPartitionedTable + |partition (ds='$ds',hr='$hr') + |SELECT 1, 'a' + """.stripMargin) + } + val totalSizeFromHive1 = queryTotalSize(externalPartitionedTable) + + sql( + s""" + |ALTER TABLE $externalPartitionedTable DROP PARTITION (ds='2008-04-08'), + |PARTITION (hr='12') + """.stripMargin) + assert( + catalog.listPartitions(TableIdentifier(externalPartitionedTable)).map(_.spec).toSet == + Set(Map("ds" -> "2008-04-09", "hr" -> "11"))) + val totalSizeFromHive2 = queryTotalSize(externalPartitionedTable) + + sql(s"ALTER TABLE $externalPartitionedTable ADD PARTITION (ds='2008-04-08', hr='12')") + val totalSizeFromHive3 = queryTotalSize(externalPartitionedTable) + + assert(totalSizeFromHive1 > totalSizeFromHive3 && totalSizeFromHive3 > totalSizeFromHive2) + } + } + } + test("test elimination of the influences of the old stats") { val textTable = "textTable" withTable(textTable) {