-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Minor: Improve ExecutionPlan
documentation
#8019
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -91,16 +91,27 @@ pub use datafusion_physical_expr::{ | |
pub use crate::stream::EmptyRecordBatchStream; | ||
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; | ||
|
||
/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan. | ||
/// Represent nodes in the DataFusion Physical Plan. | ||
/// | ||
/// Each `ExecutionPlan` is partition-aware and is responsible for | ||
/// creating the actual `async` [`SendableRecordBatchStream`]s | ||
/// of [`RecordBatch`] that incrementally compute the operator's | ||
/// output from its input partition. | ||
/// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of | ||
/// [`RecordBatch`] that incrementally computes a partition of the | ||
/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more | ||
/// details on partitioning. | ||
/// | ||
/// Methods such as [`schema`] and [`output_partitioning`] communicate | ||
/// properties of this output to the DataFusion optimizer, and methods such as | ||
/// [`required_input_distribution`] and [`required_input_ordering`] express | ||
/// requirements of the `ExecutionPlan` from its input. | ||
/// | ||
/// [`ExecutionPlan`] can be displayed in a simplified form using the | ||
/// return value from [`displayable`] in addition to the (normally | ||
/// quite verbose) `Debug` output. | ||
/// | ||
/// [`execute`]: ExecutionPlan::execute | ||
/// [`schema`]: ExecutionPlan::schema | ||
/// [`output_partitioning`]: ExecutionPlan::output_partitioning | ||
/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution | ||
/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering | ||
pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | ||
/// Returns the execution plan as [`Any`] so that it can be | ||
/// downcast to a specific implementation. | ||
|
@@ -109,7 +120,8 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | |
/// Get the schema for this execution plan | ||
fn schema(&self) -> SchemaRef; | ||
|
||
/// Specifies the output partitioning scheme of this plan | ||
/// Specifies how the output of this `ExecutionPlan` is split into | ||
/// partitions. | ||
fn output_partitioning(&self) -> Partitioning; | ||
|
||
/// Specifies whether this plan generates an infinite stream of records. | ||
|
@@ -123,25 +135,27 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | |
} | ||
} | ||
|
||
/// If the output of this operator within each partition is sorted, | ||
/// If the output of this `ExecutionPlan` within each partition is sorted, | ||
/// returns `Some(keys)` with the description of how it was sorted. | ||
/// | ||
/// For example, Sort, (obviously) produces sorted output as does | ||
/// SortPreservingMergeStream. Less obviously `Projection` | ||
/// produces sorted output if its input was sorted as it does not | ||
/// reorder the input rows, | ||
/// | ||
/// It is safe to return `None` here if your operator does not | ||
/// It is safe to return `None` here if your `ExecutionPlan` does not | ||
/// have any particular output order here | ||
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>; | ||
|
||
/// Specifies the data distribution requirements for all the | ||
/// children for this operator, By default it's [[Distribution::UnspecifiedDistribution]] for each child, | ||
/// children for this `ExecutionPlan`, By default it's [[Distribution::UnspecifiedDistribution]] for each child, | ||
fn required_input_distribution(&self) -> Vec<Distribution> { | ||
vec![Distribution::UnspecifiedDistribution; self.children().len()] | ||
} | ||
|
||
/// Specifies the ordering requirements for all of the children | ||
/// Specifies the ordering required for all of the children of this | ||
/// `ExecutionPlan`. | ||
/// | ||
/// For each child, it's the local ordering requirement within | ||
/// each partition rather than the global ordering | ||
/// | ||
|
@@ -152,7 +166,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | |
vec![None; self.children().len()] | ||
} | ||
|
||
/// Returns `false` if this operator's implementation may reorder | ||
/// Returns `false` if this `ExecutionPlan`'s implementation may reorder | ||
/// rows within or between partitions. | ||
/// | ||
/// For example, Projection, Filter, and Limit maintain the order | ||
|
@@ -166,19 +180,21 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | |
/// The default implementation returns `false` | ||
/// | ||
/// WARNING: if you override this default, you *MUST* ensure that | ||
/// the operator's maintains the ordering invariant or else | ||
/// the `ExecutionPlan`'s maintains the ordering invariant or else | ||
/// DataFusion may produce incorrect results. | ||
fn maintains_input_order(&self) -> Vec<bool> { | ||
vec![false; self.children().len()] | ||
} | ||
|
||
/// Specifies whether the operator benefits from increased parallelization | ||
/// at its input for each child. If set to `true`, this indicates that the | ||
/// operator would benefit from partitioning its corresponding child | ||
/// (and thus from more parallelism). For operators that do very little work | ||
/// the overhead of extra parallelism may outweigh any benefits | ||
/// Specifies whether the `ExecutionPlan` benefits from increased | ||
/// parallelization at its input for each child. | ||
/// | ||
/// The default implementation returns `true` unless this operator | ||
/// If returns `true`, the `ExecutionPlan` would benefit from partitioning | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
/// its corresponding child (and thus from more parallelism). For | ||
/// `ExecutionPlan` that do very little work the overhead of extra | ||
/// parallelism may outweigh any benefits | ||
/// | ||
/// The default implementation returns `true` unless this `ExecutionPlan` | ||
/// has signalled it requires a single child input partition. | ||
fn benefits_from_input_partitioning(&self) -> Vec<bool> { | ||
// By default try to maximize parallelism with more CPUs if | ||
|
@@ -199,12 +215,14 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | |
OrderingEquivalenceProperties::new(self.schema()) | ||
} | ||
|
||
/// Get a list of child execution plans that provide the input for this plan. The returned list | ||
/// will be empty for leaf nodes, will contain a single value for unary nodes, or two | ||
/// values for binary nodes (such as joins). | ||
/// Get a list of `ExecutionPlan` that provide input for this plan. The | ||
/// returned list will be empty for leaf nodes such as scans, will contain a | ||
/// single value for unary nodes, or two values for binary nodes (such as | ||
/// joins). | ||
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>; | ||
|
||
/// Returns a new plan where all children were replaced by new plans. | ||
/// Returns a new `ExecutionPlan` where all existing children were replaced | ||
/// by the `children`, oi order | ||
fn with_new_children( | ||
self: Arc<Self>, | ||
children: Vec<Arc<dyn ExecutionPlan>>, | ||
|
@@ -235,13 +253,40 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | |
Ok(None) | ||
} | ||
|
||
/// Begin execution of `partition`, returning a stream of [`RecordBatch`]es. | ||
/// Begin execution of `partition`, returning a [`Stream`] of | ||
/// [`RecordBatch`]es. | ||
/// | ||
/// # Notes | ||
/// | ||
/// The `execute` method itself is not `async` but it returns an `async` | ||
/// [`futures::stream::Stream`]. This `Stream` should incrementally compute | ||
/// the output, `RecordBatch` by `RecordBatch` (in a streaming fashion). | ||
/// Most `ExecutionPlan`s should not do any work before the first | ||
/// `RecordBatch` is requested from the stream. | ||
/// | ||
/// [`RecordBatchStreamAdapter`] can be used to convert an `async` | ||
/// [`Stream`] into a [`SendableRecordBatchStream`]. | ||
/// | ||
/// Using `async` `Streams` allows for network I/O during execution and | ||
/// takes advantage of Rust's built in support for `async` continuations and | ||
/// crate ecosystem. | ||
/// | ||
/// [`Stream`]: futures::stream::Stream | ||
/// [`StreamExt`]: futures::stream::StreamExt | ||
/// [`TryStreamExt`]: futures::stream::TryStreamExt | ||
/// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter | ||
/// | ||
/// # Implementation Examples | ||
/// | ||
/// ## Return Precomputed Batch | ||
/// While `async` `Stream`s have a non trivial learning curve, the | ||
/// [`futures`] crate provides [`StreamExt`] and [`TryStreamExt`] | ||
/// which help simplify many common operations. | ||
/// | ||
/// We can return a precomputed batch as a stream | ||
/// Here are some common patterns: | ||
/// | ||
/// ## Return Precomputed `RecordBatch` | ||
/// | ||
/// We can return a precomputed `RecordBatch` as a `Stream`: | ||
/// | ||
/// ``` | ||
/// # use std::sync::Arc; | ||
|
@@ -261,16 +306,17 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | |
/// partition: usize, | ||
/// context: Arc<TaskContext> | ||
/// ) -> Result<SendableRecordBatchStream> { | ||
/// // use functions from futures crate convert the batch into a stream | ||
/// let fut = futures::future::ready(Ok(self.batch.clone())); | ||
/// let stream = futures::stream::once(fut); | ||
/// Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream))) | ||
/// } | ||
/// } | ||
/// ``` | ||
/// | ||
/// ## Async Compute Batch | ||
/// ## Lazily (async) Compute `RecordBatch` | ||
/// | ||
/// We can also lazily compute a RecordBatch when the returned stream is polled | ||
/// We can also lazily compute a `RecordBatch` when the returned `Stream` is polled | ||
/// | ||
/// ``` | ||
/// # use std::sync::Arc; | ||
|
@@ -284,6 +330,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | |
/// schema: SchemaRef, | ||
/// } | ||
/// | ||
/// /// Returns a single batch when the returned stream is polled | ||
/// async fn get_batch() -> Result<RecordBatch> { | ||
/// todo!() | ||
/// } | ||
|
@@ -301,10 +348,10 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | |
/// } | ||
/// ``` | ||
/// | ||
/// ## Async Compute Batch Stream | ||
/// ## Lazily (async) create a Stream | ||
/// | ||
/// We can lazily compute a RecordBatch stream when the returned stream is polled | ||
/// flattening the result into a single stream | ||
/// If you need to to create the return `Stream` using an `async` function, | ||
/// you can do so by flattening the result: | ||
/// | ||
/// ``` | ||
/// # use std::sync::Arc; | ||
|
@@ -319,6 +366,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | |
/// schema: SchemaRef, | ||
/// } | ||
/// | ||
/// /// async function that returns a stream | ||
/// async fn get_batch_stream() -> Result<SendableRecordBatchStream> { | ||
/// todo!() | ||
/// } | ||
|
@@ -337,9 +385,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | |
/// } | ||
/// } | ||
/// ``` | ||
/// | ||
/// See [`futures::stream::StreamExt`] and [`futures::stream::TryStreamExt`] for further | ||
/// combinators that can be used with streams | ||
fn execute( | ||
&self, | ||
partition: usize, | ||
|
@@ -372,7 +417,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | |
/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful | ||
/// especially for the distributed engine to judge whether need to deal with shuffling. | ||
/// Currently there are 3 kinds of execution plan which needs data exchange | ||
/// 1. RepartitionExec for changing the partition number between two operators | ||
/// 1. RepartitionExec for changing the partition number between two `ExecutionPlan`s | ||
/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee | ||
/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee | ||
pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool { | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when we talk about partition we mean a separate task(thread) that processes a chunk of input data by applying
.execute()
method?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a link to
Partitioning
in this PR and I have made another PR to try and document better what partitioning means: #8022 -- please take a look when you have time.