diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 1e6e9a223e29..100ed3ee3687 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -19,6 +19,7 @@ package org.apache.spark.util import java.io._ import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, ThreadInfo} +import java.math.{MathContext, RoundingMode} import java.net._ import java.nio.ByteBuffer import java.nio.channels.Channels @@ -1109,26 +1110,39 @@ private[spark] object Utils extends Logging { /** * Convert a quantity in bytes to a human-readable string such as "4.0 MB". */ - def bytesToString(size: Long): String = { + def bytesToString(size: Long): String = bytesToString(BigInt(size)) + + def bytesToString(size: BigInt): String = { + val EB = 1L << 60 + val PB = 1L << 50 val TB = 1L << 40 val GB = 1L << 30 val MB = 1L << 20 val KB = 1L << 10 - val (value, unit) = { - if (size >= 2*TB) { - (size.asInstanceOf[Double] / TB, "TB") - } else if (size >= 2*GB) { - (size.asInstanceOf[Double] / GB, "GB") - } else if (size >= 2*MB) { - (size.asInstanceOf[Double] / MB, "MB") - } else if (size >= 2*KB) { - (size.asInstanceOf[Double] / KB, "KB") - } else { - (size.asInstanceOf[Double], "B") + if (size >= BigInt(1L << 11) * EB) { + // The number is too large, show it in scientific notation. + BigDecimal(size, new MathContext(3, RoundingMode.HALF_UP)).toString() + " B" + } else { + val (value, unit) = { + if (size >= 2 * EB) { + (BigDecimal(size) / EB, "EB") + } else if (size >= 2 * PB) { + (BigDecimal(size) / PB, "PB") + } else if (size >= 2 * TB) { + (BigDecimal(size) / TB, "TB") + } else if (size >= 2 * GB) { + (BigDecimal(size) / GB, "GB") + } else if (size >= 2 * MB) { + (BigDecimal(size) / MB, "MB") + } else if (size >= 2 * KB) { + (BigDecimal(size) / KB, "KB") + } else { + (BigDecimal(size), "B") + } } + "%.1f %s".formatLocal(Locale.US, value, unit) } - "%.1f %s".formatLocal(Locale.US, value, unit) } /** diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 43f77e68c153..71cb667d9dbe 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -200,7 +200,10 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(Utils.bytesToString(2097152) === "2.0 MB") assert(Utils.bytesToString(2306867) === "2.2 MB") assert(Utils.bytesToString(5368709120L) === "5.0 GB") - assert(Utils.bytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB") + assert(Utils.bytesToString(5L * (1L << 40)) === "5.0 TB") + assert(Utils.bytesToString(5L * (1L << 50)) === "5.0 PB") + assert(Utils.bytesToString(5L * (1L << 60)) === "5.0 EB") + assert(Utils.bytesToString(BigInt(1L << 11) * (1L << 60)) === "2.36E+21 B") } test("copyStream") { diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index d8cd68e2d9e9..59f93b3c469d 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -123,7 +123,8 @@ statement | CREATE TEMPORARY? FUNCTION qualifiedName AS className=STRING (USING resource (',' resource)*)? #createFunction | DROP TEMPORARY? FUNCTION (IF EXISTS)? 
qualifiedName #dropFunction - | EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN)? statement #explain + | EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)? + statement #explain | SHOW TABLES ((FROM | IN) db=identifier)? (LIKE? pattern=STRING)? #showTables | SHOW TABLE EXTENDED ((FROM | IN) db=identifier)? @@ -693,7 +694,7 @@ nonReserved | DELIMITED | FIELDS | TERMINATED | COLLECTION | ITEMS | KEYS | ESCAPED | LINES | SEPARATED | EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | GLOBAL | TEMPORARY | OPTIONS | GROUPING | CUBE | ROLLUP - | EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN + | EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN | COST | TABLESAMPLE | USE | TO | BUCKET | PERCENTLIT | OUT | OF | SET | RESET | VIEW | REPLACE @@ -794,6 +795,7 @@ EXPLAIN: 'EXPLAIN'; FORMAT: 'FORMAT'; LOGICAL: 'LOGICAL'; CODEGEN: 'CODEGEN'; +COST: 'COST'; CAST: 'CAST'; SHOW: 'SHOW'; TABLES: 'TABLES'; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 0937825e273a..e22b429aec68 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -115,6 +115,10 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { Statistics(sizeInBytes = children.map(_.stats(conf).sizeInBytes).product) } + override def verboseStringWithSuffix: String = { + super.verboseString + statsCache.map(", " + _.toString).getOrElse("") + } + /** * Returns the maximum number of rows that this plan may compute. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala index 91404d4bb81b..f24b240956a6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.plans.logical +import java.math.{MathContext, RoundingMode} + import scala.util.control.NonFatal import org.apache.spark.internal.Logging @@ -24,6 +26,7 @@ import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils /** @@ -54,8 +57,13 @@ case class Statistics( /** Readable string representation for the Statistics. */ def simpleString: String = { - Seq(s"sizeInBytes=$sizeInBytes", - if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "", + Seq(s"sizeInBytes=${Utils.bytesToString(sizeInBytes)}", + if (rowCount.isDefined) { + // Show row count in scientific notation. 
+ s"rowCount=${BigDecimal(rowCount.get, new MathContext(3, RoundingMode.HALF_UP)).toString()}" + } else { + "" + }, s"isBroadcastable=$isBroadcastable" ).filter(_.nonEmpty).mkString(", ") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index f37661c31584..cc4c0835954b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -453,13 +453,16 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { /** ONE line description of this node with more information */ def verboseString: String + /** ONE line description of this node with some suffix information */ + def verboseStringWithSuffix: String = verboseString + override def toString: String = treeString /** Returns a string representation of the nodes in this tree */ def treeString: String = treeString(verbose = true) - def treeString(verbose: Boolean): String = { - generateTreeString(0, Nil, new StringBuilder, verbose).toString + def treeString(verbose: Boolean, addSuffix: Boolean = false): String = { + generateTreeString(0, Nil, new StringBuilder, verbose = verbose, addSuffix = addSuffix).toString } /** @@ -524,7 +527,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { lastChildren: Seq[Boolean], builder: StringBuilder, verbose: Boolean, - prefix: String = ""): StringBuilder = { + prefix: String = "", + addSuffix: Boolean = false): StringBuilder = { if (depth > 0) { lastChildren.init.foreach { isLast => @@ -533,22 +537,29 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { builder.append(if (lastChildren.last) "+- " else ":- ") } + val str = if (verbose) { + if (addSuffix) verboseStringWithSuffix else verboseString + } else { + simpleString + } builder.append(prefix) - builder.append(if (verbose) verboseString else simpleString) + builder.append(str) builder.append("\n") if (innerChildren.nonEmpty) { innerChildren.init.foreach(_.generateTreeString( - depth + 2, lastChildren :+ children.isEmpty :+ false, builder, verbose)) + depth + 2, lastChildren :+ children.isEmpty :+ false, builder, verbose, + addSuffix = addSuffix)) innerChildren.last.generateTreeString( - depth + 2, lastChildren :+ children.isEmpty :+ true, builder, verbose) + depth + 2, lastChildren :+ children.isEmpty :+ true, builder, verbose, + addSuffix = addSuffix) } if (children.nonEmpty) { children.init.foreach(_.generateTreeString( - depth + 1, lastChildren :+ false, builder, verbose, prefix)) + depth + 1, lastChildren :+ false, builder, verbose, prefix, addSuffix)) children.last.generateTreeString( - depth + 1, lastChildren :+ true, builder, verbose, prefix) + depth + 1, lastChildren :+ true, builder, verbose, prefix, addSuffix) } builder diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/TableIdentifierParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/TableIdentifierParserSuite.scala index 7d46011b410e..170c469197e7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/TableIdentifierParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/TableIdentifierParserSuite.scala @@ -25,8 +25,8 @@ class TableIdentifierParserSuite extends SparkFunSuite { // Add "$elem$", "$value$" & "$key$" val hiveNonReservedKeyword = Array("add", "admin", "after", 
"analyze", "archive", "asc", "before", "bucket", "buckets", "cascade", "change", "cluster", "clustered", "clusterstatus", "collection", - "columns", "comment", "compact", "compactions", "compute", "concatenate", "continue", "data", - "day", "databases", "datetime", "dbproperties", "deferred", "defined", "delimited", + "columns", "comment", "compact", "compactions", "compute", "concatenate", "continue", "cost", + "data", "day", "databases", "datetime", "dbproperties", "deferred", "defined", "delimited", "dependency", "desc", "directories", "directory", "disable", "distribute", "enable", "escaped", "exclusive", "explain", "export", "fields", "file", "fileformat", "first", "format", "formatted", "functions", "hold_ddltime", "hour", "idxproperties", "ignore", "index", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 9d046c0766aa..137f7ba04d57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -197,7 +197,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { """.stripMargin.trim } - override def toString: String = { + override def toString: String = completeString(appendStats = false) + + def toStringWithStats: String = completeString(appendStats = true) + + private def completeString(appendStats: Boolean): String = { def output = Utils.truncatedString( analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ") val analyzedPlan = Seq( @@ -205,12 +209,20 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { stringOrError(analyzed.treeString(verbose = true)) ).filter(_.nonEmpty).mkString("\n") + val optimizedPlanString = if (appendStats) { + // trigger to compute stats for logical plans + optimizedPlan.stats(sparkSession.sessionState.conf) + optimizedPlan.treeString(verbose = true, addSuffix = true) + } else { + optimizedPlan.treeString(verbose = true) + } + s"""== Parsed Logical Plan == |${stringOrError(logical.treeString(verbose = true))} |== Analyzed Logical Plan == |$analyzedPlan |== Optimized Logical Plan == - |${stringOrError(optimizedPlan.treeString(verbose = true))} + |${stringOrError(optimizedPlanString)} |== Physical Plan == |${stringOrError(executedPlan.treeString(verbose = true))} """.stripMargin.trim diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index d50800235264..96b24a89cc73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -282,7 +282,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { if (statement == null) { null // This is enough since ParseException will raise later. 
} else if (isExplainableStatement(statement)) { - ExplainCommand(statement, extended = ctx.EXTENDED != null, codegen = ctx.CODEGEN != null) + ExplainCommand( + logicalPlan = statement, + extended = ctx.EXTENDED != null, + codegen = ctx.CODEGEN != null, + cost = ctx.COST != null) } else { ExplainCommand(OneRowRelation) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 2ead8f6baae6..c58474eba05d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -254,7 +254,8 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp lastChildren: Seq[Boolean], builder: StringBuilder, verbose: Boolean, - prefix: String = ""): StringBuilder = { + prefix: String = "", + addSuffix: Boolean = false): StringBuilder = { child.generateTreeString(depth, lastChildren, builder, verbose, "") } } @@ -428,7 +429,8 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co lastChildren: Seq[Boolean], builder: StringBuilder, verbose: Boolean, - prefix: String = ""): StringBuilder = { + prefix: String = "", + addSuffix: Boolean = false): StringBuilder = { child.generateTreeString(depth, lastChildren, builder, verbose, "*") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 58f507119325..5de45b159684 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -88,11 +88,13 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan { * @param logicalPlan plan to explain * @param extended whether to do extended explain or not * @param codegen whether to output generated code from whole-stage codegen or not + * @param cost whether to show cost information for operators. 
*/
case class ExplainCommand(
logicalPlan: LogicalPlan,
extended: Boolean = false,
- codegen: Boolean = false)
+ codegen: Boolean = false,
+ cost: Boolean = false)
extends RunnableCommand {

override val output: Seq[Attribute] =
@@ -113,6 +115,8 @@ case class ExplainCommand(
codegenString(queryExecution.executedPlan)
} else if (extended) {
queryExecution.toString
+ } else if (cost) {
+ queryExecution.toStringWithStats
} else {
queryExecution.simpleString
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index bd1ce8aa3eb1..b38bbd8e7eef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -170,6 +170,27 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}
checkColStats(df, mutable.LinkedHashMap(expectedColStats: _*))
}
+
+ test("number format in statistics") {
+ val numbers = Seq(
+ BigInt(0) -> ("0.0 B", "0"),
+ BigInt(100) -> ("100.0 B", "100"),
+ BigInt(2047) -> ("2047.0 B", "2.05E+3"),
+ BigInt(2048) -> ("2.0 KB", "2.05E+3"),
+ BigInt(3333333) -> ("3.2 MB", "3.33E+6"),
+ BigInt(4444444444L) -> ("4.1 GB", "4.44E+9"),
+ BigInt(5555555555555L) -> ("5.1 TB", "5.56E+12"),
+ BigInt(6666666666666666L) -> ("5.9 PB", "6.67E+15"),
+ BigInt(1L << 10) * (1L << 60) -> ("1024.0 EB", "1.18E+21"),
+ BigInt(1L << 11) * (1L << 60) -> ("2.36E+21 B", "2.36E+21")
+ )
+ numbers.foreach { case (input, (expectedSize, expectedRows)) =>
+ val stats = Statistics(sizeInBytes = input, rowCount = Some(input))
+ val expectedString = s"sizeInBytes=$expectedSize, rowCount=$expectedRows," +
+ s" isBroadcastable=${stats.isBroadcastable}"
+ assert(stats.simpleString == expectedString)
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index f9751e3d5f2e..cfca1d79836b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -27,6 +27,19 @@ import org.apache.spark.sql.test.SQLTestUtils
*/
class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

+ test("show cost in explain command") {
+ // Before the ANALYZE command, only sizeInBytes is available
+ checkKeywordsExist(sql("EXPLAIN COST SELECT * FROM src "), "sizeInBytes")
+ checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM src "), "rowCount")
+
+ // After the ANALYZE command, both sizeInBytes and rowCount are shown
+ sql("ANALYZE TABLE src COMPUTE STATISTICS")
+ checkKeywordsExist(sql("EXPLAIN COST SELECT * FROM src "), "sizeInBytes", "rowCount")
+
+ // A plain EXPLAIN shows no cost information
+ checkKeywordsNotExist(sql("EXPLAIN SELECT * FROM src "), "sizeInBytes", "rowCount")
+ }
+
test("explain extended command") {
checkKeywordsExist(sql(" explain select * from src where key=123 "), "== Physical Plan ==")
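
For readers who want to experiment with the formatting logic introduced in Utils.bytesToString without building Spark, the following self-contained sketch mirrors the patch. The wrapper object, main method, and sample inputs are illustrative only; the thresholds and rounding behaviour are taken directly from the diff above.

import java.math.{MathContext, RoundingMode}
import java.util.Locale

object BytesToStringSketch {
  def bytesToString(size: BigInt): String = {
    val EB = 1L << 60
    val PB = 1L << 50
    val TB = 1L << 40
    val GB = 1L << 30
    val MB = 1L << 20
    val KB = 1L << 10
    if (size >= BigInt(1L << 11) * EB) {
      // Past 2^71 bytes even the EB unit would print an unwieldy number,
      // so fall back to scientific notation with three significant digits.
      BigDecimal(size, new MathContext(3, RoundingMode.HALF_UP)).toString() + " B"
    } else {
      // A unit is only used once the value reaches twice its size, so
      // 2047 bytes prints as "2047.0 B" while 2048 bytes prints as "2.0 KB".
      val (value, unit) =
        if (size >= 2 * EB) (BigDecimal(size) / EB, "EB")
        else if (size >= 2 * PB) (BigDecimal(size) / PB, "PB")
        else if (size >= 2 * TB) (BigDecimal(size) / TB, "TB")
        else if (size >= 2 * GB) (BigDecimal(size) / GB, "GB")
        else if (size >= 2 * MB) (BigDecimal(size) / MB, "MB")
        else if (size >= 2 * KB) (BigDecimal(size) / KB, "KB")
        else (BigDecimal(size), "B")
      "%.1f %s".formatLocal(Locale.US, value, unit)
    }
  }

  def main(args: Array[String]): Unit = {
    println(bytesToString(BigInt(3333333)))               // 3.2 MB
    println(bytesToString(BigInt(1L << 10) * (1L << 60))) // 1024.0 EB
    println(bytesToString(BigInt(1L << 11) * (1L << 60))) // 2.36E+21 B
  }
}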
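
The scientific notation used for rowCount in Statistics.simpleString (and for the oversized-byte fallback above) is plain java.math behaviour: a BigDecimal is constructed under a MathContext of three significant digits and then rendered with BigDecimal.toString, which switches to exponent form once rounding leaves a negative scale. A minimal sketch of just that conversion; the object and method names are invented for the example:

import java.math.{MathContext, RoundingMode}

object RowCountFormatSketch {
  // Three significant digits, rounding half-up, as in Statistics.simpleString.
  private val mc = new MathContext(3, RoundingMode.HALF_UP)

  def format(rowCount: BigInt): String = BigDecimal(rowCount, mc).toString()

  def main(args: Array[String]): Unit = {
    println(format(BigInt(100)))                // 100  (small counts print plainly)
    println(format(BigInt(2047)))               // 2.05E+3
    println(format(BigInt(6666666666666666L)))  // 6.67E+15
  }
}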
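
End to end, the new COST token flows from SqlBase.g4 through SparkSqlAstBuilder into ExplainCommand(cost = true), which routes to QueryExecution.toStringWithStats and so prints each optimized-plan node with its Statistics suffix. A usage sketch against a local session; the session setup and the demo_table view are illustrative, not part of the patch:

import org.apache.spark.sql.SparkSession

object ExplainCostDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("explain-cost-demo")
      .getOrCreate()
    // A throwaway view so EXPLAIN has something to resolve.
    spark.range(1000).createOrReplaceTempView("demo_table")
    // The optimized logical plan section now carries a suffix such as
    // "sizeInBytes=..."; "rowCount=..." appears once statistics have been
    // collected, e.g. by running ANALYZE TABLE ... COMPUTE STATISTICS
    // against a real table.
    spark.sql("EXPLAIN COST SELECT * FROM demo_table").show(truncate = false)
    spark.stop()
  }
}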