Minor: Add more documentation about Partitioning #8022

Merged: 11 commits, Nov 5, 2023
89 changes: 85 additions & 4 deletions datafusion/physical-expr/src/partitioning.rs
@@ -15,14 +15,94 @@
// specific language governing permissions and limitations
// under the License.

//! [`Partitioning`] and [`Distribution`] for physical expressions
//! [`Partitioning`] and [`Distribution`] for `ExecutionPlans`

use std::fmt;
use std::sync::Arc;

use crate::{expr_list_eq_strict_order, EquivalenceProperties, PhysicalExpr};

/// Partitioning schemes supported by operators.
/// Output partitioning supported by [`ExecutionPlan`]s.
///
/// When `executed`, `ExecutionPlan`s produce one or more independent streams of
/// data batches in parallel, referred to as partitions. The streams are Rust
/// `async` [`Stream`]s (a special kind of future). The number of output
/// partitions varies based on the input and the operation performed.
///
/// For example, an `ExecutionPlan` that has output partitioning of 3 will
/// produce 3 distinct output streams as the result of calling
/// `ExecutionPlan::execute(0)`, `ExecutionPlan::execute(1)`, and
/// `ExecutionPlan::execute(2)`, as shown below:
///
/// ```text
///                                                   ...         ...         ...
///               ...                                  ▲           ▲           ▲
///                                                    │           │           │
///                ▲                                   │           │           │
///                │                                   │           │           │
///                │                               ┌───┴────┐  ┌───┴────┐  ┌───┴────┐
///     ┌────────────────────┐                     │ Stream │  │ Stream │  │ Stream │
///     │   ExecutionPlan    │                     │  (0)   │  │  (1)   │  │  (2)   │
///     └────────────────────┘                     └────────┘  └────────┘  └────────┘
///                ▲                                   ▲           ▲           ▲
///                │                                   │           │           │
///     ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                          │           │           │
///             Input        │                         │           │           │
///     └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                          │           │           │
///                ▲                               ┌ ─ ─ ─ ─   ┌ ─ ─ ─ ─   ┌ ─ ─ ─ ─
///                │                                 Input  │    Input  │    Input  │
///                │                               │ Stream    │ Stream    │ Stream
///                                                   (0)   │     (1)   │     (2)   │
///               ...                              └ ─ ▲ ─ ─   └ ─ ▲ ─ ─   └ ─ ▲ ─ ─
///                                                    │           │           │
///                                                    │           │           │
///                                                    │           │           │
///
/// ExecutionPlan with 3 output                   3 (async) streams, one for each
Contributor

should we have 3 arrows in the diagram above?

Contributor Author

I am struggling with how to draw this -- the ExecutionPlan has a single child input (another ExecutionPlan) but both its child and itself produce 3 output partitions / streams

Contributor

Wondering whether we should pursue this if it's not feasible to draw?

Contributor Author

I purposely drew the execution plan with one arrow in and one arrow out to represent the fact that it has one child input ExecutionPlan and can itself be an input to another plan. I tried to update the text to make this clearer.

/// partitions                                    output partition
/// ```
///
/// It is common (but not required) that an `ExecutionPlan` has the same number
/// of input partitions as output partitions. However, some plans differ, such
/// as `RepartitionExec`, which redistributes batches from some number of
/// inputs to some number of outputs.
///
/// ```text
///               ...                                 ...         ...         ...
///
///                                                    ▲           ▲           ▲
///                ▲                                   │           │           │
///                │                                   │           │           │
///       ┌────────┴───────────┐                       │           │           │
///       │  RepartitionExec   │                  ┌────┴───┐  ┌────┴───┐  ┌────┴───┐
///       └────────────────────┘                  │ Stream │  │ Stream │  │ Stream │
///                ▲                              │  (0)   │  │  (1)   │  │  (2)   │
///                │                              └────────┘  └────────┘  └────────┘
///                │                                   ▲           ▲           ▲
///               ...                                  │           │           │
///                                                    └──────────┐│┌──────────┘
///                                                               │││
///                                                               │││
/// RepartitionExec with 3 output
/// partitions and 1 input partition              3 (async) streams, that internally
///                                                pull from the same input stream
///                                                               ...
/// ```
///
/// # Additional Examples
///
/// A simple `FileScanExec` might produce one output stream (partition) for each
/// file. Note that the actual DataFusion file scanners can read individual
/// files in parallel, potentially producing multiple partitions per file.
///
/// Plans such as `SortPreservingMerge` produce a single output stream
/// (1 output partition) by combining some number of input streams (input
/// partitions).
///
/// Plans such as `FilterExec` produce the same number of output streams
/// (partitions) as input streams (partitions).
///
/// [`ExecutionPlan`]: crate::physical_plan::ExecutionPlan
Contributor

We could probably add that every partition is a future by itself, and that an execution plan node is considered complete when each partition (future) has completed.

Contributor Author

I would probably phrase it differently, like "the result of executing a Partition is an async stream (a kind of future)" or something. I'll try to clarify.
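
To make the point in this thread concrete, here is a minimal sketch of one plan node handing out one independent stream per output partition. It is an illustration under stated assumptions, not DataFusion code: `Batch`, `ToyPlan`, `ToyScan`, and `collect_all` are hypothetical names, and a boxed synchronous iterator stands in for the async `Stream` that executing a partition would produce.

```rust
/// Hypothetical stand-in for a batch of rows (not DataFusion's batch type).
type Batch = Vec<i32>;

/// Hypothetical, simplified stand-in for `ExecutionPlan`. A real plan hands
/// back an async `Stream`; a boxed iterator plays that role here.
trait ToyPlan {
    /// Number of output partitions this plan produces.
    fn output_partition_count(&self) -> usize;

    /// Return the independent stream of batches for one output partition.
    fn execute(&self, partition: usize) -> Box<dyn Iterator<Item = Batch> + '_>;
}

/// A toy scan that stores one list of batches per output partition.
struct ToyScan {
    partitions: Vec<Vec<Batch>>,
}

impl ToyPlan for ToyScan {
    fn output_partition_count(&self) -> usize {
        self.partitions.len()
    }

    fn execute(&self, partition: usize) -> Box<dyn Iterator<Item = Batch> + '_> {
        // Each call yields its own iterator; partitions do not share state.
        Box::new(self.partitions[partition].iter().cloned())
    }
}

/// Drain every partition in turn. In this sketch the plan counts as finished
/// once each per-partition stream has been exhausted.
fn collect_all(plan: &dyn ToyPlan) -> Vec<Vec<Batch>> {
    (0..plan.output_partition_count())
        .map(|p| plan.execute(p).collect::<Vec<Batch>>())
        .collect()
}
```

A `ToyScan` built with three inner vectors reports three output partitions, and `execute(0)`, `execute(1)`, and `execute(2)` can each be consumed separately, mirroring the three streams in the first diagram.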

/// [`Stream`]: futures::stream::Stream
#[derive(Debug, Clone)]
pub enum Partitioning {
/// Allocate batches using a round-robin algorithm and the specified number of partitions
@@ -129,7 +209,8 @@ impl PartialEq for Partitioning {
}
}
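
As a rough illustration of the round-robin scheme named in the `Partitioning` variant docs and drawn in the `RepartitionExec` diagram (1 input partition, 3 output partitions), the sketch below sends batch `i` to output `i % n`. The function name `round_robin_repartition` and the `Batch` alias are hypothetical, and the in-memory vectors are an assumption made for brevity; the operator described in the docs works on async streams.

```rust
/// Hypothetical stand-in for a batch of rows (not DataFusion's batch type).
type Batch = Vec<i32>;

/// Distribute the batches of a single input partition across `num_outputs`
/// output partitions in round-robin order.
///
/// Illustration only: inputs and outputs are plain vectors here, whereas the
/// documented operator pulls from and pushes to async streams.
fn round_robin_repartition(input: Vec<Batch>, num_outputs: usize) -> Vec<Vec<Batch>> {
    assert!(num_outputs > 0, "need at least one output partition");
    let mut outputs: Vec<Vec<Batch>> = vec![Vec::new(); num_outputs];
    for (i, batch) in input.into_iter().enumerate() {
        // Batch `i` goes to output partition `i % num_outputs`.
        outputs[i % num_outputs].push(batch);
    }
    outputs
}
```

With four input batches and three outputs, output 0 receives the first and fourth batch, while outputs 1 and 2 receive one batch each.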

/// Distribution schemes
/// How data is distributed amongst partitions. See [`Partitioning`] for more
/// details.
#[derive(Debug, Clone)]
pub enum Distribution {
/// Unspecified distribution
@@ -142,7 +223,7 @@ pub enum Distribution {
}

impl Distribution {
/// Creates a Partitioning for this Distribution to satisfy itself
/// Creates a `Partitioning` that satisfies this `Distribution`
pub fn create_partitioning(&self, partition_count: usize) -> Partitioning {
match self {
Distribution::UnspecifiedDistribution => {