-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-10735] [SQL] Generate aggregation w/o grouping keys [WIP] #10786
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Conflicts: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
Conflicts: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
Conflicts: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
|
Test build #49526 has finished for PR 10786 at commit
|
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
|
Test build #49537 has finished for PR 10786 at commit
|
|
Test build #49540 has finished for PR 10786 at commit
|
|
Test build #49546 has finished for PR 10786 at commit
|
|
Test build #49548 has finished for PR 10786 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Glad to see 5x speed-up! +1 on switching to declarative on stddev (and other 2nd-order statistics). But for skewness and kurtosis, we still need to benchmark the performance to decide because their expressions are more complex.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've always preferred declarative aggregate because its much easier to optimize (very cool the kind of speed ups you are getting!). As such, I'd support having all of our built in functions done this way.
@mengxr argues that its too confusing for users and that we should also support the imperative one. How high cost is this for us?
As discussed in #10786, the generated TungstenAggregate does not support imperative functions. For a query ``` sqlContext.range(10).filter("id > 1").groupBy().count() ``` The generated code will looks like: ``` /* 032 */ if (!initAgg0) { /* 033 */ initAgg0 = true; /* 034 */ /* 035 */ // initialize aggregation buffer /* 037 */ long bufValue2 = 0L; /* 038 */ /* 039 */ /* 040 */ // initialize Range /* 041 */ if (!range_initRange5) { /* 042 */ range_initRange5 = true; ... /* 071 */ } /* 072 */ /* 073 */ while (!range_overflow8 && range_number7 < range_partitionEnd6) { /* 074 */ long range_value9 = range_number7; /* 075 */ range_number7 += 1L; /* 076 */ if (range_number7 < range_value9 ^ 1L < 0) { /* 077 */ range_overflow8 = true; /* 078 */ } /* 079 */ /* 085 */ boolean primitive11 = false; /* 086 */ primitive11 = range_value9 > 1L; /* 087 */ if (!false && primitive11) { /* 092 */ // do aggregate and update aggregation buffer /* 099 */ long primitive17 = -1L; /* 100 */ primitive17 = bufValue2 + 1L; /* 101 */ bufValue2 = primitive17; /* 105 */ } /* 107 */ } /* 109 */ /* 110 */ // output the result /* 112 */ bufferHolder25.reset(); /* 114 */ rowWriter26.initialize(bufferHolder25, 1); /* 118 */ rowWriter26.write(0, bufValue2); /* 120 */ result24.pointTo(bufferHolder25.buffer, bufferHolder25.totalSize()); /* 121 */ currentRow = result24; /* 122 */ return; /* 124 */ } /* 125 */ ``` cc nongli Author: Davies Liu <[email protected]> Closes #10840 from davies/gen_agg.
|
This PR will be replaced by multiple small PRs. |
The benchmark show that generated aggregation could be 6X time faster than non-generated (with generated filter/range).
It also showed that declarative function is much faster than imperative one (5X faster for stddev), we may need to switch to a declarative version. (the difference need to be measured when there are grouping keys).