Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ statement
(COMMENT comment=STRING) |
(TBLPROPERTIES tableProps=tablePropertyList))*
(AS? query)? #replaceTable
| ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
| ANALYZE TABLE multipartIdentifier partitionSpec? COMPUTE STATISTICS
(identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)? #analyze
| ALTER TABLE multipartIdentifier
ADD (COLUMN | COLUMNS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2660,4 +2660,52 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
isExtended)
}
}

/**
* Create an [[AnalyzeTableStatement]], or an [[AnalyzeColumnStatement]].
* Example SQL for analyzing a table or a set of partitions:
* {{{
* ANALYZE TABLE multi_part_name [PARTITION (partcol1[=val1], partcol2[=val2], ...)]
* COMPUTE STATISTICS [NOSCAN];
* }}}
*
* Example SQL for analyzing columns:
* {{{
* ANALYZE TABLE multi_part_name COMPUTE STATISTICS FOR COLUMNS column1, column2;
* }}}
*
* Example SQL for analyzing all columns of a table:
* {{{
* ANALYZE TABLE multi_part_name COMPUTE STATISTICS FOR ALL COLUMNS;
* }}}
*/
override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
  // A partition spec has no effect on column statistics; warn rather than fail.
  def warnIgnoredPartitionSpec(): Unit = {
    if (ctx.partitionSpec != null) {
      logWarning("Partition specification is ignored when collecting column statistics: " +
        ctx.partitionSpec.getText)
    }
  }

  // The only identifier accepted after COMPUTE STATISTICS is NOSCAN (any casing).
  val noScanRequested = ctx.identifier != null
  if (noScanRequested && !ctx.identifier.getText.equalsIgnoreCase("noscan")) {
    throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
  }

  val tableName = visitMultipartIdentifier(ctx.multipartIdentifier())
  if (ctx.ALL() != null) {
    // ANALYZE TABLE ... COMPUTE STATISTICS FOR ALL COLUMNS
    warnIgnoredPartitionSpec()
    AnalyzeColumnStatement(tableName, None, allColumns = true)
  } else if (ctx.identifierSeq() != null) {
    // ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS c1, c2, ...
    warnIgnoredPartitionSpec()
    AnalyzeColumnStatement(
      tableName, Option(visitIdentifierSeq(ctx.identifierSeq())), allColumns = false)
  } else {
    // Table-level statistics, optionally restricted to the given partitions.
    val partitionSpec = Option(ctx.partitionSpec)
      .map(visitPartitionSpec)
      .getOrElse(Map.empty[String, Option[String]])
    AnalyzeTableStatement(tableName, partitionSpec, noScan = noScanRequested)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,22 @@ case class ShowNamespacesStatement(namespace: Option[Seq[String]], pattern: Opti
* A USE statement, as parsed from SQL.
*/
case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends ParsedStatement

/**
 * An ANALYZE TABLE ... COMPUTE STATISTICS [NOSCAN] statement, as parsed from SQL.
 *
 * @param tableName multi-part table identifier (catalog/namespace/table name parts).
 * @param partitionSpec partition column name -> optional value; empty when the statement
 *                      has no PARTITION clause.
 * @param noScan true when `NOSCAN` was specified in the statement.
 */
case class AnalyzeTableStatement(
tableName: Seq[String],
partitionSpec: Map[String, Option[String]],
noScan: Boolean) extends ParsedStatement

/**
 * An ANALYZE TABLE ... COMPUTE STATISTICS FOR [ALL] COLUMNS statement, as parsed from SQL.
 *
 * @param tableName multi-part table identifier (catalog/namespace/table name parts).
 * @param columnNames the explicit columns to analyze; `None` when `allColumns` is set.
 * @param allColumns true for the `FOR ALL COLUMNS` variant.
 */
case class AnalyzeColumnStatement(
    tableName: Seq[String],
    columnNames: Option[Seq[String]],
    allColumns: Boolean) extends ParsedStatement {
  // XOR: exactly one of the two ways of selecting columns must be used.
  require(columnNames.isDefined ^ allColumns, "Parameters `columnNames` and `allColumns` are " +
    "mutually exclusive. Only one of them should be specified.")
}
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,82 @@ class DDLParserSuite extends AnalysisTest {
ShowNamespacesStatement(Some(Seq("testcat", "ns1")), Some("*pattern*")))
}

test("analyze table statistics") {
  // Shorthand: parse `sql` and expect an AnalyzeTableStatement on table a.b.c.
  def check(sql: String, spec: Map[String, Option[String]], noScan: Boolean): Unit =
    comparePlans(parsePlan(sql), AnalyzeTableStatement(Seq("a", "b", "c"), spec, noScan))

  // No partition clause.
  check("analyze table a.b.c compute statistics", Map.empty, noScan = false)
  check("analyze table a.b.c compute statistics noscan", Map.empty, noScan = true)
  check("analyze table a.b.c partition (a) compute statistics nOscAn",
    Map("a" -> None), noScan = true)

  // Partitions specified
  check("ANALYZE TABLE a.b.c PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS",
    Map("ds" -> Some("2008-04-09"), "hr" -> Some("11")), noScan = false)
  check("ANALYZE TABLE a.b.c PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan",
    Map("ds" -> Some("2008-04-09"), "hr" -> Some("11")), noScan = true)
  check("ANALYZE TABLE a.b.c PARTITION(ds='2008-04-09') COMPUTE STATISTICS noscan",
    Map("ds" -> Some("2008-04-09")), noScan = true)
  check("ANALYZE TABLE a.b.c PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS",
    Map("ds" -> Some("2008-04-09"), "hr" -> None), noScan = false)
  check("ANALYZE TABLE a.b.c PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS noscan",
    Map("ds" -> Some("2008-04-09"), "hr" -> None), noScan = true)
  check("ANALYZE TABLE a.b.c PARTITION(ds, hr=11) COMPUTE STATISTICS noscan",
    Map("ds" -> None, "hr" -> Some("11")), noScan = true)
  check("ANALYZE TABLE a.b.c PARTITION(ds, hr) COMPUTE STATISTICS",
    Map("ds" -> None, "hr" -> None), noScan = false)
  check("ANALYZE TABLE a.b.c PARTITION(ds, hr) COMPUTE STATISTICS noscan",
    Map("ds" -> None, "hr" -> None), noScan = true)

  // Anything other than NOSCAN after COMPUTE STATISTICS is rejected.
  intercept("analyze table a.b.c compute statistics xxxx",
    "Expected `NOSCAN` instead of `xxxx`")
  intercept("analyze table a.b.c partition (a) compute statistics xxxx",
    "Expected `NOSCAN` instead of `xxxx`")
}

test("analyze table column statistics") {
  // A column list is mandatory for the FOR COLUMNS form.
  intercept("ANALYZE TABLE a.b.c COMPUTE STATISTICS FOR COLUMNS", "")

  // Shorthand: parse `sql` and expect an AnalyzeColumnStatement on table a.b.c.
  def checkColumns(sql: String, columns: Option[Seq[String]], all: Boolean): Unit =
    comparePlans(parsePlan(sql), AnalyzeColumnStatement(Seq("a", "b", "c"), columns, all))

  checkColumns("ANALYZE TABLE a.b.c COMPUTE STATISTICS FOR COLUMNS key, value",
    Some(Seq("key", "value")), all = false)

  // Partition specified - should be ignored
  checkColumns(
    s"""
       |ANALYZE TABLE a.b.c PARTITION(ds='2017-06-10')
       |COMPUTE STATISTICS FOR COLUMNS key, value
     """.stripMargin,
    Some(Seq("key", "value")), all = false)

  // Partition specified should be ignored in case of COMPUTE STATISTICS FOR ALL COLUMNS
  checkColumns(
    s"""
       |ANALYZE TABLE a.b.c PARTITION(ds='2017-06-10')
       |COMPUTE STATISTICS FOR ALL COLUMNS
     """.stripMargin,
    None, all = true)

  // ALL cannot be combined with an explicit column list, and COLUMNS is required after ALL.
  intercept("ANALYZE TABLE a.b.c COMPUTE STATISTICS FOR ALL COLUMNS key, value",
    "mismatched input 'key' expecting <EOF>")
  intercept("ANALYZE TABLE a.b.c COMPUTE STATISTICS FOR ALL",
    "missing 'COLUMNS' at '<EOF>'")
}

private case class TableSpec(
name: Seq[String],
schema: Option[StructType],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -265,6 +265,24 @@ class ResolveSessionCatalog(
// TODO (SPARK-29014): we should check if the current catalog is session catalog here.
case ShowTablesStatement(None, pattern) if defaultCatalog.isEmpty =>
ShowTablesCommand(None, pattern)

case AnalyzeTableStatement(tableName, partitionSpec, noScan) =>
val CatalogAndIdentifierParts(catalog, parts) = tableName
if (!isSessionCatalog(catalog)) {
throw new AnalysisException("ANALYZE TABLE is only supported with v1 tables.")
}
if (partitionSpec.isEmpty) {
AnalyzeTableCommand(parts.asTableIdentifier, noScan)
} else {
AnalyzePartitionCommand(parts.asTableIdentifier, partitionSpec, noScan)
}

case AnalyzeColumnStatement(tableName, columnNames, allColumns) =>
val CatalogAndIdentifierParts(catalog, parts) = tableName
if (!isSessionCatalog(catalog)) {
throw new AnalysisException("ANALYZE TABLE is only supported with v1 tables.")
}
AnalyzeColumnCommand(parts.asTableIdentifier, columnNames, allColumns)
}

private def buildCatalogTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,55 +89,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
ResetCommand
}

/**
* Create an [[AnalyzeTableCommand]] command, or an [[AnalyzePartitionCommand]]
* or an [[AnalyzeColumnCommand]] command.
* Example SQL for analyzing a table or a set of partitions:
* {{{
* ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)]
* COMPUTE STATISTICS [NOSCAN];
* }}}
*
* Example SQL for analyzing columns:
* {{{
* ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS column1, column2;
* }}}
*
* Example SQL for analyzing all columns of a table:
* {{{
* ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR ALL COLUMNS;
* }}}
*/
override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
  // A partition spec has no effect on column statistics; warn rather than fail.
  def warnIgnoredPartitionSpec(): Unit = {
    if (ctx.partitionSpec != null) {
      logWarning("Partition specification is ignored when collecting column statistics: " +
        ctx.partitionSpec.getText)
    }
  }

  // NOSCAN (any casing) is the only identifier allowed after COMPUTE STATISTICS.
  val noscanRequested = ctx.identifier != null
  if (noscanRequested && !ctx.identifier.getText.equalsIgnoreCase("noscan")) {
    throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
  }

  val table = visitTableIdentifier(ctx.tableIdentifier)
  if (ctx.ALL() != null) {
    // ... COMPUTE STATISTICS FOR ALL COLUMNS
    warnIgnoredPartitionSpec()
    AnalyzeColumnCommand(table, None, allColumns = true)
  } else if (ctx.identifierSeq() != null) {
    // ... COMPUTE STATISTICS FOR COLUMNS c1, c2, ...
    warnIgnoredPartitionSpec()
    AnalyzeColumnCommand(table,
      Option(visitIdentifierSeq(ctx.identifierSeq())), allColumns = false)
  } else if (ctx.partitionSpec != null) {
    // Table statistics restricted to the given partitions.
    AnalyzePartitionCommand(table, visitPartitionSpec(ctx.partitionSpec),
      noscan = noscanRequested)
  } else {
    // Whole-table statistics.
    AnalyzeTableCommand(table, noscan = noscanRequested)
  }
}

/**
* Create a [[ShowTablesCommand]] logical plan.
* Example SQL :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,23 @@ class DataSourceV2SQLSuite
}
}

test("ANALYZE TABLE") {
  val t = "testcat.ns1.ns2.tbl"
  withTable(t) {
    spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")

    // Both the table-stats and the column-stats forms must be rejected for v2 tables.
    Seq("COMPUTE STATISTICS", "COMPUTE STATISTICS FOR ALL COLUMNS").foreach { clause =>
      val e = intercept[AnalysisException] {
        sql(s"ANALYZE TABLE $t $clause")
      }
      assert(e.message.contains("ANALYZE TABLE is only supported with v1 tables"))
    }
  }
}

private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
val errMsg = intercept[AnalysisException] {
sql(sqlStatement)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,68 +216,6 @@ class SparkSqlParserSuite extends AnalysisTest {
assertEqual("DESCRIBE " + query, DescribeQueryCommand(query, parser.parsePlan(query)))
}

test("analyze table statistics") {
  // Whole-table statistics.
  assertEqual("analyze table t compute statistics",
    AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
  assertEqual("analyze table t compute statistics noscan",
    AnalyzeTableCommand(TableIdentifier("t"), noscan = true))

  // Per-partition statistics: shorthand for the expected command on table `t`.
  def checkPartition(sql: String, spec: Map[String, Option[String]], noscan: Boolean): Unit =
    assertEqual(sql, AnalyzePartitionCommand(TableIdentifier("t"), spec, noscan = noscan))

  checkPartition("analyze table t partition (a) compute statistics nOscAn",
    Map("a" -> None), noscan = true)
  checkPartition("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS",
    Map("ds" -> Some("2008-04-09"), "hr" -> Some("11")), noscan = false)
  checkPartition("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan",
    Map("ds" -> Some("2008-04-09"), "hr" -> Some("11")), noscan = true)
  checkPartition("ANALYZE TABLE t PARTITION(ds='2008-04-09') COMPUTE STATISTICS noscan",
    Map("ds" -> Some("2008-04-09")), noscan = true)
  checkPartition("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS",
    Map("ds" -> Some("2008-04-09"), "hr" -> None), noscan = false)
  checkPartition("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS noscan",
    Map("ds" -> Some("2008-04-09"), "hr" -> None), noscan = true)
  checkPartition("ANALYZE TABLE t PARTITION(ds, hr=11) COMPUTE STATISTICS noscan",
    Map("ds" -> None, "hr" -> Some("11")), noscan = true)
  checkPartition("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
    Map("ds" -> None, "hr" -> None), noscan = false)
  checkPartition("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan",
    Map("ds" -> None, "hr" -> None), noscan = true)

  // Any other trailing identifier is rejected.
  intercept("analyze table t compute statistics xxxx",
    "Expected `NOSCAN` instead of `xxxx`")
  intercept("analyze table t partition (a) compute statistics xxxx",
    "Expected `NOSCAN` instead of `xxxx`")
}

test("analyze table column statistics") {
  // A column list is mandatory for the FOR COLUMNS form.
  intercept("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS", "")

  assertEqual("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value",
    AnalyzeColumnCommand(TableIdentifier("t"), Some(Seq("key", "value")), allColumns = false))

  // Partition specified - should be ignored
  assertEqual("ANALYZE TABLE t PARTITION(ds='2017-06-10') " +
    "COMPUTE STATISTICS FOR COLUMNS key, value",
    AnalyzeColumnCommand(TableIdentifier("t"), Some(Seq("key", "value")), allColumns = false))

  // Partition specified should be ignored in case of COMPUTE STATISTICS FOR ALL COLUMNS
  assertEqual("ANALYZE TABLE t PARTITION(ds='2017-06-10') " +
    "COMPUTE STATISTICS FOR ALL COLUMNS",
    AnalyzeColumnCommand(TableIdentifier("t"), None, allColumns = true))

  // ALL cannot be combined with an explicit column list, and COLUMNS is required after ALL.
  intercept("ANALYZE TABLE t COMPUTE STATISTICS FOR ALL COLUMNS key, value",
    "mismatched input 'key' expecting <EOF>")
  intercept("ANALYZE TABLE t COMPUTE STATISTICS FOR ALL",
    "missing 'COLUMNS' at '<EOF>'")
}

test("query organization") {
// Test all valid combinations of order by/sort by/distribute by/cluster by/limit/windows
val baseSql = "select * from t"
Expand Down