Proposal: Hook to better support CollectLeft joins in distributed execution #12454

thinkharderdev opened this issue Sep 13, 2024 · 24 comments

@thinkharderdev
Contributor

Is your feature request related to a problem or challenge?

Suppose you are building a distributed query engine on top of DataFusion and you want to run a query like

SELECT facts.fact_value, data.id, data.fact_id
FROM facts OUTER JOIN data
ON data.fact_id = facts.id

where facts is a small "fact" table and data is some HUGE table (many, many TB, let's assume).

The optimal way to do this in a single-node execution is probably to use CollectLeft since facts is small, but this doesn't really work for a distributed join because CollectLeft joins rely on shared in-memory state.

The correct way to do this in a distributed execution is to use a partitioned join and repartition data but this is a problem because data is huge and the repartition would require shuffling a potentially massive amount of data.

Describe the solution you'd like

Add a "hook" in HashJoinExec that would allow shared state to be managed in a distributed execution in a user-defined way.

This might look something like

pub struct DistributedJoinState {
    state_impl: Arc<dyn DistributedJoinStateImpl>,
}

impl DistributedJoinState {
    pub fn new(state_impl: Arc<dyn DistributedJoinStateImpl>) -> Self {
        Self { state_impl }
    }
}

pub enum DistributedProbeState {
    // Probes are still running in other distributed tasks
    Continue,
    // Current task is last probe running so emit unmatched rows
    // if required by join type
    Ready(BooleanBufferBuilder)
}

pub trait DistributedJoinStateImpl: Send + Sync + 'static {
    /// Poll the distributed state with the current task's build side visited bit mask
    fn poll_probe_completed(
        &self,
        mask: &BooleanBufferBuilder,
        cx: &mut Context<'_>,
    ) -> Poll<Result<DistributedProbeState>>;
}

type SharedBitmapBuilder = Mutex<BooleanBufferBuilder>;

/// HashTable and input data for the left (build side) of a join
struct JoinLeftData {
    /// The hash table with indices into `batch`
    hash_map: JoinHashMap,
    /// The input rows for the build side
    batch: RecordBatch,
    /// Shared bitmap builder for visited left indices
    visited_indices_bitmap: Mutex<BooleanBufferBuilder>,
    /// Counter of running probe-threads, potentially
    /// able to update `visited_indices_bitmap`
    probe_threads_counter: AtomicUsize,
    /// Optional hook for sharing the build-side visited bitmap across
    /// distributed tasks (see `DistributedJoinState` above)
    distributed_state: Option<Arc<DistributedJoinState>>,
    /// Memory reservation that tracks memory used by the `hash_map` hash table
    /// and `batch`. Cleared on drop.
    #[allow(dead_code)]
    reservation: MemoryReservation,
}

That is, JoinLeftData can have an optional DistributedJoinState that can be passed in through the TaskContext during execution. If not provided, everything works exactly as it does now. But if it is provided, then HashJoinStream can poll the distributed state when its last (local) probe task completes and, if it is the last global probe task, emit the unmatched rows based on the global bit mask.
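
To make the intended control flow concrete, here is a deliberately simple, single-process sketch of what an implementation of the proposed DistributedJoinStateImpl trait could look like. In a real deployment the merged bitmap would live in an external coordinator (an RPC service, shuffle state, etc.) and poll_probe_completed would likely return Pending while waiting on other tasks; the InMemoryJoinState type below and the import paths are illustrative assumptions, not part of the proposal.

// Single-process sketch of the proposed `DistributedJoinStateImpl` trait.
// Everything except the proposed trait and enum is hypothetical; import
// paths are approximate.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::task::{Context, Poll};

use arrow::array::BooleanBufferBuilder;
use datafusion::common::Result;

/// Shares one global "visited" bitmap between `num_tasks` probe tasks.
pub struct InMemoryJoinState {
    global_mask: Mutex<BooleanBufferBuilder>,
    remaining_tasks: AtomicUsize,
}

impl InMemoryJoinState {
    pub fn new(build_side_rows: usize, num_tasks: usize) -> Self {
        let mut mask = BooleanBufferBuilder::new(build_side_rows);
        mask.append_n(build_side_rows, false);
        Self {
            global_mask: Mutex::new(mask),
            remaining_tasks: AtomicUsize::new(num_tasks),
        }
    }
}

impl DistributedJoinStateImpl for InMemoryJoinState {
    fn poll_probe_completed(
        &self,
        mask: &BooleanBufferBuilder,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<DistributedProbeState>> {
        // Merge this task's visited bits into the global bitmap.
        let mut global = self.global_mask.lock().unwrap();
        for i in 0..mask.len() {
            if mask.get_bit(i) {
                global.set_bit(i, true);
            }
        }
        // The last task to report receives the merged bitmap and becomes
        // responsible for emitting the unmatched build-side rows.
        if self.remaining_tasks.fetch_sub(1, Ordering::SeqCst) == 1 {
            let mut merged = BooleanBufferBuilder::new(global.len());
            for i in 0..global.len() {
                merged.append(global.get_bit(i));
            }
            Poll::Ready(Ok(DistributedProbeState::Ready(merged)))
        } else {
            Poll::Ready(Ok(DistributedProbeState::Continue))
        }
    }
}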

Describe alternatives you've considered

Do nothing and rely on only hash partitioned joins for distributed use cases

Additional context

This sort of goes against the idea that DataFusion itself is not a library for distributed query execution, but given that many use cases of DF are in fact for distributed execution it might make sense to provide hooks for that directly in DF as long as they don't add any meaningful overhead to the single-node execution model.

If that is not the way we want to go then totally fine, just raising the question :)

@thinkharderdev added the enhancement (New feature or request) label on Sep 13, 2024
@alamb
Contributor

alamb commented Sep 13, 2024

One challenge I predict with the above scenario is that it seems to assume that the order of rows from the build side will be the same on all nodes across all partitions (so you can match up the BooleanBuffer across nodes)

This sort of goes against the idea that DataFusion itself is not a library for distributed query execution ...

I think adding hooks for making distributed engines is a very reasonable discussion

The correct way to do this in a distributed execution is to use a partitioned join and repartition data but this is a problem because data is huge and the repartition would require shuffling a potentially massive amount of data.

I am not sure this is the "correct" way though it is certainly one way to do it.

It seems like the core challenge you are describing is finding all rows in a small facts table that did not match any rows when joined with all rows of an arbitrarily distributed data table

You could manage this via a distributed state as you suggest. Another way might be to rewrite the query (automatically) to do the check of which rows didn't match on a single node

The single node might seem like a bad idea, but if facts is really small, the cost of rehashing it is probably low

So do something like

WITH join_result AS (
  SELECT facts.fact_value AS fact_value, data.id AS did, data.fact_id AS fact_id
  FROM facts JOIN data -- Note this is now an INNER join, done in distributed fashion
  ON data.fact_id = facts.id
)
SELECT
  f1.fact_value AS fv,
  join_result.did,
  f1.id AS fid
FROM facts f1 LEFT OUTER JOIN join_result -- this outer query fills in all missing fact rows
ON f1.id = join_result.fact_id

The idea is that you run the outer query either on a single node or after redistributing both sides on fid

This does mean you have to hash facts again, but you don't have to move the data table around

@thinkharderdev
Contributor Author

I am not sure this is the "correct" way though it is certainly one way to do it.

Yeah, fair. I basically meant that it seems like the way to do it that does not require any state shared across distributed execution nodes.

So do something like

This was an approach we tried (or something very similar in spirit at least) but it breaks down when you have a join + aggregation. E.g. with a query like

SELECT data.key,
       SUM(CASE WHEN facts.fact_value IS NULL THEN 0 ELSE 1 END) as matched,
       SUM(CASE WHEN facts.fact_value IS NULL THEN 1 ELSE 0 END) as unmatched
FROM facts OUTER JOIN data
ON data.fact_id = facts.id
GROUP BY data.key

that tries to calculate the matched vs unmatched rows from data.

By adding a coalesce to do the outer join, you can no longer fuse the partial aggregation with the hash join, and you end up back in the position of having to shuffle huge amounts of data

FWIW, I implemented my proposal from above on our internal fork of DataFusion and it's not terribly intrusive in the DataFusion code (IMO): coralogix#269

@thinkharderdev
Contributor Author

One challenge I predict with the above scenario is that it seems to assume that the order of rows from the build side will be the same on all nodes across all partitions (so you can match up the BooleanBuffer across nodes)

Yeah, this is definitely a challenge in the general case if the build-side subquery is inlined into the hash join. In our case the build-side subquery is a separate stage, so we can pretty easily ensure a consistent row ordering since it's read from a shuffle.

@alamb
Contributor

alamb commented Sep 13, 2024

FWIW, I implemented my proposal from above on our internal fork of DataFusion and it's not terribly intrusive in the DataFusion code (IMO): coralogix#269

I would defer to @korowa and @comphead, who I think are our current join experts.

@comphead
Contributor

If there is one huge table and the other one is small, a broadcast join (map join) works best. In a distributed environment (which is not DF), the big table is repartitioned across the nodes (every node can run DF), then the small table is copied to every node and a usual hash join happens on each node. The final result then needs to be merged from the nodes into a single stream.

@alamb
Contributor

alamb commented Sep 14, 2024

You are right about broadcast join, but I think for OUTER JOIN cases the relation that is not preserved (aka the one that is not being padded with nulls) is what is broadcast, and the other needs to be partitioned on the join key (to ensure all possible non-matching rows occur on only one node). In this case I think the distribution isn't quite right.

Though @thinkharderdev maybe that is another idea: how about do the OUTER JOIN across all tables and then run the results through a second operator that removes any duplicate NULL padded rows 🤔

@thinkharderdev
Contributor Author

If there is one huge table and the other one is small, a broadcast join (map join) works best. In a distributed environment (which is not DF), the big table is repartitioned across the nodes (every node can run DF), then the small table is copied to every node and a usual hash join happens on each node. The final result then needs to be merged from the nodes into a single stream.

Right, this is what we are doing now, but the problem is that for an OUTER join this doesn't actually work in a distributed environment, because it produces inconsistent results depending on the exact partitioning of the probe side on each execution.

@thinkharderdev
Contributor Author

Though @thinkharderdev maybe that is another idea: how about do the OUTER JOIN across all tables and then run the results through a second operator that removes any duplicate NULL padded rows 🤔

That would still require coalescing all the output partitions from the hash join into a single partition and processing that stream on a single node.

@Dandandan
Contributor

Dandandan commented Sep 14, 2024

If there is one huge table and the other one is small, a broadcast join (map join) works best. In a distributed environment (which is not DF), the big table is repartitioned across the nodes (every node can run DF), then the small table is copied to every node and a usual hash join happens on each node. The final result then needs to be merged from the nodes into a single stream.

This unfortunately is a bug that is present in Ballista as well. For left and full outer joins (every join type that produces rows for unmatched left-side rows), a plain broadcast join won't work when planned across multiple nodes; I will file an issue on the Ballista repo as well. Sharing the bitmap (to one node) has the least overhead; every other solution will require moving the full join output to one node, so the performance depends more on the join output size.

FYI @andygrove

@alamb
Contributor

alamb commented Sep 15, 2024

Though @thinkharderdev maybe that is another idea: how about do the OUTER JOIN across all tables and then run the results through a second operator that removes any duplicate NULL padded rows 🤔

That would still require coalescing all the output partitions from the hash join into a single partition and processing that stream on a single node.

That is right (or, alternately, repartitioning the facts table and the output of the join).

Depending on the join's output cardinality that might not be too bad (it is certainly better than repartitioning the base data table) but it could also be bad.

@alamb
Contributor

alamb commented Sep 15, 2024

It probably isn't feasible, but the ideal way to run such a query is to pre-partition the data table such that each distinct value of fact_id is only stored on one node.

(this is how Vertica does this type of join, FWIW)

If re-partitioning the entire data table is too costly, you could potentially repartition only a projection of fact_id (to quickly know which rows aren't matching) 🤔

@thinkharderdev
Contributor Author

It probably isn't feasible, but the ideal way to run such a query is to pre-partition the data table such that each distinct value of fact_id is only stored on one node.

(this is how Vertica does this type of join, FWIW)

That would essentially just be hash partitioning, right? Or do you mean actually storing the underlying table data so it is already partitioned and doesn't require a repartition during query execution? The latter wouldn't be possible in our case since we don't know the specific query patterns up front.

@thinkharderdev
Contributor Author

Depending on the join's output cardinality that might not be too bad (it is certainly better than repartitioning the base data table) but it could also be bad.

In our case it doesn't help much since the output cardinality can be (in principle) even larger than the size of data

@comphead
Contributor

It's still not very clear to me what exactly is required to be done on the DF side 🤔
Referring to parallel hash join concepts (https://www.youtube.com/watch?v=QCTyOLvzR88), some external process needs to repartition the data (by hash or by specific columns) before sending it to the nodes. More on partitioning strategies: https://www.youtube.com/watch?v=S40K8iGa8Ek

@thinkharderdev
Contributor Author

It's still not very clear to me what exactly is required to be done on the DF side

The basic idea (which I have implemented on our internal fork here: coralogix#269) is to provide a hook in DataFusion which can be passed in through the TaskContext. The hook allows HashJoinStream to share state between partitions potentially running on different nodes. Specifically, it is sufficient to just share the bitmask of matched rows on the build side; then the last partition to execute can emit the unmatched rows.
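
For illustration, one way the hook could be supplied at execution time is through DataFusion's existing SessionConfig extension mechanism. This is only a sketch of the general idea (the helper function name is hypothetical and the actual wiring in the linked PR may differ); DistributedJoinState is the type proposed above.

// Hypothetical wiring (import paths approximate): register the shared state
// as a SessionConfig extension so that HashJoinExec could later recover it
// from its TaskContext via
// `context.session_config().get_extension::<DistributedJoinState>()`.
use std::sync::Arc;

use datafusion::execution::context::TaskContext;
use datafusion::prelude::{SessionConfig, SessionContext};

fn task_context_with_join_state(state: Arc<DistributedJoinState>) -> Arc<TaskContext> {
    let config = SessionConfig::new().with_extension(state);
    let ctx = SessionContext::new_with_config(config);
    ctx.task_ctx()
}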

Referring to parallel hash join concepts (https://www.youtube.com/watch?v=QCTyOLvzR88), some external process needs to repartition the data (by hash or by specific columns) before sending it to the nodes. More on partitioning strategies: https://www.youtube.com/watch?v=S40K8iGa8Ek

Hash partitioning is the standard way to do this in general, I agree, but it's just not possible in practice to shuffle that much data, as hash partitioning requires. By far the best-performing way to do it is a broadcast join, which is currently not possible.

My opinion personally is that, since DataFusion is being used by a number of organizations to build distributed query engines (even if DF itself is not one), it is reasonable to add hooks/extension points in DF to support that use case as long as:

  1. It doesn't add any measurable overhead to single-node query execution (e.g. it should be explicitly opt-in)
  2. It doesn't add significant code complexity and maintenance burden

If it makes sense to concretize this conversation into a particular code change, I'm happy to submit a PR here to frame things.

@karlovnv

We had a discussion about joining a huge table with a small table here: #7000 (comment)

There are several approaches discussed:

  1. Use dictionaries (like in ClickHouse, details) with a dictionary API (a UDF like get_dict('users', user_id))

ClickHouse supports special functions for working with dictionaries that can be used in queries. It is easier and more efficient to use dictionaries with functions than a JOIN with reference tables.

  2. Use a row-based layout for the small table, with a btree/hash index for getting data by key while joining (in the poll method).
     This may lead us to create a special type of table like RowBasedTable with a special RowBasedTableProvider, which can give access to data using hash-based indices or an iterator for btree ones.

@thinkharderdev
Contributor Author

We had a discussion about joining a huge table with a small table here: #7000 (comment)

There are several approaches discussed:

  1. Use dictionaries (like in ClickHouse, details) with a dictionary API (a UDF like get_dict('users', user_id))

ClickHouse supports special functions for working with dictionaries that can be used in queries. It is easier and more efficient to use dictionaries with functions than a JOIN with reference tables.

  2. Use a row-based layout for the small table, with a btree/hash index for getting data by key while joining (in the poll method).
     This may lead us to create a special type of table like RowBasedTable with a special RowBasedTableProvider, which can give access to data using hash-based indices or an iterator for btree ones.

I don't think either of these would actually solve the issue with outer joins. The problem is that there is shared state across the partitions in the join stage about which build-side rows have been matched across the entire stage. In single-node execution you can deal with this as shared in-memory state (which is what HashJoinStream does) but in a distributed environment there is no way to share that state without some external mechanism for the execution nodes to communicate. The only way to deal with it (in the current execution model) is to coalesce back to a single node which is impractical with datasets of sufficient size.

@karlovnv

karlovnv commented Sep 17, 2024

We had a discussion about joining a huge table with a small table here: #7000 (comment)
There are several approaches discussed:

I don't think either of these would actually solve the issue with outer joins.

I understand your point. I mentioned the ClickHouse functionality and Dictionaries just to show that sometimes it is not necessary to do a join even when you'd like to do one. I agree that this will only work for inner joins (where we can replace the join by getting data from the dictionary).
ClickHouse keeps copies of dictionaries (as files, as connections to an external DB, etc.) at each node to avoid having to synchronize join/dictionary state across the cluster.

in a distributed environment there is no way to share that state without some external mechanism

Yes, this is true. There are some examples:
Greenplum, for instance, sends portions of data (and state) across the shards to parallelize execution; it is a very difficult thing to develop. ClickHouse shards don't send their data in that manner, so CH only allows joining two distributed tables, no more.
That's why some people choose Greenplum for its ability to perform any query despite the fact that it can be slow, and some choose CH for its speed despite its limitations.

So I think we can consider the small facts table as a kind of dictionary and build the hash join index on each node using a copy of this table.

This could let us do the join without having to develop an external mechanism for communication.

@thinkharderdev
Contributor Author

So I think we can consider the small facts table as a kind of dictionary and build the hash join index on each node using a copy of this table.

This is effectively what CollectLeft already does, right?

@comphead
Contributor

@thinkharderdev that's totally true: outer joins, and especially filtered outer joins, require tracking filtered/matched indices before emitting the final result. We encountered a similar problem in SortMergeJoin #12359, even in a single-node environment, but the idea is the same. The partition being processed has no idea about the other partitions, and a row can find a match in partition 0 but no match in partition 1. In that case the join result is emitted incorrectly.

There are probably 2 options to handle it:

  • Final join stage, which requires sending data to some final join stage. To avoid all the data being sent to a single node, you have to partition it so that the same key ends up in the same partition. This is skew-prone, of course.
  • Copy the small table to every node, but again this will require proper partitioning to keep the same key within the same partition so you can track matches correctly.

Both approaches require partitioning the keys appropriately.

@comphead
Contributor

comphead commented Sep 17, 2024

This animation helps a lot with understanding partitioning: https://www.youtube.com/watch?v=GRONctC_Uh0

@thinkharderdev
Contributor Author

There are probably 2 options to handle it

The third option which I am proposing here is to create an extension point to allow sharing just the bitmask. We don't have to (and shouldn't) try to solve the actual coordination of this sharing in DF, because that is not what DF is; rather, I'm suggesting that we add the hooks that allow it to be implemented outside of DF for use cases (like ours) where it is needed.
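
As a rough illustration of that extension point (reusing the types from the proposal above), the decision made by the last local probe task inside HashJoinStream might look like the sketch below. The function and the delegating poll_probe_completed method on DistributedJoinState are hypothetical, not actual DataFusion code; Some(mask) means "emit unmatched build-side rows from this mask" and None means another task will emit them.

// Sketch only: the real change lives in the linked PR. Assumes
// `DistributedJoinState` exposes a delegating `poll_probe_completed` method.
fn on_local_probe_finished(
    left_data: &JoinLeftData,
    cx: &mut Context<'_>,
) -> Poll<Result<Option<BooleanBufferBuilder>>> {
    let local_mask = left_data.visited_indices_bitmap.lock().unwrap();
    match &left_data.distributed_state {
        // No hook registered: behave exactly as today and emit unmatched
        // rows based on the local bitmap.
        None => {
            let mut local = BooleanBufferBuilder::new(local_mask.len());
            for i in 0..local_mask.len() {
                local.append(local_mask.get_bit(i));
            }
            Poll::Ready(Ok(Some(local)))
        }
        // Hook registered: only the task that observes `Ready` emits
        // unmatched rows, and it does so from the global bitmap.
        Some(state) => match state.poll_probe_completed(&local_mask, cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(DistributedProbeState::Continue)) => Poll::Ready(Ok(None)),
            Poll::Ready(Ok(DistributedProbeState::Ready(global))) => Poll::Ready(Ok(Some(global))),
            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
        },
    }
}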

@alamb
Contributor

alamb commented Sep 17, 2024

There are probably 2 options to handle it

The third option which I am proposing here is to create an extension point to allow sharing just the bitmask.

I personally think the hook is fine as long as it is clearly documented (ideally with an example, but that is not required).

@comphead
Contributor

I like the hook idea; let's see a PR so we can talk through the details there.

Just to double-confirm, @thinkharderdev: are you planning to send a bitmask or an array of matched/unmatched left indices to all nodes to calculate the outer join correctly?
