diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs
index 1be8c2e8221..2c5209018f9 100644
--- a/rust/datafusion/src/physical_plan/coalesce_batches.rs
+++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs
@@ -194,7 +194,8 @@ impl RecordBatchStream for CoalesceBatchesStream {
     }
 }
 
-fn concat_batches(
+/// Concatenates an array of `RecordBatch` into one batch
+pub fn concat_batches(
     schema: &SchemaRef,
     batches: &[RecordBatch],
     row_count: usize,
diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs
index 50800df9091..31056dc47d0 100644
--- a/rust/datafusion/src/physical_plan/hash_join.rs
+++ b/rust/datafusion/src/physical_plan/hash_join.rs
@@ -18,8 +18,14 @@
 //! Defines the join plan for executing partitions in parallel and then joining the results
 //! into a set of partitions.
 
-use arrow::array::{TimestampMicrosecondArray, TimestampNanosecondArray};
-use arrow::{array::ArrayRef, compute};
+use arrow::{
+    array::{ArrayRef, UInt64Builder},
+    compute,
+};
+use arrow::{
+    array::{TimestampMicrosecondArray, TimestampNanosecondArray, UInt32Builder},
+    datatypes::TimeUnit,
+};
 use std::sync::Arc;
 use std::time::Instant;
 use std::{any::Any, collections::HashSet};
@@ -29,8 +35,8 @@ use futures::{Stream, StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 use tokio::sync::Mutex;
 
-use arrow::array::{make_array, Array, MutableArrayData};
-use arrow::datatypes::{DataType, TimeUnit};
+use arrow::array::Array;
+use arrow::datatypes::DataType;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
@@ -48,23 +54,15 @@ use super::{
 use crate::error::{DataFusionError, Result};
 
 use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
+use crate::physical_plan::coalesce_batches::concat_batches;
 use ahash::RandomState;
 
 use log::debug;
 
-// An index of (batch, row) uniquely identifying a row in a part.
-type Index = (usize, usize);
-// A pair (left index, right index)
-// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different
-// as a left join may issue None indices, in which case
-type JoinIndex = Option<(usize, usize)>;
-// An index of row uniquely identifying a row in a batch
-type RightIndex = Option<usize>;
-
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;
+type JoinHashMap = HashMap<Vec<u8>, Vec<u64>, RandomState>;
+type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>;
 
 /// join execution plan executes partitions in parallel and combines them into a set of
 /// partitions.
@@ -84,6 +82,14 @@ pub struct HashJoinExec {
     build_side: Arc<Mutex<Option<JoinLeftData>>>,
 }
 
+/// Information about the index and placement (left or right) of the columns
+struct ColumnIndex {
+    /// Index of the column
+    index: usize,
+    /// Whether the column is at the left or right side
+    is_left: bool,
+}
+
 impl HashJoinExec {
     /// Tries to create a new [HashJoinExec].
     /// # Error
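The core of this change is the new JoinLeftData layout above: instead of keying the hash map to (batch, row) pairs spread over a Vec<RecordBatch>, the build side is concatenated into a single RecordBatch and the map stores plain u64 row offsets into it. A minimal std-only sketch of that layout (not the PR's code; hashbrown, ahash and create_key are replaced by std::collections::HashMap and a single i32 key for brevity):

use std::collections::HashMap;

fn main() {
    // Two build-side "batches" of join-key values, e.g. column `a` of two RecordBatches.
    let batches: Vec<Vec<i32>> = vec![vec![1, 2, 1], vec![2, 3]];

    // JoinHashMap analogue: encoded key bytes -> global row offsets of all rows with that key.
    let mut hash: HashMap<Vec<u8>, Vec<u64>> = HashMap::new();
    let mut offset = 0u64;
    for batch in &batches {
        for (row, key) in batch.iter().enumerate() {
            hash.entry(key.to_le_bytes().to_vec())
                .or_default()
                .push(offset + row as u64);
        }
        // Advance by the batch size, so the stored indices stay valid once the
        // batches are concatenated into a single RecordBatch.
        offset += batch.len() as u64;
    }

    // Key 2 occurs at global rows 1 (batch 0, row 1) and 3 (batch 1, row 0).
    assert_eq!(hash[&2i32.to_le_bytes().to_vec()], vec![1, 3]);
}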
@@ -119,6 +125,36 @@ impl HashJoinExec {
             build_side: Arc::new(Mutex::new(None)),
         })
     }
+
+    /// Calculates column indices and left/right placement on input / output schemas and jointype
+    fn column_indices_from_schema(&self) -> ArrowResult<Vec<ColumnIndex>> {
+        let (primary_is_left, primary_schema, secondary_schema) = match self.join_type {
+            JoinType::Inner | JoinType::Left => {
+                (true, self.left.schema(), self.right.schema())
+            }
+            JoinType::Right => (false, self.right.schema(), self.left.schema()),
+        };
+        let mut column_indices = Vec::with_capacity(self.schema.fields().len());
+        for field in self.schema.fields() {
+            let (is_primary, index) = match primary_schema.index_of(field.name()) {
+                    Ok(i) => Ok((true, i)),
+                    Err(_) => {
+                        match secondary_schema.index_of(field.name()) {
+                            Ok(i) => Ok((false, i)),
+                            _ => Err(DataFusionError::Internal(
+                                format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string()
+                            ))
+                        }
+                    }
+                }.map_err(DataFusionError::into_arrow_external_error)?;
+
+            let is_left =
+                is_primary && primary_is_left || !is_primary && !primary_is_left;
+            column_indices.push(ColumnIndex { index, is_left });
+        }
+
+        Ok(column_indices)
+    }
 }
 
 #[async_trait]
@@ -174,27 +210,29 @@ impl ExecutionPlan for HashJoinExec {
                     .iter()
                     .map(|on| on.0.clone())
                     .collect::<HashSet<_>>();
-
                 // This operation performs 2 steps at once:
                 // 1. creates a [JoinHashMap] of all batches from the stream
                 // 2. stores the batches in a vector.
                 let initial = (JoinHashMap::default(), Vec::new(), 0);
-                let left_data = stream
+                let (hashmap, batches, num_rows) = stream
                     .try_fold(initial, |mut acc, batch| async {
                         let hash = &mut acc.0;
                         let values = &mut acc.1;
-                        let index = acc.2;
-                        update_hash(&on_left, &batch, hash, index).unwrap();
+                        let offset = acc.2;
+                        update_hash(&on_left, &batch, hash, offset).unwrap();
+                        acc.2 += batch.num_rows();
                         values.push(batch);
-                        acc.2 += 1;
                         Ok(acc)
                     })
                     .await?;
 
-                let num_rows: usize =
-                    left_data.1.iter().map(|batch| batch.num_rows()).sum();
+                // Merge all batches into a single batch, so we
+                // can directly index into the arrays
+                let single_batch =
+                    concat_batches(&batches[0].schema(), &batches, num_rows)?;
+
+                let left_side = Arc::new((hashmap, single_batch));
 
-                let left_side = Arc::new((left_data.0, left_data.1));
                 *build_side = Some(left_side.clone());
 
                 debug!(
@@ -217,12 +255,15 @@ impl ExecutionPlan for HashJoinExec {
             .iter()
             .map(|on| on.1.clone())
             .collect::<HashSet<_>>();
+
+        let column_indices = self.column_indices_from_schema()?;
         Ok(Box::pin(HashJoinStream {
             schema: self.schema.clone(),
             on_right,
             join_type: self.join_type,
             left_data,
             right: stream,
+            column_indices,
             num_input_batches: 0,
             num_input_rows: 0,
             num_output_batches: 0,
@@ -238,7 +279,7 @@ fn update_hash(
     on: &HashSet<String>,
     batch: &RecordBatch,
     hash: &mut JoinHashMap,
-    index: usize,
+    offset: usize,
 ) -> Result<()> {
     // evaluate the keys
     let keys_values = on
@@ -254,8 +295,8 @@ fn update_hash(
 
         hash.raw_entry_mut()
             .from_key(&key)
-            .and_modify(|_, v| v.push((index, row)))
-            .or_insert_with(|| (key.clone(), vec![(index, row)]));
+            .and_modify(|_, v| v.push((row + offset) as u64))
+            .or_insert_with(|| (key.clone(), vec![(row + offset) as u64]));
     }
     Ok(())
 }
@@ -272,6 +313,8 @@ struct HashJoinStream {
     left_data: JoinLeftData,
     /// right
     right: SendableRecordBatchStream,
+    /// Information of index and left / right placement of columns
+    column_indices: Vec<ColumnIndex>,
     /// number of input batches
     num_input_batches: usize,
     /// number of input rows
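column_indices_from_schema resolves, once per execute() call, which input each output field comes from and at which position, so the probe loop no longer does per-batch name lookups. A simplified std-only sketch of that lookup (field names are illustrative, and the join-type-dependent primary/secondary ordering is ignored):

struct ColumnIndex {
    index: usize,
    is_left: bool,
}

fn column_indices(
    output: &[&str],
    left: &[&str],
    right: &[&str],
) -> Result<Vec<ColumnIndex>, String> {
    output
        .iter()
        .map(|name| {
            if let Some(index) = left.iter().position(|f| f == name) {
                Ok(ColumnIndex { index, is_left: true })
            } else if let Some(index) = right.iter().position(|f| f == name) {
                Ok(ColumnIndex { index, is_left: false })
            } else {
                Err(format!("column {} not found on either side of the join", name))
            }
        })
        .collect()
}

fn main() {
    let indices = column_indices(&["a", "b", "c"], &["a", "b"], &["b", "c"]).unwrap();
    // "c" comes from the right side, where it is column 1.
    assert!(indices[2].index == 1 && !indices[2].is_left);
}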
@@ -297,69 +340,28 @@ impl RecordBatchStream for HashJoinStream {
 /// *
 fn build_batch_from_indices(
     schema: &Schema,
-    left: &Vec<RecordBatch>,
+    left: &RecordBatch,
     right: &RecordBatch,
-    join_type: &JoinType,
-    indices: &[(JoinIndex, RightIndex)],
+    left_indices: UInt64Array,
+    right_indices: UInt32Array,
+    column_indices: &Vec<ColumnIndex>,
 ) -> ArrowResult<RecordBatch> {
-    if left.is_empty() {
-        todo!("Create empty record batch");
-    }
-
-    let (primary_is_left, primary_schema, secondary_schema) = match join_type {
-        JoinType::Inner | JoinType::Left => (true, left[0].schema(), right.schema()),
-        JoinType::Right => (false, right.schema(), left[0].schema()),
-    };
-
     // build the columns of the new [RecordBatch]:
     // 1. pick whether the column is from the left or right
-    // 2. based on the pick, `take` items from the different recordBatches
+    // 2. based on the pick, `take` items from the different RecordBatches
     let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(schema.fields().len());
 
-    let right_indices: UInt32Array =
-        indices.iter().map(|(_, join_index)| join_index).collect();
-
-    for field in schema.fields() {
-        // pick the column (left or right) based on the field name.
-        let (is_primary, column_index) = match primary_schema.index_of(field.name()) {
-            Ok(i) => Ok((true, i)),
-            Err(_) => {
-                match secondary_schema.index_of(field.name()) {
-                    Ok(i) => Ok((false, i)),
-                    _ => Err(DataFusionError::Internal(
-                        format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string()
-                    ))
-                }
-            }
-        }.map_err(DataFusionError::into_arrow_external_error)?;
-
-        let is_left =
-            (is_primary && primary_is_left) || (!is_primary && !primary_is_left);
-
-        let array = if is_left {
-            // Note that we take `.data_ref()` to gather the [ArrayData] of each array.
-            let arrays = left
-                .iter()
-                .map(|batch| batch.column(column_index).data_ref().as_ref())
-                .collect::<Vec<_>>();
-
-            let mut mutable = MutableArrayData::new(arrays, true, indices.len());
-            // use the left indices
-            for (join_index, _) in indices {
-                match join_index {
-                    Some((batch, row)) => mutable.extend(*batch, *row, *row + 1),
-                    None => mutable.extend_nulls(1),
-                }
-            }
-            make_array(Arc::new(mutable.freeze()))
+    for column_index in column_indices {
+        let array = if column_index.is_left {
+            let array = left.column(column_index.index);
+            compute::take(array.as_ref(), &left_indices, None)?
        } else {
-            // use the right indices
-            let array = right.column(column_index);
+            let array = right.column(column_index.index);
             compute::take(array.as_ref(), &right_indices, None)?
         };
         columns.push(array);
     }
-    Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
+    RecordBatch::try_new(Arc::new(schema.clone()), columns)
 }
 
 /// Create a key `Vec<u8>` that is used as key for the hashmap
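With the join indices materialized as Arrow arrays, each output column is now produced by a single call to the take kernel instead of a row-by-row MutableArrayData copy; a null left index simply yields a null output value, which is what the outer-join cases need. A small self-contained sketch of that step (data and columns are made up; the take and array calls mirror the ones used in this diff against the in-tree arrow crate):

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StringArray, UInt32Array, UInt64Array};
use arrow::compute;
use arrow::error::Result;

fn main() -> Result<()> {
    // One column of the (already concatenated) left batch and one of the right batch.
    let left_col: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
    let right_col: ArrayRef = Arc::new(StringArray::from(vec!["x", "y"]));

    // Join indices as build_join_indexes would emit them: right row 0 matched left
    // rows 0 and 2; right row 1 had no match, so its left index is null (RIGHT join).
    let left_indices = UInt64Array::from(vec![Some(0), Some(2), None]);
    let right_indices = UInt32Array::from(vec![Some(0), Some(0), Some(1)]);

    // One take call per output column replaces the old per-row copy loop.
    let left_out = compute::take(left_col.as_ref(), &left_indices, None)?;
    let right_out = compute::take(right_col.as_ref(), &right_indices, None)?;

    assert_eq!(left_out.len(), 3);
    assert_eq!(right_out.len(), 3);
    Ok(())
}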
@@ -374,23 +376,23 @@ pub(crate) fn create_key(
         match col.data_type() {
             DataType::UInt8 => {
                 let array = col.as_any().downcast_ref::<UInt8Array>().unwrap();
-                vec.extend(array.value(row).to_le_bytes().iter());
+                vec.extend_from_slice(&array.value(row).to_le_bytes());
             }
             DataType::UInt16 => {
                 let array = col.as_any().downcast_ref::<UInt16Array>().unwrap();
-                vec.extend(array.value(row).to_le_bytes().iter());
+                vec.extend_from_slice(&array.value(row).to_le_bytes());
             }
             DataType::UInt32 => {
                 let array = col.as_any().downcast_ref::<UInt32Array>().unwrap();
-                vec.extend(array.value(row).to_le_bytes().iter());
+                vec.extend_from_slice(&array.value(row).to_le_bytes());
             }
             DataType::UInt64 => {
                 let array = col.as_any().downcast_ref::<UInt64Array>().unwrap();
-                vec.extend(array.value(row).to_le_bytes().iter());
+                vec.extend_from_slice(&array.value(row).to_le_bytes());
             }
             DataType::Int8 => {
                 let array = col.as_any().downcast_ref::<Int8Array>().unwrap();
-                vec.extend(array.value(row).to_le_bytes().iter());
+                vec.extend_from_slice(&array.value(row).to_le_bytes());
             }
             DataType::Int16 => {
                 let array = col.as_any().downcast_ref::<Int16Array>().unwrap();
@@ -398,33 +400,33 @@
             }
             DataType::Int32 => {
                 let array = col.as_any().downcast_ref::<Int32Array>().unwrap();
-                vec.extend(array.value(row).to_le_bytes().iter());
+                vec.extend_from_slice(&array.value(row).to_le_bytes());
             }
             DataType::Int64 => {
                 let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
-                vec.extend(array.value(row).to_le_bytes().iter());
+                vec.extend_from_slice(&array.value(row).to_le_bytes());
             }
             DataType::Timestamp(TimeUnit::Microsecond, None) => {
                 let array = col
                     .as_any()
                     .downcast_ref::<TimestampMicrosecondArray>()
                     .unwrap();
-                vec.extend(array.value(row).to_le_bytes().iter());
+                vec.extend_from_slice(&array.value(row).to_le_bytes());
             }
             DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                 let array = col
                     .as_any()
                     .downcast_ref::<TimestampNanosecondArray>()
                     .unwrap();
-                vec.extend(array.value(row).to_le_bytes().iter());
+                vec.extend_from_slice(&array.value(row).to_le_bytes());
             }
             DataType::Utf8 => {
                 let array = col.as_any().downcast_ref::<StringArray>().unwrap();
                 let value = array.value(row);
                 // store the size
-                vec.extend(value.len().to_le_bytes().iter());
+                vec.extend_from_slice(&value.len().to_le_bytes());
                 // store the string value
-                vec.extend(array.value(row).as_bytes().iter());
+                vec.extend_from_slice(value.as_bytes());
             }
             _ => {
                 // This is internal because we should have caught this before.
@@ -441,12 +443,21 @@ fn build_batch(
     batch: &RecordBatch,
     left_data: &JoinLeftData,
     on_right: &HashSet<String>,
-    join_type: &JoinType,
+    join_type: JoinType,
     schema: &Schema,
+    column_indices: &Vec<ColumnIndex>,
 ) -> ArrowResult<RecordBatch> {
-    let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap();
-
-    build_batch_from_indices(schema, &left_data.1, batch, join_type, &indices)
+    let (left_indices, right_indices) =
+        build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap();
+
+    build_batch_from_indices(
+        schema,
+        &left_data.1,
+        batch,
+        left_indices,
+        right_indices,
+        column_indices,
+    )
 }
 
 /// returns a vector with (index from left, index from right).
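The switch from extend(iter) to extend_from_slice does not change the key layout: fixed-width values contribute their little-endian bytes and Utf8 values are length-prefixed so adjacent strings cannot collide. A std-only sketch of that encoding (a simplified stand-in for create_key, not the PR's code):

fn encode_key(ints: &[i32], strings: &[&str], key: &mut Vec<u8>) {
    key.clear();
    for v in ints {
        key.extend_from_slice(&v.to_le_bytes());
    }
    for s in strings {
        // store the size, then the bytes, mirroring the DataType::Utf8 arm
        key.extend_from_slice(&s.len().to_le_bytes());
        key.extend_from_slice(s.as_bytes());
    }
}

fn main() {
    let (mut k1, mut k2) = (Vec::new(), Vec::new());
    encode_key(&[1], &["ab", "c"], &mut k1);
    encode_key(&[1], &["a", "bc"], &mut k2);
    assert_ne!(k1, k2); // the length prefix keeps the concatenation unambiguous
}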
@@ -479,9 +490,9 @@ fn build_batch(
 fn build_join_indexes(
     left: &JoinHashMap,
     right: &RecordBatch,
-    join_type: &JoinType,
+    join_type: JoinType,
     right_on: &HashSet<String>,
-) -> Result<Vec<(JoinIndex, RightIndex)>> {
+) -> Result<(UInt64Array, UInt32Array)> {
     let keys_values = right_on
         .iter()
         .map(|name| Ok(col(name).evaluate(right)?.into_array(right.num_rows())))
@@ -489,75 +500,79 @@
 
     let mut key = Vec::with_capacity(keys_values.len());
 
+    let mut left_indices = UInt64Builder::new(0);
+    let mut right_indices = UInt32Builder::new(0);
+
     match join_type {
         JoinType::Inner => {
-            let mut indexes = Vec::new(); // unknown a prior size
-
             // Visit all of the right rows
             for row in 0..right.num_rows() {
                 // Get the key and find it in the build index
                 create_key(&keys_values, row, &mut key)?;
-                let left_indexes = left.get(&key);
 
                 // for every item on the left and right with this key, add the respective pair
-                left_indexes.unwrap_or(&vec![]).iter().for_each(|x| {
-                    // on an inner join, left and right indices are present
-                    indexes.push((Some(*x), Some(row as u32)));
-                })
+                if let Some(indices) = left.get(&key) {
+                    left_indices.append_slice(&indices)?;
+
+                    for _ in 0..indices.len() {
+                        right_indices.append_value(row as u32)?;
+                    }
+                }
             }
-            Ok(indexes)
+            Ok((left_indices.finish(), right_indices.finish()))
         }
         JoinType::Left => {
-            let mut indexes = Vec::new(); // unknown a prior size
-
             // Keep track of which item is visited in the build input
             // TODO: this can be stored more efficiently with a marker
+            // https://issues.apache.org/jira/browse/ARROW-11116
+            // TODO: Fix LEFT join with multiple right batches
+            // https://issues.apache.org/jira/browse/ARROW-10971
             let mut is_visited = HashSet::new();
 
             // First visit all of the rows
             for row in 0..right.num_rows() {
                 create_key(&keys_values, row, &mut key)?;
-                let left_indexes = left.get(&key);
-                if let Some(indices) = left_indexes {
+                if let Some(indices) = left.get(&key) {
                     is_visited.insert(key.clone());
-
-                    indices.iter().for_each(|x| {
-                        indexes.push((Some(*x), Some(row as u32)));
-                    })
+                    left_indices.append_slice(&indices)?;
+                    for _ in 0..indices.len() {
+                        right_indices.append_value(row as u32)?;
+                    }
                 };
             }
 
             // Add the remaining left rows to the result set with None on the right side
             for (key, indices) in left {
                 if !is_visited.contains(key) {
-                    indices.iter().for_each(|x| {
-                        indexes.push((Some(*x), None));
-                    });
+                    left_indices.append_slice(&indices)?;
+                    for _ in 0..indices.len() {
+                        right_indices.append_null()?;
+                    }
                 }
             }
-            Ok(indexes)
+            Ok((left_indices.finish(), right_indices.finish()))
        }
        JoinType::Right => {
-            let mut indexes = Vec::new(); // unknown a prior size
             for row in 0..right.num_rows() {
                 create_key(&keys_values, row, &mut key)?;
-                let left_indices = left.get(&key);
-
-                match left_indices {
+                match left.get(&key) {
                     Some(indices) => {
-                        indices.iter().for_each(|x| {
-                            indexes.push((Some(*x), Some(row as u32)));
-                        });
+                        left_indices.append_slice(&indices)?;
+
+                        for _ in 0..indices.len() {
+                            right_indices.append_value(row as u32)?;
+                        }
                     }
                     None => {
                         // when no match, add the row with None for the left side
-                        indexes.push((None, Some(row as u32)));
+                        left_indices.append_null()?;
+                        right_indices.append_value(row as u32)?;
                     }
                 }
             }
-            Ok(indexes)
+            Ok((left_indices.finish(), right_indices.finish()))
         }
     }
 }
@@ -578,8 +593,9 @@ impl Stream for HashJoinStream {
                         &batch,
                         &self.left_data,
                         &self.on_right,
-                        &self.join_type,
+                        self.join_type,
                         &self.schema,
+                        &self.column_indices,
                     );
                     self.num_input_batches += 1;
                     self.num_input_rows += batch.num_rows();
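build_join_indexes now returns the pair of index arrays directly, built with Arrow builders in one pass over the probe batch. A simplified sketch of the inner-join case (a std HashMap stands in for JoinHashMap and key encoding is assumed to be done by the caller; the builder calls mirror those in the diff):

use std::collections::HashMap;

use arrow::array::{UInt32Array, UInt32Builder, UInt64Array, UInt64Builder};
use arrow::error::Result;

fn inner_join_indices(
    left: &HashMap<Vec<u8>, Vec<u64>>,
    right_keys: &[Vec<u8>],
) -> Result<(UInt64Array, UInt32Array)> {
    let mut left_indices = UInt64Builder::new(0);
    let mut right_indices = UInt32Builder::new(0);
    for (row, key) in right_keys.iter().enumerate() {
        if let Some(indices) = left.get(key) {
            // every matching build-side row pairs with this probe row
            left_indices.append_slice(indices)?;
            for _ in 0..indices.len() {
                right_indices.append_value(row as u32)?;
            }
        }
    }
    Ok((left_indices.finish(), right_indices.finish()))
}

fn main() -> Result<()> {
    let mut left = HashMap::new();
    left.insert(7i32.to_le_bytes().to_vec(), vec![0u64, 2]);
    let right_keys = vec![7i32.to_le_bytes().to_vec(), 9i32.to_le_bytes().to_vec()];

    let (left_idx, right_idx) = inner_join_indices(&left, &right_keys)?;
    assert_eq!(left_idx.len(), 2); // left rows 0 and 2
    assert_eq!(right_idx.len(), 2); // probe row 0, twice
    Ok(())
}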