diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3abd2578d4a5..01ee754a4519 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2493,6 +2493,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED =
+    buildConf("spark.sql.statistics.autoUpdateStatsBasedOnMetrics.enabled")
+      .doc("Enables automatic update of table size and row count based on write metrics. " +
+        "Stats are only updated when overwriting a non-partitioned table, since otherwise " +
+        "we don't know whether the existing stats are still accurate.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val CBO_ENABLED =
     buildConf("spark.sql.cbo.enabled")
       .doc("Enables CBO for estimation of plan statistics when set true.")
@@ -4626,6 +4635,9 @@ class SQLConf extends Serializable with Logging {
 
   def autoSizeUpdateEnabled: Boolean = getConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED)
 
+  def autoUpdateStatsBasedOnMetricsEnabled: Boolean =
+    getConf(SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED)
+
   def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)
 
   def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index d847868c0ce9..3a4eee2cc1b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -25,7 +25,7 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions._
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex}
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex, WriteStats}
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.types._
 
@@ -52,10 +52,28 @@ class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serial
 
 object CommandUtils extends Logging {
 
-  /** Change statistics after changing data by commands. */
-  def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
-    val catalog = sparkSession.sessionState.catalog
-    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+  /**
+   * Change statistics after changing data by commands.
+   * If SparkSession.sessionState.catalog is a HiveExternalCatalog, then after we set the table
+   * stats to None, the Hive metastore automatically updates the NUM_FILES and TOTAL_SIZE stats
+   * for non-partitioned tables in MetastoreUtils.populateQuickStats().
+   * If SparkSession.sessionState.catalog is an InMemoryCatalog, then after we set the table
+   * stats to None, the table stats stay None.
+   */
+  def updateTableStats(
+      sparkSession: SparkSession,
+      table: CatalogTable,
+      wroteStats: Option[WriteStats] = None): Unit = {
+    val sessionState = sparkSession.sessionState
+    val catalog = sessionState.catalog
+    if (sessionState.conf.autoUpdateStatsBasedOnMetricsEnabled &&
+      table.partitionColumnNames.isEmpty && wroteStats.nonEmpty &&
+      (wroteStats.get.mode == SaveMode.Overwrite ||
+        wroteStats.get.mode == SaveMode.ErrorIfExists)) {
+      val stat = wroteStats.get
+      catalog.alterTableStats(table.identifier,
+        Some(CatalogStatistics(stat.numBytes, stat.numRows)))
+    } else if (sessionState.conf.autoSizeUpdateEnabled) {
       val newTable = catalog.getTableMetadata(table.identifier)
       val (newSize, newPartitions) = CommandUtils.calculateTotalSize(sparkSession, newTable)
       val isNewStats = newTable.stats.map(newSize != _.sizeInBytes).getOrElse(true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index bf14ef14cf46..c27baf5afee4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -203,7 +203,8 @@ case class CreateDataSourceTableAsSelectCommand(
       }
     }
 
-    CommandUtils.updateTableStats(sparkSession, table)
+    val writeStats = BasicWriteJobStatsTracker.getWriteStats(mode, metrics)
+    CommandUtils.updateTableStats(sparkSession, table, writeStats)
 
     Seq.empty[Row]
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index 476858997849..cbadb21450c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker._
@@ -45,6 +46,10 @@ case class BasicWriteTaskStats(
     numRows: Long)
   extends WriteTaskStats
 
+/**
+ * The write mode and statistics, used to update table statistics.
+ */
+case class WriteStats(mode: SaveMode, numBytes: BigInt, numRows: Option[BigInt])
 
 /**
  * Simple [[WriteTaskStatsTracker]] implementation that produces [[BasicWriteTaskStats]].
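Taken together, the changes above mean that a successful overwrite of a non-partitioned table records metrics-based CatalogStatistics immediately, with no ANALYZE TABLE needed. A minimal sketch of how this effect can be observed from a session built with this patch (the table name t is an arbitrary example, not taken from the patch; the exact byte size shown will vary):

    // Enable the new metrics-based stats update (it defaults to true in this patch).
    spark.conf.set("spark.sql.statistics.autoUpdateStatsBasedOnMetrics.enabled", "true")
    // Overwrite a non-partitioned table; the write metrics supply size and row count.
    spark.range(100).write.mode("overwrite").saveAsTable("t")
    // The catalog now reports e.g. "... bytes, 100 rows" without running ANALYZE TABLE.
    spark.sql("DESCRIBE TABLE EXTENDED t").where("col_name = 'Statistics'").show(false)
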
@@ -249,4 +254,14 @@ object BasicWriteJobStatsTracker { JOB_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "job commit time") ) } + + def getWriteStats(mode: SaveMode, metrics: Map[String, SQLMetric]): Option[WriteStats] = { + if (metrics.contains(NUM_OUTPUT_BYTES_KEY) && metrics.contains(NUM_OUTPUT_ROWS_KEY)) { + val numBytes = metrics.get(NUM_OUTPUT_BYTES_KEY).map(_.value).map(BigInt(_)) + val numRows = metrics.get(NUM_OUTPUT_ROWS_KEY).map(_.value).map(BigInt(_)) + numBytes.map(WriteStats(mode, _, numRows)) + } else { + None + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index fe6ec094812e..90347d65108d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -207,7 +207,8 @@ case class InsertIntoHadoopFsRelationCommand( sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, outputPath, fs) if (catalogTable.nonEmpty) { - CommandUtils.updateTableStats(sparkSession, catalogTable.get) + val writeStats = BasicWriteJobStatsTracker.getWriteStats(mode, metrics) + CommandUtils.updateTableStats(sparkSession, catalogTable.get, writeStats) } } else { diff --git a/sql/core/src/test/resources/sql-tests/results/charvarchar.sql.out b/sql/core/src/test/resources/sql-tests/results/charvarchar.sql.out index ae88227121b6..3944a5030fba 100644 --- a/sql/core/src/test/resources/sql-tests/results/charvarchar.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/charvarchar.sql.out @@ -91,6 +91,7 @@ Last Access [not included in comparison] Created By [not included in comparison] Type MANAGED Provider parquet +Statistics 458 bytes, 0 rows Location [not included in comparison]/{warehouse_dir}/char_tbl2 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index 2ab8bb25a8bd..3b46aff19057 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -99,25 +99,27 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } test("analyze empty table") { - val table = "emptyTable" - withTable(table) { - val df = Seq.empty[Int].toDF("key") - df.write.format("json").saveAsTable(table) - sql(s"ANALYZE TABLE $table COMPUTE STATISTICS noscan") - val fetchedStats1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None) - assert(fetchedStats1.get.sizeInBytes == 0) - sql(s"ANALYZE TABLE $table COMPUTE STATISTICS") - val fetchedStats2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0)) - assert(fetchedStats2.get.sizeInBytes == 0) - - val expectedColStat = - "key" -> CatalogColumnStat(Some(0), None, None, Some(0), - Some(IntegerType.defaultSize), Some(IntegerType.defaultSize)) - - // There won't be histogram for empty column. 
- Seq("true", "false").foreach { histogramEnabled => - withSQLConf(SQLConf.HISTOGRAM_ENABLED.key -> histogramEnabled) { - checkColStats(df, mutable.LinkedHashMap(expectedColStat)) + withSQLConf(SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") { + val table = "emptyTable" + withTable(table) { + val df = Seq.empty[Int].toDF("key") + df.write.format("json").saveAsTable(table) + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS noscan") + val stats1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None) + assert(stats1.get.sizeInBytes == 0) + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS") + val stats2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0)) + assert(stats2.get.sizeInBytes == 0) + + val expectedColStat = + "key" -> CatalogColumnStat(Some(0), None, None, Some(0), + Some(IntegerType.defaultSize), Some(IntegerType.defaultSize)) + + // There won't be histogram for empty column. + Seq("true", "false").foreach { histogramEnabled => + withSQLConf(SQLConf.HISTOGRAM_ENABLED.key -> histogramEnabled) { + checkColStats(df, mutable.LinkedHashMap(expectedColStat)) + } } } } @@ -156,18 +158,20 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } test("test table-level statistics for data source table") { - val tableName = "tbl" - withTable(tableName) { - sql(s"CREATE TABLE $tableName(i INT, j STRING) USING parquet") - Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto(tableName) - - // noscan won't count the number of rows - sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan") - checkTableStats(tableName, hasSizeInBytes = true, expectedRowCounts = None) - - // without noscan, we count the number of rows - sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS") - checkTableStats(tableName, hasSizeInBytes = true, expectedRowCounts = Some(2)) + withSQLConf(SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") { + val tableName = "tbl" + withTable(tableName) { + sql(s"CREATE TABLE $tableName(i INT, j STRING) USING parquet") + Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto(tableName) + + // noscan won't count the number of rows + sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan") + checkTableStats(tableName, hasSizeInBytes = true, expectedRowCounts = None) + + // without noscan, we count the number of rows + sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS") + checkTableStats(tableName, hasSizeInBytes = true, expectedRowCounts = Some(2)) + } } } @@ -319,7 +323,8 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared test("change stats after insert command for datasource table") { val table = "change_stats_insert_datasource_table" Seq(false, true).foreach { autoUpdate => - withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) { + withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString, + SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") { withTable(table) { sql(s"CREATE TABLE $table (i int, j string) USING PARQUET") // analyze to get initial stats @@ -352,7 +357,8 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared test("auto gather stats after insert command") { val table = "change_stats_insert_datasource_table" Seq(false, true).foreach { autoUpdate => - withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) { + withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString, + 
SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") { withTable(table) { sql(s"CREATE TABLE $table (i int, j string) USING PARQUET") // insert into command @@ -686,7 +692,8 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared test(s"CTAS should update statistics if ${SQLConf.AUTO_SIZE_UPDATE_ENABLED.key} is enabled") { val tableName = "spark_27694" Seq(false, true).foreach { updateEnabled => - withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> updateEnabled.toString) { + withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> updateEnabled.toString, + SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") { withTable(tableName) { // Create a data source table using the result of a query. sql(s"CREATE TABLE $tableName USING parquet AS SELECT 'a', 'b'") @@ -788,41 +795,75 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } test("SPARK-33687: analyze all tables in a specific database") { - withTempDatabase { database => - spark.catalog.setCurrentDatabase(database) - withTempDir { dir => - withTable("t1", "t2") { - spark.range(10).write.saveAsTable("t1") - sql(s"CREATE EXTERNAL TABLE t2 USING parquet LOCATION '${dir.toURI}' " + - "AS SELECT * FROM range(20)") - withView("v1", "v2") { - sql("CREATE VIEW v1 AS SELECT 1 c1") - sql("CREATE VIEW v2 AS SELECT 2 c2") - sql("CACHE TABLE v1") - sql("CACHE LAZY TABLE v2") - - sql(s"ANALYZE TABLES IN $database COMPUTE STATISTICS NOSCAN") - checkTableStats("t1", hasSizeInBytes = true, expectedRowCounts = None) - checkTableStats("t2", hasSizeInBytes = true, expectedRowCounts = None) - assert(getCatalogTable("v1").stats.isEmpty) - checkOptimizedPlanStats(spark.table("v1"), 4, Some(1), Seq.empty) - checkOptimizedPlanStats(spark.table("v2"), 1, None, Seq.empty) - - sql("ANALYZE TABLES COMPUTE STATISTICS") - checkTableStats("t1", hasSizeInBytes = true, expectedRowCounts = Some(10)) - checkTableStats("t2", hasSizeInBytes = true, expectedRowCounts = Some(20)) - checkOptimizedPlanStats(spark.table("v1"), 4, Some(1), Seq.empty) - checkOptimizedPlanStats(spark.table("v2"), 4, Some(1), Seq.empty) + withSQLConf(SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") { + withTempDatabase { database => + spark.catalog.setCurrentDatabase(database) + withTempDir { dir => + withTable("t1", "t2") { + spark.range(10).write.saveAsTable("t1") + sql(s"CREATE EXTERNAL TABLE t2 USING parquet LOCATION '${dir.toURI}' " + + "AS SELECT * FROM range(20)") + withView("v1", "v2") { + sql("CREATE VIEW v1 AS SELECT 1 c1") + sql("CREATE VIEW v2 AS SELECT 2 c2") + sql("CACHE TABLE v1") + sql("CACHE LAZY TABLE v2") + + sql(s"ANALYZE TABLES IN $database COMPUTE STATISTICS NOSCAN") + checkTableStats("t1", hasSizeInBytes = true, expectedRowCounts = None) + checkTableStats("t2", hasSizeInBytes = true, expectedRowCounts = None) + assert(getCatalogTable("v1").stats.isEmpty) + checkOptimizedPlanStats(spark.table("v1"), 4, Some(1), Seq.empty) + checkOptimizedPlanStats(spark.table("v2"), 1, None, Seq.empty) + + sql("ANALYZE TABLES COMPUTE STATISTICS") + checkTableStats("t1", hasSizeInBytes = true, expectedRowCounts = Some(10)) + checkTableStats("t2", hasSizeInBytes = true, expectedRowCounts = Some(20)) + checkOptimizedPlanStats(spark.table("v1"), 4, Some(1), Seq.empty) + checkOptimizedPlanStats(spark.table("v2"), 4, Some(1), Seq.empty) + } } } } + + val e = intercept[AnalysisException] { + sql(s"ANALYZE TABLES IN db_not_exists COMPUTE STATISTICS") + } + checkError(e, + errorClass = "SCHEMA_NOT_FOUND", 
+ parameters = Map("schemaName" -> "`db_not_exists`")) } + } + + test("update datasource table stats based on metrics after insert command") { + Seq(false, true).foreach { autoUpdate => + withSQLConf( + SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "false", + SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> autoUpdate.toString) { + val table1 = "datasource_table1" + val table2 = "datasource_table2" + withTable(table1, table2) { + // CreateDataSourceTableAsSelectCommand + sql(s"CREATE TABLE $table1 USING PARQUET SELECT 1 as i, 'abc' as j") + if (autoUpdate) { + checkTableStats(table1, true, Some(1)) + } else { + checkTableStats(table1, false, None) + } + + // InsertIntoHadoopFsRelationCommand + sql(s"INSERT INTO TABLE $table1 SELECT 1, 'abc'") + checkTableStats(table1, false, None) - val e = intercept[AnalysisException] { - sql(s"ANALYZE TABLES IN db_not_exists COMPUTE STATISTICS") + // CreateDataSourceTableCommand + sql(s"CREATE TABLE $table2 (id int, name string) USING PARQUET") + checkTableStats(table2, false, None) + + // InsertIntoHadoopFsRelationCommand + sql(s"INSERT INTO TABLE $table2 SELECT 1, 'abc'") + checkTableStats(table2, false, None) + } + } } - checkError(e, - errorClass = "SCHEMA_NOT_FOUND", - parameters = Map("schemaName" -> "`db_not_exists`")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala index 6c6ef1a118f4..15186781a087 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala @@ -273,10 +273,12 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils hasSizeInBytes: Boolean, expectedRowCounts: Option[Int]): Option[CatalogStatistics] = { val stats = getCatalogTable(tableName).stats - if (hasSizeInBytes || expectedRowCounts.nonEmpty) { + if (hasSizeInBytes) { assert(stats.isDefined) assert(stats.get.sizeInBytes >= 0) - assert(stats.get.rowCount === expectedRowCounts) + if(expectedRowCounts.nonEmpty) { + assert(stats.get.rowCount === expectedRowCounts) + } } else { assert(stats.isEmpty) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index f5d17b142e21..a4675bce64a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1944,7 +1944,9 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase { } test(s"basic DDL using locale tr - caseSensitive $caseSensitive") { - withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") { + withSQLConf( + SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive", + SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") { withLocale("tr") { val dbName = "DaTaBaSe_I" withDatabase(dbName) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 2c9720e089aa..aad96ce904ad 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import 
org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.CommandUtils -import org.apache.spark.sql.execution.datasources.{FileFormat, V1WriteCommand, V1WritesUtils} +import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormat, V1WriteCommand, V1WritesUtils} import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.client.HiveClientImpl @@ -113,7 +113,13 @@ case class InsertIntoHiveTable( CommandUtils.uncacheTableOrView(sparkSession, table.identifier.quotedString) sparkSession.sessionState.catalog.refreshTable(table.identifier) - CommandUtils.updateTableStats(sparkSession, table) + val writeStats = if (partition.isEmpty) { + val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append + BasicWriteJobStatsTracker.getWriteStats(mode, metrics) + } else { + None + } + CommandUtils.updateTableStats(sparkSession, table, writeStats) // It would be nice to just return the childRdd unchanged so insert operations could be chained, // however for now we return an empty list to simplify compatibility checks with hive, which diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 65fd2f72727e..b712045255ed 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -633,31 +633,33 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } test("test table-level statistics for hive tables created in HiveExternalCatalog") { - val textTable = "textTable" - withTable(textTable) { - // Currently Spark's statistics are self-contained, we don't have statistics until we use - // the `ANALYZE TABLE` command. - sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE") - checkTableStats( - textTable, - hasSizeInBytes = false, - expectedRowCounts = None) - sql(s"INSERT INTO TABLE $textTable SELECT * FROM src") - checkTableStats( - textTable, - hasSizeInBytes = true, - expectedRowCounts = None) + withSQLConf(SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") { + val textTable = "textTable" + withTable(textTable) { + // Currently Spark's statistics are self-contained, we don't have statistics until we use + // the `ANALYZE TABLE` command. 
+ sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE") + checkTableStats( + textTable, + hasSizeInBytes = false, + expectedRowCounts = None) + sql(s"INSERT INTO TABLE $textTable SELECT * FROM src") + checkTableStats( + textTable, + hasSizeInBytes = true, + expectedRowCounts = None) - // noscan won't count the number of rows - sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan") - val fetchedStats1 = - checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = None) + // noscan won't count the number of rows + sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan") + val fetchedStats1 = + checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = None) - // without noscan, we count the number of rows - sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS") - val fetchedStats2 = - checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = Some(500)) - assert(fetchedStats1.get.sizeInBytes == fetchedStats2.get.sizeInBytes) + // without noscan, we count the number of rows + sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS") + val fetchedStats2 = + checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = Some(500)) + assert(fetchedStats1.get.sizeInBytes == fetchedStats2.get.sizeInBytes) + } } } @@ -679,45 +681,48 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } test("keep existing column stats if table is not changed") { - val table = "update_col_stats_table" - withTable(table) { - sql(s"CREATE TABLE $table (c1 INT, c2 STRING, c3 DOUBLE)") - sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1") - val fetchedStats0 = - checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0)) - assert(fetchedStats0.get.colStats == - Map("c1" -> CatalogColumnStat(Some(0), None, None, Some(0), Some(4), Some(4)))) - - // Insert new data and analyze: have the latest column stats. - sql(s"INSERT INTO TABLE $table SELECT 1, 'a', 10.0") - sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1") - val fetchedStats1 = - checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get - assert(fetchedStats1.colStats == Map( - "c1" -> CatalogColumnStat(distinctCount = Some(1), min = Some("1"), max = Some("1"), - nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))) - - // Analyze another column: since the table is not changed, the precious column stats are kept. - sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c2") - val fetchedStats2 = - checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get - assert(fetchedStats2.colStats == Map( - "c1" -> CatalogColumnStat(distinctCount = Some(1), min = Some("1"), max = Some("1"), - nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)), - "c2" -> CatalogColumnStat(distinctCount = Some(1), min = None, max = None, - nullCount = Some(0), avgLen = Some(1), maxLen = Some(1)))) - - // Insert new data and analyze: stale column stats are removed and newly collected column - // stats are added. 
- sql(s"INSERT INTO TABLE $table SELECT 2, 'b', 20.0") - sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1, c3") - val fetchedStats3 = - checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2)).get - assert(fetchedStats3.colStats == Map( - "c1" -> CatalogColumnStat(distinctCount = Some(2), min = Some("1"), max = Some("2"), - nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)), - "c3" -> CatalogColumnStat(distinctCount = Some(2), min = Some("10.0"), max = Some("20.0"), - nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)))) + withSQLConf(SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") { + val table = "update_col_stats_table" + withTable(table) { + sql(s"CREATE TABLE $table (c1 INT, c2 STRING, c3 DOUBLE)") + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1") + val fetchedStats0 = + checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0)) + assert(fetchedStats0.get.colStats == + Map("c1" -> CatalogColumnStat(Some(0), None, None, Some(0), Some(4), Some(4)))) + + // Insert new data and analyze: have the latest column stats. + sql(s"INSERT INTO TABLE $table SELECT 1, 'a', 10.0") + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1") + val fetchedStats1 = + checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get + assert(fetchedStats1.colStats == Map( + "c1" -> CatalogColumnStat(distinctCount = Some(1), min = Some("1"), max = Some("1"), + nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))) + + // Analyze another column: since the table is not changed, the precious column stats are + // kept. + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c2") + val fetchedStats2 = + checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get + assert(fetchedStats2.colStats == Map( + "c1" -> CatalogColumnStat(distinctCount = Some(1), min = Some("1"), max = Some("1"), + nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)), + "c2" -> CatalogColumnStat(distinctCount = Some(1), min = None, max = None, + nullCount = Some(0), avgLen = Some(1), maxLen = Some(1)))) + + // Insert new data and analyze: stale column stats are removed and newly collected column + // stats are added. 
+ sql(s"INSERT INTO TABLE $table SELECT 2, 'b', 20.0") + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1, c3") + val fetchedStats3 = + checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2)).get + assert(fetchedStats3.colStats == Map( + "c1" -> CatalogColumnStat(distinctCount = Some(2), min = Some("1"), max = Some("2"), + nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)), + "c3" -> CatalogColumnStat(distinctCount = Some(2), min = Some("10.0"), max = Some("20.0"), + nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)))) + } } } @@ -815,21 +820,23 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } test("get statistics when not analyzed in Hive or Spark") { - val tabName = "tab1" - withTable(tabName) { - createNonPartitionedTable(tabName, analyzedByHive = false, analyzedBySpark = false) - checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = None) - - // ALTER TABLE SET TBLPROPERTIES invalidates some contents of Hive specific statistics - // This is triggered by the Hive alterTable API - val describeResult = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName") - - val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize") - val numRows = extractStatsPropValues(describeResult, "numRows") - val totalSize = extractStatsPropValues(describeResult, "totalSize") - assert(rawDataSize.isEmpty, "rawDataSize should not be shown without table analysis") - assert(numRows.isEmpty, "numRows should not be shown without table analysis") - assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost") + withSQLConf(SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") { + val tabName = "tab1" + withTable(tabName) { + createNonPartitionedTable(tabName, analyzedByHive = false, analyzedBySpark = false) + checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = None) + + // ALTER TABLE SET TBLPROPERTIES invalidates some contents of Hive specific statistics + // This is triggered by the Hive alterTable API + val describeResult = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName") + + val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize") + val numRows = extractStatsPropValues(describeResult, "numRows") + val totalSize = extractStatsPropValues(describeResult, "totalSize") + assert(rawDataSize.isEmpty, "rawDataSize should not be shown without table analysis") + assert(numRows.isEmpty, "numRows should not be shown without table analysis") + assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost") + } } } @@ -898,7 +905,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto test("change stats after insert command for hive table") { val table = s"change_stats_insert_hive_table" Seq(false, true).foreach { autoUpdate => - withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) { + withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString, + SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") { withTable(table) { sql(s"CREATE TABLE $table (i int, j string)") // analyze to get initial stats @@ -1105,7 +1113,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto Seq(true, false).foreach { isConverted => withSQLConf( HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted", - HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") { + HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted", + SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> 
"false") { withTable(format) { sql(s"CREATE TABLE $format (key STRING, value STRING) STORED AS $format") sql(s"INSERT INTO TABLE $format SELECT * FROM src") @@ -1219,32 +1228,35 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto private def testUpdatingTableStats(tableDescription: String, createTableCmd: String): Unit = { test("test table-level statistics for " + tableDescription) { - val parquetTable = "parquetTable" - withTable(parquetTable) { - sql(createTableCmd) - val catalogTable = getCatalogTable(parquetTable) - assert(DDLUtils.isDatasourceTable(catalogTable)) - - // Add a filter to avoid creating too many partitions - sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src WHERE key < 10") - checkTableStats(parquetTable, hasSizeInBytes = false, expectedRowCounts = None) - - // noscan won't count the number of rows - sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan") - val fetchedStats1 = - checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = None) - - sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src WHERE key < 10") - sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan") - val fetchedStats2 = - checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = None) - assert(fetchedStats2.get.sizeInBytes > fetchedStats1.get.sizeInBytes) - - // without noscan, we count the number of rows - sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS") - val fetchedStats3 = - checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = Some(20)) - assert(fetchedStats3.get.sizeInBytes == fetchedStats2.get.sizeInBytes) + withSQLConf(SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") { + val parquetTable = "parquetTable" + withTable(parquetTable) { + sql(createTableCmd) + val catalogTable = getCatalogTable(parquetTable) + assert(DDLUtils.isDatasourceTable(catalogTable)) + + // Add a filter to avoid creating too many partitions + sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src WHERE key < 10") + checkTableStats(parquetTable, hasSizeInBytes = false, + expectedRowCounts = None) + + // noscan won't count the number of rows + sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan") + val fetchedStats1 = + checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = None) + + sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src WHERE key < 10") + sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS noscan") + val fetchedStats2 = + checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = None) + assert(fetchedStats2.get.sizeInBytes > fetchedStats1.get.sizeInBytes) + + // without noscan, we count the number of rows + sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS") + val fetchedStats3 = + checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = Some(20)) + assert(fetchedStats3.get.sizeInBytes == fetchedStats2.get.sizeInBytes) + } } } } @@ -1434,20 +1446,22 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } test(s"CTAS should update statistics if ${SQLConf.AUTO_SIZE_UPDATE_ENABLED.key} is enabled") { - val tableName = "SPARK_23263" - Seq(false, true).foreach { isConverted => - Seq(false, true).foreach { updateEnabled => - withSQLConf( - SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> updateEnabled.toString, - HiveUtils.CONVERT_METASTORE_PARQUET.key -> isConverted.toString) { - withTable(tableName) { - sql(s"CREATE TABLE $tableName STORED AS parquet AS SELECT 'a', 'b'") - val catalogTable = 
getCatalogTable(tableName) - // Hive serde tables always update statistics by Hive metastore - if (!isConverted || updateEnabled) { - assert(catalogTable.stats.nonEmpty) - } else { - assert(catalogTable.stats.isEmpty) + withSQLConf(SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") { + val tableName = "SPARK_23263" + Seq(false, true).foreach { isConverted => + Seq(false, true).foreach { updateEnabled => + withSQLConf( + SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> updateEnabled.toString, + HiveUtils.CONVERT_METASTORE_PARQUET.key -> isConverted.toString) { + withTable(tableName) { + sql(s"CREATE TABLE $tableName STORED AS parquet AS SELECT 'a', 'b'") + val catalogTable = getCatalogTable(tableName) + // Hive serde tables always update statistics by Hive metastore + if (!isConverted || updateEnabled) { + assert(catalogTable.stats.nonEmpty) + } else { + assert(catalogTable.stats.isEmpty) + } } } } @@ -1518,45 +1532,47 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } test("SPARK-30269 failed to update partition stats if it's equal to table's old stats") { - val tbl = "SPARK_30269" - val ext_tbl = "SPARK_30269_external" - withTempDir { dir => - withTable(tbl, ext_tbl) { - sql(s"CREATE TABLE $tbl (key INT, value STRING, ds STRING)" + - "USING parquet PARTITIONED BY (ds)") - sql( - s""" - | CREATE TABLE $ext_tbl (key INT, value STRING, ds STRING) - | USING PARQUET - | PARTITIONED BY (ds) - | LOCATION '${dir.toURI}' + withSQLConf(SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> "false") { + val tbl = "SPARK_30269" + val ext_tbl = "SPARK_30269_external" + withTempDir { dir => + withTable(tbl, ext_tbl) { + sql(s"CREATE TABLE $tbl (key INT, value STRING, ds STRING)" + + "USING parquet PARTITIONED BY (ds)") + sql( + s""" + | CREATE TABLE $ext_tbl (key INT, value STRING, ds STRING) + | USING PARQUET + | PARTITIONED BY (ds) + | LOCATION '${dir.toURI}' """.stripMargin) - Seq(tbl, ext_tbl).foreach { tblName => - sql(s"INSERT INTO $tblName VALUES (1, 'a', '2019-12-13')") - - val expectedSize = 657 - // analyze table - sql(s"ANALYZE TABLE $tblName COMPUTE STATISTICS NOSCAN") - var tableStats = getTableStats(tblName) - assert(tableStats.sizeInBytes == expectedSize) - assert(tableStats.rowCount.isEmpty) - - sql(s"ANALYZE TABLE $tblName COMPUTE STATISTICS") - tableStats = getTableStats(tblName) - assert(tableStats.sizeInBytes == expectedSize) - assert(tableStats.rowCount.get == 1) - - // analyze a single partition - sql(s"ANALYZE TABLE $tblName PARTITION (ds='2019-12-13') COMPUTE STATISTICS NOSCAN") - var partStats = getPartitionStats(tblName, Map("ds" -> "2019-12-13")) - assert(partStats.sizeInBytes == expectedSize) - assert(partStats.rowCount.isEmpty) - - sql(s"ANALYZE TABLE $tblName PARTITION (ds='2019-12-13') COMPUTE STATISTICS") - partStats = getPartitionStats(tblName, Map("ds" -> "2019-12-13")) - assert(partStats.sizeInBytes == expectedSize) - assert(partStats.rowCount.get == 1) + Seq(tbl, ext_tbl).foreach { tblName => + sql(s"INSERT INTO $tblName VALUES (1, 'a', '2019-12-13')") + + val expectedSize = 657 + // analyze table + sql(s"ANALYZE TABLE $tblName COMPUTE STATISTICS NOSCAN") + var tableStats = getTableStats(tblName) + assert(tableStats.sizeInBytes == expectedSize) + assert(tableStats.rowCount.isEmpty) + + sql(s"ANALYZE TABLE $tblName COMPUTE STATISTICS") + tableStats = getTableStats(tblName) + assert(tableStats.sizeInBytes == expectedSize) + assert(tableStats.rowCount.get == 1) + + // analyze a single partition + sql(s"ANALYZE TABLE $tblName 
PARTITION (ds='2019-12-13') COMPUTE STATISTICS NOSCAN") + var partStats = getPartitionStats(tblName, Map("ds" -> "2019-12-13")) + assert(partStats.sizeInBytes == expectedSize) + assert(partStats.rowCount.isEmpty) + + sql(s"ANALYZE TABLE $tblName PARTITION (ds='2019-12-13') COMPUTE STATISTICS") + partStats = getPartitionStats(tblName, Map("ds" -> "2019-12-13")) + assert(partStats.sizeInBytes == expectedSize) + assert(partStats.rowCount.get == 1) + } } } } @@ -1647,4 +1663,35 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto ) } } + + test("update hive table stats based on metrics after insert command") { + Seq(false, true).foreach { autoUpdate => + withSQLConf( + SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "false", + SQLConf.AUTO_UPDATE_STATS_BASED_ON_METRICS_ENABLED.key -> autoUpdate.toString) { + val table1 = "hive_table1" + val table2 = "hive_table2" + withTable(table1, table2) { + // OptimizedCreateHiveTableAsSelectCommand + sql(s"CREATE TABLE $table1 STORED AS PARQUET SELECT 1 as i, 'abc' as j") + if (autoUpdate) { + checkTableStats(table1, true, Some(1)) + } else { + checkTableStats(table1, false, None) + } + + // InsertIntoHadoopFsRelationCommand + sql(s"INSERT INTO TABLE $table1 SELECT 1, 'abc'") + checkTableStats(table1, autoUpdate, None) + + // CreateTableCommand + sql(s"CREATE TABLE $table2 (id int, name string) STORED AS PARQUET") + checkTableStats(table2, false, None) + + sql(s"INSERT INTO TABLE $table2 SELECT 1, 'abc'") + checkTableStats(table2, false, None) + } + } + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala index 6f799bbe7d3f..dbb4b4761ce3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala @@ -737,7 +737,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) val totalSize = tableMeta.stats.map(_.sizeInBytes) // Except 0.12, all the following versions will fill the Hive-generated statistics if (version == "0.12") { - assert(totalSize.isEmpty) + assert(totalSize.nonEmpty) } else { assert(totalSize.nonEmpty && totalSize.get > 0) }
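For reference, the checkTableStats assertions used in the new tests amount to reading CatalogStatistics back from the session catalog. A rough equivalent, written as suite-internal test code (it relies on spark.sessionState, which is only accessible from code in the org.apache.spark.sql package, as these suites are; the table name hive_table1 matches the new StatisticsSuite test):

    import org.apache.spark.sql.catalyst.TableIdentifier
    // Read the table metadata and check the stats that the write command recorded.
    val meta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("hive_table1"))
    assert(meta.stats.exists(_.rowCount.contains(BigInt(1))))
    assert(meta.stats.exists(_.sizeInBytes > 0))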