diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 26447847631fa..2d4073cef2412 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -86,9 +86,11 @@ use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; use futures::TryStreamExt; use parking_lot::Mutex; +use super::partitioned_hash_eval::SeededRandomState; + /// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions. -pub(crate) const HASH_JOIN_SEED: RandomState = - RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64); +pub(crate) const HASH_JOIN_SEED: SeededRandomState = + SeededRandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64); /// HashTable and input data for the left (build side) of a join pub(super) struct JoinLeftData { @@ -334,8 +336,8 @@ pub struct HashJoinExec { /// Each output stream waits on the `OnceAsync` to signal the completion of /// the hash table creation. 
left_fut: Arc>, - /// Shared the `RandomState` for the hashing algorithm - random_state: RandomState, + /// The shared `SeededRandomState` used by the hashing algorithm (seeds preserved for serialization) + random_state: SeededRandomState, /// Partitioning mode to use pub mode: PartitionMode, /// Execution metrics @@ -930,7 +932,7 @@ impl ExecutionPlan for HashJoinExec { MemoryConsumer::new("HashJoinInput").register(context.memory_pool()); Ok(collect_left_input( - self.random_state.clone(), + self.random_state.random_state().clone(), left_stream, on_left.clone(), join_metrics.clone(), @@ -958,7 +960,7 @@ impl ExecutionPlan for HashJoinExec { .register(context.memory_pool()); OnceFut::new(collect_left_input( - self.random_state.clone(), + self.random_state.random_state().clone(), left_stream, on_left.clone(), join_metrics.clone(), @@ -1041,7 +1043,7 @@ impl ExecutionPlan for HashJoinExec { self.filter.clone(), self.join_type, right_stream, - self.random_state.clone(), + self.random_state.random_state().clone(), join_metrics, column_indices_after_projection, self.null_equality, diff --git a/datafusion/physical-plan/src/joins/hash_join/mod.rs b/datafusion/physical-plan/src/joins/hash_join/mod.rs index 352209e9c3f75..8592e1d968535 100644 --- a/datafusion/physical-plan/src/joins/hash_join/mod.rs +++ b/datafusion/physical-plan/src/joins/hash_join/mod.rs @@ -18,7 +18,7 @@ //! 
[`HashJoinExec`] Partitioned Hash Join Operator pub use exec::HashJoinExec; -pub use partitioned_hash_eval::HashTableLookupExpr; +pub use partitioned_hash_eval::{HashExpr, HashTableLookupExpr, SeededRandomState}; mod exec; mod inlist_builder; diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index b91f5f46d719c..4c437e813139d 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -34,6 +34,36 @@ use datafusion_physical_expr_common::physical_expr::{ use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType}; +/// RandomState wrapper that preserves the seeds used to create it. +/// +/// This is needed because ahash's `RandomState` doesn't expose its seeds after creation, +/// but we need them for serialization (e.g., protobuf serde). +#[derive(Clone, Debug)] +pub struct SeededRandomState { + random_state: RandomState, + seeds: (u64, u64, u64, u64), +} + +impl SeededRandomState { + /// Create a new SeededRandomState with the given seeds. + pub const fn with_seeds(k0: u64, k1: u64, k2: u64, k3: u64) -> Self { + Self { + random_state: RandomState::with_seeds(k0, k1, k2, k3), + seeds: (k0, k1, k2, k3), + } + } + + /// Get the inner RandomState. + pub fn random_state(&self) -> &RandomState { + &self.random_state + } + + /// Get the seeds used to create this RandomState. + pub fn seeds(&self) -> (u64, u64, u64, u64) { + self.seeds + } +} + /// Physical expression that computes hash values for a set of columns /// /// This expression computes the hash of join key columns using a specific RandomState. 
@@ -45,8 +75,8 @@ use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType}; pub struct HashExpr { /// Columns to hash on_columns: Vec, - /// Random state for hashing - random_state: RandomState, + /// Random state for hashing (with seeds preserved for serialization) + random_state: SeededRandomState, /// Description for display description: String, } @@ -56,11 +86,11 @@ impl HashExpr { /// /// # Arguments /// * `on_columns` - Columns to hash - /// * `random_state` - RandomState for hashing + /// * `random_state` - SeededRandomState for hashing /// * `description` - Description for debugging (e.g., "hash_repartition", "hash_join") - pub(super) fn new( + pub fn new( on_columns: Vec, - random_state: RandomState, + random_state: SeededRandomState, description: String, ) -> Self { Self { @@ -69,6 +99,21 @@ impl HashExpr { description, } } + + /// Get the columns being hashed. + pub fn on_columns(&self) -> &[PhysicalExprRef] { + &self.on_columns + } + + /// Get the seeds used for hashing. + pub fn seeds(&self) -> (u64, u64, u64, u64) { + self.random_state.seeds() + } + + /// Get the description. 
+ pub fn description(&self) -> &str { + &self.description + } } impl std::fmt::Debug for HashExpr { @@ -79,7 +124,8 @@ impl std::fmt::Debug for HashExpr { .map(|e| e.to_string()) .collect::>() .join(", "); - write!(f, "{}({})", self.description, cols) + let (s1, s2, s3, s4) = self.seeds(); + write!(f, "{}({cols}, [{s1},{s2},{s3},{s4}])", self.description) } } @@ -87,12 +133,15 @@ impl Hash for HashExpr { fn hash(&self, state: &mut H) { self.on_columns.dyn_hash(state); self.description.hash(state); + self.seeds().hash(state); } } impl PartialEq for HashExpr { fn eq(&self, other: &Self) -> bool { - self.on_columns == other.on_columns && self.description == other.description + self.on_columns == other.on_columns + && self.description == other.description + && self.seeds() == other.seeds() } } @@ -147,7 +196,11 @@ impl PhysicalExpr for HashExpr { // Compute hashes let mut hashes_buffer = vec![0; num_rows]; - create_hashes(&keys_values, &self.random_state, &mut hashes_buffer)?; + create_hashes( + &keys_values, + self.random_state.random_state(), + &mut hashes_buffer, + )?; Ok(ColumnarValue::Array(Arc::new(UInt64Array::from( hashes_buffer, @@ -206,13 +259,29 @@ impl Hash for HashTableLookupExpr { fn hash(&self, state: &mut H) { self.hash_expr.dyn_hash(state); self.description.hash(state); + // Note that we compare hash_map by pointer equality. + // Actually comparing the contents of the hash maps would be expensive. + // The way these hash maps are used in actuality is that HashJoinExec creates + // one per partition per query execution, thus it is never possible for two different + // hash maps to have the same content in practice. + // Theoretically this is a public API and users could create identical hash maps, + // but that seems unlikely and not worth paying the cost of deep comparison all the time. 
+ Arc::as_ptr(&self.hash_map).hash(state); } } impl PartialEq for HashTableLookupExpr { fn eq(&self, other: &Self) -> bool { - Arc::ptr_eq(&self.hash_expr, &other.hash_expr) + // Note that we compare hash_map by pointer equality. + // Actually comparing the contents of the hash maps would be expensive. + // The way these hash maps are used in actuality is that HashJoinExec creates + // one per partition per query execution, thus it is never possible for two different + // hash maps to have the same content in practice. + // Theoretically this is a public API and users could create identical hash maps, + // but that seems unlikely and not worth paying the cost of deep comparison all the time. + self.hash_expr.as_ref() == other.hash_expr.as_ref() && self.description == other.description + && Arc::ptr_eq(&self.hash_map, &other.hash_map) } } @@ -294,3 +363,265 @@ impl PhysicalExpr for HashTableLookupExpr { write!(f, "{}", self.description) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::joins::join_hash_map::JoinHashMapU32; + use datafusion_physical_expr::expressions::Column; + use std::collections::hash_map::DefaultHasher; + use std::hash::Hasher; + + fn compute_hash(value: &T) -> u64 { + let mut hasher = DefaultHasher::new(); + value.hash(&mut hasher); + hasher.finish() + } + + #[test] + fn test_hash_expr_eq_same() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let col_b: PhysicalExprRef = Arc::new(Column::new("b", 1)); + + let expr1 = HashExpr::new( + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "test_hash".to_string(), + ); + + let expr2 = HashExpr::new( + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "test_hash".to_string(), + ); + + assert_eq!(expr1, expr2); + } + + #[test] + fn test_hash_expr_eq_different_columns() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let col_b: PhysicalExprRef = Arc::new(Column::new("b", 
1)); + let col_c: PhysicalExprRef = Arc::new(Column::new("c", 2)); + + let expr1 = HashExpr::new( + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "test_hash".to_string(), + ); + + let expr2 = HashExpr::new( + vec![Arc::clone(&col_a), Arc::clone(&col_c)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "test_hash".to_string(), + ); + + assert_ne!(expr1, expr2); + } + + #[test] + fn test_hash_expr_eq_different_description() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + + let expr1 = HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "hash_one".to_string(), + ); + + let expr2 = HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "hash_two".to_string(), + ); + + assert_ne!(expr1, expr2); + } + + #[test] + fn test_hash_expr_eq_different_seeds() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + + let expr1 = HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "test_hash".to_string(), + ); + + let expr2 = HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(5, 6, 7, 8), + "test_hash".to_string(), + ); + + assert_ne!(expr1, expr2); + } + + #[test] + fn test_hash_expr_hash_consistency() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let col_b: PhysicalExprRef = Arc::new(Column::new("b", 1)); + + let expr1 = HashExpr::new( + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "test_hash".to_string(), + ); + + let expr2 = HashExpr::new( + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "test_hash".to_string(), + ); + + // Equal expressions should have equal hashes + assert_eq!(expr1, expr2); + assert_eq!(compute_hash(&expr1), compute_hash(&expr2)); + } + + #[test] + fn test_hash_table_lookup_expr_eq_same() { + let col_a: PhysicalExprRef = 
Arc::new(Column::new("a", 0)); + let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "inner_hash".to_string(), + )); + let hash_map: Arc = + Arc::new(JoinHashMapU32::with_capacity(10)); + + let expr1 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + Arc::clone(&hash_map), + "lookup".to_string(), + ); + + let expr2 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + Arc::clone(&hash_map), + "lookup".to_string(), + ); + + assert_eq!(expr1, expr2); + } + + #[test] + fn test_hash_table_lookup_expr_eq_different_hash_expr() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let col_b: PhysicalExprRef = Arc::new(Column::new("b", 1)); + + let hash_expr1: PhysicalExprRef = Arc::new(HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "inner_hash".to_string(), + )); + + let hash_expr2: PhysicalExprRef = Arc::new(HashExpr::new( + vec![Arc::clone(&col_b)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "inner_hash".to_string(), + )); + + let hash_map: Arc = + Arc::new(JoinHashMapU32::with_capacity(10)); + + let expr1 = HashTableLookupExpr::new( + Arc::clone(&hash_expr1), + Arc::clone(&hash_map), + "lookup".to_string(), + ); + + let expr2 = HashTableLookupExpr::new( + Arc::clone(&hash_expr2), + Arc::clone(&hash_map), + "lookup".to_string(), + ); + + assert_ne!(expr1, expr2); + } + + #[test] + fn test_hash_table_lookup_expr_eq_different_description() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "inner_hash".to_string(), + )); + let hash_map: Arc = + Arc::new(JoinHashMapU32::with_capacity(10)); + + let expr1 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + Arc::clone(&hash_map), + "lookup_one".to_string(), + ); + + let expr2 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + 
Arc::clone(&hash_map), + "lookup_two".to_string(), + ); + + assert_ne!(expr1, expr2); + } + + #[test] + fn test_hash_table_lookup_expr_eq_different_hash_map() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "inner_hash".to_string(), + )); + + // Two different Arc pointers (even with same content) should not be equal + let hash_map1: Arc = + Arc::new(JoinHashMapU32::with_capacity(10)); + let hash_map2: Arc = + Arc::new(JoinHashMapU32::with_capacity(10)); + + let expr1 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + hash_map1, + "lookup".to_string(), + ); + + let expr2 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + hash_map2, + "lookup".to_string(), + ); + + // Different Arc pointers means not equal (uses Arc::ptr_eq) + assert_ne!(expr1, expr2); + } + + #[test] + fn test_hash_table_lookup_expr_hash_consistency() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "inner_hash".to_string(), + )); + let hash_map: Arc = + Arc::new(JoinHashMapU32::with_capacity(10)); + + let expr1 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + Arc::clone(&hash_map), + "lookup".to_string(), + ); + + let expr2 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + Arc::clone(&hash_map), + "lookup".to_string(), + ); + + // Equal expressions should have equal hashes + assert_eq!(expr1, expr2); + assert_eq!(compute_hash(&expr1), compute_hash(&expr2)); + } +} diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 5aa2bbb57df4e..7d34ce9acbd57 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ 
-26,10 +26,10 @@ use crate::ExecutionPlanProperties; use crate::joins::PartitionMode; use crate::joins::hash_join::exec::HASH_JOIN_SEED; use crate::joins::hash_join::inlist_builder::build_struct_fields; -use crate::joins::hash_join::partitioned_hash_eval::{HashExpr, HashTableLookupExpr}; +use crate::joins::hash_join::partitioned_hash_eval::{ + HashExpr, HashTableLookupExpr, SeededRandomState, +}; use crate::joins::utils::JoinHashMapType; - -use ahash::RandomState; use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; @@ -88,7 +88,7 @@ impl PartitionBounds { fn create_membership_predicate( on_right: &[PhysicalExprRef], pushdown: PushdownStrategy, - random_state: &RandomState, + random_state: &SeededRandomState, schema: &Schema, ) -> Result>> { match pushdown { @@ -230,7 +230,7 @@ pub(crate) struct SharedBuildAccumulator { on_right: Vec, /// Random state for partitioning (RepartitionExec's hash function with 0,0,0,0 seeds) /// Used for PartitionedHashLookupPhysicalExpr - repartition_random_state: RandomState, + repartition_random_state: SeededRandomState, /// Schema of the probe (right) side for evaluating filter expressions probe_schema: Arc, } @@ -308,7 +308,7 @@ impl SharedBuildAccumulator { right_child: &dyn ExecutionPlan, dynamic_filter: Arc, on_right: Vec, - repartition_random_state: RandomState, + repartition_random_state: SeededRandomState, ) -> Self { // Troubleshooting: If partition counts are incorrect, verify this logic matches // the actual execution pattern in collect_build_side() diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs b/datafusion/physical-plan/src/joins/join_hash_map.rs index ed370fdb16cf0..b0ed6dcc7c255 100644 --- a/datafusion/physical-plan/src/joins/join_hash_map.rs +++ b/datafusion/physical-plan/src/joins/join_hash_map.rs @@ -94,6 +94,12 @@ use hashbrown::hash_table::Entry::{Occupied, Vacant}; /// /// At runtime we choose between using `JoinHashMapU32` 
and `JoinHashMapU64` which oth implement /// `JoinHashMapType`. +/// +/// ## Note on use of this trait as a public API +/// This is currently a public trait but is mainly intended for internal use within DataFusion. +/// For example, we may compare references to `JoinHashMapType` implementations by pointer equality +/// rather than deep equality of contents, as deep equality would be expensive and in our usage +/// patterns it is impossible for two different hash maps to have identical contents in a practical sense. pub trait JoinHashMapType: Send + Sync { fn extend_zero(&mut self, len: usize); diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs index 0ca77b3cae982..3ff61ecf1dacc 100644 --- a/datafusion/physical-plan/src/joins/mod.rs +++ b/datafusion/physical-plan/src/joins/mod.rs @@ -20,7 +20,7 @@ use arrow::array::BooleanBufferBuilder; pub use cross_join::CrossJoinExec; use datafusion_physical_expr::PhysicalExprRef; -pub use hash_join::{HashJoinExec, HashTableLookupExpr}; +pub use hash_join::{HashExpr, HashJoinExec, HashTableLookupExpr, SeededRandomState}; pub use nested_loop_join::NestedLoopJoinExec; use parking_lot::Mutex; // Note: SortMergeJoin is not used in plans yet diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 5c9472182b0a1..1efdaaabc7d6a 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -61,6 +61,7 @@ use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; +use crate::joins::SeededRandomState; use crate::sort_pushdown::SortOrderPushdownResult; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; @@ -429,8 +430,8 @@ enum BatchPartitionerState { /// Fixed RandomState used for hash repartitioning to ensure consistent 
behavior across /// executions and runs. -pub const REPARTITION_RANDOM_STATE: ahash::RandomState = - ahash::RandomState::with_seeds(0, 0, 0, 0); +pub const REPARTITION_RANDOM_STATE: SeededRandomState = + SeededRandomState::with_seeds(0, 0, 0, 0); impl BatchPartitioner { /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`] @@ -514,7 +515,11 @@ impl BatchPartitioner { hash_buffer.clear(); hash_buffer.resize(batch.num_rows(), 0); - create_hashes(&arrays, &REPARTITION_RANDOM_STATE, hash_buffer)?; + create_hashes( + &arrays, + REPARTITION_RANDOM_STATE.random_state(), + hash_buffer, + )?; let mut indices: Vec<_> = (0..*partitions) .map(|_| Vec::with_capacity(batch.num_rows())) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 6616f77a5bb51..bd7dd3a6aff3c 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -872,6 +872,8 @@ message PhysicalExprNode { PhysicalExtensionExprNode extension = 19; UnknownColumn unknown_column = 20; + + PhysicalHashExprNode hash_expr = 21; } } @@ -990,6 +992,15 @@ message PhysicalExtensionExprNode { repeated PhysicalExprNode inputs = 2; } +message PhysicalHashExprNode { + repeated PhysicalExprNode on_columns = 1; + uint64 seed0 = 2; + uint64 seed1 = 3; + uint64 seed2 = 4; + uint64 seed3 = 5; + string description = 6; +} + message FilterExecNode { PhysicalPlanNode input = 1; PhysicalExprNode expr = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index cf3dcfe01bede..e269606d163a3 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -15934,6 +15934,9 @@ impl serde::Serialize for PhysicalExprNode { physical_expr_node::ExprType::UnknownColumn(v) => { struct_ser.serialize_field("unknownColumn", v)?; } + physical_expr_node::ExprType::HashExpr(v) => { + struct_ser.serialize_field("hashExpr", v)?; + } } } struct_ser.end() @@ -15976,6 
+15979,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "extension", "unknown_column", "unknownColumn", + "hash_expr", + "hashExpr", ]; #[allow(clippy::enum_variant_names)] @@ -15998,6 +16003,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { LikeExpr, Extension, UnknownColumn, + HashExpr, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16037,6 +16043,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "likeExpr" | "like_expr" => Ok(GeneratedField::LikeExpr), "extension" => Ok(GeneratedField::Extension), "unknownColumn" | "unknown_column" => Ok(GeneratedField::UnknownColumn), + "hashExpr" | "hash_expr" => Ok(GeneratedField::HashExpr), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16183,6 +16190,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { return Err(serde::de::Error::duplicate_field("unknownColumn")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::UnknownColumn) +; + } + GeneratedField::HashExpr => { + if expr_type__.is_some() { + return Err(serde::de::Error::duplicate_field("hashExpr")); + } + expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::HashExpr) ; } } @@ -16419,6 +16433,199 @@ impl<'de> serde::Deserialize<'de> for PhysicalExtensionNode { deserializer.deserialize_struct("datafusion.PhysicalExtensionNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalHashExprNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.on_columns.is_empty() { + len += 1; + } + if self.seed0 != 0 { + len += 1; + } + if self.seed1 != 0 { + len += 1; + } + if self.seed2 != 0 { + len += 1; + } + if self.seed3 != 0 { + len += 1; + } + if !self.description.is_empty() { + len += 1; + } + let mut struct_ser 
= serializer.serialize_struct("datafusion.PhysicalHashExprNode", len)?; + if !self.on_columns.is_empty() { + struct_ser.serialize_field("onColumns", &self.on_columns)?; + } + if self.seed0 != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("seed0", ToString::to_string(&self.seed0).as_str())?; + } + if self.seed1 != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("seed1", ToString::to_string(&self.seed1).as_str())?; + } + if self.seed2 != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("seed2", ToString::to_string(&self.seed2).as_str())?; + } + if self.seed3 != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("seed3", ToString::to_string(&self.seed3).as_str())?; + } + if !self.description.is_empty() { + struct_ser.serialize_field("description", &self.description)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalHashExprNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "on_columns", + "onColumns", + "seed0", + "seed1", + "seed2", + "seed3", + "description", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + OnColumns, + Seed0, + Seed1, + Seed2, + Seed3, + Description, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] 
+ fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "onColumns" | "on_columns" => Ok(GeneratedField::OnColumns), + "seed0" => Ok(GeneratedField::Seed0), + "seed1" => Ok(GeneratedField::Seed1), + "seed2" => Ok(GeneratedField::Seed2), + "seed3" => Ok(GeneratedField::Seed3), + "description" => Ok(GeneratedField::Description), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalHashExprNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalHashExprNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut on_columns__ = None; + let mut seed0__ = None; + let mut seed1__ = None; + let mut seed2__ = None; + let mut seed3__ = None; + let mut description__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::OnColumns => { + if on_columns__.is_some() { + return Err(serde::de::Error::duplicate_field("onColumns")); + } + on_columns__ = Some(map_.next_value()?); + } + GeneratedField::Seed0 => { + if seed0__.is_some() { + return Err(serde::de::Error::duplicate_field("seed0")); + } + seed0__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Seed1 => { + if seed1__.is_some() { + return Err(serde::de::Error::duplicate_field("seed1")); + } + seed1__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Seed2 => { + if seed2__.is_some() { + return Err(serde::de::Error::duplicate_field("seed2")); + } + seed2__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Seed3 => { + if seed3__.is_some() { + return Err(serde::de::Error::duplicate_field("seed3")); + } + seed3__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Description => { + if description__.is_some() { + return Err(serde::de::Error::duplicate_field("description")); + } + description__ = Some(map_.next_value()?); + } + } + } + Ok(PhysicalHashExprNode { + on_columns: on_columns__.unwrap_or_default(), + seed0: seed0__.unwrap_or_default(), + seed1: seed1__.unwrap_or_default(), + seed2: seed2__.unwrap_or_default(), + seed3: seed3__.unwrap_or_default(), + description: description__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalHashExprNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalHashRepartition { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 885b61001c91e..cf343e0258d0b 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1276,7 +1276,7 @@ pub struct 
PhysicalExtensionNode { pub struct PhysicalExprNode { #[prost( oneof = "physical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21" )] pub expr_type: ::core::option::Option, } @@ -1327,6 +1327,8 @@ pub mod physical_expr_node { Extension(super::PhysicalExtensionExprNode), #[prost(message, tag = "20")] UnknownColumn(super::UnknownColumn), + #[prost(message, tag = "21")] + HashExpr(super::PhysicalHashExprNode), } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -1517,6 +1519,21 @@ pub struct PhysicalExtensionExprNode { pub inputs: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalHashExprNode { + #[prost(message, repeated, tag = "1")] + pub on_columns: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "2")] + pub seed0: u64, + #[prost(uint64, tag = "3")] + pub seed1: u64, + #[prost(uint64, tag = "4")] + pub seed2: u64, + #[prost(uint64, tag = "5")] + pub seed3: u64, + #[prost(string, tag = "6")] + pub description: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct FilterExecNode { #[prost(message, optional, boxed, tag = "1")] pub input: ::core::option::Option<::prost::alloc::boxed::Box>, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index aa02e63a5d0d0..073fdd858cdd3 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -48,6 +48,7 @@ use datafusion_physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, in_list, }; +use datafusion_physical_plan::joins::{HashExpr, SeededRandomState}; use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field}; use datafusion_physical_plan::{Partitioning, 
PhysicalExpr, WindowExpr}; use datafusion_proto_common::common::proto_error; @@ -399,6 +400,20 @@ pub fn parse_physical_expr( codec, )?, )), + ExprType::HashExpr(hash_expr) => { + let on_columns = + parse_physical_exprs(&hash_expr.on_columns, ctx, input_schema, codec)?; + Arc::new(HashExpr::new( + on_columns, + SeededRandomState::with_seeds( + hash_expr.seed0, + hash_expr.seed1, + hash_expr.seed2, + hash_expr.seed3, + ), + hash_expr.description.clone(), + )) + } ExprType::Extension(extension) => { let inputs: Vec> = extension .inputs diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index b06dec592d5c3..9558effb8a2a6 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -41,7 +41,7 @@ use datafusion_physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, }; -use datafusion_physical_plan::joins::HashTableLookupExpr; +use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr}; use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; @@ -410,6 +410,20 @@ pub fn serialize_physical_expr( }, ))), }) + } else if let Some(expr) = expr.downcast_ref::() { + let (s0, s1, s2, s3) = expr.seeds(); + Ok(protobuf::PhysicalExprNode { + expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr( + protobuf::PhysicalHashExprNode { + on_columns: serialize_physical_exprs(expr.on_columns(), codec)?, + seed0: s0, + seed1: s1, + seed2: s2, + seed3: s3, + description: expr.description().to_string(), + }, + )), + }) } else { let mut buf: Vec = vec![]; match codec.try_encode_expr(&value, &mut buf) { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index fa505e6f1520a..5c70bcbdff100 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -2372,3 +2372,36 @@ fn roundtrip_hash_table_lookup_expr_to_lit() -> Result<()> { Ok(()) } + +#[test] +fn roundtrip_hash_expr() -> Result<()> { + use datafusion::physical_plan::joins::{HashExpr, SeededRandomState}; + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Utf8, false), + ])); + + // Create a HashExpr with test columns and seeds + let on_columns = vec![col("a", &schema)?, col("b", &schema)?]; + let hash_expr: Arc = Arc::new(HashExpr::new( + on_columns, + SeededRandomState::with_seeds(0, 1, 2, 3), // arbitrary random seeds for testing + "test_hash".to_string(), + )); + + // Wrap in a filter by comparing hash value to a literal + // hash_expr > 0 is always boolean + let filter_expr = binary(hash_expr, Operator::Gt, lit(0u64), &schema)?; + let filter = Arc::new(FilterExec::try_new( + filter_expr, + Arc::new(EmptyExec::new(schema)), + )?); + + // Confirm that the debug string contains the random state seeds + assert!( + format!("{filter:?}").contains("test_hash(a@0, b@1, [0,1,2,3])"), + "Debug string missing seeds: {filter:?}" + ); + roundtrip_test(filter) +}