feat(cudf): Add cuDF based OrderBy operator #12735
devavret wants to merge 40 commits into facebookincubator:main
Conversation
- West const
- header cleanup
- nodiscard
bdice left a comment:
Leaving a few explanatory comments for reviewers. Thanks in advance for reviewing!
Lots of credit to @karthikeyann, @devavret, @mhaseeb123, @GregoryKimball on the cuDF side, and lots of appreciation to @oerling @pedroerp @Yuhta @kgpai @assignUser (and more!) on the Meta / Voltron side for all your assistance. We are looking forward to upstreaming more features after this initial PR lands.
We are pinning this to specific commits of cuDF and its dependencies to avoid breakage from any final changes in the 25.04 release. Once the RAPIDS 25.04 release is out (currently targeting April 9-10), we can remove a lot of this logic for rapids-cmake, rmm, and kvikio -- and just pin cuDF to the stable release.
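The pinning described above might look roughly like the following. This is an illustrative sketch only, assuming a FetchContent-based setup; the variable layout is hypothetical and not the PR's actual cudf.cmake, and the commit hash is a placeholder.

```cmake
include(FetchContent)

# Hypothetical pin: fetch cuDF at an exact commit until the RAPIDS 25.04
# stable release is published, then switch GIT_TAG to the release tag.
FetchContent_Declare(
  cudf
  GIT_REPOSITORY https://github.com/rapidsai/cudf.git
  GIT_TAG <pinned-commit-sha>) # replace with the 25.04 tag once released
FetchContent_MakeAvailable(cudf)
```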
```cmake
endif()
find_package(CUDAToolkit REQUIRED)
if(VELOX_ENABLE_CUDF)
  set(VELOX_ENABLE_ARROW ON)
```
cuDF itself does not need Arrow (cuDF uses nanoarrow), but the Velox-cuDF interop requires Arrow functionality in Velox.
```diff
 dnf_install autoconf automake python3-devel pip libtool
-pip install cmake==3.28.3
+pip install cmake==3.30.4
```
cuDF and its dependencies require CMake 3.30.4. That CMake version shipped with a fix for finding some CUDA Toolkit components that cuDF and its dependencies use.
Could you add a cmake_minimum_required to the top of cudf.cmake with a comment?
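A minimal sketch of the requested change, assuming it goes at the top of cudf.cmake:

```cmake
# cuDF and its dependencies require CMake 3.30.4, which shipped a fix
# for finding some CUDA Toolkit components they use.
cmake_minimum_required(VERSION 3.30.4)
```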
Hi @devavret! Thank you for your pull request and welcome to our community.

Action Required

In order to merge any pull request (code, docs, etc.), we require contributors to sign our Contributor License Agreement, and we don't seem to have one on file for you.

Process

In order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (eg your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA.

Once the CLA is signed, our tooling will perform checks and validations. Afterwards, the pull request will be tagged with If you have received this in error or have any questions, please contact us at cla@meta.com. Thanks!
Thank you for signing our Contributor License Agreement. We can now accept your code for this (and any) Meta Open Source project. Thanks!
```cpp
DECLARE_bool(velox_cudf_enabled);
DECLARE_string(velox_cudf_memory_resource);
```
Can you also declare `velox_cudf_debug` here? I also need to set that flag.
```cpp
  return nullptr;
}
finished_ = noMoreInput_;
return outputTable_;
```
The output table might be a very large table, containing all of the input data, which is not allowed.
Failed by PlanNode destructor
```cpp
bool operator()(const exec::DriverFactory& factory, exec::Driver& driver) {
  auto state = CompileState(factory, driver, *planNodes_);
  // Stored planNodes_ from inspect.
  auto res = state.compile();
```
Calling `planNodes_->clear();` here can solve this issue: #12735 (comment)
I don't believe this is quite right. In plans involving multiple pipelines, the compile operator is called multiple times, once per pipeline, so we need the stored plan nodes in all subsequent calls. Would adding a separate clear() method to the cuDF driver adapter help? If you know when the task is finished, you can manually clear out the adapter.
Do you mean the Task parallel execution mode? I read the wave DriverAdapter (https://github.com/facebookincubator/velox/blob/main/velox/experimental/wave/exec/ToWave.cpp#L251); it does not need to store the plan nodes. Is there any difference?
Gluten uses the single-threaded task mode, so it's OK to clear it here. And if we have multiple drivers, can we clear the plan nodes after createAndStartDrivers in Task::start?
Is there any difference between the planNodes stored in DriverFactory and CompileState?
```cpp
struct DriverFactory {
  std::vector<std::shared_ptr<const core::PlanNode>> planNodes;
```
The adapters are saved in a static area; if we call clear() after the task is finished, we don't know which driver adapter is attached to the task. Gluten runs parallel tasks on a single machine.
> Do you mean the Task parallel execution mode?
I meant plans that, say, have a join in them. Those would need to be run through the compile step at least twice, once for each branch.
> The adapters are saved in a static area; if we call clear() after the task is finished, we don't know which driver adapter is attached to the task. Gluten runs parallel tasks on a single machine.
You wouldn't need to wait until a task is finished; you only need to wait until the operator replacements have been made in it.
To be clear, I am not standing my ground here. I just think we need a different solution to the one suggested.
> Is there any difference between the planNodes stored in DriverFactory and CompileState?
Yes: CompileState stores planNodes from the whole task, while the planNodes stored in DriverFactory are only from the current pipeline. The latter sometimes excludes plan nodes that contribute operators to multiple pipelines, such as partition and hash join.
But I have now observed that if the required plan node cannot be found in DriverFactory::planNodes, it is usually found in DriverFactory::consumerNode. I've made this change in 5575205 and it seems to work fine both for this PR and for our internal fork with all TPC-H operators replaced.
@karthikeyann since you originally wrote this piece, can you verify this commit?
It looks good. I will verify this with all TPC-H queries (with partition and hash join).
I verified with the TPC-H benchmarks. It works.
```cpp
      tableViews, stream, cudf::get_current_device_resource_ref());
}

std::unique_ptr<cudf::table> getConcatenatedTable(
```
What if the number of rows in the concatenated table is beyond the range? We have a case where the accumulated rows in OrderBy exceed the vector_size_t range: #10848
@jinchengchenghh how does Velox handle row counts beyond the vector_size_t range (all inputs together) for OrderBy?
Velox stores the input in a RowContainer and stores the sorted row pointers in a std::vector, so the number of input rows it can sort is bounded by size_t. It then extracts output rows using vector_size_t and emits several batches whose sizes satisfy the output-rows config.
@jinchengchenghh Unfortunately my attempts at simple fixes for the decimal issue have been unsuccessful. I'm going to have to postpone decimal support to a follow-up PR. The planNode lifetime issue should be fixed, but I have no way of verifying that. Can you take another look to see if there are any showstopping bugs in this PR?
removing fixDictionaryIndices after cudf changed dictionary indices to signed
@pedroerp has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
Thanks, I will verify it. @devavret
Summary: This PR adds a cuDF based OrderBy operator and tooling to replace existing Velox based operators. This includes:
- CudfVector class that holds a cudf::table and is a replacement for Velox's RowVector when dealing with cuDF.
- Interop code to convert between Velox and cuDF RowVectors.
- CudfToVelox and CudfFromVelox operators that sit between the cuDF and Velox operators and handle the conversion of RowVectors to cudf::table and back.
- A cuDF driver adapter that converts Velox operators to cuDF operators.
- Nvtx tooling to help with profiling.

Pull Request resolved: facebookincubator#12735
Reviewed By: Yuhta
Differential Revision: D73003714
Pulled By: pedroerp
fbshipit-source-id: 5ac1e3db2d3754528802f51ded42b43e7250f191
Thanks for your active updates. I have verified in Gluten and all the issues are resolved. @devavret