diff --git a/CMake/resolve_dependency_modules/cudf.cmake b/CMake/resolve_dependency_modules/cudf.cmake new file mode 100644 index 00000000000..56f7b9bf1d1 --- /dev/null +++ b/CMake/resolve_dependency_modules/cudf.cmake @@ -0,0 +1,95 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +include_guard(GLOBAL) + +# 3.30.4 is the minimum version required by cudf +cmake_minimum_required(VERSION 3.30.4) + +set(VELOX_rapids_cmake_VERSION 25.04) +set(VELOX_rapids_cmake_BUILD_SHA256_CHECKSUM + 458c14eaff9000067b32d65c8c914f4521090ede7690e16eb57035ce731386db) +set(VELOX_rapids_cmake_SOURCE_URL + "https://github.com/rapidsai/rapids-cmake/archive/7828fc8ff2e9f4fa86099f3c844505c2f47ac672.tar.gz" +) +velox_resolve_dependency_url(rapids_cmake) + +set(VELOX_rmm_VERSION 25.04) +set(VELOX_rmm_BUILD_SHA256_CHECKSUM + 294905094213a2d1fd8e024500359ff871bc52f913a3fbaca3514727c49f62de) +set(VELOX_rmm_SOURCE_URL + "https://github.com/rapidsai/rmm/archive/d8b7dacdeda302d2e37313c02d14ef5e1d1e98ea.tar.gz" +) +velox_resolve_dependency_url(rmm) + +set(VELOX_kvikio_VERSION 25.04) +set(VELOX_kvikio_BUILD_SHA256_CHECKSUM + 4a0b15295d0a397433930bf9a309e4ad2361b25dc7a7b3e6a35d0c9419d0cb62) +set(VELOX_kvikio_SOURCE_URL + "https://github.com/rapidsai/kvikio/archive/5c710f37236bda76e447e929e17b1efbc6c632c3.tar.gz" +) +velox_resolve_dependency_url(kvikio) + +set(VELOX_cudf_VERSION 25.04) +set(VELOX_cudf_BUILD_SHA256_CHECKSUM + 
e5a1900dfaf23dab2c5808afa17a2d04fa867d2892ecec1cb37908f3b73715c2) +set(VELOX_cudf_SOURCE_URL + "https://github.com/rapidsai/cudf/archive/4c1c99011da2c23856244e05adda78ba66697105.tar.gz" +) +velox_resolve_dependency_url(cudf) + +# Use block so we don't leak variables +block(SCOPE_FOR VARIABLES) +# Setup libcudf build to not have testing components +set(BUILD_TESTS OFF) +set(CUDF_BUILD_TESTUTIL OFF) +set(BUILD_SHARED_LIBS ON) + +FetchContent_Declare( + rapids-cmake + URL ${VELOX_rapids_cmake_SOURCE_URL} + URL_HASH ${VELOX_rapids_cmake_BUILD_SHA256_CHECKSUM} + UPDATE_DISCONNECTED 1) + +FetchContent_Declare( + rmm + URL ${VELOX_rmm_SOURCE_URL} + URL_HASH ${VELOX_rmm_BUILD_SHA256_CHECKSUM} + UPDATE_DISCONNECTED 1) + +FetchContent_Declare( + kvikio + URL ${VELOX_kvikio_SOURCE_URL} + URL_HASH ${VELOX_kvikio_BUILD_SHA256_CHECKSUM} + SOURCE_SUBDIR cpp + UPDATE_DISCONNECTED 1) + +FetchContent_Declare( + cudf + URL ${VELOX_cudf_SOURCE_URL} + URL_HASH ${VELOX_cudf_BUILD_SHA256_CHECKSUM} + SOURCE_SUBDIR cpp + UPDATE_DISCONNECTED 1) + +FetchContent_MakeAvailable(cudf) + +# cudf sets all warnings as errors, and therefore fails to compile with velox +# expanded set of warnings. 
We selectively disable problematic warnings just for +# cudf +target_compile_options( + cudf PRIVATE -Wno-non-virtual-dtor -Wno-missing-field-initializers + -Wno-deprecated-copy) + +unset(BUILD_SHARED_LIBS) +endblock() diff --git a/CMakeLists.txt b/CMakeLists.txt index 8c98220533c..b7aedf3bbe2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -238,7 +238,8 @@ find_package(OpenSSL REQUIRED) if(VELOX_ENABLE_CCACHE AND NOT CMAKE_C_COMPILER_LAUNCHER - AND NOT CMAKE_CXX_COMPILER_LAUNCHER) + AND NOT CMAKE_CXX_COMPILER_LAUNCHER + AND NOT CMAKE_CUDA_COMPILER_LAUNCHER) find_program(CCACHE_FOUND ccache) @@ -246,6 +247,7 @@ if(VELOX_ENABLE_CCACHE message(STATUS "Using ccache: ${CCACHE_FOUND}") set(CMAKE_C_COMPILER_LAUNCHER ${CCACHE_FOUND}) set(CMAKE_CXX_COMPILER_LAUNCHER ${CCACHE_FOUND}) + set(CMAKE_CUDA_COMPILER_LAUNCHER ${CCACHE_FOUND}) # keep comments as they might matter to the compiler set(ENV{CCACHE_COMMENTS} "1") endif() @@ -384,7 +386,7 @@ if(ENABLE_ALL_WARNINGS) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra ${KNOWN_WARNINGS}") endif() -if(${VELOX_ENABLE_GPU}) +if(VELOX_ENABLE_GPU) enable_language(CUDA) # Determine CUDA_ARCHITECTURES automatically. cmake_policy(SET CMP0104 NEW) @@ -396,6 +398,19 @@ if(${VELOX_ENABLE_GPU}) add_compile_options("$<$:-G>") endif() find_package(CUDAToolkit REQUIRED) + if(VELOX_ENABLE_CUDF) + foreach(arch ${CMAKE_CUDA_ARCHITECTURES}) + if(arch LESS 70) + message( + FATAL_ERROR + "CUDA architecture ${arch} is below 70. CUDF requires Volta (SM 70) or newer GPUs." + ) + endif() + endforeach() + set(VELOX_ENABLE_ARROW ON) + velox_set_source(cudf) + velox_resolve_dependency(cudf) + endif() endif() # Set after the test of the CUDA compiler. 
Otherwise, the test fails with diff --git a/Makefile b/Makefile index c5c8888f231..f21453833ad 100644 --- a/Makefile +++ b/Makefile @@ -101,7 +101,7 @@ cmake: #: Use CMake to create a Makefile build system ${EXTRA_CMAKE_FLAGS} cmake-gpu: - $(MAKE) EXTRA_CMAKE_FLAGS="${EXTRA_CMAKE_FLAGS} -DVELOX_ENABLE_GPU=ON" cmake + $(MAKE) EXTRA_CMAKE_FLAGS="${EXTRA_CMAKE_FLAGS} -DVELOX_ENABLE_GPU=ON -DVELOX_ENABLE_CUDF=ON" cmake build: #: Build the software based in BUILD_DIR and BUILD_TYPE variables cmake --build $(BUILD_BASE_DIR)/$(BUILD_DIR) -j $(NUM_THREADS) @@ -125,11 +125,11 @@ minimal: #: Minimal build $(MAKE) build BUILD_DIR=release gpu: #: Build with GPU support - $(MAKE) cmake BUILD_DIR=release BUILD_TYPE=release EXTRA_CMAKE_FLAGS="${EXTRA_CMAKE_FLAGS} -DVELOX_ENABLE_GPU=ON" + $(MAKE) cmake BUILD_DIR=release BUILD_TYPE=release EXTRA_CMAKE_FLAGS="${EXTRA_CMAKE_FLAGS} -DVELOX_ENABLE_GPU=ON -DVELOX_ENABLE_CUDF=ON" $(MAKE) build BUILD_DIR=release gpu_debug: #: Build with debugging symbols and GPU support - $(MAKE) cmake BUILD_DIR=debug BUILD_TYPE=debug EXTRA_CMAKE_FLAGS="${EXTRA_CMAKE_FLAGS} -DVELOX_ENABLE_GPU=ON" + $(MAKE) cmake BUILD_DIR=debug BUILD_TYPE=debug EXTRA_CMAKE_FLAGS="${EXTRA_CMAKE_FLAGS} -DVELOX_ENABLE_GPU=ON -DVELOX_ENABLE_CUDF=ON" $(MAKE) build BUILD_DIR=debug dwio: #: Minimal build with dwio enabled. 
diff --git a/scripts/setup-centos9.sh b/scripts/setup-centos9.sh index 2ce9f869317..6c72f96c9ec 100755 --- a/scripts/setup-centos9.sh +++ b/scripts/setup-centos9.sh @@ -68,7 +68,7 @@ function install_build_prerequisites { dnf_install ninja-build cmake ccache gcc-toolset-12 git wget which dnf_install autoconf automake python3-devel pip libtool - pip install cmake==3.28.3 + pip install cmake==3.30.4 if [[ ${USE_CLANG} != "false" ]]; then install_clang15 diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index 1d9bca43fda..2d9cb2342a5 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -70,6 +70,9 @@ if(${VELOX_ENABLE_DUCKDB}) endif() if(${VELOX_ENABLE_GPU}) + if(${VELOX_ENABLE_CUDF}) + add_subdirectory(experimental/cudf) + endif() add_subdirectory(experimental/gpu) add_subdirectory(experimental/wave) add_subdirectory(external/jitify) diff --git a/velox/experimental/cudf/.clang-format b/velox/experimental/cudf/.clang-format new file mode 100644 index 00000000000..7b028e6ff68 --- /dev/null +++ b/velox/experimental/cudf/.clang-format @@ -0,0 +1,27 @@ +BasedOnStyle: InheritParentConfig +IncludeBlocks: Regroup +IncludeCategories: + - Regex: '^"velox/experimental/' # velox/experimental includes + Priority: 0 + - Regex: '^"' # quoted includes + Priority: 1 + - Regex: '^<(benchmarks|tests)/' # benchmark includes + Priority: 2 + - Regex: '^ + readability-identifier-naming, + modernize-use-nullptr, + modernize-use-using + +HeaderFilterRegex: '.*' + +WarningsAsErrors: '' + +CheckOptions: + # Naming conventions as explicitly stated in CODING_STYLE.md + - key: readability-identifier-naming.ClassCase + value: CamelCase + - key: readability-identifier-naming.StructCase + value: CamelCase + - key: readability-identifier-naming.EnumCase + value: CamelCase + - key: readability-identifier-naming.TypeAliasCase + value: CamelCase + - key: readability-identifier-naming.TypeTemplateParameterCase + value: CamelCase + - key: readability-identifier-naming.FunctionCase + 
value: camelBack + - key: readability-identifier-naming.VariableCase + value: camelBack + - key: readability-identifier-naming.ParameterCase + value: camelBack + - key: readability-identifier-naming.PrivateMemberCase + value: camelBack + - key: readability-identifier-naming.PrivateMemberSuffix + value: _ + - key: readability-identifier-naming.ProtectedMemberCase + value: camelBack + - key: readability-identifier-naming.ProtectedMemberSuffix + value: _ + - key: readability-identifier-naming.MacroDefinitionCase + value: UPPER_CASE + - key: readability-identifier-naming.NamespaceCase + value: lower_case + - key: readability-identifier-naming.StaticConstantPrefix + value: k + - key: readability-identifier-naming.EnumConstantCase + value: CamelCase + - key: readability-identifier-naming.EnumConstantPrefix + value: k + + # Use nullptr instead of NULL or 0 + - key: modernize-use-nullptr.NullMacros + value: 'NULL' + + # Prefer 'using' aliases over typedef + - key: modernize-use-using.IgnoreUsingStdAllocator + value: 1 \ No newline at end of file diff --git a/velox/experimental/cudf/CMakeLists.txt b/velox/experimental/cudf/CMakeLists.txt new file mode 100644 index 00000000000..6d400056c35 --- /dev/null +++ b/velox/experimental/cudf/CMakeLists.txt @@ -0,0 +1,19 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +add_subdirectory(exec) + +if(VELOX_BUILD_TESTING) + add_subdirectory(tests) +endif() diff --git a/velox/experimental/cudf/exec/CMakeLists.txt b/velox/experimental/cudf/exec/CMakeLists.txt new file mode 100644 index 00000000000..430ce71bca6 --- /dev/null +++ b/velox/experimental/cudf/exec/CMakeLists.txt @@ -0,0 +1,32 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_library( + velox_cudf_exec + CudfConversion.cpp + CudfOrderBy.cpp + ToCudf.cpp + Utilities.cpp + VeloxCudfInterop.cpp) + +target_link_libraries( + velox_cudf_exec + cudf::cudf + arrow + velox_arrow_bridge + velox_exception + velox_common_base + velox_exec) + +target_compile_options(velox_cudf_exec PRIVATE -Wno-missing-field-initializers) diff --git a/velox/experimental/cudf/exec/CudfConversion.cpp b/velox/experimental/cudf/exec/CudfConversion.cpp new file mode 100644 index 00000000000..b84dc98f183 --- /dev/null +++ b/velox/experimental/cudf/exec/CudfConversion.cpp @@ -0,0 +1,206 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/experimental/cudf/exec/CudfConversion.h" +#include "velox/experimental/cudf/exec/NvtxHelper.h" +#include "velox/experimental/cudf/exec/Utilities.h" +#include "velox/experimental/cudf/exec/VeloxCudfInterop.h" +#include "velox/experimental/cudf/vector/CudfVector.h" + +#include "velox/exec/Driver.h" +#include "velox/exec/Operator.h" +#include "velox/vector/ComplexVector.h" + +#include +#include +#include + +namespace facebook::velox::cudf_velox { + +namespace { +// Concatenate multiple RowVectors into a single RowVector. +// Copied from AggregationFuzzer.cpp. 
+RowVectorPtr mergeRowVectors( + const std::vector& results, + velox::memory::MemoryPool* pool) { + VELOX_NVTX_FUNC_RANGE(); + vector_size_t totalCount = 0; + for (const auto& result : results) { + totalCount += result->size(); + } + auto copy = + BaseVector::create(results[0]->type(), totalCount, pool); + auto copyCount = 0; + for (const auto& result : results) { + copy->copy(result.get(), copyCount, 0, result->size()); + copyCount += result->size(); + } + return copy; +} + +cudf::size_type preferredGpuBatchSizeRows( + const facebook::velox::core::QueryConfig& queryConfig) { + constexpr cudf::size_type kDefaultGpuBatchSizeRows = 100000; + const auto batchSize = queryConfig.get( + CudfFromVelox::kGpuBatchSizeRows, kDefaultGpuBatchSizeRows); + VELOX_CHECK_GT(batchSize, 0, "VELOX_CUDF_GPU_BATCH_SIZE_ROWS must be > 0"); + VELOX_CHECK_LE( + batchSize, + std::numeric_limits::max(), + "VELOX_CUDF_GPU_BATCH_SIZE_ROWS must be <= max(vector_size_t)"); + return batchSize; +} +} // namespace + +CudfFromVelox::CudfFromVelox( + int32_t operatorId, + RowTypePtr outputType, + exec::DriverCtx* driverCtx, + std::string planNodeId) + : exec::Operator( + driverCtx, + outputType, + operatorId, + planNodeId, + "CudfFromVelox"), + NvtxHelper(nvtx3::rgb{255, 140, 0}, operatorId) {} // Orange + +void CudfFromVelox::addInput(RowVectorPtr input) { + VELOX_NVTX_OPERATOR_FUNC_RANGE(); + if (input->size() > 0) { + // Materialize lazy vectors + for (auto& child : input->children()) { + child->loadedVector(); + } + input->loadedVector(); + + // Accumulate inputs + inputs_.push_back(input); + currentOutputSize_ += input->size(); + } +} + +RowVectorPtr CudfFromVelox::getOutput() { + VELOX_NVTX_OPERATOR_FUNC_RANGE(); + const auto targetOutputSize = + preferredGpuBatchSizeRows(operatorCtx_->driverCtx()->queryConfig()); + + finished_ = noMoreInput_ && inputs_.empty(); + + if (finished_ or + (currentOutputSize_ < targetOutputSize and not noMoreInput_) or + inputs_.empty()) { + return nullptr; + } + + 
// Select inputs within the max vector size limit (overflow-safe check) + std::vector selectedInputs; + vector_size_t totalSize = 0; + auto const maxVectorSize = std::numeric_limits::max(); + + for (const auto& input : inputs_) { + if (input->size() <= maxVectorSize - totalSize) { + selectedInputs.push_back(input); + totalSize += input->size(); + } else { + break; + } + } + + // Combine selected RowVectors into a single RowVector + auto input = mergeRowVectors(selectedInputs, inputs_[0]->pool()); + + // Remove processed inputs + inputs_.erase(inputs_.begin(), inputs_.begin() + selectedInputs.size()); + currentOutputSize_ -= totalSize; + + // Early return if no input + if (input->size() == 0) { + return nullptr; + } + + // Get a stream from the global stream pool + auto stream = cudfGlobalStreamPool().get_stream(); + + // Convert RowVector to cudf table + auto tbl = with_arrow::toCudfTable(input, input->pool(), stream); + + stream.synchronize(); + + VELOX_CHECK_NOT_NULL(tbl); + + // Return a CudfVector that owns the cudf table + const auto size = tbl->num_rows(); + return std::make_shared( + input->pool(), outputType_, size, std::move(tbl), stream); +} + +void CudfFromVelox::close() { + cudf::get_default_stream().synchronize(); + exec::Operator::close(); + inputs_.clear(); +} + +CudfToVelox::CudfToVelox( + int32_t operatorId, + RowTypePtr outputType, + exec::DriverCtx* driverCtx, + std::string planNodeId) + : exec::Operator( + driverCtx, + outputType, + operatorId, + planNodeId, + "CudfToVelox"), + NvtxHelper(nvtx3::rgb{148, 0, 211}, operatorId) {} // Purple + +void CudfToVelox::addInput(RowVectorPtr input) { + // Accumulate inputs + if (input->size() > 0) { + auto cudfInput = std::dynamic_pointer_cast(input); + VELOX_CHECK_NOT_NULL(cudfInput); + inputs_.push_back(std::move(cudfInput)); + } +} + +RowVectorPtr CudfToVelox::getOutput() { + VELOX_NVTX_OPERATOR_FUNC_RANGE(); + if (finished_ || inputs_.empty()) { + finished_ = noMoreInput_ && inputs_.empty(); + return nullptr; + } 
+ + auto stream = inputs_.front()->stream(); + std::unique_ptr tbl = inputs_.front()->release(); + inputs_.pop_front(); + + VELOX_CHECK_NOT_NULL(tbl); + if (tbl->num_rows() == 0) { + return nullptr; + } + RowVectorPtr output = + with_arrow::toVeloxColumn(tbl->view(), pool(), "", stream); + stream.synchronize(); + finished_ = noMoreInput_ && inputs_.empty(); + output->setType(outputType_); + return output; +} + +void CudfToVelox::close() { + exec::Operator::close(); + inputs_.clear(); +} + +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/CudfConversion.h b/velox/experimental/cudf/exec/CudfConversion.h new file mode 100644 index 00000000000..16ca33d786c --- /dev/null +++ b/velox/experimental/cudf/exec/CudfConversion.h @@ -0,0 +1,100 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "velox/experimental/cudf/exec/NvtxHelper.h" +#include "velox/experimental/cudf/vector/CudfVector.h" + +#include "velox/exec/Driver.h" +#include "velox/exec/Operator.h" +#include "velox/vector/ComplexVector.h" + +#include + +#include +#include +#include + +namespace facebook::velox::cudf_velox { + +class CudfFromVelox : public exec::Operator, public NvtxHelper { + public: + static constexpr const char* kGpuBatchSizeRows = + "velox.cudf.gpu_batch_size_rows"; + + CudfFromVelox( + int32_t operatorId, + RowTypePtr outputType, + exec::DriverCtx* driverCtx, + std::string planNodeId); + + bool needsInput() const override { + return !finished_; + } + + void addInput(RowVectorPtr input) override; + + RowVectorPtr getOutput() override; + + exec::BlockingReason isBlocked(ContinueFuture* /*future*/) override { + return exec::BlockingReason::kNotBlocked; + } + + bool isFinished() override { + return finished_; + } + + void close() override; + + private: + std::vector inputs_; + std::size_t currentOutputSize_ = 0; + bool finished_ = false; +}; + +class CudfToVelox : public exec::Operator, public NvtxHelper { + public: + CudfToVelox( + int32_t operatorId, + RowTypePtr outputType, + exec::DriverCtx* driverCtx, + std::string planNodeId); + + bool needsInput() const override { + return !finished_; + } + + void addInput(RowVectorPtr input) override; + + RowVectorPtr getOutput() override; + + exec::BlockingReason isBlocked(ContinueFuture* /*future*/) override { + return exec::BlockingReason::kNotBlocked; + } + + bool isFinished() override { + return finished_; + } + + void close() override; + + private: + std::deque inputs_; + bool finished_ = false; +}; + +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/CudfOrderBy.cpp b/velox/experimental/cudf/exec/CudfOrderBy.cpp new file mode 100644 index 00000000000..e1f2a011147 --- /dev/null +++ b/velox/experimental/cudf/exec/CudfOrderBy.cpp @@ -0,0 +1,113 @@ +/* + * Copyright 
(c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/experimental/cudf/exec/CudfOrderBy.h" +#include "velox/experimental/cudf/exec/NvtxHelper.h" +#include "velox/experimental/cudf/exec/Utilities.h" +#include "velox/experimental/cudf/exec/VeloxCudfInterop.h" + +#include +#include +#include +#include + +namespace facebook::velox::cudf_velox { + +CudfOrderBy::CudfOrderBy( + int32_t operatorId, + exec::DriverCtx* driverCtx, + const std::shared_ptr& orderByNode) + : exec::Operator( + driverCtx, + orderByNode->outputType(), + operatorId, + orderByNode->id(), + "CudfOrderBy"), + NvtxHelper(nvtx3::rgb{64, 224, 208}, operatorId), // Turquoise + orderByNode_(orderByNode) { + sortKeys_.reserve(orderByNode->sortingKeys().size()); + columnOrder_.reserve(orderByNode->sortingKeys().size()); + nullOrder_.reserve(orderByNode->sortingKeys().size()); + for (int i = 0; i < orderByNode->sortingKeys().size(); ++i) { + const auto channel = + exec::exprToChannel(orderByNode->sortingKeys()[i].get(), outputType_); + VELOX_CHECK( + channel != kConstantChannel, + "OrderBy doesn't allow constant sorting keys"); + sortKeys_.push_back(channel); + auto const& sortingOrder = orderByNode->sortingOrders()[i]; + columnOrder_.push_back( + sortingOrder.isAscending() ? cudf::order::ASCENDING + : cudf::order::DESCENDING); + nullOrder_.push_back( + (sortingOrder.isNullsFirst() ^ !sortingOrder.isAscending()) + ? 
cudf::null_order::BEFORE + : cudf::null_order::AFTER); + } +} + +void CudfOrderBy::addInput(RowVectorPtr input) { + // Accumulate inputs + if (input->size() > 0) { + auto cudfInput = std::dynamic_pointer_cast(input); + VELOX_CHECK_NOT_NULL(cudfInput); + inputs_.push_back(std::move(cudfInput)); + } +} + +void CudfOrderBy::noMoreInput() { + exec::Operator::noMoreInput(); + + VELOX_NVTX_OPERATOR_FUNC_RANGE(); + + if (inputs_.empty()) { + return; + } + + auto stream = cudfGlobalStreamPool().get_stream(); + auto tbl = getConcatenatedTable(inputs_, stream); + + // Release input data after synchronizing + stream.synchronize(); + inputs_.clear(); + + VELOX_CHECK_NOT_NULL(tbl); + + auto keys = tbl->view().select(sortKeys_); + auto values = tbl->view(); + auto result = + cudf::sort_by_key(values, keys, columnOrder_, nullOrder_, stream); + auto const size = result->num_rows(); + outputTable_ = std::make_shared( + pool(), outputType_, size, std::move(result), stream); +} + +RowVectorPtr CudfOrderBy::getOutput() { + if (finished_ || !noMoreInput_) { + return nullptr; + } + finished_ = noMoreInput_; + return outputTable_; +} + +void CudfOrderBy::close() { + exec::Operator::close(); + // Release stored inputs + // Release cudf memory resources + inputs_.clear(); + outputTable_.reset(); +} +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/CudfOrderBy.h b/velox/experimental/cudf/exec/CudfOrderBy.h new file mode 100644 index 00000000000..75e56315746 --- /dev/null +++ b/velox/experimental/cudf/exec/CudfOrderBy.h @@ -0,0 +1,66 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/experimental/cudf/exec/NvtxHelper.h" +#include "velox/experimental/cudf/vector/CudfVector.h" + +#include "velox/exec/Operator.h" +#include "velox/vector/ComplexVector.h" + +#include + +namespace facebook::velox::cudf_velox { + +class CudfOrderBy : public exec::Operator, public NvtxHelper { + public: + CudfOrderBy( + int32_t operatorId, + exec::DriverCtx* driverCtx, + const std::shared_ptr& orderByNode); + + bool needsInput() const override { + return !finished_; + } + + void addInput(RowVectorPtr input) override; + + void noMoreInput() override; + + RowVectorPtr getOutput() override; + + exec::BlockingReason isBlocked(ContinueFuture* /*future*/) override { + return exec::BlockingReason::kNotBlocked; + } + + bool isFinished() override { + return finished_; + } + + void close() override; + + private: + CudfVectorPtr outputTable_; + std::shared_ptr orderByNode_; + std::vector inputs_; + std::vector sortKeys_; + std::vector columnOrder_; + std::vector nullOrder_; + bool finished_{false}; +}; + +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/NvtxHelper.h b/velox/experimental/cudf/exec/NvtxHelper.h new file mode 100644 index 00000000000..dde348e4744 --- /dev/null +++ b/velox/experimental/cudf/exec/NvtxHelper.h @@ -0,0 +1,67 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +namespace facebook::velox::cudf_velox { + +class NvtxHelper { + public: + NvtxHelper() = default; // Gray, no payload + NvtxHelper(nvtx3::color color, std::optional payload = std::nullopt) + : color_(color), payload_(payload) {} + + nvtx3::color color_{nvtx3::rgb{125, 125, 125}}; // Gray + std::optional payload_{}; +}; + +/** + * @brief Tag type for Velox's NVTX domain. + */ +struct VeloxDomain { + static constexpr char const* name{"velox"}; +}; + +using NvtxRegisteredStringT = nvtx3::registered_string_in; + +#define VELOX_NVTX_OPERATOR_FUNC_RANGE() \ + static_assert( \ + std::is_base_of::type>:: \ + value, \ + "VELOX_NVTX_OPERATOR_FUNC_RANGE can only be used" \ + " in Operators derived from NvtxHelper"); \ + static NvtxRegisteredStringT const nvtx3_func_name__{ \ + std::string(__func__) + " " + std::string(__PRETTY_FUNCTION__)}; \ + ::nvtx3::event_attributes const nvtx3_func_attr__{ /* per-call: uses this */ \ + this->payload_.has_value() ? 
\ + ::nvtx3::event_attributes{nvtx3_func_name__, this->color_, \ + nvtx3::payload{this->payload_.value()}} : \ + ::nvtx3::event_attributes{nvtx3_func_name__, this->color_}}; \ + ::nvtx3::scoped_range_in const nvtx3_range__{nvtx3_func_attr__}; + +#define VELOX_NVTX_PRETTY_FUNC_RANGE() \ + static NvtxRegisteredStringT const nvtx3_func_name__{ \ + std::string(__func__) + " " + std::string(__PRETTY_FUNCTION__)}; \ + static ::nvtx3::event_attributes const nvtx3_func_attr__{nvtx3_func_name__}; \ + ::nvtx3::scoped_range_in const nvtx3_range__{nvtx3_func_attr__}; + +#define VELOX_NVTX_FUNC_RANGE() NVTX3_FUNC_RANGE_IN(VeloxDomain) + +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/ToCudf.cpp b/velox/experimental/cudf/exec/ToCudf.cpp new file mode 100644 index 00000000000..55fb8a27849 --- /dev/null +++ b/velox/experimental/cudf/exec/ToCudf.cpp @@ -0,0 +1,222 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/experimental/cudf/exec/CudfConversion.h" +#include "velox/experimental/cudf/exec/CudfOrderBy.h" +#include "velox/experimental/cudf/exec/ToCudf.h" +#include "velox/experimental/cudf/exec/Utilities.h" + +#include "velox/exec/Driver.h" +#include "velox/exec/FilterProject.h" +#include "velox/exec/HashAggregation.h" +#include "velox/exec/HashBuild.h" +#include "velox/exec/HashProbe.h" +#include "velox/exec/Operator.h" +#include "velox/exec/OrderBy.h" + +#include + +#include + +#include + +DEFINE_bool(velox_cudf_enabled, true, "Enable cuDF-Velox acceleration"); +DEFINE_string(velox_cudf_memory_resource, "async", "Memory resource for cuDF"); +DEFINE_bool(velox_cudf_debug, false, "Enable debug printing"); + +namespace facebook::velox::cudf_velox { + +namespace { + +template +bool isAnyOf(const Base* p) { + return ((dynamic_cast(p) != nullptr) || ...); +} + +} // namespace + +bool CompileState::compile() { + auto operators = driver_.operators(); + + if (FLAGS_velox_cudf_debug) { + std::cout << "Operators before adapting for cuDF: count [" + << operators.size() << "]" << std::endl; + for (auto& op : operators) { + std::cout << " Operator: ID " << op->operatorId() << ": " + << op->toString() << std::endl; + } + } + + // Make sure operator states are initialized. We will need to inspect some of + // them during the transformation. + driver_.initializeOperators(); + + bool replacementsMade = false; + auto ctx = driver_.driverCtx(); + + // Get plan node by id lookup. 
+ auto getPlanNode = [&](const core::PlanNodeId& id) { + auto& nodes = driverFactory_.planNodes; + auto it = + std::find_if(nodes.cbegin(), nodes.cend(), [&id](const auto& node) { + return node->id() == id; + }); + if (it != nodes.end()) { + return *it; + } + VELOX_CHECK(driverFactory_.consumerNode->id() == id); + return driverFactory_.consumerNode; + }; + + auto isSupportedGpuOperator = [](const exec::Operator* op) { + return isAnyOf(op); + }; + + std::vector isSupportedGpuOperators(operators.size()); + std::transform( + operators.begin(), + operators.end(), + isSupportedGpuOperators.begin(), + isSupportedGpuOperator); + + auto acceptsGpuInput = [](const exec::Operator* op) { + return isAnyOf(op); + }; + + auto producesGpuOutput = [](const exec::Operator* op) { + return isAnyOf(op); + }; + + int32_t operatorsOffset = 0; + for (int32_t operatorIndex = 0; operatorIndex < operators.size(); + ++operatorIndex) { + std::vector> replaceOp; + + exec::Operator* oper = operators[operatorIndex]; + auto replacingOperatorIndex = operatorIndex + operatorsOffset; + VELOX_CHECK(oper); + + const bool previousOperatorIsNotGpu = + (operatorIndex > 0 and !isSupportedGpuOperators[operatorIndex - 1]); + const bool nextOperatorIsNotGpu = + (operatorIndex < operators.size() - 1 and + !isSupportedGpuOperators[operatorIndex + 1]); + const bool isLastOperatorOfTask = + driverFactory_.outputDriver and operatorIndex == operators.size() - 1; + + auto id = oper->operatorId(); + if (previousOperatorIsNotGpu and acceptsGpuInput(oper)) { + auto planNode = getPlanNode(oper->planNodeId()); + replaceOp.push_back(std::make_unique( + id, planNode->outputType(), ctx, planNode->id() + "-from-velox")); + replaceOp.back()->initialize(); + } + + if (auto* orderByOp = dynamic_cast(oper)) { + auto id = orderByOp->operatorId(); + auto planNode = std::dynamic_pointer_cast( + getPlanNode(orderByOp->planNodeId())); + VELOX_CHECK(planNode != nullptr); + replaceOp.push_back(std::make_unique(id, ctx, planNode)); + 
replaceOp.back()->initialize(); + } + + if (producesGpuOutput(oper) and + (nextOperatorIsNotGpu or isLastOperatorOfTask)) { + auto planNode = getPlanNode(oper->planNodeId()); + replaceOp.push_back(std::make_unique( + id, planNode->outputType(), ctx, planNode->id() + "-to-velox")); + replaceOp.back()->initialize(); + } + + if (not replaceOp.empty()) { + operatorsOffset += replaceOp.size() - 1; + [[maybe_unused]] auto replaced = driverFactory_.replaceOperators( + driver_, + replacingOperatorIndex, + replacingOperatorIndex + 1, + std::move(replaceOp)); + replacementsMade = true; + } + } + + if (FLAGS_velox_cudf_debug) { + operators = driver_.operators(); + std::cout << "Operators after adapting for cuDF: count [" + << operators.size() << "]" << std::endl; + for (auto& op : operators) { + std::cout << " Operator: ID " << op->operatorId() << ": " + << op->toString() << std::endl; + } + } + + return replacementsMade; +} + +struct CudfDriverAdapter { + std::shared_ptr mr_; + + CudfDriverAdapter(std::shared_ptr mr) + : mr_(mr) {} + + // Call operator needed by DriverAdapter + bool operator()(const exec::DriverFactory& factory, exec::Driver& driver) { + auto state = CompileState(factory, driver); + auto res = state.compile(); + return res; + } +}; + +static bool isCudfRegistered = false; + +void registerCudf(const CudfOptions& options) { + if (cudfIsRegistered()) { + return; + } + if (!options.cudfEnabled) { + return; + } + + CUDF_FUNC_RANGE(); + cudaFree(nullptr); // Initialize CUDA context at startup + + const std::string mrMode = options.cudfMemoryResource; + auto mr = cudf_velox::createMemoryResource(mrMode); + cudf::set_current_device_resource(mr.get()); + CudfDriverAdapter cda{mr}; + exec::DriverAdapter cudfAdapter{kCudfAdapterName, {}, cda}; + exec::DriverFactory::registerAdapter(cudfAdapter); + isCudfRegistered = true; +} + +void unregisterCudf() { + exec::DriverFactory::adapters.erase( + std::remove_if( + exec::DriverFactory::adapters.begin(), + 
exec::DriverFactory::adapters.end(), + [](const exec::DriverAdapter& adapter) { + return adapter.label == kCudfAdapterName; + }), + exec::DriverFactory::adapters.end()); + + isCudfRegistered = false; +} + +bool cudfIsRegistered() { + return isCudfRegistered; +} + +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/ToCudf.h b/velox/experimental/cudf/exec/ToCudf.h new file mode 100644 index 00000000000..63fcf0d5dd7 --- /dev/null +++ b/velox/experimental/cudf/exec/ToCudf.h @@ -0,0 +1,64 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/exec/Driver.h" +#include "velox/exec/Operator.h" + +#include + +DECLARE_bool(velox_cudf_enabled); +DECLARE_string(velox_cudf_memory_resource); +DECLARE_bool(velox_cudf_debug); + +namespace facebook::velox::cudf_velox { + +static const std::string kCudfAdapterName = "cuDF"; + +class CompileState { + public: + CompileState(const exec::DriverFactory& driverFactory, exec::Driver& driver) + : driverFactory_(driverFactory), driver_(driver) {} + + exec::Driver& driver() { + return driver_; + } + + // Replaces sequences of Operators in the Driver given at construction with + // cuDF equivalents. Returns true if the Driver was changed. 
+ bool compile(); + + const exec::DriverFactory& driverFactory_; + exec::Driver& driver_; +}; + +struct CudfOptions { + bool cudfEnabled = FLAGS_velox_cudf_enabled; + std::string cudfMemoryResource = FLAGS_velox_cudf_memory_resource; + static CudfOptions defaultOptions() { + return CudfOptions(); + } +}; + +/// Registers adapter to add cuDF operators to Drivers. +void registerCudf(const CudfOptions& options = CudfOptions::defaultOptions()); +void unregisterCudf(); + +/// Returns true if cuDF is registered. +bool cudfIsRegistered(); + +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/Utilities.cpp b/velox/experimental/cudf/exec/Utilities.cpp new file mode 100644 index 00000000000..f861724ee63 --- /dev/null +++ b/velox/experimental/cudf/exec/Utilities.cpp @@ -0,0 +1,143 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/experimental/cudf/exec/Utilities.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +namespace facebook::velox::cudf_velox { + +namespace { +[[nodiscard]] auto makeCudaMr() { + return std::make_shared(); +} + +[[nodiscard]] auto makePoolMr() { + return rmm::mr::make_owning_wrapper( + makeCudaMr(), rmm::percent_of_free_device_memory(50)); +} + +[[nodiscard]] auto makeAsyncMr() { + return std::make_shared(); +} + +[[nodiscard]] auto makeManagedMr() { + return std::make_shared(); +} + +[[nodiscard]] auto makeArenaMr() { + return rmm::mr::make_owning_wrapper( + makeCudaMr()); +} + +[[nodiscard]] auto makeManagedPoolMr() { + return rmm::mr::make_owning_wrapper( + makeManagedMr(), rmm::percent_of_free_device_memory(50)); +} +} // namespace + +std::shared_ptr createMemoryResource( + std::string_view mode) { + if (mode == "cuda") + return makeCudaMr(); + if (mode == "pool") + return makePoolMr(); + if (mode == "async") + return makeAsyncMr(); + if (mode == "arena") + return makeArenaMr(); + if (mode == "managed") + return makeManagedMr(); + if (mode == "managed_pool") + return makeManagedPoolMr(); + VELOX_FAIL( + "Unknown memory resource mode: " + std::string(mode) + + "\nExpecting: cuda, pool, async, arena, managed, or managed_pool"); +} + +cudf::detail::cuda_stream_pool& cudfGlobalStreamPool() { + return cudf::detail::global_cuda_stream_pool(); +}; + +std::unique_ptr concatenateTables( + std::vector> tables, + rmm::cuda_stream_view stream) { + // Check for empty vector + VELOX_CHECK_GT(tables.size(), 0); + + if (tables.size() == 1) { + return std::move(tables[0]); + } + std::vector tableViews; + tableViews.reserve(tables.size()); + std::transform( + tables.begin(), + tables.end(), + std::back_inserter(tableViews), + [&](const auto& tbl) { return tbl->view(); }); + return cudf::concatenate( + tableViews, stream, 
cudf::get_current_device_resource_ref()); +} + +std::unique_ptr getConcatenatedTable( + std::vector& tables, + rmm::cuda_stream_view stream) { + // Check for empty vector + VELOX_CHECK_GT(tables.size(), 0); + + auto inputStreams = std::vector(); + auto tableViews = std::vector(); + + inputStreams.reserve(tables.size()); + tableViews.reserve(tables.size()); + + for (const auto& table : tables) { + VELOX_CHECK_NOT_NULL(table); + tableViews.push_back(table->getTableView()); + inputStreams.push_back(table->stream()); + } + + cudf::detail::join_streams(inputStreams, stream); + + if (tables.size() == 1) { + return tables[0]->release(); + } + + auto output = cudf::concatenate( + tableViews, stream, cudf::get_current_device_resource_ref()); + stream.synchronize(); + return output; +} + +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/Utilities.h b/velox/experimental/cudf/exec/Utilities.h new file mode 100644 index 00000000000..31f529bb34a --- /dev/null +++ b/velox/experimental/cudf/exec/Utilities.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/experimental/cudf/vector/CudfVector.h" + +#include +#include + +#include + +#include +#include + +namespace facebook::velox::cudf_velox { + +/** + * @brief Creates a memory resource based on the given mode. 
+ */ +[[nodiscard]] std::shared_ptr +createMemoryResource(std::string_view mode); + +/** + * @brief Returns the global CUDA stream pool used by cudf. + */ +[[nodiscard]] cudf::detail::cuda_stream_pool& cudfGlobalStreamPool(); + +// Concatenate a vector of cuDF tables into a single table +[[nodiscard]] std::unique_ptr concatenateTables( + std::vector> tables, + rmm::cuda_stream_view stream); + +// Concatenate a vector of cuDF tables into a single table. +// This function joins the streams owned by individual tables on the passed +// stream. Inputs are not safe to use after calling this function. +[[nodiscard]] std::unique_ptr getConcatenatedTable( + std::vector& tables, + rmm::cuda_stream_view stream); + +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/VeloxCudfInterop.cpp b/velox/experimental/cudf/exec/VeloxCudfInterop.cpp new file mode 100644 index 00000000000..7a2ad969397 --- /dev/null +++ b/velox/experimental/cudf/exec/VeloxCudfInterop.cpp @@ -0,0 +1,145 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/experimental/cudf/exec/NvtxHelper.h" +#include "velox/experimental/cudf/exec/Utilities.h" +#include "velox/experimental/cudf/exec/VeloxCudfInterop.h" + +#include "velox/common/memory/Memory.h" +#include "velox/type/Type.h" +#include "velox/vector/BaseVector.h" +#include "velox/vector/ComplexVector.h" +#include "velox/vector/DictionaryVector.h" +#include "velox/vector/FlatVector.h" +#include "velox/vector/arrow/Bridge.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include +#include + +namespace facebook::velox::cudf_velox { + +namespace with_arrow { + +std::unique_ptr toCudfTable( + const facebook::velox::RowVectorPtr& veloxTable, + facebook::velox::memory::MemoryPool* pool, + rmm::cuda_stream_view stream) { + // Need to flattenDictionary and flattenConstant, otherwise we observe issues + // in the null mask. + ArrowOptions arrowOptions{true, true}; + ArrowArray arrowArray; + exportToArrow( + std::dynamic_pointer_cast(veloxTable), + arrowArray, + pool, + arrowOptions); + ArrowSchema arrowSchema; + exportToArrow( + std::dynamic_pointer_cast(veloxTable), + arrowSchema, + arrowOptions); + auto tbl = cudf::from_arrow(&arrowSchema, &arrowArray, stream); + + // Release Arrow resources + if (arrowArray.release) { + arrowArray.release(&arrowArray); + } + if (arrowSchema.release) { + arrowSchema.release(&arrowSchema); + } + return tbl; +} + +namespace { + +RowVectorPtr toVeloxColumn( + const cudf::table_view& table, + memory::MemoryPool* pool, + const std::vector& metadata, + rmm::cuda_stream_view stream) { + auto arrowDeviceArray = cudf::to_arrow_host(table, stream); + auto& arrowArray = arrowDeviceArray->array; + + auto arrowSchema = cudf::to_arrow_schema(table, metadata); + auto veloxTable = importFromArrowAsOwner(*arrowSchema, arrowArray, pool); + // BaseVector to RowVector + auto castedPtr = + 
std::dynamic_pointer_cast(veloxTable); + VELOX_CHECK_NOT_NULL(castedPtr); + return castedPtr; +} + +template +std::vector +getMetadata(Iterator begin, Iterator end, const std::string& namePrefix) { + std::vector metadata; + int i = 0; + for (auto c = begin; c < end; c++) { + metadata.push_back(cudf::column_metadata(namePrefix + std::to_string(i))); + metadata.back().children_meta = getMetadata( + c->child_begin(), c->child_end(), namePrefix + std::to_string(i)); + i++; + } + return metadata; +} + +} // namespace + +facebook::velox::RowVectorPtr toVeloxColumn( + const cudf::table_view& table, + facebook::velox::memory::MemoryPool* pool, + std::string namePrefix, + rmm::cuda_stream_view stream) { + auto metadata = getMetadata(table.begin(), table.end(), namePrefix); + return toVeloxColumn(table, pool, metadata, stream); +} + +RowVectorPtr toVeloxColumn( + const cudf::table_view& table, + memory::MemoryPool* pool, + const std::vector& columnNames, + rmm::cuda_stream_view stream) { + std::vector metadata; + for (auto name : columnNames) { + metadata.emplace_back(cudf::column_metadata(name)); + } + return toVeloxColumn(table, pool, metadata, stream); +} + +} // namespace with_arrow +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/VeloxCudfInterop.h b/velox/experimental/cudf/exec/VeloxCudfInterop.h new file mode 100644 index 00000000000..529045245f7 --- /dev/null +++ b/velox/experimental/cudf/exec/VeloxCudfInterop.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/memory/Memory.h" +#include "velox/vector/BaseVector.h" +#include "velox/vector/ComplexVector.h" + +#include +#include +#include + +namespace facebook::velox::cudf_velox::with_arrow { +std::unique_ptr toCudfTable( + const facebook::velox::RowVectorPtr& veloxTable, + facebook::velox::memory::MemoryPool* pool, + rmm::cuda_stream_view stream); + +facebook::velox::RowVectorPtr toVeloxColumn( + const cudf::table_view& table, + facebook::velox::memory::MemoryPool* pool, + std::string namePrefix, + rmm::cuda_stream_view stream); + +facebook::velox::RowVectorPtr toVeloxColumn( + const cudf::table_view& table, + facebook::velox::memory::MemoryPool* pool, + const std::vector& columnNames, + rmm::cuda_stream_view stream); + +} // namespace facebook::velox::cudf_velox::with_arrow diff --git a/velox/experimental/cudf/tests/CMakeLists.txt b/velox/experimental/cudf/tests/CMakeLists.txt new file mode 100644 index 00000000000..1847f00cd6d --- /dev/null +++ b/velox/experimental/cudf/tests/CMakeLists.txt @@ -0,0 +1,33 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(velox_cudf_order_by_test Main.cpp OrderByTest.cpp) + +add_test( + NAME velox_cudf_order_by_test + COMMAND velox_cudf_order_by_test + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) + +set_tests_properties(velox_cudf_order_by_test PROPERTIES LABELS cuda_driver + TIMEOUT 3000) + +target_link_libraries( + velox_cudf_order_by_test + velox_cudf_exec + velox_exec + velox_exec_test_lib + velox_test_util + gtest + gtest_main + fmt::fmt) diff --git a/velox/experimental/cudf/tests/Main.cpp b/velox/experimental/cudf/tests/Main.cpp new file mode 100644 index 00000000000..164b6422fe8 --- /dev/null +++ b/velox/experimental/cudf/tests/Main.cpp @@ -0,0 +1,29 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/common/process/ThreadDebugInfo.h" + +#include +#include +#include + +// This main is needed for some tests on linux. 
+int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + // Signal handler required for ThreadDebugInfoTest + facebook::velox::process::addDefaultFatalSignalHandler(); + folly::Init init(&argc, &argv, false); + return RUN_ALL_TESTS(); +} diff --git a/velox/experimental/cudf/tests/OrderByTest.cpp b/velox/experimental/cudf/tests/OrderByTest.cpp new file mode 100644 index 00000000000..d3eb5a75867 --- /dev/null +++ b/velox/experimental/cudf/tests/OrderByTest.cpp @@ -0,0 +1,322 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "velox/experimental/cudf/exec/ToCudf.h" +#include "velox/experimental/cudf/exec/Utilities.h" + +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/core/QueryConfig.h" +#include "velox/dwio/common/tests/utils/BatchMaker.h" +#include "velox/exec/PlanNodeStats.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/OperatorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +#include +#include + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; +using namespace facebook::velox::common::testutil; + +namespace { + +class OrderByTest : public OperatorTestBase { + protected: + void SetUp() override { + OperatorTestBase::SetUp(); + filesystems::registerLocalFileSystem(); + cudf_velox::registerCudf(); + rng_.seed(123); + + rowType_ = ROW( + {{"c0", INTEGER()}, + {"c1", INTEGER()}, + {"c2", VARCHAR()}, + {"c3", VARCHAR()}}); + } + + void TearDown() override { + cudf_velox::unregisterCudf(); + OperatorTestBase::TearDown(); + } + + void testSingleKey( + const std::vector& input, + const std::string& key) { + core::PlanNodeId orderById; + auto keyIndex = input[0]->type()->asRow().getChildIdx(key); + auto plan = PlanBuilder() + .values(input) + .orderBy({fmt::format("{} ASC NULLS LAST", key)}, false) + .capturePlanNodeId(orderById) + .planNode(); + runTest( + plan, + orderById, + fmt::format("SELECT * FROM tmp ORDER BY {} NULLS LAST", key), + {keyIndex}); + + plan = PlanBuilder() + .values(input) + .orderBy({fmt::format("{} DESC NULLS FIRST", key)}, false) + .planNode(); + runTest( + plan, + orderById, + fmt::format("SELECT * FROM tmp ORDER BY {} DESC NULLS FIRST", key), + {keyIndex}); + } + + void testSingleKey( + const std::vector& input, + const std::string& key, + const std::string& filter) { + core::PlanNodeId orderById; + auto keyIndex = input[0]->type()->asRow().getChildIdx(key); + auto plan = PlanBuilder() + .values(input) + 
.filter(filter) + .orderBy({fmt::format("{} ASC NULLS LAST", key)}, false) + .capturePlanNodeId(orderById) + .planNode(); + runTest( + plan, + orderById, + fmt::format( + "SELECT * FROM tmp WHERE {} ORDER BY {} NULLS LAST", filter, key), + {keyIndex}); + + plan = PlanBuilder() + .values(input) + .filter(filter) + .orderBy({fmt::format("{} DESC NULLS FIRST", key)}, false) + .capturePlanNodeId(orderById) + .planNode(); + runTest( + plan, + orderById, + fmt::format( + "SELECT * FROM tmp WHERE {} ORDER BY {} DESC NULLS FIRST", + filter, + key), + {keyIndex}); + } + + void testTwoKeys( + const std::vector& input, + const std::string& key1, + const std::string& key2) { + auto& rowType = input[0]->type()->asRow(); + auto keyIndices = {rowType.getChildIdx(key1), rowType.getChildIdx(key2)}; + + std::vector sortOrders = { + core::kAscNullsLast, core::kDescNullsFirst}; + std::vector sortOrderSqls = {"NULLS LAST", "DESC NULLS FIRST"}; + + for (int i = 0; i < sortOrders.size(); i++) { + for (int j = 0; j < sortOrders.size(); j++) { + core::PlanNodeId orderById; + auto plan = PlanBuilder() + .values(input) + .orderBy( + {fmt::format("{} {}", key1, sortOrderSqls[i]), + fmt::format("{} {}", key2, sortOrderSqls[j])}, + false) + .capturePlanNodeId(orderById) + .planNode(); + runTest( + plan, + orderById, + fmt::format( + "SELECT * FROM tmp ORDER BY {} {}, {} {}", + key1, + sortOrderSqls[i], + key2, + sortOrderSqls[j]), + keyIndices); + } + } + } + + void runTest( + core::PlanNodePtr planNode, + const core::PlanNodeId& orderById, + const std::string& duckDbSql, + const std::vector& sortingKeys) { + { + SCOPED_TRACE("run without spilling"); + assertQueryOrdered(planNode, duckDbSql, sortingKeys); + } + } + + std::vector makeVectors( + const RowTypePtr& rowType, + int32_t numVectors, + int32_t rowsPerVector) { + std::vector vectors; + for (int32_t i = 0; i < numVectors; ++i) { + auto vector = std::dynamic_pointer_cast( + facebook::velox::test::BatchMaker::createBatch( + rowType, 
rowsPerVector, *pool_)); + vectors.push_back(vector); + } + return vectors; + } + + folly::Random::DefaultGenerator rng_; + RowTypePtr rowType_; +}; + +TEST_F(OrderByTest, selectiveFilter) { + vector_size_t batchSize = 1000; + std::vector vectors; + for (int32_t i = 0; i < 3; ++i) { + auto c0 = makeFlatVector( + batchSize, + [&](vector_size_t row) { return batchSize * i + row; }, + nullEvery(5)); + auto c1 = makeFlatVector( + batchSize, [&](vector_size_t row) { return row; }, nullEvery(5)); + auto c2 = makeFlatVector( + batchSize, [](vector_size_t row) { return row * 0.1; }, nullEvery(11)); + vectors.push_back(makeRowVector({c0, c1, c2})); + } + createDuckDbTable(vectors); + + // c0 values are unique across batches + testSingleKey(vectors, "c0", "c0 % 333 = 0"); + + // c1 values are unique only within a batch + testSingleKey(vectors, "c1", "c1 % 333 = 0"); +} + +TEST_F(OrderByTest, singleKey) { + vector_size_t batchSize = 1000; + std::vector vectors; + for (int32_t i = 0; i < 2; ++i) { + auto c0 = makeFlatVector( + batchSize, [&](vector_size_t row) { return row; }, nullEvery(5)); + auto c1 = makeFlatVector( + batchSize, [](vector_size_t row) { return row * 0.1; }, nullEvery(11)); + vectors.push_back(makeRowVector({c0, c1})); + } + createDuckDbTable(vectors); + + testSingleKey(vectors, "c0"); + + // parser doesn't support "is not null" expression, hence, using c0 % 2 >= 0 + testSingleKey(vectors, "c0", "c0 % 2 >= 0"); + + core::PlanNodeId orderById; + auto plan = PlanBuilder() + .values(vectors) + .orderBy({"c0 DESC NULLS LAST"}, false) + .capturePlanNodeId(orderById) + .planNode(); + runTest( + plan, orderById, "SELECT * FROM tmp ORDER BY c0 DESC NULLS LAST", {0}); + + plan = PlanBuilder() + .values(vectors) + .orderBy({"c0 ASC NULLS FIRST"}, false) + .capturePlanNodeId(orderById) + .planNode(); + runTest(plan, orderById, "SELECT * FROM tmp ORDER BY c0 NULLS FIRST", {0}); +} + +TEST_F(OrderByTest, multipleKeys) { + vector_size_t batchSize = 1000; + std::vector 
vectors; + for (int32_t i = 0; i < 2; ++i) { + // c0: half of rows are null, a quarter is 0 and remaining quarter is 1 + auto c0 = makeFlatVector( + batchSize, [](vector_size_t row) { return row % 4; }, nullEvery(2, 1)); + auto c1 = makeFlatVector( + batchSize, [](vector_size_t row) { return row; }, nullEvery(7)); + auto c2 = makeFlatVector( + batchSize, [](vector_size_t row) { return row * 0.1; }, nullEvery(11)); + vectors.push_back(makeRowVector({c0, c1, c2})); + } + createDuckDbTable(vectors); + + testTwoKeys(vectors, "c0", "c1"); + + core::PlanNodeId orderById; + auto plan = PlanBuilder() + .values(vectors) + .orderBy({"c0 ASC NULLS FIRST", "c1 ASC NULLS LAST"}, false) + .capturePlanNodeId(orderById) + .planNode(); + runTest( + plan, + orderById, + "SELECT * FROM tmp ORDER BY c0 NULLS FIRST, c1 NULLS LAST", + {0, 1}); + + plan = PlanBuilder() + .values(vectors) + .orderBy({"c0 DESC NULLS LAST", "c1 DESC NULLS FIRST"}, false) + .capturePlanNodeId(orderById) + .planNode(); + runTest( + plan, + orderById, + "SELECT * FROM tmp ORDER BY c0 DESC NULLS LAST, c1 DESC NULLS FIRST", + {0, 1}); +} + +TEST_F(OrderByTest, multiBatchResult) { + vector_size_t batchSize = 5000; + std::vector vectors; + for (int32_t i = 0; i < 10; ++i) { + auto c0 = makeFlatVector( + batchSize, + [&](vector_size_t row) { return batchSize * i + row; }, + nullEvery(5)); + auto c1 = makeFlatVector( + batchSize, [](vector_size_t row) { return row * 0.1; }, nullEvery(11)); + vectors.push_back(makeRowVector({c0, c1, c1, c1, c1, c1})); + } + createDuckDbTable(vectors); + + testSingleKey(vectors, "c0"); +} + +TEST_F(OrderByTest, varfields) { + vector_size_t batchSize = 1000; + std::vector vectors; + for (int32_t i = 0; i < 5; ++i) { + auto c0 = makeFlatVector( + batchSize, + [&](vector_size_t row) { return batchSize * i + row; }, + nullEvery(5)); + auto c1 = makeFlatVector( + batchSize, [](vector_size_t row) { return row * 0.1; }, nullEvery(11)); + auto c2 = makeFlatVector( + batchSize, + 
[](vector_size_t row) { + return StringView::makeInline(std::to_string(row)); + }, + nullEvery(17)); + vectors.push_back(makeRowVector({c0, c1, c2})); + } + createDuckDbTable(vectors); + + testSingleKey(vectors, "c2"); +} + +} // namespace diff --git a/velox/experimental/cudf/vector/CudfVector.h b/velox/experimental/cudf/vector/CudfVector.h new file mode 100644 index 00000000000..db1590b3c08 --- /dev/null +++ b/velox/experimental/cudf/vector/CudfVector.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/buffer/Buffer.h" +#include "velox/common/memory/MemoryPool.h" +#include "velox/vector/ComplexVector.h" +#include "velox/vector/TypeAliases.h" + +#include +#include + +#include +#include + +namespace facebook::velox::cudf_velox { + +// Vector class which holds GPU data from cuDF. 
+class CudfVector : public RowVector { + public: + CudfVector( + velox::memory::MemoryPool* pool, + TypePtr type, + vector_size_t size, + std::unique_ptr&& table, + rmm::cuda_stream_view stream) + : RowVector( + pool, + std::move(type), + BufferPtr(nullptr), + size, + std::vector(), + std::nullopt), + table_{std::move(table)}, + stream_{stream} {} + + rmm::cuda_stream_view stream() const { + return stream_; + } + + cudf::table_view getTableView() const { + return table_->view(); + } + + std::unique_ptr&& release() { + return std::move(table_); + } + + private: + std::unique_ptr table_; + rmm::cuda_stream_view stream_; +}; + +using CudfVectorPtr = std::shared_ptr; + +} // namespace facebook::velox::cudf_velox