Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::TryStreamExt;
use parking_lot::Mutex;

use super::partitioned_hash_eval::SeededRandomState;

/// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions.
pub(crate) const HASH_JOIN_SEED: RandomState =
RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
pub(crate) const HASH_JOIN_SEED: SeededRandomState =
SeededRandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
Comment on lines +92 to +93
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I like the fact that having this SeededRandomState struct is an explicit indicator that the underlaying RandomState is not completely random.


/// HashTable and input data for the left (build side) of a join
pub(super) struct JoinLeftData {
Expand Down Expand Up @@ -334,8 +336,8 @@ pub struct HashJoinExec {
/// Each output stream waits on the `OnceAsync` to signal the completion of
/// the hash table creation.
left_fut: Arc<OnceAsync<JoinLeftData>>,
/// Shared the `RandomState` for the hashing algorithm
random_state: RandomState,
/// Shared the `SeededRandomState` for the hashing algorithm (seeds preserved for serialization)
random_state: SeededRandomState,
/// Partitioning mode to use
pub mode: PartitionMode,
/// Execution metrics
Expand Down Expand Up @@ -930,7 +932,7 @@ impl ExecutionPlan for HashJoinExec {
MemoryConsumer::new("HashJoinInput").register(context.memory_pool());

Ok(collect_left_input(
self.random_state.clone(),
self.random_state.random_state().clone(),
left_stream,
on_left.clone(),
join_metrics.clone(),
Expand Down Expand Up @@ -958,7 +960,7 @@ impl ExecutionPlan for HashJoinExec {
.register(context.memory_pool());

OnceFut::new(collect_left_input(
self.random_state.clone(),
self.random_state.random_state().clone(),
left_stream,
on_left.clone(),
join_metrics.clone(),
Expand Down Expand Up @@ -1041,7 +1043,7 @@ impl ExecutionPlan for HashJoinExec {
self.filter.clone(),
self.join_type,
right_stream,
self.random_state.clone(),
self.random_state.random_state().clone(),
join_metrics,
column_indices_after_projection,
self.null_equality,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! [`HashJoinExec`] Partitioned Hash Join Operator

pub use exec::HashJoinExec;
pub use partitioned_hash_eval::HashTableLookupExpr;
pub use partitioned_hash_eval::{HashExpr, HashTableLookupExpr, SeededRandomState};

mod exec;
mod inlist_builder;
Expand Down
Loading