Skip to content

Conversation

@adriangb
Copy link
Contributor

@adriangb adriangb commented Dec 17, 2025

Summary

This PR adds protobuf serialization/deserialization support for HashExpr, enabling distributed query execution to serialize hash expressions used in hash joins and repartitioning.

This is a followup to #18393 which introduced HashExpr but did not add serialization support.
This causes errors when serialization is triggered on a query that pushes down dynamic filters from a HashJoinExec.

As of #18393 HashJoinExec produces filters of the form:

CASE (hash_repartition % 2)
    WHEN 0 THEN 
        a >= ab AND a <= ab AND 
        b >= bb AND b <= bb AND 
        hash_lookup(a,b)
    WHEN 1 THEN 
        a >= aa AND a <= aa AND 
        b >= ba AND b <= ba AND 
        hash_lookup(a,b)
    ELSE 
        FALSE
END

Where hash_lookup is an expression that holds a reference to a given partitions hash join hash table and will check for membership.

Since we created these new expressions but didn't make any of them serializable any attempt to do a distributed query or similar would run into errors.

In #19300 we fixed hash_lookup by replacing it with true since it can't be serialized across the wire (we'd have to send the entire hash table). The logic was that this preserves the bounds checks, which as still valuable.

This PR handles hash_repartition which determines which partition (and hence which branch of the CASE expression) the row belongs to. For this expression we can serialize it, so that's what I'm doing in this PR.

Key Changes

  • SeededRandomState wrapper: Added a SeededRandomState struct that wraps ahash::RandomState while preserving the seeds used to create it. This is necessary because RandomState doesn't expose seeds after creation, but we need them for serialization.

  • Updated seed constants: Changed HASH_JOIN_SEED and REPARTITION_RANDOM_STATE constants to use SeededRandomState instead of raw RandomState.

  • HashExpr enhancements:

    • Changed HashExpr to use SeededRandomState
    • Added getter methods: on_columns(), seeds(), description()
    • Exported HashExpr and SeededRandomState from the joins module
  • Protobuf support:

    • Added PhysicalHashExprNode message to datafusion.proto with fields for on_columns, seeds (4 u64 values), and description
    • Implemented serialization in to_proto.rs
    • Implemented deserialization in from_proto.rs

Test plan

  • Added roundtrip test in roundtrip_physical_plan.rs that creates a HashExpr, serializes it, deserializes it, and verifies the result
  • All existing hash join tests pass (583 tests)
  • All proto roundtrip tests pass

🤖 Generated with Claude Code

This PR adds protobuf serialization/deserialization support for HashExpr,
enabling distributed query execution to serialize hash expressions used
in hash joins and repartitioning.

## Changes

### SeededRandomState wrapper
- Added `SeededRandomState` struct that wraps `ahash::RandomState` while
  preserving the seeds used to create it (since RandomState doesn't expose
  seeds after creation)
- Updated `HASH_JOIN_SEED` and `REPARTITION_RANDOM_STATE` constants to use
  `SeededRandomState`

### HashExpr updates
- Changed `HashExpr` to use `SeededRandomState` instead of raw `RandomState`
- Added getter methods: `on_columns()`, `seeds()`, `description()`
- Exported `HashExpr` and `SeededRandomState` from joins module

### Protobuf support
- Added `PhysicalHashExprNode` message to datafusion.proto with fields for
  on_columns, seeds (4 u64 values), and description
- Implemented serialization in to_proto.rs
- Implemented deserialization in from_proto.rs
- Added roundtrip test to verify serde works correctly

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
@github-actions github-actions bot added proto Related to proto crate physical-plan Changes to the physical-plan crate labels Dec 17, 2025
Comment on lines +135 to +143
self.seeds().hash(state);
}
}

impl PartialEq for HashExpr {
fn eq(&self, other: &Self) -> bool {
self.on_columns == other.on_columns && self.description == other.description
self.on_columns == other.on_columns
&& self.description == other.description
&& self.seeds() == other.seeds()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there was also a bug lurking here where expressions would erroneously compare equal even if they had different RandomStates. I don't think it was likely to result in a real bug but let's fix it while were here.

fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.hash_expr, &other.hash_expr)
&& self.description == other.description
self.hash_expr.dyn_eq(&other.hash_expr) && self.description == other.description
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be:

Suggested change
self.hash_expr.dyn_eq(&other.hash_expr) && self.description == other.description
self.hash_expr.dyn_eq(&other.hash_expr.as_any()) && self.description == other.description

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes good catch, I've fixed and added tests: 4012551

Arc::new(EmptyExec::new(schema)),
)?);

roundtrip_test(filter)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The roundtrip compares the Debug representations of the input and output, but the random_state is not printed, so the seed is not asserted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I added the seeds to the debug representation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adriangb adriangb requested review from alamb and gabotechs December 18, 2025 14:14
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.hash_expr.dyn_hash(state);
self.description.hash(state);
Arc::as_ptr(&self.hash_map).hash(state);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is hashing the pointer address necessary here? it looks exotic enough to justify a comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we could not hash it and let any sort of hash comparison fall back to equality on collision.
The point (a good pun with pointer?) is that we don't want to actually compare the hash maps (expensive) but if the pointer points to different hashmaps we can just assume the expression is different.

Arc::ptr_eq(&self.hash_expr, &other.hash_expr)
self.hash_expr.as_ref() == other.hash_expr.as_ref()
&& self.description == other.description
&& Arc::ptr_eq(&self.hash_map, &other.hash_map)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like it was like this already, but want to make the question anyways:

How it is that for two HashTableLookupExpr to be equal, the pointer address of hash_map need to be the same? Shouldn't it be up to the trait JoinHashMapType implementation to decide how to perform the equality comparison?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes in theory that would be better. But given the actual real world usage I think just saying "if they're not the same hash table treat them as not being the same" is good enough. The only case this doesn't cover is where two hash tables have the same data but given how this is used in HashJoinExec that's not possible (it would be a major bug!).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it looks correct to compare the pointer addresses today. I wonder how that decision will age though, if in the future someone starts relying on actual hash table comparison by value, do you think that can silently be rendered into a bug?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The alternative seems like it would have a lot of negative performance and complexity implications. Can we just document this on the trait? I feel like it's a pseudo-internal trait that we only use for implementation reasons, I don't know that there's an intention of users creating their own implementations even if it is theoretically possible to do so.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I think just adding a comment for readers should be enough, that way if contributors need to perform comparisons in other places of the codebase over this struct they can easily take this into account.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do. I also don't know why JoinHashMapType is public, I opened an issue in #19408

Copy link
Contributor

@gabotechs gabotechs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Comment on lines +92 to +93
pub(crate) const HASH_JOIN_SEED: SeededRandomState =
SeededRandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I like the fact that having this SeededRandomState struct is an explicit indicator that the underlaying RandomState is not completely random.

Copy link
Member

@xudong963 xudong963 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adriangb adriangb added this pull request to the merge queue Dec 19, 2025
Merged via the queue into apache:main with commit 91cfb69 Dec 19, 2025
31 checks passed
adriangb added a commit to pydantic/datafusion that referenced this pull request Jan 2, 2026
## Summary

This PR adds protobuf serialization/deserialization support for
`HashExpr`, enabling distributed query execution to serialize hash
expressions used in hash joins and repartitioning.

This is a followup to apache#18393 which introduced `HashExpr` but did not add
serialization support.
This causes errors when serialization is triggered on a query that
pushes down dynamic filters from a `HashJoinExec`.

As of  apache#18393 `HashJoinExec` produces filters of the form:

```sql
CASE (hash_repartition % 2)
    WHEN 0 THEN 
        a >= ab AND a <= ab AND 
        b >= bb AND b <= bb AND 
        hash_lookup(a,b)
    WHEN 1 THEN 
        a >= aa AND a <= aa AND 
        b >= ba AND b <= ba AND 
        hash_lookup(a,b)
    ELSE 
        FALSE
END
```

Where `hash_lookup` is an expression that holds a reference to a given
partitions hash join hash table and will check for membership.

Since we created these new expressions but didn't make any of them
serializable any attempt to do a distributed query or similar would run
into errors.

In apache#19300 we fixed
`hash_lookup` by replacing it with `true` since it can't be serialized
across the wire (we'd have to send the entire hash table). The logic was
that this preserves the bounds checks, which as still valuable.

This PR handles `hash_repartition` which determines which partition (and
hence which branch of the `CASE` expression) the row belongs to. For
this expression we *can* serialize it, so that's what I'm doing in this
PR.

### Key Changes

- **SeededRandomState wrapper**: Added a `SeededRandomState` struct that
wraps `ahash::RandomState` while preserving the seeds used to create it.
This is necessary because `RandomState` doesn't expose seeds after
creation, but we need them for serialization.

- **Updated seed constants**: Changed `HASH_JOIN_SEED` and
`REPARTITION_RANDOM_STATE` constants to use `SeededRandomState` instead
of raw `RandomState`.

- **HashExpr enhancements**: 
  - Changed `HashExpr` to use `SeededRandomState`
  - Added getter methods: `on_columns()`, `seeds()`, `description()`
  - Exported `HashExpr` and `SeededRandomState` from the joins module

- **Protobuf support**:
- Added `PhysicalHashExprNode` message to `datafusion.proto` with fields
for `on_columns`, seeds (4 `u64` values), and `description`
  - Implemented serialization in `to_proto.rs`
  - Implemented deserialization in `from_proto.rs`

## Test plan

- [x] Added roundtrip test in `roundtrip_physical_plan.rs` that creates
a `HashExpr`, serializes it, deserializes it, and verifies the result
- [x] All existing hash join tests pass (583 tests)
- [x] All proto roundtrip tests pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.5 <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-plan Changes to the physical-plan crate proto Related to proto crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants