From a85b585ed59e8b8e90db66f55bf0fc3c36a3ea0a Mon Sep 17 00:00:00 2001
From: huaxingao
Date: Thu, 16 Feb 2023 13:39:22 -0800
Subject: [PATCH 1/2] [SPARK-39859][SQL] Support v2 DESCRIBE TABLE EXTENDED for columns
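
Support DESCRIBE TABLE EXTENDED for a single column of a DataSource v2 table.
When the table's scan implements SupportsReportStatistics, the reported column
statistics (min, max, num_nulls, distinct_count, avg_col_len, max_col_len) are
appended to the describe output; sources that report no statistics keep the
existing three-row output (col_name, data_type, comment).

Example, taken from the updated test, where the source reports only num_nulls
and distinct_count for the column:

  DESCRIBE TABLE EXTENDED <catalog>.ns.tbl key;

  info_name       info_value
  col_name        key
  data_type       int
  comment         column_comment
  min             NULL
  max             NULL
  num_nulls       1
  distinct_count  4
  avg_col_len     NULL
  max_col_len     NULL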
---
 .../datasources/v2/DataSourceV2Strategy.scala | 13 ++++++++++---
 .../datasources/v2/DescribeColumnExec.scala   | 42 ++++++++++++++++++++++++++++++++++++++--
 .../command/v2/DescribeTableSuite.scala       | 11 +++++++++--
 3 files changed, 59 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 29f0da1158ff..757b66e1534a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog,
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
 import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => V2Not, Or => V2Or, Predicate}
-import org.apache.spark.sql.connector.read.LocalScan
+import org.apache.spark.sql.connector.read.{LocalScan, SupportsReportStatistics}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.connector.write.V1Write
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -329,10 +329,17 @@
       }
       DescribeTableExec(output, r.table, isExtended) :: Nil
 
-    case DescribeColumn(_: ResolvedTable, column, isExtended, output) =>
+    case DescribeColumn(r: ResolvedTable, column, isExtended, output) =>
       column match {
         case c: Attribute =>
-          DescribeColumnExec(output, c, isExtended) :: Nil
+          val colStats =
+            r.table.asReadable.newScanBuilder(CaseInsensitiveStringMap.empty()).build() match {
+              case s: SupportsReportStatistics =>
+                val stats = s.estimateStatistics()
+                Option(stats.columnStats().get(FieldReference.column(c.name)))
+              case _ => None
+            }
+          DescribeColumnExec(output, c, isExtended, colStats) :: Nil
         case nested =>
           throw QueryCompilationErrors.commandNotSupportNestedColumnError(
             "DESC TABLE COLUMN", toPrettySQL(nested))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala
index 3be9b5c5471a..ebe051f6180e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala
@@ -22,11 +22,13 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics
 
 case class DescribeColumnExec(
     override val output: Seq[Attribute],
     column: Attribute,
-    isExtended: Boolean) extends LeafV2CommandExec {
+    isExtended: Boolean,
+    colStats: Option[ColumnStatistics] = None) extends LeafV2CommandExec {
 
   override protected def run(): Seq[InternalRow] = {
     val rows = new ArrayBuffer[InternalRow]()
@@ -42,7 +44,43 @@ case class DescribeColumnExec(
       CharVarcharUtils.getRawType(column.metadata).getOrElse(column.dataType).catalogString)
     rows += toCatalystRow("comment", comment)
 
-    // TODO: The extended description (isExtended = true) can be added here.
+    if (isExtended && colStats.nonEmpty) {
+      if (colStats.get.min().isPresent) {
+        rows += toCatalystRow("min", colStats.get.min().get().toString)
+      } else {
+        rows += toCatalystRow("min", "NULL")
+      }
+
+      if (colStats.get.max().isPresent) {
+        rows += toCatalystRow("max", colStats.get.max().get().toString)
+      } else {
+        rows += toCatalystRow("max", "NULL")
+      }
+
+      if (colStats.get.nullCount().isPresent) {
+        rows += toCatalystRow("num_nulls", colStats.get.nullCount().getAsLong.toString)
+      } else {
+        rows += toCatalystRow("num_nulls", "NULL")
+      }
+
+      if (colStats.get.distinctCount().isPresent) {
+        rows += toCatalystRow("distinct_count", colStats.get.distinctCount().getAsLong.toString)
+      } else {
+        rows += toCatalystRow("distinct_count", "NULL")
+      }
+
+      if (colStats.get.avgLen().isPresent) {
+        rows += toCatalystRow("avg_col_len", colStats.get.avgLen().getAsLong.toString)
+      } else {
+        rows += toCatalystRow("avg_col_len", "NULL")
+      }
+
+      if (colStats.get.avgLen().isPresent) {
+        rows += toCatalystRow("max_col_len", colStats.get.avgLen().getAsLong.toString)
+      } else {
+        rows += toCatalystRow("max_col_len", "NULL")
+      }
+    }
 
     rows.toSeq
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
index 334521a96e5b..25363dcea699 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
@@ -149,13 +149,14 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase
     }
   }
 
-  // TODO(SPARK-39859): Support v2 `DESCRIBE TABLE EXTENDED` for columns
   test("describe extended (formatted) a column") {
     withNamespaceAndTable("ns", "tbl") { tbl =>
       sql(s"""
         |CREATE TABLE $tbl
         |(key INT COMMENT 'column_comment', col STRING)
         |$defaultUsing""".stripMargin)
+
+      sql(s"INSERT INTO $tbl values (1, 'aaa'), (2, 'bbb'), (3, 'ccc'), (null, 'ddd')")
       val descriptionDf = sql(s"DESCRIBE TABLE EXTENDED $tbl key")
       assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq(
         ("info_name", StringType),
@@ -165,7 +166,13 @@
       Seq(
         Row("col_name", "key"),
         Row("data_type", "int"),
-        Row("comment", "column_comment")))
+        Row("comment", "column_comment"),
+        Row("min", "NULL"),
+        Row("max", "NULL"),
+        Row("num_nulls", "1"),
+        Row("distinct_count", "4"),
+        Row("avg_col_len", "NULL"),
+        Row("max_col_len", "NULL")))
     }
   }
 }

From 248564a4fdf1d9b1a5e4e00ad663bdbcc08bae86 Mon Sep 17 00:00:00 2001
From: huaxingao
Date: Thu, 16 Feb 2023 15:09:54 -0800
Subject: [PATCH 2/2] address comments
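
Use ColumnStatistics.maxLen() rather than avgLen() when populating the
max_col_len row; the first revision reused avgLen() for both avg_col_len
and max_col_len.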
---
 .../sql/execution/datasources/v2/DescribeColumnExec.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala
index ebe051f6180e..491c214080a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala
@@ -75,8 +75,8 @@ case class DescribeColumnExec(
         rows += toCatalystRow("avg_col_len", "NULL")
       }
 
-      if (colStats.get.avgLen().isPresent) {
-        rows += toCatalystRow("max_col_len", colStats.get.avgLen().getAsLong.toString)
+      if (colStats.get.maxLen().isPresent) {
+        rows += toCatalystRow("max_col_len", colStats.get.maxLen().getAsLong.toString)
       } else {
         rows += toCatalystRow("max_col_len", "NULL")
       }
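
Note: the extended rows above are only populated when the table's v2 scan
implements SupportsReportStatistics and reports per-column statistics. A
minimal sketch of such a scan follows; the class name KeyValueScan and the
literal statistic values are illustrative, not from these patches, and are
chosen to match what the updated DescribeTableSuite test expects.

  import java.util.{Collections, Optional, OptionalLong}

  import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
  import org.apache.spark.sql.connector.read.{Statistics, SupportsReportStatistics}
  import org.apache.spark.sql.connector.read.colstats.ColumnStatistics
  import org.apache.spark.sql.types.StructType

  // Hypothetical v2 scan that reports statistics for an INT column named "key".
  class KeyValueScan(schema: StructType) extends SupportsReportStatistics {
    override def readSchema(): StructType = schema

    override def estimateStatistics(): Statistics = new Statistics {
      override def sizeInBytes(): OptionalLong = OptionalLong.empty()
      override def numRows(): OptionalLong = OptionalLong.of(4L)

      override def columnStats(): java.util.Map[NamedReference, ColumnStatistics] = {
        // Only num_nulls and distinct_count are reported; the empty Optionals
        // make DescribeColumnExec print NULL for min/max/avg_col_len/max_col_len.
        val keyStats: ColumnStatistics = new ColumnStatistics {
          override def min(): Optional[Object] = Optional.empty()
          override def max(): Optional[Object] = Optional.empty()
          override def nullCount(): OptionalLong = OptionalLong.of(1L)
          override def distinctCount(): OptionalLong = OptionalLong.of(4L)
          override def avgLen(): OptionalLong = OptionalLong.empty()
          override def maxLen(): OptionalLong = OptionalLong.empty()
        }
        Collections.singletonMap(Expressions.column("key"), keyStats)
      }
    }
  }

With the patches applied, DESCRIBE TABLE EXTENDED on the key column of a table
whose scan behaves like this prints num_nulls 1, distinct_count 4, and NULL for
the remaining statistics, matching the assertions in DescribeTableSuite.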