-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-24020][SQL] Sort-merge join inner range optimization #21109
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
ok to test |
|
Test build #89592 has finished for PR 21109 at commit
|
|
Can you please restart the build? It took 5 hours for some reason, but the "inner range join" tests completed successfully. |
|
retest this please |
|
Test build #89641 has finished for PR 21109 at commit
|
|
Can you put performance numbers w/ this pr? Also, you'd be better to add benchmark code in |
|
I added benchmark code in |
|
Test build #89934 has finished for PR 21109 at commit
|
|
Thanks for working on this. Based on the description on JIRA, I think the main cause of the bad performance is re-calculation an expensive function on matches rows. With the added benchmark, I adjust the order of conditions so the expensive UDF is put at the end of predicate. Below is the results. The first one is original benchmark. The second is the one with UDF at the end of predicate. It can be easily improved because short-circuit evaluation of predicate. This can be applied to also other conditions other than just range comparison. So I'm thinking if we just need a way to give a hint to Spark to adjust the order of expression for an expensive one like UDF. |
| *AMD EPYC 7401 24-Core Processor | ||
| *sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| *--------------------------------------------------------------------------------------------- | ||
| *sort merge join wholestage off 12723 / 12800 0.0 1988008.4 1.0X |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why wholestage-off case doesn't get much improvement as wholestage-on case?
|
Hey Liang-Chi, thanks for looking into this. With the optimization: This shows a 10x improvement over the non-optimized case (as I already said, this depends on the range condition, number of matched rows, the calculated function, etc.). Regarding your second question as to why is the "wholestage off" case in the optimized version so slow, that is because the optimization is turned off when the wholestage code generation is turned off. |
|
@zecevicp wholestage codegen now is turned on by default only if we have few columns (less than 100). This can be false in many real use-cases. Is there any specific reason why this optimization cannot be applied to the non-wholestage codegen case? If not, I think it is worth to consider also this case. |
|
Hi, Gaido, thanks for the comment. As I said, it was difficult to debug it and I didn't have time. We might open a different ticket for the non-wholestage codegen case, once this is merged? |
|
Test build #89961 has finished for PR 21109 at commit
|
|
Why do you say that it is difficult to debug? What was difficult? |
|
The code path with the optimization but without wholegen code generation gives wrong results. And I haven't been able to figure out where is the bug. I spent several hours at this again today. I've been using this code as an example: With wholeStage off this gives 65 as a result and 88 if wholeStage is on (with the optimization turned on). Without the optimization it is always 88, which is correct. |
|
Since I feel this is a limited case, I'm not certainly sure this optimization needs to be handled in smj. For spatial or temporal use cases, is it not enough to add dummy join keys to split tasks into pieces for workaround? Btw, can you fix this issue by more simpler code change? (I'm not sure this big change pays the performance gain...) |
|
@viirya I think predicate reordering based on cost estimation and others is an interesting topic in optimizer (other databases, e.g., postgresql, apply the optimization). But, IMO the topic is not directly related to this pr, it'd be better to file another jira to keep the discussion (and the benchmark result you reported)? I couldn't find a jira entry for the topic. |
|
@maropu Regarding the first point, whether this belongs to SMJ or not, take a look at this paper, page 11. They describe several special cases of SMJ, one of them being "epsilon-join", which is exactly what is implemented here. Regarding adding dummy join keys (some kind of binning) to do spatial or temporal joining, that wouldn't help in our case and I believe in many other cases because you would need to bin the data by the second column in a fixed manner. And there you would have the problems of how to join data beyond the borders of those bins (which would probably require additional data duplication). These bins would probably be calculated on the fly, so additional computing is required, and most probably an additional sorting step would be needed (additional pass through the data). Regarding making it a simpler change, although the number of changed lines is somewhat large, the change is well-confined to a specific code path and the minimum of existing code is changed. Most of the changes are new additions (new code for code generation; the The only thing that could be removed currently is the |
|
Test build #90475 has finished for PR 21109 at commit
|
|
I managed to fix the code path that is executed when the wholestage codegen is turned off. Now both code paths give the same results and have the optimization implemented. I also changed the tests in the With inner range optimization: So, there is 10x improvement for wholestage ON case and 21x improvement for wholestage OFF case. I believe this is now ready to be merged, which would greatly help us in our projects. |
|
Test build #90477 has finished for PR 21109 at commit
|
mgaido91
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just left some comments after a quick pass, but I am a bit concerned about the amount of changes we need for this patch. May you please check if you can reduce the amount of code changes?
|
|
||
| def unapply(plan: LogicalPlan): Option[ReturnType] = plan match { | ||
| case join @ Join(left, right, joinType, condition) => | ||
| logDebug(s"Considering join on: $condition") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why removing this debug?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added it in the first place. Tried to remove unnecessary code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so, this is the diff with the master, it was added in 2014....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, you could be right. I'll put it back.
| val (leftKeys, rightKeys) = joinKeys.unzip | ||
| logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys") | ||
| Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right)) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unnecessary empty line
| // and which can be used for secondary sort optimizations. | ||
| val rangePreds: mutable.Set[Expression] = mutable.Set.empty | ||
| var rangeConditions: Seq[BinaryComparison] = | ||
| if (SQLConf.get.useSmjInnerRangeOptimization) { // && SQLConf.get.wholeStageEnabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the comment please
| // of the two tables being joined, | ||
| // which are not used in the equijoin expressions, | ||
| // and which can be used for secondary sort optimizations. | ||
| val rangePreds: mutable.Set[Expression] = mutable.Set.empty |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this? Can't we just use rangeConditions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to separate range conditions that are relevant for the optimizations from other join conditions. rangePreds is used later to remove range predicates from "other predicates" (if any).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that you are using it later but can't you use rangeConditions instead? they seem duplicates...they contain the same data IIUC
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rangeConditions contain "vice-versa" conditions in case left and right plans need to be switched.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see... maybe we can make this more clear in the comments then, what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, will do
| } | ||
|
|
||
| private def isValidRangeCondition(l : Expression, r : Expression, | ||
| left : LogicalPlan, right : LogicalPlan, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: bad indent
| val equiset = joinKeys.filter{ case (ljk : Expression, rjk : Expression) => | ||
| ljk.references.toSeq.contains(lattrs(0)) && rjk.references.toSeq.contains(rattrs(0)) } | ||
| if (equiset.isEmpty) { | ||
| "asis" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we return something more meaningful than this string? Maybe a Option(Bool) in enough which tells whether to reverse or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, makes sense.
| "none" | ||
| } | ||
| else if (canEvaluate(l, left) && canEvaluate(r, right)) { | ||
| val equiset = joinKeys.filter{ case (ljk : Expression, rjk : Expression) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about using exists?
| val rightRngTmpKeyVars = createJoinKey(ctx, rightTmpRow, | ||
| rightUpperKeys.slice(0, 1), right.output) | ||
| val rightRngTmpKeyVarsDecl = rightRngTmpKeyVars.map(_.code).mkString("\n") | ||
| rightRngTmpKeyVars.foreach(_.code = "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we doing this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you mean why are we clearing the code variable, I found the same thing in WholestageCodegenExec:263 where it's claimed that that prevents the variable to be evaluated twice. The code works, so I didn't investigate further.
| rightLowerKeys.size == 0 || rightUpperKeys.size == 0) { | ||
| ctx.addNewFunction("dequeueUntilUpperConditionHolds", | ||
| "private void dequeueUntilUpperConditionHolds() { }", | ||
| inlineToOuterClass = true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really need this? can't we just use the returned name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate please? I'm not sure what you mean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we avoid to use inlineToOuterClass = true? I think we can avoid doing that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, OK. I'll investigate.
| * @param spillThreshold Threshold for number of rows to be spilled by internal buffer | ||
| */ | ||
| private[joins] class SortMergeJoinInnerRangeScanner( | ||
| streamedKeyGenerator: Projection, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indent
| } | ||
| case _ => None | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: } else {?
|
@mgaido91 Regarding the amount of code, maybe you can suggest how to reduce it? Because I don't see a way... |
|
@zecevicp for instance do we really need |
|
Well, that is the essence of the contribution: to have a moving window over the data, instead of a fixed block (per equi-join match). To implement a moving window you need something like a queue. |
|
Test build #90642 has finished for PR 21109 at commit
|
srowen
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just some tiny code comments. I don't feel qualified to merge this, but would instead get a nod from at least one of people like @marmbrus @cloud-fan @gatorsmile or others who know the SQL planning code.
|
|
||
| // Only using secondary join optimization when both lower and upper conditions | ||
| // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x) | ||
| if(rangeConditions.size != 2 || |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: space after "if" here and elsewhere
| // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x) | ||
| if(rangeConditions.size != 2 || | ||
| // Looking for one < and one > comparison: | ||
| rangeConditions.filter(x => x.isInstanceOf[LessThan] || |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of checking .size == 0, something like rangeConditions.forall(... not instance of either ...)?
| // which are not used in the equijoin expressions, | ||
| // and which can be used for secondary sort optimizations. | ||
| // rangePreds will contain the original expressions to be filtered out later. | ||
| val rangePreds: mutable.Set[Expression] = mutable.Set.empty |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to prefer val rangePreds = mutable.Set.empty[Expression] as it's shorter, but that's just taste
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'll adopt your version
| if(lattrs.size != 1 || rattrs.size != 1) { | ||
| None | ||
| } | ||
| else if (canEvaluate(l, left) && canEvaluate(r, right)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: pull else onto previous line
| */ | ||
| private def checkRangeConditions(l : Expression, r : Expression, | ||
| left : LogicalPlan, right : LogicalPlan, | ||
| joinKeys : Seq[(Expression, Expression)]) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For clarity add a return type to this method. Option[Boolean]?
| leftPlan: SparkPlan, | ||
| rightPlan: SparkPlan, | ||
| side: BuildSide) = { | ||
| leftKeys: Seq[Expression], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Undo this whitespace change and the next one)
|
|
||
| // Disabling these because the code would never follow this path in case of a inner range join | ||
| if (!expectRangeJoin) { | ||
| var counter = 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want to avoid a var, just configOptions.zipWithIndex.foreach { case ((config, confValue), counter) =>. Just a tiny bit more idiomatic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, will do that.
| * excessive disk writes. This may lead to a performance regression compared to the normal case | ||
| * of using an [[ArrayBuffer]] or [[Array]]. | ||
| */ | ||
| private[sql] class InMemoryUnsafeRowQueue( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No way to avoid making a custom queue implementation here? is it messier without such a structure?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A queue is needed here because it's a moving window instead of a fixed block of rows. Maybe I missed an existing class that could do this easily so I'll take another look. But, I believe any alternative would indeed be messier.
| .booleanConf | ||
| .createWithDefault(true) | ||
|
|
||
| val USE_SMJ_INNER_RANGE_OPTIMIZATION = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, at best make this internal. Are there conditions where you would not want to apply this? is it just a safety valve?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just a safety valve. In case there are some queries that I don't foresee now where this could get in the way.
…InMemoryUnsafeRowQueue class.
af59b8a to
0a5c8de
Compare
|
Test build #95807 has finished for PR 21109 at commit
|
|
Test build #99907 has finished for PR 21109 at commit
|
|
Can one of the admins verify this patch? |
|
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
JIRA description
The problem we are solving is the case where you have two big tables partitioned by X column, but also sorted within partitions by Y column and you need to calculate an expensive function on the joined rows, which reduces the number of output rows (e.g. condition based on a spatial distance calculation). But you could theoretically reduce the number of joined rows for which the calculation itself is performed by using a range condition on the Y column. Something like this:
... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d AND <function calculation...>However, during a sort-merge join with this range condition specified, Spark will first cross-join all the rows with the same X value and only then will it try to apply the range condition and any function calculations. This happens because, inside the generated sort-merge join (SMJ) code, these extra conditions are put in the same block with the function being calculated and there is no way to evaluate these conditions before reading all the rows to be checked into memory (into an
ExternalAppendOnlyUnsafeRowArray). If the two tables have a large number of rows per X, this can result in a huge number of calculations and a huge number of rows in executor memory, which can be unfeasible.We therefore propose a change to the sort-merge join so that, when these extra conditions are specified, a queue is used instead of the
ExternalAppendOnlyUnsafeRowArrayclass. This queue is then used as a moving window through the values from the right relation as the left row changes. You could call this a combination of an equi-join and a theta join; in literature it is sometimes called an “epsilon join”. We call it a "sort-merge inner range join".This design uses much less memory (not all rows with the same values of X need to be loaded into memory at once) and requires a much lower number of comparisons (the validity of this statement depends on the actual data and conditions used).
The optimization should be triggered automatically when an equi-join expression is present AND lower and upper range conditions on a secondary column are specified. If the tables aren't sorted by both columns, appropriate sorts should be added.
To limit the impact of this change we also propose adding a new parameter (tentatively named
spark.sql.join.smj.useInnerRangeOptimization) which could be used to switch off the optimization entirely.What changes were proposed in this pull request?
The main changes are made to these classes:
ExtractEquiJoinKeys– a pattern that needs to be extended to be able to recognize the case where a simple range condition with lower and upper limits is used on a secondary column (a column not included in the equi-join condition). The pattern also needs to extract the information later required for code generation etc.ExternalAppendOnlyUnsafeRowArray– changed so that it can function as a queue too. The rows need to be removed and added to/from the structure as the left key (X) changes, or the left secondary value (Y) changes, so the structure needs to be a queue. The class is no longer "append only", but I didn't want to change too many things.JoinSelection– a strategy that usesExtractEquiJoinKeysand needs to be aware of the extracted range conditionsSortMergeJoinExec– the main implementation of the optimization. Needs to support two code paths:• when whole-stage code generation is turned off (method
doExecute, which usessortMergeJoinInnerRangeScanner)• when whole-stage code generation is turned on (methods
doProduceandgenScanner)SortMergeJoinInnerRangeScanner– implements the SMJ with inner-range optimization in the case when whole-stage codegen is turned offHow was this patch tested?
Unit tests (
InnerRangeSuite.scala,JoinSuite.scala,ExternalAppendOnlyUnsafeRowArraySuite)Performance tests in
JoinBenchmark. The tests show 8x improvement over non-optimized code. Although, it should be noted that the results depend on the exact range conditions and the calculations performed on each matched row.In our case, we were not able to cross-match two rather large datasets (1.2 billion rows x 800 million rows) without this optimization. With the optimization, the cross-match finishes in less than 2 minutes.