Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Add more documentation about Partitioning #8022

Merged
merged 11 commits into from
Nov 5, 2023
90 changes: 86 additions & 4 deletions datafusion/physical-expr/src/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,95 @@
// specific language governing permissions and limitations
// under the License.

//! [`Partitioning`] and [`Distribution`] for physical expressions
//! [`Partitioning`] and [`Distribution`] for `ExecutionPlans`

use std::fmt;
use std::sync::Arc;

use crate::{physical_exprs_equal, EquivalenceProperties, PhysicalExpr};

/// Partitioning schemes supported by operators.
/// Output partitioning supported by [`ExecutionPlan`]s.
///
/// When `executed`, `ExecutionPlan`s produce one or more independent stream of
/// data batches in parallel, referred to as partitions. The streams are Rust
/// `aync` [`Stream`]s (a special kind of future). The number of output
/// partitions varies based on the input and the operation performed.
///
/// For example, an `ExecutionPlan` that has output partitioning of 3 will
/// produce 3 distinct output streams as the result of calling
/// `ExecutionPlan::execute(0)`, `ExecutionPlan::execute(1)`, and
/// `ExecutionPlan::execute(2)`, as shown below:
///
/// ```text
/// ... ... ...
/// ... ▲ ▲ ▲
/// │ │ │
/// ▲ │ │ │
/// │ │ │ │
/// │ ┌───┴────┐ ┌───┴────┐ ┌───┴────┐
/// ┌────────────────────┐ │ Stream │ │ Stream │ │ Stream │
/// │ ExecutionPlan │ │ (0) │ │ (1) │ │ (2) │
/// └────────────────────┘ └────────┘ └────────┘ └────────┘
/// ▲ ▲ ▲ ▲
/// │ │ │ │
/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ │
/// Input │ │ │ │
/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ │
/// ▲ ┌ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ┌ ─ ─ ─ ─
/// │ Input │ Input │ Input │
/// │ │ Stream │ Stream │ Stream
/// (0) │ (1) │ (2) │
/// ... └ ─ ▲ ─ ─ └ ─ ▲ ─ ─ └ ─ ▲ ─ ─
/// │ │ │
/// │ │ │
/// │ │ │
///
/// ExecutionPlan with 1 input 3 (async) streams, one for each
/// that has 3 partitions, which itself output partition
/// has 3 output partitions
/// ```
///
/// It is common (but not required) that an `ExecutionPlan` has the same number
/// of input partitions as output partitons. However, some plans have different
/// numbers such as the `RepartitionExec` that redistributes batches from some
/// number of inputs to some number of outputs
///
/// ```text
/// ... ... ... ...
///
/// ▲ ▲ ▲
/// ▲ │ │ │
/// │ │ │ │
/// ┌────────┴───────────┐ │ │ │
/// │ RepartitionExec │ ┌────┴───┐ ┌────┴───┐ ┌────┴───┐
/// └────────────────────┘ │ Stream │ │ Stream │ │ Stream │
/// ▲ │ (0) │ │ (1) │ │ (2) │
/// │ └────────┘ └────────┘ └────────┘
/// │ ▲ ▲ ▲
/// ... │ │ │
/// └──────────┐│┌──────────┘
/// │││
/// │││
/// RepartitionExec with one input
/// that has 3 partitions, but 3 (async) streams, that internally
/// itself has only 1 output partition pull from the same input stream
/// ...
/// ```
///
/// # Additional Examples
///
/// A simple `FileScanExec` might produce one output stream (partition) for each
/// file (note the actual DataFusion file scaners can read individual files in
/// parallel, potentially producing multiple partitions per file)
///
/// Plans such as `SortPreservingMerge` produce a single output stream
/// (1 output partition) by combining some number of input streams (input partitions)
///
/// Plans such as `FilterExec` produce the same number of output streams
/// (partitions) as input streams (partitions).
///
/// [`ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
#[derive(Debug, Clone)]
pub enum Partitioning {
/// Allocate batches using a round-robin algorithm and the specified number of partitions
Expand Down Expand Up @@ -126,7 +207,8 @@ impl PartialEq for Partitioning {
}
}

/// Distribution schemes
/// How data is distributed amongst partitions. See [`Partitioning`] for more
/// details.
#[derive(Debug, Clone)]
pub enum Distribution {
/// Unspecified distribution
Expand All @@ -139,7 +221,7 @@ pub enum Distribution {
}

impl Distribution {
/// Creates a Partitioning for this Distribution to satisfy itself
/// Creates a `Partitioning` that satisfies this `Distribution`
pub fn create_partitioning(&self, partition_count: usize) -> Partitioning {
match self {
Distribution::UnspecifiedDistribution => {
Expand Down