diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d047953327958..30b157bd9eb58 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         None)
       val logicalRelation = cached.getOrElse {
         val updatedTable = inferIfNeeded(relation, options, fileFormat)
+        // Initialize the catalogTable stats if they are not already defined. An initial value has
+        // to be set so that the Hive statistics will be updated after each insert command.
+        val withStats = {
+          if (updatedTable.stats.isEmpty) {
+            val sizeInBytes = HiveUtils.getSizeInBytes(updatedTable, sparkSession)
+            updatedTable.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+          } else {
+            updatedTable
+          }
+        }
         val created =
           LogicalRelation(
             DataSource(
@@ -202,7 +212,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               bucketSpec = None,
               options = options,
               className = fileType).resolveRelation(),
-            table = updatedTable)
+            table = withStats)
 
         catalogProxy.cacheTable(tableIdentifier, created)
         created
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 07ee105404311..c26d0aa8c5cea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -118,21 +118,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
     case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
         relation.tableMeta.stats.isEmpty =>
       val table = relation.tableMeta
-      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        try {
-          val hadoopConf = session.sessionState.newHadoopConf()
-          val tablePath = new Path(table.location)
-          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
-          fs.getContentSummary(tablePath).getLength
-        } catch {
-          case e: IOException =>
-            logWarning("Failed to get table size from hdfs.", e)
-            session.sessionState.conf.defaultSizeInBytes
-        }
-      } else {
-        session.sessionState.conf.defaultSizeInBytes
-      }
-
+      val sizeInBytes = HiveUtils.getSizeInBytes(table, session)
       val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
       relation.copy(tableMeta = withStats)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index cd321d41f43e8..a6c06e518aae6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.hive
 
 import java.io.File
+import java.io.IOException
 import java.net.{URL, URLClassLoader}
 import java.nio.charset.StandardCharsets
 import java.sql.Timestamp
@@ -29,6 +30,7 @@ import scala.collection.mutable.HashMap
 import scala.language.implicitConversions
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
@@ -499,4 +501,27 @@ private[spark] object HiveUtils extends Logging {
       table.copy(schema = StructType(dataCols ++ partCols))
     }
   }
+
+  /**
+   * Returns the size in bytes for the given CatalogTable. If
+   * session.sessionState.conf.fallBackToHdfsForStatsEnabled is enabled, the size is computed
+   * from the table location on the file system; otherwise the default size is returned.
+   */
+  def getSizeInBytes(table: CatalogTable, session: SparkSession): Long = {
+    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+      try {
+        val hadoopConf = session.sessionState.newHadoopConf()
+        val tablePath = new Path(table.location)
+        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+        fs.getContentSummary(tablePath).getLength
+      } catch {
+        case e: IOException =>
+          logWarning("Failed to get table size from hdfs.", e)
+          session.sessionState.conf.defaultSizeInBytes
+      }
+    } else {
+      session.sessionState.conf.defaultSizeInBytes
+    }
+    sizeInBytes
+  }
 }