From b2f4028ab15f201f030ca665b6d884ee72ff415a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 17 Dec 2025 15:43:15 -0600 Subject: [PATCH 1/9] feat(proto): Add protobuf serialization for HashExpr MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR adds protobuf serialization/deserialization support for HashExpr, enabling distributed query execution to serialize hash expressions used in hash joins and repartitioning. ## Changes ### SeededRandomState wrapper - Added `SeededRandomState` struct that wraps `ahash::RandomState` while preserving the seeds used to create it (since RandomState doesn't expose seeds after creation) - Updated `HASH_JOIN_SEED` and `REPARTITION_RANDOM_STATE` constants to use `SeededRandomState` ### HashExpr updates - Changed `HashExpr` to use `SeededRandomState` instead of raw `RandomState` - Added getter methods: `on_columns()`, `seeds()`, `description()` - Exported `HashExpr` and `SeededRandomState` from joins module ### Protobuf support - Added `PhysicalHashExprNode` message to datafusion.proto with fields for on_columns, seeds (4 u64 values), and description - Implemented serialization in to_proto.rs - Implemented deserialization in from_proto.rs - Added roundtrip test to verify serde works correctly 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../physical-plan/src/joins/hash_join/exec.rs | 16 +- .../physical-plan/src/joins/hash_join/mod.rs | 2 +- .../joins/hash_join/partitioned_hash_eval.rs | 57 ++++- .../src/joins/hash_join/shared_bounds.rs | 10 +- datafusion/physical-plan/src/joins/mod.rs | 2 +- .../physical-plan/src/repartition/mod.rs | 7 +- datafusion/proto/proto/datafusion.proto | 11 + datafusion/proto/src/generated/pbjson.rs | 207 ++++++++++++++++++ datafusion/proto/src/generated/prost.rs | 19 +- .../proto/src/physical_plan/from_proto.rs | 15 ++ .../proto/src/physical_plan/to_proto.rs | 16 +- .../tests/cases/roundtrip_physical_plan.rs | 28 +++ 12 files changed, 364 insertions(+), 26 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 26447847631fa..2d4073cef2412 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -86,9 +86,11 @@ use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; use futures::TryStreamExt; use parking_lot::Mutex; +use super::partitioned_hash_eval::SeededRandomState; + /// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions. -pub(crate) const HASH_JOIN_SEED: RandomState = - RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64); +pub(crate) const HASH_JOIN_SEED: SeededRandomState = + SeededRandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64); /// HashTable and input data for the left (build side) of a join pub(super) struct JoinLeftData { @@ -334,8 +336,8 @@ pub struct HashJoinExec { /// Each output stream waits on the `OnceAsync` to signal the completion of /// the hash table creation. left_fut: Arc>, - /// Shared the `RandomState` for the hashing algorithm - random_state: RandomState, + /// Shared the `SeededRandomState` for the hashing algorithm (seeds preserved for serialization) + random_state: SeededRandomState, /// Partitioning mode to use pub mode: PartitionMode, /// Execution metrics @@ -930,7 +932,7 @@ impl ExecutionPlan for HashJoinExec { MemoryConsumer::new("HashJoinInput").register(context.memory_pool()); Ok(collect_left_input( - self.random_state.clone(), + self.random_state.random_state().clone(), left_stream, on_left.clone(), join_metrics.clone(), @@ -958,7 +960,7 @@ impl ExecutionPlan for HashJoinExec { .register(context.memory_pool()); OnceFut::new(collect_left_input( - self.random_state.clone(), + self.random_state.random_state().clone(), left_stream, on_left.clone(), join_metrics.clone(), @@ -1041,7 +1043,7 @@ impl ExecutionPlan for HashJoinExec { self.filter.clone(), self.join_type, right_stream, - self.random_state.clone(), + self.random_state.random_state().clone(), join_metrics, column_indices_after_projection, self.null_equality, diff --git a/datafusion/physical-plan/src/joins/hash_join/mod.rs b/datafusion/physical-plan/src/joins/hash_join/mod.rs index 352209e9c3f75..8592e1d968535 100644 --- a/datafusion/physical-plan/src/joins/hash_join/mod.rs +++ b/datafusion/physical-plan/src/joins/hash_join/mod.rs @@ -18,7 +18,7 @@ //! [`HashJoinExec`] Partitioned Hash Join Operator pub use exec::HashJoinExec; -pub use partitioned_hash_eval::HashTableLookupExpr; +pub use partitioned_hash_eval::{HashExpr, HashTableLookupExpr, SeededRandomState}; mod exec; mod inlist_builder; diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index b91f5f46d719c..0413f992befb1 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -34,6 +34,36 @@ use datafusion_physical_expr_common::physical_expr::{ use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType}; +/// RandomState wrapper that preserves the seeds used to create it. +/// +/// This is needed because ahash's `RandomState` doesn't expose its seeds after creation, +/// but we need them for serialization (e.g., protobuf serde). +#[derive(Clone, Debug)] +pub struct SeededRandomState { + random_state: RandomState, + seeds: (u64, u64, u64, u64), +} + +impl SeededRandomState { + /// Create a new SeededRandomState with the given seeds. + pub const fn with_seeds(k0: u64, k1: u64, k2: u64, k3: u64) -> Self { + Self { + random_state: RandomState::with_seeds(k0, k1, k2, k3), + seeds: (k0, k1, k2, k3), + } + } + + /// Get the inner RandomState. + pub fn random_state(&self) -> &RandomState { + &self.random_state + } + + /// Get the seeds used to create this RandomState. + pub fn seeds(&self) -> (u64, u64, u64, u64) { + self.seeds + } +} + /// Physical expression that computes hash values for a set of columns /// /// This expression computes the hash of join key columns using a specific RandomState. @@ -45,8 +75,8 @@ use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType}; pub struct HashExpr { /// Columns to hash on_columns: Vec, - /// Random state for hashing - random_state: RandomState, + /// Random state for hashing (with seeds preserved for serialization) + random_state: SeededRandomState, /// Description for display description: String, } @@ -56,11 +86,11 @@ impl HashExpr { /// /// # Arguments /// * `on_columns` - Columns to hash - /// * `random_state` - RandomState for hashing + /// * `random_state` - SeededRandomState for hashing /// * `description` - Description for debugging (e.g., "hash_repartition", "hash_join") - pub(super) fn new( + pub fn new( on_columns: Vec, - random_state: RandomState, + random_state: SeededRandomState, description: String, ) -> Self { Self { @@ -69,6 +99,21 @@ impl HashExpr { description, } } + + /// Get the columns being hashed. + pub fn on_columns(&self) -> &[PhysicalExprRef] { + &self.on_columns + } + + /// Get the seeds used for hashing. + pub fn seeds(&self) -> (u64, u64, u64, u64) { + self.random_state.seeds() + } + + /// Get the description. + pub fn description(&self) -> &str { + &self.description + } } impl std::fmt::Debug for HashExpr { @@ -147,7 +192,7 @@ impl PhysicalExpr for HashExpr { // Compute hashes let mut hashes_buffer = vec![0; num_rows]; - create_hashes(&keys_values, &self.random_state, &mut hashes_buffer)?; + create_hashes(&keys_values, self.random_state.random_state(), &mut hashes_buffer)?; Ok(ColumnarValue::Array(Arc::new(UInt64Array::from( hashes_buffer, diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 5aa2bbb57df4e..9dc2dffa1514d 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -26,10 +26,8 @@ use crate::ExecutionPlanProperties; use crate::joins::PartitionMode; use crate::joins::hash_join::exec::HASH_JOIN_SEED; use crate::joins::hash_join::inlist_builder::build_struct_fields; -use crate::joins::hash_join::partitioned_hash_eval::{HashExpr, HashTableLookupExpr}; +use crate::joins::hash_join::partitioned_hash_eval::{HashExpr, HashTableLookupExpr, SeededRandomState}; use crate::joins::utils::JoinHashMapType; - -use ahash::RandomState; use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; @@ -88,7 +86,7 @@ impl PartitionBounds { fn create_membership_predicate( on_right: &[PhysicalExprRef], pushdown: PushdownStrategy, - random_state: &RandomState, + random_state: &SeededRandomState, schema: &Schema, ) -> Result>> { match pushdown { @@ -230,7 +228,7 @@ pub(crate) struct SharedBuildAccumulator { on_right: Vec, /// Random state for partitioning (RepartitionExec's hash function with 0,0,0,0 seeds) /// Used for PartitionedHashLookupPhysicalExpr - repartition_random_state: RandomState, + repartition_random_state: SeededRandomState, /// Schema of the probe (right) side for evaluating filter expressions probe_schema: Arc, } @@ -308,7 +306,7 @@ impl SharedBuildAccumulator { right_child: &dyn ExecutionPlan, dynamic_filter: Arc, on_right: Vec, - repartition_random_state: RandomState, + repartition_random_state: SeededRandomState, ) -> Self { // Troubleshooting: If partition counts are incorrect, verify this logic matches // the actual execution pattern in collect_build_side() diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs index 0ca77b3cae982..3ff61ecf1dacc 100644 --- a/datafusion/physical-plan/src/joins/mod.rs +++ b/datafusion/physical-plan/src/joins/mod.rs @@ -20,7 +20,7 @@ use arrow::array::BooleanBufferBuilder; pub use cross_join::CrossJoinExec; use datafusion_physical_expr::PhysicalExprRef; -pub use hash_join::{HashJoinExec, HashTableLookupExpr}; +pub use hash_join::{HashExpr, HashJoinExec, HashTableLookupExpr, SeededRandomState}; pub use nested_loop_join::NestedLoopJoinExec; use parking_lot::Mutex; // Note: SortMergeJoin is not used in plans yet diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 5c9472182b0a1..2e88c031418c6 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -57,6 +57,7 @@ use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; +use crate::joins::SeededRandomState; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, @@ -429,8 +430,8 @@ enum BatchPartitionerState { /// Fixed RandomState used for hash repartitioning to ensure consistent behavior across /// executions and runs. -pub const REPARTITION_RANDOM_STATE: ahash::RandomState = - ahash::RandomState::with_seeds(0, 0, 0, 0); +pub const REPARTITION_RANDOM_STATE: SeededRandomState = + SeededRandomState::with_seeds(0, 0, 0, 0); impl BatchPartitioner { /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`] @@ -514,7 +515,7 @@ impl BatchPartitioner { hash_buffer.clear(); hash_buffer.resize(batch.num_rows(), 0); - create_hashes(&arrays, &REPARTITION_RANDOM_STATE, hash_buffer)?; + create_hashes(&arrays, REPARTITION_RANDOM_STATE.random_state(), hash_buffer)?; let mut indices: Vec<_> = (0..*partitions) .map(|_| Vec::with_capacity(batch.num_rows())) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 6616f77a5bb51..bd7dd3a6aff3c 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -872,6 +872,8 @@ message PhysicalExprNode { PhysicalExtensionExprNode extension = 19; UnknownColumn unknown_column = 20; + + PhysicalHashExprNode hash_expr = 21; } } @@ -990,6 +992,15 @@ message PhysicalExtensionExprNode { repeated PhysicalExprNode inputs = 2; } +message PhysicalHashExprNode { + repeated PhysicalExprNode on_columns = 1; + uint64 seed0 = 2; + uint64 seed1 = 3; + uint64 seed2 = 4; + uint64 seed3 = 5; + string description = 6; +} + message FilterExecNode { PhysicalPlanNode input = 1; PhysicalExprNode expr = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index cf3dcfe01bede..e269606d163a3 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -15934,6 +15934,9 @@ impl serde::Serialize for PhysicalExprNode { physical_expr_node::ExprType::UnknownColumn(v) => { struct_ser.serialize_field("unknownColumn", v)?; } + physical_expr_node::ExprType::HashExpr(v) => { + struct_ser.serialize_field("hashExpr", v)?; + } } } struct_ser.end() @@ -15976,6 +15979,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "extension", "unknown_column", "unknownColumn", + "hash_expr", + "hashExpr", ]; #[allow(clippy::enum_variant_names)] @@ -15998,6 +16003,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { LikeExpr, Extension, UnknownColumn, + HashExpr, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16037,6 +16043,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "likeExpr" | "like_expr" => Ok(GeneratedField::LikeExpr), "extension" => Ok(GeneratedField::Extension), "unknownColumn" | "unknown_column" => Ok(GeneratedField::UnknownColumn), + "hashExpr" | "hash_expr" => Ok(GeneratedField::HashExpr), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16183,6 +16190,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { return Err(serde::de::Error::duplicate_field("unknownColumn")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::UnknownColumn) +; + } + GeneratedField::HashExpr => { + if expr_type__.is_some() { + return Err(serde::de::Error::duplicate_field("hashExpr")); + } + expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::HashExpr) ; } } @@ -16419,6 +16433,199 @@ impl<'de> serde::Deserialize<'de> for PhysicalExtensionNode { deserializer.deserialize_struct("datafusion.PhysicalExtensionNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalHashExprNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.on_columns.is_empty() { + len += 1; + } + if self.seed0 != 0 { + len += 1; + } + if self.seed1 != 0 { + len += 1; + } + if self.seed2 != 0 { + len += 1; + } + if self.seed3 != 0 { + len += 1; + } + if !self.description.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalHashExprNode", len)?; + if !self.on_columns.is_empty() { + struct_ser.serialize_field("onColumns", &self.on_columns)?; + } + if self.seed0 != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("seed0", ToString::to_string(&self.seed0).as_str())?; + } + if self.seed1 != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("seed1", ToString::to_string(&self.seed1).as_str())?; + } + if self.seed2 != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("seed2", ToString::to_string(&self.seed2).as_str())?; + } + if self.seed3 != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("seed3", ToString::to_string(&self.seed3).as_str())?; + } + if !self.description.is_empty() { + struct_ser.serialize_field("description", &self.description)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalHashExprNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "on_columns", + "onColumns", + "seed0", + "seed1", + "seed2", + "seed3", + "description", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + OnColumns, + Seed0, + Seed1, + Seed2, + Seed3, + Description, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "onColumns" | "on_columns" => Ok(GeneratedField::OnColumns), + "seed0" => Ok(GeneratedField::Seed0), + "seed1" => Ok(GeneratedField::Seed1), + "seed2" => Ok(GeneratedField::Seed2), + "seed3" => Ok(GeneratedField::Seed3), + "description" => Ok(GeneratedField::Description), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalHashExprNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalHashExprNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut on_columns__ = None; + let mut seed0__ = None; + let mut seed1__ = None; + let mut seed2__ = None; + let mut seed3__ = None; + let mut description__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::OnColumns => { + if on_columns__.is_some() { + return Err(serde::de::Error::duplicate_field("onColumns")); + } + on_columns__ = Some(map_.next_value()?); + } + GeneratedField::Seed0 => { + if seed0__.is_some() { + return Err(serde::de::Error::duplicate_field("seed0")); + } + seed0__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Seed1 => { + if seed1__.is_some() { + return Err(serde::de::Error::duplicate_field("seed1")); + } + seed1__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Seed2 => { + if seed2__.is_some() { + return Err(serde::de::Error::duplicate_field("seed2")); + } + seed2__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Seed3 => { + if seed3__.is_some() { + return Err(serde::de::Error::duplicate_field("seed3")); + } + seed3__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::Description => { + if description__.is_some() { + return Err(serde::de::Error::duplicate_field("description")); + } + description__ = Some(map_.next_value()?); + } + } + } + Ok(PhysicalHashExprNode { + on_columns: on_columns__.unwrap_or_default(), + seed0: seed0__.unwrap_or_default(), + seed1: seed1__.unwrap_or_default(), + seed2: seed2__.unwrap_or_default(), + seed3: seed3__.unwrap_or_default(), + description: description__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalHashExprNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalHashRepartition { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 885b61001c91e..cf343e0258d0b 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1276,7 +1276,7 @@ pub struct PhysicalExtensionNode { pub struct PhysicalExprNode { #[prost( oneof = "physical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21" )] pub expr_type: ::core::option::Option, } @@ -1327,6 +1327,8 @@ pub mod physical_expr_node { Extension(super::PhysicalExtensionExprNode), #[prost(message, tag = "20")] UnknownColumn(super::UnknownColumn), + #[prost(message, tag = "21")] + HashExpr(super::PhysicalHashExprNode), } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -1517,6 +1519,21 @@ pub struct PhysicalExtensionExprNode { pub inputs: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalHashExprNode { + #[prost(message, repeated, tag = "1")] + pub on_columns: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "2")] + pub seed0: u64, + #[prost(uint64, tag = "3")] + pub seed1: u64, + #[prost(uint64, tag = "4")] + pub seed2: u64, + #[prost(uint64, tag = "5")] + pub seed3: u64, + #[prost(string, tag = "6")] + pub description: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct FilterExecNode { #[prost(message, optional, boxed, tag = "1")] pub input: ::core::option::Option<::prost::alloc::boxed::Box>, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index aa02e63a5d0d0..073fdd858cdd3 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -48,6 +48,7 @@ use datafusion_physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, in_list, }; +use datafusion_physical_plan::joins::{HashExpr, SeededRandomState}; use datafusion_physical_plan::windows::{create_window_expr, schema_add_window_field}; use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; use datafusion_proto_common::common::proto_error; @@ -399,6 +400,20 @@ pub fn parse_physical_expr( codec, )?, )), + ExprType::HashExpr(hash_expr) => { + let on_columns = + parse_physical_exprs(&hash_expr.on_columns, ctx, input_schema, codec)?; + Arc::new(HashExpr::new( + on_columns, + SeededRandomState::with_seeds( + hash_expr.seed0, + hash_expr.seed1, + hash_expr.seed2, + hash_expr.seed3, + ), + hash_expr.description.clone(), + )) + } ExprType::Extension(extension) => { let inputs: Vec> = extension .inputs diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index b06dec592d5c3..9558effb8a2a6 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -41,7 +41,7 @@ use datafusion_physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, }; -use datafusion_physical_plan::joins::HashTableLookupExpr; +use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr}; use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; @@ -410,6 +410,20 @@ pub fn serialize_physical_expr( }, ))), }) + } else if let Some(expr) = expr.downcast_ref::() { + let (s0, s1, s2, s3) = expr.seeds(); + Ok(protobuf::PhysicalExprNode { + expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr( + protobuf::PhysicalHashExprNode { + on_columns: serialize_physical_exprs(expr.on_columns(), codec)?, + seed0: s0, + seed1: s1, + seed2: s2, + seed3: s3, + description: expr.description().to_string(), + }, + )), + }) } else { let mut buf: Vec = vec![]; match codec.try_encode_expr(&value, &mut buf) { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index fa505e6f1520a..f5e0b4f157f5e 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -2372,3 +2372,31 @@ fn roundtrip_hash_table_lookup_expr_to_lit() -> Result<()> { Ok(()) } + +#[test] +fn roundtrip_hash_expr() -> Result<()> { + use datafusion::physical_plan::joins::{HashExpr, SeededRandomState}; + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Utf8, false), + ])); + + // Create a HashExpr with test columns and seeds + let on_columns = vec![col("a", &schema)?, col("b", &schema)?]; + let hash_expr: Arc = Arc::new(HashExpr::new( + on_columns, + SeededRandomState::with_seeds(0, 0, 0, 0), // repartition seeds + "test_hash".to_string(), + )); + + // Wrap in a filter by comparing hash value to a literal + // hash_expr > 0 is always boolean + let filter_expr = binary(hash_expr, Operator::Gt, lit(0u64), &schema)?; + let filter = Arc::new(FilterExec::try_new( + filter_expr, + Arc::new(EmptyExec::new(schema)), + )?); + + roundtrip_test(filter) +} From eaf4bf78213d7241de81a86919e70c9e989db911 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 17 Dec 2025 15:46:17 -0600 Subject: [PATCH 2/9] fmt --- .../src/joins/hash_join/partitioned_hash_eval.rs | 6 +++++- .../physical-plan/src/joins/hash_join/shared_bounds.rs | 4 +++- datafusion/physical-plan/src/repartition/mod.rs | 8 ++++++-- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 0413f992befb1..b239054a9e925 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -192,7 +192,11 @@ impl PhysicalExpr for HashExpr { // Compute hashes let mut hashes_buffer = vec![0; num_rows]; - create_hashes(&keys_values, self.random_state.random_state(), &mut hashes_buffer)?; + create_hashes( + &keys_values, + self.random_state.random_state(), + &mut hashes_buffer, + )?; Ok(ColumnarValue::Array(Arc::new(UInt64Array::from( hashes_buffer, diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 9dc2dffa1514d..7d34ce9acbd57 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -26,7 +26,9 @@ use crate::ExecutionPlanProperties; use crate::joins::PartitionMode; use crate::joins::hash_join::exec::HASH_JOIN_SEED; use crate::joins::hash_join::inlist_builder::build_struct_fields; -use crate::joins::hash_join::partitioned_hash_eval::{HashExpr, HashTableLookupExpr, SeededRandomState}; +use crate::joins::hash_join::partitioned_hash_eval::{ + HashExpr, HashTableLookupExpr, SeededRandomState, +}; use crate::joins::utils::JoinHashMapType; use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, Schema}; diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 2e88c031418c6..1efdaaabc7d6a 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -57,11 +57,11 @@ use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use crate::joins::SeededRandomState; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; +use crate::joins::SeededRandomState; use crate::sort_pushdown::SortOrderPushdownResult; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; @@ -515,7 +515,11 @@ impl BatchPartitioner { hash_buffer.clear(); hash_buffer.resize(batch.num_rows(), 0); - create_hashes(&arrays, REPARTITION_RANDOM_STATE.random_state(), hash_buffer)?; + create_hashes( + &arrays, + REPARTITION_RANDOM_STATE.random_state(), + hash_buffer, + )?; let mut indices: Vec<_> = (0..*partitions) .map(|_| Vec::with_capacity(batch.num_rows())) From c2db472772c8b04bbafecdb6f344cdcd9f2fc0aa Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 17 Dec 2025 16:50:02 -0600 Subject: [PATCH 3/9] use seeds in Hash/PartialEq --- .../src/joins/hash_join/partitioned_hash_eval.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index b239054a9e925..b3f9161de503c 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -132,12 +132,15 @@ impl Hash for HashExpr { fn hash(&self, state: &mut H) { self.on_columns.dyn_hash(state); self.description.hash(state); + self.seeds().hash(state); } } impl PartialEq for HashExpr { fn eq(&self, other: &Self) -> bool { - self.on_columns == other.on_columns && self.description == other.description + self.on_columns == other.on_columns + && self.description == other.description + && self.seeds() == other.seeds() } } From 2c89b17f00b362c41df209f4ddae252728431c8e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 17 Dec 2025 16:51:14 -0600 Subject: [PATCH 4/9] include hash expr in comparisons --- .../physical-plan/src/joins/hash_join/partitioned_hash_eval.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index b3f9161de503c..913c07dd071fe 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -263,8 +263,7 @@ impl Hash for HashTableLookupExpr { impl PartialEq for HashTableLookupExpr { fn eq(&self, other: &Self) -> bool { - Arc::ptr_eq(&self.hash_expr, &other.hash_expr) - && self.description == other.description + self.hash_expr.dyn_eq(&other.hash_expr) && self.description == other.description } } From 6ccd80b29ac876bf0d7fb17181d90c84db4ea859 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 18 Dec 2025 06:26:27 -0600 Subject: [PATCH 5/9] include seeds in debug repr, include hash map pointer in hash/equality --- .../src/joins/hash_join/partitioned_hash_eval.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 913c07dd071fe..9bc98a3021070 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -124,7 +124,8 @@ impl std::fmt::Debug for HashExpr { .map(|e| e.to_string()) .collect::>() .join(", "); - write!(f, "{}({})", self.description, cols) + let (s1, s2, s3, s4) = self.seeds(); + write!(f, "{}({cols}, [{s1},{s2},{s3},{s4}])", self.description) } } @@ -258,12 +259,15 @@ impl Hash for HashTableLookupExpr { fn hash(&self, state: &mut H) { self.hash_expr.dyn_hash(state); self.description.hash(state); + Arc::as_ptr(&self.hash_map).hash(state); } } impl PartialEq for HashTableLookupExpr { fn eq(&self, other: &Self) -> bool { - self.hash_expr.dyn_eq(&other.hash_expr) && self.description == other.description + self.hash_expr.dyn_eq(&other.hash_expr) + && self.description == other.description + && Arc::ptr_eq(&self.hash_map, &other.hash_map) } } From 401255156bca581b69b21d84dfa9ca4e07bfdc4a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 18 Dec 2025 06:39:49 -0600 Subject: [PATCH 6/9] tests, fix --- .../joins/hash_join/partitioned_hash_eval.rs | 264 +++++++++++++++++- 1 file changed, 263 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 9bc98a3021070..202f9b9db73b5 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -265,7 +265,7 @@ impl Hash for HashTableLookupExpr { impl PartialEq for HashTableLookupExpr { fn eq(&self, other: &Self) -> bool { - self.hash_expr.dyn_eq(&other.hash_expr) + self.hash_expr.as_ref() == other.hash_expr.as_ref() && self.description == other.description && Arc::ptr_eq(&self.hash_map, &other.hash_map) } @@ -349,3 +349,265 @@ impl PhysicalExpr for HashTableLookupExpr { write!(f, "{}", self.description) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::joins::join_hash_map::JoinHashMapU32; + use datafusion_physical_expr::expressions::Column; + use std::collections::hash_map::DefaultHasher; + use std::hash::Hasher; + + fn compute_hash(value: &T) -> u64 { + let mut hasher = DefaultHasher::new(); + value.hash(&mut hasher); + hasher.finish() + } + + #[test] + fn test_hash_expr_eq_same() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let col_b: PhysicalExprRef = Arc::new(Column::new("b", 1)); + + let expr1 = HashExpr::new( + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "test_hash".to_string(), + ); + + let expr2 = HashExpr::new( + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "test_hash".to_string(), + ); + + assert_eq!(expr1, expr2); + } + + #[test] + fn test_hash_expr_eq_different_columns() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let col_b: PhysicalExprRef = Arc::new(Column::new("b", 1)); + let col_c: PhysicalExprRef = Arc::new(Column::new("c", 2)); + + let expr1 = HashExpr::new( + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "test_hash".to_string(), + ); + + let expr2 = HashExpr::new( + vec![Arc::clone(&col_a), Arc::clone(&col_c)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "test_hash".to_string(), + ); + + assert_ne!(expr1, expr2); + } + + #[test] + fn test_hash_expr_eq_different_description() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + + let expr1 = HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "hash_one".to_string(), + ); + + let expr2 = HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "hash_two".to_string(), + ); + + assert_ne!(expr1, expr2); + } + + #[test] + fn test_hash_expr_eq_different_seeds() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + + let expr1 = HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "test_hash".to_string(), + ); + + let expr2 = HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(5, 6, 7, 8), + "test_hash".to_string(), + ); + + assert_ne!(expr1, expr2); + } + + #[test] + fn test_hash_expr_hash_consistency() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let col_b: PhysicalExprRef = Arc::new(Column::new("b", 1)); + + let expr1 = HashExpr::new( + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "test_hash".to_string(), + ); + + let expr2 = HashExpr::new( + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "test_hash".to_string(), + ); + + // Equal expressions should have equal hashes + assert_eq!(expr1, expr2); + assert_eq!(compute_hash(&expr1), compute_hash(&expr2)); + } + + #[test] + fn test_hash_table_lookup_expr_eq_same() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "inner_hash".to_string(), + )); + let hash_map: Arc = + Arc::new(JoinHashMapU32::with_capacity(10)); + + let expr1 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + Arc::clone(&hash_map), + "lookup".to_string(), + ); + + let expr2 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + Arc::clone(&hash_map), + "lookup".to_string(), + ); + + assert_eq!(expr1, expr2); + } + + #[test] + fn test_hash_table_lookup_expr_eq_different_hash_expr() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let col_b: PhysicalExprRef = Arc::new(Column::new("b", 1)); + + let hash_expr1: PhysicalExprRef = Arc::new(HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "inner_hash".to_string(), + )); + + let hash_expr2: PhysicalExprRef = Arc::new(HashExpr::new( + vec![Arc::clone(&col_b)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "inner_hash".to_string(), + )); + + let hash_map: Arc = + Arc::new(JoinHashMapU32::with_capacity(10)); + + let expr1 = HashTableLookupExpr::new( + Arc::clone(&hash_expr1), + Arc::clone(&hash_map), + "lookup".to_string(), + ); + + let expr2 = HashTableLookupExpr::new( + Arc::clone(&hash_expr2), + Arc::clone(&hash_map), + "lookup".to_string(), + ); + + assert_ne!(expr1, expr2); + } + + #[test] + fn test_hash_table_lookup_expr_eq_different_description() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "inner_hash".to_string(), + )); + let hash_map: Arc = + Arc::new(JoinHashMapU32::with_capacity(10)); + + let expr1 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + Arc::clone(&hash_map), + "lookup_one".to_string(), + ); + + let expr2 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + Arc::clone(&hash_map), + "lookup_two".to_string(), + ); + + assert_ne!(expr1, expr2); + } + + #[test] + fn test_hash_table_lookup_expr_eq_different_hash_map() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "inner_hash".to_string(), + )); + + // Two different Arc pointers (even with same content) should not be equal + let hash_map1: Arc = + Arc::new(JoinHashMapU32::with_capacity(10)); + let hash_map2: Arc = + Arc::new(JoinHashMapU32::with_capacity(10)); + + let expr1 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + hash_map1, + "lookup".to_string(), + ); + + let expr2 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + hash_map2, + "lookup".to_string(), + ); + + // Different Arc pointers means not equal (uses Arc::ptr_eq) + assert_ne!(expr1, expr2); + } + + #[test] + fn test_hash_table_lookup_expr_hash_consistency() { + let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); + let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new( + vec![Arc::clone(&col_a)], + SeededRandomState::with_seeds(1, 2, 3, 4), + "inner_hash".to_string(), + )); + let hash_map: Arc = + Arc::new(JoinHashMapU32::with_capacity(10)); + + let expr1 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + Arc::clone(&hash_map), + "lookup".to_string(), + ); + + let expr2 = HashTableLookupExpr::new( + Arc::clone(&hash_expr), + Arc::clone(&hash_map), + "lookup".to_string(), + ); + + // Equal expressions should have equal hashes + assert_eq!(expr1, expr2); + assert_eq!(compute_hash(&expr1), compute_hash(&expr2)); + } +} From 1cb6b1cf1b1fb6a1732feba1a343bb61cb40cf30 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 18 Dec 2025 06:57:20 -0600 Subject: [PATCH 7/9] fix --- datafusion/proto/tests/cases/roundtrip_physical_plan.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index f5e0b4f157f5e..063bb166fa9e9 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -2386,7 +2386,7 @@ fn roundtrip_hash_expr() -> Result<()> { let on_columns = vec![col("a", &schema)?, col("b", &schema)?]; let hash_expr: Arc = Arc::new(HashExpr::new( on_columns, - SeededRandomState::with_seeds(0, 0, 0, 0), // repartition seeds + SeededRandomState::with_seeds(0, 1, 2, 3), // arbitrary random seeds for testing "test_hash".to_string(), )); @@ -2398,5 +2398,7 @@ fn roundtrip_hash_expr() -> Result<()> { Arc::new(EmptyExec::new(schema)), )?); + // Confirm that the debug string contains the random state seeds + assert!("{filter:?}".contains("[0, 1, 2, 3]")); roundtrip_test(filter) } From 476415ad32d69e17b4c075fe4b466cf39024ae76 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 18 Dec 2025 06:59:30 -0600 Subject: [PATCH 8/9] add missing format! --- datafusion/proto/tests/cases/roundtrip_physical_plan.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 063bb166fa9e9..5c70bcbdff100 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -2399,6 +2399,9 @@ fn roundtrip_hash_expr() -> Result<()> { )?); // Confirm that the debug string contains the random state seeds - assert!("{filter:?}".contains("[0, 1, 2, 3]")); + assert!( + format!("{filter:?}").contains("test_hash(a@0, b@1, [0,1,2,3])"), + "Debug string missing seeds: {filter:?}" + ); roundtrip_test(filter) } From c68da54b71c5f925284465b36596e05d5b2ac5c9 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 19 Dec 2025 07:06:08 -0600 Subject: [PATCH 9/9] add comments about equality comparisons --- .../src/joins/hash_join/partitioned_hash_eval.rs | 14 ++++++++++++++ .../physical-plan/src/joins/join_hash_map.rs | 6 ++++++ 2 files changed, 20 insertions(+) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 202f9b9db73b5..4c437e813139d 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -259,12 +259,26 @@ impl Hash for HashTableLookupExpr { fn hash(&self, state: &mut H) { self.hash_expr.dyn_hash(state); self.description.hash(state); + // Note that we compare hash_map by pointer equality. + // Actually comparing the contents of the hash maps would be expensive. + // The way these hash maps are used in actuality is that HashJoinExec creates + // one per partition per query execution, thus it is never possible for two different + // hash maps to have the same content in practice. + // Theoretically this is a public API and users could create identical hash maps, + // but that seems unlikely and not worth paying the cost of deep comparison all the time. Arc::as_ptr(&self.hash_map).hash(state); } } impl PartialEq for HashTableLookupExpr { fn eq(&self, other: &Self) -> bool { + // Note that we compare hash_map by pointer equality. + // Actually comparing the contents of the hash maps would be expensive. + // The way these hash maps are used in actuality is that HashJoinExec creates + // one per partition per query execution, thus it is never possible for two different + // hash maps to have the same content in practice. + // Theoretically this is a public API and users could create identical hash maps, + // but that seems unlikely and not worth paying the cost of deep comparison all the time. self.hash_expr.as_ref() == other.hash_expr.as_ref() && self.description == other.description && Arc::ptr_eq(&self.hash_map, &other.hash_map) diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs b/datafusion/physical-plan/src/joins/join_hash_map.rs index ed370fdb16cf0..b0ed6dcc7c255 100644 --- a/datafusion/physical-plan/src/joins/join_hash_map.rs +++ b/datafusion/physical-plan/src/joins/join_hash_map.rs @@ -94,6 +94,12 @@ use hashbrown::hash_table::Entry::{Occupied, Vacant}; /// /// At runtime we choose between using `JoinHashMapU32` and `JoinHashMapU64` which oth implement /// `JoinHashMapType`. +/// +/// ## Note on use of this trait as a public API +/// This is currently a public trait but is mainly intended for internal use within DataFusion. +/// For example, we may compare references to `JoinHashMapType` implementations by pointer equality +/// rather than deep equality of contents, as deep equality would be expensive and in our usage +/// patterns it is impossible for two different hash maps to have identical contents in a practical sense. pub trait JoinHashMapType: Send + Sync { fn extend_zero(&mut self, len: usize);