diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index ce8c23ac6dceb..a6ec48a477b64 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.catalog +import java.math.RoundingMode.UP import java.net.URI import java.time.ZoneOffset import java.util.Date @@ -24,6 +25,8 @@ import java.util.Date import scala.collection.mutable import scala.util.control.NonFatal +import com.google.common.math.DoubleMath.roundToBigInteger + import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} @@ -378,6 +381,7 @@ object CatalogTable { */ case class CatalogStatistics( sizeInBytes: BigInt, + deserFactor: Option[Int] = None, rowCount: Option[BigInt] = None, colStats: Map[String, CatalogColumnStat] = Map.empty) { @@ -385,7 +389,8 @@ case class CatalogStatistics( * Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based * on column names. */ - def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = { + def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean, deserFactorDistortion: Double) + : Statistics = { if (cboEnabled && rowCount.isDefined) { val attrStats = AttributeMap(planOutput .flatMap(a => colStats.get(a.name).map(a -> _.toPlanStat(a.name, a.dataType)))) @@ -393,16 +398,20 @@ case class CatalogStatistics( val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats) Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats) } else { - // When CBO is disabled or the table doesn't have other statistics, we apply the size-only - // estimation strategy and only propagate sizeInBytes in statistics. - Statistics(sizeInBytes = sizeInBytes) + // When CBO is disabled or the table doesn't have other statistics, we apply the file size + // based estimation strategy and only propagate sizeInBytes in statistics. + val size = deserFactor.map { factor => + BigInt(roundToBigInteger(sizeInBytes.doubleValue * deserFactorDistortion * factor, UP)) + }.getOrElse(sizeInBytes) + Statistics(sizeInBytes = size) } } /** Readable string representation for the CatalogStatistics. 
*/ def simpleString: String = { - val rowCountString = if (rowCount.isDefined) s", ${rowCount.get} rows" else "" - s"$sizeInBytes bytes$rowCountString" + val rowCountString = rowCount.map(c => s", $c rows").getOrElse("") + val deserFactorString = deserFactor.map(f => s", deserFactor=$f").getOrElse("") + s"$sizeInBytes bytes$rowCountString$deserFactorString" } } @@ -631,7 +640,7 @@ case class HiveTableRelation( ) override def computeStats(): Statistics = { - tableMeta.stats.map(_.toPlanStats(output, conf.cboEnabled)) + tableMeta.stats.map(_.toPlanStats(output, conf.cboEnabled, conf.deserFactorDistortion)) .orElse(tableStats) .getOrElse { throw new IllegalStateException("table stats must be specified.")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d9b0a72618c7e..d7d98f0c12908 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1336,6 +1336,29 @@ object SQLConf { .booleanConf .createWithDefault(false) + val DESERIALIZATION_FACTOR_CALC_ENABLED = + buildConf("spark.sql.statistics.deserFactor.calc.enabled") + .doc("Enables the calculation of the deserialization factor as a table statistic. " + + "For columnar storage formats this factor is the ratio of the raw (uncompressed) data " + + "size to the on-disk file size. Spark uses this ratio to scale up the estimated size, " + + "which gives a better estimate of the in-memory data size and improves query " + + "optimization (e.g. the choice of join strategy). Spark stores a ratio, rather than a " + + "data size, so that the table can grow without having to recompute statistics. For a " + + "partitioned table the maximum of the per-partition factors is taken. When the factor " + + "has already been calculated (and stored in the metastore) but the calculation is " + + "disabled in a subsequent ANALYZE TABLE (by setting this config to false), the old " + + "factor is still applied, as the factor can only be removed by TRUNCATE or DROP TABLE.") + .booleanConf + .createWithDefault(false) + + val DESERIALIZATION_FACTOR_EXTRA_DISTORTION = + buildConf("spark.sql.statistics.deserFactor.distortion") + .doc("Extra multiplier applied together with the deserialization factor. It allows " + + "adjusting the estimated table size even after the deserialization factor has been " + + "calculated and stored in the metastore.") + .doubleConf + .createWithDefault(1.0) + val CBO_ENABLED = buildConf("spark.sql.cbo.enabled") .doc("Enables CBO for estimation of plan statistics when set true.") @@ -2360,6 +2383,10 @@ class SQLConf extends Serializable with Logging { def autoSizeUpdateEnabled: Boolean = getConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED) + def deserFactorStatCalcEnabled: Boolean = getConf(SQLConf.DESERIALIZATION_FACTOR_CALC_ENABLED) + + def deserFactorDistortion: Double = getConf(SQLConf.DESERIALIZATION_FACTOR_EXTRA_DISTORTION) + def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED) def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala index 5017893077922..a0692622daf84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala @@ -111,7 +111,7 @@ case class AnalyzeColumnCommand( throw new AnalysisException("ANALYZE TABLE is not supported on views.") } } else { - val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta) + val sizeWithDeserFactor = CommandUtils.calculateTotalSize(sparkSession, tableMeta) val relation = sparkSession.table(tableIdent).logicalPlan val columnsToAnalyze = getColumnsToAnalyze(tableIdent, relation, columnNames, allColumns) @@ -126,7 +126,8 @@ case class AnalyzeColumnCommand( // We also update table-level stats in order to keep them consistent with column-level stats. val statistics = CatalogStatistics( - sizeInBytes = sizeInBytes, + sizeInBytes = sizeWithDeserFactor.sizeInBytes, + deserFactor = sizeWithDeserFactor.deserFactor, rowCount = Some(rowCount), // Newly computed column stats should override the existing ones. colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColCatalogStats)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala index 18fefa0a6f19f..3f157e7f3bda0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -107,10 +107,11 @@ case class AnalyzePartitionCommand( // Update the metastore if newly computed statistics are different from those // recorded in the metastore.
val newPartitions = partitions.flatMap { p => - val newTotalSize = CommandUtils.calculateLocationSize( - sessionState, tableMeta.identifier, p.storage.locationUri) + val totalSizeWithDeserFact = CommandUtils.calculateLocationSize( + sparkSession.sessionState, tableMeta.identifier, p.storage.locationUri) val newRowCount = rowCounts.get(p.spec) - val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount) + val newStats = + CommandUtils.compareAndGetNewStats(tableMeta.stats, totalSizeWithDeserFact, newRowCount) newStats.map(_ => p.copy(stats = newStats)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index b644e6dc471d6..e47e3c20296ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -22,21 +22,27 @@ import java.net.URI import scala.collection.mutable import scala.util.control.NonFatal -import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex} +import org.apache.spark.sql.execution.datasources.orc.OrcUtils +import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.internal.{SessionState, SQLConf} import org.apache.spark.sql.types._ +case class SizeInBytesWithDeserFactor( + sizeInBytes: BigInt, + deserFactor: Option[Int]) object CommandUtils extends Logging { @@ -45,42 +51,93 @@ object CommandUtils extends Logging { val catalog = sparkSession.sessionState.catalog if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) { val newTable = catalog.getTableMetadata(table.identifier) - val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable) - val newStats = CatalogStatistics(sizeInBytes = newSize) + val oldDeserFactor = newTable.stats.flatMap(_.deserFactor) + val newSizeWithDeserFactor = CommandUtils.calculateTotalSize(sparkSession, newTable) + val newStats = CatalogStatistics( + sizeInBytes = newSizeWithDeserFactor.sizeInBytes, + deserFactor = newSizeWithDeserFactor.deserFactor.orElse(oldDeserFactor)) catalog.alterTableStats(table.identifier, Some(newStats)) } else if (table.stats.nonEmpty) { catalog.alterTableStats(table.identifier, None) } } - def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable) + : SizeInBytesWithDeserFactor = { val sessionState = spark.sessionState if (catalogTable.partitionColumnNames.isEmpty) { - calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri) + 
calculateLocationSize( + spark.sessionState, + catalogTable.identifier, + catalogTable.storage.locationUri) } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) - if (spark.sessionState.conf.parallelFileListingInStatsComputation) { - val paths = partitions.map(x => new Path(x.storage.locationUri.get)) - val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") - val pathFilter = new PathFilter with Serializable { - override def accept(path: Path): Boolean = isDataPath(path, stagingDir) + val sizeWithDeserFactorsForPartitions = + if (spark.sessionState.conf.parallelFileListingInStatsComputation) { + val paths = partitions.map(x => new Path(x.storage.locationUri.get)) + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + val pathFilter = new PathFilter with Serializable { + override def accept(path: Path): Boolean = isDataPath(path, stagingDir) + } + val calcDeserFactEnabled = sessionState.conf.deserFactorStatCalcEnabled + val hadoopConf = sessionState.newHadoopConf() + val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles( + paths, hadoopConf, pathFilter, spark, areRootPaths = true) + fileStatusSeq.flatMap { case (_, fileStatuses) => + fileStatuses.map(sizeInBytesWithDeserFactor(calcDeserFactEnabled, hadoopConf, _)) + } + } else { + partitions.map { tablePartition => + calculateLocationSize( + spark.sessionState, + catalogTable.identifier, + tablePartition.storage.locationUri) + } } - val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles( - paths, sessionState.newHadoopConf(), pathFilter, spark, areRootPaths = true) - fileStatusSeq.flatMap(_._2.map(_.getLen)).sum + sumSizeWithMaxDeserializationFactor(sizeWithDeserFactorsForPartitions) + } + } + + def sumSizeWithMaxDeserializationFactor( + sizesWithFactors: Seq[SizeInBytesWithDeserFactor]): SizeInBytesWithDeserFactor = { + val definedFactors = sizesWithFactors.filter(_.deserFactor.isDefined).map(_.deserFactor.get) + SizeInBytesWithDeserFactor( + sizesWithFactors.map(_.sizeInBytes).sum, + if (definedFactors.isEmpty) None else Some(definedFactors.max)) + } + + def sizeInBytesWithDeserFactor( + calcDeserFactEnabled: Boolean, + hadoopConf: Configuration, + fileStatus: FileStatus): SizeInBytesWithDeserFactor = { + assert(fileStatus.isFile) + val factor = if (calcDeserFactEnabled) { + val rawSize = if (fileStatus.getPath.getName.endsWith(".parquet")) { + Some(ParquetUtils.rawSize(hadoopConf, fileStatus.getPath)) + } else if (fileStatus.getPath.getName.endsWith(".orc")) { + Some(OrcUtils.rawSize(hadoopConf, fileStatus.getPath)) } else { - partitions.map { p => - calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri) - }.sum + None + } + + rawSize.map { rawSize => + // deserialization factor is the quotient of the raw byte size (uncompressed data size) + // with the file size round up to the next integer number + val divAndRemain = rawSize /% BigInt(fileStatus.getLen) + if (divAndRemain._2.signum == 1) divAndRemain._1.toInt + 1 else divAndRemain._1.toInt } + } else { + None } + + SizeInBytesWithDeserFactor(fileStatus.getLen, factor) } def calculateLocationSize( sessionState: SessionState, identifier: TableIdentifier, - locationUri: Option[URI]): Long = { + locationUri: Option[URI]): SizeInBytesWithDeserFactor = { // This method is mainly based on // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table) // in Hive 
0.13 (except that we do not use fs.getContentSummary). @@ -90,50 +147,51 @@ object CommandUtils extends Logging { // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use // countFileSize to count the table size. val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + val hadoopConf = sessionState.newHadoopConf() - def getPathSize(fs: FileSystem, path: Path): Long = { + val calcDeserFactEnabled = sessionState.conf.deserFactorStatCalcEnabled + def getSumSizeInBytesWithDeserFactor(fs: FileSystem, path: Path): SizeInBytesWithDeserFactor = { val fileStatus = fs.getFileStatus(path) - val size = if (fileStatus.isDirectory) { - fs.listStatus(path) - .map { status => - if (isDataPath(status.getPath, stagingDir)) { - getPathSize(fs, status.getPath) - } else { - 0L - } - }.sum + if (fileStatus.isDirectory) { + val fileSizesWithDeserFactor = fs.listStatus(path).map { status => + if (isDataPath(status.getPath, stagingDir)) { + getSumSizeInBytesWithDeserFactor(fs, status.getPath) + } else { + SizeInBytesWithDeserFactor(0L, None) + } + } + sumSizeWithMaxDeserializationFactor(fileSizesWithDeserFactor) } else { - fileStatus.getLen + sizeInBytesWithDeserFactor(calcDeserFactEnabled, hadoopConf, fileStatus) } - - size } val startTime = System.nanoTime() logInfo(s"Starting to calculate the total file size under path $locationUri.") - val size = locationUri.map { p => + val fileSizesWithDeserFactor = locationUri.map { p => val path = new Path(p) try { - val fs = path.getFileSystem(sessionState.newHadoopConf()) - getPathSize(fs, path) + val fs = path.getFileSystem(hadoopConf) + getSumSizeInBytesWithDeserFactor(fs, path) } catch { case NonFatal(e) => logWarning( s"Failed to get the size of table ${identifier.table} in the " + s"database ${identifier.database} because of ${e.toString}", e) - 0L + SizeInBytesWithDeserFactor(0L, None) } - }.getOrElse(0L) + }.getOrElse(SizeInBytesWithDeserFactor(0L, None)) val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000) logInfo(s"It took $durationInMs ms to calculate the total file size under path $locationUri.") - size + fileSizesWithDeserFactor } def compareAndGetNewStats( oldStats: Option[CatalogStatistics], - newTotalSize: BigInt, + newSizeWithDeserFactor: SizeInBytesWithDeserFactor, newRowCount: Option[BigInt]): Option[CatalogStatistics] = { + val newTotalSize = newSizeWithDeserFactor.sizeInBytes val oldTotalSize = oldStats.map(_.sizeInBytes).getOrElse(BigInt(-1)) val oldRowCount = oldStats.flatMap(_.rowCount).getOrElse(BigInt(-1)) var newStats: Option[CatalogStatistics] = None @@ -144,15 +202,26 @@ object CommandUtils extends Logging { // 1. when total size is not changed, we don't need to alter the table; // 2. when total size is changed, `oldRowCount` becomes invalid. // This is to make sure that we only record the right statistics. 
- if (newRowCount.isDefined) { - if (newRowCount.get >= 0 && newRowCount.get != oldRowCount) { - newStats = if (newStats.isDefined) { - newStats.map(_.copy(rowCount = newRowCount)) - } else { - Some(CatalogStatistics(sizeInBytes = oldTotalSize, rowCount = newRowCount)) - } + newRowCount.foreach { rowCount => + if (rowCount >= 0 && rowCount != oldRowCount) { + newStats = newStats + .map(_.copy(rowCount = newRowCount)) + .orElse(Some(CatalogStatistics(sizeInBytes = oldTotalSize, rowCount = newRowCount))) + } + } + val oldDeserFactor = oldStats.flatMap(_.deserFactor) + val newDeserFactor = newSizeWithDeserFactor.deserFactor.orElse(oldDeserFactor) + if (oldDeserFactor != newDeserFactor || newStats.isDefined) { + newDeserFactor.foreach { _ => + newStats = newStats + .map(_.copy(deserFactor = newDeserFactor)) + .orElse(Some(CatalogStatistics( + sizeInBytes = oldTotalSize, + deserFactor = newDeserFactor, + rowCount = None))) } } + newStats } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index ee5d37cebf2f3..e57e7fa592f77 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -450,14 +450,25 @@ case class AlterTableAddPartitionCommand( } catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists) - if (table.stats.nonEmpty) { + table.stats.foreach { stats => if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) { - val addedSize = parts.map { part => - CommandUtils.calculateLocationSize(sparkSession.sessionState, table.identifier, - part.storage.locationUri) - }.sum + val newPartsTotalSizeAndDeserializationFactor = + CommandUtils.sumSizeWithMaxDeserializationFactor( + parts.map { part => + CommandUtils.calculateLocationSize( + sparkSession.sessionState, + table.identifier, + part.storage.locationUri) + }) + val addedSize = newPartsTotalSizeAndDeserializationFactor.sizeInBytes + // if the calculation of the deserialization factor is disabled now then take the old factor + // otherwise take the largest factor + val newFactor = newPartsTotalSizeAndDeserializationFactor + .deserFactor + .map(Math.max(_, stats.deserFactor.getOrElse(0))) + .orElse(stats.deserFactor) if (addedSize > 0) { - val newStats = CatalogStatistics(sizeInBytes = table.stats.get.sizeInBytes + addedSize) + val newStats = CatalogStatistics(sizeInBytes = stats.sizeInBytes + addedSize, newFactor) catalog.alterTableStats(table.identifier, Some(newStats)) } } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 9377cb0174673..4a51edda3337e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -492,7 +492,9 @@ case class TruncateTableCommand( if (table.stats.nonEmpty) { // empty table after truncation - val newStats = CatalogStatistics(sizeInBytes = 0, rowCount = Some(0)) + val newStats = CatalogStatistics(sizeInBytes = 0, + deserFactor = None, + rowCount = Some(0)) catalog.alterTableStats(tableName, Some(newStats)) } Seq.empty[Row] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 
35bda5682fda1..33c6af7e29a1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -41,7 +41,7 @@ case class LogicalRelation( override def computeStats(): Statistics = { catalogTable - .flatMap(_.stats.map(_.toPlanStats(output, conf.cboEnabled))) + .flatMap(_.stats.map(_.toPlanStats(output, conf.cboEnabled, conf.deserFactorDistortion))) .getOrElse(Statistics(sizeInBytes = relation.sizeInBytes)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 927e77a53bf47..449dd7812a9d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -56,8 +56,12 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { val prunedFsRelation = fsRelation.copy(location = prunedFileIndex)(sparkSession) // Change table stats based on the sizeInBytes of pruned files - val withStats = logicalRelation.catalogTable.map(_.copy( - stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes))))) + val withStats = logicalRelation.catalogTable.map { catalogTable => + catalogTable.copy( + stats = Some(CatalogStatistics( + sizeInBytes = BigInt(prunedFileIndex.sizeInBytes), + deserFactor = catalogTable.stats.flatMap(_.deserFactor)))) + } val prunedLogicalRelation = logicalRelation.copy( relation = prunedFsRelation, catalogTable = withStats) // Keep partition-pruning predicates so that they are visible in physical planning diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 12d4244e19812..8c3c4751f2adc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.orc +import java.io.IOException import java.nio.charset.StandardCharsets.UTF_8 import java.util.Locale @@ -35,7 +36,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.execution.datasources.SchemaMergeUtils import org.apache.spark.sql.types._ -import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} +import org.apache.spark.util.ThreadUtils object OrcUtils extends Logging { @@ -46,6 +47,17 @@ object OrcUtils extends Logging { "ZLIB" -> ".zlib", "LZO" -> ".lzo") + def rawSize(hadoopConf: Configuration, filePath: Path): BigInt = { + val fs = filePath.getFileSystem(hadoopConf) + val readerOptions = OrcFile.readerOptions(hadoopConf).filesystem(fs) + try { + val reader = OrcFile.createReader(filePath, readerOptions) + reader.getRawDataSize + } catch { + case _: IOException => BigInt(0) + } + } + def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = { val origPath = new Path(pathStr) val fs = origPath.getFileSystem(conf) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala index 7e7dba92f37b5..c22ed4535ba26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala @@ -16,13 +16,28 @@ */ package org.apache.spark.sql.execution.datasources.parquet +import java.io.IOException + +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType object ParquetUtils { + + def rawSize(hadoopConf: Configuration, filePath: Path): BigInt = try { + val footer = ParquetFileReader.readFooter(hadoopConf, filePath) + footer + .getBlocks + .stream + .mapToLong(_.getColumns.stream.mapToLong(_.getTotalUncompressedSize).sum()) + .sum() + } catch { + case _: IOException => BigInt(0) + } + def inferSchema( sparkSession: SparkSession, parameters: Map[String, String], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala index 915f66526c3e6..9ad06dfafd6bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala @@ -30,6 +30,8 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatisti import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Histogram, HistogramBin, HistogramSerializer, LogicalPlan} import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.exchange.EnsureRequirements +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.test.SQLTestUtils @@ -369,4 +371,14 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils assert(relation.stats.attributeStats.isEmpty) } } + + def checkNumBroadcastHashJoins(df: DataFrame, expectedNumBhj: Int, clue: String): Unit = { + val plan = EnsureRequirements(spark.sessionState.conf).apply(df.queryExecution.sparkPlan) + assert(plan.collect { case p: BroadcastHashJoinExec => p }.size === expectedNumBhj, clue) + } + + def checkNumSortMergeJoins(df: DataFrame, expectedNumSmj: Int, clue: String): Unit = { + val plan = EnsureRequirements(spark.sessionState.conf).apply(df.queryExecution.sparkPlan) + assert(plan.collect { case p: SortMergeJoinExec => p }.size === expectedNumSmj, clue) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CommandUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CommandUtilsSuite.scala index f3e15189a6418..086aa9bbd7f8c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CommandUtilsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CommandUtilsSuite.scala @@ -23,22 +23,22 @@ import org.apache.spark.sql.catalyst.catalog.CatalogStatistics class CommandUtilsSuite extends SparkFunSuite { test("Check if compareAndGetNewStats returns correct results") { - val oldStats1 = CatalogStatistics(sizeInBytes = 10, rowCount 
= Some(100)) + val oldStats1 = CatalogStatistics(sizeInBytes = 10, None, rowCount = Some(100)) val newStats1 = CommandUtils.compareAndGetNewStats( - Some(oldStats1), newTotalSize = 10, newRowCount = Some(100)) + Some(oldStats1), SizeInBytesWithDeserFactor(10, None), newRowCount = Some(100)) assert(newStats1.isEmpty) val newStats2 = CommandUtils.compareAndGetNewStats( - Some(oldStats1), newTotalSize = -1, newRowCount = None) + Some(oldStats1), SizeInBytesWithDeserFactor(-1, None), newRowCount = None) assert(newStats2.isEmpty) val newStats3 = CommandUtils.compareAndGetNewStats( - Some(oldStats1), newTotalSize = 20, newRowCount = Some(-1)) + Some(oldStats1), SizeInBytesWithDeserFactor(20, None), newRowCount = Some(-1)) assert(newStats3.isDefined) newStats3.foreach { stat => assert(stat.sizeInBytes === 20) assert(stat.rowCount.isEmpty) } val newStats4 = CommandUtils.compareAndGetNewStats( - Some(oldStats1), newTotalSize = -1, newRowCount = Some(200)) + Some(oldStats1), SizeInBytesWithDeserFactor(-1, None), newRowCount = Some(200)) assert(newStats4.isDefined) newStats4.foreach { stat => assert(stat.sizeInBytes === 10) @@ -48,9 +48,36 @@ class CommandUtilsSuite extends SparkFunSuite { test("Check if compareAndGetNewStats can handle large values") { // Tests for large values - val oldStats2 = CatalogStatistics(sizeInBytes = BigInt(Long.MaxValue) * 2) + val oldStats2 = CatalogStatistics(sizeInBytes = BigInt(Long.MaxValue) * 2, None) val newStats5 = CommandUtils.compareAndGetNewStats( - Some(oldStats2), newTotalSize = BigInt(Long.MaxValue) * 2, None) + Some(oldStats2), SizeInBytesWithDeserFactor(BigInt(Long.MaxValue) * 2, None), None) assert(newStats5.isEmpty) } + + test("compareAndGetNewStats with deserialization factor") { + val oldStats3 = Some(CatalogStatistics(sizeInBytes = BigInt(1), Some(2), Some(300))) + val newStats6 = CommandUtils.compareAndGetNewStats( + oldStats3, SizeInBytesWithDeserFactor(BigInt(1), deserFactor = None), Some(300)) + assert(newStats6.isEmpty, + "the old deserFactor should be inherited when its calculation is disabled in subsequent " + + "runs, and the result must be empty as no other attribute changed") + + val newStats7 = CommandUtils.compareAndGetNewStats( + oldStats3, SizeInBytesWithDeserFactor(BigInt(1), deserFactor = Some(2)), Some(300)) + assert(newStats7.isEmpty) + + val newStats8 = CommandUtils.compareAndGetNewStats( + oldStats3, SizeInBytesWithDeserFactor(BigInt(5), deserFactor = Some(2)), Some(300)) + assert(newStats8.isDefined && + newStats8.get === CatalogStatistics(BigInt(5), deserFactor = Some(2), None), + "sizeInBytes changed, so new catalog statistics are needed; rowCount is None because " + + "it did not change") + + val newStats9 = CommandUtils.compareAndGetNewStats( + oldStats3, SizeInBytesWithDeserFactor(BigInt(1), deserFactor = Some(4)), Some(300)) + assert(newStats9.isDefined && + newStats9.get === CatalogStatistics(BigInt(1), deserFactor = Some(4), None), + "the factor changed, so new catalog statistics are needed; rowCount is None because " + + "it did not change") + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 03874d005a6e6..62e4e7f2440d7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -1079,6 +1079,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat if
(stats.rowCount.isDefined) { statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString() } + if (stats.deserFactor.isDefined) { + statsProperties += STATISTICS_DESER_FACTOR -> stats.deserFactor.get.toString() + } stats.colStats.foreach { case (colName, colStat) => colStat.toMap(colName).foreach { case (k, v) => @@ -1120,6 +1123,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat Some(CatalogStatistics( sizeInBytes = BigInt(statsProps(STATISTICS_TOTAL_SIZE)), + deserFactor = statsProps.get(STATISTICS_DESER_FACTOR).map(_.toInt), rowCount = statsProps.get(STATISTICS_NUM_ROWS).map(BigInt(_)), colStats = colStats.toMap)) } @@ -1330,6 +1334,8 @@ object HiveExternalCatalog { val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics." val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize" + val STATISTICS_DESER_FACTOR = STATISTICS_PREFIX + "deserFactor" + val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows" val STATISTICS_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats." diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 5b2eeb2cf34c0..46ee401f333a0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -55,7 +55,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX} +import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX, STATISTICS_DESER_FACTOR} import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.client.HiveClientImpl._ import org.apache.spark.sql.types._ @@ -1169,10 +1169,17 @@ private[hive] object HiveClientImpl { // return None. // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always // zero after INSERT command. So they are used here only if they are larger than zero. + val deserFactor = properties.get(STATISTICS_DESER_FACTOR).map(_.toInt) if (totalSize.isDefined && totalSize.get > 0L) { - Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount.filter(_ > 0))) + Some(CatalogStatistics( + sizeInBytes = totalSize.get, + deserFactor = deserFactor, + rowCount = rowCount.filter(_ > 0))) } else if (rawDataSize.isDefined && rawDataSize.get > 0) { - Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount.filter(_ > 0))) + Some(CatalogStatistics( + sizeInBytes = rawDataSize.get, + deserFactor = deserFactor, + rowCount = rowCount.filter(_ > 0))) } else { // TODO: still fill the rowCount even if sizeInBytes is empty. Might break anything? 
None diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 40581066c62bb..8a0abf5ae6bcf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -31,11 +31,10 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, HiveTableRelation} -import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, HistogramBin, HistogramSerializer} +import org.apache.spark.sql.catalyst.plans.logical.HistogramBin import org.apache.spark.sql.catalyst.util.{DateTimeUtils, StringUtils} import org.apache.spark.sql.execution.command.{AnalyzeColumnCommand, CommandUtils, DDLUtils} import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.hive.HiveExternalCatalog._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -204,9 +203,9 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto .getTableMetadata(TableIdentifier(checkSizeTable)) HiveCatalogMetrics.reset() assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0) - val size = CommandUtils.calculateTotalSize(spark, tableMeta) + val sizeWithDeserFactor = CommandUtils.calculateTotalSize(spark, tableMeta) assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1) - assert(size === BigInt(17436)) + assert(sizeWithDeserFactor.sizeInBytes === BigInt(17436)) } } } @@ -1328,12 +1327,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold, s"query should contain two relations, each of which has size smaller than autoConvertSize") - // Using `sparkPlan` because for relevant patterns in HashJoin to be - // matched, other strategies need to be applied. 
- var bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j } - assert(bhj.size === 1, + checkNumBroadcastHashJoins(df, 1, s"actual query plans do not contain broadcast join: ${df.queryExecution}") - checkAnswer(df, expectedAnswer) // check correctness of output spark.sessionState.conf.settings.synchronized { @@ -1341,11 +1336,10 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1""") df = sql(query) - bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j } - assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") - val shj = df.queryExecution.sparkPlan.collect { case j: SortMergeJoinExec => j } - assert(shj.size === 1, + checkNumBroadcastHashJoins(df, 0, + "BroadcastHashJoin still planned even though it is switched off") + checkNumSortMergeJoins(df, 1, "SortMergeJoin should be planned when BroadcastHashJoin is turned off") sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=$tmp""") @@ -1382,14 +1376,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto && sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold, s"query should contain two relations, each of which has size smaller than autoConvertSize") - // Using `sparkPlan` because for relevant patterns in HashJoin to be - // matched, other strategies need to be applied. - var bhj = df.queryExecution.sparkPlan.collect { - case j: BroadcastHashJoinExec => j - } - assert(bhj.size === 1, + checkNumBroadcastHashJoins(df, 1, s"actual query plans do not contain broadcast join: ${df.queryExecution}") - checkAnswer(df, answer) // check correctness of output spark.sessionState.conf.settings.synchronized { @@ -1397,15 +1385,9 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1") df = sql(leftSemiJoinQuery) - bhj = df.queryExecution.sparkPlan.collect { - case j: BroadcastHashJoinExec => j - } - assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") - - val shj = df.queryExecution.sparkPlan.collect { - case j: SortMergeJoinExec => j - } - assert(shj.size === 1, + checkNumBroadcastHashJoins(df, 0, + "BroadcastHashJoin still planned even though it is switched off") + checkNumSortMergeJoins(df, 1, "SortMergeJoinExec should be planned when BroadcastHashJoin is turned off") sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=$tmp") @@ -1514,4 +1496,100 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } } + + private def checkDeserializationFactor(tableName: String, exists: Boolean): Unit = { + spark.sessionState.catalog.refreshTable(TableIdentifier(tableName)) + val catalogTable = getCatalogTable(tableName) + assert(catalogTable.stats.isDefined) + assert(catalogTable.stats.get.deserFactor.isDefined === exists) + } + + private def testDeserializationFactor(fileformat: String) + : Unit = test(s"SPARK-24914 - test deserialization factor ($fileformat)") { + val table = s"sizeTest" + + withTable(table) { + sql(s"CREATE TABLE $table (key STRING, value STRING) PARTITIONED BY (ds STRING) " + + s"STORED AS $fileformat") + sql(s"INSERT INTO TABLE $table PARTITION (ds='2010-01-01') SELECT '1', 'A' FROM SRC") + + val catalogTable = getCatalogTable(table) + assert(catalogTable.stats.isEmpty) + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS") + checkDeserializationFactor(table, exists = false) + val 
origSizeInBytes = spark.table(table).queryExecution.optimizedPlan.stats.sizeInBytes + logInfo(s"original sizeInBytes (file size): $origSizeInBytes") + + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> s"${origSizeInBytes - 1}") { + val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key = t2.key") + checkNumBroadcastHashJoins(res, 0, + "sort merge join should be planned as the threshold is smaller than the table size") + } + + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> s"${origSizeInBytes + 1}") { + val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key = t2.key") + checkNumBroadcastHashJoins(res, 1, + "broadcast join should be planned as the threshold is greater than the table size") + } + + withSQLConf( + SQLConf.DESERIALIZATION_FACTOR_CALC_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> s"${origSizeInBytes + 1}") { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS") + checkDeserializationFactor(table, exists = true) + val newSizeInBytes = spark.table(table).queryExecution.optimizedPlan.stats.sizeInBytes + assert(2 * origSizeInBytes <= newSizeInBytes) + logInfo(s"sizeInBytes after applying deserFactor: $newSizeInBytes") + val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key = t2.key") + checkNumBroadcastHashJoins(res, 0, + "sort merge join should be planned even though the threshold is greater than the " + + "table size, as the deserialization factor is applied") + } + + withSQLConf( + SQLConf.DESERIALIZATION_FACTOR_CALC_ENABLED.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> s"${origSizeInBytes + 1}") { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS") + checkDeserializationFactor(table, exists = true) + val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key = t2.key") + checkNumBroadcastHashJoins(res, 0, + "sort merge join should be planned even though the deserialization factor calculation " + + "is disabled, as the old factor is reused") + } + + withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true") { + val catalogTableBefore = getCatalogTable(table) + val deserFactorBefore = catalogTableBefore.stats.get.deserFactor.get + sql(s"INSERT INTO TABLE $table PARTITION (ds='2010-01-02') SELECT '1', 'A' FROM SRC") + spark.sessionState.catalog.refreshTable(TableIdentifier(table)) + val catalogTable1 = getCatalogTable(table) + assert(catalogTable1.stats.isDefined && + catalogTable1.stats.get.deserFactor.isDefined) + assert(catalogTable1.stats.get.deserFactor.get === deserFactorBefore, + "deserFactor should not change when adding a smaller or equally sized partition") + + sql(s"INSERT INTO TABLE $table PARTITION (ds='2010-01-03') SELECT * FROM SRC") + spark.sessionState.catalog.refreshTable(TableIdentifier(table)) + val catalogTable2 = getCatalogTable(table) + assert(catalogTable2.stats.isDefined && + catalogTable2.stats.get.deserFactor.isDefined) + assert(catalogTable2.stats.get.deserFactor.get > deserFactorBefore, + "the deserialization factor should increase after adding a larger partition") + } + + sql(s"TRUNCATE TABLE $table") + + withSQLConf( + SQLConf.DESERIALIZATION_FACTOR_CALC_ENABLED.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> s"${origSizeInBytes + 1}") { + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS") + checkDeserializationFactor(table, exists = false) + val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key = t2.key") + checkNumBroadcastHashJoins(res, 1, + "broadcast join should be planned as the deserialization factor was removed by TRUNCATE") + } + } + } + + Seq("PARQUET", "ORC").foreach(testDeserializationFactor) }
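
Sketch (not part of the patch): the standalone Scala snippet below illustrates how the deserialization factor is derived per file and how it scales the planned sizeInBytes. It mirrors the arithmetic added in CommandUtils.sizeInBytesWithDeserFactor and CatalogStatistics.toPlanStats; the object name, method names and example numbers are illustrative only, and Guava is assumed on the classpath (as in the patch itself).

import java.math.RoundingMode.UP

import com.google.common.math.DoubleMath.roundToBigInteger

object DeserFactorSketch {

  // Per data file: factor = ceil(raw uncompressed size / on-disk file size). The table- or
  // partition-level factor is the maximum over all files, as in
  // CommandUtils.sumSizeWithMaxDeserializationFactor.
  def deserFactor(rawSize: BigInt, fileSizeInBytes: Long): Int = {
    val (quotient, remainder) = rawSize /% BigInt(fileSizeInBytes)
    if (remainder.signum == 1) quotient.toInt + 1 else quotient.toInt
  }

  // At planning time, when CBO is disabled or no row count is available, the stored file size
  // is scaled up by the factor and the configured distortion before it is used as sizeInBytes,
  // which is what drives e.g. the broadcast vs. sort merge join decision.
  def estimatedSizeInBytes(fileSize: BigInt, factor: Option[Int], distortion: Double): BigInt =
    factor.map { f =>
      BigInt(roundToBigInteger(fileSize.doubleValue * distortion * f, UP))
    }.getOrElse(fileSize)

  def main(args: Array[String]): Unit = {
    // Typical flow once the patch is in place (table name is illustrative):
    //   SET spark.sql.statistics.deserFactor.calc.enabled=true;
    //   ANALYZE TABLE sales COMPUTE STATISTICS;
    //   SET spark.sql.statistics.deserFactor.distortion=1.0;  -- optional extra multiplier

    // A 100-byte Parquet file whose uncompressed column data is 250 bytes gets factor 3.
    val factor = deserFactor(rawSize = BigInt(250), fileSizeInBytes = 100L)
    assert(factor == 3)
    // With the default distortion of 1.0 the 100-byte file is planned as 300 bytes.
    assert(estimatedSizeInBytes(BigInt(100), Some(factor), distortion = 1.0) == BigInt(300))
    // Without a stored factor the on-disk size is used unchanged.
    assert(estimatedSizeInBytes(BigInt(100), None, distortion = 1.0) == BigInt(100))
  }
}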