diff --git a/datafusion/physical-plan/src/joins/hash_join/mod.rs b/datafusion/physical-plan/src/joins/hash_join/mod.rs index ac1c54f4f6034..352209e9c3f75 100644 --- a/datafusion/physical-plan/src/joins/hash_join/mod.rs +++ b/datafusion/physical-plan/src/joins/hash_join/mod.rs @@ -18,6 +18,7 @@ //! [`HashJoinExec`] Partitioned Hash Join Operator pub use exec::HashJoinExec; +pub use partitioned_hash_eval::HashTableLookupExpr; mod exec; mod inlist_builder; diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index ffceb6b659aa6..b91f5f46d719c 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -42,7 +42,7 @@ use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType}; /// This is used for: /// - Computing routing hashes (with RepartitionExec's 0,0,0,0 seeds) /// - Computing lookup hashes (with HashJoin's 'J','O','I','N' seeds) -pub(super) struct HashExpr { +pub struct HashExpr { /// Columns to hash on_columns: Vec<PhysicalExprRef>, /// Random state for hashing @@ -179,7 +179,11 @@ impl HashTableLookupExpr { /// * `hash_expr` - Expression that computes hash values /// * `hash_map` - Hash table to check membership /// * `description` - Description for debugging - pub(super) fn new( + /// + /// # Note + /// This is public for internal testing purposes only and is not + /// guaranteed to be stable across versions. 
+ pub fn new( hash_expr: PhysicalExprRef, hash_map: Arc<dyn JoinHashMapType>, description: String, diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs index b0c28cf994f71..0ca77b3cae982 100644 --- a/datafusion/physical-plan/src/joins/mod.rs +++ b/datafusion/physical-plan/src/joins/mod.rs @@ -20,7 +20,7 @@ use arrow::array::BooleanBufferBuilder; pub use cross_join::CrossJoinExec; use datafusion_physical_expr::PhysicalExprRef; -pub use hash_join::HashJoinExec; +pub use hash_join::{HashJoinExec, HashTableLookupExpr}; pub use nested_loop_join::NestedLoopJoinExec; use parking_lot::Mutex; // Note: SortMergeJoin is not used in plans yet @@ -37,7 +37,11 @@ mod symmetric_hash_join; pub mod utils; mod join_filter; -mod join_hash_map; +/// Hash map implementations for join operations. +/// +/// Note: This module is public for internal testing purposes only +/// and is not guaranteed to be stable across versions. +pub mod join_hash_map; #[cfg(test)] pub mod test_utils; diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 146e9258111a2..6b9eb9a33e1fe 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -41,6 +41,7 @@ use datafusion_physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, }; +use datafusion_physical_plan::joins::HashTableLookupExpr; use datafusion_physical_plan::udaf::AggregateFunctionExpr; use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr}; use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; @@ -227,6 +228,30 @@ pub fn serialize_physical_expr( let value = snapshot_physical_expr(Arc::clone(value))?; let expr = value.as_any(); + // HashTableLookupExpr is used for dynamic filter pushdown in hash joins. 
+ // It contains an Arc<dyn JoinHashMapType> (the build-side hash table) which + // cannot be serialized - the hash table is a runtime structure built during + // execution on the build side. + // + // We replace it with lit(true) which is safe because: + // 1. The filter is a performance optimization, not a correctness requirement + // 2. lit(true) passes all rows, so no valid rows are incorrectly filtered out + // 3. The join itself will still produce correct results, just without the + // benefit of early filtering on the probe side + // + // In distributed execution, the remote worker won't have access to the hash + // table anyway, so the best we can do is skip this optimization. + if expr.downcast_ref::<HashTableLookupExpr>().is_some() { + let value = datafusion_proto_common::ScalarValue { + value: Some(datafusion_proto_common::scalar_value::Value::BoolValue( + true, + )), + }; + return Ok(protobuf::PhysicalExprNode { + expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)), + }); + } + if let Some(expr) = expr.downcast_ref::<Column>() { Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::Column( diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 438e65f60b001..19c2f437d8a7f 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -79,8 +79,8 @@ use datafusion::physical_plan::expressions::{ }; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::joins::{ - HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec, - StreamJoinPartitionMode, SymmetricHashJoinExec, + HashJoinExec, HashTableLookupExpr, NestedLoopJoinExec, PartitionMode, + SortMergeJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec, }; use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion::physical_plan::placeholder_row::PlaceholderRowExec; @@ -116,6 +116,7 @@ 
use datafusion_expr::{ use datafusion_functions_aggregate::average::avg_udaf; use datafusion_functions_aggregate::nth_value::nth_value_udaf; use datafusion_functions_aggregate::string_agg::string_agg_udaf; +use datafusion_physical_plan::joins::join_hash_map::JoinHashMapU32; use datafusion_proto::physical_plan::{ AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec, }; @@ -2327,3 +2328,48 @@ async fn roundtrip_async_func_exec() -> Result<()> { Ok(()) } + +/// Test that HashTableLookupExpr serializes to lit(true) +/// +/// HashTableLookupExpr contains a runtime hash table that cannot be serialized. +/// The serialization code replaces it with lit(true) which is safe because +/// it's a performance optimization filter, not a correctness requirement. +#[test] +fn roundtrip_hash_table_lookup_expr_to_lit() -> Result<()> { + // Create a simple schema and input plan + let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)])); + let input = Arc::new(EmptyExec::new(schema.clone())); + + // Create a HashTableLookupExpr - it will be replaced with lit(true) during serialization + let hash_map = Arc::new(JoinHashMapU32::with_capacity(0)); + let hash_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("col", 0)); + let lookup_expr: Arc<dyn PhysicalExpr> = Arc::new(HashTableLookupExpr::new( + hash_expr, + hash_map, + "test_lookup".to_string(), + )); + + // Create a filter with the lookup expression + let filter = Arc::new(FilterExec::try_new(lookup_expr, input)?); + + // Serialize + let ctx = SessionContext::new(); + let codec = DefaultPhysicalExtensionCodec {}; + let proto: protobuf::PhysicalPlanNode = + protobuf::PhysicalPlanNode::try_from_physical_plan(filter.clone(), &codec) + .expect("serialization should succeed"); + + // Deserialize + let result: Arc<dyn ExecutionPlan> = proto + .try_into_physical_plan(&ctx.task_ctx(), &codec) + .expect("deserialization should succeed"); + + // The deserialized plan should have lit(true) instead of HashTableLookupExpr + // Verify the filter predicate 
is a Literal(true) + let result_filter = result.as_any().downcast_ref::<FilterExec>().unwrap(); + let predicate = result_filter.predicate(); + let literal = predicate.as_any().downcast_ref::<Literal>().unwrap(); + assert_eq!(*literal.value(), ScalarValue::Boolean(Some(true))); + + Ok(()) +}