Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 59 additions & 8 deletions rust/datafusion/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use arrow::array::{TimestampMicrosecondArray, TimestampNanosecondArray};
use arrow::{array::ArrayRef, compute};
use std::sync::Arc;
use std::time::Instant;
use std::{any::Any, collections::HashSet};

use async_trait::async_trait;
Expand Down Expand Up @@ -48,6 +49,7 @@ use crate::error::{DataFusionError, Result};

use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
use ahash::RandomState;
use log::debug;

// An index identifying a single row within one part of the join input:
// the first element is the batch index within the part, the second is the
// row index within that batch. Together they uniquely identify a row.
type Index = (usize, usize);
Expand Down Expand Up @@ -161,6 +163,8 @@ impl ExecutionPlan for HashJoinExec {
match build_side.as_ref() {
Some(stream) => stream.clone(),
None => {
let start = Instant::now();

// merge all left parts into a single stream
let merge = MergeExec::new(self.left.clone());
let stream = merge.execute(0).await?;
Expand All @@ -187,8 +191,18 @@ impl ExecutionPlan for HashJoinExec {
})
.await?;

let num_rows: usize =
left_data.1.iter().map(|batch| batch.num_rows()).sum();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we want to put this behind a flag, so that we don't compute `num_rows` when debug logging is not enabled?


let left_side = Arc::new((left_data.0, left_data.1));
*build_side = Some(left_side.clone());

debug!(
"Built build-side of hash join containing {} rows in {} ms",
num_rows,
start.elapsed().as_millis()
);

left_side
}
}
Expand All @@ -209,6 +223,11 @@ impl ExecutionPlan for HashJoinExec {
join_type: self.join_type,
left_data,
right: stream,
num_input_batches: 0,
num_input_rows: 0,
num_output_batches: 0,
num_output_rows: 0,
join_time: 0,
}))
}
}
Expand Down Expand Up @@ -253,6 +272,16 @@ struct HashJoinStream {
left_data: JoinLeftData,
/// right
right: SendableRecordBatchStream,
/// number of input batches
num_input_batches: usize,
/// number of input rows
num_input_rows: usize,
/// number of batches produced
num_output_batches: usize,
/// number of rows produced
num_output_rows: usize,
/// total time for joining probe-side batches to the build-side batches
join_time: usize,
}

impl RecordBatchStream for HashJoinStream {
Expand Down Expand Up @@ -542,14 +571,36 @@ impl Stream for HashJoinStream {
self.right
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
Some(Ok(batch)) => Some(build_batch(
&batch,
&self.left_data,
&self.on_right,
&self.join_type,
&self.schema,
)),
other => other,
Some(Ok(batch)) => {
let start = Instant::now();
let result = build_batch(
&batch,
&self.left_data,
&self.on_right,
&self.join_type,
&self.schema,
);
self.num_input_batches += 1;
self.num_input_rows += batch.num_rows();
if let Ok(ref batch) = result {
self.join_time += start.elapsed().as_millis() as usize;
self.num_output_batches += 1;
self.num_output_rows += batch.num_rows();
}
Some(result)
}
other => {
debug!(
"Processed {} probe-side input batches containing {} rows and \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can make this a bit less verbose? Processed {} rows probe-side and {} output rows in {} ms or something like that?

produced {} output batches containing {} rows in {} ms",
self.num_input_batches,
self.num_input_rows,
self.num_output_batches,
self.num_output_rows,
self.join_time
);
other
}
})
}
}
Expand Down