
Conversation

@ulysses-you
Contributor

@ulysses-you ulysses-you commented Dec 13, 2019

What changes were proposed in this pull request?

Add cache for Like and RLike when pattern is not static

Why are the changes needed?

When the pattern is not static, we should avoid compiling it for every row when consecutive patterns are the same.
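The idea, roughly: keep the last pattern string and its compiled form, and recompile only when the incoming pattern differs. Here is a minimal sketch in plain Scala, assuming a single-threaded evaluator; the actual change lives in the generated Java for Like/RLike, and Like additionally escapes its pattern before compiling (see the $escapeFunc call in the hunks below).

import java.util.regex.Pattern

// Minimal sketch of the last-pattern cache (not the actual generated code).
var lastRegex: String = null
var lastPattern: Pattern = null

def likeMatches(value: String, regex: String): Boolean = {
  if (lastRegex == null || !lastRegex.equals(regex)) {
    lastRegex = regex
    lastPattern = Pattern.compile(regex) // recompiled only when the pattern changes
  }
  lastPattern.matcher(value).matches()
}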
Here are the perf numbers. There are 3 test groups, and range is used to keep the setup simple.

// ---------------------
// 10,000 rows and 10 partitions
val df1 = spark.range(0, 10000, 1, 10).withColumnRenamed("id", "id1")
val df2 = spark.range(0, 10000, 1, 10).withColumnRenamed("id", "id2")

val start = System.currentTimeMillis
df1.join(df2).where("id2 like id1").count()
// before  16939 
// after    6352
println(System.currentTimeMillis - start)

// ---------------------
// 10,000 rows and 100 partitions
val df1 = spark.range(0, 10000, 1, 100).withColumnRenamed("id", "id1")
val df2 = spark.range(0, 10000, 1, 100).withColumnRenamed("id", "id2")

val start = System.currentTimeMillis
df1.join(df2).where("id2 like id1").count()
// before  11070 
// after    4680
println(System.currentTimeMillis - start)

// ---------------------
// 20,000 rows and 10 partitions
val df1 = spark.range(0, 20000, 1, 10).withColumnRenamed("id", "id1")
val df2 = spark.range(0, 20000, 1, 10).withColumnRenamed("id", "id2")

val start = System.currentTimeMillis
df1.join(df2).where("id2 like id1").count()
// before 66962
// after  29934
println(System.currentTimeMillis - start)

Does this PR introduce any user-facing change?

No.

How was this patch tested?

s"""
String $rightStr = $eval2.toString();
$patternClass $pattern = $patternClass.compile($escapeFunc($rightStr, '$newEscapeChar'));
if ($lastRightStr != $rightStr) {
Member

I think this optimization is a corner case and highly depends on the input data... cc: @dongjoon-hyun

Contributor Author

Yes, and I have another idea here.
Currently BroadcastNestedLoopJoinExec always loops the streamed side first and then nested-loops over the broadcast side, so the pattern is always on the broadcast side.
For an inner like join, we could add another optimization that loops the broadcast side first and then nested-loops over the streamed side, so we could take advantage of this PR's cache.
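A sketch of that loop order in plain Scala (a hypothetical stand-in, not BroadcastNestedLoopJoinExec's actual code; rows are simplified to bare strings):

import java.util.regex.Pattern

// Iterate the broadcast side in the outer loop so each pattern is compiled
// once per broadcast row instead of once per row pair.
def innerLikeJoin(broadcastRows: Seq[String], streamedRows: Seq[String]): Seq[(String, String)] =
  for {
    pat <- broadcastRows
    compiled = Pattern.compile(pat) // one compile per broadcast row
    value <- streamedRows
    if compiled.matcher(value).matches()
  } yield (value, pat)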

@SparkQA

SparkQA commented Dec 13, 2019

Test build #115276 has finished for PR 26875 at commit 5250186.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 13, 2019

Test build #115285 has finished for PR 26875 at commit 424e0e3.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ulysses-you
Contributor Author

Do you have time to take a look? @cloud-fan @HyukjinKwon @viirya

@cloud-fan
Contributor

This doesn't seem very useful. We should recommend that users use a string literal as the pattern.

@ulysses-you
Contributor Author

Yes, it only applies to a few scenarios and such usage is discouraged. But when it happens, Spark runs very slowly.

I hit a scenario that checks two tables which store some file locations, using a join plus like to compare the locations. The only join Spark can choose there is BroadcastNestedLoopJoin.

E.g. tables t1 and t2 both have 1 million rows, t1 has 200 partitions, and t2 is broadcast; Spark will then compile the pattern 1 million * 1 million times. With the cache, it compiles only 1 million * 200 times.
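Spelling out that arithmetic (an illustrative sketch using the numbers above; the exact cache-hit count depends on which side carries the pattern):

// BroadcastNestedLoopJoin evaluates the predicate once per row pair, and
// without the cache each evaluation compiles the pattern.
val compilesWithoutCache = 1000000L * 1000000L // 10^12 compiles
// With the cache, recompilation happens only when the pattern string changes;
// the estimate above is 1 million * 200.
val compilesWithCache = 1000000L * 200L // 2 * 10^8 compiles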

s"""
String $rightStr = $eval2.toString();
$patternClass $pattern = $patternClass.compile($rightStr);
if ($rightStr != $lastRightStr) {
Member

Do you mean to use equals()?

Contributor Author

Yes, I forgot this is not Scala code.

Member

@viirya viirya left a comment

This seems to be an uncommon use case. Do you have benchmark numbers?

@HyukjinKwon HyukjinKwon reopened this Dec 24, 2019
@HyukjinKwon
Member

Sorry, I mistakenly closed this. Yeah, it doesn't look worthwhile enough. And yeah, do you have perf numbers?

@ulysses-you
Contributor Author

ulysses-you commented Dec 24, 2019

I tested this code in my scenario and saw about a 10x improvement. The test SQL looks like this:

select /*+ broadcast(t2) */ count(*) from t1 join (
select c1 from t1 limit 100
) t2 on t2.c1 like concat(t1.c1, '%')

t1 has 200,000 rows with no duplicates.

@ulysses-you
Contributor Author

It seems this kind of SQL is not in TPC-DS or TPC-H.

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115731 has finished for PR 26875 at commit d6650e1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ulysses-you
Contributor Author

The failed UT is not related to this PR.

@HyukjinKwon
Member

@ulysses-you can you show the exact steps and perf numbers in the PR description?

@ulysses-you
Contributor Author

@ulysses-you can you show the exact steps and perf numbers in the PR description?

Sure, I will add it later.

@ulysses-you
Contributor Author

cc @HyukjinKwon @viirya @cloud-fan

I added the perf numbers and updated the PR description.

@ulysses-you
Contributor Author

Do you have time to take a look? Thanks in advance.
@HyukjinKwon @viirya @maropu

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Jan 7, 2020

Test build #116209 has finished for PR 26875 at commit d6650e1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ulysses-you
Contributor Author

@cloud-fan Do you have time to take a look again? Thanks in advance.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Jan 13, 2020

Test build #116582 has finished for PR 26875 at commit d6650e1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master.

@HyukjinKwon
Member

Gentle ping @ulysses-you, would you mind doing a benchmark, for instance, with very long strings like "aaaaaaaaaaaab" so the strings have to be compared up to the end? I think we should check the worst case as well to see whether this PR is really worthy.

@HyukjinKwon
Member

@ulysses-you, we might have to revert this commit due to the lack of benchmark and performance investigation. Can we clarify this?

@beliefer
Contributor

beliefer commented Feb 8, 2020

Gentle ping @ulysses-you, would you mind doing a benchmark, for instance, with very long strings like "aaaaaaaaaaaab" so the strings have to be compared up to the end? I think we should check the worst case as well to see whether this PR is really worthy.

Yes, it seems there is more to consider.

@HyukjinKwon
Member

@beliefer, seems the author is inactive. Can you make some perf numbers? If that's going to take a while, I am going to revert this PR before Spark 3.0 RC.

@beliefer
Contributor

beliefer commented Feb 9, 2020

@HyukjinKwon You can revert this PR first. When I'm ready, I will pick this PR up again.

@HyukjinKwon
Member

I am going to revert this at #27514

@beliefer
Contributor

beliefer commented Mar 21, 2020

@HyukjinKwon After many tests, I found that this modification did not bring a clear performance improvement.

spark-sql> select count(*) from bigtable;
360014110

I tested the SQL shown below:
select count(*) from bigtable where string(ws_ship_customer_sk) like '/%SystemDrive/%//Users%';

Spark 3.1.0

Time    | Min | 25th percentile | Median | 75th percentile | Max
2.4 min | 4 s | 9 s             | 13 s   | 32 s            | 2.2 min
1.3 min | 6 s | 9 s             | 12 s   | 17 s            | 56 s
1.4 min | 6 s | 11 s            | 13 s   | 17 s            | 1.1 min
1.2 min | 6 s | 11 s            | 14 s   | 20 s            | 40 s
1.1 min | 4 s | 10 s            | 12 s   | 15 s            | 38 s
2.6 min | 5 s | 9 s             | 12 s   | 18 s            | 2.5 min

Spark 3.1.0 with this PR applied

Time    | Min | 25th percentile | Median | 75th percentile | Max
1.1 min | 5 s | 9 s             | 12 s   | 16 s            | 40 s
46 s    | 7 s | 9 s             | 11 s   | 14 s            | 37 s
1.4 min | 7 s | 11 s            | 14 s   | 18 s            | 1.0 min
2.2 min | 5 s | 11 s            | 13 s   | 19 s            | 1.9 min
1.0 min | 4 s | 10 s            | 12 s   | 14 s            | 50 s
2.0 min | 6 s | 9 s             | 12 s   | 15 s            | 1.9 min

So I think we should not make this change, and I will close #27497.

@maropu
Member

maropu commented Mar 21, 2020

Yea, closing #27497 looks fine to me.

@HyukjinKwon
Member

Thank you so much @beliefer.

@ulysses-you
Contributor Author

@HyukjinKwon sorry, I missed many things. And thanks @beliefer for doing the benchmark, but it seems something is wrong: the right side should not be a foldable value.

The PR aims to improve the perf of a dynamic like, e.g. select count(*) from t where c1 like c2. It does not affect a static like.

I will do a new benchmark with very long strings as @HyukjinKwon suggested.

@HyukjinKwon
Member

@ulysses-you, thank you so much. Feel free to reopen the PR.

@ulysses-you
Contributor Author

Env: CentOS 7, 40 cores, 4GB

---- test 1 ----

val df1 = spark.range(0, 20000, 1, 200).selectExpr("uuid() as c1")
val df2 = spark.range(0, 20000, 1, 200).selectExpr("uuid() as c2")
val start = System.currentTimeMillis
df1.join(df2).where("c2 like c1").count()
// 3 times test
// before  159228, 157541, 157721
// after   14378,  11545,  11498
println(System.currentTimeMillis - start)

---- test2 ----

// 17+1 length strings
val df1 = spark.range(0, 20000, 1, 200).selectExpr("concat('aaaaaaaaaaaaaaaaa', id%2) as c1")
val df2 = spark.range(0, 20000, 1, 200).selectExpr("concat('bbbbbbbbbbbbbbbbb', id%2) as c2")
val start = System.currentTimeMillis
df1.join(df2).where("c2 like c1").count()
// 3 times test
// before  90054, 90350, 90283
// after   13077, 10097, 9770
println(System.currentTimeMillis - start)

About a 10x performance improvement. It seems equals is faster than compiling the pattern, and longer strings would make the improvement even larger.
cc @HyukjinKwon
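
As a rough illustration of that claim, a naive micro-benchmark sketch (illustrative only, not from the PR) comparing String.equals against Pattern.compile on UUID-length strings:

import java.util.regex.Pattern

val regex = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
val other = new String(regex) // equal content, different reference, so equals scans the string

// Naive timing loop: run the given operation 1,000,000 times.
def time(label: String)(f: => Unit): Unit = {
  val start = System.nanoTime
  var i = 0
  while (i < 1000000) { f; i += 1 }
  println(s"$label: ${(System.nanoTime - start) / 1000000} ms")
}

time("equals")(regex.equals(other))
time("compile")(Pattern.compile(regex))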

s"""
String $rightStr = $eval2.toString();
$patternClass $pattern = $patternClass.compile($rightStr);
if (!$rightStr.equals($lastRightStr)) {
Member

@HyukjinKwon HyukjinKwon Jun 21, 2020

The positive cases are good enough. The concern I heard was that here we actually add some overhead for the string comparison, which could be worse when the strings are very long.

Can we identify the worst cases? It's okay to show the trade-off explicitly. I tend to agree that compiling the pattern once is better in general. Feel free to reopen the PR once we're clear on the trade-off.

cc @rednaxelafx as well FYI.

@ulysses-you
Contributor Author

---- test3 ----

val df1 = spark.range(0, 20000, 1, 200).selectExpr("uuid() as c1")
val df2 = spark.range(0, 20000, 1, 200).selectExpr("uuid() as c2")
val start = System.currentTimeMillis
df1.join(df2).where("c1 like c2").count()
// 3 times test
// before  159226, 159147, 159587
// after   159641, 160960, 160091
println(System.currentTimeMillis - start)

The worst case is doing both the comparison and the compilation for every row, and it shows only a little regression.

@ulysses-you
Contributor Author

It seems a merged PR can't be reopened. Is there any way? If not, I will send another PR for this.

@HyukjinKwon
Member

You can create a new one. Can I get some feedback from @rednaxelafx before we go ahead?

@rednaxelafx, does it look good enough to you, or do you have any guidance on the testing for him?

@HyukjinKwon
Member

@kiszk too FYI

@beliefer
Contributor

@ulysses-you I'm sorry! I missed something.
---- test 1 ----

val df1 = spark.range(0, 20000, 1, 200).selectExpr("uuid() as c1")
val df2 = spark.range(0, 20000, 1, 200).selectExpr("uuid() as c2")
val start = System.currentTimeMillis
df1.join(df2).where("c2 like c1").count()
// 3 times test
// before  159228, 157541, 157721
// after   14378,  11545,  11498
println(System.currentTimeMillis - start)

---- test3 ----

val df1 = spark.range(0, 20000, 1, 200).selectExpr("uuid() as c1")
val df2 = spark.range(0, 20000, 1, 200).selectExpr("uuid() as c2")
val start = System.currentTimeMillis
df1.join(df2).where("c1 like c2").count()
// 3 times test
// before  159226, 159147, 159587
// after   159641, 160960, 160091
println(System.currentTimeMillis - start)

Why are test1 and test3 so different in time?

@beliefer
Contributor

---- test2 ----

// 17+1 length strings
val df1 = spark.range(0, 20000, 1, 200).selectExpr("concat('aaaaaaaaaaaaaaaaa', id%2) as c1")
val df2 = spark.range(0, 20000, 1, 200).selectExpr("concat('bbbbbbbbbbbbbbbbb', id%2) as c2")
val start = System.currentTimeMillis
df1.join(df2).where("c2 like c1").count()
// 3 times test
// before  90054, 90350, 90283
// after   13077, 10097, 9770
println(System.currentTimeMillis - start)

Your use of %2 makes this the extreme best-case scenario, so the 10x performance improvement can't be demonstrated in general.

@ulysses-you
Contributor Author

The positive test cases (test1, test2) confirm that this PR performs much better in the dynamic like scenario. test1 and test2 check different string lengths; the %2 has no special meaning.

The negative test case (test3) confirms that this PR has only a little performance regression.

Why are test1 and test3 so different in time?

Since it's a join optimization, the best setup uses the column on the right of like as the BufferedSide; then, while looping the StreamSide, the BufferedSide column is always the same as the last one, so the per-row pattern compile can be avoided.
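
To illustrate when the cache hits (a hypothetical model, not Spark's actual codegen; each side is reduced to a sequence of strings):

// Evaluation order in a nested-loop join: every pair (outer, inner). The
// last-pattern cache only hits when consecutive pairs carry the same pattern.
val outerRows = Seq("a%", "a%", "b%") // pattern strings taken from the outer side
val innerRows = Seq("abc", "bcd")

var lastRegex: String = null
var compileCount = 0
for (outerPattern <- outerRows; inner <- innerRows) {
  if (!outerPattern.equals(lastRegex)) { // same cache check as the generated code above
    lastRegex = outerPattern
    compileCount += 1 // compile happens only when the pattern changes
  }
}
// The pattern from the outer row is constant across the whole inner loop, so
// compileCount stays at most outerRows.size (the fast test1 case). A pattern
// taken from the inner row would change every iteration instead (test3).
println(compileCount) // prints 2: "a%" compiled once, "b%" once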

@beliefer
Contributor

test1 looks the same as test3.

@ulysses-you
Contributor Author

ulysses-you commented Jun 23, 2020

They're different: test1's condition uses c2 like c1, while test3 uses c1 like c2. As a result, test1 can always reuse the pattern, while test3 always needs to re-compile.

As I said above, putting the column that needs to be compiled on the join's BufferedSide avoids the compilation.

@beliefer
Contributor

@ulysses-you OK, I understand it now. But it seems the performance is only good in scenarios where the join sides line up this way.

@ulysses-you
Contributor Author

Yeah, but it also shows only a little performance regression in the normal case, as seen in test3. I think that's a reason to do this.

@beliefer
Contributor

I think you should supplement this with more benchmarks and performance investigation, covering a variety of scenarios.
Then it will be easier for us to make a judgment or decision.

@ulysses-you
Contributor Author

Maybe I'm missing some point, but test3 is the worst case, so other scenarios are always better than it, and test1 and test2 are the best positive cases.

The result is that adding the cache can improve performance by anywhere up to 10x. But if you have any special scenarios, I'm ready to do the benchmark.
