
Spark SystemFunctions are not pushed down during JOIN#9233

Closed
tmnd1991 wants to merge 1 commit into apache:main from tmnd1991:feature/9232

Conversation

@tmnd1991

@tmnd1991 tmnd1991 commented Dec 6, 2023

PR to verify bug reported in issue #9232

With some guidance I'm open to work on the fix too.

@github-actions github-actions bot added the spark label Dec 6, 2023
@tmnd1991
Author

tmnd1991 commented Dec 7, 2023

Hi @ConeyLiu, this still needs some refinement (mostly w.r.t. testing), but do you think the change makes sense? I'd rather not put more work into it if I'm way off ;)
Thanks 🙏

@tmnd1991 tmnd1991 changed the title Spark SystemFunctions are not pushed down during MERGE Spark SystemFunctions are not pushed down during JOIN Dec 7, 2023
} else {
  filter.copy(condition = newCondition)
}
case j @ Join(_, _, _, Some(condition), _) =>
Contributor

Here the join condition can be pushed to the leaf node by the Spark optimizer, right? I don't think this can cover the COW/MOR cases; COW/MOR needs some special handling here. I plan to do it, but I've been quite busy lately.

Author

I discovered the bug while working with a MERGE statement, and this actually works with both CoW and MoR. I have it running on my cluster like that, and it is correctly pruning all the partitions.

@tmnd1991
Author

tmnd1991 commented Dec 7, 2023

cc @nastra @dramaticlly @advancedxy for review
thanks

Contributor

@advancedxy advancedxy left a comment

How do you determine that the SystemFunctions are not pushed down?

Spark will push down predicates (which includes predicates containing system functions) through joins (except for full outer joins), see: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1912 . So I don't think you need to handle joins specifically in ReplaceStaticInvoke.

@tmnd1991
Author

tmnd1991 commented Dec 8, 2023

How do you determine that the SystemFunctions are not pushed down?

Spark will push down predicates (which includes predicates containing system functions) through joins (except for full outer joins), see: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1912 . So I don't think you need to handle joins specifically in ReplaceStaticInvoke.

Thanks @advancedxy, that explains a lot of what I was observing in my project.
During a MERGE (which is 2 joins, one LeftSemi + one FullOuter) I observed that the first join was correctly pruning the partitions, while the second one was not. Adding this patch still helps prune more partitions: the batch scan on the target table cannot prune partitions because the file names (collected as a result of the first join) are not known when performing physical planning. I think we should limit the replacement to the "full outer" case, what do you think?

@advancedxy
Contributor

Adding this patch still helps prune more partitions: the batch scan on the target table cannot prune partitions because the file names (collected as a result of the first join) are not known when performing physical planning. I think we should limit the replacement to the "full outer" case, what do you think?

Could you elaborate a bit more? The plan tree string/DAG of the Spark SQL would be helpful.
If the join type is full outer, the predicate cannot be pushed down, so partition pruning is unlikely to be performed.
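A toy example (not Spark code; the tables and values are invented for illustration) shows why an ON-clause predicate on the left table cannot simply be pushed below a full outer join: left rows that fail the predicate must still appear null-extended in the output, so filtering them at scan time changes the result.

```java
import java.util.*;
import java.util.function.Predicate;

public class FullOuterPushdown {
    record Row(int k, int p) {} // k = join key, p = a "partition" column

    // Full outer join of t (rows) and s (keys) ON t.k = s.k AND cond(t),
    // null-extending unmatched rows on both sides.
    static List<String> fullOuterJoin(List<Row> t, List<Integer> s, Predicate<Row> cond) {
        List<String> out = new ArrayList<>();
        Set<Integer> matchedS = new HashSet<>();
        for (Row tr : t) {
            boolean matched = false;
            for (int sk : s) {
                if (tr.k() == sk && cond.test(tr)) {
                    out.add(tr.k() + "|" + sk);
                    matched = true;
                    matchedS.add(sk);
                }
            }
            if (!matched) out.add(tr.k() + "|null"); // left row survives, null-extended
        }
        for (int sk : s) {
            if (!matchedS.contains(sk)) out.add("null|" + sk);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> t = List.of(new Row(1, 1), new Row(2, 2));
        List<Integer> s = List.of(1, 2);
        // Correct plan: evaluate the ON predicate inside the join.
        System.out.println(fullOuterJoin(t, s, r -> r.p() == 1));  // [1|1, 2|null, null|2]
        // "Pushed-down" plan: filter t before the join. Row (2,2) is dropped
        // entirely instead of being null-extended, so the result differs.
        List<Row> pushed = t.stream().filter(r -> r.p() == 1).toList();
        System.out.println(fullOuterJoin(pushed, s, r -> true));   // [1|1, null|2]
    }
}
```

This is the behavior the optimizer must preserve, which is why ordinary predicate pushdown skips full outer joins.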

@tmnd1991
Author

tmnd1991 commented Dec 8, 2023

Sure, let me add a bit of context:
I have two tables with the exact same schema/layout, partitioned on 3 columns:

  • identity(MEAS_YM)
  • identity(MEAS_DD)
  • bucket(POD, 4)

The source table (small one) has strictly a subset of the partitions of the target table (big one).
In this example I will talk about a local reproducer, but keep in mind we are talking about a 65TB table with 400k partitions, so every 1% improvement actually means a lot.

I started running a merge statement as follows, taking advantage of SPJ:

MERGE INTO target USING (SELECT * FROM source)
ON target.MEAS_YM = source.MEAS_YM AND target.MEAS_DD = source.MEAS_DD AND target.POD = source.POD
WHEN MATCHED THEN UPDATE SET ...

This results in the following physical plan:

== Physical Plan ==
ReplaceData (13)
+- * Sort (12)
   +- * Project (11)
      +- MergeRows (10)
         +- SortMergeJoin FullOuter (9)
            :- * Sort (4)
            :  +- * Project (3)
            :     +- * ColumnarToRow (2)
            :        +- BatchScan target (1)
            +- * Sort (8)
               +- * Project (7)
                  +- * ColumnarToRow (6)
                     +- BatchScan source (5)
===== Subqueries =====

Subquery:1 Hosting operator id = 1 Hosting Expression = _file#2274 IN subquery#2672
* HashAggregate (26)
+- Exchange (25)
   +- * HashAggregate (24)
      +- * Project (23)
         +- * SortMergeJoin LeftSemi (22)
            :- * Sort (17)
            :  +- * Filter (16)
            :     +- * ColumnarToRow (15)
            :        +- BatchScan target (14)
            +- * Sort (21)
               +- * Filter (20)
                  +- * ColumnarToRow (19)
                     +- BatchScan source (18)

with

(1) BatchScan target
Output [60]: [..., _file#2274]
target (branch=null) [filters=, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(5) BatchScan source
Output [60]: [...]
source (branch=null) [filters=, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(14) BatchScan target
Output [8]: [..., _file#2590]
target (branch=null) [filters=POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(18) BatchScan source
Output [7]: [...]
source (branch=null) [filters=POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

This was creating 33 tasks (+10 to exchange the file names) for the subquery and 33 tasks for the second join.
In practice I know for sure that I hit only 25 partitions, not 33 (i.e. some files were still read even though we know upfront that they are not needed). Also, the _file IN (subquery) predicate can't prune any file because it's dynamic. On top of that, I observed that even when files should have been excluded by Spark in a post-scan filter, the execution of the task was still not as fast as I expected (i.e. close to 0ms).

Therefore, knowing exactly the partitions that I hit beforehand, I tried to help Iceberg/Spark a little by enumerating the partition values that are actually hit:

MERGE INTO target USING (SELECT * FROM source)
ON target.`POD` = source.`POD` AND target.`MEAS_YM` = source.`MEAS_YM` AND target.`MEAS_DD` = source.`MEAS_DD` AND (
  (target.`meas_ym` = '202306' AND target.`meas_dd` = '02' AND system.bucket(4, target.`pod`) IN (0,2,3)) OR
  (target.`meas_ym` = '202306' AND target.`meas_dd` = '01') OR 
  (target.`meas_ym` = '202307' AND target.`meas_dd` = '02' AND system.bucket(4, target.`pod`) IN (1,3)) OR 
  (target.`meas_ym` = '202306' AND target.`meas_dd` = '03') OR 
  (target.`meas_ym` = '202308' AND target.`meas_dd` = '01' AND system.bucket(4, target.`pod`) IN (0,1,2)) OR 
  (target.`meas_ym` = '202307' AND target.`meas_dd` = '03' AND system.bucket(4, target.`pod`) IN (0,1,2)) OR 
  (target.`meas_ym` = '202308' AND target.`meas_dd` = '03' AND system.bucket(4, target.`pod`) IN (0,3)) OR 
  (target.`meas_ym` = '202307' AND target.`meas_dd` = '01' AND system.bucket(4, target.`pod`) IN (0,1,2)) OR 
  (target.`meas_ym` = '202308' AND target.`meas_dd` = '02' AND system.bucket(4, target.`pod`) IN (3)))
WHEN MATCHED THEN UPDATE SET ...

To my surprise the plan was exactly the same...

Then I fixed this issue and also #9191 locally (adding an optimizer rule to my Spark session), and the scans actually changed:

(1) BatchScan target
Output [60]: [..., _file#2279]
target (branch=null) [filters=((((MEAS_YM = '202306' AND ((MEAS_DD = '02' AND bucket[4](POD) IN (0, 2, 3)) OR MEAS_DD = '01')) OR ((MEAS_YM = '202307' AND MEAS_DD = '02') AND bucket[4](POD) IN (1, 3))) OR ((MEAS_YM = '202306' AND MEAS_DD = '03') OR ((MEAS_YM = '202308' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)))) OR ((MEAS_DD = '03' AND ((MEAS_YM = '202307' AND bucket[4](POD) IN (0, 1, 2)) OR (MEAS_YM = '202308' AND bucket[4](POD) IN (0, 3)))) OR (((MEAS_YM = '202307' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)) OR ((MEAS_YM = '202308' AND MEAS_DD = '02') AND bucket[4](POD) = 3)))), groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(5) BatchScan source
Output [60]: [...]
source (branch=null) [filters=, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(14) BatchScan target
Output [8]: [..., _file#2590]
target (branch=null) [filters=((((MEAS_YM = '202306' AND ((MEAS_DD = '02' AND bucket[4](POD) IN (0, 2, 3)) OR MEAS_DD = '01')) OR ((MEAS_YM = '202307' AND MEAS_DD = '02') AND bucket[4](POD) IN (1, 3))) OR ((MEAS_YM = '202306' AND MEAS_DD = '03') OR ((MEAS_YM = '202308' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)))) OR ((MEAS_DD = '03' AND ((MEAS_YM = '202307' AND bucket[4](POD) IN (0, 1, 2)) OR (MEAS_YM = '202308' AND bucket[4](POD) IN (0, 3)))) OR (((MEAS_YM = '202307' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)) OR ((MEAS_YM = '202308' AND MEAS_DD = '02') AND bucket[4](POD) = 3)))), POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, MAGNITUDE IS NOT NULL, METER_KEY IS NOT NULL, REC_ID IS NOT NULL, COLLECT_ID IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(18) BatchScan source
Output [7]: [...]
source (branch=null) [filters=POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, MAGNITUDE IS NOT NULL, METER_KEY IS NOT NULL, REC_ID IS NOT NULL, COLLECT_ID IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

With this plan I obtain 25 (+10 for the shuffle) + 25 tasks, actually hitting only the minimum number of partitions.


Given the context, I think I have probably highlighted 2 "bugs":

  1. the full-outer join condition can also be used to prune partitions (fixed in this PR)
  2. for some reason Spark is not able to correctly detect the minimum subset of hit partitions (maybe I can work on another PR for this, but I guess it's much harder and may belong in the Spark codebase)

@advancedxy
Contributor

== Physical Plan ==
ReplaceData (13)
+- * Sort (12)
   +- * Project (11)
      +- MergeRows (10)
         +- SortMergeJoin FullOuter (9)  <---- Full Outer here

If the join type is full outer, it means that there are not-matched actions. So your MERGE INTO command has a WHEN NOT MATCHED clause, is that correct?

Output [60]: [..., _file#2279]
target (branch=null) [filters=((((MEAS_YM = '202306' AND ((MEAS_DD = '02' AND bucket[4](POD) IN (0, 2, 3)) OR MEAS_DD = '01')) OR ((MEAS_YM = '202307' AND MEAS_DD = '02') AND bucket[4](POD) IN (1, 3))) OR ((MEAS_YM = '202306' AND MEAS_DD = '03') OR ((MEAS_YM = '202308' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)))) OR ((MEAS_DD = '03' AND ((MEAS_YM = '202307' AND bucket[4](POD) IN (0, 1, 2)) OR (MEAS_YM = '202308' AND bucket[4](POD) IN (0, 3)))) OR (((MEAS_YM = '202307' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)) OR ((MEAS_YM = '202308' AND MEAS_DD = '02') AND bucket[4](POD) = 3)))), groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(5) BatchScan source
Output [60]: [...]
source (branch=null) [filters=, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(14) BatchScan target
Output [8]: [..., _file#2590]
target (branch=null) [filters=((((MEAS_YM = '202306' AND ((MEAS_DD = '02' AND bucket[4](POD) IN (0, 2, 3)) OR MEAS_DD = '01')) OR ((MEAS_YM = '202307' AND MEAS_DD = '02') AND bucket[4](POD) IN (1, 3))) OR ((MEAS_YM = '202306' AND MEAS_DD = '03') OR ((MEAS_YM = '202308' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)))) OR ((MEAS_DD = '03' AND ((MEAS_YM = '202307' AND bucket[4](POD) IN (0, 1, 2)) OR (MEAS_YM = '202308' AND bucket[4](POD) IN (0, 3)))) OR (((MEAS_YM = '202307' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)) OR ((MEAS_YM = '202308' AND MEAS_DD = '02') AND bucket[4](POD) = 3)))), POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, MAGNITUDE IS NOT NULL, METER_KEY IS NOT NULL, REC_ID IS NOT NULL, COLLECT_ID IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(18) BatchScan source
Output [7]: [...]
source (branch=null) [filters=POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, MAGNITUDE IS NOT NULL, METER_KEY IS NOT NULL, REC_ID IS NOT NULL, COLLECT_ID IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

Could you give the full plan tree or DAG for this changed plan? Is the join type still full outer? This is quite strange; I'm not sure why the filter would be pushed down to the data source for a full outer join. You may set spark.sql.planChangeLog.level to INFO to find which rule changes the plan, and post the related plan changes in a gist; that would help clarify the problem.

@tmnd1991
Author

tmnd1991 commented Dec 8, 2023

Yes, sorry, there's also a WHEN NOT MATCHED clause. I can't attach the plan, but I'll push a reproducer soon.

@tmnd1991
Author

tmnd1991 commented Dec 8, 2023

Finally I got a reproducer inside the codebase; you can find it at TestSPJWithBucketing.
On Spark 3.4 (same as my app), the condition on the partitions will actually prune the unaffected partitions, while 3.5 will not.

Anyway, the more I work on this, the more I think the issue should be solved directly in the Scan, not by adding conditions manually. All the info should be available to Spark beforehand, am I right?

@advancedxy
Contributor

Spark 3.4 (same as my app) with the condition on the partitions will actually prune the unaffected partitions, while 3.5 will not.

I did some quick debugging. The reason why Spark 3.4 succeeds is that org.apache.spark.sql.execution.datasources.v2.RowLevelCommandScanRelationPushDown in Iceberg pushes join conditions down into the target source. In Spark 3.5, this rule was removed in favor of upstream Spark's GroupBasedRowLevelOperationScanPlanning, which pushes down the command's condition instead of the rewrite plan's filter (apache/spark@5a92ecc#diff-635af3d82f2675b4bb3fd07673916477844a2a7b76d65b23b9cda9a63228ec6dR40).
So to make system function pushdown work for the MERGE statement, you may have to pattern match on ReplaceData and MergeRows, etc. Also cc @aokolnychyi.

I'm not sure why Filter would be pushed down to the data source for a full outer join

This question is answered: it's covered by RowLevelCommandScanRelationPushDown or GroupBasedRowLevelOperationScanPlanning.

@wypoon
Contributor

wypoon commented Dec 20, 2023

The main branch in my local clone is at d6eba2a. I applied the diff from this PR to my local main.
The added TestSPJWithBucketing fails for me:

> Task :iceberg-spark:iceberg-spark-extensions-3.5_2.12:test

org.apache.iceberg.spark.extensions.TestSPJWithBucketing > testMergeSPJwithoutCondition[catalogName = testhive, implementation = org.apache.iceberg.spark.SparkCatalog, config = {type=hive, default-namespace=default}] FAILED
    org.opentest4j.AssertionFailedError: 
    expected: 12
     but was: 18
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at org.apache.iceberg.spark.extensions.TestSPJWithBucketing.testWithCondition(TestSPJWithBucketing.java:85)
        at org.apache.iceberg.spark.extensions.TestSPJWithBucketing.testMergeSPJwithoutCondition(TestSPJWithBucketing.java:55)

org.apache.iceberg.spark.extensions.TestSPJWithBucketing > testMergeSPJwithCondition[catalogName = testhive, implementation = org.apache.iceberg.spark.SparkCatalog, config = {type=hive, default-namespace=default}] FAILED
    org.opentest4j.AssertionFailedError: 
    expected: 12
     but was: 15
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at org.apache.iceberg.spark.extensions.TestSPJWithBucketing.testWithCondition(TestSPJWithBucketing.java:85)
        at org.apache.iceberg.spark.extensions.TestSPJWithBucketing.testMergeSPJwithCondition(TestSPJWithBucketing.java:44)

2 tests completed, 2 failed

> Task :iceberg-spark:iceberg-spark-extensions-3.5_2.12:test FAILED

Same with 3.4.

I assume the test is expected to pass with the changes in ReplaceStaticInvoke in this PR. In other words, the test should fail without the changes in ReplaceStaticInvoke in this PR and should pass with them.

Comment on lines 82 to 124
long affectedPartitions =
    sql(spark, "SELECT DISTINCT(partition) FROM %s.files", sourceTableName).count();
int shufflePartitions = Integer.parseInt(spark.conf().get("spark.sql.shuffle.partitions"));
Assertions.assertThat(tasks).isEqualTo(affectedPartitions * 2 + shufflePartitions);
Contributor

Can you please explain the reasoning behind this assertion?

Author

@tmnd1991 tmnd1991 Dec 21, 2023

Sure.
The target table is created with the following partitions (year_month, day, bucket(4, id)):

  • 202306/01/0
  • 202306/01/1
  • 202306/01/2
  • 202306/01/3
  • 202306/02/0
  • 202306/02/1
  • 202307/01/3

the source table is created with the following partitions:

  • 202306/01/0
  • 202306/01/1
  • 202306/02/0
  • 202307/01/3

so the source table partitions are a subset of the target table partitions.

Spark statically knows that info, because it's part of the metadata that iceberg keeps.

So a copy-on-write "merge" consists of 2 jobs:

  1. a left-semi join to understand which files are affected by the merge
  2. a full-outer join whose left side discards all the files not found while executing job 1

In our particular case (where we know that the source table partitions are a subset of the target table partitions), if we do that with a Storage Partitioned Join, the most efficient way to do it is to:

  1. create 1 task for each partition that will change; read all the files from both tables, join locally, and collect the file names
  2. create 1 task for each partition that will change; in each task, read all the files from the target table partition except the ones that will not change (that's the effect of the IN), read all the files from the source table, join and apply the merge logic locally, write out new files, add these files to the snapshot, and remove the original files from the snapshot

Disclaimer: I know very little about the internals and I can only imagine how hard this would be to actually implement, but I'm quite sure it is "logically" doable 😄

so the reasoning behind the number of tasks is:

  • 1 task per partition that is going to change to collect the affected files
  • spark.sql.shuffle.partitions to shuffle the file list (which I thought could be broadcasted, but I think it's not important right now)
  • 1 task per partition that is going to change to actually rewrite it

Let me know if there is any fallacy in my reasoning
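For concreteness, the task-count arithmetic behind the assertion (one discovery task per affected partition, plus the shuffle tasks, plus one rewrite task per affected partition) can be sketched as a small helper; the 25/10 figures below are the ones from the cluster run earlier in this thread:

```java
public class MergeTaskCount {
    // Expected task count for a copy-on-write MERGE under SPJ, per the reasoning above:
    // one task per affected partition in the file-discovery (left-semi) job,
    // spark.sql.shuffle.partitions tasks to exchange the collected file names,
    // one task per affected partition in the rewrite (full-outer) job.
    static long expectedTasks(long affectedPartitions, int shufflePartitions) {
        return affectedPartitions * 2L + shufflePartitions;
    }

    public static void main(String[] args) {
        System.out.println(expectedTasks(25, 10)); // 25 + 10 + 25 = 60
    }
}
```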

Author

Forgot to add: if I add the condition (after the patch to ReplaceStaticInvoke), it actually prunes the partitions (and tasks) in 3.4 (but not in 3.5).

Another thing: I know that the tasks that get created are actually very fast (I would say almost skipped), but if the target table has 400,000 partitions, even the scheduling of those no-op tasks kills the performance of my job.

@tmnd1991
Author

The main in my local clone is at d6eba2a. I applied the diff from this PR to my local main. The TestSPJWithBucketing added fails for me:

> Task :iceberg-spark:iceberg-spark-extensions-3.5_2.12:test

org.apache.iceberg.spark.extensions.TestSPJWithBucketing > testMergeSPJwithoutCondition[catalogName = testhive, implementation = org.apache.iceberg.spark.SparkCatalog, config = {type=hive, default-namespace=default}] FAILED
    org.opentest4j.AssertionFailedError: 
    expected: 12
     but was: 18
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at org.apache.iceberg.spark.extensions.TestSPJWithBucketing.testWithCondition(TestSPJWithBucketing.java:85)
        at org.apache.iceberg.spark.extensions.TestSPJWithBucketing.testMergeSPJwithoutCondition(TestSPJWithBucketing.java:55)

org.apache.iceberg.spark.extensions.TestSPJWithBucketing > testMergeSPJwithCondition[catalogName = testhive, implementation = org.apache.iceberg.spark.SparkCatalog, config = {type=hive, default-namespace=default}] FAILED
    org.opentest4j.AssertionFailedError: 
    expected: 12
     but was: 15
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at org.apache.iceberg.spark.extensions.TestSPJWithBucketing.testWithCondition(TestSPJWithBucketing.java:85)
        at org.apache.iceberg.spark.extensions.TestSPJWithBucketing.testMergeSPJwithCondition(TestSPJWithBucketing.java:44)

2 tests completed, 2 failed

> Task :iceberg-spark:iceberg-spark-extensions-3.5_2.12:test FAILED

Same with 3.4.

I assume the test is expected to pass with the changes in ReplaceStaticInvoke in this PR. In other words, the test should fail without the changes in ReplaceStaticInvoke in this PR and should pass with them.

I just rebased and tested on top of the latest main (2eea697): all tests (TestSPJWithBucketing) fail on 3.5, while on 3.4 testMergeSPJwithCondition passes and testMergeSPJwithoutCondition does not (which is expected to me, because merge/SPJ is not smart enough IMHO; with the guidance of the condition it works, otherwise it does not).

@wypoon
Contributor

wypoon commented Dec 21, 2023

@tmnd1991 are you saying that TestSPJWithBucketing is supposed to fail? I thought that the idea is to write a test that fails without the change in this PR but passes with it.

@tmnd1991
Author

@tmnd1991 are you saying that TestSPJWithBucketing is supposed to fail? I thought that the idea is to write a test that fails without the change in this PR but passes with it.

The test TestSPJWithBucketing#testMergeSPJwithCondition on 3.4 passes with the patch and fails without it; the other one always fails because I wanted to highlight another fallacy. Probably I should remove it and address it in another PR (or directly on the Spark repo?).

@wypoon
Contributor

wypoon commented Dec 21, 2023

I ran TestSPJWithBucketing on Spark 3.4 and I do see that testMergeSPJwithCondition passes and testMergeSPJwithoutCondition does not (on Spark 3.5 both fail). I believe that the reason the test results are different between Spark 3.4 and 3.5 is

% diff spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceStaticInvoke.scala spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceStaticInvoke.scala
25a26
> import org.apache.spark.sql.catalyst.plans.FullOuter
61c62
<       case j @ Join(_, _, _, Some(condition), _) =>
---
>       case j @ Join(_, _, FullOuter, Some(condition), _) =>

@wypoon
Contributor

wypoon commented Dec 21, 2023

I ran TestSPJWithBucketing on Spark 3.4 and I do see that testMergeSPJwithCondition passes and testMergeSPJwithoutCondition does not (on Spark 3.5 both fail). I believe that the reason the test results are different between Spark 3.4 and 3.5 is

% diff spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceStaticInvoke.scala spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceStaticInvoke.scala
25a26
> import org.apache.spark.sql.catalyst.plans.FullOuter
61c62
<       case j @ Join(_, _, _, Some(condition), _) =>
---
>       case j @ Join(_, _, FullOuter, Some(condition), _) =>

I was wrong in the above hypothesis. Nevertheless, why the difference in ReplaceStaticInvoke between Spark 3.4 and 3.5?

@tmnd1991
Author

I ran TestSPJWithBucketing on Spark 3.4 and I do see that testMergeSPJwithCondition passes and testMergeSPJwithoutCondition does not (on Spark 3.5 both fail). I believe that the reason the test results are different between Spark 3.4 and 3.5 is

% diff spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceStaticInvoke.scala spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceStaticInvoke.scala
25a26
> import org.apache.spark.sql.catalyst.plans.FullOuter
61c62
<       case j @ Join(_, _, _, Some(condition), _) =>
---
>       case j @ Join(_, _, FullOuter, Some(condition), _) =>

I was wrong in the above hypothesis. Nevertheless, why the difference in ReplaceStaticInvoke between Spark 3.4 and 3.5?

It was just me playing around; now they are identical, but the test results are the same as before (i.e., different between 3.4 and 3.5).

reformat

The fix works but the test does not :)

Change test

Replace only full outer joins

Add merge SPJ reproducer

Align ReplaceStaticInvoke between 3.4 and 3.5
@tmnd1991
Author

@aokolnychyi I see you fixed part of this in #9873, but Spark 3.4 still looks bugged on main. Do you have any time to give me feedback on this?

@aokolnychyi
Contributor

Sorry for the delay, @tmnd1991. The last several months were really busy. I think a simple cherry-pick to 3.4 should be enough. The original change was for 3.5. Could you take a look at PR #10119?

@github-actions

github-actions bot commented Oct 7, 2024

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Oct 7, 2024
@github-actions

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Oct 15, 2024
