[FEAT] Add concat to new execution model + buffered intermediate ops #2519

Merged
colin-ho merged 2 commits into main from colin/execution-refactor
Jul 17, 2024

colin-ho
Contributor

This PR adds Concat and buffered intermediate ops to the new execution model, in addition to some refactors to support this change.

  • There are now two types of sinks, single input (limit, agg) and double input (concat, join).
  • Intermediate ops can now buffer their outputs via OperatorTaskState.
  • Add a channel abstraction for in-order vs out-of-order channels. The in-order channel uses a round robin implementation over a vec of channels with capacity 1, while the out-of-order channel just uses a single MPSC channel with capacity n.
  • Remove Pipelines in favor of Actors. Each actor controls a single source / op / sink, controls its own parallelism, and can configure its own input channel.
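The two channel flavors described above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: it uses blocking `std::sync::mpsc` channels as a stand-in for the async channels in the executor, and the names `InOrderReceiver`, `in_order_channel`, and `out_of_order_channel` are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// In-order receiver: one capacity-1 channel per worker, drained round-robin.
/// (Hypothetical type; std channels stand in for the async ones.)
pub struct InOrderReceiver<T> {
    receivers: Vec<Receiver<T>>,
    next: usize,
}

impl<T> InOrderReceiver<T> {
    /// Take one item from the worker whose turn it is, then advance the turn.
    /// Returns None when that worker's channel is closed and empty.
    pub fn recv(&mut self) -> Option<T> {
        if self.receivers.is_empty() {
            return None;
        }
        let item = self.receivers[self.next].recv().ok()?;
        self.next = (self.next + 1) % self.receivers.len();
        Some(item)
    }
}

/// Build `workers` capacity-1 channels; sender i belongs to worker i.
pub fn in_order_channel<T>(workers: usize) -> (Vec<SyncSender<T>>, InOrderReceiver<T>) {
    let (senders, receivers): (Vec<SyncSender<T>>, Vec<Receiver<T>>) =
        (0..workers).map(|_| sync_channel::<T>(1)).unzip();
    (senders, InOrderReceiver { receivers, next: 0 })
}

/// Out-of-order channel: a single bounded MPSC queue shared by all workers.
/// Clone the sender once per worker; items arrive in completion order.
pub fn out_of_order_channel<T>(capacity: usize) -> (SyncSender<T>, Receiver<T>) {
    sync_channel(capacity)
}
```

In this sketch the in-order variant preserves order only if morsels are dispatched to workers in the same rotation that the receiver reads from, which is why each worker gets its own capacity-1 slot.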

@github-actions github-actions bot added the enhancement New feature or request label Jul 16, 2024

codecov bot commented Jul 16, 2024

Codecov Report

Attention: Patch coverage is 0% with 402 lines in your changes missing coverage. Please review.

Please upload report for BASE (main@924c905). Learn more about missing BASE report.
Report is 1 commits behind head on main.

@@           Coverage Diff           @@
##             main    #2519   +/-   ##
=======================================
  Coverage        ?   63.22%           
=======================================
  Files           ?      972           
  Lines           ?   108363           
  Branches        ?        0           
=======================================
  Hits            ?    68517           
  Misses          ?    39846           
  Partials        ?        0           
Files Coverage Δ
src/daft-local-execution/src/lib.rs 50.00% <ø> (ø)
src/daft-local-execution/src/sinks/aggregate.rs 0.00% <0.00%> (ø)
src/daft-local-execution/src/sinks/limit.rs 0.00% <0.00%> (ø)
src/daft-local-execution/src/sources/in_memory.rs 0.00% <0.00%> (ø)
src/daft-physical-plan/src/translate.rs 0.00% <0.00%> (ø)
src/daft-local-execution/src/sources/scan_task.rs 0.00% <0.00%> (ø)
src/daft-physical-plan/src/local_plan.rs 0.00% <0.00%> (ø)
src/daft-local-execution/src/sources/source.rs 0.00% <0.00%> (ø)
src/daft-local-execution/src/sinks/concat.rs 0.00% <0.00%> (ø)
src/daft-local-execution/src/sinks/sink.rs 0.00% <0.00%> (ø)
... and 3 more

pub trait IntermediateOperator: dyn_clone::DynClone + Send + Sync {
    fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>>;
    fn name(&self) -> &'static str;
}

dyn_clone::clone_trait_object!(IntermediateOperator);

/// The number of rows that will trigger an intermediate operator to output its data.
pub const OUTPUT_THRESHOLD: usize = 1000;
Member

Make this an environment variable with a default.
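One way the suggestion above could look, as a minimal sketch. The variable name `DAFT_OUTPUT_THRESHOLD` and the helper names are assumptions made for illustration; the PR does not specify them.

```rust
use std::env;

/// Fallback used when the variable is unset or invalid (value from the PR).
pub const DEFAULT_OUTPUT_THRESHOLD: usize = 1000;

/// Hypothetical environment variable name, chosen only for this sketch.
pub const OUTPUT_THRESHOLD_ENV: &str = "DAFT_OUTPUT_THRESHOLD";

/// Parse a raw setting, falling back to the default for missing,
/// non-numeric, or zero values.
fn parse_threshold(raw: Option<&str>) -> usize {
    raw.and_then(|s| s.trim().parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or(DEFAULT_OUTPUT_THRESHOLD)
}

/// Read the threshold from the environment, with the default as fallback.
pub fn output_threshold() -> usize {
    parse_threshold(env::var(OUTPUT_THRESHOLD_ENV).ok().as_deref())
}
```

Keeping the parsing in a separate pure function makes the fallback behavior testable without mutating the process environment.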


pub fn add(&mut self, part: Arc<MicroPartition>) {
    self.buffer.push(part);
    self.curr_len += 1;
Member

This should be the length of part, not 1.
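A minimal sketch of the fix the comment above asks for, assuming the threshold is meant to count rows rather than buffered partitions. `Part` is a stand-in for `MicroPartition` (a plain `Vec<i64>` whose length plays the role of the row count), and `BufferedState` and `try_clear` are hypothetical names, not the PR's `OperatorTaskState` API.

```rust
use std::sync::Arc;

/// Stand-in for MicroPartition; its length plays the role of the row count.
pub type Part = Vec<i64>;

/// Buffers parts and releases them once enough rows have accumulated.
pub struct BufferedState {
    buffer: Vec<Arc<Part>>,
    curr_len: usize,
    threshold: usize,
}

impl BufferedState {
    pub fn new(threshold: usize) -> Self {
        Self { buffer: Vec::new(), curr_len: 0, threshold }
    }

    /// Track buffered rows, not the number of buffered parts.
    pub fn add(&mut self, part: Arc<Part>) {
        self.curr_len += part.len();
        self.buffer.push(part);
    }

    /// Hand back the buffered parts once the row threshold is reached.
    pub fn try_clear(&mut self) -> Option<Vec<Arc<Part>>> {
        if self.curr_len >= self.threshold {
            self.curr_len = 0;
            Some(std::mem::take(&mut self.buffer))
        } else {
            None
        }
    }
}
```

With `curr_len += 1`, a threshold of 1000 would mean 1000 partitions of arbitrary size; counting rows keeps the threshold aligned with its doc comment.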

pub const OUTPUT_THRESHOLD: usize = 1000;

/// State of an operator task, used to buffer data and output it when a threshold is reached.
pub struct OperatorTaskState {
Member

No action needed, but we should refactor this into a serde-able trait so that different operators can have different implementations.

}
}

async fn single_operator_task(
Member

run_single_pipeline

Ok(())
}

pub async fn run(&mut self) -> DaftResult<()> {
Member

run_parallel

if let Some(s) = inner_task_senders.get(curr_sender_idx) {
    let _ = s.send(morsel).await;
} else {
    let next_sender = self.sender.get_next_sender();
Member

doesn't exist so create the task runner.

let (single_sender, single_receiver) = create_single_channel(1);

let op = self.op.clone();
tokio::spawn(async move {
Member

We shouldn't have any tokio::spawn in our code. Have a util function called spawn_compute(...) instead.

tokio::spawn(async move {
    let _ = Self::single_operator_task(single_receiver, next_sender, op).await;
});
let _ = single_sender.send(morsel).await;
Member

Propagate this error with ? instead of discarding it.
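A minimal sketch of what propagating a failed send could look like. It uses a blocking `std::sync::mpsc` channel instead of the async one in the diff, and `ChannelClosed` and `forward` are hypothetical names; the real code would presumably map into `DaftError` instead.

```rust
use std::fmt;
use std::sync::mpsc::SyncSender;

/// Hypothetical error type standing in for the project's own error.
#[derive(Debug, PartialEq)]
pub struct ChannelClosed;

impl fmt::Display for ChannelClosed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "receiver was dropped before the morsel could be sent")
    }
}

impl std::error::Error for ChannelClosed {}

/// Forward a morsel and surface a closed channel to the caller,
/// rather than swallowing it with `let _ = ...`.
pub fn forward(tx: &SyncSender<u32>, morsel: u32) -> Result<(), ChannelClosed> {
    tx.send(morsel).map_err(|_| ChannelClosed)?;
    Ok(())
}
```

Returning the error lets the caller stop dispatching work as soon as the downstream side has gone away.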

}
}

pub fn blocking_recv(&mut self) -> Option<DaftResult<Arc<MicroPartition>>> {
Member

do we need this?

Ok(Box::new(res))
}

pub fn run_physical_plan(
Member

Decouple building the pipeline from starting execution:

  1. Build the full pipeline.
  2. Run start for each op.
  3. Then wait on the result.
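The three steps above can be sketched as separate phases. This is an illustration under simplifying assumptions, not the executor's design: stages are plain `i64 -> i64` closures, OS threads and `std::sync::mpsc` channels replace async tasks, and `Pipeline`, `RunningPipeline`, and `Stage` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread::{self, JoinHandle};

/// A stage is any sendable function over a single value (simplification).
pub type Stage = Box<dyn Fn(i64) -> i64 + Send>;

/// Step 1 result: a fully described pipeline; nothing is running yet.
pub struct Pipeline {
    stages: Vec<Stage>,
}

/// Step 2 result: every stage has been started; output is still pending.
pub struct RunningPipeline {
    handles: Vec<JoinHandle<()>>,
    output: Receiver<i64>,
}

impl Pipeline {
    /// 1. Build the full pipeline without starting anything.
    pub fn build(stages: Vec<Stage>) -> Self {
        Self { stages }
    }

    /// 2. Start each stage on its own thread, chained by channels.
    pub fn start(self, input: Vec<i64>) -> RunningPipeline {
        let (first_tx, mut upstream) = channel();
        let mut handles = Vec::new();
        for stage in self.stages {
            let (tx, rx) = channel();
            // This stage reads from the previous receiver; the next one reads from rx.
            let input_rx = std::mem::replace(&mut upstream, rx);
            handles.push(thread::spawn(move || {
                for v in input_rx {
                    if tx.send(stage(v)).is_err() {
                        break; // downstream is gone, stop early
                    }
                }
            }));
        }
        for v in input {
            if first_tx.send(v).is_err() {
                break;
            }
        }
        // Dropping first_tx here closes the input, so stages shut down in turn.
        RunningPipeline { handles, output: upstream }
    }
}

impl RunningPipeline {
    /// 3. Wait on the result, then join the stage threads.
    pub fn wait(self) -> Vec<i64> {
        let out: Vec<i64> = self.output.iter().collect();
        for h in self.handles {
            let _ = h.join();
        }
        out
    }
}
```

Separating `build`, `start`, and `wait` means the whole plan can be validated before any work is spawned, and the caller decides when to block on results.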

@colin-ho colin-ho merged commit eb040ce into main Jul 17, 2024
45 checks passed
@colin-ho colin-ho deleted the colin/execution-refactor branch July 17, 2024 00:48