Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/joins/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! [`HashJoinExec`] Partitioned Hash Join Operator
pub use exec::HashJoinExec;
pub use partitioned_hash_eval::HashTableLookupExpr;

mod exec;
mod inlist_builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType};
/// This is used for:
/// - Computing routing hashes (with RepartitionExec's 0,0,0,0 seeds)
/// - Computing lookup hashes (with HashJoin's 'J','O','I','N' seeds)
pub(super) struct HashExpr {
pub struct HashExpr {
/// Columns to hash
on_columns: Vec<PhysicalExprRef>,
/// Random state for hashing
Expand Down Expand Up @@ -179,7 +179,11 @@ impl HashTableLookupExpr {
/// * `hash_expr` - Expression that computes hash values
/// * `hash_map` - Hash table to check membership
/// * `description` - Description for debugging
pub(super) fn new(
///
/// # Note
/// This is public for internal testing purposes only and is not
/// guaranteed to be stable across versions.
pub fn new(
hash_expr: PhysicalExprRef,
hash_map: Arc<dyn JoinHashMapType>,
description: String,
Expand Down
8 changes: 6 additions & 2 deletions datafusion/physical-plan/src/joins/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use arrow::array::BooleanBufferBuilder;
pub use cross_join::CrossJoinExec;
use datafusion_physical_expr::PhysicalExprRef;
pub use hash_join::HashJoinExec;
pub use hash_join::{HashJoinExec, HashTableLookupExpr};
pub use nested_loop_join::NestedLoopJoinExec;
use parking_lot::Mutex;
// Note: SortMergeJoin is not used in plans yet
Expand All @@ -37,7 +37,11 @@ mod symmetric_hash_join;
pub mod utils;

mod join_filter;
mod join_hash_map;
/// Hash map implementations for join operations.
///
/// Note: This module is public for internal testing purposes only
/// and is not guaranteed to be stable across versions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

pub mod join_hash_map;

#[cfg(test)]
pub mod test_utils;
Expand Down
25 changes: 25 additions & 0 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use datafusion_physical_plan::expressions::{
BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
};
use datafusion_physical_plan::joins::HashTableLookupExpr;
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
Expand Down Expand Up @@ -227,6 +228,30 @@ pub fn serialize_physical_expr(
let value = snapshot_physical_expr(Arc::clone(value))?;
let expr = value.as_any();

// HashTableLookupExpr is used for dynamic filter pushdown in hash joins.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we could move the protobuf serialization logic into the `PhysicalExpr` trait itself, so that we don't forget new structures like this.

However, given that this just follows the existing pattern, I think it looks good to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree

// It contains an Arc<dyn JoinHashMapType> (the build-side hash table) which
// cannot be serialized - the hash table is a runtime structure built during
// execution on the build side.
//
// We replace it with lit(true) which is safe because:
// 1. The filter is a performance optimization, not a correctness requirement
// 2. lit(true) passes all rows, so no valid rows are incorrectly filtered out
// 3. The join itself will still produce correct results, just without the
// benefit of early filtering on the probe side
//
// In distributed execution, the remote worker won't have access to the hash
// table anyway, so the best we can do is skip this optimization.
if expr.downcast_ref::<HashTableLookupExpr>().is_some() {
let value = datafusion_proto_common::ScalarValue {
value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
true,
)),
};
return Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)),
});
}

if let Some(expr) = expr.downcast_ref::<Column>() {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
Expand Down
50 changes: 48 additions & 2 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ use datafusion::physical_plan::expressions::{
};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::{
HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
StreamJoinPartitionMode, SymmetricHashJoinExec,
HashJoinExec, HashTableLookupExpr, NestedLoopJoinExec, PartitionMode,
SortMergeJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
};
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
Expand Down Expand Up @@ -116,6 +116,7 @@ use datafusion_expr::{
use datafusion_functions_aggregate::average::avg_udaf;
use datafusion_functions_aggregate::nth_value::nth_value_udaf;
use datafusion_functions_aggregate::string_agg::string_agg_udaf;
use datafusion_physical_plan::joins::join_hash_map::JoinHashMapU32;
use datafusion_proto::physical_plan::{
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
};
Expand Down Expand Up @@ -2327,3 +2328,48 @@ async fn roundtrip_async_func_exec() -> Result<()> {

Ok(())
}

/// Test that `HashTableLookupExpr` serializes to `lit(true)`.
///
/// `HashTableLookupExpr` contains a runtime hash table that cannot be serialized.
/// The serialization code replaces it with `lit(true)`, which is safe because the
/// lookup is a performance optimization filter, not a correctness requirement.
///
/// The test wraps the lookup expression in a `FilterExec`, round-trips the plan
/// through protobuf, and checks that the deserialized filter predicate is the
/// boolean literal `true`.
#[test]
fn roundtrip_hash_table_lookup_expr_to_lit() -> Result<()> {
    // Single-column schema backing an empty input plan. The schema is not needed
    // afterwards, so it is moved into the plan rather than cloned.
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)]));
    let input = Arc::new(EmptyExec::new(schema));

    // Build the HashTableLookupExpr that serialization is expected to replace
    // with lit(true). An empty hash map is enough: the table is never probed here.
    let hash_map = Arc::new(JoinHashMapU32::with_capacity(0));
    let hash_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("col", 0));
    let lookup_expr: Arc<dyn PhysicalExpr> = Arc::new(HashTableLookupExpr::new(
        hash_expr,
        hash_map,
        "test_lookup".to_string(),
    ));

    // Use the lookup expression as the predicate of a filter over the empty input.
    let filter = Arc::new(FilterExec::try_new(lookup_expr, input)?);

    // Serialize. The filter is not used again, so it is moved instead of cloned.
    let ctx = SessionContext::new();
    let codec = DefaultPhysicalExtensionCodec {};
    let proto: protobuf::PhysicalPlanNode =
        protobuf::PhysicalPlanNode::try_from_physical_plan(filter, &codec)
            .expect("serialization should succeed");

    // Deserialize
    let result: Arc<dyn ExecutionPlan> = proto
        .try_into_physical_plan(&ctx.task_ctx(), &codec)
        .expect("deserialization should succeed");

    // The deserialized plan should carry Literal(true) in place of the
    // HashTableLookupExpr. Each downcast states the invariant it relies on so a
    // failure points at the unexpected plan shape.
    let result_filter = result
        .as_any()
        .downcast_ref::<FilterExec>()
        .expect("deserialized plan root should be a FilterExec");
    let literal = result_filter
        .predicate()
        .as_any()
        .downcast_ref::<Literal>()
        .expect("deserialized filter predicate should be a Literal");
    assert_eq!(*literal.value(), ScalarValue::Boolean(Some(true)));

    Ok(())
}