From dc9ca8dbb011de6d28f2fd78571269810ab5324a Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Wed, 13 Jul 2022 21:29:45 +0900
Subject: [PATCH 1/5] [SPARK-39748][SQL][SS][FOLLOWUP] Fix a bug on column stat in LogicalRDD on mismatching exprIDs

---
 .../spark/sql/execution/ExistingRDD.scala     |  12 +-
 .../streaming/sources/ForeachBatchSink.scala  |   7 +-
 .../org/apache/spark/sql/DataFrameSuite.scala | 109 +++++++++++++++++-
 3 files changed, 122 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index bf9ef6991e3e..f4c716e9d699 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -116,10 +116,20 @@ case class LogicalRDD(
       case e: Attribute => rewrite.getOrElse(e, e)
     }.asInstanceOf[SortOrder])
 
+    val rewrittenOriginLogicalPlan = originLogicalPlan.map { plan =>
+      val newPlan = plan.transformAllExpressions {
+        case e: Attribute => rewrite.getOrElse(e, e)
+      }
+      // It's not feasible to transform column stats directly for all logical nodes. Instead, we
+      // invalidate the cache so that the column stats are recalculated with new expression IDs.
+      newPlan.invalidateStatsCache()
+      newPlan
+    }
+
     LogicalRDD(
       output.map(rewrite),
       rdd,
-      originLogicalPlan,
+      rewrittenOriginLogicalPlan,
       rewrittenPartitioning,
       rewrittenOrdering,
       isStreaming
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala
index 1c6bca241af4..395ed056be28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala
@@ -30,10 +30,13 @@ class ForeachBatchSink[T](batchWriter: (Dataset[T], Long) => Unit, encoder: Expr
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     val rdd = data.queryExecution.toRdd
     val executedPlan = data.queryExecution.executedPlan
+    val analyzedPlanWithoutMarkerNode = eliminateWriteMarkerNode(data.queryExecution.analyzed)
+    // Precondition: both plans must expose the same output attributes.
+    assert(data.logicalPlan.output == analyzedPlanWithoutMarkerNode.output)
     val node = LogicalRDD(
-      data.schema.toAttributes,
+      data.logicalPlan.output,
       rdd,
-      Some(eliminateWriteMarkerNode(data.queryExecution.analyzed)),
+      Some(analyzedPlanWithoutMarkerNode),
       executedPlan.outputPartitioning,
       executedPlan.outputOrdering)(data.sparkSession)
     implicit val enc = encoder
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 41593c701a7f..e6de2d59f1e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -32,13 +32,14 @@ import org.scalatest.matchers.should.Matchers._
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
-import org.apache.spark.sql.catalyst.expressions.Uuid
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Uuid}
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, OneRowRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LeafNode, LocalRelation, LogicalPlan, OneRowRelation, Statistics}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.FakeV2Provider
-import org.apache.spark.sql.execution.{FilterExec, QueryExecution, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{FilterExec, LogicalRDD, QueryExecution, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
@@ -2010,6 +2011,64 @@ class DataFrameSuite extends QueryTest
     }
   }
 
+  test("SPARK-39748: build the stats for LogicalRDD based on originLogicalPlan") {
+    def buildExpectedColumnStats(attrs: Seq[Attribute]): AttributeMap[ColumnStat] = {
+      AttributeMap(
+        attrs.map {
+          case attr if attr.dataType == BooleanType =>
+            attr -> ColumnStat(
+              distinctCount = Some(2),
+              min = Some(false),
+              max = Some(true),
+              nullCount = Some(0),
+              avgLen = Some(1),
+              maxLen = Some(1))
+
+          case attr if attr.dataType == ByteType =>
+            attr -> ColumnStat(
+              distinctCount = Some(2),
+              min = Some(1),
+              max = Some(2),
+              nullCount = Some(0),
+              avgLen = Some(1),
+              maxLen = Some(1))
+
+          case attr => attr -> ColumnStat()
+        }
+      )
+    }
+
+    val outputList = Seq(
+      AttributeReference("cbool", BooleanType)(),
+      AttributeReference("cbyte", ByteType)()
+    )
+
+    val expectedSize = 16
+    val statsPlan = OutputListAwareStatsTestPlan(
+      outputList = outputList,
+      rowCount = 2,
+      size = Some(expectedSize))
+
+    val df = Dataset.ofRows(spark, statsPlan)
+
+    val logicalRDD = LogicalRDD(
+      df.logicalPlan.output, spark.sparkContext.emptyRDD, Some(df.queryExecution.analyzed),
+      isStreaming = true)(spark)
+
+    val stats = logicalRDD.computeStats()
+    val expectedStats = Statistics(sizeInBytes = expectedSize, rowCount = Some(2),
+      attributeStats = buildExpectedColumnStats(logicalRDD.output))
+    assert(stats === expectedStats)
+
+    // This method re-issues expression IDs for all outputs. We expect column stats to be
+    // reflected as well.
+    val newLogicalRDD = logicalRDD.newInstance()
+    val newStats = newLogicalRDD.computeStats()
+    val newExpectedStats = Statistics(sizeInBytes = expectedSize, rowCount = Some(2),
+      attributeStats = buildExpectedColumnStats(newLogicalRDD.output))
+    assert(newStats === newExpectedStats)
+  }
+
   test("SPARK-10656: completely support special chars") {
     val df = Seq(1 -> "a").toDF("i_$.a", "d^'a.")
     checkAnswer(df.select(df("*")), Row(1, "a"))
@@ -3249,3 +3308,47 @@ class DataFrameSuite extends QueryTest
 case class GroupByKey(a: Int, b: Int)
 
 case class Bar2(s: String)
+
+/**
+ * This class is used for unit-testing. It's a logical plan whose output and stats are passed in.
+ */ +case class OutputListAwareStatsTestPlan( + outputList: Seq[Attribute], + rowCount: BigInt, + size: Option[BigInt] = None) extends LeafNode with MultiInstanceRelation { + override def output: Seq[Attribute] = outputList + override def computeStats(): Statistics = { + val columnInfo = outputList.map { attr => + attr.dataType match { + case BooleanType => + attr -> ColumnStat( + distinctCount = Some(2), + min = Some(false), + max = Some(true), + nullCount = Some(0), + avgLen = Some(1), + maxLen = Some(1)) + + case ByteType => + attr -> ColumnStat( + distinctCount = Some(2), + min = Some(1), + max = Some(2), + nullCount = Some(0), + avgLen = Some(1), + maxLen = Some(1)) + + case _ => + attr -> ColumnStat() + } + } + val attrStats = AttributeMap(columnInfo) + + Statistics( + // If sizeInBytes is useless in testing, we just use a fake value + sizeInBytes = size.getOrElse(Int.MaxValue), + rowCount = Some(rowCount), + attributeStats = attrStats) + } + override def newInstance(): LogicalPlan = copy(outputList = outputList.map(_.newInstance())) +} From 593181ed997496d6e55b6e99060dca37abf6c4c0 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 14 Jul 2022 08:30:26 +0900 Subject: [PATCH 2/5] reflect the suggestion from @cloud-fan --- .../spark/sql/execution/ExistingRDD.scala | 9 ++--- .../org/apache/spark/sql/DataFrameSuite.scala | 40 ++++++++++--------- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index f4c716e9d699..1236b6b6f6b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -117,13 +117,10 @@ case class LogicalRDD( }.asInstanceOf[SortOrder]) val rewrittenOriginLogicalPlan = originLogicalPlan.map { plan => - val newPlan = plan.transformAllExpressions { - case e: Attribute => rewrite.getOrElse(e, e) + val projectList = output.map { attr => + Alias(attr, attr.name)(exprId = rewrite.getOrElse(attr, attr).exprId) } - // It's not feasible to transform column stats directly for all logical nodes. Instead, we - // invalidate the cache so that the column stats are recalculated with new expression IDs. - newPlan.invalidateStatsCache() - newPlan + Project(projectList, plan) } LogicalRDD( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index e6de2d59f1e6..e802159f2634 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2049,24 +2049,28 @@ class DataFrameSuite extends QueryTest rowCount = 2, size = Some(expectedSize)) - val df = Dataset.ofRows(spark, statsPlan) - - val logicalRDD = LogicalRDD( - df.logicalPlan.output, spark.sparkContext.emptyRDD, Some(df.queryExecution.analyzed), - isStreaming = true)(spark) - - val stats = logicalRDD.computeStats() - val expectedStats = Statistics(sizeInBytes = expectedSize, rowCount = Some(2), - attributeStats = buildExpectedColumnStats(logicalRDD.output)) - assert(stats === expectedStats) - - // This method re-issues expression IDs for all outputs. We expect column stats to be - // reflected as well. 
-    val newLogicalRDD = logicalRDD.newInstance()
-    val newStats = newLogicalRDD.computeStats()
-    val newExpectedStats = Statistics(sizeInBytes = expectedSize, rowCount = Some(2),
-      attributeStats = buildExpectedColumnStats(newLogicalRDD.output))
-    assert(newStats === newExpectedStats)
+    withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
+      val df = Dataset.ofRows(spark, statsPlan)
+
+      val logicalRDD = LogicalRDD(
+        df.logicalPlan.output, spark.sparkContext.emptyRDD, Some(df.queryExecution.analyzed),
+        isStreaming = true)(spark)
+
+      val stats = logicalRDD.computeStats()
+      val expectedStats = Statistics(sizeInBytes = expectedSize, rowCount = Some(2),
+        attributeStats = buildExpectedColumnStats(logicalRDD.output))
+      assert(stats === expectedStats)
+
+      // This method re-issues expression IDs for all outputs. We expect column stats to be
+      // reflected as well.
+      val newLogicalRDD = logicalRDD.newInstance()
+      val newStats = newLogicalRDD.computeStats()
+      // LogicalRDD.newInstance adds a projection to originLogicalPlan, which triggers
+      // estimation of sizeInBytes. We don't intend to check the estimated value.
+      val newExpectedStats = Statistics(sizeInBytes = newStats.sizeInBytes, rowCount = Some(2),
+        attributeStats = buildExpectedColumnStats(newLogicalRDD.output))
+      assert(newStats === newExpectedStats)
+    }
   }
 
   test("SPARK-10656: completely support special chars") {

From dc7935d77d4e7dc94048108dac5b1413a9c9153c Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Thu, 14 Jul 2022 15:04:48 +0900
Subject: [PATCH 3/5] reflect feedback, assert precondition

---
 .../scala/org/apache/spark/sql/execution/ExistingRDD.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 1236b6b6f6b7..d87514c2e54d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -98,6 +98,11 @@ case class LogicalRDD(
     override val isStreaming: Boolean = false)(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
 
+  originLogicalPlan.foreach { originPlan =>
+    assert(output == originPlan.output, "The output columns are expected to be the same for " +
+      "output and originLogicalPlan")
+  }
+
   override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil
 
   override def newInstance(): LogicalRDD.this.type = {
@@ -118,7 +123,7 @@ case class LogicalRDD(
     val rewrittenOriginLogicalPlan = originLogicalPlan.map { plan =>
       val projectList = output.map { attr =>
-        Alias(attr, attr.name)(exprId = rewrite.getOrElse(attr, attr).exprId)
+        Alias(attr, attr.name)(exprId = rewrite(attr).exprId)
       }
       Project(projectList, plan)
     }

From 53ef82057422795018a986a207648773215a55c8 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Thu, 14 Jul 2022 17:14:45 +0900
Subject: [PATCH 4/5] fix - previous assertion does not work with canonicalization

---
 .../org/apache/spark/sql/execution/ExistingRDD.scala | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index d87514c2e54d..149e70e56d02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -98,11 +98,6 @@ case class LogicalRDD(
     override val
isStreaming: Boolean = false)(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
 
-  originLogicalPlan.foreach { originPlan =>
-    assert(output == originPlan.output, "The output columns are expected to be the same for " +
-      "output and originLogicalPlan")
-  }
-
   override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil
 
   override def newInstance(): LogicalRDD.this.type = {
@@ -122,6 +117,9 @@ case class LogicalRDD(
     }.asInstanceOf[SortOrder])
 
     val rewrittenOriginLogicalPlan = originLogicalPlan.map { plan =>
+      assert(output == plan.output, "The output columns are expected to be the same for output " +
+        s"and originLogicalPlan. output: $output / output in originLogicalPlan: ${plan.output}")
+
       val projectList = output.map { attr =>
         Alias(attr, attr.name)(exprId = rewrite(attr).exprId)
       }

From b736c12b4c57794e602713e54ce6b02ea398dd70 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Thu, 14 Jul 2022 21:41:51 +0900
Subject: [PATCH 5/5] retrigger
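
Note: the failure mode this series guards against comes from AttributeMap
keying column stats by exprId. The sketch below is illustrative only: the
object name and setup are not part of the patches, and it assumes only
spark-catalyst on the classpath. It shows how stats recorded against the
old attributes become invisible once expression IDs are re-issued:

    import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
    import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
    import org.apache.spark.sql.types.BooleanType

    object ExprIdMismatchDemo extends App {
      // Column stats are keyed by Attribute; AttributeMap resolves keys by exprId.
      val cbool = AttributeReference("cbool", BooleanType)()
      val stats = AttributeMap(Seq(cbool -> ColumnStat(distinctCount = Some(2))))

      // newInstance() re-issues the exprId: same name and type, new identity.
      val reissued = cbool.newInstance()

      assert(stats.get(cbool).isDefined)  // hit: exprIds match
      assert(stats.get(reissued).isEmpty) // miss: stats invisible to the new attribute
    }

This is why PATCH 2/5 replaces the stats-cache invalidation with a Project
whose Aliases carry the re-issued exprIds: stats propagation through the
Project re-keys the column stats to the new output attributes.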