Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.util

import java.io._
import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, ThreadInfo}
import java.math.{MathContext, RoundingMode}
import java.net._
import java.nio.ByteBuffer
import java.nio.channels.Channels
Expand Down Expand Up @@ -1109,26 +1110,39 @@ private[spark] object Utils extends Logging {
/**
* Convert a quantity in bytes to a human-readable string such as "4.0 MB".
*/
def bytesToString(size: Long): String = {
def bytesToString(size: Long): String = bytesToString(BigInt(size))

def bytesToString(size: BigInt): String = {
val EB = 1L << 60
val PB = 1L << 50
val TB = 1L << 40
val GB = 1L << 30
val MB = 1L << 20
val KB = 1L << 10

val (value, unit) = {
if (size >= 2*TB) {
(size.asInstanceOf[Double] / TB, "TB")
} else if (size >= 2*GB) {
(size.asInstanceOf[Double] / GB, "GB")
} else if (size >= 2*MB) {
(size.asInstanceOf[Double] / MB, "MB")
} else if (size >= 2*KB) {
(size.asInstanceOf[Double] / KB, "KB")
} else {
(size.asInstanceOf[Double], "B")
if (size >= BigInt(1L << 11) * EB) {
// The number is too large, show it in scientific notation.
BigDecimal(size, new MathContext(3, RoundingMode.HALF_UP)).toString() + " B"
} else {
val (value, unit) = {
if (size >= 2 * EB) {
(BigDecimal(size) / EB, "EB")
} else if (size >= 2 * PB) {
(BigDecimal(size) / PB, "PB")
} else if (size >= 2 * TB) {
(BigDecimal(size) / TB, "TB")
} else if (size >= 2 * GB) {
(BigDecimal(size) / GB, "GB")
} else if (size >= 2 * MB) {
(BigDecimal(size) / MB, "MB")
} else if (size >= 2 * KB) {
(BigDecimal(size) / KB, "KB")
} else {
(BigDecimal(size), "B")
}
}
"%.1f %s".formatLocal(Locale.US, value, unit)
}
"%.1f %s".formatLocal(Locale.US, value, unit)
}

/**
Expand Down
5 changes: 4 additions & 1 deletion core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(Utils.bytesToString(2097152) === "2.0 MB")
assert(Utils.bytesToString(2306867) === "2.2 MB")
assert(Utils.bytesToString(5368709120L) === "5.0 GB")
assert(Utils.bytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB")
assert(Utils.bytesToString(5L * (1L << 40)) === "5.0 TB")
assert(Utils.bytesToString(5L * (1L << 50)) === "5.0 PB")
assert(Utils.bytesToString(5L * (1L << 60)) === "5.0 EB")
assert(Utils.bytesToString(BigInt(1L << 11) * (1L << 60)) === "2.36E+21 B")
}

test("copyStream") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ statement
| CREATE TEMPORARY? FUNCTION qualifiedName AS className=STRING
(USING resource (',' resource)*)? #createFunction
| DROP TEMPORARY? FUNCTION (IF EXISTS)? qualifiedName #dropFunction
| EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN)? statement #explain
| EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)?
statement #explain
| SHOW TABLES ((FROM | IN) db=identifier)?
(LIKE? pattern=STRING)? #showTables
| SHOW TABLE EXTENDED ((FROM | IN) db=identifier)?
Expand Down Expand Up @@ -693,7 +694,7 @@ nonReserved
| DELIMITED | FIELDS | TERMINATED | COLLECTION | ITEMS | KEYS | ESCAPED | LINES | SEPARATED
| EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | GLOBAL | TEMPORARY | OPTIONS
| GROUPING | CUBE | ROLLUP
| EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN
| EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN | COST
| TABLESAMPLE | USE | TO | BUCKET | PERCENTLIT | OUT | OF
| SET | RESET
| VIEW | REPLACE
Expand Down Expand Up @@ -794,6 +795,7 @@ EXPLAIN: 'EXPLAIN';
FORMAT: 'FORMAT';
LOGICAL: 'LOGICAL';
CODEGEN: 'CODEGEN';
COST: 'COST';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also put it in nonReserved.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Also please update the hiveNonReservedKeyword in TableIdentifierParserSuite

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Updated.

CAST: 'CAST';
SHOW: 'SHOW';
TABLES: 'TABLES';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
Statistics(sizeInBytes = children.map(_.stats(conf).sizeInBytes).product)
}

/**
 * The verbose one-line description of this plan, followed by ", <stats>" when
 * `statsCache` currently holds a value; otherwise nothing is appended.
 */
override def verboseStringWithSuffix: String = {
  val description = super.verboseString
  val statsSuffix = statsCache.map(stats => ", " + stats.toString).getOrElse("")
  description + statsSuffix
}

/**
* Returns the maximum number of rows that this plan may compute.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.spark.sql.catalyst.plans.logical

import java.math.{MathContext, RoundingMode}

import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils


/**
Expand Down Expand Up @@ -54,8 +57,13 @@ case class Statistics(

/** Readable string representation for the Statistics. */
def simpleString: String = {
Seq(s"sizeInBytes=$sizeInBytes",
if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
Seq(s"sizeInBytes=${Utils.bytesToString(sizeInBytes)}",
if (rowCount.isDefined) {
// Show row count in scientific notation.
s"rowCount=${BigDecimal(rowCount.get, new MathContext(3, RoundingMode.HALF_UP)).toString()}"
} else {
""
},
s"isBroadcastable=$isBroadcastable"
).filter(_.nonEmpty).mkString(", ")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,13 +453,16 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
/** ONE line description of this node with more information */
def verboseString: String

/**
 * ONE line description of this node with some suffix information.
 * The default implementation adds no suffix and simply returns `verboseString`.
 */
def verboseStringWithSuffix: String = {
  verboseString
}

override def toString: String = treeString

/** Returns a string representation of the nodes in this tree */
def treeString: String = treeString(verbose = true)

def treeString(verbose: Boolean): String = {
generateTreeString(0, Nil, new StringBuilder, verbose).toString
def treeString(verbose: Boolean, addSuffix: Boolean = false): String = {
generateTreeString(0, Nil, new StringBuilder, verbose = verbose, addSuffix = addSuffix).toString
}

/**
Expand Down Expand Up @@ -524,7 +527,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
lastChildren: Seq[Boolean],
builder: StringBuilder,
verbose: Boolean,
prefix: String = ""): StringBuilder = {
prefix: String = "",
addSuffix: Boolean = false): StringBuilder = {

if (depth > 0) {
lastChildren.init.foreach { isLast =>
Expand All @@ -533,22 +537,29 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
builder.append(if (lastChildren.last) "+- " else ":- ")
}

val str = if (verbose) {
if (addSuffix) verboseStringWithSuffix else verboseString
} else {
simpleString
}
builder.append(prefix)
builder.append(if (verbose) verboseString else simpleString)
builder.append(str)
builder.append("\n")

if (innerChildren.nonEmpty) {
innerChildren.init.foreach(_.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ false, builder, verbose))
depth + 2, lastChildren :+ children.isEmpty :+ false, builder, verbose,
addSuffix = addSuffix))
innerChildren.last.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ true, builder, verbose)
depth + 2, lastChildren :+ children.isEmpty :+ true, builder, verbose,
addSuffix = addSuffix)
}

if (children.nonEmpty) {
children.init.foreach(_.generateTreeString(
depth + 1, lastChildren :+ false, builder, verbose, prefix))
depth + 1, lastChildren :+ false, builder, verbose, prefix, addSuffix))
children.last.generateTreeString(
depth + 1, lastChildren :+ true, builder, verbose, prefix)
depth + 1, lastChildren :+ true, builder, verbose, prefix, addSuffix)
}

builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ class TableIdentifierParserSuite extends SparkFunSuite {
// Add "$elem$", "$value$" & "$key$"
val hiveNonReservedKeyword = Array("add", "admin", "after", "analyze", "archive", "asc", "before",
"bucket", "buckets", "cascade", "change", "cluster", "clustered", "clusterstatus", "collection",
"columns", "comment", "compact", "compactions", "compute", "concatenate", "continue", "data",
"day", "databases", "datetime", "dbproperties", "deferred", "defined", "delimited",
"columns", "comment", "compact", "compactions", "compute", "concatenate", "continue", "cost",
"data", "day", "databases", "datetime", "dbproperties", "deferred", "defined", "delimited",
"dependency", "desc", "directories", "directory", "disable", "distribute",
"enable", "escaped", "exclusive", "explain", "export", "fields", "file", "fileformat", "first",
"format", "formatted", "functions", "hold_ddltime", "hour", "idxproperties", "ignore", "index",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,20 +197,32 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
""".stripMargin.trim
}

override def toString: String = {
override def toString: String = completeString(appendStats = false)

def toStringWithStats: String = completeString(appendStats = true)

private def completeString(appendStats: Boolean): String = {
def output = Utils.truncatedString(
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")
val analyzedPlan = Seq(
stringOrError(output),
stringOrError(analyzed.treeString(verbose = true))
).filter(_.nonEmpty).mkString("\n")

val optimizedPlanString = if (appendStats) {
// trigger to compute stats for logical plans
optimizedPlan.stats(sparkSession.sessionState.conf)
optimizedPlan.treeString(verbose = true, addSuffix = true)
} else {
optimizedPlan.treeString(verbose = true)
}

s"""== Parsed Logical Plan ==
|${stringOrError(logical.treeString(verbose = true))}
|== Analyzed Logical Plan ==
|$analyzedPlan
|== Optimized Logical Plan ==
|${stringOrError(optimizedPlan.treeString(verbose = true))}
|${stringOrError(optimizedPlanString)}
|== Physical Plan ==
|${stringOrError(executedPlan.treeString(verbose = true))}
""".stripMargin.trim
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
if (statement == null) {
null // This is enough since ParseException will raise later.
} else if (isExplainableStatement(statement)) {
ExplainCommand(statement, extended = ctx.EXTENDED != null, codegen = ctx.CODEGEN != null)
ExplainCommand(
logicalPlan = statement,
extended = ctx.EXTENDED != null,
codegen = ctx.CODEGEN != null,
cost = ctx.COST != null)
} else {
ExplainCommand(OneRowRelation)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp
lastChildren: Seq[Boolean],
builder: StringBuilder,
verbose: Boolean,
prefix: String = ""): StringBuilder = {
prefix: String = "",
addSuffix: Boolean = false): StringBuilder = {
child.generateTreeString(depth, lastChildren, builder, verbose, "")
}
}
Expand Down Expand Up @@ -428,7 +429,8 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
lastChildren: Seq[Boolean],
builder: StringBuilder,
verbose: Boolean,
prefix: String = ""): StringBuilder = {
prefix: String = "",
addSuffix: Boolean = false): StringBuilder = {
child.generateTreeString(depth, lastChildren, builder, verbose, "*")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,13 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
* @param logicalPlan plan to explain
* @param extended whether to do extended explain or not
* @param codegen whether to output generated code from whole-stage codegen or not
* @param cost whether to show cost information for operators.
*/
case class ExplainCommand(
logicalPlan: LogicalPlan,
extended: Boolean = false,
codegen: Boolean = false)
codegen: Boolean = false,
cost: Boolean = false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add @param like the other parameters.

extends RunnableCommand {

override val output: Seq[Attribute] =
Expand All @@ -113,6 +115,8 @@ case class ExplainCommand(
codegenString(queryExecution.executedPlan)
} else if (extended) {
queryExecution.toString
} else if (cost) {
queryExecution.toStringWithStats
} else {
queryExecution.simpleString
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,27 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}
checkColStats(df, mutable.LinkedHashMap(expectedColStats: _*))
}

test("number format in statistics") {
  // Each case maps an input value to the pair of strings expected when that value is
  // rendered as sizeInBytes and as rowCount respectively.
  val cases: Seq[(BigInt, (String, String))] = Seq(
    BigInt(0) -> ("0.0 B", "0"),
    BigInt(100) -> ("100.0 B", "100"),
    BigInt(2047) -> ("2047.0 B", "2.05E+3"),
    BigInt(2048) -> ("2.0 KB", "2.05E+3"),
    BigInt(3333333) -> ("3.2 MB", "3.33E+6"),
    BigInt(4444444444L) -> ("4.1 GB", "4.44E+9"),
    BigInt(5555555555555L) -> ("5.1 TB", "5.56E+12"),
    BigInt(6666666666666666L) -> ("5.9 PB", "6.67E+15"),
    BigInt(1L << 10) * (1L << 60) -> ("1024.0 EB", "1.18E+21"),
    BigInt(1L << 11) * (1L << 60) -> ("2.36E+21 B", "2.36E+21")
  )

  for ((input, (expectedSize, expectedRows)) <- cases) {
    // The same number is used for both the size and the row count.
    val stats = Statistics(sizeInBytes = input, rowCount = Some(input))
    val expectedString =
      s"sizeInBytes=$expectedSize, rowCount=$expectedRows, " +
        s"isBroadcastable=${stats.isBroadcastable}"
    assert(stats.simpleString == expectedString)
  }
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ import org.apache.spark.sql.test.SQLTestUtils
*/
class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

test("show cost in explain command") {
  val explainCostQuery = "EXPLAIN COST SELECT * FROM src "
  val plainExplainQuery = "EXPLAIN SELECT * FROM src "

  // Before ANALYZE runs, only sizeInBytes is reported; rowCount is absent.
  checkKeywordsExist(sql(explainCostQuery), "sizeInBytes")
  checkKeywordsNotExist(sql(explainCostQuery), "rowCount")

  // After ANALYZE runs, both sizeInBytes and rowCount are reported.
  sql("ANALYZE TABLE src COMPUTE STATISTICS")
  checkKeywordsExist(sql(explainCostQuery), "sizeInBytes", "rowCount")

  // A plain EXPLAIN carries no cost information at all.
  checkKeywordsNotExist(sql(plainExplainQuery), "sizeInBytes", "rowCount")
}

test("explain extended command") {
checkKeywordsExist(sql(" explain select * from src where key=123 "),
"== Physical Plan ==")
Expand Down