From 307e115499bcdcf6aa386d0fb0d490a05c0a79bf Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 10 Jan 2022 10:20:45 +0800 Subject: [PATCH 1/4] Support propagate empty relation through aggregate/union --- .../optimizer/PropagateEmptyRelation.scala | 80 ++++++++----------- .../adaptive/AQEPropagateEmptyRelation.scala | 11 +++ .../adaptive/AdaptiveQueryExecSuite.scala | 63 ++++++++++++++- 3 files changed, 106 insertions(+), 48 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 6ad0793fb642..7e13e099dff3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -28,14 +28,17 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_ /** * The base class of two rules in the normal and AQE Optimizer. It simplifies query plans with * empty or non-empty relations: - * 1. Binary-node Logical Plans + * 1. Higher-node Logical Plans + * - Union with all empty children. + * 2. Binary-node Logical Plans * - Join with one or two empty children (including Intersect/Except). * - Left semi Join * Right side is non-empty and condition is empty. Eliminate join to its left side. * - Left anti join * Right side is non-empty and condition is empty. Eliminate join to an empty * [[LocalRelation]]. - * 2. Unary-node Logical Plans + * 3. Unary-node Logical Plans + * - Project/Filter/Sample with all empty children. * - Limit/Repartition with all empty children. * - Aggregate with all empty children and at least one grouping expression. * - Generate(Explode) with all empty children. Others like Hive UDTF may return results. 
@@ -59,6 +62,27 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) } protected def commonApplyFunc: PartialFunction[LogicalPlan, LogicalPlan] = { + case p: Union if p.children.exists(isEmpty) => + val newChildren = p.children.filterNot(isEmpty) + if (newChildren.isEmpty) { + empty(p) + } else { + val newPlan = if (newChildren.size > 1) Union(newChildren) else newChildren.head + val outputs = newPlan.output.zip(p.output) + // the original Union may produce different output attributes than the new one so we alias + // them if needed + if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId == oldAttr.exprId }) { + newPlan + } else { + val outputAliases = outputs.map { case (newAttr, oldAttr) => + val newExplicitMetadata = + if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None + Alias(newAttr, oldAttr.name)(explicitMetadata = newExplicitMetadata) + } + Project(outputAliases, newPlan) + } + } + // Joins on empty LocalRelations generated from streaming sources are not eliminated // as stateful streaming joins need to perform other state management operations other than // just processing the input data. 
@@ -98,7 +122,13 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup p } + // the only case that can match here is an empty LogicalQueryStage + case p: LeafNode if !p.isInstanceOf[LocalRelation] && isEmpty(p) => empty(p) + case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmpty) => p match { + case _: Project => empty(p) + case _: Filter => empty(p) + case _: Sample => empty(p) case _: Sort => empty(p) case _: GlobalLimit if !p.isStreaming => empty(p) case _: LocalLimit if !p.isStreaming => empty(p) @@ -128,53 +158,11 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup } /** - * This rule runs in the normal optimizer and optimizes more cases - * compared to [[PropagateEmptyRelationBase]]: - * 1. Higher-node Logical Plans - * - Union with all empty children. - * 2. Unary-node Logical Plans - * - Project/Filter/Sample with all empty children. - * - * The reason why we don't apply this rule at AQE optimizer side is: the benefit is not big enough - * and it may introduce extra exchanges. 
+ * This rule runs in the normal optimizer */ object PropagateEmptyRelation extends PropagateEmptyRelationBase { - private def applyFunc: PartialFunction[LogicalPlan, LogicalPlan] = { - case p: Union if p.children.exists(isEmpty) => - val newChildren = p.children.filterNot(isEmpty) - if (newChildren.isEmpty) { - empty(p) - } else { - val newPlan = if (newChildren.size > 1) Union(newChildren) else newChildren.head - val outputs = newPlan.output.zip(p.output) - // the original Union may produce different output attributes than the new one so we alias - // them if needed - if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId == oldAttr.exprId }) { - newPlan - } else { - val outputAliases = outputs.map { case (newAttr, oldAttr) => - val newExplicitMetadata = - if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None - Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata) - } - Project(outputAliases, newPlan) - } - } - - case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmpty) && canPropagate(p) => - empty(p) - } - - // extract the pattern avoid conflict with commonApplyFunc - private def canPropagate(plan: LogicalPlan): Boolean = plan match { - case _: Project => true - case _: Filter => true - case _: Sample => true - case _ => false - } - override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) { - applyFunc.orElse(commonApplyFunc) + commonApplyFunc } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index ea2fb1c3130a..6233fd489f35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ 
-21,6 +21,7 @@ import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBase import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, LOGICAL_QUERY_STAGE, TRUE_OR_FALSE_LITERAL} +import org.apache.spark.sql.execution.aggregate.BaseAggregateExec import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys /** @@ -40,6 +41,16 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { private def getRowCount(plan: LogicalPlan): Option[BigInt] = plan match { case LogicalQueryStage(_, stage: QueryStageExec) if stage.isMaterialized => stage.getRuntimeStatistics.rowCount + + case LogicalQueryStage(_, agg: BaseAggregateExec) if agg.groupingExpressions.nonEmpty && + agg.child.isInstanceOf[QueryStageExec] => + val stage = agg.child.asInstanceOf[QueryStageExec] + if (stage.isMaterialized) { + stage.getRuntimeStatistics.rowCount + } else { + None + } + case _ => None } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 51d476703a76..92b9a8a9eee8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, UnaryExecNode, UnionExec} +import 
org.apache.spark.sql.execution.aggregate.BaseAggregateExec import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.datasources.noop.NoopDataSource import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec @@ -120,6 +121,12 @@ class AdaptiveQueryExecSuite } } + private def findTopLevelBaeAggregate(plan: SparkPlan): Seq[BaseAggregateExec] = { + collect(plan) { + case agg: BaseAggregateExec => agg + } + } + private def findTopLevelSort(plan: SparkPlan): Seq[SortExec] = { collect(plan) { case s: SortExec => s @@ -1406,6 +1413,56 @@ class AdaptiveQueryExecSuite } } + test("SPARK-35442: Support propagate empty relation through aggregate") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult( + "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key") + assert(findTopLevelBaeAggregate(plan1).size == 2) + assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec]) + + val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult( + "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key limit 1") + assert(findTopLevelBaeAggregate(plan2).size == 2) + assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec]) + + val (plan3, adaptivePlan3) = runAdaptiveAndVerifyResult( + "SELECT count(*) FROM testData WHERE value = 'no_match'") + assert(findTopLevelBaeAggregate(plan3).size == 2) + assert(findTopLevelBaeAggregate(adaptivePlan3).size == 2) + } + } + + test("SPARK-35442: Support propagate empty relation through union") { + def checkNumUnion(plan: SparkPlan, numUnion: Int): Unit = { + assert( + collect(plan) { + case u: UnionExec => u + }.size == numUnion) + } + + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult( + """ + |SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key + |UNION ALL + |SELECT key, 1 
FROM testData + |""".stripMargin) + checkNumUnion(plan1, 1) + checkNumUnion(adaptivePlan1, 0) + assert(!stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec]) + + val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult( + """ + |SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key + |UNION ALL + |SELECT /*+ REPARTITION */ key, 1 FROM testData WHERE value = 'no_match' + |""".stripMargin) + checkNumUnion(plan2, 1) + checkNumUnion(adaptivePlan2, 0) + assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec]) + } + } + test("SPARK-32753: Only copy tags to node with no tags") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { withTempView("v1") { @@ -1794,7 +1851,8 @@ class AdaptiveQueryExecSuite test("SPARK-35239: Coalesce shuffle partition should handle empty input RDD") { withTable("t") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1", - SQLConf.SHUFFLE_PARTITIONS.key -> "2") { + SQLConf.SHUFFLE_PARTITIONS.key -> "2", + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) { spark.sql("CREATE TABLE t (c1 int) USING PARQUET") val (_, adaptive) = runAdaptiveAndVerifyResult("SELECT c1, count(*) FROM t GROUP BY c1") assert( @@ -2261,7 +2319,8 @@ class AdaptiveQueryExecSuite test("SPARK-37742: AQE reads invalid InMemoryRelation stats and mistakenly plans BHJ") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1048584") { + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1048584", + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) { // Spark estimates a string column as 20 bytes so with 60k rows, these relations should be // estimated at ~120m bytes which is greater than the broadcast join threshold. 
val joinKeyOne = "00112233445566778899" From ee1bc1afab1f777d67c356f7e85d518f5f2052ba Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 10 Jan 2022 11:08:17 +0800 Subject: [PATCH 2/4] address comment --- .../sql/execution/adaptive/AQEPropagateEmptyRelation.scala | 6 +++--- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index 6233fd489f35..6763fecdc644 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -33,12 +33,12 @@ import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys */ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { override protected def isEmpty(plan: LogicalPlan): Boolean = - super.isEmpty(plan) || getRowCount(plan).contains(0) + super.isEmpty(plan) || getEstimatedRowCount(plan).contains(0) override protected def nonEmpty(plan: LogicalPlan): Boolean = - super.nonEmpty(plan) || getRowCount(plan).exists(_ > 0) + super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0) - private def getRowCount(plan: LogicalPlan): Option[BigInt] = plan match { + private def getEstimatedRowCount(plan: LogicalPlan): Option[BigInt] = plan match { case LogicalQueryStage(_, stage: QueryStageExec) if stage.isMaterialized => stage.getRuntimeStatistics.rowCount diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 92b9a8a9eee8..9ae2e90f80ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1418,17 +1418,21 @@ class AdaptiveQueryExecSuite val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult( "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key") assert(findTopLevelBaeAggregate(plan1).size == 2) + assert(!plan1.isInstanceOf[LocalTableScanExec]) assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec]) val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult( "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key limit 1") assert(findTopLevelBaeAggregate(plan2).size == 2) + assert(!plan2.isInstanceOf[LocalTableScanExec]) assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec]) val (plan3, adaptivePlan3) = runAdaptiveAndVerifyResult( "SELECT count(*) FROM testData WHERE value = 'no_match'") assert(findTopLevelBaeAggregate(plan3).size == 2) + assert(!plan3.isInstanceOf[LocalTableScanExec]) assert(findTopLevelBaeAggregate(adaptivePlan3).size == 2) + assert(!stripAQEPlan(adaptivePlan3).isInstanceOf[LocalTableScanExec]) } } From 86329f01fcd3cf27fa2eb3907febdd263d79513d Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 10 Jan 2022 11:18:42 +0800 Subject: [PATCH 3/4] fix --- .../optimizer/PropagateEmptyRelation.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 7e13e099dff3..d02f12d67e19 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -74,12 +74,16 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId == 
oldAttr.exprId }) { newPlan } else { - val outputAliases = outputs.map { case (newAttr, oldAttr) => - val newExplicitMetadata = - if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None - Alias(newAttr, oldAttr.name)(explicitMetadata = newExplicitMetadata) + val newOutput = outputs.map { case (newAttr, oldAttr) => + if (newAttr.exprId == oldAttr.exprId) { + newAttr + } else { + val newExplicitMetadata = + if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None + Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata) + } } - Project(outputAliases, newPlan) + Project(newOutput, newPlan) } } From a2fa0c7c196cc9d1bfafd7c384ad9b0ed9a06c55 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 10 Jan 2022 17:50:17 +0800 Subject: [PATCH 4/4] address comment --- .../adaptive/AQEPropagateEmptyRelation.scala | 4 ++++ .../execution/adaptive/AdaptiveQueryExecSuite.scala | 11 ----------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index 6763fecdc644..bab77515f79a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -38,6 +38,10 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { override protected def nonEmpty(plan: LogicalPlan): Boolean = super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0) + // The returned value is as follows: + // - 0 means the plan must produce 0 rows + // - a positive value means an estimated row count which can be over-estimated + // - None means the plan has not been materialized or the plan cannot be estimated private def getEstimatedRowCount(plan: LogicalPlan): Option[BigInt] = plan match { case 
LogicalQueryStage(_, stage: QueryStageExec) if stage.isMaterialized => stage.getRuntimeStatistics.rowCount diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 9ae2e90f80ae..a29989cc06c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -28,7 +28,6 @@ import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, UnaryExecNode, UnionExec} -import org.apache.spark.sql.execution.aggregate.BaseAggregateExec import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.datasources.noop.NoopDataSource import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec @@ -121,12 +120,6 @@ class AdaptiveQueryExecSuite } } - private def findTopLevelBaeAggregate(plan: SparkPlan): Seq[BaseAggregateExec] = { - collect(plan) { - case agg: BaseAggregateExec => agg - } - } - private def findTopLevelSort(plan: SparkPlan): Seq[SortExec] = { collect(plan) { case s: SortExec => s @@ -1417,21 +1410,17 @@ class AdaptiveQueryExecSuite withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult( "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key") - assert(findTopLevelBaeAggregate(plan1).size == 2) assert(!plan1.isInstanceOf[LocalTableScanExec]) 
assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec]) val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult( "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key limit 1") - assert(findTopLevelBaeAggregate(plan2).size == 2) assert(!plan2.isInstanceOf[LocalTableScanExec]) assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec]) val (plan3, adaptivePlan3) = runAdaptiveAndVerifyResult( "SELECT count(*) FROM testData WHERE value = 'no_match'") - assert(findTopLevelBaeAggregate(plan3).size == 2) assert(!plan3.isInstanceOf[LocalTableScanExec]) - assert(findTopLevelBaeAggregate(adaptivePlan3).size == 2) assert(!stripAQEPlan(adaptivePlan3).isInstanceOf[LocalTableScanExec]) } }