diff --git a/src/daft-local-execution/src/lib.rs b/src/daft-local-execution/src/lib.rs
index 8db6b11957..005e623dcc 100644
--- a/src/daft-local-execution/src/lib.rs
+++ b/src/daft-local-execution/src/lib.rs
@@ -1,5 +1,6 @@
 mod channel;
 mod intermediate_ops;
+mod pipeline;
 mod run;
 mod sinks;
 mod sources;
diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs
new file mode 100644
index 0000000000..d00f25a23a
--- /dev/null
+++ b/src/daft-local-execution/src/pipeline.rs
@@ -0,0 +1,185 @@
+use std::{collections::HashMap, sync::Arc};
+
+use daft_dsl::Expr;
+use daft_micropartition::MicroPartition;
+use daft_physical_plan::{
+    Concat, Filter, InMemoryScan, Limit, LocalPhysicalPlan, PhysicalScan, Project,
+    UnGroupedAggregate,
+};
+use daft_plan::populate_aggregation_stages;
+
+use crate::{
+    channel::MultiSender,
+    intermediate_ops::{
+        aggregate::AggregateOperator,
+        filter::FilterOperator,
+        intermediate_op::{run_intermediate_op, IntermediateOperator},
+        project::ProjectOperator,
+    },
+    sinks::{
+        aggregate::AggregateSink,
+        concat::ConcatSink,
+        limit::LimitSink,
+        sink::{run_double_input_sink, run_single_input_sink, DoubleInputSink, SingleInputSink},
+    },
+    sources::{
+        in_memory::InMemorySource,
+        scan_task::ScanTaskSource,
+        source::{run_source, Source},
+    },
+};
+
+pub enum PipelineNode {
+    Source {
+        source: Arc<dyn Source>,
+    },
+    IntermediateOp {
+        intermediate_op: Box<dyn IntermediateOperator>,
+        child: Box<PipelineNode>,
+    },
+    SingleInputSink {
+        sink: Box<dyn SingleInputSink>,
+        child: Box<PipelineNode>,
+    },
+    DoubleInputSink {
+        sink: Box<dyn DoubleInputSink>,
+        left_child: Box<PipelineNode>,
+        right_child: Box<PipelineNode>,
+    },
+}
+
+impl PipelineNode {
+    pub fn start(&self, sender: MultiSender) {
+        match self {
+            PipelineNode::Source { source } => {
+                run_source(source.clone(), sender);
+            }
+            PipelineNode::IntermediateOp {
+                intermediate_op,
+                child,
+            } => {
+                let sender = run_intermediate_op(intermediate_op.clone(), sender);
+                child.start(sender);
+            }
+            PipelineNode::SingleInputSink { sink, child } => {
+                let sender = run_single_input_sink(sink.clone(), sender);
+                child.start(sender);
+            }
+            PipelineNode::DoubleInputSink {
+                sink,
+                left_child,
+                right_child,
+            } => {
+                let (left_sender, right_sender) = run_double_input_sink(sink.clone(), sender);
+                left_child.start(left_sender);
+                right_child.start(right_sender);
+            }
+        }
+    }
+}
+
+pub fn physical_plan_to_pipeline(
+    physical_plan: &LocalPhysicalPlan,
+    psets: &HashMap<String, Vec<Arc<MicroPartition>>>,
+) -> PipelineNode {
+    match physical_plan {
+        LocalPhysicalPlan::PhysicalScan(PhysicalScan { scan_tasks, .. }) => {
+            let scan_task_source = ScanTaskSource::new(scan_tasks.clone());
+            PipelineNode::Source {
+                source: Arc::new(scan_task_source),
+            }
+        }
+        LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, .. }) => {
+            let partitions = psets.get(&info.cache_key).expect("Cache key not found");
+            let in_memory_source = InMemorySource::new(partitions.clone());
+            PipelineNode::Source {
+                source: Arc::new(in_memory_source),
+            }
+        }
+        LocalPhysicalPlan::Project(Project {
+            input, projection, ..
+        }) => {
+            let proj_op = ProjectOperator::new(projection.clone());
+            let child_node = physical_plan_to_pipeline(input, psets);
+            PipelineNode::IntermediateOp {
+                intermediate_op: Box::new(proj_op),
+                child: Box::new(child_node),
+            }
+        }
+        LocalPhysicalPlan::Filter(Filter {
+            input, predicate, ..
+        }) => {
+            let filter_op = FilterOperator::new(predicate.clone());
+            let child_node = physical_plan_to_pipeline(input, psets);
+            PipelineNode::IntermediateOp {
+                intermediate_op: Box::new(filter_op),
+                child: Box::new(child_node),
+            }
+        }
+        LocalPhysicalPlan::Limit(Limit {
+            input, num_rows, ..
+        }) => {
+            let sink = LimitSink::new(*num_rows as usize);
+            let child_node = physical_plan_to_pipeline(input, psets);
+            PipelineNode::SingleInputSink {
+                sink: Box::new(sink),
+                child: Box::new(child_node),
+            }
+        }
+        LocalPhysicalPlan::Concat(Concat { input, other, .. }) => {
+            let sink = ConcatSink::new();
+            let left_child = physical_plan_to_pipeline(input, psets);
+            let right_child = physical_plan_to_pipeline(other, psets);
+            PipelineNode::DoubleInputSink {
+                sink: Box::new(sink),
+                left_child: Box::new(left_child),
+                right_child: Box::new(right_child),
+            }
+        }
+        LocalPhysicalPlan::UnGroupedAggregate(UnGroupedAggregate {
+            input,
+            aggregations,
+            schema,
+            ..
+        }) => {
+            let (first_stage_aggs, second_stage_aggs, final_exprs) =
+                populate_aggregation_stages(aggregations, schema, &[]);
+            let first_stage_agg_op = AggregateOperator::new(
+                first_stage_aggs
+                    .values()
+                    .cloned()
+                    .map(|e| Arc::new(Expr::Agg(e.clone())))
+                    .collect(),
+                vec![],
+            );
+            let second_stage_agg_sink = AggregateSink::new(
+                second_stage_aggs
+                    .values()
+                    .cloned()
+                    .map(|e| Arc::new(Expr::Agg(e.clone())))
+                    .collect(),
+                vec![],
+            );
+            let final_stage_project = ProjectOperator::new(final_exprs);
+
+            let child_node = physical_plan_to_pipeline(input, psets);
+            let intermediate_agg_op_node = PipelineNode::IntermediateOp {
+                intermediate_op: Box::new(first_stage_agg_op),
+                child: Box::new(child_node),
+            };
+
+            let sink_node = PipelineNode::SingleInputSink {
+                sink: Box::new(second_stage_agg_sink),
+                child: Box::new(intermediate_agg_op_node),
+            };
+
+            PipelineNode::IntermediateOp {
+                intermediate_op: Box::new(final_stage_project),
+                child: Box::new(sink_node),
+            }
+        }
+        _ => {
+            unimplemented!("Physical plan not supported: {}", physical_plan.name());
+        }
+    }
+}
diff --git a/src/daft-local-execution/src/run.rs b/src/daft-local-execution/src/run.rs
index 96cc4e1c9a..ef152c1e48 100644
--- a/src/daft-local-execution/src/run.rs
+++ b/src/daft-local-execution/src/run.rs
@@ -1,13 +1,8 @@
 use std::{collections::HashMap, sync::Arc};
 
 use common_error::DaftResult;
-use daft_dsl::Expr;
 use daft_micropartition::MicroPartition;
-use daft_physical_plan::{
-    translate, Concat, Filter, InMemoryScan, Limit, LocalPhysicalPlan, PhysicalScan, Project,
-    UnGroupedAggregate,
-};
-use daft_plan::populate_aggregation_stages;
+use daft_physical_plan::{translate, LocalPhysicalPlan};
 
 #[cfg(feature = "python")]
 use {
@@ -16,20 +11,7 @@ use {
     pyo3::{pyclass, pymethods, IntoPy, PyObject, PyRef, PyRefMut, PyResult, Python},
 };
 
-use crate::{
-    channel::{create_channel, MultiSender},
-    intermediate_ops::{
-        aggregate::AggregateOperator, filter::FilterOperator, intermediate_op::run_intermediate_op,
-        project::ProjectOperator,
-    },
-    sinks::{
-        aggregate::AggregateSink,
-        concat::ConcatSink,
-        limit::LimitSink,
-        sink::{run_double_input_sink, run_single_input_sink},
-    },
-    sources::{in_memory::InMemorySource, scan_task::ScanTaskSource, source::run_source},
-};
+use crate::{channel::create_channel, pipeline::physical_plan_to_pipeline};
 
 #[cfg(feature = "python")]
 #[pyclass]
@@ -103,8 +85,9 @@ pub fn run_local(
 ) -> DaftResult<Box<dyn Iterator<Item = DaftResult<Arc<MicroPartition>>> + Send>> {
     let runtime = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
     let res = runtime.block_on(async {
+        let pipeline = physical_plan_to_pipeline(physical_plan, &psets);
         let (sender, mut receiver) = create_channel(1, true);
-        run_physical_plan(physical_plan, &psets, sender);
+        pipeline.start(sender);
         let mut result = vec![];
         while let Some(val) = receiver.recv().await {
             result.push(val);
@@ -113,83 +96,3 @@ pub fn run_local(
     });
     Ok(Box::new(res))
 }
-
-pub fn run_physical_plan(
-    physical_plan: &LocalPhysicalPlan,
-    psets: &HashMap<String, Vec<Arc<MicroPartition>>>,
-    sender: MultiSender,
-) {
-    match physical_plan {
-        LocalPhysicalPlan::PhysicalScan(PhysicalScan { scan_tasks, .. }) => {
-            run_source(Arc::new(ScanTaskSource::new(scan_tasks.clone())), sender);
-        }
-        LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, .. }) => {
-            let partitions = psets.get(&info.cache_key).expect("Cache key not found");
-            run_source(Arc::new(InMemorySource::new(partitions.clone())), sender);
-        }
-        LocalPhysicalPlan::Project(Project {
-            input, projection, ..
-        }) => {
-            let proj_op = ProjectOperator::new(projection.clone());
-            let next_sender = run_intermediate_op(Box::new(proj_op), sender);
-            run_physical_plan(input, psets, next_sender);
-        }
-        LocalPhysicalPlan::Filter(Filter {
-            input, predicate, ..
-        }) => {
-            let filter_op = FilterOperator::new(predicate.clone());
-            let next_sender = run_intermediate_op(Box::new(filter_op), sender);
-            run_physical_plan(input, psets, next_sender);
-        }
-        LocalPhysicalPlan::Limit(Limit {
-            input, num_rows, ..
-        }) => {
-            let sink = LimitSink::new(*num_rows as usize);
-            let sink_sender = run_single_input_sink(Box::new(sink), sender);
-            run_physical_plan(input, psets, sink_sender);
-        }
-        LocalPhysicalPlan::Concat(Concat { input, other, .. }) => {
-            let sink = ConcatSink::new();
-            let (left_sender, right_sender) = run_double_input_sink(Box::new(sink), sender);
-
-            run_physical_plan(input, psets, left_sender);
-            run_physical_plan(other, psets, right_sender);
-        }
-        LocalPhysicalPlan::UnGroupedAggregate(UnGroupedAggregate {
-            input,
-            aggregations,
-            schema,
-            ..
-        }) => {
-            let (first_stage_aggs, second_stage_aggs, final_exprs) =
-                populate_aggregation_stages(aggregations, schema, &[]);
-
-            let final_stage_project = ProjectOperator::new(final_exprs);
-            let next_sender = run_intermediate_op(Box::new(final_stage_project), sender);
-
-            let second_stage_agg_sink = AggregateSink::new(
-                second_stage_aggs
-                    .values()
-                    .cloned()
-                    .map(|e| Arc::new(Expr::Agg(e.clone())))
-                    .collect(),
-                vec![],
-            );
-            let next_sender = run_single_input_sink(Box::new(second_stage_agg_sink), next_sender);
-
-            let first_stage_agg_op = AggregateOperator::new(
-                first_stage_aggs
-                    .values()
-                    .cloned()
-                    .map(|e| Arc::new(Expr::Agg(e.clone())))
-                    .collect(),
-                vec![],
-            );
-            let next_sender = run_intermediate_op(Box::new(first_stage_agg_op), next_sender);
-            run_physical_plan(input, psets, next_sender);
-        }
-        _ => {
-            unimplemented!("Physical plan not supported: {}", physical_plan.name());
-        }
-    }
-}
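
Usage note (not part of the diff): a minimal sketch of how the extracted API is meant to be driven, mirroring the new body of run_local: build a PipelineNode tree from the physical plan, start it against an output channel, and drain the receiver. The helper name, the async context, and the empty partition-set map are illustrative assumptions; the calls themselves come straight from the code above.

use std::collections::HashMap;

use daft_physical_plan::LocalPhysicalPlan;

use crate::{channel::create_channel, pipeline::physical_plan_to_pipeline};

// Hypothetical helper, not part of this PR: drives a pipeline end-to-end and
// counts the micropartitions that reach the final output channel.
async fn drain_pipeline(physical_plan: &LocalPhysicalPlan) -> usize {
    // Assumes a plan with no InMemoryScan nodes, so the partition-set cache can stay empty.
    let psets = HashMap::new();
    // Build the PipelineNode tree once, then start it against an output channel
    // created with the same arguments run_local uses.
    let pipeline = physical_plan_to_pipeline(physical_plan, &psets);
    let (sender, mut receiver) = create_channel(1, true);
    pipeline.start(sender);
    // Drain the receiving end, exactly as the block_on body of run_local does.
    let mut count = 0;
    while let Some(_part) = receiver.recv().await {
        count += 1;
    }
    count
}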