From 19c84d784d5e9b9ab901e33a3a20011c00bd72ba Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 31 Dec 2020 14:22:59 +0100 Subject: [PATCH 01/19] Calculate column indices upfront --- .../datafusion/src/physical_plan/hash_join.rs | 84 ++++++++++++------- 1 file changed, 54 insertions(+), 30 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 9ac7447a8ab..2cf82d21e65 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -202,12 +202,20 @@ impl ExecutionPlan for HashJoinExec { .iter() .map(|on| on.1.clone()) .collect::>(); + + let column_indices = column_indices_from_schema( + &self.left.schema(), + &self.right.schema(), + &self.schema(), + self.join_type, + )?; Ok(Box::pin(HashJoinStream { schema: self.schema.clone(), on_right, join_type: self.join_type, left_data, right: stream, + column_indices, })) } } @@ -252,6 +260,8 @@ struct HashJoinStream { left_data: JoinLeftData, /// right right: SendableRecordBatchStream, + /// Information of index and left / right placement of columns + column_indices: Vec<(usize, bool)>, } impl RecordBatchStream for HashJoinStream { @@ -260,6 +270,39 @@ impl RecordBatchStream for HashJoinStream { } } +/// Calculates column index based on input / output schemas and jointype +fn column_indices_from_schema( + left: &Schema, + right: &Schema, + output_schema: &Schema, + join_type: JoinType, +) -> ArrowResult> { + let (primary_is_left, primary_schema, secondary_schema) = match join_type { + JoinType::Inner | JoinType::Left => (true, left, right), + JoinType::Right => (false, right, left), + }; + let mut column_indices = vec![]; + for field in output_schema.fields() { + let (is_primary, column_index) = match primary_schema.index_of(field.name()) { + Ok(i) => Ok((true, i)), + Err(_) => { + match secondary_schema.index_of(field.name()) { + Ok(i) => Ok((false, i)), + _ => Err(DataFusionError::Internal( + format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() + )) + } + } + }.map_err(DataFusionError::into_arrow_external_error)?; + + let is_left = + (is_primary && primary_is_left) || (!is_primary && !primary_is_left); + column_indices.push((column_index, is_left)); + } + + Ok(column_indices) +} + /// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`. /// The resulting batch has [Schema] `schema`. /// # Error @@ -269,18 +312,13 @@ fn build_batch_from_indices( schema: &Schema, left: &Vec, right: &RecordBatch, - join_type: &JoinType, indices: &[(JoinIndex, RightIndex)], + column_indices: &Vec<(usize, bool)>, ) -> ArrowResult { if left.is_empty() { todo!("Create empty record batch"); } - let (primary_is_left, primary_schema, secondary_schema) = match join_type { - JoinType::Inner | JoinType::Left => (true, left[0].schema(), right.schema()), - JoinType::Right => (false, right.schema(), left[0].schema()), - }; - // build the columns of the new [RecordBatch]: // 1. pick whether the column is from the left or right // 2. based on the pick, `take` items from the different recordBatches @@ -288,28 +326,12 @@ fn build_batch_from_indices( let right_indices = indices.iter().map(|(_, join_index)| join_index).collect(); - for field in schema.fields() { - // pick the column (left or right) based on the field name. 
- let (is_primary, column_index) = match primary_schema.index_of(field.name()) { - Ok(i) => Ok((true, i)), - Err(_) => { - match secondary_schema.index_of(field.name()) { - Ok(i) => Ok((false, i)), - _ => Err(DataFusionError::Internal( - format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() - )) - } - } - }.map_err(DataFusionError::into_arrow_external_error)?; - - let is_left = - (is_primary && primary_is_left) || (!is_primary && !primary_is_left); - - let array = if is_left { + for (column_index, is_left) in column_indices { + let array = if *is_left { // Note that we take `.data_ref()` to gather the [ArrayData] of each array. let arrays = left .iter() - .map(|batch| batch.column(column_index).data_ref().as_ref()) + .map(|batch| batch.column(*column_index).data_ref().as_ref()) .collect::>(); let mut mutable = MutableArrayData::new(arrays, true, indices.len()); @@ -323,7 +345,7 @@ fn build_batch_from_indices( make_array(Arc::new(mutable.freeze())) } else { // use the right indices - let array = right.column(column_index); + let array = right.column(*column_index); compute::take(array.as_ref(), &right_indices, None)? }; columns.push(array); @@ -396,12 +418,13 @@ fn build_batch( batch: &RecordBatch, left_data: &JoinLeftData, on_right: &HashSet, - join_type: &JoinType, + join_type: JoinType, schema: &Schema, + column_indices: &Vec<(usize, bool)>, ) -> ArrowResult { let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap(); - build_batch_from_indices(schema, &left_data.1, batch, join_type, &indices) + build_batch_from_indices(schema, &left_data.1, batch, &indices, column_indices) } /// returns a vector with (index from left, index from right). @@ -434,7 +457,7 @@ fn build_batch( fn build_join_indexes( left: &JoinHashMap, right: &RecordBatch, - join_type: &JoinType, + join_type: JoinType, right_on: &HashSet, ) -> Result> { let keys_values = right_on @@ -531,8 +554,9 @@ impl Stream for HashJoinStream { &batch, &self.left_data, &self.on_right, - &self.join_type, + self.join_type, &self.schema, + &self.column_indices, )), other => other, }) From 45fa819569442951694fdb9cbb2a1dd223e41fcf Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 31 Dec 2020 15:58:04 +0100 Subject: [PATCH 02/19] Make function part of HashJoinExec --- .../datafusion/src/physical_plan/hash_join.rs | 70 ++++++++----------- 1 file changed, 31 insertions(+), 39 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 2cf82d21e65..566152f27b9 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -116,6 +116,36 @@ impl HashJoinExec { build_side: Arc::new(Mutex::new(None)), }) } + + /// Calculates column indices and left/right placement on input / output schemas and jointype + fn column_indices_from_schema(&self) -> ArrowResult> { + let (primary_is_left, primary_schema, secondary_schema) = match self.join_type { + JoinType::Inner | JoinType::Left => { + (true, self.left.schema(), self.right.schema()) + } + JoinType::Right => (false, self.right.schema(), self.left.schema()), + }; + let mut column_indices = vec![]; + for field in self.schema.fields() { + let (is_primary, column_index) = match primary_schema.index_of(field.name()) { + Ok(i) => Ok((true, i)), + Err(_) => { + match secondary_schema.index_of(field.name()) { + Ok(i) => Ok((false, i)), + _ => Err(DataFusionError::Internal( + 
format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() + )) + } + } + }.map_err(DataFusionError::into_arrow_external_error)?; + + let is_left = + is_primary && primary_is_left || !is_primary && !primary_is_left; + column_indices.push((column_index, is_left)); + } + + Ok(column_indices) + } } #[async_trait] @@ -203,12 +233,7 @@ impl ExecutionPlan for HashJoinExec { .map(|on| on.1.clone()) .collect::>(); - let column_indices = column_indices_from_schema( - &self.left.schema(), - &self.right.schema(), - &self.schema(), - self.join_type, - )?; + let column_indices = self.column_indices_from_schema()?; Ok(Box::pin(HashJoinStream { schema: self.schema.clone(), on_right, @@ -270,39 +295,6 @@ impl RecordBatchStream for HashJoinStream { } } -/// Calculates column index based on input / output schemas and jointype -fn column_indices_from_schema( - left: &Schema, - right: &Schema, - output_schema: &Schema, - join_type: JoinType, -) -> ArrowResult> { - let (primary_is_left, primary_schema, secondary_schema) = match join_type { - JoinType::Inner | JoinType::Left => (true, left, right), - JoinType::Right => (false, right, left), - }; - let mut column_indices = vec![]; - for field in output_schema.fields() { - let (is_primary, column_index) = match primary_schema.index_of(field.name()) { - Ok(i) => Ok((true, i)), - Err(_) => { - match secondary_schema.index_of(field.name()) { - Ok(i) => Ok((false, i)), - _ => Err(DataFusionError::Internal( - format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string() - )) - } - } - }.map_err(DataFusionError::into_arrow_external_error)?; - - let is_left = - (is_primary && primary_is_left) || (!is_primary && !primary_is_left); - column_indices.push((column_index, is_left)); - } - - Ok(column_indices) -} - /// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`. /// The resulting batch has [Schema] `schema`. /// # Error From baa5dcea80392f8f575ff1cf723c121dc6cdc9e6 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 12:56:03 +0100 Subject: [PATCH 03/19] Small refactor by using struct --- .../datafusion/src/physical_plan/hash_join.rs | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 566152f27b9..82032d5f133 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -81,6 +81,11 @@ pub struct HashJoinExec { build_side: Arc>>, } +struct ColumnIndex { + index: usize, + is_left: bool, +} + impl HashJoinExec { /// Tries to create a new [HashJoinExec]. 
/// # Error @@ -118,16 +123,16 @@ impl HashJoinExec { } /// Calculates column indices and left/right placement on input / output schemas and jointype - fn column_indices_from_schema(&self) -> ArrowResult> { + fn column_indices_from_schema(&self) -> ArrowResult> { let (primary_is_left, primary_schema, secondary_schema) = match self.join_type { JoinType::Inner | JoinType::Left => { (true, self.left.schema(), self.right.schema()) } JoinType::Right => (false, self.right.schema(), self.left.schema()), }; - let mut column_indices = vec![]; + let mut column_indices = Vec::with_capacity(self.schema.fields().len()); for field in self.schema.fields() { - let (is_primary, column_index) = match primary_schema.index_of(field.name()) { + let (is_primary, index) = match primary_schema.index_of(field.name()) { Ok(i) => Ok((true, i)), Err(_) => { match secondary_schema.index_of(field.name()) { @@ -141,7 +146,7 @@ impl HashJoinExec { let is_left = is_primary && primary_is_left || !is_primary && !primary_is_left; - column_indices.push((column_index, is_left)); + column_indices.push(ColumnIndex { index, is_left }); } Ok(column_indices) @@ -286,7 +291,7 @@ struct HashJoinStream { /// right right: SendableRecordBatchStream, /// Information of index and left / right placement of columns - column_indices: Vec<(usize, bool)>, + column_indices: Vec, } impl RecordBatchStream for HashJoinStream { @@ -305,7 +310,7 @@ fn build_batch_from_indices( left: &Vec, right: &RecordBatch, indices: &[(JoinIndex, RightIndex)], - column_indices: &Vec<(usize, bool)>, + column_indices: &Vec, ) -> ArrowResult { if left.is_empty() { todo!("Create empty record batch"); @@ -318,12 +323,12 @@ fn build_batch_from_indices( let right_indices = indices.iter().map(|(_, join_index)| join_index).collect(); - for (column_index, is_left) in column_indices { - let array = if *is_left { + for column_index in column_indices { + let array = if column_index.is_left { // Note that we take `.data_ref()` to gather the [ArrayData] of each array. let arrays = left .iter() - .map(|batch| batch.column(*column_index).data_ref().as_ref()) + .map(|batch| batch.column(column_index.index).data_ref().as_ref()) .collect::>(); let mut mutable = MutableArrayData::new(arrays, true, indices.len()); @@ -337,7 +342,7 @@ fn build_batch_from_indices( make_array(Arc::new(mutable.freeze())) } else { // use the right indices - let array = right.column(*column_index); + let array = right.column(column_index.index); compute::take(array.as_ref(), &right_indices, None)? 
}; columns.push(array); @@ -412,7 +417,7 @@ fn build_batch( on_right: &HashSet, join_type: JoinType, schema: &Schema, - column_indices: &Vec<(usize, bool)>, + column_indices: &Vec, ) -> ArrowResult { let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap(); From 5b6bf8316cc692ea01f9a0f185e1eb2d14994083 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 13:11:49 +0100 Subject: [PATCH 04/19] Add comments --- rust/datafusion/src/physical_plan/hash_join.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 82032d5f133..d6b3b937004 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -81,8 +81,11 @@ pub struct HashJoinExec { build_side: Arc>>, } +/// Information about the index and placement (left or right) of the columns struct ColumnIndex { + /// Index of the column index: usize, + /// Whether the column is at the left or right side is_left: bool, } From 38d465121cd2a37ba4998bca223b39f90ec1307d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 13:27:09 +0100 Subject: [PATCH 05/19] Experiment with single batch --- rust/datafusion/src/physical_plan/coalesce_batches.rs | 2 +- rust/datafusion/src/physical_plan/hash_join.rs | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs index 1be8c2e8221..8ab2fdf6d3c 100644 --- a/rust/datafusion/src/physical_plan/coalesce_batches.rs +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -194,7 +194,7 @@ impl RecordBatchStream for CoalesceBatchesStream { } } -fn concat_batches( +pub fn concat_batches( schema: &SchemaRef, batches: &[RecordBatch], row_count: usize, diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index d6b3b937004..47e614a2dce 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -46,6 +46,7 @@ use super::{ use crate::error::{DataFusionError, Result}; use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream}; +use crate::physical_plan::coalesce_batches::concat_batches; use ahash::RandomState; // An index of (batch, row) uniquely identifying a row in a part. @@ -83,7 +84,7 @@ pub struct HashJoinExec { /// Information about the index and placement (left or right) of the columns struct ColumnIndex { - /// Index of the column + /// Index of the column index: usize, /// Whether the column is at the left or right side is_left: bool, @@ -207,24 +208,24 @@ impl ExecutionPlan for HashJoinExec { .iter() .map(|on| on.0.clone()) .collect::>(); - // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. 
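To make the shape of this build phase concrete, here is a minimal, dependency-free sketch of what the fold converges on over the next few patches: hash every build-side row under a running offset and keep the batches so they can later be treated as one concatenated batch. A plain Vec<i32> stands in for the join-key column of a RecordBatch, and all names are illustrative rather than the actual DataFusion API.

use std::collections::HashMap;

/// Build a key -> global-row-indices map over several "batches", carrying a running
/// row offset so the indices address the batches as if they were already one batch.
fn build_side(batches: &[Vec<i32>]) -> (HashMap<i32, Vec<u64>>, Vec<i32>) {
    let mut hash: HashMap<i32, Vec<u64>> = HashMap::new();
    let mut offset = 0u64;
    for batch in batches {
        for (row, key) in batch.iter().enumerate() {
            hash.entry(*key).or_default().push(offset + row as u64);
        }
        offset += batch.len() as u64;
    }
    // Concatenate all batches into one so the global indices can be used directly.
    let single: Vec<i32> = batches.iter().flatten().copied().collect();
    (hash, single)
}

fn main() {
    let (hash, single) = build_side(&[vec![1, 2], vec![2, 3]]);
    assert_eq!(single, vec![1, 2, 2, 3]);
    assert_eq!(hash[&2], vec![1, 2]); // key 2 sits at global rows 1 and 2
}
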
let initial = (JoinHashMap::default(), Vec::new(), 0); - let left_data = stream + let (hashmap, batches, _len) = stream .try_fold(initial, |mut acc, batch| async { let hash = &mut acc.0; let values = &mut acc.1; let index = acc.2; - update_hash(&on_left, &batch, hash, index).unwrap(); + update_hash(&on_left, &batch, hash, 0).unwrap(); values.push(batch); acc.2 += 1; Ok(acc) }) .await?; + let single_batch = vec![concat_batches(&batches[0].schema(), &batches, 32768)?]; - let left_side = Arc::new((left_data.0, left_data.1)); + let left_side = Arc::new((hashmap, single_batch)); *build_side = Some(left_side.clone()); left_side } From 54b9c3ae2de2dedbcfaa6fe9e3faa61d8ca5c73a Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 13:34:28 +0100 Subject: [PATCH 06/19] Use index as offset --- rust/datafusion/src/physical_plan/hash_join.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 47e614a2dce..c5ea87dfc57 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -217,9 +217,9 @@ impl ExecutionPlan for HashJoinExec { let hash = &mut acc.0; let values = &mut acc.1; let index = acc.2; - update_hash(&on_left, &batch, hash, 0).unwrap(); + update_hash(&on_left, &batch, hash, index).unwrap(); + acc.2 += batch.num_rows(); values.push(batch); - acc.2 += 1; Ok(acc) }) .await?; @@ -276,8 +276,8 @@ fn update_hash( hash.raw_entry_mut() .from_key(&key) - .and_modify(|_, v| v.push((index, row))) - .or_insert_with(|| (key.clone(), vec![(index, row)])); + .and_modify(|_, v| v.push((0, row + index))) + .or_insert_with(|| (key.clone(), vec![(0, row + index)])); } Ok(()) } From 2db270c303a5879687f27120f15b18704ec599f0 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 14:01:11 +0100 Subject: [PATCH 07/19] Use rows --- rust/datafusion/src/physical_plan/hash_join.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index c5ea87dfc57..4fe9f67cfca 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -212,7 +212,7 @@ impl ExecutionPlan for HashJoinExec { // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. 
let initial = (JoinHashMap::default(), Vec::new(), 0); - let (hashmap, batches, _len) = stream + let (hashmap, batches, num_rows) = stream .try_fold(initial, |mut acc, batch| async { let hash = &mut acc.0; let values = &mut acc.1; @@ -223,7 +223,9 @@ impl ExecutionPlan for HashJoinExec { Ok(acc) }) .await?; - let single_batch = vec![concat_batches(&batches[0].schema(), &batches, 32768)?]; + + let single_batch = + vec![concat_batches(&batches[0].schema(), &batches, num_rows)?]; let left_side = Arc::new((hashmap, single_batch)); *build_side = Some(left_side.clone()); From 5842c730048c91812e7eb10e69590e25ff0411d3 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 14:57:05 +0100 Subject: [PATCH 08/19] Refactor with single batch --- .../datafusion/src/physical_plan/hash_join.rs | 45 +++++++------------ 1 file changed, 17 insertions(+), 28 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 4fe9f67cfca..33150588feb 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -27,7 +27,7 @@ use futures::{Stream, StreamExt, TryStreamExt}; use hashbrown::HashMap; use tokio::sync::Mutex; -use arrow::array::{make_array, Array, MutableArrayData}; +use arrow::array::Array; use arrow::datatypes::DataType; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::Result as ArrowResult; @@ -49,12 +49,12 @@ use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchS use crate::physical_plan::coalesce_batches::concat_batches; use ahash::RandomState; -// An index of (batch, row) uniquely identifying a row in a part. -type Index = (usize, usize); +// An index of uniquely identifying a row. +type Index = usize; // A pair (left index, right index) // Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different // as a left join may issue None indices, in which case -type JoinIndex = Option<(usize, usize)>; +type JoinIndex = Option; // An index of row uniquely identifying a row in a batch type RightIndex = Option; @@ -62,7 +62,7 @@ type RightIndex = Option; // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true // for rows 3 and 8 from batch 0 and row 6 from batch 1. type JoinHashMap = HashMap, Vec, RandomState>; -type JoinLeftData = Arc<(JoinHashMap, Vec)>; +type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>; /// join execution plan executes partitions in parallel and combines them into a set of /// partitions. @@ -225,7 +225,7 @@ impl ExecutionPlan for HashJoinExec { .await?; let single_batch = - vec![concat_batches(&batches[0].schema(), &batches, num_rows)?]; + concat_batches(&batches[0].schema(), &batches, num_rows)?; let left_side = Arc::new((hashmap, single_batch)); *build_side = Some(left_side.clone()); @@ -278,8 +278,8 @@ fn update_hash( hash.raw_entry_mut() .from_key(&key) - .and_modify(|_, v| v.push((0, row + index))) - .or_insert_with(|| (key.clone(), vec![(0, row + index)])); + .and_modify(|_, v| v.push(row + index)) + .or_insert_with(|| (key.clone(), vec![row + index])); } Ok(()) } @@ -313,39 +313,28 @@ impl RecordBatchStream for HashJoinStream { /// * fn build_batch_from_indices( schema: &Schema, - left: &Vec, + left: &RecordBatch, right: &RecordBatch, indices: &[(JoinIndex, RightIndex)], column_indices: &Vec, ) -> ArrowResult { - if left.is_empty() { - todo!("Create empty record batch"); - } - // build the columns of the new [RecordBatch]: // 1. 
pick whether the column is from the left or right // 2. based on the pick, `take` items from the different recordBatches let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); + // TODO: u64 + let left_indices = indices + .iter() + .map(|(left_index, _)| left_index.map(|x| x as u32)) + .collect(); + let right_indices = indices.iter().map(|(_, join_index)| join_index).collect(); for column_index in column_indices { let array = if column_index.is_left { - // Note that we take `.data_ref()` to gather the [ArrayData] of each array. - let arrays = left - .iter() - .map(|batch| batch.column(column_index.index).data_ref().as_ref()) - .collect::>(); - - let mut mutable = MutableArrayData::new(arrays, true, indices.len()); - // use the left indices - for (join_index, _) in indices { - match join_index { - Some((batch, row)) => mutable.extend(*batch, *row, *row + 1), - None => mutable.extend_nulls(1), - } - } - make_array(Arc::new(mutable.freeze())) + let array = left.column(column_index.index); + compute::take(array.as_ref(), &left_indices, None)? } else { // use the right indices let array = right.column(column_index.index); From 1511f0eb770f3332d6559bb141dec10df68d39ee Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 1 Jan 2021 19:45:40 +0100 Subject: [PATCH 09/19] Refactor --- .../datafusion/src/physical_plan/hash_join.rs | 96 +++++++++---------- 1 file changed, 47 insertions(+), 49 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 33150588feb..22be430e69f 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -18,7 +18,10 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. -use arrow::{array::ArrayRef, compute}; +use arrow::{ + array::{ArrayRef, UInt32Builder}, + compute, +}; use std::sync::Arc; use std::{any::Any, collections::HashSet}; @@ -49,19 +52,10 @@ use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchS use crate::physical_plan::coalesce_batches::concat_batches; use ahash::RandomState; -// An index of uniquely identifying a row. -type Index = usize; -// A pair (left index, right index) -// Note that while this is currently equal to `Index`, the `JoinIndex` is semantically different -// as a left join may issue None indices, in which case -type JoinIndex = Option; -// An index of row uniquely identifying a row in a batch -type RightIndex = Option; - // Maps ["on" value] -> [list of indices with this key's value] // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true // for rows 3 and 8 from batch 0 and row 6 from batch 1. -type JoinHashMap = HashMap, Vec, RandomState>; +type JoinHashMap = HashMap, Vec, RandomState>; type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>; /// join execution plan executes partitions in parallel and combines them into a set of @@ -315,22 +309,15 @@ fn build_batch_from_indices( schema: &Schema, left: &RecordBatch, right: &RecordBatch, - indices: &[(JoinIndex, RightIndex)], + left_indices: UInt32Array, + right_indices: UInt32Array, column_indices: &Vec, ) -> ArrowResult { // build the columns of the new [RecordBatch]: // 1. pick whether the column is from the left or right - // 2. based on the pick, `take` items from the different recordBatches + // 2. 
based on the pick, `take` items from the different RecordBatches let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); - // TODO: u64 - let left_indices = indices - .iter() - .map(|(left_index, _)| left_index.map(|x| x as u32)) - .collect(); - - let right_indices = indices.iter().map(|(_, join_index)| join_index).collect(); - for column_index in column_indices { let array = if column_index.is_left { let array = left.column(column_index.index); @@ -342,7 +329,7 @@ fn build_batch_from_indices( }; columns.push(array); } - Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) + RecordBatch::try_new(Arc::new(schema.clone()), columns) } /// Create a key `Vec` that is used as key for the hashmap @@ -414,9 +401,17 @@ fn build_batch( schema: &Schema, column_indices: &Vec, ) -> ArrowResult { - let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap(); - - build_batch_from_indices(schema, &left_data.1, batch, &indices, column_indices) + let (left_indices, right_indices) = + build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap(); + + build_batch_from_indices( + schema, + &left_data.1, + batch, + left_indices, + right_indices, + column_indices, + ) } /// returns a vector with (index from left, index from right). @@ -451,7 +446,7 @@ fn build_join_indexes( right: &RecordBatch, join_type: JoinType, right_on: &HashSet, -) -> Result> { +) -> Result<(UInt32Array, UInt32Array)> { let keys_values = right_on .iter() .map(|name| Ok(col(name).evaluate(right)?.into_array(right.num_rows()))) @@ -459,10 +454,11 @@ fn build_join_indexes( let mut key = Vec::with_capacity(keys_values.len()); + let mut left_indices = UInt32Builder::new(0); + let mut right_indices = UInt32Builder::new(0); + match join_type { JoinType::Inner => { - let mut indexes = Vec::new(); // unknown a prior size - // Visit all of the right rows for row in 0..right.num_rows() { // Get the key and find it in the build index @@ -470,16 +466,15 @@ fn build_join_indexes( let left_indexes = left.get(&key); // for every item on the left and right with this key, add the respective pair - left_indexes.unwrap_or(&vec![]).iter().for_each(|x| { + for x in left_indexes.unwrap_or(&vec![]) { // on an inner join, left and right indices are present - indexes.push((Some(*x), Some(row as u32))); - }) + left_indices.append_value(*x as u32)?; + right_indices.append_value(row as u32)?; + } } - Ok(indexes) + Ok((left_indices.finish(), right_indices.finish())) } JoinType::Left => { - let mut indexes = Vec::new(); // unknown a prior size - // Keep track of which item is visited in the build input // TODO: this can be stored more efficiently with a marker let mut is_visited = HashSet::new(); @@ -492,42 +487,45 @@ fn build_join_indexes( if let Some(indices) = left_indexes { is_visited.insert(key.clone()); - indices.iter().for_each(|x| { - indexes.push((Some(*x), Some(row as u32))); - }) + for x in indices { + left_indices.append_value(*x as u32)?; + right_indices.append_value(row as u32)?; + } }; } // Add the remaining left rows to the result set with None on the right side for (key, indices) in left { if !is_visited.contains(key) { - indices.iter().for_each(|x| { - indexes.push((Some(*x), None)); - }); + for x in indices { + left_indices.append_value(*x as u32)?; + right_indices.append_null()?; + }; } } - Ok(indexes) + Ok((left_indices.finish(), right_indices.finish())) } JoinType::Right => { - let mut indexes = Vec::new(); // unknown a prior size for row in 0..right.num_rows() { create_key(&keys_values, row, 
&mut key)?; - let left_indices = left.get(&key); + let left_indexes = left.get(&key); - match left_indices { + match left_indexes { Some(indices) => { - indices.iter().for_each(|x| { - indexes.push((Some(*x), Some(row as u32))); - }); + for x in indices { + left_indices.append_value(*x as u32)?; + right_indices.append_value(row as u32)?; + } } None => { // when no match, add the row with None for the left side - indexes.push((None, Some(row as u32))); + left_indices.append_null()?; + right_indices.append_value(row as u32)?; } } } - Ok(indexes) + Ok((left_indices.finish(), right_indices.finish())) } } } From a976ace567e18c01337e4e6f94e852ff7ccc901f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 2 Jan 2021 17:31:17 +0100 Subject: [PATCH 10/19] Small simplification --- rust/datafusion/src/physical_plan/hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 22be430e69f..4372feb796d 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -380,7 +380,7 @@ pub(crate) fn create_key( // store the size vec.extend(value.len().to_le_bytes().iter()); // store the string value - vec.extend(array.value(row).as_bytes().iter()); + vec.extend(value.as_bytes().iter()); } _ => { // This is internal because we should have caught this before. From 797ffb6c8a41cbe9fd0c06e1d0379680b83a1197 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 2 Jan 2021 19:17:24 +0100 Subject: [PATCH 11/19] Add comment --- rust/datafusion/src/physical_plan/coalesce_batches.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs index 8ab2fdf6d3c..2c5209018f9 100644 --- a/rust/datafusion/src/physical_plan/coalesce_batches.rs +++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs @@ -194,6 +194,7 @@ impl RecordBatchStream for CoalesceBatchesStream { } } +/// Concatenates an array of `RecordBatch` into one batch pub fn concat_batches( schema: &SchemaRef, batches: &[RecordBatch], From b865b8a1d876956e969e7c4263f033db45a8cc52 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 05:11:50 +0100 Subject: [PATCH 12/19] Small simplifiction --- .../datafusion/src/physical_plan/hash_join.rs | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 3f8c42a7bcd..23033ec366c 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -18,8 +18,11 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. 
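The concat_batches helper documented in the previous patch is what lets the build side become a single batch: it stitches the collected batches together column by column. Below is a rough, Arrow-free sketch of that idea, with a Vec<Vec<i32>> standing in for a RecordBatch and an invented signature, not the real coalesce_batches function.

/// A "batch" here is just a list of equally long i32 columns (illustrative stand-in
/// for a RecordBatch).
type Batch = Vec<Vec<i32>>;

/// Concatenate several batches that share the same column layout into a single batch.
fn concat_batches(batches: &[Batch]) -> Batch {
    let num_columns = batches.first().map(|b| b.len()).unwrap_or(0);
    (0..num_columns)
        .map(|col| {
            batches
                .iter()
                .flat_map(|batch| batch[col].iter().copied())
                .collect()
        })
        .collect()
}

fn main() {
    let a: Batch = vec![vec![1, 2], vec![10, 20]];
    let b: Batch = vec![vec![3], vec![30]];
    assert_eq!(concat_batches(&[a, b]), vec![vec![1, 2, 3], vec![10, 20, 30]]);
}
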
-use arrow::{array::{TimestampMicrosecondArray, TimestampNanosecondArray, UInt32Builder}, datatypes::TimeUnit}; use arrow::{array::ArrayRef, compute}; +use arrow::{ + array::{TimestampMicrosecondArray, TimestampNanosecondArray, UInt32Builder}, + datatypes::TimeUnit, +}; use std::sync::Arc; use std::{any::Any, collections::HashSet}; @@ -342,23 +345,23 @@ pub(crate) fn create_key( match col.data_type() { DataType::UInt8 => { let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::UInt16 => { let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::UInt32 => { let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::UInt64 => { let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::Int8 => { let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::Int16 => { let array = col.as_any().downcast_ref::().unwrap(); @@ -366,33 +369,33 @@ pub(crate) fn create_key( } DataType::Int32 => { let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::Int64 => { let array = col.as_any().downcast_ref::().unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::Timestamp(TimeUnit::Microsecond, None) => { let array = col .as_any() .downcast_ref::() .unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::Timestamp(TimeUnit::Nanosecond, None) => { let array = col .as_any() .downcast_ref::() .unwrap(); - vec.extend(array.value(row).to_le_bytes().iter()); + vec.extend_from_slice(&array.value(row).to_le_bytes()); } DataType::Utf8 => { let array = col.as_any().downcast_ref::().unwrap(); let value = array.value(row); // store the size - vec.extend(value.len().to_le_bytes().iter()); + vec.extend_from_slice(&value.len().to_le_bytes()); // store the string value - vec.extend(value.as_bytes().iter()); + vec.extend_from_slice(value.as_bytes()); } _ => { // This is internal because we should have caught this before. 
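The extend_from_slice changes above only touch how create_key copies bytes; the encoding itself is unchanged: fixed-width values are appended as little-endian bytes and string values are prefixed with their length. A self-contained sketch of that encoding for a single (i32, &str) key pair follows (simplified and illustrative, not the actual DataFusion function).

/// Append one i32 value and one string value to a composite key buffer.
/// Fixed-width values are stored as little-endian bytes; variable-width values are
/// length-prefixed so that ("ab", "c") and ("a", "bc") produce different keys.
fn append_key(key: &mut Vec<u8>, int_value: i32, str_value: &str) {
    key.extend_from_slice(&int_value.to_le_bytes());
    key.extend_from_slice(&str_value.len().to_le_bytes());
    key.extend_from_slice(str_value.as_bytes());
}

fn main() {
    let mut k1 = Vec::new();
    append_key(&mut k1, 7, "ab");
    let mut k2 = Vec::new();
    append_key(&mut k2, 7, "a");
    assert_ne!(k1, k2); // the length prefix keeps the keys distinct
}
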
From 0efb5a7bd2f692040ccc5c70c5fd607a5c995577 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 05:16:12 +0100 Subject: [PATCH 13/19] Reuse num rows --- rust/datafusion/src/physical_plan/hash_join.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index a4081d68fbc..862a5577106 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -222,8 +222,6 @@ impl ExecutionPlan for HashJoinExec { Ok(acc) }) .await?; - let num_rows: usize = - batches.iter().map(|batch| batch.num_rows()).sum(); let single_batch = concat_batches(&batches[0].schema(), &batches, num_rows)?; From bbec07005b090bbb4aa063a7df543069aeadbcaf Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 05:17:08 +0100 Subject: [PATCH 14/19] Rename as offset --- rust/datafusion/src/physical_plan/hash_join.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 862a5577106..b6d02fe5e7f 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -215,8 +215,8 @@ impl ExecutionPlan for HashJoinExec { .try_fold(initial, |mut acc, batch| async { let hash = &mut acc.0; let values = &mut acc.1; - let index = acc.2; - update_hash(&on_left, &batch, hash, index).unwrap(); + let offset = acc.2; + update_hash(&on_left, &batch, hash, offset).unwrap(); acc.2 += batch.num_rows(); values.push(batch); Ok(acc) @@ -274,7 +274,7 @@ fn update_hash( on: &HashSet, batch: &RecordBatch, hash: &mut JoinHashMap, - index: usize, + offset: usize, ) -> Result<()> { // evaluate the keys let keys_values = on @@ -290,8 +290,8 @@ fn update_hash( hash.raw_entry_mut() .from_key(&key) - .and_modify(|_, v| v.push(row + index)) - .or_insert_with(|| (key.clone(), vec![row + index])); + .and_modify(|_, v| v.push(row + offset)) + .or_insert_with(|| (key.clone(), vec![row + offset])); } Ok(()) } From 6d182c957ad1947778c6012fea86c9fc224dbd54 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 05:23:23 +0100 Subject: [PATCH 15/19] Use u64 for left indices --- rust/datafusion/src/physical_plan/hash_join.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index b6d02fe5e7f..32e16276981 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -18,7 +18,7 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. 
-use arrow::{array::ArrayRef, compute}; +use arrow::{array::{ArrayRef, UInt64Builder}, compute}; use arrow::{ array::{TimestampMicrosecondArray, TimestampNanosecondArray, UInt32Builder}, datatypes::TimeUnit, @@ -337,7 +337,7 @@ fn build_batch_from_indices( schema: &Schema, left: &RecordBatch, right: &RecordBatch, - left_indices: UInt32Array, + left_indices: UInt64Array, right_indices: UInt32Array, column_indices: &Vec, ) -> ArrowResult { @@ -487,7 +487,7 @@ fn build_join_indexes( right: &RecordBatch, join_type: JoinType, right_on: &HashSet, -) -> Result<(UInt32Array, UInt32Array)> { +) -> Result<(UInt64Array, UInt32Array)> { let keys_values = right_on .iter() .map(|name| Ok(col(name).evaluate(right)?.into_array(right.num_rows()))) @@ -495,7 +495,7 @@ fn build_join_indexes( let mut key = Vec::with_capacity(keys_values.len()); - let mut left_indices = UInt32Builder::new(0); + let mut left_indices = UInt64Builder::new(0); let mut right_indices = UInt32Builder::new(0); match join_type { @@ -509,7 +509,7 @@ fn build_join_indexes( // for every item on the left and right with this key, add the respective pair for x in left_indexes.unwrap_or(&vec![]) { // on an inner join, left and right indices are present - left_indices.append_value(*x as u32)?; + left_indices.append_value(*x as u64)?; right_indices.append_value(row as u32)?; } } @@ -529,7 +529,7 @@ fn build_join_indexes( is_visited.insert(key.clone()); for x in indices { - left_indices.append_value(*x as u32)?; + left_indices.append_value(*x as u64)?; right_indices.append_value(row as u32)?; } }; @@ -538,7 +538,7 @@ fn build_join_indexes( for (key, indices) in left { if !is_visited.contains(key) { for x in indices { - left_indices.append_value(*x as u32)?; + left_indices.append_value(*x as u64)?; right_indices.append_null()?; } } @@ -555,7 +555,7 @@ fn build_join_indexes( match left_indexes { Some(indices) => { for x in indices { - left_indices.append_value(*x as u32)?; + left_indices.append_value(*x as u64)?; right_indices.append_value(row as u32)?; } } From d58abf638a2a311d1657c29733f89ce395db40b3 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 06:12:13 +0100 Subject: [PATCH 16/19] Use appending by slice, small refactor --- .../datafusion/src/physical_plan/hash_join.rs | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 32e16276981..5a7226741a7 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -18,7 +18,10 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. -use arrow::{array::{ArrayRef, UInt64Builder}, compute}; +use arrow::{ + array::{ArrayRef, UInt64Builder}, + compute, +}; use arrow::{ array::{TimestampMicrosecondArray, TimestampNanosecondArray, UInt32Builder}, datatypes::TimeUnit, @@ -58,7 +61,7 @@ use log::debug; // Maps ["on" value] -> [list of indices with this key's value] // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true // for rows 3 and 8 from batch 0 and row 6 from batch 1. 
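The map described above drives the probe side: for every right-side row whose key is found, the whole slice of matching left row numbers is appended and the right row number is repeated once per match, which is the shape that append_slice plus the inner for loop produces. Here is a compact sketch of that inner-join probe using only std types; the function name and signature are illustrative, not the real build_join_indexes.

use std::collections::HashMap;

/// Inner-join probe for one batch: returns parallel (left, right) row-index vectors.
fn probe_inner(
    left: &HashMap<i32, Vec<u64>>, // key -> rows in the concatenated build side
    right_keys: &[i32],            // key column of the current probe batch
) -> (Vec<u64>, Vec<u32>) {
    let mut left_indices = Vec::new();
    let mut right_indices = Vec::new();
    for (row, key) in right_keys.iter().enumerate() {
        if let Some(matches) = left.get(key) {
            // append the whole slice of matching left rows ...
            left_indices.extend_from_slice(matches);
            // ... and repeat the right row once per match
            right_indices.extend(std::iter::repeat(row as u32).take(matches.len()));
        }
    }
    (left_indices, right_indices)
}

fn main() {
    let mut left = HashMap::new();
    left.insert(1, vec![0u64, 3]);
    left.insert(2, vec![5u64]);
    let (l, r) = probe_inner(&left, &[2, 1, 9]);
    assert_eq!(l, vec![5, 0, 3]);
    assert_eq!(r, vec![0, 1, 1]);
}
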
-type JoinHashMap = HashMap, Vec, RandomState>; +type JoinHashMap = HashMap, Vec, RandomState>; type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>; /// join execution plan executes partitions in parallel and combines them into a set of @@ -223,6 +226,8 @@ impl ExecutionPlan for HashJoinExec { }) .await?; + // Merge all batches into a single batch, so we + // can directly index into the arrays let single_batch = concat_batches(&batches[0].schema(), &batches, num_rows)?; @@ -290,8 +295,8 @@ fn update_hash( hash.raw_entry_mut() .from_key(&key) - .and_modify(|_, v| v.push(row + offset)) - .or_insert_with(|| (key.clone(), vec![row + offset])); + .and_modify(|_, v| v.push(row as u64 + offset as u64)) + .or_insert_with(|| (key.clone(), vec![row as u64 + offset as u64])); } Ok(()) } @@ -505,12 +510,15 @@ fn build_join_indexes( // Get the key and find it in the build index create_key(&keys_values, row, &mut key)?; let left_indexes = left.get(&key); - // for every item on the left and right with this key, add the respective pair - for x in left_indexes.unwrap_or(&vec![]) { - // on an inner join, left and right indices are present - left_indices.append_value(*x as u64)?; - right_indices.append_value(row as u32)?; + + if let Some(indices) = left_indexes { + left_indices.append_slice(&indices)?; + + for _ in 0..indices.len() { + // on an inner join, left and right indices are present + right_indices.append_value(row as u32)?; + } } } Ok((left_indices.finish(), right_indices.finish())) @@ -527,9 +535,9 @@ fn build_join_indexes( if let Some(indices) = left_indexes { is_visited.insert(key.clone()); - - for x in indices { - left_indices.append_value(*x as u64)?; + left_indices.append_slice(&indices)?; + for _ in 0..indices.len() { + // on an inner join, left and right indices are present right_indices.append_value(row as u32)?; } }; @@ -537,8 +545,8 @@ fn build_join_indexes( // Add the remaining left rows to the result set with None on the right side for (key, indices) in left { if !is_visited.contains(key) { - for x in indices { - left_indices.append_value(*x as u64)?; + left_indices.append_slice(&indices)?; + for _ in 0..indices.len() { right_indices.append_null()?; } } @@ -554,8 +562,9 @@ fn build_join_indexes( match left_indexes { Some(indices) => { - for x in indices { - left_indices.append_value(*x as u64)?; + left_indices.append_slice(&indices)?; + + for _ in 0..indices.len() { right_indices.append_value(row as u32)?; } } From 69d328663a9434bb4f58748d802e854411e6d49b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 06:29:51 +0100 Subject: [PATCH 17/19] Style tweak --- rust/datafusion/src/physical_plan/hash_join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 5a7226741a7..20535a2c49e 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -295,8 +295,8 @@ fn update_hash( hash.raw_entry_mut() .from_key(&key) - .and_modify(|_, v| v.push(row as u64 + offset as u64)) - .or_insert_with(|| (key.clone(), vec![row as u64 + offset as u64])); + .and_modify(|_, v| v.push((row + offset) as u64)) + .or_insert_with(|| (key.clone(), vec![(row + offset) as u64])); } Ok(()) } From c96ba9fa533c8973e76d2cbada55a948e8331513 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 06:39:09 +0100 Subject: [PATCH 18/19] Style --- rust/datafusion/src/physical_plan/hash_join.rs | 12 ++++-------- 1 file 
changed, 4 insertions(+), 8 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 20535a2c49e..c297eccf37c 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -509,10 +509,9 @@ fn build_join_indexes( for row in 0..right.num_rows() { // Get the key and find it in the build index create_key(&keys_values, row, &mut key)?; - let left_indexes = left.get(&key); - // for every item on the left and right with this key, add the respective pair - if let Some(indices) = left_indexes { + // for every item on the left and right with this key, add the respective pair + if let Some(indices) = left.get(&key) { left_indices.append_slice(&indices)?; for _ in 0..indices.len() { @@ -531,9 +530,8 @@ fn build_join_indexes( // First visit all of the rows for row in 0..right.num_rows() { create_key(&keys_values, row, &mut key)?; - let left_indexes = left.get(&key); - if let Some(indices) = left_indexes { + if let Some(indices) = left.get(&key) { is_visited.insert(key.clone()); left_indices.append_slice(&indices)?; for _ in 0..indices.len() { @@ -558,9 +556,7 @@ fn build_join_indexes( for row in 0..right.num_rows() { create_key(&keys_values, row, &mut key)?; - let left_indexes = left.get(&key); - - match left_indexes { + match left.get(&key) { Some(indices) => { left_indices.append_slice(&indices)?; From 33b96f3c667ab4d824a56f16e6166c2107d051ec Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 3 Jan 2021 12:05:56 +0100 Subject: [PATCH 19/19] Doc updates --- rust/datafusion/src/physical_plan/hash_join.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index c297eccf37c..31056dc47d0 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -515,7 +515,6 @@ fn build_join_indexes( left_indices.append_slice(&indices)?; for _ in 0..indices.len() { - // on an inner join, left and right indices are present right_indices.append_value(row as u32)?; } } @@ -525,6 +524,9 @@ fn build_join_indexes( JoinType::Left => { // Keep track of which item is visited in the build input // TODO: this can be stored more efficiently with a marker + // https://issues.apache.org/jira/browse/ARROW-11116 + // TODO: Fix LEFT join with multiple right batches + // https://issues.apache.org/jira/browse/ARROW-10971 let mut is_visited = HashSet::new(); // First visit all of the rows @@ -535,7 +537,6 @@ fn build_join_indexes( is_visited.insert(key.clone()); left_indices.append_slice(&indices)?; for _ in 0..indices.len() { - // on an inner join, left and right indices are present right_indices.append_value(row as u32)?; } };
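Taken together, the series leaves the output assembly in a simple shape: the (index, is_left) placements are computed once per stream, and each probe batch gathers every output column from either the single concatenated left batch or the current right batch via a take-style kernel. The sketch below imitates that final step with plain Vec columns and Option standing in for Arrow's null handling; all names are illustrative, not the DataFusion API.

/// Precomputed placement of one output column, mirroring the ColumnIndex struct.
struct ColumnIndex {
    index: usize,
    is_left: bool,
}

/// Gather values at `indices`, treating a missing index (None) as a null slot.
fn take(values: &[i32], indices: &[Option<u64>]) -> Vec<Option<i32>> {
    indices.iter().map(|&idx| idx.map(|row| values[row as usize])).collect()
}

/// Build the output columns by picking each one from the left or the right input.
fn build_output(
    left: &[Vec<i32>],            // the single concatenated build-side batch
    right: &[Vec<i32>],           // the current probe batch
    left_indices: &[Option<u64>], // one entry per output row
    right_indices: &[Option<u64>],
    column_indices: &[ColumnIndex],
) -> Vec<Vec<Option<i32>>> {
    column_indices
        .iter()
        .map(|ci| {
            if ci.is_left {
                take(&left[ci.index], left_indices)
            } else {
                take(&right[ci.index], right_indices)
            }
        })
        .collect()
}

fn main() {
    let left = vec![vec![10, 20, 30]]; // one left column
    let right = vec![vec![100, 200]];  // one right column
    let placements = vec![
        ColumnIndex { index: 0, is_left: true },
        ColumnIndex { index: 0, is_left: false },
    ];
    // a LEFT-join-like result: the second output row has no right-side match
    let out = build_output(&left, &right, &[Some(1), Some(2)], &[Some(0), None], &placements);
    assert_eq!(out[0], vec![Some(20), Some(30)]);
    assert_eq!(out[1], vec![Some(100), None]);
}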