Skip to content

Commit

Permalink
fix merge
Browse files Browse the repository at this point in the history
  • Loading branch information
Colin Ho authored and Colin Ho committed Aug 16, 2024
1 parent 74c477a commit 99c299f
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 97 deletions.
15 changes: 9 additions & 6 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,15 @@ impl IntermediateNode {
for _ in 0..num_senders {
let (worker_sender, worker_receiver) = create_single_channel(1);
let destination_sender = destination.get_next_sender();
runtime_handle.spawn(Self::run_worker(
self.intermediate_op.clone(),
worker_receiver,
destination_sender,
self.runtime_stats.clone(),
));
runtime_handle.spawn(
Self::run_worker(
self.intermediate_op.clone(),
worker_receiver,
destination_sender,
self.runtime_stats.clone(),
),
self.intermediate_op.name(),
);
worker_senders.push(worker_sender);
}
worker_senders
Expand Down
3 changes: 1 addition & 2 deletions src/daft-local-execution/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::{

use async_trait::async_trait;
use common_display::{mermaid::MermaidDisplayVisitor, tree::TreeDisplay};
use common_error::DaftResult;
use daft_dsl::Expr;
use daft_micropartition::MicroPartition;
use daft_physical_plan::{
Expand All @@ -39,7 +38,7 @@ pub trait PipelineNode: Sync + Send + TreeDisplay {
&mut self,
destination: MultiSender,
runtime_handle: &mut ExecutionRuntimeHandle,
) -> DaftResult<()>;
) -> crate::Result<()>;

fn as_tree_display(&self) -> &dyn TreeDisplay;
}
Expand Down
45 changes: 24 additions & 21 deletions src/daft-local-execution/src/sinks/blocking_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,28 +90,31 @@ impl PipelineNode for BlockingSinkNode {
let op = self.op.clone();

let rt_context = self.runtime_stats.clone();
runtime_handle.spawn(async move {
let span = info_span!("BlockingSinkNode::execute");
let mut guard = op.lock().await;
while let Some(val) = streaming_receiver.recv().await {
rt_context.mark_rows_received(val.len() as u64);
if let BlockingSinkStatus::Finished =
rt_context.in_span(&span, || guard.sink(&val))?
{
break;
runtime_handle.spawn(
async move {
let span = info_span!("BlockingSinkNode::execute");
let mut guard = op.lock().await;
while let Some(val) = streaming_receiver.recv().await {
rt_context.mark_rows_received(val.len() as u64);
if let BlockingSinkStatus::Finished =
rt_context.in_span(&span, || guard.sink(&val))?
{
break;
}
}
}
let finalized_result = rt_context
.in_span(&info_span!("BlockingSinkNode::finalize"), || {
guard.finalize()
})?;
if let Some(part) = finalized_result {
let len = part.len();
let _ = destination.get_next_sender().send(part).await;
rt_context.mark_rows_emitted(len as u64);
}
Ok(())
});
let finalized_result = rt_context
.in_span(&info_span!("BlockingSinkNode::finalize"), || {
guard.finalize()
})?;
if let Some(part) = finalized_result {
let len = part.len();
let _ = destination.get_next_sender().send(part).await;
rt_context.mark_rows_emitted(len as u64);
}
Ok(())
},
self.name(),
);
Ok(())
}
fn as_tree_display(&self) -> &dyn TreeDisplay {
Expand Down
33 changes: 19 additions & 14 deletions src/daft-local-execution/src/sinks/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
intermediate_ops::intermediate_op::{IntermediateNode, IntermediateOperator},
pipeline::PipelineNode,
runtime_stats::RuntimeStatsContext,
ExecutionRuntimeHandle, NUM_CPUS,
ExecutionRuntimeHandle, JoinSnafu, PipelineExecutionSnafu, NUM_CPUS,
};
use async_trait::async_trait;
use common_display::tree::TreeDisplay;
Expand Down Expand Up @@ -302,21 +302,26 @@ impl PipelineNode for HashJoinNode {
self.left.start(sender, runtime_handle).await?;
let hash_join = self.hash_join.clone();
let build_runtime_stats = self.build_runtime_stats.clone();
let probe_table_build = tokio::spawn(async move {
let span = info_span!("ProbeTable::sink");
let mut guard = hash_join.lock().await;
let sink = guard.as_sink();
while let Some(val) = pt_receiver.recv().await {
build_runtime_stats.mark_rows_received(val.len() as u64);
if let BlockingSinkStatus::Finished =
build_runtime_stats.in_span(&span, || sink.sink(&val))?
{
break;
let name = self.name();
let probe_table_build = tokio::spawn(
async move {
let span = info_span!("ProbeTable::sink");
let mut guard = hash_join.lock().await;
let sink = guard.as_sink();
while let Some(val) = pt_receiver.recv().await {
build_runtime_stats.mark_rows_received(val.len() as u64);
if let BlockingSinkStatus::Finished =
build_runtime_stats.in_span(&span, || sink.sink(&val))?
{
break;
}
}
build_runtime_stats
.in_span(&info_span!("ProbeTable::finalize"), || sink.finalize())?;
DaftResult::Ok(())
}
build_runtime_stats.in_span(&info_span!("ProbeTable::finalize"), || sink.finalize())?;
DaftResult::Ok(())
});
.with_context(move |_| PipelineExecutionSnafu { node_name: name }),
);
// should wrap in context join handle

let (right_sender, streaming_receiver) = create_channel(*NUM_CPUS, destination.in_order());
Expand Down
65 changes: 34 additions & 31 deletions src/daft-local-execution/src/sinks/streaming_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,47 +99,50 @@ impl PipelineNode for StreamingSinkNode {
child.start(sender, runtime_handle).await?;
let op = self.op.clone();
let runtime_stats = self.runtime_stats.clone();
runtime_handle.spawn(async move {
// this should be a RWLock and run in concurrent workers
let span = info_span!("StreamingSink::execute");
runtime_handle.spawn(
async move {
// this should be a RWLock and run in concurrent workers
let span = info_span!("StreamingSink::execute");

let mut sink = op.lock().await;
let mut is_active = true;
while is_active && let Some(val) = streaming_receiver.recv().await {
runtime_stats.mark_rows_received(val.len() as u64);
loop {
let result = runtime_stats.in_span(&span, || sink.execute(0, &val))?;
match result {
StreamSinkOutput::HasMoreOutput(mp) => {
let len = mp.len() as u64;
let sender = destination.get_next_sender();
sender.send(mp).await.unwrap();
runtime_stats.mark_rows_emitted(len);
}
StreamSinkOutput::NeedMoreInput(mp) => {
if let Some(mp) = mp {
let mut sink = op.lock().await;
let mut is_active = true;
while is_active && let Some(val) = streaming_receiver.recv().await {
runtime_stats.mark_rows_received(val.len() as u64);
loop {
let result = runtime_stats.in_span(&span, || sink.execute(0, &val))?;
match result {
StreamSinkOutput::HasMoreOutput(mp) => {
let len = mp.len() as u64;
let sender = destination.get_next_sender();
sender.send(mp).await.unwrap();
runtime_stats.mark_rows_emitted(len);
}
break;
}
StreamSinkOutput::Finished(mp) => {
if let Some(mp) = mp {
let len = mp.len() as u64;
let sender = destination.get_next_sender();
sender.send(mp).await.unwrap();
runtime_stats.mark_rows_emitted(len);
StreamSinkOutput::NeedMoreInput(mp) => {
if let Some(mp) = mp {
let len = mp.len() as u64;
let sender = destination.get_next_sender();
sender.send(mp).await.unwrap();
runtime_stats.mark_rows_emitted(len);
}
break;
}
StreamSinkOutput::Finished(mp) => {
if let Some(mp) = mp {
let len = mp.len() as u64;
let sender = destination.get_next_sender();
sender.send(mp).await.unwrap();
runtime_stats.mark_rows_emitted(len);
}
is_active = false;
break;
}
is_active = false;
break;
}
}
}
}
DaftResult::Ok(())
});
DaftResult::Ok(())
},
self.name(),
);
Ok(())
}
fn as_tree_display(&self) -> &dyn TreeDisplay {
Expand Down
22 changes: 12 additions & 10 deletions src/daft-local-execution/src/sources/in_memory.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::Arc;

use crate::{channel::MultiSender, runtime_stats::RuntimeStatsContext, ExecutionRuntimeHandle};
use common_error::DaftResult;
use daft_io::IOStatsRef;
use daft_micropartition::MicroPartition;
use tracing::instrument;
Expand Down Expand Up @@ -29,16 +28,19 @@ impl Source for InMemorySource {
runtime_handle: &mut ExecutionRuntimeHandle,
runtime_stats: Arc<RuntimeStatsContext>,
_io_stats: IOStatsRef,
) -> DaftResult<()> {
) -> crate::Result<()> {
let data = self.data.clone();
runtime_handle.spawn(async move {
for part in data {
let len = part.len();
let _ = destination.get_next_sender().send(part).await;
runtime_stats.mark_rows_emitted(len as u64);
}
Ok(())
});
runtime_handle.spawn(
async move {
for part in data {
let len = part.len();
let _ = destination.get_next_sender().send(part).await;
runtime_stats.mark_rows_emitted(len as u64);
}
Ok(())
},
self.name(),
);
Ok(())
}
fn name(&self) -> &'static str {
Expand Down
21 changes: 12 additions & 9 deletions src/daft-local-execution/src/sources/scan_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,22 @@ impl Source for ScanTaskSource {
runtime_handle: &mut ExecutionRuntimeHandle,
runtime_stats: Arc<RuntimeStatsContext>,
io_stats: IOStatsRef,
) -> DaftResult<()> {
) -> crate::Result<()> {
let morsel_size = DEFAULT_MORSEL_SIZE;
let maintain_order = destination.in_order();
for scan_task in self.scan_tasks.clone() {
let sender = destination.get_next_sender();
runtime_handle.spawn(Self::process_scan_task_stream(
scan_task,
sender,
morsel_size,
maintain_order,
io_stats.clone(),
runtime_stats.clone(),
));
runtime_handle.spawn(
Self::process_scan_task_stream(
scan_task,
sender,
morsel_size,
maintain_order,
io_stats.clone(),
runtime_stats.clone(),
),
self.name(),
);
}
Ok(())
}
Expand Down
6 changes: 2 additions & 4 deletions src/daft-local-execution/src/sources/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use daft_micropartition::MicroPartition;
use futures::stream::BoxStream;

use async_trait::async_trait;
use snafu::ResultExt;

use crate::{
channel::MultiSender, pipeline::PipelineNode, runtime_stats::RuntimeStatsContext,
Expand All @@ -24,8 +23,7 @@ pub(crate) trait Source: Send + Sync {
runtime_handle: &mut ExecutionRuntimeHandle,
runtime_stats: Arc<RuntimeStatsContext>,
io_stats: IOStatsRef,
) -> DaftResult<()>;
fn name(&self) -> &'static str;
) -> crate::Result<()>;
}

struct SourceNode {
Expand Down Expand Up @@ -78,7 +76,7 @@ impl PipelineNode for SourceNode {
&mut self,
destination: MultiSender,
runtime_handle: &mut ExecutionRuntimeHandle,
) -> DaftResult<()> {
) -> crate::Result<()> {
self.source.get_data(
destination,
runtime_handle,
Expand Down

0 comments on commit 99c299f

Please sign in to comment.