diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Statistics.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Statistics.java
index a4ef785da2c65..270b75025946d 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Statistics.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Statistics.java
@@ -17,9 +17,13 @@
 
 package org.apache.spark.sql.connector.read;
 
+import java.util.Map;
+import java.util.Optional;
 import java.util.OptionalLong;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
 
 /**
  * An interface to represent statistics for a data source, which is returned by
@@ -31,4 +35,7 @@
 public interface Statistics {
   OptionalLong sizeInBytes();
   OptionalLong numRows();
+  default Optional<Map<NamedReference, ColumnStatistics>> columnStats() {
+    return Optional.empty();
+  }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/ColumnStatistics.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/ColumnStatistics.java
new file mode 100644
index 0000000000000..295e7ce922e1d
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/ColumnStatistics.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.colstats;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.read.Statistics;
+
+/**
+ * An interface to represent column statistics, which is part of
+ * {@link Statistics}.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public interface ColumnStatistics {
+
+  /**
+   * @return number of distinct values in the column
+   */
+  default OptionalLong distinctCount() {
+    return OptionalLong.empty();
+  }
+
+  /**
+   * @return minimum value in the column
+   */
+  default Optional<Object> min() {
+    return Optional.empty();
+  }
+
+  /**
+   * @return maximum value in the column
+   */
+  default Optional<Object> max() {
+    return Optional.empty();
+  }
+
+  /**
+   * @return number of nulls in the column
+   */
+  default OptionalLong nullCount() {
+    return OptionalLong.empty();
+  }
+
+  /**
+   * @return average length of the values in the column
+   */
+  default OptionalLong avgLen() {
+    return OptionalLong.empty();
+  }
+
+  /**
+   * @return maximum length of the values in the column
+   */
+  default OptionalLong maxLen() {
+    return OptionalLong.empty();
+  }
+
+  /**
+   * @return histogram of the values in the column
+   */
+  default Optional<Histogram> histogram() {
+    return Optional.empty();
+  }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/Histogram.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/Histogram.java
new file mode 100644
index 0000000000000..a991c9e3d7114
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/Histogram.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.colstats;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * An interface to represent an equi-height histogram, which is a part of
+ * {@link ColumnStatistics}. Equi-height histogram represents the distribution of
+ * a column's values by a sequence of bins.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public interface Histogram {
+  /**
+   * @return number of rows in each bin
+   */
+  double height();
+
+  /**
+   * @return equi-height histogram bins
+   */
+  HistogramBin[] bins();
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/HistogramBin.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/HistogramBin.java
new file mode 100644
index 0000000000000..50bb00bd3cb49
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/HistogramBin.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.colstats;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * An interface to represent a bin in an equi-height histogram.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public interface HistogramBin {
+  /**
+   * @return lower bound of the value range in this bin
+   */
+  double lo();
+
+  /**
+   * @return higher bound of the value range in this bin
+   */
+  double hi();
+
+  /**
+   * @return approximate number of distinct values in this bin
+   */
+  long ndv();
+}
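For context, and not part of the patch itself: a connector-side implementation of the three new interfaces might look roughly like the Scala sketch below. The Simple* names and the numbers are hypothetical; anything not overridden falls back to the interfaces' empty defaults.

import java.util.{Optional, OptionalLong}

import org.apache.spark.sql.connector.read.colstats.{ColumnStatistics, Histogram, HistogramBin}

// Plain case classes whose vals implement the Java accessors lo()/hi()/ndv()
// and height()/bins().
case class SimpleHistogramBin(lo: Double, hi: Double, ndv: Long) extends HistogramBin
case class SimpleHistogram(height: Double, bins: Array[HistogramBin]) extends Histogram

// Column-level statistics for a hypothetical numeric column.
class SimpleColumnStats extends ColumnStatistics {
  override def distinctCount(): OptionalLong = OptionalLong.of(100L)
  override def nullCount(): OptionalLong = OptionalLong.of(0L)
  override def min(): Optional[Object] = Optional.of[Object](java.lang.Long.valueOf(1L))
  override def max(): Optional[Object] = Optional.of[Object](java.lang.Long.valueOf(200L))
  // Two equi-height bins, each covering 100 of the 200 rows.
  override def histogram(): Optional[Histogram] = Optional.of[Histogram](
    SimpleHistogram(height = 100.0d, bins = Array[HistogramBin](
      SimpleHistogramBin(lo = 1.0d, hi = 100.0d, ndv = 60L),
      SimpleHistogramBin(lo = 100.0d, hi = 200.0d, ndv = 40L))))
}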
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 4fe01ac76078d..0d7ce1388db4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
 import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
 import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics}
@@ -91,7 +91,7 @@ case class DataSourceV2Relation(
       table.asReadable.newScanBuilder(options).build() match {
         case r: SupportsReportStatistics =>
           val statistics = r.estimateStatistics()
-          DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes)
+          DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes, output)
         case _ =>
           Statistics(sizeInBytes = conf.defaultSizeInBytes)
       }
@@ -142,7 +142,7 @@ case class DataSourceV2ScanRelation(
     scan match {
       case r: SupportsReportStatistics =>
         val statistics = r.estimateStatistics()
-        DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes)
+        DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes, output)
       case _ =>
         Statistics(sizeInBytes = conf.defaultSizeInBytes)
     }
@@ -173,7 +173,7 @@ case class StreamingDataSourceV2Relation(
   override def computeStats(): Statistics = scan match {
     case r: SupportsReportStatistics =>
       val statistics = r.estimateStatistics()
-      DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes)
+      DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes, output)
     case _ =>
       Statistics(sizeInBytes = conf.defaultSizeInBytes)
   }
@@ -214,14 +214,52 @@ object DataSourceV2Relation {
   def transformV2Stats(
       v2Statistics: V2Statistics,
       defaultRowCount: Option[BigInt],
-      defaultSizeInBytes: Long): Statistics = {
+      defaultSizeInBytes: Long,
+      output: Seq[Attribute] = Seq.empty): Statistics = {
     val numRows: Option[BigInt] = if (v2Statistics.numRows().isPresent) {
       Some(v2Statistics.numRows().getAsLong)
     } else {
       defaultRowCount
     }
+
+    var colStats: Seq[(Attribute, ColumnStat)] = Seq.empty[(Attribute, ColumnStat)]
+    if (v2Statistics.columnStats().isPresent) {
+      val v2ColumnStat = v2Statistics.columnStats().get()
+      val keys = v2ColumnStat.keySet()
+
+      keys.forEach(key => {
+        val colStat = v2ColumnStat.get(key)
+        val distinct: Option[BigInt] =
+          if (colStat.distinctCount().isPresent) Some(colStat.distinctCount().getAsLong) else None
+        val min: Option[Any] = if (colStat.min().isPresent) Some(colStat.min().get) else None
+        val max: Option[Any] = if (colStat.max().isPresent) Some(colStat.max().get) else None
+        val nullCount: Option[BigInt] =
+          if (colStat.nullCount().isPresent) Some(colStat.nullCount().getAsLong) else None
+        val avgLen: Option[Long] =
+          if (colStat.avgLen().isPresent) Some(colStat.avgLen().getAsLong) else None
+        val maxLen: Option[Long] =
+          if (colStat.maxLen().isPresent) Some(colStat.maxLen().getAsLong) else None
+        val histogram = if (colStat.histogram().isPresent) {
+          val v2Histogram = colStat.histogram().get()
+          val bins = v2Histogram.bins()
+          Some(Histogram(v2Histogram.height(),
+            bins.map(bin => HistogramBin(bin.lo, bin.hi, bin.ndv))))
+        } else {
+          None
+        }
+
+        val catalystColStat = ColumnStat(distinct, min, max, nullCount, avgLen, maxLen, histogram)
+
+        output.foreach(attribute => {
+          if (attribute.name.equals(key.describe())) {
+            colStats = colStats :+ (attribute -> catalystColStat)
+          }
+        })
+      })
+    }
     Statistics(
       sizeInBytes = v2Statistics.sizeInBytes().orElse(defaultSizeInBytes),
-      rowCount = numRows)
+      rowCount = numRows,
+      attributeStats = AttributeMap(colStats))
   }
 }
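Also illustrative rather than part of the patch: the extended transformV2Stats above only attaches a ColumnStat to output attributes whose name matches a NamedReference key's describe() form. A REPL-style Scala sketch of that conversion, with made-up sizes and a hypothetical idAttr:

import java.util.{Optional, OptionalLong}

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.read.{Statistics => V2Statistics}
import org.apache.spark.sql.connector.read.colstats.ColumnStatistics
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.LongType

// V2 column statistics for a single column named "id".
val idColumnStats = new ColumnStatistics {
  override def distinctCount(): OptionalLong = OptionalLong.of(100L)
  override def nullCount(): OptionalLong = OptionalLong.of(0L)
}
val columnStatsMap = new java.util.HashMap[NamedReference, ColumnStatistics]()
columnStatsMap.put(Expressions.column("id"), idColumnStats)

// Table-level V2 statistics carrying the per-column map.
val v2Stats = new V2Statistics {
  override def sizeInBytes(): OptionalLong = OptionalLong.of(1600L)
  override def numRows(): OptionalLong = OptionalLong.of(200L)
  override def columnStats(): Optional[java.util.Map[NamedReference, ColumnStatistics]] =
    Optional.of(columnStatsMap)
}

// "id" here must line up with the NamedReference key above for the stats to attach.
val idAttr = AttributeReference("id", LongType)()
val catalystStats = DataSourceV2Relation.transformV2Stats(v2Stats, None, 64L, Seq(idAttr))

assert(catalystStats.rowCount.contains(BigInt(200)))
assert(catalystStats.attributeStats(idAttr).distinctCount.contains(BigInt(100)))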
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index 7da6c1480e079..dd255290f3d8d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.connector.catalog
 import java.time.{Instant, ZoneId}
 import java.time.temporal.ChronoUnit
 import java.util
-import java.util.OptionalLong
+import java.util.{Optional, OptionalLong}
 
 import scala.collection.mutable
 
@@ -33,6 +33,7 @@ import org.apache.spark.sql.connector.distributions.{Distribution, Distributions
 import org.apache.spark.sql.connector.expressions._
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.connector.read.colstats.{ColumnStatistics, Histogram, HistogramBin}
 import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
@@ -273,7 +274,19 @@ abstract class InMemoryBaseTable(
       }
     }
 
-  case class InMemoryStats(sizeInBytes: OptionalLong, numRows: OptionalLong) extends Statistics
+  case class InMemoryStats(
+      sizeInBytes: OptionalLong,
+      numRows: OptionalLong,
+      override val columnStats: Optional[util.Map[NamedReference, ColumnStatistics]])
+    extends Statistics
+
+  case class InMemoryColumnStats(
+      override val distinctCount: OptionalLong,
+      override val nullCount: OptionalLong) extends ColumnStatistics
+
+  case class InMemoryHistogramBin(lo: Double, hi: Double, ndv: Long) extends HistogramBin
+
+  case class InMemoryHistogram(height: Double, bins: Array[HistogramBin]) extends Histogram
 
   abstract class BatchScanBaseClass(
       var data: Seq[InputPartition],
@@ -285,7 +298,7 @@ abstract class InMemoryBaseTable(
 
     override def estimateStatistics(): Statistics = {
       if (data.isEmpty) {
-        return InMemoryStats(OptionalLong.of(0L), OptionalLong.of(0L))
+        return InMemoryStats(OptionalLong.of(0L), OptionalLong.of(0L), Optional.empty())
       }
 
       val inputPartitions = data.map(_.asInstanceOf[BufferedRows])
@@ -294,7 +307,39 @@ abstract class InMemoryBaseTable(
       val objectHeaderSizeInBytes = 12L
       val rowSizeInBytes = objectHeaderSizeInBytes + schema.defaultSize
       val sizeInBytes = numRows * rowSizeInBytes
-      InMemoryStats(OptionalLong.of(sizeInBytes), OptionalLong.of(numRows))
+
+      val numOfCols = tableSchema.fields.length
+      val dataTypes = tableSchema.fields.map(_.dataType)
+      val colValueSets = new Array[util.HashSet[Object]](numOfCols)
+      val numOfNulls = new Array[Long](numOfCols)
+      for (i <- 0 until numOfCols) {
+        colValueSets(i) = new util.HashSet[Object]
+      }
+
+      inputPartitions.foreach(inputPartition =>
+        inputPartition.rows.foreach(row =>
+          for (i <- 0 until numOfCols) {
+            colValueSets(i).add(row.get(i, dataTypes(i)))
+            if (row.isNullAt(i)) {
+              numOfNulls(i) += 1
+            }
+          }
+        )
+      )
+
+      val map = new util.HashMap[NamedReference, ColumnStatistics]()
+      val colNames = tableSchema.fields.map(_.name)
+      var i = 0
+      for (col <- colNames) {
+        val fieldReference = FieldReference(col)
+        val colStats = InMemoryColumnStats(
+          OptionalLong.of(colValueSets(i).size()),
+          OptionalLong.of(numOfNulls(i)))
+        map.put(fieldReference, colStats)
+        i = i + 1
+      }
+
+      InMemoryStats(OptionalLong.of(sizeInBytes), OptionalLong.of(numRows), Optional.of(map))
     }
 
     override def outputPartitioning(): Partitioning = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 43c1d5e43bd6e..d99c170fae579 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
 import org.apache.spark.sql.errors.QueryErrorsBase
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION}
@@ -2772,6 +2773,26 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("SPARK-41378: test column stats") {
+    spark.sql("CREATE TABLE testcat.test (id bigint NOT NULL, data string)")
+    spark.sql("INSERT INTO testcat.test values (1, 'test1'), (2, null), (3, null)," +
+      " (4, null), (5, 'test5')")
+    val df = spark.sql("select * from testcat.test")
+
+    df.queryExecution.optimizedPlan.collect {
+      case scan: DataSourceV2ScanRelation =>
+        val stats = scan.stats
+        assert(stats.sizeInBytes == 200)
+        assert(stats.rowCount.get == 5)
+        val colStats = stats.attributeStats.values.toArray
+        assert(colStats.length == 2)
+        assert(colStats(0).distinctCount.get == 3)
+        assert(colStats(0).nullCount.get == 3)
+        assert(colStats(1).distinctCount.get == 5)
+        assert(colStats(1).nullCount.get == 0)
+    }
+  }
+
   private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
     checkError(
       exception = intercept[AnalysisException] {