diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 8a3ee4b09ba..a9d5963cc40 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -21,6 +21,7 @@ use arrow::array::{TimestampMicrosecondArray, TimestampNanosecondArray}; use arrow::{array::ArrayRef, compute}; use std::sync::Arc; +use std::time::Instant; use std::{any::Any, collections::HashSet}; use async_trait::async_trait; @@ -48,6 +49,7 @@ use crate::error::{DataFusionError, Result}; use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream}; use ahash::RandomState; +use log::debug; // An index of (batch, row) uniquely identifying a row in a part. type Index = (usize, usize); @@ -161,6 +163,8 @@ impl ExecutionPlan for HashJoinExec { match build_side.as_ref() { Some(stream) => stream.clone(), None => { + let start = Instant::now(); + // merge all left parts into a single stream let merge = MergeExec::new(self.left.clone()); let stream = merge.execute(0).await?; @@ -187,8 +191,18 @@ impl ExecutionPlan for HashJoinExec { }) .await?; + let num_rows: usize = + left_data.1.iter().map(|batch| batch.num_rows()).sum(); + let left_side = Arc::new((left_data.0, left_data.1)); *build_side = Some(left_side.clone()); + + debug!( + "Built build-side of hash join containing {} rows in {} ms", + num_rows, + start.elapsed().as_millis() + ); + left_side } } @@ -209,6 +223,11 @@ impl ExecutionPlan for HashJoinExec { join_type: self.join_type, left_data, right: stream, + num_input_batches: 0, + num_input_rows: 0, + num_output_batches: 0, + num_output_rows: 0, + join_time: 0, })) } } @@ -253,6 +272,16 @@ struct HashJoinStream { left_data: JoinLeftData, /// right right: SendableRecordBatchStream, + /// number of input batches + num_input_batches: usize, + /// number of input rows + num_input_rows: usize, + /// number of batches produced + num_output_batches: usize, + /// number of rows produced + num_output_rows: usize, + /// total time for joining probe-side batches to the build-side batches + join_time: usize, } impl RecordBatchStream for HashJoinStream { @@ -542,14 +571,36 @@ impl Stream for HashJoinStream { self.right .poll_next_unpin(cx) .map(|maybe_batch| match maybe_batch { - Some(Ok(batch)) => Some(build_batch( - &batch, - &self.left_data, - &self.on_right, - &self.join_type, - &self.schema, - )), - other => other, + Some(Ok(batch)) => { + let start = Instant::now(); + let result = build_batch( + &batch, + &self.left_data, + &self.on_right, + &self.join_type, + &self.schema, + ); + self.num_input_batches += 1; + self.num_input_rows += batch.num_rows(); + if let Ok(ref batch) = result { + self.join_time += start.elapsed().as_millis() as usize; + self.num_output_batches += 1; + self.num_output_rows += batch.num_rows(); + } + Some(result) + } + other => { + debug!( + "Processed {} probe-side input batches containing {} rows and \ + produced {} output batches containing {} rows in {} ms", + self.num_input_batches, + self.num_input_rows, + self.num_output_batches, + self.num_output_rows, + self.join_time + ); + other + } }) } }