
[FEA] Accelerate Bloom filtered joins #7803

Closed · 4 tasks done
jlowe opened this issue Feb 22, 2023 · 11 comments · Fixed by #8892
Labels: feature request (New feature or request), performance (A performance related task/issue)

Comments

jlowe (Member) commented Feb 22, 2023

Spark supports an optimization that leverages a Bloom filter to try to reduce the amount of shuffled data before a join. See SPARK-32268 for details. Currently the RAPIDS Accelerator falls back to the CPU for the ObjectHashAggregates that are introduced to build the Bloom filter and then falls back to the CPU on the subsequent Filter expression that uses the generated Bloom filter.

The Bloom filter construction is technically an aggregation expression, but it appears to always be used in a reduction context. Therefore we should be able to write Bloom filter construction and merge kernels without needing Bloom filter support in libcudf groupby aggregations. In addition to the Bloom filter kernels, we would also need to support the xxhash64 function, which is used to hash the input data into a 64-bit long that is then fed into the Bloom filter.
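
For context, here is a minimal sketch of the CPU-side semantics we need to reproduce. It uses Spark's public sketch `BloomFilter` purely for illustration; the join optimization itself goes through internal aggregate/scalar expressions, and `xxhash64Long` below is just a placeholder for the real hash (whose seed and behavior must match the CPU exactly):

```scala
// Minimal sketch only; not the actual Spark or plugin code.
// org.apache.spark.util.sketch.BloomFilter is Spark's public sketch class; the
// join optimization uses internal expressions, but the insert and probe calls
// have the same shape.
import org.apache.spark.util.sketch.BloomFilter

// Placeholder for Spark's xxhash64 of the join key; the real implementation
// (including its seed) must match the CPU bit for bit.
def xxhash64Long(v: Long): Long = v

// Build side: aggregate the hashed join keys of the smaller side into a filter.
val bf = BloomFilter.create(1000000L)
Seq(1L, 2L, 3L).foreach(k => bf.putLong(xxhash64Long(k)))

// Probe side: only rows whose hashed key "might" be in the filter survive the
// pre-shuffle Filter; everything else is dropped before the join.
val survivors = Seq(2L, 42L).filter(k => bf.mightContainLong(xxhash64Long(k)))
```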

Note that it is important that we match the behavior of the Spark Bloom filter exactly, otherwise we risk data loss if the CPU ends up building the filter but the GPU ends up evaluating it or vice-versa.

Dependencies

For the plugin tagging logic, we need to make sure that we can replace both sides of the joins fully on GPU before committing to changing the plan.
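
A rough sketch of that check (the class and field names below are illustrative stand-ins, not the plugin's actual meta classes):

```scala
// Illustrative stand-ins for the plugin's tagging metadata; not real classes.
final case class ExecMeta(canThisBeReplaced: Boolean)
final case class BloomJoinCandidate(buildSide: ExecMeta, probeSide: ExecMeta)

// Only accelerate the Bloom filter build and probe expressions when both sides
// of the join will run on the GPU; otherwise leave everything on the CPU so a
// CPU-built filter is never evaluated by an incompatible GPU implementation,
// or vice versa.
def canAccelerateBloomJoin(c: BloomJoinCandidate): Boolean =
  c.buildSide.canThisBeReplaced && c.probeSide.canThisBeReplaced
```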

jlowe added the 'feature request', '? - Needs Triage', and 'performance' labels on Feb 22, 2023
ttnghia (Collaborator) commented Feb 23, 2023

Supporting GPU xxhash64 will also benefit other operations such as approx_count_distinct since Spark uses that hasher for implementing that function. If we can have GPU xxhash64, we could also implement a GPU version of approx_count_distinct with the output matching its CPU counterpart.

ttnghia (Collaborator) commented Feb 23, 2023

I've filed a cudf issue asking for XXHash64 support: rapidsai/cudf#12829

jlowe (Member, Author) commented Feb 23, 2023

Wouldn't this be a spark-rapids-jni kernel? Or is the thinking that this applies to a non-Spark use case as well?

ttnghia (Collaborator) commented Feb 23, 2023

Yes, in the meantime I think XXHash64 can be implemented in spark-rapids-jni for a faster development loop, since libcudf doesn't need it (for now) and doesn't consider it a high priority. We can port it over to libcudf whenever needed.
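
For concreteness, the hot path for a single 64-bit input is small. Below is a rough Scala transcription of the XXH64 finalization for an 8-byte input, following the public xxHash spec; it is only a sketch for discussion (the real kernel would be CUDA in spark-rapids-jni) and would need to be validated bit for bit against Spark's XXH64 before use:

```scala
// Sketch of XXH64 for a single 64-bit input, per the public xxHash spec.
// Not validated against Spark's implementation; treat as illustrative only.
object XxHash64Sketch {
  private val Prime1 = 0x9E3779B185EBCA87L
  private val Prime2 = 0xC2B2AE3D27D4EB4FL
  private val Prime3 = 0x165667B19E3779F9L
  private val Prime4 = 0x85EBCA77C2B2AE63L
  private val Prime5 = 0x27D4EB2F165667C5L

  def hashLong(input: Long, seed: Long): Long = {
    // The input is shorter than 32 bytes, so there is no striped accumulator phase.
    var hash = seed + Prime5 + 8L // 8 = input length in bytes
    val k = java.lang.Long.rotateLeft(input * Prime2, 31) * Prime1
    hash ^= k
    hash = java.lang.Long.rotateLeft(hash, 27) * Prime1 + Prime4
    // Final avalanche.
    hash ^= hash >>> 33
    hash *= Prime2
    hash ^= hash >>> 29
    hash *= Prime3
    hash ^= hash >>> 32
    hash
  }
}
```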

mattahrens removed the '? - Needs Triage' label on Feb 28, 2023
jlowe (Member, Author) commented Apr 7, 2023

For reference, here are pointers to the Spark code related to Bloom-filter-assisted join processing:

PointKernel (Member) commented:

Just for your information, @sleeepyjack is working on a GPU-based bloom filter, cuco::bloom_filter. I know the draft PR (NVIDIA/cuCollections#101) has been up for a while, but he's actively working on refactoring it, so the new/improved code should be ready to use soon.

sleeepyjack commented Apr 11, 2023

Can confirm. I was going to mention that in the XXHash issue. Yes, the Bloom filter overhaul (which depends on solving NVIDIA/cuCollections#290 and NVIDIA/cuCollections#284 first) will be my next work item, and I'm prioritizing it. I'm OOO this week, so please expect more updates to come by next week when I'm back at the office.

sleeepyjack commented Apr 11, 2023

> Note that it is important that we match the behavior of the Spark Bloom filter exactly, otherwise we risk data loss if the CPU ends up building the filter but the GPU ends up evaluating it or vice-versa.

Could you elaborate on the Spark (CPU) Bloom filter implementation? Would it be viable to emulate the GPU filter design/layout on the CPU instead?

Maybe related: Why exactly do you need a 64-bit hash function for this?

revans2 (Collaborator) commented Apr 11, 2023

@sleeepyjack and @PointKernel I am not sure that @jlowe explained all of the requirements that we have perfectly. We really would like to match the Spark CPU implementation bit for bit if possible. This is because the data is produced and/or interpreted in multiple different operators. If we change the implementation, even in a very subtle way, we would then have to be 100% sure that we replaced all CPU operators with GPU versions or a compatible CPU version. This gets really complicated to do, especially when it is not an intermediate result. In theory a user could create a bloom filter, then write out the result to Parquet or ORC for later use. We would have no way to guarantee that every Spark job that reads in that column has our plugin and could replace how the serialized bloom filter is used. I know that this is a rather far-fetched example, but I would rather err on the safe side if possible.

jlowe (Member, Author) commented Apr 11, 2023

> Could you elaborate on the Spark (CPU) Bloom filter implementation?

See the bloom filter code link above, which points to the code the CPU uses; in particular, putLong and mightContainLong cover how it inserts and probes, respectively.
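
Roughly, it is Guava-style double hashing over the 64-bit input. The sketch below is a paraphrase for discussion, not the authoritative code, so treat the linked putLong / mightContainLong as the source of truth:

```scala
// Paraphrased sketch of the double hashing used by Spark's BloomFilterImpl;
// verify against the real putLong / mightContainLong before relying on it.
import org.apache.spark.unsafe.hash.Murmur3_x86_32

// putLong sets the bits at these indices; mightContainLong computes the same
// indices and tests whether every one of them is set.
def bitIndices(item: Long, numHashFunctions: Int, bitSize: Long): Seq[Long] = {
  val h1 = Murmur3_x86_32.hashLong(item, 0)
  val h2 = Murmur3_x86_32.hashLong(item, h1)
  (1 to numHashFunctions).map { i =>
    var combined = h1 + i * h2
    if (combined < 0) combined = ~combined // force a non-negative index
    combined % bitSize
  }
}
```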

> Why exactly do you need a 64-bit hash function for this?

Spark uses xxhash64 to hash arbitrary objects into a 64-bit long, which is in turn fed to the bloom filter (either for creation or probing). I'm not exactly sure why they decided to hash-the-hash instead of hashing the object directly in the bloom filter. It might be related to the cost of hashing certain objects; hashing-the-hash would be cheaper in that case.

I agree with @revans2 that we really would like a Spark-compatible implementation of this bloom filter. The first reason is that it's far simpler for us to plan properly, as discussed above. The other reason to keep them in sync is that we then get the exact same behavior as the CPU-only plan in terms of how large the filter is and how much data passes through the filter.

It is possible to work with a custom bloom filter solution, but it makes it far trickier on the planning side to produce a proper plan that won't corrupt data.

Note that any solution we use requires the bloom filter to be serializable, as we need to ship it across nodes in the cluster.
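
For illustration, Spark's public sketch BloomFilter already round-trips through a byte stream, which is the same property any GPU-side representation needs; the exact byte layout on the wire is dictated by whatever the CPU code does, which we would have to match:

```scala
// Serialization round trip with Spark's public sketch BloomFilter; shown only
// to illustrate the serializability requirement, not the plugin's design.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.spark.util.sketch.BloomFilter

val bf = BloomFilter.create(1000L)
bf.putLong(123L)

val out = new ByteArrayOutputStream()
bf.writeTo(out) // these bytes are what would travel between nodes
val roundTripped = BloomFilter.readFrom(new ByteArrayInputStream(out.toByteArray))
assert(roundTripped.mightContainLong(123L))
```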

revans2 (Collaborator) commented Apr 11, 2023

Yes, I made the mistake of thinking the bloom filter operations were actually exposed publicly in Spark, but they are not. So we can make the assumption that if someone goes through the front door, then we will not run into odd situations.
