Skip to content

Commit 63b7f12

Browse files
dongjoon-hyun authored and
cloud-fan committed
[SPARK-15076][SQL] Add ReorderAssociativeOperator optimizer
## What changes were proposed in this pull request? This PR adds a new optimizer rule, `ReorderAssociativeOperator`, that takes advantage of the associative property of integral arithmetic. Currently, Spark works like the following. 1) Can optimize `1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + a` into `45 + a`. 2) Cannot optimize `a + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9`. This PR handles Case 2 for **Add/Multiply** expressions whose data types are `ByteType`, `ShortType`, `IntegerType`, or `LongType`. The following plans compare the behavior `before` and `after` this change. **Before** ```scala scala> sql("select a+1+2+3+4+5+6+7+8+9 from (select explode(array(1)) a)").explain == Physical Plan == WholeStageCodegen : +- Project [(((((((((a#7 + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9) AS (((((((((a + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9)#8] : +- INPUT +- Generate explode([1]), false, false, [a#7] +- Scan OneRowRelation[] scala> sql("select a*1*2*3*4*5*6*7*8*9 from (select explode(array(1)) a)").explain == Physical Plan == *Project [(((((((((a#18 * 1) * 2) * 3) * 4) * 5) * 6) * 7) * 8) * 9) AS (((((((((a * 1) * 2) * 3) * 4) * 5) * 6) * 7) * 8) * 9)#19] +- Generate explode([1]), false, false, [a#18] +- Scan OneRowRelation[] ``` **After** ```scala scala> sql("select a+1+2+3+4+5+6+7+8+9 from (select explode(array(1)) a)").explain == Physical Plan == WholeStageCodegen : +- Project [(a#7 + 45) AS (((((((((a + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9)#8] : +- INPUT +- Generate explode([1]), false, false, [a#7] +- Scan OneRowRelation[] scala> sql("select a*1*2*3*4*5*6*7*8*9 from (select explode(array(1)) a)").explain == Physical Plan == *Project [(a#18 * 362880) AS (((((((((a * 1) * 2) * 3) * 4) * 5) * 6) * 7) * 8) * 9)#19] +- Generate explode([1]), false, false, [a#18] +- Scan OneRowRelation[] ``` This PR was greatly generalized thanks to cloud-fan's key ideas; he should be credited for the work he did. ## How was this patch tested? 
Pass the Jenkins tests including new testsuite. Author: Dongjoon Hyun <[email protected]> Closes apache#12850 from dongjoon-hyun/SPARK-15076.
1 parent 252417f commit 63b7f12

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
9494
FoldablePropagation,
9595
OptimizeIn(conf),
9696
ConstantFolding,
97+
ReorderAssociativeOperator,
9798
LikeSimplification,
9899
BooleanSimplification,
99100
SimplifyConditionals,
@@ -737,6 +738,44 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
737738
}
738739
}
739740

741+
/**
 * Reorder associative integral-type operators and fold all constants into one.
 *
 * For every deterministic `Add` or `Multiply` tree whose data type is an `IntegralType`, the
 * operands are flattened, the foldable operands are evaluated into a single literal, and the
 * tree is rebuilt as `(non-foldable operands) op literal`. For example `a + 1 + 2` becomes
 * `a + 3`. A tree with fewer than two foldable operands is left untouched.
 *
 * Sub-expressions that are grouping expressions of an `Aggregate` are kept intact and treated
 * as opaque operands. Rewriting `(a + 1) + 2` into `a + 3` under `GROUP BY a + 1` would produce
 * an aggregate expression that can no longer be derived from the grouping expressions.
 */
object ReorderAssociativeOperator extends Rule[LogicalPlan] {
  // Imported locally so that this rule does not depend on the file-level import list.
  import org.apache.spark.sql.catalyst.expressions.ExpressionSet
  import org.apache.spark.sql.catalyst.plans.logical.Aggregate

  /** Grouping expressions of `plan` when it is an `Aggregate`; an empty set otherwise. */
  private def groupingExpressionsOf(plan: LogicalPlan): ExpressionSet = plan match {
    case agg: Aggregate => ExpressionSet(agg.groupingExpressions)
    case _ => ExpressionSet(Seq.empty[Expression])
  }

  /**
   * Flattens nested `Add` nodes into their operands, left to right. An `Add` contained in
   * `groupSet` is returned as a single operand instead of being split.
   */
  private def flattenAdd(e: Expression, groupSet: ExpressionSet): Seq[Expression] = e match {
    case add @ Add(l, r) if !groupSet.contains(add) =>
      flattenAdd(l, groupSet) ++ flattenAdd(r, groupSet)
    case other => other :: Nil
  }

  /**
   * Flattens nested `Multiply` nodes into their operands, left to right. A `Multiply` contained
   * in `groupSet` is returned as a single operand instead of being split.
   */
  private def flattenMultiply(e: Expression, groupSet: ExpressionSet): Seq[Expression] = e match {
    case mul @ Multiply(l, r) if !groupSet.contains(mul) =>
      flattenMultiply(l, groupSet) ++ flattenMultiply(r, groupSet)
    case other => other :: Nil
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan =>
      // Computed per operator: only the grouping expressions of the operator that owns the
      // expressions being rewritten have to be preserved.
      val groupSet = groupingExpressionsOf(q)
      q transformExpressionsDown {
        case a: Add if a.deterministic && a.dataType.isInstanceOf[IntegralType] =>
          val (foldables, others) = flattenAdd(a, groupSet).partition(_.foldable)
          if (foldables.size > 1) {
            // Evaluate all constant operands once and keep the result as a single literal.
            val foldableExpr = foldables.reduce((x, y) => Add(x, y))
            val c = Literal.create(foldableExpr.eval(EmptyRow), a.dataType)
            if (others.isEmpty) c else Add(others.reduce((x, y) => Add(x, y)), c)
          } else {
            a
          }
        case m: Multiply if m.deterministic && m.dataType.isInstanceOf[IntegralType] =>
          val (foldables, others) = flattenMultiply(m, groupSet).partition(_.foldable)
          if (foldables.size > 1) {
            // Evaluate all constant operands once and keep the result as a single literal.
            val foldableExpr = foldables.reduce((x, y) => Multiply(x, y))
            val c = Literal.create(foldableExpr.eval(EmptyRow), m.dataType)
            if (others.isEmpty) c else Multiply(others.reduce((x, y) => Multiply(x, y)), c)
          } else {
            m
          }
      }
  }
}
778+
740779
/**
741780
* Replaces [[Expression Expressions]] that can be statically evaluated with
742781
* equivalent [[Literal]] values.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import org.apache.spark.sql.catalyst.dsl.expressions._
21+
import org.apache.spark.sql.catalyst.dsl.plans._
22+
import org.apache.spark.sql.catalyst.expressions._
23+
import org.apache.spark.sql.catalyst.plans.PlanTest
24+
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
25+
import org.apache.spark.sql.catalyst.rules.RuleExecutor
26+
27+
/**
 * Plan-level test for [[ReorderAssociativeOperator]]: runs the rule on its own and compares the
 * resulting plan with a hand-written expected plan.
 */
class ReorderAssociativeOperatorSuite extends PlanTest {

  /** Executor that applies only the rule under test, in a single `Once` batch. */
  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches =
      List(Batch("ReorderAssociativeOperator", Once, ReorderAssociativeOperator))
  }

  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

  test("Reorder associative operators") {
    // Expected plan. The aliases are spelled out because the select list below is auto-aliased
    // with the text of the unoptimized expressions. The Rand(0) product is expected unchanged.
    val expectedPlan =
      testRelation
        .select(
          ('a + 10).as("((3 + ((1 + a) + 2)) + 4)"),
          ('b * 24).as("((((b * 1) * 2) * 3) * 4)"),
          (('b + 1) * 24).as("((((b + 1) * 2) * 3) * 4)"),
          ('a + 'b + 'c + 6).as("(((((a + 1) + b) + 2) + c) + 3)"),
          ('a + 'b * 2 + 'c + 4).as("((((a + 1) + (b * 2)) + c) + 3)"),
          Rand(0) * 1 * 2 * 3 * 4)
        .analyze

    // Input plan: constants scattered through Add and Multiply chains.
    val inputPlan =
      testRelation
        .select(
          (Literal(3) + ((Literal(1) + 'a) + 2)) + 4,
          'b * 1 * 2 * 3 * 4,
          ('b + 1) * 2 * 3 * 4,
          'a + 1 + 'b + 2 + 'c + 3,
          'a + 1 + 'b * 2 + 'c + 3,
          Rand(0) * 1 * 2 * 3 * 4)

    val actualPlan = Optimize.execute(inputPlan.analyze)

    comparePlans(actualPlan, expectedPlan)
  }
}

0 commit comments

Comments
 (0)