Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 74 additions & 36 deletions rust/datafusion/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ use ahash::RandomState;

use arrow::{
array::{
ArrayRef, BooleanArray, LargeStringArray, TimestampMicrosecondArray,
TimestampNanosecondArray, UInt32Builder, UInt64Builder,
ArrayData, ArrayRef, BooleanArray, LargeStringArray, PrimitiveArray,
TimestampMicrosecondArray, TimestampNanosecondArray, UInt32BufferBuilder,
UInt32Builder, UInt64BufferBuilder, UInt64Builder,
},
compute,
datatypes::TimeUnit,
datatypes::{TimeUnit, UInt32Type, UInt64Type},
};
use std::time::Instant;
use std::{any::Any, collections::HashSet};
Expand Down Expand Up @@ -237,19 +238,26 @@ impl ExecutionPlan for HashJoinExec {
// This operation performs 2 steps at once:
// 1. creates a [JoinHashMap] of all batches from the stream
// 2. stores the batches in a vector.
let initial =
(JoinHashMap::with_hasher(IdHashBuilder {}), Vec::new(), 0);
let (hashmap, batches, num_rows) = stream
let initial = (
JoinHashMap::with_hasher(IdHashBuilder {}),
Vec::new(),
0,
Vec::new(),
);
let (hashmap, batches, num_rows, _) = stream
.try_fold(initial, |mut acc, batch| async {
let hash = &mut acc.0;
let values = &mut acc.1;
let offset = acc.2;
acc.3.clear();
acc.3.resize(batch.num_rows(), 0);
update_hash(
&on_left,
&batch,
hash,
offset,
&self.random_state,
&mut acc.3,
)
.unwrap();
acc.2 += batch.num_rows();
Expand Down Expand Up @@ -311,6 +319,7 @@ fn update_hash(
hash: &mut JoinHashMap,
offset: usize,
random_state: &RandomState,
hashes_buffer: &mut Vec<u64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is effectively allowing hashes_buffer to be reused, right?

It may eventually make sense to make some struct that holds all the relevant state (on, random_state, hash_buf, etc).

Copy link
Contributor Author

@Dandandan Dandandan Mar 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, this change is for reusing the allocated Vec.

Yes, it makes sense to group them in a struct. There are opportunities for this in other functions as well (`build_join_indexes`, `build_batch`, etc.). I'm not sure whether they should all receive the same struct, or whether each of them should take only a subset of the most commonly needed parts 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 definitely not for this PR

) -> Result<()> {
// evaluate the keys
let keys_values = on
Expand All @@ -319,7 +328,7 @@ fn update_hash(
.collect::<Result<Vec<_>>>()?;

// update the hash map
let hash_values = create_hashes(&keys_values, &random_state)?;
let hash_values = create_hashes(&keys_values, &random_state, hashes_buffer)?;

// insert hashes to key of the hashmap
for (row, hash_value) in hash_values.iter().enumerate() {
Expand Down Expand Up @@ -476,15 +485,16 @@ fn build_join_indexes(
.into_array(left_data.1.num_rows()))
})
.collect::<Result<Vec<_>>>()?;

let hash_values = create_hashes(&keys_values, &random_state)?;
let hashes_buffer = &mut vec![0; keys_values[0].len()];
let hash_values = create_hashes(&keys_values, &random_state, hashes_buffer)?;
let left = &left_data.0;

let mut left_indices = UInt64Builder::new(0);
let mut right_indices = UInt32Builder::new(0);

match join_type {
JoinType::Inner => {
// Using a buffer builder to avoid slower normal builder
let mut left_indices = UInt64BufferBuilder::new(0);
let mut right_indices = UInt32BufferBuilder::new(0);

// Visit all of the right rows
for (row, hash_value) in hash_values.iter().enumerate() {
// Get the hash and find it in the build index
Expand All @@ -496,15 +506,30 @@ fn build_join_indexes(
for &i in indices {
// Check hash collisions
if equal_rows(i as usize, row, &left_join_values, &keys_values)? {
left_indices.append_value(i)?;
right_indices.append_value(row as u32)?;
left_indices.append(i);
right_indices.append(row as u32);
}
}
}
}
Ok((left_indices.finish(), right_indices.finish()))
let left = ArrayData::builder(DataType::UInt64)
.len(left_indices.len())
.add_buffer(left_indices.finish())
.build();
let right = ArrayData::builder(DataType::UInt32)
.len(right_indices.len())
.add_buffer(right_indices.finish())
.build();

Ok((
PrimitiveArray::<UInt64Type>::from(left),
PrimitiveArray::<UInt32Type>::from(right),
))
}
JoinType::Left => {
let mut left_indices = UInt64Builder::new(0);
let mut right_indices = UInt32Builder::new(0);

// Keep track of which item is visited in the build input
// TODO: this can be stored more efficiently with a marker
// https://issues.apache.org/jira/browse/ARROW-11116
Expand Down Expand Up @@ -534,10 +559,12 @@ fn build_join_indexes(
}
}
}

Ok((left_indices.finish(), right_indices.finish()))
}
JoinType::Right => {
let mut left_indices = UInt64Builder::new(0);
let mut right_indices = UInt32Builder::new(0);

for (row, hash_value) in hash_values.iter().enumerate() {
match left.get(hash_value) {
Some(indices) => {
Expand Down Expand Up @@ -699,50 +726,60 @@ macro_rules! hash_array {
}

/// Creates hash values for every element in the row based on the values in the columns
pub fn create_hashes(
pub fn create_hashes<'a>(
arrays: &[ArrayRef],
random_state: &RandomState,
) -> Result<Vec<u64>> {
let rows = arrays[0].len();
let mut hashes = vec![0; rows];

hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
for col in arrays {
match col.data_type() {
DataType::UInt8 => {
hash_array!(UInt8Array, col, u8, hashes, random_state);
hash_array!(UInt8Array, col, u8, hashes_buffer, random_state);
}
DataType::UInt16 => {
hash_array!(UInt16Array, col, u16, hashes, random_state);
hash_array!(UInt16Array, col, u16, hashes_buffer, random_state);
}
DataType::UInt32 => {
hash_array!(UInt32Array, col, u32, hashes, random_state);
hash_array!(UInt32Array, col, u32, hashes_buffer, random_state);
}
DataType::UInt64 => {
hash_array!(UInt64Array, col, u64, hashes, random_state);
hash_array!(UInt64Array, col, u64, hashes_buffer, random_state);
}
DataType::Int8 => {
hash_array!(Int8Array, col, i8, hashes, random_state);
hash_array!(Int8Array, col, i8, hashes_buffer, random_state);
}
DataType::Int16 => {
hash_array!(Int16Array, col, i16, hashes, random_state);
hash_array!(Int16Array, col, i16, hashes_buffer, random_state);
}
DataType::Int32 => {
hash_array!(Int32Array, col, i32, hashes, random_state);
hash_array!(Int32Array, col, i32, hashes_buffer, random_state);
}
DataType::Int64 => {
hash_array!(Int64Array, col, i64, hashes, random_state);
hash_array!(Int64Array, col, i64, hashes_buffer, random_state);
}
DataType::Timestamp(TimeUnit::Microsecond, None) => {
hash_array!(TimestampMicrosecondArray, col, i64, hashes, random_state);
hash_array!(
TimestampMicrosecondArray,
col,
i64,
hashes_buffer,
random_state
);
}
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
hash_array!(TimestampNanosecondArray, col, i64, hashes, random_state);
hash_array!(
TimestampNanosecondArray,
col,
i64,
hashes_buffer,
random_state
);
}
DataType::Boolean => {
hash_array!(BooleanArray, col, u8, hashes, random_state);
hash_array!(BooleanArray, col, u8, hashes_buffer, random_state);
}
DataType::Utf8 => {
hash_array!(StringArray, col, str, hashes, random_state);
hash_array!(StringArray, col, str, hashes_buffer, random_state);
}
_ => {
// This is internal because we should have caught this before.
Expand All @@ -752,7 +789,7 @@ pub fn create_hashes(
}
}
}
Ok(hashes)
Ok(hashes_buffer)
}

impl Stream for HashJoinStream {
Expand Down Expand Up @@ -1136,8 +1173,9 @@ mod tests {
);

let random_state = RandomState::new();

let hashes = create_hashes(&[left.columns()[0].clone()], &random_state)?;
let hashes_buff = &mut vec![0; left.num_rows()];
let hashes =
create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;

// Create hash collisions
hashmap_left.insert(hashes[0], vec![0, 1]);
Expand Down
4 changes: 3 additions & 1 deletion rust/datafusion/src/physical_plan/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ impl ExecutionPlan for RepartitionExec {
})
.collect::<Result<Vec<_>>>()?;
// Hash arrays and compute buckets based on number of partitions
let hashes = create_hashes(&arrays, &random_state)?;
let hashes_buf = &mut vec![0; arrays[0].len()];
let hashes =
create_hashes(&arrays, &random_state, hashes_buf)?;
let mut indices = vec![vec![]; num_output_partitions];
for (index, hash) in hashes.iter().enumerate() {
indices
Expand Down