
[FEAT] New Local Execution Model #2437

Merged: 12 commits, Jul 9, 2024
23 changes: 23 additions & 0 deletions Cargo.lock


4 changes: 4 additions & 0 deletions Cargo.toml
@@ -8,6 +8,7 @@ daft-dsl = {path = "src/daft-dsl", default-features = false}
daft-execution = {path = "src/daft-execution", default-features = false}
daft-io = {path = "src/daft-io", default-features = false}
daft-json = {path = "src/daft-json", default-features = false}
daft-local-execution = {path = "src/daft-local-execution", default-features = false}
daft-micropartition = {path = "src/daft-micropartition", default-features = false}
daft-parquet = {path = "src/daft-parquet", default-features = false}
daft-plan = {path = "src/daft-plan", default-features = false}
@@ -31,6 +32,7 @@ python = [
"daft-csv/python",
"daft-dsl/python",
"daft-execution/python",
"daft-local-execution/python",
"daft-io/python",
"daft-json/python",
"daft-micropartition/python",
@@ -93,6 +95,7 @@ members = [
"src/common/system-info",
"src/daft-core",
"src/daft-execution",
"src/daft-local-execution",
"src/daft-io",
"src/daft-parquet",
"src/daft-csv",
@@ -118,6 +121,7 @@ bytes = "1.6.0"
chrono = "0.4.38"
chrono-tz = "0.8.4"
comfy-table = "7.1.1"
dyn-clone = "1"
futures = "0.3.30"
html-escape = "0.2.13"
indexmap = "2.1.0"
19 changes: 13 additions & 6 deletions daft/runners/pyrunner.py
@@ -184,13 +184,20 @@ def run_iter(
# Finalize the logical plan and get a physical plan scheduler for translating the
# physical plan to executable tasks.
plan_scheduler = builder.to_physical_plan_scheduler(daft_execution_config)
psets = {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
# Get executable tasks from planner.
tasks = plan_scheduler.to_partition_tasks(psets)
del psets
with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
results_gen = self._physical_plan_to_partitions(tasks)
if daft_execution_config.enable_native_executor:
logger.info("Using new executor")
results_gen = plan_scheduler.run(
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
)
yield from results_gen
else:
psets = {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
# Get executable tasks from planner.
tasks = plan_scheduler.to_partition_tasks(psets)
del psets
with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
results_gen = self._physical_plan_to_partitions(tasks)
yield from results_gen

def run_iter_tables(
self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None
10 changes: 5 additions & 5 deletions src/daft-execution/src/stage/run.rs
@@ -1,9 +1,14 @@
use std::{collections::HashMap, sync::Arc, thread::JoinHandle};

use common_error::DaftResult;
use daft_dsl::common_treenode::{self, TreeNode};
use daft_micropartition::MicroPartition;
use daft_plan::QueryStageOutput;

use super::{
planner::physical_plan_to_stage,
runner::{ExchangeStageRunner, SinkStageRunner},
};
use crate::{
executor::{
local::{
@@ -17,11 +22,6 @@ use crate::{
stage::Stage,
};

use super::{
planner::physical_plan_to_stage,
runner::{ExchangeStageRunner, SinkStageRunner},
};

/// Run a stage locally and synchronously, with all tasks executed serially.
pub fn run_local_sync(
query_stage: &QueryStageOutput,
25 changes: 25 additions & 0 deletions src/daft-local-execution/Cargo.toml
@@ -0,0 +1,25 @@
[dependencies]
common-error = {path = "../common/error", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-micropartition = {path = "../daft-micropartition", default-features = false}
daft-plan = {path = "../daft-plan", default-features = false}
daft-scan = {path = "../daft-scan", default-features = false}
dyn-clone = {workspace = true}
futures = {workspace = true}
lazy_static = {workspace = true}
log = {workspace = true}
pyo3 = {workspace = true, optional = true}
snafu = {workspace = true}
tokio = {workspace = true}
tokio-stream = {workspace = true}

[features]
default = ["python"]
python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python"]

[package]
edition = {workspace = true}
name = "daft-local-execution"
version = {workspace = true}
71 changes: 71 additions & 0 deletions src/daft-local-execution/src/create_pipeline.rs
@@ -0,0 +1,71 @@
use std::{collections::HashMap, sync::Arc};

use daft_dsl::Expr;
use daft_micropartition::MicroPartition;
use daft_plan::{
physical_ops::{Aggregate, Filter, InMemoryScan, Limit, Project, TabularScan},
PhysicalPlan,
};

use crate::{
intermediate_ops::{filter::FilterOperator, project::ProjectOperator},
pipeline::Pipeline,
sinks::{aggregate::AggregateSink, limit::LimitSink},
sources::{in_memory::InMemorySource, scan_task::ScanTaskSource},
};

pub fn physical_plan_to_pipeline(
physical_plan: &Arc<PhysicalPlan>,
psets: &HashMap<String, Vec<Arc<MicroPartition>>>,
) -> Pipeline {
match physical_plan.as_ref() {
PhysicalPlan::InMemoryScan(InMemoryScan { in_memory_info, .. }) => {
let partitions = psets
.get(&in_memory_info.cache_key)
.expect("Cache key not found");
Pipeline::new(Box::new(InMemorySource::new(partitions.clone())))
}
PhysicalPlan::TabularScan(TabularScan { scan_tasks, .. }) => {
Pipeline::new(Box::new(ScanTaskSource::new(scan_tasks.clone())))
}
PhysicalPlan::Project(Project {
input, projection, ..
}) => {
let current_pipeline = physical_plan_to_pipeline(input, psets);
let proj_op = ProjectOperator::new(projection.clone());
current_pipeline.with_intermediate_operator(Box::new(proj_op))
}
PhysicalPlan::Filter(Filter { input, predicate }) => {
let current_pipeline = physical_plan_to_pipeline(input, psets);
let filter_op = FilterOperator::new(predicate.clone());
current_pipeline.with_intermediate_operator(Box::new(filter_op))
}
PhysicalPlan::Limit(Limit { limit, input, .. }) => {
let current_pipeline = physical_plan_to_pipeline(input, psets);
let sink = LimitSink::new(*limit as usize);
let current_pipeline = current_pipeline.with_sink(Box::new(sink));

Pipeline::new(Box::new(current_pipeline))
}
PhysicalPlan::Aggregate(Aggregate {
input,
aggregations,
groupby,
}) => {
let current_pipeline = physical_plan_to_pipeline(input, psets);
let sink = AggregateSink::new(
aggregations
.iter()
.map(|agg| Arc::new(Expr::Agg(agg.clone())))
.collect::<Vec<_>>(),
groupby.clone(),
);
let current_pipeline = current_pipeline.with_sink(Box::new(sink));

Pipeline::new(Box::new(current_pipeline))
}
_ => {
unimplemented!("Physical plan not supported: {}", physical_plan.name());
}
}
}
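
A note on the shape of this translation: source nodes (InMemoryScan, TabularScan) start a fresh pipeline, intermediate ops (Project, Filter) extend the current one, and blocking ops (Limit, Aggregate) close it behind a sink, with the closed pipeline feeding a new one. A minimal, self-contained sketch of that bottom-up recursion, using hypothetical stand-in types rather than the real daft-plan or daft-local-execution APIs:

// Hypothetical stand-in for PhysicalPlan; just enough shape to show the recursion.
enum Plan {
    Scan,
    Filter(Box<Plan>),
    Limit(Box<Plan>, usize),
}

// Walk the plan bottom-up, as physical_plan_to_pipeline does:
// sources start a pipeline, intermediate ops extend it, sinks close it.
fn plan_to_ops(plan: &Plan, ops: &mut Vec<String>) {
    match plan {
        Plan::Scan => ops.push("source: scan".to_string()),
        Plan::Filter(input) => {
            plan_to_ops(input, ops);
            ops.push("intermediate: filter".to_string());
        }
        Plan::Limit(input, n) => {
            plan_to_ops(input, ops);
            ops.push(format!("sink: limit {n}"));
        }
    }
}

fn main() {
    let plan = Plan::Limit(Box::new(Plan::Filter(Box::new(Plan::Scan))), 10);
    let mut ops = Vec::new();
    plan_to_ops(&plan, &mut ops);
    assert_eq!(ops, ["source: scan", "intermediate: filter", "sink: limit 10"]);
}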
30 changes: 30 additions & 0 deletions src/daft-local-execution/src/intermediate_ops/filter.rs
@@ -0,0 +1,30 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;

use super::intermediate_op::IntermediateOperator;

#[derive(Clone)]
pub struct FilterOperator {
predicate: ExprRef,
}

impl FilterOperator {
pub fn new(predicate: ExprRef) -> Self {
Self { predicate }
}
}

impl IntermediateOperator for FilterOperator {
fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>> {
log::debug!("FilterOperator::execute");
let out = input.filter(&[self.predicate.clone()])?;
Ok(Arc::new(out))
}

fn name(&self) -> String {
"FilterOperator".to_string()
}
}
11 changes: 11 additions & 0 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
@@ -0,0 +1,11 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_micropartition::MicroPartition;

pub trait IntermediateOperator: dyn_clone::DynClone + Send + Sync {
fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>>;
fn name(&self) -> String;
}

dyn_clone::clone_trait_object!(IntermediateOperator);
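
Worth noting why the trait bounds include dyn_clone::DynClone: operators are held as Box<dyn IntermediateOperator>, and the clone_trait_object! macro is what makes those boxes cloneable. A compiling sketch of the pattern with a simplified stand-in trait, so it runs without the daft crates (assumes the dyn-clone dependency added to the workspace above):

use dyn_clone::DynClone;

// Stand-in for IntermediateOperator, minus the MicroPartition-typed execute().
trait Op: DynClone + Send + Sync {
    fn name(&self) -> String;
}

// Generates `impl Clone for Box<dyn Op>` by delegating to DynClone.
dyn_clone::clone_trait_object!(Op);

#[derive(Clone)]
struct Noop;

impl Op for Noop {
    fn name(&self) -> String {
        "Noop".to_string()
    }
}

fn main() {
    let op: Box<dyn Op> = Box::new(Noop);
    let copy = op.clone(); // legal only because of clone_trait_object!
    assert_eq!(copy.name(), "Noop");
}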
3 changes: 3 additions & 0 deletions src/daft-local-execution/src/intermediate_ops/mod.rs
@@ -0,0 +1,3 @@
pub mod filter;
pub mod intermediate_op;
pub mod project;
30 changes: 30 additions & 0 deletions src/daft-local-execution/src/intermediate_ops/project.rs
@@ -0,0 +1,30 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;

use super::intermediate_op::IntermediateOperator;

#[derive(Clone)]
pub struct ProjectOperator {
projection: Vec<ExprRef>,
}

impl ProjectOperator {
pub fn new(projection: Vec<ExprRef>) -> Self {
Self { projection }
}
}

impl IntermediateOperator for ProjectOperator {
fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>> {
log::debug!("ProjectOperator::execute");
let out = input.eval_expression_list(&self.projection)?;
Ok(Arc::new(out))
}

fn name(&self) -> String {
"ProjectOperator".to_string()
}
}
47 changes: 47 additions & 0 deletions src/daft-local-execution/src/lib.rs
@@ -0,0 +1,47 @@
mod create_pipeline;
mod intermediate_ops;
mod pipeline;
mod run;
mod sinks;
mod sources;

use std::sync::Arc;

use common_error::{DaftError, DaftResult};
use daft_micropartition::MicroPartition;
pub use run::run_streaming;
use snafu::Snafu;

type Sender = tokio::sync::mpsc::Sender<DaftResult<Arc<MicroPartition>>>;
type Receiver = tokio::sync::mpsc::Receiver<DaftResult<Arc<MicroPartition>>>;

pub fn create_channel() -> (Sender, Receiver) {
tokio::sync::mpsc::channel(1)
}

#[cfg(feature = "python")]
use pyo3::prelude::*;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error joining spawned task: {}", source))]
JoinError { source: tokio::task::JoinError },
#[snafu(display(
"Sender of OneShot Channel Dropped before sending data over: {}",
source
))]
OneShotRecvError {
source: tokio::sync::oneshot::error::RecvError,
},
}

impl From<Error> for DaftError {
fn from(err: Error) -> DaftError {
DaftError::External(err.into())
}
}

#[cfg(feature = "python")]
pub fn register_modules(_py: Python, _parent: &PyModule) -> PyResult<()> {
Ok(())
}
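
One design note: create_channel builds a bounded tokio mpsc channel with capacity 1, so each producer can run at most one micropartition ahead of whatever consumes it. A tokio-only sketch of that backpressure behavior (hypothetical example; none of the daft types involved):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Capacity 1, mirroring create_channel: one in-flight item max.
    let (tx, mut rx) = mpsc::channel::<i32>(1);
    tokio::spawn(async move {
        for i in 0..3 {
            // With the single slot full, this send awaits until the
            // receiver drains it; that pause is the backpressure.
            tx.send(i).await.unwrap();
        }
    });
    while let Some(v) = rx.recv().await {
        println!("received {v}");
    }
}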