Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug Fix]: Deem hash repartition unnecessary when input and output has 1 partition #10095

Merged
merged 2 commits into from
Apr 16, 2024

Conversation

mustafasrepo
Copy link
Contributor

Which issue does this PR close?

Closes #9928.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Yes

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Apr 16, 2024
@mustafasrepo
Copy link
Contributor Author

cc @echai58, this fix should solve the problem in the issue. Feel free to review, if you have time.

@mustafasrepo
Copy link
Contributor Author

@korowa thanks again for the analysis which helped a lot in understanding the issue and replicating it in datafusion.

Copy link
Contributor

@ozankabak ozankabak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thank you

Copy link

@echai58 echai58 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix!

@alamb
Copy link
Contributor

alamb commented Apr 16, 2024

Thanks @mustafasrepo and @ozankabak

Omega359 pushed a commit to Omega359/arrow-datafusion that referenced this pull request Apr 16, 2024
…s 1 partition (apache#10095)

* Add input partition number check

* Minor changes
@korowa
Copy link
Contributor

korowa commented Apr 16, 2024

@mustafasrepo good catch on UNION -- I previously suggested that this kind of plan (with partitions > config.target_partitions) is "illegal" in DF.

@mustafasrepo
Copy link
Contributor Author

@mustafasrepo good catch on UNION -- I previously suggested that this kind of plan (with partitions > config.target_partitions) is "illegal" in DF.

I agree, this behaviour is a bit counter intuitive. However, with current implementation of the UnionExec it is hard to make sure partitions<=config.target_partitions always true. If this behaviour prone to errors. Maybe we can insert RepartitionExec on top UnionExecs if their output partition number > config.target_partitions. By this way, we can guarantee this violation wouldn't propagate to other operators.

11)------------ProjectionExec: expr=[1 as c, 3 as d]
12)--------------PlaceholderRowExec

query IIII
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this query is non determisitic and fails sometimes on CI as it doesn't have an ORDER BY and isn't annotated with rowsort. Here is a PR to fix that: #10120

@korowa
Copy link
Contributor

korowa commented Apr 23, 2024

Maybe we can insert RepartitionExec on top UnionExecs if their output partition number > config.target_partitions. By this way, we can guarantee this violation wouldn't propagate to other operators.

Probably better solution would be planning union inputs execution according to total available partitions -- e.g

    select l_linenumber as f
    from lineitem
    union all
    select l_orderkey as f
    from lineitem

with target_partitions = 4, could plan 2 threads for each ParquetExec (ideally we could also use byte/row statistics and plan according to them -- not only 2-2, but probably 1-3 if there is significant data skew across inputs/files).

Currently, with target_partitions = 4, it's planned as 4 threads per ParquetExec, and 8 output partitions for UNION.

And on top of it, when target_partitions is less then number of UNION inputs (e.g. UNION has 10 inputs, target_partitions = 4, and we need at least 1 thread for each input) there could be RepartitionExec.

@mustafasrepo
Copy link
Contributor Author

mustafasrepo commented Apr 24, 2024

Maybe we can insert RepartitionExec on top UnionExecs if their output partition number > config.target_partitions. By this way, we can guarantee this violation wouldn't propagate to other operators.

Probably better solution would be planning union inputs execution according to total available partitions -- e.g

    select l_linenumber as f
    from lineitem
    union all
    select l_orderkey as f
    from lineitem

with target_partitions = 4, could plan 2 threads for each ParquetExec (ideally we could also use byte/row statistics and plan according to them -- not only 2-2, but probably 1-3 if there is significant data skew across inputs/files).

Currently, with target_partitions = 4, it's planned as 4 threads per ParquetExec, and 8 output partitions for UNION.

And on top of it, when target_partitions is less then number of UNION inputs (e.g. UNION has 10 inputs, target_partitions = 4, and we need at least 1 thread for each input) there could be RepartitionExec.

That might work. However, this approach cannot solve all cases I guess. For the following query

select * from table
union all
select * from table
union all 
select * from table

when target_partitions=2, there is no way to change input partitions to generate 2 partitions after union. We would generate at least 3 partitions(assuming each input generates single partition). Hence, this approach may not solve all use cases. If I am not mistaken.

@echai58
Copy link

echai58 commented Apr 26, 2024

Hi guys, not familiar with datafusion's release process - is there a estimate of when this will be released in a new datafusion version?

@alamb
Copy link
Contributor

alamb commented Apr 26, 2024

Hi guys, not familiar with datafusion's release process - is there a estimate of when this will be released in a new datafusion version?

This should be included in 38.0.0 -- I just filed #10255 to track that release if you want to watch it @echai58

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Internal error: Invalid HashJoinExec partition count mismatch 1!=2 when constructing merge plan with 1 CPU
5 participants