File: DataSourceV2Strategy.scala

@@ -36,7 +36,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog,
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => V2Not, Or => V2Or, Predicate}
-import org.apache.spark.sql.connector.read.LocalScan
+import org.apache.spark.sql.connector.read.{LocalScan, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.connector.write.V1Write
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -329,10 +329,17 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper
      }
      DescribeTableExec(output, r.table, isExtended) :: Nil

-    case DescribeColumn(_: ResolvedTable, column, isExtended, output) =>
+    case DescribeColumn(r: ResolvedTable, column, isExtended, output) =>
      column match {
        case c: Attribute =>
-          DescribeColumnExec(output, c, isExtended) :: Nil
+          val colStats =
+            r.table.asReadable.newScanBuilder(CaseInsensitiveStringMap.empty()).build() match {
Reviewer comment (Contributor): this may not be very cheap. Shall we only do it if isExtended is true?

Reviewer comment (Contributor): and what if the table is not readable (e.g. write-only)? We should not fail but show no column stats.

Reviewer comment (Contributor): BTW, I think it's cleaner to pass the v2 Table to DescribeColumnExec and move this code block to DescribeColumnExec.

Author reply: @cloud-fan Thanks for your comments. I will have a follow-up to fix this and also move the test to the parent suite.

+              case s: SupportsReportStatistics =>
+                val stats = s.estimateStatistics()
+                Some(stats.columnStats().get(FieldReference.column(c.name)))
Reviewer comment (Member): Is columnStats case-sensitive or not?

Author reply: I think this is controlled by SQLConf.CASE_SENSITIVE.

+              case _ => None
+            }
+          DescribeColumnExec(output, c, isExtended, colStats) :: Nil
        case nested =>
          throw QueryCompilationErrors.commandNotSupportNestedColumnError(
            "DESC TABLE COLUMN", toPrettySQL(nested))
File: DescribeColumnExec.scala

@@ -22,11 +22,13 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics

case class DescribeColumnExec(
    override val output: Seq[Attribute],
    column: Attribute,
-    isExtended: Boolean) extends LeafV2CommandExec {
+    isExtended: Boolean,
+    colStats: Option[ColumnStatistics] = None) extends LeafV2CommandExec {

  override protected def run(): Seq[InternalRow] = {
    val rows = new ArrayBuffer[InternalRow]()
@@ -42,7 +44,43 @@ case class DescribeColumnExec(
      CharVarcharUtils.getRawType(column.metadata).getOrElse(column.dataType).catalogString)
    rows += toCatalystRow("comment", comment)

-    // TODO: The extended description (isExtended = true) can be added here.
+    if (isExtended && colStats.nonEmpty) {
+      if (colStats.get.min().isPresent) {
rows += toCatalystRow("min", colStats.get.min().toString)
+      } else {
+        rows += toCatalystRow("min", "NULL")
+      }
+
+      if (colStats.get.max().isPresent) {
rows += toCatalystRow("max", colStats.get.max().toString)
+      } else {
+        rows += toCatalystRow("max", "NULL")
+      }
+
+      if (colStats.get.nullCount().isPresent) {
+        rows += toCatalystRow("num_nulls", colStats.get.nullCount().getAsLong.toString)
+      } else {
+        rows += toCatalystRow("num_nulls", "NULL")
+      }
+
+      if (colStats.get.distinctCount().isPresent) {
+        rows += toCatalystRow("distinct_count", colStats.get.distinctCount().getAsLong.toString)
+      } else {
+        rows += toCatalystRow("distinct_count", "NULL")
+      }
+
+      if (colStats.get.avgLen().isPresent) {
+        rows += toCatalystRow("avg_col_len", colStats.get.avgLen().getAsLong.toString)
+      } else {
+        rows += toCatalystRow("avg_col_len", "NULL")
+      }
+
+      if (colStats.get.maxLen().isPresent) {
+        rows += toCatalystRow("max_col_len", colStats.get.maxLen().getAsLong.toString)
+      } else {
+        rows += toCatalystRow("max_col_len", "NULL")
+      }
+    }

    rows.toSeq
  }
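As a possible follow-up (not part of this change), the six near-identical if/else blocks could be collapsed with small rendering helpers; a sketch, with hypothetical names:

```scala
import java.util.{Optional, OptionalLong}

// Hypothetical helpers: render a missing java.util Optional/OptionalLong value
// as the literal string "NULL", matching the fallback used in the blocks above.
object ColStatRendering {
  def render(v: Optional[_]): String =
    if (v.isPresent) v.get.toString else "NULL"

  def render(v: OptionalLong): String =
    if (v.isPresent) v.getAsLong.toString else "NULL"
}

// Inside DescribeColumnExec.run(), the isExtended branch could then read:
//   val s = colStats.get
//   rows += toCatalystRow("min", ColStatRendering.render(s.min()))
//   rows += toCatalystRow("max", ColStatRendering.render(s.max()))
//   rows += toCatalystRow("num_nulls", ColStatRendering.render(s.nullCount()))
//   ... and likewise for distinct_count, avg_col_len and max_col_len.
```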
File: DescribeTableSuite.scala

@@ -149,13 +149,14 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase
}
}

-  // TODO(SPARK-39859): Support v2 `DESCRIBE TABLE EXTENDED` for columns
  test("describe extended (formatted) a column") {
Reviewer comment (Contributor): don't have to be done immediately, but it's better to move this test to the parent suite, to make sure v1 and v2 commands have the same behavior.

Reviewer comment (Member): +1

withNamespaceAndTable("ns", "tbl") { tbl =>
sql(s"""
|CREATE TABLE $tbl
|(key INT COMMENT 'column_comment', col STRING)
|$defaultUsing""".stripMargin)

sql(s"INSERT INTO $tbl values (1, 'aaa'), (2, 'bbb'), (3, 'ccc'), (null, 'ddd')")
val descriptionDf = sql(s"DESCRIBE TABLE EXTENDED $tbl key")
assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq(
("info_name", StringType),
@@ -165,7 +166,13 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase
      Seq(
        Row("col_name", "key"),
        Row("data_type", "int"),
-        Row("comment", "column_comment")))
+        Row("comment", "column_comment"),
+        Row("min", "NULL"),
+        Row("max", "NULL"),
+        Row("num_nulls", "1"),
+        Row("distinct_count", "4"),
+        Row("avg_col_len", "NULL"),
+        Row("max_col_len", "NULL")))
}
}
}
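To make the behavior above concrete, here is roughly what the command prints for the test table (values taken from the assertion above; the table identifier is a placeholder for whatever $tbl resolves to in the suite's test catalog):

```scala
// Illustration only: DESCRIBE output for the "key" column after the four-row INSERT.
spark.sql("DESCRIBE TABLE EXTENDED test_catalog.ns.tbl key").show(truncate = false)
// +--------------+--------------+
// |info_name     |info_value    |
// +--------------+--------------+
// |col_name      |key           |
// |data_type     |int           |
// |comment       |column_comment|
// |min           |NULL          |
// |max           |NULL          |
// |num_nulls     |1             |
// |distinct_count|4             |
// |avg_col_len   |NULL          |
// |max_col_len   |NULL          |
// +--------------+--------------+
```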