Skip to content

Conversation

@adriangb
Copy link
Contributor

@adriangb adriangb commented Dec 12, 2025

#18393 introduced a new expression that can't be serialized. This causes errors when serializing now, and would break any users using joins + protobuf.

@github-actions github-actions bot added proto Related to proto crate physical-plan Changes to the physical-plan crate labels Dec 12, 2025
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @adriangb -- I think we should have a basic unit test for this code too

let value = snapshot_physical_expr(Arc::clone(value))?;
let expr = value.as_any();

// HashTableLookupExpr is used for dynamic filter pushdown in hash joins.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be nice if we could move the protobuf serialization logic into the PhysicalExpr trait itself so we don't forget new structures like this

However, given this just follows the existing pattern, i think it looks good to me

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree

@adriangb adriangb requested a review from alamb December 12, 2025 19:24
/// Hash map implementations for join operations.
///
/// Note: This module is public for internal testing purposes only
/// and is not guaranteed to be stable across versions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@adriangb adriangb requested a review from alamb December 13, 2025 14:52
@adriangb adriangb added this pull request to the merge queue Dec 13, 2025
Merged via the queue into apache:main with commit d61f1a7 Dec 13, 2025
31 checks passed
@adriangb adriangb deleted the replace-hash-map-table branch December 13, 2025 18:34
LiaCastaneda pushed a commit to DataDog/datafusion that referenced this pull request Dec 17, 2025
…ache#19300)

*errors* when serializing now, and would break any users using joins +
protobuf.

(cherry picked from commit d61f1a7)
LiaCastaneda pushed a commit to DataDog/datafusion that referenced this pull request Dec 17, 2025
…ache#19300)

*errors* when serializing now, and would break any users using joins +
protobuf.

(cherry picked from commit d61f1a7)
(cherry picked from commit e0a1211)
github-merge-queue bot pushed a commit that referenced this pull request Dec 19, 2025
## Summary

This PR adds protobuf serialization/deserialization support for
`HashExpr`, enabling distributed query execution to serialize hash
expressions used in hash joins and repartitioning.

This is a followup to #18393 which introduced `HashExpr` but did not add
serialization support.
This causes errors when serialization is triggered on a query that
pushes down dynamic filters from a `HashJoinExec`.

As of  #18393 `HashJoinExec` produces filters of the form:

```sql
CASE (hash_repartition % 2)
    WHEN 0 THEN 
        a >= ab AND a <= ab AND 
        b >= bb AND b <= bb AND 
        hash_lookup(a,b)
    WHEN 1 THEN 
        a >= aa AND a <= aa AND 
        b >= ba AND b <= ba AND 
        hash_lookup(a,b)
    ELSE 
        FALSE
END
```

Where `hash_lookup` is an expression that holds a reference to a given
partitions hash join hash table and will check for membership.

Since we created these new expressions but didn't make any of them
serializable any attempt to do a distributed query or similar would run
into errors.

In #19300 we fixed
`hash_lookup` by replacing it with `true` since it can't be serialized
across the wire (we'd have to send the entire hash table). The logic was
that this preserves the bounds checks, which as still valuable.

This PR handles `hash_repartition` which determines which partition (and
hence which branch of the `CASE` expression) the row belongs to. For
this expression we *can* serialize it, so that's what I'm doing in this
PR.

### Key Changes

- **SeededRandomState wrapper**: Added a `SeededRandomState` struct that
wraps `ahash::RandomState` while preserving the seeds used to create it.
This is necessary because `RandomState` doesn't expose seeds after
creation, but we need them for serialization.

- **Updated seed constants**: Changed `HASH_JOIN_SEED` and
`REPARTITION_RANDOM_STATE` constants to use `SeededRandomState` instead
of raw `RandomState`.

- **HashExpr enhancements**: 
  - Changed `HashExpr` to use `SeededRandomState`
  - Added getter methods: `on_columns()`, `seeds()`, `description()`
  - Exported `HashExpr` and `SeededRandomState` from the joins module

- **Protobuf support**:
- Added `PhysicalHashExprNode` message to `datafusion.proto` with fields
for `on_columns`, seeds (4 `u64` values), and `description`
  - Implemented serialization in `to_proto.rs`
  - Implemented deserialization in `from_proto.rs`

## Test plan

- [x] Added roundtrip test in `roundtrip_physical_plan.rs` that creates
a `HashExpr`, serializes it, deserializes it, and verifies the result
- [x] All existing hash join tests pass (583 tests)
- [x] All proto roundtrip tests pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.5 <[email protected]>
adriangb added a commit to pydantic/datafusion that referenced this pull request Jan 2, 2026
## Summary

This PR adds protobuf serialization/deserialization support for
`HashExpr`, enabling distributed query execution to serialize hash
expressions used in hash joins and repartitioning.

This is a followup to apache#18393 which introduced `HashExpr` but did not add
serialization support.
This causes errors when serialization is triggered on a query that
pushes down dynamic filters from a `HashJoinExec`.

As of  apache#18393 `HashJoinExec` produces filters of the form:

```sql
CASE (hash_repartition % 2)
    WHEN 0 THEN 
        a >= ab AND a <= ab AND 
        b >= bb AND b <= bb AND 
        hash_lookup(a,b)
    WHEN 1 THEN 
        a >= aa AND a <= aa AND 
        b >= ba AND b <= ba AND 
        hash_lookup(a,b)
    ELSE 
        FALSE
END
```

Where `hash_lookup` is an expression that holds a reference to a given
partitions hash join hash table and will check for membership.

Since we created these new expressions but didn't make any of them
serializable any attempt to do a distributed query or similar would run
into errors.

In apache#19300 we fixed
`hash_lookup` by replacing it with `true` since it can't be serialized
across the wire (we'd have to send the entire hash table). The logic was
that this preserves the bounds checks, which as still valuable.

This PR handles `hash_repartition` which determines which partition (and
hence which branch of the `CASE` expression) the row belongs to. For
this expression we *can* serialize it, so that's what I'm doing in this
PR.

### Key Changes

- **SeededRandomState wrapper**: Added a `SeededRandomState` struct that
wraps `ahash::RandomState` while preserving the seeds used to create it.
This is necessary because `RandomState` doesn't expose seeds after
creation, but we need them for serialization.

- **Updated seed constants**: Changed `HASH_JOIN_SEED` and
`REPARTITION_RANDOM_STATE` constants to use `SeededRandomState` instead
of raw `RandomState`.

- **HashExpr enhancements**: 
  - Changed `HashExpr` to use `SeededRandomState`
  - Added getter methods: `on_columns()`, `seeds()`, `description()`
  - Exported `HashExpr` and `SeededRandomState` from the joins module

- **Protobuf support**:
- Added `PhysicalHashExprNode` message to `datafusion.proto` with fields
for `on_columns`, seeds (4 `u64` values), and `description`
  - Implemented serialization in `to_proto.rs`
  - Implemented deserialization in `from_proto.rs`

## Test plan

- [x] Added roundtrip test in `roundtrip_physical_plan.rs` that creates
a `HashExpr`, serializes it, deserializes it, and verifies the result
- [x] All existing hash join tests pass (583 tests)
- [x] All proto roundtrip tests pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.5 <[email protected]>
LiaCastaneda pushed a commit to DataDog/datafusion that referenced this pull request Jan 12, 2026
…ache#19300)

*errors* when serializing now, and would break any users using joins +
protobuf.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-plan Changes to the physical-plan crate proto Related to proto crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants