From d855815e6d1b6aba237c6091d62de5bd14821348 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 4 Dec 2020 12:33:07 +0100 Subject: [PATCH 1/6] Avoid double hashing using raw_entry api --- rust/datafusion/src/lib.rs | 2 +- rust/datafusion/src/physical_plan/hash_join.rs | 13 +++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/rust/datafusion/src/lib.rs b/rust/datafusion/src/lib.rs index 4e4222d97da..db1dd7528ce 100644 --- a/rust/datafusion/src/lib.rs +++ b/rust/datafusion/src/lib.rs @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - +#![feature(hash_raw_entry)] #![warn(missing_docs)] // Clippy lints, some should be disabled incrementally #![allow( diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 17d9f69545d..81cbb6d9876 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -214,12 +214,13 @@ fn update_hash( // update the hash map for row in 0..batch.num_rows() { create_key(&keys_values, row, &mut key)?; - match hash.get_mut(&key) { - Some(v) => v.push((index, row)), - None => { - hash.insert(key.clone(), vec![(index, row)]); - } - }; + + hash + .raw_entry_mut() + .from_key(&key) + .and_modify(|_, v| v.push((index, row))) + .or_insert_with(|| (key.clone(), vec![(index, row)])); + } Ok(()) } From 475946fafe7713f621c263910f77de6342867550 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 4 Dec 2020 12:46:10 +0100 Subject: [PATCH 2/6] Avoid double hashing using raw_entry api --- .../src/physical_plan/hash_aggregate.rs | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs index 0c64bc9ea02..964ce2012e8 100644 --- a/rust/datafusion/src/physical_plan/hash_aggregate.rs +++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs @@ -257,19 +257,15 @@ fn group_aggregate_batch( // 1.1 create_key(&group_values, row, &mut key) .map_err(DataFusionError::into_arrow_external_error)?; - - match accumulators.get_mut(&key) { - // 1.2 - None => { + accumulators + .raw_entry_mut() + .from_key(&key) + .and_modify(|_, (_, v)| v.push(row as u32)) + .or_insert_with(|| { let accumulator_set = create_accumulators(aggr_expr) - .map_err(DataFusionError::into_arrow_external_error)?; - - accumulators - .insert(key.clone(), (accumulator_set, Box::new(vec![row as u32]))); - } - // 1.3 - Some((_, v)) => v.push(row as u32), - } + .expect("Couldn't generate accumulators"); + (key.clone(), (accumulator_set, Box::new(vec![row as u32]))) + }); } // 2.1 for each key From 379fc6cd97a0cc1e78be475d5c9970b04c0a601e Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 4 Dec 2020 13:11:43 +0100 Subject: [PATCH 3/6] fmt --- rust/datafusion/src/physical_plan/hash_join.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 81cbb6d9876..00dfac404a3 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -215,12 +215,10 @@ fn update_hash( for row in 0..batch.num_rows() { create_key(&keys_values, row, &mut key)?; - hash - .raw_entry_mut() - .from_key(&key) - .and_modify(|_, v| v.push((index, row))) - .or_insert_with(|| (key.clone(), vec![(index, row)])); - + hash.raw_entry_mut() + .from_key(&key) + .and_modify(|_, v| v.push((index, row))) + .or_insert_with(|| (key.clone(), vec![(index, row)])); } Ok(()) } From 32c84a291b1d3c65521aebc3622b2fe2dc785fee Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 4 Dec 2020 17:50:35 +0100 Subject: [PATCH 4/6] Use hashbrown to avoid unstable feature --- rust/datafusion/Cargo.toml | 1 + rust/datafusion/src/lib.rs | 1 - rust/datafusion/src/physical_plan/hash_aggregate.rs | 2 +- rust/datafusion/src/physical_plan/hash_join.rs | 6 ++---- 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml index bd2ae974c38..193ad6b807d 100644 --- a/rust/datafusion/Cargo.toml +++ b/rust/datafusion/Cargo.toml @@ -45,6 +45,7 @@ cli = ["rustyline"] [dependencies] ahash = "0.6" +hashbrown = "0.9" arrow = { path = "../arrow", version = "3.0.0-SNAPSHOT", features = ["prettyprint"] } parquet = { path = "../parquet", version = "3.0.0-SNAPSHOT", features = ["arrow"] } sqlparser = "0.6.1" diff --git a/rust/datafusion/src/lib.rs b/rust/datafusion/src/lib.rs index db1dd7528ce..6f12f4c2602 100644 --- a/rust/datafusion/src/lib.rs +++ b/rust/datafusion/src/lib.rs @@ -14,7 +14,6 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#![feature(hash_raw_entry)] #![warn(missing_docs)] // Clippy lints, some should be disabled incrementally #![allow( diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs index 964ce2012e8..e5c647f6775 100644 --- a/rust/datafusion/src/physical_plan/hash_aggregate.rs +++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs @@ -47,7 +47,7 @@ use super::{ SendableRecordBatchStream, }; use ahash::RandomState; -use std::collections::HashMap; +use hashbrown::HashMap; use async_trait::async_trait; diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 00dfac404a3..d2bb8cf7c41 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -19,13 +19,11 @@ //! into a set of partitions. use std::sync::Arc; -use std::{ - any::Any, - collections::{HashMap, HashSet}, -}; +use std::{any::Any, collections::HashSet}; use async_trait::async_trait; use futures::{Stream, StreamExt, TryStreamExt}; +use hashbrown::HashMap; use arrow::array::{make_array, Array, MutableArrayData}; use arrow::datatypes::{Schema, SchemaRef}; From a5bb8a3f3bc3732e55b655e359856d563697fc72 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 4 Dec 2020 18:29:30 +0100 Subject: [PATCH 5/6] Return error as before --- rust/datafusion/src/physical_plan/hash_aggregate.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs index e5c647f6775..75c8388986a 100644 --- a/rust/datafusion/src/physical_plan/hash_aggregate.rs +++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs @@ -253,6 +253,10 @@ fn group_aggregate_batch( // 1.1 construct the key from the group values // 1.2 construct the mapping key if it does not exist // 1.3 add the row' index to `indices` + + // Make sure we can create the accumulators or otherwise return an error + create_accumulators(aggr_expr)?; + for row in 0..batch.num_rows() { // 1.1 create_key(&group_values, row, &mut key) @@ -260,10 +264,12 @@ fn group_aggregate_batch( accumulators .raw_entry_mut() .from_key(&key) + // 1.3 .and_modify(|_, (_, v)| v.push(row as u32)) + // 1.2 .or_insert_with(|| { - let accumulator_set = create_accumulators(aggr_expr) - .expect("Couldn't generate accumulators"); + // We can safely unwrap here as we checked we can create an accumulator before + let accumulator_set = create_accumulators(aggr_expr).unwrap(); (key.clone(), (accumulator_set, Box::new(vec![row as u32]))) }); } From 94e12029251c857101ccebd900fa96eefb077a88 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 4 Dec 2020 18:45:40 +0100 Subject: [PATCH 6/6] Map to same error --- rust/datafusion/src/physical_plan/hash_aggregate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs index 75c8388986a..6be23263c04 100644 --- a/rust/datafusion/src/physical_plan/hash_aggregate.rs +++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs @@ -255,7 +255,7 @@ fn group_aggregate_batch( // 1.3 add the row' index to `indices` // Make sure we can create the accumulators or otherwise return an error - create_accumulators(aggr_expr)?; + create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?; for row in 0..batch.num_rows() { // 1.1