Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHORE] Add error snafus for local executor #2660

Merged
merged 4 commits into from
Aug 17, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,15 @@ impl IntermediateNode {
for _ in 0..num_senders {
let (worker_sender, worker_receiver) = create_single_channel(1);
let destination_sender = destination.get_next_sender();
runtime_handle.spawn(Self::run_worker(
self.intermediate_op.clone(),
worker_receiver,
destination_sender,
self.runtime_stats.clone(),
));
runtime_handle.spawn(
Self::run_worker(
self.intermediate_op.clone(),
worker_receiver,
destination_sender,
self.runtime_stats.clone(),
),
self.intermediate_op.name(),
);
worker_senders.push(worker_sender);
}
worker_senders
Expand Down Expand Up @@ -152,7 +155,7 @@ impl PipelineNode for IntermediateNode {
&mut self,
mut destination: MultiSender,
runtime_handle: &mut ExecutionRuntimeHandle,
) -> DaftResult<()> {
) -> crate::Result<()> {
assert_eq!(
self.children.len(),
1,
Expand All @@ -166,7 +169,7 @@ impl PipelineNode for IntermediateNode {
child.start(sender, runtime_handle).await?;

let worker_senders = self.spawn_workers(&mut destination, runtime_handle).await;
runtime_handle.spawn(Self::send_to_workers(receiver, worker_senders));
runtime_handle.spawn(Self::send_to_workers(receiver, worker_senders), self.name());
Ok(())
}
fn as_tree_display(&self) -> &dyn TreeDisplay {
Expand Down
30 changes: 25 additions & 5 deletions src/daft-local-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ mod sources;
use common_error::{DaftError, DaftResult};
use lazy_static::lazy_static;
pub use run::NativeExecutor;
use snafu::futures::TryFutureExt;
use snafu::Snafu;

// Crate-wide CPU count, taken from `std::thread::available_parallelism()`.
// NOTE: the `unwrap()` means this panics if the standard library cannot
// determine the available parallelism.
lazy_static! {
pub static ref NUM_CPUS: usize = std::thread::available_parallelism().unwrap().get();
}

pub struct ExecutionRuntimeHandle {
pub worker_set: tokio::task::JoinSet<DaftResult<()>>,
pub worker_set: tokio::task::JoinSet<crate::Result<()>>,
}

impl Default for ExecutionRuntimeHandle {
Expand All @@ -34,11 +34,14 @@ impl ExecutionRuntimeHandle {
pub fn spawn(
&mut self,
task: impl std::future::Future<Output = DaftResult<()>> + Send + 'static,
node_name: &str,
) {
self.worker_set.spawn(task);
let node_name = node_name.to_string();
self.worker_set
.spawn(task.with_context(|_| PipelineExecutionSnafu { node_name }));
}

pub async fn join_next(&mut self) -> Option<Result<DaftResult<()>, tokio::task::JoinError>> {
pub async fn join_next(&mut self) -> Option<Result<crate::Result<()>, tokio::task::JoinError>> {
self.worker_set.join_next().await
}

Expand All @@ -63,14 +66,31 @@ pub enum Error {
OneShotRecvError {
source: tokio::sync::oneshot::error::RecvError,
},
#[snafu(display("Error creating pipeline from {}: {}", plan_name, source))]
PipelineCreationError {
source: DaftError,
plan_name: String,
},
#[snafu(display("Error when running pipeline node {}: {}", node_name, source))]
PipelineExecutionError {
source: DaftError,
node_name: String,
},
}

impl From<Error> for DaftError {
fn from(err: Error) -> DaftError {
DaftError::External(err.into())
match err {
Error::PipelineCreationError { .. } | Error::PipelineExecutionError { .. } => {
DaftError::InternalError(err.to_string())
}
_ => DaftError::External(err.into()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably don't want to nest the execution error under DaftError::External; otherwise, the conversion of the DaftError to the correct PythonError won't happen.

For now, we can unnest the daft error, log where it came from and then return that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, and updated the PR description example. I assumed you meant not to nest DaftErrors under DaftError::InternalError; the Externals are for the tokio oneshot and join errors.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh whoops, yeah looks good now!

}
}
}

// Crate-local `Result` alias whose error type defaults to this crate's `Error`.
type Result<T, E = Error> = std::result::Result<T, E>;

#[cfg(feature = "python")]
pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
parent.add_class::<NativeExecutor>()?;
Expand Down
13 changes: 8 additions & 5 deletions src/daft-local-execution/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ use crate::{
streaming_sink::StreamingSinkNode,
},
sources::in_memory::InMemorySource,
ExecutionRuntimeHandle,
ExecutionRuntimeHandle, PipelineCreationSnafu,
};

use async_trait::async_trait;
use common_display::{mermaid::MermaidDisplayVisitor, tree::TreeDisplay};
use common_error::DaftResult;
use daft_dsl::Expr;
use daft_micropartition::MicroPartition;
use daft_physical_plan::{
Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan, Project, Sort,
UnGroupedAggregate,
};
use daft_plan::populate_aggregation_stages;
use snafu::ResultExt;

use crate::channel::MultiSender;

Expand All @@ -38,7 +38,7 @@ pub trait PipelineNode: Sync + Send + TreeDisplay {
&mut self,
destination: MultiSender,
runtime_handle: &mut ExecutionRuntimeHandle,
) -> DaftResult<()>;
) -> crate::Result<()>;

fn as_tree_display(&self) -> &dyn TreeDisplay;
}
Expand All @@ -58,7 +58,7 @@ pub(crate) fn viz_pipeline(root: &dyn PipelineNode) -> String {
pub fn physical_plan_to_pipeline(
physical_plan: &LocalPhysicalPlan,
psets: &HashMap<String, Vec<Arc<MicroPartition>>>,
) -> DaftResult<Box<dyn PipelineNode>> {
) -> crate::Result<Box<dyn PipelineNode>> {
use crate::sources::scan_task::ScanTaskSource;
use daft_physical_plan::PhysicalScan;
let out: Box<dyn PipelineNode> = match physical_plan {
Expand Down Expand Up @@ -199,7 +199,10 @@ pub fn physical_plan_to_pipeline(
*join_type,
left_schema,
right_schema,
)?;
)
.with_context(|_| PipelineCreationSnafu {
plan_name: physical_plan.name(),
})?;
HashJoinNode::new(sink, left_node, right_node).boxed()
}
_ => {
Expand Down
4 changes: 2 additions & 2 deletions src/daft-local-execution/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub fn run_local(
.expect("Failed to create tokio runtime");

let res = runtime.block_on(async {
let mut pipeline = physical_plan_to_pipeline(physical_plan, &psets).unwrap();
let mut pipeline = physical_plan_to_pipeline(physical_plan, &psets)?;
let (sender, mut receiver) = create_channel(1, true);

let mut runtime_handle = ExecutionRuntimeHandle::default();
Expand All @@ -135,7 +135,7 @@ pub fn run_local(
match result {
Ok(Err(e)) => {
runtime_handle.shutdown().await;
return DaftResult::Err(e);
return DaftResult::Err(e.into());
}
Err(e) => {
runtime_handle.shutdown().await;
Expand Down
47 changes: 25 additions & 22 deletions src/daft-local-execution/src/sinks/blocking_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,36 +82,39 @@ impl PipelineNode for BlockingSinkNode {
&mut self,
mut destination: MultiSender,
runtime_handle: &mut ExecutionRuntimeHandle,
) -> DaftResult<()> {
) -> crate::Result<()> {
let (sender, mut streaming_receiver) = create_channel(*NUM_CPUS, true);
// now we can start building the right side
let child = self.child.as_mut();
child.start(sender, runtime_handle).await?;
let op = self.op.clone();

let rt_context = self.runtime_stats.clone();
runtime_handle.spawn(async move {
let span = info_span!("BlockingSinkNode::execute");
let mut guard = op.lock().await;
while let Some(val) = streaming_receiver.recv().await {
rt_context.mark_rows_received(val.len() as u64);
if let BlockingSinkStatus::Finished =
rt_context.in_span(&span, || guard.sink(&val))?
{
break;
runtime_handle.spawn(
async move {
let span = info_span!("BlockingSinkNode::execute");
let mut guard = op.lock().await;
while let Some(val) = streaming_receiver.recv().await {
rt_context.mark_rows_received(val.len() as u64);
if let BlockingSinkStatus::Finished =
rt_context.in_span(&span, || guard.sink(&val))?
{
break;
}
}
}
let finalized_result = rt_context
.in_span(&info_span!("BlockingSinkNode::finalize"), || {
guard.finalize()
})?;
if let Some(part) = finalized_result {
let len = part.len();
let _ = destination.get_next_sender().send(part).await;
rt_context.mark_rows_emitted(len as u64);
}
Ok(())
});
let finalized_result = rt_context
.in_span(&info_span!("BlockingSinkNode::finalize"), || {
guard.finalize()
})?;
if let Some(part) = finalized_result {
let len = part.len();
let _ = destination.get_next_sender().send(part).await;
rt_context.mark_rows_emitted(len as u64);
}
Ok(())
},
self.name(),
);
Ok(())
}
fn as_tree_display(&self) -> &dyn TreeDisplay {
Expand Down
46 changes: 26 additions & 20 deletions src/daft-local-execution/src/sinks/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
intermediate_ops::intermediate_op::{IntermediateNode, IntermediateOperator},
pipeline::PipelineNode,
runtime_stats::RuntimeStatsContext,
ExecutionRuntimeHandle, NUM_CPUS,
ExecutionRuntimeHandle, JoinSnafu, PipelineExecutionSnafu, NUM_CPUS,
};
use async_trait::async_trait;
use common_display::tree::TreeDisplay;
Expand All @@ -18,6 +18,7 @@ use daft_core::{
use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;
use daft_plan::JoinType;
use snafu::{futures::TryFutureExt, ResultExt};
use tracing::info_span;

use super::blocking_sink::{BlockingSink, BlockingSinkStatus};
Expand Down Expand Up @@ -296,33 +297,38 @@ impl PipelineNode for HashJoinNode {
&mut self,
mut destination: MultiSender,
runtime_handle: &mut ExecutionRuntimeHandle,
) -> DaftResult<()> {
) -> crate::Result<()> {
let (sender, mut pt_receiver) = create_channel(*NUM_CPUS, false);
self.left.start(sender, runtime_handle).await?;
let hash_join = self.hash_join.clone();
let build_runtime_stats = self.build_runtime_stats.clone();
let probe_table_build = tokio::spawn(async move {
let span = info_span!("ProbeTable::sink");
let mut guard = hash_join.lock().await;
let sink = guard.as_sink();
while let Some(val) = pt_receiver.recv().await {
build_runtime_stats.mark_rows_received(val.len() as u64);
if let BlockingSinkStatus::Finished =
build_runtime_stats.in_span(&span, || sink.sink(&val))?
{
break;
let name = self.name();
let probe_table_build = tokio::spawn(
async move {
let span = info_span!("ProbeTable::sink");
let mut guard = hash_join.lock().await;
let sink = guard.as_sink();
while let Some(val) = pt_receiver.recv().await {
build_runtime_stats.mark_rows_received(val.len() as u64);
if let BlockingSinkStatus::Finished =
build_runtime_stats.in_span(&span, || sink.sink(&val))?
{
break;
}
}
build_runtime_stats
.in_span(&info_span!("ProbeTable::finalize"), || sink.finalize())?;
DaftResult::Ok(())
}
build_runtime_stats.in_span(&info_span!("ProbeTable::finalize"), || sink.finalize())?;
DaftResult::Ok(())
});
.with_context(move |_| PipelineExecutionSnafu { node_name: name }),
);
// should wrap in context join handle

let (right_sender, streaming_receiver) = create_channel(*NUM_CPUS, destination.in_order());
// now we can start building the right side
self.right.start(right_sender, runtime_handle).await?;

probe_table_build.await.unwrap()?;
probe_table_build.await.context(JoinSnafu {})??;

let hash_join = self.hash_join.clone();
let probing_op = {
Expand All @@ -337,10 +343,10 @@ impl PipelineNode for HashJoinNode {
let worker_senders = probing_node
.spawn_workers(&mut destination, runtime_handle)
.await;
runtime_handle.spawn(IntermediateNode::send_to_workers(
streaming_receiver,
worker_senders,
));
runtime_handle.spawn(
IntermediateNode::send_to_workers(streaming_receiver, worker_senders),
self.name(),
);
Ok(())
}

Expand Down
Loading
Loading