
Partitioning reasoning in DataFusion and Ballista #284

Closed
mingmwang opened this issue Sep 26, 2022 · 5 comments
Labels
enhancement New feature or request

Comments

@mingmwang
Contributor

mingmwang commented Sep 26, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

In the current Ballista code base, when the distributed plan is generated, any non-hash repartition is removed from the plan.
And in DataFusion, during physical planning, a RepartitionExec node is added blindly whenever a hash join or aggregation is seen, without considering the children's output partitioning.

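            // In Ballista's DistributedPlanner, only a hash repartition becomes a new
            // shuffle stage; every other repartition variant is simply dropped: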
            match repart.output_partitioning() {
                Partitioning::Hash(_, _) => {
                    let shuffle_writer = create_shuffle_writer(
                        job_id,
                        self.next_stage_id(),
                        children[0].clone(),
                        Some(repart.partitioning().to_owned()),
                    )?;
                    let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
                        shuffle_writer.stage_id(),
                        shuffle_writer.schema(),
                        shuffle_writer.output_partitioning().partition_count(),
                        shuffle_writer
                            .shuffle_output_partitioning()
                            .map(|p| p.partition_count())
                            .unwrap_or_else(|| {
                                shuffle_writer.output_partitioning().partition_count()
                            }),
                    ));
                    stages.push(shuffle_writer);
                    Ok((unresolved_shuffle, stages))
                }
                _ => {
                    // remove any non-hash repartition from the distributed plan
                    Ok((children[0].clone(), stages))
                }
            }

Looking into Presto's source code, Presto's distributed plan can include both remote exchanges and local exchanges.
Local exchanges benefit intra-stage parallelism, and Presto adds remote and local exchanges only when necessary. I think it is time to introduce more advanced methods to reason about partitioning in a distributed plan, something more powerful than Spark SQL's EnsureRequirements rule.
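
As a rough illustration of the kind of reasoning such a rule needs, here is a minimal sketch with a simplified Partitioning enum and a hypothetical satisfies_hash_requirement helper (not DataFusion's actual API): an exchange only needs to be inserted when the child's output partitioning does not already satisfy the required hash distribution.

// Illustrative only: a simplified model of partitioning reasoning,
// not DataFusion's Partitioning/Distribution types.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    Hash(Vec<String>, usize), // hash-partitioned on these columns, with N partitions
    RoundRobin(usize),
    Unknown(usize),
}

/// A required hash distribution on `keys` is already satisfied when the child is
/// hash-partitioned on a non-empty subset of `keys`: rows with equal key values
/// are then guaranteed to be co-located, so no new exchange is needed.
fn satisfies_hash_requirement(child: &Partitioning, keys: &[String]) -> bool {
    match child {
        Partitioning::Hash(cols, _) => {
            !cols.is_empty() && cols.iter().all(|c| keys.contains(c))
        }
        _ => false,
    }
}

fn main() {
    let child = Partitioning::Hash(vec!["cntry_id".to_string()], 8);
    // An aggregate requiring Hash(cntry_id, curncy_id) can reuse the child's
    // partitioning, so no extra RepartitionExec is needed ...
    assert!(satisfies_hash_requirement(
        &child,
        &["cntry_id".to_string(), "curncy_id".to_string()]
    ));
    // ... while a requirement on curncy_id alone still needs an exchange.
    assert!(!satisfies_hash_requirement(&child, &["curncy_id".to_string()]));
}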

Incorporating Partitioning and Parallel Plans into the SCOPE Optimizer
http://www.cs.albany.edu/~jhh/courses/readings/zhou10.pdf


@mingmwang mingmwang added the enhancement New feature or request label Sep 26, 2022
@thinkharderdev
Contributor

👍

I would add a couple of things to this discussion:

Currently partitioning is a bit restrictive: either the client has to specify the partitioning or we fall back to a single default partitioning. This has several limitations:

  1. Partitioning should be sensitive to cluster capacity. Adding more partitions is probably counter-productive if you don't have enough executor capacity to execute them concurrently; all you are doing is adding scheduling overhead and shuffle exchange cost (see the sketch after this list).
  2. The client probably has no information about the cluster capacity (which may itself be dynamic based on auto-scaling rules), nor does it necessarily have any information about how much data is scanned for a given query. So the client lacks the relevant information for deciding what the appropriate number of partitions is.
  3. Allowing the client to specify the number of partitions can be problematic for multi-tenancy, since there is no way to prevent a greedy client from consuming too many cluster resources.
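
A rough sketch of what capacity-aware partition selection could look like on the scheduler side (the choose_partition_count helper and its inputs are hypothetical, not existing Ballista configuration):

// Hypothetical: derive a partition count from the data size, capped by what the
// cluster can actually run concurrently.
fn choose_partition_count(
    available_task_slots: usize,
    estimated_scan_bytes: u64,
    target_bytes_per_partition: u64,
) -> usize {
    // Enough partitions to keep each one near the target size ...
    let by_data =
        (estimated_scan_bytes / target_bytes_per_partition.max(1)).max(1) as usize;
    // ... but never more than the cluster can execute at once, since extra
    // partitions only add scheduling overhead and shuffle exchange cost.
    by_data.min(available_task_slots.max(1))
}

fn main() {
    // 64 free task slots, ~100 GiB scanned, ~256 MiB per-partition target.
    let n = choose_partition_count(64, 100 * 1024 * 1024 * 1024, 256 * 1024 * 1024);
    assert_eq!(n, 64); // the data would justify 400 partitions, capacity caps it at 64
}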

There may not be an optimal general solution for all of this, so I think it might be useful to make this functionality pluggable. Something I've been tinkering with is putting the planning behind a trait that allows for custom implementations. Something like:

trait BallistaPlanner {
    fn plan_query_stages<'a>(
        &'a mut self,
        job_id: &'a str,
        execution_plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Vec<Arc<ShuffleWriterExec>>>;
}

trait BallistaPlannerFactory {
    type Output: BallistaPlanner;

    fn new_planner<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
        state: &Arc<SchedulerState<T, U>>,
    ) -> Self::Output;
}

The existing BallistaPlanner could be the default implementation of course. But this could give us a nice mechanism for experimenting with different implementations as well.
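
For example (the CapacityAwarePlanner name and its contents are purely hypothetical and only implement the trait sketched above; this is not existing Ballista code):

// Hypothetical custom planner implementing the sketched BallistaPlanner trait.
struct CapacityAwarePlanner {
    available_task_slots: usize,
}

impl BallistaPlanner for CapacityAwarePlanner {
    fn plan_query_stages<'a>(
        &'a mut self,
        _job_id: &'a str,
        _execution_plan: Arc<dyn ExecutionPlan>,
    ) -> Result<Vec<Arc<ShuffleWriterExec>>> {
        // Walk the plan, choose partition counts based on
        // self.available_task_slots, and cut the plan into shuffle stages
        // at exchange boundaries (elided here).
        todo!()
    }
}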

@Dandandan
Contributor

I'll add range partitioning to this list as well: currently normal sorts do not run in a parallel / distributed fashion.
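
(For context, a tiny illustrative sketch of what range partitioning amounts to, with nothing here being actual DataFusion code: each row is routed by comparing its sort key against pre-computed boundaries, e.g. obtained by sampling, so every partition can be sorted independently and the results concatenated in order.)

// Illustrative only: route a sort key to a partition by comparing it against
// pre-computed range boundaries.
fn range_partition(key: i64, boundaries: &[i64]) -> usize {
    // partition_point returns how many boundaries are <= key, which is exactly
    // the index of the range partition the key falls into.
    boundaries.partition_point(|b| *b <= key)
}

fn main() {
    // 4 partitions: keys < 10, [10, 100), [100, 1000), and >= 1000.
    let boundaries = [10, 100, 1000];
    assert_eq!(range_partition(5, &boundaries), 0);
    assert_eq!(range_partition(100, &boundaries), 2);
    assert_eq!(range_partition(5000, &boundaries), 3);
}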

@mingmwang
Contributor Author

mingmwang commented Sep 28, 2022

I'll add range partitioning to this list as well: currently normal sorts do not run in a parallel / distributed fashion.

Yes, currently there are a couple of gaps in the physical planning phase, and the ExecutionPlan trait also needs to be enhanced.
Considering the SQL below, ideally it should only need 1 or 2 remote shuffle exchanges: for instance, a single hash repartition on curncy_id already satisfies the aggregate on the full grouping key as well as the three window specs that include curncy_id, leaving at most one more exchange for the window partitioned by cntry_id alone.
Today Spark SQL produces 4 shuffle exchanges; I'm not sure how many remote shuffles PrestoSQL/Trino would produce.

select cntry_id, count(*),
       count(*) over (partition by cntry_id) as w,
       count(*) over (partition by curncy_id) as c,
       count(cntry_desc) over (partition by curncy_id, cntry_desc) as d,
       count(cntry_desc) over (partition by cntry_id, curncy_id, cntry_desc) as e
from dw_countries
group by cre_date, cntry_id, curncy_id, cntry_desc
having cntry_id = 3;

I'm working on an experimental rule for this now, and am also trying to verify a new optimization process. If it proves out, it will be much easier to write new optimization rules, and the same methods can be applied to logical optimization rules as well.

@mingmwang
Contributor Author

Another recent paper; see the EXCHANGE PLACEMENT sections:

https://vldb.org/pvldb/vol15/p936-rajan.pdf

@mingmwang
Contributor Author

This issue is partially addressed by the new Enforcement rule in DataFusion, so I'm closing it.
