@@ -2493,6 +2493,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED =
buildConf("spark.sql.statistics.autoUpdateStatsBasedOnMetrics.enabled")
.doc("Enables automatic update for table size and row count based on write metrics. Only " +
"update stats for overwriting non-partition table as we don't known if the old " +
"are accurate.")
.version("3.4.0")
.booleanConf
.createWithDefault(true)

val CBO_ENABLED =
buildConf("spark.sql.cbo.enabled")
.doc("Enables CBO for estimation of plan statistics when set true.")
@@ -4626,6 +4635,9 @@ class SQLConf extends Serializable with Logging {

def autoSizeUpdateEnabled: Boolean = getConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED)

def autoUpdateStatsBasedOnMetricsEnabled: Boolean =
getConf(SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED)

def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)

def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
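The new flag defaults to true. Sessions that need the old behaviour (as the updated tests below do) can turn it off; a minimal sketch, assuming a running SparkSession named spark:

// Disable metrics-based stats updates for the current session only.
spark.conf.set("spark.sql.statistics.autoUpdateStatsBasedOnMetrics.enabled", "false")
// Equivalent SQL form.
spark.sql("SET spark.sql.statistics.autoUpdateStatsBasedOnMetrics.enabled=false")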
@@ -25,7 +25,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions._
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex}
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex, WriteStats}
import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.types._

@@ -52,10 +52,28 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial

object CommandUtils extends Logging {

/** Change statistics after changing data by commands. */
def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
val catalog = sparkSession.sessionState.catalog
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
/**
* Update statistics after commands that change data.
* If SparkSession.sessionState.catalog is a HiveExternalCatalog, then after we set the table
* stats to None, the Hive metastore will automatically update the NUM_FILES and TOTAL_SIZE
* stats for a non-partitioned table in MetastoreUtils.populateQuickStats().
* If SparkSession.sessionState.catalog is an InMemoryCatalog, setting the table stats to None
* simply leaves them as None.
*/
def updateTableStats(
sparkSession: SparkSession,
table: CatalogTable,
wroteStats: Option[WriteStats] = None): Unit = {
val sessionState = sparkSession.sessionState
val catalog = sessionState.catalog
if (sessionState.conf.autoUpdateStatsBasedOnMetricsEnabled &&
table.partitionColumnNames.isEmpty && wroteStats.nonEmpty &&
(wroteStats.get.mode == SaveMode.Overwrite ||
wroteStats.get.mode == SaveMode.ErrorIfExists)) {
val stat = wroteStats.get
catalog.alterTableStats(table.identifier,
Some(CatalogStatistics(stat.numBytes, stat.numRows)))
} else if (sessionState.conf.autoSizeUpdateEnabled) {
val newTable = catalog.getTableMetadata(table.identifier)
val (newSize, newPartitions) = CommandUtils.calculateTotalSize(sparkSession, newTable)
val isNewStats = newTable.stats.map(newSize != _.sizeInBytes).getOrElse(true)
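For orientation, this is the call pattern the new wroteStats parameter expects from write commands, mirroring the CreateDataSourceTableAsSelectCommand and InsertIntoHadoopFsRelationCommand changes below; a sketch only, with mode, metrics, sparkSession and table assumed to be in scope:

// After the write job finishes, summarize its driver-side metrics...
val writeStats = BasicWriteJobStatsTracker.getWriteStats(mode, metrics)
// ...and pass them along; None (the default) keeps the existing behaviour that is
// gated by autoSizeUpdateEnabled and recalculates the size from the file system.
CommandUtils.updateTableStats(sparkSession, table, writeStats)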
@@ -203,7 +203,8 @@ case class CreateDataSourceTableAsSelectCommand(
}
}

CommandUtils.updateTableStats(sparkSession, table)
val writeStats = BasicWriteJobStatsTracker.getWriteStats(mode, metrics)
CommandUtils.updateTableStats(sparkSession, table, writeStats)

Seq.empty[Row]
}
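The observable effect of wiring write metrics into CTAS, as a hedged spark-shell sketch (the table name is illustrative): with the flag enabled, the catalog entry carries size and row count right after the write, which is also what the extra Statistics line in the golden-file diff below reflects.

spark.sql("CREATE TABLE demo_ctas USING parquet AS SELECT 1 AS i, 'abc' AS j")
// No ANALYZE TABLE needed: the stats come from the write metrics of the CTAS job.
spark.sql("DESC EXTENDED demo_ctas").filter("col_name = 'Statistics'").show(false)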
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker._
@@ -45,6 +46,10 @@ case class BasicWriteTaskStats(
numRows: Long)
extends WriteTaskStats

/**
* The write mode and statistics, used to update table statistics.
*/
case class WriteStats(mode: SaveMode, numBytes: BigInt, numRows: Option[BigInt])

/**
* Simple [[WriteTaskStatsTracker]] implementation that produces [[BasicWriteTaskStats]].
@@ -249,4 +254,14 @@ object BasicWriteJobStatsTracker {
JOB_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "job commit time")
)
}

def getWriteStats(mode: SaveMode, metrics: Map[String, SQLMetric]): Option[WriteStats] = {
if (metrics.contains(NUM_OUTPUT_BYTES_KEY) && metrics.contains(NUM_OUTPUT_ROWS_KEY)) {
val numBytes = metrics.get(NUM_OUTPUT_BYTES_KEY).map(_.value).map(BigInt(_))
val numRows = metrics.get(NUM_OUTPUT_ROWS_KEY).map(_.value).map(BigInt(_))
numBytes.map(WriteStats(mode, _, numRows))
} else {
None
}
}
}
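A quick sketch of what getWriteStats returns, assuming a spark-shell where sc is the SparkContext; the metric key strings ("numOutputBytes", "numOutputRows") and the metric values are assumptions made for illustration:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker

val metrics = BasicWriteJobStatsTracker.metrics(sc)
metrics("numOutputBytes").set(458L)  // bytes written, illustrative
metrics("numOutputRows").set(0L)     // rows written, illustrative

// Returns Some(WriteStats(Overwrite, 458, Some(0))); returns None if either metric is absent.
BasicWriteJobStatsTracker.getWriteStats(SaveMode.Overwrite, metrics)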
@@ -207,7 +207,8 @@ case class InsertIntoHadoopFsRelationCommand(
sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, outputPath, fs)

if (catalogTable.nonEmpty) {
CommandUtils.updateTableStats(sparkSession, catalogTable.get)
val writeStats = BasicWriteJobStatsTracker.getWriteStats(mode, metrics)
CommandUtils.updateTableStats(sparkSession, catalogTable.get, writeStats)
}

} else {
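Because updateTableStats only takes the metrics path for SaveMode.Overwrite or SaveMode.ErrorIfExists on a non-partitioned table, an append leaves the catalog stats untouched, which is what the new test at the end of this diff verifies. A hedged sketch, table name illustrative:

spark.sql("CREATE TABLE demo_ins (i INT, j STRING) USING parquet")
// Append: the old stats may no longer be accurate, so the metrics path is skipped.
spark.sql("INSERT INTO demo_ins VALUES (1, 'abc')")
// Overwrite: the write metrics describe the whole table, so size and row count
// should be refreshed from them while the flag is enabled.
spark.sql("INSERT OVERWRITE TABLE demo_ins VALUES (2, 'def')")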
@@ -91,6 +91,7 @@ Last Access [not included in comparison]
Created By [not included in comparison]
Type MANAGED
Provider parquet
Statistics 458 bytes, 0 rows
Location [not included in comparison]/{warehouse_dir}/char_tbl2


@@ -99,25 +99,27 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}

test("analyze empty table") {
val table = "emptyTable"
withTable(table) {
val df = Seq.empty[Int].toDF("key")
df.write.format("json").saveAsTable(table)
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS noscan")
val fetchedStats1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
assert(fetchedStats1.get.sizeInBytes == 0)
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
val fetchedStats2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
assert(fetchedStats2.get.sizeInBytes == 0)

val expectedColStat =
"key" -> CatalogColumnStat(Some(0), None, None, Some(0),
Some(IntegerType.defaultSize), Some(IntegerType.defaultSize))

// There won't be histogram for empty column.
Seq("true", "false").foreach { histogramEnabled =>
withSQLConf(SQLConf.HISTOGRAM_ENABLED.key -> histogramEnabled) {
checkColStats(df, mutable.LinkedHashMap(expectedColStat))
withSQLConf(SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") {
val table = "emptyTable"
withTable(table) {
val df = Seq.empty[Int].toDF("key")
df.write.format("json").saveAsTable(table)
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS noscan")
val stats1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
assert(stats1.get.sizeInBytes == 0)
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
val stats2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
assert(stats2.get.sizeInBytes == 0)

val expectedColStat =
"key" -> CatalogColumnStat(Some(0), None, None, Some(0),
Some(IntegerType.defaultSize), Some(IntegerType.defaultSize))

// There won't be histogram for empty column.
Seq("true", "false").foreach { histogramEnabled =>
withSQLConf(SQLConf.HISTOGRAM_ENABLED.key -> histogramEnabled) {
checkColStats(df, mutable.LinkedHashMap(expectedColStat))
}
}
}
}
@@ -156,18 +158,20 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}

test("test table-level statistics for data source table") {
val tableName = "tbl"
withTable(tableName) {
sql(s"CREATE TABLE $tableName(i INT, j STRING) USING parquet")
Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto(tableName)

// noscan won't count the number of rows
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
checkTableStats(tableName, hasSizeInBytes = true, expectedRowCounts = None)

// without noscan, we count the number of rows
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
checkTableStats(tableName, hasSizeInBytes = true, expectedRowCounts = Some(2))
withSQLConf(SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") {
val tableName = "tbl"
withTable(tableName) {
sql(s"CREATE TABLE $tableName(i INT, j STRING) USING parquet")
Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto(tableName)

// noscan won't count the number of rows
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
checkTableStats(tableName, hasSizeInBytes = true, expectedRowCounts = None)

// without noscan, we count the number of rows
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
checkTableStats(tableName, hasSizeInBytes = true, expectedRowCounts = Some(2))
}
}
}

@@ -319,7 +323,8 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
test("change stats after insert command for datasource table") {
val table = "change_stats_insert_datasource_table"
Seq(false, true).foreach { autoUpdate =>
withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) {
withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString,
SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") {
withTable(table) {
sql(s"CREATE TABLE $table (i int, j string) USING PARQUET")
// analyze to get initial stats
@@ -352,7 +357,8 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
test("auto gather stats after insert command") {
val table = "change_stats_insert_datasource_table"
Seq(false, true).foreach { autoUpdate =>
withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) {
withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString,
SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") {
withTable(table) {
sql(s"CREATE TABLE $table (i int, j string) USING PARQUET")
// insert into command
@@ -686,7 +692,8 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
test(s"CTAS should update statistics if ${SQLConf.AUTO_SIZE_UPDATE_ENABLED.key} is enabled") {
val tableName = "spark_27694"
Seq(false, true).foreach { updateEnabled =>
withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> updateEnabled.toString) {
withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> updateEnabled.toString,
SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") {
withTable(tableName) {
// Create a data source table using the result of a query.
sql(s"CREATE TABLE $tableName USING parquet AS SELECT 'a', 'b'")
@@ -788,41 +795,75 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}

test("SPARK-33687: analyze all tables in a specific database") {
withTempDatabase { database =>
spark.catalog.setCurrentDatabase(database)
withTempDir { dir =>
withTable("t1", "t2") {
spark.range(10).write.saveAsTable("t1")
sql(s"CREATE EXTERNAL TABLE t2 USING parquet LOCATION '${dir.toURI}' " +
"AS SELECT * FROM range(20)")
withView("v1", "v2") {
sql("CREATE VIEW v1 AS SELECT 1 c1")
sql("CREATE VIEW v2 AS SELECT 2 c2")
sql("CACHE TABLE v1")
sql("CACHE LAZY TABLE v2")

sql(s"ANALYZE TABLES IN $database COMPUTE STATISTICS NOSCAN")
checkTableStats("t1", hasSizeInBytes = true, expectedRowCounts = None)
checkTableStats("t2", hasSizeInBytes = true, expectedRowCounts = None)
assert(getCatalogTable("v1").stats.isEmpty)
checkOptimizedPlanStats(spark.table("v1"), 4, Some(1), Seq.empty)
checkOptimizedPlanStats(spark.table("v2"), 1, None, Seq.empty)

sql("ANALYZE TABLES COMPUTE STATISTICS")
checkTableStats("t1", hasSizeInBytes = true, expectedRowCounts = Some(10))
checkTableStats("t2", hasSizeInBytes = true, expectedRowCounts = Some(20))
checkOptimizedPlanStats(spark.table("v1"), 4, Some(1), Seq.empty)
checkOptimizedPlanStats(spark.table("v2"), 4, Some(1), Seq.empty)
withSQLConf(SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") {
withTempDatabase { database =>
spark.catalog.setCurrentDatabase(database)
withTempDir { dir =>
withTable("t1", "t2") {
spark.range(10).write.saveAsTable("t1")
sql(s"CREATE EXTERNAL TABLE t2 USING parquet LOCATION '${dir.toURI}' " +
"AS SELECT * FROM range(20)")
withView("v1", "v2") {
sql("CREATE VIEW v1 AS SELECT 1 c1")
sql("CREATE VIEW v2 AS SELECT 2 c2")
sql("CACHE TABLE v1")
sql("CACHE LAZY TABLE v2")

sql(s"ANALYZE TABLES IN $database COMPUTE STATISTICS NOSCAN")
checkTableStats("t1", hasSizeInBytes = true, expectedRowCounts = None)
checkTableStats("t2", hasSizeInBytes = true, expectedRowCounts = None)
assert(getCatalogTable("v1").stats.isEmpty)
checkOptimizedPlanStats(spark.table("v1"), 4, Some(1), Seq.empty)
checkOptimizedPlanStats(spark.table("v2"), 1, None, Seq.empty)

sql("ANALYZE TABLES COMPUTE STATISTICS")
checkTableStats("t1", hasSizeInBytes = true, expectedRowCounts = Some(10))
checkTableStats("t2", hasSizeInBytes = true, expectedRowCounts = Some(20))
checkOptimizedPlanStats(spark.table("v1"), 4, Some(1), Seq.empty)
checkOptimizedPlanStats(spark.table("v2"), 4, Some(1), Seq.empty)
}
}
}
}

val e = intercept[AnalysisException] {
sql(s"ANALYZE TABLES IN db_not_exists COMPUTE STATISTICS")
}
checkError(e,
errorClass = "SCHEMA_NOT_FOUND",
parameters = Map("schemaName" -> "`db_not_exists`"))
}
}

test("update datasource table stats based on metrics after insert command") {
Seq(false, true).foreach { autoUpdate =>
withSQLConf(
SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "false",
SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> autoUpdate.toString) {
val table1 = "datasource_table1"
val table2 = "datasource_table2"
withTable(table1, table2) {
// CreateDataSourceTableAsSelectCommand
sql(s"CREATE TABLE $table1 USING PARQUET SELECT 1 as i, 'abc' as j")
if (autoUpdate) {
checkTableStats(table1, true, Some(1))
} else {
checkTableStats(table1, false, None)
}

// InsertIntoHadoopFsRelationCommand
sql(s"INSERT INTO TABLE $table1 SELECT 1, 'abc'")
checkTableStats(table1, false, None)

// CreateDataSourceTableCommand
sql(s"CREATE TABLE $table2 (id int, name string) USING PARQUET")
checkTableStats(table2, false, None)

// InsertIntoHadoopFsRelationCommand
sql(s"INSERT INTO TABLE $table2 SELECT 1, 'abc'")
checkTableStats(table2, false, None)
}
}
}
}
@@ -273,10 +273,12 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
hasSizeInBytes: Boolean,
expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
val stats = getCatalogTable(tableName).stats
if (hasSizeInBytes || expectedRowCounts.nonEmpty) {
if (hasSizeInBytes) {
assert(stats.isDefined)
assert(stats.get.sizeInBytes >= 0)
assert(stats.get.rowCount === expectedRowCounts)
      if (expectedRowCounts.nonEmpty) {
assert(stats.get.rowCount === expectedRowCounts)
}
} else {
assert(stats.isEmpty)
}
@@ -1944,7 +1944,9 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
}

test(s"basic DDL using locale tr - caseSensitive $caseSensitive") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
withSQLConf(
SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive",
SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") {
withLocale("tr") {
val dbName = "DaTaBaSe_I"
withDatabase(dbName) {