From 13595e330f27463d7cbe5d4ddd7ec62b597b9806 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 31 Oct 2024 10:54:53 -0400 Subject: [PATCH 1/2] Derive `Clone` for more ExecutionPlans --- datafusion/physical-plan/src/aggregates/mod.rs | 2 +- datafusion/physical-plan/src/coalesce_batches.rs | 2 +- datafusion/physical-plan/src/coalesce_partitions.rs | 2 +- datafusion/physical-plan/src/empty.rs | 2 +- datafusion/physical-plan/src/filter.rs | 2 +- datafusion/physical-plan/src/insert.rs | 1 + datafusion/physical-plan/src/joins/cross_join.rs | 2 +- datafusion/physical-plan/src/joins/hash_join.rs | 2 +- datafusion/physical-plan/src/joins/nested_loop_join.rs | 2 +- datafusion/physical-plan/src/joins/sort_merge_join.rs | 2 +- datafusion/physical-plan/src/joins/symmetric_hash_join.rs | 2 +- datafusion/physical-plan/src/limit.rs | 2 +- datafusion/physical-plan/src/memory.rs | 1 + datafusion/physical-plan/src/placeholder_row.rs | 2 +- datafusion/physical-plan/src/recursive_query.rs | 2 +- datafusion/physical-plan/src/repartition/mod.rs | 2 +- datafusion/physical-plan/src/sorts/sort.rs | 2 +- datafusion/physical-plan/src/sorts/sort_preserving_merge.rs | 2 +- datafusion/physical-plan/src/streaming.rs | 1 + datafusion/physical-plan/src/union.rs | 4 ++-- datafusion/physical-plan/src/unnest.rs | 2 +- datafusion/physical-plan/src/values.rs | 2 +- .../physical-plan/src/windows/bounded_window_agg_exec.rs | 2 +- datafusion/physical-plan/src/windows/window_agg_exec.rs | 2 +- 24 files changed, 25 insertions(+), 22 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 48a03af19dbd..65fbe3ca12c2 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -344,7 +344,7 @@ impl From for SendableRecordBatchStream { } /// Hash aggregate execution plan -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AggregateExec { /// Aggregation mode (full, partial) mode: AggregateMode, diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 61fb3599f013..11678e7a4696 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -48,7 +48,7 @@ use futures::stream::{Stream, StreamExt}; /// reaches the `fetch` value. /// /// See [`BatchCoalescer`] for more information -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct CoalesceBatchesExec { /// The input plan input: Arc, diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index f9d4ec6a1a34..3da101d6092f 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -36,7 +36,7 @@ use datafusion_execution::TaskContext; /// Merge execution plan executes partitions in parallel and combines them into a single /// partition. No guarantees are made about the order of the resulting partition. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct CoalescePartitionsExec { /// Input execution plan input: Arc, diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index f6e0abb94fa8..192619f69f6a 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -35,7 +35,7 @@ use datafusion_physical_expr::EquivalenceProperties; use log::trace; /// Execution plan for empty relation with produce_one_row=false -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct EmptyExec { /// The schema for the produced row schema: SchemaRef, diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 30b0af19f43b..97d8159137f4 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -54,7 +54,7 @@ use log::trace; /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to /// include in its output batches. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FilterExec { /// The expression to filter on. This expression must evaluate to a boolean value. predicate: Arc, diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 8b3ef5ae01e4..e478cecb7ffc 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -79,6 +79,7 @@ pub type FileSinkExec = DataSinkExec; /// Execution plan for writing record batches to a [`DataSink`] /// /// Returns a single row with the number of values written +#[derive(Clone)] pub struct DataSinkExec { /// Input plan that produces the record batches to be written. input: Arc, diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 8f49885068fd..e9b7b109b5bf 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -51,7 +51,7 @@ type JoinLeftData = (RecordBatch, MemoryReservation); /// executes partitions in parallel and combines them into a set of /// partitions by combining all values from the left with all values on the right -#[derive(Debug)] +#[derive(Debug)] // note not Clone because of the OnceAsync pub struct CrossJoinExec { /// left (build) side which gets loaded in memory pub left: Arc, diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 2d11e03814a3..5a9ccd57a3c0 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -293,7 +293,7 @@ impl JoinLeftData { /// │ "dimension" │ │ "fact" │ /// └───────────────┘ └───────────────┘ /// ``` -#[derive(Debug)] +#[derive(Debug)] // note not Clone because of the OnceAsync pub struct HashJoinExec { /// left (build) side which gets hashed pub left: Arc, diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 358ff02473a6..2c041c5a5e35 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -140,7 +140,7 @@ impl JoinLeftData { /// "reports" about probe phase completion (which means that "visited" bitmap won't be /// updated anymore), and only the last thread, reporting about completion, will return output. /// -#[derive(Debug)] +#[derive(Debug)] // note not Clone because of the OnceAsync pub struct NestedLoopJoinExec { /// left side pub(crate) left: Arc, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index b299b495c504..3346c5ecc235 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -69,7 +69,7 @@ use crate::{ /// join execution plan executes partitions in parallel and combines them into a set of /// partitions. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SortMergeJoinExec { /// Left sorted joining execution plan pub left: Arc, diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index eb6a30d17e92..4abeee558fe4 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -165,7 +165,7 @@ const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4; /// making the smallest value in 'left_sorted' 1231 and any rows below (since ascending) /// than that can be dropped from the inner buffer. /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SymmetricHashJoinExec { /// Left side stream pub(crate) left: Arc, diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 1fe550a93056..ab1e6cb37bc8 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -39,7 +39,7 @@ use futures::stream::{Stream, StreamExt}; use log::trace; /// Limit execution plan -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct GlobalLimitExec { /// Input execution plan input: Arc, diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index dd4868d1bfcc..00d044b48fe5 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -41,6 +41,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use futures::Stream; /// Execution plan for reading in-memory batches of data +#[derive(Clone)] pub struct MemoryExec { /// The partitions to query partitions: Vec>, diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index 5d8ca7e76935..f9437f46f8a6 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -37,7 +37,7 @@ use datafusion_physical_expr::EquivalenceProperties; use log::trace; /// Execution plan for empty relation with produce_one_row=true -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct PlaceholderRowExec { /// The schema for the produced row schema: SchemaRef, diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index e9ea9d4f5032..cbf22a4b392f 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -53,7 +53,7 @@ use futures::{ready, Stream, StreamExt}; /// Note that there won't be any limit or checks applied to detect /// an infinite recursion, so it is up to the planner to ensure that /// it won't happen. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RecursiveQueryExec { /// Name of the query handler name: String, diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 601c1e873152..f8e7c651843d 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -398,7 +398,7 @@ impl BatchPartitioner { /// Paper](https://w6113.github.io/files/papers/volcanoparallelism-89.pdf) /// which uses the term "Exchange" for the concept of repartitioning /// data across threads. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RepartitionExec { /// Input execution plan input: Arc, diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 921678a4ad92..a3c4a994d6ac 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -673,7 +673,7 @@ pub(crate) fn lexsort_to_indices_multi_columns( /// /// Support sorting datasets that are larger than the memory allotted /// by the memory manager, by spilling to disk. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SortExec { /// Input schema pub(crate) input: Arc, diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index f17161306c7a..f28b9c70986b 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -70,7 +70,7 @@ use log::{debug, trace}; /// /// If any of the input partitions return an error, the error is propagated to /// the output and inputs are not polled again. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SortPreservingMergeExec { /// Input plan input: Arc, diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index cdb94af1fe8a..7ccef3248069 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -55,6 +55,7 @@ pub trait PartitionStream: Debug + Send + Sync { /// /// If your source can be represented as one or more [`PartitionStream`]s, you can /// use this struct to implement [`ExecutionPlan`]. +#[derive(Clone)] pub struct StreamingTableExec { partitions: Vec>, projection: Option>, diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 433dda870def..9cf9b5a06b1e 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -85,7 +85,7 @@ use tokio::macros::support::thread_rng_n; /// │Input 1 │ │Input 2 │ /// └─────────────────┘ └──────────────────┘ /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct UnionExec { /// Input execution plan inputs: Vec>, @@ -298,7 +298,7 @@ impl ExecutionPlan for UnionExec { /// | |-----------------+ /// +---------+ /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct InterleaveExec { /// Input execution plan inputs: Vec>, diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 3e312b7451be..b7b9f17eb1b6 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -56,7 +56,7 @@ use log::trace; /// Thus the original RecordBatch with dimension (n x m) may have new dimension (n' x m') /// /// See [`UnnestOptions`] for more details and an example. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct UnnestExec { /// Input execution plan input: Arc, diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 991146d245a7..edadf98cb10c 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -36,7 +36,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; /// Execution plan for values list based relation (produces constant rows) -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ValuesExec { /// The schema schema: SchemaRef, diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 6495657339fa..636ea703d118 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -68,7 +68,7 @@ use indexmap::IndexMap; use log::debug; /// Window execution plan -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BoundedWindowAggExec { /// Input plan input: Arc, diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index afe9700ed08c..0057e4cc76e9 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -47,7 +47,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::{ready, Stream, StreamExt}; /// Window execution plan -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct WindowAggExec { /// Input plan pub(crate) input: Arc, From b5ffef0249a56ed6ebb62d3e9f61ed56b1ea22e4 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 1 Nov 2024 17:26:42 -0400 Subject: [PATCH 2/2] improve docs --- datafusion/physical-plan/src/joins/cross_join.rs | 7 ++++++- datafusion/physical-plan/src/joins/hash_join.rs | 7 ++++++- datafusion/physical-plan/src/joins/nested_loop_join.rs | 6 +++++- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index e9b7b109b5bf..a67e1df47bc7 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -49,9 +49,14 @@ use futures::{ready, Stream, StreamExt, TryStreamExt}; /// Data of the left side type JoinLeftData = (RecordBatch, MemoryReservation); +#[allow(rustdoc::private_intra_doc_links)] /// executes partitions in parallel and combines them into a set of /// partitions by combining all values from the left with all values on the right -#[derive(Debug)] // note not Clone because of the OnceAsync +/// +/// Note that the `Clone` trait is not implemented for this struct due to the +/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the +/// left side with the processing in each output stream. +#[derive(Debug)] pub struct CrossJoinExec { /// left (build) side which gets loaded in memory pub left: Arc, diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 4fa3ac595ed5..57d8a9ce7b35 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -136,6 +136,7 @@ impl JoinLeftData { } } +#[allow(rustdoc::private_intra_doc_links)] /// Join execution plan: Evaluates eqijoin predicates in parallel on multiple /// partitions using a hash table and an optional filter list to apply post /// join. @@ -293,7 +294,11 @@ impl JoinLeftData { /// │ "dimension" │ │ "fact" │ /// └───────────────┘ └───────────────┘ /// ``` -#[derive(Debug)] // note not Clone because of the OnceAsync +/// +/// Note that the `Clone` trait is not implemented for this struct due to the +/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the +/// left side with the processing in each output stream. +#[derive(Debug)] pub struct HashJoinExec { /// left (build) side which gets hashed pub left: Arc, diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index f6036ee1a235..f36c2395e20f 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -105,6 +105,7 @@ impl JoinLeftData { } } +#[allow(rustdoc::private_intra_doc_links)] /// NestedLoopJoinExec is build-probe join operator, whose main task is to /// perform joins without any equijoin conditions in `ON` clause. /// @@ -140,7 +141,10 @@ impl JoinLeftData { /// "reports" about probe phase completion (which means that "visited" bitmap won't be /// updated anymore), and only the last thread, reporting about completion, will return output. /// -#[derive(Debug)] // note not Clone because of the OnceAsync +/// Note that the `Clone` trait is not implemented for this struct due to the +/// `left_fut` [`OnceAsync`], which is used to coordinate the loading of the +/// left side with the processing in each output stream. +#[derive(Debug)] pub struct NestedLoopJoinExec { /// left side pub(crate) left: Arc,